Verified Commit 1da94eeb authored by Sebastian Endres's avatar Sebastian Endres
Browse files

Extend inject_secrets to do generic post processing (frickel commit)

parent 871efaf7
"""Some enums."""
from enum import Enum, IntFlag
from typing import Literal, Union
class PlotMode(Enum):
"""The plot type."""
OFFSET_NUMBER = "offset-number"
PACKET_NUMBER = "packet-number"
FILE_SIZE = "file-size"
PACKET_SIZE = "packet-size"
# SIZE_HIST = "size-hist"
class CacheMode(Enum):
"""The mode of caching."""
NONE = "none"
LOAD = "load"
STORE = "store"
BOTH = "both"
@property
def load(self) -> bool:
"""Is loading enabled?"""
return self in (CacheMode.LOAD, CacheMode.BOTH)
@property
def store(self) -> bool:
"""Is storing enabled?"""
return self in (CacheMode.STORE, CacheMode.BOTH)
class Direction(Enum):
"""Packet flow direction."""
TO_CLIENT = "to_client"
TO_SERVER = "to_server"
def is_opposite(self, other) -> bool:
"""Return True if the direction is the other direction than this."""
return (self == Direction.TO_CLIENT and other == Direction.TO_SERVER) or (
self == Direction.TO_SERVER and other == Direction.TO_CLIENT
)
class Side(Enum):
"""The side of a trace (left=client / right=server)."""
LEFT = "left"
RIGHT = "right"
class PostProcessingMode(IntFlag):
"""The mode of post processing."""
INJECT_SECRETS = 0b01
RENAME_QLOGS = 0b10
ALL = 0b11
NONE = 0b00
@classmethod
def from_str(cls, value: str) -> "PostProcessingMode":
"""Parse from string."""
lookup: dict[str, PostProcessingMode] = {
flag.name.lower(): flag for flag in PostProcessingMode
}
return lookup[value.lower()]
#!/usr/bin/env python3
"""
Post process interop runner logs.
This contains:
- inject secrets
- search and rename qlog files
"""
import argparse
import os
import shutil
import subprocess
import sys
from pathlib import Path
......@@ -9,11 +18,13 @@ from typing import Optional, Union
from termcolor import colored, cprint
from enums import PostProcessingMode
from result_parser import Result
from utils import YaspinWrapper
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument(
"spec",
......@@ -21,6 +32,13 @@ def parse_args():
type=Path,
help="quic-interop-runner log dirs or result.json files",
)
parser.add_argument(
"--mode",
action="store",
default=PostProcessingMode.ALL,
type=PostProcessingMode.from_str,
help="The mode of post processing.",
)
parser.add_argument(
"--clean",
action="store_true",
......@@ -35,18 +53,60 @@ def parse_args():
return parser.parse_args()
class SecretsInjector:
def __init__(self, specs: list[Union[Path, Result]], clean=False, debug=False):
class PostProcessor:
"""The post processor."""
def __init__(
self,
specs: list[Union[Path, Result]],
clean=False,
debug=False,
mode=PostProcessingMode.ALL,
):
self.mode = mode
self.specs = specs
self.num_injected = 0
self.num_already_injected = 0
self.num_no_secret_found = 0
self.num_failed = 0
self.num_inject_failed = 0
self.num_qlog_moved = 0
self.num_qlog_already_moved = 0
self.num_no_qlog_found = 0
self._max_log_str_len = 0
self.debug = debug
self.clean = clean
self._spinner: Optional[YaspinWrapper] = None
def find_qlog_test_run_dir(self, test_run_path: Path, side: str) -> Optional[Path]:
"""Find a qlog file in ``test_run_path`` for ``side``."""
side_root = test_run_path / side
qlog_dirs = (
side_root,
side_root / "qlog",
)
qlog_files = list[Path]()
for qlog_dir in qlog_dirs:
if not qlog_dir.is_dir():
continue
for file in qlog_dir.iterdir():
if file.is_file() and file.suffix == ".qlog":
qlog_files.append(file)
if len(qlog_files) == 1:
return qlog_files.pop()
elif len(qlog_files) == 0:
return None
else:
self.log(
f"⚒ found more than one qlog file in {side_root}. Ignoring.",
color="yellow",
)
return None
def find_keylog_file(self, test_case_path: Path) -> Optional[Path]:
"""Find the keylog file."""
results = list[Path]()
......@@ -67,15 +127,64 @@ class SecretsInjector:
return results[0]
def inject(self, pcap_path: Path, pcap_ng_path: Path, keylog_file: Path):
if pcap_ng_path.is_file() and pcap_ng_path.stat().st_size > 0:
self.log(
f"⚒ {pcap_ng_path} already exists. Skipping.",
color="grey",
)
self.num_already_injected += 1
def rename_qlogs_in_test_repetition_run_dir(self, test_run_dir: Path):
"""Rename QLOG files in test repetition run dir."""
return
for side in ("server", "client"):
target = test_run_dir / f"{side}.qlog"
exists = False
if target.is_symlink():
if not self.clean:
exists = True
else:
if self.clean and target.is_file():
exists = True
try:
if exists and target.stat().st_size > 0:
self.log(
f"⚒ {target} already exists. Skipping.",
color="grey",
)
self.num_qlog_already_moved += 1
continue
except FileNotFoundError as err:
# broken symlink
self.log(
f"⚒ Error while checking qlog target: {err}",
color="yellow",
)
try:
target.unlink()
except FileNotFoundError:
pass
qlog_file = self.find_qlog_test_run_dir(test_run_dir, side)
if not qlog_file:
self.log(
f"⨯ no qlog file found in {test_run_dir} for {side}",
color="red",
)
self.num_no_qlog_found += 1
continue
if self.clean:
shutil.move(qlog_file, target)
else:
target.symlink_to(qlog_file.relative_to(target.parent))
self.num_qlog_moved += 1
self.log()
def inject(self, pcap_path: Path, pcap_ng_path: Path, keylog_file: Path):
"""Inject keylog file into pcap."""
if not pcap_path.is_file():
self.log(
......@@ -108,23 +217,15 @@ class SecretsInjector:
f"⨯ Failed to inject secrets {keylog_file} into {pcap_path}.",
color="red",
)
self.num_failed += 1
self.num_inject_failed += 1
if pcap_ng_path.is_file():
pcap_ng_path.unlink()
def inject_in_test_repetition_run_dir(self, test_run_dir: Path):
"""Inject secrets into a test repetition run log_dir."""
keylog_file = self.find_keylog_file(test_run_dir)
if not keylog_file:
self.log(
f"⨯ no keylog file found in {test_run_dir}",
color="red",
)
self.num_no_secret_found += 1
def inject_secrets_in_test_repetition_run_dir(self, test_run_dir: Path):
"""docstring for inject_secrets_in_test_repetition_run_dir"""
return
keylog_file = self.find_keylog_file(test_run_dir)
for pcap_name in ("left", "right"):
pcap_root = test_run_dir / "sim"
......@@ -132,6 +233,27 @@ class SecretsInjector:
pcap_path = pcap_root / f"{stem}.pcap"
pcap_ng_path = pcap_root / f"{stem}_with_secrets.pcapng"
if pcap_ng_path.is_file() and pcap_ng_path.stat().st_size > 0:
if self.clean and pcap_path.is_file():
pcap_path.unlink()
self.log(
f"⚒ {pcap_ng_path} already exists. Skipping.",
color="grey",
)
self.num_already_injected += 1
continue
if not keylog_file:
self.log(
f"⨯ no keylog file found in {test_run_dir}",
color="red",
)
self.num_no_secret_found += 1
continue
self.inject(
pcap_path=pcap_path,
pcap_ng_path=pcap_ng_path,
......@@ -140,24 +262,36 @@ class SecretsInjector:
self.log()
def inject_in_result(self, result: Result):
"""Inject secrets in result log dir."""
if self.clean and keylog_file and keylog_file.is_file():
keylog_file.unlink()
def post_process_test_repetition_run_dir(self, test_run_dir: Path):
"""Inject secrets into a test repetition run log_dir."""
if PostProcessingMode.INJECT_SECRETS in self.mode:
self.inject_secrets_in_test_repetition_run_dir(test_run_dir)
if PostProcessingMode.RENAME_QLOGS in self.mode:
self.rename_qlogs_in_test_repetition_run_dir(test_run_dir)
def post_process_result(self, result: Result):
"""Post process in result log dir."""
for test_result in result.all_test_results:
if test_result.result == "unsupported":
continue
self.inject_in_test_repetition_run_dir(test_result.log_dir_for_test)
self.post_process_test_repetition_run_dir(test_result.log_dir_for_test)
for meas_result in result.all_measurement_results:
if meas_result.result == "unsupported":
continue
for repetition_log_dir in meas_result.repetition_log_dirs:
self.inject_in_test_repetition_run_dir(repetition_log_dir)
self.post_process_test_repetition_run_dir(repetition_log_dir)
def inject_in_log_dir(self, log_dir: Path):
"""Inject secrets."""
def post_process_log_dir(self, log_dir: Path):
"""Post process inside a log dir."""
for combination in log_dir.iterdir():
if not combination.is_dir():
......@@ -171,7 +305,7 @@ class SecretsInjector:
if sim_path.is_dir():
# test case
self.inject_in_test_repetition_run_dir(test_case)
self.post_process_test_repetition_run_dir(test_case)
else:
# meas test case -> iterate over test repetitions
......@@ -182,9 +316,10 @@ class SecretsInjector:
):
continue
self.inject_in_test_repetition_run_dir(repetition_path)
self.post_process_test_repetition_run_dir(repetition_path)
def log(self, *args, **kwargs):
"""Log a message."""
msg: Optional[str] = None
if args or kwargs:
......@@ -192,14 +327,15 @@ class SecretsInjector:
log_str = ", ".join(
(
colored(f"injected: {self.num_injected}", color="green"),
colored(f"failed: {self.num_failed}", color="red"),
colored(
f"already injected: {self.num_already_injected}", color="yellow"
),
colored(
f"no keylog file found: {self.num_no_secret_found}", color="red"
),
colored("Secrets", color="white", attrs=["bold"]),
colored(f"inj.: {self.num_injected}", color="green"),
colored(f"fail: {self.num_inject_failed}", color="red"),
colored(f"already inj.: {self.num_already_injected}", color="yellow"),
colored(f"not found: {self.num_no_secret_found}", color="red"),
colored("QLOGS", color="white", attrs=["bold"]),
colored(f"renamed: {self.num_qlog_moved}", color="green"),
colored(f"already ren.: {self.num_qlog_already_moved}", color="yellow"),
colored(f"not found: {self.num_no_qlog_found}", color="red"),
)
)
self._max_log_str_len = max(self._max_log_str_len, len(log_str))
......@@ -214,6 +350,7 @@ class SecretsInjector:
cprint(f"⚒ {log_str}", attrs=["bold"], end="\r", flush=True)
def run(self):
"""Run the post processor."""
with YaspinWrapper(
debug=self.debug, text="Injecting", color="green"
) as spinner:
......@@ -221,19 +358,21 @@ class SecretsInjector:
for spec in self.specs:
if isinstance(spec, Result):
self.inject_in_result(spec)
self.post_process_result(spec)
else:
self.inject_in_log_dir(spec)
self.post_process_log_dir(spec)
spinner.ok("✔")
def main():
"""Run the post processor as cli."""
args = parse_args()
cli = SecretsInjector(
cli = PostProcessor(
specs=[Result(spec) if spec.is_file() else spec for spec in args.spec],
clean=args.clean,
debug=args.debug,
mode=args.mode,
)
cli.run()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment