diff --git a/emper/Debug.hpp b/emper/Debug.hpp index a1e5c878405cf382f8d57452027975b661ba44ea..507399fc96c6fa6441330b1eb4042123e08091cd 100644 --- a/emper/Debug.hpp +++ b/emper/Debug.hpp @@ -76,6 +76,7 @@ enum class LogSubsystem { DISP, SCHED, RUNTI, + SLEEP_S, U_B_MPSC_Q, IO, }; @@ -109,6 +110,7 @@ class Logger { case LogSubsystem::DISP: case LogSubsystem::SCHED: case LogSubsystem::RUNTI: + case LogSubsystem::SLEEP_S: case LogSubsystem::U_B_MPSC_Q: case LogSubsystem::IO: default: @@ -132,6 +134,8 @@ class Logger { return "SCHED"; case LogSubsystem::RUNTI: return "RUNTI"; + case LogSubsystem::SLEEP_S: + return "SLEEP_S"; case LogSubsystem::U_B_MPSC_Q: return "UBSCQ"; case LogSubsystem::IO: diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 30d9dfca2416fddae8aa088730d5a70108d6d45f..aae699525a2e3b77520e83ba1a4ecad7f3ae092e 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -23,6 +23,13 @@ void statsIncr(C& counter) { } } +template <typename C> +void statsAdd(C& counter, long a) { + if constexpr (STATS) { + counter += a; + } +} + static const bool WORKER_SLEEP = #ifdef EMPER_WORKER_SLEEP true diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 14a68bf8b6d9d144bdf861fa572d93d1a46ba7cd..3fb71dde9ea0baa73b0bc7ea687daac067a96122 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -177,6 +177,8 @@ Runtime::~Runtime() { if constexpr (emper::IO) { emper::io::Stats::printStats(globalIo, ioContexts); } + + workerSleepStrategy.printStats(); } for (unsigned int i = 0; i < workerCount; ++i) { diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index fdd0783acfac5fb3730783ba331cbdfb8ae44625..6b39db70c52ccb42bdff0a222cd02a971b12ef31 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -95,6 +95,10 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { newWorkerHooks.push_back(hook); }; + [[nodiscard]] inline auto getWorkerSleepStrategy() -> WorkerSleepStrategy& { + return workerSleepStrategy; + } + template <CallerEnvironment callerEnvironment = 
CallerEnvironment::EMPER> inline void wakeupSleepingWorkers() { if constexpr (::emper::WORKER_WAKEUP_STRATEGY == ::emper::WorkerWakeupStrategy::one) { @@ -167,7 +171,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { inline auto getStrategy() -> RuntimeStrategy& { return *strategy; } - inline auto isTerminating() -> bool { return terminateWorkers; } + [[nodiscard]] inline auto isTerminating() const -> bool { return terminateWorkers; } void initiateTermination(); diff --git a/emper/Worker.hpp b/emper/Worker.hpp index f19bfe4e6114e4a6099171d19840559e4a229c29..45f52f291c9a2b76de227dfbcbeb02a72129b0b2 100644 --- a/emper/Worker.hpp +++ b/emper/Worker.hpp @@ -4,9 +4,17 @@ #include "Common.hpp" #include "emper-common.h" +#include "lib/TaggedPtr.hpp" class Runtime; +namespace emper::sleep_strategy { +class PipeSleepStrategy; +} +using emper::sleep_strategy::PipeSleepStrategy; + +using emper::lib::TaggedPtr; + class Worker { private: static thread_local Worker* currentWorker; @@ -15,11 +23,16 @@ class Worker { const workerid_t workerId; - Worker(workerid_t workerId, unsigned int seed) : seed(seed), workerId(workerId) {} + TaggedPtr dispatchHint; + + Worker(workerid_t workerId, unsigned int seed) + : seed(seed), workerId(workerId), dispatchHint(nullptr) {} void setWorker(); friend Runtime; + friend class AbstractWorkStealingScheduler; + friend class PipeSleepStrategy; public: [[nodiscard]] auto getWorkerId() const -> workerid_t { return workerId; } diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 473f8924ee2c4956e9fad2d54e0d4408ba225ab7..b34c5e128d01b665817879b22b18c7a4261c56d4 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -13,6 +13,7 @@ #include <cerrno> // for errno, ECANCELED, EBUSY, EAGAIN, EINTR #include <cstring> // for memset #include <ostream> // for basic_osteram::operator<<, operator<< +#include <string> #include <utility> #include <vector> @@ -27,13 +28,13 @@ #include "io/GlobalIoContext.hpp" #include "io/Stats.hpp" 
// for Stats, nanoseconds #include "lib/TaggedPtr.hpp" +#include "sleep_strategy/PipeSleepStrategy.hpp" using emper::lib::TaggedPtr; +using emper::sleep_strategy::PipeSleepStrategy; namespace emper::io { -enum class PointerTags : uint16_t { Future, Callback }; - thread_local IoContext *IoContext::workerIo = nullptr; auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> unsigned { @@ -86,9 +87,9 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns } template <CallerEnvironment callerEnvironment> -auto IoContext::submitPreparedSqes() -> unsigned { +auto IoContext::submitPreparedSqesAndWait(unsigned wait_nr) -> unsigned { // submit the Future to the io_uring - int submitted = io_uring_submit(&ring); + int submitted = io_uring_submit_and_wait(&ring, wait_nr); // Actually I don't know how "unlikely" this is if (unlikely(submitted < 0)) { @@ -122,12 +123,12 @@ auto IoContext::submitPreparedSqes() -> unsigned { } template <CallerEnvironment callerEnvironment> -void IoContext::submit(Future &future) { +void IoContext::submitAndWait(Future &future, unsigned wait_nr) { LOGD("submitting " << future << (future.dependency ? 
" and it's dependencies" : "")); unsigned prepared = prepareFutureChain(future, 1); // submit the Future to the io_uring - int submitted = submitPreparedSqes<callerEnvironment>(); + int submitted = submitPreparedSqesAndWait<callerEnvironment>(wait_nr); // We submitted some Futures to the io_uring // Because we submit every Future right away multiple prepared sqes can only @@ -182,8 +183,9 @@ void IoContext::submit(Future &future) { } // show the compiler our template incarnations -template void IoContext::submit<CallerEnvironment::EMPER>(Future &future); -template void IoContext::submit<CallerEnvironment::ANYWHERE>(Future &future); +template void IoContext::submitAndWait<CallerEnvironment::EMPER>(Future &future, unsigned wait_nr); +template void IoContext::submitAndWait<CallerEnvironment::ANYWHERE>(Future &future, + unsigned wait_nr); template <CallerEnvironment callerEnvironment> auto IoContext::reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned { @@ -206,7 +208,7 @@ reap_cqes: // never reap completions on the global IoContext assert(this != runtime.globalIo); - LOGD("Reaping completions"); + LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId())); std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes; // Someone else is currently reaping completions @@ -216,6 +218,14 @@ reap_cqes: return 0; } } else { + // We can not reap completions of this IoContext to not race + // with the sleeping worker. 
+ if (waitInflight.load(std::memory_order_acquire)) { + LOGD("Not reaping worker " << std::to_string(worker->getWorkerId()) + << " since this worker is already waiting for its CQEs"); + return 0; + } + if (!cq_lock.try_lock_or_increment()) { LOGD("Global completer unsuccessful try_lock_or_increment"); return 0; @@ -237,7 +247,7 @@ reap_cqes: uint32_t globalCompleterAttempts = cq_lock.unlock(); - LOGD("got " << count << " cqes from the io_uring"); + LOGD("got " << count << " cqes from worker " << worker->getWorkerId() << "'s io_uring"); if constexpr (emper::DEBUG) { assert(count <= reqs_in_uring); @@ -280,6 +290,20 @@ reap_cqes: auto tag = static_cast<PointerTags>(tptr.getTag()); switch (tag) { + case PointerTags::NewWorkWsq: + case PointerTags::NewWorkAq: { + LOGD("Got new work notification"); + + auto *pipeSleepStrategy = + reinterpret_cast<PipeSleepStrategy *>(&runtime.getWorkerSleepStrategy()); + statsIncr(pipeSleepStrategy->stats->wakeupDueToNotify); + + // Reset flag to indicate that a new sleep cqe must be prepared + // and allow the completer to reap completions again + waitInflight.store(false, std::memory_order_release); + break; + } + case PointerTags::Callback: { auto *callback = tptr.getPtr<Future::CallbackInternal>(); LOGD("Create new callback fiber for " << callback); diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 9c1183197b97b5c250c2602a27f30e6714e3bf3e..841246ecc732fdff1c173e9ef0626bedacc8553a 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -22,6 +22,7 @@ #include "io/Stats.hpp" // for Stats #include "lib/adt/LockedSet.hpp" // for LockedSet +class AbstractWorkStealingScheduler; class Fiber; #ifdef EMPER_IO_CQ_LOCK_COUNTING_TRY_LOCK @@ -43,11 +44,16 @@ using CqLock = emper::lib::sync::PseudoCountingTryLock<emper::lib::sync::SpinLoc #error Uknown cq lock implementation #endif +namespace emper::sleep_strategy { +class PipeSleepStrategy; +} + namespace emper::io { class Future; class IoContext : public 
Logger<LogSubsystem::IO> { friend class ::Runtime; + friend class ::AbstractWorkStealingScheduler; friend class Future; friend class SendFuture; friend class RecvFuture; @@ -55,6 +61,8 @@ class IoContext : public Logger<LogSubsystem::IO> { // IoContext pointer in GlobalIoContext::globalCompleterFunc friend class GlobalIoContext; + friend class emper::sleep_strategy::PipeSleepStrategy; + protected: // Remember the Runtime which created the IoContext Runtime &runtime; @@ -73,6 +81,13 @@ class IoContext : public Logger<LogSubsystem::IO> { // Remember the worker object for this IoContext Worker *worker; + // Flag to indicate that we already have a sleep related request in the io_uring. + // Gets set by the worker on WaitdfdSleepStrategy::sleep() and + // reset when reaping a completion containing a NewWork{Wsq,Aq} TaggedPtr. + // It also prevents the completer from reaping completions on this IoContext + // to prevent races between the worker and the completer. + std::atomic<bool> waitInflight = false; + Stats stats; // Members useful for debugging @@ -106,13 +121,42 @@ class IoContext : public Logger<LogSubsystem::IO> { */ auto prepareFutureChain(Future &future, unsigned chain_length) -> unsigned; + enum class PointerTags : uint16_t { Future, Callback, NewWorkAq, NewWorkWsq }; + /** * @brief submit prepared sqes possibly reaping completions if CQ is full * + * @param wait_nr The number of cqes which should be awaited before returning from + * io_uring_submit_and_wait + * * @return the number of submitted Futures */ template <CallerEnvironment callerEnvironment> - auto submitPreparedSqes() -> unsigned; + auto submitPreparedSqesAndWait(unsigned wait_nr) -> unsigned; + + /** + * @brief submit prepared sqes possibly reaping completions if CQ is full + * + * @return the number of submitted Futures + */ + template <CallerEnvironment callerEnvironment> + auto submitPreparedSqes() -> unsigned { + return submitPreparedSqesAndWait<callerEnvironment>(0); + } + + /** + * @brief 
Submit a future for asynchronous completion in this IoContext + * + * This function is not part of the public API of the IoContext because a + * user code should not be able block the worker thread in the IoContext. + * + * @param future The Future which should be completed. The Future object + * must stay valid until it is completed. + * @param wait_nr The number of cqes which should be awaited before returning from + * io_uring_submit_and_wait + */ + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + void submitAndWait(Future &future, unsigned wait_nr); inline void setWorkerIo(Worker *worker) { // remember our worker @@ -171,7 +215,9 @@ class IoContext : public Logger<LogSubsystem::IO> { * The Future object must stay valid until it is completed. */ template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - void submit(Future &future); + void submit(Future &future) { + submitAndWait(future, 0); + } /** * @brief Submit a future for asynchronous completion in this IoContext and reap completions diff --git a/emper/io/Stats.cpp b/emper/io/Stats.cpp index ac39d84153df3a6fa28588193f6f6da5aa0ed471..258179e0105b9600d709fd2dea73c6c466ff4237 100644 --- a/emper/io/Stats.cpp +++ b/emper/io/Stats.cpp @@ -120,7 +120,7 @@ auto operator<<(std::ostream& os, const Stats& s) -> std::ostream& { void Stats::printStats(IoContext* globalIoContext, const std::vector<IoContext*>& workerIoContexts) { if (globalIoContext) { - std::cout << globalIoContext->getStats() << std::endl; + std::cout << globalIoContext->getStats(); } // Use a stats object to calculate the averages diff --git a/emper/lib/TaggedPtr.hpp b/emper/lib/TaggedPtr.hpp index 532082c20730df4a9a2cd8b7a4ad985013f33a4e..9da550e0455d20b3b46054d0af7c63854e9ee58d 100644 --- a/emper/lib/TaggedPtr.hpp +++ b/emper/lib/TaggedPtr.hpp @@ -31,7 +31,7 @@ class TaggedPtr { } TaggedPtr(void* ptr) : tptr(reinterpret_cast<uintptr_t>(ptr)) {} - TaggedPtr(uintptr_t ptr) : tptr(ptr) {} + 
TaggedPtr(uintptr_t ptr, uint16_t tag = 0) : tptr(ptr) { setTag(tag); } /** * @brief extract the 48-bit the pointer part @@ -45,6 +45,17 @@ class TaggedPtr { return reinterpret_cast<T*>(tptr & (TPTR_POINTER_MASK - 1)); } + /** + * @brief extract the 48-bit the pointer part + * + * @return ptr The value of the actuall pointer part of tptr + */ + [[nodiscard]] inline auto getPtrValue() const -> uintptr_t { + // ignore the least significant bit of the tagged pointer + // NOLINTNEXTLINE(performance-no-int-to-ptr) + return tptr & (TPTR_POINTER_MASK - 1); + } + /** * @brief extract the 16-bit tag part * diff --git a/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp b/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp index f0fbc4af9d8e5a6dfa227c98daed1a25b6684b09..33b7a7ae5774b5a8096b1a5be8029f10c50e7c4d 100644 --- a/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp @@ -44,5 +44,7 @@ class AbstractWorkerSleepStrategy { } void sleep() { static_cast<T*>(this)->sleep(); } + + inline void printStats() { static_cast<T*>(this)->printStats(); } }; } // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..48ac397c4cb9550d262ccae1d9471f5f04163c76 --- /dev/null +++ b/emper/sleep_strategy/PipeSleepStrategy.cpp @@ -0,0 +1,90 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "sleep_strategy/PipeSleepStrategy.hpp" + +#include <liburing.h> + +#include <atomic> +#include <cassert> +#include <string> + +#include "CallerEnvironment.hpp" +#include "Emper.hpp" +#include "Worker.hpp" +#include "io/IoContext.hpp" +#include "lib/TaggedPtr.hpp" + +using emper::io::IoContext; +using emper::lib::TaggedPtr; + +namespace emper::sleep_strategy { + +template <CallerEnvironment callerEnvironment> +[[nodiscard]] auto 
PipeSleepStrategy::createHint() -> TaggedPtr { + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + return TaggedPtr((uintptr_t)Worker::getCurrentWorkerId(), + static_cast<uint16_t>(IoContext::PointerTags::NewWorkWsq)); + } else { + return TaggedPtr((uintptr_t) nullptr, static_cast<uint16_t>(IoContext::PointerTags::NewWorkAq)); + } +} + +template auto PipeSleepStrategy::createHint<CallerEnvironment::EMPER>() -> TaggedPtr; +template auto PipeSleepStrategy::createHint<CallerEnvironment::ANYWHERE>() -> TaggedPtr; + +void PipeSleepStrategy::sleep() { + IoContext& io = *IoContext::getWorkerIo(); + + const bool mustPrepareSqe = !io.waitInflight; + if (mustPrepareSqe) { + // increment the sleeper count if it was negative we should skip sleeping + int64_t sleeping = sleepers.fetch_add(1, std::memory_order_acquire); + if (sleeping < 0) { + LOGD("observed sleeper count as: " << sleeping << " -> skip sleeping"); + return; + } + + assert(sleeping <= workerCount); + + struct io_uring_sqe* sqe = io_uring_get_sqe(&io.ring); + assert(sqe); + + // We read directly into the workers dispatchHint + io_uring_prep_read(sqe, sleepFd, &io.worker->dispatchHint, sizeof(io.worker->dispatchHint), 0); + + // Mark the sqe as a new work notification to reset the waitInflight flag when reaping the + // resulting cqe + io_uring_sqe_set_data(sqe, TaggedPtr((uintptr_t) nullptr, + static_cast<uint16_t>(IoContext::PointerTags::NewWorkAq))); + + if constexpr (emper::DEBUG) { + io.reqs_in_uring++; + } + + LOGD("prepared sleepFd read and set sleepers count to: " << sleeping + 1); + + // Before going to sleep prevent the completer from reaping completions + // on our IoContext otherwise we would neither safely wakeup nor reset + // the waitInflight flag. 
+ io.waitInflight.store(true, std::memory_order_acquire); + } + + // Wait for IO completions + LOGD("sleep until IO completions occur"); + io.submitPreparedSqesAndWait<CallerEnvironment::EMPER>(1); + LOGD("wakeup due to available IO completions"); + + statsIncr(stats->wakeup); +} + +auto operator<<(std::ostream& os, const PipeSleepStrategy::Stats& s) -> std::ostream& { + os << "PipeSleepStrategy Stats:" << std::endl; + os << "total-onNewWork: " << std::to_string(s.onNewWork) << std::endl; + os << "total-notify: " << std::to_string(s.notify) << std::endl; + os << "total-notifications: " << std::to_string(s.notifications) << std::endl; + os << "total-skip: " << std::to_string(s.skip) << std::endl; + os << "total-wakeup: " << std::to_string(s.wakeup) << std::endl; + os << "total-wakeup-due-to-notify: " << std::to_string(s.wakeupDueToNotify) << std::endl; + return os; +} +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/PipeSleepStrategy.hpp b/emper/sleep_strategy/PipeSleepStrategy.hpp new file mode 100644 index 0000000000000000000000000000000000000000..5c60b74781a40d98da9773f71970528729528088 --- /dev/null +++ b/emper/sleep_strategy/PipeSleepStrategy.hpp @@ -0,0 +1,296 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <unistd.h> + +#include <algorithm> +#include <atomic> +#include <cstddef> +#include <cstdint> +#include <iostream> +#include <stdexcept> +#include <type_traits> +#include <vector> + +#include "CallerEnvironment.hpp" +#include "Common.hpp" +#include "Debug.hpp" +#include "Emper.hpp" +#include "Worker.hpp" +#include "emper-common.h" +#include "lib/TaggedPtr.hpp" +#include "sleep_strategy/AbstractWorkerSleepStrategy.hpp" + +using emper::lib::TaggedPtr; + +namespace emper::io { +class IoContext; +} + +namespace emper::sleep_strategy { + +/** + * @brief A pipe(2) based sleep strategy combining the IO subsystem with the sleep strategy + * + * implement a pipe based sleep 
strategy using the IO subsystem + * + * Design goals + * ============ + * + * * Wakeup either on external newWork notifications or on local IO completions + * -> Sleep strategy is sound without the IO completer + * * Do as less as possible in a system saturated with work + * * Pass a hint where to find new work to suspended workers + * + * Algorithm + * ========= + * + *``` + * Data: + * Global: + * hint pipe + * sleepers count + * Per worker: + * dispatch hint buffer + * in flight flag + * + * Sleep: + * if we have no sleep request in flight + * Atomic increment sleep count + * Remember that we are sleeping + * Prepare read cqe from the hint pipe to dispatch hint buffer + * Prevent the completer from reaping completions on this worker IoContext + * Wait until IO completions occurred + * + * NotifyEmper(n): + * if observed sleepers <= 0 + * return + * + * // Determine how many we are responsible to wake + * do + * toWakeup = min(observed sleepers, n) + * while (!CAS(sleepers, toWakeup)) + * + * write toWakeup hints to the hint pipe + * + * NotifyAnywhere(n): + * // Ensure all n notifications take effect + * while (!CAS(sleepers, observed sleepers - n)) + * if observed sleeping <= -n + * return + * + * toWakeup = min(observed sleeping, n) + * write toWakeup hints to the hint pipe + * + * onNewWorkCompletion: + * reset in flight flag + * allow completer to reap completions on this IoContext + *``` + * + * Notes + * ===== + * + * * We must decrement the sleepers count on the notifier side to + * prevent multiple notifiers to observe all the same amount of sleepers, + * trying to wake up the same sleepers by writing to the pipe and jamming it up + * with unconsumed hints and thus blocking in the notify write resulting + * in a deadlock. 
+ * * The CAS loops on the notifier side are needed because decrementing + * and incrementing the excess is racy: Two notifier can observe the + * sum of both their excess decrement and increment to much resulting in a + * broken counter. + * * Add the dispatch hint code in AbstractWorkStealingScheduler::nextFiber. + * This allows workers to check the dispatch hint after there + * where no local work to execute. + * This is a trade-off where we trade slower wakeup - a just awoken worker + * will check for local work - against a faster dispatch hot path when + * we have work to do in our local WSQ. + * * The completer tread must not reap completions on the IoContexts of + * sleeping workers because this introduces a race for cqes and a possible + * lost wakeup if the completer consumes the completions before the worker + * is actually waiting for them. + * * When notifying sleeping workers from anywhere we must ensure that all + * notifications take effect. This is needed for example when terminating + * the runtime to prevent sleep attempt from worker thread which are + * about to sleep but have not incremented the sleeper count yet. + * We achieve this by always decrementing the sleeper count by the notification + * count. 
+ */ +class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, + public Logger<LogSubsystem::SLEEP_S> { + friend class emper::io::IoContext; + + class Stats { + public: + std::atomic<size_t> onNewWork = 0; + std::atomic<size_t> notify = 0; + std::atomic<size_t> notifications = 0; + std::atomic<size_t> skip = 0; + std::atomic<size_t> wakeup = 0; + std::atomic<size_t> wakeupDueToNotify = 0; + }; + + Stats* stats; + + workerid_t workerCount; + int sleepFd; + int notifyFd; + + // Make sure the shared counter lives in an exlusive cache line + std::aligned_storage<64, 64>::type sleepers_mem; + std::atomic<int64_t>& sleepers = *reinterpret_cast<std::atomic<int64_t>*>(&sleepers_mem); + + template <CallerEnvironment callerEnvironment> + [[nodiscard]] auto createHint() -> TaggedPtr; + + void writeNotifications(TaggedPtr hint, int64_t count) { + statsAdd(stats->notifications, count); + + std::vector<void*> hints(count, hint); + + ssize_t res = write(notifyFd, hints.data(), sizeof(void*) * hints.size()); + if (unlikely(res) < 0) { + DIE_MSG_ERRNO("writing to notifyFd failed"); + } + } + + void notifyFromEmper(int64_t& sleeping, TaggedPtr hint, int64_t count) { + int64_t toWakeup; + + // We need to decrement the sleeper count on the notification side + // to prevent overcommiting notifications, filling up the pipe which results + // in notify calls to block on write. + // And the CAS loop is needed because we don't want to decrement the sleepers + // count to much. 
+ + // Decrement the sleeper count for each sleeper we are going to wakeup + do { + // No one is sleeping -> we don't need to notify + if (sleeping <= 0) { + return; + } + + // Count of sleepers we notify + toWakeup = std::min(sleeping, count); + } while (!sleepers.compare_exchange_weak(sleeping, sleeping - toWakeup, + std::memory_order_release, std::memory_order_acquire)); + + // increment the notify stat if we actually have to notify someone + statsIncr(stats->notify); + writeNotifications(hint, toWakeup); + + LOGD("notifyFromEmper written " + << toWakeup << " notifications and set sleepers count to: " << sleeping - toWakeup); + } + + void notifyFromAnywhere(int64_t& sleeping, TaggedPtr hint, int64_t count) { + // If we observe nobody sleeping we need to prevent sleep locks when + // everybody is about to sleep but none has incremented the sleepers count yet. + // We prevent the next sleep from blocking by setting the sleeper count to + // the negative amount we want to prevent from sleeping. + + // More general if we notify from anywhere we need all notifications to take + // effect. Only notifying the sleepers we observe may not be enough. + // For example when terminating the runtime we notify all workers if we + // only wakeup the ones we observe sleeping we may never terminate because + // after we notified all we observed new one went to sleep which will never + // be notified. + // To prevent this we make sure that all count notifications take effect + // by making the sleepers count negative and thus preventing at least count + // sleep attempts. 
+ + // Always increment the notify count because notifications from anywhere always + // take effect + statsIncr(stats->notify); + + do { + // We already prevent enough sleep attempts + if (sleeping <= -count) { + LOGD("notifyFromAnywhere sleeper count already preventing enough sleep attempts"); + return; + } + } while (!sleepers.compare_exchange_weak(sleeping, sleeping - count, std::memory_order_release, + std::memory_order_acquire)); + + int64_t toWakeup = std::min(sleeping, count); + if (sleeping > 0) { + writeNotifications(hint, toWakeup); + } + + LOGD("notifyFromAnywhere written " + << toWakeup << " notifications and set sleepers count to: " << sleeping - count); + } + + template <CallerEnvironment callerEnvironment> + void notify(TaggedPtr hint, uint32_t count) { + // The hint must be != nullptr so sleep() knows when to prepare and submit + // a sleepFd read sqe. + + statsIncr(stats->onNewWork); + + int64_t sleeping = getSleeping(); + auto signedCount = static_cast<int64_t>(count); + + if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { + notifyFromAnywhere(sleeping, hint, signedCount); + } else { + notifyFromEmper(sleeping, hint, signedCount); + } + } + + public: + PipeSleepStrategy(workerid_t workerCount) : workerCount(workerCount) { + LOGD("init pipe sleep startegy"); + sleepers.store(0, std::memory_order_relaxed); + + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + int fds[2]; + if (pipe(fds)) { + DIE_MSG_ERRNO("pipe failed"); + } + sleepFd = fds[0]; + notifyFd = fds[1]; + + if constexpr (emper::STATS) { + stats = new Stats(); + } + } + + ~PipeSleepStrategy() { delete stats; } + + void printStats() { std::cout << *stats; } + + [[nodiscard]] inline auto getSleeping() const -> long { return sleepers.load(); } + + template <CallerEnvironment callerEnvironment> + inline void notifyOne() { + notifyMany<callerEnvironment>(1); + } + + template <CallerEnvironment callerEnvironment> + inline void notifyMany(unsigned count) { + if (count > 1) 
LOGD("notify(hint, " << count << ")"); + notify<callerEnvironment>(createHint<callerEnvironment>(), count); + } + + template <CallerEnvironment callerEnvironment> + inline void notifyAll() { + notifyMany<callerEnvironment>(workerCount); + } + + template <CallerEnvironment callerEnvironment> + inline void notifySpecific(workerid_t /* workerId */) { + throw std::logic_error("Not implemented"); + // // TODO: get pid of specific worker + // pid_t specific = 0; + // notify<callerEnvironment>(createHint<callerEnvironment>(), specific); + } + + void sleep(); + + friend auto operator<<(std::ostream& os, const PipeSleepStrategy::Stats& s) -> std::ostream&; +}; + +auto operator<<(std::ostream& os, const PipeSleepStrategy::Stats& s) -> std::ostream&; +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp index c49439e046481b04986ef4a6f21bb56965cf7240..47349fc66ba90bb3d4f76f6cf18adeb61d158378 100644 --- a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp @@ -185,6 +185,8 @@ class AbstractSemaphoreWorkerSleepStrategy wakeupSem.wait(); } } + + void printStats(){}; }; #ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE diff --git a/emper/sleep_strategy/WorkerSleepStrategy.hpp b/emper/sleep_strategy/WorkerSleepStrategy.hpp index c528052976701e0c43ddb9bb5eb91c60906b30cc..e97af5480111f852c75856091ed4e1052af808f5 100644 --- a/emper/sleep_strategy/WorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/WorkerSleepStrategy.hpp @@ -2,15 +2,22 @@ // Copyright © 2021 Florian Fischer #pragma once -#if defined EMPER_LOCKED_WAKEUP_SEMAPHORE || defined EMPER_POSIX_WAKEUP_SEMAPHORE || \ - defined EMPER_FUTEX_WAKEUP_SEMAPHORE +#ifdef EMPER_SEMAPHORE_SLEEP_STRATEGY #include "sleep_strategy/SemaphoreWorkerSleepStrategy.hpp" +#elif defined EMPER_PIPE_SLEEP_STRATEGY +#include "sleep_strategy/PipeSleepStrategy.hpp" + #else #error Unknown 
WorkerSleepStrategy implementation #endif namespace emper::sleep_strategy { +#ifdef EMPER_SEMAPHORE_SLEEP_STRATEGY using WorkerSleepStrategy = SemaphoreWorkerSleepStrategy; + +#elif defined EMPER_PIPE_SLEEP_STRATEGY +using WorkerSleepStrategy = PipeSleepStrategy; +#endif } // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/meson.build b/emper/sleep_strategy/meson.build index c1d45fd29de15076e43e9974a412c109be0359af..6f04a329644a4267b47a52412633f16a84af4cff 100644 --- a/emper/sleep_strategy/meson.build +++ b/emper/sleep_strategy/meson.build @@ -1,3 +1,4 @@ emper_cpp_sources += files( 'SemaphoreWorkerSleepStrategy.cpp', + 'PipeSleepStrategy.cpp', ) diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index e354a12688c05703954052e84a8b1571a70b0338..9205a6890ac5ab70c3ee853ac7b5088392c97f11 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -12,13 +12,18 @@ #include "Emper.hpp" // for OVERFLOW_QUEUE #include "Fiber.hpp" #include "NextFiberResult.hpp" -#include "Runtime.hpp" // for Runtime +#include "Runtime.hpp" // for Runtime +#include "Worker.hpp" #include "emper-common.h" // for workerid_t +#include "io/IoContext.hpp" +#include "lib/TaggedPtr.hpp" #include "strategies/AbstractWorkStealingStrategy.hpp" #include "strategies/AbstractWorkStealingWorkerStats.hpp" using awss = AbstractWorkStealingStrategy; +using emper::io::IoContext; + thread_local AbstractWorkStealingScheduler::WsQueue<AbstractWorkStealingScheduler::QUEUE_SIZE> AbstractWorkStealingScheduler::queue; @@ -116,6 +121,22 @@ auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() return std::nullopt; } +auto AbstractWorkStealingScheduler::tryStealFiberFrom(workerid_t victim) + -> std::optional<std::pair<Fiber*, FiberSource>> { + Fiber* fiber; +popTop: + bool poped = queues[victim]->popTop(&fiber); + if (poped) { + 
emper::statsIncr(awss::stats.nextFiberStolen); + + if (maybeRecycle(fiber)) goto popTop; + + return std::make_pair(fiber, FiberSource::stolen); + } + + return std::nullopt; +} + auto AbstractWorkStealingScheduler::nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource> { FiberSource fiberSource = FiberSource::local; Fiber* fiber; @@ -130,6 +151,41 @@ popBottom: goto out; } + // Try dispatch hint possibly set by the sleep strategy + { + auto* const currentWorker = Worker::getCurrentWorker(); + if (currentWorker->dispatchHint) { + const TaggedPtr dispatchHint = currentWorker->dispatchHint; + currentWorker->dispatchHint = nullptr; + const auto tag = static_cast<IoContext::PointerTags>(dispatchHint.getTag()); + + switch (tag) { + case IoContext::PointerTags::NewWorkWsq: { + const auto victim = static_cast<workerid_t>(dispatchHint.getPtrValue()); + const auto stolen = tryStealFiberFrom(victim); + if (stolen) { + fiber = (*stolen).first; + fiberSource = FiberSource::hintWsq; + goto out; + } + } break; + + case IoContext::PointerTags::NewWorkAq: { + const auto fromAnywhere = nextFiberViaAnywhereQueue(); + if (fromAnywhere) { + fiber = (*fromAnywhere).first; + fiberSource = FiberSource::hintAq; + goto out; + } + break; + } + default: + DIE_MSG("invalid dispatch hint"); + } + } + } + + // Go into work stealing { // TODO: Determine if there is a better value than 1/3. const float CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE = 0.33; @@ -146,16 +202,8 @@ popBottom: // Don't steal from ourselves. if (unlikely(victim == myWorkerId)) continue; - popTop: - poped = queues[victim]->popTop(&fiber); - if (poped) { - emper::statsIncr(awss::stats.nextFiberStolen); - - if (maybeRecycle(fiber)) goto popTop; - - fiberSource = FiberSource::stolen; - goto out; - } + auto stolenFiber = tryStealFiberFrom(victim); + if (stolenFiber) return *stolenFiber; // If we failed to steal from a certain number of victims, check // the anywhere queue for new fibers. 
diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp index f162da5c0d85e2c73450921a7ab038bd6246f4b4..5960ae2a06832f50a020a16f9d2ddd02a3130a69 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.hpp +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -9,6 +9,7 @@ #include "NextFiberResult.hpp" #include "Scheduler.hpp" +#include "emper-common.h" #ifdef EMPER_LOCKED_WS_QUEUE #include "lib/adt/LockedQueue.hpp" @@ -33,12 +34,15 @@ class AbstractWorkStealingScheduler : public Scheduler { enum class FiberSource : uintptr_t { local, + hintWsq, + hintAq, stolen, anywhereQueue, }; private: auto nextFiberViaAnywhereQueue() -> std::optional<std::pair<Fiber*, FiberSource>>; + auto tryStealFiberFrom(workerid_t victim) -> std::optional<std::pair<Fiber*, FiberSource>>; protected: WsQueue<QUEUE_SIZE>** queues; @@ -54,4 +58,6 @@ class AbstractWorkStealingScheduler : public Scheduler { public: AbstractWorkStealingScheduler(Runtime& runtime, RuntimeStrategy& strategy); + + friend class Runtime; }; diff --git a/emper/strategies/AbstractWorkStealingStats.cpp b/emper/strategies/AbstractWorkStealingStats.cpp index 1052d9dc174f9a7863a97f90a3260edb58053b00..0e6492a2d096c2391a71386ca07ae51a597b69c7 100644 --- a/emper/strategies/AbstractWorkStealingStats.cpp +++ b/emper/strategies/AbstractWorkStealingStats.cpp @@ -23,6 +23,10 @@ void AbstractWorkStealingStats::print() { << std::to_string(comulatedWorkerStats.scheduledFibersToOverflowQueue) << std::endl << "total-next-fiber-from-local: " << std::to_string(comulatedWorkerStats.nextFiberFromLocal) << std::endl + << "total-next-fiber-hint-local: " + << std::to_string(comulatedWorkerStats.nextFiberFromHintLocal) << std::endl + << "total-next-fiber-hint-anywherequeue: " + << std::to_string(comulatedWorkerStats.nextFiberFromHintAnywhere) << std::endl << "total-next-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextFiberStolen) << std::endl << 
"total-next-fiber-from-anywhere-queue: " diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.hpp b/emper/strategies/AbstractWorkStealingWorkerStats.hpp index a07af68ccbdcfbdd55b315d945597217b982a4d7..35126d12537fc37fe720601f43663bbb0f2ca655 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.hpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.hpp @@ -16,6 +16,8 @@ class AbstractWorkStealingWorkerStats { uint64_t scheduledFibersToLocal = 0; uint64_t scheduledFibersToOverflowQueue = 0; uint64_t nextFiberFromLocal = 0; + uint64_t nextFiberFromHintLocal = 0; + uint64_t nextFiberFromHintAnywhere = 0; uint64_t nextFiberStolen = 0; uint64_t nextFiberFromAnywhereQueue = 0; diff --git a/emper/strategies/laws/LawsDispatcher.cpp b/emper/strategies/laws/LawsDispatcher.cpp index c99d4d3ffbb146c779f58df57f22a42f77be3557..1cc2164df97f55fe3dc2c2b500a5201383e21bb9 100644 --- a/emper/strategies/laws/LawsDispatcher.cpp +++ b/emper/strategies/laws/LawsDispatcher.cpp @@ -44,6 +44,12 @@ void LawsDispatcher::dispatchLoop() { case LawsStrategy::FiberSource::local: LawsStrategy::stats.dispatchedFibersFromLocal++; break; + case LawsStrategy::FiberSource::hintWsq: + LawsStrategy::stats.dispatchedFibersFromHintLocal++; + break; + case LawsStrategy::FiberSource::hintAq: + LawsStrategy::stats.dispatchedFibersFromHintAnywhere++; + break; case LawsStrategy::FiberSource::stolen: LawsStrategy::stats.dispatchedFibersStolen++; break; diff --git a/emper/strategies/laws/LawsStrategy.hpp b/emper/strategies/laws/LawsStrategy.hpp index 113427d3df2d735d1e717315715fde2c1a6cbb6d..d287b446cfbdf667beb258a4ad0a27c880c8a953 100644 --- a/emper/strategies/laws/LawsStrategy.hpp +++ b/emper/strategies/laws/LawsStrategy.hpp @@ -21,6 +21,8 @@ class LawsStrategy : public AbstractWorkStealingStrategy { private: enum class FiberSource : uintptr_t { local = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::local), + hintWsq = 
static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::hintWsq), + hintAq = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::hintAq), stolen = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::stolen), anywhereQueue = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::anywhereQueue), diff --git a/emper/strategies/laws/LawsStrategyStats.cpp b/emper/strategies/laws/LawsStrategyStats.cpp index 451d112606b1043ad7d5774a7a04ccec21f7ed93..7d718508c7edc42b6e79d1b21596c86a60fc4da9 100644 --- a/emper/strategies/laws/LawsStrategyStats.cpp +++ b/emper/strategies/laws/LawsStrategyStats.cpp @@ -25,6 +25,10 @@ void LawsStrategyStats::print() { << std::to_string(comulatedWorkerStats.dispatchedFibersFromPriority) << std::endl << "total-dispatched-fibers-from-local: " << std::to_string(comulatedWorkerStats.dispatchedFibersFromLocal) << std::endl + << "total-dispatched-fibers-from-hint-local: " + << std::to_string(comulatedWorkerStats.dispatchedFibersFromHintLocal) << std::endl + << "total-dispatched-fibers-from-hint-anywhere: " + << std::to_string(comulatedWorkerStats.dispatchedFibersFromHintAnywhere) << std::endl << "total-dispatched-fibers-stolen: " << std::to_string(comulatedWorkerStats.dispatchedFibersStolen) << std::endl << "total-dispatched-fibers-from-anywhere-queue: " diff --git a/emper/strategies/laws/LawsWorkerStats.hpp b/emper/strategies/laws/LawsWorkerStats.hpp index f60b211d1dbd96dda5319e7f2047a5c96d690e22..8f7f8fccf1b26315216ae66914b08993dbad42c8 100644 --- a/emper/strategies/laws/LawsWorkerStats.hpp +++ b/emper/strategies/laws/LawsWorkerStats.hpp @@ -8,6 +8,8 @@ struct LawsWorkerStats { uint64_t scheduledFibersToPriority = 0; uint64_t dispatchedFibersFromPriority = 0; uint64_t dispatchedFibersFromLocal = 0; + uint64_t dispatchedFibersFromHintLocal = 0; + uint64_t dispatchedFibersFromHintAnywhere = 0; uint64_t dispatchedFibersStolen = 0; uint64_t dispatchedFibersFromAnywhereQueue = 0; @@ -15,6 +17,8 @@ 
struct LawsWorkerStats { scheduledFibersToPriority += other.scheduledFibersToPriority; dispatchedFibersFromPriority += other.dispatchedFibersFromPriority; dispatchedFibersFromLocal += other.dispatchedFibersFromLocal; + dispatchedFibersFromHintLocal += other.dispatchedFibersFromHintLocal; + dispatchedFibersFromHintAnywhere += other.dispatchedFibersFromHintAnywhere; dispatchedFibersStolen += other.dispatchedFibersStolen; dispatchedFibersFromAnywhereQueue += other.dispatchedFibersFromAnywhereQueue; return *this; diff --git a/meson.build b/meson.build index 50c1674be9b776a3b999c67121f3c7cdf3335129..1256b288d8b3350db70bafc13826ddacfeca5ce6 100644 --- a/meson.build +++ b/meson.build @@ -45,6 +45,15 @@ conf_data.set('EMPER_SET_AFFINITY_ON_BLOCK', get_option('set_affinity_on_block') semaphore_impl = get_option('wakeup_semaphore_implementation') conf_data.set('EMPER_' + semaphore_impl.to_upper() + '_WAKEUP_SEMAPHORE', true) +sleep_strategy = get_option('worker_sleep_strategy') +if sleep_strategy == 'semaphore' + conf_data.set('EMPER_SEMAPHORE_SLEEP_STRATEGY', true) +elif sleep_strategy == 'pipe' + conf_data.set('EMPER_PIPE_SLEEP_STRATEGY', true) +else + error('Unsupported sleep strategy') +endif + locked_unbounded_queue_impl = get_option('locked_unbounded_queue_implementation') if locked_unbounded_queue_impl == 'boost_shared_mutex' if not boost_thread_dep.found() @@ -102,6 +111,7 @@ if io_completer_behavior == 'maybe_wakeup' io_completer_behavior = 'none' endif endif + conf_data.set('EMPER_IO_COMPLETER_BEHAVIOR', io_completer_behavior) subdir('emper') diff --git a/meson_options.txt b/meson_options.txt index 7f6f6c98ea6889cac8e462fa7b525ead26ea3d8f..1a719c0e279865439e58c101403e199edca6c0f4 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -36,10 +36,11 @@ option( value: 'one', ) option( - 'locked_ws_queue', - type: 'boolean', - value: false, - description: 'Use a fully locked queue for work-stealing', + 'worker_sleep_strategy', + type: 'combo', + description: 'The used 
algorithm to suspend and wakeup workers', + choices: ['semaphore', 'pipe'], + value: 'semaphore', ) option( 'wakeup_semaphore_implementation', @@ -52,6 +53,12 @@ option( value: 'posix', description: 'Semaphore implementation to suspend/wakeup workers', ) +option( + 'locked_ws_queue', + type: 'boolean', + value: false, + description: 'Use a fully locked queue for work-stealing', +) option( 'locked_mpsc_queue', type: 'boolean', @@ -126,7 +133,7 @@ option( 'io_completer_behavior', type: 'combo', description: 'The behaviour of the IO completer thread', - choices: ['schedule', 'maybe_wakeup'], + choices: ['schedule', 'maybe_wakeup', 'none'], value: 'schedule', ) option(