Verified Commit 8e0fad5f authored by Sebastian Endres's avatar Sebastian Endres
Browse files

Organize in git repo again

parents
*.pickle
.ipynb_checkpoints
interop.seemann.io/**/index.html*
interop.seemann.io/logs/latest
interop.seemann.io/logs/logs.json
.ssh
logs/
[submodule "evaluation_tools"]
path = evaluation_tools
url = https://gitlab.cs.fau.de/sedrubal/masterarbeit/evaluation_tools.git
[mypy]
ignore_missing_imports = True
# Minimal Alpine image for the QUIC interop results-gathering bot.
FROM alpine
# Install runtime dependencies (Python libraries via apk where packaged,
# yaspin via pip) and set the git identity used for result commits.
RUN apk update \
&& apk upgrade \
&& apk add \
bash \
coreutils \
tmux \
git \
git-lfs \
openssh \
py3-beautifulsoup4 \
py3-matplotlib \
py3-requests \
py3-simplejson \
py3-termcolor \
py3-pip \
&& pip3 install --upgrade yaspin \
&& git config --global user.email "dev@sedrubal.de" \
&& git config --global user.name "QUIC Interop Runner Results Bot"
# NOTE(review): this bakes private SSH key material into an image layer;
# anyone with access to the image can extract it. Consider BuildKit
# secrets or a mounted volume instead.
ADD .ssh/ /root/.ssh/
# Make git use the baked-in deploy key for SSH remotes.
ENV GIT_SSH_COMMAND="ssh -i /root/.ssh/id_ed25519"
WORKDIR /srv/
# Keep the container alive; work is presumably started interactively
# (tmux is installed) -- confirm against the deployment scripts.
CMD ["sleep", "inf"]
#!/usr/bin/env bash
# Build the results-bot docker image from this script's directory.
#
# BUGFIX: the cd was unchecked -- if it failed, `docker build` would run
# against whatever the caller's working directory happened to be.
cd "$(dirname "$0")" || exit 1
exec docker build --tag=quic-interop-runner-results.frickeln.sedrubal.de .
Subproject commit 9935c5da6702003a09af424c3d3d46d0a8e1e082
#!/usr/bin/env python3
"""
Download results from interop.seemann.io
More selective than `wget --mirror --no-parent https://interop.seemann.io/logs/`
"""
import curses
import fnmatch
import json
# import multiprocessing
import os
import random
import re
import sys
import threading
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from queue import Queue
# from multiprocessing import Queue
from typing import Callable, Optional, Union
import requests
import simplejson
import termcolor
from bs4 import BeautifulSoup
# Base URL of the public QUIC Interop Runner results site.
INTEROP_SITE = "https://interop.seemann.io"
# JSON index listing all available test-run timestamps.
LOGS_JSON_URL = f"{INTEROP_SITE}/logs/logs.json"
# Root of the browsable per-run log tree (HTML directory listings).
LOGS_TREE_URL = f"{INTEROP_SITE}/logs/"
# NOTE: deliberately NOT an f-string -- it is filled in later via
# .format(INTEROP_SITE=..., time=...) once the run timestamp is known.
RESULT_URL = "{INTEROP_SITE}/logs/{time}/result.json"
# Local directory that mirrors the remote log tree.
LOGS_PATH = Path("./logs")
# Only these test cases get their detailed per-file logs downloaded.
DOWNLOAD_DETAILED_TESTS = ["longrtt"]
# Streaming download chunk size in bytes.
CHUNK_SIZE = 8192
# Glob patterns of file names worth downloading (trace/capture/key files).
FILES_FILTER = (
    "*.qlog",
    "*.pcap",
    "*.pcapng",
    "*.keys",
    "keys*",
)
# The globs above combined into one case-insensitive regular expression.
FILES_FILTER_PATTERN = re.compile(
    "|".join((fnmatch.translate(pattern) for pattern in FILES_FILTER)),
    re.IGNORECASE,
)
class Colors(Enum):
    """Curses colors."""

    # The values double as curses color-pair numbers, so they must be
    # unique and >= 1 (pair 0 is reserved by curses).
    DANGER = 1
    WARNING = 2
    SUCCESS = 3
    INFO = 4
    MUTED = 5
@dataclass
class Progress:
    """Download progress of one test run (rendered as one screen line)."""

    # Screen row on which this run's status line is rendered.
    index: int
    # Number of server/client/test combinations scheduled for this run.
    num_tests: int
    num_tests_complete: int = 0
    # Files discovered so far vs. files finished (downloaded or listed).
    num_files: int = 0
    num_files_complete: int = 0
class Screen:
    """Dummy screen with similar API as ``curses.Screen``.

    Runs in one of two modes: real curses when ``TERM`` is set, or a
    plain-terminal fallback that prints via ``termcolor``.
    """

    # Dimensions reported when no real terminal is available.
    fallback_width = 120
    fallback_height = 100
    # Maps the color enum to termcolor color names for fallback mode.
    color_lookup = {
        Colors.DANGER: "red",
        Colors.WARNING: "yellow",
        Colors.SUCCESS: "green",
        Colors.INFO: "cyan",
        Colors.MUTED: "blue",
    }

    def __init__(self):
        """Initialize curses screen with fallback."""
        # Buffered error messages (rendered by the caller in curses mode).
        self.error_log = list[str]()
        if os.environ.get("TERM"):
            self._screen = curses.initscr()
            curses.curs_set(0)  # hide the cursor while drawing
            curses.start_color()
            curses.init_pair(Colors.DANGER.value, curses.COLOR_RED, curses.COLOR_BLACK)
            curses.init_pair(
                Colors.WARNING.value, curses.COLOR_YELLOW, curses.COLOR_BLACK
            )
            curses.init_pair(
                Colors.SUCCESS.value, curses.COLOR_GREEN, curses.COLOR_BLACK
            )
            curses.init_pair(Colors.INFO.value, curses.COLOR_CYAN, curses.COLOR_BLACK)
            # NOTE(review): curses renders MUTED as white while the
            # fallback maps MUTED to blue (see color_lookup) -- confirm
            # whether this mismatch is intentional.
            curses.init_pair(Colors.MUTED.value, curses.COLOR_WHITE, curses.COLOR_BLACK)
        else:
            # No terminal available: stay in fallback (print) mode.
            self._screen = None

    @property
    def mode(self) -> str:
        """Return ``"curses"`` or ``"fallback"`` depending on init result."""
        return "fallback" if not self._screen else "curses"

    def log_error(self, err: Union[str, Exception]):
        """Record an error: buffered in curses mode, printed in fallback."""
        if self._screen:
            self.error_log.append(str(err))
        else:
            self.display(0, 0, f"[!] {err}", color=Colors.DANGER)

    def deinit(self):
        """Restore the terminal; no-op in fallback mode."""
        if self._screen:
            self._screen.clear()
            curses.curs_set(1)  # restore the cursor
            curses.endwin()

    def getmaxyx(self) -> tuple[int, int]:
        """Return terminal ``(height, width)``; fixed values in fallback."""
        if self._screen:
            return self._screen.getmaxyx()
        else:
            return self.fallback_height, self.fallback_width

    def display(
        self,
        y: int,
        x: int,
        msg: str,
        color: Optional[Colors] = None,
        underline: bool = False,
        center: bool = False,
    ):
        """Write ``msg`` at ``(y, x)`` in curses mode, or print it.

        In curses mode the message is clipped and padded to the full
        screen width so it overwrites the previous line content; in
        fallback mode empty messages are suppressed entirely.
        """
        if self._screen:
            attrs = curses.color_pair(color.value) if color else 0
            attrs = attrs | curses.A_UNDERLINE if underline else attrs
            _height, width = self._screen.getmaxyx()
            if center:
                msg = msg.strip()[:width].center(width).ljust(width)
            else:
                msg = msg.strip()[:width].ljust(width)
            self._screen.addstr(y, x, msg, attrs)
        else:
            if not msg.strip():
                return
            if center:
                msg = msg.center(self.fallback_width)
            color_str = self.color_lookup[color] if color else None
            termcolor.cprint(
                msg, color=color_str, attrs=["underline"] if underline else []
            )

    def refresh(self):
        """Flush pending curses drawing; no-op in fallback mode."""
        if self._screen:
            self._screen.refresh()

    def clear(self):
        """Clear the curses screen; no-op in fallback mode."""
        if self._screen:
            self._screen.clear()
def format_timedelta(delta: timedelta) -> str:
    """Render *delta* as ``H:MM:SS`` (or ``N days, H:MM:SS``), dropping
    any fractional seconds."""
    whole_seconds = int(delta.total_seconds())
    return str(timedelta(seconds=whole_seconds))
class GatherResultsCli:
def __init__(self):
    """Set up the HTTP session, progress bookkeeping and worker threads."""
    self.session = requests.Session()
    self._num_workers = 10
    # Per-test-run progress, keyed by the run's timestamp string.
    self._progress = dict[str, Progress]()
    self._progress_lock = threading.Lock()
    # Screen is created lazily on first log(); guarded by its own lock.
    self._screen = None
    self._screen_lock = threading.Lock()
    # Work queue of (callable, args) tuples consumed by the workers.
    self._queue = Queue()
    self._workers = [
        threading.Thread(target=self._run_worker)
        for _worker_index in range(self._num_workers)
    ]
    # Global statistics counters.
    self._completed_jobs = 0
    self._downloaded_files = 0
    self._already_downloaded_files = 0
    self._filtered_files = 0
    self._started: Optional[datetime] = None
    self._num_get_requests = 0
def http_get(self, url: str, task_name: Optional[str] = None):
    """GET *url*, count the request, raise on HTTP errors, return JSON.

    :param task_name: test-run key used for on-screen logging; when
        falsy, the request is announced via plain ``print`` instead.
    """
    if not task_name:
        print(" [i] GET", url)
    else:
        self.log(task_name, "info", f"GET {url}")
    response = self.session.get(url)
    self._num_get_requests += 1
    response.raise_for_status()
    return response.json()
def get_dirlisting(self, path: str) -> list[str]:
    """
    Download and parse directory listing.
    Sub directories have a trailing slash.
    """
    response = self.session.get(path)
    self._num_get_requests += 1
    response.raise_for_status()
    listing = BeautifulSoup(response.text, "html.parser")
    entries = list[str]()
    for row in listing.select("tr.file"):
        entry_name = row.select_one(".name").text
        icon_ref = row.select_one("svg use").attrs.get("xlink:href")
        if icon_ref == "#folder":
            # A folder icon marks this entry as a sub directory.
            entry_name = f"{entry_name}/"
        entries.append(entry_name)
    return entries
def get_recent_logs_fallback(self) -> list[str]:
    """Download and parse directory index.

    Keeps only entries whose name (sans trailing slash) parses as an
    ISO timestamp; everything else is reported and ignored.
    """
    recent_logs = list[str]()
    for file_name in self.get_dirlisting(LOGS_TREE_URL):
        result_date_str = file_name.rstrip("/")
        try:
            _result_date = datetime.fromisoformat(result_date_str)
        except ValueError:
            print(
                f" {file_name} does not parse as date. Ignoring.",
                file=sys.stderr,
            )
            continue
        print(f" Found {result_date_str}")
        recent_logs.append(file_name)
    return recent_logs
def get_recent_logs(self) -> list[str]:
    """Download and parse logs.json.

    Falls back to scraping the HTML directory index when the JSON
    endpoint does not return valid JSON; returns ``[]`` when the
    fallback fails with an HTTP error as well.
    """
    print("[i] Downloading recent logs...")
    try:
        return self.http_get(LOGS_JSON_URL)
    except simplejson.errors.JSONDecodeError:
        # requests decodes JSON via simplejson when it is installed,
        # hence this specific exception type.
        print(
            " [!] No valid json found. Falling back to parsing HTML directory index.",
            file=sys.stderr,
        )
    try:
        return self.get_recent_logs_fallback()
    except requests.exceptions.HTTPError as err:
        print(f" [!] {err}", file=sys.stderr)
        return []
def collect_download_tasks(self, test_run: str, remote_url: str, local_path: Path):
    """Walk one remote directory level and enqueue follow-up jobs.

    Sub directories are enqueued for recursive listing; files are
    either skipped (already present, or not matching the filter) or
    enqueued for download.

    :param test_run: timestamp string identifying the test run
    :param remote_url: URL of the directory listing (trailing slash)
    :param local_path: local directory mirroring ``remote_url``
    """
    try:
        files = self.get_dirlisting(remote_url)
    except requests.exceptions.HTTPError as err:
        self.log(test_run, "error", err)
        return
    with self._progress_lock:
        self._progress[test_run].num_files += len(files)
    for file_name in files:
        local_sub_path = local_path / file_name
        if file_name.endswith("/"):
            # Directory entry: mirror it locally and recurse via queue.
            local_sub_path.mkdir(exist_ok=True, parents=True)
            self._queue.put(
                (
                    self.collect_download_tasks,
                    (
                        test_run,
                        remote_url + file_name,
                        local_sub_path,
                    ),
                )
            )
            continue
        elif local_sub_path.is_file() and local_sub_path.stat().st_size > 0:
            self.log(
                test_run,
                "info",
                f"{local_sub_path} already exists. Skipping.",
            )
            # BUGFIX: these counters were incremented without holding
            # _progress_lock although log() reads them under that lock
            # from other worker threads (lost updates / data race).
            with self._progress_lock:
                self._already_downloaded_files += 1
            continue
        elif not FILES_FILTER_PATTERN.match(str(local_sub_path)):
            self.log(
                test_run,
                "info",
                f"{local_sub_path} does not match patterns. Skipping.",
            )
            with self._progress_lock:
                self._filtered_files += 1
            continue
        else:
            self._queue.put(
                (
                    self.process_download_task,
                    (
                        test_run,
                        remote_url + file_name,
                        local_sub_path,
                    ),
                )
            )
    # The directory listing itself counts as one completed item.
    # NOTE(review): skipped/filtered files never increment
    # num_files_complete, so the per-run counter can stay below
    # num_files -- confirm whether that display behavior is intended.
    with self._progress_lock:
        self._progress[test_run].num_files_complete += 1
def process_download_task(self, test_run: str, url: str, local_path: Path):
    """Stream *url* into *local_path* and update the progress counters.

    BUGFIX: the file was previously written in place, so an interrupted
    download left a partial file that the ``st_size > 0`` check in
    ``collect_download_tasks`` would later treat as complete. We now
    stream into a ``.part`` temp file and rename it into place only
    after the download finished.
    """
    with self.session.get(url, stream=True) as req:
        self._num_get_requests += 1
        try:
            req.raise_for_status()
        except requests.exceptions.HTTPError as err:
            self.log(test_run, "error", err)
            return
        self.log(test_run, "info", f"Downloading {local_path}...")
        part_path = local_path.with_name(local_path.name + ".part")
        try:
            with part_path.open("wb") as local_file:
                for chunk in req.iter_content(chunk_size=CHUNK_SIZE):
                    local_file.write(chunk)
            # Atomic on POSIX: the final file appears complete or not at all.
            part_path.replace(local_path)
        except BaseException:
            # Do not leave stale .part files behind on any failure.
            part_path.unlink(missing_ok=True)
            raise
    with self._progress_lock:
        self._downloaded_files += 1
        self._progress[test_run].num_files_complete += 1
def download_log_tree_parallel(
    self, test_run: str, remote_url: str, local_path: Path
):
    """Download a file tree from the server by parsing dir listing."""
    self.log(test_run, "info", "Downloading detailed result files")
    # The pending directory listing counts as one file towards progress.
    with self._progress_lock:
        self._progress[test_run].num_files += 1
    # Hand the actual directory walk over to the worker pool.
    task_args = (test_run, remote_url, local_path)
    self._queue.put((self.collect_download_tasks, task_args))
def download_detailed_test_result(
    self, result: dict, time_str: str, log_path: Path
):
    """Download the detailed test result for the tests given in `DOWNLOAD_DETAILED_TESTS`."""
    url = f"{LOGS_TREE_URL}{time_str}/"
    # Every server is combined with every client.
    server_client_combinations = [
        f"{server}_{client}"
        for server in result["servers"]
        for client in result["clients"]
    ]
    # Randomize the processing order of the combinations.
    random.shuffle(server_client_combinations)
    num_tests = len(server_client_combinations) * len(DOWNLOAD_DETAILED_TESTS)
    with self._progress_lock:
        self._progress[time_str].num_tests = num_tests
    for combination_path in server_client_combinations:
        for test in DOWNLOAD_DETAILED_TESTS:
            self._queue.put(
                (
                    self.download_log_tree_parallel,
                    (
                        time_str,
                        f"{url}{combination_path}/{test}/",
                        log_path / combination_path / test,
                    ),
                )
            )
def process_download_logs_task(self, log_dir: str):
    """Ensure ``result.json`` for one test run exists locally.

    Downloads it when missing or empty, otherwise loads the cached
    copy. The loaded ``result`` currently feeds only the commented-out
    detailed-download step at the end.

    :param log_dir: entry from the logs index, e.g. a timestamp with a
        trailing slash (the slash is stripped)
    """
    time_str = log_dir.rstrip("/")
    local_result_path = LOGS_PATH / time_str
    local_result_path.mkdir(parents=True, exist_ok=True)
    local_file_path = local_result_path / "result.json"
    # An empty file counts as "not downloaded yet" (e.g. aborted run).
    if not local_file_path.is_file() or local_file_path.stat().st_size == 0:
        self.log(
            test_run_name=time_str,
            level="info",
            msg="downloading...",
        )
        try:
            result = self.http_get(
                RESULT_URL.format(INTEROP_SITE=INTEROP_SITE, time=time_str),
                time_str,
            )
        except Exception as exc:
            self.log(
                test_run_name=time_str,
                level="error",
                msg=exc,
            )
            return
        with local_file_path.open("w") as file:
            json.dump(result, fp=file)
    else:
        self.log(
            test_run_name=time_str,
            level="info",
            msg=f"{local_file_path} already exists.",
        )
        with local_file_path.open("r") as result_file:
            result = json.load(result_file)
    # Detailed per-combination download is currently disabled:
    # self._queue.put(
    #     (
    #         self.download_detailed_test_result,
    #         (
    #             result,
    #             time_str,
    #             local_result_path,
    #         ),
    #     )
    # )
def log(self, test_run_name: Optional[str], level: str, msg: Union[str, Exception]):
    """Render one log message plus the global status area.

    Thread-safe: the whole redraw runs under ``_screen_lock``; the
    progress counters are snapshotted under ``_progress_lock``.

    :param test_run_name: key of the run whose status line to update,
        or ``None``/empty for a global message
    :param level: "info"/"fallback" render as info, "verbose" is
        suppressed in fallback mode, anything else renders as an error
    :param msg: message text; an ``Exception`` is recorded in the
        error log and rendered as a warning
    """
    with self._screen_lock:
        if not self._screen:
            # Lazily create the (curses or fallback) screen.
            self._screen = Screen()
        progress: Optional[Progress] = None
        with self._progress_lock:
            # Snapshot the shared counters while holding the lock.
            completed_jobs = self._completed_jobs
            downloaded_files = self._downloaded_files
            already_downloaded_files = self._already_downloaded_files
            filtered_files = self._filtered_files
            num_cur_downloads = len(self._progress)
            if test_run_name:
                progress = self._progress.get(test_run_name, None)
                if progress is None:
                    # First message for this run: allocate a screen line.
                    progress = Progress(
                        index=num_cur_downloads,
                        num_tests=0,  # TODO
                    )
                    self._progress[test_run_name] = progress
                    num_cur_downloads += 1
        if isinstance(msg, Exception) or not test_run_name:
            self._screen.log_error(msg)
            # Keep only the last line (e.g. the exception message proper).
            msg = str(msg).splitlines()[-1]
            color = Colors.WARNING
        if level == "verbose" and self._screen.mode == "fallback":
            # Verbose messages would flood a plain terminal.
            return
        if progress:
            if level in ("info", "fallback"):
                msg = f"[i] {test_run_name} {progress.num_files_complete}/{progress.num_files} {msg}"
                color = Colors.INFO
            else:
                msg = f"[!] {test_run_name} {progress.num_files_complete}/{progress.num_files} {msg}"
                color = Colors.DANGER
        if progress:
            self._screen.display(progress.index, 0, msg, color)
        # Blank separator line below the per-run status lines.
        self._screen.display(num_cur_downloads, 0, "")
        assert self._started
        duration = datetime.now() - self._started
        self._screen.display(
            num_cur_downloads + 1,
            0,
            (
                f"[i] currently pending jobs: {self._queue.unfinished_tasks}. "
                f"Completed jobs: {completed_jobs}. "
                f"Downloaded files: {downloaded_files}. "
                f"Skipped files: {already_downloaded_files}. "
                f"Filtered files: {filtered_files}. "
                f"GET requests: {self._num_get_requests}. "
                f"Elapsed time: {format_timedelta(duration)}"
            ),
            Colors.MUTED,
        )
        if self._screen.error_log:
            screen_height, _screen_width = self._screen.getmaxyx()
            # Keep only as many errors as fit below the status area.
            error_log_height = screen_height - 3 - num_cur_downloads - 1
            self._screen.error_log = self._screen.error_log[-error_log_height:]
            self._screen.display(
                num_cur_downloads + 2,
                0,
                "Error Log:",
                Colors.DANGER,
                underline=True,
                center=True,
            )
            for i, log in enumerate(self._screen.error_log):
                try:
                    self._screen.display(
                        num_cur_downloads + 3 + i,
                        0,
                        log,
                        Colors.DANGER,
                    )
                except curses.error:
                    # BUGFIX: was a bare ``except`` which also swallowed
                    # KeyboardInterrupt/SystemExit. Only ignore curses
                    # drawing errors (e.g. writing past the last line).
                    pass
        self._screen.refresh()
def _run_worker(self):
"""Initial run method for threads."""
while True:
task = self._queue.get()
if task is None:
# self.log(None, "error", f"{threading.current_thread().name}: Stop worker")
return
func, args = task
func(*args)
with self._progress_lock: