Skip to content
Snippets Groups Projects
EchoClient.cpp 19.2 KiB
Newer Older
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>	//for getaddrinfo
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>	 // for shutdown, socket, AF_INET
#include <sys/stat.h>
#include <unistd.h>
#include <cerrno>		// for errno, ECANCELED
#include <chrono>		// for nanoseconds, duration, durat...
#include <cstdint>	// for uint64_t, int32_t
#include <cstdlib>	// for size_t, strtol, exit, EXIT_F...
#include <cstring>	// for memcmp
#include <iomanip>
#include <iostream>	 // for operator<<, basic_ostream, endl
#include <string>		 // for allocator, string, char_traits
#include <thread>

#include "Common.hpp"										 // for DIE_MSG_ERRNO, DIE_MSG, unli...
#include "CountingPrivateSemaphore.hpp"	 // for CPS
#include "Debug.hpp"										 // for LOGE
#include "Fiber.hpp"										 // for Fiber
#include "Runtime.hpp"									 // for Runtime
#include "Semaphore.hpp"								 // for Semaphore
#include "emper.hpp"										 // for spawn
#include "io.hpp"												 // for connectAndWait
#include "io/Future.hpp"								 // for CloseFuture, RecvFuture, Sen...
using emper::lib::math::RunningAverage;

using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::nanoseconds;
using std::chrono::seconds;
// Defaults used when the corresponding command line option is not given.

// Numeric base handed to strtol when parsing numeric command line values.
const int DECIMAL = 10;
// Default server address (overridden by -a).
const std::string HOST = "0.0.0.0";
// Default server port (overridden by -p).
const std::string PORT = "12345";
// Default number of echos each client performs (overridden by -i).
const size_t ITERATIONS = 10000;
// NOTE(review): presumably the default for the client count set by -c; the
// variable it initializes is not visible in this excerpt -- confirm.
const size_t CLIENTS = 10000;
// Default size in bytes of each echo message and of the per-client buffers
// (overridden by -s).
const size_t SIZE = 32;
// Default for server_backlog (overridden by -b).
const size_t SERVER_BACKLOG = 1024;
// Flags and permission bits passed to open() for the output and histogram files.
const int OPENFLAGS = O_CREAT | O_WRONLY;
const int OPENMODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;

// Globals: runtime configuration, filled in by main() from the command line.
std::string host = HOST;
std::string port = PORT;
// Per-client iteration limit; main() sets it to SIZE_MAX when a time limit is used.
size_t iterations = ITERATIONS;
// Time limit in seconds given with -t; 0 means "run a fixed number of iterations".
size_t execution_seconds = 0;
// Number of bytes sent and expected back per echo.
size_t size = SIZE;
// Upper bound for the first batch of clients started by main().
size_t server_backlog = SERVER_BACKLOG;
// Selects the LinkedFutures::yes variant of Client::_run; assigned from the
// "--linked-futures" option in main().
bool linked_futures;
// Set when --histogram is given; selects the CollectTimeStamps::yes variant of
// Client::_run and makes clients allocate per-iteration TimeStamps.
bool histogram = false;
// Set to true once the time limit has elapsed; polled by Client::shouldTerminate().
std::atomic<bool> terminate = false;

// Address of the coordinator given with --coordinator; when it is non-null main()
// exchanges a message with it before starting the echo phase.
struct addrinfo* coordinator;
// Port used when resolving the coordinator address.
std::string COORDINATOR_PORT = "12346";

// Resolve host and port using getaddrinfo(3) without hints and return the head
// of the resulting address list.
//
// A failed lookup is fatal: EAI_SYSTEM is reported through DIE_MSG_ERRNO, every
// other error code is logged with its gai_strerror() text before the process
// exits with EXIT_FAILURE.
static auto getFirstAddrInfo(const std::string& host, const std::string& port) -> struct addrinfo* {
	struct addrinfo* result;
	const int rc = getaddrinfo(host.c_str(), port.c_str(), nullptr, &result);

	// Common case: the lookup succeeded
	if (rc == 0) {
		return result;
	}

	// Any error other than EAI_SYSTEM carries its own description
	if (rc != EAI_SYSTEM) {
		LOGE("error in getaddrinfo: " << gai_strerror(rc));
		exit(EXIT_FAILURE);
	}

	// EAI_SYSTEM: the cause is reported through errno
	DIE_MSG_ERRNO("getaddrinfo failed");
	return result;
}
// Points in time recorded for a single echo iteration when time stamps are
// collected for the latency histogram.
struct TimeStamps {
	// Taken after the output buffer is prepared, before the send is issued
	std::chrono::high_resolution_clock::time_point start;
	// Taken after the send future was submitted (only without linked futures)
	std::chrono::high_resolution_clock::time_point afterSend;
	// Taken after waiting for the send future returned (only without linked futures)
	std::chrono::high_resolution_clock::time_point afterSendDispatch;
	// Taken after waiting for the recv future returned
	std::chrono::high_resolution_clock::time_point afterRecv;
};

class Client {
	using IterationResult = std::pair<int, std::string>;
	size_t id;
	Semaphore& readySem;
	Semaphore& startSem;
	CPS& cps;
	int sock;
	bool initialized = false;
 public:
	static std::atomic<size_t> client_ids;
	// results
	size_t iteration = 0;
	RunningAverage<double> avg_ns;
	size_t unexpectedEchos = 0;
	size_t reconnects = 0;
	struct TimeStamps* timeStamps = nullptr;
	high_resolution_clock::time_point echoStart;
	high_resolution_clock::time_point echoEnd;
	high_resolution_clock::time_point echoLoopEnd;

	// Create a client and allocate its buffers; no connection is made here.
	//
	// id                - index of this client, also written into every echo message
	// readySem/startSem - semaphores used in _run() to signal readiness and to wait
	//                     for the common start
	// cps               - stored by reference for later use
	// collectTimeStamps - when true, one TimeStamps record per iteration is allocated
	Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps)
			: id(id), readySem(readySem), startSem(startSem), cps(cps) {
		// Value-initialize both buffers. Only a short "<id>:<iteration>" string is
		// written into outBuf before all `size` bytes are sent and compared, so
		// without zeroing the remaining bytes would be indeterminate heap content.
		outBuf = new char[size]();
		inBuf = new char[size]();

		if (collectTimeStamps) {
			timeStamps = new TimeStamps[iterations];
		}
	}

	~Client() {
		delete[] outBuf;
		delete[] inBuf;
		delete[] timeStamps;
	static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;
	// True once this client has performed all of its iterations or the global
	// terminate flag has been raised.
	[[nodiscard]] auto shouldTerminate() const -> bool {
		if (iteration >= iterations) {
			return true;
		}
		// A relaxed load is used: the flag is only polled
		return terminate.load(std::memory_order_relaxed);
	}
	void print_unexpected_echo_msg() const {
		std::stringstream sst;
		sst << "Client " << id << " got unexpected echo from server" << std::endl;
		sst << "expected: ";
		for (unsigned i = 0; i < size; ++i) {
			sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)outBuf[i]);
		sst << "received: ";
		for (unsigned i = 0; i < size; ++i) {
			sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)inBuf[i]);
			sst << (unsigned)inBuf[i] << " ";
	void echoError(int err, std::string&& msg) const {
		std::stringstream sst;
		sst << "Client " << id << " failed at iteration " << iteration << " with " << msg;
		if (err != 0) {
			sst << ": " << strerror(err);
	void onCONNRESET() {
		LOGW("Client " << id << " reconnecting");
	void connect() {
		sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
		if (sock < 0) {
			DIE_MSG_ERRNO("creating new client socket failed");
		}
		int enable = 1;
		if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
			DIE_MSG_ERRNO("setsockopt failed");
		}
		int err = emper::io::connectAndWait(sock, server->ai_addr, server->ai_addrlen);
		if (err) {
			DIE_MSG_ERRNO("connecting new client socket failed");
		}
		if (!initialized) {
			// We are connected -> start next client
			Client::startNew(readySem, startSem, cps);
			initialized = true;
	template <CollectTimeStamps collectTimeStampsSwitch, LinkedFutures linkedFuturesSwitch>
	void _run() {
		constexpr bool collectTimeStamps = collectTimeStampsSwitch == CollectTimeStamps::yes;
		constexpr bool linkedFutures = linkedFuturesSwitch == LinkedFutures::yes;
		// signal that this client is initialized and connected
		readySem.release();
		// wait for all clients to be ready
		startSem.acquire();
			SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
			RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);

			// prepare output buf
			sprintf(outBuf, "%lu:%lu", id, iteration);

			echoStart = high_resolution_clock::now();
			if constexpr (collectTimeStamps) {
				timeStamps[iteration].start = echoStart;
			if constexpr (!linkedFutures) {
				sendFuture.submit();

				if constexpr (collectTimeStamps) {
					timeStamps[iteration].afterSend = high_resolution_clock::now();
				}

				if (shouldTerminate()) {
					break;
				}
				int32_t bytes_send = sendFuture.wait();

				if constexpr (collectTimeStamps) {
					timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
				}

				if (unlikely(bytes_send < 0)) {
					int err = -bytes_send;
					if (err == ECONNRESET) {
						onCONNRESET();
						continue;
					}
					echoError(err, "send failed");
				}
			recvFuture.submit();

			if (shouldTerminate()) {
				break;
			}
			int32_t bytes_recv = recvFuture.wait();

			echoEnd = high_resolution_clock::now();
			if constexpr (collectTimeStamps) {
				timeStamps[iteration].afterRecv = echoEnd;
			if (unlikely(bytes_recv < 0)) {
				int err = -bytes_recv;
				if constexpr (!linkedFutures) {
					echoError(err, "recv failed");
				} else {
					// recv failed
					if (err != ECANCELED) {
						echoError(err, err < 0 ? "recv failed" : "server closed the connection");
					}

					// send failed
					int32_t send_res = sendFuture.wait();
					if (send_res < 0) {
						echoError(-send_res, "send failed");
					}

					// send must have sent less than expected because recvFuture returned -ECANCELED
					echoError(0, EMPER_BUILD_STR("short send: " << send_res));
				}
			if constexpr (!collectTimeStamps) {
				nanoseconds duration = duration_cast<nanoseconds>(echoEnd - echoStart);
				avg_ns.update(duration.count());
			}

			if (memcmp(outBuf, inBuf, size) != 0) {

		echoLoopEnd = high_resolution_clock::now();
	}

 public:
	// Tear down this client's connection.
	//
	// isQuitClient - when true and the global sendQuit is set, "quit\n" is sent on
	//                this connection before it is shut down; main() passes true
	//                only for the last client.
	//
	// NOTE(review): `cf` is not declared in the visible part of this block; its
	// declaration appears to be missing from this excerpt -- confirm against the
	// full file.
	void shutdown(bool isQuitClient) const {
		if (sendQuit && isQuitClient) {
			LOGI("Send quit message");
			std::string msg = "quit\n";
			ssize_t res = emper::io::sendAndWait(sock, msg.c_str(), msg.size(), MSG_NOSIGNAL);
			if (res < 0) {
				// A failed quit message is only logged, the shutdown continues.
				// NOTE(review): this assumes sendAndWait reports its error via errno;
				// the futures used in _run() return negative error codes -- confirm.
				LOGW("Sending quit failed: " << strerror(errno));
			}
		}

		// shut_f is registered as dependency of cf before cf is submitted
		ShutdownFuture shut_f(sock, SHUT_RDWR);
		cf.setDependency(shut_f);
		cf.submit();

		cf.wait();
	}

	void run() {
		if (histogram) {
			if (linked_futures) {
				_run<CollectTimeStamps::yes, LinkedFutures::yes>();
			} else {
				_run<CollectTimeStamps::yes, LinkedFutures::no>();
			}
		} else {
			if (linked_futures) {
				_run<CollectTimeStamps::no, LinkedFutures::yes>();
			} else {
				_run<CollectTimeStamps::no, LinkedFutures::no>();
			}
		}
std::atomic<size_t> Client::client_ids = 0;
auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool {
	size_t next_client_id = client_ids.fetch_add(1);
	if (next_client_id >= nclients) {
		return false;
	}
	spawn(
			[&, id = next_client_id] {
				clients[id] = new Client(id, readySem, startSem, cps, histogram);
// Look up the value that follows a command line option.
//
// argc/argv - argument vector to search; every entry is considered
// option    - exact option string to look for, e.g. "-p"
//
// Returns the argument directly after the first occurrence of option, or
// nullptr when option is absent or is the last argument.
static auto getOption(int argc, char** argv, const std::string& option) -> char* {
	for (int idx = 0; idx < argc; ++idx) {
		if (option != argv[idx]) {
			continue;
		}

		// Only the first occurrence counts; it needs a successor to have a value
		const int valueIdx = idx + 1;
		return valueIdx < argc ? argv[valueIdx] : nullptr;
	}

	return nullptr;
}

// Report whether option occurs anywhere in the argument vector (exact match).
static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
	char** const last = argv + argc;
	for (char** cur = argv; cur != last; ++cur) {
		if (*cur == option) {
			return true;
		}
	}
	return false;
}

static void printUsage(char* name) {
	std::cerr
			<< "Usage: " << name
			<< "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
				 " [-f <output-file>]  [-i <iterations> | -t <execution time in sec>] [--linked-futures]"
static void printIp(struct sockaddr* addr) {
	std::array<char, INET6_ADDRSTRLEN> serverAddrString;
	void* serverAddr;
	switch (addr->sa_family) {
		case AF_INET:
			serverAddr = &(((struct sockaddr_in*)addr)->sin_addr);
			break;
		case AF_INET6:
			serverAddr = &(((struct sockaddr_in6*)addr)->sin6_addr);
			break;
		default:
			DIE_MSG("uknown sa_family " << addr->sa_family);
	}
	inet_ntop(addr->sa_family, serverAddr, serverAddrString.data(), INET6_ADDRSTRLEN);
	std::cout << "echo client connecting to " << serverAddrString.data() << ":" << port << std::endl;
}

// Print the effective benchmark parameters as a single line on stdout.
//
// The termination criterion is the time limit when one is configured and the
// iteration count otherwise.
static void printSettings() {
	std::ostream& out = std::cout;

	out << "parameters: {termination=";
	if (execution_seconds == 0) {
		out << iterations << "iterations";
	} else {
		out << execution_seconds << "seconds";
	}

	const char* quitText = sendQuit ? "true" : "false";
	out << ",clients=" << nclients << ",size=" << size << ",sendquit= " << quitText << "}"
			<< std::endl;
}

auto main(int argc, char* argv[]) -> int {
	if (existsOption(argc, argv, "-h")) {
		printUsage(argv[0]);
		exit(EXIT_FAILURE);
	}

	char* port_s = getOption(argc, argv, "-p");
	if (port_s) {
		port = std::string(port_s);
	}

	char* addr_s = getOption(argc, argv, "-a");
	if (addr_s) {
		host = std::string(addr_s);
	}

	char* client_s = getOption(argc, argv, "-c");
	if (client_s) {
		nclients = strtol(client_s, nullptr, DECIMAL);
	}

	char* iterations_s = getOption(argc, argv, "-i");
	if (iterations_s) {
		iterations = strtol(iterations_s, nullptr, DECIMAL);
	}

	char* time_s = getOption(argc, argv, "-t");
	if (time_s) {
		if (iterations_s) {
			std::cerr << "-t and -i are mutual exclusive" << std::endl;
			printUsage(argv[0]);
			exit(EXIT_FAILURE);
		}
		execution_seconds = strtol(time_s, nullptr, DECIMAL);
	}

	char* size_s = getOption(argc, argv, "-s");
	if (size_s) {
		size = strtol(size_s, nullptr, DECIMAL);
	}

	char* server_backlog_s = getOption(argc, argv, "-b");
	if (server_backlog_s) {
		server_backlog = strtol(server_backlog_s, nullptr, DECIMAL);
	}

	linked_futures = getOption(argc, argv, "--linked-futures");

	int out_fd = STDOUT_FILENO;
	char* output_file = getOption(argc, argv, "-f");

	if (output_file) {
		if (access(output_file, W_OK) == 0) {
			DIE_MSG("Output file: " << output_file << " already exists");
		}

		out_fd = ::open(output_file, OPENFLAGS, OPENMODE);
		if (out_fd < 0) {
			DIE_MSG_ERRNO("opening output file failed");
		}
	}

	sendQuit = !existsOption(argc, argv, "--no-quit");

	int histogram_fd;
	char* histogram_file = getOption(argc, argv, "--histogram");
	if (histogram_file) {
		if (!iterations_s) {
			DIE_MSG("histograms are currently only possible with fixed iterations");
		}

		histogram = true;

		if (access(histogram_file, W_OK) == 0) {
			DIE_MSG("Histogram file: " << histogram_file << " already exists");
		}

		histogram_fd = ::open(histogram_file, OPENFLAGS, OPENMODE);
		if (histogram_fd < 0) {
			DIE_MSG_ERRNO("opening histogram file failed");
		}
	}

	char* coordinator_s = getOption(argc, argv, "--coordinator");
	if (coordinator_s) {
		coordinator = getFirstAddrInfo(coordinator_s, COORDINATOR_PORT);
	printIp(server->ai_addr);
	printSettings();

	Runtime runtime;

	Fiber* alphaFiber = Fiber::from([&] {
		clients = new Client*[nclients];
		CPS cps;
		Semaphore readySemaphore;
		Semaphore startSemaphore;
		std::thread terminator;
		auto connect_start = high_resolution_clock::now();
		// start first client batch
		for (size_t i = 0; i < server_backlog; ++i) {
			// Start new clients until we have started all or the amount
			// of parallel connecting clients is reached
			if (!Client::startNew(readySemaphore, startSemaphore, cps)) {
		// await the connection of all clients
		for (size_t i = 0; i < nclients; ++i) {
		// synchronize with other peers using the coordinator
		if (coordinator) {
			int conn = socket(coordinator->ai_family, coordinator->ai_socktype, coordinator->ai_protocol);
			if (conn < 0) DIE_MSG_ERRNO("coordinator socket creation failed");

			int err = emper::io::connectAndWait(conn, coordinator->ai_addr, coordinator->ai_addrlen);
			if (err) DIE_MSG_ERRNO("coordinator socket connect failed");

			std::string m = "OK";
			SendFuture sf(conn, m.c_str(), m.size(), 0);

			std::array<char, 16> buf;
			RecvFuture rf(conn, buf.data(), buf.size(), 0);

			rf.setDependency(sf);
			err = rf.submitAndWait();
			if (err < 0) DIE_MSG("coordinator recv failed: " << strerror(-err));

			closeAndForget(conn);

			LOGI("Coordinator go received");
		}

		high_resolution_clock::time_point echoTerminationTime;
		if (execution_seconds) {
			iterations = SIZE_MAX;
				std::this_thread::sleep_for(seconds(execution_seconds));
				echoTerminationTime = high_resolution_clock::now();
				terminate.store(true);
		auto echo_start = high_resolution_clock::now();
		for (size_t i = 0; i < nclients; ++i) {
		// await echo phase termination
		auto echo_end = high_resolution_clock::now();

		if (execution_seconds) {
			terminator.join();
		}

		auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();
		uint64_t echoDurationUntilTermination;
		if (execution_seconds) {
			echoDurationUntilTermination =
					duration_cast<nanoseconds>(echoTerminationTime - echo_start).count();
		}
		auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
		auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();

		uint64_t total_iterations = 0;
		uint64_t total_reconnects = 0;
		uint64_t total_unexpected_echos = 0;
		RunningAverage avg_ns;
		for (size_t i = 0; i < nclients; ++i) {
			auto* client = clients[i];
			total_iterations += client->iteration;
			total_reconnects += client->reconnects;
			total_unexpected_echos += client->unexpectedEchos;
			if (!histogram) {
				avg_ns.update(client->avg_ns.getAverage());
			}

			// terminate the connection and send 'quit' on the last connection
			bool isQuitClient = i == (nclients - 1);
			async([=]() { client->shutdown(isQuitClient); });
		sst << "[global]" << std::endl;
		sst << "clients = " << nclients << std::endl;
		sst << "size = " << size << std::endl;
		sst << "total_iterations = " << total_iterations << std::endl;
		sst << "total_reconnects = " << total_reconnects << std::endl;
		sst << "total_unexpected_echos = " << total_unexpected_echos << std::endl;
		sst << "avg_ns = " << avg_ns.getAverage() << std::endl;
		sst << "connect_duration = " << connect_duration << std::endl;
		if (execution_seconds) {
			sst << "execution_duration = " << echoDurationUntilTermination << std::endl;
		}
		sst << "echo_duration = " << echo_duration << std::endl;
		sst << "total_duration = " << total_duration << std::endl;
		sst << std::endl;

		sst << "[clients]" << std::endl;
		sst << "csv =" << std::endl;
		sst << " iterations,avg_ns,reconnects,unexpected_echos,echoLoopEnd" << std::endl;
		for (size_t i = 0; i < nclients; ++i) {
			auto* client = clients[i];
			sst << " " << client->iteration << ",";
			sst << client->avg_ns.getAverage() << ",";
			sst << client->reconnects << ",";
			sst << client->unexpectedEchos << ",";
			sst << duration_cast<nanoseconds>(client->echoLoopEnd.time_since_epoch()).count()
					<< std::endl;

		auto output = sst.str();
		if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
			DIE_MSG_ERRNO("writing results failed");
		}

		if (output_file) {
			emper::io::closeAndForget(out_fd);
		}

		if (histogram) {
			std::string histogramDataHeader =
					"total_latency,after_send_latency,after_send_dispatch_latency\n";
			if (emper::io::writeFileAndWait(histogram_fd, histogramDataHeader.c_str(),
																			histogramDataHeader.size()) < 0) {
				DIE_MSG_ERRNO("writing histogram data header failed");
			}

			for (size_t i = 0; i < nclients; ++i) {
				auto* client = clients[i];
				std::stringstream sst;
				for (size_t iteration = 0; iteration < iterations; ++iteration) {
					auto timeStamps = client->timeStamps[iteration];
					nanoseconds totalLatency =
							duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.start);
					nanoseconds afterSendLatency =
							duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSend);
					nanoseconds afterSendDispatchLatency =
							duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSendDispatch);
					sst << totalLatency.count() << ",";
					sst << afterSendLatency.count() << ",";
					sst << afterSendDispatchLatency.count() << std::endl;
				}

				auto histoOutput = sst.str();
				if (emper::io::writeFileAndWait(histogram_fd, histoOutput.c_str(), histoOutput.size()) <
						0) {
					DIE_MSG_ERRNO("writing histogram data failed");
				}
			}

			emper::io::closeAndForget(histogram_fd);
		}

		Runtime::getRuntime()->initiateTermination();
	});

	runtime.scheduleFromAnywhere(*alphaFiber);

	runtime.waitUntilFinished();

	for (size_t i = 0; i < nclients; ++i) {
		delete clients[i];
	}
	delete[] clients;

	return EXIT_SUCCESS;