Skip to content
Snippets Groups Projects

[EchoClient] support latency histograms

Merged Florian Fischer requested to merge aj46ezos/emper:echo_client_histo into master
1 file
+ 150
32
Compare changes
  • Side-by-side
  • Inline
+ 150
32
@@ -56,6 +56,9 @@ const size_t SERVER_BACKLOG = 1024;
const seconds ONERROR_SLEEP(10);
const int OPENFLAGS = O_CREAT | O_WRONLY;
const int OPENMODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
// Globals
std::string host = HOST;
std::string port = PORT;
@@ -65,11 +68,19 @@ size_t nclients = CLIENTS;
size_t size = SIZE;
size_t server_backlog = SERVER_BACKLOG;
bool linked_futures;
bool histogram = false;
std::atomic<bool> terminate = false;
struct addrinfo* server;
struct TimeStamps {
high_resolution_clock::time_point start;
high_resolution_clock::time_point afterSend;
high_resolution_clock::time_point afterSendDispatch;
high_resolution_clock::time_point afterRecv;
};
class Client {
using IterationResult = std::pair<int, std::string>;
@@ -93,17 +104,44 @@ class Client {
size_t unexpectedEchos = 0;
size_t reconnects = 0;
Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps)
struct TimeStamps* timeStamps = nullptr;
Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps)
: id(id), readySem(readySem), startSem(startSem), cps(cps) {
outBuf = new char[size];
inBuf = new char[size];
if (collectTimeStamps) {
timeStamps = new TimeStamps[iterations];
}
}
~Client() {
delete[] outBuf;
delete[] inBuf;
delete[] timeStamps;
}
static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;
private:
static auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
int32_t bytes_send = sendFuture.submitAndWait();
template <bool collectTimeStamps = false>
auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
if constexpr (collectTimeStamps) {
timeStamps[iteration].start = high_resolution_clock::now();
}
sendFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
int32_t bytes_send = sendFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
}
if (unlikely(bytes_send < 0)) {
return {bytes_send, "send failed"};
@@ -111,6 +149,10 @@ class Client {
int32_t bytes_recv = recvFuture.submitAndWait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterRecv = high_resolution_clock::now();
}
if (unlikely(bytes_recv < 0)) {
return {bytes_recv, "recv failed"};
}
@@ -118,10 +160,22 @@ class Client {
return {bytes_recv, ""};
}
static auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
template <bool collectTimeStamps = false>
auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
recvFuture.setDependency(sendFuture);
int32_t res = recvFuture.submitAndWait();
recvFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
int32_t res = recvFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = timeStamps[iteration].afterRecv =
high_resolution_clock::now();
}
if (unlikely(res <= 0)) {
// recv failed
@@ -172,6 +226,7 @@ class Client {
}
public:
template <bool collectTimeStamps = false>
void run() {
connect();
@@ -188,14 +243,22 @@ class Client {
// prepare output buf
sprintf(outBuf, "%lu:%lu", id, iteration);
high_resolution_clock::time_point start;
if constexpr (!collectTimeStamps) {
start = high_resolution_clock::now();
}
IterationResult iterationResult;
auto start = high_resolution_clock::now();
if (linked_futures) {
iterationResult = linkedEcho(sendFuture, recvFuture);
iterationResult = linkedEcho<collectTimeStamps>(sendFuture, recvFuture);
} else {
iterationResult = echo(sendFuture, recvFuture);
iterationResult = echo<collectTimeStamps>(sendFuture, recvFuture);
}
high_resolution_clock::time_point end;
if constexpr (!collectTimeStamps) {
end = high_resolution_clock::now();
}
auto end = high_resolution_clock::now();
// less than 1 byte was received -> an error occurred
// (connection closed by server is considered an error)
@@ -216,9 +279,10 @@ class Client {
DIE;
}
nanoseconds duration = duration_cast<nanoseconds>(end - start);
avg_ns.update(duration.count());
if constexpr (!collectTimeStamps) {
nanoseconds duration = duration_cast<nanoseconds>(end - start);
avg_ns.update(duration.count());
}
if (memcmp(outBuf, inBuf, size) != 0) {
std::stringstream sst;
@@ -249,9 +313,6 @@ class Client {
// cf.setDependency(shut_f);
cf.submit();
delete[] inBuf;
delete[] outBuf;
cf.wait();
}
};
@@ -268,8 +329,13 @@ auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> boo
spawn(
[&, id = next_client_id] {
clients[id] = new Client(id, readySem, startSem, cps);
clients[id]->run();
clients[id] = new Client(id, readySem, startSem, cps, histogram);
if (histogram) {
clients[id]->run<true>();
} else {
clients[id]->run();
}
},
cps);
return true;
@@ -350,6 +416,36 @@ auto main(int argc, char* argv[]) -> int {
int out_fd = STDOUT_FILENO;
char* output_file = getOption(argc, argv, "-f");
if (output_file) {
if (access(output_file, W_OK) == 0) {
DIE_MSG("Output file: " << output_file << " already exists");
}
out_fd = open(output_file, OPENFLAGS, OPENMODE);
if (out_fd < 0) {
DIE_MSG_ERRNO("opening output file failed");
}
}
int histogram_fd;
char* histogram_file = getOption(argc, argv, "--histogram");
if (histogram_file) {
if (!iterations_s) {
DIE_MSG("histograms are currently only possible with fixed iterations");
}
histogram = true;
if (access(histogram_file, W_OK) == 0) {
DIE_MSG("Histogram file: " << histogram_file << " already exists");
}
histogram_fd = open(histogram_file, OPENFLAGS, OPENMODE);
if (histogram_fd < 0) {
DIE_MSG_ERRNO("opening histogram file failed");
}
}
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
if (err) {
if (err == EAI_SYSTEM) {
@@ -413,20 +509,6 @@ auto main(int argc, char* argv[]) -> int {
auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();
if (output_file) {
if (access(output_file, W_OK) == 0) {
DIE_MSG("Output file: " << output_file << " already exists");
}
int openflags = O_CREAT | O_WRONLY;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
out_fd = emper::io::openAndWait(output_file, openflags, mode);
if (out_fd < 0) {
DIE_MSG_ERRNO("opening output file failed");
}
}
uint64_t total_iterations = 0;
uint64_t total_reconnects = 0;
uint64_t total_unexpected_echos = 0;
@@ -436,7 +518,9 @@ auto main(int argc, char* argv[]) -> int {
total_iterations += client->iteration;
total_reconnects += client->reconnects;
total_unexpected_echos += client->unexpectedEchos;
avg_ns.update(client->avg_ns.getAverage());
if (!histogram) {
avg_ns.update(client->avg_ns.getAverage());
}
}
std::stringstream sst;
@@ -472,6 +556,40 @@ auto main(int argc, char* argv[]) -> int {
emper::io::closeAndForget(out_fd);
}
if (histogram) {
std::string histogramDataHeader =
"total_latency,after_send_latency,after_send_dispatch_latency\n";
if (emper::io::writeFileAndWait(histogram_fd, histogramDataHeader.c_str(),
histogramDataHeader.size()) < 0) {
DIE_MSG_ERRNO("writing histogram data header failed");
}
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
std::stringstream sst;
for (size_t iteration = 0; iteration < iterations; ++iteration) {
auto timeStamps = client->timeStamps[iteration];
nanoseconds totalLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.start);
nanoseconds afterSendLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSend);
nanoseconds afterSendDispatchLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSendDispatch);
sst << totalLatency.count() << ",";
sst << afterSendLatency.count() << ",";
sst << afterSendDispatchLatency.count() << std::endl;
}
auto histoOutput = sst.str();
if (emper::io::writeFileAndWait(histogram_fd, histoOutput.c_str(), histoOutput.size()) <
0) {
DIE_MSG_ERRNO("writing histogram data failed");
}
}
emper::io::closeAndForget(histogram_fd);
}
for (size_t i = 0; i < nclients; ++i) {
delete clients[i];
}
Loading