Skip to content
Snippets Groups Projects
Commit 15efdc5c authored by Florian Fischer's avatar Florian Fischer
Browse files

fix/improve notifySpecific using semaphores

* Using SleeperState instead of boolean flags make the code more readable.
* Don't try to notify ourselves resulting in an infinite loop.
* Allocate the worker states cache line exclusive.
* Add debug messages.
* Back off for 1ms when notifying everyone to allow the specific worker
  to wake up.
parent 96a846a1
No related branches found
No related tags found
1 merge request!296fix Future::cancel with new Scheduler::scheduleOn(fiber, workerId)
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include "CallerEnvironment.hpp" #include "CallerEnvironment.hpp"
#include "Emper.hpp" #include "Emper.hpp"
#include "Worker.hpp"
namespace emper::sleep_strategy { namespace emper::sleep_strategy {
...@@ -32,6 +33,11 @@ class AbstractWorkerSleepStrategy { ...@@ -32,6 +33,11 @@ class AbstractWorkerSleepStrategy {
template <CallerEnvironment callerEnvironment> template <CallerEnvironment callerEnvironment>
void notifySpecific(workerid_t workerId) { void notifySpecific(workerid_t workerId) {
// Do not notify ourselves
if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
if (workerId == Worker::getCurrentWorkerId()) return;
}
static_cast<T*>(this)->template notifySpecific<callerEnvironment>(workerId); static_cast<T*>(this)->template notifySpecific<callerEnvironment>(workerId);
} }
......
...@@ -3,14 +3,19 @@ ...@@ -3,14 +3,19 @@
#pragma once #pragma once
#include <atomic> #include <atomic>
#include <chrono>
#include <iostream> #include <iostream>
#include <new>
#include <thread>
#include "CallerEnvironment.hpp" #include "CallerEnvironment.hpp"
#include "Common.hpp"
#include "Debug.hpp" #include "Debug.hpp"
#include "Worker.hpp" #include "Worker.hpp"
#include "emper-common.h" #include "emper-common.h"
#include "emper-config.h" #include "emper-config.h"
#include "sleep_strategy/AbstractWorkerSleepStrategy.hpp" #include "sleep_strategy/AbstractWorkerSleepStrategy.hpp"
#include "sleep_strategy/SleeperState.hpp"
#include "sleep_strategy/Stats.hpp" #include "sleep_strategy/Stats.hpp"
#ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE #ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE
...@@ -56,24 +61,28 @@ class AbstractSemaphoreWorkerSleepStrategy ...@@ -56,24 +61,28 @@ class AbstractSemaphoreWorkerSleepStrategy
static constexpr bool useGenericNotifySpecificImpl = !semHasNotifySpecific; static constexpr bool useGenericNotifySpecificImpl = !semHasNotifySpecific;
// Member used for the generic notifySpecific implementation // Member used for the generic notifySpecific implementation
std::atomic<bool>* notifiedFlags; std::atomic<SleeperState>* states = nullptr;
// Sleep part of the generic notifySpecific implementation // Sleep part of the generic notifySpecific implementation
inline void genericNotifySpecificSleep() { inline void genericNotifySpecificSleep() {
workerid_t workerId = Worker::getCurrentWorkerId(); const workerid_t workerId = Worker::getCurrentWorkerId();
auto& notified = notifiedFlags[workerId]; auto& state = states[workerId];
if (notified.load(std::memory_order_relaxed)) {
notified.store(false, std::memory_order_relaxed); auto expected = SleeperState::Running;
if (!state.compare_exchange_strong(expected, SleeperState::Sleeping, std::memory_order_release,
std::memory_order_relaxed)) {
state.store(SleeperState::Running, std::memory_order_release);
stats.incSkip(); stats.incSkip();
LOGD("State " << &state << " was Notified reset to Running and skip sleeping");
return; return;
} }
LOGD("going to sleep");
wakeupSem.wait(); wakeupSem.wait();
stats.incWakeup(); stats.incWakeup();
if (notified.load(std::memory_order_relaxed)) { LOGD("awoken set state to Running");
notified.store(false, std::memory_order_relaxed); state.store(SleeperState::Running, std::memory_order_relaxed);
}
} }
// Currently we don't have a good sempahore based algorithm to notify a specific // Currently we don't have a good sempahore based algorithm to notify a specific
...@@ -94,24 +103,31 @@ class AbstractSemaphoreWorkerSleepStrategy ...@@ -94,24 +103,31 @@ class AbstractSemaphoreWorkerSleepStrategy
// 5. W2 iterates the dispatch loop two times without finding work -> goes to sleep // 5. W2 iterates the dispatch loop two times without finding work -> goes to sleep
// 6. W1 calls sleep() with a sem value of 0 -> goes to sleep though it has a cqe in its CQ // 6. W1 calls sleep() with a sem value of 0 -> goes to sleep though it has a cqe in its CQ
// //
// The used approach is to keep a notified flag for each worker in the notifiedFlags // The used approach is to keep a state for each worker in the states
// array. Those flags make it possible to notify a specific worker. // array. Those states make it possible to notify a specific worker.
// We are sure that the specific worker was successfully notified if we observe // We are sure that the specific worker was successfully notified if we observe
// that its flag was reset. // that its state has changed.
// Therefore we can notify all worker in a loop until we observe that the // Therefore we can notify all worker in a loop until we observe that the
// specific worker has changed its flag or the runtime is terminating. // specific worker has changed its state or the runtime is terminating.
//
// Unfortunately this introduces significant overhead in the sleep method which is
// only necessary if the runtime actually wants to notify a specific worker.
// This should be configured at compile time using a boolean template parameter of the
// WorkerWakeupStrategy (<bool needsNotifySpecific>).
template <CallerEnvironment callerEnvironment> template <CallerEnvironment callerEnvironment>
inline void genericNotifySpecific(workerid_t workerId) { inline void genericNotifySpecific(workerid_t workerId) {
auto& notifiedFlag = notifiedFlags[workerId]; auto& state = states[workerId];
notifiedFlag.store(true, std::memory_order_relaxed); if (state.exchange(SleeperState::Notified, std::memory_order_release) !=
// NotifyAll since we observe that the specific worker has reset its flag SleeperState::Sleeping) {
while (notifiedFlag.load(std::memory_order_relaxed) && !isRuntimeTerminating()) { LOGD("Worker to notify (" << workerId << ") is not sleeping -> skip notifying");
return;
}
// NotifyAll since we observe that the specific worker has changes its state
while (state.load(std::memory_order_relaxed) == SleeperState::Notified &&
!isRuntimeTerminating()) {
notifyAll<callerEnvironment>(); notifyAll<callerEnvironment>();
// clang-tidy bug workaround
// see:
// https://stackoverflow.com/questions/65564677/clang-tidy-parsing-error-at-spaceship-operator
std::this_thread::sleep_until(std::chrono::system_clock::now() +
std::chrono::milliseconds(1));
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
} }
} }
...@@ -149,7 +165,7 @@ class AbstractSemaphoreWorkerSleepStrategy ...@@ -149,7 +165,7 @@ class AbstractSemaphoreWorkerSleepStrategy
AbstractSemaphoreWorkerSleepStrategy(Runtime& runtime, workerid_t workerCount) AbstractSemaphoreWorkerSleepStrategy(Runtime& runtime, workerid_t workerCount)
: workerCount(workerCount), stats(runtime) { : workerCount(workerCount), stats(runtime) {
if constexpr (useGenericNotifySpecificImpl) { if constexpr (useGenericNotifySpecificImpl) {
notifiedFlags = new std::atomic<bool>[workerCount]; states = new (std::align_val_t(CACHE_LINE_SIZE)) std::atomic<SleeperState>[workerCount];
} }
if constexpr (semNeedsInit) { if constexpr (semNeedsInit) {
...@@ -157,11 +173,7 @@ class AbstractSemaphoreWorkerSleepStrategy ...@@ -157,11 +173,7 @@ class AbstractSemaphoreWorkerSleepStrategy
} }
} }
~AbstractSemaphoreWorkerSleepStrategy() { ~AbstractSemaphoreWorkerSleepStrategy() { delete[] states; }
if constexpr (useGenericNotifySpecificImpl) {
delete[] notifiedFlags;
}
}
[[nodiscard]] inline auto getSleeping() const -> long { return wakeupSem.getValue(); } [[nodiscard]] inline auto getSleeping() const -> long { return wakeupSem.getValue(); }
...@@ -199,11 +211,14 @@ class AbstractSemaphoreWorkerSleepStrategy ...@@ -199,11 +211,14 @@ class AbstractSemaphoreWorkerSleepStrategy
template <CallerEnvironment callerEnvironment> template <CallerEnvironment callerEnvironment>
void notifySpecific(workerid_t workerId) { void notifySpecific(workerid_t workerId) {
LOGD("specifically notify worker " << workerId << " from " << callerEnvironment);
if constexpr (semHasNotifySpecific) { if constexpr (semHasNotifySpecific) {
wakeupSem.notifySpecific(workerId); wakeupSem.notifySpecific(workerId);
} else { } else {
genericNotifySpecific<callerEnvironment>(workerId); genericNotifySpecific<callerEnvironment>(workerId);
} }
stats.incNotify(); stats.incNotify();
stats.incNotifications(); stats.incNotifications();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment