diff --git a/emper/Debug.hpp b/emper/Debug.hpp index 6520cf6c7fd322ab8f2683e19b85496f626b3701..e2e781262669c776e0e9dce5b31a9a707d230c70 100644 --- a/emper/Debug.hpp +++ b/emper/Debug.hpp @@ -80,6 +80,7 @@ enum class LogSubsystem { SCHED, RUNTI, SLEEP_S, + WAKE_S, U_B_MPSC_Q, IO, }; @@ -108,6 +109,7 @@ class Logger { case LogSubsystem::SCHED: case LogSubsystem::RUNTI: case LogSubsystem::SLEEP_S: + case LogSubsystem::WAKE_S: case LogSubsystem::U_B_MPSC_Q: case LogSubsystem::IO: default: @@ -133,6 +135,8 @@ class Logger { return "RUNTI"; case LogSubsystem::SLEEP_S: return "SLEEP_S"; + case LogSubsystem::WAKE_S: + return "WAKE_S"; case LogSubsystem::U_B_MPSC_Q: return "UBSCQ"; case LogSubsystem::IO: diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 13cb2c0097b7d91ce1bfcd1635648737e73b26fa..e38bd31b0ca6f482916fd784e943fd7fba17b683 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -40,6 +40,7 @@ static const bool WORKER_SLEEP = enum class WorkerWakeupStrategy { one, + throttle, all, }; diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index abb54b24540a81ca91cb576f15e971a0fd7d2aed..4b6b00800bbbd8bb16e61f15b47c35b8c5a1bb86 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -85,7 +85,8 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory ioContexts(emper::IO ? workerCount : 0), ioReadySem(0), randomEngine(seed), - workerSleepStrategy(*this, workerCount) { + workerSleepStrategy(*this, workerCount), + wakeupStrategy(workerCount) { const int nprocs = get_nprocs(); { diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index c9f71733893a3cc77818da15daa23b8cd38908dd..f9b0e276e7e6f8c32bd43ccd17149cd74cf33328 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -19,9 +19,9 @@ #include "CallerEnvironment.hpp" #include "Context.hpp" #include "Debug.hpp" -#include "Emper.hpp" #include "NextFiberResult.hpp" #include "Scheduler.hpp" // for Scheduler +#include "WakeupStrategy.hpp" #include "Worker.hpp" #include "emper-common.h" // for workerid_t #include "lib/env.hpp" @@ -97,6 +97,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { auto workerLoop(Worker* worker) -> void*; WorkerSleepStrategy workerSleepStrategy; + emper::WakeupStrategy wakeupStrategy; static RuntimeStrategyFactory& DEFAULT_STRATEGY; @@ -119,12 +120,9 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> inline void wakeupSleepingWorkers() { - if constexpr (::emper::WORKER_WAKEUP_STRATEGY == ::emper::WorkerWakeupStrategy::one) { - workerSleepStrategy.notifyOne<callerEnvironment>(); - } else if constexpr (::emper::WORKER_WAKEUP_STRATEGY == ::emper::WorkerWakeupStrategy::all) { - workerSleepStrategy.notifyAll<callerEnvironment>(); - } else { - ABORT("Unknown CallerEnvironment"); + workerid_t wakeupCount = wakeupStrategy.getWakeupCount(); + if (wakeupCount) { + workerSleepStrategy.notifyMany<callerEnvironment>(wakeupCount); } } @@ -136,7 +134,18 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { } } - void dispatchLoopSleep() { workerSleepStrategy.sleep(); } + void dispatchLoopSleep() { + bool canWake; + do { + // Notify the wakeup strategy about our sleep attempt + wakeupStrategy.goingToSleep(); + + workerSleepStrategy.sleep(); + + // We always wakeup if the runtime is terminating + canWake = wakeupStrategy.canWakeup() || terminateWorkers.load(std::memory_order_relaxed); + } while (!canWake); + } public: Runtime() : Runtime(getDefaultWorkerCount()) {} diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp index 534edfbe8dac6d4eb4b5da096bedcc7232bef845..f04ca047bd022408bb9deeea2507b1cd634b8891 100644 --- a/emper/Scheduler.cpp +++ b/emper/Scheduler.cpp @@ -5,6 +5,7 @@ #include "CallerEnvironment.hpp" #include "Runtime.hpp" #include "RuntimeStrategy.hpp" +#include "WakeupStrategy.hpp" Scheduler::Scheduler(Runtime& runtime, RuntimeStrategy& strategy) : runtime(runtime), dispatcher(strategy.getDispatcher()) {} @@ -18,6 +19,8 @@ void Scheduler::wakeupSleepingWorkers() { runtime.wakeupSleepingWorkers<callerEnvironment>(); } +void Scheduler::notifyRuntimeAboutWorkStolen() { runtime.wakeupStrategy.onWorkStolen(); } + // show the compiler our template incarnations template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::EMPER>(); template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::ANYWHERE>(); diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index 38b9e084665cb787acdc50cfdab59d0a5b4a7491..41160ff3612910003f03836a9a5dd3f4d16396f5 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -25,6 +25,8 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void wakeupSleepingWorkers(); + void notifyRuntimeAboutWorkStolen(); + protected: Runtime& runtime; Dispatcher& dispatcher; @@ -48,6 +50,12 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { } } + inline void onWorkStolen() { + if constexpr (emper::WORKER_SLEEP) { + notifyRuntimeAboutWorkStolen(); + } + } + void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); } void insertInAnywhereQueue(Fiber** fibers, unsigned count) { diff --git a/emper/WakeupStrategy.cpp b/emper/WakeupStrategy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ba5476b787945d8f2ff2633556bf508337f4598a --- /dev/null +++ b/emper/WakeupStrategy.cpp @@ -0,0 +1,10 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "WakeupStrategy.hpp" + +namespace emper { +// TODO: maybe we want to pass this information through the dispatch loop. +// If we are waking up we can skip trying our local work stealing queue as +// an optimization and would save the rather expensive tls +thread_local bool WakeupStrategy::isWaking = false; +} // namespace emper diff --git a/emper/WakeupStrategy.hpp b/emper/WakeupStrategy.hpp new file mode 100644 index 0000000000000000000000000000000000000000..f2c32bbb7cdc2fe6186908d53e2642da887ac47d --- /dev/null +++ b/emper/WakeupStrategy.hpp @@ -0,0 +1,128 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <atomic> +#include <cassert> + +#include "Debug.hpp" +#include "Emper.hpp" +#include "emper-common.h" + +namespace emper { + +/** + * @brief Class modeling the strategy used to wakeup suspended worker threads + * + * The possible behaviors are: + * - 'one': Wakeup a SINGLE suspended worker for each onNewWork notification + * - 'all': Wakeup ALL suspended worker for each onNewWork notification + * - 'throttle': Only wakeup a suspended worker if there is currently no worker + * already waking up. The wakup strategy can be in a tri-state: + * (pending, notified, waking). 'Pending' means no unconsumed notification + * is available. 'Notified' means there was a notification but no + * worker has woken and consumed it yet. Waking means a worker + * has consumed the notification and enetered its dispatch loop. + * A waking worker will either pass its working status when it + * calls onNewWork itself or reset the state to pending when + * successfully stolen work. + * The 'throttle' strategy is havily inspired by the zap thread-pool: + * https://zig.news/kprotty/resource-efficient-thread-pools-with-zig-3291 + */ +class WakeupStrategy : public Logger<LogSubsystem::WAKE_S> { + workerid_t workerCount; + + enum class ThrottleState { + pending, + notified, + waking, + }; + + // TODO: figure out the weakest sound memory ordering of the atomic operations + std::atomic<ThrottleState> throttleState = ThrottleState::pending; + + /** + * @brief Flag indicating that this worker is the waking thread + * + * When using the 'throttle' wakeup strategy the 'waking' thread is responsible + * to reset the notified flag of the wakeup strategy and thus allowing more + * threads to be awoken. + */ + static thread_local bool isWaking; + + auto canWakeupThrottle() -> bool { + ThrottleState notified = ThrottleState::notified; + bool wasNotified = throttleState.compare_exchange_strong(notified, ThrottleState::waking); + + // This is the working thread now + if (wasNotified) { + isWaking = true; + } + + return wasNotified; + } + + auto getWakeupCountThrottle() -> workerid_t { + // If the waking worker calls onNewWork/shouldWakeup he is clearly done waking up + if (isWaking) { + resetWaking(); + return 1; + } + + ThrottleState pending = ThrottleState::pending; + bool wasPending = throttleState.compare_exchange_strong(pending, ThrottleState::notified); + return wasPending ? 1 : 0; + } + + void resetWaking() { + assert(isWaking); + isWaking = false; + ATTR_UNUSED ThrottleState old = throttleState.exchange(ThrottleState::pending); + assert(old == ThrottleState::waking); + } + + public: + WakeupStrategy(workerid_t workerCount) : workerCount(workerCount) {} + + void onWorkStolen() { + if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::throttle) { + if (isWaking) { + LOGD("waking worker stole work -> reset to pending"); + resetWaking(); + } + } + } + + void goingToSleep() { + if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::throttle) { + if (isWaking) { + LOGD("waking worker is going to sleep -> reset to pending"); + resetWaking(); + } + } + } + + auto canWakeup() -> bool { + if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::throttle) { + return canWakeupThrottle(); + } else { + return true; + } + } + + auto getWakeupCount() -> workerid_t { + if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::one) { + return 1; + + } else if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::all) { + return workerCount; + + } else if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::throttle) { + return getWakeupCountThrottle(); + + } else { + ABORT("Unknown WakeStrategy"); + } + } +}; +} // namespace emper diff --git a/emper/meson.build b/emper/meson.build index e5b942b972dc96bf175fd3ad34e7d5e9a2c151e7..1faa562936f56b65f0cd8dfab4f5e1900bf1f1ce 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -25,6 +25,7 @@ emper_cpp_sources = [ 'BinaryPrivateSemaphore.cpp', 'CountingPrivateSemaphore.cpp', 'Semaphore.cpp', + 'WakeupStrategy.cpp', 'Worker.cpp', ] diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index 05abc0e8cc6809fe122c47255b09a54ae24b1bf2..fba97e7de0ce106c2d39cb65dc16c192e0a6da39 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -233,6 +233,11 @@ out: auto AbstractWorkStealingScheduler::nextFiberResultViaWorkStealing() -> NextFiberResult { std::pair<Fiber*, FiberSource> nextFiberWsResult = nextFiberViaWorkStealing(); + + if (nextFiberWsResult.second == FiberSource::stolen) { + onWorkStolen(); + } + return NextFiberResult{ nextFiberWsResult.first, static_cast<uintptr_t>(nextFiberWsResult.second), diff --git a/meson_options.txt b/meson_options.txt index 1cd5d81a4e618501181c035e1aa16bb62dc183db..7c91669b7c0deb530b071ef6a952c144d3939cfe 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -32,7 +32,7 @@ option( 'worker_wakeup_strategy', type: 'combo', description: 'The strategy used to wakeup sleeping workers (only effective if worker_sleep is enabled)', - choices: ['one', 'all'], + choices: ['one', 'throttle', 'all'], value: 'one', ) option(