diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index e1a67910083e41b02bb39085fbec4305d197f6f6..42d0d9c0e2f39b125154241670732ab525f9635e 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -53,7 +53,7 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy, unsigned int randomEngine(seed), atLeastOneWorkerIsSleeping(false) { threads = new pthread_t[workerCount]; - workerIds = new workerid_t[workerCount]; + workerArgs = new struct WorkerArgs[workerCount]; const int nprocs = get_nprocs(); @@ -79,17 +79,22 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy, unsigned int if (errno) DIE_MSG_ERRNO("pthread_attr_setaffinity_np() failed"); // End non portable. - // Load the worker ID into the worker ID map. - workerIds[i] = i; + // Load the worker ID into the worker Argument array. + workerArgs[i].id = i; + // Load the worker PRNG seed into the Argument array. + // This is not done by each thread individually because the randomEngine is + // mutated and therefore produces data races. + workerArgs[i].seed = uniformIntDistribution(randomEngine); - auto thread_function = [](void* voidWorkerId) -> void* { + auto thread_function = [](void* voidWorkerArgs) -> void* { #ifdef EMPER_LIBURCU rcu_register_thread(); #endif - return currentRuntime->workerLoop(voidWorkerId); + struct WorkerArgs* workerArgs = reinterpret_cast<struct WorkerArgs*>(voidWorkerArgs); + return currentRuntime->workerLoop(workerArgs); }; - errno = pthread_create(&threads[i], &attr, thread_function, &workerIds[i]); + errno = pthread_create(&threads[i], &attr, thread_function, &workerArgs[i]); if (errno) DIE_MSG_ERRNO("pthread_create() failed"); } @@ -124,8 +129,10 @@ Runtime::~Runtime() { DBG("Runtime " << this << " terminated"); } -auto Runtime::workerLoop(void* voidWorkerId) -> void* { - workerId = *(workerid_t*)voidWorkerId; +auto Runtime::workerLoop(struct WorkerArgs* workerArgs) -> void* { + workerId = workerArgs->id; + seed = workerArgs->seed; + LOGD("Worker loop started by thread " << syscall(SYS_gettid)); int oldType; @@ -134,9 +141,6 @@ auto Runtime::workerLoop(void* voidWorkerId) -> void* { DIE_MSG_ERRNO("pthread_setcanceltype() failed"); } - // Initialze the workers PRNG seed. - seed = uniformIntDistribution(randomEngine); - for (const auto& f : newWorkerHooks) f(); workerLatch.count_down_and_wait(); diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 4234655148e49ba4937d9adc1288f97e7db6bc55..95c35b8c11c76d713195d13b6cc37c487ed92c05 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -44,12 +44,18 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { Dispatcher& dispatcher; ContextManager& contextManager; pthread_t* threads; - workerid_t* workerIds; + + // Arguments passed to a new worker thread + struct WorkerArgs { + workerid_t id; + unsigned int seed; + }; + struct WorkerArgs* workerArgs; std::default_random_engine randomEngine; std::uniform_int_distribution<unsigned int> uniformIntDistribution; - auto workerLoop(void* workerId) -> void*; + auto workerLoop(struct WorkerArgs* workerArgs) -> void*; std::mutex workerSleepMutex; std::condition_variable workerSleepConditionVariable;