From 07782b4f2ae2d00ab858b2a050175977219c8e27 Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fl.fischer@fau.de>
Date: Thu, 21 Jan 2021 18:29:02 +0100
Subject: [PATCH] [IO] add an echo client implementation suitable for > C10k
 experiments

The echo client established X connections and start the
echo phase after all sockets are connected.

Each client Fiber measures the time from sending the message until
receiving the echo.
---
 apps/EchoClient.cpp | 240 ++++++++++++++++++++++++++++++++++++++++++++
 apps/meson.build    |   6 ++
 2 files changed, 246 insertions(+)
 create mode 100644 apps/EchoClient.cpp

diff --git a/apps/EchoClient.cpp b/apps/EchoClient.cpp
new file mode 100644
index 00000000..3240643f
--- /dev/null
+++ b/apps/EchoClient.cpp
@@ -0,0 +1,240 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2021 Florian Fischer
+#include <netdb.h>			 //for getaddrinfo
+#include <sys/socket.h>	 // for shutdown, socket, AF_INET
+
+#include <algorithm>	// for find
+#include <cerrno>			// for errno, ECANCELED
+#include <chrono>			// for nanoseconds, duration, durat...
+#include <cstdint>		// for uint64_t, int32_t
+#include <cstdlib>		// for size_t, strtol, exit, EXIT_F...
+#include <cstring>		// for memcmp
+#include <iostream>		// for operator<<, basic_ostream, endl
+#include <string>			// for allocator, string, char_traits
+
+#include "Common.hpp"										 // for DIE_MSG_ERRNO, DIE_MSG, unli...
+#include "CountingPrivateSemaphore.hpp"	 // for CPS
+#include "Debug.hpp"										 // for LOGE
+#include "Fiber.hpp"										 // for Fiber
+#include "Runtime.hpp"									 // for Runtime
+#include "Semaphore.hpp"								 // for Semaphore
+#include "emper.hpp"										 // for spawn
+#include "io.hpp"												 // for connectAndWait
+#include "io/Future.hpp"								 // for CloseFuture, RecvFuture, Sen...
+
+using emper::Semaphore;
+using emper::io::CloseFuture;
+using emper::io::RecvFuture;
+using emper::io::SendFuture;
+
+// Defaults
+const int DECIMAL = 10;
+const std::string HOST = "0.0.0.0";
+const std::string PORT = "12345";
+const size_t ITERATIONS = 10000;
+const size_t CLIENTS = 10000;
+const size_t SIZE = 32;
+
+static auto clientFunc(struct addrinfo& server, size_t iterations, size_t size, Semaphore& readySem,
+											 Semaphore& startSem) -> uint64_t {
+	int client_sock = socket(server.ai_family, server.ai_socktype, server.ai_protocol);
+	if (client_sock < 0) {
+		DIE_MSG_ERRNO("creating new client socket failed");
+	}
+
+	int err = emper::io::connectAndWait(client_sock, server.ai_addr, server.ai_addrlen);
+	if (err) {
+		DIE_MSG_ERRNO("connecting new client socket failed");
+	}
+
+	// NOLINTNEXTLINE(modernize-avoid-c-arrays)
+	char* buf[2];
+	buf[0] = new char[size];
+	buf[1] = new char[size];
+	uint64_t avg_ns = 0;
+
+	readySem.release();
+	startSem.acquire();
+
+	for (size_t i = 0; i < iterations; ++i) {
+		char* outBuf = buf[i % 2];
+		char* inBuf = buf[(i + 1) % 2];
+
+		SendFuture sendFuture(client_sock, outBuf, size, MSG_NOSIGNAL);
+		RecvFuture recvFuture(client_sock, inBuf, size, MSG_WAITALL);
+		recvFuture.setDependency(sendFuture);
+
+		auto start = std::chrono::high_resolution_clock::now();
+		int32_t res = recvFuture.submitAndWait();
+		auto end = std::chrono::high_resolution_clock::now();
+
+		if (res <= 0) {
+			// recv failed
+			if (res != -ECANCELED) {
+				if (res == 0) {
+					DIE_MSG("server closed the connection");
+				}
+
+				errno = -res;
+				DIE_MSG_ERRNO("receive failed");
+			}
+
+			// send failed
+			int32_t send_res = sendFuture.wait();
+			if (send_res < 0) {
+				errno = -send_res;
+				DIE_MSG_ERRNO("send failed");
+			}
+
+			DIE_MSG("short send" << send_res);
+		}
+
+		if (memcmp(outBuf, inBuf, size) != 0) {
+			DIE_MSG("got unexpected echo from server");
+		}
+
+		auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
+		uint64_t ns = duration.count();
+		if (unlikely(avg_ns == 0)) {
+			avg_ns = ns;
+		} else {
+			avg_ns += ns;
+			avg_ns /= 2;
+		}
+	}
+
+	// Shutdown and close the client socket;
+	shutdown(client_sock, SHUT_RDWR);
+
+	// ShutdownFuture shut_f(client_sock, SHUT_RDWR);
+	CloseFuture cf(client_sock);
+	// cf.setDependency(shut_f);
+	cf.submit();
+
+	delete[] buf[0];
+	delete[] buf[1];
+
+	cf.wait();
+
+	return avg_ns;
+}
+
+auto getOption(int argc, char** argv, const std::string& option) -> char* {
+	char** end = argv + argc;
+	char** itr = std::find(argv, end, option);
+
+	if (itr != end && ++itr != end) {
+		return *itr;
+	}
+
+	return nullptr;
+}
+
+static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
+	char** end = argv + argc;
+	return std::find(argv, end, option) != end;
+}
+
+static void printUsage(char* name) {
+	std::cerr << "Usage: " << name
+						<< "[-h] [-p <port>] [-c <clients>] [-i <iterations>] [-a <address>] [-s <size>]"
+						<< std::endl;
+}
+
+auto main(int argc, char* argv[]) -> int {
+	std::string host = HOST;
+	std::string port = PORT;
+	size_t iterations = ITERATIONS;
+	size_t clients = CLIENTS;
+	size_t size = SIZE;
+
+	if (existsOption(argc, argv, "-h")) {
+		printUsage(argv[0]);
+		exit(EXIT_FAILURE);
+	}
+
+	char* port_s = getOption(argc, argv, "-p");
+	if (port_s) {
+		port = std::string(port_s);
+	}
+
+	char* addr_s = getOption(argc, argv, "-a");
+	if (addr_s) {
+		host = std::string(addr_s);
+	}
+
+	char* client_s = getOption(argc, argv, "-c");
+	if (client_s) {
+		clients = strtol(client_s, nullptr, DECIMAL);
+	}
+
+	char* iterations_s = getOption(argc, argv, "-i");
+	if (iterations_s) {
+		iterations = strtol(iterations_s, nullptr, DECIMAL);
+	}
+
+	char* size_s = getOption(argc, argv, "-s");
+	if (size_s) {
+		size = strtol(size_s, nullptr, DECIMAL);
+	}
+
+	struct addrinfo* server;
+	int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
+	if (err) {
+		if (err == EAI_SYSTEM) {
+			DIE_MSG_ERRNO("getaddrinfo failed");
+		} else {
+			LOGE("error in getaddrinfo: " << gai_strerror(err));
+			exit(EXIT_FAILURE);
+		}
+	}
+
+	Runtime runtime;
+
+	Fiber* alphaFiber = Fiber::from([&] {
+		auto* client_avgs = new uint64_t[clients];
+		CPS cps;
+		Semaphore readySemaphore;
+		Semaphore startSemaphore;
+
+		// establish client connections
+		for (size_t i = 0; i < clients; ++i) {
+			spawn(
+					[&, i] {
+						client_avgs[i] = clientFunc(*server, iterations, size, readySemaphore, startSemaphore);
+					},
+					cps);
+		}
+
+		// await the clients
+		for (size_t i = 0; i < clients; ++i) {
+			readySemaphore.acquire();
+		}
+
+		// start the clients
+		for (size_t i = 0; i < clients; ++i) {
+			startSemaphore.release();
+		}
+
+		// await the clients
+		cps.wait();
+
+		uint64_t avg_ns = client_avgs[0];
+		for (size_t i = 1; i < clients; ++i) {
+			avg_ns += client_avgs[i];
+			avg_ns /= 2;
+		}
+
+		std::cout << "clients,iterations,size,time" << std::endl;
+		std::cout << clients << "," << iterations << "," << size << "," << avg_ns << std::endl;
+		delete[] client_avgs;
+
+		exit(EXIT_SUCCESS);
+	});
+
+	runtime.scheduleFromAnywhere(*alphaFiber);
+
+	runtime.waitUntilFinished();
+
+	return EXIT_FAILURE;
+}
diff --git a/apps/meson.build b/apps/meson.build
index 82a23a56..5075128a 100644
--- a/apps/meson.build
+++ b/apps/meson.build
@@ -15,3 +15,9 @@ echoserver_exe = executable(
   'EchoServer.cpp',
   dependencies: emper_dep,
 )
+
+echoclient_exe = executable(
+  'echoclient',
+  'EchoClient.cpp',
+  dependencies: emper_dep,
+)
-- 
GitLab