From d9d350d91b8ec453cd16cbe7ebf6d1c65bae1195 Mon Sep 17 00:00:00 2001 From: Florian Fischer <florian.fischer@muhq.space> Date: Thu, 23 Sep 2021 15:29:10 +0200 Subject: [PATCH] [IoContext] implement lockless CQ reaping TODO: think about stats and possible ring buffer pointers overflow and ABA. --- .gitlab-ci.yml | 23 +++++++++ emper/Emper.hpp | 8 +++ emper/io/IoContext.cpp | 107 ++++++++++++++++++++++++++++++++++++----- emper/io/IoContext.hpp | 34 +++++++++++-- meson.build | 1 + meson_options.txt | 6 +++ 6 files changed, 164 insertions(+), 15 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index e2cb70b8..b7ef3eec 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -117,6 +117,10 @@ clang-tidy: variables: EMPER_IO_STEALING: 'true' +.emper-lockless-cq: + variables: + EMPER_IO_LOCKED_CQ: 'false' + .default-library-static: variables: EMPER_DEFAULT_LIBRARY: 'static' @@ -314,14 +318,33 @@ test-pipe-sleep-strategy-no-completer: - .emper-pipe-sleep-strategy - .emper-no-completer +test-lockless-cq: + extends: + - .test + - .emper-lockless-cq + test-io-stealing: extends: - .test - .emper-io-stealing +test-lockless-io-stealing: + extends: + - .test + - .emper-io-stealing + - .emper-lockless-cq + test-io-stealing-pipe-no-completer: extends: - .test - .emper-pipe-sleep-strategy - .emper-no-completer - .emper-io-stealing + +test-io-stealing-pipe-no-completer-lockless: + extends: + - .test + - .emper-pipe-sleep-strategy + - .emper-no-completer + - .emper-io-stealing + - .emper-lockless-cq diff --git a/emper/Emper.hpp b/emper/Emper.hpp index dbc73ca6..fba9961d 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -113,6 +113,14 @@ static const bool IO_STEALING = #endif ; +static const bool IO_LOCKED_CQ = +#ifdef EMPER_IO_LOCKED_CQ + true +#else + false +#endif + ; + static const bool IO_SINGLE_URING = #ifdef EMPER_IO_SINGLE_URING true diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 1183f6fd..a3319071 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -7,6 +7,7 @@ #include <sys/eventfd.h> // for eventfd #include <unistd.h> // for close +#include <algorithm> #include <array> #include <atomic> // for atomic, __atomic_base #include <cassert> // for assert @@ -217,7 +218,91 @@ template void IoContext::submitAndWait<CallerEnvironment::ANYWHERE>(Future &futu unsigned wait_nr); template <CallerEnvironment callerEnvironment> -auto IoContext::reapCompletions(Fiber **continuations, size_t toReap) -> unsigned { +auto IoContext::reapCompletionsLockless(Fiber **continuations, unsigned toReap) -> unsigned { + std::array<Completion, CQE_BATCH_COUNT> reapedCompletions; + + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + stats.recordStealAttempt(); + } + + struct io_uring_cq *cq = &ring.cq; + const unsigned mask = *cq->kring_mask; + auto *atail = reinterpret_cast<std::atomic<unsigned> *>(cq->ktail); + auto *ahead = reinterpret_cast<std::atomic<unsigned> *>(cq->khead); + + // NOTE: just using head for the CAS introduces a possible ABA problem + // if the unsigned head counter overflows during the read and the CAS. + + // Load possibly concurrently used userspace written head pointer + unsigned head = ahead->load(std::memory_order_acquire); + unsigned count; + do { + // Load concurrently used kernel written tail pointer + unsigned tail = atail->load(std::memory_order_acquire); + + // NOTE: This number may already be wrong during its calculation + unsigned ready = tail - head; + + if (!ready) return 0; + + count = std::min(toReap, ready); + + for (unsigned i = 0; i < count; ++i) { + const struct io_uring_cqe *cqe = &cq->cqes[(head + i) & mask]; + void *cqe_data = io_uring_cqe_get_data(cqe); + + // Only the owner is allowed to reap new work notifications + if constexpr (callerEnvironment != CallerEnvironment::OWNER) { + const TaggedPtr tptr(cqe_data); + const auto tag = static_cast<PointerTags>(tptr.getTag()); + if (tag == PointerTags::NewWorkWsq || tag == PointerTags::NewWorkAq) { + // don't consume the new work notification + if (i == 0) return 0; + // Since i starts at 0 using i as count is correct. + // If i = 1 this means we are at the second cqe and + // count = i = 1 will consume only the first cqe. + count = i; + break; + } + } + + auto &reapedCompletion = reapedCompletions[i]; + reapedCompletion.first = cqe->res; + reapedCompletion.second = cqe_data; + } + + // try to consume those cqes we stored in reapedCompletions + } while (!ahead->compare_exchange_weak(head, head + count, std::memory_order_release, + std::memory_order_acquire)); + + LOGD("got " << count << " cqes from worker " << worker->getWorkerId() << "'s io_uring"); + + if constexpr (emper::DEBUG) { + assert(count <= reqs_in_uring); + reqs_in_uring -= count; + } + + // Reaps done by other worker threads are counted in the AbstractWorkStealingStats + // as stolenIoFibers. + stats.record_reaps<callerEnvironment>(count); + + return getContinuationsFromCompletions<callerEnvironment>(reapedCompletions.data(), count, + continuations); +} + +// Show the compiler our template incarnations +template auto IoContext::reapCompletionsLockless<CallerEnvironment::OWNER>(Fiber **continuations, + unsigned toReap) + -> unsigned; +template auto IoContext::reapCompletionsLockless<CallerEnvironment::EMPER>(Fiber **continuations, + unsigned toReap) + -> unsigned; +template auto IoContext::reapCompletionsLockless<CallerEnvironment::ANYWHERE>(Fiber **contiunations, + unsigned toReap) + -> unsigned; + +template <CallerEnvironment callerEnvironment> +auto IoContext::reapCompletionsLocked(Fiber **continuations, unsigned toReap) -> unsigned { // TODO: should only the owner possibly rereap? constexpr bool checkForRereap = callerEnvironment == CallerEnvironment::OWNER; unsigned reReapCount = 0; @@ -229,11 +314,6 @@ auto IoContext::reapCompletions(Fiber **continuations, size_t toReap) -> unsigne // where cq_lock is held as small as possible std::array<Completion, CQE_BATCH_COUNT> reapedCompletions; - // never reap completions on the global IoContext - assert(this != runtime.globalIo); - - LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId())); - // this label is not used for callerEnvironment::ANYWHERE and thus has to be // annotated with ATTR_UNUSED reap_cqes: @@ -353,12 +433,15 @@ check_for_rereap: } // Show the compiler our template incarnations -template auto IoContext::reapCompletions<CallerEnvironment::OWNER>(Fiber **continuations, - size_t toReap) -> unsigned; -template auto IoContext::reapCompletions<CallerEnvironment::EMPER>(Fiber **continuations, - size_t toReap) -> unsigned; -template auto IoContext::reapCompletions<CallerEnvironment::ANYWHERE>(Fiber **contiunations, - size_t toReap) -> unsigned; +template auto IoContext::reapCompletionsLocked<CallerEnvironment::OWNER>(Fiber **continuations, + unsigned toReap) + -> unsigned; +template auto IoContext::reapCompletionsLocked<CallerEnvironment::EMPER>(Fiber **continuations, + unsigned toReap) + -> unsigned; +template auto IoContext::reapCompletionsLocked<CallerEnvironment::ANYWHERE>(Fiber **contiunations, + unsigned toReap) + -> unsigned; IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) { struct io_uring_params params; diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 82c108bb..05fe4f8c 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -13,6 +13,7 @@ #include <iterator> #include <mutex> #include <ostream> +#include <string> #include <utility> #include <vector> @@ -52,8 +53,15 @@ class IoContext : public Logger<LogSubsystem::IO> { Runtime &runtime; static thread_local IoContext *workerIo; + + // We must synchronize the CQ if it is accessed by multiple threads. + // This is the case if we use a completer or if other workers try to steal IO. static constexpr bool needsCqLock = (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) || (emper::IO_STEALING); + + // Are we synchronizing the CQs lockfree + static constexpr bool locklessCq = needsCqLock && !emper::IO_LOCKED_CQ; + // TryLock protecting the completion queue of ring. CACHE_LINE_EXCLUSIVE(std::mutex, cq_lock); struct io_uring ring; @@ -234,6 +242,12 @@ class IoContext : public Logger<LogSubsystem::IO> { return posInBuf; } + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapCompletionsLockless(Fiber **continuations, unsigned toReap) -> unsigned; + + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapCompletionsLocked(Fiber **continuations, unsigned toReap) -> unsigned; + public: IoContext(Runtime &runtime, size_t uring_entries); IoContext(Runtime &runtime) : IoContext(runtime, EMPER_IO_WORKER_URING_ENTRIES){}; @@ -271,7 +285,7 @@ class IoContext : public Logger<LogSubsystem::IO> { if constexpr (callerEnvironment == CallerEnvironment::EMPER) { return IoContext::getWorkerIo(); } else { - // we use a reinterpret_ cast here because GlobalIoContext is incomplete + // we use a reinterpret_cast here because GlobalIoContext is incomplete // at this point but we can't include GlobalIoContext.hpp because it // would introduce a cyclic dependency. return reinterpret_cast<IoContext *>(Runtime::getRuntime()->globalIo); @@ -364,12 +378,26 @@ class IoContext : public Logger<LogSubsystem::IO> { continuation Fibers. * Passing the buffer form the caller to the callee allows the buffer to * be statically allocated. - * @param toReap + * @param toReap length of the continuations buffer * * @return The number of continuation Fibers */ template <CallerEnvironment callerEnvironment> - [[nodiscard]] auto reapCompletions(Fiber **continuations, size_t toReap) -> unsigned; + [[nodiscard]] auto reapCompletions(Fiber **continuations, unsigned toReap) -> unsigned { + // Why reap more than there are entries in the CQ + assert(toReap <= CQE_BATCH_COUNT); + + // never reap completions on the global IoContext + assert(this != reinterpret_cast<IoContext *>(Runtime::getRuntime()->globalIo)); + + LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId())); + + if constexpr (locklessCq) { + return reapCompletionsLockless<callerEnvironment>(continuations, toReap); + } + + return reapCompletionsLocked<callerEnvironment>(continuations, toReap); + } /** * @brief Collect one fibers waiting on completed IO diff --git a/meson.build b/meson.build index ba626817..c8f98994 100644 --- a/meson.build +++ b/meson.build @@ -90,6 +90,7 @@ endif io_bool_options = [ 'stealing', + 'locked_cq', 'single_uring', 'try_syscall', 'uring_sqpoll', diff --git a/meson_options.txt b/meson_options.txt index cdf195f0..1c6ced87 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -167,3 +167,9 @@ option( description: 'Work-Stealing workers will also try to steal IO from other workers', value: false, ) +option( + 'io_locked_cq', + type: 'boolean', + description: 'Synchronize the CQs with a try lock', + value: true, +) -- GitLab