Commit bb72fca3 authored by Florian Fischer's avatar Florian Fischer
Browse files

add a batch optimization for the global completer

This change introduces new scheduleFromAnywhere methods which take
a range of Fibers to schedule.

Blockable gets a new method returning the fiber used to start
the unblocked context, which is used by Future/PartialCompletableFuture
to provide a way of completion and returning the continuation Fiber
to the caller so they may schedule the continuation how they want.

If the meson option io_batch_anywhere_completions is set the global
completer will collect all callback and continuation fibers before
scheduling them at once when it is done reaping the completions.
The idea is that taking the AnywhereQueue write lock and calling onNewWork
must only be done once.

TODO: investigate if onNewWork should be extended by an amountOfWork
argument which determines how many worker can be awoken and have work to
do. This should be trivially since our WorkerWakeupSemaphore implementations
already support notify_many(), which may be implemented in terms of
notify_all though.
parent 58d376aa
Pipeline #58701 failed with stages
in 50 seconds
...@@ -41,16 +41,18 @@ class Blockable : public Logger<logSubsystem> { ...@@ -41,16 +41,18 @@ class Blockable : public Logger<logSubsystem> {
contextManager.saveAndStartNew(std::move(freshContextHook)); contextManager.saveAndStartNew(std::move(freshContextHook));
} }
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> auto unblockAndGetContinuation(Context* context) -> Fiber* {
void unblock(Context* context) {
assert(context != nullptr); assert(context != nullptr);
if constexpr (emper::BLOCKED_CONTEXT_SET) { if constexpr (emper::BLOCKED_CONTEXT_SET) {
blockedContexts.erase(context); blockedContexts.erase(context);
} }
// cppcheck-suppress unsafeClassCanLeak return Fiber::from([this, context]() { contextManager.discardAndResume(context); });
Fiber* unblockFiber = }
Fiber::from([this, context]() { contextManager.discardAndResume(context); });
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
void unblock(Context* context) {
Fiber* unblockFiber = unblockAndGetContinuation(context);
if constexpr (callerEnvironment == CallerEnvironment::EMPER) { if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
runtime.schedule(*unblockFiber); runtime.schedule(*unblockFiber);
......
...@@ -97,4 +97,11 @@ static const bool IO_URING_SHARED_WQ = ...@@ -97,4 +97,11 @@ static const bool IO_URING_SHARED_WQ =
false false
#endif #endif
; ;
static const bool IO_BATCH_ANYWHERE_COMPLETIONS =
#ifdef EMPER_IO_BATCH_ANYWHERE_COMPLETIONS
true
#else
false
#endif
;
} // namespace emper } // namespace emper
...@@ -37,6 +37,13 @@ class PrivateSemaphore : protected Blockable<LogSubsystem::PS> { ...@@ -37,6 +37,13 @@ class PrivateSemaphore : protected Blockable<LogSubsystem::PS> {
// invalid at this point. // invalid at this point.
} }
[[nodiscard]] auto signalAndGetContinuation() -> Fiber* {
if (Context* readyContext = signalInternal()) {
return unblockAndGetContinuation(readyContext);
}
return nullptr;
}
void signalFromAnywhere() { signal<CallerEnvironment::ANYWHERE>(); } void signalFromAnywhere() { signal<CallerEnvironment::ANYWHERE>(); }
void signalAndExit() { void signalAndExit() {
......
...@@ -158,6 +158,11 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { ...@@ -158,6 +158,11 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); } inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); }
template <class InputIt>
inline void scheduleFromAnywhere(InputIt begin, InputIt end) {
scheduler.scheduleFromAnywhere(begin, end);
}
void yield(); void yield();
// TODO: This should probably not be a public method of Runtime. // TODO: This should probably not be a public method of Runtime.
......
...@@ -42,6 +42,11 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { ...@@ -42,6 +42,11 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); } void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); }
template <class InputIt>
void insertInAnywhereQueue(InputIt begin, InputIt end) {
scheduleAnywhereQueue.insert(begin, end);
}
auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); } auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); }
virtual void scheduleInternal(Fiber& fiber) = 0; virtual void scheduleInternal(Fiber& fiber) = 0;
...@@ -60,4 +65,13 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { ...@@ -60,4 +65,13 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
onNewWork<CallerEnvironment::ANYWHERE>(); onNewWork<CallerEnvironment::ANYWHERE>();
} }
// TODO: investigate if it is still a good idea to wakeup only a single
// worker maybe we want something like onNewWork(amountOfWork)
template <class InputIt>
void scheduleFromAnywhere(InputIt begin, InputIt end) {
insertInAnywhereQueue(begin, end);
onNewWork<CallerEnvironment::ANYWHERE>();
}
}; };
...@@ -106,6 +106,11 @@ class Future : public Logger<LogSubsystem::IO> { ...@@ -106,6 +106,11 @@ class Future : public Logger<LogSubsystem::IO> {
sem.signalFromAnywhere(); sem.signalFromAnywhere();
} }
virtual auto completeAndGetContinuation(int32_t res) -> Fiber* {
setCompletion(res);
return sem.signalAndGetContinuation();
}
/** /**
* Used for Stats::recordCompletion double dispatch * Used for Stats::recordCompletion double dispatch
*/ */
...@@ -377,6 +382,21 @@ class PartialCompletableFuture : public Future { ...@@ -377,6 +382,21 @@ class PartialCompletableFuture : public Future {
} }
} }
auto completeAndGetContinuation(int32_t res) -> Fiber* override {
CompletionType completion = tryComplete<CallerEnvironment::ANYWHERE>(res);
switch (completion) {
case CompletionType::Resubmission:
return nullptr;
case CompletionType::IncrementalCompletion:
return Future::completeAndGetContinuation(partialCompletion);
case CompletionType::Completion:
return Future::completeAndGetContinuation(res);
default:
DIE_MSG("Unknown CompletionType: " << completion);
}
}
public: public:
/** /**
* @brief Block till the IO request is completed * @brief Block till the IO request is completed
......
...@@ -191,6 +191,10 @@ void IoContext::reapCompletions() { ...@@ -191,6 +191,10 @@ void IoContext::reapCompletions() {
struct io_uring_cqe *cqe; struct io_uring_cqe *cqe;
unsigned count = 0; unsigned count = 0;
// vector used to batch all completions scheduled to the AnywhereQueue
std::vector<Fiber *> continuationFibers;
Runtime *runtime = Runtime::getRuntime();
int err = io_uring_peek_cqe(&ring, &cqe); int err = io_uring_peek_cqe(&ring, &cqe);
if (err) { if (err) {
if (err == -EAGAIN) { if (err == -EAGAIN) {
...@@ -217,11 +221,14 @@ void IoContext::reapCompletions() { ...@@ -217,11 +221,14 @@ void IoContext::reapCompletions() {
c(res); c(res);
delete &c; delete &c;
}); });
Runtime *runtime = Runtime::getRuntime();
if constexpr (callerEnvironment == CallerEnvironment::EMPER) { if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
runtime->schedule(*callbackFiber); runtime->schedule(*callbackFiber);
} else { } else {
runtime->scheduleFromAnywhere(*callbackFiber); if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) {
continuationFibers.push_back(callbackFiber);
} else {
runtime->scheduleFromAnywhere(*callbackFiber);
}
} }
continue; continue;
} }
...@@ -235,7 +242,14 @@ void IoContext::reapCompletions() { ...@@ -235,7 +242,14 @@ void IoContext::reapCompletions() {
if constexpr (callerEnvironment == EMPER) { if constexpr (callerEnvironment == EMPER) {
future->complete(cqe->res); future->complete(cqe->res);
} else { } else {
future->completeFromAnywhere(cqe->res); if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) {
Fiber *continuation = future->completeAndGetContinuation(cqe->res);
if (continuation) {
continuationFibers.push_back(continuation);
}
} else {
future->completeFromAnywhere(cqe->res);
}
} }
} }
...@@ -250,6 +264,8 @@ void IoContext::reapCompletions() { ...@@ -250,6 +264,8 @@ void IoContext::reapCompletions() {
if constexpr (callerEnvironment == CallerEnvironment::EMPER) { if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
stats.record_worker_reaps(count); stats.record_worker_reaps(count);
} else { } else {
// actually schedule all completion fibers
runtime->scheduleFromAnywhere(continuationFibers.begin(), continuationFibers.end());
stats.record_completer_reaps(count); stats.record_completer_reaps(count);
} }
......
...@@ -20,6 +20,14 @@ class LockedUnboundedQueue { ...@@ -20,6 +20,14 @@ class LockedUnboundedQueue {
queue.push(item); queue.push(item);
} }
template <class InputIt>
void insert(InputIt begin, InputIt end) {
std::lock_guard<std::mutex> lock(queue_mutex);
for (; begin != end; ++begin) {
queue.push(*begin);
}
}
auto dequeue() -> I* { auto dequeue() -> I* {
std::lock_guard<std::mutex> lock(queue_mutex); std::lock_guard<std::mutex> lock(queue_mutex);
if (queue.empty()) { if (queue.empty()) {
......
...@@ -65,6 +65,7 @@ endif ...@@ -65,6 +65,7 @@ endif
io_bool_options = [ io_bool_options = [
'uring_sqpoll', 'uring_sqpoll',
'uring_shared_wq', 'uring_shared_wq',
'batch_anywhere_completions',
] ]
io_raw_options = [ io_raw_options = [
......
...@@ -109,3 +109,9 @@ option( ...@@ -109,3 +109,9 @@ option(
value: false, value: false,
description: 'Share a common async backend between all io_urings' description: 'Share a common async backend between all io_urings'
) )
option(
'io_batch_anywhere_completions',
type: 'boolean',
value: false,
description: 'Collect and schedule all completions reaped by the global completer at once'
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment