diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 84a587106bfd1ec430b68ddd814c010643ca984a..4c8a62776cef62e3ccf5fa8db2135020de7f5aa5 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -12,7 +12,9 @@ #include <cstdlib> // for rand, srand, abort #include <cstring> #include <memory> // for __shared_ptr_access, shared_ptr +#include <ostream> #include <string> // for string +#include <thread> #include "Common.hpp" // for DIE_MSG_ERRNO, DIE, DIE_MSG #include "Context.hpp" @@ -44,8 +46,6 @@ #ifndef EMPER_LOG_OFF #include <syscall.h> // for SYS_gettid #include <unistd.h> // for syscall - -#include <ostream> // for operator<<, basic_ostream<>:... #endif std::mutex Runtime::currentRuntimeMutex; @@ -109,6 +109,17 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory IoContext::startGlobalCompleter(*globalIo); } + // Core id we start the worker pinning + workerid_t pinningOffset = 0; + char* pinningOffsetEnv = std::getenv("EMPER_PINNING_OFFSET"); + if (pinningOffsetEnv) { + int pinningOffsetInt = std::stoi(pinningOffsetEnv); + if (pinningOffsetInt > UINT8_MAX) { + DIE_MSG("Pinning offset " << pinningOffsetInt << " to big for its datatype"); + } + pinningOffset = static_cast<workerid_t>(pinningOffsetInt); + } + for (workerid_t i = 0; i < workerCount; ++i) { pthread_attr_t attr; errno = pthread_attr_init(&attr); @@ -117,7 +128,7 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory // Start non portable. cpu_set_t cpuset; CPU_ZERO(&cpuset); - CPU_SET(i % nprocs, &cpuset); + CPU_SET((i + pinningOffset) % nprocs, &cpuset); errno = pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset); if (errno) DIE_MSG_ERRNO("pthread_attr_setaffinity_np() failed"); @@ -208,6 +219,19 @@ auto Runtime::workerLoop(Worker* worker) -> void* { return nullptr; } +auto Runtime::getDefaultWorkerCount() -> workerid_t { + char* workerCountEnv = std::getenv("EMPER_WORKER_COUNT"); + if (workerCountEnv) { + int workerCountInt = std::stoi(workerCountEnv); + if (workerCountInt > UINT8_MAX) { + DIE_MSG("Worker count " << workerCountInt << " to big for its datatype"); + } + return static_cast<workerid_t>(workerCountInt); + } + + return std::thread::hardware_concurrency(); +} + void Runtime::yield() { Context* context = Context::getCurrentContext(); contextManager.saveAndStartNew([context, this] { diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 15a95f8f4c73becfabb2779683e33fff3d3fa0c5..6e7bed73ab8151a5452545f07b048c3cfff062dc 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -10,7 +10,6 @@ #include <functional> // for function #include <mutex> // for mutex, lock_guard, unique_lock #include <random> -#include <thread> // for thread #include <vector> // for vector #include "CallerEnvironment.hpp" @@ -68,6 +67,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { static void printLastRuntimeStats(); + static auto getDefaultWorkerCount() -> workerid_t; + protected: void addNewWorkerHook(const std::function<void(void)>& hook) { newWorkerHooks.push_back(hook); }; @@ -136,12 +137,12 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { } public: - Runtime() : Runtime(std::thread::hardware_concurrency()) {} + Runtime() : Runtime(getDefaultWorkerCount()) {} Runtime(workerid_t workerCount) : Runtime(workerCount, DEFAULT_STRATEGY) {} Runtime(RuntimeStrategyFactory& strategyFactory) - : Runtime(std::thread::hardware_concurrency(), strategyFactory) {} + : Runtime(getDefaultWorkerCount(), strategyFactory) {} Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory, unsigned int seed = std::random_device()());