Skip to content

Instantly share code, notes, and snippets.

@planetrocky
Forked from alastairmccormack/iframe-probe.py
Last active December 8, 2025 12:59
Show Gist options
  • Select an option

  • Save planetrocky/e9944085d5e101408d8db9fd0c6aaa20 to your computer and use it in GitHub Desktop.

Select an option

Save planetrocky/e9944085d5e101408d8db9fd0c6aaa20 to your computer and use it in GitHub Desktop.
Shows GOP structure for video file using ffmpeg --show-frames output
#!/usr/bin/env python3
#
# pylint: disable=missing-module-docstring, missing-class-docstring, missing-function-docstring, line-too-long
#
# Shows GOP structure of video file. Useful for checking suitability for HLS and DASH packaging.
# Example:
#
# $ iframe-probe.py myvideo.mp4
# GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED
# GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED
# GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED
# GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED
# GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED
# GOP: IPPPPPPPPPPPPPPPPP 18 CLOSED
#
# Key:
# 🟧 I: IDR Frame
# 🟨 i: i frame
# 🟩 P: p frame
# 🟦 B: b frame
import sys
import errno
import os
import json
import subprocess
import threading
import time
from contextlib import contextmanager
import glob
import shutil
import argparse
import textwrap
import statistics
import numpy as np
import plotext
FFMPEG_PATH = shutil.which("ffmpeg")
if not FFMPEG_PATH:
print('FFmpeg not found. FFmpeg must be installed and available in PATH.')
sys.exit(1)
def expand_glob_if_needed(pattern):
"""
Expand glob pattern if it contains glob characters, otherwise return as-is
"""
# Check if the pattern contains glob characters
glob_chars = ['*', '?', '[', ']']
has_glob = any(char in pattern for char in glob_chars)
if has_glob:
# Expand the glob pattern
matches = glob.glob(pattern)
if matches:
return sorted(matches) # Sort for consistent ordering
else:
# No matches found, return original pattern
return [pattern]
else:
# No glob characters, return as single item
return [pattern]
@contextmanager
def spinner_context(message : str = "Processing", show_completed : bool = True):
stop_event = threading.Event()
def spin():
chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
idx = 0
sys.stdout.write("\033[?25l") # Hide cursor
while not stop_event.is_set():
sys.stdout.write(f"\r{chars[idx % len(chars)]} {message}...")
sys.stdout.flush()
idx += 1
time.sleep(0.08)
if show_completed:
sys.stdout.write(f"\r✓ {message} complete! \n")
else:
sys.stdout.write("\r\033[K") # Clear line
sys.stdout.write("\033[?25h") # Show cursor
sys.stdout.flush()
thread = threading.Thread(target=spin)
thread.start()
try:
yield
finally:
stop_event.set()
thread.join()
class Ansi:
# blocks
BLOCK_HALF = "▀"
BLOCK_QUADRANTS = " ▘▝▀▖▌▞▛▗▚▐▜▄▙▟█"
CSI = "\033["
FG_XTERM256 = "38:"
BG_XTERM256 = "48:"
FG_RGB24 = "38:2::"
BG_RGB24 = "48:2::"
CSI_END = "m"
# color s
BFRAME = CSI + FG_RGB24 + "0:166:237" + CSI_END # emoji blue square Windows 11 Terminal "\033[38;5;63m"
PFRAME = CSI + FG_RGB24 + "0:210:106" + CSI_END # emoji green square Windows 11 Terminal "\033[38;5;76m"
IFRAME = CSI + FG_RGB24 + "255:176:46" + CSI_END # emoji yellow square Windows 11 Terminal "\033[38;5;127m"
IFRAME_KEYFRAME = CSI + FG_RGB24 + "255:115:47" + CSI_END # emoji orange square Windows 11 Terminal "\033[38;5;m"
FG_WHITE = CSI + FG_RGB24 + "255:255:255" + CSI_END
BG_WHITE = CSI + BG_RGB24 + "255:255:255" + CSI_END
BG_DARKGREEN = CSI + BG_RGB24 + "00:5f:00" # "\033[48;5;22m"
BG_DARKRED = CSI + BG_RGB24 + "87:00:00" # '\033[48;5;88m'
DEFAULT = CSI + "0" + CSI_END
BOLD = CSI + "1" + CSI_END
ITALIC = CSI + "3" + CSI_END
UNDERLINE = CSI + "4" + CSI_END
REVERSE_VIDEO = CSI + "7" + CSI_END
NOT_BOLD = CSI + "22" + CSI_END
NOT_ITALIC = CSI + "23" + CSI_END
NOT_UNDERLINE = CSI + "24" + CSI_END
NOT_REVERSE_VIDEO = CSI + "27" + CSI_END
FG_DEFAULT = CSI + "39" + CSI_END
BG_DEFAULT = CSI + "49" + CSI_END
CURSOR_HOME = CSI + "H"
CURSOR_UP = CSI + "A"
CURSOR_DOWN = CSI + "B"
CURSOR_FORWARD = CSI + "C"
CURSOR_BACKWARD = CSI + "D"
CURSOR_NEXT_LINE = CSI + "E"
CURSOR_PREVIOUS_LINE = CSI + "F"
CURSOR_INVISIBLE = CSI + "?25l"
CURSOR_VISIBLE = CSI + "?25h"
ERASE_LINE = CSI + "K"
ERASE_DISPLAY = CSI + "J"
class BFrame:
def __init__(self):
self.pts_time = 0
self.json_data = {}
def __format__(self, format_spec = "ascii"):
if format_spec == "ascii":
return repr(self)
if format_spec == "emoji":
return "🟦"
if format_spec == "unicode":
return (
Ansi.BFRAME
+ "▀"
+ Ansi.DEFAULT
)
if format_spec == "ansi":
return (
Ansi.BFRAME
+ "B"
+ Ansi.DEFAULT
)
def __repr__(self):
return "B"
def __str__(self):
return repr(self)
class PFrame:
def __init__(self):
self.pts_time = 0
self.json_data = {}
def __format__(self, format_spec = "ascii"):
if format_spec == "ascii":
return repr(self)
if format_spec == "emoji":
return "🟩"
if format_spec == "unicode":
return (
Ansi.PFRAME
+ "▀"
+ Ansi.DEFAULT
)
if format_spec == "ansi":
return (
Ansi.PFRAME
+ "P"
+ Ansi.DEFAULT
)
def __repr__(self):
return "P"
def __str__(self):
return repr(self)
class IFrame:
def __init__(self):
self.key_frame = False
self.pts_time = 0
self.json_data = {}
def __format__(self, format_spec = "ascii"):
if format_spec == "ascii":
return repr(self)
if self.key_frame:
if format_spec == "emoji":
return "🟧"
if format_spec == "unicode":
return (
Ansi.IFRAME_KEYFRAME
+ "▀"
+ Ansi.DEFAULT
)
if format_spec == "ansi":
return (
Ansi.IFRAME_KEYFRAME
+ Ansi.BG_WHITE
+ Ansi.REVERSE_VIDEO
+ "i"
+ Ansi.DEFAULT
)
if format_spec == "emoji":
return "🟨"
if format_spec == "unicode":
return (
Ansi.IFRAME
+ "▀"
+ Ansi.DEFAULT
)
if format_spec == "ansi":
return (
Ansi.IFRAME
+ Ansi.BG_WHITE
+ Ansi.REVERSE_VIDEO
+ "i"
+ Ansi.DEFAULT
)
def __repr__(self):
if self.key_frame:
return "I"
return "i"
def __str__(self):
return repr(self)
class GOP:
def __init__(self):
self.closed = False
self.frames = []
def add_frame(self, frame):
self.frames.append(frame)
if isinstance(frame, IFrame) and frame.key_frame:
self.closed = True
def __format__(self, format_spec = "ascii"):
if format_spec == "ascii":
return repr(self)
frames_repr = ''
for frame in self.frames:
frames_repr += format(frame, format_spec)
if self.closed:
gtype = Ansi.BG_DARKGREEN + ' CLOSED ' + Ansi.DEFAULT
else:
gtype = Ansi.BG_DARKRED + ' OPEN ' + Ansi.DEFAULT
return (
f'\033[48;5;18m'
f'GOP:'
f'{Ansi.DEFAULT}'
f' {gtype}'
f' {len(self.frames):3d}'
f' {self.frames[0].pts_time:9.3f}'
f' {frames_repr}'
)
def __repr__(self):
frames_repr = ''
for frame in self.frames:
frames_repr += repr(frame)
gtype = 'CLOSED' if self.closed else ' OPEN '
return (
f'GOP:'
f' {gtype}'
f' {len(self.frames):3d}'
f' {frames_repr}'
)
def __str__(self):
return repr(self)
def process_ffmpeg_output(frames_json):
gops = []
gop = GOP()
gops.append(gop)
for jframe in frames_json:
if jframe["media_type"] == "video":
frame = None
if jframe["pict_type"] == 'I':
if gop.frames:
# GOP open and new iframe. Time to close GOP
gop = GOP()
gops.append(gop)
frame = IFrame()
if jframe["key_frame"] == 1:
frame.key_frame = True
elif jframe["pict_type"] == 'P':
frame = PFrame()
elif jframe["pict_type"] == 'B':
frame = BFrame()
if jframe["pts_time"]:
frame.pts_time = float(jframe["pts_time"])
frame.json_data = jframe
gop.add_frame(frame)
return gops
def print_gops(gops, print_format):
for gop in gops:
if print_format == "ascii":
print(repr(gop))
else:
print(format(gop, print_format))
def print_gop_pts_time(gops, print_format):
pts_times = [float(0.0)]
pts_times += [x.frames[0].pts_time for x in gops]
pts_times_delta = np.diff(pts_times)
for i, pts_time_delta in enumerate(pts_times_delta):
if print_format == "ascii":
print((
f"GOP {i:3d}"
f" PTS {pts_time_delta:8.3f}"
))
else:
print((
f"\033[48;5;18m"
f"GOP:"
f"{Ansi.BG_DEFAULT}"
f" {i:3d}"
f" {Ansi.BG_DARKGREEN}"
f"PTS-delta:"
f"{Ansi.BG_DEFAULT}"
f" {pts_time_delta:+6.4f}"
))
return pts_times_delta
class UltimateHelpFormatter(
argparse.RawTextHelpFormatter,
argparse.ArgumentDefaultsHelpFormatter
):
pass
def main():
parser = argparse.ArgumentParser(
description = textwrap.dedent(
"""\
dump GOP structure of video file
key:
🟧 or 🅸 or I: IDR frame
🟨 or 🅘 or i: i frame
🟩 or 🅿 or P: p frame
🟦 or 🅱 or B: b frame
"""),
formatter_class = UltimateHelpFormatter
)
parser.add_argument(
"filenames",
nargs = "+",
help="video file to parse"
)
parser.add_argument(
"-e",
"--ffprobe-exec",
dest = "ffprobe_exec",
help = "ffprobe executable. (default: %(default)s)",
default = "ffprobe"
)
parser.add_argument(
"-d",
"--duration",
default = 30,
help = "duration (default: %(default)s), set to 0 to scan entire file"
)
parser.add_argument(
"-f",
"--format",
choices = ["ascii", "ansi", "emoji", "unicode"],
default = "emoji",
help = "output formatting"
)
parser.add_argument(
"-p",
"--pts_time",
action="store_true",
help = "show gop pts time deltas"
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help = "verbose mode"
)
parser.add_argument(
"-g",
"--grid",
action="store_true",
help = "add grid to plot"
)
args = parser.parse_args()
if args.filenames is None or args.filenames == "":
parser.print_help()
print("\033[48;5;88m No filename provided! {Ansi.BG_DEFAULT}")
sys.exit(1)
for filename_arg in args.filenames:
filenames_globbed = expand_glob_if_needed(filename_arg)
for filename in filenames_globbed:
if not os.path.isfile(filename):
print(f"\033[48;5;88m File not found: {filename} {Ansi.BG_DEFAULT}")
continue
print(f"\n\033[38;5;76m{filename}{Ansi.BG_DEFAULT}\n")
if not os.path.isfile(filename):
raise FileNotFoundError(
errno.ENOENT,
os.strerror(errno.ENOENT),
filename
)
print_format = args.format.lower()
command = (
f'"{args.ffprobe_exec}"'
f' -hide_banner'
f' -loglevel error'
f' -threads 4'
f' -select_streams v:0'
f' -show_entries frame=media_type,key_frame,pict_type,pts,pts_time'
f' -print_format json'
f' "{filename}"'
)
# command = (
# f'"{args.ffprobe_exec}"'
# f' -hide_banner'
# f' -loglevel fatal'
# f' -read_intervals "%+${args.duration}"'
# f' -select_streams v:0'
# f' -show_entries frame=media_type,key_frame,pict_type,pts,pts_time'
# f' -output_format csv=p=0'
# f' -skip_frame nokey'
# f' "{filename}"'
# )
# @TODO convert to CSV to remove very slow JSON processing
# -output_format csv=p=0
if int(args.duration) > 0:
spinner_message = f"Analyzing first \033[4m{args.duration}s\033[24m of file..."
command += f' -read_intervals "%+{args.duration}"'
else:
spinner_message = "Analyzing entire file..."
try:
with spinner_context(spinner_message, show_completed = False):
response_json = subprocess.check_output(command, shell=True, stderr=subprocess.PIPE)
except subprocess.CalledProcessError as e:
print(f'\033[31mERROR: ({e.returncode}) {e.output.decode()} {e}{Ansi.BG_DEFAULT}')
sys.exit(1)
frames_json = json.loads(response_json)["frames"]
gops = process_ffmpeg_output(frames_json)
if args.pts_time:
if args.verbose:
print("\033[4mGOP PTS Time Delta:\033[24m\n")
pts_times_delta = print_gop_pts_time(gops, print_format)
print( "\033[48;5;18mStatistics (units=seconds):{Ansi.BG_DEFAULT}")
print(f"{Ansi.BG_DARKGREEN}min :{Ansi.BG_DEFAULT} {min(pts_times_delta):+6.4f}")
print(f"{Ansi.BG_DARKGREEN}mean :{Ansi.BG_DEFAULT} {statistics.mean(pts_times_delta):+6.4f} (±{statistics.stdev(pts_times_delta):6.4f})")
print(f"{Ansi.BG_DARKGREEN}max :{Ansi.BG_DEFAULT} {max(pts_times_delta):+6.4f}")
print(f"{Ansi.BG_DARKGREEN}mode :{Ansi.BG_DEFAULT} {statistics.mode(pts_times_delta):+6.4f}")
print(f"{Ansi.BG_DARKGREEN}median:{Ansi.BG_DEFAULT} {statistics.median(pts_times_delta):+6.4f}")
print(f"{Ansi.BG_DARKGREEN}stdev :{Ansi.BG_DEFAULT} {statistics.stdev(pts_times_delta):+6.4f}")
print()
gop_sizes = [len(x.frames) for x in gops]
if args.verbose:
print(f"GOP sizes: {gop_sizes}")
print(f"GOP lengths: {sorted(list(set(gop_sizes)))}")
print( "\033[48;5;18mStatistics (units=frames):{Ansi.BG_DEFAULT}")
print(f"{Ansi.BG_DARKGREEN}min :{Ansi.BG_DEFAULT} {min(gop_sizes):+4d}")
print(f"{Ansi.BG_DARKGREEN}mean :{Ansi.BG_DEFAULT} {statistics.mean(gop_sizes):+6.4f} (±{statistics.stdev(gop_sizes):6.4f})")
print(f"{Ansi.BG_DARKGREEN}max :{Ansi.BG_DEFAULT} {max(gop_sizes):+4d}")
print(f"{Ansi.BG_DARKGREEN}mode :{Ansi.BG_DEFAULT} {statistics.mode(gop_sizes):+4d}")
print(f"{Ansi.BG_DARKGREEN}median:{Ansi.BG_DEFAULT} {statistics.median(gop_sizes):+6.4f}")
print(f"{Ansi.BG_DARKGREEN}stdev :{Ansi.BG_DEFAULT} {statistics.stdev(gop_sizes):+6.4f}")
width = plotext.terminal_width()
bins = min(width, max(gop_sizes) - min(gop_sizes))
if args.verbose:
print(f"{width=} {bins=}")
plotext.theme('pro')
plotext.grid(args.grid, args.grid)
plotext.plot_size(width, 20)
plotext.hist(data = gop_sizes, bins = bins)
if int(args.duration) > 0:
plotext.title(f"Histogram Plot first {args.duration}s")
else:
plotext.title("Histogram Plot of entire file")
plotext.xlabel("GOP size")
plotext.show()
else:
if args.verbose:
print("\033[4mGOP Frame Analysis:\033[24m\n")
print_gops(gops, print_format)
if __name__ == "__main__":
main()
@planetrocky
Copy link
Author

Example of the three display formatting

@planetrocky
Copy link
Author

Screenshot 2025-05-01 023534

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment