Commit 96f291b7 authored by Florian Fischer's avatar Florian Fischer
Browse files

clients/io_uring: limit concurrent connect requests

To not flood a servers listen socket we limit the amount of
concurrent connect requests to a configurable backlog value.
By default backlog is 4096.
parent 20af5d26
Pipeline #72855 passed with stage
in 1 minute and 15 seconds
......@@ -81,7 +81,8 @@ class Client {
public:
Client(const std::string& host, const std::string& port, unsigned connections, unsigned time,
unsigned iterations, unsigned workerCount = std::thread::hardware_concurrency(),
unsigned iterations, unsigned backlog,
unsigned workerCount = std::thread::hardware_concurrency(),
unsigned echoSize = DEFAULT_ECHO_SIZE);
~Client();
......@@ -102,7 +103,7 @@ class Worker {
struct io_uring ring;
public:
Worker(Client& client, unsigned id, unsigned connectionCount);
Worker(Client& client, unsigned id, unsigned connectionCount, unsigned concurrentWorkerConnects);
~Worker();
};
......@@ -136,8 +137,6 @@ class Connection {
auto onRecv(int ret) -> bool;
public:
struct addrinfo* server;
Connection(Worker& worker);
~Connection();
......@@ -145,7 +144,8 @@ class Connection {
};
Client::Client(const std::string& host, const std::string& port, unsigned connections,
unsigned time, unsigned iterations, unsigned _workerCount, unsigned echoSize)
unsigned time, unsigned iterations, unsigned backlog, unsigned _workerCount,
unsigned echoSize)
: connections(connections),
iterations(iterations),
workerCount(std::min(_workerCount, connections)),
......@@ -160,13 +160,16 @@ Client::Client(const std::string& host, const std::string& port, unsigned connec
workers = new Worker*[workerCount];
for (unsigned i = 0; i < workerCount; ++i) {
unsigned workerConnections = connections / workerCount;
unsigned concurrentWorkerConnects = backlog / workerCount;
// TODO: improve distribution of the leftover connections
if (i == 0) {
unsigned leftOverConnections = connections % workerCount;
const unsigned leftOverConnections = connections % workerCount;
if (leftOverConnections) workerConnections += leftOverConnections;
const unsigned leftOverBacklog = backlog % workerCount;
if (leftOverBacklog) concurrentWorkerConnects += leftOverBacklog;
}
workers[i] = new Worker(*this, i, workerConnections);
workers[i] = new Worker(*this, i, workerConnections, concurrentWorkerConnects);
}
pthread_barrier_wait(&connectBarrier);
......@@ -241,9 +244,10 @@ void Client::printStats(std::ostream& out) {
out << clientData.str();
}
Worker::Worker(Client& client, unsigned id, unsigned connectionCount)
Worker::Worker(Client& client, unsigned id, unsigned connectionCount,
unsigned concurrentWorkerConnects)
: client(client), id(id), connectionCount(connectionCount) {
thread = std::thread([this] {
thread = std::thread([this, concurrentWorkerConnects] {
struct io_uring_cqe* cqe;
unsigned head, count = 0;
......@@ -255,17 +259,32 @@ Worker::Worker(Client& client, unsigned id, unsigned connectionCount)
this->connections[i] = new Connection(*this);
}
ret = io_uring_submit_and_wait(&this->ring, 1);
if (ret < 0) DIE_MSG_ERRNO("io_uring_submit_and_wait failed");
unsigned connected = 0;
unsigned prepared = 0;
unsigned inflight = 0;
while (connected < this->connectionCount) {
count = 0;
// Prepare as much concurrent connects as allowed
while (inflight < concurrentWorkerConnects && prepared < this->connectionCount) {
this->connections[prepared]->prepConnect();
++prepared;
++inflight;
}
// Submit the prepared connect requests
ret = io_uring_submit_and_wait(&this->ring, 1);
if (ret < 0) DIE_MSG_ERRNO("io_uring_submit_and_wait failed");
// Reap all already connected requests
io_uring_for_each_cqe(&this->ring, head, cqe) {
if (unlikely(cqe->res)) DIE_MSG("connect failed: " << strerror(-cqe->res));
++count;
}
io_uring_cq_advance(&this->ring, count);
assert(inflight >= count);
inflight -= count;
connected += count;
}
......@@ -302,7 +321,6 @@ Connection::Connection(Worker& worker) : worker(worker) {
buf = new char[worker.client.echoSize];
newSocket();
prepConnect();
}
Connection::~Connection() {
......@@ -425,6 +443,7 @@ static void printUsage(const char* progname) {
std::cout << "\t-c | --connections: concurrent connections" << std::endl;
std::cout << "\t-t | --time: time the client should send echos" << std::endl;
std::cout << "\t-f | --output: file to write results to" << std::endl;
std::cout << "\t-b | --backlog: max concurrent connect attempts" << std::endl;
}
auto main(int argc, char* argv[]) -> int {
......@@ -433,6 +452,7 @@ auto main(int argc, char* argv[]) -> int {
unsigned connections = 500;
unsigned iterations = 0;
unsigned time = 0;
unsigned backlog = 4096;
char* outfile = nullptr;
for (int i = 1; i < argc; ++i) {
......@@ -466,6 +486,10 @@ auto main(int argc, char* argv[]) -> int {
strncmp(argv[i], "--output", strlen("--output")) == 0) {
outfile = argv[i + 1];
++i;
} else if (strncmp(argv[i], "-b", strlen("-b")) == 0 ||
strncmp(argv[i], "--backlog", strlen("--backlog")) == 0) {
backlog = std::strtol(argv[i + 1], nullptr, DECIMAL);
++i;
}
}
......@@ -474,7 +498,7 @@ auto main(int argc, char* argv[]) -> int {
if (!time && !iterations) time = 60;
std::cout << "echo client connecting to " << host << ":" << port << std::endl;
Client client(host, port, connections, time, iterations);
Client client(host, port, connections, time, iterations, backlog);
client.join();
if (outfile) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment