diff --git a/emper/io/Future.cpp b/emper/io/Future.cpp index b71c00bb56a40066a14d64066683a0fa97840b48..c3cbb74bfc1ff04bb7d7d1dd9bdfe44f75b33e5a 100644 --- a/emper/io/Future.cpp +++ b/emper/io/Future.cpp @@ -101,6 +101,13 @@ void Future::submit() { origin.uringFutureSet.insert(this); } + // protect the sq from parallel modification from the completer + // calling submit<ANYWHERE> and the worker owning origin calling + // submit<EMPER>. + // To prevent deadlocks when reapCompletions is resubmitting not fully completed + // Futures we must unlock aq_mutex before calling reapCompletions. + std::unique_lock<std::mutex> sq_lock(origin.sq_mutex); + struct io_uring_sqe *sqe = io_uring_get_sqe(&origin.ring); // no entries in submission queue left -> try to reap some completions if (!sqe) { @@ -108,7 +115,13 @@ void Future::submit() { LOGD("Busy waiting for a free sqe"); TIME_NS( - while (!(sqe = io_uring_get_sqe(&origin.ring))) { origin.reapCompletions(); }, + { + while (!(sqe = io_uring_get_sqe(&origin.ring))) { + sq_lock.unlock(); + origin.reapCompletions(); + sq_lock.lock(); + } + }, origin.stats.record_get_sqe_busy_wait); } @@ -121,7 +134,13 @@ void Future::submit() { LOGW("io_submit returned EBUSY trying to submit in addition to " << origin.reqs_in_uring); TIME_NS( - while ((submitted = io_uring_submit(&origin.ring)) == -EBUSY) { origin.reapCompletions(); }, + { + while ((submitted = io_uring_submit(&origin.ring)) == -EBUSY) { + sq_lock.lock(); + origin.reapCompletions(); + sq_lock.unlock(); + } + }, origin.stats.record_io_submit_busy_wait); } diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index bf47f37997de3ca38d8bf01771de9f0429a56ecc..120a625a12c25a6a1590b399e754ee96e7e57191 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -37,8 +37,15 @@ class IoContext : public Logger<LogSubsystem::IO> { /* function executed by the global completer thread */ static auto globalCompleterFunc(void *) -> void *; + // Mutex protecting the submission queue of ring. + // Used in Future::submit<> to prevent parallel access from the globalCompleter and the worker + std::mutex sq_mutex; + // Mutex protecting the completion queue of ring. std::mutex cq_mutex; struct io_uring ring; + + // eventfd registered with ring. It is used to notify the globalCompleter about + // available completions in ring. int ring_eventfd; uint64_t ring_eventfd_readbuf;