diff --git a/emper/io/Future.cpp b/emper/io/Future.cpp index 5d955fd00cfa20ee9bbf4659a5f3c5892bd9ddf0..f07b3849416378c13e7192fe6f28dcb364ae01d2 100644 --- a/emper/io/Future.cpp +++ b/emper/io/Future.cpp @@ -9,8 +9,10 @@ #include "BinaryPrivateSemaphore.hpp" // for BPS #include "CallerEnvironment.hpp" // for CallerEnvironment #include "Debug.hpp" // for LOGD, LOGW -#include "io/IoContext.hpp" // for IoContext -#include "io/Stats.hpp" // for Stats, Operation +#include "Worker.hpp" +#include "io/IoContext.hpp" +#include "io/Stats.hpp" +#include "lib/TaggedPtr.hpp" namespace emper::io { @@ -40,17 +42,13 @@ PartialCompletableFuture::tryComplete<CallerEnvironment::EMPER>(int32_t res); template PartialCompletableFuture::CompletionType PartialCompletableFuture::tryComplete<CallerEnvironment::ANYWHERE>(int32_t res); -auto Future::wait() -> int32_t { - if (unlikely(callback)) { - throw FutureError("Futures with registered callback must not be awaited"); - } - +auto Future::_wait() -> int32_t { LOGD("Waiting on " << this); sem.wait(); if constexpr (emper::DEBUG) { - state.retrieved = true; + markRetrieved(); } return returnValue; @@ -78,12 +76,13 @@ void Future::submit() { } auto Future::cancel() -> int32_t { + LOGD("Cancelling " << this); + markCancelled(); + if (dependency) { dependency->cancel(); } - state.cancelled = true; - // With the separation of isSubmitted and isPrepared dependent Futures // are not marked as submitted anymore. // This means we have to check if we are canceling a not submitted Future @@ -97,9 +96,25 @@ auto Future::cancel() -> int32_t { return returnValue; } - CancelWrapper cancellation(*this); - cancellation.submitAndWait(); - return wait(); + // Only try to cancel Futures which have actually reached the io_uring + if (isPrepared()) { + CancelWrapper cancellation(*this); + const int32_t cancelResult = cancellation.submitAndWait(); + + if (callback) returnValue = cancelResult; + } + + // We have to ensure that the Future can be dropped when we return. + // This is possible when it gets completed or if we are sure it will no longer + // be use in the IO-subsystem (SubmitActor). + // Both can be achieved by awaiting it. + // The former is trivial. + // The latter is guarantied for Futures that can not be completed + // (forgotten, callback ones) because we signal their semaphore during + // preparation and after that they will no longer be used by EMPER. + // The kernel though may still use resources referenced by the Future such as + // a buffer or file descriptor. + return _wait(); } /** @@ -127,7 +142,7 @@ void SendFuture::submit() { ssize_t res = send(fd, buf, len, flags | MSG_DONTWAIT); if (res != -1 || (errno != EAGAIN && errno != EWOULDBLOCK)) { - state.submitted = true; + markSubmitted(); int ires = res == -1 ? -errno : static_cast<int>(res); complete(ires); return; @@ -144,7 +159,7 @@ void RecvFuture::submit() { ssize_t res = recv(fd, buf, len, flags | MSG_DONTWAIT); if (res != -1 || (errno != EAGAIN && errno != EWOULDBLOCK)) { - state.submitted = true; + markSubmitted(); int ires = res == -1 ? -errno : static_cast<int>(res); complete(ires); return; @@ -154,4 +169,9 @@ void RecvFuture::submit() { normal_submit: Future::submit(); } + +void CancelWrapper::prepareSqeInternal(struct io_uring_sqe* sqe) { + void* user_data = IoContext::createFutureTag(*reinterpret_cast<Future*>(buf)); + io_uring_prep_cancel(sqe, user_data, 0); +} } // namespace emper::io diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp index 9dfd0200af830e8211b4cf1d830849c45cfc9b14..5f7f0a9bb940813100877713ee8c611088830154 100644 --- a/emper/io/Future.hpp +++ b/emper/io/Future.hpp @@ -6,6 +6,7 @@ #include <sys/socket.h> // for socklen_t #include <sys/types.h> +#include <atomic> #include <cassert> #include <cstddef> // for size_t #include <cstdint> // for int32_t, uint8_t @@ -25,7 +26,6 @@ #include "emper-common.h" #include "io/Operation.hpp" // for Operation, operator<<, Operati... -struct io_uring_sqe; class Fiber; namespace emper::io { @@ -63,27 +63,68 @@ class Future : public Logger<LogSubsystem::IO> { void operator()(const int32_t& res) { callback(res); } }; - using State = struct State { - uint8_t submitted : 1 = 0; /*!< This Future was submitted to the IO subsystem */ - uint8_t prepared : 1 = 0; /*!< An sqe for this Future was prepared */ - uint8_t completed : 1 = 0; /*!< The semaphor was signaled */ - uint8_t retrieved : 1 = 0; /*!< Future::wait() has retrieved the result */ - uint8_t dependency : 1 = 0; /*!< This Future is a dependency of another */ - uint8_t cancelled : 1 = 0; /*!< Future::cancel() was called */ - uint8_t forgotten : 1 = - 0; /*!< This Future does not care about the result of the IO operation */ - }; - /* * @brief Semaphore enabling Fibers to wait for the completion of this Future * * The semaphore is either used to block contexts waiting on the future - * or used to ensure that the future is not forgotten and thus possibly deleted - * until it was handled by the submit actor (only in the IO_SINGLE_URING case). + * or used to ensure that the future is not dropped and thus possibly deleted + * until it was prepared by the submit actor (only in the IO_SINGLE_URING case). */ BPS sem; - State state; + using State = struct State { + enum _State { + Submitted = 1 << 0, /*!< This Future was submitted to the IO subsystem */ + Prepared = 1 << 1, /*!< An sqe for this Future was prepared */ + Completed = 1 << 2, /*!< The semaphor was signaled */ + Retrieved = 1 << 3, /*!< Future::wait() has retrieved the result */ + Dependency = 1 << 4, /*!< This Future is a dependency of another */ + Cancelled = 1 << 5, /*!< Future::cancel() was called */ + Forgotten = 1 << 6, /*!< This Future will not be awaited */ + }; + }; + + /** + * @brief State of the Future object + * + * This state member is used to track the state of the future and determines + * our use of it. + * Most of the time the state is not use concurrently but some data races exist: + * - cancel() & complete(): They race for state.completed when complete() may + * run in a different thread than the call to submit(). + * This can happen when a completer is present, or we use a + * single io_uring or IO-stealing is enabled. + * This race is **NOT HARMFULL** since when cancel + * wins the race the only thing it does is calling wait(). + * + * - cancel() & prepare(): They race for state.prepared when prepareSqe() may + * run in a different thread than cancel(). + * This can happen when we use a single io_uring. + * The race is **NOT HARMFULL** we always await the + * future's semaphore in cancel. If the Future is + * not completeable the semaphore will be signaled after + * preparation otherwise when completed. + */ + uint8_t state = 0; + + /** + * @brief atomic helper to set a bit in the state in the face of data races + */ + void casIntoState(uint8_t bit) { + auto* astate = reinterpret_cast<std::atomic<uint8_t>*>(&state); + uint8_t oldState = astate->load(std::memory_order_relaxed); + do { + } while (!astate->compare_exchange_weak(oldState, oldState | bit, std::memory_order_release, + std::memory_order_relaxed)); + } + + /** + * @brief atomic helper to acquire the state + */ + auto acquireState() const -> uint8_t { + auto* astate = reinterpret_cast<const std::atomic<uint8_t>*>(&state); + return astate->load(std::memory_order_acquire); + } /* IO operation to perform */ const Operation op; @@ -120,17 +161,22 @@ class Future : public Logger<LogSubsystem::IO> { */ CallbackInternal* callback = nullptr; - virtual void prepareSqe(io_uring_sqe* sqe) = 0; + void prepareSqe(struct io_uring_sqe* sqe) { + markPrepared(); + prepareSqeInternal(sqe); + } + + virtual void prepareSqeInternal(struct io_uring_sqe* sqe) = 0; void submitAndForget() { - state.forgotten = true; + markForgotten(); submit(); // When using a SubmitActor a Future may be submitted to the SubmitActor but // no SQE was prepared for it yet. // It must be ensured that the Future is prepared before forgetting and possibly dropping it. // We await its preperation using the nomal BPS which is not used otherwise for forgotten // Futures. - // The BPS of forgotten Futures is signaled in IoContext::prepareFutureChainwhen + // The BPS of forgotten Futures is signaled in IoContext::prepareFutureChain // when using a single io_uring if constexpr (emper::IO_SINGLE_URING) { sem.wait(); @@ -138,10 +184,10 @@ class Future : public Logger<LogSubsystem::IO> { } void setCompletion(int32_t res) { - assert(!state.completed); + assert(!isCompleted()); LOGD("Complete " << this << " with result " << res); returnValue = res; - state.completed = true; + markCompleted(); } virtual void complete(int32_t res) { @@ -170,6 +216,17 @@ class Future : public Logger<LogSubsystem::IO> { void recordCompletionInternal(Stats& stats, int32_t res) const; + /** + * @brief Block till the IO request is completed without sanity checks + * + * @return return the result received from the io_uring + * + * It is guarantied that after wait() returned the memory of a Future and + * all necessary resources for the operation are no longer used and + * referenced by EMPER and thus can be freed. + */ + auto _wait() -> int32_t; + Future(Operation op, int fd, void* buf, size_t len, off_t offset) : sem(emper::BlockablePurpose::IO), op(op), fd(fd), buf(buf), len(len), offset(offset){}; @@ -183,29 +240,79 @@ class Future : public Logger<LogSubsystem::IO> { // only if callback is set and if callback is set ~Future does not call cancel. // NOLINTNEXTLINE(bugprone-exception-escape) virtual ~Future() { - if (isForgotten() || isRetrieved() || isCancelled() || callback) { + if (isForgotten() || isRetrieved() || isCancelled()) { return; } + // If this Future uses a callback we have to ensure that the future is + // prepared before dropping it. + if (callback && isPrepared()) return; + if constexpr (emper::DEBUG) { - if (!isDependency()) { + if (!isDependency() && !callback) { LOGE(this << " created but never awaited"); abort(); } } - // cancel this Future to guaranty memory safety + // Cancel this Future to guaranty memory safety. // By canceling we assure that the this pointer and the contained buffer - // passed to io_uring is not used any more by the kernel or the IoContext + // passed to io_uring is not used any more by the kernel or the IoContext. cancel(); } - [[nodiscard]] auto isRetrieved() const -> bool { return state.retrieved; } - [[nodiscard]] auto isSubmitted() const -> bool { return state.submitted; } - [[nodiscard]] auto isCompleted() const -> bool { return state.completed; } - [[nodiscard]] auto isCancelled() const -> bool { return state.cancelled; } - [[nodiscard]] auto isDependency() const -> bool { return state.dependency; } - [[nodiscard]] auto isForgotten() const -> bool { return state.forgotten; } + // State setters + // Those are called during the life of a Future and may introduce data races. + void markSubmitted() { state |= State::Submitted; } + void markPrepared() { + if constexpr (emper::IO_SINGLE_URING) { + casIntoState(State::Prepared); + } else { + state |= State::Prepared; + } + } + void markCompleted() { state |= State::Completed; } + void markRetrieved() { state |= State::Retrieved; } + void markCancelled() { + if constexpr (emper::IO_SINGLE_URING) { + casIntoState(State::Cancelled); + } else { + state |= State::Cancelled; + } + } + + // Those are state information which can not change after submitting a Future. + void markAsDependency() { + assert(!isSubmitted()); + state |= State::Dependency; + } + void markForgotten() { + assert(!isSubmitted()); + state |= State::Forgotten; + } + + // State getters + [[nodiscard]] auto isSubmitted() const -> bool { return state & State::Submitted; } + + [[nodiscard]] auto isPrepared() const -> bool { + if constexpr (emper::IO_SINGLE_URING) { + return acquireState() & State::Prepared; + } + return state & State::Prepared; + } + + [[nodiscard]] auto isCompleted() const -> bool { return state & State::Completed; } + [[nodiscard]] auto isRetrieved() const -> bool { return state & State::Retrieved; } + [[nodiscard]] auto isDependency() const -> bool { return state & State::Dependency; } + + [[nodiscard]] auto isCancelled() const -> bool { + if constexpr (emper::IO_SINGLE_URING) { + return acquireState() & State::Cancelled; + } + return state & State::Cancelled; + } + + [[nodiscard]] auto isForgotten() const -> bool { return state & State::Forgotten; } /* * @brief reset the Future @@ -218,7 +325,7 @@ class Future : public Logger<LogSubsystem::IO> { */ virtual inline void reset() { LOGD("Resetting Future"); - state = State(); + state = 0; sem.reset(); } @@ -260,7 +367,7 @@ class Future : public Logger<LogSubsystem::IO> { * */ inline void setDependency(Future& dependency) { - dependency.state.dependency = true; + dependency.markAsDependency(); this->dependency = &dependency; } @@ -278,7 +385,7 @@ class Future : public Logger<LogSubsystem::IO> { } /* - * @brief submit Future for asynchronous completion to the workers IoContext + * @brief submit Future for asynchronous completion to an IoContext */ virtual void submit(); @@ -286,6 +393,10 @@ class Future : public Logger<LogSubsystem::IO> { * @brief cancel Future and wait if necessary for its cancellation * * @return return the result received from the io_uring + * + * It is guarantied that after cancel() returned the memory of a Future and + * all necessary resources for the operation are no longer used and + * referenced by EMPER and thus can be freed. */ auto cancel() -> int32_t; @@ -293,8 +404,18 @@ class Future : public Logger<LogSubsystem::IO> { * @brief Block till the IO request is completed * * @return return the result received from the io_uring + * + * It is guarantied that after wait() returned the memory of a Future and + * all necessary resources for the operation are no longer used and + * referenced by EMPER and thus can be freed. */ - auto wait() -> int32_t; + auto wait() -> int32_t { + if (unlikely(callback)) { + throw FutureError("Futures with registered callback must not be awaited"); + } + + return _wait(); + } /** * @brief Block till the IO request is completed and set errno on error @@ -368,6 +489,7 @@ class PartialCompletableFuture : public Future { bool completeFully) : Future(op, fd, buf, len, offset), partialCompletion(completeFully ? ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){}; + PartialCompletableFuture(Operation op, int fd, void* buf, size_t len, int flags, bool completeFully) : Future(op, fd, buf, len, flags), @@ -376,6 +498,7 @@ class PartialCompletableFuture : public Future { PartialCompletableFuture(Operation op, int fd, void* buf, size_t len, off_t offset, int32_t partialCompletion) : Future(op, fd, buf, len, offset), partialCompletion(partialCompletion){}; + PartialCompletableFuture(Operation op, int fd, void* buf, size_t len, int flags, int32_t partialCompletion) : Future(op, fd, buf, len, flags), partialCompletion(partialCompletion){}; @@ -481,7 +604,7 @@ class PartialCompletableFuture : public Future { }; class SendFuture : public PartialCompletableFuture { - void prepareSqe(io_uring_sqe* sqe) override { + void prepareSqeInternal(struct io_uring_sqe* sqe) override { if (partialCompletion == DISABLE_PARTIAL_COMPLETION) { io_uring_prep_send(sqe, fd, buf, len, flags); } else { @@ -502,7 +625,9 @@ class SendFuture : public PartialCompletableFuture { }; class RecvFuture : public Future { - void prepareSqe(io_uring_sqe* sqe) override { io_uring_prep_recv(sqe, fd, buf, len, flags); } + void prepareSqeInternal(struct io_uring_sqe* sqe) override { + io_uring_prep_recv(sqe, fd, buf, len, flags); + } public: RecvFuture(int socket, void* buffer, size_t length, int flags) @@ -512,7 +637,7 @@ class RecvFuture : public Future { }; class ConnectFuture : public Future { - void prepareSqe(io_uring_sqe* sqe) override { + void prepareSqeInternal(struct io_uring_sqe* sqe) override { io_uring_prep_connect(sqe, fd, (const struct sockaddr*)buf, (socklen_t)len); } @@ -522,7 +647,7 @@ class ConnectFuture : public Future { }; class AcceptFuture : public Future { - void prepareSqe(io_uring_sqe* sqe) override { + void prepareSqeInternal(struct io_uring_sqe* sqe) override { // NOLINTNEXTLINE(performance-no-int-to-ptr) io_uring_prep_accept(sqe, fd, (struct sockaddr*)buf, (socklen_t*)len, 0); } @@ -533,7 +658,7 @@ class AcceptFuture : public Future { }; class ReadFuture : public PartialCompletableFuture { - void prepareSqe(io_uring_sqe* sqe) override { + void prepareSqeInternal(struct io_uring_sqe* sqe) override { if (partialCompletion == DISABLE_PARTIAL_COMPLETION) { io_uring_prep_read(sqe, fd, buf, len, offset); } else { @@ -550,7 +675,7 @@ class ReadFuture : public PartialCompletableFuture { }; class OpenatFuture : public Future { - void prepareSqe(io_uring_sqe* sqe) override { + void prepareSqeInternal(struct io_uring_sqe* sqe) override { io_uring_prep_openat(sqe, fd, reinterpret_cast<const char*>(buf), static_cast<int>(len), static_cast<mode_t>(offset)); } @@ -562,7 +687,7 @@ class OpenatFuture : public Future { }; class WriteFuture : public PartialCompletableFuture { - void prepareSqe(io_uring_sqe* sqe) override { + void prepareSqeInternal(struct io_uring_sqe* sqe) override { if (partialCompletion == DISABLE_PARTIAL_COMPLETION) { io_uring_prep_write(sqe, fd, buf, len, offset); } else { @@ -581,7 +706,7 @@ class WriteFuture : public PartialCompletableFuture { }; class WritevFuture : public Future { - void prepareSqe(io_uring_sqe* sqe) override { + void prepareSqeInternal(struct io_uring_sqe* sqe) override { io_uring_prep_writev(sqe, fd, (const struct iovec*)buf, len, 0); } @@ -602,7 +727,7 @@ class WritevFuture : public Future { }; class CloseFuture : public Future { - void prepareSqe(io_uring_sqe* sqe) override { io_uring_prep_close(sqe, fd); } + void prepareSqeInternal(struct io_uring_sqe* sqe) override { io_uring_prep_close(sqe, fd); } public: CloseFuture(int fildes) : Future(Operation::CLOSE, fildes, nullptr, 0, 0){}; @@ -611,7 +736,9 @@ class CloseFuture : public Future { }; class ShutdownFuture : public Future { - void prepareSqe(io_uring_sqe* sqe) override { io_uring_prep_shutdown(sqe, fd, flags); } + void prepareSqeInternal(struct io_uring_sqe* sqe) override { + io_uring_prep_shutdown(sqe, fd, flags); + } public: ShutdownFuture(int sockfd, int how) : Future(Operation::SHUTDOWN, sockfd, nullptr, 0, how){}; @@ -647,7 +774,7 @@ class TimeoutWrapper : public Future { }; private: - void prepareSqe(io_uring_sqe* sqe) override { + void prepareSqeInternal(struct io_uring_sqe* sqe) override { io_uring_prep_link_timeout(sqe, (Timespec*)buf, 0); } }; @@ -664,7 +791,9 @@ class AlarmFuture : public Future { AlarmFuture(Timespec& ts) : Future(Operation::TIMEOUT, 0, (void*)&ts, 0, 0){}; private: - void prepareSqe(io_uring_sqe* sqe) override { io_uring_prep_timeout(sqe, (Timespec*)buf, 0, 0); } + void prepareSqeInternal(struct io_uring_sqe* sqe) override { + io_uring_prep_timeout(sqe, (Timespec*)buf, 0, 0); + } }; /* @@ -675,7 +804,7 @@ class AlarmFuture : public Future { * If the target was already started wait() will return -EALREADY. */ class CancelWrapper : public Future { - void prepareSqe(io_uring_sqe* sqe) override { io_uring_prep_cancel(sqe, buf, 0); } + void prepareSqeInternal(struct io_uring_sqe* sqe) override; public: CancelWrapper(Future& future) : Future(Operation::CANCEL, 0, &future, 0, 0){}; @@ -685,7 +814,7 @@ class CancelWrapper : public Future { * @brief Request a madvise operation */ class MadviseFuture : public Future { - void prepareSqe(io_uring_sqe* sqe) override { + void prepareSqeInternal(struct io_uring_sqe* sqe) override { // TODO: Check that len is within the bounds of off_t. auto len_off_t = static_cast<off_t>(len); io_uring_prep_madvise(sqe, buf, len_off_t, flags); diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 7b59dd2b0069727b9f9ce8d88016f00d776a04ac..850a00bda014f0629cad2c5125c5faf18a6a2d12 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -25,15 +25,13 @@ #include "Emper.hpp" // for DEBUG, IO_URING_SQPOLL #include "Runtime.hpp" #include "emper-common.h" -#include "io/Future.hpp" // for Future, operator<<, Future::State +#include "io/Future.hpp" #include "io/GlobalIoContext.hpp" #include "io/Stats.hpp" // for Stats, nanoseconds #include "io/SubmitActor.hpp" #include "lib/TaggedPtr.hpp" -#include "sleep_strategy/PipeSleepStrategy.hpp" // IWYU pragma: keep using emper::lib::TaggedPtr; -using emper::sleep_strategy::PipeSleepStrategy; namespace emper::io { @@ -62,31 +60,34 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns // remember the prepared sqe to invalidate it if it was not properly submitted preparedSqes.push_back(sqe); - // we should start a new Fiber executing callback on completion + void *user_data = createFutureTag(future); + + // We should start a new Fiber executing callback on completion if (future.callback) { LOGD("prepare " << future << " Callback " << future.callback); // set the Callback's affinity to the submitting worker if (worker) future.callback->affinity = worker->getWorkerId(); - io_uring_sqe_set_data(sqe, - TaggedPtr(future.callback, static_cast<uint16_t>(PointerTags::Callback))); - - // Someone wants to be notified about the completion of this Future - } else if (!future.isForgotten()) { - io_uring_sqe_set_data(sqe, TaggedPtr(&future, static_cast<uint16_t>(PointerTags::Future))); } - future.state.prepared = true; + // Appropriately tag the request so that it can be canceled and we can + // handle its completion. + io_uring_sqe_set_data(sqe, user_data); - // If we use a single io_uring and a SubmitActor forgotten futures may be - // dropped before preparing a sqe. - // This is prevented by waiting on their BPS in Future::submitAndForget. - // This is save because the BPS is guarantied to be unused for forgotten Futures. + // If we use a single io_uring and a SubmitActor the submission and preparation + // of futures are decoupled. + // The Future may be used in a Fiber concurrently to the SubmitActor submitting it + // to the single io_uring. + // We must ensure that the Future is not dropped before it is prepared. + // + // Future::submitAndForget does this by waiting on the BPS of the Future + // which will not be signaled on completion because the Future is forgotten. // We must signal the preparation of the forgotten future and allow its possible // destruction. - if constexpr (emper::IO_SINGLE_URING) { - if (future.isForgotten()) { - future.sem.signal(); - } + // + // And the preparation of Futures with callbacks is now awaited during cancel + // which runs in ~Future which ensures that they can be dropped safely. + if (future.isForgotten() || future.callback) { + future.sem.signal(); } if (future.isDependency()) { @@ -386,8 +387,9 @@ reap_cqes: // Coudn't it be possible to have a lost wakeup with unconsumed new work notification // cqe in our CQ // - // State only a single worker does work involving IO and another (completer, io-stealing - // worker accesses its CQ. + // State: + // Only a single worker does work involving IO and + // another (completer, io-stealing worker accesses its CQ. // Other Owner // | submit IO diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 0a35a5208b6332f2146838199c854f28cf2c5e80..4911503caaeb8b38971cc5c5765a3f44235cda73 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -45,6 +45,7 @@ class IoContext : public Logger<LogSubsystem::IO> { friend class Future; friend class SendFuture; friend class RecvFuture; + friend class CancelWrapper; // We need this friend declaration to access protected members through // IoContext pointer in GlobalIoContext::globalCompleterFunc friend class GlobalIoContext; @@ -153,6 +154,14 @@ class IoContext : public Logger<LogSubsystem::IO> { enum class PointerTags : uint16_t { Future, Callback, NewWorkAq, NewWorkWsq }; + [[nodiscard]] static auto createFutureTag(Future &future) -> emper::lib::TaggedPtr { + if (future.callback) return {future.callback, static_cast<uint16_t>(PointerTags::Callback)}; + + if (!future.isForgotten()) return {&future, static_cast<uint16_t>(PointerTags::Future)}; + + return {nullptr}; + } + /** * @brief submit prepared sqes possibly reaping completions if CQ is full * @@ -354,7 +363,7 @@ class IoContext : public Logger<LogSubsystem::IO> { void submit(Future &future) { // Mark the Future as submitted. We do this here because all Futures // must go through IoContext::submit even calling Future::submit - future.state.submitted = true; + future.markSubmitted(); if constexpr (emper::IO_SINGLE_URING) { submitter->tell<callerEnvironment>(&future); @@ -376,7 +385,7 @@ class IoContext : public Logger<LogSubsystem::IO> { auto submit(InputIt begin, InputIt end) -> unsigned { for (InputIt cur = begin; cur != end; ++cur) { auto *future = *cur; - future->state.submitted = true; + future->markSubmitted(); // This IoContext is Worker exclusive -> prepare the futures if constexpr (emper::IO_WORKER_URING) { diff --git a/emper/io/SubmitActor.cpp b/emper/io/SubmitActor.cpp index 2f17af42b568fd91b31269c3319f8c6242c7a61f..722ad23ea4d6594c6f5e79e681c8299ded502445 100644 --- a/emper/io/SubmitActor.cpp +++ b/emper/io/SubmitActor.cpp @@ -19,6 +19,7 @@ namespace emper::io { void SubmitActor::receive(Future* future) { + LOGD("SubmitActor received " << future); assert(future); assert(future->isSubmitted()); @@ -26,7 +27,8 @@ void SubmitActor::receive(Future* future) { // receives the future complete the chain with -ECANCELED and return if (future->isCancelled()) { // complete the chain - for (Future* f = future; f; f = future->dependency) { + for (Future* f = future; f; f = f->dependency) { + LOGD("SubmitActor cancel " << f); f->recordCompletion(io.stats, -ECANCELED); f->complete(-ECANCELED); } diff --git a/tests/io/CancelFutureTest.cpp b/tests/io/CancelFutureTest.cpp index afc4f4d4aba38d4b34d007f300e8d126283639f3..ca44dbc8d4c7e332f231bebacb6a97665601f702 100644 --- a/tests/io/CancelFutureTest.cpp +++ b/tests/io/CancelFutureTest.cpp @@ -6,6 +6,7 @@ #include <cstdint> // for uint64_t, int32_t #include "Common.hpp" +#include "Emper.hpp" #include "fixtures/assert.hpp" #include "io/Future.hpp" // for ReadFuture, WriteFuture @@ -16,6 +17,21 @@ int efd; uint64_t read_buf; uint64_t write_buf = 42; +void cancelSubmittedCallback() { + ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); + readFuture.setCallback([](int32_t res) { ASSERT(res == -ECANCELED) }); + readFuture.submit(); + int32_t res = readFuture.cancel(); + + if constexpr (emper::IO_SINGLE_URING) { + // When using a single io_uring it is possible the submitted Future was + // canceled before reaching the io_uring. + ASSERT(!res || res == -ECANCELED); + } else { + ASSERT(!res); + } +} + void cancelNotSubmitted() { ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); ASSERT(readFuture.cancel() == -ENOENT); @@ -75,6 +91,7 @@ void emperTest() { DIE_MSG_ERRNO("eventfd failed"); } + cancelSubmittedCallback(); cancelNotSubmitted(); cancelSubmittedNotCompleted(); cancelCompleted();