From 9d62746219d36e9a053b87a22286bc1336c67b7a Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fl.fischer@fau.de>
Date: Tue, 11 Jan 2022 13:06:29 +0100
Subject: [PATCH] fix futex usage and shrinking counter

* sleeping workers have decremented the semaphore count before sleeping.
  But if they are notified specifically the semaphore counter is
  decremented excessively
  This results in unnecessary suspension/notifications because the
  counter is out of sync with the actual waiter count.
* waitv expects that the futex size is specified in the futex flags
* wake sleepers using FUTEX_PRIVATE_FLAG
* futex_waitv returns the index of the woken futex -> wake on ret > -1
* add debug output and asserts
---
 emper/lib/sync/SpuriousFutex2Semaphore.cpp | 29 +++++++++++++++-------
 emper/lib/sync/SpuriousFutex2Semaphore.hpp | 23 +++++++++++------
 2 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/emper/lib/sync/SpuriousFutex2Semaphore.cpp b/emper/lib/sync/SpuriousFutex2Semaphore.cpp
index 964910dd..41abdc8b 100644
--- a/emper/lib/sync/SpuriousFutex2Semaphore.cpp
+++ b/emper/lib/sync/SpuriousFutex2Semaphore.cpp
@@ -7,6 +7,7 @@
 
 #include <atomic>
 #include <cassert>
+#include <cerrno>
 
 #include "Worker.hpp"
 
@@ -14,6 +15,10 @@
 #define SYS_futex_waitv 449
 #endif
 
+#ifndef FUTEX_32
+#define FUTEX_32 2
+#endif
+
 struct futex_waitv {
 	uint64_t val;
 	uint64_t uaddr;
@@ -24,7 +29,7 @@ struct futex_waitv {
 static void init_futex_waitv(struct futex_waitv* waiter, uint64_t val, void* uaddr) {
 	waiter->val = val;
 	waiter->uaddr = reinterpret_cast<uintptr_t>(uaddr);
-	waiter->flags = FUTEX_PRIVATE_FLAG;
+	waiter->flags = FUTEX_PRIVATE_FLAG | FUTEX_32;
 	waiter->_reserved = 0;
 }
 
@@ -47,6 +52,8 @@ void SpuriousFutex2Semaphore::wait() {
 	// Decrement the global semaphore count.
 	// See fetch_add comment in SpuriousFutex2Semaphore.hpp.
 	CounterType c = counter.fetch_sub(1, std::memory_order_relaxed) - 1;
+	assert(c >= -workerCount);
+	LOGD("Decrement counter to: " << c);
 
 	while (c < 0 && workerState.load(std::memory_order_relaxed) == SleeperState::Sleeping) {
 		// NOLINTNEXTLINE(modernize-avoid-c-arrays)
@@ -57,7 +64,15 @@ void SpuriousFutex2Semaphore::wait() {
 		// Specific futex
 		init_futex_waitv(&waiters[1], static_cast<uint64_t>(SleeperState::Sleeping), &workerState);
 
-		long err = syscall(SYS_futex_waitv, waiters, 2, 0, nullptr, 0);
+		LOGD("Sleep on both futex");
+		long ret = syscall(SYS_futex_waitv, waiters, 2, 0, nullptr, 0);
+		if (ret == -1) {
+			assert(errno == EAGAIN);
+			c = getValue();
+			continue;
+		}
+		// Someone called FUTEX_WAIT on either of the two futex
+
 		// The futex(2) manpage says that applications should check the futex word
 		// again because it is possible to have spurious wakeups.
 		// We can't check the futex value because there is no way to decided
@@ -65,14 +80,10 @@ void SpuriousFutex2Semaphore::wait() {
 		// A sound generic Semaphore needs to split its atomic counter into
 		// a semaphore value and a waiter count part.
 
-		// Someone called FUTEX_WAIT on either of the two mutexes
-		if (!err) {
-			workerState.store(SleeperState::Running, std::memory_order_relaxed);
-			return;
-		}
-
-		c = getValue();
+		LOGD("futex_waitv returned with: " << ret);
+		break;
 	}
+	workerState.store(SleeperState::Running, std::memory_order_relaxed);
 }
 
 }	 // namespace emper::lib::sync
diff --git a/emper/lib/sync/SpuriousFutex2Semaphore.hpp b/emper/lib/sync/SpuriousFutex2Semaphore.hpp
index c321ad02..ac369e3e 100644
--- a/emper/lib/sync/SpuriousFutex2Semaphore.hpp
+++ b/emper/lib/sync/SpuriousFutex2Semaphore.hpp
@@ -10,8 +10,10 @@
 #include <cstdint>
 #include <memory>
 #include <new>
+#include <ostream>
 
 #include "Common.hpp"
+#include "Debug.hpp"
 #include "emper-common.h"
 #include "lib/LinuxVersion.hpp"
 #include "sleep_strategy/SleeperState.hpp"
@@ -42,11 +44,8 @@ namespace emper::lib::sync {
  * Use this class only where it is safe to have spurious wakeups.
  * It is designed to wake up and suspend worker threads it does not provided any
  * safety guaranties when protecting from data races.
- *
- * TODO: Improve that sleep -> notifySpecific -> sleep will decrease the sem counter
- *       twice with only one sleeper resulting in unnecessary FUTEX_WAKE calls.
  */
-class SpuriousFutex2Semaphore {
+class SpuriousFutex2Semaphore : Logger<LogSubsystem::SLEEP_S> {
  public:
 	// All futexes are currently 32 bits in size
 	using CounterType = int32_t;
@@ -60,6 +59,8 @@ class SpuriousFutex2Semaphore {
 	// < 0 means there are waiters
 	std::atomic<CounterType> counter;
 
+	int workerCount;
+
  public:
 	SpuriousFutex2Semaphore(CounterType counter = 0) : counter(counter) {
 		if (EMPER_LINUX_LT("5.16.0"))
@@ -69,7 +70,8 @@ class SpuriousFutex2Semaphore {
 	~SpuriousFutex2Semaphore() { delete states; }
 
 	void init(workerid_t workerCount) {
-		// Make each worker specific SleeperState life in its own cache line
+		this->workerCount = workerCount;
+		// Allcoate each worker specific SleeperState in its own cache line
 		states = new (std::align_val_t(CACHE_LINE_SIZE)) std::atomic<SleeperState>[workerCount];
 	}
 
@@ -82,8 +84,10 @@ class SpuriousFutex2Semaphore {
 		// give any memory synchronization guaranties because of the
 		// possible spurious wakeups
 		CounterType c = counter.fetch_add(count, std::memory_order_relaxed);
+		LOGD("Futex2Sem: notify_many(" << count << ") inc counter to: " << c + count);
 		if (c < 0) {
-			syscall(SYS_futex, &counter, FUTEX_WAKE, count, 0, 0, 0);
+			LOGD("Futex2Sem: notify_many(" << count << ") wake " << count << " on " << &counter);
+			syscall(SYS_futex, &counter, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count, 0, 0, 0);
 		}
 	}
 
@@ -95,7 +99,12 @@ class SpuriousFutex2Semaphore {
 
 		// We are responsible to wake the futex
 		if (oldState == SleeperState::Sleeping) {
-			syscall(SYS_futex, &workerState, FUTEX_WAKE, 1, 0, 0, 0);
+			// The sleeper has decremented the semaphore count before sleeping.
+			// We have to increment it when not using the counter to wakeup
+			// to prevent the counter from shrinking uncontrolled.
+			counter.fetch_add(1, std::memory_order_relaxed);
+			LOGD("increment counter to " << counter << " and wake specific");
+			syscall(SYS_futex, &workerState, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, 0, 0, 0);
 		}
 	}
 
-- 
GitLab