Newer
Older
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <arpa/inet.h>
#include <netdb.h> //for getaddrinfo
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h> // for shutdown, socket, AF_INET
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm> // for find
#include <cerrno> // for errno, ECANCELED
#include <chrono> // for nanoseconds, duration, durat...
#include <cstdint> // for uint64_t, int32_t
#include <cstdio>
#include <cstdlib> // for size_t, strtol, exit, EXIT_F...
#include <cstring> // for memcmp
#include <iomanip>
#include <iostream> // for operator<<, basic_ostream, endl
#include <string> // for allocator, string, char_traits
#include <utility>
#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli...
#include "CountingPrivateSemaphore.hpp" // for CPS
#include "Debug.hpp" // for LOGE
#include "Fiber.hpp" // for Fiber
#include "Runtime.hpp" // for Runtime
#include "Semaphore.hpp" // for Semaphore
#include "emper.hpp" // for spawn
#include "io.hpp" // for connectAndWait
#include "io/Future.hpp" // for CloseFuture, RecvFuture, Sen...
#include "lib/math.hpp"
using emper::Semaphore;
using namespace emper::io;
using emper::lib::math::RunningAverage;
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::nanoseconds;
// Defaults
// Radix handed to strtol when parsing numeric command line options.
const int DECIMAL = 10;
// Default address and port of the echo server to connect to.
const std::string HOST = "0.0.0.0";
const std::string PORT = "12345";
// Default number of echo round trips each client performs.
const size_t ITERATIONS = 10000;
// Default number of clients.
const size_t CLIENTS = 10000;
// Default size in bytes of each client's send and receive buffer.
const size_t SIZE = 32;
// Default for server_backlog, which bounds the first batch of clients started.
const size_t SERVER_BACKLOG = 1024;
// How long Client::echoError sleeps before it terminates the process.
const seconds ONERROR_SLEEP(10);
// Flags and permission bits used when opening the output and histogram files.
const int OPENFLAGS = O_CREAT | O_WRONLY;
const int OPENMODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
// Globals
// Runtime configuration; main() overrides these from the command line
// (-a, -p, -i, -t, -c, -s, -b respectively for the seven values below).
std::string host = HOST;
std::string port = PORT;
size_t iterations = ITERATIONS;
// 0 selects iteration-bounded runs; a non-zero value selects time-bounded runs.
size_t execution_seconds = 0;
size_t nclients = CLIENTS;
size_t size = SIZE;
size_t server_backlog = SERVER_BACKLOG;
// When true the designated quit client sends "quit\n" on shutdown; cleared by --no-quit.
bool sendQuit = true;
// Read by Client::shouldTerminate to end the echo loop early.
// NOTE(review): the writer is not visible in this view - presumably the terminator thread; confirm.
std::atomic<bool> terminate = false;
// Address resolved from host/port in main().
struct addrinfo* server;
// Address of the optional coordinator; only assigned when --coordinator is given.
struct addrinfo* coordinator;
// Port used when resolving the coordinator address.
std::string COORDINATOR_PORT = "12346";
// Resolve `host`/`port` with getaddrinfo (no hints) and hand back the head of
// the resulting list. Any resolver failure ends the process: EAI_SYSTEM is
// reported through errno, every other code through gai_strerror.
// The returned list is not freed here; ownership passes to the caller.
static auto getFirstAddrInfo(const std::string& host, const std::string& port) -> struct addrinfo* {
  struct addrinfo* resolved;
  const int status = getaddrinfo(host.c_str(), port.c_str(), nullptr, &resolved);

  if (status == 0) {
    return resolved;
  }

  if (status != EAI_SYSTEM) {
    LOGE("error in getaddrinfo: " << gai_strerror(status));
    exit(EXIT_FAILURE);
  }

  // EAI_SYSTEM: the actual cause is stored in errno
  DIE_MSG_ERRNO("getaddrinfo failed");
  return resolved;
}
// Time points recorded for a single echo iteration when histogram data is
// collected.
struct TimeStamps {
  // Taken immediately before the send is issued.
  std::chrono::high_resolution_clock::time_point start;
  // Taken right after the send future was submitted.
  std::chrono::high_resolution_clock::time_point afterSend;
  // Taken once waiting on the send future returned.
  std::chrono::high_resolution_clock::time_point afterSendDispatch;
  // Taken once waiting on the receive future returned.
  std::chrono::high_resolution_clock::time_point afterRecv;
};
class Client {
using IterationResult = std::pair<int, std::string>;
size_t id;
Semaphore& readySem;
Semaphore& startSem;
CPS& cps;
int sock;
bool initialized = false;
char* inBuf;
char* outBuf;
public:
static std::atomic<size_t> client_ids;
// results
size_t iteration = 0;
RunningAverage<double> avg_ns;
size_t unexpectedEchos = 0;
size_t reconnects = 0;
struct TimeStamps* timeStamps = nullptr;

Florian Fischer
committed
high_resolution_clock::time_point echoStart;
high_resolution_clock::time_point echoEnd;
high_resolution_clock::time_point echoLoopEnd;
// Construct a client that is not yet connected.
//
// id                 index of this client, also embedded in every echo message
// readySem/startSem  semaphores used to rendezvous with the coordinating fiber
// cps                passed on to Client::startNew when the next client is spawned
// collectTimeStamps  when true, one TimeStamps slot per iteration is allocated
//
// Both buffers hold `size` bytes. They are value-initialized because the whole
// outBuf is sent on every iteration while only the formatted "<id>:<iteration>"
// prefix is ever written; without zeroing, indeterminate heap bytes would be
// put on the wire. sock starts at -1 so it never holds an indeterminate
// descriptor before connect() assigns it.
Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps)
    : id(id), readySem(readySem), startSem(startSem), cps(cps), sock(-1) {
  outBuf = new char[size]();
  inBuf = new char[size]();
  if (collectTimeStamps) {
    timeStamps = new TimeStamps[iterations];
  }
}
~Client() {
delete[] outBuf;
delete[] inBuf;
delete[] timeStamps;
static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;

Florian Fischer
committed
[[nodiscard]] auto shouldTerminate() const -> bool {

Florian Fischer
committed
return iteration >= iterations || terminate.load(std::memory_order_relaxed);
}

Florian Fischer
committed
void print_unexpected_echo_msg() const {
std::stringstream sst;
sst << "Client " << id << " got unexpected echo from server" << std::endl;
sst << "expected: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)outBuf[i]);

Florian Fischer
committed
sst << std::endl;

Florian Fischer
committed
sst << "received: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)inBuf[i]);
sst << (unsigned)inBuf[i] << " ";

Florian Fischer
committed
sst << std::endl;
LOGE(sst.str());

Florian Fischer
committed
void echoError(int err, std::string&& msg) const {
std::stringstream sst;
sst << "Client " << id << " failed at iteration " << iteration << " with " << msg;
if (err != 0) {
sst << ": " << strerror(err);

Florian Fischer
committed
sst << std::endl;
LOGE(sst.str());

Florian Fischer
committed
std::this_thread::sleep_for(ONERROR_SLEEP);
DIE;
}

Florian Fischer
committed
void onCONNRESET() {
LOGW("Client " << id << " reconnecting");
closeAndForget(sock);

Florian Fischer
committed
// reconnect to try again
connect();
// Create a new socket for the resolved server address, enable TCP_NODELAY on
// it and connect via emper::io::connectAndWait. Every failing step terminates
// the process through DIE_MSG_ERRNO.
// NOTE(review): the end of this method appears truncated in this view (the
// closing brace is missing and `reconnects++` sits inside the first-connect
// branch) - confirm against the complete file.
void connect() {
sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
if (sock < 0) {
DIE_MSG_ERRNO("creating new client socket failed");
}
// Disable Nagle's algorithm on the freshly created socket
int enable = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
DIE_MSG_ERRNO("setsockopt failed");
}
int err = emper::io::connectAndWait(sock, server->ai_addr, server->ai_addrlen);
if (err) {
DIE_MSG_ERRNO("connecting new client socket failed");
}
// Only the first successful connect of a client triggers the next client
if (!initialized) {
// We are connected -> start next client
Client::startNew(readySem, startSem, cps);
initialized = true;
reconnects++;
}

Florian Fischer
committed
enum class CollectTimeStamps { yes, no };

Florian Fischer
committed
enum class LinkedFutures { yes, no };

Florian Fischer
committed
template <CollectTimeStamps collectTimeStampsSwitch, LinkedFutures linkedFuturesSwitch>
void _run() {
constexpr bool collectTimeStamps = collectTimeStampsSwitch == CollectTimeStamps::yes;
constexpr bool linkedFutures = linkedFuturesSwitch == LinkedFutures::yes;
// signal that this client is initialized and connected
readySem.release();
// wait for all clients to be ready
startSem.acquire();

Florian Fischer
committed
while (!shouldTerminate()) {
SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);
// prepare output buf
sprintf(outBuf, "%lu:%lu", id, iteration);

Florian Fischer
committed
echoStart = high_resolution_clock::now();
if constexpr (collectTimeStamps) {
timeStamps[iteration].start = echoStart;

Florian Fischer
committed
if constexpr (!linkedFutures) {
sendFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
if (shouldTerminate()) {
break;
}
int32_t bytes_send = sendFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
}
if (unlikely(bytes_send < 0)) {
int err = -bytes_send;
if (err == ECONNRESET) {
onCONNRESET();
continue;
}
echoError(err, "send failed");
}

Florian Fischer
committed
recvFuture.setDependency(sendFuture);

Florian Fischer
committed
recvFuture.submit();
if (shouldTerminate()) {
break;
}
int32_t bytes_recv = recvFuture.wait();
echoEnd = high_resolution_clock::now();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterRecv = echoEnd;

Florian Fischer
committed
if (unlikely(bytes_recv < 0)) {
int err = -bytes_recv;
if (err == ECONNRESET) {

Florian Fischer
committed
onCONNRESET();
continue;
}

Florian Fischer
committed
if constexpr (!linkedFutures) {
echoError(err, "recv failed");
} else {
// recv failed
if (err != ECANCELED) {
echoError(err, err < 0 ? "recv failed" : "server closed the connection");
}
// send failed
int32_t send_res = sendFuture.wait();
if (send_res < 0) {
echoError(-send_res, "send failed");
}
// send must have sent less than expected because recvFuture returned -ECANCELED
echoError(0, EMPER_BUILD_STR("short send: " << send_res));
}
if constexpr (!collectTimeStamps) {

Florian Fischer
committed
nanoseconds duration = duration_cast<nanoseconds>(echoEnd - echoStart);
avg_ns.update(duration.count());
}
if (memcmp(outBuf, inBuf, size) != 0) {

Florian Fischer
committed
print_unexpected_echo_msg();
unexpectedEchos++;
iteration++;

Florian Fischer
committed
echoLoopEnd = high_resolution_clock::now();
}
public:
void shutdown(bool isQuitClient) const {
if (sendQuit && isQuitClient) {
LOGI("Send quit message");
std::string msg = "quit\n";
ssize_t res = emper::io::sendAndWait(sock, msg.c_str(), msg.size(), MSG_NOSIGNAL);
if (res < 0) {
LOGW("Sending quit failed: " << strerror(errno));
}
}

Florian Fischer
committed
CloseFuture cf(sock);

Florian Fischer
committed
cf.submit();
cf.wait();
}
// Select the _run instantiation matching the runtime switches `histogram`
// and `linked_futures`, turning both into compile-time template arguments.
// NOTE(review): neither switch is declared in this view, and the closing
// brace of this method is missing here - confirm against the complete file.
void run() {
// histogram -> collect per-iteration time stamps
if (histogram) {
if (linked_futures) {
_run<CollectTimeStamps::yes, LinkedFutures::yes>();
} else {
_run<CollectTimeStamps::yes, LinkedFutures::no>();
}
} else {
if (linked_futures) {
_run<CollectTimeStamps::no, LinkedFutures::yes>();
} else {
_run<CollectTimeStamps::no, LinkedFutures::no>();
}
}
Client** clients;
std::atomic<size_t> Client::client_ids = 0;
// Claim the next free client id and, if one is left, spawn a task (tracked by
// `cps`) that constructs the client, publishes it in the global `clients`
// array and runs it.
// Returns false without spawning when all `nclients` ids are already taken.
auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool {
  const size_t claimedId = client_ids.fetch_add(1);
  const bool idAvailable = claimedId < nclients;

  if (idAvailable) {
    spawn(
        [&, id = claimedId] {
          clients[id] = new Client(id, readySem, startSem, cps, histogram);
          clients[id]->run();
        },
        cps);
  }

  return idAvailable;
}
// Look up the value following the first occurrence of `option` in argv.
// Returns the argument directly after the option, or nullptr when the option
// is absent or is the very last argument.
static auto getOption(int argc, char** argv, const std::string& option) -> char* {
  // The last argument can never be followed by a value, so stop one short.
  for (int idx = 0; idx + 1 < argc; ++idx) {
    if (option == argv[idx]) {
      return argv[idx + 1];
    }
  }
  return nullptr;
}
// Report whether `option` occurs anywhere in the first argc entries of argv.
static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
  return std::any_of(argv, argv + argc,
                     [&option](const char* candidate) { return option == candidate; });
}
// Print a one-line usage synopsis to stderr.
//
// name  the program name to show (usually argv[0])
//
// The synopsis lists every option main() evaluates, including --no-quit and
// --histogram, and separates the program name from the first option.
static void printUsage(char* name) {
  std::cerr
      << "Usage: " << name
      << " [-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
         " [-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]"
         " [--no-quit] [--histogram <histogram-file>] [--coordinator <coordinator-host>]"
      << std::endl;
}
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
static void printIp(struct sockaddr* addr) {
std::array<char, INET6_ADDRSTRLEN> serverAddrString;
void* serverAddr;
switch (addr->sa_family) {
case AF_INET:
serverAddr = &(((struct sockaddr_in*)addr)->sin_addr);
break;
case AF_INET6:
serverAddr = &(((struct sockaddr_in6*)addr)->sin6_addr);
break;
default:
DIE_MSG("uknown sa_family " << addr->sa_family);
}
inet_ntop(addr->sa_family, serverAddr, serverAddrString.data(), INET6_ADDRSTRLEN);
std::cout << "echo client connecting to " << serverAddrString.data() << ":" << port << std::endl;
}
// Write the effective benchmark parameters to stdout on a single line.
// The termination criterion is shown as seconds when a run time was
// configured and as iterations otherwise.
static void printSettings() {
  const bool timeBounded = execution_seconds != 0;

  std::cout << "parameters: {termination=" << (timeBounded ? execution_seconds : iterations)
            << (timeBounded ? "seconds" : "iterations")
            << ",clients=" << nclients
            << ",size=" << size
            << ",sendquit= " << (sendQuit ? "true" : "false")
            << "}" << std::endl;
}
auto main(int argc, char* argv[]) -> int {
if (existsOption(argc, argv, "-h")) {
printUsage(argv[0]);
exit(EXIT_FAILURE);
}
char* port_s = getOption(argc, argv, "-p");
if (port_s) {
port = std::string(port_s);
}
char* addr_s = getOption(argc, argv, "-a");
if (addr_s) {
host = std::string(addr_s);
}
char* client_s = getOption(argc, argv, "-c");
if (client_s) {
nclients = strtol(client_s, nullptr, DECIMAL);
}
char* iterations_s = getOption(argc, argv, "-i");
if (iterations_s) {
iterations = strtol(iterations_s, nullptr, DECIMAL);
}
char* time_s = getOption(argc, argv, "-t");
if (time_s) {
if (iterations_s) {
std::cerr << "-t and -i are mutual exclusive" << std::endl;
printUsage(argv[0]);
exit(EXIT_FAILURE);
}
execution_seconds = strtol(time_s, nullptr, DECIMAL);
}
char* size_s = getOption(argc, argv, "-s");
if (size_s) {
size = strtol(size_s, nullptr, DECIMAL);
}
char* server_backlog_s = getOption(argc, argv, "-b");
if (server_backlog_s) {
server_backlog = strtol(server_backlog_s, nullptr, DECIMAL);
}
linked_futures = getOption(argc, argv, "--linked-futures");
int out_fd = STDOUT_FILENO;
char* output_file = getOption(argc, argv, "-f");
if (output_file) {
if (access(output_file, W_OK) == 0) {
DIE_MSG("Output file: " << output_file << " already exists");
}
out_fd = ::open(output_file, OPENFLAGS, OPENMODE);
if (out_fd < 0) {
DIE_MSG_ERRNO("opening output file failed");
}
}
sendQuit = !existsOption(argc, argv, "--no-quit");
int histogram_fd;
char* histogram_file = getOption(argc, argv, "--histogram");
if (histogram_file) {
if (!iterations_s) {
DIE_MSG("histograms are currently only possible with fixed iterations");
}
histogram = true;
if (access(histogram_file, W_OK) == 0) {
DIE_MSG("Histogram file: " << histogram_file << " already exists");
}
histogram_fd = ::open(histogram_file, OPENFLAGS, OPENMODE);
if (histogram_fd < 0) {
DIE_MSG_ERRNO("opening histogram file failed");
}
}
char* coordinator_s = getOption(argc, argv, "--coordinator");
if (coordinator_s) {
coordinator = getFirstAddrInfo(coordinator_s, COORDINATOR_PORT);
}
server = getFirstAddrInfo(host, port);
printIp(server->ai_addr);
printSettings();
Runtime runtime;
Fiber* alphaFiber = Fiber::from([&] {
clients = new Client*[nclients];
CPS cps;
Semaphore readySemaphore;
Semaphore startSemaphore;
auto connect_start = high_resolution_clock::now();
// start first client batch
for (size_t i = 0; i < server_backlog; ++i) {
// Start new clients until we have started all or the amount
// of parallel connecting clients is reached
if (!Client::startNew(readySemaphore, startSemaphore, cps)) {
}
// await the connection of all clients
for (size_t i = 0; i < nclients; ++i) {
readySemaphore.acquire();
}
// synchronize with other peers using the coordinator
if (coordinator) {
int conn = socket(coordinator->ai_family, coordinator->ai_socktype, coordinator->ai_protocol);
if (conn < 0) DIE_MSG_ERRNO("coordinator socket creation failed");
int err = emper::io::connectAndWait(conn, coordinator->ai_addr, coordinator->ai_addrlen);
if (err) DIE_MSG_ERRNO("coordinator socket connect failed");
std::string m = "OK";
SendFuture sf(conn, m.c_str(), m.size(), 0);
std::array<char, 16> buf;
RecvFuture rf(conn, buf.data(), buf.size(), 0);
rf.setDependency(sf);
err = rf.submitAndWait();
if (err < 0) DIE_MSG("coordinator recv failed: " << strerror(-err));
closeAndForget(conn);
LOGI("Coordinator go received");
}

Florian Fischer
committed
high_resolution_clock::time_point echoTerminationTime;
if (execution_seconds) {
iterations = SIZE_MAX;

Florian Fischer
committed
terminator = std::thread([&] {
std::this_thread::sleep_for(seconds(execution_seconds));

Florian Fischer
committed
echoTerminationTime = high_resolution_clock::now();
auto echo_start = high_resolution_clock::now();
// start the clients
for (size_t i = 0; i < nclients; ++i) {
startSemaphore.release();
}
// await echo phase termination
cps.wait();
auto echo_end = high_resolution_clock::now();
if (execution_seconds) {
terminator.join();
}
auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();

Florian Fischer
committed
uint64_t echoDurationUntilTermination;
if (execution_seconds) {
echoDurationUntilTermination =
duration_cast<nanoseconds>(echoTerminationTime - echo_start).count();
}
auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();
uint64_t total_iterations = 0;
uint64_t total_reconnects = 0;
uint64_t total_unexpected_echos = 0;
RunningAverage avg_ns;
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
total_iterations += client->iteration;
total_reconnects += client->reconnects;
total_unexpected_echos += client->unexpectedEchos;
if (!histogram) {
avg_ns.update(client->avg_ns.getAverage());
}
// terminate the connection and send 'quit' on the last connection
bool isQuitClient = i == (nclients - 1);
async([=]() { client->shutdown(isQuitClient); });
std::stringstream sst;
sst << "[global]" << std::endl;
sst << "clients = " << nclients << std::endl;
sst << "size = " << size << std::endl;
sst << "total_iterations = " << total_iterations << std::endl;
sst << "total_reconnects = " << total_reconnects << std::endl;
sst << "total_unexpected_echos = " << total_unexpected_echos << std::endl;
sst << "avg_ns = " << avg_ns.getAverage() << std::endl;
sst << "connect_duration = " << connect_duration << std::endl;

Florian Fischer
committed
if (execution_seconds) {
sst << "execution_duration = " << echoDurationUntilTermination << std::endl;
}
sst << "echo_duration = " << echo_duration << std::endl;
sst << "total_duration = " << total_duration << std::endl;
sst << std::endl;
sst << "[clients]" << std::endl;
sst << "csv =" << std::endl;

Florian Fischer
committed
sst << " iterations,avg_ns,reconnects,unexpected_echos,echoLoopEnd" << std::endl;
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
sst << " " << client->iteration << ",";
sst << client->avg_ns.getAverage() << ",";
sst << client->reconnects << ",";

Florian Fischer
committed
sst << client->unexpectedEchos << ",";
sst << duration_cast<nanoseconds>(client->echoLoopEnd.time_since_epoch()).count()
<< std::endl;
auto output = sst.str();
if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
DIE_MSG_ERRNO("writing results failed");
}
if (output_file) {
emper::io::closeAndForget(out_fd);
}
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
if (histogram) {
std::string histogramDataHeader =
"total_latency,after_send_latency,after_send_dispatch_latency\n";
if (emper::io::writeFileAndWait(histogram_fd, histogramDataHeader.c_str(),
histogramDataHeader.size()) < 0) {
DIE_MSG_ERRNO("writing histogram data header failed");
}
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
std::stringstream sst;
for (size_t iteration = 0; iteration < iterations; ++iteration) {
auto timeStamps = client->timeStamps[iteration];
nanoseconds totalLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.start);
nanoseconds afterSendLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSend);
nanoseconds afterSendDispatchLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSendDispatch);
sst << totalLatency.count() << ",";
sst << afterSendLatency.count() << ",";
sst << afterSendDispatchLatency.count() << std::endl;
}
auto histoOutput = sst.str();
if (emper::io::writeFileAndWait(histogram_fd, histoOutput.c_str(), histoOutput.size()) <
0) {
DIE_MSG_ERRNO("writing histogram data failed");
}
}
emper::io::closeAndForget(histogram_fd);
}
Runtime::getRuntime()->initiateTermination();
});
runtime.scheduleFromAnywhere(*alphaFiber);
runtime.waitUntilFinished();
for (size_t i = 0; i < nclients; ++i) {
delete clients[i];
}
delete[] clients;
return EXIT_SUCCESS;