diff --git a/emper/io/Future.cpp b/emper/io/Future.cpp index 329386c9781a3454dec9030e9b8651205c30a6d1..6955fbd55b58ac07fbb9fdfe7f82d2341258309a 100644 --- a/emper/io/Future.cpp +++ b/emper/io/Future.cpp @@ -66,6 +66,11 @@ auto Future::waitAndSetErrno() -> int32_t { void Future::submit() { emper::assertInRuntime(); assert(!isSubmitted()); + + if (callback) { + throw FutureError("Futures with registered callbacks must be submitted through their handle"); + } + IoContext* io = IoContext::getWorkerIo(); LOGD("submit " << this << " to IoContext " << io); @@ -74,6 +79,10 @@ void Future::submit() { } auto Future::cancel() -> int32_t { + if (callback) { + throw FutureError("Futures with registered callbacks must be canceled through their handle"); + } + LOGD("Cancelling " << this); markCancelled(); @@ -105,8 +114,6 @@ auto Future::cancel() -> int32_t { cancellation.submit(); } const int32_t cancelResult = cancellation.wait(); - - if (callback) returnValue = cancelResult; } // We have to ensure that the Future can be dropped when we return. @@ -122,6 +129,17 @@ auto Future::cancel() -> int32_t { return _wait(); } +void Future::CallbackHandle::cancel() { + LOGD("Cancelling " << this); + CancelWrapper cancellation(*this); + if (!emper::IO_SINGLE_URING && submitter != IoContext::workerIo) { + submitter->runtime.scheduleOn(*Fiber::from([&cancellation] { cancellation.submit(); }), + submitter->worker->getWorkerId()); + } else { + cancellation.submit(); + } +} + /** * Internal hiding functions for Stats::recordCompletion double dispatch * @@ -176,11 +194,21 @@ normal_submit: } void CancelWrapper::prepareSqeInternal(struct io_uring_sqe* sqe) { - auto& future = *reinterpret_cast<Future*>(buf); - if constexpr (!emper::IO_SINGLE_URING) { - assert(future.submitter == IoContext::workerIo); + void* user_data; + if (cancelCallback) { + auto& handle = *reinterpret_cast<CallbackHandle*>(buf); + if constexpr (!emper::IO_SINGLE_URING) { + assert(handle.submitter == IoContext::workerIo); + } + user_data = IoContext::createCallbackTag(handle); + } else { + auto& future = *reinterpret_cast<Future*>(buf); + if constexpr (!emper::IO_SINGLE_URING) { + assert(future.submitter == IoContext::workerIo); + } + user_data = IoContext::createFutureTag(future); } - void* user_data = IoContext::createFutureTag(future); + io_uring_prep_cancel(sqe, user_data, 0); } } // namespace emper::io diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp index dd65483f066500be3b5d34ef22aa84658021a30d..749e59a4856269269f1d755c0e8c8bdc13120f3c 100644 --- a/emper/io/Future.hpp +++ b/emper/io/Future.hpp @@ -376,19 +376,6 @@ class Future : public Logger<LogSubsystem::IO> { this->dependency = &dependency; } - /* - * @brief register a callback which is executed in a new Fiber on completion - * - * @param callback Callback reference which is copied and executed on completion. - * It gets passed the value causing the completion. - */ - inline void setCallback(const Callback& callback) { - if (unlikely(this->callback)) { - delete this->callback; - } - this->callback = new CallbackInternal(callback); - } - /* * @brief submit Future for asynchronous completion to an IoContext */ @@ -405,6 +392,44 @@ class Future : public Logger<LogSubsystem::IO> { */ auto cancel() -> int32_t; + /* + * @brief user facing Future::Callback handle + * + * The handle is used to submit and cancel io operations using callbacks. + * + */ + template <class FutureType> + class CallbackHandle { + friend class Future; + friend class IoContext; + + void* callback; + FutureType& future; + IoContext* submitter = nullptr; + + CallbackHandle(void* callback, Future& future) : callback(callback), future(future) {} + + public: + void submit() { submitter = IoContext::WorkerIo; future.submit(); } + void cancel(); + }; + + /* + * @brief register a callback which is executed in a new Fiber on completion + * + * @param callback Callback reference which is copied and executed on completion. + * It gets passed the value causing the completion. + * + * @return the handle + */ + inline auto setCallback(const Callback& callback) -> CallbackHandle<decltype(*this)> { + if (unlikely(this->callback)) { + delete this->callback; + } + this->callback = new CallbackInternal(callback); + return CallbackHandle<decltype(*this)>(this->callback, *this); + } + /** * @brief Block till the IO request is completed * @@ -829,10 +854,13 @@ class AlarmFuture : public Future { * If the target was already started wait() will return -EALREADY. */ class CancelWrapper : public Future { + bool cancelCallback = false; void prepareSqeInternal(struct io_uring_sqe* sqe) override; public: CancelWrapper(Future& future) : Future(Operation::CANCEL, 0, &future, 0, 0){}; + CancelWrapper(CallbackHandle& handle) + : Future(Operation::CANCEL, 0, &handle, 0, 0), cancelCallback(true){}; }; /* diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 67ee0e998e280a623ca8945f12c76e58ee299cd2..7232bad16589868574dfe8f2b45712c70efee827 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -43,6 +43,7 @@ class IoContext : public Logger<LogSubsystem::IO> { friend class ::AbstractWorkStealingScheduler; friend class SubmitActor; friend class Future; + friend class Future::CallbackHandle; friend class SendFuture; friend class RecvFuture; friend class CancelWrapper; @@ -147,7 +148,12 @@ class IoContext : public Logger<LogSubsystem::IO> { enum class PointerTags : uint16_t { Future, Callback, NewWorkNotification }; - [[nodiscard]] static auto createFutureTag(Future &future) -> emper::lib::TaggedPtr { + [[nodiscard]] static auto createCallbackTag(const Future::CallbackHandle &handle) + -> emper::lib::TaggedPtr { + return {handle.callback, static_cast<uint16_t>(PointerTags::Callback)}; + } + + [[nodiscard]] static auto createFutureTag(const Future &future) -> emper::lib::TaggedPtr { if (future.callback) return {future.callback, static_cast<uint16_t>(PointerTags::Callback)}; if (!future.isForgotten()) return {&future, static_cast<uint16_t>(PointerTags::Future)};