Skip to content

Instantly share code, notes, and snippets.

@talmo
Created March 6, 2026 20:06
Show Gist options
  • Select an option

  • Save talmo/1841b3aed5d513686ae38f845b4d2d24 to your computer and use it in GitHub Desktop.

Select an option

Save talmo/1841b3aed5d513686ae38f845b4d2d24 to your computer and use it in GitHub Desktop.
Pure Python reader and converter for Norpix .seq video files (Piotr Dollar SeqIo format). Supports compressed (JPEG/PNG) and uncompressed formats with seek index caching, MP4 re-encoding via sleap-io, and CLI interface. Self-contained with PEP 723 inline dependencies — just uv run it.
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "numpy",
# "Pillow",
# "imageio-ffmpeg",
# "sleap-io",
# ]
# ///
"""Pure Python reader and converter for Norpix .seq video files.
This implements the seq file format used by StreamPix / Norpix and Piotr Dollar's
MATLAB SeqIo toolbox. Supports both uncompressed (raw) and compressed (JPEG, PNG)
image formats, with optional re-encoding to MP4 via ffmpeg.
Seq File Format
---------------
The .seq format stores a sequence of image frames with a fixed 1024-byte header
followed by concatenated frame data. It was originally designed by Norpix for their
StreamPix recording software and later adopted by Piotr Dollar's Computer Vision
MATLAB Toolbox.
Header layout (1024 bytes, little-endian):
Offset Size Type Field
0 4 uint32 Magic number (0xFEED)
4 20 10×uint16 Name string ("Norpix seq")
24 4 - Padding
28 4 int32 Version (>= 5 for StreamPix 6+)
32 4 uint32 Header size (always 1024)
36 512 256×uint16 Description string
548 4 uint32 Image width
552 4 uint32 Image height
556 4 uint32 Bit depth
560 4 uint32 Bit depth (real)
564 4 uint32 Image size in bytes
568 4 uint32 Image format code (see below)
572 4 uint32 Number of allocated frames
576 4 uint32 Reserved (0)
580 4 uint32 True image size (stride per frame, uncompressed)
584 8 float64 Suggested frame rate
Image format codes:
100 = monoraw Grayscale uncompressed
200 = raw Color BGR uncompressed
101 = brgb8 Bayer pattern raw
102 = monojpg Grayscale JPEG compressed
201 = jpg Color JPEG compressed
103 = jbrgb Bayer JPEG compressed
1 = monopng Grayscale PNG compressed
2 = png Color PNG compressed
Frame layout (compressed formats):
4 bytes uint32 Frame size (includes these 4 bytes)
N-4 bytes Encoded image data (JPEG or PNG)
4 bytes uint32 Timestamp: seconds since epoch
2 bytes uint16 Timestamp: milliseconds
2 bytes uint16 Timestamp: microseconds (version >= 5 only)
[0 or 8] Optional padding (auto-detected)
Frame layout (uncompressed formats):
Each frame occupies exactly `true_image_size` bytes at a fixed stride from
the header. Image data is followed by the same timestamp fields.
Seek index:
For compressed formats, frames have variable sizes, so a seek index mapping
frame number -> byte offset must be built by scanning the file sequentially.
This index is cached to `<filename>.seq-index.json` for fast subsequent loads.
This mirrors the MATLAB `-seek.mat` caching behavior.
Dependencies
------------
All dependencies are declared inline via PEP 723 and resolved automatically
by ``uv run``. No manual installation needed.
- numpy: Frame data as arrays
- Pillow: JPEG/PNG decoding
- imageio-ffmpeg: MP4 encoding (only needed for ``to_mp4()``)
- sleap-io: Video writing backend for MP4 encoding
Usage (CLI)
-----------
All commands use ``uv run -p <python-version>`` which auto-installs
dependencies in an isolated environment based on the PEP 723 inline metadata
at the top of this script. No venv setup or pip install needed.
# Print file info and benchmark
uv run -p 3.12 seq_reader.py video.seq
# Convert to MP4
uv run -p 3.12 seq_reader.py video.seq --to-mp4
# Convert with options
uv run -p 3.12 seq_reader.py video.seq --to-mp4 -o output.mp4 --crf 18 --codec libx265
# Convert a range of frames
uv run -p 3.12 seq_reader.py video.seq --to-mp4 -o tmp.mp4 --start 0 --end 1000
Usage (Python API)
------------------
When importing as a module, ensure the dependencies are installed in your
environment (e.g., ``uv add numpy Pillow imageio-ffmpeg sleap-io``).
from seq_reader import SeqReader
# Read frames
with SeqReader("video.seq") as reader:
print(reader) # Summary info
frame = reader.get_frame(0) # Single frame -> (H, W) uint8
frame = reader[100] # Indexing
batch = reader[10:20] # Slicing -> (N, H, W) uint8
ts = reader.get_timestamp(0) # datetime object
ts_f = reader.get_timestamp_float(0) # Seconds since epoch
img, ts = reader.get_frame_with_timestamp(0) # Both at once
# Convert to MP4
with SeqReader("video.seq") as reader:
reader.to_mp4("output.mp4") # Default settings
reader.to_mp4("output.mp4", crf=18) # Higher quality
reader.to_mp4("output.mp4", codec="libx265") # HEVC
reader.to_mp4("out.mp4", frame_range=(0, 100)) # First 100 frames
References
----------
- https://github.com/pdollar/toolbox/blob/master/videos/seqReaderPlugin.m
- https://github.com/pdollar/toolbox/blob/master/videos/seqWriterPlugin.m
- https://github.com/soft-matter/pims/blob/main/pims/norpix_reader.py
- https://github.com/nclack/seq
"""
from __future__ import annotations
import datetime
import io
import json
import os
import struct
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator
import numpy as np
from numpy.typing import NDArray
try:
from PIL import Image
except ImportError:
Image = None
# Numeric image-format code stored in the header -> short codec name
# (naming follows the SeqIo MATLAB toolbox).
IMAGE_FORMAT_CODES = {
    100: "monoraw",  # Grayscale uncompressed
    200: "raw",  # Color BGR uncompressed
    101: "brgb8",  # Bayer pattern raw
    102: "monojpg",  # Grayscale JPEG compressed
    201: "jpg",  # Color JPEG compressed
    103: "jbrgb",  # Bayer JPEG compressed
    1: "monopng",  # Grayscale PNG compressed
    2: "png",  # Color PNG compressed
}

