From 0abc29ade0cc9b6c2035a0d39dda7754b8cc5082 Mon Sep 17 00:00:00 2001 From: Florian Fischer <florian.fischer@muhq.space> Date: Wed, 22 Sep 2021 18:18:24 +0200 Subject: [PATCH] implement IO stealing IO stealing is analog to work-stealing and means that worker thread without work will try to steal IO completions (CQEs) from other worker's IoContexts. The work stealing algorithm is modified to check a victims CQ after findig their work queue empty. This approach in combination with future additions (global notifications on IO completions, and lock free CQE consumption) are a realistic candidate to replace the completer thread without loosing its benefits. To allow IO stealing the CQ must be synchronized which is already the case with the IoContext::cq_lock. Currently stealing workers always try to pop a single CQE (this could be configurable). Steal attempts are recorded in the IoContext's Stats object and successfully stolen IO continuations in the AbstractWorkStealingWorkerStats. I moved the code transforming CQEs into continuation Fibers from reapCompletions into a seperate function to make the rather complicated function more readable and thus easier to understand. Remove the default CallerEnvironment template arguments to make the code more explicit and prevent easy errors (not propagating the caller environment or forgetting the function takes a caller environment). io::Stats now need to use atomics because multiple thread may increment them in parallel from EMPER and the OWNER. And since using std::atomic<T*> in std::map is not easily possible we use the compiler __atomic_* builtins. Add, adjust and fix some comments. --- .gitlab-ci.yml | 16 +++ emper/Emper.hpp | 8 ++ emper/Runtime.cpp | 3 +- emper/Runtime.hpp | 1 + emper/io/IoContext.cpp | 136 ++++++++---------- emper/io/IoContext.hpp | 130 +++++++++++++++-- emper/io/Stats.cpp | 22 ++- emper/io/Stats.hpp | 27 ++-- emper/log/LogBuffer.cpp | 8 +- emper/log/LogBuffer.hpp | 2 + emper/sleep_strategy/PipeSleepStrategy.cpp | 5 +- .../AbstractWorkStealingScheduler.cpp | 13 ++ .../AbstractWorkStealingScheduler.hpp | 1 + .../strategies/AbstractWorkStealingStats.cpp | 2 + .../AbstractWorkStealingWorkerStats.cpp | 1 + .../AbstractWorkStealingWorkerStats.hpp | 1 + meson.build | 1 + meson_options.txt | 6 + 18 files changed, 269 insertions(+), 114 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 498ed530..e2cb70b8 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -113,6 +113,10 @@ clang-tidy: variables: EMPER_IO_SINGLE_URING: 'true' +.emper-io-stealing: + variables: + EMPER_IO_STEALING: 'true' + .default-library-static: variables: EMPER_DEFAULT_LIBRARY: 'static' @@ -309,3 +313,15 @@ test-pipe-sleep-strategy-no-completer: - .test - .emper-pipe-sleep-strategy - .emper-no-completer + +test-io-stealing: + extends: + - .test + - .emper-io-stealing + +test-io-stealing-pipe-no-completer: + extends: + - .test + - .emper-pipe-sleep-strategy + - .emper-no-completer + - .emper-io-stealing diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 85147cee..dbc73ca6 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -105,6 +105,14 @@ static const bool IO = #endif ; +static const bool IO_STEALING = +#ifdef EMPER_IO_STEALING + true +#else + false +#endif + ; + static const bool IO_SINGLE_URING = #ifdef EMPER_IO_SINGLE_URING true diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index c9d73304..bb5e1ef8 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -340,7 +340,8 @@ auto Runtime::nextFiber() -> NextFiberResult { if constexpr (emper::IO_WORKER_URING) { // Schedule all fibers waiting on completed IO IoContext::ContinuationBuffer completions; - unsigned ncompletions = IoContext::getWorkerIo()->reapCompletions(completions); + unsigned ncompletions = + IoContext::getWorkerIo()->reapCompletions<CallerEnvironment::OWNER>(completions); if (ncompletions > 0) { // Keep the first and schedule the rest Fiber* next = completions[0]; diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 18a16649..c3467559 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -213,6 +213,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { void executeAndWait(std::function<void()> f); + friend class AbstractWorkStealingScheduler; template <LogSubsystem> friend class Blockable; friend ContextManager; diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index bc5a01cb..1183f6fd 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -22,7 +22,6 @@ #include "Common.hpp" // for unlikely, DIE_MSG_ERRNO, DIE_MSG #include "Debug.hpp" // for LOGD #include "Emper.hpp" // for DEBUG, IO_URING_SQPOLL -#include "Fiber.hpp" #include "Runtime.hpp" #include "emper-common.h" #include "io/Future.hpp" // for Future, operator<<, Future::State @@ -122,7 +121,7 @@ auto IoContext::submitPreparedSqesAndWait(unsigned wait_nr) -> unsigned { TIME_NS( { do { - reapAndScheduleCompletions(); + reapAndScheduleCompletions<callerEnvironment>(); } while ((submitted = io_uring_submit(&ring)) == -EBUSY); }, stats.record_io_submit_full_cq); @@ -218,11 +217,15 @@ template void IoContext::submitAndWait<CallerEnvironment::ANYWHERE>(Future &futu unsigned wait_nr); template <CallerEnvironment callerEnvironment> -auto IoContext::reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned { +auto IoContext::reapCompletions(Fiber **continuations, size_t toReap) -> unsigned { + // TODO: should only the owner possibly rereap? + constexpr bool checkForRereap = callerEnvironment == CallerEnvironment::OWNER; unsigned reReapCount = 0; - using Completion = std::pair<int32_t, void *>; - // vector to store seen cqes to make the critical section + // array to store cqe* needed by liburing + std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes; + + // array to store seen cqes to make the critical section // where cq_lock is held as small as possible std::array<Completion, CQE_BATCH_COUNT> reapedCompletions; @@ -230,24 +233,28 @@ auto IoContext::reapCompletions(ContinuationBuffer &continuationFibers) -> unsig assert(this != runtime.globalIo); LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId())); + // this label is not used for callerEnvironment::ANYWHERE and thus has to be // annotated with ATTR_UNUSED reap_cqes: ATTR_UNUSED; - std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes; + + // Number of actual continuation fibers resulting from the reaped CQEs + unsigned continuationsCount = 0; if constexpr (needsCqLock) { // Someone else is currently reaping completions if (unlikely(!cq_lock.try_lock())) { - LOGD("unsuccessful try_lock"); + LOGD("unsuccessful try_lock from " << callerEnvironment); return 0; } - if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { + if constexpr (callerEnvironment != CallerEnvironment::OWNER) { // We have to check the waitInflight flag with the cq_lock held to // ensure we observe an update by the worker holding the lock. // Otherwise this could happen: - // C W + // Other Owner + // | | // | lock // | prepare to sleep // check flag | @@ -255,10 +262,10 @@ reap_cqes: // | unlock // lock | - // Which results in the Completer possible consuming new work notifications. + // Which results in the Other possible consuming new work notifications. // We must not reap completions of this IoContext to not race - // with the sleeping worker. + // with the sleeping owner. if (waitInflight.load(std::memory_order_acquire)) { LOGD("Not reaping worker " << std::to_string(worker->getWorkerId()) << " since this worker is already waiting for its CQEs"); @@ -268,7 +275,22 @@ reap_cqes: } } - unsigned count = io_uring_peek_batch_cqe(&ring, cqes.data(), CQE_BATCH_COUNT); + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + stats.recordStealAttempt(); + } + + unsigned count = io_uring_peek_batch_cqe(&ring, cqes.data(), toReap); + if (!count) { + if constexpr (needsCqLock) { + cq_lock.unlock(); + } + + if constexpr (checkForRereap) { + goto check_for_rereap; + } + + return 0; + } for (unsigned i = 0; i < count; ++i) { struct io_uring_cqe *cqe = cqes[i]; @@ -292,78 +314,31 @@ reap_cqes: reqs_in_uring -= count; } + // Reaps done by other worker threads are counted in the AbstractWorkStealingStats + // as stolenIoFibers. + stats.record_reaps<callerEnvironment>(count); + + continuationsCount = getContinuationsFromCompletions<callerEnvironment>(reapedCompletions.data(), + count, continuations); + // A naive try lock protecting a worker's IoContext's cq is racy. // While a worker is holding the lock additional completions could arrive // which the worker does not observe because it could be already finished iterating. - // In the case that the worker still holds the lock preventing the globalCompleter + // In the case that the worker still holds the lock preventing others // from reaping the additional completions we have a lost wakeup possibly leading // to a completely sleeping runtime with runnable completions in a worker's IoContext. // To prevent this race we check the CQ again for new cqes after we already // dropped the CQ lock and possibly reap again. - stats.record_reaps<callerEnvironment>(count); - - unsigned posInBuf = 0; - for (unsigned i = 0; i < count; ++i) { - auto &completion = reapedCompletions[i]; - auto res = completion.first; - auto *cqe_data = completion.second; - - TaggedPtr tptr(cqe_data); - // Got a CQE for a forgotten Future. - if (!tptr) { - continue; - } - - auto tag = static_cast<PointerTags>(tptr.getTag()); - switch (tag) { - case PointerTags::NewWorkWsq: - case PointerTags::NewWorkAq: { - auto &sleepStrategy = - reinterpret_cast<PipeSleepStrategy &>(runtime.getWorkerSleepStrategy()); - sleepStrategy.onNewWorkNotification<callerEnvironment>(*this); - break; - } - - case PointerTags::Callback: { - auto *callback = tptr.getPtr<Future::CallbackInternal>(); - LOGD("Create new callback fiber for " << callback); - auto *callbackFiber = Fiber::from( - [&c = *callback, res] { - c(res); - delete &c; - }, - &callback->affinity); - - continuationFibers[posInBuf++] = callbackFiber; - } break; - - case PointerTags::Future: { - auto *future = tptr.getPtr<Future>(); - // assert that the future was previously in the uringFutureSet - assert(uringFutureSet.erase(future) > 0); - - future->recordCompletion(stats, res); - Fiber *continuation = future->completeAndGetContinuation(res); - if (continuation) { - continuationFibers[posInBuf++] = continuation; - } - } break; - - default: - DIE_MSG("Unknown pointer tag encountered: " << (int)tag); - break; - } - } - - // check if we missed new cqes - // TODO: should only the worker recheck? - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - bool reReap = io_uring_cq_ready(&ring) != 0; + // Check if we missed new cqes. +check_for_rereap: + ATTR_UNUSED; + if constexpr (checkForRereap) { + bool reReap = cqeCount() != 0; if (reReap) { // schedule all already collected continuation fibers - runtime.schedule(continuationFibers.data(), posInBuf); + runtime.schedule(continuations, continuationsCount); reReapCount++; @@ -374,15 +349,16 @@ reap_cqes: stats.record_reReapCount(reReapCount); } - return posInBuf; + return continuationsCount; } -// Show the compiler our template incarnations this is needed again because -// reapCompletions<CallerEnvironment::ANYWHERE> is now called from GlobalIoContext.cpp -template auto IoContext::reapCompletions<CallerEnvironment::ANYWHERE>( - ContinuationBuffer &contiunationFibers) -> unsigned; -template auto IoContext::reapCompletions<CallerEnvironment::EMPER>( - ContinuationBuffer &continuationFibers) -> unsigned; +// Show the compiler our template incarnations +template auto IoContext::reapCompletions<CallerEnvironment::OWNER>(Fiber **continuations, + size_t toReap) -> unsigned; +template auto IoContext::reapCompletions<CallerEnvironment::EMPER>(Fiber **continuations, + size_t toReap) -> unsigned; +template auto IoContext::reapCompletions<CallerEnvironment::ANYWHERE>(Fiber **contiunations, + size_t toReap) -> unsigned; IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) { struct io_uring_params params; diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 95684fd1..82c108bb 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -12,26 +12,26 @@ #include <functional> // for less #include <iterator> #include <mutex> +#include <ostream> +#include <utility> #include <vector> #include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER #include "Common.hpp" #include "Debug.hpp" // for LogSubsystem, LogSubsystem::IO, Logger #include "Emper.hpp" +#include "Fiber.hpp" #include "Runtime.hpp" // for Runtime #include "Worker.hpp" #include "emper-config.h" #include "io/Future.hpp" #include "io/Stats.hpp" -#include "io/SubmitActor.hpp" // IWYU pragma: keep -#include "lib/adt/LockedSet.hpp" // for LockedSet +#include "io/SubmitActor.hpp" // IWYU pragma: keep +#include "lib/TaggedPtr.hpp" +#include "lib/adt/LockedSet.hpp" // for LockedSet +#include "sleep_strategy/PipeSleepStrategy.hpp" // IWYU pragma: keep class AbstractWorkStealingScheduler; -class Fiber; - -namespace emper::sleep_strategy { -class PipeSleepStrategy; -} namespace emper::io { class IoContext : public Logger<LogSubsystem::IO> { @@ -53,7 +53,7 @@ class IoContext : public Logger<LogSubsystem::IO> { static thread_local IoContext *workerIo; static constexpr bool needsCqLock = - emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none; + (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) || (emper::IO_STEALING); // TryLock protecting the completion queue of ring. CACHE_LINE_EXCLUSIVE(std::mutex, cq_lock); struct io_uring ring; @@ -164,6 +164,76 @@ class IoContext : public Logger<LogSubsystem::IO> { } } + /** The for now relevant data contained in a cqe */ + using Completion = std::pair<int32_t, void *>; + + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + auto getContinuationsFromCompletions(Completion *completions, size_t count, Fiber **continuations) + -> unsigned { + unsigned posInBuf = 0; + for (unsigned i = 0; i < count; ++i) { + auto &completion = completions[i]; + auto res = completion.first; + auto *cqe_data = completion.second; + + TaggedPtr tptr(cqe_data); + // Got a CQE for a forgotten Future. + if (!tptr) { + continue; + } + + auto tag = static_cast<PointerTags>(tptr.getTag()); + switch (tag) { + case PointerTags::NewWorkWsq: + case PointerTags::NewWorkAq: { + auto &sleepStrategy = + reinterpret_cast<PipeSleepStrategy &>(runtime.getWorkerSleepStrategy()); + sleepStrategy.onNewWorkNotification<callerEnvironment>(*this); + break; + } + + case PointerTags::Callback: { + auto *callback = tptr.getPtr<Future::CallbackInternal>(); + LOGD("Create new callback fiber for " << callback); + auto *callbackFiber = Fiber::from( + [&c = *callback, res] { + c(res); + delete &c; + }, + &callback->affinity); + + continuations[posInBuf++] = callbackFiber; + } break; + + case PointerTags::Future: { + auto *future = tptr.getPtr<Future>(); + // assert that the future was previously in the uringFutureSet + assert(uringFutureSet.erase(future) > 0); + + // This is called from all three contexts and must be synchronized + // using atomics. + future->recordCompletion(stats, res); + + // It is still safe to call this parallel from all three contexts + // because possible resubmission is always done in the globalIo + // when in CallerEnvironment::ANYWHERE and the worker local + // IoContext otherwise + // NOTE: This means that IncrementalCompletableFutures now may + // move between worker IoContexts. + Fiber *continuation = future->completeAndGetContinuation(res); + if (continuation) { + continuations[posInBuf++] = continuation; + } + } break; + + default: + DIE_MSG("Unknown pointer tag encountered: " << (int)tag); + break; + } + } + return posInBuf; + } + public: IoContext(Runtime &runtime, size_t uring_entries); IoContext(Runtime &runtime) : IoContext(runtime, EMPER_IO_WORKER_URING_ENTRIES){}; @@ -279,7 +349,37 @@ class IoContext : public Logger<LogSubsystem::IO> { // offloading it to the async backend. See io_uring_enter(2). // Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe. // Try to reap a possible synchronous completions. - reapAndScheduleCompletions(); + reapAndScheduleCompletions<CallerEnvironment::OWNER>(); + } + + /** + * @brief return the number of available cqes + */ + [[nodiscard]] auto cqeCount() const -> unsigned { return io_uring_cq_ready(&ring); } + + /** + * @brief Collect \p toReap fibers waiting on completed IO + * + * @param[in, out] continuations Buffer big enough to hold \p toReap possible + continuation Fibers. + * Passing the buffer form the caller to the callee allows the buffer to + * be statically allocated. + * @param toReap + * + * @return The number of continuation Fibers + */ + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapCompletions(Fiber **continuations, size_t toReap) -> unsigned; + + /** + * @brief Collect one fibers waiting on completed IO + * + * @return The fiber or nullptr + */ + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapSingleCompletion() -> Fiber * { + Fiber *fiber; + return reapCompletions<callerEnvironment>(&fiber, 1) == 1 ? fiber : nullptr; } // Should not be more than the uring_entries count. @@ -290,25 +390,27 @@ class IoContext : public Logger<LogSubsystem::IO> { * @brief Collect all fibers waiting on completed IO * * @param[in, out] continuationFibers Buffer big enough to hold all possible continuation Fibers. - * Passing the buffer fomr the caller to the calle allows the buffer to + * Passing the buffer form the caller to the callee allows the buffer to * be statically allocated. * * @return The number of continuation Fibers */ - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - auto reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned; + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned { + return reapCompletions<callerEnvironment>(continuationFibers.data(), continuationFibers.size()); + } /** * @brief Schedule all fibers waiting on completed IO */ - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + template <CallerEnvironment callerEnvironment> void reapAndScheduleCompletions() { ContinuationBuffer completionBuf; unsigned ncompletions = reapCompletions<callerEnvironment>(completionBuf); if (ncompletions > 0) { auto *fibers = completionBuf.data(); - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + if constexpr (callerEnvironment != CallerEnvironment::ANYWHERE) { runtime.schedule(fibers, ncompletions); } else { runtime.scheduleFromAnywhere(fibers, ncompletions); diff --git a/emper/io/Stats.cpp b/emper/io/Stats.cpp index 627074af..6b4f854d 100644 --- a/emper/io/Stats.cpp +++ b/emper/io/Stats.cpp @@ -67,7 +67,6 @@ auto operator<<(std::ostream& os, const Stats& s) -> std::ostream& { if (futuresToPrint) { os << ss.str(); } - return os; } @@ -98,14 +97,21 @@ auto operator<<(std::ostream& os, const Stats& s) -> std::ostream& { return os; } - os << "io_uring_submit reap completion loops: " << s.io_submit_full_cq << " taking " - << s.io_submit_full_cq_running_mean << "ns on average" << std::endl; + os << "io_uring_submit-loops: " << s.io_submit_full_cq << std::endl; + os << "io_uring_submit-loops-ns: " << s.io_submit_full_cq_running_mean << std::endl; + + os << "worker-reaping: " << s.worker_reap << std::endl; + os << "worker-reaped-completions: " << s.worker_reaped_completions << std::endl; + + if constexpr (emper::IO_STEALING) { + os << "steal-attempts: " << s.steal_attempts << std::endl; + } - os << "worker reaped completions " << s.worker_reap << " times reaping " - << s.worker_reaped_completions << " completions in total" << std::endl; + if constexpr (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) { + os << "completer-reaping: " << s.completer_reap << std::endl; + os << "completer-reaped-completions: " << s.completer_reaped_completions << std::endl; + } - os << "global completer thread reaped completions " << s.completer_reap << " times reaping " - << s.completer_reaped_completions << " completions in total" << std::endl; os << "re-reap count average [count]: " << s.reReapCount_average << std::endl << "re-reap count max: " << s.reReapCount_max << std::endl << "re-reap last 10:"; @@ -150,6 +156,8 @@ void Stats::printStats(IoContext* globalIoContext, const std::vector<IoContext*> math::calcRunningAvg(avgs.completer_reap, i, stats->completer_reap); math::calcRunningAvg(avgs.completer_reaped_completions, i, stats->completer_reaped_completions); + math::calcRunningAvg(avgs.steal_attempts, i, stats->steal_attempts); + math::calcRunningAvg(avgs.worker_reap, i, stats->worker_reap); math::calcRunningAvg(avgs.worker_reaped_completions, i, stats->worker_reaped_completions); ++i; diff --git a/emper/io/Stats.hpp b/emper/io/Stats.hpp index 8a104ce3..eb181455 100644 --- a/emper/io/Stats.hpp +++ b/emper/io/Stats.hpp @@ -35,6 +35,11 @@ namespace math = emper::lib::math; return; \ } +// Atomic increment macro needed because std::atomic<uint64_t> is not easily usable +// as map value. +#define ATOMIC_ADD_RELAXED(var, count) (__atomic_fetch_add(&var, count, __ATOMIC_RELAXED)) +#define ATOMIC_INC_RELAXED(var) (__atomic_fetch_add(&var, 1, __ATOMIC_RELAXED)) + namespace emper::io { using emper::io::Operation; @@ -123,6 +128,8 @@ class Stats { uint64_t worker_reap = 0; uint64_t worker_reaped_completions = 0; + uint64_t steal_attempts = 0; + math::RunningAverage<float> reReapCount_average; unsigned reReapCount_max = 0; boost::circular_buffer<unsigned> reReapCount_last; @@ -158,9 +165,9 @@ class Stats { // Persistent Error if (res < 0) { if (partial_completion > 0) { - operation_map[IncrementalError]++; + ATOMIC_INC_RELAXED(operation_map[IncrementalError]); } else { - operation_map[ErrorCompletion]++; + ATOMIC_INC_RELAXED(operation_map[ErrorCompletion]); } return; } @@ -180,22 +187,22 @@ class Stats { // Full completion if (!exp || (uint32_t)res == exp) { - operation_map[FullCompletion]++; + ATOMIC_INC_RELAXED(operation_map[FullCompletion]); return; } // we expect partial completion if (partial_completion != PartialCompletableFuture::DISABLE_PARTIAL_COMPLETION) { if ((size_t)(res + partial_completion) < exp) { - operation_map[PartialResubmission]++; + ATOMIC_INC_RELAXED(operation_map[PartialResubmission]); } else { - operation_map[IncrementalCompletion]++; + ATOMIC_INC_RELAXED(operation_map[IncrementalCompletion]); } return; } if ((uint32_t)res < exp) { - operation_map[PartialCompletion]++; + ATOMIC_INC_RELAXED(operation_map[PartialCompletion]); return; } @@ -204,6 +211,8 @@ class Stats { abort(); } + inline void recordStealAttempt() { ATOMIC_INC_RELAXED(steal_attempts); } + // running mean calculation taken from // https://math.stackexchange.com/questions/106700/incremental-averageing inline void record_io_submit_full_cq(nanoseconds ns) { @@ -217,10 +226,12 @@ class Stats { inline void record_reaps(unsigned count) { RETURN_IF_NO_STATS(); - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + // CallerEnvironment EMPER is counted by the AbstractWorkStealingStats and + // io::Stats::steal_attempts. + if constexpr (callerEnvironment == CallerEnvironment::OWNER) { worker_reap++; worker_reaped_completions += count; - } else { + } else if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { completer_reap++; completer_reaped_completions += count; } diff --git a/emper/log/LogBuffer.cpp b/emper/log/LogBuffer.cpp index 790abb9b..94bd3f7b 100644 --- a/emper/log/LogBuffer.cpp +++ b/emper/log/LogBuffer.cpp @@ -35,11 +35,15 @@ LogBuffer::LogBuffer(const std::string& logFile) : logFile(logFile) { } } -LogBuffer::~LogBuffer() { - auto finalPos = static_cast<off_t>(bufPos.load(std::memory_order_relaxed)); +void LogBuffer::trim() { + const auto finalPos = static_cast<off_t>(bufPos.load(std::memory_order_relaxed)); if (ftruncate(logFd, finalPos)) { DIE_MSG_ERRNO("trimming log file " << logFile << " failed"); } +} + +LogBuffer::~LogBuffer() { + trim(); for (auto* buf : bufs) { if (munmap(buf, BUFFER_SIZE)) { diff --git a/emper/log/LogBuffer.hpp b/emper/log/LogBuffer.hpp index e60a15b7..f4149ee9 100644 --- a/emper/log/LogBuffer.hpp +++ b/emper/log/LogBuffer.hpp @@ -23,6 +23,8 @@ class LogBuffer { auto getBuf(size_t pos) -> char* { return bufs[(pos / BUFFER_SIZE) % BUFFER_COUNT]; } + void trim(); + public: LogBuffer(const std::string& logFile); ~LogBuffer(); diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp index 51ba1713..f2d878ef 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.cpp +++ b/emper/sleep_strategy/PipeSleepStrategy.cpp @@ -147,8 +147,8 @@ void PipeSleepStrategy::sleep() { template <CallerEnvironment callerEnvironment> void PipeSleepStrategy::onNewWorkNotification(IoContext& io) { - if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { - DIE_MSG("Completer reaping new work notification from " << io.worker->getWorkerId() << " CQ"); + if constexpr (callerEnvironment != CallerEnvironment::OWNER) { + DIE_MSG("Others reaping new work notification from " << io.worker->getWorkerId() << " CQ"); } LOGD("Got new work notification"); @@ -159,6 +159,7 @@ void PipeSleepStrategy::onNewWorkNotification(IoContext& io) { io.waitInflight.store(false, std::memory_order_release); } +template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::OWNER>(IoContext& io); template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::EMPER>(IoContext& io); template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::ANYWHERE>(IoContext& io); } // namespace emper::sleep_strategy diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index 976baada..fa954ad6 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -6,7 +6,9 @@ #include <array> #include <cassert> #include <ostream> // for operator<<, basic_ostream<>::__ostream_type +#include <vector> +#include "CallerEnvironment.hpp" #include "Common.hpp" // for unlikely, likely #include "Debug.hpp" // for ABORT #include "Emper.hpp" // for OVERFLOW_QUEUE @@ -135,6 +137,17 @@ popTop: return std::make_pair(fiber, FiberSource::stolen); } + if constexpr (emper::IO_STEALING) { + auto* victimIo = runtime.ioContexts[victim]; + if (victimIo->cqeCount()) { + fiber = victimIo->reapSingleCompletion<CallerEnvironment::EMPER>(); + if (fiber) { + emper::statsIncr(awss::stats.nextIoFiberStolen); + return std::make_pair(fiber, FiberSource::ioStolen); + } + } + } + return std::nullopt; } diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp index 5960ae2a..ade437e1 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.hpp +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -37,6 +37,7 @@ class AbstractWorkStealingScheduler : public Scheduler { hintWsq, hintAq, stolen, + ioStolen, anywhereQueue, }; diff --git a/emper/strategies/AbstractWorkStealingStats.cpp b/emper/strategies/AbstractWorkStealingStats.cpp index 6b53c066..64bcaac5 100644 --- a/emper/strategies/AbstractWorkStealingStats.cpp +++ b/emper/strategies/AbstractWorkStealingStats.cpp @@ -31,6 +31,8 @@ void AbstractWorkStealingStats::print(std::ostream& out) { << std::to_string(comulatedWorkerStats.nextFiberFromHintAnywhere) << std::endl << "total-next-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextFiberStolen) << std::endl + << "total-next-io-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextIoFiberStolen) + << std::endl << "total-next-fiber-from-anywhere-queue: " << std::to_string(comulatedWorkerStats.nextFiberFromAnywhereQueue) << std::endl << "total-fibers-lifted-from-anywhere-queue: " diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.cpp b/emper/strategies/AbstractWorkStealingWorkerStats.cpp index 7e164d33..ceaf7b40 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.cpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.cpp @@ -13,6 +13,7 @@ auto AbstractWorkStealingWorkerStats::operator+=(const AbstractWorkStealingWorke nextFiberFromHintLocal += other.nextFiberFromHintLocal; nextFiberFromHintAnywhere += other.nextFiberFromHintAnywhere; nextFiberStolen += other.nextFiberStolen; + nextIoFiberStolen += other.nextIoFiberStolen; nextFiberFromAnywhereQueue += other.nextFiberFromAnywhereQueue; totalFibersLiftedFromAnywhereQueue += other.totalFibersLiftedFromAnywhereQueue; diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.hpp b/emper/strategies/AbstractWorkStealingWorkerStats.hpp index 684b1d59..57052eec 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.hpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.hpp @@ -20,6 +20,7 @@ class AbstractWorkStealingWorkerStats { uint64_t nextFiberFromHintLocal = 0; uint64_t nextFiberFromHintAnywhere = 0; uint64_t nextFiberStolen = 0; + uint64_t nextIoFiberStolen = 0; uint64_t nextFiberFromAnywhereQueue = 0; uint64_t totalFibersLiftedFromAnywhereQueue = 0; diff --git a/meson.build b/meson.build index ce0f95be..ba626817 100644 --- a/meson.build +++ b/meson.build @@ -89,6 +89,7 @@ if option_io endif io_bool_options = [ + 'stealing', 'single_uring', 'try_syscall', 'uring_sqpoll', diff --git a/meson_options.txt b/meson_options.txt index 8f8d93e7..cdf195f0 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -161,3 +161,9 @@ option( choices: ['unchanged', 'normal', 'nice_10', 'nice_19', 'idle'], value: 'unchanged', ) +option( + 'io_stealing', + type: 'boolean', + description: 'Work-Stealing workers will also try to steal IO from other workers', + value: false, +) -- GitLab