diff --git a/emper/Dispatcher.cpp b/emper/Dispatcher.cpp index 5d05163dcbe67987b6e9cf6868117d4f7db6ac1e..810e399f4eb9d9ab72a1f13f90429a8f7a8669ae 100644 --- a/emper/Dispatcher.cpp +++ b/emper/Dispatcher.cpp @@ -11,7 +11,7 @@ auto Dispatcher::getDispatchLoop() -> func_t { return [this] { dispatchLoop(); }; } -void Dispatcher::putRuntimeWorkerToSleep() { runtime.dispatcherLoopSleep(); } +void Dispatcher::putRuntimeWorkerToSleep() { runtime.dispatchLoopSleep(); } void Dispatcher::dispatchLoopDoSleep() { if constexpr (emper::WORKER_SLEEP) { diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 17ef6b9b0982a8d6921439403872736090a32f6c..ac89eb76538b1e0c490938f01cfb74c2308f856d 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -3,6 +3,7 @@ #include "Runtime.hpp" #include <pthread.h> // for pthread_t, pthread_attr_init +#include <semaphore.h> #include <cerrno> // for errno // Non portable. @@ -72,9 +73,7 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory contextManager(*(new ContextManager(*this))), threads(new pthread_t[workerCount]), workers(new Worker*[workerCount]), - randomEngine(seed), - sleepingWorkers(0), - skipSleep(false) { + randomEngine(seed) { const int nprocs = get_nprocs(); { @@ -85,6 +84,12 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory currentRuntime = this; } + // initialize the wakeup semaphore + int err = sem_init(&wakeupSem, 0, 0); + if (err) { + DIE_MSG_ERRNO("initializing wakeup semaphore failed"); + } + // initialize the global and all worker IoContexts if constexpr (emper::IO) { // The global io_uring needs at least workerCount entries in its SQ because diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 92487a336c3219860a1dafce3182c955df6113cd..f52a5843ff072319d0507850803449c6fafcbad7 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -3,18 +3,18 @@ #pragma once #include <pthread.h> // for pthread_t +#include <semaphore.h> -#include <atomic> // for atomic, 
memory_order_relaxed -#include <cassert> // for assert -#include <condition_variable> // for condition_variable -#include <cstdint> // for intptr_t -#include <cstdlib> // for abort -#include <functional> // for function -#include <mutex> // for mutex, lock_guard, unique_lock +#include <cassert> // for assert +#include <cstdint> // for intptr_t +#include <cstdlib> // for abort +#include <functional> // for function +#include <mutex> // for mutex, lock_guard, unique_lock #include <random> #include <thread> // for thread #include <vector> // for vector +#include "CallerEnvironment.hpp" #include "Common.hpp" // for ALIGN_TO_CACHE_LINE #include "Debug.hpp" // for LogSubsystem, LogSubsystem::RUNTI, Logger #include "Emper.hpp" // for WORKER_NOTIFY @@ -36,11 +36,6 @@ class IoContext; using emper::io::IoContext; -enum WakeupMode { - IF_SLEEPING_OBSERVED, - ALWAYS, -}; - class Runtime : public Logger<LogSubsystem::RUNTI> { private: static std::mutex currentRuntimeMutex; @@ -66,10 +61,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { auto workerLoop(Worker* worker) -> void*; - ALIGN_TO_CACHE_LINE std::mutex workerSleepMutex; - std::condition_variable workerSleepConditionVariable; - ALIGN_TO_CACHE_LINE std::atomic<unsigned long> sleepingWorkers; - bool skipSleep; + ALIGN_TO_CACHE_LINE sem_t wakeupSem; static RuntimeStrategyFactory& DEFAULT_STRATEGY; @@ -78,51 +70,57 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { protected: void addNewWorkerHook(const std::function<void(void)>& hook) { newWorkerHooks.push_back(hook); }; - template <WakeupMode wakeupMode = WakeupMode::IF_SLEEPING_OBSERVED> + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> inline void wakeupSleepingWorkers() { - if constexpr (wakeupMode == WakeupMode::IF_SLEEPING_OBSERVED) { - // If we observe no worker sleeping, then we do not try to - // attempt to wakeup one. 
Note that this is racy, i.e., just - // because we do not see any worker sleeping, does not mean that - // one (or all but us, as a matter of a fact) is about to - // sleep. However, in the worst case, this causes a worker to - // sleep longer, until this worker (or another active worker) - // schedules another fiber, when this method will be called - // again. And then it is likely that the scheduling worker will - // observe the sleeping worker. - if (!sleepingWorkers.load(std::memory_order_relaxed)) { - return; - } - } - - std::lock_guard<std::mutex> lk(workerSleepMutex); - skipSleep = true; - if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::all) { - sleepingWorkers.store(0, std::memory_order_relaxed); - workerSleepConditionVariable.notify_all(); - } else if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::one) { - sleepingWorkers.fetch_sub(1, std::memory_order_relaxed); - workerSleepConditionVariable.notify_one(); + int skipWakeupThreshold; + if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { + // On external work we always increment the semaphore unless we observe + // that its value is > workerCount. + // If we observe semValue > workerCount we are guaranteed that some worker will iterate + // its dispatchLoop again and must observe the new work. + skipWakeupThreshold = workerCount; } else { - // Unknown worker wakeup strategy. - abort(); + // For work from within emper we skip the wakeup only if one is already pending (semValue > 0). + // Linux sem_getvalue never reports waiters as a negative count, so a threshold of -1 would suppress every wakeup. + // Skipping is sound because wakeupSleepingWorkers() is called from an active worker which will + // observe its own new work in its next dispatchLoop before going to sleep. + skipWakeupThreshold = 0; } - } - void dispatcherLoopSleep() { - if (skipSleep) { - skipSleep = false; + int semValue; + sem_getvalue(&wakeupSem, &semValue); + if (semValue > skipWakeupThreshold) { return; } - std::unique_lock<std::mutex> lk(workerSleepMutex); - // Check again if "skip sleep" has been set. 
- if (skipSleep) return; + if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::one) { + sem_post(&wakeupSem); + } else if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::all) { + // notify all we observed sleeping + // It is sound to increment the semaphore too much, as this will only cause + // workers to iterate the dispatchLoop more often before actually sleeping + + // TODO: Switch to c++20 std::counting_semaphore, which has release(std::ptrdiff_t) + + // Reading the manpage explains the function. + // POSIX sem_getvalue is allowed to return 0 or a negative count if there are + // waiters. + // Linux sem_getvalue indeed does return 0 + // To notify all sleeping workers we increment the semaphore once for each worker. + if (semValue == 0) { + semValue = -workerCount; + } - sleepingWorkers.fetch_add(1, std::memory_order_relaxed); - workerSleepConditionVariable.wait(lk); + for (; semValue < 0; ++semValue) { + sem_post(&wakeupSem); + } + } else { + ABORT("Unknown WorkerWakeupStrategy"); + } } + void dispatchLoopSleep() { sem_wait(&wakeupSem); } + inline auto getGlobalIo() -> IoContext* { if constexpr (emper::IO) { return globalIo; diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp index ab8289054837e674795ea625743f096486da79ad..8db2ea0a5bb50ce1c8e8f3e2a46e96d3b912c374 100644 --- a/emper/Scheduler.cpp +++ b/emper/Scheduler.cpp @@ -2,6 +2,7 @@ // Copyright © 2020 Florian Schmaus #include "Scheduler.hpp" +#include "CallerEnvironment.hpp" #include "Runtime.hpp" Scheduler::Scheduler(Runtime& runtime) : runtime(runtime) {} @@ -10,10 +11,11 @@ void Scheduler::addNewWorkerHook(const std::function<void(void)>& hook) { runtime.addNewWorkerHook(hook); } -void Scheduler::wakeupSleepingWorkersIfSleepingObserved() { - runtime.wakeupSleepingWorkers<WakeupMode::IF_SLEEPING_OBSERVED>(); +template <CallerEnvironment callerEnvironment> +void Scheduler::wakeupSleepingWorkers() { + runtime.wakeupSleepingWorkers<callerEnvironment>(); } 
-void Scheduler::wakeupSleepingWorkersAlways() { - runtime.wakeupSleepingWorkers<WakeupMode::ALWAYS>(); -} +// explicit template instantiations for the caller environments used by onNewWork() +template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::EMPER>(); +template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::ANYWHERE>(); diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index eea2ff98c81d7fe95097ee5485a19b53515140bc..f49308eda4118e14f1c7147103afe05fb8ed7d39 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -18,9 +18,8 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { private: lib::adt::LockedUnboundedQueue<Fiber> scheduleAnywhereQueue; - void wakeupSleepingWorkersIfSleepingObserved(); - - void wakeupSleepingWorkersAlways(); + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + void wakeupSleepingWorkers(); protected: Runtime& runtime; @@ -37,11 +36,7 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> inline void onNewWork() { if constexpr (emper::WORKER_SLEEP) { - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - wakeupSleepingWorkersIfSleepingObserved(); - } else { - wakeupSleepingWorkersAlways(); - } + wakeupSleepingWorkers<callerEnvironment>(); } }