diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index c3fed23b250ea5431a0750ebaa616baecc4eba95..7419722be1e374183b6f3e7d627091bff38894b9 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -131,6 +131,14 @@ static-analysis-with-emper-io: variables: EMPER_LOCKED_WS_QUEUE: "true" +.futex-wakeup-semaphore: + variables: + EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "futex" + +.locked-wakeup-semaphore: + variables: + EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "locked" + test-gcc: extends: - .test @@ -216,3 +224,13 @@ test-locked-ws-queues: extends: - .test - .locked-ws-queues + +test-futex-wakeup-semaphore: + extends: + - .test + - .futex-wakeup-semaphore + +test-locked-wakeup-semaphore: + extends: + - .test + - .locked-wakeup-semaphore diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index ac89eb76538b1e0c490938f01cfb74c2308f856d..84a587106bfd1ec430b68ddd814c010643ca984a 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -3,7 +3,6 @@ #include "Runtime.hpp" #include <pthread.h> // for pthread_t, pthread_attr_init -#include <semaphore.h> #include <cerrno> // for errno // Non portable. @@ -84,12 +83,6 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory currentRuntime = this; } - // initialize the wakeup semaphore - int err = sem_init(&wakeupSem, 0, 0); - if (err) { - DIE_MSG_ERRNO("initializing wakeup semaphore failed"); - } - // initialize the global and all worker IoContexts if constexpr (emper::IO) { // The global io_uring needs at least workerCount entries in its SQ because diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 5e417ab95942cb9add742c44493a6e3ab9df5f95..15a95f8f4c73becfabb2779683e33fff3d3fa0c5 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -3,7 +3,6 @@ #pragma once #include <pthread.h> // for pthread_t -#include <semaphore.h> #include <cassert> // for assert #include <cstdint> // for intptr_t @@ -23,6 +22,7 @@ #include "Worker.hpp" #include "emper-common.h" // for workerid_t #include "lib/sync/Latch.hpp" // for Latch +#include "lib/sync/WorkerWakeupSemaphore.hpp" class ContextManager; class Dispatcher; @@ -35,6 +35,7 @@ class IoContext; } using emper::io::IoContext; +using emper::lib::sync::WorkerWakeupSemaphore; class Runtime : public Logger<LogSubsystem::RUNTI> { private: @@ -61,7 +62,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { auto workerLoop(Worker* worker) -> void*; - ALIGN_TO_CACHE_LINE sem_t wakeupSem; + ALIGN_TO_CACHE_LINE WorkerWakeupSemaphore wakeupSem; static RuntimeStrategyFactory& DEFAULT_STRATEGY; @@ -72,7 +73,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> inline void wakeupSleepingWorkers() { - int skipWakeupThreshold; + WorkerWakeupSemaphore::CounterType skipWakeupThreshold; if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { // On external work we always increment the semaphore unless we observe // that its value is > workerCount. @@ -94,14 +95,13 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { skipWakeupThreshold = 0; } - int semValue; - sem_getvalue(&wakeupSem, &semValue); + auto semValue = wakeupSem.getValue(); if (semValue > skipWakeupThreshold) { return; } if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::one) { - sem_post(&wakeupSem); + wakeupSem.notify(); } else if constexpr (emper::WORKER_WAKEUP_STRATEGY == emper::WorkerWakeupStrategy::all) { // notify all we observed sleeping // It is sound to increment the semaphore to much, thus this will only cause @@ -115,18 +115,17 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { // Linux sem_getvalue indeed does return 0 // To notify all sleeping workers we increment the semaphore once for each worker. if (semValue == 0) { - semValue = -workerCount; + semValue = workerCount; } - for (; semValue < 0; ++semValue) { - sem_post(&wakeupSem); - } + // make sure that the amount to notify is always positive + wakeupSem.notify_many(semValue < 0 ? -semValue : semValue); } else { ABORT("Unknown CallerEnvironment"); } } - void dispatchLoopSleep() { sem_wait(&wakeupSem); } + void dispatchLoopSleep() { wakeupSem.wait(); } inline auto getGlobalIo() -> IoContext* { if constexpr (emper::IO) { diff --git a/emper/lib/sync/LockedSemaphore.hpp b/emper/lib/sync/LockedSemaphore.hpp new file mode 100644 index 0000000000000000000000000000000000000000..1cfacb288febfb473b74e9017cf3a61eafee68ca --- /dev/null +++ b/emper/lib/sync/LockedSemaphore.hpp @@ -0,0 +1,44 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#pragma once + +#include <condition_variable> +#include <mutex> + +#include "Common.hpp" +namespace emper::lib::sync { + +class LockedSemaphore { + public: + using CounterType = unsigned int; + + private: + std::mutex m; + std::condition_variable c; + CounterType counter; + + public: + LockedSemaphore(CounterType counter = 0) : counter(counter) {} + + [[nodiscard]] inline auto getValue() const -> CounterType { return counter; } + + inline void notify() { + std::unique_lock<std::mutex> lock(m); + ++counter; + c.notify_one(); + } + + inline void notify_many(CounterType count) { + std::unique_lock<std::mutex> lock(m); + counter += count; + c.notify_all(); + } + + inline void wait() { + std::unique_lock<std::mutex> lock(m); + c.wait(lock, [this]() { return counter > 0; }); + counter--; + } +}; + +} // namespace emper::lib::sync diff --git a/emper/lib/sync/PosixSemaphore.hpp b/emper/lib/sync/PosixSemaphore.hpp new file mode 100644 index 0000000000000000000000000000000000000000..015cbdece12d87a0048716f7f4435b1513f167a3 --- /dev/null +++ b/emper/lib/sync/PosixSemaphore.hpp @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <semaphore.h> + +#include "Common.hpp" + +namespace emper::lib::sync { + +class PosixSemaphore { + public: + using CounterType = int; + + private: + sem_t s; + + public: + PosixSemaphore(CounterType counter = 0) { + int err = sem_init(&s, 0, counter); + if (unlikely(err)) { + DIE_MSG_ERRNO("sem_init failed"); + } + } + + ~PosixSemaphore() { + int err = sem_destroy(&s); + if (unlikely(err)) { + DIE_MSG_ERRNO("sem_destroy failed"); + } + } + + [[nodiscard]] inline auto getValue() -> CounterType { + CounterType sval; + sem_getvalue(&s, &sval); + return sval; + } + + inline void notify() { sem_post(&s); } + + inline void notify_many(CounterType count) { + for (; count > 0; --count) { + notify(); + } + } + + inline void wait() { sem_wait(&s); } +}; + +} // namespace emper::lib::sync diff --git a/emper/lib/sync/Semaphore.hpp b/emper/lib/sync/Semaphore.hpp index ae07235448041e634992c0c2f6ce33b6959cde8b..7e97ba18bee429d5eb0a091d15c0c280d8bb375f 100644 --- a/emper/lib/sync/Semaphore.hpp +++ b/emper/lib/sync/Semaphore.hpp @@ -2,37 +2,8 @@ // Copyright © 2020 Florian Schmaus #pragma once -#include <condition_variable> -#include <mutex> +#include "lib/sync/LockedSemaphore.hpp" namespace emper::lib::sync { - -class Semaphore { - private: - std::mutex m; - std::condition_variable c; - unsigned int counter; - - public: - Semaphore(unsigned int counter = 0) : counter(counter) {} - - inline void notify() { - std::unique_lock<std::mutex> lock(m); - ++counter; - c.notify_one(); - } - - inline void notify_many(unsigned int count) { - std::unique_lock<std::mutex> lock(m); - counter += count; - c.notify_all(); - } - - inline void wait() { - std::unique_lock<std::mutex> lock(m); - c.wait(lock, [this]() { return counter > 0; }); - counter--; - } -}; - +using Semaphore = LockedSemaphore; } // namespace emper::lib::sync diff --git a/emper/lib/sync/SpuriousFutexSemaphore.hpp b/emper/lib/sync/SpuriousFutexSemaphore.hpp new file mode 100644 index 0000000000000000000000000000000000000000..ae202f592518692dac487cfa72d7c6672026df64 --- /dev/null +++ b/emper/lib/sync/SpuriousFutexSemaphore.hpp @@ -0,0 +1,68 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <linux/futex.h> +#include <sys/syscall.h> +#include <unistd.h> + +#include <atomic> + +#include "Common.hpp" + +namespace emper::lib::sync { + +// ATTENTION: This semaphore implementation is prone to spurious wakeups! +// SpuriousFutexSemaphore::wait() may return spurious without a call to +// SpuriousFutexSemaphore::notify(). +// Use this class only where it is safe to have spurious wakeups. +// It could be used to wakeup and suspend worker threads and does so maximal efficient +// by only using a single atomic operation and a possible system call per method. +class SpuriousFutexSemaphore { + public: + using CounterType = long; + + private: + // >= 0 means no waiters + // < 0 means there are waiters + std::atomic<CounterType> counter; + + public: + SpuriousFutexSemaphore(CounterType counter = 0) : counter(counter) {} + + [[nodiscard]] inline auto getValue() const -> CounterType { + return counter.load(std::memory_order_relaxed); + } + + inline void notify_many(CounterType count) { + // Using std::memory_order_relaxed is fine since we don't + // give any memory synchronization guaranties because of the + // possible spurious wakeups + CounterType c = counter.fetch_add(count, std::memory_order_relaxed); + if (c < 0) { + syscall(SYS_futex, &counter, FUTEX_WAKE, count, 0, 0, 0); + } + } + + inline void notify() { notify_many(1); } + + inline void wait() { + // See fetch_add comment. + CounterType c = counter.fetch_sub(1, std::memory_order_relaxed) - 1; + while (c < 0) { + long err = syscall(SYS_futex, &counter, FUTEX_WAIT, c, 0, 0, 0); + // The futex(2) manpage says that applications should check the futex word + // again because it is possible to have spurious wakeups. + // We can't check the futex value because there is no way to decided + // if someone called notify. + // A sound generic Semaphore needs to split its atomic counter into + // a semaphore value and a waiter count part. + if (!err) { + return; + } + c = getValue(); + } + } +}; + +} // namespace emper::lib::sync diff --git a/emper/lib/sync/WorkerWakeupSemaphore.hpp b/emper/lib/sync/WorkerWakeupSemaphore.hpp new file mode 100644 index 0000000000000000000000000000000000000000..357e9a780abef1b6d6150347c2a965ec39ad23c5 --- /dev/null +++ b/emper/lib/sync/WorkerWakeupSemaphore.hpp @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE +#include "lib/sync/LockedSemaphore.hpp" +#endif + +#ifdef EMPER_POSIX_WAKEUP_SEMAPHORE +#include "lib/sync/PosixSemaphore.hpp" +#endif + +#ifdef EMPER_FUTEX_WAKEUP_SEMAPHORE +#include "lib/sync/SpuriousFutexSemaphore.hpp" +#endif + +namespace emper::lib::sync { +#ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE +using WorkerWakeupSemaphore = LockedSemaphore; + +#elif defined EMPER_POSIX_WAKEUP_SEMAPHORE +using WorkerWakeupSemaphore = PosixSemaphore; + +#elif defined EMPER_FUTEX_WAKEUP_SEMAPHORE +using WorkerWakeupSemaphore = SpuriousFutexSemaphore; + +#else +#error Unknown WorkerWakeupSemaphore implementation +#endif +} // namespace emper::lib::sync diff --git a/meson.build b/meson.build index f2038dac7de9b11085af0dbd61ebd85ea052d182..e5b6b9d0378882e12053ff6346a6058c25908c1a 100644 --- a/meson.build +++ b/meson.build @@ -34,12 +34,15 @@ endif conf_data.set('EMPER_WORKER_SLEEP', get_option('worker_sleep')) conf_data.set('EMPER_WORKER_WAKEUP_STRATEGY', get_option('worker_wakeup_strategy')) conf_data.set('EMPER_LOCKED_WS_QUEUE', get_option('locked_ws_queue')) -conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue')) conf_data.set('EMPER_LOCKED_MPSC_QUEUE', get_option('locked_mpsc_queue')) +conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue')) conf_data.set('EMPER_STATS', get_option('stats')) conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue')) conf_data.set('EMPER_BLOCKED_CONTEXT_SET', get_option('blocked_context_set')) +semaphore_impl = get_option('wakeup_semaphore_implementation') +conf_data.set('EMPER_' + semaphore_impl.to_upper() + '_WAKEUP_SEMAPHORE', true) + default_scheduling_strategy = get_option('default_scheduling_strategy') conf_data.set('EMPER_DEFAULT_SCHEDULING_STRATEGY_' + default_scheduling_strategy.to_upper(), true) diff --git a/meson_options.txt b/meson_options.txt index ab067837f023da3b784c9b7b462d4ea884c41dba..95ce83775779e98a1b30d8aa22024ef29d534fa1 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -41,6 +41,17 @@ option( value: false, description: 'Use a fully locked queue for work-stealing', ) +option( + 'wakeup_semaphore_implementation', + type: 'combo', + choices: [ + 'posix', + 'locked', + 'futex', + ], + value: 'posix', + description: 'Semaphore implementation to suspend/wakeup workers', +) option( 'overflow_queue', type: 'boolean',