From 31de63c7fe4d20cb6b45e585e226181f39f15e14 Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fl.fischer@fau.de>
Date: Tue, 15 Dec 2020 17:47:05 +0100
Subject: [PATCH] [IO] make Future::submit thread safe

Future::submit<ANYWHERE> and Future::submit<EMPER> can be executed
in parallel by the globalCompleter and the worker owning the IoContext.
To prevent possible data races on the io_uring's SQ a new mutex
is introduced.
This new sq_mutex must be unlocked before calling reapCompletions() because
reapCompletions() may call Future::submit on not fully completed Futures,
which would cause a deadlock if the mutex were still held.
---
 emper/io/Future.cpp    | 23 +++++++++++++++++++++--
 emper/io/IoContext.hpp |  7 +++++++
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/emper/io/Future.cpp b/emper/io/Future.cpp
index b71c00bb..c3cbb74b 100644
--- a/emper/io/Future.cpp
+++ b/emper/io/Future.cpp
@@ -101,6 +101,13 @@ void Future::submit() {
 		origin.uringFutureSet.insert(this);
 	}
 
+	// protect the sq from parallel modification from the completer
+	// calling submit<ANYWHERE> and the worker owning origin calling
+	// submit<EMPER>.
+	// To prevent deadlocks when reapCompletions is resubmitting not fully completed
+	// Futures we must unlock sq_mutex before calling reapCompletions.
+	std::unique_lock<std::mutex> sq_lock(origin.sq_mutex);
+
 	struct io_uring_sqe *sqe = io_uring_get_sqe(&origin.ring);
 	// no entries in submission queue left -> try to reap some completions
 	if (!sqe) {
@@ -108,7 +115,13 @@ void Future::submit() {
 		LOGD("Busy waiting for a free sqe");
 
 		TIME_NS(
-				while (!(sqe = io_uring_get_sqe(&origin.ring))) { origin.reapCompletions(); },
+				{
+					while (!(sqe = io_uring_get_sqe(&origin.ring))) {
+						sq_lock.unlock();
+						origin.reapCompletions();
+						sq_lock.lock();
+					}
+				},
 				origin.stats.record_get_sqe_busy_wait);
 	}
 
@@ -121,7 +134,13 @@ void Future::submit() {
 		LOGW("io_submit returned EBUSY trying to submit in addition to " << origin.reqs_in_uring);
 
 		TIME_NS(
-				while ((submitted = io_uring_submit(&origin.ring)) == -EBUSY) { origin.reapCompletions(); },
+				{
+					while ((submitted = io_uring_submit(&origin.ring)) == -EBUSY) {
+						sq_lock.unlock();  // held since acquisition above; reaping may re-enter submit
+						origin.reapCompletions();
+						sq_lock.lock();  // re-acquire before retrying io_uring_submit on the SQ
+					}
+				},
 				origin.stats.record_io_submit_busy_wait);
 	}
 
diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp
index bf47f379..120a625a 100644
--- a/emper/io/IoContext.hpp
+++ b/emper/io/IoContext.hpp
@@ -37,8 +37,15 @@ class IoContext : public Logger<LogSubsystem::IO> {
 	/* function executed by the global completer thread */
 	static auto globalCompleterFunc(void *) -> void *;
 
+	// Mutex protecting the submission queue of ring.
+	// Used in Future::submit<> to prevent parallel access from the globalCompleter and the worker
+	std::mutex sq_mutex;
+	// Mutex protecting the completion queue of ring.
 	std::mutex cq_mutex;
 	struct io_uring ring;
+
+	// eventfd registered with ring. It is used to notify the globalCompleter about
+	// available completions in ring.
 	int ring_eventfd;
 	uint64_t ring_eventfd_readbuf;
 
-- 
GitLab