Commit 8dca1494 authored by Florian Fischer's avatar Florian Fischer
Browse files

add io_uring based client

parent c5d55623
---
BasedOnStyle: Google
ColumnLimit: 100
TabWidth: 2
UseTab: Always
Checks: >
bugprone-*,
cert-*,
linuxkernel-*,
modernize-*,
performance-*,
portability-*,
readability-*,
-bugprone-easily-swappable-parameters,
-cert-err58-cpp,
-clang-diagnostic-empty-translation-unit,
-readability-braces-around-statements,
-readability-function-cognitive-complexity,
-readability-implicit-bool-conversion,
-readability-isolate-declaration,
-readability-magic-numbers,
WarningsAsErrors: >
bugprone-*,
modernize-*,
clang-*,
readability-*,
performance-*,
HeaderFilterRegex: '(?!subprojects).*'
[*.{c,h,cpp,hpp}]
indent_style = tab
indent_size = 2
[meson.build]
indent_style = space
indent_size = 2
// Copyright Florian Fischer 2021
#include <liburing.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <atomic>
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <latch>
#include <thread>
#include <vector>
#define DIE_MSG(...) \
do { \
std::cout << __VA_ARGS__ << std::endl; \
abort(); \
} while (0);
#define DIE_MSG_ERRNO(...) DIE_MSG(__VA_ARGS__ << ": " << strerror(errno))
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
static const int DECIMAL = 10;
static auto getFirstAddrInfo(const std::string& host, const std::string& port) -> struct addrinfo* {
struct addrinfo* addrinfo;
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &addrinfo);
if (err) {
if (err == EAI_SYSTEM) {
DIE_MSG_ERRNO("getaddrinfo failed");
} else {
DIE_MSG("error in getaddrinfo: " << gai_strerror(err));
}
}
return addrinfo;
}
class Connection;
class Worker;
class Client {
friend class Worker;
friend class Connection;
const unsigned iterations;
Worker** workers;
const unsigned workerCount;
struct addrinfo* server;
const unsigned echoSize;
static const unsigned DEFAULT_ECHO_SIZE = 32;
std::atomic<bool> quit = false;
std::latch connectBarrier;
public:
Client(const std::string& host, const std::string& port, unsigned connections, unsigned time,
unsigned iterations, unsigned workerCount = std::thread::hardware_concurrency(),
unsigned echoSize = DEFAULT_ECHO_SIZE);
~Client();
};
class Worker {
friend class Client;
friend class Connection;
Client& client;
unsigned id;
unsigned finished = 0;
unsigned connectionCount;
Connection** connections;
std::thread thread;
struct io_uring ring;
public:
Worker(Client& client, unsigned id, unsigned connectionCount);
~Worker();
auto getEchos() -> unsigned;
};
class Connection {
friend class Worker;
enum class State {
connecting,
sending,
receiving,
};
State state = State::connecting;
Worker& worker;
int sock;
char* buf;
unsigned echos = 0;
void prepSend();
void onConnect(int ret);
void onSend(int ret);
auto onRecv(int ret) -> bool;
public:
struct addrinfo* server;
Connection(Worker& worker);
~Connection();
auto advance(struct io_uring_cqe* cqe) -> bool;
};
Client::Client(const std::string& host, const std::string& port, unsigned connections,
unsigned time, unsigned iterations, unsigned _workerCount, unsigned echoSize)
: iterations(iterations),
workerCount(std::min(_workerCount, connections)),
echoSize(echoSize),
connectBarrier(workerCount + 1) {
std::cout << "[global]" << std::endl;
std::cout << "clients = " << connections << std::endl;
std::cout << "size = " << echoSize << std::endl;
std::cout << "time = " << time << std::endl;
std::cout << "iterations = " << iterations << std::endl;
server = getFirstAddrInfo(host, port);
workers = new Worker*[workerCount];
for (unsigned i = 0; i < workerCount; ++i) {
unsigned workerConnections = connections / workerCount;
// TODO: improve distribution of the leftover connections
if (i == 0) {
unsigned leftOverConnections = connections % workerCount;
if (leftOverConnections) workerConnections += leftOverConnections;
}
workers[i] = new Worker(*this, i, workerConnections);
}
connectBarrier.arrive_and_wait();
if (time) {
std::jthread([this, time] {
sleep(time);
this->quit = true;
});
}
}
Client::~Client() {
unsigned echos = 0;
for (unsigned i = 0; i < workerCount; ++i) {
Worker* worker = workers[i];
worker->thread.join();
echos += worker->getEchos();
delete worker;
}
std::cout << "total_iterations = " << echos << std::endl;
delete[] workers;
}
Worker::Worker(Client& client, unsigned id, unsigned connectionCount)
: client(client), id(id), connectionCount(connectionCount) {
thread = std::thread([this] {
struct io_uring_cqe* cqe;
unsigned head, count = 0;
int ret = io_uring_queue_init(this->connectionCount, &this->ring, 0);
if (ret < 0) DIE_MSG_ERRNO("io_uring_queue_init failed");
this->connections = new Connection*[this->connectionCount];
for (unsigned i = 0; i < this->connectionCount; ++i) {
this->connections[i] = new Connection(*this);
}
ret = io_uring_submit_and_wait(&this->ring, 1);
if (ret < 0) DIE_MSG_ERRNO("io_uring_submit_and_wait failed");
unsigned connected = 0;
while (connected < this->connectionCount) {
count = 0;
io_uring_for_each_cqe(&this->ring, head, cqe) {
if (unlikely(cqe->res)) DIE_MSG("connect failed: " << strerror(-cqe->res));
++count;
}
io_uring_cq_advance(&this->ring, count);
connected += count;
}
this->client.connectBarrier.arrive_and_wait();
for (unsigned i = 0; i < this->connectionCount; ++i) this->connections[i]->prepSend();
while (!this->client.quit && this->finished != this->connectionCount) {
ret = io_uring_submit_and_wait(&this->ring, 1);
if (ret < 0) DIE_MSG_ERRNO("io_uring_submit_and_wait failed");
count = 0;
io_uring_for_each_cqe(&this->ring, head, cqe) {
// NOLINTNEXTLINE(performance-no-int-to-ptr)
auto* connection = reinterpret_cast<Connection*>(cqe->user_data);
count++;
if (connection->advance(cqe)) ++finished;
}
io_uring_cq_advance(&this->ring, count);
}
});
}
Worker::~Worker() {
for (unsigned i = 0; i < connectionCount; ++i) {
delete connections[i];
}
delete[] connections;
io_uring_queue_exit(&ring);
}
auto Worker::getEchos() -> unsigned {
unsigned echos = 0;
for (unsigned i = 0; i < connectionCount; ++i)
echos += connections[i]->echos;
return echos;
}
Connection::Connection(Worker& worker) : worker(worker) {
buf = new char[worker.client.echoSize];
struct addrinfo* server = worker.client.server;
sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
if (sock < 0) DIE_MSG_ERRNO("creating socket failed");
struct io_uring_sqe* sqe = io_uring_get_sqe(&worker.ring);
assert(sqe);
// All initial connect requests are submitted in teh Worker constructor
io_uring_prep_connect(sqe, sock, server->ai_addr, server->ai_addrlen);
io_uring_sqe_set_data(sqe, this);
}
Connection::~Connection() {
close(sock);
delete[] buf;
}
void Connection::prepSend() {
state = State::sending;
struct io_uring_sqe* sqe = io_uring_get_sqe(&worker.ring);
assert(sqe);
io_uring_prep_send(sqe, sock, buf, worker.client.echoSize, MSG_NOSIGNAL);
io_uring_sqe_set_data(sqe, this);
}
void Connection::onConnect(int ret) {
if (unlikely(ret < 0)) DIE_MSG("connect failed: " << strerror(-ret));
prepSend();
}
void Connection::onSend(int ret) {
if (unlikely(ret < 0)) DIE_MSG("send failed: " << strerror(-ret));
state = State::receiving;
struct io_uring_sqe* sqe = io_uring_get_sqe(&worker.ring);
assert(sqe);
sprintf(buf, "%u:%u", worker.id, echos);
io_uring_prep_recv(sqe, sock, buf, worker.client.echoSize, MSG_WAITALL);
io_uring_sqe_set_data(sqe, this);
}
auto Connection::onRecv(int ret) -> bool {
if (unlikely(ret <= 0)) {
if (ret == 0) DIE_MSG("socket closed by server");
DIE_MSG("recv failed: " << strerror(-ret));
}
++echos;
if (echos == worker.client.iterations) return true;
prepSend();
return false;
}
auto Connection::advance(struct io_uring_cqe* cqe) -> bool {
switch (state) {
case State::connecting:
onConnect(cqe->res);
return false;
case State::sending:
onSend(cqe->res);
return false;
case State::receiving:
return onRecv(cqe->res);
default:
DIE_MSG("unhandeld connection state");
}
}
static void printUsage(const char* progname) {
std::cout << progname << " - io_uring base tcp echo client" << std::endl;
std::cout << progname << " [options]" << std::endl;
std::cout << "\t-h | --help: print this help message" << std::endl;
std::cout << "\t-a | --host: host to connect to" << std::endl;
std::cout << "\t-p | --port: port to connect to" << std::endl;
std::cout << "\t-i | --iterations: echos issued per connection" << std::endl;
std::cout << "\t-c | --connections: concurrent connections" << std::endl;
std::cout << "\t-t | --time: time the client should send echos" << std::endl;
}
auto main(int argc, char* argv[]) -> int {
std::string host = "localhost";
std::string port = "12345";
unsigned connections = 500;
unsigned iterations = 0;
unsigned time = 0;
for (int i = 1; i < argc; ++i) {
if (strncmp(argv[i], "-h", strlen("-h")) == 0 ||
strncmp(argv[i], "--help", strlen("--help")) == 0) {
printUsage(argv[0]);
return EXIT_SUCCESS;
}
if (strncmp(argv[i], "-a", strlen("-a")) == 0 ||
strncmp(argv[i], "--host", strlen("--host")) == 0) {
host = std::string(argv[i + 1]);
++i;
} else if (strncmp(argv[i], "-p", strlen("-p")) == 0 ||
strncmp(argv[i], "--port", strlen("--port")) == 0) {
port = std::string(argv[i + 1]);
++i;
} else if (strncmp(argv[i], "-i", strlen("-i")) == 0 ||
strncmp(argv[i], "--iterations", strlen("--iterations")) == 0) {
iterations = std::strtol(argv[i + 1], nullptr, DECIMAL);
++i;
} else if (strncmp(argv[i], "-c", strlen("-c")) == 0 ||
strncmp(argv[i], "--connections", strlen("--connections")) == 0) {
connections = std::strtol(argv[i + 1], nullptr, DECIMAL);
++i;
} else if (strncmp(argv[i], "-t", strlen("-t")) == 0 ||
strncmp(argv[i], "--time", strlen("--time")) == 0) {
time = std::strtol(argv[i + 1], nullptr, DECIMAL);
++i;
}
}
if (time && iterations) DIE_MSG("time and iteration are mutual exclusive termination conditions");
if (!time && !iterations) time = 60;
std::cout << "echo client connecting to " << host << ":" << port << std::endl;
Client client(host, port, connections, time, iterations);
return EXIT_SUCCESS;
}
build/compile_commands.json
\ No newline at end of file
project('io_uring_echo_client', 'cpp',
version: '0.0.1',
default_options : [
'warning_level=3',
'c_std=gnu11',
'cpp_std=c++2a',
'b_ndebug=if-release',
'werror=true',
]
)
thread_dep = dependency('threads')
liburing_version = '2.0'
uring_dep = dependency('liburing', version: liburing_version, required: false)
if not uring_dep.found()
message('liburing ' + liburing_version + ' not found in system, using liburing subproject')
liburing_sp = subproject('liburing')
uring_dep = liburing_sp.get_variable('uring').as_system()
endif
dependencies = [thread_dep, uring_dep]
client = executable(
'client',
'client.cpp',
dependencies: dependencies,
)
[wrap-file]
directory = liburing-liburing-2.1
source_url = https://github.com/axboe/liburing/archive/refs/tags/liburing-2.1.tar.gz
source_filename = liburing-2.1.tar.gz
source_hash = f1e0500cb3934b0b61c5020c3999a973c9c93b618faff1eba75aadc95bb03e07
patch_filename = liburing_2.1-1_patch.zip
patch_url = https://wrapdb.mesonbuild.com/v2/liburing_2.1-1/get_patch
patch_hash = aaeb2be1f5eacf4f41e0537aa02154486e0729a3395d2e832de7657ab0b56290
[provide]
uring = uring
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment