Created
March 6, 2026 20:06
-
-
Save talmo/1841b3aed5d513686ae38f845b4d2d24 to your computer and use it in GitHub Desktop.
Pure Python reader and converter for Norpix .seq video files (Piotr Dollar SeqIo format). Supports compressed (JPEG/PNG) and uncompressed formats with seek index caching, MP4 re-encoding via sleap-io, and CLI interface. Self-contained with PEP 723 inline dependencies — just uv run it.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = [ | |
| # "numpy", | |
| # "Pillow", | |
| # "imageio-ffmpeg", | |
| # "sleap-io", | |
| # ] | |
| # /// | |
| """Pure Python reader and converter for Norpix .seq video files. | |
| This implements the seq file format used by StreamPix / Norpix and Piotr Dollar's | |
| MATLAB SeqIo toolbox. Supports both uncompressed (raw) and compressed (JPEG, PNG) | |
| image formats, with optional re-encoding to MP4 via ffmpeg. | |
| Seq File Format | |
| --------------- | |
| The .seq format stores a sequence of image frames with a fixed 1024-byte header | |
| followed by concatenated frame data. It was originally designed by Norpix for their | |
| StreamPix recording software and later adopted by Piotr Dollar's Computer Vision | |
| MATLAB Toolbox. | |
| Header layout (1024 bytes, little-endian): | |
| Offset Size Type Field | |
| 0 4 uint32 Magic number (0xFEED) | |
| 4 20 10×uint16 Name string ("Norpix seq") | |
| 24 4 - Padding | |
| 28 4 int32 Version (>= 5 for StreamPix 6+) | |
| 32 4 uint32 Header size (always 1024) | |
| 36 512 256×uint16 Description string | |
| 548 4 uint32 Image width | |
| 552 4 uint32 Image height | |
| 556 4 uint32 Bit depth | |
| 560 4 uint32 Bit depth (real) | |
| 564 4 uint32 Image size in bytes | |
| 568 4 uint32 Image format code (see below) | |
| 572 4 uint32 Number of allocated frames | |
| 576 4 uint32 Reserved (0) | |
| 580 4 uint32 True image size (stride per frame, uncompressed) | |
| 584 8 float64 Suggested frame rate | |
| Image format codes: | |
| 100 = monoraw Grayscale uncompressed | |
| 200 = raw Color BGR uncompressed | |
| 101 = brgb8 Bayer pattern raw | |
| 102 = monojpg Grayscale JPEG compressed | |
| 201 = jpg Color JPEG compressed | |
| 103 = jbrgb Bayer JPEG compressed | |
| 1 = monopng Grayscale PNG compressed | |
| 2 = png Color PNG compressed | |
| Frame layout (compressed formats): | |
| 4 bytes uint32 Frame size (includes these 4 bytes) | |
| N-4 bytes Encoded image data (JPEG or PNG) | |
| 4 bytes uint32 Timestamp: seconds since epoch | |
| 2 bytes uint16 Timestamp: milliseconds | |
| 2 bytes uint16 Timestamp: microseconds (version >= 5 only) | |
| [0 or 8] Optional padding (auto-detected) | |
| Frame layout (uncompressed formats): | |
| Each frame occupies exactly `true_image_size` bytes at a fixed stride from | |
| the header. Image data is followed by the same timestamp fields. | |
| Seek index: | |
| For compressed formats, frames have variable sizes, so a seek index mapping | |
| frame number -> byte offset must be built by scanning the file sequentially. | |
| This index is cached to `<filename>.seq-index.json` for fast subsequent loads. | |
| This mirrors the MATLAB `-seek.mat` caching behavior. | |
| Dependencies | |
| ------------ | |
| All dependencies are declared inline via PEP 723 and resolved automatically | |
| by ``uv run``. No manual installation needed. | |
| - numpy: Frame data as arrays | |
| - Pillow: JPEG/PNG decoding | |
| - imageio-ffmpeg: MP4 encoding (only needed for ``to_mp4()``) | |
| - sleap-io: Video writing backend for MP4 encoding | |
| Usage (CLI) | |
| ----------- | |
| All commands use ``uv run -p <python-version>`` which auto-installs | |
| dependencies in an isolated environment based on the PEP 723 inline metadata | |
| at the top of this script. No venv setup or pip install needed. | |
| # Print file info and benchmark | |
| uv run -p 3.12 seq_reader.py video.seq | |
| # Convert to MP4 | |
| uv run -p 3.12 seq_reader.py video.seq --to-mp4 | |
| # Convert with options | |
| uv run -p 3.12 seq_reader.py video.seq --to-mp4 -o output.mp4 --crf 18 --codec libx265 | |
| # Convert a range of frames | |
| uv run -p 3.12 seq_reader.py video.seq --to-mp4 -o tmp.mp4 --start 0 --end 1000 | |
| Usage (Python API) | |
| ------------------ | |
| When importing as a module, ensure the dependencies are installed in your | |
| environment (e.g., ``uv add numpy Pillow imageio-ffmpeg sleap-io``). | |
| from seq_reader import SeqReader | |
| # Read frames | |
| with SeqReader("video.seq") as reader: | |
| print(reader) # Summary info | |
| frame = reader.get_frame(0) # Single frame -> (H, W) uint8 | |
| frame = reader[100] # Indexing | |
| batch = reader[10:20] # Slicing -> (N, H, W) uint8 | |
| ts = reader.get_timestamp(0) # datetime object | |
| ts_f = reader.get_timestamp_float(0) # Seconds since epoch | |
| img, ts = reader.get_frame_with_timestamp(0) # Both at once | |
| # Convert to MP4 | |
| with SeqReader("video.seq") as reader: | |
| reader.to_mp4("output.mp4") # Default settings | |
| reader.to_mp4("output.mp4", crf=18) # Higher quality | |
| reader.to_mp4("output.mp4", codec="libx265") # HEVC | |
| reader.to_mp4("out.mp4", frame_range=(0, 100)) # First 100 frames | |
| References | |
| ---------- | |
| - https://github.com/pdollar/toolbox/blob/master/videos/seqReaderPlugin.m | |
| - https://github.com/pdollar/toolbox/blob/master/videos/seqWriterPlugin.m | |
| - https://github.com/soft-matter/pims/blob/main/pims/norpix_reader.py | |
| - https://github.com/nclack/seq | |
| """ | |
| from __future__ import annotations | |
| import datetime | |
| import io | |
| import json | |
| import os | |
| import struct | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Iterator | |
| import numpy as np | |
| from numpy.typing import NDArray | |
| try: | |
| from PIL import Image | |
| except ImportError: | |
| Image = None | |
# Image format codec mapping (from SeqIo MATLAB toolbox).
# The integer codes are stored in the header's image-format field (offset 568).
IMAGE_FORMAT_CODES = {
    100: "monoraw",  # Grayscale uncompressed
    200: "raw",      # Color BGR uncompressed
    101: "brgb8",    # Bayer pattern raw
    102: "monojpg",  # Grayscale JPEG compressed
    201: "jpg",      # Color JPEG compressed
    103: "jbrgb",    # Bayer JPEG compressed
    1: "monopng",    # Grayscale PNG compressed
    2: "png",        # Color PNG compressed
}
# Codec name aliases: short name -> MATLAB-style "imageFormatNNN" identifier.
CODEC_ALIASES = {
    "monoraw": "imageFormat100",
    "raw": "imageFormat200",
    "brgb8": "imageFormat101",
    "monojpg": "imageFormat102",
    "jpg": "imageFormat201",
    "jbrgb": "imageFormat103",
    "monopng": "imageFormat001",
    "png": "imageFormat002",
}
# Which codecs use compression (variable-length frames, need a seek index scan)
COMPRESSED_CODECS = {"monojpg", "jpg", "jbrgb", "monopng", "png"}
UNCOMPRESSED_CODECS = {"monoraw", "raw", "brgb8"}
# Fixed .seq header size in bytes, and the file magic number at offset 0.
HEADER_SIZE = 1024
MAGIC = 0xFEED
@dataclass
class SeqHeader:
    """Parsed 1024-byte header of a Norpix .seq file.

    Field defaults mirror a freshly-initialized header; real values are
    populated by :meth:`from_file`.
    """

    magic: int = MAGIC
    name: str = "Norpix seq"
    version: int = 0
    header_size: int = HEADER_SIZE
    description: str = ""
    width: int = 0
    height: int = 0
    bit_depth: int = 8
    bit_depth_real: int = 8
    image_size_bytes: int = 0
    image_format: int = 100
    num_frames: int = 0
    true_image_size: int = 0
    fps: float = 30.0
    codec: str = ""

    @property
    def codec_name(self) -> str:
        """Human-readable codec name."""
        fallback = f"unknown({self.image_format})"
        return IMAGE_FORMAT_CODES.get(self.image_format, fallback)

    @property
    def is_compressed(self) -> bool:
        """Whether frames use variable-length compression."""
        return self.codec_name in COMPRESSED_CODECS

    @property
    def num_channels(self) -> int:
        """Number of color channels."""
        return self.bit_depth // (self.bit_depth_real or 8)

    @classmethod
    def from_file(cls, f) -> SeqHeader:
        """Read and parse the header from an open file handle."""
        f.seek(0)
        raw = f.read(HEADER_SIZE)
        if len(raw) < HEADER_SIZE:
            raise ValueError("File too small to contain a valid .seq header")

        def decode_chars(units: tuple[int, ...]) -> str:
            # Strings are stored as uint16 code units; keep printable
            # ASCII and drop NULs / out-of-range values.
            return "".join(chr(u) for u in units if 0 < u < 128).strip()

        # Magic number (bytes 0-3) must match the Norpix constant.
        magic = struct.unpack_from("<I", raw, 0)[0]
        if magic != MAGIC:
            raise ValueError(f"Invalid .seq magic: 0x{magic:08X} (expected 0x{MAGIC:08X})")
        # Name string (bytes 4-23, 10 uint16 chars).
        name = decode_chars(struct.unpack_from("<10H", raw, 4))
        # Bytes 24-27 are padding. Version and header size follow (28-35).
        version, header_size = struct.unpack_from("<iI", raw, 28)
        # Description (bytes 36-547, 256 uint16 chars).
        description = decode_chars(struct.unpack_from("<256H", raw, 36))
        # Nine consecutive uint32 fields (bytes 548-583); the eighth is
        # reserved and intentionally discarded.
        (
            width,
            height,
            bit_depth,
            bit_depth_real,
            image_size_bytes,
            image_format,
            num_frames,
            _reserved,
            true_image_size,
        ) = struct.unpack_from("<9I", raw, 548)
        # Suggested frame rate (bytes 584-591, float64).
        fps = struct.unpack_from("<d", raw, 584)[0]
        return cls(
            magic=magic,
            name=name,
            version=version,
            header_size=header_size,
            description=description,
            width=width,
            height=height,
            bit_depth=bit_depth,
            bit_depth_real=bit_depth_real,
            image_size_bytes=image_size_bytes,
            image_format=image_format,
            num_frames=num_frames,
            true_image_size=true_image_size,
            fps=fps,
            codec=f"imageFormat{image_format:03d}",
        )
@dataclass
class SeqIndex:
    """Frame seek index for a .seq file.

    For uncompressed formats, frame offsets are computed arithmetically.
    For compressed formats, a seek table must be built by scanning the file.
    """

    # Byte offset of the start of each frame record within the file.
    offsets: list[int] = field(default_factory=list)
    num_frames: int = 0
    timestamp_size: int = 8  # 8 bytes for version >= 5, 6 for older

    def frame_offset(self, frame: int) -> int:
        """Get the byte offset for a given frame number."""
        if not 0 <= frame < self.num_frames:
            raise IndexError(f"Frame {frame} out of range [0, {self.num_frames})")
        return self.offsets[frame]

    def save(self, path: str | Path) -> None:
        """Save the seek index to a JSON file."""
        payload = {
            "num_frames": self.num_frames,
            "timestamp_size": self.timestamp_size,
            "offsets": self.offsets,
        }
        with open(path, "w") as fh:
            json.dump(payload, fh)

    @classmethod
    def load(cls, path: str | Path) -> SeqIndex:
        """Load a seek index from a JSON file."""
        with open(path) as fh:
            payload = json.load(fh)
        return cls(
            offsets=payload["offsets"],
            num_frames=payload["num_frames"],
            timestamp_size=payload.get("timestamp_size", 8),
        )

    @classmethod
    def build_uncompressed(cls, header: SeqHeader) -> SeqIndex:
        """Build index for uncompressed formats (constant frame stride)."""
        stride = header.true_image_size
        return cls(
            offsets=[HEADER_SIZE + i * stride for i in range(header.num_frames)],
            num_frames=header.num_frames,
            timestamp_size=8 if header.version >= 5 else 6,
        )

    @classmethod
    def build_compressed(cls, f, header: SeqHeader) -> SeqIndex:
        """Build index for compressed formats by scanning the file.

        This replicates the MATLAB seqReaderPlugin seek-building logic:
        start at offset 1024; read the 4-byte frame size; advance past the
        image data plus `extra` timestamp bytes (8 for version >= 5, else
        6); while locating the second frame, probe for 8 additional bytes
        of zero padding after the timestamp.

        Args:
            f: Open file handle in binary read mode.
            header: Parsed SeqHeader.

        Returns:
            SeqIndex with computed frame offsets.
        """
        file_size = f.seek(0, 2)
        # Cap the scan at the declared frame count; fall back to a large
        # bound when the header does not declare one.
        limit = header.num_frames if header.num_frames > 0 else 10_000_000
        # Timestamp trailer: 4 bytes seconds + 2 bytes ms (+ 2 bytes us v5+).
        ts_size = 8 if header.version >= 5 else 6
        extra = ts_size  # May grow to ts_size + 8 if padding is detected.
        offsets = [HEADER_SIZE]
        while len(offsets) < limit:
            current = offsets[-1]
            f.seek(current)
            size_field = f.read(4)
            if len(size_field) < 4:
                break
            frame_size = struct.unpack("<I", size_field)[0]
            candidate = current + frame_size + extra
            if candidate >= file_size:
                break
            # While locating the second frame, detect 8 bytes of zero
            # padding after the timestamp and widen the stride if found.
            if len(offsets) == 1:
                f.seek(candidate)
                probe = f.read(4)
                if len(probe) >= 4 and struct.unpack("<I", probe)[0] == 0:
                    extra += 8
                    candidate += 8
                    if candidate >= file_size:
                        break
            # Validate: the candidate offset must hold a plausible size.
            f.seek(candidate)
            check = f.read(4)
            if len(check) < 4:
                break
            check_size = struct.unpack("<I", check)[0]
            if check_size == 0 or check_size > file_size:
                break
            offsets.append(candidate)
        return cls(
            offsets=offsets,
            num_frames=len(offsets),
            timestamp_size=ts_size,
        )
class SeqReader:
    """Reader for Norpix .seq video files.

    Supports both uncompressed (raw, monoraw) and compressed (jpg, monojpg, png,
    monopng) image formats. Implements the seek index cache matching the MATLAB
    seqReaderPlugin behavior.

    Usage:
        reader = SeqReader("video.seq")
        print(reader.header)
        frame = reader.get_frame(0)
        ts = reader.get_timestamp(0)
        reader.close()

    Or as a context manager:
        with SeqReader("video.seq") as reader:
            for i, frame in enumerate(reader):
                ...

    Attributes:
        header: Parsed SeqHeader with file metadata.
        index: SeqIndex with frame byte offsets.
    """

    def __init__(self, path: str | Path):
        """Open the file, parse its header, and load/build the seek index.

        Args:
            path: Path to the .seq file.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If the header is truncated or the magic is wrong
                (raised by SeqHeader.from_file).
        """
        self.path = Path(path)
        if not self.path.exists():
            raise FileNotFoundError(f"File not found: {self.path}")
        # Handle stays open for the reader's lifetime; released in close().
        self._file = open(self.path, "rb")
        self.header = SeqHeader.from_file(self._file)
        self.index = self._load_or_build_index()
        # Recompute FPS from timestamps (MATLAB does this)
        self._recompute_fps()

    def _load_or_build_index(self) -> SeqIndex:
        """Load cached index or build one by scanning the file."""
        # Uncompressed frames have a constant stride: no scan needed.
        if not self.header.is_compressed:
            return SeqIndex.build_uncompressed(self.header)
        # Check for cached index
        # NOTE(review): the cache is not validated against file size/mtime,
        # so a stale cache could be used if the .seq file is rewritten.
        cache_path = self.path.with_suffix(".seq-index.json")
        if cache_path.exists():
            try:
                idx = SeqIndex.load(cache_path)
                if idx.num_frames > 0:
                    return idx
            except (json.JSONDecodeError, KeyError):
                # Corrupt/partial cache: fall through and rebuild.
                pass
        # Build by scanning
        idx = SeqIndex.build_compressed(self._file, self.header)
        # Cache for next time; best-effort (e.g. read-only directories).
        try:
            idx.save(cache_path)
        except OSError:
            pass
        return idx

    def _recompute_fps(self) -> None:
        """Recompute FPS from actual timestamps (first 100 frames)."""
        n = min(100, len(self))
        if n < 2:
            # Need at least two timestamps to measure a frame interval.
            return
        ts = np.array([self.get_timestamp_float(i) for i in range(n)])
        ds = np.diff(ts)
        median_ds = np.median(ds)
        # Filter outliers: keep only deltas within 5 ms of the median gap
        # (drops dropped-frame gaps and clock glitches).
        ds = ds[np.abs(ds - median_ds) < 0.005]
        if len(ds) > 0:
            self.header.fps = 1.0 / np.mean(ds)

    @property
    def num_frames(self) -> int:
        # Frame count comes from the index (for compressed files this is
        # the number of frames actually found, not the header's claim).
        return self.index.num_frames

    def _read_raw_frame(self, frame: int) -> tuple[bytes, int]:
        """Read raw frame bytes and return (data_bytes, offset_after_data).

        For uncompressed: reads image_size_bytes of raw pixel data.
        For compressed: reads the size field, then (size-4) bytes of encoded data.

        Returns:
            Tuple of (raw_bytes, file_position_after_image_data).
        """
        offset = self.index.frame_offset(frame)
        self._file.seek(offset)
        if self.header.is_compressed:
            # The 4-byte size field counts itself, hence the -4.
            nbytes = struct.unpack("<I", self._file.read(4))[0]
            data = self._file.read(nbytes - 4)
            return data, self._file.tell()
        else:
            data = self._file.read(self.header.image_size_bytes)
            return data, self._file.tell()

    def get_frame(self, frame: int) -> NDArray[np.uint8]:
        """Read and decode a single frame.

        Args:
            frame: Frame index (0-based). Negative indices count from the end.

        Returns:
            Decoded image as numpy array with shape (height, width) for grayscale
            or (height, width, 3) for color (RGB).

        Raises:
            IndexError: If the (normalized) index is out of range.
        """
        if frame < 0:
            frame = self.num_frames + frame
        if frame < 0 or frame >= self.num_frames:
            raise IndexError(f"Frame {frame} out of range [0, {self.num_frames})")
        data, _ = self._read_raw_frame(frame)
        return self._decode_frame(data)

    def _decode_frame(self, data: bytes) -> NDArray[np.uint8]:
        """Decode raw frame bytes into a numpy image array."""
        # Dispatch on the codec family recorded in the header.
        codec = self.header.codec_name
        if codec in ("monoraw", "raw", "brgb8"):
            return self._decode_raw(data)
        elif codec in ("monojpg", "jpg", "jbrgb"):
            return self._decode_jpeg(data)
        elif codec in ("monopng", "png"):
            return self._decode_png(data)
        else:
            raise ValueError(f"Unsupported codec: {codec}")

    def _decode_raw(self, data: bytes) -> NDArray[np.uint8]:
        """Decode uncompressed raw pixel data."""
        # NOTE(review): assumes 8-bit samples (uint8); a 16-bit monoraw file
        # would need dtype handling here — confirm against real data.
        h, w = self.header.height, self.header.width
        nch = self.header.num_channels
        arr = np.frombuffer(data, dtype=np.uint8)
        if nch == 1:
            return arr.reshape(h, w)
        else:
            # Data stored as BGR interleaved
            arr = arr.reshape(h, w, nch)
            # Convert BGR -> RGB (copy so the buffer is contiguous/writable)
            return arr[:, :, ::-1].copy()

    def _decode_jpeg(self, data: bytes) -> NDArray[np.uint8]:
        """Decode JPEG compressed frame."""
        if Image is None:
            raise ImportError("Pillow is required for JPEG decoding: pip install Pillow")
        img = Image.open(io.BytesIO(data))
        return np.array(img)

    def _decode_png(self, data: bytes) -> NDArray[np.uint8]:
        """Decode PNG compressed frame."""
        if Image is None:
            raise ImportError("Pillow is required for PNG decoding: pip install Pillow")
        img = Image.open(io.BytesIO(data))
        return np.array(img)

    def get_timestamp(self, frame: int) -> datetime.datetime:
        """Get the timestamp for a frame as a datetime object.

        Note: .seq files store local time, not UTC.
        """
        tfloat = self.get_timestamp_float(frame)
        # Naive datetime in local time, matching the recorder's clock.
        return datetime.datetime.fromtimestamp(tfloat)

    def get_timestamp_float(self, frame: int) -> float:
        """Get the timestamp for a frame as seconds since epoch."""
        if frame < 0:
            frame = self.num_frames + frame
        if frame < 0 or frame >= self.num_frames:
            raise IndexError(f"Frame {frame} out of range [0, {self.num_frames})")
        # The timestamp trailer sits directly after the image data, so we
        # read (and discard) the frame to locate it.
        _, pos_after_data = self._read_raw_frame(frame)
        self._file.seek(pos_after_data)
        ts_sec = struct.unpack("<I", self._file.read(4))[0]
        ts_ms = struct.unpack("<H", self._file.read(2))[0]
        result = ts_sec + ts_ms / 1000.0
        if self.index.timestamp_size == 8:
            # Version >= 5 files carry an extra microseconds field.
            ts_us = struct.unpack("<H", self._file.read(2))[0]
            result += ts_us / 1_000_000.0
        return result

    def get_timestamps_float(self) -> NDArray[np.float64]:
        """Get all frame timestamps as an array of seconds since epoch."""
        return np.array([self.get_timestamp_float(i) for i in range(self.num_frames)])

    def get_frame_with_timestamp(
        self, frame: int
    ) -> tuple[NDArray[np.uint8], float]:
        """Read a frame and its timestamp in a single seek operation."""
        if frame < 0:
            frame = self.num_frames + frame
        if frame < 0 or frame >= self.num_frames:
            raise IndexError(f"Frame {frame} out of range [0, {self.num_frames})")
        data, pos = self._read_raw_frame(frame)
        img = self._decode_frame(data)
        # Timestamp trailer follows the image data; same layout as in
        # get_timestamp_float().
        self._file.seek(pos)
        ts_sec = struct.unpack("<I", self._file.read(4))[0]
        ts_ms = struct.unpack("<H", self._file.read(2))[0]
        ts = ts_sec + ts_ms / 1000.0
        if self.index.timestamp_size == 8:
            ts_us = struct.unpack("<H", self._file.read(2))[0]
            ts += ts_us / 1_000_000.0
        return img, ts

    def to_mp4(
        self,
        output: str | Path | None = None,
        *,
        fps: float | None = None,
        codec: str = "libx264",
        crf: int = 25,
        preset: str = "superfast",
        pixelformat: str = "yuv420p",
        frame_range: tuple[int, int] | None = None,
        output_params: list[str] | None = None,
    ) -> Path:
        """Re-encode the .seq file to MP4 using ffmpeg (via sleap-io's VideoWriter).

        Args:
            output: Output MP4 path. Defaults to same name with .mp4 extension.
            fps: Output frame rate. Defaults to the source FPS.
            codec: Video codec (e.g., "libx264", "libx265"). Default: "libx264".
            crf: Constant rate factor (2-51, lower = higher quality). Default: 25.
            preset: Encoding speed preset (ultrafast, superfast, veryfast, faster,
                fast, medium, slow, slower, veryslow). Default: "superfast".
            pixelformat: Pixel format for output. Default: "yuv420p".
            frame_range: Optional (start, end) tuple for a subset of frames.
                Both indices are 0-based, end is exclusive.
            output_params: Additional ffmpeg output parameters as a list of strings.

        Returns:
            Path to the output MP4 file.

        Raises:
            ImportError: If sleap-io is not installed.
        """
        # Imported lazily so reading .seq files works without sleap-io.
        try:
            from sleap_io.io.video_writing import VideoWriter
        except ImportError:
            raise ImportError(
                "sleap-io is required for MP4 encoding: "
                "uv pip install sleap-io"
            )
        if output is None:
            output = self.path.with_suffix(".mp4")
        output = Path(output)
        if fps is None:
            fps = self.header.fps
        start = 0
        end = self.num_frames
        if frame_range is not None:
            # Clamp the requested range to the valid frame interval.
            start, end = frame_range
            start = max(0, start)
            end = min(end, self.num_frames)
        total = end - start
        is_grayscale = self.header.num_channels == 1
        writer = VideoWriter(
            filename=output,
            fps=fps,
            codec=codec,
            crf=crf,
            preset=preset,
            pixelformat=pixelformat,
            output_params=output_params or [],
        )
        import time
        t0 = time.perf_counter()
        with writer:
            for i in range(start, end):
                frame = self.get_frame(i)
                # h264 with yuv420p requires 3-channel input
                if is_grayscale:
                    frame = np.stack([frame, frame, frame], axis=-1)
                writer(frame)
                done = i - start + 1
                # Progress report every 5000 frames and at completion.
                if done % 5000 == 0 or done == total:
                    elapsed = time.perf_counter() - t0
                    speed = done / elapsed
                    eta = (total - done) / speed if speed > 0 else 0
                    print(
                        f" [{done}/{total}] "
                        f"{speed:.0f} fps, "
                        f"ETA {eta:.0f}s"
                    )
        elapsed = time.perf_counter() - t0
        print(
            f"Wrote {total} frames to {output} "
            f"in {elapsed:.1f}s ({total / elapsed:.0f} fps)"
        )
        return output

    def __len__(self) -> int:
        # Length == number of frames, enabling len(reader) and iteration.
        return self.num_frames

    def __getitem__(self, idx: int | slice) -> NDArray[np.uint8]:
        """Index a single frame, or stack a slice of frames along axis 0."""
        if isinstance(idx, slice):
            indices = range(*idx.indices(self.num_frames))
            return np.stack([self.get_frame(i) for i in indices])
        return self.get_frame(idx)

    def __iter__(self) -> Iterator[NDArray[np.uint8]]:
        """Yield decoded frames in order."""
        for i in range(self.num_frames):
            yield self.get_frame(i)

    def __enter__(self):
        # Context-manager support: `with SeqReader(...) as reader:`.
        return self

    def __exit__(self, *args):
        self.close()

    def close(self) -> None:
        """Close the underlying file handle."""
        # Guarded so close() is safe even if __init__ failed part-way
        # (e.g. FileNotFoundError before _file was assigned).
        if hasattr(self, "_file") and self._file and not self._file.closed:
            self._file.close()

    def __del__(self):
        # Best-effort cleanup if the caller forgot to close().
        self.close()

    def __repr__(self) -> str:
        return (
            f"SeqReader('{self.path.name}')\n"
            f" Frames: {self.num_frames}\n"
            f" Size: {self.header.width}x{self.header.height}\n"
            f" Codec: {self.header.codec_name} ({self.header.codec})\n"
            f" FPS: {self.header.fps:.2f}\n"
            f" Bit depth: {self.header.bit_depth_real}"
        )
| def _fmt_size(nbytes: int | float) -> str: | |
| """Format a byte count as a human-readable string.""" | |
| for unit in ("B", "KB", "MB", "GB", "TB"): | |
| if abs(nbytes) < 1024: | |
| return f"{nbytes:.1f} {unit}" if unit != "B" else f"{int(nbytes)} B" | |
| nbytes /= 1024 | |
| return f"{nbytes:.1f} PB" | |
def _print_input_stats(reader: SeqReader) -> None:
    """Print detailed stats about the input .seq file.

    Reports file size, frame count, resolution, codec, FPS, duration,
    first/last timestamps, the equivalent raw (uncompressed) size, the
    on-disk compression ratio, and a decode-speed benchmark over up to
    100 frames.

    Args:
        reader: An open SeqReader for the input file.
    """
    h = reader.header
    file_size = os.path.getsize(reader.path)
    # Size of one fully-uncompressed frame: width * height * bytes/pixel.
    raw_frame_bytes = h.width * h.height * (h.bit_depth // 8)
    raw_total = raw_frame_bytes * reader.num_frames
    duration = reader.num_frames / h.fps if h.fps > 0 else 0
    ts_first = reader.get_timestamp(0)
    ts_last = reader.get_timestamp(-1)
    print(f"Input: {reader.path}")
    print(f" File size: {_fmt_size(file_size)}")
    print(f" Frames: {reader.num_frames}")
    print(f" Resolution: {h.width} x {h.height}")
    print(f" Bit depth: {h.bit_depth_real}-bit")
    print(f" Codec: {h.codec_name} ({h.codec})")
    print(f" FPS: {h.fps:.2f}")
    print(f" Duration: {duration:.1f}s ({duration / 60:.1f} min)")
    print(f" Timestamps: {ts_first} -> {ts_last}")
    print(f" Raw size: {_fmt_size(raw_total)} "
          f"({h.width}x{h.height}x{h.bit_depth // 8} x {reader.num_frames})")
    if raw_total > 0:
        # How much smaller the .seq file is than fully-raw frames.
        seq_ratio = file_size / raw_total
        print(f" Seq ratio: {seq_ratio:.3f}x ({_fmt_size(file_size)} / {_fmt_size(raw_total)})")
    # Benchmark: decode speed over the first (up to) 100 frames.
    import time
    n_bench = min(100, reader.num_frames)
    t0 = time.perf_counter()
    for i in range(n_bench):
        _ = reader.get_frame(i)
    t1 = time.perf_counter()
    print(f" Decode speed: {n_bench / (t1 - t0):.0f} fps ({n_bench} frames in {t1 - t0:.3f}s)")
def _print_output_stats(
    reader: SeqReader, output: Path, frame_range: tuple[int, int] | None
) -> None:
    """Print stats about the output MP4 and verify frame count.

    Args:
        reader: The source SeqReader (still open).
        output: Path to the MP4 file that was written.
        frame_range: Optional (start, end) used for the conversion;
            None means the full file was converted.
    """
    h = reader.header
    start = frame_range[0] if frame_range else 0
    end = frame_range[1] if frame_range else reader.num_frames
    n_frames = end - start
    out_size = os.path.getsize(output)
    # Equivalent fully-uncompressed size of the converted range.
    raw_frame_bytes = h.width * h.height * (h.bit_depth // 8)
    raw_total = raw_frame_bytes * n_frames
    duration = n_frames / h.fps if h.fps > 0 else 0
    # Compute input size for the same frame range
    if h.is_compressed:
        # For compressed, sum actual frame sizes from the index
        if end <= len(reader.index.offsets):
            if end < len(reader.index.offsets):
                seq_range_size = reader.index.offsets[end] - reader.index.offsets[start]
            else:
                # Range reaches the last frame: measure to end of file.
                seq_range_size = os.path.getsize(reader.path) - reader.index.offsets[start]
        else:
            # Fallback: index shorter than expected; use whole file minus header.
            seq_range_size = os.path.getsize(reader.path) - HEADER_SIZE
    else:
        # Uncompressed frames have a constant stride per frame.
        seq_range_size = h.true_image_size * n_frames
    print(f"\nOutput: {output}")
    print(f" File size: {_fmt_size(out_size)}")
    print(f" Frames: {n_frames}" + (
        f" [{start}:{end}]" if frame_range else ""))
    print(f" Duration: {duration:.1f}s ({duration / 60:.1f} min)")
    if raw_total > 0:
        mp4_ratio = out_size / raw_total
        print(f" vs raw: {mp4_ratio:.4f}x ({_fmt_size(out_size)} / {_fmt_size(raw_total)})")
    if seq_range_size > 0:
        vs_seq = out_size / seq_range_size
        print(f" vs seq: {vs_seq:.3f}x ({_fmt_size(out_size)} / {_fmt_size(seq_range_size)})")
    # Verify frame count by reopening the MP4; best-effort only (skipped
    # silently if sleap-io or its video backend is unavailable).
    try:
        import sleap_io as sio
        vid = sio.Video(str(output))
        match = len(vid) == n_frames
        print(f" Verification: {len(vid)} frames ({'OK' if match else f'MISMATCH, expected {n_frames}'})")
    except Exception:
        pass
def main():
    """Command-line entry point.

    Prints info/benchmark stats for the input .seq file, and optionally
    re-encodes it (or a frame range) to MP4.
    """
    import argparse
    parser = argparse.ArgumentParser(
        description="Read and convert Norpix .seq video files.",
        epilog="Examples:\n"
        " uv run -p 3.12 seq_reader.py video.seq # Print info\n"
        " uv run -p 3.12 seq_reader.py video.seq --to-mp4 # Convert to MP4\n"
        " uv run -p 3.12 seq_reader.py video.seq --to-mp4 --crf 18 # High quality\n"
        " uv run -p 3.12 seq_reader.py video.seq --to-mp4 -o out.mp4 # Custom output\n",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("input", help="Path to the .seq file")
    parser.add_argument(
        "--to-mp4", action="store_true", help="Convert to MP4"
    )
    parser.add_argument("-o", "--output", help="Output MP4 path (default: <input>.mp4)")
    parser.add_argument("--fps", type=float, help="Output frame rate (default: source FPS)")
    parser.add_argument(
        "--codec", default="libx264", help="Video codec (default: libx264)"
    )
    parser.add_argument(
        "--crf", type=int, default=25,
        help="Constant rate factor, 2-51, lower=better (default: 25)",
    )
    parser.add_argument(
        "--preset", default="superfast",
        help="Encoding preset (default: superfast)",
    )
    parser.add_argument(
        "--pixelformat", default="yuv420p",
        help="Pixel format (default: yuv420p)",
    )
    parser.add_argument("--start", type=int, default=0, help="Start frame (default: 0)")
    parser.add_argument("--end", type=int, default=None, help="End frame, exclusive (default: all)")
    args = parser.parse_args()
    with SeqReader(args.input) as reader:
        _print_input_stats(reader)
        if args.to_mp4:
            frame_range = None
            if args.start != 0 or args.end is not None:
                # Fix: `args.end or reader.num_frames` treated an explicit
                # `--end 0` as falsy and silently converted the whole file.
                # Compare against None so 0 yields an empty range as the
                # "--end N (exclusive)" help text documents.
                end = args.end if args.end is not None else reader.num_frames
                frame_range = (args.start, end)
            print()
            out = reader.to_mp4(
                output=args.output,
                fps=args.fps,
                codec=args.codec,
                crf=args.crf,
                preset=args.preset,
                pixelformat=args.pixelformat,
                frame_range=frame_range,
            )
            _print_output_stats(reader, out, frame_range)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment