diff --git a/emper/Actor.hpp b/emper/Actor.hpp index 4065ee2c8ff74916ede9e851126249424d8f1431..f8d5651b5fb76c9cbfc7fee83977f3075aa0364d 100644 --- a/emper/Actor.hpp +++ b/emper/Actor.hpp @@ -9,6 +9,7 @@ #include "Fiber.hpp" #include "UnboundedBlockingMpscQueue.hpp" #include "emper.hpp" +#include "io/Future.hpp" template <typename T> class Actor { @@ -19,8 +20,10 @@ class Actor { Running, }; + protected: Runtime& runtime; + private: std::atomic<State> state = {Stopped}; UnboundedBlockingMpscQueue<T> queue; @@ -73,9 +76,15 @@ class Actor { void startFromAnywhere() { start<CallerEnvironment::ANYWHERE>(); } - void tell(T t) { queue.put(t); } + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> + void tell(T t) { + queue.template put<callerEnvironment>(t); + } - void tellFromAnywhere(T t) { queue.putFromAnywhere(t); } + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER, typename InputIt> + void tell(InputIt begin, InputIt end) { + queue.template put<callerEnvironment>(begin, end); + } auto pendingMailboxItems() -> size_t { return queue.size(); } @@ -86,7 +95,7 @@ class Actor { if constexpr (emper::IO) { emper::sleep(1); } else { - emper::yield(); + runtime.yield(); } // TODO: The suppressed linter error below may be a false positive // reported by clang-tidy. diff --git a/emper/Debug.cpp b/emper/Debug.cpp index b97d82f3b778efa54584e79439e462434e0c13ad..93aaa2430431a3d488e5b62a4834a7d16b64d2da 100644 --- a/emper/Debug.cpp +++ b/emper/Debug.cpp @@ -48,7 +48,7 @@ void emper_log(const std::string& prefix, const std::string& message) { // Are we the global IO completer Runtime* runtime = Runtime::getRuntime(); GlobalIoContext* gio = runtime ? 
runtime->globalIo : nullptr; - if (gio && pthread_self() == gio->globalCompleter) { + if (gio && pthread_self() == gio->completer) { logMessage << "IOC "; } else { logMessage << " "; diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 530c03b566a4d22d5b16448e422273352621dbbc..2beb9c0149d9ed8a067eea15a6622d3d1be9b0e3 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -104,6 +104,24 @@ static const bool IO = #endif ; +static const bool IO_SINGLE_URING = +#ifdef EMPER_IO_SINGLE_URING + true +#else + false +#endif + ; + +static const bool IO_TRY_SYSCALL = +#ifdef EMPER_IO_TRY_SYSCALL + true +#else + false +#endif + ; + +static const bool IO_WORKER_URING = IO && !IO_SINGLE_URING; + static const bool IO_URING_SQPOLL = #ifdef EMPER_IO_URING_SQPOLL true diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 8eab259cddbe143675ae72f432d37d1740968459..ad79ad43996e49fea2c6d1afcfb52cab77916416 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -110,11 +110,12 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory // The global io_uring needs at least workerCount entries in its SQ because // for each worker's IoContext one eventfd read is prepared before the // globalCompleter is started and submits all previously prepared sqes. - globalIo = new GlobalIoContext(*this, workerCount); - globalIo->startGlobalCompleter(); + const unsigned entries = emper::IO_SINGLE_URING ? 
EMPER_IO_WORKER_URING_ENTRIES : workerCount; + globalIo = new GlobalIoContext(*this, entries); + globalIo->startCompleter(); if constexpr (emper::STATS) { - globalIo->stats.workerId = emper::io::Stats::GLOBAL_COMPLETER_ID; + globalIo->stats.workerId = emper::io::Stats::COMPLETER_ID; } } @@ -234,7 +235,7 @@ Runtime::~Runtime() { auto Runtime::workerLoop(Worker* worker) -> void* { worker->setWorker(); - if constexpr (emper::IO) { + if constexpr (emper::IO_WORKER_URING) { auto* workerIo = new IoContext(*this); if constexpr (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) { @@ -323,7 +324,7 @@ void Runtime::yield() { } auto Runtime::nextFiber() -> NextFiberResult { - if constexpr (emper::IO) { + if constexpr (emper::IO_WORKER_URING) { // Schedule all fibers waiting on completed IO IoContext::ContinuationBuffer completions; unsigned ncompletions = IoContext::getWorkerIo()->reapCompletions(completions); diff --git a/emper/UnboundedBlockingMpscQueue.hpp b/emper/UnboundedBlockingMpscQueue.hpp index 9df3fdb02e166ec812266423dd91fc650939c748..67353bfa3090c31bbff48ce589ad1bcb4da14b46 100644 --- a/emper/UnboundedBlockingMpscQueue.hpp +++ b/emper/UnboundedBlockingMpscQueue.hpp @@ -60,8 +60,29 @@ class UnboundedBlockingMpscQueue : public Blockable<LogSubsystem::U_B_MPSC_Q> { } } + template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER, typename InputIt> + void put(InputIt begin, InputIt end) { + { + std::lock_guard<std::mutex> lock(queueMutex); + for (InputIt cur = begin; cur != end; ++cur) { + mpscQueue.push(*cur); + } + } + + // Micro optimization, see if there is a blocked context + // before performing the atomic exchange operation. 
+ if (blockedContext != nullptr) { + tryToWakeupBlockedContext<callerEnvironment>(); + } + } + void putFromAnywhere(T t) { put<CallerEnvironment::ANYWHERE>(t); } + template <typename InputIt> + void putFromAnywhere(InputIt begin, InputIt end) { + put<CallerEnvironment::ANYWHERE>(begin, end); + } + auto get(const std::function<void(void)>& postRetrieve) -> T { tPopped = false; tryToGetElement(postRetrieve); diff --git a/emper/io/Future.cpp b/emper/io/Future.cpp index f6767182e3b977a06ef3122b86f590c9ef820c25..20b361b3d9332fbab4cd96488b0bda70b5dbb1f8 100644 --- a/emper/io/Future.cpp +++ b/emper/io/Future.cpp @@ -84,7 +84,11 @@ auto Future::cancel() -> int32_t { state.cancelled = true; - if (!isSubmitted() || isForgotten()) { + // With the separation of isSubmitted and isPrepared dependent Futures + // are not marked as submitted anymore. + // This means we have to check if we are canceling a not submitted Future + // which is not a dependency. + if ((!isSubmitted() && !isDependency()) || isForgotten()) { LOGW("Cancelling " << (isForgotten() ? "forgotten " : "unsubmitted ") << this); return -ENOENT; } @@ -116,4 +120,38 @@ void PartialCompletableFuture::recordCompletionInternal(Stats& stats, int32_t re void WritevFuture::recordCompletionInternal(Stats& stats, int32_t res) const { stats.recordCompletion(*this, res); } + +void SendFuture::submit() { + if constexpr (emper::IO_TRY_SYSCALL) { + if (dependency) goto normal_submit; + + ssize_t res = send(fd, buf, len, flags | MSG_DONTWAIT); + if (res != -1 || (errno != EAGAIN && errno != EWOULDBLOCK)) { + state.submitted = true; + int ires = res == -1 ? 
-errno : static_cast<int>(res); + complete(ires); + return; + } + } + +normal_submit: + Future::submit(); +} + +void RecvFuture::submit() { + if constexpr (emper::IO_TRY_SYSCALL) { + if (dependency) goto normal_submit; + + ssize_t res = recv(fd, buf, len, flags | MSG_DONTWAIT); + if (res != -1 || (errno != EAGAIN && errno != EWOULDBLOCK)) { + state.submitted = true; + int ires = res == -1 ? -errno : static_cast<int>(res); + complete(ires); + return; + } + } + +normal_submit: + Future::submit(); +} } // namespace emper::io diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp index 46946d6fc202e4cc73899083838d5928791b3f56..9a1960750e778895b0fc02a447a0c01ab1002751 100644 --- a/emper/io/Future.hpp +++ b/emper/io/Future.hpp @@ -31,6 +31,7 @@ class Fiber; namespace emper::io { class Stats; +class SubmitActor; // IWYU pragma: keep class FutureError : public std::logic_error { friend class Future; @@ -44,6 +45,7 @@ class Future : public Logger<LogSubsystem::IO> { friend class GlobalIoContext; friend class IoContext; friend class Stats; + friend class SubmitActor; public: // User facing Callback type @@ -63,7 +65,8 @@ class Future : public Logger<LogSubsystem::IO> { }; using State = struct State { - uint8_t submitted : 1 = 0; /*!< An sqe for this Future was prepared */ + uint8_t submitted : 1 = 0; /*!< This Future was submitted to the IO subsystem */ + uint8_t prepared : 1 = 0; /*!< An sqe for this Future was prepared */ uint8_t completed : 1 = 0; /*!< The semaphor was signaled */ uint8_t retrieved : 1 = 0; /*!< Future::wait() has retrieved the result */ uint8_t dependency : 1 = 0; /*!< This Future is a dependency of another */ @@ -72,6 +75,13 @@ class Future : public Logger<LogSubsystem::IO> { 0; /*!< This Future does not care about the result of the IO operation */ }; + /* + * @brief Semaphore enabling Fibers to wait for the completion of this Future + * + * The semaphore is either used to block contexts waiting on the future + * or used to ensure that the future is not 
forgotten and thus possibly deleted + * until it was handled by the submit actor (only in the IO_SINGLE_URING case). + */ BPS sem; State state; @@ -116,6 +126,16 @@ class Future : public Logger<LogSubsystem::IO> { void submitAndForget() { state.forgotten = true; submit(); + // When using a SubmitActor a Future may be submitted to the SubmitActor but + // no SQE was prepared for it yet. + // It must be ensured that the Future is prepared before forgetting and possibly dropping it. + // We await its preparation using the normal BPS which is not used otherwise for forgotten + // Futures. + // The BPS of forgotten Futures is signaled in IoContext::prepareFutureChain + // when using a single io_uring + if constexpr (emper::IO_SINGLE_URING) { + sem.wait(); + } } void setCompletion(int32_t res) { @@ -261,7 +281,7 @@ class Future : public Logger<LogSubsystem::IO> { /* * @brief submit Future for asynchronous completion to the workers IoContext */ - void submit(); + virtual void submit(); /** * @brief cancel Future and wait if necessary for its cancellation @@ -478,6 +498,8 @@ class SendFuture : public PartialCompletableFuture { SendFuture(int socket, const void* buffer, size_t length, int flags, bool send_all = true) : SendFuture(socket, const_cast<void*>(buffer), length, flags, send_all ? 
ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){}; + + void submit() override; }; class RecvFuture : public Future { @@ -486,6 +508,8 @@ class RecvFuture : public Future { public: RecvFuture(int socket, void* buffer, size_t length, int flags) : Future(Operation::RECV, socket, buffer, length, flags){}; + + void submit() override; }; class ConnectFuture : public Future { diff --git a/emper/io/GlobalIoContext.cpp b/emper/io/GlobalIoContext.cpp index 161ccd8b5d4fe4f366d705ceb8b2d48641c65e93..a561324791c92aabc9635efa7eedec96d96a85d9 100644 --- a/emper/io/GlobalIoContext.cpp +++ b/emper/io/GlobalIoContext.cpp @@ -8,16 +8,18 @@ #include <cassert> #include <cerrno> -#include <cstdio> #include <mutex> #include "CallerEnvironment.hpp" #include "Common.hpp" +#include "Debug.hpp" #include "Emper.hpp" +#include "Fiber.hpp" #include "Runtime.hpp" #include "Worker.hpp" #include "io/Future.hpp" #include "io/IoContext.hpp" +#include "io/SubmitActor.hpp" #include "lib/TaggedPtr.hpp" #include "lib/sync/Semaphore.hpp" #include "sleep_strategy/WorkerSleepStrategy.hpp" @@ -26,55 +28,56 @@ using emper::lib::TaggedPtr; namespace emper::io { -auto GlobalIoContext::globalCompleterFunc(void* arg) -> void* { - auto* globalIoContext = reinterpret_cast<GlobalIoContext*>(arg); +void GlobalIoContext::completerLoop() { + int submitted; - // wait till all workers registered their IoContext's eventfd - for (workerid_t i = 0; i < globalIoContext->runtime.workerCount; ++i) { - globalIoContext->runtime.ioReadySem.wait(); - } + if constexpr (emper::IO_WORKER_URING) { + // wait till all workers registered their IoContext's eventfd + for (workerid_t i = 0; i < runtime.workerCount; ++i) { + runtime.ioReadySem.wait(); + } - globalIoContext->logD("submit all worker io_uring eventfds"); + LOGD("submit all worker io_uring eventfds"); - // submit all eventfds in the SQ inserted by IoContext::submit_efd calls - int submitted = io_uring_submit(&globalIoContext->ring); - if (unlikely(submitted < 0)) { - 
DIE_MSG_ERRNO("initial global io_uring submit failed"); - } + // submit all eventfds in the SQ inserted by IoContext::submit_efd calls + submitted = io_uring_submit(&ring); + if (unlikely(submitted < 0)) { + DIE_MSG_ERRNO("initial global io_uring submit failed"); + } - // We have submitted all eventfds - assert(submitted == globalIoContext->runtime.getWorkerCount()); + // We have submitted all eventfds + assert(submitted == runtime.getWorkerCount()); + } // Submit the read for our termination eventfd - struct io_uring_sqe* sqe = io_uring_get_sqe(&globalIoContext->ring); + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring); assert(sqe); - // We reuse the notificationEventFd member of IoContext to receive a the termination notification - io_uring_prep_read(sqe, globalIoContext->notificationEventFd, - &globalIoContext->notificationEventFdBuf, - sizeof(globalIoContext->notificationEventFdBuf), 0); + // We reuse the notificationEventFd member of IoContext to receive the termination notification + io_uring_prep_read(sqe, notificationEventFd, &notificationEventFdBuf, + sizeof(notificationEventFdBuf), 0); TaggedPtr terminationEvent(static_cast<void*>(nullptr), static_cast<uint16_t>(PointerTags::TerminationEvent)); io_uring_sqe_set_data(sqe, terminationEvent); - submitted = io_uring_submit(&globalIoContext->ring); + submitted = io_uring_submit(&ring); if (unlikely(submitted < 0)) { DIE_MSG_ERRNO("submitting termination eventfd read failed"); } assert(submitted == 1); - globalIoContext->logD("start global completer loop"); + LOGD("start completer loop"); while (true) { struct io_uring_cqe* cqe; // wait for completions - int err = io_uring_wait_cqe(&globalIoContext->ring, &cqe); + int err = io_uring_wait_cqe(&ring, &cqe); if (unlikely(err)) { if (err == -EINTR) { continue; } errno = -err; - perror("io_uring_wait_cqe"); + DIE_MSG_ERRNO("io_uring_wait_cqe"); } TaggedPtr tptr(io_uring_cqe_get_data(cqe)); @@ -86,17 +89,21 @@ auto GlobalIoContext::globalCompleterFunc(void* arg) -> 
void* { // clang-11 does not support [[likely]] yet // TODO: remove the preprocessor check if clang has [[likely]] support #if defined __has_attribute -#if __has_attribute(likely) +#if __has_attribute(likely) && defined EMPER_IO_WORKER_URING [[likely]] // NOLINT(clang-diagnostic-unknown-attributes) #endif #endif case PointerTags::IoContext : { + if constexpr (emper::IO_SINGLE_URING) { + DIE_MSG("Using a single uring there can never be IoContexts in the io_uring"); + } + auto* worker_io = tptr.getPtr<IoContext>(); assert(worker_io); // re-add the eventfd read - globalIoContext->prepareWorkerNotification(*worker_io); - submitted = io_uring_submit(&globalIoContext->ring); + prepareWorkerNotification(*worker_io); + submitted = io_uring_submit(&ring); if (unlikely(submitted < 0)) { errno = -submitted; @@ -108,8 +115,7 @@ auto GlobalIoContext::globalCompleterFunc(void* arg) -> void* { if constexpr (emper::IO_COMPLETER_BEHAVIOR == emper::IoCompleterBehavior::wakeup) { // Notify the worker which's CQ has a new cqe workerid_t worker = worker_io->worker->getWorkerId(); - globalIoContext->runtime.workerSleepStrategy.notifySpecific<CallerEnvironment::ANYWHERE>( - worker); + runtime.workerSleepStrategy.notifySpecific<CallerEnvironment::ANYWHERE>(worker); } else { worker_io->reapAndScheduleCompletions<CallerEnvironment::ANYWHERE>(); } @@ -119,18 +125,48 @@ auto GlobalIoContext::globalCompleterFunc(void* arg) -> void* { // The cqe is for a completed Future. // This happens in case the global completer is dealing with a // partial completion. 
- case PointerTags::Future: { +#if defined __has_attribute +#if __has_attribute(likely) && defined EMPER_IO_SINGLE_URING + [[likely]] // NOLINT(clang-diagnostic-unknown-attributes) +#endif +#endif + case PointerTags::Future : { auto* future = tptr.getPtr<Future>(); + // Forgotten Futures can only be in the single io_uring + if constexpr (emper::IO_SINGLE_URING) { + if (!future) break; + } + int32_t res = cqe->res; - future->recordCompletion(globalIoContext->stats, res); + future->recordCompletion(stats, res); future->completeFromAnywhere(res); + } + break; + + case PointerTags::Callback: { + if constexpr (emper::IO_WORKER_URING) { + DIE_MSG("Callbacks can never reach the GlobalIoContext from a worker IoContext"); + } + + auto* callback = tptr.getPtr<Future::CallbackInternal>(); + LOGD("Create new callback fiber for " << callback); + auto* callbackFiber = Fiber::from( + [&c = *callback, res = cqe->res] { + c(res); + delete &c; + }, + &callback->affinity); + + runtime.scheduleFromAnywhere(*callbackFiber); } break; // someone initiated termination case PointerTags::TerminationEvent: { - pthread_exit(nullptr); + // TODO: Convert to LOGI once EMPER has a proper log configuration mechanism. 
+ LOGD("exit completer loop"); + return; } break; default: @@ -138,14 +174,24 @@ auto GlobalIoContext::globalCompleterFunc(void* arg) -> void* { break; } - io_uring_cqe_seen(&globalIoContext->ring, cqe); + io_uring_cqe_seen(&ring, cqe); } } -void GlobalIoContext::startGlobalCompleter() { - errno = pthread_create(&globalCompleter, nullptr, globalCompleterFunc, this); +void GlobalIoContext::startCompleter() { + if constexpr (emper::IO_SINGLE_URING) { + assert(submitter); + submitter->startFromAnywhere(); + } + + auto completerFunc = [](void* arg) -> void* { + reinterpret_cast<GlobalIoContext*>(arg)->completerLoop(); + return nullptr; + }; + errno = pthread_create(&completer, nullptr, completerFunc, this); + if (unlikely(errno)) { - DIE_MSG_ERRNO("Creating global completer thread failed"); + DIE_MSG_ERRNO("Creating completer thread failed"); } } @@ -158,7 +204,7 @@ void GlobalIoContext::initiateTermination() { } void GlobalIoContext::waitUntilFinished() const { - errno = pthread_join(globalCompleter, nullptr); + errno = pthread_join(completer, nullptr); if (unlikely(errno)) { DIE_MSG_ERRNO("pthread_join failed for the globalCompleter"); } diff --git a/emper/io/GlobalIoContext.hpp b/emper/io/GlobalIoContext.hpp index 6a17f4239a718a33ebda1520b95785ec747d792b..862c944afba565523c84cd42af41f9d1b29ff99d 100644 --- a/emper/io/GlobalIoContext.hpp +++ b/emper/io/GlobalIoContext.hpp @@ -29,17 +29,17 @@ class GlobalIoContext : public IoContext { // register their IoContext's eventfds in parallel std::mutex registerLock; - enum class PointerTags : uint16_t { Future, IoContext, TerminationEvent }; + enum class PointerTags : uint16_t { Future, Callback, IoContext, TerminationEvent }; // pthread used to monitor the CQs from worker io_urings // as well as handling IO requests submitted from anywhere - pthread_t globalCompleter; + pthread_t completer; - /* function executed by the global completer thread */ - static auto globalCompleterFunc(void* arg) -> void*; + /* Loop executed by 
the completer thread */ + void completerLoop(); - // start the global completer thread - void startGlobalCompleter(); + // start the completer thread + void startCompleter(); void initiateTermination(); void waitUntilFinished() const; diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 6c53f2b96d99d4e8f5880a5da585728890fe5233..e8b62ec4922fae02df973609ebfbad42e6decd72 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -18,6 +18,7 @@ #include <utility> #include <vector> +#include "BinaryPrivateSemaphore.hpp" #include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER, ANYWHERE #include "Common.hpp" // for unlikely, DIE_MSG_ERRNO, DIE_MSG #include "Debug.hpp" // for LOGD @@ -28,6 +29,7 @@ #include "io/Future.hpp" // for Future, operator<<, Future::State #include "io/GlobalIoContext.hpp" #include "io/Stats.hpp" // for Stats, nanoseconds +#include "io/SubmitActor.hpp" #include "lib/TaggedPtr.hpp" #include "sleep_strategy/PipeSleepStrategy.hpp" @@ -77,7 +79,19 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns io_uring_sqe_set_data(sqe, TaggedPtr(&future, static_cast<uint16_t>(PointerTags::Future))); } - future.state.submitted = true; + future.state.prepared = true; + + // If we use a single io_uring and a SubmitActor forgotten futures may be + // dropped before preparing a sqe. + // This is prevented by waiting on their BPS in Future::submitAndForget. + // This is safe because the BPS is guaranteed to be unused for forgotten Futures. + // We must signal the preparation of the forgotten future and allow its possible + // destruction. 
+ if constexpr (emper::IO_SINGLE_URING) { + if (future.isForgotten()) { + future.sem.signal(); + } + } if (future.isDependency()) { LOGD("Prepare " << future << " as a dependency"); @@ -123,6 +137,43 @@ auto IoContext::submitPreparedSqesAndWait(unsigned wait_nr) -> unsigned { return static_cast<unsigned>(submitted); } +template <CallerEnvironment callerEnvironment> +void IoContext::cancelUnsubmittedChainParts(Future &chain) { + unsigned unsubmitted = io_uring_sq_ready(&ring); + Future *unsubmittedFuture = &chain; + + for (unsigned i = 0; i < unsubmitted; ++i) { + // we prepare sqes for future from the chain end to the head + // A -> B -> C will result in CQ [C, B, A] and preparedSqes [C, B, A] + // invalidate/cancel N sqes/Futures from the back + struct io_uring_sqe *sqe = preparedSqes.back(); + preparedSqes.pop_back(); + // invalidate the prepared sqe + io_uring_prep_nop(sqe); + io_uring_sqe_set_data(sqe, nullptr); + + // cancel the according future, then step to the next member of the chain + unsubmittedFuture->recordCompletion(stats, -ECANCELED); + if constexpr (callerEnvironment == EMPER) { + unsubmittedFuture->complete(-ECANCELED); + } else { + unsubmittedFuture->completeFromAnywhere(-ECANCELED); + } + unsubmittedFuture = unsubmittedFuture->dependency; + } + + // submit all now invalidated sqes +#ifdef NDEBUG + ATTR_UNUSED +#endif + unsigned invalidated = submitPreparedSqes<callerEnvironment>(); + assert(invalidated == unsubmitted); +} + +// show the compiler our template incarnations +template void IoContext::cancelUnsubmittedChainParts<CallerEnvironment::EMPER>(Future &future); +template void IoContext::cancelUnsubmittedChainParts<CallerEnvironment::ANYWHERE>(Future &future); + template <CallerEnvironment callerEnvironment> void IoContext::submitAndWait(Future &future, unsigned wait_nr) { LOGD("submitting " << future << (future.dependency ? 
" and it's dependencies" : "")); @@ -148,35 +199,7 @@ void IoContext::submitAndWait(Future &future, unsigned wait_nr) { // result in cqes with result -ECANCELD and the invalid one will // generate a cqe with the appropriate error code if (unlikely(static_cast<unsigned>(submitted) < prepared)) { - unsigned unsubmitted = io_uring_sq_ready(&ring); - Future *unsubmittedFuture = &future; - - for (unsigned i = 0; i < unsubmitted; ++i) { - // we prepare sqes for future from the chain end to the head - // A -> B -> C will result in CQ [C, B, A] and preparedSqes [C, B, A] - // invalidate/cancel N sqes/Futures from the back - struct io_uring_sqe *sqe = preparedSqes.back(); - preparedSqes.pop_back(); - // invalidate the prepared sqe - io_uring_prep_nop(sqe); - io_uring_sqe_set_data(sqe, nullptr); - - // cancel the according future - unsubmittedFuture->recordCompletion(stats, -ECANCELED); - if constexpr (callerEnvironment == EMPER) { - unsubmittedFuture->complete(-ECANCELED); - } else { - unsubmittedFuture->completeFromAnywhere(-ECANCELED); - } - unsubmittedFuture = future.dependency; - } - - // submit all now invalidated sqes -#ifdef NDEBUG - ATTR_UNUSED -#endif - unsigned invalidated = submitPreparedSqes<callerEnvironment>(); - assert(invalidated == unsubmitted); + cancelUnsubmittedChainParts<callerEnvironment>(future); } // we have submitted all our prepared sqes @@ -423,11 +446,17 @@ IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) if (unlikely(notificationEventFd < 0)) { DIE_MSG_ERRNO("creating eventfd for io_uring failed"); } + + if constexpr (emper::IO_SINGLE_URING) { + submitter = new SubmitActor(*this, runtime); + } } IoContext::~IoContext() { io_uring_queue_exit(&ring); // TODO: check if this is safe ::close(notificationEventFd); + + delete submitter; } } // namespace emper::io diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index b7825a04f06877cc8368a7f20f1b9a22513c569f..88f5ba9e74eb38b8e63153967cb47a0d32c2eb05 100644 --- 
a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -10,6 +10,7 @@ #include <cstddef> // for size_t #include <cstdint> // for uint64_t #include <functional> // for less +#include <iterator> #include <vector> #include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER @@ -18,8 +19,10 @@ #include "Emper.hpp" #include "Runtime.hpp" // for Runtime #include "Worker.hpp" -#include "emper-config.h" // for EMPER_IO_WORKER_URING_ENTRIES -#include "io/Stats.hpp" // for Stats +#include "emper-config.h" +#include "io/Future.hpp" +#include "io/Stats.hpp" +#include "io/SubmitActor.hpp" // IWYU pragma: keep #include "lib/adt/LockedSet.hpp" // for LockedSet class AbstractWorkStealingScheduler; @@ -49,11 +52,10 @@ class PipeSleepStrategy; } namespace emper::io { -class Future; - class IoContext : public Logger<LogSubsystem::IO> { friend class ::Runtime; friend class ::AbstractWorkStealingScheduler; + friend class SubmitActor; friend class Future; friend class SendFuture; friend class RecvFuture; @@ -83,6 +85,10 @@ class IoContext : public Logger<LogSubsystem::IO> { // Remember the worker object for this IoContext Worker *worker; + // SubmitActor used when multiple workers use the SQ of this IoContext. + // This only happens if emper is build with the IO_SINGLE_URING option + SubmitActor *submitter = nullptr; + // Flag to indicate that we already have a sleep related request in the io_uring. // Gets set by the worker on WaitdfdSleepStrategy::sleep() and // reset when reaping a completion containing a NewWork{Wsq,Aq} TaggedPtr. 
@@ -126,6 +132,9 @@ class IoContext : public Logger<LogSubsystem::IO> { */ auto prepareFutureChain(Future &future, unsigned chain_length = 1) -> unsigned; + template <CallerEnvironment callerEnvironment> + void cancelUnsubmittedChainParts(Future &chain); + enum class PointerTags : uint16_t { Future, Callback, NewWorkAq, NewWorkWsq }; /** @@ -187,6 +196,10 @@ class IoContext : public Logger<LogSubsystem::IO> { */ [[nodiscard]] static inline auto getWorkerIo() -> IoContext * { assert(Runtime::inRuntime()); + if constexpr (emper::IO_SINGLE_URING) { + return getIo<CallerEnvironment::ANYWHERE>(); + } + return workerIo; } @@ -221,7 +234,15 @@ class IoContext : public Logger<LogSubsystem::IO> { */ template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void submit(Future &future) { - submitAndWait(future, 0); + // Mark the Future as submitted. We do this here because all Futures + // must go through IoContext::submit even calling Future::submit + future.state.submitted = true; + + if constexpr (emper::IO_SINGLE_URING) { + submitter->tell<callerEnvironment>(&future); + } else { + submitAndWait(future, 0); + } } /** @@ -236,10 +257,24 @@ class IoContext : public Logger<LogSubsystem::IO> { template <typename InputIt, CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> auto submit(InputIt begin, InputIt end) -> unsigned { for (InputIt cur = begin; cur != end; ++cur) { - auto future = *cur; - prepareFutureChain(*future); + auto *future = *cur; + future->state.submitted = true; + + // This IoContext is Worker exclusive -> prepare the futures + if constexpr (emper::IO_WORKER_URING) { + prepareFutureChain(*future); + } + } + + // If we have already prepared futures submit them + if constexpr (emper::IO_WORKER_URING) { + return submitPreparedSqes<callerEnvironment>(); } - return submitPreparedSqes<callerEnvironment>(); + + // The IoContext is not Worker exclusive -> + // submit using the IoContexts's SubmitActor + 
submitter->tell<callerEnvironment>(begin, end); + return std::distance(begin, end); } /** @@ -254,6 +289,10 @@ class IoContext : public Logger<LogSubsystem::IO> { void submitAndReap(Future &future) { assert(Runtime::inRuntime()); submit<CallerEnvironment::EMPER>(future); + + if constexpr (emper::IO_SINGLE_URING) { + return; + } // io_uring will try to synchronously complete any IO request before // offloading it to the async backend. See io_uring_enter(2). // Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe. diff --git a/emper/io/Stats.cpp b/emper/io/Stats.cpp index 258179e0105b9600d709fd2dea73c6c466ff4237..d1f76561ab59cda60c96c5b02905b165a9e8bef6 100644 --- a/emper/io/Stats.cpp +++ b/emper/io/Stats.cpp @@ -40,7 +40,7 @@ auto operator<<(std::ostream& os, const Stats::CompletionType& t) -> std::ostrea } auto operator<<(std::ostream& os, const Stats& s) -> std::ostream& { - if (s.workerId == Stats::GLOBAL_COMPLETER_ID) { + if (s.workerId == Stats::COMPLETER_ID) { bool futuresToPrint = false; std::stringstream ss; ss << "# global IO Stats: #" << std::endl; @@ -94,7 +94,7 @@ auto operator<<(std::ostream& os, const Stats& s) -> std::ostream& { os << std::endl; } - if (s.workerId == Stats::GLOBAL_COMPLETER_ID) { + if (s.workerId == Stats::COMPLETER_ID) { return os; } diff --git a/emper/io/Stats.hpp b/emper/io/Stats.hpp index f1854391b5d81eac0d9b94f1c97217f5d9159f06..bf303b68028ee2666fd34983604c5a55a1b2be83 100644 --- a/emper/io/Stats.hpp +++ b/emper/io/Stats.hpp @@ -23,11 +23,10 @@ #include "lib/math.hpp" class Runtime; // lines 28-28 -namespace emper { -namespace io { +namespace emper::io { class IoContext; -} -} // namespace emper +class SubmitActor; // IWYU pragma: keep +} // namespace emper::io namespace math = emper::lib::math; @@ -49,8 +48,9 @@ class Stats { friend class SendFuture; friend class RecvFuture; friend class WritevFuture; + friend class SubmitActor; - static const int GLOBAL_COMPLETER_ID = -1; + static const int 
COMPLETER_ID = -1; static const int AVG_ID = -2; int workerId = 0; diff --git a/emper/io/SubmitActor.cpp b/emper/io/SubmitActor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2f17af42b568fd91b31269c3319f8c6242c7a61f --- /dev/null +++ b/emper/io/SubmitActor.cpp @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "io/SubmitActor.hpp" + +#include <liburing.h> + +#include <atomic> // for atomic +#include <cassert> // for assert +#include <cerrno> // for EBUSY, ECANCELED, errno +#include <ostream> // for operator<<, ostream +#include <vector> // for vector + +#include "CallerEnvironment.hpp" +#include "Common.hpp" // for unlikely, DIE_MSG_ERRNO, TIME_NS +#include "Emper.hpp" // for DEBUG +#include "Runtime.hpp" +#include "io/Future.hpp" +#include "io/IoContext.hpp" + +namespace emper::io { +void SubmitActor::receive(Future* future) { + assert(future); + assert(future->isSubmitted()); + + // If someone already canceled the future before the submitter + // receives the future complete the chain with -ECANCELED and return + if (future->isCancelled()) { + // complete every Future of the chain by following the dependency links + for (Future* f = future; f; f = f->dependency) { + f->recordCompletion(io.stats, -ECANCELED); + f->complete(-ECANCELED); + } + return; + } + + LOGD("submitting " << future << (future->dependency ? 
" and it's dependencies" : "")); + unsigned prepared = io.prepareFutureChain(*future); + + // submit the Future to the io_uring + int submitted = io_uring_submit(&io.ring); + + // We can't submit our sqe because the CQ is full -> yield + if (unlikely(submitted == -EBUSY)) { + if constexpr (emper::DEBUG) { + LOGGER_LOGI("io_uring_submit returned EBUSY trying to submit in addition to " + << io.reqs_in_uring); + } else { + LOGW("io_uring_submit returned EBUSY"); + } + + TIME_NS( + { + do { + runtime.yield(); + } while ((submitted = io_uring_submit(&io.ring)) == -EBUSY); + }, + io.stats.record_io_submit_full_cq); + } + + if (unlikely(submitted < 0)) { + errno = -submitted; + DIE_MSG_ERRNO("io_uring_submit failed"); + } + + // We submitted some Futures to the io_uring + // Because we submit every Future right away multiple prepared sqes can only + // occur for Future chains. + // If we could not submit the whole chain cancel all non submitted Futures + // because we can not guaranty the soundness of the chain + // req1 -> invalid_req -> req3 + // will submit only 2 instead of all 3 prepared sqes + // See: https://github.com/axboe/liburing/issues/186 + if (unlikely(static_cast<unsigned>(submitted) < prepared)) { + io.cancelUnsubmittedChainParts<CallerEnvironment::EMPER>(*future); + } + + io.preparedSqes.clear(); +} +} // namespace emper::io diff --git a/emper/io/SubmitActor.hpp b/emper/io/SubmitActor.hpp new file mode 100644 index 0000000000000000000000000000000000000000..203b728221ea1334fcc274c3f0eeed3c5984ea67 --- /dev/null +++ b/emper/io/SubmitActor.hpp @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include "Actor.hpp" +#include "Debug.hpp" // for LogSubsystem, LogSubsystem::IO, Logger + +class Runtime; +namespace emper::io { +class Future; +class IoContext; +} // namespace emper::io + +namespace emper::io { +class SubmitActor : public Actor<Future *>, public Logger<LogSubsystem::IO> { + private: 
+ IoContext &io; + + protected: + void receive(Future *future) override; + + public: + SubmitActor(IoContext &c, Runtime &runtime) : Actor(runtime), io(c) {} +}; +} // namespace emper::io diff --git a/emper/io/meson.build b/emper/io/meson.build index bceedac11de3e0d46515f28ebc0b350b5bf53b61..2f5a5d4033b502ccfa6f770d813cb1d1b60fc709 100644 --- a/emper/io/meson.build +++ b/emper/io/meson.build @@ -25,4 +25,5 @@ emper_cpp_sources += files( 'Operation.cpp', 'IoContext.cpp', 'GlobalIoContext.cpp', + 'SubmitActor.cpp', ) diff --git a/meson.build b/meson.build index 52f6063fd6a84b964f1014b5f31b8eeb095ab50c..056dd7c75cec49ad07b3f1ff7f15bd55a6eb0ba4 100644 --- a/meson.build +++ b/meson.build @@ -81,6 +81,8 @@ if option_io endif io_bool_options = [ + 'single_uring', + 'try_syscall', 'uring_sqpoll', 'uring_shared_wq', ] @@ -115,6 +117,17 @@ endif conf_data.set('EMPER_IO_COMPLETER_BEHAVIOR', io_completer_behavior) +# check io meson options consistency +if get_option('io_single_uring') + if not (io_completer_behavior == 'schedule') + error('Using a single io_uring need a "scheduling" completer') + endif + + if get_option('io_uring_shared_wq') + error('Sharing io_uring ressources is useless when using a single io_uring') + endif +endif + subdir('emper') subdir('tests') subdir('apps') diff --git a/meson_options.txt b/meson_options.txt index a9fb626603202da68b2f324bb9119fc690edc3a4..899b30b6ae540d8cc9c1d79eefc357889080524f 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -110,6 +110,18 @@ option( value: true, description: 'Add support for asynchronous IO using io_uring' ) +option( + 'io_single_uring', + type: 'boolean', + value: false, + description: 'Use only a single io_uring' +) +option( + 'io_try_syscall', + type: 'boolean', + value: false, + description: 'Try non blocking syscalls before submitting the request to io_uring' +) option( 'io_worker_uring_entries', type: 'integer', diff --git a/tests/TellActorFromAnywhereTest.cpp b/tests/TellActorFromAnywhereTest.cpp 
index 3e63f1f1a4b9c49d41d6bb11a5c00d093886bfcb..7fc9cdad57e10ec2176416396cf5f04cfae7d2a0 100644 --- a/tests/TellActorFromAnywhereTest.cpp +++ b/tests/TellActorFromAnywhereTest.cpp @@ -4,6 +4,7 @@ #include "Actor.hpp" #include "BinaryPrivateSemaphore.hpp" +#include "CallerEnvironment.hpp" #include "emper-common.h" class SignallingActor : public Actor<unsigned int> { @@ -25,7 +26,7 @@ void emperTest() { signallingActor->start(); // TODO: Use std::jthread once EMPER uses C++20. - std::thread signallingThread([&] { signallingActor->tellFromAnywhere(1); }); + std::thread signallingThread([&] { signallingActor->tell<CallerEnvironment::ANYWHERE>(1); }); bps->wait();