From 941f22fabac3acb72288e1b068357c19d5fe81ab Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fl.fischer@fau.de>
Date: Mon, 1 Mar 2021 15:49:35 +0100
Subject: [PATCH] [Runtime] add iterator-based scheduling and optimize
 Runtime::nextFiber

---
 emper/Runtime.cpp      | 12 +++++++++++-
 emper/Runtime.hpp      |  8 ++++++++
 emper/Scheduler.hpp    |  9 +++++++++
 emper/io/IoContext.hpp |  7 +++----
 4 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp
index 82736087..c9ec2e99 100644
--- a/emper/Runtime.cpp
+++ b/emper/Runtime.cpp
@@ -31,6 +31,7 @@
 #include "io/IoContext.hpp"					 // for IoContext
 #include "io/Stats.hpp"							 // for emper::io::Stats
 #include "lib/DebugUtil.hpp"
+#include "strategies/AbstractWorkStealingScheduler.hpp"
 
 #ifdef EMPER_DEFAULT_SCHEDULING_STRATEGY_WORK_STEALING
 #include "strategies/ws/WsStrategyFactory.hpp"
@@ -253,7 +254,16 @@ void Runtime::yield() {
 auto Runtime::nextFiber() -> NextFiberResult {
 	if constexpr (emper::IO) {
 		// Schedule all fibers waiting on completed IO
-		IoContext::getWorkerIo()->reapAndScheduleCompletions();
+		std::vector<Fiber*> completions = IoContext::getWorkerIo()->reapCompletions();
+		if (!completions.empty()) {
+			// Keep the first and schedule the rest
+			Fiber* next = completions[0];
+			schedule(completions.begin() + 1, completions.end());
+
+			// TODO: hint that this fiber comes from the IO subsystem
+			return NextFiberResult{
+					next, static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::local)};
+		}
 	}
 
 	return scheduler.nextFiber();
diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp
index 2edd7aa9..787c7c11 100644
--- a/emper/Runtime.hpp
+++ b/emper/Runtime.hpp
@@ -150,6 +150,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
 		scheduler.schedule(fiber);
 	}
 
+	template <class InputIt>
+	inline void schedule(InputIt begin, InputIt end) {
+		// Calling schedule() only works from within the EMPER runtime.
+		assert(inRuntime());
+
+		scheduler.schedule(begin, end);
+	}
+
 	inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); }
 
 	template <class InputIt>
diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp
index 85fb8d80..9263f1e7 100644
--- a/emper/Scheduler.hpp
+++ b/emper/Scheduler.hpp
@@ -60,6 +60,15 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
 		scheduleInternal(fiber);
 	}
 
+	template <class InputIt>
+	void schedule(InputIt begin, InputIt end) {
+		for (; begin != end; ++begin) {
+			Fiber& fiber = **begin;
+			LOGD("Scheduling batched fiber " << &fiber);
+			scheduleInternal(fiber);
+		}
+	}
+
 	virtual auto nextFiber() -> NextFiberResult = 0;
 
 	void scheduleFromAnywhere(Fiber& fiber) {
diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp
index 0a2e92f8..100a11a3 100644
--- a/emper/io/IoContext.hpp
+++ b/emper/io/IoContext.hpp
@@ -134,12 +134,11 @@ class IoContext : public Logger<LogSubsystem::IO> {
 	template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
 	void reapAndScheduleCompletions() {
 		auto completions = reapCompletions();
-		for (auto it = completions.begin(); it != completions.end(); ++it) {
-			Fiber *fiber = *it;
+		if (!completions.empty()) {
 			if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
-				runtime.schedule(*fiber);
+				runtime.schedule(completions.begin(), completions.end());
 			} else {
-				runtime.scheduleFromAnywhere(*fiber);
+				runtime.scheduleFromAnywhere(completions.begin(), completions.end());
 			}
 		}
 	}
-- 
GitLab