Select Git revision
EchoClient.cpp
Forked from
Lehrstuhl für Informatik 4 (Systemsoftware) / manycore / emper
Source project has a limited visibility.
-
Florian Schmaus authoredFlorian Schmaus authored
EchoClient.cpp 17.40 KiB
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <fcntl.h>
#include <netdb.h> //for getaddrinfo
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h> // for shutdown, socket, AF_INET
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm> // for find
#include <atomic>
#include <cerrno> // for errno, ECANCELED
#include <chrono> // for nanoseconds, duration, durat...
#include <cstdint> // for uint64_t, int32_t
#include <cstdio>
#include <cstdlib> // for size_t, strtol, exit, EXIT_F...
#include <cstring> // for memcmp
#include <iomanip>
#include <iostream> // for operator<<, basic_ostream, endl
#include <ratio>
#include <string>
#include <thread>
#include <utility>
#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli...
#include "CountingPrivateSemaphore.hpp" // for CPS
#include "Debug.hpp" // for LOGE
#include "Fiber.hpp" // for Fiber
#include "Runtime.hpp" // for Runtime
#include "Semaphore.hpp" // for Semaphore
#include "emper.hpp" // for spawn
#include "io.hpp" // for connectAndWait
#include "io/Future.hpp" // for CloseFuture, RecvFuture, Sen...
#include "lib/math.hpp"
using emper::Semaphore;
using emper::io::CloseFuture;
using emper::io::RecvFuture;
using emper::io::SendFuture;
using emper::lib::math::RunningAverage;
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::nanoseconds;
using std::chrono::seconds;
// Defaults
const int DECIMAL = 10;
const std::string HOST = "0.0.0.0";
const std::string PORT = "12345";
const size_t ITERATIONS = 10000;
const size_t CLIENTS = 10000;
const size_t SIZE = 32;
const size_t SERVER_BACKLOG = 1024;
const seconds ONERROR_SLEEP(10);
const int OPENFLAGS = O_CREAT | O_WRONLY;
const int OPENMODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
// Globals
std::string host = HOST;
std::string port = PORT;
size_t iterations = ITERATIONS;
size_t execution_seconds = 0;
size_t nclients = CLIENTS;
size_t size = SIZE;
size_t server_backlog = SERVER_BACKLOG;
bool linked_futures;
bool histogram = false;
static ssize_t maxLowLoadClients = -1;
static std::chrono::milliseconds loadSwitchPeriod(1000);
std::atomic<bool> terminate = false;
struct addrinfo* server;
struct TimeStamps {
high_resolution_clock::time_point start;
high_resolution_clock::time_point afterSend;
high_resolution_clock::time_point afterSendDispatch;
high_resolution_clock::time_point afterRecv;
};
class Client {
using IterationResult = std::pair<int, std::string>;
size_t id;
Semaphore& readySem;
Semaphore& startSem;
CPS& cps;
int sock;
bool initialized = false;
char* inBuf;
char* outBuf;
public:
static std::atomic<size_t> client_ids;
// results
size_t iteration = 0;
RunningAverage<double> avg_ns;
size_t unexpectedEchos = 0;
size_t reconnects = 0;
struct TimeStamps* timeStamps = nullptr;
Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps)
: id(id), readySem(readySem), startSem(startSem), cps(cps) {
outBuf = new char[size];
inBuf = new char[size];
if (collectTimeStamps) {
timeStamps = new TimeStamps[iterations];
}
}
~Client() {
delete[] outBuf;
delete[] inBuf;
delete[] timeStamps;
}
static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;
private:
template <bool collectTimeStamps = false>
auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
if constexpr (collectTimeStamps) {
timeStamps[iteration].start = high_resolution_clock::now();
}
sendFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
int32_t bytes_send = sendFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
}
if (unlikely(bytes_send < 0)) {
return {bytes_send, "send failed"};
}
int32_t bytes_recv = recvFuture.submitAndWait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterRecv = high_resolution_clock::now();
}
if (unlikely(bytes_recv < 0)) {
return {bytes_recv, "recv failed"};
}
return {bytes_recv, ""};
}
template <bool collectTimeStamps = false>
auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
recvFuture.setDependency(sendFuture);
recvFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
int32_t res = recvFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = timeStamps[iteration].afterRecv =
high_resolution_clock::now();
}
if (unlikely(res <= 0)) {
// recv failed
if (res != -ECANCELED) {
if (res == 0) {
return {0, "server closed the connection"};
}
return {res, "recv failed"};
}
// send failed
int32_t send_res = sendFuture.wait();
if (send_res < 0) {
return {send_res, "send failed"};
}
// send must have sent less than expected because recvFuture returned -ECANCELED
return {0, EMPER_BUILD_STR("short send: " << send_res)};
}
return {res, ""};
}
void connect() {
sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
if (sock < 0) {
DIE_MSG_ERRNO("creating new client socket failed");
}
int enable = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
DIE_MSG_ERRNO("setsockopt failed");
}
int err = emper::io::connectAndWait(sock, server->ai_addr, server->ai_addrlen);
if (err) {
DIE_MSG_ERRNO("connecting new client socket failed");
}
if (!initialized) {
// We are connected -> start next client
Client::startNew(readySem, startSem, cps);
initialized = true;
} else {
reconnects++;
}
}
[[nodiscard]] auto shouldYield() const -> bool {
if (maxLowLoadClients < 0) {
return false;
}
if (id < static_cast<size_t>(maxLowLoadClients)) {
return false;
}
auto now = std::chrono::steady_clock::now();
auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now);
auto ms_since_epoch = now_ms.time_since_epoch();
auto ms_since_epoch_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(ms_since_epoch);
uint64_t periodNum = ms_since_epoch_duration / loadSwitchPeriod;
return periodNum % 2;
}
public:
template <bool collectTimeStamps = false>
void run() {
connect();
// signal that this client is initialized and connected
readySem.release();
// wait for all clients to be ready
startSem.acquire();
while (iteration < iterations && !terminate.load(std::memory_order_relaxed)) {
if (shouldYield()) {
emper::yield();
continue;
}
SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);
// prepare output buf
sprintf(outBuf, "%lu:%lu", id, iteration);
high_resolution_clock::time_point start;
if constexpr (!collectTimeStamps) {
start = high_resolution_clock::now();
}
IterationResult iterationResult;
if (linked_futures) {
iterationResult = linkedEcho<collectTimeStamps>(sendFuture, recvFuture);
} else {
iterationResult = echo<collectTimeStamps>(sendFuture, recvFuture);
}
high_resolution_clock::time_point end;
if constexpr (!collectTimeStamps) {
end = high_resolution_clock::now();
}
// less than 1 byte was received -> an error occurred
// (connection closed by server is considered an error)
if (unlikely(iterationResult.first <= 0)) {
int err = -iterationResult.first;
auto& msg = iterationResult.second;
if (err == ECONNRESET) {
LOGW("Client " << id << " reconnecting");
close(sock);
// reconnect and try again
connect();
continue;
}
LOGE("Client failed at iteration " << iteration << " with " << msg << ": "
<< strerror(err));
std::this_thread::sleep_for(ONERROR_SLEEP);
DIE;
}
if constexpr (!collectTimeStamps) {
nanoseconds duration = duration_cast<nanoseconds>(end - start);
avg_ns.update(duration.count());
}
if (memcmp(outBuf, inBuf, size) != 0) {
std::stringstream sst;
sst << "got unexpected echo from server" << std::endl;
sst << "expected: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)outBuf[i]);
}
sst << std::endl;
sst << "received: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)inBuf[i]);
sst << (unsigned)inBuf[i] << " ";
}
sst << std::endl;
LOGE(sst.str());
unexpectedEchos++;
}
iteration++;
}
shutdown(sock, SHUT_RDWR);
// ShutdownFuture shut_f(sock, SHUT_RDWR);
CloseFuture cf(sock);
// cf.setDependency(shut_f);
cf.submit();
cf.wait();
}
};
Client** clients;
std::atomic<size_t> Client::client_ids = 0;
auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool {
size_t next_client_id = client_ids.fetch_add(1);
if (next_client_id >= nclients) {
return false;
}
spawn(
[&, id = next_client_id] {
clients[id] = new Client(id, readySem, startSem, cps, histogram);
if (histogram) {
clients[id]->run<true>();
} else {
clients[id]->run();
}
},
cps);
return true;
}
auto getOption(int argc, char** argv, const std::string& option) -> char* {
char** end = argv + argc;
char** itr = std::find(argv, end, option);
if (itr != end && ++itr != end) {
return *itr;
}
return nullptr;
}
static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
char** end = argv + argc;
return std::find(argv, end, option) != end;
}
static void printUsage(char* name) {
std::cerr
<< "Usage: " << name
<< "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
<< std::endl
<< "[-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]"
<< std::endl
<< "[--max-low-load-clients <num>] [--load-switch-period-ms <num>]" << std::endl;
}
auto main(int argc, char* argv[]) -> int {
if (existsOption(argc, argv, "-h")) {
printUsage(argv[0]);
exit(EXIT_FAILURE);
}
char* port_s = getOption(argc, argv, "-p");
if (port_s) {
port = std::string(port_s);
}
char* addr_s = getOption(argc, argv, "-a");
if (addr_s) {
host = std::string(addr_s);
}
char* client_s = getOption(argc, argv, "-c");
if (client_s) {
nclients = strtol(client_s, nullptr, DECIMAL);
}
char* iterations_s = getOption(argc, argv, "-i");
if (iterations_s) {
iterations = strtol(iterations_s, nullptr, DECIMAL);
}
char* time_s = getOption(argc, argv, "-t");
if (time_s) {
if (iterations_s) {
std::cerr << "-t and -i are mutual exclusive" << std::endl;
printUsage(argv[0]);
exit(EXIT_FAILURE);
}
execution_seconds = strtol(time_s, nullptr, DECIMAL);
}
char* size_s = getOption(argc, argv, "-s");
if (size_s) {
size = strtol(size_s, nullptr, DECIMAL);
}
char* server_backlog_s = getOption(argc, argv, "-b");
if (server_backlog_s) {
server_backlog = strtol(server_backlog_s, nullptr, DECIMAL);
}
linked_futures = getOption(argc, argv, "--linked-futures");
int out_fd = STDOUT_FILENO;
char* output_file = getOption(argc, argv, "-f");
if (output_file) {
if (access(output_file, W_OK) == 0) {
DIE_MSG("Output file: " << output_file << " already exists");
}
out_fd = open(output_file, OPENFLAGS, OPENMODE);
if (out_fd < 0) {
DIE_MSG_ERRNO("opening output file failed");
}
}
int histogram_fd;
char* histogram_file = getOption(argc, argv, "--histogram");
if (histogram_file) {
if (!iterations_s) {
DIE_MSG("histograms are currently only possible with fixed iterations");
}
histogram = true;
if (access(histogram_file, W_OK) == 0) {
DIE_MSG("Histogram file: " << histogram_file << " already exists");
}
histogram_fd = open(histogram_file, OPENFLAGS, OPENMODE);
if (histogram_fd < 0) {
DIE_MSG_ERRNO("opening histogram file failed");
}
}
char* max_low_load_clients = getOption(argc, argv, "--max-low-load-clients");
if (max_low_load_clients) {
maxLowLoadClients = std::stoi(max_low_load_clients);
if (static_cast<size_t>(maxLowLoadClients) >= nclients) {
DIE_MSG("--max-low-load-clients " << maxLowLoadClients
<< " must be smaller than the total number of clients "
<< nclients);
}
}
char* load_switch_period_ms_string = getOption(argc, argv, "--load-switch-period-ms");
if (load_switch_period_ms_string) {
if (!max_low_load_clients) {
DIE_MSG("Can't use --load-switch-period-ms without --max-low-load-clients");
}
int load_switch_period_ms = std::stoi(load_switch_period_ms_string);
loadSwitchPeriod = std::chrono::milliseconds(load_switch_period_ms);
}
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
if (err) {
if (err == EAI_SYSTEM) {
DIE_MSG_ERRNO("getaddrinfo failed");
} else {
LOGE("error in getaddrinfo: " << gai_strerror(err));
exit(EXIT_FAILURE);
}
}
Runtime runtime;
Fiber* alphaFiber = Fiber::from([&] {
clients = new Client*[nclients];
CPS cps;
Semaphore readySemaphore;
Semaphore startSemaphore;
std::thread terminator;
auto connect_start = high_resolution_clock::now();
// start first client batch
for (size_t i = 0; i < server_backlog; ++i) {
// Start new clients until we have started all or the amount
// of parallel connecting clients is reached
if (!Client::startNew(readySemaphore, startSemaphore, cps)) {
break;
}
}
// await the clients
for (size_t i = 0; i < nclients; ++i) {
readySemaphore.acquire();
}
if (execution_seconds) {
iterations = SIZE_MAX;
terminator = std::thread([] {
std::this_thread::sleep_for(seconds(execution_seconds));
terminate.store(true);
});
}
auto echo_start = high_resolution_clock::now();
// start the clients
for (size_t i = 0; i < nclients; ++i) {
startSemaphore.release();
}
// await client termination
cps.wait();
if (execution_seconds) {
terminator.join();
}
auto echo_end = high_resolution_clock::now();
auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();
auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();
uint64_t total_iterations = 0;
uint64_t total_reconnects = 0;
uint64_t total_unexpected_echos = 0;
RunningAverage avg_ns;
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
total_iterations += client->iteration;
total_reconnects += client->reconnects;
total_unexpected_echos += client->unexpectedEchos;
if (!histogram) {
avg_ns.update(client->avg_ns.getAverage());
}
}
std::stringstream sst;
sst << "[global]" << std::endl;
sst << "clients = " << nclients << std::endl;
sst << "size = " << size << std::endl;
sst << "total_iterations = " << total_iterations << std::endl;
sst << "total_reconnects = " << total_reconnects << std::endl;
sst << "total_unexpected_echos = " << total_unexpected_echos << std::endl;
sst << "avg_ns = " << avg_ns.getAverage() << std::endl;
sst << "connect_duration = " << connect_duration << std::endl;
sst << "echo_duration = " << echo_duration << std::endl;
sst << "total_duration = " << total_duration << std::endl;
sst << std::endl;
sst << "[clients]" << std::endl;
sst << "csv =" << std::endl;
sst << " iterations,avg_ns,reconnects,unexpected_echos" << std::endl;
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
sst << " " << client->iteration << ",";
sst << client->avg_ns.getAverage() << ",";
sst << client->reconnects << ",";
sst << client->unexpectedEchos << std::endl;
}
auto output = sst.str();
if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
DIE_MSG_ERRNO("writing results failed");
}
if (output_file) {
emper::io::closeAndForget(out_fd);
}
if (histogram) {
std::string histogramDataHeader =
"total_latency,after_send_latency,after_send_dispatch_latency\n";
if (emper::io::writeFileAndWait(histogram_fd, histogramDataHeader.c_str(),
histogramDataHeader.size()) < 0) {
DIE_MSG_ERRNO("writing histogram data header failed");
}
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
std::stringstream sst;
for (size_t iteration = 0; iteration < iterations; ++iteration) {
auto timeStamps = client->timeStamps[iteration];
nanoseconds totalLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.start);
nanoseconds afterSendLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSend);
nanoseconds afterSendDispatchLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSendDispatch);
sst << totalLatency.count() << ",";
sst << afterSendLatency.count() << ",";
sst << afterSendDispatchLatency.count() << std::endl;
}
auto histoOutput = sst.str();
if (emper::io::writeFileAndWait(histogram_fd, histoOutput.c_str(), histoOutput.size()) <
0) {
DIE_MSG_ERRNO("writing histogram data failed");
}
}
emper::io::closeAndForget(histogram_fd);
}
for (size_t i = 0; i < nclients; ++i) {
delete clients[i];
}
delete[] clients;
exit(EXIT_SUCCESS);
});
runtime.scheduleFromAnywhere(*alphaFiber);
runtime.waitUntilFinished();
return EXIT_FAILURE;
}