diff --git a/apps/EchoClient.cpp b/apps/EchoClient.cpp index f8322776ab068c51e8c7c3092e2cde8053325c54..71c409de0a5d528d244abfd6ac16ff87f0a74d23 100644 --- a/apps/EchoClient.cpp +++ b/apps/EchoClient.cpp @@ -65,10 +65,10 @@ std::string host = HOST; std::string port = PORT; size_t iterations = ITERATIONS; size_t execution_seconds = 0; +TimeoutWrapper::Timespec* timeout = nullptr; size_t nclients = CLIENTS; size_t size = SIZE; size_t server_backlog = SERVER_BACKLOG; -bool linked_futures; bool histogram = false; bool sendQuit = true; @@ -130,6 +130,7 @@ class Client { RunningAverage<double> avg_ns; size_t unexpectedEchos = 0; size_t reconnects = 0; + bool timedOut = false; struct TimeStamps* timeStamps = nullptr; high_resolution_clock::time_point echoStart; @@ -229,103 +230,97 @@ class Client { } } - enum class CollectTimeStamps { yes, no }; + void prepeareOutBuf() { + // prepare output buf + sprintf(outBuf, "%lu:%lu", id, iteration); + } - enum class LinkedFutures { yes, no }; + enum class CollectTimeStamps { yes, no }; + enum class UseTimeout { yes, no }; - template <CollectTimeStamps collectTimeStampsSwitch, LinkedFutures linkedFuturesSwitch> - void _run() { + template <CollectTimeStamps collectTimeStampsSwitch> + void storeTimestamp(high_resolution_clock::time_point* mem, + high_resolution_clock::time_point& time) { constexpr bool collectTimeStamps = collectTimeStampsSwitch == CollectTimeStamps::yes; - constexpr bool linkedFutures = linkedFuturesSwitch == LinkedFutures::yes; - connect(); + if constexpr (collectTimeStamps) { + *mem = time; + } + } - // signal that this client is initialized and connected - readySem.release(); + template <CollectTimeStamps collectTimeStampsSwitch> + auto echo() -> bool { + SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL); + RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL); - // wait for all clients to be ready - startSem.acquire(); + setState(ClientState::unknown); + recvFuture.setDependency(sendFuture); - while (!shouldTerminate()) { - SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL); - RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL); + prepeareOutBuf(); - // prepare output buf - sprintf(outBuf, "%lu:%lu", id, iteration); + echoStart = high_resolution_clock::now(); + storeTimestamp<collectTimeStampsSwitch>(&timeStamps[iteration].start, echoStart); - echoStart = high_resolution_clock::now(); - if constexpr (collectTimeStamps) { - timeStamps[iteration].start = echoStart; + int32_t bytes_recv = 0; + if (timeout) { + TimeoutWrapper t(recvFuture, *timeout); + t.submit(); + + int32_t res = t.wait(); + if (unlikely(res == -ETIME)) { + timedOut = true; + return false; } - if constexpr (!linkedFutures) { - setState(ClientState::sending); - sendFuture.submit(); + bytes_recv = recvFuture.wait(); + } else { + recvFuture.submit(); + if (shouldTerminate()) return false; + bytes_recv = recvFuture.wait(); + } - if constexpr (collectTimeStamps) { - timeStamps[iteration].afterSend = high_resolution_clock::now(); - } + echoEnd = high_resolution_clock::now(); + storeTimestamp<collectTimeStampsSwitch>(&timeStamps[iteration].afterRecv, echoEnd); - if (shouldTerminate()) { - break; - } - int32_t bytes_send = sendFuture.wait(); - setState(ClientState::receiving); + if (unlikely(bytes_recv < 0)) { + int err = -bytes_recv; + if (err == ECONNRESET) { + onCONNRESET(); + return false; + } - if constexpr (collectTimeStamps) { - timeStamps[iteration].afterSendDispatch = high_resolution_clock::now(); - } + // recv failed + if (err != ECANCELED) { + echoError(err, err < 0 ? "recv failed" : "server closed the connection"); + } - if (unlikely(bytes_send < 0)) { - int err = -bytes_send; - if (err == ECONNRESET) { - onCONNRESET(); - continue; - } - echoError(err, "send failed"); - } - } else { - setState(ClientState::unknown); - recvFuture.setDependency(sendFuture); + // send failed + int32_t send_res = sendFuture.wait(); + if (send_res < 0) { + echoError(-send_res, "send failed"); } - recvFuture.submit(); + // send must have sent less than expected because recvFuture returned -ECANCELED + echoError(0, EMPER_BUILD_STR("short send: " << send_res)); + } - if (shouldTerminate()) { - break; - } - int32_t bytes_recv = recvFuture.wait(); + return true; + } - echoEnd = high_resolution_clock::now(); - if constexpr (collectTimeStamps) { - timeStamps[iteration].afterRecv = echoEnd; - } + template <CollectTimeStamps collectTimeStampsSwitch> + void _run() { + constexpr bool collectTimeStamps = collectTimeStampsSwitch == CollectTimeStamps::yes; - if (unlikely(bytes_recv < 0)) { - int err = -bytes_recv; - if (err == ECONNRESET) { - onCONNRESET(); - continue; - } + connect(); - if constexpr (!linkedFutures) { - echoError(err, "recv failed"); - } else { - // recv failed - if (err != ECANCELED) { - echoError(err, err < 0 ? "recv failed" : "server closed the connection"); - } - - // send failed - int32_t send_res = sendFuture.wait(); - if (send_res < 0) { - echoError(-send_res, "send failed"); - } - - // send must have sent less than expected because recvFuture returned -ECANCELED - echoError(0, EMPER_BUILD_STR("short send: " << send_res)); - } - } + // signal that this client is initialized and connected + readySem.release(); + + // wait for all clients to be ready + startSem.acquire(); + + while (!shouldTerminate()) { + if (!echo<collectTimeStampsSwitch>()) continue; if constexpr (!collectTimeStamps) { nanoseconds duration = duration_cast<nanoseconds>(echoEnd - echoStart); @@ -336,7 +331,6 @@ class Client { print_unexpected_echo_msg(); unexpectedEchos++; } - iteration++; } @@ -363,19 +357,10 @@ class Client { } void run() { - if (histogram) { - if (linked_futures) { - _run<CollectTimeStamps::yes, LinkedFutures::yes>(); - } else { - _run<CollectTimeStamps::yes, LinkedFutures::no>(); - } - } else { - if (linked_futures) { - _run<CollectTimeStamps::no, LinkedFutures::yes>(); - } else { - _run<CollectTimeStamps::no, LinkedFutures::no>(); - } - } + if (histogram) + _run<CollectTimeStamps::yes>(); + else + _run<CollectTimeStamps::no>(); } }; @@ -425,7 +410,6 @@ static void printUsage(char* name) { << " [-f <output-file>]\tfile to write the results to" << std::endl << " [-i <iterations> | -t <execution time in sec>]\ttermination conditions" << std::endl - << " [--linked-futures]\tuse linked futures" << std::endl << " [--coordinator <coordinator-host>]\tcoordinator address to connect to" << std::endl; } @@ -494,6 +478,7 @@ auto main(int argc, char* argv[]) -> int { exit(EXIT_FAILURE); } execution_seconds = strtol(time_s, nullptr, DECIMAL); + timeout = new TimeoutWrapper::Timespec{static_cast<int64_t>(execution_seconds), 0}; } char* size_s = getOption(argc, argv, "-s"); @@ -506,8 +491,6 @@ auto main(int argc, char* argv[]) -> int { server_backlog = strtol(server_backlog_s, nullptr, DECIMAL); } - linked_futures = getOption(argc, argv, "--linked-futures"); - int out_fd = STDOUT_FILENO; char* output_file = getOption(argc, argv, "-f"); @@ -641,12 +624,14 @@ auto main(int argc, char* argv[]) -> int { uint64_t total_iterations = 0; uint64_t total_reconnects = 0; uint64_t total_unexpected_echos = 0; + uint64_t total_timeouts = 0; RunningAverage avg_ns; for (size_t i = 0; i < nclients; ++i) { auto* client = clients[i]; total_iterations += client->iteration; total_reconnects += client->reconnects; total_unexpected_echos += client->unexpectedEchos; + if (client->timedOut) ++total_timeouts; if (!histogram) { avg_ns.update(client->avg_ns.getAverage()); } @@ -663,6 +648,7 @@ auto main(int argc, char* argv[]) -> int { sst << "total_iterations = " << total_iterations << std::endl; sst << "total_reconnects = " << total_reconnects << std::endl; sst << "total_unexpected_echos = " << total_unexpected_echos << std::endl; + sst << "total_timeouts = " << total_timeouts << std::endl; sst << "avg_ns = " << avg_ns.getAverage() << std::endl; sst << "connect_duration = " << connect_duration << std::endl; if (execution_seconds) { @@ -743,6 +729,7 @@ auto main(int argc, char* argv[]) -> int { delete clients[i]; } delete[] clients; + delete timeout; return EXIT_SUCCESS; }