diff --git a/emper/sleep_strategy/AbstractIoSleepStrategy.cpp b/emper/sleep_strategy/AbstractIoSleepStrategy.cpp index 3a804ea33b56d1d5e6abdb848bb5c097a6b857be..971c1a26532f437b5219e1f1ded13ce02712f6e2 100644 --- a/emper/sleep_strategy/AbstractIoSleepStrategy.cpp +++ b/emper/sleep_strategy/AbstractIoSleepStrategy.cpp @@ -20,64 +20,56 @@ using emper::io::IoContext; using emper::lib::TaggedPtr; +#ifndef IORING_OP_MSG_RING +#define IORING_OP_MSG_RING (IORING_OP_LINKAT + 1) +#endif + +// TODO: Remove when liburing supports IORING_OP_MSG_RING +static void io_uring_prep_msg_ring(struct io_uring_sqe* sqe, int ring_fd, void* user_data, + uint32_t res) { + io_uring_prep_nop(sqe); + sqe->opcode = IORING_OP_MSG_RING; + sqe->fd = ring_fd; + sqe->off = (uint64_t)user_data; + sqe->len = res; +} + namespace emper::sleep_strategy { -thread_local AbstractIoSleepStrategy::SleepState AbstractIoSleepStrategy::sleepState; -thread_local int AbstractIoSleepStrategy::specificEvfd; +thread_local bool AbstractIoSleepStrategy::readingGlobal = false; AbstractIoSleepStrategy::AbstractIoSleepStrategy(Runtime& runtime, workerid_t workerCount) - : workerCount(workerCount), stats(runtime) { + : workerCount(workerCount), stats(runtime), runtime(runtime) { sleepers.store(0, std::memory_order_relaxed); - sleepStates = new SleepState*[workerCount]; - evfds = new int[workerCount]; - - runtime.addNewWorkerHook([this](workerid_t workerId) { - specificEvfd = eventfd(0, 0); - if (specificEvfd == -1) { - DIE_MSG_ERRNO("Creating worker specific notification eventfd failed"); - } - evfds[workerId] = specificEvfd; - sleepStates[workerId] = &sleepState; - }); + sleepStates = new (std::align_val_t(CACHE_LINE_SIZE)) std::atomic<SleeperState>[workerCount]; } AbstractIoSleepStrategy::~AbstractIoSleepStrategy() { delete[] sleepStates; } -auto AbstractIoSleepStrategy::prepareSleep(IoContext& io, - const std::function<bool(void)>& customLogic) -> bool { - { - std::lock_guard<std::mutex> lock(sleepState.lock); - - // 
Check if we were notified specifically and should skip the sleep attempt - if (sleepState.isNotified()) { - sleepState.markRunning(); - LOGD("Reset notified state to running -> skip sleeping"); - return false; - } - - if (!sleepState.readingGlobal) { - const bool shouldSleep = customLogic(); - if (!shouldSleep) return false; - - sleepState.readingGlobal = true; - } - - if (!sleepState.isSleeping()) { - sleepState.markSleeping(); +auto AbstractIoSleepStrategy::prepareSleep(const std::function<bool(void)>& customLogic) -> bool { + const workerid_t workerId = Worker::getCurrentWorkerId(); + std::atomic<SleeperState>& sleepState = sleepStates[workerId]; + + // Check if we were notified specifically and should skip the sleep attempt + const SleeperState oldState = sleepState.exchange(SleeperState::Sleeping); + // Someone has notified us specifically -> skip sleeping. + if (oldState == SleeperState::Notified) { + sleepState.store(SleeperState::Running, std::memory_order_relaxed); + LOGD("State was notified -> reset notified state to running and skip sleeping"); + return false; + } - struct io_uring_sqe* sqe = io.getSqe(); - io_uring_prep_read(sqe, specificEvfd, &specificReadBuf, sizeof(specificReadBuf), 0); + // We must have been running. + assert(oldState == SleeperState::Running); - const auto tag = static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification); - // Tag the sqe with a marked new work notification to reset the specific state - // when reaping the resulting cqe. 
- io_uring_sqe_set_data(sqe, TaggedPtr((void*)nullptr, tag, true)); + if (!readingGlobal) { + const bool shouldSleep = customLogic(); + if (!shouldSleep) return false; - io.trackReqsInUring(1); - LOGD("prepared specificEvfd read"); - } + readingGlobal = true; } + return true; } @@ -105,6 +97,22 @@ void AbstractIoSleepStrategy::prepareSleepRead(IoContext& io, int fd, uint8_t sq io.trackReqsInUring(1); } +void AbstractIoSleepStrategy::sendSpecificNotification(workerid_t workerId) { + stats.addNotifications(1); + + emper::assertInRuntime(); + auto& io = *IoContext::getWorkerIo(); + struct io_uring_sqe* sqe = io.getSqe(); + + TaggedPtr tag((void*)nullptr, static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification), + true); + + io_uring_prep_msg_ring(sqe, runtime.ioContexts[workerId]->ring.ring_fd, tag, 0); + + io.trackReqsInUring(1); + io.submitPreparedSqesAndWait<CallerEnvironment::EMPER>(0); +} + template <CallerEnvironment callerEnvironment> void AbstractIoSleepStrategy::onNewWorkNotification(IoContext& io, const emper::lib::TaggedPtr& data) { @@ -114,20 +122,21 @@ void AbstractIoSleepStrategy::onNewWorkNotification(IoContext& io, assert(data.getTag() == static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification)); + stats.incWakeupDueToNotify(); + + std::atomic<SleeperState>& sleepState = sleepStates[io.worker->getWorkerId()]; + if (data.isMarked()) { - std::lock_guard<std::mutex> lock(sleepState.lock); LOGD("Got specific notification"); - stats.incWakeupDueToNotify(); - // Reset specific and notified flag to indicate that a new specific notification - // was consumed, a new specific read must be prepared and other notifySpecific - // calls must notify again. 
- sleepState.markRunning(); + ATTR_UNUSED auto oldState = + sleepState.exchange(SleeperState::Running, std::memory_order_release); + assert(oldState == SleeperState::Notified); } else { LOGD("Got new work notification"); - stats.incWakeupDueToNotify(); + sleepState.store(SleeperState::Running, std::memory_order_release); // Reset global flag to indicate that a new sleep cqe must be prepared - sleepState.readingGlobal = false; + readingGlobal = false; } } diff --git a/emper/sleep_strategy/AbstractIoSleepStrategy.hpp b/emper/sleep_strategy/AbstractIoSleepStrategy.hpp index 8c495d4b8549c9ab58ee2c45167d4103be85a71a..bab7e299664a3676c1645e17a685af8668021678 100644 --- a/emper/sleep_strategy/AbstractIoSleepStrategy.hpp +++ b/emper/sleep_strategy/AbstractIoSleepStrategy.hpp @@ -49,21 +49,17 @@ namespace emper::sleep_strategy { * sleepers count * Per worker: * dispatch hint buffer - * specific IO object - * lockable sleep state + * readingGlobal flag + * atomic sleep state * * PrepareSleep(customLogic): - * Lock state - * Defer unlock state - * If state is notified + * Exchange state with sleeping + * If state was notified * Reset to running * Return - * If stats not reading global + * If not readingGlobal * Return false if customLogic returns false - * Set state reading global - * If state is not sleeping - * Mark state sleeping - * Prepare read from specificEvfd + * Set readingGlobal * Return true * * WriteNotifications : implementation specific @@ -90,17 +86,14 @@ namespace emper::sleep_strategy { * * NotifySpecific(w): * Get w's state - * Lock state - * Defer unlock state - * Return if already notified - * Mark state notified - * If state was sleeping - * Write specific notification + * Relaxed exchange with notified + * Return unless w was sleeping + * Send specific notification * * onNewWorkCompletion(data): + * Release state to running * If data is marked - * lock state - * Mark state running + * assert was notified * Else * Reset readingGlobal *``` @@ -213,35 +206,8 @@ 
class AbstractIoSleepStrategy : public Logger<LogSubsystem::SLEEP_S> { return static_cast<uint64_t>(toWakeup); } - /** - * @brief State of a worker - */ - class SleepState { - friend class AbstractIoSleepStrategy; - - bool readingGlobal = false; - std::mutex lock; - emper::sleep_strategy::SleeperState s = emper::sleep_strategy::SleeperState::Running; - - auto markNotified() -> bool { - auto oldS = s; - s = emper::sleep_strategy::SleeperState::Notified; - return oldS == emper::sleep_strategy::SleeperState::Sleeping; - } - void markSleeping() { s = emper::sleep_strategy::SleeperState::Sleeping; } - void markRunning() { s = emper::sleep_strategy::SleeperState::Running; } - - auto isNotified() const -> bool { return s == emper::sleep_strategy::SleeperState::Notified; } - auto isSleeping() const -> bool { return s == emper::sleep_strategy::SleeperState::Sleeping; } - }; - - SleepState** sleepStates; - static thread_local SleepState sleepState; - - int* evfds; - static thread_local int specificEvfd; - // Buff used by all specirfic reads since their results are meaningless - uint64_t specificReadBuf; + static thread_local bool readingGlobal; + std::atomic<SleeperState>* sleepStates; // Make sure the shared counter lives in an exclusive cache line CACHE_LINE_EXCLUSIVE(std::atomic<int64_t>, sleepers); @@ -249,34 +215,27 @@ class AbstractIoSleepStrategy : public Logger<LogSubsystem::SLEEP_S> { workerid_t workerCount; Stats stats; + const Runtime& runtime; + AbstractIoSleepStrategy(Runtime& runtime, workerid_t workerCount); virtual ~AbstractIoSleepStrategy(); virtual void writeNotificationsFromEmper(emper::FiberHint hint, unsigned count) = 0; virtual void writeNotificationsFromAnywhere(emper::FiberHint hint, unsigned count) = 0; - void writeSpecificNotification(workerid_t workerId) { - stats.addNotifications(1); - - const uint64_t b = 1; - ssize_t res = write(evfds[workerId], &b, sizeof(b)); - if (unlikely(res) < 0) { - DIE_MSG_ERRNO("writing to the worker " << workerId << " 
evfd failed"); - } - } + void sendSpecificNotification(workerid_t workerId); using CustomLogic = std::function<bool(void)>; /** * @brief Prepare everything to sleep until new work is available * - * @param io The IoContext used to sleep * @param customLogic Code specific to the actual IO sleep strategy. Reads are only prepared * if customLogic returns true. * * @return True if any read was prepared and IO completions should be awaited */ - auto prepareSleep(emper::io::IoContext& io, const CustomLogic& customLogic) -> bool; + auto prepareSleep(const CustomLogic& customLogic) -> bool; /** * @brief prepare a new work notification read request @@ -322,21 +281,15 @@ class AbstractIoSleepStrategy : public Logger<LogSubsystem::SLEEP_S> { template <CallerEnvironment callerEnvironment> void notifySpecificInternal(workerid_t workerId) { - auto& specificState = *sleepStates[workerId]; - std::lock_guard<std::mutex> lock(specificState.lock); + auto& specificState = sleepStates[workerId]; LOGD("Specifically notify worker " << workerId << " from " << callerEnvironment); - if (specificState.isNotified()) { - LOGD(workerId << " already marked notified"); - return; - } - - const bool isSleeping = specificState.markNotified(); - - if (isSleeping) { - LOGD(workerId << " has specific read -> write notification"); - writeSpecificNotification(workerId); + auto oldState = specificState.exchange(SleeperState::Notified, std::memory_order_relaxed); + // If the worker was sleeping, we are responsible for sending it a specific notification + if (oldState == SleeperState::Sleeping) { + LOGD(workerId << " was sleeping send notification"); + sendSpecificNotification(workerId); } } diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp index 1328dfdf5100b66bc1ed582cbaf8a0b740354cf2..cf1d9bcb22c36c10bdbd0a7068f02c768bcdf5e2 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.cpp +++ b/emper/sleep_strategy/PipeSleepStrategy.cpp @@ -26,7 +26,7 @@ void PipeSleepStrategy::sleep() { return true; }; - const 
bool shouldSleep = prepareSleep(io, customSleepLogic); + const bool shouldSleep = prepareSleep(customSleepLogic); if (!shouldSleep) return; submitAndWaitForCompletionsIn(io); } diff --git a/emper/sleep_strategy/WaitfdSleepStrategy.cpp b/emper/sleep_strategy/WaitfdSleepStrategy.cpp index 605ce494c6307bd6649eb9e9098f4c182772e8d8..2e221254db8da860f0cc33155e67a95d136de184 100644 --- a/emper/sleep_strategy/WaitfdSleepStrategy.cpp +++ b/emper/sleep_strategy/WaitfdSleepStrategy.cpp @@ -49,11 +49,7 @@ WaitfdSleepStrategy::WaitfdSleepStrategy(Runtime& runtime, workerid_t workerCoun } } -WaitfdSleepStrategy::~WaitfdSleepStrategy() { - for (unsigned i = 0; i < workerCount; ++i) close(evfds[i]); - delete[] evfds; - close(waitfd); -} +WaitfdSleepStrategy::~WaitfdSleepStrategy() { close(waitfd); } void WaitfdSleepStrategy::sleep() { IoContext& io = *IoContext::getWorkerIo(); @@ -67,7 +63,7 @@ void WaitfdSleepStrategy::sleep() { LOGD("prepared waitfd read"); return true; }; - const bool shouldSleep = prepareSleep(io, customLogic); + const bool shouldSleep = prepareSleep(customLogic); if (!shouldSleep) return; diff --git a/tests/ScheduleOnTest.cpp b/tests/ScheduleOnTest.cpp index 5f7507fb8511221a5f6e26278f57816f7baa8e0e..3ed34f19bf7771443f00f80de2c4664d40c3a197 100644 --- a/tests/ScheduleOnTest.cpp +++ b/tests/ScheduleOnTest.cpp @@ -8,8 +8,10 @@ #include "Fiber.hpp" #include "Runtime.hpp" #include "Worker.hpp" +#include "emper-config.h" #include "emper.hpp" #include "fixtures/assert.hpp" +#include "lib/LinuxVersion.hpp" // IWYU pragma: keep // static const unsigned ITERATIONS = 100; static const unsigned ITERATIONS = 2; @@ -42,4 +44,10 @@ static void scheduleOnTest() { cps.wait(); } -void emperTest() { scheduleOnTest(); } +void emperTest() { +#ifndef EMPER_SEMAPHORE_SLEEP_STRATEGY + // io_uring supports IORING_OP_MSG_RING since Linux 5.18 + if (EMPER_LINUX_LT("5.18")) exit(77); +#endif + scheduleOnTest(); +}