diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 67610ec89571c68bc2c2e9970d65fc89cc46a348..498ed53086ee41a1e04d55113b23c53ad459e167 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -259,10 +259,10 @@ test-worker-wakeup-strategy-all: - .test - .worker-wakeup-strategy-all -#test-worker-wakeup-strategy-throttle: -# extends: -# - .test -# - .emper-worker-wakeup-strategy-throttle +test-worker-wakeup-strategy-throttle: + extends: + - .test + - .emper-worker-wakeup-strategy-throttle test-do-not-log-timestamp: extends: diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index f80dfb6fceaec9b3c7e271957b4b5278ebd43e83..f80e7a43509c4e2a13357318af42c9c2bf0c5853 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -138,7 +138,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { bool canWake; do { // Notify the wakeup strategy about our sleep attempt - wakeupStrategy.goingToSleep(); + if (!wakeupStrategy.canSleep()) break; workerSleepStrategy.sleep(); diff --git a/emper/WakeupStrategy.hpp b/emper/WakeupStrategy.hpp index 06dcee098ca7aca452030cb96aa3692a27fb4914..75674eae7228ea2902fe1d0898741649cf26f535 100644 --- a/emper/WakeupStrategy.hpp +++ b/emper/WakeupStrategy.hpp @@ -22,13 +22,15 @@ namespace emper { * - 'throttle': Only wakeup a suspended worker if there is currently no worker * already waking up. The wakup strategy can be in a tri-state: * (pending, notified, waking). 'Pending' means no unconsumed notification - * is available. 'Notified' means there was a notification but no - * worker has woken and consumed it yet. Waking means a worker - * has consumed the notification and enetered its dispatch loop. + * is available. 'Notified' means there is a consumable notification available. + * Waking means a worker has consumed the notification and entered + * its dispatch loop. + * * A waking worker will either pass its working status when it - * calls onNewWork itself or reset the state to pending when + * calls onNewWork itself, consumes a further notification from anywhere + * and skip sleeping to prevent lost wakups, or reset the state to pending when * successfully stolen work. - * The 'throttle' strategy is havily inspired by the zap thread-pool: + * The 'throttle' strategy is heavily inspired by the zap thread-pool: * https://zig.news/kprotty/resource-efficient-thread-pools-with-zig-3291 */ class WakeupStrategy : public Logger<LogSubsystem::WAKE_S> { @@ -68,24 +70,33 @@ class WakeupStrategy : public Logger<LogSubsystem::WAKE_S> { template <CallerEnvironment callerEnvironment> auto getWakeupCountThrottle() -> workerid_t { + bool wasPending; // If the waking worker calls onNewWork->getWakeupCount he is clearly done waking up - if (isWaking) { - resetWaking(); - LOGD("waking worker wants to wakeup others -> 1"); - return 1; + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + if (isWaking) { + resetWaking(); + LOGD("waking worker wants to wakeup others -> 1"); + return 1; + } + + ThrottleState state = ThrottleState::pending; + wasPending = throttleState.compare_exchange_strong(state, ThrottleState::notified); + + LOGD("get wakeup count from " << callerEnvironment << " -> " << (wasPending ? 1 : 0)); + // If we notify from anywhere and there is currently a waking worker we have + // to prevent it them from sleeping -> always set the state to notified; + } else { + wasPending = throttleState.exchange(ThrottleState::notified) == ThrottleState::pending; } - ThrottleState pending = ThrottleState::pending; - bool wasPending = throttleState.compare_exchange_strong(pending, ThrottleState::notified); - LOGD("get wakeup count from " << callerEnvironment << " -> " << (wasPending ? 1 : 0)); return wasPending ? 1 : 0; } - void resetWaking() { + ThrottleState resetWaking() { assert(isWaking); isWaking = false; - ATTR_UNUSED ThrottleState old = throttleState.exchange(ThrottleState::pending); - assert(old == ThrottleState::waking); + ThrottleState old = throttleState.exchange(ThrottleState::pending); + return old; } public: @@ -100,13 +111,16 @@ class WakeupStrategy : public Logger<LogSubsystem::WAKE_S> { } } - void goingToSleep() { + bool canSleep() { if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::throttle) { if (isWaking) { LOGD("waking worker is going to sleep -> reset to pending"); - resetWaking(); + auto oldState = resetWaking(); + return oldState == ThrottleState::waking; } } + + return true; } auto canWakeup() -> bool {