diff --git a/emper/UnboundedBlockingMpscQueue.hpp b/emper/UnboundedBlockingMpscQueue.hpp index 6cdff408bd41ae1a24b41e6c840f8c3b6f18e686..b9de4631dbd7831320da01bb1c1384bc215b79df 100644 --- a/emper/UnboundedBlockingMpscQueue.hpp +++ b/emper/UnboundedBlockingMpscQueue.hpp @@ -33,13 +33,14 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { // Therefore if we tryToGet a new 't' while 't' is available the current 't' // would be overridden and dropped. assert(!tPopped); + std::lock_guard<std::mutex> lock(queueMutex); - if (!mpscQueue.empty()) { - t = mpscQueue.front(); - mpscQueue.pop(); - tPopped = true; - postRetrieve(); - } + if (mpscQueue.empty()) return; + + t = mpscQueue.front(); + mpscQueue.pop(); + tPopped = true; + postRetrieve(); } public: @@ -54,7 +55,7 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { // Micro optimization, see if there is a blocked context // before performing the atomic exchange operation. - if (blockedContext.load() != nullptr) { + if (blockedContext != nullptr) { tryToWakeupBlockedContext<callerEnvironment>(); } } @@ -82,10 +83,16 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { }); if (!tPopped) { std::lock_guard<std::mutex> lock(queueMutex); + // If we observed tPopped to be false without holding the + // queueMutex, then the same must be true when holding the + // mutex. + assert(!tPopped); + // If 't' isn't already set, then mspcQueue.get() MUST // return an element. Note that a non-lineralizabe // queue may break this invariant. assert(!mpscQueue.empty()); + t = mpscQueue.front(); mpscQueue.pop(); postRetrieve();