client.py 6.97 KB
Newer Older
Florian Fischer's avatar
Florian Fischer committed
1
2
3
4
5
6
# Copyright 2021 Florian Fischer
"""Echoclient related code"""

import logging
import os
from pathlib import Path
7
import pprint as pp
Florian Fischer's avatar
Florian Fischer committed
8
9
10
11
12
import subprocess
from typing import Mapping, Sequence, Union

from .benchresult import BenchResult
from .coordinator import Coordinator
13
from .globalvars import EMPER_CLIENT_DIR, CLIENTS_DIR, HOSTNAME, KILL_CMD, TERMINATION_TIME
Florian Fischer's avatar
Florian Fischer committed
14
15
16
17
from .util import cmd_run, prepare_env

log = logging.getLogger(__name__)

# Paths of the available client executables; run_clients selects one via its
# `implementation` argument.
CLIENT_IMPLEMENTATIONS = {
    'emper': f'{EMPER_CLIENT_DIR}/build/apps/echoclient',
    'io_uring': f'{CLIENTS_DIR}/io_uring/build/client',
}

# Output file name templates, filled with the per-run argument prefix.
# NOTE(review): the client's data file (passed via `-f`) gets an '.ini'
# suffix although the constant is called CSV -- confirm this is intended.
CLIENT_CSV = '{arg_prefix}.ini'
CLIENT_OUT = '{arg_prefix}.out'
CLIENT_ERR = '{arg_prefix}.err'

# A time-terminated client is given CLIENT_TIMEOUT_FACTOR times its configured
# runtime before it is considered timed out.
CLIENT_TIMEOUT_FACTOR = 2
# (criterion, amount) used when no termination is passed to run_clients
CLIENT_DEFAULT_TERMINATION = ('time', 60)

# Command line template of an echo client; {termination_flag} is either
# '-t <seconds>' or '-i <iterations>'.
CLIENT_CMD = '{exe} -c {cons} -s {size} {termination_flag} -a {host} -f {outfile} --no-quit'


def purge_empty_files(files: Sequence[Union[str, Path]]):
    """Delete every file of *files* that exists but has no content.

    Paths that do not exist are ignored.
    """
    existing = (path for path in files if os.path.exists(path))
    for path in existing:
        if not os.stat(path).st_size:
            os.remove(path)


def run_clients(run: int,
                args: Mapping,
                result_dir,
                server_host: str,
                substitutions=None,
                termination=None,
                remote_cmd=None,
                hosts=None,
                env=None,
                implementation='emper') -> BenchResult:
    """Run one or multiple local or remote clients for args

    The client command line is built from CLIENT_CMD. Its placeholders are
    filled from args and substitutions, the selected executable, the server
    host and the termination criterion. The output files are placed in
    result_dir and named after run and the values of args (sorted by key).

    Without hosts a single local client is started. Otherwise one client per
    host is started using remote_cmd.

    Args:
        run: number of the current run, first component of the file prefix
        args: client arguments (e.g. 'cons', 'size'), also used to name the
              output files
        result_dir: Path of the directory receiving the output files
        server_host: address of the echo server, passed via '-a'
        substitutions: additional CLIENT_CMD placeholder values; may override
                       'exe' to use a different client executable
        termination: ('time', seconds) or ('iterations', count); defaults to
                     CLIENT_DEFAULT_TERMINATION
        remote_cmd: command prefix (with a '{host}' placeholder) used to run
                    the clients remotely; required when hosts are given
        hosts: hosts to run clients on; None/empty runs one local client
        env: environment for a local client, processed by prepare_env
        implementation: key into CLIENT_IMPLEMENTATIONS

    Returns:
        The BenchResult of the client run(s).

    Raises:
        ValueError: on an unknown termination option or if hosts are given
                    without remote_cmd.
    """
    substitutions = substitutions or {}

    # e.g. run=1, args={'size': 16, 'cons': 4} -> '1.{cons}.{size}'
    arg_prefix_template = '.'.join(
        [str(run)] + [f'{{{key}}}' for key in sorted(args)])
    arg_prefix = arg_prefix_template.format(**args)
    log.debug('output file prefix: %s generated from: %s', arg_prefix,
              arg_prefix_template)

    out = result_dir / CLIENT_OUT.format(arg_prefix=arg_prefix)
    err = result_dir / CLIENT_ERR.format(arg_prefix=arg_prefix)
    csv = result_dir / CLIENT_CSV.format(arg_prefix=arg_prefix)

    termination = termination or CLIENT_DEFAULT_TERMINATION
    if termination[0] == 'time':
        termination_flag = f'-t {termination[1]}'
        # leave the client more time than its configured runtime
        timeout = termination[1] * CLIENT_TIMEOUT_FACTOR
    elif termination[0] == 'iterations':
        termination_flag = f'-i {termination[1]}'
        # TODO: one second per iteration seems too long
        timeout = termination[1]
    else:
        # without a termination flag and timeout no client can be started
        log.error('Unknown termination option: %s', termination[0])
        raise ValueError(f'Unknown termination option: {termination[0]}')

    # Placeholder values shared by local and remote clients. The executable
    # defaults to the selected implementation but may be overridden.
    fmt_args = dict(args)
    fmt_args.update(substitutions)
    if 'exe' not in fmt_args:
        fmt_args['exe'] = CLIENT_IMPLEMENTATIONS[implementation]

    if not hosts:
        cmd = CLIENT_CMD.format(host=server_host,
                                outfile=csv,
                                termination_flag=termination_flag,
                                **fmt_args)
        return run_local_client(cmd, out, err, timeout, env=env)

    if not remote_cmd:
        raise ValueError('remote_cmd is required to run clients on hosts')

    # keep '{outfile}' as placeholder: each remote client gets its own file
    cmd = CLIENT_CMD.format(host=server_host,
                            outfile='{outfile}',
                            termination_flag=termination_flag,
                            **fmt_args)
    return run_remote_clients(cmd, out, err, csv, remote_cmd, hosts, timeout)


