diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 2f3a1cd891fbe2e62135bd8b1d77214e8cc2a859..2772b39920e307866ece3f48e320a3e8cfdbb731 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -17,8 +17,11 @@ #include <vector> // for vector #include "CallerEnvironment.hpp" +#include "Common.hpp" #include "Context.hpp" #include "Debug.hpp" +#include "FiberHint.hpp" +#include "FiberSource.hpp" #include "Scheduler.hpp" // for Scheduler #include "WakeupStrategy.hpp" #include "Worker.hpp" @@ -132,11 +135,28 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { return workerSleepStrategy; } - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - inline void wakeupSleepingWorkers() { - workerid_t wakeupCount = wakeupStrategy.getWakeupCount<callerEnvironment>(); - if (wakeupCount) { - workerSleepStrategy.notifyMany<callerEnvironment>(wakeupCount); + template <CallerEnvironment callerEnvironment> + inline void wakeupSleepingWorkers(emper::FiberHint hint) { + LOGD("Wake sleepers from " << callerEnvironment << " cause new work: " << hint); + switch (hint.getSource()) { + // We scheduled to an inbox where the fiber lives exclusively -> wake up the + // specific worker if necessary + case emper::FiberSource::inbox: { + workerSleepStrategy.notifySpecific<callerEnvironment>(hint.getWorker()); + } break; + + // We scheduled to a work-stealing or the anywhereQueue, where anyone can + // access this fiber -> wake up anyone if appropriate + case emper::FiberSource::local: + case emper::FiberSource::anywhereQueue: { + workerid_t wakeupCount = wakeupStrategy.getWakeupCount<callerEnvironment>(); + if (wakeupCount) { + workerSleepStrategy.notifyMany<callerEnvironment>(wakeupCount); + } + } break; + + default: + DIE_MSG("Unexpected FiberSource encountered during wakeupSleepingWorkers"); } } @@ -196,7 +216,9 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { * * For now scheduleOn is only available from within th EMPER runtime. */ - inline void scheduleOn(Fiber& fiber, workerid_t workerId) { scheduler.scheduleOn(fiber, workerId); } + inline void scheduleOn(Fiber& fiber, workerid_t workerId) { + scheduler.scheduleOn(fiber, workerId); + } void yield(); // https://stackoverflow.com/a/3747462/194894 diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp index 44c07883b247d295cadb2a6cf8d51edafd748cf1..e9de80ef9353fefe6b62b0e8b5af2c32a992b2d3 100644 --- a/emper/Scheduler.cpp +++ b/emper/Scheduler.cpp @@ -2,7 +2,10 @@ // Copyright © 2020-2021 Florian Schmaus #include "Scheduler.hpp" +#include <stdexcept> + #include "CallerEnvironment.hpp" +#include "Common.hpp" #include "Runtime.hpp" #include "RuntimeStrategy.hpp" #include "WakeupStrategy.hpp" @@ -15,10 +18,14 @@ void Scheduler::addNewWorkerHook(const std::function<void(workerid_t)>& hook) { } template <CallerEnvironment callerEnvironment> -void Scheduler::wakeupSleepingWorkers() { - runtime.wakeupSleepingWorkers<callerEnvironment>(); +void Scheduler::wakeupSleepingWorkers(emper::FiberHint hint) { + runtime.wakeupSleepingWorkers<callerEnvironment>(hint); } +// show the compiler our template incarnations +template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::EMPER>(emper::FiberHint hint); +template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::ANYWHERE>(emper::FiberHint hint); + void Scheduler::scheduleOn(Fiber& fiber, workerid_t workerId) { // Calling scheduleOn() only works from within the EMPER runtime. emper::assertInRuntime(); @@ -31,7 +38,3 @@ void Scheduler::scheduleOn(Fiber& fiber, workerid_t workerId) { } void Scheduler::notifyRuntimeAboutWorkStolen() { runtime.wakeupStrategy.onWorkStolen(); } - -// show the compiler our template incarnations -template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::EMPER>(); -template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::ANYWHERE>(); diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index 26f303c49f78b45ea31b9ae38df3650b053d26e4..57204e267655036ededee7b963fdf78132df6589 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -11,7 +11,8 @@ #include "Debug.hpp" // for LogSubsystem, LogSubsystem::SCHED, Logger #include "Dispatcher.hpp" #include "Emper.hpp" -#include "Fiber.hpp" // for Fiber +#include "Fiber.hpp" +#include "FiberHint.hpp" #include "emper-common.h" // for workeraffinity_t #include "lib/adt/LockedUnboundedQueue.hpp" @@ -24,7 +25,7 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { lib::adt::LockedUnboundedQueue<Fiber> scheduleAnywhereQueue; template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - void wakeupSleepingWorkers(); + void wakeupSleepingWorkers(emper::FiberHint hint); void notifyRuntimeAboutWorkStolen(); @@ -44,10 +45,10 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { static inline void increaseRefCount(Fiber& fiber) { fiber.doAtomicIncrRefCount(); } - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - inline void onNewWork() { + template <CallerEnvironment callerEnvironment> + inline void onNewWork(emper::FiberHint hint) { if constexpr (emper::WORKER_SLEEP) { - wakeupSleepingWorkers<callerEnvironment>(); + wakeupSleepingWorkers<callerEnvironment>(hint); } } diff --git a/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp b/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp index 47a8e6e69002ab467dd2ac28dc0fe7ccf91f943b..13c473374df29f2db5e43c28f546d7416d6a55c9 100644 --- a/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp @@ -9,9 +9,6 @@ namespace emper::sleep_strategy { -static constexpr bool needsNotifySpecific = - (emper::IO_COMPLETER_BEHAVIOR == emper::IoCompleterBehavior::wakeup); - template <class T> class AbstractWorkerSleepStrategy { [[nodiscard]] inline auto getSleeping() const -> long { @@ -35,12 +32,7 @@ class AbstractWorkerSleepStrategy { template <CallerEnvironment callerEnvironment> void notifySpecific(workerid_t workerId) { - if constexpr (needsNotifySpecific) { - static_cast<T*>(this)->template notifySpecific<callerEnvironment>(workerId); - } else { - throw std::logic_error( - "Called SemaphoreWorkerSleepStrategy::notifySpecific but needsNotifySpecific is false"); - } + static_cast<T*>(this)->template notifySpecific<callerEnvironment>(workerId); } void sleep() { static_cast<T*>(this)->sleep(); } diff --git a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp index 188f0152fb777f80dc560065551d584b90eebd7b..142ccffacd6cf838373ef7c15250a0dae70cf2c8 100644 --- a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp @@ -46,7 +46,7 @@ class AbstractSemaphoreWorkerSleepStrategy static constexpr bool semHasNotifySpecific = requires(Sem s) { s.notifySpecific(0); }; // should the generic notifySpecific implementation be used - static constexpr bool useGenericNotifySpecificImpl = !semHasNotifySpecific && needsNotifySpecific; + static constexpr bool useGenericNotifySpecificImpl = !semHasNotifySpecific; // Member used for the generic notifySpecific implementation std::atomic<bool>* notifiedFlags; @@ -98,8 +98,6 @@ class AbstractSemaphoreWorkerSleepStrategy // only necessary if the runtime actually wants to notify a specific worker. // This should be configured at compile time using a boolean template parameter of the // WorkerWakeupStrategy (<bool needsNotifySpecific>). - // - // For now we hardcode a constexpr check for the only condition where we need notifySpecific template <CallerEnvironment callerEnvironment> inline void genericNotifySpecific(workerid_t workerId) { auto& notifiedFlag = notifiedFlags[workerId]; diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index afc4593d833ef86cdd394c1f910f59dc2d7aba2e..00f174e15626d0b6901d72e4eb66c9de1ea9a4ce 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -66,16 +66,16 @@ void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { // Classes using this method are supposed to always invoke this // method. Hence we call onNewWork() here. - onNewWork(); + onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local}); } void AbstractWorkStealingScheduler::scheduleToInbox(Fiber& fiber, workerid_t workerId) { inboxes[workerId]->enqueue(&fiber); emper::statsIncr(awss::stats.scheduledFibersToInbox); - // Classes using this method are supposed to always invoke this - // method. Hence we call onNewWork() here. - onNewWork(); + // Notify the runtime that we scheduled work to a specific workers' inbox + // the runtime is responsible to notify it if necessary. + onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{workerId, emper::FiberSource::inbox}); } auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool { @@ -130,7 +130,7 @@ auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() -> std::optional } if (fibersLiftedFromAnywhere) { - onNewWork(); + onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local}); if constexpr (emper::STATS) { awss::stats.recordFibersLiftedFromAnywhereQueue(fibersLiftedFromAnywhere); diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp index 8f846b5429822e6b54cdc0ee269a8b88d55840e7..82cb3c02216724d85a89807397b1d1b7f865016b 100644 --- a/emper/strategies/laws/LawsScheduler.cpp +++ b/emper/strategies/laws/LawsScheduler.cpp @@ -4,6 +4,8 @@ #include "CallerEnvironment.hpp" #include "Emper.hpp" +#include "FiberHint.hpp" +#include "FiberSource.hpp" #include "LawsStrategy.hpp" // IWYU pragma: keep #include "NextFiberResult.hpp" #include "Runtime.hpp" @@ -48,7 +50,7 @@ void LawsScheduler::scheduleInternal(Fiber& fiber) { void LawsScheduler::scheduleFromAnywhereInternal(Fiber& fiber) { tryScheduleToPriorityQueue<CallerEnvironment::ANYWHERE>(fiber); enqueueInAnywhereQueue(fiber); - onNewWork<CallerEnvironment::ANYWHERE>(); + onNewWork<CallerEnvironment::ANYWHERE>(emper::FiberHint{emper::FiberSource::anywhereQueue}); } void LawsScheduler::scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) { @@ -57,7 +59,7 @@ void LawsScheduler::scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) tryScheduleToPriorityQueue<CallerEnvironment::ANYWHERE>(fiber); } insertInAnywhereQueue(fibers, count); - onNewWork<CallerEnvironment::ANYWHERE>(); + onNewWork<CallerEnvironment::ANYWHERE>(emper::FiberHint{emper::FiberSource::anywhereQueue}); } auto LawsScheduler::nextFiber() -> std::optional<NextFiberResult> { diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp index d9dc87d62a50e202be36f6bd5d36384c06bf481c..5aa22c1b0e06e25c655c0993ecd69241b68e260c 100644 --- a/emper/strategies/ws/WsScheduler.hpp +++ b/emper/strategies/ws/WsScheduler.hpp @@ -5,6 +5,8 @@ #include <optional> #include "CallerEnvironment.hpp" +#include "FiberHint.hpp" +#include "FiberSource.hpp" #include "NextFiberResult.hpp" #include "emper-common.h" #include "strategies/AbstractWorkStealingScheduler.hpp" @@ -21,12 +23,12 @@ class WsScheduler : public AbstractWorkStealingScheduler { } void scheduleFromAnywhereInternal(Fiber& fiber) override { enqueueInAnywhereQueue(fiber); - onNewWork<CallerEnvironment::ANYWHERE>(); + onNewWork<CallerEnvironment::ANYWHERE>(emper::FiberHint{emper::FiberSource::anywhereQueue}); } void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) override { insertInAnywhereQueue(fibers, count); - onNewWork<CallerEnvironment::ANYWHERE>(); + onNewWork<CallerEnvironment::ANYWHERE>(emper::FiberHint{emper::FiberSource::anywhereQueue}); } public: