diff --git a/emper/Dispatcher.hpp b/emper/Dispatcher.hpp index 9b2d390ee9b297d3770e26c52bee4dd1142f90ac..78d75fff7e166d1212e7b8cf95a748e66dad3ddb 100644 --- a/emper/Dispatcher.hpp +++ b/emper/Dispatcher.hpp @@ -30,7 +30,12 @@ class Dispatcher : public Logger<LogSubsystem::DISP> { fiber->run(); } - static inline auto isRunnable(Fiber* fiber) -> bool { return fiber->setRunnableFalse(); } + static inline auto isRunnable(Fiber* fiber) -> bool { + if (fiber->isMultiFiber()) { + return fiber->setRunnableFalse(); + } + return true; + } static inline auto getAffinityBuffer(Fiber* fiber) -> workeraffinity_t* { return fiber->getAffinityBuffer(); diff --git a/emper/Fiber.hpp b/emper/Fiber.hpp index 9f6965f9e2526c26e637bdb9fb2e378ac54e3e61..8fd61a78399f4143b4b79f33ffbf17325f76b47b 100644 --- a/emper/Fiber.hpp +++ b/emper/Fiber.hpp @@ -36,6 +36,12 @@ class ALIGN_TO_CACHE_LINE Fiber : public Logger<LogSubsystem::F> { const fiber_fun_t function; void* const arg; + /** + * Denotes if this is a multi Fiber, i.e. a Fiber that is placed in + * multiple ready-lists, instead of just one. + */ + bool isMulti = false; + std::atomic<bool> runnable = {true}; ALIGN_TO_CACHE_LINE std::atomic_uint referenceCounter = {1}; @@ -85,14 +91,23 @@ class ALIGN_TO_CACHE_LINE Fiber : public Logger<LogSubsystem::F> { return runnable.exchange(false); } - inline auto doAtomicIncrRefCount() -> unsigned int { + inline void doAtomicIncrRefCount() { assert(referenceCounter < UINT_MAX); - return ++referenceCounter; + isMulti = true; + // Note: Although the first impulse is to prevent the re-ordering + // of the isMulti store after the reference counter modification, + // this *should* not be necessary. The sychronization point where + // this ordering is guranteed is, when the fiber is put in a + // concurrent queue. There the queue must gurantee that all + // previous stores are visible at the time the fiber appears in + // the queue to other threads. + referenceCounter.fetch_add(1, std::memory_order_relaxed); } inline auto doAtomicDecrRefCount() -> unsigned int { assert(referenceCounter > 0); - return --referenceCounter; + auto previous = referenceCounter.fetch_sub(1, std::memory_order_acq_rel); + return previous - 1; } template <LogSubsystem> @@ -114,6 +129,8 @@ class ALIGN_TO_CACHE_LINE Fiber : public Logger<LogSubsystem::F> { [[nodiscard]] auto isRunnable() const -> bool { return runnable; } + [[nodiscard]] auto isMultiFiber() const -> bool { return isMulti; } + friend auto operator<<(std::ostream& strm, const Fiber& fiber) -> std::ostream&; static inline auto from(fiber_fun_t function, void* arg) -> Fiber* { diff --git a/emper/strategies/laws/LawsDispatcher.cpp b/emper/strategies/laws/LawsDispatcher.cpp index 12af9af83e266482135cbeebef50dd60981b5604..c99d4d3ffbb146c779f58df57f22a42f77be3557 100644 --- a/emper/strategies/laws/LawsDispatcher.cpp +++ b/emper/strategies/laws/LawsDispatcher.cpp @@ -4,6 +4,7 @@ #include "Common.hpp" // for DIE_MSG #include "Emper.hpp" +#include "Fiber.hpp" #include "LawsStrategy.hpp" // for LawsStrategy, LawsStrategy::FiberSource #include "NextFiberResult.hpp" #include "Runtime.hpp" @@ -12,8 +13,10 @@ void LawsDispatcher::recycle(Fiber* fiber) { // If the ref count has not reached zero yet, do not recycle the - // fiber. - if (decreaseRefCount(fiber)) return; + // fiber. But only if the fiber is a multi fiber, i.e. was placed + // in multiple queues in the first place, and hence needed to be + // refcounted. + if (fiber->isMultiFiber() && decreaseRefCount(fiber)) return; Dispatcher::recycle(fiber); }