diff --git a/emper/Blockable.hpp b/emper/Blockable.hpp index 7b2378f6508e7120588e9160cc73dc257c95af72..d0bef641b188eeb888e7752fe60651f83167317c 100644 --- a/emper/Blockable.hpp +++ b/emper/Blockable.hpp @@ -41,16 +41,18 @@ class Blockable : public Logger<logSubsystem> { contextManager.saveAndStartNew(std::move(freshContextHook)); } - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - void unblock(Context* context) { + auto unblockAndGetContinuation(Context* context) -> Fiber* { assert(context != nullptr); if constexpr (emper::BLOCKED_CONTEXT_SET) { blockedContexts.erase(context); } - // cppcheck-suppress unsafeClassCanLeak - Fiber* unblockFiber = - Fiber::from([this, context]() { contextManager.discardAndResume(context); }); + return Fiber::from([this, context]() { contextManager.discardAndResume(context); }); + } + + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + void unblock(Context* context) { + Fiber* unblockFiber = unblockAndGetContinuation(context); if constexpr (callerEnvironment == CallerEnvironment::EMPER) { runtime.schedule(*unblockFiber); diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 8a04980038cc238138f9c85182113a9295605bad..ee265179dfb3604d7834ee95c1c94ddf5e6285e5 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -97,4 +97,11 @@ static const bool IO_URING_SHARED_WQ = false #endif ; +static const bool IO_BATCH_ANYWHERE_COMPLETIONS = +#ifdef EMPER_IO_BATCH_ANYWHERE_COMPLETIONS + true +#else + false +#endif + ; } // namespace emper diff --git a/emper/PrivateSemaphore.hpp b/emper/PrivateSemaphore.hpp index a72347d65958ab9f328301ce7cc98fd4e08d7447..8e89d21d5d850732ff1f88fce1b6847a18cd9895 100644 --- a/emper/PrivateSemaphore.hpp +++ b/emper/PrivateSemaphore.hpp @@ -37,6 +37,13 @@ class PrivateSemaphore : protected Blockable<LogSubsystem::PS> { // invalid at this point. } + [[nodiscard]] auto signalAndGetContinuation() -> Fiber* { + if (Context* readyContext = signalInternal()) { + return unblockAndGetContinuation(readyContext); + } + return nullptr; + } + void signalFromAnywhere() { signal<CallerEnvironment::ANYWHERE>(); } void signalAndExit() { diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 6e7bed73ab8151a5452545f07b048c3cfff062dc..fbbabc9771fdde55f99cf0de6d13475f99749310 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -158,6 +158,11 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); } + template <class InputIt> + inline void scheduleFromAnywhere(InputIt begin, InputIt end) { + scheduler.scheduleFromAnywhere(begin, end); + } + void yield(); // TODO: This should probably not be a public method of Runtime. diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index f49308eda4118e14f1c7147103afe05fb8ed7d39..9bcb9e4d6663852329f47948e6527aa4f4c4127b 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -42,6 +42,11 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); } + template <class InputIt> + void insertInAnywhereQueue(InputIt begin, InputIt end) { + scheduleAnywhereQueue.insert(begin, end); + } + auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); } virtual void scheduleInternal(Fiber& fiber) = 0; @@ -60,4 +65,13 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { onNewWork<CallerEnvironment::ANYWHERE>(); } + + // TODO: investigate if it is still a good idea to wakeup only a single + // worker maybe we want something like onNewWork(amountOfWork) + template <class InputIt> + void scheduleFromAnywhere(InputIt begin, InputIt end) { + insertInAnywhereQueue(begin, end); + + onNewWork<CallerEnvironment::ANYWHERE>(); + } }; diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp index b3c5c5ccc5d6937c0740488228b45a6ecb7b7660..a346253d0d527dd80ec7eeb7e6157f8ca3705d0d 100644 --- a/emper/io/Future.hpp +++ b/emper/io/Future.hpp @@ -22,6 +22,7 @@ #include "io/Operation.hpp" // for Operation, operator<<, Operati... struct io_uring_sqe; +class Fiber; namespace emper::io { class Stats; @@ -106,6 +107,11 @@ class Future : public Logger<LogSubsystem::IO> { sem.signalFromAnywhere(); } + virtual auto completeAndGetContinuation(int32_t res) -> Fiber* { + setCompletion(res); + return sem.signalAndGetContinuation(); + } + /** * Used for Stats::recordCompletion double dispatch */ @@ -377,6 +383,21 @@ class PartialCompletableFuture : public Future { } } + auto completeAndGetContinuation(int32_t res) -> Fiber* override { + CompletionType completion = tryComplete<CallerEnvironment::ANYWHERE>(res); + + switch (completion) { + case CompletionType::Resubmission: + return nullptr; + case CompletionType::IncrementalCompletion: + return Future::completeAndGetContinuation(partialCompletion); + case CompletionType::Completion: + return Future::completeAndGetContinuation(res); + default: + DIE_MSG("Unknown CompletionType: " << (int)completion); + } + } + public: /** * @brief Block till the IO request is completed diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index bd9aebaf39e8a16abc797c79708941d4fbd86705..0f52ef84dbaefa038e799c0fc7d59345f33f48df 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -13,6 +13,7 @@ #include <cstdio> // for perror #include <cstring> // for memset #include <memory> // for allocator +#include <vector> #include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER, ANYWHERE #include "Common.hpp" // for unlikely, DIE_MSG_ERRNO, DIE_MSG @@ -191,6 +192,10 @@ void IoContext::reapCompletions() { struct io_uring_cqe *cqe; unsigned count = 0; + // vector used to batch all completions scheduled to the AnywhereQueue + std::vector<Fiber *> continuationFibers; + Runtime *runtime = Runtime::getRuntime(); + int err = io_uring_peek_cqe(&ring, &cqe); if (err) { if (err == -EAGAIN) { @@ -217,11 +222,14 @@ void IoContext::reapCompletions() { c(res); delete &c; }); - Runtime *runtime = Runtime::getRuntime(); if constexpr (callerEnvironment == CallerEnvironment::EMPER) { runtime->schedule(*callbackFiber); } else { - runtime->scheduleFromAnywhere(*callbackFiber); + if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) { + continuationFibers.push_back(callbackFiber); + } else { + runtime->scheduleFromAnywhere(*callbackFiber); + } } continue; } @@ -235,7 +243,14 @@ void IoContext::reapCompletions() { if constexpr (callerEnvironment == EMPER) { future->complete(cqe->res); } else { - future->completeFromAnywhere(cqe->res); + if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) { + Fiber *continuation = future->completeAndGetContinuation(cqe->res); + if (continuation) { + continuationFibers.push_back(continuation); + } + } else { + future->completeFromAnywhere(cqe->res); + } } } @@ -250,6 +265,8 @@ void IoContext::reapCompletions() { if constexpr (callerEnvironment == CallerEnvironment::EMPER) { stats.record_worker_reaps(count); } else { + // actually schedule all completion fibers + runtime->scheduleFromAnywhere(continuationFibers.begin(), continuationFibers.end()); stats.record_completer_reaps(count); } diff --git a/emper/lib/adt/LockedUnboundedQueue.hpp b/emper/lib/adt/LockedUnboundedQueue.hpp index 1d76cd742c1631edaceea5e9c1f3edc4b2f17fdb..f6ccda487c3e28e5ec7b18bd53e1ebfd7fd28cd0 100644 --- a/emper/lib/adt/LockedUnboundedQueue.hpp +++ b/emper/lib/adt/LockedUnboundedQueue.hpp @@ -20,6 +20,14 @@ class LockedUnboundedQueue { queue.push(item); } + template <class InputIt> + void insert(InputIt begin, InputIt end) { + std::lock_guard<std::mutex> lock(queue_mutex); + for (; begin != end; ++begin) { + queue.push(*begin); + } + } + auto dequeue() -> I* { std::lock_guard<std::mutex> lock(queue_mutex); if (queue.empty()) { diff --git a/meson.build b/meson.build index e5b6b9d0378882e12053ff6346a6058c25908c1a..d5493868e9ec9f37832dc760d33c03887b87f8b2 100644 --- a/meson.build +++ b/meson.build @@ -65,6 +65,7 @@ endif io_bool_options = [ 'uring_sqpoll', 'uring_shared_wq', + 'batch_anywhere_completions', ] io_raw_options = [ diff --git a/meson_options.txt b/meson_options.txt index 95ce83775779e98a1b30d8aa22024ef29d534fa1..610cbdbb99e6fcc7617a4bd4d8d0ba0033958122 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -109,3 +109,9 @@ option( value: false, description: 'Share a common async backend between all io_urings' ) +option( + 'io_batch_anywhere_completions', + type: 'boolean', + value: false, + description: 'Collect and schedule all completions reaped by the global completer at once' +)