// SPDX-License-Identifier: LGPL-3.0-or-later // Copyright © 2021 Florian Fischer #include <fcntl.h> #include <netdb.h> //for getaddrinfo #include <netinet/in.h> #include <netinet/tcp.h> #include <sys/socket.h> // for shutdown, socket, AF_INET #include <sys/stat.h> #include <unistd.h> #include <algorithm> // for find #include <array> #include <atomic> #include <cerrno> // for errno, ECANCELED #include <chrono> // for nanoseconds, duration, durat... #include <cstdint> // for uint64_t, int32_t #include <cstdlib> // for size_t, strtol, exit, EXIT_F... #include <cstring> // for memcmp #include <iostream> // for operator<<, basic_ostream, endl #include <string> // for allocator, string, char_traits #include <thread> #include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli... #include "CountingPrivateSemaphore.hpp" // for CPS #include "Debug.hpp" // for LOGE #include "Fiber.hpp" // for Fiber #include "Runtime.hpp" // for Runtime #include "Semaphore.hpp" // for Semaphore #include "emper.hpp" // for spawn #include "io.hpp" // for connectAndWait #include "io/Future.hpp" // for CloseFuture, RecvFuture, Sen... using emper::Semaphore; using emper::io::CloseFuture; using emper::io::RecvFuture; using emper::io::SendFuture; using std::chrono::duration_cast; using std::chrono::high_resolution_clock; using std::chrono::nanoseconds; using std::chrono::seconds; // Defaults const int DECIMAL = 10; const std::string HOST = "0.0.0.0"; const std::string PORT = "12345"; const size_t ITERATIONS = 10000; const size_t CLIENTS = 10000; const size_t SIZE = 32; const size_t SERVER_BACKLOG = 1024; // Globals std::string host = HOST; std::string port = PORT; size_t iterations = ITERATIONS; size_t execution_seconds = 0; size_t clients = CLIENTS; size_t size = SIZE; size_t server_backlog = SERVER_BACKLOG; std::atomic<bool> terminate = false; std::atomic<size_t> client_ids = 0; struct ClientResult { size_t iterations; size_t avg_ns; }; using ClientResult = struct ClientResult; struct ClientResult* clientResults; struct addrinfo* server; static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& startSem, CPS& cps) { int client_sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol); if (client_sock < 0) { DIE_MSG_ERRNO("creating new client socket failed"); } int err = emper::io::connectAndWait(client_sock, server->ai_addr, server->ai_addrlen); if (err) { DIE_MSG_ERRNO("connecting new client socket failed"); } // We are connected -> start next client size_t next_client_id = client_ids.fetch_add(1); if (next_client_id < clients) { spawn([&, next_client_id] { clientFunc(next_client_id, readySem, startSem, cps); }, cps); } int enable = 1; if (setsockopt(client_sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) { DIE_MSG_ERRNO("setsockopt failed"); } std::array<char*, 2> buf; buf[0] = new char[size]; buf[1] = new char[size]; uint64_t avg_ns = 0; readySem.release(); startSem.acquire(); size_t i = 0; while (i < iterations && !terminate.load(std::memory_order_relaxed)) { char* outBuf = buf[i % 2]; char* inBuf = buf[(i + 1) % 2]; SendFuture sendFuture(client_sock, outBuf, size, MSG_NOSIGNAL); RecvFuture recvFuture(client_sock, inBuf, size, MSG_WAITALL); recvFuture.setDependency(sendFuture); auto start = high_resolution_clock::now(); int32_t res = recvFuture.submitAndWait(); auto end = high_resolution_clock::now(); if (res <= 0) { // recv failed if (res != -ECANCELED) { if (res == 0) { DIE_MSG("server closed the connection"); } errno = -res; DIE_MSG_ERRNO("receive failed"); } // send failed int32_t send_res = sendFuture.wait(); if (send_res < 0) { errno = -send_res; DIE_MSG_ERRNO("send failed"); } DIE_MSG("short send" << send_res); } if (memcmp(outBuf, inBuf, size) != 0) { DIE_MSG("got unexpected echo from server"); } auto duration = duration_cast<nanoseconds>(end - start); uint64_t ns = duration.count(); if (unlikely(avg_ns == 0)) { avg_ns = ns; } else { avg_ns += ns; avg_ns /= 2; } i++; } // Shutdown and close the client socket; shutdown(client_sock, SHUT_RDWR); // ShutdownFuture shut_f(client_sock, SHUT_RDWR); CloseFuture cf(client_sock); // cf.setDependency(shut_f); cf.submit(); delete[] buf[0]; delete[] buf[1]; cf.wait(); clientResults[client_id].avg_ns = avg_ns; clientResults[client_id].iterations = i; } auto getOption(int argc, char** argv, const std::string& option) -> char* { char** end = argv + argc; char** itr = std::find(argv, end, option); if (itr != end && ++itr != end) { return *itr; } return nullptr; } static auto existsOption(int argc, char** argv, const std::string& option) -> bool { char** end = argv + argc; return std::find(argv, end, option) != end; } static void printUsage(char* name) { std::cerr << "Usage: " << name << "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]" " [-f <output-file>] [-i <iterations> | -t <execution time in sec>]" << std::endl; } auto main(int argc, char* argv[]) -> int { if (existsOption(argc, argv, "-h")) { printUsage(argv[0]); exit(EXIT_FAILURE); } char* port_s = getOption(argc, argv, "-p"); if (port_s) { port = std::string(port_s); } char* addr_s = getOption(argc, argv, "-a"); if (addr_s) { host = std::string(addr_s); } char* client_s = getOption(argc, argv, "-c"); if (client_s) { clients = strtol(client_s, nullptr, DECIMAL); } char* iterations_s = getOption(argc, argv, "-i"); if (iterations_s) { iterations = strtol(iterations_s, nullptr, DECIMAL); } char* time_s = getOption(argc, argv, "-t"); if (time_s) { if (iterations_s) { std::cerr << "-t and -i are mutual exclusive" << std::endl; printUsage(argv[0]); exit(EXIT_FAILURE); } execution_seconds = strtol(time_s, nullptr, DECIMAL); } char* size_s = getOption(argc, argv, "-s"); if (size_s) { size = strtol(size_s, nullptr, DECIMAL); } char* server_backlog_s = getOption(argc, argv, "-b"); if (server_backlog_s) { server_backlog = strtol(server_backlog_s, nullptr, DECIMAL); } int out_fd = STDOUT_FILENO; char* output_file = getOption(argc, argv, "-f"); int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server); if (err) { if (err == EAI_SYSTEM) { DIE_MSG_ERRNO("getaddrinfo failed"); } else { LOGE("error in getaddrinfo: " << gai_strerror(err)); exit(EXIT_FAILURE); } } Runtime runtime; Fiber* alphaFiber = Fiber::from([&] { clientResults = new ClientResult[clients]; CPS cps; Semaphore readySemaphore; Semaphore startSemaphore; std::thread terminator; auto connect_start = high_resolution_clock::now(); // start first client batch for (size_t i = 0; i < server_backlog; ++i) { size_t next_client_id = client_ids.fetch_add(1); if (next_client_id >= clients) { break; } spawn( [&, next_client_id] { clientFunc(next_client_id, readySemaphore, startSemaphore, cps); }, cps); } // await the clients for (size_t i = 0; i < clients; ++i) { readySemaphore.acquire(); } if (execution_seconds) { iterations = SIZE_MAX; terminator = std::thread([] { std::this_thread::sleep_for(seconds(execution_seconds)); terminate.store(true); }); } auto echo_start = high_resolution_clock::now(); // start the clients for (size_t i = 0; i < clients; ++i) { startSemaphore.release(); } // await the clients cps.wait(); if (execution_seconds) { terminator.join(); } auto echo_end = high_resolution_clock::now(); uint64_t avg_ns = clientResults[0].avg_ns; uint64_t avg_iterations = clientResults[0].iterations; for (size_t i = 1; i < clients; ++i) { avg_ns += clientResults[i].avg_ns; avg_ns /= 2; avg_iterations += clientResults[i].iterations; avg_iterations /= 2; } auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count(); auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count(); auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count(); std::stringstream sst; bool exists = true; if (output_file) { exists = access(output_file, W_OK) == 0; int openflags; int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH; if (!exists) { openflags = O_CREAT | O_WRONLY; } else { openflags = O_APPEND; } out_fd = emper::io::openAndWait(output_file, openflags, mode); if (out_fd < 0) { DIE_MSG_ERRNO("opening output file failed"); } } if (!exists) { sst << "clients,iterations,size,avg_ns,connect,echo,total" << std::endl; } sst << clients << "," << avg_iterations << "," << size << "," << avg_ns << "," << connect_duration << "," << echo_duration << "," << total_duration << std::endl; auto output = sst.str(); if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) { DIE_MSG_ERRNO("writing results failed"); } if (output_file) { emper::io::closeAndForget(out_fd); } delete[] clientResults; exit(EXIT_SUCCESS); }); runtime.scheduleFromAnywhere(*alphaFiber); runtime.waitUntilFinished(); return EXIT_FAILURE; }