Skip to content
Snippets Groups Projects
Commit 625dc0f3 authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'fix_actor_uninitialized_unblock' into 'master'

fix Actor and UnboundedBlockingMpscQueue

See merge request i4/manycore/emper!9
parents 72898ff1 bdfc155c
No related branches found
No related tags found
No related merge requests found
......@@ -23,6 +23,10 @@ private:
UnboundedBlockingMpscQueue<T> queue;
bool switchState(State oldState, State newState) {
return state.compare_exchange_strong(oldState, newState, std::memory_order_acq_rel);
}
void setState(State newState) {
state.store(newState, std::memory_order_release);
}
......@@ -30,16 +34,19 @@ private:
void actorLoop() {
setState(Running);
while (state == Running) {
setState(Retrieving);
while (state.load(std::memory_order_acquire) == Running) {
if (!switchState(Running, Retrieving)) {
// Someone else must have set our state to Stopped
break;
}
T t = queue.get([this] {
setState(Running);
// Prevent lost stop()
switchState(Retrieving, Running);
});
receive(t);
}
setState(Stopped);
}
protected:
......
......@@ -11,7 +11,7 @@ class UnboundedBlockingMpscQueue : protected Logger<LogSubsystem::U_B_MPSC_Q>
, protected Blockable {
private:
std::atomic<Context*> blockedContext;
std::atomic<Context*> blockedContext = nullptr;
bool tPopped;
T t;
......@@ -27,6 +27,10 @@ private:
}
void tryToGetElement(std::function<void(void)> postRetrieve) {
// tPopped indicates that 't' is a popped and usable value.
// Therefore if we tryToGet a new 't' while 't' is available the current 't'
// would be overridden and dropped.
assert(!tPopped);
std::lock_guard<std::mutex> lock(queueMutex);
if (!mpscQueue.empty()) {
t = mpscQueue.front();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment