// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Fischer
#include "io/IoContext.hpp"

#include <liburing.h>						// for io_uring_get_sqe, io_uring_submit
#include <liburing/io_uring.h>	// for io_uring_cqe
#include <pthread.h>						// for pthread_create, pthread_t
#include <sys/eventfd.h>				// for eventfd
#include <sys/socket.h>					// for socklen_t, recv, send, MSG_DON...
#include <sys/types.h>					// for off_t, ssize_t
#include <unistd.h>							// for close

#include <atomic>		// for atomic
#include <cassert>	// for assert
#include <cerrno>		// for errno, EAGAIN, EWOULDBLOCK, EINTR
#include <cstdio>		// for perror
#include <cstdlib>	// for exit
#include <memory>		// for make_unique, unique_ptr
#include <sstream>	// for stringstream
#include <vector>		// for push_back

#include "CallerEnvironment.hpp"	// for CallerEnvironment
#include "Common.hpp"							// for DIE, unlikely
#include "Debug.hpp"							// for LogSubsystem, LogSubsyst...
#include "Emper.hpp"							// for IO_POLLING_IO_THREAD, DEBUG
#include "Runtime.hpp"						// for Runtime::getRuntime
#include "emper-config.h"					// IWYU pragma: keep
#include "io/Future.hpp"					// for emper::io::Future
#include "io/Operation.hpp"				// for emper::io::Operation
#include "io/Stats.hpp"						// for emper::io::Stats

#ifndef EMPER_LOG_OFF
#include <ostream>	// for basic_ostream::operator<<, operator<<
#endif

namespace emper::io {
thread_local IoContext *IoContext::workerIo = nullptr;

pthread_t IoContext::globalCompleter;
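// globalCompleter is the thread started by startGlobalCompleter(). It blocks on the
// global ring, which only carries eventfd reads for the worker rings, and reaps a
// worker's completions (CallerEnvironment::ANYWHERE) whenever that worker's
// ring_eventfd becomes readable.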

auto IoContext::prepareFutureChain(Future &future, bool recursion) -> int {
	int prepared = 0;
	if (future.dependency) {
		prepared += prepareFutureChain(*future.dependency, true);
	}

	if constexpr (emper::DEBUG) {
		reqs_in_uring++;
		uringFutureSet.insert(&future);
	}

	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
	// The kernel consumes all sqes from the SQ during io_uring_enter.
	// If we can't get an sqe the chain was too long for our SQ.
	assert(sqe);

	future.prepareSqe(sqe);
	io_uring_sqe_set_data(sqe, &future);

	if (recursion) {
		sqe->flags |= IOSQE_IO_LINK;
	}

	return prepared + 1;
}
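
// Sketch of how a chain ends up in the SQ (how a dependency is attached to a Future
// is Future.hpp's business; the two-element chain is only illustrative):
//
//   Future second depends on Future first
//   prepareFutureChain(second)
//     -> prepareFutureChain(first, true)   // sqe #1, gets IOSQE_IO_LINK
//     -> sqe #2 for second, no link flag
//
// io_uring starts sqe #2 only after the linked sqe #1 has completed.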

void IoContext::submit(Future &future) {
	LOGD("submitting " << future.op << " Future " << &future
										 << (future.dependency ? " and it's dependencies" : ""));
#ifdef NDEBUG
	UNUSED_ARG
#endif
	int prepared = prepareFutureChain(future);

	// submit the Future to the io_uring
	int submitted = io_uring_submit(&ring);

	// We can't submit our sqes because the CQ is full:
	// in a worker thread -> reapCompletions
	// in the globalCompleter thread -> TODO: deterministically handle a full global CQ
	// For now we hope a jam does not happen or resolves itself.
	if (unlikely(submitted == -EBUSY)) {
		if constexpr (emper::DEBUG) {
			std::stringstream sst;
			sst << "io_submit returned EBUSY trying to submit in addition to " << reqs_in_uring
					<< std::endl;
			logI(sst.str());
		} else {
			logI("io_submit returned EBUSY");
		}

		TIME_NS(
				{
					do {
						reapCompletions();
					} while ((submitted = io_uring_submit(&ring)) == -EBUSY);
				},
				stats.record_io_submit_full_cq);
	}

	if (unlikely(submitted < 0)) {
		errno = -submitted;
		DIE_MSG_ERRNO("io_uring_submit failed");
	}

	// TODO: handle submit errors in Future chains
	// req1 -> invalid_req -> req3
	// will submit only 2 instead of all 3 prepared sqes
	// See: https://github.com/axboe/liburing/issues/186
	assert(submitted == prepared);

	// io_uring will try to synchronously complete any IO request before
	// offloading it to the async backend. See io_uring_enter(2).
	// Immediate offloading can be enforced by setting the IOSQE_ASYNC flag in the sqe.
	// Try to reap a possible synchronous completion.
	reapCompletions();
}
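
// Minimal usage sketch (mirrors what the *AndWait helpers below do; how a caller
// obtains its worker IoContext is outside this file):
//
//   RecvFuture future(sock, buf, len, 0);
//   io->submit(future);
//   ssize_t res = future.waitAndSetErrno();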

template <CallerEnvironment callerEnvironment>
void IoContext::reapCompletions() {
	// Someone else is currently reaping completions
	if (unlikely(!cq_mutex.try_lock())) {
		return;
	}

	// never reap completions on the global IoContext
	assert(this != getGlobalIo());

	LOGD("Reaping completions");
	unsigned head;
	struct io_uring_cqe *cqe;
	unsigned count = 0;

	int err = io_uring_peek_cqe(&ring, &cqe);
	if (err) {
		if (err == -EAGAIN) {
			goto unlock;
		}
		errno = -err;
		DIE_MSG_ERRNO("io_uring_peek_cqe failed");
	}

	io_uring_for_each_cqe(&ring, head, cqe) {
		count++;

		auto *future = reinterpret_cast<Future *>(io_uring_cqe_get_data(cqe));
		assert(future);

		if constexpr (emper::DEBUG) {
			// assert that the future was previously in the uringFutureSet
			assert(uringFutureSet.erase(future) > 0);
		}

		future->complete<callerEnvironment>(cqe->res);
	}

	LOGD("got " << count << " cqes from the io_uring");
	io_uring_cq_advance(&ring, count);
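	// Note: io_uring_for_each_cqe only iterates the ready cqes without consuming them;
	// the single io_uring_cq_advance() above marks all of them as seen at once instead
	// of calling io_uring_cqe_seen() per cqe.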

	if constexpr (emper::DEBUG) {
		assert(count <= reqs_in_uring);
		reqs_in_uring -= count;
	}

	if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
		stats.record_worker_reaps(count);
	} else {
		stats.record_completer_reaps(count);
	}

unlock:
	cq_mutex.unlock();
}

// Show the compiler our template instantiations
template void IoContext::reapCompletions<CallerEnvironment::ANYWHERE>();
template void IoContext::reapCompletions<CallerEnvironment::EMPER>();

// The globalCompleter must be started after all worker io_uring eventfds are submitted
// so we don't have to synchronize the global sq
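// Startup ordering as implemented in this file: every worker calls submit_efd(), which
// only prepares an eventfd-read sqe in the global SQ; startGlobalCompleter() then
// spawns this thread, whose single initial io_uring_submit() pushes all those sqes.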
auto IoContext::globalCompleterFunc(void *arg) -> void * {
	IoContext &io = *reinterpret_cast<IoContext *>(arg);

	io.logD("submit all worker io_uring eventfds");

	// submit all eventfds in the SQ inserted by IoContext::submit_efd calls
	int submitted = io_uring_submit(&io.ring);
	if (unlikely(submitted < 0)) {
		errno = -submitted;
		DIE_MSG_ERRNO("initial global io_uring submit failed");
	}

	// We have submitted all eventfds
	assert(submitted == Runtime::getRuntime()->getWorkerCount());

	io.logD("start global completer loop");

	while (true) {
		struct io_uring_cqe *cqe;

		// wait for completions
		int err = io_uring_wait_cqe(&io.ring, &cqe);
		if (unlikely(err)) {
			if (err == -EINTR) {
				continue;
			}
			errno = -err;
			DIE_MSG_ERRNO("io_uring_wait_cqe failed");
		}

		auto *workerIo = reinterpret_cast<IoContext *>(io_uring_cqe_get_data(cqe));
		assert(workerIo);

		io_uring_cqe_seen(&io.ring, cqe);

		// re-add the eventfd read
		struct io_uring_sqe *sqe = io_uring_get_sqe(&io.ring);
		// we initialized the global ring big enough that we should always get a free sqe
		assert(sqe);

		io_uring_prep_read(sqe, workerIo->ring_eventfd, &workerIo->ring_eventfd_readbuf,
											 sizeof(workerIo->ring_eventfd_readbuf), 0);
		io_uring_sqe_set_data(sqe, reinterpret_cast<void *>(workerIo));

		submitted = io_uring_submit(&io.ring);

		if (unlikely(submitted < 0)) {
			errno = -submitted;
			DIE_MSG_ERRNO("re-submitting eventfd read to global_ring failed");
		}

		assert(submitted == 1);

		workerIo->reapCompletions<CallerEnvironment::ANYWHERE>();
	}

	return nullptr;
}

void IoContext::startGlobalCompleter(IoContext &globalIo) {
	int err = pthread_create(&globalCompleter, nullptr, globalCompleterFunc, &globalIo);
	if (unlikely(err)) {
		errno = err;
		DIE_MSG_ERRNO("Creating global completer thread failed");
	}
}

IoContext::IoContext(size_t uring_entries) {
	unsigned flags = 0;
	if constexpr (emper::IO_URING_SQPOLL) {
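		// With IORING_SETUP_SQPOLL a kernel thread polls the SQ, so submissions
		// usually need no io_uring_enter() syscall.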
		// check if SQPOLL with unregistered files is supported by the kernel
		static int support = -1;

		if (support == -1) {
			support = compareKernelVersion("5.11") >= 0 ? 1 : 0;
		}

		if (!support) {
			DIE_MSG("SQPOLL with un-registered files only supportet since linux 5.11");
		}

		flags |= IORING_SETUP_SQPOLL;
	}

	auto ret = io_uring_queue_init(uring_entries, &ring, flags);
	if (ret < 0) {
		// skip tests if we are not able to set up an io_uring
		perror("io_uring_queue_init failed");
		exit(77);
	}
	LOGD("Ring fd is " << ring.ring_fd);

	ring_eventfd = eventfd(0, 0);
	if (unlikely(ring_eventfd < 0)) {
		DIE_MSG_ERRNO("creating eventfd for io_uring failed");
	}

	if (unlikely(io_uring_register_eventfd(&ring, ring_eventfd) < 0)) {
		DIE_MSG_ERRNO("io_uring_register_eventfd failed");
	}
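	// With the eventfd registered the kernel signals ring_eventfd whenever a cqe is
	// posted to this ring; the globalCompleter's read on that fd (see submit_efd() and
	// globalCompleterFunc()) then completes and triggers a reap on our behalf.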

	if constexpr (emper::STATS) {
		Stats::workerStats.push_back(&stats);
	}
}

// This function must not be executed in parallel because it does not synchronize
// the global io_uring's SQ
void IoContext::submit_efd() {
	// the global IoContext must be initialized to submit our eventfd
	assert(getGlobalIo());

	IoContext &globalIo = *getGlobalIo();

	struct io_uring_sqe *sqe = io_uring_get_sqe(&globalIo.ring);
	// we initialized the global ring big enough that we should always get a free sqe
	assert(sqe);

	io_uring_prep_read(sqe, ring_eventfd, &ring_eventfd_readbuf, sizeof(ring_eventfd_readbuf), 0);
	io_uring_sqe_set_data(sqe, this);

	// The sqe we prepared will be submitted to io_uring when the globalCompleter starts.
}

IoContext::~IoContext() {
	io_uring_queue_exit(&ring);
	// TODO: check if this is safe
	::close(ring_eventfd);	// close(2); an unqualified close() would name IoContext::close
}

auto IoContext::connect(int socket, const struct sockaddr *address, socklen_t address_len)
		-> std::unique_ptr<Future> {
	auto future = std::make_unique<ConnectFuture>(socket, address, address_len);
	submit(*future);
	return future;
}

auto IoContext::accept(int socket, struct sockaddr *address, socklen_t *address_len)
		-> std::unique_ptr<Future> {
	auto future = std::make_unique<AcceptFuture>(socket, address, address_len);
	submit(*future);
	return future;
}

auto IoContext::recv(int socket, void *buffer, size_t length, int flags)
		-> std::unique_ptr<Future> {
	auto future = std::make_unique<RecvFuture>(socket, buffer, length, flags);
	submit(*future);
	return future;
}

auto IoContext::send(int socket, const void *buffer, size_t length, int flags)
		-> std::unique_ptr<Future> {
	auto future = std::make_unique<SendFuture>(socket, buffer, length, flags);
	submit(*future);
	return future;
}

auto IoContext::readFile(int fildes, void *buf, size_t nbyte, off_t offset)
		-> std::unique_ptr<Future> {
	auto future = std::make_unique<ReadFuture>(fildes, buf, nbyte, offset);
	submit(*future);
	return future;
}

auto IoContext::writeFile(int fildes, const void *buf, size_t nbyte, off_t offset)
		-> std::unique_ptr<Future> {
	auto future = std::make_unique<WriteFuture>(fildes, buf, nbyte, offset);
	submit(*future);
	return future;
}

auto IoContext::close(int fildes) -> std::unique_ptr<Future> {
	auto future = std::make_unique<CloseFuture>(fildes);
	submit(*future);
	return future;
}

auto IoContext::connectAndWait(int socket, const struct sockaddr *address, socklen_t address_len)
		-> int {
	ConnectFuture future(socket, address, address_len);
	submit(future);
	return future.waitAndSetErrno();
}

auto IoContext::acceptAndWait(int socket, struct sockaddr *address, socklen_t *address_len) -> int {
	AcceptFuture future(socket, address, address_len);
	submit(future);
	return future.waitAndSetErrno();
}

auto IoContext::recvAndWait(int socket, void *buffer, size_t length, int flags) -> ssize_t {
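	// Fast path: with IO_TRY_SYSCALL try a non-blocking recv(2) first and only fall
	// back to an io_uring RecvFuture if the socket has no data available yet.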
	if constexpr (emper::IO_TRY_SYSCALL) {
		ssize_t res = ::recv(socket, buffer, length, flags | MSG_DONTWAIT);
		if (res != -1 || errno != EAGAIN) {
			stats.record_completion(Operation::RECV, res, length, true);
			return res;
		}
	}

	RecvFuture future(socket, buffer, length, flags);
	submit(future);
	return future.waitAndSetErrno();
}

auto IoContext::sendAndWait(int socket, const void *buffer, size_t length, int flags) -> ssize_t {
	void *mut_buf = const_cast<void *>(buffer);
	if constexpr (emper::IO_TRY_SYSCALL) {
		ssize_t res = ::send(socket, mut_buf, length, flags | MSG_DONTWAIT);
		if (res != -1 || errno != EAGAIN) {
			stats.record_completion(Operation::SEND, res, length, true);
			return res;
		}
	}

	SendFuture future(socket, mut_buf, length, flags);
	submit(future);
	return future.waitAndSetErrno();
}

auto IoContext::readFileAndWait(int fildes, void *buf, size_t nbyte, off_t offset) -> ssize_t {
	ReadFuture future(fildes, buf, nbyte, offset);
	submit(future);
	return future.waitAndSetErrno();
}

auto IoContext::writeFileAndWait(int fildes, const void *buf, size_t nbyte, off_t offset)
		-> ssize_t {
	void *mut_buf = const_cast<void *>(buf);
	WriteFuture future(fildes, mut_buf, nbyte, offset);
	submit(future);
	return future.waitAndSetErrno();
}

auto IoContext::closeAndWait(int fildes) -> ssize_t {
	CloseFuture future(fildes);
	submit(future);
	return future.waitAndSetErrno();
}
}	 // namespace emper::io