Last active
January 8, 2026 12:07
-
-
Save DamianPala/3f194af9b3608124ebdfb036d3a298c9 to your computer and use it in GitHub Desktop.
Video Converter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # PYTHON_ARGCOMPLETE_OK | |
| import argparse | |
| import json | |
| import math | |
| import os | |
| import pty | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| import time | |
| import urllib.request | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Dict, Iterable, List, Optional, Tuple | |
| try: | |
| import argcomplete | |
| except ImportError: | |
| argcomplete = None | |
# Lower-case file suffixes (leading dot included) that are treated as video inputs.
VIDEO_EXTS = {
    ".mp4",
    ".mkv",
    ".mov",
    ".avi",
    ".m4v",
    ".webm",
    ".mpeg",
    ".mpg",
    ".ts",
}
# Named ffmpeg output-argument lists, keyed by preset name.
# Each value is spliced into the ffmpeg command line after the input file.
PRESETS: Dict[str, List[str]] = {
    "talking-head": [
        "-map", "0:v:0", "-map", "0:a:0?",
        "-metadata:s:v:0", 'handler_name=AV1 video',
        "-metadata:s:a:0", 'handler_name=Opus audio',
        "-c:v", "libsvtav1", "-preset", "6",
        "-crf", "42", "-pix_fmt", "yuv420p", "-b:v", "0",
        "-c:a", "libopus", "-b:a", "48k", "-vbr", "on", "-compression_level", "10",
    ],
    "presentation": [
        "-c:v", "libx264", "-preset", "medium",
        "-crf", "20", "-vf", "scale=-2:1080",
        "-pix_fmt", "yuv420p", "-c:a", "aac",
        "-b:a", "160k",
    ],
    # Earlier "apartment-tour" experiments, kept for reference:
    # "apartment-tour": [
    #     "-map", "0:v:0", "-map", "0:a:0?",
    #     "-metadata:s:v:0", 'handler_name=AV1 video',
    #     "-metadata:s:a:0", 'handler_name=Opus audio',
    #     "-c:v", "libsvtav1", "-preset", "6",
    #     "-crf", "36", "-pix_fmt", "yuv420p", "-b:v", "0",
    #     "-c:a", "libopus", "-b:a", "48k", "-vbr", "on", "-compression_level", "10",
    # ],
    # "apartment-tour": [
    #     "-map_metadata", "-1",
    #     "-c:v", "libsvtav1", "-preset", "6",
    #     "-crf", "32", "-pix_fmt", "yuv420p",
    #     "-c:a", "libopus", "-b:a", "80k",
    # ],
    # "apartment-tour": [
    #     "-map_metadata", "-1",
    #     "-c:v", "libaom-av1", "-cpu-used", "6", "-b:v", "0",
    #     "-crf", "32", "-pix_fmt", "yuv420p",
    #     "-c:a", "libopus", "-b:a", "80k",
    #     # "-row-mt", "1", "-threads", "16", "-tile-columns", "1", "-tile-rows", "1"
    # ],
    # Final preset
    "apartment-tour": [
        "-map", "0:v:0", "-map", "0:a:0?",
        "-metadata:s:v:0", 'handler_name=H.265/HEVC video',
        "-metadata:s:a:0", 'handler_name=Opus audio',
        "-c:v", "libx265", "-preset", "slow",
        "-crf", "28", "-pix_fmt", "yuv420p",
        "-c:a", "libopus", "-b:a", "48k", "-vbr", "on", "-compression_level", "10",
    ],
    # CRF30 gives some small artifacts and ghosting but very small size
    # CRF28 almost no artifacts and ghosting
    "dance-lesson": [
        "-map", "0:v:0", "-map", "0:a:0?",
        "-metadata:s:v:0", 'handler_name=H.265/HEVC video',
        "-metadata:s:a:0", 'handler_name=Opus audio',
        "-c:v", "libx265", "-preset", "slow", "-tag:v", "hvc1",
        "-crf", "28", "-pix_fmt", "yuv420p", "-vf", "scale=-2:720",
        # ':' separates key=value pairs inside -x265-params, so the two deblock
        # offsets are joined with ',' (x265 accepts both forms). With "-1:-1" the
        # second offset was parsed as a separate, invalid key and swallowed any
        # parameter appended after it (e.g. "range=...").
        "-x265-params", "profile=main:level-idc=4.1:aq-mode=3:aq-strength=1.1:deblock=-1,-1",
        "-c:a", "libopus", "-b:a", "64k", "-vbr", "on", "-compression_level", "10",
        "-ac", "2", "-ar", "48000",
    ],
    "social-short": [
        "-c:v", "libx264", "-preset", "fast",
        "-crf", "24", "-vf", "scale=-2:720",
        "-pix_fmt", "yuv420p", "-c:a", "aac",
        "-b:a", "128k",
    ],
}  # fmt: skip
# Output file extension (container) used for each entry of PRESETS.
PRESET_CONTAINERS: Dict[str, str] = {
    "talking-head":   ".mkv",
    "presentation":   ".mkv",
    "apartment-tour": ".mkv",
    "dance-lesson":   ".mp4",
    "social-short":   ".mkv",
}  # fmt: skip
def _build_sample_presets() -> Dict[str, List[str]]:
    """Generate the comparison-encode presets (x265, SVT-AV1 and libaom-AV1 sweeps).

    Every preset maps all input streams, copies audio and requests ``+faststart``;
    only the video-encoder arguments differ between entries.
    """

    def wrap(video_args: List[str]) -> List[str]:
        # Shared prefix/suffix around the encoder-specific arguments.
        return ["-map", "0", *video_args, "-c:a", "copy", "-movflags", "+faststart"]

    presets: Dict[str, List[str]] = {}

    # -------------------- HEVC / x265 --------------------
    for speed, crf_values in (("slow", (24, 26, 28, 30, 32)), ("veryslow", (28,))):
        for crf in crf_values:
            presets[f"hevc_x265_crf{crf}_{speed}"] = wrap([
                "-c:v", "libx265", "-preset", speed, "-crf", str(crf),
                "-pix_fmt", "yuv420p", "-tag:v", "hvc1",
                "-x265-params", "range=full",
            ])  # fmt: skip

    # -------------------- AV1 / SVT-AV1 (p6 and p7) --------------------
    for svt_preset in (6, 7):
        for crf in (28, 30, 32, 34, 36):
            presets[f"av1_svt_p{svt_preset}_crf{crf}"] = wrap([
                "-c:v", "libsvtav1", "-preset", str(svt_preset), "-crf", str(crf),
                "-pix_fmt", "yuv420p",
            ])  # fmt: skip

    # -------------------- AV1 / AOM-AV1 p6 --------------------
    for crf in (28, 30, 32, 34, 36):
        presets[f"av1_aom_p6_crf{crf}"] = wrap([
            "-c:v", "libaom-av1", "-cpu-used", "6", "-b:v", "0", "-crf", str(crf),
            "-pix_fmt", "yuv420p",
        ])  # fmt: skip

    return presets


# Presets used for side-by-side sample encodes, keyed by a descriptive name.
SAMPLE_PRESETS: Dict[str, List[str]] = _build_sample_presets()
@dataclass
class VideoPair:
    """Two video paths that belong together.

    NOTE(review): presumably ``test`` is an encode being evaluated and ``ref`` the
    reference it is compared against; the usage is not visible in this part of the file.
    """

    test: Path
    ref: Path
| def _run(cmd: List[str]) -> subprocess.CompletedProcess: | |
| return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
| def _run_ffmpeg_capture(cmd: List[str], show: bool = False) -> subprocess.CompletedProcess: | |
| print(" ".join(cmd)) | |
| if not show: | |
| return _run(cmd) | |
| try: | |
| master_fd, slave_fd = pty.openpty() | |
| except OSError: | |
| master_fd, slave_fd = None, None | |
| if master_fd is None or slave_fd is None: | |
| proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| stderr_chunks: List[bytes] = [] | |
| if proc.stderr: | |
| while True: | |
| chunk = proc.stderr.read(4096) | |
| if not chunk: | |
| break | |
| sys.stderr.buffer.write(chunk) | |
| sys.stderr.buffer.flush() | |
| stderr_chunks.append(chunk) | |
| stdout = b"" | |
| if proc.stdout: | |
| stdout = proc.stdout.read() | |
| returncode = proc.wait() | |
| return subprocess.CompletedProcess( | |
| cmd, | |
| returncode, | |
| stdout.decode(errors="replace"), | |
| b"".join(stderr_chunks).decode(errors="replace"), | |
| ) | |
| proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=slave_fd) | |
| os.close(slave_fd) | |
| stderr_chunks: List[bytes] = [] | |
| while True: | |
| try: | |
| chunk = os.read(master_fd, 4096) | |
| except OSError: | |
| break | |
| if not chunk: | |
| break | |
| sys.stderr.buffer.write(chunk) | |
| sys.stderr.buffer.flush() | |
| stderr_chunks.append(chunk) | |
| os.close(master_fd) | |
| stdout = b"" | |
| if proc.stdout: | |
| stdout = proc.stdout.read() | |
| returncode = proc.wait() | |
| return subprocess.CompletedProcess( | |
| cmd, | |
| returncode, | |
| stdout.decode(errors="replace"), | |
| b"".join(stderr_chunks).decode(errors="replace"), | |
| ) | |
def _fix_seekability(path: Path, overwrite: bool, show: bool) -> int:
    """Remux *path* in place (stream copy) with seek-friendly muxer options.

    The file is remuxed into a sibling temp file named ``<name>.tmp<ext>`` which
    then replaces *path*. MOV-family containers get ``+faststart``; every other
    extension gets timestamp normalisation and ``-cues_to_front``.

    Args:
        path: File to remux.
        overwrite: Kept for interface compatibility. The temp file is private
            to this function and is always overwritten, so a stale temp file
            left by an interrupted run no longer makes the remux fail.
        show: Forwarded to ``_run_ffmpeg_capture`` (stream ffmpeg's stderr live).

    Returns:
        0 on success, otherwise ffmpeg's non-zero exit code (the temp file is
        removed on a best-effort basis).
    """
    ext = path.suffix.lower()
    tmp = path.with_name(f"{path.name}.tmp{ext}")
    cmd = [
        "ffmpeg", "-hide_banner", "-y",
        "-fflags", "+genpts", "-i", str(path),
        "-map", "0", "-c", "copy",
    ]  # fmt: skip
    if ext in {".mp4", ".m4v", ".mov"}:
        cmd += ["-movflags", "+faststart"]
    else:
        cmd += [
            "-avoid_negative_ts", "make_zero",
            "-max_interleave_delta", "0",
            "-cues_to_front", "1",
        ]  # fmt: skip
    cmd.append(str(tmp))

    result = _run_ffmpeg_capture(cmd, show)
    if result.returncode != 0:
        # In show mode stderr was already streamed live; do not print it twice.
        if result.stderr and not show:
            print(result.stderr, file=sys.stderr)
        try:
            if tmp.exists():
                tmp.unlink()
        except OSError:
            # Best-effort cleanup; the ffmpeg failure is what gets reported.
            pass
        return result.returncode
    tmp.replace(path)
    return 0
| def _run_ffmpeg(cmd: List[str], show: bool) -> Tuple[int, str]: | |
| if show: | |
| return subprocess.run(cmd).returncode, "" | |
| result = _run(cmd) | |
| return result.returncode, result.stderr | |
| def _ffmpeg_base_cmd(show: bool) -> List[str]: | |
| cmd = ["ffmpeg"] | |
| if not show: | |
| cmd.append("-hide_banner") | |
| else: | |
| cmd.append("-stats") | |
| return cmd | |
def _available_encoders() -> set:
    """Return the encoder names parsed from ``ffmpeg -encoders`` (empty set on failure)."""
    listing = _run_ffmpeg_capture(["ffmpeg", "-hide_banner", "-encoders"])
    if listing.returncode != 0:
        return set()
    # A line counts when it starts with a six-character flag column made of
    # upper-case letters and dots; the token after it is taken as the name.
    line_pattern = re.compile(r"^\s*[A-Z\.]{6}\s+(\S+)\s", re.MULTILINE)
    return {found.group(1) for found in line_pattern.finditer(listing.stdout)}
| def _svt_to_aom(args: List[str]) -> List[str]: | |
| cpu_used = "6" | |
| for idx, value in enumerate(args): | |
| if value == "-preset" and idx + 1 < len(args): | |
| cpu_used = args[idx + 1] | |
| break | |
| out: List[str] = [] | |
| idx = 0 | |
| while idx < len(args): | |
| if args[idx] == "-c:v" and idx + 1 < len(args) and args[idx + 1] == "libsvtav1": | |
| out.extend(["-c:v", "libaom-av1", "-cpu-used", cpu_used, "-b:v", "0"]) | |
| idx += 2 | |
| continue | |
| if args[idx] == "-preset" and idx + 1 < len(args): | |
| idx += 2 | |
| continue | |
| out.append(args[idx]) | |
| idx += 1 | |
| return out | |
| def _resolve_preset(preset_name: str, preset_args: List[str]) -> List[str]: | |
| if "libsvtav1" not in preset_args: | |
| return preset_args | |
| encoders = _available_encoders() | |
| if "libsvtav1" in encoders: | |
| return preset_args | |
| if "libaom-av1" in encoders: | |
| print("libsvtav1 not available; falling back to libaom-av1") | |
| return _svt_to_aom(preset_args) | |
| raise SystemExit(f"Preset {preset_name} requires libsvtav1; install a build with libsvtav1 or change preset") | |
def _probe_video_info(path: Path) -> Optional[Tuple[int, int, int]]:
    """Return ``(width, height, rotation)`` of the first video stream via ffprobe.

    ``None`` is returned when ffprobe fails, its output is not valid JSON, no
    video stream is reported, or width/height are missing or zero. The rotation
    comes from the stream's ``rotate`` tag or, failing that, the first non-zero
    ``rotation`` in its side data; it defaults to 0.
    """

    def _as_degrees(raw: object) -> Optional[int]:
        # Accept numbers and numeric strings; anything else means "unknown".
        if isinstance(raw, (int, float)):
            return int(raw)
        if isinstance(raw, str):
            try:
                return int(float(raw))
            except ValueError:
                return None
        return None

    probe = _run([
        "ffprobe", "-hide_banner", "-v", "error",
        "-select_streams", "v:0",
        "-show_streams", "-print_format", "json", str(path),
    ])  # fmt: skip
    if probe.returncode != 0:
        return None
    try:
        payload = json.loads(probe.stdout)
    except json.JSONDecodeError:
        return None

    video_streams = payload.get("streams") or []
    if not video_streams:
        return None
    first = video_streams[0]
    width, height = first.get("width"), first.get("height")
    if not (width and height):
        return None

    rotation = _as_degrees((first.get("tags") or {}).get("rotate"))
    if not rotation:
        for side_data in first.get("side_data_list") or []:
            candidate = _as_degrees(side_data.get("rotation"))
            if candidate:
                rotation = candidate
                break
    return int(width), int(height), rotation or 0
def _probe_duration(path: Path) -> Optional[float]:
    """Return the container duration in seconds as reported by ffprobe, or ``None``.

    ``None`` covers a failing ffprobe, unparsable JSON, a missing duration field
    and a duration that cannot be converted to ``float``.
    """
    probe = _run(
        ["ffprobe", "-hide_banner", "-v", "error", "-show_entries", "format=duration", "-of", "json", str(path)]
    )
    if probe.returncode != 0:
        return None
    try:
        payload = json.loads(probe.stdout)
    except json.JSONDecodeError:
        return None
    raw = (payload.get("format") or {}).get("duration")
    if raw is None:
        return None
    try:
        return float(raw)
    except ValueError:
        return None
