// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>  // for getaddrinfo
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>  // for shutdown, socket, AF_INET
#include <sys/stat.h>
#include <unistd.h>

#include <algorithm>  // for find
#include <array>
#include <atomic>
#include <cerrno>   // for errno, ECANCELED
#include <chrono>   // for nanoseconds, duration, durat...
#include <cstdint>  // for uint64_t, int32_t
#include <cstdio>
#include <cstdlib>  // for size_t, strtol, exit, EXIT_F...
#include <cstring>  // for memcmp
#include <iomanip>
#include <iostream>  // for operator<<, basic_ostream, endl
#include <sstream>   // for stringstream
#include <string>    // for allocator, string, char_traits
#include <thread>
#include <utility>

#include "Common.hpp"                    // for DIE_MSG_ERRNO, DIE_MSG, unli...
#include "CountingPrivateSemaphore.hpp"  // for CPS
#include "Debug.hpp"                     // for LOGE
#include "Fiber.hpp"                     // for Fiber
#include "Runtime.hpp"                   // for Runtime
#include "Semaphore.hpp"                 // for Semaphore
#include "emper.hpp"                     // for spawn
#include "io.hpp"                        // for connectAndWait
#include "io/Future.hpp"                 // for CloseFuture, RecvFuture, Sen...
#include "lib/math.hpp"

using emper::Semaphore;
using namespace emper::io;
using emper::lib::math::RunningAverage;
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::nanoseconds;
using std::chrono::seconds;

// Defaults
const int DECIMAL = 10;
const std::string HOST = "0.0.0.0";
const std::string PORT = "12345";
const size_t ITERATIONS = 10000;
const size_t CLIENTS = 10000;
const size_t SIZE = 32;
const size_t SERVER_BACKLOG = 1024;
// Time to sleep in Client::echoError() before the process dies.
const seconds ONERROR_SLEEP(10);

const int OPENFLAGS = O_CREAT | O_WRONLY;
const int OPENMODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;

// Globals (set once by main() while parsing the command line)
std::string host = HOST;
std::string port = PORT;
size_t iterations = ITERATIONS;
// 0 means "terminate after a fixed number of iterations"
size_t execution_seconds = 0;
size_t nclients = CLIENTS;
// Number of bytes sent and expected back per echo
size_t size = SIZE;
// Upper bound of clients started by the first batch in main()
size_t server_backlog = SERVER_BACKLOG;
bool linked_futures = false;

bool histogram = false;
bool sendQuit = true;

// Set by the terminator thread in time-based mode to stop all echo loops
std::atomic<bool> terminate = false;

struct addrinfo* server = nullptr;
struct addrinfo* coordinator = nullptr;
std::string COORDINATOR_PORT = "12346";

/**
 * Resolve host:port with getaddrinfo() and return the head of the result list.
 *
 * The process is terminated if the lookup fails.
 * The returned list must be released with freeaddrinfo().
 */
static auto getFirstAddrInfo(const std::string& host, const std::string& port) -> struct addrinfo* {
  struct addrinfo* addrinfo = nullptr;
  int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &addrinfo);
  if (err) {
    if (err == EAI_SYSTEM) {
      // EAI_SYSTEM means the actual error is stored in errno
      DIE_MSG_ERRNO("getaddrinfo failed");
    } else {
      LOGE("error in getaddrinfo: " << gai_strerror(err));
      exit(EXIT_FAILURE);
    }
  }
  return addrinfo;
}

/**
 * Points in time recorded for a single echo iteration when a histogram is requested.
 *
 * afterSend and afterSendDispatch are only written when futures are not linked.
 */
struct TimeStamps {
  // taken right before the send is issued
  high_resolution_clock::time_point start;
  // taken after sendFuture.submit() returned
  high_resolution_clock::time_point afterSend;
  // taken after sendFuture.wait() returned
  high_resolution_clock::time_point afterSendDispatch;
  // taken after recvFuture.wait() returned
  high_resolution_clock::time_point afterRecv;
};

/**
 * A single echo client.
 *
 * Each client connects to the global `server`, signals readySem, waits for
 * startSem and then sends `size` bytes and receives them back until
 * shouldTerminate() holds. The results are kept in the public members and are
 * collected by main().
 */
class Client {
  using IterationResult = std::pair<int, std::string>;

  size_t id;
  Semaphore& readySem;
  Semaphore& startSem;
  CPS& cps;

  int sock = -1;
  // false until the first successful connect(); later connects count as reconnects
  bool initialized = false;

  char* inBuf;
  char* outBuf;

 public:
  // source of the ids handed out by startNew()
  static std::atomic<size_t> client_ids;

  // results
  size_t iteration = 0;
  // only updated when no time stamps are collected
  RunningAverage<double> avg_ns;
  size_t unexpectedEchos = 0;
  size_t reconnects = 0;

  // array of `iterations` entries; nullptr unless time stamps are collected
  struct TimeStamps* timeStamps = nullptr;

  high_resolution_clock::time_point echoStart;
  high_resolution_clock::time_point echoEnd;
  high_resolution_clock::time_point echoLoopEnd;

  Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps)
      : id(id), readySem(readySem), startSem(startSem), cps(cps) {
    // Value-initialize both buffers: all `size` bytes are sent and compared,
    // not only those written when the message is formatted.
    outBuf = new char[size]();
    inBuf = new char[size]();

    if (collectTimeStamps) {
      timeStamps = new TimeStamps[iterations];
    }
  }

  // The client owns raw arrays, copying it would free them twice.
  Client(const Client&) = delete;
  auto operator=(const Client&) -> Client& = delete;

  ~Client() {
    delete[] outBuf;
    delete[] inBuf;
    delete[] timeStamps;
  }

  /**
   * Spawn a fiber creating and running the next client.
   *
   * @return false if all `nclients` clients were already started, true otherwise
   */
  static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;

 private:
  [[nodiscard]] auto shouldTerminate() const -> bool {
    return iteration >= iterations || terminate.load(std::memory_order_relaxed);
  }

  // Log the sent and the received buffer as hex dumps.
  void print_unexpected_echo_msg() const {
    std::stringstream sst;

    // Append the `size` bytes of buf as two hex digits each.
    auto appendHex = [&sst](const char* buf) {
      for (size_t i = 0; i < size; ++i) {
        sst << std::setfill('0') << std::setw(2) << std::hex
            << static_cast<unsigned>(static_cast<unsigned char>(buf[i]));
      }
    };

    sst << "Client " << id << " got unexpected echo from server" << '\n';
    sst << "expected: ";
    appendHex(outBuf);
    sst << '\n';

    sst << "received: ";
    appendHex(inBuf);
    sst << '\n';

    LOGE(sst.str());
  }

  /**
   * Log a fatal echo error, sleep for ONERROR_SLEEP and terminate via DIE.
   *
   * @param err errno-style error code appended with strerror(); 0 to omit it
   * @param msg description of the failed operation
   */
  void echoError(int err, std::string&& msg) const {
    std::stringstream sst;
    sst << "Client " << id << " failed at iteration " << iteration << " with " << msg;
    if (err != 0) {
      sst << ": " << strerror(err);
    }
    sst << '\n';

    LOGE(sst.str());
    std::this_thread::sleep_for(ONERROR_SLEEP);
    DIE;
  }

  // Handle a connection reset: drop the old socket and connect again.
  void onCONNRESET() {
    LOGW("Client " << id << " reconnecting");
    closeAndForget(sock);
    // reconnect to try again
    connect();
  }

  /**
   * Create a TCP_NODELAY socket and connect it to the global `server`.
   *
   * The first successful connect starts the next client, each later one is
   * counted in `reconnects`. Any failure terminates the process.
   */
  void connect() {
    sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
    if (sock < 0) {
      DIE_MSG_ERRNO("creating new client socket failed");
    }

    int enable = 1;
    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
      DIE_MSG_ERRNO("setsockopt failed");
    }

    int err = emper::io::connectAndWait(sock, server->ai_addr, server->ai_addrlen);
    if (err) {
      DIE_MSG_ERRNO("connecting new client socket failed");
    }

    if (!initialized) {
      // We are connected -> start next client
      Client::startNew(readySem, startSem, cps);
      initialized = true;
    } else {
      reconnects++;
    }
  }

  enum class CollectTimeStamps { yes, no };
  enum class LinkedFutures { yes, no };

  /**
   * The echo loop, specialized at compile time.
   *
   * @tparam collectTimeStampsSwitch record TimeStamps per iteration instead of
   *         updating avg_ns
   * @tparam linkedFuturesSwitch make the recv future depend on the send future
   *         and submit both together instead of waiting for the send first
   */
  template <CollectTimeStamps collectTimeStampsSwitch, LinkedFutures linkedFuturesSwitch>
  void _run() {
    constexpr bool collectTimeStamps = collectTimeStampsSwitch == CollectTimeStamps::yes;
    constexpr bool linkedFutures = linkedFuturesSwitch == LinkedFutures::yes;

    connect();

    // signal that this client is initialized and connected
    readySem.release();

    // wait for all clients to be ready
    startSem.acquire();

    while (!shouldTerminate()) {
      SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
      RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);

      // prepare output buf; bounded by `size` so that small -s values can not
      // overflow the buffer (the text is truncated instead)
      snprintf(outBuf, size, "%zu:%zu", id, iteration);

      echoStart = high_resolution_clock::now();
      if constexpr (collectTimeStamps) {
        timeStamps[iteration].start = echoStart;
      }

      if constexpr (!linkedFutures) {
        sendFuture.submit();
        if constexpr (collectTimeStamps) {
          timeStamps[iteration].afterSend = high_resolution_clock::now();
        }

        if (shouldTerminate()) {
          break;
        }

        int32_t bytes_send = sendFuture.wait();
        if constexpr (collectTimeStamps) {
          timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
        }

        if (unlikely(bytes_send < 0)) {
          int err = -bytes_send;
          if (err == ECONNRESET) {
            // retry the same iteration on a fresh connection
            onCONNRESET();
            continue;
          }
          echoError(err, "send failed");
        }
      } else {
        recvFuture.setDependency(sendFuture);
      }

      recvFuture.submit();

      if (shouldTerminate()) {
        break;
      }

      int32_t bytes_recv = recvFuture.wait();
      echoEnd = high_resolution_clock::now();
      if constexpr (collectTimeStamps) {
        timeStamps[iteration].afterRecv = echoEnd;
      }

      if (unlikely(bytes_recv < 0)) {
        int err = -bytes_recv;
        if (err == ECONNRESET) {
          // retry the same iteration on a fresh connection
          onCONNRESET();
          continue;
        }

        if constexpr (!linkedFutures) {
          echoError(err, "recv failed");
        } else {
          // recv failed
          // err is always positive here because bytes_recv was negative
          if (err != ECANCELED) {
            echoError(err, "recv failed");
          }

          // send failed
          int32_t send_res = sendFuture.wait();
          if (send_res < 0) {
            echoError(-send_res, "send failed");
          }

          // send must have sent less than expected because recvFuture returned -ECANCELED
          echoError(0, EMPER_BUILD_STR("short send: " << send_res));
        }
      }

      if constexpr (!collectTimeStamps) {
        nanoseconds duration = duration_cast<nanoseconds>(echoEnd - echoStart);
        avg_ns.update(duration.count());
      }

      if (memcmp(outBuf, inBuf, size) != 0) {
        print_unexpected_echo_msg();
        unexpectedEchos++;
      }

      iteration++;
    }

    echoLoopEnd = high_resolution_clock::now();
  }

 public:
  /**
   * Shut down and close the client's socket.
   *
   * @param isQuitClient if true and the global sendQuit is set, "quit\n" is
   *        sent on the connection before it is closed
   */
  void shutdown(bool isQuitClient) const {
    if (sendQuit && isQuitClient) {
      LOGI("Send quit message");
      std::string msg = "quit\n";
      ssize_t res = emper::io::sendAndWait(sock, msg.c_str(), msg.size(), MSG_NOSIGNAL);
      if (res < 0) {
        LOGW("Sending quit failed: " << strerror(errno));
      }
    }

    // the close depends on the shutdown, submitting the close submits both
    ShutdownFuture shut_f(sock, SHUT_RDWR);
    CloseFuture cf(sock);
    cf.setDependency(shut_f);
    cf.submit();
    cf.wait();
  }

  // Run the echo loop variant selected by the globals histogram and linked_futures.
  void run() {
    if (histogram) {
      if (linked_futures) {
        _run<CollectTimeStamps::yes, LinkedFutures::yes>();
      } else {
        _run<CollectTimeStamps::yes, LinkedFutures::no>();
      }
    } else {
      if (linked_futures) {
        _run<CollectTimeStamps::no, LinkedFutures::yes>();
      } else {
        _run<CollectTimeStamps::no, LinkedFutures::no>();
      }
    }
  }
};

// Array of `nclients` client pointers, allocated in main() and indexed by client id
Client** clients;
std::atomic<size_t> Client::client_ids = 0;

auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool {
  size_t next_client_id = client_ids.fetch_add(1);
  if (next_client_id >= nclients) {
    return false;
  }

  // The semaphores and the CPS are captured by reference; they live in the
  // alpha fiber of main(), which waits on the CPS before leaving their scope.
  spawn(
      [&, id = next_client_id] {
        clients[id] = new Client(id, readySem, startSem, cps, histogram);
        clients[id]->run();
      },
      cps);
  return true;
}

/**
 * Find the argument following `option` on the command line.
 *
 * @return the argument after `option`, or nullptr if `option` is missing or
 *         is the last argument
 */
static auto getOption(int argc, char** argv, const std::string& option) -> char* {
  char** end = argv + argc;
  char** itr = std::find(argv, end, option);

  if (itr != end && ++itr != end) {
    return *itr;
  }

  return nullptr;
}

// Return whether `option` appears anywhere on the command line.
static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
  char** end = argv + argc;
  return std::find(argv, end, option) != end;
}

/**
 * Parse the value of a numeric command line option.
 *
 * Terminates the process if `value` is not a non-negative decimal number
 * representable as long.
 *
 * @param value the argument text as returned by getOption()
 * @param option the option name, only used for the error message
 */
static auto parseSizeOption(const char* value, const std::string& option) -> size_t {
  errno = 0;
  char* end = nullptr;
  const long parsed = strtol(value, &end, DECIMAL);
  if (end == value || *end != '\0' || errno == ERANGE || parsed < 0) {
    DIE_MSG("invalid value '" << value << "' for option " << option);
  }
  return static_cast<size_t>(parsed);
}

static void printUsage(char* name) {
  std::cerr
      << "Usage: " << name
      << " [-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
         " [-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]"
         " [--no-quit] [--histogram <file>] [--coordinator <coordinator-host>]"
      << std::endl;
}

// Print the textual form of an IPv4/IPv6 socket address together with the global port.
static void printIp(struct sockaddr* addr) {
  std::array<char, INET6_ADDRSTRLEN> serverAddrString;
  void* serverAddr = nullptr;
  switch (addr->sa_family) {
    case AF_INET:
      serverAddr = &(((struct sockaddr_in*)addr)->sin_addr);
      break;
    case AF_INET6:
      serverAddr = &(((struct sockaddr_in6*)addr)->sin6_addr);
      break;
    default:
      DIE_MSG("unknown sa_family " << addr->sa_family);
  }
  inet_ntop(addr->sa_family, serverAddr, serverAddrString.data(), INET6_ADDRSTRLEN);
  std::cout << "echo client connecting to " << serverAddrString.data() << ":" << port << std::endl;
}

// Print the effective benchmark parameters to stdout.
static void printSettings() {
  std::cout << "parameters: {termination=";
  if (execution_seconds) {
    std::cout << execution_seconds << "seconds";
  } else {
    std::cout << iterations << "iterations";
  }
  std::cout << ",clients=" << nclients;
  std::cout << ",size=" << size;
  std::cout << ",sendquit= " << (sendQuit ? "true" : "false");
  std::cout << "}" << std::endl;
}

/**
 * Parse the command line, run the echo benchmark inside an emper Runtime and
 * write the results to stdout or to the file given with -f.
 */
auto main(int argc, char* argv[]) -> int {
  if (existsOption(argc, argv, "-h")) {
    printUsage(argv[0]);
    exit(EXIT_FAILURE);
  }

  char* port_s = getOption(argc, argv, "-p");
  if (port_s) {
    port = std::string(port_s);
  }

  char* addr_s = getOption(argc, argv, "-a");
  if (addr_s) {
    host = std::string(addr_s);
  }

  char* client_s = getOption(argc, argv, "-c");
  if (client_s) {
    nclients = parseSizeOption(client_s, "-c");
  }

  char* iterations_s = getOption(argc, argv, "-i");
  if (iterations_s) {
    iterations = parseSizeOption(iterations_s, "-i");
  }

  char* time_s = getOption(argc, argv, "-t");
  if (time_s) {
    if (iterations_s) {
      std::cerr << "-t and -i are mutually exclusive" << std::endl;
      printUsage(argv[0]);
      exit(EXIT_FAILURE);
    }

    execution_seconds = parseSizeOption(time_s, "-t");
  }

  char* size_s = getOption(argc, argv, "-s");
  if (size_s) {
    size = parseSizeOption(size_s, "-s");
  }

  char* server_backlog_s = getOption(argc, argv, "-b");
  if (server_backlog_s) {
    server_backlog = parseSizeOption(server_backlog_s, "-b");
  }

  // A flag without an argument: getOption() would miss it when it is the last argument.
  linked_futures = existsOption(argc, argv, "--linked-futures");

  int out_fd = STDOUT_FILENO;

  char* output_file = getOption(argc, argv, "-f");
  if (output_file) {
    // refuse to overwrite an existing, writable file
    if (access(output_file, W_OK) == 0) {
      DIE_MSG("Output file: " << output_file << " already exists");
    }

    out_fd = ::open(output_file, OPENFLAGS, OPENMODE);
    if (out_fd < 0) {
      DIE_MSG_ERRNO("opening output file failed");
    }
  }

  sendQuit = !existsOption(argc, argv, "--no-quit");

  int histogram_fd = -1;
  char* histogram_file = getOption(argc, argv, "--histogram");
  if (histogram_file) {
    if (!iterations_s) {
      DIE_MSG("histograms are currently only possible with fixed iterations");
    }

    histogram = true;

    if (access(histogram_file, W_OK) == 0) {
      DIE_MSG("Histogram file: " << histogram_file << " already exists");
    }

    histogram_fd = ::open(histogram_file, OPENFLAGS, OPENMODE);
    if (histogram_fd < 0) {
      DIE_MSG_ERRNO("opening histogram file failed");
    }
  }

  char* coordinator_s = getOption(argc, argv, "--coordinator");
  if (coordinator_s) {
    coordinator = getFirstAddrInfo(coordinator_s, COORDINATOR_PORT);
  }

  server = getFirstAddrInfo(host, port);

  printIp(server->ai_addr);
  printSettings();

  Runtime runtime;

  Fiber* alphaFiber = Fiber::from([&] {
    clients = new Client*[nclients];
    CPS cps;
    Semaphore readySemaphore;
    Semaphore startSemaphore;
    std::thread terminator;

    auto connect_start = high_resolution_clock::now();

    // start first client batch
    for (size_t i = 0; i < server_backlog; ++i) {
      // Start new clients until we have started all or the amount
      // of parallel connecting clients is reached
      if (!Client::startNew(readySemaphore, startSemaphore, cps)) {
        break;
      }
    }

    // await the connection of all clients
    for (size_t i = 0; i < nclients; ++i) {
      readySemaphore.acquire();
    }

    // synchronize with other peers using the coordinator
    if (coordinator) {
      int conn = socket(coordinator->ai_family, coordinator->ai_socktype, coordinator->ai_protocol);
      if (conn < 0) DIE_MSG_ERRNO("coordinator socket creation failed");

      int err = emper::io::connectAndWait(conn, coordinator->ai_addr, coordinator->ai_addrlen);
      if (err) DIE_MSG_ERRNO("coordinator socket connect failed");

      // send "OK" and wait for any reply of the coordinator
      std::string m = "OK";
      SendFuture sf(conn, m.c_str(), m.size(), 0);

      std::array<char, 16> buf;
      RecvFuture rf(conn, buf.data(), buf.size(), 0);
      rf.setDependency(sf);
      err = rf.submitAndWait();
      if (err < 0) DIE_MSG("coordinator recv failed: " << strerror(-err));

      closeAndForget(conn);

      LOGI("Coordinator go received");
    }

    high_resolution_clock::time_point echoTerminationTime;
    if (execution_seconds) {
      // time-based mode: the iteration limit is disabled and a plain thread
      // raises the terminate flag after the requested time
      iterations = SIZE_MAX;
      terminator = std::thread([&] {
        std::this_thread::sleep_for(seconds(execution_seconds));
        echoTerminationTime = high_resolution_clock::now();
        terminate.store(true);
      });
    }

    auto echo_start = high_resolution_clock::now();

    // start the clients
    for (size_t i = 0; i < nclients; ++i) {
      startSemaphore.release();
    }

    // await echo phase termination
    cps.wait();

    auto echo_end = high_resolution_clock::now();

    if (execution_seconds) {
      terminator.join();
    }

    auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();
    // only meaningful (and only printed) in time-based mode
    uint64_t echoDurationUntilTermination = 0;
    if (execution_seconds) {
      echoDurationUntilTermination =
          duration_cast<nanoseconds>(echoTerminationTime - echo_start).count();
    }
    auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
    auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();

    uint64_t total_iterations = 0;
    uint64_t total_reconnects = 0;
    uint64_t total_unexpected_echos = 0;
    RunningAverage avg_ns;
    for (size_t i = 0; i < nclients; ++i) {
      auto* client = clients[i];
      total_iterations += client->iteration;
      total_reconnects += client->reconnects;
      total_unexpected_echos += client->unexpectedEchos;

      if (!histogram) {
        avg_ns.update(client->avg_ns.getAverage());
      }

      // terminate the connection and send 'quit' on the last connection
      bool isQuitClient = i == (nclients - 1);
      async([=]() { client->shutdown(isQuitClient); });
    }

    std::stringstream sst;
    sst << "[global]" << '\n';
    sst << "clients = " << nclients << '\n';
    sst << "size = " << size << '\n';
    sst << "total_iterations = " << total_iterations << '\n';
    sst << "total_reconnects = " << total_reconnects << '\n';
    sst << "total_unexpected_echos = " << total_unexpected_echos << '\n';
    sst << "avg_ns = " << avg_ns.getAverage() << '\n';

    sst << "connect_duration = " << connect_duration << '\n';
    if (execution_seconds) {
      sst << "execution_duration = " << echoDurationUntilTermination << '\n';
    }
    sst << "echo_duration = " << echo_duration << '\n';
    sst << "total_duration = " << total_duration << '\n';
    sst << '\n';

    sst << "[clients]" << '\n';
    sst << "csv =" << '\n';
    sst << " iterations,avg_ns,reconnects,unexpected_echos,echoLoopEnd" << '\n';
    for (size_t i = 0; i < nclients; ++i) {
      auto* client = clients[i];
      sst << " " << client->iteration << ",";
      sst << client->avg_ns.getAverage() << ",";
      sst << client->reconnects << ",";
      sst << client->unexpectedEchos << ",";
      sst << duration_cast<nanoseconds>(client->echoLoopEnd.time_since_epoch()).count() << '\n';
    }

    auto output = sst.str();
    if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
      DIE_MSG_ERRNO("writing results failed");
    }

    // stdout is left open, only a file opened for -f is closed
    if (output_file) {
      emper::io::closeAndForget(out_fd);
    }

    if (histogram) {
      std::string histogramDataHeader =
          "total_latency,after_send_latency,after_send_dispatch_latency\n";
      if (emper::io::writeFileAndWait(histogram_fd, histogramDataHeader.c_str(),
                                      histogramDataHeader.size()) < 0) {
        DIE_MSG_ERRNO("writing histogram data header failed");
      }

      // one write per client, one csv row per iteration
      for (size_t i = 0; i < nclients; ++i) {
        auto* client = clients[i];
        std::stringstream sst;
        for (size_t iteration = 0; iteration < iterations; ++iteration) {
          auto timeStamps = client->timeStamps[iteration];
          nanoseconds totalLatency =
              duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.start);
          nanoseconds afterSendLatency =
              duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSend);
          nanoseconds afterSendDispatchLatency =
              duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSendDispatch);

          sst << totalLatency.count() << ",";
          sst << afterSendLatency.count() << ",";
          sst << afterSendDispatchLatency.count() << '\n';
        }

        auto histoOutput = sst.str();
        if (emper::io::writeFileAndWait(histogram_fd, histoOutput.c_str(), histoOutput.size()) <
            0) {
          DIE_MSG_ERRNO("writing histogram data failed");
        }
      }

      emper::io::closeAndForget(histogram_fd);
    }

    Runtime::getRuntime()->initiateTermination();
  });

  runtime.scheduleFromAnywhere(*alphaFiber);

  runtime.waitUntilFinished();

  for (size_t i = 0; i < nclients; ++i) {
    delete clients[i];
  }
  delete[] clients;

  // release the getaddrinfo() results
  freeaddrinfo(server);
  if (coordinator) {
    freeaddrinfo(coordinator);
  }

  return EXIT_SUCCESS;
}