// SPDX-License-Identifier: LGPL-3.0-or-later // Copyright © 2021 Florian Fischer #include <fcntl.h> #include <netdb.h> //for getaddrinfo #include <netinet/in.h> #include <netinet/tcp.h> #include <sys/socket.h> // for shutdown, socket, AF_INET #include <sys/stat.h> #include <unistd.h> #include <algorithm> // for find #include <atomic> #include <cerrno> // for errno, ECANCELED #include <chrono> // for nanoseconds, duration, durat... #include <cstdint> // for uint64_t, int32_t #include <cstdio> #include <cstdlib> // for size_t, strtol, exit, EXIT_F... #include <cstring> // for memcmp #include <iomanip> #include <iostream> // for operator<<, basic_ostream, endl #include <string> // for allocator, string, char_traits #include <thread> #include <utility> #include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli... #include "CountingPrivateSemaphore.hpp" // for CPS #include "Debug.hpp" // for LOGE #include "Fiber.hpp" // for Fiber #include "Runtime.hpp" // for Runtime #include "Semaphore.hpp" // for Semaphore #include "emper.hpp" // for spawn #include "io.hpp" // for connectAndWait #include "io/Future.hpp" // for CloseFuture, RecvFuture, Sen... 
#include "lib/math.hpp"

using emper::Semaphore;
using emper::io::CloseFuture;
using emper::io::RecvFuture;
using emper::io::SendFuture;
using emper::lib::math::RunningAverage;

using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::nanoseconds;
using std::chrono::seconds;

// Defaults
const int DECIMAL = 10;
const std::string HOST = "0.0.0.0";
const std::string PORT = "12345";
const size_t ITERATIONS = 10000;
const size_t CLIENTS = 10000;
const size_t SIZE = 32;
const size_t SERVER_BACKLOG = 1024;

// How long a failed client sleeps before it terminates the whole process.
const seconds ONERROR_SLEEP(10);

// Globals (set once from the command line in main() before any client is started)
std::string host = HOST;
std::string port = PORT;
size_t iterations = ITERATIONS;
size_t execution_seconds = 0;
size_t nclients = CLIENTS;
size_t size = SIZE;
size_t server_backlog = SERVER_BACKLOG;
bool linked_futures;

// Set by the terminator thread in time-limited mode (-t) to stop all clients.
std::atomic<bool> terminate = false;

// Result of getaddrinfo() for host:port; every client connects to this address.
struct addrinfo* server;

/**
 * A single echo client.
 *
 * Each client connects to the server, then repeatedly sends `size` bytes and
 * waits for `size` bytes in return, recording the round-trip time of every
 * iteration and counting echoes that differ from what was sent.
 */
class Client {
	// first: number of received bytes (> 0) on success, 0 or a negative errno on failure
	// second: description of the failure, empty on success
	using IterationResult = std::pair<int, std::string>;

	size_t id;
	Semaphore& readySem;
	Semaphore& startSem;
	CPS& cps;

	int sock = -1;
	bool initialized = false;

	char* inBuf;
	char* outBuf;

 public:
	static std::atomic<size_t> client_ids;

	// results
	size_t iteration = 0;
	RunningAverage<double> avg_ns;
	size_t unexpectedEchos = 0;
	size_t reconnects = 0;

	Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps)
			: id(id),
				readySem(readySem),
				startSem(startSem),
				cps(cps),
				// Value-initialize both buffers: the whole outBuf is sent, so bytes
				// behind the formatted payload must not be uninitialized heap memory.
				inBuf(new char[size]()),
				outBuf(new char[size]()) {}

	// The client owns raw buffers which are released in run(); copying would double free.
	Client(const Client&) = delete;
	auto operator=(const Client&) -> Client& = delete;

	/**
	 * Spawn a new client fiber if fewer than `nclients` clients were started so far.
	 *
	 * @return true if a client was spawned, false if all clients were already started
	 */
	static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;

 private:
	/**
	 * Perform one echo iteration with two independently submitted futures.
	 *
	 * @return the number of received bytes, or a non-positive value and a message on failure
	 */
	static auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
		int32_t bytes_send = sendFuture.submitAndWait();
		if (unlikely(bytes_send < 0)) {
			return {bytes_send, "send failed"};
		}

		// The receive waits for `size` bytes (MSG_WAITALL). After a short send the
		// server has fewer bytes to echo, so report it like linkedEcho() does
		// instead of waiting for data that was never sent.
		if (unlikely(static_cast<size_t>(bytes_send) < size)) {
			return {0, EMPER_BUILD_STR("short send: " << bytes_send)};
		}

		int32_t bytes_recv = recvFuture.submitAndWait();
		if (unlikely(bytes_recv < 0)) {
			return {bytes_recv, "recv failed"};
		}

		if (unlikely(bytes_recv == 0)) {
			return {0, "server closed the connection"};
		}

		return {bytes_recv, ""};
	}

	/**
	 * Perform one echo iteration where the receive depends on the send.
	 *
	 * Only the receive future is submitted and awaited; a result of -ECANCELED
	 * means the send it depends on did not complete as expected.
	 *
	 * @return the number of received bytes, or a non-positive value and a message on failure
	 */
	static auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
		recvFuture.setDependency(sendFuture);
		int32_t res = recvFuture.submitAndWait();

		if (unlikely(res <= 0)) {
			// recv failed
			if (res != -ECANCELED) {
				if (res == 0) {
					return {0, "server closed the connection"};
				}

				return {res, "recv failed"};
			}

			// send failed
			int32_t send_res = sendFuture.wait();
			if (send_res < 0) {
				return {send_res, "send failed"};
			}

			// send must have sent less than expected because recvFuture returned -ECANCELED
			return {0, EMPER_BUILD_STR("short send: " << send_res)};
		}

		return {res, ""};
	}

	/**
	 * Create a TCP_NODELAY socket and connect it to the global server address.
	 *
	 * The first successful connect of a client starts the next client; every
	 * later call is counted as a reconnect. Any failure terminates the process.
	 */
	void connect() {
		sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
		if (sock < 0) {
			DIE_MSG_ERRNO("creating new client socket failed");
		}

		int enable = 1;
		if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
			DIE_MSG_ERRNO("setsockopt failed");
		}

		int err = emper::io::connectAndWait(sock, server->ai_addr, server->ai_addrlen);
		if (err) {
			DIE_MSG_ERRNO("connecting new client socket failed");
		}

		if (!initialized) {
			// We are connected -> start next client
			Client::startNew(readySem, startSem, cps);
			initialized = true;
		} else {
			reconnects++;
		}
	}

	// Append `len` bytes of `buf` to `out` as two lowercase hex digits per byte.
	static void appendHex(std::stringstream& out, const char* buf, size_t len) {
		for (size_t i = 0; i < len; ++i) {
			out << std::setfill('0') << std::setw(2) << std::hex
					<< (0xff & static_cast<unsigned>(buf[i]));
		}
	}

 public:
	/**
	 * Connect, wait for the common start signal and run the echo loop.
	 *
	 * The loop ends after `iterations` successful iterations or when `terminate`
	 * is set. A connection reset triggers a reconnect and a retry of the same
	 * iteration; every other failure terminates the process. The buffers are
	 * released here, so run() must be called at most once per client.
	 */
	void run() {
		connect();

		// signal that this client is initialized and connected
		readySem.release();

		// wait for all clients to be ready
		startSem.acquire();

		while (iteration < iterations && !terminate.load(std::memory_order_relaxed)) {
			SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
			RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);

			// prepare output buf
			// Bounded by the buffer size: the payload is truncated rather than
			// written past the end of outBuf when `size` is small.
			std::snprintf(outBuf, size, "%zu:%zu", id, iteration);

			IterationResult iterationResult;
			auto start = high_resolution_clock::now();
			if (linked_futures) {
				iterationResult = linkedEcho(sendFuture, recvFuture);
			} else {
				iterationResult = echo(sendFuture, recvFuture);
			}
			auto end = high_resolution_clock::now();

			// less than 1 byte was received -> an error occurred
			// (connection closed by server is considered an error)
			if (unlikely(iterationResult.first <= 0)) {
				int err = -iterationResult.first;
				auto& msg = iterationResult.second;
				if (err == ECONNRESET) {
					LOGW("Client " << id << " reconnecting");
					close(sock);
					// reconnect and try again
					connect();
					continue;
				}

				LOGE("Client failed at iteration " << iteration << " with " << msg << ": "
																					 << strerror(err));
				std::this_thread::sleep_for(ONERROR_SLEEP);
				DIE;
			}

			nanoseconds duration = duration_cast<nanoseconds>(end - start);
			avg_ns.update(duration.count());

			if (memcmp(outBuf, inBuf, size) != 0) {
				std::stringstream sst;
				sst << "got unexpected echo from server" << std::endl;
				sst << "expected: ";
				appendHex(sst, outBuf, size);
				sst << std::endl;

				sst << "received: ";
				appendHex(sst, inBuf, size);
				sst << std::endl;
				LOGE(sst.str());
				unexpectedEchos++;
			}

			iteration++;
		}

		shutdown(sock, SHUT_RDWR);

		// ShutdownFuture shut_f(sock, SHUT_RDWR);
		CloseFuture cf(sock);
		// cf.setDependency(shut_f);
		cf.submit();

		// Release the buffers while the close is in flight.
		delete[] inBuf;
		delete[] outBuf;
		inBuf = nullptr;
		outBuf = nullptr;

		cf.wait();
	}
};

