diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 627794785ed2844dd8359cdde8c7db36648ff99e..e28262b8fc12d4787b330e24ebd59603912b0550 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -10,7 +10,6 @@ #include <atomic> // for atomic, __atomic_base #include <cassert> // for assert #include <cerrno> // for errno, ECANCELED, EBUSY, EAGAIN, EINTR -#include <chrono> // for nanoseconds #include <cstdio> // for perror #include <cstring> // for memset #include <memory> // for allocator @@ -32,6 +31,12 @@ static const uintptr_t IOCONTEXT_TAG = 1L << (sizeof(size_t) * 8 - 1); static const uintptr_t IOCONTEXT_TAG_MASK = IOCONTEXT_TAG - 1; +static inline auto isIoContext(uintptr_t ptr) -> bool { return (ptr & IOCONTEXT_TAG) != 0; } + +static inline auto stripIoContextTag(uintptr_t ptr) -> IoContext * { + return reinterpret_cast<IoContext *>(ptr & IOCONTEXT_TAG_MASK); +} + namespace emper::io { thread_local IoContext *IoContext::workerIo = nullptr; @@ -61,7 +66,7 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns future.prepareSqe(sqe); // Someone wants to be notified about the completion of this Future - if (!(future.isForgotten())) { + if (!future.isForgotten()) { io_uring_sqe_set_data(sqe, &future); } @@ -78,10 +83,7 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns template <CallerEnvironment callerEnvironment> void IoContext::submit(Future &future) { LOGD("submitting " << future << (future.dependency ? " and it's dependencies" : "")); -#ifdef NDEBUG - UNUSED_ARG -#endif - int prepared = static_cast<int>(prepareFutureChain(future, 1)); + unsigned prepared = prepareFutureChain(future, 1); // submit the Future to the io_uring int submitted = io_uring_submit(&ring); @@ -102,8 +104,8 @@ void IoContext::submit(Future &future) { if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { // we are not busy looping in the globalIo - stats.record_io_submit_full_cq(std::chrono::nanoseconds(0)); - return; + stats.record_io_submit_full_cq(); + DIE_MSG("Future" << future << " dropped because global completer SQ is full"); } TIME_NS( @@ -128,7 +130,7 @@ void IoContext::submit(Future &future) { // req1 -> invalid_req -> req3 // will submit only 2 instead of all 3 prepared sqes // See: https://github.com/axboe/liburing/issues/186 - if (unlikely(submitted < prepared)) { + if (unlikely(static_cast<unsigned>(submitted) < prepared)) { unsigned unsubmitted = io_uring_sq_ready(&ring); Future *unsubmittedFuture = &future; while (unsubmitted) { @@ -190,10 +192,8 @@ void IoContext::reapCompletions() { continue; } - if constexpr (emper::DEBUG) { - // assert that the future was previously in the uringFutureSet - assert(uringFutureSet.erase(future) > 0); - } + // assert that the future was previously in the uringFutureSet + assert(uringFutureSet.erase(future) > 0); future->recordCompletion(stats, cqe->res); if constexpr (callerEnvironment == EMPER) { @@ -258,7 +258,7 @@ auto IoContext::globalCompleterFunc(void *arg) -> void * { auto data = (uintptr_t)io_uring_cqe_get_data(cqe); // The cqe is for a completed Future - if (!(data & IOCONTEXT_TAG)) { + if (unlikely(!isIoContext(data))) { auto *future = reinterpret_cast<Future *>(data); uint32_t res = cqe->res; @@ -271,7 +271,7 @@ auto IoContext::globalCompleterFunc(void *arg) -> void * { // The cqe is for a IoContext.eventfd read // -> there are completions on this worker IoContext - auto *worker_io = reinterpret_cast<IoContext *>(data & IOCONTEXT_TAG_MASK); + auto *worker_io = stripIoContextTag(data); assert(worker_io); io_uring_cqe_seen(&io.ring, cqe); diff --git a/emper/io/Stats.hpp b/emper/io/Stats.hpp index 95afacb111fab842b6158bb91c4e2adc710e210a..498c8b6f33a95e32fc5d838bc1615491ed976872 100644 --- a/emper/io/Stats.hpp +++ b/emper/io/Stats.hpp @@ -178,7 +178,7 @@ class Stats { // running mean calculation taken from // https://math.stackexchange.com/questions/106700/incremental-averageing - inline void record_io_submit_full_cq(nanoseconds ns) { + inline void record_io_submit_full_cq(nanoseconds ns = std::chrono::nanoseconds(0)) { RETURN_IF_NO_STATS(); io_submit_full_cq++; int64_t diff = (ns.count() - io_submit_full_cq_running_mean) / io_submit_full_cq;