diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index e2cb70b8db6ea180c2324003b3427ae391508435..b7ef3eec7125025643863e24fc052f5c82c27b6b 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -117,6 +117,10 @@ clang-tidy: variables: EMPER_IO_STEALING: 'true' +.emper-lockless-cq: + variables: + EMPER_IO_LOCKED_CQ: 'false' + .default-library-static: variables: EMPER_DEFAULT_LIBRARY: 'static' @@ -314,14 +318,33 @@ test-pipe-sleep-strategy-no-completer: - .emper-pipe-sleep-strategy - .emper-no-completer +test-lockless-cq: + extends: + - .test + - .emper-lockless-cq + test-io-stealing: extends: - .test - .emper-io-stealing +test-lockless-io-stealing: + extends: + - .test + - .emper-io-stealing + - .emper-lockless-cq + test-io-stealing-pipe-no-completer: extends: - .test - .emper-pipe-sleep-strategy - .emper-no-completer - .emper-io-stealing + +test-io-stealing-pipe-no-completer-lockless: + extends: + - .test + - .emper-pipe-sleep-strategy + - .emper-no-completer + - .emper-io-stealing + - .emper-lockless-cq diff --git a/emper/Emper.hpp b/emper/Emper.hpp index dbc73ca6e359e3be11f4ab92ff0bfec8c245e630..fba9961d40169b28eeb62774168f406707106c0d 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -113,6 +113,14 @@ static const bool IO_STEALING = #endif ; +static const bool IO_LOCKED_CQ = +#ifdef EMPER_IO_LOCKED_CQ + true +#else + false +#endif + ; + static const bool IO_SINGLE_URING = #ifdef EMPER_IO_SINGLE_URING true diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 1183f6fd25d166fdd429ee94f7fa97980cf4b101..a3319071a63fec5ccf8c74ec765c5d5270867359 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -7,6 +7,7 @@ #include <sys/eventfd.h> // for eventfd #include <unistd.h> // for close +#include <algorithm> #include <array> #include <atomic> // for atomic, __atomic_base #include <cassert> // for assert @@ -217,7 +218,91 @@ template void IoContext::submitAndWait<CallerEnvironment::ANYWHERE>(Future &futu unsigned wait_nr); template <CallerEnvironment 
callerEnvironment> -auto IoContext::reapCompletions(Fiber **continuations, size_t toReap) -> unsigned { +auto IoContext::reapCompletionsLockless(Fiber **continuations, unsigned toReap) -> unsigned { + std::array<Completion, CQE_BATCH_COUNT> reapedCompletions; + + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + stats.recordStealAttempt(); + } + + struct io_uring_cq *cq = &ring.cq; + const unsigned mask = *cq->kring_mask; + auto *atail = reinterpret_cast<std::atomic<unsigned> *>(cq->ktail); + auto *ahead = reinterpret_cast<std::atomic<unsigned> *>(cq->khead); + + // NOTE: just using head for the CAS introduces a possible ABA problem + // if the unsigned head counter wraps all the way around between the read and the CAS. + + // Load the userspace-written head index; other threads may advance it concurrently + unsigned head = ahead->load(std::memory_order_acquire); + unsigned count; + do { + // Load the kernel-written tail index; it may advance concurrently + unsigned tail = atail->load(std::memory_order_acquire); + + // NOTE: this number may already be stale once it is computed; the CAS below detects a stale head + unsigned ready = tail - head; + + if (!ready) return 0; + + count = std::min(toReap, ready); + + for (unsigned i = 0; i < count; ++i) { + const struct io_uring_cqe *cqe = &cq->cqes[(head + i) & mask]; + void *cqe_data = io_uring_cqe_get_data(cqe); + + // Only the owner is allowed to reap new work notifications + if constexpr (callerEnvironment != CallerEnvironment::OWNER) { + const TaggedPtr tptr(cqe_data); + const auto tag = static_cast<PointerTags>(tptr.getTag()); + if (tag == PointerTags::NewWorkWsq || tag == PointerTags::NewWorkAq) { + // don't consume the new work notification; stop before it so head never moves past it + if (i == 0) return 0; + // Since i starts at 0 using i as count is correct. + // If i = 1 this means we are at the second cqe and + // count = i = 1 will consume only the first cqe. 
+ count = i; + break; + } + } + + auto &reapedCompletion = reapedCompletions[i]; + reapedCompletion.first = cqe->res; + reapedCompletion.second = cqe_data; + } + + // Try to consume the cqes we copied into reapedCompletions; if the CAS fails, head holds the current value and we retry + } while (!ahead->compare_exchange_weak(head, head + count, std::memory_order_release, + std::memory_order_acquire)); + + LOGD("got " << count << " cqes from worker " << worker->getWorkerId() << "'s io_uring"); + + if constexpr (emper::DEBUG) { + assert(count <= reqs_in_uring); + reqs_in_uring -= count; + } + + // Reaps done by other worker threads are counted in the AbstractWorkStealingStats + // as stolenIoFibers. + stats.record_reaps<callerEnvironment>(count); + + return getContinuationsFromCompletions<callerEnvironment>(reapedCompletions.data(), count, + continuations); +} + +// Explicit template instantiations for the OWNER, EMPER and ANYWHERE caller environments +template auto IoContext::reapCompletionsLockless<CallerEnvironment::OWNER>(Fiber **continuations, + unsigned toReap) + -> unsigned; +template auto IoContext::reapCompletionsLockless<CallerEnvironment::EMPER>(Fiber **continuations, + unsigned toReap) + -> unsigned; +template auto IoContext::reapCompletionsLockless<CallerEnvironment::ANYWHERE>(Fiber **contiunations, + unsigned toReap) + -> unsigned; + +template <CallerEnvironment callerEnvironment> +auto IoContext::reapCompletionsLocked(Fiber **continuations, unsigned toReap) -> unsigned { // TODO: should only the owner possibly rereap? 
constexpr bool checkForRereap = callerEnvironment == CallerEnvironment::OWNER; unsigned reReapCount = 0; @@ -229,11 +314,6 @@ auto IoContext::reapCompletions(Fiber **continuations, size_t toReap) -> unsigne // where cq_lock is held as small as possible std::array<Completion, CQE_BATCH_COUNT> reapedCompletions; - // never reap completions on the global IoContext - assert(this != runtime.globalIo); - - LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId())); - // this label is not used for callerEnvironment::ANYWHERE and thus has to be // annotated with ATTR_UNUSED reap_cqes: @@ -353,12 +433,15 @@ check_for_rereap: } // Show the compiler our template incarnations -template auto IoContext::reapCompletions<CallerEnvironment::OWNER>(Fiber **continuations, - size_t toReap) -> unsigned; -template auto IoContext::reapCompletions<CallerEnvironment::EMPER>(Fiber **continuations, - size_t toReap) -> unsigned; -template auto IoContext::reapCompletions<CallerEnvironment::ANYWHERE>(Fiber **contiunations, - size_t toReap) -> unsigned; +template auto IoContext::reapCompletionsLocked<CallerEnvironment::OWNER>(Fiber **continuations, + unsigned toReap) + -> unsigned; +template auto IoContext::reapCompletionsLocked<CallerEnvironment::EMPER>(Fiber **continuations, + unsigned toReap) + -> unsigned; +template auto IoContext::reapCompletionsLocked<CallerEnvironment::ANYWHERE>(Fiber **contiunations, + unsigned toReap) + -> unsigned; IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) { struct io_uring_params params; diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 82c108bb8fc948390e09726070104adc947dfe0e..05fe4f8c17a1dbda65705aba3b086f54163fb674 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -13,6 +13,7 @@ #include <iterator> #include <mutex> #include <ostream> +#include <string> #include <utility> #include <vector> @@ -52,8 +53,15 @@ class IoContext : public Logger<LogSubsystem::IO> { Runtime 
&runtime; static thread_local IoContext *workerIo; + + // We must synchronize the CQ if it is accessed by multiple threads. + // This is the case if we use a completer or if other workers try to steal IO. static constexpr bool needsCqLock = (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) || (emper::IO_STEALING); + + // Whether the CQ is synchronized lock-free: true only if synchronization is needed and IO_LOCKED_CQ is off + static constexpr bool locklessCq = needsCqLock && !emper::IO_LOCKED_CQ; + // TryLock protecting the completion queue of ring. CACHE_LINE_EXCLUSIVE(std::mutex, cq_lock); struct io_uring ring; @@ -234,6 +242,12 @@ class IoContext : public Logger<LogSubsystem::IO> { return posInBuf; } + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapCompletionsLockless(Fiber **continuations, unsigned toReap) -> unsigned; + + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapCompletionsLocked(Fiber **continuations, unsigned toReap) -> unsigned; + public: IoContext(Runtime &runtime, size_t uring_entries); IoContext(Runtime &runtime) : IoContext(runtime, EMPER_IO_WORKER_URING_ENTRIES){}; @@ -271,7 +285,7 @@ class IoContext : public Logger<LogSubsystem::IO> { if constexpr (callerEnvironment == CallerEnvironment::EMPER) { return IoContext::getWorkerIo(); } else { - // we use a reinterpret_ cast here because GlobalIoContext is incomplete + // we use a reinterpret_cast here because GlobalIoContext is incomplete // at this point but we can't include GlobalIoContext.hpp because it // would introduce a cyclic dependency. return reinterpret_cast<IoContext *>(Runtime::getRuntime()->globalIo); @@ -364,12 +378,26 @@ class IoContext : public Logger<LogSubsystem::IO> { continuation Fibers. * Passing the buffer form the caller to the callee allows the buffer to * be statically allocated. 
- * @param toReap + * @param toReap length of the continuations buffer * * @return The number of continuation Fibers */ template <CallerEnvironment callerEnvironment> - [[nodiscard]] auto reapCompletions(Fiber **continuations, size_t toReap) -> unsigned; + [[nodiscard]] auto reapCompletions(Fiber **continuations, unsigned toReap) -> unsigned { + // toReap must fit the CQE_BATCH_COUNT-sized reapedCompletions buffers used by both reap variants + assert(toReap <= CQE_BATCH_COUNT); + + // never reap completions on the global IoContext + assert(this != reinterpret_cast<IoContext *>(Runtime::getRuntime()->globalIo)); + + LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId())); + + if constexpr (locklessCq) { + return reapCompletionsLockless<callerEnvironment>(continuations, toReap); + } + + return reapCompletionsLocked<callerEnvironment>(continuations, toReap); + } /** * @brief Collect one fibers waiting on completed IO diff --git a/meson.build b/meson.build index ba626817dbaebef50f73c3e82624104443af324a..c8f9899436c8eabef3bf621ecd75774280497961 100644 --- a/meson.build +++ b/meson.build @@ -90,6 +90,7 @@ endif io_bool_options = [ 'stealing', + 'locked_cq', 'single_uring', 'try_syscall', 'uring_sqpoll', diff --git a/meson_options.txt b/meson_options.txt index cdf195f09e9bcc486ddae7f1588dd573fb982ce9..1c6ced87499ffe808e9c7020dd857412658c8421 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -167,3 +167,9 @@ option( description: 'Work-Stealing workers will also try to steal IO from other workers', value: false, ) +option( + 'io_locked_cq', + type: 'boolean', + description: 'Synchronize the CQs with a try lock', + value: true, +)