diff --git a/emper/Actor.hpp b/emper/Actor.hpp index a436899c7269ad3af78e6f6562bea326252c9855..de36e9ac14329ac3cf4b9d0f3b495be6ce4a64b9 100644 --- a/emper/Actor.hpp +++ b/emper/Actor.hpp @@ -48,7 +48,7 @@ class Actor { } protected: - Actor(Runtime& runtime) : runtime(runtime), queue(runtime) {} + Actor(Runtime& runtime = *Runtime::getRuntime()) : runtime(runtime), queue(runtime) {} virtual void receive(T t) = 0; @@ -64,6 +64,8 @@ class Actor { void tell(T t) { queue.put(t); } + void tellFromAnywhere(T t) { queue.putFromAnywhere(t); } + auto pendingMailboxItems() -> size_t { return queue.size(); } auto waitUntilIdle(long timeout) -> bool { diff --git a/emper/Blockable.hpp b/emper/Blockable.hpp index 162f89fea2c18229d63c000f72a5fb4ad3c600a7..04cbb493ab0cc112ecbc4cd40b6ec06c3fbf7fcb 100644 --- a/emper/Blockable.hpp +++ b/emper/Blockable.hpp @@ -32,11 +32,17 @@ class Blockable : public Logger<logSubsystem> { contextManager.saveAndStartNew(std::move(freshContextHook)); } + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void unblock(Context* context) { assert(context != nullptr); // cppcheck-suppress unsafeClassCanLeak Fiber* unblockFiber = Fiber::from([this, context]() { contextManager.discardAndResume(context); }); - runtime.schedule(*unblockFiber); + + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + runtime.schedule(*unblockFiber); + } else { + runtime.scheduleFromAnywhere(*unblockFiber); + } } }; diff --git a/emper/PrivateSemaphore.hpp b/emper/PrivateSemaphore.hpp index 526c5e3a9b21e21b4cb4a4236b028d229a9efda7..a72347d65958ab9f328301ce7cc98fd4e08d7447 100644 --- a/emper/PrivateSemaphore.hpp +++ b/emper/PrivateSemaphore.hpp @@ -27,15 +27,18 @@ class PrivateSemaphore : protected Blockable<LogSubsystem::PS> { public: virtual void wait() = 0; + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void signal() { if (Context* readyContext = signalInternal()) { - unblock(readyContext); + unblock<callerEnvironment>(readyContext); } // Assure that nothing happens after here with the sync // primitive, as the data of the sync primitive may became // invalid at this point. } + void signalFromAnywhere() { signal<CallerEnvironment::ANYWHERE>(); } + void signalAndExit() { if (Context* readyContext = signalInternal()) { unblockAndExit(readyContext); diff --git a/emper/Semaphore.hpp b/emper/Semaphore.hpp index be1163701373982cba892655fd233f89adf8e6fe..0ec2340e90ea931b2fdf15b6bf705c4b5d93b429 100644 --- a/emper/Semaphore.hpp +++ b/emper/Semaphore.hpp @@ -34,6 +34,7 @@ class Semaphore { return blocked; } + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> auto release() -> bool { mutex.lock(); bool waiterListEmpty = waiterList.empty(); @@ -44,11 +45,13 @@ class Semaphore { BinaryPrivateSemaphore* semaphore = waiterList.front(); waiterList.pop(); mutex.unlock(); - semaphore->signal(); + semaphore->signal<callerEnvironment>(); } return waiterListEmpty; } + auto releaseFromAnywhere() -> bool { return release<CallerEnvironment::ANYWHERE>(); } + void print(); }; diff --git a/emper/UnboundedBlockingMpscQueue.hpp b/emper/UnboundedBlockingMpscQueue.hpp index f9841b30d18fb53a9f53833462c0c2c1cd96e86c..6cdff408bd41ae1a24b41e6c840f8c3b6f18e686 100644 --- a/emper/UnboundedBlockingMpscQueue.hpp +++ b/emper/UnboundedBlockingMpscQueue.hpp @@ -20,11 +20,12 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { std::queue<T> mpscQueue; std::mutex queueMutex; + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void tryToWakeupBlockedContext() { Context* context = blockedContext.exchange(nullptr); - if (context) { - unblock(context); - } + if (!context) return; + + unblock<callerEnvironment>(context); } void tryToGetElement(const std::function<void(void)>& postRetrieve) { @@ -44,6 +45,7 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { public: UnboundedBlockingMpscQueue(Runtime& runtime) : Blockable(runtime) {} + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void put(T t) { { std::lock_guard<std::mutex> lock(queueMutex); @@ -53,10 +55,12 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { // Micro optimization, see if there is a blocked context // before performing the atomic exchange operation. if (blockedContext.load() != nullptr) { - tryToWakeupBlockedContext(); + tryToWakeupBlockedContext<callerEnvironment>(); } } + void putFromAnywhere(T t) { put<CallerEnvironment::ANYWHERE>(t); } + auto get(const std::function<void(void)>& postRetrieve) -> T { tPopped = false; tryToGetElement(postRetrieve); diff --git a/tests/SignalPrivateSemaphoreFromAnywhereTest.cpp b/tests/SignalPrivateSemaphoreFromAnywhereTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d4fd5858023e19a279f4f1a223dec95a6124cbad --- /dev/null +++ b/tests/SignalPrivateSemaphoreFromAnywhereTest.cpp @@ -0,0 +1,45 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#include <cstdlib> // for exit, EXIT_FAILURE, EXIT_SUCCESS +#include <thread> + +#include "BinaryPrivateSemaphore.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" +#include "emper.hpp" + +static void testSignalPrivateSemaphoreFromAnywhere() { + BinaryPrivateSemaphore bpsTest; + BinaryPrivateSemaphore bpsSignalledFromAnywhere; + + Fiber* fiberWaitingForBps = Fiber::from([&] { + bpsSignalledFromAnywhere.wait(); + + bpsTest.signal(); + }); + + async(fiberWaitingForBps); + + // TODO: Use std::jthread once EMPER uses C++20. + std::thread signallingThread([&] { bpsSignalledFromAnywhere.signalFromAnywhere(); }); + + bpsTest.wait(); + + // TODO: Remove this once we use std::jthread when EMPER uses C++20. + signallingThread.join(); + + exit(EXIT_SUCCESS); +} + +auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { + Runtime runtime; + + Fiber* alphaFiber = Fiber::from(&testSignalPrivateSemaphoreFromAnywhere); + + runtime.scheduleFromAnywhere(*alphaFiber); + + runtime.waitUntilFinished(); + + return EXIT_FAILURE; +} diff --git a/tests/TellActorFromAnywhereTest.cpp b/tests/TellActorFromAnywhereTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8396641388772d98d31e318ca5fbc38905d61da1 --- /dev/null +++ b/tests/TellActorFromAnywhereTest.cpp @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#include <algorithm> // for copy, copy_backward +#include <cstdlib> // for exit, EXIT_FAILURE, EXIT_SUCCESS +#include <thread> + +#include "Actor.hpp" +#include "BinaryPrivateSemaphore.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" + +class SignallingActor : public Actor<unsigned int> { + private: + BinaryPrivateSemaphore& bps; + + protected: + void receive(UNUSED_ARG unsigned int signalCount) override { bps.signal(); } + + public: + SignallingActor(BinaryPrivateSemaphore& bps) : bps(bps){}; +}; + +static void testTellActorFromAnywhere() { + BinaryPrivateSemaphore bps; + SignallingActor signallingActor(bps); + signallingActor.start(); + + // TODO: Use std::jthread once EMPER uses C++20. + std::thread signallingThread([&] { signallingActor.tellFromAnywhere(1); }); + + bps.wait(); + + // TODO: Remove this once we use std::jthread when EMPER uses C++20. + signallingThread.join(); + + exit(EXIT_SUCCESS); +} + +auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { + Runtime runtime; + + Fiber* alphaFiber = Fiber::from(&testTellActorFromAnywhere); + + runtime.scheduleFromAnywhere(*alphaFiber); + + runtime.waitUntilFinished(); + + return EXIT_FAILURE; +} diff --git a/tests/meson.build b/tests/meson.build index b6cbcd961506dad9d8de01db2f39d9cd584123a5..0fdfdbf747ee599ce3705ead767c15eb501df333 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -44,6 +44,18 @@ tests = { 'description': 'Simple userspace-rcu hash table test', 'dependencies': [liburcu_memb, liburcu_cds] }, + 'SignalPrivateSemaphoreFromAnywhereTest.cpp': + { + 'description': 'Simple test for PrivateSemaphore:signalFromAnywhere()', + 'test_suite': 'smoke', + 'is_parallel': true, + }, + 'TellActorFromAnywhereTest.cpp': + { + 'description': 'Simple test for Actor:tellFromAnywhere()', + 'test_suite': 'smoke', + 'is_parallel': true, + }, } undef_ndebug = '-UNDEBUG'