Verified Commit 7cb036cd authored by Sebastian Endres's avatar Sebastian Endres
Browse files

Extend inject_secrets to accept result.json files

parent 07579c58
......@@ -5,26 +5,28 @@ import os
import subprocess
import sys
from pathlib import Path
from typing import Optional
from typing import Optional, Union
from termcolor import cprint
from result_parser import Result
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"log_dirs",
"spec",
nargs="+",
type=Path,
help="quic-interop-runner log dirs",
help="quic-interop-runner log dirs or result.json files",
)
return parser.parse_args()
class Cli:
def __init__(self, log_dirs: list[Path]):
self.log_dirs = log_dirs
class SecretsInjector:
def __init__(self, specs: list[Union[Path, Result]]):
self.specs = specs
self.num_injected = 0
self.num_already_injected = 0
self.num_no_secret_found = 0
......@@ -91,43 +93,69 @@ class Cli:
if pcap_ng_path.is_file():
pcap_ng_path.unlink()
def inject_in_test_repetition_run_dir(self, test_run_dir: Path):
"""Inject secrets into a test repetition run log_dir."""
keylog_file = self.find_keylog_file(test_run_dir)
if not keylog_file:
self.log(
f"[!] no keylog file found in {test_run_dir}",
color="red",
)
self.num_no_secret_found += 1
return
for pcap_name in ("left", "right"):
pcap_root = test_run_dir / "sim"
stem = f"trace_node_{pcap_name}"
pcap_path = pcap_root / f"{stem}.pcap"
pcap_ng_path = pcap_root / f"{stem}_with_secrets.pcapng"
self.inject(
pcap_path=pcap_path,
pcap_ng_path=pcap_ng_path,
keylog_file=keylog_file,
)
self.log()
def inject_in_result(self, result: Result):
"""Inject secrets in result log dir."""
for test_result in result.all_test_results:
self.inject_in_test_repetition_run_dir(test_result.log_dir_for_test)
for meas_result in result.all_measurement_results:
for repetition_log_dir in meas_result.repetition_log_dirs:
self.inject_in_test_repetition_run_dir(repetition_log_dir)
def inject_in_log_dir(self, log_dir: Path):
"""Inject Secrets"""
for test_run in log_dir.iterdir():
if not test_run.is_dir():
for combination in log_dir.iterdir():
if not combination.is_dir():
continue
for combination in test_run.iterdir():
if not combination.is_dir():
for test_case in combination.iterdir():
if not test_case.is_dir():
continue
for test_case in combination.iterdir():
if not test_case.is_dir():
continue
keylog_file = self.find_keylog_file(test_case)
sim_path = test_case / "sim"
if not keylog_file:
self.log(
f"[!] no keylog file found for {test_case}", color="red"
)
self.num_no_secret_found += 1
if sim_path.is_dir():
# test case
self.inject_in_test_repetition_run_dir(test_case)
else:
# meas test case -> iterate over test repetitions
continue
for repetition_path in test_case.iterdir():
if (
not repetition_path.is_dir()
or not repetition_path.name.isnumeric()
):
continue
for pcap_name in ("left", "right"):
pcap_root = test_case / "sim"
stem = f"trace_node_{pcap_name}"
pcap_path = pcap_root / f"{stem}.pcap"
pcap_ng_path = pcap_root / f"{stem}_with_secrets.pcapng"
self.inject(
pcap_path=pcap_path,
pcap_ng_path=pcap_ng_path,
keylog_file=keylog_file,
)
self.log()
self.inject_in_test_repetition_run_dir(repetition_path)
def log(self, *args, **kwargs):
if args or kwargs:
......@@ -144,15 +172,21 @@ class Cli:
cprint(log_str, attrs=["bold"], end="\r", flush=True)
def run(self):
for log_dir in self.log_dirs:
self.inject_in_log_dir(log_dir)
for spec in self.specs:
if isinstance(spec, Result):
...
self.inject_in_result(spec)
else:
self.inject_in_log_dir(spec)
self.log("[i] Done", color="green", attrs=["bold"])
def main():
args = parse_args()
cli = Cli(log_dirs=args.log_dirs)
cli = SecretsInjector(
specs=[Result(spec) if spec.is_file() else spec for spec in args.spec],
)
cli.run()
......
......@@ -116,30 +116,6 @@ class _ExtendedTestResultMixin(ABC):
return self._base_log_dir / self.combination / self.test.name
@cached_property
def iteration_log_dirs(self) -> list[Path]:
"""Return a list of log dirs for each test iteration."""
iterations = sorted(
iterdir
for iterdir in self.log_dir_for_test.iterdir()
if iterdir.is_dir() and iterdir.name.isnumeric()
)
iteration_nums = [int(iterdir.name) for iterdir in iterations]
for index, cur_num in enumerate(iteration_nums):
if index + 1 != cur_num:
raise AssertionError(
f"Expected the {index}th iteration directory "
f"to be named {index} instead of {cur_num}."
)
return iterations
@property
def num_iterations(self) -> int:
"""Return the number of test iterations found for this test case."""
return len(self.iteration_log_dirs)
@abstractmethod
def to_raw(self):
"""Convert to a raw result."""
......@@ -165,6 +141,31 @@ class ExtendedMeasurementResult(_ExtendedTestResultMixin):
details: str
@cached_property
def repetition_log_dirs(self) -> list[Path]:
"""Return a list of log dirs for each test repetition."""
repetitions = sorted(
iterdir
for iterdir in self.log_dir_for_test.iterdir()
if iterdir.is_dir() and iterdir.name.isnumeric()
)
repetition_nums = [int(iterdir.name) for iterdir in repetitions]
for index, cur_num in enumerate(repetition_nums):
if index + 1 != cur_num:
raise AssertionError(
f"Expected the {index}th repetition directory "
f"to be named {index} instead of {cur_num}."
)
return repetitions
@property
def num_repetitions(self) -> int:
"""Return the number of test repetitions found for this test case."""
return len(self.repetition_log_dirs)
@property
def _details_match(self) -> re.Match:
assert self.succeeded, "Can't parse details, because test did not succeed."
......@@ -396,6 +397,17 @@ class Result:
return self._test_results
@property
def all_test_results(self) -> list[ExtendedTestResult]:
"""Return all test results."""
return [
test_result
for results_by_server in self.test_results.values()
for results_by_client in results_by_server.values()
for test_result in results_by_client.values()
]
def get_test_results_for_combination(
self, server: Union[str, Implementation], client: Union[str, Implementation]
) -> dict[str, ExtendedTestResult]:
......@@ -453,6 +465,17 @@ class Result:
return self._measurement_results
@property
def all_measurement_results(self) -> list[ExtendedMeasurementResult]:
"""Return all measurement results."""
return [
measurement_result
for results_by_server in self.measurement_results.values()
for results_by_client in results_by_server.values()
for measurement_result in results_by_client.values()
]
def get_measurements_for_combination(
self, server: Union[str, Implementation], client: Union[str, Implementation]
) -> dict[str, ExtendedMeasurementResult]:
......
......@@ -203,7 +203,7 @@ class YaspinWrapper:
def create_relpath(path1: Path, path2: Optional[Path] = None) -> Path:
"""Create a relative path for path1 relative to path2."""
"""Create a relative path for path1 relative to path2. TODO this is broken."""
if not path2:
path2 = Path(".")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment