From 958ce6f6a5c15731fd178fe12a3bf62d96de9946 Mon Sep 17 00:00:00 2001 From: Florian Schmaus <flow@cs.fau.de> Date: Mon, 7 Dec 2020 13:38:20 +0100 Subject: [PATCH] Add *fromAnywhere() methods to EMPER synchronization primitives --- emper/Actor.hpp | 4 +- emper/Blockable.hpp | 8 ++- emper/PrivateSemaphore.hpp | 5 +- emper/Semaphore.hpp | 5 +- emper/UnboundedBlockingMpscQueue.hpp | 12 +++-- ...SignalPrivateSemaphoreFromAnywhereTest.cpp | 45 +++++++++++++++++ tests/TellActorFromAnywhereTest.cpp | 50 +++++++++++++++++++ tests/meson.build | 12 +++++ 8 files changed, 133 insertions(+), 8 deletions(-) create mode 100644 tests/SignalPrivateSemaphoreFromAnywhereTest.cpp create mode 100644 tests/TellActorFromAnywhereTest.cpp diff --git a/emper/Actor.hpp b/emper/Actor.hpp index a436899c..de36e9ac 100644 --- a/emper/Actor.hpp +++ b/emper/Actor.hpp @@ -48,7 +48,7 @@ class Actor { } protected: - Actor(Runtime& runtime) : runtime(runtime), queue(runtime) {} + Actor(Runtime& runtime = *Runtime::getRuntime()) : runtime(runtime), queue(runtime) {} virtual void receive(T t) = 0; @@ -64,6 +64,8 @@ class Actor { void tell(T t) { queue.put(t); } + void tellFromAnywhere(T t) { queue.putFromAnywhere(t); } + auto pendingMailboxItems() -> size_t { return queue.size(); } auto waitUntilIdle(long timeout) -> bool { diff --git a/emper/Blockable.hpp b/emper/Blockable.hpp index 162f89fe..04cbb493 100644 --- a/emper/Blockable.hpp +++ b/emper/Blockable.hpp @@ -32,11 +32,17 @@ class Blockable : public Logger<logSubsystem> { contextManager.saveAndStartNew(std::move(freshContextHook)); } + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void unblock(Context* context) { assert(context != nullptr); // cppcheck-suppress unsafeClassCanLeak Fiber* unblockFiber = Fiber::from([this, context]() { contextManager.discardAndResume(context); }); - runtime.schedule(*unblockFiber); + + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + runtime.schedule(*unblockFiber); + } else { + runtime.scheduleFromAnywhere(*unblockFiber); + } } }; diff --git a/emper/PrivateSemaphore.hpp b/emper/PrivateSemaphore.hpp index 526c5e3a..a72347d6 100644 --- a/emper/PrivateSemaphore.hpp +++ b/emper/PrivateSemaphore.hpp @@ -27,15 +27,18 @@ class PrivateSemaphore : protected Blockable<LogSubsystem::PS> { public: virtual void wait() = 0; + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void signal() { if (Context* readyContext = signalInternal()) { - unblock(readyContext); + unblock<callerEnvironment>(readyContext); } // Assure that nothing happens after here with the sync // primitive, as the data of the sync primitive may became // invalid at this point. } + void signalFromAnywhere() { signal<CallerEnvironment::ANYWHERE>(); } + void signalAndExit() { if (Context* readyContext = signalInternal()) { unblockAndExit(readyContext); diff --git a/emper/Semaphore.hpp b/emper/Semaphore.hpp index be116370..0ec2340e 100644 --- a/emper/Semaphore.hpp +++ b/emper/Semaphore.hpp @@ -34,6 +34,7 @@ class Semaphore { return blocked; } + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> auto release() -> bool { mutex.lock(); bool waiterListEmpty = waiterList.empty(); @@ -44,11 +45,13 @@ class Semaphore { BinaryPrivateSemaphore* semaphore = waiterList.front(); waiterList.pop(); mutex.unlock(); - semaphore->signal(); + semaphore->signal<callerEnvironment>(); } return waiterListEmpty; } + auto releaseFromAnywhere() -> bool { return release<CallerEnvironment::ANYWHERE>(); } + void print(); }; diff --git a/emper/UnboundedBlockingMpscQueue.hpp b/emper/UnboundedBlockingMpscQueue.hpp index f9841b30..6cdff408 100644 --- a/emper/UnboundedBlockingMpscQueue.hpp +++ b/emper/UnboundedBlockingMpscQueue.hpp @@ -20,11 +20,12 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { std::queue<T> mpscQueue; std::mutex queueMutex; + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void tryToWakeupBlockedContext() { Context* context = blockedContext.exchange(nullptr); - if (context) { - unblock(context); - } + if (!context) return; + + unblock<callerEnvironment>(context); } void tryToGetElement(const std::function<void(void)>& postRetrieve) { @@ -44,6 +45,7 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { public: UnboundedBlockingMpscQueue(Runtime& runtime) : Blockable(runtime) {} + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void put(T t) { { std::lock_guard<std::mutex> lock(queueMutex); @@ -53,10 +55,12 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { // Micro optimization, see if there is a blocked context // before performing the atomic exchange operation. if (blockedContext.load() != nullptr) { - tryToWakeupBlockedContext(); + tryToWakeupBlockedContext<callerEnvironment>(); } } + void putFromAnywhere(T t) { put<CallerEnvironment::ANYWHERE>(t); } + auto get(const std::function<void(void)>& postRetrieve) -> T { tPopped = false; tryToGetElement(postRetrieve); diff --git a/tests/SignalPrivateSemaphoreFromAnywhereTest.cpp b/tests/SignalPrivateSemaphoreFromAnywhereTest.cpp new file mode 100644 index 00000000..d4fd5858 --- /dev/null +++ b/tests/SignalPrivateSemaphoreFromAnywhereTest.cpp @@ -0,0 +1,45 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#include <cstdlib> // for exit, EXIT_FAILURE, EXIT_SUCCESS +#include <thread> + +#include "BinaryPrivateSemaphore.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" +#include "emper.hpp" + +static void testSignalPrivateSemaphoreFromAnywhere() { + BinaryPrivateSemaphore bpsTest; + BinaryPrivateSemaphore bpsSignalledFromAnywhere; + + Fiber* fiberWaitingForBps = Fiber::from([&] { + bpsSignalledFromAnywhere.wait(); + + bpsTest.signal(); + }); + + async(fiberWaitingForBps); + + // TODO: Use std::jthread once EMPER uses C++20. + std::thread signallingThread([&] { bpsSignalledFromAnywhere.signalFromAnywhere(); }); + + bpsTest.wait(); + + // TODO: Remove this once we use std::jthread when EMPER uses C++20. + signallingThread.join(); + + exit(EXIT_SUCCESS); +} + +auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { + Runtime runtime; + + Fiber* alphaFiber = Fiber::from(&testSignalPrivateSemaphoreFromAnywhere); + + runtime.scheduleFromAnywhere(*alphaFiber); + + runtime.waitUntilFinished(); + + return EXIT_FAILURE; +} diff --git a/tests/TellActorFromAnywhereTest.cpp b/tests/TellActorFromAnywhereTest.cpp new file mode 100644 index 00000000..83966413 --- /dev/null +++ b/tests/TellActorFromAnywhereTest.cpp @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#include <algorithm> // for copy, copy_backward +#include <cstdlib> // for exit, EXIT_FAILURE, EXIT_SUCCESS +#include <thread> + +#include "Actor.hpp" +#include "BinaryPrivateSemaphore.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" + +class SignallingActor : public Actor<unsigned int> { + private: + BinaryPrivateSemaphore& bps; + + protected: + void receive(UNUSED_ARG unsigned int signalCount) override { bps.signal(); } + + public: + SignallingActor(BinaryPrivateSemaphore& bps) : bps(bps){}; +}; + +static void testTellActorFromAnywhere() { + BinaryPrivateSemaphore bps; + SignallingActor signallingActor(bps); + signallingActor.start(); + + // TODO: Use std::jthread once EMPER uses C++20. + std::thread signallingThread([&] { signallingActor.tellFromAnywhere(1); }); + + bps.wait(); + + // TODO: Remove this once we use std::jthread when EMPER uses C++20. + signallingThread.join(); + + exit(EXIT_SUCCESS); +} + +auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { + Runtime runtime; + + Fiber* alphaFiber = Fiber::from(&testTellActorFromAnywhere); + + runtime.scheduleFromAnywhere(*alphaFiber); + + runtime.waitUntilFinished(); + + return EXIT_FAILURE; +} diff --git a/tests/meson.build b/tests/meson.build index b6cbcd96..0fdfdbf7 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -44,6 +44,18 @@ tests = { 'description': 'Simple userspace-rcu hash table test', 'dependencies': [liburcu_memb, liburcu_cds] }, + 'SignalPrivateSemaphoreFromAnywhereTest.cpp': + { + 'description': 'Simple test for PrivateSemaphore:signalFromAnywhere()', + 'test_suite': 'smoke', + 'is_parallel': true, + }, + 'TellActorFromAnywhereTest.cpp': + { + 'description': 'Simple test for Actor:tellFromAnywhere()', + 'test_suite': 'smoke', + 'is_parallel': true, + }, } undef_ndebug = '-UNDEBUG' -- GitLab