From baedc8746a47e03be9d7f4b5f62fbafac4d1c64d Mon Sep 17 00:00:00 2001 From: Florian Fischer <florian.fischer@muhq.space> Date: Mon, 4 Oct 2021 19:02:46 +0200 Subject: [PATCH] [WakeupStrategy] fix the throttle algorithm for notifiaction from anywhere The throttle algorithm had the same problem like our sleep algorithms where notifications from anywhere may race with a worker going to sleep resulting in lost wakeups. In the sleep strategy we prevent those races by preventing sleep attempts when notifing from anywhere. The throttle algorithm also does now exactly this. A notifier from anywhere will now always set the WakeupStrategy state to notified. If the state was previously pending this new approach does not differ from the previous behavior and a sleeping worker will be notified. If the state was waking the waking worker skips its sleep if it observes the WakeupStrategy state as notified. --- .gitlab-ci.yml | 8 +++---- emper/Runtime.hpp | 2 +- emper/WakeupStrategy.hpp | 48 ++++++++++++++++++++++++++-------------- 3 files changed, 36 insertions(+), 22 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 67610ec8..498ed530 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -259,10 +259,10 @@ test-worker-wakeup-strategy-all: - .test - .worker-wakeup-strategy-all -#test-worker-wakeup-strategy-throttle: -# extends: -# - .test -# - .emper-worker-wakeup-strategy-throttle +test-worker-wakeup-strategy-throttle: + extends: + - .test + - .emper-worker-wakeup-strategy-throttle test-do-not-log-timestamp: extends: diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index f80dfb6f..f80e7a43 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -138,7 +138,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { bool canWake; do { // Notify the wakeup strategy about our sleep attempt - wakeupStrategy.goingToSleep(); + if (!wakeupStrategy.canSleep()) break; workerSleepStrategy.sleep(); diff --git a/emper/WakeupStrategy.hpp b/emper/WakeupStrategy.hpp index 06dcee09..75674eae 100644 --- a/emper/WakeupStrategy.hpp +++ b/emper/WakeupStrategy.hpp @@ -22,13 +22,15 @@ namespace emper { * - 'throttle': Only wakeup a suspended worker if there is currently no worker * already waking up. The wakup strategy can be in a tri-state: * (pending, notified, waking). 'Pending' means no unconsumed notification - * is available. 'Notified' means there was a notification but no - * worker has woken and consumed it yet. Waking means a worker - * has consumed the notification and enetered its dispatch loop. + * is available. 'Notified' means there is a consumable notification available. + * Waking means a worker has consumed the notification and entered + * its dispatch loop. + * * A waking worker will either pass its working status when it - * calls onNewWork itself or reset the state to pending when + * calls onNewWork itself, consumes a further notification from anywhere + * and skip sleeping to prevent lost wakups, or reset the state to pending when * successfully stolen work. - * The 'throttle' strategy is havily inspired by the zap thread-pool: + * The 'throttle' strategy is heavily inspired by the zap thread-pool: * https://zig.news/kprotty/resource-efficient-thread-pools-with-zig-3291 */ class WakeupStrategy : public Logger<LogSubsystem::WAKE_S> { @@ -68,24 +70,33 @@ class WakeupStrategy : public Logger<LogSubsystem::WAKE_S> { template <CallerEnvironment callerEnvironment> auto getWakeupCountThrottle() -> workerid_t { + bool wasPending; // If the waking worker calls onNewWork->getWakeupCount he is clearly done waking up - if (isWaking) { - resetWaking(); - LOGD("waking worker wants to wakeup others -> 1"); - return 1; + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + if (isWaking) { + resetWaking(); + LOGD("waking worker wants to wakeup others -> 1"); + return 1; + } + + ThrottleState state = ThrottleState::pending; + wasPending = throttleState.compare_exchange_strong(state, ThrottleState::notified); + + LOGD("get wakeup count from " << callerEnvironment << " -> " << (wasPending ? 1 : 0)); + // If we notify from anywhere and there is currently a waking worker we have + // to prevent it them from sleeping -> always set the state to notified; + } else { + wasPending = throttleState.exchange(ThrottleState::notified) == ThrottleState::pending; } - ThrottleState pending = ThrottleState::pending; - bool wasPending = throttleState.compare_exchange_strong(pending, ThrottleState::notified); - LOGD("get wakeup count from " << callerEnvironment << " -> " << (wasPending ? 1 : 0)); return wasPending ? 1 : 0; } - void resetWaking() { + ThrottleState resetWaking() { assert(isWaking); isWaking = false; - ATTR_UNUSED ThrottleState old = throttleState.exchange(ThrottleState::pending); - assert(old == ThrottleState::waking); + ThrottleState old = throttleState.exchange(ThrottleState::pending); + return old; } public: @@ -100,13 +111,16 @@ class WakeupStrategy : public Logger<LogSubsystem::WAKE_S> { } } - void goingToSleep() { + bool canSleep() { if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::throttle) { if (isWaking) { LOGD("waking worker is going to sleep -> reset to pending"); - resetWaking(); + auto oldState = resetWaking(); + return oldState == ThrottleState::waking; } } + + return true; } auto canWakeup() -> bool { -- GitLab