diff --git a/emper/io/CallbackHandle.cpp b/emper/io/CallbackHandle.cpp new file mode 100644 index 0000000000000000000000000000000000000000..465eba6d0ab7e507da1d9ffb99a2c6ea47b2e7d3 --- /dev/null +++ b/emper/io/CallbackHandle.cpp @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Fischer +#include "io/CallbackHandle.hpp" + +#include <cassert> + +#include "Emper.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" +#include "Worker.hpp" +#include "io/Future.hpp" +#include "io/IoContext.hpp" +#include "lib/TaggedPtr.hpp" + +namespace emper::io { + +auto CallbackHandle::cancel() -> int32_t { + assert(callback); + CancelWrapper cancellation(IoContext::createCallbackTag(*this)); + if (!emper::IO_SINGLE_URING && submitter != IoContext::workerIo) { + assert(submitter); + submitter->runtime.scheduleOn(*Fiber::from([&cancellation] { cancellation.submit(); }), + submitter->worker->getWorkerId()); + } else { + cancellation.submit(); + } + int32_t res = cancellation.wait(); + return res; +} +} // namespace emper::io diff --git a/emper/io/CallbackHandle.hpp b/emper/io/CallbackHandle.hpp new file mode 100644 index 0000000000000000000000000000000000000000..358033217ce05343ae00987ff9cfcb44f4241ede --- /dev/null +++ b/emper/io/CallbackHandle.hpp @@ -0,0 +1,21 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Fischer +#pragma once + +#include <stdint.h> + +namespace emper::io { +class IoContext; +class CallbackHandle { + friend class Future; + friend class IoContext; + + void* callback = nullptr; + IoContext* submitter = nullptr; + CallbackHandle(void* callback, IoContext* submitter) : callback(callback), submitter(submitter) {} + + public: + CallbackHandle() {} + auto cancel() -> int32_t; +}; +} // namespace emper::io diff --git a/emper/io/Future.cpp b/emper/io/Future.cpp index 329386c9781a3454dec9030e9b8651205c30a6d1..7cf65e6ae2c205b2c773deb1ccd741fe8ab559d9 100644 --- a/emper/io/Future.cpp +++ b/emper/io/Future.cpp @@ -176,11 +176,17 @@ normal_submit: } void CancelWrapper::prepareSqeInternal(struct io_uring_sqe* sqe) { - auto& future = *reinterpret_cast<Future*>(buf); - if constexpr (!emper::IO_SINGLE_URING) { - assert(future.submitter == IoContext::workerIo); + void* user_data; + if (!cancelRaw) { + auto& future = *reinterpret_cast<Future*>(buf); + if constexpr (!emper::IO_SINGLE_URING) { + assert(future.submitter == IoContext::workerIo); + } + user_data = IoContext::createFutureTag(future); + } else { + user_data = buf; } - void* user_data = IoContext::createFutureTag(future); + io_uring_prep_cancel(sqe, user_data, 0); } } // namespace emper::io diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp index dd65483f066500be3b5d34ef22aa84658021a30d..a2b0a9f3147a443b4b69ffb444601451a148c4eb 100644 --- a/emper/io/Future.hpp +++ b/emper/io/Future.hpp @@ -24,6 +24,7 @@ #include "Debug.hpp" // for LOGD, LogSubsystem, LogSubsyst... #include "Emper.hpp" // for DEBUG #include "emper-common.h" +#include "io/CallbackHandle.hpp" #include "io/Operation.hpp" // for Operation, operator<<, Operati... class Fiber; @@ -376,6 +377,22 @@ class Future : public Logger<LogSubsystem::IO> { this->dependency = &dependency; } + /* + * @brief submit Future for asynchronous completion to an IoContext + */ + virtual void submit(); + + /** + * @brief cancel Future and wait if necessary for its cancellation + * + * @return return the result received from the io_uring + * + * It is guarantied that after cancel() returned the memory of a Future and + * all necessary resources for the operation are no longer used and + * referenced by EMPER and thus can be freed. + */ + auto cancel() -> int32_t; + /* * @brief register a callback which is executed in a new Fiber on completion * @@ -390,20 +407,15 @@ class Future : public Logger<LogSubsystem::IO> { } /* - * @brief submit Future for asynchronous completion to an IoContext - */ - virtual void submit(); - - /** - * @brief cancel Future and wait if necessary for its cancellation + * @brief submit a future with set callback and get a CallbackHandle * - * @return return the result received from the io_uring - * - * It is guarantied that after cancel() returned the memory of a Future and - * all necessary resources for the operation are no longer used and - * referenced by EMPER and thus can be freed. + * @return the handle used to cancel the submitted Callback */ - auto cancel() -> int32_t; + inline auto submitAndGetCallbackHandle() -> CallbackHandle { + assert(this->callback); + submitAndForget(); + return {this->callback, this->submitter}; + } /** * @brief Block till the IO request is completed @@ -829,10 +841,12 @@ class AlarmFuture : public Future { * If the target was already started wait() will return -EALREADY. */ class CancelWrapper : public Future { + bool cancelRaw = false; void prepareSqeInternal(struct io_uring_sqe* sqe) override; public: CancelWrapper(Future& future) : Future(Operation::CANCEL, 0, &future, 0, 0){}; + CancelWrapper(void* user_data) : Future(Operation::CANCEL, 0, user_data, 0, 0), cancelRaw(true){}; }; /* diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 67ee0e998e280a623ca8945f12c76e58ee299cd2..b0303c65883fafb01c71ed3ed95f22164bc0a147 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -28,6 +28,7 @@ #include "StealingResult.hpp" #include "Worker.hpp" #include "emper-config.h" +#include "io/CallbackHandle.hpp" #include "io/Future.hpp" #include "io/Stats.hpp" #include "io/SubmitActor.hpp" // IWYU pragma: keep @@ -43,6 +44,7 @@ class IoContext : public Logger<LogSubsystem::IO> { friend class ::AbstractWorkStealingScheduler; friend class SubmitActor; friend class Future; + friend class CallbackHandle; friend class SendFuture; friend class RecvFuture; friend class CancelWrapper; @@ -147,7 +149,12 @@ class IoContext : public Logger<LogSubsystem::IO> { enum class PointerTags : uint16_t { Future, Callback, NewWorkNotification }; - [[nodiscard]] static auto createFutureTag(Future &future) -> emper::lib::TaggedPtr { + [[nodiscard]] static auto createCallbackTag(const CallbackHandle &handle) + -> emper::lib::TaggedPtr { + return {handle.callback, static_cast<uint16_t>(PointerTags::Callback)}; + } + + [[nodiscard]] static auto createFutureTag(const Future &future) -> emper::lib::TaggedPtr { if (future.callback) return {future.callback, static_cast<uint16_t>(PointerTags::Callback)}; if (!future.isForgotten()) return {&future, static_cast<uint16_t>(PointerTags::Future)}; diff --git a/emper/io/meson.build b/emper/io/meson.build index 2f5a5d4033b502ccfa6f770d813cb1d1b60fc709..27fae5e05fa58dbabe0a9b84aaeeb5037f6a514c 100644 --- a/emper/io/meson.build +++ b/emper/io/meson.build @@ -19,6 +19,7 @@ endif emper_io_include = include_directories('.') emper_library_include += [emper_io_include] emper_cpp_sources += files( + 'CallbackHandle.cpp', 'io.cpp', 'Future.cpp', 'Stats.cpp', diff --git a/tests/io/CancelFutureTest.cpp b/tests/io/CancelFutureTest.cpp index 46ac6eb0a682a0fa0592190f981e6ca120a868b9..9e2ace69bdae8184f71d6ec98d840ae18e8d7f97 100644 --- a/tests/io/CancelFutureTest.cpp +++ b/tests/io/CancelFutureTest.cpp @@ -14,6 +14,7 @@ #include "emper.hpp" #include "fixtures/assert.hpp" #include "io.hpp" +#include "io/CallbackHandle.hpp" #include "io/Future.hpp" #include "lib/LinuxVersion.hpp" @@ -24,6 +25,24 @@ int efd; uint64_t read_buf; uint64_t write_buf = 42; +static void cancelCallbackHandle() { + emper::io::CallbackHandle handle; + { + ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); + readFuture.setCallback([](int32_t res) { ASSERT(res == -ECANCELED) }); + handle = readFuture.submitAndGetCallbackHandle(); + } + int32_t res = handle.cancel(); + + if constexpr (emper::IO_SINGLE_URING) { + // When using a single io_uring it is possible the submitted Future was + // canceled before reaching the io_uring. + ASSERT(!res || res == -ENOENT); + } else { + ASSERT(!res); + } +} + static void cancelSubmittedCallback() { ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); readFuture.setCallback([](int32_t res) { ASSERT(res == -ECANCELED) }); @@ -151,6 +170,7 @@ void emperTest() { DIE_MSG_ERRNO("eventfd failed"); } + cancelCallbackHandle(); cancelSubmittedCallback(); cancelNotSubmitted(); cancelSubmittedNotCompleted();