Skip to content
Snippets Groups Projects
Select Git revision
  • a03cafb9a2f76ce1a23f642ad15794b776b49b73
  • master default
  • ci-bump-test
  • abp-queue
  • new-delete-leaks
  • fused-continuation-and-completion-stealing
  • emper-fix-invalid-conv
  • remote-put-get-free-context-cycle
  • linux-version-construct-on-first-use
  • libstdc++-asserts
  • msan
  • libc++
  • completer-strategies
  • cactus_stack_devel_one_commit
  • client-load-change
  • cppcheck
  • flow
  • cast-if-future
  • async_network2
  • thread_safe_log_config
  • burak
  • attic/clang-release-tls-optimization-debug-and-fix
  • attic/continuation-stealing-dev
23 results

EchoClient.cpp

Blame
  • Forked from Lehrstuhl für Informatik 4 (Systemsoftware) / manycore / emper
    Source project has a limited visibility.
    EchoClient.cpp 17.40 KiB
    // SPDX-License-Identifier: LGPL-3.0-or-later
    // Copyright © 2021 Florian Fischer
    #include <fcntl.h>
    #include <netdb.h>	//for getaddrinfo
    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <sys/socket.h>	 // for shutdown, socket, AF_INET
    #include <sys/stat.h>
    #include <unistd.h>
    
    #include <algorithm>	// for find
    #include <atomic>
    #include <cerrno>		// for errno, ECANCELED
    #include <chrono>		// for nanoseconds, duration, durat...
    #include <cstdint>	// for uint64_t, int32_t
    #include <cstdio>
    #include <cstdlib>	// for size_t, strtol, exit, EXIT_F...
    #include <cstring>	// for memcmp
    #include <iomanip>
    #include <iostream>	 // for operator<<, basic_ostream, endl
    #include <ratio>
    #include <string>
    #include <thread>
    #include <utility>
    
    #include "Common.hpp"										 // for DIE_MSG_ERRNO, DIE_MSG, unli...
    #include "CountingPrivateSemaphore.hpp"	 // for CPS
    #include "Debug.hpp"										 // for LOGE
    #include "Fiber.hpp"										 // for Fiber
    #include "Runtime.hpp"									 // for Runtime
    #include "Semaphore.hpp"								 // for Semaphore
    #include "emper.hpp"										 // for spawn
    #include "io.hpp"												 // for connectAndWait
    #include "io/Future.hpp"								 // for CloseFuture, RecvFuture, Sen...
    #include "lib/math.hpp"
    
    using emper::Semaphore;
    using emper::io::CloseFuture;
    using emper::io::RecvFuture;
    using emper::io::SendFuture;
    
    using emper::lib::math::RunningAverage;
    
    using std::chrono::duration_cast;
    using std::chrono::high_resolution_clock;
    using std::chrono::nanoseconds;
    using std::chrono::seconds;
    
    // Defaults
    // Radix passed to strtol() when parsing numeric command line arguments.
    const int DECIMAL = 10;
    // Default server address and port handed to getaddrinfo().
    const std::string HOST = "0.0.0.0";
    const std::string PORT = "12345";
    // Default number of echo iterations per client.
    const size_t ITERATIONS = 10000;
    // Default number of clients.
    const size_t CLIENTS = 10000;
    // Default size in bytes of each client's send and receive buffer.
    const size_t SIZE = 32;
    // Default number of clients started in the initial batch by main().
    const size_t SERVER_BACKLOG = 1024;
    
    // How long a client sleeps after a fatal error before it aborts the program.
    const seconds ONERROR_SLEEP(10);
    
    // Flags and permission bits used with open() for the result and histogram files.
    const int OPENFLAGS = O_CREAT | O_WRONLY;
    const int OPENMODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
    
    // Globals
    // Effective configuration; main() overwrites the defaults from the command line.
    std::string host = HOST;
    std::string port = PORT;
    // Iterations per client; main() sets this to SIZE_MAX for time-limited runs (-t).
    size_t iterations = ITERATIONS;
    // Run time limit in seconds; 0 means the run is limited by `iterations` instead.
    size_t execution_seconds = 0;
    size_t nclients = CLIENTS;
    size_t size = SIZE;
    size_t server_backlog = SERVER_BACKLOG;
    // Selects Client::linkedEcho() instead of Client::echo() for each iteration.
    bool linked_futures;
    // True when --histogram was given; clients then record per-iteration timestamps.
    bool histogram = false;
    // Clients with an id below this value never yield; a negative value disables
    // yielding for all clients (see Client::shouldYield()).
    static ssize_t maxLowLoadClients = -1;
    // Length of one period used by Client::shouldYield(); the remaining clients
    // yield during every odd-numbered period.
    static std::chrono::milliseconds loadSwitchPeriod(1000);
    
    // Set by the terminator thread once `execution_seconds` have elapsed; polled
    // by every client's echo loop.
    std::atomic<bool> terminate = false;
    
    // Address list filled in by getaddrinfo() in main(); Client::connect() uses
    // its first entry.
    struct addrinfo* server;
    
    /**
     * Points in time recorded for a single echo iteration when a latency
     * histogram is collected.
     *
     * All members start out as the clock's epoch until a client overwrites them.
     */
    struct TimeStamps {
    	// taken right before the request is submitted
    	std::chrono::high_resolution_clock::time_point start{};
    	// taken right after the request was submitted
    	std::chrono::high_resolution_clock::time_point afterSend{};
    	// taken once waiting for the send has returned
    	std::chrono::high_resolution_clock::time_point afterSendDispatch{};
    	// taken once waiting for the echo has returned
    	std::chrono::high_resolution_clock::time_point afterRecv{};
    };
    
    /**
     * One echo client: connects to the server, then repeatedly sends a message
     * and checks that the very same bytes come back.
     *
     * Each client owns its socket and two buffers of `size` bytes. The results
     * (iteration count, average latency, reconnects, unexpected echos and,
     * optionally, per-iteration timestamps) are public so they can be read after
     * run() has returned.
     */
    class Client {
    	// first: number of received bytes, or a negative errno value on failure
    	// second: description of the failed step (empty on success)
    	using IterationResult = std::pair<int, std::string>;
    
    	size_t id;
    	Semaphore& readySem;
    	Semaphore& startSem;
    	CPS& cps;
    
    	int sock = -1;
    	// false until the first successful connect(); used to start the next
    	// client exactly once and to tell connects from reconnects
    	bool initialized = false;
    
    	char* inBuf;
    	char* outBuf;
    
     public:
    	static std::atomic<size_t> client_ids;
    
    	// results
    	size_t iteration = 0;
    	RunningAverage<double> avg_ns;
    	size_t unexpectedEchos = 0;
    	size_t reconnects = 0;
    
    	// one entry per iteration; only allocated when timestamps are collected
    	struct TimeStamps* timeStamps = nullptr;
    
    	/**
    	 * @param id index of this client; part of every message it sends
    	 * @param readySem released by run() once this client is connected
    	 * @param startSem acquired by run() before the echo loop starts
    	 * @param cps semaphore passed on to Client::startNew() for the next client
    	 * @param collectTimeStamps allocate `iterations` TimeStamps entries
    	 */
    	Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps)
    			: id(id), readySem(readySem), startSem(startSem), cps(cps) {
    		// Value-initialize both buffers: the whole out buffer is sent to the
    		// server, so it must not contain uninitialized memory.
    		outBuf = new char[size]();
    		inBuf = new char[size]();
    
    		if (collectTimeStamps) {
    			timeStamps = new TimeStamps[iterations];
    		}
    	}
    
    	// The client owns raw buffers; a copy would free them twice.
    	Client(const Client&) = delete;
    	auto operator=(const Client&) -> Client& = delete;
    
    	~Client() {
    		delete[] outBuf;
    		delete[] inBuf;
    		delete[] timeStamps;
    	}
    
    	static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;
    
     private:
    	/**
    	 * Appends `len` bytes of `buf` to `out` as two hex digits per byte.
    	 */
    	static void appendHexDump(std::ostream& out, const char* buf, size_t len) {
    		for (size_t i = 0; i < len; ++i) {
    			// go through unsigned char so bytes >= 0x80 are not sign-extended
    			out << std::setfill('0') << std::setw(2) << std::hex
    					<< static_cast<unsigned>(static_cast<unsigned char>(buf[i]));
    		}
    	}
    
    	/**
    	 * One echo iteration using two independent futures: submit the send, wait
    	 * for it, then submit the recv and wait for it.
    	 *
    	 * @return the number of received bytes, or a value <= 0 together with a
    	 *         description of the failed step
    	 */
    	template <bool collectTimeStamps = false>
    	auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
    		if constexpr (collectTimeStamps) {
    			timeStamps[iteration].start = high_resolution_clock::now();
    		}
    
    		sendFuture.submit();
    
    		if constexpr (collectTimeStamps) {
    			timeStamps[iteration].afterSend = high_resolution_clock::now();
    		}
    
    		int32_t bytes_send = sendFuture.wait();
    
    		if constexpr (collectTimeStamps) {
    			timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
    		}
    
    		if (unlikely(bytes_send < 0)) {
    			return {bytes_send, "send failed"};
    		}
    
    		int32_t bytes_recv = recvFuture.submitAndWait();
    
    		if constexpr (collectTimeStamps) {
    			timeStamps[iteration].afterRecv = high_resolution_clock::now();
    		}
    
    		if (unlikely(bytes_recv < 0)) {
    			return {bytes_recv, "recv failed"};
    		}
    
    		// Report a closed connection like linkedEcho() does instead of an
    		// empty message.
    		if (unlikely(bytes_recv == 0)) {
    			return {0, "server closed the connection"};
    		}
    
    		return {bytes_recv, ""};
    	}
    
    	/**
    	 * One echo iteration where the recv future depends on the send future and
    	 * both are submitted together.
    	 *
    	 * @return the number of received bytes, or a value <= 0 together with a
    	 *         description of the failed step
    	 */
    	template <bool collectTimeStamps = false>
    	auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
    		recvFuture.setDependency(sendFuture);
    
    		// Record the start like echo() does; otherwise the total latency in
    		// the histogram would be measured from the clock's epoch.
    		if constexpr (collectTimeStamps) {
    			timeStamps[iteration].start = high_resolution_clock::now();
    		}
    
    		recvFuture.submit();
    
    		if constexpr (collectTimeStamps) {
    			timeStamps[iteration].afterSend = high_resolution_clock::now();
    		}
    
    		int32_t res = recvFuture.wait();
    
    		if constexpr (collectTimeStamps) {
    			// send and recv complete as one unit, both share one timestamp
    			timeStamps[iteration].afterSendDispatch = timeStamps[iteration].afterRecv =
    					high_resolution_clock::now();
    		}
    
    		if (unlikely(res <= 0)) {
    			// recv failed
    			if (res != -ECANCELED) {
    				if (res == 0) {
    					return {0, "server closed the connection"};
    				}
    
    				return {res, "recv failed"};
    			}
    
    			// send failed
    			int32_t send_res = sendFuture.wait();
    			if (send_res < 0) {
    				return {send_res, "send failed"};
    			}
    
    			// send must have sent less than expected because recvFuture returned -ECANCELED
    			return {0, EMPER_BUILD_STR("short send: " << send_res)};
    		}
    
    		return {res, ""};
    	}
    
    	/**
    	 * Creates a new TCP_NODELAY socket and connects it to `server`.
    	 *
    	 * The first successful connect starts the next client; every later call
    	 * is counted as a reconnect. Any failure aborts the program.
    	 */
    	void connect() {
    		sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
    		if (sock < 0) {
    			DIE_MSG_ERRNO("creating new client socket failed");
    		}
    
    		int enable = 1;
    		if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
    			DIE_MSG_ERRNO("setsockopt failed");
    		}
    
    		int err = emper::io::connectAndWait(sock, server->ai_addr, server->ai_addrlen);
    		if (err) {
    			DIE_MSG_ERRNO("connecting new client socket failed");
    		}
    
    		if (!initialized) {
    			// We are connected -> start next client
    			Client::startNew(readySem, startSem, cps);
    			initialized = true;
    		} else {
    			reconnects++;
    		}
    	}
    
    	/**
    	 * Decides whether this client should skip sending right now.
    	 *
    	 * Never true when maxLowLoadClients is negative or this client's id is
    	 * below it. Otherwise true during every odd period of length
    	 * loadSwitchPeriod, counted from the steady_clock epoch.
    	 */
    	[[nodiscard]] auto shouldYield() const -> bool {
    		if (maxLowLoadClients < 0) {
    			return false;
    		}
    
    		if (id < static_cast<size_t>(maxLowLoadClients)) {
    			return false;
    		}
    
    		const auto sinceEpoch = std::chrono::duration_cast<std::chrono::milliseconds>(
    				std::chrono::steady_clock::now().time_since_epoch());
    		const uint64_t periodNum = sinceEpoch / loadSwitchPeriod;
    		return periodNum % 2 != 0;
    	}
    
     public:
    	/**
    	 * Connects, waits for the start signal and then echoes until `iterations`
    	 * is reached or `terminate` is set. Finally shuts down and closes the
    	 * socket.
    	 *
    	 * A connection reset triggers a reconnect and a retry of the iteration;
    	 * any other error aborts the program after ONERROR_SLEEP.
    	 *
    	 * @tparam collectTimeStamps record TimeStamps per iteration instead of
    	 *         updating the running latency average
    	 */
    	template <bool collectTimeStamps = false>
    	void run() {
    		connect();
    
    		// signal that this client is initialized and connected
    		readySem.release();
    
    		// wait for all clients to be ready
    		startSem.acquire();
    
    		while (iteration < iterations && !terminate.load(std::memory_order_relaxed)) {
    			if (shouldYield()) {
    				emper::yield();
    				continue;
    			}
    
    			SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
    			RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);
    
    			// prepare output buf; bounded by the buffer size so that small
    			// message sizes truncate the text instead of overflowing outBuf
    			snprintf(outBuf, size, "%zu:%zu", id, iteration);
    
    			high_resolution_clock::time_point start;
    			if constexpr (!collectTimeStamps) {
    				start = high_resolution_clock::now();
    			}
    
    			IterationResult iterationResult;
    			if (linked_futures) {
    				iterationResult = linkedEcho<collectTimeStamps>(sendFuture, recvFuture);
    			} else {
    				iterationResult = echo<collectTimeStamps>(sendFuture, recvFuture);
    			}
    
    			high_resolution_clock::time_point end;
    			if constexpr (!collectTimeStamps) {
    				end = high_resolution_clock::now();
    			}
    
    			// less than 1 byte was received -> an error occurred
    			// (connection closed by server is considered an error)
    			if (unlikely(iterationResult.first <= 0)) {
    				int err = -iterationResult.first;
    				auto& msg = iterationResult.second;
    				if (err == ECONNRESET) {
    					LOGW("Client " << id << " reconnecting");
    					close(sock);
    					// reconnect and try again
    					connect();
    					continue;
    				}
    
    				LOGE("Client failed at iteration " << iteration << " with " << msg << ": "
    																					 << strerror(err));
    				std::this_thread::sleep_for(ONERROR_SLEEP);
    				DIE;
    			}
    
    			if constexpr (!collectTimeStamps) {
    				nanoseconds duration = duration_cast<nanoseconds>(end - start);
    				avg_ns.update(duration.count());
    			}
    
    			if (memcmp(outBuf, inBuf, size) != 0) {
    				// dump both buffers in the same format so they can be compared
    				std::stringstream sst;
    				sst << "got unexpected echo from server" << '\n';
    				sst << "expected: ";
    				appendHexDump(sst, outBuf, size);
    				sst << '\n';
    
    				sst << "received: ";
    				appendHexDump(sst, inBuf, size);
    				sst << '\n';
    				LOGE(sst.str());
    				unexpectedEchos++;
    			}
    
    			iteration++;
    		}
    
    		shutdown(sock, SHUT_RDWR);
    
    		// ShutdownFuture shut_f(sock, SHUT_RDWR);
    		CloseFuture cf(sock);
    		// cf.setDependency(shut_f);
    		cf.submit();
    
    		cf.wait();
    	}
    };
    
    // Array of pointers to all clients, indexed by client id. The array is
    // allocated and freed by the alpha fiber in main(); each entry is filled in
    // by the fiber spawned in Client::startNew().
    Client** clients;
    
    // Id handed to the next client; atomically incremented by Client::startNew().
    std::atomic<size_t> Client::client_ids = 0;
    
    /**
     * Spawns the next client in its own fiber, unless `nclients` clients have
     * already been claimed.
     *
     * The spawned fiber creates the Client, stores it in `clients` under its id
     * and executes its run() method, collecting timestamps if `histogram` is set.
     *
     * @return true if a client fiber was spawned, false if no client id was left
     */
    auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool {
    	const size_t claimedId = client_ids.fetch_add(1);
    	const bool idsExhausted = claimedId >= nclients;
    	if (idsExhausted) {
    		return false;
    	}
    
    	spawn(
    			[&readySem, &startSem, &cps, claimedId] {
    				auto* client = new Client(claimedId, readySem, startSem, cps, histogram);
    				clients[claimedId] = client;
    
    				if (!histogram) {
    					client->run();
    					return;
    				}
    
    				client->run<true>();
    			},
    			cps);
    	return true;
    }
    
    /**
     * Looks up the value following the first occurrence of `option` in the
     * argument vector.
     *
     * @return the argument right after `option`, or nullptr if `option` is not
     *         present or is the last argument
     */
    auto getOption(int argc, char** argv, const std::string& option) -> char* {
    	char** const argsEnd = argv + argc;
    	char** const match = std::find(argv, argsEnd, option);
    	if (match == argsEnd) {
    		return nullptr;
    	}
    
    	char** const value = match + 1;
    	return value == argsEnd ? nullptr : *value;
    }
    
    /**
     * Checks whether any of the first `argc` arguments equals `option` exactly.
     */
    static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
    	for (int i = 0; i < argc; ++i) {
    		if (argv[i] == option) {
    			return true;
    		}
    	}
    	return false;
    }
    
    /**
     * Prints the command line synopsis to stderr.
     *
     * @param name program name shown at the start of the synopsis
     */
    static void printUsage(char* name) {
    	// The leading space separates the program name from the first option.
    	std::cerr
    			<< "Usage: " << name
    			<< " [-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
    			<< std::endl
    			<< "[-f <output-file>]  [-i <iterations> | -t <execution time in sec>] [--linked-futures]"
    			<< std::endl
    			<< "[--histogram <file>] [--max-low-load-clients <num>] [--load-switch-period-ms <num>]"
    			<< std::endl;
    }
    
    auto main(int argc, char* argv[]) -> int {
    	if (existsOption(argc, argv, "-h")) {
    		printUsage(argv[0]);
    		exit(EXIT_FAILURE);
    	}
    
    	char* port_s = getOption(argc, argv, "-p");
    	if (port_s) {
    		port = std::string(port_s);
    	}
    
    	char* addr_s = getOption(argc, argv, "-a");
    	if (addr_s) {
    		host = std::string(addr_s);
    	}
    
    	char* client_s = getOption(argc, argv, "-c");
    	if (client_s) {
    		nclients = strtol(client_s, nullptr, DECIMAL);
    	}
    
    	char* iterations_s = getOption(argc, argv, "-i");
    	if (iterations_s) {
    		iterations = strtol(iterations_s, nullptr, DECIMAL);
    	}
    
    	char* time_s = getOption(argc, argv, "-t");
    	if (time_s) {
    		if (iterations_s) {
    			std::cerr << "-t and -i are mutual exclusive" << std::endl;
    			printUsage(argv[0]);
    			exit(EXIT_FAILURE);
    		}
    		execution_seconds = strtol(time_s, nullptr, DECIMAL);
    	}
    
    	char* size_s = getOption(argc, argv, "-s");
    	if (size_s) {
    		size = strtol(size_s, nullptr, DECIMAL);
    	}
    
    	char* server_backlog_s = getOption(argc, argv, "-b");
    	if (server_backlog_s) {
    		server_backlog = strtol(server_backlog_s, nullptr, DECIMAL);
    	}
    
    	linked_futures = getOption(argc, argv, "--linked-futures");
    
    	int out_fd = STDOUT_FILENO;
    	char* output_file = getOption(argc, argv, "-f");
    
    	if (output_file) {
    		if (access(output_file, W_OK) == 0) {
    			DIE_MSG("Output file: " << output_file << " already exists");
    		}
    
    		out_fd = open(output_file, OPENFLAGS, OPENMODE);
    		if (out_fd < 0) {
    			DIE_MSG_ERRNO("opening output file failed");
    		}
    	}
    
    	int histogram_fd;
    	char* histogram_file = getOption(argc, argv, "--histogram");
    	if (histogram_file) {
    		if (!iterations_s) {
    			DIE_MSG("histograms are currently only possible with fixed iterations");
    		}
    
    		histogram = true;
    
    		if (access(histogram_file, W_OK) == 0) {
    			DIE_MSG("Histogram file: " << histogram_file << " already exists");
    		}
    
    		histogram_fd = open(histogram_file, OPENFLAGS, OPENMODE);
    		if (histogram_fd < 0) {
    			DIE_MSG_ERRNO("opening histogram file failed");
    		}
    	}
    
    	char* max_low_load_clients = getOption(argc, argv, "--max-low-load-clients");
    	if (max_low_load_clients) {
    		maxLowLoadClients = std::stoi(max_low_load_clients);
    		if (static_cast<size_t>(maxLowLoadClients) >= nclients) {
    			DIE_MSG("--max-low-load-clients " << maxLowLoadClients
    																				<< " must be smaller than the total number of clients "
    																				<< nclients);
    		}
    	}
    
    	char* load_switch_period_ms_string = getOption(argc, argv, "--load-switch-period-ms");
    	if (load_switch_period_ms_string) {
    		if (!max_low_load_clients) {
    			DIE_MSG("Can't use --load-switch-period-ms without --max-low-load-clients");
    		}
    		int load_switch_period_ms = std::stoi(load_switch_period_ms_string);
    		loadSwitchPeriod = std::chrono::milliseconds(load_switch_period_ms);
    	}
    
    	int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
    	if (err) {
    		if (err == EAI_SYSTEM) {
    			DIE_MSG_ERRNO("getaddrinfo failed");
    		} else {
    			LOGE("error in getaddrinfo: " << gai_strerror(err));
    			exit(EXIT_FAILURE);
    		}
    	}
    
    	Runtime runtime;
    
    	Fiber* alphaFiber = Fiber::from([&] {
    		clients = new Client*[nclients];
    		CPS cps;
    		Semaphore readySemaphore;
    		Semaphore startSemaphore;
    		std::thread terminator;
    
    		auto connect_start = high_resolution_clock::now();
    
    		// start first client batch
    		for (size_t i = 0; i < server_backlog; ++i) {
    			// Start new clients until we have started all or the amount
    			// of parallel connecting clients is reached
    			if (!Client::startNew(readySemaphore, startSemaphore, cps)) {
    				break;
    			}
    		}
    
    		// await the clients
    		for (size_t i = 0; i < nclients; ++i) {
    			readySemaphore.acquire();
    		}
    
    		if (execution_seconds) {
    			iterations = SIZE_MAX;
    			terminator = std::thread([] {
    				std::this_thread::sleep_for(seconds(execution_seconds));
    				terminate.store(true);
    			});
    		}
    
    		auto echo_start = high_resolution_clock::now();
    
    		// start the clients
    		for (size_t i = 0; i < nclients; ++i) {
    			startSemaphore.release();
    		}
    
    		// await client termination
    		cps.wait();
    
    		if (execution_seconds) {
    			terminator.join();
    		}
    
    		auto echo_end = high_resolution_clock::now();
    
    		auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();
    		auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
    		auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();
    
    		uint64_t total_iterations = 0;
    		uint64_t total_reconnects = 0;
    		uint64_t total_unexpected_echos = 0;
    		RunningAverage avg_ns;
    		for (size_t i = 0; i < nclients; ++i) {
    			auto* client = clients[i];
    			total_iterations += client->iteration;
    			total_reconnects += client->reconnects;
    			total_unexpected_echos += client->unexpectedEchos;
    			if (!histogram) {
    				avg_ns.update(client->avg_ns.getAverage());
    			}
    		}
    
    		std::stringstream sst;
    		sst << "[global]" << std::endl;
    		sst << "clients = " << nclients << std::endl;
    		sst << "size = " << size << std::endl;
    		sst << "total_iterations = " << total_iterations << std::endl;
    		sst << "total_reconnects = " << total_reconnects << std::endl;
    		sst << "total_unexpected_echos = " << total_unexpected_echos << std::endl;
    		sst << "avg_ns = " << avg_ns.getAverage() << std::endl;
    		sst << "connect_duration = " << connect_duration << std::endl;
    		sst << "echo_duration = " << echo_duration << std::endl;
    		sst << "total_duration = " << total_duration << std::endl;
    		sst << std::endl;
    
    		sst << "[clients]" << std::endl;
    		sst << "csv =" << std::endl;
    		sst << " iterations,avg_ns,reconnects,unexpected_echos" << std::endl;
    		for (size_t i = 0; i < nclients; ++i) {
    			auto* client = clients[i];
    			sst << " " << client->iteration << ",";
    			sst << client->avg_ns.getAverage() << ",";
    			sst << client->reconnects << ",";
    			sst << client->unexpectedEchos << std::endl;
    		}
    
    		auto output = sst.str();
    		if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
    			DIE_MSG_ERRNO("writing results failed");
    		}
    
    		if (output_file) {
    			emper::io::closeAndForget(out_fd);
    		}
    
    		if (histogram) {
    			std::string histogramDataHeader =
    					"total_latency,after_send_latency,after_send_dispatch_latency\n";
    			if (emper::io::writeFileAndWait(histogram_fd, histogramDataHeader.c_str(),
    																			histogramDataHeader.size()) < 0) {
    				DIE_MSG_ERRNO("writing histogram data header failed");
    			}
    
    			for (size_t i = 0; i < nclients; ++i) {
    				auto* client = clients[i];
    				std::stringstream sst;
    				for (size_t iteration = 0; iteration < iterations; ++iteration) {
    					auto timeStamps = client->timeStamps[iteration];
    					nanoseconds totalLatency =
    							duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.start);
    					nanoseconds afterSendLatency =
    							duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSend);
    					nanoseconds afterSendDispatchLatency =
    							duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSendDispatch);
    					sst << totalLatency.count() << ",";
    					sst << afterSendLatency.count() << ",";
    					sst << afterSendDispatchLatency.count() << std::endl;
    				}
    
    				auto histoOutput = sst.str();
    				if (emper::io::writeFileAndWait(histogram_fd, histoOutput.c_str(), histoOutput.size()) <
    						0) {
    					DIE_MSG_ERRNO("writing histogram data failed");
    				}
    			}
    
    			emper::io::closeAndForget(histogram_fd);
    		}
    
    		for (size_t i = 0; i < nclients; ++i) {
    			delete clients[i];
    		}
    		delete[] clients;
    
    		exit(EXIT_SUCCESS);
    	});
    
    	runtime.scheduleFromAnywhere(*alphaFiber);
    
    	runtime.waitUntilFinished();
    
    	return EXIT_FAILURE;
    }