Skip to content

Instantly share code, notes, and snippets.

@simkin
Last active February 24, 2026 17:08
Show Gist options
  • Select an option

  • Save simkin/0bd09071df08738b0755cce34e6799b8 to your computer and use it in GitHub Desktop.

Select an option

Save simkin/0bd09071df08738b0755cce34e6799b8 to your computer and use it in GitHub Desktop.
Generates test videos covering DSE, vertical banding, black/red smearing, ghosting/overshoot, local dimming behavior, etc.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Panel Test Orchestrator — Final V4.1
Goal: DSE, vertical banding, black/red smearing, ghosting/overshoot, local dimming behavior, etc.
Key properties
- 4K UHD fixed: 3840x2160
- 60 or 120 fps
- 30s default (configurable)
- 5s readable intro panel (English) + optional timer
- SDR suite (Rec.709) and HDR10 suite (BT.2020 + PQ) with paired scenarios
- Correct RGB->YUV conversion matrix/transfer/primaries + explicit range mapping
- Correct PQ (SMPTE ST 2084) math for HDR code values
- Auto HEVC HW encoder detect (AMF/NVENC/QSV/VAAPI)
- HDR10 metadata injected for HW encoders via hevc_metadata bitstream filter if available
- Falls back to x265 when HDR metadata injection is unavailable
- RGB16 pipeline + vectorized starfield/red smear patterns
"""
import argparse
import math
import os
import shutil
import subprocess
import sys
import time
import textwrap
import threading
import queue
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Dict
import numpy as np
import cv2
# ==========================
# Fixed output geometry
# ==========================
# Output frame size in pixels (width, height); every generator below builds
# arrays of shape (H, W, 3).
W, H = 3840, 2160
# Largest 16-bit code value; frames are stored as uint16.
MAX16 = 65535
# Length in seconds of the intro/outro segments (RenderConfig turns these
# into frame counts).
INTRO_SECONDS = 0.5
OUTRO_SECONDS = 0.5
# Defaults for RenderConfig.body_seconds / RenderConfig.text_intro_seconds.
DEFAULT_BODY_SECONDS = 30.0
DEFAULT_TEXT_INTRO_SECONDS = 5.0
# NOTE(review): presumably the currently running FFmpeg subprocess and a
# cooperative cancel flag; their users are outside this view — confirm.
CURRENT_FFMPEG_PROC = None
CANCEL_REQUESTED = False
# ==========================
# Color / transfer helpers
# ==========================
# SMPTE ST 2084 (PQ) curve constants, used by pq_oetf_nits_to_norm to encode
# absolute luminance (nits) into a 0..1 signal.
_PQ_M1 = 2610.0 / 16384.0
_PQ_M2 = 2523.0 / 32.0
_PQ_C1 = 3424.0 / 4096.0
_PQ_C2 = 2413.0 / 128.0
_PQ_C3 = 2392.0 / 128.0
_PQ_LMAX = 10000.0 # PQ absolute max in nits; inputs are clamped to this
def pq_oetf_nits_to_norm(nits: np.ndarray) -> np.ndarray:
    """Encode absolute luminance in nits as a PQ (SMPTE ST 2084) signal.

    The input is clamped to 0.._PQ_LMAX nits before encoding and the result
    is clamped to the 0..1 range.
    """
    relative = np.clip(nits, 0.0, _PQ_LMAX) / _PQ_LMAX
    powered = np.power(relative, _PQ_M1)
    ratio = (_PQ_C1 + _PQ_C2 * powered) / (1.0 + _PQ_C3 * powered)
    return np.clip(np.power(ratio, _PQ_M2), 0.0, 1.0)
def pq_oetf_nits_to_u16(nits: np.ndarray) -> np.ndarray:
    """Encode nits with the PQ curve and quantise to uint16 codes (0..65535)."""
    signal = pq_oetf_nits_to_norm(nits)
    scaled = signal * MAX16
    return np.round(scaled).astype(np.uint16)
def sdr_code_to_u16(x: np.ndarray) -> np.ndarray:
    """Quantise normalised SDR code values to uint16.

    SDR patterns are authored directly as display-referred code values, so
    no transfer function is applied: x is clamped to 0..1 and scaled to the
    16-bit range with rounding.
    """
    clamped = np.clip(x, 0.0, 1.0)
    return np.round(clamped * MAX16).astype(np.uint16)
# ==========================
# Scenario definitions (paired)
# ==========================
# SDR scenario catalogue: scenario key -> intro-panel texts
# ("title", "goal", "watch"). Insertion order is the suite order.
SDR_SCENARIOS: Dict[str, Dict[str, str]] = dict(
    uniformity_sweep=dict(
        title="Gray Uniformity Sweep (SDR)",
        goal="Reveal dirty screen effect and vertical banding",
        watch="Look for patches/streaks staying fixed while content drifts.",
    ),
    green_uniformity_pan=dict(
        title="Green Uniformity Pan (SDR)",
        goal="Reveal DSE on green-ish content",
        watch="Look for dirty screen patterns, tint shifts, non-uniform panning.",
    ),
    vertical_bars=dict(
        title="Vertical Bars Scroll (SDR)",
        goal="Reveal vertical banding and edge uniformity",
        watch="Look for banding, uneven brightness, unstable edges during scroll.",
    ),
    near_black_steps=dict(
        title="Near-Black Steps (SDR)",
        goal="Check near-black detail and black crush",
        watch="Look for missing steps, crushed shadows, flicker/instability.",
    ),
    near_black_ramp_move=dict(
        title="Near-Black Moving Ramp (SDR)",
        goal="Stress near-black uniformity under motion",
        watch="Look for flashing, contouring, and fixed-pattern noise in motion.",
    ),
    red_gradient=dict(
        title="Red Gradient Banding (SDR)",
        goal="Reveal red banding and posterization",
        watch="Look for steps in gradient and unstable red transitions.",
    ),
    starfield=dict(
        title="Starfield Smear Stress (SDR)",
        goal="Stress black-to-color transitions for smearing",
        watch="Look for black smearing, trails, slow pixel response.",
    ),
    red_smear=dict(
        title="Red-on-Black Smearing (SDR)",
        goal="Stress VA red trailing",
        watch="Look for long red trails and distorted moving shapes.",
    ),
    overshoot_blocks=dict(
        title="Overshoot / Inverse Ghosting (SDR)",
        goal="Reveal overdrive artifacts on gray transitions",
        watch="Look for bright halos, dark trails, inverse ghosting.",
    ),
    checker_rows=dict(
        title="Checkerboard Alternating Rows (SDR)",
        goal="High-contrast row motion stress",
        watch="Look for blur, ghosting, unstable row transitions.",
    ),
    radial_spokes=dict(
        title="Radial Spokes Rotation (SDR)",
        goal="Stress high-contrast rotating edges",
        watch="Look for edge breakup, flicker, interpolation artifacts.",
    ),
    local_dimming_dot=dict(
        title="Local Dimming Dot Sweep (SDR)",
        goal="Reveal blooming / dimming lag (OLED, Mini-LED, FALD)",
        watch="Look for halos, zone pumping, lag behind moving dot.",
    ),
    subtitle_bar=dict(
        title="Subtitle Bar Stress (SDR)",
        goal="Stress dimming with bright bar near bottom",
        watch="Look for pumping, raised blacks, bloom around the bar.",
    ),
)
# HDR10 scenario catalogue, paired one-to-one with the SDR entries via the
# "_hdr" key suffix. Same value layout: "title", "goal", "watch".
HDR_SCENARIOS: Dict[str, Dict[str, str]] = dict(
    uniformity_sweep_hdr=dict(
        title="Gray Uniformity Sweep (HDR10)",
        goal="Reveal DSE/banding under HDR processing",
        watch="Look for DSE/banding differences vs SDR (processing/local dimming).",
    ),
    green_uniformity_pan_hdr=dict(
        title="Green Uniformity Pan (HDR10)",
        goal="Reveal DSE in HDR pipeline",
        watch="Look for tint shifts, local dimming artifacts, non-uniform panning.",
    ),
    vertical_bars_hdr=dict(
        title="Vertical Bars Scroll (HDR10)",
        goal="Reveal banding & edge uniformity in HDR pipeline",
        watch="Look for banding changes vs SDR and any processing artifacts.",
    ),
    near_black_steps_hdr=dict(
        title="Near-Black Steps (HDR10)",
        goal="Check PQ near-black stability (artifact focus)",
        watch="Look for flashing, raised blacks, crushed near-black detail.",
    ),
    near_black_ramp_move_hdr=dict(
        title="Near-Black Moving Ramp (HDR10)",
        goal="Stress PQ near-black uniformity under motion",
        watch="Look for contouring, flashing, fixed patterns during motion.",
    ),
    red_gradient_hdr=dict(
        title="Red Gradient Banding (HDR10)",
        goal="Reveal banding in HDR color processing",
        watch="Look for steps/hue shifts/banding different from SDR.",
    ),
    starfield_hdr=dict(
        title="Starfield Smear Stress (HDR10)",
        goal="Stress black-to-color transitions in HDR mode",
        watch="Look for smearing/ghosting changes vs SDR.",
    ),
    red_smear_hdr=dict(
        title="Red-on-Black Smearing (HDR10)",
        goal="Stress red trailing under HDR processing",
        watch="Look for red smearing and local dimming interaction.",
    ),
    overshoot_blocks_hdr=dict(
        title="Overshoot / Inverse Ghosting (HDR10)",
        goal="Reveal overdrive artifacts under HDR pipeline",
        watch="Look for halos/trails/inverse ghosting differences vs SDR.",
    ),
    checker_rows_hdr=dict(
        title="Checkerboard Alternating Rows (HDR10)",
        goal="High-contrast row motion stress in HDR pipeline",
        watch="Look for motion artifacts differences vs SDR.",
    ),
    radial_spokes_hdr=dict(
        title="Radial Spokes Rotation (HDR10)",
        goal="Stress edge motion in HDR pipeline",
        watch="Look for flicker/interpolation and any processing artifacts.",
    ),
    local_dimming_dot_hdr=dict(
        title="Local Dimming Dot Sweep (HDR10)",
        goal="Reveal blooming/zone behavior in HDR mode",
        watch="Look for halos, zone pumping, lag vs SDR.",
    ),
    subtitle_bar_hdr=dict(
        title="Subtitle Bar Stress (HDR10)",
        goal="Stress bright subtitle bar in HDR mode",
        watch="Look for pumping, raised blacks, bloom around the bar.",
    ),
)
def scenario_names_for_suite(suite: str) -> List[str]:
    """Return the scenario keys of a suite, in catalogue order.

    "sdr" selects SDR_SCENARIOS; any other value selects HDR_SCENARIOS.
    """
    catalogue = SDR_SCENARIOS if suite == "sdr" else HDR_SCENARIOS
    return list(catalogue)
def scenario_info(suite: str, scenario: str) -> Dict[str, str]:
    """Look up the title/goal/watch texts of one scenario.

    "sdr" reads SDR_SCENARIOS; any other suite reads HDR_SCENARIOS.
    Raises KeyError when the scenario key is not in the chosen catalogue.
    """
    if suite == "sdr":
        return SDR_SCENARIOS[scenario]
    return HDR_SCENARIOS[scenario]
# ==========================
# Config
# ==========================
@dataclass
class RenderConfig:
    """Settings for one render; allowed values are enforced by validate_cfg."""

    # Timing
    fps: int = 60  # 60 | 120
    body_seconds: float = DEFAULT_BODY_SECONDS
    text_intro_seconds: float = DEFAULT_TEXT_INTRO_SECONDS
    # Scenario selection
    suite: str = "sdr"  # sdr | hdr10
    scenario: str = "uniformity_sweep"
    all_scenarios: bool = False
    segment: str = "all"  # intro | body | outro | all | separate
    # Output
    out_dir: str = "renders_final_v3_1"
    basename: str = ""
    codec: str = "ffv1"  # ffv1 | prores4444 | hevc_x265 | hevc_auto | hevc_hw
    hevc_quality: str = "high"  # high | medium
    # Pattern / overlay options
    speed: str = "medium"  # slow | medium | fast
    timer_color: str = "white"  # white | red
    no_timer: bool = False
    no_crt: bool = False
    no_text_intro: bool = False
    color_range: str = "tv"  # tv | pc (limited vs full)

    def _frames_for(self, seconds: float) -> int:
        """Convert a duration in seconds to a rounded frame count at self.fps."""
        return int(round(seconds * self.fps))

    @property
    def intro_frames(self) -> int:
        """Frame count of the INTRO_SECONDS segment."""
        return self._frames_for(INTRO_SECONDS)

    @property
    def outro_frames(self) -> int:
        """Frame count of the OUTRO_SECONDS segment."""
        return self._frames_for(OUTRO_SECONDS)

    @property
    def body_frames(self) -> int:
        """Frame count of the body segment (body_seconds)."""
        return self._frames_for(self.body_seconds)
# ==========================
# Validation / utility
# ==========================
def ensure_ffmpeg() -> None:
    """Raise RuntimeError unless an ``ffmpeg`` executable is found on PATH."""
    located = shutil.which("ffmpeg")
    if located is not None:
        return
    raise RuntimeError("FFmpeg not found in PATH.")
def validate_cfg(cfg: RenderConfig) -> None:
    """Check every user-settable field of cfg against its allowed values.

    Checks run in a fixed order and the first violation raises ValueError
    with a message naming the offending setting.
    """

    def require(ok: bool, message: str) -> None:
        if not ok:
            raise ValueError(message)

    require(cfg.fps in (60, 120), "FPS must be 60 or 120.")
    require(not (cfg.body_seconds <= 0), "Body seconds must be > 0.")
    require(not (cfg.text_intro_seconds < 0), "Text intro seconds must be >= 0.")
    require(cfg.suite in ("sdr", "hdr10"), "Suite must be sdr or hdr10.")
    # The scenario must belong to the (already validated) suite.
    require(
        cfg.scenario in scenario_names_for_suite(cfg.suite),
        f"Scenario '{cfg.scenario}' not in suite '{cfg.suite}'.",
    )
    require(
        cfg.segment in ("intro", "body", "outro", "all", "separate"),
        "Segment must be intro/body/outro/all/separate.",
    )
    require(
        cfg.codec in ("ffv1", "prores4444", "hevc_x265", "hevc_auto", "hevc_hw"),
        "Invalid codec.",
    )
    require(cfg.hevc_quality in ("high", "medium"), "hevc_quality must be high or medium.")
    require(cfg.speed in ("slow", "medium", "fast"), "speed must be slow/medium/fast.")
    require(cfg.timer_color in ("white", "red"), "timer_color must be white or red.")
    require(cfg.color_range in ("tv", "pc"), "color_range must be tv or pc.")
def speed_px_per_sec(speed: str, base: float) -> float:
    """Scale a base speed by the preset: slow x0.55, medium x1, fast x1.8.

    Raises KeyError for any other preset name.
    """
    if speed == "slow":
        return base * 0.55
    if speed == "medium":
        return base
    if speed == "fast":
        return base * 1.8
    raise KeyError(speed)
def ffmpeg_capture(args: List[str]) -> str:
    """Run a command and return its stdout and stderr as one string.

    The two streams are joined as ``stdout + "\\n" + stderr``. The exit code
    is deliberately not checked: callers only search the text for capability
    names.

    Args:
        args: Full argument vector, program name first.

    Returns:
        The captured text. Bytes that cannot be decoded with the default
        encoding are replaced rather than raising UnicodeDecodeError, so a
        stray non-ASCII byte in the tool's output cannot abort a probe.
    """
    proc = subprocess.run(
        args,
        capture_output=True,
        text=True,
        errors="replace",
    )
    return (proc.stdout or "") + "\n" + (proc.stderr or "")
def detect_hevc_hw_encoder() -> Optional[str]:
    """Return the first hardware HEVC encoder named in ``ffmpeg -encoders``.

    Candidates are tried in preference order (AMD AMF first, then VAAPI,
    NVENC, QSV) using a case-insensitive substring search of the listing.
    Returns None when none of them is listed.
    """
    listing = ffmpeg_capture(["ffmpeg", "-hide_banner", "-encoders"]).lower()
    preference = ("hevc_amf", "hevc_vaapi", "hevc_nvenc", "hevc_qsv")
    return next((name for name in preference if name in listing), None)
def has_filter(name: str) -> bool:
    """Report whether name occurs (case-insensitively) in ``ffmpeg -filters``.

    This is a plain substring search of the whole listing, not an exact
    filter-name match.
    """
    needle = name.lower()
    listing = ffmpeg_capture(["ffmpeg", "-hide_banner", "-filters"]).lower()
    return needle in listing
def has_bsf(name: str) -> bool:
    """Report whether name occurs (case-insensitively) in ``ffmpeg -bsfs``.

    This is a plain substring search of the whole listing, not an exact
    bitstream-filter-name match.
    """
    needle = name.lower()
    listing = ffmpeg_capture(["ffmpeg", "-hide_banner", "-bsfs"]).lower()
    return needle in listing
def bsf_supports_hdr10_metadata() -> bool:
    """Tell whether the hevc_metadata BSF help mentions both HDR10 options.

    Returns True only when ``master_display`` and ``max_cll`` both appear in
    the output of ``ffmpeg -h bsf=hevc_metadata``.
    """
    help_text = ffmpeg_capture(["ffmpeg", "-hide_banner", "-h", "bsf=hevc_metadata"]).lower()
    required_options = ("master_display", "max_cll")
    return all(option in help_text for option in required_options)
# ==========================
# Overlays / CRT
# ==========================
def frame_black() -> np.ndarray:
    """Return a freshly allocated all-zero (H, W, 3) uint16 frame."""
    shape = (H, W, 3)
    return np.zeros(shape, dtype=np.uint16)
def add_timer(frame_rgb16: np.ndarray, elapsed: float, total: float, color_name: str) -> None:
    """Draw an MM:SS elapsed-time readout near the top-right corner, in place.

    elapsed is clamped to 0..total and truncated to whole seconds. The text
    is drawn twice: a thick black outline first, then the coloured fill
    ("white" -> white, anything else -> light red) on top.
    """
    whole_seconds = int(min(max(elapsed, 0), total))
    minutes, seconds = divmod(whole_seconds, 60)
    label = f"{minutes:02d}:{seconds:02d}"

    face = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 2.0
    stroke = 4

    # Colours are (R, G, B); 8-bit values are widened to 16-bit by x257.
    if color_name == "white":
        fill8 = (255, 255, 255)
    else:
        fill8 = (255, 64, 64)
    fill16 = tuple(int(channel * 257) for channel in fill8)
    outline16 = (0, 0, 0)

    (text_w, text_h), _baseline = cv2.getTextSize(label, face, font_scale, stroke)
    origin = (W - text_w - 50, 50 + text_h)
    cv2.putText(frame_rgb16, label, origin, face, font_scale, outline16, stroke + 6, cv2.LINE_AA)
    cv2.putText(frame_rgb16, label, origin, face, font_scale, fill16, stroke, cv2.LINE_AA)
def draw_text_panel(frame_rgb16: np.ndarray, title: str, goal: str, watch: str, opacity: float = 0.88) -> None:
    """Draw the full-page, slide-style intro card onto frame_rgb16 in place.

    The existing frame content is first dimmed by (1 - opacity); then a card
    with a title header, a "GOAL" column, a "WATCH FOR" column and a footer
    line is painted on top. All colours are 16-bit (R, G, B) tuples.
    """
    simplex = cv2.FONT_HERSHEY_SIMPLEX
    duplex = cv2.FONT_HERSHEY_DUPLEX

    white = (MAX16, MAX16, MAX16)
    black = (0, 0, 0)
    soft_red = (MAX16, 21000, 21000)
    soft_blue = (18000, 30000, MAX16)
    card_fill = (4200, 4200, 6500)
    card_edge = (13500, 13500, 21000)
    panel_fill = (6500, 6500, 9000)

    def outlined(text, org, face, size, ink, outline_px, ink_px):
        # Black outline underneath, coloured text on top, for legibility.
        cv2.putText(frame_rgb16, text, org, face, size, black, outline_px, cv2.LINE_AA)
        cv2.putText(frame_rgb16, text, org, face, size, ink, ink_px, cv2.LINE_AA)

    # Dim whatever is already in the frame.
    dimmed = frame_rgb16.astype(np.float32) * (1.0 - opacity)
    frame_rgb16[:] = np.clip(dimmed, 0, MAX16).astype(np.uint16)

    # Card body, border, header band and the thin accent strip below it.
    card_l, card_t = int(W * 0.05), int(H * 0.07)
    card_r, card_b = int(W * 0.95), int(H * 0.91)
    header_b = card_t + int((card_b - card_t) * 0.18)
    cv2.rectangle(frame_rgb16, (card_l, card_t), (card_r, card_b), card_fill, -1)
    cv2.rectangle(frame_rgb16, (card_l, card_t), (card_r, card_b), card_edge, 6)
    cv2.rectangle(frame_rgb16, (card_l, card_t), (card_r, header_b), soft_blue, -1)
    cv2.rectangle(frame_rgb16, (card_l + 24, header_b + 24), (card_r - 24, header_b + 34), soft_red, -1)

    # Title: wrapped, at most two lines.
    for row, text in enumerate(textwrap.wrap(title, width=34)[:2]):
        outlined(text, (card_l + 52, card_t + 78 + row * 72), duplex, 1.50, white, 8, 4)

    # Two equal columns separated by a gutter.
    margin, gutter = 52, 40
    col_w = (card_r - card_l - margin * 2 - gutter) // 2
    left_x = card_l + margin
    right_x = left_x + col_w + gutter
    col_t = header_b + 64
    col_b = card_b - 58
    hdr_h = 56

    for col_x in (left_x, right_x):
        cv2.rectangle(frame_rgb16, (col_x, col_t), (col_x + col_w, col_b), panel_fill, -1)
        cv2.rectangle(frame_rgb16, (col_x, col_t), (col_x + col_w, col_b), card_edge, 3)

    for col_x, band in ((left_x, soft_red), (right_x, soft_blue)):
        cv2.rectangle(frame_rgb16, (col_x, col_t), (col_x + col_w, col_t + hdr_h), band, -1)

    for col_x, caption in ((left_x, "GOAL"), (right_x, "WATCH FOR")):
        outlined(caption, (col_x + 18, col_t + 40), simplex, 1.02, white, 6, 2)

    # Body text: wrapped and truncated to what fits in the column height.
    step = 46
    max_lines = max(1, (col_b - (col_t + hdr_h + 28)) // step)
    for col_x, paragraph in ((left_x, goal), (right_x, watch)):
        baseline = col_t + hdr_h + 42
        for text in textwrap.wrap(paragraph, width=42)[:max_lines]:
            outlined(text, (col_x + 18, baseline), simplex, 0.90, white, 5, 2)
            baseline += step

    # Footer
    footer = "Body starts after intro. Timer appears top-right during the test."
    outlined(footer, (card_l + 34, card_b - 18), simplex, 0.72, (52000, 52000, 52000), 4, 2)
def crt_vertical_collapse(frame_rgb16: np.ndarray, scale_y: float, flash_gain: float) -> np.ndarray:
    """Squash a frame vertically around the screen centre and apply a gain.

    Args:
        frame_rgb16: Source frame; it is resized to W columns and the
            squashed height, then pasted onto a black canvas of the same
            shape as the input.
        scale_y: Fraction of the full height H to keep.
        flash_gain: Multiplier applied to the result (skipped when exactly
            1.0); the product is clipped to 0..MAX16.

    Returns:
        A new uint16 array; the input frame is not modified.
    """
    # Clamp to 1..H rows. Without the upper bound a scale_y above 1.0 gives
    # a negative paste offset and the slice assignment fails on a shape
    # mismatch.
    nh = min(H, max(1, int(H * scale_y)))
    squashed = cv2.resize(frame_rgb16, (W, nh), interpolation=cv2.INTER_LINEAR)
    canvas = np.zeros_like(frame_rgb16)
    top = (H - nh) // 2  # centre the squashed band vertically
    canvas[top:top + nh, :, :] = squashed
    if flash_gain != 1.0:
        boosted = canvas.astype(np.float32) * flash_gain
        canvas = np.clip(boosted, 0, MAX16).astype(np.uint16)
    return canvas
def apply_crt(cfg: RenderConfig, idx: int, total: int, which: str) -> np.ndarray:
    """Render frame idx of total for the CRT-style "intro" or "outro" effect.

    A full-white frame is collapsed vertically: the intro grows from a thin
    band to full height while its gain falls from 1.85 to 1.0; the outro
    shrinks from full height to a thin band. A black frame is returned when
    cfg.no_crt is set or which is neither "intro" nor "outro".
    """
    if cfg.no_crt or which not in ("intro", "outro"):
        return frame_black()

    progress = idx / max(1, total - 1)
    # White source so the collapse/flash visibly produces light.
    white_frame = np.full((H, W, 3), MAX16, dtype=np.uint16)
    if which == "intro":
        scale = 0.02 + 0.98 * progress
        gain = 1.85 - 0.85 * progress
    else:
        scale = max(0.008, 1.0 - 0.992 * progress)
        gain = 1.0 + 0.35 * (1.0 - progress)
    return crt_vertical_collapse(white_frame, scale, gain)
# ==========================
# Pattern generators
# ==========================
def gen_uniformity_sweep_sdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """SDR gray field stepping through 5 %, 10 % and 50 % over the body.

    Each third of cfg.body_seconds uses one level. A faint two-sine
    horizontal ripple drifts sideways at the configured speed.
    """
    progress = t / max(cfg.body_seconds, 1e-6)
    if progress < 1/3:
        level = 0.05
    elif progress < 2/3:
        level = 0.10
    else:
        level = 0.50
    base_code = int(level * MAX16)

    drift_speed = speed_px_per_sec(cfg.speed, 22.0)
    x = (np.arange(W, dtype=np.float32) + drift_speed * t) / (W / 6.2)
    ripple = (np.sin(2 * np.pi * x) * 0.0025 + np.sin(2 * np.pi * x * 0.33) * 0.0018)
    offsets = (ripple * MAX16).astype(np.int32)

    # The pattern only varies along x: build one row, then replicate it.
    row = np.clip(base_code + offsets, 0, MAX16).astype(np.uint16)
    return np.tile(row[None, :, None], (H, 1, 3))
def gen_uniformity_sweep_hdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """HDR gray field stepping through 5, 10 and 50 nits (PQ-coded).

    Each third of cfg.body_seconds uses one level. A faint two-sine
    horizontal ripple, expressed in code values, drifts sideways.
    """
    progress = t / max(cfg.body_seconds, 1e-6)
    if progress < 1/3:
        level_nits = 5.0
    elif progress < 2/3:
        level_nits = 10.0
    else:
        level_nits = 50.0
    base_code = int(pq_oetf_nits_to_u16(np.array(level_nits, dtype=np.float32)))

    drift_speed = speed_px_per_sec(cfg.speed, 22.0)
    x = (np.arange(W, dtype=np.float32) + drift_speed * t) / (W / 6.2)
    ripple = (np.sin(2 * np.pi * x) * 0.0020 + np.sin(2 * np.pi * x * 0.33) * 0.0014)
    offsets = (ripple * 2200).astype(np.int32)

    # The pattern only varies along x: build one row, then replicate it.
    row = np.clip(base_code + offsets, 0, MAX16).astype(np.uint16)
    return np.tile(row[None, :, None], (H, 1, 3))
def gen_green_uniformity_pan_sdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """SDR 66 % green field with a faint horizontal ripple panning sideways.

    The ripple is added in full to green and at one seventh strength
    (floor division) to red and blue.
    """
    green_code = int(MAX16 * 0.66)
    pan_speed = speed_px_per_sec(cfg.speed, 18.0)
    x = (np.arange(W, dtype=np.float32) + pan_speed * t) / (W / 5.8)
    var = (np.sin(2*np.pi*x) * 0.0035 + np.sin(2*np.pi*0.21*x) * 0.002)
    delta = (var * MAX16).astype(np.int32)

    # One (W, 3) row in signed ints so negative ripple values clip cleanly.
    row = np.zeros((W, 3), dtype=np.int32)
    row[:, 0] = delta // 7
    row[:, 1] = green_code + delta
    row[:, 2] = delta // 7
    row = np.clip(row, 0, MAX16).astype(np.uint16)
    return np.tile(row[None, :, :], (H, 1, 1))
def gen_green_uniformity_pan_hdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """HDR green field (PQ code for 20 nits) with a faint panning ripple.

    The ripple, in code values, is added in full to green and at one tenth
    strength (floor division) to red and blue.
    """
    green_code = int(pq_oetf_nits_to_u16(np.array(20.0, dtype=np.float32)))
    pan_speed = speed_px_per_sec(cfg.speed, 18.0)
    x = (np.arange(W, dtype=np.float32) + pan_speed * t) / (W / 5.8)
    var = (np.sin(2*np.pi*x) * 0.0030 + np.sin(2*np.pi*0.21*x) * 0.0017)
    delta = (var * 1800).astype(np.int32)

    # One (W, 3) row in signed ints so negative ripple values clip cleanly.
    row = np.zeros((W, 3), dtype=np.int32)
    row[:, 0] = delta // 10
    row[:, 1] = green_code + delta
    row[:, 2] = delta // 10
    row = np.clip(row, 0, MAX16).astype(np.uint16)
    return np.tile(row[None, :, :], (H, 1, 1))
def gen_vertical_bars(cfg: RenderConfig, t: float) -> np.ndarray:
    """Alternating black / full-code vertical bars scrolling horizontally.

    Bars are max(8, W // 96) pixels wide and the pattern repeats every 256
    bars, so the scroll offset wraps seamlessly.

    Only one row of W samples is computed (with modular indexing) and then
    replicated, instead of materialising a full-height tile 256 bars wide
    on every frame. The returned pixels are the same.

    NOTE(review): the dispatcher also routes "vertical_bars_hdr" here, so
    the HDR suite gets raw 0 / MAX16 codes rather than PQ-coded nit levels
    like the other HDR patterns — confirm that is intended.

    Returns:
        A new (H, W, 3) uint16 frame.
    """
    bar_w = max(8, W // 96)
    period = bar_w * 256
    shift = int((speed_px_per_sec(cfg.speed, 320.0) * t) % period)
    # Pattern column that each output column shows after scrolling.
    columns = (np.arange(W) + shift) % period
    row = np.where((columns // bar_w) % 2 == 0, 0, MAX16).astype(np.uint16)
    return np.tile(row[None, :, None], (H, 1, 3))
def gen_near_black_steps_sdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """Eight equal-width near-black SDR steps (0 %..10 %) scrolling sideways."""
    levels = np.array([0.00, 0.01, 0.02, 0.03, 0.04, 0.05, 0.07, 0.10], dtype=np.float32)
    step_w = W // levels.size
    period = step_w * levels.size

    # One row of the pattern: each level repeated step_w times.
    codes = np.array([int(lv * MAX16) for lv in levels], dtype=np.uint16)
    pattern = np.repeat(codes, step_w)

    shift = int((speed_px_per_sec(cfg.speed, 55.0) * t) % period)
    row = pattern[shift:shift + W]
    if row.size < W:
        # Wrap around to the start of the pattern.
        row = np.concatenate([row, pattern[:W - row.size]])
    row = row[:W]
    return np.tile(row[None, :, None], (H, 1, 3))
def gen_near_black_steps_hdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """Eight equal-width near-black HDR steps (0..2 nits, PQ) scrolling sideways."""
    step_nits = np.array([0.0, 0.05, 0.1, 0.2, 0.4, 0.7, 1.2, 2.0], dtype=np.float32)
    codes = pq_oetf_nits_to_u16(step_nits)
    step_w = W // codes.size
    period = step_w * codes.size

    # One row of the pattern: each code repeated step_w times.
    pattern = np.repeat(np.array([int(c) for c in codes], dtype=np.uint16), step_w)

    shift = int((speed_px_per_sec(cfg.speed, 55.0) * t) % period)
    row = pattern[shift:shift + W]
    if row.size < W:
        # Wrap around to the start of the pattern.
        row = np.concatenate([row, pattern[:W - row.size]])
    row = row[:W]
    return np.tile(row[None, :, None], (H, 1, 3))
def gen_near_black_ramp_move_sdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """Near-black SDR ramp (0..10 %, power 2.2) with a ripple, scrolling left.

    A small time-varying sine is added to the ramp, the sum is capped at
    12 %, and the whole row is rotated by the scroll offset.
    """
    x = np.linspace(0.0, 1.0, W, dtype=np.float32)
    ramp = (x ** 2.2) * 0.10
    ripple = 0.004 * np.sin(2*np.pi*(x*6 + t*0.15))
    row = sdr_code_to_u16(np.clip(ramp + ripple, 0.0, 0.12))

    shift = int((speed_px_per_sec(cfg.speed, 30.0) * t) % W)
    scrolled = np.roll(row, -shift)
    return np.tile(scrolled[None, :, None], (H, 1, 3))
def gen_near_black_ramp_move_hdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """Near-black HDR ramp (0..5 nits, cubic) with a ripple, scrolling left.

    A time-varying sine of 0.25 nits amplitude is added, the sum is clamped
    to 0..6 nits, PQ-encoded, and the row is rotated by the scroll offset.
    """
    x = np.linspace(0.0, 1.0, W, dtype=np.float32)
    nits = (x ** 3.0) * 5.0
    ripple = 0.25 * np.sin(2*np.pi*(x*6 + t*0.12))
    row = pq_oetf_nits_to_u16(np.clip(nits + ripple, 0.0, 6.0))

    shift = int((speed_px_per_sec(cfg.speed, 30.0) * t) % W)
    scrolled = np.roll(row, -shift)
    return np.tile(scrolled[None, :, None], (H, 1, 3))
def gen_red_gradient_sdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """Red-only SDR gradient whose direction rotates a quarter turn over the body.

    The gradient spans 5 %..100 % red and is modulated by a slow brightness
    pulse (0.94 +/- 0.06) with a period of one body length. Green and blue
    stay at zero.
    """
    body = max(cfg.body_seconds, 1e-6)
    xx = np.linspace(0, 1, W, dtype=np.float32)
    yy = np.linspace(0, 1, H, dtype=np.float32)
    ang = 2*np.pi*(t/body)*0.25
    # Broadcasting a row against a column gives the full (H, W) field.
    g = np.clip(0.5 + 0.85*(np.cos(ang)*(xx[None, :]-0.5) + np.sin(ang)*(yy[:, None]-0.5)), 0, 1)
    pulse = 0.94 + 0.06 * math.sin(2*np.pi*t/body)

    out = np.zeros((H, W, 3), dtype=np.uint16)
    out[:, :, 0] = sdr_code_to_u16((0.05 + 0.95*g) * pulse)
    return out
def gen_red_gradient_hdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """Red-only HDR gradient (5..120 nits, PQ) rotating a quarter turn over the body.

    A slow brightness pulse (0.97 +/- 0.03) with a period of one body length
    scales the luminance before PQ encoding. Green and blue stay at zero.
    """
    body = max(cfg.body_seconds, 1e-6)
    xx = np.linspace(0, 1, W, dtype=np.float32)
    yy = np.linspace(0, 1, H, dtype=np.float32)
    ang = 2*np.pi*(t/body)*0.25
    # Broadcasting a row against a column gives the full (H, W) field.
    g = np.clip(0.5 + 0.85*(np.cos(ang)*(xx[None, :]-0.5) + np.sin(ang)*(yy[:, None]-0.5)), 0, 1)
    nits = 5.0 + g * 115.0
    pulse = 0.97 + 0.03 * math.sin(2*np.pi*t/body)

    out = np.zeros((H, W, 3), dtype=np.uint16)
    out[:, :, 0] = pq_oetf_nits_to_u16(nits * pulse)
    return out
# --- Vectorized starfield ---
# Lazily built star table shared by both starfield generators.
_STAR_CACHE = None


def _init_star_cache():
    """Populate _STAR_CACHE with 260 stars from a fixed-seed generator.

    Each star gets a start position, a speed group (0..2), a square size
    (2..5 px), a colour id (0 white, 1 red, 2 blue) and a travel angle.
    The seed is fixed, so the table is the same on every run.
    """
    global _STAR_CACHE
    count = 260
    rng = np.random.default_rng(12345)
    # Draw order matters for reproducibility: x0, y0, grp, size, color.
    start_x = rng.uniform(0, W, count).astype(np.float32)
    start_y = rng.uniform(0, H, count).astype(np.float32)
    group = rng.integers(0, 3, count, dtype=np.int32)
    size = rng.choice(np.array([2, 3, 4, 5], dtype=np.int32), count)
    color = rng.integers(0, 3, count, dtype=np.int32)
    # Angles cycle through 17 values from 25 to 73 degrees in 3-degree steps.
    angle = np.deg2rad(25 + (np.arange(count) % 17) * 3).astype(np.float32)
    _STAR_CACHE = {
        "x0": start_x,
        "y0": start_y,
        "grp": group,
        "size": size,
        "color": color,
        "ang": angle,
    }
def _stamp_star_squares(frame: np.ndarray, xs: np.ndarray, ys: np.ndarray, cs: np.ndarray, s: int, value: int) -> None:
# RGB layout: R=0, G=1, B=2
for dy in range(s):
yy = ys + dy
for dx in range(s):
xx = xs + dx
mw = (cs == 0)
if np.any(mw):
frame[yy[mw], xx[mw], :] = value
mr = (cs == 1)
if np.any(mr):
frame[yy[mr], xx[mr], 0] = value # red channel
mb = (cs == 2)
if np.any(mb):
frame[yy[mb], xx[mb], 2] = value # blue channel
def gen_starfield_sdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """Full-code star squares drifting over a 0.2 % gray SDR background.

    Stars move along their cached angle at one of three base speeds scaled
    by the speed preset, and wrap around a field slightly larger than the
    frame. Stars up to 6 px off the top/left edge are still drawn, clamped
    to the frame.
    """
    global _STAR_CACHE
    if _STAR_CACHE is None:
        _init_star_cache()
    stars = _STAR_CACHE

    canvas = np.full((H, W, 3), int(MAX16 * 0.002), dtype=np.uint16)
    tier_speed = np.array([85.0, 190.0, 380.0], dtype=np.float32)
    preset_scale = {"slow": 0.8, "medium": 1.0, "fast": 1.35}[cfg.speed]
    velocity = tier_speed[stars["grp"]] * preset_scale

    pos_x = (stars["x0"] + np.cos(stars["ang"]) * velocity * t) % (W + 20) - 10
    pos_y = (stars["y0"] + np.sin(stars["ang"]) * velocity * t) % (H + 20) - 10
    px = pos_x.astype(np.int32)
    py = pos_y.astype(np.int32)

    on_screen = (px >= -6) & (px < W) & (py >= -6) & (py < H)
    px, py = px[on_screen], py[on_screen]
    sizes = stars["size"][on_screen]
    colors = stars["color"][on_screen]

    # Stamp one batch per square size so each batch is a single vector op.
    for side in (2, 3, 4, 5):
        pick = (sizes == side)
        if not np.any(pick):
            continue
        left = np.clip(px[pick], 0, W - side)
        top = np.clip(py[pick], 0, H - side)
        _stamp_star_squares(canvas, left, top, colors[pick], side, MAX16)
    return canvas
def gen_starfield_hdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """200-nit star squares drifting over a 0.2-nit HDR background (PQ codes).

    Motion, wrapping and edge clamping match the SDR starfield; only the
    background and star code values differ.
    """
    global _STAR_CACHE
    if _STAR_CACHE is None:
        _init_star_cache()
    stars = _STAR_CACHE

    background = int(pq_oetf_nits_to_u16(np.array(0.2, dtype=np.float32)))
    star_code = int(pq_oetf_nits_to_u16(np.array(200.0, dtype=np.float32)))
    canvas = np.full((H, W, 3), background, dtype=np.uint16)

    tier_speed = np.array([85.0, 190.0, 380.0], dtype=np.float32)
    preset_scale = {"slow": 0.8, "medium": 1.0, "fast": 1.35}[cfg.speed]
    velocity = tier_speed[stars["grp"]] * preset_scale

    pos_x = (stars["x0"] + np.cos(stars["ang"]) * velocity * t) % (W + 20) - 10
    pos_y = (stars["y0"] + np.sin(stars["ang"]) * velocity * t) % (H + 20) - 10
    px = pos_x.astype(np.int32)
    py = pos_y.astype(np.int32)

    on_screen = (px >= -6) & (px < W) & (py >= -6) & (py < H)
    px, py = px[on_screen], py[on_screen]
    sizes = stars["size"][on_screen]
    colors = stars["color"][on_screen]

    # Stamp one batch per square size so each batch is a single vector op.
    for side in (2, 3, 4, 5):
        pick = (sizes == side)
        if not np.any(pick):
            continue
        left = np.clip(px[pick], 0, W - side)
        top = np.clip(py[pick], 0, H - side)
        _stamp_star_squares(canvas, left, top, colors[pick], side, star_code)
    return canvas
def _stamp_rect_red(frame: np.ndarray, xs: np.ndarray, ys: np.ndarray, w: int, h: int, vals: np.ndarray) -> None:
# RGB layout, red channel index 0
for dy in range(h):
yy = ys + dy
for dx in range(w):
xx = xs + dx
frame[yy, xx, 0] = np.maximum(frame[yy, xx, 0], vals)
def _stamp_hline_red(frame: np.ndarray, xs: np.ndarray, ys: np.ndarray, w: int, vals: np.ndarray, thick: int = 2) -> None:
    """Stamp horizontal bars w wide and thick tall into the red channel, in place.

    Row indices are clamped to the frame height; column indices are not, so
    the caller must keep xs + w - 1 inside the frame. Each pixel keeps the
    larger of its current red value and the stamped value.
    """
    for col in range(w):
        col_idx = xs + col
        for row in range(thick):
            row_idx = np.clip(ys + row, 0, H - 1)
            current = frame[row_idx, col_idx, 0]
            frame[row_idx, col_idx, 0] = np.maximum(current, vals)
def gen_red_smear_sdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """100 red shapes moving over black: SDR red-smear stress pattern.

    Start positions, speeds and directions are derived deterministically
    from the shape index. Even-indexed shapes are 16x16 squares, odd ones
    are 40x2 bars; brightness cycles through 25 %, 60 % and 100 % red.
    """
    canvas = np.zeros((H, W, 3), dtype=np.uint16)
    preset_scale = {"slow": 0.75, "medium": 1.0, "fast": 1.4}[cfg.speed]

    idx = np.arange(100, dtype=np.int32)
    start_x = (idx * 181) % W
    start_y = (idx * 113) % H
    velocity = (110 + (idx % 6) * 90).astype(np.float32) * preset_scale
    heading = np.deg2rad((idx * 23) % 360).astype(np.float32)
    px = ((start_x.astype(np.float32) + np.cos(heading) * velocity * t) % W).astype(np.int32)
    py = ((start_y.astype(np.float32) + np.sin(heading) * velocity * t) % H).astype(np.int32)

    palette = np.array([int(MAX16*0.25), int(MAX16*0.6), MAX16], dtype=np.uint16)
    brightness = palette[idx % 3]

    squares = (idx % 2 == 0)
    bars = ~squares
    # Clamp so every shape lies fully inside the frame.
    _stamp_rect_red(
        canvas,
        np.clip(px[squares], 0, W - 16),
        np.clip(py[squares], 0, H - 16),
        16, 16, brightness[squares],
    )
    _stamp_hline_red(
        canvas,
        np.clip(px[bars], 0, W - 40),
        np.clip(py[bars], 0, H - 2),
        40, brightness[bars], thick=2,
    )
    return canvas
def gen_red_smear_hdr(cfg: RenderConfig, t: float) -> np.ndarray:
    """100 red shapes moving over a 0.2-nit background: HDR red-smear stress.

    Geometry and motion match the SDR variant; red levels cycle through the
    PQ codes for 25, 80 and 180 nits.
    """
    background = int(pq_oetf_nits_to_u16(np.array(0.2, dtype=np.float32)))
    canvas = np.full((H, W, 3), background, dtype=np.uint16)
    preset_scale = {"slow": 0.75, "medium": 1.0, "fast": 1.4}[cfg.speed]

    idx = np.arange(100, dtype=np.int32)
    start_x = (idx * 181) % W
    start_y = (idx * 113) % H
    velocity = (110 + (idx % 6) * 90).astype(np.float32) * preset_scale
    heading = np.deg2rad((idx * 23) % 360).astype(np.float32)
    px = ((start_x.astype(np.float32) + np.cos(heading) * velocity * t) % W).astype(np.int32)
    py = ((start_y.astype(np.float32) + np.sin(heading) * velocity * t) % H).astype(np.int32)

    level_nits = np.array([25.0, 80.0, 180.0], dtype=np.float32)
    brightness = pq_oetf_nits_to_u16(level_nits)[idx % 3]

    squares = (idx % 2 == 0)
    bars = ~squares
    # Clamp so every shape lies fully inside the frame.
    _stamp_rect_red(
        canvas,
        np.clip(px[squares], 0, W - 16),
        np.clip(py[squares], 0, H - 16),
        16, 16, brightness[squares],
    )
    _stamp_hline_red(
        canvas,
        np.clip(px[bars], 0, W - 40),
        np.clip(py[bars], 0, H - 2),
        40, brightness[bars], thick=2,
    )
    return canvas
def gen_overshoot_blocks(cfg: RenderConfig, t: float, hdr: bool) -> np.ndarray:
    """28 gray blocks sliding horizontally over a mid-gray field.

    SDR uses a 50 % background and six shades from 0 to 100 %; HDR uses a
    50-nit background and six PQ-coded shades from 0.2 to 220 nits. Block
    size, row, speed and start offset derive from the block index;
    even-indexed blocks move right and odd ones left, wrapping fully
    off-screen before re-entering.
    """
    if hdr:
        background = int(pq_oetf_nits_to_u16(np.array(50.0, dtype=np.float32)))
        shades = pq_oetf_nits_to_u16(np.array([0.2, 10, 30, 50, 120, 220], dtype=np.float32))
    else:
        background = int(MAX16 * 0.5)
        shades = np.array([0, int(MAX16*0.15), int(MAX16*0.35), int(MAX16*0.5), int(MAX16*0.75), MAX16], dtype=np.uint16)
    canvas = np.full((H, W, 3), background, dtype=np.uint16)

    preset_scale = {"slow": 0.75, "medium": 1.0, "fast": 1.35}[cfg.speed]
    for i in range(28):
        block_w = 160 + (i % 5) * 90
        block_h = 100 + (i % 4) * 70
        top = 80 + (i * 149) % (H - block_h - 100)
        velocity = (95 + (i % 7) * 45) * preset_scale
        span = W + block_w  # travel range including the off-screen run-in
        start = (i * 257) % span
        travel = velocity * t if i % 2 == 0 else -velocity * t
        left = int((start + travel) % span) - block_w
        shade = int(shades[i % len(shades)])

        # Clip the block to the frame; skip it when nothing is visible.
        x_lo, x_hi = max(0, left), min(W, left + block_w)
        y_lo, y_hi = max(0, top), min(H, top + block_h)
        if x_lo < x_hi and y_lo < y_hi:
            canvas[y_lo:y_hi, x_lo:x_hi, :] = shade
    return canvas
def gen_checker_rows(cfg: RenderConfig, t: float, hdr: bool) -> np.ndarray:
    """Checkerboard whose rows scroll in alternating directions.

    Every cell carries a centred inner square of the opposite level. SDR
    uses 0 / MAX16; HDR uses the PQ codes for 0.2 and 200 nits. Even rows
    scroll right, odd rows scroll left, both wrapping around the tile.
    """
    cell = max(40, W // 32)
    inner = int(cell * 0.45)
    n_rows = H // cell + 2
    n_cols = W // cell + 6
    tile_w = n_cols * cell

    if hdr:
        dark = int(pq_oetf_nits_to_u16(np.array(0.2, dtype=np.float32)))
        bright = int(pq_oetf_nits_to_u16(np.array(200.0, dtype=np.float32)))
    else:
        dark, bright = 0, MAX16

    # Base checkerboard, slightly larger than the frame so rows can scroll.
    row_cell = (np.arange(n_rows * cell) // cell)[:, None]
    col_cell = (np.arange(tile_w) // cell)[None, :]
    parity = ((col_cell + row_cell) % 2).astype(np.uint8)
    tile = np.where(parity[..., None] == 0, dark, bright).astype(np.uint16)
    tile = np.repeat(tile, 3, axis=2)

    # Inverted inner square in the middle of every cell.
    pad = (cell - inner) // 2
    for ry in range(n_rows):
        top = ry * cell + pad
        for cx in range(n_cols):
            left = cx * cell + pad
            cell_is_bright = parity[ry * cell, cx * cell] == 1
            tile[top:top + inner, left:left + inner] = dark if cell_is_bright else bright

    shift = int((speed_px_per_sec(cfg.speed, 220.0) * t) % tile_w)
    out = np.zeros((H, W, 3), dtype=np.uint16)
    for band, y0 in enumerate(range(0, H, cell)):
        y1 = min(H, y0 + cell)
        start = (tile_w - shift) % tile_w if band % 2 == 0 else shift
        # Rotate the strip so column `start` lands at x = 0, then crop.
        out[y0:y1] = np.roll(tile[y0:y1], -start, axis=1)[:, :W]
    return out
def gen_radial_spokes(cfg: RenderConfig, t: float, hdr: bool) -> np.ndarray:
    """48 alternating angular sectors rotating about the frame centre.

    The rotation rate in radians per second comes from the speed preset.
    SDR sectors are 0 / MAX16; HDR sectors are the PQ codes for 0.2 and
    200 nits.
    """
    centre_x, centre_y = W / 2, H / 2
    # Row/column index vectors broadcast to the full (H, W) angle field.
    rows = np.arange(H)[:, None]
    cols = np.arange(W)[None, :]
    angle = np.arctan2(rows - centre_y, cols - centre_x)

    spin = {"slow": 0.35, "medium": 0.7, "fast": 1.15}[cfg.speed]
    sector = (((angle + spin * t + np.pi) / (2 * np.pi)) * 48).astype(np.int32)
    lit = (sector % 2 == 0)

    if hdr:
        dark = int(pq_oetf_nits_to_u16(np.array(0.2, dtype=np.float32)))
        bright = int(pq_oetf_nits_to_u16(np.array(200.0, dtype=np.float32)))
    else:
        dark, bright = 0, MAX16
    plane = np.where(lit, bright, dark).astype(np.uint16)
    return np.repeat(plane[:, :, None], 3, axis=2)
def gen_local_dimming_dot(cfg: RenderConfig, t: float, hdr: bool) -> np.ndarray:
    """Small bright square sweeping across a dark field.

    During the first half of the body the dot travels left to right along
    the vertical centre; during the second half it travels top to bottom
    along the horizontal centre. SDR uses MAX16 on 0; HDR uses the PQ codes
    for 250 nits on 0.2 nits.
    """
    if hdr:
        background = int(pq_oetf_nits_to_u16(np.array(0.2, dtype=np.float32)))
        dot_code = int(pq_oetf_nits_to_u16(np.array(250.0, dtype=np.float32)))
    else:
        background, dot_code = 0, MAX16
    canvas = np.full((H, W, 3), background, dtype=np.uint16)

    half = cfg.body_seconds / 2.0
    safe_half = max(half, 1e-6)
    if t < half:
        centre_x = int((t / safe_half) * (W - 1))
        centre_y = H // 2
    else:
        centre_x = W // 2
        centre_y = int(((t - half) / safe_half) * (H - 1))

    radius = 10
    x_lo, x_hi = max(0, centre_x - radius), min(W, centre_x + radius)
    y_lo, y_hi = max(0, centre_y - radius), min(H, centre_y + radius)
    canvas[y_lo:y_hi, x_lo:x_hi, :] = dot_code
    return canvas
def gen_subtitle_bar(cfg: RenderConfig, t: float, hdr: bool) -> np.ndarray:
    """Bright horizontal bar near the bottom of a dark field, drifting right.

    The bar is 6 % of the frame height, starts at 86 % of the height and is
    76 % of the frame width. Its left edge starts at 12 % of the width,
    drifts right and restarts after 20 % of the width; the right edge is
    clipped to the frame. SDR uses MAX16 on a 2 % background; HDR uses the
    PQ codes for 250 nits on 1 nit.
    """
    if hdr:
        background = int(pq_oetf_nits_to_u16(np.array(1.0, dtype=np.float32)))
        bar_code = int(pq_oetf_nits_to_u16(np.array(250.0, dtype=np.float32)))
    else:
        background = int(MAX16 * 0.02)
        bar_code = MAX16
    canvas = np.full((H, W, 3), background, dtype=np.uint16)

    top = int(H * 0.86)
    bottom = min(H, top + int(H * 0.06))

    drift_speed = speed_px_per_sec(cfg.speed, 40.0)
    drift = int((drift_speed * t) % (W * 0.2))
    left = int(W * 0.12 + drift)
    right = min(W, left + int(W * 0.76))

    canvas[top:bottom, left:right, :] = bar_code
    return canvas
# ==========================
# Scenario dispatcher
# ==========================
def gen_frame(cfg: RenderConfig, t: float) -> np.ndarray:
    """Dispatch to the pattern generator named by cfg.suite / cfg.scenario.

    Any suite other than "sdr" is looked up in the HDR scenario table.

    Raises:
        ValueError: if cfg.scenario is not a known scenario for the suite.
    """
    wanted = cfg.scenario

    # Generators are wrapped in lambdas so that only the selected one is
    # looked up and called.
    if cfg.suite == "sdr":
        sdr_table = {
            "uniformity_sweep": lambda: gen_uniformity_sweep_sdr(cfg, t),
            "green_uniformity_pan": lambda: gen_green_uniformity_pan_sdr(cfg, t),
            "vertical_bars": lambda: gen_vertical_bars(cfg, t),
            "near_black_steps": lambda: gen_near_black_steps_sdr(cfg, t),
            "near_black_ramp_move": lambda: gen_near_black_ramp_move_sdr(cfg, t),
            "red_gradient": lambda: gen_red_gradient_sdr(cfg, t),
            "starfield": lambda: gen_starfield_sdr(cfg, t),
            "red_smear": lambda: gen_red_smear_sdr(cfg, t),
            "overshoot_blocks": lambda: gen_overshoot_blocks(cfg, t, hdr=False),
            "checker_rows": lambda: gen_checker_rows(cfg, t, hdr=False),
            "radial_spokes": lambda: gen_radial_spokes(cfg, t, hdr=False),
            "local_dimming_dot": lambda: gen_local_dimming_dot(cfg, t, hdr=False),
            "subtitle_bar": lambda: gen_subtitle_bar(cfg, t, hdr=False),
        }
        render = sdr_table.get(wanted)
        if render is None:
            raise ValueError("Unknown SDR scenario")
        return render()

    hdr_table = {
        "uniformity_sweep_hdr": lambda: gen_uniformity_sweep_hdr(cfg, t),
        "green_uniformity_pan_hdr": lambda: gen_green_uniformity_pan_hdr(cfg, t),
        "vertical_bars_hdr": lambda: gen_vertical_bars(cfg, t),
        "near_black_steps_hdr": lambda: gen_near_black_steps_hdr(cfg, t),
        "near_black_ramp_move_hdr": lambda: gen_near_black_ramp_move_hdr(cfg, t),
        "red_gradient_hdr": lambda: gen_red_gradient_hdr(cfg, t),
        "starfield_hdr": lambda: gen_starfield_hdr(cfg, t),
        "red_smear_hdr": lambda: gen_red_smear_hdr(cfg, t),
        "overshoot_blocks_hdr": lambda: gen_overshoot_blocks(cfg, t, hdr=True),
        "checker_rows_hdr": lambda: gen_checker_rows(cfg, t, hdr=True),
        "radial_spokes_hdr": lambda: gen_radial_spokes(cfg, t, hdr=True),
        "local_dimming_dot_hdr": lambda: gen_local_dimming_dot(cfg, t, hdr=True),
        "subtitle_bar_hdr": lambda: gen_subtitle_bar(cfg, t, hdr=True),
    }
    render = hdr_table.get(wanted)
    if render is None:
        raise ValueError("Unknown HDR scenario")
    return render()
# ==========================
# FFmpeg command building
# ==========================
def color_metadata_args(cfg: RenderConfig) -> List[str]:
    """Return the FFmpeg output options that tag range, matrix, primaries and transfer.

    The SDR suite is tagged bt709 throughout; every other suite is tagged
    bt2020nc / bt2020 / smpte2084. Any cfg.color_range other than "tv" is
    emitted as "pc".
    """
    range_tag = "pc" if cfg.color_range != "tv" else "tv"
    if cfg.suite == "sdr":
        matrix, primaries, transfer = "bt709", "bt709", "bt709"
    else:
        matrix, primaries, transfer = "bt2020nc", "bt2020", "smpte2084"
    return [
        "-color_range", range_tag,
        "-colorspace", matrix,
        "-color_primaries", primaries,
        "-color_trc", transfer,
    ]
def rgb_to_yuv_filter(cfg: RenderConfig, out_pix_fmt: str) -> str:
    """Build the -vf chain that converts the full-range RGB input to YUV.

    When the zscale filter is available the chain names matrix, transfer,
    primaries and range for both input and output. Otherwise a swscale
    ``scale`` chain is returned that names only matrix and range. Both
    chains end with ``format=<out_pix_fmt>``.
    """
    limited = cfg.color_range == "tv"
    is_sdr = cfg.suite == "sdr"

    if has_filter("zscale"):
        if is_sdr:
            matrix, transfer, primaries = "bt709", "bt709", "bt709"
        else:
            matrix, transfer, primaries = "bt2020nc", "smpte2084", "bt2020"
        z_range = "limited" if limited else "full"
        options = [
            f"matrixin={matrix}",
            f"transferin={transfer}",
            f"primariesin={primaries}",
            "rangein=full",
            f"matrix={matrix}",
            f"transfer={transfer}",
            f"primaries={primaries}",
            f"range={z_range}",
        ]
        return "zscale=" + ":".join(options) + f",format={out_pix_fmt}"

    # swscale fallback: still explicit about matrix and range.
    sws_matrix = "bt709" if is_sdr else "bt2020"
    sws_range = "tv" if limited else "pc"
    return (
        f"scale=in_color_matrix={sws_matrix}:out_color_matrix={sws_matrix}:"
        f"in_range=pc:out_range={sws_range},format={out_pix_fmt}"
    )
def hdr10_metadata_bsf() -> str:
    """Return the ``-bsf:v`` argument carrying mastering-display and MaxCLL values.

    The option is spelled ``master_display`` here, unlike x265's
    ``master-display``. Each comma inside a value is emitted as a literal
    backslash followed by a comma.
    """
    esc = "\\,"  # literal backslash + comma
    points = (
        ("G", 8500, 39850),
        ("B", 6550, 2300),
        ("R", 34000, 16000),
        ("WP", 15635, 16450),
        ("L", 10000000, 1),
    )
    mastering = "".join(f"{tag}({first}{esc}{second})" for tag, first, second in points)
    return f"hevc_metadata=master_display={mastering}:max_cll=1000{esc}400"
def _build_amf_cmd(base: List[str], vf: str, meta: List[str], qp_triplet: tuple, out: str) -> List[str]:
_qpi, qpp, _qpb = qp_triplet
# AMF CLI options vary across ffmpeg builds. Using per-frame-type flags like -qp_p / -qp_b
# is not portable and can fail with "Unrecognized option".
# Use a single global quantizer for broad compatibility.
return base + ["-vf", vf] + meta + [
"-c:v", "hevc_amf",
"-quality", "quality",
"-rc", "cqp",
"-qp", qpp,
"-tag:v", "hvc1",
out
]
def _build_nvenc_cmd(base: List[str], vf: str, meta: List[str], qp: str, high: bool, out: str) -> List[str]:
return base + ["-vf", vf] + meta + [
"-c:v", "hevc_nvenc",
"-preset", "p7" if high else "p5",
"-tune", "hq",
"-rc", "constqp",
"-qp", qp,
"-tag:v", "hvc1",
out
]
def _build_qsv_cmd(base: List[str], vf: str, meta: List[str], qp: str, out: str) -> List[str]:
# Explicit quality-oriented config to avoid falling back to poor defaults
cmd = base + ["-vf", vf] + meta + [
"-c:v", "hevc_qsv",
"-look_ahead", "0",
"-global_quality", qp,
"-preset", "slow",
"-tag:v", "hvc1",
out
]
return cmd
def _build_vaapi_cmd(base: List[str], vf: str, meta: List[str], qp: str, out: str) -> List[str]:
# Minimal VAAPI path; may need environment/device setup on Linux
return base + ["-vf", vf] + meta + [
"-c:v", "hevc_vaapi",
"-qp", qp,
"-tag:v", "hvc1",
out
]
def ffmpeg_cmd(cfg: RenderConfig, out_root: str) -> (List[str], str):
    """Build the FFmpeg command line and output path for cfg.codec.

    FFmpeg reads raw rgb48le frames of W x H at cfg.fps from stdin.

    - "ffv1" writes ``<out_root>.mkv`` with no -vf chain.
    - "prores4444" writes ``<out_root>.mov`` as yuv444p10le.
    - HEVC codecs write ``<out_root>_<encoder>.mp4`` when a hardware
      encoder is used, otherwise ``<out_root>.mp4`` via libx265.

    A hardware encoder is skipped in favour of libx265 when the suite is
    "hdr10" and the hevc_metadata bitstream filter cannot inject the HDR10
    metadata.

    Returns:
        (command argument list, output file path)

    Raises:
        RuntimeError: cfg.codec is "hevc_hw" but no hardware encoder exists.
    """
    raw_input = [
        "ffmpeg", "-y",
        "-f", "rawvideo",
        "-pix_fmt", "rgb48le",
        "-s", f"{W}x{H}",
        "-r", str(cfg.fps),
        "-i", "-",
        "-an"
    ]
    tags = color_metadata_args(cfg)

    if cfg.codec == "ffv1":
        target = out_root + ".mkv"
        return raw_input + tags + ["-c:v", "ffv1", "-level", "3", "-g", "1", target], target

    if cfg.codec == "prores4444":
        target = out_root + ".mov"
        chain = rgb_to_yuv_filter(cfg, "yuv444p10le")
        prores_opts = ["-c:v", "prores_ks", "-profile:v", "4", "-vendor", "apl0", "-bits_per_mb", "8000"]
        return raw_input + ["-vf", chain] + tags + prores_opts + [target], target

    # ---- HEVC paths ----
    encoder = detect_hevc_hw_encoder()
    if cfg.codec == "hevc_hw" and encoder is None:
        raise RuntimeError("hevc_hw selected but no HEVC hardware encoder found.")
    hw_allowed = cfg.codec in ("hevc_auto", "hevc_hw") and encoder is not None

    is_sdr = cfg.suite == "sdr"
    chain = rgb_to_yuv_filter(cfg, "yuv444p10le" if is_sdr else "yuv420p10le")

    top_quality = cfg.hevc_quality == "high"
    if top_quality:
        crf, qp_triplet, qp = "10", ("16", "18", "20"), "18"
    else:
        crf, qp_triplet, qp = "14", ("22", "24", "26"), "24"

    wants_hdr = cfg.suite == "hdr10"
    bsf_usable = has_bsf("hevc_metadata") and bsf_supports_hdr10_metadata()

    # Hardware is only used for HDR when the metadata can be injected.
    if hw_allowed and (bsf_usable or not wants_hdr):
        target = out_root + f"_{encoder}.mp4"
        if encoder == "hevc_amf":
            hw_cmd = _build_amf_cmd(raw_input, chain, tags, qp_triplet, target)
        elif encoder == "hevc_nvenc":
            hw_cmd = _build_nvenc_cmd(raw_input, chain, tags, qp, top_quality, target)
        elif encoder == "hevc_qsv":
            hw_cmd = _build_qsv_cmd(raw_input, chain, tags, qp, target)
        else:
            hw_cmd = _build_vaapi_cmd(raw_input, chain, tags, qp, target)
        if wants_hdr:
            # Slot the bitstream filter in just before the output path.
            hw_cmd = hw_cmd[:-1] + ["-bsf:v", hdr10_metadata_bsf(), hw_cmd[-1]]
        return hw_cmd, target

    # ---- libx265: explicit choice or fallback ----
    target = out_root + ".mp4"
    sw_cmd = raw_input + ["-vf", chain] + tags + ["-c:v", "libx265", "-preset", "slow", "-tag:v", "hvc1"]
    if is_sdr:
        return sw_cmd + ["-crf", crf, target], target

    x265_opts = ":".join((
        "hdr10=1",
        "repeat-headers=1",
        "master-display=G(8500,39850)B(6550,2300)R(34000,16000)WP(15635,16450)L(10000000,1)",
        "max-cll=1000,400",
        f"crf={crf}",
    ))
    return sw_cmd + ["-x265-params", x265_opts, target], target
# ==========================
# Frame assembly
# ==========================
def build_body_frame(cfg: RenderConfig, t: float) -> np.ndarray:
    """Produce the body frame for time ``t`` with the optional overlays.

    The text panel is passed an opacity of 0.80 while
    ``t < cfg.text_intro_seconds`` (unless cfg.no_text_intro), scaled
    linearly towards 0 over the last 0.6 s. The timer is added unless
    cfg.no_timer is set. Both overlay helpers are called for their effect
    on the frame; their return values are ignored.
    """
    canvas = gen_frame(cfg, t)

    show_panel = not cfg.no_text_intro and t < cfg.text_intro_seconds
    if show_panel:
        left = cfg.text_intro_seconds - t
        opacity = 0.80 * max(0.0, left / 0.6) if left < 0.6 else 0.80
        text = scenario_info(cfg.suite, cfg.scenario)
        draw_text_panel(canvas, text["title"], text["goal"], text["watch"], opacity)

    if cfg.no_timer:
        return canvas
    add_timer(canvas, t, cfg.body_seconds, cfg.timer_color)
    return canvas
# ==========================
# Rendering engine
# ==========================
def write_raw_rgb48(proc: subprocess.Popen, frame_rgb16: np.ndarray) -> None:
    """Write one frame's raw bytes to the FFmpeg process's stdin.

    Raises:
        KeyboardInterrupt: the write failed with an OSError while
            CANCEL_REQUESTED is set (the pipe is typically already closed
            while an abort is in flight).
        OSError: the write failed and no cancel was requested.
    """
    payload = frame_rgb16.tobytes()
    try:
        proc.stdin.write(payload)
    except OSError as exc:  # BrokenPipeError is a subclass of OSError
        if not CANCEL_REQUESTED:
            raise
        raise KeyboardInterrupt("Render aborted by user") from exc
def _segment_phases(cfg: RenderConfig, segment: str) -> list:
    """Return the ordered (frame_count, make_frame) phases that make up ``segment``.

    ``make_frame(i, n)`` builds frame ``i`` of a phase that has ``n`` frames.

    Raises:
        ValueError: ``segment`` is not "intro", "body", "outro" or "all".
    """
    def intro():
        return cfg.intro_frames, lambda i, n: apply_crt(cfg, i, n, "intro")

    def body():
        return cfg.body_frames, lambda i, n: build_body_frame(cfg, i / cfg.fps)

    def outro():
        return cfg.outro_frames, lambda i, n: apply_crt(cfg, i, n, "outro")

    layout = {
        "intro": (intro,),
        "body": (body,),
        "outro": (outro,),
        "all": (intro, body, outro),
    }
    if segment not in layout:
        raise ValueError(f"Unknown segment: {segment}")
    return [phase() for phase in layout[segment]]


def _finish_ffmpeg(proc: subprocess.Popen, graceful: bool) -> None:
    """Close FFmpeg's stdin and reap the process so ``returncode`` is set.

    After a complete render (``graceful``) the wait is unbounded: the
    encoder may still be flushing buffered frames and killing it would
    truncate the file. On the abort/error path FFmpeg gets 10 s to exit
    before it is killed.
    """
    try:
        if proc.stdin:
            proc.stdin.close()
    except (OSError, ValueError):
        pass  # pipe already broken or closed; nothing left to flush
    if graceful:
        proc.wait()
        return
    try:
        proc.wait(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()  # reap, so returncode is populated


def render_segment(cfg: RenderConfig, segment: str, progress_cb=None) -> str:
    """Render one segment ("intro", "body", "outro" or "all") through FFmpeg.

    Frames are generated one at a time and piped to an FFmpeg child process
    as raw RGB48. The running process is published in CURRENT_FFMPEG_PROC
    for the duration of the render; CANCEL_REQUESTED is polled before every
    frame.

    Args:
        cfg: render settings; validated with validate_cfg.
        segment: which part of the clip to render.
        progress_cb: optional ``callback(done, total, label)``. Called after
            each frame, with (-3, -3, "aborted") on abort and with
            (-1, -1, "done:<seconds>s") on success.

    Returns:
        Path of the file FFmpeg wrote.

    Raises:
        ValueError: unknown ``segment`` (raised before FFmpeg is started).
        KeyboardInterrupt: the render was aborted.
        RuntimeError: FFmpeg exited with a non-zero status.
    """
    global CURRENT_FFMPEG_PROC, CANCEL_REQUESTED
    validate_cfg(cfg)
    ensure_ffmpeg()

    # Resolve the frame plan first so a bad segment never spawns FFmpeg.
    phases = _segment_phases(cfg, segment)
    total = sum(count for count, _ in phases)

    os.makedirs(cfg.out_dir, exist_ok=True)
    base = cfg.basename or f"{cfg.suite}_{cfg.scenario}_{cfg.fps}fps_{segment}_{cfg.codec}"
    out_root = str(Path(cfg.out_dir) / base)
    cmd, out_path = ffmpeg_cmd(cfg, out_root)
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
    CURRENT_FFMPEG_PROC = proc
    t0 = time.time()

    def _check_abort():
        if CANCEL_REQUESTED:
            try:
                if proc.poll() is None:
                    proc.terminate()
            except OSError:
                pass  # process already gone
            raise KeyboardInterrupt("Render aborted by user")

    completed = False
    try:
        written = 0
        for count, make_frame in phases:
            for i in range(count):
                _check_abort()
                write_raw_rgb48(proc, make_frame(i, count))
                written += 1
                if progress_cb:
                    progress_cb(written, total, segment)
        completed = True
    except KeyboardInterrupt:
        if progress_cb:
            progress_cb(-3, -3, "aborted")
        raise
    finally:
        try:
            _finish_ffmpeg(proc, graceful=completed)
        finally:
            CURRENT_FFMPEG_PROC = None

    if proc.returncode != 0:
        raise RuntimeError(f"FFmpeg failed with code {proc.returncode}")
    if progress_cb:
        progress_cb(-1, -1, f"done:{time.time() - t0:.1f}s")
    return out_path
def render_requested(cfg: RenderConfig, progress_cb=None) -> List[str]:
    """Render the segment(s) selected by cfg.segment and return the output paths.

    For "separate", intro, body and outro are rendered as three files. Each
    file needs its own name, so the segment name is appended to
    cfg.basename when one was supplied; otherwise the default
    ``<suite>_<scenario>_<fps>fps_<segment>_<codec>`` name is used. Any
    other value is rendered as a single file by render_segment.
    """
    if cfg.segment != "separate":
        return [render_segment(cfg, cfg.segment, progress_cb)]

    outs = []
    for seg in ("intro", "body", "outro"):
        if cfg.basename:
            # Keep the user's name instead of silently discarding it.
            name = f"{cfg.basename}_{seg}"
        else:
            name = f"{cfg.suite}_{cfg.scenario}_{cfg.fps}fps_{seg}_{cfg.codec}"
        c2 = RenderConfig(**{**cfg.__dict__, "segment": seg, "basename": name})
        outs.append(render_segment(c2, seg, progress_cb))
    return outs
def render_all_scenarios(cfg: RenderConfig, progress_cb=None) -> List[str]:
    """Render every scenario of cfg.suite in turn and collect the output paths.

    Before each scenario ``progress_cb`` (if given) is called with
    (-2, <scenario count>, "scenario:<i>/<count>:<name>"). The per-scenario
    config has its basename cleared.
    """
    scenarios = scenario_names_for_suite(cfg.suite)
    count = len(scenarios)
    produced: List[str] = []
    for position, scenario in enumerate(scenarios, start=1):
        if progress_cb:
            progress_cb(-2, count, f"scenario:{position}/{count}:{scenario}")
        per_scenario = RenderConfig(**dict(vars(cfg), scenario=scenario, basename=""))
        produced += render_requested(per_scenario, progress_cb)
    return produced
# ==========================
# CLI / GUI
# ==========================
def parse_args():
    """Parse the command line (sys.argv) and return the argparse namespace.

    The default output folder is "renders_final_v4_1", the same folder name
    the GUI uses (the CLI previously still defaulted to the stale
    "renders_final_v3_1").
    """
    p = argparse.ArgumentParser(description="Panel Test Orchestrator — Final V4.1 (4K UHD)")
    p.add_argument("--cli", action="store_true", help="Force CLI mode")
    p.add_argument("--suite", default="sdr", choices=["sdr", "hdr10"])
    p.add_argument("--scenario", default=None)
    p.add_argument("--all-scenarios", action="store_true")
    p.add_argument("--segment", default="all", choices=["intro", "body", "outro", "all", "separate"])
    p.add_argument("--fps", type=int, default=120, choices=[60, 120])
    p.add_argument("--seconds", type=float, default=30.0)
    p.add_argument("--text-intro-seconds", type=float, default=5.0)
    p.add_argument("--codec", default="ffv1", choices=["ffv1", "prores4444", "hevc_x265", "hevc_auto", "hevc_hw"])
    p.add_argument("--hevc-quality", default="high", choices=["high", "medium"])
    p.add_argument("--speed", default="medium", choices=["slow", "medium", "fast"])
    p.add_argument("--timer-color", default="white", choices=["white", "red"])
    p.add_argument("--no-timer", action="store_true")
    p.add_argument("--no-crt", action="store_true")
    p.add_argument("--no-text-intro", action="store_true")
    p.add_argument("--color-range", default="tv", choices=["tv", "pc"])
    p.add_argument("--out", default="renders_final_v4_1")
    p.add_argument("--name", default="")
    return p.parse_args()
def launch_gui():
    """Build and run the Tkinter front end.

    The window collects render settings, starts the render on a background
    thread and shows its progress. The worker never touches Tk objects: it
    receives an already-built RenderConfig and reports back through a
    queue that the Tk thread drains every 100 ms in pump_queue().
    This call blocks in root.mainloop() until the window is closed.
    """
    import tkinter as tk
    from tkinter import ttk, filedialog, messagebox

    ensure_ffmpeg()
    hw = detect_hevc_hw_encoder() or "none"
    cpu_threads = os.cpu_count() or 1
    zscale_ok = has_filter("zscale")
    bsf_ok = has_bsf("hevc_metadata")
    hdr_bsf_ok = bsf_ok and bsf_supports_hdr10_metadata()

    root = tk.Tk()
    root.title("Panel Test Orchestrator — Final V4.1 (4K UHD)")
    root.geometry("1120x940")

    vars_ = {
        "suite": tk.StringVar(value="sdr"),
        "scenario": tk.StringVar(value=scenario_names_for_suite("sdr")[0]),
        "all_scenarios": tk.BooleanVar(value=True),
        "segment": tk.StringVar(value="all"),
        "fps": tk.StringVar(value="120"),
        "seconds": tk.StringVar(value="30"),
        "text_intro_seconds": tk.StringVar(value="5"),
        "codec": tk.StringVar(value="hevc_auto"),
        "hevc_quality": tk.StringVar(value="high"),
        "speed": tk.StringVar(value="medium"),
        "timer_color": tk.StringVar(value="white"),
        "no_timer": tk.BooleanVar(value=False),
        "no_crt": tk.BooleanVar(value=False),
        "no_text_intro": tk.BooleanVar(value=False),
        "color_range": tk.StringVar(value="tv"),
        "out": tk.StringVar(value=str(Path.cwd() / "renders_final_v4_1")),
        "name": tk.StringVar(value=""),
        "perf_profile": tk.StringVar(value="high"),
    }
    work_q = queue.Queue()          # worker thread -> Tk thread messages
    worker = {"thread": None}
    ui_state = {"running": False}

    def predicted_hw_status():
        """Describe which encoder the current codec/suite selection will use."""
        codec = vars_["codec"].get()
        suite = vars_["suite"].get()
        if codec in ("ffv1", "prores4444", "hevc_x265"):
            if codec == "hevc_x265":
                return "Encoding path: CPU (libx265)"
            return "Encoding path: CPU (non-HEVC codec)"
        if hw == "none":
            return "Encoding path: No HEVC HW encoder detected -> CPU x265 fallback"
        if suite == "hdr10" and not hdr_bsf_ok:
            return f"Encoding path: {hw} detected, but HDR BSF opts missing -> CPU x265 fallback"
        return f"Encoding path: HW HEVC ({hw})"

    def update_hw_status(*_):
        hw_status_var.set(predicted_hw_status())

    def suggest_profile_name() -> str:
        """Pick a performance profile from the HW encoder and CPU thread count."""
        if hw != "none" and cpu_threads >= 16:
            return "high"
        if hw != "none" and cpu_threads >= 8:
            return "medium"
        return "low"

    def apply_profile(name: str):
        """Load the fps/codec/quality/speed/intro settings of a named profile."""
        if name == "low":
            vars_["fps"].set("60")
            vars_["codec"].set("hevc_auto" if hw != "none" else "hevc_x265")
            vars_["hevc_quality"].set("medium")
            vars_["speed"].set("medium")
            vars_["text_intro_seconds"].set("3")
        elif name == "medium":
            vars_["fps"].set("60")
            vars_["codec"].set("hevc_auto" if hw != "none" else "hevc_x265")
            vars_["hevc_quality"].set("high")
            vars_["speed"].set("medium")
            vars_["text_intro_seconds"].set("5")
        elif name == "high":
            vars_["fps"].set("120" if cpu_threads >= 8 else "60")
            vars_["codec"].set("hevc_auto" if hw != "none" else "hevc_x265")
            vars_["hevc_quality"].set("high")
            vars_["speed"].set("medium")
            vars_["text_intro_seconds"].set("5")
        elif name == "extreme":
            vars_["fps"].set("120")
            vars_["codec"].set("ffv1" if cpu_threads >= 12 else ("hevc_auto" if hw != "none" else "hevc_x265"))
            vars_["hevc_quality"].set("high")
            vars_["speed"].set("medium")
            vars_["text_intro_seconds"].set("5")
        update_hw_status()

    def update_scenarios(*_):
        """Refresh the scenario list after the suite changes."""
        suite = vars_["suite"].get()
        opts = scenario_names_for_suite(suite)
        scenario_combo["values"] = opts
        if vars_["scenario"].get() not in opts:
            vars_["scenario"].set(opts[0])
        update_hw_status()

    # ---- Widgets ----
    pad = {"padx": 10, "pady": 5}
    ttk.Label(root, text="Panel Test Orchestrator — Final V4.1", font=("Segoe UI", 15, "bold")).grid(row=0, column=0, columnspan=4, sticky="w", padx=10, pady=10)
    ttk.Label(
        root,
        text=f"Fixed 4K UHD 3840x2160 • CPU threads: {cpu_threads} • HEVC HW: {hw} • zscale: {'yes' if zscale_ok else 'no'} • hevc_metadata HDR opts: {'yes' if hdr_bsf_ok else 'no'}",
        foreground="#555"
    ).grid(row=1, column=0, columnspan=4, sticky="w", padx=10)
    hw_status_var = tk.StringVar(value="")
    ttk.Label(root, textvariable=hw_status_var, foreground="#003366").grid(row=2, column=0, columnspan=4, sticky="w", padx=10)

    ttk.Label(root, text="Suite").grid(row=3, column=0, sticky="w", **pad)
    suite_combo = ttk.Combobox(root, textvariable=vars_["suite"], values=["sdr", "hdr10"], state="readonly", width=16)
    suite_combo.grid(row=3, column=1, sticky="w", **pad)
    suite_combo.bind("<<ComboboxSelected>>", update_scenarios)
    ttk.Label(root, text="Scenario").grid(row=3, column=2, sticky="w", **pad)
    scenario_combo = ttk.Combobox(root, textvariable=vars_["scenario"], values=scenario_names_for_suite("sdr"), state="readonly", width=42)
    scenario_combo.grid(row=3, column=3, sticky="w", **pad)

    ttk.Checkbutton(root, text="Render ALL scenarios in suite", variable=vars_["all_scenarios"]).grid(row=4, column=0, columnspan=2, sticky="w", **pad)

    ttk.Label(root, text="Export mode").grid(row=5, column=0, sticky="w", **pad)
    segment_combo = ttk.Combobox(root, textvariable=vars_["segment"], values=["intro", "body", "outro", "all", "separate"], state="readonly", width=16)
    segment_combo.grid(row=5, column=1, sticky="w", **pad)
    ttk.Label(root, text="Frame rate").grid(row=5, column=2, sticky="w", **pad)
    fps_combo = ttk.Combobox(root, textvariable=vars_["fps"], values=["60", "120"], state="readonly", width=10)
    fps_combo.grid(row=5, column=3, sticky="w", **pad)

    ttk.Label(root, text="Body seconds").grid(row=6, column=0, sticky="w", **pad)
    seconds_entry = ttk.Entry(root, textvariable=vars_["seconds"], width=12)
    seconds_entry.grid(row=6, column=1, sticky="w", **pad)
    ttk.Label(root, text="Text intro seconds").grid(row=6, column=2, sticky="w", **pad)
    text_intro_entry = ttk.Entry(root, textvariable=vars_["text_intro_seconds"], width=12)
    text_intro_entry.grid(row=6, column=3, sticky="w", **pad)

    ttk.Label(root, text="Codec").grid(row=7, column=0, sticky="w", **pad)
    codec_combo = ttk.Combobox(root, textvariable=vars_["codec"], values=["ffv1", "prores4444", "hevc_x265", "hevc_auto", "hevc_hw"], state="readonly", width=16)
    codec_combo.grid(row=7, column=1, sticky="w", **pad)
    codec_combo.bind("<<ComboboxSelected>>", update_hw_status)
    ttk.Label(root, text="HEVC quality").grid(row=7, column=2, sticky="w", **pad)
    hevcq_combo = ttk.Combobox(root, textvariable=vars_["hevc_quality"], values=["high", "medium"], state="readonly", width=10)
    hevcq_combo.grid(row=7, column=3, sticky="w", **pad)

    ttk.Label(root, text="Pattern speed").grid(row=8, column=0, sticky="w", **pad)
    speed_combo = ttk.Combobox(root, textvariable=vars_["speed"], values=["slow", "medium", "fast"], state="readonly", width=12)
    speed_combo.grid(row=8, column=1, sticky="w", **pad)
    ttk.Label(root, text="Timer color").grid(row=8, column=2, sticky="w", **pad)
    timer_combo = ttk.Combobox(root, textvariable=vars_["timer_color"], values=["white", "red"], state="readonly", width=10)
    timer_combo.grid(row=8, column=3, sticky="w", **pad)

    ttk.Label(root, text="Color range").grid(row=9, column=0, sticky="w", **pad)
    colorrange_combo = ttk.Combobox(root, textvariable=vars_["color_range"], values=["tv", "pc"], state="readonly", width=10)
    colorrange_combo.grid(row=9, column=1, sticky="w", **pad)

    no_timer_chk = ttk.Checkbutton(root, text="Disable timer", variable=vars_["no_timer"])
    no_timer_chk.grid(row=10, column=0, sticky="w", **pad)
    no_crt_chk = ttk.Checkbutton(root, text="Disable CRT intro/outro", variable=vars_["no_crt"])
    no_crt_chk.grid(row=10, column=1, sticky="w", **pad)
    no_text_chk = ttk.Checkbutton(root, text="Disable text intro panel", variable=vars_["no_text_intro"])
    no_text_chk.grid(row=10, column=2, sticky="w", **pad)

    ttk.Label(root, text="Output folder").grid(row=11, column=0, sticky="w", **pad)
    out_entry = ttk.Entry(root, textvariable=vars_["out"], width=68)
    out_entry.grid(row=11, column=1, columnspan=2, sticky="w", **pad)

    def browse_out():
        d = filedialog.askdirectory(initialdir=vars_["out"].get() or str(Path.cwd()))
        if d:
            vars_["out"].set(d)

    browse_btn = ttk.Button(root, text="Browse...", command=browse_out)
    browse_btn.grid(row=11, column=3, sticky="w", **pad)

    ttk.Label(root, text="Base filename (optional)").grid(row=12, column=0, sticky="w", **pad)
    name_entry = ttk.Entry(root, textvariable=vars_["name"], width=32)
    name_entry.grid(row=12, column=1, sticky="w", **pad)
    ttk.Label(root, text="Performance profile").grid(row=12, column=2, sticky="w", **pad)
    perf_combo = ttk.Combobox(root, textvariable=vars_["perf_profile"], values=["low", "medium", "high", "extreme"], state="readonly", width=12)
    perf_combo.grid(row=12, column=3, sticky="w", **pad)

    prof_row = ttk.Frame(root)
    prof_row.grid(row=13, column=0, columnspan=4, sticky="w", padx=10, pady=4)
    apply_profile_btn = ttk.Button(prof_row, text="Apply Profile", command=lambda: apply_profile(vars_["perf_profile"].get()))
    apply_profile_btn.grid(row=0, column=0, padx=4)
    auto_profile_btn = ttk.Button(prof_row, text="Auto Recommend", command=lambda: (vars_["perf_profile"].set(suggest_profile_name()), apply_profile(vars_["perf_profile"].get())))
    auto_profile_btn.grid(row=0, column=1, padx=4)

    pf = ttk.LabelFrame(root, text="Presets")
    pf.grid(row=14, column=0, columnspan=4, sticky="ew", padx=10, pady=8)

    def preset_master():
        vars_["suite"].set("sdr"); update_scenarios()
        vars_["codec"].set("ffv1"); update_hw_status()
        vars_["fps"].set("120")
        vars_["segment"].set("all")
        vars_["seconds"].set("30")
        vars_["all_scenarios"].set(True)

    def preset_sdr_hevc():
        vars_["suite"].set("sdr"); update_scenarios()
        vars_["codec"].set("hevc_auto"); update_hw_status()
        vars_["fps"].set("60")
        vars_["segment"].set("all")
        vars_["seconds"].set("30")
        vars_["all_scenarios"].set(True)

    def preset_hdr_hevc():
        vars_["suite"].set("hdr10"); update_scenarios()
        vars_["codec"].set("hevc_auto"); update_hw_status()
        vars_["fps"].set("60")
        vars_["segment"].set("all")
        vars_["seconds"].set("30")
        vars_["all_scenarios"].set(True)

    ttk.Button(pf, text="Master (SDR FFV1 120fps)", command=preset_master).grid(row=0, column=0, padx=8, pady=8)
    ttk.Button(pf, text="SDR HEVC Delivery (Auto HW)", command=preset_sdr_hevc).grid(row=0, column=1, padx=8, pady=8)
    ttk.Button(pf, text="HDR10 HEVC Delivery (Auto HW)", command=preset_hdr_hevc).grid(row=0, column=2, padx=8, pady=8)

    ttk.Label(root, text="Progress").grid(row=15, column=0, sticky="w", padx=10)
    prog = ttk.Progressbar(root, orient="horizontal", mode="determinate", length=800)
    prog.grid(row=15, column=1, columnspan=3, sticky="w", padx=10)
    status = tk.StringVar(value="Ready")
    ttk.Label(root, textvariable=status).grid(row=16, column=0, columnspan=4, sticky="w", padx=10)

    log = tk.Text(root, height=13, width=132)
    log.grid(row=17, column=0, columnspan=4, padx=10, pady=8)
    log.insert("end",
               "Notes:\n"
               "- GUI rendering runs in a background thread, so the window stays responsive.\n"
               "- Abort stops the current FFmpeg process and cancels the batch.\n"
               "- HEVC status line above predicts whether HW or CPU fallback will be used.\n")
    log.config(state="disabled")

    def append_log(msg: str):
        # The log widget is kept read-only; unlock it only while inserting.
        log.config(state="normal")
        log.insert("end", msg + "\n")
        log.see("end")
        log.config(state="disabled")

    interactive_widgets = [
        suite_combo, scenario_combo, segment_combo, fps_combo, seconds_entry, text_intro_entry,
        codec_combo, hevcq_combo, speed_combo, timer_combo, colorrange_combo, out_entry, browse_btn,
        name_entry, perf_combo, apply_profile_btn, auto_profile_btn
    ]

    def set_controls_enabled(enabled: bool):
        """Enable or disable the inputs; Render and Abort are toggled oppositely."""
        for w in interactive_widgets:
            try:
                if enabled:
                    w.state(["!disabled"])
                else:
                    w.state(["disabled"])
            except Exception:
                try:
                    w.configure(state=("normal" if enabled else "disabled"))
                except Exception:
                    pass
        for chk in (no_timer_chk, no_crt_chk, no_text_chk):
            try:
                chk.configure(state=("normal" if enabled else "disabled"))
            except Exception:
                pass
        try:
            if enabled:
                render_btn.state(["!disabled"])
                abort_btn.state(["disabled"])
            else:
                render_btn.state(["disabled"])
                abort_btn.state(["!disabled"])
        except Exception:
            pass

    def ui_progress_cb(done, total, label):
        # Runs on the worker thread: only enqueue, never touch widgets.
        work_q.put(("progress", done, total, label))

    def collect_cfg():
        """Snapshot the form into a RenderConfig. Call on the Tk thread only."""
        return RenderConfig(
            fps=int(vars_["fps"].get()),
            body_seconds=float(vars_["seconds"].get()),
            text_intro_seconds=float(vars_["text_intro_seconds"].get()),
            suite=vars_["suite"].get(),
            scenario=vars_["scenario"].get(),
            all_scenarios=vars_["all_scenarios"].get(),
            segment=vars_["segment"].get(),
            out_dir=vars_["out"].get(),
            basename=vars_["name"].get().strip(),
            codec=vars_["codec"].get(),
            hevc_quality=vars_["hevc_quality"].get(),
            speed=vars_["speed"].get(),
            timer_color=vars_["timer_color"].get(),
            no_timer=vars_["no_timer"].get(),
            no_crt=vars_["no_crt"].get(),
            no_text_intro=vars_["no_text_intro"].get(),
            color_range=vars_["color_range"].get(),
        )

    def worker_run(cfg):
        """Background-thread body: render with the cfg validated by run_render()."""
        global CANCEL_REQUESTED
        try:
            work_q.put(("log", f"Start render: suite={cfg.suite}, codec={cfg.codec}, fps={cfg.fps}"))
            t0 = time.time()
            if cfg.all_scenarios:
                outs = render_all_scenarios(cfg, ui_progress_cb)
            else:
                outs = render_requested(cfg, ui_progress_cb)
            dt = time.time() - t0
            work_q.put(("done", outs, dt))
        except KeyboardInterrupt:
            work_q.put(("aborted",))
        except Exception as e:
            # errno 22 / 32: write to a pipe that FFmpeg has already closed.
            if CANCEL_REQUESTED or getattr(e, "errno", None) in (22, 32):
                work_q.put(("aborted",))
            else:
                work_q.put(("error", str(e)))
        finally:
            CANCEL_REQUESTED = False

    def run_render():
        global CANCEL_REQUESTED
        if ui_state["running"]:
            return
        try:
            cfg = collect_cfg()
            validate_cfg(cfg)
        except Exception as e:
            messagebox.showerror("Error", str(e))
            return
        CANCEL_REQUESTED = False
        ui_state["running"] = True
        prog["value"] = 0
        status.set("Starting...")
        append_log(predicted_hw_status())
        set_controls_enabled(False)
        # Hand the snapshot to the worker: Tk variables must not be read
        # from a thread other than the one running the main loop.
        worker["thread"] = threading.Thread(target=worker_run, args=(cfg,), daemon=True)
        worker["thread"].start()

    def abort_render():
        global CANCEL_REQUESTED, CURRENT_FFMPEG_PROC
        if not ui_state["running"]:
            return
        CANCEL_REQUESTED = True
        status.set("Abort requested...")
        append_log("Abort requested by user.")
        try:
            if CURRENT_FFMPEG_PROC is not None and CURRENT_FFMPEG_PROC.poll() is None:
                CURRENT_FFMPEG_PROC.terminate()
        except Exception:
            pass

    def pump_queue():
        """Drain worker messages on the Tk thread, then re-arm the 100 ms timer."""
        try:
            while True:
                msg = work_q.get_nowait()
                kind = msg[0]
                if kind == "progress":
                    _, done, total, label = msg
                    if isinstance(label, str) and label.startswith("scenario:"):
                        status.set(label.replace("scenario:", "Scenario "))
                        append_log(status.get())
                    elif label == "aborted":
                        # The ("aborted",) message that follows writes the log line.
                        status.set("Aborted")
                    elif isinstance(label, str) and label.startswith("done:"):
                        status.set(f"Done in {label.split(':',1)[1]}")
                        # "maximum" is the frame count, so fill to it rather than to 100.
                        prog["value"] = prog["maximum"]
                    elif total > 0:
                        prog["maximum"] = total
                        prog["value"] = done
                        status.set(f"Rendering {label}: {done}/{total} frames")
                elif kind == "log":
                    append_log(msg[1])
                elif kind == "done":
                    _, outs, dt = msg
                    ui_state["running"] = False
                    set_controls_enabled(True)
                    append_log(f"Finished in {dt:.1f}s")
                    for o in outs:
                        append_log(f" {o}")
                    status.set("Ready")
                    messagebox.showinfo("Done", "Rendered:\n" + "\n".join(outs))
                elif kind == "aborted":
                    ui_state["running"] = False
                    set_controls_enabled(True)
                    status.set("Aborted")
                    append_log("Render aborted.")
                elif kind == "error":
                    ui_state["running"] = False
                    set_controls_enabled(True)
                    status.set("Error")
                    append_log(f"ERROR: {msg[1]}")
                    messagebox.showerror("Error", msg[1])
        except queue.Empty:
            pass
        root.after(100, pump_queue)

    btn_frame = ttk.Frame(root)
    btn_frame.grid(row=18, column=0, columnspan=4, sticky="e", padx=10, pady=8)
    render_btn = ttk.Button(btn_frame, text="Render", command=run_render)
    render_btn.grid(row=0, column=0, padx=6)
    abort_btn = ttk.Button(btn_frame, text="Abort", command=abort_render)
    abort_btn.grid(row=0, column=1, padx=6)
    abort_btn.state(["disabled"])

    # Initial state: recommended profile, scenario list, enabled controls.
    vars_["perf_profile"].set(suggest_profile_name())
    update_scenarios()
    apply_profile(vars_["perf_profile"].get())
    set_controls_enabled(True)
    pump_queue()
    root.mainloop()
def main():
    """Entry point: open the GUI when run without arguments, else render from the CLI.

    In CLI mode the detected capabilities are printed, the requested
    scenario(s) are rendered with periodic progress lines, and the output
    paths are listed at the end.
    """
    args = parse_args()
    no_arguments = len(sys.argv) == 1
    if no_arguments and not args.cli:
        launch_gui()
        return

    ensure_ffmpeg()
    chosen_scenario = args.scenario or scenario_names_for_suite(args.suite)[0]
    cfg = RenderConfig(
        fps=args.fps,
        body_seconds=args.seconds,
        text_intro_seconds=args.text_intro_seconds,
        suite=args.suite,
        scenario=chosen_scenario,
        all_scenarios=args.all_scenarios,
        segment=args.segment,
        out_dir=args.out,
        basename=args.name,
        codec=args.codec,
        hevc_quality=args.hevc_quality,
        speed=args.speed,
        timer_color=args.timer_color,
        no_timer=args.no_timer,
        no_crt=args.no_crt,
        no_text_intro=args.no_text_intro,
        color_range=args.color_range,
    )
    validate_cfg(cfg)

    print("Detected HEVC HW encoder:", detect_hevc_hw_encoder() or "none")
    print("zscale filter:", "yes" if has_filter("zscale") else "no")
    hdr_opts = has_bsf("hevc_metadata") and bsf_supports_hdr10_metadata()
    print("hevc_metadata HDR opts:", "yes" if hdr_opts else "no")

    def report(done, total, label):
        # Scenario and completion markers arrive as specially prefixed labels.
        text = label if isinstance(label, str) else ""
        if text.startswith("scenario:"):
            print(label.replace("scenario:", ""))
        elif text.startswith("done:"):
            print(" done in", label.split(":", 1)[1])
        elif total > 0 and done % max(1, total // 20) == 0:
            # Roughly every 5% of the frames.
            print(f" {label}: {done}/{total}")

    if cfg.all_scenarios:
        rendered = render_all_scenarios(cfg, report)
    else:
        rendered = render_requested(cfg, report)

    print("\nRendered files:")
    for path in rendered:
        print(" -", path)
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment