eval.py
#!/usr/bin/env python3
"""Evaluation script for the EMPER echoserver evaluation artifact"""
import argparse
import datetime
import itertools
import logging
import os
from pathlib import Path
import subprocess
import sys
from time import sleep
from typing import Dict, List, Mapping, Sequence

from bench.benchresult import BenchResult
from bench.client import run_clients
import bench.emper as emper
from bench.globalvars import HOSTNAME, ROOT_DIR, TERMINATION_TIME
from bench.server import Server
from bench.server_cmds import SERVER_CMDS
from bench.util import cmd_run

# describe the artifact state; [:-1] strips the trailing newline of `git describe`
ARTIFACT_DESC = subprocess.check_output(
    'git describe --dirty --always'.split(), cwd=ROOT_DIR, text=True)[:-1]

CONS = [500, 1000, 5000, 10000, 15000, 20000, 25000]
SIZES = [16]
# ARGS = itertools.product(CONS, SIZES)
MEASURE_CMD = "/usr/bin/time -o {BENCHDIR}/server_stats.{RUN}.txt -v"
MEASURE_IO_CMD = "sar 1 1000 -o {run}.{cons}.{size}.sar.data"
REMOTE_CMD = "ssh -p {ssh_port} -q {host}"
HOST = "faui49big02"
CLIENTS = None
SERVER_ENV = {}
CLIENT_ENV = {}
STARTUP_TIME = 10
CLIENT_SEPARATION_TIME = 10
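# MEASURE_CMD, MEASURE_IO_CMD and REMOTE_CMD are str.format() templates; their
# {BENCHDIR}/{RUN}, {run}/{cons}/{size} and {host}/{ssh_port} placeholders are
# filled in right before each benchmark run (see bench() below).
# CONS and SIZES are defaults that can be overridden via --connections and --sizes.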


def write_desc(data_dir):
    """Write a YAML description of the evaluation into data_dir"""
    desc_file_path = data_dir / 'desc.yml'
    with open(desc_file_path, 'w') as desc_file:
        print(f'start: {start_time.isoformat()}', file=desc_file)
        print(f'args: {ARGS}', file=desc_file)
        print(f'host: {HOST}', file=desc_file)
        print(f'measure_cmd: {MEASURE_CMD}', file=desc_file)
        print(f'measure_io_cmd: {MEASURE_IO_CMD}', file=desc_file)

        uname = os.uname()
        print(
            (f'uname_client: {uname.sysname} {uname.nodename} {uname.release} '
             f'{uname.version} {uname.machine}'),
            file=desc_file)

        if HOST == 'localhost':
            return

        # the remote uname is appended after this key once the file is closed
        print('uname_host: ', end='', file=desc_file)

    # collect the uname of the host
    remote_cmd = REMOTE_CMD.format(host=HOST, ssh_port=SSH_PORT)
    uname_cmd = f'{remote_cmd} uname -a >> {desc_file_path}'
    cmd_run(uname_cmd, log)


def summarize(results: Mapping[str, Sequence[Sequence[BenchResult]]]):
    """Create, print and save a summary of the evaluation"""
    summary = ''
    totals = {s: 0 for s in BenchResult}
    for server, server_results in results.items():
        for bench_results in server_results:
            for i, arg in enumerate(ARGS):
                summary += f'{i + 1}/{len(ARGS)} {bench_results[i].name} {server} / {arg}\n'
                totals[bench_results[i]] += 1
            summary += '\n'

    for result, amount in totals.items():
        summary += f'{result.name}: {amount}\n'

    print(summary)
    with open(DATA_DIR / 'summary.txt', 'w') as summary_file:
        print(summary, file=summary_file)


RunResult = List[BenchResult]
ServerResult = List[RunResult]
EvalResult = Dict[str, ServerResult]
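# An EvalResult maps each server name to RUNS lists, one per run, each holding
# one BenchResult per (connections, size) pair in ARGS.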


def bench(server_cmds) -> EvalResult:
    """Run the benchmark for all selected servers and client arguments"""
    results = {
        server: [[BenchResult.SKIP] * len(ARGS) for r in range(RUNS)]
        for server in server_cmds
    }

    for run in range(1, RUNS + 1):
        print(f"starting run {run}/{RUNS}...")
        # reset line if we are not in debug mode
        if log.getEffectiveLevel() > logging.DEBUG:
            print('\u001b[K')

        for server_name, server_cmd in server_cmds.items():
            bench_results = results[server_name][run - 1]
            bench_dir = DATA_DIR / server_name
            if not bench_dir.exists():
                os.mkdir(bench_dir)

            substitutions = {
                'BENCHDIR': bench_dir,
                'RUN': run,
                'SERVER': server_name
            }

            if REMOTE_CMD:
                # make sure the benchdir is available on the HOST
                remote_cmd = REMOTE_CMD.format(host=HOST, ssh_port=SSH_PORT)
                remote_prepare_cmd = f'{remote_cmd} mkdir -p {bench_dir}'
                cmd_run(remote_prepare_cmd, log)

            measure_cmd = MEASURE_CMD.format(**substitutions)

            print(f'benchmarking {server_name} ...')
            server = Server(server_name,
                            server_cmd,
                            run,
                            bench_dir,
                            env=SERVER_ENV,
                            measure_cmd=measure_cmd,
                            remote_cmd=remote_cmd if REMOTE_CMD else None,
                            host=HOST)

            sleep(STARTUP_TIME)
            if server.poll() is not None:
                log.error('server cmd returned early')
                continue

            for i, (cons, size) in enumerate(ARGS):
                # clear line to the right and reset cursor to the first column
                print(f'{i + 1}. (c {cons} s {size}) of {len(ARGS)}\u001b[K\r',
                      end=''
                      if log.getEffectiveLevel() > logging.DEBUG else '\n')

                # skip naive-multithreaded for cons > 10000
                if server_name == 'naive-multithreaded' and cons > 10000:
                    log.debug(
                        'Skipping naive-multithreaded for cons (%d) > 10000',
                        cons)
                    continue

                measure_io_proc = None
                if MEASURE_IO_CMD:
                    measure_io_cmd = MEASURE_IO_CMD.format(cons=cons,
                                                           size=size,
                                                           run=run)
                    measure_io_proc = subprocess.Popen(
                        measure_io_cmd.split(),
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                        cwd=bench_dir)

                client_args = {'cons': cons, 'size': size}
                client_remote_cmd = None if not REMOTE_CMD else REMOTE_CMD.format(
                    host='{host}', ssh_port=SSH_PORT)
                bench_results[i] = run_clients(run,
                                               client_args,
                                               bench_dir,
                                               HOST,
                                               substitutions=substitutions,
                                               termination=TERMINATION,
                                               remote_cmd=client_remote_cmd,
                                               hosts=CLIENTS,
                                               env=CLIENT_ENV)

                if bench_results[i] == BenchResult.FAIL:
                    log.warning('\nClient cmd failed. Terminate server')
                    break

                # stop io stats collection
                if measure_io_proc:
                    measure_io_proc.kill()
                    measure_io_proc.wait(TERMINATION_TIME)

                sleep(CLIENT_SEPARATION_TIME)

            server.shutdown()

    return results


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-r',
                        '--runs',
                        help='benchmark runs',
                        type=int,
                        default=3)
    parser.add_argument('-c',
                        '--connections',
                        help='number of client connections to evaluate',
                        type=int,
                        nargs='+')
    parser.add_argument('--clients',
                        help='client computers used to evaluate the host',
                        type=str,
                        nargs='+')
    parser.add_argument('-s',
                        '--sizes',
                        help='sizes of messages sent',
                        type=int,
                        nargs='+')
    parser.add_argument('-f',
                        '--flavors',
                        help='emper flavors to benchmark',
                        nargs='+')
    parser.add_argument('-m',
                        '--measure-cmd',
                        help='command used to execute and measure the server',
                        default=MEASURE_CMD)
    parser.add_argument('--measure-io',
                        help='measure the IO done by all network interfaces',
                        default=False,
                        action='store_true')
    parser.add_argument('--ssh-port',
                        help='ssh port to use for the connection to the host',
                        default=22,
                        type=int)
    parser.add_argument(
        '--data-root',
        help='path where the experiment results should be saved',
        type=str)
    parser.add_argument('--log', help='Log level to use', type=str)
    parser.add_argument(
        '--additional-client-args',
        help='additional arguments appended to the echo client command',
        type=str)
    parser.add_argument('-nf',
                        '--no-fetch',
                        help='do not fetch the emper remotes',
                        action='store_true')
    parser.add_argument('implementations',
                        help='server implementations to benchmark',
                        nargs='*')

    location_group = parser.add_mutually_exclusive_group()
    location_group.add_argument('-l',
                                '--local',
                                help='run benchmark on localhost',
                                action='store_true')
    location_group.add_argument('--host', help='host running the echo servers')

    termination_group = parser.add_mutually_exclusive_group()
    termination_group.add_argument(
        '--time', help='time the echo client should issue echoes', type=int)
    termination_group.add_argument(
        '--iterations',
        help='echoes the client should issue per connection',
        type=int)

    args = parser.parse_args()

    if args.log:
        numeric_loglevel = getattr(logging, args.log.upper(), None)
        if not isinstance(numeric_loglevel, int):
            raise ValueError(f'Invalid log level: {args.log}')
    else:
        numeric_loglevel = logging.WARNING
    logging.basicConfig(level=numeric_loglevel)
    log = logging.getLogger(Path(__file__).name)

    TERMINATION = None
    if args.time:
        TERMINATION = ('time', args.time)
    elif args.iterations:
        TERMINATION = ('iterations', args.iterations)

    SSH_PORT = args.ssh_port
    CLIENTS = args.clients

    if not args.measure_io:
        MEASURE_IO_CMD = ''

    if args.local:
        # split the available cores between both emper processes
        ncpus = os.cpu_count()
        if ncpus is not None:
            workerCount = ncpus // 2
            SERVER_ENV['EMPER_WORKER_COUNT'] = str(workerCount)
            CLIENT_ENV['EMPER_WORKER_COUNT'] = str(workerCount)
            CLIENT_ENV['EMPER_PINNING_OFFSET'] = str(workerCount - 1)
        else:
            log.warning(('cannot determine cpu count and worker count.'
                         ' Each EMPER process will use all available cpus'))
        REMOTE_CMD = ""
        HOST = "localhost"

    if args.host:
        HOST = args.host

    # default to all known server implementations unless a subset was selected;
    # copy the dict so the emper-flavor update below does not modify SERVER_CMDS
    _server_cmds = dict(SERVER_CMDS)
    if args.implementations:
        _server_cmds = {
            i: c
            for i, c in SERVER_CMDS.items() if i in args.implementations
        }

    MEASURE_CMD = args.measure_cmd

    _cons = args.connections or CONS
    _sizes = args.sizes or SIZES
    ARGS = list(itertools.product(_cons, _sizes))
print("Building the artifact ...")
make_cmd = ['make']
if numeric_loglevel > logging.DEBUG:
make_cmd.append('-s')
subprocess.run(make_cmd, check=True, cwd=ROOT_DIR)
start_time = datetime.datetime.now()
clients_desc = HOSTNAME if not CLIENTS else '-'.join(CLIENTS)
experiment_desc = f'{ARTIFACT_DESC}-{clients_desc}-{HOST}'
if args.data_root:
data_root = Path(args.data_root)
else:
data_root = ROOT_DIR / "results"
DATA_DIR = data_root / experiment_desc / f'{start_time.strftime("%Y-%m-%dT%H_%M_%S")}'
if not DATA_DIR.exists():
os.makedirs(DATA_DIR)
print(f"data_dir: {DATA_DIR}")
write_desc(DATA_DIR)
emper.prepare_client(DATA_DIR)
# generate selected emper flavors
if 'emper' in args.implementations:
_server_cmds.update(
emper.prepare_flavors(DATA_DIR,
selectors=args.flavors,
fetch=not args.no_fetch))
RUNS = args.runs
if not RUNS > 0:
sys.exit(0)
print()
summarize(bench(_server_cmds))
print(f"Written results to {DATA_DIR}")