// All clients indexed by their id; allocated by the alpha fiber in main().
Client** clients;
std::atomic<size_t> Client::client_ids = 0;

auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool {
	size_t next_client_id = client_ids.fetch_add(1);
	if (next_client_id >= nclients) {
		return false;
	}

	spawn(
			[&, id = next_client_id] {
				clients[id] = new Client(id, readySem, startSem, cps);
				clients[id]->run();
			},
			cps);
	return true;
}

/**
 * Look up the value of a command line option.
 *
 * @return the argument following the first occurrence of `option`, or nullptr
 *         if the option is absent or is the last argument
 */
auto getOption(int argc, char** argv, const std::string& option) -> char* {
	char** end = argv + argc;
	char** itr = std::find(argv, end, option);

	if (itr != end && ++itr != end) {
		return *itr;
	}

	return nullptr;
}

/** Check whether `option` appears anywhere on the command line. */
static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
	char** end = argv + argc;
	return std::find(argv, end, option) != end;
}

/** Print the command line synopsis to stderr. */
static void printUsage(char* name) {
	std::cerr << "Usage: " << name
						<< " [-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
							 " [-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]"
						<< std::endl;
}

/**
 * Parse the value of a numeric option as a non-negative decimal number.
 *
 * Exits the process with a message if the value is not a number, has trailing
 * characters, is negative or out of range.
 */
static auto parseSizeOption(const char* value, const std::string& option) -> size_t {
	errno = 0;
	char* end = nullptr;
	const long long parsed = strtoll(value, &end, DECIMAL);
	if (end == value || *end != '\0' || errno == ERANGE || parsed < 0) {
		std::cerr << "invalid value for " << option << ": " << value << std::endl;
		exit(EXIT_FAILURE);
	}
	return static_cast<size_t>(parsed);
}

/**
 * Parse the command line, resolve the server address and run the benchmark
 * inside the runtime. The alpha fiber exits the process with EXIT_SUCCESS
 * after the results were written, so returning from main is a failure.
 */
