Created
March 11, 2026 01:12
-
-
Save marpe/4f334e25608443e714a0eca009bdf202 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Subtitle transcription pipeline for Forensic Files episodes. | |
| Uses faster-whisper to generate .en.srt files for every episode that | |
| doesn't already have one. | |
| Install dependency: | |
| pip install faster-whisper | |
| Usage: | |
| python transcribe_subtitles.py # transcribe all missing | |
| python transcribe_subtitles.py --dry-run # list episodes needing subs | |
| python transcribe_subtitles.py --model medium # use a smaller/faster model | |
| python transcribe_subtitles.py --device cuda # force GPU | |
| python transcribe_subtitles.py --workers 2 # parallel GPU workers | |
| python transcribe_subtitles.py --season "Season 03" # single season only | |
| """ | |
| import sys | |
| import os | |
| import argparse | |
| import json | |
| import subprocess | |
| import tempfile | |
| import traceback | |
| from datetime import timedelta | |
| from pathlib import Path | |
# Flush stdout/stderr after every line so progress output appears immediately
# even when the script's output is piped or redirected (e.g. `| tee log.txt`).
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
| # Add CUDA 12 DLL dirs (nvidia-cublas-cu12, nvidia-cudnn-cu12) to PATH so that | |
| # ctranslate2's C extensions can find them via Windows LoadLibrary. | |
| # Must happen before any ctranslate2 / faster-whisper import. | |
| def _add_nvidia_dlls_to_path(): | |
| import site | |
| added = [] | |
| for sp in site.getsitepackages() + [site.getusersitepackages()]: | |
| nvidia_dir = Path(sp) / "nvidia" | |
| if nvidia_dir.exists(): | |
| for pkg_bin in nvidia_dir.glob("*/bin"): | |
| if pkg_bin.is_dir(): | |
| added.append(str(pkg_bin)) | |
| if added: | |
| os.environ["PATH"] = os.pathsep.join(added) + os.pathsep + os.environ.get("PATH", "") | |
| _add_nvidia_dlls_to_path() | |
# ── Configuration ──────────────────────────────────────────────────────────────
# Directory containing this script; all season folders live beside it.
BASE_DIR = Path(__file__).parent
# All season directories to scan (relative to BASE_DIR).
SEASON_DIRS = [
    "Forensic Files - Season 01",
    "Forensic Files - Season 02",
    "Forensic Files - Season 03",
    "Forensic Files - Season 04",
    "Forensic Files - Season 05",
    "Forensic Files - Season 06",
    "Forensic Files - Season 07",
    "Forensic Files - Season 08",
    "Forensic Files - Season 09",
    "Forensic Files - Season 10",
    "Forensic Files - Season 11",
    "Forensic Files - Season 12",
    "Forensic Files - Season 13",
    "Forensic Files - Season 14",
    "Forensic Files - Specials",
]
# File extensions recognized as episode videos (compared lowercase).
VIDEO_EXTS = {".avi", ".mp4", ".mkv", ".m4v"}
DEFAULT_MODEL = "large-v3"  # best quality; use "medium" or "small" for speed
DEFAULT_DEVICE = "auto"  # "auto" | "cuda" | "cpu"
DEFAULT_COMPUTE = "auto"  # "auto" | "float16" | "int8"
# One JSON object per line, appended per episode by log_result().
LOG_FILE = BASE_DIR / "transcription_log.jsonl"
| # ── Helpers ──────────────────────────────────────────────────────────────────── | |
| def srt_path_for(video: Path) -> Path | None: | |
| """Return existing .en.srt or .srt next to video, or None.""" | |
| for suffix in (".en.srt", ".srt"): | |
| p = video.with_suffix(suffix) | |
| if p.exists(): | |
| return p | |
| return None | |
def seconds_to_srt_ts(seconds: float) -> str:
    """Convert a duration in seconds to an SRT timestamp ``HH:MM:SS,mmm``.

    Fixes over the previous version: the redundant ``timedelta`` round-trip
    (which quantized to microseconds) is gone, sub-millisecond fractions are
    rounded to the nearest millisecond instead of truncated, and negative
    inputs are clamped to zero so a slightly-negative segment start can never
    produce a malformed timestamp.
    """
    total_ms = max(0, round(seconds * 1000))
    h, rem = divmod(total_ms, 3_600_000)
    m, rem = divmod(rem, 60_000)
    s, ms = divmod(rem, 1_000)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
def segments_to_srt(segments) -> str:
    """Render a faster-whisper segment iterable as the text of an SRT file.

    Each cue gets a 1-based index line, a ``start --> end`` timestamp line,
    and the stripped segment text; cues are separated by blank lines.
    """
    cues = []
    index = 0
    for seg in segments:
        index += 1
        cue = (
            f"{index}\n"
            f"{seconds_to_srt_ts(seg.start)} --> {seconds_to_srt_ts(seg.end)}\n"
            f"{seg.text.strip()}\n"
        )
        cues.append(cue)
    return "\n".join(cues)
def find_video_files(season_filter: str | None = None) -> list[Path]:
    """Collect video files from the season directories, sorted within each.

    *season_filter* is an optional case-insensitive substring (e.g.
    "Season 03") that restricts the scan to matching directory names;
    a filter that matches nothing yields an empty result with a warning.
    """
    selected = SEASON_DIRS
    if season_filter:
        needle = season_filter.lower()
        selected = [name for name in SEASON_DIRS if needle in name.lower()]
        if not selected:
            print(f"[WARN] No season directory matched '{season_filter}'")
    found: list[Path] = []
    for name in selected:
        season_dir = BASE_DIR / name
        if not season_dir.exists():
            print(f" [WARN] Not found: {name}")
            continue
        found.extend(
            entry for entry in sorted(season_dir.rglob("*"))
            if entry.suffix.lower() in VIDEO_EXTS
        )
    return found
def log_result(entry: dict):
    """Append one JSON object as a single line to the shared JSONL log."""
    with LOG_FILE.open("a", encoding="utf-8") as log:
        print(json.dumps(entry), file=log)
| # ── Transcription ────────────────────────────────────────────────────────────── | |
def load_model(model_name: str, device: str, compute_type: str):
    """Instantiate and return a faster-whisper ``WhisperModel``.

    "auto" device resolves to cuda when torch reports an available GPU
    (cpu when torch is absent); "auto" compute resolves to float16 on
    cuda and int8 on cpu. Exits the process if faster-whisper is missing.
    """
    try:
        from faster_whisper import WhisperModel
    except ImportError:
        print("ERROR: faster-whisper not installed.")
        print(" Install with: pip install faster-whisper")
        sys.exit(1)
    resolved_device = device
    if resolved_device == "auto":
        try:
            import torch
            resolved_device = "cuda" if torch.cuda.is_available() else "cpu"
        except ImportError:
            resolved_device = "cpu"
    resolved_compute = compute_type
    if resolved_compute == "auto":
        resolved_compute = "float16" if resolved_device == "cuda" else "int8"
    print(f" Loading model '{model_name}' on {resolved_device} ({resolved_compute}) …")
    model = WhisperModel(model_name, device=resolved_device, compute_type=resolved_compute)
    print(" Model ready.\n")
    return model
def extract_audio_wav(video: Path, tmp_path: str) -> bool:
    """Extract the audio track of *video* to *tmp_path* as a 16 kHz mono WAV.

    Returns True on success. Fixes over the previous version: a missing
    ffmpeg binary no longer raises FileNotFoundError (it returns False with
    a clear message), and on conversion failure the tail of ffmpeg's stderr
    is printed instead of being captured and silently discarded.
    """
    cmd = [
        "ffmpeg", "-y", "-i", str(video),
        "-vn",            # no video stream
        "-ar", "16000",   # resample to 16 kHz (Whisper native)
        "-ac", "1",       # mono
        "-f", "wav", tmp_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        print(" [WARN] ffmpeg not found on PATH")
        return False
    if result.returncode != 0:
        # Surface the last few stderr lines so the failure cause is visible.
        for line in (result.stderr or "").strip().splitlines()[-3:]:
            print(f" [ffmpeg] {line}")
        return False
    return True
def transcribe_video(model, video: Path, language: str = "en") -> tuple[bool, str]:
    """
    Transcribe *video* with faster-whisper and write a ``.en.srt`` next to it.

    Audio is first extracted to a temporary WAV via ffmpeg for maximum
    container/codec compatibility. Returns (success, message).

    Fixes over the previous version: the temp WAV is now removed on *every*
    exit path via ``finally`` (it used to leak when extraction failed), and
    ``tmp_wav`` is pre-initialized so the error handler can no longer raise
    NameError if ``NamedTemporaryFile`` itself fails.
    """
    out_srt = video.with_suffix(".en.srt")
    tmp_wav = None  # sentinel: stays None if temp-file creation itself raises
    try:
        # delete=False so ffmpeg (and then the model) can reopen the file,
        # which a still-open NamedTemporaryFile forbids on Windows.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_wav = tmp.name
        if not extract_audio_wav(video, tmp_wav):
            return False, "ffmpeg audio extraction failed"
        segments, info = model.transcribe(
            tmp_wav,
            language=language,
            beam_size=5,
            vad_filter=True,  # skip long silences to avoid hallucinated text
            vad_parameters=dict(min_silence_duration_ms=500),
            word_timestamps=False,
        )
        srt_text = segments_to_srt(segments)
        if not srt_text.strip():
            return False, "no speech detected"
        out_srt.write_text(srt_text, encoding="utf-8")
        return True, f"saved {out_srt.name} ({len(srt_text)} chars)"
    except Exception as e:
        return False, f"ERROR: {e}\n{traceback.format_exc()}"
    finally:
        if tmp_wav is not None:
            Path(tmp_wav).unlink(missing_ok=True)
| # ── Main pipeline ────────────────────────────────────────────────────────────── | |
def main():
    """Command-line entry point.

    Discovers episodes lacking subtitles (unless --force), optionally
    dry-runs or limits the list, then transcribes each episode in turn,
    logging a JSONL record per result and printing a final tally.
    """
    parser = argparse.ArgumentParser(
        description="Transcribe subtitles for all Forensic Files episodes"
    )
    parser.add_argument("--dry-run", action="store_true",
                        help="Print what would be transcribed, don't do it")
    parser.add_argument("--model", default=DEFAULT_MODEL,
                        help=f"Whisper model name (default: {DEFAULT_MODEL})")
    parser.add_argument("--device", default=DEFAULT_DEVICE,
                        help="Device: auto|cuda|cpu (default: auto)")
    parser.add_argument("--compute", default=DEFAULT_COMPUTE,
                        help="Compute type: auto|float16|int8 (default: auto)")
    parser.add_argument("--language", default="en",
                        help="Language hint for Whisper (default: en)")
    parser.add_argument("--season", default=None,
                        help="Filter to a single season, e.g. 'Season 03'")
    parser.add_argument("--force", action="store_true",
                        help="Re-transcribe even if .srt already exists")
    parser.add_argument("--limit", type=int, default=0,
                        help="Only transcribe the first N episodes")
    args = parser.parse_args()

    banner = "=" * 60
    print(banner)
    print("Forensic Files - Subtitle Transcription Pipeline")
    print(f" Model: {args.model}")
    print(f" Device: {args.device}")
    print(f" Language: {args.language}")
    print(banner)

    # Discover candidate episodes, then drop the ones already subtitled.
    all_videos = find_video_files(args.season)
    print(f"\nFound {len(all_videos)} video file(s) in season directories")
    if args.force:
        pending = all_videos
    else:
        pending = [v for v in all_videos if srt_path_for(v) is None]
    print(f" {len(all_videos) - len(pending)} already have subtitles -- skipping")
    print(f" {len(pending)} episode(s) to transcribe\n")
    if args.limit > 0:
        pending = pending[:args.limit]
        print(f" Limited to first {args.limit} episode(s)\n")
    if not pending:
        print("Nothing to do -- all episodes already have subtitles.")
        return
    if args.dry_run:
        print("DRY RUN - files that would be transcribed:")
        for v in pending:
            print(f" {v.parent.name}/{v.name}")
        return

    # Load the model once, then transcribe each pending episode.
    model = load_model(args.model, args.device, args.compute)
    succeeded = 0
    failed = 0
    total = len(pending)
    for idx, video in enumerate(pending, 1):
        print(f"[{idx:3d}/{total}] {video.parent.name}/{video.name}")
        success, msg = transcribe_video(model, video, language=args.language)
        print(f" -> {msg}")
        log_result({"video": str(video),
                    "status": "ok" if success else "fail",
                    "message": msg})
        if success:
            succeeded += 1
        else:
            failed += 1

    # Final tally.
    print(f"\n{'='*60}")
    print(f"Done. Success: {succeeded} Failed: {failed} Total: {total}")
    if failed:
        print(f" Check {LOG_FILE.name} for error details")
    print("=" * 60)
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment