Verified Commit 41659b44 authored by Sebastian Endres's avatar Sebastian Endres
Browse files

Fix many things, add data rate plot, add RTT experiment, ...)

parent 63580e28
......@@ -11,7 +11,9 @@ class PlotMode(Enum):
PACKET_NUMBER = "packet-number"
FILE_SIZE = "file-size"
PACKET_SIZE = "packet-size"
DATA_RATE = "data-rate"
# SIZE_HIST = "size-hist"
# RTT = "rtt"
class CacheMode(Enum):
......
......@@ -4,9 +4,10 @@
import argparse
import sys
import typing
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Union
from typing import TYPE_CHECKING, Optional, Union
import numpy as np
import prettytable
......@@ -15,7 +16,7 @@ from matplotlib import colors
from matplotlib import pyplot as plt
from termcolor import colored, cprint
from enums import CacheMode, PlotMode, Side
from enums import CacheMode, Direction, PlotMode, Side
from tango_colors import Tango
from tracer import ParsingError, Trace
from utils import (
......@@ -26,9 +27,10 @@ from utils import (
create_relpath,
map2d,
map3d,
natural_data_rate,
)
if typing.TYPE_CHECKING:
if TYPE_CHECKING:
from collections.abc import Iterable
from pyshark.packet.packet import Packet
......@@ -39,7 +41,9 @@ DEFAULT_TITLES = {
PlotMode.PACKET_NUMBER: "Time vs. Packet-Number",
PlotMode.FILE_SIZE: "Time vs. Transmitted File Size",
PlotMode.PACKET_SIZE: "Time vs. Packet Size",
PlotMode.DATA_RATE: "Time vs. Data Rate",
# PlotMode.SIZE_HIST: "Size Histogram",
# PlotMode.RTT: "Time vs. RTT",
}
......@@ -336,6 +340,8 @@ class PlotCli:
lambda val, _pos: naturalsize(val, binary=True)
)
assert self.traces[0].pair_trace
with YaspinWrapper(
debug=self.debug, text="processing...", color="cyan"
) as spinner:
......@@ -345,6 +351,7 @@ class PlotCli:
request_timestamps = list[list[float]]()
response_first_timestamps = list[list[float]]()
response_retrans_timestamps = list[list[float]]()
max_offsets = list[int]()
for trace in self.traces:
......@@ -354,27 +361,42 @@ class PlotCli:
request_timestamps.append(list[float]())
response_first_timestamps.append(list[float]())
response_retrans_timestamps.append(list[float]())
max_offsets.append(0)
for layer in trace.request_stream_packets:
offset = trace.get_stream_offset(layer)
if offset is not None:
request_offsets[-1].append(offset)
if offset is None:
continue
request_offsets[-1].append(offset)
request_timestamps[-1].append(layer.norm_time)
if offset > max_offsets[-1]:
# assert that we transmit without overlapping
max_offsets[-1] = offset + trace.get_stream_length(layer)
for layer in trace.response_stream_packets_first_tx:
offset = trace.get_stream_offset(layer)
if offset is not None:
response_first_offsets[-1].append(offset)
if offset is None:
continue
response_first_offsets[-1].append(offset)
response_first_timestamps[-1].append(layer.norm_time)
if offset > max_offsets[-1]:
# assert that we transmit without overlapping
max_offsets[-1] = offset + trace.get_stream_length(layer)
for layer in trace.response_stream_packets_retrans:
offset = trace.get_stream_offset(layer)
if offset is not None:
response_retrans_offsets[-1].append(offset)
if offset is None:
continue
response_retrans_offsets[-1].append(offset)
response_retrans_timestamps[-1].append(layer.norm_time)
# Do not calculate max_offsets in assertion that retransmitted packets are
# not larger than previously transmitted packets
all_offsets = (
*request_offsets,
......@@ -382,7 +404,7 @@ class PlotCli:
*(lst for lst in response_retrans_offsets if lst),
)
min_offset: int = map2d(min, all_offsets)
max_offset: int = map2d(max, all_offsets)
max_offset: int = max(max_offsets)
all_timestamps = (
*request_timestamps,
*response_first_timestamps,
......@@ -395,6 +417,7 @@ class PlotCli:
ax.set_xlim(left=min(0, min_timestamp), right=max_timestamp)
ax.set_ylim(bottom=min(0, min_offset), top=max_offset)
ax.set_yticks(np.arange(0, max_offset * 1.1, 1024 * 1024))
with YaspinWrapper(
debug=self.debug, text="plotting...", color="cyan"
......@@ -463,6 +486,148 @@ class PlotCli:
self._annotate_time_plot(ax, height=max_offset, spinner=spinner)
self._save(fig, output_file, spinner)
def plot_data_rate(self, output_file: Optional[Path]):
"""Plot the data rate plot."""
with Subplot(nrows=1, ncols=1) as (fig, ax):
ax.grid(True)
ax.set_xlabel("Time (s)")
ax.set_ylabel("Data Rate")
assert self.title
ax.set_title(self.title)
ax.yaxis.set_major_formatter(lambda val, _pos: natural_data_rate(val))
DATA_RATE_WINDOW = 1 # 1s
with YaspinWrapper(
debug=self.debug, text="processing...", color="cyan"
) as spinner:
timestamps = []
goodput_data_rates = list[list[float]]()
tx_data_rates = list[list[float]]()
min_timestamp: float = 0
max_timestamp: float = 0
@dataclass
class DataRateBufEntry:
timestamp: float
raw_data: int
successful_stream_data: int
for trace in self.traces:
goodput_data_rates.append(list[float]())
tx_data_rates.append(list[float]())
trace_max_timestamp = trace.extended_facts["plt"]
trace_timestamps = np.arange(
min_timestamp, trace_max_timestamp, 0.1
)
timestamps.append(trace_timestamps)
max_timestamp = max(max_timestamp, trace_max_timestamp)
data_rate_buf = deque[DataRateBufEntry]()
assert trace.pair_trace
with spinner.hidden():
trace.pair_trace.parse()
for packet in trace.server_client_packets:
raw_data_len = len(packet.udp.payload.binary_value)
# goodput
right_packet = trace.get_pair_packet(packet)
if not right_packet:
stream_data_len = 0
else:
stream_data_len = trace.pair_trace.get_quic_payload_size(
right_packet
)
# *8: convert from byte to bit
data_rate_buf.append(
DataRateBufEntry(
timestamp=packet.norm_time,
raw_data=raw_data_len * 8,
successful_stream_data=stream_data_len * 8,
)
)
# calculate data rates
# marker_start is inclusive, marker_end is exclusive
marker_start = marker_end = 0
for timestamp in trace_timestamps:
while data_rate_buf[marker_end].timestamp < timestamp:
if marker_end == len(data_rate_buf) - 1:
break
marker_end += 1
while (
data_rate_buf[marker_start].timestamp
< timestamp - DATA_RATE_WINDOW
):
if marker_start == len(data_rate_buf) - 1:
break
marker_start += 1
buf_slice = list(data_rate_buf)[marker_start:marker_end]
tx_data_rates[-1].append(
sum(entry.raw_data for entry in buf_slice)
/ DATA_RATE_WINDOW
)
goodput_data_rates[-1].append(
sum(entry.successful_stream_data for entry in buf_slice)
/ DATA_RATE_WINDOW
)
max_data_rate: float = map3d(max, (tx_data_rates, goodput_data_rates))
ax.set_xlim(left=min(0, min_timestamp), right=max_timestamp)
ax.set_ylim(bottom=0, top=max_data_rate)
# ax.set_yticks(np.arange(0, max_offset * 1.1, 1024 * 1024))
with YaspinWrapper(
debug=self.debug, text="plotting...", color="cyan"
) as spinner:
# plot shadow traces (request and response separated)
for trace_timestamps, trace_goodput in zip(
timestamps[1:], goodput_data_rates[1:]
):
ax.plot(
trace_timestamps,
trace_goodput,
# marker="o",
linestyle="--",
color=self._colors.aluminium4,
markersize=self._markersize,
)
# plot main trace
ax.plot(
timestamps[0],
goodput_data_rates[0],
label=r"Goodput (recv'd payload rate delayed by $\frac{-RTT}{2}$)",
# marker="o",
linestyle="--",
color=self._colors.orange1,
markersize=self._markersize,
)
ax.plot(
timestamps[0],
tx_data_rates[0],
label="Data Rate of Transmitted Packets",
# marker="o",
linestyle="--",
color=self._colors.orange3,
markersize=self._markersize,
)
ax.legend(loc="upper left", fontsize=8)
self._annotate_time_plot(ax, height=max_data_rate, spinner=spinner)
self._save(fig, output_file, spinner)
def plot_packet_number(self, output_file: Optional[Path]):
"""Plot the packet number diagram."""
with Subplot(nrows=1, ncols=1) as (fig, ax):
......@@ -860,6 +1025,135 @@ class PlotCli:
# )
#
# self._save(fig, output_file, spinner)
#
# def plot_rtt(self, output_file: Optional[Path]):
# """Plot the rtt diagram."""
# with Subplot() as (fig, ax):
# ax.grid(True)
# ax.set_xlabel("Time (s)")
# ax.set_ylabel("estimated RTT")
# assert self.title
# ax.set_title(self.title)
# ax.yaxis.set_major_formatter(lambda val, _pos: f"{val:.1f} ms")
#
# with YaspinWrapper(
# debug=self.debug, text="processing...", color="cyan"
# ) as spinner:
#
# for trace in self.traces:
# trace.parse()
#
# request_timestamps = [
# [packet.norm_time for packet in trace.request_stream_packets]
# for trace in self.traces
# ]
# response_timestamps = [
# [packet.norm_time for packet in trace.response_stream_packets]
# for trace in self.traces
# ]
# request_spin_bits = [
# [
# packet.quic.spin_bit.int_value
# if "spin_bit" in packet.quic.field_names
# else None
# for packet in trace.packets
# if getattr(packet, "direction", None) == Direction.TO_SERVER
# ]
# for trace in self.traces
# ]
# response_spin_bits = [
# [
# packet.quic.spin_bit.int_value
# if "spin_bit" in packet.quic.field_names
# else None
# for packet in trace.packets
# if getattr(packet, "direction", None) == Direction.TO_CLIENT
# ]
# for trace in self.traces
# ]
# min_timestamp: float = map3d(
# min, [request_timestamps, response_timestamps]
# )
# max_timestamp: float = max(
# trace.extended_facts["plt"] for trace in self.traces
# )
#
# request_timestamps = list[list[float]]()
# response_timestamps = list[list[float]]()
# request_spin_bits = list[list[int]]()
# response_spin_bits = list[list[int]]()
# min_timestamp = float("inf")
# max_timestamp = -float("inf")
#
# for trace in self.traces:
# request_timestamps.append(list[float]())
# response_timestamps.append(list[float]())
# request_spin_bits.append(list[int]())
# response_spin_bits.append(list[int]())
#
# for packet in trace.packets:
# packet_direction = getattr(packet, "direction", None)
#
# if "spin_bit" not in packet.quic.field_names:
# continue
# spin_bit = packet.quic.spin_bit.int_value
# timestamp = packet.norm_time
# min_timestamp = min(min_timestamp, timestamp)
# max_timestamp = max(max_timestamp, timestamp)
#
# if packet_direction == Direction.TO_SERVER:
# request_spin_bits[-1].append(spin_bit)
# request_timestamps[-1].append(timestamp)
# else:
# response_spin_bits[-1].append(spin_bit)
# response_timestamps[-1].append(timestamp)
#
# ax.set_xlim(left=min(0, min_timestamp), right=max_timestamp)
#
# with YaspinWrapper(
# debug=self.debug, text="plotting...", color="cyan"
# ) as spinner:
# for (
# trace_request_timestamps,
# trace_response_timestamps,
# trace_request_spin_bits,
# trace_response_spin_bits,
# ) in zip(
# request_timestamps[1:],
# response_timestamps[1:],
# request_spin_bits[1:],
# response_spin_bits[1:],
# ):
# ax.plot(
# (*trace_request_timestamps, *trace_response_timestamps),
# (*trace_request_spin_bits, *trace_response_spin_bits),
# marker="o",
# linestyle="",
# color=self._colors.aluminium4,
# markersize=self._markersize,
# )
#
# # plot main trace (request and response separated)
# ax.plot(
# request_timestamps[0],
# request_spin_bits[0],
# marker="o",
# linestyle="",
# color=self._colors.Chameleon,
# markersize=self._markersize,
# )
# ax.plot(
# response_timestamps[0],
# response_spin_bits[0],
# marker="o",
# linestyle="",
# color=self._colors.SkyBlue,
# markersize=self._markersize,
# )
#
# self._annotate_time_plot(ax, height=1, spinner=spinner)
#
# self._save(fig, output_file, spinner)
def _save(
self, figure: plt.Figure, output_file: Optional[Path], spinner: YaspinWrapper
......@@ -929,10 +1223,16 @@ class PlotCli:
"callback": self.plot_packet_size,
"single": True,
},
PlotMode.DATA_RATE: {
"callback": self.plot_data_rate,
},
# PlotMode.SIZE_HIST: {
# "callback": self.plot_packet_hist,
# "single": True,
# },
# PlotMode.RTT: {
# "callback": self.plot_rtt,
# },
}
cfg = mapping[self.mode]
......
......@@ -19,7 +19,7 @@ from prompt_toolkit.shortcuts import ProgressBar
from termcolor import colored, cprint
from enums import CacheMode, Direction, Side
from utils import YaspinWrapper, create_relpath
from utils import YaspinWrapper, clear_line, create_relpath
if typing.TYPE_CHECKING:
from pyshark.packet.packet import Packet
......@@ -39,6 +39,9 @@ class ParsingError(Exception):
self.msg = msg
self.trace = trace
def __str__(self):
return f"{self.trace.input_file}: {self.msg}"
class FinError(ParsingError):
"""Error with closing streams detected."""
......@@ -178,18 +181,28 @@ class Trace:
return int(output.split()[1])
def get_packet_by_fpr(self, fpr: int) -> Optional["Packet"]:
def get_packet_by_fpr(self, fpr: int, partial=True) -> Optional["Packet"]:
"""
Get a packet by it's packet fingerprint. Assuming that we need one of the first packets.
"""
packets_by_fpr = dict[int, "Packet"]()
if not partial and not self._packets_by_fpr:
# 1. if not partial and not yet sorted: parse all packets
for packet in self.packet_iter():
packet_fpr = self._packet_fingerprint(packet)
packets_by_fpr[packet_fpr] = packet
self._packets_by_fpr = packets_by_fpr
if self._packets_by_fpr:
# 1. if already fingerprinted -> use dict lookup
# 2. if already fingerprinted -> use dict lookup
return self._packets_by_fpr.get(fpr, None)
# 2. load and iterate over packets and check fingerprint
packets_by_fpr = dict[int, "Packet"]()
# 3. load and iterate over packets and check fingerprint
for packet in self.packet_iter():
packet_fpr = self._packet_fingerprint(packet)
......@@ -203,6 +216,37 @@ class Trace:
return None
def get_pair_packet(self, packet: "Packet", partial=False) -> Optional["Packet"]:
"""Get the packet of the pair trace that belong to this packet."""
assert self.pair_trace
return self.pair_trace.get_packet_by_fpr(
self._packet_fingerprint(packet),
partial=partial,
)
def get_pair_stream_packet(
self,
stream_packet: QuicStreamPacket,
partial=False,
) -> Optional[QuicStreamPacket]:
"""Get the quic stream packet of the pair trace that belongs to this packet."""
assert self.pair_trace
pair_packet = self.get_pair_packet(
stream_packet.packet,
partial=partial,
)
if not pair_packet:
return None
for pair_stream_packet in self.pair_trace.iter_stream_packets(pair_packet):
if pair_stream_packet.packet_number == stream_packet.packet_number:
return pair_stream_packet
return None
@property
def packets(self) -> list["Packet"]:
"""Parse packets of this trace."""
......@@ -286,6 +330,23 @@ class Trace:
first_packet_sniff_timestamp = float("-inf")
for packet in self._cap:
if (
not hasattr(packet, "udp")
or not hasattr(packet, "ip")
or not hasattr(packet, "quic")
):
clear_line(file=sys.stderr)
cprint(
f"⨯ Skipping a packet without UDP or IP?!? ({packet.frame_info.protocols})",
color="red",
file=sys.stderr,
)
continue
# calculate norm_time
# norm_time should be the same as udp.time_relative
if first_packet_sniff_timestamp < 0:
first_packet_sniff_timestamp = float(packet.sniff_timestamp)
packet.norm_time = (
......@@ -457,19 +518,10 @@ class Trace:
response_stream_packets = list[QuicStreamPacket]()
for packet in self.packets:
try:
src_port = packet.udp.srcport
dst_port = packet.udp.dstport
src_ip = packet.ip.src
dst_ip = packet.ip.dst
except AttributeError:
cprint(
f"⨯ Skipping a packet without UDP or IP?!? ({packet.frame_info.protocols})",
color="red",
file=sys.stderr,
)
continue
src_port = packet.udp.srcport
dst_port = packet.udp.dstport
src_ip = packet.ip.src
dst_ip = packet.ip.dst
src_tuple = (src_ip, src_port)
dst_tuple = (dst_ip, dst_port)
......@@ -479,18 +531,19 @@ class Trace:
client_server_packets.append(packet)
for inner_packet in self.iter_stream_packets(packet):
inner_packet.norm_time = packet.norm_time
request_stream_packets.append(inner_packet)
elif src_tuple == server_tuple and dst_tuple == client_tuple:
packet.direction = Direction.TO_CLIENT
server_client_packets.append(packet)
for inner_packet in self.iter_stream_packets(packet):
inner_packet.norm_time = packet.norm_time
response_stream_packets.append(inner_packet)
else:
raise ParsingError(
f"Packet #{packet.quic.packet_number} has unknown source or destination.",
(
f"Packet #{packet.quic.packet_number} has unknown source or destination: "
f"{src_ip}:{src_port} -> {dst_ip}:{dst_port}"
),
trace=self,
)
......@@ -730,8 +783,8 @@ class Trace:
rtt_ret = calc_rtt(Direction.TO_SERVER, left_trace, right_trace)
rtt_fwd = calc_rtt(Direction.TO_CLIENT, left_trace, right_trace)
assert (
abs(rtt_fwd - rtt_ret) < 0.005
), f"RTTs vary by more than 5ms: {rtt_fwd * 100:.1f} ms vs. {rtt_ret * 100:.1f} ms"
abs(rtt_fwd / rtt_ret - 1) <= 0.02 or abs(rtt_fwd - rtt_ret) <= 0.005
), f"RTTs vary by more than 2 % or 5ms: {rtt_fwd * 1000:.1f} ms vs. {rtt_ret * 1000:.1f} ms"
return (rtt_fwd + rtt_ret) / 2
......@@ -744,6 +797,9 @@ class Trace:
)
if hasattr(quic_packet, "stream_stream_id"):
# add fields from parent
quic_packet.norm_time = packet.norm_time
quic_packet.packet = packet
yield quic_packet
def get_stream_length(self, quic_layer: QuicStreamPacket) -> int:
......@@ -808,11 +864,7 @@ class Trace:
return None
if quic_layer.stream_off.int_value:
# try:
return int(quic_layer.stream_offset)
# except AttributeError:
# breakpoint()
else:
return 0
......
......@@ -12,6 +12,7 @@ from typing import Callable, NamedTuple, Optional, TypeVar, Union
from urllib.parse import ParseResult as URL
from urllib.parse import urlparse
import humanize
import termcolor