From a71a6127fd7fcf311f8526a9c5bb7d41ffafab34 Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fl.fischer@fau.de>
Date: Sat, 17 Apr 2021 17:26:46 +0200
Subject: [PATCH] [EchoClient] support latency histograms

Histograms can only be collected when using a fixed number of iterations.

When the '--histogram <file>' argument is passed, each Client
collects four timestamps (8 bytes each):

1. Before requesting the send operation
2. After requesting the send operation
3. After getting unblocked and dispatched because the send operation finished
4. After getting unblocked and dispatched because the recv operation finished

Taking the timestamps is enabled via a template parameter and thus does not
introduce any runtime cost when they are not used, apart from a larger binary.

Before termination, three latencies are calculated for each client and each
echo and written to the histogram file as CSV data.

1. total_latency := (T4 - T1)
2. after_send_latency := (T4 - T2)
3. after_send_dispatch_latency := (T4 - T3)
---
 apps/EchoClient.cpp | 182 ++++++++++++++++++++++++++++++++++++--------
 1 file changed, 150 insertions(+), 32 deletions(-)

diff --git a/apps/EchoClient.cpp b/apps/EchoClient.cpp
index 72c70e05..1a477348 100644
--- a/apps/EchoClient.cpp
+++ b/apps/EchoClient.cpp
@@ -56,6 +56,9 @@ const size_t SERVER_BACKLOG = 1024;
 
 const seconds ONERROR_SLEEP(10);
 
+const int OPENFLAGS = O_CREAT | O_WRONLY;
+const int OPENMODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
+
 // Globals
 std::string host = HOST;
 std::string port = PORT;
@@ -65,11 +68,19 @@ size_t nclients = CLIENTS;
 size_t size = SIZE;
 size_t server_backlog = SERVER_BACKLOG;
 bool linked_futures;
+bool histogram = false;
 
 std::atomic<bool> terminate = false;
 
 struct addrinfo* server;
 
+struct TimeStamps {  // time points recorded by a Client for one echo iteration (histogram mode)
+	high_resolution_clock::time_point start;  // echo(): taken before sendFuture.submit()
+	high_resolution_clock::time_point afterSend;  // taken right after the submit() call returns
+	high_resolution_clock::time_point afterSendDispatch;  // echo(): after sendFuture.wait(); linkedEcho(): same value as afterRecv
+	high_resolution_clock::time_point afterRecv;  // taken after waiting on the recv future returns
+};
+
 class Client {
 	using IterationResult = std::pair<int, std::string>;
 
@@ -93,17 +104,44 @@ class Client {
 	size_t unexpectedEchos = 0;
 	size_t reconnects = 0;
 
-	Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps)
+	struct TimeStamps* timeStamps = nullptr;
+
+	Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps)
 			: id(id), readySem(readySem), startSem(startSem), cps(cps) {
 		outBuf = new char[size];
 		inBuf = new char[size];
+
+		if (collectTimeStamps) {
+			timeStamps = new TimeStamps[iterations];
+		}
+	}
+
+	~Client() {
+		delete[] outBuf;
+		delete[] inBuf;
+		delete[] timeStamps;
 	}
 
 	static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;
 
  private:
-	static auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
-		int32_t bytes_send = sendFuture.submitAndWait();
+	template <bool collectTimeStamps = false>
+	auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
+		if constexpr (collectTimeStamps) {
+			timeStamps[iteration].start = high_resolution_clock::now();
+		}
+
+		sendFuture.submit();
+
+		if constexpr (collectTimeStamps) {
+			timeStamps[iteration].afterSend = high_resolution_clock::now();
+		}
+
+		int32_t bytes_send = sendFuture.wait();
+
+		if constexpr (collectTimeStamps) {
+			timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
+		}
 
 		if (unlikely(bytes_send < 0)) {
 			return {bytes_send, "send failed"};
@@ -111,6 +149,10 @@ class Client {
 
 		int32_t bytes_recv = recvFuture.submitAndWait();
 
+		if constexpr (collectTimeStamps) {
+			timeStamps[iteration].afterRecv = high_resolution_clock::now();
+		}
+
 		if (unlikely(bytes_recv < 0)) {
 			return {bytes_recv, "recv failed"};
 		}
@@ -118,10 +160,22 @@ class Client {
 		return {bytes_recv, ""};
 	}
 
-	static auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
+	template <bool collectTimeStamps = false>
+	auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
 		recvFuture.setDependency(sendFuture);
 
-		int32_t res = recvFuture.submitAndWait();
+		recvFuture.submit();
+
+		if constexpr (collectTimeStamps) {
+			timeStamps[iteration].afterSend = high_resolution_clock::now();
+		}
+
+		int32_t res = recvFuture.wait();
+
+		if constexpr (collectTimeStamps) {
+			timeStamps[iteration].afterSendDispatch = timeStamps[iteration].afterRecv =
+					high_resolution_clock::now();
+		}
 
 		if (unlikely(res <= 0)) {
 			// recv failed
@@ -172,6 +226,7 @@ class Client {
 	}
 
  public:
+	template <bool collectTimeStamps = false>
 	void run() {
 		connect();
 
@@ -188,14 +243,22 @@ class Client {
 			// prepare output buf
 			sprintf(outBuf, "%lu:%lu", id, iteration);
 
+			high_resolution_clock::time_point start;
+			if constexpr (!collectTimeStamps) {
+				start = high_resolution_clock::now();
+			}
+
 			IterationResult iterationResult;
-			auto start = high_resolution_clock::now();
 			if (linked_futures) {
-				iterationResult = linkedEcho(sendFuture, recvFuture);
+				iterationResult = linkedEcho<collectTimeStamps>(sendFuture, recvFuture);
 			} else {
-				iterationResult = echo(sendFuture, recvFuture);
+				iterationResult = echo<collectTimeStamps>(sendFuture, recvFuture);
+			}
+
+			high_resolution_clock::time_point end;
+			if constexpr (!collectTimeStamps) {
+				end = high_resolution_clock::now();
 			}
-			auto end = high_resolution_clock::now();
 
 			// less than 1 byte was received -> an error occurred
 			// (connection closed by server is considered an error)
@@ -216,9 +279,10 @@ class Client {
 				DIE;
 			}
 
-			nanoseconds duration = duration_cast<nanoseconds>(end - start);
-
-			avg_ns.update(duration.count());
+			if constexpr (!collectTimeStamps) {
+				nanoseconds duration = duration_cast<nanoseconds>(end - start);
+				avg_ns.update(duration.count());
+			}
 
 			if (memcmp(outBuf, inBuf, size) != 0) {
 				std::stringstream sst;
@@ -249,9 +313,6 @@ class Client {
 		// cf.setDependency(shut_f);
 		cf.submit();
 
-		delete[] inBuf;
-		delete[] outBuf;
-
 		cf.wait();
 	}
 };
@@ -268,8 +329,13 @@ auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> boo
 
 	spawn(
 			[&, id = next_client_id] {
-				clients[id] = new Client(id, readySem, startSem, cps);
-				clients[id]->run();
+				clients[id] = new Client(id, readySem, startSem, cps, histogram);
+
+				if (histogram) {
+					clients[id]->run<true>();
+				} else {
+					clients[id]->run();
+				}
 			},
 			cps);
 	return true;
@@ -350,6 +416,36 @@ auto main(int argc, char* argv[]) -> int {
 	int out_fd = STDOUT_FILENO;
 	char* output_file = getOption(argc, argv, "-f");
 
+	if (output_file) {
+		if (access(output_file, W_OK) == 0) {
+			DIE_MSG("Output file: " << output_file << " already exists");
+		}
+
+		out_fd = open(output_file, OPENFLAGS, OPENMODE);
+		if (out_fd < 0) {
+			DIE_MSG_ERRNO("opening output file failed");
+		}
+	}
+
+	int histogram_fd;
+	char* histogram_file = getOption(argc, argv, "--histogram");
+	if (histogram_file) {
+		if (!iterations_s) {
+			DIE_MSG("histograms are currently only possible with fixed iterations");
+		}
+
+		histogram = true;
+
+		if (access(histogram_file, W_OK) == 0) {
+			DIE_MSG("Histogram file: " << histogram_file << " already exists");
+		}
+
+		histogram_fd = open(histogram_file, OPENFLAGS, OPENMODE);
+		if (histogram_fd < 0) {
+			DIE_MSG_ERRNO("opening histogram file failed");
+		}
+	}
+
 	int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
 	if (err) {
 		if (err == EAI_SYSTEM) {
@@ -413,20 +509,6 @@ auto main(int argc, char* argv[]) -> int {
 		auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
 		auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();
 
-		if (output_file) {
-			if (access(output_file, W_OK) == 0) {
-				DIE_MSG("Output file: " << output_file << " already exists");
-			}
-
-			int openflags = O_CREAT | O_WRONLY;
-			int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
-
-			out_fd = emper::io::openAndWait(output_file, openflags, mode);
-			if (out_fd < 0) {
-				DIE_MSG_ERRNO("opening output file failed");
-			}
-		}
-
 		uint64_t total_iterations = 0;
 		uint64_t total_reconnects = 0;
 		uint64_t total_unexpected_echos = 0;
@@ -436,7 +518,9 @@ auto main(int argc, char* argv[]) -> int {
 			total_iterations += client->iteration;
 			total_reconnects += client->reconnects;
 			total_unexpected_echos += client->unexpectedEchos;
-			avg_ns.update(client->avg_ns.getAverage());
+			if (!histogram) {
+				avg_ns.update(client->avg_ns.getAverage());
+			}
 		}
 
 		std::stringstream sst;
@@ -472,6 +556,40 @@ auto main(int argc, char* argv[]) -> int {
 			emper::io::closeAndForget(out_fd);
 		}
 
+		if (histogram) {
+			std::string histogramDataHeader =
+					"total_latency,after_send_latency,after_send_dispatch_latency\n";
+			if (emper::io::writeFileAndWait(histogram_fd, histogramDataHeader.c_str(),
+																			histogramDataHeader.size()) < 0) {
+				DIE_MSG_ERRNO("writing histogram data header failed");
+			}
+
+			for (size_t i = 0; i < nclients; ++i) {
+				auto* client = clients[i];
+				std::stringstream sst;
+				for (size_t iteration = 0; iteration < iterations; ++iteration) {
+					auto timeStamps = client->timeStamps[iteration];
+					nanoseconds totalLatency =
+							duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.start);
+					nanoseconds afterSendLatency =
+							duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSend);
+					nanoseconds afterSendDispatchLatency =
+							duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSendDispatch);
+					sst << totalLatency.count() << ",";
+					sst << afterSendLatency.count() << ",";
+					sst << afterSendDispatchLatency.count() << std::endl;
+				}
+
+				auto histoOutput = sst.str();
+				if (emper::io::writeFileAndWait(histogram_fd, histoOutput.c_str(), histoOutput.size()) <
+						0) {
+					DIE_MSG_ERRNO("writing histogram data failed");
+				}
+			}
+
+			emper::io::closeAndForget(histogram_fd);
+		}
+
 		for (size_t i = 0; i < nclients; ++i) {
 			delete clients[i];
 		}
-- 
GitLab