From 30fe27bf4091ec7714675eaf93e356151d6ef755 Mon Sep 17 00:00:00 2001
From: Florian Schmaus <flow@cs.fau.de>
Date: Fri, 5 Mar 2021 22:00:27 +0100
Subject: [PATCH] [IO] Use io_uring_peek_batch_cqe()

---
 emper/io/IoContext.cpp | 77 ++++++++++++++++++++++++------------------
 1 file changed, 44 insertions(+), 33 deletions(-)
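
Note (not part of the applied patch): below is a minimal, self-contained sketch of the liburing batch-peek pattern this patch adopts — peek up to N CQEs without blocking, consume their user data, then advance the CQ by the number actually seen. It is illustrative only and independent of EMPER's IoContext; QUEUE_DEPTH, the no-op request, and main() are assumptions made to keep the example runnable, not part of this change.

// Sketch: reap completions in batches with io_uring_peek_batch_cqe().
#include <liburing.h>

#include <array>
#include <cstdio>
#include <cstdlib>

int main() {
	constexpr unsigned QUEUE_DEPTH = 8;
	struct io_uring ring;
	if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
		return EXIT_FAILURE;
	}

	// Submit a single no-op request so there is something to reap.
	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
	io_uring_prep_nop(sqe);
	io_uring_sqe_set_data(sqe, reinterpret_cast<void *>(0x42));
	io_uring_submit(&ring);

	// Peek up to QUEUE_DEPTH CQEs without blocking; the return value is
	// the number of CQEs currently available (possibly 0).
	std::array<struct io_uring_cqe *, QUEUE_DEPTH> cqes;
	unsigned count = io_uring_peek_batch_cqe(&ring, cqes.data(), QUEUE_DEPTH);

	for (unsigned i = 0; i < count; ++i) {
		void *data = io_uring_cqe_get_data(cqes[i]);
		std::printf("cqe %u: res=%d data=%p\n", i, cqes[i]->res, data);
	}

	// Mark all peeked CQEs as seen in one step.
	io_uring_cq_advance(&ring, count);

	io_uring_queue_exit(&ring);
	return EXIT_SUCCESS;
}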

diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp
index addc76d3..8d8aaa31 100644
--- a/emper/io/IoContext.cpp
+++ b/emper/io/IoContext.cpp
@@ -7,6 +7,7 @@
 #include <sys/eventfd.h>				// for eventfd
 #include <unistd.h>							// for close
 
+#include <array>
 #include <atomic>		// for atomic, __atomic_base
 #include <cassert>	// for assert
 #include <cerrno>		// for errno, ECANCELED, EBUSY, EAGAIN, EINTR
@@ -162,16 +163,33 @@ template void IoContext::submit<CallerEnvironment::ANYWHERE>(Future &future);
 
 template <CallerEnvironment callerEnvironment>
 auto IoContext::reapCompletions() -> std::vector<Fiber *> {
-	// vector returned containing all reaped completions
-	std::vector<Fiber *> continuationFibers;
+	// Should not be more than the uring_entries count.
+	const unsigned CQE_BATCH_COUNT = EMPER_IO_WORKER_URING_ENTRIES;
 
 	uint32_t maxRaceFreeCompleterAttempts = 1;
 
+	using Completion = std::pair<uint32_t, TaggedPtr>;
+	// Vector to store the seen CQEs, keeping the critical section
+	// where cq_lock is held as small as possible.
+	std::vector<Completion> reapedCompletions;
+	reapedCompletions.reserve(CQE_BATCH_COUNT);
+
+	// Vector returned containing all reaped completions in the form of fibers.
+	std::vector<Fiber *> continuationFibers;
+	// We reserve capacity in continuationFibers as soon as we know the
+	// number of reaped completions.
+
 	// this label is not used for callerEnvironment::ANYWHERE and thus has to be
 	// annotated with ATTR_UNUSED
 reap_cqes:
 	ATTR_UNUSED;
 
+	// never reap completions on the global IoContext
+	assert(this != runtime.globalIo);
+
+	LOGD("Reaping completions");
+	std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes;
+
 	// Someone else is currently reaping completions
 	if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
 		if (unlikely(!cq_lock.try_lock())) {
@@ -185,32 +203,12 @@ reap_cqes:
 		}
 	}
 
-	// never reap completions on the global IoContext
-	assert(this != runtime.globalIo);
-
-	LOGD("Reaping completions");
-	unsigned head;
-	struct io_uring_cqe *cqe;
-	unsigned count = 0;
-
-	using Completion = std::pair<uint32_t, TaggedPtr>;
-	// vector to store seen cqes to make the critical section
-	// where cq_lock is held as small as possible
-	std::vector<Completion> reapedCompletions;
-
-	int err = io_uring_peek_cqe(&ring, &cqe);
-	if (err) {
-		if (err == -EAGAIN) {
-			goto unlock;
-		}
-		errno = -err;
-		DIE_MSG_ERRNO("io_uring_peek_cqe failed");
-	}
-
-	io_uring_for_each_cqe(&ring, head, cqe) {
-		count++;
+	unsigned count = io_uring_peek_batch_cqe(&ring, cqes.data(), CQE_BATCH_COUNT);
 
-		TaggedPtr tptr(io_uring_cqe_get_data(cqe));
+	for (unsigned i = 0; i < count; ++i) {
+		struct io_uring_cqe *cqe = cqes[i];
+		void *cqe_data = io_uring_cqe_get_data(cqe);
+		TaggedPtr tptr(cqe_data);
 
 		// Got a CQE for a forgotten Future
 		if (!tptr) {
@@ -220,17 +218,17 @@ reap_cqes:
 		reapedCompletions.emplace_back(cqe->res, tptr);
 	}
 
-	LOGD("got " << count << " cqes from the io_uring");
 	io_uring_cq_advance(&ring, count);
 
+	uint32_t globalCompleterAttempts = cq_lock.unlock();
+
+	LOGD("got " << count << " cqes from the io_uring");
+
 	if constexpr (emper::DEBUG) {
 		assert(count <= reqs_in_uring);
 		reqs_in_uring -= count;
 	}
 
-unlock:
-	uint32_t globalCompleterAttempts = cq_lock.unlock();
-
 	// A naive try lock protecting a worker's IoContext's cq is racy.
 	// While a worker is holding the lock additional completions could arrive
 	// which the worker does not observe because it could be already finished iterating.
@@ -253,6 +251,8 @@ unlock:
 
 	stats.record_reaps<callerEnvironment>(count);
 
+	continuationFibers.reserve(reapedCompletions.size());
+
 	for (auto &completion : reapedCompletions) {
 		auto res = completion.first;
 		auto tptr = completion.second;
@@ -290,13 +290,24 @@ unlock:
 
 	// check if lost wakeup was possible
 	if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
+		bool reReap = false;
+		// TODO: How sure are we that this is unlikely?
 		if (unlikely(globalCompleterAttempts > maxRaceFreeCompleterAttempts)) {
+			// In all CQ iterations after the first we expect no further globalCompleter attempts
+			maxRaceFreeCompleterAttempts = 0;
+			reReap = true;
+		} else if (count == CQE_BATCH_COUNT) {
+			// We reaped a full batch, which means there could potentially be
+			// more CQEs in the completion queue.
+			reReap = true;
+		}
+
+		if (reReap) {
 			// schedule all already collected continuation fibers
 			runtime.schedule(continuationFibers.begin(), continuationFibers.end());
+			reapedCompletions.clear();
 			continuationFibers.clear();
 
-			// In all CQ iteration after the first we expect no further globalCompleter attempts
-			maxRaceFreeCompleterAttempts = 0;
 			goto reap_cqes;
 		}
 	}
-- 
GitLab