Skip to content
Snippets Groups Projects
Commit 58d376aa authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'env_options' into 'master'

[Runtime] add env options to define workerCount and pinningOffset

See merge request !109
parents 2712c004 365f11db
Branches
No related tags found
1 merge request!109[Runtime] add env options to define workerCount and pinningOffset
Pipeline #58687 passed
......@@ -12,7 +12,9 @@
#include <cstdlib> // for rand, srand, abort
#include <cstring>
#include <memory> // for __shared_ptr_access, shared_ptr
#include <ostream>
#include <string> // for string
#include <thread>
#include "Common.hpp" // for DIE_MSG_ERRNO, DIE, DIE_MSG
#include "Context.hpp"
......@@ -44,8 +46,6 @@
#ifndef EMPER_LOG_OFF
#include <syscall.h> // for SYS_gettid
#include <unistd.h> // for syscall
#include <ostream> // for operator<<, basic_ostream<>:...
#endif
std::mutex Runtime::currentRuntimeMutex;
......@@ -109,6 +109,17 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory
IoContext::startGlobalCompleter(*globalIo);
}
// Core id we start the worker pinning
workerid_t pinningOffset = 0;
char* pinningOffsetEnv = std::getenv("EMPER_PINNING_OFFSET");
if (pinningOffsetEnv) {
int pinningOffsetInt = std::stoi(pinningOffsetEnv);
if (pinningOffsetInt > UINT8_MAX) {
DIE_MSG("Pinning offset " << pinningOffsetInt << " to big for its datatype");
}
pinningOffset = static_cast<workerid_t>(pinningOffsetInt);
}
for (workerid_t i = 0; i < workerCount; ++i) {
pthread_attr_t attr;
errno = pthread_attr_init(&attr);
......@@ -117,7 +128,7 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory
// Start non portable.
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i % nprocs, &cpuset);
CPU_SET((i + pinningOffset) % nprocs, &cpuset);
errno = pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset);
if (errno) DIE_MSG_ERRNO("pthread_attr_setaffinity_np() failed");
......@@ -208,6 +219,19 @@ auto Runtime::workerLoop(Worker* worker) -> void* {
return nullptr;
}
auto Runtime::getDefaultWorkerCount() -> workerid_t {
char* workerCountEnv = std::getenv("EMPER_WORKER_COUNT");
if (workerCountEnv) {
int workerCountInt = std::stoi(workerCountEnv);
if (workerCountInt > UINT8_MAX) {
DIE_MSG("Worker count " << workerCountInt << " to big for its datatype");
}
return static_cast<workerid_t>(workerCountInt);
}
return std::thread::hardware_concurrency();
}
void Runtime::yield() {
Context* context = Context::getCurrentContext();
contextManager.saveAndStartNew([context, this] {
......
......@@ -10,7 +10,6 @@
#include <functional> // for function
#include <mutex> // for mutex, lock_guard, unique_lock
#include <random>
#include <thread> // for thread
#include <vector> // for vector
#include "CallerEnvironment.hpp"
......@@ -68,6 +67,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
static void printLastRuntimeStats();
static auto getDefaultWorkerCount() -> workerid_t;
protected:
void addNewWorkerHook(const std::function<void(void)>& hook) { newWorkerHooks.push_back(hook); };
......@@ -136,12 +137,12 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
}
public:
Runtime() : Runtime(std::thread::hardware_concurrency()) {}
Runtime() : Runtime(getDefaultWorkerCount()) {}
Runtime(workerid_t workerCount) : Runtime(workerCount, DEFAULT_STRATEGY) {}
Runtime(RuntimeStrategyFactory& strategyFactory)
: Runtime(std::thread::hardware_concurrency(), strategyFactory) {}
: Runtime(getDefaultWorkerCount(), strategyFactory) {}
Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory,
unsigned int seed = std::random_device()());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment