Last active
September 26, 2025 07:09
-
-
Save imneonizer/9dbf95bcae47a86c311314b998fcf07a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import av | |
| import cv2 | |
| import numpy as np | |
| import time | |
| from collections import deque, OrderedDict | |
| from threading import Thread, Lock, Event | |
| from IPython.display import display, clear_output, Image as DispImage | |
| class RTSPBatchDecoder: | |
| """ | |
| - Accepts a dict: {camera_name: rtsp_uri} | |
| - Maintains latest frames per camera name | |
| - Can return latest frame by camera name | |
| - get_batch(order=...) returns ordered dict of latest frames | |
| - get_tiled() tiles whatever order is given in the batch itself | |
| - store_size: Optional[Tuple[w,h] | Dict[name, (w,h)]] | |
| - output_fps: if provided, throttles BOTH get_batch() and per-stream store rate | |
| - fast_decode: if True, relax some decode filters to save CPU (quality tradeoff) | |
| - NEW: per-stream sequence counters ensure get_batch() only returns True when | |
| at least one requested stream has produced a new frame since last publish. | |
| """ | |
| def __init__(self, sources: dict, output_size=(1920, 1080), | |
| store_size=None, output_fps=None, fast_decode=True): | |
| self.sources = dict(sources) | |
| self.names = list(self.sources.keys()) | |
| self.frames = {name: deque(maxlen=1) for name in self.names} | |
| self.output_size = tuple(output_size) | |
| self.black_placeholder = np.zeros((1, 1, 3), dtype=np.uint8) | |
| # store_size can be None, a (w,h) tuple for all, or a per-name dict | |
| if store_size is None or isinstance(store_size, tuple): | |
| self._store_size_global = store_size # None or (w,h) | |
| self._store_size_map = {} | |
| elif isinstance(store_size, dict): | |
| self._store_size_global = None | |
| self._store_size_map = {str(k): tuple(v) for k, v in store_size.items()} | |
| else: | |
| raise TypeError("store_size must be None, a (w,h) tuple, or a dict[name->(w,h)]") | |
| self._lock = Lock() | |
| self._stop = Event() | |
| self._threads = [] | |
| # Global output throttle (for get_batch) and per-stream store interval | |
| self._min_batch_interval = 0.0 | |
| self._store_interval = 0.0 | |
| self._next_batch_due = 0.0 | |
| self._next_store_due = {name: 0.0 for name in self.names} | |
| self._fast_decode = bool(fast_decode) | |
| # NEW: per-stream sequence numbers (increment on store) and last-published seq | |
| self._seq = {name: -1 for name in self.names} | |
| self._last_pub_seq = {name: -1 for name in self.names} | |
| if output_fps and output_fps > 0: | |
| interval = 1.0 / float(output_fps) | |
| self._min_batch_interval = interval | |
| self._store_interval = interval | |
| for name, uri in self.sources.items(): | |
| t = Thread(target=self._handle_stream, args=(name, uri), daemon=True) | |
| t.start() | |
| self._threads.append(t) | |
| # ---- Runtime controls ---- | |
| def set_output_fps(self, fps: float | None): | |
| """Set global output FPS for both get_batch() and per-stream store pacing.""" | |
| with self._lock: | |
| if fps and fps > 0: | |
| interval = 1.0 / float(fps) | |
| self._min_batch_interval = interval | |
| self._store_interval = interval | |
| else: | |
| self._min_batch_interval = 0.0 | |
| self._store_interval = 0.0 | |
| self._next_batch_due = 0.0 | |
| for n in self._next_store_due: | |
| self._next_store_due[n] = 0.0 # allow immediate store | |
| def set_store_size(self, value): | |
| """Update the store_size behavior at runtime: None | (w,h) | {name: (w,h)}""" | |
| with self._lock: | |
| if value is None or isinstance(value, tuple): | |
| self._store_size_global = value | |
| self._store_size_map = {} | |
| elif isinstance(value, dict): | |
| self._store_size_global = None | |
| self._store_size_map = {str(k): tuple(v) for k, v in value.items()} | |
| else: | |
| raise TypeError("value must be None, a (w,h) tuple, or a dict[name->(w,h)]") | |
| def _get_store_size(self, name: str): | |
| if name in self._store_size_map: | |
| return self._store_size_map[name] | |
| return self._store_size_global | |
| def stop(self, join=True): | |
| self._stop.set() | |
| if join: | |
| for t in self._threads: | |
| t.join(timeout=1.0) | |
| # ---- Stream handling with store pacing ---- | |
| def _handle_stream(self, name: str, uri: str, | |
| retry_delay=1.0, retry_backoff=1.5, retry_max=10.0): | |
| delay = retry_delay | |
| while not self._stop.is_set(): | |
| try: | |
| container = av.open(uri, timeout=5.0) | |
| stream = container.streams.video[0] | |
| stream.thread_type = "AUTO" | |
| # Optional fast decode knobs (CPU↓, quality↘) | |
| if self._fast_decode and hasattr(stream, "codec_context"): | |
| cc = stream.codec_context | |
| try: | |
| cc.skip_loop_filter = "NONKEY" # skip deblocking on non-keyframes | |
| except Exception: | |
| pass | |
| try: | |
| cc.skip_idct = "NONE" | |
| except Exception: | |
| pass | |
| for frame in container.decode(stream): | |
| if self._stop.is_set(): | |
| break | |
| now = time.time() | |
| with self._lock: | |
| due = self._next_store_due.get(name, 0.0) | |
| store_interval = self._store_interval | |
| # Drop early if not time to publish the next frame | |
| if store_interval > 0.0 and now < due: | |
| continue | |
| # Convert only when due | |
| img = frame.to_ndarray(format="bgr24") | |
| # Optional pre-store resize | |
| target = self._get_store_size(name) | |
| if target is not None: | |
| tw, th = target | |
| ih, iw = img.shape[:2] | |
| interp = cv2.INTER_AREA if (iw > tw or ih > th) else cv2.INTER_LINEAR | |
| img = cv2.resize(img, (tw, th), interpolation=interp) | |
| with self._lock: | |
| self.frames[name].append(img) | |
| # NEW: bump sequence id on successful store | |
| self._seq[name] += 1 | |
| if store_interval > 0.0: | |
| # schedule next store; avoid drift by basing on 'now' | |
| self._next_store_due[name] = now + store_interval | |
| container.close() | |
| delay = retry_delay | |
| except Exception: | |
| time.sleep(delay) | |
| delay = min(retry_max, delay * retry_backoff) | |
| # ---- Accessors / Output ---- | |
| def get_latest(self, name: str): | |
| with self._lock: | |
| dq = self.frames.get(name) | |
| return dq[-1] if dq and len(dq) else None | |
| def get_batch(self, order=None): | |
| """ | |
| Returns (status, batch) | |
| - status=True => at least one requested stream has a NEW frame since last True | |
| - status=False => too early (FPS gate) OR no new frames for requested set | |
| """ | |
| now = time.time() | |
| # Global FPS gate | |
| if self._min_batch_interval > 0.0 and now < self._next_batch_due: | |
| return False, OrderedDict() | |
| with self._lock: | |
| if order: | |
| # Only known names participate in "freshness" check | |
| requested_known = [n for n in order if n in self.frames] | |
| requested_all = list(order) # include unknowns for placeholders | |
| else: | |
| requested_known = list(self.names) | |
| requested_all = list(self.names) | |
| # NEW: check if ANY requested-known stream has a new seq | |
| any_new = any(self._seq[n] != self._last_pub_seq[n] for n in requested_known) | |
| if not any_new: | |
| return False, OrderedDict() | |
| # Build batch after confirming something is new | |
| batch = OrderedDict() | |
| for name in requested_all: | |
| if name in self.frames: | |
| dq = self.frames[name] | |
| batch[name] = dq[-1] if dq and len(dq) else None | |
| else: | |
| # unknown name -> black placeholder | |
| batch[name] = self.black_placeholder | |
| # Mark published seq for requested-known names | |
| for n in requested_known: | |
| self._last_pub_seq[n] = self._seq[n] | |
| if self._min_batch_interval > 0.0: | |
| self._next_batch_due = now + self._min_batch_interval | |
| return True, batch | |
| def get_tiled(self, batch, output_size=None): | |
| w_out, h_out = output_size if output_size else self.output_size | |
| frames = list(batch.values()) | |
| n = len(frames) | |
| if n == 0: | |
| return np.zeros((h_out, w_out, 3), dtype=np.uint8) | |
| cols = int(np.ceil(np.sqrt(n))) | |
| rows = int(np.ceil(n / cols)) | |
| cell_w = w_out // cols | |
| cell_h = h_out // rows | |
| canvas = np.zeros((h_out, w_out, 3), dtype=np.uint8) | |
| for idx, frame in enumerate(frames): | |
| r = idx // cols | |
| c = idx % cols | |
| y0, y1 = r * cell_h, (r + 1) * cell_h | |
| x0, x1 = c * cell_w, (c + 1) * cell_w | |
| if frame is None: | |
| continue | |
| f_resized = cv2.resize(frame, (cell_w, cell_h)) | |
| canvas[y0:y1, x0:x1] = f_resized | |
| return canvas | |
| def imshow(self, frame, jupyter=True): | |
| if jupyter: | |
| ok, jpeg = cv2.imencode('.jpg', frame) | |
| if not ok: | |
| return | |
| clear_output(wait=True) | |
| display(DispImage(data=jpeg.tobytes())) | |
| def control_loop_fps(self, fps: float=30): | |
| """ | |
| Call this at the end (or start) of each while-loop iteration to throttle the | |
| loop to run at <= fps. Example: | |
| while True: | |
| decoder.control_loop_fps(15) | |
| ... | |
| """ | |
| if fps <= 0: | |
| return | |
| interval = 1.0 / float(fps) | |
| now = time.time() | |
| # Create/update a per-instance marker for next allowed time | |
| next_due = getattr(self, "_loop_next_due", 0.0) | |
| if now < next_due: | |
| time.sleep(next_due - now) | |
| self._loop_next_due = time.time() + interval | |
| import subprocess, shlex, numpy as np, cv2, sys | |
class FFmpegRTSPWriter:
    """
    Push raw BGR frames to an RTSP server through an FFmpeg subprocess.
    - url: rtsp publish url, e.g. rtsp://localhost:8554/tiled
    - size: (w, h) of the published video; write() resizes frames of another size
    - fps: nominal stream fps. FFmpeg timestamps the raw input at 1/fps steps
      (-r on the input), so this should match how often write() is really called.
    - codec: 'libx264' (CPU), 'h264_nvenc' (NVIDIA), 'h264_vaapi' (Intel iGPU), 'h264_v4l2m2m' (Raspberry Pi)
    - tcp: publish with RTSP over TCP (-rtsp_transport tcp)
    - vaapi_device: DRM render node used by 'h264_vaapi' for hwupload
    - loglevel: FFmpeg -loglevel; FFmpeg's stderr is inherited from this process
    """

    def __init__(self, url: str, size: tuple[int, int], fps: int = 15, codec: str = "libx264",
                 tcp=True, vaapi_device: str = "/dev/dri/renderD128", loglevel: str = "error"):
        self.url = url
        self.w, self.h = size
        self.fps = int(fps)
        self.codec = codec
        self.proc = None
        self.tcp = tcp
        self.vaapi_device = vaapi_device
        self.loglevel = loglevel

    def _build_command(self) -> list[str]:
        """Return the FFmpeg argv for the configured codec.

        Raises:
            ValueError: if `codec` is not one of the supported encoders.
        """
        gop = str(self.fps * 2)  # keyframe every ~2 s
        cmd = ["ffmpeg", "-hide_banner", "-nostats", "-loglevel", self.loglevel]
        if self.codec == "h264_vaapi":
            # hwupload needs a VAAPI device; -hwaccel only affects decoding and
            # does nothing for a rawvideo input.
            cmd += ["-vaapi_device", self.vaapi_device]
        cmd += [
            "-re",  # rate-limit read to fps; keeps timestamps sane for RTSP
            "-fflags", "nobuffer",
            "-f", "rawvideo",
            "-pix_fmt", "bgr24",  # must match the 3-byte pixels write() sends
            "-s", f"{self.w}x{self.h}",
            "-r", str(self.fps),
            "-i", "-",  # stdin
            "-an",
        ]
        if self.codec == "libx264":
            cmd += ["-c:v", "libx264", "-preset", "veryfast", "-tune", "zerolatency",
                    "-profile:v", "baseline", "-pix_fmt", "yuv420p", "-g", gop]
        elif self.codec == "h264_nvenc":
            cmd += ["-c:v", "h264_nvenc", "-preset", "p4", "-tune", "ll",
                    "-pix_fmt", "yuv420p", "-g", gop]
        elif self.codec == "h264_vaapi":
            # Requires /dev/dri and correct drivers
            cmd += ["-vf", f"format=nv12,hwupload,scale_vaapi=w={self.w}:h={self.h}",
                    "-c:v", "h264_vaapi", "-g", gop]
        elif self.codec == "h264_v4l2m2m":
            cmd += ["-c:v", "h264_v4l2m2m", "-pix_fmt", "yuv420p", "-g", gop]
        else:
            raise ValueError(f"Unsupported codec: {self.codec}")
        if self.tcp:
            cmd += ["-rtsp_transport", "tcp"]
        cmd += ["-f", "rtsp", self.url]
        return cmd

    def start(self):
        """Launch FFmpeg. Does nothing if a previously started process is still running.

        Raises:
            ValueError: if `codec` is unsupported.
        """
        if self.proc is not None:
            if self.proc.poll() is None:
                return  # already running
            self.close()  # reap the dead process and release its pipe
        cmd = self._build_command()
        # stderr is inherited, not PIPE'd: nothing would read a PIPE here, so FFmpeg's
        # log output could fill the OS pipe buffer, block FFmpeg, and in turn block
        # our stdin writes forever.
        self.proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, bufsize=0)

    def write(self, frame_bgr: np.ndarray):
        """Send one BGR frame (uint8, HxWx3 assumed — TODO confirm at call sites).

        Raises:
            RuntimeError: if start() was not called or the FFmpeg pipe is broken/closed.
        """
        if self.proc is None:
            raise RuntimeError("call start() first")
        # Ensure size matches; resize if needed
        if frame_bgr.shape[1] != self.w or frame_bgr.shape[0] != self.h:
            frame_bgr = cv2.resize(frame_bgr, (self.w, self.h), interpolation=cv2.INTER_AREA)
        # FFmpeg reads packed bgr24 bytes, so the buffer must be C-contiguous
        # (no-op, no copy, when it already is).
        frame_bgr = np.ascontiguousarray(frame_bgr)
        view = memoryview(frame_bgr).cast("B")
        try:
            # stdin is unbuffered (bufsize=0): a raw write may accept fewer bytes
            # than offered, so loop until the whole frame is in the pipe.
            while view:
                written = self.proc.stdin.write(view)
                view = view[written:]
        except (BrokenPipeError, ValueError) as err:
            raise RuntimeError("FFmpeg pipe broken or closed") from err

    def close(self, timeout: float = 2.0):
        """Stop FFmpeg: send EOF, wait up to `timeout` s, then terminate and finally kill."""
        proc, self.proc = self.proc, None
        if proc is None:
            return
        try:
            proc.stdin.close()  # EOF lets FFmpeg flush the encoder and exit on its own
        except OSError:
            pass  # pipe already broken: FFmpeg is gone or going
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.terminate()
            try:
                proc.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
# NOTE: credentials are inline in these URIs; consider loading them from
# configuration or environment variables instead of hard-coding them.
sources = {
    "first_front_right": "rtsp://abc:def@192.168.0.119:554/cam/realmonitor?channel=1",
    "ground_first_right": "rtsp://abc:def@192.168.0.125:554/cam/realmonitor?channel=1",
}
# One rate for both loop pacing and the publisher. FFmpegRTSPWriter timestamps
# frames in 1/fps steps, so declaring 30 fps while writing at most ~10 frames/s
# would make stream time lag real time (fast-motion playback / client stalls).
LOOP_FPS = 10
CAMERA_ORDER = ["first_front_right", "ground_first_right"]

decoder = RTSPBatchDecoder(sources)
pub_w, pub_h = decoder.output_size
publisher = FFmpegRTSPWriter(
    url="rtsp://localhost:8554/tiled",
    size=(pub_w, pub_h),
    fps=LOOP_FPS,
    codec="libx264",  # or 'h264_nvenc' if you have NVIDIA
    tcp=True,
)
publisher.start()
try:
    while True:
        # Pace first, so the batch fetched right after is as fresh as possible.
        decoder.control_loop_fps(fps=LOOP_FPS)
        status, batch = decoder.get_batch(order=CAMERA_ORDER)
        if not status:
            continue
        tiled = decoder.get_tiled(batch)
        decoder.imshow(tiled)
        publisher.write(tiled)
except KeyboardInterrupt:
    pass  # Ctrl+C / notebook interrupt is the normal way to end the loop
finally:
    # Runs on every exit path, including a RuntimeError from a broken FFmpeg pipe.
    decoder.stop()
    publisher.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment