Skip to content
Snippets Groups Projects
Commit 9d627462 authored by Florian Fischer's avatar Florian Fischer Committed by Florian Fischer
Browse files

fix futex usage and shrinking counter

* sleeping workers have decremented the semaphore count before sleeping.
  But if they are notified specifically the semaphore counter is
  decremented excessively
  This results in unnecessary suspension/notifications because the
  counter is out of sync with the actual waiter count.
* waitv expects that the futex size is specified in the futex flags
* wake sleepers using FUTEX_PRIVATE_FLAG
* futex_waitv returns the index of the woken futex -> wake on ret > -1
* add debug output and asserts
parent 88b015be
No related branches found
No related tags found
1 merge request!296fix Future::cancel with new Scheduler::scheduleOn(fiber, workerId)
......@@ -7,6 +7,7 @@
#include <atomic>
#include <cassert>
#include <cerrno>
#include "Worker.hpp"
......@@ -14,6 +15,10 @@
#define SYS_futex_waitv 449
#endif
#ifndef FUTEX_32
#define FUTEX_32 2
#endif
struct futex_waitv {
uint64_t val;
uint64_t uaddr;
......@@ -24,7 +29,7 @@ struct futex_waitv {
static void init_futex_waitv(struct futex_waitv* waiter, uint64_t val, void* uaddr) {
waiter->val = val;
waiter->uaddr = reinterpret_cast<uintptr_t>(uaddr);
waiter->flags = FUTEX_PRIVATE_FLAG;
waiter->flags = FUTEX_PRIVATE_FLAG | FUTEX_32;
waiter->_reserved = 0;
}
......@@ -47,6 +52,8 @@ void SpuriousFutex2Semaphore::wait() {
// Decrement the global semaphore count.
// See fetch_add comment in SpuriousFutex2Semaphore.hpp.
CounterType c = counter.fetch_sub(1, std::memory_order_relaxed) - 1;
assert(c >= -workerCount);
LOGD("Decrement counter to: " << c);
while (c < 0 && workerState.load(std::memory_order_relaxed) == SleeperState::Sleeping) {
// NOLINTNEXTLINE(modernize-avoid-c-arrays)
......@@ -57,7 +64,15 @@ void SpuriousFutex2Semaphore::wait() {
// Specific futex
init_futex_waitv(&waiters[1], static_cast<uint64_t>(SleeperState::Sleeping), &workerState);
long err = syscall(SYS_futex_waitv, waiters, 2, 0, nullptr, 0);
LOGD("Sleep on both futex");
long ret = syscall(SYS_futex_waitv, waiters, 2, 0, nullptr, 0);
if (ret == -1) {
assert(errno == EAGAIN);
c = getValue();
continue;
}
// Someone called FUTEX_WAIT on either of the two futex
// The futex(2) manpage says that applications should check the futex word
// again because it is possible to have spurious wakeups.
// We can't check the futex value because there is no way to decided
......@@ -65,14 +80,10 @@ void SpuriousFutex2Semaphore::wait() {
// A sound generic Semaphore needs to split its atomic counter into
// a semaphore value and a waiter count part.
// Someone called FUTEX_WAIT on either of the two mutexes
if (!err) {
workerState.store(SleeperState::Running, std::memory_order_relaxed);
return;
}
c = getValue();
LOGD("futex_waitv returned with: " << ret);
break;
}
workerState.store(SleeperState::Running, std::memory_order_relaxed);
}
} // namespace emper::lib::sync
......@@ -10,8 +10,10 @@
#include <cstdint>
#include <memory>
#include <new>
#include <ostream>
#include "Common.hpp"
#include "Debug.hpp"
#include "emper-common.h"
#include "lib/LinuxVersion.hpp"
#include "sleep_strategy/SleeperState.hpp"
......@@ -42,11 +44,8 @@ namespace emper::lib::sync {
* Use this class only where it is safe to have spurious wakeups.
* It is designed to wake up and suspend worker threads it does not provided any
* safety guaranties when protecting from data races.
*
* TODO: Improve that sleep -> notifySpecific -> sleep will decrease the sem counter
* twice with only one sleeper resulting in unnecessary FUTEX_WAKE calls.
*/
class SpuriousFutex2Semaphore {
class SpuriousFutex2Semaphore : Logger<LogSubsystem::SLEEP_S> {
public:
// All futexes are currently 32 bits in size
using CounterType = int32_t;
......@@ -60,6 +59,8 @@ class SpuriousFutex2Semaphore {
// < 0 means there are waiters
std::atomic<CounterType> counter;
int workerCount;
public:
SpuriousFutex2Semaphore(CounterType counter = 0) : counter(counter) {
if (EMPER_LINUX_LT("5.16.0"))
......@@ -69,7 +70,8 @@ class SpuriousFutex2Semaphore {
~SpuriousFutex2Semaphore() { delete states; }
void init(workerid_t workerCount) {
// Make each worker specific SleeperState life in its own cache line
this->workerCount = workerCount;
// Allcoate each worker specific SleeperState in its own cache line
states = new (std::align_val_t(CACHE_LINE_SIZE)) std::atomic<SleeperState>[workerCount];
}
......@@ -82,8 +84,10 @@ class SpuriousFutex2Semaphore {
// give any memory synchronization guaranties because of the
// possible spurious wakeups
CounterType c = counter.fetch_add(count, std::memory_order_relaxed);
LOGD("Futex2Sem: notify_many(" << count << ") inc counter to: " << c + count);
if (c < 0) {
syscall(SYS_futex, &counter, FUTEX_WAKE, count, 0, 0, 0);
LOGD("Futex2Sem: notify_many(" << count << ") wake " << count << " on " << &counter);
syscall(SYS_futex, &counter, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count, 0, 0, 0);
}
}
......@@ -95,7 +99,12 @@ class SpuriousFutex2Semaphore {
// We are responsible to wake the futex
if (oldState == SleeperState::Sleeping) {
syscall(SYS_futex, &workerState, FUTEX_WAKE, 1, 0, 0, 0);
// The sleeper has decremented the semaphore count before sleeping.
// We have to increment it when not using the counter to wakeup
// to prevent the counter from shrinking uncontrolled.
counter.fetch_add(1, std::memory_order_relaxed);
LOGD("increment counter to " << counter << " and wake specific");
syscall(SYS_futex, &workerState, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, 0, 0, 0);
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment