From 249931751a40bbded0c8567ce8c975a847a0c507 Mon Sep 17 00:00:00 2001 From: Florian Fischer <florian.fischer@muhq.space> Date: Mon, 20 Dec 2021 12:16:47 +0100 Subject: [PATCH] introduce new Scheduler::scheduleOn(fiber, workerId) function This function is needed to deal with worker local ressources: io_uring requests for example. Each worker now always has a MPSC inbox queue which was already used in the laws scheduling strategy. Fibers can be scheduled to a specific worker using the new Scheduler::scheduleOn method. Since the inbox queues are now always present we can use a single FiberSource enum combining AbstractWorkStealingStrategy::FiberSource and LawsStrategy::FiberSource. The laws strategy now uses the inbox queues as its priority queues. With the only differenze that when scheduling to a inbox queue using the Scheduler::scheduleOn the Fiber lifes only in the inbox queue and not also simultaneously in a WSQ. Unrelated code changes made while touching the code anyway: * Introduce FiberSource::io which hints that a Fiber comes from the worker's own CQ. * Strongly type the fiber's source in NextFiberResult. * Make all scheduler functions return std::optional<NextFiberResult> * Cleanup the identation in nextFiberResultViaWorkStealing --- emper/FiberSource.cpp | 30 ++++ emper/FiberSource.hpp | 28 ++++ emper/NextFiberResult.hpp | 4 +- emper/Runtime.cpp | 8 +- emper/Runtime.hpp | 15 +- emper/Scheduler.cpp | 11 ++ emper/Scheduler.hpp | 6 +- emper/meson.build | 1 + .../AbstractWorkStealingScheduler.cpp | 151 ++++++++++-------- .../AbstractWorkStealingScheduler.hpp | 32 ++-- .../strategies/AbstractWorkStealingStats.cpp | 2 + .../AbstractWorkStealingWorkerStats.cpp | 1 + .../AbstractWorkStealingWorkerStats.hpp | 1 + emper/strategies/laws/LawsDispatcher.cpp | 29 ++-- emper/strategies/laws/LawsScheduler.cpp | 24 +-- emper/strategies/laws/LawsScheduler.hpp | 14 +- emper/strategies/laws/LawsStrategy.hpp | 12 -- emper/strategies/ws/WsDispatcher.cpp | 10 +- emper/strategies/ws/WsScheduler.hpp | 10 +- tests/ScheduleOnTest.cpp | 35 ++++ tests/ScheduleOnUnknownWorkerTest.cpp | 17 ++ tests/meson.build | 15 ++ 22 files changed, 301 insertions(+), 155 deletions(-) create mode 100644 emper/FiberSource.cpp create mode 100644 emper/FiberSource.hpp create mode 100644 tests/ScheduleOnTest.cpp create mode 100644 tests/ScheduleOnUnknownWorkerTest.cpp diff --git a/emper/FiberSource.cpp b/emper/FiberSource.cpp new file mode 100644 index 00000000..41081336 --- /dev/null +++ b/emper/FiberSource.cpp @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "FiberSource.hpp" + +#include <iostream> + +#include "Common.hpp" + +auto operator<<(std::ostream& os, const emper::FiberSource& fiberSource) -> std::ostream& { + switch (fiberSource) { + case emper::FiberSource::local: + return os << "local"; + case emper::FiberSource::inbox: + return os << "inbox"; + case emper::FiberSource::stolen: + return os << "stolen"; + case emper::FiberSource::io: + return os << "io"; + case emper::FiberSource::ioStolen: + return os << "ioStolen"; + case emper::FiberSource::anywhereQueue: + return os << "anywhereQueue"; + case emper::FiberSource::hintWsq: + return os << "hintWsq"; + case emper::FiberSource::hintAq: + return os << "hintAq"; + default: + DIE_MSG("Unknown FiberSource"); + } +} diff --git a/emper/FiberSource.hpp b/emper/FiberSource.hpp new file mode 100644 index 00000000..e721fe1b --- /dev/null +++ b/emper/FiberSource.hpp @@ -0,0 +1,28 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus, Florian Fischer +#pragma once + +#include <cstdint> +#include <iostream> + +namespace emper { +/** + * @brief Descriptor for all the possible locations a Fiber can be obtained from + * + * This enum is used to collect stats, create hints where fibers are and + * make informed notification or scheduling decisions. + */ +enum class FiberSource : uintptr_t { + local, /*!< A worker's own work-stealing queue */ + inbox, /*!< A worker's own inbox or priority queue */ + stolen, /*!< A other worker's work-stealing queue */ + io, /*!< A worker's own io_uring completion queue */ + ioStolen, /*!< A other worker's io_uring completion queue */ + anywhereQueue, /*!< The anywhere queue */ + hintWsq, /*!< A known other worker's work-stealing queue */ + hintAq, /*!< Straight from the anywhere queue */ +}; + +} // namespace emper + +auto operator<<(std::ostream& os, const emper::FiberSource& fiberSource) -> std::ostream&; diff --git a/emper/NextFiberResult.hpp b/emper/NextFiberResult.hpp index 15ab40a3..9b927e14 100644 --- a/emper/NextFiberResult.hpp +++ b/emper/NextFiberResult.hpp @@ -4,9 +4,11 @@ #include <cstdint> +#include "FiberSource.hpp" + class Fiber; struct NextFiberResult { Fiber* const fiber; - const uintptr_t metadata; + const emper::FiberSource source; }; diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index e9e9ae03..b0bd380b 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -25,6 +25,7 @@ #include "Debug.hpp" // for DBG, ABORT, LOGD, LOGE #include "Emper.hpp" #include "Fiber.hpp" // for Fiber +#include "FiberSource.hpp" #include "NextFiberResult.hpp" #include "RuntimeStrategy.hpp" // for RuntimeStrategy #include "RuntimeStrategyFactory.hpp" @@ -38,7 +39,6 @@ #include "log/LogBuffer.hpp" #include "stats/FromAnywhere.hpp" #include "stats/Worker.hpp" -#include "strategies/AbstractWorkStealingScheduler.hpp" #ifdef EMPER_DEFAULT_SCHEDULING_STRATEGY_WORK_STEALING #include "strategies/ws/WsStrategyFactory.hpp" @@ -357,7 +357,7 @@ void Runtime::yield() { }); } -auto Runtime::nextFiber() -> NextFiberResult { +auto Runtime::nextFiber() -> std::optional<NextFiberResult> { if constexpr (emper::IO_WORKER_URING) { // Schedule all fibers waiting on completed IO IoContext::ContinuationBuffer completions; @@ -368,9 +368,7 @@ auto Runtime::nextFiber() -> NextFiberResult { Fiber* next = completions[0]; schedule(&completions[1], ncompletions - 1); - // TODO: hint that this fiber comes from the IO subsystem - return NextFiberResult{ - next, static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::local)}; + return NextFiberResult{next, emper::FiberSource::io}; } } diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index c8a4bbad..2f3a1cd8 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -19,7 +19,6 @@ #include "CallerEnvironment.hpp" #include "Context.hpp" #include "Debug.hpp" -#include "NextFiberResult.hpp" #include "Scheduler.hpp" // for Scheduler #include "WakeupStrategy.hpp" #include "Worker.hpp" @@ -34,6 +33,7 @@ class RuntimeBuilder; class ContextManager; class Dispatcher; class Fiber; +struct NextFiberResult; class RuntimeStrategy; class RuntimeStrategyFactory; class RuntimeStrategyStats; @@ -161,6 +161,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { } while (!canWake); } + auto nextFiber() -> std::optional<NextFiberResult>; + public: Runtime() : Runtime(getDefaultWorkerCount()) {} @@ -189,11 +191,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { scheduler.scheduleFromAnywhere(fibers, count); } + /** + * @brief Schedule a fiber on a specific worker + * + * For now scheduleOn is only available from within th EMPER runtime. + */ + inline void scheduleOn(Fiber& fiber, workerid_t workerId) { scheduler.scheduleOn(fiber, workerId); } void yield(); - // TODO: This should probably not be a public method of Runtime. - auto nextFiber() -> NextFiberResult; - // https://stackoverflow.com/a/3747462/194894 static inline auto rand() -> int { return Worker::rand(); } @@ -224,6 +229,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { void executeAndWait(std::function<void()> f); friend class AbstractWorkStealingScheduler; + friend class WsDispatcher; + friend class LawsDispatcher; template <LogSubsystem> friend class Blockable; friend RuntimeBuilder; diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp index f04ca047..44c07883 100644 --- a/emper/Scheduler.cpp +++ b/emper/Scheduler.cpp @@ -19,6 +19,17 @@ void Scheduler::wakeupSleepingWorkers() { runtime.wakeupSleepingWorkers<callerEnvironment>(); } +void Scheduler::scheduleOn(Fiber& fiber, workerid_t workerId) { + // Calling scheduleOn() only works from within the EMPER runtime. + emper::assertInRuntime(); + + if (unlikely(workerId >= runtime.getWorkerCount())) + throw std::runtime_error("WorkerId to big for worker count"); + + LOGD("Scheduling fiber " << &fiber << " on worker " << workerId); + scheduleOnInternal(fiber, workerId); +} + void Scheduler::notifyRuntimeAboutWorkStolen() { runtime.wakeupStrategy.onWorkStolen(); } // show the compiler our template incarnations diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index be9e9a07..26f303c4 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -4,6 +4,7 @@ #include <cstddef> #include <functional> // for function +#include <optional> #include <ostream> #include "CallerEnvironment.hpp" @@ -68,12 +69,13 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { } virtual void scheduleInternal(Fiber& fiber) = 0; + virtual void scheduleOnInternal(Fiber& fiber, workerid_t workerId) = 0; virtual void scheduleFromAnywhereInternal(Fiber& fiber) = 0; virtual void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) = 0; void recycle(Fiber* fiber) { dispatcher.recycle(fiber); }; - virtual auto nextFiber() -> NextFiberResult = 0; + virtual auto nextFiber() -> std::optional<NextFiberResult> = 0; friend class Runtime; @@ -98,6 +100,8 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { } } + void scheduleOn(Fiber& fiber, workerid_t workerId); + void scheduleFromAnywhere(Fiber& fiber) { LOGD("Scheduling fiber " << &fiber << " from anywhere"); scheduleFromAnywhereInternal(fiber); diff --git a/emper/meson.build b/emper/meson.build index fbd3e44c..a4f9da9d 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -17,6 +17,7 @@ emper_cpp_sources = [ 'Runtime.cpp', 'Emper.cpp', 'Fiber.cpp', + 'FiberSource.cpp', 'FiberManager.cpp', 'Context.cpp', 'Scheduler.cpp', diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index a4a6d251..30f5c1dc 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -1,10 +1,11 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2021 Florian Schmaus +// Copyright © 2021 Florian Schmaus, Florian Fischer #include "AbstractWorkStealingScheduler.hpp" #include <algorithm> #include <array> #include <cassert> +#include <cstdint> #include <ostream> // for operator<<, basic_ostream<>::__ostream_type #include <vector> @@ -13,6 +14,7 @@ #include "Debug.hpp" // for ABORT #include "Emper.hpp" // for OVERFLOW_QUEUE #include "Fiber.hpp" +#include "FiberSource.hpp" #include "NextFiberResult.hpp" #include "Runtime.hpp" // for Runtime #include "StealingResult.hpp" @@ -31,13 +33,19 @@ using emper::io::IoContext; thread_local AbstractWorkStealingScheduler::WsQueue<AbstractWorkStealingScheduler::QUEUE_SIZE> AbstractWorkStealingScheduler::queue; +thread_local AbstractWorkStealingScheduler::InboxQueue AbstractWorkStealingScheduler::inbox; + AbstractWorkStealingScheduler::AbstractWorkStealingScheduler(Runtime& runtime, RuntimeStrategy& strategy) : Scheduler(runtime, strategy) { const workerid_t workerCount = runtime.getWorkerCount(); queues = new AbstractWorkStealingScheduler::WsQueue<QUEUE_SIZE>*[workerCount]; + inboxes = new AbstractWorkStealingScheduler::InboxQueue*[workerCount]; - auto newWorkerHook = [this](workerid_t workerId) { queues[workerId] = &queue; }; + auto newWorkerHook = [this](workerid_t workerId) { + queues[workerId] = &queue; + inboxes[workerId] = &inbox; + }; addNewWorkerHook(newWorkerHook); } @@ -61,6 +69,15 @@ void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { onNewWork(); } +void AbstractWorkStealingScheduler::scheduleToInbox(Fiber& fiber, workerid_t workerId) { + inboxes[workerId]->enqueue(&fiber); + emper::statsIncr(awss::stats.scheduledFibersToInbox); + + // Classes using this method are supposed to always invoke this + // method. Hence we call onNewWork() here. + onNewWork(); +} + auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool { if (fiber->isRunnable()) return false; @@ -68,8 +85,7 @@ auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool { return true; } -auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() - -> std::optional<std::pair<Fiber*, FiberSource>> { +auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() -> std::optional<NextFiberResult> { const size_t KEEP_FREE_SLOTS = 64; const size_t DEQUEUE_FROM_ANYWHERE_MAX = 512; @@ -121,13 +137,13 @@ auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() } } - if (res) return std::make_pair(res, FiberSource::anywhereQueue); + if (res) return NextFiberResult{res, emper::FiberSource::anywhereQueue}; return std::nullopt; } auto AbstractWorkStealingScheduler::tryStealFiberFrom(workerid_t victim) - -> std::optional<std::pair<Fiber*, FiberSource>> { + -> std::optional<NextFiberResult> { constexpr int maxRetries = emper::WAITFREE_WORK_STEALING ? 0 : -1; Fiber* fiber; popTop: @@ -137,7 +153,7 @@ popTop: if (maybeRecycle(fiber)) goto popTop; - return std::make_pair(fiber, FiberSource::stolen); + return NextFiberResult{fiber, emper::FiberSource::stolen}; } if constexpr (emper::IO_STEALING) { @@ -145,15 +161,15 @@ popTop: fiber = victimIo->reapSingleCompletion<CallerEnvironment::EMPER>(); if (fiber) { emper::statsIncr(awss::stats.nextIoFiberStolen); - return std::make_pair(fiber, FiberSource::ioStolen); + return NextFiberResult{fiber, emper::FiberSource::ioStolen}; } } return std::nullopt; } -auto AbstractWorkStealingScheduler::nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource> { - FiberSource fiberSource = FiberSource::local; +auto AbstractWorkStealingScheduler::nextFiberResultViaWorkStealing() + -> std::optional<NextFiberResult> { Fiber* fiber; popBottom: @@ -163,7 +179,7 @@ popBottom: if (maybeRecycle(fiber)) goto popBottom; - return std::make_pair(fiber, fiberSource); + return NextFiberResult{fiber, emper::FiberSource::local}; } auto* const currentWorker = Worker::getCurrentWorker(); @@ -180,20 +196,17 @@ popBottom: const auto victim = static_cast<workerid_t>(dispatchHint.getPtrValue()); const auto stolen = tryStealFiberFrom(victim); if (stolen) { - fiber = (*stolen).first; - fiberSource = FiberSource::hintWsq; emper::statsIncr(awss::stats.nextFiberFromHintLocal); - goto out; + onWorkStolen(); + return NextFiberResult{(*stolen).fiber, emper::FiberSource::hintWsq}; } } break; case IoContext::PointerTags::NewWorkAq: { const auto fromAnywhere = nextFiberViaAnywhereQueue(); if (fromAnywhere) { - fiber = (*fromAnywhere).first; - fiberSource = FiberSource::hintAq; emper::statsIncr(awss::stats.nextFiberFromHintAnywhere); - goto out; + return NextFiberResult{(*fromAnywhere).fiber, emper::FiberSource::hintAq}; } break; } @@ -204,67 +217,65 @@ popBottom: } // Go into work stealing - { - // TODO: Determine if there is a better value than 1/3. - const float CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE = 0.33; - const workerid_t myWorkerId = currentWorker->getWorkerId(); - const workerid_t workerCount = runtime.getWorkerCount(); - // NOLINTNEXTLINE(bugprone-narrowing-conversions) - const workerid_t checkAnywhereQueueAt = workerCount * CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE; - - workerid_t startWorkerId = currentWorker->nextRandomWorkerId(); - // TODO: See how changing the victim count affects things. - const workerid_t victimCount = [&] { - if constexpr (emper::WS_VICTIM_COUNT) - return emper::WS_VICTIM_COUNT; - else if constexpr (emper::WS_VICTIM_DENOMINATOR) - return workerCount / emper::WS_VICTIM_DENOMINATOR; - else - return workerCount; - }(); - - for (workerid_t i = 0; i < victimCount; ++i) { - workerid_t victim = (startWorkerId + i) % workerCount; - - // Don't steal from ourselves. - if (unlikely(victim == myWorkerId)) continue; - - auto stolenFiber = tryStealFiberFrom(victim); - if (stolenFiber) return *stolenFiber; - - if constexpr (!emper::CHECK_ANYWHERE_QUEUE_WHILE_STEALING) continue; - - // If we failed to steal from a certain number of victims, check - // the anywhere queue for new fibers. - if (i == checkAnywhereQueueAt) { - auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); - if (anywhereQueueFiber) return *anywhereQueueFiber; - } + // TODO: Determine if there is a better value than 1/3. + const float CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE = 0.33; + const workerid_t myWorkerId = currentWorker->getWorkerId(); + const workerid_t workerCount = runtime.getWorkerCount(); + // NOLINTNEXTLINE(bugprone-narrowing-conversions) + const workerid_t checkAnywhereQueueAt = workerCount * CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE; + + workerid_t startWorkerId = currentWorker->nextRandomWorkerId(); + // TODO: See how changing the victim count affects things. + const workerid_t victimCount = [&] { + if constexpr (emper::WS_VICTIM_COUNT) + return emper::WS_VICTIM_COUNT; + else if constexpr (emper::WS_VICTIM_DENOMINATOR) + return workerCount / emper::WS_VICTIM_DENOMINATOR; + else + return workerCount; + }(); + + for (workerid_t i = 0; i < victimCount; ++i) { + workerid_t victim = (startWorkerId + i) % workerCount; + + // Don't steal from ourselves. + if (unlikely(victim == myWorkerId)) continue; + + auto stolenFiber = tryStealFiberFrom(victim); + if (stolenFiber) { + onWorkStolen(); + return *stolenFiber; + } + + if constexpr (!emper::CHECK_ANYWHERE_QUEUE_WHILE_STEALING) continue; + + // If we failed to steal from a certain number of victims, check + // the anywhere queue for new fibers. + if (i == checkAnywhereQueueAt) { + auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); + if (anywhereQueueFiber) return *anywhereQueueFiber; } } // Try the "scheduled from anywhere" queue to get work as last resort. - { - auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); - if (anywhereQueueFiber) return *anywhereQueueFiber; - } + auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); + if (anywhereQueueFiber) return *anywhereQueueFiber; // We where not able to dequeue any fiber if we reach this point. - fiber = nullptr; - -out: - return std::make_pair(fiber, fiberSource); + return std::nullopt; } -auto AbstractWorkStealingScheduler::nextFiberResultViaWorkStealing() -> NextFiberResult { - std::pair<Fiber*, FiberSource> nextFiberWsResult = nextFiberViaWorkStealing(); +auto AbstractWorkStealingScheduler::nextFiberResultFromInbox() -> std::optional<NextFiberResult> { + Fiber* fiber = inbox.dequeue(); + if (fiber != nullptr) return NextFiberResult{fiber, emper::FiberSource::inbox}; - if (nextFiberWsResult.second == FiberSource::stolen) { - onWorkStolen(); - } + return std::nullopt; +} - return NextFiberResult{ - nextFiberWsResult.first, - static_cast<uintptr_t>(nextFiberWsResult.second), - }; +auto AbstractWorkStealingScheduler::nextFiberResultFromInboxOrWorkStealing() + -> std::optional<NextFiberResult> { + auto result = nextFiberResultFromInbox(); + if (result) return result; + + return nextFiberResultViaWorkStealing(); } diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp index ade437e1..74f67a17 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.hpp +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -3,13 +3,12 @@ #pragma once #include <cstddef> // for size_t -#include <cstdint> #include <optional> -#include <utility> -#include "NextFiberResult.hpp" +#include "Fiber.hpp" #include "Scheduler.hpp" #include "emper-common.h" +#include "lib/adt/MpscQueue.hpp" #ifdef EMPER_LOCKED_WS_QUEUE #include "lib/adt/LockedQueue.hpp" @@ -17,7 +16,7 @@ #include "lib/adt/WsClQueue.hpp" #endif -class Fiber; +struct NextFiberResult; class Runtime; class RuntimeStrategy; @@ -28,34 +27,31 @@ class AbstractWorkStealingScheduler : public Scheduler { #else using WsQueue = adt::WsClQueue<Fiber*, SIZE>; #endif + using InboxQueue = adt::MpscQueue<Fiber>; public: static const int QUEUE_SIZE = 1024; - enum class FiberSource : uintptr_t { - local, - hintWsq, - hintAq, - stolen, - ioStolen, - anywhereQueue, - }; - private: - auto nextFiberViaAnywhereQueue() -> std::optional<std::pair<Fiber*, FiberSource>>; - auto tryStealFiberFrom(workerid_t victim) -> std::optional<std::pair<Fiber*, FiberSource>>; + auto nextFiberViaAnywhereQueue() -> std::optional<NextFiberResult>; + auto tryStealFiberFrom(workerid_t victim) -> std::optional<NextFiberResult>; protected: WsQueue<QUEUE_SIZE>** queues; static thread_local WsQueue<QUEUE_SIZE> queue; + InboxQueue** inboxes; + static thread_local InboxQueue inbox; + void scheduleViaWorkStealing(Fiber& fiber); + void scheduleToInbox(Fiber& fiber, workerid_t workerId); auto maybeRecycle(Fiber* fiber) -> bool; - auto nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource>; - - auto nextFiberResultViaWorkStealing() -> NextFiberResult; + // This method is static because it only uses the thread_local inbox + static auto nextFiberResultFromInbox() -> std::optional<NextFiberResult>; + auto nextFiberResultViaWorkStealing() -> std::optional<NextFiberResult>; + auto nextFiberResultFromInboxOrWorkStealing() -> std::optional<NextFiberResult>; public: AbstractWorkStealingScheduler(Runtime& runtime, RuntimeStrategy& strategy); diff --git a/emper/strategies/AbstractWorkStealingStats.cpp b/emper/strategies/AbstractWorkStealingStats.cpp index 64bcaac5..3a715ec2 100644 --- a/emper/strategies/AbstractWorkStealingStats.cpp +++ b/emper/strategies/AbstractWorkStealingStats.cpp @@ -21,6 +21,8 @@ void AbstractWorkStealingStats::print(std::ostream& out) { << std::to_string(comulatedWorkerStats.scheduledFibersToLocal) << std::endl << "total-scheduled-fibers-to-overflow-queue: " << std::to_string(comulatedWorkerStats.scheduledFibersToOverflowQueue) << std::endl + << "total-scheduled-fibers-to-inbox: " + << std::to_string(comulatedWorkerStats.scheduledFibersToInbox) << std::endl << "global-max-queue-length: " << std::to_string(comulatedWorkerStats.maxQueueLength) << std::endl << "total-next-fiber-from-local: " << std::to_string(comulatedWorkerStats.nextFiberFromLocal) diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.cpp b/emper/strategies/AbstractWorkStealingWorkerStats.cpp index ceaf7b40..68375ca7 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.cpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.cpp @@ -8,6 +8,7 @@ auto AbstractWorkStealingWorkerStats::operator+=(const AbstractWorkStealingWorke -> AbstractWorkStealingWorkerStats& { scheduledFibersToLocal += other.scheduledFibersToLocal; scheduledFibersToOverflowQueue += other.scheduledFibersToOverflowQueue; + scheduledFibersToInbox += other.scheduledFibersToInbox; maxQueueLength = std::max(maxQueueLength, other.maxQueueLength); nextFiberFromLocal += other.nextFiberFromLocal; nextFiberFromHintLocal += other.nextFiberFromHintLocal; diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.hpp b/emper/strategies/AbstractWorkStealingWorkerStats.hpp index 57052eec..a6ae44e7 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.hpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.hpp @@ -15,6 +15,7 @@ class AbstractWorkStealingWorkerStats { public: uint64_t scheduledFibersToLocal = 0; uint64_t scheduledFibersToOverflowQueue = 0; + uint64_t scheduledFibersToInbox = 0; uint64_t maxQueueLength = 0; uint64_t nextFiberFromLocal = 0; uint64_t nextFiberFromHintLocal = 0; diff --git a/emper/strategies/laws/LawsDispatcher.cpp b/emper/strategies/laws/LawsDispatcher.cpp index 1cc2164d..0c8efb3f 100644 --- a/emper/strategies/laws/LawsDispatcher.cpp +++ b/emper/strategies/laws/LawsDispatcher.cpp @@ -2,10 +2,13 @@ // Copyright © 2020-2021 Florian Schmaus #include "LawsDispatcher.hpp" +#include <optional> + #include "Common.hpp" // for DIE_MSG #include "Emper.hpp" #include "Fiber.hpp" -#include "LawsStrategy.hpp" // for LawsStrategy, LawsStrategy::FiberSource +#include "FiberSource.hpp" +#include "LawsStrategy.hpp" // for LawsStrategy #include "NextFiberResult.hpp" #include "Runtime.hpp" #include "emper-common.h" @@ -23,41 +26,41 @@ void LawsDispatcher::recycle(Fiber* fiber) { void LawsDispatcher::dispatchLoop() { while (true) { - NextFiberResult next = runtime.nextFiber(); - Fiber* const fiber = next.fiber; - if (!fiber) { + std::optional<NextFiberResult> next = runtime.nextFiber(); + if (!next) { dispatchLoopDoSleep(); continue; } + Fiber* const fiber = next->fiber; + // The isRunnable() method performes an atomic swap on a boolean, // which was initialized to true, in order to check if this fiber // is runnable. if (isRunnable(fiber)) { if constexpr (emper::STATS) { - auto fiberSource = static_cast<LawsStrategy::FiberSource>(next.metadata); - switch (fiberSource) { - case LawsStrategy::FiberSource::fromPriority: + switch (next->source) { + case emper::FiberSource::inbox: LawsStrategy::stats.dispatchedFibersFromPriority++; break; - case LawsStrategy::FiberSource::local: + case emper::FiberSource::local: LawsStrategy::stats.dispatchedFibersFromLocal++; break; - case LawsStrategy::FiberSource::hintWsq: + case emper::FiberSource::hintWsq: LawsStrategy::stats.dispatchedFibersFromHintLocal++; break; - case LawsStrategy::FiberSource::hintAq: + case emper::FiberSource::hintAq: LawsStrategy::stats.dispatchedFibersFromHintAnywhere++; break; - case LawsStrategy::FiberSource::stolen: + case emper::FiberSource::stolen: LawsStrategy::stats.dispatchedFibersStolen++; break; - case LawsStrategy::FiberSource::anywhereQueue: + case emper::FiberSource::anywhereQueue: LawsStrategy::stats.dispatchedFibersFromAnywhereQueue++; break; default: - DIE_MSG("Unknown fiber source: " << next.metadata); + DIE_MSG("Unknown fiber source: " << next->source); break; } } diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp index 8066c5cb..8f846b54 100644 --- a/emper/strategies/laws/LawsScheduler.cpp +++ b/emper/strategies/laws/LawsScheduler.cpp @@ -2,8 +2,6 @@ // Copyright © 2020-2021 Florian Schmaus #include "LawsScheduler.hpp" -#include <cstdint> - #include "CallerEnvironment.hpp" #include "Emper.hpp" #include "LawsStrategy.hpp" // IWYU pragma: keep @@ -12,16 +10,8 @@ #include "emper-common.h" #include "strategies/laws/LawsWorkerStats.hpp" -thread_local LawsScheduler::LawsMpscQueue LawsScheduler::priorityQueue; - LawsScheduler::LawsScheduler(Runtime& runtime, RuntimeStrategy& strategy) - : AbstractWorkStealingScheduler(runtime, strategy) { - const workerid_t workerCount = runtime.getWorkerCount(); - priorityQueues = new LawsScheduler::LawsMpscQueue*[workerCount]; - - auto newWorkerHook = [this](workerid_t workerId) { priorityQueues[workerId] = &priorityQueue; }; - addNewWorkerHook(newWorkerHook); -} + : AbstractWorkStealingScheduler(runtime, strategy) {} template <CallerEnvironment callerEnvironment> void LawsScheduler::tryScheduleToPriorityQueue(Fiber& fiber) { @@ -70,14 +60,6 @@ void LawsScheduler::scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) onNewWork<CallerEnvironment::ANYWHERE>(); } -auto LawsScheduler::nextFiber() -> NextFiberResult { - Fiber* fiber = priorityQueue.dequeue(); - if (fiber != nullptr) { - return NextFiberResult{ - fiber, - static_cast<uintptr_t>(LawsStrategy::FiberSource::fromPriority), - }; - } - - return nextFiberResultViaWorkStealing(); +auto LawsScheduler::nextFiber() -> std::optional<NextFiberResult> { + return nextFiberResultFromInboxOrWorkStealing(); } diff --git a/emper/strategies/laws/LawsScheduler.hpp b/emper/strategies/laws/LawsScheduler.hpp index f4b0f242..4a5f9bc9 100644 --- a/emper/strategies/laws/LawsScheduler.hpp +++ b/emper/strategies/laws/LawsScheduler.hpp @@ -1,9 +1,12 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020-2021 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus, Florian Fischer #pragma once +#include <optional> + #include "CallerEnvironment.hpp" #include "Fiber.hpp" +#include "emper-common.h" #include "lib/adt/MpscQueue.hpp" #include "strategies/AbstractWorkStealingScheduler.hpp" @@ -15,20 +18,21 @@ class LawsScheduler : public AbstractWorkStealingScheduler { using LawsMpscQueue = adt::MpscQueue<Fiber>; private: - LawsMpscQueue** priorityQueues; - - static thread_local LawsMpscQueue priorityQueue; + LawsMpscQueue**& priorityQueues = AbstractWorkStealingScheduler::inboxes; template <CallerEnvironment callerEnvironment> void tryScheduleToPriorityQueue(Fiber& fiber); protected: void scheduleInternal(Fiber& fiber) override; + void scheduleOnInternal(Fiber& fiber, workerid_t workerId) override { + scheduleToInbox(fiber, workerId); + } void scheduleFromAnywhereInternal(Fiber& fiber) override; void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) override; public: LawsScheduler(Runtime& runtime, RuntimeStrategy& strategy); - auto nextFiber() -> NextFiberResult override; + auto nextFiber() -> std::optional<NextFiberResult> override; }; diff --git a/emper/strategies/laws/LawsStrategy.hpp b/emper/strategies/laws/LawsStrategy.hpp index d287b446..c2eea7ed 100644 --- a/emper/strategies/laws/LawsStrategy.hpp +++ b/emper/strategies/laws/LawsStrategy.hpp @@ -2,11 +2,9 @@ // Copyright © 2020-2021 Florian Schmaus #pragma once -#include <cstdint> #include <memory> #include "WorkerLocalData.hpp" -#include "strategies/AbstractWorkStealingScheduler.hpp" #include "strategies/AbstractWorkStealingStrategy.hpp" #include "strategies/laws/LawsDispatcher.hpp" #include "strategies/laws/LawsScheduler.hpp" @@ -19,16 +17,6 @@ struct LawsWorkerStats; class LawsStrategy : public AbstractWorkStealingStrategy { private: - enum class FiberSource : uintptr_t { - local = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::local), - hintWsq = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::hintWsq), - hintAq = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::hintAq), - stolen = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::stolen), - anywhereQueue = - static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::anywhereQueue), - fromPriority, - }; - LawsScheduler scheduler; LawsDispatcher dispatcher; diff --git a/emper/strategies/ws/WsDispatcher.cpp b/emper/strategies/ws/WsDispatcher.cpp index 00569799..7826f33a 100644 --- a/emper/strategies/ws/WsDispatcher.cpp +++ b/emper/strategies/ws/WsDispatcher.cpp @@ -1,7 +1,9 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2022 Florian Schmaus Florian Fischer #include "WsDispatcher.hpp" +#include <optional> + #include "NextFiberResult.hpp" #include "Runtime.hpp" // for Runtime @@ -9,13 +11,13 @@ class Fiber; void WsDispatcher::dispatchLoop() { while (true) { - NextFiberResult next = runtime.nextFiber(); - Fiber* const fiber = next.fiber; - if (!fiber) { + std::optional<NextFiberResult> next = runtime.nextFiber(); + if (!next) { dispatchLoopDoSleep(); continue; } + Fiber* const fiber = next->fiber; dispatch(fiber); diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp index c0ce9c3b..d9dc87d6 100644 --- a/emper/strategies/ws/WsScheduler.hpp +++ b/emper/strategies/ws/WsScheduler.hpp @@ -2,8 +2,11 @@ // Copyright © 2020-2021 Florian Schmaus #pragma once +#include <optional> + #include "CallerEnvironment.hpp" #include "NextFiberResult.hpp" +#include "emper-common.h" #include "strategies/AbstractWorkStealingScheduler.hpp" class Fiber; @@ -13,6 +16,9 @@ class RuntimeStrategy; class WsScheduler : public AbstractWorkStealingScheduler { protected: void scheduleInternal(Fiber& fiber) override { scheduleViaWorkStealing(fiber); } + void scheduleOnInternal(Fiber& fiber, workerid_t workerId) override { + scheduleToInbox(fiber, workerId); + } void scheduleFromAnywhereInternal(Fiber& fiber) override { enqueueInAnywhereQueue(fiber); onNewWork<CallerEnvironment::ANYWHERE>(); @@ -26,5 +32,7 @@ class WsScheduler : public AbstractWorkStealingScheduler { public: WsScheduler(Runtime& runtime, RuntimeStrategy& strategy); - auto nextFiber() -> NextFiberResult override { return nextFiberResultViaWorkStealing(); }; + auto nextFiber() -> std::optional<NextFiberResult> override { + return nextFiberResultFromInboxOrWorkStealing(); + }; }; diff --git a/tests/ScheduleOnTest.cpp b/tests/ScheduleOnTest.cpp new file mode 100644 index 00000000..f458a99d --- /dev/null +++ b/tests/ScheduleOnTest.cpp @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "CountingPrivateSemaphore.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" +#include "Worker.hpp" +#include "emper.hpp" +#include "fixtures/assert.hpp" + +static const unsigned ITERATIONS = 100; +static Runtime* runtime; +static unsigned workerCount; +unsigned iteration = 0; + +static void runOn(CPS& cps) { + ASSERT(Worker::getCurrentWorkerId() == (iteration % workerCount)); + ++iteration; + if (iteration == ITERATIONS) cps.signalAndExit(); + + runtime->scheduleOn(*Fiber::from([&] { runOn(cps); }), (iteration % workerCount)); +} + +static void scheduleOnTest() { + runtime = Runtime::getRuntime(); + ASSERT(runtime); + workerCount = runtime->getWorkerCount(); + + emper::sleep(1); + + CPS cps(1); + runtime->scheduleOn(*Fiber::from([&] { runOn(cps); }), (iteration % workerCount)); + cps.wait(); +} + +void emperTest() { scheduleOnTest(); } diff --git a/tests/ScheduleOnUnknownWorkerTest.cpp b/tests/ScheduleOnUnknownWorkerTest.cpp new file mode 100644 index 00000000..2af239c5 --- /dev/null +++ b/tests/ScheduleOnUnknownWorkerTest.cpp @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "Fiber.hpp" +#include "Runtime.hpp" +#include "fixtures/assert.hpp" + +auto main() -> int { + const unsigned nthreads = 1; + Runtime runtime(nthreads); + + runtime.executeAndWait([&] { + auto* next = Fiber::from([] { ASSERT(false); }); + runtime.scheduleOn(*next, nthreads + 1); + }); + + return 0; +} diff --git a/tests/meson.build b/tests/meson.build index c1d0d09a..eb4e9607 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -49,6 +49,21 @@ tests = [ 'test_runner': 'emper', }, + { + 'source': files('ScheduleOnTest.cpp'), + 'name': 'ScheduleOnTest', + 'description': 'Test Scheduler::scheduleOn', + 'test_runner': 'emper', + }, + + { + 'source': files('ScheduleOnUnknownWorkerTest.cpp'), + 'name': 'ScheduleOnUnknownWorkerTest', + 'description': 'Test Scheduler::scheduleOn with to big workerId', + 'should_fail': true, + 'is_parallel': true, + }, + { 'source': files('ReuseBpsTest.cpp'), 'name': 'ReuseBpsTest', -- GitLab