diff --git a/emper/Debug.hpp b/emper/Debug.hpp index d24a1b2fd0307ff48f5415630d7795c4d3c8f2db..768a07e0c1fcc95f5ded73d6bb408c3de115b983 100644 --- a/emper/Debug.hpp +++ b/emper/Debug.hpp @@ -38,10 +38,19 @@ // NOLINTNEXTLINE(bugprone-macro-parentheses) #define LOGI(x) LOG(Info, "Info: " << x, emper_log_no_prefix, true); +// NOLINTNEXTLINE(bugprone-macro-parentheses) +#define LOGGER_LOGI(x) LOG(Info, "Info: " << x, this->logI, true); + // NOLINTNEXTLINE(bugprone-macro-parentheses) #define LOGW(x) LOG(Warning, "Warning: " << x, emper_log_no_prefix, true); +// NOLINTNEXTLINE(bugprone-macro-parentheses) +#define LOGGER_LOGW(x) LOG(Warning, "Warning: " << x, this->logW, true); + // NOLINTNEXTLINE(bugprone-macro-parentheses) #define LOGE(x) LOG(Error, "Error: " << x, emper_log_no_prefix, true); +// NOLINTNEXTLINE(bugprone-macro-parentheses) +#define LOGGER_LOGE(x) LOG(Error, "Error: " << x, emper_log_no_prefix, true); + // NOLINTNEXTLINE(bugprone-macro-parentheses) #define ABORT(x) { LOGE(x); abort(); } diff --git a/emper/io.hpp b/emper/io.hpp index 10b228781b31067b7ed3b74ea391816372e8f01e..64efb93f7d4ea50913c7876bbeb5831eb4c0fa99 100644 --- a/emper/io.hpp +++ b/emper/io.hpp @@ -36,7 +36,8 @@ namespace emper::io { * * @return Future object which signals the completion of the recv request */ -inline auto recv(int socket, void *buffer, size_t length, int flags) -> std::unique_ptr<Future> { +[[nodiscard]] inline auto recv(int socket, void *buffer, size_t length, int flags) + -> std::unique_ptr<Future> { auto future = std::make_unique<RecvFuture>(socket, buffer, length, flags); future->submit(); return future; @@ -76,8 +77,8 @@ inline auto recvAndWait(int socket, void *buffer, size_t length, int flags) -> s * * @return Future object which signals the completion of the send request */ -inline auto send(int socket, const void *buffer, size_t length, int flags, bool send_all = true) - -> std::unique_ptr<Future> { +[[nodiscard]] inline auto send(int socket, const void *buffer, 
size_t length, int flags, + bool send_all = true) -> std::unique_ptr<Future> { auto future = std::make_unique<SendFuture>(socket, buffer, length, flags, send_all); future->submit(); return future; @@ -124,7 +125,7 @@ inline auto sendAndWait(int socket, const void *buffer, size_t length, int flags * * @return Future object which signals the completion of the connect request */ -inline auto connect(int socket, const struct sockaddr *address, socklen_t address_len) +[[nodiscard]] inline auto connect(int socket, const struct sockaddr *address, socklen_t address_len) -> std::unique_ptr<Future> { auto future = std::make_unique<ConnectFuture>(socket, address, address_len); future->submit(); @@ -174,7 +175,7 @@ inline auto connectAndWait(int socket, const struct sockaddr *address, socklen_t * * @return Future object which signals the completion of the accept request */ -inline auto accept(int socket, struct sockaddr *address, socklen_t *address_len) +[[nodiscard]] inline auto accept(int socket, struct sockaddr *address, socklen_t *address_len) -> std::unique_ptr<Future> { auto future = std::make_unique<AcceptFuture>(socket, address, address_len); future->submit(); @@ -220,8 +221,8 @@ inline auto acceptAndWait(int socket, struct sockaddr *address, socklen_t *addre * * @return Future object which signals the completion of the read request */ -inline auto readFile(int fildes, void *buf, size_t nbyte, off_t offset = -1, bool read_all = false) - -> std::unique_ptr<Future> { +[[nodiscard]] inline auto readFile(int fildes, void *buf, size_t nbyte, off_t offset = -1, + bool read_all = false) -> std::unique_ptr<Future> { auto future = std::make_unique<ReadFuture>(fildes, buf, nbyte, offset, read_all); future->submit(); return future; @@ -267,8 +268,8 @@ inline auto readFileAndWait(int fildes, void *buf, size_t nbyte, off_t offset = * * @return Future object which signals the completion of the write request */ -inline auto writeFile(int fildes, const void *buf, size_t nbyte, off_t 
offset = -1, - bool write_all = true) -> std::unique_ptr<Future> { +[[nodiscard]] inline auto writeFile(int fildes, const void *buf, size_t nbyte, off_t offset = -1, + bool write_all = true) -> std::unique_ptr<Future> { auto future = std::make_unique<WriteFuture>(fildes, buf, nbyte, offset, write_all); future->submit(); return future; @@ -311,7 +312,8 @@ inline auto writeFileAndWait(int fildes, const void *buf, size_t nbyte, off_t of * * @return Future object which signals the completion of the write request */ -inline auto writev(int fildes, const struct iovec *iov, int iovcnt) -> std::unique_ptr<Future> { +[[nodiscard]] inline auto writev(int fildes, const struct iovec *iov, int iovcnt) + -> std::unique_ptr<Future> { auto future = std::make_unique<WritevFuture>(fildes, iov, iovcnt); future->submit(); return future; @@ -348,7 +350,7 @@ inline auto writevAndWait(int fildes, const struct iovec *iov, int iovcnt) -> ss * * @return Future object which signals the completion of the openat request */ -inline auto openat(int dirfd, const char *pathname, int flags, mode_t mode = 0) +[[nodiscard]] inline auto openat(int dirfd, const char *pathname, int flags, mode_t mode = 0) -> std::unique_ptr<Future> { auto future = std::make_unique<OpenatFuture>(dirfd, pathname, flags, mode); future->submit(); @@ -386,7 +388,8 @@ inline auto openatAndWait(int dirfd, const char *pathname, int flags, mode_t mod * * @return Future object which signals the completion of the open request */ -inline auto open(const char *pathname, int flags, mode_t mode = 0) -> std::unique_ptr<Future> { +[[nodiscard]] inline auto open(const char *pathname, int flags, mode_t mode = 0) + -> std::unique_ptr<Future> { auto future = std::make_unique<OpenatFuture>(AT_FDCWD, pathname, flags, mode); future->submit(); return future; @@ -420,7 +423,7 @@ inline auto openAndWait(const char *pathname, int flags, mode_t mode = 0) -> siz * * @return Future object which signals the completion of the close request */ -inline 
auto close(int fd) -> std::unique_ptr<Future> { +[[nodiscard]] inline auto close(int fd) -> std::unique_ptr<Future> { auto future = std::make_unique<CloseFuture>(fd); future->submit(); return future; diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index d827a1a0dc43896a9bf745781cb392eecd162d01..4a4e20930b5a9368f62df06541adb650a78dcfd6 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -59,6 +59,9 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns future.prepareSqe(sqe); + // remember the prepared sqe to invalidate it if it was not properly submitted + preparedSqes.push_back(sqe); + // we should start a new Fiber executing callback on completion if (future.callback) { LOGD("prepare " << future << " Callback " << future.callback); @@ -81,31 +84,28 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns } template <CallerEnvironment callerEnvironment> -void IoContext::submit(Future &future) { - LOGD("submitting " << future << (future.dependency ? 
" and it's dependencies" : "")); - unsigned prepared = prepareFutureChain(future, 1); - +auto IoContext::submitPreparedSqes() -> unsigned { // submit the Future to the io_uring int submitted = io_uring_submit(&ring); - // We can't submit our sqe because the CQ is full - // in worker thread -> reapCompletions - // in globalCompleter thread -> TODO: deterministically handle global full CQ - // for now hope a jam does not happen or will solve itself - if (unlikely(submitted == -EBUSY)) { - if constexpr (emper::DEBUG) { - std::stringstream sst; - sst << "io_submit returned EBUSY trying to submit in addition to " << reqs_in_uring - << std::endl; - logI(sst.str()); - } else { - logI("io_submit returned EBUSY"); + // Actually I don't know how "unlikely" this is + if (unlikely(submitted < 0)) { + if (unlikely(submitted != -EBUSY)) { + errno = -submitted; + DIE_MSG_ERRNO("io_uring_submit failed"); } + // We can't submit our sqe because the CQ is full + // in worker thread -> reapCompletions + // in globalCompleter thread -> TODO: deterministically handle global full CQ + // for now hope a jam does not happen or will solve itself + LOGGER_LOGI("io_submit returned EBUSY trying to submit " + << io_uring_sq_ready(&ring) << " in addition to " << reqs_in_uring); + if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { // we are not busy looping in the globalIo stats.record_io_submit_full_cq(); - DIE_MSG("Future" << future << " dropped because global completer SQ is full"); + DIE_MSG("Future dropped because global completer SQ is full"); } TIME_NS( @@ -117,23 +117,43 @@ void IoContext::submit(Future &future) { stats.record_io_submit_full_cq); } - if (unlikely(submitted < 0)) { - errno = -submitted; - DIE_MSG_ERRNO("io_uring_submit failed"); - } + return static_cast<unsigned>(submitted); +} + +template <CallerEnvironment callerEnvironment> +void IoContext::submit(Future &future) { + LOGD("submitting " << future << (future.dependency ? 
" and it's dependencies" : "")); + unsigned prepared = prepareFutureChain(future, 1); + + // submit the Future to the io_uring + int submitted = submitPreparedSqes<callerEnvironment>(); // We submitted some Futures to the io_uring // Because we submit every Future right away multiple prepared sqes can only // occur for Future chains. // If we could not submit the whole chain cancel all non submitted Futures - // because we can not guaranty the soundness of the chain + // because we can not guaranty the soundness of the chain and invalidate their + // prepared sqe // req1 -> invalid_req -> req3 - // will submit only 2 instead of all 3 prepared sqes + // will submit only 2 instead of all 3 prepared sqes leaving a sqe for req3 + // in the SQ but unsubmitted // See: https://github.com/axboe/liburing/issues/186 + // https://github.com/axboe/liburing/issues/92 if (unlikely(static_cast<unsigned>(submitted) < prepared)) { unsigned unsubmitted = io_uring_sq_ready(&ring); Future *unsubmittedFuture = &future; - while (unsubmitted) { + + for (unsigned i = 0; i < unsubmitted; ++i) { + // we prepare sqes for future from the chain end to the head + // A -> B -> C will result in CQ [C, B, A] and preparedSqes [C, B, A] + // invalidate/cancel N sqes/Futures from the back + struct io_uring_sqe *sqe = preparedSqes.back(); + preparedSqes.pop_back(); + // invalidate the prepared sqe + io_uring_prep_nop(sqe); + io_uring_sqe_set_data(sqe, nullptr); + + // cancel the according future unsubmittedFuture->recordCompletion(stats, -ECANCELED); if constexpr (callerEnvironment == EMPER) { unsubmittedFuture->complete(-ECANCELED); @@ -141,10 +161,19 @@ void IoContext::submit(Future &future) { unsubmittedFuture->completeFromAnywhere(-ECANCELED); } unsubmittedFuture = future.dependency; - unsubmitted--; } + + // submit all now invalidated sqes +#ifdef NDEBUG + ATTR_UNUSED +#endif + unsigned invalidated = submitPreparedSqes<callerEnvironment>(); + assert(invalidated == unsubmitted); } + // we have 
submitted all our prepared sqes + preparedSqes.clear(); + // io_uring will try to synchronously complete any IO request before // offloading it to the async backend. See io_uring_enter(2). // Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe. @@ -359,6 +388,9 @@ IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) } } + // reserve space for a full SQ + preparedSqes.reserve(*this->ring.sq.kring_entries); + // This eventfd will be registered to retrieve completion notifications when the Runtime // calls registerWorkerIo() on the globalIo // Or it will be used to terminate the globalIo diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 4f3d65789e36560f21697c33e69e1ffe41690a94..2f34771e2156554c583b92d6c7090de8bf1b66e1 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -72,6 +72,19 @@ class IoContext : public Logger<LogSubsystem::IO> { // Members useful for debugging std::atomic<long> reqs_in_uring = 0; + // We need to keep track of prepared sqes to invalidate them on short submission + // If a user wants to submit a chain of 5 Futures and the third is invalid + // io_uring_submit will only submit 3 of 5 prepared sqes leaving the 2 unsubmitted in + // the SQ. + // Those unsubmitted sqes would be submitted on the next call of io_uring_submit + // possibly breaking the guarantees of a chain (cancel all dependent operations). + // Furthermore having unsubmitted sqes in the SQ prevents SQ-sized chains from being + // prepared. 
+ // To prevent all those problems we keep track of the prepared sqes, "invalidate" + // (prepare as NOP with a nullptr as user_data) and submit all unsubmitted sqes, + // and cancel their futures + std::vector<struct io_uring_sqe *> preparedSqes; + // set containing all Futures currently in the io_uring for debugging emper::lib::adt::LockedSet<Future *> uringFutureSet; @@ -87,6 +100,14 @@ class IoContext : public Logger<LogSubsystem::IO> { */ auto prepareFutureChain(Future &future, unsigned chain_length) -> unsigned; + /** + * @brief submit prepared sqes possibly reaping completions if CQ is full + * + * @return the number of submitted Futures + */ + template <CallerEnvironment callerEnvironment> + auto submitPreparedSqes() -> unsigned; + inline void setWorkerIo() { workerIo = this; } public: diff --git a/tests/LinkFutureTest.cpp b/tests/LinkFutureTest.cpp index 9a05b0368d8b93f22c982d21b703e5f33ada6fe9..b9859ec5c4bc132641058518cca02e62b1633e91 100644 --- a/tests/LinkFutureTest.cpp +++ b/tests/LinkFutureTest.cpp @@ -116,22 +116,16 @@ static void failureChainCorInvCor() { res = correctFuture1.wait(); assert(res == (int32_t)buf.size()); - emper::io::close(fd); + emper::io::closeAndWait(fd); } void emperTest() { - // This leaves the io_uring in a weird state because the successChain() afterwards - // fails because the write future is already completed with -ECANCELED but - // there is successful cqe in the CQ for this future + // Test if an invalid chain leaves the IoContext in an unexpected / invalid state failureChainCorInvCor(); - successChain(); successLoop(); failureChainInvCor(); - - // failureChainInvCor left the io_uring in a weird state because - // the io_uring_submit of the openAndWait() never returns. - // failureChainCorInvCor(); + failureChainCorInvCor(); exit(EXIT_SUCCESS); }