Skip to content
Snippets Groups Projects
Select Git revision
  • cd3c5d3524dd015ef4abc47fa8634649cd8506c1
  • master default protected
  • android-7.1.2_r28_klist
  • pie-cts-release
  • pie-vts-release
  • pie-cts-dev
  • oreo-mr1-iot-release
  • sdk-release
  • oreo-m6-s4-release
  • oreo-m4-s12-release
  • pie-release
  • pie-r2-release
  • pie-r2-s1-release
  • oreo-vts-release
  • oreo-cts-release
  • oreo-dev
  • oreo-mr1-dev
  • pie-gsi
  • pie-platform-release
  • pie-dev
  • oreo-cts-dev
  • android-o-mr1-iot-release-1.0.4
  • android-9.0.0_r8
  • android-9.0.0_r7
  • android-9.0.0_r6
  • android-9.0.0_r5
  • android-8.1.0_r46
  • android-8.1.0_r45
  • android-n-iot-release-smart-display-r2
  • android-vts-8.1_r5
  • android-cts-8.1_r8
  • android-cts-8.0_r12
  • android-cts-7.1_r20
  • android-cts-7.0_r24
  • android-o-mr1-iot-release-1.0.3
  • android-cts-9.0_r1
  • android-8.1.0_r43
  • android-8.1.0_r42
  • android-n-iot-release-smart-display
  • android-p-preview-5
  • android-9.0.0_r3
41 results

file_contexts

Blame
  • EchoClient.cpp 9.01 KiB
    // SPDX-License-Identifier: LGPL-3.0-or-later
    // Copyright © 2021 Florian Fischer
    #include <fcntl.h>
    #include <netdb.h>	//for getaddrinfo
    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <sys/socket.h>	 // for shutdown, socket, AF_INET
    #include <sys/stat.h>
    #include <unistd.h>
    
    #include <algorithm>	// for find
    #include <array>
    #include <atomic>
    #include <cerrno>		 // for errno, ECANCELED
    #include <chrono>		 // for nanoseconds, duration, durat...
    #include <cstdint>	 // for uint64_t, int32_t
    #include <cstdlib>	 // for size_t, strtol, exit, EXIT_F...
    #include <cstring>	 // for memcmp
    #include <iostream>	 // for operator<<, basic_ostream, endl
    #include <sstream>	 // for stringstream
    #include <string>		 // for allocator, string, char_traits
    
    #include "Common.hpp"										 // for DIE_MSG_ERRNO, DIE_MSG, unli...
    #include "CountingPrivateSemaphore.hpp"	 // for CPS
    #include "Debug.hpp"										 // for LOGE
    #include "Fiber.hpp"										 // for Fiber
    #include "Runtime.hpp"									 // for Runtime
    #include "Semaphore.hpp"								 // for Semaphore
    #include "emper.hpp"										 // for spawn
    #include "io.hpp"												 // for connectAndWait
    #include "io/Future.hpp"								 // for CloseFuture, RecvFuture, Sen...
    
    using emper::Semaphore;
    using emper::io::CloseFuture;
    using emper::io::RecvFuture;
    using emper::io::SendFuture;
    
    using std::chrono::duration_cast;
    using std::chrono::nanoseconds;
    
    // Default values; each one seeds the global of the same (lower-case) name
    // and stays in effect unless the matching command line option overrides it.
    const int DECIMAL{10};  // radix used when parsing numeric options
    const std::string HOST{"0.0.0.0"};
    const std::string PORT{"12345"};
    const size_t ITERATIONS{10000};
    const size_t CLIENTS{10000};
    const size_t SIZE{32};
    const size_t SERVER_BACKLOG{1024};
    
    // Globals
    std::string host = HOST;
    std::string port = PORT;
    size_t iterations = ITERATIONS;
    size_t execution_seconds = 0;
    size_t clients = CLIENTS;
    size_t size = SIZE;
    size_t server_backlog = SERVER_BACKLOG;
    
    std::atomic<bool> terminate = false;
    
    std::atomic<size_t> client_ids = 0;
    
    // Measurement of a single client; written by clientFunc when the client is
    // done and aggregated over all clients in main.
    // The members are zero-initialised so that a slot which was never written
    // does not contribute indeterminate values.
    struct ClientResult {
    	// number of echo round trips the client completed
    	size_t iterations = 0;
    	// average duration of one send/receive round trip in nanoseconds
    	size_t avg_ns = 0;
    };
    
    struct ClientResult* clientResults;
    
    struct addrinfo* server;
    
    // Body of a single echo client.
    //
    // Connects to the global `server` address, spawns the next client (if any is
    // left), announces readiness through `readySem`, waits on `startSem` and then
    // repeatedly sends `size` bytes and expects the very same bytes back until
    // `iterations` round trips are done or `terminate` is set.
    // The number of completed round trips and their mean duration are stored in
    // clientResults[client_id].
    //
    // client_id: index of this client's slot in clientResults
    // readySem:  released once after the connection has been set up
    // startSem:  acquired before the first round trip is started
    // cps:       semaphore the follow-up client fiber is spawned with
    //
    // Any socket, connect, send or receive error terminates the process through
    // the DIE_MSG* macros.
    static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& startSem, CPS& cps) {
    	int client_sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
    	if (client_sock < 0) {
    		DIE_MSG_ERRNO("creating new client socket failed");
    	}

    	int err = emper::io::connectAndWait(client_sock, server->ai_addr, server->ai_addrlen);
    	if (err) {
    		DIE_MSG_ERRNO("connecting new client socket failed");
    	}

    	// We are connected -> start next client
    	size_t next_client_id = client_ids.fetch_add(1);
    	if (next_client_id < clients) {
    		spawn([&, next_client_id] { clientFunc(next_client_id, readySem, startSem, cps); }, cps);
    	}

    	int enable = 1;
    	if (setsockopt(client_sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
    		DIE_MSG_ERRNO("setsockopt failed");
    	}

    	// Two buffers which swap their send/receive role every iteration.
    	// Value-initialise them so no indeterminate heap content is put on the wire.
    	std::array<char*, 2> buf;
    	buf[0] = new char[size]();
    	buf[1] = new char[size]();

    	// Sum of all round trip durations; divided by the round trip count at the end
    	uint64_t total_ns = 0;

    	readySem.release();
    	startSem.acquire();

    	size_t i = 0;
    	while (i < iterations && !terminate.load(std::memory_order_relaxed)) {
    		char* outBuf = buf[i % 2];
    		char* inBuf = buf[(i + 1) % 2];

    		SendFuture sendFuture(client_sock, outBuf, size, MSG_NOSIGNAL);
    		RecvFuture recvFuture(client_sock, inBuf, size, MSG_WAITALL);
    		// Chain both operations; submitting the receive presumably submits the
    		// send it depends on as well -- see emper's Future documentation
    		recvFuture.setDependency(sendFuture);

    		auto start = std::chrono::high_resolution_clock::now();
    		int32_t res = recvFuture.submitAndWait();
    		auto end = std::chrono::high_resolution_clock::now();

    		if (res <= 0) {
    			// recv failed
    			if (res != -ECANCELED) {
    				if (res == 0) {
    					DIE_MSG("server closed the connection");
    				}

    				errno = -res;
    				DIE_MSG_ERRNO("receive failed");
    			}

    			// The receive was cancelled -> the send it depends on failed
    			int32_t send_res = sendFuture.wait();
    			if (send_res < 0) {
    				errno = -send_res;
    				DIE_MSG_ERRNO("send failed");
    			}

    			DIE_MSG("short send: " << send_res);
    		}

    		if (memcmp(outBuf, inBuf, size) != 0) {
    			DIE_MSG("got unexpected echo from server");
    		}

    		total_ns += static_cast<uint64_t>(duration_cast<nanoseconds>(end - start).count());

    		i++;
    	}

    	// Shutdown and close the client socket;
    	shutdown(client_sock, SHUT_RDWR);

    	// ShutdownFuture shut_f(client_sock, SHUT_RDWR);
    	CloseFuture cf(client_sock);
    	// cf.setDependency(shut_f);
    	cf.submit();

    	// Release the buffers between submitting the close and waiting for it
    	delete[] buf[0];
    	delete[] buf[1];

    	cf.wait();

    	// Arithmetic mean over all round trips; 0 if not a single one was completed
    	clientResults[client_id].avg_ns = (i > 0) ? total_ns / i : 0;
    	clientResults[client_id].iterations = i;
    }
    
    // Search argv for the first element equal to `option` and return the element
    // that follows it. Yields nullptr when the option does not occur or when its
    // first occurrence is the last element of argv.
    auto getOption(int argc, char** argv, const std::string& option) -> char* {
    	for (int idx = 0; idx < argc; ++idx) {
    		if (option != argv[idx]) {
    			continue;
    		}

    		// Only the first match counts; it needs a successor to carry a value
    		const int value_idx = idx + 1;
    		return value_idx < argc ? argv[value_idx] : nullptr;
    	}

    	return nullptr;
    }
    
    // Tell whether any element of argv equals `option`.
    static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
    	return std::any_of(argv, argv + argc,
    										 [&option](const char* candidate) { return option == candidate; });
    }
    
    // Write a one-line synopsis of all supported command line options to stderr.
    // name: program name to print in front of the options (usually argv[0])
    static void printUsage(char* name) {
    	// The option list starts with a blank so it does not stick to the program name
    	std::cerr << "Usage: " << name
    						<< " [-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
    							 " [-f <output-file>] [-i <iterations> | -t <execution time in sec>]"
    						<< std::endl;
    }
    
    // Parse the argument of a numeric command line option as a non-negative
    // decimal number. Malformed, negative or out-of-range input prints an error
    // plus the usage and terminates the process instead of silently wrapping
    // around in one of the size_t configuration globals.
    static auto parseSizeOption(const char* text, const char* option, char* program) -> size_t {
    	errno = 0;
    	char* end = nullptr;
    	const long long value = strtoll(text, &end, DECIMAL);
    	if (errno != 0 || end == text || *end != '\0' || value < 0) {
    		std::cerr << "invalid argument for " << option << ": " << text << std::endl;
    		printUsage(program);
    		exit(EXIT_FAILURE);
    	}
    	return static_cast<size_t>(value);
    }

    // Entry point of the echo client benchmark.
    //
    // Reads the configuration from the command line, resolves the server address
    // and runs the benchmark inside a fiber: all clients connect, are started
    // together and their results are written as one CSV line to stdout or
    // appended to the file given with -f. The fiber terminates the process with
    // EXIT_SUCCESS after the results were written.
    auto main(int argc, char* argv[]) -> int {
    	if (existsOption(argc, argv, "-h")) {
    		printUsage(argv[0]);
    		exit(EXIT_FAILURE);
    	}

    	char* port_s = getOption(argc, argv, "-p");
    	if (port_s) {
    		port = std::string(port_s);
    	}

    	char* addr_s = getOption(argc, argv, "-a");
    	if (addr_s) {
    		host = std::string(addr_s);
    	}

    	char* client_s = getOption(argc, argv, "-c");
    	if (client_s) {
    		clients = parseSizeOption(client_s, "-c", argv[0]);
    	}
    	// Without any client there is nothing to measure and no result to aggregate
    	if (clients == 0) {
    		std::cerr << "at least one client is required" << std::endl;
    		printUsage(argv[0]);
    		exit(EXIT_FAILURE);
    	}

    	char* iterations_s = getOption(argc, argv, "-i");
    	if (iterations_s) {
    		iterations = parseSizeOption(iterations_s, "-i", argv[0]);
    	}

    	char* time_s = getOption(argc, argv, "-t");
    	if (time_s) {
    		if (iterations_s) {
    			std::cerr << "-t and -i are mutual exclusive" << std::endl;
    			printUsage(argv[0]);
    			exit(EXIT_FAILURE);
    		}
    		execution_seconds = parseSizeOption(time_s, "-t", argv[0]);
    	}

    	char* size_s = getOption(argc, argv, "-s");
    	if (size_s) {
    		size = parseSizeOption(size_s, "-s", argv[0]);
    	}

    	char* server_backlog_s = getOption(argc, argv, "-b");
    	if (server_backlog_s) {
    		server_backlog = parseSizeOption(server_backlog_s, "-b", argv[0]);
    	}

    	int out_fd = STDOUT_FILENO;
    	char* output_file = getOption(argc, argv, "-f");

    	int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
    	if (err) {
    		if (err == EAI_SYSTEM) {
    			DIE_MSG_ERRNO("getaddrinfo failed");
    		} else {
    			LOGE("error in getaddrinfo: " << gai_strerror(err));
    			exit(EXIT_FAILURE);
    		}
    	}

    	Runtime runtime;

    	Fiber* alphaFiber = Fiber::from([&] {
    		clientResults = new ClientResult[clients];
    		CPS cps;
    		Semaphore readySemaphore;
    		Semaphore startSemaphore;

    		auto connect_start = std::chrono::high_resolution_clock::now();

    		// start first client batch; every connected client spawns one successor
    		for (size_t i = 0; i < server_backlog; ++i) {
    			size_t next_client_id = client_ids.fetch_add(1);
    			if (next_client_id >= clients) {
    				break;
    			}
    			spawn(
    					[&, next_client_id] { clientFunc(next_client_id, readySemaphore, startSemaphore, cps); },
    					cps);
    		}

    		// await the clients
    		for (size_t i = 0; i < clients; ++i) {
    			readySemaphore.acquire();
    		}

    		if (execution_seconds) {
    			// Time based run: lift the iteration limit and stop the clients via `terminate`
    			iterations = SIZE_MAX;
    			async([] {
    				sleep(execution_seconds);
    				terminate.store(true, std::memory_order_relaxed);
    			});
    		}

    		auto echo_start = std::chrono::high_resolution_clock::now();

    		// start the clients
    		for (size_t i = 0; i < clients; ++i) {
    			startSemaphore.release();
    		}

    		// await the clients
    		cps.wait();

    		auto echo_end = std::chrono::high_resolution_clock::now();

    		// Arithmetic mean over all clients (clients > 0 is checked during option parsing)
    		uint64_t sum_ns = 0;
    		uint64_t sum_iterations = 0;
    		for (size_t i = 0; i < clients; ++i) {
    			sum_ns += clientResults[i].avg_ns;
    			sum_iterations += clientResults[i].iterations;
    		}
    		const uint64_t avg_ns = sum_ns / clients;
    		const uint64_t avg_iterations = sum_iterations / clients;

    		auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();
    		auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
    		auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();

    		std::stringstream sst;
    		// Only a freshly created output file gets the CSV header
    		bool exists = true;
    		if (output_file) {
    			exists = access(output_file, W_OK) == 0;
    			int openflags;
    			int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
    			if (!exists) {
    				openflags = O_CREAT | O_WRONLY;
    			} else {
    				// O_APPEND on its own implies O_RDONLY, which makes every write fail
    				openflags = O_WRONLY | O_APPEND;
    			}

    			out_fd = emper::io::openAndWait(output_file, openflags, mode);
    			if (out_fd < 0) {
    				DIE_MSG_ERRNO("opening output file failed");
    			}
    		}

    		if (!exists) {
    			sst << "clients,iterations,size,avg_ns,connect,echo,total" << '\n';
    		}
    		sst << clients << "," << avg_iterations << "," << size << "," << avg_ns << ","
    				<< connect_duration << "," << echo_duration << "," << total_duration << '\n';

    		auto output = sst.str();
    		if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
    			DIE_MSG_ERRNO("writing results failed");
    		}

    		if (output_file) {
    			emper::io::closeAndForget(out_fd);
    		}

    		delete[] clientResults;

    		exit(EXIT_SUCCESS);
    	});

    	runtime.scheduleFromAnywhere(*alphaFiber);

    	runtime.waitUntilFinished();

    	// A successful run leaves through exit(EXIT_SUCCESS) inside the alpha fiber
    	return EXIT_FAILURE;
    }