#!/usr/bin/env python3
"""Firefox H2 vs H3 upload benchmark.
Usage:
python3 upload-bench.py run [h2|h3] [iterations] Run benchmark
python3 upload-bench.py plots [dirs...|all] Generate plots
Orchestrates Firefox launch under perf record, dumpcap packet capture,
Marionette-driven Google Drive upload, profiler dump, and cleanup.
Prerequisites:
Launch Firefox once with the default profile, log into Google Drive,
and close it before running this script.
Required tools:
perf, sudo, Xvfb, gzip, dd
dumpcap, tshark (wireshark suite — capture + analysis)
pip: matplotlib (for H2 TCP and H3 QUIC plots)
Environment variables:
FIREFOX_BIN Path to Firefox binary
SAMPLE_SIZE_MB Upload file size in MB (default: 1024)
"""
from __future__ import annotations
import argparse
import atexit
import gzip
import json
import os
import re
import shutil
import signal
import socket
import statistics
import subprocess
import sys
import tempfile
import time
from collections import defaultdict
from pathlib import Path
from subprocess import DEVNULL, Popen, TimeoutExpired
from typing import Any
sys.stdout.reconfigure(line_buffering=True) # type: ignore[union-attr]
sys.stderr.reconfigure(line_buffering=True) # type: ignore[union-attr]
HOME = Path.home()
FIREFOX_BIN = Path(
os.environ.get(
"FIREFOX_BIN",
HOME / "firefox/main/obj-aarch64-unknown-linux-gnu/dist/bin/firefox",
)
)
MARIONETTE_CLIENT = FIREFOX_BIN.parents[3] / "testing/marionette/client"
RESULTS_DIR = HOME / "upload-bench-results"
SAMPLE_SIZE_MB = int(os.environ.get("SAMPLE_SIZE_MB", "1024"))
XVFB_DISPLAY = ":99"
STALL_TIMEOUT = 180
PROFILER_FEATURES = [
"js",
"stackwalk",
"cpu",
"screenshots",
"bandwidth",
"network",
"ipcmessages",
"cpuallthreads",
"markersallthreads",
]
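# Passed to Firefox at launch so the Gecko Profiler samples from startup;
# STARTUP_ENTRIES (2**27) sets the sample buffer capacity, and the filter
# list below names the threads of interest for networking work.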
PROFILER_ENV = {
"MOZ_PROFILER_STARTUP": "1",
"MOZ_PROFILER_STARTUP_FEATURES": ",".join(PROFILER_FEATURES),
"MOZ_PROFILER_STARTUP_FILTERS": "GeckoMain,Socket Thread,DOM Worker",
"MOZ_PROFILER_STARTUP_ENTRIES": "134217728",
}
# Parallel time + value arrays for plotting.
_TS = tuple[list[float], list[float]]
_sample_file: Path | None = None
def log(msg: str) -> None:
now = time.time()
ts = time.strftime("%H:%M:%S", time.localtime(now))
print(f"[{ts}.{int(now % 1 * 1000):03d}] {msg}")
def _run(
cmd: list[str], check: bool = False, **kw: Any
) -> subprocess.CompletedProcess[bytes]:
return subprocess.run(cmd, stdout=DEVNULL, stderr=DEVNULL, check=check, **kw)
def _popen(cmd: list[str], **kw: Any) -> Popen[bytes]:
return Popen(cmd, stdout=DEVNULL, stderr=DEVNULL, **kw)
def _human_size(nbytes: int) -> str:
if nbytes < 1024 * 1024:
return f"{nbytes / 1024:.0f} KB"
return f"{nbytes / 1024 / 1024:.1f} MB"
# -- Main -------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
)
sub = parser.add_subparsers(dest="command")
bench = sub.add_parser("run", help="Run upload benchmark")
bench.add_argument("mode", choices=["h2", "h3"])
bench.add_argument("iterations", nargs="?", type=int, default=5)
bench.add_argument("--upload-timeout", type=int, default=600)
plots = sub.add_parser("plots", help="Generate plots for completed runs")
plots.add_argument(
"dirs",
nargs="+",
help="Run directories (e.g., h3-1 h3-2) or 'all'",
)
args = parser.parse_args()
if args.command == "plots":
_main_plots(args)
return
if args.command is None:
parser.print_help()
sys.exit(1)
global _sample_file
_sample_file = _ensure_sample_file(SAMPLE_SIZE_MB)
profile_dir = _find_profile_dir()
_check_prerequisites(profile_dir)
_ensure_xvfb()
for i in range(1, args.iterations + 1):
log(f"{'=' * 50}")
log(f" Iteration {i}/{args.iterations} mode={args.mode}")
log(f"{'=' * 50}")
run_dir = RESULTS_DIR / f"{args.mode}-{i}"
if run_dir.exists():
sys.exit(
f"Error: {run_dir} already exists. "
"Move or remove it to avoid overwriting previous results."
)
run_dir.mkdir(parents=True)
try:
_run_iteration(args, profile_dir, run_dir, i)
except Exception as e:
log(f"ERROR in iteration {i}: {e}")
_kill_firefox()
continue
time.sleep(5)
summary = RESULTS_DIR / "summary.txt"
log(f"All {args.iterations} iterations complete. Results in {RESULTS_DIR}/")
if summary.exists():
log(f"Summary:\n{summary.read_text()}")
_print_stats(summary, args.mode)
# -- Setup ------------------------------------------------------------------
def _find_profile_dir() -> Path:
for base in [HOME / ".mozilla/firefox", HOME / ".config/mozilla/firefox"]:
if base.is_dir():
for entry in base.iterdir():
if "default" in entry.name and entry.is_dir():
return entry
raise RuntimeError("No default Firefox profile found")
def _ensure_sample_file(size_mb: int) -> Path:
fd, name = tempfile.mkstemp(suffix=".bin", prefix="upload-bench-")
os.close(fd)
path = Path(name)
def cleanup() -> None:
path.unlink(missing_ok=True)
atexit.register(cleanup)
def _handle_signal(signum: int, _: Any) -> None:
cleanup()
sys.exit(128 + signum)
signal.signal(signal.SIGINT, _handle_signal)
signal.signal(signal.SIGTERM, _handle_signal)
log(f"Creating {path.name} ({size_mb} MB)...")
_run(
["dd", "if=/dev/urandom", f"of={path}", "bs=1M", f"count={size_mb}"],
check=True,
)
log(f"Created {path}")
return path
def _check_prerequisites(profile_dir: Path) -> None:
for label, path in [("Profile", profile_dir), ("Firefox", FIREFOX_BIN)]:
if not path.exists():
raise RuntimeError(f"{label} not found: {path}")
def _set_user_pref(user_js: Path, pref: str, value: bool | str | int) -> None:
if isinstance(value, bool):
js_val = "true" if value else "false"
elif isinstance(value, str):
js_val = f'"{value}"'
else:
js_val = str(value)
new_line = f'user_pref("{pref}", {js_val});\n'
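    # e.g. user_pref("network.http.http3.enable_qlog", true);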
lines: list[str] = []
if user_js.exists():
lines = [
ln
for ln in user_js.read_text().splitlines(keepends=True)
if f'"{pref}"' not in ln
]
lines.append(new_line)
user_js.write_text("".join(lines))
def _capture_interface() -> str:
"""Find the interface that routes to the internet via route lookup."""
try:
tokens = subprocess.run(
["ip", "route", "get", "8.8.8.8"],
capture_output=True, text=True, check=True,
).stdout.split()
idx = tokens.index("dev")
iface = tokens[idx + 1]
log(f"Capture interface: {iface} (from route lookup)")
return iface
except (subprocess.CalledProcessError, FileNotFoundError, ValueError, IndexError):
pass
log("WARNING: Could not determine route interface, falling back to eth0")
return "eth0"
def _ensure_xvfb() -> None:
if _run(["pgrep", "-f", f"Xvfb {XVFB_DISPLAY}"]).returncode == 0:
log(f"Xvfb already running on {XVFB_DISPLAY}")
return
_popen(["Xvfb", XVFB_DISPLAY, "-screen", "0", "2560x1440x24"])
time.sleep(1)
log(f"Started Xvfb on {XVFB_DISPLAY}")
# -- Iteration orchestration ------------------------------------------------
def _run_iteration(
args: argparse.Namespace, profile_dir: Path, run_dir: Path, iteration: int
) -> None:
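    # mktemp (not mkstemp) on purpose: dumpcap runs under sudo and is
    # expected to create the capture file itself.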
pcap_tmp = Path(tempfile.mktemp(suffix=".pcapng", prefix="capture-", dir="/tmp"))
pcap = run_dir / "capture.pcapng"
tls = run_dir / "tls.keys"
perf = run_dir / "perf.data"
profile = run_dir / "profile.json.gz"
cap_filter = "udp port 443" if args.mode == "h3" else "tcp port 443"
iface = _capture_interface()
capture_start = time.time()
cap_log = open(run_dir / "dumpcap.log", "w")
capture = Popen(
["sudo", "dumpcap", "-q", "-i", iface, "-w", str(pcap_tmp), "-f", cap_filter],
stdout=cap_log, stderr=cap_log,
)
log(f"Started dumpcap on {iface} (PID {capture.pid})")
user_js = profile_dir / "user.js"
_set_user_pref(user_js, "network.http.http3.enable_qlog", args.mode == "h3")
env = {
**os.environ,
"DISPLAY": XVFB_DISPLAY,
"SSLKEYLOGFILE": str(tls),
**PROFILER_ENV,
}
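    # 997 Hz: a prime sampling rate is commonly chosen to avoid lockstep
    # with periodic activity; frame-pointer call graphs keep unwinding cheap.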
perf_proc = _popen(
[
"perf", "record", "-F", "997", "--call-graph", "fp",
"-o", str(perf), "--",
str(FIREFOX_BIN), "--no-remote", "-marionette",
"-remote-allow-system-access", "--profile", str(profile_dir),
],
env=env,
)
log(f"Started Firefox under perf (PID {perf_proc.pid})")
try:
_wait_for_marionette()
upload_secs, upload_start, upload_end = _drive_session(
args.mode, str(profile), args.upload_timeout
)
timing = {
"capture_start": capture_start,
"upload_start": upload_start,
"upload_end": upload_end,
"upload_secs": upload_secs,
}
(run_dir / "timing.json").write_text(json.dumps(timing, indent=2))
assert _sample_file is not None
size_mb = _sample_file.stat().st_size / 1024 / 1024
mbps = size_mb / upload_secs if upload_secs > 0 else 0
line = f"{args.mode}-{iteration}: {upload_secs:.1f}s ({mbps:.1f} MB/s)"
log(line)
with open(RESULTS_DIR / "summary.txt", "a") as f:
f.write(line + "\n")
finally:
log("Waiting for Firefox to exit...")
try:
perf_proc.wait(timeout=30)
except TimeoutExpired:
log("Firefox didn't exit, killing...")
_kill_firefox()
perf_proc.wait(timeout=10)
_run(["sudo", "killall", "dumpcap"], check=False)
try:
capture.wait(timeout=10)
except TimeoutExpired:
capture.kill()
cap_log.close()
# dumpcap writes to /tmp as root; chown and move to results dir
if pcap_tmp.exists():
_run(["sudo", "chown", f"{os.getuid()}:{os.getgid()}", str(pcap_tmp)])
shutil.move(str(pcap_tmp), str(pcap))
log("Stopped capture")
else:
cap_err = (run_dir / "dumpcap.log").read_text().strip()
log(f"WARNING: No capture file. dumpcap output: {cap_err}")
# Collect qlogs, then compress large files in parallel
qlogs = _collect_qlogs(run_dir) if args.mode == "h3" else []
# pcap is mostly encrypted TLS payload — doesn't compress well, skip it
to_gzip = [p for p in [perf] if p.exists()]
if to_gzip or qlogs:
log(f"Compressing {len(to_gzip)} + {len(qlogs)} files...")
procs = [_popen(["gzip", str(p)]) for p in to_gzip]
procs += qlogs
for proc in procs:
proc.wait()
for label, p in [
("perf", perf.with_suffix(".data.gz")),
("pcap", pcap),
("tls", tls),
("profile", profile),
]:
if p.exists():
log(f" {label}: {_human_size(p.stat().st_size)}")
def _wait_for_marionette(timeout: int = 90) -> None:
log("Waiting for Marionette on port 2828...")
for attempt in range(1, timeout + 1):
try:
with socket.create_connection(("127.0.0.1", 2828), timeout=1):
pass
log(f"Marionette available after {attempt}s")
return
except (ConnectionRefusedError, OSError):
time.sleep(1)
raise RuntimeError("Marionette did not become available")
# -- Marionette automation --------------------------------------------------
def _drive_session(
mode: str, profile_file: str, upload_timeout: int
) -> tuple[float, float, float]:
"""Launch Marionette session, upload, trash, save profile.
Returns (upload_secs, upload_start_epoch, upload_end_epoch).
"""
sys.path.insert(0, str(MARIONETTE_CLIENT))
from marionette_driver.by import By
from marionette_driver.marionette import Actions, Marionette
client = Marionette(host="127.0.0.1", port=2828)
client.start_session()
client.timeout.page_load = 120
client.timeout.implicit = 10
with client.using_context(client.CONTEXT_CHROME):
client.execute_script(
"Services.prefs.setBoolPref('network.http.http3.enable', arguments[0]);",
script_args=[mode != "h2"],
)
if mode == "h2":
log("Disabled HTTP/3 via pref")
try:
return _upload_and_cleanup(
client, By, Actions, profile_file, upload_timeout
)
finally:
log("Quitting Firefox...")
try:
with client.using_context(client.CONTEXT_CHROME):
client.execute_script(
"Services.startup.quit(Services.startup.eAttemptQuit);"
)
except Exception:
pass
def _upload_and_cleanup(
client: Any, By: Any, Actions: Any, profile_file: str, upload_timeout: int
) -> tuple[float, float, float]:
assert _sample_file is not None
filename = _sample_file.name
log("Navigating to Google Drive...")
client.navigate("https://drive.google.com/drive/my-drive")
_wait_for_listing(client)
if _file_in_listing(client, filename):
raise RuntimeError(f"'{filename}' already in Drive — trash it first")
log(f"Verified '{filename}' not in Drive listing")
file_input = _open_upload_dialog(client, By)
if file_input is None:
raise RuntimeError("Could not locate a file input element")
log(f"Sending file: {_sample_file}")
t0 = time.time()
file_input.send_keys(str(_sample_file))
_verify_upload_started(client, filename)
log(f"Waiting up to {upload_timeout}s for upload...")
_wait_for_upload(client, upload_timeout)
upload_end = time.time()
upload_secs = upload_end - t0
size_mb = _sample_file.stat().st_size / 1024 / 1024
mbps = size_mb / upload_secs if upload_secs > 0 else 0
log(f"Upload completed in {upload_secs:.1f}s ({mbps:.1f} MB/s)")
log("Moving uploaded file to trash...")
_trash_file(client, filename, By, Actions)
log(f"Saving profiler data to {profile_file}...")
_save_profiler(client, profile_file)
return upload_secs, t0, upload_end
# -- Google Drive helpers ---------------------------------------------------
def _wait_for_listing(client: Any, timeout: int = 30) -> None:
for _ in range(timeout // 2):
time.sleep(2)
if client.execute_script(
"return document.querySelectorAll('[data-id]').length > 0;"
):
return
log("WARNING: File listing may not have loaded")
def _file_in_listing(client: Any, filename: str) -> bool:
return client.execute_script(
"""
if (document.body.innerText.includes(arguments[0])) return true;
for (let el of document.querySelectorAll('[data-id]')) {
if (el.textContent.includes(arguments[0])) return true;
if (el.querySelector('[data-tooltip="' + arguments[0] + '"]'))
return true;
}
return false;
""",
script_args=[filename],
)
def _open_upload_dialog(client: Any, By: Any) -> Any:
"""Send Alt+C, U via Marionette to open Google Drive's upload dialog."""
from marionette_driver.keys import Keys
from marionette_driver.marionette import Actions
inputs = client.find_elements(By.CSS_SELECTOR, 'input[type="file"]')
if inputs:
_reveal_element(client, inputs[0])
return inputs[0]
client.execute_script(
"""(document.querySelector('[role="main"]') || document.body).click();"""
)
time.sleep(1)
log(" Sending Alt+C, U keyboard shortcut...")
actions = Actions(client)
seq = actions.sequence("key", "kbd_upload")
seq.key_down(Keys.ALT).key_down("c").key_up("c").key_up(Keys.ALT)
seq.pause(1000)
seq.key_down("u").key_up("u")
seq.perform()
time.sleep(3)
inputs = client.find_elements(By.CSS_SELECTOR, 'input[type="file"]')
if inputs:
_reveal_element(client, inputs[0])
return inputs[0]
return None
def _reveal_element(client: Any, element: Any) -> None:
client.execute_script(
"""
Object.assign(arguments[0].style, {
display: 'block', opacity: '1', position: 'fixed',
top: '0', left: '0', width: '200px', height: '40px',
zIndex: '99999',
});
""",
script_args=[element],
)
def _verify_upload_started(client: Any, filename: str) -> None:
for _ in range(5):
time.sleep(2)
if client.execute_script(
"""
let text = document.body.innerText;
return /uploading/i.test(text) || text.includes(arguments[0]);
""",
script_args=[filename],
):
return
raise RuntimeError(
f"Upload did not start: neither 'uploading' nor '{filename}' visible"
)
def _wait_for_upload(client: Any, timeout: int) -> None:
"""Wait for 'uploading' to appear then disappear, or 'upload complete'."""
deadline = time.time() + timeout
last_msg = ""
saw_uploading = False
last_change = time.time()
while time.time() < deadline:
status = client.execute_script(
"""
let text = document.body.innerText;
let match = text.match(/(\\d+)%/);
return {
uploading: /uploading/i.test(text),
complete: /upload complete/i.test(text),
failed: /upload failed/i.test(text),
pct: match ? match[1] : "",
};
"""
)
elapsed = int(time.time() - (deadline - timeout))
if status.get("failed"):
raise RuntimeError("Google Drive reported upload failure")
if status.get("complete"):
log(f" Upload complete (t+{elapsed}s)")
return
if status.get("uploading"):
saw_uploading = True
pct = status.get("pct", "")
msg = f"uploading {pct}%" if pct else "uploading"
if msg != last_msg:
log(f" {msg} (t+{elapsed}s)")
last_msg = msg
last_change = time.time()
elif saw_uploading:
log(f" Upload finished (notification dismissed, t+{elapsed}s)")
return
elif last_msg != "waiting":
log(f" waiting for upload to start (t+{elapsed}s)")
last_msg = "waiting"
last_change = time.time()
if time.time() - last_change > STALL_TIMEOUT:
raise RuntimeError(
f"Upload stalled: '{last_msg}' unchanged for {STALL_TIMEOUT}s"
)
time.sleep(2)
raise TimeoutError(f"Upload did not complete within {timeout}s")
def _trash_file(client: Any, filename: str, By: Any, Actions: Any) -> None:
"""Right-click file row, select 'Move to trash', confirm dialog."""
client.navigate("https://drive.google.com/drive/my-drive")
time.sleep(5)
row = client.execute_script(
"""
for (let row of document.querySelectorAll('[data-id]'))
if (row.textContent.includes(arguments[0])) return row;
return null;
""",
script_args=[filename],
)
if not row:
log(f"WARNING: Could not find '{filename}' row")
return
_click(Actions, client, row, button=2)
time.sleep(2)
if not client.execute_script(
"""
for (let el of document.querySelectorAll('*')) {
let rect = el.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) continue;
for (let child of el.childNodes)
if (child.nodeType === 3 && child.textContent.trim() === 'Move to trash')
return true;
}
return false;
"""
):
log("WARNING: 'Move to trash' not found in context menu")
return
item = client.find_element(By.XPATH, '//*[text()="Move to trash"]')
_click(Actions, client, item)
time.sleep(2)
try:
elements = client.find_elements(By.XPATH, '//*[text()="Move to trash"]')
if elements:
_click(Actions, client, elements[-1])
log(" Confirmed trash dialog")
time.sleep(1)
except Exception:
pass
log(f"Trashed '{filename}'")
def _click(Actions: Any, client: Any, element: Any, button: int = 0) -> None:
actions = Actions(client)
seq = actions.sequence("pointer", f"mouse_{id(element)}")
seq.pointer_move(0, 0, origin=element)
seq.pointer_down(button=button)
seq.pointer_up(button=button)
seq.perform()
# -- Plot helpers -----------------------------------------------------------
def _bin_per_second(
times: list[float],
values: list[float],
agg: Any = max,
) -> _TS:
"""Aggregate values into 1-second bins using the given function."""
if not times:
return [], []
bt, bv = [], []
for sec in range(int(min(times)), int(max(times)) + 1):
bucket = [v for t, v in zip(times, values) if sec <= t < sec + 1]
if bucket:
bt.append(float(sec))
bv.append(agg(bucket))
return bt, bv
def _seq_progression_per_second(
times: list[float], seqs: list[float]
) -> _TS:
"""Throughput from monotonic sequence progression: max-min per second."""
if not times:
return [], []
tp_t, tp_v = [], []
for sec in range(int(min(times)), int(max(times)) + 1):
s = [v for t, v in zip(times, seqs) if sec <= t < sec + 1]
if len(s) >= 2:
tp_t.append(float(sec))
tp_v.append(max(s) - min(s))
return tp_t, tp_v
def _running_min(values: list[float]) -> list[float]:
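    # Running minimum, e.g. [3.0, 1.0, 2.0] -> [3.0, 1.0, 1.0].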
result = []
m = values[0] if values else 0.0
for v in values:
m = min(m, v)
result.append(m)
return result
def _init_plot(n: int = 4) -> tuple[Any, Any, Any]:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
fig, axes = plt.subplots(n, 1, figsize=(14, 4 * n), sharex=True)
fig.set_dpi(900)
return fig, axes, plt
def _style_ax(ax: Any, ylabel: str, title: str, **kw: Any) -> None:
ax.set_ylabel(ylabel)
ax.set_title(title)
ax.grid(True, alpha=0.3)
if "xlabel" in kw:
ax.set_xlabel(kw["xlabel"])
def _opaque_legend(ax: Any, **kw: Any) -> None:
leg = ax.legend(**kw)
for lh in leg.legend_handles:
lh.set_alpha(1)
def _plot_throughput(ax: Any, tp_t: list[float], tp_v: list[float]) -> None:
if tp_t:
ax.plot(tp_t, tp_v, color="green", alpha=0.8)
ax.fill_between(tp_t, tp_v, alpha=0.3, color="green")
_style_ax(ax, "Throughput (MB/s)", "Upload Throughput")
def _plot_rtt(
ax: Any,
raw_t: list[float],
raw_v: list[float],
smoothed: _TS | None = None,
min_rtt: _TS | None = None,
raw_label: str = "rtt",
smoothed_label: str = "smoothed_rtt",
min_label: str = "min_rtt",
) -> None:
if raw_t:
ax.scatter(
raw_t, raw_v, s=0.5, alpha=0.2, label=raw_label,
color="tab:orange", rasterized=True,
)
if smoothed and smoothed[0]:
ax.plot(*smoothed, label=smoothed_label, alpha=0.8, color="blue")
if min_rtt and min_rtt[0]:
ax.plot(*min_rtt, label=min_label, alpha=0.6, linestyle="--", color="green")
_style_ax(ax, "RTT (ms)", "RTT", xlabel="Time (s)")
_opaque_legend(ax, markerscale=10)
def _fit_ylim(axes: Any) -> None:
"""Recompute y-axis limits from data within the visible x-range.
matplotlib autoscales y from ALL data, including points hidden by xlim.
"""
for ax in axes:
xlo, xhi = ax.get_xlim()
yvals: list[float] = []
for line in ax.get_lines():
xd, yd = line.get_xdata(), line.get_ydata()
yvals.extend(y for x, y in zip(xd, yd) if xlo <= x <= xhi)
for coll in ax.collections:
off = coll.get_offsets()
if len(off):
yvals.extend(float(y) for x, y in off if xlo <= x <= xhi)
if yvals:
ax.set_ylim(0, max(yvals) * 1.05)
def _finalize_plot(
fig: Any, axes: Any, plt: Any, run_dir: Path, out_name: str,
tp_t: list[float],
) -> None:
"""Shared plot finalization: set xlim from timing, fit ylim, save."""
timing = _load_timing(run_dir)
if timing:
axes[-1].set_xlim(timing[0] - 1, timing[1] + 2)
log(f" Upload bounds from timing.json: {timing[0]:.1f}s - {timing[1]:.1f}s")
elif tp_t:
axes[-1].set_xlim(tp_t[0] - 1, tp_t[-1] + 2)
_fit_ylim(axes)
out = run_dir / out_name
plt.tight_layout()
fig.savefig(out)
plt.close(fig)
log(f" Saved {out}")
def _find_capture(run_dir: Path) -> Path | None:
for name in [
"capture.pcapng.gz", "capture.pcapng",
"capture.pcap.gz", "capture.pcap",
]:
p = run_dir / name
if p.exists():
return p
return None
def _load_timing(run_dir: Path) -> tuple[float, float] | None:
"""Load timing.json → (upload_start, upload_end) in capture-relative seconds."""
timing_file = run_dir / "timing.json"
if not timing_file.exists():
return None
timing = json.loads(timing_file.read_text())
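    # "tcpdump_start" is accepted as a fallback, presumably written by
    # older tcpdump-based runs.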
t0 = timing.get("capture_start") or timing.get("tcpdump_start", 0)
return timing["upload_start"] - t0, timing["upload_end"] - t0
# -- Plot subcommand --------------------------------------------------------
def _main_plots(args: argparse.Namespace) -> None:
dirs = args.dirs
if dirs == ["all"]:
if not RESULTS_DIR.is_dir():
sys.exit(f"No results directory: {RESULTS_DIR}")
dirs = [d.name for d in sorted(RESULTS_DIR.iterdir()) if d.is_dir()]
for name in dirs:
run_dir = RESULTS_DIR / name
if not run_dir.is_dir():
log(f"Skipping {name}: not a directory")
continue
mode = "h3" if name.startswith("h3") else "h2"
log(f"Generating plots for {name} (mode={mode})...")
if mode == "h2":
pcap = _find_capture(run_dir)
if pcap:
_plot_tcp(pcap, run_dir / "tls.keys", run_dir)
else:
log(f" No pcap for {name}")
else:
qlog_dir = run_dir / "qlog"
if qlog_dir.is_dir():
_plot_quic(run_dir)
else:
log(f" No qlog for {name}")
# -- TCP (H2) plots ---------------------------------------------------------
def _plot_tcp(pcap: Path, tls: Path, run_dir: Path) -> None:
log(" Extracting TCP fields via tshark...")
tls_args = (
["-o", f"tls.keylog_file:{tls}"]
if tls.exists() and tls.stat().st_size > 0
else []
)
fields = [
"frame.time_relative",
"tcp.srcport",
"tcp.dstport",
"tcp.seq",
"tcp.ack",
"tcp.len",
"tcp.analysis.ack_rtt",
"tcp.analysis.bytes_in_flight",
"tcp.window_size",
]
result = subprocess.run(
[
"tshark", "-r", str(pcap), *tls_args,
"-Y", "tcp.port==443", "-T", "fields",
*[a for f in fields for a in ("-e", f)],
],
capture_output=True,
text=True,
check=False,
)
# Identify upload port (most bytes to :443)
port_bytes: dict[str, int] = {}
for raw in result.stdout.splitlines():
f = raw.split("\t")
if len(f) < 9 or f[2] != "443":
continue
try:
port_bytes[f[1]] = port_bytes.get(f[1], 0) + int(f[5])
except ValueError:
continue
if not port_bytes:
log(" No upload traffic found")
return
port = max(port_bytes, key=port_bytes.get) # type: ignore[arg-type]
log(f" Upload port {port} ({port_bytes[port] / 1024 / 1024:.0f} MB)")
# Parse into series
data_t: list[float] = []
data_seq: list[float] = []
ack_t: list[float] = []
ack_seq: list[float] = []
rtt_t: list[float] = []
rtt_v: list[float] = []
for raw in result.stdout.splitlines():
f = raw.split("\t")
if len(f) < 9:
continue
try:
t = float(f[0])
if f[1] == port and int(f[5]) > 0:
data_t.append(t)
data_seq.append(int(f[3]) / 1024 / 1024)
elif f[2] == port:
if f[4]:
ack_t.append(t)
ack_seq.append(int(f[4]) / 1024 / 1024)
if f[6]:
rtt_t.append(t)
rtt_v.append(float(f[6]) * 1000)
except ValueError:
continue
if not data_t:
log(" No data packets")
return
log(f" {len(data_t)} data + {len(ack_t)} ACK + {len(rtt_t)} RTT samples")
fig, axes, plt = _init_plot()
# Sequence numbers
axes[0].scatter(
ack_t, ack_seq, s=0.5, alpha=0.3, label="ACK", color="red", rasterized=True
)
axes[0].scatter(
data_t, data_seq, s=0.5, alpha=0.3, label="Data (seq)",
color="blue", rasterized=True,
)
_style_ax(axes[0], "Sequence / ACK (MB)", "TCP Sequence Numbers & ACKs")
_opaque_legend(axes[0], markerscale=10)
# Throughput from seq progression
tp_t, tp_v = _seq_progression_per_second(data_t, data_seq)
_plot_throughput(axes[1], tp_t, tp_v)
# Bytes in flight + cwnd estimate (computed from seq/ack progression)
merged = sorted(
[("d", t, s) for t, s in zip(data_t, data_seq)]
+ [("a", t, s) for t, s in zip(ack_t, ack_seq)],
key=lambda x: x[1],
)
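    # Replay sends and ACKs in time order: at each data packet, bytes in
    # flight is approximated as max(seq sent) - max(cumulative ACK seen).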
max_seq = last_ack = 0.0
inflight_t: list[float] = []
inflight_v: list[float] = []
for kind, t, val in merged:
if kind == "d":
max_seq = max(max_seq, val)
inflight_t.append(t)
inflight_v.append(max(0.0, max_seq - last_ack) * 1024) # MB → KB
else:
last_ack = max(last_ack, val)
if inflight_t:
axes[2].plot(
inflight_t, inflight_v, alpha=0.4, color="orange", label="bytes_in_flight",
)
cwnd_t, cwnd_v = _bin_per_second(inflight_t, inflight_v, agg=max)
if cwnd_t:
axes[2].plot(cwnd_t, cwnd_v, label="cwnd (est.)", alpha=0.8, color="blue")
_style_ax(axes[2], "KB", "Congestion Window & Bytes in Flight")
_opaque_legend(axes[2])
# RTT — per-second MAX avoids near-zero ack_rtt artifacts
max_t, max_v = _bin_per_second(rtt_t, rtt_v, agg=max)
min_of_max = (max_t, _running_min(max_v)) if max_t else ([], [])
_plot_rtt(
axes[3], rtt_t, rtt_v, (max_t, max_v), min_of_max,
raw_label="ack_rtt", smoothed_label="max (1s)", min_label="min of max",
)
_finalize_plot(fig, axes, plt, run_dir, "h2-congestion-control.pdf", tp_t)
# -- QUIC (H3) plots --------------------------------------------------------
def _plot_quic(run_dir: Path) -> None:
qlog_dir = run_dir / "qlog"
candidates = list(qlog_dir.glob("*.sqlog.gz")) + list(qlog_dir.glob("*.sqlog"))
if not candidates:
log(" No sqlog files found")
return
biggest = max(candidates, key=lambda p: p.stat().st_size)
log(f" Parsing {biggest.name} ({biggest.stat().st_size / 1024 / 1024:.1f} MB)...")
cwnd_t: list[float] = []
cwnd_v: list[float] = []
bif_t: list[float] = []
bif_v: list[float] = []
srtt_t: list[float] = []
srtt_v: list[float] = []
latest_t: list[float] = []
latest_v: list[float] = []
min_t: list[float] = []
min_v: list[float] = []
streams: dict[int, tuple[list[float], list[float]]] = defaultdict(lambda: ([], []))
data_t: list[float] = []
data_bytes: list[float] = []
opener = gzip.open if biggest.suffix == ".gz" else open
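    # .sqlog is qlog serialized as JSON Text Sequences (RFC 7464): each
    # record starts with an RS byte (0x1e), stripped below before parsing.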
with opener(biggest, "rt", encoding="utf-8", errors="replace") as f: # type: ignore[call-overload]
for raw_line in f:
line = raw_line[1:] if raw_line[0:1] == "\x1e" else raw_line
if "metrics_updated" in line:
try:
obj = json.loads(line)
except json.JSONDecodeError:
continue
if obj.get("name") != "recovery:metrics_updated":
continue
t = obj["time"] / 1000.0
d = obj.get("data", {})
for key, tl, vl in [
("congestion_window", cwnd_t, cwnd_v),
("bytes_in_flight", bif_t, bif_v),
("smoothed_rtt", srtt_t, srtt_v),
("latest_rtt", latest_t, latest_v),
("min_rtt", min_t, min_v),
]:
if key in d:
tl.append(t)
vl.append(d[key])
elif "packet_sent" in line:
try:
obj = json.loads(line)
except json.JSONDecodeError:
continue
if obj.get("name") != "transport:packet_sent":
continue
t = obj["time"] / 1000.0
for frame in obj.get("data", {}).get("frames", []):
if frame.get("frame_type") == "stream":
sid = frame.get("stream_id", 0)
length = frame.get("length", 0)
off = frame.get("offset", 0) + length
streams[sid][0].append(t)
streams[sid][1].append(off / 1024 / 1024)
if length > 0:
data_t.append(t)
data_bytes.append(length)
n_cc = len(cwnd_t) + len(bif_t) + len(srtt_t) + len(min_t)
n_stream = sum(len(v[0]) for v in streams.values())
log(f" {n_cc} congestion + {n_stream} stream + {len(data_t)} data points")
if not n_cc and not streams:
log(" No usable data")
return
fig, axes, plt = _init_plot()
# Aggregate data transferred across all streams, colored by stream
big = sorted(
[(sid, st, sv) for sid, (st, sv) in streams.items() if sv and max(sv) > 1],
key=lambda x: -max(x[2]),
)
if big:
colors = plt.cm.tab10.colors # type: ignore[attr-defined]
sid_color = {sid: colors[i % len(colors)] for i, (sid, _, _) in enumerate(big)}
all_pts = sorted(
(t, sid, v) for sid, st, sv in big for t, v in zip(st, sv)
)
cur_max: dict[int, float] = {sid: 0.0 for sid, _, _ in big}
per_stream: dict[int, tuple[list[float], list[float]]] = {
sid: ([], []) for sid, _, _ in big
}
for t, sid, v in all_pts:
cur_max[sid] = max(cur_max[sid], v)
total = sum(cur_max.values())
per_stream[sid][0].append(t)
per_stream[sid][1].append(total)
for sid, _, _ in big:
at, av = per_stream[sid]
axes[0].scatter(
at, av, s=0.3, alpha=0.3, label=f"stream {sid}",
color=sid_color[sid], rasterized=True,
)
_style_ax(axes[0], "Total data (MB)", "QUIC Aggregate Data Transferred")
_opaque_legend(axes[0], markerscale=10, fontsize="small", ncol=2)
# Throughput: sum per-stream offset progression per second
big_streams = {
sid: (st, sv) for sid, (st, sv) in streams.items() if sv and max(sv) > 1
}
tp_t: list[float] = []
tp_v: list[float] = []
if big_streams and data_t:
for sec in range(int(min(data_t)), int(max(data_t)) + 1):
total_mb = 0.0
for st, sv in big_streams.values():
offsets = [o for ti, o in zip(st, sv) if sec <= ti < sec + 1]
if len(offsets) >= 2:
total_mb += max(offsets) - min(offsets)
if total_mb > 0:
tp_t.append(float(sec))
tp_v.append(total_mb)
_plot_throughput(axes[1], tp_t, tp_v)
# Congestion window
if cwnd_t:
axes[2].plot(cwnd_t, [v / 1024 for v in cwnd_v], label="cwnd", alpha=0.8)
if bif_t:
axes[2].plot(
bif_t, [v / 1024 for v in bif_v], label="bytes_in_flight", alpha=0.6
)
_style_ax(axes[2], "KB", "Congestion Window & Bytes in Flight")
_opaque_legend(axes[2])
# RTT
_plot_rtt(
axes[3], latest_t, latest_v,
smoothed=(srtt_t, srtt_v),
min_rtt=(min_t, min_v),
raw_label="latest_rtt",
)
_finalize_plot(fig, axes, plt, run_dir, "h3-congestion-control.pdf", tp_t)
# -- Profiler / qlogs -------------------------------------------------------
def _collect_qlogs(run_dir: Path) -> list[Popen[bytes]]:
"""Move qlog directories from /tmp into results, return gzip processes."""
qlog_dirs = sorted(Path("/tmp").glob("qlog_*"))
if not qlog_dirs:
log("No qlog directories found in /tmp")
return []
dest = run_dir / "qlog"
dest.mkdir(exist_ok=True)
for qpath in qlog_dirs:
if qpath.is_dir():
for f in qpath.iterdir():
shutil.move(str(f), str(dest / f.name))
qpath.rmdir()
log(f"Collected {len(list(dest.iterdir()))} qlog files")
return [_popen(["gzip", str(f)]) for f in dest.iterdir() if f.suffix == ".sqlog"]
def _save_profiler(client: Any, output_path: str) -> None:
raw_path = output_path.removesuffix(".gz")
with client.using_context(client.CONTEXT_CHROME):
result = client.execute_async_script(
"""
let path = arguments[0];
let resolve = arguments[arguments.length - 1];
Services.profiler.dumpProfileToFileAsync(path).then(
() => resolve("ok"),
(err) => resolve("error: " + err)
);
""",
script_args=[raw_path],
script_timeout=300000,
)
if result and str(result).startswith("error"):
log(f"WARNING: Profiler dump reported: {result}")
return
raw = Path(raw_path)
if raw.exists():
with raw.open("rb") as f_in, gzip.open(output_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
raw.unlink()
log(f"Profiler data saved to {output_path}")
else:
log(f"WARNING: Profiler dump file not found at {raw_path}")
# -- Stats ------------------------------------------------------------------
def _print_stats(summary: Path, mode: str) -> None:
times: list[float] = []
throughputs: list[float] = []
for line in summary.read_text().strip().splitlines():
stripped = line.strip()
if not stripped.startswith(f"{mode}-"):
continue
m = re.match(r".+:\s+([\d.]+)s(?:\s+\(([\d.]+)\s+MB/s\))?", stripped)
if not m:
continue
times.append(float(m.group(1)))
if m.group(2):
throughputs.append(float(m.group(2)))
if not times:
return
parts = [f"{statistics.mean(times):.1f}s avg"]
if len(times) >= 2:
parts.append(f"{statistics.stdev(times):.1f}s stddev")
if throughputs:
parts.append(f"{statistics.mean(throughputs):.1f} MB/s avg")
if len(throughputs) >= 2:
parts.append(f"{statistics.stdev(throughputs):.1f} MB/s stddev")
parts.append(f"n={len(times)}")
log(f"{mode}: {', '.join(parts)}")
def _kill_firefox() -> None:
_run(["killall", "-9", "firefox"], check=False)
if __name__ == "__main__":
main()