def run_local_client(cmd: str,
                     out: str,
                     err: str,
                     timeout,
                     env=None) -> BenchResult:
    """Run a local echo client and wait for it to finish

    Args:
        cmd: complete client command line
        out: file receiving the client's stdout
        err: file receiving the client's stderr
        timeout: seconds to wait for the client before killing it
        env: environment for the client, processed by prepare_env

    Returns:
        BenchResult.OK on success, BenchResult.FAIL if the client exited with
        a non-zero code and BenchResult.TIMEOUT if it did not terminate within
        timeout. Empty output files are removed afterwards.
    """
    ret = BenchResult.OK
    client_env = prepare_env(env or {})
    client = Client(cmd, out, err, env=client_env)
    try:
        if client.wait(timeout) != 0:
            ret = BenchResult.FAIL
            log.error('Client terminated unsuccessful: %s', client.poll())
    except subprocess.TimeoutExpired:
        ret = BenchResult.TIMEOUT
        log.error('Client timed out')
        # do not leave the client running while its output is evaluated
        client.kill()
    finally:
        client.close_out_files()

    purge_empty_files([out, err])

    return ret


def run_remote_clients(cmd: str, out: Path, err: Path, csv: Path,
                       remote_cmd: str, hosts: Sequence[str],
                       timeout) -> BenchResult:
    """Run clients on multiple remote hosts

    A local Coordinator is started with the number of hosts. Every client is
    pointed to it via '--coordinator HOSTNAME'. Each client writes to out,
    err and csv with its host name prepended to the file name.

    Args:
        cmd: client command line containing an '{outfile}' placeholder
        out: template path of the clients' stdout files
        err: template path of the clients' stderr files
        csv: template path of the clients' data files
        remote_cmd: command prefix containing a '{host}' placeholder
        hosts: hosts to start one client each on
        timeout: seconds to wait for each client

    Returns:
        BenchResult.OK if all clients succeeded, otherwise FAIL or TIMEOUT
        of the last client that did not succeed.
    """
    ret = BenchResult.OK
    clients = []
    files = []

    # Start coordinator
    c_out = out.parent / f'coordinator-{out.name}'
    c_err = err.parent / f'coordinator-{err.name}'
    coordinator = Coordinator(len(hosts), out_file=c_out, err_file=c_err)

    cmd += f' --coordinator {HOSTNAME}'

    try:
        for host in hosts:
            # prepend out files with the host the client runs on
            h_out = out.parent / f'{host}-{out.name}'
            h_err = err.parent / f'{host}-{err.name}'
            h_csv = csv.parent / f'{host}-{csv.name}'
            files.extend([h_out, h_err, h_csv])

            r_cmd = remote_cmd.format(host=host)
            h_cmd = cmd.format(outfile=h_csv)
            clients.append(Client(h_cmd, h_out, h_err, remote_cmd=r_cmd,
                                  host=host))

        # await all clients
        for client in clients:
            try:
                if client.wait(timeout) != 0:
                    ret = BenchResult.FAIL
                    log.error('Client at %s terminated unsuccessful: %s',
                              client.host, client.poll())

            except subprocess.TimeoutExpired:
                ret = BenchResult.TIMEOUT
                log.error('Client at %s timed out', client.host)
                client.kill()
    finally:
        # Also runs if starting or awaiting a client raised: never leave
        # clients or the coordinator behind. Client.kill() returns early for
        # clients that already terminated.
        for client in clients:
            client.kill()
            client.close_out_files()

        purge_empty_files(files)

        coordinator.kill()
        coordinator.wait(TERMINATION_TIME)

    return ret


