Skip to content
Snippets Groups Projects
Commit da94f93c authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'fix-pipe-sleep-strategy' into 'master'

[PipeSleepStrategy] prevent the still present race to cqes with the completer

See merge request !249
parents ecc4c2ca 7a08804d
No related branches found
No related tags found
1 merge request!249[PipeSleepStrategy] prevent the still present race to cqes with the completer
Pipeline #68109 passed
......@@ -12,7 +12,6 @@
#include <cassert> // for assert
#include <cerrno> // for errno, ECANCELED, EBUSY, EAGAIN, EINTR
#include <cstring> // for memset
#include <memory>
#include <ostream> // for basic_osteram::operator<<, operator<<
#include <string>
#include <utility>
......@@ -31,7 +30,7 @@
#include "io/Stats.hpp" // for Stats, nanoseconds
#include "io/SubmitActor.hpp"
#include "lib/TaggedPtr.hpp"
#include "sleep_strategy/PipeSleepStrategy.hpp"
#include "sleep_strategy/PipeSleepStrategy.hpp" // IWYU pragma: keep
using emper::lib::TaggedPtr;
using emper::sleep_strategy::PipeSleepStrategy;
......@@ -240,16 +239,30 @@ reap_cqes:
return 0;
}
} else {
// We can not reap completions of this IoContext to not race
if (!cq_lock.try_lock_or_increment()) {
LOGD("Global completer unsuccessful try_lock_or_increment");
return 0;
}
// We have to check the waitInflight flag with the cq_lock held to
// ensure we observe an update by the worker holding the lock.
// Otherwise this could happen:
// C W
// | lock
// | prepare to sleep
// check flag |
// | set flag
// | unlock
// lock |
// Which results in the Completer possible consuming new work notifications.
// We must not reap completions of this IoContext to not race
// with the sleeping worker.
if (waitInflight.load(std::memory_order_acquire)) {
LOGD("Not reaping worker " << std::to_string(worker->getWorkerId())
<< " since this worker is already waiting for its CQEs");
return 0;
}
if (!cq_lock.try_lock_or_increment()) {
LOGD("Global completer unsuccessful try_lock_or_increment");
cq_lock.unlock();
return 0;
}
}
......@@ -318,18 +331,9 @@ reap_cqes:
switch (tag) {
case PointerTags::NewWorkWsq:
case PointerTags::NewWorkAq: {
if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) {
DIE_MSG("Completer reaping new work notification");
}
LOGD("Got new work notification");
auto *pipeSleepStrategy =
reinterpret_cast<PipeSleepStrategy *>(&runtime.getWorkerSleepStrategy());
statsIncr(pipeSleepStrategy->stats->wakeupDueToNotify);
// Reset flag to indicate that a new sleep cqe must be prepared
// and allow the completer to reap completions again
waitInflight.store(false, std::memory_order_release);
auto &sleepStrategy =
reinterpret_cast<PipeSleepStrategy &>(runtime.getWorkerSleepStrategy());
sleepStrategy.onNewWorkNotification<callerEnvironment>(*this);
break;
}
......
......@@ -35,12 +35,54 @@ template auto PipeSleepStrategy::createHint<CallerEnvironment::ANYWHERE>() -> Ta
void PipeSleepStrategy::sleep() {
IoContext& io = *IoContext::getWorkerIo();
const bool mustPrepareSqe = !io.waitInflight;
// Only we are setting the flag therefore it is safe to use relaxed here
const bool mustPrepareSqe = !io.waitInflight.load(std::memory_order_relaxed);
if (mustPrepareSqe) {
// If there is a completer we have to ensure that the completer does not
// reap our new work notification rendering the worker sleeping forever.
// The waitInflight flag is not enough because it is possible set
// while the completer is currently reaping.
// Only using a flag means there is this possibility:
// C W1 W2 Kernel
// | | calls sleep |
// | | | generate cqe in W2's CQ
// check waitInflight | | |
// | | inc sleeper count |
// | | prepare read sqe |
// | | submit read sqe |
// | | set waitInflight |
// | notify | |
// | observe sleepers | |
// | write to pipe | |
// | | | complete W2's read
// reap W2 CQ | | |
// consume newWork cqe | | |
// | | await completions |
// | | sleep forever |
// Just setting the waitInflight flag earlier is no solution.
// Our approach to this race is to repurpose the lock protecting the CQ to ensure the
// completer is not reaping our CQ while we are about to sleep.
// This has also the advantage that new work generated by the completer
// reaping our CQ may be faster scheduled and executed this worker
// Try to take the cq_lock
if constexpr (IoContext::needsCqLock) {
if (unlikely(!io.cq_lock.try_lock())) {
LOGD("Completer is currently reaping -> skip sleeping to handle possible continuations");
return;
}
}
// increment the sleeper count if it was negative we should skip sleeping
int64_t sleeping = sleepers.fetch_add(1, std::memory_order_acquire);
if (sleeping < 0) {
LOGD("observed sleeper count as: " << sleeping << " -> skip sleeping");
if constexpr (IoContext::needsCqLock) {
io.cq_lock.unlock();
}
return;
}
......@@ -67,6 +109,11 @@ void PipeSleepStrategy::sleep() {
// on our IoContext otherwise we would neither safely wakeup nor reset
// the waitInflight flag.
io.waitInflight.store(true, std::memory_order_acquire);
LOGD("set waitinflight");
if constexpr (IoContext::needsCqLock) {
io.cq_lock.unlock();
}
}
// Wait for IO completions
......@@ -77,6 +124,23 @@ void PipeSleepStrategy::sleep() {
statsIncr(stats->wakeup);
}
template <CallerEnvironment callerEnvironment>
void PipeSleepStrategy::onNewWorkNotification(IoContext& io) {
if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) {
DIE_MSG("Completer reaping new work notification from " << io.worker->getWorkerId() << " CQ");
}
LOGD("Got new work notification");
statsIncr(stats->wakeupDueToNotify);
// Reset flag to indicate that a new sleep cqe must be prepared
// and allow the completer to reap completions again
io.waitInflight.store(false, std::memory_order_release);
}
template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::EMPER>(IoContext& io);
template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::ANYWHERE>(IoContext& io);
auto operator<<(std::ostream& os, const PipeSleepStrategy::Stats& s) -> std::ostream& {
os << "PipeSleepStrategy Stats:" << std::endl;
os << "total-onNewWork: " << std::to_string(s.onNewWork) << std::endl;
......
......@@ -61,7 +61,7 @@ namespace emper::sleep_strategy {
* Atomic increment sleep count
* Remember that we are sleeping
* Prepare read cqe from the hint pipe to dispatch hint buffer
* Prevent the completer from reaping completions on this worker IoContext
* Prevent the completer from reaping completions on this worker IoContext
* Wait until IO completions occurred
*
* NotifyEmper(n):
......@@ -286,6 +286,8 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>,
// notify<callerEnvironment>(createHint<callerEnvironment>(), specific);
}
template <CallerEnvironment callerEnvironment>
void onNewWorkNotification(emper::io::IoContext& io);
void sleep();
friend auto operator<<(std::ostream& os, const PipeSleepStrategy::Stats& s) -> std::ostream&;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment