From bb72fca31c93f9a4c59e9828a2e19a1ee22281a2 Mon Sep 17 00:00:00 2001 From: Florian Fischer Date: Fri, 26 Feb 2021 12:29:10 +0100 Subject: [PATCH] add a batch optimization for the global completer This change introduces new scheduleFromAnywhere methods which take a range of Fibers to schedule. Blockable gets a new method returning the fiber used to start the unblocked context, which is used by Future/PartialCompletableFuture to provide a way of completion and returning the continuation Fiber to the caller so they may schedule the continuation how they want. If the meson option io_batch_anywhere_completions is set the global completer will collect all callback and continuation fibers before scheduling them at once when it is done reaping the completions. The idea is that taking the AnywhereQueue write lock and calling onNewWork must only be done once. TODO: investigate if onNewWork should be extended by an amountOfWork argument which determines how many worker can be awoken and have work to do. This should be trivially since our WorkerWakeupSemaphore implementations already support notify_many(), which may be implemented in terms of notify_all though. --- emper/Blockable.hpp | 12 +++++++----- emper/Emper.hpp | 7 +++++++ emper/PrivateSemaphore.hpp | 7 +++++++ emper/Runtime.hpp | 5 +++++ emper/Scheduler.hpp | 14 ++++++++++++++ emper/io/Future.hpp | 20 ++++++++++++++++++++ emper/io/IoContext.cpp | 22 +++++++++++++++++++--- emper/lib/adt/LockedUnboundedQueue.hpp | 8 ++++++++ meson.build | 1 + meson_options.txt | 6 ++++++ 10 files changed, 94 insertions(+), 8 deletions(-) diff --git a/emper/Blockable.hpp b/emper/Blockable.hpp index 7b2378f..d0bef64 100644 --- a/emper/Blockable.hpp +++ b/emper/Blockable.hpp @@ -41,16 +41,18 @@ class Blockable : public Logger { contextManager.saveAndStartNew(std::move(freshContextHook)); } - template - void unblock(Context* context) { + auto unblockAndGetContinuation(Context* context) -> Fiber* { assert(context != nullptr); if constexpr (emper::BLOCKED_CONTEXT_SET) { blockedContexts.erase(context); } - // cppcheck-suppress unsafeClassCanLeak - Fiber* unblockFiber = - Fiber::from([this, context]() { contextManager.discardAndResume(context); }); + return Fiber::from([this, context]() { contextManager.discardAndResume(context); }); + } + + template + void unblock(Context* context) { + Fiber* unblockFiber = unblockAndGetContinuation(context); if constexpr (callerEnvironment == CallerEnvironment::EMPER) { runtime.schedule(*unblockFiber); diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 8a04980..ee26517 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -97,4 +97,11 @@ static const bool IO_URING_SHARED_WQ = false #endif ; +static const bool IO_BATCH_ANYWHERE_COMPLETIONS = +#ifdef EMPER_IO_BATCH_ANYWHERE_COMPLETIONS + true +#else + false +#endif + ; } // namespace emper diff --git a/emper/PrivateSemaphore.hpp b/emper/PrivateSemaphore.hpp index a72347d..8e89d21 100644 --- a/emper/PrivateSemaphore.hpp +++ b/emper/PrivateSemaphore.hpp @@ -37,6 +37,13 @@ class PrivateSemaphore : protected Blockable { // invalid at this point. } + [[nodiscard]] auto signalAndGetContinuation() -> Fiber* { + if (Context* readyContext = signalInternal()) { + return unblockAndGetContinuation(readyContext); + } + return nullptr; + } + void signalFromAnywhere() { signal(); } void signalAndExit() { diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 6e7bed7..fbbabc9 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -158,6 +158,11 @@ class Runtime : public Logger { inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); } + template + inline void scheduleFromAnywhere(InputIt begin, InputIt end) { + scheduler.scheduleFromAnywhere(begin, end); + } + void yield(); // TODO: This should probably not be a public method of Runtime. diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index f49308e..9bcb9e4 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -42,6 +42,11 @@ class Scheduler : public Logger { void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); } + template + void insertInAnywhereQueue(InputIt begin, InputIt end) { + scheduleAnywhereQueue.insert(begin, end); + } + auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); } virtual void scheduleInternal(Fiber& fiber) = 0; @@ -60,4 +65,13 @@ class Scheduler : public Logger { onNewWork(); } + + // TODO: investigate if it is still a good idea to wakeup only a single + // worker maybe we want something like onNewWork(amountOfWork) + template + void scheduleFromAnywhere(InputIt begin, InputIt end) { + insertInAnywhereQueue(begin, end); + + onNewWork(); + } }; diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp index b3c5c5c..4fbb84b 100644 --- a/emper/io/Future.hpp +++ b/emper/io/Future.hpp @@ -106,6 +106,11 @@ class Future : public Logger { sem.signalFromAnywhere(); } + virtual auto completeAndGetContinuation(int32_t res) -> Fiber* { + setCompletion(res); + return sem.signalAndGetContinuation(); + } + /** * Used for Stats::recordCompletion double dispatch */ @@ -377,6 +382,21 @@ class PartialCompletableFuture : public Future { } } + auto completeAndGetContinuation(int32_t res) -> Fiber* override { + CompletionType completion = tryComplete(res); + + switch (completion) { + case CompletionType::Resubmission: + return nullptr; + case CompletionType::IncrementalCompletion: + return Future::completeAndGetContinuation(partialCompletion); + case CompletionType::Completion: + return Future::completeAndGetContinuation(res); + default: + DIE_MSG("Unknown CompletionType: " << completion); + } + } + public: /** * @brief Block till the IO request is completed diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index bd9aeba..d853550 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -191,6 +191,10 @@ void IoContext::reapCompletions() { struct io_uring_cqe *cqe; unsigned count = 0; + // vector used to batch all completions scheduled to the AnywhereQueue + std::vector continuationFibers; + Runtime *runtime = Runtime::getRuntime(); + int err = io_uring_peek_cqe(&ring, &cqe); if (err) { if (err == -EAGAIN) { @@ -217,11 +221,14 @@ void IoContext::reapCompletions() { c(res); delete &c; }); - Runtime *runtime = Runtime::getRuntime(); if constexpr (callerEnvironment == CallerEnvironment::EMPER) { runtime->schedule(*callbackFiber); } else { - runtime->scheduleFromAnywhere(*callbackFiber); + if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) { + continuationFibers.push_back(callbackFiber); + } else { + runtime->scheduleFromAnywhere(*callbackFiber); + } } continue; } @@ -235,7 +242,14 @@ void IoContext::reapCompletions() { if constexpr (callerEnvironment == EMPER) { future->complete(cqe->res); } else { - future->completeFromAnywhere(cqe->res); + if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) { + Fiber *continuation = future->completeAndGetContinuation(cqe->res); + if (continuation) { + continuationFibers.push_back(continuation); + } + } else { + future->completeFromAnywhere(cqe->res); + } } } @@ -250,6 +264,8 @@ void IoContext::reapCompletions() { if constexpr (callerEnvironment == CallerEnvironment::EMPER) { stats.record_worker_reaps(count); } else { + // actually schedule all completion fibers + runtime->scheduleFromAnywhere(continuationFibers.begin(), continuationFibers.end()); stats.record_completer_reaps(count); } diff --git a/emper/lib/adt/LockedUnboundedQueue.hpp b/emper/lib/adt/LockedUnboundedQueue.hpp index 1d76cd7..f6ccda4 100644 --- a/emper/lib/adt/LockedUnboundedQueue.hpp +++ b/emper/lib/adt/LockedUnboundedQueue.hpp @@ -20,6 +20,14 @@ class LockedUnboundedQueue { queue.push(item); } + template + void insert(InputIt begin, InputIt end) { + std::lock_guard lock(queue_mutex); + for (; begin != end; ++begin) { + queue.push(*begin); + } + } + auto dequeue() -> I* { std::lock_guard lock(queue_mutex); if (queue.empty()) { diff --git a/meson.build b/meson.build index e5b6b9d..d549386 100644 --- a/meson.build +++ b/meson.build @@ -65,6 +65,7 @@ endif io_bool_options = [ 'uring_sqpoll', 'uring_shared_wq', + 'batch_anywhere_completions', ] io_raw_options = [ diff --git a/meson_options.txt b/meson_options.txt index 95ce837..610cbdb 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -109,3 +109,9 @@ option( value: false, description: 'Share a common async backend between all io_urings' ) +option( + 'io_batch_anywhere_completions', + type: 'boolean', + value: false, + description: 'Collect and schedule all completions reaped by the global completer at once' +) -- GitLab