Commit 37143de2 authored by Florian Fischer's avatar Florian Fischer
Browse files

[WakeupStrategy] introduce a new class to model our wakeup strategies

Add a new 'throttle' wakeup strategy inspired by the algorithm used
by zap, Go and Tokio. This tries to prevent a possible thundering herd
problem and to reduce contention on the scheduler by only waking a single
worker at a time. It further ensures that the next worker is only notified
if the previous one successfully found work.
parent ed2cbd91
......@@ -80,6 +80,7 @@ enum class LogSubsystem {
SCHED,
RUNTI,
SLEEP_S,
WAKE_S,
U_B_MPSC_Q,
IO,
};
......@@ -108,6 +109,7 @@ class Logger {
case LogSubsystem::SCHED:
case LogSubsystem::RUNTI:
case LogSubsystem::SLEEP_S:
case LogSubsystem::WAKE_S:
case LogSubsystem::U_B_MPSC_Q:
case LogSubsystem::IO:
default:
......@@ -133,6 +135,8 @@ class Logger {
return "RUNTI";
case LogSubsystem::SLEEP_S:
return "SLEEP_S";
case LogSubsystem::WAKE_S:
return "WAKE_S";
case LogSubsystem::U_B_MPSC_Q:
return "UBSCQ";
case LogSubsystem::IO:
......
......@@ -40,6 +40,7 @@ static const bool WORKER_SLEEP =
// Selects how suspended workers are woken up when new work is announced.
enum class WorkerWakeupStrategy {
// Wake a single suspended worker per notification.
one,
// Wake a suspended worker only if no other worker is currently waking up.
throttle,
// Wake all suspended workers per notification.
all,
};
......
......@@ -85,7 +85,8 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory
ioContexts(emper::IO ? workerCount : 0),
ioReadySem(0),
randomEngine(seed),
workerSleepStrategy(*this, workerCount) {
workerSleepStrategy(*this, workerCount),
wakeupStrategy(workerCount) {
const int nprocs = get_nprocs();
{
......
......@@ -19,9 +19,9 @@
#include "CallerEnvironment.hpp"
#include "Context.hpp"
#include "Debug.hpp"
#include "Emper.hpp"
#include "NextFiberResult.hpp"
#include "Scheduler.hpp" // for Scheduler
#include "WakeupStrategy.hpp"
#include "Worker.hpp"
#include "emper-common.h" // for workerid_t
#include "lib/env.hpp"
......@@ -97,6 +97,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
auto workerLoop(Worker* worker) -> void*;
WorkerSleepStrategy workerSleepStrategy;
emper::WakeupStrategy wakeupStrategy;
static RuntimeStrategyFactory& DEFAULT_STRATEGY;
......@@ -119,12 +120,9 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
inline void wakeupSleepingWorkers() {
if constexpr (::emper::WORKER_WAKEUP_STRATEGY == ::emper::WorkerWakeupStrategy::one) {
workerSleepStrategy.notifyOne<callerEnvironment>();
} else if constexpr (::emper::WORKER_WAKEUP_STRATEGY == ::emper::WorkerWakeupStrategy::all) {
workerSleepStrategy.notifyAll<callerEnvironment>();
} else {
ABORT("Unknown CallerEnvironment");
workerid_t wakeupCount = wakeupStrategy.getWakeupCount();
if (wakeupCount) {
workerSleepStrategy.notifyMany<callerEnvironment>(wakeupCount);
}
}
......@@ -136,7 +134,18 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
}
}
void dispatchLoopSleep() { workerSleepStrategy.sleep(); }
/**
 * @brief Suspend the calling worker until the wakeup strategy lets it resume.
 *
 * After every return from the sleep strategy the wakeup strategy is asked
 * whether this worker may proceed; if not, the worker goes back to sleep.
 * A terminating runtime always lets the worker leave the loop.
 */
void dispatchLoopSleep() {
	for (;;) {
		// Tell the wakeup strategy that this worker is about to suspend
		wakeupStrategy.goingToSleep();
		workerSleepStrategy.sleep();

		// The wakeup strategy accepted this wakeup -> back to the dispatch loop
		if (wakeupStrategy.canWakeup()) {
			return;
		}

		// Never keep a worker asleep while the runtime is terminating
		if (terminateWorkers.load(std::memory_order_relaxed)) {
			return;
		}
	}
}
public:
Runtime() : Runtime(getDefaultWorkerCount()) {}
......
......@@ -5,6 +5,7 @@
#include "CallerEnvironment.hpp"
#include "Runtime.hpp"
#include "RuntimeStrategy.hpp"
#include "WakeupStrategy.hpp"
// Bind the scheduler to its runtime and remember the dispatcher provided by
// the given runtime strategy.
Scheduler::Scheduler(Runtime& runtime, RuntimeStrategy& strategy)
: runtime(runtime), dispatcher(strategy.getDispatcher()) {}
......@@ -18,6 +19,8 @@ void Scheduler::wakeupSleepingWorkers() {
runtime.wakeupSleepingWorkers<callerEnvironment>();
}
// Forward the "work was stolen" event to the wakeup strategy owned by the runtime.
void Scheduler::notifyRuntimeAboutWorkStolen() {
	auto& strategy = runtime.wakeupStrategy;
	strategy.onWorkStolen();
}
// show the compiler our template incarnations
template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::EMPER>();
template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::ANYWHERE>();
......@@ -25,6 +25,8 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
void wakeupSleepingWorkers();
void notifyRuntimeAboutWorkStolen();
protected:
Runtime& runtime;
Dispatcher& dispatcher;
......@@ -48,6 +50,12 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
}
}
// Hook to be called after work was stolen from another worker.
// The runtime is only informed when worker sleeping (emper::WORKER_SLEEP) is
// compiled in; otherwise this is a no-op.
inline void onWorkStolen() {
if constexpr (emper::WORKER_SLEEP) {
notifyRuntimeAboutWorkStolen();
}
}
// Enqueue a pointer to the given fiber into scheduleAnywhereQueue.
void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); }
void insertInAnywhereQueue(Fiber** fibers, unsigned count) {
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include "WakeupStrategy.hpp"
namespace emper {
// TODO: maybe we want to pass this information through the dispatch loop.
// If we are waking up we could skip trying our local work-stealing queue as
// an optimization, which would also save the rather expensive TLS access.
thread_local bool WakeupStrategy::isWaking = false;
} // namespace emper
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#pragma once
#include <atomic>
#include <cassert>
#include "Debug.hpp"
#include "Emper.hpp"
#include "emper-common.h"
namespace emper {
/**
* @brief Class modeling the strategy used to wakeup suspended worker threads
*
* The possible behaviors are:
* - 'one': Wakeup a SINGLE suspended worker for each onNewWork notification
* - 'all': Wakeup ALL suspended workers for each onNewWork notification
* - 'throttle': Only wakeup a suspended worker if there is currently no worker
* already waking up. The wakeup strategy can be in one of three states:
* (pending, notified, waking). 'Pending' means no unconsumed notification
* is available. 'Notified' means there was a notification but no
* worker has woken and consumed it yet. 'Waking' means a worker
* has consumed the notification and entered its dispatch loop.
* A waking worker will either pass on its waking status when it
* calls onNewWork itself or reset the state to pending when
* it has successfully stolen work.
* The 'throttle' strategy is heavily inspired by the zap thread-pool:
* https://zig.news/kprotty/resource-efficient-thread-pools-with-zig-3291
*/
class WakeupStrategy : public Logger<LogSubsystem::WAKE_S> {
	/// Number of workers; this is the wakeup count reported by the 'all' strategy.
	workerid_t workerCount;

	/// State machine used by the 'throttle' strategy.
	enum class ThrottleState {
		pending,   ///< no unconsumed notification is available
		notified,  ///< a notification was issued but not consumed by a worker yet
		waking,    ///< a worker consumed the notification and is waking up
	};

	// TODO: figure out the weakest sound memory ordering of the atomic operations
	std::atomic<ThrottleState> throttleState = ThrottleState::pending;

	/**
	 * @brief Flag indicating that this worker is the waking thread
	 *
	 * When using the 'throttle' wakeup strategy the 'waking' thread is responsible
	 * to reset the notified flag of the wakeup strategy and thus allowing more
	 * threads to be awoken.
	 */
	static thread_local bool isWaking;

	/**
	 * @brief Try to consume a pending notification ('throttle' strategy)
	 *
	 * Atomically moves the state from 'notified' to 'waking'. On success the
	 * calling thread marks itself as the waking thread.
	 *
	 * @return true if the calling thread consumed the notification
	 */
	auto canWakeupThrottle() -> bool {
		ThrottleState notified = ThrottleState::notified;
		bool wasNotified = throttleState.compare_exchange_strong(notified, ThrottleState::waking);
		// This is the waking thread now
		if (wasNotified) {
			isWaking = true;
		}
		return wasNotified;
	}

	/**
	 * @brief Decide how many workers should be notified ('throttle' strategy)
	 *
	 * @return 1 if the caller is the waking thread (its waking state is reset
	 *         first) or if the state could be moved from 'pending' to
	 *         'notified'; 0 otherwise
	 */
	auto getWakeupCountThrottle() -> workerid_t {
		// If the waking worker calls onNewWork/shouldWakeup he is clearly done waking up
		if (isWaking) {
			resetWaking();
			return 1;
		}

		// Only issue a new notification if none is outstanding and nobody is waking
		ThrottleState pending = ThrottleState::pending;
		bool wasPending = throttleState.compare_exchange_strong(pending, ThrottleState::notified);
		return wasPending ? 1 : 0;
	}

	/**
	 * @brief End the waking phase of the calling thread
	 *
	 * Clears the thread-local waking flag and resets the shared state to
	 * 'pending'. Must only be called by the waking thread.
	 */
	void resetWaking() {
		assert(isWaking);
		isWaking = false;

		// The previous state is only inspected by the assertion below
		ATTR_UNUSED ThrottleState old = throttleState.exchange(ThrottleState::pending);
		assert(old == ThrottleState::waking);
	}

 public:
	/**
	 * @brief Construct a wakeup strategy for a given number of workers
	 *
	 * Marked explicit so that a plain worker count is never implicitly
	 * converted into a WakeupStrategy object.
	 *
	 * @param workerCount the number of workers of the runtime
	 */
	explicit WakeupStrategy(workerid_t workerCount) : workerCount(workerCount) {}

	/**
	 * @brief Notification that the calling worker has stolen work
	 *
	 * With the 'throttle' strategy a waking worker resets the state to
	 * 'pending'; for all other strategies this does nothing.
	 */
	void onWorkStolen() {
		if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::throttle) {
			if (isWaking) {
				LOGD("waking worker stole work -> reset to pending");
				resetWaking();
			}
		}
	}

	/**
	 * @brief Notification that the calling worker is about to sleep
	 *
	 * With the 'throttle' strategy a waking worker resets the state to
	 * 'pending'; for all other strategies this does nothing.
	 */
	void goingToSleep() {
		if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::throttle) {
			if (isWaking) {
				LOGD("waking worker is going to sleep -> reset to pending");
				resetWaking();
			}
		}
	}

	/**
	 * @brief Check whether the calling worker may wake up
	 *
	 * @return for 'throttle' whether the caller consumed a notification,
	 *         always true for the other strategies
	 */
	auto canWakeup() -> bool {
		if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::throttle) {
			return canWakeupThrottle();
		} else {
			return true;
		}
	}

	/**
	 * @brief Get the number of workers that should be notified
	 *
	 * @return 1 for 'one', the worker count for 'all' and 0 or 1 for 'throttle'
	 */
	auto getWakeupCount() -> workerid_t {
		if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::one) {
			return 1;
		} else if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::all) {
			return workerCount;
		} else if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::throttle) {
			return getWakeupCountThrottle();
		} else {
			ABORT("Unknown WakeStrategy");
		}
	}
};
} // namespace emper
......@@ -25,6 +25,7 @@ emper_cpp_sources = [
'BinaryPrivateSemaphore.cpp',
'CountingPrivateSemaphore.cpp',
'Semaphore.cpp',
'WakeupStrategy.cpp',
'Worker.cpp',
]
......
......@@ -233,6 +233,11 @@ out:
auto AbstractWorkStealingScheduler::nextFiberResultViaWorkStealing() -> NextFiberResult {
std::pair<Fiber*, FiberSource> nextFiberWsResult = nextFiberViaWorkStealing();
if (nextFiberWsResult.second == FiberSource::stolen) {
onWorkStolen();
}
return NextFiberResult{
nextFiberWsResult.first,
static_cast<uintptr_t>(nextFiberWsResult.second),
......
......@@ -32,7 +32,7 @@ option(
'worker_wakeup_strategy',
type: 'combo',
description: 'The strategy used to wakeup sleeping workers (only effective if worker_sleep is enabled)',
choices: ['one', 'all'],
choices: ['one', 'throttle', 'all'],
value: 'one',
)
option(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment