From ae81b0290afae01f82c689facca0ce88a20184bd Mon Sep 17 00:00:00 2001 From: Florian Fischer <florian.fl.fischer@fau.de> Date: Tue, 16 Mar 2021 16:47:03 +0100 Subject: [PATCH] [EchoClient] make linked futures optional While we haven't figured out why the version using linked futures breaks we can use a simpler implementation which calls submitAndWait on both futures separately --- apps/EchoClient.cpp | 120 ++++++++++++++++++++++++++++---------------- 1 file changed, 77 insertions(+), 43 deletions(-) diff --git a/apps/EchoClient.cpp b/apps/EchoClient.cpp index c872e49f..a7e5ffeb 100644 --- a/apps/EchoClient.cpp +++ b/apps/EchoClient.cpp @@ -9,7 +9,6 @@ #include <unistd.h> #include <algorithm> // for find -#include <array> #include <atomic> #include <cerrno> // for errno, ECANCELED #include <chrono> // for nanoseconds, duration, durat... @@ -57,6 +56,7 @@ size_t execution_seconds = 0; size_t clients = CLIENTS; size_t size = SIZE; size_t server_backlog = SERVER_BACKLOG; +bool linked_futures; std::atomic<bool> terminate = false; @@ -72,6 +72,60 @@ struct ClientResult* clientResults; struct addrinfo* server; +static auto clientIteration(SendFuture& sendFuture, RecvFuture& recvFuture) -> nanoseconds { + auto send_start = high_resolution_clock::now(); + int32_t bytes_send = sendFuture.submitAndWait(); + auto send_end = high_resolution_clock::now(); + + if (bytes_send < 0) { + errno = -bytes_send; + DIE_MSG_ERRNO("send failed"); + } + + auto recv_start = high_resolution_clock::now(); + int32_t bytes_recv = recvFuture.submitAndWait(); + auto recv_end = high_resolution_clock::now(); + + if (bytes_recv < 0) { + errno = -bytes_recv; + DIE_MSG_ERRNO("recv failed"); + } + + return duration_cast<nanoseconds>((send_end - send_start) + (recv_end - recv_start)); +} + +static auto clientIterationLinked(SendFuture& sendFuture, RecvFuture& recvFuture) -> nanoseconds { + recvFuture.setDependency(sendFuture); + + auto start = high_resolution_clock::now(); + int32_t res = recvFuture.submitAndWait(); + auto end = high_resolution_clock::now(); + + if (res <= 0) { + // recv failed + if (res != -ECANCELED) { + if (res == 0) { + DIE_MSG("server closed the connection"); + } + + errno = -res; + DIE_MSG_ERRNO("receive failed"); + } + + // send failed + int32_t send_res = sendFuture.wait(); + if (send_res < 0) { + errno = -send_res; + DIE_MSG_ERRNO("send failed"); + } + + // send must send less than expected because recvFuture returned -ECANCELED + DIE_MSG("short send" << send_res); + } + + return duration_cast<nanoseconds>(end - start); +} + static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& startSem, CPS& cps) { int client_sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol); if (client_sock < 0) { @@ -94,9 +148,8 @@ static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& start DIE_MSG_ERRNO("setsockopt failed"); } - std::array<char*, 2> buf; - buf[0] = new char[size]; - buf[1] = new char[size]; + auto* outBuf = new char[size]; + auto* inBuf = new char[size]; uint64_t avg_ns = 0; readySem.release(); @@ -104,43 +157,17 @@ static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& start size_t i = 0; while (i < iterations && !terminate.load(std::memory_order_relaxed)) { - char* outBuf = buf[i % 2]; - char* inBuf = buf[(i + 1) % 2]; - SendFuture sendFuture(client_sock, outBuf, size, MSG_NOSIGNAL); RecvFuture recvFuture(client_sock, inBuf, size, MSG_WAITALL); - recvFuture.setDependency(sendFuture); - - auto start = high_resolution_clock::now(); - int32_t res = recvFuture.submitAndWait(); - auto end = high_resolution_clock::now(); - - if (res <= 0) { - // recv failed - if (res != -ECANCELED) { - if (res == 0) { - DIE_MSG("server closed the connection"); - } - - errno = -res; - DIE_MSG_ERRNO("receive failed"); - } - - // send failed - int32_t send_res = sendFuture.wait(); - if (send_res < 0) { - errno = -send_res; - DIE_MSG_ERRNO("send failed"); - } - - DIE_MSG("short send" << send_res); - } - if (memcmp(outBuf, inBuf, size) != 0) { - DIE_MSG("got unexpected echo from server"); + nanoseconds duration; + if (linked_futures) { + duration = clientIterationLinked(sendFuture, recvFuture); + } else { + duration = clientIteration(sendFuture, recvFuture); } - auto duration = duration_cast<nanoseconds>(end - start); + // TODO: replace with emper::math::RunningAverage when !127 is merged uint64_t ns = duration.count(); if (unlikely(avg_ns == 0)) { avg_ns = ns; @@ -149,6 +176,10 @@ static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& start avg_ns /= 2; } + if (memcmp(outBuf, inBuf, size) != 0) { + DIE_MSG("got unexpected echo from server"); + } + i++; } @@ -160,8 +191,8 @@ static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& start // cf.setDependency(shut_f); cf.submit(); - delete[] buf[0]; - delete[] buf[1]; + delete[] inBuf; + delete[] outBuf; cf.wait(); @@ -186,10 +217,11 @@ static auto existsOption(int argc, char** argv, const std::string& option) -> bo } static void printUsage(char* name) { - std::cerr << "Usage: " << name - << "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]" - " [-f <output-file>] [-i <iterations> | -t <execution time in sec>]" - << std::endl; + std::cerr + << "Usage: " << name + << "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]" + " [-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]" + << std::endl; } auto main(int argc, char* argv[]) -> int { @@ -238,6 +270,8 @@ auto main(int argc, char* argv[]) -> int { server_backlog = strtol(server_backlog_s, nullptr, DECIMAL); } + linked_futures = getOption(argc, argv, "--linked-futures"); + int out_fd = STDOUT_FILENO; char* output_file = getOption(argc, argv, "-f"); @@ -293,7 +327,7 @@ auto main(int argc, char* argv[]) -> int { startSemaphore.release(); } - // await the clients + // await client termination cps.wait(); if (execution_seconds) { -- GitLab