diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 8798bc84e5336a2c43496b90f2feac2f2343c3c5..44e37b732b704bd210db89b3ddfcfc406c6473b2 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -78,7 +78,7 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory ioContexts(emper::IO ? workerCount : 0), ioReadySem(0), randomEngine(seed), - workerSleepStrategy(workerCount) { + workerSleepStrategy(*this) { const int nprocs = get_nprocs(); { diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 0837ec35fcf3861e2a5f823818590e8bab673791..142e6f7beaab75c68f2365d6892c7fb4dbbae08a 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -41,6 +41,10 @@ using emper::io::GlobalIoContext; using emper::io::IoContext; using emper::sleep_strategy::WorkerSleepStrategy; +namespace emper::sleep_strategy { +class LinkedQueueIoStrategy; +} + class Runtime : public Logger<LogSubsystem::RUNTI> { private: static std::mutex currentRuntimeMutex; @@ -179,4 +183,5 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { friend class MemoryManager; template <typename> friend class WorkerLocalData; + friend class emper::sleep_strategy::LinkedQueueIoStrategy; }; diff --git a/emper/lib/TaggedPtr.hpp b/emper/lib/TaggedPtr.hpp index fcbeb1d3721b3593e273616755fa9cc86e1a9eb7..1b6acb55a923befb83a802fe4f17d58b7235b053 100644 --- a/emper/lib/TaggedPtr.hpp +++ b/emper/lib/TaggedPtr.hpp @@ -23,6 +23,8 @@ class TaggedPtr { uintptr_t tptr = 0; public: + TaggedPtr() = default; + template <typename T> TaggedPtr(T* ptr, uint16_t tag = 0, bool marked = false) : tptr(reinterpret_cast<uintptr_t>(ptr)) { @@ -30,9 +32,13 @@ class TaggedPtr { setMark(marked); } - TaggedPtr(void* ptr) : tptr(reinterpret_cast<uintptr_t>(ptr)) {} TaggedPtr(uintptr_t ptr) : tptr(ptr) {} + TaggedPtr(void* ptr, uint16_t tag) : tptr(reinterpret_cast<uintptr_t>(ptr)) { + setTag(tag); + } + TaggedPtr(void* ptr) : tptr(reinterpret_cast<uintptr_t>(ptr)) {} + /** * @brief extract the 48-bit the pointer part * diff --git a/emper/sleep_strategy/LinkedQueueIoStrategy.cpp b/emper/sleep_strategy/LinkedQueueIoStrategy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5bfddf91ce444ba32e3b9670ea84de4122a34f12 --- /dev/null +++ b/emper/sleep_strategy/LinkedQueueIoStrategy.cpp @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "sleep_strategy/LinkedQueueIoStrategy.hpp" + +#include "Runtime.hpp" + +namespace emper::sleep_strategy { + +LinkedQueueIoStrategy::LinkedQueueIoStrategy(Runtime& runtime) + : sleepQueueEntries(runtime.getWorkerCount()), notifyEfds(new int[runtime.getWorkerCount()]) { + unsigned i = 0; + for (auto& sleepQueueEntry : sleepQueueEntries) { + sleepQueueEntry.workerId = i++; + } + + int* efds = this->notifyEfds; + auto newWorkerHook = [efds](workerid_t workerId) { + int efd = eventfd(0, EFD_SEMAPHORE); + if (efd == -1) { + DIE_MSG("creating eventdf failed"); + } + efds[workerId] = efd; + }; + runtime.addNewWorkerHook(newWorkerHook); +} + +// This get hopefully inlined by LTO +auto LinkedQueueIoStrategy::getWorkerId() -> workerid_t { return Runtime::getWorkerId(); } +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/LinkedQueueIoStrategy.hpp b/emper/sleep_strategy/LinkedQueueIoStrategy.hpp new file mode 100644 index 0000000000000000000000000000000000000000..4e9336deec920f0fbae26685f49a1eeb8c7ea5da --- /dev/null +++ b/emper/sleep_strategy/LinkedQueueIoStrategy.hpp @@ -0,0 +1,237 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once +#include <sys/eventfd.h> +#include <unistd.h> + +#include <atomic> +#include <cassert> + +#include "CallerEnvironment.hpp" +#include "Debug.hpp" +#include "Emper.hpp" +#include "Worker.hpp" +#include "emper-common.h" +#include "emper-config.h" +#include "lib/TaggedPtr.hpp" +#include "sleep_strategy/AbstractWorkerSleepStrategy.hpp" + +using emper::lib::TaggedPtr; + +class Runtime; + +namespace emper::sleep_strategy { + +class LinkedQueueIoStrategy : AbstractWorkerSleepStrategy<LinkedQueueIoStrategy> { + class SleepQueueEntry { + friend class LinkedQueueIoStrategy; + workerid_t workerId; + SleepQueueEntry* next; + }; + + std::vector<SleepQueueEntry> sleepQueueEntries; + SleepQueueEntry skipSleepEntry; + + int* notifyEfds; + + std::atomic<TaggedPtr> sleeperQueue; + + enum class SleeperTags : uint16_t { Worker, SkipSleep }; + + auto markSkipSleep() -> bool { + TaggedPtr head = sleeperQueue.load(std::memory_order_relaxed); + // Queue is observed not empty + if (head) { + return false; + } + + TaggedPtr mark = TaggedPtr(&skipSleepEntry, head.getTag() + 1); + return sleeperQueue.compare_exchange_strong(head, mark, std::memory_order_acq_rel, + std::memory_order_relaxed); + } + + static auto getWorkerId() -> workerid_t; + + void wakeup(SleepQueueEntry* sleeper) { + int efd = notifyEfds[sleeper->workerId]; + + uint64_t buf = 1; + int res = write(efd, &buf, sizeof(buf)); + if (unlikely(res != sizeof(buf))) { + DIE_MSG("writing eventfd failed"); + } + } + + auto popOne() -> SleepQueueEntry* { + for (;;) { + TaggedPtr head = sleeperQueue.load(std::memory_order_relaxed); + if (!head) { + // queue observed empty + return nullptr; + } + + TaggedPtr newHead = TaggedPtr(head.getPtr<SleepQueueEntry>(), head.getTag() + 1); + if (sleeperQueue.compare_exchange_weak(head, newHead, std::memory_order_acq_rel, + std::memory_order_relaxed)) { + SleepQueueEntry* sleeper = head.getPtr<SleepQueueEntry>(); + + // If we are going to wakeup a sleeper we are sure it is no longer in + // the sleeper queue. + // By setting the next pointer to the nullptr we signal the worker that it + // is no longer in the sleeperQueue. + // A worker observing that it is still in the queue and goes to sleep + // is no problem because we will wake it up by a write to its eventfd. + sleeper->next = nullptr; + return sleeper; + } + } + } + + auto popAll() -> SleepQueueEntry* { + for (;;) { + TaggedPtr head = sleeperQueue.load(std::memory_order_relaxed); + // queue observed empty + if (!head) { + return nullptr; + } + + TaggedPtr newHead = TaggedPtr(nullptr, head.getTag() + 1); + if (sleeperQueue.compare_exchange_weak(head, newHead, std::memory_order_acq_rel, + std::memory_order_relaxed)) { + return head.getPtr<SleepQueueEntry>(); + } + } + } + + // Register our sleepQueueEntries as sleeping or pop a skipSleep mark + auto registerSleeping(SleepQueueEntry* sleeper) -> bool { + for (;;) { + TaggedPtr head = sleeperQueue.load(std::memory_order_relaxed); + TaggedPtr nextHead; + SleepQueueEntry* headSleeper = head.getPtr<SleepQueueEntry>(); + + // we should skip sleeping -> pop the sleep mark + if (headSleeper == &skipSleepEntry) { + nextHead = TaggedPtr(headSleeper->next, head.getTag() + 1); + // register our self as sleeping + } else { + assert(sleeper != headSleeper); + sleeper->next = headSleeper; + nextHead = TaggedPtr(sleeper, head.getTag() + 1); + } + + if (sleeperQueue.compare_exchange_weak(head, nextHead, std::memory_order_acq_rel, + std::memory_order_relaxed)) { + return headSleeper != &skipSleepEntry; + } + } + } + + void preventSleepLock() { + bool marked = markSkipSleep(); + // we marked the queue so at least one will be still running and we are done + if (marked) { + return; + } + + // We could not mark the queue this means someone wen to sleep so we wake that one up. + // It is totally fine if we can't find a sleeper because this means that someone else + // must have awoken it. And the awoken worker is running and will keep the Runtime alive. + SleepQueueEntry* sleeper = popOne(); + if (sleeper) { + wakeup(sleeper); + } + } + + public: + LinkedQueueIoStrategy(Runtime& runtime); + + ~LinkedQueueIoStrategy() { delete[] notifyEfds; } + + [[nodiscard]] inline auto getSleeping() const -> long { + TaggedPtr head = sleeperQueue.load(std::memory_order_relaxed); + SleepQueueEntry* sleeper = head.getPtr<SleepQueueEntry>(); + if (!sleeper) { + return 0; + } + + long sleeping = 0; + while (sleeper) { + sleeping++; + sleeper = sleeper->next; + } + + return sleeping; + } + + template <CallerEnvironment callerEnvironment> + inline void notifyOne() { + SleepQueueEntry* sleeper = popOne(); + if (sleeper) { + wakeup(sleeper); + return; + } + + // If we are in the EMPER there is at least one worker running: this one. + // Therefore we don't have to prevent situations where all workers are going to sleep. + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + return; + } + + // If we wakeup from anywhere we have to ensure that if we could not wakeup + // a worker that there will be at least on worker not sleeping. + // The situation that all workers are going to sleep but we did not observe one yet + // is theoretical possible. + // To ensure that at least one worker will be awake we enqueue the skipSleepEntry. + preventSleepLock(); + } + + template <CallerEnvironment callerEnvironment> + inline void notifyMany(unsigned count) { + for (unsigned i = 0; i < count; ++i) { + notifyOne<callerEnvironment>(); + } + } + + template <CallerEnvironment callerEnvironment> + inline void notifyAll() { + SleepQueueEntry* sleeper = popAll(); + if (!sleeper) { + if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { + preventSleepLock(); + } + return; + } + + while (sleeper) { + // TODO: this could be vectorized to reduce syscall costs + wakeup(sleeper); + sleeper = sleeper->next; + } + } + + template <CallerEnvironment callerEnvironment> + void notifySpecific(workerid_t workerId) { + wakeup(&sleepQueueEntries[workerId]); + } + + inline void sleep() { + workerid_t workerId = getWorkerId(); + SleepQueueEntry* entry = &sleepQueueEntries[workerId]; + + // We are not in the queue but we found a skipSleep mark during sleep + // registration -> skip sleeping + if (entry->next && !registerSleeping(entry)) { + return; + } + + // TODO: use the worker's IoContext + int efd = notifyEfds[workerId]; + uint64_t buf; + int res = read(efd, &buf, sizeof(buf)); + if (unlikely(res != sizeof(buf))) { + DIE_MSG("reading eventfd failed"); + } + } +}; +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.cpp b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.cpp index ffec6f494f9f620c6ef59780ffddfdc9c68c2271..9448248e323cb632794e1acbc4dd153428fdfe89 100644 --- a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.cpp +++ b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.cpp @@ -9,6 +9,13 @@ namespace emper::sleep_strategy { // This must be excluded from SemaphoreWorkerSleepStrategy.hpp because this // header is included from Runtime.hpp and therefore we can not easily use Runtime // function -auto isRuntimeTerminating() -> bool { return Runtime::getRuntime()->isTerminating(); } +template <> +auto SemaphoreWorkerSleepStrategy::isRuntimeTerminating() -> bool { + return runtime.isTerminating(); +} +template <> +auto SemaphoreWorkerSleepStrategy::getWorkerCount() -> workerid_t { + return runtime.getWorkerCount(); +} } // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp index 2a81e061b894f5c47aba4643e66208036d74e632..a9d40483ce309e020711d2eb7c6882094449c5d1 100644 --- a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp @@ -26,14 +26,11 @@ namespace emper::sleep_strategy { -// Needed to stop waiting for a worker to change its flag if it may be already -// terminated -auto isRuntimeTerminating() -> bool; - template <class Sem> class AbstractSemaphoreWorkerSleepStrategy : AbstractWorkerSleepStrategy<AbstractSemaphoreWorkerSleepStrategy<Sem>> { - const workerid_t workerCount; + Runtime& runtime; + workerid_t workerCount; Sem wakeupSem; // check if the used Semaphore provides a notifySpecific implementation @@ -45,6 +42,12 @@ class AbstractSemaphoreWorkerSleepStrategy // Member used for the generic notifySpecific implementation std::atomic<bool>* notifiedFlags; + auto getWorkerCount() -> workerid_t; + + // Needed to stop waiting for a worker to change its flag if it may be already + // terminated + auto isRuntimeTerminating() -> bool; + // Sleep part of the generic notifySpecific implementation inline void genericNotifySpecificSleep() { workerid_t workerId = Worker::getCurrentWorkerId(); @@ -158,7 +161,8 @@ class AbstractSemaphoreWorkerSleepStrategy } public: - AbstractSemaphoreWorkerSleepStrategy(workerid_t workerCount) : workerCount(workerCount) { + AbstractSemaphoreWorkerSleepStrategy(Runtime& runtime) : runtime(runtime) { + workerCount = getWorkerCount(); if constexpr (useGenericNotifySpecificImpl) { notifiedFlags = new std::atomic<bool>[workerCount]; } diff --git a/emper/sleep_strategy/WorkerSleepStrategy.hpp b/emper/sleep_strategy/WorkerSleepStrategy.hpp index c528052976701e0c43ddb9bb5eb91c60906b30cc..93080a53045ab4f38752920402961fe13857ce16 100644 --- a/emper/sleep_strategy/WorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/WorkerSleepStrategy.hpp @@ -2,15 +2,25 @@ // Copyright © 2021 Florian Fischer #pragma once -#if defined EMPER_LOCKED_WAKEUP_SEMAPHORE || defined EMPER_POSIX_WAKEUP_SEMAPHORE || \ - defined EMPER_FUTEX_WAKEUP_SEMAPHORE +#include "emper-config.h" + +#if defined EMPER_WORKER_SLEEP_STRATEGY_SEMAPHORE #include "sleep_strategy/SemaphoreWorkerSleepStrategy.hpp" +#elif defined EMPER_WORKER_SLEEP_STRATEGY_LINKED_QUEUE_IO +#include "sleep_strategy/LinkedQueueIoStrategy.hpp" + #else #error Unknown WorkerSleepStrategy implementation #endif namespace emper::sleep_strategy { - +#if defined EMPER_WORKER_SLEEP_STRATEGY_SEMAPHORE using WorkerSleepStrategy = SemaphoreWorkerSleepStrategy; + +#elif defined EMPER_WORKER_SLEEP_STRATEGY_LINKED_QUEUE_IO +using WorkerSleepStrategy = LinkedQueueIoStrategy; + +#endif + } // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/meson.build b/emper/sleep_strategy/meson.build index c1d45fd29de15076e43e9974a412c109be0359af..4fd8228fcbc67f35bcd6ee4d93bb1e92a4f420a3 100644 --- a/emper/sleep_strategy/meson.build +++ b/emper/sleep_strategy/meson.build @@ -1,3 +1,4 @@ emper_cpp_sources += files( 'SemaphoreWorkerSleepStrategy.cpp', + 'LinkedQueueIoStrategy.cpp', ) diff --git a/meson.build b/meson.build index 276a7c4e1dd24563e7229d6aa5db5d999d2c9cb5..c76434e55e4da3547c9981720da9f1e39ced64c1 100644 --- a/meson.build +++ b/meson.build @@ -41,6 +41,9 @@ conf_data.set('EMPER_STATS', get_option('stats')) conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue')) conf_data.set('EMPER_BLOCKED_CONTEXT_SET', get_option('blocked_context_set')) +worker_sleep_strategy = get_option('worker_sleep_strategy') +conf_data.set('EMPER_WORKER_SLEEP_STRATEGY_' + worker_sleep_strategy.to_upper(), true) + semaphore_impl = get_option('wakeup_semaphore_implementation') conf_data.set('EMPER_' + semaphore_impl.to_upper() + '_WAKEUP_SEMAPHORE', true) diff --git a/meson_options.txt b/meson_options.txt index ac1f842cd90c82f09a9b80c5cd3538b34b724f9b..2ecf0f390a98b64999b1809870a13f3beb2a65bf 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -28,6 +28,13 @@ option( value: true, description: 'Enable sleeping worker support', ) +option( + 'worker_sleep_strategy', + type: 'combo', + choices: ['semaphore', 'linked_queue_io'], + value: 'semaphore', + description: 'The used worker sleep algorithm', +) option( 'worker_wakeup_strategy', type: 'combo',