diff --git a/emper/Emper.hpp b/emper/Emper.hpp index c3bf3f2e0b4a2ae8eacab555a5eecefb04a7f37e..8293acb3974449e67c9c3dc4d04572d3086e3bae 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -185,8 +185,8 @@ static const bool WAITFREE_IO_STEALING = // warnings during the comparison use not yet initialized components is reduced. extern const bool IO_MUST_INVALIDATE_BROKEN_CHAIN; -static const bool IO_URING_SHARED_WQ = -#ifdef EMPER_IO_URING_SHARED_WQ +static const bool IO_URING_SHARED_POLLER = +#ifdef EMPER_IO_URING_SHARED_POLLER true #else false diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 907bbebcda273596905e6c81bce4c8922821f211..2c4db2992fd6da40b4a101fbda62dcff913466cf 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -243,12 +243,7 @@ auto Runtime::workerLoop(Worker* worker) -> void* { if constexpr (emper::IO_WORKER_URING) { auto* workerIo = new IoContext(*this); - - if constexpr (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) { - // submit the workers' CQ eventfds to the global IoContext - globalIo->registerWorkerIo(*workerIo); - } - // notify the globalCompleter that we have registered our eventfd + // notify the completer that we have created our IoContext ioReadySem.notify(); ioContexts[worker->workerId] = workerIo; diff --git a/emper/io/GlobalIoContext.cpp b/emper/io/GlobalIoContext.cpp index a4f01718ba0f073ee41431e30b88597813fd331d..422cc93508d2afc16e5d7ba1a9e26f1c94184584 100644 --- a/emper/io/GlobalIoContext.cpp +++ b/emper/io/GlobalIoContext.cpp @@ -9,7 +9,8 @@ #include <cassert> #include <cerrno> -#include <mutex> +#include <ostream> +#include <vector> #include "CallerEnvironment.hpp" #include "Common.hpp" @@ -18,6 +19,7 @@ #include "Fiber.hpp" #include "Runtime.hpp" #include "Worker.hpp" +#include "emper-common.h" #include "io/Future.hpp" #include "io/IoContext.hpp" #include "io/SubmitActor.hpp" @@ -33,14 +35,35 @@ void GlobalIoContext::completerLoop() { int submitted; if constexpr (emper::IO_WORKER_URING) { - // 
wait till all workers registered their IoContext's eventfd + // wait till all workers created their IoContexts for (workerid_t i = 0; i < runtime.workerCount; ++i) { runtime.ioReadySem.wait(); } LOGD("submit all worker io_uring eventfds"); + // register all eventfds for less overhead in the io_uring + { + LOGD("register all worker eventfds as fixed files on global io_uring"); + std::vector<int> fds; + fds.reserve(runtime.getWorkerCount()); + for (auto* workerIo : runtime.ioContexts) { + assert(workerIo->notificationEventFd); + fds.push_back(workerIo->notificationEventFd); + } + + int err = io_uring_register_files(&ring, fds.data(), fds.size()); + if (err) { + DIE_MSG_ERRNO("registering all eventfds on the global IoContext failed"); + } + } + + // prepare a read for each worker eventfd + for (auto* workerIo : runtime.ioContexts) { + prepareWorkerNotification(*workerIo); + } - // submit all eventfds in the SQ inserted by IoContext::submit_efd calls + // submit all eventfds + LOGD("submit all prepared worker io_uring eventfd reads"); submitted = io_uring_submit(&ring); if (unlikely(submitted < 0)) { DIE_MSG_ERRNO("initial global io_uring submit failed"); } @@ -51,7 +74,7 @@ void GlobalIoContext::completerLoop() { } // Submit the read for our termination eventfd - struct io_uring_sqe* sqe = io_uring_get_sqe(&ring); + struct io_uring_sqe* sqe = getSqe(); assert(sqe); // We reuse the notificationEventFd member of IoContext to receive the termination notification @@ -111,7 +134,11 @@ void GlobalIoContext::completerLoop() { DIE_MSG_ERRNO("re-submitting eventfd read to global_ring failed"); } - assert(submitted == 1); + // Reporting how many sqes were submitted uses khead which + // is written asynchronously by the poller kthread + if constexpr (!emper::IO_URING_SQPOLL) { + assert(submitted == 1); + } if constexpr (emper::IO_COMPLETER_BEHAVIOR == emper::IoCompleterBehavior::wakeup) { // Notify the worker which's CQ has a new cqe @@ -133,6 +160,7 @@ void 
GlobalIoContext::completerLoop() { #endif case PointerTags::Future : { auto* future = tptr.getPtr<Future>(); + LOGD("Completer got " << future << " completion"); // Forgotten Futures can only be in the single io_uring if constexpr (emper::IO_SINGLE_URING) { @@ -239,26 +267,13 @@ void GlobalIoContext::waitUntilFinished() const { } } -// This function must not be executed in parallel because it does not synchronize -// the SQ -void GlobalIoContext::registerWorkerIo(IoContext& workerIo) { - // Register the worker's notificationEventFd with its io_uring - if (unlikely(io_uring_register_eventfd(&workerIo.ring, workerIo.notificationEventFd) < 0)) { - DIE_MSG_ERRNO("io_uring_register_eventfd failed"); - } - - std::lock_guard<std::mutex> lock(registerLock); - prepareWorkerNotification(workerIo); - // The sqe we prepared will be submitted to io_uring when the globalCompleter starts. -} - void GlobalIoContext::prepareWorkerNotification(IoContext& workerIo) { - struct io_uring_sqe* sqe = io_uring_get_sqe(&ring); - // We initialized the global ring big enough we must always get a free sqe - assert(sqe); + struct io_uring_sqe* sqe = getSqe(); - io_uring_prep_read(sqe, workerIo.notificationEventFd, &workerIo.notificationEventFdBuf, + io_uring_prep_read(sqe, workerIo.worker->getWorkerId(), &workerIo.notificationEventFdBuf, sizeof(workerIo.notificationEventFdBuf), 0); + // set flags after io_uring_prep* calls because they will overwrite the flags + sqe->flags |= IOSQE_FIXED_FILE; io_uring_sqe_set_data(sqe, TaggedPtr(&workerIo, static_cast<uint16_t>(PointerTags::IoContext))); } } // namespace emper::io diff --git a/emper/io/GlobalIoContext.hpp b/emper/io/GlobalIoContext.hpp index 611793f2a34e9f1861420a346d4d090d59c50e8f..7dbf3114ec7d113a6c94f586f7498ad37dd32a7a 100644 --- a/emper/io/GlobalIoContext.hpp +++ b/emper/io/GlobalIoContext.hpp @@ -5,7 +5,6 @@ #include <pthread.h> // for pthread_t #include <cstdint> -#include <mutex> #include <string> #include "emper-common.h" @@ -25,10 
+24,6 @@ class GlobalIoContext : public IoContext { worker = nullptr; } - // This mutex is only used to protect the SQ during start up when all workers - // register their IoContext's eventfds in parallel - std::mutex registerLock; - enum class PointerTags : uint16_t { Future, Callback, IoContext, TerminationEvent }; // pthread used to monitor the CQs from worker io_urings @@ -44,7 +39,6 @@ class GlobalIoContext : public IoContext { void initiateTermination(); void waitUntilFinished() const; - void registerWorkerIo(IoContext& workerIo); void prepareWorkerNotification(IoContext& workerIo); }; } // namespace emper::io diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 1d9598229cf5fa63c5ab64a94e3da54ec57f705d..741b7bb29925b02fe7430850bd59cda599a3aed1 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -53,10 +53,7 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns uringFutureSet.insert(&future); } - struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); - // The kernel consumes all sqes from the SQ during io_uring_enter - // If we can't get a sqe the chain was to long for our SQ - assert(sqe); + struct io_uring_sqe *sqe = getSqe(); future.prepareSqe(sqe); @@ -532,10 +529,16 @@ IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) memset(¶ms, 0, sizeof(params)); if constexpr (emper::IO_URING_SQPOLL) { + // Using a poller needs at least linux 5.15 to safely handle + // broken chains see: 43b8dc163f985341d2a2a24d1bfe0e6bb939f880. 
+ if (emper::IO_MUST_INVALIDATE_BROKEN_CHAIN) { + DIE_MSG("Invalidating broken chains is not possible when using sqpoll"); + } + params.flags |= IORING_SETUP_SQPOLL; } - if constexpr (emper::IO_URING_SHARED_WQ) { + if constexpr (emper::IO_URING_SHARED_POLLER) { auto *gio = runtime.globalIo; if (gio) { params.flags |= IORING_SETUP_ATTACH_WQ; @@ -554,10 +557,6 @@ IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) } if constexpr (emper::IO_URING_SQPOLL) { - // included in liburing since 41e0d97cb23667df000ce76789297f4e06134a28 -#ifndef IORING_FEAT_SQPOLL_NONFIXED -#define IORING_FEAT_SQPOLL_NONFIXED (1U << 7) -#endif if (!(params.features & IORING_FEAT_SQPOLL_NONFIXED)) { DIE_MSG("io_uring_sqpoll defined but kernel does not support IORING_FEAT_SQPOLL_NONFIXED"); } @@ -566,12 +565,22 @@ IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) // reserve space for a full SQ preparedSqes.reserve(*this->ring.sq.kring_entries); - // This eventfd will be registered to retrieve completion notifications when the Runtime - // calls registerWorkerIo() on the globalIo - // Or it will be used to terminate the globalIo - notificationEventFd = eventfd(0, 0); - if (unlikely(notificationEventFd < 0)) { - DIE_MSG_ERRNO("creating eventfd for io_uring failed"); + if constexpr (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) { + // In a worker IoContext the eventfd is used to notify the completer thread + // about completions. 
+ // In a GlobalIoContext it is used to terminate the completer thread + notificationEventFd = eventfd(0, 0); + if (unlikely(notificationEventFd < 0)) { + DIE_MSG_ERRNO("creating eventfd for io_uring failed"); + } + + // we are in a worker IoContext -> register the eventfd to get notifications + if (runtime.globalIo) { + if (unlikely(io_uring_register_eventfd(&ring, notificationEventFd) < 0)) { + DIE_MSG_ERRNO("io_uring_register_eventfd failed"); + } + LOGD("registered eventfd " << notificationEventFd << " on io_uring " << ring.ring_fd); + } } if constexpr (emper::IO_SINGLE_URING) { diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index d537a464a4b87d5a0c92130e34835a5e82ce64ce..fd0db5a8356332143c437e25753ecb0f81658e66 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -6,8 +6,9 @@ #include <liburing/io_uring.h> #include <array> -#include <atomic> // for atomic -#include <cassert> // for assert +#include <atomic> +#include <cassert> +#include <cerrno> #include <cstddef> // for size_t #include <cstdint> // for uint64_t #include <functional> // for less @@ -276,6 +277,29 @@ class IoContext : public Logger<LogSubsystem::IO> { template <CallerEnvironment callerEnvironment, unsigned toReap> [[nodiscard]] auto reapCompletionsLocked(Fiber **continuations) -> unsigned; + inline auto getSqe() -> struct io_uring_sqe * { + struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); + if constexpr (emper::IO_URING_SQPOLL) { + // apparently using io_uring_get_sqe after waiting for one getting available + // with io_uring_sqring_wait may not return a usable sqe. 
+ // Therefore we loop here + while (!sqe) { + LOGD("Poller is behind consuming sqes -> wait"); + int err = io_uring_sqring_wait(&ring); + if (unlikely(err)) { + errno = -err; + DIE_MSG_ERRNO("io_uring_sqring_wait failed"); + } + LOGD("Done waiting -> get sqe"); + sqe = io_uring_get_sqe(&ring); + } + } + LOGD("Got sqe " << sqe << " from io_uring " << ring.ring_fd); + assert(sqe); + + return sqe; + } + public: IoContext(Runtime &runtime, size_t uring_entries); IoContext(Runtime &runtime) : IoContext(runtime, EMPER_IO_WORKER_URING_ENTRIES){}; diff --git a/meson.build b/meson.build index 6979092c18d9bcdbdb676528442f9655e4850892..b5040f04839454f010938d9411e49b282aa5158f 100644 --- a/meson.build +++ b/meson.build @@ -101,11 +101,9 @@ io_bool_options = [ {'option': 'stealing'}, {'option': 'lockless_cq'}, {'option': 'single_uring', - 'dependencies': {'io_uring_shared_wq': false, - 'io_completer_behavior': 'schedule'}}, + 'dependencies': {'io_completer_behavior': 'schedule'}}, {'option': 'try_syscall'}, {'option': 'uring_sqpoll'}, - {'option': 'uring_shared_wq'}, {'option': 'waitfree_stealing', 'dependencies': {'io_stealing': true, 'io_lockless_cq': true}}, ] @@ -141,6 +139,12 @@ foreach option : io_raw_options conf_data.set('EMPER_IO_' + option.to_upper(), get_option('io_' + option)) endforeach +if get_option('io_uring_sqpoll') and get_option('io_uring_shared_poller') == 'automatic' + conf_data.set('EMPER_IO_URING_SHARED_POLLER', true) +else + conf_data.set('EMPER_IO_URING_SHARED_POLLER', false) +endif + io_completer_behavior = get_option('io_completer_behavior') if io_completer_behavior == 'maybe_wakeup' if get_option('worker_sleep') diff --git a/meson_options.txt b/meson_options.txt index 8e2b537e16b616fe10ce5e0a1378467d9fd307e8..ec601da5e57d378986d613201499c505b1639911 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -155,10 +155,11 @@ option( description: 'Enable io_urings SQPOLL feature (start a separate kernel thread which polls the sq reducing the amount of syscalls to submit new 
requests. This is a privileged operation.).' ) option( - 'io_uring_shared_wq', - type: 'boolean', - value: false, - description: 'Share a common async backend between all io_urings' + 'io_uring_shared_poller', + type: 'combo', + value: 'automatic', + choices: ['automatic', 'off'], + description: 'Share a common poller thread backend between all io_urings' ) option( 'io_completer_behavior',