diff --git a/emper/UnboundedBlockingMpscQueue.hpp b/emper/UnboundedBlockingMpscQueue.hpp index b9de4631dbd7831320da01bb1c1384bc215b79df..9df3fdb02e166ec812266423dd91fc650939c748 100644 --- a/emper/UnboundedBlockingMpscQueue.hpp +++ b/emper/UnboundedBlockingMpscQueue.hpp @@ -68,6 +68,7 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { if (!tPopped) { Context* context = Context::getCurrentContext(); + block_again: block([this, context, postRetrieve] { tryToGetElement(postRetrieve); @@ -82,15 +83,27 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { } }); if (!tPopped) { - std::lock_guard<std::mutex> lock(queueMutex); + std::unique_lock<std::mutex> lock(queueMutex); // If we observed tPopped to be false without holding the // queueMutex, then the same must be true when holding the // mutex. assert(!tPopped); - // If 't' isn't already set, then mspcQueue.get() MUST - // return an element. Note that a non-lineralizabe - // queue may break this invariant. + // Handle spurious wake-ups caused by the non-atomic push and wake-up pair + // in put(T). + // Between the push and the wake-up a successful get() can consume the pushed + // element and a second get() could already be blocked because the queue is now empty. + // This blocked context is then unblocked by the put() but the queue is empty. + // When encountering an empty queue after a wake-up we will block again. + + // TODO: It could be interesting to investigate if immediately retrying to + // pop using tryToGetElement (line 67) is generally better than to immediately block + // again (line 71) which causes a tryToGetElement call in the block hook. 
+ if (unlikely(mpscQueue.empty())) { + lock.unlock(); + goto block_again; + } + assert(!mpscQueue.empty()); t = mpscQueue.front(); diff --git a/tests/AlarmActorTest.cpp b/tests/AlarmActorTest.cpp index bb57ecbfe95a3922701583cf1be28c6e5ccfed44..d3c5e340e4236e6009957958872eab559f8ad5ca 100644 --- a/tests/AlarmActorTest.cpp +++ b/tests/AlarmActorTest.cpp @@ -3,6 +3,7 @@ #include <cstdlib> // for exit, EXIT_FAILURE, EXIT_SUC... #include <functional> // bind #include <iostream> // for operator<<, basic_ostream +#include <thread> // for hardware_concurrency #include "Actor.hpp" // for Actor #include "BinaryPrivateSemaphore.hpp" // for BPS @@ -21,7 +22,7 @@ class AlarmActor : public Actor<BPS*> { }; auto main(int argc, char* argv[]) -> int { - unsigned int sleeper_count = 10; + unsigned int sleeper_count = std::thread::hardware_concurrency(); unsigned int sleeps = 1000; if (argc > 3) { diff --git a/tests/UnblockOnMainActorTest.cpp b/tests/UnblockOnMainActorTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..43c9def73ede3cdefd186a53325bc874686e853f --- /dev/null +++ b/tests/UnblockOnMainActorTest.cpp @@ -0,0 +1,114 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Fischer +#include <atomic> // for atomic +#include <cstdlib> // for exit, EXIT_FAILURE, EXIT_SUC... +#include <ctime> // for nanosleep, timespec +#include <functional> // bind +#include <iostream> // for operator<<, basic_ostream +#include <thread> // for hardware_concurrency + +#include "Actor.hpp" // for Actor +#include "BinaryPrivateSemaphore.hpp" // for BPS +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "Fiber.hpp" // for Fiber +#include "Runtime.hpp" // for Runtime +#include "emper.hpp" // for spawn +#include "lib/adt/LockedUnboundedQueue.hpp" // for LockedUnboundedQueue + +/* + * This test triggered a spurious wakeup bug in the UnboundedBlockingMpscQueue + * of the AlarmActor which was not triggered by the AlarmActorTest. 
+ * + * Although we are not completely sure why the signaling main thread is needed + * we assume that the shorter AlarmActor::receive function or different scheduling + * caused by the main thread trigger the spurious wakeup in UnboundedBlockingMpscQueue::get + */ + +lib::adt::LockedUnboundedQueue<BPS> wakeupQueue; +std::atomic<bool> quit = false; + +class AlarmActor : public Actor<BPS*> { + protected: + void receive(BPS* sem) override { wakeupQueue.enqueue(sem); } + + public: + AlarmActor(Runtime& runtime) : Actor(runtime) {} + void stop() { Actor::stop(); } +}; + +auto main(int argc, char* argv[]) -> int { + unsigned int sleeper_count = std::thread::hardware_concurrency(); + unsigned int sleeps = 1000; + + if (argc > 3) { + std::cerr << "Usage: " << argv[0] << " [fiber count] [block count]" << std::endl; + exit(EXIT_FAILURE); + } + + const int DECIMAL = 10; + if (argc > 1) { + sleeper_count = strtol(argv[1], nullptr, DECIMAL); + } + + if (argc > 2) { + sleeps = strtol(argv[2], nullptr, DECIMAL); + } + + Runtime runtime; + + Fiber* fiber = Fiber::from([&] { + AlarmActor alarmActor(runtime); + alarmActor.start(); + + CPS cps; + for (unsigned int i = 0; i < sleeper_count; ++i) { + spawn( + [&alarmActor, &sleeps] { + for (unsigned int i = 1; i <= sleeps; ++i) { + BPS sem; + alarmActor.tell(&sem); + sem.wait(); + } + }, + cps); + } + + // Wait for the sleeping fibers to finish. + cps.wait(); + + // Wait for the actor to become idle. + bool actorIdle = alarmActor.waitUntilIdle(60 * 1000); + if (!actorIdle) { + std::cerr << "FAILURE: Actor did not went idle"; + quit = true; + exit(EXIT_FAILURE); + } + + alarmActor.stop(); + + quit = true; + exit(EXIT_SUCCESS); + }); + + runtime.scheduleFromAnywhere(*fiber); + + while (!quit) { // signal the semaphores enqueued by AlarmActor::receive until the fiber sets quit + BPS* sem = wakeupQueue.dequeue(); + if (sem) { + sem->signalFromAnywhere(); + } else { + // clang-tidy-11 probably crashes when parsing C++ comparison in chrono.h + // See: https://bugs.llvm.org/show_bug.cgi?id=47768. 
+ // std::this_thread::sleep_for(std::chrono::nanoseconds(1000)); + // -> use nanosleep directly (NOTE(review): sleeps 100 ns below, not 1000 ns - confirm) + struct timespec duration; + duration.tv_sec = 0; + duration.tv_nsec = 100; + nanosleep(&duration, nullptr); + } + } + + runtime.waitUntilFinished(); + + return EXIT_FAILURE; // NOTE(review): presumably not reached normally, the fiber calls exit() after setting quit - confirm +} diff --git a/tests/meson.build b/tests/meson.build index 0fdfdbf747ee599ce3705ead767c15eb501df333..d0ffdcedf306649eb522a02cb08c24c06881a1b6 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -35,6 +35,11 @@ tests = { 'description': 'Use an Actor to unblock fibers using BPS', }, + 'UnblockOnMainActorTest.cpp': + { + 'description': 'Slight modification of AlarmActorTest using the main thread to signal BPS', + }, + 'SimpleLawsTest.cpp': { 'description': 'Simple LAWS scheduling strategy test',