Last active
February 24, 2026 17:08
-
-
Save simkin/0bd09071df08738b0755cce34e6799b8 to your computer and use it in GitHub Desktop.
Generates test videos covering DSE, vertical banding, black/red smearing, ghosting/overshoot, local dimming behavior, etc.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Panel Test Orchestrator — Final V4.1 | |
| Goal: DSE, vertical banding, black/red smearing, ghosting/overshoot, local dimming behavior, etc. | |
| Key properties | |
| - 4K UHD fixed: 3840x2160 | |
| - 60 or 120 fps | |
| - 30s default (configurable) | |
| - 5s readable intro panel (English) + optional timer | |
| - SDR suite (Rec.709) and HDR10 suite (BT.2020 + PQ) with paired scenarios | |
| - Correct RGB->YUV conversion matrix/transfer/primaries + explicit range mapping | |
| - Correct PQ (SMPTE ST 2084) math for HDR code values | |
| - Auto HEVC HW encoder detect (AMF/NVENC/QSV/VAAPI) | |
| - HDR10 metadata injected for HW encoders via hevc_metadata bitstream filter if available | |
| - Falls back to x265 when HDR metadata injection is unavailable | |
| - RGB16 pipeline + vectorized starfield/red smear patterns | |
| """ | |
| import argparse | |
| import math | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import time | |
| import textwrap | |
| import threading | |
| import queue | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import List, Optional, Dict | |
| import numpy as np | |
| import cv2 | |
| # ========================== | |
| # Fixed output geometry | |
| # ========================== | |
| W, H = 3840, 2160 | |
| MAX16 = 65535 | |
| INTRO_SECONDS = 0.5 | |
| OUTRO_SECONDS = 0.5 | |
| DEFAULT_BODY_SECONDS = 30.0 | |
| DEFAULT_TEXT_INTRO_SECONDS = 5.0 | |
| CURRENT_FFMPEG_PROC = None | |
| CANCEL_REQUESTED = False | |
| # ========================== | |
| # Color / transfer helpers | |
| # ========================== | |
| # SMPTE ST 2084 (PQ) OETF constants | |
| _PQ_M1 = 2610.0 / 16384.0 | |
| _PQ_M2 = 2523.0 / 32.0 | |
| _PQ_C1 = 3424.0 / 4096.0 | |
| _PQ_C2 = 2413.0 / 128.0 | |
| _PQ_C3 = 2392.0 / 128.0 | |
| _PQ_LMAX = 10000.0 # PQ absolute max in nits | |
| def pq_oetf_nits_to_norm(nits: np.ndarray) -> np.ndarray: | |
| """Convert absolute luminance (nits) to PQ signal (0..1), SMPTE ST 2084.""" | |
| L = np.clip(nits, 0.0, _PQ_LMAX) / _PQ_LMAX | |
| Lm1 = np.power(L, _PQ_M1) | |
| num = _PQ_C1 + _PQ_C2 * Lm1 | |
| den = 1.0 + _PQ_C3 * Lm1 | |
| E = np.power(num / den, _PQ_M2) | |
| return np.clip(E, 0.0, 1.0) | |
| def pq_oetf_nits_to_u16(nits: np.ndarray) -> np.ndarray: | |
| """Return uint16 code values 0..65535 representing PQ-coded values.""" | |
| E = pq_oetf_nits_to_norm(nits) | |
| return np.round(E * MAX16).astype(np.uint16) | |
| def sdr_code_to_u16(x: np.ndarray) -> np.ndarray: | |
| """ | |
| SDR test patterns are authored as code values (display-referred), not scene-linear. | |
| x in 0..1 maps directly to 16-bit code values. | |
| """ | |
| return np.round(np.clip(x, 0.0, 1.0) * MAX16).astype(np.uint16) | |
| # ========================== | |
| # Scenario definitions (paired) | |
| # ========================== | |
| SDR_SCENARIOS: Dict[str, Dict[str, str]] = { | |
| "uniformity_sweep": { | |
| "title": "Gray Uniformity Sweep (SDR)", | |
| "goal": "Reveal dirty screen effect and vertical banding", | |
| "watch": "Look for patches/streaks staying fixed while content drifts." | |
| }, | |
| "green_uniformity_pan": { | |
| "title": "Green Uniformity Pan (SDR)", | |
| "goal": "Reveal DSE on green-ish content", | |
| "watch": "Look for dirty screen patterns, tint shifts, non-uniform panning." | |
| }, | |
| "vertical_bars": { | |
| "title": "Vertical Bars Scroll (SDR)", | |
| "goal": "Reveal vertical banding and edge uniformity", | |
| "watch": "Look for banding, uneven brightness, unstable edges during scroll." | |
| }, | |
| "near_black_steps": { | |
| "title": "Near-Black Steps (SDR)", | |
| "goal": "Check near-black detail and black crush", | |
| "watch": "Look for missing steps, crushed shadows, flicker/instability." | |
| }, | |
| "near_black_ramp_move": { | |
| "title": "Near-Black Moving Ramp (SDR)", | |
| "goal": "Stress near-black uniformity under motion", | |
| "watch": "Look for flashing, contouring, and fixed-pattern noise in motion." | |
| }, | |
| "red_gradient": { | |
| "title": "Red Gradient Banding (SDR)", | |
| "goal": "Reveal red banding and posterization", | |
| "watch": "Look for steps in gradient and unstable red transitions." | |
| }, | |
| "starfield": { | |
| "title": "Starfield Smear Stress (SDR)", | |
| "goal": "Stress black-to-color transitions for smearing", | |
| "watch": "Look for black smearing, trails, slow pixel response." | |
| }, | |
| "red_smear": { | |
| "title": "Red-on-Black Smearing (SDR)", | |
| "goal": "Stress VA red trailing", | |
| "watch": "Look for long red trails and distorted moving shapes." | |
| }, | |
| "overshoot_blocks": { | |
| "title": "Overshoot / Inverse Ghosting (SDR)", | |
| "goal": "Reveal overdrive artifacts on gray transitions", | |
| "watch": "Look for bright halos, dark trails, inverse ghosting." | |
| }, | |
| "checker_rows": { | |
| "title": "Checkerboard Alternating Rows (SDR)", | |
| "goal": "High-contrast row motion stress", | |
| "watch": "Look for blur, ghosting, unstable row transitions." | |
| }, | |
| "radial_spokes": { | |
| "title": "Radial Spokes Rotation (SDR)", | |
| "goal": "Stress high-contrast rotating edges", | |
| "watch": "Look for edge breakup, flicker, interpolation artifacts." | |
| }, | |
| "local_dimming_dot": { | |
| "title": "Local Dimming Dot Sweep (SDR)", | |
| "goal": "Reveal blooming / dimming lag (OLED, Mini-LED, FALD)", | |
| "watch": "Look for halos, zone pumping, lag behind moving dot." | |
| }, | |
| "subtitle_bar": { | |
| "title": "Subtitle Bar Stress (SDR)", | |
| "goal": "Stress dimming with bright bar near bottom", | |
| "watch": "Look for pumping, raised blacks, bloom around the bar." | |
| }, | |
| } | |
| HDR_SCENARIOS: Dict[str, Dict[str, str]] = { | |
| "uniformity_sweep_hdr": { | |
| "title": "Gray Uniformity Sweep (HDR10)", | |
| "goal": "Reveal DSE/banding under HDR processing", | |
| "watch": "Look for DSE/banding differences vs SDR (processing/local dimming)." | |
| }, | |
| "green_uniformity_pan_hdr": { | |
| "title": "Green Uniformity Pan (HDR10)", | |
| "goal": "Reveal DSE in HDR pipeline", | |
| "watch": "Look for tint shifts, local dimming artifacts, non-uniform panning." | |
| }, | |
| "vertical_bars_hdr": { | |
| "title": "Vertical Bars Scroll (HDR10)", | |
| "goal": "Reveal banding & edge uniformity in HDR pipeline", | |
| "watch": "Look for banding changes vs SDR and any processing artifacts." | |
| }, | |
| "near_black_steps_hdr": { | |
| "title": "Near-Black Steps (HDR10)", | |
| "goal": "Check PQ near-black stability (artifact focus)", | |
| "watch": "Look for flashing, raised blacks, crushed near-black detail." | |
| }, | |
| "near_black_ramp_move_hdr": { | |
| "title": "Near-Black Moving Ramp (HDR10)", | |
| "goal": "Stress PQ near-black uniformity under motion", | |
| "watch": "Look for contouring, flashing, fixed patterns during motion." | |
| }, | |
| "red_gradient_hdr": { | |
| "title": "Red Gradient Banding (HDR10)", | |
| "goal": "Reveal banding in HDR color processing", | |
| "watch": "Look for steps/hue shifts/banding different from SDR." | |
| }, | |
| "starfield_hdr": { | |
| "title": "Starfield Smear Stress (HDR10)", | |
| "goal": "Stress black-to-color transitions in HDR mode", | |
| "watch": "Look for smearing/ghosting changes vs SDR." | |
| }, | |
| "red_smear_hdr": { | |
| "title": "Red-on-Black Smearing (HDR10)", | |
| "goal": "Stress red trailing under HDR processing", | |
| "watch": "Look for red smearing and local dimming interaction." | |
| }, | |
| "overshoot_blocks_hdr": { | |
| "title": "Overshoot / Inverse Ghosting (HDR10)", | |
| "goal": "Reveal overdrive artifacts under HDR pipeline", | |
| "watch": "Look for halos/trails/inverse ghosting differences vs SDR." | |
| }, | |
| "checker_rows_hdr": { | |
| "title": "Checkerboard Alternating Rows (HDR10)", | |
| "goal": "High-contrast row motion stress in HDR pipeline", | |
| "watch": "Look for motion artifacts differences vs SDR." | |
| }, | |
| "radial_spokes_hdr": { | |
| "title": "Radial Spokes Rotation (HDR10)", | |
| "goal": "Stress edge motion in HDR pipeline", | |
| "watch": "Look for flicker/interpolation and any processing artifacts." | |
| }, | |
| "local_dimming_dot_hdr": { | |
| "title": "Local Dimming Dot Sweep (HDR10)", | |
| "goal": "Reveal blooming/zone behavior in HDR mode", | |
| "watch": "Look for halos, zone pumping, lag vs SDR." | |
| }, | |
| "subtitle_bar_hdr": { | |
| "title": "Subtitle Bar Stress (HDR10)", | |
| "goal": "Stress bright subtitle bar in HDR mode", | |
| "watch": "Look for pumping, raised blacks, bloom around the bar." | |
| }, | |
| } | |
| def scenario_names_for_suite(suite: str) -> List[str]: | |
| return list(SDR_SCENARIOS.keys()) if suite == "sdr" else list(HDR_SCENARIOS.keys()) | |
| def scenario_info(suite: str, scenario: str) -> Dict[str, str]: | |
| return (SDR_SCENARIOS if suite == "sdr" else HDR_SCENARIOS)[scenario] | |
| # ========================== | |
| # Config | |
| # ========================== | |
| @dataclass | |
| class RenderConfig: | |
| fps: int = 60 | |
| body_seconds: float = DEFAULT_BODY_SECONDS | |
| text_intro_seconds: float = DEFAULT_TEXT_INTRO_SECONDS | |
| suite: str = "sdr" | |
| scenario: str = "uniformity_sweep" | |
| all_scenarios: bool = False | |
| segment: str = "all" # intro | body | outro | all | separate | |
| out_dir: str = "renders_final_v3_1" | |
| basename: str = "" | |
| # ffv1 | prores4444 | hevc_x265 | hevc_auto | hevc_hw | |
| codec: str = "ffv1" | |
| hevc_quality: str = "high" # high | medium | |
| speed: str = "medium" # slow | medium | fast | |
| timer_color: str = "white" # white | red | |
| no_timer: bool = False | |
| no_crt: bool = False | |
| no_text_intro: bool = False | |
| color_range: str = "tv" # tv | pc (limited vs full) | |
| @property | |
| def intro_frames(self) -> int: | |
| return int(round(INTRO_SECONDS * self.fps)) | |
| @property | |
| def outro_frames(self) -> int: | |
| return int(round(OUTRO_SECONDS * self.fps)) | |
| @property | |
| def body_frames(self) -> int: | |
| return int(round(self.body_seconds * self.fps)) | |
| # ========================== | |
| # Validation / utility | |
| # ========================== | |
| def ensure_ffmpeg() -> None: | |
| if shutil.which("ffmpeg") is None: | |
| raise RuntimeError("FFmpeg not found in PATH.") | |
| def validate_cfg(cfg: RenderConfig) -> None: | |
| if cfg.fps not in (60, 120): | |
| raise ValueError("FPS must be 60 or 120.") | |
| if cfg.body_seconds <= 0: | |
| raise ValueError("Body seconds must be > 0.") | |
| if cfg.text_intro_seconds < 0: | |
| raise ValueError("Text intro seconds must be >= 0.") | |
| if cfg.suite not in ("sdr", "hdr10"): | |
| raise ValueError("Suite must be sdr or hdr10.") | |
| if cfg.scenario not in scenario_names_for_suite(cfg.suite): | |
| raise ValueError(f"Scenario '{cfg.scenario}' not in suite '{cfg.suite}'.") | |
| if cfg.segment not in ("intro", "body", "outro", "all", "separate"): | |
| raise ValueError("Segment must be intro/body/outro/all/separate.") | |
| if cfg.codec not in ("ffv1", "prores4444", "hevc_x265", "hevc_auto", "hevc_hw"): | |
| raise ValueError("Invalid codec.") | |
| if cfg.hevc_quality not in ("high", "medium"): | |
| raise ValueError("hevc_quality must be high or medium.") | |
| if cfg.speed not in ("slow", "medium", "fast"): | |
| raise ValueError("speed must be slow/medium/fast.") | |
| if cfg.timer_color not in ("white", "red"): | |
| raise ValueError("timer_color must be white or red.") | |
| if cfg.color_range not in ("tv", "pc"): | |
| raise ValueError("color_range must be tv or pc.") | |
| def speed_px_per_sec(speed: str, base: float) -> float: | |
| return {"slow": base * 0.55, "medium": base, "fast": base * 1.8}[speed] | |
| def ffmpeg_capture(args: List[str]) -> str: | |
| p = subprocess.run(args, capture_output=True, text=True) | |
| return (p.stdout or "") + "\n" + (p.stderr or "") | |
| def detect_hevc_hw_encoder() -> Optional[str]: | |
| # Prefer AMD first (for your RX 6900 XT) | |
| txt = ffmpeg_capture(["ffmpeg", "-hide_banner", "-encoders"]).lower() | |
| for enc in ("hevc_amf", "hevc_vaapi", "hevc_nvenc", "hevc_qsv"): | |
| if enc in txt: | |
| return enc | |
| return None | |
| def has_filter(name: str) -> bool: | |
| return name.lower() in ffmpeg_capture(["ffmpeg", "-hide_banner", "-filters"]).lower() | |
| def has_bsf(name: str) -> bool: | |
| return name.lower() in ffmpeg_capture(["ffmpeg", "-hide_banner", "-bsfs"]).lower() | |
| def bsf_supports_hdr10_metadata() -> bool: | |
| """Check whether hevc_metadata BSF supports master_display + max_cll options.""" | |
| txt = ffmpeg_capture(["ffmpeg", "-hide_banner", "-h", "bsf=hevc_metadata"]).lower() | |
| return ("master_display" in txt) and ("max_cll" in txt) | |
| # ========================== | |
| # Overlays / CRT | |
| # ========================== | |
| def frame_black() -> np.ndarray: | |
| return np.zeros((H, W, 3), dtype=np.uint16) | |
| def add_timer(frame_rgb16: np.ndarray, elapsed: float, total: float, color_name: str) -> None: | |
| s = int(min(max(elapsed, 0), total)) | |
| mm, ss = divmod(s, 60) | |
| text = f"{mm:02d}:{ss:02d}" | |
| font = cv2.FONT_HERSHEY_SIMPLEX | |
| scale = 2.0 | |
| thick = 4 | |
| # Draw directly on RGB array (no BGR view with negative strides) | |
| target = frame_rgb16 | |
| outline = (0, 0, 0) | |
| # RGB tuples now (R,G,B) | |
| fg8 = (255, 255, 255) if color_name == "white" else (255, 64, 64) | |
| outline16 = tuple(int(c * 257) for c in outline) | |
| fg16 = tuple(int(c * 257) for c in fg8) | |
| (tw, th), _ = cv2.getTextSize(text, font, scale, thick) | |
| x = W - tw - 50 | |
| y = 50 + th | |
| cv2.putText(target, text, (x, y), font, scale, outline16, thick + 6, cv2.LINE_AA) | |
| cv2.putText(target, text, (x, y), font, scale, fg16, thick, cv2.LINE_AA) | |
| def draw_text_panel(frame_rgb16: np.ndarray, title: str, goal: str, watch: str, opacity: float = 0.88) -> None: | |
| """ | |
| Full-page slide-style intro (more PowerPoint-like) for readability on TVs. | |
| Draws directly on the RGB16 frame. | |
| """ | |
| # Dim whatever is behind this, then draw a full-page card | |
| frame_rgb16[:] = np.clip(frame_rgb16.astype(np.float32) * (1.0 - opacity), 0, MAX16).astype(np.uint16) | |
| target = frame_rgb16 | |
| # RGB colors | |
| white = (MAX16, MAX16, MAX16) | |
| black = (0, 0, 0) | |
| soft_red = (MAX16, 21000, 21000) | |
| soft_blue = (18000, 30000, MAX16) | |
| card_fill = (4200, 4200, 6500) | |
| card_edge = (13500, 13500, 21000) | |
| card_fill_2 = (6500, 6500, 9000) | |
| # Main card and header | |
| x0, y0 = int(W * 0.05), int(H * 0.07) | |
| x1, y1 = int(W * 0.95), int(H * 0.91) | |
| header_h = int((y1 - y0) * 0.18) | |
| hy1 = y0 + header_h | |
| cv2.rectangle(target, (x0, y0), (x1, y1), card_fill, -1) | |
| cv2.rectangle(target, (x0, y0), (x1, y1), card_edge, 6) | |
| cv2.rectangle(target, (x0, y0), (x1, hy1), soft_blue, -1) | |
| cv2.rectangle(target, (x0 + 24, hy1 + 24), (x1 - 24, hy1 + 34), soft_red, -1) | |
| # Title (wrapped, max 2 lines) | |
| tx = x0 + 52 | |
| ty = y0 + 78 | |
| for n, line in enumerate(textwrap.wrap(title, width=34)[:2]): | |
| yy = ty + n * 72 | |
| cv2.putText(target, line, (tx, yy), cv2.FONT_HERSHEY_DUPLEX, 1.50, black, 8, cv2.LINE_AA) | |
| cv2.putText(target, line, (tx, yy), cv2.FONT_HERSHEY_DUPLEX, 1.50, white, 4, cv2.LINE_AA) | |
| # Two-column content area | |
| content_top = hy1 + 64 | |
| margin = 52 | |
| gutter = 40 | |
| col_w = (x1 - x0 - margin * 2 - gutter) | |
| col_w //= 2 | |
| left_x = x0 + margin | |
| right_x = left_x + col_w + gutter | |
| col_y0 = content_top | |
| col_y1 = y1 - 58 | |
| # Boxes | |
| cv2.rectangle(target, (left_x, col_y0), (left_x + col_w, col_y1), card_fill_2, -1) | |
| cv2.rectangle(target, (left_x, col_y0), (left_x + col_w, col_y1), card_edge, 3) | |
| cv2.rectangle(target, (right_x, col_y0), (right_x + col_w, col_y1), card_fill_2, -1) | |
| cv2.rectangle(target, (right_x, col_y0), (right_x + col_w, col_y1), card_edge, 3) | |
| # Column headers | |
| hdr_h = 56 | |
| cv2.rectangle(target, (left_x, col_y0), (left_x + col_w, col_y0 + hdr_h), soft_red, -1) | |
| cv2.rectangle(target, (right_x, col_y0), (right_x + col_w, col_y0 + hdr_h), soft_blue, -1) | |
| cv2.putText(target, "GOAL", (left_x + 18, col_y0 + 40), cv2.FONT_HERSHEY_SIMPLEX, 1.02, black, 6, cv2.LINE_AA) | |
| cv2.putText(target, "GOAL", (left_x + 18, col_y0 + 40), cv2.FONT_HERSHEY_SIMPLEX, 1.02, white, 2, cv2.LINE_AA) | |
| cv2.putText(target, "WATCH FOR", (right_x + 18, col_y0 + 40), cv2.FONT_HERSHEY_SIMPLEX, 1.02, black, 6, cv2.LINE_AA) | |
| cv2.putText(target, "WATCH FOR", (right_x + 18, col_y0 + 40), cv2.FONT_HERSHEY_SIMPLEX, 1.02, white, 2, cv2.LINE_AA) | |
| # Body text | |
| font = cv2.FONT_HERSHEY_SIMPLEX | |
| scale = 0.90 | |
| thick = 2 | |
| step = 46 | |
| max_lines = max(1, (col_y1 - (col_y0 + hdr_h + 28)) // step) | |
| y = col_y0 + hdr_h + 42 | |
| for line in textwrap.wrap(goal, width=42)[:max_lines]: | |
| cv2.putText(target, line, (left_x + 18, y), font, scale, black, 5, cv2.LINE_AA) | |
| cv2.putText(target, line, (left_x + 18, y), font, scale, white, thick, cv2.LINE_AA) | |
| y += step | |
| y = col_y0 + hdr_h + 42 | |
| for line in textwrap.wrap(watch, width=42)[:max_lines]: | |
| cv2.putText(target, line, (right_x + 18, y), font, scale, black, 5, cv2.LINE_AA) | |
| cv2.putText(target, line, (right_x + 18, y), font, scale, white, thick, cv2.LINE_AA) | |
| y += step | |
| # Footer | |
| footer = "Body starts after intro. Timer appears top-right during the test." | |
| fy = y1 - 18 | |
| cv2.putText(target, footer, (x0 + 34, fy), cv2.FONT_HERSHEY_SIMPLEX, 0.72, black, 4, cv2.LINE_AA) | |
| cv2.putText(target, footer, (x0 + 34, fy), cv2.FONT_HERSHEY_SIMPLEX, 0.72, (52000, 52000, 52000), 2, cv2.LINE_AA) | |
| def crt_vertical_collapse(frame_rgb16: np.ndarray, scale_y: float, flash_gain: float) -> np.ndarray: | |
| nh = max(1, int(H * scale_y)) | |
| resized = cv2.resize(frame_rgb16, (W, nh), interpolation=cv2.INTER_LINEAR) | |
| canvas = np.zeros_like(frame_rgb16) | |
| y0 = (H - nh) // 2 | |
| canvas[y0:y0+nh, :, :] = resized | |
| if flash_gain != 1.0: | |
| canvas = np.clip(canvas.astype(np.float32) * flash_gain, 0, MAX16).astype(np.uint16) | |
| return canvas | |
| def apply_crt(cfg: RenderConfig, idx: int, total: int, which: str) -> np.ndarray: | |
| if cfg.no_crt: | |
| return frame_black() | |
| # Generate a white frame so flash_gain/collapse visibly produces light. | |
| flash_frame = np.full((H, W, 3), MAX16, dtype=np.uint16) | |
| if which == "intro": | |
| p = idx / max(1, total - 1) | |
| return crt_vertical_collapse(flash_frame, 0.02 + 0.98 * p, 1.85 - 0.85 * p) | |
| if which == "outro": | |
| p = idx / max(1, total - 1) | |
| return crt_vertical_collapse(flash_frame, max(0.008, 1.0 - 0.992 * p), 1.0 + 0.35 * (1.0 - p)) | |
| return frame_black() | |
| # ========================== | |
| # Pattern generators | |
| # ========================== | |
| def gen_uniformity_sweep_sdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| frac = t / max(cfg.body_seconds, 1e-6) | |
| pct = 0.05 if frac < 1/3 else (0.10 if frac < 2/3 else 0.50) | |
| base = int(pct * MAX16) | |
| frame = np.full((H, W, 3), base, dtype=np.uint16) | |
| sp = speed_px_per_sec(cfg.speed, 22.0) | |
| x = (np.arange(W, dtype=np.float32) + sp * t) / (W / 6.2) | |
| drift = (np.sin(2 * np.pi * x) * 0.0025 + np.sin(2 * np.pi * x * 0.33) * 0.0018) | |
| d = (drift * MAX16).astype(np.int32) | |
| return np.clip(frame.astype(np.int32) + d[None, :, None], 0, MAX16).astype(np.uint16) | |
| def gen_uniformity_sweep_hdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| frac = t / max(cfg.body_seconds, 1e-6) | |
| nits = 5.0 if frac < 1/3 else (10.0 if frac < 2/3 else 50.0) | |
| base = int(pq_oetf_nits_to_u16(np.array(nits, dtype=np.float32))) | |
| frame = np.full((H, W, 3), base, dtype=np.uint16) | |
| sp = speed_px_per_sec(cfg.speed, 22.0) | |
| x = (np.arange(W, dtype=np.float32) + sp * t) / (W / 6.2) | |
| drift = (np.sin(2 * np.pi * x) * 0.0020 + np.sin(2 * np.pi * x * 0.33) * 0.0014) | |
| d = (drift * 2200).astype(np.int32) | |
| return np.clip(frame.astype(np.int32) + d[None, :, None], 0, MAX16).astype(np.uint16) | |
| def gen_green_uniformity_pan_sdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| frame = np.zeros((H, W, 3), dtype=np.uint16) | |
| frame[:, :, 1] = int(MAX16 * 0.66) # G in RGB | |
| sp = speed_px_per_sec(cfg.speed, 18.0) | |
| x = (np.arange(W, dtype=np.float32) + sp * t) / (W / 5.8) | |
| var = (np.sin(2*np.pi*x) * 0.0035 + np.sin(2*np.pi*0.21*x) * 0.002) | |
| d = (var * MAX16).astype(np.int32) | |
| f = frame.astype(np.int32) | |
| f[:, :, 1] += d[None, :] | |
| f[:, :, 0] += (d[None, :] // 7) | |
| f[:, :, 2] += (d[None, :] // 7) | |
| return np.clip(f, 0, MAX16).astype(np.uint16) | |
| def gen_green_uniformity_pan_hdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| base_nits = 20.0 | |
| base = int(pq_oetf_nits_to_u16(np.array(base_nits, dtype=np.float32))) | |
| frame = np.zeros((H, W, 3), dtype=np.uint16) | |
| frame[:, :, 1] = base | |
| sp = speed_px_per_sec(cfg.speed, 18.0) | |
| x = (np.arange(W, dtype=np.float32) + sp * t) / (W / 5.8) | |
| var = (np.sin(2*np.pi*x) * 0.0030 + np.sin(2*np.pi*0.21*x) * 0.0017) | |
| d = (var * 1800).astype(np.int32) | |
| f = frame.astype(np.int32) | |
| f[:, :, 1] += d[None, :] | |
| f[:, :, 0] += (d[None, :] // 10) | |
| f[:, :, 2] += (d[None, :] // 10) | |
| return np.clip(f, 0, MAX16).astype(np.uint16) | |
| def gen_vertical_bars(cfg: RenderConfig, t: float) -> np.ndarray: | |
| bar_w = max(8, W // 96) | |
| tile_w = bar_w * 256 | |
| xx = (np.arange(tile_w) // bar_w) % 2 | |
| vals = np.where(xx == 0, 0, MAX16).astype(np.uint16) | |
| line = np.stack([vals, vals, vals], axis=1) | |
| tile = np.repeat(line[None, :, :], H, axis=0) | |
| off = int((speed_px_per_sec(cfg.speed, 320.0) * t) % tile_w) | |
| p1 = tile[:, off:off + W] | |
| return np.concatenate([p1, tile[:, :W - p1.shape[1]]], axis=1)[:, :W] if p1.shape[1] < W else p1[:, :W] | |
| def gen_near_black_steps_sdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| levels = np.array([0.00, 0.01, 0.02, 0.03, 0.04, 0.05, 0.07, 0.10], dtype=np.float32) | |
| n = levels.size | |
| bw = W // n | |
| tile_w = bw * n | |
| tile = np.zeros((H, tile_w, 3), dtype=np.uint16) | |
| for i, lv in enumerate(levels): | |
| tile[:, i*bw:(i+1)*bw, :] = int(lv * MAX16) | |
| off = int((speed_px_per_sec(cfg.speed, 55.0) * t) % tile_w) | |
| p1 = tile[:, off:off + W] | |
| return np.concatenate([p1, tile[:, :W - p1.shape[1]]], axis=1)[:, :W] if p1.shape[1] < W else p1[:, :W] | |
| def gen_near_black_steps_hdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| nits = np.array([0.0, 0.05, 0.1, 0.2, 0.4, 0.7, 1.2, 2.0], dtype=np.float32) | |
| codes = pq_oetf_nits_to_u16(nits) | |
| n = codes.size | |
| bw = W // n | |
| tile_w = bw * n | |
| tile = np.zeros((H, tile_w, 3), dtype=np.uint16) | |
| for i, code in enumerate(codes): | |
| tile[:, i*bw:(i+1)*bw, :] = int(code) | |
| off = int((speed_px_per_sec(cfg.speed, 55.0) * t) % tile_w) | |
| p1 = tile[:, off:off + W] | |
| return np.concatenate([p1, tile[:, :W - p1.shape[1]]], axis=1)[:, :W] if p1.shape[1] < W else p1[:, :W] | |
| def gen_near_black_ramp_move_sdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| x = np.linspace(0.0, 1.0, W, dtype=np.float32) | |
| ramp = (x ** 2.2) * 0.10 | |
| drift = 0.004 * np.sin(2*np.pi*(x*6 + t*0.15)) | |
| vals = np.clip(ramp + drift, 0.0, 0.12) | |
| line = sdr_code_to_u16(vals) | |
| frame = np.repeat(line[None, :, None], H, axis=0) | |
| frame = np.repeat(frame, 3, axis=2) | |
| off = int((speed_px_per_sec(cfg.speed, 30.0) * t) % W) | |
| return np.roll(frame, -off, axis=1) | |
| def gen_near_black_ramp_move_hdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| x = np.linspace(0.0, 1.0, W, dtype=np.float32) | |
| nits = (x ** 3.0) * 5.0 | |
| drift = 0.25 * np.sin(2*np.pi*(x*6 + t*0.12)) | |
| nits2 = np.clip(nits + drift, 0.0, 6.0) | |
| line = pq_oetf_nits_to_u16(nits2) | |
| frame = np.repeat(line[None, :, None], H, axis=0) | |
| frame = np.repeat(frame, 3, axis=2) | |
| off = int((speed_px_per_sec(cfg.speed, 30.0) * t) % W) | |
| return np.roll(frame, -off, axis=1) | |
| def gen_red_gradient_sdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| xx = np.linspace(0, 1, W, dtype=np.float32) | |
| yy = np.linspace(0, 1, H, dtype=np.float32) | |
| X, Y = np.meshgrid(xx, yy) | |
| ang = 2*np.pi*(t/max(cfg.body_seconds,1e-6))*0.25 | |
| g = np.clip(0.5 + 0.85*(np.cos(ang)*(X-0.5) + np.sin(ang)*(Y-0.5)), 0, 1) | |
| pulse = 0.94 + 0.06 * math.sin(2*np.pi*t/max(cfg.body_seconds,1e-6)) | |
| red = sdr_code_to_u16((0.05 + 0.95*g) * pulse) | |
| frame = np.zeros((H, W, 3), dtype=np.uint16) | |
| frame[:, :, 0] = red # R | |
| return frame | |
| def gen_red_gradient_hdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| xx = np.linspace(0, 1, W, dtype=np.float32) | |
| yy = np.linspace(0, 1, H, dtype=np.float32) | |
| X, Y = np.meshgrid(xx, yy) | |
| ang = 2*np.pi*(t/max(cfg.body_seconds,1e-6))*0.25 | |
| g = np.clip(0.5 + 0.85*(np.cos(ang)*(X-0.5) + np.sin(ang)*(Y-0.5)), 0, 1) | |
| nits = 5.0 + g * 115.0 | |
| pulse = 0.97 + 0.03 * math.sin(2*np.pi*t/max(cfg.body_seconds,1e-6)) | |
| code = pq_oetf_nits_to_u16(nits * pulse) | |
| frame = np.zeros((H, W, 3), dtype=np.uint16) | |
| frame[:, :, 0] = code # R | |
| return frame | |
| # --- Vectorized starfield --- | |
| _STAR_CACHE = None | |
| def _init_star_cache(): | |
| global _STAR_CACHE | |
| rng = np.random.default_rng(12345) | |
| n = 260 | |
| _STAR_CACHE = { | |
| "x0": rng.uniform(0, W, n).astype(np.float32), | |
| "y0": rng.uniform(0, H, n).astype(np.float32), | |
| "grp": rng.integers(0, 3, n, dtype=np.int32), | |
| "size": rng.choice(np.array([2, 3, 4, 5], dtype=np.int32), n), | |
| "color": rng.integers(0, 3, n, dtype=np.int32), # 0 white, 1 red, 2 blue | |
| "ang": np.deg2rad(25 + (np.arange(n) % 17) * 3).astype(np.float32), | |
| } | |
| def _stamp_star_squares(frame: np.ndarray, xs: np.ndarray, ys: np.ndarray, cs: np.ndarray, s: int, value: int) -> None: | |
| # RGB layout: R=0, G=1, B=2 | |
| for dy in range(s): | |
| yy = ys + dy | |
| for dx in range(s): | |
| xx = xs + dx | |
| mw = (cs == 0) | |
| if np.any(mw): | |
| frame[yy[mw], xx[mw], :] = value | |
| mr = (cs == 1) | |
| if np.any(mr): | |
| frame[yy[mr], xx[mr], 0] = value # red channel | |
| mb = (cs == 2) | |
| if np.any(mb): | |
| frame[yy[mb], xx[mb], 2] = value # blue channel | |
| def gen_starfield_sdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| global _STAR_CACHE | |
| if _STAR_CACHE is None: | |
| _init_star_cache() | |
| S = _STAR_CACHE | |
| frame = np.full((H, W, 3), int(MAX16 * 0.002), dtype=np.uint16) | |
| base = np.array([85.0, 190.0, 380.0], dtype=np.float32) | |
| mult = {"slow": 0.8, "medium": 1.0, "fast": 1.35}[cfg.speed] | |
| sp = base[S["grp"]] * mult | |
| x = (S["x0"] + np.cos(S["ang"]) * sp * t) % (W + 20) - 10 | |
| y = (S["y0"] + np.sin(S["ang"]) * sp * t) % (H + 20) - 10 | |
| xi = x.astype(np.int32) | |
| yi = y.astype(np.int32) | |
| sz = S["size"] | |
| col = S["color"] | |
| vis = (xi >= -6) & (xi < W) & (yi >= -6) & (yi < H) | |
| xi, yi, sz, col = xi[vis], yi[vis], sz[vis], col[vis] | |
| for s in (2, 3, 4, 5): | |
| m = (sz == s) | |
| if not np.any(m): | |
| continue | |
| xs = np.clip(xi[m], 0, W - s) | |
| ys = np.clip(yi[m], 0, H - s) | |
| cs = col[m] | |
| _stamp_star_squares(frame, xs, ys, cs, s, MAX16) | |
| return frame | |
| def gen_starfield_hdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| global _STAR_CACHE | |
| if _STAR_CACHE is None: | |
| _init_star_cache() | |
| S = _STAR_CACHE | |
| bg = int(pq_oetf_nits_to_u16(np.array(0.2, dtype=np.float32))) | |
| star_code = int(pq_oetf_nits_to_u16(np.array(200.0, dtype=np.float32))) | |
| frame = np.full((H, W, 3), bg, dtype=np.uint16) | |
| base = np.array([85.0, 190.0, 380.0], dtype=np.float32) | |
| mult = {"slow": 0.8, "medium": 1.0, "fast": 1.35}[cfg.speed] | |
| sp = base[S["grp"]] * mult | |
| x = (S["x0"] + np.cos(S["ang"]) * sp * t) % (W + 20) - 10 | |
| y = (S["y0"] + np.sin(S["ang"]) * sp * t) % (H + 20) - 10 | |
| xi = x.astype(np.int32) | |
| yi = y.astype(np.int32) | |
| sz = S["size"] | |
| col = S["color"] | |
| vis = (xi >= -6) & (xi < W) & (yi >= -6) & (yi < H) | |
| xi, yi, sz, col = xi[vis], yi[vis], sz[vis], col[vis] | |
| for s in (2, 3, 4, 5): | |
| m = (sz == s) | |
| if not np.any(m): | |
| continue | |
| xs = np.clip(xi[m], 0, W - s) | |
| ys = np.clip(yi[m], 0, H - s) | |
| cs = col[m] | |
| _stamp_star_squares(frame, xs, ys, cs, s, star_code) | |
| return frame | |
| def _stamp_rect_red(frame: np.ndarray, xs: np.ndarray, ys: np.ndarray, w: int, h: int, vals: np.ndarray) -> None: | |
| # RGB layout, red channel index 0 | |
| for dy in range(h): | |
| yy = ys + dy | |
| for dx in range(w): | |
| xx = xs + dx | |
| frame[yy, xx, 0] = np.maximum(frame[yy, xx, 0], vals) | |
| def _stamp_hline_red(frame: np.ndarray, xs: np.ndarray, ys: np.ndarray, w: int, vals: np.ndarray, thick: int = 2) -> None: | |
| for dx in range(w): | |
| xx = xs + dx | |
| for dy in range(thick): | |
| yy = np.clip(ys + dy, 0, H - 1) | |
| frame[yy, xx, 0] = np.maximum(frame[yy, xx, 0], vals) | |
| def gen_red_smear_sdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| frame = np.zeros((H, W, 3), dtype=np.uint16) | |
| mult = {"slow": 0.75, "medium": 1.0, "fast": 1.4}[cfg.speed] | |
| n = 100 | |
| i = np.arange(n, dtype=np.int32) | |
| x0 = (i * 181) % W | |
| y0 = (i * 113) % H | |
| sp = (110 + (i % 6) * 90).astype(np.float32) * mult | |
| ang = np.deg2rad((i * 23) % 360).astype(np.float32) | |
| x = ((x0.astype(np.float32) + np.cos(ang) * sp * t) % W).astype(np.int32) | |
| y = ((y0.astype(np.float32) + np.sin(ang) * sp * t) % H).astype(np.int32) | |
| shades = np.array([int(MAX16*0.25), int(MAX16*0.6), MAX16], dtype=np.uint16) | |
| shade = shades[i % 3] | |
| even = (i % 2 == 0) | |
| xs = np.clip(x[even], 0, W - 16) | |
| ys = np.clip(y[even], 0, H - 16) | |
| _stamp_rect_red(frame, xs, ys, 16, 16, shade[even]) | |
| odd = ~even | |
| xl = np.clip(x[odd], 0, W - 40) | |
| yl = np.clip(y[odd], 0, H - 2) | |
| _stamp_hline_red(frame, xl, yl, 40, shade[odd], thick=2) | |
| return frame | |
| def gen_red_smear_hdr(cfg: RenderConfig, t: float) -> np.ndarray: | |
| bg = int(pq_oetf_nits_to_u16(np.array(0.2, dtype=np.float32))) | |
| frame = np.full((H, W, 3), bg, dtype=np.uint16) | |
| mult = {"slow": 0.75, "medium": 1.0, "fast": 1.4}[cfg.speed] | |
| n = 100 | |
| i = np.arange(n, dtype=np.int32) | |
| x0 = (i * 181) % W | |
| y0 = (i * 113) % H | |
| sp = (110 + (i % 6) * 90).astype(np.float32) * mult | |
| ang = np.deg2rad((i * 23) % 360).astype(np.float32) | |
| x = ((x0.astype(np.float32) + np.cos(ang) * sp * t) % W).astype(np.int32) | |
| y = ((y0.astype(np.float32) + np.sin(ang) * sp * t) % H).astype(np.int32) | |
| nits_levels = np.array([25.0, 80.0, 180.0], dtype=np.float32) | |
| shade = pq_oetf_nits_to_u16(nits_levels)[i % 3] | |
| even = (i % 2 == 0) | |
| xs = np.clip(x[even], 0, W - 16) | |
| ys = np.clip(y[even], 0, H - 16) | |
| _stamp_rect_red(frame, xs, ys, 16, 16, shade[even]) | |
| odd = ~even | |
| xl = np.clip(x[odd], 0, W - 40) | |
| yl = np.clip(y[odd], 0, H - 2) | |
| _stamp_hline_red(frame, xl, yl, 40, shade[odd], thick=2) | |
| return frame | |
| def gen_overshoot_blocks(cfg: RenderConfig, t: float, hdr: bool) -> np.ndarray: | |
| if hdr: | |
| bg = int(pq_oetf_nits_to_u16(np.array(50.0, dtype=np.float32))) | |
| frame = np.full((H, W, 3), bg, dtype=np.uint16) | |
| shades = pq_oetf_nits_to_u16(np.array([0.2, 10, 30, 50, 120, 220], dtype=np.float32)) | |
| else: | |
| frame = np.full((H, W, 3), int(MAX16 * 0.5), dtype=np.uint16) | |
| shades = np.array([0, int(MAX16*0.15), int(MAX16*0.35), int(MAX16*0.5), int(MAX16*0.75), MAX16], dtype=np.uint16) | |
| mult = {"slow": 0.75, "medium": 1.0, "fast": 1.35}[cfg.speed] | |
| n = 28 | |
| for i in range(n): | |
| bw = 160 + (i % 5) * 90 | |
| bh = 100 + (i % 4) * 70 | |
| y = 80 + (i * 149) % (H - bh - 100) | |
| sp = (95 + (i % 7) * 45) * mult | |
| x0 = (i * 257) % (W + bw) | |
| x = int((x0 + (sp * t if i % 2 == 0 else -sp * t)) % (W + bw)) - bw | |
| v = int(shades[i % len(shades)]) | |
| x1, x2 = max(0, x), min(W, x + bw) | |
| y1, y2 = max(0, y), min(H, y + bh) | |
| if x1 < x2 and y1 < y2: | |
| frame[y1:y2, x1:x2, :] = v | |
| return frame | |
| def gen_checker_rows(cfg: RenderConfig, t: float, hdr: bool) -> np.ndarray: | |
| cell = max(40, W // 32) | |
| inner = int(cell * 0.45) | |
| rows = H // cell + 2 | |
| cols = W // cell + 6 | |
| tile_h, tile_w = rows * cell, cols * cell | |
| yy = (np.arange(tile_h) // cell)[:, None] | |
| xx = (np.arange(tile_w) // cell)[None, :] | |
| checker = ((xx + yy) % 2).astype(np.uint8) | |
| if hdr: | |
| black = int(pq_oetf_nits_to_u16(np.array(0.2, dtype=np.float32))) | |
| white = int(pq_oetf_nits_to_u16(np.array(200.0, dtype=np.float32))) | |
| img = np.where(checker[..., None] == 0, black, white).astype(np.uint16) | |
| else: | |
| img = np.where(checker[..., None] == 0, 0, MAX16).astype(np.uint16) | |
| img = np.repeat(img, 3, axis=2) | |
| for ry in range(rows): | |
| for cx in range(cols): | |
| y0 = ry * cell + (cell - inner) // 2 | |
| x0 = cx * cell + (cell - inner) // 2 | |
| is_white = checker[ry * cell, cx * cell] == 1 | |
| val = black if (hdr and is_white) else white if hdr else (0 if is_white else MAX16) | |
| img[y0:y0+inner, x0:x0+inner] = val | |
| out = np.zeros((H, W, 3), dtype=np.uint16) | |
| sp = speed_px_per_sec(cfg.speed, 220.0) | |
| for r in range(H // cell + 2): | |
| y0 = r * cell | |
| y1 = min(H, y0 + cell) | |
| if y0 >= H: | |
| break | |
| row_strip = img[y0:y1] | |
| offset = int((sp * t) % tile_w) | |
| xstart = (tile_w - offset) % tile_w if (r % 2 == 0) else offset | |
| p1 = row_strip[:, xstart:xstart + W] | |
| row = np.concatenate([p1, row_strip[:, :W - p1.shape[1]]], axis=1) if p1.shape[1] < W else p1 | |
| out[y0:y1] = row[:, :W] | |
| return out | |
| def gen_radial_spokes(cfg: RenderConfig, t: float, hdr: bool) -> np.ndarray: | |
| cx, cy = W / 2, H / 2 | |
| yy, xx = np.mgrid[0:H, 0:W] | |
| ang = np.arctan2(yy - cy, xx - cx) | |
| rot = {"slow": 0.35, "medium": 0.7, "fast": 1.15}[cfg.speed] | |
| phase = (((ang + rot * t + np.pi) / (2 * np.pi)) * 48).astype(np.int32) | |
| mask = (phase % 2 == 0) | |
| frame = np.zeros((H, W, 3), dtype=np.uint16) | |
| if hdr: | |
| black = int(pq_oetf_nits_to_u16(np.array(0.2, dtype=np.float32))) | |
| white = int(pq_oetf_nits_to_u16(np.array(200.0, dtype=np.float32))) | |
| frame[:] = black | |
| frame[mask] = white | |
| else: | |
| frame[mask] = MAX16 | |
| return frame | |
| def gen_local_dimming_dot(cfg: RenderConfig, t: float, hdr: bool) -> np.ndarray: | |
| if hdr: | |
| bg = int(pq_oetf_nits_to_u16(np.array(0.2, dtype=np.float32))) | |
| dot = int(pq_oetf_nits_to_u16(np.array(250.0, dtype=np.float32))) | |
| else: | |
| bg = 0 | |
| dot = MAX16 | |
| frame = np.full((H, W, 3), bg, dtype=np.uint16) | |
| half = cfg.body_seconds / 2.0 | |
| if t < half: | |
| x = int((t / max(half, 1e-6)) * (W - 1)) | |
| y = H // 2 | |
| else: | |
| x = W // 2 | |
| y = int(((t - half) / max(half, 1e-6)) * (H - 1)) | |
| r = 10 | |
| x0, x1 = max(0, x - r), min(W, x + r) | |
| y0, y1 = max(0, y - r), min(H, y + r) | |
| frame[y0:y1, x0:x1, :] = dot | |
| return frame | |
| def gen_subtitle_bar(cfg: RenderConfig, t: float, hdr: bool) -> np.ndarray: | |
| if hdr: | |
| bg = int(pq_oetf_nits_to_u16(np.array(1.0, dtype=np.float32))) | |
| bar = int(pq_oetf_nits_to_u16(np.array(250.0, dtype=np.float32))) | |
| else: | |
| bg = int(MAX16 * 0.02) | |
| bar = MAX16 | |
| frame = np.full((H, W, 3), bg, dtype=np.uint16) | |
| bar_h = int(H * 0.06) | |
| y0 = int(H * 0.86) | |
| y1 = min(H, y0 + bar_h) | |
| sp = speed_px_per_sec(cfg.speed, 40.0) | |
| off = int((sp * t) % (W * 0.2)) | |
| x0 = int(W * 0.12 + off) | |
| x1 = min(W, x0 + int(W * 0.76)) | |
| frame[y0:y1, x0:x1, :] = bar | |
| return frame | |
| # ========================== | |
| # Scenario dispatcher | |
| # ========================== | |
| def gen_frame(cfg: RenderConfig, t: float) -> np.ndarray: | |
| if cfg.suite == "sdr": | |
| if cfg.scenario == "uniformity_sweep": return gen_uniformity_sweep_sdr(cfg, t) | |
| if cfg.scenario == "green_uniformity_pan": return gen_green_uniformity_pan_sdr(cfg, t) | |
| if cfg.scenario == "vertical_bars": return gen_vertical_bars(cfg, t) | |
| if cfg.scenario == "near_black_steps": return gen_near_black_steps_sdr(cfg, t) | |
| if cfg.scenario == "near_black_ramp_move": return gen_near_black_ramp_move_sdr(cfg, t) | |
| if cfg.scenario == "red_gradient": return gen_red_gradient_sdr(cfg, t) | |
| if cfg.scenario == "starfield": return gen_starfield_sdr(cfg, t) | |
| if cfg.scenario == "red_smear": return gen_red_smear_sdr(cfg, t) | |
| if cfg.scenario == "overshoot_blocks": return gen_overshoot_blocks(cfg, t, hdr=False) | |
| if cfg.scenario == "checker_rows": return gen_checker_rows(cfg, t, hdr=False) | |
| if cfg.scenario == "radial_spokes": return gen_radial_spokes(cfg, t, hdr=False) | |
| if cfg.scenario == "local_dimming_dot": return gen_local_dimming_dot(cfg, t, hdr=False) | |
| if cfg.scenario == "subtitle_bar": return gen_subtitle_bar(cfg, t, hdr=False) | |
| raise ValueError("Unknown SDR scenario") | |
| if cfg.scenario == "uniformity_sweep_hdr": return gen_uniformity_sweep_hdr(cfg, t) | |
| if cfg.scenario == "green_uniformity_pan_hdr": return gen_green_uniformity_pan_hdr(cfg, t) | |
| if cfg.scenario == "vertical_bars_hdr": return gen_vertical_bars(cfg, t) | |
| if cfg.scenario == "near_black_steps_hdr": return gen_near_black_steps_hdr(cfg, t) | |
| if cfg.scenario == "near_black_ramp_move_hdr": return gen_near_black_ramp_move_hdr(cfg, t) | |
| if cfg.scenario == "red_gradient_hdr": return gen_red_gradient_hdr(cfg, t) | |
| if cfg.scenario == "starfield_hdr": return gen_starfield_hdr(cfg, t) | |
| if cfg.scenario == "red_smear_hdr": return gen_red_smear_hdr(cfg, t) | |
| if cfg.scenario == "overshoot_blocks_hdr": return gen_overshoot_blocks(cfg, t, hdr=True) | |
| if cfg.scenario == "checker_rows_hdr": return gen_checker_rows(cfg, t, hdr=True) | |
| if cfg.scenario == "radial_spokes_hdr": return gen_radial_spokes(cfg, t, hdr=True) | |
| if cfg.scenario == "local_dimming_dot_hdr": return gen_local_dimming_dot(cfg, t, hdr=True) | |
| if cfg.scenario == "subtitle_bar_hdr": return gen_subtitle_bar(cfg, t, hdr=True) | |
| raise ValueError("Unknown HDR scenario") | |
| # ========================== | |
| # FFmpeg command building | |
| # ========================== | |
| def color_metadata_args(cfg: RenderConfig) -> List[str]: | |
| cr = "tv" if cfg.color_range == "tv" else "pc" | |
| if cfg.suite == "sdr": | |
| return ["-color_range", cr, "-colorspace", "bt709", "-color_primaries", "bt709", "-color_trc", "bt709"] | |
| return ["-color_range", cr, "-colorspace", "bt2020nc", "-color_primaries", "bt2020", "-color_trc", "smpte2084"] | |
| def rgb_to_yuv_filter(cfg: RenderConfig, out_pix_fmt: str) -> str: | |
| """ | |
| Critical correctness: | |
| - Explicit matrix/primaries/transfer for RGB->YUV conversion | |
| - Explicit range mapping: rangein=full, range=(limited|full) | |
| """ | |
| range_out = "limited" if cfg.color_range == "tv" else "full" | |
| if has_filter("zscale"): | |
| if cfg.suite == "sdr": | |
| return ( | |
| "zscale=" | |
| "matrixin=bt709:transferin=bt709:primariesin=bt709:" | |
| f"rangein=full:matrix=bt709:transfer=bt709:primaries=bt709:range={range_out}," | |
| f"format={out_pix_fmt}" | |
| ) | |
| return ( | |
| "zscale=" | |
| "matrixin=bt2020nc:transferin=smpte2084:primariesin=bt2020:" | |
| f"rangein=full:matrix=bt2020nc:transfer=smpte2084:primaries=bt2020:range={range_out}," | |
| f"format={out_pix_fmt}" | |
| ) | |
| # Fallback (less ideal than zscale, but still explicit about matrix + range) | |
| out_range = "tv" if cfg.color_range == "tv" else "pc" | |
| if cfg.suite == "sdr": | |
| return ( | |
| f"scale=in_color_matrix=bt709:out_color_matrix=bt709:" | |
| f"in_range=pc:out_range={out_range},format={out_pix_fmt}" | |
| ) | |
| return ( | |
| f"scale=in_color_matrix=bt2020:out_color_matrix=bt2020:" | |
| f"in_range=pc:out_range={out_range},format={out_pix_fmt}" | |
| ) | |
| def hdr10_metadata_bsf() -> str: | |
| # FFmpeg hevc_metadata BSF uses "master_display" (not x265's "master-display"). | |
| # Use raw strings so Python keeps backslashes literally (no SyntaxWarning on \,, Python 3.12+). | |
| return ( | |
| r"hevc_metadata=" | |
| r"master_display=G(8500\,39850)B(6550\,2300)R(34000\,16000)WP(15635\,16450)L(10000000\,1):" | |
| r"max_cll=1000\,400" | |
| ) | |
| def _build_amf_cmd(base: List[str], vf: str, meta: List[str], qp_triplet: tuple, out: str) -> List[str]: | |
| _qpi, qpp, _qpb = qp_triplet | |
| # AMF CLI options vary across ffmpeg builds. Using per-frame-type flags like -qp_p / -qp_b | |
| # is not portable and can fail with "Unrecognized option". | |
| # Use a single global quantizer for broad compatibility. | |
| return base + ["-vf", vf] + meta + [ | |
| "-c:v", "hevc_amf", | |
| "-quality", "quality", | |
| "-rc", "cqp", | |
| "-qp", qpp, | |
| "-tag:v", "hvc1", | |
| out | |
| ] | |
| def _build_nvenc_cmd(base: List[str], vf: str, meta: List[str], qp: str, high: bool, out: str) -> List[str]: | |
| return base + ["-vf", vf] + meta + [ | |
| "-c:v", "hevc_nvenc", | |
| "-preset", "p7" if high else "p5", | |
| "-tune", "hq", | |
| "-rc", "constqp", | |
| "-qp", qp, | |
| "-tag:v", "hvc1", | |
| out | |
| ] | |
| def _build_qsv_cmd(base: List[str], vf: str, meta: List[str], qp: str, out: str) -> List[str]: | |
| # Explicit quality-oriented config to avoid falling back to poor defaults | |
| cmd = base + ["-vf", vf] + meta + [ | |
| "-c:v", "hevc_qsv", | |
| "-look_ahead", "0", | |
| "-global_quality", qp, | |
| "-preset", "slow", | |
| "-tag:v", "hvc1", | |
| out | |
| ] | |
| return cmd | |
| def _build_vaapi_cmd(base: List[str], vf: str, meta: List[str], qp: str, out: str) -> List[str]: | |
| # Minimal VAAPI path; may need environment/device setup on Linux | |
| return base + ["-vf", vf] + meta + [ | |
| "-c:v", "hevc_vaapi", | |
| "-qp", qp, | |
| "-tag:v", "hvc1", | |
| out | |
| ] | |
| def ffmpeg_cmd(cfg: RenderConfig, out_root: str) -> (List[str], str): | |
| base = [ | |
| "ffmpeg", "-y", | |
| "-f", "rawvideo", | |
| "-pix_fmt", "rgb48le", | |
| "-s", f"{W}x{H}", | |
| "-r", str(cfg.fps), | |
| "-i", "-", | |
| "-an" | |
| ] | |
| meta = color_metadata_args(cfg) | |
| if cfg.codec == "ffv1": | |
| out = out_root + ".mkv" | |
| cmd = base + meta + ["-c:v", "ffv1", "-level", "3", "-g", "1", out] | |
| return cmd, out | |
| if cfg.codec == "prores4444": | |
| out = out_root + ".mov" | |
| vf = rgb_to_yuv_filter(cfg, "yuv444p10le") | |
| cmd = base + ["-vf", vf] + meta + [ | |
| "-c:v", "prores_ks", "-profile:v", "4", "-vendor", "apl0", "-bits_per_mb", "8000", out | |
| ] | |
| return cmd, out | |
| # HEVC paths | |
| hw = detect_hevc_hw_encoder() | |
| want_hw = (cfg.codec in ("hevc_auto", "hevc_hw")) | |
| use_hw = want_hw and (hw is not None) | |
| if cfg.codec == "hevc_hw" and hw is None: | |
| raise RuntimeError("hevc_hw selected but no HEVC hardware encoder found.") | |
| out_pix = "yuv444p10le" if cfg.suite == "sdr" else "yuv420p10le" | |
| vf = rgb_to_yuv_filter(cfg, out_pix) | |
| if cfg.hevc_quality == "high": | |
| crf = "10" | |
| qpi_qpp_qpb = ("16", "18", "20") | |
| qp = "18" | |
| else: | |
| crf = "14" | |
| qpi_qpp_qpb = ("22", "24", "26") | |
| qp = "24" | |
| # HDR10 metadata requirements | |
| need_hdr = (cfg.suite == "hdr10") | |
| can_inject_hdr_bsf = has_bsf("hevc_metadata") and bsf_supports_hdr10_metadata() | |
| if use_hw: | |
| out = out_root + f"_{hw}.mp4" | |
| # If HDR and no hevc_metadata bsf, force x265 fallback for correctness | |
| if need_hdr and not can_inject_hdr_bsf: | |
| use_hw = False | |
| else: | |
| high = (cfg.hevc_quality == "high") | |
| if hw == "hevc_amf": | |
| cmd = _build_amf_cmd(base, vf, meta, qpi_qpp_qpb, out) | |
| elif hw == "hevc_nvenc": | |
| cmd = _build_nvenc_cmd(base, vf, meta, qp, high, out) | |
| elif hw == "hevc_qsv": | |
| cmd = _build_qsv_cmd(base, vf, meta, qp, out) | |
| else: | |
| cmd = _build_vaapi_cmd(base, vf, meta, qp, out) | |
| if need_hdr: | |
| # inject HDR10 mastering + MaxCLL/FALL metadata | |
| cmd = cmd[:-1] + ["-bsf:v", hdr10_metadata_bsf(), cmd[-1]] | |
| return cmd, out | |
| # x265 fallback / explicit x265 | |
| out = out_root + ".mp4" | |
| cmd = base + ["-vf", vf] + meta + ["-c:v", "libx265", "-preset", "slow", "-tag:v", "hvc1"] | |
| if cfg.suite == "sdr": | |
| cmd += ["-crf", crf, out] | |
| return cmd, out | |
| x265_params = [ | |
| "hdr10=1", | |
| "repeat-headers=1", | |
| "master-display=G(8500,39850)B(6550,2300)R(34000,16000)WP(15635,16450)L(10000000,1)", | |
| "max-cll=1000,400", | |
| f"crf={crf}", | |
| ] | |
| cmd += ["-x265-params", ":".join(x265_params), out] | |
| return cmd, out | |
| # ========================== | |
| # Frame assembly | |
| # ========================== | |
| def build_body_frame(cfg: RenderConfig, t: float) -> np.ndarray: | |
| frame = gen_frame(cfg, t) | |
| if (not cfg.no_text_intro) and (t < cfg.text_intro_seconds): | |
| info = scenario_info(cfg.suite, cfg.scenario) | |
| alpha = 0.80 | |
| remain = cfg.text_intro_seconds - t | |
| if remain < 0.6: | |
| alpha *= max(0.0, remain / 0.6) | |
| draw_text_panel(frame, info["title"], info["goal"], info["watch"], alpha) | |
| if not cfg.no_timer: | |
| add_timer(frame, t, cfg.body_seconds, cfg.timer_color) | |
| return frame | |
| # ========================== | |
| # Rendering engine | |
| # ========================== | |
| def write_raw_rgb48(proc: subprocess.Popen, frame_rgb16: np.ndarray) -> None: | |
| global CANCEL_REQUESTED | |
| try: | |
| proc.stdin.write(frame_rgb16.tobytes()) | |
| except (BrokenPipeError, OSError) as e: | |
| # Common during abort: FFmpeg pipe is already closed while UI abort is in-flight. | |
| if CANCEL_REQUESTED: | |
| raise KeyboardInterrupt("Render aborted by user") from e | |
| raise | |
| def render_segment(cfg: RenderConfig, segment: str, progress_cb=None) -> str: | |
| global CURRENT_FFMPEG_PROC, CANCEL_REQUESTED | |
| validate_cfg(cfg) | |
| ensure_ffmpeg() | |
| os.makedirs(cfg.out_dir, exist_ok=True) | |
| base = cfg.basename or f"{cfg.suite}_{cfg.scenario}_{cfg.fps}fps_{segment}_{cfg.codec}" | |
| out_root = str(Path(cfg.out_dir) / base) | |
| cmd, out_path = ffmpeg_cmd(cfg, out_root) | |
| proc = subprocess.Popen(cmd, stdin=subprocess.PIPE) | |
| CURRENT_FFMPEG_PROC = proc | |
| t0 = time.time() | |
| def _check_abort(): | |
| if CANCEL_REQUESTED: | |
| try: | |
| if proc.poll() is None: | |
| proc.terminate() | |
| except Exception: | |
| pass | |
| raise KeyboardInterrupt("Render aborted by user") | |
| try: | |
| if segment == "intro": | |
| total = cfg.intro_frames | |
| for i in range(total): | |
| _check_abort() | |
| write_raw_rgb48(proc, apply_crt(cfg, i, total, "intro")) | |
| if progress_cb: | |
| progress_cb(i + 1, total, segment) | |
| elif segment == "outro": | |
| total = cfg.outro_frames | |
| for i in range(total): | |
| _check_abort() | |
| write_raw_rgb48(proc, apply_crt(cfg, i, total, "outro")) | |
| if progress_cb: | |
| progress_cb(i + 1, total, segment) | |
| elif segment == "body": | |
| total = cfg.body_frames | |
| for i in range(total): | |
| _check_abort() | |
| write_raw_rgb48(proc, build_body_frame(cfg, i / cfg.fps)) | |
| if progress_cb: | |
| progress_cb(i + 1, total, segment) | |
| elif segment == "all": | |
| total = cfg.intro_frames + cfg.body_frames + cfg.outro_frames | |
| n = 0 | |
| for i in range(cfg.intro_frames): | |
| _check_abort() | |
| write_raw_rgb48(proc, apply_crt(cfg, i, cfg.intro_frames, "intro")) | |
| n += 1 | |
| if progress_cb: | |
| progress_cb(n, total, segment) | |
| for i in range(cfg.body_frames): | |
| _check_abort() | |
| write_raw_rgb48(proc, build_body_frame(cfg, i / cfg.fps)) | |
| n += 1 | |
| if progress_cb: | |
| progress_cb(n, total, segment) | |
| for i in range(cfg.outro_frames): | |
| _check_abort() | |
| write_raw_rgb48(proc, apply_crt(cfg, i, cfg.outro_frames, "outro")) | |
| n += 1 | |
| if progress_cb: | |
| progress_cb(n, total, segment) | |
| else: | |
| raise ValueError(f"Unknown segment: {segment}") | |
| except KeyboardInterrupt: | |
| if progress_cb: | |
| progress_cb(-3, -3, "aborted") | |
| raise | |
| finally: | |
| try: | |
| if proc.stdin: | |
| proc.stdin.close() | |
| except Exception: | |
| pass | |
| try: | |
| proc.wait(timeout=10) | |
| except Exception: | |
| try: | |
| proc.kill() | |
| except Exception: | |
| pass | |
| CURRENT_FFMPEG_PROC = None | |
| if proc.returncode != 0: | |
| raise RuntimeError(f"FFmpeg failed with code {proc.returncode}") | |
| if progress_cb: | |
| progress_cb(-1, -1, f"done:{time.time() - t0:.1f}s") | |
| return out_path | |
| def render_requested(cfg: RenderConfig, progress_cb=None) -> List[str]: | |
| if cfg.segment == "separate": | |
| outs = [] | |
| for seg in ("intro", "body", "outro"): | |
| c2 = RenderConfig(**{**cfg.__dict__, "segment": seg, "basename": f"{cfg.suite}_{cfg.scenario}_{cfg.fps}fps_{seg}_{cfg.codec}"}) | |
| outs.append(render_segment(c2, seg, progress_cb)) | |
| return outs | |
| return [render_segment(cfg, cfg.segment, progress_cb)] | |
| def render_all_scenarios(cfg: RenderConfig, progress_cb=None) -> List[str]: | |
| outs: List[str] = [] | |
| names = scenario_names_for_suite(cfg.suite) | |
| for idx, scen in enumerate(names, start=1): | |
| if progress_cb: | |
| progress_cb(-2, len(names), f"scenario:{idx}/{len(names)}:{scen}") | |
| c2 = RenderConfig(**{**cfg.__dict__, "scenario": scen, "basename": ""}) | |
| outs.extend(render_requested(c2, progress_cb)) | |
| return outs | |
| # ========================== | |
| # CLI / GUI | |
| # ========================== | |
| def parse_args(): | |
| p = argparse.ArgumentParser(description="Panel Test Orchestrator — Final V4.1 (4K UHD)") | |
| p.add_argument("--cli", action="store_true", help="Force CLI mode") | |
| p.add_argument("--suite", default="sdr", choices=["sdr", "hdr10"]) | |
| p.add_argument("--scenario", default=None) | |
| p.add_argument("--all-scenarios", action="store_true") | |
| p.add_argument("--segment", default="all", choices=["intro", "body", "outro", "all", "separate"]) | |
| p.add_argument("--fps", type=int, default=120, choices=[60, 120]) | |
| p.add_argument("--seconds", type=float, default=30.0) | |
| p.add_argument("--text-intro-seconds", type=float, default=5.0) | |
| p.add_argument("--codec", default="ffv1", choices=["ffv1", "prores4444", "hevc_x265", "hevc_auto", "hevc_hw"]) | |
| p.add_argument("--hevc-quality", default="high", choices=["high", "medium"]) | |
| p.add_argument("--speed", default="medium", choices=["slow", "medium", "fast"]) | |
| p.add_argument("--timer-color", default="white", choices=["white", "red"]) | |
| p.add_argument("--no-timer", action="store_true") | |
| p.add_argument("--no-crt", action="store_true") | |
| p.add_argument("--no-text-intro", action="store_true") | |
| p.add_argument("--color-range", default="tv", choices=["tv", "pc"]) | |
| p.add_argument("--out", default="renders_final_v3_1") | |
| p.add_argument("--name", default="") | |
| return p.parse_args() | |
| def launch_gui(): | |
| import tkinter as tk | |
| from tkinter import ttk, filedialog, messagebox | |
| global CANCEL_REQUESTED, CURRENT_FFMPEG_PROC | |
| ensure_ffmpeg() | |
| hw = detect_hevc_hw_encoder() or "none" | |
| cpu_threads = os.cpu_count() or 1 | |
| zscale_ok = has_filter("zscale") | |
| bsf_ok = has_bsf("hevc_metadata") | |
| hdr_bsf_ok = bsf_ok and bsf_supports_hdr10_metadata() | |
| root = tk.Tk() | |
| root.title("Panel Test Orchestrator — Final V4.1 (4K UHD)") | |
| root.geometry("1120x940") | |
| vars_ = { | |
| "suite": tk.StringVar(value="sdr"), | |
| "scenario": tk.StringVar(value=scenario_names_for_suite("sdr")[0]), | |
| "all_scenarios": tk.BooleanVar(value=True), | |
| "segment": tk.StringVar(value="all"), | |
| "fps": tk.StringVar(value="120"), | |
| "seconds": tk.StringVar(value="30"), | |
| "text_intro_seconds": tk.StringVar(value="5"), | |
| "codec": tk.StringVar(value="hevc_auto"), | |
| "hevc_quality": tk.StringVar(value="high"), | |
| "speed": tk.StringVar(value="medium"), | |
| "timer_color": tk.StringVar(value="white"), | |
| "no_timer": tk.BooleanVar(value=False), | |
| "no_crt": tk.BooleanVar(value=False), | |
| "no_text_intro": tk.BooleanVar(value=False), | |
| "color_range": tk.StringVar(value="tv"), | |
| "out": tk.StringVar(value=str(Path.cwd() / "renders_final_v4_1")), | |
| "name": tk.StringVar(value=""), | |
| "perf_profile": tk.StringVar(value="high"), | |
| } | |
| work_q = queue.Queue() | |
| worker = {"thread": None} | |
| ui_state = {"running": False} | |
| def predicted_hw_status(): | |
| codec = vars_["codec"].get() | |
| suite = vars_["suite"].get() | |
| if codec in ("ffv1", "prores4444", "hevc_x265"): | |
| if codec == "hevc_x265": | |
| return "Encoding path: CPU (libx265)" | |
| return "Encoding path: CPU (non-HEVC codec)" | |
| if hw == "none": | |
| return "Encoding path: No HEVC HW encoder detected -> CPU x265 fallback" | |
| if suite == "hdr10" and not hdr_bsf_ok: | |
| return f"Encoding path: {hw} detected, but HDR BSF opts missing -> CPU x265 fallback" | |
| return f"Encoding path: HW HEVC ({hw})" | |
| def update_hw_status(*_): | |
| hw_status_var.set(predicted_hw_status()) | |
| def suggest_profile_name() -> str: | |
| if hw != "none" and cpu_threads >= 16: | |
| return "high" | |
| if hw != "none" and cpu_threads >= 8: | |
| return "medium" | |
| return "low" | |
| def apply_profile(name: str): | |
| if name == "low": | |
| vars_["fps"].set("60") | |
| vars_["codec"].set("hevc_auto" if hw != "none" else "hevc_x265") | |
| vars_["hevc_quality"].set("medium") | |
| vars_["speed"].set("medium") | |
| vars_["text_intro_seconds"].set("3") | |
| elif name == "medium": | |
| vars_["fps"].set("60") | |
| vars_["codec"].set("hevc_auto" if hw != "none" else "hevc_x265") | |
| vars_["hevc_quality"].set("high") | |
| vars_["speed"].set("medium") | |
| vars_["text_intro_seconds"].set("5") | |
| elif name == "high": | |
| vars_["fps"].set("120" if cpu_threads >= 8 else "60") | |
| vars_["codec"].set("hevc_auto" if hw != "none" else "hevc_x265") | |
| vars_["hevc_quality"].set("high") | |
| vars_["speed"].set("medium") | |
| vars_["text_intro_seconds"].set("5") | |
| elif name == "extreme": | |
| vars_["fps"].set("120") | |
| vars_["codec"].set("ffv1" if cpu_threads >= 12 else ("hevc_auto" if hw != "none" else "hevc_x265")) | |
| vars_["hevc_quality"].set("high") | |
| vars_["speed"].set("medium") | |
| vars_["text_intro_seconds"].set("5") | |
| update_hw_status() | |
| def update_scenarios(*_): | |
| suite = vars_["suite"].get() | |
| opts = scenario_names_for_suite(suite) | |
| scenario_combo["values"] = opts | |
| if vars_["scenario"].get() not in opts: | |
| vars_["scenario"].set(opts[0]) | |
| update_hw_status() | |
| pad = {"padx": 10, "pady": 5} | |
| ttk.Label(root, text="Panel Test Orchestrator — Final V4.1", font=("Segoe UI", 15, "bold")).grid(row=0, column=0, columnspan=4, sticky="w", padx=10, pady=10) | |
| ttk.Label( | |
| root, | |
| text=f"Fixed 4K UHD 3840x2160 • CPU threads: {cpu_threads} • HEVC HW: {hw} • zscale: {'yes' if zscale_ok else 'no'} • hevc_metadata HDR opts: {'yes' if hdr_bsf_ok else 'no'}", | |
| foreground="#555" | |
| ).grid(row=1, column=0, columnspan=4, sticky="w", padx=10) | |
| hw_status_var = tk.StringVar(value="") | |
| ttk.Label(root, textvariable=hw_status_var, foreground="#003366").grid(row=2, column=0, columnspan=4, sticky="w", padx=10) | |
| ttk.Label(root, text="Suite").grid(row=3, column=0, sticky="w", **pad) | |
| suite_combo = ttk.Combobox(root, textvariable=vars_["suite"], values=["sdr", "hdr10"], state="readonly", width=16) | |
| suite_combo.grid(row=3, column=1, sticky="w", **pad) | |
| suite_combo.bind("<<ComboboxSelected>>", update_scenarios) | |
| ttk.Label(root, text="Scenario").grid(row=3, column=2, sticky="w", **pad) | |
| scenario_combo = ttk.Combobox(root, textvariable=vars_["scenario"], values=scenario_names_for_suite("sdr"), state="readonly", width=42) | |
| scenario_combo.grid(row=3, column=3, sticky="w", **pad) | |
| ttk.Checkbutton(root, text="Render ALL scenarios in suite", variable=vars_["all_scenarios"]).grid(row=4, column=0, columnspan=2, sticky="w", **pad) | |
| ttk.Label(root, text="Export mode").grid(row=5, column=0, sticky="w", **pad) | |
| segment_combo = ttk.Combobox(root, textvariable=vars_["segment"], values=["intro", "body", "outro", "all", "separate"], state="readonly", width=16) | |
| segment_combo.grid(row=5, column=1, sticky="w", **pad) | |
| ttk.Label(root, text="Frame rate").grid(row=5, column=2, sticky="w", **pad) | |
| fps_combo = ttk.Combobox(root, textvariable=vars_["fps"], values=["60", "120"], state="readonly", width=10) | |
| fps_combo.grid(row=5, column=3, sticky="w", **pad) | |
| ttk.Label(root, text="Body seconds").grid(row=6, column=0, sticky="w", **pad) | |
| seconds_entry = ttk.Entry(root, textvariable=vars_["seconds"], width=12) | |
| seconds_entry.grid(row=6, column=1, sticky="w", **pad) | |
| ttk.Label(root, text="Text intro seconds").grid(row=6, column=2, sticky="w", **pad) | |
| text_intro_entry = ttk.Entry(root, textvariable=vars_["text_intro_seconds"], width=12) | |
| text_intro_entry.grid(row=6, column=3, sticky="w", **pad) | |
| ttk.Label(root, text="Codec").grid(row=7, column=0, sticky="w", **pad) | |
| codec_combo = ttk.Combobox(root, textvariable=vars_["codec"], values=["ffv1", "prores4444", "hevc_x265", "hevc_auto", "hevc_hw"], state="readonly", width=16) | |
| codec_combo.grid(row=7, column=1, sticky="w", **pad) | |
| codec_combo.bind("<<ComboboxSelected>>", update_hw_status) | |
| ttk.Label(root, text="HEVC quality").grid(row=7, column=2, sticky="w", **pad) | |
| hevcq_combo = ttk.Combobox(root, textvariable=vars_["hevc_quality"], values=["high", "medium"], state="readonly", width=10) | |
| hevcq_combo.grid(row=7, column=3, sticky="w", **pad) | |
| ttk.Label(root, text="Pattern speed").grid(row=8, column=0, sticky="w", **pad) | |
| speed_combo = ttk.Combobox(root, textvariable=vars_["speed"], values=["slow", "medium", "fast"], state="readonly", width=12) | |
| speed_combo.grid(row=8, column=1, sticky="w", **pad) | |
| ttk.Label(root, text="Timer color").grid(row=8, column=2, sticky="w", **pad) | |
| timer_combo = ttk.Combobox(root, textvariable=vars_["timer_color"], values=["white", "red"], state="readonly", width=10) | |
| timer_combo.grid(row=8, column=3, sticky="w", **pad) | |
| ttk.Label(root, text="Color range").grid(row=9, column=0, sticky="w", **pad) | |
| colorrange_combo = ttk.Combobox(root, textvariable=vars_["color_range"], values=["tv", "pc"], state="readonly", width=10) | |
| colorrange_combo.grid(row=9, column=1, sticky="w", **pad) | |
| no_timer_chk = ttk.Checkbutton(root, text="Disable timer", variable=vars_["no_timer"]) | |
| no_timer_chk.grid(row=10, column=0, sticky="w", **pad) | |
| no_crt_chk = ttk.Checkbutton(root, text="Disable CRT intro/outro", variable=vars_["no_crt"]) | |
| no_crt_chk.grid(row=10, column=1, sticky="w", **pad) | |
| no_text_chk = ttk.Checkbutton(root, text="Disable text intro panel", variable=vars_["no_text_intro"]) | |
| no_text_chk.grid(row=10, column=2, sticky="w", **pad) | |
| ttk.Label(root, text="Output folder").grid(row=11, column=0, sticky="w", **pad) | |
| out_entry = ttk.Entry(root, textvariable=vars_["out"], width=68) | |
| out_entry.grid(row=11, column=1, columnspan=2, sticky="w", **pad) | |
| def browse_out(): | |
| d = filedialog.askdirectory(initialdir=vars_["out"].get() or str(Path.cwd())) | |
| if d: | |
| vars_["out"].set(d) | |
| browse_btn = ttk.Button(root, text="Browse...", command=browse_out) | |
| browse_btn.grid(row=11, column=3, sticky="w", **pad) | |
| ttk.Label(root, text="Base filename (optional)").grid(row=12, column=0, sticky="w", **pad) | |
| name_entry = ttk.Entry(root, textvariable=vars_["name"], width=32) | |
| name_entry.grid(row=12, column=1, sticky="w", **pad) | |
| ttk.Label(root, text="Performance profile").grid(row=12, column=2, sticky="w", **pad) | |
| perf_combo = ttk.Combobox(root, textvariable=vars_["perf_profile"], values=["low", "medium", "high", "extreme"], state="readonly", width=12) | |
| perf_combo.grid(row=12, column=3, sticky="w", **pad) | |
| prof_row = ttk.Frame(root) | |
| prof_row.grid(row=13, column=0, columnspan=4, sticky="w", padx=10, pady=4) | |
| apply_profile_btn = ttk.Button(prof_row, text="Apply Profile", command=lambda: apply_profile(vars_["perf_profile"].get())) | |
| apply_profile_btn.grid(row=0, column=0, padx=4) | |
| auto_profile_btn = ttk.Button(prof_row, text="Auto Recommend", command=lambda: (vars_["perf_profile"].set(suggest_profile_name()), apply_profile(vars_["perf_profile"].get()))) | |
| auto_profile_btn.grid(row=0, column=1, padx=4) | |
| pf = ttk.LabelFrame(root, text="Presets") | |
| pf.grid(row=14, column=0, columnspan=4, sticky="ew", padx=10, pady=8) | |
| def preset_master(): | |
| vars_["suite"].set("sdr"); update_scenarios() | |
| vars_["codec"].set("ffv1"); update_hw_status() | |
| vars_["fps"].set("120") | |
| vars_["segment"].set("all") | |
| vars_["seconds"].set("30") | |
| vars_["all_scenarios"].set(True) | |
| def preset_sdr_hevc(): | |
| vars_["suite"].set("sdr"); update_scenarios() | |
| vars_["codec"].set("hevc_auto"); update_hw_status() | |
| vars_["fps"].set("60") | |
| vars_["segment"].set("all") | |
| vars_["seconds"].set("30") | |
| vars_["all_scenarios"].set(True) | |
| def preset_hdr_hevc(): | |
| vars_["suite"].set("hdr10"); update_scenarios() | |
| vars_["codec"].set("hevc_auto"); update_hw_status() | |
| vars_["fps"].set("60") | |
| vars_["segment"].set("all") | |
| vars_["seconds"].set("30") | |
| vars_["all_scenarios"].set(True) | |
| ttk.Button(pf, text="Master (SDR FFV1 120fps)", command=preset_master).grid(row=0, column=0, padx=8, pady=8) | |
| ttk.Button(pf, text="SDR HEVC Delivery (Auto HW)", command=preset_sdr_hevc).grid(row=0, column=1, padx=8, pady=8) | |
| ttk.Button(pf, text="HDR10 HEVC Delivery (Auto HW)", command=preset_hdr_hevc).grid(row=0, column=2, padx=8, pady=8) | |
| ttk.Label(root, text="Progress").grid(row=15, column=0, sticky="w", padx=10) | |
| prog = ttk.Progressbar(root, orient="horizontal", mode="determinate", length=800) | |
| prog.grid(row=15, column=1, columnspan=3, sticky="w", padx=10) | |
| status = tk.StringVar(value="Ready") | |
| ttk.Label(root, textvariable=status).grid(row=16, column=0, columnspan=4, sticky="w", padx=10) | |
| log = tk.Text(root, height=13, width=132) | |
| log.grid(row=17, column=0, columnspan=4, padx=10, pady=8) | |
| log.insert("end", | |
| "Notes:\n" | |
| "- GUI rendering runs in a background thread, so the window stays responsive.\n" | |
| "- Abort stops the current FFmpeg process and cancels the batch.\n" | |
| "- HEVC status line above predicts whether HW or CPU fallback will be used.\n") | |
| log.config(state="disabled") | |
| def append_log(msg: str): | |
| log.config(state="normal") | |
| log.insert("end", msg + "\n") | |
| log.see("end") | |
| log.config(state="disabled") | |
| interactive_widgets = [ | |
| suite_combo, scenario_combo, segment_combo, fps_combo, seconds_entry, text_intro_entry, | |
| codec_combo, hevcq_combo, speed_combo, timer_combo, colorrange_combo, out_entry, browse_btn, | |
| name_entry, perf_combo, apply_profile_btn, auto_profile_btn | |
| ] | |
| def set_controls_enabled(enabled: bool): | |
| for w in interactive_widgets: | |
| try: | |
| if enabled: | |
| w.state(["!disabled"]) | |
| else: | |
| w.state(["disabled"]) | |
| except Exception: | |
| try: | |
| w.configure(state=("normal" if enabled else "disabled")) | |
| except Exception: | |
| pass | |
| for chk in (no_timer_chk, no_crt_chk, no_text_chk): | |
| try: | |
| chk.configure(state=("normal" if enabled else "disabled")) | |
| except Exception: | |
| pass | |
| try: | |
| if enabled: | |
| render_btn.state(["!disabled"]) | |
| abort_btn.state(["disabled"]) | |
| else: | |
| render_btn.state(["disabled"]) | |
| abort_btn.state(["!disabled"]) | |
| except Exception: | |
| pass | |
| def ui_progress_cb(done, total, label): | |
| work_q.put(("progress", done, total, label)) | |
| def collect_cfg(): | |
| return RenderConfig( | |
| fps=int(vars_["fps"].get()), | |
| body_seconds=float(vars_["seconds"].get()), | |
| text_intro_seconds=float(vars_["text_intro_seconds"].get()), | |
| suite=vars_["suite"].get(), | |
| scenario=vars_["scenario"].get(), | |
| all_scenarios=vars_["all_scenarios"].get(), | |
| segment=vars_["segment"].get(), | |
| out_dir=vars_["out"].get(), | |
| basename=vars_["name"].get().strip(), | |
| codec=vars_["codec"].get(), | |
| hevc_quality=vars_["hevc_quality"].get(), | |
| speed=vars_["speed"].get(), | |
| timer_color=vars_["timer_color"].get(), | |
| no_timer=vars_["no_timer"].get(), | |
| no_crt=vars_["no_crt"].get(), | |
| no_text_intro=vars_["no_text_intro"].get(), | |
| color_range=vars_["color_range"].get(), | |
| ) | |
| def worker_run(): | |
| global CANCEL_REQUESTED | |
| try: | |
| cfg = collect_cfg() | |
| validate_cfg(cfg) | |
| work_q.put(("log", f"Start render: suite={cfg.suite}, codec={cfg.codec}, fps={cfg.fps}")) | |
| t0 = time.time() | |
| if cfg.all_scenarios: | |
| outs = render_all_scenarios(cfg, ui_progress_cb) | |
| else: | |
| outs = render_requested(cfg, ui_progress_cb) | |
| dt = time.time() - t0 | |
| work_q.put(("done", outs, dt)) | |
| except KeyboardInterrupt: | |
| work_q.put(("aborted",)) | |
| except Exception as e: | |
| if CANCEL_REQUESTED or getattr(e, "errno", None) in (22, 32): | |
| work_q.put(("aborted",)) | |
| else: | |
| work_q.put(("error", str(e))) | |
| finally: | |
| CANCEL_REQUESTED = False | |
| def run_render(): | |
| global CANCEL_REQUESTED | |
| if ui_state["running"]: | |
| return | |
| try: | |
| cfg = collect_cfg() | |
| validate_cfg(cfg) | |
| except Exception as e: | |
| messagebox.showerror("Error", str(e)) | |
| return | |
| CANCEL_REQUESTED = False | |
| ui_state["running"] = True | |
| prog["value"] = 0 | |
| status.set("Starting...") | |
| append_log(predicted_hw_status()) | |
| set_controls_enabled(False) | |
| worker["thread"] = threading.Thread(target=worker_run, daemon=True) | |
| worker["thread"].start() | |
| def abort_render(): | |
| global CANCEL_REQUESTED, CURRENT_FFMPEG_PROC | |
| if not ui_state["running"]: | |
| return | |
| CANCEL_REQUESTED = True | |
| status.set("Abort requested...") | |
| append_log("Abort requested by user.") | |
| try: | |
| if CURRENT_FFMPEG_PROC is not None and CURRENT_FFMPEG_PROC.poll() is None: | |
| CURRENT_FFMPEG_PROC.terminate() | |
| except Exception: | |
| pass | |
| def pump_queue(): | |
| try: | |
| while True: | |
| msg = work_q.get_nowait() | |
| kind = msg[0] | |
| if kind == "progress": | |
| _, done, total, label = msg | |
| if isinstance(label, str) and label.startswith("scenario:"): | |
| status.set(label.replace("scenario:", "Scenario ")) | |
| append_log(status.get()) | |
| elif label == "aborted": | |
| status.set("Aborted") | |
| append_log("Render aborted.") | |
| elif isinstance(label, str) and label.startswith("done:"): | |
| status.set(f"Done in {label.split(':',1)[1]}") | |
| prog["value"] = 100 | |
| elif total > 0: | |
| prog["maximum"] = total | |
| prog["value"] = done | |
| status.set(f"Rendering {label}: {done}/{total} frames") | |
| elif kind == "log": | |
| append_log(msg[1]) | |
| elif kind == "done": | |
| _, outs, dt = msg | |
| ui_state["running"] = False | |
| set_controls_enabled(True) | |
| append_log(f"Finished in {dt:.1f}s") | |
| for o in outs: | |
| append_log(f" {o}") | |
| status.set("Ready") | |
| messagebox.showinfo("Done", "Rendered:\n" + "\n".join(outs)) | |
| elif kind == "aborted": | |
| ui_state["running"] = False | |
| set_controls_enabled(True) | |
| status.set("Aborted") | |
| append_log("Render aborted.") | |
| elif kind == "error": | |
| ui_state["running"] = False | |
| set_controls_enabled(True) | |
| status.set("Error") | |
| append_log(f"ERROR: {msg[1]}") | |
| messagebox.showerror("Error", msg[1]) | |
| except queue.Empty: | |
| pass | |
| root.after(100, pump_queue) | |
| btn_frame = ttk.Frame(root) | |
| btn_frame.grid(row=18, column=0, columnspan=4, sticky="e", padx=10, pady=8) | |
| render_btn = ttk.Button(btn_frame, text="Render", command=run_render) | |
| render_btn.grid(row=0, column=0, padx=6) | |
| abort_btn = ttk.Button(btn_frame, text="Abort", command=abort_render) | |
| abort_btn.grid(row=0, column=1, padx=6) | |
| abort_btn.state(["disabled"]) | |
| vars_["perf_profile"].set(suggest_profile_name()) | |
| update_scenarios() | |
| apply_profile(vars_["perf_profile"].get()) | |
| set_controls_enabled(True) | |
| pump_queue() | |
| root.mainloop() | |
| def main(): | |
| args = parse_args() | |
| if len(sys.argv) == 1 and not args.cli: | |
| launch_gui() | |
| return | |
| ensure_ffmpeg() | |
| suite = args.suite | |
| scenario = args.scenario or scenario_names_for_suite(suite)[0] | |
| cfg = RenderConfig( | |
| fps=args.fps, | |
| body_seconds=args.seconds, | |
| text_intro_seconds=args.text_intro_seconds, | |
| suite=suite, | |
| scenario=scenario, | |
| all_scenarios=args.all_scenarios, | |
| segment=args.segment, | |
| out_dir=args.out, | |
| basename=args.name, | |
| codec=args.codec, | |
| hevc_quality=args.hevc_quality, | |
| speed=args.speed, | |
| timer_color=args.timer_color, | |
| no_timer=args.no_timer, | |
| no_crt=args.no_crt, | |
| no_text_intro=args.no_text_intro, | |
| color_range=args.color_range, | |
| ) | |
| validate_cfg(cfg) | |
| print("Detected HEVC HW encoder:", detect_hevc_hw_encoder() or "none") | |
| print("zscale filter:", "yes" if has_filter("zscale") else "no") | |
| print("hevc_metadata HDR opts:", "yes" if (has_bsf("hevc_metadata") and bsf_supports_hdr10_metadata()) else "no") | |
| def cli_progress(done, total, label): | |
| if isinstance(label, str) and label.startswith("scenario:"): | |
| print(label.replace("scenario:", "")) | |
| return | |
| if isinstance(label, str) and label.startswith("done:"): | |
| print(" done in", label.split(":", 1)[1]) | |
| return | |
| if total > 0 and done % max(1, total // 20) == 0: | |
| print(f" {label}: {done}/{total}") | |
| outs = render_all_scenarios(cfg, cli_progress) if cfg.all_scenarios else render_requested(cfg, cli_progress) | |
| print("\nRendered files:") | |
| for p in outs: | |
| print(" -", p) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment