diff --git a/.gitignore b/.gitignore index 317dc78f66e0832f043adb49c539e5467c5a8ae8..85ff3c442290befc992ebfb9b5ff865f98ed4056 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ /build /build-*/ + +/.cache/ diff --git a/apps/Main.cpp b/apps/Main.cpp index d74f1ea740e163a2044a9cae18a2d4b4d186f027..0ab7d43739b7c3e07591ede377cd2e4eb73461a4 100644 --- a/apps/Main.cpp +++ b/apps/Main.cpp @@ -82,7 +82,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; - runtime.schedule(*fibFiber); + runtime.scheduleFromAnywhere(*fibFiber); runtime.waitUntilFinished(); diff --git a/apps/WorkerSleepExample.cpp b/apps/WorkerSleepExample.cpp index f3426d91e859fc88090c99749a75d7df673762c6..0eef863e3bbf3823a5e18e19c65e2082ff2c66d5 100644 --- a/apps/WorkerSleepExample.cpp +++ b/apps/WorkerSleepExample.cpp @@ -59,7 +59,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; - runtime.schedule(*fibFiber); + runtime.scheduleFromAnywhere(*fibFiber); runtime.waitUntilFinished(); diff --git a/apps/meson.build b/apps/meson.build index 111457173d8872f60d8c6850dcf292f258f2538e..7ed455b24328a4572a91e1e17c71e68f425191f8 100644 --- a/apps/meson.build +++ b/apps/meson.build @@ -1,13 +1,11 @@ fib_exe = executable( 'fib', 'Main.cpp', - include_directories: emper_all_include, - link_with: [emper], + dependencies: emper_dep, ) worker_sleep_example_exe = executable( 'worker_sleep_example', 'WorkerSleepExample.cpp', - include_directories: emper_all_include, - link_with: [emper], + dependencies: emper_dep, ) diff --git a/emper/Actor.hpp b/emper/Actor.hpp index a436899c7269ad3af78e6f6562bea326252c9855..de36e9ac14329ac3cf4b9d0f3b495be6ce4a64b9 100644 --- a/emper/Actor.hpp +++ b/emper/Actor.hpp @@ -48,7 +48,7 @@ class Actor { } protected: - Actor(Runtime& runtime) : runtime(runtime), queue(runtime) {} + Actor(Runtime& runtime = *Runtime::getRuntime()) : runtime(runtime), queue(runtime) {} virtual void receive(T t) = 0; @@ -64,6 +64,8 @@ class Actor { void tell(T t) { queue.put(t); } + void tellFromAnywhere(T t) { queue.putFromAnywhere(t); } + auto pendingMailboxItems() -> size_t { return queue.size(); } auto waitUntilIdle(long timeout) -> bool { diff --git a/emper/Blockable.hpp b/emper/Blockable.hpp index 3bab4824f33aff238616a30eec0f26b025e76af9..04cbb493ab0cc112ecbc4cd40b6ec06c3fbf7fcb 100644 --- a/emper/Blockable.hpp +++ b/emper/Blockable.hpp @@ -4,6 +4,7 @@ #include <utility> +#include "CallerEnvironment.hpp" #include "Common.hpp" #include "Context.hpp" #include "ContextManager.hpp" @@ -24,15 +25,24 @@ class Blockable : public Logger<logSubsystem> { // this error if std::move() is used (as we do). // NOLINTNEXTLINE(performance-unnecessary-value-param) void block(func_t freshContextHook) { + // Only contexts managed by EMPER's runtime can block. + assert(Runtime::inRuntime()); + LOGD("block() blockedContext is " << Context::getCurrentContext()); contextManager.saveAndStartNew(std::move(freshContextHook)); } + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void unblock(Context* context) { assert(context != nullptr); // cppcheck-suppress unsafeClassCanLeak Fiber* unblockFiber = Fiber::from([this, context]() { contextManager.discardAndResume(context); }); - runtime.schedule(*unblockFiber); + + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + runtime.schedule(*unblockFiber); + } else { + runtime.scheduleFromAnywhere(*unblockFiber); + } } }; diff --git a/emper/CallerEnvironment.hpp b/emper/CallerEnvironment.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b06fb4198e0a97a1f46fecc54e21e4e8a48d89d3 --- /dev/null +++ b/emper/CallerEnvironment.hpp @@ -0,0 +1,8 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#pragma once + +enum CallerEnvironment { + EMPER, + ANYWHERE, +}; diff --git a/emper/PrivateSemaphore.hpp b/emper/PrivateSemaphore.hpp index 5fa679db7f0904ccec6226b5077da27a09605098..a72347d65958ab9f328301ce7cc98fd4e08d7447 100644 --- a/emper/PrivateSemaphore.hpp +++ b/emper/PrivateSemaphore.hpp @@ -3,6 +3,7 @@ #pragma once #include "Blockable.hpp" +#include "CallerEnvironment.hpp" #include "Context.hpp" #include "ContextManager.hpp" #include "Debug.hpp" @@ -26,15 +27,18 @@ class PrivateSemaphore : protected Blockable<LogSubsystem::PS> { public: virtual void wait() = 0; + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void signal() { if (Context* readyContext = signalInternal()) { - unblock(readyContext); + unblock<callerEnvironment>(readyContext); } // Assure that nothing happens after here with the sync // primitive, as the data of the sync primitive may became // invalid at this point. } + void signalFromAnywhere() { signal<CallerEnvironment::ANYWHERE>(); } + void signalAndExit() { if (Context* readyContext = signalInternal()) { unblockAndExit(readyContext); diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index a12541352b966c1af66748f7bab2eb5aaeb390cd..5858198bf9297ecd7858bca0da579b6bed0ecdc3 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -17,7 +17,6 @@ #include "Common.hpp" // for DIE_MSG_ERRNO, DIE, DIE_MSG #include "ContextManager.hpp" // for ContextManager #include "Debug.hpp" // for DBG, ABORT, LOGD, LOGE -#include "Dispatcher.hpp" // for Dispatcher #include "Emper.hpp" #include "Fiber.hpp" // for Fiber #include "RuntimeStrategy.hpp" // for RuntimeStrategy @@ -39,6 +38,7 @@ std::mutex Runtime::currentRuntimeMutex; Runtime* Runtime::currentRuntime; +thread_local bool Runtime::workerThread = false; thread_local unsigned int Runtime::seed; thread_local workerid_t Runtime::workerId; RuntimeStrategy& Runtime::DEFAULT_STRATEGY = WsStrategy::INSTANCE; @@ -51,7 +51,8 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy, unsigned int dispatcher(strategy.getDispatcher(*this)), contextManager(*(new ContextManager(*this))), randomEngine(seed), - atLeastOneWorkerIsSleeping(false) { + atLeastOneWorkerIsSleeping(false), + skipSleep(false) { threads = new pthread_t[workerCount]; workerArgs = new struct WorkerArgs[workerCount]; @@ -130,6 +131,7 @@ Runtime::~Runtime() { } auto Runtime::workerLoop(struct WorkerArgs* workerArgs) -> void* { + workerThread = true; workerId = workerArgs->id; seed = workerArgs->seed; @@ -174,8 +176,6 @@ void Runtime::printLastRuntimeStats() { currentRuntime->printStats(); } -auto Runtime::inRuntime() -> bool { return Dispatcher::isDispatchedControlFlow(); } - void Runtime::executeAndWait(std::function<void()> f) { if (inRuntime()) { ABORT("Ca not use executeAndWait() from within the Runtime"); @@ -190,7 +190,7 @@ void Runtime::executeAndWait(std::function<void()> f) { fiberFinished.unlock(); }); - schedule(*fiber); + scheduleFromAnywhere(*fiber); fiberFinished.lock(); } diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 95c35b8c11c76d713195d13b6cc37c487ed92c05..cbb9e02b20107a94562bb4ae2870df9ab3496437 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -5,6 +5,7 @@ #include <pthread.h> // for pthread_t #include <atomic> // for atomic, memory_order_relaxed +#include <cassert> // for assert #include <condition_variable> // for condition_variable #include <cstddef> // for size_t #include <cstdint> // for intptr_t @@ -25,11 +26,17 @@ class Dispatcher; class Fiber; class RuntimeStrategy; +enum WakeupMode { + IF_SLEEPING_OBSERVED, + ALWAYS, +}; + class Runtime : public Logger<LogSubsystem::RUNTI> { private: static std::mutex currentRuntimeMutex; static Runtime* currentRuntime; + static thread_local bool workerThread; static thread_local unsigned int seed; static thread_local workerid_t workerId; @@ -57,9 +64,10 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { auto workerLoop(struct WorkerArgs* workerArgs) -> void*; - std::mutex workerSleepMutex; + ALIGN_TO_CACHE_LINE std::mutex workerSleepMutex; std::condition_variable workerSleepConditionVariable; ALIGN_TO_CACHE_LINE std::atomic<bool> atLeastOneWorkerIsSleeping; + bool skipSleep; static RuntimeStrategy& DEFAULT_STRATEGY; @@ -68,18 +76,41 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { protected: void addNewWorkerHook(const std::function<void(void)>& hook) { newWorkerHooks.push_back(hook); }; - inline void notifyAboutNewWork() { - if (!atLeastOneWorkerIsSleeping.load(std::memory_order_relaxed)) return; + template <WakeupMode wakeupMode = WakeupMode::IF_SLEEPING_OBSERVED> + inline void wakeupSleepingWorkers() { + if constexpr (wakeupMode == WakeupMode::IF_SLEEPING_OBSERVED) { + // If we observe no worker sleeping, then we do not try to + // attempt to wakeup one. Note that this is racy, i.e., just + // because we do not see any worker sleeping, does not mean that + // one (or all but us, as a matter of a fact) is about to + // sleep. However, in the worst case, this causes a worker to + // sleep longer, until this worker (or another active worker) + // schedules another fiber, when this method will be called + // again. And then it is likely that the scheduling worker will + // observe the sleeping worker. + if (!atLeastOneWorkerIsSleeping.load(std::memory_order_relaxed)) { + return; + } + } std::lock_guard<std::mutex> lk(workerSleepMutex); + skipSleep = true; + atLeastOneWorkerIsSleeping = false; workerSleepConditionVariable.notify_all(); } void dispatcherLoopSleep() { + if (skipSleep) { + skipSleep = false; + return; + } + std::unique_lock<std::mutex> lk(workerSleepMutex); - atLeastOneWorkerIsSleeping.store(true, std::memory_order_relaxed); + // Check again if "skip sleep" has been set. + if (skipSleep) return; + + atLeastOneWorkerIsSleeping = true; workerSleepConditionVariable.wait(lk); - atLeastOneWorkerIsSleeping.store(false, std::memory_order_relaxed); } public: @@ -94,7 +125,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { ~Runtime(); - inline void schedule(Fiber& fiber) { scheduler.schedule(fiber); } + inline void schedule(Fiber& fiber) { + // Calling schedule() only works from within the EMPER runtime. + assert(inRuntime()); + + scheduler.schedule(fiber); + } + + inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); } auto nextFiber() -> Fiber*; @@ -119,7 +157,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { void printStats(); - static auto inRuntime() -> bool; + static auto inRuntime() -> bool { return workerThread; } void executeAndWait(std::function<void()> f); diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp index aa8c43e950c4a8e416af8cdc5bb402ea094f2d56..ab8289054837e674795ea625743f096486da79ad 100644 --- a/emper/Scheduler.cpp +++ b/emper/Scheduler.cpp @@ -10,4 +10,10 @@ void Scheduler::addNewWorkerHook(const std::function<void(void)>& hook) { runtime.addNewWorkerHook(hook); } -void Scheduler::notifyRuntimeAboutNewWork() { runtime.notifyAboutNewWork(); } +void Scheduler::wakeupSleepingWorkersIfSleepingObserved() { + runtime.wakeupSleepingWorkers<WakeupMode::IF_SLEEPING_OBSERVED>(); +} + +void Scheduler::wakeupSleepingWorkersAlways() { + runtime.wakeupSleepingWorkers<WakeupMode::ALWAYS>(); +} diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index e4cdcd3ba5cdd9e510ceb074b340068a85da1788..de4ff849689cdedb5d1a81146365f6c50fefbe89 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -2,15 +2,28 @@ // Copyright © 2020 Florian Schmaus #pragma once +#include <algorithm> // for copy, copy_backward #include <functional> // for function +#include <mutex> +#include <queue> -#include "Debug.hpp" // for LogSubsystem, LogSubsystem::SCHED, Logger +#include "CallerEnvironment.hpp" +#include "Debug.hpp" // for LogSubsystem, LogSubsystem::SCHED, Logger +#include "Emper.hpp" #include "Fiber.hpp" // for Fiber #include "emper-common.h" // for workeraffinity_t class Runtime; class Scheduler : public Logger<LogSubsystem::SCHED> { + private: + std::queue<Fiber*> scheduleAnywhereQueue; + std::mutex scheduleAnywhereQueueMutex; + + void wakeupSleepingWorkersIfSleepingObserved(); + + void wakeupSleepingWorkersAlways(); + protected: Runtime& runtime; Scheduler(Runtime& runtime); @@ -23,10 +36,37 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { static inline void increaseRefCount(Fiber& fiber) { fiber.doAtomicIncrRefCount(); } - void notifyRuntimeAboutNewWork(); + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + inline void onNewWork() { + if constexpr (emper::WORKER_SLEEP) { + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + wakeupSleepingWorkersIfSleepingObserved(); + } else { + wakeupSleepingWorkersAlways(); + } + } + } + + auto dequeFiberFromAnywhereQueue() -> Fiber* { + std::lock_guard<std::mutex> lock(scheduleAnywhereQueueMutex); + if (scheduleAnywhereQueue.empty()) return nullptr; + + Fiber* res = scheduleAnywhereQueue.front(); + scheduleAnywhereQueue.pop(); + return res; + } public: virtual void schedule(Fiber& fiber) = 0; virtual auto nextFiber() -> Fiber* = 0; + + void scheduleFromAnywhere(Fiber& fiber) { + { + std::lock_guard<std::mutex> lock(scheduleAnywhereQueueMutex); + scheduleAnywhereQueue.push(&fiber); + } + + onNewWork<CallerEnvironment::ANYWHERE>(); + } }; diff --git a/emper/Semaphore.hpp b/emper/Semaphore.hpp index 8753855df08538232196619eec98d917d0da42b1..0ec2340e90ea931b2fdf15b6bf705c4b5d93b429 100644 --- a/emper/Semaphore.hpp +++ b/emper/Semaphore.hpp @@ -6,6 +6,7 @@ #include <queue> #include "BinaryPrivateSemaphore.hpp" +#include "CallerEnvironment.hpp" namespace emper { @@ -33,6 +34,7 @@ class Semaphore { return blocked; } + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> auto release() -> bool { mutex.lock(); bool waiterListEmpty = waiterList.empty(); @@ -43,11 +45,13 @@ class Semaphore { BinaryPrivateSemaphore* semaphore = waiterList.front(); waiterList.pop(); mutex.unlock(); - semaphore->signal(); + semaphore->signal<callerEnvironment>(); } return waiterListEmpty; } + auto releaseFromAnywhere() -> bool { return release<CallerEnvironment::ANYWHERE>(); } + void print(); }; diff --git a/emper/UnboundedBlockingMpscQueue.hpp b/emper/UnboundedBlockingMpscQueue.hpp index 68acb8035d00cbcc8b669e54124daad56e3e0fda..6cdff408bd41ae1a24b41e6c840f8c3b6f18e686 100644 --- a/emper/UnboundedBlockingMpscQueue.hpp +++ b/emper/UnboundedBlockingMpscQueue.hpp @@ -6,6 +6,7 @@ #include <queue> #include "Blockable.hpp" +#include "CallerEnvironment.hpp" #include "Context.hpp" template <typename T> @@ -19,11 +20,12 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { std::queue<T> mpscQueue; std::mutex queueMutex; + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void tryToWakeupBlockedContext() { Context* context = blockedContext.exchange(nullptr); - if (context) { - unblock(context); - } + if (!context) return; + + unblock<callerEnvironment>(context); } void tryToGetElement(const std::function<void(void)>& postRetrieve) { @@ -43,6 +45,7 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { public: UnboundedBlockingMpscQueue(Runtime& runtime) : Blockable(runtime) {} + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void put(T t) { { std::lock_guard<std::mutex> lock(queueMutex); @@ -52,10 +55,12 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { // Micro optimization, see if there is a blocked context // before performing the atomic exchange operation. if (blockedContext.load() != nullptr) { - tryToWakeupBlockedContext(); + tryToWakeupBlockedContext<callerEnvironment>(); } } + void putFromAnywhere(T t) { put<CallerEnvironment::ANYWHERE>(t); } + auto get(const std::function<void(void)>& postRetrieve) -> T { tPopped = false; tryToGetElement(postRetrieve); diff --git a/emper/c_emper.cpp b/emper/c_emper.cpp index 79e4c730764ad633770b758e117214db401790e4..6822dee64352f2175c6d8be875b1b7399ace35cf 100644 --- a/emper/c_emper.cpp +++ b/emper/c_emper.cpp @@ -49,6 +49,12 @@ void schedule(fiber* fiber) { runtime->schedule(*f); } +void schedule_from_anywhere(fiber* fiber) { + Runtime* runtime = Runtime::getRuntime(); + auto* f = reinterpret_cast<Fiber*>(fiber); + runtime->scheduleFromAnywhere(*f); +} + auto new_binary_sem() -> bps* { BPS* sem = new BPS(); return reinterpret_cast<bps*>(sem); diff --git a/emper/include/emper.h b/emper/include/emper.h index 05a907c3bca564e610799aecca638707fb711572..e93cbf081531a2e29870a420822ab6474ec14d0a 100644 --- a/emper/include/emper.h +++ b/emper/include/emper.h @@ -37,6 +37,8 @@ void init_affinity(workeraffinity_t affinity[], unsigned int n); void schedule(fiber* fiber); +void schedule_from_anywhere(fiber* fiber); + // NOLINTNEXTLINE(modernize-use-trailing-return-type) bps* new_binary_sem(void); diff --git a/emper/meson.build b/emper/meson.build index f73891a7543d0266bdeea336542a7a009957264d..a175f3b775a87d7375ab61d560b8eaf661bb8904 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -71,3 +71,8 @@ emper_c_dep = declare_dependency( include_directories : emper_all_include, link_with : emper_c ) + +emper_full_dep = [ + emper_dep, + emper_c_dep, +] diff --git a/emper/strategies/laws/LawsDispatcher.cpp b/emper/strategies/laws/LawsDispatcher.cpp index 04bfbde1be06b7b5c9c19e54abd2fe7c812f1f10..2ece84a4461af5bb88a6e7fde22c63f21db99d44 100644 --- a/emper/strategies/laws/LawsDispatcher.cpp +++ b/emper/strategies/laws/LawsDispatcher.cpp @@ -35,8 +35,8 @@ void LawsDispatcher::dispatchLoop() { case LawsStrategy::FiberSource::stolen: lawsStrategy.dispatchedFiberStolen.fetch_add(1, std::memory_order_relaxed); break; - case LawsStrategy::FiberSource::mainThread: - lawsStrategy.dispatchedFiberFromMainThread.fetch_add(1, std::memory_order_relaxed); + case LawsStrategy::FiberSource::anywhereQueue: + lawsStrategy.dispatchedFiberFromAnywhere.fetch_add(1, std::memory_order_relaxed); break; default: DIE_MSG("Unknown fiber flag: " << flag); diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp index 2c867f1893d098c54277030868883a962c44a88d..6473407ada3a5678e271f414d834ba53a88237fc 100644 --- a/emper/strategies/laws/LawsScheduler.cpp +++ b/emper/strategies/laws/LawsScheduler.cpp @@ -68,9 +68,7 @@ scheduleToLocalWsQueue: } } - if constexpr (emper::WORKER_SLEEP) { - notifyRuntimeAboutNewWork(); - } + onNewWork(); } auto LawsScheduler::nextFiber() -> Fiber* { @@ -114,16 +112,13 @@ auto LawsScheduler::nextFiber() -> Fiber* { // TODO: Reduce pressure on mainThreadQueue by only checking every N-th time. - // Try the main thread queue to get work as last resort. - poped = mainThreadQueue->popTop(&fiber); - if (poped) { + // Try the "scheduled from anywhere" queue to get work as last resort. + fiber = dequeFiberFromAnywhereQueue(); + if (fiber) { if constexpr (emper::STATS) { - auto flag = static_cast<unsigned int>(LawsStrategy::FiberSource::mainThread); + auto flag = static_cast<unsigned int>(LawsStrategy::FiberSource::anywhereQueue); fiber->setFlag(flag); } - - return fiber; } - - return nullptr; + return fiber; } diff --git a/emper/strategies/laws/LawsStrategy.hpp b/emper/strategies/laws/LawsStrategy.hpp index 3ec3e14e6c0fe8e0ffa17ee1b5f9cffc0b30d8ab..7d56702b26ab26212e2e49e12666969fbcac40d2 100644 --- a/emper/strategies/laws/LawsStrategy.hpp +++ b/emper/strategies/laws/LawsStrategy.hpp @@ -22,7 +22,7 @@ class LawsStrategy : public RuntimeStrategy { fromPriority, fromLocal, stolen, - mainThread, + anywhereQueue, }; std::atomic<std::uint64_t> scheduledFibersToRemotePriority; @@ -30,7 +30,7 @@ class LawsStrategy : public RuntimeStrategy { std::atomic<std::uint64_t> dispatchedFiberFromPriority; std::atomic<std::uint64_t> dispatchedFiberFromLocal; std::atomic<std::uint64_t> dispatchedFiberStolen; - std::atomic<std::uint64_t> dispatchedFiberFromMainThread; + std::atomic<std::uint64_t> dispatchedFiberFromAnywhere; LawsStrategy() : scheduledFibersToRemotePriority(0), @@ -38,7 +38,7 @@ class LawsStrategy : public RuntimeStrategy { dispatchedFiberFromPriority(0), dispatchedFiberFromLocal(0), dispatchedFiberStolen(0), - dispatchedFiberFromMainThread(0) {} + dispatchedFiberFromAnywhere(0) {} auto getScheduler(Runtime& runtime) -> Scheduler& override; diff --git a/emper/strategies/laws/LawsStrategyStats.cpp b/emper/strategies/laws/LawsStrategyStats.cpp index 7e0aac9ac70b1a68342f26b12fd6f1f4cacd3343..c1acec276d015b3f79d2b0039519cfa734f2854c 100644 --- a/emper/strategies/laws/LawsStrategyStats.cpp +++ b/emper/strategies/laws/LawsStrategyStats.cpp @@ -13,7 +13,7 @@ LawsStrategyStats::LawsStrategyStats(LawsStrategy& lawsStrategy) dispatchedFiberFromPriority(lawsStrategy.dispatchedFiberFromPriority), dispatchedFiberFromLocal(lawsStrategy.dispatchedFiberFromLocal), dispatchedFiberStolen(lawsStrategy.dispatchedFiberStolen), - dispatchedFiberFromMainThread(lawsStrategy.dispatchedFiberFromMainThread) {} + dispatchedFiberFromAnywhere(lawsStrategy.dispatchedFiberFromAnywhere) {} auto LawsStrategyStats::getScheduledFibersToRemotePriority() const -> uint64_t { return scheduledFibersToRemotePriority; @@ -35,8 +35,8 @@ auto LawsStrategyStats::getDispatchedFiberStolen() const -> uint64_t { return dispatchedFiberStolen; } -auto LawsStrategyStats::getDispatchedFiberFromMainThread() const -> uint64_t { - return dispatchedFiberFromMainThread; +auto LawsStrategyStats::getDispatchedFiberFromAnywhere() const -> uint64_t { + return dispatchedFiberFromAnywhere; } void LawsStrategyStats::print() { @@ -46,5 +46,5 @@ void LawsStrategyStats::print() { << " dispatchedFiberFromPriority:" << dispatchedFiberFromPriority << " dispatchedFiberFromLocal:" << dispatchedFiberFromLocal << " dispatchedFiberStolen:" << dispatchedFiberStolen - << " dispatchedFiberFromMainThread:" << dispatchedFiberFromMainThread << std::endl; + << " dispatchedFiberFromAnywhere:" << dispatchedFiberFromAnywhere << std::endl; } diff --git a/emper/strategies/laws/LawsStrategyStats.hpp b/emper/strategies/laws/LawsStrategyStats.hpp index ca0ae2c268659d5b80478615a6f30dc06052b396..48de230b384b970a9ff904c7cfa8a84204b08020 100644 --- a/emper/strategies/laws/LawsStrategyStats.hpp +++ b/emper/strategies/laws/LawsStrategyStats.hpp @@ -15,7 +15,7 @@ class LawsStrategyStats : public RuntimeStrategyStats { const uint64_t dispatchedFiberFromPriority; const uint64_t dispatchedFiberFromLocal; const uint64_t dispatchedFiberStolen; - const uint64_t dispatchedFiberFromMainThread; + const uint64_t dispatchedFiberFromAnywhere; public: LawsStrategyStats(LawsStrategy& lawsStrategy); @@ -25,7 +25,7 @@ class LawsStrategyStats : public RuntimeStrategyStats { [[nodiscard]] auto getDispatchedFiberFromPriority() const -> uint64_t; [[nodiscard]] auto getDispatchedFiberFromLocal() const -> uint64_t; [[nodiscard]] auto getDispatchedFiberStolen() const -> uint64_t; - [[nodiscard]] auto getDispatchedFiberFromMainThread() const -> uint64_t; + [[nodiscard]] auto getDispatchedFiberFromAnywhere() const -> uint64_t; void print() override; }; diff --git a/emper/strategies/ws/WsScheduler.cpp b/emper/strategies/ws/WsScheduler.cpp index 6b081a264be5d768b49cf92ed3623b22a73e1c22..38636e34f04c111f3382152970ae03e68d1a6f38 100644 --- a/emper/strategies/ws/WsScheduler.cpp +++ b/emper/strategies/ws/WsScheduler.cpp @@ -19,7 +19,6 @@ WsScheduler::WsScheduler(Runtime& runtime, WsStrategy& wsStrategy) : Scheduler(runtime), wsStrategy(wsStrategy) { const workerid_t workerCount = runtime.getWorkerCount(); queues = new WsScheduler::WsQueue<QUEUE_SIZE>*[workerCount]; - mainThreadQueue = &queue; auto newWorkerHook = [this]() { queues[Runtime::getWorkerId()] = &queue; }; addNewWorkerHook(newWorkerHook); @@ -41,9 +40,7 @@ void WsScheduler::schedule(Fiber& fiber) { wsStrategy.scheduledFibers.fetch_add(1, std::memory_order_relaxed); } - if constexpr (emper::WORKER_SLEEP) { - notifyRuntimeAboutNewWork(); - } + onNewWork(); } auto WsScheduler::nextFiber() -> Fiber* { @@ -76,9 +73,7 @@ auto WsScheduler::nextFiber() -> Fiber* { } } - // Try the main thread queue to get work as last resort. - poped = mainThreadQueue->popTop(&fiber); - if (poped) return fiber; - - return nullptr; + // Try the "scheduled from anywhere" queue to get work as last resort. + fiber = dequeFiberFromAnywhereQueue(); + return fiber; } diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp index da1db2bb9a1378744d4f317216bddc8dc6f0f2b8..569a9034430aaebba106583e5f4349f014169cb3 100644 --- a/emper/strategies/ws/WsScheduler.hpp +++ b/emper/strategies/ws/WsScheduler.hpp @@ -27,8 +27,6 @@ class WsScheduler : public Scheduler { WsQueue<QUEUE_SIZE>** queues; static thread_local WsQueue<QUEUE_SIZE> queue; - WsQueue<QUEUE_SIZE>* mainThreadQueue; - #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wattributes" WsStrategy& wsStrategy diff --git a/eval/TimeToSpawn.cpp b/eval/TimeToSpawn.cpp index 36e4d7d7edc4e26f667b55c6edc3438fb7641c5f..06102bfa68fb3a245866e8fa9ce9a70d3323cedf 100644 --- a/eval/TimeToSpawn.cpp +++ b/eval/TimeToSpawn.cpp @@ -41,7 +41,7 @@ static auto fiberTimeToSpawn() -> std::chrono::nanoseconds { res = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start); testFinished.unlock(); }); - runtime.schedule(*fiber); + runtime.scheduleFromAnywhere(*fiber); testFinished.lock(); diff --git a/tests/AlarmActorTest.cpp b/tests/AlarmActorTest.cpp index bad812f0402d8b19a2e104fbe37cd5b351563b46..580c45d65c65fedbde7f806facef3548c4acfafd 100644 --- a/tests/AlarmActorTest.cpp +++ b/tests/AlarmActorTest.cpp @@ -40,10 +40,10 @@ auto main(int argc, char* argv[]) -> int { Runtime runtime; - AlarmActor alarmActor(runtime); - alarmActor.start(); - Fiber* fiber = Fiber::from([&] { + AlarmActor alarmActor(runtime); + alarmActor.start(); + CPS cps; for (unsigned int i = 0; i < sleeper_count; ++i) { spawn( @@ -72,7 +72,7 @@ auto main(int argc, char* argv[]) -> int { exit(EXIT_SUCCESS); }); - runtime.schedule(*fiber); + runtime.scheduleFromAnywhere(*fiber); runtime.waitUntilFinished(); diff --git a/tests/CppApiTest.cpp b/tests/CppApiTest.cpp index b61ee9e0035eed9f719b4ec624e87c22b8ffdc18..cbdd8e1f03fb51bfcab2c4eb6d9d0b85bd12b9e9 100644 --- a/tests/CppApiTest.cpp +++ b/tests/CppApiTest.cpp @@ -4,9 +4,10 @@ #include <cstdlib> // for exit, EXIT_FAILURE, EXIT_SUC... #include "CountingPrivateSemaphore.hpp" // for CountingPrivateSemaphore -#include "Runtime.hpp" // for Runtime -#include "emper-common.h" // for UNUSED_ARG -#include "emper.hpp" // for async, spawn +#include "Fiber.hpp" +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" // for UNUSED_ARG +#include "emper.hpp" // for async, spawn static std::atomic_uint counter; @@ -30,10 +31,11 @@ static void mainFiber() { exit(EXIT_SUCCESS); } -auto main(UNUSED_ARG int arg, UNUSED_ARG char *argv[]) -> int { +auto main(UNUSED_ARG int arg, UNUSED_ARG char* argv[]) -> int { Runtime runtime; - async(&mainFiber); + Fiber* alphaFiber = Fiber::from([] { async(&mainFiber); }); + runtime.scheduleFromAnywhere(*alphaFiber); runtime.waitUntilFinished(); diff --git a/tests/SignalPrivateSemaphoreFromAnywhereTest.cpp b/tests/SignalPrivateSemaphoreFromAnywhereTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d4fd5858023e19a279f4f1a223dec95a6124cbad --- /dev/null +++ b/tests/SignalPrivateSemaphoreFromAnywhereTest.cpp @@ -0,0 +1,45 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#include <cstdlib> // for exit, EXIT_FAILURE, EXIT_SUCCESS +#include <thread> + +#include "BinaryPrivateSemaphore.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" +#include "emper.hpp" + +static void testSignalPrivateSemaphoreFromAnywhere() { + BinaryPrivateSemaphore bpsTest; + BinaryPrivateSemaphore bpsSignalledFromAnywhere; + + Fiber* fiberWaitingForBps = Fiber::from([&] { + bpsSignalledFromAnywhere.wait(); + + bpsTest.signal(); + }); + + async(fiberWaitingForBps); + + // TODO: Use std::jthread once EMPER uses C++20. + std::thread signallingThread([&] { bpsSignalledFromAnywhere.signalFromAnywhere(); }); + + bpsTest.wait(); + + // TODO: Remove this once we use std::jthread when EMPER uses C++20. + signallingThread.join(); + + exit(EXIT_SUCCESS); +} + +auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { + Runtime runtime; + + Fiber* alphaFiber = Fiber::from(&testSignalPrivateSemaphoreFromAnywhere); + + runtime.scheduleFromAnywhere(*alphaFiber); + + runtime.waitUntilFinished(); + + return EXIT_FAILURE; +} diff --git a/tests/SimpleActorTest.cpp b/tests/SimpleActorTest.cpp index 5fe140a5e85da8170a6c7c824a4f1d44d4fffaee..fbd398a031a261947eef208e2334b0f8ca0a84d0 100644 --- a/tests/SimpleActorTest.cpp +++ b/tests/SimpleActorTest.cpp @@ -80,7 +80,7 @@ auto main(UNUSED_ARG int arg, UNUSED_ARG char* argv[]) -> int { Runtime runtime; Fiber* fiber = Fiber::from(mainFiber, (void*)&runtime); - runtime.schedule(*fiber); + runtime.scheduleFromAnywhere(*fiber); runtime.waitUntilFinished(); diff --git a/tests/SimpleFibTest.cpp b/tests/SimpleFibTest.cpp index aace0206fb37e4c5b9545c49e796fbaf0779ab44..79bd1f725a7dcc0b2cf209b4ae341a977e71d886 100644 --- a/tests/SimpleFibTest.cpp +++ b/tests/SimpleFibTest.cpp @@ -74,7 +74,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { }, nullptr); - runtime.schedule(*fibFiber); + runtime.scheduleFromAnywhere(*fibFiber); runtime.waitUntilFinished(); diff --git a/tests/SimpleLawsTest.cpp b/tests/SimpleLawsTest.cpp index c4fb8177aedee9c3581d1b95b138c18ddcd13cdb..c35cd856bba70af99592b7d7cada5bb79a8bda8f 100644 --- a/tests/SimpleLawsTest.cpp +++ b/tests/SimpleLawsTest.cpp @@ -108,7 +108,7 @@ auto main(UNUSED_ARG int args, UNUSED_ARG char* argv[]) -> int { Fiber* alphaFiber = Fiber::from(&alphaFun); - runtime.schedule(*alphaFiber); + runtime.scheduleFromAnywhere(*alphaFiber); runtime.waitUntilFinished(); diff --git a/tests/SimpleUrcuTest.cpp b/tests/SimpleUrcuTest.cpp index 75c552b950ec2f866a2287aeebd01500cbe74b56..39ab24a5bd690753bb374dbcb9feba80861c113e 100644 --- a/tests/SimpleUrcuTest.cpp +++ b/tests/SimpleUrcuTest.cpp @@ -86,7 +86,7 @@ auto main() -> int { exit(EXIT_SUCCESS); }); - runtime.schedule(*verifier); + runtime.scheduleFromAnywhere(*verifier); runtime.waitUntilFinished(); return EXIT_FAILURE; diff --git a/tests/SimplestFibTest.cpp b/tests/SimplestFibTest.cpp index 6ff8afde24566564788586f753f98afd14969c10..bd0700a3700f7e62c80749b75df31530b3958009 100644 --- a/tests/SimplestFibTest.cpp +++ b/tests/SimplestFibTest.cpp @@ -89,7 +89,7 @@ auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; - runtime.schedule(*fibFiber); + runtime.scheduleFromAnywhere(*fibFiber); runtime.waitUntilFinished(); diff --git a/tests/TellActorFromAnywhereTest.cpp b/tests/TellActorFromAnywhereTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8396641388772d98d31e318ca5fbc38905d61da1 --- /dev/null +++ b/tests/TellActorFromAnywhereTest.cpp @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#include <algorithm> // for copy, copy_backward +#include <cstdlib> // for exit, EXIT_FAILURE, EXIT_SUCCESS +#include <thread> + +#include "Actor.hpp" +#include "BinaryPrivateSemaphore.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" + +class SignallingActor : public Actor<unsigned int> { + private: + BinaryPrivateSemaphore& bps; + + protected: + void receive(UNUSED_ARG unsigned int signalCount) override { bps.signal(); } + + public: + SignallingActor(BinaryPrivateSemaphore& bps) : bps(bps){}; +}; + +static void testTellActorFromAnywhere() { + BinaryPrivateSemaphore bps; + SignallingActor signallingActor(bps); + signallingActor.start(); + + // TODO: Use std::jthread once EMPER uses C++20. + std::thread signallingThread([&] { signallingActor.tellFromAnywhere(1); }); + + bps.wait(); + + // TODO: Remove this once we use std::jthread when EMPER uses C++20. + signallingThread.join(); + + exit(EXIT_SUCCESS); +} + +auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { + Runtime runtime; + + Fiber* alphaFiber = Fiber::from(&testTellActorFromAnywhere); + + runtime.scheduleFromAnywhere(*alphaFiber); + + runtime.waitUntilFinished(); + + return EXIT_FAILURE; +} diff --git a/tests/c_api_test.c b/tests/c_api_test.c index a9ef015d60f693cd4c4d1bdcf1b830f8b716fb83..eb8eee42a3a937bace40ba5e762adefca2618b9c 100644 --- a/tests/c_api_test.c +++ b/tests/c_api_test.c @@ -46,7 +46,7 @@ int main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) { init_runtime(); fiber* alpha_fiber = fiber_from0(alpha_fun); - schedule(alpha_fiber); + schedule_from_anywhere(alpha_fiber); wait_until_runtime_finished(); diff --git a/tests/meson.build b/tests/meson.build index c88ddd489d466c0bcbab989eab10c8965829903b..0fdfdbf747ee599ce3705ead767c15eb501df333 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -44,6 +44,18 @@ tests = { 'description': 'Simple userspace-rcu hash table test', 'dependencies': [liburcu_memb, liburcu_cds] }, + 'SignalPrivateSemaphoreFromAnywhereTest.cpp': + { + 'description': 'Simple test for PrivateSemaphore:signalFromAnywhere()', + 'test_suite': 'smoke', + 'is_parallel': true, + }, + 'TellActorFromAnywhereTest.cpp': + { + 'description': 'Simple test for Actor:tellFromAnywhere()', + 'test_suite': 'smoke', + 'is_parallel': true, + }, } undef_ndebug = '-UNDEBUG' @@ -64,9 +76,9 @@ foreach source, test_dict : tests # The test_name is the name of the source file without the file suffix. test_name = source.split('.')[0] - test_dep = [thread_dep] + test_deps = [thread_dep] if test_dict.has_key('dependencies') - test_dep += test_dict['dependencies'] + test_deps += test_dict['dependencies'] endif test_exe = executable(test_name, @@ -74,8 +86,7 @@ foreach source, test_dict : tests include_directories: emper_all_include, c_args: undef_ndebug, cpp_args: undef_ndebug, - dependencies: emper_dependencies + test_dep, - link_with: [emper, emper_c], + dependencies: [emper_full_dep] + test_deps, ) test(test_name,