# Copyright 2021 Florian Fischer
"""Echo client related code.

Builds echo client command lines and runs the clients either locally or on
remote hosts (synchronized by a Coordinator), mapping their outcome to a
BenchResult.
"""

import logging
from pathlib import Path
import pprint as pp
import subprocess
import time
from typing import Mapping, Optional, Sequence, Tuple, Union

from .benchresult import BenchResult
from .coordinator import Coordinator
from .globalvars import EMPER_CLIENT_DIR, CLIENTS_DIR, HOSTNAME, KILL_CMD, TERMINATION_TIME
from .util import cmd_run, prepare_env, purge_empty_files

log = logging.getLogger(__name__)

# Paths of the available client executables, keyed by implementation name
CLIENT_IMPLEMENTATIONS = {
    'emper': f'{EMPER_CLIENT_DIR}/build/apps/echoclient',
    'io_uring': f'{CLIENTS_DIR}/io_uring/build/client',
}

# Output file name templates; '{arg_prefix}' is derived from the run number
# and the client arguments (see run_clients)
CLIENT_CSV = '{arg_prefix}.ini'
CLIENT_OUT = '{arg_prefix}.out'
CLIENT_ERR = '{arg_prefix}.err'

# A time-terminated client may take this many times its configured run time
# before it is considered timed out
CLIENT_TIMEOUT_FACTOR = 2
CLIENT_DEFAULT_TERMINATION = ('time', 60)

CLIENT_CMD = '{exe} -c {cons} -s {size} {termination_flag} -a {host} -f {outfile} --no-quit'


def _termination_params(termination) -> Tuple[str, float]:
    """Return the client termination flag and the matching wait timeout.

    :param termination: ``(kind, value)`` tuple where kind is ``'time'`` or
        ``'iterations'``.
    :returns: The command line flag and the timeout in seconds used for
        ``Popen.wait``.
    :raises ValueError: For an unknown termination kind.
    """
    kind, value = termination[0], termination[1]
    if kind == 'time':
        return f'-t {value}', value * CLIENT_TIMEOUT_FACTOR
    if kind == 'iterations':
        # TODO: one second per iteration seems too long
        return f'-i {value}', value

    log.error('Unknown termination option: %s', kind)
    raise ValueError(f'Unknown termination option: {kind}')


def run_clients(run: int,
                args: Mapping,
                result_dir: Path,
                server_host: str,
                substitutions: Optional[Mapping] = None,
                termination=None,
                remote_cmd: Optional[str] = None,
                hosts: Optional[Sequence[str]] = None,
                env=None,
                implementation='emper') -> BenchResult:
    """Run one local client, or one client on each remote host, for args.

    :param run: Run number, first component of the output file names.
    :param args: Client arguments; every key becomes part of the output file
        names. Together with substitutions they must fill the remaining
        placeholders of CLIENT_CMD (e.g. 'cons' and 'size').
    :param result_dir: Directory the output files are written to.
    :param server_host: Address of the echo server (passed via '-a').
    :param substitutions: Additional values for placeholders in CLIENT_CMD.
    :param termination: ``(kind, value)`` tuple, defaults to
        CLIENT_DEFAULT_TERMINATION.
    :param remote_cmd: Command prefix containing a '{host}' placeholder used
        to start a client remotely; required when hosts is given.
    :param hosts: Remote hosts to run clients on; a local client is run if
        this is empty.
    :param env: Environment (passed through prepare_env) for a local client;
        not used for remote clients.
    :param implementation: Key into CLIENT_IMPLEMENTATIONS.
    :returns: The BenchResult of the client run.
    :raises ValueError: On an unknown termination kind or a missing
        remote_cmd for remote hosts.
    """
    substitutions = substitutions or {}

    # e.g. '3.{cons}.{size}' for args with the keys 'cons' and 'size'
    placeholders = ''.join(f'.{{{key}}}' for key in sorted(args))
    arg_prefix_template = f'{run}{placeholders}'
    arg_prefix = arg_prefix_template.format(run=run, **args)
    log.debug('output file prefix: %s generated from: %s', arg_prefix, arg_prefix_template)

    out = result_dir / CLIENT_OUT.format(arg_prefix=arg_prefix)
    err = result_dir / CLIENT_ERR.format(arg_prefix=arg_prefix)
    csv = result_dir / CLIENT_CSV.format(arg_prefix=arg_prefix)

    termination_flag, timeout = _termination_params(termination or CLIENT_DEFAULT_TERMINATION)

    client_impl = CLIENT_IMPLEMENTATIONS[implementation]

    if not hosts:
        cmd = CLIENT_CMD.format(exe=client_impl,
                                host=server_host,
                                outfile=csv,
                                termination_flag=termination_flag,
                                **args,
                                **substitutions)
        return run_local_client(cmd, out, err, timeout, env=env)

    if not remote_cmd:
        raise ValueError('remote_cmd is required to run clients on remote hosts')

    # Keep '{outfile}' as placeholder: run_remote_clients substitutes a
    # host specific output file
    cmd = CLIENT_CMD.format(exe=client_impl,
                            host=server_host,
                            outfile='{outfile}',
                            termination_flag=termination_flag,
                            **args,
                            **substitutions)
    return run_remote_clients(cmd, out, err, csv, remote_cmd, hosts, timeout)


def run_local_client(cmd: str,
                     out: Union[str, Path],
                     err: Union[str, Path],
                     timeout,
                     env=None) -> BenchResult:
    """Run a local echo client and wait for it to finish.

    :param cmd: The complete client command line.
    :param out: File receiving the client's stdout.
    :param err: File receiving the client's stderr.
    :param timeout: Seconds to wait before the client is killed.
    :param env: Environment passed through prepare_env.
    :returns: BenchResult.OK, FAIL (non-zero exit) or TIMEOUT.
    """
    ret = BenchResult.OK
    client_env = prepare_env(env or {})

    client = Client(cmd, out, err, env=client_env)
    try:
        if client.wait(timeout) != 0:
            ret = BenchResult.FAIL
            log.error('Client terminated unsuccessful: %s', client.poll())
    except subprocess.TimeoutExpired:
        ret = BenchResult.TIMEOUT
        log.error('Client timed out')
        # Do not leave the client running (and writing) after we gave up on it
        client.kill()
    finally:
        client.close_out_files()

    purge_empty_files([out, err])
    return ret


def run_remote_clients(cmd: str, out: Path, err: Path, csv: Path, remote_cmd: str,
                       hosts: Sequence[str], timeout) -> BenchResult:
    """Run one client on each remote host and wait for all of them.

    A Coordinator for len(hosts) clients is started and every client is
    pointed to it via '--coordinator'. All output files are prefixed with the
    host (or 'coordinator'), e.g. '<host>-<out.name>'.

    :param cmd: Client command line containing an '{outfile}' placeholder.
    :param out: Template path for the clients' stdout files.
    :param err: Template path for the clients' stderr files.
    :param csv: Template path for the clients' result files.
    :param remote_cmd: Command prefix containing a '{host}' placeholder.
    :param hosts: Hosts to run the clients on.
    :param timeout: Seconds all clients together may take once they are
        started before the remaining ones are killed.
    :returns: BenchResult.OK if every client succeeded, otherwise the result
        (FAIL or TIMEOUT) of the last failing client in hosts order.
    """
    ret = BenchResult.OK
    clients = []
    files = []

    # Start coordinator
    c_out = out.parent / f'coordinator-{out.name}'
    c_err = err.parent / f'coordinator-{err.name}'
    files.extend([c_out, c_err])
    coordinator = Coordinator(len(hosts), out_file=c_out, err_file=c_err)

    cmd += f' --coordinator {HOSTNAME}'

    try:
        for host in hosts:
            # prepend out files with the host the client runs on
            h_out = out.parent / f'{host}-{out.name}'
            h_err = err.parent / f'{host}-{err.name}'
            h_csv = csv.parent / f'{host}-{csv.name}'
            files.extend([h_out, h_err, h_csv])

            r_cmd = remote_cmd.format(host=host)
            h_cmd = cmd.format(outfile=h_csv)
            clients.append(Client(h_cmd, h_out, h_err, remote_cmd=r_cmd, host=host))

        # The clients run concurrently: share one deadline instead of granting
        # each client a fresh timeout, which would allow len(hosts) * timeout
        deadline = time.monotonic() + timeout
        for client in clients:
            try:
                if client.wait(max(0, deadline - time.monotonic())) != 0:
                    ret = BenchResult.FAIL
                    log.error('Client at %s terminated unsuccessful: %s', client.host,
                              client.poll())
            except subprocess.TimeoutExpired:
                ret = BenchResult.TIMEOUT
                log.error('Client at %s timed out', client.host)
                client.kill()
    finally:
        # Leave nothing running, even if starting one of the clients failed
        for client in clients:
            client.kill()
            client.close_out_files()

        # Stop the coordinator before purging its (possibly empty) output files
        coordinator.kill()
        coordinator.wait(TERMINATION_TIME)
        purge_empty_files(files)

    return ret


class Client(subprocess.Popen):
    """A local or remote echo client subprocess.

    stdout and stderr of the client are redirected to the given files. A
    remote client is started by prefixing the command with remote_cmd.
    """

    def __init__(self,
                 cmd: str,
                 out_file: Union[str, Path],
                 err_file: Union[str, Path],
                 env=None,
                 remote_cmd: Optional[str] = None,
                 host: Optional[str] = None):
        self.cmd = cmd
        # executable of the client, used to kill a remote client via KILL_CMD
        self.bin = cmd.split()[0]
        self.remote_cmd = remote_cmd
        if remote_cmd:
            self.cmd = f'{remote_cmd} {cmd}'
            if not host:
                raise ValueError('a remote client requires a host')
        self.host = host

        log.debug(
            'Creating %s client:\n%s', host or 'local',
            pp.pformat({
                'cmd': self.cmd,
                'out': out_file,
                'err': err_file,
                'host': host
            }))

        self.fout = open(out_file, 'w', encoding='utf-8')
        try:
            self.ferr = open(err_file, 'w', encoding='utf-8')
            super().__init__(self.cmd.split(),
                             stdout=self.fout,
                             stderr=self.ferr,
                             env=env,
                             text=True)  # type: ignore
        except BaseException:
            # do not leak the output files if the client could not be started
            self.close_out_files()
            raise

    def kill(self):
        """Kill the client if it is still running and wait for it to exit.

        A remote client is killed on its host by running KILL_CMD for the
        client executable through remote_cmd. If the local remote_cmd process
        does not exit within TERMINATION_TIME afterwards, it is killed too.
        """
        if self.poll() is not None:
            return

        if self.remote_cmd:
            # kill the client using pkill(1) -9
            kill_cmd = f'{self.remote_cmd} {KILL_CMD.format(proc=self.bin)}'
            cmd_run(kill_cmd, log, check=False)
            try:
                self.wait(TERMINATION_TIME)
                return
            except subprocess.TimeoutExpired:
                log.warning('Client at %s did not terminate after remote kill', self.host)

        super().kill()
        self.wait(TERMINATION_TIME)

    def close_out_files(self):
        """Close the output files; safe to call multiple times."""
        for out_file in (getattr(self, 'fout', None), getattr(self, 'ferr', None)):
            if out_file is not None:
                out_file.close()

    def __del__(self):
        # kill the client if it was started and is still alive
        if getattr(self, 'pid', None) is not None and self.poll() is None:
            self.kill()
        self.close_out_files()
        super().__del__()