diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 831f15c38cd42c748836775eeaad15f82f437125..c9ec2e99cb39f9aefea6f6e7ef42c5a07bfe5cc2 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -31,6 +31,7 @@ #include "io/IoContext.hpp" // for IoContext #include "io/Stats.hpp" // for emper::io::Stats #include "lib/DebugUtil.hpp" +#include "strategies/AbstractWorkStealingScheduler.hpp" #ifdef EMPER_DEFAULT_SCHEDULING_STRATEGY_WORK_STEALING #include "strategies/ws/WsStrategyFactory.hpp" @@ -253,7 +254,16 @@ void Runtime::yield() { auto Runtime::nextFiber() -> NextFiberResult { if constexpr (emper::IO) { // Schedule all fibers waiting on completed IO - IoContext::getWorkerIo()->reapCompletions(); + std::vector<Fiber*> completions = IoContext::getWorkerIo()->reapCompletions(); + if (!completions.empty()) { + // Keep the first and schedule the rest + Fiber* next = completions[0]; + schedule(completions.begin() + 1, completions.end()); + + // TODO: hint that this fiber comes from the IO subsystem + return NextFiberResult{ + next, static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::local)}; + } } return scheduler.nextFiber(); diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 2edd7aa9c3c0c23314699f4f0c6e4b273b32050b..787c7c113c7be34767ea64289cf0a6dc2bc88cc2 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -150,6 +150,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { scheduler.schedule(fiber); } + template <class InputIt> + inline void schedule(InputIt begin, InputIt end) { + // Calling schedule() only works from within the EMPER runtime. 
+ assert(inRuntime()); + + scheduler.schedule(begin, end); + } + inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); } template <class InputIt> diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index 85fb8d80dc8b67d1b586c68764991221da6d67e6..9263f1e7416c6e7c5eacbf3079c8b60bf2885b84 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -60,6 +60,15 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { scheduleInternal(fiber); } + template <class InputIt> + void schedule(InputIt begin, InputIt end) { + for (; begin != end; ++begin) { + Fiber& fiber = **begin; + LOGD("Scheduling batched fiber " << &fiber); + scheduleInternal(fiber); + } + } + virtual auto nextFiber() -> NextFiberResult = 0; void scheduleFromAnywhere(Fiber& fiber) { diff --git a/emper/io/GlobalIoContext.cpp b/emper/io/GlobalIoContext.cpp index 9c68f5a032f057337776de143c235eecbce8d5c8..9a46372841b37098faee3be2d4cee8b768fdb946 100644 --- a/emper/io/GlobalIoContext.cpp +++ b/emper/io/GlobalIoContext.cpp @@ -98,7 +98,7 @@ auto GlobalIoContext::globalCompleterFunc(void* arg) -> void* { assert(submitted == 1); - worker_io->reapCompletions<CallerEnvironment::ANYWHERE>(); + worker_io->reapAndScheduleCompletions<CallerEnvironment::ANYWHERE>(); } break; diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 398092d6b6c19fbe9921064f18f7958860b9c387..addc76d3cd6260c92c3310550f2e7f76a6532d2a 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -11,7 +11,7 @@ #include <cassert> // for assert #include <cerrno> // for errno, ECANCELED, EBUSY, EAGAIN, EINTR #include <cstring> // for memset -#include <memory> // for allocator +#include <utility> #include <vector> #include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER, ANYWHERE @@ -19,7 +19,8 @@ #include "Debug.hpp" // for LOGD #include "Emper.hpp" // for DEBUG, IO_URING_SQPOLL #include "Fiber.hpp" -#include "Runtime.hpp" // for Runtime +#include "Runtime.hpp" +#include 
"emper-common.h" #include "io/Future.hpp" // for Future, operator<<, Future::State #include "io/GlobalIoContext.hpp" #include "io/Stats.hpp" // for Stats, nanoseconds @@ -35,14 +36,6 @@ namespace emper::io { enum class PointerTags : uint16_t { Future, Callback }; -static inline auto castIfCallback(TaggedPtr ptr) -> Future::Callback * { - if (ptr.getTag() == static_cast<uint16_t>(PointerTags::Callback)) { - return ptr.getPtr<Future::Callback>(); - } - - return nullptr; -} - thread_local IoContext *IoContext::workerIo = nullptr; auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> unsigned { @@ -120,7 +113,7 @@ void IoContext::submit(Future &future) { TIME_NS( { do { - reapCompletions(); + reapAndScheduleCompletions(); } while ((submitted = io_uring_submit(&ring)) == -EBUSY); }, stats.record_io_submit_full_cq); @@ -159,7 +152,7 @@ void IoContext::submit(Future &future) { // Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe. // Try to reap a possible synchronous completion if we are on a worker's io_uring. 
if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - reapCompletions<callerEnvironment>(); + reapAndScheduleCompletions(); } } @@ -168,10 +161,28 @@ template void IoContext::submit<CallerEnvironment::EMPER>(Future &future); template void IoContext::submit<CallerEnvironment::ANYWHERE>(Future &future); template <CallerEnvironment callerEnvironment> -void IoContext::reapCompletions() { +auto IoContext::reapCompletions() -> std::vector<Fiber *> { + // vector returned containing all reaped completions + std::vector<Fiber *> continuationFibers; + + uint32_t maxRaceFreeCompleterAttempts = 1; + + // this label is not used for callerEnvironment::ANYWHERE and thus has to be + // annotated with ATTR_UNUSED +reap_cqes: + ATTR_UNUSED; + // Someone else is currently reaping completions - if (unlikely(!cq_lock.try_lock())) { - return; + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + if (unlikely(!cq_lock.try_lock())) { + LOGD("worker unsuccessful try_lock"); + return continuationFibers; + } + } else { + if (!cq_lock.try_lock_or_increment()) { + LOGD("Global completer unsuccessful try_lock_or_increment"); + return continuationFibers; + } } // never reap completions on the global IoContext @@ -182,8 +193,10 @@ void IoContext::reapCompletions() { struct io_uring_cqe *cqe; unsigned count = 0; - // vector used to batch all completions scheduled to the AnywhereQueue - std::vector<Fiber *> continuationFibers; + using Completion = std::pair<uint32_t, TaggedPtr>; + // vector to store seen cqes to make the critical section + // where cq_lock is held as small as possible + std::vector<Completion> reapedCompletions; int err = io_uring_peek_cqe(&ring, &cqe); if (err) { @@ -204,43 +217,7 @@ void IoContext::reapCompletions() { continue; } - auto *callback = castIfCallback(tptr); - if (callback) { - LOGD("Schedule new callback fiber for " << callback); - auto *callbackFiber = Fiber::from([&c = *callback, res = cqe->res] { - c(res); - delete &c; - }); - if constexpr 
(callerEnvironment == CallerEnvironment::EMPER) { - runtime.schedule(*callbackFiber); - } else { - if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) { - continuationFibers.push_back(callbackFiber); - } else { - runtime.scheduleFromAnywhere(*callbackFiber); - } - } - continue; - } - - auto *future = tptr.getPtr<Future>(); - - // assert that the future was previously in the uringFutureSet - assert(uringFutureSet.erase(future) > 0); - - future->recordCompletion(stats, cqe->res); - if constexpr (callerEnvironment == EMPER) { - future->complete(cqe->res); - } else { - if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) { - Fiber *continuation = future->completeAndGetContinuation(cqe->res); - if (continuation) { - continuationFibers.push_back(continuation); - } - } else { - future->completeFromAnywhere(cqe->res); - } - } + reapedCompletions.emplace_back(cqe->res, tptr); } LOGD("got " << count << " cqes from the io_uring"); @@ -251,22 +228,86 @@ void IoContext::reapCompletions() { reqs_in_uring -= count; } +unlock: + uint32_t globalCompleterAttempts = cq_lock.unlock(); + + // A naive try lock protecting a worker's IoContext's cq is racy. + // While a worker is holding the lock additional completions could arrive + // which the worker does not observe because it could be already finished iterating. + // In the case that the worker still holds the lock preventing the globalCompleter + // from reaping the additional completions we have a lost wakeup possibly leading + // to a completely sleeping runtime with runnable completions in a worker's IoContext. + + // To prevent this race the cq_lock counts the unsuccessful tries from + // the globalCompleter. + // If a worker observes that the globalCompleter tried to reapCompletions + // more than twice we know that a lost wakeup could have occurred and we try to + // reap again. + + // In the case a lost wakeup was possible we schedule our reaped cqes + // and try again. 
+ + // On all cq iteration after the first we expect no globalCompleterAttempt + // or in other words a single globalCompleterAttempt attempt means + // additional completions arrive and lost wakeup was possible again. + + stats.record_reaps<callerEnvironment>(count); + + for (auto &completion : reapedCompletions) { + auto res = completion.first; + auto tptr = completion.second; + + auto tag = static_cast<PointerTags>(tptr.getTag()); + switch (tag) { + case PointerTags::Callback: { + auto *callback = tptr.getPtr<Future::Callback>(); + LOGD("Create new callback fiber for " << callback); + auto *callbackFiber = Fiber::from([&c = *callback, res] { + c(res); + delete &c; + }); + + continuationFibers.push_back(callbackFiber); + } break; + + case PointerTags::Future: { + auto *future = tptr.getPtr<Future>(); + // assert that the future was previously in the uringFutureSet + assert(uringFutureSet.erase(future) > 0); + + future->recordCompletion(stats, res); + Fiber *continuation = future->completeAndGetContinuation(res); + if (continuation) { + continuationFibers.push_back(continuation); + } + } break; + + default: + DIE_MSG("Unknown pointer tag encountered: " << (int)tag); + break; + } + } + + // check if lost wakeup was possible if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - stats.record_worker_reaps(count); - } else { - // actually schedule all completion fibers - runtime.scheduleFromAnywhere(continuationFibers.begin(), continuationFibers.end()); - stats.record_completer_reaps(count); + if (unlikely(globalCompleterAttempts > maxRaceFreeCompleterAttempts)) { + // schedule all already collected continuation fibers + runtime.schedule(continuationFibers.begin(), continuationFibers.end()); + continuationFibers.clear(); + + // In all CQ iteration after the first we expect no further globalCompleter attempts + maxRaceFreeCompleterAttempts = 0; + goto reap_cqes; + } } -unlock: - cq_lock.unlock(); + return continuationFibers; } // Show the compiler our 
template incarnations this is needed again because // reapCompletions<CallerEnvironment::ANYWHERE> is now called from GlobalIoContext.cpp -template void IoContext::reapCompletions<CallerEnvironment::ANYWHERE>(); -template void IoContext::reapCompletions<CallerEnvironment::EMPER>(); +template auto IoContext::reapCompletions<CallerEnvironment::ANYWHERE>() -> std::vector<Fiber *>; +template auto IoContext::reapCompletions<CallerEnvironment::EMPER>() -> std::vector<Fiber *>; IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) { struct io_uring_params params; diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 64382feaea80105efd848c2b618b858e6f6ff233..100a11a30f04c3b1332b1723365cf38576bb4e83 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -9,15 +9,18 @@ #include <cstddef> // for size_t #include <cstdint> // for uint64_t #include <functional> // for less -#include <memory> // for allocator +#include <vector> #include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER +#include "Common.hpp" #include "Debug.hpp" // for LogSubsystem, LogSubsystem::IO, Logger #include "Runtime.hpp" // for Runtime #include "emper-config.h" // for EMPER_IO_WORKER_URING_ENTRIES #include "io/Stats.hpp" // for Stats -#include "lib/adt/AtomicTryLock.hpp" #include "lib/adt/LockedSet.hpp" // for LockedSet +#include "lib/sync/CountingTryLock.hpp" + +class Fiber; namespace emper::io { class Future; @@ -37,7 +40,7 @@ class IoContext : public Logger<LogSubsystem::IO> { static thread_local IoContext *workerIo; // TryLock protecting the completion queue of ring. 
- lib::adt::AtomicTryLock cq_lock; + ALIGN_TO_CACHE_LINE lib::sync::CountingTryLock cq_lock; struct io_uring ring; // In a worker's IoContext This eventfd is registered with the io_uring to get completion @@ -117,10 +120,27 @@ class IoContext : public Logger<LogSubsystem::IO> { template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void submit(Future &future); + /** + * @brief Collect all fibers waiting on completed IO + * + * @return A vector containing all runnable Fibers + */ + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + auto reapCompletions() -> std::vector<Fiber *>; + /** * @brief Schedule all fibers waiting on completed IO */ template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - void reapCompletions(); + void reapAndScheduleCompletions() { + auto completions = reapCompletions<callerEnvironment>(); + if (!completions.empty()) { + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + runtime.schedule(completions.begin(), completions.end()); + } else { + runtime.scheduleFromAnywhere(completions.begin(), completions.end()); + } + } + } }; } // namespace emper::io diff --git a/emper/io/Stats.hpp b/emper/io/Stats.hpp index 56f0c3c7d994dd92efa826ee7355976dfaa72025..29ff2a1acc3d004fd534e2827b41a60715c4aed3 100644 --- a/emper/io/Stats.hpp +++ b/emper/io/Stats.hpp @@ -12,6 +12,7 @@ #include <map> // for map, map<>::value_compare #include <vector> // for vector +#include "CallerEnvironment.hpp" #include "Debug.hpp" // for LOGW #include "Emper.hpp" // for STATS #include "emper-common.h" // for workerid_t @@ -198,18 +199,17 @@ class Stats { io_submit_full_cq_running_mean += diff; } - inline void record_completer_reaps(unsigned count) { + template <CallerEnvironment callerEnvironment> + inline void record_reaps(unsigned count) { RETURN_IF_NO_STATS(); - completer_reap++; - completer_reaped_completions += count; - } - - inline void record_worker_reaps(unsigned count) { - RETURN_IF_NO_STATS(); - - worker_reap++; 
- worker_reaped_completions += count; + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + worker_reap++; + worker_reaped_completions += count; + } else { + completer_reap++; + completer_reaped_completions += count; + } } friend auto operator<<(std::ostream& os, const Stats& s) -> std::ostream&; diff --git a/emper/lib/adt/AtomicTryLock.hpp b/emper/lib/adt/AtomicTryLock.hpp deleted file mode 100644 index 222d80e70c9ddf60dad68ea99e3c20541bff867e..0000000000000000000000000000000000000000 --- a/emper/lib/adt/AtomicTryLock.hpp +++ /dev/null @@ -1,25 +0,0 @@ -// SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Fischer -#pragma once - -#include <atomic> - -#include "Common.hpp" - -namespace emper::lib::adt { - -class ALIGN_TO_CACHE_LINE AtomicTryLock { - private: - std::atomic<bool> locked; - - public: - AtomicTryLock(bool locked = false) : locked(locked) {} - - auto try_lock() -> bool { - bool previously_locked = locked.exchange(true, std::memory_order_acquire); - return !previously_locked; - } - - void unlock() { locked.store(false, std::memory_order_release); } -}; -} // namespace emper::lib::adt diff --git a/emper/lib/sync/CountingTryLock.hpp b/emper/lib/sync/CountingTryLock.hpp new file mode 100644 index 0000000000000000000000000000000000000000..cc67ec1bed0ac615b5a8a4a6c0654cdff2f1252f --- /dev/null +++ b/emper/lib/sync/CountingTryLock.hpp @@ -0,0 +1,67 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <atomic> +#include <climits> + +#include "Common.hpp" + +namespace emper::lib::sync { + +class CountingTryLock { + private: + // The lower 32-bit are used as the actual lock + // The higher 32-bit are used as a counter which is incremented if + // try_lock_or_increment fails to acquire the lock. 
+ + // Layout of our lock union: + // |-------------- countingLock --------------| + // |------- counter -------| + // | lock | + // 0 sizeof(std::atomic<bool>) 64 + // 0 COUNTER_SHIFT 64 + union { + std::atomic<uint64_t> countingLock; + std::atomic<bool> lock; + }; + + static const int COUNTER_SHIFT = 32; + static_assert(sizeof(std::atomic<bool>) * CHAR_BIT < COUNTER_SHIFT); + + static const uint64_t LOCKED = 1; + static const uint64_t UNLOCKED = 0; + + public: + CountingTryLock(bool locked = false) : countingLock(locked ? LOCKED : UNLOCKED) {} + + [[nodiscard]] auto try_lock() -> bool { return !lock.exchange(true, std::memory_order_acquire); } + + [[nodiscard]] auto try_lock_or_increment() -> bool { + uint64_t oldVal, newVal; + oldVal = countingLock.load(std::memory_order_relaxed); + for (;;) { + // currently unlocked -> try to lock + if (oldVal == UNLOCKED) { + newVal = LOCKED; + // currently locked -> increment the counter + } else { + newVal = oldVal + (1L << COUNTER_SHIFT); + } + + if (countingLock.compare_exchange_weak(oldVal, newVal, std::memory_order_acquire, + std::memory_order_relaxed)) { + break; + } + } + + return oldVal == UNLOCKED; + } + + // release the lock, zero and return the counter + auto unlock() -> uint32_t { + uint64_t oldVal = countingLock.exchange(UNLOCKED, std::memory_order_release); + return static_cast<uint32_t>(oldVal >> COUNTER_SHIFT); + } +}; +} // namespace emper::lib::sync