Commit e3b55401 authored by Florian Fischer's avatar Florian Fischer
Browse files

[eval] big refactoring

* Split a lot of code from eval.py into separate Python files in bench/
* Introduce subprocess.Popen wrapper classes for server, clients, and the coordinator
* Drop the use of pssh
* Make -l and --host mutually exclusive
* pass logger to run_cmd
parent ee7e1ded
.PHONY: all clean check fotmat check-format check-pylint clean-emper clean-emper-client eval servers clients
PYTHONFILES := eval.py plot.py parse_results.py plot_tail_latency.py plot_utils.py
PYTHONFILES := eval.py plot.py parse_results.py plot_tail_latency.py plot_utils.py bench/*.py
all: servers clients
......
# Copyright 2021 Florian Fischer
"""Result of a single client iteration"""
from enum import Enum
class BenchResult(Enum):
    """Possible outcomes of a single client execution"""

    # the client finished and reported success
    OK = 1
    # the client did not finish within its timeout
    TIMEOUT = 2
    # the client finished unsuccessfully
    FAIL = 3
    # the client execution was skipped
    SKIP = 4
# Copyright 2021 Florian Fischer
"""Echoclient related code"""
import logging
import os
from pathlib import Path
import subprocess
from typing import Mapping, Sequence, Union
from .benchresult import BenchResult
from .coordinator import Coordinator
from .globalvars import CLIENT_DIR, HOSTNAME, KILL_CMD, TERMINATION_TIME
from .util import cmd_run, prepare_env
# Module-level logger named after this module
log = logging.getLogger(__name__)

# Location of the echoclient binary below CLIENT_DIR
CLIENT_EXE = f'{CLIENT_DIR}/build/apps/echoclient'

# File name templates of the client's output files; run_clients() fills
# in {arg_prefix}
CLIENT_CSV = '{arg_prefix}.ini'
CLIENT_OUT = '{arg_prefix}.out'
CLIENT_ERR = '{arg_prefix}.err'

# run_clients() multiplies a time based termination by this factor to get
# the timeout used when awaiting a client
CLIENT_TIMEOUT_FACTOR = 2

# (kind, amount) tuple used by run_clients() when no termination is passed
CLIENT_DEFAULT_TERMINATION = ('time', 60)

# Command line template of a client. The doubled braces survive the f-string
# and, like {host} and {outfile}, are placeholders filled in later using
# str.format().
CLIENT_CMD = (f'{CLIENT_EXE} -c {{cons}} -s {{size}} {{termination_flag}}'
              ' -a {host} -f {outfile} --no-quit')
def purge_empty_files(files: Sequence[Union[str, Path]]):
    """Delete every listed file that exists and has a size of zero bytes"""
    for candidate in files:
        # paths which do not exist are silently ignored
        if not os.path.exists(candidate):
            continue
        # keep everything containing at least one byte
        if os.stat(candidate).st_size != 0:
            continue
        os.remove(candidate)
def run_clients(run: int,
                args: Mapping,
                result_dir,
                server_host: str,
                substitutions=None,
                termination=None,
                remote_cmd=None,
                hosts=None,
                env=None) -> BenchResult:
    """Run one local client or multiple remote clients for args

    Arguments:
        run: number of the current run, used as prefix of the output files
        args: client arguments; they are substituted into the client command
              and their values become part of the output file names
        result_dir: directory the output files are created in
        server_host: host of the server the clients connect to
        substitutions: optional additional values substituted into the
                       client command
        termination: optional ('time', seconds) or ('iterations', count)
                     tuple, CLIENT_DEFAULT_TERMINATION is used if omitted
        remote_cmd: command prefix used to run a client on a remote host,
                    required if hosts is not empty
        hosts: hosts to run the clients on, a single local client is run
               if this is empty
        env: environment updates for the local client

    Returns the BenchResult of the client execution(s).

    Raises a ValueError if the termination kind is unknown or hosts are
    given without a remote_cmd.
    """
    # the default of None can not be unpacked using **
    substitutions = substitutions or {}

    # build '<run>.<value of each arg>' with the args ordered by their name
    arg_prefix_template = f'{run}.{{{"}.{".join(sorted(args.keys()))}}}'
    arg_prefix = arg_prefix_template.format(run=run, **args)
    log.debug('output file prefix: %s generated from: %s', arg_prefix,
              arg_prefix_template)

    client_out = CLIENT_OUT.format(arg_prefix=arg_prefix)
    client_err = CLIENT_ERR.format(arg_prefix=arg_prefix)
    client_csv = CLIENT_CSV.format(arg_prefix=arg_prefix)

    termination = termination or CLIENT_DEFAULT_TERMINATION
    if termination[0] == 'time':
        termination_flag = f'-t {termination[1]}'
        timeout = termination[1] * CLIENT_TIMEOUT_FACTOR
    elif termination[0] == 'iterations':
        termination_flag = f'-i {termination[1]}'
        # TODO: one second per iteration seems too long
        timeout = termination[1]
    else:
        # without a termination flag and timeout no client can be started
        log.error('Unknown termination option: %s', termination[0])
        raise ValueError(f'Unknown termination option: {termination[0]}')

    out = result_dir / client_out
    err = result_dir / client_err
    csv = result_dir / client_csv

    if not hosts:
        cmd = CLIENT_CMD.format(host=server_host,
                                outfile=csv,
                                termination_flag=termination_flag,
                                **args,
                                **substitutions)
        return run_local_client(cmd, out, err, timeout, env=env)

    if not remote_cmd:
        raise ValueError('remote_cmd is required to run clients on hosts')

    # keep the outfile placeholder, it is filled in per host later
    cmd = CLIENT_CMD.format(host=server_host,
                            outfile='{outfile}',
                            termination_flag=termination_flag,
                            **args,
                            **substitutions)
    return run_remote_clients(cmd, out, err, csv, remote_cmd, hosts, timeout)
def run_local_client(cmd: str,
                     out: Union[str, Path],
                     err: Union[str, Path],
                     timeout,
                     env=None) -> BenchResult:
    """Run a local echo client

    Arguments:
        cmd: command line of the client
        out: file the client's stdout is written to
        err: file the client's stderr is written to
        timeout: seconds to wait for the client to terminate
        env: environment updates applied on top of os.environ

    Returns BenchResult.OK if the client exited with 0, BenchResult.FAIL
    if it exited with anything else and BenchResult.TIMEOUT if it did not
    terminate in time. Output files which are empty afterwards are removed.
    """
    ret = BenchResult.OK

    client = Client(cmd, out, err, env=prepare_env(env or {}))
    try:
        if client.wait(timeout) != 0:
            ret = BenchResult.FAIL
            log.error('Client terminated unsuccessful: %s', client.poll())
    except subprocess.TimeoutExpired:
        ret = BenchResult.TIMEOUT
        log.error('Client timed out')
        # kill the client explicitly like run_remote_clients() does instead
        # of relying on the garbage collection of the Client object
        client.kill()
    finally:
        client.close_out_files()

    purge_empty_files([out, err])
    return ret
def run_remote_clients(cmd: str, out: Path, err: Path, csv: Path,
                       remote_cmd: str, hosts: Sequence[str],
                       timeout) -> BenchResult:
    """Run clients on multiple remote hosts

    Arguments:
        cmd: client command line containing an {outfile} placeholder
        out: base path of the files the clients' stdout is written to
        err: base path of the files the clients' stderr is written to
        csv: base path passed to the clients as outfile
        remote_cmd: command prefix used to run a command on a remote host
        hosts: hosts to start a client on
        timeout: seconds to wait for each client to terminate

    The name of each file is prefixed with the host the client runs on.

    Returns BenchResult.OK if all clients exited with 0, otherwise the
    result of the last unsuccessful client that was awaited.
    """
    ret = BenchResult.OK
    clients = []
    files = []

    # Start coordinator
    coordinator = Coordinator(len(hosts))
    try:
        cmd_template = cmd + f' --coordinator {HOSTNAME}'

        for host in hosts:
            # Prefix the files with the host the client runs on. Always
            # derive them from the unmodified base paths and the unformatted
            # command, otherwise the prefixes pile up and all clients share
            # the outfile of the first host.
            host_out = out.parent / f'{host}-{out.name}'
            host_err = err.parent / f'{host}-{err.name}'
            host_csv = csv.parent / f'{host}-{csv.name}'
            files.extend([host_out, host_err, host_csv])

            host_cmd = cmd_template.format(outfile=host_csv)
            clients.append(
                Client(host_cmd,
                       host_out,
                       host_err,
                       remote_cmd=remote_cmd,
                       host=host))

        # await all clients
        for client in clients:
            try:
                if client.wait(timeout) != 0:
                    ret = BenchResult.FAIL
                    log.error('Client at %s terminated unsuccessful: %s',
                              client.host, client.poll())
            except subprocess.TimeoutExpired:
                ret = BenchResult.TIMEOUT
                log.error('Client at %s timed out', client.host)
                client.kill()

        purge_empty_files(files)
    finally:
        # never leave the coordinator behind, even if a client failed to start
        coordinator.kill()
        coordinator.wait(TERMINATION_TIME)

    return ret
class Client(subprocess.Popen):
    """Class wrapping a remote or local echo client subprocess

    The subprocess is started on construction. Its stdout and stderr are
    written to the passed files. If a remote_cmd is given it is prepended
    to the client command and used again to kill the client.
    """

    # set once the subprocess was started successfully, this allows __del__
    # to cope with objects whose __init__ raised early
    _client_started = False

    def __init__(self,
                 cmd: str,
                 out_file: Union[str, Path],
                 err_file: Union[str, Path],
                 env=None,
                 remote_cmd=None,
                 host=None):
        """Start the client

        Arguments:
            cmd: command line of the client
            out_file: file the client's stdout is written to
            err_file: file the client's stderr is written to
            env: environment of the subprocess
            remote_cmd: command prefix to run the client on a remote host
            host: host the client runs on, required if remote_cmd is used
        """
        self.cmd = cmd
        # name of the client binary, used as pattern to kill remote clients
        self.bin = cmd.split()[0]

        self.remote_cmd = remote_cmd
        if remote_cmd:
            self.cmd = f'{remote_cmd} {cmd}'
            assert host, 'a remote client needs a host'

        self.host = host

        log.debug('Creating %s client: %s', host or 'local', {
            'cmd': self.cmd,
            'out': out_file,
            'err': err_file,
            'host': host
        })

        self.fout = open(out_file, 'w')
        self.ferr = open(err_file, 'w')

        # Run the full command including the remote prefix. Passing only cmd
        # would start remote clients on the local machine.
        super().__init__(self.cmd.split(),
                         stdout=self.fout,
                         stderr=self.ferr,
                         env=env,
                         text=True)  # type: ignore
        self._client_started = True

    def kill(self):
        """Kill a running client"""
        if self.poll() is not None:
            return

        if not self.remote_cmd:
            super().kill()
        else:
            # kill the client using pkill(1) -9
            kill_cmd = f'{self.remote_cmd} {KILL_CMD.format(proc=self.bin)}'
            cmd_run(kill_cmd, log, check=False)

        self.wait(TERMINATION_TIME)

    def close_out_files(self):
        """Close the output files, safe to call multiple times"""
        for attr in ('fout', 'ferr'):
            out_file = getattr(self, attr, None)
            if out_file is not None:
                out_file.close()

    def __del__(self):
        # only a successfully started client can still be alive
        if self._client_started and self.poll() is None:
            self.kill()
        self.close_out_files()
# Copyright 2021 Florian Fischer
"""Coordniator to synchronize echoclients"""
import subprocess
import logging
from .globalvars import CLIENT_DIR
# Module-level logger named after this module
log = logging.getLogger(__name__)

# Location of the coordinator binary below CLIENT_DIR
COORDINATOR_EXE = f'{CLIENT_DIR}/build/apps/coordinator'
class Coordinator(subprocess.Popen):
    """Coordinator subprocess usable in a with statement

    The coordinator binary is started on construction with the number of
    clients as its only argument. Its stdout and stderr are discarded.
    """

    def __init__(self, nclients):
        """Start a coordinator for nclients clients"""
        cmd = f'{COORDINATOR_EXE} {nclients}'
        log.debug('Creating coordinator object with cmd: %s', cmd)
        super().__init__(cmd.split(),
                         stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)

    def __enter__(self):
        log.debug('Start coordinator')
        # return the object so "with Coordinator(n) as c" binds c to it
        return super().__enter__()

    def __exit__(self, exc_type, value, traceback):  # pylint: disable=arguments-differ
        log.debug('Join coordinator')
        super().__exit__(exc_type, value, traceback)
# Copyright 2021 Florian Fischer
"""Common global variables"""
import platform
from pathlib import Path
# Directory one level above the directory containing this file
ROOT_DIR = Path(__file__).parents[1]

# Directory of the client sources
CLIENT_DIR = ROOT_DIR / 'emper-client'

# Directory of the server implementations
SERVER_SRC_DIR = ROOT_DIR / 'servers'

# Passed as timeout to Popen.wait() and subprocess.run()
TERMINATION_TIME = 10
"""Timeout used for awaiting termination of subprocesses"""

# Network name of the machine this code runs on
HOSTNAME = platform.node()

# {proc} is filled in using str.format() before the command is run
KILL_CMD = 'pkill -9 -f {proc}'
"""Command used to kill ot terminating subprocesses"""
# Copyright 2021 Florian Fischer
"""Server subprocess related code"""
import logging
from pathlib import Path
import subprocess
from .globalvars import KILL_CMD, TERMINATION_TIME
from .util import cmd_run, prepare_env
# Module-level logger named after this module
log = logging.getLogger(__name__)

# Command used by Server.shutdown(), "quit" is written to its stdin
SHUTDOWN_CMD = 'nc {host} 12345'

# File name templates of a server's stdout and stderr files, {run} is the
# number of the run
SERVER_OUT = '{run}.out'
SERVER_ERR = '{run}.err'
class Server(subprocess.Popen):
    """Wrapper around a server subprocess

    The subprocess is started on construction. Its stdout and stderr are
    written to '<run>.out' and '<run>.err' in the bench directory.
    """

    # set once the subprocess was started successfully, this allows __del__
    # to cope with objects whose __init__ raised early
    _server_started = False

    def __init__(self,
                 name: str,
                 cmd: str,
                 run: int,
                 bench_dir: Path,
                 env=None,
                 measure_cmd=None,
                 remote_cmd=None,
                 host=None):
        """Start the server

        Arguments:
            name: name of the server used in log messages
            cmd: command line of the server
            run: number of the current run, used to name the output files
            bench_dir: directory the output files are created in
            env: environment updates applied on top of os.environ
            measure_cmd: optional command prefix wrapping the server
            remote_cmd: command prefix to run the server on a remote host
            host: host the server runs on, required if remote_cmd is used
        """
        self.name = name
        self.cmd = cmd
        # First word of the server command, used as pattern to kill it.
        # NOTE(review): for commands starting with a wrapper like 'env' this
        # is the wrapper and not the server binary - verify against callers.
        self.bin = cmd.split()[0]

        self.remote_cmd = remote_cmd
        if measure_cmd:
            self.cmd = f'{measure_cmd} {self.cmd}'

        if remote_cmd:
            assert host, 'a remote server needs a host'
            # wrap the possibly measured command instead of replacing it
            self.cmd = f'{remote_cmd} {self.cmd}'

        self.host = host

        out_file = bench_dir / SERVER_OUT.format(run=run)
        err_file = bench_dir / SERVER_ERR.format(run=run)

        log.debug(
            'Creating %s server: %s', name, {
                'cmd': self.cmd,
                'out': out_file,
                'err': err_file,
                'env': env,
                'host': host or 'localhost'
            })

        self.fout = open(out_file, 'w')
        self.ferr = open(err_file, 'w')

        server_env = prepare_env(env or {})

        # Run the full command including the measure and remote prefixes.
        # Passing only cmd would silently ignore both.
        super().__init__(self.cmd.split(),
                         stdout=self.fout,
                         stderr=self.ferr,
                         env=server_env,
                         text=True)  # type: ignore
        self._server_started = True

    def shutdown(self) -> bool:
        """Gracefully shutdown a running server

        Returns True if the server is terminated afterwards, False if the
        shutdown command failed or the termination was not observed in time.
        """
        if self.poll() is not None:
            return True

        shutdown_cmd = SHUTDOWN_CMD.format(host=self.host or 'localhost')
        log.debug('Shutting down %s server by running: %s', self.name,
                  shutdown_cmd)

        # try to shutdown by sending "quit" to the server
        try:
            subprocess.run(shutdown_cmd.split(),
                           input=b'quit\n',
                           check=True,
                           timeout=TERMINATION_TIME)
            # wait for the server to terminate
            self.wait(TERMINATION_TIME)
            return True
        except subprocess.CalledProcessError as run_err:
            log.warning('%s terminated unsuccessful: %s', shutdown_cmd,
                        run_err)
        except subprocess.TimeoutExpired as timeout_err:
            log.warning('%s termination timeout: %s', shutdown_cmd,
                        timeout_err)

        return False

    def kill(self):
        """Kill a running server"""
        if self.poll() is not None:
            return

        # kill the server using pkill(1) -9
        kill_cmd = KILL_CMD.format(proc=self.bin)
        if self.remote_cmd:
            kill_cmd = f'{self.remote_cmd} {kill_cmd}'

        cmd_run(kill_cmd, log, check=False)
        self.wait(TERMINATION_TIME)

    def __del__(self):
        # only a successfully started server can still be alive
        if self._server_started and self.poll() is None:
            # NOTE(review): the hard kill runs before the graceful shutdown,
            # so the shutdown below looks like a no-op once the kill
            # succeeded - confirm that this order is intended
            self.kill()
            if not self.shutdown():
                self.kill()

        for attr in ('fout', 'ferr'):
            out_file = getattr(self, attr, None)
            if out_file is not None:
                out_file.close()
# Copyright 2021 Florian Fischer
"""Definitions of the available server implementations to evaluate"""
from .globalvars import ROOT_DIR, SERVER_SRC_DIR
# env(1) prefixes setting the variables GOMAXPROCS and TOKIO_WORKER_COUNT
# to 20 for the '-20' variants of the go and tokio servers
GO_20_WORKER_ENV_CMD = 'env GOMAXPROCS=20'
TOKIO_20_WORKER_ENV_CMD = 'env TOKIO_WORKER_COUNT=20'

# Mapping from the name of a server implementation to its command line.
# The '{flavor}' placeholder of the emper entry survives the f-string and
# must be filled in using str.format() before the command is used.
SERVER_CMDS = {
    'go': f'{SERVER_SRC_DIR}/go-echo-server/echo',
    'go-100us': f'{SERVER_SRC_DIR}/go-echo-server/echo -computation 100',
    'go-200us': f'{SERVER_SRC_DIR}/go-echo-server/echo -computation 200',
    'go-20': f'{GO_20_WORKER_ENV_CMD} {SERVER_SRC_DIR}/go-echo-server/echo',
    'go-20-100us':
    f'{GO_20_WORKER_ENV_CMD} {SERVER_SRC_DIR}/go-echo-server/echo -computation 100',
    'go-20-200us':
    f'{GO_20_WORKER_ENV_CMD} {SERVER_SRC_DIR}/go-echo-server/echo -computation 200',
    'tokio': f'{SERVER_SRC_DIR}/tokio-echo/target/release/tokio-echo',
    'tokio-100us':
    f'{SERVER_SRC_DIR}/tokio-echo/target/release/tokio-echo 100',
    'tokio-200us':
    f'{SERVER_SRC_DIR}/tokio-echo/target/release/tokio-echo 200',
    'tokio-20':
    f'{TOKIO_20_WORKER_ENV_CMD} {SERVER_SRC_DIR}/tokio-echo/target/release/tokio-echo',
    'tokio-20-100us':
    f'{TOKIO_20_WORKER_ENV_CMD} {SERVER_SRC_DIR}/tokio-echo/target/release/tokio-echo 100',
    'tokio-20-200us':
    f'{TOKIO_20_WORKER_ENV_CMD} {SERVER_SRC_DIR}/tokio-echo/target/release/tokio-echo 200',
    #'libevent-single': f'{SERVER_SRC_DIR}/libevent-single/echoserver',
    'libevent-memcached': f'{SERVER_SRC_DIR}/libevent-echo/echoserver',
    #'libevent-cemer': f'{SERVER_SRC_DIR}/libevent-cemer-thread/echoserver_threaded',
    #'naive-singlethreaded': f'{SERVER_SRC_DIR}/naive-singlethreaded/echoserver',
    'naive-multithreaded': f'{SERVER_SRC_DIR}/naive-multithreaded/echoserver',
    'emper': f'{ROOT_DIR}/emper-{{flavor}}/build/apps/echoserver 12345',
    'burak':
    f'{SERVER_SRC_DIR}/burak/build-release/src/emper-echo-server 12345'
}
# Copyright 2021 Florian Fischer
"""Utility functions"""
import logging
import os
import subprocess
from typing import Dict, MutableMapping
def prepare_env(update_env: MutableMapping) -> Dict:
    """Return a copy of os.environ overlaid with the entries of update_env

    os.environ itself is left untouched.
    """
    merged_env = os.environ.copy()
    merged_env.update(update_env)
    return merged_env
def cmd_run(cmd: str, log: logging.Logger, cwd=None, env=None, check=True):
    """Run a subcommand whose output depends on the logger's verbosity

    The stdout of the subcommand is only passed through if the logger is
    at least as verbose as DEBUG. Its stderr is always captured and logged
    on the debug level if the subcommand fails.

    Raises subprocess.CalledProcessError if check is set and the subcommand
    exits unsuccessfully.
    """
    # discard the subcommand's stdout unless debugging output is wanted
    quiet = log.getEffectiveLevel() > logging.DEBUG
    log.debug('Running subcommand: %s', cmd)

    run_kwargs = {
        'stdout': subprocess.DEVNULL if quiet else None,
        'stderr': subprocess.PIPE,
        'text': True,
        'env': env,
        'check': check,
        'cwd': cwd,
    }

    try:
        subprocess.run(cmd.split(), **run_kwargs)
    except subprocess.CalledProcessError as failure:
        log.critical('subcommand failed: %s', failure)
        log.debug('%s', failure.stderr)
        raise
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment