diff --git a/eval.py b/eval.py index 4f94ec464a8e188cbb69e5aa3a1edae7038f8920..68923ad39f20aae2d150b5fa17b274d60c8b9172 100755 --- a/eval.py +++ b/eval.py @@ -8,11 +8,15 @@ import itertools import os from pathlib import Path import platform +import shutil import subprocess import sys from time import sleep from typing import Dict, List, MutableMapping, Mapping, Sequence +import pssh.exceptions +from pssh.clients import ParallelSSHClient + ROOT_DIR = Path(os.path.dirname(os.path.realpath(__file__))) SERVER_DIR = ROOT_DIR / 'servers' @@ -363,6 +367,10 @@ MEASURE_IO_CMD = f"sar 1 1000 -o {ARG_PREFIX}.sar.data" REMOTE_CMD = "ssh -p {ssh_port} -q {host}" HOST = "faui49big02" +HOSTNAME = platform.node() +COORDINATOR = HOSTNAME +CLIENTS = None + STARTUP_TIME = 30 TERMINATION_TIME = 30 CLIENT_SEPARATION_TIME = 60 @@ -476,6 +484,50 @@ def prepare_client(verbose=False): build_emper(CLIENT_DIR, verbose=verbose) +def run_local_client(client_cmd, client_out, client_err): + """Run a local echo client""" + ret = BenchResult.OK + client_env = prepare_env(CLIENT_ENV) + with open(client_out, 'w') as cout, open(client_err, 'w') as cerr: + try: + subprocess.run(client_cmd.split(), + stdout=cout, + stderr=cerr, + timeout=CLIENT_TIMEOUT, + text=True, + env=client_env, + check=True) + + except subprocess.CalledProcessError as run_err: + print(f'Client terminated unsuccessful: {run_err}', file=cerr) + ret = BenchResult.FAIL + + except subprocess.TimeoutExpired: + ret = BenchResult.TIMEOUT + + #remove empty files + if os.stat(client_out).st_size == 0: + os.remove(client_out) + + if os.stat(client_err).st_size == 0: + os.remove(client_err) + + return ret + + +def save_client_results(clients_out, bench_dir, client_out_suffix, + client_err_suffix): + """Save the client results in the result directory""" + for client_out in clients_out: + client_out_p = bench_dir / f'{client_out.host}-{client_out_suffix}' + client_err_p = bench_dir / f'{client_out.host}-{client_err_suffix}' + + with open(client_out_p, 'w') as cout: + shutil.copyfileobj(client_out.stdout, cout) + with open(client_err_p, 'w') as cerr: + shutil.copyfileobj(client_out.stderr, cerr) + + def update_emper_flavor(flavor_dir, checkout, verbose=False): """Reset and update a emper worktree""" # reset patches @@ -690,49 +742,41 @@ def bench(verbose=False) -> Dict[str, List[List[BenchResult]]]: outfile=outfile, **substitutions) - client_out = bench_dir / CLIENT_OUT.format( - cons=cons, size=size, run=run) - client_err = bench_dir / CLIENT_ERR.format( - cons=cons, size=size, run=run) - client_env = prepare_env(CLIENT_ENV) - with open(client_out, 'w') as cout, open(client_err, - 'w') as cerr: + client_out_suffix = CLIENT_OUT.format(cons=cons, + size=size, + run=run) + client_err_suffix = CLIENT_ERR.format(cons=cons, + size=size, + run=run) + if not CLIENTS: + bench_results[i] = run_local_client( + client_cmd, bench_dir / client_out_suffix, + bench_dir / client_err_suffix) + else: + clients_out = ssh_client.run_command(client_cmd) try: - subprocess.run(client_cmd.split(), - stdout=cout, - stderr=cerr, - timeout=CLIENT_TIMEOUT, - text=True, - env=client_env, - check=True) - + ssh_client.join(timeout=CLIENT_TIMEOUT) + save_client_results(clients_out, bench_dir, + client_out_suffix, + client_err_suffix) bench_results[i] = BenchResult.OK - except subprocess.CalledProcessError as run_err: - print('\nClient cmd failed. Terminate server') - print(f'Client terminated unsuccessful: {run_err}', - file=cerr) - bench_results[i] = BenchResult.FAIL - break - - except subprocess.TimeoutExpired: - print('\nClient timeout expired') + except pssh.exceptions.Timeout: bench_results[i] = BenchResult.TIMEOUT - finally: - # stop io stats collection - if measure_io_proc: - measure_io_proc.kill() - measure_io_proc.wait() + if bench_results[i] == BenchResult.FAIL: + print('\nClient cmd failed. Terminate server') + break - sleep(CLIENT_SEPARATION_TIME) + if bench_results[i] == BenchResult.TIMEOUT: + print('\nClient timeout expired') - # remove empty files - if os.stat(client_out).st_size == 0: - os.remove(client_out) + # stop io stats collection + if measure_io_proc: + measure_io_proc.kill() + measure_io_proc.wait() - if os.stat(client_err).st_size == 0: - os.remove(client_err) + sleep(CLIENT_SEPARATION_TIME) shutdown_server(server_popen, verbose=verbose) return results @@ -758,16 +802,20 @@ if __name__ == '__main__': '--connections', help='number of client connections to evaluate', type=int, - nargs='*') + nargs='+') + parser.add_argument('--clients', + help='client computers used to evaluate the host', + type=str, + nargs='+') parser.add_argument('-s', '--sizes', help='sizes of messages sent', type=int, - nargs='*') + nargs='+') parser.add_argument('-f', '--flavors', help='emper flavors to benchmark', - nargs='*') + nargs='+') parser.add_argument('-m', '--measure-cmd', help='command executing the server', @@ -816,11 +864,17 @@ if __name__ == '__main__': TERMINATION_FLAG = '-t 60' CLIENT_TIMEOUT = 60 * CLIENT_TIMEOUT_FACTOR + CLIENTS = args.clients + CLIENT_CMD = ( f'{CLIENT_EXE} -c {{cons}} -s {{size}} {TERMINATION_FLAG}' f' -a {{host}} -f {{outfile}} --no-quit {args.additional_client_args or ""}' ) + if CLIENTS: + ssh_client = ParallelSSHClient(CLIENTS, port=SSH_PORT) + CLIENT_CMD += f'--coordinator {COORDINATOR}' + if not Path(EMPER_REPO).exists(): subprocess.run('git subproject update --init'.split(), check=True) emper_repo_add_muhq_remote() @@ -895,7 +949,7 @@ if __name__ == '__main__': subprocess.run(make_cmd, check=True, cwd=ROOT_DIR) start_time = datetime.datetime.now() - experiment_desc = f'{ARTIFACT_DESC}-{platform.uname().node}-{HOST}' + experiment_desc = f'{ARTIFACT_DESC}-{HOSTNAME}-{HOST}' if args.data_root: data_root = Path(args.data_root)