Skip to content
Snippets Groups Projects
Commit 4237660f authored by Florian Fischer's avatar Florian Fischer
Browse files

use IORING_OP_MSG_RING instead of eventfds to notify specific

With Linux 5.18 io_uring supports sending notifications between io_urings.
This removes the need of writing to a specific eventfd to notify
specific sleeping workers.
This greatly reduces the complexity of the used algorithm because
we no longer need to prepare two sleeps in an atomic way.

The notify specific algorithm can now be similar to the one used by the
semaphore implementation.
The notifier sets the state of the specific worker to Notified and
if the worker was previously sleeping, the notifier is responsible for sending
a wakeup message.

When a worker is going to sleep it exchanges its state with Sleeping. If
it was notified, the state is reset to Running and the sleep attempt is skipped.
Otherwise a global read is prepared if needed and the worker waits
on its io_uring.
parent 308613e3
No related branches found
No related tags found
No related merge requests found
......@@ -20,64 +20,56 @@
using emper::io::IoContext;
using emper::lib::TaggedPtr;
#ifndef IORING_OP_MSG_RING
#define IORING_OP_MSG_RING (IORING_OP_LINKAT + 1)
#endif
// TODO: Remove when liburing supports IORING_OP_MSG_RING
/**
 * @brief Fill an sqe as an IORING_OP_MSG_RING request
 *
 * Local stand-in for the liburing helper of the same name.
 *
 * @param sqe The submission queue entry to prepare
 * @param ring_fd File descriptor of the io_uring the message is addressed to
 * @param user_data Pointer value stored in the sqe's `off` field
 *        (presumably delivered as the user_data of the cqe posted to the target ring;
 *        confirm against the io_uring documentation)
 * @param res Value stored in the sqe's `len` field
 *        (presumably delivered as the res of the cqe posted to the target ring)
 */
static void io_uring_prep_msg_ring(struct io_uring_sqe* sqe, int ring_fd, void* user_data,
                                   uint32_t res) {
  // Start from a NOP request and then overwrite the fields relevant for the message.
  io_uring_prep_nop(sqe);

  sqe->fd = ring_fd;
  sqe->len = res;
  sqe->off = reinterpret_cast<uint64_t>(user_data);
  sqe->opcode = IORING_OP_MSG_RING;
}
namespace emper::sleep_strategy {
thread_local AbstractIoSleepStrategy::SleepState AbstractIoSleepStrategy::sleepState;
thread_local int AbstractIoSleepStrategy::specificEvfd;
thread_local bool AbstractIoSleepStrategy::readingGlobal = false;
AbstractIoSleepStrategy::AbstractIoSleepStrategy(Runtime& runtime, workerid_t workerCount)
: workerCount(workerCount), stats(runtime) {
: workerCount(workerCount), stats(runtime), runtime(runtime) {
sleepers.store(0, std::memory_order_relaxed);
sleepStates = new SleepState*[workerCount];
evfds = new int[workerCount];
runtime.addNewWorkerHook([this](workerid_t workerId) {
specificEvfd = eventfd(0, 0);
if (specificEvfd == -1) {
DIE_MSG_ERRNO("Creating worker specific notification eventfd failed");
}
evfds[workerId] = specificEvfd;
sleepStates[workerId] = &sleepState;
});
sleepStates = new (std::align_val_t(CACHE_LINE_SIZE)) std::atomic<SleeperState>[workerCount];
}
/**
 * @brief Release the per-worker sleep state array
 *
 * sleepStates is allocated with the std::align_val_t overload of operator new[],
 * so it has to be released through the matching aligned operator delete[].
 * A plain `delete[] sleepStates` expression picks the deallocation function from the
 * element type's own alignment, which presumably is not over-aligned for
 * std::atomic<SleeperState>, and would therefore call the unaligned overload.
 *
 * NOTE(review): calling the deallocation function directly skips element destructors;
 * this assumes std::atomic<SleeperState> is trivially destructible (no array cookie).
 */
AbstractIoSleepStrategy::~AbstractIoSleepStrategy() {
  ::operator delete[](sleepStates, std::align_val_t(CACHE_LINE_SIZE));
}
auto AbstractIoSleepStrategy::prepareSleep(IoContext& io,
const std::function<bool(void)>& customLogic) -> bool {
{
std::lock_guard<std::mutex> lock(sleepState.lock);
// Check if we were notified specifically and should skip the sleep attempt
if (sleepState.isNotified()) {
sleepState.markRunning();
LOGD("Reset notified state to running -> skip sleeping");
return false;
}
if (!sleepState.readingGlobal) {
const bool shouldSleep = customLogic();
if (!shouldSleep) return false;
sleepState.readingGlobal = true;
}
if (!sleepState.isSleeping()) {
sleepState.markSleeping();
auto AbstractIoSleepStrategy::prepareSleep(const std::function<bool(void)>& customLogic) -> bool {
const workerid_t workerId = Worker::getCurrentWorkerId();
std::atomic<SleeperState>& sleepState = sleepStates[workerId];
// Check if we were notified specifically and should skip the sleep attempt
const SleeperState oldState = sleepState.exchange(SleeperState::Sleeping);
// Someone has notified us specifically -> skip sleeping.
if (oldState == SleeperState::Notified) {
sleepState.store(SleeperState::Running, std::memory_order_relaxed);
LOGD("State was notified -> reset notified state to running and skip sleeping");
return false;
}
struct io_uring_sqe* sqe = io.getSqe();
io_uring_prep_read(sqe, specificEvfd, &specificReadBuf, sizeof(specificReadBuf), 0);
// We must have been running.
assert(oldState == SleeperState::Running);
const auto tag = static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification);
// Tag the sqe with a marked new work notification to reset the specific state
// when reaping the resulting cqe.
io_uring_sqe_set_data(sqe, TaggedPtr((void*)nullptr, tag, true));
if (!readingGlobal) {
const bool shouldSleep = customLogic();
if (!shouldSleep) return false;
io.trackReqsInUring(1);
LOGD("prepared specificEvfd read");
}
readingGlobal = true;
}
return true;
}
......@@ -105,6 +97,22 @@ void AbstractIoSleepStrategy::prepareSleepRead(IoContext& io, int fd, uint8_t sq
io.trackReqsInUring(1);
}
/**
 * @brief Send a specific new work notification to the io_uring of a worker
 *
 * Counts the notification in the stats, asserts that the caller is inside the
 * runtime and then submits an IORING_OP_MSG_RING request through the calling
 * worker's IoContext. The message carries a marked NewWorkNotification tagged
 * pointer and a result of 0.
 *
 * @param workerId The id of the worker whose io_uring receives the message
 */
void AbstractIoSleepStrategy::sendSpecificNotification(workerid_t workerId) {
  stats.addNotifications(1);
  emper::assertInRuntime();

  // The message is sent through the io_uring of the worker we are running on.
  IoContext& workerIo = *IoContext::getWorkerIo();
  struct io_uring_sqe* msgSqe = workerIo.getSqe();

  // A marked NewWorkNotification pointer identifies the message as a specific notification.
  const auto notificationTag = static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification);
  TaggedPtr markedNotification((void*)nullptr, notificationTag, true);

  const int targetRingFd = runtime.ioContexts[workerId]->ring.ring_fd;
  io_uring_prep_msg_ring(msgSqe, targetRingFd, markedNotification, 0);

  workerIo.trackReqsInUring(1);
  // Presumably submits the prepared sqe without waiting for any completion (argument 0).
  workerIo.submitPreparedSqesAndWait<CallerEnvironment::EMPER>(0);
}
template <CallerEnvironment callerEnvironment>
void AbstractIoSleepStrategy::onNewWorkNotification(IoContext& io,
const emper::lib::TaggedPtr& data) {
......@@ -114,20 +122,21 @@ void AbstractIoSleepStrategy::onNewWorkNotification(IoContext& io,
assert(data.getTag() == static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification));
stats.incWakeupDueToNotify();
std::atomic<SleeperState>& sleepState = sleepStates[io.worker->getWorkerId()];
if (data.isMarked()) {
std::lock_guard<std::mutex> lock(sleepState.lock);
LOGD("Got specific notification");
stats.incWakeupDueToNotify();
// Reset specific and notified flag to indicate that a new specific notification
// was consumed, a new specific read must be prepared and other notifySpecific
// calls must notify again.
sleepState.markRunning();
ATTR_UNUSED auto oldState =
sleepState.exchange(SleeperState::Running, std::memory_order_release);
assert(oldState == SleeperState::Notified);
} else {
LOGD("Got new work notification");
stats.incWakeupDueToNotify();
sleepState.store(SleeperState::Running, std::memory_order_release);
// Reset global flag to indicate that a new sleep cqe must be prepared
sleepState.readingGlobal = false;
readingGlobal = false;
}
}
......
......@@ -49,21 +49,17 @@ namespace emper::sleep_strategy {
* sleepers count
* Per worker:
* dispatch hint buffer
* specific IO object
* lockable sleep state
* readingGlobal flag
* atomic sleep state
*
* PrepareSleep(customLogic):
* Lock state
* Defer unlock state
* If state is notified
* Exchange state with sleeping
* If state was notified
* Reset to running
* Return
* If stats not reading global
* If not readingGlobal
* Return false if customLogic returns false
* Set state reading global
* If state is not sleeping
* Mark state sleeping
* Prepare read from specificEvfd
* Set readingGlobal
* Return true
*
* WriteNotifications : implementation specific
......@@ -90,17 +86,14 @@ namespace emper::sleep_strategy {
*
* NotifySpecific(w):
* Get w's state
* Lock state
* Defer unlock state
* Return if already notified
* Mark state notified
* If state was sleeping
* Write specific notification
* Relaxed exchange with notified
* Return w was sleeping
* Send specific notification
*
* onNewWorkCompletion(data):
* Release state to running
* If data is marked
* lock state
* Mark state running
* assert was notified
* Else
* Reset readingGlobal
*```
......@@ -213,35 +206,8 @@ class AbstractIoSleepStrategy : public Logger<LogSubsystem::SLEEP_S> {
return static_cast<uint64_t>(toWakeup);
}
/**
* @brief State of a worker
*/
class SleepState {
friend class AbstractIoSleepStrategy;
bool readingGlobal = false;
std::mutex lock;
emper::sleep_strategy::SleeperState s = emper::sleep_strategy::SleeperState::Running;
auto markNotified() -> bool {
auto oldS = s;
s = emper::sleep_strategy::SleeperState::Notified;
return oldS == emper::sleep_strategy::SleeperState::Sleeping;
}
void markSleeping() { s = emper::sleep_strategy::SleeperState::Sleeping; }
void markRunning() { s = emper::sleep_strategy::SleeperState::Running; }
auto isNotified() const -> bool { return s == emper::sleep_strategy::SleeperState::Notified; }
auto isSleeping() const -> bool { return s == emper::sleep_strategy::SleeperState::Sleeping; }
};
SleepState** sleepStates;
static thread_local SleepState sleepState;
int* evfds;
static thread_local int specificEvfd;
// Buffer used by all specific reads since their results are meaningless
uint64_t specificReadBuf;
static thread_local bool readingGlobal;
std::atomic<SleeperState>* sleepStates;
// Make sure the shared counter lives in an exclusive cache line
CACHE_LINE_EXCLUSIVE(std::atomic<int64_t>, sleepers);
......@@ -249,34 +215,27 @@ class AbstractIoSleepStrategy : public Logger<LogSubsystem::SLEEP_S> {
workerid_t workerCount;
Stats stats;
const Runtime& runtime;
AbstractIoSleepStrategy(Runtime& runtime, workerid_t workerCount);
virtual ~AbstractIoSleepStrategy();
virtual void writeNotificationsFromEmper(emper::FiberHint hint, unsigned count) = 0;
virtual void writeNotificationsFromAnywhere(emper::FiberHint hint, unsigned count) = 0;
void writeSpecificNotification(workerid_t workerId) {
stats.addNotifications(1);
const uint64_t b = 1;
ssize_t res = write(evfds[workerId], &b, sizeof(b));
if (unlikely(res) < 0) {
DIE_MSG_ERRNO("writing to the worker " << workerId << " evfd failed");
}
}
void sendSpecificNotification(workerid_t workerId);
using CustomLogic = std::function<bool(void)>;
/**
* @brief Prepare everything to sleep until new work is available
*
* @param io The IoContext used to sleep
* @param customLogic Code specific to the actual IO sleep strategy. Reads are only prepared
* if customLogic returns true.
*
* @return True if any read was prepared and IO completions should be awaited
*/
auto prepareSleep(emper::io::IoContext& io, const CustomLogic& customLogic) -> bool;
auto prepareSleep(const CustomLogic& customLogic) -> bool;
/**
* @brief prepare a new work notification read request
......@@ -322,21 +281,15 @@ class AbstractIoSleepStrategy : public Logger<LogSubsystem::SLEEP_S> {
template <CallerEnvironment callerEnvironment>
void notifySpecificInternal(workerid_t workerId) {
auto& specificState = *sleepStates[workerId];
std::lock_guard<std::mutex> lock(specificState.lock);
auto& specificState = sleepStates[workerId];
LOGD("Specifically notify worker " << workerId << " from " << callerEnvironment);
if (specificState.isNotified()) {
LOGD(workerId << " already marked notified");
return;
}
const bool isSleeping = specificState.markNotified();
if (isSleeping) {
LOGD(workerId << " has specific read -> write notification");
writeSpecificNotification(workerId);
auto oldState = specificState.exchange(SleeperState::Notified, std::memory_order_relaxed);
// The worker was sleeping -> we are responsible for sending it the wakeup notification
if (oldState == SleeperState::Sleeping) {
LOGD(workerId << " was sleeping send notification");
sendSpecificNotification(workerId);
}
}
......
......@@ -26,7 +26,7 @@ void PipeSleepStrategy::sleep() {
return true;
};
const bool shouldSleep = prepareSleep(io, customSleepLogic);
const bool shouldSleep = prepareSleep(customSleepLogic);
if (!shouldSleep) return;
submitAndWaitForCompletionsIn(io);
}
......
......@@ -49,11 +49,7 @@ WaitfdSleepStrategy::WaitfdSleepStrategy(Runtime& runtime, workerid_t workerCoun
}
}
WaitfdSleepStrategy::~WaitfdSleepStrategy() {
for (unsigned i = 0; i < workerCount; ++i) close(evfds[i]);
delete[] evfds;
close(waitfd);
}
// Closes the strategy's waitfd file descriptor; the result of close is not checked.
WaitfdSleepStrategy::~WaitfdSleepStrategy() {
  close(waitfd);
}
void WaitfdSleepStrategy::sleep() {
IoContext& io = *IoContext::getWorkerIo();
......@@ -67,7 +63,7 @@ void WaitfdSleepStrategy::sleep() {
LOGD("prepared waitfd read");
return true;
};
const bool shouldSleep = prepareSleep(io, customLogic);
const bool shouldSleep = prepareSleep(customLogic);
if (!shouldSleep) return;
......
......@@ -8,8 +8,10 @@
#include "Fiber.hpp"
#include "Runtime.hpp"
#include "Worker.hpp"
#include "emper-config.h"
#include "emper.hpp"
#include "fixtures/assert.hpp"
#include "lib/LinuxVersion.hpp" // IWYU pragma: keep
// static const unsigned ITERATIONS = 100;
static const unsigned ITERATIONS = 2;
......@@ -42,4 +44,10 @@ static void scheduleOnTest() {
cps.wait();
}
void emperTest() { scheduleOnTest(); }
/**
 * Test entry point: runs scheduleOnTest().
 *
 * Unless the semaphore sleep strategy is configured, the test exits with
 * status 77 on kernels older than Linux 5.18.
 * NOTE(review): 77 presumably is the test harness's "skipped" exit code; confirm.
 */
void emperTest() {
#ifndef EMPER_SEMAPHORE_SLEEP_STRATEGY
  // IORING_OP_MSG_RING is only available in io_uring since Linux 5.18
  if (EMPER_LINUX_LT("5.18")) {
    exit(77);
  }
#endif

  scheduleOnTest();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment