diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp
index addc76d3cd6260c92c3310550f2e7f76a6532d2a..d1fec427ab31318cc87833f221ac57957fd929f0 100644
--- a/emper/io/IoContext.cpp
+++ b/emper/io/IoContext.cpp
@@ -7,6 +7,7 @@
 #include <sys/eventfd.h>  // for eventfd
 #include <unistd.h>       // for close
 
+#include <array>
 #include <atomic>   // for atomic, __atomic_base
 #include <cassert>  // for assert
 #include <cerrno>   // for errno, ECANCELED, EBUSY, EAGAIN, EINTR
@@ -162,16 +163,32 @@ template void IoContext::submit<CallerEnvironment::ANYWHERE>(Future &future);
 
 template <CallerEnvironment callerEnvironment>
 auto IoContext::reapCompletions() -> std::vector<Fiber *> {
-	// vector returned containing all reaped completions
-	std::vector<Fiber *> continuationFibers;
+	// Should not be more than the uring_entries count.
+	const unsigned CQE_BATCH_COUNT = EMPER_IO_WORKER_URING_ENTRIES;
 
 	uint32_t maxRaceFreeCompleterAttempts = 1;
 
+	using Completion = std::pair<uint32_t, void *>;
+	// vector to store seen cqes to make the critical section
+	// where cq_lock is held as small as possible
+	std::array<Completion, CQE_BATCH_COUNT> reapedCompletions;
+
+	// Vector returned containing all reaped completions in the form of fibers.
+	std::vector<Fiber *> continuationFibers;
+	// We reserve capacity in continuationFibers as soon as we know the
+	// amount of reaped completions.
+
 	// this label is not used for callerEnvironment::ANYWHERE and thus has to be
 	// annotated with ATTR_UNUSED
 reap_cqes:
 	ATTR_UNUSED;
 
+	// never reap completions on the global IoContext
+	assert(this != runtime.globalIo);
+
+	LOGD("Reaping completions");
+	std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes;
+
 	// Someone else is currently reaping completions
 	if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
 		if (unlikely(!cq_lock.try_lock())) {
@@ -185,52 +202,28 @@ reap_cqes:
 		}
 	}
 
-	// never reap completions on the global IoContext
-	assert(this != runtime.globalIo);
-
-	LOGD("Reaping completions");
-	unsigned head;
-	struct io_uring_cqe *cqe;
-	unsigned count = 0;
+	unsigned count = io_uring_peek_batch_cqe(&ring, cqes.data(), CQE_BATCH_COUNT);
 
-	using Completion = std::pair<uint32_t, TaggedPtr>;
-	// vector to store seen cqes to make the critical section
-	// where cq_lock is held as small as possible
-	std::vector<Completion> reapedCompletions;
+	for (unsigned i = 0; i < count; ++i) {
+		struct io_uring_cqe *cqe = cqes[i];
+		void *cqe_data = io_uring_cqe_get_data(cqe);
 
-	int err = io_uring_peek_cqe(&ring, &cqe);
-	if (err) {
-		if (err == -EAGAIN) {
-			goto unlock;
-		}
-		errno = -err;
-		DIE_MSG_ERRNO("io_uring_peek_cqe failed");
+		auto &reapedCompletion = reapedCompletions[i];
+		reapedCompletion.first = cqe->res;
+		reapedCompletion.second = cqe_data;
 	}
 
-	io_uring_for_each_cqe(&ring, head, cqe) {
-		count++;
-
-		TaggedPtr tptr(io_uring_cqe_get_data(cqe));
-
-		// Got a CQE for a forgotten Future
-		if (!tptr) {
-			continue;
-		}
+	io_uring_cq_advance(&ring, count);
 
-		reapedCompletions.emplace_back(cqe->res, tptr);
-	}
+	uint32_t globalCompleterAttempts = cq_lock.unlock();
 
 	LOGD("got " << count << " cqes from the io_uring");
-	io_uring_cq_advance(&ring, count);
 
 	if constexpr (emper::DEBUG) {
 		assert(count <= reqs_in_uring);
 		reqs_in_uring -= count;
 	}
 
-unlock:
-	uint32_t globalCompleterAttempts = cq_lock.unlock();
-
 	// A naive try lock protecting a worker's IoContext's cq is racy.
 	// While a worker is holding the lock additional completions could arrive
 	// which the worker does not observe because it could be already finished iterating.
@@ -253,9 +246,18 @@ unlock:
 
 	stats.record_reaps<callerEnvironment>(count);
 
-	for (auto &completion : reapedCompletions) {
+	continuationFibers.reserve(count);
+
+	for (unsigned i = 0; i < count; ++i) {
+		auto &completion = reapedCompletions[i];
 		auto res = completion.first;
-		auto tptr = completion.second;
+		auto *cqe_data = completion.second;
+
+		TaggedPtr tptr(cqe_data);
+		// Got a CQE for a forgotten Future.
+		if (!tptr) {
+			continue;
+		}
 
 		auto tag = static_cast<PointerTags>(tptr.getTag());
 		switch (tag) {
@@ -290,13 +292,23 @@ unlock:
 
 	// check if lost wakeup was possible
 	if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
+		bool reReap = false;
+
 		// TODO: How sure are we that this is unlikely?
 		if (unlikely(globalCompleterAttempts > maxRaceFreeCompleterAttempts)) {
+			// In all CQ iteration after the first we expect no further globalCompleter attempts
+			maxRaceFreeCompleterAttempts = 0;
+			reReap = true;
+		} else if (count == CQE_BATCH_COUNT) {
+			// We reaped a full batch, so there could potentially be
+			// more CQEs in the completion queue.
+			reReap = true;
+		}
+
+		if (reReap) {
 			// schedule all already collected continuation fibers
 			runtime.schedule(continuationFibers.begin(), continuationFibers.end());
 			continuationFibers.clear();
-			// In all CQ iteration after the first we expect no further globalCompleter attempts
-			maxRaceFreeCompleterAttempts = 0;
 			goto reap_cqes;
 		}
 	}
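
For readers unfamiliar with liburing's batched completion API, the following standalone sketch shows the reaping pattern the patch switches to: peek a bounded batch of CQEs, copy out only the (res, user data) pairs, and release the CQEs with a single cq advance so the work done under any surrounding lock stays short. It is illustrative only and not part of the patch; RING_ENTRIES, Completion and reap_batch are made-up names, and ring setup and error handling are omitted.

// Standalone sketch of batched CQE reaping with liburing (not EMPER code).
#include <liburing.h>

#include <array>
#include <cstdint>
#include <utility>
#include <vector>

// Illustrative batch size; like CQE_BATCH_COUNT it should not exceed the CQ size.
constexpr unsigned RING_ENTRIES = 64;

// (cqe->res, user data) pair, mirroring the Completion type used in the patch.
using Completion = std::pair<int32_t, void *>;

std::vector<Completion> reap_batch(struct io_uring *ring) {
	std::array<struct io_uring_cqe *, RING_ENTRIES> cqes;

	// Non-blocking peek: returns how many CQEs are currently available,
	// capped at the requested batch size.
	unsigned count = io_uring_peek_batch_cqe(ring, cqes.data(), RING_ENTRIES);

	std::vector<Completion> completions;
	completions.reserve(count);
	for (unsigned i = 0; i < count; ++i) {
		completions.emplace_back(cqes[i]->res, io_uring_cqe_get_data(cqes[i]));
	}

	// Mark all peeked CQEs as consumed in one step instead of calling
	// io_uring_cqe_seen() once per CQE.
	io_uring_cq_advance(ring, count);

	// A full batch (count == RING_ENTRIES) hints that more CQEs may still be
	// queued; the patch re-runs its reap loop in exactly that case.
	return completions;
}

Copying the completions out before processing them is what lets the patch drop cq_lock (and, in the original code, the unlock label) before it starts turning completions into continuation fibers.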