Skip to content

Instantly share code, notes, and snippets.

@imneonizer
Last active September 26, 2025 07:09
Show Gist options
  • Select an option

  • Save imneonizer/9dbf95bcae47a86c311314b998fcf07a to your computer and use it in GitHub Desktop.

Select an option

Save imneonizer/9dbf95bcae47a86c311314b998fcf07a to your computer and use it in GitHub Desktop.
import logging
import time
from collections import deque, OrderedDict
from threading import Thread, Lock, Event

import av
import cv2
import numpy as np
from IPython.display import display, clear_output, Image as DispImage
class RTSPBatchDecoder:
"""
- Accepts a dict: {camera_name: rtsp_uri}
- Maintains latest frames per camera name
- Can return latest frame by camera name
- get_batch(order=...) returns ordered dict of latest frames
- get_tiled() tiles whatever order is given in the batch itself
- store_size: Optional[Tuple[w,h] | Dict[name, (w,h)]]
- output_fps: if provided, throttles BOTH get_batch() and per-stream store rate
- fast_decode: if True, relax some decode filters to save CPU (quality tradeoff)
- NEW: per-stream sequence counters ensure get_batch() only returns True when
at least one requested stream has produced a new frame since last publish.
"""
def __init__(self, sources: dict, output_size=(1920, 1080),
store_size=None, output_fps=None, fast_decode=True):
self.sources = dict(sources)
self.names = list(self.sources.keys())
self.frames = {name: deque(maxlen=1) for name in self.names}
self.output_size = tuple(output_size)
self.black_placeholder = np.zeros((1, 1, 3), dtype=np.uint8)
# store_size can be None, a (w,h) tuple for all, or a per-name dict
if store_size is None or isinstance(store_size, tuple):
self._store_size_global = store_size # None or (w,h)
self._store_size_map = {}
elif isinstance(store_size, dict):
self._store_size_global = None
self._store_size_map = {str(k): tuple(v) for k, v in store_size.items()}
else:
raise TypeError("store_size must be None, a (w,h) tuple, or a dict[name->(w,h)]")
self._lock = Lock()
self._stop = Event()
self._threads = []
# Global output throttle (for get_batch) and per-stream store interval
self._min_batch_interval = 0.0
self._store_interval = 0.0
self._next_batch_due = 0.0
self._next_store_due = {name: 0.0 for name in self.names}
self._fast_decode = bool(fast_decode)
# NEW: per-stream sequence numbers (increment on store) and last-published seq
self._seq = {name: -1 for name in self.names}
self._last_pub_seq = {name: -1 for name in self.names}
if output_fps and output_fps > 0:
interval = 1.0 / float(output_fps)
self._min_batch_interval = interval
self._store_interval = interval
for name, uri in self.sources.items():
t = Thread(target=self._handle_stream, args=(name, uri), daemon=True)
t.start()
self._threads.append(t)
# ---- Runtime controls ----
def set_output_fps(self, fps: float | None):
"""Set global output FPS for both get_batch() and per-stream store pacing."""
with self._lock:
if fps and fps > 0:
interval = 1.0 / float(fps)
self._min_batch_interval = interval
self._store_interval = interval
else:
self._min_batch_interval = 0.0
self._store_interval = 0.0
self._next_batch_due = 0.0
for n in self._next_store_due:
self._next_store_due[n] = 0.0 # allow immediate store
def set_store_size(self, value):
"""Update the store_size behavior at runtime: None | (w,h) | {name: (w,h)}"""
with self._lock:
if value is None or isinstance(value, tuple):
self._store_size_global = value
self._store_size_map = {}
elif isinstance(value, dict):
self._store_size_global = None
self._store_size_map = {str(k): tuple(v) for k, v in value.items()}
else:
raise TypeError("value must be None, a (w,h) tuple, or a dict[name->(w,h)]")
def _get_store_size(self, name: str):
if name in self._store_size_map:
return self._store_size_map[name]
return self._store_size_global
def stop(self, join=True):
self._stop.set()
if join:
for t in self._threads:
t.join(timeout=1.0)
# ---- Stream handling with store pacing ----
def _handle_stream(self, name: str, uri: str,
retry_delay=1.0, retry_backoff=1.5, retry_max=10.0):
delay = retry_delay
while not self._stop.is_set():
try:
container = av.open(uri, timeout=5.0)
stream = container.streams.video[0]
stream.thread_type = "AUTO"
# Optional fast decode knobs (CPU↓, quality↘)
if self._fast_decode and hasattr(stream, "codec_context"):
cc = stream.codec_context
try:
cc.skip_loop_filter = "NONKEY" # skip deblocking on non-keyframes
except Exception:
pass
try:
cc.skip_idct = "NONE"
except Exception:
pass
for frame in container.decode(stream):
if self._stop.is_set():
break
now = time.time()
with self._lock:
due = self._next_store_due.get(name, 0.0)
store_interval = self._store_interval
# Drop early if not time to publish the next frame
if store_interval > 0.0 and now < due:
continue
# Convert only when due
img = frame.to_ndarray(format="bgr24")
# Optional pre-store resize
target = self._get_store_size(name)
if target is not None:
tw, th = target
ih, iw = img.shape[:2]
interp = cv2.INTER_AREA if (iw > tw or ih > th) else cv2.INTER_LINEAR
img = cv2.resize(img, (tw, th), interpolation=interp)
with self._lock:
self.frames[name].append(img)
# NEW: bump sequence id on successful store
self._seq[name] += 1
if store_interval > 0.0:
# schedule next store; avoid drift by basing on 'now'
self._next_store_due[name] = now + store_interval
container.close()
delay = retry_delay
except Exception:
time.sleep(delay)
delay = min(retry_max, delay * retry_backoff)
# ---- Accessors / Output ----
def get_latest(self, name: str):
with self._lock:
dq = self.frames.get(name)
return dq[-1] if dq and len(dq) else None
def get_batch(self, order=None):
"""
Returns (status, batch)
- status=True => at least one requested stream has a NEW frame since last True
- status=False => too early (FPS gate) OR no new frames for requested set
"""
now = time.time()
# Global FPS gate
if self._min_batch_interval > 0.0 and now < self._next_batch_due:
return False, OrderedDict()
with self._lock:
if order:
# Only known names participate in "freshness" check
requested_known = [n for n in order if n in self.frames]
requested_all = list(order) # include unknowns for placeholders
else:
requested_known = list(self.names)
requested_all = list(self.names)
# NEW: check if ANY requested-known stream has a new seq
any_new = any(self._seq[n] != self._last_pub_seq[n] for n in requested_known)
if not any_new:
return False, OrderedDict()
# Build batch after confirming something is new
batch = OrderedDict()
for name in requested_all:
if name in self.frames:
dq = self.frames[name]
batch[name] = dq[-1] if dq and len(dq) else None
else:
# unknown name -> black placeholder
batch[name] = self.black_placeholder
# Mark published seq for requested-known names
for n in requested_known:
self._last_pub_seq[n] = self._seq[n]
if self._min_batch_interval > 0.0:
self._next_batch_due = now + self._min_batch_interval
return True, batch
def get_tiled(self, batch, output_size=None):
w_out, h_out = output_size if output_size else self.output_size
frames = list(batch.values())
n = len(frames)
if n == 0:
return np.zeros((h_out, w_out, 3), dtype=np.uint8)
cols = int(np.ceil(np.sqrt(n)))
rows = int(np.ceil(n / cols))
cell_w = w_out // cols
cell_h = h_out // rows
canvas = np.zeros((h_out, w_out, 3), dtype=np.uint8)
for idx, frame in enumerate(frames):
r = idx // cols
c = idx % cols
y0, y1 = r * cell_h, (r + 1) * cell_h
x0, x1 = c * cell_w, (c + 1) * cell_w
if frame is None:
continue
f_resized = cv2.resize(frame, (cell_w, cell_h))
canvas[y0:y1, x0:x1] = f_resized
return canvas
def imshow(self, frame, jupyter=True):
if jupyter:
ok, jpeg = cv2.imencode('.jpg', frame)
if not ok:
return
clear_output(wait=True)
display(DispImage(data=jpeg.tobytes()))
def control_loop_fps(self, fps: float=30):
"""
Call this at the end (or start) of each while-loop iteration to throttle the
loop to run at <= fps. Example:
while True:
decoder.control_loop_fps(15)
...
"""
if fps <= 0:
return
interval = 1.0 / float(fps)
now = time.time()
# Create/update a per-instance marker for next allowed time
next_due = getattr(self, "_loop_next_due", 0.0)
if now < next_due:
time.sleep(next_due - now)
self._loop_next_due = time.time() + interval
import subprocess, shlex, numpy as np, cv2, sys
class FFmpegRTSPWriter:
    """
    Push raw BGR frames to an RTSP server by piping them into an FFmpeg process.

    - url: rtsp publish url, e.g. rtsp://localhost:8554/tiled
    - size: (w, h) of the published stream; write() resizes other frame sizes
    - fps: nominal stream fps (declared to FFmpeg for the raw input; GOP = 2*fps)
    - codec: 'libx264' (CPU), 'h264_nvenc' (NVIDIA), 'h264_vaapi' (Intel iGPU),
      'h264_v4l2m2m' (Raspberry Pi)
    - tcp: publish using RTSP over TCP
    - vaapi_device: DRM render node used by 'h264_vaapi' for hwupload
    - ffmpeg_bin: FFmpeg executable name or path

    Usable as a context manager: start() on entry, close() on exit.
    """

    def __init__(self, url: str, size: tuple[int, int], fps: int = 15,
                 codec: str = "libx264", tcp=True,
                 vaapi_device: str = "/dev/dri/renderD128",
                 ffmpeg_bin: str = "ffmpeg"):
        self.url = url
        self.w, self.h = (int(v) for v in size)
        self.fps = int(fps)
        if self.w <= 0 or self.h <= 0:
            raise ValueError(f"size must be positive (w, h), got {size!r}")
        if self.fps <= 0:
            raise ValueError(f"fps must be >= 1, got {fps!r}")
        self.codec = codec
        self.proc = None
        self.tcp = tcp
        self.vaapi_device = vaapi_device
        self.ffmpeg_bin = ffmpeg_bin
        # Most recent FFmpeg stderr output, kept for error messages.
        self._stderr_tail = deque(maxlen=64)
        self._stderr_lock = Lock()
        self._stderr_thread = None

    def _build_cmd(self):
        """Return the FFmpeg argv for the configured codec (spawns nothing).

        Raises ValueError for an unsupported codec.
        """
        gop = str(self.fps * 2)
        # Global options; keep stderr quiet so it stays small.
        cmd = [self.ffmpeg_bin, "-hide_banner", "-nostats", "-loglevel", "warning"]
        if self.codec == "h264_vaapi":
            # hwupload needs a VAAPI device; it must be set before the input.
            cmd += ["-vaapi_device", self.vaapi_device]
        # Input: raw bgr24 on stdin -- exactly the bytes write() sends (3 B/px).
        cmd += [
            "-re",  # rate-limit read to fps; keeps timestamps sane for RTSP
            "-fflags", "nobuffer",
            "-f", "rawvideo",
            "-pix_fmt", "bgr24",
            "-s", f"{self.w}x{self.h}",
            "-r", str(self.fps),
            "-i", "-",  # stdin
            "-an",
        ]
        if self.codec == "libx264":
            cmd += ["-c:v", "libx264", "-preset", "veryfast", "-tune", "zerolatency",
                    "-profile:v", "baseline", "-pix_fmt", "yuv420p", "-g", gop]
        elif self.codec == "h264_nvenc":
            cmd += ["-c:v", "h264_nvenc", "-preset", "p4", "-tune", "ll",
                    "-pix_fmt", "yuv420p", "-g", gop]
        elif self.codec == "h264_vaapi":
            # Requires /dev/dri and correct drivers.
            cmd += ["-vf", "format=nv12,hwupload", "-c:v", "h264_vaapi", "-g", gop]
        elif self.codec == "h264_v4l2m2m":
            cmd += ["-c:v", "h264_v4l2m2m", "-pix_fmt", "yuv420p", "-g", gop]
        else:
            raise ValueError(f"Unsupported codec: {self.codec}")
        if self.tcp:
            cmd += ["-rtsp_transport", "tcp"]
        cmd += ["-f", "rtsp", self.url]
        return cmd

    def _drain_stderr(self, pipe):
        """Read FFmpeg's stderr until EOF so the pipe never fills and blocks FFmpeg."""
        try:
            for chunk in iter(lambda: pipe.read(4096), b""):
                with self._stderr_lock:
                    self._stderr_tail.append(chunk.decode("utf-8", errors="replace"))
        except (OSError, ValueError):
            pass  # pipe closed during shutdown

    @property
    def stderr_tail(self) -> str:
        """Most recent FFmpeg stderr text (bounded)."""
        with self._stderr_lock:
            return "".join(self._stderr_tail)

    def start(self):
        """Spawn FFmpeg. No-op if a previously started process is still running."""
        if self.proc is not None:
            if self.proc.poll() is None:
                return
            self.close()  # reap the dead process and its pipes first
        cmd = self._build_cmd()
        with self._stderr_lock:
            self._stderr_tail.clear()
        self.proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0)
        self._stderr_thread = Thread(target=self._drain_stderr,
                                     args=(self.proc.stderr,), daemon=True)
        self._stderr_thread.start()

    def write(self, frame_bgr: np.ndarray):
        """Send one uint8 HxWx3 BGR frame; resized to (w, h) if needed.

        Raises RuntimeError if not started or if the FFmpeg pipe is broken,
        and ValueError for a frame that is not uint8 with 3 channels.
        """
        if self.proc is None:
            raise RuntimeError("call start() first")
        if frame_bgr.dtype != np.uint8 or frame_bgr.ndim != 3 or frame_bgr.shape[2] != 3:
            raise ValueError(
                f"expected a uint8 HxWx3 BGR frame, got dtype={frame_bgr.dtype} "
                f"shape={frame_bgr.shape}")
        # Ensure size matches; resize if needed.
        if frame_bgr.shape[1] != self.w or frame_bgr.shape[0] != self.h:
            frame_bgr = cv2.resize(frame_bgr, (self.w, self.h), interpolation=cv2.INTER_AREA)
        # Contiguous bytes, viewed without an extra copy.
        view = memoryview(np.ascontiguousarray(frame_bgr)).cast("B")
        stdin = self.proc.stdin
        try:
            # The unbuffered pipe may accept only part of the frame per call;
            # a short write would otherwise shift every following frame.
            offset, total = 0, len(view)
            while offset < total:
                written = stdin.write(view[offset:])
                offset += written or 0
        except (OSError, ValueError) as err:
            msg = "FFmpeg pipe broken or closed"
            tail = self.stderr_tail.strip()
            if tail:
                msg += f"; ffmpeg stderr: {tail[-500:]}"
            raise RuntimeError(msg) from err

    def close(self, timeout: float = 2.0):
        """Stop FFmpeg: send EOF, then terminate and finally kill if it lingers."""
        proc = self.proc
        if proc is None:
            return
        self.proc = None
        try:
            if proc.stdin:
                proc.stdin.close()  # EOF lets FFmpeg flush and exit by itself
        except OSError:
            pass
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.terminate()
            try:
                proc.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        if self._stderr_thread is not None:
            self._stderr_thread.join(timeout=1.0)
            self._stderr_thread = None
        if proc.stderr:
            try:
                proc.stderr.close()
            except OSError:
                pass

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
# Camera name -> RTSP URI. The credentials are placeholders; prefer loading
# real ones from the environment or a config file instead of source code.
sources = {
    "first_front_right": "rtsp://abc:def@192.168.0.119:554/cam/realmonitor?channel=1",
    "ground_first_right": "rtsp://abc:def@192.168.0.125:554/cam/realmonitor?channel=1",
}

# Single rate shared by the loop and the publisher: the writer declares this
# rate to FFmpeg for its raw input, so frames must be written at the same rate
# or the published stream plays too fast / too slow.
PUBLISH_FPS = 10

decoder = RTSPBatchDecoder(sources)
pub_w, pub_h = decoder.output_size
publisher = FFmpegRTSPWriter(
    url="rtsp://localhost:8554/tiled",
    size=(pub_w, pub_h),
    fps=PUBLISH_FPS,
    codec="libx264",  # or 'h264_nvenc' if you have NVIDIA
    tcp=True,
)
publisher.start()
tiled = None
try:
    while True:
        # Pace first, then grab the freshest frames for this tick.
        decoder.control_loop_fps(fps=PUBLISH_FPS)
        status, batch = decoder.get_batch(order=list(sources))
        if status:
            tiled = decoder.get_tiled(batch)
            decoder.imshow(tiled)
        if tiled is not None:
            # Re-send the last mosaic when nothing new arrived so the publisher
            # keeps receiving frames at a constant rate.
            publisher.write(tiled)
except KeyboardInterrupt:
    pass
finally:
    # Runs on Ctrl-C and on any error (e.g. broken FFmpeg pipe).
    decoder.stop()
    publisher.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment