diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index d827a1a0dc43896a9bf745781cb392eecd162d01..4a4e20930b5a9368f62df06541adb650a78dcfd6 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -59,6 +59,9 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns future.prepareSqe(sqe); + // remember the prepared sqe to invalidate it if it was not properly submitted + preparedSqes.push_back(sqe); + // we should start a new Fiber executing callback on completion if (future.callback) { LOGD("prepare " << future << " Callback " << future.callback); @@ -81,31 +84,28 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns } template <CallerEnvironment callerEnvironment> -void IoContext::submit(Future &future) { - LOGD("submitting " << future << (future.dependency ? " and it's dependencies" : "")); - unsigned prepared = prepareFutureChain(future, 1); - +auto IoContext::submitPreparedSqes() -> unsigned { // submit the Future to the io_uring int submitted = io_uring_submit(&ring); - // We can't submit our sqe because the CQ is full - // in worker thread -> reapCompletions - // in globalCompleter thread -> TODO: deterministically handle global full CQ - // for now hope a jam does not happen or will solve itself - if (unlikely(submitted == -EBUSY)) { - if constexpr (emper::DEBUG) { - std::stringstream sst; - sst << "io_submit returned EBUSY trying to submit in addition to " << reqs_in_uring - << std::endl; - logI(sst.str()); - } else { - logI("io_submit returned EBUSY"); + // Actually I don't know how "unlikely" this is + if (unlikely(submitted < 0)) { + if (unlikely(submitted != -EBUSY)) { + errno = -submitted; + DIE_MSG_ERRNO("io_uring_submit failed"); } + // We can't submit our sqe because the CQ is full + // in worker thread -> reapCompletions + // in globalCompleter thread -> TODO: deterministically handle global full CQ + // for now hope a jam does not happen or will solve itself + 
LOGGER_LOGI("io_submit returned EBUSY trying to submit " + << io_uring_sq_ready(&ring) << " in addition to " << reqs_in_uring); + if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { // we are not busy looping in the globalIo stats.record_io_submit_full_cq(); - DIE_MSG("Future" << future << " dropped because global completer SQ is full"); + DIE_MSG("Future dropped because global completer SQ is full"); } TIME_NS( @@ -117,23 +117,43 @@ void IoContext::submit(Future &future) { stats.record_io_submit_full_cq); } - if (unlikely(submitted < 0)) { - errno = -submitted; - DIE_MSG_ERRNO("io_uring_submit failed"); - } + return static_cast<unsigned>(submitted); +} + +template <CallerEnvironment callerEnvironment> +void IoContext::submit(Future &future) { + LOGD("submitting " << future << (future.dependency ? " and it's dependencies" : "")); + unsigned prepared = prepareFutureChain(future, 1); + + // submit the Future to the io_uring + unsigned submitted = submitPreparedSqes<callerEnvironment>(); // We submitted some Futures to the io_uring // Because we submit every Future right away multiple prepared sqes can only // occur for Future chains. 
// If we could not submit the whole chain cancel all non submitted Futures - // because we can not guaranty the soundness of the chain + // because we can not guarantee the soundness of the chain and invalidate their + // prepared sqe // req1 -> invalid_req -> req3 - // will submit only 2 instead of all 3 prepared sqes + // will submit only 2 instead of all 3 prepared sqes leaving a sqe for req3 + // in the SQ but unsubmitted // See: https://github.com/axboe/liburing/issues/186 + // https://github.com/axboe/liburing/issues/92 if (unlikely(static_cast<unsigned>(submitted) < prepared)) { unsigned unsubmitted = io_uring_sq_ready(&ring); Future *unsubmittedFuture = &future; - while (unsubmitted) { + + for (unsigned i = 0; i < unsubmitted; ++i) { + // we prepare sqes for future from the chain end to the head + // A -> B -> C will result in SQ [C, B, A] and preparedSqes [C, B, A] + // invalidate/cancel N sqes/Futures from the back + struct io_uring_sqe *sqe = preparedSqes.back(); + preparedSqes.pop_back(); + // invalidate the prepared sqe + io_uring_prep_nop(sqe); + io_uring_sqe_set_data(sqe, nullptr); + + // cancel the corresponding future unsubmittedFuture->recordCompletion(stats, -ECANCELED); if constexpr (callerEnvironment == EMPER) { unsubmittedFuture->complete(-ECANCELED); @@ -141,10 +161,19 @@ void IoContext::submit(Future &future) { unsubmittedFuture->completeFromAnywhere(-ECANCELED); } - unsubmittedFuture = future.dependency; - unsubmitted--; + unsubmittedFuture = unsubmittedFuture->dependency; } + + // submit all now invalidated sqes +#ifdef NDEBUG + ATTR_UNUSED +#endif + unsigned invalidated = submitPreparedSqes<callerEnvironment>(); + assert(invalidated == unsubmitted); } + // we have submitted all our prepared sqes + preparedSqes.clear(); + // io_uring will try to synchronously complete any IO request before // offloading it to the async backend. See io_uring_enter(2). // Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe. 
@@ -359,6 +388,9 @@ IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) } } + // reserve space for a full SQ + preparedSqes.reserve(*this->ring.sq.kring_entries); + // This eventfd will be registered to retrieve completion notifications when the Runtime // calls registerWorkerIo() on the globalIo // Or it will be used to terminate the globalIo diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 4f3d65789e36560f21697c33e69e1ffe41690a94..2f34771e2156554c583b92d6c7090de8bf1b66e1 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -72,6 +72,19 @@ class IoContext : public Logger<LogSubsystem::IO> { // Members useful for debugging std::atomic<long> reqs_in_uring = 0; + // We need to keep track of prepared sqes to invalidate them on short submission + // If a user wants to submit a chain of 5 Futures and the third is invalid + // io_uring_submit will only submit 3 of 5 prepared sqes leaving the 2 unsubmitted in + // the SQ. + // Those unsubmitted sqes would be submitted on the next call of io_uring_submit + // possibly breaking the guarantees of a chain (cancel all dependent operations). + // Furthermore, having unsubmitted sqes in the SQ prevents SQ-sized chains from being + // prepared. 
+ // To prevent all these problems we keep track of the prepared sqes, "invalidate" them + // (prepare as NOP with a nullptr as user_data), submit all unsubmitted sqes + // and cancel their futures + std::vector<struct io_uring_sqe *> preparedSqes; + // set containing all Futures currently in the io_uring for debugging emper::lib::adt::LockedSet<Future *> uringFutureSet; @@ -87,6 +100,14 @@ class IoContext : public Logger<LogSubsystem::IO> { */ auto prepareFutureChain(Future &future, unsigned chain_length) -> unsigned; + /** + * @brief submit prepared sqes possibly reaping completions if CQ is full + * + * @return the number of submitted Futures + */ + template <CallerEnvironment callerEnvironment> + auto submitPreparedSqes() -> unsigned; + inline void setWorkerIo() { workerIo = this; } public: diff --git a/tests/LinkFutureTest.cpp b/tests/LinkFutureTest.cpp index 6d3d0c7a36afc9b2eb5216e3e3e2b3aad68fc262..b9859ec5c4bc132641058518cca02e62b1633e91 100644 --- a/tests/LinkFutureTest.cpp +++ b/tests/LinkFutureTest.cpp @@ -120,18 +120,12 @@ static void failureChainCorInvCor() { } void emperTest() { - // This leaves the io_uring in a weird state because the successChain() afterwards - // fails because the write future is already completed with -ECANCELED but - // there is successful cqe in the CQ for this future + // Test if an invalid chain leaves the IoContext in an unexpected / invalid state failureChainCorInvCor(); - successChain(); successLoop(); failureChainInvCor(); - - // failureChainInvCor left the io_uring in a weird state because - // the io_uring_submit of the openAndWait() never returns. - // failureChainCorInvCor(); + failureChainCorInvCor(); exit(EXIT_SUCCESS); }