Commit 1957be38 authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'io-serialization' into 'master'

add io synchronous option

See merge request !373
parents 7c341849 3c3c48df
......@@ -151,6 +151,10 @@ clang-tidy:
variables:
EMPER_IO_SINGLE_URING: 'true'
.emper-synchronous-io:
variables:
EMPER_IO_SYNCHRONOUS: 'true'
.emper-io-stealing:
variables:
EMPER_IO_STEALING: 'true'
......@@ -412,6 +416,11 @@ test-single-uring:
- .test
- .emper-single-uring
test-synchronous-io:
extends:
- .test
- .emper-synchronous-io
test-pipe-sleep-strategy:
extends:
- .test
......
......@@ -232,6 +232,14 @@ enum class IoCompleterBehavior {
const enum IoCompleterBehavior IO_COMPLETER_BEHAVIOR =
IoCompleterBehavior::EMPER_IO_COMPLETER_BEHAVIOR;
const bool IO_SYNCHRONOUS =
#ifdef EMPER_IO_SYNCHRONOUS
true
#else
false
#endif
;
const bool SET_AFFINITY_ON_BLOCK =
#ifdef EMPER_SET_AFFINITY_ON_BLOCK
true
......
......@@ -79,6 +79,10 @@ using emper::io::IoContext;
bool Runtime::guardPageTouchCausesExitSuccess = false;
struct sigaction Runtime::oldact {};
static constexpr bool USE_IO_COMPLETER =
emper::IO && !emper::IO_SYNCHRONOUS &&
emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none;
Runtime::Runtime(workerid_t workerCount, const std::vector<NewWorkerHook>& newWorkerHooks,
bool pinWorkers, workerid_t pinningOffset, const StrategyFactory& strategyFactory,
unsigned int seed)
......@@ -131,7 +135,7 @@ Runtime::Runtime(workerid_t workerCount, const std::vector<NewWorkerHook>& newWo
for (const auto& f : newWorkerHooks) this->newWorkerHooks.push_back(f);
// initialize the global IoContext if a completer is used
if constexpr (emper::IO && emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) {
if constexpr (USE_IO_COMPLETER) {
// The global io_uring needs at least workerCount entries in its SQ because
// for each worker's IoContext one eventfd read is prepared before the
// globalCompleter is started and submits all previously prepared sqes.
......@@ -286,7 +290,7 @@ Runtime::~Runtime() {
delete[] threads;
if constexpr (emper::IO) {
if constexpr (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) {
if constexpr (USE_IO_COMPLETER) {
// It is safer to destroy the globalIo before the worker IoContexts
// because pointer to the worker IoContexts can outlife their objects
// in the globalCompleter
......@@ -418,7 +422,7 @@ void Runtime::yield() {
}
auto Runtime::nextFiber() -> std::optional<NextFiberResult> {
if constexpr (emper::IO_WORKER_URING) {
if constexpr (emper::IO_WORKER_URING && !emper::IO_SYNCHRONOUS) {
// Schedule all fibers waiting on completed IO
IoContext::ContinuationBuffer completions;
unsigned ncompletions =
......@@ -440,7 +444,7 @@ void Runtime::initiateTermination() {
workerSleepStrategy.notifyAll<CallerEnvironment::ANYWHERE>();
if constexpr (emper::IO && emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) {
if constexpr (USE_IO_COMPLETER) {
globalIo->initiateTermination();
}
}
......@@ -453,7 +457,7 @@ void Runtime::waitUntilFinished() {
}
}
if constexpr (emper::IO && emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) {
if constexpr (USE_IO_COMPLETER) {
globalIo->waitUntilFinished();
}
......
......@@ -2,8 +2,14 @@
// Copyright © 2020-2021 Florian Fischer
#include "Future.hpp"
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/uio.h>
#include <unistd.h>
#include <cassert> // for assert
#include <cerrno> // for errno, EAGAIN, EWOULDBLOCK
#include <ctime>
#include <ostream> // for operator<<, ostream, basic_ost...
#include "BinaryPrivateSemaphore.hpp" // for BPS
......@@ -13,6 +19,7 @@
#include "Runtime.hpp"
#include "Worker.hpp"
#include "io/IoContext.hpp"
#include "io/Operation.hpp"
#include "io/Stats.hpp"
#include "lib/TaggedPtr.hpp"
......@@ -45,9 +52,13 @@ template PartialCompletableFuture::CompletionType
PartialCompletableFuture::tryComplete<CallerEnvironment::ANYWHERE>(int32_t res);
auto Future::_wait() -> int32_t {
LOGD("Waiting on " << this);
if constexpr (emper::IO_SYNCHRONOUS) {
completeSynchronously();
} else {
LOGD("Waiting on " << this);
sem.wait();
sem.wait();
}
return get();
}
......@@ -183,4 +194,63 @@ void CancelWrapper::prepareSqeInternal(struct io_uring_sqe* sqe) {
uint64_t user_data = IoContext::createFutureTag(future);
io_uring_prep_cancel(sqe, user_data, 0);
}
void Future::completeSynchronously() {
int32_t res = [this]() -> int32_t {
switch (op) {
case Operation::SEND:
return static_cast<int32_t>(::send(fd, buf, len, flags));
case Operation::RECV:
return static_cast<int32_t>(::recv(fd, buf, len, flags));
case Operation::CONNECT:
return ::connect(fd, (const struct sockaddr*)buf, (socklen_t)len);
case Operation::ACCEPT:
// NOLINTNEXTLINE(performance-no-int-to-ptr)
return ::accept4(fd, (struct sockaddr*)buf, (socklen_t*)len, 0);
case Operation::OPENAT:
return ::openat(fd, reinterpret_cast<const char*>(buf), static_cast<int>(len),
static_cast<mode_t>(offset));
case Operation::READ:
if (offset == -1) return static_cast<int32_t>(::read(fd, buf, len));
return static_cast<int32_t>(::pread(fd, buf, len, offset));
case Operation::WRITE:
if (offset == -1) return static_cast<int32_t>(::write(fd, buf, len));
return static_cast<int32_t>(::pwrite(fd, buf, len, offset));
case Operation::WRITEV:
return static_cast<int32_t>(::writev(fd, (const struct iovec*)buf, static_cast<int>(len)));
case Operation::CLOSE:
return ::close(fd);
case Operation::SHUTDOWN:
return ::shutdown(fd, flags);
case Operation::LINK_TIMEOUT:
case Operation::CANCEL:
DIE_MSG(op << " is not supported synchronously");
case Operation::TIMEOUT: {
const auto* fts = (const AlarmFuture::Timespec*)buf;
struct timespec ts = {.tv_sec = fts->tv_sec, .tv_nsec = fts->tv_nsec};
::nanosleep(&ts, nullptr);
return -ETIME;
}
case Operation::MADVISE:
return ::madvise(buf, static_cast<off_t>(len), flags);
default:
DIE_MSG("unknown IO operation:" << (int)op);
}
}();
complete(res);
}
} // namespace emper::io
......@@ -221,6 +221,8 @@ class Future : public Logger<LogSubsystem::IO> {
void recordCompletionInternal(Stats& stats, int32_t res) const;
void completeSynchronously();
/**
* @brief Block till the IO request is completed without sanity checks
*
......
......@@ -371,6 +371,10 @@ class IoContext : public Logger<LogSubsystem::IO> {
// must go through IoContext::submit even calling Future::submit
future.markSubmitted();
// If IO_SYNCHRONOUS is enabled, then all I/O operations will be issued
// and completed in Future::wait() using a synchronous system call.
if constexpr (emper::IO_SYNCHRONOUS) return;
if constexpr (emper::IO_SINGLE_URING) {
submitter->tell<callerEnvironment>(&future);
} else {
......@@ -389,6 +393,12 @@ class IoContext : public Logger<LogSubsystem::IO> {
*/
template <typename InputIt, CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
auto submit(InputIt begin, InputIt end) -> unsigned {
// Mark Futures as submitted and return if they are completed synchronously
if constexpr (emper::IO_SYNCHRONOUS) {
for (InputIt cur = begin; cur != end; ++cur) (*cur)->markSubmitted();
return std::distance(begin, end);
}
for (InputIt cur = begin; cur != end; ++cur) {
auto *future = *cur;
future->markSubmitted();
......@@ -423,7 +433,7 @@ class IoContext : public Logger<LogSubsystem::IO> {
emper::assertInRuntime();
submit<CallerEnvironment::EMPER>(future);
if constexpr (emper::IO_SINGLE_URING) {
if constexpr (emper::IO_SINGLE_URING || emper::IO_SYNCHRONOUS) {
return;
}
// io_uring will try to synchronously complete any IO request before
......
......@@ -194,6 +194,7 @@ io_bool_options = [
{'option': 'try_syscall'},
{'option': 'waitfree_stealing',
'dependencies': {'io_stealing': true, 'io_lockless_cq': true}},
{'option': 'synchronous'},
]
io_raw_options = [
......
......@@ -301,6 +301,12 @@ option(
description: 'Memory ordering used for the lockless CQ algorithm',
value: 'weak',
)
option(
'io_synchronous',
type: 'boolean',
description: 'Use synchronous blocking systemcalls instead of the emper::io subsystem',
value: false,
)
option(
'continuation_stealing_mode',
type: 'combo',
......
......@@ -4,6 +4,7 @@
#include <cerrno> // for ECANCELED, ETIME
#include <cstdint> // for uint64_t, int32_t
#include <cstdlib>
#include <memory>
#include <ostream>
......@@ -149,6 +150,8 @@ static void massCancelOnDifferentWorker() {
}
void emperTest() {
if constexpr (emper::IO_SYNCHRONOUS) exit(77);
efd = eventfd(0, 0);
if (efd == -1) {
DIE_MSG_ERRNO("eventfd failed");
......
......@@ -25,7 +25,7 @@
#define BUF_SIZE 1024
auto main(int argc, char* argv[]) -> int {
if constexpr (!emper::IO) {
if constexpr (!emper::IO || emper::IO_SYNCHRONOUS) {
exit(77);
}
......
......@@ -2,8 +2,10 @@
// Copyright © 2020-2021 Florian Fischer
#include <cerrno> // for ETIME
#include <cstdint> // for int32_t
#include <cstdlib>
#include "BinaryPrivateSemaphore.hpp"
#include "Emper.hpp"
#include "emper.hpp"
#include "fixtures/assert.hpp"
#include "io/Future.hpp" // for AlarmFuture
......@@ -64,6 +66,8 @@ static void blockingCallbackTest() {
}
void emperTest() {
if constexpr (emper::IO_SYNCHRONOUS) exit(77);
signallingCallbackTest();
spawningCallbackTest();
blockingCallbackTest();
......
......@@ -3,9 +3,11 @@
#include <unistd.h> // for pipe
#include <cstdint> // for uint64_t, int32_t
#include <cstdlib>
#include <cstring> // for memcmp
#include "Common.hpp"
#include "Emper.hpp"
#include "fixtures/assert.hpp"
#include "io/Future.hpp" // for ReadFuture, CloseFuture, WriteFuture
......@@ -16,6 +18,8 @@ using emper::io::WriteFuture;
static const size_t MEMSIZE = 10 << 20;
void emperTest() {
if constexpr (emper::IO_SYNCHRONOUS) exit(77);
auto* memOut = new char[MEMSIZE];
auto* memIn = new char[MEMSIZE];
// NOLINTNEXTLINE(modernize-avoid-c-arrays)
......
......@@ -7,9 +7,11 @@
#include <array>
#include <cerrno> // for EBADF, ECANCELED
#include <cstdint> // for uint64_t, int32_t
#include <cstdlib>
#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG
#include "Debug.hpp"
#include "Emper.hpp"
#include "emper-config.h"
#include "fixtures/assert.hpp"
#include "io.hpp"
......@@ -129,6 +131,8 @@ static void failureChainCorInvCor() {
}
void emperTest() {
if constexpr (emper::IO_SYNCHRONOUS) exit(77);
// Test if a invalid chain leaves the IoContext in an unexpected / invalid state
failureChainCorInvCor();
successChain();
......
......@@ -21,10 +21,10 @@ void emperTest() {
}
uint64_t read_buf;
ReadFuture read_future(efd, &read_buf, sizeof(read_buf), 0);
ReadFuture read_future(efd, &read_buf, sizeof(read_buf), -1);
uint64_t write_buf = 1;
WriteFuture write_future(efd, &write_buf, sizeof(write_buf), 0);
WriteFuture write_future(efd, &write_buf, sizeof(write_buf), -1);
const int ITERATIONS = 100;
......
......@@ -13,6 +13,7 @@
#include "Common.hpp"
#include "Debug.hpp"
#include "Emper.hpp"
#include "Runtime.hpp"
#include "fixtures/assert.hpp"
#include "fixtures/network.hpp"
......@@ -159,4 +160,7 @@ static void testIov() {
void emperTest() {
testDiskAndNetwork();
testIov();
// The blocking accept4 done by the tcp_listener will never return and thus prevent
// the Runtime from terminating properly.
if constexpr (emper::IO_SYNCHRONOUS) exit(0);
}
......@@ -6,12 +6,14 @@
#include <cerrno>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include "Common.hpp"
#include "CountingPrivateSemaphore.hpp"
#include "Debug.hpp"
#include "Emper.hpp"
#include "Future.hpp"
#include "emper.hpp"
#include "fixtures/assert.hpp"
......@@ -131,6 +133,8 @@ void writeTest() {
}
void emperTest() {
if constexpr (emper::IO_SYNCHRONOUS) exit(77);
sockTest();
readTest();
writeTest();
......
......@@ -4,8 +4,10 @@
#include <cerrno> // for ECANCELED, ETIME
#include <cstdint> // for uint64_t, int32_t
#include <cstdlib>
#include "Common.hpp"
#include "Emper.hpp"
#include "fixtures/assert.hpp"
#include "io/Future.hpp" // for ReadFuture, TimeoutWrapper
......@@ -13,6 +15,8 @@ using emper::io::ReadFuture;
using emper::io::TimeoutWrapper;
void emperTest() {
if constexpr (emper::IO_SYNCHRONOUS) exit(77);
int efd = eventfd(0, EFD_SEMAPHORE);
if (efd == -1) {
DIE_MSG_ERRNO("eventfd failed");
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Fischer
#include <array>
#include <cstddef>
#include <cstdlib>
#include "Emper.hpp"
#include "emper-config.h" // for EMPER_IO_WORKER_URING_ENTRIES
#include "io/Future.hpp" // for AlarmFuture
using emper::io::AlarmFuture;
void emperTest() {
if constexpr (emper::IO_SYNCHRONOUS) exit(77);
const size_t links = static_cast<size_t>(EMPER_IO_WORKER_URING_ENTRIES) * 2;
std::array<AlarmFuture*, links> futures;
AlarmFuture::Timespec ts = {.tv_sec = 0, .tv_nsec = 100};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment