From 17776ba241e668460b8fc5f3bd95f9dd84bedbef Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fl.fischer@fau.de>
Date: Fri, 26 Feb 2021 12:29:10 +0100
Subject: [PATCH] add a batch optimization for the global completer

This change introduces new scheduleFromAnywhere methods which take
a range of Fibers to schedule.

Blockable gets a new method which returns the fiber that resumes the
unblocked context. Future/PartialCompletableFuture use it to offer a way
of completing a future that returns the continuation Fiber to the caller,
so the caller may schedule the continuation however it wants.

If the meson option io_batch_anywhere_completions is set, the global
completer collects all callback and continuation fibers and schedules
them at once when it is done reaping the completions.
The idea is that taking the AnywhereQueue write lock and calling onNewWork
then only has to be done once per reap instead of once per scheduled fiber.

TODO: investigate if onNewWork should be extended by an amountOfWork
argument which determines how many workers can be awoken and have work to
do. This should be trivial since our WorkerWakeupSemaphore implementations
already support notify_many(), although it may be implemented in terms of
notify_all.
---
 emper/Blockable.hpp                    | 12 +++++++-----
 emper/Emper.hpp                        |  7 +++++++
 emper/PrivateSemaphore.hpp             |  7 +++++++
 emper/Runtime.hpp                      |  5 +++++
 emper/Scheduler.hpp                    | 14 ++++++++++++++
 emper/io/Future.hpp                    | 21 +++++++++++++++++++++
 emper/io/IoContext.cpp                 | 23 ++++++++++++++++++++---
 emper/lib/adt/LockedUnboundedQueue.hpp |  8 ++++++++
 meson.build                            |  1 +
 meson_options.txt                      |  6 ++++++
 10 files changed, 96 insertions(+), 8 deletions(-)

diff --git a/emper/Blockable.hpp b/emper/Blockable.hpp
index 7b2378f6..d0bef641 100644
--- a/emper/Blockable.hpp
+++ b/emper/Blockable.hpp
@@ -41,16 +41,18 @@ class Blockable : public Logger<logSubsystem> {
 		contextManager.saveAndStartNew(std::move(freshContextHook));
 	}
 
-	template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
-	void unblock(Context* context) {
+	auto unblockAndGetContinuation(Context* context) -> Fiber* {
 		assert(context != nullptr);
 		if constexpr (emper::BLOCKED_CONTEXT_SET) {
 			blockedContexts.erase(context);
 		}
 
-		// cppcheck-suppress unsafeClassCanLeak
-		Fiber* unblockFiber =
-				Fiber::from([this, context]() { contextManager.discardAndResume(context); });
+		return Fiber::from([this, context]() { contextManager.discardAndResume(context); });
+	}
+
+	template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
+	void unblock(Context* context) {
+		Fiber* unblockFiber = unblockAndGetContinuation(context);
 
 		if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
 			runtime.schedule(*unblockFiber);
diff --git a/emper/Emper.hpp b/emper/Emper.hpp
index 8a049800..ee265179 100644
--- a/emper/Emper.hpp
+++ b/emper/Emper.hpp
@@ -97,4 +97,11 @@ static const bool IO_URING_SHARED_WQ =
 		false
 #endif
 		;
+static const bool IO_BATCH_ANYWHERE_COMPLETIONS =
+#ifdef EMPER_IO_BATCH_ANYWHERE_COMPLETIONS
+		true
+#else
+		false
+#endif
+		;
 }	 // namespace emper
diff --git a/emper/PrivateSemaphore.hpp b/emper/PrivateSemaphore.hpp
index a72347d6..8e89d21d 100644
--- a/emper/PrivateSemaphore.hpp
+++ b/emper/PrivateSemaphore.hpp
@@ -37,6 +37,13 @@ class PrivateSemaphore : protected Blockable<LogSubsystem::PS> {
 		// invalid at this point.
 	}
 
+	[[nodiscard]] auto signalAndGetContinuation() -> Fiber* {
+		if (Context* readyContext = signalInternal()) {
+			return unblockAndGetContinuation(readyContext);
+		}
+		return nullptr;
+	}
+
 	void signalFromAnywhere() { signal<CallerEnvironment::ANYWHERE>(); }
 
 	void signalAndExit() {
diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp
index 6e7bed73..fbbabc97 100644
--- a/emper/Runtime.hpp
+++ b/emper/Runtime.hpp
@@ -158,6 +158,11 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
 
 	inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); }
 
+	template <class InputIt>
+	inline void scheduleFromAnywhere(InputIt begin, InputIt end) {
+		scheduler.scheduleFromAnywhere(begin, end);
+	}
+
 	void yield();
 
 	// TODO: This should probably not be a public method of Runtime.
diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp
index f49308ed..9bcb9e4d 100644
--- a/emper/Scheduler.hpp
+++ b/emper/Scheduler.hpp
@@ -42,6 +42,11 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
 
 	void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); }
 
+	template <class InputIt>
+	void insertInAnywhereQueue(InputIt begin, InputIt end) {
+		scheduleAnywhereQueue.insert(begin, end);
+	}
+
 	auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); }
 
 	virtual void scheduleInternal(Fiber& fiber) = 0;
@@ -60,4 +65,13 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
 
 		onNewWork<CallerEnvironment::ANYWHERE>();
 	}
+
+	// TODO: investigate if it is still a good idea to wake up only a single
+	// worker; maybe we want something like onNewWork(amountOfWork)
+	template <class InputIt>
+	void scheduleFromAnywhere(InputIt begin, InputIt end) {
+		insertInAnywhereQueue(begin, end);
+
+		onNewWork<CallerEnvironment::ANYWHERE>();
+	}
 };
diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp
index b3c5c5cc..a346253d 100644
--- a/emper/io/Future.hpp
+++ b/emper/io/Future.hpp
@@ -22,6 +22,7 @@
 #include "io/Operation.hpp"	 // for Operation, operator<<, Operati...
 
 struct io_uring_sqe;
+class Fiber;
 
 namespace emper::io {
 class Stats;
@@ -106,6 +107,11 @@ class Future : public Logger<LogSubsystem::IO> {
 		sem.signalFromAnywhere();
 	}
 
+	virtual auto completeAndGetContinuation(int32_t res) -> Fiber* {
+		setCompletion(res);
+		return sem.signalAndGetContinuation();
+	}
+
 	/**
 	 * Used for Stats::recordCompletion double dispatch
 	 */
@@ -377,6 +383,21 @@ class PartialCompletableFuture : public Future {
 		}
 	}
 
+	auto completeAndGetContinuation(int32_t res) -> Fiber* override {
+		CompletionType completion = tryComplete<CallerEnvironment::ANYWHERE>(res);
+
+		switch (completion) {
+			case CompletionType::Resubmission:
+				return nullptr;
+			case CompletionType::IncrementalCompletion:
+				return Future::completeAndGetContinuation(partialCompletion);
+			case CompletionType::Completion:
+				return Future::completeAndGetContinuation(res);
+			default:
+				DIE_MSG("Unknown CompletionType: " << (int)completion);
+		}
+	}
+
  public:
 	/**
 	 * @brief Block till the IO request is completed
diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp
index bd9aebaf..0f52ef84 100644
--- a/emper/io/IoContext.cpp
+++ b/emper/io/IoContext.cpp
@@ -13,6 +13,7 @@
 #include <cstdio>		// for perror
 #include <cstring>	// for memset
 #include <memory>		// for allocator
+#include <vector>
 
 #include "CallerEnvironment.hpp"	// for CallerEnvironment, EMPER, ANYWHERE
 #include "Common.hpp"							// for unlikely, DIE_MSG_ERRNO, DIE_MSG
@@ -191,6 +192,10 @@ void IoContext::reapCompletions() {
 	struct io_uring_cqe *cqe;
 	unsigned count = 0;
 
+	// vector used to batch all completions scheduled to the AnywhereQueue
+	std::vector<Fiber *> continuationFibers;
+	Runtime *runtime = Runtime::getRuntime();
+
 	int err = io_uring_peek_cqe(&ring, &cqe);
 	if (err) {
 		if (err == -EAGAIN) {
@@ -217,11 +222,14 @@ void IoContext::reapCompletions() {
 				c(res);
 				delete &c;
 			});
-			Runtime *runtime = Runtime::getRuntime();
 			if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
 				runtime->schedule(*callbackFiber);
 			} else {
-				runtime->scheduleFromAnywhere(*callbackFiber);
+				if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) {
+					continuationFibers.push_back(callbackFiber);
+				} else {
+					runtime->scheduleFromAnywhere(*callbackFiber);
+				}
 			}
 			continue;
 		}
@@ -235,7 +243,14 @@ void IoContext::reapCompletions() {
 		if constexpr (callerEnvironment == EMPER) {
 			future->complete(cqe->res);
 		} else {
-			future->completeFromAnywhere(cqe->res);
+			if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) {
+				Fiber *continuation = future->completeAndGetContinuation(cqe->res);
+				if (continuation) {
+					continuationFibers.push_back(continuation);
+				}
+			} else {
+				future->completeFromAnywhere(cqe->res);
+			}
 		}
 	}
 
@@ -250,6 +265,14 @@ void IoContext::reapCompletions() {
 	if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
 		stats.record_worker_reaps(count);
 	} else {
+		// Schedule all collected callback/continuation fibers in one batch.
+		// The vector is only filled when batching is enabled; skip the call if
+		// it is empty so we neither lock the AnywhereQueue nor call onNewWork.
+		if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) {
+			if (!continuationFibers.empty()) {
+				runtime->scheduleFromAnywhere(continuationFibers.begin(), continuationFibers.end());
+			}
+		}
 		stats.record_completer_reaps(count);
 	}
 
diff --git a/emper/lib/adt/LockedUnboundedQueue.hpp b/emper/lib/adt/LockedUnboundedQueue.hpp
index 1d76cd74..f6ccda48 100644
--- a/emper/lib/adt/LockedUnboundedQueue.hpp
+++ b/emper/lib/adt/LockedUnboundedQueue.hpp
@@ -20,6 +20,14 @@ class LockedUnboundedQueue {
 		queue.push(item);
 	}
 
+	template <class InputIt>
+	void insert(InputIt begin, InputIt end) {
+		std::lock_guard<std::mutex> lock(queue_mutex);
+		for (; begin != end; ++begin) {
+			queue.push(*begin);
+		}
+	}
+
 	auto dequeue() -> I* {
 		std::lock_guard<std::mutex> lock(queue_mutex);
 		if (queue.empty()) {
diff --git a/meson.build b/meson.build
index e5b6b9d0..d5493868 100644
--- a/meson.build
+++ b/meson.build
@@ -65,6 +65,7 @@ endif
 io_bool_options = [
 	'uring_sqpoll',
 	'uring_shared_wq',
+	'batch_anywhere_completions',
 ]
 
 io_raw_options = [
diff --git a/meson_options.txt b/meson_options.txt
index 95ce8377..610cbdbb 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -109,3 +109,9 @@ option(
   value: false,
   description: 'Share a common async backend between all io_urings'
 )
+option(
+  'io_batch_anywhere_completions',
+  type: 'boolean',
+  value: false,
+  description: 'Collect and schedule all completions reaped by the global completer at once'
+)
-- 
GitLab