diff --git a/emper/Actor.hpp b/emper/Actor.hpp index e4cc429631f7be93daf5d8cfd9f40440943914af..9810fe611a082014e9f874fa18a11ebb3030a736 100644 --- a/emper/Actor.hpp +++ b/emper/Actor.hpp @@ -32,9 +32,10 @@ private: while (state == Running) { setState(Retrieving); - T t = queue.get(); + T t = queue.get([this] { + setState(Running); + }); - setState(Running); receive(t); } diff --git a/emper/UnboundedBlockingMpscQueue.hpp b/emper/UnboundedBlockingMpscQueue.hpp index 293c78ab5fb84bd9dccb284ab6540b45f45ef13f..c28449abb61e2f500a9c666023d90b8c84ecb409 100644 --- a/emper/UnboundedBlockingMpscQueue.hpp +++ b/emper/UnboundedBlockingMpscQueue.hpp @@ -26,12 +26,13 @@ private: } } - void tryToGetElement() { + void tryToGetElement(std::function<void(void)> postRetrieve) { std::lock_guard<std::mutex> lock(queueMutex); if (!mpscQueue.empty()) { t = mpscQueue.front(); mpscQueue.pop(); tPopped = true; + postRetrieve(); } } @@ -57,16 +58,16 @@ public: } } - T get() { + T get(std::function<void(void)> postRetrieve) { tPopped = false; - tryToGetElement(); + tryToGetElement(postRetrieve); if (!tPopped) { Context* context = Context::getCurrentContext(); - block([this, context] { + block([this, context, postRetrieve] { blockedContext = context; - tryToGetElement(); + tryToGetElement(postRetrieve); if (tPopped) { tryToWakeupBlockedContext(); } @@ -79,6 +80,7 @@ public: assert(!mpscQueue.empty()); t = mpscQueue.front(); mpscQueue.pop(); + postRetrieve(); } } return t;