Skip to content

Instantly share code, notes, and snippets.

@DamianPala
Last active January 8, 2026 12:07
Show Gist options
  • Select an option

  • Save DamianPala/3f194af9b3608124ebdfb036d3a298c9 to your computer and use it in GitHub Desktop.

Select an option

Save DamianPala/3f194af9b3608124ebdfb036d3a298c9 to your computer and use it in GitHub Desktop.
Video Converter
#!/usr/bin/env python3
# PYTHON_ARGCOMPLETE_OK
import argparse
import json
import math
import os
import pty
import re
import shutil
import subprocess
import sys
import time
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
try:
import argcomplete
except ImportError:
argcomplete = None
VIDEO_EXTS = {".mp4", ".mkv", ".mov", ".avi", ".m4v", ".webm", ".mpeg", ".mpg", ".ts"} # fmt: skip
PRESETS: Dict[str, List[str]] = {
"talking-head": [
"-map", "0:v:0", "-map", "0:a:0?",
"-metadata:s:v:0", 'handler_name=AV1 video',
"-metadata:s:a:0", 'handler_name=Opus audio',
"-c:v", "libsvtav1", "-preset", "6",
"-crf", "42", "-pix_fmt", "yuv420p", "-b:v", "0",
"-c:a", "libopus", "-b:a", "48k", "-vbr", "on", "-compression_level", "10",
],
"presentation": [
"-c:v", "libx264", "-preset", "medium",
"-crf", "20", "-vf", "scale=-2:1080",
"-pix_fmt", "yuv420p", "-c:a", "aac",
"-b:a", "160k",
],
# "apartment-tour": [
# "-map", "0:v:0", "-map", "0:a:0?",
# "-metadata:s:v:0", 'handler_name=AV1 video',
# "-metadata:s:a:0", 'handler_name=Opus audio',
# "-c:v", "libsvtav1", "-preset", "6",
# "-crf", "36", "-pix_fmt", "yuv420p", "-b:v", "0",
# "-c:a", "libopus", "-b:a", "48k", "-vbr", "on", "-compression_level", "10",
# ],
# "apartment-tour": [
# "-map_metadata", "-1",
# "-c:v", "libsvtav1", "-preset", "6",
# "-crf", "32", "-pix_fmt", "yuv420p",
# "-c:a", "libopus", "-b:a", "80k",
# ],
# "apartment-tour": [
# "-map_metadata", "-1",
# "-c:v", "libaom-av1", "-cpu-used", "6", "-b:v", "0",
# "-crf", "32", "-pix_fmt", "yuv420p",
# "-c:a", "libopus", "-b:a", "80k",
# # "-row-mt", "1", "-threads", "16", "-tile-columns", "1", "-tile-rows", "1"
# ],
# Final preset
"apartment-tour": [
"-map", "0:v:0", "-map", "0:a:0?",
"-metadata:s:v:0", 'handler_name=H.265/HEVC video',
"-metadata:s:a:0", 'handler_name=Opus audio',
"-c:v", "libx265", "-preset", "slow",
"-crf", "28", "-pix_fmt", "yuv420p",
"-c:a", "libopus", "-b:a", "48k", "-vbr", "on", "-compression_level", "10",
],
# CRF30 gives some small artifacts and ghosting but very small size
# CRF28 almost no artifacts and ghosting
"dance-lesson": [
"-map", "0:v:0", "-map", "0:a:0?",
"-metadata:s:v:0", 'handler_name=H.265/HEVC video',
"-metadata:s:a:0", 'handler_name=Opus audio',
"-c:v", "libx265", "-preset", "slow", "-tag:v", "hvc1",
"-crf", "28", "-pix_fmt", "yuv420p", "-vf", "scale=-2:720",
"-x265-params", "profile=main:level-idc=4.1:aq-mode=3:aq-strength=1.1:deblock=-1:-1",
"-c:a", "libopus", "-b:a", "64k", "-vbr", "on", "-compression_level", "10",
"-ac", "2", "-ar", "48000",
],
"social-short": [
"-c:v", "libx264", "-preset", "fast",
"-crf", "24", "-vf", "scale=-2:720",
"-pix_fmt", "yuv420p", "-c:a", "aac",
"-b:a", "128k",
],
} # fmt: skip
PRESET_CONTAINERS = {
"talking-head": ".mkv",
"presentation": ".mkv",
"apartment-tour": ".mkv",
"dance-lesson": ".mp4",
"social-short": ".mkv",
}
SAMPLE_PRESETS: Dict[str, List[str]] = {
# -------------------- HEVC / x265 --------------------
"hevc_x265_crf24_slow": [
"-map", "0",
"-c:v", "libx265", "-preset", "slow", "-crf", "24",
"-pix_fmt", "yuv420p", "-tag:v", "hvc1",
"-x265-params", "range=full",
"-c:a", "copy",
"-movflags", "+faststart",
],
"hevc_x265_crf26_slow": [
"-map", "0",
"-c:v", "libx265", "-preset", "slow", "-crf", "26",
"-pix_fmt", "yuv420p", "-tag:v", "hvc1",
"-x265-params", "range=full",
"-c:a", "copy",
"-movflags", "+faststart",
],
"hevc_x265_crf28_slow": [
"-map", "0",
"-c:v", "libx265", "-preset", "slow", "-crf", "28",
"-pix_fmt", "yuv420p", "-tag:v", "hvc1",
"-x265-params", "range=full",
"-c:a", "copy",
"-movflags", "+faststart",
],
"hevc_x265_crf30_slow": [
"-map", "0",
"-c:v", "libx265", "-preset", "slow", "-crf", "30",
"-pix_fmt", "yuv420p", "-tag:v", "hvc1",
"-x265-params", "range=full",
"-c:a", "copy",
"-movflags", "+faststart",
],
"hevc_x265_crf32_slow": [
"-map", "0",
"-c:v", "libx265", "-preset", "slow", "-crf", "32",
"-pix_fmt", "yuv420p", "-tag:v", "hvc1",
"-x265-params", "range=full",
"-c:a", "copy",
"-movflags", "+faststart",
],
"hevc_x265_crf28_veryslow": [
"-map", "0",
"-c:v", "libx265", "-preset", "veryslow", "-crf", "28",
"-pix_fmt", "yuv420p", "-tag:v", "hvc1",
"-x265-params", "range=full",
"-c:a", "copy",
"-movflags", "+faststart",
],
# -------------------- AV1 / SVT-AV1 (p6 and p7) --------------------
"av1_svt_p6_crf28": [
"-map", "0",
"-c:v", "libsvtav1", "-preset", "6", "-crf", "28",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_svt_p6_crf30": [
"-map", "0",
"-c:v", "libsvtav1", "-preset", "6", "-crf", "30",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_svt_p6_crf32": [
"-map", "0",
"-c:v", "libsvtav1", "-preset", "6", "-crf", "32",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_svt_p6_crf34": [
"-map", "0",
"-c:v", "libsvtav1", "-preset", "6", "-crf", "34",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_svt_p6_crf36": [
"-map", "0",
"-c:v", "libsvtav1", "-preset", "6", "-crf", "36",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_svt_p7_crf28": [
"-map", "0",
"-c:v", "libsvtav1", "-preset", "7", "-crf", "28",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_svt_p7_crf30": [
"-map", "0",
"-c:v", "libsvtav1", "-preset", "7", "-crf", "30",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_svt_p7_crf32": [
"-map", "0",
"-c:v", "libsvtav1", "-preset", "7", "-crf", "32",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_svt_p7_crf34": [
"-map", "0",
"-c:v", "libsvtav1", "-preset", "7", "-crf", "34",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_svt_p7_crf36": [
"-map", "0",
"-c:v", "libsvtav1", "-preset", "7", "-crf", "36",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
# -------------------- AV1 / AOM-AV1 p6 --------------------
"av1_aom_p6_crf28": [
"-map", "0",
"-c:v", "libaom-av1", "-cpu-used", "6", "-b:v", "0", "-crf", "28",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_aom_p6_crf30": [
"-map", "0",
"-c:v", "libaom-av1", "-cpu-used", "6", "-b:v", "0", "-crf", "30",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_aom_p6_crf32": [
"-map", "0",
"-c:v", "libaom-av1", "-cpu-used", "6", "-b:v", "0", "-crf", "32",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_aom_p6_crf34": [
"-map", "0",
"-c:v", "libaom-av1", "-cpu-used", "6", "-b:v", "0", "-crf", "34",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
"av1_aom_p6_crf36": [
"-map", "0",
"-c:v", "libaom-av1", "-cpu-used", "6", "-b:v", "0", "-crf", "36",
"-pix_fmt", "yuv420p",
"-c:a", "copy",
"-movflags", "+faststart",
],
} # fmt: skip
@dataclass
class VideoPair:
test: Path
ref: Path
def _run(cmd: List[str]) -> subprocess.CompletedProcess:
return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
def _run_ffmpeg_capture(cmd: List[str], show: bool = False) -> subprocess.CompletedProcess:
print(" ".join(cmd))
if not show:
return _run(cmd)
try:
master_fd, slave_fd = pty.openpty()
except OSError:
master_fd, slave_fd = None, None
if master_fd is None or slave_fd is None:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stderr_chunks: List[bytes] = []
if proc.stderr:
while True:
chunk = proc.stderr.read(4096)
if not chunk:
break
sys.stderr.buffer.write(chunk)
sys.stderr.buffer.flush()
stderr_chunks.append(chunk)
stdout = b""
if proc.stdout:
stdout = proc.stdout.read()
returncode = proc.wait()
return subprocess.CompletedProcess(
cmd,
returncode,
stdout.decode(errors="replace"),
b"".join(stderr_chunks).decode(errors="replace"),
)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=slave_fd)
os.close(slave_fd)
stderr_chunks: List[bytes] = []
while True:
try:
chunk = os.read(master_fd, 4096)
except OSError:
break
if not chunk:
break
sys.stderr.buffer.write(chunk)
sys.stderr.buffer.flush()
stderr_chunks.append(chunk)
os.close(master_fd)
stdout = b""
if proc.stdout:
stdout = proc.stdout.read()
returncode = proc.wait()
return subprocess.CompletedProcess(
cmd,
returncode,
stdout.decode(errors="replace"),
b"".join(stderr_chunks).decode(errors="replace"),
)
def _fix_seekability(path: Path, overwrite: bool, show: bool) -> int:
ext = path.suffix.lower()
tmp = path.with_name(f"{path.name}.tmp{ext}")
if ext in {".mp4", ".m4v", ".mov"}:
cmd = [
"ffmpeg", "-hide_banner", "-y" if overwrite else "-n",
"-fflags", "+genpts", "-i", str(path),
"-map", "0", "-c", "copy",
"-movflags", "+faststart",
str(tmp),
] # fmt: skip
else:
cmd = [
"ffmpeg", "-hide_banner", "-y" if overwrite else "-n",
"-fflags", "+genpts", "-i", str(path),
"-map", "0", "-c", "copy", "-avoid_negative_ts",
"make_zero", "-max_interleave_delta", "0", "-cues_to_front",
"1", str(tmp),
] # fmt: skip
result = _run_ffmpeg_capture(cmd, show)
if result.returncode != 0:
if result.stderr:
print(result.stderr, file=sys.stderr)
try:
if tmp.exists():
tmp.unlink()
except OSError:
pass
return result.returncode
tmp.replace(path)
return 0
def _run_ffmpeg(cmd: List[str], show: bool) -> Tuple[int, str]:
if show:
return subprocess.run(cmd).returncode, ""
result = _run(cmd)
return result.returncode, result.stderr
def _ffmpeg_base_cmd(show: bool) -> List[str]:
cmd = ["ffmpeg"]
if not show:
cmd.append("-hide_banner")
else:
cmd.append("-stats")
return cmd
def _available_encoders() -> set:
result = _run_ffmpeg_capture(["ffmpeg", "-hide_banner", "-encoders"])
if result.returncode != 0:
return set()
return set(re.findall(r"^\s*[A-Z\.]{6}\s+(\S+)\s", result.stdout, re.MULTILINE))
def _svt_to_aom(args: List[str]) -> List[str]:
cpu_used = "6"
for idx, value in enumerate(args):
if value == "-preset" and idx + 1 < len(args):
cpu_used = args[idx + 1]
break
out: List[str] = []
idx = 0
while idx < len(args):
if args[idx] == "-c:v" and idx + 1 < len(args) and args[idx + 1] == "libsvtav1":
out.extend(["-c:v", "libaom-av1", "-cpu-used", cpu_used, "-b:v", "0"])
idx += 2
continue
if args[idx] == "-preset" and idx + 1 < len(args):
idx += 2
continue
out.append(args[idx])
idx += 1
return out
def _resolve_preset(preset_name: str, preset_args: List[str]) -> List[str]:
if "libsvtav1" not in preset_args:
return preset_args
encoders = _available_encoders()
if "libsvtav1" in encoders:
return preset_args
if "libaom-av1" in encoders:
print("libsvtav1 not available; falling back to libaom-av1")
return _svt_to_aom(preset_args)
raise SystemExit(f"Preset {preset_name} requires libsvtav1; install a build with libsvtav1 or change preset")
def _probe_video_info(path: Path) -> Optional[Tuple[int, int, int]]:
cmd = [
"ffprobe", "-hide_banner", "-v", "error",
"-select_streams", "v:0",
"-show_streams", "-print_format", "json", str(path),
] # fmt: skip
result = _run(cmd)
if result.returncode != 0:
return None
try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
return None
streams = data.get("streams") or []
if not streams:
return None
stream = streams[0]
width = stream.get("width")
height = stream.get("height")
if not width or not height:
return None
def _parse_rotation(value: object) -> Optional[int]:
if value is None:
return None
if isinstance(value, (int, float)):
return int(value)
if isinstance(value, str):
try:
return int(float(value))
except ValueError:
return None
return None
rotation = _parse_rotation((stream.get("tags") or {}).get("rotate"))
if not rotation:
for side in stream.get("side_data_list") or []:
rot = _parse_rotation(side.get("rotation"))
if rot:
rotation = rot
break
if rotation is None:
rotation = 0
return int(width), int(height), rotation
def _probe_duration(path: Path) -> Optional[float]:
cmd = [
"ffprobe",
"-hide_banner",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"json",
str(path),
]
result = _run(cmd)
if result.returncode != 0:
return None
try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
return None
duration = (data.get("format") or {}).get("duration")
if duration is None:
return None
try:
return float(duration)
except ValueError:
return None
def _probe_stream_counts(path: Path) -> Tuple[int, int]:
cmd = [
"ffprobe",
"-hide_banner",
"-v",
"error",
"-show_entries",
"stream=codec_type",
"-of",
"json",
str(path),
]
result = _run(cmd)
if result.returncode != 0:
return 0, 0
try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
return 0, 0
video = 0
audio = 0
for stream in data.get("streams") or []:
stype = stream.get("codec_type")
if stype == "video":
video += 1
elif stype == "audio":
audio += 1
return video, audio
def _decode_check(path: Path, show: bool) -> bool:
cmd = [
"ffmpeg",
"-hide_banner",
"-v",
"error",
"-i",
str(path),
"-f",
"null",
"-",
]
result = _run_ffmpeg_capture(cmd, show)
if result.returncode != 0:
return False
return result.stderr.strip() == ""
def _verify_conversion(src: Path, dst: Path, show: bool, duration_tolerance: float) -> Dict[str, object]:
src_duration = _probe_duration(src)
dst_duration = _probe_duration(dst)
duration_diff = None
duration_ok = True
if src_duration is not None and dst_duration is not None:
duration_diff = abs(dst_duration - src_duration)
duration_ok = duration_diff <= duration_tolerance
src_video, src_audio = _probe_stream_counts(src)
dst_video, dst_audio = _probe_stream_counts(dst)
video_ok = dst_video > 0
audio_ok = src_audio == 0 or dst_audio > 0
decode_ok = _decode_check(dst, show)
ok = decode_ok and duration_ok and audio_ok and video_ok
return {
"output": dst.name,
"decode_ok": decode_ok,
"duration_diff": duration_diff,
"duration_ok": duration_ok,
"audio_ok": audio_ok,
"video_ok": video_ok,
"ok": ok,
}
def _probe_frame_rate(path: Path) -> Optional[str]:
for entry in ("r_frame_rate", "avg_frame_rate"):
cmd = [
"ffprobe",
"-hide_banner",
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
f"stream={entry}",
"-of",
"default=nk=1:nw=1",
str(path),
]
result = _run(cmd)
if result.returncode != 0:
continue
value = (result.stdout or "").strip()
if value and value != "0/0":
return value
return None
def _probe_color_range(path: Path) -> str:
cmd = [
"ffprobe",
"-hide_banner",
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=color_range",
"-of",
"default=nk=1:nw=1",
str(path),
]
result = _run(cmd)
if result.returncode != 0:
return "tv"
value = (result.stdout or "").strip()
if value in {"pc", "tv"}:
return value
return "tv"
def _probe_pixel_format(path: Path) -> Optional[str]:
cmd = [
"ffprobe",
"-hide_banner",
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=pix_fmt",
"-of",
"default=nk=1:nw=1",
str(path),
]
result = _run(cmd)
if result.returncode != 0:
return None
value = (result.stdout or "").strip()
return value or None
def _select_pixel_format(pix_fmt_in: Optional[str]) -> str:
if pix_fmt_in and "420p10le" in pix_fmt_in:
return "yuv420p10le"
return "yuv420p"
def _talking_head_vf(src: Path) -> str:
info = _probe_video_info(src)
scale = "scale=-2:360"
if info:
width, height, rotation = info
if abs(rotation) % 180 == 90:
width, height = height, width
if height >= width:
scale = "scale=360:-2"
return f"{scale},setsar=1,fps=25"
def _rotation_aware_scale_720(src: Path, vf: str) -> str:
filters = [f.strip() for f in vf.split(",") if f.strip()]
info = _probe_video_info(src)
if not info:
return vf
width, height, rotation = info
if abs(rotation) % 180 == 90:
width, height = height, width
changed = False
for idx, flt in enumerate(filters):
match = re.match(r"scale=(-?\d+):(-?\d+)$", flt)
if not match:
continue
width_arg = int(match.group(1))
height_arg = int(match.group(2))
if width_arg == -2 and height_arg > 0:
target = height_arg
elif height_arg == -2 and width_arg > 0:
target = width_arg
else:
continue
if min(width, height) <= target:
filters[idx] = ""
else:
if width >= height:
filters[idx] = f"scale=-2:{target}"
else:
filters[idx] = f"scale={target}:-2"
changed = True
if not changed:
return vf
filters = [f for f in filters if f]
return ",".join(filters)
def _transpose_filter(rotation: int) -> Optional[str]:
rot = rotation % 360
if rot == 90:
return "transpose=1"
if rot == 270:
return "transpose=2"
if rot == 180:
return "transpose=1,transpose=1"
return None
def _apply_vf(args: List[str], vf: str) -> List[str]:
if "-vf" in args:
idx = args.index("-vf")
if idx + 1 < len(args):
updated = list(args)
updated[idx + 1] = vf
return updated
return [*args, "-vf", vf]
def _append_vf(args: List[str], vf: str) -> List[str]:
if "-vf" in args:
idx = args.index("-vf")
if idx + 1 < len(args):
updated = list(args)
updated[idx + 1] = f"{updated[idx + 1]},{vf}"
return updated
return [*args, "-vf", vf]
def _apply_rotation_aware_scale_720(args: List[str], src: Path) -> List[str]:
if "-vf" not in args:
return args
idx = args.index("-vf")
if idx + 1 >= len(args):
return args
vf = args[idx + 1]
updated_vf = _rotation_aware_scale_720(src, vf)
if updated_vf == vf:
return args
updated = list(args)
if updated_vf:
updated[idx + 1] = updated_vf
return updated
return updated[:idx] + updated[idx + 2 :]
def _apply_arg(args: List[str], key: str, value: str) -> List[str]:
if key in args:
idx = args.index(key)
if idx + 1 < len(args):
updated = list(args)
updated[idx + 1] = value
return updated
return [*args, key, value]
def _set_x265_range(args: List[str], in_range: str) -> List[str]:
if "libx265" not in args:
return args
range_value = "full" if in_range == "pc" else "limited"
if "-x265-params" in args:
idx = args.index("-x265-params")
if idx + 1 < len(args):
params = args[idx + 1].split(":") if args[idx + 1] else []
updated_params: List[str] = []
found = False
for param in params:
if param.startswith("range="):
updated_params.append(f"range={range_value}")
found = True
else:
updated_params.append(param)
if not found:
updated_params.append(f"range={range_value}")
updated = list(args)
updated[idx + 1] = ":".join(p for p in updated_params if p)
return updated
return [*args, "-x265-params", f"range={range_value}"]
def _ensure_ffmpeg() -> None:
for tool in ("ffmpeg", "ffprobe"):
if shutil.which(tool) is None:
raise SystemExit(f"Missing required tool: {tool}")
def _ensure_libvmaf() -> None:
result = _run_ffmpeg_capture(["ffmpeg", "-hide_banner", "-filters"])
if result.returncode != 0 or "libvmaf" not in result.stdout:
raise SystemExit("Missing libvmaf filter in ffmpeg; install/build ffmpeg with --enable-libvmaf")
def _ensure_vmaf_model(path: str, workdir: Path) -> str:
model_path = Path(path)
if model_path.exists():
return str(model_path)
if not model_path.is_absolute():
candidate = workdir / model_path
if candidate.exists():
return str(candidate)
cache_dir = Path.home() / ".cache" / "movieconvert"
cache_dir.mkdir(parents=True, exist_ok=True)
cache_path = cache_dir / "vmaf_v0.6.1.json"
if cache_path.exists():
return str(cache_path)
url = "https://github.com/Netflix/vmaf/raw/master/model/vmaf_v0.6.1.json"
print(f"Downloading VMAF model: {url}")
try:
urllib.request.urlretrieve(url, cache_path)
except Exception as exc:
raise SystemExit(f"Failed to download VMAF model: {exc}")
return str(cache_path)
def _vmaf_model_arg(model_path: Optional[str]) -> str:
if not model_path:
return ""
result = _run_ffmpeg_capture(["ffmpeg", "-hide_banner", "-h", "filter=libvmaf"])
output = (result.stdout or "") + (result.stderr or "")
if result.returncode != 0:
print("Unable to inspect libvmaf options; using default model", file=sys.stderr)
return ""
if "model_path" in output:
return f":model_path={model_path}"
if re.search(r"\bmodel\b", output):
return f":model=path={model_path}"
print("libvmaf has no model option; using default model", file=sys.stderr)
return ""
def _is_video(path: Path) -> bool:
return path.suffix.lower() in VIDEO_EXTS
def _iter_files(paths: List[Path], recursive: bool) -> Iterable[Path]:
for p in paths:
if p.is_file() and _is_video(p):
yield p
elif p.is_dir():
if recursive:
for child in p.rglob("*"):
if child.is_file() and _is_video(child):
yield child
else:
for child in p.iterdir():
if child.is_file() and _is_video(child):
yield child
def _output_path(
input_root: Optional[Path],
src: Path,
suffix: Optional[str],
out_dir: Optional[Path],
ext: str = ".mkv",
) -> Path:
out_dir = _resolve_output_dir(input_root, src, out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
base = src.stem
if suffix:
base = f"{base}_{suffix}"
return out_dir / f"{base}{ext}"
def _output_path_same_ext(input_root: Optional[Path], src: Path, suffix: Optional[str]) -> Path:
out_dir = _resolve_output_dir(input_root, src, None)
out_dir.mkdir(parents=True, exist_ok=True)
base = src.stem
if suffix:
base = f"{base}_{suffix}"
return out_dir / f"{base}{src.suffix}"
def _resolve_output_dir(input_root: Optional[Path], src: Path, out_dir: Optional[Path]) -> Path:
if out_dir:
if input_root and input_root.is_dir():
try:
rel = src.relative_to(input_root)
except ValueError:
return out_dir
return out_dir / rel.parent
return out_dir
if "converted" in src.parts:
return src.parent
if input_root and input_root.is_dir():
rel = src.relative_to(input_root)
return input_root / "converted" / rel.parent
return src.parent / "converted"
def _unique_path(path: Path, overwrite: bool) -> Path:
if overwrite or not path.exists():
return path
base = path.stem
ext = path.suffix
counter = 1
while True:
candidate = path.with_name(f"{base}_{counter}{ext}")
if not candidate.exists():
return candidate
counter += 1
def _resolve_samples_dir(input_root: Optional[Path], src: Path, out_dir: Optional[Path]) -> Path:
if out_dir:
return out_dir
if "samples" in src.parts:
return src.parent
if input_root and input_root.is_dir():
rel = src.relative_to(input_root)
return input_root / "samples" / rel.parent
return src.parent / "samples"
def _sample_output_path(
input_root: Optional[Path],
src: Path,
out_dir: Optional[Path],
preset_name: str,
ext: str,
suffix: Optional[str],
) -> Path:
ext = ext.lstrip(".")
out_dir = _resolve_samples_dir(input_root, src, out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
base = src.stem
if suffix:
base = f"{base}_{suffix}"
return out_dir / f"{base}_{preset_name}.{ext}"
def list_presets(_: argparse.Namespace) -> int:
for name, args in PRESETS.items():
joined = " ".join(args)
print(f"{name}: {joined}")
return 0
def list_sample_presets(_: argparse.Namespace) -> int:
for name, args in SAMPLE_PRESETS.items():
joined = " ".join(args)
print(f"{name}: {joined}")
return 0
def convert(args: argparse.Namespace) -> int:
_ensure_ffmpeg()
preset_args = PRESETS.get(args.preset)
is_sample_preset = False
if not preset_args:
preset_args = SAMPLE_PRESETS.get(args.preset)
if preset_args:
print(f"Using sample preset: {args.preset}")
is_sample_preset = True
if not preset_args:
raise SystemExit(f"Unknown preset: {args.preset} (try list-presets or list-sample-presets)")
preset_args = _resolve_preset(args.preset, preset_args)
files = list(_iter_files([Path(p) for p in args.inputs], args.recursive))
if not files:
raise SystemExit("No input video files found")
input_root = None
if len(args.inputs) == 1:
root = Path(args.inputs[0])
if root.is_dir():
input_root = root
out_dir = Path(args.out_dir) if args.out_dir else None
converted: List[Tuple[Path, Path]] = []
for src in files:
current_args = preset_args
if args.preset == "talking-head":
current_args = _apply_vf(preset_args, _talking_head_vf(src))
current_args = _apply_rotation_aware_scale_720(current_args, src)
apply_color = is_sample_preset or args.preset in {
"presentation",
"apartment-tour",
"social-short",
"talking-head",
"dance-lesson",
}
if apply_color:
in_range = _probe_color_range(src)
vf = f"scale=iw:ih:in_range={in_range}:out_range={in_range},format=yuv420p"
current_args = _append_vf(current_args, vf)
current_args = _apply_arg(current_args, "-pix_fmt", "yuv420p")
current_args = _apply_arg(current_args, "-color_range", in_range)
current_args = _set_x265_range(current_args, in_range)
ext = PRESET_CONTAINERS.get(args.preset, ".mkv")
dst = _output_path(input_root, src, args.suffix, out_dir, ext=ext)
dst = _unique_path(dst, args.overwrite)
cmd = [
*_ffmpeg_base_cmd(args.show_ffmpeg),
"-y" if args.overwrite else "-n",
]
cmd += [
"-i",
str(src),
*current_args,
]
if ext == ".mp4":
cmd += ["-f", "mp4"]
cmd += ["-movflags", "+faststart", str(dst)]
print(" ".join(cmd))
started = time.perf_counter()
returncode, stderr = _run_ffmpeg(cmd, args.show_ffmpeg)
if returncode != 0:
if stderr:
print(stderr, file=sys.stderr)
return returncode
elapsed = time.perf_counter() - started
print(f"Completed: {src.name} in {elapsed:.2f}s")
fix_rc = _fix_seekability(dst, args.overwrite, args.show_ffmpeg)
if fix_rc != 0:
return fix_rc
converted.append((src, dst))
if converted:
# if sys.stdin.isatty():
# input("Press Enter to run verification...")
results = [_verify_conversion(src, dst, args.show_ffmpeg, 0.5) for src, dst in converted]
if len(results) > 1:
rows = [["Output", "Decode", "Dur diff [s]", "Audio", "Video"]]
for res in results:
dur_diff = res["duration_diff"]
rows.append(
[
str(res["output"]),
"ok" if res["decode_ok"] else "fail",
f"{dur_diff:.2f}" if isinstance(dur_diff, float) else "n/a",
"ok" if res["audio_ok"] else "missing",
"ok" if res["video_ok"] else "missing",
]
)
print("Verification summary:")
print(_format_table(rows))
else:
res = results[0]
dur_diff = res["duration_diff"]
print(
"Verification:",
f"output={res['output']}",
f"decode={'ok' if res['decode_ok'] else 'fail'}",
f"dur_diff_s={dur_diff:.2f}" if isinstance(dur_diff, float) else "dur_diff_s=n/a",
f"audio={'ok' if res['audio_ok'] else 'missing'}",
f"video={'ok' if res['video_ok'] else 'missing'}",
)
if any(not res["ok"] for res in results):
return 1
return 0
def samples(args: argparse.Namespace) -> int:
_ensure_ffmpeg()
files = list(_iter_files([Path(p) for p in args.inputs], args.recursive))
if not files:
raise SystemExit("No input video files found")
names: List[str]
if args.presets:
names = [name.strip() for name in args.presets.split(",") if name.strip()]
missing = [name for name in names if name not in SAMPLE_PRESETS]
if missing:
raise SystemExit(f"Unknown sample preset(s): {', '.join(missing)}")
else:
names = list(SAMPLE_PRESETS.keys())
resolved_presets = {name: _resolve_preset(name, SAMPLE_PRESETS[name]) for name in names}
input_root = None
if len(args.inputs) == 1:
root = Path(args.inputs[0])
if root.is_dir():
input_root = root
start = _parse_time(args.start) if args.start else None
duration = _parse_time(args.duration) if args.duration else None
expected_duration = _time_to_seconds(args.duration) if args.duration else None
rows = [["output", "preset", "size_mb", "time_s"]]
for src in files:
in_range = _probe_color_range(src)
vf = f"scale=iw:ih:in_range={in_range}:out_range={in_range},format=yuv420p"
src_duration = _probe_duration(src)
expected = expected_duration if expected_duration is not None else src_duration
for name in names:
preset_args = resolved_presets[name]
preset_args = _apply_vf(preset_args, vf)
preset_args = _apply_arg(preset_args, "-pix_fmt", "yuv420p")
preset_args = _apply_arg(preset_args, "-color_range", in_range)
preset_args = _set_x265_range(preset_args, in_range)
dst = _sample_output_path(
input_root, src, Path(args.out_dir) if args.out_dir else None, name, args.ext, args.suffix
)
file_overwrite = args.overwrite
if dst.exists() and not args.overwrite:
dst_duration = _probe_duration(dst)
if expected is not None and dst_duration is not None:
if abs(expected - dst_duration) <= 0.5:
print(f"Skipping existing sample: {dst.name}")
continue
print(f"Overwriting existing sample: {dst.name}")
file_overwrite = True
cmd = [*_ffmpeg_base_cmd(args.show_ffmpeg), "-y" if file_overwrite else "-n"]
if start:
cmd += ["-ss", start]
cmd += ["-noautorotate", "-i", str(src)]
if duration:
cmd += ["-t", duration]
cmd += [*preset_args, str(dst)]
print(" ".join(cmd))
started = time.perf_counter()
returncode, stderr = _run_ffmpeg(cmd, args.show_ffmpeg)
if returncode != 0:
if stderr:
print(stderr, file=sys.stderr)
return returncode
elapsed = time.perf_counter() - started
print(f"Sampled: {dst.name} in {elapsed:.2f}s")
size_mb = os.path.getsize(dst) / (1024 * 1024)
rows.append([dst.name, name, f"{size_mb:.2f}", f"{elapsed:.2f}"])
if len(rows) > 1:
print("Summary:")
print(_format_table(rows))
return 0
def prepare_ref(args: argparse.Namespace) -> int:
_ensure_ffmpeg()
files = list(_iter_files([Path(p) for p in args.inputs], args.recursive))
if not files:
raise SystemExit("No input video files found")
start = _parse_time(args.start) if args.start else None
duration = _parse_time(args.duration) if args.duration else None
for src in files:
fps = args.fps or _probe_frame_rate(src) or "30000/1001"
in_range = _probe_color_range(src)
pix_fmt_in = _probe_pixel_format(src)
pix_fmt_out = _select_pixel_format(pix_fmt_in)
base = f"{src.stem}_{args.suffix}"
out_dir = Path(args.out_dir) if args.out_dir else src.parent
out_dir.mkdir(parents=True, exist_ok=True)
dst = out_dir / f"{base}.{args.ext.lstrip('.')}"
dst = _unique_path(dst, args.overwrite)
vf = f"fps={fps},scale=iw:ih:in_range={in_range}:out_range=pc,format={pix_fmt_out}"
x265_args = ["-x265-params", "lossless=1:range=full"]
mode_desc = "lossless"
if not args.lossless:
x265_args = ["-crf", str(args.crf), "-x265-params", "range=full"]
mode_desc = f"crf={args.crf}"
cmd = [*_ffmpeg_base_cmd(args.show_ffmpeg), "-y" if args.overwrite else "-n"]
cmd += ["-noautorotate"]
if start:
cmd += ["-ss", start]
cmd += ["-i", str(src)]
if duration:
cmd += ["-t", duration]
timescale = args.timescale
if timescale is None:
timescale = _timescale_for_fps(fps)
if timescale is None:
timescale = 90000
print("Warning: falling back to timescale 90000", file=sys.stderr)
cmd += [
"-map", "0", "-vf", vf, "-c:v", "libx265", "-preset",
args.preset, *x265_args, "-pix_fmt", pix_fmt_out,
"-tag:v", "hvc1", "-color_range", "pc",
"-fps_mode", "cfr", "-video_track_timescale",
str(timescale), "-c:a", "copy", "-movflags", "+faststart", str(dst),
] # fmt: skip
print(" ".join(cmd))
returncode, stderr = _run_ffmpeg(cmd, args.show_ffmpeg)
if returncode != 0:
if stderr:
print(stderr, file=sys.stderr)
return returncode
print(f"OK: {dst}")
print(f"CFR: {fps}")
print(f"Range: {in_range} -> pc")
print(f"PixFmt: {pix_fmt_in or 'unknown'} -> {pix_fmt_out}")
print(f"Mode: {mode_desc} preset={args.preset}")
print(f"Timescale: {timescale}")
return 0
def _find_reference(src: Path, ref_root: Optional[Path]) -> Optional[Path]:
if ref_root is None:
if src.parent.name == "converted":
candidate = src.parent.parent / src.name
if candidate.exists():
return candidate
return None
if ref_root.is_file():
return ref_root
rel = src.name
candidate = ref_root / rel
if candidate.exists():
return candidate
return None
def _parse_metric(pattern: str, text: str) -> Optional[float]:
match = re.search(pattern, text)
if not match:
return None
value = match.group(1)
if value.lower() == "inf":
return float("inf")
try:
return float(value)
except ValueError:
return None
def _metric_vmaf(test: Path, ref: Path, workdir: Path, model_arg: str, show: bool) -> Optional[float]:
log_path = workdir / "vmaf.json"
vf = (
"[0:v]format=yuv420p[dist];"
"[1:v]format=yuv420p[ref];"
"[dist][ref]scale2ref[dist2][ref2];"
f"[dist2][ref2]libvmaf=shortest=1:log_fmt=json:log_path={log_path}{model_arg}"
)
cmd = [
"ffmpeg",
"-hide_banner",
"-noautorotate",
"-i",
str(test),
"-noautorotate",
"-i",
str(ref),
"-lavfi",
vf,
"-f",
"null",
"-",
]
result = _run_ffmpeg_capture(cmd, show)
if result.returncode != 0:
print(result.stderr, file=sys.stderr)
return None
try:
data = json.loads(log_path.read_text())
except (OSError, json.JSONDecodeError):
print("Failed to read VMAF JSON output", file=sys.stderr)
return None
return data.get("pooled_metrics", {}).get("vmaf", {}).get("mean")
def _metric_ssim(test: Path, ref: Path, show: bool) -> Optional[float]:
vf = (
"[0:v]format=yuv420p[dist];"
"[1:v]format=yuv420p[ref];"
"[dist][ref]scale2ref[dist2][ref2];"
"[dist2][ref2]ssim=shortest=1"
)
cmd = [
"ffmpeg",
"-hide_banner",
"-noautorotate",
"-i",
str(test),
"-noautorotate",
"-i",
str(ref),
"-lavfi",
vf,
"-f",
"null",
"-",
]
result = _run_ffmpeg_capture(cmd, show)
if result.returncode != 0:
return None
return _parse_metric(r"All:([0-9.]+)", result.stderr)
def _metric_psnr(test: Path, ref: Path, show: bool) -> Optional[float]:
vf = (
"[0:v]format=yuv420p[dist];"
"[1:v]format=yuv420p[ref];"
"[dist][ref]scale2ref[dist2][ref2];"
"[dist2][ref2]psnr=shortest=1"
)
cmd = [
"ffmpeg",
"-hide_banner",
"-noautorotate",
"-i",
str(test),
"-noautorotate",
"-i",
str(ref),
"-lavfi",
vf,
"-f",
"null",
"-",
]
result = _run_ffmpeg_capture(cmd, show)
if result.returncode != 0:
return None
for pattern in (
r"(?i)average:([0-9.]+|inf)",
r"(?i)psnr_avg:?\s*([0-9.]+|inf)",
r"(?i)avg:([0-9.]+|inf)",
):
value = _parse_metric(pattern, result.stderr)
if value is not None:
return value
return None
def _format_table(rows: List[List[str]]) -> str:
if not rows:
return ""
widths = [max(len(row[i]) for row in rows) for i in range(len(rows[0]))]
lines = []
for idx, row in enumerate(rows):
line = " ".join(row[i].ljust(widths[i]) for i in range(len(row)))
lines.append(line)
if idx == 0:
lines.append(" ".join("-" * widths[i] for i in range(len(row))))
return "\n".join(lines)
def quality(args: argparse.Namespace) -> int:
_ensure_ffmpeg()
_ensure_libvmaf()
files = list(_iter_files([Path(p) for p in args.inputs], args.recursive))
if not files:
raise SystemExit("No input video files found")
ref_root = Path(args.ref) if args.ref else None
rows = [["Filename", "Size [MB]", "VMAF", "SSIM", "PSNR"]]
workdir = Path(args.workdir) if args.workdir else Path.cwd()
model_path = _ensure_vmaf_model(args.model, workdir)
model_arg = _vmaf_model_arg(model_path)
print(f"Quality: {len(files)} file(s) to process")
if model_path:
print(f"VMAF model: {model_path}")
if args.ref:
print(f"Reference: {args.ref}")
print("Metric guide: higher VMAF/SSIM/PSNR means more similar (PSNR may be inf)")
for src in files:
ref = _find_reference(src, ref_root)
if not ref or not ref.exists():
print(f"No reference found for {src}", file=sys.stderr)
continue
print(f"Processing: {src.name}")
size_mb = os.path.getsize(src) / (1024 * 1024)
print("Calculating VMAF...")
vmaf = _metric_vmaf(src, ref, workdir, model_arg, args.show_ffmpeg)
print("Calculating SSIM...")
ssim = _metric_ssim(src, ref, args.show_ffmpeg)
print("Calculating PSNR...")
psnr = _metric_psnr(src, ref, args.show_ffmpeg)
rows.append(
[
src.name,
f"{size_mb:.2f}",
f"{vmaf:.2f}" if vmaf is not None else "n/a",
f"{ssim:.4f}" if ssim is not None else "n/a",
f"{psnr:.2f}" if psnr is not None else "n/a",
]
)
print(_format_table(rows[-2:]))
if len(rows) > 1:
header = rows[0]
body = sorted(
rows[1:],
key=lambda r: float("inf") if r[2] == "n/a" else float(r[2]),
reverse=True,
)
rows = [header, *body]
print(_format_table(rows))
return 0
def probe(args: argparse.Namespace) -> int:
_ensure_ffmpeg()
path = Path(args.input)
cmd = [
"ffprobe",
"-hide_banner",
"-v",
"error",
"-print_format",
"json",
"-show_format",
"-show_streams",
str(path),
]
result = _run(cmd)
if result.returncode != 0:
print(result.stderr, file=sys.stderr)
return result.returncode
print(result.stdout)
return 0
def _parse_time(value: str) -> str:
if re.match(r"^\d+(\.\d+)?$", value):
return value
if re.match(r"^\d{1,2}:\d{2}(:\d{2})?$", value):
return value
raise SystemExit(f"Invalid time value: {value}")
def _time_to_seconds(value: str) -> Optional[float]:
if ":" not in value:
try:
return float(value)
except ValueError:
return None
parts = value.split(":")
if len(parts) == 2:
hours = 0
minutes, seconds = parts
elif len(parts) == 3:
hours, minutes, seconds = parts
else:
return None
try:
return int(hours) * 3600 + int(minutes) * 60 + float(seconds)
except ValueError:
return None
def _parse_fps_fraction(value: str) -> Optional[Tuple[int, int]]:
if "/" in value:
parts = value.split("/")
if len(parts) != 2:
return None
try:
return int(parts[0]), int(parts[1])
except ValueError:
return None
try:
return int(value), 1
except ValueError:
return None
def _timescale_for_fps(fps: str) -> Optional[int]:
parsed = _parse_fps_fraction(fps)
if not parsed:
return None
num, den = parsed
if den == 1001:
return num
return (1000 * num) // math.gcd(1000, num)
def cut(args: argparse.Namespace) -> int:
_ensure_ffmpeg()
files = list(_iter_files([Path(p) for p in args.inputs], args.recursive))
if not files:
raise SystemExit("No input video files found")
input_root = None
if len(args.inputs) == 1:
root = Path(args.inputs[0])
if root.is_dir():
input_root = root
start = _parse_time(args.start) if args.start else None
end = _parse_time(args.end) if args.end else None
if not start and not end:
raise SystemExit("Provide --start, --end, or both")
for src in files:
dst = _output_path_same_ext(input_root, src, args.suffix)
dst = _unique_path(dst, args.overwrite)
cmd = [*_ffmpeg_base_cmd(args.show_ffmpeg), "-y" if args.overwrite else "-n"]
if start:
cmd += ["-ss", start]
cmd += ["-i", str(src)]
if end:
cmd += ["-to", end]
cmd += ["-c", "copy"]
cmd += ["-movflags", "+faststart", str(dst)]
print(" ".join(cmd))
returncode, stderr = _run_ffmpeg(cmd, args.show_ffmpeg)
if returncode != 0:
if stderr:
print(stderr, file=sys.stderr)
return returncode
return 0
def main() -> int:
parser = argparse.ArgumentParser(description="Movie conversion and quality tools")
sub = parser.add_subparsers(dest="command", required=True)
p_convert = sub.add_parser("convert", help="Convert movie files")
p_convert.add_argument("inputs", nargs="+", help="Files or directories")
p_convert.add_argument("--preset", default="talking-head")
p_convert.add_argument("--recursive", action="store_true")
p_convert.add_argument("--suffix", help="Suffix appended to base filename")
p_convert.add_argument("--out-dir", help="Output directory (default: converted/)")
p_convert.add_argument("--overwrite", action="store_true")
p_convert.add_argument(
"--show-ffmpeg",
action="store_true",
help="Show ffmpeg progress/time output",
)
p_convert.set_defaults(func=convert)
p_quality = sub.add_parser("quality", help="Compute VMAF/SSIM/PSNR table")
p_quality.add_argument("inputs", nargs="+", help="Files or directories")
p_quality.add_argument("--recursive", action="store_true")
p_quality.add_argument("--ref", help="Reference file or directory")
p_quality.add_argument("--workdir", help="Directory for temporary metric files")
p_quality.add_argument(
"--model",
default="/usr/share/model/vmaf_v0.6.1.json",
help="Path to VMAF model JSON",
)
p_quality.add_argument(
"--show-ffmpeg",
action="store_true",
help="Show ffmpeg progress/time output",
)
p_quality.set_defaults(func=quality)
p_presets = sub.add_parser("list-presets", help="List conversion presets")
p_presets.set_defaults(func=list_presets)
p_sample_presets = sub.add_parser("list-sample-presets", help="List sample presets")
p_sample_presets.set_defaults(func=list_sample_presets)
p_probe = sub.add_parser("probe", help="Print ffprobe JSON")
p_probe.add_argument("input")
p_probe.set_defaults(func=probe)
p_samples = sub.add_parser("samples", help="Generate sample encodes for comparison")
p_samples.add_argument("inputs", nargs="+", help="Files or directories")
p_samples.add_argument("--presets", help="Comma-separated sample preset names")
p_samples.add_argument("--recursive", action="store_true")
p_samples.add_argument("--start", help="Start time in seconds or hh:mm:ss")
p_samples.add_argument("--duration", help="Clip duration in seconds or hh:mm:ss")
p_samples.add_argument("--suffix", help="Suffix appended to base filename")
p_samples.add_argument("--ext", default="mp4", help="Output extension (default: mp4)")
p_samples.add_argument("--out-dir", help="Output directory (default: samples/)")
p_samples.add_argument("--overwrite", action="store_true")
p_samples.add_argument(
"--show-ffmpeg",
action="store_true",
help="Show ffmpeg progress/time output",
)
p_samples.set_defaults(func=samples)
p_prepare = sub.add_parser("prepare-ref", help="Create canonical VMAF reference")
p_prepare.add_argument("inputs", nargs="+", help="Files or directories")
p_prepare.add_argument("--recursive", action="store_true")
p_prepare.add_argument("--start", help="Start time in seconds or hh:mm:ss")
p_prepare.add_argument("--duration", help="Clip duration in seconds or hh:mm:ss")
p_prepare.add_argument("--suffix", default="ref_canonical", help="Suffix for output basename")
p_prepare.add_argument("--ext", default="mp4", help="Output extension (default: mp4)")
p_prepare.add_argument("--out-dir", help="Output directory (default: input dir)")
p_prepare.add_argument("--preset", default="slow", help="x265 preset (default: slow)")
p_prepare.add_argument("--crf", type=int, default=12, help="x265 CRF (default: 12)")
p_prepare.add_argument("--fps", help="Override input FPS (e.g. 30000/1001)")
p_prepare.add_argument("--timescale", type=int, help="Video track timescale (auto by default)")
p_prepare.add_argument(
"--lossless",
dest="lossless",
action="store_true",
default=True,
help="Use lossless x265 (default)",
)
p_prepare.add_argument(
"--lossy",
dest="lossless",
action="store_false",
help="Use CRF-based x265 (see --crf)",
)
p_prepare.add_argument("--overwrite", action="store_true")
p_prepare.add_argument(
"--show-ffmpeg",
action="store_true",
help="Show ffmpeg progress/time output",
)
p_prepare.set_defaults(func=prepare_ref)
p_cut = sub.add_parser("cut", help="Cut a time range from movie files")
p_cut.add_argument("inputs", nargs="+", help="Files or directories")
p_cut.add_argument("--start", help="Start time in seconds or hh:mm:ss")
p_cut.add_argument("--end", help="End time in seconds or hh:mm:ss")
p_cut.add_argument("--recursive", action="store_true")
p_cut.add_argument("--suffix", default="cut", help="Suffix appended to base filename")
p_cut.add_argument("--overwrite", action="store_true")
p_cut.add_argument(
"--show-ffmpeg",
action="store_true",
help="Show ffmpeg progress/time output",
)
p_cut.set_defaults(func=cut)
if argcomplete:
argcomplete.autocomplete(parser)
args = parser.parse_args()
return args.func(args)
if __name__ == "__main__":
raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment