From c92d529a6b533426af74bb3203700da4f17d1761 Mon Sep 17 00:00:00 2001 From: Florian Schmaus <flow@cs.fau.de> Date: Wed, 19 Feb 2020 19:10:03 +0100 Subject: [PATCH] Ensure that the current fiber is recycled in discardAndResume() The current Fiber is now always stored in the context, not just on debug builds. This also means that we can remove the currentFiber thread local variable, as using only a thread local storage would yield wrong results in case a blocked context is resumed on another worker thread. --- emper/Context.hpp | 24 ++++++++++++++++++--- emper/ContextManager.cpp | 9 ++++++++ emper/Dispatcher.cpp | 2 -- emper/Dispatcher.hpp | 27 ++++++------------------ emper/strategies/laws/LawsDispatcher.cpp | 12 ++++++++--- emper/strategies/laws/LawsDispatcher.hpp | 3 +++ emper/strategies/ws/WsDispatcher.cpp | 2 +- 7 files changed, 50 insertions(+), 29 deletions(-) diff --git a/emper/Context.hpp b/emper/Context.hpp index e1bbd7ce..3428ccc2 100644 --- a/emper/Context.hpp +++ b/emper/Context.hpp @@ -14,6 +14,8 @@ #include "Debug.hpp" // for LOGD, LogSubsystem, LogSubsystem::C, Logger #include "Emper.hpp" // for Emper::DEBUG +class ContextManager; +class Dispatcher; class Fiber; extern "C" [[noreturn]] void switch_and_load_context(void** toTos); @@ -29,7 +31,7 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> { static thread_local Context* currentContext; - const Fiber* fiber = nullptr; + Fiber* currentFiber = nullptr; void* const tos; @@ -49,6 +51,10 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> { friend auto operator<<(std::ostream&, const Context&) -> std::ostream&; + friend ContextManager; + + auto getFiber() -> Fiber* { return currentFiber; } + /** * The first function that a newly started context will * execute. This function simply first runs the pre main hook and @@ -60,6 +66,20 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> { context->mainFunction(); } + friend Dispatcher; + + static void setCurrentFiber(Fiber* fiber) { + assert(currentContext); + + currentContext->currentFiber = fiber; + } + + static auto getCurrentFiber() -> Fiber* { + assert(currentContext); + + return currentContext->currentFiber; + } + public: // cppcheck-suppress noExplicitConstructor selfInitialization Context(func_t mainFunction) @@ -108,8 +128,6 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> { inline void setHook(func_t hook) { startAndResumeHook = std::move(hook); } - inline void setFiber(const Fiber* fiber) { this->fiber = fiber; } - [[nodiscard]] inline auto getTos() const -> const void* { return tos; } /** diff --git a/emper/ContextManager.cpp b/emper/ContextManager.cpp index 552b6ae0..6eec4d55 100644 --- a/emper/ContextManager.cpp +++ b/emper/ContextManager.cpp @@ -10,6 +10,8 @@ #include "Dispatcher.hpp" // for Dispatcher #include "Runtime.hpp" // for Runtime +class Fiber; + ContextManager::ContextManager(Runtime& runtime) : MemoryManager(runtime), runtime(runtime) { auto newWorkerHook = [this]() { for (unsigned int i = 0; i < CONTEXT_MANAGER_FIRST_LAYER_QUEUE_SIZE * 2; ++i) { @@ -78,5 +80,12 @@ void ContextManager::discardAndResume(Context* context) { LOGD("Freeing context " << contextToFree); putFreeContext(contextToFree); }); + + // Since we are going to discard this context, it will never reach + // the end of its dispatch loop, and hence we need to ensure that + // its fiber is recycled. + Fiber* currentFiber = contextToFree->getFiber(); + runtime.dispatcher.recycle(currentFiber); + contextToFree->discardAndResume(context); } diff --git a/emper/Dispatcher.cpp b/emper/Dispatcher.cpp index 0cc1fb33..5d05163d 100644 --- a/emper/Dispatcher.cpp +++ b/emper/Dispatcher.cpp @@ -7,8 +7,6 @@ #include "Emper.hpp" #include "Runtime.hpp" // for Runtime -thread_local const Fiber* Dispatcher::currentFiber; - auto Dispatcher::getDispatchLoop() -> func_t { return [this] { dispatchLoop(); }; } diff --git a/emper/Dispatcher.hpp b/emper/Dispatcher.hpp index 6b33d732..175240c5 100644 --- a/emper/Dispatcher.hpp +++ b/emper/Dispatcher.hpp @@ -2,12 +2,9 @@ // Copyright © 2020 Florian Schmaus #pragma once -#include <cassert> // for assert - #include "Common.hpp" // for func_t #include "Context.hpp" // for Context #include "Debug.hpp" // for LOGD, LogSubsystem, LogSubsystem::DISP -#include "Emper.hpp" // for Emper::DEBUG #include "Fiber.hpp" // for Fiber #include "emper-common.h" // for workeraffinity_t @@ -16,8 +13,6 @@ class ContextManager; class Dispatcher : public Logger<LogSubsystem::DISP> { protected: - static thread_local const Fiber* currentFiber; - Runtime& runtime; void dispatchLoopDoSleep(); @@ -29,14 +24,9 @@ class Dispatcher : public Logger<LogSubsystem::DISP> { // The dispatch() method could theoretically be made static in // non-debug builds. // NOLINTNEXTLINE(readability-convert-member-functions-to-static) - inline void dispatch(const Fiber* fiber) { + inline void dispatch(Fiber* fiber) { LOGD("executing fiber " << fiber); - currentFiber = fiber; - - if constexpr (emper::DEBUG) { - Context::getCurrentContext()->setFiber(fiber); - } - + Context::setCurrentFiber(fiber); fiber->run(); } @@ -50,24 +40,21 @@ class Dispatcher : public Logger<LogSubsystem::DISP> { return fiber->doAtomicDecrRefCount(); } - static inline void recycle(const Fiber* fiber) { delete fiber; } + virtual void recycle(Fiber* fiber) { delete fiber; } void putRuntimeWorkerToSleep(); public: Dispatcher(Runtime& runtime) : runtime(runtime) {} - static auto getCurrentFiber() -> const Fiber& { - const Fiber* fiber = getCurrentFiberPtr(); + static auto getCurrentFiber() -> Fiber& { + Fiber* fiber = getCurrentFiberPtr(); return *fiber; } - static auto getCurrentFiberPtr() -> const Fiber* { - assert(currentFiber); - return currentFiber; - } + static auto getCurrentFiberPtr() -> Fiber* { return Context::getCurrentFiber(); } - static auto isDispatchedControlFlow() -> bool { return currentFiber != nullptr; } + static auto isDispatchedControlFlow() -> bool { return getCurrentFiberPtr() != nullptr; } friend ContextManager; }; diff --git a/emper/strategies/laws/LawsDispatcher.cpp b/emper/strategies/laws/LawsDispatcher.cpp index 2ece84a4..6930783d 100644 --- a/emper/strategies/laws/LawsDispatcher.cpp +++ b/emper/strategies/laws/LawsDispatcher.cpp @@ -10,6 +10,14 @@ #include "LawsStrategy.hpp" // for LawsStrategy, LawsStrategy::FiberSource #include "Runtime.hpp" +void LawsDispatcher::recycle(Fiber* fiber) { + // If the ref count has not reached zero yet, do not recycle the + // fiber. + if (decreaseRefCount(fiber)) return; + + Dispatcher::recycle(fiber); +} + void LawsDispatcher::dispatchLoop() { while (true) { Fiber* const fiber = runtime.nextFiber(); @@ -51,8 +59,6 @@ void LawsDispatcher::dispatchLoop() { if (affinity) *affinity = Runtime::getWorkerId(); } - if (decreaseRefCount(fiber) == 0) { - recycle(fiber); - } + recycle(fiber); } } diff --git a/emper/strategies/laws/LawsDispatcher.hpp b/emper/strategies/laws/LawsDispatcher.hpp index 32b64995..fec816a1 100644 --- a/emper/strategies/laws/LawsDispatcher.hpp +++ b/emper/strategies/laws/LawsDispatcher.hpp @@ -5,6 +5,7 @@ #include "Dispatcher.hpp" #include "emper-common.h" +class Fiber; class LawsStrategy; class Runtime; @@ -19,6 +20,8 @@ class LawsDispatcher : public Dispatcher { ; #pragma GCC diagnostic pop + void recycle(Fiber* fiber) override; + public: LawsDispatcher(Runtime& runtime, LawsStrategy& lawsStrategy) : Dispatcher(runtime), lawsStrategy(lawsStrategy) {} diff --git a/emper/strategies/ws/WsDispatcher.cpp b/emper/strategies/ws/WsDispatcher.cpp index 9e3394ac..74182792 100644 --- a/emper/strategies/ws/WsDispatcher.cpp +++ b/emper/strategies/ws/WsDispatcher.cpp @@ -8,7 +8,7 @@ class Fiber; void WsDispatcher::dispatchLoop() { while (true) { - const Fiber* fiber = runtime.nextFiber(); + Fiber* fiber = runtime.nextFiber(); if (!fiber) { dispatchLoopDoSleep(); -- GitLab