From 8f01db6299daa857ba2634c279bdf7e431a80ae8 Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fl.fischer@fau.de>
Date: Mon, 12 Apr 2021 15:00:10 +0200
Subject: [PATCH] [EchoClient] reconnect on ECONNRESET and C++-ify the code

* Unconditionally reconnect when the TCP connection was reset and count the reconnects
* Send Client id and iteration in each echo
* Use a Client class holding a client's state instead of a pthread-style clientFunc
* Create the Client object in the client Fiber to use NUMA first-touch policy
* Reduce code duplication:
  * starting a new Client is now done with Client::startNew
  * latency measurement and error handling are moved from the clientIteration*
    functions into the Client::run function
---
 apps/EchoClient.cpp | 326 +++++++++++++++++++++++++-------------------
 1 file changed, 189 insertions(+), 137 deletions(-)

diff --git a/apps/EchoClient.cpp b/apps/EchoClient.cpp
index 174f3cc8..830d23bb 100644
--- a/apps/EchoClient.cpp
+++ b/apps/EchoClient.cpp
@@ -13,6 +13,7 @@
 #include <cerrno>		// for errno, ECANCELED
 #include <chrono>		// for nanoseconds, duration, durat...
 #include <cstdint>	// for uint64_t, int32_t
+#include <cstdio>
 #include <cstdlib>	// for size_t, strtol, exit, EXIT_F...
 #include <cstring>	// for memcmp
 #include <iomanip>
@@ -60,171 +61,218 @@ std::string host = HOST;
 std::string port = PORT;
 size_t iterations = ITERATIONS;
 size_t execution_seconds = 0;
-size_t clients = CLIENTS;
+size_t nclients = CLIENTS;
 size_t size = SIZE;
 size_t server_backlog = SERVER_BACKLOG;
 bool linked_futures;
 
 std::atomic<bool> terminate = false;
 
-std::atomic<size_t> client_ids = 0;
+struct addrinfo* server;
 
