From 5f29bfc89ee5a3f4eb78b8dbd46dea7d514c191c Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fischer@muhq.space>
Date: Wed, 28 Jul 2021 15:57:41 +0200
Subject: [PATCH] [io.hpp] add blocking functions using timeouts

---
 emper/io.hpp             |  99 +++++++++++++++++++++++++++++
 tests/io/TimeoutTest.cpp | 133 +++++++++++++++++++++++++++++++++++++++
 tests/io/meson.build     |   8 +++
 3 files changed, 240 insertions(+)
 create mode 100644 tests/io/TimeoutTest.cpp

diff --git a/emper/io.hpp b/emper/io.hpp
index de7203bf..9cd813d7 100644
--- a/emper/io.hpp
+++ b/emper/io.hpp
@@ -65,6 +65,28 @@ inline auto recvAndWait(int socket, void *buffer, size_t length, int flags) -> s
 	return future.waitAndSetErrno();
 }
 
+/**
+ * @brief Blocking recv mimicking POSIX recv(3) with timeout
+ *
+ * This method must be called from inside the emper runtime because it uses
+ * the worker-local IoContext
+ *
+ * @param socket file descriptor for this operation
+ * @param buffer destination buffer
+ * @param length length of the message in bytes
+ * @param flags type of message reception
+ * @param ts Time to wait before aborting the request
+
+ * @return -1 on error, 0 when receiving from a closed socket, otherwise the received bytes
+ */
+inline auto recvAndTryWait(int socket, void *buffer, size_t length, int flags,
+													 TimeoutWrapper::Timespec &ts) -> ssize_t {
+	RecvFuture future(socket, buffer, length, flags);
+	TimeoutWrapper wrapper(future, ts);
+	wrapper.submitAndWait();
+	return future.waitAndSetErrno();
+}
+
 /**
  * @brief Non-blocking send mimicking POSIX send(3)
  *
@@ -110,6 +132,31 @@ inline auto sendAndWait(int socket, const void *buffer, size_t length, int flags
 	return future.waitAndSetErrno();
 }
 
+/**
+ * @brief Blocking send mimicking POSIX send(3) with timeout
+ *
+ * This method must be called from inside the emper runtime because it uses
+ * the worker-local IoContext
+ *
+ * @param socket file descriptor for this operation
+ * @param buffer source buffer
+ * @param length length of the message in bytes
+ * @param flags type of message transmission
+ * @param ts Time to wait before aborting the request
+ * @param send_all If true return to the user only if all bytes are sent or
+ *        sending further bytes is not possible
+ *
+ * @return -1 on error, otherwise the number of sent bytes
+ */
+inline auto sendAndTryWait(int socket, const void *buffer, size_t length, int flags,
+													 TimeoutWrapper::Timespec &ts, bool send_all = true) -> ssize_t {
+	void *mut_buf = const_cast<void *>(buffer);
+	SendFuture future(socket, mut_buf, length, flags, send_all);
+	TimeoutWrapper wrapper(future, ts);
+	wrapper.submitAndWait();
+	return future.waitAndSetErrno();
+}
+
 /**
  * @brief Non-blocking connect mimicking POSIX connect(3)
  *
@@ -253,6 +300,30 @@ inline auto readFileAndWait(int fildes, void *buf, size_t nbyte, off_t offset =
 	return future.waitAndSetErrno();
 }
 
+/**
+ * @brief Blocking read for regular files mimicking POSIX recv(3) with timeout
+ *
+ * This method must be called from inside the emper runtime because it uses
+ * the worker-local IoContext
+ *
+ * @param fildes file descriptor to the regular file to be written to
+ * @param buf destination buffer
+ * @param nbyte amount of bytes to read
+ * @param offset offset in the file
+ * @param ts Time to wait before aborting the request
+ * @param read_all If true return only to the user if the buffer is completely
+ *        filled or no further data can be read.
+ *
+ * @return -1 on error, 0 when receiving from a closed socket, otherwise the received bytes
+ */
+inline auto readFileAndTryWait(int fildes, void *buf, size_t nbyte, TimeoutWrapper::Timespec &ts,
+															 off_t offset = -1, bool read_all = false) -> ssize_t {
+	ReadFuture future(fildes, buf, nbyte, offset, read_all);
+	TimeoutWrapper wrapper(future, ts);
+	wrapper.submitAndWait();
+	return future.waitAndSetErrno();
+}
+
 /**
  * @brief Non-blocking write for regular files mimicking POSIX write(3)
  *
@@ -303,6 +374,34 @@ inline auto writeFileAndWait(int fildes, const void *buf, size_t nbyte, off_t of
 	return future.waitAndSetErrno();
 }
 
+/**
+ * @brief Blocking write for regular files mimicking POSIX write(3) with timeout
+ *
+ * Currently only writing to regular files is tested and supported.
+ * Writing to other file types could work but may result in undefined behavior.
+ *
+ * This method must be called from inside the emper runtime because it uses
+ * the worker-local IoContext
+ *
+ * @param fildes file descriptor to the regular file to be written to
+ * @param buf source buffer
+ * @param nbyte amount of bytes to write
+ * @param ts Time to wait before aborting the request
+ * @param offset offset in the file
+ * @param write_all If true return only to the user if all nbyte were written
+ *        or no further data can be written.
+ *
+ * @return -1 on error, otherwise the number of bytes written
+ */
+inline auto writeFileAndTryWait(int fildes, const void *buf, size_t nbyte,
+																TimeoutWrapper::Timespec &ts, off_t offset = -1,
+																bool write_all = true) -> ssize_t {
+	WriteFuture future(fildes, buf, nbyte, offset, write_all);
+	TimeoutWrapper wrapper(future, ts);
+	wrapper.submitAndWait();
+	return future.waitAndSetErrno();
+}
+
 /**
  * @brief Non-blocking writev mimicking POSIX writev(3)
  *
diff --git a/tests/io/TimeoutTest.cpp b/tests/io/TimeoutTest.cpp
new file mode 100644
index 00000000..9e9f7d3f
--- /dev/null
+++ b/tests/io/TimeoutTest.cpp
@@ -0,0 +1,133 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2021 Florian Fischer
+#include <netinet/in.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include <cassert>
+#include <cerrno>
+#include <cstdint>
+#include <cstring>
+
+#include "Common.hpp"
+#include "CountingPrivateSemaphore.hpp"
+#include "Debug.hpp"
+#include "Future.hpp"
+#include "emper.hpp"
+#include "io.hpp"
+
+using emper::io::ReadFuture;
+using emper::io::TimeoutWrapper;
+
+static void setupSockPair(int& sock1, int& sock2) {
+	const int PORT = 4242;
+
+	struct sockaddr_in addr;
+	memset(&addr, 0, sizeof(addr));
+
+	addr.sin_family = AF_INET;
+	addr.sin_addr.s_addr = INADDR_ANY;
+	addr.sin_port = htons(PORT);
+
+	int listen_sock = socket(AF_INET, SOCK_STREAM, 0);
+	if (listen_sock == -1) DIE_MSG_ERRNO("creating listen socket failed");
+
+	int reuseaddr = 1;
+	if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1)
+		DIE_MSG_ERRNO("setsockopt failed");
+
+	if (bind(listen_sock, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) == -1)
+		DIE_MSG_ERRNO("bind failed");
+
+	if (listen(listen_sock, 1) != 0) DIE_MSG_ERRNO("listen failed");
+
+	sock2 = socket(AF_INET, SOCK_STREAM, 0);
+	if (sock2 == -1) DIE_MSG_ERRNO("creating client socket failed");
+
+	CPS cps;
+	spawn(
+			[&]() {
+				if ((sock1 = emper::io::acceptAndWait(listen_sock, nullptr, nullptr)) == -1)
+					DIE_MSG_ERRNO("accept failed");
+			},
+			cps);
+
+	if (emper::io::connectAndWait(sock2, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) ==
+			-1)
+		DIE_MSG_ERRNO("connect failed");
+
+	cps.wait();
+
+	emper::io::closeAndForget(listen_sock);
+}
+
+void sockTest() {
+	int sock1, sock2;
+	DBG("setup sockets");
+	setupSockPair(sock1, sock2);
+
+	uint64_t recvBuf;
+	DBG("submit recv");
+	TimeoutWrapper::Timespec ts = {.tv_sec = 1, .tv_nsec = 0};
+	ssize_t res = emper::io::recvAndTryWait(sock1, &recvBuf, sizeof(recvBuf), 0, ts);
+	assert(res == -1);
+	assert(errno == ECANCELED);
+
+	// TODO: find a way to test sendAndTryWait
+	// // allocate a huge buffer which is surely bigger then the sockets buffer and
+	// // thus causing the send to block triggering the timeout
+	// const ssize_t MEMB = 1 << 20;
+	// auto* sendBuf = new char[MEMB];
+	// DBG("submit send");
+	// res = emper::io::sendAndTryWait(sock1, &sendBuf, MEMB, 0, ts, true);
+	// assert(res == -1);
+	// assert(errno == ECANCELED);
+	// delete[] sendBuf;
+	emper::io::closeAndForget(sock1);
+	emper::io::closeAndForget(sock2);
+}
+
+void readTest() {
+	int efd = eventfd(0, EFD_SEMAPHORE);
+	if (efd == -1) {
+		DIE_MSG_ERRNO("eventfd failed");
+	}
+
+	uint64_t readBuf;
+	TimeoutWrapper::Timespec ts = {.tv_sec = 1, .tv_nsec = 0};
+	ssize_t res = emper::io::readFileAndTryWait(efd, &readBuf, sizeof(readBuf), ts);
+	assert(res == -1);
+	assert(errno == ECANCELED);
+
+	emper::io::closeAndForget(efd);
+}
+
+void writeTest() {
+	int efd = eventfd(0, EFD_SEMAPHORE);
+	if (efd == -1) {
+		DIE_MSG_ERRNO("eventfd failed");
+	}
+
+	// fill up the eventfd
+	uint64_t writeBuf = 0xfffffffffffffffe;
+	if (emper::io::writeFileAndWait(efd, &writeBuf, sizeof(writeBuf)) == -1) {
+		DIE_MSG("eventfd prep write failed");
+	}
+
+	writeBuf = 1;
+	TimeoutWrapper::Timespec ts = {.tv_sec = 1, .tv_nsec = 0};
+	ssize_t res = emper::io::writeFileAndTryWait(efd, &writeBuf, sizeof(writeBuf), ts);
+	assert(res == -1);
+	// write requests can't be canceled when in execution so this
+	// will return as interupted
+	assert(errno == EINTR);
+
+	emper::io::closeAndForget(efd);
+}
+
+void emperTest() {
+	sockTest();
+	readTest();
+	writeTest();
+}
diff --git a/tests/io/meson.build b/tests/io/meson.build
index ab25a0aa..234a6016 100644
--- a/tests/io/meson.build
+++ b/tests/io/meson.build
@@ -48,6 +48,14 @@ tests += [
 		'test_runner': 'io',
 	},
 
+	{
+		'source': files('TimeoutTest.cpp'),
+		'name': 'TimeoutTest',
+		'description': 'Test io.hpp functions using timeouts',
+		'test_suite': 'io',
+		'test_runner': 'io',
+	},
+
 	{
 		'source': files('TimeoutWrapperTest.cpp'),
 		'name': 'TimeoutWrapperTest',
-- 
GitLab