def _probe_stream_counts(path: Path) -> Tuple[int, int]:
    """Return ``(video_streams, audio_streams)`` counted from ffprobe output.

    ``(0, 0)`` is returned when ffprobe fails or its output is not valid JSON.
    """
    probe = _run(
        ["ffprobe", "-hide_banner", "-v", "error", "-show_entries", "stream=codec_type", "-of", "json", str(path)]
    )
    if probe.returncode != 0:
        return 0, 0
    try:
        payload = json.loads(probe.stdout)
    except json.JSONDecodeError:
        return 0, 0
    kinds = [stream.get("codec_type") for stream in payload.get("streams") or []]
    return kinds.count("video"), kinds.count("audio")
def _decode_check(path: Path, show: bool) -> bool:
    """Decode *path* to the null muxer and report whether it was error-free.

    The check passes only when ffmpeg exits with 0 and, at ``-v error`` log
    level, produced no stderr output at all.
    """
    decode_cmd = ["ffmpeg", "-hide_banner", "-v", "error", "-i", str(path), "-f", "null", "-"]
    outcome = _run_ffmpeg_capture(decode_cmd, show)
    return outcome.returncode == 0 and outcome.stderr.strip() == ""
def _verify_conversion(src: Path, dst: Path, show: bool, duration_tolerance: float) -> Dict[str, object]:
    """Compare a converted file against its source and return a report dict.

    Checks performed:
      * duration difference within ``duration_tolerance`` seconds (treated as
        passing when either duration is unknown),
      * the output has at least one video stream,
      * the output has audio whenever the source had audio,
      * the output decodes without errors.

    Returns:
        Keys ``output``, ``decode_ok``, ``duration_diff``, ``duration_ok``,
        ``audio_ok``, ``video_ok`` and the combined ``ok``.
    """
    src_duration = _probe_duration(src)
    dst_duration = _probe_duration(dst)
    if src_duration is None or dst_duration is None:
        duration_diff, duration_ok = None, True
    else:
        duration_diff = abs(dst_duration - src_duration)
        duration_ok = duration_diff <= duration_tolerance

    _, src_audio = _probe_stream_counts(src)
    dst_video, dst_audio = _probe_stream_counts(dst)
    video_ok = dst_video > 0
    audio_ok = src_audio == 0 or dst_audio > 0

    decode_ok = _decode_check(dst, show)

    report: Dict[str, object] = {
        "output": dst.name,
        "decode_ok": decode_ok,
        "duration_diff": duration_diff,
        "duration_ok": duration_ok,
        "audio_ok": audio_ok,
        "video_ok": video_ok,
    }
    report["ok"] = all((decode_ok, duration_ok, audio_ok, video_ok))
    return report
def _probe_frame_rate(path: Path) -> Optional[str]:
    """Return the first video stream's frame rate string, or ``None``.

    ``r_frame_rate`` is tried first, then ``avg_frame_rate``; empty values and
    ``"0/0"`` are skipped, as are ffprobe invocations that fail.
    """
    head = ["ffprobe", "-hide_banner", "-v", "error", "-select_streams", "v:0", "-show_entries"]
    tail = ["-of", "default=nk=1:nw=1", str(path)]
    for field in ("r_frame_rate", "avg_frame_rate"):
        probe = _run([*head, f"stream={field}", *tail])
        if probe.returncode == 0:
            rate = (probe.stdout or "").strip()
            if rate and rate != "0/0":
                return rate
    return None
def _probe_color_range(path: Path) -> str:
    """Return the first video stream's colour range: ``"pc"`` or ``"tv"``.

    ``"tv"`` is the fallback when ffprobe fails or reports anything else.
    """
    probe = _run([
        "ffprobe", "-hide_banner", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=color_range",
        "-of", "default=nk=1:nw=1",
        str(path),
    ])  # fmt: skip
    if probe.returncode == 0:
        reported = (probe.stdout or "").strip()
        if reported in {"pc", "tv"}:
            return reported
    return "tv"
def _probe_pixel_format(path: Path) -> Optional[str]:
    """Return the first video stream's ``pix_fmt`` from ffprobe, or ``None`` if unavailable."""
    probe = _run([
        "ffprobe", "-hide_banner", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=pix_fmt",
        "-of", "default=nk=1:nw=1",
        str(path),
    ])  # fmt: skip
    if probe.returncode != 0:
        return None
    reported = (probe.stdout or "").strip()
    return reported if reported else None
| def _select_pixel_format(pix_fmt_in: Optional[str]) -> str: | |
| if pix_fmt_in and "420p10le" in pix_fmt_in: | |
| return "yuv420p10le" | |
| return "yuv420p" | |
def _talking_head_vf(src: Path) -> str:
    """Build the filter chain for the talking-head preset.

    The scale targets 360 on the height for landscape sources and on the width
    for portrait or square ones (after accounting for 90/270 degree rotation);
    landscape is assumed when the source cannot be probed. ``setsar=1`` and
    ``fps=25`` are always appended.
    """
    info = _probe_video_info(src)
    portrait = False
    if info:
        width, height, rotation = info
        if abs(rotation) % 180 == 90:
            # Displayed dimensions are swapped for quarter-turn rotations.
            width, height = height, width
        portrait = height >= width
    scale = "scale=360:-2" if portrait else "scale=-2:360"
    return f"{scale},setsar=1,fps=25"
def _rotation_aware_scale_720(src: Path, vf: str) -> str:
    """Rewrite simple ``scale=-2:N`` / ``scale=N:-2`` filters in *vf* for *src*.

    ``N`` is applied to the shorter displayed side (rotation-aware). A scale
    whose target is not smaller than the source's shorter side is removed, so
    the video is never upscaled. *vf* is returned unchanged when the source
    cannot be probed or no such scale filter is present.
    """
    info = _probe_video_info(src)
    if not info:
        return vf
    width, height, rotation = info
    if abs(rotation) % 180 == 90:
        width, height = height, width
    short_side = min(width, height)
    landscape = width >= height

    scale_pattern = re.compile(r"scale=(-?\d+):(-?\d+)$")
    rewritten: List[str] = []
    touched = False
    for part in (piece.strip() for piece in vf.split(",")):
        if not part:
            continue
        target = None
        found = scale_pattern.match(part)
        if found:
            width_arg, height_arg = int(found.group(1)), int(found.group(2))
            if width_arg == -2 and height_arg > 0:
                target = height_arg
            elif height_arg == -2 and width_arg > 0:
                target = width_arg
        if target is None:
            # Not a single-dimension scale: keep the filter as written.
            rewritten.append(part)
            continue
        touched = True
        if short_side > target:
            rewritten.append(f"scale=-2:{target}" if landscape else f"scale={target}:-2")
        # else: source is already small enough, so the scale filter is dropped.

    if not touched:
        return vf
    return ",".join(rewritten)
