From f00a4fe288656eac729a86c1c321844aaed0c7ed Mon Sep 17 00:00:00 2001 From: Florian Schmaus <flow@cs.fau.de> Date: Tue, 2 Mar 2021 14:06:25 +0100 Subject: [PATCH 1/4] [Common] Add missing #include <sstream> --- emper/Common.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/emper/Common.hpp b/emper/Common.hpp index 72136a72..9dd9c61a 100644 --- a/emper/Common.hpp +++ b/emper/Common.hpp @@ -3,6 +3,7 @@ #pragma once #include <functional> +#include <sstream> // IWYU pragma: keep using func_t = std::function<void()>; -- GitLab From 5e8b9c95f071ac6e87b5e073264095e7d46a9f6e Mon Sep 17 00:00:00 2001 From: Florian Schmaus <flow@cs.fau.de> Date: Tue, 2 Mar 2021 14:10:38 +0100 Subject: [PATCH 2/4] [RwLockUnboundedQueue] Add missing #include "Common.hpp" --- emper/lib/adt/RwLockUnboundedQueue.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/emper/lib/adt/RwLockUnboundedQueue.hpp b/emper/lib/adt/RwLockUnboundedQueue.hpp index 0d60254e..8a305b37 100644 --- a/emper/lib/adt/RwLockUnboundedQueue.hpp +++ b/emper/lib/adt/RwLockUnboundedQueue.hpp @@ -7,6 +7,8 @@ #include <cstring> #include <queue> +#include "Common.hpp" + namespace lib::adt { static void aquire_wrlock(pthread_rwlock_t& lock) { -- GitLab From b6f1dbc69662b0d628880c36b222ad564dfe23ff Mon Sep 17 00:00:00 2001 From: Florian Schmaus <flow@cs.fau.de> Date: Tue, 2 Mar 2021 14:15:57 +0100 Subject: [PATCH 3/4] [Emper] Fix include in Emper.hpp --- emper/Emper.cpp | 1 + emper/Emper.hpp | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/emper/Emper.cpp b/emper/Emper.cpp index 2935ae06..8c463274 100644 --- a/emper/Emper.cpp +++ b/emper/Emper.cpp @@ -2,6 +2,7 @@ // Copyright © 2020 Florian Schmaus #include "Emper.hpp" +#include "emper-common.h" #include "emper-version.h" auto emper::getFullVersion() -> std::string { return EMPER_FULL_VERSION; } diff --git a/emper/Emper.hpp b/emper/Emper.hpp index ee265179..d70bb6d7 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -2,10 +2,10 @@ // Copyright © 2020 Florian Schmaus #pragma once -#include <emper-config.h> - #include <string> +#include "emper-config.h" + namespace emper { static const bool STATS = -- GitLab From 8078cabc34bd93e585cc9d2dc2a7d2482c7a77f9 Mon Sep 17 00:00:00 2001 From: Florian Schmaus <flow@cs.fau.de> Date: Fri, 26 Feb 2021 14:49:21 +0100 Subject: [PATCH 4/4] Add support for clean EMPER runtime shutdowns --- emper/Context.cpp | 4 ++ emper/Context.hpp | 20 ++++++- emper/ContextManager.cpp | 3 +- emper/ContextManager.hpp | 2 +- emper/Dispatcher.cpp | 6 +- emper/Dispatcher.hpp | 2 - emper/Emper.cpp | 41 +++++++++++++- emper/Runtime.cpp | 40 +++++++++++--- emper/Runtime.hpp | 22 +++++++- emper/include/emper.hpp | 8 +++ tests/RuntimeDestroyTest.cpp | 103 +++++++++++++++++++++++++++++++++++ tests/meson.build | 7 +++ 12 files changed, 237 insertions(+), 21 deletions(-) create mode 100644 tests/RuntimeDestroyTest.cpp diff --git a/emper/Context.cpp b/emper/Context.cpp index e497a653..c3c0a58f 100644 --- a/emper/Context.cpp +++ b/emper/Context.cpp @@ -6,6 +6,10 @@ thread_local Context* Context::currentContext; +thread_local void* Context::originalStack; + +thread_local Context* Context::lastContextBeforeReturningToOriginalStack; + auto operator<<(std::ostream& strm, const Context& context) -> std::ostream& { strm << "Context " << &context << " [tos: " << context.tos << " bos: " << &context.context diff --git a/emper/Context.hpp b/emper/Context.hpp index 70727a12..a010c897 100644 --- a/emper/Context.hpp +++ b/emper/Context.hpp @@ -31,6 +31,10 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> { static thread_local Context* currentContext; + static thread_local void* originalStack; + + static thread_local Context* lastContextBeforeReturningToOriginalStack; + Fiber* currentFiber = nullptr; void* const tos; @@ -131,12 +135,22 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> { [[nodiscard]] inline auto getTos() const -> const void* { return tos; } /** - * Start this context. + * Start this context. This method is called from an unmanaged stack which is saved. */ - [[noreturn]] inline void start() { + inline auto start() -> Context* { LOGD("starting"); currentContext = this; - switch_context(&savedStackpointer); + + save_and_switch_context(&savedStackpointer, &originalStack); + + return lastContextBeforeReturningToOriginalStack; + } + + [[noreturn]] static void switchToOriginalStack() { + // Can't use new context hook to communicate the context to free. + lastContextBeforeReturningToOriginalStack = currentContext; + currentContext = nullptr; + switch_and_load_context(&originalStack); } /** diff --git a/emper/ContextManager.cpp b/emper/ContextManager.cpp index 6cf7d76c..b923c1b8 100644 --- a/emper/ContextManager.cpp +++ b/emper/ContextManager.cpp @@ -48,7 +48,8 @@ void ContextManager::putFreeContext(Context* context) { void ContextManager::start() { Context* freeContext = getFreeContext(); - freeContext->start(); + freeContext = freeContext->start(); + putFreeContext(freeContext); } /** diff --git a/emper/ContextManager.hpp b/emper/ContextManager.hpp index 57dcaaa7..5262f76f 100644 --- a/emper/ContextManager.hpp +++ b/emper/ContextManager.hpp @@ -26,7 +26,7 @@ class ContextManager static void putFreeContext(Context* context); - [[noreturn]] void start(); + void start(); void saveAndStartNew(func_t freshContextHook); diff --git a/emper/Dispatcher.cpp b/emper/Dispatcher.cpp index 810e399f..0a5e4a44 100644 --- a/emper/Dispatcher.cpp +++ b/emper/Dispatcher.cpp @@ -11,11 +11,11 @@ auto Dispatcher::getDispatchLoop() -> func_t { return [this] { dispatchLoop(); }; } -void Dispatcher::putRuntimeWorkerToSleep() { runtime.dispatchLoopSleep(); } - void Dispatcher::dispatchLoopDoSleep() { + runtime.maybeTerminateWorker(); + if constexpr (emper::WORKER_SLEEP) { - putRuntimeWorkerToSleep(); + runtime.dispatchLoopSleep(); } else { pthread_yield(); } diff --git a/emper/Dispatcher.hpp b/emper/Dispatcher.hpp index c3b52427..4aca48c4 100644 --- a/emper/Dispatcher.hpp +++ b/emper/Dispatcher.hpp @@ -42,8 +42,6 @@ class Dispatcher : public Logger<LogSubsystem::DISP> { virtual void recycle(Fiber* fiber) { delete fiber; } - void putRuntimeWorkerToSleep(); - public: Dispatcher(Runtime& runtime) : runtime(runtime) {} diff --git a/emper/Emper.cpp b/emper/Emper.cpp index 8c463274..afa74f41 100644 --- a/emper/Emper.cpp +++ b/emper/Emper.cpp @@ -2,7 +2,46 @@ // Copyright © 2020 Florian Schmaus #include "Emper.hpp" +#include <stdexcept> + +#include "Runtime.hpp" #include "emper-common.h" #include "emper-version.h" -auto emper::getFullVersion() -> std::string { return EMPER_FULL_VERSION; } +namespace emper { + +auto getFullVersion() -> std::string { return EMPER_FULL_VERSION; } + +static void ensure_no_current_runtime() { + Runtime* runtime = Runtime::getRuntime(); + if (runtime) { + throw std::runtime_error("Runtime already initialized"); + } +} + +void init_runtime() { + ensure_no_current_runtime(); + + // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks) + new Runtime(); + // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks) +} + +void init_runtime(workerid_t worker_count) { + ensure_no_current_runtime(); + + // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks) + new Runtime(worker_count); + // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks) +} + +void destroy_runtime() { + Runtime* runtime = Runtime::getRuntime(); + if (!runtime) { + throw std::runtime_error("No runtime to destroy"); + } + + delete runtime; +} + +} // namespace emper diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index c9ec2e99..5c8eb146 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -156,6 +156,8 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory if (errno) DIE_MSG_ERRNO("pthread_create() failed"); } + threadsRunning = true; + if constexpr (emper::STATS) { int res = std::atexit(&printLastRuntimeStats); if (res) { @@ -173,13 +175,10 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory Runtime::~Runtime() { DBG("Runtime " << this << " is terminating"); - for (workerid_t i = 0; i < workerCount; ++i) { - DBG("Runtime " << this << " is cancelling worker " << unsigned(i)); - errno = pthread_cancel(threads[i]); - if (errno) { - DIE_MSG_ERRNO("pthread_cancel() failed"); - } + if (threadsRunning) { + initiateAndWaitUntilTermination(); } + for (unsigned int i = 0; i < workerCount; ++i) { delete workers[i]; } @@ -225,7 +224,8 @@ auto Runtime::workerLoop(Worker* worker) -> void* { contextManager.start(); - abort(); + // Threads return here if Context::switchToOriginalStack() is called. + return nullptr; } @@ -269,10 +269,34 @@ auto Runtime::nextFiber() -> NextFiberResult { return scheduler.nextFiber(); } +void Runtime::initiateTermination() { + terminateWorkers.store(true, std::memory_order_release); + + wakeupSem.notify_many(workerCount); + + if constexpr (emper::IO) { + globalIo->initiateTermination(); + } +} + void Runtime::waitUntilFinished() { for (workerid_t i = 0; i < workerCount; ++i) { - pthread_join(threads[i], nullptr); + errno = pthread_join(threads[i], nullptr); + if (errno) { + DIE_MSG_ERRNO("pthread_join() returned error for worker ID " << std::to_string(i)); + } + } + + if constexpr (emper::IO) { + globalIo->waitUntilFinished(); } + + threadsRunning = false; +} + +void Runtime::initiateAndWaitUntilTermination() { + initiateTermination(); + waitUntilFinished(); } void Runtime::printStats() { diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 787c7c11..0b5d4c8d 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -4,6 +4,7 @@ #include <pthread.h> // for pthread_t +#include <atomic> #include <cassert> // for assert #include <cstdint> // for intptr_t #include <cstdlib> // for abort @@ -14,8 +15,9 @@ #include "CallerEnvironment.hpp" #include "Common.hpp" // for ALIGN_TO_CACHE_LINE -#include "Debug.hpp" // for LogSubsystem, LogSubsystem::RUNTI, Logger -#include "Emper.hpp" // for WORKER_NOTIFY +#include "Context.hpp" +#include "Debug.hpp" +#include "Emper.hpp" // for WORKER_NOTIFY #include "NextFiberResult.hpp" #include "Scheduler.hpp" // for Scheduler #include "Worker.hpp" @@ -61,6 +63,10 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { std::default_random_engine randomEngine; std::uniform_int_distribution<unsigned int> uniformIntDistribution; + std::atomic<bool> terminateWorkers = false; + + bool threadsRunning = false; + auto workerLoop(Worker* worker) -> void*; ALIGN_TO_CACHE_LINE WorkerWakeupSemaphore wakeupSem; @@ -128,6 +134,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { } } + void maybeTerminateWorker() { + if (terminateWorkers.load(std::memory_order_relaxed)) { + LOGD("Worker terminating"); + // Switch to the original stack, which will simply terminate the thread. + Context::switchToOriginalStack(); + } + } + void dispatchLoopSleep() { wakeupSem.wait(); } public: @@ -183,8 +197,12 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { inline auto getStrategy() -> RuntimeStrategy& { return *strategy; } + void initiateTermination(); + void waitUntilFinished(); + void initiateAndWaitUntilTermination(); + void printStats(); static auto inRuntime() -> bool { return Worker::isWorkerThread(); } diff --git a/emper/include/emper.hpp b/emper/include/emper.hpp index c5691e23..eef06738 100644 --- a/emper/include/emper.hpp +++ b/emper/include/emper.hpp @@ -63,6 +63,13 @@ void spawn(Fiber::fiber_fun0_t function, workeraffinity_t* affinity, S& semaphor } namespace emper { + +void init_runtime(); + +void init_runtime(workerid_t worker_count); + +void destroy_runtime(); + void yield() { Runtime* runtime = Runtime::getRuntime(); runtime->yield(); @@ -79,4 +86,5 @@ auto sleep(unsigned int seconds) -> bool { return res == -ETIME; } + } // namespace emper diff --git a/tests/RuntimeDestroyTest.cpp b/tests/RuntimeDestroyTest.cpp new file mode 100644 index 00000000..6b1d6308 --- /dev/null +++ b/tests/RuntimeDestroyTest.cpp @@ -0,0 +1,103 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#include <cstdlib> // for abort, exit, EXIT_SUCCESS +#include <iostream> // for operator<<, basic_ostream::o... + +#include "BinaryPrivateSemaphore.hpp" // for BPS +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "Debug.hpp" // for WDBG +#include "Fiber.hpp" // for Fiber +#include "PrivateSemaphore.hpp" // for PS +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" // for UNUSED_ARG +#include "emper.hpp" // for async +#include "lib/sync/Semaphore.hpp" + +using fibParams = struct { + int n; + int* result; + PS* sem; +}; + +static void fib(void* voidParams) { + auto* params = static_cast<fibParams*>(voidParams); + int n = params->n; + int* result = params->result; + if (!result) { + std::cerr << "voidParams: " << voidParams << " n: " << params->n << " sem: " << params->sem + << std::endl; + abort(); + } + PS* sem = params->sem; + + if (n < 2) { + *result = n; + } else { + CPS newSem(2); + + int a, b; + + fibParams newParams1; + newParams1.n = n - 1; + newParams1.result = &a; + newParams1.sem = &newSem; + fibParams newParams2; + newParams2.n = n - 2; + newParams2.result = &b; + newParams2.sem = &newSem; + + Fiber* f1 = Fiber::from(&fib, &newParams1); + Fiber* f2 = Fiber::from(&fib, &newParams2); + + Runtime* runtime = Runtime::getRuntime(); + runtime->schedule(*f1); + runtime->schedule(*f2); + + WDBG("fib: Calling wait for n=" << n); + newSem.wait(); + + *result = a + b; + } + + WDBG("fib: Calling signalAndExit for n=" << n); + sem->signalAndExit(); +} + +static emper::lib::sync::Semaphore exit_semaphore; + +static void fibKickoff() { + const int fibNum = 2; + int result; + BPS sem; + fibParams params = {fibNum, &result, &sem}; + + Fiber* fibFiber = Fiber::from(fib, ¶ms); + async(fibFiber); + + sem.wait(); + + std::cout << "fib(" << fibNum << ") = " << result << std::endl; + exit_semaphore.notify(); +} + +auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { + // const unsigned nthreads = std::thread::hardware_concurrency(); + const unsigned nthreads = 2; + + std::cout << "Number of threads: " << nthreads << std::endl; + + Runtime runtime(nthreads); + + Fiber* fibFiber = Fiber::from(&fibKickoff); + + std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; + + runtime.scheduleFromAnywhere(*fibFiber); + + exit_semaphore.wait(); + + // Once Runtime goes out of scope, it's desctructor will be called, + // which, in turn, terminates the runtime, and hence causes the + // behavior this test is checking. + return 0; +} diff --git a/tests/meson.build b/tests/meson.build index 52662f38..780d3bdb 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -19,6 +19,13 @@ tests = { 'is_parallel': true, }, + 'RuntimeDestroyTest.cpp': + { + 'description': 'Test runtime destruction', + 'test_suite': 'smoke', + 'is_parallel': true, + }, + 'c_api_test.c': { 'description': 'Test EMPER\'s C API', -- GitLab