diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index d35ccf21ced59e7f7ed45d2ba2bbadaa2c780985..128cc204662d8ac38352503b582345659e320e83 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -362,6 +362,11 @@ template auto IoContext::reapCompletionsLockless<CallerEnvironment::ANYWHERE, IoContext::CQE_BATCH_COUNT>(Fiber **contiunations) -> unsigned; +#define UNLOCK_IF_NEEDED() \ + do { \ + if constexpr (needsCqLock) cq_lock.unlock(); \ + } while (false); + template <CallerEnvironment callerEnvironment, unsigned toReap> auto IoContext::reapCompletionsLocked(Fiber **continuations) -> unsigned { // TODO: should only the owner possibly rereap? @@ -383,29 +388,9 @@ reap_cqes: // Number of actual continuation fibers resulting from the reaped CQEs unsigned continuationsCount = 0; - // TODO: Is using a try lock and the sleepState here even sound? - // Coudn't it be possible to have a lost wakeup with unconsumed new work notification - // cqe in our CQ - // - // State: - // Only a single worker does work involving IO and - // another (completer, io-stealing worker accesses its CQ. - - // Other Owner - // | submit IO - // | lock - // | prepare to sleep - // | set flag - // | unlock - // | sleep - // lock | - // | try lock unsucessfull - // | sleep again - // check flag | - // unlock | if constexpr (needsCqLock) { // The Owner always takes the lock to reap all completions and especially - // new work notifications and prevent the above discribed problem. + // new work notifications. if constexpr (callerEnvironment == CallerEnvironment::OWNER) { cq_lock.lock(); } else { @@ -414,36 +399,12 @@ reap_cqes: LOGD("unsuccessful try_lock from " << callerEnvironment); return 0; } - // We have to check the sleepState with the cq_lock held to - // ensure we observe an update by the worker holding the lock. 
- // Otherwise this could happen: - // Other Owner - // | | - // | lock - // | prepare to sleep - // check flag | - // | set flag - // | unlock - // lock | - - // Which results in the Other possible consuming new work notifications. - - // We must not reap completions of this IoContext to not race - // with the sleeping owner. - if (!PipeSleepStrategy::sleepState.isSafeToReap()) { - LOGD("Not reaping worker " << std::to_string(worker->getWorkerId()) - << " since this worker is already waiting for its CQEs"); - cq_lock.unlock(); - return 0; - } } } unsigned count = io_uring_peek_batch_cqe(&ring, cqes, toReap); if (!count) { - if constexpr (needsCqLock) { - cq_lock.unlock(); - } + UNLOCK_IF_NEEDED() if constexpr (checkForRereap) { goto check_for_rereap; @@ -459,13 +420,19 @@ reap_cqes: auto &reapedCompletion = reapedCompletions[i]; reapedCompletion.first = cqe->res; reapedCompletion.second = cqe_data; + + // Do not reap newWorkNotifications if we are not the owner + if constexpr (callerEnvironment != CallerEnvironment::OWNER) { + const auto tag = static_cast<PointerTags>(TaggedPtr(cqe_data).getTag()); + if (tag == PointerTags::NewWorkNotification) { + UNLOCK_IF_NEEDED() + return 0; + } + } } io_uring_cq_advance(&ring, count); - - if constexpr (needsCqLock) { - cq_lock.unlock(); - } + UNLOCK_IF_NEEDED() LOGD("got " << count << " cqes from worker " << worker->getWorkerId() << "'s io_uring"); diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 1d6158c81cb41d5454bf15e3625cf33697684dcd..7fd316ca9669b56d0374c51acd35ce71da37c8e8 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -99,13 +99,6 @@ class IoContext : public Logger<LogSubsystem::IO> { // This only happens if emper is build with the IO_SINGLE_URING option SubmitActor *submitter = nullptr; - // State of a worker when using a io_uring based sleep strategy. - // Gets modfied by the worker on SleepStrategy::sleep(), on IO - // completion and when calling SleepStrategy::notifySpecific(). 
- // It also prevents the completer from reaping completions on this IoContext - // to prevent races between the worker and the completer. - PipeSleepStrategy::SleepState sleepState; - Stats stats; // Members useful for debugging diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp index 17880944f25a83910501bf908f46187f25b9bdf0..d855504bcca043a3fda55319b06712771d0294ad 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.cpp +++ b/emper/sleep_strategy/PipeSleepStrategy.cpp @@ -6,7 +6,6 @@ #include <atomic> #include <cassert> -#include <mutex> #include "CallerEnvironment.hpp" #include "Runtime.hpp" @@ -54,58 +53,13 @@ void PipeSleepStrategy::sleep() { } if (SleepState::shouldPrepareGlobalRead(observedState)) { - // If there is a completer we have to ensure that the completer does not - // reap our new work notification rendering the worker sleeping forever. - // The Global flag is not enough because it is possible set - // while the completer is currently reaping. - // Only using a flag means there is this possibility: - // C W1 W2 Kernel - // | | calls sleep | - // | | | generate cqe in W2's CQ - // check waitInflight | | | - // | | inc sleeper count | - // | | prepare read sqe | - // | | submit read sqe | - // | | set waitInflight | - // | notify | | - // | observe sleepers | | - // | write to pipe | | - // | | | complete W2's read - // reap W2 CQ | | | - // consume newWork cqe | | | - // | | await completions | - // | | sleep forever | - - // Just setting the Global flag earlier is no solution. - - // Our approach to this race is to repurpose the lock protecting the CQ to ensure the - // completer is not reaping our CQ while we are about to sleep. 
- - // This has also the advantage that new work generated by the completer - // reaping our CQ may be faster scheduled and executed this worker - - // Try to take the cq_lock - if constexpr (IoContext::needsCqLock) { - if (unlikely(!io.cq_lock.try_lock())) { - LOGD("Completer is currently reaping -> skip sleeping to handle possible continuations"); - return; - } - } - // increment the sleeper count if it was negative we should skip sleeping int64_t sleeping = sleepers.fetch_add(1, std::memory_order_acquire); if (sleeping < 0) { LOGD("observed sleeper count as: " << sleeping << " -> skip sleeping"); - if constexpr (IoContext::needsCqLock) { - io.cq_lock.unlock(); - } return; } - // Before going to sleep prevent the completer from reaping completions - // on our IoContext otherwise we would neither safely wakeup nor reset - // the waitInflight flag. - // Also signal others to write to the specific pipe when calling notifySpecific. LOGD("mark state as sleeping"); observedState = sleepState.markSleeping(); if (SleepState::isNotified(observedState)) { @@ -133,10 +87,6 @@ io.trackReqsInUring(1); LOGD("prepared global.sleepFd read and set sleepers count to: " << sleeping + 1); - - if constexpr (IoContext::needsCqLock) { - io.cq_lock.unlock(); - } } // If we reach this Global is definitly set diff --git a/emper/sleep_strategy/PipeSleepStrategy.hpp b/emper/sleep_strategy/PipeSleepStrategy.hpp index c7047899d64277676a5ce2e316ae19362f5fcaa7..b5a109858fa143fa9893d3efc9897c9c2f88d68b 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.hpp +++ b/emper/sleep_strategy/PipeSleepStrategy.hpp @@ -141,7 +141,7 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, enum _State { Notified = 1 << 0, /*!< The worker was already notified specifically */ Specific = 1 << 1, /*!< The worker is reading from its pipe */ - Global = 1 << 2, /*!< The worker is reading from the global pipe (is sleeping) */ + Global = 1 << 2, /*!< The worker is reading from the global pipe */ }; }; @@ -178,18 +178,6 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, */ auto _loadAcquire() const -> uint8_t { return s.load(std::memory_order_acquire); } - /** - * @brief Is it safe to reap IO-completion on a worker in this state - * - * It is not safe to reap completions on a worker which has a read on - * the global pipe prepared because than it is or about to sleep. - * - * @return true if the state contains Global, false otherwise - */ - [[nodiscard]] auto isSafeToReap() const -> bool { - return s.load(std::memory_order_acquire) & State::Global; - } - /** * @brief Mark the state as notified *