// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Fischer
#pragma once

5
#include <liburing.h>
6
7
8
#include <sys/socket.h>
#include <sys/types.h>

9
#include <cerrno>
10
11
12
13
14
15
#include <cstddef>
#include <cstdint>

#include "BinaryPrivateSemaphore.hpp"
#include "CallerEnvironment.hpp"
#include "Debug.hpp"
16
#include "Emper.hpp"
17
18
#include "io/Operation.hpp"

19
struct __kernel_timespec;
20
21
22
23
24
25
26
27
28

namespace emper::io {

/*
 * @brief Future representing an IO request which can be awaited
 */
/**
 * @brief Future representing an IO request which can be awaited
 *
 * A Future bundles the operation type, its raw arguments, the semaphore
 * signaled on completion and an optional dependency that is linked in front
 * of this request. Concrete operations derive from Future and implement
 * prepareSqe().
 */
class Future : public Logger<LogSubsystem::IO> {
	friend class IoContext;

 protected:
	/** Semaphore reset by reset(); per the documentation of complete() it is signaled on completion */
	BPS sem;

	/** IO operation to perform */
	Operation op;

	/* Arguments for the IO operation; their meaning depends on the concrete Future */
	int fd;
	void* buf;
	size_t len;
	int offsetOrFlags;

	/**
	 * Return value of the operation.
	 * __s32 is used by liburing for the res field in its cqe struct
	 * This value is returned by Future::wait() and can only differentiate between
	 * failure or full success.
	 */
	int32_t returnValue = 0;

	// Dependency futures will be submitted to the io_uring before
	// this and will be linked to this IO request using IOSQE_IO_LINK
	Future* dependency = nullptr;

	/**
	 * @brief prepare a sqe for this IO operation
	 *
	 * @param sqe The submission queue entry to fill in for this request
	 */
	virtual void prepareSqe(struct io_uring_sqe* sqe) = 0;

	/**
	 * @brief Try to complete this IO request using the result of an IO operation.
	 *        If the IO request is completed signal its semaphore otherwise resubmit it.
	 *
	 * @param res Result from an IO operation from a io_uring_cqe or a common syscall
	 * @param syscall Boolean to indicate if the result originated from a syscall or io_uring
	 */
	template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
	void complete(int32_t res, bool syscall = false);

	/**
	 * @brief Store the operation and its raw arguments
	 *
	 * Protected because only the concrete Future classes know how to map their
	 * arguments onto the generic fd/buf/len/offsetOrFlags slots.
	 */
	Future(Operation op, int fd, void* buf, size_t len, int offsetOrFlags)
			: op(op), fd(fd), buf(buf), len(len), offsetOrFlags(offsetOrFlags) {}

 public:
	virtual ~Future() = default;

	/**
	 * @brief reset the Future
	 *
	 * Resetting a Future is useful if we want to use the same Future object
	 * multiple times.
	 * A Future for a specific reoccurring IO operation can be created and reused
	 * in a submit, reset loop.
	 */
	void reset() {
		LOGD("Resetting Future");

		sem.reset();
	}

	/**
	 * @brief add Future as dependency
	 *
	 * Dependencies which must complete before this future.
	 * This is assured by linking the futures' sqes in the io_uring using the IOSQE_IO_LINK flag.
	 * If a request in the link fails all dependent futures will return -1 and
	 * set errno to ECANCELED
	 *
	 * Only a pointer to the dependency is stored, a later call replaces the
	 * previously set dependency.
	 *
	 * See: <a href="https://unixism.net/loti/tutorial/link_liburing.html">Liburing linking
	 * requests</a>
	 * The emper equivalent to the example from the liburing documentation.
	 *
	 * @code
	 * int fd = open(FILE_NAME, O_RDWR|O_TRUNC|O_CREAT, 0644);
	 * if (fd < 0) {
	 *   perror("open");
	 *   return 1;
	 * }
	 *
	 * const char* msg = "Hallo emper!\n";
	 * WriteFuture writeFuture(fd, msg, strlen(msg), 0);
	 *
	 * char buf[32];
	 * ReadFuture readFuture(fd, buf, sizeof(buf), 0);
	 * readFuture.addDependency(writeFuture);
	 *
	 * CloseFuture closeFuture(fd);
	 * closeFuture.addDependency(readFuture);
	 *
	 * // Blocks till all three futures are completed
	 * int32_t res = closeFuture.submitAndWait();
	 * ...
	 * @endcode
	 *
	 * @param dependency The Future which must complete before this one
	 */
	void addDependency(Future& dependency) { this->dependency = &dependency; }

	/**
	 * @brief submit Future for asynchronous completion
	 *
	 * submit must be called within the runtime because it submits the future to
	 * the current worker's IoContext
	 */
	virtual void submit();

	/**
	 * @brief Block till the IO request is completed
	 *
	 * @return return the result received from the io_uring
	 */
	auto wait() -> int32_t;

	/**
	 * @brief Block till the IO request is completed and set errno on error
	 *
	 * @return -1 on error and errno will be set, otherwise the return value of the IO request
	 */
	auto waitAndSetErrno() -> ssize_t;

	/**
	 * @brief Equivalent to calling wait() after calling submit()
	 *
	 * @return the value returned by wait()
	 */
	auto submitAndWait() -> int32_t {
		submit();
		return wait();
	}
};
155
156
157

class SendFuture : public Future {
 public:
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
	SendFuture(int socket, const void* buffer, size_t length, int flags)
			: Future(Operation::SEND, socket, const_cast<void*>(buffer), length, flags){};