class Client(subprocess.Popen):
    """Class wrapping a remote or local echo client subprocess

    A local client runs cmd directly. A remote client runs cmd prefixed with
    remote_cmd, which is expected to execute the rest of the command line on
    host (presumably an ssh-like wrapper -- confirm with the callers).
    stdout and stderr of the started process are written to out_file and
    err_file.
    """

    # Defined on the class so that __del__ and close_out_files also work on
    # instances whose __init__ failed early (the same trick subprocess.Popen
    # uses for _child_created).
    _client_started = False
    fout = None
    ferr = None

    def __init__(self,
                 cmd: str,
                 out_file: Union[str, Path],
                 err_file: Union[str, Path],
                 env=None,
                 remote_cmd=None,
                 host=None):
        """Start the client process

        Args:
            cmd: client command line, split on whitespace
            out_file: file receiving the process' stdout
            err_file: file receiving the process' stderr
            env: environment of the process (None inherits the current one)
            remote_cmd: optional prefix used to run cmd on a remote host
            host: name of the remote host, required together with remote_cmd

        Raises:
            ValueError: if remote_cmd is given without host.
        """
        self.cmd = cmd
        # the client executable itself; kill() uses it to find remote clients
        self.bin = cmd.split()[0]
        self.remote_cmd = remote_cmd
        # None for local clients
        self.host = host

        if remote_cmd:
            if not host:
                raise ValueError('remote clients require a host')
            self.cmd = f'{remote_cmd} {cmd}'

        log.debug(
            'Creating %s client:\n%s', host or 'local',
            pp.pformat({
                'cmd': self.cmd,
                'out': out_file,
                'err': err_file,
                'host': host
            }))

        self.fout = open(out_file, 'w')
        try:
            self.ferr = open(err_file, 'w')
            # self.cmd includes the remote_cmd prefix for remote clients
            super().__init__(self.cmd.split(),
                             stdout=self.fout,
                             stderr=self.ferr,
                             env=env,
                             text=True)  # type: ignore
        except BaseException:
            # do not leak the output files if the process could not start
            self.close_out_files()
            raise
        self._client_started = True

    def kill(self):
        """Kill a running client and wait for it to terminate

        Local clients are killed with Popen.kill(). For remote clients,
        KILL_CMD is run on the remote host via remote_cmd. If the local
        remote_cmd process then does not exit within TERMINATION_TIME, it is
        killed as well.
        """
        if self.poll() is not None:
            return

        if self.remote_cmd:
            # kill the client using pkill(1) -9
            kill_cmd = f'{self.remote_cmd} {KILL_CMD.format(proc=self.bin)}'

            cmd_run(kill_cmd, log, check=False)
            try:
                self.wait(TERMINATION_TIME)
                return
            except subprocess.TimeoutExpired:
                log.warning('Client at %s did not terminate, killing %s',
                            self.host, self.remote_cmd)

        super().kill()
        self.wait(TERMINATION_TIME)

    def close_out_files(self):
        """Close the output files"""
        for fobj in (self.fout, self.ferr):
            if fobj is not None:
                fobj.close()

    def __del__(self):
        try:
            # the client is still alive
            if self._client_started and self.poll() is None:
                self.kill()
        finally:
            self.close_out_files()
            super().__del__()