From c57a23beeb7a486a98b605cd33a84b496c4a7cac Mon Sep 17 00:00:00 2001 From: Florian Fischer <florian.fischer@muhq.space> Date: Sun, 26 Dec 2021 15:29:59 +0100 Subject: [PATCH] check sqe tags before reaping Change the mechanism by which EMPER achieves the invariant that only the OWNER of an IoContext is allowed to reap new work notifications from it. Previously we used the state of the PipeSleepStrategy, which proved complex and error-prone. Now we always check if the completions we are about to reap contain any new work notifications and, if so, return early without reaping those. Now the behavior of the locked reap variant equals the lock-less variants. --- emper/io/IoContext.cpp | 67 ++++++---------------- emper/io/IoContext.hpp | 7 --- emper/sleep_strategy/PipeSleepStrategy.cpp | 50 ---------------- emper/sleep_strategy/PipeSleepStrategy.hpp | 14 +---- 4 files changed, 18 insertions(+), 120 deletions(-) diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index d35ccf21..128cc204 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -362,6 +362,11 @@ template auto IoContext::reapCompletionsLockless<CallerEnvironment::ANYWHERE, IoContext::CQE_BATCH_COUNT>(Fiber **contiunations) -> unsigned; +#define UNLOCK_IF_NEEDED() \ + do { \ + if constexpr (needsCqLock) cq_lock.unlock(); \ + } while (false); + template <CallerEnvironment callerEnvironment, unsigned toReap> auto IoContext::reapCompletionsLocked(Fiber **continuations) -> unsigned { // TODO: should only the owner possibly rereap? @@ -383,29 +388,9 @@ reap_cqes: // Number of actual continuation fibers resulting from the reaped CQEs unsigned continuationsCount = 0; - // TODO: Is using a try lock and the sleepState here even sound? - // Coudn't it be possible to have a lost wakeup with unconsumed new work notification - // cqe in our CQ - // - // State: - // Only a single worker does work involving IO and - // another (completer, io-stealing worker accesses its CQ. 
- - // Other Owner - // | submit IO - // | lock - // | prepare to sleep - // | set flag - // | unlock - // | sleep - // lock | - // | try lock unsucessfull - // | sleep again - // check flag | - // unlock | if constexpr (needsCqLock) { // The Owner always takes the lock to reap all completions and especially - // new work notifications and prevent the above discribed problem. + // new work notifications. if constexpr (callerEnvironment == CallerEnvironment::OWNER) { cq_lock.lock(); } else { @@ -414,36 +399,12 @@ reap_cqes: LOGD("unsuccessful try_lock from " << callerEnvironment); return 0; } - // We have to check the sleepState with the cq_lock held to - // ensure we observe an update by the worker holding the lock. - // Otherwise this could happen: - // Other Owner - // | | - // | lock - // | prepare to sleep - // check flag | - // | set flag - // | unlock - // lock | - - // Which results in the Other possible consuming new work notifications. - - // We must not reap completions of this IoContext to not race - // with the sleeping owner. 
- if (!PipeSleepStrategy::sleepState.isSafeToReap()) { - LOGD("Not reaping worker " << std::to_string(worker->getWorkerId()) - << " since this worker is already waiting for its CQEs"); - cq_lock.unlock(); - return 0; - } } } unsigned count = io_uring_peek_batch_cqe(&ring, cqes, toReap); if (!count) { - if constexpr (needsCqLock) { - cq_lock.unlock(); - } + UNLOCK_IF_NEEDED() if constexpr (checkForRereap) { goto check_for_rereap; @@ -459,13 +420,19 @@ auto &reapedCompletion = reapedCompletions[i]; reapedCompletion.first = cqe->res; reapedCompletion.second = cqe_data; + + // Do not reap newWorkNotifications if we are not the owner + if constexpr (callerEnvironment != CallerEnvironment::OWNER) { + const auto tag = static_cast<PointerTags>(TaggedPtr(cqe_data).getTag()); + if (tag == PointerTags::NewWorkNotification) { + UNLOCK_IF_NEEDED() + return 0; + } + } } io_uring_cq_advance(&ring, count); - - if constexpr (needsCqLock) { - cq_lock.unlock(); - } + UNLOCK_IF_NEEDED() LOGD("got " << count << " cqes from worker " << worker->getWorkerId() << "'s io_uring"); diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 1d6158c8..7fd316ca 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -99,13 +99,6 @@ class IoContext : public Logger<LogSubsystem::IO> { // This only happens if emper is build with the IO_SINGLE_URING option SubmitActor *submitter = nullptr; - // State of a worker when using a io_uring based sleep strategy. - // Gets modfied by the worker on SleepStrategy::sleep(), on IO - // completion and when calling SleepStrategy::notifySpecific(). - // It also prevents the completer from reaping completions on this IoContext - // to prevent races between the worker and the completer. 
- PipeSleepStrategy::SleepState sleepState; - Stats stats; // Members useful for debugging diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp index 17880944..d855504b 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.cpp +++ b/emper/sleep_strategy/PipeSleepStrategy.cpp @@ -6,7 +6,6 @@ #include <atomic> #include <cassert> -#include <mutex> #include "CallerEnvironment.hpp" #include "Runtime.hpp" @@ -54,58 +53,13 @@ void PipeSleepStrategy::sleep() { } if (SleepState::shouldPrepareGlobalRead(observedState)) { - // If there is a completer we have to ensure that the completer does not - // reap our new work notification rendering the worker sleeping forever. - // The Global flag is not enough because it is possible set - // while the completer is currently reaping. - // Only using a flag means there is this possibility: - // C W1 W2 Kernel - // | | calls sleep | - // | | | generate cqe in W2's CQ - // check waitInflight | | | - // | | inc sleeper count | - // | | prepare read sqe | - // | | submit read sqe | - // | | set waitInflight | - // | notify | | - // | observe sleepers | | - // | write to pipe | | - // | | | complete W2's read - // reap W2 CQ | | | - // consume newWork cqe | | | - // | | await completions | - // | | sleep forever | - - // Just setting the Global flag earlier is no solution. - - // Our approach to this race is to repurpose the lock protecting the CQ to ensure the - // completer is not reaping our CQ while we are about to sleep. 
- - // This has also the advantage that new work generated by the completer - // reaping our CQ may be faster scheduled and executed this worker - - // Try to take the cq_lock - if constexpr (IoContext::needsCqLock) { - if (unlikely(!io.cq_lock.try_lock())) { - LOGD("Completer is currently reaping -> skip sleeping to handle possible continuations"); - return; - } - } - // increment the sleeper count if it was negative we should skip sleeping int64_t sleeping = sleepers.fetch_add(1, std::memory_order_acquire); if (sleeping < 0) { LOGD("observed sleeper count as: " << sleeping << " -> skip sleeping"); - if constexpr (IoContext::needsCqLock) { - io.cq_lock.unlock(); - } return; } - // Before going to sleep prevent the completer from reaping completions - // on our IoContext otherwise we would neither safely wakeup nor reset - // the waitInflight flag. - // Also signal others to write to the specific pipe when calling notifySpecific. LOGD("mark state as sleeping"); observedState = sleepState.markSleeping(); if (SleepState::isNotified(observedState)) { @@ -133,10 +87,6 @@ void PipeSleepStrategy::sleep() { io.trackReqsInUring(1); LOGD("prepared global.sleepFd read and set sleepers count to: " << sleeping + 1); - - if constexpr (IoContext::needsCqLock) { - io.cq_lock.unlock(); - } } // If we reach this Global is definitly set diff --git a/emper/sleep_strategy/PipeSleepStrategy.hpp b/emper/sleep_strategy/PipeSleepStrategy.hpp index c7047899..b5a10985 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.hpp +++ b/emper/sleep_strategy/PipeSleepStrategy.hpp @@ -141,7 +141,7 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, enum _State { Notified = 1 << 0, /*!< The worker was already notified specifically */ Specific = 1 << 1, /*!< The worker is reading from its pipe */ - Global = 1 << 2, /*!< The worker is reading from the global pipe (is sleeping) */ + Global = 1 << 2, /*!< The worker is reading from the global pipe */ }; }; @@ -178,18 +178,6 @@ 
class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, */ auto _loadAcquire() const -> uint8_t { return s.load(std::memory_order_acquire); } - /** - * @brief Is it safe to reap IO-completion on a worker in this state - * - * It is not safe to reap completions on a worker which has a read on - * the global pipe prepared because than it is or about to sleep. - * - * @return true if the state contains Global, false otherwise - */ - [[nodiscard]] auto isSafeToReap() const -> bool { - return s.load(std::memory_order_acquire) & State::Global; - } - /** * @brief Mark the state as notified * -- GitLab