	void prepareSqe(struct io_uring_sqe* sqe) {
		io_uring_prep_send(sqe, fd, buf, len, offsetOrFlags);
	};

	void submit() {
		if constexpr (emper::IO_TRY_SYSCALL) {
			ssize_t res = ::send(fd, buf, len, offsetOrFlags | MSG_DONTWAIT);

			if (res > -1 || (errno != EAGAIN && errno != EWOULDBLOCK)) {
				complete(res > 0 ? res : -errno);
			}
		}

		Future::submit();
	}
176
177
178
179
180
181
};

class RecvFuture : public Future {
 public:
	RecvFuture(int socket, void* buffer, size_t length, int flags)
			: Future(Operation::RECV, socket, buffer, length, flags){};
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197

	void prepareSqe(struct io_uring_sqe* sqe) {
		io_uring_prep_recv(sqe, fd, buf, len, offsetOrFlags);
	};

	void submit() {
		if constexpr (emper::IO_TRY_SYSCALL) {
			ssize_t res = ::recv(fd, buf, len, offsetOrFlags | MSG_DONTWAIT);

			if (res > -1 || (errno != EAGAIN && errno != EWOULDBLOCK)) {
				complete(res > 0 ? res : -errno);
			}
		}

		Future::submit();
	}
198
199
200
201
202
203
};

class ConnectFuture : public Future {
 public:
	ConnectFuture(int socket, const struct sockaddr* address, socklen_t address_len)
			: Future(Operation::CONNECT, socket, (void*)address, address_len, 0){};
204
205
206
207

	void prepareSqe(struct io_uring_sqe* sqe) {
		io_uring_prep_connect(sqe, fd, (const struct sockaddr*)buf, (socklen_t)len);
	};
208
209
210
211
212
213
};

class AcceptFuture : public Future {
 public:
	AcceptFuture(int socket, const struct sockaddr* address, socklen_t* address_len)
			: Future(Operation::ACCEPT, socket, (void*)address, (size_t)address_len, 0){};
214
215
216
217

	void prepareSqe(struct io_uring_sqe* sqe) {
		io_uring_prep_accept(sqe, fd, (struct sockaddr*)buf, (socklen_t*)len, 0);
	};
218
219
220
221
};

class ReadFuture : public Future {
 public:
222
223
224
225
226
227
	ReadFuture(int fildes, void* buf, size_t nbyte, int offset)
			: Future(Operation::READ, fildes, buf, nbyte, offset){};

	void prepareSqe(struct io_uring_sqe* sqe) {
		io_uring_prep_read(sqe, fd, buf, len, offsetOrFlags);
	};
228
229
230
231
};

class WriteFuture : public Future {
 public:
232
233
234
235
236
237
	WriteFuture(int fildes, const void* buf, size_t nbyte, int offset)
			: Future(Operation::WRITE, fildes, const_cast<void*>(buf), nbyte, offset){};

	void prepareSqe(struct io_uring_sqe* sqe) {
		io_uring_prep_write(sqe, fd, buf, len, offsetOrFlags);
	};
238
239
240
241
242
};

class CloseFuture : public Future {
 public:
	CloseFuture(int fildes) : Future(Operation::CLOSE, fildes, nullptr, 0, 0){};
243
244

	void prepareSqe(struct io_uring_sqe* sqe) { io_uring_prep_close(sqe, fd); };
245
};
246
247

/*
248
 * @brief Add a timeout to any Future not already submitted.
249
250
251
252
 *
 * A Timeout is added to a Future by inserting an dependent IOURING_OP_LINK_TIMEOUT
 * sqe after the actual IO operation.
 * The dependent timeout will not be started like usual dependent requests after
253
254
 * the previous request is completed, it is instead armed immediately when the
 * actual request is started.
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
 * See: https://lwn.net/Articles/803932/
 *
 * @code
 * ReadFuture rf(fd, buf, buf_len, 0);
 * struct __kernel_timespec ts = {.tv_sec = 0, .tv_nsec = 0);
 * TimeoutWrapper t(rf, ts);
 * // Both futures should be awaited by calling its wait method to
 * // prevent use after free problems.
 * t.submitAndWait()
 * rf.Wait()
 * @endcode
 */
class TimeoutWrapper : public Future {
 public:
	TimeoutWrapper(Future& future, struct __kernel_timespec& ts)
			: Future(Operation::LINK_TIMEOUT, 0, (void*)&ts, 0, 0) {
		addDependency(future);
	};
273
274
275
276

	void prepareSqe(struct io_uring_sqe* sqe) {
		io_uring_prep_link_timeout(sqe, (struct ::__kernel_timespec*)buf, 0);
	};
277
278
279
280
};

/*
 * @brief Arm a timeout which will signal the future when it is reached
281
282
 *
 * If the timeout was reached it will wait() will return -ETIME.
283
284
285
286
 */
class AlarmFuture : public Future {
 public:
	AlarmFuture(struct __kernel_timespec& ts) : Future(Operation::TIMEOUT, 0, (void*)&ts, 0, 0){};
287
288
289
290

	void prepareSqe(struct io_uring_sqe* sqe) {
		io_uring_prep_timeout(sqe, (struct ::__kernel_timespec*)buf, 0, 0);
	};
291
};
292
}	 // namespace emper::io