From b2e2a7b44b239cbe591149f689e98582a54c8637 Mon Sep 17 00:00:00 2001 From: Florian Schmaus <flow@cs.fau.de> Date: Thu, 14 Jan 2021 09:31:09 +0100 Subject: [PATCH] De-duplicate work-stealing scheduling code This introduces AbstractWorkStealingScheduler which holds the common work-stealing scheduling strategy. --- emper/Emper.hpp | 8 ++ emper/Fiber.hpp | 9 -- emper/NextFiberResult.hpp | 12 ++ emper/Runtime.cpp | 30 ++--- emper/Runtime.hpp | 24 ++-- emper/RuntimeStrategy.hpp | 6 +- emper/RuntimeStrategyFactory.hpp | 12 ++ emper/Scheduler.hpp | 17 ++- .../AbstractWorkStealingScheduler.cpp | 113 ++++++++++++++++++ .../AbstractWorkStealingScheduler.hpp | 58 +++++++++ .../strategies/AbstractWorkStealingStats.cpp | 24 ++++ .../strategies/AbstractWorkStealingStats.hpp | 22 ++++ .../AbstractWorkStealingStrategy.hpp | 24 ++++ emper/strategies/laws/LawsDispatcher.cpp | 13 +- emper/strategies/laws/LawsScheduler.cpp | 95 +++------------ emper/strategies/laws/LawsScheduler.hpp | 33 ++--- emper/strategies/laws/LawsStrategy.cpp | 21 ++-- emper/strategies/laws/LawsStrategy.hpp | 43 ++++--- emper/strategies/laws/LawsStrategyFactory.cpp | 11 ++ emper/strategies/laws/LawsStrategyFactory.hpp | 18 +++ emper/strategies/laws/LawsStrategyStats.cpp | 12 +- emper/strategies/laws/LawsStrategyStats.hpp | 8 +- emper/strategies/laws/meson.build | 1 + emper/strategies/meson.build | 5 + emper/strategies/ws/WsDispatcher.cpp | 4 +- emper/strategies/ws/WsScheduler.cpp | 76 +----------- emper/strategies/ws/WsScheduler.hpp | 33 ++--- emper/strategies/ws/WsStrategy.cpp | 14 +-- emper/strategies/ws/WsStrategy.hpp | 29 ++--- emper/strategies/ws/WsStrategyFactory.cpp | 11 ++ emper/strategies/ws/WsStrategyFactory.hpp | 15 +++ emper/strategies/ws/WsStrategyStats.cpp | 25 +--- emper/strategies/ws/WsStrategyStats.hpp | 17 +-- emper/strategies/ws/meson.build | 1 + eval/Locality.cpp | 20 ++-- meson.build | 1 + meson_options.txt | 6 + tests/SimpleLawsTest.cpp | 20 ++-- 38 files changed, 510 insertions(+), 381 deletions(-) create mode 100644 emper/NextFiberResult.hpp create mode 100644 emper/RuntimeStrategyFactory.hpp create mode 100644 emper/strategies/AbstractWorkStealingScheduler.cpp create mode 100644 emper/strategies/AbstractWorkStealingScheduler.hpp create mode 100644 emper/strategies/AbstractWorkStealingStats.cpp create mode 100644 emper/strategies/AbstractWorkStealingStats.hpp create mode 100644 emper/strategies/AbstractWorkStealingStrategy.hpp create mode 100644 emper/strategies/laws/LawsStrategyFactory.cpp create mode 100644 emper/strategies/laws/LawsStrategyFactory.hpp create mode 100644 emper/strategies/ws/WsStrategyFactory.cpp create mode 100644 emper/strategies/ws/WsStrategyFactory.hpp diff --git a/emper/Emper.hpp b/emper/Emper.hpp index ef2ee4e1..995a567e 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -56,5 +56,13 @@ static const bool LOG_TIMESTAMP = #endif ; +static const bool OVERFLOW_QUEUE = +#ifdef EMPER_OVERFLOW_QUEUE + true +#else + false +#endif + ; + auto getFullVersion() -> std::string; } // namespace emper diff --git a/emper/Fiber.hpp b/emper/Fiber.hpp index 6c2da785..0449d3ba 100644 --- a/emper/Fiber.hpp +++ b/emper/Fiber.hpp @@ -44,11 +44,6 @@ class ALIGN_TO_CACHE_LINE Fiber : public Logger<LogSubsystem::F> { Fiber* mpscNext = nullptr; - /** - * A flag used to indicate where the fiber was from. - */ - unsigned int flag = 0; - /** * Dummy constructor. Used for example by the MpscQueue. */ @@ -100,16 +95,12 @@ class ALIGN_TO_CACHE_LINE Fiber : public Logger<LogSubsystem::F> { return --referenceCounter; } - inline void setFlag(unsigned int flag) { this->flag = flag; } - friend class adt::MpscQueue<Fiber>; friend class Scheduler; friend class Dispatcher; friend class LawsScheduler; public: - [[nodiscard]] auto getFlag() const -> unsigned int { return flag; } - [[nodiscard]] auto getAffinity() const -> workeraffinity_t { if (affinity == nullptr) { return NOT_AFFINE; diff --git a/emper/NextFiberResult.hpp b/emper/NextFiberResult.hpp new file mode 100644 index 00000000..15ab40a3 --- /dev/null +++ b/emper/NextFiberResult.hpp @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#pragma once + +#include <cstdint> + +class Fiber; + +struct NextFiberResult { + Fiber* const fiber; + const uintptr_t metadata; +}; diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index b8d743cc..b6a9c58f 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #include "Runtime.hpp" #include <pthread.h> // for pthread_t, pthread_attr_init @@ -18,16 +18,18 @@ #include "ContextManager.hpp" // for ContextManager #include "Debug.hpp" // for DBG, ABORT, LOGD, LOGE #include "Emper.hpp" -#include "Fiber.hpp" // for Fiber -#include "RuntimeStrategy.hpp" // for RuntimeStrategy +#include "Fiber.hpp" // for Fiber +#include "NextFiberResult.hpp" +#include "RuntimeStrategy.hpp" // for RuntimeStrategy +#include "RuntimeStrategyFactory.hpp" #include "RuntimeStrategyStats.hpp" // for RuntimeStrategyStats #include "emper-config.h" // IWYU pragma: keep #include "lib/DebugUtil.hpp" #ifdef EMPER_DEFAULT_SCHEDULING_STRATEGY_WORK_STEALING -#include "strategies/ws/WsStrategy.hpp" // for WsStrategy, WsStrategy::INST... +#include "strategies/ws/WsStrategyFactory.hpp" #elif defined EMPER_DEFAULT_SCHEDULING_STRATEGY_LOCALITY_AWARE_WORK_STEALING -#include "strategies/laws/LawsStrategy.hpp" +#include "strategies/laws/LawsStrategyFactory.hpp" #else #error "Unknown default scheduling strategy" #endif @@ -46,22 +48,22 @@ std::mutex Runtime::currentRuntimeMutex; Runtime* Runtime::currentRuntime; -RuntimeStrategy& Runtime::DEFAULT_STRATEGY = +RuntimeStrategyFactory& Runtime::DEFAULT_STRATEGY = #ifdef EMPER_DEFAULT_SCHEDULING_STRATEGY_WORK_STEALING - WsStrategy::INSTANCE + WsStrategyFactory::INSTANCE #elif defined EMPER_DEFAULT_SCHEDULING_STRATEGY_LOCALITY_AWARE_WORK_STEALING - LawsStrategy::INSTANCE + LawsStrategyFactory::INSTANCE #else #error "Unknown default scheduling strategy" #endif ; -Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy, unsigned int seed) +Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory, unsigned int seed) : workerCount(workerCount), workerLatch(workerCount), - strategy(strategy), - scheduler(strategy.getScheduler(*this)), - dispatcher(strategy.getDispatcher(*this)), + strategy(strategyFactory.constructRuntimeStrategy(*this)), + scheduler(strategy->getScheduler()), + dispatcher(strategy->getDispatcher()), contextManager(*(new ContextManager(*this))), threads(new pthread_t[workerCount]), workers(new Worker*[workerCount]), @@ -167,7 +169,7 @@ auto Runtime::workerLoop(Worker* worker) -> void* { return nullptr; } -auto Runtime::nextFiber() -> Fiber* { return scheduler.nextFiber(); } +auto Runtime::nextFiber() -> NextFiberResult { return scheduler.nextFiber(); } void Runtime::waitUntilFinished() { for (workerid_t i = 0; i < workerCount; ++i) { @@ -176,7 +178,7 @@ void Runtime::waitUntilFinished() { } void Runtime::printStats() { - auto runtimeStrategyStats = strategy.getStats(); + auto runtimeStrategyStats = strategy->getStats(); runtimeStrategyStats->print(); } diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index b4a51435..9cfd7679 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #pragma once #include <pthread.h> // for pthread_t @@ -15,9 +15,10 @@ #include <thread> // for thread #include <vector> // for vector -#include "Common.hpp" // for ALIGN_TO_CACHE_LINE -#include "Debug.hpp" // for LogSubsystem, LogSubsystem::RUNTI, Logger -#include "Emper.hpp" // for WORKER_NOTIFY +#include "Common.hpp" // for ALIGN_TO_CACHE_LINE +#include "Debug.hpp" // for LogSubsystem, LogSubsystem::RUNTI, Logger +#include "Emper.hpp" // for WORKER_NOTIFY +#include "NextFiberResult.hpp" #include "Scheduler.hpp" // for Scheduler #include "Worker.hpp" #include "emper-common.h" // for workerid_t @@ -27,6 +28,7 @@ class ContextManager; class Dispatcher; class Fiber; class RuntimeStrategy; +class RuntimeStrategyFactory; enum WakeupMode { IF_SLEEPING_OBSERVED, @@ -44,7 +46,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { Latch workerLatch; - RuntimeStrategy& strategy; + RuntimeStrategy* const strategy; Scheduler& scheduler; Dispatcher& dispatcher; ContextManager& contextManager; @@ -61,7 +63,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { ALIGN_TO_CACHE_LINE std::atomic<unsigned long> sleepingWorkers; bool skipSleep; - static RuntimeStrategy& DEFAULT_STRATEGY; + static RuntimeStrategyFactory& DEFAULT_STRATEGY; static void printLastRuntimeStats(); @@ -118,9 +120,10 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { Runtime(workerid_t workerCount) : Runtime(workerCount, DEFAULT_STRATEGY) {} - Runtime(RuntimeStrategy& strategy) : Runtime(std::thread::hardware_concurrency(), strategy) {} + Runtime(RuntimeStrategyFactory& strategyFactory) + : Runtime(std::thread::hardware_concurrency(), strategyFactory) {} - Runtime(workerid_t workerCount, RuntimeStrategy& strategy, + Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory, unsigned int seed = std::random_device()()); ~Runtime(); @@ -134,7 +137,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); } - auto nextFiber() -> Fiber*; + // TODO: This should probably not be a public method of Runtime. + auto nextFiber() -> NextFiberResult; // https://stackoverflow.com/a/3747462/194894 static inline auto rand() -> int { return Worker::rand(); } @@ -147,7 +151,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { inline auto getContextManager() -> ContextManager& { return contextManager; } - inline auto getStrategy() -> RuntimeStrategy& { return strategy; } + inline auto getStrategy() -> RuntimeStrategy& { return *strategy; } void waitUntilFinished(); diff --git a/emper/RuntimeStrategy.hpp b/emper/RuntimeStrategy.hpp index c3d0d1a5..d152c2bf 100644 --- a/emper/RuntimeStrategy.hpp +++ b/emper/RuntimeStrategy.hpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #pragma once class Runtime; @@ -13,9 +13,9 @@ class RuntimeStrategy { friend class Runtime; private: - virtual auto getScheduler(Runtime& runtime) -> Scheduler& = 0; + virtual auto getScheduler() -> Scheduler& = 0; - virtual auto getDispatcher(Runtime& runtime) -> Dispatcher& = 0; + virtual auto getDispatcher() -> Dispatcher& = 0; public: virtual auto getStats() -> std::shared_ptr<RuntimeStrategyStats> = 0; diff --git a/emper/RuntimeStrategyFactory.hpp b/emper/RuntimeStrategyFactory.hpp new file mode 100644 index 00000000..30071092 --- /dev/null +++ b/emper/RuntimeStrategyFactory.hpp @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#pragma once + +#include "RuntimeStrategy.hpp" + +class Runtime; + +class RuntimeStrategyFactory { + public: + virtual auto constructRuntimeStrategy(Runtime& runtime) -> RuntimeStrategy* = 0; +}; diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index eaff0de4..e563cbd2 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #pragma once #include <functional> // for function @@ -12,6 +12,7 @@ #include "lib/adt/LockedUnboundedQueue.hpp" class Runtime; +struct NextFiberResult; class Scheduler : public Logger<LogSubsystem::SCHED> { private: @@ -44,15 +45,23 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { } } + void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); } + auto dequeFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); } + virtual void scheduleInternal(Fiber& fiber) = 0; + public: - virtual void schedule(Fiber& fiber) = 0; + void schedule(Fiber& fiber) { + LOGD("Scheduling fiber " << &fiber); + + scheduleInternal(fiber); + } - virtual auto nextFiber() -> Fiber* = 0; + virtual auto nextFiber() -> NextFiberResult = 0; void scheduleFromAnywhere(Fiber& fiber) { - scheduleAnywhereQueue.enqueue(&fiber); + enqueueInAnywhereQueue(fiber); onNewWork<CallerEnvironment::ANYWHERE>(); } diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp new file mode 100644 index 00000000..9067ca8a --- /dev/null +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -0,0 +1,113 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#include "AbstractWorkStealingScheduler.hpp" + +#include <atomic> +#include <ostream> // for operator<<, basic_ostream<>::__ostream_type + +#include "Common.hpp" // for unlikely, likely +#include "Debug.hpp" // for ABORT +#include "Emper.hpp" // for OVERFLOW_QUEUE +#include "NextFiberResult.hpp" +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" // for workerid_t +#include "strategies/AbstractWorkStealingStrategy.hpp" + +class Fiber; + +thread_local AbstractWorkStealingScheduler::WsQueue<AbstractWorkStealingScheduler::QUEUE_SIZE> + AbstractWorkStealingScheduler::queue; + +AbstractWorkStealingScheduler::AbstractWorkStealingScheduler( + Runtime& runtime, AbstractWorkStealingStrategy& abstractWorkStealingStrategy) + : Scheduler(runtime), abstractWorkStealingStrategy(abstractWorkStealingStrategy) { + const workerid_t workerCount = runtime.getWorkerCount(); + queues = new AbstractWorkStealingScheduler::WsQueue<QUEUE_SIZE>*[workerCount]; + + auto newWorkerHook = [this]() { queues[Runtime::getWorkerId()] = &queue; }; + addNewWorkerHook(newWorkerHook); +} + +void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { + bool pushed = queue.pushBottom(&fiber); + if (unlikely(!pushed)) { + if constexpr (emper::OVERFLOW_QUEUE) { + enqueueInAnywhereQueue(fiber); + + if constexpr (emper::STATS) { + // TODO: Use template magic so that this becomes + // incrementRelaxed(abstractWorkStealingStrategy.scheduledFibersToLocal) + abstractWorkStealingStrategy.scheduledFibersToOverflowQueue.fetch_add( + 1, std::memory_order_relaxed); + } + } else { + ABORT("Could not push fiber " << &fiber << " into queue"); + } + } else if constexpr (emper::STATS) { + abstractWorkStealingStrategy.scheduledFibersToLocal.fetch_add(1, std::memory_order_relaxed); + } + + // Classes using this method are supposed to always invoke this + // method. Hence we call onNewWork() here. + onNewWork(); +} + +auto AbstractWorkStealingScheduler::nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource> { + FiberSource fiberSource = FiberSource::local; + Fiber* fiber; + + bool poped = queue.popBottom(&fiber); + if (likely(poped)) { + if constexpr (emper::STATS) { + abstractWorkStealingStrategy.nextFiberFromLocal.fetch_add(1, std::memory_order_relaxed); + } + + goto out; + } + + { + const workerid_t myWorkerId = Runtime::getWorkerId(); + const workerid_t workerCount = runtime.getWorkerCount(); + workerid_t startWorkerId = Runtime::rand() % workerCount; + // TODO: See how reducing the loop bound affects things. + for (workerid_t i = 0; i < workerCount; ++i) { + workerid_t victim = (startWorkerId + i) % workerCount; + + // Don't steal from ourselves. + if (unlikely(victim == myWorkerId)) continue; + + poped = queues[victim]->popTop(&fiber); + if (poped) { + if constexpr (emper::STATS) { + abstractWorkStealingStrategy.nextFiberStolen.fetch_add(1, std::memory_order_relaxed); + } + + fiberSource = FiberSource::stolen; + goto out; + } + } + } + + // Try the "scheduled from anywhere" queue to get work as last resort. + fiber = dequeFiberFromAnywhereQueue(); + if (fiber) { + if constexpr (emper::STATS) { + abstractWorkStealingStrategy.nextFiberFromAnywhereQueue.fetch_add(1, + std::memory_order_relaxed); + } + + fiberSource = FiberSource::anywhere_queue; + goto out; + } + +out: + return std::make_pair(fiber, fiberSource); +} + +auto AbstractWorkStealingScheduler::nextFiberResultViaWorkStealing() -> NextFiberResult { + std::pair<Fiber*, FiberSource> nextFiberWsResult = nextFiberViaWorkStealing(); + return NextFiberResult{ + nextFiberWsResult.first, + static_cast<uintptr_t>(nextFiberWsResult.second), + }; +} diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp new file mode 100644 index 00000000..acd17d7b --- /dev/null +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -0,0 +1,58 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#pragma once + +#include <cstddef> // for size_t +#include <cstdint> +#include <utility> + +#include "NextFiberResult.hpp" +#include "Scheduler.hpp" +#include "emper-common.h" +#include "lib/adt/WsClQueue.hpp" // for WsClQueue + +class Fiber; +class Runtime; +class AbstractWorkStealingStrategy; + +class AbstractWorkStealingScheduler : public Scheduler { + template <size_t SIZE> +#ifdef EMPER_LOCKED_WS_QUEUE + using WsQueue = adt::LockedQueue<Fiber*, SIZE>; +#else + using WsQueue = adt::WsClQueue<Fiber*, SIZE>; +#endif + + private: +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wattributes" + AbstractWorkStealingStrategy& abstractWorkStealingStrategy +#ifndef EMPER_STATS + ATTR_UNUSED +#endif + ; +#pragma GCC diagnostic pop + + public: + static const int QUEUE_SIZE = 1024; + + enum struct FiberSource : uintptr_t { + local, + stolen, + anywhere_queue, + }; + + protected: + WsQueue<QUEUE_SIZE>** queues; + static thread_local WsQueue<QUEUE_SIZE> queue; + + void scheduleViaWorkStealing(Fiber& fiber); + + auto nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource>; + + auto nextFiberResultViaWorkStealing() -> NextFiberResult; + + public: + AbstractWorkStealingScheduler(Runtime& runtime, + AbstractWorkStealingStrategy& abstractWorkStealingStrategy); +}; diff --git a/emper/strategies/AbstractWorkStealingStats.cpp b/emper/strategies/AbstractWorkStealingStats.cpp new file mode 100644 index 00000000..7d6c5860 --- /dev/null +++ b/emper/strategies/AbstractWorkStealingStats.cpp @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#include "strategies/AbstractWorkStealingStats.hpp" + +#include <atomic> +#include <iostream> + +#include "strategies/AbstractWorkStealingStrategy.hpp" + +AbstractWorkStealingStats::AbstractWorkStealingStats(AbstractWorkStealingStrategy& strategy) + : scheduledFibersToLocal(strategy.scheduledFibersToLocal), + scheduledFibersToOverflowQueue(strategy.scheduledFibersToOverflowQueue), + nextFiberFromLocal(strategy.nextFiberFromLocal), + nextFiberStolen(strategy.nextFiberStolen), + nextFiberFromAnywhereQueue(strategy.nextFiberFromAnywhereQueue) {} + +void AbstractWorkStealingStats::print() { + std::cout << "AbstractWorkStealingStats" + << " scheduledFibersToLocal:" << scheduledFibersToLocal + << " scheduledFibersToOverflowQueue:" << scheduledFibersToOverflowQueue + << " nextFiberFromLocal:" << nextFiberFromLocal + << " nextFiberStolen:" << nextFiberStolen + << " nextFiberFromAnywhereQueue:" << nextFiberFromAnywhereQueue << std::endl; +} diff --git a/emper/strategies/AbstractWorkStealingStats.hpp b/emper/strategies/AbstractWorkStealingStats.hpp new file mode 100644 index 00000000..a5f86150 --- /dev/null +++ b/emper/strategies/AbstractWorkStealingStats.hpp @@ -0,0 +1,22 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#pragma once + +#include <cstdint> + +#include "RuntimeStrategyStats.hpp" + +class AbstractWorkStealingStrategy; + +class AbstractWorkStealingStats : public RuntimeStrategyStats { + public: + const uint64_t scheduledFibersToLocal; + const uint64_t scheduledFibersToOverflowQueue; + const uint64_t nextFiberFromLocal; + const uint64_t nextFiberStolen; + const uint64_t nextFiberFromAnywhereQueue; + + AbstractWorkStealingStats(AbstractWorkStealingStrategy &strategy); + + void print() override; +}; diff --git a/emper/strategies/AbstractWorkStealingStrategy.hpp b/emper/strategies/AbstractWorkStealingStrategy.hpp new file mode 100644 index 00000000..fbaf054f --- /dev/null +++ b/emper/strategies/AbstractWorkStealingStrategy.hpp @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#pragma once + +#include <atomic> +#include <cstdint> +#include <memory> + +#include "RuntimeStrategy.hpp" + +class AbstractWorkStealingScheduler; +class AbstractWorkStealingStats; + +class AbstractWorkStealingStrategy : public RuntimeStrategy { + private: + std::atomic<std::uint64_t> scheduledFibersToLocal; + std::atomic<std::uint64_t> scheduledFibersToOverflowQueue; + std::atomic<std::uint64_t> nextFiberFromLocal; + std::atomic<std::uint64_t> nextFiberStolen; + std::atomic<std::uint64_t> nextFiberFromAnywhereQueue; + + friend AbstractWorkStealingScheduler; + friend AbstractWorkStealingStats; +}; diff --git a/emper/strategies/laws/LawsDispatcher.cpp b/emper/strategies/laws/LawsDispatcher.cpp index 6930783d..2deca760 100644 --- a/emper/strategies/laws/LawsDispatcher.cpp +++ b/emper/strategies/laws/LawsDispatcher.cpp @@ -1,13 +1,13 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #include "LawsDispatcher.hpp" #include <atomic> // for atomic, memory_order_relaxed #include "Common.hpp" // for DIE_MSG #include "Emper.hpp" -#include "Fiber.hpp" // for Fiber #include "LawsStrategy.hpp" // for LawsStrategy, LawsStrategy::FiberSource +#include "NextFiberResult.hpp" #include "Runtime.hpp" void LawsDispatcher::recycle(Fiber* fiber) { @@ -20,7 +20,8 @@ void LawsDispatcher::recycle(Fiber* fiber) { void LawsDispatcher::dispatchLoop() { while (true) { - Fiber* const fiber = runtime.nextFiber(); + NextFiberResult next = runtime.nextFiber(); + Fiber* const fiber = next.fiber; if (!fiber) { dispatchLoopDoSleep(); @@ -32,18 +33,18 @@ void LawsDispatcher::dispatchLoop() { // is runnable. if (isRunnable(fiber)) { if constexpr (emper::STATS) { - auto fiberSource = static_cast<LawsStrategy::FiberSource>(fiber->getFlag()); + auto fiberSource = static_cast<LawsStrategy::FiberSource>(next.metadata); switch (fiberSource) { case LawsStrategy::FiberSource::fromPriority: lawsStrategy.dispatchedFiberFromPriority.fetch_add(1, std::memory_order_relaxed); break; - case LawsStrategy::FiberSource::fromLocal: + case LawsStrategy::FiberSource::local: lawsStrategy.dispatchedFiberFromLocal.fetch_add(1, std::memory_order_relaxed); break; case LawsStrategy::FiberSource::stolen: lawsStrategy.dispatchedFiberStolen.fetch_add(1, std::memory_order_relaxed); break; - case LawsStrategy::FiberSource::anywhereQueue: + case LawsStrategy::FiberSource::anywhere_queue: lawsStrategy.dispatchedFiberFromAnywhere.fetch_add(1, std::memory_order_relaxed); break; default: diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp index 1432b3d0..3e72b841 100644 --- a/emper/strategies/laws/LawsScheduler.cpp +++ b/emper/strategies/laws/LawsScheduler.cpp @@ -1,49 +1,42 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #include "LawsScheduler.hpp" #include <atomic> // for atomic, memory_order_relaxed +#include <cstdint> -#include "Common.hpp" -#include "Debug.hpp" #include "Emper.hpp" #include "LawsStrategy.hpp" // IWYU pragma: keep +#include "NextFiberResult.hpp" #include "Runtime.hpp" #define EMPER_OVERFLOW_QUEUE thread_local LawsScheduler::LawsMpscQueue LawsScheduler::priorityQueue; -thread_local LawsScheduler::WsQueue<LawsScheduler::QUEUE_SIZE> LawsScheduler::queue; - LawsScheduler::LawsScheduler(Runtime& runtime, LawsStrategy& lawsStrategy) - : Scheduler(runtime), lawsStrategy(lawsStrategy) { + : AbstractWorkStealingScheduler(runtime, lawsStrategy), lawsStrategy(lawsStrategy) { const workerid_t workerCount = runtime.getWorkerCount(); - queues = new LawsScheduler::WsQueue<QUEUE_SIZE>*[workerCount]; priorityQueues = new LawsScheduler::LawsMpscQueue*[workerCount]; - mainThreadQueue = &queue; auto newWorkerHook = [this]() { workerid_t workerId = Runtime::getWorkerId(); - queues[workerId] = &queue; priorityQueues[workerId] = &priorityQueue; }; addNewWorkerHook(newWorkerHook); } -void LawsScheduler::schedule(Fiber& fiber) { - LOGD("Scheduling fiber " << &fiber); - +void LawsScheduler::scheduleInternal(Fiber& fiber) { workeraffinity_t* const affinity_buffer = getAffinityBuffer(fiber); if (affinity_buffer) { workeraffinity_t affinity = *affinity_buffer; workerid_t workerId = Runtime::getWorkerId(); if (affinity == workerId) { - goto scheduleToLocalWsQueue; + goto scheduleViaWorkStealing; } if (affinity == Fiber::NOT_AFFINE) { - goto scheduleToLocalWsQueue; + goto scheduleViaWorkStealing; } // We found a fiber to schedule on a remote prority queue. @@ -54,76 +47,18 @@ void LawsScheduler::schedule(Fiber& fiber) { } } -scheduleToLocalWsQueue: - bool pushed = queue.pushBottom(&fiber); - if (unlikely(!pushed)) { -#ifdef EMPER_OVERFLOW_QUEUE - priorityQueue.enqueue(&fiber); -#else - ABORT("Could not push fiber " << &fiber << " into queue"); -#endif - } else { - if constexpr (emper::STATS) { - lawsStrategy.scheduledFibersToLocal.fetch_add(1, std::memory_order_relaxed); - } - } - - onNewWork(); +scheduleViaWorkStealing: + scheduleViaWorkStealing(fiber); } -auto LawsScheduler::nextFiber() -> Fiber* { +auto LawsScheduler::nextFiber() -> NextFiberResult { Fiber* fiber = priorityQueue.dequeue(); if (fiber != nullptr) { - // We fetched a fiber from your local priority queue. - if constexpr (emper::STATS) { - auto flag = static_cast<unsigned int>(LawsStrategy::FiberSource::fromPriority); - fiber->setFlag(flag); - } - - return fiber; + return NextFiberResult{ + fiber, + static_cast<uintptr_t>(LawsStrategy::FiberSource::fromPriority), + }; } - bool poped = queue.popBottom(&fiber); - - if (likely(poped)) { - if constexpr (emper::STATS) { - auto flag = static_cast<unsigned int>(LawsStrategy::FiberSource::fromLocal); - fiber->setFlag(flag); - } - - return fiber; - } - - // TODO: The code below is nearly duplicated, besides the statsk - // part, in WsScheduler, deduplicate. - const workerid_t myWorkerId = Runtime::getWorkerId(); - const workerid_t workerCount = runtime.getWorkerCount(); - workerid_t startWorkerId = Runtime::rand() % workerCount; - // TODO: See how reducing the loop bound affects things. - for (workerid_t i = 0; i < workerCount; ++i) { - workerid_t victim = (startWorkerId + i) % workerCount; - - // Don't steal from ourselves. - if (unlikely(victim == myWorkerId)) continue; - - poped = queues[victim]->popTop(&fiber); - if (poped) { - if constexpr (emper::STATS) { - auto flag = static_cast<unsigned int>(LawsStrategy::FiberSource::stolen); - fiber->setFlag(flag); - } - - return fiber; - } - } - - // Try the "scheduled from anywhere" queue to get work as last resort. - fiber = dequeFiberFromAnywhereQueue(); - if (fiber) { - if constexpr (emper::STATS) { - auto flag = static_cast<unsigned int>(LawsStrategy::FiberSource::anywhereQueue); - fiber->setFlag(flag); - } - } - return fiber; + return nextFiberResultViaWorkStealing(); } diff --git a/emper/strategies/laws/LawsScheduler.hpp b/emper/strategies/laws/LawsScheduler.hpp index 931dd0e5..a2fd6184 100644 --- a/emper/strategies/laws/LawsScheduler.hpp +++ b/emper/strategies/laws/LawsScheduler.hpp @@ -1,42 +1,24 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #pragma once -#include <cstddef> - #include "Fiber.hpp" -#include "Scheduler.hpp" #include "emper-common.h" #include "lib/adt/MpscQueue.hpp" -#include "lib/adt/WsClQueue.hpp" +#include "strategies/AbstractWorkStealingScheduler.hpp" class LawsStrategy; class Runtime; +struct NextFiberResult; -class LawsScheduler : public Scheduler { - template <size_t SIZE> -#ifdef EMPER_LOCKED_WS_QUEUE - using WsQueue = adt::LockedQueue<Fiber*, SIZE>; -#else - using WsQueue = adt::WsClQueue<Fiber*, SIZE>; -#endif - +class LawsScheduler : public AbstractWorkStealingScheduler { using LawsMpscQueue = adt::MpscQueue<Fiber>; - public: - static const int QUEUE_SIZE = 1024; - private: LawsMpscQueue** priorityQueues; - WsQueue<QUEUE_SIZE>** queues; - static thread_local LawsMpscQueue priorityQueue; - static thread_local LawsScheduler::WsQueue<QUEUE_SIZE> queue; - - WsQueue<QUEUE_SIZE>* mainThreadQueue; - #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wattributes" LawsStrategy& lawsStrategy @@ -46,10 +28,11 @@ class LawsScheduler : public Scheduler { ; #pragma GCC diagnostic pop + protected: + void scheduleInternal(Fiber& fiber) override; + public: LawsScheduler(Runtime& runtime, LawsStrategy& lawsStrategy); - void schedule(Fiber& fiber) override; - - auto nextFiber() -> Fiber* override; + auto nextFiber() -> NextFiberResult override; }; diff --git a/emper/strategies/laws/LawsStrategy.cpp b/emper/strategies/laws/LawsStrategy.cpp index 5b2745df..68bb821e 100644 --- a/emper/strategies/laws/LawsStrategy.cpp +++ b/emper/strategies/laws/LawsStrategy.cpp @@ -1,22 +1,23 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #include "LawsStrategy.hpp" #include "strategies/laws/LawsDispatcher.hpp" // for LawsDispatcher #include "strategies/laws/LawsScheduler.hpp" // for LawsScheduler #include "strategies/laws/LawsStrategyStats.hpp" // for LawsStrategyStats -LawsStrategy LawsStrategy::INSTANCE; +LawsStrategy::LawsStrategy(Runtime& runtime) + : scheduler(runtime, *this), + dispatcher(runtime, *this), + scheduledFibersToRemotePriority(0), + dispatchedFiberFromPriority(0), + dispatchedFiberFromLocal(0), + dispatchedFiberStolen(0), + dispatchedFiberFromAnywhere(0) {} -auto LawsStrategy::getScheduler(Runtime& runtime) -> Scheduler& { - Scheduler* scheduler = new LawsScheduler(runtime, *this); - return *scheduler; -} +auto LawsStrategy::getScheduler() -> LawsScheduler& { return scheduler; } -auto LawsStrategy::getDispatcher(Runtime& runtime) -> Dispatcher& { - Dispatcher* dispatcher = new LawsDispatcher(runtime, *this); - return *dispatcher; -} +auto LawsStrategy::getDispatcher() -> LawsDispatcher& { return dispatcher; } auto LawsStrategy::getStats() -> std::shared_ptr<RuntimeStrategyStats> { return std::make_shared<LawsStrategyStats>(*this); diff --git a/emper/strategies/laws/LawsStrategy.hpp b/emper/strategies/laws/LawsStrategy.hpp index 7d56702b..6d672ee0 100644 --- a/emper/strategies/laws/LawsStrategy.hpp +++ b/emper/strategies/laws/LawsStrategy.hpp @@ -1,55 +1,52 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #pragma once #include <atomic> #include <cstdint> #include <memory> -#include "RuntimeStrategy.hpp" +#include "strategies/AbstractWorkStealingScheduler.hpp" +#include "strategies/AbstractWorkStealingStrategy.hpp" +#include "strategies/laws/LawsDispatcher.hpp" +#include "strategies/laws/LawsScheduler.hpp" -class Dispatcher; -class LawsDispatcher; -class LawsScheduler; +class LawsStrategyFactory; class LawsStrategyStats; class Runtime; class RuntimeStrategyStats; -class Scheduler; -class LawsStrategy : public RuntimeStrategy { +class LawsStrategy : public AbstractWorkStealingStrategy { private: - enum struct FiberSource : unsigned int { + enum struct FiberSource : uintptr_t { + local = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::local), + stolen = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::stolen), + anywhere_queue = + static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::anywhere_queue), fromPriority, - fromLocal, - stolen, - anywhereQueue, }; + LawsScheduler scheduler; + LawsDispatcher dispatcher; + + // TODO: Align those all to cache line! std::atomic<std::uint64_t> scheduledFibersToRemotePriority; - std::atomic<std::uint64_t> scheduledFibersToLocal; std::atomic<std::uint64_t> dispatchedFiberFromPriority; std::atomic<std::uint64_t> dispatchedFiberFromLocal; std::atomic<std::uint64_t> dispatchedFiberStolen; std::atomic<std::uint64_t> dispatchedFiberFromAnywhere; - LawsStrategy() - : scheduledFibersToRemotePriority(0), - scheduledFibersToLocal(0), - dispatchedFiberFromPriority(0), - dispatchedFiberFromLocal(0), - dispatchedFiberStolen(0), - dispatchedFiberFromAnywhere(0) {} + LawsStrategy(Runtime& runtime); - auto getScheduler(Runtime& runtime) -> Scheduler& override; + auto getScheduler() -> LawsScheduler& override; - auto getDispatcher(Runtime& runtime) -> Dispatcher& override; + auto getDispatcher() -> LawsDispatcher& override; public: auto getStats() -> std::shared_ptr<RuntimeStrategyStats> override; - static LawsStrategy INSTANCE; - friend LawsScheduler; friend LawsDispatcher; friend LawsStrategyStats; + friend LawsStrategyFactory; }; diff --git a/emper/strategies/laws/LawsStrategyFactory.cpp b/emper/strategies/laws/LawsStrategyFactory.cpp new file mode 100644 index 00000000..1dc946c0 --- /dev/null +++ b/emper/strategies/laws/LawsStrategyFactory.cpp @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#include "strategies/laws/LawsStrategyFactory.hpp" + +class Runtime; + +LawsStrategyFactory LawsStrategyFactory::INSTANCE; + +auto LawsStrategyFactory::constructRuntimeStrategy(Runtime& runtime) -> LawsStrategy* { + return new LawsStrategy(runtime); +} diff --git a/emper/strategies/laws/LawsStrategyFactory.hpp b/emper/strategies/laws/LawsStrategyFactory.hpp new file mode 100644 index 00000000..2ae89ea5 --- /dev/null +++ b/emper/strategies/laws/LawsStrategyFactory.hpp @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#pragma once + +#include "RuntimeStrategyFactory.hpp" +#include "strategies/laws/LawsStrategy.hpp" + +class Runtime; + +class LawsStrategyFactory : public RuntimeStrategyFactory { + private: + LawsStrategyFactory() = default; + + public: + auto constructRuntimeStrategy(Runtime& runtime) -> LawsStrategy* override; + + static LawsStrategyFactory INSTANCE; +}; diff --git a/emper/strategies/laws/LawsStrategyStats.cpp b/emper/strategies/laws/LawsStrategyStats.cpp index c1acec27..966c84c3 100644 --- a/emper/strategies/laws/LawsStrategyStats.cpp +++ b/emper/strategies/laws/LawsStrategyStats.cpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #include "LawsStrategyStats.hpp" #include <atomic> @@ -8,8 +8,8 @@ #include "LawsStrategy.hpp" LawsStrategyStats::LawsStrategyStats(LawsStrategy& lawsStrategy) - : scheduledFibersToRemotePriority(lawsStrategy.scheduledFibersToRemotePriority), - scheduledFibersToLocal(lawsStrategy.scheduledFibersToLocal), + : AbstractWorkStealingStats(lawsStrategy), + scheduledFibersToRemotePriority(lawsStrategy.scheduledFibersToRemotePriority), dispatchedFiberFromPriority(lawsStrategy.dispatchedFiberFromPriority), dispatchedFiberFromLocal(lawsStrategy.dispatchedFiberFromLocal), dispatchedFiberStolen(lawsStrategy.dispatchedFiberStolen), @@ -19,10 +19,6 @@ auto LawsStrategyStats::getScheduledFibersToRemotePriority() const -> uint64_t { return scheduledFibersToRemotePriority; } -auto LawsStrategyStats::getScheduledFibersToLocal() const -> uint64_t { - return scheduledFibersToLocal; -} - auto LawsStrategyStats::getDispatchedFiberFromPriority() const -> uint64_t { return dispatchedFiberFromPriority; } @@ -40,9 +36,9 @@ auto LawsStrategyStats::getDispatchedFiberFromAnywhere() const -> uint64_t { } void LawsStrategyStats::print() { + // TODO: Print also the stats from AbstractWorkStealingStrategy. std::cout << "LawsStrategyStats" << " scheduledFibersToRemotePriority:" << scheduledFibersToRemotePriority - << " scheduledFibersToLocal:" << scheduledFibersToLocal << " dispatchedFiberFromPriority:" << dispatchedFiberFromPriority << " dispatchedFiberFromLocal:" << dispatchedFiberFromLocal << " dispatchedFiberStolen:" << dispatchedFiberStolen diff --git a/emper/strategies/laws/LawsStrategyStats.hpp b/emper/strategies/laws/LawsStrategyStats.hpp index 48de230b..2d4cff89 100644 --- a/emper/strategies/laws/LawsStrategyStats.hpp +++ b/emper/strategies/laws/LawsStrategyStats.hpp @@ -1,17 +1,16 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #pragma once #include <cstdint> -#include "RuntimeStrategyStats.hpp" +#include "strategies/AbstractWorkStealingStats.hpp" class LawsStrategy; -class LawsStrategyStats : public RuntimeStrategyStats { +class LawsStrategyStats : public AbstractWorkStealingStats { private: const uint64_t scheduledFibersToRemotePriority; - const uint64_t scheduledFibersToLocal; const uint64_t dispatchedFiberFromPriority; const uint64_t dispatchedFiberFromLocal; const uint64_t dispatchedFiberStolen; @@ -21,7 +20,6 @@ class LawsStrategyStats : public RuntimeStrategyStats { LawsStrategyStats(LawsStrategy& lawsStrategy); [[nodiscard]] auto getScheduledFibersToRemotePriority() const -> uint64_t; - [[nodiscard]] auto getScheduledFibersToLocal() const -> uint64_t; [[nodiscard]] auto getDispatchedFiberFromPriority() const -> uint64_t; [[nodiscard]] auto getDispatchedFiberFromLocal() const -> uint64_t; [[nodiscard]] auto getDispatchedFiberStolen() const -> uint64_t; diff --git a/emper/strategies/laws/meson.build b/emper/strategies/laws/meson.build index 3297836b..b36183e6 100644 --- a/emper/strategies/laws/meson.build +++ b/emper/strategies/laws/meson.build @@ -1,5 +1,6 @@ emper_cpp_sources += files( 'LawsStrategy.cpp', + 'LawsStrategyFactory.cpp', 'LawsStrategyStats.cpp', 'LawsScheduler.cpp', 'LawsDispatcher.cpp', diff --git a/emper/strategies/meson.build b/emper/strategies/meson.build index 24e52026..2586f72a 100644 --- a/emper/strategies/meson.build +++ b/emper/strategies/meson.build @@ -1,2 +1,7 @@ +emper_cpp_sources += files( + 'AbstractWorkStealingScheduler.cpp', + 'AbstractWorkStealingStats.cpp', +) + subdir('ws') subdir('laws') diff --git a/emper/strategies/ws/WsDispatcher.cpp b/emper/strategies/ws/WsDispatcher.cpp index 74182792..00569799 100644 --- a/emper/strategies/ws/WsDispatcher.cpp +++ b/emper/strategies/ws/WsDispatcher.cpp @@ -2,13 +2,15 @@ // Copyright © 2020 Florian Schmaus #include "WsDispatcher.hpp" +#include "NextFiberResult.hpp" #include "Runtime.hpp" // for Runtime class Fiber; void WsDispatcher::dispatchLoop() { while (true) { - Fiber* fiber = runtime.nextFiber(); + NextFiberResult next = runtime.nextFiber(); + Fiber* const fiber = next.fiber; if (!fiber) { dispatchLoopDoSleep(); diff --git a/emper/strategies/ws/WsScheduler.cpp b/emper/strategies/ws/WsScheduler.cpp index 720a99a5..41c1b0a5 100644 --- a/emper/strategies/ws/WsScheduler.cpp +++ b/emper/strategies/ws/WsScheduler.cpp @@ -2,81 +2,7 @@ // Copyright © 2020 Florian Schmaus #include "WsScheduler.hpp" -#include <atomic> -#include <ostream> - -#include "Common.hpp" -#include "Debug.hpp" -#include "Emper.hpp" -#include "Runtime.hpp" #include "strategies/ws/WsStrategy.hpp" -class Fiber; - -thread_local WsScheduler::WsQueue<WsScheduler::QUEUE_SIZE> WsScheduler::queue; - WsScheduler::WsScheduler(Runtime& runtime, WsStrategy& wsStrategy) - : Scheduler(runtime), wsStrategy(wsStrategy) { - const workerid_t workerCount = runtime.getWorkerCount(); - queues = new WsScheduler::WsQueue<QUEUE_SIZE>*[workerCount]; - - auto newWorkerHook = [this]() { queues[Runtime::getWorkerId()] = &queue; }; - addNewWorkerHook(newWorkerHook); -} - -void WsScheduler::schedule(Fiber& fiber) { - LOGD("Scheduling fiber " << &fiber); - - bool pushed = queue.pushBottom(&fiber); - if (unlikely(!pushed)) { - // Work-stealing should not use an overflow queue - // (EMPER_OVERFLOW_QUEUE), because of the extra overhead - // required to check that queue for work, so we have to abort - // here. - ABORT("Could not push fiber " << &fiber << " into queue"); - } - - if constexpr (emper::STATS) { - wsStrategy.scheduledFibers.fetch_add(1, std::memory_order_relaxed); - } - - onNewWork(); -} - -auto WsScheduler::nextFiber() -> Fiber* { - Fiber* fiber; - bool poped = queue.popBottom(&fiber); - - if (likely(poped)) { - if constexpr (emper::STATS) { - wsStrategy.nextFiberFromLocal.fetch_add(1, std::memory_order_relaxed); - } - return fiber; - } - - // TODO: The code below is nearly duplicated, besides the stats - // part, in LawsScheduler, deduplicate. - const workerid_t myWorkerId = Runtime::getWorkerId(); - const workerid_t workerCount = runtime.getWorkerCount(); - workerid_t startWorkerId = Runtime::rand() % workerCount; - // TODO: See how reducing the loop bound affects things. - for (workerid_t i = 0; i < workerCount; ++i) { - workerid_t victim = (startWorkerId + i) % workerCount; - - // Don't steal from ourselves. - if (unlikely(victim == myWorkerId)) continue; - - poped = queues[victim]->popTop(&fiber); - if (poped) { - if constexpr (emper::STATS) { - wsStrategy.nextFiberStolen.fetch_add(1, std::memory_order_relaxed); - } - - return fiber; - } - } - - // Try the "scheduled from anywhere" queue to get work as last resort. - fiber = dequeFiberFromAnywhereQueue(); - return fiber; -} + : AbstractWorkStealingScheduler(runtime, wsStrategy), wsStrategy(wsStrategy) {} diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp index 569a9034..63a634d3 100644 --- a/emper/strategies/ws/WsScheduler.hpp +++ b/emper/strategies/ws/WsScheduler.hpp @@ -1,32 +1,16 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #pragma once -#include <cstddef> // for size_t - -#include "Scheduler.hpp" // for Scheduler -#include "emper-common.h" // for ATTR_UNUSED -#include "lib/adt/WsClQueue.hpp" // for WsClQueue +#include "NextFiberResult.hpp" +#include "emper-common.h" // for ATTR_UNUSED +#include "strategies/AbstractWorkStealingScheduler.hpp" class Fiber; class Runtime; class WsStrategy; -class WsScheduler : public Scheduler { - template <size_t SIZE> -#ifdef EMPER_LOCKED_WS_QUEUE - using WsQueue = adt::LockedQueue<Fiber*, SIZE>; -#else - using WsQueue = adt::WsClQueue<Fiber*, SIZE>; -#endif - - public: - static const int QUEUE_SIZE = 1024; - - private: - WsQueue<QUEUE_SIZE>** queues; - static thread_local WsQueue<QUEUE_SIZE> queue; - +class WsScheduler : public AbstractWorkStealingScheduler { #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wattributes" WsStrategy& wsStrategy @@ -36,10 +20,11 @@ class WsScheduler : public Scheduler { ; #pragma GCC diagnostic pop + protected: + void scheduleInternal(Fiber& fiber) override { scheduleViaWorkStealing(fiber); } + public: WsScheduler(Runtime& runtime, WsStrategy& wsStrategy); - void schedule(Fiber& fiber) override; - - auto nextFiber() -> Fiber* override; + auto nextFiber() -> NextFiberResult override { return nextFiberResultViaWorkStealing(); }; }; diff --git a/emper/strategies/ws/WsStrategy.cpp b/emper/strategies/ws/WsStrategy.cpp index ded25e63..72bc6fe4 100644 --- a/emper/strategies/ws/WsStrategy.cpp +++ b/emper/strategies/ws/WsStrategy.cpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #include "WsStrategy.hpp" #include "WsDispatcher.hpp" @@ -9,17 +9,11 @@ class Runtime; class RuntimeStrategyStats; -WsStrategy WsStrategy::INSTANCE; +WsStrategy::WsStrategy(Runtime& runtime) : scheduler(runtime, *this), dispatcher(runtime) {} -auto WsStrategy::getScheduler(Runtime& runtime) -> Scheduler& { - Scheduler* scheduler = new WsScheduler(runtime, *this); - return *scheduler; -} +auto WsStrategy::getScheduler() -> WsScheduler& { return scheduler; } -auto WsStrategy::getDispatcher(Runtime& runtime) -> Dispatcher& { - Dispatcher* dispatcher = new WsDispatcher(runtime); - return *dispatcher; -} +auto WsStrategy::getDispatcher() -> WsDispatcher& { return dispatcher; } auto WsStrategy::getStats() -> std::shared_ptr<RuntimeStrategyStats> { return std::make_shared<WsStrategyStats>(*this); diff --git a/emper/strategies/ws/WsStrategy.hpp b/emper/strategies/ws/WsStrategy.hpp index f6e963ca..1bba1024 100644 --- a/emper/strategies/ws/WsStrategy.hpp +++ b/emper/strategies/ws/WsStrategy.hpp @@ -1,39 +1,34 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #pragma once -#include <atomic> -#include <cstdint> #include <memory> -#include "RuntimeStrategy.hpp" +#include "strategies/AbstractWorkStealingStrategy.hpp" +#include "strategies/ws/WsDispatcher.hpp" +#include "strategies/ws/WsScheduler.hpp" -class Scheduler; -class Dispatcher; class Runtime; class RuntimeStrategyStats; -class WsScheduler; -class WsDispatcher; +class WsStrategyFactory; class WsStrategyStats; -class WsStrategy : public RuntimeStrategy { +class WsStrategy : public AbstractWorkStealingStrategy { private: - std::atomic<std::uint64_t> scheduledFibers; - std::atomic<std::uint64_t> nextFiberFromLocal; - std::atomic<std::uint64_t> nextFiberStolen; + WsScheduler scheduler; + WsDispatcher dispatcher; - WsStrategy() : scheduledFibers(0), nextFiberFromLocal(0), nextFiberStolen(0) {} + WsStrategy(Runtime& runtime); - auto getScheduler(Runtime& runtime) -> Scheduler& override; + auto getScheduler() -> WsScheduler& override; - auto getDispatcher(Runtime& runtime) -> Dispatcher& override; + auto getDispatcher() -> WsDispatcher& override; public: auto getStats() -> std::shared_ptr<RuntimeStrategyStats> override; - static WsStrategy INSTANCE; - friend WsScheduler; friend WsDispatcher; + friend WsStrategyFactory; friend WsStrategyStats; }; diff --git a/emper/strategies/ws/WsStrategyFactory.cpp b/emper/strategies/ws/WsStrategyFactory.cpp new file mode 100644 index 00000000..fd5151d0 --- /dev/null +++ b/emper/strategies/ws/WsStrategyFactory.cpp @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#include "strategies/ws/WsStrategyFactory.hpp" + +class Runtime; + +WsStrategyFactory WsStrategyFactory::INSTANCE; + +auto WsStrategyFactory::constructRuntimeStrategy(Runtime& runtime) -> WsStrategy* { + return new WsStrategy(runtime); +} diff --git a/emper/strategies/ws/WsStrategyFactory.hpp b/emper/strategies/ws/WsStrategyFactory.hpp new file mode 100644 index 00000000..a3c7d04c --- /dev/null +++ b/emper/strategies/ws/WsStrategyFactory.hpp @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#pragma once + +#include "RuntimeStrategyFactory.hpp" +#include "strategies/ws/WsStrategy.hpp" + +class Runtime; + +class WsStrategyFactory : public RuntimeStrategyFactory { + public: + auto constructRuntimeStrategy(Runtime& runtime) -> WsStrategy* override; + + static WsStrategyFactory INSTANCE; +}; diff --git a/emper/strategies/ws/WsStrategyStats.cpp b/emper/strategies/ws/WsStrategyStats.cpp index e20cf34f..98568ab6 100644 --- a/emper/strategies/ws/WsStrategyStats.cpp +++ b/emper/strategies/ws/WsStrategyStats.cpp @@ -1,26 +1,7 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #include "WsStrategyStats.hpp" -#include <atomic> -#include <iostream> +#include "strategies/ws/WsStrategy.hpp" -#include "WsStrategy.hpp" - -WsStrategyStats::WsStrategyStats(WsStrategy& wsStrategy) - : scheduledFibers(wsStrategy.scheduledFibers), - nextFiberFromLocal(wsStrategy.nextFiberFromLocal), - nextFiberStolen(wsStrategy.nextFiberStolen) {} - -auto WsStrategyStats::getScheduledFibers() const -> uint64_t { return scheduledFibers; } - -auto WsStrategyStats::getNextFiberFromLocal() const -> uint64_t { return nextFiberFromLocal; } - -auto WsStrategyStats::getNextFiberStolen() const -> uint64_t { return nextFiberStolen; } - -void WsStrategyStats::print() { - std::cout << "WsStrategyStats" - << " scheduledFibers:" << scheduledFibers - << " nextFiberFromLocal:" << nextFiberFromLocal - << " nextFiberStolen:" << nextFiberStolen << std::endl; -} +WsStrategyStats::WsStrategyStats(WsStrategy& wsStrategy) : AbstractWorkStealingStats(wsStrategy) {} diff --git a/emper/strategies/ws/WsStrategyStats.hpp b/emper/strategies/ws/WsStrategyStats.hpp index 85249a5e..8845f824 100644 --- a/emper/strategies/ws/WsStrategyStats.hpp +++ b/emper/strategies/ws/WsStrategyStats.hpp @@ -2,24 +2,11 @@ // Copyright © 2020 Florian Schmaus #pragma once -#include <cstdint> - -#include "RuntimeStrategyStats.hpp" +#include "strategies/AbstractWorkStealingStats.hpp" class WsStrategy; -class WsStrategyStats : public RuntimeStrategyStats { - private: - const uint64_t scheduledFibers; - const uint64_t nextFiberFromLocal; - const uint64_t nextFiberStolen; - +class WsStrategyStats : public AbstractWorkStealingStats { public: WsStrategyStats(WsStrategy& wsStrategy); - - [[nodiscard]] auto getScheduledFibers() const -> uint64_t; - [[nodiscard]] auto getNextFiberFromLocal() const -> uint64_t; - [[nodiscard]] auto getNextFiberStolen() const -> uint64_t; - - void print() override; }; diff --git a/emper/strategies/ws/meson.build b/emper/strategies/ws/meson.build index 6554bc5a..3cc10e2e 100644 --- a/emper/strategies/ws/meson.build +++ b/emper/strategies/ws/meson.build @@ -1,5 +1,6 @@ emper_cpp_sources += files( 'WsStrategy.cpp', + 'WsStrategyFactory.cpp', 'WsStrategyStats.cpp', 'WsScheduler.cpp', 'WsDispatcher.cpp', diff --git a/eval/Locality.cpp b/eval/Locality.cpp index 24c7249b..26ddb8ae 100644 --- a/eval/Locality.cpp +++ b/eval/Locality.cpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #include <unistd.h> // for getopt, optarg #include <algorithm> // for generate @@ -10,14 +10,14 @@ #include <random> // for mt19937, uniform_int_dis... #include <string> // for string, operator<<, oper... -#include "CountingPrivateSemaphore.hpp" // for CPS -#include "Debug.hpp" // for DBG -#include "Fiber.hpp" // for Fiber, Fiber::NOT_AFFINE -#include "PrivateSemaphore.hpp" // for PS -#include "Runtime.hpp" // for Runtime -#include "emper-common.h" // for workeraffinity_t, UNUSED... -#include "lib/DebugUtil.hpp" // for enableStacktraceOnAborts -#include "strategies/laws/LawsStrategy.hpp" // for LawsStrategy, LawsStrate... +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "Debug.hpp" // for DBG +#include "Fiber.hpp" // for Fiber, Fiber::NOT_AFFINE +#include "PrivateSemaphore.hpp" // for PS +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" // for workeraffinity_t, UNUSED... +#include "lib/DebugUtil.hpp" // for enableStacktraceOnAborts +#include "strategies/laws/LawsStrategyFactory.hpp" // for LawsStrategy, LawsStrate... #define L1_CACHE_LINE_SIZE 64 // 64 Bytes @@ -253,7 +253,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { runtime = new Runtime(); break; case wslh: - runtime = new Runtime(LawsStrategy::INSTANCE); + runtime = new Runtime(LawsStrategyFactory::INSTANCE); break; } diff --git a/meson.build b/meson.build index d0ae395f..ba757b68 100644 --- a/meson.build +++ b/meson.build @@ -31,6 +31,7 @@ conf_data.set('EMPER_LOCKED_WS_QUEUE', get_option('locked_ws_queue')) conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue')) conf_data.set('EMPER_LOCKED_MPSC_QUEUE', get_option('locked_mpsc_queue')) conf_data.set('EMPER_STATS', get_option('stats')) +conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue')) default_scheduling_strategy = get_option('default_scheduling_strategy') conf_data.set('EMPER_DEFAULT_SCHEDULING_STRATEGY_' + default_scheduling_strategy.to_upper(), true) diff --git a/meson_options.txt b/meson_options.txt index 2fcd1b2c..e4f8a399 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -62,3 +62,9 @@ option( ], value: 'work_stealing', ) +option( + 'overflow_queue', + type: 'boolean', + value: true, + description: 'Use an overflow queue in case scheduling queues become full', +) diff --git a/tests/SimpleLawsTest.cpp b/tests/SimpleLawsTest.cpp index c35cd856..217d2fd2 100644 --- a/tests/SimpleLawsTest.cpp +++ b/tests/SimpleLawsTest.cpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus #include <atomic> // for atomic, __atomic_base #include <cstdint> // for uint64_t, UINT64_MAX #include <cstdlib> // for free, exit, EXIT_FAILURE @@ -7,14 +7,14 @@ #include <iostream> // for operator<<, endl, basic_... #include <random> // for mt19937_64, random_device -#include "Common.hpp" // for ALIGN_TO_CACHE_LINE -#include "CountingPrivateSemaphore.hpp" // for CPS -#include "Fiber.hpp" // for Fiber, Fiber::NOT_AFFINE -#include "Runtime.hpp" // for Runtime -#include "emper-common.h" // for UNUSED_ARG, workeraffini... -#include "strategies/laws/LawsStrategy.hpp" // for LawsStrategy, LawsStrate... +#include "Common.hpp" // for ALIGN_TO_CACHE_LINE +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "Fiber.hpp" // for Fiber, Fiber::NOT_AFFINE +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" // for UNUSED_ARG, workeraffini... +#include "strategies/laws/LawsStrategyFactory.hpp" -class RuntimeStrategy; +class RuntimeStrategyFactory; static const unsigned int ROUND_COUNT = 10; static const unsigned int FIBER_LOOPS = 10; @@ -103,8 +103,8 @@ static void alphaFun() { } auto main(UNUSED_ARG int args, UNUSED_ARG char* argv[]) -> int { - RuntimeStrategy& lawsStrategy = LawsStrategy::INSTANCE; - Runtime runtime(lawsStrategy); + RuntimeStrategyFactory& lawsStrategyFactory = LawsStrategyFactory::INSTANCE; + Runtime runtime(lawsStrategyFactory); Fiber* alphaFiber = Fiber::from(&alphaFun); -- GitLab