diff --git a/.gitignore b/.gitignore index f9d14aec43b9eb9df2417464f4ad22d2951430ed..be4498e268312665d6b94b5a0a53ce8097d69c85 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,8 @@ /.cache/ /.clangd/ +tools/gdb/__pycache__/ + subprojects/packagecache/ subprojects/googletest* subprojects/liburing* diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 498ed53086ee41a1e04d55113b23c53ad459e167..7447f5a647b19f60a0d1a03a307f1efa89e1ad8e 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -113,6 +113,14 @@ clang-tidy: variables: EMPER_IO_SINGLE_URING: 'true' +.emper-io-stealing: + variables: + EMPER_IO_STEALING: 'true' + +.emper-lockless-cq: + variables: + EMPER_IO_LOCKLESS_CQ: 'true' + .default-library-static: variables: EMPER_DEFAULT_LIBRARY: 'static' @@ -309,3 +317,34 @@ test-pipe-sleep-strategy-no-completer: - .test - .emper-pipe-sleep-strategy - .emper-no-completer + +test-lockless-cq: + extends: + - .test + - .emper-lockless-cq + +test-io-stealing: + extends: + - .test + - .emper-io-stealing + +test-lockless-io-stealing: + extends: + - .test + - .emper-io-stealing + - .emper-lockless-cq + +test-io-stealing-pipe-no-completer: + extends: + - .test + - .emper-pipe-sleep-strategy + - .emper-no-completer + - .emper-io-stealing + +test-io-stealing-pipe-no-completer-lockless: + extends: + - .test + - .emper-pipe-sleep-strategy + - .emper-no-completer + - .emper-io-stealing + - .emper-lockless-cq diff --git a/emper/CallerEnvironment.cpp b/emper/CallerEnvironment.cpp new file mode 100644 index 0000000000000000000000000000000000000000..559b7cbcb781ae5facd0c71b532309c80a727a16 --- /dev/null +++ b/emper/CallerEnvironment.cpp @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "CallerEnvironment.hpp" + +#include <iostream> + +#include "Common.hpp" + +auto operator<<(std::ostream& os, const CallerEnvironment& callerEnvironment) -> std::ostream& { + switch (callerEnvironment) { + case OWNER: + return os << "OWNER"; + case 
EMPER: + return os << "EMPER"; + case ANYWHERE: + return os << "ANYWHERE"; + default: + DIE_MSG("Unknown CallerEnvironment"); + } +} diff --git a/emper/CallerEnvironment.hpp b/emper/CallerEnvironment.hpp index b06fb4198e0a97a1f46fecc54e21e4e8a48d89d3..8ba325f1a091c3953da998c08617358a0dff831b 100644 --- a/emper/CallerEnvironment.hpp +++ b/emper/CallerEnvironment.hpp @@ -1,8 +1,14 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus, Florian Fischer #pragma once +#include <iostream> + +/*! Enum representing the different environments where code can be executed */ enum CallerEnvironment { - EMPER, - ANYWHERE, + OWNER, /*!< indicate code executed by the worker owning the object */ + EMPER, /*!< indicate code executed by any worker */ + ANYWHERE, /*!< indicate code executed outside of any worker */ }; + +auto operator<<(std::ostream& os, const CallerEnvironment& callerEnvironment) -> std::ostream&; diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 85147ceef1bc670aeb1aae91aed38c4958d2d8a6..34b0459e41d0d139069a4141beeb7ddcf427bca6 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -105,6 +105,30 @@ static const bool IO = #endif ; +static const bool IO_STEALING = +#ifdef EMPER_IO_STEALING + true +#else + false +#endif + ; + +static const bool IO_LOCKLESS_CQ = +#ifdef EMPER_IO_LOCKLESS_CQ + true +#else + false +#endif + ; + +enum class IoLocklessMemoryOrder { + weak, + strong, +}; + +static const enum IoLocklessMemoryOrder IO_LOCKLESS_MEMORY_ORDER = + IoLocklessMemoryOrder::EMPER_IO_LOCKLESS_MEMORY_ORDER; + static const bool IO_SINGLE_URING = #ifdef EMPER_IO_SINGLE_URING true diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index c9d73304bb1b305d9aeaed4433d2145e23e099ce..bb5e1ef835d075194ed832e2ef663bac30103b7e 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -340,7 +340,8 @@ auto Runtime::nextFiber() -> NextFiberResult { if constexpr (emper::IO_WORKER_URING) { // Schedule all fibers 
waiting on completed IO IoContext::ContinuationBuffer completions; - unsigned ncompletions = IoContext::getWorkerIo()->reapCompletions(completions); + unsigned ncompletions = + IoContext::getWorkerIo()->reapCompletions<CallerEnvironment::OWNER>(completions); if (ncompletions > 0) { // Keep the first and schedule the rest Fiber* next = completions[0]; diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 18a16649759adb38e21071bea2f504cae6eda21f..c3467559ac55ea41dcc3cfa3166c0b394a1078d3 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -213,6 +213,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { void executeAndWait(std::function<void()> f); + friend class AbstractWorkStealingScheduler; template <LogSubsystem> friend class Blockable; friend ContextManager; diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index bc5a01cb94a5b9a43c1b1366509f4aa59db0fb5a..eeaa87eff08c599a528066a954c0261c74961b31 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -7,6 +7,7 @@ #include <sys/eventfd.h> // for eventfd #include <unistd.h> // for close +#include <algorithm> #include <array> #include <atomic> // for atomic, __atomic_base #include <cassert> // for assert @@ -22,7 +23,6 @@ #include "Common.hpp" // for unlikely, DIE_MSG_ERRNO, DIE_MSG #include "Debug.hpp" // for LOGD #include "Emper.hpp" // for DEBUG, IO_URING_SQPOLL -#include "Fiber.hpp" #include "Runtime.hpp" #include "emper-common.h" #include "io/Future.hpp" // for Future, operator<<, Future::State @@ -122,7 +122,7 @@ auto IoContext::submitPreparedSqesAndWait(unsigned wait_nr) -> unsigned { TIME_NS( { do { - reapAndScheduleCompletions(); + reapAndScheduleCompletions<callerEnvironment>(); } while ((submitted = io_uring_submit(&ring)) == -EBUSY); }, stats.record_io_submit_full_cq); @@ -218,36 +218,155 @@ template void IoContext::submitAndWait<CallerEnvironment::ANYWHERE>(Future &futu unsigned wait_nr); template <CallerEnvironment callerEnvironment> -auto 
IoContext::reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned { +auto IoContext::reapCompletionsLockless(Fiber **continuations, unsigned toReap) -> unsigned { + // Configurable memory order for the atomic operations + constexpr auto LL_READ_MEM_ORDER = + emper::IO_LOCKLESS_MEMORY_ORDER == emper::IoLocklessMemoryOrder::weak + ? std::memory_order_acquire + : std::memory_order_seq_cst; + constexpr auto LL_WRITE_MEM_ORDER = + emper::IO_LOCKLESS_MEMORY_ORDER == emper::IoLocklessMemoryOrder::weak + ? std::memory_order_release + : std::memory_order_seq_cst; + + std::array<Completion, CQE_BATCH_COUNT> reapedCompletions; + + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + stats.recordStealAttempt(); + } + + struct io_uring_cq *cq = &ring.cq; + const unsigned mask = *cq->kring_mask; + auto *atail = reinterpret_cast<std::atomic<unsigned> *>(cq->ktail); + auto *ahead = reinterpret_cast<std::atomic<unsigned> *>(cq->khead); + + // NOTE: just using head for the CAS introduces a possible ABA problem + // if the unsigned head counter overflows during the read and the CAS. 
+ + // Load possibly concurrently used userspace written head pointer + unsigned head = ahead->load(LL_READ_MEM_ORDER); + unsigned count; + do { + // Load concurrently used kernel written tail pointer + unsigned tail = atail->load(LL_READ_MEM_ORDER); + + // NOTE: This number may already be wrong during its calculation + unsigned ready = tail - head; + + if (!ready) return 0; + + count = std::min(toReap, ready); + + for (unsigned i = 0; i < count; ++i) { + const struct io_uring_cqe *cqe = &cq->cqes[(head + i) & mask]; + void *cqe_data = io_uring_cqe_get_data(cqe); + + // Only the owner is allowed to reap new work notifications + if constexpr (callerEnvironment != CallerEnvironment::OWNER) { + const TaggedPtr tptr(cqe_data); + const auto tag = static_cast<PointerTags>(tptr.getTag()); + if (tag == PointerTags::NewWorkWsq || tag == PointerTags::NewWorkAq) { + // don't consume the new work notification + if (i == 0) return 0; + // Since i starts at 0 using i as count is correct. + // If i = 1 this means we are at the second cqe and + // count = i = 1 will consume only the first cqe. + count = i; + break; + } + } + + auto &reapedCompletion = reapedCompletions[i]; + reapedCompletion.first = cqe->res; + reapedCompletion.second = cqe_data; + } + + // TODO: think about the correct memory ordering constraints + } while ( + !ahead->compare_exchange_weak(head, head + count, LL_WRITE_MEM_ORDER, LL_READ_MEM_ORDER)); + + LOGD("got " << count << " cqes from worker " << worker->getWorkerId() << "'s io_uring"); + + if constexpr (emper::DEBUG) { + assert(count <= reqs_in_uring); + reqs_in_uring -= count; + } + + // Reaps done by other worker threads are counted in the AbstractWorkStealingStats + // as stolenIoFibers. 
+ stats.record_reaps<callerEnvironment>(count); + + return getContinuationsFromCompletions<callerEnvironment>(reapedCompletions.data(), count, + continuations); +} + +// Show the compiler our template incarnations +template auto IoContext::reapCompletionsLockless<CallerEnvironment::OWNER>(Fiber **continuations, + unsigned toReap) + -> unsigned; +template auto IoContext::reapCompletionsLockless<CallerEnvironment::EMPER>(Fiber **continuations, + unsigned toReap) + -> unsigned; +template auto IoContext::reapCompletionsLockless<CallerEnvironment::ANYWHERE>(Fiber **contiunations, + unsigned toReap) + -> unsigned; + +template <CallerEnvironment callerEnvironment> +auto IoContext::reapCompletionsLocked(Fiber **continuations, unsigned toReap) -> unsigned { + // TODO: should only the owner possibly rereap? + constexpr bool checkForRereap = callerEnvironment == CallerEnvironment::OWNER; unsigned reReapCount = 0; - using Completion = std::pair<int32_t, void *>; - // vector to store seen cqes to make the critical section + // array to store cqe* needed by liburing + std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes; + + // array to store seen cqes to make the critical section // where cq_lock is held as small as possible std::array<Completion, CQE_BATCH_COUNT> reapedCompletions; - // never reap completions on the global IoContext - assert(this != runtime.globalIo); - - LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId())); // this label is not used for callerEnvironment::ANYWHERE and thus has to be // annotated with ATTR_UNUSED reap_cqes: ATTR_UNUSED; - std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes; - if constexpr (needsCqLock) { - // Someone else is currently reaping completions - if (unlikely(!cq_lock.try_lock())) { - LOGD("unsuccessful try_lock"); - return 0; - } + // Number of actual continuation fibers resulting from the reaped CQEs + unsigned continuationsCount = 0; - if constexpr (callerEnvironment == 
CallerEnvironment::ANYWHERE) { + // TODO: Is using a try lock and the waitInflight flag here even sound? + // Couldn't it be possible to have a lost wakeup with unconsumed new work notification + // cqe in our CQ + // + // State only a single worker does work involving IO and another (completer, io-stealing + // worker) accesses its CQ. + + // Other Owner + // | submit IO + // | lock + // | prepare to sleep + // | set flag + // | unlock + // | sleep + // lock | + // | try lock unsuccessful + // | sleep again + // check flag | + // unlock | + if constexpr (needsCqLock) { + // The Owner always takes the lock to reap all completions and especially + // new work notifications and prevent the above described problem. + if constexpr (callerEnvironment == CallerEnvironment::OWNER) { + cq_lock.lock(); + } else { + // Someone else is currently reaping completions + if (unlikely(!cq_lock.try_lock())) { + LOGD("unsuccessful try_lock from " << callerEnvironment); + return 0; + } // We have to check the waitInflight flag with the cq_lock held to // ensure we observe an update by the worker holding the lock. // Otherwise this could happen: - // C W + // Other Owner + // | | // | lock // | prepare to sleep // check flag | @@ -255,10 +374,10 @@ reap_cqes: // | unlock // lock | - // Which results in the Completer possible consuming new work notifications. + // Which results in the Other possibly consuming new work notifications. // We must not reap completions of this IoContext to not race - // with the sleeping worker. + // with the sleeping owner. 
if (waitInflight.load(std::memory_order_acquire)) { LOGD("Not reaping worker " << std::to_string(worker->getWorkerId()) << " since this worker is already waiting for its CQEs"); @@ -268,7 +387,22 @@ reap_cqes: } } - unsigned count = io_uring_peek_batch_cqe(&ring, cqes.data(), CQE_BATCH_COUNT); + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + stats.recordStealAttempt(); + } + + unsigned count = io_uring_peek_batch_cqe(&ring, cqes.data(), toReap); + if (!count) { + if constexpr (needsCqLock) { + cq_lock.unlock(); + } + + if constexpr (checkForRereap) { + goto check_for_rereap; + } + + return 0; + } for (unsigned i = 0; i < count; ++i) { struct io_uring_cqe *cqe = cqes[i]; @@ -292,78 +426,31 @@ reap_cqes: reqs_in_uring -= count; } + // Reaps done by other worker threads are counted in the AbstractWorkStealingStats + // as stolenIoFibers. + stats.record_reaps<callerEnvironment>(count); + + continuationsCount = getContinuationsFromCompletions<callerEnvironment>(reapedCompletions.data(), + count, continuations); + // A naive try lock protecting a worker's IoContext's cq is racy. // While a worker is holding the lock additional completions could arrive // which the worker does not observe because it could be already finished iterating. - // In the case that the worker still holds the lock preventing the globalCompleter + // In the case that the worker still holds the lock preventing others // from reaping the additional completions we have a lost wakeup possibly leading // to a completely sleeping runtime with runnable completions in a worker's IoContext. // To prevent this race we check the CQ again for new cqes after we already // dropped the CQ lock and possibly reap again. 
- stats.record_reaps<callerEnvironment>(count); - - unsigned posInBuf = 0; - for (unsigned i = 0; i < count; ++i) { - auto &completion = reapedCompletions[i]; - auto res = completion.first; - auto *cqe_data = completion.second; - - TaggedPtr tptr(cqe_data); - // Got a CQE for a forgotten Future. - if (!tptr) { - continue; - } - - auto tag = static_cast<PointerTags>(tptr.getTag()); - switch (tag) { - case PointerTags::NewWorkWsq: - case PointerTags::NewWorkAq: { - auto &sleepStrategy = - reinterpret_cast<PipeSleepStrategy &>(runtime.getWorkerSleepStrategy()); - sleepStrategy.onNewWorkNotification<callerEnvironment>(*this); - break; - } - - case PointerTags::Callback: { - auto *callback = tptr.getPtr<Future::CallbackInternal>(); - LOGD("Create new callback fiber for " << callback); - auto *callbackFiber = Fiber::from( - [&c = *callback, res] { - c(res); - delete &c; - }, - &callback->affinity); - - continuationFibers[posInBuf++] = callbackFiber; - } break; - - case PointerTags::Future: { - auto *future = tptr.getPtr<Future>(); - // assert that the future was previously in the uringFutureSet - assert(uringFutureSet.erase(future) > 0); - - future->recordCompletion(stats, res); - Fiber *continuation = future->completeAndGetContinuation(res); - if (continuation) { - continuationFibers[posInBuf++] = continuation; - } - } break; - - default: - DIE_MSG("Unknown pointer tag encountered: " << (int)tag); - break; - } - } - - // check if we missed new cqes - // TODO: should only the worker recheck? - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - bool reReap = io_uring_cq_ready(&ring) != 0; + // Check if we missed new cqes. 
+check_for_rereap: + ATTR_UNUSED; + if constexpr (checkForRereap) { + bool reReap = cqeCount() != 0; if (reReap) { // schedule all already collected continuation fibers - runtime.schedule(continuationFibers.data(), posInBuf); + runtime.schedule(continuations, continuationsCount); reReapCount++; @@ -374,15 +461,19 @@ reap_cqes: stats.record_reReapCount(reReapCount); } - return posInBuf; + return continuationsCount; } -// Show the compiler our template incarnations this is needed again because -// reapCompletions<CallerEnvironment::ANYWHERE> is now called from GlobalIoContext.cpp -template auto IoContext::reapCompletions<CallerEnvironment::ANYWHERE>( - ContinuationBuffer &contiunationFibers) -> unsigned; -template auto IoContext::reapCompletions<CallerEnvironment::EMPER>( - ContinuationBuffer &continuationFibers) -> unsigned; +// Show the compiler our template incarnations +template auto IoContext::reapCompletionsLocked<CallerEnvironment::OWNER>(Fiber **continuations, + unsigned toReap) + -> unsigned; +template auto IoContext::reapCompletionsLocked<CallerEnvironment::EMPER>(Fiber **continuations, + unsigned toReap) + -> unsigned; +template auto IoContext::reapCompletionsLocked<CallerEnvironment::ANYWHERE>(Fiber **contiunations, + unsigned toReap) + -> unsigned; IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) { struct io_uring_params params; @@ -443,4 +534,31 @@ IoContext::~IoContext() { delete submitter; } + +auto IoContext::getSqHead() const -> unsigned { return *ring.sq.khead; } +auto IoContext::getSqTail() const -> unsigned { return *ring.sq.ktail; } +auto IoContext::getSqEntries() const -> unsigned { return *ring.sq.kring_entries; } +auto IoContext::getSqFlags() const -> unsigned { return *ring.sq.kflags; } + +auto IoContext::getCqHead() const -> unsigned { return *ring.cq.khead; } +auto IoContext::getCqHeadSafe() const -> unsigned { + return reinterpret_cast<std::atomic<unsigned> *>(ring.cq.khead)->load(); +} + +auto 
IoContext::getCqTail() const -> unsigned { return *ring.cq.ktail; } +auto IoContext::getCqTailSafe() const -> unsigned { + return reinterpret_cast<std::atomic<unsigned> *>(ring.cq.ktail)->load(); +} + +auto IoContext::getCqEntries() const -> unsigned { return *ring.cq.kring_entries; } + +auto IoContext::getCqe(unsigned i) const -> struct io_uring_cqe { + const unsigned mask = *ring.cq.kring_mask; + const unsigned head = getCqHead(); + return ring.cq.cqes[(head + i) & mask]; +} + +// NOTE(review): fixed copy/paste bug — the CQ-flags getter previously read the +// SQ ring's kflags (duplicating getSqFlags); it must read the CQ ring's kflags. +auto IoContext::getCqFlags() const -> unsigned { + return *ring.cq.kflags; +} } // namespace emper::io diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 95684fd1b0f5dc78ff3d1865ca1e90830222deb4..60b1e99a8a55194a08ab236135119ccda891aa93 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -2,7 +2,8 @@ // Copyright © 2020-2021 Florian Fischer #pragma once -#include <liburing.h> // for io_uring +#include <liburing.h> +#include <liburing/io_uring.h> #include <array> #include <atomic> // for atomic @@ -12,26 +13,27 @@ #include <functional> // for less #include <iterator> #include <mutex> +#include <ostream> +#include <string> +#include <utility> #include <vector> #include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER #include "Common.hpp" #include "Debug.hpp" // for LogSubsystem, LogSubsystem::IO, Logger #include "Emper.hpp" +#include "Fiber.hpp" #include "Runtime.hpp" // for Runtime #include "Worker.hpp" #include "emper-config.h" #include "io/Future.hpp" #include "io/Stats.hpp" -#include "io/SubmitActor.hpp" // IWYU pragma: keep -#include "lib/adt/LockedSet.hpp" // for LockedSet +#include "io/SubmitActor.hpp" // IWYU pragma: keep +#include "lib/TaggedPtr.hpp" +#include "lib/adt/LockedSet.hpp" // for LockedSet +#include "sleep_strategy/PipeSleepStrategy.hpp" // IWYU pragma: keep class AbstractWorkStealingScheduler; -class Fiber; - -namespace emper::sleep_strategy { -class PipeSleepStrategy; -} namespace emper::io { class IoContext : public 
Logger<LogSubsystem::IO> { @@ -47,13 +49,36 @@ class IoContext : public Logger<LogSubsystem::IO> { friend class emper::sleep_strategy::PipeSleepStrategy; + // Debug functions to access the mmaped memory of ring. + // gdb is not allowed to access the io mmaped memory of the io_uring fd. + // https://stackoverflow.com/questions/67451177/why-cant-gdb-read-io-uring-cqe-contents + auto getSqHead() const -> unsigned; + auto getSqTail() const -> unsigned; + auto getSqEntries() const -> unsigned; + auto getSqFlags() const -> unsigned; + + auto getCqHead() const -> unsigned; + auto getCqHeadSafe() const -> unsigned; + auto getCqTail() const -> unsigned; + auto getCqTailSafe() const -> unsigned; + auto getCqEntries() const -> unsigned; + auto getCqe(unsigned i) const -> struct io_uring_cqe; + auto getCqFlags() const -> unsigned; + protected: // Remember the Runtime which created the IoContext Runtime &runtime; static thread_local IoContext *workerIo; + + // We must synchronize the CQ if it is accessed by multiple threads. + // This is the case if we use a completer or if other workers try to steal IO. static constexpr bool needsCqLock = - emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none; + (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) || (emper::IO_STEALING); + + // Are we synchronizing the CQs lockfree + static constexpr bool locklessCq = needsCqLock && emper::IO_LOCKLESS_CQ; + // TryLock protecting the completion queue of ring. 
CACHE_LINE_EXCLUSIVE(std::mutex, cq_lock); struct io_uring ring; @@ -164,6 +189,82 @@ class IoContext : public Logger<LogSubsystem::IO> { } } + /** The for now relevant data contained in a cqe */ + using Completion = std::pair<int32_t, void *>; + + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + auto getContinuationsFromCompletions(Completion *completions, size_t count, Fiber **continuations) + -> unsigned { + unsigned posInBuf = 0; + for (unsigned i = 0; i < count; ++i) { + auto &completion = completions[i]; + auto res = completion.first; + auto *cqe_data = completion.second; + + TaggedPtr tptr(cqe_data); + // Got a CQE for a forgotten Future. + if (!tptr) { + continue; + } + + auto tag = static_cast<PointerTags>(tptr.getTag()); + switch (tag) { + case PointerTags::NewWorkWsq: + case PointerTags::NewWorkAq: { + auto &sleepStrategy = + reinterpret_cast<PipeSleepStrategy &>(runtime.getWorkerSleepStrategy()); + sleepStrategy.onNewWorkNotification<callerEnvironment>(*this); + break; + } + + case PointerTags::Callback: { + auto *callback = tptr.getPtr<Future::CallbackInternal>(); + LOGD("Create new callback fiber for " << callback); + auto *callbackFiber = Fiber::from( + [&c = *callback, res] { + c(res); + delete &c; + }, + &callback->affinity); + + continuations[posInBuf++] = callbackFiber; + } break; + + case PointerTags::Future: { + auto *future = tptr.getPtr<Future>(); + // assert that the future was previously in the uringFutureSet + assert(uringFutureSet.erase(future) > 0); + + // This is called from all three contexts and must be synchronized + // using atomics. + future->recordCompletion(stats, res); + + // It is still safe to call this parallel from all three contexts + // because possible resubmission is always done in the globalIo + // when in CallerEnvironment::ANYWHERE and the worker local + // IoContext otherwise + // NOTE: This means that IncrementalCompletableFutures now may + // move between worker IoContexts. 
+ Fiber *continuation = future->completeAndGetContinuation(res); + if (continuation) { + continuations[posInBuf++] = continuation; + } + } break; + + default: + DIE_MSG("Unknown pointer tag encountered: " << (int)tag); + break; + } + } + return posInBuf; + } + + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapCompletionsLockless(Fiber **continuations, unsigned toReap) -> unsigned; + + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapCompletionsLocked(Fiber **continuations, unsigned toReap) -> unsigned; + public: IoContext(Runtime &runtime, size_t uring_entries); IoContext(Runtime &runtime) : IoContext(runtime, EMPER_IO_WORKER_URING_ENTRIES){}; @@ -201,7 +302,7 @@ class IoContext : public Logger<LogSubsystem::IO> { if constexpr (callerEnvironment == CallerEnvironment::EMPER) { return IoContext::getWorkerIo(); } else { - // we use a reinterpret_ cast here because GlobalIoContext is incomplete + // we use a reinterpret_cast here because GlobalIoContext is incomplete // at this point but we can't include GlobalIoContext.hpp because it // would introduce a cyclic dependency. return reinterpret_cast<IoContext *>(Runtime::getRuntime()->globalIo); @@ -279,7 +380,51 @@ class IoContext : public Logger<LogSubsystem::IO> { // offloading it to the async backend. See io_uring_enter(2). // Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe. // Try to reap a possible synchronous completions. - reapAndScheduleCompletions(); + reapAndScheduleCompletions<CallerEnvironment::OWNER>(); + } + + /** + * @brief return the number of available cqes + */ + [[nodiscard]] auto cqeCount() const -> unsigned { return io_uring_cq_ready(&ring); } + + /** + * @brief Collect \p toReap fibers waiting on completed IO + * + * @param[in, out] continuations Buffer big enough to hold \p toReap possible + continuation Fibers. + * Passing the buffer form the caller to the callee allows the buffer to + * be statically allocated. 
+ * @param toReap length of the continuations buffer + * + * @return The number of continuation Fibers + */ + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapCompletions(Fiber **continuations, unsigned toReap) -> unsigned { + // Why reap more than there are entries in the CQ + assert(toReap <= CQE_BATCH_COUNT); + + // never reap completions on the global IoContext + assert(this != reinterpret_cast<IoContext *>(Runtime::getRuntime()->globalIo)); + + LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId())); + + if constexpr (locklessCq) { + return reapCompletionsLockless<callerEnvironment>(continuations, toReap); + } + + return reapCompletionsLocked<callerEnvironment>(continuations, toReap); + } + + /** + * @brief Collect one fiber waiting on completed IO + * + * @return The fiber or nullptr + */ + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapSingleCompletion() -> Fiber * { + Fiber *fiber; + return reapCompletions<callerEnvironment>(&fiber, 1) == 1 ? fiber : nullptr; } // Should not be more than the uring_entries count. @@ -290,25 +435,27 @@ class IoContext : public Logger<LogSubsystem::IO> { * @brief Collect all fibers waiting on completed IO * * @param[in, out] continuationFibers Buffer big enough to hold all possible continuation Fibers. - * Passing the buffer fomr the caller to the calle allows the buffer to + * Passing the buffer from the caller to the callee allows the buffer to * be statically allocated. 
* * @return The number of continuation Fibers */ - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - auto reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned; + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned { + return reapCompletions<callerEnvironment>(continuationFibers.data(), continuationFibers.size()); + } /** * @brief Schedule all fibers waiting on completed IO */ - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + template <CallerEnvironment callerEnvironment> void reapAndScheduleCompletions() { ContinuationBuffer completionBuf; unsigned ncompletions = reapCompletions<callerEnvironment>(completionBuf); if (ncompletions > 0) { auto *fibers = completionBuf.data(); - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + if constexpr (callerEnvironment != CallerEnvironment::ANYWHERE) { runtime.schedule(fibers, ncompletions); } else { runtime.scheduleFromAnywhere(fibers, ncompletions); diff --git a/emper/io/Stats.cpp b/emper/io/Stats.cpp index 627074af94bd8fec39cde22b756205d95011f305..6b4f854d33665d753cad5b467e941a296bb21be7 100644 --- a/emper/io/Stats.cpp +++ b/emper/io/Stats.cpp @@ -67,7 +67,6 @@ auto operator<<(std::ostream& os, const Stats& s) -> std::ostream& { if (futuresToPrint) { os << ss.str(); } - return os; } @@ -98,14 +97,21 @@ auto operator<<(std::ostream& os, const Stats& s) -> std::ostream& { return os; } - os << "io_uring_submit reap completion loops: " << s.io_submit_full_cq << " taking " - << s.io_submit_full_cq_running_mean << "ns on average" << std::endl; + os << "io_uring_submit-loops: " << s.io_submit_full_cq << std::endl; + os << "io_uring_submit-loops-ns: " << s.io_submit_full_cq_running_mean << std::endl; + + os << "worker-reaping: " << s.worker_reap << std::endl; + os << "worker-reaped-completions: " << s.worker_reaped_completions << std::endl; + + if constexpr 
(emper::IO_STEALING) { + os << "steal-attempts: " << s.steal_attempts << std::endl; + } - os << "worker reaped completions " << s.worker_reap << " times reaping " - << s.worker_reaped_completions << " completions in total" << std::endl; + if constexpr (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) { + os << "completer-reaping: " << s.completer_reap << std::endl; + os << "completer-reaped-completions: " << s.completer_reaped_completions << std::endl; + } - os << "global completer thread reaped completions " << s.completer_reap << " times reaping " - << s.completer_reaped_completions << " completions in total" << std::endl; os << "re-reap count average [count]: " << s.reReapCount_average << std::endl << "re-reap count max: " << s.reReapCount_max << std::endl << "re-reap last 10:"; @@ -150,6 +156,8 @@ void Stats::printStats(IoContext* globalIoContext, const std::vector<IoContext*> math::calcRunningAvg(avgs.completer_reap, i, stats->completer_reap); math::calcRunningAvg(avgs.completer_reaped_completions, i, stats->completer_reaped_completions); + math::calcRunningAvg(avgs.steal_attempts, i, stats->steal_attempts); + math::calcRunningAvg(avgs.worker_reap, i, stats->worker_reap); math::calcRunningAvg(avgs.worker_reaped_completions, i, stats->worker_reaped_completions); ++i; diff --git a/emper/io/Stats.hpp b/emper/io/Stats.hpp index 8a104ce3775657dd2aaaa20f0b4b82f45c7d9a2b..eb181455b1edcc7aa1498c77c0b609973b8d2756 100644 --- a/emper/io/Stats.hpp +++ b/emper/io/Stats.hpp @@ -35,6 +35,11 @@ namespace math = emper::lib::math; return; \ } +// Atomic increment macro needed because std::atomic<uint64_t> is not easily usable +// as map value. 
+#define ATOMIC_ADD_RELAXED(var, count) (__atomic_fetch_add(&var, count, __ATOMIC_RELAXED)) +#define ATOMIC_INC_RELAXED(var) (__atomic_fetch_add(&var, 1, __ATOMIC_RELAXED)) + namespace emper::io { using emper::io::Operation; @@ -123,6 +128,8 @@ class Stats { uint64_t worker_reap = 0; uint64_t worker_reaped_completions = 0; + uint64_t steal_attempts = 0; + math::RunningAverage<float> reReapCount_average; unsigned reReapCount_max = 0; boost::circular_buffer<unsigned> reReapCount_last; @@ -158,9 +165,9 @@ class Stats { // Persistent Error if (res < 0) { if (partial_completion > 0) { - operation_map[IncrementalError]++; + ATOMIC_INC_RELAXED(operation_map[IncrementalError]); } else { - operation_map[ErrorCompletion]++; + ATOMIC_INC_RELAXED(operation_map[ErrorCompletion]); } return; } @@ -180,22 +187,22 @@ class Stats { // Full completion if (!exp || (uint32_t)res == exp) { - operation_map[FullCompletion]++; + ATOMIC_INC_RELAXED(operation_map[FullCompletion]); return; } // we expect partial completion if (partial_completion != PartialCompletableFuture::DISABLE_PARTIAL_COMPLETION) { if ((size_t)(res + partial_completion) < exp) { - operation_map[PartialResubmission]++; + ATOMIC_INC_RELAXED(operation_map[PartialResubmission]); } else { - operation_map[IncrementalCompletion]++; + ATOMIC_INC_RELAXED(operation_map[IncrementalCompletion]); } return; } if ((uint32_t)res < exp) { - operation_map[PartialCompletion]++; + ATOMIC_INC_RELAXED(operation_map[PartialCompletion]); return; } @@ -204,6 +211,8 @@ class Stats { abort(); } + inline void recordStealAttempt() { ATOMIC_INC_RELAXED(steal_attempts); } + // running mean calculation taken from // https://math.stackexchange.com/questions/106700/incremental-averageing inline void record_io_submit_full_cq(nanoseconds ns) { @@ -217,10 +226,12 @@ class Stats { inline void record_reaps(unsigned count) { RETURN_IF_NO_STATS(); - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + // CallerEnvironment EMPER is counted by the 
AbstractWorkStealingStats and + // io::Stats::steal_attempts. + if constexpr (callerEnvironment == CallerEnvironment::OWNER) { worker_reap++; worker_reaped_completions += count; - } else { + } else if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { completer_reap++; completer_reaped_completions += count; } diff --git a/emper/log/LogBuffer.cpp b/emper/log/LogBuffer.cpp index 790abb9b51208d22621bbac232bb9c01e45e8a6d..94bd3f7b4c7940ba21876721a0ed5332ebecb2ee 100644 --- a/emper/log/LogBuffer.cpp +++ b/emper/log/LogBuffer.cpp @@ -35,11 +35,15 @@ LogBuffer::LogBuffer(const std::string& logFile) : logFile(logFile) { } } -LogBuffer::~LogBuffer() { - auto finalPos = static_cast<off_t>(bufPos.load(std::memory_order_relaxed)); +void LogBuffer::trim() { + const auto finalPos = static_cast<off_t>(bufPos.load(std::memory_order_relaxed)); if (ftruncate(logFd, finalPos)) { DIE_MSG_ERRNO("trimming log file " << logFile << " failed"); } +} + +LogBuffer::~LogBuffer() { + trim(); for (auto* buf : bufs) { if (munmap(buf, BUFFER_SIZE)) { diff --git a/emper/log/LogBuffer.hpp b/emper/log/LogBuffer.hpp index e60a15b7cf27e88d055b192c602bc553b1dd1082..f4149ee98b94d22e0b6edbef471944d3112b67bb 100644 --- a/emper/log/LogBuffer.hpp +++ b/emper/log/LogBuffer.hpp @@ -23,6 +23,8 @@ class LogBuffer { auto getBuf(size_t pos) -> char* { return bufs[(pos / BUFFER_SIZE) % BUFFER_COUNT]; } + void trim(); + public: LogBuffer(const std::string& logFile); ~LogBuffer(); diff --git a/emper/meson.build b/emper/meson.build index 1faa562936f56b65f0cd8dfab4f5e1900bf1f1ce..fbd3e44cf19c0aa528233b53d914ddc6767c8ee4 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -13,6 +13,7 @@ nasm_gen = generator(nasm, emper_asm_objects = nasm_gen.process(emper_asm_sources) emper_cpp_sources = [ + 'CallerEnvironment.cpp', 'Runtime.cpp', 'Emper.cpp', 'Fiber.cpp', diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp index 
51ba171323d3c3c99a3b3e5f8724c1fb2dd2808d..f2d878ef7ea46e7b9c6b926d09130063ae241a77 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.cpp +++ b/emper/sleep_strategy/PipeSleepStrategy.cpp @@ -147,8 +147,8 @@ void PipeSleepStrategy::sleep() { template <CallerEnvironment callerEnvironment> void PipeSleepStrategy::onNewWorkNotification(IoContext& io) { - if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { - DIE_MSG("Completer reaping new work notification from " << io.worker->getWorkerId() << " CQ"); + if constexpr (callerEnvironment != CallerEnvironment::OWNER) { + DIE_MSG("Others reaping new work notification from " << io.worker->getWorkerId() << " CQ"); } LOGD("Got new work notification"); @@ -159,6 +159,7 @@ void PipeSleepStrategy::onNewWorkNotification(IoContext& io) { io.waitInflight.store(false, std::memory_order_release); } +template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::OWNER>(IoContext& io); template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::EMPER>(IoContext& io); template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::ANYWHERE>(IoContext& io); } // namespace emper::sleep_strategy diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index 976baada2588a51effbb422e817600298ca1bf33..fa954ad6ef596d9b5ee05c1be4300823cc163e62 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -6,7 +6,9 @@ #include <array> #include <cassert> #include <ostream> // for operator<<, basic_ostream<>::__ostream_type +#include <vector> +#include "CallerEnvironment.hpp" #include "Common.hpp" // for unlikely, likely #include "Debug.hpp" // for ABORT #include "Emper.hpp" // for OVERFLOW_QUEUE @@ -135,6 +137,17 @@ popTop: return std::make_pair(fiber, FiberSource::stolen); } + if constexpr (emper::IO_STEALING) { + auto* victimIo = runtime.ioContexts[victim]; + if 
(victimIo->cqeCount()) { + fiber = victimIo->reapSingleCompletion<CallerEnvironment::EMPER>(); + if (fiber) { + emper::statsIncr(awss::stats.nextIoFiberStolen); + return std::make_pair(fiber, FiberSource::ioStolen); + } + } + } + return std::nullopt; } diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp index 5960ae2a06832f50a020a16f9d2ddd02a3130a69..ade437e1a3501cb7a8864f34d1def359258c65ee 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.hpp +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -37,6 +37,7 @@ class AbstractWorkStealingScheduler : public Scheduler { hintWsq, hintAq, stolen, + ioStolen, anywhereQueue, }; diff --git a/emper/strategies/AbstractWorkStealingStats.cpp b/emper/strategies/AbstractWorkStealingStats.cpp index 6b53c066cf5f730d488edc7dc231236429e59999..64bcaac58ffc1d0faea9891c378eddb8f142b0cd 100644 --- a/emper/strategies/AbstractWorkStealingStats.cpp +++ b/emper/strategies/AbstractWorkStealingStats.cpp @@ -31,6 +31,8 @@ void AbstractWorkStealingStats::print(std::ostream& out) { << std::to_string(comulatedWorkerStats.nextFiberFromHintAnywhere) << std::endl << "total-next-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextFiberStolen) << std::endl + << "total-next-io-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextIoFiberStolen) + << std::endl << "total-next-fiber-from-anywhere-queue: " << std::to_string(comulatedWorkerStats.nextFiberFromAnywhereQueue) << std::endl << "total-fibers-lifted-from-anywhere-queue: " diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.cpp b/emper/strategies/AbstractWorkStealingWorkerStats.cpp index 7e164d336665e4413b2d71ec9b50adafa575e79d..ceaf7b4004d14746d6c662dcef4722a02a63ac09 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.cpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.cpp @@ -13,6 +13,7 @@ auto AbstractWorkStealingWorkerStats::operator+=(const AbstractWorkStealingWorke 
nextFiberFromHintLocal += other.nextFiberFromHintLocal; nextFiberFromHintAnywhere += other.nextFiberFromHintAnywhere; nextFiberStolen += other.nextFiberStolen; + nextIoFiberStolen += other.nextIoFiberStolen; nextFiberFromAnywhereQueue += other.nextFiberFromAnywhereQueue; totalFibersLiftedFromAnywhereQueue += other.totalFibersLiftedFromAnywhereQueue; diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.hpp b/emper/strategies/AbstractWorkStealingWorkerStats.hpp index 684b1d5966ac5c910f9c14125a002378dea4152a..57052eecffd9448669e17bd5e6108d0cba732946 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.hpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.hpp @@ -20,6 +20,7 @@ class AbstractWorkStealingWorkerStats { uint64_t nextFiberFromHintLocal = 0; uint64_t nextFiberFromHintAnywhere = 0; uint64_t nextFiberStolen = 0; + uint64_t nextIoFiberStolen = 0; uint64_t nextFiberFromAnywhereQueue = 0; uint64_t totalFibersLiftedFromAnywhereQueue = 0; diff --git a/meson.build b/meson.build index ce0f95beadf568a453413c56740467575c32c284..aeaf05dc19077c2eaa5c0b53b41ba4213a091b14 100644 --- a/meson.build +++ b/meson.build @@ -89,6 +89,8 @@ if option_io endif io_bool_options = [ + 'stealing', + 'lockless_cq', 'single_uring', 'try_syscall', 'uring_sqpoll', @@ -97,6 +99,7 @@ io_bool_options = [ io_raw_options = [ 'worker_uring_entries', + 'lockless_memory_order', ] foreach option : io_bool_options diff --git a/meson_options.txt b/meson_options.txt index 8f8d93e79d5260de4f7d0e06326a1ee3e49292e8..2a6e56270107e580b83c8b1cb551ee69f23a12e6 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -161,3 +161,22 @@ option( choices: ['unchanged', 'normal', 'nice_10', 'nice_19', 'idle'], value: 'unchanged', ) +option( + 'io_stealing', + type: 'boolean', + description: 'Work-Stealing workers will also try to steal IO from other workers', + value: false, +) +option( + 'io_lockless_cq', + type: 'boolean', + description: 'Synchronize the concurrent access to CQs with a 
lockless algorithm', + value: false, +) +option( + 'io_lockless_memory_order', + type: 'combo', + choices: ['weak', 'strong'], + description: 'Memory ordering used for the lockless CQ algorithm', + value: 'weak', +) diff --git a/tools/gdb/__init__.py b/tools/gdb/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tools/gdb/dump_runtime_state.py b/tools/gdb/dump_runtime_state.py new file mode 100644 index 0000000000000000000000000000000000000000..ae1a6d9e1774d348ee261d9dbdc5799310d94ae2 --- /dev/null +++ b/tools/gdb/dump_runtime_state.py @@ -0,0 +1,213 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +# Copyright 2021 Florian Fischer +"""gdb python script to dump the state of the runtime""" + +from pathlib import Path +import sys + +import gdb + +gdb_py_dir = Path(__file__).parent +sys.path.append(str(gdb_py_dir)) +from worker_frame_filter import WorkerFrameFilter + + +INDENTATION = ' ' + +def setup(): + gdb.execute('set pagination off') + gdb.execute('set print pretty') + gdb.execute('set scheduler-locking on') + + +def cleanup(): + gdb.execute('set pagination on') + gdb.execute('set print pretty off') + gdb.execute('set scheduler-locking off') + + +def print_queue(queue, indentation=''): + print(f'{indentation}WSL:') + indentation += INDENTATION + + top = queue['top']['_M_i'] + print(f'{indentation}top: {top}') + + bottom = queue['bottom']['_M_i'] + print(f'{indentation}bottom: {bottom}') + + print(f'{indentation}work: {bottom - top}') + + +def print_sq(io, indentation=''): + print(f'{indentation}sq:') + indentation += INDENTATION + sq = io['ring']['sq'] + + print(f'{indentation}ring_ptr: {sq["ring_ptr"]}') + entries = gdb.parse_and_eval("emper::io::IoContext::workerIo->getSqEntries()") + print(f'{indentation}entries: {entries}') + + flags = gdb.parse_and_eval("emper::io::IoContext::workerIo->getSqFlags()") + print(f'{indentation}flags: {flags}') + + head = 
gdb.parse_and_eval("emper::io::IoContext::workerIo->getSqHead()") + print(f'{indentation}head: {head}') + + tail = gdb.parse_and_eval("emper::io::IoContext::workerIo->getSqTail()") + print(f'{indentation}tail: {tail}') + + unsubmitted = tail - head + print(f'{indentation}unsubmitted: {unsubmitted}') + + # print(sq) + + +def print_cq(io, indentation=''): + print(f'{indentation}cq:') + indentation += INDENTATION + cq = io['ring']['cq'] + + print(f'{indentation}ring_ptr: {cq["ring_ptr"]}') + entries = gdb.parse_and_eval("emper::io::IoContext::workerIo->getCqEntries()") + print(f'{indentation}entries: {entries}') + + flags = gdb.parse_and_eval("emper::io::IoContext::workerIo->getCqFlags()") + print(f'{indentation}flags: {flags}') + + head = gdb.parse_and_eval("emper::io::IoContext::workerIo->getCqHeadSafe()") + print(f'{indentation}head: {head}') + + tail = gdb.parse_and_eval("emper::io::IoContext::workerIo->getCqTailSafe()") + print(f'{indentation}tail: {tail}') + + ready = tail - head + print(f'{indentation}ready: {ready}') + + # print(io['ring']['cq']) + + +def print_uring(io, indentation=''): + uring = io['ring'] + print(f'{indentation}io_uring:') + indentation += INDENTATION + + print(f'{indentation}ring_fd: {uring["ring_fd"]}') + print(f'{indentation}flags: {uring["flags"]}') + print(f'{indentation}features: {uring["features"]}') + + print_sq(io, indentation) + print_cq(io, indentation) + + +def print_io(io, indentation=''): + print(f'{indentation}IoContext:') + indentation += INDENTATION + + print(f'{indentation}needsCqLock: {io["needsCqLock"]}') + print(f'{indentation}locklessCq: {io["locklessCq"]}') + + print(f'{indentation}notificationEventFd: {io["notificationEventFd"]}') + + submitter = io["submitter"] + if not submitter.address: + print(f'{indentation}submitter: {submitter}') + + print(f'{indentation}waitInflight: {io["waitInflight"]["_M_base"]["_M_i"]}') + # print(f'{indentation}{io["CQE_BATCH_SIZE"]}') + print(f'{indentation}reqs_in_uring: 
{io["reqs_in_uring"]["_M_i"]}') + print(f'{indentation}preparedSqes: {io["preparedSqes"]}') + print(f'{indentation}uringFutureSet: {io["uringFutureSet"]["_set"]}') + + print_uring(io, indentation=indentation) + + +def print_anywhere_queue(runtime, indentation=''): + print(f'{indentation}AnywhereQueue:') + indentation += INDENTATION + scheduler = runtime["scheduler"] + print(f'{indentation}{scheduler["scheduleAnywhereQueue"]["queue"]}') + + +def print_stats(runtime, indentation=''): + print(f'{indentation}Stats:') + indentation += INDENTATION + print_stats_cmd = f'p ((Runtime*){runtime.address})->printStats(std::cout, false)' + stats_str = gdb.execute(print_stats_cmd, False, True) + for l in stats_str.splitlines(): + print(f'{indentation}{l}') + + +def print_bt(): + print(gdb.execute("bt", False, True)) + + +def dump(): + printed_once = False + + threads = gdb.selected_inferior().threads() + for thread in threads: + # switch to thread + thread.switch() + + # A thread may be in a frame from a non emper source file and + # thus gdb can not use the emper namespace. 
+ # Our solution is to walk up the stack until we are in a emper function + emper_found = False + cur = gdb.selected_frame() + while (True): + cur_sym_tab = cur.find_sal().symtab + if cur_sym_tab and 'emper' in cur_sym_tab.filename: + emper_found = True + break + + next = cur.older() + if not next: + break + next.select() + cur = next + + if not emper_found: + print(f"Thread {thread.name} found not executing any emper code") + print_bt() + continue + + runtime = gdb.parse_and_eval("Runtime::currentRuntime").dereference() + if not runtime: + print(f"Thread {thread} found without an active runtime") + print_bt() + continue + + if not printed_once: + print_anywhere_queue(runtime) + print() + print_stats(runtime) + print() + printed_once = True + + worker = gdb.parse_and_eval("Worker::currentWorker").dereference() + if worker.address == 0: + print(f"Non worker thread {thread.name}") + print_bt() + continue + + worker_id = worker['workerId'] + print(f"## Worker {worker_id} ##") + print_bt() + + queue = gdb.parse_and_eval("AbstractWorkStealingScheduler::queue") + print_queue(queue) + print() + + io = gdb.parse_and_eval("emper::io::IoContext::workerIo").dereference() + print_io(io) + print() + + + +if __name__ == '__main__': + # install frame filter + WorkerFrameFilter() + setup() + dump() + cleanup() diff --git a/tools/gdb/worker_frame_filter.py b/tools/gdb/worker_frame_filter.py new file mode 100644 index 0000000000000000000000000000000000000000..92558fe826d9b938eaae0e77afc1598731759af5 --- /dev/null +++ b/tools/gdb/worker_frame_filter.py @@ -0,0 +1,40 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +# Copyright 2021 Florian Fischer +import gdb + +class WorkerFrameFilter(): + """Frame filter wrapping the frame iterator into an WorkerFrameIterator""" + def __init__(self): + self.name = "WorkerFrameFilter" + self.priority = 100 + self.enabled = True + gdb.frame_filters[self.name] = self + + def filter(self, frame_iter): + return WorkerFrameIterator(frame_iter) + + +class 
WorkerFrameIterator:
+    """Frame iterator skipping each frame without a valid name and function
+
+    This is useful in EMPER because worker stacks are allocated by emper and not glibc
+    and are not properly walked by gdb resulting in multiple 'broken' frames above
+    the dispatch loop.
+    """
+    def __init__(self, ii):
+        self.input_iterator = ii
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        while True:
+            frameDecorator = next(self.input_iterator)
+            frame = frameDecorator.inferior_frame()
+
+            if frame.name() or frame.function():
+                return frameDecorator
+
+
+if __name__ == '__main__':
+    WorkerFrameFilter()