From a328256ca9c8e39adac2aa9d5d0de9289c405401 Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fischer@muhq.space>
Date: Sat, 1 May 2021 15:42:57 +0200
Subject: [PATCH] [Runtime] switch batch schedule interface from STL to C array
 style

Using the STL iterator-based approach has caused more harm than good.

First of all, it is unnecessary: we know how the Fibers we want to
schedule are stored and do not need a generic algorithmic abstraction.

We keep the Fiber pointers in contiguous memory for memory locality,
so we can simply pass the Fibers to the scheduler in the C dynamic
array style of a pointer plus a count.
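
For illustration, the call site in Runtime::nextFiber changes from an
iterator pair to a pointer plus a count (taken from the diff below):

    // old, STL iterator based
    schedule(completions.begin() + 1, completions.begin() + ncompletions);

    // new, C array style
    schedule(&completions[1], ncompletions - 1);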

This removes the template from the batched schedule methods and allows us
to use virtual functions to specialize the batched scheduleFromAnywhere
method.
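
A rough sketch of what this enables (illustrative only, the derived
class name is made up and this patch itself does not yet mark the
method virtual):

    class Scheduler {
     public:
        // Non-template method: it can now be declared virtual.
        virtual void scheduleFromAnywhere(Fiber** fibers, unsigned count);
    };

    class SomeScheduler : public Scheduler {
     public:
        // A derived scheduler can override the batched variant, which was
        // not possible while it was a template member function.
        void scheduleFromAnywhere(Fiber** fibers, unsigned count) override;
    };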
---
 emper/Runtime.cpp                               |  2 +-
 emper/Runtime.hpp                               | 10 ++++------
 emper/Scheduler.hpp                             | 17 +++++++----------
 emper/io/IoContext.cpp                          |  2 +-
 emper/io/IoContext.hpp                          |  7 +++----
 .../lib/adt/BoostSharedMutexUnboundedQueue.hpp  |  7 +++++++
 emper/lib/adt/LockedSet.hpp                     |  7 +++++++
 emper/lib/adt/MutexUnboundedQueue.hpp           |  7 +++++++
 emper/lib/adt/RwLockUnboundedQueue.hpp          |  8 ++++++++
 emper/lib/adt/SharedMutexUnboundedQueue.hpp     |  7 +++++++
 10 files changed, 52 insertions(+), 22 deletions(-)

diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp
index cadd6ed4..5ab9afc2 100644
--- a/emper/Runtime.cpp
+++ b/emper/Runtime.cpp
@@ -284,7 +284,7 @@ auto Runtime::nextFiber() -> NextFiberResult {
 		if (ncompletions > 0) {
 			// Keep the first and schedule the rest
 			Fiber* next = completions[0];
-			schedule(completions.begin() + 1, completions.begin() + ncompletions);
+			schedule(&completions[1], ncompletions - 1);
 
 			// TODO: hint that this fiber comes from the IO subsystem
 			return NextFiberResult{
diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp
index c1868e77..fdd0783a 100644
--- a/emper/Runtime.hpp
+++ b/emper/Runtime.hpp
@@ -136,19 +136,17 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
 		scheduler.schedule(fiber);
 	}
 
-	template <class InputIt>
-	inline void schedule(InputIt begin, InputIt end) {
+	inline void schedule(Fiber** fibers, unsigned count) {
 		// Calling schedule() only works from within the EMPER runtime.
 		assert(inRuntime());
 
-		scheduler.schedule(begin, end);
+		scheduler.schedule(fibers, count);
 	}
 
 	inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); }
 
-	template <class InputIt>
-	inline void scheduleFromAnywhere(InputIt begin, InputIt end) {
-		scheduler.scheduleFromAnywhere(begin, end);
+	inline void scheduleFromAnywhere(Fiber** fibers, unsigned count) {
+		scheduler.scheduleFromAnywhere(fibers, count);
 	}
 
 	void yield();
diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp
index 6ea42bc8..13157322 100644
--- a/emper/Scheduler.hpp
+++ b/emper/Scheduler.hpp
@@ -44,9 +44,8 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
 
 	void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); }
 
-	template <class InputIt>
-	void insertInAnywhereQueue(InputIt begin, InputIt end) {
-		scheduleAnywhereQueue.insert(begin, end);
+	void insertInAnywhereQueue(Fiber** fibers, unsigned count) {
+		scheduleAnywhereQueue.insert(fibers, count);
 	}
 
 	auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); }
@@ -60,10 +59,9 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
 		scheduleInternal(fiber);
 	}
 
-	template <class InputIt>
-	void schedule(InputIt begin, InputIt end) {
-		for (; begin != end; ++begin) {
-			Fiber& fiber = **begin;
+	void schedule(Fiber** fibers, unsigned count) {
+		for (unsigned i = 0; i < count; ++i) {
+			Fiber& fiber = *fibers[i];
 			LOGD("Scheduling batched fiber " << &fiber);
 			scheduleInternal(fiber);
 		}
@@ -79,9 +77,8 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
 
 	// TODO: investigate if it is still a good idea to wakeup only a single
 	// worker maybe we want something like onNewWork(amountOfWork)
-	template <class InputIt>
-	void scheduleFromAnywhere(InputIt begin, InputIt end) {
-		insertInAnywhereQueue(begin, end);
+	void scheduleFromAnywhere(Fiber** fibers, unsigned count) {
+		insertInAnywhereQueue(fibers, count);
 
 		onNewWork<CallerEnvironment::ANYWHERE>();
 	}
diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp
index eebd741f..307dce05 100644
--- a/emper/io/IoContext.cpp
+++ b/emper/io/IoContext.cpp
@@ -318,7 +318,7 @@ reap_cqes:
 
 		if (reReap) {
 			// schedule all already collected continuation fibers
-			runtime.schedule(continuationFibers.begin(), continuationFibers.begin() + posInBuf);
+			runtime.schedule(continuationFibers.data(), posInBuf);
 
 			reReapCount++;
 
diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp
index 31805c34..9c118319 100644
--- a/emper/io/IoContext.hpp
+++ b/emper/io/IoContext.hpp
@@ -215,13 +215,12 @@ class IoContext : public Logger<LogSubsystem::IO> {
 		ContinuationBuffer completionBuf;
 		unsigned ncompletions = reapCompletions<callerEnvironment>(completionBuf);
 		if (ncompletions > 0) {
-			auto *begin = completionBuf.begin();
-			auto *finish = completionBuf.begin() + ncompletions;
+			auto *fibers = completionBuf.data();
 
 			if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
-				runtime.schedule(begin, finish);
+				runtime.schedule(fibers, ncompletions);
 			} else {
-				runtime.scheduleFromAnywhere(begin, finish);
+				runtime.scheduleFromAnywhere(fibers, ncompletions);
 			}
 		}
 	}
diff --git a/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp b/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp
index bd8465b4..939a092c 100644
--- a/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp
+++ b/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp
@@ -28,6 +28,13 @@ class BoostSharedMutexUnboundedQueue {
 		}
 	}
 
+	void insert(I** items, unsigned count) {
+		boost::unique_lock<boost::shared_mutex> lock(queue_mutex);
+		for (unsigned i = 0; i < count; ++i) {
+			queue.push(items[i]);
+		}
+	}
+
 	auto dequeue() -> I* {
 		boost::upgrade_lock<boost::shared_mutex> rlock(queue_mutex);
 		if (queue.empty()) {
diff --git a/emper/lib/adt/LockedSet.hpp b/emper/lib/adt/LockedSet.hpp
index 7c4cccfd..c1f4a692 100644
--- a/emper/lib/adt/LockedSet.hpp
+++ b/emper/lib/adt/LockedSet.hpp
@@ -27,6 +27,13 @@ class LockedSet {
 		_set.insert(first, last);
 	}
 
+	void insert(const Key* items, unsigned count) {
+		std::lock_guard<std::mutex> lock(_mutex);
+		for (unsigned i = 0; i < count; ++i) {
+			_set.insert(items[i]);
+		}
+	}
+
 	auto erase(const Key& key) -> size_t {
 		std::lock_guard<std::mutex> lock(_mutex);
 		return _set.erase(key);
diff --git a/emper/lib/adt/MutexUnboundedQueue.hpp b/emper/lib/adt/MutexUnboundedQueue.hpp
index 5aa84ecb..f146c007 100644
--- a/emper/lib/adt/MutexUnboundedQueue.hpp
+++ b/emper/lib/adt/MutexUnboundedQueue.hpp
@@ -28,6 +28,13 @@ class MutexUnboundedQueue {
 		}
 	}
 
+	void insert(I** items, unsigned count) {
+		std::lock_guard<std::mutex> lock(queue_mutex);
+		for (unsigned i = 0; i < count; ++i) {
+			queue.push(items[i]);
+		}
+	}
+
 	auto dequeue() -> I* {
 		std::lock_guard<std::mutex> lock(queue_mutex);
 		if (queue.empty()) {
diff --git a/emper/lib/adt/RwLockUnboundedQueue.hpp b/emper/lib/adt/RwLockUnboundedQueue.hpp
index 8a305b37..7e887ded 100644
--- a/emper/lib/adt/RwLockUnboundedQueue.hpp
+++ b/emper/lib/adt/RwLockUnboundedQueue.hpp
@@ -55,6 +55,14 @@ class RwLockUnboundedQueue {
 		pthread_rwlock_unlock(&lock);
 	}
 
+	void insert(I** items, unsigned count) {
+		aquire_wrlock(lock);
+		for (unsigned i = 0; i < count; ++i) {
+			queue.push(items[i]);
+		}
+		pthread_rwlock_unlock(&lock);
+	}
+
 	auto dequeue() -> I* {
 		I* res = nullptr;
 
diff --git a/emper/lib/adt/SharedMutexUnboundedQueue.hpp b/emper/lib/adt/SharedMutexUnboundedQueue.hpp
index 5a478da6..c3a78644 100644
--- a/emper/lib/adt/SharedMutexUnboundedQueue.hpp
+++ b/emper/lib/adt/SharedMutexUnboundedQueue.hpp
@@ -28,6 +28,13 @@ class SharedMutexUnboundedQueue {
 		}
 	}
 
+	void insert(I** items, unsigned count) {
+		std::unique_lock lock(queue_mutex);
+		for (unsigned i = 0; i < count; ++i) {
+			queue.push(items[i]);
+		}
+	}
+
 	auto dequeue() -> I* {
 		{
 			std::shared_lock lock(queue_mutex);
-- 
GitLab