-
-
Save planetrocky/e9944085d5e101408d8db9fd0c6aaa20 to your computer and use it in GitHub Desktop.
Shows GOP structure for video file using ffmpeg --show-frames output
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # | |
| # pylint: disable=missing-module-docstring, missing-class-docstring, missing-function-docstring, line-too-long | |
| # | |
| # Shows GOP structure of video file. Useful for checking suitability for HLS and DASH packaging. | |
| # Example: | |
| # | |
| # $ iframe-probe.py myvideo.mp4 | |
| # GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED | |
| # GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED | |
| # GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED | |
| # GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED | |
| # GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED | |
| # GOP: IPPPPPPPPPPPPPPPPP 18 CLOSED | |
| # | |
| # Key: | |
| # 🟧 I: IDR Frame | |
| # 🟨 i: i frame | |
| # 🟩 P: p frame | |
| # 🟦 B: b frame | |
| import sys | |
| import errno | |
| import os | |
| import json | |
| import subprocess | |
| import threading | |
| import time | |
| from contextlib import contextmanager | |
| import glob | |
| import shutil | |
| import argparse | |
| import textwrap | |
| import statistics | |
| import numpy as np | |
| import plotext | |
| FFMPEG_PATH = shutil.which("ffmpeg") | |
| if not FFMPEG_PATH: | |
| print('FFmpeg not found. FFmpeg must be installed and available in PATH.') | |
| sys.exit(1) | |
| def expand_glob_if_needed(pattern): | |
| """ | |
| Expand glob pattern if it contains glob characters, otherwise return as-is | |
| """ | |
| # Check if the pattern contains glob characters | |
| glob_chars = ['*', '?', '[', ']'] | |
| has_glob = any(char in pattern for char in glob_chars) | |
| if has_glob: | |
| # Expand the glob pattern | |
| matches = glob.glob(pattern) | |
| if matches: | |
| return sorted(matches) # Sort for consistent ordering | |
| else: | |
| # No matches found, return original pattern | |
| return [pattern] | |
| else: | |
| # No glob characters, return as single item | |
| return [pattern] | |
| @contextmanager | |
| def spinner_context(message : str = "Processing", show_completed : bool = True): | |
| stop_event = threading.Event() | |
| def spin(): | |
| chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" | |
| idx = 0 | |
| sys.stdout.write("\033[?25l") # Hide cursor | |
| while not stop_event.is_set(): | |
| sys.stdout.write(f"\r{chars[idx % len(chars)]} {message}...") | |
| sys.stdout.flush() | |
| idx += 1 | |
| time.sleep(0.08) | |
| if show_completed: | |
| sys.stdout.write(f"\r✓ {message} complete! \n") | |
| else: | |
| sys.stdout.write("\r\033[K") # Clear line | |
| sys.stdout.write("\033[?25h") # Show cursor | |
| sys.stdout.flush() | |
| thread = threading.Thread(target=spin) | |
| thread.start() | |
| try: | |
| yield | |
| finally: | |
| stop_event.set() | |
| thread.join() | |
| class Ansi: | |
| # blocks | |
| BLOCK_HALF = "▀" | |
| BLOCK_QUADRANTS = " ▘▝▀▖▌▞▛▗▚▐▜▄▙▟█" | |
| CSI = "\033[" | |
| FG_XTERM256 = "38:" | |
| BG_XTERM256 = "48:" | |
| FG_RGB24 = "38:2::" | |
| BG_RGB24 = "48:2::" | |
| CSI_END = "m" | |
| # color s | |
| BFRAME = CSI + FG_RGB24 + "0:166:237" + CSI_END # emoji blue square Windows 11 Terminal "\033[38;5;63m" | |
| PFRAME = CSI + FG_RGB24 + "0:210:106" + CSI_END # emoji green square Windows 11 Terminal "\033[38;5;76m" | |
| IFRAME = CSI + FG_RGB24 + "255:176:46" + CSI_END # emoji yellow square Windows 11 Terminal "\033[38;5;127m" | |
| IFRAME_KEYFRAME = CSI + FG_RGB24 + "255:115:47" + CSI_END # emoji orange square Windows 11 Terminal "\033[38;5;m" | |
| FG_WHITE = CSI + FG_RGB24 + "255:255:255" + CSI_END | |
| BG_WHITE = CSI + BG_RGB24 + "255:255:255" + CSI_END | |
| BG_DARKGREEN = CSI + BG_RGB24 + "00:5f:00" # "\033[48;5;22m" | |
| BG_DARKRED = CSI + BG_RGB24 + "87:00:00" # '\033[48;5;88m' | |
| DEFAULT = CSI + "0" + CSI_END | |
| BOLD = CSI + "1" + CSI_END | |
| ITALIC = CSI + "3" + CSI_END | |
| UNDERLINE = CSI + "4" + CSI_END | |
| REVERSE_VIDEO = CSI + "7" + CSI_END | |
| NOT_BOLD = CSI + "22" + CSI_END | |
| NOT_ITALIC = CSI + "23" + CSI_END | |
| NOT_UNDERLINE = CSI + "24" + CSI_END | |
| NOT_REVERSE_VIDEO = CSI + "27" + CSI_END | |
| FG_DEFAULT = CSI + "39" + CSI_END | |
| BG_DEFAULT = CSI + "49" + CSI_END | |
| CURSOR_HOME = CSI + "H" | |
| CURSOR_UP = CSI + "A" | |
| CURSOR_DOWN = CSI + "B" | |
| CURSOR_FORWARD = CSI + "C" | |
| CURSOR_BACKWARD = CSI + "D" | |
| CURSOR_NEXT_LINE = CSI + "E" | |
| CURSOR_PREVIOUS_LINE = CSI + "F" | |
| CURSOR_INVISIBLE = CSI + "?25l" | |
| CURSOR_VISIBLE = CSI + "?25h" | |
| ERASE_LINE = CSI + "K" | |
| ERASE_DISPLAY = CSI + "J" | |
| class BFrame: | |
| def __init__(self): | |
| self.pts_time = 0 | |
| self.json_data = {} | |
| def __format__(self, format_spec = "ascii"): | |
| if format_spec == "ascii": | |
| return repr(self) | |
| if format_spec == "emoji": | |
| return "🟦" | |
| if format_spec == "unicode": | |
| return ( | |
| Ansi.BFRAME | |
| + "▀" | |
| + Ansi.DEFAULT | |
| ) | |
| if format_spec == "ansi": | |
| return ( | |
| Ansi.BFRAME | |
| + "B" | |
| + Ansi.DEFAULT | |
| ) | |
| def __repr__(self): | |
| return "B" | |
| def __str__(self): | |
| return repr(self) | |
| class PFrame: | |
| def __init__(self): | |
| self.pts_time = 0 | |
| self.json_data = {} | |
| def __format__(self, format_spec = "ascii"): | |
| if format_spec == "ascii": | |
| return repr(self) | |
| if format_spec == "emoji": | |
| return "🟩" | |
| if format_spec == "unicode": | |
| return ( | |
| Ansi.PFRAME | |
| + "▀" | |
| + Ansi.DEFAULT | |
| ) | |
| if format_spec == "ansi": | |
| return ( | |
| Ansi.PFRAME | |
| + "P" | |
| + Ansi.DEFAULT | |
| ) | |
| def __repr__(self): | |
| return "P" | |
| def __str__(self): | |
| return repr(self) | |
| class IFrame: | |
| def __init__(self): | |
| self.key_frame = False | |
| self.pts_time = 0 | |
| self.json_data = {} | |
| def __format__(self, format_spec = "ascii"): | |
| if format_spec == "ascii": | |
| return repr(self) | |
| if self.key_frame: | |
| if format_spec == "emoji": | |
| return "🟧" | |
| if format_spec == "unicode": | |
| return ( | |
| Ansi.IFRAME_KEYFRAME | |
| + "▀" | |
| + Ansi.DEFAULT | |
| ) | |
| if format_spec == "ansi": | |
| return ( | |
| Ansi.IFRAME_KEYFRAME | |
| + Ansi.BG_WHITE | |
| + Ansi.REVERSE_VIDEO | |
| + "i" | |
| + Ansi.DEFAULT | |
| ) | |
| if format_spec == "emoji": | |
| return "🟨" | |
| if format_spec == "unicode": | |
| return ( | |
| Ansi.IFRAME | |
| + "▀" | |
| + Ansi.DEFAULT | |
| ) | |
| if format_spec == "ansi": | |
| return ( | |
| Ansi.IFRAME | |
| + Ansi.BG_WHITE | |
| + Ansi.REVERSE_VIDEO | |
| + "i" | |
| + Ansi.DEFAULT | |
| ) | |
| def __repr__(self): | |
| if self.key_frame: | |
| return "I" | |
| return "i" | |
| def __str__(self): | |
| return repr(self) | |
| class GOP: | |
| def __init__(self): | |
| self.closed = False | |
| self.frames = [] | |
| def add_frame(self, frame): | |
| self.frames.append(frame) | |
| if isinstance(frame, IFrame) and frame.key_frame: | |
| self.closed = True | |
| def __format__(self, format_spec = "ascii"): | |
| if format_spec == "ascii": | |
| return repr(self) | |
| frames_repr = '' | |
| for frame in self.frames: | |
| frames_repr += format(frame, format_spec) | |
| if self.closed: | |
| gtype = Ansi.BG_DARKGREEN + ' CLOSED ' + Ansi.DEFAULT | |
| else: | |
| gtype = Ansi.BG_DARKRED + ' OPEN ' + Ansi.DEFAULT | |
| return ( | |
| f'\033[48;5;18m' | |
| f'GOP:' | |
| f'{Ansi.DEFAULT}' | |
| f' {gtype}' | |
| f' {len(self.frames):3d}' | |
| f' {self.frames[0].pts_time:9.3f}' | |
| f' {frames_repr}' | |
| ) | |
| def __repr__(self): | |
| frames_repr = '' | |
| for frame in self.frames: | |
| frames_repr += repr(frame) | |
| gtype = 'CLOSED' if self.closed else ' OPEN ' | |
| return ( | |
| f'GOP:' | |
| f' {gtype}' | |
| f' {len(self.frames):3d}' | |
| f' {frames_repr}' | |
| ) | |
| def __str__(self): | |
| return repr(self) | |
| def process_ffmpeg_output(frames_json): | |
| gops = [] | |
| gop = GOP() | |
| gops.append(gop) | |
| for jframe in frames_json: | |
| if jframe["media_type"] == "video": | |
| frame = None | |
| if jframe["pict_type"] == 'I': | |
| if gop.frames: | |
| # GOP open and new iframe. Time to close GOP | |
| gop = GOP() | |
| gops.append(gop) | |
| frame = IFrame() | |
| if jframe["key_frame"] == 1: | |
| frame.key_frame = True | |
| elif jframe["pict_type"] == 'P': | |
| frame = PFrame() | |
| elif jframe["pict_type"] == 'B': | |
| frame = BFrame() | |
| if jframe["pts_time"]: | |
| frame.pts_time = float(jframe["pts_time"]) | |
| frame.json_data = jframe | |
| gop.add_frame(frame) | |
| return gops | |
| def print_gops(gops, print_format): | |
| for gop in gops: | |
| if print_format == "ascii": | |
| print(repr(gop)) | |
| else: | |
| print(format(gop, print_format)) | |
| def print_gop_pts_time(gops, print_format): | |
| pts_times = [float(0.0)] | |
| pts_times += [x.frames[0].pts_time for x in gops] | |
| pts_times_delta = np.diff(pts_times) | |
| for i, pts_time_delta in enumerate(pts_times_delta): | |
| if print_format == "ascii": | |
| print(( | |
| f"GOP {i:3d}" | |
| f" PTS {pts_time_delta:8.3f}" | |
| )) | |
| else: | |
| print(( | |
| f"\033[48;5;18m" | |
| f"GOP:" | |
| f"{Ansi.BG_DEFAULT}" | |
| f" {i:3d}" | |
| f" {Ansi.BG_DARKGREEN}" | |
| f"PTS-delta:" | |
| f"{Ansi.BG_DEFAULT}" | |
| f" {pts_time_delta:+6.4f}" | |
| )) | |
| return pts_times_delta | |
| class UltimateHelpFormatter( | |
| argparse.RawTextHelpFormatter, | |
| argparse.ArgumentDefaultsHelpFormatter | |
| ): | |
| pass | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description = textwrap.dedent( | |
| """\ | |
| dump GOP structure of video file | |
| key: | |
| 🟧 or 🅸 or I: IDR frame | |
| 🟨 or 🅘 or i: i frame | |
| 🟩 or 🅿 or P: p frame | |
| 🟦 or 🅱 or B: b frame | |
| """), | |
| formatter_class = UltimateHelpFormatter | |
| ) | |
| parser.add_argument( | |
| "filenames", | |
| nargs = "+", | |
| help="video file to parse" | |
| ) | |
| parser.add_argument( | |
| "-e", | |
| "--ffprobe-exec", | |
| dest = "ffprobe_exec", | |
| help = "ffprobe executable. (default: %(default)s)", | |
| default = "ffprobe" | |
| ) | |
| parser.add_argument( | |
| "-d", | |
| "--duration", | |
| default = 30, | |
| help = "duration (default: %(default)s), set to 0 to scan entire file" | |
| ) | |
| parser.add_argument( | |
| "-f", | |
| "--format", | |
| choices = ["ascii", "ansi", "emoji", "unicode"], | |
| default = "emoji", | |
| help = "output formatting" | |
| ) | |
| parser.add_argument( | |
| "-p", | |
| "--pts_time", | |
| action="store_true", | |
| help = "show gop pts time deltas" | |
| ) | |
| parser.add_argument( | |
| "-v", | |
| "--verbose", | |
| action="store_true", | |
| help = "verbose mode" | |
| ) | |
| parser.add_argument( | |
| "-g", | |
| "--grid", | |
| action="store_true", | |
| help = "add grid to plot" | |
| ) | |
| args = parser.parse_args() | |
| if args.filenames is None or args.filenames == "": | |
| parser.print_help() | |
| print("\033[48;5;88m No filename provided! {Ansi.BG_DEFAULT}") | |
| sys.exit(1) | |
| for filename_arg in args.filenames: | |
| filenames_globbed = expand_glob_if_needed(filename_arg) | |
| for filename in filenames_globbed: | |
| if not os.path.isfile(filename): | |
| print(f"\033[48;5;88m File not found: {filename} {Ansi.BG_DEFAULT}") | |
| continue | |
| print(f"\n\033[38;5;76m{filename}{Ansi.BG_DEFAULT}\n") | |
| if not os.path.isfile(filename): | |
| raise FileNotFoundError( | |
| errno.ENOENT, | |
| os.strerror(errno.ENOENT), | |
| filename | |
| ) | |
| print_format = args.format.lower() | |
| command = ( | |
| f'"{args.ffprobe_exec}"' | |
| f' -hide_banner' | |
| f' -loglevel error' | |
| f' -threads 4' | |
| f' -select_streams v:0' | |
| f' -show_entries frame=media_type,key_frame,pict_type,pts,pts_time' | |
| f' -print_format json' | |
| f' "{filename}"' | |
| ) | |
| # command = ( | |
| # f'"{args.ffprobe_exec}"' | |
| # f' -hide_banner' | |
| # f' -loglevel fatal' | |
| # f' -read_intervals "%+${args.duration}"' | |
| # f' -select_streams v:0' | |
| # f' -show_entries frame=media_type,key_frame,pict_type,pts,pts_time' | |
| # f' -output_format csv=p=0' | |
| # f' -skip_frame nokey' | |
| # f' "{filename}"' | |
| # ) | |
| # @TODO convert to CSV to remove very slow JSON processing | |
| # -output_format csv=p=0 | |
| if int(args.duration) > 0: | |
| spinner_message = f"Analyzing first \033[4m{args.duration}s\033[24m of file..." | |
| command += f' -read_intervals "%+{args.duration}"' | |
| else: | |
| spinner_message = "Analyzing entire file..." | |
| try: | |
| with spinner_context(spinner_message, show_completed = False): | |
| response_json = subprocess.check_output(command, shell=True, stderr=subprocess.PIPE) | |
| except subprocess.CalledProcessError as e: | |
| print(f'\033[31mERROR: ({e.returncode}) {e.output.decode()} {e}{Ansi.BG_DEFAULT}') | |
| sys.exit(1) | |
| frames_json = json.loads(response_json)["frames"] | |
| gops = process_ffmpeg_output(frames_json) | |
| if args.pts_time: | |
| if args.verbose: | |
| print("\033[4mGOP PTS Time Delta:\033[24m\n") | |
| pts_times_delta = print_gop_pts_time(gops, print_format) | |
| print( "\033[48;5;18mStatistics (units=seconds):{Ansi.BG_DEFAULT}") | |
| print(f"{Ansi.BG_DARKGREEN}min :{Ansi.BG_DEFAULT} {min(pts_times_delta):+6.4f}") | |
| print(f"{Ansi.BG_DARKGREEN}mean :{Ansi.BG_DEFAULT} {statistics.mean(pts_times_delta):+6.4f} (±{statistics.stdev(pts_times_delta):6.4f})") | |
| print(f"{Ansi.BG_DARKGREEN}max :{Ansi.BG_DEFAULT} {max(pts_times_delta):+6.4f}") | |
| print(f"{Ansi.BG_DARKGREEN}mode :{Ansi.BG_DEFAULT} {statistics.mode(pts_times_delta):+6.4f}") | |
| print(f"{Ansi.BG_DARKGREEN}median:{Ansi.BG_DEFAULT} {statistics.median(pts_times_delta):+6.4f}") | |
| print(f"{Ansi.BG_DARKGREEN}stdev :{Ansi.BG_DEFAULT} {statistics.stdev(pts_times_delta):+6.4f}") | |
| print() | |
| gop_sizes = [len(x.frames) for x in gops] | |
| if args.verbose: | |
| print(f"GOP sizes: {gop_sizes}") | |
| print(f"GOP lengths: {sorted(list(set(gop_sizes)))}") | |
| print( "\033[48;5;18mStatistics (units=frames):{Ansi.BG_DEFAULT}") | |
| print(f"{Ansi.BG_DARKGREEN}min :{Ansi.BG_DEFAULT} {min(gop_sizes):+4d}") | |
| print(f"{Ansi.BG_DARKGREEN}mean :{Ansi.BG_DEFAULT} {statistics.mean(gop_sizes):+6.4f} (±{statistics.stdev(gop_sizes):6.4f})") | |
| print(f"{Ansi.BG_DARKGREEN}max :{Ansi.BG_DEFAULT} {max(gop_sizes):+4d}") | |
| print(f"{Ansi.BG_DARKGREEN}mode :{Ansi.BG_DEFAULT} {statistics.mode(gop_sizes):+4d}") | |
| print(f"{Ansi.BG_DARKGREEN}median:{Ansi.BG_DEFAULT} {statistics.median(gop_sizes):+6.4f}") | |
| print(f"{Ansi.BG_DARKGREEN}stdev :{Ansi.BG_DEFAULT} {statistics.stdev(gop_sizes):+6.4f}") | |
| width = plotext.terminal_width() | |
| bins = min(width, max(gop_sizes) - min(gop_sizes)) | |
| if args.verbose: | |
| print(f"{width=} {bins=}") | |
| plotext.theme('pro') | |
| plotext.grid(args.grid, args.grid) | |
| plotext.plot_size(width, 20) | |
| plotext.hist(data = gop_sizes, bins = bins) | |
| if int(args.duration) > 0: | |
| plotext.title(f"Histogram Plot first {args.duration}s") | |
| else: | |
| plotext.title("Histogram Plot of entire file") | |
| plotext.xlabel("GOP size") | |
| plotext.show() | |
| else: | |
| if args.verbose: | |
| print("\033[4mGOP Frame Analysis:\033[24m\n") | |
| print_gops(gops, print_format) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment

