diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 44cff730eff2a243738e2f4d842af00eed9fe420..bfd680a340fed2d5e50be02da4583970dd66910c 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -12,7 +12,6 @@ #include <cassert> // for assert #include <cerrno> // for errno, ECANCELED, EBUSY, EAGAIN, EINTR #include <cstring> // for memset -#include <memory> #include <ostream> // for basic_osteram::operator<<, operator<< #include <string> #include <utility> @@ -31,7 +30,7 @@ #include "io/Stats.hpp" // for Stats, nanoseconds #include "io/SubmitActor.hpp" #include "lib/TaggedPtr.hpp" -#include "sleep_strategy/PipeSleepStrategy.hpp" +#include "sleep_strategy/PipeSleepStrategy.hpp" // IWYU pragma: keep using emper::lib::TaggedPtr; using emper::sleep_strategy::PipeSleepStrategy; @@ -240,16 +239,30 @@ reap_cqes: return 0; } } else { - // We can not reap completions of this IoContext to not race + if (!cq_lock.try_lock_or_increment()) { + LOGD("Global completer unsuccessful try_lock_or_increment"); + return 0; + } + + // We have to check the waitInflight flag with the cq_lock held to + // ensure we observe an update by the worker holding the lock. + // Otherwise this could happen: + // C W + // | lock + // | prepare to sleep + // check flag | + // | set flag + // | unlock + // lock | + + // Which results in the Completer possible consuming new work notifications. + + // We must not reap completions of this IoContext to not race // with the sleeping worker. if (waitInflight.load(std::memory_order_acquire)) { LOGD("Not reaping worker " << std::to_string(worker->getWorkerId()) << " since this worker is already waiting for its CQEs"); - return 0; - } - - if (!cq_lock.try_lock_or_increment()) { - LOGD("Global completer unsuccessful try_lock_or_increment"); + cq_lock.unlock(); return 0; } } @@ -318,18 +331,9 @@ reap_cqes: switch (tag) { case PointerTags::NewWorkWsq: case PointerTags::NewWorkAq: { - if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { - DIE_MSG("Completer reaping new work notification"); - } - - LOGD("Got new work notification"); - auto *pipeSleepStrategy = - reinterpret_cast<PipeSleepStrategy *>(&runtime.getWorkerSleepStrategy()); - statsIncr(pipeSleepStrategy->stats->wakeupDueToNotify); - - // Reset flag to indicate that a new sleep cqe must be prepared - // and allow the completer to reap completions again - waitInflight.store(false, std::memory_order_release); + auto &sleepStrategy = + reinterpret_cast<PipeSleepStrategy &>(runtime.getWorkerSleepStrategy()); + sleepStrategy.onNewWorkNotification<callerEnvironment>(*this); break; } diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp index 48ac397c4cb9550d262ccae1d9471f5f04163c76..31e5c1287c9573d8125dc31025ad260369cab88c 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.cpp +++ b/emper/sleep_strategy/PipeSleepStrategy.cpp @@ -35,12 +35,54 @@ template auto PipeSleepStrategy::createHint<CallerEnvironment::ANYWHERE>() -> Ta void PipeSleepStrategy::sleep() { IoContext& io = *IoContext::getWorkerIo(); - const bool mustPrepareSqe = !io.waitInflight; + // Only we are setting the flag therefore it is safe to use relaxed here + const bool mustPrepareSqe = !io.waitInflight.load(std::memory_order_relaxed); if (mustPrepareSqe) { + // If there is a completer we have to ensure that the completer does not + // reap our new work notification rendering the worker sleeping forever. + // The waitInflight flag is not enough because it is possible set + // while the completer is currently reaping. + // Only using a flag means there is this possibility: + // C W1 W2 Kernel + // | | calls sleep | + // | | | generate cqe in W2's CQ + // check waitInflight | | | + // | | inc sleeper count | + // | | prepare read sqe | + // | | submit read sqe | + // | | set waitInflight | + // | notify | | + // | observe sleepers | | + // | write to pipe | | + // | | | complete W2's read + // reap W2 CQ | | | + // consume newWork cqe | | | + // | | await completions | + // | | sleep forever | + + // Just setting the waitInflight flag earlier is no solution. + + // Our approach to this race is to repurpose the lock protecting the CQ to ensure the + // completer is not reaping our CQ while we are about to sleep. + + // This has also the advantage that new work generated by the completer + // reaping our CQ may be faster scheduled and executed this worker + + // Try to take the cq_lock + if constexpr (IoContext::needsCqLock) { + if (unlikely(!io.cq_lock.try_lock())) { + LOGD("Completer is currently reaping -> skip sleeping to handle possible continuations"); + return; + } + } + // increment the sleeper count if it was negative we should skip sleeping int64_t sleeping = sleepers.fetch_add(1, std::memory_order_acquire); if (sleeping < 0) { LOGD("observed sleeper count as: " << sleeping << " -> skip sleeping"); + if constexpr (IoContext::needsCqLock) { + io.cq_lock.unlock(); + } return; } @@ -67,6 +109,11 @@ void PipeSleepStrategy::sleep() { // on our IoContext otherwise we would neither safely wakeup nor reset // the waitInflight flag. io.waitInflight.store(true, std::memory_order_acquire); + LOGD("set waitinflight"); + + if constexpr (IoContext::needsCqLock) { + io.cq_lock.unlock(); + } } // Wait for IO completions @@ -77,6 +124,23 @@ void PipeSleepStrategy::sleep() { statsIncr(stats->wakeup); } +template <CallerEnvironment callerEnvironment> +void PipeSleepStrategy::onNewWorkNotification(IoContext& io) { + if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { + DIE_MSG("Completer reaping new work notification from " << io.worker->getWorkerId() << " CQ"); + } + + LOGD("Got new work notification"); + statsIncr(stats->wakeupDueToNotify); + + // Reset flag to indicate that a new sleep cqe must be prepared + // and allow the completer to reap completions again + io.waitInflight.store(false, std::memory_order_release); +} + +template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::EMPER>(IoContext& io); +template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::ANYWHERE>(IoContext& io); + auto operator<<(std::ostream& os, const PipeSleepStrategy::Stats& s) -> std::ostream& { os << "PipeSleepStrategy Stats:" << std::endl; os << "total-onNewWork: " << std::to_string(s.onNewWork) << std::endl; diff --git a/emper/sleep_strategy/PipeSleepStrategy.hpp b/emper/sleep_strategy/PipeSleepStrategy.hpp index 5dc4030411cabf3c2ad08f4e0bdaff3d49c6f100..4bacee4ef161d12432d0944e914f6ed63e6e1b3d 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.hpp +++ b/emper/sleep_strategy/PipeSleepStrategy.hpp @@ -61,7 +61,7 @@ namespace emper::sleep_strategy { * Atomic increment sleep count * Remember that we are sleeping * Prepare read cqe from the hint pipe to dispatch hint buffer - * Prevent the completer from reaping completions on this worker IoContext + * Prevent the completer from reaping completions on this worker IoContext * Wait until IO completions occurred * * NotifyEmper(n): @@ -286,6 +286,8 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, // notify<callerEnvironment>(createHint<callerEnvironment>(), specific); } + template <CallerEnvironment callerEnvironment> + void onNewWorkNotification(emper::io::IoContext& io); void sleep(); friend auto operator<<(std::ostream& os, const PipeSleepStrategy::Stats& s) -> std::ostream&;