Skip to content
Snippets Groups Projects
Commit 31de63c7 authored by Florian Fischer's avatar Florian Fischer
Browse files

[IO] make Future::submit thread safe

Future::submit<ANYWHERE> and Future::submit<EMPER> can be executed
in parallel by the globalCompleter and.
To prevent possible data races of the io_urings SQ a new mutex
is introduced.
This new sq_mutex must be unlocked before calling reapCompletions() because
reapCompletions() may call Future::submit on not fully completed Futures
and thus causing a deadlock.
parent 51021af0
No related branches found
No related tags found
No related merge requests found
Pipeline #54297 failed
......@@ -101,6 +101,13 @@ void Future::submit() {
origin.uringFutureSet.insert(this);
}
// protect the sq from parallel modification from the completer
// calling submit<ANYWHERE> and the worker owning origin calling
// submit<EMPER>.
// To prevent deadlocks when reapCompletions is resubmitting not fully completed
// Futures we must unlock aq_mutex before calling reapCompletions.
std::unique_lock<std::mutex> sq_lock(origin.sq_mutex);
struct io_uring_sqe *sqe = io_uring_get_sqe(&origin.ring);
// no entries in submission queue left -> try to reap some completions
if (!sqe) {
......@@ -108,7 +115,13 @@ void Future::submit() {
LOGD("Busy waiting for a free sqe");
TIME_NS(
while (!(sqe = io_uring_get_sqe(&origin.ring))) { origin.reapCompletions(); },
{
while (!(sqe = io_uring_get_sqe(&origin.ring))) {
sq_lock.unlock();
origin.reapCompletions();
sq_lock.lock();
}
},
origin.stats.record_get_sqe_busy_wait);
}
......@@ -121,7 +134,13 @@ void Future::submit() {
LOGW("io_submit returned EBUSY trying to submit in addition to " << origin.reqs_in_uring);
TIME_NS(
while ((submitted = io_uring_submit(&origin.ring)) == -EBUSY) { origin.reapCompletions(); },
{
while ((submitted = io_uring_submit(&origin.ring)) == -EBUSY) {
sq_lock.lock();
origin.reapCompletions();
sq_lock.unlock();
}
},
origin.stats.record_io_submit_busy_wait);
}
......
......@@ -37,8 +37,15 @@ class IoContext : public Logger<LogSubsystem::IO> {
/* function executed by the global completer thread */
static auto globalCompleterFunc(void *) -> void *;
// Mutex protecting the submission queue of ring.
// Used in Future::submit<> to prevent parallel access from the globalCompleter and the worker
std::mutex sq_mutex;
// Mutex protecting the completion queue of ring.
std::mutex cq_mutex;
struct io_uring ring;
// eventfd registered with ring. It is used to notify the globalCompleter about
// available completions in ring.
int ring_eventfd;
uint64_t ring_eventfd_readbuf;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment