Commit 3b7a60ff authored by Florian Fischer's avatar Florian Fischer
Browse files

[IO] remove partial completion feature

Not supporting partial completions for send, write, read makes
a lot of code paths way simpler, removes Futures from the globalIo and
allows more inheritance.

I don't think partial completion support is worth the hassle.
Partial completions cancel all dependent requests. Resubmitting some
in the chain breaks the semantic.

The global completer does not need to handle requests in its own io_uring.
We don't need templates for submit because submit works only from within
the Runtime. This allows submit to be virtual and be overridden by
Futures which issue a syscall before they submit themselves to the io_uring.

This and the absence of partial completions save a lot of conditions on
the fast path.
parent 0e33cea1
......@@ -87,7 +87,7 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy, unsigned int
// The global io_uring needs at least workerCount entries in its SQ because
// for each worker's IoContext one eventfd read is prepared before the
// globalCompleter is started and submits all previously prepared sqes.
globalIo = new IoContext(workerCount + EMPER_IO_GLOBAL_URING_ENTRIES);
globalIo = new IoContext(workerCount);
ioContexts = new IoContext[workerCount];
if constexpr (emper::STATS) {
......
......@@ -76,16 +76,14 @@ inline auto recvAndWait(int socket, void *buffer, size_t length, int flags) -> s
* @param buffer source buffer
* @param length length of the message in bytes
* @param flags type of message transmission
 * @param send_all If true signal the future only if all bytes are sent or
* sending further bytes is not possible
*
* @return Future object which signals the completion of the send request
*/
inline auto send(int socket, const void *buffer, size_t length, int flags, bool send_all = true)
inline auto send(int socket, const void *buffer, size_t length, int flags)
-> std::unique_ptr<Future> {
assert(Runtime::inRuntime());
IoContext *io = IoContext::getWorkerIo();
return io->send(socket, buffer, length, flags, send_all);
return io->send(socket, buffer, length, flags);
}
/**
......@@ -98,16 +96,13 @@ inline auto send(int socket, const void *buffer, size_t length, int flags, bool
* @param buffer source buffer
* @param length length of the message in bytes
* @param flags type of message transmission
* @param send_all If true return to the user only if all bytes are sent or
* sending further bytes is not possible
*
* @return -1 on error, otherwise the number of sent bytes
*/
/*
 * Resolved diff fusion: drop the stale old signature/call carrying the
 * removed send_all parameter; keep the post-commit version.
 */
inline auto sendAndWait(int socket, const void *buffer, size_t length, int flags) -> ssize_t {
	// Blocking convenience wrapper: submit the send and wait for its result.
	assert(Runtime::inRuntime());
	IoContext *io = IoContext::getWorkerIo();
	return io->sendAndWait(socket, buffer, length, flags);
}
/**
......@@ -227,16 +222,14 @@ inline auto acceptAndWait(int socket, struct sockaddr *address, socklen_t *addre
* @param buf destination buffer
* @param nbyte amount of bytes to read
* @param offset offset in the file
* @param read_all If true signal the future only if the buffer is completely
* filled or no further data can be read.
*
* @return Future object which signals the completion of the read request
*/
inline auto readFile(int fildes, void *buf, size_t nbyte, off_t offset = 0, bool read_all = false)
inline auto readFile(int fildes, void *buf, size_t nbyte, off_t offset = 0)
-> std::unique_ptr<Future> {
assert(Runtime::inRuntime());
IoContext *io = IoContext::getWorkerIo();
return io->readFile(fildes, buf, nbyte, offset, read_all);
return io->readFile(fildes, buf, nbyte, offset);
}
/**
......@@ -249,16 +242,13 @@ inline auto readFile(int fildes, void *buf, size_t nbyte, off_t offset = 0, bool
* @param buf destination buffer
* @param nbyte amount of bytes to read
* @param offset offset in the file
* @param read_all If true return only to the user if the buffer is completely
* filled or no further data can be read.
*
* @return -1 on error, otherwise the number of bytes read
*/
/*
 * Resolved diff fusion: drop the stale old signature/call carrying the
 * removed read_all parameter; keep the post-commit version.
 */
inline auto readFileAndWait(int fildes, void *buf, size_t nbyte, off_t offset = 0) -> ssize_t {
	// Blocking convenience wrapper: submit the read and wait for its result.
	assert(Runtime::inRuntime());
	IoContext *io = IoContext::getWorkerIo();
	return io->readFileAndWait(fildes, buf, nbyte, offset);
}
/**
......@@ -274,16 +264,14 @@ inline auto readFileAndWait(int fildes, void *buf, size_t nbyte, off_t offset =
* @param buf source buffer
* @param nbyte amount of bytes to write
* @param offset offset in the file
 * @param write_all If true signal this future only if all nbyte were written
* or no further data can be written.
*
* @return Future object which signals the completion of the write request
*/
/*
 * Resolved diff fusion: drop the stale old signature/call carrying the
 * removed write_all parameter; keep the post-commit version.
 */
inline auto writeFile(int fildes, const void *buf, size_t nbyte, off_t offset = 0)
		-> std::unique_ptr<Future> {
	// Asynchronous write at the given file offset; completion is signaled
	// through the returned Future.
	assert(Runtime::inRuntime());
	IoContext *io = IoContext::getWorkerIo();
	return io->writeFile(fildes, buf, nbyte, offset);
}
/**
......@@ -299,16 +287,14 @@ inline auto writeFile(int fildes, const void *buf, size_t nbyte, off_t offset =
* @param buf source buffer
* @param nbyte amount of bytes to write
* @param offset offset in the file
* @param write_all If true return only to the user if all nbyte were written
* or no further data can be written.
*
* @return -1 on error, otherwise the number of bytes written
*/
/*
 * Resolved diff fusion: drop the stale old signature/call carrying the
 * removed write_all parameter; keep the post-commit version.
 */
inline auto writeFileAndWait(int fildes, const void *buf, size_t nbyte, off_t offset = 0)
		-> ssize_t {
	// Blocking convenience wrapper: submit the write and wait for its result.
	assert(Runtime::inRuntime());
	IoContext *io = IoContext::getWorkerIo();
	return io->writeFileAndWait(fildes, buf, nbyte, offset);
}
/**
......
......@@ -2,64 +2,39 @@
// Copyright © 2020 Florian Fischer
#include "Future.hpp"
#include <liburing.h> // for io_uring_prep_*
#include <sys/socket.h> // for socklen_t, recv, send, MSG_DON...
#include <sys/types.h> // for off_t, ssize_t
#include <sys/types.h> // for off_t, ssize_t
#include <cassert> // for assert
#include <cerrno> // for errno, EAGAIN, EWOULDBLOCK, EINTR
#include <cstdlib> // for abort
#include <ostream> // for operator<<, ostream, basic_ost...
#include "BinaryPrivateSemaphore.hpp" // for BinaryPrivateSemaphore
#include "CallerEnvironment.hpp" // for CallerEnvironment
#include "Debug.hpp" // for LogSubsystem, LogSubsyst...
#include "Runtime.hpp" // for Runtime
#include "io/IoContext.hpp" // for emper::io::Stats
#include "io/Operation.hpp" // for emper::io::Operation
#include "io/Stats.hpp" // for emper::io::Stats
struct __kernel_timespec;
namespace emper::io {
template <CallerEnvironment callerEnvironment>
void Future::tryComplete(int32_t res, bool syscall) {
void Future::complete(int32_t res, bool syscall) {
LOGD("Complete " << op << " Future");
IoContext *io = IoContext::getIo<callerEnvironment>();
// The IO request is not finished yet
if (res == -EAGAIN || res == -EWOULDBLOCK) {
// resubmit future if we could not complete it
LOGD("received " << (res == -EAGAIN ? "EAGAIN" : "EWOULDBLOCK") << " -> resubmitting " << op
<< " request " << this);
resubmit:
io->stats.record_completion(op, res, partialCompletion, len, syscall);
this->submit<callerEnvironment>();
return;
}
// We expect partial completions and made progress
if (partialCompletion != Future::DISABLE_PARTIAL_COMPLETION && res > 0) {
partialCompletion += res;
// write/send: we have not written the whole buffer yet
// read: we have not filled the buffer yet
if ((size_t)partialCompletion < len) {
goto resubmit;
}
}
// Async IO requests should not return early without doing work
assert(res != -EAGAIN || res != -EWOULDBLOCK);
io->stats.record_completion(op, res, partialCompletion, len, syscall);
io->stats.record_completion(op, res, len, syscall);
// We should immediately return to the user
// or made no progress (socket is closed)
// or an error arose
returnValue = res;
sem.signal<callerEnvironment>();
}
// show the compiler our template incarnations
template void Future::tryComplete<CallerEnvironment::EMPER>(int32_t res, bool syscall);
template void Future::tryComplete<CallerEnvironment::ANYWHERE>(int32_t res, bool syscall);
template void Future::complete<CallerEnvironment::EMPER>(int32_t res, bool syscall);
template void Future::complete<CallerEnvironment::ANYWHERE>(int32_t res, bool syscall);
auto Future::wait() -> int32_t {
LOGD("Waiting on " << op << " Future");
......@@ -80,96 +55,12 @@ auto Future::waitAndSetErrno() -> ssize_t {
return res;
}
auto Future::wait(ssize_t &partialCompletion) -> int32_t {
int32_t res = wait();
partialCompletion = this->partialCompletion;
return res;
}
template <CallerEnvironment callerEnvironment>
void Future::submit() {
IoContext *io = IoContext::getIo<callerEnvironment>();
assert(Runtime::inRuntime());
IoContext *io = IoContext::getWorkerIo();
LOGD("submit " << op << " Future to IoContext " << io);
io->submit(*this);
}
// show the compiler our template incarnations
template void Future::submit<CallerEnvironment::EMPER>();
template void Future::submit<CallerEnvironment::ANYWHERE>();
void Future::prepareSqe(struct io_uring_sqe *sqe) {
switch (op) {
case Operation::RECV:
io_uring_prep_recv(sqe, fd, buf, len, offsetOrFlags);
break;
case Operation::CONNECT:
io_uring_prep_connect(sqe, fd, (const struct sockaddr *)buf, (socklen_t)len);
break;
case Operation::ACCEPT:
io_uring_prep_accept(sqe, fd, (struct sockaddr *)buf, (socklen_t *)len, 0);
break;
case Operation::READ:
if (partialCompletion == -1) {
io_uring_prep_read(sqe, fd, buf, len, offsetOrFlags);
} else {
io_uring_prep_read(sqe, fd, (char *)buf + partialCompletion, len - partialCompletion,
offsetOrFlags + partialCompletion);
}
break;
case Operation::WRITE:
if (partialCompletion == -1) {
io_uring_prep_write(sqe, fd, buf, len, offsetOrFlags);
} else {
// TODO: think about partial writes with incremental offset!
// Are transparent incremental writes on files without O_APPEND even reasonable?
io_uring_prep_write(sqe, fd, (char *)buf + partialCompletion, len - partialCompletion,
offsetOrFlags + partialCompletion);
}
break;
case Operation::SEND:
if (partialCompletion == -1) {
io_uring_prep_send(sqe, fd, buf, len, offsetOrFlags);
} else {
io_uring_prep_send(sqe, fd, (char *)buf + partialCompletion, len - partialCompletion,
offsetOrFlags);
}
break;
case Operation::CLOSE:
io_uring_prep_close(sqe, fd);
break;
case Operation::LINK_TIMEOUT:
io_uring_prep_link_timeout(sqe, (struct ::__kernel_timespec *)buf, 0);
break;
case Operation::TIMEOUT:
io_uring_prep_timeout(sqe, (struct ::__kernel_timespec *)buf, 0, 0);
break;
default:
abort();
}
io_uring_sqe_set_data(sqe, this);
}
void Future::trySyscallCompletion() {
LOGD("Try " << op << " syscall");
ssize_t res;
switch (op) {
case Operation::RECV:
res = ::recv(fd, buf, len, offsetOrFlags | MSG_DONTWAIT);
break;
case Operation::SEND:
res = ::send(fd, buf, len, offsetOrFlags | MSG_DONTWAIT);
break;
default:
abort();
}
if (res < 0) {
res = -errno;
}
this->tryComplete(res, true);
}
} // namespace emper::io
......@@ -2,19 +2,21 @@
// Copyright © 2020 Florian Fischer
#pragma once
#include <liburing.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include "BinaryPrivateSemaphore.hpp"
#include "CallerEnvironment.hpp"
#include "Debug.hpp"
#include "Emper.hpp"
#include "io/Operation.hpp"
struct io_uring_sqe;
struct __kernel_timespec;
namespace emper::io {
......@@ -24,7 +26,7 @@ namespace emper::io {
class Future : public Logger<LogSubsystem::IO> {
friend class IoContext;
private:
protected:
BPS sem;
/* IO operation to perform */
......@@ -44,30 +46,14 @@ class Future : public Logger<LogSubsystem::IO> {
*/
int32_t returnValue = 0;
/**
* This member is used internally to handle partial completion.
* If initialized with -1 it signals that the IO request should immediately return to the user.
 * A value >= 0 signals that this IO request should not return to the user until it is
* fully completed or a persistent error arises.
* The IO request is considered fully completed if partialCompletion == len.
*
* When the IO request returns to the user the combination of @c future.returnValue and
* @c future.partialCompletion can signal partial completion.
*/
ssize_t partialCompletion;
protected:
// return immediately to the user
static const ssize_t DISABLE_PARTIAL_COMPLETION = -1;
// try to fully complete the request
static const ssize_t ENABLE_PARTIAL_COMPLETION = 0;
private:
// Dependency futures will be submitted to the io_uring before
// this and will be linked to this IO request using IOSQE_IO_LINK
Future* dependency = nullptr;
void prepareSqe(io_uring_sqe* sqe);
/*
* @brief prepare a sqe for this IO operation
*/
virtual void prepareSqe(struct io_uring_sqe* sqe) = 0;
/**
* @brief Try to complete this IO request using the result of an IO operation.
......@@ -77,25 +63,17 @@ class Future : public Logger<LogSubsystem::IO> {
* @param syscall Boolean to indicate if the result originated from a syscall or io_uring
*/
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
void tryComplete(int32_t res, bool syscall = false);
/* try to complete IO request using an IO syscall */
void trySyscallCompletion();
void complete(int32_t res, bool syscall = false);
protected:
Future(Operation op, int fd, void* buf, size_t len, int offsetOrFlags,
ssize_t partialCompletion = DISABLE_PARTIAL_COMPLETION)
: op(op), fd(fd), buf(buf), len(len), offsetOrFlags(offsetOrFlags) {
assert(partialCompletion == DISABLE_PARTIAL_COMPLETION ||
partialCompletion == ENABLE_PARTIAL_COMPLETION);
this->partialCompletion = partialCompletion;
};
Future(Operation op, int fd, void* buf, size_t len, int offsetOrFlags)
: op(op), fd(fd), buf(buf), len(len), offsetOrFlags(offsetOrFlags){};
public:
virtual ~Future(){};
/*
* @brief reset the Future
*
* Reset the BPS and the partialCompletion.
 * Resetting a Future is useful if we want to use the same Future object
* multiple times.
* A Future for a specific reoccurring IO operation can be created and reused
......@@ -103,9 +81,6 @@ class Future : public Logger<LogSubsystem::IO> {
*/
inline void reset() {
LOGD("Resetting Future");
if (partialCompletion != -1) {
partialCompletion = 0;
}
sem.reset();
}
......@@ -149,9 +124,11 @@ class Future : public Logger<LogSubsystem::IO> {
/*
* @brief submit Future for asynchronous completion
*
 * submit must be called within the runtime because it submits the future to
* the current worker's IoContext
*/
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
void submit();
virtual void submit();
/**
* @brief Block till the IO request is completed
......@@ -167,76 +144,104 @@ class Future : public Logger<LogSubsystem::IO> {
*/
auto waitAndSetErrno() -> ssize_t;
/**
* @brief Block till the IO request is completed
*
* @param[out] partialCompletion The amount of work done till an persistent error arose
*
* @return return the result received from the io_uring
*/
auto wait(ssize_t& partialCompletion) -> int32_t;
/*
* @brief Equivalent to calling wait() after calling submit()
*/
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
inline auto submitAndWait() -> int32_t {
submit<callerEnvironment>();
submit();
return wait();
}
/*
* @brief Equivalent to calling wait(partialCompletion) after calling submit()
*/
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
inline auto submitAndWait(ssize_t& partialCompletion) -> int32_t {
submit<callerEnvironment>();
return wait(partialCompletion);
}
};
class SendFuture : public Future {
public:
SendFuture(int socket, const void* buffer, size_t length, int flags, bool send_all = true)
: Future(Operation::SEND, socket, const_cast<void*>(buffer), length, flags,
send_all ? ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){};
SendFuture(int socket, const void* buffer, size_t length, int flags)
: Future(Operation::SEND, socket, const_cast<void*>(buffer), length, flags){};
void prepareSqe(struct io_uring_sqe* sqe) {
io_uring_prep_send(sqe, fd, buf, len, offsetOrFlags);
};
void submit() {
if constexpr (emper::IO_TRY_SYSCALL) {
ssize_t res = ::send(fd, buf, len, offsetOrFlags | MSG_DONTWAIT);
if (res > -1 || (errno != EAGAIN && errno != EWOULDBLOCK)) {
complete(res > 0 ? res : -errno);
}
}
Future::submit();
}
};
class RecvFuture : public Future {
public:
RecvFuture(int socket, void* buffer, size_t length, int flags)
: Future(Operation::RECV, socket, buffer, length, flags){};
void prepareSqe(struct io_uring_sqe* sqe) {
io_uring_prep_recv(sqe, fd, buf, len, offsetOrFlags);
};
void submit() {
if constexpr (emper::IO_TRY_SYSCALL) {
ssize_t res = ::recv(fd, buf, len, offsetOrFlags | MSG_DONTWAIT);
if (res > -1 || (errno != EAGAIN && errno != EWOULDBLOCK)) {
complete(res > 0 ? res : -errno);
}
}
Future::submit();
}
};
/**
 * @brief Future performing an asynchronous connect(2).
 *
 * The sockaddr pointer is stored in the generic buf member and its
 * length in len; prepareSqe casts them back for io_uring.
 */
class ConnectFuture : public Future {
public:
ConnectFuture(int socket, const struct sockaddr* address, socklen_t address_len)
: Future(Operation::CONNECT, socket, (void*)address, address_len, 0){};
// Prepare an io_uring connect request from the stored arguments.
void prepareSqe(struct io_uring_sqe* sqe) {
io_uring_prep_connect(sqe, fd, (const struct sockaddr*)buf, (socklen_t)len);
};
};
/**
 * @brief Future performing an asynchronous accept(2).
 *
 * Note: address_len is a POINTER smuggled through the size_t len member
 * (cast in the ctor, cast back to socklen_t* in prepareSqe), because
 * accept needs an in/out length argument.
 */
class AcceptFuture : public Future {
public:
AcceptFuture(int socket, const struct sockaddr* address, socklen_t* address_len)
: Future(Operation::ACCEPT, socket, (void*)address, (size_t)address_len, 0){};
// Prepare an io_uring accept request; len is reinterpreted as socklen_t*.
void prepareSqe(struct io_uring_sqe* sqe) {
io_uring_prep_accept(sqe, fd, (struct sockaddr*)buf, (socklen_t*)len, 0);
};
};
class ReadFuture : public Future {
public:
ReadFuture(int fildes, void* buf, size_t nbyte, int offset, bool read_all = false)
: Future(Operation::READ, fildes, buf, nbyte, offset,
read_all ? ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){};
ReadFuture(int fildes, void* buf, size_t nbyte, int offset)
: Future(Operation::READ, fildes, buf, nbyte, offset){};
void prepareSqe(struct io_uring_sqe* sqe) {
io_uring_prep_read(sqe, fd, buf, len, offsetOrFlags);
};
};
class WriteFuture : public Future {
public:
WriteFuture(int fildes, const void* buf, size_t nbyte, int offset, bool write_all = true)
: Future(Operation::WRITE, fildes, const_cast<void*>(buf), nbyte, offset,
write_all ? ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){};
WriteFuture(int fildes, const void* buf, size_t nbyte, int offset)
: Future(Operation::WRITE, fildes, const_cast<void*>(buf), nbyte, offset){};
void prepareSqe(struct io_uring_sqe* sqe) {
io_uring_prep_write(sqe, fd, buf, len, offsetOrFlags);
};
};
/**
 * @brief Future performing an asynchronous close(2) on a file descriptor.
 */
class CloseFuture : public Future {
public:
CloseFuture(int fildes) : Future(Operation::CLOSE, fildes, nullptr, 0, 0){};
// Prepare an io_uring close request for the stored fd.
void prepareSqe(struct io_uring_sqe* sqe) { io_uring_prep_close(sqe, fd); };
};
/*
......@@ -265,6 +270,10 @@ class TimeoutWrapper : public Future {
: Future(Operation::LINK_TIMEOUT, 0, (void*)&ts, 0, 0) {
addDependency(future);
};
// Prepare a linked-timeout sqe; buf holds the __kernel_timespec passed to
// the constructor (see ctor above storing (void*)&ts).
void prepareSqe(struct io_uring_sqe* sqe) {
io_uring_prep_link_timeout(sqe, (struct ::__kernel_timespec*)buf, 0);
};
};
/*
......@@ -275,5 +284,9 @@ class TimeoutWrapper : public Future {
/**
 * @brief Future completing after the given timespec elapses (io_uring timeout).
 *
 * The __kernel_timespec is stored by address in buf; the caller must keep
 * it alive until the timeout completes.
 */
class AlarmFuture : public Future {
public:
AlarmFuture(struct __kernel_timespec& ts) : Future(Operation::TIMEOUT, 0, (void*)&ts, 0, 0){};
// Prepare an io_uring timeout request from the stored timespec.
void prepareSqe(struct io_uring_sqe* sqe) {
io_uring_prep_timeout(sqe, (struct ::__kernel_timespec*)buf, 0, 0);
};
};
} // namespace emper::io
......@@ -11,7 +11,6 @@
#include <atomic> // for atomic
#include <cassert> // for assert
#include <cerrno> // for errno, EAGAIN, EWOULDBLOCK, EINTR
#include <chrono> // for nanoseconds
#include <cstdio> // for perror
#include <cstdlib> // for exit
#include <memory> // for make_unique, unique_ptr
......@@ -31,11 +30,6 @@
#include <ostream> // for basic_ostream::operator<<, operator<<
#endif
// use the most significant bit of a pointer to differ between an IoContext and a Future
// in the global IoContext's CQ
static const uintptr_t IOCONTEXT_TAG = 1L << (sizeof(size_t) * 8 - 1);
static const uintptr_t IOCONTEXT_TAG_MASK = IOCONTEXT_TAG - 1;
namespace emper::io {
thread_local IoContext *IoContext::workerIo = nullptr;
......@@ -58,6 +52,8 @@ auto IoContext::prepareFutureChain(Future &future, bool recursion) -> int {
assert(sqe);
future.prepareSqe(sqe);
io_uring_sqe_set_data(sqe, &future);
if (recursion) {
sqe->flags |= IOSQE_IO_LINK;
}
......@@ -65,7 +61,6 @@ auto IoContext::prepareFutureChain(Future &future, bool recursion) -> int {
return prepared + 1;
}
template <CallerEnvironment callerEnvironment>
void IoContext::submit(Future &future) {
LOGD("submitting " << future.op << " Future " << &future
<< (future.dependency ? " and it's dependencies" : ""));
......@@ -91,12 +86,6 @@ void IoContext::submit(Future &future) {
logI("io_submit returned EBUSY");
}
if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) {
// we are not busy looping in the globalIo
stats.record_io_submit_full_cq(std::chrono::nanoseconds(0));
return;
}
TIME_NS(
{
do {
......@@ -122,13 +111,9 @@ void IoContext::submit(Future &future) {
// offloading it to the async backend. See io_uring_enter(2).
// Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe.