Skip to content
Snippets Groups Projects
Commit b22579c0 authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'batch_schedule_from_anywhere' into 'master'

add a batch optimization for the global completer

See merge request !110
parents 6fd6fc4d 17776ba2
No related branches found
No related tags found
No related merge requests found
Pipeline #58723 passed
......@@ -41,16 +41,18 @@ class Blockable : public Logger<logSubsystem> {
contextManager.saveAndStartNew(std::move(freshContextHook));
}
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
void unblock(Context* context) {
auto unblockAndGetContinuation(Context* context) -> Fiber* {
assert(context != nullptr);
if constexpr (emper::BLOCKED_CONTEXT_SET) {
blockedContexts.erase(context);
}
// cppcheck-suppress unsafeClassCanLeak
Fiber* unblockFiber =
Fiber::from([this, context]() { contextManager.discardAndResume(context); });
return Fiber::from([this, context]() { contextManager.discardAndResume(context); });
}
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
void unblock(Context* context) {
Fiber* unblockFiber = unblockAndGetContinuation(context);
if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
runtime.schedule(*unblockFiber);
......
......@@ -97,4 +97,11 @@ static const bool IO_URING_SHARED_WQ =
false
#endif
;
static const bool IO_BATCH_ANYWHERE_COMPLETIONS =
#ifdef EMPER_IO_BATCH_ANYWHERE_COMPLETIONS
true
#else
false
#endif
;
} // namespace emper
......@@ -37,6 +37,13 @@ class PrivateSemaphore : protected Blockable<LogSubsystem::PS> {
// invalid at this point.
}
[[nodiscard]] auto signalAndGetContinuation() -> Fiber* {
if (Context* readyContext = signalInternal()) {
return unblockAndGetContinuation(readyContext);
}
return nullptr;
}
void signalFromAnywhere() { signal<CallerEnvironment::ANYWHERE>(); }
void signalAndExit() {
......
......@@ -158,6 +158,11 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); }
template <class InputIt>
inline void scheduleFromAnywhere(InputIt begin, InputIt end) {
scheduler.scheduleFromAnywhere(begin, end);
}
void yield();
// TODO: This should probably not be a public method of Runtime.
......
......@@ -42,6 +42,11 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); }
template <class InputIt>
void insertInAnywhereQueue(InputIt begin, InputIt end) {
scheduleAnywhereQueue.insert(begin, end);
}
auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); }
virtual void scheduleInternal(Fiber& fiber) = 0;
......@@ -60,4 +65,13 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
onNewWork<CallerEnvironment::ANYWHERE>();
}
// TODO: investigate if it is still a good idea to wakeup only a single
// worker maybe we want something like onNewWork(amountOfWork)
template <class InputIt>
void scheduleFromAnywhere(InputIt begin, InputIt end) {
insertInAnywhereQueue(begin, end);
onNewWork<CallerEnvironment::ANYWHERE>();
}
};
......@@ -22,6 +22,7 @@
#include "io/Operation.hpp" // for Operation, operator<<, Operati...
struct io_uring_sqe;
class Fiber;
namespace emper::io {
class Stats;
......@@ -106,6 +107,11 @@ class Future : public Logger<LogSubsystem::IO> {
sem.signalFromAnywhere();
}
virtual auto completeAndGetContinuation(int32_t res) -> Fiber* {
setCompletion(res);
return sem.signalAndGetContinuation();
}
/**
* Used for Stats::recordCompletion double dispatch
*/
......@@ -377,6 +383,21 @@ class PartialCompletableFuture : public Future {
}
}
auto completeAndGetContinuation(int32_t res) -> Fiber* override {
CompletionType completion = tryComplete<CallerEnvironment::ANYWHERE>(res);
switch (completion) {
case CompletionType::Resubmission:
return nullptr;
case CompletionType::IncrementalCompletion:
return Future::completeAndGetContinuation(partialCompletion);
case CompletionType::Completion:
return Future::completeAndGetContinuation(res);
default:
DIE_MSG("Unknown CompletionType: " << (int)completion);
}
}
public:
/**
* @brief Block till the IO request is completed
......
......@@ -13,6 +13,7 @@
#include <cstdio> // for perror
#include <cstring> // for memset
#include <memory> // for allocator
#include <vector>
#include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER, ANYWHERE
#include "Common.hpp" // for unlikely, DIE_MSG_ERRNO, DIE_MSG
......@@ -191,6 +192,10 @@ void IoContext::reapCompletions() {
struct io_uring_cqe *cqe;
unsigned count = 0;
// vector used to batch all completions scheduled to the AnywhereQueue
std::vector<Fiber *> continuationFibers;
Runtime *runtime = Runtime::getRuntime();
int err = io_uring_peek_cqe(&ring, &cqe);
if (err) {
if (err == -EAGAIN) {
......@@ -217,11 +222,14 @@ void IoContext::reapCompletions() {
c(res);
delete &c;
});
Runtime *runtime = Runtime::getRuntime();
if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
runtime->schedule(*callbackFiber);
} else {
runtime->scheduleFromAnywhere(*callbackFiber);
if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) {
continuationFibers.push_back(callbackFiber);
} else {
runtime->scheduleFromAnywhere(*callbackFiber);
}
}
continue;
}
......@@ -235,7 +243,14 @@ void IoContext::reapCompletions() {
if constexpr (callerEnvironment == EMPER) {
future->complete(cqe->res);
} else {
future->completeFromAnywhere(cqe->res);
if constexpr (emper::IO_BATCH_ANYWHERE_COMPLETIONS) {
Fiber *continuation = future->completeAndGetContinuation(cqe->res);
if (continuation) {
continuationFibers.push_back(continuation);
}
} else {
future->completeFromAnywhere(cqe->res);
}
}
}
......@@ -250,6 +265,8 @@ void IoContext::reapCompletions() {
if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
stats.record_worker_reaps(count);
} else {
// actually schedule all completion fibers
runtime->scheduleFromAnywhere(continuationFibers.begin(), continuationFibers.end());
stats.record_completer_reaps(count);
}
......
......@@ -20,6 +20,14 @@ class LockedUnboundedQueue {
queue.push(item);
}
template <class InputIt>
void insert(InputIt begin, InputIt end) {
std::lock_guard<std::mutex> lock(queue_mutex);
for (; begin != end; ++begin) {
queue.push(*begin);
}
}
auto dequeue() -> I* {
std::lock_guard<std::mutex> lock(queue_mutex);
if (queue.empty()) {
......
......@@ -65,6 +65,7 @@ endif
io_bool_options = [
'uring_sqpoll',
'uring_shared_wq',
'batch_anywhere_completions',
]
io_raw_options = [
......
......@@ -109,3 +109,9 @@ option(
value: false,
description: 'Share a common async backend between all io_urings'
)
option(
'io_batch_anywhere_completions',
type: 'boolean',
value: false,
description: 'Collect and schedule all completions reaped by the global completer at once'
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment