From e6cc92f161a3c51c3cc711097dc0a2a58ccdb74c Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fl.fischer@fau.de>
Date: Fri, 26 Feb 2021 17:49:30 +0100
Subject: [PATCH] [IO] fix the possible lost wakeup for the IoContext::cq_lock
 race

Our current naive try lock protecting a worker's IoContext's CQ is racy.
This fact alone is not a problem: a try lock is by design racy in the sense
that two threads race for who gets to take the lock.

The actual problem is:

While a worker is holding the lock, additional completions could arrive
which the worker does not observe because it may already have finished
iterating the CQ.

If the worker still holds the lock, preventing the globalCompleter
from reaping those additional completions, we have a lost wakeup problem
possibly leading to a completely sleeping runtime with runnable completions
in a worker's IoContext.

To prevent this lost wakeup the cq_lock now counts the unsuccessful
lock attempts made by the globalCompleter.

If a worker observes that the globalCompleter tried to reapCompletions
more than once, we know that a lost wakeup could have occurred and we
reap again.
Observing one attempt is normal since the globalCompleter and the worker
owning the IoContext race for the cq_lock required to reap completions.
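
In pseudo code the worker-side logic of reapCompletions() now looks roughly
like this (simplified sketch using a loop instead of the goto; the CQ
iteration and error handling are omitted, the real implementation is in
emper/io/IoContext.cpp below):

    auto IoContext::reapCompletions() -> std::vector<Fiber *> {
        std::vector<Fiber *> continuationFibers;
        uint32_t maxRaceFreeCompleterAttempts = 1;
        for (;;) {
            if (!cq_lock.try_lock()) return continuationFibers;
            // ... copy all currently visible cqes while holding the lock ...
            uint32_t completerAttempts = cq_lock.unlock();
            // ... turn the copied cqes into continuation fibers ...
            if (completerAttempts <= maxRaceFreeCompleterAttempts) break;
            // A lost wakeup was possible: hand over what we have and reap again.
            runtime.schedule(continuationFibers.begin(), continuationFibers.end());
            continuationFibers.clear();
            maxRaceFreeCompleterAttempts = 0;
        }
        return continuationFibers;
    }

The globalCompleter uses try_lock_or_increment() instead of try_lock(), so an
unsuccessful attempt is recorded in the lock's counter instead of being lost.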

Additionally:

* Reduce the critical section in which the cq_lock is held by copying all
  seen cqes and completing the Futures after the lock was released.

* Don't immediately schedule the Fibers or Callbacks unblocked by completed IO;
  rather collect them and return them as a batch. The caller may know better
  what to do with a batch of runnable Fibers (see the usage sketch below).
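
A minimal usage sketch (mirroring the new reapAndScheduleCompletions() helper
added to emper/io/IoContext.hpp below; runtime/scheduling calls named as they
are used inside IoContext):

    std::vector<Fiber *> batch = IoContext::getWorkerIo()->reapCompletions();
    for (Fiber *fiber : batch) {
        // from a worker; use runtime.scheduleFromAnywhere(*fiber) in the completer
        runtime.schedule(*fiber);
    }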
---
 emper/Runtime.cpp                  |   2 +-
 emper/io/GlobalIoContext.cpp       |   2 +-
 emper/io/IoContext.cpp             | 167 ++++++++++++++++++-----------
 emper/io/IoContext.hpp             |  29 ++++-
 emper/io/Stats.hpp                 |  20 ++--
 emper/lib/adt/AtomicTryLock.hpp    |  25 -----
 emper/lib/sync/CountingTryLock.hpp |  67 ++++++++++++
 7 files changed, 208 insertions(+), 104 deletions(-)
 delete mode 100644 emper/lib/adt/AtomicTryLock.hpp
 create mode 100644 emper/lib/sync/CountingTryLock.hpp

diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp
index 831f15c3..82736087 100644
--- a/emper/Runtime.cpp
+++ b/emper/Runtime.cpp
@@ -253,7 +253,7 @@ void Runtime::yield() {
 auto Runtime::nextFiber() -> NextFiberResult {
 	if constexpr (emper::IO) {
 		// Schedule all fibers waiting on completed IO
-		IoContext::getWorkerIo()->reapCompletions();
+		IoContext::getWorkerIo()->reapAndScheduleCompletions();
 	}
 
 	return scheduler.nextFiber();
diff --git a/emper/io/GlobalIoContext.cpp b/emper/io/GlobalIoContext.cpp
index 9c68f5a0..9a463728 100644
--- a/emper/io/GlobalIoContext.cpp
+++ b/emper/io/GlobalIoContext.cpp
@@ -98,7 +98,7 @@ auto GlobalIoContext::globalCompleterFunc(void* arg) -> void* {
 
 				assert(submitted == 1);
 
-				worker_io->reapCompletions<CallerEnvironment::ANYWHERE>();
+				worker_io->reapAndScheduleCompletions<CallerEnvironment::ANYWHERE>();
 			}
 			break;
 
diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp
index 398092d6..addc76d3 100644
--- a/emper/io/IoContext.cpp
+++ b/emper/io/IoContext.cpp
@@ -11,7 +11,7 @@
 #include <cassert>	// for assert
 #include <cerrno>		// for errno, ECANCELED, EBUSY, EAGAIN, EINTR
 #include <cstring>	// for memset
-#include <memory>		// for allocator
+#include <utility>
 #include <vector>
 
 #include "CallerEnvironment.hpp"	// for CallerEnvironment, EMPER, ANYWHERE
@@ -19,7 +19,8 @@
 #include "Debug.hpp"							// for LOGD
 #include "Emper.hpp"							// for DEBUG, IO_URING_SQPOLL
 #include "Fiber.hpp"
-#include "Runtime.hpp"		// for Runtime
+#include "Runtime.hpp"
+#include "emper-common.h"
 #include "io/Future.hpp"	// for Future, operator<<, Future::State
 #include "io/GlobalIoContext.hpp"
 #include "io/Stats.hpp"	 // for Stats, nanoseconds
@@ -35,14 +36,6 @@ namespace emper::io {
 
 enum class PointerTags : uint16_t { Future, Callback };
 
-static inline auto castIfCallback(TaggedPtr ptr) -> Future::Callback * {
-	if (ptr.getTag() == static_cast<uint16_t>(PointerTags::Callback)) {
-		return ptr.getPtr<Future::Callback>();
-	}
-
-	return nullptr;
-}
-
 thread_local IoContext *IoContext::workerIo = nullptr;
 
 auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> unsigned {
@@ -120,7 +113,7 @@ void IoContext::submit(Future &future) {
 		TIME_NS(
 				{
 					do {
-						reapCompletions();
+						reapAndScheduleCompletions();
 					} while ((submitted = io_uring_submit(&ring)) == -EBUSY);
 				},
 				stats.record_io_submit_full_cq);
@@ -159,7 +152,7 @@ void IoContext::submit(Future &future) {
 	// Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe.
 	// Try to reap a possible synchronous completion if we are on a worker's io_uring.
 	if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
-		reapCompletions<callerEnvironment>();
+		reapAndScheduleCompletions();
 	}
 }
 
@@ -168,10 +161,28 @@ template void IoContext::submit<CallerEnvironment::EMPER>(Future &future);
 template void IoContext::submit<CallerEnvironment::ANYWHERE>(Future &future);
 
 template <CallerEnvironment callerEnvironment>
-void IoContext::reapCompletions() {
+auto IoContext::reapCompletions() -> std::vector<Fiber *> {
+	// vector of continuation fibers resulting from the reaped completions, returned to the caller
+	std::vector<Fiber *> continuationFibers;
+
+	uint32_t maxRaceFreeCompleterAttempts = 1;
+
+	// this label is not used for CallerEnvironment::ANYWHERE and thus has to be
+	// annotated with ATTR_UNUSED
+reap_cqes:
+	ATTR_UNUSED;
+
 	// Someone else is currently reaping completions
-	if (unlikely(!cq_lock.try_lock())) {
-		return;
+	if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
+		if (unlikely(!cq_lock.try_lock())) {
+			LOGD("worker unsuccessful try_lock");
+			return continuationFibers;
+		}
+	} else {
+		if (!cq_lock.try_lock_or_increment()) {
+			LOGD("Global completer unsuccessful try_lock_or_increment");
+			return continuationFibers;
+		}
 	}
 
 	// never reap completions on the global IoContext
@@ -182,8 +193,10 @@ void IoContext::reapCompletions() {
 	struct io_uring_cqe *cqe;
 	unsigned count = 0;
 
-	// vector used to batch all completions scheduled to the AnywhereQueue
-	std::vector<Fiber *> continuationFibers;
+	using Completion = std::pair<uint32_t, TaggedPtr>;
+	// vector to store seen cqes to make the critical section
+	// where cq_lock is held as small as possible
+	std::vector<Completion> reapedCompletions;
 
 	int err = io_uring_peek_cqe(&ring, &cqe);
 	if (err) {
@@ -204,43 +217,7 @@ void IoContext::reapCompletions() {
 			continue;
 		}
 
-		auto *callback = castIfCallback(tptr);
-		if (callback) {
-			LOGD("Schedule new callback fiber for " << callback);
-			auto *callbackFiber = Fiber::from([&c = *callback, res = cqe->res] {
-				c(res);
-				delete &c;
-			});
-			if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
-				runtime.schedule(*callbackFiber);
-			} else {
-				if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) {
-					continuationFibers.push_back(callbackFiber);
-				} else {
-					runtime.scheduleFromAnywhere(*callbackFiber);
-				}
-			}
-			continue;
-		}
-
-		auto *future = tptr.getPtr<Future>();
-
-		// assert that the future was previously in the uringFutureSet
-		assert(uringFutureSet.erase(future) > 0);
-
-		future->recordCompletion(stats, cqe->res);
-		if constexpr (callerEnvironment == EMPER) {
-			future->complete(cqe->res);
-		} else {
-			if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) {
-				Fiber *continuation = future->completeAndGetContinuation(cqe->res);
-				if (continuation) {
-					continuationFibers.push_back(continuation);
-				}
-			} else {
-				future->completeFromAnywhere(cqe->res);
-			}
-		}
+		reapedCompletions.emplace_back(cqe->res, tptr);
 	}
 
 	LOGD("got " << count << " cqes from the io_uring");
@@ -251,22 +228,86 @@ void IoContext::reapCompletions() {
 		reqs_in_uring -= count;
 	}
 
+unlock:
+	uint32_t globalCompleterAttempts = cq_lock.unlock();
+
+	// A naive try lock protecting a worker's IoContext's cq is racy.
+	// While a worker is holding the lock additional completions could arrive
+	// which the worker does not observe because it may already have finished iterating.
+	// If the worker still holds the lock, preventing the globalCompleter
+	// from reaping the additional completions, we have a lost wakeup possibly leading
+	// to a completely sleeping runtime with runnable completions in a worker's IoContext.
+
+	// To prevent this race the cq_lock counts the unsuccessful tries from
+	// the globalCompleter.
+	// If a worker observes that the globalCompleter tried to reapCompletions
+	// more than once we know that a lost wakeup could have occurred and we try to
+	// reap again.
+
+	// In the case a lost wakeup was possible we schedule the continuation fibers
+	// we have already collected and try again.
+
+	// On all CQ iterations after the first we expect no globalCompleter attempts,
+	// or in other words a single globalCompleter attempt already means that
+	// additional completions arrived and a lost wakeup was possible again.
+
+	stats.record_reaps<callerEnvironment>(count);
+
+	for (auto &completion : reapedCompletions) {
+		auto res = completion.first;
+		auto tptr = completion.second;
+
+		auto tag = static_cast<PointerTags>(tptr.getTag());
+		switch (tag) {
+			case PointerTags::Callback: {
+				auto *callback = tptr.getPtr<Future::Callback>();
+				LOGD("Create new callback fiber for " << callback);
+				auto *callbackFiber = Fiber::from([&c = *callback, res] {
+					c(res);
+					delete &c;
+				});
+
+				continuationFibers.push_back(callbackFiber);
+			} break;
+
+			case PointerTags::Future: {
+				auto *future = tptr.getPtr<Future>();
+				// assert that the future was previously in the uringFutureSet
+				assert(uringFutureSet.erase(future) > 0);
+
+				future->recordCompletion(stats, res);
+				Fiber *continuation = future->completeAndGetContinuation(res);
+				if (continuation) {
+					continuationFibers.push_back(continuation);
+				}
+			} break;
+
+			default:
+				DIE_MSG("Unknown pointer tag encountered: " << (int)tag);
+				break;
+		}
+	}
+
+	// check if lost wakeup was possible
 	if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
-		stats.record_worker_reaps(count);
-	} else {
-		// actually schedule all completion fibers
-		runtime.scheduleFromAnywhere(continuationFibers.begin(), continuationFibers.end());
-		stats.record_completer_reaps(count);
+		if (unlikely(globalCompleterAttempts > maxRaceFreeCompleterAttempts)) {
+			// schedule all already collected continuation fibers
+			runtime.schedule(continuationFibers.begin(), continuationFibers.end());
+			continuationFibers.clear();
+
+			// In all CQ iterations after the first we expect no further globalCompleter attempts
+			maxRaceFreeCompleterAttempts = 0;
+			goto reap_cqes;
+		}
 	}
 
-unlock:
-	cq_lock.unlock();
+	return continuationFibers;
 }
 
 // Show the compiler our template incarnations this is needed again because
 // reapCompletions<CallerEnvironment::ANYWHERE> is now called from GlobalIoContext.cpp
-template void IoContext::reapCompletions<CallerEnvironment::ANYWHERE>();
-template void IoContext::reapCompletions<CallerEnvironment::EMPER>();
+template auto IoContext::reapCompletions<CallerEnvironment::ANYWHERE>() -> std::vector<Fiber *>;
+template auto IoContext::reapCompletions<CallerEnvironment::EMPER>() -> std::vector<Fiber *>;
 
 IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) {
 	struct io_uring_params params;
diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp
index 64382fea..0a2e92f8 100644
--- a/emper/io/IoContext.hpp
+++ b/emper/io/IoContext.hpp
@@ -9,15 +9,18 @@
 #include <cstddef>		 // for size_t
 #include <cstdint>		 // for uint64_t
 #include <functional>	 // for less
-#include <memory>			 // for allocator
+#include <vector>
 
 #include "CallerEnvironment.hpp"	// for CallerEnvironment, EMPER
+#include "Common.hpp"
 #include "Debug.hpp"							// for LogSubsystem, LogSubsystem::IO, Logger
 #include "Runtime.hpp"						// for Runtime
 #include "emper-config.h"					// for EMPER_IO_WORKER_URING_ENTRIES
 #include "io/Stats.hpp"						// for Stats
-#include "lib/adt/AtomicTryLock.hpp"
 #include "lib/adt/LockedSet.hpp"	// for LockedSet
+#include "lib/sync/CountingTryLock.hpp"
+
+class Fiber;
 
 namespace emper::io {
 class Future;
@@ -37,7 +40,7 @@ class IoContext : public Logger<LogSubsystem::IO> {
 
 	static thread_local IoContext *workerIo;
 	// TryLock protecting the completion queue of ring.
-	lib::adt::AtomicTryLock cq_lock;
+	ALIGN_TO_CACHE_LINE lib::sync::CountingTryLock cq_lock;
 	struct io_uring ring;
 
 	// In a worker's IoContext This eventfd is registered with the io_uring to get completion
@@ -117,10 +120,28 @@ class IoContext : public Logger<LogSubsystem::IO> {
 	template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
 	void submit(Future &future);
 
+	/**
+	 * @brief Collect all fibers waiting on completed IO
+	 *
+	 * @return A vector containing all runnable Fibers
+	 */
+	template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
+	auto reapCompletions() -> std::vector<Fiber *>;
+
 	/**
 	 * @brief Schedule all fibers waiting on completed IO
 	 */
 	template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
-	void reapCompletions();
+	void reapAndScheduleCompletions() {
+		auto completions = reapCompletions();
+		for (auto it = completions.begin(); it != completions.end(); ++it) {
+			Fiber *fiber = *it;
+			if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
+				runtime.schedule(*fiber);
+			} else {
+				runtime.scheduleFromAnywhere(*fiber);
+			}
+		}
+	}
 };
 }	 // namespace emper::io
diff --git a/emper/io/Stats.hpp b/emper/io/Stats.hpp
index 56f0c3c7..29ff2a1a 100644
--- a/emper/io/Stats.hpp
+++ b/emper/io/Stats.hpp
@@ -12,6 +12,7 @@
 #include <map>			 // for map, map<>::value_compare
 #include <vector>		 // for vector
 
+#include "CallerEnvironment.hpp"
 #include "Debug.hpp"				 // for LOGW
 #include "Emper.hpp"				 // for STATS
 #include "emper-common.h"		 // for workerid_t
@@ -198,18 +199,17 @@ class Stats {
 		io_submit_full_cq_running_mean += diff;
 	}
 
-	inline void record_completer_reaps(unsigned count) {
+	template <CallerEnvironment callerEnvironment>
+	inline void record_reaps(unsigned count) {
 		RETURN_IF_NO_STATS();
 
-		completer_reap++;
-		completer_reaped_completions += count;
-	}
-
-	inline void record_worker_reaps(unsigned count) {
-		RETURN_IF_NO_STATS();
-
-		worker_reap++;
-		worker_reaped_completions += count;
+		if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
+			worker_reap++;
+			worker_reaped_completions += count;
+		} else {
+			completer_reap++;
+			completer_reaped_completions += count;
+		}
 	}
 
 	friend auto operator<<(std::ostream& os, const Stats& s) -> std::ostream&;
diff --git a/emper/lib/adt/AtomicTryLock.hpp b/emper/lib/adt/AtomicTryLock.hpp
deleted file mode 100644
index 222d80e7..00000000
--- a/emper/lib/adt/AtomicTryLock.hpp
+++ /dev/null
@@ -1,25 +0,0 @@
-// SPDX-License-Identifier: LGPL-3.0-or-later
-// Copyright © 2020 Florian Fischer
-#pragma once
-
-#include <atomic>
-
-#include "Common.hpp"
-
-namespace emper::lib::adt {
-
-class ALIGN_TO_CACHE_LINE AtomicTryLock {
- private:
-	std::atomic<bool> locked;
-
- public:
-	AtomicTryLock(bool locked = false) : locked(locked) {}
-
-	auto try_lock() -> bool {
-		bool previously_locked = locked.exchange(true, std::memory_order_acquire);
-		return !previously_locked;
-	}
-
-	void unlock() { locked.store(false, std::memory_order_release); }
-};
-}	 // namespace emper::lib::adt
diff --git a/emper/lib/sync/CountingTryLock.hpp b/emper/lib/sync/CountingTryLock.hpp
new file mode 100644
index 00000000..cc67ec1b
--- /dev/null
+++ b/emper/lib/sync/CountingTryLock.hpp
@@ -0,0 +1,67 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2021 Florian Fischer
+#pragma once
+
+#include <atomic>
+#include <climits>
+
+#include "Common.hpp"
+
+namespace emper::lib::sync {
+
+class CountingTryLock {
+ private:
+	// The lower 32 bits are used as the actual lock.
+	// The upper 32 bits are used as a counter which is incremented if
+	// try_lock_or_increment fails to acquire the lock.
+
+	// Layout of our lock union:
+	// |--------------  countingLock  --------------|
+	//                     |------- counter  -------|
+	// | lock |
+	// 0    sizeof(std::atomic<bool>)              64
+	// 0                COUNTER_SHIFT              64
+	union {
+		std::atomic<uint64_t> countingLock;
+		std::atomic<bool> lock;
+	};
+
+	static const int COUNTER_SHIFT = 32;
+	static_assert(sizeof(std::atomic<bool>) * CHAR_BIT < COUNTER_SHIFT);
+
+	static const uint64_t LOCKED = 1;
+	static const uint64_t UNLOCKED = 0;
+
+ public:
+	CountingTryLock(bool locked = false) : countingLock(locked ? LOCKED : UNLOCKED) {}
+
+	[[nodiscard]] auto try_lock() -> bool { return !lock.exchange(true, std::memory_order_acquire); }
+
+	[[nodiscard]] auto try_lock_or_increment() -> bool {
+		uint64_t oldVal, newVal;
+		oldVal = countingLock.load(std::memory_order_relaxed);
+		for (;;) {
+			// currently unlocked -> try to lock
+			if (oldVal == UNLOCKED) {
+				newVal = LOCKED;
+				// currently locked -> increment the counter
+			} else {
+				newVal = oldVal + (static_cast<uint64_t>(1) << COUNTER_SHIFT);
+			}
+
+			if (countingLock.compare_exchange_weak(oldVal, newVal, std::memory_order_acquire,
+																						 std::memory_order_relaxed)) {
+				break;
+			}
+		}
+
+		return oldVal == UNLOCKED;
+	}
+
+	// release the lock, zero and return the counter
+	auto unlock() -> uint32_t {
+		uint64_t oldVal = countingLock.exchange(UNLOCKED, std::memory_order_release);
+		return static_cast<uint32_t>(oldVal >> COUNTER_SHIFT);
+	}
+};
+}	 // namespace emper::lib::sync
-- 
GitLab