diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index d604ceb826beecd4d3384ede40d6932cc6343ef7..00e6f3e56ea3f9893248131dc90bbea2f2ec7e17 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -77,7 +77,8 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory workers(new Worker*[workerCount]), ioContexts(emper::IO ? workerCount : 0), ioReadySem(0), - randomEngine(seed) { + randomEngine(seed), + workerSleepStrategy(workerCount) { const int nprocs = get_nprocs(); { @@ -210,11 +211,7 @@ auto Runtime::workerLoop(Worker* worker) -> void* { ioReadySem.notify(); ioContexts[worker->workerId] = workerIo; - workerIo->setWorkerIo(); - - if constexpr (emper::STATS) { - workerIo->stats.workerId = worker->workerId; - } + workerIo->setWorkerIo(worker); } LOGD("Worker loop started by thread " << syscall(SYS_gettid)); @@ -292,7 +289,7 @@ auto Runtime::nextFiber() -> NextFiberResult { void Runtime::initiateTermination() { terminateWorkers.store(true, std::memory_order_release); - wakeupSem.notify_many(workerCount); + workerSleepStrategy.notifyAll<CallerEnvironment::ANYWHERE>(); if constexpr (emper::IO && emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) { globalIo->initiateTermination(); diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 993bd24f177af0c553180229190d3ffca27f7d60..0837ec35fcf3861e2a5f823818590e8bab673791 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -15,17 +15,15 @@ #include <vector> // for vector #include "CallerEnvironment.hpp" -#include "Common.hpp" // for ALIGN_TO_CACHE_LINE #include "Context.hpp" #include "Debug.hpp" -#include "Emper.hpp" // for WORKER_NOTIFY #include "NextFiberResult.hpp" #include "Scheduler.hpp" // for Scheduler #include "Worker.hpp" #include "emper-common.h" // for workerid_t #include "lib/sync/Latch.hpp" // for Latch #include "lib/sync/Semaphore.hpp" -#include "lib/sync/WorkerWakeupSemaphore.hpp" +#include "sleep_strategy/WorkerSleepStrategy.hpp" class ContextManager; class Dispatcher; @@ -41,7 +39,7 @@ class IoContext; using emper::io::GlobalIoContext; using emper::io::IoContext; -using emper::lib::sync::WorkerWakeupSemaphore; +using emper::sleep_strategy::WorkerSleepStrategy; class Runtime : public Logger<LogSubsystem::RUNTI> { private: @@ -77,7 +75,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { auto workerLoop(Worker* worker) -> void*; - ALIGN_TO_CACHE_LINE WorkerWakeupSemaphore wakeupSem; + WorkerSleepStrategy workerSleepStrategy; static RuntimeStrategyFactory& DEFAULT_STRATEGY; @@ -92,56 +90,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> inline void wakeupSleepingWorkers() { - WorkerWakeupSemaphore::CounterType skipWakeupThreshold; - if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { - // On external work we always increment the semaphore unless we observe - // that its value is > workerCount. - // If we observe semValue > workerCount we are ensured that some worker will iterate - // its dispatchLoop again and must observe the new work. - skipWakeupThreshold = workerCount; - } else { - // For work from within emper we skip wakeup if we observe no one sleeping. - // This is sound because wakeupSleepingWorkers() is called from a active - // worker which will observe its own new work in its next dispatchLoop before - // going to sleep. - - // Note that sem_getvalue() is allowed to return 0 if there are - // waiting workers, hence we need to set the threshold also to - // 0. This has the disadvantage that we will perform one - // unnecessary sem_post. If we ever switch the wakeupSem - // implementation, then the skipWakeupThreshold value should be - // reviewed and potentially changed to '-1'. - skipWakeupThreshold = 0; - } - - auto semValue = wakeupSem.getValue(); - if (semValue > skipWakeupThreshold) { - return; - } - - if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::one) { - wakeupSem.notify(); - } else if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::all) { - // notify all we observed sleeping - // It is sound to increment the semaphore to much, thus this will only cause - // workers to iterate the dispatchLoop more often before actually sleeping - - // TODO: Switch to c++20 std::counting_semaphore, which has release(std::ptrdiff_t) - - // Reading the manpage explains the function. - // POSIX sem_getvalue is allowed to return 0 or a negative count if there are - // waiters. - // Linux sem_getvalue indeed does return 0 - // To notify all sleeping workers we increment the semaphore once for each worker. - if (semValue == 0) { - semValue = workerCount; - } - - // make sure that the amount to notify is always positive - wakeupSem.notify_many(semValue < 0 ? -semValue : semValue); - } else { - ABORT("Unknown CallerEnvironment"); - } + workerSleepStrategy.notifyOne<callerEnvironment>(); } void maybeTerminateWorker() { @@ -152,7 +101,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { } } - void dispatchLoopSleep() { wakeupSem.wait(); } + void dispatchLoopSleep() { workerSleepStrategy.sleep(); } public: Runtime() : Runtime(getDefaultWorkerCount()) {} @@ -207,6 +156,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { inline auto getStrategy() -> RuntimeStrategy& { return *strategy; } + inline auto isTerminating() -> bool { return terminateWorkers; } + void initiateTermination(); void waitUntilFinished(); diff --git a/emper/io/GlobalIoContext.cpp b/emper/io/GlobalIoContext.cpp index 83799c7829130fab1908727fcfbb6994f33faeec..da8c944d7d1ecc18cecbf498a0ab226a3d81091e 100644 --- a/emper/io/GlobalIoContext.cpp +++ b/emper/io/GlobalIoContext.cpp @@ -16,10 +16,12 @@ #include "Common.hpp" #include "Emper.hpp" #include "Runtime.hpp" +#include "Worker.hpp" #include "io/Future.hpp" #include "io/IoContext.hpp" #include "lib/TaggedPtr.hpp" #include "lib/sync/Semaphore.hpp" +#include "sleep_strategy/WorkerSleepStrategy.hpp" using emper::lib::TaggedPtr; @@ -105,9 +107,10 @@ auto GlobalIoContext::globalCompleterFunc(void* arg) -> void* { assert(submitted == 1); if constexpr (emper::IO_COMPLETER_BEHAVIOR == emper::IoCompleterBehavior::wakeup) { - // Only wakeup sleeping workers and hope that the worker which - // has ready cqes will reap them soon. - globalIoContext->runtime.wakeupSleepingWorkers<CallerEnvironment::ANYWHERE>(); + // Notify the worker which's CQ has a new cqe + workerid_t worker = worker_io->worker->getWorkerId(); + globalIoContext->runtime.workerSleepStrategy.notifySpecific<CallerEnvironment::ANYWHERE>( + worker); } else { worker_io->reapAndScheduleCompletions<CallerEnvironment::ANYWHERE>(); } diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index ee4d78ebfeb1567b8883fedfe0fff24f4c6e37c9..df99e8c177396212fe9fc6df6c49b03ec3b09b89 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -13,8 +13,10 @@ #include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER #include "Common.hpp" -#include "Debug.hpp" // for LogSubsystem, LogSubsystem::IO, Logger -#include "Runtime.hpp" // for Runtime +#include "Debug.hpp" // for LogSubsystem, LogSubsystem::IO, Logger +#include "Emper.hpp" +#include "Runtime.hpp" // for Runtime +#include "Worker.hpp" #include "emper-config.h" // for EMPER_IO_WORKER_URING_ENTRIES #include "io/Stats.hpp" // for Stats #include "lib/adt/LockedSet.hpp" // for LockedSet @@ -67,6 +69,9 @@ class IoContext : public Logger<LogSubsystem::IO> { int notificationEventFd; uint64_t notificationEventFdBuf; + // Remember the worker object for this IoContext + Worker *worker; + Stats stats; // Members useful for debugging @@ -108,7 +113,15 @@ class IoContext : public Logger<LogSubsystem::IO> { template <CallerEnvironment callerEnvironment> auto submitPreparedSqes() -> unsigned; - inline void setWorkerIo() { workerIo = this; } + inline void setWorkerIo(Worker *worker) { + // remember our worker + this->worker = worker; + workerIo = this; + + if (emper::STATS) { + stats.workerId = worker->getWorkerId(); + } + } public: IoContext(Runtime &runtime, size_t uring_entries); diff --git a/emper/lib/sync/WorkerWakeupSemaphore.hpp b/emper/lib/sync/WorkerWakeupSemaphore.hpp deleted file mode 100644 index 357e9a780abef1b6d6150347c2a965ec39ad23c5..0000000000000000000000000000000000000000 --- a/emper/lib/sync/WorkerWakeupSemaphore.hpp +++ /dev/null @@ -1,30 +0,0 @@ -// SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2021 Florian Fischer -#pragma once - -#ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE -#include "lib/sync/LockedSemaphore.hpp" -#endif - -#ifdef EMPER_POSIX_WAKEUP_SEMAPHORE -#include "lib/sync/PosixSemaphore.hpp" -#endif - -#ifdef EMPER_FUTEX_WAKEUP_SEMAPHORE -#include "lib/sync/SpuriousFutexSemaphore.hpp" -#endif - -namespace emper::lib::sync { -#ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE -using WorkerWakeupSemaphore = LockedSemaphore; - -#elif defined EMPER_POSIX_WAKEUP_SEMAPHORE -using WorkerWakeupSemaphore = PosixSemaphore; - -#elif defined EMPER_FUTEX_WAKEUP_SEMAPHORE -using WorkerWakeupSemaphore = SpuriousFutexSemaphore; - -#else -#error Unknown WorkerWakeupSemaphore implementation -#endif -} // namespace emper::lib::sync diff --git a/emper/meson.build b/emper/meson.build index 935099db9b90383a45d33cf28ab47983eab2299b..777e7ca5a341d67cdcbb23181881b79d781bfc7e 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -40,6 +40,7 @@ emper_library_include += include_directories('.') subdir('io') subdir('lib') subdir('strategies') +subdir('sleep_strategy') emper_all_include = emper_library_include + [emper_include] diff --git a/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp b/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp new file mode 100644 index 0000000000000000000000000000000000000000..9f083f93d4d6bc8418592ef52d40684d60a9e98c --- /dev/null +++ b/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp @@ -0,0 +1,47 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <stdexcept> + +#include "CallerEnvironment.hpp" + +namespace emper::sleep_strategy { + +static constexpr bool needsNotifySpecific = + (emper::IO_COMPLETER_BEHAVIOR == emper::IoCompleterBehavior::wakeup); + +template <class T> +class AbstractWorkerSleepStrategy { + [[nodiscard]] inline auto getSleeping() const -> long { + return static_cast<T*>(this)->getValue(); + } + + template <CallerEnvironment callerEnvironment> + inline void notifyMany(unsigned count) { + static_cast<T*>(this)->template notifyMany<callerEnvironment>(count); + } + + template <CallerEnvironment callerEnvironment> + inline void notifyOne() { + static_cast<T*>(this)->template notifyOne<callerEnvironment>(); + } + + template <CallerEnvironment callerEnvironment> + inline void notifyAll() { + static_cast<T*>(this)->template notifyAll<callerEnvironment>(); + } + + template <CallerEnvironment callerEnvironment> + void notifySpecific(workerid_t workerId) { + if constexpr (needsNotifySpecific) { + static_cast<T*>(this)->template notifySpecific<callerEnvironment>(workerId); + } else { + throw std::logic_error( + "Called SemaphoreWorkerSleepStrategy::notifySpecific but needsNotifySpecific is false"); + } + } + + void sleep() { static_cast<T*>(this)->sleep(); } +}; +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.cpp b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ffec6f494f9f620c6ef59780ffddfdc9c68c2271 --- /dev/null +++ b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.cpp @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "sleep_strategy/SemaphoreWorkerSleepStrategy.hpp" + +#include "Runtime.hpp" + +namespace emper::sleep_strategy { + +// This must be excluded from SemaphoreWorkerSleepStrategy.hpp because this +// header is included from Runtime.hpp and therefore we can not easily use Runtime +// function +auto isRuntimeTerminating() -> bool { return Runtime::getRuntime()->isTerminating(); } + +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp new file mode 100644 index 0000000000000000000000000000000000000000..2a81e061b894f5c47aba4643e66208036d74e632 --- /dev/null +++ b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp @@ -0,0 +1,226 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <atomic> + +#include "CallerEnvironment.hpp" +#include "Debug.hpp" +#include "Emper.hpp" +#include "Worker.hpp" +#include "emper-common.h" +#include "emper-config.h" +#include "sleep_strategy/AbstractWorkerSleepStrategy.hpp" + +#ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE +#include "lib/sync/LockedSemaphore.hpp" +#endif + +#ifdef EMPER_POSIX_WAKEUP_SEMAPHORE +#include "lib/sync/PosixSemaphore.hpp" +#endif + +#ifdef EMPER_FUTEX_WAKEUP_SEMAPHORE +#include "lib/sync/SpuriousFutexSemaphore.hpp" +#endif + +namespace emper::sleep_strategy { + +// Needed to stop waiting for a worker to change its flag if it may be already +// terminated +auto isRuntimeTerminating() -> bool; + +template <class Sem> +class AbstractSemaphoreWorkerSleepStrategy + : AbstractWorkerSleepStrategy<AbstractSemaphoreWorkerSleepStrategy<Sem>> { + const workerid_t workerCount; + Sem wakeupSem; + + // check if the used Semaphore provides a notifySpecific implementation + static constexpr bool semHasNotifySpecific = requires(Sem s) { s.notifySpecific(0); }; + + // should the generic notifySpecific implementation be used + static constexpr bool useGenericNotifySpecificImpl = !semHasNotifySpecific && needsNotifySpecific; + + // Member used for the generic notifySpecific implementation + std::atomic<bool>* notifiedFlags; + + // Sleep part of the generic notifySpecific implementation + inline void genericNotifySpecificSleep() { + workerid_t workerId = Worker::getCurrentWorkerId(); + auto& notified = notifiedFlags[workerId]; + if (notified.load(std::memory_order_relaxed)) { + notified.store(false, std::memory_order_relaxed); + return; + } + + wakeupSem.wait(); + + if (notified.load(std::memory_order_relaxed)) { + notified.store(false, std::memory_order_relaxed); + } + } + + // Currently we don't have a good sempahore based algorithm to notify a specific + // worker. + // Just brute forcing (notifyAll and hope) is unfortunatly not possbile. + // + // A single worker can consume more tokens from the semaphore than others, + // especially if notifyAll is implemented as `foreach worker: notify()`. + // This could theoretically lead to a "lost-wakeup" for the targeted worker. + // + // 0. Two workers W1, W2 + // IO completer with wakeup behaviour + // inflight IO operation for W1 + // 1. W1 is at the end of its dispatch loop going to sleep + // 2. W2 is sleeping + // 3. Kernel produces a cqe in CQ of W1 + // 4. Completer notifies all workers -> sem has value 2 + // 5. W2 iterates the dispatch loop two times without finding work -> goes to sleep + // 6. W1 calls sleep() with a sem value of 0 -> goes to sleep though it has a cqe in its CQ + // + // The used approach is to keep a notified flag for each worker in the notifiedFlags + // array. Those flags make it possible to notify a specific worker. + // We are sure that the specific worker was successfully notified if we observe + // that its flag was reset. + // Therefore we can notify all worker in a loop until we observe that the + // specific worker has changed its flag or the runtime is terminating. + // + // Unfortunately this introduces significant overhead in the sleep method which is + // only necessary if the runtime actually wants to notify a specific worker. + // This should be configured at compile time using a boolean template parameter of the + // WorkerWakeupStrategy (<bool needsNotifySpecific>). + // + // For now we hardcode a constexpr check for the only condition where we need notifySpecific + template <CallerEnvironment callerEnvironment> + inline void genericNotifySpecific(workerid_t workerId) { + auto& notifiedFlag = notifiedFlags[workerId]; + notifiedFlag.store(true, std::memory_order_relaxed); + // NotifyAll since we observe that the specific worker has reset its flag + while (notifiedFlag.load(std::memory_order_relaxed) && !isRuntimeTerminating()) { + notifyAll<callerEnvironment>(); + } + } + + // The actual semaphore based worker sleep algorithm + template <CallerEnvironment callerEnvironment> + void notifyInternal() { + typename Sem::CounterType skipWakeupThreshold; + if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { + // On external work we always increment the semaphore unless we observe + // that its value is > workerCount. + // If we observe semValue > workerCount we are ensured that some worker will iterate + // its dispatchLoop again and must observe the new work. + skipWakeupThreshold = workerCount; + } else { + // For work from within emper we skip wakeup if we observe no one sleeping. + // This is sound because wakeupSleepingWorkers() is called from a active + // worker which will observe its own new work in its next dispatchLoop before + // going to sleep. + + // Note that sem_getvalue() is allowed to return 0 if there are + // waiting workers, hence we need to set the threshold also to + // 0. This has the disadvantage that we will perform one + // unnecessary sem_post. If we ever switch the wakeupSem + // implementation, then the skipWakeupThreshold value should be + // reviewed and potentially changed to '-1'. + skipWakeupThreshold = 0; + } + + auto semValue = wakeupSem.getValue(); + if (semValue > skipWakeupThreshold) { + return; + } + + if constexpr (::emper::WORKER_WAKEUP_STRATEGY == ::emper::WorkerWakeupStrategy::one) { + wakeupSem.notify(); + } else if constexpr (::emper::WORKER_WAKEUP_STRATEGY == ::emper::WorkerWakeupStrategy::all) { + // notify all we observed sleeping + // It is sound to increment the semaphore to much, thus this will only cause + // workers to iterate the dispatchLoop more often before actually sleeping + + // TODO: Switch to c++20 std::counting_semaphore, which has release(std::ptrdiff_t) + + // Reading the manpage explains the function. + // POSIX sem_getvalue is allowed to return 0 or a negative count if there are + // waiters. + // Linux sem_getvalue indeed does return 0 + // To notify all sleeping workers we increment the semaphore once for each worker. + if (semValue == 0) { + semValue = workerCount; + } + + // make sure that the amount to notify is always positive + wakeupSem.notify_many(semValue < 0 ? -semValue : semValue); + } else { + ABORT("Unknown CallerEnvironment"); + } + } + + public: + AbstractSemaphoreWorkerSleepStrategy(workerid_t workerCount) : workerCount(workerCount) { + if constexpr (useGenericNotifySpecificImpl) { + notifiedFlags = new std::atomic<bool>[workerCount]; + } + } + + ~AbstractSemaphoreWorkerSleepStrategy() { + if constexpr (useGenericNotifySpecificImpl) { + delete[] notifiedFlags; + } + } + + [[nodiscard]] inline auto getSleeping() const -> long { return wakeupSem.getValue(); } + + template <CallerEnvironment callerEnvironment> + inline void notifyOne() { + notifyInternal<callerEnvironment>(); + } + + template <CallerEnvironment callerEnvironment> + inline void notifyMany(unsigned count) { + for (unsigned i = 0; i < count; ++i) { + notifyInternal<callerEnvironment>(); + } + } + + template <CallerEnvironment callerEnvironment> + inline void notifyAll() { + notifyMany<callerEnvironment>(workerCount); + } + + template <CallerEnvironment callerEnvironment> + void notifySpecific(workerid_t workerId) { + if constexpr (semHasNotifySpecific) { + wakeupSem.notifySpecific(workerId); + } else { + genericNotifySpecific<callerEnvironment>(workerId); + } + } + + inline void sleep() { + if constexpr (useGenericNotifySpecificImpl) { + genericNotifySpecificSleep(); + } else { + wakeupSem.wait(); + } + } +}; + +#ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE +using ::emper::lib::sync::LockedSemaphore; +using SemaphoreWorkerSleepStrategy = AbstractSemaphoreWorkerSleepStrategy<LockedSemaphore>; + +#elif defined EMPER_POSIX_WAKEUP_SEMAPHORE +using ::emper::lib::sync::PosixSemaphore; +using SemaphoreWorkerSleepStrategy = AbstractSemaphoreWorkerSleepStrategy<PosixSemaphore>; + +#elif defined EMPER_FUTEX_WAKEUP_SEMAPHORE +using ::emper::lib::sync::SpuriousFutexSemaphore; +using SemaphoreWorkerSleepStrategy = AbstractSemaphoreWorkerSleepStrategy<SpuriousFutexSemaphore>; + +#else +#error Unknown WorkerSleepSemaphore implementation +#endif + +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/WorkerSleepStrategy.hpp b/emper/sleep_strategy/WorkerSleepStrategy.hpp new file mode 100644 index 0000000000000000000000000000000000000000..c528052976701e0c43ddb9bb5eb91c60906b30cc --- /dev/null +++ b/emper/sleep_strategy/WorkerSleepStrategy.hpp @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#if defined EMPER_LOCKED_WAKEUP_SEMAPHORE || defined EMPER_POSIX_WAKEUP_SEMAPHORE || \ + defined EMPER_FUTEX_WAKEUP_SEMAPHORE +#include "sleep_strategy/SemaphoreWorkerSleepStrategy.hpp" + +#else +#error Unknown WorkerSleepStrategy implementation +#endif + +namespace emper::sleep_strategy { + +using WorkerSleepStrategy = SemaphoreWorkerSleepStrategy; +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/meson.build b/emper/sleep_strategy/meson.build new file mode 100644 index 0000000000000000000000000000000000000000..c1d45fd29de15076e43e9974a412c109be0359af --- /dev/null +++ b/emper/sleep_strategy/meson.build @@ -0,0 +1,3 @@ +emper_cpp_sources += files( + 'SemaphoreWorkerSleepStrategy.cpp', +)