// (navigation labels "Newer"/"Older" left behind by the web source viewer were removed here)
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <netdb.h> //for getaddrinfo
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h> // for shutdown, socket, AF_INET
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm> // for find
#include <cerrno> // for errno, ECANCELED
#include <chrono> // for nanoseconds, duration, durat...
#include <cstdint> // for uint64_t, int32_t
#include <cstdio>
#include <cstdlib> // for size_t, strtol, exit, EXIT_F...
#include <cstring> // for memcmp
#include <iomanip>
#include <iostream> // for operator<<, basic_ostream, endl
#include <string> // for allocator, string, char_traits
#include <utility>
#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli...
#include "CountingPrivateSemaphore.hpp" // for CPS
#include "Debug.hpp" // for LOGE
#include "Fiber.hpp" // for Fiber
#include "Runtime.hpp" // for Runtime
#include "Semaphore.hpp" // for Semaphore
#include "emper.hpp" // for spawn
#include "io.hpp" // for connectAndWait
#include "io/Future.hpp" // for CloseFuture, RecvFuture, Sen...
#include "lib/math.hpp"
using emper::Semaphore;
using emper::io::CloseFuture;
using emper::io::RecvFuture;
using emper::io::SendFuture;
using emper::lib::math::RunningAverage;
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::nanoseconds;
// Defaults

// Numeric base passed to strtol() when parsing command line values.
const int DECIMAL{10};
// Server address and port used unless -a / -p override them.
const std::string HOST{"0.0.0.0"};
const std::string PORT{"12345"};
// Echo round trips per client unless -i / -t override it.
const size_t ITERATIONS{10000};
// Number of clients unless -c overrides it.
const size_t CLIENTS{10000};
// Size in bytes of each client's send and receive buffer unless -s overrides it.
const size_t SIZE{32};
// Size of the first batch of clients started by main() unless -b overrides it.
const size_t SERVER_BACKLOG{1024};
// Pause before a client aborts the program after an unrecoverable echo error.
const seconds ONERROR_SLEEP{10};
// Flags and permission bits used with open() for the result and histogram files.
const int OPENFLAGS{O_CREAT | O_WRONLY};
const int OPENMODE{S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH};

// Globals (start out as the defaults above; main() overwrites them from argv)

