diff --git a/emper/Actor.hpp b/emper/Actor.hpp index 4065ee2c8ff74916ede9e851126249424d8f1431..f8d5651b5fb76c9cbfc7fee83977f3075aa0364d 100644 --- a/emper/Actor.hpp +++ b/emper/Actor.hpp @@ -9,6 +9,7 @@ #include "Fiber.hpp" #include "UnboundedBlockingMpscQueue.hpp" #include "emper.hpp" +#include "io/Future.hpp" template <typename T> class Actor { @@ -19,8 +20,10 @@ class Actor { Running, }; + protected: Runtime& runtime; + private: std::atomic<State> state = {Stopped}; UnboundedBlockingMpscQueue<T> queue; @@ -73,9 +76,15 @@ class Actor { void startFromAnywhere() { start<CallerEnvironment::ANYWHERE>(); } - void tell(T t) { queue.put(t); } + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + void tell(T t) { + queue.template put<callerEnvironment>(t); + } - void tellFromAnywhere(T t) { queue.putFromAnywhere(t); } + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER, typename InputIt> + void tell(InputIt begin, InputIt end) { + queue.template put<callerEnvironment>(begin, end); + } auto pendingMailboxItems() -> size_t { return queue.size(); } @@ -86,7 +95,7 @@ class Actor { if constexpr (emper::IO) { emper::sleep(1); } else { - emper::yield(); + runtime.yield(); } // TODO: The suppressed linter error below may be a false positive // reported by clang-tidy. diff --git a/emper/UnboundedBlockingMpscQueue.hpp b/emper/UnboundedBlockingMpscQueue.hpp index 9df3fdb02e166ec812266423dd91fc650939c748..67353bfa3090c31bbff48ce589ad1bcb4da14b46 100644 --- a/emper/UnboundedBlockingMpscQueue.hpp +++ b/emper/UnboundedBlockingMpscQueue.hpp @@ -60,8 +60,29 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { } } + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER, typename InputIt> + void put(InputIt begin, InputIt end) { + { + std::lock_guard<std::mutex> lock(queueMutex); + for (InputIt cur = begin; cur != end; ++cur) { + mpscQueue.push(*cur); + } + } + + // Micro optimization, see if there is a blocked context + // before performing the atomic exchange operation. + if (blockedContext != nullptr) { + tryToWakeupBlockedContext<callerEnvironment>(); + } + } + void putFromAnywhere(T t) { put<CallerEnvironment::ANYWHERE>(t); } + template <typename InputIt> + void putFromAnywhere(InputIt begin, InputIt end) { + put<CallerEnvironment::ANYWHERE>(begin, end); + } + auto get(const std::function<void(void)>& postRetrieve) -> T { tPopped = false; tryToGetElement(postRetrieve); diff --git a/tests/TellActorFromAnywhereTest.cpp b/tests/TellActorFromAnywhereTest.cpp index 3e63f1f1a4b9c49d41d6bb11a5c00d093886bfcb..7fc9cdad57e10ec2176416396cf5f04cfae7d2a0 100644 --- a/tests/TellActorFromAnywhereTest.cpp +++ b/tests/TellActorFromAnywhereTest.cpp @@ -4,6 +4,7 @@ #include "Actor.hpp" #include "BinaryPrivateSemaphore.hpp" +#include "CallerEnvironment.hpp" #include "emper-common.h" class SignallingActor : public Actor<unsigned int> { @@ -25,7 +26,7 @@ void emperTest() { signallingActor->start(); // TODO: Use std::jthread once EMPER uses C++20. - std::thread signallingThread([&] { signallingActor->tellFromAnywhere(1); }); + std::thread signallingThread([&] { signallingActor->tell<CallerEnvironment::ANYWHERE>(1); }); bps->wait();