# Short codec name -> "imageFormatNNN" alias, where NNN is the numeric format
# code zero-padded to three digits (e.g. "monopng" -> "imageFormat001").
CODEC_ALIASES = {
    codec_name: f"imageFormat{format_code:03d}"
    for format_code, codec_name in IMAGE_FORMAT_CODES.items()
}

# Codecs whose frames are stored compressed (variable-length frames) ...
COMPRESSED_CODECS = {"monojpg", "jpg", "jbrgb", "monopng", "png"}
# ... and codecs whose frames are stored as raw pixels.
UNCOMPRESSED_CODECS = {"monoraw", "raw", "brgb8"}

# Size in bytes of the fixed file header; frame data starts right after it.
HEADER_SIZE = 1024
# Expected value of the uint32 at the very start of the file.
MAGIC = 0xFEED
@dataclass
class SeqHeader:
    """Metadata decoded from the fixed-size header of a .seq file.

    Instances are normally created with :meth:`from_file`; the field defaults
    only describe an empty 8-bit sequence.
    """

    magic: int = MAGIC
    name: str = "Norpix seq"
    version: int = 0
    header_size: int = HEADER_SIZE
    description: str = ""
    width: int = 0
    height: int = 0
    bit_depth: int = 8
    bit_depth_real: int = 8
    image_size_bytes: int = 0
    image_format: int = 100
    num_frames: int = 0
    true_image_size: int = 0
    fps: float = 30.0
    codec: str = ""

    @property
    def codec_name(self) -> str:
        """Short codec name for ``image_format``, or ``unknown(<code>)``."""
        try:
            return IMAGE_FORMAT_CODES[self.image_format]
        except KeyError:
            return f"unknown({self.image_format})"

    @property
    def is_compressed(self) -> bool:
        """True when the codec is one of ``COMPRESSED_CODECS``."""
        return self.codec_name in COMPRESSED_CODECS

    @property
    def num_channels(self) -> int:
        """Channel count: ``bit_depth`` divided by the per-channel depth.

        A ``bit_depth_real`` of 0 is treated as 8 bits per channel.
        """
        bits_per_channel = self.bit_depth_real if self.bit_depth_real else 8
        return self.bit_depth // bits_per_channel

    @classmethod
    def from_file(cls, f) -> SeqHeader:
        """Parse the header from an open binary file handle.

        The handle is rewound to offset 0 and ``HEADER_SIZE`` bytes are read.

        Raises:
            ValueError: If fewer than ``HEADER_SIZE`` bytes are available or
                the magic number does not match ``MAGIC``.
        """

        def ascii_text(code_units) -> str:
            # Strings are stored as uint16 code units; keep only non-NUL
            # 7-bit characters and trim surrounding whitespace.
            return "".join(chr(u) for u in code_units if 0 < u < 128).strip()

        f.seek(0)
        blob = f.read(HEADER_SIZE)
        if len(blob) < HEADER_SIZE:
            raise ValueError("File too small to contain a valid .seq header")

        # Bytes 0-3: magic number.
        (magic,) = struct.unpack_from("<I", blob, 0)
        if magic != MAGIC:
            raise ValueError(f"Invalid .seq magic: 0x{magic:08X} (expected 0x{MAGIC:08X})")

        # Bytes 4-23: name (10 uint16 chars); bytes 24-27 are padding.
        name = ascii_text(struct.unpack_from("<10H", blob, 4))
        # Bytes 28-35: version (int32) and header size (uint32).
        version, header_size = struct.unpack_from("<iI", blob, 28)
        # Bytes 36-547: description (256 uint16 chars).
        description = ascii_text(struct.unpack_from("<256H", blob, 36))

        # Bytes 548-583: nine consecutive uint32 fields; the eighth is
        # reserved and ignored.
        (
            width,
            height,
            bit_depth,
            bit_depth_real,
            image_size_bytes,
            image_format,
            num_frames,
            _reserved,
            true_image_size,
        ) = struct.unpack_from("<9I", blob, 548)

        # Bytes 584-591: suggested frame rate (float64).
        (fps,) = struct.unpack_from("<d", blob, 584)

        return cls(
            magic=magic,
            name=name,
            version=version,
            header_size=header_size,
            description=description,
            width=width,
            height=height,
            bit_depth=bit_depth,
            bit_depth_real=bit_depth_real,
            image_size_bytes=image_size_bytes,
            image_format=image_format,
            num_frames=num_frames,
            true_image_size=true_image_size,
            fps=fps,
            codec=f"imageFormat{image_format:03d}",
        )
