From dfb4ba0e4f097b6cf614fe94171a26da1447c33f Mon Sep 17 00:00:00 2001 From: Florian Fischer <florian.fl.fischer@fau.de> Date: Tue, 16 Feb 2021 12:32:03 +0100 Subject: [PATCH] [Runtime] use a POSIX semaphore to suspend/wakeup workers To prevent deadlocks where all workers are going to sleep and new work arrives from outside of emper, we always increment the wakeup semaphore unless we observe its value as > worker count. If the semaphore value is bigger than the worker count, it is guaranteed that at least one worker will not block and will iterate a second time in its dispatchLoop, observing the new work. --- emper/Dispatcher.cpp | 2 +- emper/Runtime.cpp | 11 +++-- emper/Runtime.hpp | 100 +++++++++++++++++++++---------------------- emper/Scheduler.cpp | 12 +++--- emper/Scheduler.hpp | 11 ++--- 5 files changed, 68 insertions(+), 68 deletions(-) diff --git a/emper/Dispatcher.cpp b/emper/Dispatcher.cpp index 5d05163d..810e399f 100644 --- a/emper/Dispatcher.cpp +++ b/emper/Dispatcher.cpp @@ -11,7 +11,7 @@ auto Dispatcher::getDispatchLoop() -> func_t { return [this] { dispatchLoop(); }; } -void Dispatcher::putRuntimeWorkerToSleep() { runtime.dispatcherLoopSleep(); } +void Dispatcher::putRuntimeWorkerToSleep() { runtime.dispatchLoopSleep(); } void Dispatcher::dispatchLoopDoSleep() { if constexpr (emper::WORKER_SLEEP) { diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 17ef6b9b..ac89eb76 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -3,6 +3,7 @@ #include "Runtime.hpp" #include <pthread.h> // for pthread_t, pthread_attr_init +#include <semaphore.h> #include <cerrno> // for errno // Non portable. 
@@ -72,9 +73,7 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory contextManager(*(new ContextManager(*this))), threads(new pthread_t[workerCount]), workers(new Worker*[workerCount]), - randomEngine(seed), - sleepingWorkers(0), - skipSleep(false) { + randomEngine(seed) { const int nprocs = get_nprocs(); { @@ -85,6 +84,12 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory currentRuntime = this; } + // initialize the wakeup semaphore: not shared between processes (pshared = 0), initial value 0 + int err = sem_init(&wakeupSem, 0, 0); + if (err) { + DIE_MSG_ERRNO("initializing wakeup semaphore failed"); + } + // initialize the global and all worker IoContexts if constexpr (emper::IO) { // The global io_uring needs at least workerCount entries in its SQ because diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 92487a33..f52a5843 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -3,18 +3,18 @@ #pragma once #include <pthread.h> // for pthread_t +#include <semaphore.h> -#include <atomic> // for atomic, memory_order_relaxed -#include <cassert> // for assert -#include <condition_variable> // for condition_variable -#include <cstdint> // for intptr_t -#include <cstdlib> // for abort -#include <functional> // for function -#include <mutex> // for mutex, lock_guard, unique_lock +#include <cassert> // for assert +#include <cstdint> // for intptr_t +#include <cstdlib> // for abort +#include <functional> // for function +#include <mutex> // for mutex, lock_guard, unique_lock #include <random> #include <thread> // for thread #include <vector> // for vector +#include "CallerEnvironment.hpp" #include "Common.hpp" // for ALIGN_TO_CACHE_LINE #include "Debug.hpp" // for LogSubsystem, LogSubsystem::RUNTI, Logger #include "Emper.hpp" // for WORKER_NOTIFY @@ -36,11 +36,6 @@ class IoContext; using emper::io::IoContext; -enum WakeupMode { - IF_SLEEPING_OBSERVED, - ALWAYS, -}; - class Runtime : public Logger<LogSubsystem::RUNTI> { private: static std::mutex 
currentRuntimeMutex; @@ -66,10 +61,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { auto workerLoop(Worker* worker) -> void*; - ALIGN_TO_CACHE_LINE std::mutex workerSleepMutex; - std::condition_variable workerSleepConditionVariable; - ALIGN_TO_CACHE_LINE std::atomic<unsigned long> sleepingWorkers; - bool skipSleep; + ALIGN_TO_CACHE_LINE sem_t wakeupSem; static RuntimeStrategyFactory& DEFAULT_STRATEGY; @@ -78,51 +70,57 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { protected: void addNewWorkerHook(const std::function<void(void)>& hook) { newWorkerHooks.push_back(hook); }; - template <WakeupMode wakeupMode = WakeupMode::IF_SLEEPING_OBSERVED> + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> inline void wakeupSleepingWorkers() { - if constexpr (wakeupMode == WakeupMode::IF_SLEEPING_OBSERVED) { - // If we observe no worker sleeping, then we do not try to - // attempt to wakeup one. Note that this is racy, i.e., just - // because we do not see any worker sleeping, does not mean that - // one (or all but us, as a matter of a fact) is about to - // sleep. However, in the worst case, this causes a worker to - // sleep longer, until this worker (or another active worker) - // schedules another fiber, when this method will be called - // again. And then it is likely that the scheduling worker will - // observe the sleeping worker. 
- if (!sleepingWorkers.load(std::memory_order_relaxed)) { - return; - } - } - - std::lock_guard<std::mutex> lk(workerSleepMutex); - skipSleep = true; - if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::all) { - sleepingWorkers.store(0, std::memory_order_relaxed); - workerSleepConditionVariable.notify_all(); - } else if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::one) { - sleepingWorkers.fetch_sub(1, std::memory_order_relaxed); - workerSleepConditionVariable.notify_one(); + int skipWakeupThreshold; + if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { + // On external work we always increment the semaphore unless we observe + // that its value is > workerCount. + // If we observe semValue > workerCount we are assured that some worker will iterate + // its dispatchLoop again and must observe the new work. + skipWakeupThreshold = workerCount; } else { - // Unknown worker wakeup strategy. - abort(); + // For work from within emper we skip wakeup if we observe no one sleeping. + // This is sound because wakeupSleepingWorkers() is called from an active + // worker which will observe its own new work in its next dispatchLoop before + // going to sleep. NOTE(review): Linux sem_getvalue() never returns < 0, so semValue > -1 always holds and this path always skips the wakeup - confirm this is intended. + skipWakeupThreshold = -1; } - } - void dispatcherLoopSleep() { - if (skipSleep) { - skipSleep = false; + int semValue; + sem_getvalue(&wakeupSem, &semValue); + if (semValue > skipWakeupThreshold) { return; } - std::unique_lock<std::mutex> lk(workerSleepMutex); - // Check again if "skip sleep" has been set. 
- if (skipSleep) return; + if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::one) { + sem_post(&wakeupSem); + } else if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::all) { + // notify all we observed sleeping + // It is sound to increment the semaphore too much; this will only cause + // workers to iterate the dispatchLoop more often before actually sleeping + + // TODO: Switch to c++20 std::counting_semaphore, which has release(std::ptrdiff_t) + + // Reading the manpage explains the function. + // POSIX sem_getvalue is allowed to return 0 or a negative count if there are + // waiters. + // Linux sem_getvalue indeed does return 0 + // To notify all sleeping workers we increment the semaphore once for each worker. + if (semValue == 0) { + semValue = -workerCount; + } - sleepingWorkers.fetch_add(1, std::memory_order_relaxed); - workerSleepConditionVariable.wait(lk); + for (; semValue < 0; ++semValue) { + sem_post(&wakeupSem); + } + } else { + ABORT("Unknown WorkerWakeupStrategy"); + } } + void dispatchLoopSleep() { sem_wait(&wakeupSem); } + inline auto getGlobalIo() -> IoContext* { if constexpr (emper::IO) { return globalIo; diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp index ab828905..8db2ea0a 100644 --- a/emper/Scheduler.cpp +++ b/emper/Scheduler.cpp @@ -2,6 +2,7 @@ // Copyright © 2020 Florian Schmaus #include "Scheduler.hpp" +#include "CallerEnvironment.hpp" #include "Runtime.hpp" Scheduler::Scheduler(Runtime& runtime) : runtime(runtime) {} @@ -10,10 +11,11 @@ void Scheduler::addNewWorkerHook(const std::function<void(void)>& hook) { runtime.addNewWorkerHook(hook); } -void Scheduler::wakeupSleepingWorkersIfSleepingObserved() { - runtime.wakeupSleepingWorkers<WakeupMode::IF_SLEEPING_OBSERVED>(); +template <CallerEnvironment callerEnvironment> +void Scheduler::wakeupSleepingWorkers() { + runtime.wakeupSleepingWorkers<callerEnvironment>(); } -void Scheduler::wakeupSleepingWorkersAlways() { - 
runtime.wakeupSleepingWorkers<WakeupMode::ALWAYS>(); -} +// explicit instantiations: the template is defined in this .cpp, so both variants are emitted here +template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::EMPER>(); +template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::ANYWHERE>(); diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index eea2ff98..f49308ed 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -18,9 +18,8 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { private: lib::adt::LockedUnboundedQueue<Fiber> scheduleAnywhereQueue; - void wakeupSleepingWorkersIfSleepingObserved(); - - void wakeupSleepingWorkersAlways(); + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + void wakeupSleepingWorkers(); protected: Runtime& runtime; @@ -37,11 +36,7 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> inline void onNewWork() { if constexpr (emper::WORKER_SLEEP) { - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - wakeupSleepingWorkersIfSleepingObserved(); - } else { - wakeupSleepingWorkersAlways(); - } + wakeupSleepingWorkers<callerEnvironment>(); } } -- GitLab