-struct ClientResult {
-	size_t iterations;
-	size_t avg_ns;
-	size_t unexpectedEchos;
-};
-using ClientResult = struct ClientResult;
+class Client {
+	using IterationResult = std::pair<int, std::string>;
 
-struct ClientResult* clientResults;
+	size_t id;
+	Semaphore& readySem;
+	Semaphore& startSem;
+	CPS& cps;
 
-struct addrinfo* server;
+	int sock;
+	bool initialized = false;
 
-static auto clientIteration(SendFuture& sendFuture, RecvFuture& recvFuture)
-		-> std::pair<nanoseconds, std::string> {
-	nanoseconds duration(0);
-	auto send_start = high_resolution_clock::now();
-	int32_t bytes_send = sendFuture.submitAndWait();
-	auto send_end = high_resolution_clock::now();
+	char* inBuf;
+	char* outBuf;
 
-	if (bytes_send < 0) {
-		return {duration, EMPER_BUILD_STR("send failed: " << strerror(-bytes_send))};
-	}
+ public:
+	static std::atomic<size_t> client_ids;
 
-	auto recv_start = high_resolution_clock::now();
-	int32_t bytes_recv = recvFuture.submitAndWait();
-	auto recv_end = high_resolution_clock::now();
+	// results
+	size_t iteration = 0;
+	RunningAverage<double> avg_ns;
+	size_t unexpectedEchos = 0;
+	size_t reconnects = 0;
 
-	if (bytes_recv < 0) {
-		return {duration, EMPER_BUILD_STR("recv failed: " << strerror(-bytes_recv))};
+	Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps)
+			: id(id), readySem(readySem), startSem(startSem), cps(cps) {
+		outBuf = new char[size];
+		inBuf = new char[size];
 	}
 
-	duration = duration_cast<nanoseconds>((send_end - send_start) + (recv_end - recv_start));
-	return {duration, ""};
-}
-
-static auto clientIterationLinked(SendFuture& sendFuture, RecvFuture& recvFuture)
-		-> std::pair<nanoseconds, std::string> {
-	nanoseconds duration(0);
-	recvFuture.setDependency(sendFuture);
-
-	auto start = high_resolution_clock::now();
-	int32_t res = recvFuture.submitAndWait();
-	auto end = high_resolution_clock::now();
+	static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;
 
-	if (res <= 0) {
-		// recv failed
-		if (res != -ECANCELED) {
-			if (res == 0) {
-				return {duration, "server closed the connection"};
-			}
+ private:
+	static auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
+		int32_t bytes_send = sendFuture.submitAndWait();
 
-			return {duration, EMPER_BUILD_STR("receive failed: " << strerror(-res))};
+		if (unlikely(bytes_send < 0)) {
+			return {bytes_send, "send failed"};
 		}
 
-		// send failed
-		int32_t send_res = sendFuture.wait();
-		if (send_res < 0) {
-			return {duration, EMPER_BUILD_STR("send failed: " << strerror(-send_res))};
+		int32_t bytes_recv = recvFuture.submitAndWait();
+
+		if (unlikely(bytes_recv < 0)) {
+			return {bytes_recv, "recv failed"};
 		}
 
-		// send must send less than expected because recvFuture returned -ECANCELED
-		return {duration, EMPER_BUILD_STR("short send: " << send_res)};
+		return {bytes_recv, ""};
 	}
 
-	duration = duration_cast<nanoseconds>(end - start);
-	return {duration, ""};
-}
+	static auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
+		recvFuture.setDependency(sendFuture);
 
-static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& startSem, CPS& cps) {
-	int client_sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
-	if (client_sock < 0) {
-		DIE_MSG_ERRNO("creating new client socket failed");
-	}
+		int32_t res = recvFuture.submitAndWait();
 
-	int err = emper::io::connectAndWait(client_sock, server->ai_addr, server->ai_addrlen);
-	if (err) {
-		DIE_MSG_ERRNO("connecting new client socket failed");
-	}
+		if (unlikely(res <= 0)) {
+			// recv failed
+			if (res != -ECANCELED) {
+				if (res == 0) {
+					return {0, "server closed the connection"};
+				}
 
-	// We are connected -> start next client
-	size_t next_client_id = client_ids.fetch_add(1);
-	if (next_client_id < clients) {
-		spawn([&, next_client_id] { clientFunc(next_client_id, readySem, startSem, cps); }, cps);
-	}
+				return {res, "recv failed"};
+			}
+
+			// send failed
+			int32_t send_res = sendFuture.wait();
+			if (send_res < 0) {
+				return {send_res, "send failed"};
+			}
+
+			// send must have sent less than expected because recvFuture returned -ECANCELED
+			return {0, EMPER_BUILD_STR("short send: " << send_res)};
+		}
 
-	int enable = 1;
-	if (setsockopt(client_sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
-		DIE_MSG_ERRNO("setsockopt failed");
+		return {res, ""};
 	}
 
-	auto* outBuf = new char[size];
-	auto* inBuf = new char[size];
-	RunningAverage avg_ns;
+	void connect() {
+		sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
+		if (sock < 0) {
+			DIE_MSG_ERRNO("creating new client socket failed");
+		}
 
-	readySem.release();
-	startSem.acquire();
+		int enable = 1;
+		if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
+			DIE_MSG_ERRNO("setsockopt failed");
+		}
 
-	size_t i = 0;
-	while (i < iterations && !terminate.load(std::memory_order_relaxed)) {
-		SendFuture sendFuture(client_sock, outBuf, size, MSG_NOSIGNAL);
-		RecvFuture recvFuture(client_sock, inBuf, size, MSG_WAITALL);
+		int err = emper::io::connectAndWait(sock, server->ai_addr, server->ai_addrlen);
+		if (err) {
+			DIE_MSG_ERRNO("connecting new client socket failed");
+		}
 
-		std::pair<nanoseconds, std::string> res;
-		if (linked_futures) {
-			res = clientIterationLinked(sendFuture, recvFuture);
+		if (!initialized) {
+			// We are connected -> start next client
+			Client::startNew(readySem, startSem, cps);
+			initialized = true;
 		} else {
-			res = clientIteration(sendFuture, recvFuture);
+			reconnects++;
 		}
+	}
 
-		auto& duration = res.first;
-		auto& err = res.second;
+ public:
+	void run() {
+		connect();
 
-		if (!err.empty()) {
-			LOGE("Client failed at iteration " << i << " with " << err);
-			std::this_thread::sleep_for(ONERROR_SLEEP);
-			DIE;
-		}
+		// signal that this client is initialized and connected
+		readySem.release();
 
-		avg_ns.update(duration.count());
+		// wait for all clients to be ready
+		startSem.acquire();
 
-		if (memcmp(outBuf, inBuf, size) != 0) {
-			std::stringstream sst;
-			sst << "got unexpected echo from server" << std::endl;
-			sst << "expected: ";
-			for (unsigned i = 0; i < size; ++i) {
-				sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)outBuf[i]);
+		while (iteration < iterations && !terminate.load(std::memory_order_relaxed)) {
+			SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
+			RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);
+
+			// prepare output buf
+			sprintf(outBuf, "%lu:%lu", id, iteration);
+
+			IterationResult iterationResult;
+			auto start = high_resolution_clock::now();
+			if (linked_futures) {
+				iterationResult = linkedEcho(sendFuture, recvFuture);
+			} else {
+				iterationResult = echo(sendFuture, recvFuture);
+			}
+			auto end = high_resolution_clock::now();
+
+			// less than 1 byte was received -> an error occurred
+			// (connection closed by server is considered an error)
+			if (unlikely(iterationResult.first <= 0)) {
+				int err = -iterationResult.first;
+				auto& msg = iterationResult.second;
+				if (err == ECONNRESET) {
+					LOGW("Client " << id << " reconnecting");
+					close(sock);
+					// reconnect and try again
+					connect();
+					continue;
+				}
+
+				LOGE("Client failed at iteration " << iteration << " with " << msg << ": "
+																					 << strerror(err));
+				std::this_thread::sleep_for(ONERROR_SLEEP);
+				DIE;
 			}
-			sst << std::endl;
 
-			sst << "received: ";
-			for (unsigned i = 0; i < size; ++i) {
-				sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)inBuf[i]);
-				sst << (unsigned)inBuf[i] << " ";
+			nanoseconds duration = duration_cast<nanoseconds>(end - start);
+
+			avg_ns.update(duration.count());
+
+			if (memcmp(outBuf, inBuf, size) != 0) {
+				std::stringstream sst;
+				sst << "got unexpected echo from server" << std::endl;
+				sst << "expected: ";
+				for (unsigned i = 0; i < size; ++i) {
+					sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)outBuf[i]);
+				}
+				sst << std::endl;
+
+				sst << "received: ";
+				for (unsigned i = 0; i < size; ++i) {
+					sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)inBuf[i]);
+					sst << (unsigned)inBuf[i] << " ";
+				}
+				sst << std::endl;
+				LOGE(sst.str());
+				unexpectedEchos++;
 			}
-			sst << std::endl;
-			LOGE(sst.str());
-			clientResults[client_id].unexpectedEchos++;
+
+			iteration++;
 		}
 
-		i++;
-	}
+		shutdown(sock, SHUT_RDWR);
+
+		// ShutdownFuture shut_f(sock, SHUT_RDWR);
+		CloseFuture cf(sock);
+		// cf.setDependency(shut_f);
+		cf.submit();
+
+		delete[] inBuf;
+		delete[] outBuf;
 
-	// Shutdown and close the client socket;
-	shutdown(client_sock, SHUT_RDWR);
+		cf.wait();
+	}
+};
 
-	// ShutdownFuture shut_f(client_sock, SHUT_RDWR);
-	CloseFuture cf(client_sock);
-	// cf.setDependency(shut_f);
-	cf.submit();
+Client** clients;
 
-	delete[] inBuf;
-	delete[] outBuf;
+std::atomic<size_t> Client::client_ids = 0;
 
-	cf.wait();
+auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool {
+	size_t next_client_id = client_ids.fetch_add(1);
+	if (next_client_id >= nclients) {
+		return false;
+	}
 
-	clientResults[client_id].avg_ns = avg_ns.getCounter();
-	clientResults[client_id].iterations = i;
+	spawn(
+			[&, id = next_client_id] {
+				clients[id] = new Client(id, readySem, startSem, cps);
+				clients[id]->run();
+			},
+			cps);
+	return true;
 }
 
 auto getOption(int argc, char** argv, const std::string& option) -> char* {
@@ -269,7 +317,7 @@ auto main(int argc, char* argv[]) -> int {
 
 	char* client_s = getOption(argc, argv, "-c");
 	if (client_s) {
-		clients = strtol(client_s, nullptr, DECIMAL);
+		nclients = strtol(client_s, nullptr, DECIMAL);
 	}
 
 	char* iterations_s = getOption(argc, argv, "-i");
@@ -315,7 +363,7 @@ auto main(int argc, char* argv[]) -> int {
 	Runtime runtime;
 
 	Fiber* alphaFiber = Fiber::from([&] {
-		clientResults = new ClientResult[clients];
+		clients = new Client*[nclients];
 		CPS cps;
 		Semaphore readySemaphore;
 		Semaphore startSemaphore;
@@ -325,17 +373,15 @@ auto main(int argc, char* argv[]) -> int {
 
 		// start first client batch
 		for (size_t i = 0; i < server_backlog; ++i) {
-			size_t next_client_id = client_ids.fetch_add(1);
-			if (next_client_id >= clients) {
+			// Start new clients until we have started all or the amount
+			// of parallel connecting clients is reached
+			if (!Client::startNew(readySemaphore, startSemaphore, cps)) {
 				break;
 			}
-			spawn(
-					[&, next_client_id] { clientFunc(next_client_id, readySemaphore, startSemaphore, cps); },
-					cps);
 		}
 
 		// await the clients
-		for (size_t i = 0; i < clients; ++i) {
+		for (size_t i = 0; i < nclients; ++i) {
 			readySemaphore.acquire();
 		}
 
@@ -350,7 +396,7 @@ auto main(int argc, char* argv[]) -> int {
 		auto echo_start = high_resolution_clock::now();
 
 		// start the clients
-		for (size_t i = 0; i < clients; ++i) {
+		for (size_t i = 0; i < nclients; ++i) {
 			startSemaphore.release();
 		}
 
@@ -365,13 +411,15 @@ auto main(int argc, char* argv[]) -> int {
 
 		RunningAverage avg_ns;
 		RunningAverage avg_iterations;
-		uint64_t unexpectedEchos = 0;
 		uint64_t total_iterations = 0;
-		for (size_t i = 0; i < clients; ++i) {
-			avg_ns.update(clientResults[i].avg_ns);
-			avg_iterations.update(clientResults[i].iterations);
-			unexpectedEchos += clientResults[i].unexpectedEchos;
-			total_iterations += clientResults[i].iterations;
+		uint64_t reconnects = 0;
+		uint64_t unexpectedEchos = 0;
+		for (size_t i = 0; i < nclients; ++i) {
+			avg_ns.update(clients[i]->avg_ns.getAverage());
+			avg_iterations.update(clients[i]->iteration);
+			total_iterations += clients[i]->iteration;
+			unexpectedEchos += clients[i]->unexpectedEchos;
+			reconnects += clients[i]->reconnects;
 		}
 
 		auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();
@@ -397,12 +445,13 @@ auto main(int argc, char* argv[]) -> int {
 
 		std::stringstream sst;
 		if (!exists) {
-			sst << "clients,iterations,total_iterations,size,avg_ns,unexpected_echos,connect,echo,total"
+			sst << "clients,iterations,total_iterations,size,avg_ns,reconnects,unexpected_echos,connect,"
+						 "echo,total"
 					<< std::endl;
 		}
-		sst << clients << "," << avg_iterations.getAverage() << "," << total_iterations << "," << size
-				<< "," << avg_ns.getAverage() << "," << unexpectedEchos << "," << connect_duration << ","
-				<< echo_duration << "," << total_duration << std::endl;
+		sst << nclients << "," << avg_iterations.getAverage() << "," << total_iterations << "," << size
+				<< "," << avg_ns.getAverage() << "," << reconnects << "," << unexpectedEchos << ","
+				<< connect_duration << "," << echo_duration << "," << total_duration << std::endl;
 
 		auto output = sst.str();
 		if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
@@ -413,7 +462,10 @@ auto main(int argc, char* argv[]) -> int {
 			emper::io::closeAndForget(out_fd);
 		}
 
-		delete[] clientResults;
+		for (size_t i = 0; i < nclients; ++i) {
+			delete clients[i];
+		}
+		delete[] clients;
 
 		exit(EXIT_SUCCESS);
 	});
-- 
GitLab