From 460c2f05ab1f7ffefe7f8f51357d79f9ddce7dff Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fl.fischer@fau.de>
Date: Fri, 15 Jan 2021 12:19:23 +0100
Subject: [PATCH] [IO] introduce emper::io, an IO subsystem using io_uring

Emper's IO design is based on a proactor pattern where each worker can
issue IO requests through its exclusive IoContext object, which wraps an
io_uring instance.

IO completions are reaped in four places:

1. After a submit, to collect inline completions
2. Before dispatching a new Fiber
3. When no new IO can be submitted because the completion queue is full
4. By a global completer thread, which is notified about completions on
   worker IoContexts through registered eventfds

All IO requests are modeled as Future objects, which can be instantiated
and submitted manually, obtained from the POSIX-like non-blocking
functions, or used implicitly by the POSIX-like blocking functions.

The user-facing API is exported in the following headers:
* emper/io.hpp (POSIX-like)
* emper.h (POSIX-like)
* emper/io/Future.hpp

Catching short reads/writes/sends and resubmitting the request without
unblocking the Fiber is supported.

Using AlarmFuture objects, Fibers have an emper-native way to sleep for a
given time. IO request timeouts are supported with the TimeoutWrapper
class. Request cancellation is supported via Future::cancel() or the
CancelWrapper Future class.

A proactor design demands that buffers stay committed to the kernel as
long as the request is active. To guarantee memory safety, Futures are
canceled in their destructor, which returns only after the committed
memory is safe to use again.

Linking Futures into chains is supported using the Future::setDependency()
method. A chain is submitted when its last Future is submitted. A linked
request starts only after the previous one has finished. Errors or partial
completions cancel the not-yet-started tail of the chain.

TODO: Handle possible situations where the CQ of the global completer is
full and no more sqes can be submitted to its SQ.
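Illustrative usage of the C++ API added by this patch (a sketch only, not
part of the change itself; sockfd stands for an already connected socket
and the code must run inside a fiber on the emper runtime):

    char buf[1024];
    // POSIX-like blocking call: only the calling fiber is blocked,
    // the worker keeps executing other fibers while the request is pending
    ssize_t received = emper::io::recvAndWait(sockfd, buf, sizeof(buf), 0);
    if (received > 0) {
        emper::io::sendAndWait(sockfd, buf, (size_t)received, 0);
    }

    // Equivalent explicit Future usage
    emper::io::RecvFuture recvFuture(sockfd, buf, sizeof(buf), 0);
    recvFuture.submit();                          // issue the request, keep running
    ssize_t res = recvFuture.waitAndSetErrno();   // suspend until the completion is reaped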
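The same pattern is available from C through emper.h (sketch; client_fd is
a placeholder for an accepted connection and EMPER_IO must be enabled):

    char buf[512];
    ssize_t n = emper_recv(client_fd, buf, sizeof(buf), 0);
    if (n > 0)
        emper_send(client_fd, buf, (size_t)n, 0);
    emper_close(client_fd);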
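Sleeping a fiber with an AlarmFuture (sketch; the one-second duration is an
arbitrary example, wait() returns -ETIME once the timeout has fired):

    struct __kernel_timespec ts = {.tv_sec = 1, .tv_nsec = 0};
    emper::io::AlarmFuture alarm(ts);
    alarm.submitAndWait();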
--- emper/BinaryPrivateSemaphore.hpp | 2 + emper/Common.hpp | 14 + emper/Debug.hpp | 4 + emper/Emper.hpp | 16 + emper/Runtime.cpp | 51 ++- emper/Runtime.hpp | 17 + emper/c_emper.cpp | 44 ++- emper/include/emper.h | 34 +- emper/io.hpp | 395 ++++++++++++++++++++ emper/io/Future.cpp | 118 ++++++ emper/io/Future.hpp | 535 ++++++++++++++++++++++++++++ emper/io/IoContext.cpp | 372 +++++++++++++++++++ emper/io/IoContext.hpp | 132 +++++++ emper/io/Operation.cpp | 49 +++ emper/io/Operation.hpp | 26 ++ emper/io/Stats.cpp | 160 +++++++++ emper/io/Stats.hpp | 205 +++++++++++ emper/io/io.cpp | 71 ++++ emper/io/meson.build | 9 + emper/meson.build | 1 + meson.build | 28 +- meson_options.txt | 18 + tests/AlarmFutureTest.cpp | 31 ++ tests/CancelFutureTest.cpp | 62 ++++ tests/ConcurrentNetworkEchoTest.cpp | 61 ++++ tests/IncrementalCompletionTest.cpp | 53 +++ tests/LinkFutureTest.cpp | 64 ++++ tests/ReuseBpsTest.cpp | 39 ++ tests/ReuseFutureTest.cpp | 66 ++++ tests/SimpleDiskAndNetworkTest.cpp | 178 +++++++++ tests/SimpleNetworkTest.cpp | 27 ++ tests/TimeoutWrapperTest.cpp | 36 ++ tests/TooLongFutureChain.cpp | 29 ++ tests/fixtures/meson.build | 18 + tests/fixtures/network.cpp | 100 ++++++ tests/fixtures/network.hpp | 12 + tests/meson.build | 117 +++++- 37 files changed, 3184 insertions(+), 10 deletions(-) create mode 100644 emper/io.hpp create mode 100644 emper/io/Future.cpp create mode 100644 emper/io/Future.hpp create mode 100644 emper/io/IoContext.cpp create mode 100644 emper/io/IoContext.hpp create mode 100644 emper/io/Operation.cpp create mode 100644 emper/io/Operation.hpp create mode 100644 emper/io/Stats.cpp create mode 100644 emper/io/Stats.hpp create mode 100644 emper/io/io.cpp create mode 100644 emper/io/meson.build create mode 100644 tests/AlarmFutureTest.cpp create mode 100644 tests/CancelFutureTest.cpp create mode 100644 tests/ConcurrentNetworkEchoTest.cpp create mode 100644 tests/IncrementalCompletionTest.cpp create mode 100644 tests/LinkFutureTest.cpp create mode 100644 tests/ReuseBpsTest.cpp create mode 100644 tests/ReuseFutureTest.cpp create mode 100644 tests/SimpleDiskAndNetworkTest.cpp create mode 100644 tests/SimpleNetworkTest.cpp create mode 100644 tests/TimeoutWrapperTest.cpp create mode 100644 tests/TooLongFutureChain.cpp create mode 100644 tests/fixtures/meson.build create mode 100644 tests/fixtures/network.cpp create mode 100644 tests/fixtures/network.hpp diff --git a/emper/BinaryPrivateSemaphore.hpp b/emper/BinaryPrivateSemaphore.hpp index 54410bca..5dcee792 100644 --- a/emper/BinaryPrivateSemaphore.hpp +++ b/emper/BinaryPrivateSemaphore.hpp @@ -35,6 +35,8 @@ class BinaryPrivateSemaphore : public PrivateSemaphore { BinaryPrivateSemaphore() : bpsState(initial) {} void wait() override; + + void reset() { bpsState = initial; }; }; using BPS = BinaryPrivateSemaphore; diff --git a/emper/Common.hpp b/emper/Common.hpp index a78233cd..fdda0ccf 100644 --- a/emper/Common.hpp +++ b/emper/Common.hpp @@ -23,3 +23,17 @@ using func_t = std::function<void()>; [[noreturn]] void die(const char* message, bool usePerror); using WORD = unsigned int; + +#ifdef EMPER_STATS +#include <chrono> +#define TIME_NS(code, record_func) \ + auto _time_ns_start = std::chrono::high_resolution_clock::now(); \ + code; \ + auto _time_ns_end = std::chrono::high_resolution_clock::now(); \ + auto _time_ns_duration = \ + std::chrono::duration_cast<std::chrono::nanoseconds>(_time_ns_end - _time_ns_start); \ + record_func(_time_ns_duration); + +#else +#define TIME_NS(code, record) code +#endif diff --git a/emper/Debug.hpp 
b/emper/Debug.hpp index 71b9cea5..b02d89c0 100644 --- a/emper/Debug.hpp +++ b/emper/Debug.hpp @@ -63,6 +63,7 @@ enum class LogSubsystem { SCHED, RUNTI, U_B_MPSC_Q, + IO, }; enum LogLevel { @@ -99,6 +100,7 @@ class Logger { case LogSubsystem::SCHED: case LogSubsystem::RUNTI: case LogSubsystem::U_B_MPSC_Q: + case LogSubsystem::IO: default: return ALL; } @@ -122,6 +124,8 @@ class Logger { return "RUNTI"; case LogSubsystem::U_B_MPSC_Q: return "UBSCQ"; + case LogSubsystem::IO: + return "IO "; default: return "UNKNOWN SUBSYSTEM (Add it *now*)"; } diff --git a/emper/Emper.hpp b/emper/Emper.hpp index a766160f..bbebc22a 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -73,4 +73,20 @@ static const bool BLOCKED_CONTEXT_SET = false #endif ; + +static const bool IO = +#ifdef EMPER_IO + true +#else + false +#endif + ; + +static const bool IO_URING_SQPOLL = +#ifdef EMPER_IO_URING_SQPOLL + true +#else + false +#endif + ; } // namespace emper diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index b6a9c58f..e195d3e0 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020-2021 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus, Florian Fischer #include "Runtime.hpp" #include <pthread.h> // for pthread_t, pthread_attr_init @@ -24,6 +24,8 @@ #include "RuntimeStrategyFactory.hpp" #include "RuntimeStrategyStats.hpp" // for RuntimeStrategyStats #include "emper-config.h" // IWYU pragma: keep +#include "io/IoContext.hpp" // for IoContext +#include "io/Stats.hpp" // for emper::io::Stats #include "lib/DebugUtil.hpp" #ifdef EMPER_DEFAULT_SCHEDULING_STRATEGY_WORK_STEALING @@ -58,6 +60,8 @@ RuntimeStrategyFactory& Runtime::DEFAULT_STRATEGY = #endif ; +using emper::io::IoContext; + Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory, unsigned int seed) : workerCount(workerCount), workerLatch(workerCount), @@ -80,6 +84,32 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory currentRuntime = this; } + // initialize the global and all worker IoContexts + if constexpr (emper::IO) { + // The global io_uring needs at least workerCount entries in its SQ because + // for each worker's IoContext one eventfd read is prepared before the + // globalCompleter is started and submits all previously prepared sqes. 
+ globalIo = new IoContext(workerCount); + ioContexts = new IoContext[workerCount]; + + if constexpr (emper::STATS) { + globalIo->stats.workerId = -1; + std::atexit(emper::io::Stats::printWorkerStats); + } + + // submit the workers' CQ eventfds to the global IoContext + for (workerid_t i = 0; i < workerCount; ++i) { + ioContexts[i].submit_efd(); + if constexpr (emper::STATS) { + ioContexts[i].stats.workerId = i; + } + } + + // start the globalCompleter after all eventfd are submitted so we don't need to + // synchronize the globalIo's SQ + IoContext::startGlobalCompleter(*globalIo); + } + for (workerid_t i = 0; i < workerCount; ++i) { pthread_attr_t attr; errno = pthread_attr_init(&attr); @@ -141,6 +171,12 @@ Runtime::~Runtime() { } delete[] workers; delete[] threads; + + if constexpr (emper::IO) { + delete[] ioContexts; + delete globalIo; + } + { std::lock_guard<std::mutex> lock(currentRuntimeMutex); currentRuntime = nullptr; @@ -151,6 +187,10 @@ Runtime::~Runtime() { auto Runtime::workerLoop(Worker* worker) -> void* { worker->setWorker(); + if constexpr (emper::IO) { + ioContexts[worker->workerId].setWorkerIo(); + } + LOGD("Worker loop started by thread " << syscall(SYS_gettid)); int oldType; @@ -169,7 +209,14 @@ auto Runtime::workerLoop(Worker* worker) -> void* { return nullptr; } -auto Runtime::nextFiber() -> NextFiberResult { return scheduler.nextFiber(); } +auto Runtime::nextFiber() -> NextFiberResult { + if constexpr (emper::IO) { + // Schedule all fibers waiting on completed IO + IoContext::getWorkerIo()->reapCompletions(); + } + + return scheduler.nextFiber(); +} void Runtime::waitUntilFinished() { for (workerid_t i = 0; i < workerCount; ++i) { diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 9cfd7679..05c0ea51 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -30,6 +30,12 @@ class Fiber; class RuntimeStrategy; class RuntimeStrategyFactory; +namespace emper::io { +class IoContext; +} + +using emper::io::IoContext; + enum WakeupMode { IF_SLEEPING_OBSERVED, ALWAYS, @@ -52,6 +58,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { ContextManager& contextManager; pthread_t* threads; Worker** workers; + IoContext* globalIo; + IoContext* ioContexts; std::default_random_engine randomEngine; std::uniform_int_distribution<unsigned int> uniformIntDistribution; @@ -115,6 +123,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { workerSleepConditionVariable.wait(lk); } + inline auto getGlobalIo() -> IoContext* { + if constexpr (emper::IO) { + return globalIo; + } + + return nullptr; + } + public: Runtime() : Runtime(std::thread::hardware_concurrency()) {} @@ -164,6 +180,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { friend ContextManager; friend Scheduler; friend Dispatcher; + friend IoContext; template <typename T, intptr_t WS_QUEUE_SIZE, size_t WORKER_EXCLUSIVE_QUEUE_SIZE> friend class MemoryManager; }; diff --git a/emper/c_emper.cpp b/emper/c_emper.cpp index 6822dee6..85a9bf96 100644 --- a/emper/c_emper.cpp +++ b/emper/c_emper.cpp @@ -1,12 +1,22 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020 Florian Schmaus, Florian Fischer #include "BinaryPrivateSemaphore.hpp" // for BinaryPrivateSemaphore, BPS #include "CountingPrivateSemaphore.hpp" // for CountingPrivateSemaphore, CPS #include "Fiber.hpp" // for Fiber, Fiber::NOT_AFFINE #include "Runtime.hpp" // for Runtime #include "emper-common.h" // for workeraffinity_t +#include "emper-config.h" // IWYU pragma: keep #include "emper.h" // for fiber, cps, 
bps, runtime +#ifdef EMPER_IO +#include <sys/socket.h> // for socklen_t +#include <sys/types.h> // for off_t + +#include <cstddef> // for size_t + +#include "io.hpp" // for emper::io::emper_* +#endif + auto init_runtime(void) -> runtime* { auto* r = new Runtime(); return reinterpret_cast<runtime*>(r); @@ -99,3 +109,35 @@ void wait_cps(cps* sem) { auto* countingPrivateSemaphore = reinterpret_cast<CountingPrivateSemaphore*>(sem); countingPrivateSemaphore->wait(); } + +#ifdef EMPER_IO +auto emper_recv(int socket, void* buffer, size_t length, int flags) -> ssize_t { + return emper::io::recvAndWait(socket, buffer, length, flags); +} + +auto emper_send(int socket, const void* buffer, size_t length, int flags) -> ssize_t { + return emper::io::sendAndWait(socket, buffer, length, flags); +} + +auto emper_connect(int socket, const struct sockaddr* address, socklen_t address_len) -> int { + return emper::io::connectAndWait(socket, address, address_len); +} + +auto emper_accept(int socket, struct sockaddr* address, socklen_t* address_len) -> int { + return emper::io::acceptAndWait(socket, address, address_len); +} + +auto emper_read_file(int fildes, void* buf, size_t nbyte, off_t offset) -> ssize_t { + return emper::io::readFileAndWait(fildes, buf, nbyte, offset); +} + +auto emper_write_file(int fildes, const void* buf, size_t nbyte, off_t offset) -> ssize_t { + return emper::io::writeFileAndWait(fildes, buf, nbyte, offset); +} + +auto emper_writev(int fildes, const struct iovec* iov, int iovcnt) -> ssize_t { + return emper::io::writevAndWait(fildes, iov, iovcnt); +} + +auto emper_close(int fd) -> int { return emper::io::closeAndWait(fd); } +#endif diff --git a/emper/include/emper.h b/emper/include/emper.h index e93cbf08..4a1402e7 100644 --- a/emper/include/emper.h +++ b/emper/include/emper.h @@ -1,8 +1,14 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020 Florian Schmaus, Florian Fischer #pragma once #include "emper-common.h" +#include "emper-config.h" +#include "stddef.h" // NOLINT(modernize-deprecated-headers) + +#ifdef EMPER_IO +#include "sys/socket.h" +#endif typedef struct runtime runtime; // NOLINT(modernize-use-using) @@ -60,6 +66,32 @@ void signal_and_exit_cps(cps* sem); void wait_cps(cps* sem); +#ifdef EMPER_IO +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +ssize_t emper_recv(int socket, void* buffer, size_t length, int flags); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +ssize_t emper_send(int socket, const void* buffer, size_t length, int flags); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +int emper_connect(int socket, const struct sockaddr* address, socklen_t address_len); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +int emper_accept(int socket, struct sockaddr* address, socklen_t* address_len); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +ssize_t emper_read_file(int fildes, void* buf, size_t nbyte, off_t offset); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +ssize_t emper_write_file(int fildes, const void* buf, size_t nbyte, off_t offset); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +ssize_t emper_writev(int fildes, const struct iovec* iov, int iovcnt); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +int emper_close(int fd); +#endif + #ifdef __cplusplus } #endif diff --git a/emper/io.hpp b/emper/io.hpp new file mode 100644 index 00000000..76da837b --- /dev/null +++ b/emper/io.hpp @@ -0,0 +1,395 @@ +// SPDX-License-Identifier: 
LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#pragma once + +#include <sys/socket.h> // for socklen_t +#include <sys/types.h> // for ssize_t, off_t + +#include <cstddef> // for size_t +#include <functional> // for function +#include <memory> // for unique_ptr +#include <string> // for string + +#include "Future.hpp" // for Futures + +/* + * Header defining the public POSIX-like IO interface of emper. + * All functions defined in this header must be used inside a fiber. + * The emper IoContext to handle the IO request will be the one setup by the + * Runtime and returned by IoContext::getWorkerIo(). + */ + +class Fiber; + +namespace emper::io { +/** + * @brief non-blocking recv mimicking POSIX recv(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param buffer destination buffer + * @param length length of the message in bytes + * @param flags type of message reception + * + * @return Future object which signals the completion of the recv request + */ +inline auto recv(int socket, void *buffer, size_t length, int flags) -> std::unique_ptr<Future> { + auto future = std::make_unique<RecvFuture>(socket, buffer, length, flags); + future->submit(); + return future; +} + +/** + * @brief Blocking recv mimicking POSIX recv(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param buffer destination buffer + * @param length length of the message in bytes + * @param flags type of message reception + + * @return -1 on error, 0 when receiving from a closed socket, otherwise the received bytes + */ +inline auto recvAndWait(int socket, void *buffer, size_t length, int flags) -> ssize_t { + RecvFuture future(socket, buffer, length, flags); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking send mimicking POSIX send(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param buffer source buffer + * @param length length of the message in bytes + * @param flags type of message transmission + * @param send_all If true sginal the future only if all bytes are sent or + * sending further bytes is not possible + * + * @return Future object which signals the completion of the send request + */ +inline auto send(int socket, const void *buffer, size_t length, int flags, bool send_all = true) + -> std::unique_ptr<Future> { + auto future = std::make_unique<SendFuture>(socket, buffer, length, flags, send_all); + future->submit(); + return future; +} + +/** + * @brief Blocking send mimicking POSIX send(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param buffer source buffer + * @param length length of the message in bytes + * @param flags type of message transmission + * @param send_all If true return to the user only if all bytes are sent or + * sending further bytes is not possible + * + * @return -1 on error, otherwise the number of sent bytes + */ +inline auto sendAndWait(int socket, const void *buffer, size_t length, int flags, + bool send_all = true) -> ssize_t { + void *mut_buf = const_cast<void *>(buffer); + SendFuture future(socket, mut_buf, length, flags, 
send_all); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking connect mimicking POSIX connect(3) + * + * It is discouraged to call connect on a socket with the SOCK_NONBLOCK flag set + * because the request submitted to io_uring will immediately be completed + * with EINPROGESS or EALREADY and must be repeatedly reinserted into the submission + * queue and io_uring afterwards. + * To efficiently connect a socket don't set its SOCK_NONBLOCK flag. + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param address buffer containing the address to connect to + * @param address_len length of the sockaddr structure + * + * @return Future object which signals the completion of the connect request + */ +inline auto connect(int socket, const struct sockaddr *address, socklen_t address_len) + -> std::unique_ptr<Future> { + auto future = std::make_unique<ConnectFuture>(socket, address, address_len); + future->submit(); + return future; +} + +/** + * @brief Blocking connect mimicking POSIX connect(3) + * + * It is discouraged to call connect on a socket with the SOCK_NONBLOCK flag set + * because the request submitted to io_uring will immediately be completed + * with EINPROGESS or EALREADY and must be repeatedly reinserted into the submission + * queue and io_uring afterwards. + * To efficiently connect a socket don't set its SOCK_NONBLOCK flag. + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param address buffer containing the address to connect to + * @param address_len length of the sockaddr structure + * + * @return -1 on error, 0 otherwise + */ +inline auto connectAndWait(int socket, const struct sockaddr *address, socklen_t address_len) + -> int { + ConnectFuture future(socket, address, address_len); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking accept mimicking POSIX accept(3) + * + * It is discouraged to call accept on a socket with the SOCK_NONBLOCK flag set + * because the request submitted to io_uring will immediately be completed + * with EAGAIN or EWOULDBLOCK and must be repeatedly reinserted into the submission + * queue and io_uring afterwards. + * To efficiently accept new connections don't set the SOCK_NONBLOCK flag. + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param[out] address buffer to store the clients address + * @param[in, out] address_len length of the supplied sockaddr structure + * + * @return Future object which signals the completion of the accept request + */ +inline auto accept(int socket, struct sockaddr *address, socklen_t *address_len) + -> std::unique_ptr<Future> { + auto future = std::make_unique<AcceptFuture>(socket, address, address_len); + future->submit(); + return future; +} + +/** + * @brief Blocking accept mimicking POSIX accept(3) + * + * It is discouraged to call accept on a socket with the SOCK_NONBLOCK flag set + * because the request submitted to io_uring will immediately be completed + * with EAGAIN or EWOULDBLOCK and must be repeatedly reinserted into the submission + * queue and io_uring afterwards. + * To efficiently accept new connections don't set the SOCK_NONBLOCK flag. 
+ * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param socket file descriptor for this operation + * @param[out] address buffer to store the clients address + * @param[in, out] address_len length of the supplied sockaddr structure + * + * @return -1 on error, otherwise the non-negative file descriptor of the accepted socket. + */ +inline auto acceptAndWait(int socket, struct sockaddr *address, socklen_t *address_len) -> int { + AcceptFuture future(socket, address, address_len); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking read for regular files mimicking POSIX read(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fildes file descriptor to the regular file to be read from + * @param buf destination buffer + * @param nbyte amount of bytes to read + * @param offset offset in the file + * @param read_all If true signal the future only if the buffer is completely + * filled or no further data can be read. + * + * @return Future object which signals the completion of the read request + */ +inline auto readFile(int fildes, void *buf, size_t nbyte, off_t offset = 0, bool read_all = false) + -> std::unique_ptr<Future> { + auto future = std::make_unique<ReadFuture>(fildes, buf, nbyte, offset, read_all); + future->submit(); + return future; +} + +/** + * @brief Blocking read for regular files mimicking POSIX read(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fildes file descriptor to the regular file to be written to + * @param buf destination buffer + * @param nbyte amount of bytes to read + * @param offset offset in the file + * @param read_all If true return only to the user if the buffer is completely + * filled or no further data can be read. + * + * @return -1 on error, otherwise the number of bytes read + */ +inline auto readFileAndWait(int fildes, void *buf, size_t nbyte, off_t offset = 0, + bool read_all = false) -> ssize_t { + ReadFuture future(fildes, buf, nbyte, offset, read_all); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking write for regular files mimicking POSIX write(3) + * + * Currently only reading from regular files is tested and supported. + * Reading from other file types could work but may result in undefined behavior. + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fildes file descriptor to the regular file to be written to + * @param buf source buffer + * @param nbyte amount of bytes to write + * @param offset offset in the file + * @param write_all If true signal this future only all nbyte were written + * or no further data can be written. + * + * @return Future object which signals the completion of the write request + */ +inline auto writeFile(int fildes, const void *buf, size_t nbyte, off_t offset = 0, + bool write_all = true) -> std::unique_ptr<Future> { + auto future = std::make_unique<WriteFuture>(fildes, buf, nbyte, offset, write_all); + future->submit(); + return future; +} + +/** + * @brief Blocking write for regular files mimicking POSIX write(3) + * + * Currently only writing to regular files is tested and supported. + * Writing to other file types could work but may result in undefined behavior. 
+ * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fildes file descriptor to the regular file to be written to + * @param buf source buffer + * @param nbyte amount of bytes to write + * @param offset offset in the file + * @param write_all If true return only to the user if all nbyte were written + * or no further data can be written. + * + * @return -1 on error, otherwise the number of bytes written + */ +inline auto writeFileAndWait(int fildes, const void *buf, size_t nbyte, off_t offset = 0, + bool write_all = true) -> ssize_t { + WriteFuture future(fildes, buf, nbyte, offset, write_all); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking writev mimicking POSIX writev(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fildes file descriptor to write to + * @param iov pointer to the iovec struct holding buffers to write + * @param iovcnt number of buffers in the iovec array + * + * @return Future object which signals the completion of the write request + */ +inline auto writev(int fildes, const struct iovec *iov, int iovcnt) -> std::unique_ptr<Future> { + auto future = std::make_unique<WritevFuture>(fildes, iov, iovcnt); + future->submit(); + return future; +} + +/** + * @brief Blocking writev mimicking POSIX writev(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fildes file descriptor to write to + * @param iov pointer to the iovec struct holding buffers to write + * @param iovcnt number of buffers in the iovec array + * + * @return -1 on error, otherwise the number of bytes written + */ +inline auto writevAndWait(int fildes, const struct iovec *iov, int iovcnt) -> ssize_t { + WritevFuture future(fildes, iov, iovcnt); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief Non-blocking close mimicking POSIX close(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fd file descriptor to close + * + * @return Future object which signals the completion of the close request + */ +inline auto close(int fd) -> std::unique_ptr<Future> { + auto future = std::make_unique<CloseFuture>(fd); + future->submit(); + return future; +} + +/** + * @brief Blocking close mimicking POSIX close(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fd file descriptor to close + * + * @return 0 on success, -1 on error + */ +inline auto closeAndWait(int fd) -> ssize_t { + CloseFuture future(fd); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief submit a close request mimicking POSIX close(3) and forget about it + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param fd file descriptor to close + */ +inline void closeAndForget(int fd) { + CloseFuture future(fd); + future.submitAndForget(); +} + +/** + * @brief Fiber accepting TCP connections and scheduling handler fibers + * + * @param host The host address the socket should be listening on + * @param port The port the socket should be listening on + * @param handler A function or lambda the started handler Fibers should execute. + * It is called with the file descriptor returned by accept. 
+ * @param backlog The TCP connection backlog. + * + * @return nullptr on error, otherwise the TCP listener Fiber + */ +auto tcp_listener(std::string &host, int &port, const std::function<void(int)> &handler, + int backlog = 1024) -> Fiber *; +} // namespace emper::io diff --git a/emper/io/Future.cpp b/emper/io/Future.cpp new file mode 100644 index 00000000..8d59a3d6 --- /dev/null +++ b/emper/io/Future.cpp @@ -0,0 +1,118 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include "Future.hpp" + +#include <sys/types.h> // for ssize_t + +#include <cassert> // for assert +#include <cerrno> // for errno, EAGAIN, EWOULDBLOCK +#include <ostream> // for operator<<, ostream, basic_ost... + +#include "BinaryPrivateSemaphore.hpp" // for BPS +#include "CallerEnvironment.hpp" // for CallerEnvironment +#include "Debug.hpp" // for LOGD, LOGW +#include "Runtime.hpp" // for Runtime +#include "io/IoContext.hpp" // for IoContext +#include "io/Stats.hpp" // for Stats, Operation + +namespace emper::io { + +template <CallerEnvironment callerEnvironment> +auto PartialCompletableFuture::tryComplete(int32_t res) + -> PartialCompletableFuture::CompletionType { + // We expect partial completions and made progress + if (!isDependency() && partialCompletion > DISABLE_PARTIAL_COMPLETION && res > 0) { + partialCompletion += res; + // write/send: we have not written the whole buffer yet + // read: we have not filled the buffer yet + if ((size_t)partialCompletion < len) { + IoContext* io = IoContext::getIo<callerEnvironment>(); + io->submit<callerEnvironment>(*this); + return CompletionType::Resubmission; + } + + return CompletionType::IncrementalCompletion; + } + + return CompletionType::Completion; +} + +// show the compiler our template incarnations +template PartialCompletableFuture::CompletionType +PartialCompletableFuture::tryComplete<CallerEnvironment::EMPER>(int32_t res); +template PartialCompletableFuture::CompletionType +PartialCompletableFuture::tryComplete<CallerEnvironment::ANYWHERE>(int32_t res); + +auto Future::wait() -> int32_t { + LOGD("Waiting on " << this); + + sem.wait(); + + if constexpr (emper::DEBUG) { + state.retrieved = true; + } + + return returnValue; +} + +auto Future::waitAndSetErrno() -> ssize_t { + int32_t res = wait(); + // move error codes from io_uring into errno + if (res < 0) { + errno = -res; + return -1; + } + + return res; +} + +void Future::submit() { + assert(Runtime::inRuntime()); + assert(!isSubmitted()); + IoContext* io = IoContext::getWorkerIo(); + + LOGD("submit " << this << " to IoContext " << io); + + io->submit(*this); +} + +auto Future::cancel() -> int32_t { + if (dependency) { + dependency->cancel(); + } + + state.cancelled = true; + + if (!isSubmitted() || isForgotten()) { + LOGW("Cancelling unsubmitted or forgotten " << this); + return -ENOENT; + } + + if (isCompleted()) { + return returnValue; + } + + CancelWrapper cancellation(*this); + cancellation.submitAndWait(); + return wait(); +} + +/** + * Internal hiding functions for Stats::recordCompletion double dispatch + * + * The polymorphism functions are Future::recordCompletion and are defined in + * Future.hpp so they are included in each compilation unit and the + * compiler can optimize the recordCompletion calls if stats are off + */ +void Future::recordCompletionInternal(Stats& stats, int32_t res) const { + stats.recordCompletion(*this, res); +} + +void PartialCompletableFuture::recordCompletionInternal(Stats& stats, int32_t res) const { + stats.recordCompletion(*this, 
res); +} + +void WritevFuture::recordCompletionInternal(Stats& stats, int32_t res) const { + stats.recordCompletion(*this, res); +} +} // namespace emper::io diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp new file mode 100644 index 00000000..76baee9c --- /dev/null +++ b/emper/io/Future.hpp @@ -0,0 +1,535 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#pragma once + +#include <liburing.h> // for io_uring_prep_read, io_uring_p... +#include <sys/socket.h> // for socklen_t +#include <sys/types.h> // for ssize_t + +#include <cstddef> // for size_t +#include <cstdint> // for int32_t, uint8_t +#include <cstdlib> // for abort +#include <ostream> // for operator<<, ostream, basic_ost... + +#include "BinaryPrivateSemaphore.hpp" // for BPS +#include "CallerEnvironment.hpp" // for CallerEnvironment, ANYWHERE +#include "Debug.hpp" // for LOGD, LogSubsystem, LogSubsyst... +#include "Emper.hpp" // for DEBUG +#include "io/Operation.hpp" // for Operation, operator<<, Operati... + +struct __kernel_timespec; +struct io_uring_sqe; + +namespace emper::io { +class Stats; + +/* + * @brief Future representing an IO request which can be awaited + */ +class Future : public Logger<LogSubsystem::IO> { + friend class IoContext; + friend class Stats; + + protected: + using State = struct State { + uint8_t submitted : 1 = 0; /*!< An sqe for this Future was prepared */ + uint8_t completed : 1 = 0; /*!< The semaphor was signaled */ + uint8_t retrieved : 1 = 0; /*!< Future::wait() has retrieved the result */ + uint8_t dependency : 1 = 0; /*!< This Future is a dependency of another */ + uint8_t cancelled : 1 = 0; /*!< Future::cancel() was called */ + uint8_t forgotten : 1 = + 0; /*!< This Future does not care about the result of the IO operation */ + }; + + BPS sem; + + State state; + + /* IO operation to perform */ + const Operation op; + + /* Arguments for the IO operation */ + const int fd; + void* const buf; + const size_t len; + const int offsetOrFlags; + + /** + * Return value of the operation. + * __s32 is used by liburing for the res field in its cqe struct + * This value is returned by Future::wait() and can only differentiate between + * failure or full success. 
+ */ + int32_t returnValue = 0; + + // Dependency futures will be submitted to the io_uring before + // this and will be linked to this IO request using IOSQE_IO_LINK + Future* dependency = nullptr; + + virtual void prepareSqe(io_uring_sqe* sqe) = 0; + + void setCompletion(int32_t res) { + LOGD("Complete " << this); + returnValue = res; + state.completed = true; + } + + virtual void complete(int32_t res) { + setCompletion(res); + sem.signal(); + } + + virtual void completeFromAnywhere(int32_t res) { + setCompletion(res); + sem.signalFromAnywhere(); + } + + /** + * Used for Stats::recordCompletion double dispatch + */ + virtual void recordCompletion(Stats& stats, int32_t res) { + if constexpr (emper::STATS) { + recordCompletionInternal(stats, res); + } + } + + void recordCompletionInternal(Stats& stats, int32_t res) const; + + Future(Operation op, int fd, void* buf, size_t len, int offsetOrFlags) + : op(op), fd(fd), buf(buf), len(len), offsetOrFlags(offsetOrFlags){}; + + public: + virtual ~Future() { + if (isForgotten()) { + return; + } + + if constexpr (emper::DEBUG) { + if (!isRetrieved() && !isDependency() && !isCancelled()) { + LOGE(this << " created but never awaited"); + abort(); + } + } + + // cancel this Future to guaranty memory safety + // By canceling we assure that the this pointer and the contained buffer + // passed to io_uring is not used any more by the kernel or the IoContext + cancel(); + } + + inline auto isRetrieved() -> bool { return state.retrieved; } + inline auto isSubmitted() -> bool { return state.submitted; } + inline auto isCompleted() -> bool { return state.completed; } + inline auto isCancelled() -> bool { return state.cancelled; } + inline auto isDependency() -> bool { return state.dependency; } + inline auto isForgotten() -> bool { return state.forgotten; } + + /* + * @brief reset the Future + * + * Reset the state and the BPS. + * Resetting a Future is useful if we wan't to use the same Future object + * multiple times. + * A Future for a specific reoccurring IO operation can be created and reused + * in a submit, reset loop. + */ + virtual inline void reset() { + LOGD("Resetting Future"); + state = State(); + sem.reset(); + } + + /* + * @brief set Future as dependency + * + * The dependency must complete before this future. + * This is assured by linking the futures' sqes in the io_uring using the IOSQE_IO_LINK flag. + * If a request in the link fails all dependent futures will return -ECANCELED. + * + * See: <a href="https://unixism.net/loti/tutorial/link_liburing.html">Liburing linking + * requests</a> + * The emper equivalent to the example from the liburing documentation. 
+ * + * @code + * int fd = open(FILE_NAME, O_RDWR|O_TRUNC|O_CREAT, 0644); + * if (fd < 0 ) { + * perror("open"); + * return 1; + * } + * + * char* msg = "Hallo emper!\n"; + * WriteFuture writeFuture(fd, &msg, strlen(msg), 0); + * + * char buf[32]; + * ReadFuture readFuture(fd, &buf, sizeof(buf), 0); + * readFuture.setDependency(writeFuture); + * + * CloseFuture closeFuture(fd); + * closeFuture.setDependency(readFuture); + * + * // Blocks till all three futures are completed + * int bytes_read = closeFuture.submitAndWait(); + * + * // So request in the chain could not be fully completed + * if (bytes_read == -ECANCELED) { + * } + * @endcode + * + */ + inline void setDependency(Future& dependency) { + dependency.state.dependency = true; + this->dependency = &dependency; + } + + /* + * @brief submit Future for asynchronous completion to the workers IoContext + */ + void submit(); + + /** + * @brief cancel Future and wait if necessary for its cancellation + * + * @return return the result received from the io_uring + */ + auto cancel() -> int32_t; + + /** + * @brief Block till the IO request is completed + * + * @return return the result received from the io_uring + */ + auto wait() -> int32_t; + + /** + * @brief Block till the IO request is completed and set errno on error + * + * @return -1 on error and errno will be set, otherwise the return value of the IO request + */ + auto waitAndSetErrno() -> ssize_t; + + /** + * @brief Equivalent to calling wait() after calling submit() + */ + inline auto submitAndWait() -> int32_t { + submit(); + return wait(); + } + + friend auto operator<<(std::ostream& os, const Future& f) -> std::ostream& { + return os << f.op << " Future " << (void*)&f; + } + + friend auto operator<<(std::ostream& os, const Future* f) -> std::ostream& { + if (f) { + return os << *f; + } else { + return os << "Future nullptr"; + } + } +}; + +class PartialCompletableFuture : public Future { + friend class Stats; + friend class IoContext; + friend class RecvFuture; + + enum class CompletionType { + Resubmission, + IncrementalCompletion, + Completion, + }; + + /** + * @brief Try to complete this IO request using the result of an IO operation. + * If the IO request is completed return true otherwise resubmit it. + * + * @param res Result from an IO operation from a io_uring_cqe or a common syscall + * + * @return the type of completion + */ + template <CallerEnvironment callerEnvironment> + auto tryComplete(int32_t res) -> CompletionType; + + protected: + /** + * This member is used internally to handle partial completion. + * If initialized with -1 it signals that the IO request should immediately return to the user. + * A value >= 0 signals that this IO request should not * return to the user until it is + * fully completed or a persistent error arises. + * The IO request is considered fully completed if partialCompletion == len. + * + * When the IO request returns to the user the combination of @c future.returnValue and + * @c future.partialCompletion can signal partial completion. + */ + ssize_t partialCompletion; + + // return immediately to the user + static const ssize_t DISABLE_PARTIAL_COMPLETION = -1; + // try to fully complete the request + static const ssize_t ENABLE_PARTIAL_COMPLETION = 0; + + PartialCompletableFuture(Operation op, int fd, void* buf, size_t len, int offsetOrFlags, + bool completeFully) + : Future(op, fd, buf, len, offsetOrFlags), + partialCompletion(completeFully ? 
ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){}; + + PartialCompletableFuture(Operation op, int fd, void* buf, size_t len, int offsetOrFlags, + ssize_t partialCompletion) + : Future(op, fd, buf, len, offsetOrFlags), partialCompletion(partialCompletion){}; + + /** + * Used for Stats::recordCompletion double dispatch + */ + virtual void recordCompletion(Stats& stats, int32_t res) override { + if constexpr (emper::STATS) { + recordCompletionInternal(stats, res); + } + } + + void recordCompletionInternal(Stats& stats, int32_t res) const; + + void complete(int32_t res) override { + CompletionType completion = tryComplete<CallerEnvironment::EMPER>(res); + + switch (completion) { + case CompletionType::Resubmission: + return; + case CompletionType::IncrementalCompletion: + Future::complete(partialCompletion); + return; + case CompletionType::Completion: + Future::complete(res); + return; + } + } + + void completeFromAnywhere(int32_t res) override { + CompletionType completion = tryComplete<CallerEnvironment::ANYWHERE>(res); + + switch (completion) { + case CompletionType::Resubmission: + return; + case CompletionType::IncrementalCompletion: + Future::completeFromAnywhere(partialCompletion); + return; + case CompletionType::Completion: + Future::completeFromAnywhere(res); + return; + } + } + + public: + /** + * @brief Block till the IO request is completed + * + * @param[out] partialCompletion The amount of work done till an persistent error arose + * + * @return return the result received from the io_uring + */ + auto wait(ssize_t& partialCompletion) -> int32_t { + int32_t res = Future::wait(); + partialCompletion = this->partialCompletion; + return res; + } + using Future::wait; + + /* + * @brief Equivalent to calling wait(partialCompletion) after calling submit() + */ + inline auto submitAndWait(ssize_t& partialCompletion) -> int32_t { + Future::submit(); + return wait(partialCompletion); + } + using Future::submitAndWait; + + /** + * @brief reset the Future and its partialCompletion. + */ + void reset() override { + if (partialCompletion != DISABLE_PARTIAL_COMPLETION) { + partialCompletion = ENABLE_PARTIAL_COMPLETION; + } + + Future::reset(); + } +}; + +class SendFuture : public PartialCompletableFuture { + void prepareSqe(io_uring_sqe* sqe) override { + if (partialCompletion == DISABLE_PARTIAL_COMPLETION) { + io_uring_prep_send(sqe, fd, buf, len, offsetOrFlags); + } else { + io_uring_prep_send(sqe, fd, (char*)buf + partialCompletion, len - partialCompletion, + offsetOrFlags); + } + } + + public: + SendFuture(int socket, const void* buffer, size_t length, int flags, ssize_t partialCompletion) + : PartialCompletableFuture(Operation::SEND, socket, const_cast<void*>(buffer), length, flags, + partialCompletion){}; + + SendFuture(int socket, const void* buffer, size_t length, int flags, bool send_all = true) + : SendFuture(socket, const_cast<void*>(buffer), length, flags, + send_all ? 
ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){}; +}; + +class RecvFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_recv(sqe, fd, buf, len, offsetOrFlags); + } + + public: + RecvFuture(int socket, void* buffer, size_t length, int flags) + : Future(Operation::RECV, socket, buffer, length, flags){}; +}; + +class ConnectFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_connect(sqe, fd, (const struct sockaddr*)buf, (socklen_t)len); + } + + public: + ConnectFuture(int socket, const struct sockaddr* address, socklen_t address_len) + : Future(Operation::CONNECT, socket, (void*)address, address_len, 0){}; +}; + +class AcceptFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_accept(sqe, fd, (struct sockaddr*)buf, (socklen_t*)len, 0); + } + + public: + AcceptFuture(int socket, const struct sockaddr* address, socklen_t* address_len) + : Future(Operation::ACCEPT, socket, (void*)address, (size_t)address_len, 0){}; +}; + +class ReadFuture : public PartialCompletableFuture { + void prepareSqe(io_uring_sqe* sqe) override { + if (partialCompletion == DISABLE_PARTIAL_COMPLETION) { + io_uring_prep_read(sqe, fd, buf, len, offsetOrFlags); + } else { + io_uring_prep_read(sqe, fd, (char*)buf + partialCompletion, len - partialCompletion, + offsetOrFlags + partialCompletion); + } + } + + public: + ReadFuture(int fildes, void* buf, size_t nbyte, int offset, bool read_all = false) + : PartialCompletableFuture( + Operation::READ, fildes, buf, nbyte, offset, + read_all ? ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){}; +}; + +class WriteFuture : public PartialCompletableFuture { + void prepareSqe(io_uring_sqe* sqe) override { + if (partialCompletion == DISABLE_PARTIAL_COMPLETION) { + io_uring_prep_write(sqe, fd, buf, len, offsetOrFlags); + } else { + // TODO: think about partial writes with incremental offset! + // Are transparent incremental writes on files without O_APPEND even reasonable? + io_uring_prep_write(sqe, fd, (char*)buf + partialCompletion, len - partialCompletion, + offsetOrFlags + partialCompletion); + } + } + + public: + WriteFuture(int fildes, const void* buf, size_t nbyte, int offset, bool write_all = true) + : PartialCompletableFuture( + Operation::WRITE, fildes, const_cast<void*>(buf), nbyte, offset, + write_all ? ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){}; +}; + +class WritevFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_writev(sqe, fd, (const struct iovec*)buf, len, 0); + } + + /** + * Used for Stats::recordCompletion double dispatch + */ + void recordCompletion(Stats& stats, int32_t res) override { + if constexpr (emper::STATS) { + recordCompletionInternal(stats, res); + } + } + + void recordCompletionInternal(Stats& stats, int32_t res) const; + + public: + WritevFuture(int fildes, const struct iovec* iov, int iovnct) + : Future(Operation::WRITEV, fildes, (void*)iov, iovnct, 0){}; +}; + +class CloseFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { io_uring_prep_close(sqe, fd); } + + public: + CloseFuture(int fildes) : Future(Operation::CLOSE, fildes, nullptr, 0, 0){}; + + void submitAndForget() { + state.forgotten = true; + submit(); + } +}; + +/* + * @brief Add a timeout to any Future not already submitted. + * + * A Timeout is added to a Future by inserting an dependent IOURING_OP_LINK_TIMEOUT + * sqe after the actual IO operation. 
+ * The dependent timeout will not be started like usual dependent requests after + * the previous request is completed, it is instead armed immediately when the + * actual request is started. + * See: https://lwn.net/Articles/803932/ + * + * @code + * ReadFuture rf(fd, buf, buf_len, 0); + * struct __kernel_timespec ts = {.tv_sec = 0, .tv_nsec = 0); + * TimeoutWrapper t(rf, ts); + * // Both futures should be awaited by calling its wait method to + * // prevent use after free problems. + * t.submitAndWait() + * rf.Wait() + * @endcode + */ +class TimeoutWrapper : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_link_timeout(sqe, (struct ::__kernel_timespec*)buf, 0); + } + + public: + TimeoutWrapper(Future& future, struct __kernel_timespec& ts) + : Future(Operation::LINK_TIMEOUT, 0, (void*)&ts, 0, 0) { + setDependency(future); + }; +}; + +/* + * @brief Arm a timeout which will signal the future when it is reached + * + * If the timeout was reached it will wait() will return -ETIME. + */ +class AlarmFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_timeout(sqe, (struct ::__kernel_timespec*)buf, 0, 0); + } + + public: + AlarmFuture(struct __kernel_timespec& ts) : Future(Operation::TIMEOUT, 0, (void*)&ts, 0, 0){}; +}; + +/* + * @brief Request the cancellation of another Future and await the cancellation + * + * If the target was found and cancelled wait() will immediately return 0. + * If the target was already completed wait() will immediately return -ENOENT. + * If the target was already started wait() will return -EALREADY. + */ +class CancelWrapper : public Future { + void prepareSqe(io_uring_sqe* sqe) override { io_uring_prep_cancel(sqe, buf, 0); } + + public: + CancelWrapper(Future& future) : Future(Operation::CANCEL, 0, &future, 0, 0){}; +}; +} // namespace emper::io diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp new file mode 100644 index 00000000..df9eb29b --- /dev/null +++ b/emper/io/IoContext.cpp @@ -0,0 +1,372 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include "io/IoContext.hpp" + +#include <liburing.h> // for io_uring_submit, io_uring_get_sqe +#include <liburing/io_uring.h> // for io_uring_cqe, io_uring_params, IORI... 
+#include <sys/eventfd.h> // for eventfd +#include <unistd.h> // for close + +#include <atomic> // for atomic, __atomic_base +#include <cassert> // for assert +#include <cerrno> // for errno, ECANCELED, EBUSY, EAGAIN, EINTR +#include <chrono> // for nanoseconds +#include <cstdio> // for perror +#include <cstdlib> // for exit +#include <cstring> // for memset +#include <memory> // for allocator + +#include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER, ANYWHERE +#include "Common.hpp" // for unlikely, DIE_MSG_ERRNO, DIE_MSG +#include "Debug.hpp" // for LOGD +#include "Emper.hpp" // for DEBUG, IO_URING_SQPOLL +#include "Runtime.hpp" // for Runtime +#include "io/Future.hpp" // for Future, operator<<, Future::State +#include "io/Stats.hpp" // for Stats, nanoseconds + +#ifndef EMPER_LOG_OFF +#include <ostream> // for basic_osteram::operator<<, operator<< +#endif + +// use the most significant bit of a pointer to differ between an IoContext and a Future +// in the global IoContext's CQ +static const uintptr_t IOCONTEXT_TAG = 1L << (sizeof(size_t) * 8 - 1); +static const uintptr_t IOCONTEXT_TAG_MASK = IOCONTEXT_TAG - 1; + +namespace emper::io { +thread_local IoContext *IoContext::workerIo = nullptr; + +pthread_t IoContext::globalCompleter; + +auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> unsigned { + unsigned total_chain_length = chain_length; + + if (unlikely(chain_length > *this->ring.sq.kring_entries)) { + DIE_MSG("Future chain is to long for the io_uring's SQ"); + } + + if (future.dependency) { + total_chain_length = prepareFutureChain(*future.dependency, chain_length + 1); + } + + if constexpr (emper::DEBUG) { + reqs_in_uring++; + uringFutureSet.insert(&future); + } + + struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); + // The kernel consumes all sqes from the SQ during io_uring_enter + // If we can't get a sqe the chain was to long for our SQ + assert(sqe); + + future.prepareSqe(sqe); + + // Someone wants to be notified about the completion of this Future + if (!(future.isForgotten())) { + io_uring_sqe_set_data(sqe, &future); + } + + future.state.submitted = true; + + if (future.isDependency()) { + LOGD("Prepare " << future << " as a dependency"); + sqe->flags |= IOSQE_IO_LINK; + } + + return total_chain_length; +} + +template <CallerEnvironment callerEnvironment> +void IoContext::submit(Future &future) { + LOGD("submitting " << future << (future.dependency ? 
" and it's dependencies" : "")); +#ifdef NDEBUG + UNUSED_ARG +#endif + int prepared = static_cast<int>(prepareFutureChain(future, 1)); + + // submit the Future to the io_uring + int submitted = io_uring_submit(&ring); + + // We can't submit our sqe because the CQ is full + // in worker thread -> reapCompletions + // in globalCompleter thread -> TODO: deterministically handle global full CQ + // for now hope a jam does not happen or will solve itself + if (unlikely(submitted == -EBUSY)) { + if constexpr (emper::DEBUG) { + std::stringstream sst; + sst << "io_submit returned EBUSY trying to submit in addition to " << reqs_in_uring + << std::endl; + logI(sst.str()); + } else { + logI("io_submit returned EBUSY"); + } + + if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { + // we are not busy looping in the globalIo + stats.record_io_submit_full_cq(std::chrono::nanoseconds(0)); + return; + } + + TIME_NS( + { + do { + reapCompletions(); + } while ((submitted = io_uring_submit(&ring)) == -EBUSY); + }, + stats.record_io_submit_full_cq); + } + + if (unlikely(submitted < 0)) { + errno = -submitted; + DIE_MSG_ERRNO("io_uring_submit failed"); + } + + // We submitted some Futures to the io_uring + // Because we submit every Future right away multiple prepared sqes can only + // occur for Future chains. + // If we could not submit the whole chain cancel all non submitted Futures + // because we can not guaranty the soundness of the chain + // req1 -> invalid_req -> req3 + // will submit only 2 instead of all 3 prepared sqes + // See: https://github.com/axboe/liburing/issues/186 + if (unlikely(submitted < prepared)) { + unsigned unsubmitted = io_uring_sq_ready(&ring); + Future *unsubmittedFuture = &future; + while (unsubmitted) { + unsubmittedFuture->recordCompletion(stats, -ECANCELED); + if constexpr (callerEnvironment == EMPER) { + unsubmittedFuture->complete(-ECANCELED); + } else { + unsubmittedFuture->completeFromAnywhere(-ECANCELED); + } + unsubmittedFuture = future.dependency; + unsubmitted--; + } + } + + // io_uring will try to synchronously complete any IO request before + // offloading it to the async backend. See io_uring_enter(2). + // Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe. + // Try to reap a possible synchronous completion if we are on a worker's io_uring. 
+ if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + reapCompletions<callerEnvironment>(); + } +} + +// show the compiler our template incarnations +template void IoContext::submit<CallerEnvironment::EMPER>(Future &future); +template void IoContext::submit<CallerEnvironment::ANYWHERE>(Future &future); + +template <CallerEnvironment callerEnvironment> +void IoContext::reapCompletions() { + // Someone else is currently reaping completions + if (unlikely(!cq_mutex.try_lock())) { + return; + } + + // never reap completions on the global IoContext + assert(this != getGlobalIo()); + + LOGD("Reaping completions"); + unsigned head; + struct io_uring_cqe *cqe; + unsigned count = 0; + + int err = io_uring_peek_cqe(&ring, &cqe); + if (err) { + if (err == -EAGAIN) { + goto unlock; + } + errno = -err; + DIE_MSG_ERRNO("io_uring_peek_cqe failed"); + } + + io_uring_for_each_cqe(&ring, head, cqe) { + count++; + + auto *future = reinterpret_cast<Future *>(io_uring_cqe_get_data(cqe)); + + // Got a CQE for a forgotten Future + if (!future) { + continue; + } + + if constexpr (emper::DEBUG) { + // assert that the future was previously in the uringFutureSet + assert(uringFutureSet.erase(future) > 0); + } + + future->recordCompletion(stats, cqe->res); + if constexpr (callerEnvironment == EMPER) { + future->complete(cqe->res); + } else { + future->completeFromAnywhere(cqe->res); + } + } + + LOGD("got " << count << " cqes from the io_uring"); + io_uring_cq_advance(&ring, count); + + if constexpr (emper::DEBUG) { + assert(count <= reqs_in_uring); + reqs_in_uring -= count; + } + + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + stats.record_worker_reaps(count); + } else { + stats.record_completer_reaps(count); + } + +unlock: + cq_mutex.unlock(); +} + +// Show the compiler our template incarnations +template void IoContext::reapCompletions<CallerEnvironment::ANYWHERE>(); +template void IoContext::reapCompletions<CallerEnvironment::EMPER>(); + +// The globalCompleter must be started after all worker io_uring eventfds are submitted +// so we don't have to synchronize the global sq +auto IoContext::globalCompleterFunc(void *arg) -> void * { + IoContext &io = *reinterpret_cast<IoContext *>(arg); + + io.logD("submit all worker io_uring eventfds"); + + // submit all eventfds in the SQ inserted by IoContext::submit_efd calls + int submitted = io_uring_submit(&io.ring); + if (unlikely(submitted < 0)) { + DIE_MSG_ERRNO("initial global io_uring submit failed"); + } + + // We have submitted all eventfds + assert(submitted == Runtime::getRuntime()->getWorkerCount()); + + io.logD("start global completer loop"); + while (true) { + struct io_uring_cqe *cqe; + + // wait for completions + int err = io_uring_wait_cqe(&io.ring, &cqe); + if (unlikely(err)) { + if (err == -EINTR) { + continue; + } + errno = -err; + perror("io_uring_wait_cqe"); + } + + auto data = (uintptr_t)io_uring_cqe_get_data(cqe); + + // The cqe is for a completed Future + if (!(data & IOCONTEXT_TAG)) { + auto *future = reinterpret_cast<Future *>(data); + + uint32_t res = cqe->res; + io_uring_cqe_seen(&io.ring, cqe); + + future->recordCompletion(io.stats, res); + future->completeFromAnywhere(res); + continue; + } + + // The cqe is for a IoContext.eventfd read + // -> there are completions on this worker IoContext + auto *worker_io = reinterpret_cast<IoContext *>(data & IOCONTEXT_TAG_MASK); + assert(worker_io); + + io_uring_cqe_seen(&io.ring, cqe); + + // re-add the eventfd read + struct io_uring_sqe *sqe = io_uring_get_sqe(&io.ring); + 
// we initialized the global ring big enough we should always get a free sqe + assert(sqe); + + io_uring_prep_read(sqe, worker_io->ring_eventfd, &worker_io->ring_eventfd_readbuf, + sizeof(worker_io->ring_eventfd_readbuf), 0); + io_uring_sqe_set_data(sqe, reinterpret_cast<void *>(data)); + + submitted = io_uring_submit(&io.ring); + + if (unlikely(submitted < 0)) { + errno = -submitted; + DIE_MSG_ERRNO("re-submitting eventfd read to global_ring failed"); + } + + assert(submitted == 1); + + worker_io->reapCompletions<CallerEnvironment::ANYWHERE>(); + } + + return nullptr; +} + +void IoContext::startGlobalCompleter(IoContext &globalIo) { + int err = pthread_create(&globalCompleter, nullptr, globalCompleterFunc, &globalIo); + if (unlikely(err)) { + errno = err; + DIE_MSG_ERRNO("Creating global completer thread failed"); + } +} + +IoContext::IoContext(size_t uring_entries) { + struct io_uring_params params; + memset(¶ms, 0, sizeof(params)); + + if constexpr (emper::IO_URING_SQPOLL) { + params.flags |= IORING_SETUP_SQPOLL; + } + + auto ret = io_uring_queue_init_params(uring_entries, &ring, ¶ms); + if (ret < 0) { + // skip tests if we are not able to setup an io_uring + perror("io_uring_queue_init failed"); + exit(77); + } + LOGD("Ring fd is " << ring.ring_fd); + + if constexpr (emper::IO_URING_SQPOLL) { + // included in liburing since 41e0d97cb23667df000ce76789297f4e06134a28 +#ifndef IORING_FEAT_SQPOLL_NONFIXED +#define IORING_FEAT_SQPOLL_NONFIXED (1U << 7) +#endif + if (!(params.features & IORING_FEAT_SQPOLL_NONFIXED)) { + DIE_MSG("io_uring_sqpoll defined but kernel does not support IORING_FEAT_SQPOLL_NONFIXED"); + } + } + + ring_eventfd = eventfd(0, 0); + if (unlikely(ring_eventfd < 0)) { + DIE_MSG_ERRNO("creating eventfd for io_uring failed"); + } + + if (unlikely(io_uring_register_eventfd(&ring, ring_eventfd) < 0)) { + DIE_MSG_ERRNO("io_uring_register_eventfd failed"); + } +} + +// This function must not be executed in parallel because it does not synchronize +// the global io_uring's SQ +void IoContext::submit_efd() { + // the global IoContext must be initialized to submit our eventfd + assert(getGlobalIo()); + + IoContext &globalIo = *getGlobalIo(); + + struct io_uring_sqe *sqe = io_uring_get_sqe(&globalIo.ring); + // we initialized the global ring big enough we should always get a free sqe + assert(sqe); + + io_uring_prep_read(sqe, ring_eventfd, &ring_eventfd_readbuf, sizeof(ring_eventfd_readbuf), 0); + auto *tagged_io_ptr = reinterpret_cast<void *>((uintptr_t)this | IOCONTEXT_TAG); + io_uring_sqe_set_data(sqe, tagged_io_ptr); + + // The sqe we prepared will be submitted to io_uring when the globalCompleter starts. 
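submit_efd() stores a tagged IoContext pointer as the sqe's user data so globalCompleterFunc() can tell re-armed eventfd reads apart from completed Futures. A minimal sketch of that round trip, assuming IOCONTEXT_TAG is a low bit left free by pointer alignment (e.g. 0x1) and IOCONTEXT_TAG_MASK clears exactly that bit; the real constants live elsewhere in IoContext.cpp:

	constexpr uintptr_t IOCONTEXT_TAG = 0x1;                  // assumed value
	constexpr uintptr_t IOCONTEXT_TAG_MASK = ~IOCONTEXT_TAG;  // assumed to clear only the tag bit

	void *tag(IoContext *io) {
		return reinterpret_cast<void *>(reinterpret_cast<uintptr_t>(io) | IOCONTEXT_TAG);
	}
	bool isEventfdCompletion(uintptr_t data) { return (data & IOCONTEXT_TAG) != 0; }
	IoContext *untag(uintptr_t data) {
		return reinterpret_cast<IoContext *>(data & IOCONTEXT_TAG_MASK);
	}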
+} + +IoContext::~IoContext() { + io_uring_queue_exit(&ring); + // TODO: check if this is safe + ::close(ring_eventfd); +} +} // namespace emper::io diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp new file mode 100644 index 00000000..943f6206 --- /dev/null +++ b/emper/io/IoContext.hpp @@ -0,0 +1,132 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#pragma once + +#include <liburing.h> // for io_uring +#include <pthread.h> // for pthread_t + +#include <atomic> // for atomic +#include <cassert> // for assert +#include <cstddef> // for size_t +#include <cstdint> // for uint64_t +#include <functional> // for less +#include <memory> // for allocator +#include <mutex> // for mutex + +#include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER +#include "Debug.hpp" // for LogSubsystem, LogSubsystem::IO, Logger +#include "Runtime.hpp" // for Runtime +#include "emper-config.h" // for EMPER_IO_WORKER_URING_ENTRIES +#include "io/Stats.hpp" // for Stats +#include "lib/adt/LockedSet.hpp" // for LockedSet + +namespace emper::io { +class Future; + +class IoContext : public Logger<LogSubsystem::IO> { + friend class ::Runtime; + friend class Future; + friend class SendFuture; + friend class RecvFuture; + + private: + static thread_local IoContext *workerIo; + + // pthread used to monitor the CQs from worker io_urings + // as well as to handle IO requests submitted from anywhere + static pthread_t globalCompleter; + + /* function executed by the global completer thread */ + static auto globalCompleterFunc(void *) -> void *; + + // start the global completer thread + // this must be called after all worker IoContexts' eventfds are submitted + // to the global IoContext + static void startGlobalCompleter(IoContext &globalIo); + + // Mutex protecting the completion queue of ring. + // It is used with try_lock() in reapCompletions. + std::mutex cq_mutex; + struct io_uring ring; + + // eventfd registered with ring. + // It is used to notify the globalCompleter about available completions in ring's CQ. + int ring_eventfd; + uint64_t ring_eventfd_readbuf; + + // submit the eventfd of the io_uring to the global IoContext + // All eventfds must be registered before the globalCompleter thread is started + void submit_efd(); + + Stats stats; + + // Members useful for debugging + std::atomic<long> reqs_in_uring = 0; + + // set containing all Futures currently in the io_uring for debugging + emper::lib::adt::LockedSet<Future *> uringFutureSet; + + /** + * @brief recursively prepare a Future and its dependencies + * + * Futures prepared as a dependency have partial completion support + * disabled and are submitted to io_uring with the IOSQE_IO_LINK flag + * set. + * Abort when preparing a chain longer than there are places in the io_uring.
+ * + * @return the number of prepared Futures + */ + auto prepareFutureChain(Future &future, unsigned chain_length) -> unsigned; + + inline void setWorkerIo() { workerIo = this; } + + /** + * @brief Return the globalIo of the current Runtime + */ + static inline auto getGlobalIo() -> IoContext * { return Runtime::getRuntime()->getGlobalIo(); } + + public: + IoContext(size_t uring_entries); + IoContext() : IoContext(EMPER_IO_WORKER_URING_ENTRIES){}; + ~IoContext(); + + /** + * @brief get IoContext of the current worker + * + * This function must be called from with in the emper runtime + * + * @return the thread_local IoContext of the current worker + */ + static inline auto getWorkerIo() -> IoContext * { + assert(Runtime::inRuntime()); + return workerIo; + } + + /** + * @brief Return either the worker or the globalIo dependent on the callerEnvironment + */ + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + static inline auto getIo() -> IoContext * { + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + return getWorkerIo(); + } else { + return getGlobalIo(); + } + } + + /** + * @brief Submit a future for asynchronous completion in this IoContext + * + * @param future The Future which should be completed. The Future object + * must stay valid until it is completed. + */ + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + void submit(Future &future); + + /** + * @brief Schedule all fibers waiting on completed IO + */ + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + void reapCompletions(); +}; +} // namespace emper::io diff --git a/emper/io/Operation.cpp b/emper/io/Operation.cpp new file mode 100644 index 00000000..83a67abe --- /dev/null +++ b/emper/io/Operation.cpp @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include "io/Operation.hpp" + +#include <cstdlib> // for abort +#include <ostream> // for operator<<, stringstream, basi... 
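Stepping back from the Operation helper for a moment: the IoContext interface above is what the Future layer drives. A fiber constructs a Future, submit() prepares and submits it on the worker's IoContext, and wait() blocks only that fiber until a reaped CQE completes it. A minimal usage sketch under those assumptions (fd is a placeholder descriptor and the code must run on a fiber inside the runtime):

	#include <cstdint>
	#include "io/Future.hpp"

	void readEightBytes(int fd) {
		uint64_t value;
		emper::io::ReadFuture read(fd, &value, sizeof(value), 0);
		read.submit();              // enqueued on the worker's io_uring
		int32_t res = read.wait();  // only this fiber blocks; negative results carry -errno
		if (res < 0) { /* handle -res */ }
	}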
+ +namespace emper::io { +auto operator<<(std::ostream& os, const Operation& op) -> std::ostream& { + switch (op) { + case Operation::SEND: + os << "send"; + break; + case Operation::RECV: + os << "recv"; + break; + case Operation::CONNECT: + os << "connect"; + break; + case Operation::ACCEPT: + os << "accept"; + break; + case Operation::READ: + os << "read"; + break; + case Operation::WRITE: + os << "write"; + break; + case Operation::WRITEV: + os << "writev"; + break; + case Operation::CLOSE: + os << "close"; + break; + case Operation::LINK_TIMEOUT: + os << "linked timeout"; + break; + case Operation::TIMEOUT: + os << "timeout"; + break; + case Operation::CANCEL: + os << "cancel"; + break; + default: + abort(); + } + return os; +} +} // namespace emper::io diff --git a/emper/io/Operation.hpp b/emper/io/Operation.hpp new file mode 100644 index 00000000..49891929 --- /dev/null +++ b/emper/io/Operation.hpp @@ -0,0 +1,26 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#pragma once + +#include <ostream> + +namespace emper::io { + +enum class Operation { + SEND = 0, + RECV, + CONNECT, + ACCEPT, + READ, + WRITE, + WRITEV, + CLOSE, + LINK_TIMEOUT, + TIMEOUT, + CANCEL, + NUMBER_OF_OPERATIONS +}; + +auto operator<<(std::ostream& os, const Operation& o) -> std::ostream&; + +} // namespace emper::io diff --git a/emper/io/Stats.cpp b/emper/io/Stats.cpp new file mode 100644 index 00000000..42ac4c4c --- /dev/null +++ b/emper/io/Stats.cpp @@ -0,0 +1,160 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Fischer +#include "io/Stats.hpp" + +#include <cassert> // for assert +#include <initializer_list> // for initializer_list +#include <iomanip> // for operator<<, setfill, setw +#include <string> // for operator<<, to_string +#include <utility> // for pair + +#include "io/Operation.hpp" // for Operation, operator<<, Operation::RECV + +namespace emper::io { + +std::vector<Stats*> Stats::workerStats; + +auto operator<<(std::ostream& os, const Stats::CompletionType& t) -> std::ostream& { + switch (t) { + case Stats::CompletionType::FullCompletion: + os << "full completion"; + break; + case Stats::CompletionType::ErrorCompletion: + os << "error completion"; + break; + case Stats::CompletionType::PartialCompletion: + os << "partial completion"; + break; + case Stats::CompletionType::PartialResubmission: + os << "partial resubmission"; + break; + case Stats::CompletionType::IncrementalCompletion: + os << "incremental completion"; + break; + case Stats::CompletionType::IncrementalError: + os << "incremental error"; + break; + default: + abort(); + } + return os; +} + +auto operator<<(std::ostream& os, const Stats& s) -> std::ostream& { + if (s.workerId == -1) { + bool futuresToPrint = false; + std::stringstream ss; + ss << "# global IO Stats: #" << std::endl; + // Only PartialCompletableFutures can submitted to the globalIo + for (const auto& op : {Operation::SEND, Operation::READ, Operation::WRITE}) { + const auto& op_map = s.io_uring_completions.at(op); + + ss << op << ": "; + ss << std::left << std::setfill(' ') << std::setw(8) << ss.str(); + + for (const auto& tp : op_map) { + auto completionType = tp.first; + auto count = tp.second; + + if (count > 0) { + futuresToPrint = true; + } + + ss << count << " " << completionType << " "; + } + ss << std::endl; + } + + if (futuresToPrint) { + os << ss.str(); + } + + return os; + } + + if (s.workerId == -2) { + os << "# AVG IO worker Stats: #" << std::endl; + } else { + os << "# IO Stats of 
worker " << std::to_string(s.workerId) << ": #" << std::endl; + } + + for (const auto& pp : s.io_uring_completions) { + const auto& op = pp.first; + const auto& op_map = pp.second; + + std::stringstream ss; + ss << op << ": "; + os << std::left << std::setfill(' ') << std::setw(16) << ss.str(); + + for (const auto& tp : op_map) { + auto completionType = tp.first; + auto count = tp.second; + os << count << " " << completionType << " "; + } + + os << std::endl; + } + + if (s.workerId == -1) { + return os; + } + + os << "io_uring_submit reap completion loops: " << s.io_submit_full_cq << " taking " + << s.io_submit_full_cq_running_mean << "ns on average" << std::endl; + + os << "worker reaped completions " << s.worker_reap << " times reaping " + << s.worker_reaped_completions << " completions in total" << std::endl; + + os << "global completer thread reaped completions " << s.completer_reap << " times reaping " + << s.completer_reaped_completions << " completions in total" << std::endl; + + return os; +} + +static void calcRunningAvg(uint64_t& avg, int n, uint64_t x) { + int64_t diff = ((int64_t)x - (int64_t)avg) / n; + assert(diff >= 0 || (uint64_t)-diff <= avg); + avg += diff; + assert(avg < 1000000); +} + +void Stats::printWorkerStats() { + auto* globalStats = workerStats.front(); + workerStats.erase(workerStats.begin()); + std::cout << *globalStats << std::endl; + + // Use a stats object to calculate the averages + Stats avgs; + // mark as averages + avgs.workerId = -2; + + // calculate avgs + int i = 1; + for (auto& stats : workerStats) { + for (const auto& op_pair : stats->io_uring_completions) { + auto op = op_pair.first; + auto op_map = op_pair.second; + + for (const auto& compl_pair : op_map) { + auto compl_typ = compl_pair.first; + auto compl_count = compl_pair.second; + calcRunningAvg(avgs.io_uring_completions[op][compl_typ], i, compl_count); + } + } + + calcRunningAvg(avgs.io_submit_full_cq, i, stats->io_submit_full_cq); + calcRunningAvg(avgs.io_submit_full_cq_running_mean, i, stats->io_submit_full_cq_running_mean); + + calcRunningAvg(avgs.completer_reap, i, stats->completer_reap); + calcRunningAvg(avgs.completer_reaped_completions, i, stats->completer_reaped_completions); + + calcRunningAvg(avgs.worker_reap, i, stats->worker_reap); + calcRunningAvg(avgs.worker_reaped_completions, i, stats->worker_reaped_completions); + ++i; + } + + // print avgs + std::cout << avgs << std::endl; +} + +} // namespace emper::io diff --git a/emper/io/Stats.hpp b/emper/io/Stats.hpp new file mode 100644 index 00000000..08824aa1 --- /dev/null +++ b/emper/io/Stats.hpp @@ -0,0 +1,205 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Fischer +#pragma once +#include <bits/types/struct_iovec.h> // for iovec +#include <sys/types.h> // for ssize_t + +#include <chrono> // for nanoseconds +#include <cstddef> // for size_t +#include <cstdint> // for uint64_t, int32_t, uint32_t +#include <cstdlib> // for abort +#include <iostream> // for operator<<, basic_ostream, basi... +#include <map> // for map, map<>::value_compare +#include <vector> // for vector + +#include "Debug.hpp" // for LOGW +#include "Emper.hpp" // for STATS +#include "emper-common.h" // for workerid_t +#include "io/Future.hpp" // for PartialCompletableFuture, Write... +#include "io/Operation.hpp" // for Operation, Operation::RECV, Ope... 
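calcRunningAvg() above and record_io_submit_full_cq() further below both maintain an incremental mean, avg += (x - avg) / n. A tiny worked example with assumed samples, just to show the arithmetic:

	// assumed latency samples in nanoseconds: 100, 200, 300
	uint64_t avg = 0;
	const int64_t samples[] = {100, 200, 300};
	for (int n = 1; n <= 3; ++n) {
		avg += (samples[n - 1] - static_cast<int64_t>(avg)) / n;  // avg becomes 100, 150, 200
	}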
+class Runtime; // lines 28-28 + +#define RETURN_IF_NO_STATS() \ + if constexpr (!emper::STATS) { \ + return; \ + } + +namespace emper::io { + +using emper::io::Operation; +using std::chrono::nanoseconds; + +class Stats { + friend class ::Runtime; + friend class IoContext; + friend class Future; + friend class PartialCompletableFuture; + friend class SendFuture; + friend class RecvFuture; + friend class WritevFuture; + + int workerId = 0; + static_assert(sizeof(int) > sizeof(workerid_t)); + + static std::vector<Stats*> workerStats; + + enum CompletionType { + FullCompletion = 0, + ErrorCompletion, + PartialCompletion, + PartialResubmission, + IncrementalCompletion, + IncrementalError, + END_OF_COMPLETION_TYPES + }; + + /** + * @brief Maps containing all completion events + * + * The first dimension are the different Operations defined by emper::io::Operation + * + * The second dimension is used to differentiate the available completion types per Operation + * 0: full completions + * 1: error completions + * 2: partial completions + * 3: incremental completions + * 4: incremental error + * 5: partial resubmissions + * 6: immediate resubmissions + */ + + typedef std::map<Operation, std::map<CompletionType, uint64_t>> CompletionMap; + + CompletionMap io_uring_completions = { + {Operation::SEND, + {{FullCompletion, 0}, + {ErrorCompletion, 0}, + {PartialCompletion, 0}, + {IncrementalCompletion, 0}, + {IncrementalError, 0}, + {PartialResubmission, 0}}}, + {Operation::RECV, {{FullCompletion, 0}, {ErrorCompletion, 0}, {PartialCompletion, 0}}}, + {Operation::CONNECT, {{FullCompletion, 0}, {ErrorCompletion, 0}}}, + {Operation::ACCEPT, {{FullCompletion, 0}, {ErrorCompletion, 0}}}, + {Operation::READ, + {{FullCompletion, 0}, + {ErrorCompletion, 0}, + {PartialCompletion, 0}, + {IncrementalCompletion, 0}, + {IncrementalError, 0}, + {PartialResubmission, 0}}}, + {Operation::WRITE, + {{FullCompletion, 0}, + {ErrorCompletion, 0}, + {PartialCompletion, 0}, + {IncrementalCompletion, 0}, + {IncrementalError, 0}, + {PartialResubmission, 0}}}, + {Operation::WRITEV, {{FullCompletion, 0}, {ErrorCompletion, 0}, {PartialCompletion, 0}}}, + {Operation::CLOSE, {{FullCompletion, 0}, {ErrorCompletion, 0}}}, + {Operation::LINK_TIMEOUT, {{FullCompletion, 0}, {ErrorCompletion, 0}}}, + {Operation::TIMEOUT, {{FullCompletion, 0}, {ErrorCompletion, 0}}}, + {Operation::CANCEL, {{FullCompletion, 0}, {ErrorCompletion, 0}}}, + }; + uint64_t io_submit_full_cq = 0; + uint64_t io_submit_full_cq_running_mean = 0; + + uint64_t completer_reap = 0; + uint64_t completer_reaped_completions = 0; + + uint64_t worker_reap = 0; + uint64_t worker_reaped_completions = 0; + + static void printWorkerStats(); + + Stats() { + if constexpr (emper::STATS) { + workerStats.push_back(this); + } + } + + void recordCompletion(const Future& f, const int32_t& res) { + record_completion(f.op, res, PartialCompletableFuture::DISABLE_PARTIAL_COMPLETION, f.len); + } + + void recordCompletion(const PartialCompletableFuture& f, const int32_t& res) { + record_completion(f.op, res, f.partialCompletion, f.len); + } + + void recordCompletion(const WritevFuture& f, const int32_t& res) { + size_t exp = 0; + auto iov = reinterpret_cast<const struct iovec*>(f.buf); + for (unsigned i = 0; i < f.len; ++i) { + exp += iov[i].iov_len; + } + + record_completion(Operation::WRITEV, res, PartialCompletableFuture::DISABLE_PARTIAL_COMPLETION, + exp); + } + + void record_completion(Operation op, int32_t res, ssize_t partial_completion, size_t exp) { + auto& operation_map = 
io_uring_completions[op]; + + // Persistent Error + if (res < 0) { + if (partial_completion > 0) { + operation_map[IncrementalError]++; + } else { + operation_map[ErrorCompletion]++; + } + return; + } + + // Full completion + if (!exp || (uint32_t)res == exp) { + operation_map[FullCompletion]++; + return; + } + + // we expect partial completion + if (partial_completion != PartialCompletableFuture::DISABLE_PARTIAL_COMPLETION) { + if ((size_t)(res + partial_completion) < exp) { + operation_map[PartialResubmission]++; + } else { + operation_map[IncrementalCompletion]++; + } + return; + } + + if ((uint32_t)res < exp) { + operation_map[PartialCompletion]++; + return; + } + + LOGW("Unexpected completion case in IoStats"); + LOGW(op << ", " << res << ", " << partial_completion << ", " << exp); + abort(); + } + + // running mean calculation taken from + // https://math.stackexchange.com/questions/106700/incremental-averageing + inline void record_io_submit_full_cq(nanoseconds ns) { + RETURN_IF_NO_STATS(); + io_submit_full_cq++; + int64_t diff = (ns.count() - io_submit_full_cq_running_mean) / io_submit_full_cq; + io_submit_full_cq_running_mean += diff; + } + + inline void record_completer_reaps(unsigned count) { + RETURN_IF_NO_STATS(); + + completer_reap++; + completer_reaped_completions += count; + } + + inline void record_worker_reaps(unsigned count) { + RETURN_IF_NO_STATS(); + + worker_reap++; + worker_reaped_completions += count; + } + + friend auto operator<<(std::ostream& os, const Stats& s) -> std::ostream&; + friend auto operator<<(std::ostream& os, const Stats::CompletionType& t) -> std::ostream&; +}; +} // namespace emper::io diff --git a/emper/io/io.cpp b/emper/io/io.cpp new file mode 100644 index 00000000..140f91c9 --- /dev/null +++ b/emper/io/io.cpp @@ -0,0 +1,71 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include "io.hpp" + +#include <arpa/inet.h> +#include <netinet/in.h> +#include <sys/socket.h> + +#include <cerrno> +#include <cstring> + +#include "Common.hpp" +#include "Debug.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" + +#ifndef EMPER_LOG_OFF +#include <ostream> +#endif + +namespace emper::io { +auto tcp_listener(std::string& host, int& port, const std::function<void(int)>& handler, + int backlog) -> Fiber* { + int listen_socket; + struct sockaddr_in servaddr; + + // socket creation and verification + listen_socket = socket(AF_INET, SOCK_STREAM, 0); + if (listen_socket == -1) { + LOGE("listen socket creation failed: " << strerror(errno)); + return nullptr; + } + memset(&servaddr, 0, sizeof(servaddr)); + + // assign IP, PORT + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = inet_addr(host.c_str()); + servaddr.sin_port = htons(port); + + int reuseaddr = 1; + if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) { + LOGE("setsockopt failed: " << strerror(errno)); + return nullptr; + } + + if (bind(listen_socket, (sockaddr*)&servaddr, sizeof(servaddr)) == -1) { + LOGE("bind failed: " << strerror(errno)); + return nullptr; + } + + if (listen(listen_socket, backlog) != 0) { + LOGE("listen failed: " << strerror(errno)); + return nullptr; + } + + Runtime* runtime = Runtime::getRuntime(); + + return Fiber::from([=] { + for (;;) { + int client_fd = acceptAndWait(listen_socket, nullptr, nullptr); + if (unlikely(client_fd < 0)) { + LOGE("accept failed: " << strerror(errno)); + return; + } + + Fiber* handler_fiber = Fiber::from([=] { handler(client_fd); }); + 
runtime->schedule(*handler_fiber); + } + }); +} +} // namespace emper::io diff --git a/emper/io/meson.build b/emper/io/meson.build new file mode 100644 index 00000000..75ee73eb --- /dev/null +++ b/emper/io/meson.build @@ -0,0 +1,9 @@ +emper_io_include = include_directories('.') +emper_library_include += [emper_io_include] +emper_cpp_sources += files( + 'io.cpp', + 'Future.cpp', + 'Stats.cpp', + 'Operation.cpp', + 'IoContext.cpp', +) diff --git a/emper/meson.build b/emper/meson.build index 0ffa5616..c3c85bb0 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -37,6 +37,7 @@ subdir('include') emper_library_include = [] emper_library_include += include_directories('.') +subdir('io') subdir('lib') subdir('strategies') diff --git a/meson.build b/meson.build index e161441d..00d12552 100644 --- a/meson.build +++ b/meson.build @@ -12,7 +12,8 @@ project('EMPER', 'c', 'cpp', add_project_arguments('-Wno-non-virtual-dtor', language: 'cpp') thread_dep = dependency('threads') -emper_dependencies = [thread_dep] +uring_dep = dependency('liburing') +emper_dependencies = [thread_dep, uring_dep] run_target('iwyu', command: 'tools/check-iwyu') @@ -48,6 +49,31 @@ endif conf_data.set('EMPER_LOG_LEVEL', log_level) conf_data.set('EMPER_LOG_TIMESTAMP', get_option('log_timestamp')) +option_io = get_option('io') +if option_io + conf_data.set('EMPER_IO', true) +endif + +io_bool_options = [ + 'uring_sqpoll', +] + +io_raw_options = [ + 'worker_uring_entries', +] + +foreach option : io_bool_options + value = get_option('io_' + option) + if value == true and not option_io + error('io_' + option + ' defined without io') + endif + conf_data.set('EMPER_IO_' + option.to_upper(), value) +endforeach + +foreach option : io_raw_options + conf_data.set('EMPER_IO_' + option.to_upper(), get_option('io_' + option)) +endforeach + subdir('emper') subdir('tests') subdir('apps') diff --git a/meson_options.txt b/meson_options.txt index 5e8d939d..3fa7046e 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -74,3 +74,21 @@ option( value: true, description: 'Use an overflow queue in case scheduling queues become full', ) +option( + 'io', + type: 'boolean', + value: true, + description: 'Add support for asynchronous IO using io_uring' +) +option( + 'io_worker_uring_entries', + type: 'integer', + value: 16, + description: 'Number of entries in each worker io_uring' +) +option( + 'io_uring_sqpoll', + type: 'boolean', + value: false, + description: 'Enable io_urings SQPOLL feature (start a separate kernel thread which polls the sq reducing the amount of syscalls to submit new requests. This is a privileged operation.).' 
+) diff --git a/tests/AlarmFutureTest.cpp b/tests/AlarmFutureTest.cpp new file mode 100644 index 00000000..d6fe915b --- /dev/null +++ b/tests/AlarmFutureTest.cpp @@ -0,0 +1,31 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <linux/time_types.h> // for __kernel_timespec + +#include <cassert> // for assert +#include <cerrno> // for ETIME +#include <chrono> // for microseconds, duration_cast, operator- +#include <compare> // for operator>=, strong_ordering +#include <cstdint> // for int32_t +#include <cstdlib> // for exit, EXIT_SUCCESS + +#include "io/Future.hpp" // for AlarmFuture + +using emper::io::AlarmFuture; + +void emperTest() { + struct __kernel_timespec ts = {.tv_sec = 1, .tv_nsec = 0}; + AlarmFuture alarm(ts); + + auto start = std::chrono::high_resolution_clock::now(); + int32_t res = alarm.submitAndWait(); + auto end = std::chrono::high_resolution_clock::now(); + + // timeouts return -ETIME if they trigger + assert(res == -ETIME); + + assert(std::chrono::duration_cast<std::chrono::microseconds>(end - start) >= + std::chrono::seconds(1)); + + exit(EXIT_SUCCESS); +} diff --git a/tests/CancelFutureTest.cpp b/tests/CancelFutureTest.cpp new file mode 100644 index 00000000..21eaa79f --- /dev/null +++ b/tests/CancelFutureTest.cpp @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <sys/eventfd.h> // for eventfd, EFD_SEMAPHORE + +#include <cassert> // for assert +#include <cerrno> // for ECANCELED, ETIME +#include <cstdint> // for uint64_t, int32_t +#include <cstdlib> // for exit, EXIT_SUCCESS + +#include "Common.hpp" // for DIE_MSG_ERRNO +#include "io/Future.hpp" // for ReadFuture, WriteFuture + +using emper::io::ReadFuture; +using emper::io::WriteFuture; + +void emperTest() { + int efd = eventfd(0, 0); + if (efd == -1) { + DIE_MSG_ERRNO("eventfd failed"); + } + + uint64_t read_buf; + uint64_t write_buf = 42; + // cancel not submitted Future + ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); + assert(readFuture.cancel() == -ENOENT); + + // cancel submitted non-completed Future + readFuture.submit(); + assert(readFuture.cancel() == -ECANCELED); + + readFuture.reset(); + + readFuture.submit(); + WriteFuture writeFuture(efd, &write_buf, sizeof(write_buf), 0); + assert(writeFuture.submitAndWait() == sizeof(write_buf)); + assert(readFuture.cancel() == sizeof(write_buf) && read_buf == write_buf); + + writeFuture.reset(); + read_buf = 0; + readFuture.reset(); + + // cancel a chain + ReadFuture readFuture2(efd, &read_buf, sizeof(read_buf), 0); + readFuture2.setDependency(readFuture); + readFuture2.submit(); + assert(readFuture2.cancel() == -ECANCELED); + assert(readFuture.wait() == -ECANCELED); + + readFuture2.reset(); + readFuture.reset(); + + // cancel partial completed chain + readFuture2.submit(); + assert(writeFuture.submitAndWait() == sizeof(write_buf)); + // TODO: investigate why this read is completed with -EINTR most of the time + int r = readFuture2.cancel(); + assert(r == -EINTR || r == -ECANCELED); + assert(readFuture.wait() == sizeof(write_buf) && read_buf == write_buf); + + exit(EXIT_SUCCESS); +} diff --git a/tests/ConcurrentNetworkEchoTest.cpp b/tests/ConcurrentNetworkEchoTest.cpp new file mode 100644 index 00000000..864065c0 --- /dev/null +++ b/tests/ConcurrentNetworkEchoTest.cpp @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <cstdlib> +#include <iostream> +#include <string> +#include 
<vector> + +#include "CountingPrivateSemaphore.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" +#include "emper.hpp" +#include "fixtures/network.hpp" +#include "io.hpp" + +#define PORT 4241 +#define BUF_SIZE 1024 + +auto main(int argc, char* argv[]) -> int { + unsigned int client_count = 10; + unsigned int echos = 1000; + int port = PORT; + std::string host = "127.0.0.1"; + + if (argc > 3) { + std::cerr << "Usage: " << argv[0] << " [clients] [echos]" << std::endl; + exit(EXIT_FAILURE); + } + + const int DECIMAL = 10; + if (argc > 1) { + client_count = strtol(argv[1], nullptr, DECIMAL); + } + + if (argc > 2) { + echos = strtol(argv[2], nullptr, DECIMAL); + } + + Runtime runtime; + + runtime.scheduleFromAnywhere(*emper::io::tcp_listener(host, port, echo_serve)); + + Fiber* client_producer = Fiber::from([&] { + CPS cps; + for (unsigned int i = 0; i < client_count; ++i) { + spawn( + [&] { + const std::vector<std::string> msgs = {"foo"}; + echo_client("127.0.0.1", PORT, msgs, echos); + }, + cps); + } + cps.wait(); + exit(EXIT_SUCCESS); + }); + + runtime.scheduleFromAnywhere(*client_producer); + + runtime.waitUntilFinished(); + + return EXIT_FAILURE; +} diff --git a/tests/IncrementalCompletionTest.cpp b/tests/IncrementalCompletionTest.cpp new file mode 100644 index 00000000..229ce178 --- /dev/null +++ b/tests/IncrementalCompletionTest.cpp @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Fischer +#include <unistd.h> // for pipe + +#include <cassert> // for assert +#include <cstdint> // for uint64_t, int32_t +#include <cstdlib> // for exit, EXIT_SUCCESS +#include <cstring> // for memcmp + +#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG +#include "io/Future.hpp" // for ReadFuture, CloseFuture, WriteFuture + +using emper::io::CloseFuture; +using emper::io::ReadFuture; +using emper::io::WriteFuture; + +static const size_t MEMSIZE = 10 << 20; + +void emperTest() { + auto* memOut = new char[MEMSIZE]; + auto* memIn = new char[MEMSIZE]; + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + int pipefds[2]; + + if (pipe(pipefds) == -1) { + DIE_MSG_ERRNO("pipe failed"); + } + + WriteFuture writeFuture(pipefds[1], memOut, MEMSIZE, 0, true); + writeFuture.submit(); + + ReadFuture readFuture(pipefds[0], memIn, MEMSIZE, 0, true); + readFuture.submit(); + + int32_t res = writeFuture.wait(); + assert(res == MEMSIZE); + + res = readFuture.wait(); + assert(res == MEMSIZE); + + assert(memcmp(memOut, memIn, MEMSIZE) == 0); + + CloseFuture closeWriteSide(pipefds[1]); + assert(closeWriteSide.submitAndWait() == 0); + + CloseFuture closeReadSide(pipefds[0]); + assert(closeReadSide.submitAndWait() == 0); + + delete[] memOut; + delete[] memIn; + + exit(EXIT_SUCCESS); +} diff --git a/tests/LinkFutureTest.cpp b/tests/LinkFutureTest.cpp new file mode 100644 index 00000000..eb7289ba --- /dev/null +++ b/tests/LinkFutureTest.cpp @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <sys/eventfd.h> // for eventfd +#include <sys/types.h> // for ssize_t + +#include <cassert> // for assert +#include <cerrno> // for EBADF, ECANCELED +#include <cstdint> // for uint64_t, int32_t +#include <cstdlib> // for exit, EXIT_SUCCESS + +#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG +#include "io/Future.hpp" // for ReadFuture, CloseFuture, WriteFuture + +using emper::io::CloseFuture; +using emper::io::ReadFuture; +using emper::io::WriteFuture; + +static void successChain() { + int efd = eventfd(0, 0); + if (efd == -1) { + 
DIE_MSG_ERRNO("eventfd failed"); + } + + uint64_t write_buf = 42; + WriteFuture writeFuture(efd, &write_buf, sizeof(write_buf), 0); + + uint64_t read_buf; + ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); + readFuture.setDependency(writeFuture); + + CloseFuture closeFuture(efd); + closeFuture.setDependency(readFuture); + ssize_t res = closeFuture.submitAndWait(); + + if (res == -1) { + DIE_MSG_ERRNO("linked requests chain failed"); + } + + if (read_buf != 42) { + DIE_MSG("dependent read value differs from written value"); + } +} + +static void failureChain() { + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char buf[32]; + ReadFuture invalidReadFuture(42, &buf, sizeof(buf), 0); + + ReadFuture readFuture(0, &buf, sizeof(buf), 0); + readFuture.setDependency(invalidReadFuture); + + int32_t res = readFuture.submitAndWait(); + assert(res == -ECANCELED); + + res = invalidReadFuture.wait(); + assert(res == -EBADF); +} + +void emperTest() { + successChain(); + failureChain(); + + exit(EXIT_SUCCESS); +} diff --git a/tests/ReuseBpsTest.cpp b/tests/ReuseBpsTest.cpp new file mode 100644 index 00000000..82d8a06a --- /dev/null +++ b/tests/ReuseBpsTest.cpp @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#include <cstdlib> // for exit, EXIT_SUC... + +#include "BinaryPrivateSemaphore.hpp" // for BPS +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "emper.hpp" // for spawn + +void emperTest() { + CPS cps; + BPS bps1, bps2; + + const int ITERATIONS = 100; + + spawn( + [&bps1, &bps2] { + for (int i = 0; i < ITERATIONS; ++i) { + bps1.wait(); + bps1.reset(); + bps2.signal(); + } + }, + cps); + + spawn( + [&bps1, &bps2] { + for (int i = 0; i < ITERATIONS; ++i) { + bps1.signal(); + bps2.wait(); + bps2.reset(); + } + }, + cps); + + // Wait for the fibers to finish. 
+ cps.wait(); + + exit(EXIT_SUCCESS); +} diff --git a/tests/ReuseFutureTest.cpp b/tests/ReuseFutureTest.cpp new file mode 100644 index 00000000..7bc930c8 --- /dev/null +++ b/tests/ReuseFutureTest.cpp @@ -0,0 +1,66 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <sys/eventfd.h> // for eventfd, EFD_SEMAPHORE + +#include <cerrno> // for errno +#include <cstdint> // for uint64_t, int32_t +#include <cstdlib> // for exit, EXIT_SUCCESS + +#include "Common.hpp" // for DIE_MSG_ERRNO +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "emper.hpp" // for spawn +#include "io/Future.hpp" // for ReadFuture, WriteFuture + +using emper::io::ReadFuture; +using emper::io::WriteFuture; + +void emperTest() { + CPS cps; + int efd = eventfd(0, EFD_SEMAPHORE); + if (efd == -1) { + DIE_MSG_ERRNO("eventfd failed"); + } + + uint64_t read_buf; + ReadFuture read_future(efd, &read_buf, sizeof(read_buf), 0); + + uint64_t write_buf = 1; + WriteFuture write_future(efd, &write_buf, sizeof(write_buf), 0); + + const int ITERATIONS = 100; + + spawn( + [&read_future] { + for (int i = 0; i < ITERATIONS; ++i) { + int32_t res = read_future.submitAndWait(); + if (res < 0) { + errno = -res; + DIE_MSG_ERRNO("read failed"); + } + + // reset the BPS used to signal the completion of this future + read_future.reset(); + } + }, + cps); + + spawn( + [&write_future] { + for (int i = 0; i < ITERATIONS; ++i) { + int32_t res = write_future.submitAndWait(); + if (res < 0) { + errno = -res; + DIE_MSG_ERRNO("write failed"); + } + + // reset the BPS used to signal the completion of this future + write_future.reset(); + } + }, + cps); + + // Wait for the fibers to finish. + cps.wait(); + + exit(EXIT_SUCCESS); +} diff --git a/tests/SimpleDiskAndNetworkTest.cpp b/tests/SimpleDiskAndNetworkTest.cpp new file mode 100644 index 00000000..f9caa677 --- /dev/null +++ b/tests/SimpleDiskAndNetworkTest.cpp @@ -0,0 +1,178 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <arpa/inet.h> // for inet_addr +#include <bits/types/struct_iovec.h> // for iovec +#include <fcntl.h> // for open, O_RDONLY +#include <netinet/in.h> // for sockaddr_in, htons, in_addr +#include <sys/socket.h> // for bind, listen, setsockopt +#include <unistd.h> // for close + +#include <cassert> // for assert +#include <cstdlib> // for mkstemp, exit, EXIT_SUCCESS +#include <cstring> // for memcmp, memset +#include <memory> // for allocator, unique_ptr +#include <string> // for string +#include <vector> // for vector + +#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "emper.hpp" // for spawn +#include "fixtures/network.hpp" // for echo_client +#include "io.hpp" // for readFile, accept, recv, send +#include "io/Future.hpp" // for Future + +#define PORT 4243 +#define MAX 1024 +static void server_func() { + int sockfd; + struct sockaddr_in servaddr; + + // socket create and varification + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd == -1) { + DIE_MSG_ERRNO("socket creation failed"); + } + memset(&servaddr, 0, sizeof(servaddr)); + + // assign IP, PORT + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); + servaddr.sin_port = htons(PORT); + + int reuseaddr = 1; + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) { + DIE_MSG_ERRNO("setsockopt failed"); + } + if (bind(sockfd, (sockaddr*)&servaddr, sizeof(servaddr)) == -1) { + 
DIE_MSG_ERRNO("bind failed"); + } + if (listen(sockfd, 1) != 0) { + DIE_MSG_ERRNO("listen failed"); + } + + struct sockaddr_in clientaddr; + socklen_t clientaddr_len = sizeof(clientaddr); + auto client_fd = + emper::io::accept(sockfd, reinterpret_cast<struct sockaddr*>(&clientaddr), &clientaddr_len) + ->wait(); + + if (client_fd < 0) { + DIE_MSG_ERRNO("accept failed"); + } + + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char recv_buf[MAX]; + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char read_buf[MAX]; + + for (;;) { + int received = emper::io::recv(client_fd, recv_buf, sizeof(recv_buf), 0)->wait(); + if (received == 0) { + exit(EXIT_SUCCESS); + } + + if (received == -1) { + DIE_MSG_ERRNO("recv failed"); + break; + } + + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char file_name[] = "/tmp/emper-SimpleDiskAndNetworkTestFile-XXXXXX"; + int file_fd = mkstemp(file_name); + if (file_fd == -1) { + DIE_MSG_ERRNO("mkstemp failed"); + } + + int written = emper::io::writeFile(file_fd, recv_buf, received)->wait(); + if (written < 0) { + DIE_MSG_ERRNO("write failed"); + } + close(file_fd); + + file_fd = open(file_name, O_RDONLY); + if (file_fd == -1) { + DIE_MSG_ERRNO("open failed"); + } + + int bytes_read = emper::io::readFile(file_fd, read_buf, written)->wait(); + if (bytes_read == 0) { + DIE_MSG("nothing to read"); + } + + if (bytes_read < 0) { + DIE_MSG_ERRNO("read failed"); + } + close(file_fd); + + int sent = emper::io::send(client_fd, read_buf, bytes_read, 0)->wait(); + if (sent == 0) { + DIE_MSG("client socket unexpected shutdown"); + } + + if (sent == -1) { + DIE_MSG_ERRNO("send failed"); + } + + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char file2_name[] = "/tmp/emper-SimpleDiskAndNetworkTestFile-XXXXXX"; + file_fd = mkstemp(file2_name); + if (file_fd == -1) { + DIE_MSG_ERRNO("mkstemp failed"); + } + + const int iovcnt = 2; + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + struct iovec iov[iovcnt]; + + std::string s1 = "foo"; + std::string s2 = "bar"; + + iov[0].iov_base = (void*)s1.c_str(); + iov[0].iov_len = s1.length(); + iov[1].iov_base = (void*)s2.c_str(); + iov[1].iov_len = s2.length(); + + auto writevFuture = emper::io::writev(file_fd, &iov[0], iovcnt); + written = writevFuture->wait(); + if (written < 0) { + DIE_MSG_ERRNO("wrtev failed"); + } + close(file_fd); + + file_fd = open(file2_name, O_RDONLY); + if (file_fd == -1) { + DIE_MSG_ERRNO("open failed"); + } + + auto readFuture = emper::io::readFile(file_fd, read_buf, written, 0, true); + bytes_read = readFuture->wait(); + if (bytes_read == 0) { + DIE_MSG("nothing to read"); + } + + if (bytes_read < 0) { + DIE_MSG_ERRNO("read failed"); + } + + assert(written == bytes_read); + + assert(memcmp(read_buf, iov[0].iov_base, iov[0].iov_len) == 0); + assert(memcmp((char*)read_buf + iov[0].iov_len, iov[1].iov_base, iov[1].iov_len) == 0); + + close(file_fd); + } +} + +void emperTest() { + CPS cps; + spawn(server_func, cps); + + spawn( + [] { + const std::vector<std::string> msgs{"foo", "bar"}; + echo_client("127.0.0.1", PORT, msgs); + }, + cps); + + cps.wait(); +} diff --git a/tests/SimpleNetworkTest.cpp b/tests/SimpleNetworkTest.cpp new file mode 100644 index 00000000..35158625 --- /dev/null +++ b/tests/SimpleNetworkTest.cpp @@ -0,0 +1,27 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <string> // for string, allocator +#include <vector> // for vector + +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "emper.hpp" // for async, spawn +#include "fixtures/network.hpp" 
// for echo_client, echo_serve +#include "io.hpp" // for tcp_listener + +#define MAX 1024 +int PORT = 4242; +std::string ADDR = "127.0.0.1"; + +void emperTest() { + CPS cps; + async(emper::io::tcp_listener(ADDR, PORT, echo_serve)); + + spawn( + [] { + const std::vector<std::string> strings = {"foo", "bar", std::string(MAX, 'a'), "quit\n"}; + echo_client(ADDR.c_str(), PORT, strings); + }, + cps); + + cps.wait(); +} diff --git a/tests/TimeoutWrapperTest.cpp b/tests/TimeoutWrapperTest.cpp new file mode 100644 index 00000000..8d1ee001 --- /dev/null +++ b/tests/TimeoutWrapperTest.cpp @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Fischer +#include <linux/time_types.h> // for __kernel_timespec +#include <sys/eventfd.h> // for eventfd, EFD_SEMAPHORE + +#include <cassert> // for assert +#include <cerrno> // for ECANCELED, ETIME +#include <cstdint> // for uint64_t, int32_t +#include <cstdlib> // for exit, EXIT_SUCCESS + +#include "Common.hpp" // for DIE_MSG_ERRNO +#include "io/Future.hpp" // for ReadFuture, TimeoutWrapper + +using emper::io::ReadFuture; +using emper::io::TimeoutWrapper; + +void emperTest() { + int efd = eventfd(0, EFD_SEMAPHORE); + if (efd == -1) { + DIE_MSG_ERRNO("eventfd failed"); + } + + uint64_t read_buf; + ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); + + struct __kernel_timespec ts = {.tv_sec = 1, .tv_nsec = 0}; + TimeoutWrapper t(readFuture, ts); + + int32_t res = t.submitAndWait(); + assert(res == -ETIME); + + res = readFuture.wait(); + assert(res == -ECANCELED); + + exit(EXIT_SUCCESS); +} diff --git a/tests/TooLongFutureChain.cpp b/tests/TooLongFutureChain.cpp new file mode 100644 index 00000000..a9954de4 --- /dev/null +++ b/tests/TooLongFutureChain.cpp @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include <linux/time_types.h> // for __kernel_timespec + +#include <array> // for array +#include <cstdlib> // for exit, EXIT_SUCCESS, size_t + +#include "emper-config.h" // for EMPER_IO_WORKER_URING_ENTRIES +#include "io/Future.hpp" // for AlarmFuture + +using emper::io::AlarmFuture; + +void emperTest() { + const size_t links = EMPER_IO_WORKER_URING_ENTRIES * 2; + std::array<AlarmFuture*, links> futures; + struct __kernel_timespec ts = {.tv_sec = 0, .tv_nsec = 100}; + + for (unsigned i = 0; i < links; ++i) { + auto* future = new AlarmFuture(ts); + if (i > 0) { + future->setDependency(*futures[i - 1]); + } + + futures[i] = future; + } + + futures[links - 1]->submitAndWait(); + exit(EXIT_SUCCESS); +} diff --git a/tests/fixtures/meson.build b/tests/fixtures/meson.build new file mode 100644 index 00000000..9b3f3c13 --- /dev/null +++ b/tests/fixtures/meson.build @@ -0,0 +1,18 @@ +test_fixtures_include_dir = include_directories('.') + +test_fixtures_sources = [] +if get_option('io') + test_fixtures_sources += ['network.cpp'] +endif + +test_fixtures_lib = library( + 'test-fixtures', + test_fixtures_sources, + include_directories: emper_all_include, + link_with: emper, +) + +test_fixtures = declare_dependency( + include_directories: test_fixtures_include_dir, + link_with: test_fixtures_lib, +) diff --git a/tests/fixtures/network.cpp b/tests/fixtures/network.cpp new file mode 100644 index 00000000..97858086 --- /dev/null +++ b/tests/fixtures/network.cpp @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include "network.hpp" + +#include <arpa/inet.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include 
<sys/types.h> + +#include <cassert> +#include <cstdlib> +#include <cstring> +#include <vector> + +#include "Common.hpp" +#include "io.hpp" + +#define MAX 1024 + +static void send_all(int fd, const void* buf, size_t len) { + // emper send everything or encounters an unrecoverable state + ssize_t bytes_send = emper::io::sendAndWait(fd, buf, len, MSG_NOSIGNAL, true); + if (unlikely(bytes_send == -1)) { + DIE_MSG_ERRNO("send failed"); + } +} + +void echo_serve(int client_socket) { + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char buf[MAX]; + for (;;) { + ssize_t bytes_recv = emper::io::recvAndWait(client_socket, buf, sizeof(buf), 0); + // socket was shutdown + if (bytes_recv == 0) { + return; + } + + if (bytes_recv == -1) { + DIE_MSG_ERRNO("server read failed"); + } + + if (unlikely(strncmp("quit\n", buf, bytes_recv) == 0)) { + exit(EXIT_SUCCESS); + } + + send_all(client_socket, buf, bytes_recv); + } +} + +void echo_client(const char* addr, int port, const std::vector<std::string>& msgs, + size_t iterations) { + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd == -1) { + DIE_MSG_ERRNO("client socket creation failed"); + } + + struct sockaddr_in server_addr; + // assign IP, PORT + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = inet_addr(addr); + server_addr.sin_port = htons(port); + + if (emper::io::connectAndWait(sockfd, reinterpret_cast<struct sockaddr*>(&server_addr), + sizeof(server_addr)) == -1) { + DIE_MSG_ERRNO("connect failed"); + } + + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + char buf[MAX]; +#ifndef NDEBUG + memset(buf, 0, MAX); +#endif + + for (size_t i = 0; i < iterations; ++i) { + for (const auto& s : msgs) { + assert(s.length() <= MAX); + send_all(sockfd, s.c_str(), s.length()); + + ssize_t bytes_recv = emper::io::recvAndWait(sockfd, buf, s.length(), MSG_WAITALL); + if (bytes_recv == 0) { + DIE_MSG("client: socket closed"); + } + + if (bytes_recv == -1) { + DIE_MSG_ERRNO("client read failed"); + } + + assert(bytes_recv > 0); + assert((size_t)bytes_recv == s.length()); + + if (memcmp(s.c_str(), buf, s.length()) != 0) { + // if (strncmp(s.c_str(), buf, bytes_recv) != 0) { + DIE_MSG("unexpected string received from server"); + } + } + } + + shutdown(sockfd, SHUT_RDWR); +} diff --git a/tests/fixtures/network.hpp b/tests/fixtures/network.hpp new file mode 100644 index 00000000..271cb2d3 --- /dev/null +++ b/tests/fixtures/network.hpp @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#pragma once + +#include <cstddef> +#include <string> +#include <vector> + +void echo_serve(int client_socket); + +void echo_client(const char* addr, int port, const std::vector<std::string>& msgs, + size_t iterations = 1); diff --git a/tests/meson.build b/tests/meson.build index a4ba8ce8..264746c2 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -1,7 +1,11 @@ +subdir('fixtures') + cc = meson.get_compiler('c') liburcu_memb = option_urcu ? cc.find_library('urcu-memb') : disabler() liburcu_cds = option_urcu ? cc.find_library('urcu-cds') : disabler() +# TODO: Parallel tests should be run without io_uring compiled +# into emper, to avoid running into `ulimit -l`. 
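The TODO above concerns io_uring's locked-memory accounting: on kernels before 5.12 each ring's SQ/CQ memory is charged against RLIMIT_MEMLOCK, so many test processes creating per-worker rings in parallel can run into `ulimit -l`. A hedged sketch (the threshold is a made-up placeholder) of how a test could detect a tight limit and skip itself, in the same spirit as IoContext's constructor exiting with status 77 when ring setup fails:

	#include <sys/resource.h>
	#include <cstdlib>

	static void skipIfMemlockTooSmall(rlim_t neededBytes) {
		struct rlimit rl;
		if (getrlimit(RLIMIT_MEMLOCK, &rl) != 0) return;  // be permissive if the limit is unknown
		if (rl.rlim_cur != RLIM_INFINITY && rl.rlim_cur < neededBytes) exit(77);  // conventionally reported as skipped
	}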
tests = { 'SimpleFibTest.cpp': { @@ -12,7 +16,7 @@ tests = { { 'description': 'Simplest fib test', 'test_suite': 'smoke', - 'is_parallel': true, +# 'is_parallel': true, }, 'c_api_test.c': @@ -25,6 +29,12 @@ tests = { 'description': 'Test EMPER\'s C++ API', }, + 'ReuseBpsTest.cpp': + { + 'description': 'Test resetting of BPSs', + 'emper_test_runner': true, + }, + 'SimpleActorTest.cpp': { 'description': 'Simple Actor Test', @@ -53,16 +63,97 @@ tests = { { 'description': 'Simple test for PrivateSemaphore:signalFromAnywhere()', 'test_suite': 'smoke', - 'is_parallel': true, +# 'is_parallel': true, 'emper_test_runner': true, }, 'TellActorFromAnywhereTest.cpp': { 'description': 'Simple test for Actor:tellFromAnywhere()', 'test_suite': 'smoke', - 'is_parallel': true, +# 'is_parallel': true, + 'emper_test_runner': true, + }, + + 'IncrementalCompletionTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test incremental completion for huge reads/writes', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'ReuseFutureTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test reusing Future objects', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'LinkFutureTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test linking Future objects', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'TooLongFutureChain.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test linking more Futures than the io_uring has entries', + 'test_suite': 'io', + 'should_fail': true, + 'emper_test_runner': true, + }, + + 'AlarmFutureTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test AlarmFuture object based timeouts', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'TimeoutWrapperTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test TimeoutWrapper object based IO request timeouts', + 'test_suite': 'io', 'emper_test_runner': true, }, + + 'CancelFutureTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Test Future cancellation', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'SimpleNetworkTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Simple network test', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'SimpleDiskAndNetworkTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Simple network and disk IO test', + 'test_suite': 'io', + 'emper_test_runner': true, + }, + + 'ConcurrentNetworkEchoTest.cpp': + { + 'feature_flags': ['io'], + 'description': 'Concurrent network echo test with 10 clients', + 'test_suite': 'io', + 'args': ['10', '10000'], + }, } undef_ndebug = '-UNDEBUG' @@ -79,13 +170,27 @@ test_env = environment( subdir('test-runner') foreach source, test_dict : tests + # check feature flags + if test_dict.has_key('feature_flags') + include_test = true + foreach flag : test_dict.get('feature_flags') + if not get_option(flag) + include_test = false + break + endif + endforeach + + if not include_test + continue + endif + endif # TODO: Use meson fs (filesystem) module once meson >= 0.53 is in # buster-backports, instead of split('.')[0] # test_name = fs.replace_suffix(source, '') # The test_name is the name of the source file without the file suffix.
test_name = source.split('.')[0] - test_deps = [thread_dep] + test_deps = [thread_dep, test_fixtures] if test_dict.has_key('dependencies') test_deps += test_dict['dependencies'] endif @@ -106,7 +211,9 @@ foreach source, test_dict : tests test_exe, is_parallel: test_dict.get('is_parallel', false), suite: test_dict.get('test_suite', 'all'), - timeout: 180, env: test_env, + timeout: test_dict.get('timeout', 180), + args: test_dict.get('args', []), + should_fail: test_dict.get('should_fail', false), ) endforeach -- GitLab