Skip to content
Snippets Groups Projects
Commit 99ace7e5 authored by Florian Fischer's avatar Florian Fischer
Browse files

[Actor] allow multiple Ts to be told to an Actor

parent fd5cfe5e
No related branches found
No related tags found
1 merge request!230[IO] Implement configurable "simple architecture"
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include "Fiber.hpp" #include "Fiber.hpp"
#include "UnboundedBlockingMpscQueue.hpp" #include "UnboundedBlockingMpscQueue.hpp"
#include "emper.hpp" #include "emper.hpp"
#include "io/Future.hpp"
template <typename T> template <typename T>
class Actor { class Actor {
...@@ -19,8 +20,10 @@ class Actor { ...@@ -19,8 +20,10 @@ class Actor {
Running, Running,
}; };
protected:
Runtime& runtime; Runtime& runtime;
private:
std::atomic<State> state = {Stopped}; std::atomic<State> state = {Stopped};
UnboundedBlockingMpscQueue<T> queue; UnboundedBlockingMpscQueue<T> queue;
...@@ -73,9 +76,15 @@ class Actor { ...@@ -73,9 +76,15 @@ class Actor {
void startFromAnywhere() { start<CallerEnvironment::ANYWHERE>(); } void startFromAnywhere() { start<CallerEnvironment::ANYWHERE>(); }
void tell(T t) { queue.put(t); } template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
void tell(T t) {
queue.template put<callerEnvironment>(t);
}
void tellFromAnywhere(T t) { queue.putFromAnywhere(t); } template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER, typename InputIt>
void tell(InputIt begin, InputIt end) {
queue.template put<callerEnvironment>(begin, end);
}
auto pendingMailboxItems() -> size_t { return queue.size(); } auto pendingMailboxItems() -> size_t { return queue.size(); }
...@@ -86,7 +95,7 @@ class Actor { ...@@ -86,7 +95,7 @@ class Actor {
if constexpr (emper::IO) { if constexpr (emper::IO) {
emper::sleep(1); emper::sleep(1);
} else { } else {
emper::yield(); runtime.yield();
} }
// TODO: The suppressed linter error below may be a false positive // TODO: The suppressed linter error below may be a false positive
// reported by clang-tidy. // reported by clang-tidy.
......
...@@ -60,8 +60,29 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { ...@@ -60,8 +60,29 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> {
} }
} }
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER, typename InputIt>
void put(InputIt begin, InputIt end) {
{
std::lock_guard<std::mutex> lock(queueMutex);
for (InputIt cur = begin; cur != end; ++cur) {
mpscQueue.push(*cur);
}
}
// Micro optimization, see if there is a blocked context
// before performing the atomic exchange operation.
if (blockedContext != nullptr) {
tryToWakeupBlockedContext<callerEnvironment>();
}
}
void putFromAnywhere(T t) { put<CallerEnvironment::ANYWHERE>(t); } void putFromAnywhere(T t) { put<CallerEnvironment::ANYWHERE>(t); }
template <typename InputIt>
void putFromAnywhere(InputIt begin, InputIt end) {
put<CallerEnvironment::ANYWHERE>(begin, end);
}
auto get(const std::function<void(void)>& postRetrieve) -> T { auto get(const std::function<void(void)>& postRetrieve) -> T {
tPopped = false; tPopped = false;
tryToGetElement(postRetrieve); tryToGetElement(postRetrieve);
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "Actor.hpp" #include "Actor.hpp"
#include "BinaryPrivateSemaphore.hpp" #include "BinaryPrivateSemaphore.hpp"
#include "CallerEnvironment.hpp"
#include "emper-common.h" #include "emper-common.h"
class SignallingActor : public Actor<unsigned int> { class SignallingActor : public Actor<unsigned int> {
...@@ -25,7 +26,7 @@ void emperTest() { ...@@ -25,7 +26,7 @@ void emperTest() {
signallingActor->start(); signallingActor->start();
// TODO: Use std::jthread once EMPER uses C++20. // TODO: Use std::jthread once EMPER uses C++20.
std::thread signallingThread([&] { signallingActor->tellFromAnywhere(1); }); std::thread signallingThread([&] { signallingActor->tell<CallerEnvironment::ANYWHERE>(1); });
bps->wait(); bps->wait();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment