From ad3d2fc110c3046bdf09a13f9207b59f935111fb Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fischer@muhq.space>
Date: Sun, 8 Aug 2021 22:15:21 +0200
Subject: [PATCH] add submit variant using iterators to submit multiple Futures
 at once

---
 apps/Coordinator.cpp   | 14 ++++++++------
 emper/io/IoContext.cpp |  2 +-
 emper/io/IoContext.hpp | 25 +++++++++++++++++++++++--
 3 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/apps/Coordinator.cpp b/apps/Coordinator.cpp
index dada5daa..df1a94aa 100644
--- a/apps/Coordinator.cpp
+++ b/apps/Coordinator.cpp
@@ -11,9 +11,10 @@
 #include <vector>
 
 #include "Common.hpp"
-#include "Future.hpp"
 #include "Runtime.hpp"
 #include "io.hpp"
+#include "io/Future.hpp"
+#include "io/IoContext.hpp"
 
 std::string HOST = "::";
 std::string PORT = "12346";
@@ -27,16 +28,17 @@ void notify() {
 	std::string msg = "Go";
 	for (int conn : conns) {
 		auto* sf = new emper::io::SendFuture(conn, msg.c_str(), msg.size(), 0);
-		if (!futures.empty()) sf->setDependency(*futures.back());
 		futures.push_back(sf);
 	}
 
-	int res = futures.back()->submitAndWait();
-	if (res < 0) DIE_MSG_ERRNO("send failed");
+	IoContext::getIo()->submit(futures.begin(), futures.end());
+	for (auto* f : futures) {
+		int res = f->wait();
+		if (res < 0) DIE_MSG_ERRNO("send failed");
+		delete f;
+	}
 
 	for (int conn : conns) emper::io::closeAndForget(conn);
-
-	for (auto* sf : futures) delete sf;
 }
 
 auto main(int argc, char* argv[]) -> int {
diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp
index 550c55b4..6c53f2b9 100644
--- a/emper/io/IoContext.cpp
+++ b/emper/io/IoContext.cpp
@@ -126,7 +126,7 @@ auto IoContext::submitPreparedSqesAndWait(unsigned wait_nr) -> unsigned {
 template <CallerEnvironment callerEnvironment>
 void IoContext::submitAndWait(Future &future, unsigned wait_nr) {
 	LOGD("submitting " << future << (future.dependency ? " and it's dependencies" : ""));
-	unsigned prepared = prepareFutureChain(future, 1);
+	unsigned prepared = prepareFutureChain(future);
 
 	// submit the Future to the io_uring
 	int submitted = submitPreparedSqesAndWait<callerEnvironment>(wait_nr);
diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp
index f849bb6f..b7825a04 100644
--- a/emper/io/IoContext.hpp
+++ b/emper/io/IoContext.hpp
@@ -95,7 +95,7 @@ class IoContext : public Logger<LogSubsystem::IO> {
 	// Members useful for debugging
 	std::atomic<long> reqs_in_uring = 0;
 
-	// We need to keep track of prepared to sqes to invalidate them short submission
+	// We need to keep track of prepared sqes to invalidate them on short submission
 	// If a user wants to submit a chain of 5 Futures and the third is invalid
 	// io_uring_submit will only submit 3 of 5 prepared sqes leaving the 2 unsubmitted in
 	// the SQ.
@@ -119,9 +119,12 @@ class IoContext : public Logger<LogSubsystem::IO> {
 	 * set.
 	 * Abort on preparing a longer list then there a places in the io_uring.
 	 *
+	 * @param future The future starting the Future chain to prepare
+	 * @param chain_length Recursion parameter tracking the resulting chain length
+	 *
 	 * @return the number of prepared Futures
 	 */
-	auto prepareFutureChain(Future &future, unsigned chain_length) -> unsigned;
+	auto prepareFutureChain(Future &future, unsigned chain_length = 1) -> unsigned;
 
 	enum class PointerTags : uint16_t { Future, Callback, NewWorkAq, NewWorkWsq };
 
@@ -221,6 +224,24 @@ class IoContext : public Logger<LogSubsystem::IO> {
 		submitAndWait(future, 0);
 	}
 
+	/**
+	 * @brief Submit a future for asynchronous completion in this IoContext
+	 *
+	 * @param begin Iterator pointing at the start of the Future collection
+	 * @param end Iterator pointing at the end of the Future collection
+	 *
+	 * TODO: gracefully handle cases where there would be more sqes prepared than
+	 * there is room in the SQ
+	 */
+	template <typename InputIt, CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
+	auto submit(InputIt begin, InputIt end) -> unsigned {
+		for (InputIt cur = begin; cur != end; ++cur) {
+			auto future = *cur;
+			prepareFutureChain(*future);
+		}
+		return submitPreparedSqes<callerEnvironment>();
+	}
+
 	/**
 	 * @brief Submit a future for asynchronous completion in this IoContext and reap completions
 	 *
-- 
GitLab