diff --git a/emper/Debug.cpp b/emper/Debug.cpp index c551cf0f4ea6e46ab58fb2976e14e68396c019e2..ffc6012f4108e1383ed70321fded214d21d529a5 100644 --- a/emper/Debug.cpp +++ b/emper/Debug.cpp @@ -2,20 +2,30 @@ // Copyright © 2020 Florian Schmaus #include "Debug.hpp" +#include <iomanip> // for operator<<, setfill, setw #include <iostream> // for operator<<, cerr, ostream, basic_ostream #include <mutex> // for mutex, unique_lock -#include "Runtime.hpp" // for Runtime +#include "Common.hpp" +#include "Worker.hpp" // for Runtime #include "emper-common.h" // for workerid_t #include "emper-config.h" // for EMPER_LOG_LEVEL -static std::mutex worker_log_mutex; +static std::mutex emper_log_mutex; -void worker_log(const std::string& prefix, const std::string& message) { - const workerid_t workerId = Runtime::getWorkerId(); +void emper_log(const std::string& prefix, const std::string& message) { + Worker* worker = Worker::getCurrentWorker(); + std::ostringstream workerPrefix; + if (likely(worker)) { + workerid_t workerId = worker->getWorkerId(); + std::string workerIdAsString = std::to_string(workerId); + workerPrefix << std::setfill('0') << std::setw(3) << workerIdAsString << " "; + } else { + workerPrefix << " "; + } - std::unique_lock<std::mutex> lock(worker_log_mutex); - std::cerr << workerId; + std::unique_lock<std::mutex> lock(emper_log_mutex); + std::cerr << workerPrefix.str(); if (!prefix.empty()) { std::cerr << " " << prefix << " "; } else { diff --git a/emper/Debug.hpp b/emper/Debug.hpp index 1057dfa3ce950b4552c353a8e7dcdfea435155d8..a2de8be4832ac9d0f336a3e7cee8a704e7c3c910 100644 --- a/emper/Debug.hpp +++ b/emper/Debug.hpp @@ -24,9 +24,9 @@ // clang-format off // NOLINTNEXTLINE(bugprone-macro-parentheses) -#define DBG(x) do { std::cerr << x << std::endl; } while (false) +#define DBG(x) do { std::stringstream sst; sst << x << std::endl; emper_log("", sst.str()); } while (false) // NOLINTNEXTLINE(bugprone-macro-parentheses) -#define WDBG(x) do { std::stringstream sst; sst << x; worker_log("", sst.str()); } while (false) +#define WDBG(x) do { std::stringstream sst; sst << x; emper_log("", sst.str()); } while (false) // To avoid "error: there are no arguments to ‘logD’ that depend on a // template parameter, so a declaration of ‘logD’ must be available" // we use "this->logD()" instead of simply "logD()" below. @@ -82,7 +82,7 @@ namespace emper { extern enum LogLevel log_level; } -void worker_log(const std::string& prefix, const std::string& message); +void emper_log(const std::string& prefix, const std::string& message); template <LogSubsystem logSubsystem> class Logger { @@ -153,7 +153,7 @@ class Logger { sst << " " << this; } - worker_log(sst.str(), string); + emper_log(sst.str(), string); } inline void logE(const std::string& string) const { log(Error, string); } diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 5858198bf9297ecd7858bca0da579b6bed0ecdc3..964cd248ea46f19c2f3c369dab521cb8dda2352c 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -38,9 +38,6 @@ std::mutex Runtime::currentRuntimeMutex; Runtime* Runtime::currentRuntime; -thread_local bool Runtime::workerThread = false; -thread_local unsigned int Runtime::seed; -thread_local workerid_t Runtime::workerId; RuntimeStrategy& Runtime::DEFAULT_STRATEGY = WsStrategy::INSTANCE; Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy, unsigned int seed) @@ -50,11 +47,12 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy, unsigned int scheduler(strategy.getScheduler(*this)), dispatcher(strategy.getDispatcher(*this)), contextManager(*(new ContextManager(*this))), + threads(new pthread_t[workerCount]), + workers(new Worker*[workerCount]), randomEngine(seed), atLeastOneWorkerIsSleeping(false), skipSleep(false) { threads = new pthread_t[workerCount]; - workerArgs = new struct WorkerArgs[workerCount]; const int nprocs = get_nprocs(); @@ -80,22 +78,21 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy, unsigned int if (errno) DIE_MSG_ERRNO("pthread_attr_setaffinity_np() failed"); // End non portable. - // Load the worker ID into the worker Argument array. - workerArgs[i].id = i; - // Load the worker PRNG seed into the Argument array. // This is not done by each thread individually because the randomEngine is // mutated and therefore produces data races. - workerArgs[i].seed = uniformIntDistribution(randomEngine); + unsigned int seed = uniformIntDistribution(randomEngine); + auto* worker = new Worker(i, seed); + workers[i] = worker; - auto thread_function = [](void* voidWorkerArgs) -> void* { + auto thread_function = [](void* voidWorkerPtr) -> void* { #ifdef EMPER_LIBURCU rcu_register_thread(); #endif - auto* workerArgs = reinterpret_cast<struct WorkerArgs*>(voidWorkerArgs); - return currentRuntime->workerLoop(workerArgs); + auto* worker = reinterpret_cast<Worker*>(voidWorkerPtr); + return currentRuntime->workerLoop(worker); }; - errno = pthread_create(&threads[i], &attr, thread_function, &workerArgs[i]); + errno = pthread_create(&threads[i], &attr, thread_function, worker); if (errno) DIE_MSG_ERRNO("pthread_create() failed"); } @@ -123,6 +120,11 @@ Runtime::~Runtime() { DIE_MSG_ERRNO("pthread_cancel() failed"); } } + for (unsigned int i = 0; i < workerCount; ++i) { + delete workers[i]; + } + delete[] workers; + delete[] threads; { std::lock_guard<std::mutex> lock(currentRuntimeMutex); currentRuntime = nullptr; @@ -130,10 +132,8 @@ Runtime::~Runtime() { DBG("Runtime " << this << " terminated"); } -auto Runtime::workerLoop(struct WorkerArgs* workerArgs) -> void* { - workerThread = true; - workerId = workerArgs->id; - seed = workerArgs->seed; +auto Runtime::workerLoop(Worker* worker) -> void* { + worker->setWorker(); LOGD("Worker loop started by thread " << syscall(SYS_gettid)); diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index cbb9e02b20107a94562bb4ae2870df9ab3496437..8ec07545cb22bf59709ed55399b723ac52349429 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -15,9 +15,10 @@ #include <thread> // for thread #include <vector> // for vector -#include "Common.hpp" // for ALIGN_TO_CACHE_LINE -#include "Debug.hpp" // for LogSubsystem, LogSubsystem::RUNTI, Logger -#include "Scheduler.hpp" // for Scheduler +#include "Common.hpp" // for ALIGN_TO_CACHE_LINE +#include "Debug.hpp" // for LogSubsystem, LogSubsystem::RUNTI, Logger +#include "Scheduler.hpp" // for Scheduler +#include "Worker.hpp" #include "emper-common.h" // for workerid_t #include "lib/sync/Latch.hpp" // for Latch @@ -36,10 +37,6 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { static std::mutex currentRuntimeMutex; static Runtime* currentRuntime; - static thread_local bool workerThread; - static thread_local unsigned int seed; - static thread_local workerid_t workerId; - const workerid_t workerCount; std::vector<std::function<void(void)>> newWorkerHooks; @@ -51,18 +48,12 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { Dispatcher& dispatcher; ContextManager& contextManager; pthread_t* threads; - - // Arguments passed to a new worker thread - struct WorkerArgs { - workerid_t id; - unsigned int seed; - }; - struct WorkerArgs* workerArgs; + Worker** workers; std::default_random_engine randomEngine; std::uniform_int_distribution<unsigned int> uniformIntDistribution; - auto workerLoop(struct WorkerArgs* workerArgs) -> void*; + auto workerLoop(Worker* worker) -> void*; ALIGN_TO_CACHE_LINE std::mutex workerSleepMutex; std::condition_variable workerSleepConditionVariable; @@ -137,13 +128,9 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { auto nextFiber() -> Fiber*; // https://stackoverflow.com/a/3747462/194894 - static inline auto rand() -> int { - seed = 214013 * seed + 2531011; - auto shifted_seed = seed >> 16; - return *reinterpret_cast<int*>(&shifted_seed) & 0x7FFF; - } + static inline auto rand() -> int { return Worker::rand(); } - static inline auto getWorkerId() -> workerid_t { return workerId; } + static inline auto getWorkerId() -> workerid_t { return Worker::getCurrentWorkerId(); } [[nodiscard]] inline auto getWorkerCount() const -> workerid_t { return workerCount; } @@ -157,7 +144,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { void printStats(); - static auto inRuntime() -> bool { return workerThread; } + static auto inRuntime() -> bool { return Worker::isWorkerThread(); } void executeAndWait(std::function<void()> f); diff --git a/emper/Worker.cpp b/emper/Worker.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1c3053199c4605cb0e02d49aefb7931b8517f375 --- /dev/null +++ b/emper/Worker.cpp @@ -0,0 +1,7 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#include "Worker.hpp" + +thread_local Worker* Worker::currentWorker = nullptr; + +void Worker::setWorker() { currentWorker = this; } diff --git a/emper/Worker.hpp b/emper/Worker.hpp new file mode 100644 index 0000000000000000000000000000000000000000..0cec08e3b279bf309bf1985a455ac91d553a93af --- /dev/null +++ b/emper/Worker.hpp @@ -0,0 +1,47 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#pragma once + +#include "emper-common.h" + +class Runtime; + +class Worker { + private: + static thread_local Worker* currentWorker; + + const workerid_t workerId; + + unsigned int seed; + + Worker(workerid_t workerId, unsigned int seed) : workerId(workerId), seed(seed) {} + + void setWorker(); + + friend Runtime; + + public: + [[nodiscard]] auto getWorkerId() const -> workerid_t { return workerId; } + + // https://stackoverflow.com/a/3747462/194894 + inline auto nextRand() -> int { + seed = 214013 * seed + 2531011; + auto shifted_seed = seed >> 16; + return *reinterpret_cast<int*>(&shifted_seed) & 0x7FFF; + } + + static inline auto isWorkerThread() -> bool { return currentWorker != nullptr; } + + static inline auto getCurrentWorker() -> Worker* { return currentWorker; } + + // https://stackoverflow.com/a/3747462/194894 + static inline auto rand() -> int { + Worker* currentWorker = getCurrentWorker(); + return currentWorker->nextRand(); + } + + static inline auto getCurrentWorkerId() -> workerid_t { + Worker* currentWorker = getCurrentWorker(); + return currentWorker->getWorkerId(); + } +}; diff --git a/emper/meson.build b/emper/meson.build index 51beb57c95ed5ca975f8d4601dc463dfcd199080..0ffa56160b4a22d8ee0ca52957e3b56842e15181 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -26,6 +26,7 @@ emper_cpp_sources = [ 'BinaryPrivateSemaphore.cpp', 'CountingPrivateSemaphore.cpp', 'Semaphore.cpp', + 'Worker.cpp', ] emper_generated_files = []