diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 498ed53086ee41a1e04d55113b23c53ad459e167..e2cb70b8db6ea180c2324003b3427ae391508435 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -113,6 +113,10 @@ clang-tidy: variables: EMPER_IO_SINGLE_URING: 'true' +.emper-io-stealing: + variables: + EMPER_IO_STEALING: 'true' + .default-library-static: variables: EMPER_DEFAULT_LIBRARY: 'static' @@ -309,3 +313,15 @@ test-pipe-sleep-strategy-no-completer: - .test - .emper-pipe-sleep-strategy - .emper-no-completer + +test-io-stealing: + extends: + - .test + - .emper-io-stealing + +test-io-stealing-pipe-no-completer: + extends: + - .test + - .emper-pipe-sleep-strategy + - .emper-no-completer + - .emper-io-stealing diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 85147ceef1bc670aeb1aae91aed38c4958d2d8a6..dbc73ca6e359e3be11f4ab92ff0bfec8c245e630 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -105,6 +105,14 @@ static const bool IO = #endif ; +static const bool IO_STEALING = +#ifdef EMPER_IO_STEALING + true +#else + false +#endif + ; + static const bool IO_SINGLE_URING = #ifdef EMPER_IO_SINGLE_URING true diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index c9d73304bb1b305d9aeaed4433d2145e23e099ce..bb5e1ef835d075194ed832e2ef663bac30103b7e 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -340,7 +340,8 @@ auto Runtime::nextFiber() -> NextFiberResult { if constexpr (emper::IO_WORKER_URING) { // Schedule all fibers waiting on completed IO IoContext::ContinuationBuffer completions; - unsigned ncompletions = IoContext::getWorkerIo()->reapCompletions(completions); + unsigned ncompletions = + IoContext::getWorkerIo()->reapCompletions<CallerEnvironment::OWNER>(completions); if (ncompletions > 0) { // Keep the first and schedule the rest Fiber* next = completions[0]; diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 18a16649759adb38e21071bea2f504cae6eda21f..c3467559ac55ea41dcc3cfa3166c0b394a1078d3 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -213,6 
+213,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { void executeAndWait(std::function<void()> f); + friend class AbstractWorkStealingScheduler; template <LogSubsystem> friend class Blockable; friend ContextManager; diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index bc5a01cb94a5b9a43c1b1366509f4aa59db0fb5a..1183f6fd25d166fdd429ee94f7fa97980cf4b101 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -22,7 +22,6 @@ #include "Common.hpp" // for unlikely, DIE_MSG_ERRNO, DIE_MSG #include "Debug.hpp" // for LOGD #include "Emper.hpp" // for DEBUG, IO_URING_SQPOLL -#include "Fiber.hpp" #include "Runtime.hpp" #include "emper-common.h" #include "io/Future.hpp" // for Future, operator<<, Future::State @@ -122,7 +121,7 @@ auto IoContext::submitPreparedSqesAndWait(unsigned wait_nr) -> unsigned { TIME_NS( { do { - reapAndScheduleCompletions(); + reapAndScheduleCompletions<callerEnvironment>(); } while ((submitted = io_uring_submit(&ring)) == -EBUSY); }, stats.record_io_submit_full_cq); @@ -218,11 +217,15 @@ template void IoContext::submitAndWait<CallerEnvironment::ANYWHERE>(Future &futu unsigned wait_nr); template <CallerEnvironment callerEnvironment> -auto IoContext::reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned { +auto IoContext::reapCompletions(Fiber **continuations, size_t toReap) -> unsigned { + // TODO: should only the owner possibly rereap? 
+ constexpr bool checkForRereap = callerEnvironment == CallerEnvironment::OWNER; unsigned reReapCount = 0; - using Completion = std::pair<int32_t, void *>; - // vector to store seen cqes to make the critical section + // array to store cqe* needed by liburing + std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes; + + // array to store seen cqes to make the critical section // where cq_lock is held as small as possible std::array<Completion, CQE_BATCH_COUNT> reapedCompletions; @@ -230,24 +233,28 @@ auto IoContext::reapCompletions(ContinuationBuffer &continuationFibers) -> unsig assert(this != runtime.globalIo); LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId())); + // this label is not used for callerEnvironment::ANYWHERE and thus has to be // annotated with ATTR_UNUSED reap_cqes: ATTR_UNUSED; - std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes; + + // Number of actual continuation fibers resulting from the reaped CQEs + unsigned continuationsCount = 0; if constexpr (needsCqLock) { // Someone else is currently reaping completions if (unlikely(!cq_lock.try_lock())) { - LOGD("unsuccessful try_lock"); + LOGD("unsuccessful try_lock from " << callerEnvironment); return 0; } - if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { + if constexpr (callerEnvironment != CallerEnvironment::OWNER) { // We have to check the waitInflight flag with the cq_lock held to // ensure we observe an update by the worker holding the lock. // Otherwise this could happen: - // C W + // Other Owner + // | | // | lock // | prepare to sleep // check flag | @@ -255,10 +262,10 @@ reap_cqes: // | unlock // lock | - // Which results in the Completer possible consuming new work notifications. + // Which results in the Other possible consuming new work notifications. // We must not reap completions of this IoContext to not race - // with the sleeping worker. + // with the sleeping owner. 
if (waitInflight.load(std::memory_order_acquire)) { LOGD("Not reaping worker " << std::to_string(worker->getWorkerId()) << " since this worker is already waiting for its CQEs"); @@ -268,7 +275,22 @@ reap_cqes: } } - unsigned count = io_uring_peek_batch_cqe(&ring, cqes.data(), CQE_BATCH_COUNT); + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + stats.recordStealAttempt(); + } + + unsigned count = io_uring_peek_batch_cqe(&ring, cqes.data(), toReap); + if (!count) { + if constexpr (needsCqLock) { + cq_lock.unlock(); + } + + if constexpr (checkForRereap) { + goto check_for_rereap; + } + + return 0; + } for (unsigned i = 0; i < count; ++i) { struct io_uring_cqe *cqe = cqes[i]; @@ -292,78 +314,31 @@ reap_cqes: reqs_in_uring -= count; } + // Reaps done by other worker threads are counted in the AbstractWorkStealingStats + // as stolenIoFibers. + stats.record_reaps<callerEnvironment>(count); + + continuationsCount = getContinuationsFromCompletions<callerEnvironment>(reapedCompletions.data(), + count, continuations); + // A naive try lock protecting a worker's IoContext's cq is racy. // While a worker is holding the lock additional completions could arrive // which the worker does not observe because it could be already finished iterating. - // In the case that the worker still holds the lock preventing the globalCompleter + // In the case that the worker still holds the lock preventing others // from reaping the additional completions we have a lost wakeup possibly leading // to a completely sleeping runtime with runnable completions in a worker's IoContext. // To prevent this race we check the CQ again for new cqes after we already // dropped the CQ lock and possibly reap again. 
- stats.record_reaps<callerEnvironment>(count); - - unsigned posInBuf = 0; - for (unsigned i = 0; i < count; ++i) { - auto &completion = reapedCompletions[i]; - auto res = completion.first; - auto *cqe_data = completion.second; - - TaggedPtr tptr(cqe_data); - // Got a CQE for a forgotten Future. - if (!tptr) { - continue; - } - - auto tag = static_cast<PointerTags>(tptr.getTag()); - switch (tag) { - case PointerTags::NewWorkWsq: - case PointerTags::NewWorkAq: { - auto &sleepStrategy = - reinterpret_cast<PipeSleepStrategy &>(runtime.getWorkerSleepStrategy()); - sleepStrategy.onNewWorkNotification<callerEnvironment>(*this); - break; - } - - case PointerTags::Callback: { - auto *callback = tptr.getPtr<Future::CallbackInternal>(); - LOGD("Create new callback fiber for " << callback); - auto *callbackFiber = Fiber::from( - [&c = *callback, res] { - c(res); - delete &c; - }, - &callback->affinity); - - continuationFibers[posInBuf++] = callbackFiber; - } break; - - case PointerTags::Future: { - auto *future = tptr.getPtr<Future>(); - // assert that the future was previously in the uringFutureSet - assert(uringFutureSet.erase(future) > 0); - - future->recordCompletion(stats, res); - Fiber *continuation = future->completeAndGetContinuation(res); - if (continuation) { - continuationFibers[posInBuf++] = continuation; - } - } break; - - default: - DIE_MSG("Unknown pointer tag encountered: " << (int)tag); - break; - } - } - - // check if we missed new cqes - // TODO: should only the worker recheck? - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - bool reReap = io_uring_cq_ready(&ring) != 0; + // Check if we missed new cqes. 
+check_for_rereap: + ATTR_UNUSED; + if constexpr (checkForRereap) { + bool reReap = cqeCount() != 0; if (reReap) { // schedule all already collected continuation fibers - runtime.schedule(continuationFibers.data(), posInBuf); + runtime.schedule(continuations, continuationsCount); reReapCount++; @@ -374,15 +349,16 @@ reap_cqes: stats.record_reReapCount(reReapCount); } - return posInBuf; + return continuationsCount; } -// Show the compiler our template incarnations this is needed again because -// reapCompletions<CallerEnvironment::ANYWHERE> is now called from GlobalIoContext.cpp -template auto IoContext::reapCompletions<CallerEnvironment::ANYWHERE>( - ContinuationBuffer &contiunationFibers) -> unsigned; -template auto IoContext::reapCompletions<CallerEnvironment::EMPER>( - ContinuationBuffer &continuationFibers) -> unsigned; +// Show the compiler our template incarnations +template auto IoContext::reapCompletions<CallerEnvironment::OWNER>(Fiber **continuations, + size_t toReap) -> unsigned; +template auto IoContext::reapCompletions<CallerEnvironment::EMPER>(Fiber **continuations, + size_t toReap) -> unsigned; +template auto IoContext::reapCompletions<CallerEnvironment::ANYWHERE>(Fiber **continuations, + size_t toReap) -> unsigned; IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) { struct io_uring_params params; diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 95684fd1b0f5dc78ff3d1865ca1e90830222deb4..82c108bb8fc948390e09726070104adc947dfe0e 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -12,26 +12,26 @@ #include <functional> // for less #include <iterator> #include <mutex> +#include <ostream> +#include <utility> #include <vector> #include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER #include "Common.hpp" #include "Debug.hpp" // for LogSubsystem, LogSubsystem::IO, Logger #include "Emper.hpp" +#include "Fiber.hpp" #include "Runtime.hpp" // for Runtime #include "Worker.hpp" #include 
"emper-config.h" #include "io/Future.hpp" #include "io/Stats.hpp" -#include "io/SubmitActor.hpp" // IWYU pragma: keep -#include "lib/adt/LockedSet.hpp" // for LockedSet +#include "io/SubmitActor.hpp" // IWYU pragma: keep +#include "lib/TaggedPtr.hpp" +#include "lib/adt/LockedSet.hpp" // for LockedSet +#include "sleep_strategy/PipeSleepStrategy.hpp" // IWYU pragma: keep class AbstractWorkStealingScheduler; -class Fiber; - -namespace emper::sleep_strategy { -class PipeSleepStrategy; -} namespace emper::io { class IoContext : public Logger<LogSubsystem::IO> { @@ -53,7 +53,7 @@ class IoContext : public Logger<LogSubsystem::IO> { static thread_local IoContext *workerIo; static constexpr bool needsCqLock = - emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none; + (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) || (emper::IO_STEALING); // TryLock protecting the completion queue of ring. CACHE_LINE_EXCLUSIVE(std::mutex, cq_lock); struct io_uring ring; @@ -164,6 +164,76 @@ class IoContext : public Logger<LogSubsystem::IO> { } } + /** The for now relevant data contained in a cqe */ + using Completion = std::pair<int32_t, void *>; + + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + auto getContinuationsFromCompletions(Completion *completions, size_t count, Fiber **continuations) + -> unsigned { + unsigned posInBuf = 0; + for (unsigned i = 0; i < count; ++i) { + auto &completion = completions[i]; + auto res = completion.first; + auto *cqe_data = completion.second; + + TaggedPtr tptr(cqe_data); + // Got a CQE for a forgotten Future. 
+ if (!tptr) { + continue; + } + + auto tag = static_cast<PointerTags>(tptr.getTag()); + switch (tag) { + case PointerTags::NewWorkWsq: + case PointerTags::NewWorkAq: { + auto &sleepStrategy = + reinterpret_cast<PipeSleepStrategy &>(runtime.getWorkerSleepStrategy()); + sleepStrategy.onNewWorkNotification<callerEnvironment>(*this); + break; + } + + case PointerTags::Callback: { + auto *callback = tptr.getPtr<Future::CallbackInternal>(); + LOGD("Create new callback fiber for " << callback); + auto *callbackFiber = Fiber::from( + [&c = *callback, res] { + c(res); + delete &c; + }, + &callback->affinity); + + continuations[posInBuf++] = callbackFiber; + } break; + + case PointerTags::Future: { + auto *future = tptr.getPtr<Future>(); + // assert that the future was previously in the uringFutureSet + assert(uringFutureSet.erase(future) > 0); + + // This is called from all three contexts and must be synchronized + // using atomics. + future->recordCompletion(stats, res); + + // It is still safe to call this parallel from all three contexts + // because possible resubmission is always done in the globalIo + // when in CallerEnvironment::ANYWHERE and the worker local + // IoContext otherwise + // NOTE: This means that IncrementalCompletableFutures now may + // move between worker IoContexts. + Fiber *continuation = future->completeAndGetContinuation(res); + if (continuation) { + continuations[posInBuf++] = continuation; + } + } break; + + default: + DIE_MSG("Unknown pointer tag encountered: " << (int)tag); + break; + } + } + return posInBuf; + } + public: IoContext(Runtime &runtime, size_t uring_entries); IoContext(Runtime &runtime) : IoContext(runtime, EMPER_IO_WORKER_URING_ENTRIES){}; @@ -279,7 +349,37 @@ class IoContext : public Logger<LogSubsystem::IO> { // offloading it to the async backend. See io_uring_enter(2). // Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe. // Try to reap a possible synchronous completions. 
- reapAndScheduleCompletions(); + reapAndScheduleCompletions<CallerEnvironment::OWNER>(); + } + + /** + * @brief return the number of available cqes + */ + [[nodiscard]] auto cqeCount() const -> unsigned { return io_uring_cq_ready(&ring); } + + /** + * @brief Collect \p toReap fibers waiting on completed IO + * + * @param[in, out] continuations Buffer big enough to hold \p toReap possible + continuation Fibers. + * Passing the buffer from the caller to the callee allows the buffer to + * be statically allocated. + * @param toReap + * + * @return The number of continuation Fibers + */ + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapCompletions(Fiber **continuations, size_t toReap) -> unsigned; + + /** + * @brief Collect one fiber waiting on completed IO + * + * @return The fiber or nullptr + */ + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapSingleCompletion() -> Fiber * { + Fiber *fiber; + return reapCompletions<callerEnvironment>(&fiber, 1) == 1 ? fiber : nullptr; } // Should not be more than the uring_entries count. @@ -290,25 +390,27 @@ class IoContext : public Logger<LogSubsystem::IO> { * @brief Collect all fibers waiting on completed IO * * @param[in, out] continuationFibers Buffer big enough to hold all possible continuation Fibers. - * Passing the buffer fomr the caller to the calle allows the buffer to + * Passing the buffer from the caller to the callee allows the buffer to * be statically allocated. 
* * @return The number of continuation Fibers */ - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - auto reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned; + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned { + return reapCompletions<callerEnvironment>(continuationFibers.data(), continuationFibers.size()); + } /** * @brief Schedule all fibers waiting on completed IO */ - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + template <CallerEnvironment callerEnvironment> void reapAndScheduleCompletions() { ContinuationBuffer completionBuf; unsigned ncompletions = reapCompletions<callerEnvironment>(completionBuf); if (ncompletions > 0) { auto *fibers = completionBuf.data(); - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + if constexpr (callerEnvironment != CallerEnvironment::ANYWHERE) { runtime.schedule(fibers, ncompletions); } else { runtime.scheduleFromAnywhere(fibers, ncompletions); diff --git a/emper/io/Stats.cpp b/emper/io/Stats.cpp index 627074af94bd8fec39cde22b756205d95011f305..6b4f854d33665d753cad5b467e941a296bb21be7 100644 --- a/emper/io/Stats.cpp +++ b/emper/io/Stats.cpp @@ -67,7 +67,6 @@ auto operator<<(std::ostream& os, const Stats& s) -> std::ostream& { if (futuresToPrint) { os << ss.str(); } - return os; } @@ -98,14 +97,21 @@ auto operator<<(std::ostream& os, const Stats& s) -> std::ostream& { return os; } - os << "io_uring_submit reap completion loops: " << s.io_submit_full_cq << " taking " - << s.io_submit_full_cq_running_mean << "ns on average" << std::endl; + os << "io_uring_submit-loops: " << s.io_submit_full_cq << std::endl; + os << "io_uring_submit-loops-ns: " << s.io_submit_full_cq_running_mean << std::endl; + + os << "worker-reaping: " << s.worker_reap << std::endl; + os << "worker-reaped-completions: " << s.worker_reaped_completions << std::endl; + + if constexpr 
(emper::IO_STEALING) { + os << "steal-attempts: " << s.steal_attempts << std::endl; + } - os << "worker reaped completions " << s.worker_reap << " times reaping " - << s.worker_reaped_completions << " completions in total" << std::endl; + if constexpr (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) { + os << "completer-reaping: " << s.completer_reap << std::endl; + os << "completer-reaped-completions: " << s.completer_reaped_completions << std::endl; + } - os << "global completer thread reaped completions " << s.completer_reap << " times reaping " - << s.completer_reaped_completions << " completions in total" << std::endl; os << "re-reap count average [count]: " << s.reReapCount_average << std::endl << "re-reap count max: " << s.reReapCount_max << std::endl << "re-reap last 10:"; @@ -150,6 +156,8 @@ void Stats::printStats(IoContext* globalIoContext, const std::vector<IoContext*> math::calcRunningAvg(avgs.completer_reap, i, stats->completer_reap); math::calcRunningAvg(avgs.completer_reaped_completions, i, stats->completer_reaped_completions); + math::calcRunningAvg(avgs.steal_attempts, i, stats->steal_attempts); + math::calcRunningAvg(avgs.worker_reap, i, stats->worker_reap); math::calcRunningAvg(avgs.worker_reaped_completions, i, stats->worker_reaped_completions); ++i; diff --git a/emper/io/Stats.hpp b/emper/io/Stats.hpp index 8a104ce3775657dd2aaaa20f0b4b82f45c7d9a2b..eb181455b1edcc7aa1498c77c0b609973b8d2756 100644 --- a/emper/io/Stats.hpp +++ b/emper/io/Stats.hpp @@ -35,6 +35,11 @@ namespace math = emper::lib::math; return; \ } +// Atomic increment macro needed because std::atomic<uint64_t> is not easily usable +// as map value. 
+#define ATOMIC_ADD_RELAXED(var, count) (__atomic_fetch_add(&var, count, __ATOMIC_RELAXED)) +#define ATOMIC_INC_RELAXED(var) (__atomic_fetch_add(&var, 1, __ATOMIC_RELAXED)) + namespace emper::io { using emper::io::Operation; @@ -123,6 +128,8 @@ class Stats { uint64_t worker_reap = 0; uint64_t worker_reaped_completions = 0; + uint64_t steal_attempts = 0; + math::RunningAverage<float> reReapCount_average; unsigned reReapCount_max = 0; boost::circular_buffer<unsigned> reReapCount_last; @@ -158,9 +165,9 @@ class Stats { // Persistent Error if (res < 0) { if (partial_completion > 0) { - operation_map[IncrementalError]++; + ATOMIC_INC_RELAXED(operation_map[IncrementalError]); } else { - operation_map[ErrorCompletion]++; + ATOMIC_INC_RELAXED(operation_map[ErrorCompletion]); } return; } @@ -180,22 +187,22 @@ class Stats { // Full completion if (!exp || (uint32_t)res == exp) { - operation_map[FullCompletion]++; + ATOMIC_INC_RELAXED(operation_map[FullCompletion]); return; } // we expect partial completion if (partial_completion != PartialCompletableFuture::DISABLE_PARTIAL_COMPLETION) { if ((size_t)(res + partial_completion) < exp) { - operation_map[PartialResubmission]++; + ATOMIC_INC_RELAXED(operation_map[PartialResubmission]); } else { - operation_map[IncrementalCompletion]++; + ATOMIC_INC_RELAXED(operation_map[IncrementalCompletion]); } return; } if ((uint32_t)res < exp) { - operation_map[PartialCompletion]++; + ATOMIC_INC_RELAXED(operation_map[PartialCompletion]); return; } @@ -204,6 +211,8 @@ class Stats { abort(); } + inline void recordStealAttempt() { ATOMIC_INC_RELAXED(steal_attempts); } + // running mean calculation taken from // https://math.stackexchange.com/questions/106700/incremental-averageing inline void record_io_submit_full_cq(nanoseconds ns) { @@ -217,10 +226,12 @@ class Stats { inline void record_reaps(unsigned count) { RETURN_IF_NO_STATS(); - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + // CallerEnvironment EMPER is counted by the 
AbstractWorkStealingStats and + // io::Stats::steal_attempts. + if constexpr (callerEnvironment == CallerEnvironment::OWNER) { worker_reap++; worker_reaped_completions += count; - } else { + } else if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { completer_reap++; completer_reaped_completions += count; } diff --git a/emper/log/LogBuffer.cpp b/emper/log/LogBuffer.cpp index 790abb9b51208d22621bbac232bb9c01e45e8a6d..94bd3f7b4c7940ba21876721a0ed5332ebecb2ee 100644 --- a/emper/log/LogBuffer.cpp +++ b/emper/log/LogBuffer.cpp @@ -35,11 +35,15 @@ LogBuffer::LogBuffer(const std::string& logFile) : logFile(logFile) { } } -LogBuffer::~LogBuffer() { - auto finalPos = static_cast<off_t>(bufPos.load(std::memory_order_relaxed)); +void LogBuffer::trim() { + const auto finalPos = static_cast<off_t>(bufPos.load(std::memory_order_relaxed)); if (ftruncate(logFd, finalPos)) { DIE_MSG_ERRNO("trimming log file " << logFile << " failed"); } +} + +LogBuffer::~LogBuffer() { + trim(); for (auto* buf : bufs) { if (munmap(buf, BUFFER_SIZE)) { diff --git a/emper/log/LogBuffer.hpp b/emper/log/LogBuffer.hpp index e60a15b7cf27e88d055b192c602bc553b1dd1082..f4149ee98b94d22e0b6edbef471944d3112b67bb 100644 --- a/emper/log/LogBuffer.hpp +++ b/emper/log/LogBuffer.hpp @@ -23,6 +23,8 @@ class LogBuffer { auto getBuf(size_t pos) -> char* { return bufs[(pos / BUFFER_SIZE) % BUFFER_COUNT]; } + void trim(); + public: LogBuffer(const std::string& logFile); ~LogBuffer(); diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp index 51ba171323d3c3c99a3b3e5f8724c1fb2dd2808d..f2d878ef7ea46e7b9c6b926d09130063ae241a77 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.cpp +++ b/emper/sleep_strategy/PipeSleepStrategy.cpp @@ -147,8 +147,8 @@ void PipeSleepStrategy::sleep() { template <CallerEnvironment callerEnvironment> void PipeSleepStrategy::onNewWorkNotification(IoContext& io) { - if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { - 
DIE_MSG("Completer reaping new work notification from " << io.worker->getWorkerId() << " CQ"); + if constexpr (callerEnvironment != CallerEnvironment::OWNER) { + DIE_MSG("Others reaping new work notification from " << io.worker->getWorkerId() << " CQ"); } LOGD("Got new work notification"); @@ -159,6 +159,7 @@ void PipeSleepStrategy::onNewWorkNotification(IoContext& io) { io.waitInflight.store(false, std::memory_order_release); } +template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::OWNER>(IoContext& io); template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::EMPER>(IoContext& io); template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::ANYWHERE>(IoContext& io); } // namespace emper::sleep_strategy diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index 976baada2588a51effbb422e817600298ca1bf33..fa954ad6ef596d9b5ee05c1be4300823cc163e62 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -6,7 +6,9 @@ #include <array> #include <cassert> #include <ostream> // for operator<<, basic_ostream<>::__ostream_type +#include <vector> +#include "CallerEnvironment.hpp" #include "Common.hpp" // for unlikely, likely #include "Debug.hpp" // for ABORT #include "Emper.hpp" // for OVERFLOW_QUEUE @@ -135,6 +137,17 @@ popTop: return std::make_pair(fiber, FiberSource::stolen); } + if constexpr (emper::IO_STEALING) { + auto* victimIo = runtime.ioContexts[victim]; + if (victimIo->cqeCount()) { + fiber = victimIo->reapSingleCompletion<CallerEnvironment::EMPER>(); + if (fiber) { + emper::statsIncr(awss::stats.nextIoFiberStolen); + return std::make_pair(fiber, FiberSource::ioStolen); + } + } + } + return std::nullopt; } diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp index 
5960ae2a06832f50a020a16f9d2ddd02a3130a69..ade437e1a3501cb7a8864f34d1def359258c65ee 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.hpp +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -37,6 +37,7 @@ class AbstractWorkStealingScheduler : public Scheduler { hintWsq, hintAq, stolen, + ioStolen, anywhereQueue, }; diff --git a/emper/strategies/AbstractWorkStealingStats.cpp b/emper/strategies/AbstractWorkStealingStats.cpp index 6b53c066cf5f730d488edc7dc231236429e59999..64bcaac58ffc1d0faea9891c378eddb8f142b0cd 100644 --- a/emper/strategies/AbstractWorkStealingStats.cpp +++ b/emper/strategies/AbstractWorkStealingStats.cpp @@ -31,6 +31,8 @@ void AbstractWorkStealingStats::print(std::ostream& out) { << std::to_string(comulatedWorkerStats.nextFiberFromHintAnywhere) << std::endl << "total-next-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextFiberStolen) << std::endl + << "total-next-io-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextIoFiberStolen) + << std::endl << "total-next-fiber-from-anywhere-queue: " << std::to_string(comulatedWorkerStats.nextFiberFromAnywhereQueue) << std::endl << "total-fibers-lifted-from-anywhere-queue: " diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.cpp b/emper/strategies/AbstractWorkStealingWorkerStats.cpp index 7e164d336665e4413b2d71ec9b50adafa575e79d..ceaf7b4004d14746d6c662dcef4722a02a63ac09 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.cpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.cpp @@ -13,6 +13,7 @@ auto AbstractWorkStealingWorkerStats::operator+=(const AbstractWorkStealingWorke nextFiberFromHintLocal += other.nextFiberFromHintLocal; nextFiberFromHintAnywhere += other.nextFiberFromHintAnywhere; nextFiberStolen += other.nextFiberStolen; + nextIoFiberStolen += other.nextIoFiberStolen; nextFiberFromAnywhereQueue += other.nextFiberFromAnywhereQueue; totalFibersLiftedFromAnywhereQueue += other.totalFibersLiftedFromAnywhereQueue; diff --git 
a/emper/strategies/AbstractWorkStealingWorkerStats.hpp b/emper/strategies/AbstractWorkStealingWorkerStats.hpp index 684b1d5966ac5c910f9c14125a002378dea4152a..57052eecffd9448669e17bd5e6108d0cba732946 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.hpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.hpp @@ -20,6 +20,7 @@ class AbstractWorkStealingWorkerStats { uint64_t nextFiberFromHintLocal = 0; uint64_t nextFiberFromHintAnywhere = 0; uint64_t nextFiberStolen = 0; + uint64_t nextIoFiberStolen = 0; uint64_t nextFiberFromAnywhereQueue = 0; uint64_t totalFibersLiftedFromAnywhereQueue = 0; diff --git a/meson.build b/meson.build index ce0f95beadf568a453413c56740467575c32c284..ba626817dbaebef50f73c3e82624104443af324a 100644 --- a/meson.build +++ b/meson.build @@ -89,6 +89,7 @@ if option_io endif io_bool_options = [ + 'stealing', 'single_uring', 'try_syscall', 'uring_sqpoll', diff --git a/meson_options.txt b/meson_options.txt index 8f8d93e79d5260de4f7d0e06326a1ee3e49292e8..cdf195f09e9bcc486ddae7f1588dd573fb982ce9 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -161,3 +161,9 @@ option( choices: ['unchanged', 'normal', 'nice_10', 'nice_19', 'idle'], value: 'unchanged', ) +option( + 'io_stealing', + type: 'boolean', + description: 'Work-Stealing workers will also try to steal IO from other workers', + value: false, +)