Commit 6bd13e43 authored by Florian Fischer's avatar Florian Fischer
Browse files

[IO] slightly change the API to prevent parallel modification of an io_uring's SQ

With this change the globalCompleter does not support partial completions
and will signal a Future immediately and thus never modifying the io_uring's
SQ.
As consequence we can't give guaranties about completion if either {read, send, write}_all
is specified.
parent 51021af0
Pipeline #54296 failed with stages
in 1 minute and 27 seconds
......@@ -66,8 +66,10 @@ inline auto recv_and_wait(int socket, void *buffer, size_t length, int flags) ->
* @param buffer source buffer
* @param length length of the message in bytes
* @param flags type of message transmission
* @param send_all If true sginal the future only if all bytes are sent or
* sending further bytes is not possible
* @param send_all Try to signal the future only if all bytes are sent or
* sending further bytes is not possible. It is not guarantied that
* the future is fully completed when it is signaled. You can
* check for a full completion using Future::wait(partial_completion).
*
* @return Future object which signals the completion of the send request
*/
......@@ -87,8 +89,9 @@ inline auto send(int socket, const void *buffer, size_t length, int flags, bool
* @param buffer source buffer
* @param length length of the message in bytes
* @param flags type of message transmission
* @param send_all If true return to the user only if all bytes are sent or
* sending further bytes is not possible
* @param send_all Try to return only if all bytes are sent or
* sending further bytes is not possible. It is not guarantied that
* all bytes are sent when send_and_wait returns.
*
* @return -1 on error, otherwise the number of sent bytes
*/
......@@ -211,8 +214,10 @@ inline auto accept_and_wait(int socket, struct sockaddr *address, socklen_t *add
* @param buf destination buffer
* @param nbyte amount of bytes to read
* @param offset offset in the file
* @param read_all If true signal the future only if the buffer is completely
* filled or no further data can be read.
* @param read_all Try to signal the future only if the buffer is completely filled or
* no further data can be read. It is not guarantied that
* the future is fully completed when it is signaled. You can
* check for a full completion using Future::wait(partial_completion).
*
* @return Future object which signals the completion of the read request
*/
......@@ -232,8 +237,9 @@ inline auto read_file(int fildes, void *buf, size_t nbyte, off_t offset = 0, boo
* @param buf destination buffer
* @param nbyte amount of bytes to read
* @param offset offset in the file
* @param read_all If true return only to the user if the buffer is completely
* filled or no further data can be read.
* @param read_all Try to return only if the buffer is completely filled or
* no further data can be read. It is not guarantied that the buffer is
* filled when read_and_wait returns.
*
* @return -1 on error, otherwise the number of bytes read
*/
......@@ -256,8 +262,10 @@ inline auto read_file_and_wait(int fildes, void *buf, size_t nbyte, off_t offset
* @param buf source buffer
* @param nbyte amount of bytes to write
* @param offset offset in the file
* @param write_all If true signal this future only all nbyte were written
* or no further data can be written.
* @param write_all Try to signal the future only if all nbyte were written or
* no further data can be written. It is not guarantied that
* the future is fully completed when it is signaled. You can
* check for a full completion using Future::wait(partial_completion).
*
* @return Future object which signals the completion of the write request
*/
......@@ -280,9 +288,10 @@ inline auto write_file(int fildes, const void *buf, size_t nbyte, off_t offset =
* @param buf source buffer
* @param nbyte amount of bytes to write
* @param offset offset in the file
* @param write_all If true return only to the user if all nbyte were written
* or no further data can be written.
*
* @param write_all Try to return only if all nbyte were written or
* no further data can be written. It is not guarantied that all nbyte were
* written when write_and_wait returns.
* @return -1 on error, otherwise the number of bytes written
*/
inline auto write_file_and_wait(int fildes, const void *buf, size_t nbyte, off_t offset = 0,
......
......@@ -29,25 +29,29 @@ template <CallerEnvironment callerEnvironment>
void Future::try_complete(int32_t res, bool syscall) {
LOGD("Complete " << op << " Future");
// The IO request is not finished yet
if (res == -EAGAIN || res == -EWOULDBLOCK) {
// resubmit future if we could not complete it
LOGD("received " << (res == -EAGAIN ? "EAGAIN" : "EWOULDBLOCK") << " -> resubmitting " << op
<< " request " << this);
resubmit:
origin.stats.record_completion(op, res, partial_completion, buf_len, syscall);
this->submit<callerEnvironment>();
return;
}
// Only resubmit Futures if we are in the runtime to prevent
// parallel modification of an io_uring's SQ
if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
// The IO request is not finished yet
if (res == -EAGAIN || res == -EWOULDBLOCK) {
// resubmit future if we could not complete it
LOGD("received " << (res == -EAGAIN ? "EAGAIN" : "EWOULDBLOCK") << " -> resubmitting " << op
<< " request " << this);
resubmit:
origin.stats.record_completion(op, res, partial_completion, buf_len, syscall);
this->submit();
return;
}
// We expect partial completions and made progress
if (partial_completion != Future::DISABLE_PARTIAL_COMPLETION && res > 0) {
partial_completion += res;
// write/send: we have not written the whole buffer yet
// read: we have not filled the buffer yet
if ((size_t)partial_completion < buf_len) {
goto resubmit;
// We expect partial completions and made progress
if (partial_completion != Future::DISABLE_PARTIAL_COMPLETION && res > 0) {
partial_completion += res;
// write/send: we have not written the whole buffer yet
// read: we have not filled the buffer yet
if ((size_t)partial_completion < buf_len) {
goto resubmit;
}
}
}
......@@ -92,7 +96,6 @@ auto Future::wait(ssize_t &partial_completion) -> ssize_t {
return res;
}
template <CallerEnvironment callerEnvironment>
void Future::submit() {
LOGD("submit " << op << " Future to io_uring");
......@@ -133,10 +136,6 @@ void Future::submit() {
assert(submitted == 1);
}
// show the compiler our template incarnations
template void Future::submit<CallerEnvironment::EMPER>();
template void Future::submit<CallerEnvironment::ANYWHERE>();
void Future::prepare_sqe(struct io_uring_sqe *sqe) {
switch (op) {
case Operation::RECV:
......
......@@ -137,7 +137,6 @@ class Future : public Logger<LogSubsystem::IO> {
/*
* @brief submit Future for asynchronous completion
*/
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
void submit();
/**
......@@ -159,18 +158,16 @@ class Future : public Logger<LogSubsystem::IO> {
/*
* @brief Equivalent to calling wait() after calling submit()
*/
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
inline auto submit_and_wait() -> size_t {
submit<callerEnvironment>();
submit();
return wait();
}
/*
* @brief Equivalent to calling wait(partial_completion) after calling submit()
*/
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
inline auto submit_and_wait(ssize_t &partial_completion) -> size_t {
submit<callerEnvironment>();
submit();
return wait(partial_completion);
}
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment