diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 0d16f2a6ff24a9238e7db95adc7207dd391112e0..c917c962c6755bcdcb79d660ab168749ffb02386 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -153,6 +153,30 @@ const bool IO = #endif ; +const bool IO_NOTIFICATION = +#ifdef EMPER_IO_NOTIFICATION + true +#else + false +#endif + ; + +const bool IO_STEAL_ALL_ON_NOTIFICATION = +#ifdef EMPER_IO_STEAL_ALL_ON_NOTIFICATION + true +#else + false +#endif + ; + +const bool IO_PUNT_WAITFD_READ = +#ifdef EMPER_IO_SKIP_WAITFD_READ_PUNT + false +#else + true +#endif + ; + const bool IO_STEALING = #ifdef EMPER_IO_STEALING true diff --git a/emper/FiberSource.hpp b/emper/FiberSource.hpp index 75412f41d81fbca90683e33b60840d67cf4de1fe..39d5d7294cec286c013790215279429248c5e29b 100644 --- a/emper/FiberSource.hpp +++ b/emper/FiberSource.hpp @@ -20,6 +20,7 @@ enum class FiberSource : uintptr_t { ioStolen, /*!< A other worker's io_uring completion queue */ anywhereQueue, /*!< The anywhere queue */ hintWsq, /*!< A known other worker's work-stealing queue */ + hintIo, /*!< A known worker's io_uring completion queue */ hintAq, /*!< Straight from the anywhere queue */ }; diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index c1d6171d84e6a31442b50641e2365c5c172c1e85..83ad94f6801087bf685c3bb0cf44891745449e40 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -20,6 +20,7 @@ #include <ratio> #include <string> // for string #include <thread> +#include <utility> #include "Common.hpp" // for DIE_MSG_ERRNO, DIE, DIE_MSG #include "Context.hpp" @@ -430,16 +431,9 @@ void Runtime::yield() { auto Runtime::nextFiber() -> std::optional<NextFiberResult> { if constexpr (emper::IO_WORKER_URING && !emper::IO_SYNCHRONOUS) { // Schedule all fibers waiting on completed IO - IoContext::ContinuationBuffer completions; - unsigned ncompletions = - IoContext::getWorkerIo()->reapCompletions<CallerEnvironment::OWNER>(completions); - if (ncompletions > 0) { - // Keep the first and schedule the rest - Fiber* next = completions[0]; - schedule(&completions[1], ncompletions - 1); - - return NextFiberResult{next, emper::FiberSource::io}; - } + Fiber* next = + IoContext::getWorkerIo()->reapAndScheduleAllReturnFirst<CallerEnvironment::OWNER>().first; + if (next) return NextFiberResult{next, emper::FiberSource::io}; } return scheduler.nextFiber(); diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index b8388f878b1d04137bcea6cb7749d1bdc419245e..26f9e9ada58d0ceb6f0c58724dfedc1e03ba25ee 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -53,8 +53,10 @@ namespace log { class LogBuffer; } namespace sleep_strategy { +class AbstractIoSleepStrategy; class PipeSleepStrategy; -} +class WaitfdSleepStrategy; +} // namespace sleep_strategy namespace stats { class Worker; class FromAnywhere; @@ -262,8 +264,10 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { template <typename> friend class WorkerLocalData; friend class Fibril; + friend class emper::sleep_strategy::AbstractIoSleepStrategy; friend class emper::sleep_strategy::PipeSleepStrategy; friend class emper::lib::ShardedFileBuffer; + friend class emper::sleep_strategy::WaitfdSleepStrategy; friend void emper::log::log(const std::string& prefix, const std::string& message); }; diff --git a/emper/Worker.hpp b/emper/Worker.hpp index d47cd89c5d84182e3658d9fa3fd279dfe28eabaf..07abdccfa39ad05372f360d7c84b40044f0c135f 100644 --- a/emper/Worker.hpp +++ b/emper/Worker.hpp @@ -11,9 +11,8 @@ class Runtime; namespace emper::sleep_strategy { -class PipeSleepStrategy; +class AbstractIoSleepStrategy; } -using emper::sleep_strategy::PipeSleepStrategy; class Worker { private: @@ -39,7 +38,7 @@ class Worker { friend Runtime; friend class AbstractWorkStealingScheduler; - friend class PipeSleepStrategy; + friend class emper::sleep_strategy::AbstractIoSleepStrategy; public: [[nodiscard]] auto getWorkerId() const -> workerid_t { return workerId; } diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index a8241d1f98355c7fa3b3c45bcb97bb1d69490296..1ea37f70e325f3ae2589921eacc81108a45ae1b8 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -33,11 +33,15 @@ #include "io/Stats.hpp" #include "io/SubmitActor.hpp" // IWYU pragma: keep #include "lib/TaggedPtr.hpp" -#include "lib/adt/LockedSet.hpp" // for LockedSet -#include "sleep_strategy/PipeSleepStrategy.hpp" // IWYU pragma: keep +#include "lib/adt/LockedSet.hpp" +#include "sleep_strategy/AbstractIoSleepStrategy.hpp" // IWYU pragma: keep class AbstractWorkStealingScheduler; +namespace emper::sleep_strategy { +class WaitfdSleepStrategy; +} + namespace emper::io { class IoContext : public Logger<LogSubsystem::IO> { friend class ::Runtime; @@ -51,7 +55,8 @@ class IoContext : public Logger<LogSubsystem::IO> { // IoContext pointer in GlobalIoContext::globalCompleterFunc friend class GlobalIoContext; - friend class emper::sleep_strategy::PipeSleepStrategy; + friend class emper::sleep_strategy::AbstractIoSleepStrategy; + friend class emper::sleep_strategy::WaitfdSleepStrategy; // Debug functions to access the mmaped memory of ring. // gdb is not allowed to access the io mmaped memory of the io_uring fd. @@ -223,8 +228,9 @@ class IoContext : public Logger<LogSubsystem::IO> { auto tag = static_cast<PointerTags>(tptr.getTag()); switch (tag) { case PointerTags::NewWorkNotification: { - auto &sleepStrategy = - reinterpret_cast<PipeSleepStrategy &>(runtime.getWorkerSleepStrategy()); + if (res < 0) DIE_MSG("NewWorkNotification error: " << strerror(-res)); + auto &sleepStrategy = reinterpret_cast<emper::sleep_strategy::AbstractIoSleepStrategy &>( + runtime.getWorkerSleepStrategy()); sleepStrategy.onNewWorkNotification<callerEnvironment>(*this, tptr); break; } @@ -535,6 +541,29 @@ class IoContext : public Logger<LogSubsystem::IO> { } } + /** + * @brief Reap all completions and schedule their continuations except the first + * + * @return Pair of the first fiber and the number of scheduled ones + */ + template <CallerEnvironment callerEnvironment> + auto reapAndScheduleAllReturnFirst() -> std::pair<Fiber *, unsigned> { + ContinuationBuffer completionBuf; + unsigned ncompletions = reapCompletions<callerEnvironment>(completionBuf); + if (ncompletions > 0) { + auto *toSchedule = &completionBuf[1]; + if constexpr (callerEnvironment != CallerEnvironment::ANYWHERE) { + runtime.schedule(toSchedule, ncompletions - 1); + } else { + runtime.scheduleFromAnywhere(toSchedule, ncompletions - 1); + } + + return {completionBuf[0], ncompletions - 1}; + } + + return {nullptr, 0}; + } + auto getStats() -> Stats & { return stats; }; }; } // namespace emper::io diff --git a/emper/sleep_strategy/AbstractIoSleepStrategy.cpp b/emper/sleep_strategy/AbstractIoSleepStrategy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6eae8bdc2bf45792c2f894fdc8e31a206cb59a55 --- /dev/null +++ b/emper/sleep_strategy/AbstractIoSleepStrategy.cpp @@ -0,0 +1,142 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Fischer +#include "sleep_strategy/AbstractIoSleepStrategy.hpp" + +#include <liburing.h> +#include <liburing/io_uring.h> + +#include <atomic> +#include <cassert> +#include <cstdint> +#include <new> +#include <vector> + +#include "CallerEnvironment.hpp" +#include "Common.hpp" +#include "Emper.hpp" +#include "Runtime.hpp" +#include "Worker.hpp" +#include "io/IoContext.hpp" +#include "lib/TaggedPtr.hpp" + +using emper::io::IoContext; +using emper::lib::TaggedPtr; + +namespace emper::sleep_strategy { + +thread_local bool AbstractIoSleepStrategy::readingGlobal = false; + +AbstractIoSleepStrategy::AbstractIoSleepStrategy(Runtime& runtime, workerid_t workerCount) + : workerCount(workerCount), stats(runtime), runtime(runtime) { + sleepers.store(0, std::memory_order_relaxed); + + sleepStates = new (std::align_val_t(CACHE_LINE_SIZE)) std::atomic<SleeperState>[workerCount]; +} + +AbstractIoSleepStrategy::~AbstractIoSleepStrategy() { delete[] sleepStates; } + +auto AbstractIoSleepStrategy::prepareSleep(const std::function<bool(void)>& customLogic) -> bool { + const workerid_t workerId = Worker::getCurrentWorkerId(); + std::atomic<SleeperState>& sleepState = sleepStates[workerId]; + + // Check if we were notified specifically and should skip the sleep attempt + const SleeperState oldState = sleepState.exchange(SleeperState::Sleeping); + // Someone has notified us specifically -> skip sleeping. + if (oldState == SleeperState::Notified) { + LOGD("State was notified -> reset notified state to running and skip sleeping"); + goto skip; + } + + // Me must have been running or already reading from the global IO object. + assert(oldState == SleeperState::Running || readingGlobal); + + if (!readingGlobal) { + const bool shouldSleep = customLogic(); + if (!shouldSleep) goto skip; + readingGlobal = true; + } + + return true; + +skip: + sleepState.store(SleeperState::Running, std::memory_order_relaxed); + return false; +} + +void AbstractIoSleepStrategy::submitAndWaitForCompletionsIn(IoContext& io) { + LOGD("sleep until IO completions occur"); + io.submitPreparedSqesAndWait<CallerEnvironment::EMPER>(1); + LOGD("wakeup due to available IO completions"); + + stats.incWakeup(); +} + +void AbstractIoSleepStrategy::prepareSleepRead(IoContext& io, int fd, uint8_t sqe_flags) { + struct io_uring_sqe* sqe = io.getSqe(); + + // We read directly into the workers dispatchHint + // TODO: Think about the race between the two reads + io_uring_prep_read(sqe, fd, &io.worker->dispatchHint, sizeof(io.worker->dispatchHint), 0); + + sqe->flags = sqe_flags; + + io_uring_sqe_set_data( + sqe, TaggedPtr((void*)nullptr, + static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification))); + + io.trackReqsInUring(1); +} + +void AbstractIoSleepStrategy::sendSpecificNotification(workerid_t workerId) { + stats.addNotifications(1); + + emper::assertInRuntime(); + auto& io = *IoContext::getWorkerIo(); + struct io_uring_sqe* sqe = io.getSqe(); + + TaggedPtr tag((void*)nullptr, static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification), + true); + + io_uring_prep_msg_ring(sqe, runtime.ioContexts[workerId]->ring.ring_fd, 0, uintptr_t(tag), 0); + + // Do not generate a cqe on the submiting io_uring + sqe->flags = IOSQE_CQE_SKIP_SUCCESS; + io_uring_sqe_set_data(sqe, tag); + + // The sent message will create a cqes one on the messaged io_uring + runtime.ioContexts[workerId]->trackReqsInUring(1); + io.submitPreparedSqesAndWait<CallerEnvironment::EMPER>(0); +} + +template <CallerEnvironment callerEnvironment> +void AbstractIoSleepStrategy::onNewWorkNotification(IoContext& io, + const emper::lib::TaggedPtr& data) { + if constexpr (callerEnvironment != CallerEnvironment::OWNER) { + DIE_MSG("Others reaping new work notification from " << io.worker->getWorkerId() << " CQ"); + } + + assert(data.getTag() == static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification)); + + stats.incWakeupDueToNotify(); + + std::atomic<SleeperState>& sleepState = sleepStates[io.worker->getWorkerId()]; + + if (data.isMarked()) { + LOGD("Got specific notification"); + } else { + LOGD("Got new work notification"); + // Reset global flag to indicate that a new sleep cqe must be prepared + readingGlobal = false; + } + + sleepState.store(SleeperState::Running, std::memory_order_release); +} + +template void AbstractIoSleepStrategy::onNewWorkNotification<CallerEnvironment::OWNER>( + IoContext& io, const TaggedPtr& data); +template void AbstractIoSleepStrategy::onNewWorkNotification<CallerEnvironment::EMPER>( + IoContext& io, const TaggedPtr& data); +template void AbstractIoSleepStrategy::onNewWorkNotification<CallerEnvironment::ANYWHERE>( + IoContext& io, const TaggedPtr& data); + +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/AbstractIoSleepStrategy.hpp b/emper/sleep_strategy/AbstractIoSleepStrategy.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b678cf7ce7f8d2a785e99b3998baa2ba29d2df3f --- /dev/null +++ b/emper/sleep_strategy/AbstractIoSleepStrategy.hpp @@ -0,0 +1,301 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Fischer +#pragma once + +#include <algorithm> +#include <atomic> +#include <cassert> +#include <cstdint> +#include <functional> +#include <iostream> + +#include "CallerEnvironment.hpp" +#include "Common.hpp" +#include "Debug.hpp" +#include "FiberHint.hpp" +#include "emper-common.h" +#include "lib/TaggedPtr.hpp" +#include "sleep_strategy/SleeperState.hpp" +#include "sleep_strategy/Stats.hpp" + +class Runtime; + +namespace emper::io { +class IoContext; +} + +namespace emper::sleep_strategy { +/** + * @brief An abstract IO based sleep strategy combining the IO subsystem with the sleep strategy + * + * Design goals + * ============ + * + * * Wakeup either on external new work notifications or on local IO completions + * -> Sleep strategy is sound without the IO completer + * * Do as less as possible in a system saturated with work + * * Pass a hint where to find new work to suspended workers + * + * Algorithm + * ========= + * + *``` + * Data: + * Global: + * global IO object + * sleepers count + * Per worker: + * dispatch hint buffer + * readingGlobal flag + * atomic sleep state + * + * PrepareSleep(customLogic): + * Exchange state with sleeping + * If state was notified + * Reset to running + * Return + * If not readingGlobal + * Return false if customLogic returns false + * Set readingGlobal + * Return true + * + * WriteNotifications : implementation specific + * + * NotifyEmper(n): + * if observed sleepers <= 0 + * return + * + * // Determine how many we are responsible to wake + * do + * toWakeup = min(observed sleepers, n) + * while (!CAS(sleepers, toWakeup)) + * + * write toWakeup hints + * + * NotifyAnywhere(n): + * // Ensure all n notifications take effect + * while (!CAS(sleepers, observed sleepers - n)) + * if observed sleeping <= -n + * return + * + * toWakeup = min(observed sleeping, n) + * write toWakeup hints + * + * NotifySpecific(w): + * Get w's state + * Relaxed exchange with notified + * Return w was sleeping + * Send specific notification + * + * onNewWorkCompletion(data): + * Release state to running + * If data is marked + * assert was notified + * Else + * Reset readingGlobal + *``` + * + * Notes + * ===== + * + * * We must decrement the sleepers count on the notifier side to + * prevent multiple notifiers to observe all the same amount of sleepers, + * trying to wake up the same sleepers by writing to the e.g a pipe will jam + * it up with unconsumed hints and thus block in writeNotifications resulting + * in a deadlock. + * * The CAS loops on the notifier side are needed because decrementing + * and incrementing is racy: Two notifier can observe the + * sum of both their excess decrement and increment to much resulting in a + * broken counter. + * * Add the dispatch hint code in AbstractWorkStealingScheduler::nextFiber. + * This allows workers to check the dispatch hint after there + * where no local work to execute. + * This is a trade-off where we trade slower wakeup - a just awoken worker + * will check for local work - against a faster dispatch hot path when + * we have work to do in our local WSQ. + * * Other thread must not reap new work notifications because this + * would introduces a race for cqes and a possible + * lost wakeup if the other consumes the completions before the worker + * is actually waiting for them. + * * When notifying sleeping workers from anywhere we must ensure that all + * notifications take effect. This is needed for example when terminating + * the runtime to prevent sleep attempt from worker thread which are + * about to sleep but have not incremented the sleeper count yet. + * We achieve this by always decrementing the sleeper count by the notification + * count. + */ +class AbstractIoSleepStrategy : public Logger<LogSubsystem::SLEEP_S> { + friend class emper::io::IoContext; + + protected: + /** + * @brief decrement the sleep count and return how many workers should be notified + * + * As mentioned in the AbstractIoSleepStrategy documentation We need to decrement + * the sleeper count on the notification side to prevent overcommiting notifications. + * And the CAS loop is needed because we don't want to decrement the sleepers + * count to much. + * + * Decrement the sleeper count for each sleeper we are going to wakeup + */ + virtual auto getSleeperCountToWakeupEmper(int64_t& sleeping, int64_t count) -> uint64_t { + int64_t toWakeup; + do { + // No one is sleeping -> we don't need to notify + if (sleeping <= 0) { + return 0; + } + + // Count of sleepers we notify + toWakeup = std::min(sleeping, count); + } while (!sleepers.compare_exchange_weak(sleeping, sleeping - toWakeup, + std::memory_order_release, std::memory_order_acquire)); + + LOGD("notifyFromEmper writes " + << toWakeup << " notifications and set sleepers count to: " << sleeping - toWakeup); + + assert(toWakeup >= 0); + return static_cast<uint64_t>(toWakeup); + } + + /** + * @brief decrement the sleep count and return how many workers should be notified + * + * If we observe nobody sleeping we need to prevent sleep locks when + * everybody is about to sleep but none has incremented the sleepers count yet. + * We prevent the next sleep from blocking by setting the sleeper count to + * the negative amount we want to prevent from sleeping. + * + * More general if we notify from anywhere we need all notifications to take + * effect. Only notifying the sleepers we observe may not be enough. + * For example when terminating the runtime we notify all workers if we + * only wakeup the ones we observe sleeping we may never terminate because + * after we notified all we observed new one went to sleep which will never + * be notified. + * To prevent this we make sure that all count notifications take effect + * by making the sleepers count negative and thus preventing at least count + * sleep attempts. + */ + virtual auto getSleeperCountToWakeupAnywhere(int64_t& sleeping, int64_t count) -> uint64_t { + int64_t newSleeping; + do { + // We already prevent enough sleep attempts + if (sleeping <= -count) { + LOGD("notifyFromAnywhere sleeper count already preventing enough sleep attempts"); + return 0; + } + + // Don't decrease the sleeper count in the CAS loop further than -count, + // which is the threshold we need to ensure that the notifications will be + // observed. + // Decreasing it further than this threshold is not faulty it just + // results in unnecessary skipped sleeps. + newSleeping = std::max(-count, sleeping - count); + } while (!sleepers.compare_exchange_weak(sleeping, newSleeping, std::memory_order_release, + std::memory_order_acquire)); + + // toWakeup should not be negative but could be if sleeping was already negative + const int64_t toWakeup = std::max(std::min(sleeping, count), 0L); + LOGD("notifyFromAnywhere writes " + << toWakeup << " notifications and set sleepers count to: " << newSleeping); + + assert(toWakeup >= 0); + return static_cast<uint64_t>(toWakeup); + } + + static thread_local bool readingGlobal; + std::atomic<SleeperState>* sleepStates; + + // Make sure the shared counter lives in an exclusive cache line + CACHE_LINE_EXCLUSIVE(std::atomic<int64_t>, sleepers); + + workerid_t workerCount; + Stats stats; + + const Runtime& runtime; + + AbstractIoSleepStrategy(Runtime& runtime, workerid_t workerCount); + virtual ~AbstractIoSleepStrategy(); + + virtual void writeNotificationsFromEmper(emper::FiberHint hint, unsigned count) = 0; + virtual void writeNotificationsFromAnywhere(emper::FiberHint hint, unsigned count) = 0; + + void sendSpecificNotification(workerid_t workerId); + + using CustomLogic = std::function<bool(void)>; + + /** + * @brief Prepare everything to sleep until new work is available + * + * @param customLogic Code specific to the actual IO sleep strategy. Reads are only prepared + * if customLogic returns true. + * + * @return True if any read was prepared and IO completions should be awaited + */ + auto prepareSleep(const CustomLogic& customLogic) -> bool; + + /** + * @brief prepare a new work notification read request + * + * @param io The IoContext where the read request will be prepared + * @param fd The file descriptor to read from + * @param sqe_flags The flags set on the issued SQE + */ + static void prepareSleepRead(emper::io::IoContext& io, int fd, uint8_t sqe_flags = 0); + + void submitAndWaitForCompletionsIn(emper::io::IoContext& io); + + template <CallerEnvironment callerEnvironment> + void onNewWorkNotification(emper::io::IoContext& io, const emper::lib::TaggedPtr& data); + + template <CallerEnvironment callerEnvironment> + auto getSleeperCountToWakeup(int64_t& sleeping, unsigned count) -> unsigned { + auto signedCount = static_cast<int64_t>(count); + + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + return getSleeperCountToWakeupEmper(sleeping, signedCount); + } else { + return getSleeperCountToWakeupAnywhere(sleeping, signedCount); + } + } + + template <CallerEnvironment callerEnvironment> + void notify(emper::FiberHint hint, unsigned count) { + stats.incOnNewWork(); + + int64_t sleeping = getSleeping(); + + const unsigned toWakeup = getSleeperCountToWakeup<callerEnvironment>(sleeping, count); + if (toWakeup) { + stats.incNotify(); + stats.addNotifications(toWakeup); + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + writeNotificationsFromEmper(hint, toWakeup); + } else { + writeNotificationsFromAnywhere(hint, toWakeup); + } + } + } + + template <CallerEnvironment callerEnvironment> + void notifySpecificInternal(workerid_t workerId) { + auto& specificState = sleepStates[workerId]; + + LOGD("Specifically notify worker " << workerId << " from " << callerEnvironment); + + auto oldState = specificState.exchange(SleeperState::Notified, std::memory_order_relaxed); + // We are responsible to wake the futex + if (oldState == SleeperState::Sleeping) { + LOGD(workerId << " was sleeping send notification"); + sendSpecificNotification(workerId); + } + } + + public: + void printStats(std::ostream& out = std::cout) { stats.print(out); } + + [[nodiscard]] inline auto getSleeping() const -> long { return sleepers.load(); } + + virtual void sleep() = 0; +}; +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp index 3466425d2f3bbeb13f289157135d59fa24e78341..cf1d9bcb22c36c10bdbd0a7068f02c768bcdf5e2 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.cpp +++ b/emper/sleep_strategy/PipeSleepStrategy.cpp @@ -1,167 +1,33 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2021 Florian Fischer +// Copyright © 2021-2022 Florian Fischer #include "sleep_strategy/PipeSleepStrategy.hpp" -#include <liburing.h> - #include <atomic> -#include <cassert> +#include <cstdint> -#include "CallerEnvironment.hpp" -#include "Runtime.hpp" -#include "Worker.hpp" #include "io/IoContext.hpp" -#include "lib/TaggedPtr.hpp" using emper::io::IoContext; -using emper::lib::TaggedPtr; namespace emper::sleep_strategy { - -thread_local PipeSleepStrategy::Pipe PipeSleepStrategy::pipe; -thread_local PipeSleepStrategy::SleepState PipeSleepStrategy::sleepState; - -PipeSleepStrategy::PipeSleepStrategy(Runtime& runtime, workerid_t workerCount) - : workerCount(workerCount), stats(runtime) { - LOGD("init pipe sleep startegy"); - sleepers.store(0, std::memory_order_relaxed); - - sleepStates = new SleepState*[workerCount]; - pipes = new Pipe*[workerCount]; - - runtime.addNewWorkerHook([this](workerid_t workerId) { - sleepStates[workerId] = &sleepState; - pipes[workerId] = &pipe; - }); -} - -PipeSleepStrategy::~PipeSleepStrategy() { - delete[] sleepStates; - delete[] pipes; -} - void PipeSleepStrategy::sleep() { + LOGD("going to sleep"); IoContext& io = *IoContext::getWorkerIo(); - { - std::lock_guard<std::mutex> lock(sleepState.lock); - - // Check if we were notified specifically and should skip the sleep attempt - if (sleepState.isNotified()) { - sleepState.markRunning(); - LOGD("Reset notified state to running -> skip sleeping"); - return; + auto customSleepLogic = [this, &io] { + // increment the sleeper count if it was negative we should skip sleeping + int64_t sleeping = this->sleepers.fetch_add(1, std::memory_order_acquire); + if (sleeping < 0) { + LOGD("observed sleeper count as: " << sleeping << " -> skip sleeping"); + return false; } - - if (!sleepState.globalReadInflight) { - // increment the sleeper count if it was negative we should skip sleeping - int64_t sleeping = sleepers.fetch_add(1, std::memory_order_acquire); - if (sleeping < 0) { - LOGD("observed sleeper count as: " << sleeping << " -> skip sleeping"); - return; - } - assert(sleeping <= workerCount); - - sleepState.globalReadInflight = true; - - struct io_uring_sqe* sqe = io.getSqe(); - - // We read directly into the workers dispatchHint - io_uring_prep_read(sqe, global.sleepFd, &io.worker->dispatchHint, - sizeof(io.worker->dispatchHint), 0); - - // Mark the sqe as a new work notification to reset the Global flag when reaping the - // resulting cqe - io_uring_sqe_set_data( - sqe, TaggedPtr(0, static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification))); - - io.trackReqsInUring(1); - - LOGD("prepared global.sleepFd read and set sleepers count to: " << sleeping + 1); - } - - if (!sleepState.isSleeping()) { - sleepState.markSleeping(); - - struct io_uring_sqe* sqe = io.getSqe(); - - // We read directly into the workers dispatchHint - // TODO: Think about the race between the two reads - io_uring_prep_read(sqe, pipe.sleepFd, &io.worker->dispatchHint, - sizeof(io.worker->dispatchHint), 0); - - // Tag the sqe with a marked new work notification to reset the specific state - // when reaping the resulting cqe. - io_uring_sqe_set_data( - sqe, TaggedPtr((void*)nullptr, - static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification), true)); - - io.trackReqsInUring(1); - LOGD("prepared pipe.sleepFd read"); - } - } - - // Wait for IO completions - LOGD("sleep until IO completions occur"); - io.submitPreparedSqesAndWait<CallerEnvironment::EMPER>(1); - LOGD("wakeup due to available IO completions"); - - stats.incWakeup(); + prepareSleepRead(io, this->global.sleepFd); + LOGD("prepared sleepFd read and set sleepers count to: " << sleeping + 1); + return true; + }; + + const bool shouldSleep = prepareSleep(customSleepLogic); + if (!shouldSleep) return; + submitAndWaitForCompletionsIn(io); } - -template <CallerEnvironment callerEnvironment> -void PipeSleepStrategy::notifySpecific(workerid_t workerId) { - auto& specificState = *sleepStates[workerId]; - std::lock_guard<std::mutex> lock(specificState.lock); - - LOGD("Specifically notify worker " << workerId << " from " << callerEnvironment); - - if (specificState.isNotified()) { - LOGD(workerId << " already marked notified"); - return; - } - - const bool isSleeping = specificState.markNotified(); - - if (isSleeping) { - LOGD(workerId << " has specific read -> write notification"); - writeSpecificNotification(workerId); - } -} - -template void PipeSleepStrategy::notifySpecific<CallerEnvironment::EMPER>(workerid_t workerId); -template void PipeSleepStrategy::notifySpecific<CallerEnvironment::ANYWHERE>(workerid_t workerId); - -template <CallerEnvironment callerEnvironment> -void PipeSleepStrategy::onNewWorkNotification(IoContext& io, TaggedPtr data) { - if constexpr (callerEnvironment != CallerEnvironment::OWNER) { - DIE_MSG("Others reaping new work notification from " << io.worker->getWorkerId() << " CQ"); - } - - assert(data.getTag() == static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification)); - - if (data.isMarked()) { - std::lock_guard<std::mutex> lock(sleepState.lock); - LOGD("Got specific notification"); - stats.incWakeupDueToNotify(); - - // Reset specific and notified flag to indicate that a new specific notification - // was consumed, a new specific read must be prepared and other notifySpecific - // calls must notify again. - sleepState.markRunning(); - } else { - LOGD("Got new work notification"); - stats.incWakeupDueToNotify(); - // Reset global flag to indicate that a new sleep cqe must be prepared - // and allow the completer to reap completions again - sleepState.globalReadInflight = false; - } -} - -template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::OWNER>(IoContext& io, - TaggedPtr data); -template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::EMPER>(IoContext& io, - TaggedPtr data); -template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::ANYWHERE>(IoContext& io, - TaggedPtr data); } // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/PipeSleepStrategy.hpp b/emper/sleep_strategy/PipeSleepStrategy.hpp index ccce97be42b27e29c6a15842349f6d0dc11f7390..c1ab8c4fca0c21d873c8acaa8589264836d53086 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.hpp +++ b/emper/sleep_strategy/PipeSleepStrategy.hpp @@ -1,14 +1,10 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2021 Florian Fischer +// Copyright © 2021-2022 Florian Fischer #pragma once #include <unistd.h> -#include <algorithm> -#include <atomic> -#include <cstdint> #include <iostream> -#include <mutex> #include <vector> #include "CallerEnvironment.hpp" @@ -16,158 +12,19 @@ #include "Debug.hpp" #include "FiberHint.hpp" #include "emper-common.h" -#include "lib/TaggedPtr.hpp" +#include "sleep_strategy/AbstractIoSleepStrategy.hpp" #include "sleep_strategy/AbstractWorkerSleepStrategy.hpp" -#include "sleep_strategy/SleeperState.hpp" -#include "sleep_strategy/Stats.hpp" class Runtime; -namespace emper::io { -class IoContext; -} - namespace emper::sleep_strategy { /** * @brief A pipe(2) based sleep strategy combining the IO subsystem with the sleep strategy * * implement a pipe based sleep strategy using the IO subsystem - * - * Design goals - * ============ - * - * * Wakeup either on external newWork notifications or on local IO completions - * -> Sleep strategy is sound without the IO completer - * * Do as less as possible in a system saturated with work - * * Pass a hint where to find new work to suspended workers - * - * Algorithm - * ========= - * - *``` - * Data: - * Global: - * global pipe - * sleepers count - * Per worker: - * dispatch hint buffer - * specific pipe - * state lock - * sleep state - * - * Sleep: - * Lock state - * If state == notified - * Set state = running - * return - * - * If we have no global sleep request in flight - * Atomic increment sleep count - * Skip sleeping if sleep count was < 0 - * Mark sleep requests in flight - * Prepare read cqe from the global pipe to dispatch hint buffer - * - * If state == running - * Set state = sleeping - * Prepare marked read cqe from the specific pipe to dispatch hint buffer - * - * Unlock state - * Wait until IO completions occurred - * - * NotifyEmper(n): - * if observed sleepers <= 0 - * return - * - * // Determine how many we are responsible to wake - * do - * toWakeup = min(observed sleepers, n) - * while (!CAS(sleepers, toWakeup)) - * - * write toWakeup hints to the hint pipe - * - * NotifyAnywhere(n): - * // Ensure all n notifications take effect - * while (!CAS(sleepers, observed sleepers - n)) - * if observed sleeping <= -n - * return - * - * toWakeup = min(observed sleeping, n) - * write toWakeup hints to the hint pipe - * - * NotifySpecific(w): - * Get w's state - * Lock state - * Return if already notified - * Mark state notified - * Write hint if was stat was sleeping - * - * onNewWorkCompletion: - * If data is marked - * lock state - * set state = running - * return - * - * Reset global read inflight - *``` - * - * Notes - * ===== - * - * * We must decrement the sleepers count on the notifier side to - * prevent multiple notifiers to observe all the same amount of sleepers, - * trying to wake up the same sleepers by writing to the pipe and jamming it up - * with unconsumed hints and thus blocking in the notify write resulting - * in a deadlock. - * * The CAS loops on the notifier side are needed because decrementing - * and incrementing the excess is racy: Two notifier can observe the - * sum of both their excess decrement and increment to much resulting in a - * broken counter. - * * Add the dispatch hint code in AbstractWorkStealingScheduler::nextFiber. - * This allows workers to check the dispatch hint after there - * where no local work to execute. - * This is a trade-off where we trade slower wakeup - a just awoken worker - * will check for local work - against a faster dispatch hot path when - * we have work to do in our local WSQ. - * * Other thread must not reap new work notifications because this - * would introduces a race for cqes and a possible - * lost wakeup if the other consumes the completions before the worker - * is actually waiting for them. - * * When notifying sleeping workers from anywhere we must ensure that all - * notifications take effect. This is needed for example when terminating - * the runtime to prevent sleep attempt from worker thread which are - * about to sleep but have not incremented the sleeper count yet. - * We achieve this by always decrementing the sleeper count by the notification - * count. */ -class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, - public Logger<LogSubsystem::SLEEP_S> { - friend class emper::io::IoContext; - - /** - * @brief State of a worker - */ - class SleepState { - friend class PipeSleepStrategy; - - bool globalReadInflight = false; - std::mutex lock; - emper::sleep_strategy::SleeperState s = emper::sleep_strategy::SleeperState::Running; - - auto markNotified() -> bool { - auto oldS = s; - s = emper::sleep_strategy::SleeperState::Notified; - return oldS == emper::sleep_strategy::SleeperState::Sleeping; - } - void markSleeping() { s = emper::sleep_strategy::SleeperState::Sleeping; } - void markRunning() { s = emper::sleep_strategy::SleeperState::Running; } - - auto isNotified() const -> bool { return s == emper::sleep_strategy::SleeperState::Notified; } - auto isSleeping() const -> bool { return s == emper::sleep_strategy::SleeperState::Sleeping; } - }; - - SleepState** sleepStates; - static thread_local SleepState sleepState; - +class PipeSleepStrategy : public AbstractWorkerSleepStrategy<PipeSleepStrategy>, + public AbstractIoSleepStrategy { class Pipe { friend class PipeSleepStrategy; int sleepFd; @@ -186,23 +43,9 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, } }; - const workerid_t workerCount; Pipe global; - /** - * @brief Per worker pipe to notify specific workers - */ - Pipe** pipes; - static thread_local Pipe pipe; - - Stats stats; - - // Make sure the shared counter lives in an exlusive cache line - CACHE_LINE_EXCLUSIVE(std::atomic<int64_t>, sleepers); - - void writeNotifications(emper::FiberHint hint, int64_t count) { - stats.addNotifications(count); - + void writeNotifications(emper::FiberHint hint, unsigned count) { std::vector<void*> hints(count, hint); ssize_t res = write(global.notifyFd, hints.data(), sizeof(void*) * hints.size()); @@ -211,113 +54,18 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, } } - void writeSpecificNotification(workerid_t workerId) { - stats.addNotifications(1); - - const void* b = nullptr; - ssize_t res = write(pipes[workerId]->notifyFd, &b, sizeof(b)); - if (unlikely(res) < 0) { - DIE_MSG_ERRNO("writing to the notifyFd of worker " << workerId << " failed"); - } + void writeNotificationsFromEmper(emper::FiberHint hint, unsigned count) override { + writeNotifications(hint, count); } - - void notifyFromEmper(int64_t& sleeping, emper::FiberHint hint, int64_t count) { - int64_t toWakeup; - - // We need to decrement the sleeper count on the notification side - // to prevent overcommiting notifications, filling up the pipe which results - // in notify calls to block on write. - // And the CAS loop is needed because we don't want to decrement the sleepers - // count to much. - - // Decrement the sleeper count for each sleeper we are going to wakeup - do { - // No one is sleeping -> we don't need to notify - if (sleeping <= 0) { - return; - } - - // Count of sleepers we notify - toWakeup = std::min(sleeping, count); - } while (!sleepers.compare_exchange_weak(sleeping, sleeping - toWakeup, - std::memory_order_release, std::memory_order_acquire)); - - // increment the notify stat if we actually have to notify someone - stats.incNotify(); - writeNotifications(hint, toWakeup); - - LOGD("notifyFromEmper written " - << toWakeup << " notifications and set sleepers count to: " << sleeping - toWakeup); - } - - void notifyFromAnywhere(int64_t& sleeping, emper::FiberHint hint, int64_t count) { - // If we observe nobody sleeping we need to prevent sleep locks when - // everybody is about to sleep but none has incremented the sleepers count yet. - // We prevent the next sleep from blocking by setting the sleeper count to - // the negative amount we want to prevent from sleeping. - - // More general if we notify from anywhere we need all notifications to take - // effect. Only notifying the sleepers we observe may not be enough. - // For example when terminating the runtime we notify all workers if we - // only wakeup the ones we observe sleeping we may never terminate because - // after we notified all we observed new one went to sleep which will never - // be notified. - // To prevent this we make sure that all count notifications take effect - // by making the sleepers count negative and thus preventing at least count - // sleep attempts. - - int64_t newSleeping; - do { - // We already prevent enough sleep attempts - if (sleeping <= -count) { - LOGD("notifyFromAnywhere sleeper count already preventing enough sleep attempts"); - return; - } - // Don't decrease the sleeper count in the CAS loop further than -count, - // which is the threshold we need to ensure that the notifications will be - // observed. - // Decreasing it further than this threshold is not faulty it just - // results in unnecessary skipped sleeps. - newSleeping = std::max(-count, sleeping - count); - } while (!sleepers.compare_exchange_weak(sleeping, newSleeping, std::memory_order_release, - std::memory_order_acquire)); - - stats.incNotify(); - - // To wakeup should not be negative but could be if sleeping was already negative - int64_t toWakeup = std::max(std::min(sleeping, count), 0L); - if (toWakeup > 0) { - writeNotifications(hint, toWakeup); - } - - LOGD("notifyFromAnywhere written " - << toWakeup << " notifications and set sleepers count to: " << newSleeping); - } - - template <CallerEnvironment callerEnvironment> - void notify(emper::FiberHint hint, uint32_t count) { - // The hint must be != nullptr so sleep() knows when to prepare and submit - // a sleepFd read sqe. - - stats.incOnNewWork(); - - int64_t sleeping = getSleeping(); - auto signedCount = static_cast<int64_t>(count); - - if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { - notifyFromAnywhere(sleeping, hint, signedCount); - } else { - notifyFromEmper(sleeping, hint, signedCount); - } + void writeNotificationsFromAnywhere(emper::FiberHint hint, unsigned count) override { + writeNotifications(hint, count); } public: - PipeSleepStrategy(Runtime& runtime, workerid_t workerCount); - ~PipeSleepStrategy(); + PipeSleepStrategy(Runtime& runtime, workerid_t workerCount) + : AbstractIoSleepStrategy(runtime, workerCount) {} - void printStats(std::ostream& out = std::cout) { stats.print(out); } - - [[nodiscard]] inline auto getSleeping() const -> long { return sleepers.load(); } + void printStats(std::ostream& out = std::cout) { AbstractIoSleepStrategy::printStats(out); } template <CallerEnvironment callerEnvironment> inline void notifyOne() { @@ -336,10 +84,10 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, } template <CallerEnvironment callerEnvironment> - void notifySpecific(workerid_t workerId); + inline void notifySpecific(workerid_t workerId) { + notifySpecificInternal<callerEnvironment>(workerId); + } - template <CallerEnvironment callerEnvironment> - void onNewWorkNotification(emper::io::IoContext& io, emper::lib::TaggedPtr data); - void sleep(); + void sleep() override; }; } // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/WaitfdSleepStrategy.cpp b/emper/sleep_strategy/WaitfdSleepStrategy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e75a806efd2dfa346ac8985854aea9d3a9d5cccd --- /dev/null +++ b/emper/sleep_strategy/WaitfdSleepStrategy.cpp @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "sleep_strategy/WaitfdSleepStrategy.hpp" + +#include <liburing.h> +#include <liburing/io_uring.h> +#include <syscall.h> + +#include "Emper.hpp" +#include "FiberSource.hpp" +#include "Runtime.hpp" +#include "io/IoContext.hpp" + +using emper::io::IoContext; + +#ifndef IOURING_REGISTER_WAITFD +static const unsigned IOURING_REGISTER_WAITFD = []() -> unsigned { + if (EMPER_LINUX_GE("5.19")) return 24; + if (EMPER_LINUX_GE("5.18")) return 22; + return 20; +}(); +#endif + +struct io_uring_waitfd_registration { + void* hint; + int fd; +}; + +namespace emper::sleep_strategy { + +uint32_t WaitfdSleepStrategy::WAITFD_NOTIFY_AND_PREVENT_SLEEP = 1; + +WaitfdSleepStrategy::WaitfdSleepStrategy(Runtime& runtime, workerid_t workerCount, + unsigned waitfdFlags) + : AbstractIoSleepStrategy(runtime, workerCount), waitfdFlags(waitfdFlags) { + long res = syscall(__NR_waitfd, &sleepers, waitfdFlags); + if (res < 0) { + DIE_MSG_ERRNO("creating waitfd failed"); + } + waitfd = static_cast<int>(res); + + if constexpr (emper::IO_NOTIFICATION) { + auto registerWaitFd = [this](workerid_t workerId) { + IoContext& io = *IoContext::getWorkerIo(); + emper::FiberHint hint(workerId, emper::FiberSource::hintIo); + struct io_uring_waitfd_registration reg { + hint, this->waitfd + }; + long ret = syscall(__NR_io_uring_register, io.ring.ring_fd, IOURING_REGISTER_WAITFD, ®, + sizeof(reg)); + if (ret) { + DIE_MSG_ERRNO("failed to register waitfd for worker " << workerId); + } + }; + runtime.addNewWorkerHook(registerWaitFd); + } +} + +WaitfdSleepStrategy::~WaitfdSleepStrategy() { close(waitfd); } + +void WaitfdSleepStrategy::sleep() { + IoContext& io = *IoContext::getWorkerIo(); + + auto customLogic = [this, &io] { + // waitfd non-blocking reads will always return -EAGAIN and waitfd is not pollable + // therefore we can skip that and immediately issue the read in an async manner. + uint8_t sqe_flags = 0; + if constexpr (emper::IO_PUNT_WAITFD_READ) sqe_flags |= IOSQE_ASYNC; + prepareSleepRead(io, waitfd, sqe_flags); + LOGD("prepared waitfd read"); + return true; + }; + const bool shouldSleep = prepareSleep(customLogic); + + if (!shouldSleep) return; + + submitAndWaitForCompletionsIn(io); +} +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/WaitfdSleepStrategy.hpp b/emper/sleep_strategy/WaitfdSleepStrategy.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b6fc7befaa80301fabe4c77b34607ebe79369a49 --- /dev/null +++ b/emper/sleep_strategy/WaitfdSleepStrategy.hpp @@ -0,0 +1,151 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <unistd.h> + +#include <algorithm> +#include <cstdint> +#include <cstdlib> +#include <iostream> +#include <string> + +#include "CallerEnvironment.hpp" +#include "Common.hpp" +#include "Debug.hpp" +#include "FiberHint.hpp" +#include "emper-common.h" +#include "emper-config.h" // IWYU pragma: keep +#include "lib/LinuxVersion.hpp" +#include "sleep_strategy/AbstractIoSleepStrategy.hpp" +#include "sleep_strategy/AbstractWorkerSleepStrategy.hpp" + +class Runtime; + +#ifndef __NR_waitfd +static const unsigned __NR_waitfd = []() -> unsigned { + if (EMPER_LINUX_GE("5.17")) + return 451; + else if (EMPER_LINUX_GE("5.16")) + return 450; + else + return 449; +}(); +#endif + +namespace emper::sleep_strategy { + +class WaitfdSleepStrategy : public AbstractWorkerSleepStrategy<WaitfdSleepStrategy>, + public AbstractIoSleepStrategy { + int waitfd; + + enum class WaitfdFlags { + none = 0, + accounting = 1, + waitFree = 2, + bestEffort = 4, + }; + + const unsigned waitfdFlags; + + struct WaitfdNotification { + void* d; + uint32_t flags; + uint32_t count; + }; + + static auto getDefaultWaitfdFlags() -> unsigned { + unsigned flags = 0; + flags = static_cast<unsigned>(WaitfdFlags::EMPER_WAITFD_FLAGS); + + const auto* flagsEnvVal = std::getenv("EMPER_WAITFD_FLAGS"); + if (flagsEnvVal) { + const std::string envValStr(flagsEnvVal); + if (envValStr == "accounting") + flags = static_cast<unsigned>(WaitfdFlags::accounting); + else if (envValStr == "wait-free") + flags = static_cast<unsigned>(WaitfdFlags::waitFree); + else if (envValStr == "best-effort") + flags = static_cast<unsigned>(WaitfdFlags::bestEffort); + else + DIE_MSG("Unknown waitfd flags specified in environment: " << envValStr); + } + + return flags; + } + + void writeNotifications(emper::FiberHint hint, uint32_t flags, int32_t count) { + struct WaitfdNotification notification = {hint, flags, static_cast<uint32_t>(count)}; + + LOGD("write: {" << notification.d << ", " << notification.flags << ", " << notification.count + << "}"); + + ssize_t res = write(waitfd, reinterpret_cast<void*>(¬ification), sizeof(notification)); + if (unlikely(res) < 0) { + DIE_MSG_ERRNO("writing to waitfd failed"); + } + } + + static uint32_t WAITFD_NOTIFY_AND_PREVENT_SLEEP; + + void writeNotificationsFromEmper(emper::FiberHint hint, unsigned count) override { + writeNotifications(hint, 0, count); + } + + void writeNotificationsFromAnywhere(emper::FiberHint hint, unsigned count) override { + writeNotifications(hint, WAITFD_NOTIFY_AND_PREVENT_SLEEP, count); + } + + // If the waitfd implementation does the sleeper accounting in kernel space + // we must not decrement the sleeper count in userspace. + auto getSleeperCountToWakeupEmper(int64_t& sleeping, int64_t count) -> uint64_t override { + if (waitfdFlags != static_cast<unsigned>(WaitfdFlags::accounting)) + return AbstractIoSleepStrategy::getSleeperCountToWakeupEmper(sleeping, count); + + const int64_t toWakeup = std::min(sleeping, count); + LOGD("notifyFromEmper writes " << toWakeup << " notifications"); + return toWakeup > 0 ? static_cast<uint64_t>(toWakeup) : 0; + } + + auto getSleeperCountToWakeupAnywhere(int64_t& sleeping, int64_t count) -> uint64_t override { + if (waitfdFlags != static_cast<unsigned>(WaitfdFlags::accounting)) + return AbstractIoSleepStrategy::getSleeperCountToWakeupAnywhere(sleeping, count); + + if (sleeping <= -count) { + LOGD("notifyFromAnywhere sleeper count already preventing enough sleep attempts"); + return 0; + } + LOGD("notifyFromAnywhere writes " << count << " notifications"); + return static_cast<uint64_t>(count); + } + + public: + WaitfdSleepStrategy(Runtime& runtime, workerid_t workerCount, + unsigned waitfdFlags = getDefaultWaitfdFlags()); + ~WaitfdSleepStrategy(); + + template <CallerEnvironment callerEnvironment> + inline void notifyOne() { + notifyMany<callerEnvironment>(1); + } + + template <CallerEnvironment callerEnvironment> + inline void notifyMany(unsigned count) { + if (count > 1) LOGD("notify(hint, " << count << ")"); + notify<callerEnvironment>(emper::FiberHint::createNewWorkHint<callerEnvironment>(), count); + } + + template <CallerEnvironment callerEnvironment> + inline void notifyAll() { + notifyMany<callerEnvironment>(workerCount); + } + + template <CallerEnvironment callerEnvironment> + inline void notifySpecific(ATTR_UNUSED workerid_t workerId) { + notifySpecificInternal<callerEnvironment>(workerId); + } + + void printStats(std::ostream& out = std::cout) { AbstractIoSleepStrategy::printStats(out); } + void sleep() override; +}; +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/WorkerSleepStrategy.hpp b/emper/sleep_strategy/WorkerSleepStrategy.hpp index e97af5480111f852c75856091ed4e1052af808f5..4bbca54dc5728940ead2c7b4d0ace058103da13f 100644 --- a/emper/sleep_strategy/WorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/WorkerSleepStrategy.hpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2021 Florian Fischer +// Copyright © 2021-2022 Florian Fischer #pragma once #ifdef EMPER_SEMAPHORE_SLEEP_STRATEGY @@ -8,6 +8,9 @@ #elif defined EMPER_PIPE_SLEEP_STRATEGY #include "sleep_strategy/PipeSleepStrategy.hpp" +#elif defined EMPER_WAITFD_SLEEP_STRATEGY +#include "sleep_strategy/WaitfdSleepStrategy.hpp" + #else #error Unknown WorkerSleepStrategy implementation #endif @@ -19,5 +22,8 @@ using WorkerSleepStrategy = SemaphoreWorkerSleepStrategy; #elif defined EMPER_PIPE_SLEEP_STRATEGY using WorkerSleepStrategy = PipeSleepStrategy; + +#elif defined EMPER_WAITFD_SLEEP_STRATEGY +using WorkerSleepStrategy = WaitfdSleepStrategy; #endif } // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/meson.build b/emper/sleep_strategy/meson.build index 819c54129548a7530f3ff01a7286cd635ee1c7f4..20f417642c22f8c1bc154eefa8dc1879a086932e 100644 --- a/emper/sleep_strategy/meson.build +++ b/emper/sleep_strategy/meson.build @@ -1,6 +1,8 @@ emper_cpp_sources += files( + 'AbstractIoSleepStrategy.cpp', 'PipeSleepStrategy.cpp', 'SemaphoreWorkerSleepStrategy.cpp', 'Stats.cpp', + 'WaitfdSleepStrategy.cpp', 'WorkerStats.cpp', ) diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index 6045e06df12f4cd9f1f08e58bd178885e05d3702..3607a1b7b8e20fa49c608fd3ea5e9af09dbd0946 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -7,6 +7,7 @@ #include <cassert> #include <cstdint> #include <ostream> // for operator<<, basic_ostream<>::__ostream_type +#include <utility> #include <vector> #include "AbstractFiber.hpp" @@ -149,11 +150,33 @@ auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() -> std::optional return std::nullopt; } -auto AbstractWorkStealingScheduler::tryStealFiberFrom(workerid_t victim) +auto AbstractWorkStealingScheduler::tryStealIoCompletionFrom(workerid_t victim) + -> std::optional<NextFiberResult> { + auto* victimIo = runtime.ioContexts[victim]; + Fiber* fiber; + + if constexpr (emper::IO_STEAL_ALL_ON_NOTIFICATION) { + auto res = victimIo->reapAndScheduleAllReturnFirst<CallerEnvironment::EMPER>(); + fiber = res.first; + emper::statsAdd(awss::stats.nextIoFiberStolen, res.second); + } else { + fiber = victimIo->reapSingleCompletion<CallerEnvironment::EMPER>(); + } + + if (fiber) { + emper::statsIncr(awss::stats.nextIoFiberStolen); + return NextFiberResult{fiber, emper::FiberSource::ioStolen}; + } + + return std::nullopt; +} + +auto AbstractWorkStealingScheduler::tryStealWorkFrom(workerid_t victim) -> std::optional<NextFiberResult> { constexpr int maxRetries = emper::WAITFREE_WORK_STEALING ? 0 : -1; AbstractFiber* fiber; popTop: + StealingResult res = queues[victim]->popTop<maxRetries>(&fiber); if (res == StealingResult::Stolen) { emper::statsIncr(awss::stats.nextFiberStolen); @@ -163,13 +186,16 @@ popTop: return NextFiberResult{fiber, emper::FiberSource::stolen}; } + return std::nullopt; +} + +auto AbstractWorkStealingScheduler::tryStealFiberFrom(workerid_t victim) + -> std::optional<NextFiberResult> { + auto result = tryStealWorkFrom(victim); + if (result) return result; + if constexpr (emper::IO_STEALING) { - auto* victimIo = runtime.ioContexts[victim]; - fiber = victimIo->reapSingleCompletion<CallerEnvironment::EMPER>(); - if (fiber) { - emper::statsIncr(awss::stats.nextIoFiberStolen); - return NextFiberResult{fiber, emper::FiberSource::ioStolen}; - } + return tryStealIoCompletionFrom(victim); } return std::nullopt; @@ -204,7 +230,7 @@ popBottom: switch (source) { case emper::FiberSource::hintWsq: { const auto victim = dispatchHint.getWorker(); - const auto stolen = tryStealFiberFrom(victim); + const auto stolen = tryStealWorkFrom(victim); if (stolen) { emper::statsIncr(awss::stats.nextFiberFromHintLocal); onWorkStolen(); @@ -220,6 +246,15 @@ popBottom: } break; } + case emper::FiberSource::hintIo: { + const auto victim = dispatchHint.getWorker(); + const auto stolenIo = tryStealIoCompletionFrom(victim); + if (stolenIo) { + emper::statsIncr(awss::stats.nextFiberFromHintIo); + return NextFiberResult{(*stolenIo).fiber, emper::FiberSource::hintIo}; + } + break; + } default: DIE_MSG("invalid dispatch hint"); } diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp index 9afead16bce5c3356b5b20bcad64c5363b6b6a99..face284fef3fe3996085267b56c0ebaa0640487b 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.hpp +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -26,6 +26,8 @@ class AbstractWorkStealingScheduler : public Scheduler { private: auto nextFiberViaAnywhereQueue() -> std::optional<NextFiberResult>; + auto tryStealWorkFrom(workerid_t victim) -> std::optional<NextFiberResult>; + auto tryStealIoCompletionFrom(workerid_t victim) -> std::optional<NextFiberResult>; auto tryStealFiberFrom(workerid_t victim) -> std::optional<NextFiberResult>; bool queueFullWarningEmitted = false; diff --git a/emper/strategies/AbstractWorkStealingStats.cpp b/emper/strategies/AbstractWorkStealingStats.cpp index af7b74ee83e34238c09833be21638f993d4c2428..0fb4f536fa89566e2dbb282738dc4854e412c3ef 100644 --- a/emper/strategies/AbstractWorkStealingStats.cpp +++ b/emper/strategies/AbstractWorkStealingStats.cpp @@ -31,6 +31,8 @@ void AbstractWorkStealingStats::print(std::ostream& out) { << std::to_string(comulatedWorkerStats.nextFiberFromHintLocal) << std::endl << "total-next-fiber-hint-anywherequeue: " << std::to_string(comulatedWorkerStats.nextFiberFromHintAnywhere) << std::endl + << "total-next-fiber-hint-io: " << std::to_string(comulatedWorkerStats.nextFiberFromHintIo) + << std::endl << "total-next-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextFiberStolen) << std::endl << "total-next-io-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextIoFiberStolen) diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.cpp b/emper/strategies/AbstractWorkStealingWorkerStats.cpp index ca3ef250b10adbb8a38dd09e9bf851b990b8c9ae..44b62d32da90816abc8646c67fb3b1ff7c8e4723 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.cpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.cpp @@ -13,6 +13,7 @@ auto AbstractWorkStealingWorkerStats::operator+=(const AbstractWorkStealingWorke nextFiberFromLocal += other.nextFiberFromLocal; nextFiberFromHintLocal += other.nextFiberFromHintLocal; nextFiberFromHintAnywhere += other.nextFiberFromHintAnywhere; + nextFiberFromHintIo += other.nextFiberFromHintIo; nextFiberStolen += other.nextFiberStolen; nextIoFiberStolen += other.nextIoFiberStolen; nextFiberFromAnywhereQueue += other.nextFiberFromAnywhereQueue; diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.hpp b/emper/strategies/AbstractWorkStealingWorkerStats.hpp index e8e25bef9e692d550ec21a722cbf23adbb67d835..3616c374626db55b96417d2a3891e44fa3fe0929 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.hpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.hpp @@ -20,6 +20,7 @@ class AbstractWorkStealingWorkerStats { uint64_t nextFiberFromLocal = 0; uint64_t nextFiberFromHintLocal = 0; uint64_t nextFiberFromHintAnywhere = 0; + uint64_t nextFiberFromHintIo = 0; uint64_t nextFiberStolen = 0; uint64_t nextIoFiberStolen = 0; uint64_t nextFiberFromAnywhereQueue = 0; diff --git a/iwyu-mappings.imp b/iwyu-mappings.imp index 1b49c3112403a171cceceb20bdd69dcbe71ddcd0..51227cf4d2982cb99a27ed89ffd699e53bb9e6b5 100644 --- a/iwyu-mappings.imp +++ b/iwyu-mappings.imp @@ -1,4 +1,5 @@ [ + { include: ["<bits/chrono.h>", "private", "<chrono>", "public"]}, { include: ["<bits/getopt_core.h>", "private", "<unistd.h>", "public"] }, { include: ["@<gtest/.*>", "private", "<gtest/gtest.h>", "public"] }, { include: ["<urcu/map/urcu-memb.h>", "private", "<urcu.h>", "public"] }, diff --git a/meson.build b/meson.build index e97c319626408057f88c4a3fb01fe9d685bcf705..f46889df1acb80bfa1c319c4f3a9c87f8a8b4374 100644 --- a/meson.build +++ b/meson.build @@ -113,6 +113,7 @@ conf_data.set('EMPER_STATS_STACK_USAGE', stats_stack_usage) conf_data.set('EMPER_STATS_WORKER_SLEEP', stats_worker_sleep) conf_data.set('EMPER_STATS_BLOCKED_CONTEXT', stats_blocked_context) conf_data.set('EMPER_STATS_BLOCKED_CONTEXT_COUNT', stats_blocked_context_count) +conf_data.set('EMPER_WAITFD_FLAGS', get_option('waitfd_flags')) context_alignment = get_option('context_alignment') if context_alignment == 'none' @@ -142,14 +143,9 @@ endif semaphore_impl = get_option('wakeup_semaphore_implementation') conf_data.set('EMPER_' + semaphore_impl.to_upper() + '_WAKEUP_SEMAPHORE', true) -sleep_stratey = get_option('worker_sleep_strategy') -if sleep_stratey == 'semaphore' - conf_data.set('EMPER_SEMAPHORE_SLEEP_STRATEGY', true) -elif sleep_stratey == 'pipe' - conf_data.set('EMPER_PIPE_SLEEP_STRATEGY', true) -else - error('Unsupported sleep strategy') -endif +sleep_strategy = get_option('worker_sleep_strategy') +sleep_strategy_var_name = sleep_strategy.to_upper().replace('-', '_') +conf_data.set('EMPER_' + sleep_strategy_var_name + '_SLEEP_STRATEGY', true) locked_unbounded_queue_impl = get_option('locked_unbounded_queue_implementation') if locked_unbounded_queue_impl == 'boost_shared_mutex' @@ -195,7 +191,13 @@ io_bool_options = [ {'option': 'try_syscall'}, {'option': 'waitfree_stealing', 'dependencies': {'io_stealing': true, 'io_lockless_cq': true}}, + {'option': 'notification', + 'dependencies': {'worker_sleep_strategy': 'waitfd'}}, + {'option': 'steal_all_on_notification', + 'dependencies': {'io_notification': true}}, {'option': 'synchronous'}, + {'option': 'skip_waitfd_read_punt', + 'dependencies': {'worker_sleep_strategy': 'waitfd'}}, ] io_raw_options = [ diff --git a/meson_options.txt b/meson_options.txt index 3ec221d6eb1660399f3709ca6a9b1869870bf6bc..c83d0c5b777f1b1f649ad73adc885481ab033156 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -63,9 +63,22 @@ option( 'worker_sleep_strategy', type: 'combo', description: 'The used algorithm to suspend and wakeup workers', - choices: ['semaphore', 'pipe'], + choices: ['semaphore', 'pipe', 'waitfd'], value: 'semaphore', ) +option( + 'waitfd_flags', + type: 'combo', + description: 'The flags of the used waitfd', + choices: ['none', 'accounting', 'waitFree', 'bestEffort'], + value: 'none', +) +option( + 'io_skip_waitfd_read_punt', + type: 'boolean', + description: 'Skip punting the sleep waitfd read to the io_wq', + value: false, +) option( 'wakeup_semaphore_implementation', type: 'combo', @@ -314,6 +327,18 @@ option( description: 'Use synchronous blocking systemcalls instead of the emper::io subsystem', value: false, ) +option( + 'io_notification', + type: 'boolean', + description: 'Let the kernel notify suspended EMPER workers about new IO completions', + value: false, +) +option( + 'io_steal_all_on_notification', + type: 'boolean', + description: 'Reap all cqes when notified about available completions', + value: false, +) option( 'continuation_stealing_mode', type: 'combo', diff --git a/tests/ScheduleOnTest.cpp b/tests/ScheduleOnTest.cpp index 6b7f5ceb69e095d42a84103bca27e1dd6805e7d4..3ed34f19bf7771443f00f80de2c4664d40c3a197 100644 --- a/tests/ScheduleOnTest.cpp +++ b/tests/ScheduleOnTest.cpp @@ -8,10 +8,13 @@ #include "Fiber.hpp" #include "Runtime.hpp" #include "Worker.hpp" +#include "emper-config.h" #include "emper.hpp" #include "fixtures/assert.hpp" +#include "lib/LinuxVersion.hpp" // IWYU pragma: keep -static const unsigned ITERATIONS = 100; +// static const unsigned ITERATIONS = 100; +static const unsigned ITERATIONS = 2; static Runtime* runtime; static unsigned workerCount; unsigned iteration = 0; @@ -41,4 +44,10 @@ static void scheduleOnTest() { cps.wait(); } -void emperTest() { scheduleOnTest(); } +void emperTest() { +#ifndef EMPER_SEMAPHORE_SLEEP_STRATEGY + // io_uring supports IORING_OP_MSG_RING since Linux 5.18 + if (EMPER_LINUX_LT("5.18")) exit(77); +#endif + scheduleOnTest(); +}