Skip to content
Snippets Groups Projects
Commit 83f52451 authored by Florian Fischer's avatar Florian Fischer
Browse files

[EchoClient] make client server backlog aware

Start client Fibers in batches to make sure we don't exceed the server's
TCP listen backlog.
parent 0698f1c8
No related branches found
No related tags found
1 merge request!82Improve networking code
...@@ -4,13 +4,14 @@ ...@@ -4,13 +4,14 @@
#include <sys/socket.h> // for shutdown, socket, AF_INET #include <sys/socket.h> // for shutdown, socket, AF_INET
#include <algorithm> // for find #include <algorithm> // for find
#include <cerrno> // for errno, ECANCELED #include <atomic>
#include <chrono> // for nanoseconds, duration, durat... #include <cerrno> // for errno, ECANCELED
#include <cstdint> // for uint64_t, int32_t #include <chrono> // for nanoseconds, duration, durat...
#include <cstdlib> // for size_t, strtol, exit, EXIT_F... #include <cstdint> // for uint64_t, int32_t
#include <cstring> // for memcmp #include <cstdlib> // for size_t, strtol, exit, EXIT_F...
#include <iostream> // for operator<<, basic_ostream, endl #include <cstring> // for memcmp
#include <string> // for allocator, string, char_traits #include <iostream> // for operator<<, basic_ostream, endl
#include <string> // for allocator, string, char_traits
#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli... #include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli...
#include "CountingPrivateSemaphore.hpp" // for CPS #include "CountingPrivateSemaphore.hpp" // for CPS
...@@ -34,19 +35,38 @@ const std::string PORT = "12345"; ...@@ -34,19 +35,38 @@ const std::string PORT = "12345";
const size_t ITERATIONS = 10000; const size_t ITERATIONS = 10000;
const size_t CLIENTS = 10000; const size_t CLIENTS = 10000;
const size_t SIZE = 32; const size_t SIZE = 32;
const size_t SERVER_BACKLOG = 1024;
static auto clientFunc(struct addrinfo& server, size_t iterations, size_t size, Semaphore& readySem, // Globals
Semaphore& startSem) -> uint64_t { std::string host = HOST;
int client_sock = socket(server.ai_family, server.ai_socktype, server.ai_protocol); std::string port = PORT;
size_t iterations = ITERATIONS;
size_t clients = CLIENTS;
size_t size = SIZE;
size_t server_backlog = SERVER_BACKLOG;
std::atomic<size_t> client_ids = 0;
uint64_t* client_avgs;
struct addrinfo* server;
static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& startSem, CPS& cps) {
int client_sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
if (client_sock < 0) { if (client_sock < 0) {
DIE_MSG_ERRNO("creating new client socket failed"); DIE_MSG_ERRNO("creating new client socket failed");
} }
int err = emper::io::connectAndWait(client_sock, server.ai_addr, server.ai_addrlen); int err = emper::io::connectAndWait(client_sock, server->ai_addr, server->ai_addrlen);
if (err) { if (err) {
DIE_MSG_ERRNO("connecting new client socket failed"); DIE_MSG_ERRNO("connecting new client socket failed");
} }
// We are connected -> start next client
size_t next_client_id = client_ids.fetch_add(1);
if (next_client_id < clients) {
spawn([&, next_client_id] { clientFunc(next_client_id, readySem, startSem, cps); }, cps);
}
// NOLINTNEXTLINE(modernize-avoid-c-arrays) // NOLINTNEXTLINE(modernize-avoid-c-arrays)
char* buf[2]; char* buf[2];
buf[0] = new char[size]; buf[0] = new char[size];
...@@ -116,7 +136,7 @@ static auto clientFunc(struct addrinfo& server, size_t iterations, size_t size, ...@@ -116,7 +136,7 @@ static auto clientFunc(struct addrinfo& server, size_t iterations, size_t size,
cf.wait(); cf.wait();
return avg_ns; client_avgs[client_id] = avg_ns;
} }
auto getOption(int argc, char** argv, const std::string& option) -> char* { auto getOption(int argc, char** argv, const std::string& option) -> char* {
...@@ -137,17 +157,12 @@ static auto existsOption(int argc, char** argv, const std::string& option) -> bo ...@@ -137,17 +157,12 @@ static auto existsOption(int argc, char** argv, const std::string& option) -> bo
static void printUsage(char* name) { static void printUsage(char* name) {
std::cerr << "Usage: " << name std::cerr << "Usage: " << name
<< "[-h] [-p <port>] [-c <clients>] [-i <iterations>] [-a <address>] [-s <size>]" << "[-h] [-p <port>] [-c <clients>] [-i <iterations>] [-a <address>] [-s <size>] [-b "
"<server backlog>]"
<< std::endl; << std::endl;
} }
auto main(int argc, char* argv[]) -> int { auto main(int argc, char* argv[]) -> int {
std::string host = HOST;
std::string port = PORT;
size_t iterations = ITERATIONS;
size_t clients = CLIENTS;
size_t size = SIZE;
if (existsOption(argc, argv, "-h")) { if (existsOption(argc, argv, "-h")) {
printUsage(argv[0]); printUsage(argv[0]);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
...@@ -178,7 +193,11 @@ auto main(int argc, char* argv[]) -> int { ...@@ -178,7 +193,11 @@ auto main(int argc, char* argv[]) -> int {
size = strtol(size_s, nullptr, DECIMAL); size = strtol(size_s, nullptr, DECIMAL);
} }
struct addrinfo* server; char* server_backlog_s = getOption(argc, argv, "-b");
if (server_backlog_s) {
server_backlog = strtol(server_backlog_s, nullptr, DECIMAL);
}
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server); int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
if (err) { if (err) {
if (err == EAI_SYSTEM) { if (err == EAI_SYSTEM) {
...@@ -192,17 +211,19 @@ auto main(int argc, char* argv[]) -> int { ...@@ -192,17 +211,19 @@ auto main(int argc, char* argv[]) -> int {
Runtime runtime; Runtime runtime;
Fiber* alphaFiber = Fiber::from([&] { Fiber* alphaFiber = Fiber::from([&] {
auto* client_avgs = new uint64_t[clients]; client_avgs = new uint64_t[clients];
CPS cps; CPS cps;
Semaphore readySemaphore; Semaphore readySemaphore;
Semaphore startSemaphore; Semaphore startSemaphore;
// establish client connections // start first client batch
for (size_t i = 0; i < clients; ++i) { for (size_t i = 0; i < server_backlog; ++i) {
size_t next_client_id = client_ids.fetch_add(1);
if (next_client_id >= clients) {
break;
}
spawn( spawn(
[&, i] { [&, next_client_id] { clientFunc(next_client_id, readySemaphore, startSemaphore, cps); },
client_avgs[i] = clientFunc(*server, iterations, size, readySemaphore, startSemaphore);
},
cps); cps);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment