Skip to content
Snippets Groups Projects
Commit baedc874 authored by Florian Fischer's avatar Florian Fischer
Browse files

[WakeupStrategy] fix the throttle algorithm for notifiaction from anywhere

The throttle algorithm had the same problem like our sleep algorithms
where notifications from anywhere may race with a worker going to
sleep resulting in lost wakeups.
In the sleep strategy we prevent those races by preventing sleep attempts
when notifing from anywhere.
The throttle algorithm also does now exactly this. A notifier from anywhere
will now always set the WakeupStrategy state to notified.
If the state was previously pending this new approach does not differ from
the previous behavior and a sleeping worker will be notified.
If the state was waking the waking worker skips its sleep if it observes
the WakeupStrategy state as notified.
parent 7ae0f69b
No related branches found
No related tags found
1 merge request!266[WakeupStrategy] fix the throttle algorithm for notifiaction from anywhere
......@@ -259,10 +259,10 @@ test-worker-wakeup-strategy-all:
- .test
- .worker-wakeup-strategy-all
#test-worker-wakeup-strategy-throttle:
# extends:
# - .test
# - .emper-worker-wakeup-strategy-throttle
test-worker-wakeup-strategy-throttle:
extends:
- .test
- .emper-worker-wakeup-strategy-throttle
test-do-not-log-timestamp:
extends:
......
......@@ -138,7 +138,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
bool canWake;
do {
// Notify the wakeup strategy about our sleep attempt
wakeupStrategy.goingToSleep();
if (!wakeupStrategy.canSleep()) break;
workerSleepStrategy.sleep();
......
......@@ -22,13 +22,15 @@ namespace emper {
* - 'throttle': Only wakeup a suspended worker if there is currently no worker
* already waking up. The wakup strategy can be in a tri-state:
* (pending, notified, waking). 'Pending' means no unconsumed notification
* is available. 'Notified' means there was a notification but no
* worker has woken and consumed it yet. Waking means a worker
* has consumed the notification and enetered its dispatch loop.
* is available. 'Notified' means there is a consumable notification available.
* Waking means a worker has consumed the notification and entered
* its dispatch loop.
*
* A waking worker will either pass its working status when it
* calls onNewWork itself or reset the state to pending when
* calls onNewWork itself, consumes a further notification from anywhere
* and skip sleeping to prevent lost wakups, or reset the state to pending when
* successfully stolen work.
* The 'throttle' strategy is havily inspired by the zap thread-pool:
* The 'throttle' strategy is heavily inspired by the zap thread-pool:
* https://zig.news/kprotty/resource-efficient-thread-pools-with-zig-3291
*/
class WakeupStrategy : public Logger<LogSubsystem::WAKE_S> {
......@@ -68,24 +70,33 @@ class WakeupStrategy : public Logger<LogSubsystem::WAKE_S> {
template <CallerEnvironment callerEnvironment>
auto getWakeupCountThrottle() -> workerid_t {
bool wasPending;
// If the waking worker calls onNewWork->getWakeupCount he is clearly done waking up
if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
if (isWaking) {
resetWaking();
LOGD("waking worker wants to wakeup others -> 1");
return 1;
}
ThrottleState pending = ThrottleState::pending;
bool wasPending = throttleState.compare_exchange_strong(pending, ThrottleState::notified);
ThrottleState state = ThrottleState::pending;
wasPending = throttleState.compare_exchange_strong(state, ThrottleState::notified);
LOGD("get wakeup count from " << callerEnvironment << " -> " << (wasPending ? 1 : 0));
// If we notify from anywhere and there is currently a waking worker we have
// to prevent it them from sleeping -> always set the state to notified;
} else {
wasPending = throttleState.exchange(ThrottleState::notified) == ThrottleState::pending;
}
return wasPending ? 1 : 0;
}
void resetWaking() {
ThrottleState resetWaking() {
assert(isWaking);
isWaking = false;
ATTR_UNUSED ThrottleState old = throttleState.exchange(ThrottleState::pending);
assert(old == ThrottleState::waking);
ThrottleState old = throttleState.exchange(ThrottleState::pending);
return old;
}
public:
......@@ -100,13 +111,16 @@ class WakeupStrategy : public Logger<LogSubsystem::WAKE_S> {
}
}
void goingToSleep() {
bool canSleep() {
if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::throttle) {
if (isWaking) {
LOGD("waking worker is going to sleep -> reset to pending");
resetWaking();
auto oldState = resetWaking();
return oldState == ThrottleState::waking;
}
}
return true;
}
auto canWakeup() -> bool {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment