From 03727b29c8f11d151fee233f00122ac611f3438a Mon Sep 17 00:00:00 2001 From: Florian Fischer <florian.fl.fischer@fau.de> Date: Thu, 18 Mar 2021 12:33:01 +0100 Subject: [PATCH] [IoContext] invalidate unsubmitted sqes io_uring_submit does some inline error checking and consumes fewer sqes than prepared if an error is detected. Currently we just cancel the Futures whose prepared sqes were not submitted. But this leaves the already prepared sqes for those Futures in the SQ of the io_uring, where they will be submitted the next time io_uring_submit is called. This results in a violation of the chain guarantee that dependent operations are only executed if all dependencies were successful. Additionally, this leads to double completions or memory corruption because the io_uring will produce cqes for already completed Futures. To prevent this from happening we track all sqes we prepared, so that we can invalidate and resubmit those which were not submitted because of a short submit. We invalidate sqes by preparing them as NOP instructions and setting their user data to NULL. I took this approach instead of rewinding the ring or something similar because it seemed safer to me not to fiddle with io_uring internals and just be less efficient. Enable previously failing LinkFutureTest test cases. 
--- emper/io/IoContext.cpp | 82 ++++++++++++++++++++++++++++------------ emper/io/IoContext.hpp | 21 ++++++++++ tests/LinkFutureTest.cpp | 10 +---- 3 files changed, 80 insertions(+), 33 deletions(-) diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index d827a1a0..4a4e2093 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -59,6 +59,9 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns future.prepareSqe(sqe); + // remember the prepared sqe to invalidate it if it was not properly submitted + preparedSqes.push_back(sqe); + // we should start a new Fiber executing callback on completion if (future.callback) { LOGD("prepare " << future << " Callback " << future.callback); @@ -81,31 +84,28 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns } template <CallerEnvironment callerEnvironment> -void IoContext::submit(Future &future) { - LOGD("submitting " << future << (future.dependency ? " and it's dependencies" : "")); - unsigned prepared = prepareFutureChain(future, 1); - +auto IoContext::submitPreparedSqes() -> unsigned { // submit the Future to the io_uring int submitted = io_uring_submit(&ring); - // We can't submit our sqe because the CQ is full - // in worker thread -> reapCompletions - // in globalCompleter thread -> TODO: deterministically handle global full CQ - // for now hope a jam does not happen or will solve itself - if (unlikely(submitted == -EBUSY)) { - if constexpr (emper::DEBUG) { - std::stringstream sst; - sst << "io_submit returned EBUSY trying to submit in addition to " << reqs_in_uring - << std::endl; - logI(sst.str()); - } else { - logI("io_submit returned EBUSY"); + // Actually I don't know how "unlikely" this is + if (unlikely(submitted < 0)) { + if (unlikely(submitted != -EBUSY)) { + errno = -submitted; + DIE_MSG_ERRNO("io_uring_submit failed"); } + // We can't submit our sqe because the CQ is full + // in worker thread -> reapCompletions + // in 
globalCompleter thread -> TODO: deterministically handle global full CQ + // for now hope a jam does not happen or will solve itself + LOGGER_LOGI("io_submit returned EBUSY trying to submit " + << io_uring_sq_ready(&ring) << " in addition to " << reqs_in_uring); + if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { // we are not busy looping in the globalIo stats.record_io_submit_full_cq(); - DIE_MSG("Future" << future << " dropped because global completer SQ is full"); + DIE_MSG("Future dropped because global completer SQ is full"); } TIME_NS( @@ -117,23 +117,43 @@ void IoContext::submit(Future &future) { stats.record_io_submit_full_cq); } - if (unlikely(submitted < 0)) { - errno = -submitted; - DIE_MSG_ERRNO("io_uring_submit failed"); - } + return static_cast<unsigned>(submitted); +} + +template <CallerEnvironment callerEnvironment> +void IoContext::submit(Future &future) { + LOGD("submitting " << future << (future.dependency ? " and it's dependencies" : "")); + unsigned prepared = prepareFutureChain(future, 1); + + // submit the Future to the io_uring + int submitted = submitPreparedSqes<callerEnvironment>(); // We submitted some Futures to the io_uring // Because we submit every Future right away multiple prepared sqes can only // occur for Future chains. 
// If we could not submit the whole chain cancel all non submitted Futures - // because we can not guaranty the soundness of the chain + // because we can not guaranty the soundness of the chain and invalidate their + // prepared sqe // req1 -> invalid_req -> req3 - // will submit only 2 instead of all 3 prepared sqes + // will submit only 2 instead of all 3 prepared sqes leaving a sqe for req3 + // in the SQ but unsubmitted // See: https://github.com/axboe/liburing/issues/186 + // https://github.com/axboe/liburing/issues/92 if (unlikely(static_cast<unsigned>(submitted) < prepared)) { unsigned unsubmitted = io_uring_sq_ready(&ring); Future *unsubmittedFuture = &future; - while (unsubmitted) { + + for (unsigned i = 0; i < unsubmitted; ++i) { + // we prepare sqes for future from the chain end to the head + // A -> B -> C will result in CQ [C, B, A] and preparedSqes [C, B, A] + // invalidate/cancel N sqes/Futures from the back + struct io_uring_sqe *sqe = preparedSqes.back(); + preparedSqes.pop_back(); + // invalidate the prepared sqe + io_uring_prep_nop(sqe); + io_uring_sqe_set_data(sqe, nullptr); + + // cancel the according future unsubmittedFuture->recordCompletion(stats, -ECANCELED); if constexpr (callerEnvironment == EMPER) { unsubmittedFuture->complete(-ECANCELED); @@ -141,10 +161,19 @@ void IoContext::submit(Future &future) { unsubmittedFuture->completeFromAnywhere(-ECANCELED); } unsubmittedFuture = future.dependency; - unsubmitted--; } + + // submit all now invalidated sqes +#ifdef NDEBUG + ATTR_UNUSED +#endif + unsigned invalidated = submitPreparedSqes<callerEnvironment>(); + assert(invalidated == unsubmitted); } + // we have submitted all our prepared sqes + preparedSqes.clear(); + // io_uring will try to synchronously complete any IO request before // offloading it to the async backend. See io_uring_enter(2). // Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe. 
@@ -359,6 +388,9 @@ IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) } } + // reserve space for a full SQ + preparedSqes.reserve(*this->ring.sq.kring_entries); + // This eventfd will be registered to retrieve completion notifications when the Runtime // calls registerWorkerIo() on the globalIo // Or it will be used to terminate the globalIo diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 4f3d6578..2f34771e 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -72,6 +72,19 @@ class IoContext : public Logger<LogSubsystem::IO> { // Members useful for debugging std::atomic<long> reqs_in_uring = 0; + // We need to keep track of prepared to sqes to invalidate them short submission + // If a user wants to submit a chain of 5 Futures and the third is invalid + // io_uring_submit will only submit 3 of 5 prepared sqes leaving the 2 unsubmitted in + // the SQ. + // Those unsubmitted sqes would be submitted on the next call of io_uring_submit + // possibly breaking the guaranties of a chain (cancel all dependent operations). + // Further more having unsubmitted sqes in the SQ prevents SQ-sized chains from beeing + // prepared. 
+ // To prevent all those problem we keep track of the prepared sqes and "invalidate" + // (prepare as NOP with a nullptr as user_data) submit all unsubmitted sqes + // and cancel their futures + std::vector<struct io_uring_sqe *> preparedSqes; + // set containing all Futures currently in the io_uring for debugging emper::lib::adt::LockedSet<Future *> uringFutureSet; @@ -87,6 +100,14 @@ class IoContext : public Logger<LogSubsystem::IO> { */ auto prepareFutureChain(Future &future, unsigned chain_length) -> unsigned; + /** + * @brief submit prepared sqes possibly reaping completions if CQ is full + * + * @return the number of submitted Futures + */ + template <CallerEnvironment callerEnvironment> + auto submitPreparedSqes() -> unsigned; + inline void setWorkerIo() { workerIo = this; } public: diff --git a/tests/LinkFutureTest.cpp b/tests/LinkFutureTest.cpp index 6d3d0c7a..b9859ec5 100644 --- a/tests/LinkFutureTest.cpp +++ b/tests/LinkFutureTest.cpp @@ -120,18 +120,12 @@ static void failureChainCorInvCor() { } void emperTest() { - // This leaves the io_uring in a weird state because the successChain() afterwards - // fails because the write future is already completed with -ECANCELED but - // there is successful cqe in the CQ for this future + // Test if a invalid chain leaves the IoContext in an unexpected / invalid state failureChainCorInvCor(); - successChain(); successLoop(); failureChainInvCor(); - - // failureChainInvCor left the io_uring in a weird state because - // the io_uring_submit of the openAndWait() never returns. - // failureChainCorInvCor(); + failureChainCorInvCor(); exit(EXIT_SUCCESS); } -- GitLab