Commit 30fe27bf authored by Florian Schmaus

[IO] Use io_uring_peek_batch_cqe()

parent 196e2ec0
1 merge request: !120 Reduce the critical section of io_uring CQ
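For context, below is a minimal sketch of the batched reap pattern this commit switches to, assuming plain liburing outside of EMPER (BATCH and drainCompletions are illustrative names, not EMPER code): io_uring_peek_batch_cqe() copies up to BATCH CQE pointers in a single call, so the completion queue only needs one io_uring_cq_advance() afterwards.

#include <liburing.h>

#include <array>
#include <utility>
#include <vector>

constexpr unsigned BATCH = 64;  // illustrative; EMPER uses its per-worker uring entry count

// Drain whatever is currently visible in the CQ and return (res, user_data) pairs.
static std::vector<std::pair<int, void *>> drainCompletions(struct io_uring &ring) {
	std::array<struct io_uring_cqe *, BATCH> cqes;

	// One call fills cqes[0..count) without yet consuming the queue.
	const unsigned count = io_uring_peek_batch_cqe(&ring, cqes.data(), BATCH);

	std::vector<std::pair<int, void *>> completions;
	completions.reserve(count);
	for (unsigned i = 0; i < count; ++i) {
		completions.emplace_back(cqes[i]->res, io_uring_cqe_get_data(cqes[i]));
	}

	// Mark all copied CQEs as seen in a single step.
	io_uring_cq_advance(&ring, count);
	return completions;
}

If count comes back equal to BATCH, further CQEs may still be queued; that is exactly the condition the new reReap path in the diff below checks.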
@@ -7,6 +7,7 @@
#include <sys/eventfd.h> // for eventfd
#include <unistd.h> // for close
#include <array>
#include <atomic> // for atomic, __atomic_base
#include <cassert> // for assert
#include <cerrno> // for errno, ECANCELED, EBUSY, EAGAIN, EINTR
@@ -162,16 +163,33 @@ template void IoContext::submit<CallerEnvironment::ANYWHERE>(Future &future);
template <CallerEnvironment callerEnvironment>
auto IoContext::reapCompletions() -> std::vector<Fiber *> {
// vector returned containing all reaped completions
std::vector<Fiber *> continuationFibers;
// Should not be more than the uring_entries count.
const unsigned CQE_BATCH_COUNT = EMPER_IO_WORKER_URING_ENTRIES;
uint32_t maxRaceFreeCompleterAttempts = 1;
using Completion = std::pair<uint32_t, TaggedPtr>;
// vector to store seen cqes to make the critical section
// where cq_lock is held as small as possible
std::vector<Completion> reapedCompletions;
reapedCompletions.reserve(CQE_BATCH_COUNT);
// Vector returned containing all reaped completions in the form of fibers.
std::vector<Fiber *> continuationFibers;
// We reserve capacity in continuationFibers as soon as we know the
// number of reaped completions.
// This label is not used for CallerEnvironment::ANYWHERE and thus has to be
// annotated with ATTR_UNUSED.
reap_cqes:
ATTR_UNUSED;
// never reap completions on the global IoContext
assert(this != runtime.globalIo);
LOGD("Reaping completions");
std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes;
// Someone else is currently reaping completions
if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
if (unlikely(!cq_lock.try_lock())) {
@@ -185,32 +203,12 @@
}
}
// never reap completions on the global IoContext
assert(this != runtime.globalIo);
LOGD("Reaping completions");
unsigned head;
struct io_uring_cqe *cqe;
unsigned count = 0;
using Completion = std::pair<uint32_t, TaggedPtr>;
// vector to store seen cqes to make the critical section
// where cq_lock is held as small as possible
std::vector<Completion> reapedCompletions;
int err = io_uring_peek_cqe(&ring, &cqe);
if (err) {
if (err == -EAGAIN) {
goto unlock;
}
errno = -err;
DIE_MSG_ERRNO("io_uring_peek_cqe failed");
}
io_uring_for_each_cqe(&ring, head, cqe) {
count++;
unsigned count = io_uring_peek_batch_cqe(&ring, cqes.data(), CQE_BATCH_COUNT);
TaggedPtr tptr(io_uring_cqe_get_data(cqe));
for (unsigned i = 0; i < count; ++i) {
struct io_uring_cqe *cqe = cqes[i];
void *cqe_data = io_uring_cqe_get_data(cqe);
TaggedPtr tptr(cqe_data);
// Got a CQE for a forgotten Future
if (!tptr) {
@@ -220,17 +218,17 @@
reapedCompletions.emplace_back(cqe->res, tptr);
}
LOGD("got " << count << " cqes from the io_uring");
io_uring_cq_advance(&ring, count);
uint32_t globalCompleterAttempts = cq_lock.unlock();
LOGD("got " << count << " cqes from the io_uring");
if constexpr (emper::DEBUG) {
assert(count <= reqs_in_uring);
reqs_in_uring -= count;
}
unlock:
uint32_t globalCompleterAttempts = cq_lock.unlock();
// A naive try lock protecting a worker's IoContext's cq is racy:
// while a worker is holding the lock, additional completions could arrive
// which the worker does not observe because it may already have finished iterating.
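That race is why cq_lock.unlock() reports how many times the global completer tried to enter the section while it was held. A rough, hypothetical sketch of such an attempt-counting try-lock (illustrative only, not the actual EMPER lock class; here every failed contender is counted):

#include <atomic>
#include <cstdint>

class CountingTryLock {  // name and layout are illustrative
	std::atomic<bool> locked{false};
	std::atomic<uint32_t> attempts{0};

 public:
	// Try to enter the CQ critical section; on failure, record the attempt so
	// the current holder learns that it may have missed new completions.
	bool try_lock() {
		if (!locked.exchange(true, std::memory_order_acquire)) return true;
		attempts.fetch_add(1, std::memory_order_relaxed);
		return false;
	}

	// Leave the critical section and report how many attempts were made while we held it.
	uint32_t unlock() {
		const uint32_t seen = attempts.exchange(0, std::memory_order_relaxed);
		locked.store(false, std::memory_order_release);
		return seen;
	}
};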
@@ -253,6 +251,8 @@
stats.record_reaps<callerEnvironment>(count);
continuationFibers.reserve(reapedCompletions.size());
for (auto &completion : reapedCompletions) {
auto res = completion.first;
auto tptr = completion.second;
@@ -290,13 +290,24 @@ unlock:
// check if lost wakeup was possible
if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
bool reReap = false;
// TODO: How sure are we that this is unlikely?
if (unlikely(globalCompleterAttempts > maxRaceFreeCompleterAttempts)) {
// In all CQ iterations after the first we expect no further globalCompleter attempts
maxRaceFreeCompleterAttempts = 0;
reReap = true;
} else if (count == CQE_BATCH_COUNT) {
// We reaped a full batch, which means there could be more CQEs
// left in the completion queue.
reReap = true;
}
if (reReap) {
// schedule all already collected continuation fibers
runtime.schedule(continuationFibers.begin(), continuationFibers.end());
reapedCompletions.clear();
continuationFibers.clear();
// In all CQ iterations after the first we expect no further globalCompleter attempts
maxRaceFreeCompleterAttempts = 0;
goto reap_cqes;
}
}
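Putting the two conditions together, the re-reap policy above corresponds to the following loop, sketched with hypothetical reapOnce/scheduleCollected callables instead of the actual IoContext members (reapOnce is assumed to lock the CQ, peek a batch, advance, and unlock, returning what it saw):

#include <cstdint>

struct ReapResult {
	unsigned reaped;             // CQEs consumed in this pass
	uint32_t completerAttempts;  // failed lock attempts by the global completer
};

// Keep reaping until a pass neither raced with the global completer nor
// returned a full batch; only then can no completion be left stranded.
template <typename ReapOnce, typename ScheduleCollected>
void reapUntilQuiescent(unsigned batchSize, ReapOnce reapOnce, ScheduleCollected scheduleCollected) {
	// Mirrors maxRaceFreeCompleterAttempts in the diff: the first pass
	// tolerates a single completer attempt, later passes tolerate none.
	uint32_t maxRaceFreeCompleterAttempts = 1;
	for (;;) {
		const ReapResult r = reapOnce();

		bool reReap = false;
		if (r.completerAttempts > maxRaceFreeCompleterAttempts) {
			maxRaceFreeCompleterAttempts = 0;
			reReap = true;
		} else if (r.reaped == batchSize) {
			// A full batch may leave further CQEs behind in the completion queue.
			reReap = true;
		}
		if (!reReap) return;  // the caller schedules what was collected

		// Hand the fibers collected so far to the runtime and reap again.
		scheduleCollected();
	}
}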