diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 279cb1156b85d433add8a566d6c670c716f78666..94f24b0c47c6175a962c59c273d2e2201c40d95d 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -183,6 +183,10 @@ clang-tidy: variables: EMPER_LOCKED_WS_QUEUE: "true" +.waitfree-ws: + variables: + EMPER_WAITFREE_WORK_STEALING: "true" + .futex-wakeup-semaphore: variables: EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "futex" @@ -286,6 +290,11 @@ test-locked-ws-queues: - .test - .locked-ws-queues +test-waitfree-ws: + extends: + - .test + - .waitfree-ws + test-futex-wakeup-semaphore: extends: - .test diff --git a/emper/Emper.hpp b/emper/Emper.hpp index ff3c27f2acb554bf93888d2f3c9105d44b714d8e..c3bf3f2e0b4a2ae8eacab555a5eecefb04a7f37e 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -59,6 +59,14 @@ static const bool LIBURCU = #endif ; +static const bool WAITFREE_WORK_STEALING = +#ifdef EMPER_WAITFREE_WORK_STEALING + true +#else + false +#endif + ; + static const bool DEBUG = #ifndef NDEBUG true @@ -159,6 +167,14 @@ static const bool IO_URING_SQPOLL = #endif ; +static const bool WAITFREE_IO_STEALING = +#ifdef EMPER_IO_WAITFREE_STEALING + true +#else + false +#endif + ; + // Initialize this bool in Emper.cpp because it needs code evaluation // (LinuxVersion::compare) during runtime. // Using a static variable here means EACH object file including this header has to diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index a178ad01303867c8a994d50b75dedf18ea0aefe2..d537a464a4b87d5a0c92130e34835a5e82ce64ce 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -411,8 +411,8 @@ class IoContext : public Logger { */ template [[nodiscard]] auto reapCompletions(Fiber **continuations) -> unsigned { - // Why reap more than there are entries in the CQ - assert(toReap <= CQE_BATCH_COUNT); + static_assert(toReap <= CQE_BATCH_COUNT, + "No need to reap more than there are entries in the CQ"); // never reap completions on the global IoContext assert(this != reinterpret_cast(Runtime::getRuntime()->globalIo)); @@ -440,6 +440,13 @@ class IoContext : public Logger { template [[nodiscard]] auto reapSingleCompletion() -> Fiber * { Fiber *fiber; + + if constexpr (emper::WAITFREE_IO_STEALING) { + auto res = tryReapCompletionWaitFree(&fiber); + if (res == emper::StealingResult::Stolen) return fiber; + return nullptr; + } + return reapCompletions(&fiber) == 1 ? fiber : nullptr; } diff --git a/emper/lib/adt/LockedQueue.hpp b/emper/lib/adt/LockedQueue.hpp index 6344ccd468107234f42c8cbdfa915cd4606f7452..f2f8b2fe3e57a805a291fad6ae831917ea8ad25d 100644 --- a/emper/lib/adt/LockedQueue.hpp +++ b/emper/lib/adt/LockedQueue.hpp @@ -6,6 +6,7 @@ #include #include "Common.hpp" +#include "StealingResult.hpp" namespace adt { @@ -44,16 +45,17 @@ class LockedQueue { return true; } - auto popTop(I* itemPtr) -> bool { + template + auto popTop(I* itemPtr) -> emper::StealingResult { std::lock_guard lock(queue_mutex); - if (deque.empty()) return false; + if (deque.empty()) return emper::StealingResult::Empty; *itemPtr = deque.front(); deque.pop_front(); - return true; + return emper::StealingResult::Stolen; } auto popBottom(I* itemPtr) -> bool { diff --git a/emper/lib/adt/WsClQueue.hpp b/emper/lib/adt/WsClQueue.hpp index 23c1c19deb51e74b37b88cfd2fc6e19097fedb1c..34112f0e10d7f6f02158c313692539a6804e8f50 100644 --- a/emper/lib/adt/WsClQueue.hpp +++ b/emper/lib/adt/WsClQueue.hpp @@ -6,6 +6,7 @@ #include #include "Common.hpp" +#include "StealingResult.hpp" namespace adt { @@ -47,7 +48,8 @@ class WsClQueue { // TODO: Decide what to do regarding the following suppressed lint. // NOLINTNEXTLINE(readability-avoid-const-params-in-decls) auto pushBottom(const PAYLOAD item) -> bool; - auto popTop(PAYLOAD *item) -> bool; + template + auto popTop(PAYLOAD *item) -> emper::StealingResult; auto popBottom(PAYLOAD *item) -> bool; [[nodiscard]] auto isFull() const -> bool; [[nodiscard]] auto isEmpty() const -> bool; @@ -71,10 +73,14 @@ auto WsClQueue::pushBottom(const PAYLOAD item) -> bool { } template -auto WsClQueue::popTop(PAYLOAD *item) -> bool { -start: +template +auto WsClQueue::popTop(PAYLOAD *item) -> emper::StealingResult { + int retries = 0; uint64_t oldTop = top; - if (bottom <= oldTop) return false; + +loop: + ATTR_UNUSED; + if (bottom <= oldTop) return emper::StealingResult::Empty; // Read fence / memory barrier atomic_thread_fence(std::memory_order_acquire); @@ -87,11 +93,20 @@ start: // If cas fails, then this popTop() lost the race against another // popTop(). - if (!top.compare_exchange_weak( - oldTop, newTop)) //, std::memory_order_release, std::memory_order_relaxed)) - goto start; + if (top.compare_exchange_weak(oldTop, + newTop)) //, std::memory_order_release, std::memory_order_relaxed)) + return emper::StealingResult::Stolen; - return true; + // Loop indefinitely + if constexpr (maxRetries < 0) goto loop; + // Loop maxRetries times + if constexpr (maxRetries > 0) { + if (retries == maxRetries) return emper::StealingResult::LostRace; + + ++retries; + } + + return emper::StealingResult::LostRace; } template diff --git a/emper/lib/adt/WsClV2Queue.hpp b/emper/lib/adt/WsClV2Queue.hpp index afc41b809057dd70914c43109f44fb9c4ca24b93..8e4d88a2c46e15da6045fdd78d312e5117e69fab 100644 --- a/emper/lib/adt/WsClV2Queue.hpp +++ b/emper/lib/adt/WsClV2Queue.hpp @@ -6,6 +6,7 @@ #include #include "Common.hpp" +#include "StealingResult.hpp" namespace adt { @@ -52,7 +53,8 @@ class WsClV2Queue { WsClV2Queue() : top(1) {} auto pushBottom(ITEM_TYPE item) -> bool; auto popBottom(ITEM_TYPE *item) -> bool; - auto popTop(ITEM_TYPE *item) -> bool; + template + auto popTop(ITEM_TYPE *item) -> emper::StealingResult; [[nodiscard]] auto isFull() const -> bool; [[nodiscard]] auto isEmpty() const -> bool; void print() const; @@ -106,10 +108,13 @@ auto WsClV2Queue::popBottom(ITEM_TYPE *item) -> bool { } template -auto WsClV2Queue::popTop(ITEM_TYPE *item) -> bool { -start: +template +auto WsClV2Queue::popTop(ITEM_TYPE *item) -> emper::StealingResult { + int retries = 0; uint64_t localTop = top.load(std::memory_order_acquire); +loop: + ATTR_UNUSED; // Initially this fence was after the check if the queue is // empty. But since the shared variable 'bottom' is read by the // check, I believe it belongs before the check. @@ -117,23 +122,27 @@ start: atomic_thread_fence(std::memory_order_acquire); // Check if the queue is empty. - if (bottom <= localTop) return false; + if (bottom <= localTop) return emper::StealingResult::Empty; *item = queue[localTop % CAPACITY]; // If cas fails, then this popTop() lost the race against another // popTop() or a rare-case popBottom(). - if (!top.compare_exchange_weak(localTop, localTop + 1, std::memory_order_release, - std::memory_order_relaxed)) { - // TODO: Return 'ABORT' enum value here. -#ifdef WAIT_FREE_WS_QUEUE - return false; -#else - goto start; -#endif + if (top.compare_exchange_weak(localTop, localTop + 1, std::memory_order_release, + std::memory_order_relaxed)) { + return emper::StealingResult::Stolen; } - return true; + // Loop indefinitely + if constexpr (maxRetries < 0) goto loop; + // Loop maxRetries times + if constexpr (maxRetries > 0) { + if (retires == maxRetries) return emper::StealingResult::LostRace; + + ++retries; + } + + return emper::StealingResult::LostRace; } template diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index 5f20567f130017815a6b7abb01ff1e94560b42ae..a4a6d251437244389d6f9108b31ae276331cb1c8 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -15,6 +15,7 @@ #include "Fiber.hpp" #include "NextFiberResult.hpp" #include "Runtime.hpp" // for Runtime +#include "StealingResult.hpp" #include "Worker.hpp" #include "emper-common.h" // for workerid_t #include "io/IoContext.hpp" @@ -24,6 +25,7 @@ using awss = AbstractWorkStealingStrategy; +using emper::StealingResult; using emper::io::IoContext; thread_local AbstractWorkStealingScheduler::WsQueue @@ -126,10 +128,11 @@ auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() auto AbstractWorkStealingScheduler::tryStealFiberFrom(workerid_t victim) -> std::optional> { + constexpr int maxRetries = emper::WAITFREE_WORK_STEALING ? 0 : -1; Fiber* fiber; popTop: - bool poped = queues[victim]->popTop(&fiber); - if (poped) { + StealingResult res = queues[victim]->popTop(&fiber); + if (res == StealingResult::Stolen) { emper::statsIncr(awss::stats.nextFiberStolen); if (maybeRecycle(fiber)) goto popTop; diff --git a/meson.build b/meson.build index c5c70a36ba9f5e708863fc801bd7baaada414e02..6979092c18d9bcdbdb676528442f9655e4850892 100644 --- a/meson.build +++ b/meson.build @@ -82,6 +82,8 @@ endif conf_data.set('EMPER_WS_VICTIM_COUNT', ws_victim_count) conf_data.set('EMPER_WS_VICTIM_DENOMINATOR', ws_victim_denominator) +conf_data.set('EMPER_WAITFREE_WORK_STEALING', get_option('waitfree_work_stealing')) + log_level = get_option('log_level') if log_level == 'automatic' # output only error messages in release builds @@ -96,12 +98,16 @@ if option_io endif io_bool_options = [ - 'stealing', - 'lockless_cq', - 'single_uring', - 'try_syscall', - 'uring_sqpoll', - 'uring_shared_wq', + {'option': 'stealing'}, + {'option': 'lockless_cq'}, + {'option': 'single_uring', + 'dependencies': {'io_uring_shared_wq': false, + 'io_completer_behavior': 'schedule'}}, + {'option': 'try_syscall'}, + {'option': 'uring_sqpoll'}, + {'option': 'uring_shared_wq'}, + {'option': 'waitfree_stealing', + 'dependencies': {'io_stealing': true, 'io_lockless_cq': true}}, ] io_raw_options = [ @@ -109,11 +115,25 @@ io_raw_options = [ 'lockless_memory_order', ] -foreach option : io_bool_options +foreach option_dict : io_bool_options + option = option_dict['option'] value = get_option('io_' + option) - if value == true and not option_io + if not value + continue + endif + + if not option_io error('io_' + option + ' defined without io') endif + + dependencies = option_dict.get('dependencies', {}) + foreach dependency, dep_value: dependencies + if get_option(dependency) != dep_value + error('io_' + option + ' defined but dependency ' + dependency + ':' + + dep_value.to_string() + ' not satisfied') + endif + endforeach + conf_data.set('EMPER_IO_' + option.to_upper(), value) endforeach @@ -143,17 +163,6 @@ else check_anywhere_queue_while_stealing == 'true') endif -# check io meson options consistency -if get_option('io_single_uring') - if not (io_completer_behavior == 'schedule') - error('Using a single io_uring need a "scheduling" completer') - endif - - if get_option('io_uring_shared_wq') - error('Sharing io_uring ressources is useless when using a single io_uring') - endif -endif - subdir('emper') subdir('tests') subdir('apps') diff --git a/meson_options.txt b/meson_options.txt index ce30c5efc8ee1f74fa7aeaafe50738fc5fef8feb..8e2b537e16b616fe10ce5e0a1378467d9fd307e8 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -93,6 +93,12 @@ option( ], value: 'work_stealing', ) +option( + 'waitfree_work_stealing', + type: 'boolean', + value: false, + description: 'Do not retry stealing when the popTop race was lost' +) option( 'work_stealing_victim_count', type: 'integer', @@ -180,6 +186,12 @@ option( description: 'Work-Stealing workers will also try to steal IO from other workers', value: false, ) +option( + 'io_waitfree_stealing', + type: 'boolean', + description: 'Work-Stealing workers will also try to steal waitfree IO from other workers', + value: false, +) option( 'io_lockless_cq', type: 'boolean',