diff --git a/emper/Actor.hpp b/emper/Actor.hpp index 9810fe611a082014e9f874fa18a11ebb3030a736..4f08e7bc1c39308e606b3eb3cd2235baa0c54f4d 100644 --- a/emper/Actor.hpp +++ b/emper/Actor.hpp @@ -23,6 +23,10 @@ private: UnboundedBlockingMpscQueue<T> queue; + bool switchState(State oldState, State newState) { + return state.compare_exchange_strong(oldState, newState, std::memory_order_acq_rel); + } + void setState(State newState) { state.store(newState, std::memory_order_release); } @@ -30,16 +34,19 @@ private: void actorLoop() { setState(Running); - while (state == Running) { - setState(Retrieving); + while (state.load(std::memory_order_acquire) == Running) { + if (!switchState(Running, Retrieving)) { + // Someone else must have set our state to Stopped + break; + } + T t = queue.get([this] { - setState(Running); + // Prevent lost stop() + switchState(Retrieving, Running); }); receive(t); } - - setState(Stopped); } protected: diff --git a/emper/UnboundedBlockingMpscQueue.hpp b/emper/UnboundedBlockingMpscQueue.hpp index c28449abb61e2f500a9c666023d90b8c84ecb409..47dd19f6abea0b5c67e1d641cebac7da12d3f963 100644 --- a/emper/UnboundedBlockingMpscQueue.hpp +++ b/emper/UnboundedBlockingMpscQueue.hpp @@ -11,7 +11,7 @@ class UnboundedBlockingMpscQueue : protected Logger<LogSubsystem::U_B_MPSC_Q> , protected Blockable { private: - std::atomic<Context*> blockedContext; + std::atomic<Context*> blockedContext = nullptr; bool tPopped; T t; @@ -27,6 +27,10 @@ private: } void tryToGetElement(std::function<void(void)> postRetrieve) { + // tPopped indicates that 't' is a popped and usable value. + // Therefore if we tryToGet a new 't' while 't' is available the current 't' + // would be overridden and dropped. + assert(!tPopped); std::lock_guard<std::mutex> lock(queueMutex); if (!mpscQueue.empty()) { t = mpscQueue.front();