| def _transpose_filter(rotation: int) -> Optional[str]: | |
| rot = rotation % 360 | |
| if rot == 90: | |
| return "transpose=1" | |
| if rot == 270: | |
| return "transpose=2" | |
| if rot == 180: | |
| return "transpose=1,transpose=1" | |
| return None | |
| def _apply_vf(args: List[str], vf: str) -> List[str]: | |
| if "-vf" in args: | |
| idx = args.index("-vf") | |
| if idx + 1 < len(args): | |
| updated = list(args) | |
| updated[idx + 1] = vf | |
| return updated | |
| return [*args, "-vf", vf] | |
| def _append_vf(args: List[str], vf: str) -> List[str]: | |
| if "-vf" in args: | |
| idx = args.index("-vf") | |
| if idx + 1 < len(args): | |
| updated = list(args) | |
| updated[idx + 1] = f"{updated[idx + 1]},{vf}" | |
| return updated | |
| return [*args, "-vf", vf] | |
| def _apply_rotation_aware_scale_720(args: List[str], src: Path) -> List[str]: | |
| if "-vf" not in args: | |
| return args | |
| idx = args.index("-vf") | |
| if idx + 1 >= len(args): | |
| return args | |
| vf = args[idx + 1] | |
| updated_vf = _rotation_aware_scale_720(src, vf) | |
| if updated_vf == vf: | |
| return args | |
| updated = list(args) | |
| if updated_vf: | |
| updated[idx + 1] = updated_vf | |
| return updated | |
| return updated[:idx] + updated[idx + 2 :] | |
| def _apply_arg(args: List[str], key: str, value: str) -> List[str]: | |
| if key in args: | |
| idx = args.index(key) | |
| if idx + 1 < len(args): | |
| updated = list(args) | |
| updated[idx + 1] = value | |
| return updated | |
| return [*args, key, value] | |
| def _set_x265_range(args: List[str], in_range: str) -> List[str]: | |
| if "libx265" not in args: | |
| return args | |
| range_value = "full" if in_range == "pc" else "limited" | |
| if "-x265-params" in args: | |
| idx = args.index("-x265-params") | |
| if idx + 1 < len(args): | |
| params = args[idx + 1].split(":") if args[idx + 1] else [] | |
| updated_params: List[str] = [] | |
| found = False | |
| for param in params: | |
| if param.startswith("range="): | |
| updated_params.append(f"range={range_value}") | |
| found = True | |
| else: | |
| updated_params.append(param) | |
| if not found: | |
| updated_params.append(f"range={range_value}") | |
| updated = list(args) | |
| updated[idx + 1] = ":".join(p for p in updated_params if p) | |
| return updated | |
| return [*args, "-x265-params", f"range={range_value}"] | |
| def _ensure_ffmpeg() -> None: | |
| for tool in ("ffmpeg", "ffprobe"): | |
| if shutil.which(tool) is None: | |
| raise SystemExit(f"Missing required tool: {tool}") | |
def _ensure_libvmaf() -> None:
    """Exit the program unless ``ffmpeg -filters`` succeeds and lists ``libvmaf``."""
    listing = _run_ffmpeg_capture(["ffmpeg", "-hide_banner", "-filters"])
    has_filter = listing.returncode == 0 and "libvmaf" in listing.stdout
    if not has_filter:
        raise SystemExit("Missing libvmaf filter in ffmpeg; install/build ffmpeg with --enable-libvmaf")
| def _ensure_vmaf_model(path: str, workdir: Path) -> str: | |
| model_path = Path(path) | |
| if model_path.exists(): | |
| return str(model_path) | |
| if not model_path.is_absolute(): | |
| candidate = workdir / model_path | |
| if candidate.exists(): | |
| return str(candidate) | |
| cache_dir = Path.home() / ".cache" / "movieconvert" | |
| cache_dir.mkdir(parents=True, exist_ok=True) | |
| cache_path = cache_dir / "vmaf_v0.6.1.json" | |
| if cache_path.exists(): | |
| return str(cache_path) | |
| url = "https://github.com/Netflix/vmaf/raw/master/model/vmaf_v0.6.1.json" | |
| print(f"Downloading VMAF model: {url}") | |
| try: | |
| urllib.request.urlretrieve(url, cache_path) | |
| except Exception as exc: | |
| raise SystemExit(f"Failed to download VMAF model: {exc}") | |
| return str(cache_path) | |
| def _vmaf_model_arg(model_path: Optional[str]) -> str: | |
| if not model_path: | |
| return "" | |
| result = _run_ffmpeg_capture(["ffmpeg", "-hide_banner", "-h", "filter=libvmaf"]) | |
| output = (result.stdout or "") + (result.stderr or "") | |
| if result.returncode != 0: | |
| print("Unable to inspect libvmaf options; using default model", file=sys.stderr) | |
| return "" | |
| if "model_path" in output: | |
| return f":model_path={model_path}" | |
| if re.search(r"\bmodel\b", output): | |
| return f":model=path={model_path}" | |
| print("libvmaf has no model option; using default model", file=sys.stderr) | |
| return "" | |
def _is_video(path: Path) -> bool:
    """Tell whether *path* has one of the known video extensions (case-insensitive)."""
    suffix = path.suffix.lower()
    return suffix in VIDEO_EXTS
def _iter_files(paths: List[Path], recursive: bool) -> Iterable[Path]:
    """Yield the video files named by *paths*.

    A file is yielded if it is a video. A directory contributes its video files:
    direct children only, or the whole tree when *recursive* is true. Paths that
    are neither are ignored.
    """
    for root in paths:
        if root.is_dir():
            entries = root.rglob("*") if recursive else root.iterdir()
            for entry in entries:
                if entry.is_file() and _is_video(entry):
                    yield entry
        elif root.is_file() and _is_video(root):
            yield root
def _output_path(
    input_root: Optional[Path],
    src: Path,
    suffix: Optional[str],
    out_dir: Optional[Path],
    ext: str = ".mkv",
) -> Path:
    """Return the destination path for *src*, creating its directory.

    The file name is the source stem, optionally followed by ``_<suffix>``, plus
    *ext*; the directory is chosen by ``_resolve_output_dir``.
    """
    target_dir = _resolve_output_dir(input_root, src, out_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    stem = f"{src.stem}_{suffix}" if suffix else src.stem
    return target_dir / f"{stem}{ext}"
def _output_path_same_ext(input_root: Optional[Path], src: Path, suffix: Optional[str]) -> Path:
    """Return a destination path that keeps the source's extension, creating its directory.

    The directory is the default one from ``_resolve_output_dir`` (no explicit
    output directory); the name is the source stem plus an optional ``_<suffix>``.
    """
    target_dir = _resolve_output_dir(input_root, src, None)
    target_dir.mkdir(parents=True, exist_ok=True)
    stem = f"{src.stem}_{suffix}" if suffix else src.stem
    return target_dir / f"{stem}{src.suffix}"
| def _resolve_output_dir(input_root: Optional[Path], src: Path, out_dir: Optional[Path]) -> Path: | |
| if out_dir: | |
| if input_root and input_root.is_dir(): | |
| try: | |
| rel = src.relative_to(input_root) | |
| except ValueError: | |
| return out_dir | |
| return out_dir / rel.parent | |
| return out_dir | |
| if "converted" in src.parts: | |
| return src.parent | |
| if input_root and input_root.is_dir(): | |
| rel = src.relative_to(input_root) | |
| return input_root / "converted" / rel.parent | |
| return src.parent / "converted" | |
| def _unique_path(path: Path, overwrite: bool) -> Path: | |
| if overwrite or not path.exists(): | |
| return path | |
| base = path.stem | |
| ext = path.suffix | |
| counter = 1 | |
| while True: | |
| candidate = path.with_name(f"{base}_{counter}{ext}") | |
| if not candidate.exists(): | |
| return candidate | |
| counter += 1 | |
| def _resolve_samples_dir(input_root: Optional[Path], src: Path, out_dir: Optional[Path]) -> Path: | |
| if out_dir: | |
| return out_dir | |
| if "samples" in src.parts: | |
| return src.parent | |
| if input_root and input_root.is_dir(): | |
| rel = src.relative_to(input_root) | |
| return input_root / "samples" / rel.parent | |
| return src.parent / "samples" | |
def _sample_output_path(
    input_root: Optional[Path],
    src: Path,
    out_dir: Optional[Path],
    preset_name: str,
    ext: str,
    suffix: Optional[str],
) -> Path:
    """Return the path of a sample encode of *src*, creating its directory.

    The name is ``<stem>[_<suffix>]_<preset_name>.<ext>``; leading dots in *ext*
    are ignored. The directory is chosen by ``_resolve_samples_dir``.
    """
    bare_ext = ext.lstrip(".")
    target_dir = _resolve_samples_dir(input_root, src, out_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    stem = f"{src.stem}_{suffix}" if suffix else src.stem
    return target_dir / f"{stem}_{preset_name}.{bare_ext}"
def list_presets(_: argparse.Namespace) -> int:
    """Print every entry of ``PRESETS`` as ``name: arguments`` and return 0."""
    for preset_name in PRESETS:
        print(f"{preset_name}: {' '.join(PRESETS[preset_name])}")
    return 0
def list_sample_presets(_: argparse.Namespace) -> int:
    """Print every entry of ``SAMPLE_PRESETS`` as ``name: arguments`` and return 0."""
    for preset_name in SAMPLE_PRESETS:
        print(f"{preset_name}: {' '.join(SAMPLE_PRESETS[preset_name])}")
    return 0
def convert(args: argparse.Namespace) -> int:
    """Convert every input video with the selected preset, then verify the outputs.

    Reads from *args*: ``preset``, ``inputs``, ``recursive``, ``out_dir``,
    ``suffix``, ``overwrite`` and ``show_ffmpeg``.

    Returns:
        0 when all files were converted and verified; ffmpeg's exit code as soon
        as an encode or the follow-up remux fails (remaining files are not
        processed); 1 when any verification check fails.

    Raises:
        SystemExit: For a missing ffmpeg/ffprobe, an unknown preset, or when no
            input video files are found.
    """
    _ensure_ffmpeg()
    # Look the name up in the regular presets first, then in the sample presets.
    preset_args = PRESETS.get(args.preset)
    is_sample_preset = False
    if not preset_args:
        preset_args = SAMPLE_PRESETS.get(args.preset)
        if preset_args:
            print(f"Using sample preset: {args.preset}")
            is_sample_preset = True
    if not preset_args:
        raise SystemExit(f"Unknown preset: {args.preset} (try list-presets or list-sample-presets)")
    preset_args = _resolve_preset(args.preset, preset_args)
    files = list(_iter_files([Path(p) for p in args.inputs], args.recursive))
    if not files:
        raise SystemExit("No input video files found")
    # A single directory input acts as the root whose layout is mirrored in the output.
    input_root = None
    if len(args.inputs) == 1:
        root = Path(args.inputs[0])
        if root.is_dir():
            input_root = root
    out_dir = Path(args.out_dir) if args.out_dir else None
    converted: List[Tuple[Path, Path]] = []
    for src in files:
        current_args = preset_args
        if args.preset == "talking-head":
            # This preset carries no -vf of its own; the chain depends on the source.
            current_args = _apply_vf(preset_args, _talking_head_vf(src))
        current_args = _apply_rotation_aware_scale_720(current_args, src)
        apply_color = is_sample_preset or args.preset in {
            "presentation",
            "apartment-tour",
            "social-short",
            "talking-head",
            "dance-lesson",
        }
        if apply_color:
            # Keep the source's colour range end to end and force 8-bit 4:2:0 output.
            in_range = _probe_color_range(src)
            vf = f"scale=iw:ih:in_range={in_range}:out_range={in_range},format=yuv420p"
            current_args = _append_vf(current_args, vf)
            current_args = _apply_arg(current_args, "-pix_fmt", "yuv420p")
            current_args = _apply_arg(current_args, "-color_range", in_range)
            current_args = _set_x265_range(current_args, in_range)
        # Presets without a container entry (the sample presets) default to .mkv.
        ext = PRESET_CONTAINERS.get(args.preset, ".mkv")
        dst = _output_path(input_root, src, args.suffix, out_dir, ext=ext)
        dst = _unique_path(dst, args.overwrite)
        cmd = [
            *_ffmpeg_base_cmd(args.show_ffmpeg),
            "-y" if args.overwrite else "-n",
        ]
        cmd += [
            "-i",
            str(src),
            *current_args,
        ]
        if ext == ".mp4":
            cmd += ["-f", "mp4"]
        cmd += ["-movflags", "+faststart", str(dst)]
        print(" ".join(cmd))
        started = time.perf_counter()
        returncode, stderr = _run_ffmpeg(cmd, args.show_ffmpeg)
        if returncode != 0:
            # Abort the whole run on the first failed encode.
            if stderr:
                print(stderr, file=sys.stderr)
            return returncode
        elapsed = time.perf_counter() - started
        print(f"Completed: {src.name} in {elapsed:.2f}s")
        # Remux the fresh output in place with seek-friendly muxer options.
        fix_rc = _fix_seekability(dst, args.overwrite, args.show_ffmpeg)
        if fix_rc != 0:
            return fix_rc
        converted.append((src, dst))
    if converted:
        # if sys.stdin.isatty():
        #     input("Press Enter to run verification...")
        # Each output is checked against its source with a 0.5 s duration tolerance.
        results = [_verify_conversion(src, dst, args.show_ffmpeg, 0.5) for src, dst in converted]
        if len(results) > 1:
            # Several outputs: print one table row per file.
            rows = [["Output", "Decode", "Dur diff [s]", "Audio", "Video"]]
            for res in results:
                dur_diff = res["duration_diff"]
                rows.append(
                    [
                        str(res["output"]),
                        "ok" if res["decode_ok"] else "fail",
                        f"{dur_diff:.2f}" if isinstance(dur_diff, float) else "n/a",
                        "ok" if res["audio_ok"] else "missing",
                        "ok" if res["video_ok"] else "missing",
                    ]
                )
            print("Verification summary:")
            print(_format_table(rows))
        else:
            # Single output: print a one-line summary instead of a table.
            res = results[0]
            dur_diff = res["duration_diff"]
            print(
                "Verification:",
                f"output={res['output']}",
                f"decode={'ok' if res['decode_ok'] else 'fail'}",
                f"dur_diff_s={dur_diff:.2f}" if isinstance(dur_diff, float) else "dur_diff_s=n/a",
                f"audio={'ok' if res['audio_ok'] else 'missing'}",
                f"video={'ok' if res['video_ok'] else 'missing'}",
            )
        if any(not res["ok"] for res in results):
            return 1
    return 0
def samples(args: argparse.Namespace) -> int:
    """Encode every input with each selected sample preset and print a summary table.

    Reads ``inputs``, ``recursive``, ``presets``, ``start``, ``duration``,
    ``suffix``, ``ext``, ``out_dir``, ``overwrite`` and ``show_ffmpeg`` from
    *args*.

    An existing output is skipped when its probed duration is within 0.5 s of
    the expected clip length; any other existing output is re-encoded. The
    expected length accounts for ``--start`` and for the end of the source, so
    a complete sample taken from an offset is recognised as complete.

    Returns:
        0 on success, or the exit code of the first failing ffmpeg run.

    Raises:
        SystemExit: if no input files are found, a preset name is unknown, or
            a time value is malformed.
    """
    _ensure_ffmpeg()
    files = list(_iter_files([Path(p) for p in args.inputs], args.recursive))
    if not files:
        raise SystemExit("No input video files found")

    names: List[str]
    if args.presets:
        names = [name.strip() for name in args.presets.split(",") if name.strip()]
        missing = [name for name in names if name not in SAMPLE_PRESETS]
        if missing:
            raise SystemExit(f"Unknown sample preset(s): {', '.join(missing)}")
    else:
        names = list(SAMPLE_PRESETS.keys())
    resolved_presets = {name: _resolve_preset(name, SAMPLE_PRESETS[name]) for name in names}

    # Only a single directory argument is used as the root for output paths.
    input_root = None
    if len(args.inputs) == 1:
        root = Path(args.inputs[0])
        if root.is_dir():
            input_root = root
    out_dir = Path(args.out_dir) if args.out_dir else None

    start = _parse_time(args.start) if args.start else None
    duration = _parse_time(args.duration) if args.duration else None
    requested_duration = _time_to_seconds(args.duration) if args.duration else None
    start_offset = (_time_to_seconds(args.start) if args.start else None) or 0.0

    rows = [["output", "preset", "size_mb", "time_s"]]
    for src in files:
        in_range = _probe_color_range(src)
        vf = f"scale=iw:ih:in_range={in_range}:out_range={in_range},format=yuv420p"
        src_duration = _probe_duration(src)

        # Length the finished sample should have: what is left of the source
        # after seeking, capped by the requested duration when one was given.
        remaining = max(src_duration - start_offset, 0.0) if src_duration is not None else None
        if requested_duration is None:
            expected = remaining
        elif remaining is None:
            expected = requested_duration
        else:
            expected = min(requested_duration, remaining)

        for name in names:
            preset_args = resolved_presets[name]
            preset_args = _apply_vf(preset_args, vf)
            preset_args = _apply_arg(preset_args, "-pix_fmt", "yuv420p")
            preset_args = _apply_arg(preset_args, "-color_range", in_range)
            preset_args = _set_x265_range(preset_args, in_range)
            dst = _sample_output_path(input_root, src, out_dir, name, args.ext, args.suffix)

            file_overwrite = args.overwrite
            if dst.exists() and not args.overwrite:
                dst_duration = _probe_duration(dst)
                if expected is not None and dst_duration is not None:
                    if abs(expected - dst_duration) <= 0.5:
                        print(f"Skipping existing sample: {dst.name}")
                        continue
                # Existing file is incomplete or could not be verified: redo it.
                print(f"Overwriting existing sample: {dst.name}")
                file_overwrite = True

            cmd = [*_ffmpeg_base_cmd(args.show_ffmpeg), "-y" if file_overwrite else "-n"]
            if start:
                cmd += ["-ss", start]
            cmd += ["-noautorotate", "-i", str(src)]
            if duration:
                cmd += ["-t", duration]
            cmd += [*preset_args, str(dst)]
            print(" ".join(cmd))

            started = time.perf_counter()
            returncode, stderr = _run_ffmpeg(cmd, args.show_ffmpeg)
            if returncode != 0:
                if stderr:
                    print(stderr, file=sys.stderr)
                return returncode
            elapsed = time.perf_counter() - started
            print(f"Sampled: {dst.name} in {elapsed:.2f}s")
            size_mb = os.path.getsize(dst) / (1024 * 1024)
            rows.append([dst.name, name, f"{size_mb:.2f}", f"{elapsed:.2f}"])

    if len(rows) > 1:
        print("Summary:")
        print(_format_table(rows))
    return 0
def prepare_ref(args: argparse.Namespace) -> int:
    """Encode each input to a constant-frame-rate, full-range libx265 reference file.

    Reads ``inputs``, ``recursive``, ``start``, ``duration``, ``suffix``,
    ``ext``, ``out_dir``, ``preset``, ``crf``, ``fps``, ``timescale``,
    ``lossless``, ``overwrite`` and ``show_ffmpeg`` from *args*. The frame rate
    falls back to "30000/1001" when neither ``--fps`` nor the probe yields one,
    and the timescale falls back to 90000 (with a warning) when it cannot be
    derived from the frame rate.

    Returns:
        0 on success, or the exit code of the first failing ffmpeg run.

    Raises:
        SystemExit: if no input files are found or a time value is malformed.
    """
    _ensure_ffmpeg()
    sources = list(_iter_files([Path(p) for p in args.inputs], args.recursive))
    if not sources:
        raise SystemExit("No input video files found")

    seek_to = _parse_time(args.start) if args.start else None
    clip_len = _parse_time(args.duration) if args.duration else None

    for src in sources:
        fps = args.fps or _probe_frame_rate(src) or "30000/1001"
        in_range = _probe_color_range(src)
        pix_fmt_in = _probe_pixel_format(src)
        pix_fmt_out = _select_pixel_format(pix_fmt_in)

        target_dir = Path(args.out_dir) if args.out_dir else src.parent
        target_dir.mkdir(parents=True, exist_ok=True)
        dst = _unique_path(
            target_dir / f"{src.stem}_{args.suffix}.{args.ext.lstrip('.')}",
            args.overwrite,
        )

        vf = f"fps={fps},scale=iw:ih:in_range={in_range}:out_range=pc,format={pix_fmt_out}"
        if args.lossless:
            mode_desc = "lossless"
            encoder_opts = ["-x265-params", "lossless=1:range=full"]
        else:
            mode_desc = f"crf={args.crf}"
            encoder_opts = ["-crf", str(args.crf), "-x265-params", "range=full"]

        cmd = [
            *_ffmpeg_base_cmd(args.show_ffmpeg),
            "-y" if args.overwrite else "-n",
            "-noautorotate",
        ]
        if seek_to:
            cmd.extend(["-ss", seek_to])
        cmd.extend(["-i", str(src)])
        if clip_len:
            cmd.extend(["-t", clip_len])

        # Explicit --timescale wins; otherwise derive it from the frame rate.
        timescale = args.timescale
        if timescale is None:
            timescale = _timescale_for_fps(fps)
        if timescale is None:
            timescale = 90000
            print("Warning: falling back to timescale 90000", file=sys.stderr)

        cmd.extend(["-map", "0", "-vf", vf])
        cmd.extend(["-c:v", "libx265", "-preset", args.preset, *encoder_opts])
        cmd.extend(["-pix_fmt", pix_fmt_out, "-tag:v", "hvc1", "-color_range", "pc"])
        cmd.extend(["-fps_mode", "cfr", "-video_track_timescale", str(timescale)])
        cmd.extend(["-c:a", "copy", "-movflags", "+faststart", str(dst)])
        print(" ".join(cmd))

        returncode, stderr = _run_ffmpeg(cmd, args.show_ffmpeg)
        if returncode != 0:
            if stderr:
                print(stderr, file=sys.stderr)
            return returncode

        print(f"OK: {dst}")
        print(f"CFR: {fps}")
        print(f"Range: {in_range} -> pc")
        print(f"PixFmt: {pix_fmt_in or 'unknown'} -> {pix_fmt_out}")
        print(f"Mode: {mode_desc} preset={args.preset}")
        print(f"Timescale: {timescale}")
    return 0
| def _find_reference(src: Path, ref_root: Optional[Path]) -> Optional[Path]: | |
| if ref_root is None: | |
| if src.parent.name == "converted": | |
| candidate = src.parent.parent / src.name | |
| if candidate.exists(): | |
| return candidate | |
| return None | |
| if ref_root.is_file(): | |
| return ref_root | |
| rel = src.name | |
| candidate = ref_root / rel | |
| if candidate.exists(): | |
| return candidate | |
| return None | |
| def _parse_metric(pattern: str, text: str) -> Optional[float]: | |
| match = re.search(pattern, text) | |
| if not match: | |
| return None | |
| value = match.group(1) | |
| if value.lower() == "inf": | |
| return float("inf") | |
| try: | |
| return float(value) | |
| except ValueError: | |
| return None | |
def _metric_vmaf(test: Path, ref: Path, workdir: Path, model_arg: str, show: bool) -> Optional[float]:
    """Run ffmpeg's libvmaf filter on *test* against *ref* and return the pooled mean.

    Both inputs are converted to yuv420p and the distorted stream is scaled to
    the reference with ``scale2ref``. libvmaf writes its JSON log to
    ``workdir / "vmaf.json"``; *model_arg* is appended verbatim to the libvmaf
    option string.

    Returns None (after reporting on stderr) when ffmpeg exits non-zero or the
    JSON log cannot be read or parsed. Also None when the log has no
    ``pooled_metrics.vmaf.mean`` entry.
    """
    json_log = workdir / "vmaf.json"
    graph = (
        "[0:v]format=yuv420p[dist];"
        "[1:v]format=yuv420p[ref];"
        "[dist][ref]scale2ref[dist2][ref2];"
        f"[dist2][ref2]libvmaf=shortest=1:log_fmt=json:log_path={json_log}{model_arg}"
    )

    cmd = ["ffmpeg", "-hide_banner"]
    for clip in (test, ref):
        cmd += ["-noautorotate", "-i", str(clip)]
    cmd += ["-lavfi", graph, "-f", "null", "-"]

    outcome = _run_ffmpeg_capture(cmd, show)
    if outcome.returncode != 0:
        print(outcome.stderr, file=sys.stderr)
        return None

    try:
        report = json.loads(json_log.read_text())
    except (OSError, json.JSONDecodeError):
        print("Failed to read VMAF JSON output", file=sys.stderr)
        return None

    pooled = report.get("pooled_metrics", {})
    return pooled.get("vmaf", {}).get("mean")
def _metric_ssim(test: Path, ref: Path, show: bool) -> Optional[float]:
    """Run ffmpeg's ssim filter on *test* against *ref* and return the overall score.

    Both inputs are converted to yuv420p and the distorted stream is scaled to
    the reference with ``scale2ref``. The score is taken from the ``All:``
    field of ffmpeg's stderr output.

    Returns None when ffmpeg exits non-zero (its stderr is forwarded to stderr
    so the failure is diagnosable, as ``_metric_vmaf`` does) or when no score
    can be found in the output.
    """
    vf = (
        "[0:v]format=yuv420p[dist];"
        "[1:v]format=yuv420p[ref];"
        "[dist][ref]scale2ref[dist2][ref2];"
        "[dist2][ref2]ssim=shortest=1"
    )
    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-noautorotate",
        "-i",
        str(test),
        "-noautorotate",
        "-i",
        str(ref),
        "-lavfi",
        vf,
        "-f",
        "null",
        "-",
    ]
    result = _run_ffmpeg_capture(cmd, show)
    if result.returncode != 0:
        # Surface the reason instead of silently reporting "n/a".
        print(result.stderr, file=sys.stderr)
        return None
    return _parse_metric(r"All:([0-9.]+)", result.stderr)
def _metric_psnr(test: Path, ref: Path, show: bool) -> Optional[float]:
    """Run ffmpeg's psnr filter on *test* against *ref* and return the average PSNR.

    Both inputs are converted to yuv420p and the distorted stream is scaled to
    the reference with ``scale2ref``. Several spellings of the average field
    are tried against ffmpeg's stderr output; the value may be infinity.

    Returns None when ffmpeg exits non-zero (its stderr is forwarded to stderr
    so the failure is diagnosable, as ``_metric_vmaf`` does) or when none of
    the patterns match.
    """
    vf = (
        "[0:v]format=yuv420p[dist];"
        "[1:v]format=yuv420p[ref];"
        "[dist][ref]scale2ref[dist2][ref2];"
        "[dist2][ref2]psnr=shortest=1"
    )
    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-noautorotate",
        "-i",
        str(test),
        "-noautorotate",
        "-i",
        str(ref),
        "-lavfi",
        vf,
        "-f",
        "null",
        "-",
    ]
    result = _run_ffmpeg_capture(cmd, show)
    if result.returncode != 0:
        # Surface the reason instead of silently reporting "n/a".
        print(result.stderr, file=sys.stderr)
        return None
    for pattern in (
        r"(?i)average:([0-9.]+|inf)",
        r"(?i)psnr_avg:?\s*([0-9.]+|inf)",
        r"(?i)avg:([0-9.]+|inf)",
    ):
        value = _parse_metric(pattern, result.stderr)
        if value is not None:
            return value
    return None
| def _format_table(rows: List[List[str]]) -> str: | |
| if not rows: | |
| return "" | |
| widths = [max(len(row[i]) for row in rows) for i in range(len(rows[0]))] | |
| lines = [] | |
| for idx, row in enumerate(rows): | |
| line = " ".join(row[i].ljust(widths[i]) for i in range(len(row))) | |
| lines.append(line) | |
| if idx == 0: | |
| lines.append(" ".join("-" * widths[i] for i in range(len(row)))) | |
| return "\n".join(lines) | |
def quality(args: argparse.Namespace) -> int:
    """Compute VMAF, SSIM and PSNR for each input against its reference and print a table.

    Reads ``inputs``, ``recursive``, ``ref``, ``workdir``, ``model`` and
    ``show_ffmpeg`` from *args*. Inputs without a reference are reported on
    stderr and left out of the table. After each file its row is printed under
    the column header; at the end all rows are printed sorted by VMAF,
    best first, with files that have no VMAF score at the bottom.

    Returns:
        Always 0.

    Raises:
        SystemExit: if no input files are found.
    """
    _ensure_ffmpeg()
    _ensure_libvmaf()
    files = list(_iter_files([Path(p) for p in args.inputs], args.recursive))
    if not files:
        raise SystemExit("No input video files found")

    ref_root = Path(args.ref) if args.ref else None
    header = ["Filename", "Size [MB]", "VMAF", "SSIM", "PSNR"]
    results: List[List[str]] = []
    workdir = Path(args.workdir) if args.workdir else Path.cwd()
    model_path = _ensure_vmaf_model(args.model, workdir)
    model_arg = _vmaf_model_arg(model_path)

    print(f"Quality: {len(files)} file(s) to process")
    if model_path:
        print(f"VMAF model: {model_path}")
    if args.ref:
        print(f"Reference: {args.ref}")
    print("Metric guide: higher VMAF/SSIM/PSNR means more similar (PSNR may be inf)")

    for src in files:
        ref = _find_reference(src, ref_root)
        if not ref or not ref.exists():
            print(f"No reference found for {src}", file=sys.stderr)
            continue
        print(f"Processing: {src.name}")
        size_mb = os.path.getsize(src) / (1024 * 1024)
        print("Calculating VMAF...")
        vmaf = _metric_vmaf(src, ref, workdir, model_arg, args.show_ffmpeg)
        print("Calculating SSIM...")
        ssim = _metric_ssim(src, ref, args.show_ffmpeg)
        print("Calculating PSNR...")
        psnr = _metric_psnr(src, ref, args.show_ffmpeg)
        row = [
            src.name,
            f"{size_mb:.2f}",
            f"{vmaf:.2f}" if vmaf is not None else "n/a",
            f"{ssim:.4f}" if ssim is not None else "n/a",
            f"{psnr:.2f}" if psnr is not None else "n/a",
        ]
        results.append(row)
        # Always pair the new row with the real header; slicing the last two
        # rows would render the previous result as the header.
        print(_format_table([header, row]))

    # Descending by VMAF; "n/a" maps to -inf so unscored files sort last.
    results.sort(
        key=lambda r: float("-inf") if r[2] == "n/a" else float(r[2]),
        reverse=True,
    )
    print(_format_table([header, *results]))
    return 0
def probe(args: argparse.Namespace) -> int:
    """Print ffprobe's JSON format and stream description of ``args.input``.

    Returns 0 on success; otherwise prints ffprobe's stderr and returns its
    exit code.
    """
    _ensure_ffmpeg()
    target = Path(args.input)
    ffprobe_cmd = ["ffprobe", "-hide_banner", "-v", "error"]
    ffprobe_cmd += ["-print_format", "json", "-show_format", "-show_streams"]
    ffprobe_cmd.append(str(target))

    outcome = _run(ffprobe_cmd)
    if outcome.returncode == 0:
        print(outcome.stdout)
        return 0
    print(outcome.stderr, file=sys.stderr)
    return outcome.returncode
| def _parse_time(value: str) -> str: | |
| if re.match(r"^\d+(\.\d+)?$", value): | |
| return value | |
| if re.match(r"^\d{1,2}:\d{2}(:\d{2})?$", value): | |
| return value | |
| raise SystemExit(f"Invalid time value: {value}") | |
| def _time_to_seconds(value: str) -> Optional[float]: | |
| if ":" not in value: | |
| try: | |
| return float(value) | |
| except ValueError: | |
| return None | |
| parts = value.split(":") | |
| if len(parts) == 2: | |
| hours = 0 | |
| minutes, seconds = parts | |
| elif len(parts) == 3: | |
| hours, minutes, seconds = parts | |
| else: | |
| return None | |
| try: | |
| return int(hours) * 3600 + int(minutes) * 60 + float(seconds) | |
| except ValueError: | |
| return None | |
| def _parse_fps_fraction(value: str) -> Optional[Tuple[int, int]]: | |
| if "/" in value: | |
| parts = value.split("/") | |
| if len(parts) != 2: | |
| return None | |
| try: | |
| return int(parts[0]), int(parts[1]) | |
| except ValueError: | |
| return None | |
| try: | |
| return int(value), 1 | |
| except ValueError: | |
| return None | |
def _timescale_for_fps(fps: str) -> Optional[int]:
    """Derive a video track timescale from a frame-rate string.

    Returns None when *fps* cannot be parsed. For rates with a denominator of
    1001 the numerator is returned as is. Otherwise the result is
    ``1000 * numerator // gcd(1000, numerator)``, which for a positive
    numerator is the least common multiple of 1000 and the numerator.
    """
    fraction = _parse_fps_fraction(fps)
    if not fraction:
        return None
    numerator, denominator = fraction
    if denominator != 1001:
        return (1000 * numerator) // math.gcd(1000, numerator)
    return numerator
def cut(args: argparse.Namespace) -> int:
    """Stream-copy the ``--start``..``--end`` range of each input into a new file.

    Reads ``inputs``, ``recursive``, ``start``, ``end``, ``suffix``,
    ``overwrite`` and ``show_ffmpeg`` from *args*. Both ``-ss`` and ``-to`` are
    passed as input options so that ``--end`` is a position in the source.
    With ``-to`` as an output option, ffmpeg measures it from the seek point
    and the clip would last ``end`` seconds instead of stopping at ``end``.

    Returns:
        0 on success, or the exit code of the first failing ffmpeg run.

    Raises:
        SystemExit: if no input files are found, neither bound is given, a
            time value is malformed, or ``--end`` is not later than ``--start``.
    """
    _ensure_ffmpeg()
    files = list(_iter_files([Path(p) for p in args.inputs], args.recursive))
    if not files:
        raise SystemExit("No input video files found")

    # Only a single directory argument is used as the root for output paths.
    input_root = None
    if len(args.inputs) == 1:
        root = Path(args.inputs[0])
        if root.is_dir():
            input_root = root

    start = _parse_time(args.start) if args.start else None
    end = _parse_time(args.end) if args.end else None
    if not start and not end:
        raise SystemExit("Provide --start, --end, or both")
    if start and end:
        start_s = _time_to_seconds(start)
        end_s = _time_to_seconds(end)
        if start_s is not None and end_s is not None and end_s <= start_s:
            raise SystemExit("--end must be later than --start")

    for src in files:
        dst = _output_path_same_ext(input_root, src, args.suffix)
        dst = _unique_path(dst, args.overwrite)
        cmd = [*_ffmpeg_base_cmd(args.show_ffmpeg), "-y" if args.overwrite else "-n"]
        if start:
            cmd += ["-ss", start]
        if end:
            # Input option: stop reading at this source position.
            cmd += ["-to", end]
        cmd += ["-i", str(src)]
        cmd += ["-c", "copy"]
        cmd += ["-movflags", "+faststart", str(dst)]
        print(" ".join(cmd))
        returncode, stderr = _run_ffmpeg(cmd, args.show_ffmpeg)
        if returncode != 0:
            if stderr:
                print(stderr, file=sys.stderr)
            return returncode
    return 0
def main() -> int:
    """Build the command-line interface, parse the arguments and run the chosen sub-command.

    Returns the integer result of the selected sub-command handler.
    """
    parser = argparse.ArgumentParser(description="Movie conversion and quality tools")
    commands = parser.add_subparsers(dest="command", required=True)

    def new_command(name: str, summary: str, handler) -> argparse.ArgumentParser:
        """Register a sub-command and bind its handler as ``args.func``."""
        cmd_parser = commands.add_parser(name, help=summary)
        cmd_parser.set_defaults(func=handler)
        return cmd_parser

    def add_inputs(cmd_parser: argparse.ArgumentParser) -> None:
        cmd_parser.add_argument("inputs", nargs="+", help="Files or directories")

    def add_flag(cmd_parser: argparse.ArgumentParser, option: str) -> None:
        cmd_parser.add_argument(option, action="store_true")

    def add_show_ffmpeg(cmd_parser: argparse.ArgumentParser) -> None:
        cmd_parser.add_argument(
            "--show-ffmpeg",
            action="store_true",
            help="Show ffmpeg progress/time output",
        )

    # convert
    convert_cmd = new_command("convert", "Convert movie files", convert)
    add_inputs(convert_cmd)
    convert_cmd.add_argument("--preset", default="talking-head")
    add_flag(convert_cmd, "--recursive")
    convert_cmd.add_argument("--suffix", help="Suffix appended to base filename")
    convert_cmd.add_argument("--out-dir", help="Output directory (default: converted/)")
    add_flag(convert_cmd, "--overwrite")
    add_show_ffmpeg(convert_cmd)

    # quality
    quality_cmd = new_command("quality", "Compute VMAF/SSIM/PSNR table", quality)
    add_inputs(quality_cmd)
    add_flag(quality_cmd, "--recursive")
    quality_cmd.add_argument("--ref", help="Reference file or directory")
    quality_cmd.add_argument("--workdir", help="Directory for temporary metric files")
    quality_cmd.add_argument(
        "--model",
        default="/usr/share/model/vmaf_v0.6.1.json",
        help="Path to VMAF model JSON",
    )
    add_show_ffmpeg(quality_cmd)

    # listing and probing
    new_command("list-presets", "List conversion presets", list_presets)
    new_command("list-sample-presets", "List sample presets", list_sample_presets)
    probe_cmd = new_command("probe", "Print ffprobe JSON", probe)
    probe_cmd.add_argument("input")

    # samples
    samples_cmd = new_command("samples", "Generate sample encodes for comparison", samples)
    add_inputs(samples_cmd)
    samples_cmd.add_argument("--presets", help="Comma-separated sample preset names")
    add_flag(samples_cmd, "--recursive")
    samples_cmd.add_argument("--start", help="Start time in seconds or hh:mm:ss")
    samples_cmd.add_argument("--duration", help="Clip duration in seconds or hh:mm:ss")
    samples_cmd.add_argument("--suffix", help="Suffix appended to base filename")
    samples_cmd.add_argument("--ext", default="mp4", help="Output extension (default: mp4)")
    samples_cmd.add_argument("--out-dir", help="Output directory (default: samples/)")
    add_flag(samples_cmd, "--overwrite")
    add_show_ffmpeg(samples_cmd)

    # prepare-ref
    prepare_cmd = new_command("prepare-ref", "Create canonical VMAF reference", prepare_ref)
    add_inputs(prepare_cmd)
    add_flag(prepare_cmd, "--recursive")
    prepare_cmd.add_argument("--start", help="Start time in seconds or hh:mm:ss")
    prepare_cmd.add_argument("--duration", help="Clip duration in seconds or hh:mm:ss")
    prepare_cmd.add_argument("--suffix", default="ref_canonical", help="Suffix for output basename")
    prepare_cmd.add_argument("--ext", default="mp4", help="Output extension (default: mp4)")
    prepare_cmd.add_argument("--out-dir", help="Output directory (default: input dir)")
    prepare_cmd.add_argument("--preset", default="slow", help="x265 preset (default: slow)")
    prepare_cmd.add_argument("--crf", type=int, default=12, help="x265 CRF (default: 12)")
    prepare_cmd.add_argument("--fps", help="Override input FPS (e.g. 30000/1001)")
    prepare_cmd.add_argument("--timescale", type=int, help="Video track timescale (auto by default)")
    # --lossless and --lossy toggle the same destination; lossless is the default.
    prepare_cmd.add_argument(
        "--lossless",
        dest="lossless",
        action="store_true",
        default=True,
        help="Use lossless x265 (default)",
    )
    prepare_cmd.add_argument(
        "--lossy",
        dest="lossless",
        action="store_false",
        help="Use CRF-based x265 (see --crf)",
    )
    add_flag(prepare_cmd, "--overwrite")
    add_show_ffmpeg(prepare_cmd)

    # cut
    cut_cmd = new_command("cut", "Cut a time range from movie files", cut)
    add_inputs(cut_cmd)
    cut_cmd.add_argument("--start", help="Start time in seconds or hh:mm:ss")
    cut_cmd.add_argument("--end", help="End time in seconds or hh:mm:ss")
    add_flag(cut_cmd, "--recursive")
    cut_cmd.add_argument("--suffix", default="cut", help="Suffix appended to base filename")
    add_flag(cut_cmd, "--overwrite")
    add_show_ffmpeg(cut_cmd)

    # Shell completion is optional: argcomplete is None when not installed.
    if argcomplete:
        argcomplete.autocomplete(parser)

    parsed = parser.parse_args()
    return parsed.func(parsed)
# Script entry point: exit with whatever status the selected sub-command returns.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment