diff --git a/emper/FiberSource.cpp b/emper/FiberSource.cpp new file mode 100644 index 0000000000000000000000000000000000000000..410813360996f82fda444e51571fe79c99acc707 --- /dev/null +++ b/emper/FiberSource.cpp @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "FiberSource.hpp" + +#include <iostream> + +#include "Common.hpp" + +auto operator<<(std::ostream& os, const emper::FiberSource& fiberSource) -> std::ostream& { + switch (fiberSource) { + case emper::FiberSource::local: + return os << "local"; + case emper::FiberSource::inbox: + return os << "inbox"; + case emper::FiberSource::stolen: + return os << "stolen"; + case emper::FiberSource::io: + return os << "io"; + case emper::FiberSource::ioStolen: + return os << "ioStolen"; + case emper::FiberSource::anywhereQueue: + return os << "anywhereQueue"; + case emper::FiberSource::hintWsq: + return os << "hintWsq"; + case emper::FiberSource::hintAq: + return os << "hintAq"; + default: + DIE_MSG("Unknown FiberSource"); + } +} diff --git a/emper/FiberSource.hpp b/emper/FiberSource.hpp new file mode 100644 index 0000000000000000000000000000000000000000..e721fe1b7542f65a52213a6598e4dfef9826fbbf --- /dev/null +++ b/emper/FiberSource.hpp @@ -0,0 +1,28 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus, Florian Fischer +#pragma once + +#include <cstdint> +#include <iostream> + +namespace emper { +/** + * @brief Descriptor for all the possible locations a Fiber can be obtained from + * + * This enum is used to collect stats, create hints where fibers are and + * make informed notification or scheduling decisions. + */ +enum class FiberSource : uintptr_t { + local, /*!< A worker's own work-stealing queue */ + inbox, /*!< A worker's own inbox or priority queue */ + stolen, /*!< A other worker's work-stealing queue */ + io, /*!< A worker's own io_uring completion queue */ + ioStolen, /*!< A other worker's io_uring completion queue */ + anywhereQueue, /*!< The anywhere queue */ + hintWsq, /*!< A known other worker's work-stealing queue */ + hintAq, /*!< Straight from the anywhere queue */ +}; + +} // namespace emper + +auto operator<<(std::ostream& os, const emper::FiberSource& fiberSource) -> std::ostream&; diff --git a/emper/NextFiberResult.hpp b/emper/NextFiberResult.hpp index 15ab40a3c353ab7ec9ca243feb50258b0a6b9172..9b927e14f95870acca971bdc3de0dcfc8350aa82 100644 --- a/emper/NextFiberResult.hpp +++ b/emper/NextFiberResult.hpp @@ -4,9 +4,11 @@ #include <cstdint> +#include "FiberSource.hpp" + class Fiber; struct NextFiberResult { Fiber* const fiber; - const uintptr_t metadata; + const emper::FiberSource source; }; diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index e9e9ae0347fb6475273b92510b403afdd66b7cde..b0bd380bf3c4c22d87fd3e6c519f6ddadba6e97d 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -25,6 +25,7 @@ #include "Debug.hpp" // for DBG, ABORT, LOGD, LOGE #include "Emper.hpp" #include "Fiber.hpp" // for Fiber +#include "FiberSource.hpp" #include "NextFiberResult.hpp" #include "RuntimeStrategy.hpp" // for RuntimeStrategy #include "RuntimeStrategyFactory.hpp" @@ -38,7 +39,6 @@ #include "log/LogBuffer.hpp" #include "stats/FromAnywhere.hpp" #include "stats/Worker.hpp" -#include "strategies/AbstractWorkStealingScheduler.hpp" #ifdef EMPER_DEFAULT_SCHEDULING_STRATEGY_WORK_STEALING #include "strategies/ws/WsStrategyFactory.hpp" @@ -357,7 +357,7 @@ void Runtime::yield() { }); } -auto Runtime::nextFiber() -> NextFiberResult { +auto Runtime::nextFiber() -> std::optional<NextFiberResult> { if constexpr (emper::IO_WORKER_URING) { // Schedule all fibers waiting on completed IO IoContext::ContinuationBuffer completions; @@ -368,9 +368,7 @@ auto Runtime::nextFiber() -> NextFiberResult { Fiber* next = completions[0]; schedule(&completions[1], ncompletions - 1); - // TODO: hint that this fiber comes from the IO subsystem - return NextFiberResult{ - next, static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::local)}; + return NextFiberResult{next, emper::FiberSource::io}; } } diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index c8a4bbadec6c95910bc1b100a6b8c43115af61af..2f3a1cd891fbe2e62135bd8b1d77214e8cc2a859 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -19,7 +19,6 @@ #include "CallerEnvironment.hpp" #include "Context.hpp" #include "Debug.hpp" -#include "NextFiberResult.hpp" #include "Scheduler.hpp" // for Scheduler #include "WakeupStrategy.hpp" #include "Worker.hpp" @@ -34,6 +33,7 @@ class RuntimeBuilder; class ContextManager; class Dispatcher; class Fiber; +struct NextFiberResult; class RuntimeStrategy; class RuntimeStrategyFactory; class RuntimeStrategyStats; @@ -161,6 +161,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { } while (!canWake); } + auto nextFiber() -> std::optional<NextFiberResult>; + public: Runtime() : Runtime(getDefaultWorkerCount()) {} @@ -189,11 +191,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { scheduler.scheduleFromAnywhere(fibers, count); } + /** + * @brief Schedule a fiber on a specific worker + * + * For now scheduleOn is only available from within th EMPER runtime. + */ + inline void scheduleOn(Fiber& fiber, workerid_t workerId) { scheduler.scheduleOn(fiber, workerId); } void yield(); - // TODO: This should probably not be a public method of Runtime. - auto nextFiber() -> NextFiberResult; - // https://stackoverflow.com/a/3747462/194894 static inline auto rand() -> int { return Worker::rand(); } @@ -224,6 +229,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { void executeAndWait(std::function<void()> f); friend class AbstractWorkStealingScheduler; + friend class WsDispatcher; + friend class LawsDispatcher; template <LogSubsystem> friend class Blockable; friend RuntimeBuilder; diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp index f04ca047bd022408bb9deeea2507b1cd634b8891..44c07883b247d295cadb2a6cf8d51edafd748cf1 100644 --- a/emper/Scheduler.cpp +++ b/emper/Scheduler.cpp @@ -19,6 +19,17 @@ void Scheduler::wakeupSleepingWorkers() { runtime.wakeupSleepingWorkers<callerEnvironment>(); } +void Scheduler::scheduleOn(Fiber& fiber, workerid_t workerId) { + // Calling scheduleOn() only works from within the EMPER runtime. + emper::assertInRuntime(); + + if (unlikely(workerId >= runtime.getWorkerCount())) + throw std::runtime_error("WorkerId to big for worker count"); + + LOGD("Scheduling fiber " << &fiber << " on worker " << workerId); + scheduleOnInternal(fiber, workerId); +} + void Scheduler::notifyRuntimeAboutWorkStolen() { runtime.wakeupStrategy.onWorkStolen(); } // show the compiler our template incarnations diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index be9e9a0779b8ba8a8a2a48066212f32748448c91..26f303c49f78b45ea31b9ae38df3650b053d26e4 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -4,6 +4,7 @@ #include <cstddef> #include <functional> // for function +#include <optional> #include <ostream> #include "CallerEnvironment.hpp" @@ -68,12 +69,13 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { } virtual void scheduleInternal(Fiber& fiber) = 0; + virtual void scheduleOnInternal(Fiber& fiber, workerid_t workerId) = 0; virtual void scheduleFromAnywhereInternal(Fiber& fiber) = 0; virtual void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) = 0; void recycle(Fiber* fiber) { dispatcher.recycle(fiber); }; - virtual auto nextFiber() -> NextFiberResult = 0; + virtual auto nextFiber() -> std::optional<NextFiberResult> = 0; friend class Runtime; @@ -98,6 +100,8 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { } } + void scheduleOn(Fiber& fiber, workerid_t workerId); + void scheduleFromAnywhere(Fiber& fiber) { LOGD("Scheduling fiber " << &fiber << " from anywhere"); scheduleFromAnywhereInternal(fiber); diff --git a/emper/meson.build b/emper/meson.build index fbd3e44cf19c0aa528233b53d914ddc6767c8ee4..a4f9da9d95f22c7869980adaf3db221afce7a728 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -17,6 +17,7 @@ emper_cpp_sources = [ 'Runtime.cpp', 'Emper.cpp', 'Fiber.cpp', + 'FiberSource.cpp', 'FiberManager.cpp', 'Context.cpp', 'Scheduler.cpp', diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index a4a6d251437244389d6f9108b31ae276331cb1c8..30f5c1dcf3e6fb68dc13e1e1fda39e30bf923d3b 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -1,10 +1,11 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2021 Florian Schmaus +// Copyright © 2021 Florian Schmaus, Florian Fischer #include "AbstractWorkStealingScheduler.hpp" #include <algorithm> #include <array> #include <cassert> +#include <cstdint> #include <ostream> // for operator<<, basic_ostream<>::__ostream_type #include <vector> @@ -13,6 +14,7 @@ #include "Debug.hpp" // for ABORT #include "Emper.hpp" // for OVERFLOW_QUEUE #include "Fiber.hpp" +#include "FiberSource.hpp" #include "NextFiberResult.hpp" #include "Runtime.hpp" // for Runtime #include "StealingResult.hpp" @@ -31,13 +33,19 @@ using emper::io::IoContext; thread_local AbstractWorkStealingScheduler::WsQueue<AbstractWorkStealingScheduler::QUEUE_SIZE> AbstractWorkStealingScheduler::queue; +thread_local AbstractWorkStealingScheduler::InboxQueue AbstractWorkStealingScheduler::inbox; + AbstractWorkStealingScheduler::AbstractWorkStealingScheduler(Runtime& runtime, RuntimeStrategy& strategy) : Scheduler(runtime, strategy) { const workerid_t workerCount = runtime.getWorkerCount(); queues = new AbstractWorkStealingScheduler::WsQueue<QUEUE_SIZE>*[workerCount]; + inboxes = new AbstractWorkStealingScheduler::InboxQueue*[workerCount]; - auto newWorkerHook = [this](workerid_t workerId) { queues[workerId] = &queue; }; + auto newWorkerHook = [this](workerid_t workerId) { + queues[workerId] = &queue; + inboxes[workerId] = &inbox; + }; addNewWorkerHook(newWorkerHook); } @@ -61,6 +69,15 @@ void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { onNewWork(); } +void AbstractWorkStealingScheduler::scheduleToInbox(Fiber& fiber, workerid_t workerId) { + inboxes[workerId]->enqueue(&fiber); + emper::statsIncr(awss::stats.scheduledFibersToInbox); + + // Classes using this method are supposed to always invoke this + // method. Hence we call onNewWork() here. + onNewWork(); +} + auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool { if (fiber->isRunnable()) return false; @@ -68,8 +85,7 @@ auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool { return true; } -auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() - -> std::optional<std::pair<Fiber*, FiberSource>> { +auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() -> std::optional<NextFiberResult> { const size_t KEEP_FREE_SLOTS = 64; const size_t DEQUEUE_FROM_ANYWHERE_MAX = 512; @@ -121,13 +137,13 @@ auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() } } - if (res) return std::make_pair(res, FiberSource::anywhereQueue); + if (res) return NextFiberResult{res, emper::FiberSource::anywhereQueue}; return std::nullopt; } auto AbstractWorkStealingScheduler::tryStealFiberFrom(workerid_t victim) - -> std::optional<std::pair<Fiber*, FiberSource>> { + -> std::optional<NextFiberResult> { constexpr int maxRetries = emper::WAITFREE_WORK_STEALING ? 0 : -1; Fiber* fiber; popTop: @@ -137,7 +153,7 @@ popTop: if (maybeRecycle(fiber)) goto popTop; - return std::make_pair(fiber, FiberSource::stolen); + return NextFiberResult{fiber, emper::FiberSource::stolen}; } if constexpr (emper::IO_STEALING) { @@ -145,15 +161,15 @@ popTop: fiber = victimIo->reapSingleCompletion<CallerEnvironment::EMPER>(); if (fiber) { emper::statsIncr(awss::stats.nextIoFiberStolen); - return std::make_pair(fiber, FiberSource::ioStolen); + return NextFiberResult{fiber, emper::FiberSource::ioStolen}; } } return std::nullopt; } -auto AbstractWorkStealingScheduler::nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource> { - FiberSource fiberSource = FiberSource::local; +auto AbstractWorkStealingScheduler::nextFiberResultViaWorkStealing() + -> std::optional<NextFiberResult> { Fiber* fiber; popBottom: @@ -163,7 +179,7 @@ popBottom: if (maybeRecycle(fiber)) goto popBottom; - return std::make_pair(fiber, fiberSource); + return NextFiberResult{fiber, emper::FiberSource::local}; } auto* const currentWorker = Worker::getCurrentWorker(); @@ -180,20 +196,17 @@ popBottom: const auto victim = static_cast<workerid_t>(dispatchHint.getPtrValue()); const auto stolen = tryStealFiberFrom(victim); if (stolen) { - fiber = (*stolen).first; - fiberSource = FiberSource::hintWsq; emper::statsIncr(awss::stats.nextFiberFromHintLocal); - goto out; + onWorkStolen(); + return NextFiberResult{(*stolen).fiber, emper::FiberSource::hintWsq}; } } break; case IoContext::PointerTags::NewWorkAq: { const auto fromAnywhere = nextFiberViaAnywhereQueue(); if (fromAnywhere) { - fiber = (*fromAnywhere).first; - fiberSource = FiberSource::hintAq; emper::statsIncr(awss::stats.nextFiberFromHintAnywhere); - goto out; + return NextFiberResult{(*fromAnywhere).fiber, emper::FiberSource::hintAq}; } break; } @@ -204,67 +217,65 @@ popBottom: } // Go into work stealing - { - // TODO: Determine if there is a better value than 1/3. - const float CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE = 0.33; - const workerid_t myWorkerId = currentWorker->getWorkerId(); - const workerid_t workerCount = runtime.getWorkerCount(); - // NOLINTNEXTLINE(bugprone-narrowing-conversions) - const workerid_t checkAnywhereQueueAt = workerCount * CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE; - - workerid_t startWorkerId = currentWorker->nextRandomWorkerId(); - // TODO: See how changing the victim count affects things. - const workerid_t victimCount = [&] { - if constexpr (emper::WS_VICTIM_COUNT) - return emper::WS_VICTIM_COUNT; - else if constexpr (emper::WS_VICTIM_DENOMINATOR) - return workerCount / emper::WS_VICTIM_DENOMINATOR; - else - return workerCount; - }(); - - for (workerid_t i = 0; i < victimCount; ++i) { - workerid_t victim = (startWorkerId + i) % workerCount; - - // Don't steal from ourselves. - if (unlikely(victim == myWorkerId)) continue; - - auto stolenFiber = tryStealFiberFrom(victim); - if (stolenFiber) return *stolenFiber; - - if constexpr (!emper::CHECK_ANYWHERE_QUEUE_WHILE_STEALING) continue; - - // If we failed to steal from a certain number of victims, check - // the anywhere queue for new fibers. - if (i == checkAnywhereQueueAt) { - auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); - if (anywhereQueueFiber) return *anywhereQueueFiber; - } + // TODO: Determine if there is a better value than 1/3. + const float CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE = 0.33; + const workerid_t myWorkerId = currentWorker->getWorkerId(); + const workerid_t workerCount = runtime.getWorkerCount(); + // NOLINTNEXTLINE(bugprone-narrowing-conversions) + const workerid_t checkAnywhereQueueAt = workerCount * CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE; + + workerid_t startWorkerId = currentWorker->nextRandomWorkerId(); + // TODO: See how changing the victim count affects things. + const workerid_t victimCount = [&] { + if constexpr (emper::WS_VICTIM_COUNT) + return emper::WS_VICTIM_COUNT; + else if constexpr (emper::WS_VICTIM_DENOMINATOR) + return workerCount / emper::WS_VICTIM_DENOMINATOR; + else + return workerCount; + }(); + + for (workerid_t i = 0; i < victimCount; ++i) { + workerid_t victim = (startWorkerId + i) % workerCount; + + // Don't steal from ourselves. + if (unlikely(victim == myWorkerId)) continue; + + auto stolenFiber = tryStealFiberFrom(victim); + if (stolenFiber) { + onWorkStolen(); + return *stolenFiber; + } + + if constexpr (!emper::CHECK_ANYWHERE_QUEUE_WHILE_STEALING) continue; + + // If we failed to steal from a certain number of victims, check + // the anywhere queue for new fibers. + if (i == checkAnywhereQueueAt) { + auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); + if (anywhereQueueFiber) return *anywhereQueueFiber; } } // Try the "scheduled from anywhere" queue to get work as last resort. - { - auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); - if (anywhereQueueFiber) return *anywhereQueueFiber; - } + auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); + if (anywhereQueueFiber) return *anywhereQueueFiber; // We where not able to dequeue any fiber if we reach this point. - fiber = nullptr; - -out: - return std::make_pair(fiber, fiberSource); + return std::nullopt; } -auto AbstractWorkStealingScheduler::nextFiberResultViaWorkStealing() -> NextFiberResult { - std::pair<Fiber*, FiberSource> nextFiberWsResult = nextFiberViaWorkStealing(); +auto AbstractWorkStealingScheduler::nextFiberResultFromInbox() -> std::optional<NextFiberResult> { + Fiber* fiber = inbox.dequeue(); + if (fiber != nullptr) return NextFiberResult{fiber, emper::FiberSource::inbox}; - if (nextFiberWsResult.second == FiberSource::stolen) { - onWorkStolen(); - } + return std::nullopt; +} - return NextFiberResult{ - nextFiberWsResult.first, - static_cast<uintptr_t>(nextFiberWsResult.second), - }; +auto AbstractWorkStealingScheduler::nextFiberResultFromInboxOrWorkStealing() + -> std::optional<NextFiberResult> { + auto result = nextFiberResultFromInbox(); + if (result) return result; + + return nextFiberResultViaWorkStealing(); } diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp index ade437e1a3501cb7a8864f34d1def359258c65ee..74f67a17afafdc0045365b4e387961b135f31fac 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.hpp +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -3,13 +3,12 @@ #pragma once #include <cstddef> // for size_t -#include <cstdint> #include <optional> -#include <utility> -#include "NextFiberResult.hpp" +#include "Fiber.hpp" #include "Scheduler.hpp" #include "emper-common.h" +#include "lib/adt/MpscQueue.hpp" #ifdef EMPER_LOCKED_WS_QUEUE #include "lib/adt/LockedQueue.hpp" @@ -17,7 +16,7 @@ #include "lib/adt/WsClQueue.hpp" #endif -class Fiber; +struct NextFiberResult; class Runtime; class RuntimeStrategy; @@ -28,34 +27,31 @@ class AbstractWorkStealingScheduler : public Scheduler { #else using WsQueue = adt::WsClQueue<Fiber*, SIZE>; #endif + using InboxQueue = adt::MpscQueue<Fiber>; public: static const int QUEUE_SIZE = 1024; - enum class FiberSource : uintptr_t { - local, - hintWsq, - hintAq, - stolen, - ioStolen, - anywhereQueue, - }; - private: - auto nextFiberViaAnywhereQueue() -> std::optional<std::pair<Fiber*, FiberSource>>; - auto tryStealFiberFrom(workerid_t victim) -> std::optional<std::pair<Fiber*, FiberSource>>; + auto nextFiberViaAnywhereQueue() -> std::optional<NextFiberResult>; + auto tryStealFiberFrom(workerid_t victim) -> std::optional<NextFiberResult>; protected: WsQueue<QUEUE_SIZE>** queues; static thread_local WsQueue<QUEUE_SIZE> queue; + InboxQueue** inboxes; + static thread_local InboxQueue inbox; + void scheduleViaWorkStealing(Fiber& fiber); + void scheduleToInbox(Fiber& fiber, workerid_t workerId); auto maybeRecycle(Fiber* fiber) -> bool; - auto nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource>; - - auto nextFiberResultViaWorkStealing() -> NextFiberResult; + // This method is static because it only uses the thread_local inbox + static auto nextFiberResultFromInbox() -> std::optional<NextFiberResult>; + auto nextFiberResultViaWorkStealing() -> std::optional<NextFiberResult>; + auto nextFiberResultFromInboxOrWorkStealing() -> std::optional<NextFiberResult>; public: AbstractWorkStealingScheduler(Runtime& runtime, RuntimeStrategy& strategy); diff --git a/emper/strategies/AbstractWorkStealingStats.cpp b/emper/strategies/AbstractWorkStealingStats.cpp index 64bcaac58ffc1d0faea9891c378eddb8f142b0cd..3a715ec2d7e1cff4fd4f40415c1cc9bf236644bb 100644 --- a/emper/strategies/AbstractWorkStealingStats.cpp +++ b/emper/strategies/AbstractWorkStealingStats.cpp @@ -21,6 +21,8 @@ void AbstractWorkStealingStats::print(std::ostream& out) { << std::to_string(comulatedWorkerStats.scheduledFibersToLocal) << std::endl << "total-scheduled-fibers-to-overflow-queue: " << std::to_string(comulatedWorkerStats.scheduledFibersToOverflowQueue) << std::endl + << "total-scheduled-fibers-to-inbox: " + << std::to_string(comulatedWorkerStats.scheduledFibersToInbox) << std::endl << "global-max-queue-length: " << std::to_string(comulatedWorkerStats.maxQueueLength) << std::endl << "total-next-fiber-from-local: " << std::to_string(comulatedWorkerStats.nextFiberFromLocal) diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.cpp b/emper/strategies/AbstractWorkStealingWorkerStats.cpp index ceaf7b4004d14746d6c662dcef4722a02a63ac09..68375ca77607d23e80d8649cd288c40649d51d98 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.cpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.cpp @@ -8,6 +8,7 @@ auto AbstractWorkStealingWorkerStats::operator+=(const AbstractWorkStealingWorke -> AbstractWorkStealingWorkerStats& { scheduledFibersToLocal += other.scheduledFibersToLocal; scheduledFibersToOverflowQueue += other.scheduledFibersToOverflowQueue; + scheduledFibersToInbox += other.scheduledFibersToInbox; maxQueueLength = std::max(maxQueueLength, other.maxQueueLength); nextFiberFromLocal += other.nextFiberFromLocal; nextFiberFromHintLocal += other.nextFiberFromHintLocal; diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.hpp b/emper/strategies/AbstractWorkStealingWorkerStats.hpp index 57052eecffd9448669e17bd5e6108d0cba732946..a6ae44e7e493c4bb40b96828f8b3dbaff8d2f08b 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.hpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.hpp @@ -15,6 +15,7 @@ class AbstractWorkStealingWorkerStats { public: uint64_t scheduledFibersToLocal = 0; uint64_t scheduledFibersToOverflowQueue = 0; + uint64_t scheduledFibersToInbox = 0; uint64_t maxQueueLength = 0; uint64_t nextFiberFromLocal = 0; uint64_t nextFiberFromHintLocal = 0; diff --git a/emper/strategies/laws/LawsDispatcher.cpp b/emper/strategies/laws/LawsDispatcher.cpp index 1cc2164df97f55fe3dc2c2b500a5201383e21bb9..0c8efb3f8e07175756d419e2d82526c80edc60b4 100644 --- a/emper/strategies/laws/LawsDispatcher.cpp +++ b/emper/strategies/laws/LawsDispatcher.cpp @@ -2,10 +2,13 @@ // Copyright © 2020-2021 Florian Schmaus #include "LawsDispatcher.hpp" +#include <optional> + #include "Common.hpp" // for DIE_MSG #include "Emper.hpp" #include "Fiber.hpp" -#include "LawsStrategy.hpp" // for LawsStrategy, LawsStrategy::FiberSource +#include "FiberSource.hpp" +#include "LawsStrategy.hpp" // for LawsStrategy #include "NextFiberResult.hpp" #include "Runtime.hpp" #include "emper-common.h" @@ -23,41 +26,41 @@ void LawsDispatcher::recycle(Fiber* fiber) { void LawsDispatcher::dispatchLoop() { while (true) { - NextFiberResult next = runtime.nextFiber(); - Fiber* const fiber = next.fiber; - if (!fiber) { + std::optional<NextFiberResult> next = runtime.nextFiber(); + if (!next) { dispatchLoopDoSleep(); continue; } + Fiber* const fiber = next->fiber; + // The isRunnable() method performes an atomic swap on a boolean, // which was initialized to true, in order to check if this fiber // is runnable. if (isRunnable(fiber)) { if constexpr (emper::STATS) { - auto fiberSource = static_cast<LawsStrategy::FiberSource>(next.metadata); - switch (fiberSource) { - case LawsStrategy::FiberSource::fromPriority: + switch (next->source) { + case emper::FiberSource::inbox: LawsStrategy::stats.dispatchedFibersFromPriority++; break; - case LawsStrategy::FiberSource::local: + case emper::FiberSource::local: LawsStrategy::stats.dispatchedFibersFromLocal++; break; - case LawsStrategy::FiberSource::hintWsq: + case emper::FiberSource::hintWsq: LawsStrategy::stats.dispatchedFibersFromHintLocal++; break; - case LawsStrategy::FiberSource::hintAq: + case emper::FiberSource::hintAq: LawsStrategy::stats.dispatchedFibersFromHintAnywhere++; break; - case LawsStrategy::FiberSource::stolen: + case emper::FiberSource::stolen: LawsStrategy::stats.dispatchedFibersStolen++; break; - case LawsStrategy::FiberSource::anywhereQueue: + case emper::FiberSource::anywhereQueue: LawsStrategy::stats.dispatchedFibersFromAnywhereQueue++; break; default: - DIE_MSG("Unknown fiber source: " << next.metadata); + DIE_MSG("Unknown fiber source: " << next->source); break; } } diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp index 8066c5cb03c5a3efb05ac8ab43efefcb7f4e5dd9..8f846b5429822e6b54cdc0ee269a8b88d55840e7 100644 --- a/emper/strategies/laws/LawsScheduler.cpp +++ b/emper/strategies/laws/LawsScheduler.cpp @@ -2,8 +2,6 @@ // Copyright © 2020-2021 Florian Schmaus #include "LawsScheduler.hpp" -#include <cstdint> - #include "CallerEnvironment.hpp" #include "Emper.hpp" #include "LawsStrategy.hpp" // IWYU pragma: keep @@ -12,16 +10,8 @@ #include "emper-common.h" #include "strategies/laws/LawsWorkerStats.hpp" -thread_local LawsScheduler::LawsMpscQueue LawsScheduler::priorityQueue; - LawsScheduler::LawsScheduler(Runtime& runtime, RuntimeStrategy& strategy) - : AbstractWorkStealingScheduler(runtime, strategy) { - const workerid_t workerCount = runtime.getWorkerCount(); - priorityQueues = new LawsScheduler::LawsMpscQueue*[workerCount]; - - auto newWorkerHook = [this](workerid_t workerId) { priorityQueues[workerId] = &priorityQueue; }; - addNewWorkerHook(newWorkerHook); -} + : AbstractWorkStealingScheduler(runtime, strategy) {} template <CallerEnvironment callerEnvironment> void LawsScheduler::tryScheduleToPriorityQueue(Fiber& fiber) { @@ -70,14 +60,6 @@ void LawsScheduler::scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) onNewWork<CallerEnvironment::ANYWHERE>(); } -auto LawsScheduler::nextFiber() -> NextFiberResult { - Fiber* fiber = priorityQueue.dequeue(); - if (fiber != nullptr) { - return NextFiberResult{ - fiber, - static_cast<uintptr_t>(LawsStrategy::FiberSource::fromPriority), - }; - } - - return nextFiberResultViaWorkStealing(); +auto LawsScheduler::nextFiber() -> std::optional<NextFiberResult> { + return nextFiberResultFromInboxOrWorkStealing(); } diff --git a/emper/strategies/laws/LawsScheduler.hpp b/emper/strategies/laws/LawsScheduler.hpp index f4b0f242418e48ae92825736456979666913a6c5..4a5f9bc9e2199a436260dc453f15532ba9780e62 100644 --- a/emper/strategies/laws/LawsScheduler.hpp +++ b/emper/strategies/laws/LawsScheduler.hpp @@ -1,9 +1,12 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020-2021 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus, Florian Fischer #pragma once +#include <optional> + #include "CallerEnvironment.hpp" #include "Fiber.hpp" +#include "emper-common.h" #include "lib/adt/MpscQueue.hpp" #include "strategies/AbstractWorkStealingScheduler.hpp" @@ -15,20 +18,21 @@ class LawsScheduler : public AbstractWorkStealingScheduler { using LawsMpscQueue = adt::MpscQueue<Fiber>; private: - LawsMpscQueue** priorityQueues; - - static thread_local LawsMpscQueue priorityQueue; + LawsMpscQueue**& priorityQueues = AbstractWorkStealingScheduler::inboxes; template <CallerEnvironment callerEnvironment> void tryScheduleToPriorityQueue(Fiber& fiber); protected: void scheduleInternal(Fiber& fiber) override; + void scheduleOnInternal(Fiber& fiber, workerid_t workerId) override { + scheduleToInbox(fiber, workerId); + } void scheduleFromAnywhereInternal(Fiber& fiber) override; void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) override; public: LawsScheduler(Runtime& runtime, RuntimeStrategy& strategy); - auto nextFiber() -> NextFiberResult override; + auto nextFiber() -> std::optional<NextFiberResult> override; }; diff --git a/emper/strategies/laws/LawsStrategy.hpp b/emper/strategies/laws/LawsStrategy.hpp index d287b446cfbdf667beb258a4ad0a27c880c8a953..c2eea7ed0bba35ef6e19987cd764244f6f4780ac 100644 --- a/emper/strategies/laws/LawsStrategy.hpp +++ b/emper/strategies/laws/LawsStrategy.hpp @@ -2,11 +2,9 @@ // Copyright © 2020-2021 Florian Schmaus #pragma once -#include <cstdint> #include <memory> #include "WorkerLocalData.hpp" -#include "strategies/AbstractWorkStealingScheduler.hpp" #include "strategies/AbstractWorkStealingStrategy.hpp" #include "strategies/laws/LawsDispatcher.hpp" #include "strategies/laws/LawsScheduler.hpp" @@ -19,16 +17,6 @@ struct LawsWorkerStats; class LawsStrategy : public AbstractWorkStealingStrategy { private: - enum class FiberSource : uintptr_t { - local = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::local), - hintWsq = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::hintWsq), - hintAq = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::hintAq), - stolen = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::stolen), - anywhereQueue = - static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::anywhereQueue), - fromPriority, - }; - LawsScheduler scheduler; LawsDispatcher dispatcher; diff --git a/emper/strategies/ws/WsDispatcher.cpp b/emper/strategies/ws/WsDispatcher.cpp index 00569799c74b21876dcb3c2c42fcbd093f0823a7..7826f33a82d0628091dbccaf38c6cca90bcb61f9 100644 --- a/emper/strategies/ws/WsDispatcher.cpp +++ b/emper/strategies/ws/WsDispatcher.cpp @@ -1,7 +1,9 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2022 Florian Schmaus Florian Fischer #include "WsDispatcher.hpp" +#include <optional> + #include "NextFiberResult.hpp" #include "Runtime.hpp" // for Runtime @@ -9,13 +11,13 @@ class Fiber; void WsDispatcher::dispatchLoop() { while (true) { - NextFiberResult next = runtime.nextFiber(); - Fiber* const fiber = next.fiber; - if (!fiber) { + std::optional<NextFiberResult> next = runtime.nextFiber(); + if (!next) { dispatchLoopDoSleep(); continue; } + Fiber* const fiber = next->fiber; dispatch(fiber); diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp index c0ce9c3b8766f7b673e71a34f04ac24974e1e27c..d9dc87d62a50e202be36f6bd5d36384c06bf481c 100644 --- a/emper/strategies/ws/WsScheduler.hpp +++ b/emper/strategies/ws/WsScheduler.hpp @@ -2,8 +2,11 @@ // Copyright © 2020-2021 Florian Schmaus #pragma once +#include <optional> + #include "CallerEnvironment.hpp" #include "NextFiberResult.hpp" +#include "emper-common.h" #include "strategies/AbstractWorkStealingScheduler.hpp" class Fiber; @@ -13,6 +16,9 @@ class RuntimeStrategy; class WsScheduler : public AbstractWorkStealingScheduler { protected: void scheduleInternal(Fiber& fiber) override { scheduleViaWorkStealing(fiber); } + void scheduleOnInternal(Fiber& fiber, workerid_t workerId) override { + scheduleToInbox(fiber, workerId); + } void scheduleFromAnywhereInternal(Fiber& fiber) override { enqueueInAnywhereQueue(fiber); onNewWork<CallerEnvironment::ANYWHERE>(); @@ -26,5 +32,7 @@ class WsScheduler : public AbstractWorkStealingScheduler { public: WsScheduler(Runtime& runtime, RuntimeStrategy& strategy); - auto nextFiber() -> NextFiberResult override { return nextFiberResultViaWorkStealing(); }; + auto nextFiber() -> std::optional<NextFiberResult> override { + return nextFiberResultFromInboxOrWorkStealing(); + }; }; diff --git a/tests/ScheduleOnTest.cpp b/tests/ScheduleOnTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f458a99dd16ed3557d7c9d848334bcb406c23598 --- /dev/null +++ b/tests/ScheduleOnTest.cpp @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "CountingPrivateSemaphore.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" +#include "Worker.hpp" +#include "emper.hpp" +#include "fixtures/assert.hpp" + +static const unsigned ITERATIONS = 100; +static Runtime* runtime; +static unsigned workerCount; +unsigned iteration = 0; + +static void runOn(CPS& cps) { + ASSERT(Worker::getCurrentWorkerId() == (iteration % workerCount)); + ++iteration; + if (iteration == ITERATIONS) cps.signalAndExit(); + + runtime->scheduleOn(*Fiber::from([&] { runOn(cps); }), (iteration % workerCount)); +} + +static void scheduleOnTest() { + runtime = Runtime::getRuntime(); + ASSERT(runtime); + workerCount = runtime->getWorkerCount(); + + emper::sleep(1); + + CPS cps(1); + runtime->scheduleOn(*Fiber::from([&] { runOn(cps); }), (iteration % workerCount)); + cps.wait(); +} + +void emperTest() { scheduleOnTest(); } diff --git a/tests/ScheduleOnUnknownWorkerTest.cpp b/tests/ScheduleOnUnknownWorkerTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2af239c568edfb449cce451d50ceeaaa03798441 --- /dev/null +++ b/tests/ScheduleOnUnknownWorkerTest.cpp @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "Fiber.hpp" +#include "Runtime.hpp" +#include "fixtures/assert.hpp" + +auto main() -> int { + const unsigned nthreads = 1; + Runtime runtime(nthreads); + + runtime.executeAndWait([&] { + auto* next = Fiber::from([] { ASSERT(false); }); + runtime.scheduleOn(*next, nthreads + 1); + }); + + return 0; +} diff --git a/tests/meson.build b/tests/meson.build index c1d0d09a2aee9f1cb30a7367dad830a892b685fb..eb4e9607f4c6ef43e5acb1024efd20f38d6fdc4a 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -49,6 +49,21 @@ tests = [ 'test_runner': 'emper', }, + { + 'source': files('ScheduleOnTest.cpp'), + 'name': 'ScheduleOnTest', + 'description': 'Test Scheduler::scheduleOn', + 'test_runner': 'emper', + }, + + { + 'source': files('ScheduleOnUnknownWorkerTest.cpp'), + 'name': 'ScheduleOnUnknownWorkerTest', + 'description': 'Test Scheduler::scheduleOn with to big workerId', + 'should_fail': true, + 'is_parallel': true, + }, + { 'source': files('ReuseBpsTest.cpp'), 'name': 'ReuseBpsTest',