From 207fba4d9df2b3b0fc5f916579246c128a352123 Mon Sep 17 00:00:00 2001 From: Florian Fischer <florian.fischer@muhq.space> Date: Tue, 21 Dec 2021 16:09:07 +0100 Subject: [PATCH] pass a fiberHint through the onNewWork notifications The FiberHint is needed to decide in the runtime which worker to wake up. * Hint(Worker, FiberSource::inbox) -> try to notify the specific worker * Hint(FiberSource::{local,anywhereQueue}) -> notify anyone The first case is needed because due to the new worker local inbox queues we must notify the worker of the queue to prevent sleep locks. The SemaphoreSleepStrategy already has a notifySpecific implementation but it is very naive badly and we should implement new ones. The second case is the what the runtime has done before. Its WakeupStrategy decides how many workers the SleepStrategy should wake up. Also remove default CallerEnvironment template parameters to prevent errors where the CallerEnvironment was forgotten and not passed on a call side. --- emper/Runtime.hpp | 34 +++++++++++++++---- emper/Scheduler.cpp | 15 ++++---- emper/Scheduler.hpp | 11 +++--- .../AbstractWorkerSleepStrategy.hpp | 10 +----- .../SemaphoreWorkerSleepStrategy.hpp | 4 +-- .../AbstractWorkStealingScheduler.cpp | 10 +++--- emper/strategies/laws/LawsScheduler.cpp | 6 ++-- emper/strategies/ws/WsScheduler.hpp | 6 ++-- 8 files changed, 58 insertions(+), 38 deletions(-) diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 2f3a1cd8..2772b399 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -17,8 +17,11 @@ #include <vector> // for vector #include "CallerEnvironment.hpp" +#include "Common.hpp" #include "Context.hpp" #include "Debug.hpp" +#include "FiberHint.hpp" +#include "FiberSource.hpp" #include "Scheduler.hpp" // for Scheduler #include "WakeupStrategy.hpp" #include "Worker.hpp" @@ -132,11 +135,28 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { return workerSleepStrategy; } - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - inline void wakeupSleepingWorkers() { - workerid_t wakeupCount = wakeupStrategy.getWakeupCount<callerEnvironment>(); - if (wakeupCount) { - workerSleepStrategy.notifyMany<callerEnvironment>(wakeupCount); + template <CallerEnvironment callerEnvironment> + inline void wakeupSleepingWorkers(emper::FiberHint hint) { + LOGD("Wake sleepers from " << callerEnvironment << " cause new work: " << hint); + switch (hint.getSource()) { + // We scheduled to an inbox where the fiber lives exclusively -> wake up the + // specific worker if necessary + case emper::FiberSource::inbox: { + workerSleepStrategy.notifySpecific<callerEnvironment>(hint.getWorker()); + } break; + + // We scheduled to a work-stealing or the anywhereQueue, where anyone can + // access this fiber -> wake up anyone if appropriate + case emper::FiberSource::local: + case emper::FiberSource::anywhereQueue: { + workerid_t wakeupCount = wakeupStrategy.getWakeupCount<callerEnvironment>(); + if (wakeupCount) { + workerSleepStrategy.notifyMany<callerEnvironment>(wakeupCount); + } + } break; + + default: + DIE_MSG("Unexpected FiberSource encountered during wakeupSleepingWorkers"); } } @@ -196,7 +216,9 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { * * For now scheduleOn is only available from within th EMPER runtime. */ - inline void scheduleOn(Fiber& fiber, workerid_t workerId) { scheduler.scheduleOn(fiber, workerId); } + inline void scheduleOn(Fiber& fiber, workerid_t workerId) { + scheduler.scheduleOn(fiber, workerId); + } void yield(); // https://stackoverflow.com/a/3747462/194894 diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp index 44c07883..e9de80ef 100644 --- a/emper/Scheduler.cpp +++ b/emper/Scheduler.cpp @@ -2,7 +2,10 @@ // Copyright © 2020-2021 Florian Schmaus #include "Scheduler.hpp" +#include <stdexcept> + #include "CallerEnvironment.hpp" +#include "Common.hpp" #include "Runtime.hpp" #include "RuntimeStrategy.hpp" #include "WakeupStrategy.hpp" @@ -15,10 +18,14 @@ void Scheduler::addNewWorkerHook(const std::function<void(workerid_t)>& hook) { } template <CallerEnvironment callerEnvironment> -void Scheduler::wakeupSleepingWorkers() { - runtime.wakeupSleepingWorkers<callerEnvironment>(); +void Scheduler::wakeupSleepingWorkers(emper::FiberHint hint) { + runtime.wakeupSleepingWorkers<callerEnvironment>(hint); } +// show the compiler our template incarnations +template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::EMPER>(emper::FiberHint hint); +template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::ANYWHERE>(emper::FiberHint hint); + void Scheduler::scheduleOn(Fiber& fiber, workerid_t workerId) { // Calling scheduleOn() only works from within the EMPER runtime. emper::assertInRuntime(); @@ -31,7 +38,3 @@ void Scheduler::scheduleOn(Fiber& fiber, workerid_t workerId) { } void Scheduler::notifyRuntimeAboutWorkStolen() { runtime.wakeupStrategy.onWorkStolen(); } - -// show the compiler our template incarnations -template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::EMPER>(); -template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::ANYWHERE>(); diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index 26f303c4..57204e26 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -11,7 +11,8 @@ #include "Debug.hpp" // for LogSubsystem, LogSubsystem::SCHED, Logger #include "Dispatcher.hpp" #include "Emper.hpp" -#include "Fiber.hpp" // for Fiber +#include "Fiber.hpp" +#include "FiberHint.hpp" #include "emper-common.h" // for workeraffinity_t #include "lib/adt/LockedUnboundedQueue.hpp" @@ -24,7 +25,7 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { lib::adt::LockedUnboundedQueue<Fiber> scheduleAnywhereQueue; template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - void wakeupSleepingWorkers(); + void wakeupSleepingWorkers(emper::FiberHint hint); void notifyRuntimeAboutWorkStolen(); @@ -44,10 +45,10 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { static inline void increaseRefCount(Fiber& fiber) { fiber.doAtomicIncrRefCount(); } - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - inline void onNewWork() { + template <CallerEnvironment callerEnvironment> + inline void onNewWork(emper::FiberHint hint) { if constexpr (emper::WORKER_SLEEP) { - wakeupSleepingWorkers<callerEnvironment>(); + wakeupSleepingWorkers<callerEnvironment>(hint); } } diff --git a/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp b/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp index 47a8e6e6..13c47337 100644 --- a/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp @@ -9,9 +9,6 @@ namespace emper::sleep_strategy { -static constexpr bool needsNotifySpecific = - (emper::IO_COMPLETER_BEHAVIOR == emper::IoCompleterBehavior::wakeup); - template <class T> class AbstractWorkerSleepStrategy { [[nodiscard]] inline auto getSleeping() const -> long { @@ -35,12 +32,7 @@ class AbstractWorkerSleepStrategy { template <CallerEnvironment callerEnvironment> void notifySpecific(workerid_t workerId) { - if constexpr (needsNotifySpecific) { - static_cast<T*>(this)->template notifySpecific<callerEnvironment>(workerId); - } else { - throw std::logic_error( - "Called SemaphoreWorkerSleepStrategy::notifySpecific but needsNotifySpecific is false"); - } + static_cast<T*>(this)->template notifySpecific<callerEnvironment>(workerId); } void sleep() { static_cast<T*>(this)->sleep(); } diff --git a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp index 188f0152..142ccffa 100644 --- a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp @@ -46,7 +46,7 @@ class AbstractSemaphoreWorkerSleepStrategy static constexpr bool semHasNotifySpecific = requires(Sem s) { s.notifySpecific(0); }; // should the generic notifySpecific implementation be used - static constexpr bool useGenericNotifySpecificImpl = !semHasNotifySpecific && needsNotifySpecific; + static constexpr bool useGenericNotifySpecificImpl = !semHasNotifySpecific; // Member used for the generic notifySpecific implementation std::atomic<bool>* notifiedFlags; @@ -98,8 +98,6 @@ class AbstractSemaphoreWorkerSleepStrategy // only necessary if the runtime actually wants to notify a specific worker. // This should be configured at compile time using a boolean template parameter of the // WorkerWakeupStrategy (<bool needsNotifySpecific>). - // - // For now we hardcode a constexpr check for the only condition where we need notifySpecific template <CallerEnvironment callerEnvironment> inline void genericNotifySpecific(workerid_t workerId) { auto& notifiedFlag = notifiedFlags[workerId]; diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index afc4593d..00f174e1 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -66,16 +66,16 @@ void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { // Classes using this method are supposed to always invoke this // method. Hence we call onNewWork() here. - onNewWork(); + onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local}); } void AbstractWorkStealingScheduler::scheduleToInbox(Fiber& fiber, workerid_t workerId) { inboxes[workerId]->enqueue(&fiber); emper::statsIncr(awss::stats.scheduledFibersToInbox); - // Classes using this method are supposed to always invoke this - // method. Hence we call onNewWork() here. - onNewWork(); + // Notify the runtime that we scheduled work to a specific workers' inbox + // the runtime is responsible to notify it if necessary. + onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{workerId, emper::FiberSource::inbox}); } auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool { @@ -130,7 +130,7 @@ auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() -> std::optional } if (fibersLiftedFromAnywhere) { - onNewWork(); + onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local}); if constexpr (emper::STATS) { awss::stats.recordFibersLiftedFromAnywhereQueue(fibersLiftedFromAnywhere); diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp index 8f846b54..82cb3c02 100644 --- a/emper/strategies/laws/LawsScheduler.cpp +++ b/emper/strategies/laws/LawsScheduler.cpp @@ -4,6 +4,8 @@ #include "CallerEnvironment.hpp" #include "Emper.hpp" +#include "FiberHint.hpp" +#include "FiberSource.hpp" #include "LawsStrategy.hpp" // IWYU pragma: keep #include "NextFiberResult.hpp" #include "Runtime.hpp" @@ -48,7 +50,7 @@ void LawsScheduler::scheduleInternal(Fiber& fiber) { void LawsScheduler::scheduleFromAnywhereInternal(Fiber& fiber) { tryScheduleToPriorityQueue<CallerEnvironment::ANYWHERE>(fiber); enqueueInAnywhereQueue(fiber); - onNewWork<CallerEnvironment::ANYWHERE>(); + onNewWork<CallerEnvironment::ANYWHERE>(emper::FiberHint{emper::FiberSource::anywhereQueue}); } void LawsScheduler::scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) { @@ -57,7 +59,7 @@ void LawsScheduler::scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) tryScheduleToPriorityQueue<CallerEnvironment::ANYWHERE>(fiber); } insertInAnywhereQueue(fibers, count); - onNewWork<CallerEnvironment::ANYWHERE>(); + onNewWork<CallerEnvironment::ANYWHERE>(emper::FiberHint{emper::FiberSource::anywhereQueue}); } auto LawsScheduler::nextFiber() -> std::optional<NextFiberResult> { diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp index d9dc87d6..5aa22c1b 100644 --- a/emper/strategies/ws/WsScheduler.hpp +++ b/emper/strategies/ws/WsScheduler.hpp @@ -5,6 +5,8 @@ #include <optional> #include "CallerEnvironment.hpp" +#include "FiberHint.hpp" +#include "FiberSource.hpp" #include "NextFiberResult.hpp" #include "emper-common.h" #include "strategies/AbstractWorkStealingScheduler.hpp" @@ -21,12 +23,12 @@ class WsScheduler : public AbstractWorkStealingScheduler { } void scheduleFromAnywhereInternal(Fiber& fiber) override { enqueueInAnywhereQueue(fiber); - onNewWork<CallerEnvironment::ANYWHERE>(); + onNewWork<CallerEnvironment::ANYWHERE>(emper::FiberHint{emper::FiberSource::anywhereQueue}); } void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) override { insertInAnywhereQueue(fibers, count); - onNewWork<CallerEnvironment::ANYWHERE>(); + onNewWork<CallerEnvironment::ANYWHERE>(emper::FiberHint{emper::FiberSource::anywhereQueue}); } public: -- GitLab