diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index 13157322348beca93d27049c3eb50c0b101a8690..d71ba18b813d1be89113d6bff27520315366b1aa 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -3,6 +3,7 @@ #pragma once #include <functional> // for function +#include <ostream> #include "CallerEnvironment.hpp" #include "Debug.hpp" // for LogSubsystem, LogSubsystem::SCHED, Logger @@ -51,14 +52,16 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); } virtual void scheduleInternal(Fiber& fiber) = 0; + virtual void scheduleFromAnywhereInternal(Fiber& fiber) = 0; + virtual void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) = 0; public: void schedule(Fiber& fiber) { LOGD("Scheduling fiber " << &fiber); - scheduleInternal(fiber); } + // TODO: maybe this should also be a specialized function void schedule(Fiber** fibers, unsigned count) { for (unsigned i = 0; i < count; ++i) { Fiber& fiber = *fibers[i]; @@ -70,16 +73,11 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { virtual auto nextFiber() -> NextFiberResult = 0; void scheduleFromAnywhere(Fiber& fiber) { - enqueueInAnywhereQueue(fiber); - - onNewWork<CallerEnvironment::ANYWHERE>(); + LOGD("Scheduling fiber " << &fiber << " from anywhere"); + scheduleFromAnywhereInternal(fiber); } - // TODO: investigate if it is still a good idea to wakeup only a single - // worker maybe we want something like onNewWork(amountOfWork) void scheduleFromAnywhere(Fiber** fibers, unsigned count) { - insertInAnywhereQueue(fibers, count); - - onNewWork<CallerEnvironment::ANYWHERE>(); + scheduleFromAnywhereInternal(fibers, count); } }; diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp index 6ea84c23fef24c9f9ad52be0b17303c2718039df..3bf7079a8e0bc7d0ba625c792d91f53d8c1008be 100644 --- a/emper/strategies/laws/LawsScheduler.cpp +++ b/emper/strategies/laws/LawsScheduler.cpp @@ -4,6 +4,7 @@ #include <cstdint> +#include "CallerEnvironment.hpp" #include "Emper.hpp" #include "LawsStrategy.hpp" // IWYU pragma: keep #include "NextFiberResult.hpp" @@ -21,17 +22,17 @@ LawsScheduler::LawsScheduler(Runtime& runtime) : AbstractWorkStealingScheduler(r addNewWorkerHook(newWorkerHook); } -void LawsScheduler::scheduleInternal(Fiber& fiber) { +void LawsScheduler::tryScheduleToPriorityQueue(Fiber& fiber) { workeraffinity_t* const affinity_buffer = getAffinityBuffer(fiber); if (affinity_buffer) { workeraffinity_t affinity = *affinity_buffer; workerid_t workerId = Runtime::getWorkerId(); if (affinity == workerId) { - goto scheduleViaWorkStealing; + return; } if (affinity == Fiber::NOT_AFFINE) { - goto scheduleViaWorkStealing; + return; } // We found a fiber to schedule on a remote prority queue. @@ -40,11 +41,28 @@ void LawsScheduler::scheduleInternal(Fiber& fiber) { emper::statsIncr(LawsStrategy::stats.scheduledFibersToPriority); } +} -scheduleViaWorkStealing: +void LawsScheduler::scheduleInternal(Fiber& fiber) { + tryScheduleToPriorityQueue(fiber); scheduleViaWorkStealing(fiber); } +void LawsScheduler::scheduleFromAnywhereInternal(Fiber& fiber) { + tryScheduleToPriorityQueue(fiber); + enqueueInAnywhereQueue(fiber); + onNewWork<CallerEnvironment::ANYWHERE>(); +} + +void LawsScheduler::scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) { + for (unsigned i = 0; i < count; ++i) { + Fiber& fiber = *fibers[i]; + tryScheduleToPriorityQueue(fiber); + } + insertInAnywhereQueue(fibers, count); + onNewWork<CallerEnvironment::ANYWHERE>(); +} + auto LawsScheduler::nextFiber() -> NextFiberResult { Fiber* fiber = priorityQueue.dequeue(); if (fiber != nullptr) { diff --git a/emper/strategies/laws/LawsScheduler.hpp b/emper/strategies/laws/LawsScheduler.hpp index d95a4bf06146caaca0af83b35bfd0183108d4019..43f7f12b2cf71ac7cc79ef6b319dc208e5a154cc 100644 --- a/emper/strategies/laws/LawsScheduler.hpp +++ b/emper/strategies/laws/LawsScheduler.hpp @@ -17,8 +17,12 @@ class LawsScheduler : public AbstractWorkStealingScheduler { static thread_local LawsMpscQueue priorityQueue; + void tryScheduleToPriorityQueue(Fiber& fiber); + protected: void scheduleInternal(Fiber& fiber) override; + void scheduleFromAnywhereInternal(Fiber& fiber) override; + void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) override; public: LawsScheduler(Runtime& runtime); diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp index 392b0aff0c7f9742be48efaff3ef94ef0d2784a4..c122f301a6cc817a9fe567154bde5b8ac7ca1707 100644 --- a/emper/strategies/ws/WsScheduler.hpp +++ b/emper/strategies/ws/WsScheduler.hpp @@ -2,6 +2,7 @@ // Copyright © 2020-2021 Florian Schmaus #pragma once +#include "CallerEnvironment.hpp" #include "NextFiberResult.hpp" #include "strategies/AbstractWorkStealingScheduler.hpp" @@ -11,6 +12,15 @@ class Runtime; class WsScheduler : public AbstractWorkStealingScheduler { protected: void scheduleInternal(Fiber& fiber) override { scheduleViaWorkStealing(fiber); } + void scheduleFromAnywhereInternal(Fiber& fiber) override { + enqueueInAnywhereQueue(fiber); + onNewWork<CallerEnvironment::ANYWHERE>(); + } + + void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) override { + insertInAnywhereQueue(fibers, count); + onNewWork<CallerEnvironment::ANYWHERE>(); + } public: WsScheduler(Runtime& runtime);