diff --git a/apps/EchoClient.cpp b/apps/EchoClient.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3240643f2953f02539ac8b39c6cf5f7513861158 --- /dev/null +++ b/apps/EchoClient.cpp @@ -0,0 +1,240 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include <netdb.h> //for getaddrinfo +#include <sys/socket.h> // for shutdown, socket, AF_INET + +#include <algorithm> // for find +#include <cerrno> // for errno, ECANCELED +#include <chrono> // for nanoseconds, duration, durat... +#include <cstdint> // for uint64_t, int32_t +#include <cstdlib> // for size_t, strtol, exit, EXIT_F... +#include <cstring> // for memcmp +#include <iostream> // for operator<<, basic_ostream, endl +#include <string> // for allocator, string, char_traits + +#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli... +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "Debug.hpp" // for LOGE +#include "Fiber.hpp" // for Fiber +#include "Runtime.hpp" // for Runtime +#include "Semaphore.hpp" // for Semaphore +#include "emper.hpp" // for spawn +#include "io.hpp" // for connectAndWait +#include "io/Future.hpp" // for CloseFuture, RecvFuture, Sen... + +using emper::Semaphore; +using emper::io::CloseFuture; +using emper::io::RecvFuture; +using emper::io::SendFuture; + +// Defaults +const int DECIMAL = 10; +const std::string HOST = "0.0.0.0"; +const std::string PORT = "12345"; +const size_t ITERATIONS = 10000; +const size_t CLIENTS = 10000; +const size_t SIZE = 32; + +static auto clientFunc(struct addrinfo& server, size_t iterations, size_t size, Semaphore& readySem, + Semaphore& startSem) -> uint64_t { + int client_sock = socket(server.ai_family, server.ai_socktype, server.ai_protocol); + if (client_sock < 0) { + DIE_MSG_ERRNO("creating new client socket failed"); + } + + int err = emper::io::connectAndWait(client_sock, server.ai_addr, server.ai_addrlen); + if (err) { + DIE_MSG_ERRNO("connecting new client socket failed"); + } + + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char* buf[2]; + buf[0] = new char[size]; + buf[1] = new char[size]; + uint64_t avg_ns = 0; + + readySem.release(); + startSem.acquire(); + + for (size_t i = 0; i < iterations; ++i) { + char* outBuf = buf[i % 2]; + char* inBuf = buf[(i + 1) % 2]; + + SendFuture sendFuture(client_sock, outBuf, size, MSG_NOSIGNAL); + RecvFuture recvFuture(client_sock, inBuf, size, MSG_WAITALL); + recvFuture.setDependency(sendFuture); + + auto start = std::chrono::high_resolution_clock::now(); + int32_t res = recvFuture.submitAndWait(); + auto end = std::chrono::high_resolution_clock::now(); + + if (res <= 0) { + // recv failed + if (res != -ECANCELED) { + if (res == 0) { + DIE_MSG("server closed the connection"); + } + + errno = -res; + DIE_MSG_ERRNO("receive failed"); + } + + // send failed + int32_t send_res = sendFuture.wait(); + if (send_res < 0) { + errno = -send_res; + DIE_MSG_ERRNO("send failed"); + } + + DIE_MSG("short send" << send_res); + } + + if (memcmp(outBuf, inBuf, size) != 0) { + DIE_MSG("got unexpected echo from server"); + } + + auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start); + uint64_t ns = duration.count(); + if (unlikely(avg_ns == 0)) { + avg_ns = ns; + } else { + avg_ns += ns; + avg_ns /= 2; + } + } + + // Shutdown and close the client socket; + shutdown(client_sock, SHUT_RDWR); + + // ShutdownFuture shut_f(client_sock, SHUT_RDWR); + CloseFuture cf(client_sock); + // cf.setDependency(shut_f); + cf.submit(); + + delete[] buf[0]; + delete[] buf[1]; + 
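+ // The CloseFuture does not reference the message buffers, so they may be freed
+ // before awaiting it; waiting below, before cf goes out of scope, spares
+ // ~Future() from having to cancel an in-flight close request.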
+ cf.wait(); + + return avg_ns; +} + +auto getOption(int argc, char** argv, const std::string& option) -> char* { + char** end = argv + argc; + char** itr = std::find(argv, end, option); + + if (itr != end && ++itr != end) { + return *itr; + } + + return nullptr; +} + +static auto existsOption(int argc, char** argv, const std::string& option) -> bool { + char** end = argv + argc; + return std::find(argv, end, option) != end; +} + +static void printUsage(char* name) { + std::cerr << "Usage: " << name + << "[-h] [-p <port>] [-c <clients>] [-i <iterations>] [-a <address>] [-s <size>]" + << std::endl; +} + +auto main(int argc, char* argv[]) -> int { + std::string host = HOST; + std::string port = PORT; + size_t iterations = ITERATIONS; + size_t clients = CLIENTS; + size_t size = SIZE; + + if (existsOption(argc, argv, "-h")) { + printUsage(argv[0]); + exit(EXIT_FAILURE); + } + + char* port_s = getOption(argc, argv, "-p"); + if (port_s) { + port = std::string(port_s); + } + + char* addr_s = getOption(argc, argv, "-a"); + if (addr_s) { + host = std::string(addr_s); + } + + char* client_s = getOption(argc, argv, "-c"); + if (client_s) { + clients = strtol(client_s, nullptr, DECIMAL); + } + + char* iterations_s = getOption(argc, argv, "-i"); + if (iterations_s) { + iterations = strtol(iterations_s, nullptr, DECIMAL); + } + + char* size_s = getOption(argc, argv, "-s"); + if (size_s) { + size = strtol(size_s, nullptr, DECIMAL); + } + + struct addrinfo* server; + int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server); + if (err) { + if (err == EAI_SYSTEM) { + DIE_MSG_ERRNO("getaddrinfo failed"); + } else { + LOGE("error in getaddrinfo: " << gai_strerror(err)); + exit(EXIT_FAILURE); + } + } + + Runtime runtime; + + Fiber* alphaFiber = Fiber::from([&] { + auto* client_avgs = new uint64_t[clients]; + CPS cps; + Semaphore readySemaphore; + Semaphore startSemaphore; + + // establish client connections + for (size_t i = 0; i < clients; ++i) { + spawn( + [&, i] { + client_avgs[i] = clientFunc(*server, iterations, size, readySemaphore, startSemaphore); + }, + cps); + } + + // await the clients + for (size_t i = 0; i < clients; ++i) { + readySemaphore.acquire(); + } + + // start the clients + for (size_t i = 0; i < clients; ++i) { + startSemaphore.release(); + } + + // await the clients + cps.wait(); + + uint64_t avg_ns = client_avgs[0]; + for (size_t i = 1; i < clients; ++i) { + avg_ns += client_avgs[i]; + avg_ns /= 2; + } + + std::cout << "clients,iterations,size,time" << std::endl; + std::cout << clients << "," << iterations << "," << size << "," << avg_ns << std::endl; + delete[] client_avgs; + + exit(EXIT_SUCCESS); + }); + + runtime.scheduleFromAnywhere(*alphaFiber); + + runtime.waitUntilFinished(); + + return EXIT_FAILURE; +} diff --git a/apps/EchoServer.cpp b/apps/EchoServer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4d21dac67043eee4caed1480cd48c65d17b5e931 --- /dev/null +++ b/apps/EchoServer.cpp @@ -0,0 +1,72 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <sys/types.h> + +#include <cassert> +#include <climits> +#include <cstdlib> +#include <cstring> +#include <iostream> +#include <string> + +#include "Common.hpp" +#include "Runtime.hpp" +#include "emper-common.h" +#include "io.hpp" + +const int DECIMAL = 10; +const std::string HOST = "0.0.0.0"; +const int PORT = 12345; + +auto main(int argc, char* argv[]) -> int { + int port = PORT; + std::string host = HOST; + + if (argc > 2) { + std::cerr << "Usage: " << 
argv[0] << " [port]" << std::endl; + exit(EXIT_FAILURE); + } + + if (argc > 1) { + long aport = strtol(argv[1], nullptr, DECIMAL); + assert(aport <= INT_MAX && aport >= INT_MIN); + port = (int)aport; + } + + std::cout << "Echoserver listening on " << host << ":" << port << std::endl; + + Runtime runtime; + auto* listener = emper::io::tcp_listener(host, port, [](int socket) { + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char buf[1024]; + for (;;) { + ssize_t bytes_recv = emper::io::recvAndWait(socket, buf, sizeof(buf), 0); + if (unlikely(bytes_recv <= 0)) { + // socket was shutdown + if (bytes_recv < 0) { + DIE_MSG_ERRNO("server read failed"); + } + + emper::io::closeAndForget(socket); + return; + } + + if (unlikely(bytes_recv == 5 && strncmp("quit\n", buf, bytes_recv) == 0)) { + exit(EXIT_SUCCESS); + } + + ATTR_UNUSED ssize_t bytes_send = emper::io::sendAndWait(socket, buf, bytes_recv, 0); + assert(bytes_recv == bytes_send); + } + }); + + if (!listener) { + exit(EXIT_FAILURE); + } + + runtime.scheduleFromAnywhere(*listener); + + runtime.waitUntilFinished(); + + return EXIT_FAILURE; +} diff --git a/apps/meson.build b/apps/meson.build index 7ed455b24328a4572a91e1e17c71e68f425191f8..5075128a54a7118400b8d1d1965615331c7dd69d 100644 --- a/apps/meson.build +++ b/apps/meson.build @@ -9,3 +9,15 @@ worker_sleep_example_exe = executable( 'WorkerSleepExample.cpp', dependencies: emper_dep, ) + +echoserver_exe = executable( + 'echoserver', + 'EchoServer.cpp', + dependencies: emper_dep, +) + +echoclient_exe = executable( + 'echoclient', + 'EchoClient.cpp', + dependencies: emper_dep, +) diff --git a/emper/BinaryPrivateSemaphore.hpp b/emper/BinaryPrivateSemaphore.hpp index 54410bca5b1bb1c96ca74a0938696d42fa7e62e8..5dcee79238888069be9423fe115fd5292d25425f 100644 --- a/emper/BinaryPrivateSemaphore.hpp +++ b/emper/BinaryPrivateSemaphore.hpp @@ -35,6 +35,8 @@ class BinaryPrivateSemaphore : public PrivateSemaphore { BinaryPrivateSemaphore() : bpsState(initial) {} void wait() override; + + void reset() { bpsState = initial; }; }; using BPS = BinaryPrivateSemaphore; diff --git a/emper/Blockable.hpp b/emper/Blockable.hpp index 04cbb493ab0cc112ecbc4cd40b6ec06c3fbf7fcb..7b2378f6508e7120588e9160cc73dc257c95af72 100644 --- a/emper/Blockable.hpp +++ b/emper/Blockable.hpp @@ -9,7 +9,11 @@ #include "Context.hpp" #include "ContextManager.hpp" #include "Debug.hpp" +#include "Emper.hpp" #include "Runtime.hpp" +#include "lib/adt/LockedSet.hpp" + +static emper::lib::adt::LockedSet<Context*> blockedContexts; template <LogSubsystem logSubsystem> class Blockable : public Logger<logSubsystem> { @@ -29,12 +33,21 @@ class Blockable : public Logger<logSubsystem> { assert(Runtime::inRuntime()); LOGD("block() blockedContext is " << Context::getCurrentContext()); + + if constexpr (emper::BLOCKED_CONTEXT_SET) { + blockedContexts.insert(Context::getCurrentContext()); + } + contextManager.saveAndStartNew(std::move(freshContextHook)); } template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void unblock(Context* context) { assert(context != nullptr); + if constexpr (emper::BLOCKED_CONTEXT_SET) { + blockedContexts.erase(context); + } + // cppcheck-suppress unsafeClassCanLeak Fiber* unblockFiber = Fiber::from([this, context]() { contextManager.discardAndResume(context); }); diff --git a/emper/Common.hpp b/emper/Common.hpp index a78233cd2154bcf61ac1e3cd79475fa0b3620d27..fdda0ccfb80f50391047787b7d28816e8982fd19 100644 --- a/emper/Common.hpp +++ b/emper/Common.hpp @@ -23,3 +23,17 @@ using func_t = std::function<void()>; 
[[noreturn]] void die(const char* message, bool usePerror); using WORD = unsigned int; + +#ifdef EMPER_STATS +#include <chrono> +#define TIME_NS(code, record_func) \ + auto _time_ns_start = std::chrono::high_resolution_clock::now(); \ + code; \ + auto _time_ns_end = std::chrono::high_resolution_clock::now(); \ + auto _time_ns_duration = \ + std::chrono::duration_cast<std::chrono::nanoseconds>(_time_ns_end - _time_ns_start); \ + record_func(_time_ns_duration); + +#else +#define TIME_NS(code, record) code +#endif diff --git a/emper/Debug.hpp b/emper/Debug.hpp index 71b9cea518484703e2824f0277a006cba1933076..b02d89c07eed87eca25f1812f85c2c92bc746070 100644 --- a/emper/Debug.hpp +++ b/emper/Debug.hpp @@ -63,6 +63,7 @@ enum class LogSubsystem { SCHED, RUNTI, U_B_MPSC_Q, + IO, }; enum LogLevel { @@ -99,6 +100,7 @@ class Logger { case LogSubsystem::SCHED: case LogSubsystem::RUNTI: case LogSubsystem::U_B_MPSC_Q: + case LogSubsystem::IO: default: return ALL; } @@ -122,6 +124,8 @@ class Logger { return "RUNTI"; case LogSubsystem::U_B_MPSC_Q: return "UBSCQ"; + case LogSubsystem::IO: + return "IO "; default: return "UNKNOWN SUBSYSTEM (Add it *now*)"; } diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 995a567e7fa34280fdcaccb616ab94b01b8ffbc2..bbebc22ac9cc083a14f11d528c273479ab2f3032 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -65,4 +65,28 @@ static const bool OVERFLOW_QUEUE = ; auto getFullVersion() -> std::string; + +static const bool BLOCKED_CONTEXT_SET = +#ifdef EMPER_BLOCKED_CONTEXT_SET + DEBUG +#else + false +#endif + ; + +static const bool IO = +#ifdef EMPER_IO + true +#else + false +#endif + ; + +static const bool IO_URING_SQPOLL = +#ifdef EMPER_IO_URING_SQPOLL + true +#else + false +#endif + ; } // namespace emper diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index b6a9c58f8be1dac35671cfa00f2169323cebe4f9..e195d3e05f24a07876511c1beb6002b2cc56fc54 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020-2021 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus, Florian Fischer #include "Runtime.hpp" #include <pthread.h> // for pthread_t, pthread_attr_init @@ -24,6 +24,8 @@ #include "RuntimeStrategyFactory.hpp" #include "RuntimeStrategyStats.hpp" // for RuntimeStrategyStats #include "emper-config.h" // IWYU pragma: keep +#include "io/IoContext.hpp" // for IoContext +#include "io/Stats.hpp" // for emper::io::Stats #include "lib/DebugUtil.hpp" #ifdef EMPER_DEFAULT_SCHEDULING_STRATEGY_WORK_STEALING @@ -58,6 +60,8 @@ RuntimeStrategyFactory& Runtime::DEFAULT_STRATEGY = #endif ; +using emper::io::IoContext; + Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory, unsigned int seed) : workerCount(workerCount), workerLatch(workerCount), @@ -80,6 +84,32 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory currentRuntime = this; } + // initialize the global and all worker IoContexts + if constexpr (emper::IO) { + // The global io_uring needs at least workerCount entries in its SQ because + // for each worker's IoContext one eventfd read is prepared before the + // globalCompleter is started and submits all previously prepared sqes. 
+ globalIo = new IoContext(workerCount); + ioContexts = new IoContext[workerCount]; + + if constexpr (emper::STATS) { + globalIo->stats.workerId = -1; + std::atexit(emper::io::Stats::printWorkerStats); + } + + // submit the workers' CQ eventfds to the global IoContext + for (workerid_t i = 0; i < workerCount; ++i) { + ioContexts[i].submit_efd(); + if constexpr (emper::STATS) { + ioContexts[i].stats.workerId = i; + } + } + + // start the globalCompleter after all eventfd are submitted so we don't need to + // synchronize the globalIo's SQ + IoContext::startGlobalCompleter(*globalIo); + } + for (workerid_t i = 0; i < workerCount; ++i) { pthread_attr_t attr; errno = pthread_attr_init(&attr); @@ -141,6 +171,12 @@ Runtime::~Runtime() { } delete[] workers; delete[] threads; + + if constexpr (emper::IO) { + delete[] ioContexts; + delete globalIo; + } + { std::lock_guard<std::mutex> lock(currentRuntimeMutex); currentRuntime = nullptr; @@ -151,6 +187,10 @@ Runtime::~Runtime() { auto Runtime::workerLoop(Worker* worker) -> void* { worker->setWorker(); + if constexpr (emper::IO) { + ioContexts[worker->workerId].setWorkerIo(); + } + LOGD("Worker loop started by thread " << syscall(SYS_gettid)); int oldType; @@ -169,7 +209,14 @@ auto Runtime::workerLoop(Worker* worker) -> void* { return nullptr; } -auto Runtime::nextFiber() -> NextFiberResult { return scheduler.nextFiber(); } +auto Runtime::nextFiber() -> NextFiberResult { + if constexpr (emper::IO) { + // Schedule all fibers waiting on completed IO + IoContext::getWorkerIo()->reapCompletions(); + } + + return scheduler.nextFiber(); +} void Runtime::waitUntilFinished() { for (workerid_t i = 0; i < workerCount; ++i) { diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 9cfd7679a91df131da576a718122d9da919abeb9..05c0ea5119cab09aff3ba8544fda2807232ab1e6 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -30,6 +30,12 @@ class Fiber; class RuntimeStrategy; class RuntimeStrategyFactory; +namespace emper::io { +class IoContext; +} + +using emper::io::IoContext; + enum WakeupMode { IF_SLEEPING_OBSERVED, ALWAYS, @@ -52,6 +58,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { ContextManager& contextManager; pthread_t* threads; Worker** workers; + IoContext* globalIo; + IoContext* ioContexts; std::default_random_engine randomEngine; std::uniform_int_distribution<unsigned int> uniformIntDistribution; @@ -115,6 +123,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { workerSleepConditionVariable.wait(lk); } + inline auto getGlobalIo() -> IoContext* { + if constexpr (emper::IO) { + return globalIo; + } + + return nullptr; + } + public: Runtime() : Runtime(std::thread::hardware_concurrency()) {} @@ -164,6 +180,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { friend ContextManager; friend Scheduler; friend Dispatcher; + friend IoContext; template <typename T, intptr_t WS_QUEUE_SIZE, size_t WORKER_EXCLUSIVE_QUEUE_SIZE> friend class MemoryManager; }; diff --git a/emper/c_emper.cpp b/emper/c_emper.cpp index 6822dee64352f2175c6d8be875b1b7399ace35cf..85a9bf9605939c90edc2bd9c3f49c0db4aa61586 100644 --- a/emper/c_emper.cpp +++ b/emper/c_emper.cpp @@ -1,12 +1,22 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020 Florian Schmaus, Florian Fischer #include "BinaryPrivateSemaphore.hpp" // for BinaryPrivateSemaphore, BPS #include "CountingPrivateSemaphore.hpp" // for CountingPrivateSemaphore, CPS #include "Fiber.hpp" // for Fiber, Fiber::NOT_AFFINE #include "Runtime.hpp" // for Runtime 
#include "emper-common.h" // for workeraffinity_t +#include "emper-config.h" // IWYU pragma: keep #include "emper.h" // for fiber, cps, bps, runtime +#ifdef EMPER_IO +#include <sys/socket.h> // for socklen_t +#include <sys/types.h> // for off_t + +#include <cstddef> // for size_t + +#include "io.hpp" // for emper::io::emper_* +#endif + auto init_runtime(void) -> runtime* { auto* r = new Runtime(); return reinterpret_cast<runtime*>(r); @@ -99,3 +109,35 @@ void wait_cps(cps* sem) { auto* countingPrivateSemaphore = reinterpret_cast<CountingPrivateSemaphore*>(sem); countingPrivateSemaphore->wait(); } + +#ifdef EMPER_IO +auto emper_recv(int socket, void* buffer, size_t length, int flags) -> ssize_t { + return emper::io::recvAndWait(socket, buffer, length, flags); +} + +auto emper_send(int socket, const void* buffer, size_t length, int flags) -> ssize_t { + return emper::io::sendAndWait(socket, buffer, length, flags); +} + +auto emper_connect(int socket, const struct sockaddr* address, socklen_t address_len) -> int { + return emper::io::connectAndWait(socket, address, address_len); +} + +auto emper_accept(int socket, struct sockaddr* address, socklen_t* address_len) -> int { + return emper::io::acceptAndWait(socket, address, address_len); +} + +auto emper_read_file(int fildes, void* buf, size_t nbyte, off_t offset) -> ssize_t { + return emper::io::readFileAndWait(fildes, buf, nbyte, offset); +} + +auto emper_write_file(int fildes, const void* buf, size_t nbyte, off_t offset) -> ssize_t { + return emper::io::writeFileAndWait(fildes, buf, nbyte, offset); +} + +auto emper_writev(int fildes, const struct iovec* iov, int iovcnt) -> ssize_t { + return emper::io::writevAndWait(fildes, iov, iovcnt); +} + +auto emper_close(int fd) -> int { return emper::io::closeAndWait(fd); } +#endif diff --git a/emper/include/emper.h b/emper/include/emper.h index e93cbf081531a2e29870a420822ab6474ec14d0a..4a1402e7013f856c6808d1c3db84f23d9a48e04e 100644 --- a/emper/include/emper.h +++ b/emper/include/emper.h @@ -1,8 +1,14 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020 Florian Schmaus, Florian Fischer #pragma once #include "emper-common.h" +#include "emper-config.h" +#include "stddef.h" // NOLINT(modernize-deprecated-headers) + +#ifdef EMPER_IO +#include "sys/socket.h" +#endif typedef struct runtime runtime; // NOLINT(modernize-use-using) @@ -60,6 +66,32 @@ void signal_and_exit_cps(cps* sem); void wait_cps(cps* sem); +#ifdef EMPER_IO +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +ssize_t emper_recv(int socket, void* buffer, size_t length, int flags); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +ssize_t emper_send(int socket, const void* buffer, size_t length, int flags); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +int emper_connect(int socket, const struct sockaddr* address, socklen_t address_len); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +int emper_accept(int socket, struct sockaddr* address, socklen_t* address_len); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +ssize_t emper_read_file(int fildes, void* buf, size_t nbyte, off_t offset); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +ssize_t emper_write_file(int fildes, const void* buf, size_t nbyte, off_t offset); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +ssize_t emper_writev(int fildes, const struct iovec* iov, int iovcnt); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +int emper_close(int fd); +#endif + #ifdef 
__cplusplus } #endif diff --git a/emper/io.hpp b/emper/io.hpp new file mode 100644 index 0000000000000000000000000000000000000000..76da837b79775247cafe7d7e95923b6496843eef --- /dev/null +++ b/emper/io.hpp @@ -0,0 +1,395 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#pragma once + +#include <sys/socket.h> // for socklen_t +#include <sys/types.h> // for ssize_t, off_t + +#include <cstddef> // for size_t +#include <functional> // for function +#include <memory> // for unique_ptr +#include <string> // for string + +#include "Future.hpp" // for Futures + +/* + * Header defining the public POSIX-like IO interface of emper. + * All functions defined in this header must be used inside a fiber. + * The emper IoContext to handle the IO request will be the one setup by the + * Runtime and returned by IoContext::getWorkerIo(). + */ + +class Fiber; + +namespace emper::io { +/** + * @brief non-blocking recv mimicking POSIX recv(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param buffer destination buffer + * @param length length of the message in bytes + * @param flags type of message reception + * + * @return Future object which signals the completion of the recv request + */ +inline auto recv(int socket, void *buffer, size_t length, int flags) -> std::unique_ptr<Future> { + auto future = std::make_unique<RecvFuture>(socket, buffer, length, flags); + future->submit(); + return future; +} + +/** + * @brief Blocking recv mimicking POSIX recv(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param buffer destination buffer + * @param length length of the message in bytes + * @param flags type of message reception + + * @return -1 on error, 0 when receiving from a closed socket, otherwise the received bytes + */ +inline auto recvAndWait(int socket, void *buffer, size_t length, int flags) -> ssize_t { + RecvFuture future(socket, buffer, length, flags); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking send mimicking POSIX send(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param buffer source buffer + * @param length length of the message in bytes + * @param flags type of message transmission + * @param send_all If true sginal the future only if all bytes are sent or + * sending further bytes is not possible + * + * @return Future object which signals the completion of the send request + */ +inline auto send(int socket, const void *buffer, size_t length, int flags, bool send_all = true) + -> std::unique_ptr<Future> { + auto future = std::make_unique<SendFuture>(socket, buffer, length, flags, send_all); + future->submit(); + return future; +} + +/** + * @brief Blocking send mimicking POSIX send(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param buffer source buffer + * @param length length of the message in bytes + * @param flags type of message transmission + * @param send_all If true return to the user only if all bytes are sent or + * sending further bytes is not possible + * + * @return -1 on error, otherwise 
the number of sent bytes + */ +inline auto sendAndWait(int socket, const void *buffer, size_t length, int flags, + bool send_all = true) -> ssize_t { + void *mut_buf = const_cast<void *>(buffer); + SendFuture future(socket, mut_buf, length, flags, send_all); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking connect mimicking POSIX connect(3) + * + * It is discouraged to call connect on a socket with the SOCK_NONBLOCK flag set + * because the request submitted to io_uring will immediately be completed + * with EINPROGESS or EALREADY and must be repeatedly reinserted into the submission + * queue and io_uring afterwards. + * To efficiently connect a socket don't set its SOCK_NONBLOCK flag. + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param address buffer containing the address to connect to + * @param address_len length of the sockaddr structure + * + * @return Future object which signals the completion of the connect request + */ +inline auto connect(int socket, const struct sockaddr *address, socklen_t address_len) + -> std::unique_ptr<Future> { + auto future = std::make_unique<ConnectFuture>(socket, address, address_len); + future->submit(); + return future; +} + +/** + * @brief Blocking connect mimicking POSIX connect(3) + * + * It is discouraged to call connect on a socket with the SOCK_NONBLOCK flag set + * because the request submitted to io_uring will immediately be completed + * with EINPROGESS or EALREADY and must be repeatedly reinserted into the submission + * queue and io_uring afterwards. + * To efficiently connect a socket don't set its SOCK_NONBLOCK flag. + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param address buffer containing the address to connect to + * @param address_len length of the sockaddr structure + * + * @return -1 on error, 0 otherwise + */ +inline auto connectAndWait(int socket, const struct sockaddr *address, socklen_t address_len) + -> int { + ConnectFuture future(socket, address, address_len); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking accept mimicking POSIX accept(3) + * + * It is discouraged to call accept on a socket with the SOCK_NONBLOCK flag set + * because the request submitted to io_uring will immediately be completed + * with EAGAIN or EWOULDBLOCK and must be repeatedly reinserted into the submission + * queue and io_uring afterwards. + * To efficiently accept new connections don't set the SOCK_NONBLOCK flag. 
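+ *
+ * A minimal sketch, assuming `listen_fd` is a bound, listening TCP socket:
+ * @code
+ * struct sockaddr_storage addr;
+ * socklen_t addr_len = sizeof(addr);
+ * auto acceptFuture = emper::io::accept(listen_fd, (struct sockaddr*)&addr, &addr_len);
+ * // do other work while the accept request is in flight
+ * int client_fd = acceptFuture->wait();  // a negative result is a negated errno code
+ * @endcode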
+ * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param[out] address buffer to store the clients address + * @param[in, out] address_len length of the supplied sockaddr structure + * + * @return Future object which signals the completion of the accept request + */ +inline auto accept(int socket, struct sockaddr *address, socklen_t *address_len) + -> std::unique_ptr<Future> { + auto future = std::make_unique<AcceptFuture>(socket, address, address_len); + future->submit(); + return future; +} + +/** + * @brief Blocking accept mimicking POSIX accept(3) + * + * It is discouraged to call accept on a socket with the SOCK_NONBLOCK flag set + * because the request submitted to io_uring will immediately be completed + * with EAGAIN or EWOULDBLOCK and must be repeatedly reinserted into the submission + * queue and io_uring afterwards. + * To efficiently accept new connections don't set the SOCK_NONBLOCK flag. + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param[out] address buffer to store the clients address + * @param[in, out] address_len length of the supplied sockaddr structure + * + * @return -1 on error, otherwise the non-negative file descriptor of the accepted socket. + */ +inline auto acceptAndWait(int socket, struct sockaddr *address, socklen_t *address_len) -> int { + AcceptFuture future(socket, address, address_len); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking read for regular files mimicking POSIX read(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fildes file descriptor to the regular file to be read from + * @param buf destination buffer + * @param nbyte amount of bytes to read + * @param offset offset in the file + * @param read_all If true signal the future only if the buffer is completely + * filled or no further data can be read. + * + * @return Future object which signals the completion of the read request + */ +inline auto readFile(int fildes, void *buf, size_t nbyte, off_t offset = 0, bool read_all = false) + -> std::unique_ptr<Future> { + auto future = std::make_unique<ReadFuture>(fildes, buf, nbyte, offset, read_all); + future->submit(); + return future; +} + +/** + * @brief Blocking read for regular files mimicking POSIX read(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fildes file descriptor to the regular file to be written to + * @param buf destination buffer + * @param nbyte amount of bytes to read + * @param offset offset in the file + * @param read_all If true return only to the user if the buffer is completely + * filled or no further data can be read. + * + * @return -1 on error, otherwise the number of bytes read + */ +inline auto readFileAndWait(int fildes, void *buf, size_t nbyte, off_t offset = 0, + bool read_all = false) -> ssize_t { + ReadFuture future(fildes, buf, nbyte, offset, read_all); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking write for regular files mimicking POSIX write(3) + * + * Currently only reading from regular files is tested and supported. + * Reading from other file types could work but may result in undefined behavior. 
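+ *
+ * A minimal sketch, assuming `fd` is an open regular file:
+ * @code
+ * const char data[] = "hello";
+ * auto writeFuture = emper::io::writeFile(fd, data, sizeof(data) - 1, 0);
+ * int32_t written = writeFuture->wait();  // a negative result is a negated errno code
+ * @endcode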
+ * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fildes file descriptor to the regular file to be written to + * @param buf source buffer + * @param nbyte amount of bytes to write + * @param offset offset in the file + * @param write_all If true signal this future only all nbyte were written + * or no further data can be written. + * + * @return Future object which signals the completion of the write request + */ +inline auto writeFile(int fildes, const void *buf, size_t nbyte, off_t offset = 0, + bool write_all = true) -> std::unique_ptr<Future> { + auto future = std::make_unique<WriteFuture>(fildes, buf, nbyte, offset, write_all); + future->submit(); + return future; +} + +/** + * @brief Blocking write for regular files mimicking POSIX write(3) + * + * Currently only writing to regular files is tested and supported. + * Writing to other file types could work but may result in undefined behavior. + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fildes file descriptor to the regular file to be written to + * @param buf source buffer + * @param nbyte amount of bytes to write + * @param offset offset in the file + * @param write_all If true return only to the user if all nbyte were written + * or no further data can be written. + * + * @return -1 on error, otherwise the number of bytes written + */ +inline auto writeFileAndWait(int fildes, const void *buf, size_t nbyte, off_t offset = 0, + bool write_all = true) -> ssize_t { + WriteFuture future(fildes, buf, nbyte, offset, write_all); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking writev mimicking POSIX writev(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fildes file descriptor to write to + * @param iov pointer to the iovec struct holding buffers to write + * @param iovcnt number of buffers in the iovec array + * + * @return Future object which signals the completion of the write request + */ +inline auto writev(int fildes, const struct iovec *iov, int iovcnt) -> std::unique_ptr<Future> { + auto future = std::make_unique<WritevFuture>(fildes, iov, iovcnt); + future->submit(); + return future; +} + +/** + * @brief Blocking writev mimicking POSIX writev(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fildes file descriptor to write to + * @param iov pointer to the iovec struct holding buffers to write + * @param iovcnt number of buffers in the iovec array + * + * @return -1 on error, otherwise the number of bytes written + */ +inline auto writevAndWait(int fildes, const struct iovec *iov, int iovcnt) -> ssize_t { + WritevFuture future(fildes, iov, iovcnt); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking close mimicking POSIX close(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fd file descriptor to close + * + * @return Future object which signals the completion of the close request + */ +inline auto close(int fd) -> std::unique_ptr<Future> { + auto future = std::make_unique<CloseFuture>(fd); + future->submit(); + return future; +} + +/** + * @brief Blocking close mimicking POSIX close(3) + * + * This method must be called from inside the emper runtime because it 
uses + * the worker-local IoContext + * + * @param fd file descriptor to close + * + * @return 0 on success, -1 on error + */ +inline auto closeAndWait(int fd) -> ssize_t { + CloseFuture future(fd); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief submit a close request mimicking POSIX close(3) and forget about it + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fd file descriptor to close + */ +inline void closeAndForget(int fd) { + CloseFuture future(fd); + future.submitAndForget(); +} + +/** + * @brief Fiber accepting TCP connections and scheduling handler fibers + * + * @param host The host address the socket should be listening on + * @param port The port the socket should be listening on + * @param handler A function or lambda the started handler Fibers should execute. + * It is called with the file descriptor returned by accept. + * @param backlog The TCP connection backlog. + * + * @return nullptr on error, otherwise the TCP listener Fiber + */ +auto tcp_listener(std::string &host, int &port, const std::function<void(int)> &handler, + int backlog = 1024) -> Fiber *; +} // namespace emper::io diff --git a/emper/io/Future.cpp b/emper/io/Future.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8d59a3d698ee17af82173642c6deef092108c7fb --- /dev/null +++ b/emper/io/Future.cpp @@ -0,0 +1,118 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include "Future.hpp" + +#include <sys/types.h> // for ssize_t + +#include <cassert> // for assert +#include <cerrno> // for errno, EAGAIN, EWOULDBLOCK +#include <ostream> // for operator<<, ostream, basic_ost... + +#include "BinaryPrivateSemaphore.hpp" // for BPS +#include "CallerEnvironment.hpp" // for CallerEnvironment +#include "Debug.hpp" // for LOGD, LOGW +#include "Runtime.hpp" // for Runtime +#include "io/IoContext.hpp" // for IoContext +#include "io/Stats.hpp" // for Stats, Operation + +namespace emper::io { + +template <CallerEnvironment callerEnvironment> +auto PartialCompletableFuture::tryComplete(int32_t res) + -> PartialCompletableFuture::CompletionType { + // We expect partial completions and made progress + if (!isDependency() && partialCompletion > DISABLE_PARTIAL_COMPLETION && res > 0) { + partialCompletion += res; + // write/send: we have not written the whole buffer yet + // read: we have not filled the buffer yet + if ((size_t)partialCompletion < len) { + IoContext* io = IoContext::getIo<callerEnvironment>(); + io->submit<callerEnvironment>(*this); + return CompletionType::Resubmission; + } + + return CompletionType::IncrementalCompletion; + } + + return CompletionType::Completion; +} + +// show the compiler our template incarnations +template PartialCompletableFuture::CompletionType +PartialCompletableFuture::tryComplete<CallerEnvironment::EMPER>(int32_t res); +template PartialCompletableFuture::CompletionType +PartialCompletableFuture::tryComplete<CallerEnvironment::ANYWHERE>(int32_t res); + +auto Future::wait() -> int32_t { + LOGD("Waiting on " << this); + + sem.wait(); + + if constexpr (emper::DEBUG) { + state.retrieved = true; + } + + return returnValue; +} + +auto Future::waitAndSetErrno() -> ssize_t { + int32_t res = wait(); + // move error codes from io_uring into errno + if (res < 0) { + errno = -res; + return -1; + } + + return res; +} + +void Future::submit() { + assert(Runtime::inRuntime()); + assert(!isSubmitted()); + IoContext* io = 
IoContext::getWorkerIo(); + + LOGD("submit " << this << " to IoContext " << io); + + io->submit(*this); +} + +auto Future::cancel() -> int32_t { + if (dependency) { + dependency->cancel(); + } + + state.cancelled = true; + + if (!isSubmitted() || isForgotten()) { + LOGW("Cancelling unsubmitted or forgotten " << this); + return -ENOENT; + } + + if (isCompleted()) { + return returnValue; + } + + CancelWrapper cancellation(*this); + cancellation.submitAndWait(); + return wait(); +} + +/** + * Internal hiding functions for Stats::recordCompletion double dispatch + * + * The polymorphism functions are Future::recordCompletion and are defined in + * Future.hpp so they are included in each compilation unit and the + * compiler can optimize the recordCompletion calls if stats are off + */ +void Future::recordCompletionInternal(Stats& stats, int32_t res) const { + stats.recordCompletion(*this, res); +} + +void PartialCompletableFuture::recordCompletionInternal(Stats& stats, int32_t res) const { + stats.recordCompletion(*this, res); +} + +void WritevFuture::recordCompletionInternal(Stats& stats, int32_t res) const { + stats.recordCompletion(*this, res); +} +} // namespace emper::io diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp new file mode 100644 index 0000000000000000000000000000000000000000..76baee9c64ddc1ca249c02ad1e18b36bf2edc410 --- /dev/null +++ b/emper/io/Future.hpp @@ -0,0 +1,535 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#pragma once + +#include <liburing.h> // for io_uring_prep_read, io_uring_p... +#include <sys/socket.h> // for socklen_t +#include <sys/types.h> // for ssize_t + +#include <cstddef> // for size_t +#include <cstdint> // for int32_t, uint8_t +#include <cstdlib> // for abort +#include <ostream> // for operator<<, ostream, basic_ost... + +#include "BinaryPrivateSemaphore.hpp" // for BPS +#include "CallerEnvironment.hpp" // for CallerEnvironment, ANYWHERE +#include "Debug.hpp" // for LOGD, LogSubsystem, LogSubsyst... +#include "Emper.hpp" // for DEBUG +#include "io/Operation.hpp" // for Operation, operator<<, Operati... + +struct __kernel_timespec; +struct io_uring_sqe; + +namespace emper::io { +class Stats; + +/* + * @brief Future representing an IO request which can be awaited + */ +class Future : public Logger<LogSubsystem::IO> { + friend class IoContext; + friend class Stats; + + protected: + using State = struct State { + uint8_t submitted : 1 = 0; /*!< An sqe for this Future was prepared */ + uint8_t completed : 1 = 0; /*!< The semaphor was signaled */ + uint8_t retrieved : 1 = 0; /*!< Future::wait() has retrieved the result */ + uint8_t dependency : 1 = 0; /*!< This Future is a dependency of another */ + uint8_t cancelled : 1 = 0; /*!< Future::cancel() was called */ + uint8_t forgotten : 1 = + 0; /*!< This Future does not care about the result of the IO operation */ + }; + + BPS sem; + + State state; + + /* IO operation to perform */ + const Operation op; + + /* Arguments for the IO operation */ + const int fd; + void* const buf; + const size_t len; + const int offsetOrFlags; + + /** + * Return value of the operation. + * __s32 is used by liburing for the res field in its cqe struct + * This value is returned by Future::wait() and can only differentiate between + * failure or full success. 
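+ * A negative value is the negated errno code of the failed operation, a
+ * non-negative value is the operation's result (see waitAndSetErrno()).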
+ */ + int32_t returnValue = 0; + + // Dependency futures will be submitted to the io_uring before + // this and will be linked to this IO request using IOSQE_IO_LINK + Future* dependency = nullptr; + + virtual void prepareSqe(io_uring_sqe* sqe) = 0; + + void setCompletion(int32_t res) { + LOGD("Complete " << this); + returnValue = res; + state.completed = true; + } + + virtual void complete(int32_t res) { + setCompletion(res); + sem.signal(); + } + + virtual void completeFromAnywhere(int32_t res) { + setCompletion(res); + sem.signalFromAnywhere(); + } + + /** + * Used for Stats::recordCompletion double dispatch + */ + virtual void recordCompletion(Stats& stats, int32_t res) { + if constexpr (emper::STATS) { + recordCompletionInternal(stats, res); + } + } + + void recordCompletionInternal(Stats& stats, int32_t res) const; + + Future(Operation op, int fd, void* buf, size_t len, int offsetOrFlags) + : op(op), fd(fd), buf(buf), len(len), offsetOrFlags(offsetOrFlags){}; + + public: + virtual ~Future() { + if (isForgotten()) { + return; + } + + if constexpr (emper::DEBUG) { + if (!isRetrieved() && !isDependency() && !isCancelled()) { + LOGE(this << " created but never awaited"); + abort(); + } + } + + // cancel this Future to guaranty memory safety + // By canceling we assure that the this pointer and the contained buffer + // passed to io_uring is not used any more by the kernel or the IoContext + cancel(); + } + + inline auto isRetrieved() -> bool { return state.retrieved; } + inline auto isSubmitted() -> bool { return state.submitted; } + inline auto isCompleted() -> bool { return state.completed; } + inline auto isCancelled() -> bool { return state.cancelled; } + inline auto isDependency() -> bool { return state.dependency; } + inline auto isForgotten() -> bool { return state.forgotten; } + + /* + * @brief reset the Future + * + * Reset the state and the BPS. + * Resetting a Future is useful if we wan't to use the same Future object + * multiple times. + * A Future for a specific reoccurring IO operation can be created and reused + * in a submit, reset loop. + */ + virtual inline void reset() { + LOGD("Resetting Future"); + state = State(); + sem.reset(); + } + + /* + * @brief set Future as dependency + * + * The dependency must complete before this future. + * This is assured by linking the futures' sqes in the io_uring using the IOSQE_IO_LINK flag. + * If a request in the link fails all dependent futures will return -ECANCELED. + * + * See: <a href="https://unixism.net/loti/tutorial/link_liburing.html">Liburing linking + * requests</a> + * The emper equivalent to the example from the liburing documentation. 
+ * + * @code + * int fd = open(FILE_NAME, O_RDWR|O_TRUNC|O_CREAT, 0644); + * if (fd < 0 ) { + * perror("open"); + * return 1; + * } + * + * char* msg = "Hallo emper!\n"; + * WriteFuture writeFuture(fd, &msg, strlen(msg), 0); + * + * char buf[32]; + * ReadFuture readFuture(fd, &buf, sizeof(buf), 0); + * readFuture.setDependency(writeFuture); + * + * CloseFuture closeFuture(fd); + * closeFuture.setDependency(readFuture); + * + * // Blocks till all three futures are completed + * int bytes_read = closeFuture.submitAndWait(); + * + * // So request in the chain could not be fully completed + * if (bytes_read == -ECANCELED) { + * } + * @endcode + * + */ + inline void setDependency(Future& dependency) { + dependency.state.dependency = true; + this->dependency = &dependency; + } + + /* + * @brief submit Future for asynchronous completion to the workers IoContext + */ + void submit(); + + /** + * @brief cancel Future and wait if necessary for its cancellation + * + * @return return the result received from the io_uring + */ + auto cancel() -> int32_t; + + /** + * @brief Block till the IO request is completed + * + * @return return the result received from the io_uring + */ + auto wait() -> int32_t; + + /** + * @brief Block till the IO request is completed and set errno on error + * + * @return -1 on error and errno will be set, otherwise the return value of the IO request + */ + auto waitAndSetErrno() -> ssize_t; + + /** + * @brief Equivalent to calling wait() after calling submit() + */ + inline auto submitAndWait() -> int32_t { + submit(); + return wait(); + } + + friend auto operator<<(std::ostream& os, const Future& f) -> std::ostream& { + return os << f.op << " Future " << (void*)&f; + } + + friend auto operator<<(std::ostream& os, const Future* f) -> std::ostream& { + if (f) { + return os << *f; + } else { + return os << "Future nullptr"; + } + } +}; + +class PartialCompletableFuture : public Future { + friend class Stats; + friend class IoContext; + friend class RecvFuture; + + enum class CompletionType { + Resubmission, + IncrementalCompletion, + Completion, + }; + + /** + * @brief Try to complete this IO request using the result of an IO operation. + * If the IO request is completed return true otherwise resubmit it. + * + * @param res Result from an IO operation from a io_uring_cqe or a common syscall + * + * @return the type of completion + */ + template <CallerEnvironment callerEnvironment> + auto tryComplete(int32_t res) -> CompletionType; + + protected: + /** + * This member is used internally to handle partial completion. + * If initialized with -1 it signals that the IO request should immediately return to the user. + * A value >= 0 signals that this IO request should not * return to the user until it is + * fully completed or a persistent error arises. + * The IO request is considered fully completed if partialCompletion == len. + * + * When the IO request returns to the user the combination of @c future.returnValue and + * @c future.partialCompletion can signal partial completion. + */ + ssize_t partialCompletion; + + // return immediately to the user + static const ssize_t DISABLE_PARTIAL_COMPLETION = -1; + // try to fully complete the request + static const ssize_t ENABLE_PARTIAL_COMPLETION = 0; + + PartialCompletableFuture(Operation op, int fd, void* buf, size_t len, int offsetOrFlags, + bool completeFully) + : Future(op, fd, buf, len, offsetOrFlags), + partialCompletion(completeFully ? 
ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){}; + + PartialCompletableFuture(Operation op, int fd, void* buf, size_t len, int offsetOrFlags, + ssize_t partialCompletion) + : Future(op, fd, buf, len, offsetOrFlags), partialCompletion(partialCompletion){}; + + /** + * Used for Stats::recordCompletion double dispatch + */ + virtual void recordCompletion(Stats& stats, int32_t res) override { + if constexpr (emper::STATS) { + recordCompletionInternal(stats, res); + } + } + + void recordCompletionInternal(Stats& stats, int32_t res) const; + + void complete(int32_t res) override { + CompletionType completion = tryComplete<CallerEnvironment::EMPER>(res); + + switch (completion) { + case CompletionType::Resubmission: + return; + case CompletionType::IncrementalCompletion: + Future::complete(partialCompletion); + return; + case CompletionType::Completion: + Future::complete(res); + return; + } + } + + void completeFromAnywhere(int32_t res) override { + CompletionType completion = tryComplete<CallerEnvironment::ANYWHERE>(res); + + switch (completion) { + case CompletionType::Resubmission: + return; + case CompletionType::IncrementalCompletion: + Future::completeFromAnywhere(partialCompletion); + return; + case CompletionType::Completion: + Future::completeFromAnywhere(res); + return; + } + } + + public: + /** + * @brief Block till the IO request is completed + * + * @param[out] partialCompletion The amount of work done till an persistent error arose + * + * @return return the result received from the io_uring + */ + auto wait(ssize_t& partialCompletion) -> int32_t { + int32_t res = Future::wait(); + partialCompletion = this->partialCompletion; + return res; + } + using Future::wait; + + /* + * @brief Equivalent to calling wait(partialCompletion) after calling submit() + */ + inline auto submitAndWait(ssize_t& partialCompletion) -> int32_t { + Future::submit(); + return wait(partialCompletion); + } + using Future::submitAndWait; + + /** + * @brief reset the Future and its partialCompletion. + */ + void reset() override { + if (partialCompletion != DISABLE_PARTIAL_COMPLETION) { + partialCompletion = ENABLE_PARTIAL_COMPLETION; + } + + Future::reset(); + } +}; + +class SendFuture : public PartialCompletableFuture { + void prepareSqe(io_uring_sqe* sqe) override { + if (partialCompletion == DISABLE_PARTIAL_COMPLETION) { + io_uring_prep_send(sqe, fd, buf, len, offsetOrFlags); + } else { + io_uring_prep_send(sqe, fd, (char*)buf + partialCompletion, len - partialCompletion, + offsetOrFlags); + } + } + + public: + SendFuture(int socket, const void* buffer, size_t length, int flags, ssize_t partialCompletion) + : PartialCompletableFuture(Operation::SEND, socket, const_cast<void*>(buffer), length, flags, + partialCompletion){}; + + SendFuture(int socket, const void* buffer, size_t length, int flags, bool send_all = true) + : SendFuture(socket, const_cast<void*>(buffer), length, flags, + send_all ? 
ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){}; +}; + +class RecvFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_recv(sqe, fd, buf, len, offsetOrFlags); + } + + public: + RecvFuture(int socket, void* buffer, size_t length, int flags) + : Future(Operation::RECV, socket, buffer, length, flags){}; +}; + +class ConnectFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_connect(sqe, fd, (const struct sockaddr*)buf, (socklen_t)len); + } + + public: + ConnectFuture(int socket, const struct sockaddr* address, socklen_t address_len) + : Future(Operation::CONNECT, socket, (void*)address, address_len, 0){}; +}; + +class AcceptFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_accept(sqe, fd, (struct sockaddr*)buf, (socklen_t*)len, 0); + } + + public: + AcceptFuture(int socket, const struct sockaddr* address, socklen_t* address_len) + : Future(Operation::ACCEPT, socket, (void*)address, (size_t)address_len, 0){}; +}; + +class ReadFuture : public PartialCompletableFuture { + void prepareSqe(io_uring_sqe* sqe) override { + if (partialCompletion == DISABLE_PARTIAL_COMPLETION) { + io_uring_prep_read(sqe, fd, buf, len, offsetOrFlags); + } else { + io_uring_prep_read(sqe, fd, (char*)buf + partialCompletion, len - partialCompletion, + offsetOrFlags + partialCompletion); + } + } + + public: + ReadFuture(int fildes, void* buf, size_t nbyte, int offset, bool read_all = false) + : PartialCompletableFuture( + Operation::READ, fildes, buf, nbyte, offset, + read_all ? ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){}; +}; + +class WriteFuture : public PartialCompletableFuture { + void prepareSqe(io_uring_sqe* sqe) override { + if (partialCompletion == DISABLE_PARTIAL_COMPLETION) { + io_uring_prep_write(sqe, fd, buf, len, offsetOrFlags); + } else { + // TODO: think about partial writes with incremental offset! + // Are transparent incremental writes on files without O_APPEND even reasonable? + io_uring_prep_write(sqe, fd, (char*)buf + partialCompletion, len - partialCompletion, + offsetOrFlags + partialCompletion); + } + } + + public: + WriteFuture(int fildes, const void* buf, size_t nbyte, int offset, bool write_all = true) + : PartialCompletableFuture( + Operation::WRITE, fildes, const_cast<void*>(buf), nbyte, offset, + write_all ? ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){}; +}; + +class WritevFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_writev(sqe, fd, (const struct iovec*)buf, len, 0); + } + + /** + * Used for Stats::recordCompletion double dispatch + */ + void recordCompletion(Stats& stats, int32_t res) override { + if constexpr (emper::STATS) { + recordCompletionInternal(stats, res); + } + } + + void recordCompletionInternal(Stats& stats, int32_t res) const; + + public: + WritevFuture(int fildes, const struct iovec* iov, int iovnct) + : Future(Operation::WRITEV, fildes, (void*)iov, iovnct, 0){}; +}; + +class CloseFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { io_uring_prep_close(sqe, fd); } + + public: + CloseFuture(int fildes) : Future(Operation::CLOSE, fildes, nullptr, 0, 0){}; + + void submitAndForget() { + state.forgotten = true; + submit(); + } +}; + +/* + * @brief Add a timeout to any Future not already submitted. + * + * A Timeout is added to a Future by inserting an dependent IOURING_OP_LINK_TIMEOUT + * sqe after the actual IO operation. 
+ * The dependent timeout will not be started like usual dependent requests after + * the previous request is completed, it is instead armed immediately when the + * actual request is started. + * See: https://lwn.net/Articles/803932/ + * + * @code + * ReadFuture rf(fd, buf, buf_len, 0); + * struct __kernel_timespec ts = {.tv_sec = 0, .tv_nsec = 0); + * TimeoutWrapper t(rf, ts); + * // Both futures should be awaited by calling its wait method to + * // prevent use after free problems. + * t.submitAndWait() + * rf.Wait() + * @endcode + */ +class TimeoutWrapper : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_link_timeout(sqe, (struct ::__kernel_timespec*)buf, 0); + } + + public: + TimeoutWrapper(Future& future, struct __kernel_timespec& ts) + : Future(Operation::LINK_TIMEOUT, 0, (void*)&ts, 0, 0) { + setDependency(future); + }; +}; + +/* + * @brief Arm a timeout which will signal the future when it is reached + * + * If the timeout was reached it will wait() will return -ETIME. + */ +class AlarmFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_timeout(sqe, (struct ::__kernel_timespec*)buf, 0, 0); + } + + public: + AlarmFuture(struct __kernel_timespec& ts) : Future(Operation::TIMEOUT, 0, (void*)&ts, 0, 0){}; +}; + +/* + * @brief Request the cancellation of another Future and await the cancellation + * + * If the target was found and cancelled wait() will immediately return 0. + * If the target was already completed wait() will immediately return -ENOENT. + * If the target was already started wait() will return -EALREADY. + */ +class CancelWrapper : public Future { + void prepareSqe(io_uring_sqe* sqe) override { io_uring_prep_cancel(sqe, buf, 0); } + + public: + CancelWrapper(Future& future) : Future(Operation::CANCEL, 0, &future, 0, 0){}; +}; +} // namespace emper::io diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp new file mode 100644 index 0000000000000000000000000000000000000000..df9eb29b5f8cc1e30dcd798a2101a0dffda122c3 --- /dev/null +++ b/emper/io/IoContext.cpp @@ -0,0 +1,372 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include "io/IoContext.hpp" + +#include <liburing.h> // for io_uring_submit, io_uring_get_sqe +#include <liburing/io_uring.h> // for io_uring_cqe, io_uring_params, IORI... 
+#include <sys/eventfd.h>	// for eventfd
+#include <unistd.h>	// for close
+
+#include <atomic>	// for atomic, __atomic_base
+#include <cassert>	// for assert
+#include <cerrno>	// for errno, ECANCELED, EBUSY, EAGAIN, EINTR
+#include <chrono>	// for nanoseconds
+#include <cstdio>	// for perror
+#include <cstdlib>	// for exit
+#include <cstring>	// for memset
+#include <memory>	// for allocator
+
+#include "CallerEnvironment.hpp"	// for CallerEnvironment, EMPER, ANYWHERE
+#include "Common.hpp"	// for unlikely, DIE_MSG_ERRNO, DIE_MSG
+#include "Debug.hpp"	// for LOGD
+#include "Emper.hpp"	// for DEBUG, IO_URING_SQPOLL
+#include "Runtime.hpp"	// for Runtime
+#include "io/Future.hpp"	// for Future, operator<<, Future::State
+#include "io/Stats.hpp"	// for Stats, nanoseconds
+
+#ifndef EMPER_LOG_OFF
+#include <ostream>	// for basic_ostream::operator<<, operator<<
+#endif
+
+// use the most significant bit of a pointer to distinguish between an IoContext and a Future
+// in the global IoContext's CQ
+static const uintptr_t IOCONTEXT_TAG = 1L << (sizeof(size_t) * 8 - 1);
+static const uintptr_t IOCONTEXT_TAG_MASK = IOCONTEXT_TAG - 1;
+
+namespace emper::io {
+thread_local IoContext *IoContext::workerIo = nullptr;
+
+pthread_t IoContext::globalCompleter;
+
+auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> unsigned {
+	unsigned total_chain_length = chain_length;
+
+	if (unlikely(chain_length > *this->ring.sq.kring_entries)) {
+		DIE_MSG("Future chain is too long for the io_uring's SQ");
+	}
+
+	if (future.dependency) {
+		total_chain_length = prepareFutureChain(*future.dependency, chain_length + 1);
+	}
+
+	if constexpr (emper::DEBUG) {
+		reqs_in_uring++;
+		uringFutureSet.insert(&future);
+	}
+
+	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+	// The kernel consumes all sqes from the SQ during io_uring_enter.
+	// If we can't get an sqe the chain was too long for our SQ.
+	assert(sqe);
+
+	future.prepareSqe(sqe);
+
+	// Someone wants to be notified about the completion of this Future
+	if (!(future.isForgotten())) {
+		io_uring_sqe_set_data(sqe, &future);
+	}
+
+	future.state.submitted = true;
+
+	if (future.isDependency()) {
+		LOGD("Prepare " << future << " as a dependency");
+		sqe->flags |= IOSQE_IO_LINK;
+	}
+
+	return total_chain_length;
+}
+
+template <CallerEnvironment callerEnvironment>
+void IoContext::submit(Future &future) {
+	LOGD("submitting " << future << (future.dependency ? " and its dependencies" : ""));
+#ifdef NDEBUG
+	UNUSED_ARG
+#endif
+	int prepared = static_cast<int>(prepareFutureChain(future, 1));
+
+	// submit the Future to the io_uring
+	int submitted = io_uring_submit(&ring);
+
+	// We can't submit our sqe because the CQ is full
+	// in worker thread -> reapCompletions
+	// in globalCompleter thread -> TODO: deterministically handle a full global CQ
+	// For now we hope a jam does not happen or resolves itself.
+	if (unlikely(submitted == -EBUSY)) {
+		if constexpr (emper::DEBUG) {
+			std::stringstream sst;
+			sst << "io_submit returned EBUSY trying to submit in addition to " << reqs_in_uring
+					<< std::endl;
+			logI(sst.str());
+		} else {
+			logI("io_submit returned EBUSY");
+		}
+
+		if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) {
+			// we are not busy looping in the globalIo
+			stats.record_io_submit_full_cq(std::chrono::nanoseconds(0));
+			return;
+		}
+
+		TIME_NS(
+				{
+					do {
+						reapCompletions();
+					} while ((submitted = io_uring_submit(&ring)) == -EBUSY);
+				},
+				stats.record_io_submit_full_cq);
+	}
+
+	if (unlikely(submitted < 0)) {
+		errno = -submitted;
+		DIE_MSG_ERRNO("io_uring_submit failed");
+	}
+
+	// We submitted some Futures to the io_uring.
+	// Because we submit every Future right away, multiple prepared sqes can only
+	// occur for Future chains.
+	// If we could not submit the whole chain, cancel all non-submitted Futures
+	// because we cannot guarantee the soundness of the chain:
+	// req1 -> invalid_req -> req3
+	// will submit only 2 instead of all 3 prepared sqes.
+	// See: https://github.com/axboe/liburing/issues/186
+	if (unlikely(submitted < prepared)) {
+		unsigned unsubmitted = io_uring_sq_ready(&ring);
+		Future *unsubmittedFuture = &future;
+		while (unsubmitted) {
+			unsubmittedFuture->recordCompletion(stats, -ECANCELED);
+			if constexpr (callerEnvironment == EMPER) {
+				unsubmittedFuture->complete(-ECANCELED);
+			} else {
+				unsubmittedFuture->completeFromAnywhere(-ECANCELED);
+			}
+			unsubmittedFuture = unsubmittedFuture->dependency;
+			unsubmitted--;
+		}
+	}
+
+	// io_uring will try to synchronously complete any IO request before
+	// offloading it to the async backend. See io_uring_enter(2).
+	// Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe.
+	// Try to reap a possible synchronous completion if we are on a worker's io_uring.
+ if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + reapCompletions<callerEnvironment>(); + } +} + +// show the compiler our template incarnations +template void IoContext::submit<CallerEnvironment::EMPER>(Future &future); +template void IoContext::submit<CallerEnvironment::ANYWHERE>(Future &future); + +template <CallerEnvironment callerEnvironment> +void IoContext::reapCompletions() { + // Someone else is currently reaping completions + if (unlikely(!cq_mutex.try_lock())) { + return; + } + + // never reap completions on the global IoContext + assert(this != getGlobalIo()); + + LOGD("Reaping completions"); + unsigned head; + struct io_uring_cqe *cqe; + unsigned count = 0; + + int err = io_uring_peek_cqe(&ring, &cqe); + if (err) { + if (err == -EAGAIN) { + goto unlock; + } + errno = -err; + DIE_MSG_ERRNO("io_uring_peek_cqe failed"); + } + + io_uring_for_each_cqe(&ring, head, cqe) { + count++; + + auto *future = reinterpret_cast<Future *>(io_uring_cqe_get_data(cqe)); + + // Got a CQE for a forgotten Future + if (!future) { + continue; + } + + if constexpr (emper::DEBUG) { + // assert that the future was previously in the uringFutureSet + assert(uringFutureSet.erase(future) > 0); + } + + future->recordCompletion(stats, cqe->res); + if constexpr (callerEnvironment == EMPER) { + future->complete(cqe->res); + } else { + future->completeFromAnywhere(cqe->res); + } + } + + LOGD("got " << count << " cqes from the io_uring"); + io_uring_cq_advance(&ring, count); + + if constexpr (emper::DEBUG) { + assert(count <= reqs_in_uring); + reqs_in_uring -= count; + } + + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + stats.record_worker_reaps(count); + } else { + stats.record_completer_reaps(count); + } + +unlock: + cq_mutex.unlock(); +} + +// Show the compiler our template incarnations +template void IoContext::reapCompletions<CallerEnvironment::ANYWHERE>(); +template void IoContext::reapCompletions<CallerEnvironment::EMPER>(); + +// The globalCompleter must be started after all worker io_uring eventfds are submitted +// so we don't have to synchronize the global sq +auto IoContext::globalCompleterFunc(void *arg) -> void * { + IoContext &io = *reinterpret_cast<IoContext *>(arg); + + io.logD("submit all worker io_uring eventfds"); + + // submit all eventfds in the SQ inserted by IoContext::submit_efd calls + int submitted = io_uring_submit(&io.ring); + if (unlikely(submitted < 0)) { + DIE_MSG_ERRNO("initial global io_uring submit failed"); + } + + // We have submitted all eventfds + assert(submitted == Runtime::getRuntime()->getWorkerCount()); + + io.logD("start global completer loop"); + while (true) { + struct io_uring_cqe *cqe; + + // wait for completions + int err = io_uring_wait_cqe(&io.ring, &cqe); + if (unlikely(err)) { + if (err == -EINTR) { + continue; + } + errno = -err; + perror("io_uring_wait_cqe"); + } + + auto data = (uintptr_t)io_uring_cqe_get_data(cqe); + + // The cqe is for a completed Future + if (!(data & IOCONTEXT_TAG)) { + auto *future = reinterpret_cast<Future *>(data); + + uint32_t res = cqe->res; + io_uring_cqe_seen(&io.ring, cqe); + + future->recordCompletion(io.stats, res); + future->completeFromAnywhere(res); + continue; + } + + // The cqe is for a IoContext.eventfd read + // -> there are completions on this worker IoContext + auto *worker_io = reinterpret_cast<IoContext *>(data & IOCONTEXT_TAG_MASK); + assert(worker_io); + + io_uring_cqe_seen(&io.ring, cqe); + + // re-add the eventfd read + struct io_uring_sqe *sqe = io_uring_get_sqe(&io.ring); + 
// we initialized the global ring big enough, so we should always get a free sqe
+		assert(sqe);
+
+		io_uring_prep_read(sqe, worker_io->ring_eventfd, &worker_io->ring_eventfd_readbuf,
+											 sizeof(worker_io->ring_eventfd_readbuf), 0);
+		io_uring_sqe_set_data(sqe, reinterpret_cast<void *>(data));
+
+		submitted = io_uring_submit(&io.ring);
+
+		if (unlikely(submitted < 0)) {
+			errno = -submitted;
+			DIE_MSG_ERRNO("re-submitting eventfd read to global_ring failed");
+		}
+
+		assert(submitted == 1);
+
+		worker_io->reapCompletions<CallerEnvironment::ANYWHERE>();
+	}
+
+	return nullptr;
+}
+
+void IoContext::startGlobalCompleter(IoContext &globalIo) {
+	int err = pthread_create(&globalCompleter, nullptr, globalCompleterFunc, &globalIo);
+	if (unlikely(err)) {
+		errno = err;
+		DIE_MSG_ERRNO("Creating global completer thread failed");
+	}
+}
+
+IoContext::IoContext(size_t uring_entries) {
+	struct io_uring_params params;
+	memset(&params, 0, sizeof(params));
+
+	if constexpr (emper::IO_URING_SQPOLL) {
+		params.flags |= IORING_SETUP_SQPOLL;
+	}
+
+	auto ret = io_uring_queue_init_params(uring_entries, &ring, &params);
+	if (ret < 0) {
+		// skip tests if we are not able to set up an io_uring
+		perror("io_uring_queue_init failed");
+		exit(77);
+	}
+	LOGD("Ring fd is " << ring.ring_fd);
+
+	if constexpr (emper::IO_URING_SQPOLL) {
+		// included in liburing since 41e0d97cb23667df000ce76789297f4e06134a28
+#ifndef IORING_FEAT_SQPOLL_NONFIXED
+#define IORING_FEAT_SQPOLL_NONFIXED (1U << 7)
+#endif
+		if (!(params.features & IORING_FEAT_SQPOLL_NONFIXED)) {
+			DIE_MSG("io_uring_sqpoll defined but kernel does not support IORING_FEAT_SQPOLL_NONFIXED");
+		}
+	}
+
+	ring_eventfd = eventfd(0, 0);
+	if (unlikely(ring_eventfd < 0)) {
+		DIE_MSG_ERRNO("creating eventfd for io_uring failed");
+	}
+
+	if (unlikely(io_uring_register_eventfd(&ring, ring_eventfd) < 0)) {
+		DIE_MSG_ERRNO("io_uring_register_eventfd failed");
+	}
+}
+
+// This function must not be executed in parallel because it does not synchronize
+// the global io_uring's SQ
+void IoContext::submit_efd() {
+	// the global IoContext must be initialized to submit our eventfd
+	assert(getGlobalIo());
+
+	IoContext &globalIo = *getGlobalIo();
+
+	struct io_uring_sqe *sqe = io_uring_get_sqe(&globalIo.ring);
+	// we initialized the global ring big enough, so we should always get a free sqe
+	assert(sqe);
+
+	io_uring_prep_read(sqe, ring_eventfd, &ring_eventfd_readbuf, sizeof(ring_eventfd_readbuf), 0);
+	auto *tagged_io_ptr = reinterpret_cast<void *>((uintptr_t)this | IOCONTEXT_TAG);
+	io_uring_sqe_set_data(sqe, tagged_io_ptr);
+
+	// The sqe we prepared will be submitted to io_uring when the globalCompleter starts.
+} + +IoContext::~IoContext() { + io_uring_queue_exit(&ring); + // TODO: check if this is safe + ::close(ring_eventfd); +} +} // namespace emper::io diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp new file mode 100644 index 0000000000000000000000000000000000000000..943f6206e6bc3cf5a8f1c34db74748101a38760f --- /dev/null +++ b/emper/io/IoContext.hpp @@ -0,0 +1,132 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#pragma once + +#include <liburing.h> // for io_uring +#include <pthread.h> // for pthread_t + +#include <atomic> // for atomic +#include <cassert> // for assert +#include <cstddef> // for size_t +#include <cstdint> // for uint64_t +#include <functional> // for less +#include <memory> // for allocator +#include <mutex> // for mutex + +#include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER +#include "Debug.hpp" // for LogSubsystem, LogSubsystem::IO, Logger +#include "Runtime.hpp" // for Runtime +#include "emper-config.h" // for EMPER_IO_WORKER_URING_ENTRIES +#include "io/Stats.hpp" // for Stats +#include "lib/adt/LockedSet.hpp" // for LockedSet + +namespace emper::io { +class Future; + +class IoContext : public Logger<LogSubsystem::IO> { + friend class ::Runtime; + friend class Future; + friend class SendFuture; + friend class RecvFuture; + + private: + static thread_local IoContext *workerIo; + + // pthread used to monitor the CQs from worker io_urings + // as well as handling IO requests submitted from anywhere + static pthread_t globalCompleter; + + /* function executed by the global completer thread */ + static auto globalCompleterFunc(void *) -> void *; + + // start the global completer thread + // this must be called after all worker IoContexts' eventfds are submitted + // to the global IoContext + static void startGlobalCompleter(IoContext &globalIo); + + // Mutex protecting the completion queue of ring. + // It is used with try_lock() in reapCompletions. + std::mutex cq_mutex; + struct io_uring ring; + + // eventfd registered with ring. + // It is used to notify the globalCompleter about available completions in ring's CQ. + int ring_eventfd; + uint64_t ring_eventfd_readbuf; + + // submit the eventfd of the io_uring to the global IoContext + // All eventfds must be registered before the globalCompleter thread is started + void submit_efd(); + + Stats stats; + + // Members useful for debugging + std::atomic<long> reqs_in_uring = 0; + + // set containing all Futures currently in the io_uring for debugging + emper::lib::adt::LockedSet<Future *> uringFutureSet; + + /** + * @brief recursively prepare a Future and its dependencies + * + * Futures prepared as dependency will have disabled partial completion + * support and are submitted to io_uring with the IOSQE_IO_LINK flag + * set. + * Abort on preparing a longer list then there a places in the io_uring. 
+ * + * @return the number of prepared Futures + */ + auto prepareFutureChain(Future &future, unsigned chain_length) -> unsigned; + + inline void setWorkerIo() { workerIo = this; } + + /** + * @brief Return the globalIo of the current Runtime + */ + static inline auto getGlobalIo() -> IoContext * { return Runtime::getRuntime()->getGlobalIo(); } + + public: + IoContext(size_t uring_entries); + IoContext() : IoContext(EMPER_IO_WORKER_URING_ENTRIES){}; + ~IoContext(); + + /** + * @brief get IoContext of the current worker + * + * This function must be called from with in the emper runtime + * + * @return the thread_local IoContext of the current worker + */ + static inline auto getWorkerIo() -> IoContext * { + assert(Runtime::inRuntime()); + return workerIo; + } + + /** + * @brief Return either the worker or the globalIo dependent on the callerEnvironment + */ + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + static inline auto getIo() -> IoContext * { + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + return getWorkerIo(); + } else { + return getGlobalIo(); + } + } + + /** + * @brief Submit a future for asynchronous completion in this IoContext + * + * @param future The Future which should be completed. The Future object + * must stay valid until it is completed. + */ + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + void submit(Future &future); + + /** + * @brief Schedule all fibers waiting on completed IO + */ + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + void reapCompletions(); +}; +} // namespace emper::io diff --git a/emper/io/Operation.cpp b/emper/io/Operation.cpp new file mode 100644 index 0000000000000000000000000000000000000000..83a67abef68c6c3648f05008b120dbb8734ad087 --- /dev/null +++ b/emper/io/Operation.cpp @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include "io/Operation.hpp" + +#include <cstdlib> // for abort +#include <ostream> // for operator<<, stringstream, basi... 
+ +namespace emper::io { +auto operator<<(std::ostream& os, const Operation& op) -> std::ostream& { + switch (op) { + case Operation::SEND: + os << "send"; + break; + case Operation::RECV: + os << "recv"; + break; + case Operation::CONNECT: + os << "connect"; + break; + case Operation::ACCEPT: + os << "accept"; + break; + case Operation::READ: + os << "read"; + break; + case Operation::WRITE: + os << "write"; + break; + case Operation::WRITEV: + os << "writev"; + break; + case Operation::CLOSE: + os << "close"; + break; + case Operation::LINK_TIMEOUT: + os << "linked timeout"; + break; + case Operation::TIMEOUT: + os << "timeout"; + break; + case Operation::CANCEL: + os << "cancel"; + break; + default: + abort(); + } + return os; +} +} // namespace emper::io diff --git a/emper/io/Operation.hpp b/emper/io/Operation.hpp new file mode 100644 index 0000000000000000000000000000000000000000..498919293214983e1174e86aedb315e63361dbe4 --- /dev/null +++ b/emper/io/Operation.hpp @@ -0,0 +1,26 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#pragma once + +#include <ostream> + +namespace emper::io { + +enum class Operation { + SEND = 0, + RECV, + CONNECT, + ACCEPT, + READ, + WRITE, + WRITEV, + CLOSE, + LINK_TIMEOUT, + TIMEOUT, + CANCEL, + NUMBER_OF_OPERATIONS +}; + +auto operator<<(std::ostream& os, const Operation& o) -> std::ostream&; + +} // namespace emper::io diff --git a/emper/io/Stats.cpp b/emper/io/Stats.cpp new file mode 100644 index 0000000000000000000000000000000000000000..42ac4c4cb852b8fccb0d45038d3766e043deec58 --- /dev/null +++ b/emper/io/Stats.cpp @@ -0,0 +1,160 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Fischer +#include "io/Stats.hpp" + +#include <cassert> // for assert +#include <initializer_list> // for initializer_list +#include <iomanip> // for operator<<, setfill, setw +#include <string> // for operator<<, to_string +#include <utility> // for pair + +#include "io/Operation.hpp" // for Operation, operator<<, Operation::RECV + +namespace emper::io { + +std::vector<Stats*> Stats::workerStats; + +auto operator<<(std::ostream& os, const Stats::CompletionType& t) -> std::ostream& { + switch (t) { + case Stats::CompletionType::FullCompletion: + os << "full completion"; + break; + case Stats::CompletionType::ErrorCompletion: + os << "error completion"; + break; + case Stats::CompletionType::PartialCompletion: + os << "partial completion"; + break; + case Stats::CompletionType::PartialResubmission: + os << "partial resubmission"; + break; + case Stats::CompletionType::IncrementalCompletion: + os << "incremental completion"; + break; + case Stats::CompletionType::IncrementalError: + os << "incremental error"; + break; + default: + abort(); + } + return os; +} + +auto operator<<(std::ostream& os, const Stats& s) -> std::ostream& { + if (s.workerId == -1) { + bool futuresToPrint = false; + std::stringstream ss; + ss << "# global IO Stats: #" << std::endl; + // Only PartialCompletableFutures can submitted to the globalIo + for (const auto& op : {Operation::SEND, Operation::READ, Operation::WRITE}) { + const auto& op_map = s.io_uring_completions.at(op); + + ss << op << ": "; + ss << std::left << std::setfill(' ') << std::setw(8) << ss.str(); + + for (const auto& tp : op_map) { + auto completionType = tp.first; + auto count = tp.second; + + if (count > 0) { + futuresToPrint = true; + } + + ss << count << " " << completionType << " "; + } + ss << std::endl; + } + + if (futuresToPrint) { + os << ss.str(); + } + + 
return os; + } + + if (s.workerId == -2) { + os << "# AVG IO worker Stats: #" << std::endl; + } else { + os << "# IO Stats of worker " << std::to_string(s.workerId) << ": #" << std::endl; + } + + for (const auto& pp : s.io_uring_completions) { + const auto& op = pp.first; + const auto& op_map = pp.second; + + std::stringstream ss; + ss << op << ": "; + os << std::left << std::setfill(' ') << std::setw(16) << ss.str(); + + for (const auto& tp : op_map) { + auto completionType = tp.first; + auto count = tp.second; + os << count << " " << completionType << " "; + } + + os << std::endl; + } + + if (s.workerId == -1) { + return os; + } + + os << "io_uring_submit reap completion loops: " << s.io_submit_full_cq << " taking " + << s.io_submit_full_cq_running_mean << "ns on average" << std::endl; + + os << "worker reaped completions " << s.worker_reap << " times reaping " + << s.worker_reaped_completions << " completions in total" << std::endl; + + os << "global completer thread reaped completions " << s.completer_reap << " times reaping " + << s.completer_reaped_completions << " completions in total" << std::endl; + + return os; +} + +static void calcRunningAvg(uint64_t& avg, int n, uint64_t x) { + int64_t diff = ((int64_t)x - (int64_t)avg) / n; + assert(diff >= 0 || (uint64_t)-diff <= avg); + avg += diff; + assert(avg < 1000000); +} + +void Stats::printWorkerStats() { + auto* globalStats = workerStats.front(); + workerStats.erase(workerStats.begin()); + std::cout << *globalStats << std::endl; + + // Use a stats object to calculate the averages + Stats avgs; + // mark as averages + avgs.workerId = -2; + + // calculate avgs + int i = 1; + for (auto& stats : workerStats) { + for (const auto& op_pair : stats->io_uring_completions) { + auto op = op_pair.first; + auto op_map = op_pair.second; + + for (const auto& compl_pair : op_map) { + auto compl_typ = compl_pair.first; + auto compl_count = compl_pair.second; + calcRunningAvg(avgs.io_uring_completions[op][compl_typ], i, compl_count); + } + } + + calcRunningAvg(avgs.io_submit_full_cq, i, stats->io_submit_full_cq); + calcRunningAvg(avgs.io_submit_full_cq_running_mean, i, stats->io_submit_full_cq_running_mean); + + calcRunningAvg(avgs.completer_reap, i, stats->completer_reap); + calcRunningAvg(avgs.completer_reaped_completions, i, stats->completer_reaped_completions); + + calcRunningAvg(avgs.worker_reap, i, stats->worker_reap); + calcRunningAvg(avgs.worker_reaped_completions, i, stats->worker_reaped_completions); + ++i; + } + + // print avgs + std::cout << avgs << std::endl; +} + +} // namespace emper::io diff --git a/emper/io/Stats.hpp b/emper/io/Stats.hpp new file mode 100644 index 0000000000000000000000000000000000000000..08824aa151e47c2e57e46f75ef0ddd998ce79f22 --- /dev/null +++ b/emper/io/Stats.hpp @@ -0,0 +1,205 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Fischer +#pragma once +#include <bits/types/struct_iovec.h> // for iovec +#include <sys/types.h> // for ssize_t + +#include <chrono> // for nanoseconds +#include <cstddef> // for size_t +#include <cstdint> // for uint64_t, int32_t, uint32_t +#include <cstdlib> // for abort +#include <iostream> // for operator<<, basic_ostream, basi... +#include <map> // for map, map<>::value_compare +#include <vector> // for vector + +#include "Debug.hpp" // for LOGW +#include "Emper.hpp" // for STATS +#include "emper-common.h" // for workerid_t +#include "io/Future.hpp" // for PartialCompletableFuture, Write... 
+#include "io/Operation.hpp" // for Operation, Operation::RECV, Ope... +class Runtime; // lines 28-28 + +#define RETURN_IF_NO_STATS() \ + if constexpr (!emper::STATS) { \ + return; \ + } + +namespace emper::io { + +using emper::io::Operation; +using std::chrono::nanoseconds; + +class Stats { + friend class ::Runtime; + friend class IoContext; + friend class Future; + friend class PartialCompletableFuture; + friend class SendFuture; + friend class RecvFuture; + friend class WritevFuture; + + int workerId = 0; + static_assert(sizeof(int) > sizeof(workerid_t)); + + static std::vector<Stats*> workerStats; + + enum CompletionType { + FullCompletion = 0, + ErrorCompletion, + PartialCompletion, + PartialResubmission, + IncrementalCompletion, + IncrementalError, + END_OF_COMPLETION_TYPES + }; + + /** + * @brief Maps containing all completion events + * + * The first dimension are the different Operations defined by emper::io::Operation + * + * The second dimension is used to differentiate the available completion types per Operation + * 0: full completions + * 1: error completions + * 2: partial completions + * 3: incremental completions + * 4: incremental error + * 5: partial resubmissions + * 6: immediate resubmissions + */ + + typedef std::map<Operation, std::map<CompletionType, uint64_t>> CompletionMap; + + CompletionMap io_uring_completions = { + {Operation::SEND, + {{FullCompletion, 0}, + {ErrorCompletion, 0}, + {PartialCompletion, 0}, + {IncrementalCompletion, 0}, + {IncrementalError, 0}, + {PartialResubmission, 0}}}, + {Operation::RECV, {{FullCompletion, 0}, {ErrorCompletion, 0}, {PartialCompletion, 0}}}, + {Operation::CONNECT, {{FullCompletion, 0}, {ErrorCompletion, 0}}}, + {Operation::ACCEPT, {{FullCompletion, 0}, {ErrorCompletion, 0}}}, + {Operation::READ, + {{FullCompletion, 0}, + {ErrorCompletion, 0}, + {PartialCompletion, 0}, + {IncrementalCompletion, 0}, + {IncrementalError, 0}, + {PartialResubmission, 0}}}, + {Operation::WRITE, + {{FullCompletion, 0}, + {ErrorCompletion, 0}, + {PartialCompletion, 0}, + {IncrementalCompletion, 0}, + {IncrementalError, 0}, + {PartialResubmission, 0}}}, + {Operation::WRITEV, {{FullCompletion, 0}, {ErrorCompletion, 0}, {PartialCompletion, 0}}}, + {Operation::CLOSE, {{FullCompletion, 0}, {ErrorCompletion, 0}}}, + {Operation::LINK_TIMEOUT, {{FullCompletion, 0}, {ErrorCompletion, 0}}}, + {Operation::TIMEOUT, {{FullCompletion, 0}, {ErrorCompletion, 0}}}, + {Operation::CANCEL, {{FullCompletion, 0}, {ErrorCompletion, 0}}}, + }; + uint64_t io_submit_full_cq = 0; + uint64_t io_submit_full_cq_running_mean = 0; + + uint64_t completer_reap = 0; + uint64_t completer_reaped_completions = 0; + + uint64_t worker_reap = 0; + uint64_t worker_reaped_completions = 0; + + static void printWorkerStats(); + + Stats() { + if constexpr (emper::STATS) { + workerStats.push_back(this); + } + } + + void recordCompletion(const Future& f, const int32_t& res) { + record_completion(f.op, res, PartialCompletableFuture::DISABLE_PARTIAL_COMPLETION, f.len); + } + + void recordCompletion(const PartialCompletableFuture& f, const int32_t& res) { + record_completion(f.op, res, f.partialCompletion, f.len); + } + + void recordCompletion(const WritevFuture& f, const int32_t& res) { + size_t exp = 0; + auto iov = reinterpret_cast<const struct iovec*>(f.buf); + for (unsigned i = 0; i < f.len; ++i) { + exp += iov[i].iov_len; + } + + record_completion(Operation::WRITEV, res, PartialCompletableFuture::DISABLE_PARTIAL_COMPLETION, + exp); + } + + void record_completion(Operation op, int32_t res, 
ssize_t partial_completion, size_t exp) { + auto& operation_map = io_uring_completions[op]; + + // Persistent Error + if (res < 0) { + if (partial_completion > 0) { + operation_map[IncrementalError]++; + } else { + operation_map[ErrorCompletion]++; + } + return; + } + + // Full completion + if (!exp || (uint32_t)res == exp) { + operation_map[FullCompletion]++; + return; + } + + // we expect partial completion + if (partial_completion != PartialCompletableFuture::DISABLE_PARTIAL_COMPLETION) { + if ((size_t)(res + partial_completion) < exp) { + operation_map[PartialResubmission]++; + } else { + operation_map[IncrementalCompletion]++; + } + return; + } + + if ((uint32_t)res < exp) { + operation_map[PartialCompletion]++; + return; + } + + LOGW("Unexpected completion case in IoStats"); + LOGW(op << ", " << res << ", " << partial_completion << ", " << exp); + abort(); + } + + // running mean calculation taken from + // https://math.stackexchange.com/questions/106700/incremental-averageing + inline void record_io_submit_full_cq(nanoseconds ns) { + RETURN_IF_NO_STATS(); + io_submit_full_cq++; + int64_t diff = (ns.count() - io_submit_full_cq_running_mean) / io_submit_full_cq; + io_submit_full_cq_running_mean += diff; + } + + inline void record_completer_reaps(unsigned count) { + RETURN_IF_NO_STATS(); + + completer_reap++; + completer_reaped_completions += count; + } + + inline void record_worker_reaps(unsigned count) { + RETURN_IF_NO_STATS(); + + worker_reap++; + worker_reaped_completions += count; + } + + friend auto operator<<(std::ostream& os, const Stats& s) -> std::ostream&; + friend auto operator<<(std::ostream& os, const Stats::CompletionType& t) -> std::ostream&; +}; +} // namespace emper::io diff --git a/emper/io/io.cpp b/emper/io/io.cpp new file mode 100644 index 0000000000000000000000000000000000000000..140f91c9513cb84e1e3695fbeaec3997d0b062b9 --- /dev/null +++ b/emper/io/io.cpp @@ -0,0 +1,71 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include "io.hpp" + +#include <arpa/inet.h> +#include <netinet/in.h> +#include <sys/socket.h> + +#include <cerrno> +#include <cstring> + +#include "Common.hpp" +#include "Debug.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" + +#ifndef EMPER_LOG_OFF +#include <ostream> +#endif + +namespace emper::io { +auto tcp_listener(std::string& host, int& port, const std::function<void(int)>& handler, + int backlog) -> Fiber* { + int listen_socket; + struct sockaddr_in servaddr; + + // socket creation and verification + listen_socket = socket(AF_INET, SOCK_STREAM, 0); + if (listen_socket == -1) { + LOGE("listen socket creation failed: " << strerror(errno)); + return nullptr; + } + memset(&servaddr, 0, sizeof(servaddr)); + + // assign IP, PORT + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = inet_addr(host.c_str()); + servaddr.sin_port = htons(port); + + int reuseaddr = 1; + if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) { + LOGE("setsockopt failed: " << strerror(errno)); + return nullptr; + } + + if (bind(listen_socket, (sockaddr*)&servaddr, sizeof(servaddr)) == -1) { + LOGE("bind failed: " << strerror(errno)); + return nullptr; + } + + if (listen(listen_socket, backlog) != 0) { + LOGE("listen failed: " << strerror(errno)); + return nullptr; + } + + Runtime* runtime = Runtime::getRuntime(); + + return Fiber::from([=] { + for (;;) { + int client_fd = acceptAndWait(listen_socket, nullptr, nullptr); + if (unlikely(client_fd < 0)) { + LOGE("accept failed: " << 
strerror(errno)); + return; + } + + Fiber* handler_fiber = Fiber::from([=] { handler(client_fd); }); + runtime->schedule(*handler_fiber); + } + }); +} +} // namespace emper::io diff --git a/emper/io/meson.build b/emper/io/meson.build new file mode 100644 index 0000000000000000000000000000000000000000..75ee73eb1b8dae222ff092f0ab665730d0bc3003 --- /dev/null +++ b/emper/io/meson.build @@ -0,0 +1,9 @@ +emper_io_include = include_directories('.') +emper_library_include += [emper_io_include] +emper_cpp_sources += files( + 'io.cpp', + 'Future.cpp', + 'Stats.cpp', + 'Operation.cpp', + 'IoContext.cpp', +) diff --git a/emper/lib/adt/LockedSet.hpp b/emper/lib/adt/LockedSet.hpp new file mode 100644 index 0000000000000000000000000000000000000000..e3e1b1928a1e037cd3f2dbcf391c106c9f39a4a6 --- /dev/null +++ b/emper/lib/adt/LockedSet.hpp @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Fischer +#pragma once + +#include <iterator> +#include <mutex> +#include <set> + +namespace emper::lib::adt { + +template <class Key, class Compare = std::less<Key>, class Allocator = std::allocator<Key>> +class LockedSet { + private: + std::mutex _mutex; + std::set<Key, Compare, Allocator> _set; + + public: + auto insert(const Key& item) + -> std::pair<typename std::set<Key, Compare, Allocator>::iterator, bool> { + std::lock_guard<std::mutex> lock(_mutex); + return _set.insert(item); + } + + template <class InputIt> + void insert(InputIt first, InputIt last) { + std::lock_guard<std::mutex> lock(_mutex); + _set.insert(first, last); + } + + size_t erase(const Key& key) { + std::lock_guard<std::mutex> lock(_mutex); + return _set.erase(key); + } +}; +} // namespace emper::lib::adt diff --git a/emper/meson.build b/emper/meson.build index 0ffa56160b4a22d8ee0ca52957e3b56842e15181..c3c85bb024f56e138e443e8e7de7e9ecab61a6d4 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -37,6 +37,7 @@ subdir('include') emper_library_include = [] emper_library_include += include_directories('.') +subdir('io') subdir('lib') subdir('strategies') diff --git a/meson.build b/meson.build index ba757b68be9c486b08dc37ead46d57e3ed1e4a1f..00d1255221521dccfdaa1c4d64582bee989891ab 100644 --- a/meson.build +++ b/meson.build @@ -12,7 +12,8 @@ project('EMPER', 'c', 'cpp', add_project_arguments('-Wno-non-virtual-dtor', language: 'cpp') thread_dep = dependency('threads') -emper_dependencies = [thread_dep] +uring_dep = dependency('liburing') +emper_dependencies = [thread_dep, uring_dep] run_target('iwyu', command: 'tools/check-iwyu') @@ -32,6 +33,7 @@ conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue')) conf_data.set('EMPER_LOCKED_MPSC_QUEUE', get_option('locked_mpsc_queue')) conf_data.set('EMPER_STATS', get_option('stats')) conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue')) +conf_data.set('EMPER_BLOCKED_CONTEXT_SET', get_option('blocked_context_set')) default_scheduling_strategy = get_option('default_scheduling_strategy') conf_data.set('EMPER_DEFAULT_SCHEDULING_STRATEGY_' + default_scheduling_strategy.to_upper(), true) @@ -47,6 +49,31 @@ endif conf_data.set('EMPER_LOG_LEVEL', log_level) conf_data.set('EMPER_LOG_TIMESTAMP', get_option('log_timestamp')) +option_io = get_option('io') +if option_io + conf_data.set('EMPER_IO', true) +endif + +io_bool_options = [ + 'uring_sqpoll', +] + +io_raw_options = [ + 'worker_uring_entries', +] + +foreach option : io_bool_options + value = get_option('io_' + option) + if value == true and not option_io + error('io_' + option + ' defined without io') 
+ endif + conf_data.set('EMPER_IO_' + option.to_upper(), value) +endforeach + +foreach option : io_raw_options + conf_data.set('EMPER_IO_' + option.to_upper(), get_option('io_' + option)) +endforeach + subdir('emper') subdir('tests') subdir('apps') diff --git a/meson_options.txt b/meson_options.txt index e4f8a399783aaa724905ecb68f52a9fad9a7ba63..3fa7046edbbeb637f20992cabfafab10626d40cf 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -10,6 +10,12 @@ option( choices: ['automatic', 'OFF', 'Error', 'Warning', 'Info', 'Debug', 'FineDebug', 'FinerDebug', 'FinestDebug', 'ALL'], value: 'automatic', ) +option( + 'blocked_context_set', + type: 'boolean', + value: false, + description: 'Store all currently blocked contexts for debugging in the set Blockable<LogLevel>::blocked' +) option( 'log_timestamp', type: 'boolean', @@ -68,3 +74,21 @@ option( value: true, description: 'Use an overflow queue in case scheduling queues become full', ) +option( + 'io', + type: 'boolean', + value: true, + description: 'Add support for asynchronous IO using io_uring' +) +option( + 'io_worker_uring_entries', + type: 'integer', + value: 16, + description: 'Number of entries in each worker io_uring' +) +option( + 'io_uring_sqpoll', + type: 'boolean', + value: false, + description: 'Enable io_urings SQPOLL feature (start a separate kernel thread which polls the sq reducing the amount of syscalls to submit new requests. This is a privileged operation.).' +) diff --git a/tests/AlarmFutureTest.cpp b/tests/AlarmFutureTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d6fe915bfc3acbbe52f6452f73181007cb670206 --- /dev/null +++ b/tests/AlarmFutureTest.cpp @@ -0,0 +1,31 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <linux/time_types.h> // for __kernel_timespec + +#include <cassert> // for assert +#include <cerrno> // for ETIME +#include <chrono> // for microseconds, duration_cast, operator- +#include <compare> // for operator>=, strong_ordering +#include <cstdint> // for int32_t +#include <cstdlib> // for exit, EXIT_SUCCESS + +#include "io/Future.hpp" // for AlarmFuture + +using emper::io::AlarmFuture; + +void emperTest() { + struct __kernel_timespec ts = {.tv_sec = 1, .tv_nsec = 0}; + AlarmFuture alarm(ts); + + auto start = std::chrono::high_resolution_clock::now(); + int32_t res = alarm.submitAndWait(); + auto end = std::chrono::high_resolution_clock::now(); + + // timeouts return -ETIME if they trigger + assert(res == -ETIME); + + assert(std::chrono::duration_cast<std::chrono::microseconds>(end - start) >= + std::chrono::seconds(1)); + + exit(EXIT_SUCCESS); +} diff --git a/tests/CancelFutureTest.cpp b/tests/CancelFutureTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..21eaa79fa71d56267be77fe6a907a1fa5f6e5809 --- /dev/null +++ b/tests/CancelFutureTest.cpp @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <sys/eventfd.h> // for eventfd, EFD_SEMAPHORE + +#include <cassert> // for assert +#include <cerrno> // for ECANCELED, ETIME +#include <cstdint> // for uint64_t, int32_t +#include <cstdlib> // for exit, EXIT_SUCCESS + +#include "Common.hpp" // for DIE_MSG_ERRNO +#include "io/Future.hpp" // for ReadFuture, WriteFuture + +using emper::io::ReadFuture; +using emper::io::WriteFuture; + +void emperTest() { + int efd = eventfd(0, 0); + if (efd == -1) { + DIE_MSG_ERRNO("eventfd failed"); + } + + uint64_t read_buf; + uint64_t write_buf = 42; + // cancel 
not submitted Future + ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); + assert(readFuture.cancel() == -ENOENT); + + // cancel submitted non-completed Future + readFuture.submit(); + assert(readFuture.cancel() == -ECANCELED); + + readFuture.reset(); + + readFuture.submit(); + WriteFuture writeFuture(efd, &write_buf, sizeof(write_buf), 0); + assert(writeFuture.submitAndWait() == sizeof(write_buf)); + assert(readFuture.cancel() == sizeof(write_buf) && read_buf == write_buf); + + writeFuture.reset(); + read_buf = 0; + readFuture.reset(); + + // cancel a chain + ReadFuture readFuture2(efd, &read_buf, sizeof(read_buf), 0); + readFuture2.setDependency(readFuture); + readFuture2.submit(); + assert(readFuture2.cancel() == -ECANCELED); + assert(readFuture.wait() == -ECANCELED); + + readFuture2.reset(); + readFuture.reset(); + + // cancel partial completed chain + readFuture2.submit(); + assert(writeFuture.submitAndWait() == sizeof(write_buf)); + // TODO: investigate why this read is completed with -EINTR most of the time + int r = readFuture2.cancel(); + assert(r == -EINTR || r == -ECANCELED); + assert(readFuture.wait() == sizeof(write_buf) && read_buf == write_buf); + + exit(EXIT_SUCCESS); +} diff --git a/tests/ConcurrentNetworkEchoTest.cpp b/tests/ConcurrentNetworkEchoTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..864065c082a3e18be1ae73c2cc4b799d54454348 --- /dev/null +++ b/tests/ConcurrentNetworkEchoTest.cpp @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <cstdlib> +#include <iostream> +#include <string> +#include <vector> + +#include "CountingPrivateSemaphore.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" +#include "emper.hpp" +#include "fixtures/network.hpp" +#include "io.hpp" + +#define PORT 4241 +#define BUF_SIZE 1024 + +auto main(int argc, char* argv[]) -> int { + unsigned int client_count = 10; + unsigned int echos = 1000; + int port = PORT; + std::string host = "127.0.0.1"; + + if (argc > 3) { + std::cerr << "Usage: " << argv[0] << " [clients] [echos]" << std::endl; + exit(EXIT_FAILURE); + } + + const int DECIMAL = 10; + if (argc > 1) { + client_count = strtol(argv[1], nullptr, DECIMAL); + } + + if (argc > 2) { + echos = strtol(argv[2], nullptr, DECIMAL); + } + + Runtime runtime; + + runtime.scheduleFromAnywhere(*emper::io::tcp_listener(host, port, echo_serve)); + + Fiber* client_producer = Fiber::from([&] { + CPS cps; + for (unsigned int i = 0; i < client_count; ++i) { + spawn( + [&] { + const std::vector<std::string> msgs = {"foo"}; + echo_client("127.0.0.1", PORT, msgs, echos); + }, + cps); + } + cps.wait(); + exit(EXIT_SUCCESS); + }); + + runtime.scheduleFromAnywhere(*client_producer); + + runtime.waitUntilFinished(); + + return EXIT_FAILURE; +} diff --git a/tests/IncrementalCompletionTest.cpp b/tests/IncrementalCompletionTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..229ce1783c013b0ba8b63e31fc8e3c017da3fd61 --- /dev/null +++ b/tests/IncrementalCompletionTest.cpp @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Fischer +#include <unistd.h> // for pipe + +#include <cassert> // for assert +#include <cstdint> // for uint64_t, int32_t +#include <cstdlib> // for exit, EXIT_SUCCESS +#include <cstring> // for memcmp + +#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG +#include "io/Future.hpp" // for ReadFuture, CloseFuture, WriteFuture + +using emper::io::CloseFuture; +using emper::io::ReadFuture; 
+using emper::io::WriteFuture; + +static const size_t MEMSIZE = 10 << 20; + +void emperTest() { + auto* memOut = new char[MEMSIZE]; + auto* memIn = new char[MEMSIZE]; + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + int pipefds[2]; + + if (pipe(pipefds) == -1) { + DIE_MSG_ERRNO("pipe failed"); + } + + WriteFuture writeFuture(pipefds[1], memOut, MEMSIZE, 0, true); + writeFuture.submit(); + + ReadFuture readFuture(pipefds[0], memIn, MEMSIZE, 0, true); + readFuture.submit(); + + int32_t res = writeFuture.wait(); + assert(res == MEMSIZE); + + res = readFuture.wait(); + assert(res == MEMSIZE); + + assert(memcmp(memOut, memIn, MEMSIZE) == 0); + + CloseFuture closeWriteSide(pipefds[1]); + assert(closeWriteSide.submitAndWait() == 0); + + CloseFuture closeReadSide(pipefds[0]); + assert(closeReadSide.submitAndWait() == 0); + + delete[] memOut; + delete[] memIn; + + exit(EXIT_SUCCESS); +} diff --git a/tests/LinkFutureTest.cpp b/tests/LinkFutureTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..eb7289ba409e9942565882276201dad6f07ee0b8 --- /dev/null +++ b/tests/LinkFutureTest.cpp @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <sys/eventfd.h> // for eventfd +#include <sys/types.h> // for ssize_t + +#include <cassert> // for assert +#include <cerrno> // for EBADF, ECANCELED +#include <cstdint> // for uint64_t, int32_t +#include <cstdlib> // for exit, EXIT_SUCCESS + +#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG +#include "io/Future.hpp" // for ReadFuture, CloseFuture, WriteFuture + +using emper::io::CloseFuture; +using emper::io::ReadFuture; +using emper::io::WriteFuture; + +static void successChain() { + int efd = eventfd(0, 0); + if (efd == -1) { + DIE_MSG_ERRNO("eventfd failed"); + } + + uint64_t write_buf = 42; + WriteFuture writeFuture(efd, &write_buf, sizeof(write_buf), 0); + + uint64_t read_buf; + ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); + readFuture.setDependency(writeFuture); + + CloseFuture closeFuture(efd); + closeFuture.setDependency(readFuture); + ssize_t res = closeFuture.submitAndWait(); + + if (res == -1) { + DIE_MSG_ERRNO("linked requests chain failed"); + } + + if (read_buf != 42) { + DIE_MSG("dependent read value differs from written value"); + } +} + +static void failureChain() { + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char buf[32]; + ReadFuture invalidReadFuture(42, &buf, sizeof(buf), 0); + + ReadFuture readFuture(0, &buf, sizeof(buf), 0); + readFuture.setDependency(invalidReadFuture); + + int32_t res = readFuture.submitAndWait(); + assert(res == -ECANCELED); + + res = invalidReadFuture.wait(); + assert(res == -EBADF); +} + +void emperTest() { + successChain(); + failureChain(); + + exit(EXIT_SUCCESS); +} diff --git a/tests/ReuseBpsTest.cpp b/tests/ReuseBpsTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..82d8a06aea05509f2d38fd21780b2f090fb3c1a9 --- /dev/null +++ b/tests/ReuseBpsTest.cpp @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#include <cstdlib> // for exit, EXIT_SUC... 
+ +#include "BinaryPrivateSemaphore.hpp" // for BPS +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "emper.hpp" // for spawn + +void emperTest() { + CPS cps; + BPS bps1, bps2; + + const int ITERATIONS = 100; + + spawn( + [&bps1, &bps2] { + for (int i = 0; i < ITERATIONS; ++i) { + bps1.wait(); + bps1.reset(); + bps2.signal(); + } + }, + cps); + + spawn( + [&bps1, &bps2] { + for (int i = 0; i < ITERATIONS; ++i) { + bps1.signal(); + bps2.wait(); + bps2.reset(); + } + }, + cps); + + // Wait for the fibers to finish. + cps.wait(); + + exit(EXIT_SUCCESS); +} diff --git a/tests/ReuseFutureTest.cpp b/tests/ReuseFutureTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7bc930c8885ee7872a2f323fa60a23a3b4bd9472 --- /dev/null +++ b/tests/ReuseFutureTest.cpp @@ -0,0 +1,66 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <sys/eventfd.h> // for eventfd, EFD_SEMAPHORE + +#include <cerrno> // for errno +#include <cstdint> // for uint64_t, int32_t +#include <cstdlib> // for exit, EXIT_SUCCESS + +#include "Common.hpp" // for DIE_MSG_ERRNO +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "emper.hpp" // for spawn +#include "io/Future.hpp" // for ReadFuture, WriteFuture + +using emper::io::ReadFuture; +using emper::io::WriteFuture; + +void emperTest() { + CPS cps; + int efd = eventfd(0, EFD_SEMAPHORE); + if (efd == -1) { + DIE_MSG_ERRNO("eventfd failed"); + } + + uint64_t read_buf; + ReadFuture read_future(efd, &read_buf, sizeof(read_buf), 0); + + uint64_t write_buf = 1; + WriteFuture write_future(efd, &write_buf, sizeof(write_buf), 0); + + const int ITERATIONS = 100; + + spawn( + [&read_future] { + for (int i = 0; i < ITERATIONS; ++i) { + int32_t res = read_future.submitAndWait(); + if (res < 0) { + errno = -res; + DIE_MSG_ERRNO("read failed"); + } + + // reset the BPS used to signal the completion of this future + read_future.reset(); + } + }, + cps); + + spawn( + [&write_future] { + for (int i = 0; i < ITERATIONS; ++i) { + int32_t res = write_future.submitAndWait(); + if (res < 0) { + errno = -res; + DIE_MSG_ERRNO("write failed"); + } + + // reset the BPS used to signal the completion of this future + write_future.reset(); + } + }, + cps); + + // Wait for the fibers to finish. 
+ cps.wait(); + + exit(EXIT_SUCCESS); +} diff --git a/tests/SimpleDiskAndNetworkTest.cpp b/tests/SimpleDiskAndNetworkTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f9caa677719e848b6c71099e557f6418ff145452 --- /dev/null +++ b/tests/SimpleDiskAndNetworkTest.cpp @@ -0,0 +1,178 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <arpa/inet.h> // for inet_addr +#include <bits/types/struct_iovec.h> // for iovec +#include <fcntl.h> // for open, O_RDONLY +#include <netinet/in.h> // for sockaddr_in, htons, in_addr +#include <sys/socket.h> // for bind, listen, setsockopt +#include <unistd.h> // for close + +#include <cassert> // for assert +#include <cstdlib> // for mkstemp, exit, EXIT_SUCCESS +#include <cstring> // for memcmp, memset +#include <memory> // for allocator, unique_ptr +#include <string> // for string +#include <vector> // for vector + +#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "emper.hpp" // for spawn +#include "fixtures/network.hpp" // for echo_client +#include "io.hpp" // for readFile, accept, recv, send +#include "io/Future.hpp" // for Future + +#define PORT 4243 +#define MAX 1024 +static void server_func() { + int sockfd; + struct sockaddr_in servaddr; + + // socket create and varification + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd == -1) { + DIE_MSG_ERRNO("socket creation failed"); + } + memset(&servaddr, 0, sizeof(servaddr)); + + // assign IP, PORT + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); + servaddr.sin_port = htons(PORT); + + int reuseaddr = 1; + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) { + DIE_MSG_ERRNO("setsockopt failed"); + } + if (bind(sockfd, (sockaddr*)&servaddr, sizeof(servaddr)) == -1) { + DIE_MSG_ERRNO("bind failed"); + } + if (listen(sockfd, 1) != 0) { + DIE_MSG_ERRNO("listen failed"); + } + + struct sockaddr_in clientaddr; + socklen_t clientaddr_len = sizeof(clientaddr); + auto client_fd = + emper::io::accept(sockfd, reinterpret_cast<struct sockaddr*>(&clientaddr), &clientaddr_len) + ->wait(); + + if (client_fd < 0) { + DIE_MSG_ERRNO("accept failed"); + } + + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char recv_buf[MAX]; + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char read_buf[MAX]; + + for (;;) { + int received = emper::io::recv(client_fd, recv_buf, sizeof(recv_buf), 0)->wait(); + if (received == 0) { + exit(EXIT_SUCCESS); + } + + if (received == -1) { + DIE_MSG_ERRNO("recv failed"); + break; + } + + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char file_name[] = "/tmp/emper-SimpleDiskAndNetworkTestFile-XXXXXX"; + int file_fd = mkstemp(file_name); + if (file_fd == -1) { + DIE_MSG_ERRNO("mkstemp failed"); + } + + int written = emper::io::writeFile(file_fd, recv_buf, received)->wait(); + if (written < 0) { + DIE_MSG_ERRNO("write failed"); + } + close(file_fd); + + file_fd = open(file_name, O_RDONLY); + if (file_fd == -1) { + DIE_MSG_ERRNO("open failed"); + } + + int bytes_read = emper::io::readFile(file_fd, read_buf, written)->wait(); + if (bytes_read == 0) { + DIE_MSG("nothing to read"); + } + + if (bytes_read < 0) { + DIE_MSG_ERRNO("read failed"); + } + close(file_fd); + + int sent = emper::io::send(client_fd, read_buf, bytes_read, 0)->wait(); + if (sent == 0) { + DIE_MSG("client socket unexpected shutdown"); + } + + if (sent == -1) { + DIE_MSG_ERRNO("send failed"); + } + + // 
NOLINTNEXTLINE(modernize-avoid-c-arrays) + char file2_name[] = "/tmp/emper-SimpleDiskAndNetworkTestFile-XXXXXX"; + file_fd = mkstemp(file2_name); + if (file_fd == -1) { + DIE_MSG_ERRNO("mkstemp failed"); + } + + const int iovcnt = 2; + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + struct iovec iov[iovcnt]; + + std::string s1 = "foo"; + std::string s2 = "bar"; + + iov[0].iov_base = (void*)s1.c_str(); + iov[0].iov_len = s1.length(); + iov[1].iov_base = (void*)s2.c_str(); + iov[1].iov_len = s2.length(); + + auto writevFuture = emper::io::writev(file_fd, &iov[0], iovcnt); + written = writevFuture->wait(); + if (written < 0) { + DIE_MSG_ERRNO("wrtev failed"); + } + close(file_fd); + + file_fd = open(file2_name, O_RDONLY); + if (file_fd == -1) { + DIE_MSG_ERRNO("open failed"); + } + + auto readFuture = emper::io::readFile(file_fd, read_buf, written, 0, true); + bytes_read = readFuture->wait(); + if (bytes_read == 0) { + DIE_MSG("nothing to read"); + } + + if (bytes_read < 0) { + DIE_MSG_ERRNO("read failed"); + } + + assert(written == bytes_read); + + assert(memcmp(read_buf, iov[0].iov_base, iov[0].iov_len) == 0); + assert(memcmp((char*)read_buf + iov[0].iov_len, iov[1].iov_base, iov[1].iov_len) == 0); + + close(file_fd); + } +} + +void emperTest() { + CPS cps; + spawn(server_func, cps); + + spawn( + [] { + const std::vector<std::string> msgs{"foo", "bar"}; + echo_client("127.0.0.1", PORT, msgs); + }, + cps); + + cps.wait(); +} diff --git a/tests/SimpleNetworkTest.cpp b/tests/SimpleNetworkTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3515862506fe6ebafc4d78fe3785d3f502419dd1 --- /dev/null +++ b/tests/SimpleNetworkTest.cpp @@ -0,0 +1,27 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <string> // for string, allocator +#include <vector> // for vector + +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "emper.hpp" // for async, spawn +#include "fixtures/network.hpp" // for echo_client, echo_serve +#include "io.hpp" // for tcp_listener + +#define MAX 1024 +int PORT = 4242; +std::string ADDR = "127.0.0.1"; + +void emperTest() { + CPS cps; + async(emper::io::tcp_listener(ADDR, PORT, echo_serve)); + + spawn( + [] { + const std::vector<std::string> strings = {"foo", "bar", std::string(MAX, 'a'), "quit\n"}; + echo_client(ADDR.c_str(), PORT, strings); + }, + cps); + + cps.wait(); +} diff --git a/tests/TimeoutWrapperTest.cpp b/tests/TimeoutWrapperTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8d1ee00138de1db26428e7c62e484bdf2fb7f42f --- /dev/null +++ b/tests/TimeoutWrapperTest.cpp @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Fischer +#include <linux/time_types.h> // for __kernel_timespec +#include <sys/eventfd.h> // for eventfd, EFD_SEMAPHORE + +#include <cassert> // for assert +#include <cerrno> // for ECANCELED, ETIME +#include <cstdint> // for uint64_t, int32_t +#include <cstdlib> // for exit, EXIT_SUCCESS + +#include "Common.hpp" // for DIE_MSG_ERRNO +#include "io/Future.hpp" // for ReadFuture, TimeoutWrapper + +using emper::io::ReadFuture; +using emper::io::TimeoutWrapper; + +void emperTest() { + int efd = eventfd(0, EFD_SEMAPHORE); + if (efd == -1) { + DIE_MSG_ERRNO("eventfd failed"); + } + + uint64_t read_buf; + ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); + + struct __kernel_timespec ts = {.tv_sec = 1, .tv_nsec = 0}; + TimeoutWrapper t(readFuture, ts); + + int32_t res = t.submitAndWait(); + assert(res == 
-ETIME); + + res = readFuture.wait(); + assert(res == -ECANCELED); + + exit(EXIT_SUCCESS); +} diff --git a/tests/TooLongFutureChain.cpp b/tests/TooLongFutureChain.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a9954de426817cb28c155863a8e33a70c4f3ee62 --- /dev/null +++ b/tests/TooLongFutureChain.cpp @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <linux/time_types.h> // for __kernel_timespec + +#include <array> // for array +#include <cstdlib> // for exit, EXIT_SUCCESS, size_t + +#include "emper-config.h" // for EMPER_IO_WORKER_URING_ENTRIES +#include "io/Future.hpp" // for AlarmFuture + +using emper::io::AlarmFuture; + +void emperTest() { + const size_t links = EMPER_IO_WORKER_URING_ENTRIES * 2; + std::array<AlarmFuture*, links> futures; + struct __kernel_timespec ts = {.tv_sec = 0, .tv_nsec = 100}; + + for (unsigned i = 0; i < links; ++i) { + auto* future = new AlarmFuture(ts); + if (i > 0) { + future->setDependency(*futures[i - 1]); + } + + futures[i] = future; + } + + futures[links - 1]->submitAndWait(); + exit(EXIT_SUCCESS); +} diff --git a/tests/fixtures/meson.build b/tests/fixtures/meson.build new file mode 100644 index 0000000000000000000000000000000000000000..9b3f3c133e3edf7bbccd46b316ee54e831e64e1a --- /dev/null +++ b/tests/fixtures/meson.build @@ -0,0 +1,18 @@ +test_fixtures_include_dir = include_directories('.') + +test_fixtures_sources = [] +if get_option('io') + test_fixtures_sources += ['network.cpp'] +endif + +test_fixtures_lib = library( + 'test-fixtures', + test_fixtures_sources, + include_directories: emper_all_include, + link_with: emper, +) + +test_fixtures = declare_dependency( + include_directories: test_fixtures_include_dir, + link_with: test_fixtures_lib, +) diff --git a/tests/fixtures/network.cpp b/tests/fixtures/network.cpp new file mode 100644 index 0000000000000000000000000000000000000000..97858086292faeb42e314308209f90ef53b86e52 --- /dev/null +++ b/tests/fixtures/network.cpp @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include "network.hpp" + +#include <arpa/inet.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/types.h> + +#include <cassert> +#include <cstdlib> +#include <cstring> +#include <vector> + +#include "Common.hpp" +#include "io.hpp" + +#define MAX 1024 + +static void send_all(int fd, const void* buf, size_t len) { + // emper send everything or encounters an unrecoverable state + ssize_t bytes_send = emper::io::sendAndWait(fd, buf, len, MSG_NOSIGNAL, true); + if (unlikely(bytes_send == -1)) { + DIE_MSG_ERRNO("send failed"); + } +} + +void echo_serve(int client_socket) { + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char buf[MAX]; + for (;;) { + ssize_t bytes_recv = emper::io::recvAndWait(client_socket, buf, sizeof(buf), 0); + // socket was shutdown + if (bytes_recv == 0) { + return; + } + + if (bytes_recv == -1) { + DIE_MSG_ERRNO("server read failed"); + } + + if (unlikely(strncmp("quit\n", buf, bytes_recv) == 0)) { + exit(EXIT_SUCCESS); + } + + send_all(client_socket, buf, bytes_recv); + } +} + +void echo_client(const char* addr, int port, const std::vector<std::string>& msgs, + size_t iterations) { + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd == -1) { + DIE_MSG_ERRNO("client socket creation failed"); + } + + struct sockaddr_in server_addr; + // assign IP, PORT + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + 
server_addr.sin_addr.s_addr = inet_addr(addr); + server_addr.sin_port = htons(port); + + if (emper::io::connectAndWait(sockfd, reinterpret_cast<struct sockaddr*>(&server_addr), + sizeof(server_addr)) == -1) { + DIE_MSG_ERRNO("connect failed"); + } + + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char buf[MAX]; +#ifndef NDEBUG + memset(buf, 0, MAX); +#endif + + for (size_t i = 0; i < iterations; ++i) { + for (const auto& s : msgs) { + assert(s.length() <= MAX); + send_all(sockfd, s.c_str(), s.length()); + + ssize_t bytes_recv = emper::io::recvAndWait(sockfd, buf, s.length(), MSG_WAITALL); + if (bytes_recv == 0) { + DIE_MSG("client: socket closed"); + } + + if (bytes_recv == -1) { + DIE_MSG_ERRNO("client read failed"); + } + + assert(bytes_recv > 0); + assert((size_t)bytes_recv == s.length()); + + if (memcmp(s.c_str(), buf, s.length()) != 0) { + // if (strncmp(s.c_str(), buf, bytes_recv) != 0) { + DIE_MSG("unexpected string received from server"); + } + } + } + + shutdown(sockfd, SHUT_RDWR); +} diff --git a/tests/fixtures/network.hpp b/tests/fixtures/network.hpp new file mode 100644 index 0000000000000000000000000000000000000000..271cb2d37375e7417f012d47a5f3ff0ef78d89dc --- /dev/null +++ b/tests/fixtures/network.hpp @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#pragma once + +#include <cstddef> +#include <string> +#include <vector> + +void echo_serve(int client_socket); + +void echo_client(const char* addr, int port, const std::vector<std::string>& msgs, + size_t iterations = 1); diff --git a/tests/meson.build b/tests/meson.build index a4ba8ce841f6ca9fb25687de3f83563f6ec46912..264746c299180b903ee127e88d7a0fcdc5e39d37 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -1,7 +1,11 @@ +subdir('fixtures') + cc = meson.get_compiler('c') liburcu_memb = option_urcu ? cc.find_library('urcu-memb') : disabler() liburcu_cds = option_urcu ? cc.find_library('urcu-cds') : disabler() +# TODO: Parallel tests should be run without io_uring compiled +# into emper, to avoid running into `ulimit -l`. 
tests = { 'SimpleFibTest.cpp': { @@ -12,7 +16,7 @@ tests = { { 'description': 'Simplest fib test', 'test_suite': 'smoke', - 'is_parallel': true, +# 'is_parallel': true, }, 'c_api_test.c': @@ -25,6 +29,12 @@ tests = { 'description': 'Test EMPER\'s C++ API', }, + 'ReuseBpsTest.cpp': + { + 'description': 'Test resetting of BPSs', + 'emper_test_runner': true, + }, + 'SimpleActorTest.cpp': { 'description': 'Simple Actor Test', @@ -53,16 +63,97 @@ tests = { { 'description': 'Simple test for PrivateSemaphore:signalFromAnywhere()', 'test_suite': 'smoke', - 'is_parallel': true, +# 'is_parallel': true, 'emper_test_runner': true, }, 'TellActorFromAnywhereTest.cpp': { 'description': 'Simple test for Actor:tellFromAnywhere()', 'test_suite': 'smoke', - 'is_parallel': true, +# 'is_parallel': true, + 'emper_test_runner': true, + }, + + 'IncrementalCompletionTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test incremental completion for hugre reads/writes', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'ReuseFutureTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test reusing Future objects', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'LinkFutureTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test linking Future objects', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'TooLongFutureChain.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test linking Future objects', + 'test_suite': 'io', + 'should_fail': true, + 'emper_test_runner': true, + }, + + 'AlarmFutureTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test AlarmFuture object based timeouts', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'TimeoutWrapperTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test TimeoutWrapper object based IO request timeouts', + 'test_suite': 'io', 'emper_test_runner': true, }, + + 'CancelFutureTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test Future cancellation', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'SimpleNetworkTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Simple network test', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'SimpleDiskAndNetworkTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Simple network and disk IO test', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'ConcurrentNetworkEchoTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Concurrent network echo test with 10 clients', + 'test_suite': 'io', + 'args': ['10', '10000'], + }, } undef_ndebug = '-UNDEBUG' @@ -79,13 +170,27 @@ test_env = environment( subdir('test-runner') foreach source, test_dict : tests + # check feature flags + if test_dict.has_key('feature_flags') + include_test = true + foreach flag : test_dict.get('feature_flags') + if not get_option(flag) + include_test = false + break + endif + endforeach + + if not include_test + continue + endif + endif # TODO: Use meson fs (filesystem) module once meson >= 0.53 is in # buster-backports, instead of split('.')[0] # test_name = fs.replace_suffix(source, '') # The test_name is the name of the source file without the file suffix. 
test_name = source.split('.')[0] - test_deps = [thread_dep] + test_deps = [thread_dep, test_fixtures] if test_dict.has_key('dependencies') test_deps += test_dict['dependencies'] endif @@ -106,7 +211,9 @@ foreach source, test_dict : tests test_exe, is_parallel: test_dict.get('is_parallel', false), suite: test_dict.get('test_suite', 'all'), - timeout: 180, env: test_env, + timeout: test_dict.get('timeout', 180), + args: test_dict.get('args', []), + should_fail: test_dict.get('should_fail', false), ) endforeach
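
The global completer loop in emper/io/IoContext.cpp above distinguishes two kinds of cqe user data with a pointer tag: the most significant bit marks an eventfd read belonging to a worker IoContext, everything else is a plain Future pointer. The following self-contained sketch (illustrative names, 64-bit user-space pointers assumed; not part of the patch) shows just that tag/untag arithmetic in isolation.

// tag_sketch.cpp - standalone illustration of the IOCONTEXT_TAG scheme
#include <cassert>
#include <cstdint>

struct IoContext;	// stand-in for emper::io::IoContext

// Same scheme as IoContext.cpp: the most significant bit of a pointer-sized
// word marks cqe user data that belongs to a worker IoContext's eventfd read.
static const uintptr_t IOCONTEXT_TAG = uintptr_t{1} << (sizeof(uintptr_t) * 8 - 1);
static const uintptr_t IOCONTEXT_TAG_MASK = IOCONTEXT_TAG - 1;

// Tag an IoContext pointer before storing it via io_uring_sqe_set_data().
static auto tagIoContext(IoContext* io) -> void* {
	return reinterpret_cast<void*>(reinterpret_cast<uintptr_t>(io) | IOCONTEXT_TAG);
}

// Recover the IoContext pointer, or nullptr if the data is an untagged Future.
static auto untagIoContext(void* data) -> IoContext* {
	auto bits = reinterpret_cast<uintptr_t>(data);
	if (!(bits & IOCONTEXT_TAG)) {
		return nullptr;	// plain Future pointer
	}
	return reinterpret_cast<IoContext*>(bits & IOCONTEXT_TAG_MASK);
}

auto main() -> int {
	auto* io = reinterpret_cast<IoContext*>(uintptr_t{0x7f00deadb000});
	void* data = tagIoContext(io);
	assert(untagIoContext(data) == io);			// tagged pointer round-trips
	assert(untagIoContext(io) == nullptr);	// an untagged pointer is not misdetected
	return 0;
}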
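Stats::record_io_submit_full_cq and calcRunningAvg in emper/io/Stats.* keep their averages incrementally, following the update rule avg_n = avg_(n-1) + (x_n - avg_(n-1)) / n referenced from the math.stackexchange link in the patch. A minimal sketch of that rule with the same truncating integer division (function name and sample values are made up for illustration):

#include <cstdint>
#include <iostream>
#include <vector>

// Incremental mean: after the n-th sample x, avg becomes avg + (x - avg) / n.
// Like the patch's record_io_submit_full_cq, this uses integer division and
// therefore truncates instead of rounding.
static void updateRunningMean(uint64_t& avg, uint64_t n, uint64_t x) {
	const auto diff =
			(static_cast<int64_t>(x) - static_cast<int64_t>(avg)) / static_cast<int64_t>(n);
	avg = static_cast<uint64_t>(static_cast<int64_t>(avg) + diff);
}

auto main() -> int {
	const std::vector<uint64_t> samples = {100, 200, 400, 300};	// exact mean: 250
	uint64_t avg = 0;
	uint64_t n = 0;
	for (const uint64_t x : samples) {
		updateRunningMean(avg, ++n, x);
	}
	// Prints 249: repeated truncation can leave the result off by a few units.
	std::cout << "incremental mean: " << avg << std::endl;
	return 0;
}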
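The new public helpers in emper's io.hpp are exercised indirectly by the tests above (tcp_listener in SimpleNetworkTest, recvAndWait/sendAndWait in tests/fixtures/network.cpp, Runtime scheduling in ConcurrentNetworkEchoTest). The sketch below pieces those calls together into a minimal echo server; the exact io.hpp declarations (argument order, the trailing 'true' of sendAndWait, the default backlog) are assumed to match how the tests use them, so treat it as an outline rather than a drop-in program.

#include <sys/socket.h>	// for MSG_NOSIGNAL
#include <sys/types.h>	// for ssize_t

#include <cstdlib>	// for EXIT_FAILURE, EXIT_SUCCESS
#include <string>		// for string

#include "Fiber.hpp"
#include "Runtime.hpp"
#include "io.hpp"

// Echo everything a client sends until the peer closes the connection.
static void echoHandler(int client) {
	// NOLINTNEXTLINE(modernize-avoid-c-arrays)
	char buf[1024];
	for (;;) {
		const ssize_t received = emper::io::recvAndWait(client, buf, sizeof(buf), 0);
		if (received <= 0) {
			return;	// connection closed or unrecoverable error
		}
		// the trailing 'true' mirrors send_all() in tests/fixtures/network.cpp
		emper::io::sendAndWait(client, buf, received, MSG_NOSIGNAL, true);
	}
}

auto main() -> int {
	std::string host = "127.0.0.1";
	int port = 12345;	// tcp_listener takes host and port by reference

	Runtime runtime;
	Fiber* listener = emper::io::tcp_listener(host, port, echoHandler);
	if (listener == nullptr) {
		return EXIT_FAILURE;
	}

	runtime.scheduleFromAnywhere(*listener);
	runtime.waitUntilFinished();
	return EXIT_SUCCESS;
}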