diff --git a/apps/Main.cpp b/apps/Main.cpp index d74f1ea740e163a2044a9cae18a2d4b4d186f027..0ab7d43739b7c3e07591ede377cd2e4eb73461a4 100644 --- a/apps/Main.cpp +++ b/apps/Main.cpp @@ -82,7 +82,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; - runtime.schedule(*fibFiber); + runtime.scheduleFromAnywhere(*fibFiber); runtime.waitUntilFinished(); diff --git a/apps/WorkerSleepExample.cpp b/apps/WorkerSleepExample.cpp index f3426d91e859fc88090c99749a75d7df673762c6..0eef863e3bbf3823a5e18e19c65e2082ff2c66d5 100644 --- a/apps/WorkerSleepExample.cpp +++ b/apps/WorkerSleepExample.cpp @@ -59,7 +59,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; - runtime.schedule(*fibFiber); + runtime.scheduleFromAnywhere(*fibFiber); runtime.waitUntilFinished(); diff --git a/emper/Blockable.hpp b/emper/Blockable.hpp index 3bab4824f33aff238616a30eec0f26b025e76af9..162f89fea2c18229d63c000f72a5fb4ad3c600a7 100644 --- a/emper/Blockable.hpp +++ b/emper/Blockable.hpp @@ -4,6 +4,7 @@ #include <utility> +#include "CallerEnvironment.hpp" #include "Common.hpp" #include "Context.hpp" #include "ContextManager.hpp" @@ -24,6 +25,9 @@ class Blockable : public Logger<logSubsystem> { // this error if std::move() is used (as we do). // NOLINTNEXTLINE(performance-unnecessary-value-param) void block(func_t freshContextHook) { + // Only contexts managed by EMPER's runtime can block. + assert(Runtime::inRuntime()); + LOGD("block() blockedContext is " << Context::getCurrentContext()); contextManager.saveAndStartNew(std::move(freshContextHook)); } diff --git a/emper/CallerEnvironment.hpp b/emper/CallerEnvironment.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b06fb4198e0a97a1f46fecc54e21e4e8a48d89d3 --- /dev/null +++ b/emper/CallerEnvironment.hpp @@ -0,0 +1,8 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#pragma once + +enum CallerEnvironment { + EMPER, + ANYWHERE, +}; diff --git a/emper/PrivateSemaphore.hpp b/emper/PrivateSemaphore.hpp index 5fa679db7f0904ccec6226b5077da27a09605098..526c5e3a9b21e21b4cb4a4236b028d229a9efda7 100644 --- a/emper/PrivateSemaphore.hpp +++ b/emper/PrivateSemaphore.hpp @@ -3,6 +3,7 @@ #pragma once #include "Blockable.hpp" +#include "CallerEnvironment.hpp" #include "Context.hpp" #include "ContextManager.hpp" #include "Debug.hpp" diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index a12541352b966c1af66748f7bab2eb5aaeb390cd..5858198bf9297ecd7858bca0da579b6bed0ecdc3 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -17,7 +17,6 @@ #include "Common.hpp" // for DIE_MSG_ERRNO, DIE, DIE_MSG #include "ContextManager.hpp" // for ContextManager #include "Debug.hpp" // for DBG, ABORT, LOGD, LOGE -#include "Dispatcher.hpp" // for Dispatcher #include "Emper.hpp" #include "Fiber.hpp" // for Fiber #include "RuntimeStrategy.hpp" // for RuntimeStrategy @@ -39,6 +38,7 @@ std::mutex Runtime::currentRuntimeMutex; Runtime* Runtime::currentRuntime; +thread_local bool Runtime::workerThread = false; thread_local unsigned int Runtime::seed; thread_local workerid_t Runtime::workerId; RuntimeStrategy& Runtime::DEFAULT_STRATEGY = WsStrategy::INSTANCE; @@ -51,7 +51,8 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy, unsigned int dispatcher(strategy.getDispatcher(*this)), contextManager(*(new ContextManager(*this))), randomEngine(seed), - atLeastOneWorkerIsSleeping(false) { + atLeastOneWorkerIsSleeping(false), + skipSleep(false) { threads = new pthread_t[workerCount]; workerArgs = new struct WorkerArgs[workerCount]; @@ -130,6 +131,7 @@ Runtime::~Runtime() { } auto Runtime::workerLoop(struct WorkerArgs* workerArgs) -> void* { + workerThread = true; workerId = workerArgs->id; seed = workerArgs->seed; @@ -174,8 +176,6 @@ void Runtime::printLastRuntimeStats() { currentRuntime->printStats(); } -auto Runtime::inRuntime() -> bool { return Dispatcher::isDispatchedControlFlow(); } - void Runtime::executeAndWait(std::function<void()> f) { if (inRuntime()) { ABORT("Ca not use executeAndWait() from within the Runtime"); @@ -190,7 +190,7 @@ void Runtime::executeAndWait(std::function<void()> f) { fiberFinished.unlock(); }); - schedule(*fiber); + scheduleFromAnywhere(*fiber); fiberFinished.lock(); } diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 95c35b8c11c76d713195d13b6cc37c487ed92c05..cbb9e02b20107a94562bb4ae2870df9ab3496437 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -5,6 +5,7 @@ #include <pthread.h> // for pthread_t #include <atomic> // for atomic, memory_order_relaxed +#include <cassert> // for assert #include <condition_variable> // for condition_variable #include <cstddef> // for size_t #include <cstdint> // for intptr_t @@ -25,11 +26,17 @@ class Dispatcher; class Fiber; class RuntimeStrategy; +enum WakeupMode { + IF_SLEEPING_OBSERVED, + ALWAYS, +}; + class Runtime : public Logger<LogSubsystem::RUNTI> { private: static std::mutex currentRuntimeMutex; static Runtime* currentRuntime; + static thread_local bool workerThread; static thread_local unsigned int seed; static thread_local workerid_t workerId; @@ -57,9 +64,10 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { auto workerLoop(struct WorkerArgs* workerArgs) -> void*; - std::mutex workerSleepMutex; + ALIGN_TO_CACHE_LINE std::mutex workerSleepMutex; std::condition_variable workerSleepConditionVariable; ALIGN_TO_CACHE_LINE std::atomic<bool> atLeastOneWorkerIsSleeping; + bool skipSleep; static RuntimeStrategy& DEFAULT_STRATEGY; @@ -68,18 +76,41 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { protected: void addNewWorkerHook(const std::function<void(void)>& hook) { newWorkerHooks.push_back(hook); }; - inline void notifyAboutNewWork() { - if (!atLeastOneWorkerIsSleeping.load(std::memory_order_relaxed)) return; + template <WakeupMode wakeupMode = WakeupMode::IF_SLEEPING_OBSERVED> + inline void wakeupSleepingWorkers() { + if constexpr (wakeupMode == WakeupMode::IF_SLEEPING_OBSERVED) { + // If we observe no worker sleeping, then we do not try to + // attempt to wakeup one. Note that this is racy, i.e., just + // because we do not see any worker sleeping, does not mean that + // one (or all but us, as a matter of a fact) is about to + // sleep. However, in the worst case, this causes a worker to + // sleep longer, until this worker (or another active worker) + // schedules another fiber, when this method will be called + // again. And then it is likely that the scheduling worker will + // observe the sleeping worker. + if (!atLeastOneWorkerIsSleeping.load(std::memory_order_relaxed)) { + return; + } + } std::lock_guard<std::mutex> lk(workerSleepMutex); + skipSleep = true; + atLeastOneWorkerIsSleeping = false; workerSleepConditionVariable.notify_all(); } void dispatcherLoopSleep() { + if (skipSleep) { + skipSleep = false; + return; + } + std::unique_lock<std::mutex> lk(workerSleepMutex); - atLeastOneWorkerIsSleeping.store(true, std::memory_order_relaxed); + // Check again if "skip sleep" has been set. + if (skipSleep) return; + + atLeastOneWorkerIsSleeping = true; workerSleepConditionVariable.wait(lk); - atLeastOneWorkerIsSleeping.store(false, std::memory_order_relaxed); } public: @@ -94,7 +125,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { ~Runtime(); - inline void schedule(Fiber& fiber) { scheduler.schedule(fiber); } + inline void schedule(Fiber& fiber) { + // Calling schedule() only works from within the EMPER runtime. + assert(inRuntime()); + + scheduler.schedule(fiber); + } + + inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); } auto nextFiber() -> Fiber*; @@ -119,7 +157,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { void printStats(); - static auto inRuntime() -> bool; + static auto inRuntime() -> bool { return workerThread; } void executeAndWait(std::function<void()> f); diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp index aa8c43e950c4a8e416af8cdc5bb402ea094f2d56..ab8289054837e674795ea625743f096486da79ad 100644 --- a/emper/Scheduler.cpp +++ b/emper/Scheduler.cpp @@ -10,4 +10,10 @@ void Scheduler::addNewWorkerHook(const std::function<void(void)>& hook) { runtime.addNewWorkerHook(hook); } -void Scheduler::notifyRuntimeAboutNewWork() { runtime.notifyAboutNewWork(); } +void Scheduler::wakeupSleepingWorkersIfSleepingObserved() { + runtime.wakeupSleepingWorkers<WakeupMode::IF_SLEEPING_OBSERVED>(); +} + +void Scheduler::wakeupSleepingWorkersAlways() { + runtime.wakeupSleepingWorkers<WakeupMode::ALWAYS>(); +} diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index e4cdcd3ba5cdd9e510ceb074b340068a85da1788..de4ff849689cdedb5d1a81146365f6c50fefbe89 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -2,15 +2,28 @@ // Copyright © 2020 Florian Schmaus #pragma once +#include <algorithm> // for copy, copy_backward #include <functional> // for function +#include <mutex> +#include <queue> -#include "Debug.hpp" // for LogSubsystem, LogSubsystem::SCHED, Logger +#include "CallerEnvironment.hpp" +#include "Debug.hpp" // for LogSubsystem, LogSubsystem::SCHED, Logger +#include "Emper.hpp" #include "Fiber.hpp" // for Fiber #include "emper-common.h" // for workeraffinity_t class Runtime; class Scheduler : public Logger<LogSubsystem::SCHED> { + private: + std::queue<Fiber*> scheduleAnywhereQueue; + std::mutex scheduleAnywhereQueueMutex; + + void wakeupSleepingWorkersIfSleepingObserved(); + + void wakeupSleepingWorkersAlways(); + protected: Runtime& runtime; Scheduler(Runtime& runtime); @@ -23,10 +36,37 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { static inline void increaseRefCount(Fiber& fiber) { fiber.doAtomicIncrRefCount(); } - void notifyRuntimeAboutNewWork(); + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + inline void onNewWork() { + if constexpr (emper::WORKER_SLEEP) { + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + wakeupSleepingWorkersIfSleepingObserved(); + } else { + wakeupSleepingWorkersAlways(); + } + } + } + + auto dequeFiberFromAnywhereQueue() -> Fiber* { + std::lock_guard<std::mutex> lock(scheduleAnywhereQueueMutex); + if (scheduleAnywhereQueue.empty()) return nullptr; + + Fiber* res = scheduleAnywhereQueue.front(); + scheduleAnywhereQueue.pop(); + return res; + } public: virtual void schedule(Fiber& fiber) = 0; virtual auto nextFiber() -> Fiber* = 0; + + void scheduleFromAnywhere(Fiber& fiber) { + { + std::lock_guard<std::mutex> lock(scheduleAnywhereQueueMutex); + scheduleAnywhereQueue.push(&fiber); + } + + onNewWork<CallerEnvironment::ANYWHERE>(); + } }; diff --git a/emper/Semaphore.hpp b/emper/Semaphore.hpp index 8753855df08538232196619eec98d917d0da42b1..be1163701373982cba892655fd233f89adf8e6fe 100644 --- a/emper/Semaphore.hpp +++ b/emper/Semaphore.hpp @@ -6,6 +6,7 @@ #include <queue> #include "BinaryPrivateSemaphore.hpp" +#include "CallerEnvironment.hpp" namespace emper { diff --git a/emper/UnboundedBlockingMpscQueue.hpp b/emper/UnboundedBlockingMpscQueue.hpp index 68acb8035d00cbcc8b669e54124daad56e3e0fda..f9841b30d18fb53a9f53833462c0c2c1cd96e86c 100644 --- a/emper/UnboundedBlockingMpscQueue.hpp +++ b/emper/UnboundedBlockingMpscQueue.hpp @@ -6,6 +6,7 @@ #include <queue> #include "Blockable.hpp" +#include "CallerEnvironment.hpp" #include "Context.hpp" template <typename T> diff --git a/emper/c_emper.cpp b/emper/c_emper.cpp index 79e4c730764ad633770b758e117214db401790e4..6822dee64352f2175c6d8be875b1b7399ace35cf 100644 --- a/emper/c_emper.cpp +++ b/emper/c_emper.cpp @@ -49,6 +49,12 @@ void schedule(fiber* fiber) { runtime->schedule(*f); } +void schedule_from_anywhere(fiber* fiber) { + Runtime* runtime = Runtime::getRuntime(); + auto* f = reinterpret_cast<Fiber*>(fiber); + runtime->scheduleFromAnywhere(*f); +} + auto new_binary_sem() -> bps* { BPS* sem = new BPS(); return reinterpret_cast<bps*>(sem); diff --git a/emper/include/emper.h b/emper/include/emper.h index 05a907c3bca564e610799aecca638707fb711572..e93cbf081531a2e29870a420822ab6474ec14d0a 100644 --- a/emper/include/emper.h +++ b/emper/include/emper.h @@ -37,6 +37,8 @@ void init_affinity(workeraffinity_t affinity[], unsigned int n); void schedule(fiber* fiber); +void schedule_from_anywhere(fiber* fiber); + // NOLINTNEXTLINE(modernize-use-trailing-return-type) bps* new_binary_sem(void); diff --git a/emper/strategies/laws/LawsDispatcher.cpp b/emper/strategies/laws/LawsDispatcher.cpp index 04bfbde1be06b7b5c9c19e54abd2fe7c812f1f10..2ece84a4461af5bb88a6e7fde22c63f21db99d44 100644 --- a/emper/strategies/laws/LawsDispatcher.cpp +++ b/emper/strategies/laws/LawsDispatcher.cpp @@ -35,8 +35,8 @@ void LawsDispatcher::dispatchLoop() { case LawsStrategy::FiberSource::stolen: lawsStrategy.dispatchedFiberStolen.fetch_add(1, std::memory_order_relaxed); break; - case LawsStrategy::FiberSource::mainThread: - lawsStrategy.dispatchedFiberFromMainThread.fetch_add(1, std::memory_order_relaxed); + case LawsStrategy::FiberSource::anywhereQueue: + lawsStrategy.dispatchedFiberFromAnywhere.fetch_add(1, std::memory_order_relaxed); break; default: DIE_MSG("Unknown fiber flag: " << flag); diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp index 2c867f1893d098c54277030868883a962c44a88d..6473407ada3a5678e271f414d834ba53a88237fc 100644 --- a/emper/strategies/laws/LawsScheduler.cpp +++ b/emper/strategies/laws/LawsScheduler.cpp @@ -68,9 +68,7 @@ scheduleToLocalWsQueue: } } - if constexpr (emper::WORKER_SLEEP) { - notifyRuntimeAboutNewWork(); - } + onNewWork(); } auto LawsScheduler::nextFiber() -> Fiber* { @@ -114,16 +112,13 @@ auto LawsScheduler::nextFiber() -> Fiber* { // TODO: Reduce pressure on mainThreadQueue by only checking every N-th time. - // Try the main thread queue to get work as last resort. - poped = mainThreadQueue->popTop(&fiber); - if (poped) { + // Try the "scheduled from anywhere" queue to get work as last resort. + fiber = dequeFiberFromAnywhereQueue(); + if (fiber) { if constexpr (emper::STATS) { - auto flag = static_cast<unsigned int>(LawsStrategy::FiberSource::mainThread); + auto flag = static_cast<unsigned int>(LawsStrategy::FiberSource::anywhereQueue); fiber->setFlag(flag); } - - return fiber; } - - return nullptr; + return fiber; } diff --git a/emper/strategies/laws/LawsStrategy.hpp b/emper/strategies/laws/LawsStrategy.hpp index 3ec3e14e6c0fe8e0ffa17ee1b5f9cffc0b30d8ab..7d56702b26ab26212e2e49e12666969fbcac40d2 100644 --- a/emper/strategies/laws/LawsStrategy.hpp +++ b/emper/strategies/laws/LawsStrategy.hpp @@ -22,7 +22,7 @@ class LawsStrategy : public RuntimeStrategy { fromPriority, fromLocal, stolen, - mainThread, + anywhereQueue, }; std::atomic<std::uint64_t> scheduledFibersToRemotePriority; @@ -30,7 +30,7 @@ class LawsStrategy : public RuntimeStrategy { std::atomic<std::uint64_t> dispatchedFiberFromPriority; std::atomic<std::uint64_t> dispatchedFiberFromLocal; std::atomic<std::uint64_t> dispatchedFiberStolen; - std::atomic<std::uint64_t> dispatchedFiberFromMainThread; + std::atomic<std::uint64_t> dispatchedFiberFromAnywhere; LawsStrategy() : scheduledFibersToRemotePriority(0), @@ -38,7 +38,7 @@ class LawsStrategy : public RuntimeStrategy { dispatchedFiberFromPriority(0), dispatchedFiberFromLocal(0), dispatchedFiberStolen(0), - dispatchedFiberFromMainThread(0) {} + dispatchedFiberFromAnywhere(0) {} auto getScheduler(Runtime& runtime) -> Scheduler& override; diff --git a/emper/strategies/laws/LawsStrategyStats.cpp b/emper/strategies/laws/LawsStrategyStats.cpp index 7e0aac9ac70b1a68342f26b12fd6f1f4cacd3343..c1acec276d015b3f79d2b0039519cfa734f2854c 100644 --- a/emper/strategies/laws/LawsStrategyStats.cpp +++ b/emper/strategies/laws/LawsStrategyStats.cpp @@ -13,7 +13,7 @@ LawsStrategyStats::LawsStrategyStats(LawsStrategy& lawsStrategy) dispatchedFiberFromPriority(lawsStrategy.dispatchedFiberFromPriority), dispatchedFiberFromLocal(lawsStrategy.dispatchedFiberFromLocal), dispatchedFiberStolen(lawsStrategy.dispatchedFiberStolen), - dispatchedFiberFromMainThread(lawsStrategy.dispatchedFiberFromMainThread) {} + dispatchedFiberFromAnywhere(lawsStrategy.dispatchedFiberFromAnywhere) {} auto LawsStrategyStats::getScheduledFibersToRemotePriority() const -> uint64_t { return scheduledFibersToRemotePriority; @@ -35,8 +35,8 @@ auto LawsStrategyStats::getDispatchedFiberStolen() const -> uint64_t { return dispatchedFiberStolen; } -auto LawsStrategyStats::getDispatchedFiberFromMainThread() const -> uint64_t { - return dispatchedFiberFromMainThread; +auto LawsStrategyStats::getDispatchedFiberFromAnywhere() const -> uint64_t { + return dispatchedFiberFromAnywhere; } void LawsStrategyStats::print() { @@ -46,5 +46,5 @@ void LawsStrategyStats::print() { << " dispatchedFiberFromPriority:" << dispatchedFiberFromPriority << " dispatchedFiberFromLocal:" << dispatchedFiberFromLocal << " dispatchedFiberStolen:" << dispatchedFiberStolen - << " dispatchedFiberFromMainThread:" << dispatchedFiberFromMainThread << std::endl; + << " dispatchedFiberFromAnywhere:" << dispatchedFiberFromAnywhere << std::endl; } diff --git a/emper/strategies/laws/LawsStrategyStats.hpp b/emper/strategies/laws/LawsStrategyStats.hpp index ca0ae2c268659d5b80478615a6f30dc06052b396..48de230b384b970a9ff904c7cfa8a84204b08020 100644 --- a/emper/strategies/laws/LawsStrategyStats.hpp +++ b/emper/strategies/laws/LawsStrategyStats.hpp @@ -15,7 +15,7 @@ class LawsStrategyStats : public RuntimeStrategyStats { const uint64_t dispatchedFiberFromPriority; const uint64_t dispatchedFiberFromLocal; const uint64_t dispatchedFiberStolen; - const uint64_t dispatchedFiberFromMainThread; + const uint64_t dispatchedFiberFromAnywhere; public: LawsStrategyStats(LawsStrategy& lawsStrategy); @@ -25,7 +25,7 @@ class LawsStrategyStats : public RuntimeStrategyStats { [[nodiscard]] auto getDispatchedFiberFromPriority() const -> uint64_t; [[nodiscard]] auto getDispatchedFiberFromLocal() const -> uint64_t; [[nodiscard]] auto getDispatchedFiberStolen() const -> uint64_t; - [[nodiscard]] auto getDispatchedFiberFromMainThread() const -> uint64_t; + [[nodiscard]] auto getDispatchedFiberFromAnywhere() const -> uint64_t; void print() override; }; diff --git a/emper/strategies/ws/WsScheduler.cpp b/emper/strategies/ws/WsScheduler.cpp index 6b081a264be5d768b49cf92ed3623b22a73e1c22..38636e34f04c111f3382152970ae03e68d1a6f38 100644 --- a/emper/strategies/ws/WsScheduler.cpp +++ b/emper/strategies/ws/WsScheduler.cpp @@ -19,7 +19,6 @@ WsScheduler::WsScheduler(Runtime& runtime, WsStrategy& wsStrategy) : Scheduler(runtime), wsStrategy(wsStrategy) { const workerid_t workerCount = runtime.getWorkerCount(); queues = new WsScheduler::WsQueue<QUEUE_SIZE>*[workerCount]; - mainThreadQueue = &queue; auto newWorkerHook = [this]() { queues[Runtime::getWorkerId()] = &queue; }; addNewWorkerHook(newWorkerHook); @@ -41,9 +40,7 @@ void WsScheduler::schedule(Fiber& fiber) { wsStrategy.scheduledFibers.fetch_add(1, std::memory_order_relaxed); } - if constexpr (emper::WORKER_SLEEP) { - notifyRuntimeAboutNewWork(); - } + onNewWork(); } auto WsScheduler::nextFiber() -> Fiber* { @@ -76,9 +73,7 @@ auto WsScheduler::nextFiber() -> Fiber* { } } - // Try the main thread queue to get work as last resort. - poped = mainThreadQueue->popTop(&fiber); - if (poped) return fiber; - - return nullptr; + // Try the "scheduled from anywhere" queue to get work as last resort. + fiber = dequeFiberFromAnywhereQueue(); + return fiber; } diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp index da1db2bb9a1378744d4f317216bddc8dc6f0f2b8..569a9034430aaebba106583e5f4349f014169cb3 100644 --- a/emper/strategies/ws/WsScheduler.hpp +++ b/emper/strategies/ws/WsScheduler.hpp @@ -27,8 +27,6 @@ class WsScheduler : public Scheduler { WsQueue<QUEUE_SIZE>** queues; static thread_local WsQueue<QUEUE_SIZE> queue; - WsQueue<QUEUE_SIZE>* mainThreadQueue; - #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wattributes" WsStrategy& wsStrategy diff --git a/eval/TimeToSpawn.cpp b/eval/TimeToSpawn.cpp index 36e4d7d7edc4e26f667b55c6edc3438fb7641c5f..06102bfa68fb3a245866e8fa9ce9a70d3323cedf 100644 --- a/eval/TimeToSpawn.cpp +++ b/eval/TimeToSpawn.cpp @@ -41,7 +41,7 @@ static auto fiberTimeToSpawn() -> std::chrono::nanoseconds { res = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start); testFinished.unlock(); }); - runtime.schedule(*fiber); + runtime.scheduleFromAnywhere(*fiber); testFinished.lock(); diff --git a/tests/AlarmActorTest.cpp b/tests/AlarmActorTest.cpp index bad812f0402d8b19a2e104fbe37cd5b351563b46..580c45d65c65fedbde7f806facef3548c4acfafd 100644 --- a/tests/AlarmActorTest.cpp +++ b/tests/AlarmActorTest.cpp @@ -40,10 +40,10 @@ auto main(int argc, char* argv[]) -> int { Runtime runtime; - AlarmActor alarmActor(runtime); - alarmActor.start(); - Fiber* fiber = Fiber::from([&] { + AlarmActor alarmActor(runtime); + alarmActor.start(); + CPS cps; for (unsigned int i = 0; i < sleeper_count; ++i) { spawn( @@ -72,7 +72,7 @@ auto main(int argc, char* argv[]) -> int { exit(EXIT_SUCCESS); }); - runtime.schedule(*fiber); + runtime.scheduleFromAnywhere(*fiber); runtime.waitUntilFinished(); diff --git a/tests/CppApiTest.cpp b/tests/CppApiTest.cpp index b61ee9e0035eed9f719b4ec624e87c22b8ffdc18..cbdd8e1f03fb51bfcab2c4eb6d9d0b85bd12b9e9 100644 --- a/tests/CppApiTest.cpp +++ b/tests/CppApiTest.cpp @@ -4,9 +4,10 @@ #include <cstdlib> // for exit, EXIT_FAILURE, EXIT_SUC... #include "CountingPrivateSemaphore.hpp" // for CountingPrivateSemaphore -#include "Runtime.hpp" // for Runtime -#include "emper-common.h" // for UNUSED_ARG -#include "emper.hpp" // for async, spawn +#include "Fiber.hpp" +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" // for UNUSED_ARG +#include "emper.hpp" // for async, spawn static std::atomic_uint counter; @@ -30,10 +31,11 @@ static void mainFiber() { exit(EXIT_SUCCESS); } -auto main(UNUSED_ARG int arg, UNUSED_ARG char *argv[]) -> int { +auto main(UNUSED_ARG int arg, UNUSED_ARG char* argv[]) -> int { Runtime runtime; - async(&mainFiber); + Fiber* alphaFiber = Fiber::from([] { async(&mainFiber); }); + runtime.scheduleFromAnywhere(*alphaFiber); runtime.waitUntilFinished(); diff --git a/tests/SimpleActorTest.cpp b/tests/SimpleActorTest.cpp index 5fe140a5e85da8170a6c7c824a4f1d44d4fffaee..fbd398a031a261947eef208e2334b0f8ca0a84d0 100644 --- a/tests/SimpleActorTest.cpp +++ b/tests/SimpleActorTest.cpp @@ -80,7 +80,7 @@ auto main(UNUSED_ARG int arg, UNUSED_ARG char* argv[]) -> int { Runtime runtime; Fiber* fiber = Fiber::from(mainFiber, (void*)&runtime); - runtime.schedule(*fiber); + runtime.scheduleFromAnywhere(*fiber); runtime.waitUntilFinished(); diff --git a/tests/SimpleFibTest.cpp b/tests/SimpleFibTest.cpp index aace0206fb37e4c5b9545c49e796fbaf0779ab44..79bd1f725a7dcc0b2cf209b4ae341a977e71d886 100644 --- a/tests/SimpleFibTest.cpp +++ b/tests/SimpleFibTest.cpp @@ -74,7 +74,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { }, nullptr); - runtime.schedule(*fibFiber); + runtime.scheduleFromAnywhere(*fibFiber); runtime.waitUntilFinished(); diff --git a/tests/SimpleLawsTest.cpp b/tests/SimpleLawsTest.cpp index c4fb8177aedee9c3581d1b95b138c18ddcd13cdb..c35cd856bba70af99592b7d7cada5bb79a8bda8f 100644 --- a/tests/SimpleLawsTest.cpp +++ b/tests/SimpleLawsTest.cpp @@ -108,7 +108,7 @@ auto main(UNUSED_ARG int args, UNUSED_ARG char* argv[]) -> int { Fiber* alphaFiber = Fiber::from(&alphaFun); - runtime.schedule(*alphaFiber); + runtime.scheduleFromAnywhere(*alphaFiber); runtime.waitUntilFinished(); diff --git a/tests/SimpleUrcuTest.cpp b/tests/SimpleUrcuTest.cpp index 75c552b950ec2f866a2287aeebd01500cbe74b56..39ab24a5bd690753bb374dbcb9feba80861c113e 100644 --- a/tests/SimpleUrcuTest.cpp +++ b/tests/SimpleUrcuTest.cpp @@ -86,7 +86,7 @@ auto main() -> int { exit(EXIT_SUCCESS); }); - runtime.schedule(*verifier); + runtime.scheduleFromAnywhere(*verifier); runtime.waitUntilFinished(); return EXIT_FAILURE; diff --git a/tests/SimplestFibTest.cpp b/tests/SimplestFibTest.cpp index 6ff8afde24566564788586f753f98afd14969c10..bd0700a3700f7e62c80749b75df31530b3958009 100644 --- a/tests/SimplestFibTest.cpp +++ b/tests/SimplestFibTest.cpp @@ -89,7 +89,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; - runtime.schedule(*fibFiber); + runtime.scheduleFromAnywhere(*fibFiber); runtime.waitUntilFinished(); diff --git a/tests/c_api_test.c b/tests/c_api_test.c index a9ef015d60f693cd4c4d1bdcf1b830f8b716fb83..eb8eee42a3a937bace40ba5e762adefca2618b9c 100644 --- a/tests/c_api_test.c +++ b/tests/c_api_test.c @@ -46,7 +46,7 @@ int main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) { init_runtime(); fiber* alpha_fiber = fiber_from0(alpha_fun); - schedule(alpha_fiber); + schedule_from_anywhere(alpha_fiber); wait_until_runtime_finished();