Commit 4d47c729 authored by Florian Fischer's avatar Florian Fischer
Browse files

Merge branch 'multiple-clients'

parents ccd5e6ab 0c3c64db
Pipeline #67419 failed with stage
in 23 seconds
.PHONY: all clean check format check-format check-pylint check-mypy clean-emper clean-emper-client eval servers clients
PYTHONFILES := eval.py plot.py parse_results.py plot_tail_latency.py plot_utils.py
PYTHONFILES := eval.py plot.py parse_results.py plot_tail_latency.py plot_utils.py bench/*.py
all: servers clients
servers:
make -C servers/
$(MAKE) -C servers/
clients:
make -C clients/
$(MAKE) -C clients/
check: check-pylint check-format
check: check-pylint check-format check-mypy
format:
yapf -i $(PYTHONFILES)
......@@ -21,6 +21,9 @@ check-format:
check-pylint:
pylint --rcfile=.pylint.rc -j 0 $(PYTHONFILES) || ./tools/check-pylint $$?
check-mypy:
mypy --ignore-missing-imports $(PYTHONFILES)
clean-emper-client:
rm -rf emper-client
git -C emper/ worktree prune
......@@ -30,8 +33,8 @@ clean-emper:
git -C emper/ worktree prune
clean: clean-emper
make -C servers clean
make -C clients clean
$(MAKE) -C servers clean
$(MAKE) -C clients clean
eval: all
./eval.py
# Copyright 2021 Florian Fischer
"""Result of a single client iteration"""
from enum import Enum
class BenchResult(Enum):
    """Enum for the exit types of a client execution"""
    OK = 1  # the client terminated with exit status 0
    TIMEOUT = 2  # the client exceeded its timeout (subprocess.TimeoutExpired)
    FAIL = 3  # the client terminated with a non-zero exit status
    SKIP = 4  # presumably: benchmark was skipped -- not set in this file, verify against callers
# Copyright 2021 Florian Fischer
"""Echoclient related code"""
import logging
import os
from pathlib import Path
import subprocess
from typing import Mapping, Sequence, Union
from .benchresult import BenchResult
from .coordinator import Coordinator
from .globalvars import CLIENT_DIR, HOSTNAME, KILL_CMD, TERMINATION_TIME
from .util import cmd_run, prepare_env
log = logging.getLogger(__name__)
# Path of the echoclient binary inside the client build tree.
CLIENT_EXE = f'{CLIENT_DIR}/build/apps/echoclient'
# Output file name templates; arg_prefix encodes run number and client args.
# NOTE(review): the csv result template uses an '.ini' suffix -- confirm intended.
CLIENT_CSV = '{arg_prefix}.ini'
CLIENT_OUT = '{arg_prefix}.out'
CLIENT_ERR = '{arg_prefix}.err'
# A time-terminated client gets twice its requested runtime before timing out.
CLIENT_TIMEOUT_FACTOR = 2
# Used by run_clients when no explicit termination is passed: run for 60s.
CLIENT_DEFAULT_TERMINATION = ('time', 60)
# Command template; {cons}, {size}, ... are filled from args/substitutions.
CLIENT_CMD = (f'{CLIENT_EXE} -c {{cons}} -s {{size}} {{termination_flag}}'
              ' -a {host} -f {outfile} --no-quit')
def purge_empty_files(files: Sequence[Union[str, Path]]):
    """Delete every listed file that exists and is empty (zero bytes)."""
    for candidate in map(Path, files):
        if candidate.exists() and candidate.stat().st_size == 0:
            candidate.unlink()
def run_clients(run: int,
                args: Mapping,
                result_dir,
                server_host: str,
                substitutions=None,
                termination=None,
                remote_cmd=None,
                hosts=None,
                env=None) -> BenchResult:
    """Run one or multiple local or remote clients for args

    The clients' stdout, stderr and csv results are written into
    result_dir using a prefix generated from the run number and the
    sorted keys of args.

    Arguments:
      run: number of the current benchmark repetition, used in file names
      args: mapping of argument names to values substituted into CLIENT_CMD
      result_dir: Path-like directory collecting all output files
      server_host: host the echo server listens on (client -a flag)
      substitutions: additional CLIENT_CMD template substitutions
      termination: ('time', seconds) or ('iterations', count); defaults
        to CLIENT_DEFAULT_TERMINATION
      remote_cmd: command prefix (e.g. ssh invocation) for remote clients
      hosts: hosts to run remote clients on; if falsy run one local client
      env: additional environment variables for a local client

    Returns the BenchResult of the client execution(s).

    Raises ValueError for an unknown termination option.
    """
    # Guard the unconditional **substitutions unpacking below: the default
    # None would raise a TypeError.
    substitutions = substitutions or {}
    # e.g. run=1, args={'cons': 10} -> template '1.{cons}' -> prefix '1.10'
    arg_prefix_template = f'{run}.{{{"}.{".join(sorted(args.keys()))}}}'
    arg_prefix = arg_prefix_template.format(run=run, **args)
    log.debug('output file prefix: %s generated from: %s', arg_prefix,
              arg_prefix_template)
    client_out = CLIENT_OUT.format(arg_prefix=arg_prefix)
    client_err = CLIENT_ERR.format(arg_prefix=arg_prefix)
    client_csv = CLIENT_CSV.format(arg_prefix=arg_prefix)
    termination = termination or CLIENT_DEFAULT_TERMINATION
    if termination[0] == 'time':
        termination_flag = f'-t {termination[1]}'
        timeout = termination[1] * CLIENT_TIMEOUT_FACTOR
    elif termination[0] == 'iterations':
        termination_flag = f'-i {termination[1]}'
        # TODO: one second per iteration seems too long
        timeout = termination[1]
    else:
        # Previously this only logged and fell through, crashing later with
        # a NameError because termination_flag/timeout stayed unbound.
        log.error('Unknown termination option: %s', termination[0])
        raise ValueError(f'Unknown termination option: {termination[0]}')
    out = result_dir / client_out
    err = result_dir / client_err
    csv = result_dir / client_csv
    if not hosts:
        cmd = CLIENT_CMD.format(host=server_host,
                                outfile=csv,
                                termination_flag=termination_flag,
                                **args,
                                **substitutions)
        return run_local_client(cmd, out, err, timeout, env=env)
    assert remote_cmd
    # Keep the {outfile} placeholder untouched: run_remote_clients
    # substitutes a per-host csv path for each client.
    cmd = CLIENT_CMD.format(host=server_host,
                            outfile='{outfile}',
                            termination_flag=termination_flag,
                            **args,
                            **substitutions)
    return run_remote_clients(cmd, out, err, csv, remote_cmd, hosts, timeout)
def run_local_client(cmd: str,
                     out: str,
                     err: str,
                     timeout,
                     env=None) -> BenchResult:
    """Start a single echo client on this host and await its termination."""
    result = BenchResult.OK
    try:
        client = Client(cmd, out, err, env=prepare_env(env or {}))
        exit_code = client.wait(timeout)
        if exit_code != 0:
            result = BenchResult.FAIL
            log.error('Client terminated unsuccessful: %s', client.poll())
    except subprocess.TimeoutExpired:
        result = BenchResult.TIMEOUT
        log.error('Client timed out')
    purge_empty_files([out, err])
    return result
def run_remote_clients(cmd: str, out: Path, err: Path, csv: Path,
                       remote_cmd: str, hosts: Sequence[str],
                       timeout) -> BenchResult:
    """Run clients on multiple remote hosts

    cmd must still contain an {outfile} placeholder which is replaced with
    a per-host csv path. out, err and csv name the base output files; each
    host's files are prefixed with the host name.

    Returns BenchResult.OK only if every client terminated successfully.
    """
    ret = BenchResult.OK
    clients = []
    files = []
    # Start coordinator
    coordinator = Coordinator(len(hosts))
    cmd += f' --coordinator {HOSTNAME}'
    for host in hosts:
        # prepend out files with the host the client runs on.
        # Use fresh per-host locals: the previous code rebound out/err/csv
        # (so prefixes accumulated across hosts) and rebound cmd itself,
        # making every host after the first reuse the first host's
        # already-substituted csv path.
        host_out = out.parent / f'{host}-{out.name}'
        host_err = err.parent / f'{host}-{err.name}'
        host_csv = csv.parent / f'{host}-{csv.name}'
        files.extend([host_out, host_err, host_csv])
        host_cmd = cmd.format(outfile=host_csv)
        clients.append(
            Client(host_cmd, host_out, host_err, remote_cmd=remote_cmd,
                   host=host))
    # await all clients
    for client in clients:
        try:
            if client.wait(timeout) != 0:
                ret = BenchResult.FAIL
                log.error('Client at %s terminated unsuccessful: %s',
                          client.host, client.poll())
        except subprocess.TimeoutExpired:
            ret = BenchResult.TIMEOUT
            log.error('Client at %s timed out', client.host)
            client.kill()
    purge_empty_files(files)
    coordinator.kill()
    coordinator.wait(TERMINATION_TIME)
    return ret
class Client(subprocess.Popen):
    """Class wrapping a remote or local echo client subprocess

    stdout and stderr of the client are redirected into out_file and
    err_file, which stay open until close_out_files() (or __del__) runs.
    """
    def __init__(self,
                 cmd: str,
                 out_file: Union[str, Path],
                 err_file: Union[str, Path],
                 env=None,
                 remote_cmd=None,
                 host=None):
        """Spawn the client.

        cmd is the echo client command line; if remote_cmd is given the
        client is started through it (e.g. an ssh invocation) and host
        must name the remote host.
        """
        self.cmd = cmd
        # client binary name, used by kill() to pkill(1) remote clients
        self.bin = cmd.split()[0]
        self.remote_cmd = remote_cmd
        if remote_cmd:
            self.cmd = f'{remote_cmd} {cmd}'
            assert host
        self.host = host
        log.debug('Creating %s client: %s', host or 'local', {
            'cmd': self.cmd,
            'out': out_file,
            'err': err_file,
            'host': host
        })
        self.fout = open(out_file, 'w')
        self.ferr = open(err_file, 'w')
        # Bug fix: spawn self.cmd (which includes remote_cmd) instead of the
        # bare local cmd -- previously remote clients were started locally.
        super().__init__(self.cmd.split(),
                         stdout=self.fout,
                         stderr=self.ferr,
                         env=env,
                         text=True)  # type: ignore
    def kill(self):
        """Kill a running client"""
        if self.poll() is not None:
            # already terminated -- nothing to do
            return
        if not self.remote_cmd:
            super().kill()
        else:
            # kill the client using pkill(1) -9
            kill_cmd = f'{self.remote_cmd} {KILL_CMD.format(proc=self.bin)}'
            cmd_run(kill_cmd, log, check=False)
        self.wait(TERMINATION_TIME)
    def close_out_files(self):
        """Close the output files"""
        self.fout.close()
        self.ferr.close()
    def __del__(self):
        # the client is still alive
        if self.poll() is None:
            self.kill()
        # __init__ may have failed before the output files were opened
        if hasattr(self, 'fout'):
            self.close_out_files()
# Copyright 2021 Florian Fischer
"""Coordniator to synchronize echoclients"""
import subprocess
import logging
from .globalvars import CLIENT_DIR
log = logging.getLogger(__name__)
# Path of the coordinator binary inside the echo client build tree.
COORDINATOR_EXE = f'{CLIENT_DIR}/build/apps/coordinator'
class Coordinator(subprocess.Popen):
    """Coordinator class usable in a with statement

    Wraps the coordinator binary which synchronizes nclients echo
    clients; its output is discarded.
    """
    def __init__(self, nclients):
        """Start a coordinator process awaiting nclients clients."""
        cmd = f'{COORDINATOR_EXE} {nclients}'
        log.debug('Creating coordinator object with cmd: %s', cmd)
        super().__init__(cmd.split(),
                         stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)
    def __enter__(self):
        log.debug('Start coordinator')
        # Bug fix: propagate Popen.__enter__'s return value -- previously
        # this returned None, so `with Coordinator(n) as c:` bound c = None.
        return super().__enter__()
    def __exit__(self, exc_type, value, traceback):  # pylint: disable=arguments-differ
        log.debug('Join coordinator')
        return super().__exit__(exc_type, value, traceback)
# Copyright 2021 Florian Fischer
"""Emper flavor definitions, selection, preparation and building"""
import fnmatch
import logging
from pathlib import Path
import os
import subprocess
import sys
from typing import Sequence, Union
from .client import CLIENT_DIR, CLIENT_EXE
from .globalvars import ROOT_DIR
from .util import cmd_run, prepare_env
log = logging.getLogger(__name__)
SERVER_CMD = f'{ROOT_DIR}/emper-{{flavor}}/build/apps/echoserver 12345'
ENV_CMD_20_WORKER = 'env EMPER_WORKER_COUNT=20'
REPO = f'{ROOT_DIR}/emper'
DEFAULT_CHECKOUT = 'origin/master'
CLIENT_CHECKOUT = DEFAULT_CHECKOUT
FIRST_CHECKOUT = 'muhq/io-first'
LINKED_FUTURE_CHECKOUT = 'muhq/linked_future_echoserver'
REAP_COMPLETIONS_PROVIDE_MEMORY_HEAP = 'muhq/reapCompletions_provide_memory_heap'
WAKEUP_CONDVAR_CHECKOUT = 'cfba846c7763faf63877329b227c2b23af4fc568'
BEFORE_COUNTING_TRY_LOCK_CHECKOUT = '5edc8e5eea4134badfa0bf1ee35bf8b671037328'
BEFORE_BATCH_CQE_CHECKOUT = 'c8828c263618865866f20870c8488d8de11c776a'
BEFORE_CHANGES_TO_WORKER_SLEEP_ALGO = 'cfba846c7763faf63877329b227c2b23af4fc568'
BATCH_DEQUEUE_ANYWHERE = '6d63965bccb9c9612b85bbb46c31a1eacd3c6693'
CQ_LOCK_CHECKOUT = 'muhq/cq_lock'
BUF_SEL_CHECKOUT = 'muhq/buffer-selection'
SQPOLL_CHECKOUT = 'muhq/sqpoll'
FlavorDesc = dict[str, Union[list[str], str]]
Flavors = dict[str, FlavorDesc]
# Maps a flavor name to its description. Observed keys: 'checkout' (git ref
# to build), 'meson_options' (extra meson setup options), 'patches' (patch
# files applied to the worktree), 'server_cmd_prefix'/'server_cmd_suffix'
# (presumably wrapped around the server command -- verify in the consumer).
FLAVORS: Flavors = {
    'vanilla': {},
    #'buf-sel': {
    #    'checkout': BUF_SEL_CHECKOUT,
    #},
    'pipe': {
        'meson_options': '-Dworker_sleep_strategy=pipe'
    },
    'pipe-no-completer': {
        'meson_options':
        '-Dworker_sleep_strategy=pipe -Dio_completer_behavior=none'
    },
    'no-stats': {
        'meson_options': '-Dstats=False'
    },
    'heap-alloc-continuation-buf': {
        'checkout': REAP_COMPLETIONS_PROVIDE_MEMORY_HEAP
    },
    'before-batch-cqe': {
        'checkout': BEFORE_BATCH_CQE_CHECKOUT,
        'meson_options': '-Dlog_level=OFF',
        'patches': ['fix_free_vs_delete.patch', 'before-batch-cqe.patch'],
    },
    'notify-all': {
        'meson_options': '-Dworker_wakeup_strategy=all'
    },
    'no-sleep': {
        'meson_options': '-Dworker_sleep=false'
    },
    'sqpoll': {
        'meson_options': '-Dio_uring_sq_poller=one',
        'checkout': SQPOLL_CHECKOUT,
        'patches': ['getcpu.patch']
    },
    'sqpoll-numa': {
        'meson_options': '-Dio_uring_sq_poller=numa',
        'checkout': SQPOLL_CHECKOUT,
        'patches': ['getcpu.patch']
    },
    'callback': {
        'patches': ['use_callback_echoserver.patch']
    },
    'wakeup-condvar': {
        'checkout': WAKEUP_CONDVAR_CHECKOUT,
        'patches': ['fix_free_vs_delete.patch', 'wakeup-condvar.patch'],
    },
    'wakeup-all-condvar': {
        'checkout': WAKEUP_CONDVAR_CHECKOUT,
        'meson_options': '-Dworker_wakeup_strategy=all',
        'patches': ['fix_free_vs_delete.patch', 'wakeup-condvar.patch'],
    },
    'wakeup-futex': {
        'meson_options': '-Dwakeup_semaphore_implementation=futex'
    },
    'wakeup-all-futex': {
        'meson_options':
        '-Dworker_wakeup_strategy=all -Dwakeup_semaphore_implementation=futex'
    },
    # Bug fix in the next three flavors: the key was 'meson_option'
    # (singular), unlike every other flavor, so the queue-implementation
    # option was presumably never picked up by the build.
    'shared-mutex': {
        'meson_options': '-Dlocked_unbounded_queue_implementation=shared_mutex'
    },
    'boost-shared-mutex': {
        'meson_options':
        '-Dlocked_unbounded_queue_implementation=boost_shared_mutex'
    },
    'pthread-rwlock': {
        'meson_options': '-Dlocked_unbounded_queue_implementation=rwlock'
    },
    'cq-mutex': {
        'meson_options': '-Dio_cq_lock_implementation=mutex'
    },
    'cq-spin-lock': {
        'meson_options': '-Dio_cq_lock_implementation=spin_lock'
    },
    'big-reap-cqe-section': {
        'checkout': BEFORE_COUNTING_TRY_LOCK_CHECKOUT,
        'meson_options': '-Dlog_level=OFF',
        'patches': ['fix_free_vs_delete.patch', 'before-batch-cqe.patch'],
    },
    'completer-wakeup': {
        'meson_options': '-Dio_completer_behavior=maybe_wakeup'
    },
    'no-sleep-computation-100us': {
        'meson_options': '-Dworker_sleep=false',
        'server_cmd_suffix': '100'
    },
    'no-sleep-no-completer-computation-100us': {
        'meson_options':
        '-Dworker_sleep=false -Dio_completer_behavior=maybe_wakeup',
        'server_cmd_suffix': '100'
    },
    # NOTE(review): some suffixes below carry a leading space (' 1000',
    # ' 200') while others do not -- confirm which form the consumer expects.
    'no-sleep-computation-1ms': {
        'meson_options': '-Dworker_sleep=false',
        'server_cmd_suffix': ' 1000'
    },
    'no-sleep-no-completer-computation-1ms': {
        'meson_options':
        '-Dworker_sleep=false -Dio_completer_behavior=maybe_wakeup',
        'server_cmd_suffix': ' 1000'
    },
    'computation-100us': {
        'server_cmd_suffix': '100'
    },
    'computation-200us': {
        'server_cmd_suffix': ' 200'
    },
    'completer-wakeup-computation-100us': {
        'meson_options': '-Dio_completer_behavior=maybe_wakeup',
        'server_cmd_suffix': '100'
    },
    'computation-10us': {
        'server_cmd_suffix': '10'
    },
    'completer-wakeup-computation-10us': {
        'meson_options': '-Dio_completer_behavior=maybe_wakeup',
        'server_cmd_suffix': '10'
    },
    'mutex-computation-100us': {
        'meson_options': '-Dio_cq_lock_implementation=mutex',
        'server_cmd_suffix': '100'
    },
    'simple-arch': {
        'meson_options': '-Dio_single_uring=true',
    },
    'simple-arch-try-syscall': {
        'meson_options': '-Dio_single_uring=true -Dio_try_syscall=true',
    },
    'first': {
        'checkout': FIRST_CHECKOUT,
        'patches': ['fix_free_vs_delete.patch', 'wakeup-condvar.patch'],
    },
    '4': {
        'server_cmd_prefix': 'env WORKER_COUNT=4'
    },
    '20': {
        'server_cmd_prefix': ENV_CMD_20_WORKER,
    },
    '20-nopin': {
        'server_cmd_prefix': ENV_CMD_20_WORKER + ' PIN_WORKERS=false',
    },
    '20-computation-100us': {
        'server_cmd_prefix': ENV_CMD_20_WORKER,
        'server_cmd_suffix': '100'
    },
    '20-nopin-computation-100us': {
        'server_cmd_prefix': ENV_CMD_20_WORKER + ' PIN_WORKERS=false',
        'server_cmd_suffix': '100'
    },
    '20-computation-200us': {
        'server_cmd_prefix': ENV_CMD_20_WORKER,
        'server_cmd_suffix': '200'
    },
    '20-nopin-computation-200us': {
        'server_cmd_prefix': ENV_CMD_20_WORKER + ' PIN_WORKERS=false',
        'server_cmd_suffix': '200'
    },
    'linked-futures': {
        'checkout': LINKED_FUTURE_CHECKOUT,
        'patches': ['fix_free_vs_delete.patch', 'wakeup-condvar.patch'],
    },
    'linked-futures-notify-all': {
        'checkout': LINKED_FUTURE_CHECKOUT,
        'patches': ['fix_free_vs_delete.patch', 'wakeup-condvar.patch'],
        'meson_options': '-Dworker_wakeup_strategy=all'
    },
    'before-new-worker-sleep-algo': {
        'checkout': BEFORE_CHANGES_TO_WORKER_SLEEP_ALGO,
        'patches': ['fix_free_vs_delete.patch', 'before-batch-cqe.patch'],
        'meson_options': '-Dlog_level=OFF',
    },
    'bdqfa': {
        'checkout': BATCH_DEQUEUE_ANYWHERE,
    },
    'bdqfa-200us': {
        'checkout': BATCH_DEQUEUE_ANYWHERE,
        'server_cmd_suffix': '200'
    },
    'bdqfa-20-nopin': {
        'checkout': BATCH_DEQUEUE_ANYWHERE,
        'server_cmd_prefix': ENV_CMD_20_WORKER + ' PIN_WORKERS=false',
    },
    'bdqfa-20-nopin-200us': {
        'checkout': BATCH_DEQUEUE_ANYWHERE,
        'server_cmd_prefix': ENV_CMD_20_WORKER + ' PIN_WORKERS=false',
        'server_cmd_suffix': '200'
    },
    'laws': {
        'meson_options':
        ('-Ddefault_scheduling_strategy=locality_aware_work_stealing '
         '-Dset_affinity_on_block=true'),
    },
    'laws-200us': {
        'meson_options':
        ('-Ddefault_scheduling_strategy=locality_aware_work_stealing '
         '-Dset_affinity_on_block=true'),
        'server_cmd_suffix':
        '200'
    },
    'laws-20-nopin': {
        'server_cmd_prefix':
        ENV_CMD_20_WORKER + ' PIN_WORKERS=false',
        'meson_options':
        ('-Ddefault_scheduling_strategy=locality_aware_work_stealing '
         '-Dset_affinity_on_block=true'),
    },
    'laws-20-nopin-200us': {
        'server_cmd_prefix':
        ENV_CMD_20_WORKER + ' PIN_WORKERS=false',
        'meson_options':
        ('-Ddefault_scheduling_strategy=locality_aware_work_stealing '
         '-Dset_affinity_on_block=true'),
        'server_cmd_suffix':
        '200'
    },
    'laws-bdqfa': {
        'checkout':
        BATCH_DEQUEUE_ANYWHERE,
        'meson_options':
        ('-Ddefault_scheduling_strategy=locality_aware_work_stealing '
         '-Dset_affinity_on_block=true'),
    },
    'laws-bdqfa-200us': {
        'checkout':
        BATCH_DEQUEUE_ANYWHERE,
        'meson_options':
        ('-Ddefault_scheduling_strategy=locality_aware_work_stealing '
         '-Dset_affinity_on_block=true'),
        'server_cmd_suffix':
        '200'
    },
    'laws-bdqfa-20-nopin': {
        'checkout':
        BATCH_DEQUEUE_ANYWHERE,
        'server_cmd_prefix':
        ENV_CMD_20_WORKER + ' PIN_WORKERS=false',
        'meson_options':
        ('-Ddefault_scheduling_strategy=locality_aware_work_stealing '
         '-Dset_affinity_on_block=true'),
    },
    'laws-bdqfa-20-nopin-200us': {
        'checkout':
        BATCH_DEQUEUE_ANYWHERE,
        'server_cmd_prefix':
        ENV_CMD_20_WORKER + ' PIN_WORKERS=false',
        'meson_options':
        ('-Ddefault_scheduling_strategy=locality_aware_work_stealing '
         '-Dset_affinity_on_block=true'),
        'server_cmd_suffix':
        '200'
    },
}
def repo_add_muhq_remote():
    """Register the muhq fork as a git remote of the emper repository."""
    cmd_run('git remote add muhq https://gitlab.cs.fau.de/aj46ezos/emper.git',
            log, cwd=REPO)
def checkout_worktree(directory, checkout):
    """Create a new emper worktree of checkout at directory."""
    cmd_run(f'git worktree add {directory} {checkout}', log, cwd=REPO)
def build_emper(emper_dir, meson_options='', build_env=None):
"""Build a emper variant at emper_dir"""
if not build_env:
build_env = {}
# add debug symbols to the release build
build_env.update({'CFLAGS': '-g', 'CXXFLAGS': '-g'})
meson_env = prepare_env(build_env)
common_options = '-Dstats=true --buildtype=release --fatal-meson-warnings'