diff --git a/apps/EchoClient.cpp b/apps/EchoClient.cpp index 1a4773489cec3bc20baf992ffb19abc4b12970e2..606f031b3901e06b8921161cb9968b2692a944ba 100644 --- a/apps/EchoClient.cpp +++ b/apps/EchoClient.cpp @@ -69,6 +69,7 @@ size_t size = SIZE; size_t server_backlog = SERVER_BACKLOG; bool linked_futures; bool histogram = false; +bool sendQuit = true; std::atomic<bool> terminate = false; @@ -105,6 +106,9 @@ class Client { size_t reconnects = 0; struct TimeStamps* timeStamps = nullptr; + high_resolution_clock::time_point echoStart; + high_resolution_clock::time_point echoEnd; + high_resolution_clock::time_point echoLoopEnd; Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps) : id(id), readySem(readySem), startSem(startSem), cps(cps) { @@ -125,79 +129,46 @@ class Client { static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool; private: - template <bool collectTimeStamps = false> - auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult { - if constexpr (collectTimeStamps) { - timeStamps[iteration].start = high_resolution_clock::now(); - } - - sendFuture.submit(); - - if constexpr (collectTimeStamps) { - timeStamps[iteration].afterSend = high_resolution_clock::now(); - } - - int32_t bytes_send = sendFuture.wait(); - - if constexpr (collectTimeStamps) { - timeStamps[iteration].afterSendDispatch = high_resolution_clock::now(); - } - - if (unlikely(bytes_send < 0)) { - return {bytes_send, "send failed"}; - } - - int32_t bytes_recv = recvFuture.submitAndWait(); + [[nodiscard]] auto shouldTerminate() const -> bool { + return iteration >= iterations || terminate.load(std::memory_order_relaxed); + } - if constexpr (collectTimeStamps) { - timeStamps[iteration].afterRecv = high_resolution_clock::now(); + void print_unexpected_echo_msg() const { + std::stringstream sst; + sst << "Client " << id << " got unexpected echo from server" << std::endl; + sst << "expected: "; + for (unsigned i = 0; i < size; ++i) { + sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)outBuf[i]); } + sst << std::endl; - if (unlikely(bytes_recv < 0)) { - return {bytes_recv, "recv failed"}; + sst << "received: "; + for (unsigned i = 0; i < size; ++i) { + sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)inBuf[i]); + sst << (unsigned)inBuf[i] << " "; } - - return {bytes_recv, ""}; + sst << std::endl; + LOGE(sst.str()); } - template <bool collectTimeStamps = false> - auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult { - recvFuture.setDependency(sendFuture); - - recvFuture.submit(); - - if constexpr (collectTimeStamps) { - timeStamps[iteration].afterSend = high_resolution_clock::now(); - } - - int32_t res = recvFuture.wait(); - - if constexpr (collectTimeStamps) { - timeStamps[iteration].afterSendDispatch = timeStamps[iteration].afterRecv = - high_resolution_clock::now(); + void echoError(int err, std::string&& msg) const { + std::stringstream sst; + sst << "Client " << id << " failed at iteration " << iteration << " with " << msg; + if (err != 0) { + sst << ": " << strerror(err); } + sst << std::endl; + LOGE(sst.str()); - if (unlikely(res <= 0)) { - // recv failed - if (res != -ECANCELED) { - if (res == 0) { - return {0, "server closed the connection"}; - } - - return {res, "recv failed"}; - } - - // send failed - int32_t send_res = sendFuture.wait(); - if (send_res < 0) { - return {send_res, "send failed"}; - } - - // send must have sent less than expected because recvFuture returned -ECANCELED - return {0, EMPER_BUILD_STR("short send: " << send_res)}; - } + std::this_thread::sleep_for(ONERROR_SLEEP); + DIE; + } - return {res, ""}; + void onCONNRESET() { + LOGW("Client " << id << " reconnecting"); + close(sock); + // reconnect to try again + connect(); } void connect() { @@ -225,9 +196,15 @@ class Client { } } - public: - template <bool collectTimeStamps = false> - void run() { + enum class CollectTimeStamps { yes, no }; + + enum class LinkedFutures { yes, no }; + + template <CollectTimeStamps collectTimeStampsSwitch, LinkedFutures linkedFuturesSwitch> + void _run() { + constexpr bool collectTimeStamps = collectTimeStampsSwitch == CollectTimeStamps::yes; + constexpr bool linkedFutures = linkedFuturesSwitch == LinkedFutures::yes; + connect(); // signal that this client is initialized and connected @@ -236,77 +213,112 @@ class Client { // wait for all clients to be ready startSem.acquire(); - while (iteration < iterations && !terminate.load(std::memory_order_relaxed)) { + while (!shouldTerminate()) { SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL); RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL); // prepare output buf sprintf(outBuf, "%lu:%lu", id, iteration); - high_resolution_clock::time_point start; - if constexpr (!collectTimeStamps) { - start = high_resolution_clock::now(); + echoStart = high_resolution_clock::now(); + if constexpr (collectTimeStamps) { + timeStamps[iteration].start = echoStart; } - IterationResult iterationResult; - if (linked_futures) { - iterationResult = linkedEcho<collectTimeStamps>(sendFuture, recvFuture); + if constexpr (!linkedFutures) { + sendFuture.submit(); + + if constexpr (collectTimeStamps) { + timeStamps[iteration].afterSend = high_resolution_clock::now(); + } + + if (shouldTerminate()) { + break; + } + int32_t bytes_send = sendFuture.wait(); + + if constexpr (collectTimeStamps) { + timeStamps[iteration].afterSendDispatch = high_resolution_clock::now(); + } + + if (unlikely(bytes_send < 0)) { + int err = -bytes_send; + if (err == ECONNRESET) { + onCONNRESET(); + continue; + } + echoError(err, "send failed"); + } } else { - iterationResult = echo<collectTimeStamps>(sendFuture, recvFuture); + recvFuture.setDependency(sendFuture); } - high_resolution_clock::time_point end; - if constexpr (!collectTimeStamps) { - end = high_resolution_clock::now(); + recvFuture.submit(); + + if (shouldTerminate()) { + break; + } + int32_t bytes_recv = recvFuture.wait(); + + echoEnd = high_resolution_clock::now(); + if constexpr (collectTimeStamps) { + timeStamps[iteration].afterRecv = echoEnd; } - // less than 1 byte was received -> an error occurred - // (connection closed by server is considered an error) - if (unlikely(iterationResult.first <= 0)) { - int err = -iterationResult.first; - auto& msg = iterationResult.second; + if (unlikely(bytes_recv < 0)) { + int err = -bytes_recv; if (err == ECONNRESET) { - LOGW("Client " << id << " reconnecting"); - close(sock); - // reconnect and try again - connect(); + onCONNRESET(); continue; } - LOGE("Client failed at iteration " << iteration << " with " << msg << ": " - << strerror(err)); - std::this_thread::sleep_for(ONERROR_SLEEP); - DIE; + if constexpr (!linkedFutures) { + echoError(err, "recv failed"); + } else { + // recv failed + if (err != ECANCELED) { + echoError(err, err < 0 ? "recv failed" : "server closed the connection"); + } + + // send failed + int32_t send_res = sendFuture.wait(); + if (send_res < 0) { + echoError(-send_res, "send failed"); + } + + // send must have sent less than expected because recvFuture returned -ECANCELED + echoError(0, EMPER_BUILD_STR("short send: " << send_res)); + } } if constexpr (!collectTimeStamps) { - nanoseconds duration = duration_cast<nanoseconds>(end - start); + nanoseconds duration = duration_cast<nanoseconds>(echoEnd - echoStart); avg_ns.update(duration.count()); } if (memcmp(outBuf, inBuf, size) != 0) { - std::stringstream sst; - sst << "got unexpected echo from server" << std::endl; - sst << "expected: "; - for (unsigned i = 0; i < size; ++i) { - sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)outBuf[i]); - } - sst << std::endl; - - sst << "received: "; - for (unsigned i = 0; i < size; ++i) { - sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)inBuf[i]); - sst << (unsigned)inBuf[i] << " "; - } - sst << std::endl; - LOGE(sst.str()); + print_unexpected_echo_msg(); unexpectedEchos++; } iteration++; } - shutdown(sock, SHUT_RDWR); + echoLoopEnd = high_resolution_clock::now(); + } + + public: + void shutdown(bool isQuitClient) const { + if (sendQuit && isQuitClient) { + LOGI("Send quit message"); + std::string msg = "quit\n"; + ssize_t res = emper::io::sendAndWait(sock, msg.c_str(), msg.size(), MSG_NOSIGNAL); + if (res < 0) { + LOGW("Sending quit failed: " << strerror(errno)); + } + } + + ::shutdown(sock, SHUT_RDWR); // ShutdownFuture shut_f(sock, SHUT_RDWR); CloseFuture cf(sock); @@ -315,6 +327,22 @@ class Client { cf.wait(); } + + void run() { + if (histogram) { + if (linked_futures) { + _run<CollectTimeStamps::yes, LinkedFutures::yes>(); + } else { + _run<CollectTimeStamps::yes, LinkedFutures::no>(); + } + } else { + if (linked_futures) { + _run<CollectTimeStamps::no, LinkedFutures::yes>(); + } else { + _run<CollectTimeStamps::no, LinkedFutures::no>(); + } + } + } }; Client** clients; @@ -330,12 +358,7 @@ auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> boo spawn( [&, id = next_client_id] { clients[id] = new Client(id, readySem, startSem, cps, histogram); - - if (histogram) { - clients[id]->run<true>(); - } else { - clients[id]->run(); - } + clients[id]->run(); }, cps); return true; @@ -427,6 +450,8 @@ auto main(int argc, char* argv[]) -> int { } } + sendQuit = !existsOption(argc, argv, "--no-quit"); + int histogram_fd; char* histogram_file = getOption(argc, argv, "--histogram"); if (histogram_file) { @@ -481,10 +506,12 @@ auto main(int argc, char* argv[]) -> int { readySemaphore.acquire(); } + high_resolution_clock::time_point echoTerminationTime; if (execution_seconds) { iterations = SIZE_MAX; - terminator = std::thread([] { + terminator = std::thread([&] { std::this_thread::sleep_for(seconds(execution_seconds)); + echoTerminationTime = high_resolution_clock::now(); terminate.store(true); }); } @@ -496,16 +523,21 @@ auto main(int argc, char* argv[]) -> int { startSemaphore.release(); } - // await client termination + // await echo phase termination cps.wait(); + auto echo_end = high_resolution_clock::now(); + if (execution_seconds) { terminator.join(); } - auto echo_end = high_resolution_clock::now(); - auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count(); + uint64_t echoDurationUntilTermination; + if (execution_seconds) { + echoDurationUntilTermination = + duration_cast<nanoseconds>(echoTerminationTime - echo_start).count(); + } auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count(); auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count(); @@ -521,6 +553,10 @@ auto main(int argc, char* argv[]) -> int { if (!histogram) { avg_ns.update(client->avg_ns.getAverage()); } + + // terminate the connection and send 'quit' on the last connection + bool isQuitClient = i == (nclients - 1); + async([=]() { client->shutdown(isQuitClient); }); } std::stringstream sst; @@ -532,19 +568,24 @@ auto main(int argc, char* argv[]) -> int { sst << "total_unexpected_echos = " << total_unexpected_echos << std::endl; sst << "avg_ns = " << avg_ns.getAverage() << std::endl; sst << "connect_duration = " << connect_duration << std::endl; + if (execution_seconds) { + sst << "execution_duration = " << echoDurationUntilTermination << std::endl; + } sst << "echo_duration = " << echo_duration << std::endl; sst << "total_duration = " << total_duration << std::endl; sst << std::endl; sst << "[clients]" << std::endl; sst << "csv =" << std::endl; - sst << " iterations,avg_ns,reconnects,unexpected_echos" << std::endl; + sst << " iterations,avg_ns,reconnects,unexpected_echos,echoLoopEnd" << std::endl; for (size_t i = 0; i < nclients; ++i) { auto* client = clients[i]; sst << " " << client->iteration << ","; sst << client->avg_ns.getAverage() << ","; sst << client->reconnects << ","; - sst << client->unexpectedEchos << std::endl; + sst << client->unexpectedEchos << ","; + sst << duration_cast<nanoseconds>(client->echoLoopEnd.time_since_epoch()).count() + << std::endl; } auto output = sst.str(); @@ -590,17 +631,17 @@ auto main(int argc, char* argv[]) -> int { emper::io::closeAndForget(histogram_fd); } - for (size_t i = 0; i < nclients; ++i) { - delete clients[i]; - } - delete[] clients; - - exit(EXIT_SUCCESS); + Runtime::getRuntime()->initiateTermination(); }); runtime.scheduleFromAnywhere(*alphaFiber); runtime.waitUntilFinished(); - return EXIT_FAILURE; + for (size_t i = 0; i < nclients; ++i) { + delete clients[i]; + } + delete[] clients; + + return EXIT_SUCCESS; }