diff --git a/apps/EchoClient.cpp b/apps/EchoClient.cpp index 72c70e057e0a3252d1d0d3a34c1eb3fcaf128f20..1a4773489cec3bc20baf992ffb19abc4b12970e2 100644 --- a/apps/EchoClient.cpp +++ b/apps/EchoClient.cpp @@ -56,6 +56,9 @@ const size_t SERVER_BACKLOG = 1024; const seconds ONERROR_SLEEP(10); +const int OPENFLAGS = O_CREAT | O_WRONLY; +const int OPENMODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH; + // Globals std::string host = HOST; std::string port = PORT; @@ -65,11 +68,19 @@ size_t nclients = CLIENTS; size_t size = SIZE; size_t server_backlog = SERVER_BACKLOG; bool linked_futures; +bool histogram = false; std::atomic<bool> terminate = false; struct addrinfo* server; +struct TimeStamps { + high_resolution_clock::time_point start; + high_resolution_clock::time_point afterSend; + high_resolution_clock::time_point afterSendDispatch; + high_resolution_clock::time_point afterRecv; +}; + class Client { using IterationResult = std::pair<int, std::string>; @@ -93,17 +104,44 @@ class Client { size_t unexpectedEchos = 0; size_t reconnects = 0; - Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps) + struct TimeStamps* timeStamps = nullptr; + + Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps) : id(id), readySem(readySem), startSem(startSem), cps(cps) { outBuf = new char[size]; inBuf = new char[size]; + + if (collectTimeStamps) { + timeStamps = new TimeStamps[iterations]; + } + } + + ~Client() { + delete[] outBuf; + delete[] inBuf; + delete[] timeStamps; } static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool; private: - static auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult { - int32_t bytes_send = sendFuture.submitAndWait(); + template <bool collectTimeStamps = false> + auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult { + if constexpr (collectTimeStamps) { + timeStamps[iteration].start = high_resolution_clock::now(); + } + + sendFuture.submit(); + + if constexpr (collectTimeStamps) { + timeStamps[iteration].afterSend = high_resolution_clock::now(); + } + + int32_t bytes_send = sendFuture.wait(); + + if constexpr (collectTimeStamps) { + timeStamps[iteration].afterSendDispatch = high_resolution_clock::now(); + } if (unlikely(bytes_send < 0)) { return {bytes_send, "send failed"}; @@ -111,6 +149,10 @@ class Client { int32_t bytes_recv = recvFuture.submitAndWait(); + if constexpr (collectTimeStamps) { + timeStamps[iteration].afterRecv = high_resolution_clock::now(); + } + if (unlikely(bytes_recv < 0)) { return {bytes_recv, "recv failed"}; } @@ -118,10 +160,22 @@ class Client { return {bytes_recv, ""}; } - static auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult { + template <bool collectTimeStamps = false> + auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult { recvFuture.setDependency(sendFuture); - int32_t res = recvFuture.submitAndWait(); + recvFuture.submit(); + + if constexpr (collectTimeStamps) { + timeStamps[iteration].afterSend = high_resolution_clock::now(); + } + + int32_t res = recvFuture.wait(); + + if constexpr (collectTimeStamps) { + timeStamps[iteration].afterSendDispatch = timeStamps[iteration].afterRecv = + high_resolution_clock::now(); + } if (unlikely(res <= 0)) { // recv failed @@ -172,6 +226,7 @@ class Client { } public: + template <bool collectTimeStamps = false> void run() { connect(); @@ -188,14 +243,22 @@ class Client { // prepare output buf sprintf(outBuf, "%lu:%lu", id, iteration); + high_resolution_clock::time_point start; + if constexpr (!collectTimeStamps) { + start = high_resolution_clock::now(); + } + IterationResult iterationResult; - auto start = high_resolution_clock::now(); if (linked_futures) { - iterationResult = linkedEcho(sendFuture, recvFuture); + iterationResult = linkedEcho<collectTimeStamps>(sendFuture, recvFuture); } else { - iterationResult = echo(sendFuture, recvFuture); + iterationResult = echo<collectTimeStamps>(sendFuture, recvFuture); + } + + high_resolution_clock::time_point end; + if constexpr (!collectTimeStamps) { + end = high_resolution_clock::now(); } - auto end = high_resolution_clock::now(); // less than 1 byte was received -> an error occurred // (connection closed by server is considered an error) @@ -216,9 +279,10 @@ class Client { DIE; } - nanoseconds duration = duration_cast<nanoseconds>(end - start); - - avg_ns.update(duration.count()); + if constexpr (!collectTimeStamps) { + nanoseconds duration = duration_cast<nanoseconds>(end - start); + avg_ns.update(duration.count()); + } if (memcmp(outBuf, inBuf, size) != 0) { std::stringstream sst; @@ -249,9 +313,6 @@ class Client { // cf.setDependency(shut_f); cf.submit(); - delete[] inBuf; - delete[] outBuf; - cf.wait(); } }; @@ -268,8 +329,13 @@ auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> boo spawn( [&, id = next_client_id] { - clients[id] = new Client(id, readySem, startSem, cps); - clients[id]->run(); + clients[id] = new Client(id, readySem, startSem, cps, histogram); + + if (histogram) { + clients[id]->run<true>(); + } else { + clients[id]->run(); + } }, cps); return true; @@ -350,6 +416,36 @@ auto main(int argc, char* argv[]) -> int { int out_fd = STDOUT_FILENO; char* output_file = getOption(argc, argv, "-f"); + if (output_file) { + if (access(output_file, W_OK) == 0) { + DIE_MSG("Output file: " << output_file << " already exists"); + } + + out_fd = open(output_file, OPENFLAGS, OPENMODE); + if (out_fd < 0) { + DIE_MSG_ERRNO("opening output file failed"); + } + } + + int histogram_fd; + char* histogram_file = getOption(argc, argv, "--histogram"); + if (histogram_file) { + if (!iterations_s) { + DIE_MSG("histograms are currently only possible with fixed iterations"); + } + + histogram = true; + + if (access(histogram_file, W_OK) == 0) { + DIE_MSG("Histogram file: " << histogram_file << " already exists"); + } + + histogram_fd = open(histogram_file, OPENFLAGS, OPENMODE); + if (histogram_fd < 0) { + DIE_MSG_ERRNO("opening histogram file failed"); + } + } + int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server); if (err) { if (err == EAI_SYSTEM) { @@ -413,20 +509,6 @@ auto main(int argc, char* argv[]) -> int { auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count(); auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count(); - if (output_file) { - if (access(output_file, W_OK) == 0) { - DIE_MSG("Output file: " << output_file << " already exists"); - } - - int openflags = O_CREAT | O_WRONLY; - int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH; - - out_fd = emper::io::openAndWait(output_file, openflags, mode); - if (out_fd < 0) { - DIE_MSG_ERRNO("opening output file failed"); - } - } - uint64_t total_iterations = 0; uint64_t total_reconnects = 0; uint64_t total_unexpected_echos = 0; @@ -436,7 +518,9 @@ auto main(int argc, char* argv[]) -> int { total_iterations += client->iteration; total_reconnects += client->reconnects; total_unexpected_echos += client->unexpectedEchos; - avg_ns.update(client->avg_ns.getAverage()); + if (!histogram) { + avg_ns.update(client->avg_ns.getAverage()); + } } std::stringstream sst; @@ -472,6 +556,40 @@ auto main(int argc, char* argv[]) -> int { emper::io::closeAndForget(out_fd); } + if (histogram) { + std::string histogramDataHeader = + "total_latency,after_send_latency,after_send_dispatch_latency\n"; + if (emper::io::writeFileAndWait(histogram_fd, histogramDataHeader.c_str(), + histogramDataHeader.size()) < 0) { + DIE_MSG_ERRNO("writing histogram data header failed"); + } + + for (size_t i = 0; i < nclients; ++i) { + auto* client = clients[i]; + std::stringstream sst; + for (size_t iteration = 0; iteration < iterations; ++iteration) { + auto timeStamps = client->timeStamps[iteration]; + nanoseconds totalLatency = + duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.start); + nanoseconds afterSendLatency = + duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSend); + nanoseconds afterSendDispatchLatency = + duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSendDispatch); + sst << totalLatency.count() << ","; + sst << afterSendLatency.count() << ","; + sst << afterSendDispatchLatency.count() << std::endl; + } + + auto histoOutput = sst.str(); + if (emper::io::writeFileAndWait(histogram_fd, histoOutput.c_str(), histoOutput.size()) < + 0) { + DIE_MSG_ERRNO("writing histogram data failed"); + } + } + + emper::io::closeAndForget(histogram_fd); + } + for (size_t i = 0; i < nclients; ++i) { delete clients[i]; }