diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index cc612eb9459b81e5be3501e20bafd92ea23d3dc3..6781ce19d4b9874d4a6734be252c49c5b9291366 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,6 +1,7 @@ image: "flowdalic/debian-testing-dev:1.14" before_script: + - ulimit -a - | readarray TOOLS <<EOF c++ @@ -26,7 +27,6 @@ variables: CC: gcc CXX: g++ EXTRA_NINJA_ARGS: -v - EMPER_IO: "false" stages: - smoke-test @@ -101,6 +101,18 @@ clang-tidy: variables: EMPER_USERSPACE_RCU: 'true' +.emper-pipe-sleep-strategy: + variables: + EMPER_WORKER_SLEEP_STRATEGY: 'pipe' + +.emper-no-completer: + variables: + EMPER_IO_COMPLETER_BEHAVIOR: 'none' + +.emper-single-uring: + variables: + EMPER_IO_SINGLE_URING: 'true' + .default-library-static: variables: EMPER_DEFAULT_LIBRARY: 'static' @@ -272,3 +284,19 @@ test-mmapped-log: extends: - .meson-test script: make && EMPER_LOG_FILE=emper.log make test + +test-single-uring: + extends: + - .test + - .emper-single-uring + +test-pipe-sleep-strategy: + extends: + - .test + - .emper-pipe-sleep-strategy + +test-pipe-sleep-strategy-no-completer: + extends: + - .test + - .emper-pipe-sleep-strategy + - .emper-no-completer diff --git a/emper/io.hpp b/emper/io.hpp index b36b287bdb30570fe6dd88831957866200007101..db52092befe20fda4dc727551448e99b526097cc 100644 --- a/emper/io.hpp +++ b/emper/io.hpp @@ -645,7 +645,7 @@ struct SockOpt { socklen_t val_len; static const int ENABLE; - static const SockOpt ReUsePort; + static const SockOpt ReusePort; }; /** @@ -660,7 +660,7 @@ struct SockOpt { * * @return nullptr on error, otherwise the TCP listener Fiber */ -auto tcp_listener(std::string &host, std::string &port, const std::function<void(int)> &handler, - int backlog = 1024, const std::vector<SockOpt> &sockopts = std::vector<SockOpt>()) - -> Fiber *; +auto tcp_listener(const std::string &host, const std::string &port, + const std::function<void(int)> &handler, int backlog = 1024, + const std::vector<SockOpt> &sockopts = std::vector<SockOpt>()) -> Fiber *; } // namespace emper::io diff --git a/emper/io/io.cpp b/emper/io/io.cpp index bba906cd09f79cb0a91d75d2eab07817a6e2e9a7..f7ef9aa225acefd1d6e3111cc1c53e758d14b086 100644 --- a/emper/io/io.cpp +++ b/emper/io/io.cpp @@ -24,7 +24,7 @@ namespace fs = std::filesystem; namespace emper::io { const int SockOpt::ENABLE = 1; -const SockOpt SockOpt::ReUsePort = {SOL_SOCKET, SO_REUSEPORT, &SockOpt::ENABLE, +const SockOpt SockOpt::ReusePort = {SOL_SOCKET, SO_REUSEPORT, &SockOpt::ENABLE, sizeof(SockOpt::ENABLE)}; static void recursive_directory_walk_helper( @@ -58,8 +58,9 @@ void recursive_directory_walk(const fs::path& dirpath, cps.wait(); } -auto tcp_listener(std::string& host, std::string& port, const std::function<void(int)>& handler, - int backlog, const std::vector<SockOpt>& sockopts) -> Fiber* { +auto tcp_listener(const std::string& host, const std::string& port, + const std::function<void(int)>& handler, int backlog, + const std::vector<SockOpt>& sockopts) -> Fiber* { int listen_socket; struct addrinfo* servaddr; diff --git a/tests/fixtures/network.cpp b/tests/fixtures/network.cpp index 5bbf01ce3c250d50c26f1b9fdf0bd3657f435f41..c21cf7c3f30b2d1c30ebce01518de8bac7c782c5 100644 --- a/tests/fixtures/network.cpp +++ b/tests/fixtures/network.cpp @@ -48,8 +48,8 @@ void echo_serve(int client_socket) { } } -void echo_client(std::string& host, std::string& port, const std::vector<std::string>& msgs, - size_t iterations) { +void echo_client(const std::string& host, const std::string& port, + const std::vector<std::string>& msgs, size_t iterations) { struct addrinfo* server; int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server); diff --git a/tests/fixtures/network.hpp b/tests/fixtures/network.hpp index 8c50c1532c6d2bb5c72a384804d61af7f0146336..d5137b16d9a55255eef57803958331b69c7c0ecd 100644 --- a/tests/fixtures/network.hpp +++ b/tests/fixtures/network.hpp @@ -8,5 +8,5 @@ void echo_serve(int client_socket); -void echo_client(std::string& host, std::string& port, const std::vector<std::string>& msgs, - size_t iterations = 1); +void echo_client(const std::string& host, const std::string& port, + const std::vector<std::string>& msgs, size_t iterations = 1); diff --git a/tests/io/ConcurrentNetworkEchoTest.cpp b/tests/io/ConcurrentNetworkEchoTest.cpp index 4f3b8d23c22557b605a5309cd71cb498c86c15e0..30d2c864ac931b9dcef269e3196990ba4594776f 100644 --- a/tests/io/ConcurrentNetworkEchoTest.cpp +++ b/tests/io/ConcurrentNetworkEchoTest.cpp @@ -22,7 +22,7 @@ auto main(int argc, char* argv[]) -> int { } unsigned int client_count = 10; - unsigned int echos = 1000; + unsigned int echos = 500; std::string port = "4241"; std::string host = "127.0.0.1"; @@ -43,7 +43,7 @@ auto main(int argc, char* argv[]) -> int { Runtime runtime; Fiber* listener = - emper::io::tcp_listener(host, port, echo_serve, 1024, {emper::io::SockOpt::ReUsePort}); + emper::io::tcp_listener(host, port, echo_serve, 1024, {emper::io::SockOpt::ReusePort}); if (!listener) { DIE_MSG("failed to create listener"); } diff --git a/tests/io/SimpleDiskAndNetworkTest.cpp b/tests/io/SimpleDiskAndNetworkTest.cpp index 99bf37e22293c8c2ed6f7078b1dace3bca325d24..89e0db0c1562f2c28f9b82d41035e36615ba3be2 100644 --- a/tests/io/SimpleDiskAndNetworkTest.cpp +++ b/tests/io/SimpleDiskAndNetworkTest.cpp @@ -1,46 +1,37 @@ // SPDX-License-Identifier: LGPL-3.0-or-later // Copyright © 2020-2021 Florian Fischer -#include <arpa/inet.h> // for inet_addr #include <bits/types/struct_iovec.h> // for iovec #include <fcntl.h> // for open, O_RDONLY -#include <netinet/in.h> // for sockaddr_in, htons, in_addr -#include <sys/socket.h> // for bind, listen, setsockopt -#include <unistd.h> // for close +#include <sys/types.h> +#include <array> #include <cstdlib> // for mkstemp #include <cstring> // for memcmp, memset #include <memory> // for allocator, unique_ptr #include <string> // for string #include <vector> // for vector -#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG -#include "CountingPrivateSemaphore.hpp" // for CPS +#include "Common.hpp" +#include "Debug.hpp" #include "Runtime.hpp" -#include "emper.hpp" #include "fixtures/assert.hpp" #include "fixtures/network.hpp" #include "io.hpp" #include "io/Future.hpp" +class Fiber; -#define PORT 4243 +using namespace emper::io; + +static const std::string HOST = "127.0.0.1"; +static const std::string PORT = "4243"; #define MAX 1024 -static void server_func(int sockfd) { - struct sockaddr_in clientaddr; - socklen_t clientaddr_len = sizeof(clientaddr); - auto client_fd = emper::io::acceptAndWait(sockfd, reinterpret_cast<struct sockaddr*>(&clientaddr), - &clientaddr_len); - - if (client_fd < 0) { - DIE_MSG_ERRNO("accept failed"); - } - // NOLINTNEXTLINE(modernize-avoid-c-arrays) - char recv_buf[MAX]; - // NOLINTNEXTLINE(modernize-avoid-c-arrays) - char read_buf[MAX]; +static void server_func(int client_fd) { + std::array<char, MAX> recv_buf; + std::array<char, MAX> read_buf; for (;;) { - ssize_t received = emper::io::recvAndWait(client_fd, recv_buf, sizeof(recv_buf), 0); + ssize_t received = recvAndWait(client_fd, recv_buf.data(), recv_buf.size(), 0); if (received == 0) { Runtime::getRuntime()->initiateTermination(); return; @@ -51,6 +42,8 @@ static void server_func(int sockfd) { break; } + DBG("Server got msg from client -> writing to file"); + // NOLINTNEXTLINE(modernize-avoid-c-arrays) char file_name[] = "/tmp/emper-SimpleDiskAndNetworkTestFile-XXXXXX"; int file_fd = mkstemp(file_name); @@ -58,18 +51,20 @@ static void server_func(int sockfd) { DIE_MSG_ERRNO("mkstemp failed"); } - ssize_t written = emper::io::writeFileAndWait(file_fd, recv_buf, received); + ssize_t written = writeFileAndWait(file_fd, recv_buf.data(), received); if (written < 0) { DIE_MSG_ERRNO("write failed"); } - close(file_fd); + closeAndWait(file_fd); - file_fd = emper::io::openAndWait(file_name, O_RDONLY); + DBG("Msg written to file"); + + file_fd = openAndWait(file_name, O_RDONLY); if (file_fd == -1) { DIE_MSG_ERRNO("open failed"); } - ssize_t bytes_read = emper::io::readFileAndWait(file_fd, read_buf, written); + ssize_t bytes_read = readFileAndWait(file_fd, read_buf.data(), written); if (bytes_read == 0) { DIE_MSG("nothing to read"); } @@ -77,9 +72,11 @@ static void server_func(int sockfd) { if (bytes_read < 0) { DIE_MSG_ERRNO("read failed"); } - close(file_fd); + closeAndWait(file_fd); + + DBG("Msg read from file"); - ssize_t sent = emper::io::sendAndWait(client_fd, read_buf, bytes_read, 0); + ssize_t sent = sendAndWait(client_fd, read_buf.data(), bytes_read, 0); if (sent == 0) { DIE_MSG("client socket unexpected shutdown"); } @@ -88,94 +85,79 @@ static void server_func(int sockfd) { DIE_MSG_ERRNO("send failed"); } - // NOLINTNEXTLINE(modernize-avoid-c-arrays) - char file2_name[] = "/tmp/emper-SimpleDiskAndNetworkTestFile-XXXXXX"; - file_fd = mkstemp(file2_name); - if (file_fd == -1) { - DIE_MSG_ERRNO("mkstemp failed"); - } + DBG("Msg sent back to client"); + } +} - const int iovcnt = 2; - // NOLINTNEXTLINE(modernize-avoid-c-arrays) - struct iovec iov[iovcnt]; +static void testDiskAndNetwork() { + Fiber* listener = tcp_listener(HOST, PORT, server_func, 1024, {SockOpt::ReusePort}); + if (!listener) { + DIE_MSG("creating listener failed"); + } + Runtime::getRuntime()->schedule(*listener); - std::string s1 = "foo"; - std::string s2 = "bar"; + const std::vector<std::string> msgs{"foo", "bar"}; + echo_client(HOST, PORT, msgs); +} - iov[0].iov_base = (void*)s1.c_str(); - iov[0].iov_len = s1.length(); - iov[1].iov_base = (void*)s2.c_str(); - iov[1].iov_len = s2.length(); +static void testIov() { + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char file_name[] = "/tmp/emper-SimpleDiskAndNetworkTestFile-XXXXXX"; + int file_fd = mkstemp(file_name); + if (file_fd == -1) { + DIE_MSG_ERRNO("mkstemp failed"); + } - auto writevFuture = emper::io::writev(file_fd, &iov[0], iovcnt); - written = writevFuture->waitAndSetErrno(); - if (written < 0) { - DIE_MSG_ERRNO("wrtev failed"); - } - close(file_fd); + DBG("Opened temp file to test writev"); - file_fd = emper::io::openAndWait(file2_name, O_RDONLY); - if (file_fd == -1) { - DIE_MSG_ERRNO("open failed"); - } + std::array<char, MAX> read_buf; - auto readFuture = emper::io::readFile(file_fd, read_buf, written, 0, true); - bytes_read = readFuture->waitAndSetErrno(); - if (bytes_read == 0) { - DIE_MSG("nothing to read"); - } + const int iovcnt = 2; + std::array<struct iovec, iovcnt> iov; - if (bytes_read < 0) { - DIE_MSG_ERRNO("read failed"); - } + std::string s1 = "foo"; + std::string s2 = "bar"; - ASSERT(written == bytes_read); + iov[0].iov_base = (void*)s1.c_str(); + iov[0].iov_len = s1.length(); + iov[1].iov_base = (void*)s2.c_str(); + iov[1].iov_len = s2.length(); - ASSERT(memcmp(read_buf, iov[0].iov_base, iov[0].iov_len) == 0); - ASSERT(memcmp((char*)read_buf + iov[0].iov_len, iov[1].iov_base, iov[1].iov_len) == 0); + DBG("Prepared iov"); - close(file_fd); + auto writevFuture = emper::io::writev(file_fd, &iov[0], iovcnt); + ssize_t written = writevFuture->waitAndSetErrno(); + if (written < 0) { + DIE_MSG_ERRNO("writev failed"); } -} + closeAndWait(file_fd); -void emperTest() { - int sockfd; - struct sockaddr_in servaddr; - - // socket creation and verification - sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (sockfd == -1) { - DIE_MSG_ERRNO("socket creation failed"); + file_fd = openAndWait(file_name, O_RDONLY); + if (file_fd == -1) { + DIE_MSG_ERRNO("open failed"); } - memset(&servaddr, 0, sizeof(servaddr)); - // assign IP, PORT - servaddr.sin_family = AF_INET; - servaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); - servaddr.sin_port = htons(PORT); + DBG("Reread written data"); - int enable = 1; - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) == -1) { - DIE_MSG_ERRNO("setsockopt SO_REUSEPORT failed"); - } - if (bind(sockfd, (sockaddr*)&servaddr, sizeof(servaddr)) == -1) { - DIE_MSG_ERRNO("bind failed"); + auto readFuture = readFile(file_fd, read_buf.data(), written, 0, true); + ssize_t bytes_read = readFuture->waitAndSetErrno(); + if (bytes_read == 0) { + DIE_MSG("nothing to read"); } - if (listen(sockfd, 1) != 0) { - DIE_MSG_ERRNO("listen failed"); + + if (bytes_read < 0) { + DIE_MSG_ERRNO("read failed"); } - CPS cps; - spawn([=] { server_func(sockfd); }, cps); + ASSERT(written == bytes_read); + + ASSERT(memcmp(read_buf.data(), iov[0].iov_base, iov[0].iov_len) == 0); + ASSERT(memcmp(read_buf.data() + iov[0].iov_len, iov[1].iov_base, iov[1].iov_len) == 0); - spawn( - [] { - const std::vector<std::string> msgs{"foo", "bar"}; - std::string port = std::to_string(PORT); - std::string host("127.0.0.1"); - echo_client(host, port, msgs); - }, - cps); + closeAndWait(file_fd); +} - cps.wait(); +void emperTest() { + testDiskAndNetwork(); + testIov(); } diff --git a/tests/io/SimpleNetworkTest.cpp b/tests/io/SimpleNetworkTest.cpp index 3040f69ba711bb9ad36bb19410658421b9e4a94c..4f1f76c4afe72210d19f11ae18132891d6bb5e39 100644 --- a/tests/io/SimpleNetworkTest.cpp +++ b/tests/io/SimpleNetworkTest.cpp @@ -19,7 +19,7 @@ void emperTest() { CPS cps; Fiber* listener = - emper::io::tcp_listener(addr, port, echo_serve, 1024, {emper::io::SockOpt::ReUsePort}); + emper::io::tcp_listener(addr, port, echo_serve, 1024, {emper::io::SockOpt::ReusePort}); if (!listener) { DIE_MSG("failed to create listener"); }