@dataclass
class SeqIndex:
    """Frame seek index for a .seq file.

    For uncompressed formats, frame offsets are computed arithmetically.
    For compressed formats, a seek table must be built by scanning the file.
    """

    # Byte offset of the start of each frame record, indexed by frame number.
    offsets: list[int] = field(default_factory=list)
    # Number of addressable frames.
    num_frames: int = 0
    # Bytes of timestamp after the image data: 8 for version >= 5, 6 for older.
    timestamp_size: int = 8

    def frame_offset(self, frame: int) -> int:
        """Return the byte offset of frame number ``frame``.

        Raises:
            IndexError: If ``frame`` is outside ``[0, num_frames)``.
        """
        if frame < 0 or frame >= self.num_frames:
            raise IndexError(f"Frame {frame} out of range [0, {self.num_frames})")
        return self.offsets[frame]

    def save(self, path: str | Path) -> None:
        """Write the index to ``path`` as JSON."""
        data = {
            "num_frames": self.num_frames,
            "timestamp_size": self.timestamp_size,
            "offsets": self.offsets,
        }
        with open(path, "w") as f:
            json.dump(data, f)

    @classmethod
    def load(cls, path: str | Path) -> SeqIndex:
        """Read an index previously written by :meth:`save`.

        ``timestamp_size`` defaults to 8 when the key is absent.

        Raises:
            KeyError: If ``offsets`` or ``num_frames`` is missing.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        with open(path) as f:
            data = json.load(f)
        return cls(
            offsets=data["offsets"],
            num_frames=data["num_frames"],
            timestamp_size=data.get("timestamp_size", 8),
        )

    @classmethod
    def build_uncompressed(cls, header: SeqHeader) -> SeqIndex:
        """Build the index for uncompressed formats (constant frame stride).

        Frame ``i`` starts at ``HEADER_SIZE + i * header.true_image_size``.
        """
        stride = header.true_image_size
        return cls(
            offsets=[HEADER_SIZE + i * stride for i in range(header.num_frames)],
            num_frames=header.num_frames,
            timestamp_size=8 if header.version >= 5 else 6,
        )

    @classmethod
    def build_compressed(cls, f, header: SeqHeader) -> SeqIndex:
        """Build the index for compressed formats by scanning the file.

        This follows the MATLAB seqReaderPlugin seek-building logic:

        - Start at offset ``HEADER_SIZE``.
        - Read the 4-byte frame size (which includes those 4 bytes).
        - Skip the timestamp (8 bytes for version >= 5, else 6).
        - After the first frame only, probe for 8 bytes of padding following
          the timestamp; a zero uint32 at the expected next-frame position
          means padding is present for every frame.

        A frame is indexed only when its size field is fully readable, at
        least 4 and no larger than the file. Unlike a blind scan, this also
        applies to the first frame, so a file without frame data yields an
        empty index instead of one bogus frame.

        Args:
            f: Open file handle in binary read mode.
            header: Parsed SeqHeader.

        Returns:
            SeqIndex with the discovered frame offsets.
        """
        file_size = f.seek(0, 2)
        # The header's frame count caps the scan; 0 means "unknown".
        frame_limit = header.num_frames if header.num_frames > 0 else 10_000_000
        # Timestamp: 4 sec + 2 ms (+ 2 us for version >= 5).
        ts_size = 8 if header.version >= 5 else 6
        extra = ts_size  # Grows to ts_size + 8 if padding is detected

        def read_size(at: int) -> int | None:
            """Return the plausible frame size stored at ``at``, else None."""
            f.seek(at)
            raw = f.read(4)
            if len(raw) < 4:
                return None
            (size,) = struct.unpack("<I", raw)
            # The size includes its own 4-byte field, so < 4 is corrupt.
            if size < 4 or size > file_size:
                return None
            return size

        offsets: list[int] = []
        frame_size = read_size(HEADER_SIZE)
        if frame_size is not None:
            offsets.append(HEADER_SIZE)

        while frame_size is not None and len(offsets) < frame_limit:
            next_offset = offsets[-1] + frame_size + extra
            if next_offset >= file_size:
                break
            if len(offsets) == 1:
                # Only once, right after the first frame: detect padding.
                f.seek(next_offset)
                probe = f.read(4)
                if len(probe) >= 4 and struct.unpack("<I", probe)[0] == 0:
                    extra += 8
                    next_offset += 8
                    if next_offset >= file_size:
                        break
            # The size read here validates the candidate frame and is reused
            # on the next iteration, so each size field is read only once.
            frame_size = read_size(next_offset)
            if frame_size is not None:
                offsets.append(next_offset)

        return cls(
            offsets=offsets,
            num_frames=len(offsets),
            timestamp_size=ts_size,
        )
class SeqReader:
    """Reader for Norpix .seq video files.

    Supports both uncompressed (raw, monoraw) and compressed (jpg, monojpg,
    png, monopng) image formats. Implements the seek index cache matching the
    MATLAB seqReaderPlugin behavior.

    Usage:
        reader = SeqReader("video.seq")
        print(reader.header)
        frame = reader.get_frame(0)
        ts = reader.get_timestamp(0)
        reader.close()

    Or as a context manager:
        with SeqReader("video.seq") as reader:
            for i, frame in enumerate(reader):
                ...

    Attributes:
        header: Parsed SeqHeader with file metadata.
        index: SeqIndex with frame byte offsets.
    """

    def __init__(self, path: str | Path):
        """Open ``path``, parse its header and load or build the seek index.

        Raises:
            FileNotFoundError: If ``path`` does not exist.
        """
        self.path = Path(path)
        if not self.path.exists():
            raise FileNotFoundError(f"File not found: {self.path}")
        self._file = open(self.path, "rb")
        try:
            self.header = SeqHeader.from_file(self._file)
            self.index = self._load_or_build_index()
            # Recompute FPS from timestamps (MATLAB does this)
            self._recompute_fps()
        except BaseException:
            # Do not leave the handle open when construction fails.
            self._file.close()
            raise

    def _load_or_build_index(self) -> SeqIndex:
        """Load the cached index, or build one by scanning the file.

        Uncompressed files never use the cache. For compressed files a cached
        index is used only if it is readable and plausible for the current
        file; otherwise the file is rescanned and the cache rewritten.
        """
        if not self.header.is_compressed:
            return SeqIndex.build_uncompressed(self.header)

        cache_path = self.path.with_suffix(".seq-index.json")
        if cache_path.exists():
            try:
                cached = SeqIndex.load(cache_path)
                if self._cached_index_is_usable(cached):
                    return cached
            except (OSError, ValueError, KeyError, TypeError, AttributeError):
                # Unreadable or malformed cache: fall through and rebuild.
                pass

        idx = SeqIndex.build_compressed(self._file, self.header)
        try:
            idx.save(cache_path)
        except OSError:
            # Caching is best-effort (e.g. read-only directory).
            pass
        return idx

    def _cached_index_is_usable(self, idx: SeqIndex) -> bool:
        """Check that a cached index is non-empty and fits inside the file."""
        n = idx.num_frames
        if n <= 0 or n > len(idx.offsets):
            return False
        file_size = self.path.stat().st_size
        # The last frame needs at least its 4-byte size field inside the
        # file; a cache left over from a larger file fails this.
        return idx.offsets[0] >= HEADER_SIZE and idx.offsets[n - 1] + 4 <= file_size

    def _recompute_fps(self) -> None:
        """Recompute FPS from actual timestamps (first 100 frames).

        Intervals further than 5 ms from the median are discarded. The header
        FPS is left unchanged when fewer than two frames exist, no interval
        survives, or the mean interval is not a positive finite number (for
        example when all timestamps are identical).
        """
        n = min(100, len(self))
        if n < 2:
            return
        ts = np.array([self.get_timestamp_float(i) for i in range(n)])
        ds = np.diff(ts)
        ds = ds[np.abs(ds - np.median(ds)) < 0.005]
        if len(ds) == 0:
            return
        mean_dt = float(np.mean(ds))
        if 0 < mean_dt < float("inf"):
            self.header.fps = 1.0 / mean_dt

    @property
    def num_frames(self) -> int:
        """Number of frames in the index."""
        return self.index.num_frames

    def _normalize_index(self, frame: int) -> int:
        """Resolve a possibly negative frame index; raise if out of range."""
        if frame < 0:
            frame = self.num_frames + frame
        if frame < 0 or frame >= self.num_frames:
            raise IndexError(f"Frame {frame} out of range [0, {self.num_frames})")
        return frame

    def _read_frame_size(self, frame: int) -> int:
        """Read the uint32 size field of a compressed frame.

        Leaves the file positioned just after the size field.

        Raises:
            ValueError: If the stored size is below 4 (it includes itself).
        """
        self._file.seek(self.index.frame_offset(frame))
        (nbytes,) = struct.unpack("<I", self._file.read(4))
        if nbytes < 4:
            raise ValueError(f"Corrupt frame {frame}: invalid size field {nbytes}")
        return nbytes

    def _read_raw_frame(self, frame: int) -> tuple[bytes, int]:
        """Read raw frame bytes and return (data_bytes, offset_after_data).

        For uncompressed: reads image_size_bytes of raw pixel data.
        For compressed: reads the size field, then (size-4) bytes of data.

        Returns:
            Tuple of (raw_bytes, file_position_after_image_data).
        """
        if self.header.is_compressed:
            nbytes = self._read_frame_size(frame)
            data = self._file.read(nbytes - 4)
        else:
            self._file.seek(self.index.frame_offset(frame))
            data = self._file.read(self.header.image_size_bytes)
        return data, self._file.tell()

    def _timestamp_pos(self, frame: int) -> int:
        """Return the file position of a frame's timestamp.

        Computed without reading the image payload, so scanning timestamps
        does not pull the whole video through memory.
        """
        if self.header.is_compressed:
            return self.index.frame_offset(frame) + self._read_frame_size(frame)
        return self.index.frame_offset(frame) + self.header.image_size_bytes

    def _read_timestamp_at(self, pos: int) -> float:
        """Decode the timestamp stored at byte position ``pos``.

        Layout: uint32 seconds, uint16 milliseconds and, when the index's
        ``timestamp_size`` is 8, uint16 microseconds.
        """
        self._file.seek(pos)
        if self.index.timestamp_size == 8:
            ts_sec, ts_ms, ts_us = struct.unpack("<IHH", self._file.read(8))
            result = ts_sec + ts_ms / 1000.0
            result += ts_us / 1_000_000.0
            return result
        ts_sec, ts_ms = struct.unpack("<IH", self._file.read(6))
        return ts_sec + ts_ms / 1000.0

    def get_frame(self, frame: int) -> NDArray[np.uint8]:
        """Read and decode a single frame.

        Args:
            frame: Frame index (0-based). Negative indices count from the end.

        Returns:
            Decoded image as numpy array with shape (height, width) for
            grayscale or (height, width, 3) for color (RGB).

        Raises:
            IndexError: If ``frame`` is out of range.
        """
        data, _ = self._read_raw_frame(self._normalize_index(frame))
        return self._decode_frame(data)

    def _decode_frame(self, data: bytes) -> NDArray[np.uint8]:
        """Decode raw frame bytes into a numpy image array.

        Raises:
            ValueError: If the header's codec is not a known one.
        """
        codec = self.header.codec_name
        if codec in ("monoraw", "raw", "brgb8"):
            return self._decode_raw(data)
        elif codec in ("monojpg", "jpg", "jbrgb"):
            return self._decode_jpeg(data)
        elif codec in ("monopng", "png"):
            return self._decode_png(data)
        else:
            raise ValueError(f"Unsupported codec: {codec}")

    def _decode_raw(self, data: bytes) -> NDArray[np.uint8]:
        """Decode uncompressed raw pixel data.

        Single-channel data is reshaped to (height, width). Multi-channel data
        is reshaped to (height, width, channels) and the channel order is
        reversed (stored BGR -> returned RGB).
        """
        h, w = self.header.height, self.header.width
        nch = self.header.num_channels
        arr = np.frombuffer(data, dtype=np.uint8)
        if nch == 1:
            return arr.reshape(h, w)
        else:
            # Data stored as BGR interleaved
            arr = arr.reshape(h, w, nch)
            # Convert BGR -> RGB
            return arr[:, :, ::-1].copy()

    def _decode_jpeg(self, data: bytes) -> NDArray[np.uint8]:
        """Decode a JPEG compressed frame with Pillow."""
        if Image is None:
            raise ImportError("Pillow is required for JPEG decoding: pip install Pillow")
        img = Image.open(io.BytesIO(data))
        return np.array(img)

    def _decode_png(self, data: bytes) -> NDArray[np.uint8]:
        """Decode a PNG compressed frame with Pillow."""
        if Image is None:
            raise ImportError("Pillow is required for PNG decoding: pip install Pillow")
        img = Image.open(io.BytesIO(data))
        return np.array(img)

    def get_timestamp(self, frame: int) -> datetime.datetime:
        """Get the timestamp for a frame as a datetime object.

        Note: .seq files store local time, not UTC.
        """
        tfloat = self.get_timestamp_float(frame)
        return datetime.datetime.fromtimestamp(tfloat)

    def get_timestamp_float(self, frame: int) -> float:
        """Get the timestamp for a frame as seconds since epoch.

        Raises:
            IndexError: If ``frame`` is out of range.
        """
        frame = self._normalize_index(frame)
        return self._read_timestamp_at(self._timestamp_pos(frame))

    def get_timestamps_float(self) -> NDArray[np.float64]:
        """Get all frame timestamps as an array of seconds since epoch."""
        return np.array([self.get_timestamp_float(i) for i in range(self.num_frames)])

    def get_frame_with_timestamp(
        self, frame: int
    ) -> tuple[NDArray[np.uint8], float]:
        """Read a frame and its timestamp in a single pass over the record.

        Raises:
            IndexError: If ``frame`` is out of range.
        """
        data, pos = self._read_raw_frame(self._normalize_index(frame))
        img = self._decode_frame(data)
        return img, self._read_timestamp_at(pos)

    def to_mp4(
        self,
        output: str | Path | None = None,
        *,
        fps: float | None = None,
        codec: str = "libx264",
        crf: int = 25,
        preset: str = "superfast",
        pixelformat: str = "yuv420p",
        frame_range: tuple[int, int] | None = None,
        output_params: list[str] | None = None,
    ) -> Path:
        """Re-encode the .seq file to MP4 using ffmpeg (via sleap-io's VideoWriter).

        Args:
            output: Output MP4 path. Defaults to same name with .mp4 extension.
            fps: Output frame rate. Defaults to the source FPS.
            codec: Video codec (e.g., "libx264", "libx265"). Default: "libx264".
            crf: Constant rate factor (2-51, lower = higher quality). Default: 25.
            preset: Encoding speed preset (ultrafast, superfast, veryfast, faster,
                fast, medium, slow, slower, veryslow). Default: "superfast".
            pixelformat: Pixel format for output. Default: "yuv420p".
            frame_range: Optional (start, end) tuple for a subset of frames.
                Both indices are 0-based, end is exclusive. The range is
                clamped to the available frames; an inverted range selects
                no frames.
            output_params: Additional ffmpeg output parameters as a list of strings.

        Returns:
            Path to the output MP4 file.

        Raises:
            ImportError: If sleap-io is not installed.
        """
        try:
            from sleap_io.io.video_writing import VideoWriter
        except ImportError as err:
            raise ImportError(
                "sleap-io is required for MP4 encoding: "
                "uv pip install sleap-io"
            ) from err
        import time

        if output is None:
            output = self.path.with_suffix(".mp4")
        output = Path(output)
        if fps is None:
            fps = self.header.fps

        start = 0
        end = self.num_frames
        if frame_range is not None:
            start, end = frame_range
            start = max(0, start)
            # Never let the count go negative for inverted/out-of-range input.
            end = max(start, min(end, self.num_frames))
        total = end - start

        writer = VideoWriter(
            filename=output,
            fps=fps,
            codec=codec,
            crf=crf,
            preset=preset,
            pixelformat=pixelformat,
            output_params=output_params or [],
        )
        t0 = time.perf_counter()
        with writer:
            for i in range(start, end):
                frame = self.get_frame(i)
                # h264 with yuv420p requires 3-channel input. Decide from the
                # decoded frame itself: a compressed frame's channel count
                # need not agree with the header's bit-depth fields.
                if frame.ndim == 2:
                    frame = np.stack([frame, frame, frame], axis=-1)
                writer(frame)
                done = i - start + 1
                if done % 5000 == 0 or done == total:
                    elapsed = time.perf_counter() - t0
                    speed = done / elapsed if elapsed > 0 else 0
                    eta = (total - done) / speed if speed > 0 else 0
                    print(
                        f" [{done}/{total}] "
                        f"{speed:.0f} fps, "
                        f"ETA {eta:.0f}s"
                    )
        elapsed = time.perf_counter() - t0
        rate = total / elapsed if elapsed > 0 else 0
        print(
            f"Wrote {total} frames to {output} "
            f"in {elapsed:.1f}s ({rate:.0f} fps)"
        )
        return output

    def __len__(self) -> int:
        return self.num_frames

    def __getitem__(self, idx: int | slice) -> NDArray[np.uint8]:
        """Return one frame (int index) or a stacked batch (slice).

        An empty slice yields an array with a leading dimension of 0 whose
        remaining dimensions come from the header.
        """
        if isinstance(idx, slice):
            frames = [
                self.get_frame(i) for i in range(*idx.indices(self.num_frames))
            ]
            if not frames:
                # np.stack() rejects an empty list; build the empty batch.
                h, w = self.header.height, self.header.width
                nch = self.header.num_channels
                shape = (0, h, w) if nch <= 1 else (0, h, w, nch)
                return np.empty(shape, dtype=np.uint8)
            return np.stack(frames)
        return self.get_frame(idx)

    def __iter__(self) -> Iterator[NDArray[np.uint8]]:
        for i in range(self.num_frames):
            yield self.get_frame(i)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def close(self) -> None:
        """Close the underlying file handle (safe to call repeatedly)."""
        if hasattr(self, "_file") and self._file and not self._file.closed:
            self._file.close()

    def __del__(self):
        self.close()

    def __repr__(self) -> str:
        return (
            f"SeqReader('{self.path.name}')\n"
            f" Frames: {self.num_frames}\n"
            f" Size: {self.header.width}x{self.header.height}\n"
            f" Codec: {self.header.codec_name} ({self.header.codec})\n"
            f" FPS: {self.header.fps:.2f}\n"
            f" Bit depth: {self.header.bit_depth_real}"
        )
def _fmt_size(nbytes: int | float) -> str:
"""Format a byte count as a human-readable string."""
for unit in ("B", "KB", "MB", "GB", "TB"):
if abs(nbytes) < 1024:
return f"{nbytes:.1f} {unit}" if unit != "B" else f"{int(nbytes)} B"
nbytes /= 1024
return f"{nbytes:.1f} PB"
def _print_input_stats(reader: SeqReader) -> None:
    """Print a summary of the input .seq file and a short decode benchmark.

    The benchmark decodes up to the first 100 frames. A file with no frames
    prints the summary without timestamps and skips the benchmark instead of
    raising ``IndexError``.
    """
    import time

    h = reader.header
    n_frames = reader.num_frames
    file_size = os.path.getsize(reader.path)
    bytes_per_pixel = h.bit_depth // 8
    raw_total = h.width * h.height * bytes_per_pixel * n_frames
    duration = n_frames / h.fps if h.fps > 0 else 0

    # Timestamps only exist when there is at least one frame.
    if n_frames > 0:
        ts_first = reader.get_timestamp(0)
        ts_last = reader.get_timestamp(-1)
        timestamps = f"{ts_first} -> {ts_last}"
    else:
        timestamps = "n/a (no frames)"

    print(f"Input: {reader.path}")
    print(f" File size: {_fmt_size(file_size)}")
    print(f" Frames: {n_frames}")
    print(f" Resolution: {h.width} x {h.height}")
    print(f" Bit depth: {h.bit_depth_real}-bit")
    print(f" Codec: {h.codec_name} ({h.codec})")
    print(f" FPS: {h.fps:.2f}")
    print(f" Duration: {duration:.1f}s ({duration / 60:.1f} min)")
    print(f" Timestamps: {timestamps}")
    print(f" Raw size: {_fmt_size(raw_total)} "
          f"({h.width}x{h.height}x{bytes_per_pixel} x {n_frames})")
    if raw_total > 0:
        seq_ratio = file_size / raw_total
        print(f" Seq ratio: {seq_ratio:.3f}x ({_fmt_size(file_size)} / {_fmt_size(raw_total)})")

    # Benchmark
    n_bench = min(100, n_frames)
    if n_bench == 0:
        return
    t0 = time.perf_counter()
    for i in range(n_bench):
        _ = reader.get_frame(i)
    elapsed = time.perf_counter() - t0
    # Guard the division in case the timer reports no elapsed time.
    rate = n_bench / elapsed if elapsed > 0 else float("inf")
    print(f" Decode speed: {rate:.0f} fps ({n_bench} frames in {elapsed:.3f}s)")
def _print_output_stats(
    reader: SeqReader, output: Path, frame_range: tuple[int, int] | None
) -> None:
    """Print stats about the output MP4 and verify its frame count.

    Args:
        reader: The reader the MP4 was produced from.
        output: Path of the written MP4 file.
        frame_range: The (start, end) range requested for the conversion, or
            None for all frames. It is clamped to the available frames, so
            the expected count stays correct when the requested end lies
            past the last frame.
    """
    h = reader.header
    if frame_range:
        start = max(0, frame_range[0])
        end = max(start, min(frame_range[1], reader.num_frames))
    else:
        start, end = 0, reader.num_frames
    n_frames = end - start

    out_size = os.path.getsize(output)
    raw_frame_bytes = h.width * h.height * (h.bit_depth // 8)
    raw_total = raw_frame_bytes * n_frames
    duration = n_frames / h.fps if h.fps > 0 else 0

    # Compute input size for the same frame range
    if h.is_compressed:
        # For compressed, use the distance between indexed frame offsets;
        # a range that reaches the last frame runs to the end of the file.
        offsets = reader.index.offsets
        if n_frames == 0 or start >= len(offsets):
            seq_range_size = 0
        elif end < len(offsets):
            seq_range_size = offsets[end] - offsets[start]
        else:
            seq_range_size = os.path.getsize(reader.path) - offsets[start]
    else:
        seq_range_size = h.true_image_size * n_frames

    print(f"\nOutput: {output}")
    print(f" File size: {_fmt_size(out_size)}")
    print(f" Frames: {n_frames}" + (
        f" [{start}:{end}]" if frame_range else ""))
    print(f" Duration: {duration:.1f}s ({duration / 60:.1f} min)")
    if raw_total > 0:
        mp4_ratio = out_size / raw_total
        print(f" vs raw: {mp4_ratio:.4f}x ({_fmt_size(out_size)} / {_fmt_size(raw_total)})")
    if seq_range_size > 0:
        vs_seq = out_size / seq_range_size
        print(f" vs seq: {vs_seq:.3f}x ({_fmt_size(out_size)} / {_fmt_size(seq_range_size)})")

    # Verify frame count. This is best-effort: a failure to reopen the video
    # is reported but must not abort the run.
    try:
        import sleap_io as sio

        vid = sio.Video(str(output))
        n_written = len(vid)
    except Exception as exc:
        print(f" Verification: skipped ({exc})")
    else:
        status = "OK" if n_written == n_frames else f"MISMATCH, expected {n_frames}"
        print(f" Verification: {n_written} frames ({status})")
def main():
    """Command-line entry point: print file info and optionally convert to MP4.

    Input statistics are always printed. With ``--to-mp4`` the file (or the
    ``--start``/``--end`` range) is re-encoded and output statistics follow.
    An empty frame range is rejected with a usage error.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Read and convert Norpix .seq video files.",
        epilog="Examples:\n"
        " uv run -p 3.12 seq_reader.py video.seq # Print info\n"
        " uv run -p 3.12 seq_reader.py video.seq --to-mp4 # Convert to MP4\n"
        " uv run -p 3.12 seq_reader.py video.seq --to-mp4 --crf 18 # High quality\n"
        " uv run -p 3.12 seq_reader.py video.seq --to-mp4 -o out.mp4 # Custom output\n",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("input", help="Path to the .seq file")
    parser.add_argument(
        "--to-mp4", action="store_true", help="Convert to MP4"
    )
    parser.add_argument("-o", "--output", help="Output MP4 path (default: <input>.mp4)")
    parser.add_argument("--fps", type=float, help="Output frame rate (default: source FPS)")
    parser.add_argument(
        "--codec", default="libx264", help="Video codec (default: libx264)"
    )
    parser.add_argument(
        "--crf", type=int, default=25,
        help="Constant rate factor, 2-51, lower=better (default: 25)",
    )
    parser.add_argument(
        "--preset", default="superfast",
        help="Encoding preset (default: superfast)",
    )
    parser.add_argument(
        "--pixelformat", default="yuv420p",
        help="Pixel format (default: yuv420p)",
    )
    parser.add_argument("--start", type=int, default=0, help="Start frame (default: 0)")
    parser.add_argument("--end", type=int, default=None, help="End frame, exclusive (default: all)")
    args = parser.parse_args()

    with SeqReader(args.input) as reader:
        _print_input_stats(reader)
        if args.to_mp4:
            frame_range = None
            if args.start != 0 or args.end is not None:
                # Compare against None explicitly: `--end 0` is a real value
                # and must not be mistaken for "not given".
                end = reader.num_frames if args.end is None else args.end
                if end <= args.start:
                    parser.error(
                        f"empty frame range: --start {args.start} "
                        f"must be less than --end {end}"
                    )
                frame_range = (args.start, end)
            print()
            out = reader.to_mp4(
                output=args.output,
                fps=args.fps,
                codec=args.codec,
                crf=args.crf,
                preset=args.preset,
                pixelformat=args.pixelformat,
                frame_range=frame_range,
            )
            _print_output_stats(reader, out, frame_range)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment