Commit 94a00a78 authored by Florian Fischer's avatar Florian Fischer
Browse files

add synchronous io option

When EMPER is build with -Dio_synchronous each Future will be
completed synchronously when calling Future::wait().
parent 7c341849
......@@ -232,6 +232,14 @@ enum class IoCompleterBehavior {
const enum IoCompleterBehavior IO_COMPLETER_BEHAVIOR =
IoCompleterBehavior::EMPER_IO_COMPLETER_BEHAVIOR;
const bool IO_SYNCHRONOUS =
#ifdef EMPER_IO_SYNCHRONOUS
true
#else
false
#endif
;
const bool SET_AFFINITY_ON_BLOCK =
#ifdef EMPER_SET_AFFINITY_ON_BLOCK
true
......
......@@ -79,6 +79,10 @@ using emper::io::IoContext;
bool Runtime::guardPageTouchCausesExitSuccess = false;
struct sigaction Runtime::oldact {};
static constexpr bool USE_IO_COMPLETER =
emper::IO && !emper::IO_SYNCHRONOUS &&
emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none;
Runtime::Runtime(workerid_t workerCount, const std::vector<NewWorkerHook>& newWorkerHooks,
bool pinWorkers, workerid_t pinningOffset, const StrategyFactory& strategyFactory,
unsigned int seed)
......@@ -131,7 +135,7 @@ Runtime::Runtime(workerid_t workerCount, const std::vector<NewWorkerHook>& newWo
for (const auto& f : newWorkerHooks) this->newWorkerHooks.push_back(f);
// initialize the global IoContext if a completer is used
if constexpr (emper::IO && emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) {
if constexpr (USE_IO_COMPLETER) {
// The global io_uring needs at least workerCount entries in its SQ because
// for each worker's IoContext one eventfd read is prepared before the
// globalCompleter is started and submits all previously prepared sqes.
......@@ -286,7 +290,7 @@ Runtime::~Runtime() {
delete[] threads;
if constexpr (emper::IO) {
if constexpr (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) {
if constexpr (USE_IO_COMPLETER) {
// It is safer to destroy the globalIo before the worker IoContexts
// because pointer to the worker IoContexts can outlife their objects
// in the globalCompleter
......@@ -418,7 +422,7 @@ void Runtime::yield() {
}
auto Runtime::nextFiber() -> std::optional<NextFiberResult> {
if constexpr (emper::IO_WORKER_URING) {
if constexpr (emper::IO_WORKER_URING && !emper::IO_SYNCHRONOUS) {
// Schedule all fibers waiting on completed IO
IoContext::ContinuationBuffer completions;
unsigned ncompletions =
......@@ -440,7 +444,7 @@ void Runtime::initiateTermination() {
workerSleepStrategy.notifyAll<CallerEnvironment::ANYWHERE>();
if constexpr (emper::IO && emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) {
if constexpr (USE_IO_COMPLETER) {
globalIo->initiateTermination();
}
}
......@@ -453,7 +457,7 @@ void Runtime::waitUntilFinished() {
}
}
if constexpr (emper::IO && emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) {
if constexpr (USE_IO_COMPLETER) {
globalIo->waitUntilFinished();
}
......
......@@ -2,8 +2,14 @@
// Copyright © 2020-2021 Florian Fischer
#include "Future.hpp"
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/uio.h>
#include <unistd.h>
#include <cassert> // for assert
#include <cerrno> // for errno, EAGAIN, EWOULDBLOCK
#include <ctime>
#include <ostream> // for operator<<, ostream, basic_ost...
#include "BinaryPrivateSemaphore.hpp" // for BPS
......@@ -13,6 +19,7 @@
#include "Runtime.hpp"
#include "Worker.hpp"
#include "io/IoContext.hpp"
#include "io/Operation.hpp"
#include "io/Stats.hpp"
#include "lib/TaggedPtr.hpp"
......@@ -45,9 +52,13 @@ template PartialCompletableFuture::CompletionType
PartialCompletableFuture::tryComplete<CallerEnvironment::ANYWHERE>(int32_t res);
auto Future::_wait() -> int32_t {
LOGD("Waiting on " << this);
if constexpr (emper::IO_SYNCHRONOUS) {
completeSynchronously();
} else {
LOGD("Waiting on " << this);
sem.wait();
sem.wait();
}
return get();
}
......@@ -183,4 +194,63 @@ void CancelWrapper::prepareSqeInternal(struct io_uring_sqe* sqe) {
uint64_t user_data = IoContext::createFutureTag(future);
io_uring_prep_cancel(sqe, user_data, 0);
}
void Future::completeSynchronously() {
int32_t res = [this]() -> int32_t {
switch (op) {
case Operation::SEND:
return static_cast<int32_t>(::send(fd, buf, len, flags));
case Operation::RECV:
return static_cast<int32_t>(::recv(fd, buf, len, flags));
case Operation::CONNECT:
return ::connect(fd, (const struct sockaddr*)buf, (socklen_t)len);
case Operation::ACCEPT:
// NOLINTNEXTLINE(performance-no-int-to-ptr)
return ::accept4(fd, (struct sockaddr*)buf, (socklen_t*)len, 0);
case Operation::OPENAT:
return ::openat(fd, reinterpret_cast<const char*>(buf), static_cast<int>(len),
static_cast<mode_t>(offset));
case Operation::READ:
if (offset == -1) return static_cast<int32_t>(::read(fd, buf, len));
return static_cast<int32_t>(::pread(fd, buf, len, offset));
case Operation::WRITE:
if (offset == -1) return static_cast<int32_t>(::write(fd, buf, len));
return static_cast<int32_t>(::pwrite(fd, buf, len, offset));
case Operation::WRITEV:
return static_cast<int32_t>(::writev(fd, (const struct iovec*)buf, static_cast<int>(len)));
case Operation::CLOSE:
return ::close(fd);
case Operation::SHUTDOWN:
return ::shutdown(fd, flags);
case Operation::LINK_TIMEOUT:
case Operation::CANCEL:
DIE_MSG(op << " is not supported synchronously");
case Operation::TIMEOUT: {
const auto* fts = (const AlarmFuture::Timespec*)buf;
struct timespec ts = {.tv_sec = fts->tv_sec, .tv_nsec = fts->tv_nsec};
::nanosleep(&ts, nullptr);
return -ETIME;
}
case Operation::MADVISE:
return ::madvise(buf, static_cast<off_t>(len), flags);
default:
DIE_MSG("unknown IO operation:" << (int)op);
}
}();
complete(res);
}
} // namespace emper::io
......@@ -221,6 +221,8 @@ class Future : public Logger<LogSubsystem::IO> {
void recordCompletionInternal(Stats& stats, int32_t res) const;
void completeSynchronously();
/**
* @brief Block till the IO request is completed without sanity checks
*
......
......@@ -371,6 +371,10 @@ class IoContext : public Logger<LogSubsystem::IO> {
// must go through IoContext::submit even calling Future::submit
future.markSubmitted();
// If IO_SYNCHRONOUS is enabled, then all I/O operations will be issued
// and completed in Future::wait() using a synchronous system call.
if constexpr (emper::IO_SYNCHRONOUS) return;
if constexpr (emper::IO_SINGLE_URING) {
submitter->tell<callerEnvironment>(&future);
} else {
......@@ -389,6 +393,12 @@ class IoContext : public Logger<LogSubsystem::IO> {
*/
template <typename InputIt, CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
auto submit(InputIt begin, InputIt end) -> unsigned {
// Mark Futures as submitted and return if they are completed synchronously
if constexpr (emper::IO_SYNCHRONOUS) {
for (InputIt cur = begin; cur != end; ++cur) (*cur)->markSubmitted();
return std::distance(begin, end);
}
for (InputIt cur = begin; cur != end; ++cur) {
auto *future = *cur;
future->markSubmitted();
......@@ -423,7 +433,7 @@ class IoContext : public Logger<LogSubsystem::IO> {
emper::assertInRuntime();
submit<CallerEnvironment::EMPER>(future);
if constexpr (emper::IO_SINGLE_URING) {
if constexpr (emper::IO_SINGLE_URING || emper::IO_SYNCHRONOUS) {
return;
}
// io_uring will try to synchronously complete any IO request before
......
......@@ -194,6 +194,7 @@ io_bool_options = [
{'option': 'try_syscall'},
{'option': 'waitfree_stealing',
'dependencies': {'io_stealing': true, 'io_lockless_cq': true}},
{'option': 'synchronous'},
]
io_raw_options = [
......
......@@ -301,6 +301,12 @@ option(
description: 'Memory ordering used for the lockless CQ algorithm',
value: 'weak',
)
option(
'io_synchronous',
type: 'boolean',
description: 'Use synchronous blocking systemcalls instead of the emper::io subsystem',
value: false,
)
option(
'continuation_stealing_mode',
type: 'combo',
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment