Newer
Older
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <netdb.h> //for getaddrinfo
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h> // for shutdown, socket, AF_INET
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm> // for find
#include <cerrno> // for errno, ECANCELED
#include <chrono> // for nanoseconds, duration, durat...
#include <cstdint> // for uint64_t, int32_t
#include <cstdio>
#include <cstdlib> // for size_t, strtol, exit, EXIT_F...
#include <cstring> // for memcmp
#include <iomanip>
#include <iostream> // for operator<<, basic_ostream, endl
#include <string> // for allocator, string, char_traits
#include <utility>
#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli...
#include "CountingPrivateSemaphore.hpp" // for CPS
#include "Debug.hpp" // for LOGE
#include "Fiber.hpp" // for Fiber
#include "Runtime.hpp" // for Runtime
#include "Semaphore.hpp" // for Semaphore
#include "emper.hpp" // for spawn
#include "io.hpp" // for connectAndWait
#include "io/Future.hpp" // for CloseFuture, RecvFuture, Sen...
#include "lib/math.hpp"
using emper::Semaphore;
using emper::io::CloseFuture;
using emper::io::RecvFuture;
using emper::io::SendFuture;
using emper::lib::math::RunningAverage;
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::nanoseconds;
// Defaults
/// Numeric base passed to strtol() when parsing numeric command-line values.
const int DECIMAL{10};
/// Server address used unless -a is given.
const std::string HOST{"0.0.0.0"};
/// Server port used unless -p is given.
const std::string PORT{"12345"};
/// Echo iterations per client unless -i (or -t) is given.
const size_t ITERATIONS{10000};
/// Number of clients unless -c is given.
const size_t CLIENTS{10000};
/// Number of bytes sent and expected back per echo unless -s is given.
const size_t SIZE{32};
/// Size of the first batch of connecting clients unless -b is given.
const size_t SERVER_BACKLOG{1024};
/// Time echoError() sleeps after logging before it terminates the process.
const seconds ONERROR_SLEEP{10};
/// open() flags used for the result and histogram files.
const int OPENFLAGS{O_CREAT | O_WRONLY};
/// Permission bits (rw-rw-r--) used when creating the result and histogram files.
const int OPENMODE{S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH};

// Globals
/// Address handed to getaddrinfo(); overridden by -a.
std::string host{HOST};
/// Port handed to getaddrinfo(); overridden by -p.
std::string port{PORT};
/// Per-client iteration limit; overridden by -i and set to SIZE_MAX for timed runs.
size_t iterations{ITERATIONS};
/// Duration of a timed run in seconds (-t); 0 selects an iteration-bound run.
size_t execution_seconds{0};
/// Number of clients to start; overridden by -c.
size_t nclients{CLIENTS};
/// Size in bytes of each client's send and receive buffer; overridden by -s.
size_t size{SIZE};
/// Number of clients started in the initial batch; overridden by -b.
size_t server_backlog{SERVER_BACKLOG};
/// Whether the last client sends "quit\n" on shutdown; cleared by --no-quit.
bool sendQuit{true};
/// Stop flag polled by Client::shouldTerminate().
std::atomic<bool> terminate{false};
/// Address list filled by getaddrinfo() in main() and used by every client to connect.
struct addrinfo* server = nullptr;
/**
 * Time points recorded for a single echo iteration.
 *
 * A client only allocates and fills these when it was constructed with
 * collectTimeStamps set; main() turns them into the histogram latencies.
 */
struct TimeStamps {
	using TimePoint = high_resolution_clock::time_point;

	/// Taken right before the echo's send is issued.
	TimePoint start;
	/// Taken after the send future was submitted (only written without linked futures).
	TimePoint afterSend;
	/// Taken after waiting on the send future returned (only written without linked futures).
	TimePoint afterSendDispatch;
	/// Taken after waiting on the receive future returned.
	TimePoint afterRecv;
};
class Client {
using IterationResult = std::pair<int, std::string>;
size_t id;
Semaphore& readySem;
Semaphore& startSem;
CPS& cps;
int sock;
bool initialized = false;
char* inBuf;
char* outBuf;
public:
static std::atomic<size_t> client_ids;
// results
size_t iteration = 0;
RunningAverage<double> avg_ns;
size_t unexpectedEchos = 0;
size_t reconnects = 0;
struct TimeStamps* timeStamps = nullptr;

Florian Fischer
committed
high_resolution_clock::time_point echoStart;
high_resolution_clock::time_point echoEnd;
high_resolution_clock::time_point echoLoopEnd;
Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps)
: id(id), readySem(readySem), startSem(startSem), cps(cps) {
outBuf = new char[size];
inBuf = new char[size];
if (collectTimeStamps) {
timeStamps = new TimeStamps[iterations];
}
}
~Client() {
delete[] outBuf;
delete[] inBuf;
delete[] timeStamps;
static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;

Florian Fischer
committed
[[nodiscard]] auto shouldTerminate() const -> bool {

Florian Fischer
committed
return iteration >= iterations || terminate.load(std::memory_order_relaxed);
}

Florian Fischer
committed
void print_unexpected_echo_msg() const {
std::stringstream sst;
sst << "Client " << id << " got unexpected echo from server" << std::endl;
sst << "expected: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)outBuf[i]);

Florian Fischer
committed
sst << std::endl;

Florian Fischer
committed
sst << "received: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)inBuf[i]);
sst << (unsigned)inBuf[i] << " ";

Florian Fischer
committed
sst << std::endl;
LOGE(sst.str());

Florian Fischer
committed
void echoError(int err, std::string&& msg) const {
std::stringstream sst;
sst << "Client " << id << " failed at iteration " << iteration << " with " << msg;
if (err != 0) {
sst << ": " << strerror(err);

Florian Fischer
committed
sst << std::endl;
LOGE(sst.str());

Florian Fischer
committed
std::this_thread::sleep_for(ONERROR_SLEEP);
DIE;
}

Florian Fischer
committed
void onCONNRESET() {
LOGW("Client " << id << " reconnecting");
close(sock);
// reconnect to try again
connect();
void connect() {
sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
if (sock < 0) {
DIE_MSG_ERRNO("creating new client socket failed");
}
int enable = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
DIE_MSG_ERRNO("setsockopt failed");
}
int err = emper::io::connectAndWait(sock, server->ai_addr, server->ai_addrlen);
if (err) {
DIE_MSG_ERRNO("connecting new client socket failed");
}
if (!initialized) {
// We are connected -> start next client
Client::startNew(readySem, startSem, cps);
initialized = true;
reconnects++;
}

Florian Fischer
committed
enum class CollectTimeStamps { yes, no };

Florian Fischer
committed
enum class LinkedFutures { yes, no };

Florian Fischer
committed
template <CollectTimeStamps collectTimeStampsSwitch, LinkedFutures linkedFuturesSwitch>
void _run() {
constexpr bool collectTimeStamps = collectTimeStampsSwitch == CollectTimeStamps::yes;
constexpr bool linkedFutures = linkedFuturesSwitch == LinkedFutures::yes;
// signal that this client is initialized and connected
readySem.release();
// wait for all clients to be ready
startSem.acquire();

Florian Fischer
committed
while (!shouldTerminate()) {
SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);
// prepare output buf
sprintf(outBuf, "%lu:%lu", id, iteration);

Florian Fischer
committed
echoStart = high_resolution_clock::now();
if constexpr (collectTimeStamps) {
timeStamps[iteration].start = echoStart;

Florian Fischer
committed
if constexpr (!linkedFutures) {
sendFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
if (shouldTerminate()) {
break;
}
int32_t bytes_send = sendFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
}
if (unlikely(bytes_send < 0)) {
int err = -bytes_send;
if (err == ECONNRESET) {
onCONNRESET();
continue;
}
echoError(err, "send failed");
}

Florian Fischer
committed
recvFuture.setDependency(sendFuture);

Florian Fischer
committed
recvFuture.submit();
if (shouldTerminate()) {
break;
}
int32_t bytes_recv = recvFuture.wait();
echoEnd = high_resolution_clock::now();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterRecv = echoEnd;

Florian Fischer
committed
if (unlikely(bytes_recv < 0)) {
int err = -bytes_recv;
if (err == ECONNRESET) {

Florian Fischer
committed
onCONNRESET();
continue;
}

Florian Fischer
committed
if constexpr (!linkedFutures) {
echoError(err, "recv failed");
} else {
// recv failed
if (err != ECANCELED) {
echoError(err, err < 0 ? "recv failed" : "server closed the connection");
}
// send failed
int32_t send_res = sendFuture.wait();
if (send_res < 0) {
echoError(-send_res, "send failed");
}
// send must have sent less than expected because recvFuture returned -ECANCELED
echoError(0, EMPER_BUILD_STR("short send: " << send_res));
}
if constexpr (!collectTimeStamps) {

Florian Fischer
committed
nanoseconds duration = duration_cast<nanoseconds>(echoEnd - echoStart);
avg_ns.update(duration.count());
}
if (memcmp(outBuf, inBuf, size) != 0) {

Florian Fischer
committed
print_unexpected_echo_msg();
unexpectedEchos++;
iteration++;

Florian Fischer
committed
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
echoLoopEnd = high_resolution_clock::now();
}
public:
void shutdown(bool isQuitClient) const {
if (sendQuit && isQuitClient) {
LOGI("Send quit message");
std::string msg = "quit\n";
ssize_t res = emper::io::sendAndWait(sock, msg.c_str(), msg.size(), MSG_NOSIGNAL);
if (res < 0) {
LOGW("Sending quit failed: " << strerror(errno));
}
}
::shutdown(sock, SHUT_RDWR);
// ShutdownFuture shut_f(sock, SHUT_RDWR);
CloseFuture cf(sock);
// cf.setDependency(shut_f);
cf.submit();
cf.wait();
}
void run() {
if (histogram) {
if (linked_futures) {
_run<CollectTimeStamps::yes, LinkedFutures::yes>();
} else {
_run<CollectTimeStamps::yes, LinkedFutures::no>();
}
} else {
if (linked_futures) {
_run<CollectTimeStamps::no, LinkedFutures::yes>();
} else {
_run<CollectTimeStamps::no, LinkedFutures::no>();
}
}
/// Table of all clients indexed by client id; main() allocates it with nclients slots.
Client** clients = nullptr;
/// Counter from which Client::startNew() claims the next client id.
std::atomic<size_t> Client::client_ids{0};
/**
 * Start the next client in its own fiber, unless all nclients ids are already taken.
 *
 * The id is claimed atomically from client_ids. The spawned fiber creates the
 * Client, publishes it in the global clients table at its id and runs it.
 * The Client is not freed here; main() deletes every entry of clients once
 * the runtime has finished.
 *
 * @param readySem semaphore handed to the new Client
 * @param startSem semaphore handed to the new Client
 * @param cps counting semaphore the fiber is spawned with; also handed to the Client
 * @return true if a client fiber was spawned, false if no client id was left
 *
 * NOTE(review): all three arguments are captured by reference by the spawned
 * fiber, so they must outlive it — confirm for every caller.
 */
auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool {
	const size_t next_client_id = client_ids.fetch_add(1);
	if (next_client_id >= nclients) {
		return false;
	}

	spawn(
			[&, id = next_client_id] {
				clients[id] = new Client(id, readySem, startSem, cps, histogram);
				clients[id]->run();
			},
			cps);
	return true;
}
/**
 * Look up the value that follows a command-line option.
 *
 * @param argc number of entries in argv
 * @param argv argument vector to search (every entry is considered)
 * @param option option name to look for, e.g. "-p"
 * @return the argument directly after the first occurrence of option, or
 *         nullptr if option is absent or its first occurrence is the last argument
 */
auto getOption(int argc, char** argv, const std::string& option) -> char* {
	// Only positions that still have a successor can yield a value. A first
	// occurrence in the very last slot therefore falls through to nullptr.
	for (int idx = 0; idx + 1 < argc; ++idx) {
		if (option == argv[idx]) {
			return argv[idx + 1];
		}
	}
	return nullptr;
}
/**
 * Check whether a flag is present on the command line.
 *
 * @param argc number of entries in argv
 * @param argv argument vector to search (every entry is considered)
 * @param option flag to look for, compared for exact equality
 * @return true if any argument equals option
 */
static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
	const auto matchesOption = [&option](const char* arg) { return option == arg; };
	return std::any_of(argv, argv + argc, matchesOption);
}
/**
 * Print the command-line synopsis to stderr.
 *
 * Lists every option main() parses, including --no-quit and --histogram.
 *
 * @param name program name to show, normally argv[0]
 */
static void printUsage(char* name) {
	// The option list starts with a space so it does not run into the program name.
	std::cerr
			<< "Usage: " << name
			<< " [-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
				 " [-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]"
				 " [--no-quit] [--histogram <file>]"
			<< std::endl;
}
auto main(int argc, char* argv[]) -> int {
if (existsOption(argc, argv, "-h")) {
printUsage(argv[0]);
exit(EXIT_FAILURE);
}
char* port_s = getOption(argc, argv, "-p");
if (port_s) {
port = std::string(port_s);
}
char* addr_s = getOption(argc, argv, "-a");
if (addr_s) {
host = std::string(addr_s);
}
char* client_s = getOption(argc, argv, "-c");
if (client_s) {
nclients = strtol(client_s, nullptr, DECIMAL);
}
char* iterations_s = getOption(argc, argv, "-i");
if (iterations_s) {
iterations = strtol(iterations_s, nullptr, DECIMAL);
}
char* time_s = getOption(argc, argv, "-t");
if (time_s) {
if (iterations_s) {
std::cerr << "-t and -i are mutual exclusive" << std::endl;
printUsage(argv[0]);
exit(EXIT_FAILURE);
}
execution_seconds = strtol(time_s, nullptr, DECIMAL);
}
char* size_s = getOption(argc, argv, "-s");
if (size_s) {
size = strtol(size_s, nullptr, DECIMAL);
}
char* server_backlog_s = getOption(argc, argv, "-b");
if (server_backlog_s) {
server_backlog = strtol(server_backlog_s, nullptr, DECIMAL);
}
linked_futures = getOption(argc, argv, "--linked-futures");
int out_fd = STDOUT_FILENO;
char* output_file = getOption(argc, argv, "-f");
if (output_file) {
if (access(output_file, W_OK) == 0) {
DIE_MSG("Output file: " << output_file << " already exists");
}
out_fd = open(output_file, OPENFLAGS, OPENMODE);
if (out_fd < 0) {
DIE_MSG_ERRNO("opening output file failed");
}
}
sendQuit = !existsOption(argc, argv, "--no-quit");
int histogram_fd;
char* histogram_file = getOption(argc, argv, "--histogram");
if (histogram_file) {
if (!iterations_s) {
DIE_MSG("histograms are currently only possible with fixed iterations");
}
histogram = true;
if (access(histogram_file, W_OK) == 0) {
DIE_MSG("Histogram file: " << histogram_file << " already exists");
}
histogram_fd = open(histogram_file, OPENFLAGS, OPENMODE);
if (histogram_fd < 0) {
DIE_MSG_ERRNO("opening histogram file failed");
}
}
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
if (err) {
if (err == EAI_SYSTEM) {
DIE_MSG_ERRNO("getaddrinfo failed");
} else {
LOGE("error in getaddrinfo: " << gai_strerror(err));
exit(EXIT_FAILURE);
}
}
Runtime runtime;
Fiber* alphaFiber = Fiber::from([&] {
clients = new Client*[nclients];
CPS cps;
Semaphore readySemaphore;
Semaphore startSemaphore;
auto connect_start = high_resolution_clock::now();
// start first client batch
for (size_t i = 0; i < server_backlog; ++i) {
// Start new clients until we have started all or the amount
// of parallel connecting clients is reached
if (!Client::startNew(readySemaphore, startSemaphore, cps)) {
}
// await the clients
for (size_t i = 0; i < nclients; ++i) {
readySemaphore.acquire();
}

Florian Fischer
committed
high_resolution_clock::time_point echoTerminationTime;
if (execution_seconds) {
iterations = SIZE_MAX;

Florian Fischer
committed
terminator = std::thread([&] {
std::this_thread::sleep_for(seconds(execution_seconds));

Florian Fischer
committed
echoTerminationTime = high_resolution_clock::now();
auto echo_start = high_resolution_clock::now();
// start the clients
for (size_t i = 0; i < nclients; ++i) {
startSemaphore.release();
}
// await echo phase termination
cps.wait();
auto echo_end = high_resolution_clock::now();
if (execution_seconds) {
terminator.join();
}
auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();

Florian Fischer
committed
uint64_t echoDurationUntilTermination;
if (execution_seconds) {
echoDurationUntilTermination =
duration_cast<nanoseconds>(echoTerminationTime - echo_start).count();
}
auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();
uint64_t total_iterations = 0;
uint64_t total_reconnects = 0;
uint64_t total_unexpected_echos = 0;
RunningAverage avg_ns;
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
total_iterations += client->iteration;
total_reconnects += client->reconnects;
total_unexpected_echos += client->unexpectedEchos;
if (!histogram) {
avg_ns.update(client->avg_ns.getAverage());
}
// terminate the connection and send 'quit' on the last connection
bool isQuitClient = i == (nclients - 1);
async([=]() { client->shutdown(isQuitClient); });
std::stringstream sst;
sst << "[global]" << std::endl;
sst << "clients = " << nclients << std::endl;
sst << "size = " << size << std::endl;
sst << "total_iterations = " << total_iterations << std::endl;
sst << "total_reconnects = " << total_reconnects << std::endl;
sst << "total_unexpected_echos = " << total_unexpected_echos << std::endl;
sst << "avg_ns = " << avg_ns.getAverage() << std::endl;
sst << "connect_duration = " << connect_duration << std::endl;

Florian Fischer
committed
if (execution_seconds) {
sst << "execution_duration = " << echoDurationUntilTermination << std::endl;
}
sst << "echo_duration = " << echo_duration << std::endl;
sst << "total_duration = " << total_duration << std::endl;
sst << std::endl;
sst << "[clients]" << std::endl;
sst << "csv =" << std::endl;

Florian Fischer
committed
sst << " iterations,avg_ns,reconnects,unexpected_echos,echoLoopEnd" << std::endl;
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
sst << " " << client->iteration << ",";
sst << client->avg_ns.getAverage() << ",";
sst << client->reconnects << ",";

Florian Fischer
committed
sst << client->unexpectedEchos << ",";
sst << duration_cast<nanoseconds>(client->echoLoopEnd.time_since_epoch()).count()
<< std::endl;
auto output = sst.str();
if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
DIE_MSG_ERRNO("writing results failed");
}
if (output_file) {
emper::io::closeAndForget(out_fd);
}
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
if (histogram) {
std::string histogramDataHeader =
"total_latency,after_send_latency,after_send_dispatch_latency\n";
if (emper::io::writeFileAndWait(histogram_fd, histogramDataHeader.c_str(),
histogramDataHeader.size()) < 0) {
DIE_MSG_ERRNO("writing histogram data header failed");
}
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
std::stringstream sst;
for (size_t iteration = 0; iteration < iterations; ++iteration) {
auto timeStamps = client->timeStamps[iteration];
nanoseconds totalLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.start);
nanoseconds afterSendLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSend);
nanoseconds afterSendDispatchLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSendDispatch);
sst << totalLatency.count() << ",";
sst << afterSendLatency.count() << ",";
sst << afterSendDispatchLatency.count() << std::endl;
}
auto histoOutput = sst.str();
if (emper::io::writeFileAndWait(histogram_fd, histoOutput.c_str(), histoOutput.size()) <
0) {
DIE_MSG_ERRNO("writing histogram data failed");
}
}
emper::io::closeAndForget(histogram_fd);
}
Runtime::getRuntime()->initiateTermination();
});
runtime.scheduleFromAnywhere(*alphaFiber);
runtime.waitUntilFinished();
for (size_t i = 0; i < nclients; ++i) {
delete clients[i];
}
delete[] clients;
return EXIT_SUCCESS;