From eec49cf1e85e42f6b2ebee9dcacdb5950c91cdf2 Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fischer@muhq.space>
Date: Fri, 4 Mar 2022 11:40:20 +0100
Subject: [PATCH] introduce simple CallbackHandle

A CallbackHandle is used to cancel a previosuly submitted future with
set callback without keeping the future object around.

A CallbackHandle remembers the callback pointer and the IoContext
where the callback was submitted to submit a cancel operation for the
pointer on the same IoContext.

A CallbackHandle can be obtained by calling
Future::submitAndGetCallbackHandle().
---
 emper/io/CallbackHandle.cpp   | 30 +++++++++++++++++++++++++++
 emper/io/CallbackHandle.hpp   | 21 +++++++++++++++++++
 emper/io/Future.cpp           | 14 +++++++++----
 emper/io/Future.hpp           | 38 ++++++++++++++++++++++++-----------
 emper/io/IoContext.hpp        |  9 ++++++++-
 emper/io/meson.build          |  1 +
 tests/io/CancelFutureTest.cpp | 20 ++++++++++++++++++
 7 files changed, 116 insertions(+), 17 deletions(-)
 create mode 100644 emper/io/CallbackHandle.cpp
 create mode 100644 emper/io/CallbackHandle.hpp

diff --git a/emper/io/CallbackHandle.cpp b/emper/io/CallbackHandle.cpp
new file mode 100644
index 00000000..465eba6d
--- /dev/null
+++ b/emper/io/CallbackHandle.cpp
@@ -0,0 +1,30 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2022 Florian Fischer
+#include "io/CallbackHandle.hpp"
+
+#include <cassert>
+
+#include "Emper.hpp"
+#include "Fiber.hpp"
+#include "Runtime.hpp"
+#include "Worker.hpp"
+#include "io/Future.hpp"
+#include "io/IoContext.hpp"
+#include "lib/TaggedPtr.hpp"
+
+namespace emper::io {
+
+auto CallbackHandle::cancel() -> int32_t {
+	assert(callback);
+	CancelWrapper cancellation(IoContext::createCallbackTag(*this));
+	if (!emper::IO_SINGLE_URING && submitter != IoContext::workerIo) {
+		assert(submitter);
+		submitter->runtime.scheduleOn(*Fiber::from([&cancellation] { cancellation.submit(); }),
+																	submitter->worker->getWorkerId());
+	} else {
+		cancellation.submit();
+	}
+	int32_t res = cancellation.wait();
+	return res;
+}
+}	 // namespace emper::io
diff --git a/emper/io/CallbackHandle.hpp b/emper/io/CallbackHandle.hpp
new file mode 100644
index 00000000..35803321
--- /dev/null
+++ b/emper/io/CallbackHandle.hpp
@@ -0,0 +1,21 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2022 Florian Fischer
+#pragma once
+
+#include <stdint.h>
+
+namespace emper::io {
+class IoContext;
+class CallbackHandle {
+	friend class Future;
+	friend class IoContext;
+
+	void* callback = nullptr;
+	IoContext* submitter = nullptr;
+	CallbackHandle(void* callback, IoContext* submitter) : callback(callback), submitter(submitter) {}
+
+ public:
+	CallbackHandle() {}
+	auto cancel() -> int32_t;
+};
+}	 // namespace emper::io
diff --git a/emper/io/Future.cpp b/emper/io/Future.cpp
index 329386c9..7cf65e6a 100644
--- a/emper/io/Future.cpp
+++ b/emper/io/Future.cpp
@@ -176,11 +176,17 @@ normal_submit:
 }
 
 void CancelWrapper::prepareSqeInternal(struct io_uring_sqe* sqe) {
-	auto& future = *reinterpret_cast<Future*>(buf);
-	if constexpr (!emper::IO_SINGLE_URING) {
-		assert(future.submitter == IoContext::workerIo);
+	void* user_data;
+	if (!cancelRaw) {
+		auto& future = *reinterpret_cast<Future*>(buf);
+		if constexpr (!emper::IO_SINGLE_URING) {
+			assert(future.submitter == IoContext::workerIo);
+		}
+		user_data = IoContext::createFutureTag(future);
+	} else {
+		user_data = buf;
 	}
-	void* user_data = IoContext::createFutureTag(future);
+
 	io_uring_prep_cancel(sqe, user_data, 0);
 }
 }	 // namespace emper::io
diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp
index dd65483f..a2b0a9f3 100644
--- a/emper/io/Future.hpp
+++ b/emper/io/Future.hpp
@@ -24,6 +24,7 @@
 #include "Debug.hpp"	// for LOGD, LogSubsystem, LogSubsyst...
 #include "Emper.hpp"	// for DEBUG
 #include "emper-common.h"
+#include "io/CallbackHandle.hpp"
 #include "io/Operation.hpp"	 // for Operation, operator<<, Operati...
 
 class Fiber;
@@ -376,6 +377,22 @@ class Future : public Logger<LogSubsystem::IO> {
 		this->dependency = &dependency;
 	}
 
+	/*
+	 * @brief submit Future for asynchronous completion to an IoContext
+	 */
+	virtual void submit();
+
+	/**
+	 * @brief cancel Future and wait if necessary for its cancellation
+	 *
+	 * @return return the result received from the io_uring
+	 *
+	 * It is guarantied that after cancel() returned the memory of a Future and
+	 * all necessary resources for the operation are no longer used and
+	 * referenced by EMPER and thus can be freed.
+	 */
+	auto cancel() -> int32_t;
+
 	/*
 	 * @brief register a callback which is executed in a new Fiber on completion
 	 *
@@ -390,20 +407,15 @@ class Future : public Logger<LogSubsystem::IO> {
 	}
 
 	/*
-	 * @brief submit Future for asynchronous completion to an IoContext
-	 */
-	virtual void submit();
-
-	/**
-	 * @brief cancel Future and wait if necessary for its cancellation
+	 * @brief submit a future with set callback and get a CallbackHandle
 	 *
-	 * @return return the result received from the io_uring
-	 *
-	 * It is guarantied that after cancel() returned the memory of a Future and
-	 * all necessary resources for the operation are no longer used and
-	 * referenced by EMPER and thus can be freed.
+	 * @return the handle used to cancel the submitted Callback
 	 */
-	auto cancel() -> int32_t;
+	inline auto submitAndGetCallbackHandle() -> CallbackHandle {
+		assert(this->callback);
+		submitAndForget();
+		return {this->callback, this->submitter};
+	}
 
 	/**
 	 * @brief Block till the IO request is completed
@@ -829,10 +841,12 @@ class AlarmFuture : public Future {
  * If the target was already started wait() will return -EALREADY.
  */
 class CancelWrapper : public Future {
+	bool cancelRaw = false;
 	void prepareSqeInternal(struct io_uring_sqe* sqe) override;
 
  public:
 	CancelWrapper(Future& future) : Future(Operation::CANCEL, 0, &future, 0, 0){};
+	CancelWrapper(void* user_data) : Future(Operation::CANCEL, 0, user_data, 0, 0), cancelRaw(true){};
 };
 
 /*
diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp
index 67ee0e99..b0303c65 100644
--- a/emper/io/IoContext.hpp
+++ b/emper/io/IoContext.hpp
@@ -28,6 +28,7 @@
 #include "StealingResult.hpp"
 #include "Worker.hpp"
 #include "emper-config.h"
+#include "io/CallbackHandle.hpp"
 #include "io/Future.hpp"
 #include "io/Stats.hpp"
 #include "io/SubmitActor.hpp"	 // IWYU pragma: keep
@@ -43,6 +44,7 @@ class IoContext : public Logger<LogSubsystem::IO> {
 	friend class ::AbstractWorkStealingScheduler;
 	friend class SubmitActor;
 	friend class Future;
+	friend class CallbackHandle;
 	friend class SendFuture;
 	friend class RecvFuture;
 	friend class CancelWrapper;
@@ -147,7 +149,12 @@ class IoContext : public Logger<LogSubsystem::IO> {
 
 	enum class PointerTags : uint16_t { Future, Callback, NewWorkNotification };
 
-	[[nodiscard]] static auto createFutureTag(Future &future) -> emper::lib::TaggedPtr {
+	[[nodiscard]] static auto createCallbackTag(const CallbackHandle &handle)
+			-> emper::lib::TaggedPtr {
+		return {handle.callback, static_cast<uint16_t>(PointerTags::Callback)};
+	}
+
+	[[nodiscard]] static auto createFutureTag(const Future &future) -> emper::lib::TaggedPtr {
 		if (future.callback) return {future.callback, static_cast<uint16_t>(PointerTags::Callback)};
 
 		if (!future.isForgotten()) return {&future, static_cast<uint16_t>(PointerTags::Future)};
diff --git a/emper/io/meson.build b/emper/io/meson.build
index 2f5a5d40..27fae5e0 100644
--- a/emper/io/meson.build
+++ b/emper/io/meson.build
@@ -19,6 +19,7 @@ endif
 emper_io_include = include_directories('.')
 emper_library_include += [emper_io_include]
 emper_cpp_sources += files(
+  'CallbackHandle.cpp',
   'io.cpp',
   'Future.cpp',
   'Stats.cpp',
diff --git a/tests/io/CancelFutureTest.cpp b/tests/io/CancelFutureTest.cpp
index 46ac6eb0..9e2ace69 100644
--- a/tests/io/CancelFutureTest.cpp
+++ b/tests/io/CancelFutureTest.cpp
@@ -14,6 +14,7 @@
 #include "emper.hpp"
 #include "fixtures/assert.hpp"
 #include "io.hpp"
+#include "io/CallbackHandle.hpp"
 #include "io/Future.hpp"
 #include "lib/LinuxVersion.hpp"
 
@@ -24,6 +25,24 @@ int efd;
 uint64_t read_buf;
 uint64_t write_buf = 42;
 
+static void cancelCallbackHandle() {
+	emper::io::CallbackHandle handle;
+	{
+		ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0);
+		readFuture.setCallback([](int32_t res) { ASSERT(res == -ECANCELED) });
+		handle = readFuture.submitAndGetCallbackHandle();
+	}
+	int32_t res = handle.cancel();
+
+	if constexpr (emper::IO_SINGLE_URING) {
+		// When using a single io_uring it is possible the submitted Future was
+		// canceled before reaching the io_uring.
+		ASSERT(!res || res == -ENOENT);
+	} else {
+		ASSERT(!res);
+	}
+}
+
 static void cancelSubmittedCallback() {
 	ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0);
 	readFuture.setCallback([](int32_t res) { ASSERT(res == -ECANCELED) });
@@ -151,6 +170,7 @@ void emperTest() {
 		DIE_MSG_ERRNO("eventfd failed");
 	}
 
+	cancelCallbackHandle();
 	cancelSubmittedCallback();
 	cancelNotSubmitted();
 	cancelSubmittedNotCompleted();
-- 
GitLab