diff --git a/src/benchmark.py b/src/benchmark.py index b436c153c5f73e0ebb04603583b122f326a302db..71e4577735a7097d213c567bac40790c5c0a0022 100644 --- a/src/benchmark.py +++ b/src/benchmark.py @@ -1,3 +1,4 @@ +import atexit from collections import namedtuple import csv import itertools @@ -8,8 +9,10 @@ import os import pickle import shutil import subprocess +from time import sleep import src.globalvars +import src.util from src.util import * @@ -17,6 +20,7 @@ from src.util import * nan = np.NaN + class Benchmark (object): perf_allowed = None @@ -28,10 +32,29 @@ class Benchmark (object): "measure_cmd": "perf stat -x, -d", "cmd": "true", + "server_cmds": [], "allocators": src.globalvars.allocators, - "server_benchmark": False, } + @staticmethod + def terminate_subprocess(popen, timeout=5): + """Terminate or kill a Popen object""" + + # Skip already terminated subprocess + if popen.poll() is not None: + return + + print_info("Terminating subprocess", popen.args) + popen.terminate() + try: + print_info("Subprocess exited with ", popen.wait(timeout=timeout)) + except: + print_error("Killing subprocess ", server.args) + popen.kill() + popen.wait() + print_debug("Server Out:", popen.stdout) + print_debug("Server Err:", popen.stderr) + @staticmethod def scale_threads_for_cpus(factor, steps=None): cpus = multiprocessing.cpu_count() @@ -82,6 +105,7 @@ class Benchmark (object): print_debug("Creating benchmark", self.name) print_debug("Cmd:", self.cmd) + print_debug("Server Cmds:", self.server_cmds) print_debug("Args:", self.args) print_debug("Requirements:", self.requirements) print_debug("Results dictionary:", self.results) @@ -168,6 +192,50 @@ class Benchmark (object): if is_fixed: yield p + def start_servers(self, env=None, alloc_name="None", alloc={"cmd_prefix": ""}): + """Start Servers + + Servers are not allowed to deamonize because then they can't + be terminated with their Popen object.""" + self.servers = [] + + substitutions = {"alloc": alloc_name, + "perm": alloc_name, + "builddir": src.globalvars.builddir} + + substitutions.update(alloc) + + for server_cmd in self.server_cmds: + print_info("Starting Server for", alloc_name) + + server_cmd = src.util.prefix_cmd_with_abspath(server_cmd) + server_cmd = "{} {} {}".format(self.measure_cmd, + alloc["cmd_prefix"], + server_cmd) + + server_cmd = server_cmd.format(**substitutions) + print_debug(server_cmd) + + server = subprocess.Popen(server_cmd.split(), env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True) + + #TODO check if server is up + sleep(5) + + ret = server.poll() + if ret is not None: + raise Exception("Starting Server failed with exit code " + str(ret)) + # Register termination of the server + atexit.register(Benchmark.terminate_subprocess, popen=server) + self.servers.append(server) + + def shutdown_servers(self): + print_info("Shutting down servers") + for server in self.servers: + Benchmark.terminate_subprocess(server) + def run(self, runs=5): if runs < 1: return @@ -199,19 +267,23 @@ class Benchmark (object): print_status(run, ". run", sep='') i = 0 - for alloc_name, t in self.allocators.items(): + for alloc_name, alloc in self.allocators.items(): if alloc_name not in self.results: self.results[alloc_name] = {} env = dict(os.environ) env["LD_PRELOAD"] = env.get("LD_PRELOAD", "") env["LD_PRELOAD"] += " " + "build/print_status_on_exit.so" - env["LD_PRELOAD"] += " " + t["LD_PRELOAD"] + env["LD_PRELOAD"] += " " + alloc["LD_PRELOAD"] + + self.start_servers(alloc_name=alloc_name, alloc=alloc, env=env) + # Preallocator hook if hasattr(self, "preallocator_hook"): - self.preallocator_hook((alloc_name, t), run, env, + self.preallocator_hook((alloc_name, alloc), run, env, verbose=src.globalvars.verbosity) + # Run benchmark for alloc for perm in self.iterate_args(): i += 1 print_info0(i, "of", n, "\r", end='') @@ -220,25 +292,17 @@ class Benchmark (object): substitutions = {"run": run} substitutions.update(perm._asdict()) substitutions["perm"] = ("{}-"*(len(perm)-1) + "{}").format(*perm) - substitutions.update(t) + substitutions.update(alloc) actual_cmd = self.cmd.format(**substitutions) actual_env = None - if not self.server_benchmark: - # Find absolute path of executable - binary_end = actual_cmd.find(" ") - binary_end = None if binary_end == -1 else binary_end - cmd_start = len(actual_cmd) if binary_end == None else binary_end - - binary = subprocess.run(["whereis", actual_cmd[0:binary_end]], - stdout=subprocess.PIPE, - universal_newlines=True).stdout.split()[1] - + # Prepend cmd if we are not measuring servers + if self.server_cmds == []: + actual_cmd = src.util.prefix_cmd_with_abspath(actual_cmd) actual_cmd = "{} {} {}{}".format(self.measure_cmd, - t["cmd_prefix"], - binary, - actual_cmd[cmd_start:]) + alloc["cmd_prefix"], + actual_cmd) # substitute again actual_cmd = actual_cmd.format(**substitutions) @@ -264,7 +328,7 @@ class Benchmark (object): # parse and store results else: - if not self.server_benchmark: + if self.server_cmds == []: # Read VmHWM from status file. If our benchmark didn't fork # the first occurance of VmHWM is from our benchmark with open("status", "r") as f: @@ -273,18 +337,21 @@ class Benchmark (object): result["VmHWM"] = l.split()[1] break os.remove("status") - - # Parse perf output if available - if self.measure_cmd == self.defaults["measure_cmd"]: - csvreader = csv.reader(res.stderr.splitlines(), - delimiter=',') - for row in csvreader: - # Split of the user/kernel space info to be better portable - try: - result[row[2].split(":")[0]] = row[0] - except Exception as e: - print_warn("Exception", e, "occured on", row, "for", - alloc_name, "and", perm) + # TODO get VmHWM from servers + else: + pass + + # Parse perf output if available + if self.measure_cmd == self.defaults["measure_cmd"]: + csvreader = csv.reader(res.stderr.splitlines(), + delimiter=',') + for row in csvreader: + # Split of the user/kernel space info to be better portable + try: + result[row[2].split(":")[0]] = row[0] + except Exception as e: + print_warn("Exception", e, "occured on", row, "for", + alloc_name, "and", perm) if hasattr(self, "process_output"): self.process_output(result, res.stdout, res.stderr, @@ -299,9 +366,12 @@ class Benchmark (object): self.results[alloc_name][perm] = [] self.results[alloc_name][perm].append(result) + self.shutdown_servers() + if hasattr(self, "postallocator_hook"): - self.postallocator_hook((alloc_name, t), run, + self.postallocator_hook((alloc_name, alloc), run, verbose=src.globalvars.verbosity) + print() # Reset PATH diff --git a/src/benchmarks/httpd.py b/src/benchmarks/httpd.py index de59d4e02052a0b2b53cad0fe6f0677899406108..980a6cc01ce801ed2d6938b83e3b3cda6f30d472 100644 --- a/src/benchmarks/httpd.py +++ b/src/benchmarks/httpd.py @@ -1,4 +1,3 @@ -import atexit import matplotlib.pyplot as plt import numpy as np import re @@ -8,55 +7,26 @@ from subprocess import PIPE import sys from time import sleep -from src.globalvars import builddir from src.benchmark import Benchmark from src.util import * -server_cmd = "{} -c {}/benchmarks/httpd/nginx/nginx.conf".format(shutil.which("nginx"), builddir).split() - class Benchmark_HTTPD(Benchmark): def __init__(self): self.name = "httpd" self.descrition = """TODO""" - self.args = {"nthreads": Benchmark.scale_threads_for_cpus(2)} - self.cmd = "ab -n 10000 -c {nthreads} localhost:8080/index.html" + self.args = {"nthreads": Benchmark.scale_threads_for_cpus(2), + "site": ["index.html", "index.php"]} + self.cmd = "ab -n 100 -c {nthreads} localhost:8080/{site}" self.measure_cmd = "" - self.server_benchmark = True + self.server_cmds = ["nginx -c {builddir}/benchmarks/httpd/etc/nginx/nginx.conf", + "php-fpm -c {builddir}/benchmarks/httpd/etc/php/php.ini -y {builddir}/benchmarks/httpd/etc/php/php-fpm.conf -F"] self.requirements = ["nginx", "ab"] - atexit.register(self.terminate_server) - super().__init__() - def terminate_server(self): - # check if nginx is running - if os.path.isfile(os.path.join(builddir, "benchmarks", self.name, "nginx", "nginx.pid")): - ret = subprocess.run(server_cmd + ["-s", "stop"], stdout=PIPE, - stderr=PIPE, universal_newlines=True) - - if ret.returncode != 0: - print_debug("Stdout:", ret.stdout) - print_debug("Stderr:", ret.stderr) - raise Exception("Stopping {} failed with {}".format(server_cmd[0], ret.returncode)) - - def preallocator_hook(self, allocator, run, env, verbose): - actual_cmd = allocator[1]["cmd_prefix"].split() + server_cmd - print_info("Starting server with:", actual_cmd) - - ret = subprocess.run(actual_cmd, stdout=PIPE, stderr=PIPE, env=env, - universal_newlines=True) - if ret.returncode != 0: - print_debug("Stdout:", ret.stdout) - print_debug("Stderr:", ret.stderr) - raise Exception("Starting {} for {} failed with {}".format(server_cmd[0], allocator[0], ret.returncode)) - - - def postallocator_hook(self, allocator, run, verbose): - self.terminate_server() - def process_output(self, result, stdout, stderr, allocator, perm, verbose): result["time"] = re.search("Time taken for tests:\s*(\d*\.\d*) seconds", stdout).group(1) result["requests"] = re.search("Requests per second:\s*(\d*\.\d*) .*", stdout).group(1) @@ -74,33 +44,29 @@ class Benchmark_HTTPD(Benchmark): self.calc_desc_statistics() # linear plot - self.plot_single_arg("{requests}", + self.plot_fixed_arg("{requests}", xlabel='"threads"', ylabel='"requests/s"', - title='"ab -n 10000 -c threads"') + autoticks=False, + filepostfix="requests", + title='"ab -n 10000 -c " + str(perm.nthreads)') # linear plot ref_alloc = list(allocators)[0] - self.plot_single_arg("{requests}", + self.plot_fixed_arg("{requests}", xlabel='"threads"', ylabel='"requests/s scaled at " + scale', - title='"ab -n 10000 -c threads (normalized)"', - filepostfix="norm", + title='"ab -n 10000 -c " + str(perm.nthreads) + " (normalized)"', + filepostfix="requests.norm", + autoticks=False, scale=ref_alloc) # bar plot - self.barplot_single_arg("{requests}", - xlabel='"threads"', - ylabel='"requests/s"', - filepostfix="b", - title='"ab -n 10000 -c threads"') + # self.barplot_fixed_arg("{requests}", + # xlabel='"threads"', + # ylabel='"requests/s"', + # filepostfix="b", + # title='"ab -n 10000 -c threads"') - # bar plot - self.barplot_single_arg("{requests}", - xlabel='"threads"', - ylabel='"requests/s scaled at " + scale', - title='"ab -n 10000 -c threads (normalized)"', - filepostfix="norm.b.", - scale=ref_alloc) httpd = Benchmark_HTTPD() diff --git a/src/benchmarks/mysql.py b/src/benchmarks/mysql.py index 752a4ea52fb85a278d8aac06bc7ff058a9ea3a82..fb9c2e7059a450ff72fd43ca3ea6a147a602d28f 100644 --- a/src/benchmarks/mysql.py +++ b/src/benchmarks/mysql.py @@ -1,4 +1,3 @@ -import atexit import copy import matplotlib.pyplot as plt import multiprocessing @@ -21,14 +20,12 @@ prepare_cmd = ("sysbench oltp_read_only --db-driver=mysql --mysql-user=root " "--mysql-socket=" + cwd + "/mysql_test/socket --tables=5 " "--table-size=1000000 prepare").split() -cmd = ("sysbench oltp_read_only --threads={nthreads} --time=60 --tables=5 " +cmd = ("sysbench oltp_read_only --threads={nthreads} --time=10 --tables=5 " "--db-driver=mysql --mysql-user=root --mysql-socket=" + cwd + "/mysql_test/socket run") -server_cmd = ("{0} -h {2}/mysql_test --socket={2}/mysql_test/socket " - "--max-connections={1} " - "--secure-file-priv=").format(shutil.which("mysqld"), - multiprocessing.cpu_count(), cwd).split() +server_cmd = ("mysqld -h {0}/mysql_test --socket={0}/mysql_test/socket " + "--max-connections={1} --secure-file-priv=").format(cwd, multiprocessing.cpu_count()) class Benchmark_MYSQL(Benchmark): @@ -38,49 +35,18 @@ class Benchmark_MYSQL(Benchmark): # mysqld fails with hoard somehow self.allocators = copy.copy(allocators) - if "hoard" in self.allocators: - del(self.allocators["hoard"]) + if "Hoard" in self.allocators: + del(self.allocators["Hoard"]) self.args = {"nthreads": Benchmark.scale_threads_for_cpus(1)} self.cmd = cmd + self.server_cmds = [server_cmd] self.measure_cmd = "" self.requirements = ["mysqld", "sysbench"] - atexit.register(self.terminate_server) - super().__init__() - def start_and_wait_for_server(self, cmd_prefix="", env=None): - actual_cmd = cmd_prefix.split() + server_cmd - print_info("Starting server with:", actual_cmd) - - self.server = subprocess.Popen(actual_cmd, stdout=PIPE, stderr=PIPE, - env=env, universal_newlines=True) - # TODO make sure server comes up ! - sleep(10) - if self.server.poll() is not None: - print_debug("cmd_prefix:", cmd_prefix, file=sys.stderr) - print_debug("Stderr:", self.server.stderr, file=sys.stderr) - return False - - return True - - def terminate_server(self): - if hasattr(self, "server"): - if self.server.poll() is None: - print_info("Terminating mysql server") - self.server.terminate() - - for i in range(0,10): - sleep(1) - if self.server.poll() is not None: - return - - print_info("Killing still running mysql server") - self.server.kill() - self.server.wait() - def prepare(self): super().prepare() @@ -106,8 +72,7 @@ class Benchmark_MYSQL(Benchmark): print_debug(p.stderr, file=sys.stderr) raise Exception("Creating test DB failed with:", p.returncode) - if not self.start_and_wait_for_server(): - raise Exception("Starting mysql server failed with {}".format(self.server.returncode)) + self.start_servers() # Create sbtest TABLE p = subprocess.run(("mysql -u root -S "+cwd+"/mysql_test/socket").split(" "), @@ -128,21 +93,13 @@ class Benchmark_MYSQL(Benchmark): self.terminate_server() raise Exception("Preparing test tables failed with:", p.returncode) - self.terminate_server() + self.shutdown_servers() def cleanup(self): if os.path.exists("mysql_test"): print_status("Delete mysqld directory") shutil.rmtree("mysql_test", ignore_errors=True) - def preallocator_hook(self, allocator, run, env, verbose): - if not self.start_and_wait_for_server(cmd_prefix=allocator[1]["cmd_prefix"], env=env): - print_debug(allocator[1]["cmd_prefix"], file=sys.stderr) - raise Exception("Starting mysql server for {} failed with".format(allocator[0], self.server.returncode)) - - def postallocator_hook(self, allocator, run, verbose): - self.terminate_server() - def process_output(self, result, stdout, stderr, allocator, perm, verbose): result["transactions"] = re.search("transactions:\s*(\d*)", stdout).group(1) result["queries"] = re.search("queries:\s*(\d*)", stdout).group(1) @@ -151,7 +108,7 @@ class Benchmark_MYSQL(Benchmark): result["avg"] = re.search("avg:\s*(\d*.\d*)", stdout).group(1) result["max"] = re.search("max:\s*(\d*.\d*)", stdout).group(1) - with open("/proc/"+str(self.server.pid)+"/status", "r") as f: + with open("/proc/"+str(self.servers[0].pid)+"/status", "r") as f: for l in f.readlines(): if l.startswith("VmHWM:"): result["rssmax"] = int(l.replace("VmHWM:", "").strip().split()[0])