// SPDX-License-Identifier: LGPL-3.0-or-later // Copyright © 2021 Florian Fischer #include <fcntl.h> #include <netdb.h> //for getaddrinfo #include <sys/socket.h> // for shutdown, socket, AF_INET #include <sys/stat.h> #include <unistd.h> #include <algorithm> // for find #include <atomic> #include <cerrno> // for errno, ECANCELED #include <chrono> // for nanoseconds, duration, durat... #include <cstdint> // for uint64_t, int32_t #include <cstdlib> // for size_t, strtol, exit, EXIT_F... #include <cstring> // for memcmp #include <iostream> // for operator<<, basic_ostream, endl #include <string> // for allocator, string, char_traits #include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli... #include "CountingPrivateSemaphore.hpp" // for CPS #include "Debug.hpp" // for LOGE #include "Fiber.hpp" // for Fiber #include "Runtime.hpp" // for Runtime #include "Semaphore.hpp" // for Semaphore #include "emper.hpp" // for spawn #include "io.hpp" // for connectAndWait #include "io/Future.hpp" // for CloseFuture, RecvFuture, Sen... using emper::Semaphore; using emper::io::CloseFuture; using emper::io::RecvFuture; using emper::io::SendFuture; // Defaults const int DECIMAL = 10; const std::string HOST = "0.0.0.0"; const std::string PORT = "12345"; const size_t ITERATIONS = 10000; const size_t CLIENTS = 10000; const size_t SIZE = 32; const size_t SERVER_BACKLOG = 1024; // Globals std::string host = HOST; std::string port = PORT; size_t iterations = ITERATIONS; size_t clients = CLIENTS; size_t size = SIZE; size_t server_backlog = SERVER_BACKLOG; std::atomic<size_t> client_ids = 0; uint64_t* client_avgs; struct addrinfo* server; static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& startSem, CPS& cps) { int client_sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol); if (client_sock < 0) { DIE_MSG_ERRNO("creating new client socket failed"); } int err = emper::io::connectAndWait(client_sock, server->ai_addr, server->ai_addrlen); if (err) { DIE_MSG_ERRNO("connecting new client socket failed"); } // We are connected -> start next client size_t next_client_id = client_ids.fetch_add(1); if (next_client_id < clients) { spawn([&, next_client_id] { clientFunc(next_client_id, readySem, startSem, cps); }, cps); } // NOLINTNEXTLINE(modernize-avoid-c-arrays) char* buf[2]; buf[0] = new char[size]; buf[1] = new char[size]; uint64_t avg_ns = 0; readySem.release(); startSem.acquire(); for (size_t i = 0; i < iterations; ++i) { char* outBuf = buf[i % 2]; char* inBuf = buf[(i + 1) % 2]; SendFuture sendFuture(client_sock, outBuf, size, MSG_NOSIGNAL); RecvFuture recvFuture(client_sock, inBuf, size, MSG_WAITALL); recvFuture.setDependency(sendFuture); auto start = std::chrono::high_resolution_clock::now(); int32_t res = recvFuture.submitAndWait(); auto end = std::chrono::high_resolution_clock::now(); if (res <= 0) { // recv failed if (res != -ECANCELED) { if (res == 0) { DIE_MSG("server closed the connection"); } errno = -res; DIE_MSG_ERRNO("receive failed"); } // send failed int32_t send_res = sendFuture.wait(); if (send_res < 0) { errno = -send_res; DIE_MSG_ERRNO("send failed"); } DIE_MSG("short send" << send_res); } if (memcmp(outBuf, inBuf, size) != 0) { DIE_MSG("got unexpected echo from server"); } auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start); uint64_t ns = duration.count(); if (unlikely(avg_ns == 0)) { avg_ns = ns; } else { avg_ns += ns; avg_ns /= 2; } } // Shutdown and close the client socket; shutdown(client_sock, SHUT_RDWR); // ShutdownFuture shut_f(client_sock, SHUT_RDWR); CloseFuture cf(client_sock); // cf.setDependency(shut_f); cf.submit(); delete[] buf[0]; delete[] buf[1]; cf.wait(); client_avgs[client_id] = avg_ns; } auto getOption(int argc, char** argv, const std::string& option) -> char* { char** end = argv + argc; char** itr = std::find(argv, end, option); if (itr != end && ++itr != end) { return *itr; } return nullptr; } static auto existsOption(int argc, char** argv, const std::string& option) -> bool { char** end = argv + argc; return std::find(argv, end, option) != end; } static void printUsage(char* name) { std::cerr << "Usage: " << name << "[-h] [-p <port>] [-c <clients>] [-i <iterations>] [-a <address>] [-s <size>] [-b " "<server backlog>] [-f <output-file>]" << std::endl; } auto main(int argc, char* argv[]) -> int { if (existsOption(argc, argv, "-h")) { printUsage(argv[0]); exit(EXIT_FAILURE); } char* port_s = getOption(argc, argv, "-p"); if (port_s) { port = std::string(port_s); } char* addr_s = getOption(argc, argv, "-a"); if (addr_s) { host = std::string(addr_s); } char* client_s = getOption(argc, argv, "-c"); if (client_s) { clients = strtol(client_s, nullptr, DECIMAL); } char* iterations_s = getOption(argc, argv, "-i"); if (iterations_s) { iterations = strtol(iterations_s, nullptr, DECIMAL); } char* size_s = getOption(argc, argv, "-s"); if (size_s) { size = strtol(size_s, nullptr, DECIMAL); } char* server_backlog_s = getOption(argc, argv, "-b"); if (server_backlog_s) { server_backlog = strtol(server_backlog_s, nullptr, DECIMAL); } int out_fd = STDOUT_FILENO; char* output_file = getOption(argc, argv, "-f"); if (output_file) { out_fd = open(output_file, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); if (out_fd < 0) { DIE_MSG_ERRNO("opening output file failed"); } } int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server); if (err) { if (err == EAI_SYSTEM) { DIE_MSG_ERRNO("getaddrinfo failed"); } else { LOGE("error in getaddrinfo: " << gai_strerror(err)); exit(EXIT_FAILURE); } } Runtime runtime; Fiber* alphaFiber = Fiber::from([&] { client_avgs = new uint64_t[clients]; CPS cps; Semaphore readySemaphore; Semaphore startSemaphore; // start first client batch for (size_t i = 0; i < server_backlog; ++i) { size_t next_client_id = client_ids.fetch_add(1); if (next_client_id >= clients) { break; } spawn( [&, next_client_id] { clientFunc(next_client_id, readySemaphore, startSemaphore, cps); }, cps); } // await the clients for (size_t i = 0; i < clients; ++i) { readySemaphore.acquire(); } // start the clients for (size_t i = 0; i < clients; ++i) { startSemaphore.release(); } // await the clients cps.wait(); uint64_t avg_ns = client_avgs[0]; for (size_t i = 1; i < clients; ++i) { avg_ns += client_avgs[i]; avg_ns /= 2; } std::stringstream sst; sst << "clients,iterations,size,time" << std::endl; sst << clients << "," << iterations << "," << size << "," << avg_ns << std::endl; auto output = sst.str(); if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) { DIE_MSG_ERRNO("writing results failed"); } if (output_file) { emper::io::closeAndForget(out_fd); } delete[] client_avgs; exit(EXIT_SUCCESS); }); runtime.scheduleFromAnywhere(*alphaFiber); runtime.waitUntilFinished(); return EXIT_FAILURE; }