diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 00e6f3e56ea3f9893248131dc90bbea2f2ec7e17..8798bc84e5336a2c43496b90f2feac2f2343c3c5 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -271,11 +271,12 @@ void Runtime::yield() { auto Runtime::nextFiber() -> NextFiberResult { if constexpr (emper::IO) { // Schedule all fibers waiting on completed IO - std::vector<Fiber*> completions = IoContext::getWorkerIo()->reapCompletions(); - if (!completions.empty()) { + IoContext::ContinuationBuffer completions; + unsigned ncompletions = IoContext::getWorkerIo()->reapCompletions(completions); + if (ncompletions > 0) { // Keep the first and schedule the rest Fiber* next = completions[0]; - schedule(completions.begin() + 1, completions.end()); + schedule(completions.begin() + 1, completions.begin() + ncompletions); // TODO: hint that this fiber comes from the IO subsystem return NextFiberResult{ diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 8d12e712835d854d223601368e6c8067d63e42ef..eebd741f432a977c581c97a55b57db914e698c6d 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -179,7 +179,7 @@ template void IoContext::submit<CallerEnvironment::EMPER>(Future &future); template void IoContext::submit<CallerEnvironment::ANYWHERE>(Future &future); template <CallerEnvironment callerEnvironment> -auto IoContext::reapCompletions() -> std::vector<Fiber *> { +auto IoContext::reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned { // Should not be more than the uring_entries count. const unsigned CQE_BATCH_COUNT = EMPER_IO_WORKER_URING_ENTRIES; @@ -191,11 +191,6 @@ auto IoContext::reapCompletions() -> std::vector<Fiber *> { // where cq_lock is held as small as possible std::array<Completion, CQE_BATCH_COUNT> reapedCompletions; - // Vector returned containing all reaped completions in form of fibers. - std::vector<Fiber *> continuationFibers; - // We reserve capacity in contiunationFibers as soon as we know the - // amount of reaped completions. - // this label is not used for callerEnvironment::ANYWHERE and thus has to be // annotated with ATTR_UNUSED reap_cqes: @@ -211,12 +206,12 @@ reap_cqes: if constexpr (callerEnvironment == CallerEnvironment::EMPER) { if (unlikely(!cq_lock.try_lock())) { LOGD("worker unsuccessful try_lock"); - return continuationFibers; + return 0; } } else { if (!cq_lock.try_lock_or_increment()) { LOGD("Global completer unsuccessful try_lock_or_increment"); - return continuationFibers; + return 0; } } @@ -264,8 +259,7 @@ reap_cqes: stats.record_reaps<callerEnvironment>(count); - continuationFibers.reserve(count); - + unsigned posInBuf = 0; for (unsigned i = 0; i < count; ++i) { auto &completion = reapedCompletions[i]; auto res = completion.first; @@ -287,7 +281,7 @@ reap_cqes: delete &c; }); - continuationFibers.push_back(callbackFiber); + continuationFibers[posInBuf++] = callbackFiber; } break; case PointerTags::Future: { @@ -298,7 +292,7 @@ reap_cqes: future->recordCompletion(stats, res); Fiber *continuation = future->completeAndGetContinuation(res); if (continuation) { - continuationFibers.push_back(continuation); + continuationFibers[posInBuf++] = continuation; } } break; @@ -324,8 +318,7 @@ reap_cqes: if (reReap) { // schedule all already collected continuation fibers - runtime.schedule(continuationFibers.begin(), continuationFibers.end()); - continuationFibers.clear(); + runtime.schedule(continuationFibers.begin(), continuationFibers.begin() + posInBuf); reReapCount++; @@ -335,13 +328,15 @@ reap_cqes: stats.record_reReapCount(reReapCount); } - return continuationFibers; + return posInBuf; } // Show the compiler our template incarnations this is needed again because // reapCompletions<CallerEnvironment::ANYWHERE> is now called from GlobalIoContext.cpp -template auto IoContext::reapCompletions<CallerEnvironment::ANYWHERE>() -> std::vector<Fiber *>; -template auto IoContext::reapCompletions<CallerEnvironment::EMPER>() -> std::vector<Fiber *>; +template auto IoContext::reapCompletions<CallerEnvironment::ANYWHERE>( + ContinuationBuffer &contiunationFibers) -> unsigned; +template auto IoContext::reapCompletions<CallerEnvironment::EMPER>( + ContinuationBuffer &continuationFibers) -> unsigned; IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) { struct io_uring_params params; diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index af07b659dfb64c1963b58d5cef51b7125f88e3d5..31805c34e814c3d2f55ff3e18f5cdb4e2c9ea081 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -4,6 +4,7 @@ #include <liburing.h> // for io_uring +#include <array> #include <atomic> // for atomic #include <cassert> // for assert #include <cstddef> // for size_t @@ -191,25 +192,36 @@ class IoContext : public Logger<LogSubsystem::IO> { reapAndScheduleCompletions(); } + static const unsigned CQE_BATCH_COUNT = EMPER_IO_WORKER_URING_ENTRIES; + using ContinuationBuffer = std::array<Fiber *, CQE_BATCH_COUNT>; + /** * @brief Collect all fibers waiting on completed IO * - * @return A vector containing all runnable Fibers + * @param[in, out] continuationFibers Buffer big enough to hold all possible continuation Fibers. + * Passing the buffer fomr the caller to the calle allows the buffer to + * be statically allocated. + * + * @return The number of continuation Fibers */ template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - auto reapCompletions() -> std::vector<Fiber *>; + auto reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned; /** * @brief Schedule all fibers waiting on completed IO */ template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void reapAndScheduleCompletions() { - auto completions = reapCompletions<callerEnvironment>(); - if (!completions.empty()) { + ContinuationBuffer completionBuf; + unsigned ncompletions = reapCompletions<callerEnvironment>(completionBuf); + if (ncompletions > 0) { + auto *begin = completionBuf.begin(); + auto *finish = completionBuf.begin() + ncompletions; + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - runtime.schedule(completions.begin(), completions.end()); + runtime.schedule(begin, finish); } else { - runtime.scheduleFromAnywhere(completions.begin(), completions.end()); + runtime.scheduleFromAnywhere(begin, finish); } } }