diff --git a/emper/Blockable.hpp b/emper/Blockable.hpp index 7b2378f6508e7120588e9160cc73dc257c95af72..d0bef641b188eeb888e7752fe60651f83167317c 100644 --- a/emper/Blockable.hpp +++ b/emper/Blockable.hpp @@ -41,16 +41,18 @@ class Blockable : public Logger { contextManager.saveAndStartNew(std::move(freshContextHook)); } - template - void unblock(Context* context) { + auto unblockAndGetContinuation(Context* context) -> Fiber* { assert(context != nullptr); if constexpr (emper::BLOCKED_CONTEXT_SET) { blockedContexts.erase(context); } - // cppcheck-suppress unsafeClassCanLeak - Fiber* unblockFiber = - Fiber::from([this, context]() { contextManager.discardAndResume(context); }); + return Fiber::from([this, context]() { contextManager.discardAndResume(context); }); + } + + template + void unblock(Context* context) { + Fiber* unblockFiber = unblockAndGetContinuation(context); if constexpr (callerEnvironment == CallerEnvironment::EMPER) { runtime.schedule(*unblockFiber); diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 8a04980038cc238138f9c85182113a9295605bad..ee265179dfb3604d7834ee95c1c94ddf5e6285e5 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -97,4 +97,11 @@ static const bool IO_URING_SHARED_WQ = false #endif ; +static const bool IO_BATCH_ANYWHERE_COMPLETIONS = +#ifdef EMPER_IO_BATCH_ANYWHERE_COMPLETIONS + true +#else + false +#endif + ; } // namespace emper diff --git a/emper/PrivateSemaphore.hpp b/emper/PrivateSemaphore.hpp index a72347d65958ab9f328301ce7cc98fd4e08d7447..8e89d21d5d850732ff1f88fce1b6847a18cd9895 100644 --- a/emper/PrivateSemaphore.hpp +++ b/emper/PrivateSemaphore.hpp @@ -37,6 +37,13 @@ class PrivateSemaphore : protected Blockable { // invalid at this point. } + [[nodiscard]] auto signalAndGetContinuation() -> Fiber* { + if (Context* readyContext = signalInternal()) { + return unblockAndGetContinuation(readyContext); + } + return nullptr; + } + void signalFromAnywhere() { signal(); } void signalAndExit() { diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 6e7bed73ab8151a5452545f07b048c3cfff062dc..fbbabc9771fdde55f99cf0de6d13475f99749310 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -158,6 +158,11 @@ class Runtime : public Logger { inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); } + template + inline void scheduleFromAnywhere(InputIt begin, InputIt end) { + scheduler.scheduleFromAnywhere(begin, end); + } + void yield(); // TODO: This should probably not be a public method of Runtime. diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index f49308eda4118e14f1c7147103afe05fb8ed7d39..9bcb9e4d6663852329f47948e6527aa4f4c4127b 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -42,6 +42,11 @@ class Scheduler : public Logger { void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); } + template + void insertInAnywhereQueue(InputIt begin, InputIt end) { + scheduleAnywhereQueue.insert(begin, end); + } + auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); } virtual void scheduleInternal(Fiber& fiber) = 0; @@ -60,4 +65,13 @@ class Scheduler : public Logger { onNewWork(); } + + // TODO: investigate if it is still a good idea to wakeup only a single + // worker maybe we want something like onNewWork(amountOfWork) + template + void scheduleFromAnywhere(InputIt begin, InputIt end) { + insertInAnywhereQueue(begin, end); + + onNewWork(); + } }; diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp index b3c5c5ccc5d6937c0740488228b45a6ecb7b7660..4fbb84b3610d310426f4c83871334fb0c418a2a3 100644 --- a/emper/io/Future.hpp +++ b/emper/io/Future.hpp @@ -106,6 +106,11 @@ class Future : public Logger { sem.signalFromAnywhere(); } + virtual auto completeAndGetContinuation(int32_t res) -> Fiber* { + setCompletion(res); + return sem.signalAndGetContinuation(); + } + /** * Used for Stats::recordCompletion double dispatch */ @@ -377,6 +382,21 @@ class PartialCompletableFuture : public Future { } } + auto completeAndGetContinuation(int32_t res) -> Fiber* override { + CompletionType completion = tryComplete(res); + + switch (completion) { + case CompletionType::Resubmission: + return nullptr; + case CompletionType::IncrementalCompletion: + return Future::completeAndGetContinuation(partialCompletion); + case CompletionType::Completion: + return Future::completeAndGetContinuation(res); + default: + DIE_MSG("Unknown CompletionType: " << completion); + } + } + public: /** * @brief Block till the IO request is completed diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index bd9aebaf39e8a16abc797c79708941d4fbd86705..d85355060b259a76fcf4c86900da92ea640ba990 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -191,6 +191,10 @@ void IoContext::reapCompletions() { struct io_uring_cqe *cqe; unsigned count = 0; + // vector used to batch all completions scheduled to the AnywhereQueue + std::vector continuationFibers; + Runtime *runtime = Runtime::getRuntime(); + int err = io_uring_peek_cqe(&ring, &cqe); if (err) { if (err == -EAGAIN) { @@ -217,11 +221,14 @@ void IoContext::reapCompletions() { c(res); delete &c; }); - Runtime *runtime = Runtime::getRuntime(); if constexpr (callerEnvironment == CallerEnvironment::EMPER) { runtime->schedule(*callbackFiber); } else { - runtime->scheduleFromAnywhere(*callbackFiber); + if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) { + continuationFibers.push_back(callbackFiber); + } else { + runtime->scheduleFromAnywhere(*callbackFiber); + } } continue; } @@ -235,7 +242,14 @@ void IoContext::reapCompletions() { if constexpr (callerEnvironment == EMPER) { future->complete(cqe->res); } else { - future->completeFromAnywhere(cqe->res); + if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) { + Fiber *continuation = future->completeAndGetContinuation(cqe->res); + if (continuation) { + continuationFibers.push_back(continuation); + } + } else { + future->completeFromAnywhere(cqe->res); + } } } @@ -250,6 +264,8 @@ void IoContext::reapCompletions() { if constexpr (callerEnvironment == CallerEnvironment::EMPER) { stats.record_worker_reaps(count); } else { + // actually schedule all completion fibers + runtime->scheduleFromAnywhere(continuationFibers.begin(), continuationFibers.end()); stats.record_completer_reaps(count); } diff --git a/emper/lib/adt/LockedUnboundedQueue.hpp b/emper/lib/adt/LockedUnboundedQueue.hpp index 1d76cd742c1631edaceea5e9c1f3edc4b2f17fdb..f6ccda487c3e28e5ec7b18bd53e1ebfd7fd28cd0 100644 --- a/emper/lib/adt/LockedUnboundedQueue.hpp +++ b/emper/lib/adt/LockedUnboundedQueue.hpp @@ -20,6 +20,14 @@ class LockedUnboundedQueue { queue.push(item); } + template + void insert(InputIt begin, InputIt end) { + std::lock_guard lock(queue_mutex); + for (; begin != end; ++begin) { + queue.push(*begin); + } + } + auto dequeue() -> I* { std::lock_guard lock(queue_mutex); if (queue.empty()) { diff --git a/meson.build b/meson.build index e5b6b9d0378882e12053ff6346a6058c25908c1a..d5493868e9ec9f37832dc760d33c03887b87f8b2 100644 --- a/meson.build +++ b/meson.build @@ -65,6 +65,7 @@ endif io_bool_options = [ 'uring_sqpoll', 'uring_shared_wq', + 'batch_anywhere_completions', ] io_raw_options = [ diff --git a/meson_options.txt b/meson_options.txt index 95ce83775779e98a1b30d8aa22024ef29d534fa1..610cbdbb99e6fcc7617a4bd4d8d0ba0033958122 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -109,3 +109,9 @@ option( value: false, description: 'Share a common async backend between all io_urings' ) +option( + 'io_batch_anywhere_completions', + type: 'boolean', + value: false, + description: 'Collect and schedule all completions reaped by the global completer at once' +)