diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 4c8a62776cef62e3ccf5fa8db2135020de7f5aa5..831f15c38cd42c748836775eeaad15f82f437125 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -27,6 +27,7 @@ #include "RuntimeStrategyFactory.hpp" #include "RuntimeStrategyStats.hpp" // for RuntimeStrategyStats #include "emper-config.h" // IWYU pragma: keep +#include "io/GlobalIoContext.hpp" // for IoContext #include "io/IoContext.hpp" // for IoContext #include "io/Stats.hpp" // for emper::io::Stats #include "lib/DebugUtil.hpp" @@ -61,6 +62,7 @@ RuntimeStrategyFactory& Runtime::DEFAULT_STRATEGY = #endif ; +using emper::io::GlobalIoContext; using emper::io::IoContext; Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory, unsigned int seed) @@ -72,6 +74,7 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory contextManager(*(new ContextManager(*this))), threads(new pthread_t[workerCount]), workers(new Worker*[workerCount]), + ioContexts(emper::IO ? workerCount : 0), randomEngine(seed) { const int nprocs = get_nprocs(); @@ -88,8 +91,7 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory // The global io_uring needs at least workerCount entries in its SQ because // for each worker's IoContext one eventfd read is prepared before the // globalCompleter is started and submits all previously prepared sqes. - globalIo = new IoContext(workerCount); - ioContexts = new IoContext[workerCount]; + globalIo = new GlobalIoContext(*this, workerCount); if constexpr (emper::STATS) { globalIo->stats.workerId = -1; @@ -98,15 +100,16 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory // submit the workers' CQ eventfds to the global IoContext for (workerid_t i = 0; i < workerCount; ++i) { - ioContexts[i].submit_efd(); + ioContexts[i] = new IoContext(*this); + globalIo->registerWorkerIo(*ioContexts[i]); if constexpr (emper::STATS) { - ioContexts[i].stats.workerId = i; + ioContexts[i]->stats.workerId = i; } } // start the globalCompleter after all eventfd are submitted so we don't need to // synchronize the globalIo's SQ - IoContext::startGlobalCompleter(*globalIo); + globalIo->startGlobalCompleter(); } // Core id we start the worker pinning @@ -183,8 +186,14 @@ Runtime::~Runtime() { delete[] threads; if constexpr (emper::IO) { - delete[] ioContexts; + // It is safer to destroy the globalIo before the worker IoContexts + // because pointer to the worker IoContexts can outlife their objects + // in the globalCompleter delete globalIo; + + for (unsigned int i = 0; i < workerCount; ++i) { + delete ioContexts[i]; + } } { @@ -198,7 +207,7 @@ auto Runtime::workerLoop(Worker* worker) -> void* { worker->setWorker(); if constexpr (emper::IO) { - ioContexts[worker->workerId].setWorkerIo(); + ioContexts[worker->workerId]->setWorkerIo(); } LOGD("Worker loop started by thread " << syscall(SYS_gettid)); diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index fbbabc9771fdde55f99cf0de6d13475f99749310..2edd7aa9c3c0c23314699f4f0c6e4b273b32050b 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -30,9 +30,11 @@ class RuntimeStrategy; class RuntimeStrategyFactory; namespace emper::io { +class GlobalIoContext; class IoContext; -} +} // namespace emper::io +using emper::io::GlobalIoContext; using emper::io::IoContext; using emper::lib::sync::WorkerWakeupSemaphore; @@ -53,8 +55,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { ContextManager& contextManager; pthread_t* threads; Worker** workers; - IoContext* globalIo = nullptr; - IoContext* ioContexts; + GlobalIoContext* globalIo = nullptr; + std::vector<IoContext*> ioContexts; std::default_random_engine randomEngine; std::uniform_int_distribution<unsigned int> uniformIntDistribution; @@ -128,14 +130,6 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { void dispatchLoopSleep() { wakeupSem.wait(); } - inline auto getGlobalIo() -> IoContext* { - if constexpr (emper::IO) { - return globalIo; - } - - return nullptr; - } - public: Runtime() : Runtime(getDefaultWorkerCount()) {} @@ -192,6 +186,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { friend ContextManager; friend Scheduler; friend Dispatcher; + friend GlobalIoContext; friend IoContext; template <typename T, intptr_t WS_QUEUE_SIZE, size_t WORKER_EXCLUSIVE_QUEUE_SIZE> friend class MemoryManager; diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp index a346253d0d527dd80ec7eeb7e6157f8ca3705d0d..3cda8c91ad9f13bfa3e0a099c1ed6f1c97a4a01b 100644 --- a/emper/io/Future.hpp +++ b/emper/io/Future.hpp @@ -36,6 +36,7 @@ class FutureError : public std::logic_error { * @brief Future representing an IO request which can be awaited */ class Future : public Logger<LogSubsystem::IO> { + friend class GlobalIoContext; friend class IoContext; friend class Stats; diff --git a/emper/io/GlobalIoContext.cpp b/emper/io/GlobalIoContext.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9c68f5a032f057337776de143c235eecbce8d5c8 --- /dev/null +++ b/emper/io/GlobalIoContext.cpp @@ -0,0 +1,174 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#include "io/GlobalIoContext.hpp" + +#include <liburing.h> +#include <liburing/io_uring.h> +#include <unistd.h> + +#include <cassert> +#include <cerrno> +#include <cstdio> +#include <memory> + +#include "CallerEnvironment.hpp" +#include "Common.hpp" +#include "Runtime.hpp" +#include "io/Future.hpp" +#include "io/IoContext.hpp" +#include "lib/TaggedPtr.hpp" + +using emper::lib::TaggedPtr; + +namespace emper::io { + +// The globalCompleter must be started after all worker io_uring eventfds are submitted +// so we don't have to synchronize the global SQ +auto GlobalIoContext::globalCompleterFunc(void* arg) -> void* { + auto* globalIoContext = reinterpret_cast<GlobalIoContext*>(arg); + + globalIoContext->logD("submit all worker io_uring eventfds"); + + // submit all eventfds in the SQ inserted by IoContext::submit_efd calls + int submitted = io_uring_submit(&globalIoContext->ring); + if (unlikely(submitted < 0)) { + DIE_MSG_ERRNO("initial global io_uring submit failed"); + } + + // We have submitted all eventfds + assert(submitted == globalIoContext->runtime.getWorkerCount()); + + // Submit the read for our termination eventfd + struct io_uring_sqe* sqe = io_uring_get_sqe(&globalIoContext->ring); + assert(sqe); + + // We reuse the notificationEventFd member of IoContext to receive a the termination notification + io_uring_prep_read(sqe, globalIoContext->notificationEventFd, + &globalIoContext->notificationEventFdBuf, + sizeof(globalIoContext->notificationEventFdBuf), 0); + TaggedPtr terminationEvent(static_cast<void*>(nullptr), + static_cast<uint16_t>(PointerTags::TerminationEvent)); + io_uring_sqe_set_data(sqe, terminationEvent); + + submitted = io_uring_submit(&globalIoContext->ring); + if (unlikely(submitted < 0)) { + DIE_MSG_ERRNO("submitting termination eventfd read failed"); + } + assert(submitted == 1); + + globalIoContext->logD("start global completer loop"); + while (true) { + struct io_uring_cqe* cqe; + + // wait for completions + int err = io_uring_wait_cqe(&globalIoContext->ring, &cqe); + if (unlikely(err)) { + if (err == -EINTR) { + continue; + } + errno = -err; + perror("io_uring_wait_cqe"); + } + + TaggedPtr tptr(io_uring_cqe_get_data(cqe)); + auto tag = static_cast<PointerTags>(tptr.getTag()); + + // The cqe is for a IoContext.notificationEventFd read + // -> there are completions on this worker IoContext + switch (tag) { + // clang-11 does not support [[likely]] yet + // TODO: remove the preprocessor check if clang ass [[likely]] support +#if defined __has_attribute +#if __has_attribute(likely) + [[likely]] // NOLINT(clang-diagnostic-unknown-attributes) +#endif +#endif + case PointerTags::IoContext : { + auto* worker_io = tptr.getPtr<IoContext>(); + assert(worker_io); + + // re-add the eventfd read + globalIoContext->prepareWorkerNotification(*worker_io); + submitted = io_uring_submit(&globalIoContext->ring); + + if (unlikely(submitted < 0)) { + errno = -submitted; + DIE_MSG_ERRNO("re-submitting eventfd read to global_ring failed"); + } + + assert(submitted == 1); + + worker_io->reapCompletions<CallerEnvironment::ANYWHERE>(); + } + break; + + // The cqe is for a completed Future. + // This happens in case the global completer is dealing with a + // partial completion. + case PointerTags::Future: { + auto* future = tptr.getPtr<Future>(); + + uint32_t res = cqe->res; + + future->recordCompletion(globalIoContext->stats, res); + future->completeFromAnywhere(res); + } break; + + // someone initiated termination + case PointerTags::TerminationEvent: { + pthread_exit(nullptr); + } break; + + default: + DIE_MSG("Unknown GlobalIoContext::PointerTag: " << tptr.getTag()); + break; + } + + io_uring_cqe_seen(&globalIoContext->ring, cqe); + } +} + +void GlobalIoContext::startGlobalCompleter() { + errno = pthread_create(&globalCompleter, nullptr, globalCompleterFunc, this); + if (unlikely(errno)) { + DIE_MSG_ERRNO("Creating global completer thread failed"); + } +} + +void GlobalIoContext::initiateTermination() { + uint64_t write_buf = 1; + int bytes_written = write(notificationEventFd, &write_buf, sizeof(write_buf)); + if (bytes_written != sizeof(write_buf)) { + DIE_MSG_ERRNO("Writing to globalIo termination eventfd failed"); + } +} + +void GlobalIoContext::waitUntilFinished() const { + errno = pthread_join(globalCompleter, nullptr); + if (unlikely(errno)) { + DIE_MSG_ERRNO("pthread_join failed for the globalCompleter"); + } +} + +// This function must not be executed in parallel because it does not synchronize +// the SQ +void GlobalIoContext::registerWorkerIo(IoContext& workerIo) { + // Register the worker's notificationEventFd with its io_uring + if (unlikely(io_uring_register_eventfd(&workerIo.ring, workerIo.notificationEventFd) < 0)) { + DIE_MSG_ERRNO("io_uring_register_eventfd failed"); + } + + prepareWorkerNotification(workerIo); + // The sqe we prepared will be submitted to io_uring when the globalCompleter starts. +} + +void GlobalIoContext::prepareWorkerNotification(IoContext& workerIo) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring); + // We initialized the global ring big enough we must always get a free sqe + assert(sqe); + + io_uring_prep_read(sqe, workerIo.notificationEventFd, &workerIo.notificationEventFdBuf, + sizeof(workerIo.notificationEventFdBuf), 0); + io_uring_sqe_set_data(sqe, TaggedPtr(&workerIo, static_cast<uint16_t>(PointerTags::IoContext))); +} +} // namespace emper::io diff --git a/emper/io/GlobalIoContext.hpp b/emper/io/GlobalIoContext.hpp new file mode 100644 index 0000000000000000000000000000000000000000..5ad94939aa563c3ee7b3ce2f0352ccc36567fbb5 --- /dev/null +++ b/emper/io/GlobalIoContext.hpp @@ -0,0 +1,43 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Fischer +#pragma once + +#include <pthread.h> // for pthread_t + +#include <cstdint> + +#include "emper-common.h" +#include "io/IoContext.hpp" +class Runtime; + +namespace emper::io { +class GlobalIoContext : public IoContext { + friend Runtime; + friend class Future; + friend class SendFuture; + friend class RecvFuture; + + private: + GlobalIoContext(Runtime& runtime, workerid_t workerCount) : IoContext(runtime, workerCount) {} + + enum class PointerTags : uint16_t { Future, IoContext, TerminationEvent }; + + // pthread used to monitor the CQs from worker io_urings + // as well as handling IO requests submitted from anywhere + pthread_t globalCompleter; + + /* function executed by the global completer thread */ + static auto globalCompleterFunc(void* arg) -> void*; + + // start the global completer thread + // this must be called after all worker IoContexts' eventfds are submitted + // to the global IoContext + void startGlobalCompleter(); + + void initiateTermination(); + void waitUntilFinished() const; + + void registerWorkerIo(IoContext& workerIo); + void prepareWorkerNotification(IoContext& workerIo); +}; +} // namespace emper::io diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index a8ca9a7fb824113c061f96a45dfb0f9722758e8c..398092d6b6c19fbe9921064f18f7958860b9c387 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -10,7 +10,6 @@ #include <atomic> // for atomic, __atomic_base #include <cassert> // for assert #include <cerrno> // for errno, ECANCELED, EBUSY, EAGAIN, EINTR -#include <cstdio> // for perror #include <cstring> // for memset #include <memory> // for allocator #include <vector> @@ -22,7 +21,8 @@ #include "Fiber.hpp" #include "Runtime.hpp" // for Runtime #include "io/Future.hpp" // for Future, operator<<, Future::State -#include "io/Stats.hpp" // for Stats, nanoseconds +#include "io/GlobalIoContext.hpp" +#include "io/Stats.hpp" // for Stats, nanoseconds #include "lib/TaggedPtr.hpp" #ifndef EMPER_LOG_OFF @@ -33,15 +33,7 @@ using emper::lib::TaggedPtr; namespace emper::io { -enum class PointerTags : uint16_t { Future, IoContext, Callback }; - -static inline auto castIfFuture(TaggedPtr ptr) -> Future * { - if (ptr.getTag() == static_cast<uint16_t>(PointerTags::Future)) { - return ptr.getPtr<Future>(); - } - - return nullptr; -} +enum class PointerTags : uint16_t { Future, Callback }; static inline auto castIfCallback(TaggedPtr ptr) -> Future::Callback * { if (ptr.getTag() == static_cast<uint16_t>(PointerTags::Callback)) { @@ -53,8 +45,6 @@ static inline auto castIfCallback(TaggedPtr ptr) -> Future::Callback * { thread_local IoContext *IoContext::workerIo = nullptr; -pthread_t IoContext::globalCompleter; - auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> unsigned { unsigned total_chain_length = chain_length; @@ -185,7 +175,7 @@ void IoContext::reapCompletions() { } // never reap completions on the global IoContext - assert(this != getGlobalIo()); + assert(this != runtime.globalIo); LOGD("Reaping completions"); unsigned head; @@ -194,7 +184,6 @@ void IoContext::reapCompletions() { // vector used to batch all completions scheduled to the AnywhereQueue std::vector<Fiber *> continuationFibers; - Runtime *runtime = Runtime::getRuntime(); int err = io_uring_peek_cqe(&ring, &cqe); if (err) { @@ -223,12 +212,12 @@ void IoContext::reapCompletions() { delete &c; }); if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - runtime->schedule(*callbackFiber); + runtime.schedule(*callbackFiber); } else { if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) { continuationFibers.push_back(callbackFiber); } else { - runtime->scheduleFromAnywhere(*callbackFiber); + runtime.scheduleFromAnywhere(*callbackFiber); } } continue; @@ -266,7 +255,7 @@ void IoContext::reapCompletions() { stats.record_worker_reaps(count); } else { // actually schedule all completion fibers - runtime->scheduleFromAnywhere(continuationFibers.begin(), continuationFibers.end()); + runtime.scheduleFromAnywhere(continuationFibers.begin(), continuationFibers.end()); stats.record_completer_reaps(count); } @@ -274,91 +263,12 @@ unlock: cq_lock.unlock(); } -// The globalCompleter must be started after all worker io_uring eventfds are submitted -// so we don't have to synchronize the global sq -auto IoContext::globalCompleterFunc(void *arg) -> void * { - IoContext &io = *reinterpret_cast<IoContext *>(arg); - - io.logD("submit all worker io_uring eventfds"); - - // submit all eventfds in the SQ inserted by IoContext::submit_efd calls - int submitted = io_uring_submit(&io.ring); - if (unlikely(submitted < 0)) { - DIE_MSG_ERRNO("initial global io_uring submit failed"); - } - - // We have submitted all eventfds - assert(submitted == Runtime::getRuntime()->getWorkerCount()); - - io.logD("start global completer loop"); - while (true) { - struct io_uring_cqe *cqe; - - // wait for completions - int err = io_uring_wait_cqe(&io.ring, &cqe); - if (unlikely(err)) { - if (err == -EINTR) { - continue; - } - errno = -err; - perror("io_uring_wait_cqe"); - } - - TaggedPtr tptr(io_uring_cqe_get_data(cqe)); - auto *future = castIfFuture(tptr); - - // The cqe is for a completed Future. - // This happens in case the global completer is dealing with a - // partial completion. - if (unlikely(future)) { - uint32_t res = cqe->res; - io_uring_cqe_seen(&io.ring, cqe); - - future->recordCompletion(io.stats, res); - future->completeFromAnywhere(res); - continue; - } - - // The cqe is for a IoContext.eventfd read - // -> there are completions on this worker IoContext - auto *worker_io = tptr.getPtr<IoContext>(); - assert(worker_io); - - io_uring_cqe_seen(&io.ring, cqe); - - // re-add the eventfd read - struct io_uring_sqe *sqe = io_uring_get_sqe(&io.ring); - // we initialized the global ring big enough we should always get a free sqe - assert(sqe); - - io_uring_prep_read(sqe, worker_io->ring_eventfd, &worker_io->ring_eventfd_readbuf, - sizeof(worker_io->ring_eventfd_readbuf), 0); - io_uring_sqe_set_data(sqe, tptr); - - submitted = io_uring_submit(&io.ring); - - if (unlikely(submitted < 0)) { - errno = -submitted; - DIE_MSG_ERRNO("re-submitting eventfd read to global_ring failed"); - } - - assert(submitted == 1); - - worker_io->reapCompletions<CallerEnvironment::ANYWHERE>(); - } - - return nullptr; -} - -void IoContext::startGlobalCompleter(IoContext &globalIo) { - int err = pthread_create(&globalCompleter, nullptr, globalCompleterFunc, &globalIo); - if (unlikely(err)) { - errno = err; - DIE_MSG_ERRNO("Creating global completer thread failed"); - } -} +// Show the compiler our template incarnations this is needed again because +// reapCompletions<CallerEnvironment::ANYWHERE> is now called from GlobalIoContext.cpp +template void IoContext::reapCompletions<CallerEnvironment::ANYWHERE>(); +template void IoContext::reapCompletions<CallerEnvironment::EMPER>(); -IoContext::IoContext(size_t uring_entries) { +IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) { struct io_uring_params params; memset(¶ms, 0, sizeof(params)); @@ -367,7 +277,7 @@ IoContext::IoContext(size_t uring_entries) { } if constexpr (emper::IO_URING_SHARED_WQ) { - auto *gio = getGlobalIo(); + auto *gio = runtime.globalIo; if (gio) { params.flags |= IORING_SETUP_ATTACH_WQ; params.wq_fd = gio->ring.ring_fd; @@ -394,37 +304,18 @@ IoContext::IoContext(size_t uring_entries) { } } - ring_eventfd = eventfd(0, 0); - if (unlikely(ring_eventfd < 0)) { + // This eventfd will be registered to retrieve completion notifications when the Runtime + // calls registerWorkerIo() on the globalIo + // Or it will be used to terminate the globalIo + notificationEventFd = eventfd(0, 0); + if (unlikely(notificationEventFd < 0)) { DIE_MSG_ERRNO("creating eventfd for io_uring failed"); } - - if (unlikely(io_uring_register_eventfd(&ring, ring_eventfd) < 0)) { - DIE_MSG_ERRNO("io_uring_register_eventfd failed"); - } -} - -// This function must not be executed in parallel because it does not synchronize -// the global io_uring's SQ -void IoContext::submit_efd() { - // the global IoContext must be initialized to submit our eventfd - assert(getGlobalIo()); - - IoContext &globalIo = *getGlobalIo(); - - struct io_uring_sqe *sqe = io_uring_get_sqe(&globalIo.ring); - // we initialized the global ring big enough we should always get a free sqe - assert(sqe); - - io_uring_prep_read(sqe, ring_eventfd, &ring_eventfd_readbuf, sizeof(ring_eventfd_readbuf), 0); - io_uring_sqe_set_data(sqe, TaggedPtr(this, static_cast<uint16_t>(PointerTags::IoContext))); - - // The sqe we prepared will be submitted to io_uring when the globalCompleter starts. } IoContext::~IoContext() { io_uring_queue_exit(&ring); // TODO: check if this is safe - ::close(ring_eventfd); + ::close(notificationEventFd); } } // namespace emper::io diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 4031c963308c4cee757facdcfc4c8c2cd7796bcb..64382feaea80105efd848c2b618b858e6f6ff233 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -3,7 +3,6 @@ #pragma once #include <liburing.h> // for io_uring -#include <pthread.h> // for pthread_t #include <atomic> // for atomic #include <cassert> // for assert @@ -28,34 +27,24 @@ class IoContext : public Logger<LogSubsystem::IO> { friend class Future; friend class SendFuture; friend class RecvFuture; + // We need this friend declaration to access protected members through + // IoContext pointer in GlobalIoContext::globalCompleterFunc + friend class GlobalIoContext; - private: - static thread_local IoContext *workerIo; - - // pthread used to monitor the CQs from worker io_urings - // as well as handling IO requests submitted from anywhere - static pthread_t globalCompleter; - - /* function executed by the global completer thread */ - static auto globalCompleterFunc(void *arg) -> void *; - - // start the global completer thread - // this must be called after all worker IoContexts' eventfds are submitted - // to the global IoContext - static void startGlobalCompleter(IoContext &globalIo); + protected: + // Remember the Runtime which created the IoContext + Runtime &runtime; + static thread_local IoContext *workerIo; // TryLock protecting the completion queue of ring. lib::adt::AtomicTryLock cq_lock; struct io_uring ring; - // eventfd registered with ring. - // It is used to notify the globalCompleter about available completions in ring's CQ. - int ring_eventfd; - uint64_t ring_eventfd_readbuf; - - // submit the eventfd of the io_uring to the global IoContext - // All eventfds must be registered before the globalCompleter thread is started - void submit_efd(); + // In a worker's IoContext This eventfd is registered with the io_uring to get completion + // notifications in the globalCompleter. + // In a GlobalIoContext this eventfd is used to initiate termination + int notificationEventFd; + uint64_t notificationEventFdBuf; Stats stats; @@ -79,14 +68,9 @@ class IoContext : public Logger<LogSubsystem::IO> { inline void setWorkerIo() { workerIo = this; } - /** - * @brief Return the globalIo of the current Runtime - */ - static inline auto getGlobalIo() -> IoContext * { return Runtime::getRuntime()->getGlobalIo(); } - public: - IoContext(size_t uring_entries); - IoContext() : IoContext(EMPER_IO_WORKER_URING_ENTRIES){}; + IoContext(Runtime &runtime, size_t uring_entries); + IoContext(Runtime &runtime) : IoContext(runtime, EMPER_IO_WORKER_URING_ENTRIES){}; ~IoContext(); /** @@ -96,20 +80,31 @@ class IoContext : public Logger<LogSubsystem::IO> { * * @return the thread_local IoContext of the current worker */ - static inline auto getWorkerIo() -> IoContext * { + [[nodiscard]] static inline auto getWorkerIo() -> IoContext * { assert(Runtime::inRuntime()); return workerIo; } /** - * @brief Return either the worker or the globalIo dependent on the callerEnvironment + * @brief get IoContext for the CallerEnvironment + * + * This function is for now not independent of a global Runtime reference + * because it uses Runtime::getRuntime() if Runtime::getRuntime() would + * use a thread_local variable instead of a global one this function + * would be like the rest of IoContext independent of a global Runtime + * reference + * + * @return the either the worker's IoContext or the GlobalIoContext */ template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - static inline auto getIo() -> IoContext * { + [[nodiscard]] static inline auto getIo() -> IoContext * { if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - return getWorkerIo(); + return IoContext::getWorkerIo(); } else { - return getGlobalIo(); + // we use a reinterpret_ cast here because GlobalIoContext is incomplete + // at this point but we can't include GlobalIoContext.hpp because it + // would introduce a cyclic dependency. + return reinterpret_cast<IoContext *>(Runtime::getRuntime()->globalIo); } } diff --git a/emper/io/meson.build b/emper/io/meson.build index 75ee73eb1b8dae222ff092f0ab665730d0bc3003..63317bcad025ed7d384c20f7fc2813fbbc62fc5f 100644 --- a/emper/io/meson.build +++ b/emper/io/meson.build @@ -6,4 +6,5 @@ emper_cpp_sources += files( 'Stats.cpp', 'Operation.cpp', 'IoContext.cpp', + 'GlobalIoContext.cpp', )