diff --git a/emper/Context.cpp b/emper/Context.cpp index e497a6530b10a671557b42186a17b33434d22ce0..c3c0a58fc56cfbad9da71dde046bef27a4b66372 100644 --- a/emper/Context.cpp +++ b/emper/Context.cpp @@ -6,6 +6,10 @@ thread_local Context* Context::currentContext; +thread_local void* Context::originalStack; + +thread_local Context* Context::lastContextBeforeReturningToOriginalStack; + auto operator<<(std::ostream& strm, const Context& context) -> std::ostream& { strm << "Context " << &context << " [tos: " << context.tos << " bos: " << &context.context diff --git a/emper/Context.hpp b/emper/Context.hpp index 70727a12a9baf83b1658c0db506ac070885f706e..a010c897ee5e023cbf16daa2198483be8488383d 100644 --- a/emper/Context.hpp +++ b/emper/Context.hpp @@ -31,6 +31,10 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> { static thread_local Context* currentContext; + static thread_local void* originalStack; + + static thread_local Context* lastContextBeforeReturningToOriginalStack; + Fiber* currentFiber = nullptr; void* const tos; @@ -131,12 +135,22 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> { [[nodiscard]] inline auto getTos() const -> const void* { return tos; } /** - * Start this context. + * Start this context. This method is called from an unmanaged stack which is saved. */ - [[noreturn]] inline void start() { + inline auto start() -> Context* { LOGD("starting"); currentContext = this; - switch_context(&savedStackpointer); + + save_and_switch_context(&savedStackpointer, &originalStack); + + return lastContextBeforeReturningToOriginalStack; + } + + [[noreturn]] static void switchToOriginalStack() { + // Can't use new context hook to communicate the context to free. + lastContextBeforeReturningToOriginalStack = currentContext; + currentContext = nullptr; + switch_and_load_context(&originalStack); } /** diff --git a/emper/ContextManager.cpp b/emper/ContextManager.cpp index 6cf7d76cc88d186f8c5f36ec8290292998e5ade2..b923c1b86321601b6364e5c105cf3861b72a0c3d 100644 --- a/emper/ContextManager.cpp +++ b/emper/ContextManager.cpp @@ -48,7 +48,8 @@ void ContextManager::putFreeContext(Context* context) { void ContextManager::start() { Context* freeContext = getFreeContext(); - freeContext->start(); + freeContext = freeContext->start(); + putFreeContext(freeContext); } /** diff --git a/emper/ContextManager.hpp b/emper/ContextManager.hpp index 57dcaaa7c8a09ad240cedc80c6e8a694e1493df7..5262f76f68431003c14034112b06febeb8efdcb0 100644 --- a/emper/ContextManager.hpp +++ b/emper/ContextManager.hpp @@ -26,7 +26,7 @@ class ContextManager static void putFreeContext(Context* context); - [[noreturn]] void start(); + void start(); void saveAndStartNew(func_t freshContextHook); diff --git a/emper/Dispatcher.cpp b/emper/Dispatcher.cpp index 810e399f4eb9d9ab72a1f13f90429a8f7a8669ae..0a5e4a44a13299aa6a708a1ce3f98f334079e7fd 100644 --- a/emper/Dispatcher.cpp +++ b/emper/Dispatcher.cpp @@ -11,11 +11,11 @@ auto Dispatcher::getDispatchLoop() -> func_t { return [this] { dispatchLoop(); }; } -void Dispatcher::putRuntimeWorkerToSleep() { runtime.dispatchLoopSleep(); } - void Dispatcher::dispatchLoopDoSleep() { + runtime.maybeTerminateWorker(); + if constexpr (emper::WORKER_SLEEP) { - putRuntimeWorkerToSleep(); + runtime.dispatchLoopSleep(); } else { pthread_yield(); } diff --git a/emper/Dispatcher.hpp b/emper/Dispatcher.hpp index c3b524275cf29e5f857852a2aea3c3e9dbd17107..4aca48c416e84ec37313986723ae880201e5e88c 100644 --- a/emper/Dispatcher.hpp +++ b/emper/Dispatcher.hpp @@ -42,8 +42,6 @@ class Dispatcher : public Logger<LogSubsystem::DISP> { virtual void recycle(Fiber* fiber) { delete fiber; } - void putRuntimeWorkerToSleep(); - public: Dispatcher(Runtime& runtime) : runtime(runtime) {} diff --git a/emper/Emper.cpp b/emper/Emper.cpp index 8c463274f85bd770475e3ebe9658f49e5478f9df..afa74f412051091a7cd773221dcd640c94777176 100644 --- a/emper/Emper.cpp +++ b/emper/Emper.cpp @@ -2,7 +2,46 @@ // Copyright © 2020 Florian Schmaus #include "Emper.hpp" +#include <stdexcept> + +#include "Runtime.hpp" #include "emper-common.h" #include "emper-version.h" -auto emper::getFullVersion() -> std::string { return EMPER_FULL_VERSION; } +namespace emper { + +auto getFullVersion() -> std::string { return EMPER_FULL_VERSION; } + +static void ensure_no_current_runtime() { + Runtime* runtime = Runtime::getRuntime(); + if (runtime) { + throw std::runtime_error("Runtime already initialized"); + } +} + +void init_runtime() { + ensure_no_current_runtime(); + + // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks) + new Runtime(); + // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks) +} + +void init_runtime(workerid_t worker_count) { + ensure_no_current_runtime(); + + // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks) + new Runtime(worker_count); + // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks) +} + +void destroy_runtime() { + Runtime* runtime = Runtime::getRuntime(); + if (!runtime) { + throw std::runtime_error("No runtime to destroy"); + } + + delete runtime; +} + +} // namespace emper diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index c9ec2e99cb39f9aefea6f6e7ef42c5a07bfe5cc2..5c8eb1463427b6cbe504db49a080330a1b2105c6 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -156,6 +156,8 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory if (errno) DIE_MSG_ERRNO("pthread_create() failed"); } + threadsRunning = true; + if constexpr (emper::STATS) { int res = std::atexit(&printLastRuntimeStats); if (res) { @@ -173,13 +175,10 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory Runtime::~Runtime() { DBG("Runtime " << this << " is terminating"); - for (workerid_t i = 0; i < workerCount; ++i) { - DBG("Runtime " << this << " is cancelling worker " << unsigned(i)); - errno = pthread_cancel(threads[i]); - if (errno) { - DIE_MSG_ERRNO("pthread_cancel() failed"); - } + if (threadsRunning) { + initiateAndWaitUntilTermination(); } + for (unsigned int i = 0; i < workerCount; ++i) { delete workers[i]; } @@ -225,7 +224,8 @@ auto Runtime::workerLoop(Worker* worker) -> void* { contextManager.start(); - abort(); + // Threads return here if Context::switchToOriginalStack() is called. + return nullptr; } @@ -269,10 +269,34 @@ auto Runtime::nextFiber() -> NextFiberResult { return scheduler.nextFiber(); } +void Runtime::initiateTermination() { + terminateWorkers.store(true, std::memory_order_release); + + wakeupSem.notify_many(workerCount); + + if constexpr (emper::IO) { + globalIo->initiateTermination(); + } +} + void Runtime::waitUntilFinished() { for (workerid_t i = 0; i < workerCount; ++i) { - pthread_join(threads[i], nullptr); + errno = pthread_join(threads[i], nullptr); + if (errno) { + DIE_MSG_ERRNO("pthread_join() returned error for worker ID " << std::to_string(i)); + } + } + + if constexpr (emper::IO) { + globalIo->waitUntilFinished(); } + + threadsRunning = false; +} + +void Runtime::initiateAndWaitUntilTermination() { + initiateTermination(); + waitUntilFinished(); } void Runtime::printStats() { diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 787c7c113c7be34767ea64289cf0a6dc2bc88cc2..0b5d4c8dc93926e23a0e5f5542df2d7c2203baf6 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -4,6 +4,7 @@ #include <pthread.h> // for pthread_t +#include <atomic> #include <cassert> // for assert #include <cstdint> // for intptr_t #include <cstdlib> // for abort @@ -14,8 +15,9 @@ #include "CallerEnvironment.hpp" #include "Common.hpp" // for ALIGN_TO_CACHE_LINE -#include "Debug.hpp" // for LogSubsystem, LogSubsystem::RUNTI, Logger -#include "Emper.hpp" // for WORKER_NOTIFY +#include "Context.hpp" +#include "Debug.hpp" +#include "Emper.hpp" // for WORKER_NOTIFY #include "NextFiberResult.hpp" #include "Scheduler.hpp" // for Scheduler #include "Worker.hpp" @@ -61,6 +63,10 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { std::default_random_engine randomEngine; std::uniform_int_distribution<unsigned int> uniformIntDistribution; + std::atomic<bool> terminateWorkers = false; + + bool threadsRunning = false; + auto workerLoop(Worker* worker) -> void*; ALIGN_TO_CACHE_LINE WorkerWakeupSemaphore wakeupSem; @@ -128,6 +134,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { } } + void maybeTerminateWorker() { + if (terminateWorkers.load(std::memory_order_relaxed)) { + LOGD("Worker terminating"); + // Switch to the original stack, which will simply terminate the thread. + Context::switchToOriginalStack(); + } + } + void dispatchLoopSleep() { wakeupSem.wait(); } public: @@ -183,8 +197,12 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { inline auto getStrategy() -> RuntimeStrategy& { return *strategy; } + void initiateTermination(); + void waitUntilFinished(); + void initiateAndWaitUntilTermination(); + void printStats(); static auto inRuntime() -> bool { return Worker::isWorkerThread(); } diff --git a/emper/include/emper.hpp b/emper/include/emper.hpp index c5691e232354e9803ab2c7e978de0f503b1c74f5..eef0673862f9e81609cc9131ecd1dee670c3f59f 100644 --- a/emper/include/emper.hpp +++ b/emper/include/emper.hpp @@ -63,6 +63,13 @@ void spawn(Fiber::fiber_fun0_t function, workeraffinity_t* affinity, S& semaphor } namespace emper { + +void init_runtime(); + +void init_runtime(workerid_t worker_count); + +void destroy_runtime(); + void yield() { Runtime* runtime = Runtime::getRuntime(); runtime->yield(); @@ -79,4 +86,5 @@ auto sleep(unsigned int seconds) -> bool { return res == -ETIME; } + } // namespace emper diff --git a/tests/RuntimeDestroyTest.cpp b/tests/RuntimeDestroyTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6b1d6308e378d8df6fa21769fa75a4cac4e209a0 --- /dev/null +++ b/tests/RuntimeDestroyTest.cpp @@ -0,0 +1,103 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#include <cstdlib> // for abort, exit, EXIT_SUCCESS +#include <iostream> // for operator<<, basic_ostream::o... + +#include "BinaryPrivateSemaphore.hpp" // for BPS +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "Debug.hpp" // for WDBG +#include "Fiber.hpp" // for Fiber +#include "PrivateSemaphore.hpp" // for PS +#include "Runtime.hpp" // for Runtime +#include "emper-common.h" // for UNUSED_ARG +#include "emper.hpp" // for async +#include "lib/sync/Semaphore.hpp" + +using fibParams = struct { + int n; + int* result; + PS* sem; +}; + +static void fib(void* voidParams) { + auto* params = static_cast<fibParams*>(voidParams); + int n = params->n; + int* result = params->result; + if (!result) { + std::cerr << "voidParams: " << voidParams << " n: " << params->n << " sem: " << params->sem + << std::endl; + abort(); + } + PS* sem = params->sem; + + if (n < 2) { + *result = n; + } else { + CPS newSem(2); + + int a, b; + + fibParams newParams1; + newParams1.n = n - 1; + newParams1.result = &a; + newParams1.sem = &newSem; + fibParams newParams2; + newParams2.n = n - 2; + newParams2.result = &b; + newParams2.sem = &newSem; + + Fiber* f1 = Fiber::from(&fib, &newParams1); + Fiber* f2 = Fiber::from(&fib, &newParams2); + + Runtime* runtime = Runtime::getRuntime(); + runtime->schedule(*f1); + runtime->schedule(*f2); + + WDBG("fib: Calling wait for n=" << n); + newSem.wait(); + + *result = a + b; + } + + WDBG("fib: Calling signalAndExit for n=" << n); + sem->signalAndExit(); +} + +static emper::lib::sync::Semaphore exit_semaphore; + +static void fibKickoff() { + const int fibNum = 2; + int result; + BPS sem; + fibParams params = {fibNum, &result, &sem}; + + Fiber* fibFiber = Fiber::from(fib, ¶ms); + async(fibFiber); + + sem.wait(); + + std::cout << "fib(" << fibNum << ") = " << result << std::endl; + exit_semaphore.notify(); +} + +auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { + // const unsigned nthreads = std::thread::hardware_concurrency(); + const unsigned nthreads = 2; + + std::cout << "Number of threads: " << nthreads << std::endl; + + Runtime runtime(nthreads); + + Fiber* fibFiber = Fiber::from(&fibKickoff); + + std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; + + runtime.scheduleFromAnywhere(*fibFiber); + + exit_semaphore.wait(); + + // Once Runtime goes out of scope, it's desctructor will be called, + // which, in turn, terminates the runtime, and hence causes the + // behavior this test is checking. + return 0; +} diff --git a/tests/meson.build b/tests/meson.build index 52662f38a5bcdb2ce515401a27afcbf3f56911c0..780d3bdb58b0b091bed1e0c30d54f2791776fd3e 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -19,6 +19,13 @@ tests = { 'is_parallel': true, }, + 'RuntimeDestroyTest.cpp': + { + 'description': 'Test runtime destruction', + 'test_suite': 'smoke', + 'is_parallel': true, + }, + 'c_api_test.c': { 'description': 'Test EMPER\'s C API',