Commit 66b9869a authored by Florian Fischer's avatar Florian Fischer
Browse files

clients/io_uring: add result reporting compatible with emper's EchoClient

parent 8497fe27
Pipeline #72825 failed with stage
in 2 minutes and 2 seconds
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <cassert>
#include <cstdint>
#include <iosfwd>
#include <string>
#include <type_traits>
// Template magic, thanks to Tobias Langer for help with this.
template <typename Type, typename Enable = void>
struct make_signed_cond;
template <typename Type>
struct make_signed_cond<Type, typename std::enable_if<std::is_floating_point<Type>::value>::type> {
using type = Type;
};
template <typename Type>
struct make_signed_cond<Type, typename std::enable_if<!std::is_floating_point<Type>::value>::type> {
using type = typename std::make_signed<Type>::type;
};
// Source of the algorithm:
// https://math.stackexchange.com/questions/106700/incremental-averageing
template <typename average_type, typename counter_type, typename value_type>
void calcRunningAvg(average_type& runningAverage, counter_type count, value_type value) {
using signed_average_type = typename make_signed_cond<average_type>::type;
using signed_value_type = typename make_signed_cond<value_type>::type;
signed_average_type diff =
((signed_value_type)value - (signed_average_type)runningAverage) / (signed_average_type)count;
assert(diff >= 0 || (value_type)-diff <= (value_type)runningAverage);
runningAverage += diff;
}
template <typename average_type = double, typename counter_type = uint32_t>
class RunningAverage {
private:
counter_type counter;
average_type average;
public:
RunningAverage() : counter(0), average(0){};
template <typename value_type>
void update(value_type value) {
counter++;
calcRunningAvg(average, counter, value);
}
[[nodiscard]] auto getCounter() const -> counter_type { return counter; }
[[nodiscard]] auto getAverage() const -> average_type { return average; }
friend auto operator<<(std::ostream& strm, const RunningAverage& avg) -> std::ostream& {
strm << std::to_string(avg.average) << " [" << std::to_string(avg.counter) << "]";
return strm;
}
};
......@@ -14,10 +14,14 @@
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include "RunningAverage.hpp"
#define DIE_MSG(...) \
do { \
std::cout << __VA_ARGS__ << std::endl; \
......@@ -29,6 +33,9 @@
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::nanoseconds;
using std::chrono::seconds;
static const int DECIMAL = 10;
......@@ -54,6 +61,7 @@ class Client {
friend class Worker;
friend class Connection;
const unsigned connections;
const unsigned iterations;
Worker** workers;
const unsigned workerCount;
......@@ -67,11 +75,18 @@ class Client {
pthread_barrier_t connectBarrier;
high_resolution_clock::time_point start;
high_resolution_clock::time_point echoStart;
high_resolution_clock::time_point end;
public:
Client(const std::string& host, const std::string& port, unsigned connections, unsigned time,
unsigned iterations, unsigned workerCount = std::thread::hardware_concurrency(),
unsigned echoSize = DEFAULT_ECHO_SIZE);
~Client();
void printStats(std::ostream& out);
void join();
};
class Worker {
......@@ -94,6 +109,7 @@ class Worker {
};
class Connection {
friend class Client;
friend class Worker;
enum class State {
......@@ -107,6 +123,11 @@ class Connection {
int sock;
char* buf;
unsigned echos = 0;
unsigned unexpectedEchos = 0;
unsigned reconnects = 0;
RunningAverage<double> echoDurationAvg;
high_resolution_clock::time_point echoStart;
high_resolution_clock::time_point echoLoopEnd;
void prepSend();
void onConnect(int ret);
......@@ -124,17 +145,17 @@ class Connection {
Client::Client(const std::string& host, const std::string& port, unsigned connections,
unsigned time, unsigned iterations, unsigned _workerCount, unsigned echoSize)
: iterations(iterations), workerCount(std::min(_workerCount, connections)), echoSize(echoSize) {
std::cout << "[global]" << std::endl;
std::cout << "clients = " << connections << std::endl;
std::cout << "size = " << echoSize << std::endl;
std::cout << "time = " << time << std::endl;
std::cout << "iterations = " << iterations << std::endl;
: connections(connections),
iterations(iterations),
workerCount(std::min(_workerCount, connections)),
echoSize(echoSize) {
server = getFirstAddrInfo(host, port);
int res = pthread_barrier_init(&connectBarrier, nullptr, workerCount + 1);
if (res) DIE_MSG_ERRNO("pthread_barrier_init failed");
start = high_resolution_clock::now();
workers = new Worker*[workerCount];
for (unsigned i = 0; i < workerCount; ++i) {
unsigned workerConnections = connections / workerCount;
......@@ -148,6 +169,7 @@ Client::Client(const std::string& host, const std::string& port, unsigned connec
}
pthread_barrier_wait(&connectBarrier);
echoStart = high_resolution_clock::now();
if (time) {
std::jthread([this, time] {
......@@ -158,17 +180,64 @@ Client::Client(const std::string& host, const std::string& port, unsigned connec
}
Client::~Client() {
for (unsigned i = 0; i < workerCount; ++i) delete workers[i];
delete[] workers;
pthread_barrier_destroy(&connectBarrier);
}
void Client::join() {
for (unsigned i = 0; i < workerCount; ++i) {
workers[i]->thread.join();
}
end = high_resolution_clock::now();
}
void Client::printStats(std::ostream& out) {
unsigned echos = 0;
unsigned reconnects = 0;
unsigned unexpectedEchos = 0;
RunningAverage<double> echoDurationAvg;
std::stringstream clientData;
for (unsigned i = 0; i < workerCount; ++i) {
Worker* worker = workers[i];
worker->thread.join();
echos += worker->getEchos();
delete worker;
auto* worker = workers[i];
for (unsigned j = 0; j < worker->connectionCount; ++j) {
auto* connection = worker->connections[j];
echos += connection->echos;
reconnects += connection->reconnects;
unexpectedEchos += connection->unexpectedEchos;
echoDurationAvg.update(connection->echoDurationAvg.getAverage());
clientData << " " << connection->echos;
clientData << "," << connection->echoDurationAvg.getAverage();
clientData << "," << connection->reconnects;
clientData << "," << connection->unexpectedEchos;
clientData << ","
<< duration_cast<nanoseconds>(connection->echoLoopEnd.time_since_epoch()).count();
clientData << std::endl;
}
}
std::cout << "total_iterations = " << echos << std::endl;
delete[] workers;
pthread_barrier_destroy(&connectBarrier);
out << "[global]" << std::endl;
out << "clients = " << connections << std::endl;
out << "size = " << echoSize << std::endl;
out << "total_iterations = " << echos << std::endl;
out << "total_reconnects = " << reconnects << std::endl;
out << "total_unexpected_echos = " << unexpectedEchos << std::endl;
out << "avg_ns = " << echoDurationAvg.getAverage() << std::endl;
out << "connect_duration = " << duration_cast<nanoseconds>(echoStart - start).count()
<< std::endl;
out << "echo_duration = " << duration_cast<nanoseconds>(end - echoStart).count() << std::endl;
out << "total_duration = " << duration_cast<nanoseconds>(end - start).count() << std::endl;
out << std::endl;
out << "[clients]" << std::endl;
out << "csv =" << std::endl;
out << " iterations,avg_ns,reconnects,unexpected_echos,echoLoopEnd" << std::endl;
out << clientData.str();
}
Worker::Worker(Client& client, unsigned id, unsigned connectionCount)
......@@ -230,7 +299,6 @@ Worker::~Worker() {
auto Worker::getEchos() -> unsigned {
unsigned echos = 0;
for (unsigned i = 0; i < connectionCount; ++i) echos += connections[i]->echos;
return echos;
}
......@@ -263,6 +331,8 @@ void Connection::prepSend() {
io_uring_prep_send(sqe, sock, buf, worker.client.echoSize, MSG_NOSIGNAL);
io_uring_sqe_set_data(sqe, this);
echoStart = high_resolution_clock::now();
}
void Connection::onConnect(int ret) {
......@@ -292,8 +362,14 @@ auto Connection::onRecv(int ret) -> bool {
DIE_MSG("recv failed: " << strerror(-ret));
}
echoDurationAvg.update(
duration_cast<nanoseconds>(high_resolution_clock::now() - echoStart).count());
++echos;
if (echos == worker.client.iterations) return true;
if (echos == worker.client.iterations) {
echoLoopEnd = high_resolution_clock::now();
return true;
}
prepSend();
return false;
......@@ -323,14 +399,16 @@ static void printUsage(const char* progname) {
std::cout << "\t-i | --iterations: echos issued per connection" << std::endl;
std::cout << "\t-c | --connections: concurrent connections" << std::endl;
std::cout << "\t-t | --time: time the client should send echos" << std::endl;
std::cout << "\t-f | --output: file to write results to" << std::endl;
}
auto main(int argc, char* argv[]) -> int {
std::string host = "localhost";
std::string host = "0.0.0.0";
std::string port = "12345";
unsigned connections = 500;
unsigned iterations = 0;
unsigned time = 0;
char* outfile = nullptr;
for (int i = 1; i < argc; ++i) {
if (strncmp(argv[i], "-h", strlen("-h")) == 0 ||
......@@ -359,6 +437,10 @@ auto main(int argc, char* argv[]) -> int {
strncmp(argv[i], "--time", strlen("--time")) == 0) {
time = std::strtol(argv[i + 1], nullptr, DECIMAL);
++i;
} else if (strncmp(argv[i], "-f", strlen("-f")) == 0 ||
strncmp(argv[i], "--output", strlen("--output")) == 0) {
outfile = argv[i + 1];
++i;
}
}
......@@ -368,5 +450,13 @@ auto main(int argc, char* argv[]) -> int {
std::cout << "echo client connecting to " << host << ":" << port << std::endl;
Client client(host, port, connections, time, iterations);
client.join();
if (outfile) {
std::fstream fs(outfile, std::fstream::out);
client.printStats(fs);
} else
client.printStats(std::cout);
return EXIT_SUCCESS;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment