diff --git a/emper/Common.hpp b/emper/Common.hpp index 351a194b89456494b41741a9008fe79c82b50d11..6fd18884eaa8626bafd2fe859f8b02bfdc4ca353 100644 --- a/emper/Common.hpp +++ b/emper/Common.hpp @@ -3,7 +3,8 @@ #pragma once #include <functional> -#include <sstream> // IWYU pragma: keep +#include <sstream> // IWYU pragma: keep +#include <type_traits> // IWYU pragma: keep #include "emper-config.h" // IWYU pragma: keep @@ -30,6 +31,9 @@ using func_t = std::function<void()>; #define unlikely(x) __builtin_expect(!!(x), 0) #define ALIGN_TO_CACHE_LINE alignas(64) +#define CACHE_LINE_EXCLUSIVE(T, symbol) \
+ std::aligned_storage<64, 64>::type symbol##_mem; \
+ T& symbol = *new (&symbol##_mem) T() [[noreturn]] void die(const char* message, bool usePerror); diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 6a867a05893fed7fecd757acee78caf17a7b2c18..bc5a01cb94a5b9a43c1b1366509f4aa59db0fb5a 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -220,37 +220,30 @@ template void IoContext::submitAndWait<CallerEnvironment::ANYWHERE>(Future &futu template <CallerEnvironment callerEnvironment> auto IoContext::reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned { unsigned reReapCount = 0; - uint32_t maxRaceFreeCompleterAttempts = 1; using Completion = std::pair<int32_t, void *>; // vector to store seen cqes to make the critical section // where cq_lock is held as small as possible std::array<Completion, CQE_BATCH_COUNT> reapedCompletions; - // this label is not used for callerEnvironment::ANYWHERE and thus has to be - // annotated with ATTR_UNUSED -reap_cqes: - ATTR_UNUSED; - // never reap completions on the global IoContext assert(this != runtime.globalIo); LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId())); +// this label is not used for callerEnvironment::ANYWHERE and thus has to be +// annotated with ATTR_UNUSED +reap_cqes: + ATTR_UNUSED; std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes; if constexpr 
(needsCqLock) { // Someone else is currently reaping completions - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - if (unlikely(!cq_lock.try_lock())) { - LOGD("worker unsuccessful try_lock"); - return 0; - } - } else { - if (!cq_lock.try_lock_or_increment()) { - LOGD("Global completer unsuccessful try_lock_or_increment"); - return 0; - } + if (unlikely(!cq_lock.try_lock())) { + LOGD("unsuccessful try_lock"); + return 0; + } + if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { // We have to check the waitInflight flag with the cq_lock held to // ensure we observe an update by the worker holding the lock. // Otherwise this could happen: @@ -288,9 +281,8 @@ reap_cqes: io_uring_cq_advance(&ring, count); - uint32_t globalCompleterAttempts; if constexpr (needsCqLock) { - globalCompleterAttempts = cq_lock.unlock(); + cq_lock.unlock(); } LOGD("got " << count << " cqes from worker " << worker->getWorkerId() << "'s io_uring"); @@ -307,18 +299,8 @@ reap_cqes: // from reaping the additional completions we have a lost wakeup possibly leading // to a completely sleeping runtime with runnable completions in a worker's IoContext. - // To prevent this race the cq_lock counts the unsuccessful tries from - // the globalCompleter. - // If a worker observes that the globalCompleter tried to reapCompletions - // more than twice we know that a lost wakeup could have occurred and we try to - // reap again. - - // In the case a lost wakeup was possible we schedule our reaped cqes - // and try again. - - // On all cq iteration after the first we expect no globalCompleterAttempt - // or in other words a single globalCompleterAttempt attempt means - // additional completions arrive and lost wakeup was possible again. + // To prevent this race we check the CQ again for new cqes after we already + // dropped the CQ lock and possibly reap again. 
stats.record_reaps<callerEnvironment>(count); @@ -375,26 +357,17 @@ reap_cqes: } } - // check if lost wakeup was possible - if constexpr (needsCqLock && callerEnvironment == CallerEnvironment::EMPER) { - bool reReap = false; - // TODO: How sure are we that this is unlikely? - if (unlikely(globalCompleterAttempts > maxRaceFreeCompleterAttempts)) { - // In all CQ iteration after the first we expect no further globalCompleter attempts - maxRaceFreeCompleterAttempts = 0; - reReap = true; - } else if (count == CQE_BATCH_COUNT) { - // We reaped a full batch, this means there could be potentially - // more CQEs in the completion queue. - reReap = true; - } - + // check if we missed new cqes + // TODO: should only the worker recheck? + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + bool reReap = io_uring_cq_ready(&ring) != 0; if (reReap) { // schedule all already collected continuation fibers runtime.schedule(continuationFibers.data(), posInBuf); reReapCount++; + LOGD("Re-Reaping completions"); goto reap_cqes; } diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index ec9c8eaff646dc1b1d67e805fa9135a040dac91b..95684fd1b0f5dc78ff3d1865ca1e90830222deb4 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -11,6 +11,7 @@ #include <cstdint> // for uint64_t #include <functional> // for less #include <iterator> +#include <mutex> #include <vector> #include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER @@ -28,25 +29,6 @@ class AbstractWorkStealingScheduler; class Fiber; -#ifdef EMPER_IO_CQ_LOCK_COUNTING_TRY_LOCK -#include "lib/sync/CountingTryLock.hpp" -using CqLock = emper::lib::sync::CountingTryLock; - -#elif defined EMPER_IO_CQ_LOCK_MUTEX -#include <mutex> - -#include "lib/sync/PseudoCountingTryLock.hpp" -using CqLock = emper::lib::sync::PseudoCountingTryLock<std::mutex>; - -#elif defined EMPER_IO_CQ_LOCK_SPIN_LOCK -#include "lib/sync/PseudoCountingTryLock.hpp" -#include "lib/sync/SpinLock.hpp" -using CqLock = 
emper::lib::sync::PseudoCountingTryLock<emper::lib::sync::SpinLock>; - -#else -#error Uknown cq lock implementation -#endif - namespace emper::sleep_strategy { class PipeSleepStrategy; } @@ -73,7 +55,7 @@ class IoContext : public Logger<LogSubsystem::IO> { static constexpr bool needsCqLock = emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none; // TryLock protecting the completion queue of ring. - ALIGN_TO_CACHE_LINE CqLock cq_lock; + CACHE_LINE_EXCLUSIVE(std::mutex, cq_lock); struct io_uring ring; // In a worker's IoContext This eventfd is registered with the io_uring to get completion diff --git a/emper/lib/sync/CountingTryLock.hpp b/emper/lib/sync/CountingTryLock.hpp deleted file mode 100644 index cc67ec1bed0ac615b5a8a4a6c0654cdff2f1252f..0000000000000000000000000000000000000000 --- a/emper/lib/sync/CountingTryLock.hpp +++ /dev/null @@ -1,67 +0,0 @@ -// SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2021 Florian Fischer -#pragma once - -#include <atomic> -#include <climits> - -#include "Common.hpp" - -namespace emper::lib::sync { - -class CountingTryLock { - private: - // The lower 32-bit are used as the actual lock - // The higher 32-bit are used as a counter which is incremented if - // try_lock_or_increment fails to acquire the lock. - - // Layout of our lock union: - // |-------------- countingLock --------------| - // |------- counter -------| - // | lock | - // 0 sizeof(std::atomic<bool>) 64 - // 0 COUNTER_SHIFT 64 - union { - std::atomic<uint64_t> countingLock; - std::atomic<bool> lock; - }; - - static const int COUNTER_SHIFT = 32; - static_assert(sizeof(std::atomic<bool>) * CHAR_BIT < COUNTER_SHIFT); - - static const uint64_t LOCKED = 1; - static const uint64_t UNLOCKED = 0; - - public: - CountingTryLock(bool locked = false) : countingLock(locked ? 
LOCKED : UNLOCKED) {} - - [[nodiscard]] auto try_lock() -> bool { return !lock.exchange(true, std::memory_order_acquire); } - - [[nodiscard]] auto try_lock_or_increment() -> bool { - uint64_t oldVal, newVal; - oldVal = countingLock.load(std::memory_order_relaxed); - for (;;) { - // currently unlocked -> try to lock - if (oldVal == UNLOCKED) { - newVal = LOCKED; - // currently locked -> increment the counter - } else { - newVal = oldVal + (1L << COUNTER_SHIFT); - } - - if (countingLock.compare_exchange_weak(oldVal, newVal, std::memory_order_acquire, - std::memory_order_relaxed)) { - break; - } - } - - return oldVal == UNLOCKED; - } - - // release the lock, zero and return the counter - auto unlock() -> uint32_t { - uint64_t oldVal = countingLock.exchange(UNLOCKED, std::memory_order_release); - return static_cast<uint32_t>(oldVal >> COUNTER_SHIFT); - } -}; -} // namespace emper::lib::sync diff --git a/emper/lib/sync/PseudoCountingTryLock.hpp b/emper/lib/sync/PseudoCountingTryLock.hpp deleted file mode 100644 index 2b278bad9d531774d40332666a73affac8d70636..0000000000000000000000000000000000000000 --- a/emper/lib/sync/PseudoCountingTryLock.hpp +++ /dev/null @@ -1,25 +0,0 @@ -// SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2021 Florian Fischer -#pragma once - -namespace emper::lib::sync { - -template <class Lockable> -class PseudoCountingTryLock { - private: - Lockable lock; - - public: - [[nodiscard]] auto try_lock() -> bool { - lock.lock(); - return true; - } - - [[nodiscard]] auto try_lock_or_increment() -> bool { return try_lock(); } - - auto unlock() -> uint32_t { - lock.unlock(); - return 0; - } -}; -} // namespace emper::lib::sync diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp index 3c02694efecafe9853258c58eb76f7fb06a9203d..51ba171323d3c3c99a3b3e5f8724c1fb2dd2808d 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.cpp +++ b/emper/sleep_strategy/PipeSleepStrategy.cpp @@ -6,6 +6,7 @@ #include 
<atomic> #include <cassert> +#include <mutex> #include "CallerEnvironment.hpp" #include "Emper.hpp" diff --git a/emper/sleep_strategy/PipeSleepStrategy.hpp b/emper/sleep_strategy/PipeSleepStrategy.hpp index d96e69552bcf0e9ee20532a4aef1e29298012a99..6a67cf984956de3043be0660124513e0809e22bf 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.hpp +++ b/emper/sleep_strategy/PipeSleepStrategy.hpp @@ -9,7 +9,6 @@ #include <cstdint> #include <iostream> #include <stdexcept> -#include <type_traits> #include <vector> #include "CallerEnvironment.hpp" @@ -128,8 +127,7 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, Stats stats; // Make sure the shared counter lives in an exlusive cache line - std::aligned_storage<64, 64>::type sleepers_mem; - std::atomic<int64_t>& sleepers = *reinterpret_cast<std::atomic<int64_t>*>(&sleepers_mem); + CACHE_LINE_EXCLUSIVE(std::atomic<int64_t>, sleepers); template <CallerEnvironment callerEnvironment> [[nodiscard]] auto createHint() -> TaggedPtr; diff --git a/meson.build b/meson.build index d6f9cbfd2319991130fdf71ea5217546f4d95bba..ce0f95beadf568a453413c56740467575c32c284 100644 --- a/meson.build +++ b/meson.build @@ -111,9 +111,6 @@ foreach option : io_raw_options conf_data.set('EMPER_IO_' + option.to_upper(), get_option('io_' + option)) endforeach -io_cq_lock_impl = get_option('io_cq_lock_implementation') -conf_data.set('EMPER_IO_CQ_LOCK_' + io_cq_lock_impl.to_upper(), true) - io_completer_behavior = get_option('io_completer_behavior') if io_completer_behavior == 'maybe_wakeup' if get_option('worker_sleep') diff --git a/meson_options.txt b/meson_options.txt index 7c91669b7c0deb530b071ef6a952c144d3939cfe..b8246124a607c106a01b7338c1275f21004bde52 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -140,13 +140,6 @@ option( value: false, description: 'Share a common async backend between all io_urings' ) -option( - 'io_cq_lock_implementation', - type: 'combo', - description: 'The lock implementation used to 
protect a worker IoContext CQ', - choices: ['spin_lock', 'counting_try_lock', 'mutex'], - value: 'counting_try_lock', -) option( 'io_completer_behavior', type: 'combo',