From 0d7248ab31a8e338c99f5936796b67b39d461f9a Mon Sep 17 00:00:00 2001 From: Florian Schmaus <flow@cs.fau.de> Date: Mon, 7 Dec 2020 11:48:33 +0100 Subject: [PATCH] Add Runtime::scheduleFromAnywhere(), remove mainThreadQueue In debug builds, the schedule() method is now guarded by an assert(), as due to this change, the schedule() method will no longer work from everywhere. This also improves the worker sleep method. --- apps/Main.cpp | 2 +- apps/WorkerSleepExample.cpp | 2 +- emper/Blockable.hpp | 4 ++ emper/CallerEnvironment.hpp | 8 ++++ emper/PrivateSemaphore.hpp | 1 + emper/Runtime.cpp | 10 ++-- emper/Runtime.hpp | 52 ++++++++++++++++++--- emper/Scheduler.cpp | 8 +++- emper/Scheduler.hpp | 44 ++++++++++++++++- emper/Semaphore.hpp | 1 + emper/UnboundedBlockingMpscQueue.hpp | 1 + emper/c_emper.cpp | 6 +++ emper/include/emper.h | 2 + emper/strategies/laws/LawsDispatcher.cpp | 4 +- emper/strategies/laws/LawsScheduler.cpp | 17 +++---- emper/strategies/laws/LawsStrategy.hpp | 6 +-- emper/strategies/laws/LawsStrategyStats.cpp | 8 ++-- emper/strategies/laws/LawsStrategyStats.hpp | 4 +- emper/strategies/ws/WsScheduler.cpp | 13 ++---- emper/strategies/ws/WsScheduler.hpp | 2 - eval/TimeToSpawn.cpp | 2 +- tests/AlarmActorTest.cpp | 8 ++-- tests/CppApiTest.cpp | 12 +++-- tests/SimpleActorTest.cpp | 2 +- tests/SimpleFibTest.cpp | 2 +- tests/SimpleLawsTest.cpp | 2 +- tests/SimpleUrcuTest.cpp | 2 +- tests/SimplestFibTest.cpp | 2 +- tests/c_api_test.c | 2 +- 29 files changed, 163 insertions(+), 66 deletions(-) create mode 100644 emper/CallerEnvironment.hpp diff --git a/apps/Main.cpp b/apps/Main.cpp index d74f1ea7..0ab7d437 100644 --- a/apps/Main.cpp +++ b/apps/Main.cpp @@ -82,7 +82,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; - runtime.schedule(*fibFiber); + runtime.scheduleFromAnywhere(*fibFiber); runtime.waitUntilFinished(); diff --git a/apps/WorkerSleepExample.cpp b/apps/WorkerSleepExample.cpp index f3426d91..0eef863e 100644 --- a/apps/WorkerSleepExample.cpp +++ b/apps/WorkerSleepExample.cpp @@ -59,7 +59,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; - runtime.schedule(*fibFiber); + runtime.scheduleFromAnywhere(*fibFiber); runtime.waitUntilFinished(); diff --git a/emper/Blockable.hpp b/emper/Blockable.hpp index 3bab4824..162f89fe 100644 --- a/emper/Blockable.hpp +++ b/emper/Blockable.hpp @@ -4,6 +4,7 @@ #include <utility> +#include "CallerEnvironment.hpp" #include "Common.hpp" #include "Context.hpp" #include "ContextManager.hpp" @@ -24,6 +25,9 @@ class Blockable : public Logger<logSubsystem> { // this error if std::move() is used (as we do). // NOLINTNEXTLINE(performance-unnecessary-value-param) void block(func_t freshContextHook) { + // Only contexts managed by EMPER's runtime can block. + assert(Runtime::inRuntime()); + LOGD("block() blockedContext is " << Context::getCurrentContext()); contextManager.saveAndStartNew(std::move(freshContextHook)); } diff --git a/emper/CallerEnvironment.hpp b/emper/CallerEnvironment.hpp new file mode 100644 index 00000000..b06fb419 --- /dev/null +++ b/emper/CallerEnvironment.hpp @@ -0,0 +1,8 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#pragma once + +enum CallerEnvironment { + EMPER, + ANYWHERE, +}; diff --git a/emper/PrivateSemaphore.hpp b/emper/PrivateSemaphore.hpp index 5fa679db..526c5e3a 100644 --- a/emper/PrivateSemaphore.hpp +++ b/emper/PrivateSemaphore.hpp @@ -3,6 +3,7 @@ #pragma once #include "Blockable.hpp" +#include "CallerEnvironment.hpp" #include "Context.hpp" #include "ContextManager.hpp" #include "Debug.hpp" diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index a1254135..5858198b 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -17,7 +17,6 @@ #include "Common.hpp" // for DIE_MSG_ERRNO, DIE, DIE_MSG #include "ContextManager.hpp" // for ContextManager #include "Debug.hpp" // for DBG, ABORT, LOGD, LOGE -#include "Dispatcher.hpp" // for Dispatcher #include "Emper.hpp" #include "Fiber.hpp" // for Fiber #include "RuntimeStrategy.hpp" // for RuntimeStrategy @@ -39,6 +38,7 @@ std::mutex Runtime::currentRuntimeMutex; Runtime* Runtime::currentRuntime; +thread_local bool Runtime::workerThread = false; thread_local unsigned int Runtime::seed; thread_local workerid_t Runtime::workerId; RuntimeStrategy& Runtime::DEFAULT_STRATEGY = WsStrategy::INSTANCE; @@ -51,7 +51,8 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy, unsigned int dispatcher(strategy.getDispatcher(*this)), contextManager(*(new ContextManager(*this))), randomEngine(seed), - atLeastOneWorkerIsSleeping(false) { + atLeastOneWorkerIsSleeping(false), + skipSleep(false) { threads = new pthread_t[workerCount]; workerArgs = new struct WorkerArgs[workerCount]; @@ -130,6 +131,7 @@ Runtime::~Runtime() { } auto Runtime::workerLoop(struct WorkerArgs* workerArgs) -> void* { + workerThread = true; workerId = workerArgs->id; seed = workerArgs->seed; @@ -174,8 +176,6 @@ void Runtime::printLastRuntimeStats() { currentRuntime->printStats(); } -auto Runtime::inRuntime() -> bool { return Dispatcher::isDispatchedControlFlow(); } - void Runtime::executeAndWait(std::function<void()> f) { if (inRuntime()) { ABORT("Ca not use executeAndWait() from within the Runtime"); @@ -190,7 +190,7 @@ void Runtime::executeAndWait(std::function<void()> f) { fiberFinished.unlock(); }); - schedule(*fiber); + scheduleFromAnywhere(*fiber); fiberFinished.lock(); } diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 95c35b8c..cbb9e02b 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -5,6 +5,7 @@ #include <pthread.h> // for pthread_t #include <atomic> // for atomic, memory_order_relaxed +#include <cassert> // for assert #include <condition_variable> // for condition_variable #include <cstddef> // for size_t #include <cstdint> // for intptr_t @@ -25,11 +26,17 @@ class Dispatcher; class Fiber; class RuntimeStrategy; +enum WakeupMode { + IF_SLEEPING_OBSERVED, + ALWAYS, +}; + class Runtime : public Logger<LogSubsystem::RUNTI> { private: static std::mutex currentRuntimeMutex; static Runtime* currentRuntime; + static thread_local bool workerThread; static thread_local unsigned int seed; static thread_local workerid_t workerId; @@ -57,9 +64,10 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { auto workerLoop(struct WorkerArgs* workerArgs) -> void*; - std::mutex workerSleepMutex; + ALIGN_TO_CACHE_LINE std::mutex workerSleepMutex; std::condition_variable workerSleepConditionVariable; ALIGN_TO_CACHE_LINE std::atomic<bool> atLeastOneWorkerIsSleeping; + bool skipSleep; static RuntimeStrategy& DEFAULT_STRATEGY; @@ -68,18 +76,41 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { protected: void addNewWorkerHook(const std::function<void(void)>& hook) { newWorkerHooks.push_back(hook); }; - inline void notifyAboutNewWork() { - if (!atLeastOneWorkerIsSleeping.load(std::memory_order_relaxed)) return; + template <WakeupMode wakeupMode = WakeupMode::IF_SLEEPING_OBSERVED> + inline void wakeupSleepingWorkers() { + if constexpr (wakeupMode == WakeupMode::IF_SLEEPING_OBSERVED) { + // If we observe no worker sleeping, then we do not try to + // attempt to wakeup one. Note that this is racy, i.e., just + // because we do not see any worker sleeping, does not mean that + // one (or all but us, as a matter of a fact) is about to + // sleep. However, in the worst case, this causes a worker to + // sleep longer, until this worker (or another active worker) + // schedules another fiber, when this method will be called + // again. And then it is likely that the scheduling worker will + // observe the sleeping worker. + if (!atLeastOneWorkerIsSleeping.load(std::memory_order_relaxed)) { + return; + } + } std::lock_guard<std::mutex> lk(workerSleepMutex); + skipSleep = true; + atLeastOneWorkerIsSleeping = false; workerSleepConditionVariable.notify_all(); } void dispatcherLoopSleep() { + if (skipSleep) { + skipSleep = false; + return; + } + std::unique_lock<std::mutex> lk(workerSleepMutex); - atLeastOneWorkerIsSleeping.store(true, std::memory_order_relaxed); + // Check again if "skip sleep" has been set. + if (skipSleep) return; + + atLeastOneWorkerIsSleeping = true; workerSleepConditionVariable.wait(lk); - atLeastOneWorkerIsSleeping.store(false, std::memory_order_relaxed); } public: @@ -94,7 +125,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { ~Runtime(); - inline void schedule(Fiber& fiber) { scheduler.schedule(fiber); } + inline void schedule(Fiber& fiber) { + // Calling schedule() only works from within the EMPER runtime. + assert(inRuntime()); + + scheduler.schedule(fiber); + } + + inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); } auto nextFiber() -> Fiber*; @@ -119,7 +157,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { void printStats(); - static auto inRuntime() -> bool; + static auto inRuntime() -> bool { return workerThread; } void executeAndWait(std::function<void()> f); diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp index aa8c43e9..ab828905 100644 --- a/emper/Scheduler.cpp +++ b/emper/Scheduler.cpp @@ -10,4 +10,10 @@ void Scheduler::addNewWorkerHook(const std::function<void(void)>& hook) { runtime.addNewWorkerHook(hook); } -void Scheduler::notifyRuntimeAboutNewWork() { runtime.notifyAboutNewWork(); } +void Scheduler::wakeupSleepingWorkersIfSleepingObserved() { + runtime.wakeupSleepingWorkers<WakeupMode::IF_SLEEPING_OBSERVED>(); +} + +void Scheduler::wakeupSleepingWorkersAlways() { + runtime.wakeupSleepingWorkers<WakeupMode::ALWAYS>(); +} diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index e4cdcd3b..de4ff849 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -2,15 +2,28 @@ // Copyright © 2020 Florian Schmaus #pragma once +#include <algorithm> // for copy, copy_backward #include <functional> // for function +#include <mutex> +#include <queue> -#include "Debug.hpp" // for LogSubsystem, LogSubsystem::SCHED, Logger +#include "CallerEnvironment.hpp" +#include "Debug.hpp" // for LogSubsystem, LogSubsystem::SCHED, Logger +#include "Emper.hpp" #include "Fiber.hpp" // for Fiber #include "emper-common.h" // for workeraffinity_t class Runtime; class Scheduler : public Logger<LogSubsystem::SCHED> { + private: + std::queue<Fiber*> scheduleAnywhereQueue; + std::mutex scheduleAnywhereQueueMutex; + + void wakeupSleepingWorkersIfSleepingObserved(); + + void wakeupSleepingWorkersAlways(); + protected: Runtime& runtime; Scheduler(Runtime& runtime); @@ -23,10 +36,37 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { static inline void increaseRefCount(Fiber& fiber) { fiber.doAtomicIncrRefCount(); } - void notifyRuntimeAboutNewWork(); + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + inline void onNewWork() { + if constexpr (emper::WORKER_SLEEP) { + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + wakeupSleepingWorkersIfSleepingObserved(); + } else { + wakeupSleepingWorkersAlways(); + } + } + } + + auto dequeFiberFromAnywhereQueue() -> Fiber* { + std::lock_guard<std::mutex> lock(scheduleAnywhereQueueMutex); + if (scheduleAnywhereQueue.empty()) return nullptr; + + Fiber* res = scheduleAnywhereQueue.front(); + scheduleAnywhereQueue.pop(); + return res; + } public: virtual void schedule(Fiber& fiber) = 0; virtual auto nextFiber() -> Fiber* = 0; + + void scheduleFromAnywhere(Fiber& fiber) { + { + std::lock_guard<std::mutex> lock(scheduleAnywhereQueueMutex); + scheduleAnywhereQueue.push(&fiber); + } + + onNewWork<CallerEnvironment::ANYWHERE>(); + } }; diff --git a/emper/Semaphore.hpp b/emper/Semaphore.hpp index 8753855d..be116370 100644 --- a/emper/Semaphore.hpp +++ b/emper/Semaphore.hpp @@ -6,6 +6,7 @@ #include <queue> #include "BinaryPrivateSemaphore.hpp" +#include "CallerEnvironment.hpp" namespace emper { diff --git a/emper/UnboundedBlockingMpscQueue.hpp b/emper/UnboundedBlockingMpscQueue.hpp index 68acb803..f9841b30 100644 --- a/emper/UnboundedBlockingMpscQueue.hpp +++ b/emper/UnboundedBlockingMpscQueue.hpp @@ -6,6 +6,7 @@ #include <queue> #include "Blockable.hpp" +#include "CallerEnvironment.hpp" #include "Context.hpp" template <typename T> diff --git a/emper/c_emper.cpp b/emper/c_emper.cpp index 79e4c730..6822dee6 100644 --- a/emper/c_emper.cpp +++ b/emper/c_emper.cpp @@ -49,6 +49,12 @@ void schedule(fiber* fiber) { runtime->schedule(*f); } +void schedule_from_anywhere(fiber* fiber) { + Runtime* runtime = Runtime::getRuntime(); + auto* f = reinterpret_cast<Fiber*>(fiber); + runtime->scheduleFromAnywhere(*f); +} + auto new_binary_sem() -> bps* { BPS* sem = new BPS(); return reinterpret_cast<bps*>(sem); diff --git a/emper/include/emper.h b/emper/include/emper.h index 05a907c3..e93cbf08 100644 --- a/emper/include/emper.h +++ b/emper/include/emper.h @@ -37,6 +37,8 @@ void init_affinity(workeraffinity_t affinity[], unsigned int n); void schedule(fiber* fiber); +void schedule_from_anywhere(fiber* fiber); + // NOLINTNEXTLINE(modernize-use-trailing-return-type) bps* new_binary_sem(void); diff --git a/emper/strategies/laws/LawsDispatcher.cpp b/emper/strategies/laws/LawsDispatcher.cpp index 04bfbde1..2ece84a4 100644 --- a/emper/strategies/laws/LawsDispatcher.cpp +++ b/emper/strategies/laws/LawsDispatcher.cpp @@ -35,8 +35,8 @@ void LawsDispatcher::dispatchLoop() { case LawsStrategy::FiberSource::stolen: lawsStrategy.dispatchedFiberStolen.fetch_add(1, std::memory_order_relaxed); break; - case LawsStrategy::FiberSource::mainThread: - lawsStrategy.dispatchedFiberFromMainThread.fetch_add(1, std::memory_order_relaxed); + case LawsStrategy::FiberSource::anywhereQueue: + lawsStrategy.dispatchedFiberFromAnywhere.fetch_add(1, std::memory_order_relaxed); break; default: DIE_MSG("Unknown fiber flag: " << flag); diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp index 2c867f18..6473407a 100644 --- a/emper/strategies/laws/LawsScheduler.cpp +++ b/emper/strategies/laws/LawsScheduler.cpp @@ -68,9 +68,7 @@ scheduleToLocalWsQueue: } } - if constexpr (emper::WORKER_SLEEP) { - notifyRuntimeAboutNewWork(); - } + onNewWork(); } auto LawsScheduler::nextFiber() -> Fiber* { @@ -114,16 +112,13 @@ auto LawsScheduler::nextFiber() -> Fiber* { // TODO: Reduce pressure on mainThreadQueue by only checking every N-th time. - // Try the main thread queue to get work as last resort. - poped = mainThreadQueue->popTop(&fiber); - if (poped) { + // Try the "scheduled from anywhere" queue to get work as last resort. + fiber = dequeFiberFromAnywhereQueue(); + if (fiber) { if constexpr (emper::STATS) { - auto flag = static_cast<unsigned int>(LawsStrategy::FiberSource::mainThread); + auto flag = static_cast<unsigned int>(LawsStrategy::FiberSource::anywhereQueue); fiber->setFlag(flag); } - - return fiber; } - - return nullptr; + return fiber; } diff --git a/emper/strategies/laws/LawsStrategy.hpp b/emper/strategies/laws/LawsStrategy.hpp index 3ec3e14e..7d56702b 100644 --- a/emper/strategies/laws/LawsStrategy.hpp +++ b/emper/strategies/laws/LawsStrategy.hpp @@ -22,7 +22,7 @@ class LawsStrategy : public RuntimeStrategy { fromPriority, fromLocal, stolen, - mainThread, + anywhereQueue, }; std::atomic<std::uint64_t> scheduledFibersToRemotePriority; @@ -30,7 +30,7 @@ class LawsStrategy : public RuntimeStrategy { std::atomic<std::uint64_t> dispatchedFiberFromPriority; std::atomic<std::uint64_t> dispatchedFiberFromLocal; std::atomic<std::uint64_t> dispatchedFiberStolen; - std::atomic<std::uint64_t> dispatchedFiberFromMainThread; + std::atomic<std::uint64_t> dispatchedFiberFromAnywhere; LawsStrategy() : scheduledFibersToRemotePriority(0), @@ -38,7 +38,7 @@ class LawsStrategy : public RuntimeStrategy { dispatchedFiberFromPriority(0), dispatchedFiberFromLocal(0), dispatchedFiberStolen(0), - dispatchedFiberFromMainThread(0) {} + dispatchedFiberFromAnywhere(0) {} auto getScheduler(Runtime& runtime) -> Scheduler& override; diff --git a/emper/strategies/laws/LawsStrategyStats.cpp b/emper/strategies/laws/LawsStrategyStats.cpp index 7e0aac9a..c1acec27 100644 --- a/emper/strategies/laws/LawsStrategyStats.cpp +++ b/emper/strategies/laws/LawsStrategyStats.cpp @@ -13,7 +13,7 @@ LawsStrategyStats::LawsStrategyStats(LawsStrategy& lawsStrategy) dispatchedFiberFromPriority(lawsStrategy.dispatchedFiberFromPriority), dispatchedFiberFromLocal(lawsStrategy.dispatchedFiberFromLocal), dispatchedFiberStolen(lawsStrategy.dispatchedFiberStolen), - dispatchedFiberFromMainThread(lawsStrategy.dispatchedFiberFromMainThread) {} + dispatchedFiberFromAnywhere(lawsStrategy.dispatchedFiberFromAnywhere) {} auto LawsStrategyStats::getScheduledFibersToRemotePriority() const -> uint64_t { return scheduledFibersToRemotePriority; @@ -35,8 +35,8 @@ auto LawsStrategyStats::getDispatchedFiberStolen() const -> uint64_t { return dispatchedFiberStolen; } -auto LawsStrategyStats::getDispatchedFiberFromMainThread() const -> uint64_t { - return dispatchedFiberFromMainThread; +auto LawsStrategyStats::getDispatchedFiberFromAnywhere() const -> uint64_t { + return dispatchedFiberFromAnywhere; } void LawsStrategyStats::print() { @@ -46,5 +46,5 @@ void LawsStrategyStats::print() { << " dispatchedFiberFromPriority:" << dispatchedFiberFromPriority << " dispatchedFiberFromLocal:" << dispatchedFiberFromLocal << " dispatchedFiberStolen:" << dispatchedFiberStolen - << " dispatchedFiberFromMainThread:" << dispatchedFiberFromMainThread << std::endl; + << " dispatchedFiberFromAnywhere:" << dispatchedFiberFromAnywhere << std::endl; } diff --git a/emper/strategies/laws/LawsStrategyStats.hpp b/emper/strategies/laws/LawsStrategyStats.hpp index ca0ae2c2..48de230b 100644 --- a/emper/strategies/laws/LawsStrategyStats.hpp +++ b/emper/strategies/laws/LawsStrategyStats.hpp @@ -15,7 +15,7 @@ class LawsStrategyStats : public RuntimeStrategyStats { const uint64_t dispatchedFiberFromPriority; const uint64_t dispatchedFiberFromLocal; const uint64_t dispatchedFiberStolen; - const uint64_t dispatchedFiberFromMainThread; + const uint64_t dispatchedFiberFromAnywhere; public: LawsStrategyStats(LawsStrategy& lawsStrategy); @@ -25,7 +25,7 @@ class LawsStrategyStats : public RuntimeStrategyStats { [[nodiscard]] auto getDispatchedFiberFromPriority() const -> uint64_t; [[nodiscard]] auto getDispatchedFiberFromLocal() const -> uint64_t; [[nodiscard]] auto getDispatchedFiberStolen() const -> uint64_t; - [[nodiscard]] auto getDispatchedFiberFromMainThread() const -> uint64_t; + [[nodiscard]] auto getDispatchedFiberFromAnywhere() const -> uint64_t; void print() override; }; diff --git a/emper/strategies/ws/WsScheduler.cpp b/emper/strategies/ws/WsScheduler.cpp index 6b081a26..38636e34 100644 --- a/emper/strategies/ws/WsScheduler.cpp +++ b/emper/strategies/ws/WsScheduler.cpp @@ -19,7 +19,6 @@ WsScheduler::WsScheduler(Runtime& runtime, WsStrategy& wsStrategy) : Scheduler(runtime), wsStrategy(wsStrategy) { const workerid_t workerCount = runtime.getWorkerCount(); queues = new WsScheduler::WsQueue<QUEUE_SIZE>*[workerCount]; - mainThreadQueue = &queue; auto newWorkerHook = [this]() { queues[Runtime::getWorkerId()] = &queue; }; addNewWorkerHook(newWorkerHook); @@ -41,9 +40,7 @@ void WsScheduler::schedule(Fiber& fiber) { wsStrategy.scheduledFibers.fetch_add(1, std::memory_order_relaxed); } - if constexpr (emper::WORKER_SLEEP) { - notifyRuntimeAboutNewWork(); - } + onNewWork(); } auto WsScheduler::nextFiber() -> Fiber* { @@ -76,9 +73,7 @@ auto WsScheduler::nextFiber() -> Fiber* { } } - // Try the main thread queue to get work as last resort. - poped = mainThreadQueue->popTop(&fiber); - if (poped) return fiber; - - return nullptr; + // Try the "scheduled from anywhere" queue to get work as last resort. + fiber = dequeFiberFromAnywhereQueue(); + return fiber; } diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp index da1db2bb..569a9034 100644 --- a/emper/strategies/ws/WsScheduler.hpp +++ b/emper/strategies/ws/WsScheduler.hpp @@ -27,8 +27,6 @@ class WsScheduler : public Scheduler { WsQueue<QUEUE_SIZE>** queues; static thread_local WsQueue<QUEUE_SIZE> queue; - WsQueue<QUEUE_SIZE>* mainThreadQueue; - #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wattributes" WsStrategy& wsStrategy diff --git a/eval/TimeToSpawn.cpp b/eval/TimeToSpawn.cpp index 36e4d7d7..06102bfa 100644 --- a/eval/TimeToSpawn.cpp +++ b/eval/TimeToSpawn.cpp @@ -41,7 +41,7 @@ static auto fiberTimeToSpawn() -> std::chrono::nanoseconds { res = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start); testFinished.unlock(); }); - runtime.schedule(*fiber); + runtime.scheduleFromAnywhere(*fiber); testFinished.lock(); diff --git a/tests/AlarmActorTest.cpp b/tests/AlarmActorTest.cpp index bad812f0..580c45d6 100644 --- a/tests/AlarmActorTest.cpp +++ b/tests/AlarmActorTest.cpp @@ -40,10 +40,10 @@ auto main(int argc, char* argv[]) -> int { Runtime runtime; - AlarmActor alarmActor(runtime); - alarmActor.start(); - Fiber* fiber = Fiber::from([&] { + AlarmActor alarmActor(runtime); + alarmActor.start(); + CPS cps; for (unsigned int i = 0; i < sleeper_count; ++i) { spawn( @@ -72,7 +72,7 @@ auto main(int argc, char* argv[]) -> int { exit(EXIT_SUCCESS); }); - runtime.schedule(*fiber); + runtime.scheduleFromAnywhere(*fiber); runtime.waitUntilFinished(); diff --git a/tests/CppApiTest.cpp b/tests/CppApiTest.cpp index b61ee9e0..cbdd8e1f 100644 --- a/tests/CppApiTest.cpp +++ b/tests/CppApiTest.cpp @@ -4,9 +4,10 @@ #include <cstdlib> // for exit, EXIT_FAILURE, EXIT_SUC... #include "CountingPrivateSemaphore.hpp" // for CountingPrivateSemaphore -#include "Runtime.hpp" // for Runtime -#include "emper-common.h" // for UNUSED_ARG -#include "emper.hpp" // for async, spawn +#include "Fiber.hpp" +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" // for UNUSED_ARG +#include "emper.hpp" // for async, spawn static std::atomic_uint counter; @@ -30,10 +31,11 @@ static void mainFiber() { exit(EXIT_SUCCESS); } -auto main(UNUSED_ARG int arg, UNUSED_ARG char *argv[]) -> int { +auto main(UNUSED_ARG int arg, UNUSED_ARG char* argv[]) -> int { Runtime runtime; - async(&mainFiber); + Fiber* alphaFiber = Fiber::from([] { async(&mainFiber); }); + runtime.scheduleFromAnywhere(*alphaFiber); runtime.waitUntilFinished(); diff --git a/tests/SimpleActorTest.cpp b/tests/SimpleActorTest.cpp index 5fe140a5..fbd398a0 100644 --- a/tests/SimpleActorTest.cpp +++ b/tests/SimpleActorTest.cpp @@ -80,7 +80,7 @@ auto main(UNUSED_ARG int arg, UNUSED_ARG char* argv[]) -> int { Runtime runtime; Fiber* fiber = Fiber::from(mainFiber, (void*)&runtime); - runtime.schedule(*fiber); + runtime.scheduleFromAnywhere(*fiber); runtime.waitUntilFinished(); diff --git a/tests/SimpleFibTest.cpp b/tests/SimpleFibTest.cpp index aace0206..79bd1f72 100644 --- a/tests/SimpleFibTest.cpp +++ b/tests/SimpleFibTest.cpp @@ -74,7 +74,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { }, nullptr); - runtime.schedule(*fibFiber); + runtime.scheduleFromAnywhere(*fibFiber); runtime.waitUntilFinished(); diff --git a/tests/SimpleLawsTest.cpp b/tests/SimpleLawsTest.cpp index c4fb8177..c35cd856 100644 --- a/tests/SimpleLawsTest.cpp +++ b/tests/SimpleLawsTest.cpp @@ -108,7 +108,7 @@ auto main(UNUSED_ARG int args, UNUSED_ARG char* argv[]) -> int { Fiber* alphaFiber = Fiber::from(&alphaFun); - runtime.schedule(*alphaFiber); + runtime.scheduleFromAnywhere(*alphaFiber); runtime.waitUntilFinished(); diff --git a/tests/SimpleUrcuTest.cpp b/tests/SimpleUrcuTest.cpp index 75c552b9..39ab24a5 100644 --- a/tests/SimpleUrcuTest.cpp +++ b/tests/SimpleUrcuTest.cpp @@ -86,7 +86,7 @@ auto main() -> int { exit(EXIT_SUCCESS); }); - runtime.schedule(*verifier); + runtime.scheduleFromAnywhere(*verifier); runtime.waitUntilFinished(); return EXIT_FAILURE; diff --git a/tests/SimplestFibTest.cpp b/tests/SimplestFibTest.cpp index 6ff8afde..bd0700a3 100644 --- a/tests/SimplestFibTest.cpp +++ b/tests/SimplestFibTest.cpp @@ -89,7 +89,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; - runtime.schedule(*fibFiber); + runtime.scheduleFromAnywhere(*fibFiber); runtime.waitUntilFinished(); diff --git a/tests/c_api_test.c b/tests/c_api_test.c index a9ef015d..eb8eee42 100644 --- a/tests/c_api_test.c +++ b/tests/c_api_test.c @@ -46,7 +46,7 @@ int main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) { init_runtime(); fiber* alpha_fiber = fiber_from0(alpha_fun); - schedule(alpha_fiber); + schedule_from_anywhere(alpha_fiber); wait_until_runtime_finished(); -- GitLab