auto main(int argc, char* argv[]) -> int {
	if (existsOption(argc, argv, "-h")) {
		printUsage(argv[0]);
		exit(EXIT_FAILURE);
	}

	char* port_s = getOption(argc, argv, "-p");
	if (port_s) {
		port = std::string(port_s);
	}

	char* addr_s = getOption(argc, argv, "-a");
	if (addr_s) {
		host = std::string(addr_s);
	}

	char* client_s = getOption(argc, argv, "-c");
	if (client_s) {
		nclients = parseSizeOption(client_s, "-c");
	}

	char* iterations_s = getOption(argc, argv, "-i");
	if (iterations_s) {
		iterations = parseSizeOption(iterations_s, "-i");
	}

	char* time_s = getOption(argc, argv, "-t");
	if (time_s) {
		if (iterations_s) {
			std::cerr << "-t and -i are mutual exclusive" << std::endl;
			printUsage(argv[0]);
			exit(EXIT_FAILURE);
		}

		execution_seconds = parseSizeOption(time_s, "-t");
	}

	char* size_s = getOption(argc, argv, "-s");
	if (size_s) {
		size = parseSizeOption(size_s, "-s");
	}

	char* server_backlog_s = getOption(argc, argv, "-b");
	if (server_backlog_s) {
		server_backlog = parseSizeOption(server_backlog_s, "-b");
	}

	// --linked-futures is a flag without a value; getOption() would only report
	// it when another argument follows it.
	linked_futures = existsOption(argc, argv, "--linked-futures");

	int out_fd = STDOUT_FILENO;

	char* output_file = getOption(argc, argv, "-f");

	int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
	if (err) {
		if (err == EAI_SYSTEM) {
			DIE_MSG_ERRNO("getaddrinfo failed");
		} else {
			LOGE("error in getaddrinfo: " << gai_strerror(err));
			exit(EXIT_FAILURE);
		}
	}

	Runtime runtime;

	Fiber* alphaFiber = Fiber::from([&] {
		clients = new Client*[nclients];
		CPS cps;
		Semaphore readySemaphore;
		Semaphore startSemaphore;
		std::thread terminator;

		auto connect_start = high_resolution_clock::now();

		// start first client batch
		for (size_t i = 0; i < server_backlog; ++i) {
			// Start new clients until we have started all or the amount
			// of parallel connecting clients is reached
			if (!Client::startNew(readySemaphore, startSemaphore, cps)) {
				break;
			}
		}

		// await the clients
		for (size_t i = 0; i < nclients; ++i) {
			readySemaphore.acquire();
		}

		if (execution_seconds) {
			// Time-limited mode: lift the iteration limit and stop via the flag.
			iterations = SIZE_MAX;
			terminator = std::thread([] {
				std::this_thread::sleep_for(seconds(execution_seconds));
				terminate.store(true);
			});
		}

		auto echo_start = high_resolution_clock::now();

		// start the clients
		for (size_t i = 0; i < nclients; ++i) {
			startSemaphore.release();
		}

		// await client termination
		cps.wait();

		if (execution_seconds) {
			terminator.join();
		}

		auto echo_end = high_resolution_clock::now();

		auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();
		auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
		auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();

		if (output_file) {
			// Refuse to overwrite results: test for existence, not for writability.
			if (access(output_file, F_OK) == 0) {
				DIE_MSG("Output file: " << output_file << " already exists");
			}

			int openflags = O_CREAT | O_WRONLY;
			int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;

			out_fd = emper::io::openAndWait(output_file, openflags, mode);
			if (out_fd < 0) {
				DIE_MSG_ERRNO("opening output file failed");
			}
		}

		std::stringstream sst;
		sst << "[global]" << std::endl;
		sst << "clients = " << nclients << std::endl;
		sst << "size = " << size << std::endl;
		sst << "connect_duration = " << connect_duration << std::endl;
		sst << "echo_duration = " << echo_duration << std::endl;
		sst << "total_duration = " << total_duration << std::endl;
		sst << std::endl;

		sst << "[clients]" << std::endl;
		sst << "csv =" << std::endl;
		sst << " iterations,avg_ns,reconnects,unexpected_echos" << std::endl;
		for (size_t i = 0; i < nclients; ++i) {
			auto* client = clients[i];
			sst << " " << client->iteration << ",";
			sst << client->avg_ns.getAverage() << ",";
			sst << client->reconnects << ",";
			sst << client->unexpectedEchos << std::endl;
		}

		auto output = sst.str();
		if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
			DIE_MSG_ERRNO("writing results failed");
		}

		if (output_file) {
			emper::io::closeAndForget(out_fd);
		}

		for (size_t i = 0; i < nclients; ++i) {
			delete clients[i];
		}
		delete[] clients;

		exit(EXIT_SUCCESS);
	});

	runtime.scheduleFromAnywhere(*alphaFiber);

	runtime.waitUntilFinished();

	return EXIT_FAILURE;
}