std::string host{HOST};
std::string port{PORT};
size_t iterations{ITERATIONS};
// Non-zero selects a time-limited run (-t) instead of a fixed iteration count.
size_t execution_seconds{0};
size_t nclients{CLIENTS};
size_t size{SIZE};
size_t server_backlog{SERVER_BACKLOG};
// Cleared by --no-quit; when set, the last client sends "quit" on shutdown.
bool sendQuit{true};
// Polled by every client's echo loop; set once the execution time has elapsed.
std::atomic<bool> terminate{false};
// Resolved server address; filled in by getaddrinfo() in main().
struct addrinfo* server{nullptr};
// Measurement points of a single echo iteration. One entry per iteration is
// filled in by a client when time stamp collection is enabled.
struct TimeStamps {
  // Taken right before the send is submitted.
  std::chrono::high_resolution_clock::time_point start;
  // Taken right after the submit call returned.
  std::chrono::high_resolution_clock::time_point afterSend;
  // Taken once waiting for the send (or, for linked futures, the receive) returned.
  std::chrono::high_resolution_clock::time_point afterSendDispatch;
  // Taken once the echoed data was received.
  std::chrono::high_resolution_clock::time_point afterRecv;
};
// A single echo client. Each instance owns its socket and a send/receive
// buffer pair and keeps the per-client results that main() aggregates.
class Client {
// Outcome of one echo round trip: `first` is the number of bytes received on
// success, or 0 / a negative error value on failure; `second` describes the
// failure and is empty on success.
using IterationResult = std::pair<int, std::string>;
size_t id;
// Released by run() once this client is connected.
Semaphore& readySem;
// Acquired by run() before the echo loop starts.
Semaphore& startSem;
// Handed on to Client::startNew() when the next client is started.
CPS& cps;
// Socket file descriptor; (re)assigned by connect().
int sock;
// Set by connect() after the first successful connect.
bool initialized = false;
// Receive and send buffers, each `size` bytes (allocated in the constructor).
char* inBuf;
char* outBuf;
public:
// Source of client ids; startNew() hands them out via fetch_add().
static std::atomic<size_t> client_ids;
// results
// Number of echo iterations completed so far.
size_t iteration = 0;
// Running average of the per-iteration round trip time in nanoseconds;
// only updated by run() when time stamps are not collected.
RunningAverage<double> avg_ns;
// Iterations whose received bytes differed from the bytes sent.
size_t unexpectedEchos = 0;
size_t reconnects = 0;
// Per-iteration time stamps; nullptr unless the constructor was asked to collect them.
struct TimeStamps* timeStamps = nullptr;
// Allocates the send and receive buffers using the global `size` and, when
// collectTimeStamps is set, one TimeStamps entry per configured iteration.
// NOTE(review): the buffers are not value-initialized here.
// NOTE(review): the class owns raw arrays but no copy/move operations are
// declared in this view; copying a Client would free the buffers twice.
Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps)
: id(id), readySem(readySem), startSem(startSem), cps(cps) {
outBuf = new char[size];
inBuf = new char[size];
if (collectTimeStamps) {
timeStamps = new TimeStamps[iterations];
}
}
// Releases the buffers and the time stamp array (delete[] on nullptr is a no-op).
// NOTE(review): the destructor's closing brace is not present in this view.
~Client() {
delete[] outBuf;
delete[] inBuf;
delete[] timeStamps;
// Starts the next client if any client ids are left; returns false otherwise.
static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;
// Performs one echo round trip with two separate operations: submit the send
// and wait for it, then submit the receive and wait for it.
// When collectTimeStamps is true, the four measurement points of the current
// iteration are recorded in timeStamps[iteration].
// Returns {received bytes, ""} on success and {negative result, description}
// if the send or the receive failed.
// NOTE(review): several closing braces of this function are not present in
// this view (after each early return and at the end of the function).
template <bool collectTimeStamps = false>
auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
if constexpr (collectTimeStamps) {
timeStamps[iteration].start = high_resolution_clock::now();
}
sendFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
// Wait for the result of the send before the receive is submitted.
int32_t bytes_send = sendFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
}
if (unlikely(bytes_send < 0)) {
return {bytes_send, "send failed"};
int32_t bytes_recv = recvFuture.submitAndWait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterRecv = high_resolution_clock::now();
}
// A result of 0 is not treated as an error here; it is passed on to the caller.
if (unlikely(bytes_recv < 0)) {
return {bytes_recv, "recv failed"};
return {bytes_recv, ""};
// Performs one echo round trip with the receive declared as dependent on the
// send: only recvFuture is submitted and waited for here.
// Returns {received bytes, ""} on success; otherwise {0 or negative result,
// description}, distinguishing a closed connection, a failed receive, a
// failed send and a short send.
// NOTE(review): timeStamps[iteration].start is not written anywhere in this
// function as shown, although the histogram output reads it - confirm.
// NOTE(review): the function's closing brace is not present in this view.
template <bool collectTimeStamps = false>
auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
// presumably submitting recvFuture also submits the sendFuture it depends on - TODO confirm
recvFuture.setDependency(sendFuture);
recvFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
int32_t res = recvFuture.wait();
// Send completion is not observed separately here, so both points get the same time.
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = timeStamps[iteration].afterRecv =
high_resolution_clock::now();
}
if (unlikely(res <= 0)) {
// recv failed
if (res != -ECANCELED) {
if (res == 0) {
return {0, "server closed the connection"};
}
return {res, "recv failed"};
}
// send failed
int32_t send_res = sendFuture.wait();
if (send_res < 0) {
return {send_res, "send failed"};
}
// send must have sent less than expected because recvFuture returned -ECANCELED
return {0, EMPER_BUILD_STR("short send: " << send_res)};
}
return {res, ""};
// Creates a new socket for the resolved server address, enables TCP_NODELAY
// and connects it. Any failure aborts the program via DIE_MSG_ERRNO.
// On the first successful connect the next client is started.
// NOTE(review): the function's closing brace is not present in this view.
void connect() {
sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
if (sock < 0) {
DIE_MSG_ERRNO("creating new client socket failed");
}
int enable = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
DIE_MSG_ERRNO("setsockopt failed");
}
int err = emper::io::connectAndWait(sock, server->ai_addr, server->ai_addrlen);
if (err) {
DIE_MSG_ERRNO("connecting new client socket failed");
}
// NOTE(review): as shown, `reconnects++` sits inside the `!initialized`
// branch, so it would count the first connect and never a reconnect; an
// `else` between the two statements appears to be missing in this view.
if (!initialized) {
// We are connected -> start next client
Client::startNew(readySem, startSem, cps);
initialized = true;
reconnects++;
}
void shutdown(bool isQuitClient) const {
if (sendQuit && isQuitClient) {
LOGI("Send quit message");
std::string msg = "quit\n";
ssize_t res = emper::io::sendAndWait(sock, msg.c_str(), msg.size(), MSG_NOSIGNAL);
if (res < 0) {
LOGW("Sending quit failed: " << strerror(errno));
}
}
::shutdown(sock, SHUT_RDWR);
// ShutdownFuture shut_f(sock, SHUT_RDWR);
CloseFuture cf(sock);
// cf.setDependency(shut_f);
cf.submit();
cf.wait();
}
// Main routine of a client: connect, report readiness, wait for the common
// start signal and then echo until the configured number of iterations is
// reached or the global `terminate` flag is set.
// With collectTimeStamps the echo helpers record per-iteration time stamps;
// without it the round trip time is measured here and fed into avg_ns.
// NOTE(review): several lines of this function are missing in this view
// (closing braces of the error branch, the mismatch branch, the loop and the
// function itself, and apparently an `else`, see below).
template <bool collectTimeStamps = false>
void run() {
connect();
// signal that this client is initialized and connected
readySem.release();
// wait for all clients to be ready
startSem.acquire();
while (iteration < iterations && !terminate.load(std::memory_order_relaxed)) {
// Fresh futures every iteration, bound to the current socket (it changes on reconnect).
SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);
// prepare output buf
// NOTE(review): sprintf is unbounded while outBuf holds only `size` bytes
// (configurable via -s); a small size overflows the buffer. Bytes behind the
// terminating NUL are never written but are sent as well. Consider
// snprintf(outBuf, size, ...) and "%zu" for the size_t arguments.
sprintf(outBuf, "%lu:%lu", id, iteration);
high_resolution_clock::time_point start;
if constexpr (!collectTimeStamps) {
start = high_resolution_clock::now();
}
IterationResult iterationResult;
// NOTE(review): `linked_futures` is not declared in this view. As shown, both
// assignments are inside the `if`; presumably an `} else {` line between them
// is missing - confirm.
if (linked_futures) {
iterationResult = linkedEcho<collectTimeStamps>(sendFuture, recvFuture);
iterationResult = echo<collectTimeStamps>(sendFuture, recvFuture);
}
high_resolution_clock::time_point end;
if constexpr (!collectTimeStamps) {
end = high_resolution_clock::now();
}
// less than 1 byte was received -> an error occurred
// (connection closed by server is considered an error)
if (unlikely(iterationResult.first <= 0)) {
int err = -iterationResult.first;
auto& msg = iterationResult.second;
// A reset connection is the only error that is retried: the iteration
// counter is left untouched and the same iteration runs again.
if (err == ECONNRESET) {
LOGW("Client " << id << " reconnecting");
close(sock);
// reconnect and try again
connect();
continue;
}
LOGE("Client failed at iteration " << iteration << " with " << msg << ": "
<< strerror(err));
// Sleep before aborting the whole program.
std::this_thread::sleep_for(ONERROR_SLEEP);
DIE;
if constexpr (!collectTimeStamps) {
nanoseconds duration = duration_cast<nanoseconds>(end - start);
avg_ns.update(duration.count());
}
// Compare the complete buffers; a mismatch is logged as a hex dump and counted.
if (memcmp(outBuf, inBuf, size) != 0) {
std::stringstream sst;
sst << "got unexpected echo from server" << std::endl;
sst << "expected: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)outBuf[i]);
}
sst << std::endl;
sst << "received: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)inBuf[i]);
// NOTE(review): this prints every received byte a second time (unmasked),
// unlike the "expected" dump above - looks like a debugging leftover.
sst << (unsigned)inBuf[i] << " ";
}
sst << std::endl;
LOGE(sst.str());
unexpectedEchos++;
iteration++;
// Table of all clients, indexed by client id. Allocated in main(); each slot
// is filled by the task that startNew() spawns for that id.
Client** clients = nullptr;

// Next client id to hand out.
std::atomic<size_t> Client::client_ids{0};

// Starts one more client if there are ids left.
//
// Atomically claims the next client id. If all `nclients` ids are taken,
// nothing is started and false is returned. Otherwise a task is spawned
// (registered with `cps`) that creates the client, stores it in `clients`
// and runs it - with time stamp collection if `histogram` is set - and true
// is returned.
//
// The spawned task refers to readySem, startSem and cps by reference, so
// these objects have to outlive it.
auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool {
  const size_t claimedId = client_ids.fetch_add(1);
  const bool idAvailable = claimedId < nclients;

  if (idAvailable) {
    spawn(
        [&, id = claimedId] {
          Client* self = new Client(id, readySem, startSem, cps, histogram);
          clients[id] = self;
          if (!histogram) {
            self->run();
          } else {
            self->run<true>();
          }
        },
        cps);
  }

  return idAvailable;
}
// Returns the argument that directly follows the first occurrence of
// `option` in argv, or nullptr if `option` does not occur or is the last
// argument (i.e. has no value behind it).
auto getOption(int argc, char** argv, const std::string& option) -> char* {
  // Only positions that still have a successor can carry a value.
  for (int pos = 0; pos + 1 < argc; ++pos) {
    if (option == argv[pos]) {
      return argv[pos + 1];
    }
  }
  return nullptr;
}
// Reports whether `option` occurs anywhere among the argc entries of argv.
static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
  return std::any_of(argv, argv + argc,
                     [&option](const char* arg) { return option == arg; });
}
// Prints a one-line usage summary for the program called `name` to stderr.
// The summary lists every option main() evaluates, including --no-quit and
// --histogram, and separates the program name from the option list.
static void printUsage(char* name) {
  std::cerr
      << "Usage: " << name
      << " [-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
         " [-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]"
         " [--no-quit] [--histogram <histogram-file>]"
      << std::endl;
}
// Entry point of the echo client benchmark: parses the command line, resolves
// the server address, then runs the benchmark inside an EMPER runtime
// (connect all clients, echo phase, result output).
// NOTE(review): several lines of this function are missing in this view
// (e.g. the declaration of `terminator`, a few closing braces and the body of
// an `if`); the notes below mark the affected places.
auto main(int argc, char* argv[]) -> int {
// NOTE(review): asking for help exits with EXIT_FAILURE.
if (existsOption(argc, argv, "-h")) {
printUsage(argv[0]);
exit(EXIT_FAILURE);
}
char* port_s = getOption(argc, argv, "-p");
if (port_s) {
port = std::string(port_s);
}
char* addr_s = getOption(argc, argv, "-a");
if (addr_s) {
host = std::string(addr_s);
}
// NOTE(review): the numeric options below are parsed with strtol without any
// validation; a negative or non-numeric value ends up in an unsigned variable.
char* client_s = getOption(argc, argv, "-c");
if (client_s) {
nclients = strtol(client_s, nullptr, DECIMAL);
}
char* iterations_s = getOption(argc, argv, "-i");
if (iterations_s) {
iterations = strtol(iterations_s, nullptr, DECIMAL);
}
char* time_s = getOption(argc, argv, "-t");
if (time_s) {
if (iterations_s) {
std::cerr << "-t and -i are mutual exclusive" << std::endl;
printUsage(argv[0]);
exit(EXIT_FAILURE);
}
execution_seconds = strtol(time_s, nullptr, DECIMAL);
}
char* size_s = getOption(argc, argv, "-s");
if (size_s) {
size = strtol(size_s, nullptr, DECIMAL);
}
char* server_backlog_s = getOption(argc, argv, "-b");
if (server_backlog_s) {
server_backlog = strtol(server_backlog_s, nullptr, DECIMAL);
}
// NOTE(review): getOption() returns the argument FOLLOWING the switch, so
// this is false when --linked-futures is the last argument and true only if
// something follows it; existsOption() looks like the intended call.
linked_futures = getOption(argc, argv, "--linked-futures");
// Results go to stdout unless -f names an output file.
int out_fd = STDOUT_FILENO;
char* output_file = getOption(argc, argv, "-f");
if (output_file) {
// Refuse to overwrite an existing file.
// NOTE(review): access(W_OK) only detects existing files that are writable;
// F_OK, or O_EXCL in the open flags, would express "must not exist" directly.
if (access(output_file, W_OK) == 0) {
DIE_MSG("Output file: " << output_file << " already exists");
}
out_fd = open(output_file, OPENFLAGS, OPENMODE);
if (out_fd < 0) {
DIE_MSG_ERRNO("opening output file failed");
}
}
sendQuit = !existsOption(argc, argv, "--no-quit");
// Only assigned (and only used) when --histogram is given.
int histogram_fd;
char* histogram_file = getOption(argc, argv, "--histogram");
if (histogram_file) {
// Time stamps are allocated per iteration, so an explicit -i is required.
if (!iterations_s) {
DIE_MSG("histograms are currently only possible with fixed iterations");
}
histogram = true;
if (access(histogram_file, W_OK) == 0) {
DIE_MSG("Histogram file: " << histogram_file << " already exists");
}
histogram_fd = open(histogram_file, OPENFLAGS, OPENMODE);
if (histogram_fd < 0) {
DIE_MSG_ERRNO("opening histogram file failed");
}
}
// Resolve the server once; all clients use the first result in `server`.
// NOTE(review): no freeaddrinfo() call is visible in this view.
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
if (err) {
if (err == EAI_SYSTEM) {
DIE_MSG_ERRNO("getaddrinfo failed");
} else {
LOGE("error in getaddrinfo: " << gai_strerror(err));
exit(EXIT_FAILURE);
}
}
Runtime runtime;
// The benchmark itself runs in this fiber, which is scheduled further down.
Fiber* alphaFiber = Fiber::from([&] {
clients = new Client*[nclients];
CPS cps;
Semaphore readySemaphore;
Semaphore startSemaphore;
auto connect_start = high_resolution_clock::now();
// start first client batch
for (size_t i = 0; i < server_backlog; ++i) {
// Start new clients until we have started all or the amount
// of parallel connecting clients is reached
// NOTE(review): the body of this `if` (presumably a `break`) and the loop's
// closing brace are not present in this view.
if (!Client::startNew(readySemaphore, startSemaphore, cps)) {
}
// await the clients
for (size_t i = 0; i < nclients; ++i) {
readySemaphore.acquire();
}
// Time-limited run: lift the iteration limit and let a separate thread set
// the `terminate` flag once the execution time has elapsed.
// NOTE(review): the declaration of `terminator` and the closing of this
// lambda and `if` are not present in this view.
if (execution_seconds) {
iterations = SIZE_MAX;
terminator = std::thread([] {
std::this_thread::sleep_for(seconds(execution_seconds));
terminate.store(true);
auto echo_start = high_resolution_clock::now();
// start the clients
for (size_t i = 0; i < nclients; ++i) {
startSemaphore.release();
}
// await echo phase termination
cps.wait();
auto echo_end = high_resolution_clock::now();
if (execution_seconds) {
terminator.join();
}
auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();
auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();
// Aggregate the per-client results.
uint64_t total_iterations = 0;
uint64_t total_reconnects = 0;
uint64_t total_unexpected_echos = 0;
RunningAverage avg_ns;
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
total_iterations += client->iteration;
total_reconnects += client->reconnects;
total_unexpected_echos += client->unexpectedEchos;
// Clients do not maintain avg_ns while collecting time stamps.
if (!histogram) {
avg_ns.update(client->avg_ns.getAverage());
}
// terminate the connection and send 'quit' on the last connection
bool isQuitClient = i == (nclients - 1);
// NOTE(review): the closing brace of this loop is not present in this view.
async([=]() { client->shutdown(isQuitClient); });
// Build the result report: a [global] section followed by one CSV row per client.
std::stringstream sst;
sst << "[global]" << std::endl;
sst << "clients = " << nclients << std::endl;
sst << "size = " << size << std::endl;
sst << "total_iterations = " << total_iterations << std::endl;
sst << "total_reconnects = " << total_reconnects << std::endl;
sst << "total_unexpected_echos = " << total_unexpected_echos << std::endl;
sst << "avg_ns = " << avg_ns.getAverage() << std::endl;
sst << "connect_duration = " << connect_duration << std::endl;
sst << "echo_duration = " << echo_duration << std::endl;
sst << "total_duration = " << total_duration << std::endl;
sst << std::endl;
sst << "[clients]" << std::endl;
sst << "csv =" << std::endl;
sst << " iterations,avg_ns,reconnects,unexpected_echos" << std::endl;
// NOTE(review): the closing brace of this loop is not present in this view.
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
sst << " " << client->iteration << ",";
sst << client->avg_ns.getAverage() << ",";
sst << client->reconnects << ",";
sst << client->unexpectedEchos << std::endl;
auto output = sst.str();
if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
DIE_MSG_ERRNO("writing results failed");
}
// Only close the descriptor if it is a file opened above, not stdout.
if (output_file) {
emper::io::closeAndForget(out_fd);
}
// (line-number residue "577" .. "610" left behind by the web source viewer was removed here)
// Histogram output: one CSV line per client and iteration with three
// latencies in nanoseconds, each measured up to the afterRecv time stamp.
if (histogram) {
std::string histogramDataHeader =
"total_latency,after_send_latency,after_send_dispatch_latency\n";
if (emper::io::writeFileAndWait(histogram_fd, histogramDataHeader.c_str(),
histogramDataHeader.size()) < 0) {
DIE_MSG_ERRNO("writing histogram data header failed");
}
// The data is written client by client, one write per client.
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
std::stringstream sst;
for (size_t iteration = 0; iteration < iterations; ++iteration) {
auto timeStamps = client->timeStamps[iteration];
nanoseconds totalLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.start);
nanoseconds afterSendLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSend);
nanoseconds afterSendDispatchLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSendDispatch);
sst << totalLatency.count() << ",";
sst << afterSendLatency.count() << ",";
sst << afterSendDispatchLatency.count() << std::endl;
}
auto histoOutput = sst.str();
if (emper::io::writeFileAndWait(histogram_fd, histoOutput.c_str(), histoOutput.size()) <
0) {
DIE_MSG_ERRNO("writing histogram data failed");
}
}
emper::io::closeAndForget(histogram_fd);
}
// All results are written; ask the runtime to terminate.
Runtime::getRuntime()->initiateTermination();
});
// Hand the fiber to the runtime and block until the runtime has finished.
runtime.scheduleFromAnywhere(*alphaFiber);
runtime.waitUntilFinished();
// The clients were allocated by the tasks started in Client::startNew().
for (size_t i = 0; i < nclients; ++i) {
delete clients[i];
}
delete[] clients;
// NOTE(review): the closing brace of main() is not present in this view.
return EXIT_SUCCESS;