Skip to content
Snippets Groups Projects
Commit c1c70c25 authored by Florian Fischer's avatar Florian Fischer
Browse files

[IO] fail hard if the globalCompleter would drop a Future

Currently the globalCompleter would drop Futures which he is
unable to submit to its SQ because io_uring_submit returned -EBUSY,
signalizing a full CQ.
For now we fail hard if this hopefully unlikely condition occurs.

Additional small code changes:
* don't wrap assert in if constexpr(DEBUG)
* annotate Future completion in globalCompleter as unlikely
* use prepared Future count as unsigned
* remove redundant parenthesis
* introduce helper function for IoContext* tagging
* introduce Stats::record_io_submit_full_cq default parameter
parent 6ead33f3
No related branches found
No related tags found
No related merge requests found
Pipeline #57418 passed
......@@ -10,7 +10,6 @@
#include <atomic> // for atomic, __atomic_base
#include <cassert> // for assert
#include <cerrno> // for errno, ECANCELED, EBUSY, EAGAIN, EINTR
#include <chrono> // for nanoseconds
#include <cstdio> // for perror
#include <cstring> // for memset
#include <memory> // for allocator
......@@ -32,6 +31,12 @@
static const uintptr_t IOCONTEXT_TAG = 1L << (sizeof(size_t) * 8 - 1);
static const uintptr_t IOCONTEXT_TAG_MASK = IOCONTEXT_TAG - 1;
static inline auto isIoContext(uintptr_t ptr) -> bool { return (ptr & IOCONTEXT_TAG) != 0; }
static inline auto stripIoContextTag(uintptr_t ptr) -> IoContext * {
return reinterpret_cast<IoContext *>(ptr & IOCONTEXT_TAG_MASK);
}
namespace emper::io {
thread_local IoContext *IoContext::workerIo = nullptr;
......@@ -61,7 +66,7 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns
future.prepareSqe(sqe);
// Someone wants to be notified about the completion of this Future
if (!(future.isForgotten())) {
if (!future.isForgotten()) {
io_uring_sqe_set_data(sqe, &future);
}
......@@ -78,10 +83,7 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns
template <CallerEnvironment callerEnvironment>
void IoContext::submit(Future &future) {
LOGD("submitting " << future << (future.dependency ? " and it's dependencies" : ""));
#ifdef NDEBUG
UNUSED_ARG
#endif
int prepared = static_cast<int>(prepareFutureChain(future, 1));
unsigned prepared = prepareFutureChain(future, 1);
// submit the Future to the io_uring
int submitted = io_uring_submit(&ring);
......@@ -102,8 +104,8 @@ void IoContext::submit(Future &future) {
if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) {
// we are not busy looping in the globalIo
stats.record_io_submit_full_cq(std::chrono::nanoseconds(0));
return;
stats.record_io_submit_full_cq();
DIE_MSG("Future" << future << " dropped because global completer SQ is full");
}
TIME_NS(
......@@ -128,7 +130,7 @@ void IoContext::submit(Future &future) {
// req1 -> invalid_req -> req3
// will submit only 2 instead of all 3 prepared sqes
// See: https://github.com/axboe/liburing/issues/186
if (unlikely(submitted < prepared)) {
if (unlikely(static_cast<unsigned>(submitted) < prepared)) {
unsigned unsubmitted = io_uring_sq_ready(&ring);
Future *unsubmittedFuture = &future;
while (unsubmitted) {
......@@ -190,10 +192,8 @@ void IoContext::reapCompletions() {
continue;
}
if constexpr (emper::DEBUG) {
// assert that the future was previously in the uringFutureSet
assert(uringFutureSet.erase(future) > 0);
}
// assert that the future was previously in the uringFutureSet
assert(uringFutureSet.erase(future) > 0);
future->recordCompletion(stats, cqe->res);
if constexpr (callerEnvironment == EMPER) {
......@@ -258,7 +258,7 @@ auto IoContext::globalCompleterFunc(void *arg) -> void * {
auto data = (uintptr_t)io_uring_cqe_get_data(cqe);
// The cqe is for a completed Future
if (!(data & IOCONTEXT_TAG)) {
if (unlikely(!isIoContext(data))) {
auto *future = reinterpret_cast<Future *>(data);
uint32_t res = cqe->res;
......@@ -271,7 +271,7 @@ auto IoContext::globalCompleterFunc(void *arg) -> void * {
// The cqe is for a IoContext.eventfd read
// -> there are completions on this worker IoContext
auto *worker_io = reinterpret_cast<IoContext *>(data & IOCONTEXT_TAG_MASK);
auto *worker_io = stripIoContextTag(data);
assert(worker_io);
io_uring_cqe_seen(&io.ring, cqe);
......
......@@ -178,7 +178,7 @@ class Stats {
// running mean calculation taken from
// https://math.stackexchange.com/questions/106700/incremental-averageing
inline void record_io_submit_full_cq(nanoseconds ns) {
inline void record_io_submit_full_cq(nanoseconds ns = std::chrono::nanoseconds(0)) {
RETURN_IF_NO_STATS();
io_submit_full_cq++;
int64_t diff = (ns.count() - io_submit_full_cq_running_mean) / io_submit_full_cq;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment