Skip to content

Instantly share code, notes, and snippets.

@marpe
Created March 11, 2026 01:12
Show Gist options
  • Select an option

  • Save marpe/4f334e25608443e714a0eca009bdf202 to your computer and use it in GitHub Desktop.

Select an option

Save marpe/4f334e25608443e714a0eca009bdf202 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Subtitle transcription pipeline for Forensic Files episodes.
Uses faster-whisper to generate .en.srt files for every episode that
doesn't already have one.
Install dependency:
pip install faster-whisper
Usage:
python transcribe_subtitles.py # transcribe all missing
python transcribe_subtitles.py --dry-run # list episodes needing subs
python transcribe_subtitles.py --model medium # use a smaller/faster model
python transcribe_subtitles.py --device cuda # force GPU
python transcribe_subtitles.py --force # re-transcribe even if subs exist
python transcribe_subtitles.py --limit 5 # only the first N episodes
python transcribe_subtitles.py --season "Season 03" # single season only
"""
import sys
import os
import argparse
import json
import subprocess
import tempfile
import traceback
from datetime import timedelta
from pathlib import Path
# Force line-buffered output so progress lines appear immediately, even when
# stdout/stderr are redirected (e.g. piped into a log file).
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
# Add CUDA 12 DLL dirs (nvidia-cublas-cu12, nvidia-cudnn-cu12) to PATH so that
# ctranslate2's C extensions can find them via Windows LoadLibrary.
# Must happen before any ctranslate2 / faster-whisper import.
def _add_nvidia_dlls_to_path():
import site
added = []
for sp in site.getsitepackages() + [site.getusersitepackages()]:
nvidia_dir = Path(sp) / "nvidia"
if nvidia_dir.exists():
for pkg_bin in nvidia_dir.glob("*/bin"):
if pkg_bin.is_dir():
added.append(str(pkg_bin))
if added:
os.environ["PATH"] = os.pathsep.join(added) + os.pathsep + os.environ.get("PATH", "")
_add_nvidia_dlls_to_path()
# ── Configuration ──────────────────────────────────────────────────────────────
BASE_DIR = Path(__file__).parent
# All season directories to scan (relative to BASE_DIR): Seasons 01-14 plus
# the Specials folder.
SEASON_DIRS = [f"Forensic Files - Season {n:02d}" for n in range(1, 15)] + [
    "Forensic Files - Specials",
]
VIDEO_EXTS = {".avi", ".mp4", ".mkv", ".m4v"}
DEFAULT_MODEL = "large-v3"  # best quality; use "medium" or "small" for speed
DEFAULT_DEVICE = "auto"     # "auto" | "cuda" | "cpu"
DEFAULT_COMPUTE = "auto"    # "auto" | "float16" | "int8"
LOG_FILE = BASE_DIR / "transcription_log.jsonl"
# ── Helpers ────────────────────────────────────────────────────────────────────
def srt_path_for(video: Path) -> Path | None:
"""Return existing .en.srt or .srt next to video, or None."""
for suffix in (".en.srt", ".srt"):
p = video.with_suffix(suffix)
if p.exists():
return p
return None
def seconds_to_srt_ts(seconds: float) -> str:
    """Convert float seconds to an SRT timestamp ``HH:MM:SS,mmm``.

    Milliseconds are rounded rather than truncated: with binary floats,
    ``0.29 * 1000`` is 289.999…, so ``int()`` silently loses a
    millisecond; ``round`` does not. Negative inputs clamp to zero.
    The ``timedelta`` round-trip of the original added nothing and is gone.
    """
    total_ms = max(0, round(seconds * 1000))
    h, rem = divmod(total_ms, 3_600_000)
    m, rem = divmod(rem, 60_000)
    s, ms = divmod(rem, 1_000)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
def segments_to_srt(segments) -> str:
    """Render a faster-whisper segment iterable as one SRT document string."""
    entries = []
    index = 1
    for seg in segments:
        window = f"{seconds_to_srt_ts(seg.start)} --> {seconds_to_srt_ts(seg.end)}"
        entries.append(f"{index}\n{window}\n{seg.text.strip()}\n")
        index += 1
    return "\n".join(entries)
def find_video_files(season_filter: str | None = None) -> list[Path]:
    """Collect every video file under the configured season directories.

    When *season_filter* is given, only directories whose name contains it
    (case-insensitively) are scanned. Missing directories are reported and
    skipped. Results are ordered by directory, then by path within it.
    """
    if season_filter:
        needle = season_filter.lower()
        dirs = [d for d in SEASON_DIRS if needle in d.lower()]
        if not dirs:
            print(f"[WARN] No season directory matched '{season_filter}'")
    else:
        dirs = SEASON_DIRS
    videos: list[Path] = []
    for name in dirs:
        root = BASE_DIR / name
        if not root.exists():
            print(f" [WARN] Not found: {name}")
            continue
        videos.extend(
            f for f in sorted(root.rglob("*")) if f.suffix.lower() in VIDEO_EXTS
        )
    return videos
def log_result(entry: dict) -> None:
    """Append one JSON record as a single line to the JSONL transcription log."""
    with LOG_FILE.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")
# ── Transcription ──────────────────────────────────────────────────────────────
def load_model(model_name: str, device: str, compute_type: str):
    """Instantiate a faster-whisper ``WhisperModel``.

    Resolves device='auto' to cuda/cpu (via torch when it is importable)
    and compute_type='auto' to float16 on GPU / int8 on CPU. Exits the
    process with an install hint when faster-whisper itself is missing.
    """
    try:
        from faster_whisper import WhisperModel
    except ImportError:
        print("ERROR: faster-whisper not installed.")
        print(" Install with: pip install faster-whisper")
        sys.exit(1)
    # Resolve 'auto' device; torch is only probed, never required.
    if device == "auto":
        try:
            import torch
        except ImportError:
            device = "cpu"
        else:
            device = "cuda" if torch.cuda.is_available() else "cpu"
    if compute_type == "auto":
        compute_type = "float16" if device == "cuda" else "int8"
    print(f" Loading model '{model_name}' on {device} ({compute_type}) …")
    model = WhisperModel(model_name, device=device, compute_type=compute_type)
    print(" Model ready.\n")
    return model
def extract_audio_wav(video: Path, tmp_path: str) -> bool:
    """Dump *video*'s audio track to *tmp_path* as a 16 kHz mono WAV.

    Routing through ffmpeg first maximizes container/codec compatibility;
    16 kHz mono is Whisper's native input format. Returns True when ffmpeg
    exits successfully.
    """
    cmd = [
        "ffmpeg",
        "-y",            # overwrite the temp file if it already exists
        "-i", str(video),
        "-vn",           # drop the video stream
        "-ar", "16000",  # resample to 16 kHz (Whisper native)
        "-ac", "1",      # downmix to mono
        "-f", "wav",
        tmp_path,
    ]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    return proc.returncode == 0
def transcribe_video(model, video: Path, language: str = "en") -> tuple[bool, str]:
    """
    Transcribe a video and write a .en.srt next to it.

    Extracts audio via ffmpeg first for maximum format compatibility.
    Returns (success, message).

    Fix vs. original: the `except` path referenced `tmp_wav` before it was
    guaranteed to be bound (a failure inside NamedTemporaryFile raised a
    NameError that masked the real error). Cleanup now lives in `finally`
    behind a None guard, so the temp WAV is removed on every path.
    """
    out_srt = video.with_suffix(".en.srt")
    tmp_wav: str | None = None
    try:
        # Only used to reserve a unique path; the handle closes immediately
        # and ffmpeg overwrites the file.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_wav = tmp.name
        if not extract_audio_wav(video, tmp_wav):
            return False, "ffmpeg audio extraction failed"
        segments, info = model.transcribe(
            tmp_wav,
            language=language,
            beam_size=5,
            vad_filter=True,  # skip long silences between narration
            vad_parameters=dict(min_silence_duration_ms=500),
            word_timestamps=False,
        )
        srt_text = segments_to_srt(segments)
        if not srt_text.strip():
            return False, "no speech detected"
        out_srt.write_text(srt_text, encoding="utf-8")
        return True, f"saved {out_srt.name} ({len(srt_text)} chars)"
    except Exception as e:
        return False, f"ERROR: {e}\n{traceback.format_exc()}"
    finally:
        # Always remove the temp WAV, on success and failure alike.
        if tmp_wav is not None:
            Path(tmp_wav).unlink(missing_ok=True)
# ── Main pipeline ──────────────────────────────────────────────────────────────
def main():
    """CLI entry point: find episodes missing subtitles and transcribe them."""
    parser = argparse.ArgumentParser(
        description="Transcribe subtitles for all Forensic Files episodes"
    )
    parser.add_argument("--dry-run", action="store_true",
                        help="Print what would be transcribed, don't do it")
    parser.add_argument("--model", default=DEFAULT_MODEL,
                        help=f"Whisper model name (default: {DEFAULT_MODEL})")
    parser.add_argument("--device", default=DEFAULT_DEVICE,
                        help="Device: auto|cuda|cpu (default: auto)")
    parser.add_argument("--compute", default=DEFAULT_COMPUTE,
                        help="Compute type: auto|float16|int8 (default: auto)")
    parser.add_argument("--language", default="en",
                        help="Language hint for Whisper (default: en)")
    parser.add_argument("--season", default=None,
                        help="Filter to a single season, e.g. 'Season 03'")
    parser.add_argument("--force", action="store_true",
                        help="Re-transcribe even if .srt already exists")
    parser.add_argument("--limit", type=int, default=0,
                        help="Only transcribe the first N episodes")
    args = parser.parse_args()

    banner = "=" * 60
    print(banner)
    print("Forensic Files - Subtitle Transcription Pipeline")
    print(f" Model: {args.model}")
    print(f" Device: {args.device}")
    print(f" Language: {args.language}")
    print(banner)

    # Discover candidate episodes and drop those that already have subtitles
    # (unless --force).
    all_videos = find_video_files(args.season)
    print(f"\nFound {len(all_videos)} video file(s) in season directories")
    if args.force:
        pending = all_videos
    else:
        pending = [v for v in all_videos if srt_path_for(v) is None]
    print(f" {len(all_videos) - len(pending)} already have subtitles -- skipping")
    print(f" {len(pending)} episode(s) to transcribe\n")
    if args.limit > 0:
        pending = pending[:args.limit]
        print(f" Limited to first {args.limit} episode(s)\n")
    if not pending:
        print("Nothing to do -- all episodes already have subtitles.")
        return
    if args.dry_run:
        print("DRY RUN - files that would be transcribed:")
        for vid in pending:
            print(f" {vid.parent.name}/{vid.name}")
        return

    # Heavyweight step: load the Whisper model once and reuse it per episode.
    model = load_model(args.model, args.device, args.compute)

    n_ok = n_fail = 0
    for idx, vid in enumerate(pending, 1):
        label = f"{vid.parent.name}/{vid.name}"
        print(f"[{idx:3d}/{len(pending)}] {label}")
        success, msg = transcribe_video(model, vid, language=args.language)
        print(f" -> {msg}")
        log_result({"video": str(vid), "status": "ok" if success else "fail", "message": msg})
        if success:
            n_ok += 1
        else:
            n_fail += 1

    # Final summary; failures point at the JSONL log for details.
    print(f"\n{'='*60}")
    print(f"Done. Success: {n_ok} Failed: {n_fail} Total: {len(pending)}")
    if n_fail:
        print(f" Check {LOG_FILE.name} for error details")
    print("=" * 60)
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment