Newer
Older
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <netdb.h> //for getaddrinfo
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h> // for shutdown, socket, AF_INET
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm> // for find
#include <array>
#include <atomic>
#include <cerrno> // for errno, ECANCELED
#include <chrono> // for nanoseconds, duration, durat...
#include <cstdint> // for uint64_t, int32_t
#include <cstdlib> // for size_t, strtol, exit, EXIT_F...
#include <cstring> // for memcmp
#include <iostream> // for operator<<, basic_ostream, endl
#include <string> // for allocator, string, char_traits
#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli...
#include "CountingPrivateSemaphore.hpp" // for CPS
#include "Debug.hpp" // for LOGE
#include "Fiber.hpp" // for Fiber
#include "Runtime.hpp" // for Runtime
#include "Semaphore.hpp" // for Semaphore
#include "emper.hpp" // for spawn
#include "io.hpp" // for connectAndWait
#include "io/Future.hpp" // for CloseFuture, RecvFuture, Sen...
using emper::Semaphore;
using emper::io::CloseFuture;
using emper::io::RecvFuture;
using emper::io::SendFuture;
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::nanoseconds;
// Defaults
const int DECIMAL = 10;
const std::string HOST = "0.0.0.0";
const std::string PORT = "12345";
const size_t ITERATIONS = 10000;
const size_t CLIENTS = 10000;
const size_t SIZE = 32;
const size_t SERVER_BACKLOG = 1024;
// Globals
std::string host = HOST;
std::string port = PORT;
size_t iterations = ITERATIONS;
size_t execution_seconds = 0;
size_t clients = CLIENTS;
size_t size = SIZE;
size_t server_backlog = SERVER_BACKLOG;
std::atomic<bool> terminate = false;
std::atomic<size_t> client_ids = 0;
struct ClientResult {
size_t iterations;
size_t avg_ns;
};
using ClientResult = struct ClientResult;
struct ClientResult* clientResults;
struct addrinfo* server;
static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& startSem, CPS& cps) {
int client_sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
if (client_sock < 0) {
DIE_MSG_ERRNO("creating new client socket failed");
}
int err = emper::io::connectAndWait(client_sock, server->ai_addr, server->ai_addrlen);
if (err) {
DIE_MSG_ERRNO("connecting new client socket failed");
}
// We are connected -> start next client
size_t next_client_id = client_ids.fetch_add(1);
if (next_client_id < clients) {
spawn([&, next_client_id] { clientFunc(next_client_id, readySem, startSem, cps); }, cps);
}
int enable = 1;
if (setsockopt(client_sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
DIE_MSG_ERRNO("setsockopt failed");
}
std::array<char*, 2> buf;
buf[0] = new char[size];
buf[1] = new char[size];
uint64_t avg_ns = 0;
readySem.release();
startSem.acquire();
size_t i = 0;
while (i < iterations && !terminate.load(std::memory_order_relaxed)) {
char* outBuf = buf[i % 2];
char* inBuf = buf[(i + 1) % 2];
SendFuture sendFuture(client_sock, outBuf, size, MSG_NOSIGNAL);
RecvFuture recvFuture(client_sock, inBuf, size, MSG_WAITALL);
recvFuture.setDependency(sendFuture);
auto start = high_resolution_clock::now();
int32_t res = recvFuture.submitAndWait();
auto end = high_resolution_clock::now();
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
if (res <= 0) {
// recv failed
if (res != -ECANCELED) {
if (res == 0) {
DIE_MSG("server closed the connection");
}
errno = -res;
DIE_MSG_ERRNO("receive failed");
}
// send failed
int32_t send_res = sendFuture.wait();
if (send_res < 0) {
errno = -send_res;
DIE_MSG_ERRNO("send failed");
}
DIE_MSG("short send" << send_res);
}
if (memcmp(outBuf, inBuf, size) != 0) {
DIE_MSG("got unexpected echo from server");
}
auto duration = duration_cast<nanoseconds>(end - start);
uint64_t ns = duration.count();
if (unlikely(avg_ns == 0)) {
avg_ns = ns;
} else {
avg_ns += ns;
avg_ns /= 2;
}
}
// Shutdown and close the client socket;
shutdown(client_sock, SHUT_RDWR);
// ShutdownFuture shut_f(client_sock, SHUT_RDWR);
CloseFuture cf(client_sock);
// cf.setDependency(shut_f);
cf.submit();
delete[] buf[0];
delete[] buf[1];
cf.wait();
clientResults[client_id].avg_ns = avg_ns;
clientResults[client_id].iterations = i;
}
auto getOption(int argc, char** argv, const std::string& option) -> char* {
char** end = argv + argc;
char** itr = std::find(argv, end, option);
if (itr != end && ++itr != end) {
return *itr;
}
return nullptr;
}
static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
char** end = argv + argc;
return std::find(argv, end, option) != end;
}
static void printUsage(char* name) {
std::cerr << "Usage: " << name
<< "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
" [-f <output-file>] [-i <iterations> | -t <execution time in sec>]"
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
<< std::endl;
}
auto main(int argc, char* argv[]) -> int {
if (existsOption(argc, argv, "-h")) {
printUsage(argv[0]);
exit(EXIT_FAILURE);
}
char* port_s = getOption(argc, argv, "-p");
if (port_s) {
port = std::string(port_s);
}
char* addr_s = getOption(argc, argv, "-a");
if (addr_s) {
host = std::string(addr_s);
}
char* client_s = getOption(argc, argv, "-c");
if (client_s) {
clients = strtol(client_s, nullptr, DECIMAL);
}
char* iterations_s = getOption(argc, argv, "-i");
if (iterations_s) {
iterations = strtol(iterations_s, nullptr, DECIMAL);
}
char* time_s = getOption(argc, argv, "-t");
if (time_s) {
if (iterations_s) {
std::cerr << "-t and -i are mutual exclusive" << std::endl;
printUsage(argv[0]);
exit(EXIT_FAILURE);
}
execution_seconds = strtol(time_s, nullptr, DECIMAL);
}
char* size_s = getOption(argc, argv, "-s");
if (size_s) {
size = strtol(size_s, nullptr, DECIMAL);
}
char* server_backlog_s = getOption(argc, argv, "-b");
if (server_backlog_s) {
server_backlog = strtol(server_backlog_s, nullptr, DECIMAL);
}
int out_fd = STDOUT_FILENO;
char* output_file = getOption(argc, argv, "-f");
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
if (err) {
if (err == EAI_SYSTEM) {
DIE_MSG_ERRNO("getaddrinfo failed");
} else {
LOGE("error in getaddrinfo: " << gai_strerror(err));
exit(EXIT_FAILURE);
}
}
Runtime runtime;
Fiber* alphaFiber = Fiber::from([&] {
clientResults = new ClientResult[clients];
CPS cps;
Semaphore readySemaphore;
Semaphore startSemaphore;
auto connect_start = high_resolution_clock::now();
// start first client batch
for (size_t i = 0; i < server_backlog; ++i) {
size_t next_client_id = client_ids.fetch_add(1);
if (next_client_id >= clients) {
break;
}
spawn(
[&, next_client_id] { clientFunc(next_client_id, readySemaphore, startSemaphore, cps); },
cps);
}
// await the clients
for (size_t i = 0; i < clients; ++i) {
readySemaphore.acquire();
}
if (execution_seconds) {
iterations = SIZE_MAX;
terminator = std::thread([] {
std::this_thread::sleep_for(seconds(execution_seconds));
terminate.store(true);
auto echo_start = high_resolution_clock::now();
// start the clients
for (size_t i = 0; i < clients; ++i) {
startSemaphore.release();
}
// await the clients
cps.wait();
if (execution_seconds) {
terminator.join();
}
auto echo_end = high_resolution_clock::now();
uint64_t avg_ns = clientResults[0].avg_ns;
uint64_t avg_iterations = clientResults[0].iterations;
for (size_t i = 1; i < clients; ++i) {
avg_ns += clientResults[i].avg_ns;
avg_ns /= 2;
avg_iterations += clientResults[i].iterations;
avg_iterations /= 2;
}
auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();
auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();
bool exists = true;
if (output_file) {
exists = access(output_file, W_OK) == 0;
int openflags;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
if (!exists) {
openflags = O_CREAT | O_WRONLY;
} else {
openflags = O_APPEND;
}
out_fd = emper::io::openAndWait(output_file, openflags, mode);
if (out_fd < 0) {
DIE_MSG_ERRNO("opening output file failed");
}
}
if (!exists) {
sst << "clients,iterations,size,avg_ns,connect,echo,total" << std::endl;
}
sst << clients << "," << avg_iterations << "," << size << "," << avg_ns << ","
<< connect_duration << "," << echo_duration << "," << total_duration << std::endl;
auto output = sst.str();
if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
DIE_MSG_ERRNO("writing results failed");
}
if (output_file) {
emper::io::closeAndForget(out_fd);
}