From db5d1b342c95fb652c7fed4b25143e77b9f550bb Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fl.fischer@fau.de>
Date: Thu, 3 Dec 2020 10:24:38 +0100
Subject: [PATCH] prevent data races when initializing the workers PRNG seeds

Each worker currently calls uniformIntDistribution(randomEngine)
which modifies the randomEngine internally and thus produces
data races when the threads run in parallel.

This change calls uniformIntDistribution(randomEngine) on the main thread
for each worker and passes the resulting seeds to the workerLoop.

The data race was found by gcc's tsan.
---
 emper/Runtime.cpp | 26 +++++++++++++++-----------
 emper/Runtime.hpp | 10 ++++++++--
 2 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp
index e1a67910..42d0d9c0 100644
--- a/emper/Runtime.cpp
+++ b/emper/Runtime.cpp
@@ -53,7 +53,7 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy, unsigned int
 		  randomEngine(seed),
 		  atLeastOneWorkerIsSleeping(false) {
 	threads = new pthread_t[workerCount];
-	workerIds = new workerid_t[workerCount];
+	workerArgs = new struct WorkerArgs[workerCount];
 
 	const int nprocs = get_nprocs();
 
@@ -79,17 +79,22 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy, unsigned int
 		if (errno) DIE_MSG_ERRNO("pthread_attr_setaffinity_np() failed");
 		// End non portable.
 
-		// Load the worker ID into the worker ID map.
-		workerIds[i] = i;
+		// Load the worker ID into the worker Argument array.
+		workerArgs[i].id = i;
+		// Load the worker PRNG seed into the Argument array.
+		// This is not done by each thread individually because the randomEngine is
+		// mutated and therefore produces data races.
+		workerArgs[i].seed = uniformIntDistribution(randomEngine);
 
-		auto thread_function = [](void* voidWorkerId) -> void* {
+		auto thread_function = [](void* voidWorkerArgs) -> void* {
 #ifdef EMPER_LIBURCU
 			rcu_register_thread();
 #endif
-			return currentRuntime->workerLoop(voidWorkerId);
+			struct WorkerArgs* workerArgs = reinterpret_cast<struct WorkerArgs*>(voidWorkerArgs);
+			return currentRuntime->workerLoop(workerArgs);
 		};
 
-		errno = pthread_create(&threads[i], &attr, thread_function, &workerIds[i]);
+		errno = pthread_create(&threads[i], &attr, thread_function, &workerArgs[i]);
 		if (errno) DIE_MSG_ERRNO("pthread_create() failed");
 	}
 
@@ -124,8 +129,10 @@ Runtime::~Runtime() {
 	DBG("Runtime " << this << " terminated");
 }
 
-auto Runtime::workerLoop(void* voidWorkerId) -> void* {
-	workerId = *(workerid_t*)voidWorkerId;
+auto Runtime::workerLoop(struct WorkerArgs* workerArgs) -> void* {
+	workerId = workerArgs->id;
+	seed = workerArgs->seed;
+
 	LOGD("Worker loop started by thread " << syscall(SYS_gettid));
 
 	int oldType;
@@ -134,9 +141,6 @@ auto Runtime::workerLoop(void* voidWorkerId) -> void* {
 		DIE_MSG_ERRNO("pthread_setcanceltype() failed");
 	}
 
-	// Initialze the workers PRNG seed.
-	seed = uniformIntDistribution(randomEngine);
-
 	for (const auto& f : newWorkerHooks) f();
 
 	workerLatch.count_down_and_wait();
diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp
index 42346551..95c35b8c 100644
--- a/emper/Runtime.hpp
+++ b/emper/Runtime.hpp
@@ -44,12 +44,18 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
 	Dispatcher& dispatcher;
 	ContextManager& contextManager;
 	pthread_t* threads;
-	workerid_t* workerIds;
+
+	// Arguments passed to a new worker thread
+	struct WorkerArgs {
+		workerid_t id;
+		unsigned int seed;
+	};
+	struct WorkerArgs* workerArgs;
 
 	std::default_random_engine randomEngine;
 	std::uniform_int_distribution<unsigned int> uniformIntDistribution;
 
-	auto workerLoop(void* workerId) -> void*;
+	auto workerLoop(struct WorkerArgs* workerArgs) -> void*;
 
 	std::mutex workerSleepMutex;
 	std::condition_variable workerSleepConditionVariable;
--
GitLab
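
The pattern applied by this patch can be reproduced outside of EMPER. The following is a minimal standalone sketch, not EMPER code: workerCount, the seeds vector and the worker lambda are illustrative assumptions. All seeds are drawn from the shared std::default_random_engine on the main thread before any worker starts, so only private, already-seeded engines are used concurrently.

// Minimal standalone sketch of the main-thread seeding pattern.
// Nothing here is EMPER code; names are illustrative.
#include <cstdio>
#include <random>
#include <thread>
#include <vector>

int main() {
	std::default_random_engine randomEngine{std::random_device{}()};
	std::uniform_int_distribution<unsigned int> uniformIntDistribution;

	const unsigned int workerCount = 4;

	// Draw every worker's seed on the main thread. This mutates
	// randomEngine, but strictly sequentially, so there is no data race.
	std::vector<unsigned int> seeds(workerCount);
	for (auto& seed : seeds) seed = uniformIntDistribution(randomEngine);

	// Each worker receives its seed by value and owns a private engine;
	// the shared randomEngine is never touched from a worker thread.
	std::vector<std::thread> workers;
	for (unsigned int i = 0; i < workerCount; ++i) {
		workers.emplace_back([i, seed = seeds[i]]() {
			std::default_random_engine workerEngine{seed};
			std::uniform_int_distribution<unsigned int> dist;
			std::printf("worker %u drew %u\n", i, dist(workerEngine));
		});
	}
	for (auto& worker : workers) worker.join();
	return 0;
}

A side benefit of drawing the seeds sequentially on the main thread is that, for a fixed top-level seed, each worker's seed no longer depends on the order in which the worker threads happen to start.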