diff --git a/apps/EchoClient.cpp b/apps/EchoClient.cpp index 3712671ca09148b3c1e51770ac3c09a542cfd519..a7e5ffebac7c6e92fde5aa32f87a27bd59fdc730 100644 --- a/apps/EchoClient.cpp +++ b/apps/EchoClient.cpp @@ -9,7 +9,6 @@ #include <unistd.h> #include <algorithm> // for find -#include <array> #include <atomic> #include <cerrno> // for errno, ECANCELED #include <chrono> // for nanoseconds, duration, durat... @@ -18,6 +17,7 @@ #include <cstring> // for memcmp #include <iostream> // for operator<<, basic_ostream, endl #include <string> // for allocator, string, char_traits +#include <thread> #include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli... #include "CountingPrivateSemaphore.hpp" // for CPS @@ -35,7 +35,9 @@ using emper::io::RecvFuture; using emper::io::SendFuture; using std::chrono::duration_cast; +using std::chrono::high_resolution_clock; using std::chrono::nanoseconds; +using std::chrono::seconds; // Defaults const int DECIMAL = 10; @@ -54,6 +56,7 @@ size_t execution_seconds = 0; size_t clients = CLIENTS; size_t size = SIZE; size_t server_backlog = SERVER_BACKLOG; +bool linked_futures; std::atomic<bool> terminate = false; @@ -69,6 +72,60 @@ struct ClientResult* clientResults; struct addrinfo* server; +static auto clientIteration(SendFuture& sendFuture, RecvFuture& recvFuture) -> nanoseconds { + auto send_start = high_resolution_clock::now(); + int32_t bytes_send = sendFuture.submitAndWait(); + auto send_end = high_resolution_clock::now(); + + if (bytes_send < 0) { + errno = -bytes_send; + DIE_MSG_ERRNO("send failed"); + } + + auto recv_start = high_resolution_clock::now(); + int32_t bytes_recv = recvFuture.submitAndWait(); + auto recv_end = high_resolution_clock::now(); + + if (bytes_recv < 0) { + errno = -bytes_recv; + DIE_MSG_ERRNO("recv failed"); + } + + return duration_cast<nanoseconds>((send_end - send_start) + (recv_end - recv_start)); +} + +static auto clientIterationLinked(SendFuture& sendFuture, RecvFuture& recvFuture) -> nanoseconds { + recvFuture.setDependency(sendFuture); + + auto start = high_resolution_clock::now(); + int32_t res = recvFuture.submitAndWait(); + auto end = high_resolution_clock::now(); + + if (res <= 0) { + // recv failed + if (res != -ECANCELED) { + if (res == 0) { + DIE_MSG("server closed the connection"); + } + + errno = -res; + DIE_MSG_ERRNO("receive failed"); + } + + // send failed + int32_t send_res = sendFuture.wait(); + if (send_res < 0) { + errno = -send_res; + DIE_MSG_ERRNO("send failed"); + } + + // send must send less than expected because recvFuture returned -ECANCELED + DIE_MSG("short send" << send_res); + } + + return duration_cast<nanoseconds>(end - start); +} + static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& startSem, CPS& cps) { int client_sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol); if (client_sock < 0) { @@ -91,9 +148,8 @@ static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& start DIE_MSG_ERRNO("setsockopt failed"); } - std::array<char*, 2> buf; - buf[0] = new char[size]; - buf[1] = new char[size]; + auto* outBuf = new char[size]; + auto* inBuf = new char[size]; uint64_t avg_ns = 0; readySem.release(); @@ -101,43 +157,17 @@ static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& start size_t i = 0; while (i < iterations && !terminate.load(std::memory_order_relaxed)) { - char* outBuf = buf[i % 2]; - char* inBuf = buf[(i + 1) % 2]; - SendFuture sendFuture(client_sock, outBuf, size, MSG_NOSIGNAL); RecvFuture recvFuture(client_sock, inBuf, size, MSG_WAITALL); - recvFuture.setDependency(sendFuture); - auto start = std::chrono::high_resolution_clock::now(); - int32_t res = recvFuture.submitAndWait(); - auto end = std::chrono::high_resolution_clock::now(); - - if (res <= 0) { - // recv failed - if (res != -ECANCELED) { - if (res == 0) { - DIE_MSG("server closed the connection"); - } - - errno = -res; - DIE_MSG_ERRNO("receive failed"); - } - - // send failed - int32_t send_res = sendFuture.wait(); - if (send_res < 0) { - errno = -send_res; - DIE_MSG_ERRNO("send failed"); - } - - DIE_MSG("short send" << send_res); - } - - if (memcmp(outBuf, inBuf, size) != 0) { - DIE_MSG("got unexpected echo from server"); + nanoseconds duration; + if (linked_futures) { + duration = clientIterationLinked(sendFuture, recvFuture); + } else { + duration = clientIteration(sendFuture, recvFuture); } - auto duration = duration_cast<nanoseconds>(end - start); + // TODO: replace with emper::math::RunningAverage when !127 is merged uint64_t ns = duration.count(); if (unlikely(avg_ns == 0)) { avg_ns = ns; @@ -146,6 +176,10 @@ static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& start avg_ns /= 2; } + if (memcmp(outBuf, inBuf, size) != 0) { + DIE_MSG("got unexpected echo from server"); + } + i++; } @@ -157,8 +191,8 @@ static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& start // cf.setDependency(shut_f); cf.submit(); - delete[] buf[0]; - delete[] buf[1]; + delete[] inBuf; + delete[] outBuf; cf.wait(); @@ -183,10 +217,11 @@ static auto existsOption(int argc, char** argv, const std::string& option) -> bo } static void printUsage(char* name) { - std::cerr << "Usage: " << name - << "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]" - " [-f <output-file>] [-i <iterations> | -t <execution time in sec>]" - << std::endl; + std::cerr + << "Usage: " << name + << "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]" + " [-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]" + << std::endl; } auto main(int argc, char* argv[]) -> int { @@ -235,6 +270,8 @@ auto main(int argc, char* argv[]) -> int { server_backlog = strtol(server_backlog_s, nullptr, DECIMAL); } + linked_futures = getOption(argc, argv, "--linked-futures"); + int out_fd = STDOUT_FILENO; char* output_file = getOption(argc, argv, "-f"); @@ -255,8 +292,9 @@ auto main(int argc, char* argv[]) -> int { CPS cps; Semaphore readySemaphore; Semaphore startSemaphore; + std::thread terminator; - auto connect_start = std::chrono::high_resolution_clock::now(); + auto connect_start = high_resolution_clock::now(); // start first client batch for (size_t i = 0; i < server_backlog; ++i) { @@ -276,23 +314,27 @@ auto main(int argc, char* argv[]) -> int { if (execution_seconds) { iterations = SIZE_MAX; - async([] { - sleep(execution_seconds); - terminate.store(true, std::memory_order_relaxed); + terminator = std::thread([] { + std::this_thread::sleep_for(seconds(execution_seconds)); + terminate.store(true); }); } - auto echo_start = std::chrono::high_resolution_clock::now(); + auto echo_start = high_resolution_clock::now(); // start the clients for (size_t i = 0; i < clients; ++i) { startSemaphore.release(); } - // await the clients + // await client termination cps.wait(); - auto echo_end = std::chrono::high_resolution_clock::now(); + if (execution_seconds) { + terminator.join(); + } + + auto echo_end = high_resolution_clock::now(); uint64_t avg_ns = clientResults[0].avg_ns; uint64_t avg_iterations = clientResults[0].iterations;