Created
March 11, 2026 01:11
-
-
Save marpe/3fe8a8d9d2780a803658611aa3f4af69 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Forensic Files supercut generator. | |
| Step 1a: Extract embedded English SRT subtitles from MKV files (ffmpeg) | |
| Step 1b: Download missing English SRT subtitles via subliminal | |
| Step 2: Find timestamps for target phrases | |
| Step 3: Cut clips with ffmpeg and concatenate into a supercut | |
| Usage: | |
| python forensic_supercut.py # full pipeline | |
| python forensic_supercut.py --skip-download # skip subliminal, search existing SRTs only | |
| python forensic_supercut.py --search-only # only search + cut (no subtitle fetching) | |
| """ | |
| import sys | |
| import re | |
| import subprocess | |
| import json | |
| import argparse | |
| from pathlib import Path | |
# Force unbuffered output for real-time progress
# line_buffering=True flushes after every newline, so progress lines show up
# immediately even when output is piped/redirected (TextIOWrapper.reconfigure
# is Python 3.7+).
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
# ── Configuration ─────────────────────────────────────────────────────────────
# Phrases to hunt for in subtitle text. Matching is done against the
# lowercased caption (see find_occurrences), so keep these lowercase.
SEARCH_TERMS = [
    "mitochondrial dna"
]
PAD_BEFORE = 0.5 # seconds before phrase start
PAD_AFTER = 0.5 # seconds after phrase end
# All output lands under ./forensic_supercut/
OUTPUT_DIR = Path("forensic_supercut")
OUTPUT_FILE = OUTPUT_DIR / "supercut.mp4"
CLIPS_DIR = OUTPUT_DIR / "clips"
# Path to the subliminal CLI installed via per-user pip on Windows.
# NOTE(review): hard-coded to Python 3.11 on Windows — adjust for other machines.
SUBLIMINAL = str(Path.home() / "AppData/Roaming/Python/Python311/Scripts/subliminal.exe")
# Season directories to scan, relative to the current working directory.
FORENSIC_DIRS = [
    "Forensic Files - Season 01",
    "Forensic Files - Season 02",
    "Forensic Files - Season 03",
    "Forensic Files - Season 04",
    "Forensic Files - Season 05",
    "Forensic Files - Season 06",
    "Forensic Files - Season 07",
    "Forensic Files - Season 08",
    "Forensic Files - Season 09",
    "Forensic Files - Season 10",
    "Forensic Files - Season 11",
    "Forensic Files - Season 12",
    "Forensic Files - Season 13",
    "Forensic Files - Season 14",
    "Forensic Files - Specials",
]
# Container extensions treated as episode videos (compared lowercased).
VIDEO_EXTS = {".avi", ".mp4", ".mkv", ".m4v"}
| # ── Helpers ─────────────────────────────────────────────────────────────────── | |
def srt_time_to_seconds(t: str) -> float:
    """Convert an SRT timestamp to float seconds.

    Accepts ``HH:MM:SS,mmm`` (SRT) or ``HH:MM:SS.mmm`` (WebVTT-style), and
    tolerates fractional parts that are not exactly three digits — e.g.
    ``"00:00:01.5"`` now yields 1.5s, where the previous int-based parse
    produced 1.005s.

    Args:
        t: Timestamp string such as ``"01:02:03,450"``.

    Returns:
        Total seconds as a float (e.g. ``3723.45``).

    Raises:
        ValueError: If the string does not have three colon-separated fields
            or the fields are not numeric.
    """
    h, m, s = t.replace(",", ".").split(":")
    return int(h) * 3600 + int(m) * 60 + float(s)
# Matches "HH:MM:SS,mmm --> HH:MM:SS,mmm" (comma or dot before the millis).
_SRT_TIMECODE_RE = re.compile(
    r"(\d{2}:\d{2}:\d{2}[,\.]\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2}[,\.]\d{3})"
)


def parse_srt(srt_path: Path):
    """Yield ``(start_sec, end_sec, caption)`` for each cue in an SRT file.

    Improvements over the fixed-layout parser: the timecode line is located
    by pattern on the first or second line of each cue, so cues that omit
    the numeric index line (common in extracted/converted subtitles) are no
    longer silently dropped. Undecodable bytes are replaced rather than
    raising.

    Args:
        srt_path: Path to the subtitle file.

    Yields:
        Tuples of (start seconds, end seconds, caption text with the cue's
        lines joined by single spaces). Cues with no caption text are skipped.
    """
    text = srt_path.read_text(encoding="utf-8", errors="replace")
    # Cues are separated by one or more blank lines.
    for block in re.split(r"\n\s*\n", text.strip()):
        lines = block.strip().splitlines()
        # The timecode is normally on line 1 (after the index number), but
        # falls on line 0 when the index is missing.
        for tc_idx, line in enumerate(lines[:2]):
            m = _SRT_TIMECODE_RE.match(line)
            if m:
                break
        else:
            continue  # malformed cue: no timecode found
        caption = " ".join(lines[tc_idx + 1:]).strip()
        if not caption:
            continue  # timecode with no text — nothing to search
        start = srt_time_to_seconds(m.group(1))
        end = srt_time_to_seconds(m.group(2))
        yield start, end, caption
def find_video_files() -> list[Path]:
    """Collect every video file under the configured season directories.

    Directories that do not exist are reported with a warning and skipped.
    Each directory is walked recursively; files whose lowercased extension
    is in VIDEO_EXTS are returned in per-directory sorted order.
    """
    root = Path(".")
    found: list[Path] = []
    for dir_name in FORENSIC_DIRS:
        season_dir = root / dir_name
        if not season_dir.exists():
            print(f" [WARN] Directory not found: {dir_name}")
            continue
        found.extend(
            entry for entry in sorted(season_dir.rglob("*"))
            if entry.suffix.lower() in VIDEO_EXTS
        )
    return found
| def srt_for_video(video: Path) -> Path | None: | |
| """Return the .en.srt or .srt path next to a video if it exists.""" | |
| for suffix in [".en.srt", ".srt"]: | |
| candidate = video.with_suffix(suffix) | |
| if candidate.exists(): | |
| return candidate | |
| return None | |
def has_embedded_english_sub(video: Path) -> bool:
    """Check whether an MKV has an embedded English subtitle stream.

    Only ``.mkv`` containers are probed (via ffprobe's JSON stream dump).
    A subtitle stream tagged ``eng``/``en`` — or carrying no language tag
    at all — counts as English. Any probe or parse failure yields False.
    """
    if video.suffix.lower() != ".mkv":
        return False
    probe = subprocess.run(
        ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", str(video)],
        capture_output=True, text=True,
    )
    if probe.returncode != 0:
        return False
    try:
        stream_list = json.loads(probe.stdout).get("streams", [])
        return any(
            entry.get("codec_type") == "subtitle"
            and entry.get("tags", {}).get("language", "") in ("eng", "en", "")
            for entry in stream_list
        )
    except Exception:
        return False
| # ── Step 1a: Extract embedded subtitles ─────────────────────────────────────── | |
| def extract_embedded_subtitles(videos: list[Path]) -> int: | |
| """Extract embedded English subs from MKV files that lack an external SRT.""" | |
| mkv_missing = [ | |
| v for v in videos | |
| if v.suffix.lower() == ".mkv" and srt_for_video(v) is None | |
| ] | |
| if not mkv_missing: | |
| return 0 | |
| print(f"\n Extracting embedded subtitles from {len(mkv_missing)} MKV(s)...") | |
| extracted = 0 | |
| for video in mkv_missing: | |
| out_srt = video.with_suffix(".en.srt") | |
| # Find first subtitle stream index | |
| probe = subprocess.run( | |
| ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", str(video)], | |
| capture_output=True, text=True, | |
| ) | |
| if probe.returncode != 0: | |
| continue | |
| try: | |
| streams = json.loads(probe.stdout).get("streams", []) | |
| except Exception: | |
| continue | |
| sub_idx = None | |
| for s in streams: | |
| if s.get("codec_type") == "subtitle": | |
| lang = s.get("tags", {}).get("language", "") | |
| if lang in ("eng", "en", ""): | |
| sub_idx = s["index"] | |
| break | |
| if sub_idx is None: | |
| continue | |
| result = subprocess.run( | |
| ["ffmpeg", "-y", "-i", str(video), | |
| "-map", f"0:{sub_idx}", | |
| "-c:s", "srt", | |
| str(out_srt)], | |
| capture_output=True, text=True, | |
| ) | |
| if result.returncode == 0 and out_srt.exists(): | |
| print(f" Extracted: {video.name}") | |
| extracted += 1 | |
| else: | |
| print(f" [WARN] Could not extract from {video.name}") | |
| print(f" Extracted {extracted}/{len(mkv_missing)} embedded subtitle(s)") | |
| return extracted | |
| # ── Step 1b: Download subtitles via subliminal ──────────────────────────────── | |
def download_subtitles(videos: list[Path]):
    """Fetch missing English subtitles with the subliminal CLI.

    Episodes that still lack a sidecar SRT are grouped by parent directory
    so subliminal runs once per season folder; after each run the number of
    newly present SRTs is reported.
    """
    missing = [v for v in videos if srt_for_video(v) is None]
    print(f" {len(videos) - len(missing)} already have subtitles, {len(missing)} need downloading\n")
    if not missing:
        return
    # Group by parent directory so subliminal is invoked per folder.
    by_dir: dict[Path, list[Path]] = {}
    for video in missing:
        by_dir.setdefault(video.parent, []).append(video)
    total_dirs = len(by_dir)
    for idx, (directory, episodes) in enumerate(sorted(by_dir.items()), 1):
        print(f" [{idx}/{total_dirs}] {directory.name} ({len(episodes)} without sub)")
        subprocess.run(
            [
                SUBLIMINAL, "download",
                "-l", "en",
                "-p", "podnapisi",
                "-p", "opensubtitles",
                "-p", "tvsubtitles",
                "--min-score", "25",
                "-w", "4",
                str(directory),
            ],
            text=True,
        )
        fetched = sum(1 for v in episodes if srt_for_video(v) is not None)
        print(f" -> Downloaded {fetched}/{len(episodes)} subtitles")
| # ── Step 2: Find occurrences ────────────────────────────────────────────────── | |
def find_occurrences(videos: list[Path]) -> list[dict]:
    """Scan every available subtitle for the configured search phrases.

    Each matching cue becomes a hit dict carrying the padded cut window
    (start/end with PAD_BEFORE/PAD_AFTER applied, start clamped at 0) plus
    the raw cue times, term, and caption. The raw hit list is saved to
    occurrences.json before overlapping hits are merged via
    deduplicate_hits; the merged clip list is returned.
    """
    print(f"\n{'='*60}")
    print("STEP 2: Searching subtitles for target phrases")
    print(f"{'='*60}")
    print(f" Terms: {SEARCH_TERMS}")
    hits: list[dict] = []
    without_sub = 0
    episodes_searched = 0
    for video in videos:
        srt = srt_for_video(video)
        if srt is None:
            without_sub += 1
            continue
        episodes_searched += 1
        for start, end, caption in parse_srt(srt):
            caption_lower = caption.lower()
            for term in SEARCH_TERMS:
                if term not in caption_lower:
                    continue
                hits.append({
                    "video": str(video),
                    "term": term,
                    "start": max(0.0, start - PAD_BEFORE),
                    "end": end + PAD_AFTER,
                    "caption": caption,
                    "raw_start": start,
                    "raw_end": end,
                })
                print(f" FOUND [{term}] {video.name}")
                print(f" {start:.1f}–{end:.1f}s: {caption!r}")
    print(f"\n Searched {episodes_searched} episodes, {without_sub} had no subtitle")
    print(f" Found {len(hits)} occurrence(s) total\n")
    OUTPUT_DIR.mkdir(exist_ok=True)
    manifest = OUTPUT_DIR / "occurrences.json"
    manifest.write_text(json.dumps(hits, indent=2))
    print(f" Manifest saved -> {manifest}")
    # Merge overlapping/nearby hits from the same episode before cutting.
    deduped = deduplicate_hits(hits)
    print(f" After dedup: {len(deduped)} clip(s) to cut")
    return deduped
def deduplicate_hits(hits: list[dict]) -> list[dict]:
    """Collapse hits on the same video whose spans overlap or nearly touch.

    Hits from one video are sorted by start time; whenever a hit begins
    within one second of the previous span's end, the two are folded into a
    single clip — the span is extended, the term appended (unless already
    present in the running term string), and the captions joined with
    ``" | "``. Input dicts are not mutated; the result is sorted by
    (video, start).
    """
    grouped: dict[str, list[dict]] = {}
    for hit in hits:
        grouped.setdefault(hit["video"], []).append(hit)
    collapsed: list[dict] = []
    for clips in grouped.values():
        ordered = sorted(clips, key=lambda c: c["start"])
        current = ordered[0].copy()
        for clip in ordered[1:]:
            if clip["start"] <= current["end"] + 1.0:
                # Close enough: fold this hit into the running clip.
                current["end"] = max(current["end"], clip["end"])
                current["raw_end"] = max(current["raw_end"], clip["raw_end"])
                if clip["term"] not in current["term"]:
                    current["term"] = f"{current['term']} + {clip['term']}"
                current["caption"] = current["caption"] + " | " + clip["caption"]
            else:
                collapsed.append(current)
                current = clip.copy()
        collapsed.append(current)
    collapsed.sort(key=lambda c: (c["video"], c["start"]))
    return collapsed
| # ── Step 3: Cut clips ───────────────────────────────────────────────────────── | |
def cut_clips(hits: list[dict]) -> list[Path]:
    """Re-encode one normalized clip per hit and return the clip paths.

    Clips that already exist on disk are kept as-is (resume support). Each
    cut is re-encoded to 1280x720 H.264 with loudness-normalized stereo AAC
    so every clip has identical stream parameters for the concat step.
    Failed ffmpeg runs are reported and omitted from the returned list.
    """
    print(f"\n{'='*60}")
    print(f"STEP 3: Cutting {len(hits)} clips with ffmpeg")
    print(f"{'='*60}")
    CLIPS_DIR.mkdir(parents=True, exist_ok=True)
    produced: list[Path] = []
    total = len(hits)
    for idx, hit in enumerate(hits):
        source = Path(hit["video"])
        start = hit["start"]
        duration = hit["end"] - hit["start"]
        slug = hit["term"].replace(" ", "_")
        clip_path = CLIPS_DIR / f"{idx:04d}_{slug}_{source.stem[:40]}.mp4"
        if clip_path.exists():
            print(f" [{idx+1}/{total}] Skip (exists): {clip_path.name}")
            produced.append(clip_path)
            continue
        print(f" [{idx+1}/{total}] {source.name} @{start:.1f}s dur={duration:.1f}s")
        print(f" -> {clip_path.name}")
        result = subprocess.run(
            [
                "ffmpeg", "-y",
                "-ss", str(start),
                "-i", str(source),
                "-t", str(duration),
                # Normalize to 1280×720 H.264/AAC so concat is lossless
                "-c:v", "libx264", "-preset", "fast", "-crf", "20",
                "-vf", "scale=1280:720:force_original_aspect_ratio=decrease,"
                       "pad=1280:720:(ow-iw)/2:(oh-ih)/2,setsar=1",
                "-af", "loudnorm=I=-16:TP=-1.5:LRA=11",
                "-c:a", "aac", "-b:a", "128k", "-ar", "44100", "-ac", "2",
                str(clip_path),
            ],
            capture_output=True, text=True,
        )
        if result.returncode == 0:
            produced.append(clip_path)
        else:
            print(f" [ERROR] ffmpeg:\n{result.stderr[-400:]}")
    return produced
| # ── Step 4: Concatenate ─────────────────────────────────────────────────────── | |
def concatenate_clips(clip_paths: list[Path]):
    """Join the normalized clips into a single supercut file.

    Uses ffmpeg's concat *filter* (not the concat demuxer) because it
    properly resets timestamps across all input streams. A concat-demuxer
    style list is also written next to the output purely for reference.
    On success the final file size is reported; on failure the tail of
    ffmpeg's stderr is printed.
    """
    print(f"\n{'='*60}")
    print(f"STEP 4: Concatenating {len(clip_paths)} clips -> {OUTPUT_FILE}")
    print(f"{'='*60}")
    count = len(clip_paths)
    input_args: list[str] = []
    for clip in clip_paths:
        input_args.extend(["-i", str(clip)])
    # Build "[0:v][0:a][1:v][1:a]...concat=n=N:v=1:a=1[vout][aout]".
    pair_labels = "".join(f"[{i}:v][{i}:a]" for i in range(count))
    graph = f"{pair_labels}concat=n={count}:v=1:a=1[vout][aout]"
    cmd = [
        "ffmpeg", "-y",
        *input_args,
        "-filter_complex", graph,
        "-map", "[vout]", "-map", "[aout]",
        "-c:v", "libx264", "-preset", "fast", "-crf", "20",
        "-c:a", "aac", "-b:a", "128k",
        "-movflags", "+faststart",
        str(OUTPUT_FILE),
    ]
    # Write concat list for reference (not used by ffmpeg here)
    concat_list = OUTPUT_DIR / "concat.txt"
    concat_list.write_text("".join(f"file '{clip.resolve()}'\n" for clip in clip_paths))
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f" [ERROR] concat failed:\n{result.stderr[-500:]}")
    else:
        size_mb = OUTPUT_FILE.stat().st_size / 1_048_576
        print(f" Done! {OUTPUT_FILE} ({size_mb:.1f} MB)")
| # ── Main ────────────────────────────────────────────────────────────────────── | |
def main():
    """Command-line entry point: parse flags and run the selected stages."""
    parser = argparse.ArgumentParser(description="Forensic Files supercut generator")
    parser.add_argument("--skip-download", action="store_true",
                        help="Skip subliminal download step, only search existing SRTs")
    parser.add_argument("--search-only", action="store_true",
                        help="Only search + cut; skip all subtitle fetching")
    parser.add_argument("--no-merge", action="store_true",
                        help="Search and cut clips but stop before merging into supercut")
    parser.add_argument("--merge-only", action="store_true",
                        help="Merge whatever clips are currently in the clips folder, skipping search/cut")
    args = parser.parse_args()

    print("Forensic Files Supercut Generator")
    print(f"Terms: {SEARCH_TERMS}")
    print(f"Padding: -{PAD_BEFORE}s / +{PAD_AFTER}s\n")

    # --merge-only bypasses subtitle fetching, searching, and cutting.
    if args.merge_only:
        existing_clips = sorted(CLIPS_DIR.glob("*.mp4"))
        if not existing_clips:
            print(f"No clips found in {CLIPS_DIR}")
            sys.exit(1)
        print(f"Merging {len(existing_clips)} clips from {CLIPS_DIR}")
        concatenate_clips(existing_clips)
        print(f"\nAll done! {len(existing_clips)} clips -> {OUTPUT_FILE}")
        return

    videos = find_video_files()
    print(f"Found {len(videos)} video files across all Forensic Files directories")

    if not args.search_only:
        print(f"\n{'='*60}")
        print("STEP 1: Obtaining subtitles")
        print(f"{'='*60}")
        # Embedded MKV subtitles are free and fast, so harvest those first.
        extract_embedded_subtitles(videos)
        if args.skip_download:
            still_missing = sum(1 for v in videos if srt_for_video(v) is None)
            print(f" [skip-download] {still_missing} episodes still have no subtitle")
        else:
            # Fall back to subliminal for anything still missing.
            download_subtitles(videos)

    hits = find_occurrences(videos)
    if not hits:
        print("No occurrences found — check that subtitles were downloaded.")
        sys.exit(0)

    clips = cut_clips(hits)
    if not clips:
        print("No clips were produced.")
        sys.exit(1)

    if args.no_merge:
        print(f"\n{len(clips)} clips saved to {CLIPS_DIR}")
        print("Review and remove unwanted clips, then run with --merge-only to merge.")
        return

    concatenate_clips(clips)
    print(f"\nAll done! {len(clips)} clips -> {OUTPUT_FILE}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment