Skip to content
Snippets Groups Projects
EchoClient.cpp 7.14 KiB
Newer Older
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <fcntl.h>
#include <netdb.h>			 //for getaddrinfo
#include <sys/socket.h>	 // for shutdown, socket, AF_INET
#include <sys/stat.h>
#include <unistd.h>
#include <atomic>
#include <cerrno>		 // for errno, ECANCELED
#include <chrono>		 // for nanoseconds, duration, durat...
#include <cstdint>	 // for uint64_t, int32_t
#include <cstdlib>	 // for size_t, strtol, exit, EXIT_F...
#include <cstring>	 // for memcmp
#include <iostream>	 // for operator<<, basic_ostream, endl
#include <string>		 // for allocator, string, char_traits

#include "Common.hpp"										 // for DIE_MSG_ERRNO, DIE_MSG, unli...
#include "CountingPrivateSemaphore.hpp"	 // for CPS
#include "Debug.hpp"										 // for LOGE
#include "Fiber.hpp"										 // for Fiber
#include "Runtime.hpp"									 // for Runtime
#include "Semaphore.hpp"								 // for Semaphore
#include "emper.hpp"										 // for spawn
#include "io.hpp"												 // for connectAndWait
#include "io/Future.hpp"								 // for CloseFuture, RecvFuture, Sen...

using emper::Semaphore;
using emper::io::CloseFuture;
using emper::io::RecvFuture;
using emper::io::SendFuture;

// Defaults
const int DECIMAL = 10;
const std::string HOST = "0.0.0.0";
const std::string PORT = "12345";
const size_t ITERATIONS = 10000;
const size_t CLIENTS = 10000;
const size_t SIZE = 32;
const size_t SERVER_BACKLOG = 1024;
// Globals
std::string host = HOST;
std::string port = PORT;
size_t iterations = ITERATIONS;
size_t clients = CLIENTS;
size_t size = SIZE;
size_t server_backlog = SERVER_BACKLOG;

std::atomic<size_t> client_ids = 0;
uint64_t* client_avgs;

struct addrinfo* server;

static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& startSem, CPS& cps) {
	int client_sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
	if (client_sock < 0) {
		DIE_MSG_ERRNO("creating new client socket failed");
	}

	int err = emper::io::connectAndWait(client_sock, server->ai_addr, server->ai_addrlen);
	if (err) {
		DIE_MSG_ERRNO("connecting new client socket failed");
	}

	// We are connected -> start next client
	size_t next_client_id = client_ids.fetch_add(1);
	if (next_client_id < clients) {
		spawn([&, next_client_id] { clientFunc(next_client_id, readySem, startSem, cps); }, cps);
	}

	// NOLINTNEXTLINE(modernize-avoid-c-arrays)
	char* buf[2];
	buf[0] = new char[size];
	buf[1] = new char[size];
	uint64_t avg_ns = 0;

	readySem.release();
	startSem.acquire();

	for (size_t i = 0; i < iterations; ++i) {
		char* outBuf = buf[i % 2];
		char* inBuf = buf[(i + 1) % 2];

		SendFuture sendFuture(client_sock, outBuf, size, MSG_NOSIGNAL);
		RecvFuture recvFuture(client_sock, inBuf, size, MSG_WAITALL);
		recvFuture.setDependency(sendFuture);

		auto start = std::chrono::high_resolution_clock::now();
		int32_t res = recvFuture.submitAndWait();
		auto end = std::chrono::high_resolution_clock::now();

		if (res <= 0) {
			// recv failed
			if (res != -ECANCELED) {
				if (res == 0) {
					DIE_MSG("server closed the connection");
				}

				errno = -res;
				DIE_MSG_ERRNO("receive failed");
			}

			// send failed
			int32_t send_res = sendFuture.wait();
			if (send_res < 0) {
				errno = -send_res;
				DIE_MSG_ERRNO("send failed");
			}

			DIE_MSG("short send" << send_res);
		}

		if (memcmp(outBuf, inBuf, size) != 0) {
			DIE_MSG("got unexpected echo from server");
		}

		auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
		uint64_t ns = duration.count();
		if (unlikely(avg_ns == 0)) {
			avg_ns = ns;
		} else {
			avg_ns += ns;
			avg_ns /= 2;
		}
	}

	// Shutdown and close the client socket;
	shutdown(client_sock, SHUT_RDWR);

	// ShutdownFuture shut_f(client_sock, SHUT_RDWR);
	CloseFuture cf(client_sock);
	// cf.setDependency(shut_f);
	cf.submit();

	delete[] buf[0];
	delete[] buf[1];

	cf.wait();

	client_avgs[client_id] = avg_ns;
}

auto getOption(int argc, char** argv, const std::string& option) -> char* {
	char** end = argv + argc;
	char** itr = std::find(argv, end, option);

	if (itr != end && ++itr != end) {
		return *itr;
	}

	return nullptr;
}

static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
	char** end = argv + argc;
	return std::find(argv, end, option) != end;
}

static void printUsage(char* name) {
	std::cerr << "Usage: " << name
						<< "[-h] [-p <port>] [-c <clients>] [-i <iterations>] [-a <address>] [-s <size>] [-b "
							 "<server backlog>] [-f <output-file>]"
						<< std::endl;
}

auto main(int argc, char* argv[]) -> int {
	if (existsOption(argc, argv, "-h")) {
		printUsage(argv[0]);
		exit(EXIT_FAILURE);
	}

	char* port_s = getOption(argc, argv, "-p");
	if (port_s) {
		port = std::string(port_s);
	}

	char* addr_s = getOption(argc, argv, "-a");
	if (addr_s) {
		host = std::string(addr_s);
	}

	char* client_s = getOption(argc, argv, "-c");
	if (client_s) {
		clients = strtol(client_s, nullptr, DECIMAL);
	}

	char* iterations_s = getOption(argc, argv, "-i");
	if (iterations_s) {
		iterations = strtol(iterations_s, nullptr, DECIMAL);
	}

	char* size_s = getOption(argc, argv, "-s");
	if (size_s) {
		size = strtol(size_s, nullptr, DECIMAL);
	}

	char* server_backlog_s = getOption(argc, argv, "-b");
	if (server_backlog_s) {
		server_backlog = strtol(server_backlog_s, nullptr, DECIMAL);
	}

	int out_fd = STDOUT_FILENO;

	char* output_file = getOption(argc, argv, "-f");
	if (output_file) {
		out_fd = open(output_file, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);

		if (out_fd < 0) {
			DIE_MSG_ERRNO("opening output file failed");
		}
	}

	int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
	if (err) {
		if (err == EAI_SYSTEM) {
			DIE_MSG_ERRNO("getaddrinfo failed");
		} else {
			LOGE("error in getaddrinfo: " << gai_strerror(err));
			exit(EXIT_FAILURE);
		}
	}

	Runtime runtime;

	Fiber* alphaFiber = Fiber::from([&] {
		client_avgs = new uint64_t[clients];
		CPS cps;
		Semaphore readySemaphore;
		Semaphore startSemaphore;

		// start first client batch
		for (size_t i = 0; i < server_backlog; ++i) {
			size_t next_client_id = client_ids.fetch_add(1);
			if (next_client_id >= clients) {
				break;
			}
					[&, next_client_id] { clientFunc(next_client_id, readySemaphore, startSemaphore, cps); },
					cps);
		}

		// await the clients
		for (size_t i = 0; i < clients; ++i) {
			readySemaphore.acquire();
		}

		// start the clients
		for (size_t i = 0; i < clients; ++i) {
			startSemaphore.release();
		}

		// await the clients
		cps.wait();

		uint64_t avg_ns = client_avgs[0];
		for (size_t i = 1; i < clients; ++i) {
			avg_ns += client_avgs[i];
			avg_ns /= 2;
		}

		std::stringstream sst;
		sst << "clients,iterations,size,time" << std::endl;
		sst << clients << "," << iterations << "," << size << "," << avg_ns << std::endl;

		auto output = sst.str();
		if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
			DIE_MSG_ERRNO("writing results failed");
		}

		if (output_file) {
			emper::io::closeAndForget(out_fd);
		}

		delete[] client_avgs;

		exit(EXIT_SUCCESS);
	});

	runtime.scheduleFromAnywhere(*alphaFiber);

	runtime.waitUntilFinished();

	return EXIT_FAILURE;
}