From 96a846a1a893f7ed440eed24e55a623d03bc1eca Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fischer@muhq.space>
Date: Tue, 21 Dec 2021 17:50:26 +0100
Subject: [PATCH] add semaphore using futex_waitv(2) supporting notify_specific

The SpuriousFutex2Semaphore is able to notify a specific worker
by using two futexes two wait on.

One working like a normal semaphore used for global non specific
notifications via notify() and notify_many().

And a second one per worker which is based on a SleeperState.
To notify a specific worker we change its SleeperState to Notified
and call FUTEX_WAKE if needed.
---
 .gitlab-ci.yml                                |  10 ++
 emper/lib/meson.build                         |   2 +
 emper/lib/sync/SpuriousFutex2Semaphore.cpp    |  78 +++++++++++++
 emper/lib/sync/SpuriousFutex2Semaphore.hpp    | 105 ++++++++++++++++++
 emper/lib/sync/meson.build                    |   3 +
 .../SemaphoreWorkerSleepStrategy.hpp          |  27 ++++-
 emper/sleep_strategy/SleeperState.hpp         |  16 +++
 meson_options.txt                             |   1 +
 8 files changed, 236 insertions(+), 6 deletions(-)
 create mode 100644 emper/lib/sync/SpuriousFutex2Semaphore.cpp
 create mode 100644 emper/lib/sync/SpuriousFutex2Semaphore.hpp
 create mode 100644 emper/lib/sync/meson.build
 create mode 100644 emper/sleep_strategy/SleeperState.hpp

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index c2e62475..d068b335 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -223,6 +223,10 @@ clang-tidy:
   variables:
     EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "futex"
 
+.futex2-wakeup-semaphore:
+  variables:
+    EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "futex2"
+
 .locked-wakeup-semaphore:
   variables:
     EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "locked"
@@ -343,6 +347,12 @@ test-futex-wakeup-semaphore:
     - .test
     - .futex-wakeup-semaphore
 
+# TODO: enable this if the CI has linux >= 5.16
+build-futex-wakeup-semaphore:
+  extends:
+    - .build
+    - .futex2-wakeup-semaphore
+
 test-locked-wakeup-semaphore:
   extends:
     - .test
diff --git a/emper/lib/meson.build b/emper/lib/meson.build
index 88b68f89..e59bfa43 100644
--- a/emper/lib/meson.build
+++ b/emper/lib/meson.build
@@ -2,3 +2,5 @@ emper_cpp_sources += files(
   'DebugUtil.cpp',
   'LinuxVersion.cpp',
 )
+
+subdir('sync')
diff --git a/emper/lib/sync/SpuriousFutex2Semaphore.cpp b/emper/lib/sync/SpuriousFutex2Semaphore.cpp
new file mode 100644
index 00000000..964910dd
--- /dev/null
+++ b/emper/lib/sync/SpuriousFutex2Semaphore.cpp
@@ -0,0 +1,78 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2021 Florian Fischer
+#include "lib/sync/SpuriousFutex2Semaphore.hpp"
+
+#include <linux/futex.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <cassert>
+
+#include "Worker.hpp"
+
+#ifndef SYS_futex_waitv
+#define SYS_futex_waitv 449
+#endif
+
+struct futex_waitv {
+	uint64_t val;
+	uint64_t uaddr;
+	uint32_t flags;
+	uint32_t _reserved;
+};
+
+static void init_futex_waitv(struct futex_waitv* waiter, uint64_t val, void* uaddr) {
+	waiter->val = val;
+	waiter->uaddr = reinterpret_cast<uintptr_t>(uaddr);
+	waiter->flags = FUTEX_PRIVATE_FLAG;
+	waiter->_reserved = 0;
+}
+
+namespace emper::lib::sync {
+
+void SpuriousFutex2Semaphore::wait() {
+	const workerid_t workerId = Worker::getCurrentWorkerId();
+	std::atomic<SleeperState>& workerState = states[workerId];
+
+	const SleeperState oldState = workerState.exchange(SleeperState::Sleeping);
+	// Someone has notified us specifically -> skip sleeping.
+	if (oldState == SleeperState::Notified) {
+		workerState.store(SleeperState::Running, std::memory_order_relaxed);
+		return;
+	}
+
+	// Me must have been running.
+	assert(oldState == SleeperState::Running);
+
+	// Decrement the global semaphore count.
+	// See fetch_add comment in SpuriousFutex2Semaphore.hpp.
+	CounterType c = counter.fetch_sub(1, std::memory_order_relaxed) - 1;
+
+	while (c < 0 && workerState.load(std::memory_order_relaxed) == SleeperState::Sleeping) {
+		// NOLINTNEXTLINE(modernize-avoid-c-arrays)
+		struct futex_waitv waiters[2];
+		// Global futex
+		init_futex_waitv(&waiters[0], c, &counter);
+
+		// Specific futex
+		init_futex_waitv(&waiters[1], static_cast<uint64_t>(SleeperState::Sleeping), &workerState);
+
+		long err = syscall(SYS_futex_waitv, waiters, 2, 0, nullptr, 0);
+		// The futex(2) manpage says that applications should check the futex word
+		// again because it is possible to have spurious wakeups.
+		// We can't check the futex value because there is no way to decided
+		// if someone called notify.
+		// A sound generic Semaphore needs to split its atomic counter into
+		// a semaphore value and a waiter count part.
+
+		// Someone called FUTEX_WAIT on either of the two mutexes
+		if (!err) {
+			workerState.store(SleeperState::Running, std::memory_order_relaxed);
+			return;
+		}
+
+		c = getValue();
+	}
+}
+
+}	 // namespace emper::lib::sync
diff --git a/emper/lib/sync/SpuriousFutex2Semaphore.hpp b/emper/lib/sync/SpuriousFutex2Semaphore.hpp
new file mode 100644
index 00000000..c321ad02
--- /dev/null
+++ b/emper/lib/sync/SpuriousFutex2Semaphore.hpp
@@ -0,0 +1,105 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2021 Florian Fischer
+#pragma once
+
+#include <linux/futex.h>
+#include <syscall.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <new>
+
+#include "Common.hpp"
+#include "emper-common.h"
+#include "lib/LinuxVersion.hpp"
+#include "sleep_strategy/SleeperState.hpp"
+
+namespace emper::lib::sync {
+
+/**
+ * @brief A semaphore implementation using two futexes to wait
+ *
+ * SpuriousFutex2Semaphore behaves like a normal Semaphore when using
+ * only notify() and notify_many(count).
+ * But it also supports notify_specific(workerId).
+ * The notify_specific(workerId) is implemented with a exclusive futex per
+ * worker. The worker specific futexes are a cache line exclusive SleeperState
+ * per worker. A worker calling wait will use futex_waitv(2) introduced in linux
+ * v5.16 to wait on both futexes.
+ *
+ * To notify a specific Worker we first set its SleeperState to Notified
+ * and check the previous value:
+ * - If it was running it will skip the next wait() and reset it
+ * - If it was sleeping we have to call FUTEX_WAKE on the futex
+ *   - The futex_waitv call will fail if the notifier races the futex_waitv call
+ *   - If futex_waitv wins the race the notifier's FUTEX_WAKE will wake it
+ *
+ * ATTENTION: This semaphore implementation is prone to spurious wakeups!
+ * SpuriousFutex2Semaphore::wait() may return spurious without a call to
+ * SpuriousFutex2Semaphore::notify().
+ * Use this class only where it is safe to have spurious wakeups.
+ * It is designed to wake up and suspend worker threads it does not provided any
+ * safety guaranties when protecting from data races.
+ *
+ * TODO: Improve that sleep -> notifySpecific -> sleep will decrease the sem counter
+ *       twice with only one sleeper resulting in unnecessary FUTEX_WAKE calls.
+ */
+class SpuriousFutex2Semaphore {
+ public:
+	// All futexes are currently 32 bits in size
+	using CounterType = int32_t;
+
+ private:
+	using SleeperState = emper::sleep_strategy::SleeperState;
+	// States of the worker threads
+	std::atomic<SleeperState>* states;
+
+	// >= 0 means no waiters
+	// < 0 means there are waiters
+	std::atomic<CounterType> counter;
+
+ public:
+	SpuriousFutex2Semaphore(CounterType counter = 0) : counter(counter) {
+		if (EMPER_LINUX_LT("5.16.0"))
+			DIE_MSG("SpuriousFutex2Semaphore needs futex_waitv(2) introduced in linux v5.16");
+	}
+
+	~SpuriousFutex2Semaphore() { delete states; }
+
+	void init(workerid_t workerCount) {
+		// Make each worker specific SleeperState life in its own cache line
+		states = new (std::align_val_t(CACHE_LINE_SIZE)) std::atomic<SleeperState>[workerCount];
+	}
+
+	[[nodiscard]] inline auto getValue() const -> CounterType {
+		return counter.load(std::memory_order_relaxed);
+	}
+
+	inline void notify_many(CounterType count) {
+		// Using std::memory_order_relaxed is fine since we don't
+		// give any memory synchronization guaranties because of the
+		// possible spurious wakeups
+		CounterType c = counter.fetch_add(count, std::memory_order_relaxed);
+		if (c < 0) {
+			syscall(SYS_futex, &counter, FUTEX_WAKE, count, 0, 0, 0);
+		}
+	}
+
+	inline void notify() { notify_many(1); }
+
+	inline void notifySpecific(workerid_t workerId) {
+		std::atomic<SleeperState>& workerState = states[workerId];
+		SleeperState oldState = workerState.exchange(SleeperState::Notified, std::memory_order_relaxed);
+
+		// We are responsible to wake the futex
+		if (oldState == SleeperState::Sleeping) {
+			syscall(SYS_futex, &workerState, FUTEX_WAKE, 1, 0, 0, 0);
+		}
+	}
+
+	void wait();
+};
+
+}	 // namespace emper::lib::sync
diff --git a/emper/lib/sync/meson.build b/emper/lib/sync/meson.build
new file mode 100644
index 00000000..c4476cae
--- /dev/null
+++ b/emper/lib/sync/meson.build
@@ -0,0 +1,3 @@
+emper_cpp_sources += files(
+  'SpuriousFutex2Semaphore.cpp',
+)
diff --git a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp
index 142ccffa..e07d1a2a 100644
--- a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp
+++ b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp
@@ -25,6 +25,10 @@
 #include "lib/sync/SpuriousFutexSemaphore.hpp"
 #endif
 
+#ifdef EMPER_FUTEX2_WAKEUP_SEMAPHORE
+#include "lib/sync/SpuriousFutex2Semaphore.hpp"
+#endif
+
 class Runtime;
 
 namespace emper::sleep_strategy {
@@ -45,6 +49,9 @@ class AbstractSemaphoreWorkerSleepStrategy
 	// check if the used Semaphore provides a notifySpecific implementation
 	static constexpr bool semHasNotifySpecific = requires(Sem s) { s.notifySpecific(0); };
 
+	// check if the used Semaphore needs initialization
+	static constexpr bool semNeedsInit = requires(Sem s) { s.init(0); };
+
 	// should the generic notifySpecific implementation be used
 	static constexpr bool useGenericNotifySpecificImpl = !semHasNotifySpecific;
 
@@ -144,6 +151,10 @@ class AbstractSemaphoreWorkerSleepStrategy
 		if constexpr (useGenericNotifySpecificImpl) {
 			notifiedFlags = new std::atomic<bool>[workerCount];
 		}
+
+		if constexpr (semNeedsInit) {
+			wakeupSem.init(workerCount);
+		}
 	}
 
 	~AbstractSemaphoreWorkerSleepStrategy() {
@@ -212,16 +223,20 @@ class AbstractSemaphoreWorkerSleepStrategy
 };
 
 #ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE
-using ::emper::lib::sync::LockedSemaphore;
-using SemaphoreWorkerSleepStrategy = AbstractSemaphoreWorkerSleepStrategy<LockedSemaphore>;
+using SemaphoreWorkerSleepStrategy =
+		AbstractSemaphoreWorkerSleepStrategy<::emper::lib::sync::LockedSemaphore>;
 
 #elif defined EMPER_POSIX_WAKEUP_SEMAPHORE
-using ::emper::lib::sync::PosixSemaphore;
-using SemaphoreWorkerSleepStrategy = AbstractSemaphoreWorkerSleepStrategy<PosixSemaphore>;
+using SemaphoreWorkerSleepStrategy =
+		AbstractSemaphoreWorkerSleepStrategy<::emper::lib::sync::PosixSemaphore>;
 
 #elif defined EMPER_FUTEX_WAKEUP_SEMAPHORE
-using ::emper::lib::sync::SpuriousFutexSemaphore;
-using SemaphoreWorkerSleepStrategy = AbstractSemaphoreWorkerSleepStrategy<SpuriousFutexSemaphore>;
+using SemaphoreWorkerSleepStrategy =
+		AbstractSemaphoreWorkerSleepStrategy<::emper::lib::sync::SpuriousFutexSemaphore>;
+
+#elif defined EMPER_FUTEX2_WAKEUP_SEMAPHORE
+using SemaphoreWorkerSleepStrategy =
+		AbstractSemaphoreWorkerSleepStrategy<::emper::lib::sync::SpuriousFutex2Semaphore>;
 
 #else
 #error Unknown WorkerSleepSemaphore implementation
diff --git a/emper/sleep_strategy/SleeperState.hpp b/emper/sleep_strategy/SleeperState.hpp
new file mode 100644
index 00000000..937b6c41
--- /dev/null
+++ b/emper/sleep_strategy/SleeperState.hpp
@@ -0,0 +1,16 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2021 Florian Fischer
+#pragma once
+
+namespace emper::sleep_strategy {
+
+/**
+ * @brief States a worker can have regarding the sleep strategy
+ */
+enum class SleeperState {
+	Running,	/*!< The worker is in its dispatch loop */
+	Sleeping, /*!< The worker is sleeping in the sleep strategy */
+	Notified, /*!< The worker was notified an should (re-) execute its dispatch loop */
+};
+
+}	 // namespace emper::sleep_strategy
diff --git a/meson_options.txt b/meson_options.txt
index 96b76c34..b10a3b62 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -57,6 +57,7 @@ option(
 	'posix',
 	'locked',
 	'futex',
+	'futex2',
   ],
   value: 'posix',
   description: 'Semaphore implementation to suspend/wakeup workers',
-- 
GitLab