From 9f545ba08fad48cba0cfbdfaa4777d17669e2dc7 Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fischer@muhq.space>
Date: Tue, 21 Sep 2021 14:16:29 +0200
Subject: [PATCH] [IoContext] replace fancy CountingTryLock with simple CQ
 emptiness check

---
 emper/io/IoContext.cpp                     | 61 ++++++--------------
 emper/io/IoContext.hpp                     | 22 +------
 emper/lib/sync/CountingTryLock.hpp         | 67 ----------------------
 emper/lib/sync/PseudoCountingTryLock.hpp   | 25 --------
 emper/sleep_strategy/PipeSleepStrategy.cpp |  1 +
 meson.build                                |  3 -
 meson_options.txt                          |  7 ---
 7 files changed, 20 insertions(+), 166 deletions(-)
 delete mode 100644 emper/lib/sync/CountingTryLock.hpp
 delete mode 100644 emper/lib/sync/PseudoCountingTryLock.hpp

diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp
index 6a867a05..bc5a01cb 100644
--- a/emper/io/IoContext.cpp
+++ b/emper/io/IoContext.cpp
@@ -220,37 +220,30 @@ template void IoContext::submitAndWait<CallerEnvironment::ANYWHERE>(Future &futu
 template <CallerEnvironment callerEnvironment>
 auto IoContext::reapCompletions(ContinuationBuffer &continuationFibers) -> unsigned {
 	unsigned reReapCount = 0;
-	uint32_t maxRaceFreeCompleterAttempts = 1;
 
 	using Completion = std::pair<int32_t, void *>;
 	// vector to store seen cqes to make the critical section
 	// where cq_lock is held as small as possible
 	std::array<Completion, CQE_BATCH_COUNT> reapedCompletions;
 
-	// this label is not used for callerEnvironment::ANYWHERE and thus has to be
-	// annotated with ATTR_UNUSED
-reap_cqes:
-	ATTR_UNUSED;
-
 	// never reap completions on the global IoContext
 	assert(this != runtime.globalIo);
 
 	LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId()));
+// this label is not used for callerEnvironment::ANYWHERE and thus has to be
+// annotated with ATTR_UNUSED
+reap_cqes:
+	ATTR_UNUSED;
 	std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes;
 
 	if constexpr (needsCqLock) {
 		// Someone else is currently reaping completions
-		if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
-			if (unlikely(!cq_lock.try_lock())) {
-				LOGD("worker unsuccessful try_lock");
-				return 0;
-			}
-		} else {
-			if (!cq_lock.try_lock_or_increment()) {
-				LOGD("Global completer unsuccessful try_lock_or_increment");
-				return 0;
-			}
+		if (unlikely(!cq_lock.try_lock())) {
+			LOGD("unsuccessful try_lock");
+			return 0;
+		}
 
+		if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) {
 			// We have to check the waitInflight flag with the cq_lock held to
 			// ensure we observe an update by the worker holding the lock.
 			// Otherwise this could happen:
@@ -288,9 +281,8 @@ reap_cqes:
 
 	io_uring_cq_advance(&ring, count);
 
-	uint32_t globalCompleterAttempts;
 	if constexpr (needsCqLock) {
-		globalCompleterAttempts = cq_lock.unlock();
+		cq_lock.unlock();
 	}
 
 	LOGD("got " << count << " cqes from worker " << worker->getWorkerId() << "'s io_uring");
@@ -307,18 +299,8 @@ reap_cqes:
 	// from reaping the additional completions we have a lost wakeup possibly leading
 	// to a completely sleeping runtime with runnable completions in a worker's IoContext.
 
-	// To prevent this race the cq_lock counts the unsuccessful tries from
-	// the globalCompleter.
-	// If a worker observes that the globalCompleter tried to reapCompletions
-	// more than twice we know that a lost wakeup could have occurred and we try to
-	// reap again.
-
-	// In the case a lost wakeup was possible we schedule our reaped cqes
-	// and try again.
-
-	// On all cq iteration after the first we expect no globalCompleterAttempt
-	// or in other words a single globalCompleterAttempt attempt means
-	// additional completions arrive and lost wakeup was possible again.
+	// To prevent this race we check the CQ again for new cqes after we already
+	// dropped the CQ lock and possibly reap again.
 
 	stats.record_reaps<callerEnvironment>(count);
 
@@ -375,26 +357,17 @@ reap_cqes:
 		}
 	}
 
-	// check if lost wakeup was possible
-	if constexpr (needsCqLock && callerEnvironment == CallerEnvironment::EMPER) {
-		bool reReap = false;
-		// TODO: How sure are we that this is unlikely?
-		if (unlikely(globalCompleterAttempts > maxRaceFreeCompleterAttempts)) {
-			// In all CQ iteration after the first we expect no further globalCompleter attempts
-			maxRaceFreeCompleterAttempts = 0;
-			reReap = true;
-		} else if (count == CQE_BATCH_COUNT) {
-			// We reaped a full batch, this means there could be potentially
-			// more CQEs in the completion queue.
-			reReap = true;
-		}
-
+	// check if we missed new cqes
+	// TODO: should only the worker recheck?
+	if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
+		bool reReap = io_uring_cq_ready(&ring) != 0;
 		if (reReap) {
 			// schedule all already collected continuation fibers
 			runtime.schedule(continuationFibers.data(), posInBuf);
 
 			reReapCount++;
 
+			LOGD("Re-Reaping completions");
 			goto reap_cqes;
 		}
 
diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp
index ec9c8eaf..95684fd1 100644
--- a/emper/io/IoContext.hpp
+++ b/emper/io/IoContext.hpp
@@ -11,6 +11,7 @@
 #include <cstdint>		 // for uint64_t
 #include <functional>	 // for less
 #include <iterator>
+#include <mutex>
 #include <vector>
 
 #include "CallerEnvironment.hpp"	// for CallerEnvironment, EMPER
@@ -28,25 +29,6 @@
 class AbstractWorkStealingScheduler;
 class Fiber;
 
-#ifdef EMPER_IO_CQ_LOCK_COUNTING_TRY_LOCK
-#include "lib/sync/CountingTryLock.hpp"
-using CqLock = emper::lib::sync::CountingTryLock;
-
-#elif defined EMPER_IO_CQ_LOCK_MUTEX
-#include <mutex>
-
-#include "lib/sync/PseudoCountingTryLock.hpp"
-using CqLock = emper::lib::sync::PseudoCountingTryLock<std::mutex>;
-
-#elif defined EMPER_IO_CQ_LOCK_SPIN_LOCK
-#include "lib/sync/PseudoCountingTryLock.hpp"
-#include "lib/sync/SpinLock.hpp"
-using CqLock = emper::lib::sync::PseudoCountingTryLock<emper::lib::sync::SpinLock>;
-
-#else
-#error Uknown cq lock implementation
-#endif
-
 namespace emper::sleep_strategy {
 class PipeSleepStrategy;
 }
@@ -73,7 +55,7 @@ class IoContext : public Logger<LogSubsystem::IO> {
 	static constexpr bool needsCqLock =
 			emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none;
 	// TryLock protecting the completion queue of ring.
-	ALIGN_TO_CACHE_LINE CqLock cq_lock;
+	CACHE_LINE_EXCLUSIVE(std::mutex, cq_lock);
 	struct io_uring ring;
 
 	// In a worker's IoContext This eventfd is registered with the io_uring to get completion
diff --git a/emper/lib/sync/CountingTryLock.hpp b/emper/lib/sync/CountingTryLock.hpp
deleted file mode 100644
index cc67ec1b..00000000
--- a/emper/lib/sync/CountingTryLock.hpp
+++ /dev/null
@@ -1,67 +0,0 @@
-// SPDX-License-Identifier: LGPL-3.0-or-later
-// Copyright © 2021 Florian Fischer
-#pragma once
-
-#include <atomic>
-#include <climits>
-
-#include "Common.hpp"
-
-namespace emper::lib::sync {
-
-class CountingTryLock {
- private:
-	// The lower 32-bit are used as the actual lock
-	// The higher 32-bit are used as a counter which is incremented if
-	// try_lock_or_increment fails to acquire the lock.
-
-	// Layout of our lock union:
-	// |--------------  countingLock  --------------|
-	//                     |------- counter  -------|
-	// | lock |
-	// 0    sizeof(std::atomic<bool>)              64
-	// 0                COUNTER_SHIFT              64
-	union {
-		std::atomic<uint64_t> countingLock;
-		std::atomic<bool> lock;
-	};
-
-	static const int COUNTER_SHIFT = 32;
-	static_assert(sizeof(std::atomic<bool>) * CHAR_BIT < COUNTER_SHIFT);
-
-	static const uint64_t LOCKED = 1;
-	static const uint64_t UNLOCKED = 0;
-
- public:
-	CountingTryLock(bool locked = false) : countingLock(locked ? LOCKED : UNLOCKED) {}
-
-	[[nodiscard]] auto try_lock() -> bool { return !lock.exchange(true, std::memory_order_acquire); }
-
-	[[nodiscard]] auto try_lock_or_increment() -> bool {
-		uint64_t oldVal, newVal;
-		oldVal = countingLock.load(std::memory_order_relaxed);
-		for (;;) {
-			// currently unlocked -> try to lock
-			if (oldVal == UNLOCKED) {
-				newVal = LOCKED;
-				// currently locked -> increment the counter
-			} else {
-				newVal = oldVal + (1L << COUNTER_SHIFT);
-			}
-
-			if (countingLock.compare_exchange_weak(oldVal, newVal, std::memory_order_acquire,
-																						 std::memory_order_relaxed)) {
-				break;
-			}
-		}
-
-		return oldVal == UNLOCKED;
-	}
-
-	// release the lock, zero and return the counter
-	auto unlock() -> uint32_t {
-		uint64_t oldVal = countingLock.exchange(UNLOCKED, std::memory_order_release);
-		return static_cast<uint32_t>(oldVal >> COUNTER_SHIFT);
-	}
-};
-}	 // namespace emper::lib::sync
diff --git a/emper/lib/sync/PseudoCountingTryLock.hpp b/emper/lib/sync/PseudoCountingTryLock.hpp
deleted file mode 100644
index 2b278bad..00000000
--- a/emper/lib/sync/PseudoCountingTryLock.hpp
+++ /dev/null
@@ -1,25 +0,0 @@
-// SPDX-License-Identifier: LGPL-3.0-or-later
-// Copyright © 2021 Florian Fischer
-#pragma once
-
-namespace emper::lib::sync {
-
-template <class Lockable>
-class PseudoCountingTryLock {
- private:
-	Lockable lock;
-
- public:
-	[[nodiscard]] auto try_lock() -> bool {
-		lock.lock();
-		return true;
-	}
-
-	[[nodiscard]] auto try_lock_or_increment() -> bool { return try_lock(); }
-
-	auto unlock() -> uint32_t {
-		lock.unlock();
-		return 0;
-	}
-};
-}	 // namespace emper::lib::sync
diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp
index 3c02694e..51ba1713 100644
--- a/emper/sleep_strategy/PipeSleepStrategy.cpp
+++ b/emper/sleep_strategy/PipeSleepStrategy.cpp
@@ -6,6 +6,7 @@
 
 #include <atomic>
 #include <cassert>
+#include <mutex>
 
 #include "CallerEnvironment.hpp"
 #include "Emper.hpp"
diff --git a/meson.build b/meson.build
index d6f9cbfd..ce0f95be 100644
--- a/meson.build
+++ b/meson.build
@@ -111,9 +111,6 @@ foreach option : io_raw_options
 	conf_data.set('EMPER_IO_' + option.to_upper(), get_option('io_' + option))
 endforeach
 
-io_cq_lock_impl = get_option('io_cq_lock_implementation')
-conf_data.set('EMPER_IO_CQ_LOCK_' + io_cq_lock_impl.to_upper(), true)
-
 io_completer_behavior = get_option('io_completer_behavior')
 if io_completer_behavior == 'maybe_wakeup'
 	if get_option('worker_sleep')
diff --git a/meson_options.txt b/meson_options.txt
index 1cd5d81a..b98b9f8e 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -140,13 +140,6 @@ option(
   value: false,
   description: 'Share a common async backend between all io_urings'
 )
-option(
-  'io_cq_lock_implementation',
-  type: 'combo',
-  description: 'The lock implementation used to protect a worker IoContext CQ',
-  choices: ['spin_lock', 'counting_try_lock', 'mutex'],
-  value: 'counting_try_lock',
-)
 option(
   'io_completer_behavior',
   type: 'combo',
-- 
GitLab