diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index c2e62475ce4a500e518d726a9d54ee51aba73dd5..e50b47bdf6d04c968a2823c2645574bd9b88f519 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -223,6 +223,10 @@ clang-tidy: variables: EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "futex" +.futex2-wakeup-semaphore: + variables: + EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "futex2" + .locked-wakeup-semaphore: variables: EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "locked" @@ -318,10 +322,11 @@ test-worker-wakeup-strategy-all: - .test - .worker-wakeup-strategy-all -test-worker-wakeup-strategy-throttle: - extends: - - .test - - .emper-worker-wakeup-strategy-throttle +# Disable throttle test till throttle works with notifySpecific +#test-worker-wakeup-strategy-throttle: +# extends: +# - .test +# - .emper-worker-wakeup-strategy-throttle test-do-not-log-timestamp: extends: @@ -343,6 +348,12 @@ test-futex-wakeup-semaphore: - .test - .futex-wakeup-semaphore +# TODO: enable this if the CI has linux >= 5.16 +build-futex-wakeup-semaphore: + extends: + - .build + - .futex2-wakeup-semaphore + test-locked-wakeup-semaphore: extends: - .test diff --git a/emper/FiberHint.cpp b/emper/FiberHint.cpp new file mode 100644 index 0000000000000000000000000000000000000000..cd2401445e7da90afb573e331a357f1ba5968dab --- /dev/null +++ b/emper/FiberHint.cpp @@ -0,0 +1,23 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "FiberHint.hpp" + +#include "CallerEnvironment.hpp" +#include "Worker.hpp" + +namespace emper { +template <CallerEnvironment callerEnvironment> +auto FiberHint::createNewWorkHint() -> FiberHint { + if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) { + return {emper::FiberSource::hintAq}; + } + + return {Worker::getCurrentWorkerId(), emper::FiberSource::hintWsq}; +} +template auto FiberHint::createNewWorkHint<CallerEnvironment::EMPER>() -> FiberHint; +template auto FiberHint::createNewWorkHint<CallerEnvironment::ANYWHERE>() -> FiberHint; +} // namespace emper + +auto operator<<(std::ostream& os, const emper::FiberHint& hint) -> std::ostream& { + return os << "FiberHint{" << hint.getWorker() << ", " << hint.getSource() << "}"; +} diff --git a/emper/FiberHint.hpp b/emper/FiberHint.hpp new file mode 100644 index 0000000000000000000000000000000000000000..08859774badbc2c446615560a04edb7ebdc1d5e2 --- /dev/null +++ b/emper/FiberHint.hpp @@ -0,0 +1,55 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <cstdint> +#include <iostream> + +#include "CallerEnvironment.hpp" +#include "FiberSource.hpp" +#include "emper-common.h" +#include "lib/TaggedPtr.hpp" + +namespace emper { +/** + * @brief A descriptor where to find a fiber + * + * A FiberHint consists of a FiberSource combined with a workerId. + * Valid FiberSources for now are: Fiber::Source::hint{Wsq, Aq}. + * + * This is used during work-stealing to find work faster than stealing + * round-robin from a random starting point. + * The sleep strategies also use FiberHints to decide who and how to notify + * on new work. + */ +class FiberHint { + emper::lib::TaggedPtr _tptr; + + public: + FiberHint() : _tptr((nullptr)) {} + + FiberHint(emper::FiberSource source) : _tptr(0, static_cast<uint16_t>(source)) {} + FiberHint(workerid_t workerId, emper::FiberSource source) + : _tptr(static_cast<uintptr_t>(workerId), static_cast<uint16_t>(source)) {} + + FiberHint(const FiberHint& other) : _tptr(other._tptr){}; + + [[nodiscard]] auto getSource() const -> emper::FiberSource { + return static_cast<emper::FiberSource>(_tptr.getTag()); + } + + [[nodiscard]] auto getWorker() const -> workerid_t { + return static_cast<workerid_t>(_tptr.getRawPtrValue()); + } + + void clear() { _tptr = nullptr; } + + template <CallerEnvironment callerEnvironment> + [[nodiscard]] static auto createNewWorkHint() -> FiberHint; + + inline operator bool() const { return _tptr; } + inline operator void*() const { return _tptr; } +}; +} // namespace emper + +auto operator<<(std::ostream& os, const emper::FiberHint& hint) -> std::ostream&; diff --git a/emper/FiberSource.cpp b/emper/FiberSource.cpp new file mode 100644 index 0000000000000000000000000000000000000000..06844f74470731f5a4ad897e9db43a983a3868cd --- /dev/null +++ b/emper/FiberSource.cpp @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "FiberSource.hpp" + +#include <iostream> + +#include "Common.hpp" + +auto operator<<(std::ostream& os, const emper::FiberSource& fiberSource) -> std::ostream& { + switch (fiberSource) { + case emper::FiberSource::local: + return os << "local"; + case emper::FiberSource::mpscQueue: + return os << "mpscQueue"; + case emper::FiberSource::stolen: + return os << "stolen"; + case emper::FiberSource::io: + return os << "io"; + case emper::FiberSource::ioStolen: + return os << "ioStolen"; + case emper::FiberSource::anywhereQueue: + return os << "anywhereQueue"; + case emper::FiberSource::hintWsq: + return os << "hintWsq"; + case emper::FiberSource::hintAq: + return os << "hintAq"; + default: + DIE_MSG("Unknown FiberSource"); + } +} diff --git a/emper/FiberSource.hpp b/emper/FiberSource.hpp new file mode 100644 index 0000000000000000000000000000000000000000..75412f41d81fbca90683e33b60840d67cf4de1fe --- /dev/null +++ b/emper/FiberSource.hpp @@ -0,0 +1,28 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus, Florian Fischer +#pragma once + +#include <cstdint> +#include <iostream> + +namespace emper { +/** + * @brief Descriptor for all the possible locations a Fiber can be obtained from + * + * This enum is used to collect stats, create hints where fibers are and + * make informed notification or scheduling decisions. + */ +enum class FiberSource : uintptr_t { + local, /*!< A worker's own work-stealing queue */ + mpscQueue, /*!< A worker's own mpsc queue (inbox / priority) */ + stolen, /*!< A other worker's work-stealing queue */ + io, /*!< A worker's own io_uring completion queue */ + ioStolen, /*!< A other worker's io_uring completion queue */ + anywhereQueue, /*!< The anywhere queue */ + hintWsq, /*!< A known other worker's work-stealing queue */ + hintAq, /*!< Straight from the anywhere queue */ +}; + +} // namespace emper + +auto operator<<(std::ostream& os, const emper::FiberSource& fiberSource) -> std::ostream&; diff --git a/emper/NextFiberResult.hpp b/emper/NextFiberResult.hpp index 15ab40a3c353ab7ec9ca243feb50258b0a6b9172..9b927e14f95870acca971bdc3de0dcfc8350aa82 100644 --- a/emper/NextFiberResult.hpp +++ b/emper/NextFiberResult.hpp @@ -4,9 +4,11 @@ #include <cstdint> +#include "FiberSource.hpp" + class Fiber; struct NextFiberResult { Fiber* const fiber; - const uintptr_t metadata; + const emper::FiberSource source; }; diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index e9e9ae0347fb6475273b92510b403afdd66b7cde..b0bd380bf3c4c22d87fd3e6c519f6ddadba6e97d 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -25,6 +25,7 @@ #include "Debug.hpp" // for DBG, ABORT, LOGD, LOGE #include "Emper.hpp" #include "Fiber.hpp" // for Fiber +#include "FiberSource.hpp" #include "NextFiberResult.hpp" #include "RuntimeStrategy.hpp" // for RuntimeStrategy #include "RuntimeStrategyFactory.hpp" @@ -38,7 +39,6 @@ #include "log/LogBuffer.hpp" #include "stats/FromAnywhere.hpp" #include "stats/Worker.hpp" -#include "strategies/AbstractWorkStealingScheduler.hpp" #ifdef EMPER_DEFAULT_SCHEDULING_STRATEGY_WORK_STEALING #include "strategies/ws/WsStrategyFactory.hpp" @@ -357,7 +357,7 @@ void Runtime::yield() { }); } -auto Runtime::nextFiber() -> NextFiberResult { +auto Runtime::nextFiber() -> std::optional<NextFiberResult> { if constexpr (emper::IO_WORKER_URING) { // Schedule all fibers waiting on completed IO IoContext::ContinuationBuffer completions; @@ -368,9 +368,7 @@ auto Runtime::nextFiber() -> NextFiberResult { Fiber* next = completions[0]; schedule(&completions[1], ncompletions - 1); - // TODO: hint that this fiber comes from the IO subsystem - return NextFiberResult{ - next, static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::local)}; + return NextFiberResult{next, emper::FiberSource::io}; } } diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index c8a4bbadec6c95910bc1b100a6b8c43115af61af..8261c8d59f210291706550206d7b4fd4e25ef4eb 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -17,9 +17,11 @@ #include <vector> // for vector #include "CallerEnvironment.hpp" +#include "Common.hpp" #include "Context.hpp" #include "Debug.hpp" -#include "NextFiberResult.hpp" +#include "FiberHint.hpp" +#include "FiberSource.hpp" #include "Scheduler.hpp" // for Scheduler #include "WakeupStrategy.hpp" #include "Worker.hpp" @@ -34,6 +36,7 @@ class RuntimeBuilder; class ContextManager; class Dispatcher; class Fiber; +struct NextFiberResult; class RuntimeStrategy; class RuntimeStrategyFactory; class RuntimeStrategyStats; @@ -48,6 +51,9 @@ class IoContext; namespace log { class LogBuffer; } +namespace sleep_strategy { +class PipeSleepStrategy; +} namespace stats { class Worker; class FromAnywhere; @@ -132,11 +138,28 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { return workerSleepStrategy; } - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - inline void wakeupSleepingWorkers() { - workerid_t wakeupCount = wakeupStrategy.getWakeupCount<callerEnvironment>(); - if (wakeupCount) { - workerSleepStrategy.notifyMany<callerEnvironment>(wakeupCount); + template <CallerEnvironment callerEnvironment> + inline void wakeupSleepingWorkers(emper::FiberHint hint) { + LOGD("Wake sleepers from " << callerEnvironment << " cause new work: " << hint); + switch (hint.getSource()) { + // We scheduled to an mpsc queue where the fiber lives exclusively -> wake up the + // specific worker if necessary + case emper::FiberSource::mpscQueue: { + workerSleepStrategy.notifySpecific<callerEnvironment>(hint.getWorker()); + } break; + + // We scheduled to a work-stealing or the anywhereQueue, where anyone can + // access this fiber -> wake up anyone if appropriate + case emper::FiberSource::local: + case emper::FiberSource::anywhereQueue: { + workerid_t wakeupCount = wakeupStrategy.getWakeupCount<callerEnvironment>(); + if (wakeupCount) { + workerSleepStrategy.notifyMany<callerEnvironment>(wakeupCount); + } + } break; + + default: + DIE_MSG("Unexpected FiberSource encountered during wakeupSleepingWorkers"); } } @@ -161,6 +184,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { } while (!canWake); } + auto nextFiber() -> std::optional<NextFiberResult>; + public: Runtime() : Runtime(getDefaultWorkerCount()) {} @@ -189,11 +214,16 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { scheduler.scheduleFromAnywhere(fibers, count); } + /** + * @brief Schedule a fiber on a specific worker + * + * For now scheduleOn is only available from within th EMPER runtime. + */ + inline void scheduleOn(Fiber& fiber, workerid_t workerId) { + scheduler.scheduleOn(fiber, workerId); + } void yield(); - // TODO: This should probably not be a public method of Runtime. - auto nextFiber() -> NextFiberResult; - // https://stackoverflow.com/a/3747462/194894 static inline auto rand() -> int { return Worker::rand(); } @@ -224,6 +254,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { void executeAndWait(std::function<void()> f); friend class AbstractWorkStealingScheduler; + friend class WsDispatcher; + friend class LawsDispatcher; template <LogSubsystem> friend class Blockable; friend RuntimeBuilder; @@ -236,6 +268,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { friend class MemoryManager; template <typename> friend class WorkerLocalData; + friend class emper::sleep_strategy::PipeSleepStrategy; friend void emper::log::log(const std::string& prefix, const std::string& message); }; diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp index f04ca047bd022408bb9deeea2507b1cd634b8891..e9de80ef9353fefe6b62b0e8b5af2c32a992b2d3 100644 --- a/emper/Scheduler.cpp +++ b/emper/Scheduler.cpp @@ -2,7 +2,10 @@ // Copyright © 2020-2021 Florian Schmaus #include "Scheduler.hpp" +#include <stdexcept> + #include "CallerEnvironment.hpp" +#include "Common.hpp" #include "Runtime.hpp" #include "RuntimeStrategy.hpp" #include "WakeupStrategy.hpp" @@ -15,12 +18,23 @@ void Scheduler::addNewWorkerHook(const std::function<void(workerid_t)>& hook) { } template <CallerEnvironment callerEnvironment> -void Scheduler::wakeupSleepingWorkers() { - runtime.wakeupSleepingWorkers<callerEnvironment>(); +void Scheduler::wakeupSleepingWorkers(emper::FiberHint hint) { + runtime.wakeupSleepingWorkers<callerEnvironment>(hint); } -void Scheduler::notifyRuntimeAboutWorkStolen() { runtime.wakeupStrategy.onWorkStolen(); } - // show the compiler our template incarnations -template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::EMPER>(); -template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::ANYWHERE>(); +template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::EMPER>(emper::FiberHint hint); +template void Scheduler::wakeupSleepingWorkers<CallerEnvironment::ANYWHERE>(emper::FiberHint hint); + +void Scheduler::scheduleOn(Fiber& fiber, workerid_t workerId) { + // Calling scheduleOn() only works from within the EMPER runtime. + emper::assertInRuntime(); + + if (unlikely(workerId >= runtime.getWorkerCount())) + throw std::runtime_error("WorkerId to big for worker count"); + + LOGD("Scheduling fiber " << &fiber << " on worker " << workerId); + scheduleOnInternal(fiber, workerId); +} + +void Scheduler::notifyRuntimeAboutWorkStolen() { runtime.wakeupStrategy.onWorkStolen(); } diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index be9e9a0779b8ba8a8a2a48066212f32748448c91..57204e267655036ededee7b963fdf78132df6589 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -4,13 +4,15 @@ #include <cstddef> #include <functional> // for function +#include <optional> #include <ostream> #include "CallerEnvironment.hpp" #include "Debug.hpp" // for LogSubsystem, LogSubsystem::SCHED, Logger #include "Dispatcher.hpp" #include "Emper.hpp" -#include "Fiber.hpp" // for Fiber +#include "Fiber.hpp" +#include "FiberHint.hpp" #include "emper-common.h" // for workeraffinity_t #include "lib/adt/LockedUnboundedQueue.hpp" @@ -23,7 +25,7 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { lib::adt::LockedUnboundedQueue<Fiber> scheduleAnywhereQueue; template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - void wakeupSleepingWorkers(); + void wakeupSleepingWorkers(emper::FiberHint hint); void notifyRuntimeAboutWorkStolen(); @@ -43,10 +45,10 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { static inline void increaseRefCount(Fiber& fiber) { fiber.doAtomicIncrRefCount(); } - template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> - inline void onNewWork() { + template <CallerEnvironment callerEnvironment> + inline void onNewWork(emper::FiberHint hint) { if constexpr (emper::WORKER_SLEEP) { - wakeupSleepingWorkers<callerEnvironment>(); + wakeupSleepingWorkers<callerEnvironment>(hint); } } @@ -68,12 +70,13 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { } virtual void scheduleInternal(Fiber& fiber) = 0; + virtual void scheduleOnInternal(Fiber& fiber, workerid_t workerId) = 0; virtual void scheduleFromAnywhereInternal(Fiber& fiber) = 0; virtual void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) = 0; void recycle(Fiber* fiber) { dispatcher.recycle(fiber); }; - virtual auto nextFiber() -> NextFiberResult = 0; + virtual auto nextFiber() -> std::optional<NextFiberResult> = 0; friend class Runtime; @@ -98,6 +101,8 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { } } + void scheduleOn(Fiber& fiber, workerid_t workerId); + void scheduleFromAnywhere(Fiber& fiber) { LOGD("Scheduling fiber " << &fiber << " from anywhere"); scheduleFromAnywhereInternal(fiber); diff --git a/emper/Worker.hpp b/emper/Worker.hpp index a2930c82e5fbea70746964ddd38fdc915f84a4b2..d47cd89c5d84182e3658d9fa3fd279dfe28eabaf 100644 --- a/emper/Worker.hpp +++ b/emper/Worker.hpp @@ -5,8 +5,8 @@ #include <random> #include "Common.hpp" +#include "FiberHint.hpp" #include "emper-common.h" -#include "lib/TaggedPtr.hpp" class Runtime; @@ -15,8 +15,6 @@ class PipeSleepStrategy; } using emper::sleep_strategy::PipeSleepStrategy; -using emper::lib::TaggedPtr; - class Worker { private: static thread_local Worker* currentWorker; @@ -29,14 +27,13 @@ class Worker { const workerid_t workerId; - TaggedPtr dispatchHint; + emper::FiberHint dispatchHint; Worker(workerid_t workerId, workerid_t workerCount, unsigned int seed) : seed(seed), mtGenerator(seed), randomWorkerIdGenerator(0, workerCount - 1), - workerId(workerId), - dispatchHint(nullptr) {} + workerId(workerId) {} void setWorker(); diff --git a/emper/io/Future.cpp b/emper/io/Future.cpp index f07b3849416378c13e7192fe6f28dcb364ae01d2..329386c9781a3454dec9030e9b8651205c30a6d1 100644 --- a/emper/io/Future.cpp +++ b/emper/io/Future.cpp @@ -9,6 +9,8 @@ #include "BinaryPrivateSemaphore.hpp" // for BPS #include "CallerEnvironment.hpp" // for CallerEnvironment #include "Debug.hpp" // for LOGD, LOGW +#include "Fiber.hpp" +#include "Runtime.hpp" #include "Worker.hpp" #include "io/IoContext.hpp" #include "io/Stats.hpp" @@ -47,11 +49,7 @@ auto Future::_wait() -> int32_t { sem.wait(); - if constexpr (emper::DEBUG) { - markRetrieved(); - } - - return returnValue; + return get(); } auto Future::waitAndSetErrno() -> int32_t { @@ -99,7 +97,14 @@ auto Future::cancel() -> int32_t { // Only try to cancel Futures which have actually reached the io_uring if (isPrepared()) { CancelWrapper cancellation(*this); - const int32_t cancelResult = cancellation.submitAndWait(); + // We need to cancel on a different worker + if (!emper::IO_SINGLE_URING && submitter != IoContext::workerIo) { + submitter->runtime.scheduleOn(*Fiber::from([&cancellation] { cancellation.submit(); }), + submitter->worker->getWorkerId()); + } else { + cancellation.submit(); + } + const int32_t cancelResult = cancellation.wait(); if (callback) returnValue = cancelResult; } @@ -171,7 +176,11 @@ normal_submit: } void CancelWrapper::prepareSqeInternal(struct io_uring_sqe* sqe) { - void* user_data = IoContext::createFutureTag(*reinterpret_cast<Future*>(buf)); + auto& future = *reinterpret_cast<Future*>(buf); + if constexpr (!emper::IO_SINGLE_URING) { + assert(future.submitter == IoContext::workerIo); + } + void* user_data = IoContext::createFutureTag(future); io_uring_prep_cancel(sqe, user_data, 0); } } // namespace emper::io diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp index 5f7f0a9bb940813100877713ee8c611088830154..dd65483f066500be3b5d34ef22aa84658021a30d 100644 --- a/emper/io/Future.hpp +++ b/emper/io/Future.hpp @@ -31,6 +31,7 @@ class Fiber; namespace emper::io { class Stats; class SubmitActor; // IWYU pragma: keep +class IoContext; class FutureError : public std::logic_error { friend class Future; @@ -45,6 +46,7 @@ class Future : public Logger<LogSubsystem::IO> { friend class IoContext; friend class Stats; friend class SubmitActor; + friend class CancelWrapper; public: // User facing Callback type @@ -126,6 +128,8 @@ class Future : public Logger<LogSubsystem::IO> { return astate->load(std::memory_order_acquire); } + IoContext* submitter = nullptr; + /* IO operation to perform */ const Operation op; @@ -161,7 +165,8 @@ class Future : public Logger<LogSubsystem::IO> { */ CallbackInternal* callback = nullptr; - void prepareSqe(struct io_uring_sqe* sqe) { + void prepareSqe(IoContext& io, struct io_uring_sqe* sqe) { + submitter = &io; markPrepared(); prepareSqeInternal(sqe); } @@ -417,6 +422,26 @@ class Future : public Logger<LogSubsystem::IO> { return _wait(); } + /** + * @brief Get the return value of the Future + * + * @return return the result received from the io_uring + * + * Only call get() if you have ensured that the future was completed. + * By calling either wait() or cancel() beforehand. + * Get can be used to check the return value of Futures canceled as part of + * a Future chain. + */ + auto get() -> int32_t { + LOGD("get return value of " << this); + + if constexpr (emper::DEBUG) { + markRetrieved(); + } + + return returnValue; + } + /** * @brief Block till the IO request is completed and set errno on error * diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 4f6d74649ba5e12f02744ceb2af9ccec45ec191f..56c4c1e92bfeaa601864474a241f4b66d006e4a4 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -55,7 +55,7 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns struct io_uring_sqe *sqe = getSqe(); - future.prepareSqe(sqe); + future.prepareSqe(*this, sqe); // remember the prepared sqe to invalidate it if it was not properly submitted preparedSqes.push_back(sqe); @@ -256,7 +256,7 @@ auto IoContext::tryReapCompletionWaitFree(Fiber *continuation) -> emper::Stealin if constexpr (callerEnvironment != CallerEnvironment::OWNER) { const TaggedPtr tptr(cqe_data); const auto tag = static_cast<PointerTags>(tptr.getTag()); - if (tag == PointerTags::NewWorkWsq || tag == PointerTags::NewWorkAq) { + if (tag == PointerTags::NewWorkNotification) { // don't consume the new work notification return emper::StealingResult::Empty; } @@ -312,7 +312,7 @@ auto IoContext::reapCompletionsLockless(Fiber **continuations) -> unsigned { if constexpr (callerEnvironment != CallerEnvironment::OWNER) { const TaggedPtr tptr(cqe_data); const auto tag = static_cast<PointerTags>(tptr.getTag()); - if (tag == PointerTags::NewWorkWsq || tag == PointerTags::NewWorkAq) { + if (tag == PointerTags::NewWorkNotification) { // don't consume the new work notification if (i == 0) return 0; // Since i starts at 0 using i as count is correct. @@ -362,6 +362,11 @@ template auto IoContext::reapCompletionsLockless<CallerEnvironment::ANYWHERE, IoContext::CQE_BATCH_COUNT>(Fiber **contiunations) -> unsigned; +#define UNLOCK_IF_NEEDED() \ + do { \ + if constexpr (needsCqLock) cq_lock.unlock(); \ + } while (false); + template <CallerEnvironment callerEnvironment, unsigned toReap> auto IoContext::reapCompletionsLocked(Fiber **continuations) -> unsigned { // TODO: should only the owner possibly rereap? @@ -383,29 +388,9 @@ reap_cqes: // Number of actual continuation fibers resulting from the reaped CQEs unsigned continuationsCount = 0; - // TODO: Is using a try lock and the waitInflight flag here even sound? - // Coudn't it be possible to have a lost wakeup with unconsumed new work notification - // cqe in our CQ - // - // State: - // Only a single worker does work involving IO and - // another (completer, io-stealing worker accesses its CQ. - - // Other Owner - // | submit IO - // | lock - // | prepare to sleep - // | set flag - // | unlock - // | sleep - // lock | - // | try lock unsucessfull - // | sleep again - // check flag | - // unlock | if constexpr (needsCqLock) { // The Owner always takes the lock to reap all completions and especially - // new work notifications and prevent the above discribed problem. + // new work notifications. if constexpr (callerEnvironment == CallerEnvironment::OWNER) { cq_lock.lock(); } else { @@ -414,36 +399,12 @@ reap_cqes: LOGD("unsuccessful try_lock from " << callerEnvironment); return 0; } - // We have to check the waitInflight flag with the cq_lock held to - // ensure we observe an update by the worker holding the lock. - // Otherwise this could happen: - // Other Owner - // | | - // | lock - // | prepare to sleep - // check flag | - // | set flag - // | unlock - // lock | - - // Which results in the Other possible consuming new work notifications. - - // We must not reap completions of this IoContext to not race - // with the sleeping owner. - if (waitInflight.load(std::memory_order_acquire)) { - LOGD("Not reaping worker " << std::to_string(worker->getWorkerId()) - << " since this worker is already waiting for its CQEs"); - cq_lock.unlock(); - return 0; - } } } unsigned count = io_uring_peek_batch_cqe(&ring, cqes, toReap); if (!count) { - if constexpr (needsCqLock) { - cq_lock.unlock(); - } + UNLOCK_IF_NEEDED() if constexpr (checkForRereap) { goto check_for_rereap; @@ -459,13 +420,19 @@ reap_cqes: auto &reapedCompletion = reapedCompletions[i]; reapedCompletion.first = cqe->res; reapedCompletion.second = cqe_data; + + // Do not reap newWorkNotifications if we are no the owner + if constexpr (callerEnvironment != CallerEnvironment::OWNER) { + const auto tag = static_cast<PointerTags>(TaggedPtr(cqe_data).getTag()); + if (tag == PointerTags::NewWorkNotification) { + UNLOCK_IF_NEEDED() + return 0; + } + } } io_uring_cq_advance(&ring, count); - - if constexpr (needsCqLock) { - cq_lock.unlock(); - } + UNLOCK_IF_NEEDED() LOGD("got " << count << " cqes from worker " << worker->getWorkerId() << "'s io_uring"); diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 4911503caaeb8b38971cc5c5765a3f44235cda73..53e2a05b5e9a6071d61e592927b6d42c622138af 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -99,13 +99,6 @@ class IoContext : public Logger<LogSubsystem::IO> { // This only happens if emper is build with the IO_SINGLE_URING option SubmitActor *submitter = nullptr; - // Flag to indicate that we already have a sleep related request in the io_uring. - // Gets set by the worker on WaitdfdSleepStrategy::sleep() and - // reset when reaping a completion containing a NewWork{Wsq,Aq} TaggedPtr. - // It also prevents the completer from reaping completions on this IoContext - // to prevent races between the worker and the completer. - std::atomic<bool> waitInflight = false; - Stats stats; // Members useful for debugging @@ -152,7 +145,7 @@ class IoContext : public Logger<LogSubsystem::IO> { template <CallerEnvironment callerEnvironment> void cancelUnsubmittedChainParts(Future &chain); - enum class PointerTags : uint16_t { Future, Callback, NewWorkAq, NewWorkWsq }; + enum class PointerTags : uint16_t { Future, Callback, NewWorkNotification }; [[nodiscard]] static auto createFutureTag(Future &future) -> emper::lib::TaggedPtr { if (future.callback) return {future.callback, static_cast<uint16_t>(PointerTags::Callback)}; @@ -219,7 +212,7 @@ class IoContext : public Logger<LogSubsystem::IO> { auto res = completion.first; auto *cqe_data = completion.second; - TaggedPtr tptr(cqe_data); + emper::lib::TaggedPtr tptr(cqe_data); // Got a CQE for a forgotten Future. if (!tptr) { continue; @@ -227,11 +220,10 @@ class IoContext : public Logger<LogSubsystem::IO> { auto tag = static_cast<PointerTags>(tptr.getTag()); switch (tag) { - case PointerTags::NewWorkWsq: - case PointerTags::NewWorkAq: { + case PointerTags::NewWorkNotification: { auto &sleepStrategy = reinterpret_cast<PipeSleepStrategy &>(runtime.getWorkerSleepStrategy()); - sleepStrategy.onNewWorkNotification<callerEnvironment>(*this); + sleepStrategy.onNewWorkNotification<callerEnvironment>(*this, tptr); break; } @@ -339,7 +331,7 @@ class IoContext : public Logger<LogSubsystem::IO> { * would be like the rest of IoContext independent of a global Runtime * reference * - * @return the either the worker's IoContext or the GlobalIoContext + * @return either the worker's IoContext or the GlobalIoContext */ template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> [[nodiscard]] static inline auto getIo() -> IoContext * { @@ -448,7 +440,7 @@ class IoContext : public Logger<LogSubsystem::IO> { "No need to reap more than there are entries in the CQ"); // never reap completions on the global IoContext - assert(this != reinterpret_cast<IoContext *>(Runtime::getRuntime()->globalIo)); + assert(this != reinterpret_cast<IoContext *>(runtime.globalIo)); LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId())); diff --git a/emper/io/Operation.cpp b/emper/io/Operation.cpp index e165a6187f3282dc93d6b4bee6be2730ce492825..93ea4e430414daf9c477e47990912e4880110e29 100644 --- a/emper/io/Operation.cpp +++ b/emper/io/Operation.cpp @@ -2,9 +2,10 @@ // Copyright © 2020-2021 Florian Fischer #include "io/Operation.hpp" -#include <cstdlib> // for abort #include <ostream> // for operator<<, stringstream, basi... +#include "Common.hpp" + namespace emper::io { auto operator<<(std::ostream& os, const Operation& op) -> std::ostream& { switch (op) { @@ -51,7 +52,7 @@ auto operator<<(std::ostream& os, const Operation& op) -> std::ostream& { os << "madvise"; break; default: - abort(); + DIE_MSG("unknown IO operation:" << (int)op); } return os; } diff --git a/emper/lib/LinuxVersion.cpp b/emper/lib/LinuxVersion.cpp index f268d050dc62f992be090bcad00b0896c26c1cdd..e4306b2f782391ac07db1b2ed1b5930f92635a47 100644 --- a/emper/lib/LinuxVersion.cpp +++ b/emper/lib/LinuxVersion.cpp @@ -2,7 +2,6 @@ // Copyright © 2021 Florian Fischer #include "lib/LinuxVersion.hpp" -#include <cassert> #include <cerrno> #include <cstdlib> @@ -47,7 +46,8 @@ auto LinuxVersion::compare(const std::string& s) -> int { dot_pos2 = s.size(); } else { dot_pos2 = s.find('.', last_dot_pos2); - assert(dot_pos2 != std::string::npos); + // The string to compare has less dot-separated components + if (dot_pos2 == std::string::npos) return 0; } long n1 = checked_strtol(this->version.substr(last_dot_pos, dot_pos - last_dot_pos)); diff --git a/emper/lib/TaggedPtr.hpp b/emper/lib/TaggedPtr.hpp index 9da550e0455d20b3b46054d0af7c63854e9ee58d..2532fde3fce6ed97f743cc43cfb0ea3c878b792f 100644 --- a/emper/lib/TaggedPtr.hpp +++ b/emper/lib/TaggedPtr.hpp @@ -46,9 +46,12 @@ class TaggedPtr { } /** - * @brief extract the 48-bit the pointer part + * @brief extract the 48-bit pointer part + * + * The least significant bit is masked since it is used as marker + * and not used by a pointer due to alignment. * - * @return ptr The value of the actuall pointer part of tptr + * @return ptr The value of the actual pointer part of tptr */ [[nodiscard]] inline auto getPtrValue() const -> uintptr_t { // ignore the least significant bit of the tagged pointer @@ -56,6 +59,16 @@ class TaggedPtr { return tptr & (TPTR_POINTER_MASK - 1); } + /** + * @brief extract the 48-bit the pointer part + * + * @return ptr The value of the 48 least significant bits + */ + [[nodiscard]] inline auto getRawPtrValue() const -> uintptr_t { + // NOLINTNEXTLINE(performance-no-int-to-ptr) + return tptr & TPTR_POINTER_MASK; + } + /** * @brief extract the 16-bit tag part * diff --git a/emper/lib/meson.build b/emper/lib/meson.build index 88b68f898aa5ccb9a4a790bae234b640e48306c8..e59bfa43c5742beb9d27d5f1fed7eb659fb7e46f 100644 --- a/emper/lib/meson.build +++ b/emper/lib/meson.build @@ -2,3 +2,5 @@ emper_cpp_sources += files( 'DebugUtil.cpp', 'LinuxVersion.cpp', ) + +subdir('sync') diff --git a/emper/lib/sync/SpuriousFutex2Semaphore.cpp b/emper/lib/sync/SpuriousFutex2Semaphore.cpp new file mode 100644 index 0000000000000000000000000000000000000000..41abdc8b1aeb0a3ee88a3937ba59ba19aafc3f72 --- /dev/null +++ b/emper/lib/sync/SpuriousFutex2Semaphore.cpp @@ -0,0 +1,89 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "lib/sync/SpuriousFutex2Semaphore.hpp" + +#include <linux/futex.h> +#include <unistd.h> + +#include <atomic> +#include <cassert> +#include <cerrno> + +#include "Worker.hpp" + +#ifndef SYS_futex_waitv +#define SYS_futex_waitv 449 +#endif + +#ifndef FUTEX_32 +#define FUTEX_32 2 +#endif + +struct futex_waitv { + uint64_t val; + uint64_t uaddr; + uint32_t flags; + uint32_t _reserved; +}; + +static void init_futex_waitv(struct futex_waitv* waiter, uint64_t val, void* uaddr) { + waiter->val = val; + waiter->uaddr = reinterpret_cast<uintptr_t>(uaddr); + waiter->flags = FUTEX_PRIVATE_FLAG | FUTEX_32; + waiter->_reserved = 0; +} + +namespace emper::lib::sync { + +void SpuriousFutex2Semaphore::wait() { + const workerid_t workerId = Worker::getCurrentWorkerId(); + std::atomic<SleeperState>& workerState = states[workerId]; + + const SleeperState oldState = workerState.exchange(SleeperState::Sleeping); + // Someone has notified us specifically -> skip sleeping. + if (oldState == SleeperState::Notified) { + workerState.store(SleeperState::Running, std::memory_order_relaxed); + return; + } + + // Me must have been running. + assert(oldState == SleeperState::Running); + + // Decrement the global semaphore count. + // See fetch_add comment in SpuriousFutex2Semaphore.hpp. + CounterType c = counter.fetch_sub(1, std::memory_order_relaxed) - 1; + assert(c >= -workerCount); + LOGD("Decrement counter to: " << c); + + while (c < 0 && workerState.load(std::memory_order_relaxed) == SleeperState::Sleeping) { + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + struct futex_waitv waiters[2]; + // Global futex + init_futex_waitv(&waiters[0], c, &counter); + + // Specific futex + init_futex_waitv(&waiters[1], static_cast<uint64_t>(SleeperState::Sleeping), &workerState); + + LOGD("Sleep on both futex"); + long ret = syscall(SYS_futex_waitv, waiters, 2, 0, nullptr, 0); + if (ret == -1) { + assert(errno == EAGAIN); + c = getValue(); + continue; + } + // Someone called FUTEX_WAIT on either of the two futex + + // The futex(2) manpage says that applications should check the futex word + // again because it is possible to have spurious wakeups. + // We can't check the futex value because there is no way to decided + // if someone called notify. + // A sound generic Semaphore needs to split its atomic counter into + // a semaphore value and a waiter count part. + + LOGD("futex_waitv returned with: " << ret); + break; + } + workerState.store(SleeperState::Running, std::memory_order_relaxed); +} + +} // namespace emper::lib::sync diff --git a/emper/lib/sync/SpuriousFutex2Semaphore.hpp b/emper/lib/sync/SpuriousFutex2Semaphore.hpp new file mode 100644 index 0000000000000000000000000000000000000000..ac369e3e974e8a0830c530ae15213f885fe37ede --- /dev/null +++ b/emper/lib/sync/SpuriousFutex2Semaphore.hpp @@ -0,0 +1,114 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <linux/futex.h> +#include <syscall.h> +#include <unistd.h> + +#include <atomic> +#include <cstdint> +#include <memory> +#include <new> +#include <ostream> + +#include "Common.hpp" +#include "Debug.hpp" +#include "emper-common.h" +#include "lib/LinuxVersion.hpp" +#include "sleep_strategy/SleeperState.hpp" + +namespace emper::lib::sync { + +/** + * @brief A semaphore implementation using two futexes to wait + * + * SpuriousFutex2Semaphore behaves like a normal Semaphore when using + * only notify() and notify_many(count). + * But it also supports notify_specific(workerId). + * The notify_specific(workerId) is implemented with a exclusive futex per + * worker. The worker specific futexes are a cache line exclusive SleeperState + * per worker. A worker calling wait will use futex_waitv(2) introduced in linux + * v5.16 to wait on both futexes. + * + * To notify a specific Worker we first set its SleeperState to Notified + * and check the previous value: + * - If it was running it will skip the next wait() and reset it + * - If it was sleeping we have to call FUTEX_WAKE on the futex + * - The futex_waitv call will fail if the notifier races the futex_waitv call + * - If futex_waitv wins the race the notifier's FUTEX_WAKE will wake it + * + * ATTENTION: This semaphore implementation is prone to spurious wakeups! + * SpuriousFutex2Semaphore::wait() may return spurious without a call to + * SpuriousFutex2Semaphore::notify(). + * Use this class only where it is safe to have spurious wakeups. + * It is designed to wake up and suspend worker threads it does not provided any + * safety guaranties when protecting from data races. + */ +class SpuriousFutex2Semaphore : Logger<LogSubsystem::SLEEP_S> { + public: + // All futexes are currently 32 bits in size + using CounterType = int32_t; + + private: + using SleeperState = emper::sleep_strategy::SleeperState; + // States of the worker threads + std::atomic<SleeperState>* states; + + // >= 0 means no waiters + // < 0 means there are waiters + std::atomic<CounterType> counter; + + int workerCount; + + public: + SpuriousFutex2Semaphore(CounterType counter = 0) : counter(counter) { + if (EMPER_LINUX_LT("5.16.0")) + DIE_MSG("SpuriousFutex2Semaphore needs futex_waitv(2) introduced in linux v5.16"); + } + + ~SpuriousFutex2Semaphore() { delete states; } + + void init(workerid_t workerCount) { + this->workerCount = workerCount; + // Allcoate each worker specific SleeperState in its own cache line + states = new (std::align_val_t(CACHE_LINE_SIZE)) std::atomic<SleeperState>[workerCount]; + } + + [[nodiscard]] inline auto getValue() const -> CounterType { + return counter.load(std::memory_order_relaxed); + } + + inline void notify_many(CounterType count) { + // Using std::memory_order_relaxed is fine since we don't + // give any memory synchronization guaranties because of the + // possible spurious wakeups + CounterType c = counter.fetch_add(count, std::memory_order_relaxed); + LOGD("Futex2Sem: notify_many(" << count << ") inc counter to: " << c + count); + if (c < 0) { + LOGD("Futex2Sem: notify_many(" << count << ") wake " << count << " on " << &counter); + syscall(SYS_futex, &counter, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count, 0, 0, 0); + } + } + + inline void notify() { notify_many(1); } + + inline void notifySpecific(workerid_t workerId) { + std::atomic<SleeperState>& workerState = states[workerId]; + SleeperState oldState = workerState.exchange(SleeperState::Notified, std::memory_order_relaxed); + + // We are responsible to wake the futex + if (oldState == SleeperState::Sleeping) { + // The sleeper has decremented the semaphore count before sleeping. + // We have to increment it when not using the counter to wakeup + // to prevent the counter from shrinking uncontrolled. + counter.fetch_add(1, std::memory_order_relaxed); + LOGD("increment counter to " << counter << " and wake specific"); + syscall(SYS_futex, &workerState, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, 0, 0, 0); + } + } + + void wait(); +}; + +} // namespace emper::lib::sync diff --git a/emper/lib/sync/meson.build b/emper/lib/sync/meson.build new file mode 100644 index 0000000000000000000000000000000000000000..c4476caeaea16eed711961c3b8673669073a658c --- /dev/null +++ b/emper/lib/sync/meson.build @@ -0,0 +1,3 @@ +emper_cpp_sources += files( + 'SpuriousFutex2Semaphore.cpp', +) diff --git a/emper/meson.build b/emper/meson.build index fbd3e44cf19c0aa528233b53d914ddc6767c8ee4..4423f03b37b1b5adfc76f57f0d7201fcdf27ade9 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -17,6 +17,8 @@ emper_cpp_sources = [ 'Runtime.cpp', 'Emper.cpp', 'Fiber.cpp', + 'FiberHint.cpp', + 'FiberSource.cpp', 'FiberManager.cpp', 'Context.cpp', 'Scheduler.cpp', diff --git a/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp b/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp index 47a8e6e69002ab467dd2ac28dc0fe7ccf91f943b..cec832b50b6fff789aec85147e467ddefc89950b 100644 --- a/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/AbstractWorkerSleepStrategy.hpp @@ -6,12 +6,10 @@ #include "CallerEnvironment.hpp" #include "Emper.hpp" +#include "Worker.hpp" namespace emper::sleep_strategy { -static constexpr bool needsNotifySpecific = - (emper::IO_COMPLETER_BEHAVIOR == emper::IoCompleterBehavior::wakeup); - template <class T> class AbstractWorkerSleepStrategy { [[nodiscard]] inline auto getSleeping() const -> long { @@ -35,12 +33,12 @@ class AbstractWorkerSleepStrategy { template <CallerEnvironment callerEnvironment> void notifySpecific(workerid_t workerId) { - if constexpr (needsNotifySpecific) { - static_cast<T*>(this)->template notifySpecific<callerEnvironment>(workerId); - } else { - throw std::logic_error( - "Called SemaphoreWorkerSleepStrategy::notifySpecific but needsNotifySpecific is false"); + // Do not notify ourselves + if constexpr (callerEnvironment == CallerEnvironment::EMPER) { + if (workerId == Worker::getCurrentWorkerId()) return; } + + static_cast<T*>(this)->template notifySpecific<callerEnvironment>(workerId); } void sleep() { static_cast<T*>(this)->sleep(); } diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp index a830730be656fa5bfea894165e9909a15f4fd933..3466425d2f3bbeb13f289157135d59fa24e78341 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.cpp +++ b/emper/sleep_strategy/PipeSleepStrategy.cpp @@ -6,131 +6,98 @@ #include <atomic> #include <cassert> -#include <mutex> #include "CallerEnvironment.hpp" +#include "Runtime.hpp" #include "Worker.hpp" #include "io/IoContext.hpp" #include "lib/TaggedPtr.hpp" -class Runtime; - using emper::io::IoContext; using emper::lib::TaggedPtr; namespace emper::sleep_strategy { +thread_local PipeSleepStrategy::Pipe PipeSleepStrategy::pipe; +thread_local PipeSleepStrategy::SleepState PipeSleepStrategy::sleepState; + PipeSleepStrategy::PipeSleepStrategy(Runtime& runtime, workerid_t workerCount) : workerCount(workerCount), stats(runtime) { LOGD("init pipe sleep startegy"); sleepers.store(0, std::memory_order_relaxed); - // NOLINTNEXTLINE(modernize-avoid-c-arrays) - int fds[2]; - if (pipe(fds)) { - DIE_MSG_ERRNO("pipe failed"); - } - sleepFd = fds[0]; - notifyFd = fds[1]; -} + sleepStates = new SleepState*[workerCount]; + pipes = new Pipe*[workerCount]; -PipeSleepStrategy::~PipeSleepStrategy() { - close(sleepFd); - close(notifyFd); + runtime.addNewWorkerHook([this](workerid_t workerId) { + sleepStates[workerId] = &sleepState; + pipes[workerId] = &pipe; + }); } -template <CallerEnvironment callerEnvironment> -[[nodiscard]] auto PipeSleepStrategy::createHint() -> TaggedPtr { - if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - return {(uintptr_t)Worker::getCurrentWorkerId(), - static_cast<uint16_t>(IoContext::PointerTags::NewWorkWsq)}; - } else { - return {(uintptr_t) nullptr, static_cast<uint16_t>(IoContext::PointerTags::NewWorkAq)}; - } +PipeSleepStrategy::~PipeSleepStrategy() { + delete[] sleepStates; + delete[] pipes; } -template auto PipeSleepStrategy::createHint<CallerEnvironment::EMPER>() -> TaggedPtr; -template auto PipeSleepStrategy::createHint<CallerEnvironment::ANYWHERE>() -> TaggedPtr; - void PipeSleepStrategy::sleep() { IoContext& io = *IoContext::getWorkerIo(); - // Only we are setting the flag therefore it is safe to use relaxed here - const bool mustPrepareSqe = !io.waitInflight.load(std::memory_order_relaxed); - if (mustPrepareSqe) { - // If there is a completer we have to ensure that the completer does not - // reap our new work notification rendering the worker sleeping forever. - // The waitInflight flag is not enough because it is possible set - // while the completer is currently reaping. - // Only using a flag means there is this possibility: - // C W1 W2 Kernel - // | | calls sleep | - // | | | generate cqe in W2's CQ - // check waitInflight | | | - // | | inc sleeper count | - // | | prepare read sqe | - // | | submit read sqe | - // | | set waitInflight | - // | notify | | - // | observe sleepers | | - // | write to pipe | | - // | | | complete W2's read - // reap W2 CQ | | | - // consume newWork cqe | | | - // | | await completions | - // | | sleep forever | - - // Just setting the waitInflight flag earlier is no solution. - - // Our approach to this race is to repurpose the lock protecting the CQ to ensure the - // completer is not reaping our CQ while we are about to sleep. - - // This has also the advantage that new work generated by the completer - // reaping our CQ may be faster scheduled and executed this worker - - // Try to take the cq_lock - if constexpr (IoContext::needsCqLock) { - if (unlikely(!io.cq_lock.try_lock())) { - LOGD("Completer is currently reaping -> skip sleeping to handle possible continuations"); - return; - } - } + { + std::lock_guard<std::mutex> lock(sleepState.lock); - // increment the sleeper count if it was negative we should skip sleeping - int64_t sleeping = sleepers.fetch_add(1, std::memory_order_acquire); - if (sleeping < 0) { - LOGD("observed sleeper count as: " << sleeping << " -> skip sleeping"); - if constexpr (IoContext::needsCqLock) { - io.cq_lock.unlock(); - } + // Check if we were notified specifically and should skip the sleep attempt + if (sleepState.isNotified()) { + sleepState.markRunning(); + LOGD("Reset notified state to running -> skip sleeping"); return; } - assert(sleeping <= workerCount); + if (!sleepState.globalReadInflight) { + // increment the sleeper count if it was negative we should skip sleeping + int64_t sleeping = sleepers.fetch_add(1, std::memory_order_acquire); + if (sleeping < 0) { + LOGD("observed sleeper count as: " << sleeping << " -> skip sleeping"); + return; + } + assert(sleeping <= workerCount); + + sleepState.globalReadInflight = true; - struct io_uring_sqe* sqe = io_uring_get_sqe(&io.ring); - assert(sqe); + struct io_uring_sqe* sqe = io.getSqe(); - // We read directly into the workers dispatchHint - io_uring_prep_read(sqe, sleepFd, &io.worker->dispatchHint, sizeof(io.worker->dispatchHint), 0); + // We read directly into the workers dispatchHint + io_uring_prep_read(sqe, global.sleepFd, &io.worker->dispatchHint, + sizeof(io.worker->dispatchHint), 0); - // Mark the sqe as a new work notification to reset the waitInflight flag when reaping the - // resulting cqe - io_uring_sqe_set_data(sqe, TaggedPtr((uintptr_t) nullptr, - static_cast<uint16_t>(IoContext::PointerTags::NewWorkAq))); + // Mark the sqe as a new work notification to reset the Global flag when reaping the + // resulting cqe + io_uring_sqe_set_data( + sqe, TaggedPtr(0, static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification))); - io.trackReqsInUring(1); + io.trackReqsInUring(1); - LOGD("prepared sleepFd read and set sleepers count to: " << sleeping + 1); + LOGD("prepared global.sleepFd read and set sleepers count to: " << sleeping + 1); + } + + if (!sleepState.isSleeping()) { + sleepState.markSleeping(); + + struct io_uring_sqe* sqe = io.getSqe(); - // Before going to sleep prevent the completer from reaping completions - // on our IoContext otherwise we would neither safely wakeup nor reset - // the waitInflight flag. - io.waitInflight.store(true, std::memory_order_release); - LOGD("set waitinflight"); + // We read directly into the workers dispatchHint + // TODO: Think about the race between the two reads + io_uring_prep_read(sqe, pipe.sleepFd, &io.worker->dispatchHint, + sizeof(io.worker->dispatchHint), 0); - if constexpr (IoContext::needsCqLock) { - io.cq_lock.unlock(); + // Tag the sqe with a marked new work notification to reset the specific state + // when reaping the resulting cqe. + io_uring_sqe_set_data( + sqe, TaggedPtr((void*)nullptr, + static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification), true)); + + io.trackReqsInUring(1); + LOGD("prepared pipe.sleepFd read"); } } @@ -143,20 +110,58 @@ void PipeSleepStrategy::sleep() { } template <CallerEnvironment callerEnvironment> -void PipeSleepStrategy::onNewWorkNotification(IoContext& io) { +void PipeSleepStrategy::notifySpecific(workerid_t workerId) { + auto& specificState = *sleepStates[workerId]; + std::lock_guard<std::mutex> lock(specificState.lock); + + LOGD("Specifically notify worker " << workerId << " from " << callerEnvironment); + + if (specificState.isNotified()) { + LOGD(workerId << " already marked notified"); + return; + } + + const bool isSleeping = specificState.markNotified(); + + if (isSleeping) { + LOGD(workerId << " has specific read -> write notification"); + writeSpecificNotification(workerId); + } +} + +template void PipeSleepStrategy::notifySpecific<CallerEnvironment::EMPER>(workerid_t workerId); +template void PipeSleepStrategy::notifySpecific<CallerEnvironment::ANYWHERE>(workerid_t workerId); + +template <CallerEnvironment callerEnvironment> +void PipeSleepStrategy::onNewWorkNotification(IoContext& io, TaggedPtr data) { if constexpr (callerEnvironment != CallerEnvironment::OWNER) { DIE_MSG("Others reaping new work notification from " << io.worker->getWorkerId() << " CQ"); } - LOGD("Got new work notification"); - stats.incWakeupDueToNotify(); + assert(data.getTag() == static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification)); + + if (data.isMarked()) { + std::lock_guard<std::mutex> lock(sleepState.lock); + LOGD("Got specific notification"); + stats.incWakeupDueToNotify(); - // Reset flag to indicate that a new sleep cqe must be prepared - // and allow the completer to reap completions again - io.waitInflight.store(false, std::memory_order_release); + // Reset specific and notified flag to indicate that a new specific notification + // was consumed, a new specific read must be prepared and other notifySpecific + // calls must notify again. + sleepState.markRunning(); + } else { + LOGD("Got new work notification"); + stats.incWakeupDueToNotify(); + // Reset global flag to indicate that a new sleep cqe must be prepared + // and allow the completer to reap completions again + sleepState.globalReadInflight = false; + } } -template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::OWNER>(IoContext& io); -template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::EMPER>(IoContext& io); -template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::ANYWHERE>(IoContext& io); +template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::OWNER>(IoContext& io, + TaggedPtr data); +template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::EMPER>(IoContext& io, + TaggedPtr data); +template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::ANYWHERE>(IoContext& io, + TaggedPtr data); } // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/PipeSleepStrategy.hpp b/emper/sleep_strategy/PipeSleepStrategy.hpp index 3a57e551e874603ac41b385da56eda1ca96ce21c..ccce97be42b27e29c6a15842349f6d0dc11f7390 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.hpp +++ b/emper/sleep_strategy/PipeSleepStrategy.hpp @@ -8,20 +8,19 @@ #include <atomic> #include <cstdint> #include <iostream> -#include <stdexcept> +#include <mutex> #include <vector> #include "CallerEnvironment.hpp" #include "Common.hpp" #include "Debug.hpp" -#include "Worker.hpp" +#include "FiberHint.hpp" #include "emper-common.h" #include "lib/TaggedPtr.hpp" #include "sleep_strategy/AbstractWorkerSleepStrategy.hpp" +#include "sleep_strategy/SleeperState.hpp" #include "sleep_strategy/Stats.hpp" -using emper::lib::TaggedPtr; - class Runtime; namespace emper::io { @@ -48,18 +47,31 @@ namespace emper::sleep_strategy { *``` * Data: * Global: - * hint pipe + * global pipe * sleepers count * Per worker: * dispatch hint buffer - * in flight flag + * specific pipe + * state lock + * sleep state * * Sleep: - * if we have no sleep request in flight + * Lock state + * If state == notified + * Set state = running + * return + * + * If we have no global sleep request in flight * Atomic increment sleep count - * Remember that we are sleeping - * Prepare read cqe from the hint pipe to dispatch hint buffer - * Prevent the completer from reaping completions on this worker IoContext + * Skip sleeping if sleep count was < 0 + * Mark sleep requests in flight + * Prepare read cqe from the global pipe to dispatch hint buffer + * + * If state == running + * Set state = sleeping + * Prepare marked read cqe from the specific pipe to dispatch hint buffer + * + * Unlock state * Wait until IO completions occurred * * NotifyEmper(n): @@ -82,9 +94,20 @@ namespace emper::sleep_strategy { * toWakeup = min(observed sleeping, n) * write toWakeup hints to the hint pipe * + * NotifySpecific(w): + * Get w's state + * Lock state + * Return if already notified + * Mark state notified + * Write hint if was stat was sleeping + * * onNewWorkCompletion: - * reset in flight flag - * allow completer to reap completions on this IoContext + * If data is marked + * lock state + * set state = running + * return + * + * Reset global read inflight *``` * * Notes @@ -105,9 +128,9 @@ namespace emper::sleep_strategy { * This is a trade-off where we trade slower wakeup - a just awoken worker * will check for local work - against a faster dispatch hot path when * we have work to do in our local WSQ. - * * The completer tread must not reap completions on the IoContexts of - * sleeping workers because this introduces a race for cqes and a possible - * lost wakeup if the completer consumes the completions before the worker + * * Other thread must not reap new work notifications because this + * would introduces a race for cqes and a possible + * lost wakeup if the other consumes the completions before the worker * is actually waiting for them. * * When notifying sleeping workers from anywhere we must ensure that all * notifications take effect. This is needed for example when terminating @@ -120,30 +143,85 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, public Logger<LogSubsystem::SLEEP_S> { friend class emper::io::IoContext; - workerid_t workerCount; - int sleepFd; - int notifyFd; + /** + * @brief State of a worker + */ + class SleepState { + friend class PipeSleepStrategy; + + bool globalReadInflight = false; + std::mutex lock; + emper::sleep_strategy::SleeperState s = emper::sleep_strategy::SleeperState::Running; + + auto markNotified() -> bool { + auto oldS = s; + s = emper::sleep_strategy::SleeperState::Notified; + return oldS == emper::sleep_strategy::SleeperState::Sleeping; + } + void markSleeping() { s = emper::sleep_strategy::SleeperState::Sleeping; } + void markRunning() { s = emper::sleep_strategy::SleeperState::Running; } + + auto isNotified() const -> bool { return s == emper::sleep_strategy::SleeperState::Notified; } + auto isSleeping() const -> bool { return s == emper::sleep_strategy::SleeperState::Sleeping; } + }; + + SleepState** sleepStates; + static thread_local SleepState sleepState; + + class Pipe { + friend class PipeSleepStrategy; + int sleepFd; + int notifyFd; + + Pipe() { + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + int fds[2]; + if (::pipe(fds)) DIE_MSG_ERRNO("pipe failed"); + sleepFd = fds[0]; + notifyFd = fds[1]; + } + + ~Pipe() { + if (close(sleepFd) || close(notifyFd)) DIE_MSG("Closing pipe failed"); + } + }; + + const workerid_t workerCount; + Pipe global; + + /** + * @brief Per worker pipe to notify specific workers + */ + Pipe** pipes; + static thread_local Pipe pipe; Stats stats; // Make sure the shared counter lives in an exlusive cache line CACHE_LINE_EXCLUSIVE(std::atomic<int64_t>, sleepers); - template <CallerEnvironment callerEnvironment> - [[nodiscard]] auto createHint() -> TaggedPtr; - - void writeNotifications(TaggedPtr hint, int64_t count) { + void writeNotifications(emper::FiberHint hint, int64_t count) { stats.addNotifications(count); std::vector<void*> hints(count, hint); - ssize_t res = write(notifyFd, hints.data(), sizeof(void*) * hints.size()); + ssize_t res = write(global.notifyFd, hints.data(), sizeof(void*) * hints.size()); if (unlikely(res) < 0) { DIE_MSG_ERRNO("writing to notifyFd failed"); } } - void notifyFromEmper(int64_t& sleeping, TaggedPtr hint, int64_t count) { + void writeSpecificNotification(workerid_t workerId) { + stats.addNotifications(1); + + const void* b = nullptr; + ssize_t res = write(pipes[workerId]->notifyFd, &b, sizeof(b)); + if (unlikely(res) < 0) { + DIE_MSG_ERRNO("writing to the notifyFd of worker " << workerId << " failed"); + } + } + + void notifyFromEmper(int64_t& sleeping, emper::FiberHint hint, int64_t count) { int64_t toWakeup; // We need to decrement the sleeper count on the notification side @@ -172,7 +250,7 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, << toWakeup << " notifications and set sleepers count to: " << sleeping - toWakeup); } - void notifyFromAnywhere(int64_t& sleeping, TaggedPtr hint, int64_t count) { + void notifyFromAnywhere(int64_t& sleeping, emper::FiberHint hint, int64_t count) { // If we observe nobody sleeping we need to prevent sleep locks when // everybody is about to sleep but none has incremented the sleepers count yet. // We prevent the next sleep from blocking by setting the sleeper count to @@ -217,7 +295,7 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, } template <CallerEnvironment callerEnvironment> - void notify(TaggedPtr hint, uint32_t count) { + void notify(emper::FiberHint hint, uint32_t count) { // The hint must be != nullptr so sleep() knows when to prepare and submit // a sleepFd read sqe. @@ -249,7 +327,7 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, template <CallerEnvironment callerEnvironment> inline void notifyMany(unsigned count) { if (count > 1) LOGD("notify(hint, " << count << ")"); - notify<callerEnvironment>(createHint<callerEnvironment>(), count); + notify<callerEnvironment>(emper::FiberHint::createNewWorkHint<callerEnvironment>(), count); } template <CallerEnvironment callerEnvironment> @@ -258,15 +336,10 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, } template <CallerEnvironment callerEnvironment> - inline void notifySpecific(workerid_t /* workerId */) { - throw std::logic_error("Not implemented"); - // // TODO: get pid of specific worker - // pid_t specific = 0; - // notify<callerEnvironment>(createHint<callerEnvironment>(), specific); - } + void notifySpecific(workerid_t workerId); template <CallerEnvironment callerEnvironment> - void onNewWorkNotification(emper::io::IoContext& io); + void onNewWorkNotification(emper::io::IoContext& io, emper::lib::TaggedPtr data); void sleep(); }; } // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp index 188f0152fb777f80dc560065551d584b90eebd7b..0d2f07ac1cd5b2d3649dc0458ad9557d1adb76bd 100644 --- a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp @@ -3,14 +3,19 @@ #pragma once #include <atomic> +#include <chrono> #include <iostream> +#include <new> +#include <thread> #include "CallerEnvironment.hpp" +#include "Common.hpp" #include "Debug.hpp" #include "Worker.hpp" #include "emper-common.h" #include "emper-config.h" #include "sleep_strategy/AbstractWorkerSleepStrategy.hpp" +#include "sleep_strategy/SleeperState.hpp" #include "sleep_strategy/Stats.hpp" #ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE @@ -25,6 +30,10 @@ #include "lib/sync/SpuriousFutexSemaphore.hpp" #endif +#ifdef EMPER_FUTEX2_WAKEUP_SEMAPHORE +#include "lib/sync/SpuriousFutex2Semaphore.hpp" +#endif + class Runtime; namespace emper::sleep_strategy { @@ -45,28 +54,35 @@ class AbstractSemaphoreWorkerSleepStrategy // check if the used Semaphore provides a notifySpecific implementation static constexpr bool semHasNotifySpecific = requires(Sem s) { s.notifySpecific(0); }; + // check if the used Semaphore needs initialization + static constexpr bool semNeedsInit = requires(Sem s) { s.init(0); }; + // should the generic notifySpecific implementation be used - static constexpr bool useGenericNotifySpecificImpl = !semHasNotifySpecific && needsNotifySpecific; + static constexpr bool useGenericNotifySpecificImpl = !semHasNotifySpecific; // Member used for the generic notifySpecific implementation - std::atomic<bool>* notifiedFlags; + std::atomic<SleeperState>* states = nullptr; // Sleep part of the generic notifySpecific implementation inline void genericNotifySpecificSleep() { - workerid_t workerId = Worker::getCurrentWorkerId(); - auto& notified = notifiedFlags[workerId]; - if (notified.load(std::memory_order_relaxed)) { - notified.store(false, std::memory_order_relaxed); + const workerid_t workerId = Worker::getCurrentWorkerId(); + auto& state = states[workerId]; + + auto expected = SleeperState::Running; + if (!state.compare_exchange_strong(expected, SleeperState::Sleeping, std::memory_order_release, + std::memory_order_relaxed)) { + state.store(SleeperState::Running, std::memory_order_release); stats.incSkip(); + LOGD("State " << &state << " was Notified reset to Running and skip sleeping"); return; } + LOGD("going to sleep"); wakeupSem.wait(); stats.incWakeup(); - if (notified.load(std::memory_order_relaxed)) { - notified.store(false, std::memory_order_relaxed); - } + LOGD("awoken set state to Running"); + state.store(SleeperState::Running, std::memory_order_relaxed); } // Currently we don't have a good sempahore based algorithm to notify a specific @@ -87,26 +103,31 @@ class AbstractSemaphoreWorkerSleepStrategy // 5. W2 iterates the dispatch loop two times without finding work -> goes to sleep // 6. W1 calls sleep() with a sem value of 0 -> goes to sleep though it has a cqe in its CQ // - // The used approach is to keep a notified flag for each worker in the notifiedFlags - // array. Those flags make it possible to notify a specific worker. + // The used approach is to keep a state for each worker in the states + // array. Those states make it possible to notify a specific worker. // We are sure that the specific worker was successfully notified if we observe - // that its flag was reset. + // that its state has changed. // Therefore we can notify all worker in a loop until we observe that the - // specific worker has changed its flag or the runtime is terminating. - // - // Unfortunately this introduces significant overhead in the sleep method which is - // only necessary if the runtime actually wants to notify a specific worker. - // This should be configured at compile time using a boolean template parameter of the - // WorkerWakeupStrategy (<bool needsNotifySpecific>). - // - // For now we hardcode a constexpr check for the only condition where we need notifySpecific + // specific worker has changed its state or the runtime is terminating. template <CallerEnvironment callerEnvironment> inline void genericNotifySpecific(workerid_t workerId) { - auto& notifiedFlag = notifiedFlags[workerId]; - notifiedFlag.store(true, std::memory_order_relaxed); - // NotifyAll since we observe that the specific worker has reset its flag - while (notifiedFlag.load(std::memory_order_relaxed) && !isRuntimeTerminating()) { + auto& state = states[workerId]; + if (state.exchange(SleeperState::Notified, std::memory_order_release) != + SleeperState::Sleeping) { + LOGD("Worker to notify (" << workerId << ") is not sleeping -> skip notifying"); + return; + } + + // NotifyAll since we observe that the specific worker has changes its state + while (state.load(std::memory_order_relaxed) == SleeperState::Notified && + !isRuntimeTerminating()) { notifyAll<callerEnvironment>(); + // clang-tidy bug workaround + // see: + // https://stackoverflow.com/questions/65564677/clang-tidy-parsing-error-at-spaceship-operator + std::this_thread::sleep_until(std::chrono::system_clock::now() + + std::chrono::milliseconds(1)); + // std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } @@ -144,16 +165,16 @@ class AbstractSemaphoreWorkerSleepStrategy AbstractSemaphoreWorkerSleepStrategy(Runtime& runtime, workerid_t workerCount) : workerCount(workerCount), stats(runtime) { if constexpr (useGenericNotifySpecificImpl) { - notifiedFlags = new std::atomic<bool>[workerCount]; + states = new (std::align_val_t(CACHE_LINE_SIZE)) std::atomic<SleeperState>[workerCount]; } - } - ~AbstractSemaphoreWorkerSleepStrategy() { - if constexpr (useGenericNotifySpecificImpl) { - delete[] notifiedFlags; + if constexpr (semNeedsInit) { + wakeupSem.init(workerCount); } } + ~AbstractSemaphoreWorkerSleepStrategy() { delete[] states; } + [[nodiscard]] inline auto getSleeping() const -> long { return wakeupSem.getValue(); } template <CallerEnvironment callerEnvironment> @@ -190,11 +211,14 @@ class AbstractSemaphoreWorkerSleepStrategy template <CallerEnvironment callerEnvironment> void notifySpecific(workerid_t workerId) { + LOGD("specifically notify worker " << workerId << " from " << callerEnvironment); + if constexpr (semHasNotifySpecific) { wakeupSem.notifySpecific(workerId); } else { genericNotifySpecific<callerEnvironment>(workerId); } + stats.incNotify(); stats.incNotifications(); } @@ -214,16 +238,20 @@ class AbstractSemaphoreWorkerSleepStrategy }; #ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE -using ::emper::lib::sync::LockedSemaphore; -using SemaphoreWorkerSleepStrategy = AbstractSemaphoreWorkerSleepStrategy<LockedSemaphore>; +using SemaphoreWorkerSleepStrategy = + AbstractSemaphoreWorkerSleepStrategy<::emper::lib::sync::LockedSemaphore>; #elif defined EMPER_POSIX_WAKEUP_SEMAPHORE -using ::emper::lib::sync::PosixSemaphore; -using SemaphoreWorkerSleepStrategy = AbstractSemaphoreWorkerSleepStrategy<PosixSemaphore>; +using SemaphoreWorkerSleepStrategy = + AbstractSemaphoreWorkerSleepStrategy<::emper::lib::sync::PosixSemaphore>; #elif defined EMPER_FUTEX_WAKEUP_SEMAPHORE -using ::emper::lib::sync::SpuriousFutexSemaphore; -using SemaphoreWorkerSleepStrategy = AbstractSemaphoreWorkerSleepStrategy<SpuriousFutexSemaphore>; +using SemaphoreWorkerSleepStrategy = + AbstractSemaphoreWorkerSleepStrategy<::emper::lib::sync::SpuriousFutexSemaphore>; + +#elif defined EMPER_FUTEX2_WAKEUP_SEMAPHORE +using SemaphoreWorkerSleepStrategy = + AbstractSemaphoreWorkerSleepStrategy<::emper::lib::sync::SpuriousFutex2Semaphore>; #else #error Unknown WorkerSleepSemaphore implementation diff --git a/emper/sleep_strategy/SleeperState.hpp b/emper/sleep_strategy/SleeperState.hpp new file mode 100644 index 0000000000000000000000000000000000000000..937b6c4143b4dd1d619c81e24701ffa1b88e798e --- /dev/null +++ b/emper/sleep_strategy/SleeperState.hpp @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +namespace emper::sleep_strategy { + +/** + * @brief States a worker can have regarding the sleep strategy + */ +enum class SleeperState { + Running, /*!< The worker is in its dispatch loop */ + Sleeping, /*!< The worker is sleeping in the sleep strategy */ + Notified, /*!< The worker was notified an should (re-) execute its dispatch loop */ +}; + +} // namespace emper::sleep_strategy diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index a4a6d251437244389d6f9108b31ae276331cb1c8..028dd892ab742c8869ee5158c5d5cbfc4061cc95 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -1,10 +1,11 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2021 Florian Schmaus +// Copyright © 2021 Florian Schmaus, Florian Fischer #include "AbstractWorkStealingScheduler.hpp" #include <algorithm> #include <array> #include <cassert> +#include <cstdint> #include <ostream> // for operator<<, basic_ostream<>::__ostream_type #include <vector> @@ -13,13 +14,14 @@ #include "Debug.hpp" // for ABORT #include "Emper.hpp" // for OVERFLOW_QUEUE #include "Fiber.hpp" +#include "FiberHint.hpp" +#include "FiberSource.hpp" #include "NextFiberResult.hpp" #include "Runtime.hpp" // for Runtime #include "StealingResult.hpp" #include "Worker.hpp" #include "emper-common.h" // for workerid_t #include "io/IoContext.hpp" -#include "lib/TaggedPtr.hpp" #include "strategies/AbstractWorkStealingStrategy.hpp" #include "strategies/AbstractWorkStealingWorkerStats.hpp" @@ -31,13 +33,19 @@ using emper::io::IoContext; thread_local AbstractWorkStealingScheduler::WsQueue<AbstractWorkStealingScheduler::QUEUE_SIZE> AbstractWorkStealingScheduler::queue; +thread_local AbstractWorkStealingScheduler::MpscQueue AbstractWorkStealingScheduler::mpscQueue; + AbstractWorkStealingScheduler::AbstractWorkStealingScheduler(Runtime& runtime, RuntimeStrategy& strategy) : Scheduler(runtime, strategy) { const workerid_t workerCount = runtime.getWorkerCount(); queues = new AbstractWorkStealingScheduler::WsQueue<QUEUE_SIZE>*[workerCount]; + mpscQueues = new AbstractWorkStealingScheduler::MpscQueue*[workerCount]; - auto newWorkerHook = [this](workerid_t workerId) { queues[workerId] = &queue; }; + auto newWorkerHook = [this](workerid_t workerId) { + queues[workerId] = &queue; + mpscQueues[workerId] = &mpscQueue; + }; addNewWorkerHook(newWorkerHook); } @@ -58,7 +66,16 @@ void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { // Classes using this method are supposed to always invoke this // method. Hence we call onNewWork() here. - onNewWork(); + onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local}); +} + +void AbstractWorkStealingScheduler::scheduleToMpscQueue(Fiber& fiber, workerid_t workerId) { + mpscQueues[workerId]->enqueue(&fiber); + emper::statsIncr(awss::stats.scheduledFibersToMpscQueue); + + // Notify the runtime that we scheduled work to a specific workers' mpscQueue + // the runtime is responsible to notify it if necessary. + onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{workerId, emper::FiberSource::mpscQueue}); } auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool { @@ -68,8 +85,7 @@ auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool { return true; } -auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() - -> std::optional<std::pair<Fiber*, FiberSource>> { +auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() -> std::optional<NextFiberResult> { const size_t KEEP_FREE_SLOTS = 64; const size_t DEQUEUE_FROM_ANYWHERE_MAX = 512; @@ -114,20 +130,20 @@ auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() } if (fibersLiftedFromAnywhere) { - onNewWork(); + onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local}); if constexpr (emper::STATS) { awss::stats.recordFibersLiftedFromAnywhereQueue(fibersLiftedFromAnywhere); } } - if (res) return std::make_pair(res, FiberSource::anywhereQueue); + if (res) return NextFiberResult{res, emper::FiberSource::anywhereQueue}; return std::nullopt; } auto AbstractWorkStealingScheduler::tryStealFiberFrom(workerid_t victim) - -> std::optional<std::pair<Fiber*, FiberSource>> { + -> std::optional<NextFiberResult> { constexpr int maxRetries = emper::WAITFREE_WORK_STEALING ? 0 : -1; Fiber* fiber; popTop: @@ -137,7 +153,7 @@ popTop: if (maybeRecycle(fiber)) goto popTop; - return std::make_pair(fiber, FiberSource::stolen); + return NextFiberResult{fiber, emper::FiberSource::stolen}; } if constexpr (emper::IO_STEALING) { @@ -145,15 +161,15 @@ popTop: fiber = victimIo->reapSingleCompletion<CallerEnvironment::EMPER>(); if (fiber) { emper::statsIncr(awss::stats.nextIoFiberStolen); - return std::make_pair(fiber, FiberSource::ioStolen); + return NextFiberResult{fiber, emper::FiberSource::ioStolen}; } } return std::nullopt; } -auto AbstractWorkStealingScheduler::nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource> { - FiberSource fiberSource = FiberSource::local; +auto AbstractWorkStealingScheduler::nextFiberResultViaWorkStealing() + -> std::optional<NextFiberResult> { Fiber* fiber; popBottom: @@ -163,108 +179,102 @@ popBottom: if (maybeRecycle(fiber)) goto popBottom; - return std::make_pair(fiber, fiberSource); + return NextFiberResult{fiber, emper::FiberSource::local}; } auto* const currentWorker = Worker::getCurrentWorker(); // Try dispatch hint possibly set by the sleep strategy - { - if (currentWorker->dispatchHint) { - const TaggedPtr dispatchHint = currentWorker->dispatchHint; - currentWorker->dispatchHint = nullptr; - const auto tag = static_cast<IoContext::PointerTags>(dispatchHint.getTag()); - - switch (tag) { - case IoContext::PointerTags::NewWorkWsq: { - const auto victim = static_cast<workerid_t>(dispatchHint.getPtrValue()); - const auto stolen = tryStealFiberFrom(victim); - if (stolen) { - fiber = (*stolen).first; - fiberSource = FiberSource::hintWsq; - emper::statsIncr(awss::stats.nextFiberFromHintLocal); - goto out; - } - } break; - - case IoContext::PointerTags::NewWorkAq: { - const auto fromAnywhere = nextFiberViaAnywhereQueue(); - if (fromAnywhere) { - fiber = (*fromAnywhere).first; - fiberSource = FiberSource::hintAq; - emper::statsIncr(awss::stats.nextFiberFromHintAnywhere); - goto out; - } - break; + if (currentWorker->dispatchHint) { + const emper::FiberHint dispatchHint = currentWorker->dispatchHint; + currentWorker->dispatchHint.clear(); + const auto source = dispatchHint.getSource(); + + switch (source) { + case emper::FiberSource::hintWsq: { + const auto victim = dispatchHint.getWorker(); + const auto stolen = tryStealFiberFrom(victim); + if (stolen) { + emper::statsIncr(awss::stats.nextFiberFromHintLocal); + onWorkStolen(); + return NextFiberResult{(*stolen).fiber, emper::FiberSource::hintWsq}; } - default: - DIE_MSG("invalid dispatch hint"); + } break; + + case emper::FiberSource::hintAq: { + const auto fromAnywhere = nextFiberViaAnywhereQueue(); + if (fromAnywhere) { + emper::statsIncr(awss::stats.nextFiberFromHintAnywhere); + return NextFiberResult{(*fromAnywhere).fiber, emper::FiberSource::hintAq}; + } + break; } + default: + DIE_MSG("invalid dispatch hint"); } } // Go into work stealing - { - // TODO: Determine if there is a better value than 1/3. - const float CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE = 0.33; - const workerid_t myWorkerId = currentWorker->getWorkerId(); - const workerid_t workerCount = runtime.getWorkerCount(); - // NOLINTNEXTLINE(bugprone-narrowing-conversions) - const workerid_t checkAnywhereQueueAt = workerCount * CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE; - - workerid_t startWorkerId = currentWorker->nextRandomWorkerId(); - // TODO: See how changing the victim count affects things. - const workerid_t victimCount = [&] { - if constexpr (emper::WS_VICTIM_COUNT) - return emper::WS_VICTIM_COUNT; - else if constexpr (emper::WS_VICTIM_DENOMINATOR) - return workerCount / emper::WS_VICTIM_DENOMINATOR; - else - return workerCount; - }(); - - for (workerid_t i = 0; i < victimCount; ++i) { - workerid_t victim = (startWorkerId + i) % workerCount; - - // Don't steal from ourselves. - if (unlikely(victim == myWorkerId)) continue; - - auto stolenFiber = tryStealFiberFrom(victim); - if (stolenFiber) return *stolenFiber; - - if constexpr (!emper::CHECK_ANYWHERE_QUEUE_WHILE_STEALING) continue; - - // If we failed to steal from a certain number of victims, check - // the anywhere queue for new fibers. - if (i == checkAnywhereQueueAt) { - auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); - if (anywhereQueueFiber) return *anywhereQueueFiber; - } + // TODO: Determine if there is a better value than 1/3. + const float CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE = 0.33; + const workerid_t myWorkerId = currentWorker->getWorkerId(); + const workerid_t workerCount = runtime.getWorkerCount(); + // NOLINTNEXTLINE(bugprone-narrowing-conversions) + const workerid_t checkAnywhereQueueAt = workerCount * CHECK_ANYWHERE_QUEUE_AT_PERCENTAGE; + + workerid_t startWorkerId = currentWorker->nextRandomWorkerId(); + // TODO: See how changing the victim count affects things. + const workerid_t victimCount = [&] { + if constexpr (emper::WS_VICTIM_COUNT) + return emper::WS_VICTIM_COUNT; + else if constexpr (emper::WS_VICTIM_DENOMINATOR) + return workerCount / emper::WS_VICTIM_DENOMINATOR; + else + return workerCount; + }(); + + for (workerid_t i = 0; i < victimCount; ++i) { + workerid_t victim = (startWorkerId + i) % workerCount; + + // Don't steal from ourselves. + if (unlikely(victim == myWorkerId)) continue; + + auto stolenFiber = tryStealFiberFrom(victim); + if (stolenFiber) { + onWorkStolen(); + return *stolenFiber; + } + + if constexpr (!emper::CHECK_ANYWHERE_QUEUE_WHILE_STEALING) continue; + + // If we failed to steal from a certain number of victims, check + // the anywhere queue for new fibers. + if (i == checkAnywhereQueueAt) { + auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); + if (anywhereQueueFiber) return *anywhereQueueFiber; } } // Try the "scheduled from anywhere" queue to get work as last resort. - { - auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); - if (anywhereQueueFiber) return *anywhereQueueFiber; - } + auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); + if (anywhereQueueFiber) return *anywhereQueueFiber; // We where not able to dequeue any fiber if we reach this point. - fiber = nullptr; - -out: - return std::make_pair(fiber, fiberSource); + return std::nullopt; } -auto AbstractWorkStealingScheduler::nextFiberResultViaWorkStealing() -> NextFiberResult { - std::pair<Fiber*, FiberSource> nextFiberWsResult = nextFiberViaWorkStealing(); +auto AbstractWorkStealingScheduler::nextFiberResultFromMpscQueue() + -> std::optional<NextFiberResult> { + Fiber* fiber = mpscQueue.dequeue(); + if (fiber != nullptr) return NextFiberResult{fiber, emper::FiberSource::mpscQueue}; - if (nextFiberWsResult.second == FiberSource::stolen) { - onWorkStolen(); - } + return std::nullopt; +} - return NextFiberResult{ - nextFiberWsResult.first, - static_cast<uintptr_t>(nextFiberWsResult.second), - }; +auto AbstractWorkStealingScheduler::nextFiberResultFromMpscQueueOrWorkStealing() + -> std::optional<NextFiberResult> { + auto result = nextFiberResultFromMpscQueue(); + if (result) return result; + + return nextFiberResultViaWorkStealing(); } diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp index ade437e1a3501cb7a8864f34d1def359258c65ee..7dd3198bf0b5b70b958fbe67ad3be26e39f41c0a 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.hpp +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -3,13 +3,12 @@ #pragma once #include <cstddef> // for size_t -#include <cstdint> #include <optional> -#include <utility> -#include "NextFiberResult.hpp" +#include "Fiber.hpp" #include "Scheduler.hpp" #include "emper-common.h" +#include "lib/adt/MpscQueue.hpp" #ifdef EMPER_LOCKED_WS_QUEUE #include "lib/adt/LockedQueue.hpp" @@ -17,7 +16,7 @@ #include "lib/adt/WsClQueue.hpp" #endif -class Fiber; +struct NextFiberResult; class Runtime; class RuntimeStrategy; @@ -28,34 +27,31 @@ class AbstractWorkStealingScheduler : public Scheduler { #else using WsQueue = adt::WsClQueue<Fiber*, SIZE>; #endif + using MpscQueue = adt::MpscQueue<Fiber>; public: static const int QUEUE_SIZE = 1024; - enum class FiberSource : uintptr_t { - local, - hintWsq, - hintAq, - stolen, - ioStolen, - anywhereQueue, - }; - private: - auto nextFiberViaAnywhereQueue() -> std::optional<std::pair<Fiber*, FiberSource>>; - auto tryStealFiberFrom(workerid_t victim) -> std::optional<std::pair<Fiber*, FiberSource>>; + auto nextFiberViaAnywhereQueue() -> std::optional<NextFiberResult>; + auto tryStealFiberFrom(workerid_t victim) -> std::optional<NextFiberResult>; protected: WsQueue<QUEUE_SIZE>** queues; static thread_local WsQueue<QUEUE_SIZE> queue; + MpscQueue** mpscQueues; + static thread_local MpscQueue mpscQueue; + void scheduleViaWorkStealing(Fiber& fiber); + void scheduleToMpscQueue(Fiber& fiber, workerid_t workerId); auto maybeRecycle(Fiber* fiber) -> bool; - auto nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource>; - - auto nextFiberResultViaWorkStealing() -> NextFiberResult; + // This method is static because it only uses the thread_local mpscQueue + static auto nextFiberResultFromMpscQueue() -> std::optional<NextFiberResult>; + auto nextFiberResultViaWorkStealing() -> std::optional<NextFiberResult>; + auto nextFiberResultFromMpscQueueOrWorkStealing() -> std::optional<NextFiberResult>; public: AbstractWorkStealingScheduler(Runtime& runtime, RuntimeStrategy& strategy); diff --git a/emper/strategies/AbstractWorkStealingStats.cpp b/emper/strategies/AbstractWorkStealingStats.cpp index 64bcaac58ffc1d0faea9891c378eddb8f142b0cd..af7b74ee83e34238c09833be21638f993d4c2428 100644 --- a/emper/strategies/AbstractWorkStealingStats.cpp +++ b/emper/strategies/AbstractWorkStealingStats.cpp @@ -21,6 +21,8 @@ void AbstractWorkStealingStats::print(std::ostream& out) { << std::to_string(comulatedWorkerStats.scheduledFibersToLocal) << std::endl << "total-scheduled-fibers-to-overflow-queue: " << std::to_string(comulatedWorkerStats.scheduledFibersToOverflowQueue) << std::endl + << "total-scheduled-fibers-to-mpsc-queue: " + << std::to_string(comulatedWorkerStats.scheduledFibersToMpscQueue) << std::endl << "global-max-queue-length: " << std::to_string(comulatedWorkerStats.maxQueueLength) << std::endl << "total-next-fiber-from-local: " << std::to_string(comulatedWorkerStats.nextFiberFromLocal) diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.cpp b/emper/strategies/AbstractWorkStealingWorkerStats.cpp index ceaf7b4004d14746d6c662dcef4722a02a63ac09..ca3ef250b10adbb8a38dd09e9bf851b990b8c9ae 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.cpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.cpp @@ -8,6 +8,7 @@ auto AbstractWorkStealingWorkerStats::operator+=(const AbstractWorkStealingWorke -> AbstractWorkStealingWorkerStats& { scheduledFibersToLocal += other.scheduledFibersToLocal; scheduledFibersToOverflowQueue += other.scheduledFibersToOverflowQueue; + scheduledFibersToMpscQueue += other.scheduledFibersToMpscQueue; maxQueueLength = std::max(maxQueueLength, other.maxQueueLength); nextFiberFromLocal += other.nextFiberFromLocal; nextFiberFromHintLocal += other.nextFiberFromHintLocal; diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.hpp b/emper/strategies/AbstractWorkStealingWorkerStats.hpp index 57052eecffd9448669e17bd5e6108d0cba732946..e8e25bef9e692d550ec21a722cbf23adbb67d835 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.hpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.hpp @@ -15,6 +15,7 @@ class AbstractWorkStealingWorkerStats { public: uint64_t scheduledFibersToLocal = 0; uint64_t scheduledFibersToOverflowQueue = 0; + uint64_t scheduledFibersToMpscQueue = 0; uint64_t maxQueueLength = 0; uint64_t nextFiberFromLocal = 0; uint64_t nextFiberFromHintLocal = 0; diff --git a/emper/strategies/laws/LawsDispatcher.cpp b/emper/strategies/laws/LawsDispatcher.cpp index 1cc2164df97f55fe3dc2c2b500a5201383e21bb9..7980f7c732cec4f466648c0645dfcff62e7f423a 100644 --- a/emper/strategies/laws/LawsDispatcher.cpp +++ b/emper/strategies/laws/LawsDispatcher.cpp @@ -2,10 +2,13 @@ // Copyright © 2020-2021 Florian Schmaus #include "LawsDispatcher.hpp" +#include <optional> + #include "Common.hpp" // for DIE_MSG #include "Emper.hpp" #include "Fiber.hpp" -#include "LawsStrategy.hpp" // for LawsStrategy, LawsStrategy::FiberSource +#include "FiberSource.hpp" +#include "LawsStrategy.hpp" // for LawsStrategy #include "NextFiberResult.hpp" #include "Runtime.hpp" #include "emper-common.h" @@ -23,41 +26,41 @@ void LawsDispatcher::recycle(Fiber* fiber) { void LawsDispatcher::dispatchLoop() { while (true) { - NextFiberResult next = runtime.nextFiber(); - Fiber* const fiber = next.fiber; - if (!fiber) { + std::optional<NextFiberResult> next = runtime.nextFiber(); + if (!next) { dispatchLoopDoSleep(); continue; } + Fiber* const fiber = next->fiber; + // The isRunnable() method performes an atomic swap on a boolean, // which was initialized to true, in order to check if this fiber // is runnable. if (isRunnable(fiber)) { if constexpr (emper::STATS) { - auto fiberSource = static_cast<LawsStrategy::FiberSource>(next.metadata); - switch (fiberSource) { - case LawsStrategy::FiberSource::fromPriority: + switch (next->source) { + case emper::FiberSource::mpscQueue: LawsStrategy::stats.dispatchedFibersFromPriority++; break; - case LawsStrategy::FiberSource::local: + case emper::FiberSource::local: LawsStrategy::stats.dispatchedFibersFromLocal++; break; - case LawsStrategy::FiberSource::hintWsq: + case emper::FiberSource::hintWsq: LawsStrategy::stats.dispatchedFibersFromHintLocal++; break; - case LawsStrategy::FiberSource::hintAq: + case emper::FiberSource::hintAq: LawsStrategy::stats.dispatchedFibersFromHintAnywhere++; break; - case LawsStrategy::FiberSource::stolen: + case emper::FiberSource::stolen: LawsStrategy::stats.dispatchedFibersStolen++; break; - case LawsStrategy::FiberSource::anywhereQueue: + case emper::FiberSource::anywhereQueue: LawsStrategy::stats.dispatchedFibersFromAnywhereQueue++; break; default: - DIE_MSG("Unknown fiber source: " << next.metadata); + DIE_MSG("Unknown fiber source: " << next->source); break; } } diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp index 8066c5cb03c5a3efb05ac8ab43efefcb7f4e5dd9..9edcf3cd2930711e6a345c15c61114787356cc55 100644 --- a/emper/strategies/laws/LawsScheduler.cpp +++ b/emper/strategies/laws/LawsScheduler.cpp @@ -2,26 +2,18 @@ // Copyright © 2020-2021 Florian Schmaus #include "LawsScheduler.hpp" -#include <cstdint> - #include "CallerEnvironment.hpp" #include "Emper.hpp" +#include "FiberHint.hpp" +#include "FiberSource.hpp" #include "LawsStrategy.hpp" // IWYU pragma: keep #include "NextFiberResult.hpp" #include "Runtime.hpp" #include "emper-common.h" #include "strategies/laws/LawsWorkerStats.hpp" -thread_local LawsScheduler::LawsMpscQueue LawsScheduler::priorityQueue; - LawsScheduler::LawsScheduler(Runtime& runtime, RuntimeStrategy& strategy) - : AbstractWorkStealingScheduler(runtime, strategy) { - const workerid_t workerCount = runtime.getWorkerCount(); - priorityQueues = new LawsScheduler::LawsMpscQueue*[workerCount]; - - auto newWorkerHook = [this](workerid_t workerId) { priorityQueues[workerId] = &priorityQueue; }; - addNewWorkerHook(newWorkerHook); -} + : AbstractWorkStealingScheduler(runtime, strategy) {} template <CallerEnvironment callerEnvironment> void LawsScheduler::tryScheduleToPriorityQueue(Fiber& fiber) { @@ -58,7 +50,7 @@ void LawsScheduler::scheduleInternal(Fiber& fiber) { void LawsScheduler::scheduleFromAnywhereInternal(Fiber& fiber) { tryScheduleToPriorityQueue<CallerEnvironment::ANYWHERE>(fiber); enqueueInAnywhereQueue(fiber); - onNewWork<CallerEnvironment::ANYWHERE>(); + onNewWork<CallerEnvironment::ANYWHERE>(emper::FiberHint{emper::FiberSource::anywhereQueue}); } void LawsScheduler::scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) { @@ -67,17 +59,9 @@ void LawsScheduler::scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) tryScheduleToPriorityQueue<CallerEnvironment::ANYWHERE>(fiber); } insertInAnywhereQueue(fibers, count); - onNewWork<CallerEnvironment::ANYWHERE>(); + onNewWork<CallerEnvironment::ANYWHERE>(emper::FiberHint{emper::FiberSource::anywhereQueue}); } -auto LawsScheduler::nextFiber() -> NextFiberResult { - Fiber* fiber = priorityQueue.dequeue(); - if (fiber != nullptr) { - return NextFiberResult{ - fiber, - static_cast<uintptr_t>(LawsStrategy::FiberSource::fromPriority), - }; - } - - return nextFiberResultViaWorkStealing(); +auto LawsScheduler::nextFiber() -> std::optional<NextFiberResult> { + return nextFiberResultFromMpscQueueOrWorkStealing(); } diff --git a/emper/strategies/laws/LawsScheduler.hpp b/emper/strategies/laws/LawsScheduler.hpp index f4b0f242418e48ae92825736456979666913a6c5..cc69ab7ac57b41fc0ab39cd789972b6bdcefceef 100644 --- a/emper/strategies/laws/LawsScheduler.hpp +++ b/emper/strategies/laws/LawsScheduler.hpp @@ -1,9 +1,12 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020-2021 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus, Florian Fischer #pragma once +#include <optional> + #include "CallerEnvironment.hpp" #include "Fiber.hpp" +#include "emper-common.h" #include "lib/adt/MpscQueue.hpp" #include "strategies/AbstractWorkStealingScheduler.hpp" @@ -15,20 +18,21 @@ class LawsScheduler : public AbstractWorkStealingScheduler { using LawsMpscQueue = adt::MpscQueue<Fiber>; private: - LawsMpscQueue** priorityQueues; - - static thread_local LawsMpscQueue priorityQueue; + LawsMpscQueue**& priorityQueues = AbstractWorkStealingScheduler::mpscQueues; template <CallerEnvironment callerEnvironment> void tryScheduleToPriorityQueue(Fiber& fiber); protected: void scheduleInternal(Fiber& fiber) override; + void scheduleOnInternal(Fiber& fiber, workerid_t workerId) override { + scheduleToMpscQueue(fiber, workerId); + } void scheduleFromAnywhereInternal(Fiber& fiber) override; void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) override; public: LawsScheduler(Runtime& runtime, RuntimeStrategy& strategy); - auto nextFiber() -> NextFiberResult override; + auto nextFiber() -> std::optional<NextFiberResult> override; }; diff --git a/emper/strategies/laws/LawsStrategy.hpp b/emper/strategies/laws/LawsStrategy.hpp index d287b446cfbdf667beb258a4ad0a27c880c8a953..c2eea7ed0bba35ef6e19987cd764244f6f4780ac 100644 --- a/emper/strategies/laws/LawsStrategy.hpp +++ b/emper/strategies/laws/LawsStrategy.hpp @@ -2,11 +2,9 @@ // Copyright © 2020-2021 Florian Schmaus #pragma once -#include <cstdint> #include <memory> #include "WorkerLocalData.hpp" -#include "strategies/AbstractWorkStealingScheduler.hpp" #include "strategies/AbstractWorkStealingStrategy.hpp" #include "strategies/laws/LawsDispatcher.hpp" #include "strategies/laws/LawsScheduler.hpp" @@ -19,16 +17,6 @@ struct LawsWorkerStats; class LawsStrategy : public AbstractWorkStealingStrategy { private: - enum class FiberSource : uintptr_t { - local = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::local), - hintWsq = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::hintWsq), - hintAq = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::hintAq), - stolen = static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::stolen), - anywhereQueue = - static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::anywhereQueue), - fromPriority, - }; - LawsScheduler scheduler; LawsDispatcher dispatcher; diff --git a/emper/strategies/ws/WsDispatcher.cpp b/emper/strategies/ws/WsDispatcher.cpp index 00569799c74b21876dcb3c2c42fcbd093f0823a7..7826f33a82d0628091dbccaf38c6cca90bcb61f9 100644 --- a/emper/strategies/ws/WsDispatcher.cpp +++ b/emper/strategies/ws/WsDispatcher.cpp @@ -1,7 +1,9 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2022 Florian Schmaus Florian Fischer #include "WsDispatcher.hpp" +#include <optional> + #include "NextFiberResult.hpp" #include "Runtime.hpp" // for Runtime @@ -9,13 +11,13 @@ class Fiber; void WsDispatcher::dispatchLoop() { while (true) { - NextFiberResult next = runtime.nextFiber(); - Fiber* const fiber = next.fiber; - if (!fiber) { + std::optional<NextFiberResult> next = runtime.nextFiber(); + if (!next) { dispatchLoopDoSleep(); continue; } + Fiber* const fiber = next->fiber; dispatch(fiber); diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp index c0ce9c3b8766f7b673e71a34f04ac24974e1e27c..af94eaa074eecad47a17abd5ab36e749f1468933 100644 --- a/emper/strategies/ws/WsScheduler.hpp +++ b/emper/strategies/ws/WsScheduler.hpp @@ -2,8 +2,13 @@ // Copyright © 2020-2021 Florian Schmaus #pragma once +#include <optional> + #include "CallerEnvironment.hpp" +#include "FiberHint.hpp" +#include "FiberSource.hpp" #include "NextFiberResult.hpp" +#include "emper-common.h" #include "strategies/AbstractWorkStealingScheduler.hpp" class Fiber; @@ -13,18 +18,23 @@ class RuntimeStrategy; class WsScheduler : public AbstractWorkStealingScheduler { protected: void scheduleInternal(Fiber& fiber) override { scheduleViaWorkStealing(fiber); } + void scheduleOnInternal(Fiber& fiber, workerid_t workerId) override { + scheduleToMpscQueue(fiber, workerId); + } void scheduleFromAnywhereInternal(Fiber& fiber) override { enqueueInAnywhereQueue(fiber); - onNewWork<CallerEnvironment::ANYWHERE>(); + onNewWork<CallerEnvironment::ANYWHERE>(emper::FiberHint{emper::FiberSource::anywhereQueue}); } void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) override { insertInAnywhereQueue(fibers, count); - onNewWork<CallerEnvironment::ANYWHERE>(); + onNewWork<CallerEnvironment::ANYWHERE>(emper::FiberHint{emper::FiberSource::anywhereQueue}); } public: WsScheduler(Runtime& runtime, RuntimeStrategy& strategy); - auto nextFiber() -> NextFiberResult override { return nextFiberResultViaWorkStealing(); }; + auto nextFiber() -> std::optional<NextFiberResult> override { + return nextFiberResultFromMpscQueueOrWorkStealing(); + }; }; diff --git a/meson_options.txt b/meson_options.txt index 96b76c34be3435f7737a42e7b4523b7811d2f014..115a5e5546fef221b255f89416dc17a6dab3977e 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -40,7 +40,10 @@ option( 'worker_wakeup_strategy', type: 'combo', description: 'The strategy used to wakeup sleeping workers (only effective if worker_sleep is enabled)', - choices: ['one', 'throttle', 'all'], + choices: ['one', 'all'], + # Disable throttle until it works with notifySpecific + # TODO: Fix throttle and notifySpecific + # choices: ['one', 'throttle', 'all'], value: 'one', ) option( @@ -57,6 +60,7 @@ option( 'posix', 'locked', 'futex', + 'futex2', ], value: 'posix', description: 'Semaphore implementation to suspend/wakeup workers', diff --git a/tests/ScheduleOnTest.cpp b/tests/ScheduleOnTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f458a99dd16ed3557d7c9d848334bcb406c23598 --- /dev/null +++ b/tests/ScheduleOnTest.cpp @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "CountingPrivateSemaphore.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" +#include "Worker.hpp" +#include "emper.hpp" +#include "fixtures/assert.hpp" + +static const unsigned ITERATIONS = 100; +static Runtime* runtime; +static unsigned workerCount; +unsigned iteration = 0; + +static void runOn(CPS& cps) { + ASSERT(Worker::getCurrentWorkerId() == (iteration % workerCount)); + ++iteration; + if (iteration == ITERATIONS) cps.signalAndExit(); + + runtime->scheduleOn(*Fiber::from([&] { runOn(cps); }), (iteration % workerCount)); +} + +static void scheduleOnTest() { + runtime = Runtime::getRuntime(); + ASSERT(runtime); + workerCount = runtime->getWorkerCount(); + + emper::sleep(1); + + CPS cps(1); + runtime->scheduleOn(*Fiber::from([&] { runOn(cps); }), (iteration % workerCount)); + cps.wait(); +} + +void emperTest() { scheduleOnTest(); } diff --git a/tests/ScheduleOnUnknownWorkerTest.cpp b/tests/ScheduleOnUnknownWorkerTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2af239c568edfb449cce451d50ceeaaa03798441 --- /dev/null +++ b/tests/ScheduleOnUnknownWorkerTest.cpp @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "Fiber.hpp" +#include "Runtime.hpp" +#include "fixtures/assert.hpp" + +auto main() -> int { + const unsigned nthreads = 1; + Runtime runtime(nthreads); + + runtime.executeAndWait([&] { + auto* next = Fiber::from([] { ASSERT(false); }); + runtime.scheduleOn(*next, nthreads + 1); + }); + + return 0; +} diff --git a/tests/io/CancelFutureTest.cpp b/tests/io/CancelFutureTest.cpp index ca44dbc8d4c7e332f231bebacb6a97665601f702..46ac6eb0a682a0fa0592190f981e6ca120a868b9 100644 --- a/tests/io/CancelFutureTest.cpp +++ b/tests/io/CancelFutureTest.cpp @@ -4,11 +4,18 @@ #include <cerrno> // for ECANCELED, ETIME #include <cstdint> // for uint64_t, int32_t +#include <memory> #include "Common.hpp" +#include "CountingPrivateSemaphore.hpp" #include "Emper.hpp" +#include "Runtime.hpp" +#include "Semaphore.hpp" +#include "emper.hpp" #include "fixtures/assert.hpp" -#include "io/Future.hpp" // for ReadFuture, WriteFuture +#include "io.hpp" +#include "io/Future.hpp" +#include "lib/LinuxVersion.hpp" using emper::io::ReadFuture; using emper::io::WriteFuture; @@ -17,7 +24,7 @@ int efd; uint64_t read_buf; uint64_t write_buf = 42; -void cancelSubmittedCallback() { +static void cancelSubmittedCallback() { ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); readFuture.setCallback([](int32_t res) { ASSERT(res == -ECANCELED) }); readFuture.submit(); @@ -32,18 +39,18 @@ void cancelSubmittedCallback() { } } -void cancelNotSubmitted() { +static void cancelNotSubmitted() { ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); ASSERT(readFuture.cancel() == -ENOENT); } -void cancelSubmittedNotCompleted() { +static void cancelSubmittedNotCompleted() { ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); readFuture.submit(); ASSERT(readFuture.cancel() == -ECANCELED); } -void cancelCompleted() { +static void cancelCompleted() { ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); readFuture.submit(); WriteFuture writeFuture(efd, &write_buf, sizeof(write_buf), 0); @@ -51,27 +58,17 @@ void cancelCompleted() { ASSERT(readFuture.cancel() == sizeof(write_buf) && read_buf == write_buf); } -void cancelNotCompletedChain() { +static void cancelNotCompletedChain() { ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); ReadFuture readFuture2(efd, &read_buf, sizeof(read_buf), 0); readFuture2.setDependency(readFuture); readFuture2.submit(); ASSERT(readFuture2.cancel() == -ECANCELED); - ASSERT(readFuture.wait() == -ECANCELED); + ASSERT(readFuture.get() == -ECANCELED); } -void cancelPartialCompletedChain() { - ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); - ReadFuture readFuture2(efd, &read_buf, sizeof(read_buf), 0); - readFuture2.setDependency(readFuture); - - readFuture2.submit(); - ASSERT(readFuture2.cancel() == -ECANCELED); - ASSERT(readFuture.wait() == -ECANCELED); -} - -void cancelNotCompletedFutureChain() { +static void cancelPartialCompletedChain() { ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); WriteFuture writeFuture(efd, &write_buf, sizeof(write_buf), 0); ReadFuture readFuture2(efd, &read_buf, sizeof(read_buf), 0); @@ -82,7 +79,70 @@ void cancelNotCompletedFutureChain() { // TODO: investigate why this read is completed with -EINTR most of the time int r = readFuture2.cancel(); ASSERT(r == -EINTR || r == -ECANCELED); - ASSERT(readFuture.wait() == sizeof(write_buf) && read_buf == write_buf); + ASSERT(readFuture.get() == sizeof(write_buf) && read_buf == write_buf); +} + +/** + * @brief Cancel a lot of read futures hopefully going through scheduleOn + * + * The test uses one fiber per worker. Each fiber creates an eventfd, prepares + * and submits a ReadFuture. Then each Fiber yields and creates a second ReadFuture. + * After all Fibers are done with this preparation each will issue a write to the + * next eventfd. + * Each fiber yields a last time to change the worker and exit resulting in + * cancellation of two Futures one possible completed and one definitely outstanding. + */ +static void massCancelOnDifferentWorker() { + const unsigned fiberCount = Runtime::getRuntime()->getWorkerCount() * 5; + int* evfds = new int[fiberCount]; + emper::Semaphore readySem; + emper::Semaphore startSem; + + CPS cps; + for (unsigned i = 0; i < fiberCount; ++i) { + spawn( + [&, i = i] { + uint64_t rbuf = 0; + const uint64_t wbuf = i; + + evfds[i] = eventfd(0, 0); + + ReadFuture rf(evfds[i], &rbuf, sizeof(rbuf), 0); + rf.submit(); + + emper::yield(); + + ReadFuture rf2(evfds[i], &rbuf, sizeof(rbuf), 0); + rf2.submit(); + + readySem.release(); + startSem.acquire(); + + WriteFuture wf(evfds[(i + 1) % fiberCount], &wbuf, sizeof(wbuf), 0); + wf.submitAndWait(); + + emper::yield(); + rf.cancel(); + rf2.cancel(); + }, + cps); + } + + for (unsigned i = 0; i < fiberCount; ++i) { + readySem.acquire(); + } + + for (unsigned i = 0; i < fiberCount; ++i) { + startSem.release(); + } + cps.wait(); + + CPS closeCps; + for (unsigned i = 0; i < fiberCount; ++i) { + spawn([&, i = i] { emper::io::closeAndForget(evfds[i]); }, closeCps); + } + closeCps.wait(); + delete[] evfds; } void emperTest() { @@ -97,4 +157,12 @@ void emperTest() { cancelCompleted(); cancelNotCompletedChain(); cancelPartialCompletedChain(); + + // Async cancelation is racy before linux 5.17. + // Work may be not found at all or only the work is cancelled but + // not the armed poll. + // https://lore.kernel.org/io-uring/20220119024241.609233-1-axboe@kernel.dk/T/#t + if (EMPER_LINUX_GE("5.17.0")) { + massCancelOnDifferentWorker(); + } } diff --git a/tests/lib/LinuxVersionTest.cpp b/tests/lib/LinuxVersionTest.cpp index b14ec049c47731467a0fe84b80d082a716f57188..4f171a74b2a5a98d91dc0725cf90405328b3a12c 100644 --- a/tests/lib/LinuxVersionTest.cpp +++ b/tests/lib/LinuxVersionTest.cpp @@ -73,3 +73,9 @@ TEST(LinuxVersion, ge) { ASSERT_TRUE(TestLinuxVersion::ge("5.14.0", "4.13")); ASSERT_TRUE(TestLinuxVersion::ge("5.14.0", "5.14.0")); } + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +TEST(LinuxVersion, real_world) { + ASSERT_TRUE(TestLinuxVersion::le("5.15.0-0.bpo.2-amd64", "5.15.0")); + ASSERT_TRUE(TestLinuxVersion::ge("5.17.0", "5.15.0-0.bpo.2-amd64")); +} diff --git a/tests/meson.build b/tests/meson.build index c1d0d09a2aee9f1cb30a7367dad830a892b685fb..eb4e9607f4c6ef43e5acb1024efd20f38d6fdc4a 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -49,6 +49,21 @@ tests = [ 'test_runner': 'emper', }, + { + 'source': files('ScheduleOnTest.cpp'), + 'name': 'ScheduleOnTest', + 'description': 'Test Scheduler::scheduleOn', + 'test_runner': 'emper', + }, + + { + 'source': files('ScheduleOnUnknownWorkerTest.cpp'), + 'name': 'ScheduleOnUnknownWorkerTest', + 'description': 'Test Scheduler::scheduleOn with to big workerId', + 'should_fail': true, + 'is_parallel': true, + }, + { 'source': files('ReuseBpsTest.cpp'), 'name': 'ReuseBpsTest', diff --git a/tools/gdb/dump_runtime_state.py b/tools/gdb/dump_runtime_state.py index ae1a6d9e1774d348ee261d9dbdc5799310d94ae2..6fd8b3b2c212b3e2f6febec79aa1b5b6fee82758 100644 --- a/tools/gdb/dump_runtime_state.py +++ b/tools/gdb/dump_runtime_state.py @@ -113,7 +113,6 @@ def print_io(io, indentation=''): if not submitter.address: print(f'{indentation}submitter: {submitter}') - print(f'{indentation}waitInflight: {io["waitInflight"]["_M_base"]["_M_i"]}') # print(f'{indentation}{io["CQE_BATCH_SIZE"]}') print(f'{indentation}reqs_in_uring: {io["reqs_in_uring"]["_M_i"]}') print(f'{indentation}preparedSqes: {io["preparedSqes"]}')