Last active
November 9, 2025 18:10
-
-
Save YunghuiHsu/e5222cf09815833ce9db95a598dd9014 to your computer and use it in GitHub Desktop.
demo for RTP and RTCP Buffer parse with GStreamer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| demo for RTP and RTCP Buffer parse with GStreamer | |
| 1. This example program is designed to work with the deepstream / gstreamer python API | |
| to calculate the latency of an rtsp transfer. | |
| 2. The gstreamer element example is modified from NVIDIA-AI-IOT/deepstream_python_apps/ | |
| deepstream_test1_rtsp_in_rtsp_out.py. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ################################################################################################## | |
| # 1. This example program is designed to work with the deepstream / gstreamer python API | |
| # to calculate the latency of an rtsp transfer. | |
| # 2. The gstreamer element example is modified from NVIDIA-AI-IOT/deepstream_python_apps/ | |
| # deepstream_test1_rtsp_in_rtsp_out.py. | |
| ################################################################################################## | |
| import os | |
| import sys | |
| import glob | |
| import time | |
| from time import ctime | |
| import datetime | |
| import csv | |
| from typing import Union | |
| from functools import partial | |
| from typing import Any, List, Tuple, Union, Optional | |
| import logging | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s") | |
| enable_debug = True | |
| from threading import Event, Lock | |
| from pathlib import Path | |
| import gi | |
| gi.require_version("Gst", "1.0") | |
| gi.require_version("GstRtp", "1.0") | |
| from gi.repository import Gst, GstRtp | |
| # Calculate the number of seconds between the NTP and UNIX epochs. | |
| SEC_BETWEEN_EPOCHS = 2208988800 # ( (70 - 17) * 365 + 17 * 366 ) * 24 * 60 * 60 | |
| # Initialize for Log RTSP ---------------------------------------------------------------------------------- | |
| start_time = time.time() | |
| time_start = datetime.datetime.now().strftime("%y%m%d_%H%M%S") | |
| start_time_program = 0 | |
| dir_log = Path("./latency_logs") | |
| dir_log.mkdir(exist_ok=True, parents=True) | |
| file_rtsp = dir_log / f"rtsp_latency_{time_start}.csv" | |
| # time_cols = ["time_elapsed"] | |
| rtcp_header_cols = [] | |
| rtcp_header_cols += ["time_elapsed"] | |
| rtcp_header_cols += [ | |
| "ssrc_rtcp", | |
| "ntp_timestamp", | |
| "rtp_timestamp_rtcp", | |
| "packet_count", | |
| "octet_count", | |
| ] | |
| rtcp_header_cols += ["ntp_timestamp_host"] | |
| rtp_header_cols = [ | |
| "rtp_timestamp_rtp", | |
| "ssrc_rtp", | |
| "payload_type", | |
| "marker", | |
| "sequence", | |
| ] | |
| # rtp_header_cols += ["time_recieve_buffer"] | |
| rtp_header_cols += ["time_recieve_host"] | |
| header_cols = rtcp_header_cols + rtp_header_cols + ["latency"] | |
| # For RTSP Latency ================================= | |
| class RtspProcessor(): | |
| """ | |
| RTSP Processor for handling RTP and RTCP packets. | |
| This class is responsible for fetching, processing, and mapping RTP and RTCP packets | |
| to calculate and log the latency of an RTSP stream. | |
| """ | |
| # BATCH_SIZE = 10 | |
| TIMEOUT_DURATION = 10 # Mapping rtp_timestamp timeout | |
| def __init__(self): | |
| super().__init__() | |
| self.__rtp_data = None | |
| self.ssrc = None | |
| self.latency = 0 | |
| self.__lock = Lock() | |
| def __del__(self): | |
| # self.__thread_pool.shutdown(wait=True) | |
| print("-----==================RTPProcessor del=======================-----") | |
| # Process RTCP Packets ----------------------------------------------------------- | |
| def on_receiving_rtcp_callback(self, session, buffer: Gst.Buffer): | |
| """ | |
| Fetch the NTP and RTP reference Timestamp in the RTCP Sender Report | |
| Map RTCP and RTP packet data to calculate latency. | |
| """ | |
| rtcp_buffer = GstRtp.RTCPBuffer() | |
| if not GstRtp.RTCPBuffer.map(buffer, Gst.MapFlags.READ, rtcp_buffer): | |
| logging.warning( | |
| "\t\t[on_receiving_rtcp_callback] : Unable to map RTCP buffer" | |
| ) | |
| return Gst.PadProbeReturn.PASS | |
| try: | |
| rtcp_packet = GstRtp.RTCPPacket() | |
| if not rtcp_buffer.get_first_packet(rtcp_packet): | |
| logging.warning( | |
| "\t\t[on_receiving_rtcp_callback] : Unable to get the first RTCP packet" | |
| ) | |
| return Gst.PadProbeReturn.PASS | |
| while rtcp_packet: | |
| if rtcp_packet.get_type() == GstRtp.RTCPType.SR: | |
| ntp_timestamp_host = time.time() | |
| time_elaped = time.time() - start_time | |
| rtcp_data = [time_elaped, *rtcp_packet.sr_get_sender_info(), ntp_timestamp_host] | |
| self.ssrc, *_ = rtcp_data | |
| # self.__rtcp_event.set() | |
| info = f"\t[on_receiving_rtcp_callback] Get RTCP Packets. time_elapsed: {time_elaped:4,.1f} sec" | |
| info += " | " | |
| info += ", ".join(f"{col} : {data:,d}" for col, data in zip(rtcp_header_cols[2:4], rtcp_data[2:4])) | |
| print(info) | |
| self.__calculate_latency_from_rtcp(rtcp_data) | |
| if not rtcp_packet.move_to_next(): | |
| break | |
| finally: # Add finally block to ensure buffer is unmapped | |
| GstRtp.RTCPBuffer.unmap(rtcp_buffer) | |
| return Gst.PadProbeReturn.PASS | |
| # Process RTP Packets and Calculate RTSP Latency ----------------------------- | |
| def rtp_depay_sink_pad_buffer_probe( | |
| self, pad: Gst.Pad, info: Gst.PadProbeInfo, u_data: Any | |
| ) -> Gst.PadProbeReturn: | |
| """ | |
| Handle the sink pad buffer probe for RTP depay. | |
| """ | |
| self.__fetch_rtp_buffer(pad, info) | |
| return Gst.PadProbeReturn.OK | |
| def __should_add_to_batch(self, rtp_buffer: GstRtp.RTPBuffer) -> bool: | |
| """ | |
| Determine if the RTP buffer should be added to the batch. | |
| """ | |
| return ( | |
| self.ssrc | |
| and rtp_buffer.get_ssrc() == self.ssrc | |
| and rtp_buffer.get_marker() == 1 | |
| ) | |
| def __fetch_rtp_buffer(self, pad: Gst.Pad, info: Gst.PadProbeInfo) -> None: | |
| """ | |
| Process an RTP buffer and add it to the batch if necessary. | |
| Parameters: | |
| - pad (Gst.Pad): The pad to fetch the buffer from. | |
| - info (Gst.PadProbeInfo): Information about the buffer. | |
| Returns: | |
| - None | |
| """ | |
| buffer = info.get_buffer() | |
| res, rtp_buffer = GstRtp.RTPBuffer.map(buffer, Gst.MapFlags.READ) | |
| time_recieve_host = time.time() | |
| try: | |
| if self.__should_add_to_batch(rtp_buffer): | |
| rtp_data = [ | |
| rtp_buffer.get_timestamp(), | |
| rtp_buffer.get_ssrc(), | |
| rtp_buffer.get_payload_type(), | |
| rtp_buffer.get_marker(), | |
| rtp_buffer.get_seq(), | |
| time_recieve_host, | |
| ] | |
| with self.__lock: | |
| self.__rtp_data = rtp_data | |
| finally: | |
| rtp_buffer.unmap() | |
| def __process_latency_and_log( | |
| self, rtcp_data: List[Union[int, float]], rtp_data: List[Union[int, float]] | |
| ) -> None: | |
| """ | |
| Calculate latency, emit the latency signal, and log the RTSP data. | |
| """ | |
| rtp_timestamp_receive, *_, time_recieve_host = rtp_data | |
| # Extracting RTCP data for clarity | |
| time_elapsd, ssrc, last_ntp_timestamp, last_rtp_timestamp, *_, = rtcp_data | |
| time_sender = ntp2unix(last_ntp_timestamp) | |
| latency = get_rtsp_latency( | |
| time_sender, time_recieve_host, last_rtp_timestamp, rtp_timestamp_receive | |
| ) | |
| self.latency = latency | |
| # Logging the RTSP data | |
| rtsp_data = rtcp_data + rtp_data + [latency] | |
| self.log_rtsp(header_cols, *rtsp_data) | |
| # self.__rtp_buffer_queue.clear() | |
| logging.debug( | |
| f"\t\t[rtp_depay_sink_pad_buffer_probe] Latency: {latency:.3f} ms" | |
| ) | |
| def __calculate_latency_from_rtcp(self, rtcp_data: List[Union[int, float]]) -> None: | |
| """ | |
| Map RTCP data to get latency. | |
| """ | |
| ( | |
| _, | |
| self.ssrc, | |
| last_ntp_timestamp, | |
| last_rtp_timestamp_rtcp, | |
| *_, | |
| ) = rtcp_data | |
| # logging.debug( | |
| # f"\t\t[rtp_depay_sink_pad_buffer_probe] Length of rtp_buffer_queue : {len(self.__rtp_buffer_queue)}" | |
| # ) | |
| start_time_calculate_latency = time.time() | |
| rtp_data_last = None | |
| cnt = 0 | |
| while rtcp_data is not None: | |
| with self.__lock: | |
| rtp_data_tmp = self.__rtp_data | |
| if rtp_data_tmp is None: | |
| # print(F"\t[calculate_latency_from_rtcp] rtp_data is None, skipping iteration", end='\r') | |
| continue # skip to the next iteration of the loop | |
| if (rtp_data_last is not None) and rtp_data_last[0] == rtp_data_tmp[0]: | |
| print(F"\t[calculate_latency_from_rtcp] rtp_data is Same. Update Rtp Data", end='\r') | |
| rtp_data_tmp = self.__rtp_data | |
| rtp_timestamp_receive, *_ = rtp_data_tmp | |
| # print(F"\t\t[calculate_latency_from_rtcp] rtp_timestamp_receive : rtp_timestamp_receive > last_rtp_timestamp_rtcp : {rtp_timestamp_receive > last_rtp_timestamp_rtcp}, cnt:{cnt}", end='\r') | |
| rtp_data_last = rtp_data_tmp | |
| if rtp_timestamp_receive > last_rtp_timestamp_rtcp: | |
| # logging.debug(f"\t\t[calculate_latency_from_rtcp] rtp_timestamp_receive > last_rtp_timestamp_rtcp, cnt: {cnt}") | |
| if cnt >= 1: | |
| info = f"\t[calculate_latency_from_rtcp] Condition met, cnt: {cnt}" | |
| info += " | " | |
| # info += ", ".join(f"{col} : {data:,d}" for col, data in zip(rtp_header_cols[0], rtp_data[0])) | |
| info += f"{rtp_header_cols[0]} : {rtp_data_tmp[0]:,d}, {rtp_header_cols[-1]} : {rtp_data_tmp[-1]:,.2f}" | |
| print(info) | |
| self.__process_latency_and_log(rtcp_data, rtp_data_tmp) | |
| break | |
| cnt += 1 | |
| if time.time() - start_time_calculate_latency > self.TIMEOUT_DURATION: | |
| logging.debug( | |
| f"\t\t[Mapping rtp_timestamp timeout] Wait more than {self.TIMEOUT_DURATION} sec." | |
| ) | |
| break | |
| def log_rtsp(self, header_cols, *data_cols, **kws): | |
| """Log RTSP Data, incluing RTCP and RTP Packets""" | |
| # Check if the file exists, if not initialize it | |
| if not os.path.exists(file_rtsp): | |
| with open(file_rtsp, "w", newline="") as f: | |
| writer = csv.writer(f) | |
| writer.writerow(header_cols) | |
| # Write Data | |
| with open(file_rtsp, "a", newline="") as f: | |
| writer = csv.writer(f) | |
| writer.writerow(data_cols) | |
| # RTSP Related Function =============================================================================================== | |
| def get_rtsp_latency( | |
| time_sender: float, | |
| time_receive: float, | |
| rtp_timestamp_sender: Union[str, float, int], | |
| rtp_timestamp_receive: Union[str, float, int], | |
| sample_rate: int = 9e4, | |
| ) -> float: | |
| """ | |
| Calculate RTSP latency based on the provided RTP timestamps and the received timestamp. | |
| Parameters: | |
| - time_sender : float | |
| UNIX timestamp of the sender obtained from the RTCP capture. | |
| - time_receive : float | |
| UNIX timestamp of the received packet obtained from the RTP capture. | |
| - rtp_timestamp_sender : Union[str, float, int] | |
| RTP timestamp from the sender obtained from the RTCP capture. | |
| - rtp_timestamp_receive : Union[str, float, int] | |
| RTP timestamp of the received packet obtained from the RTP capture. | |
| - sample_rate : float | |
| The sample rate used in the RTP stream. Normally 90,000 for video. | |
| Returns: | |
| - float | |
| Mapped UNIX timestamp of the sender considering the RTP timestamp difference. | |
| """ | |
| try: | |
| rtp_ts_diff = float(rtp_timestamp_receive) - float(rtp_timestamp_sender) | |
| if rtp_ts_diff <= 0: | |
| raise ValueError("RTP_Timestamp Error : RTP < RTCP") | |
| time_diff = rtp_ts_diff / sample_rate | |
| time_sender_mapped = time_sender + time_diff | |
| # Assuming ts_received is defined elsewhere or passed as an argument | |
| latency = (time_receive - time_sender_mapped) * 1e3 # second(s) to millisecond(ms) | |
| return latency | |
| except Exception as e: | |
| print(f"RTSP Latency error : {e}") | |
| return None | |
| def ntp32to64(msw: Union[str, int], lsw: Union[str, int]) -> int: | |
| """ | |
| Convert NTP timestamp format from 32 to 64 bit | |
| Parameters: | |
| - 32bit msw: Most significant word (integer part) of the NTP timestamp. | |
| - 32bit lsw: Least significant word (fractional part) of the NTP timestamp. | |
| Returns: | |
| - UNIX timestamp | |
| eg: (3900966726, 895446429) -> 16754524511849639325 | |
| """ | |
| # Calculate the 64-bit integer NTP timestamp | |
| # ≪ is the bitwise left shift operator and | is the bitwise OR operator. | |
| # same with ntp_timestamp_combined = msw + (lsw / 2**32) | |
| ntp_timestamp_combined = (msw << 32) | lsw | |
| return ntp_timestamp_combined | |
| def ntp2unix(ntp_ts_64: Union[str, float, int]) -> float: | |
| """ | |
| Convert NTP timestamp format to UNIX timestamp. | |
| Parameters: | |
| - ntp_ts_64: 64-bit integer NTP timestamp | |
| Returns: | |
| - UNIX timestamp | |
| eg: 16760860267217432274 -> 1693453084.2088041 | |
| """ | |
| # Make sure the timestamp is an integer | |
| ntp_ts_64 = int(ntp_ts_64) | |
| # Extract the integer and fractional parts | |
| # Right-shifting by 32 bits to isolate the most significant 32 bits (msw), | |
| # which represent the integer part of the NTP timestamp. | |
| # eg: abcdefgh >> 32, return abcd0000 | |
| msw = ntp_ts_64 >> 32 | |
| # Bitwise AND with 0xFFFFFFFF to isolate the least significant 32 bits (lsw), | |
| # which represent the fractional part of the NTP timestamp. | |
| # eg: abcdefgh & 0xFFFFFFFF, return 0000efgh | |
| lsw = ntp_ts_64 & 0xFFFFFFFF | |
| # Convert to UNIX epoch by subtracting the seconds offset between NTP and UNIX epochs | |
| unix_timestamp = msw - SEC_BETWEEN_EPOCHS | |
| # Add the fractional part (scaled to a second) | |
| # Scale the least significant 32 bits (lsw) to a fraction of a second | |
| # by dividing it by the maximum value that a 32-bit number can hold (0xFFFFFFFF). | |
| unix_timestamp += lsw / 0xFFFFFFFF | |
| return unix_timestamp | |
| def unix2ntp(unix_timestamp: Union[str, float, int]) -> float: | |
| """ | |
| Convert UNIX timestamp to NTP timestamp format. | |
| Parameters: | |
| - unix_timestamp: UNIX timestamp (seconds since epoch) | |
| Returns: | |
| - NTP timestamp (as a floating-point number, where the integer part represents | |
| the most significant word (seconds since NTP epoch) and the fractional part | |
| represents the least significant word). | |
| """ | |
| # Convert input string to float if necessary | |
| try: | |
| unix_timestamp = float(unix_timestamp) | |
| except ValueError: | |
| raise ValueError( | |
| "Invalid input for UNIX timestamp. It should be convertible to float." | |
| ) | |
| # Convert UNIX timestamp to NTP timestamp by adding the seconds offset between | |
| # NTP and UNIX epochs | |
| ntp_timestamp = unix_timestamp - SEC_BETWEEN_EPOCHS | |
| return ntp_timestamp | |
| def unix_to_local(unix_timestamp: Union[str, float, int]) -> str: | |
| """ | |
| Convert UNIX timestamp to local datetime. | |
| Parameters: | |
| - unix_timestamp: UNIX timestamp (seconds since epoch) | |
| Returns: | |
| - Local datetime in string format as 'YYYY-MM-DD HH:MM:SS' | |
| """ | |
| # Convert input string to float if necessary | |
| try: | |
| unix_timestamp = float(unix_timestamp) | |
| except ValueError: | |
| raise ValueError( | |
| "Invalid input for UNIX timestamp. It should be convertible to float." | |
| ) | |
| # Convert the UNIX timestamp to local datetime and return as formatted string | |
| return datetime.datetime.fromtimestamp(unix_timestamp).strftime( | |
| "%Y-%m-%d %H:%M:%S.%f" | |
| )[:-3] | |
| # Gstreamer Elements for RTSP =========================================================================================== | |
| class CreateSourcebin_func(): | |
| def __init__(self): | |
| super().__init__() | |
| self.rtsp_processor = RtspProcessor() | |
| def __del__(self): | |
| if self.rtsp_processor is not None: | |
| del self.rtsp_processor | |
| self.rtsp_processor = None | |
| def release(self): | |
| if self.rtsp_processor is not None: | |
| del self.rtsp_processor | |
| self.rtsp_processor = None | |
| def new_manager_callback(self, rtspsrc, manager, index, *args): | |
| """ | |
| Manage RTP sessions for accessing RTCP packets | |
| """ | |
| element_name = manager.get_factory().get_name() | |
| # print(f"\t[new_manager_callback] : List element_name '{element_name}-{index}' in Decodebin ") | |
| if element_name != "rtpbin": | |
| print( | |
| f"\t[new_manager_callback] : Manager is of type {element_name}, not rtspsrc. " | |
| ) | |
| return | |
| print(f"\t[new_manager_callback] Find out '{element_name}_{index:02d}'") | |
| # Establish RTP session in src pad for fetching RTCP packets | |
| sinkpad = manager.get_request_pad(f"recv_rtcp_sink_{index:02d}") | |
| if sinkpad: | |
| print( | |
| f"\t[new_manager_callback] Successfully Created '{element_name}_{index:02d}' sinkpad" | |
| ) | |
| else: | |
| sys.stderr.write( | |
| f"\t[new_manager_callback] : Failed '{element_name}_{index:02d}' sinkpad creation. \n" | |
| ) | |
| session = manager.emit("get-internal-session", 0) | |
| if session: | |
| session.connect("on-receiving-rtcp", self.rtsp_processor.on_receiving_rtcp_callback) | |
| print( | |
| f"\t[new_manager_callback] : Successfully Connected 'recv_rtcp_sink_{index:02d}' with RTPSession" | |
| ) | |
| else: | |
| print( | |
| f"\t[new_manager_callback] : Failed 'recv_rtcp_sink_{index:02d}' Connect with RTPSession.\n" | |
| ) | |
| # def create_source_bin(index, uri): | |
| def create_source_bin( | |
| self, | |
| index, | |
| uri, | |
| *args, | |
| **kwargs, | |
| ): | |
| print("Creating source bin") | |
| # Create a source GstBin to abstract this bin's content from the rest of the | |
| # pipeline | |
| bin_name = "source-bin-%02d" % index | |
| print(bin_name) | |
| nbin = Gst.Bin.new(bin_name) | |
| if not nbin: | |
| sys.stderr.write(" Unable to create source bin \n") | |
| # Source element for reading from the uri. | |
| # We will use decodebin and let it figure out the container format of the | |
| # stream and the codec and plug the appropriate demux and decode plugins. | |
| uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin") | |
| if not uri_decode_bin: | |
| sys.stderr.write(" Unable to create uri decode bin \n") | |
| # We set the input uri to the source element | |
| uri_decode_bin.set_property("uri", uri) | |
| # Connect to the "pad-added" signal of the decodebin which generates a | |
| # callback once a new pad for raw data has beed created by the decodebin | |
| uri_decode_bin.connect("pad-added", self.cb_newpad, nbin) | |
| # uri_decode_bin.connect("child-added", self.decodebin_child_added, nbin) | |
| uri_decode_bin.connect("child-added", partial(self.decodebin_child_added, index_val=index), nbin) | |
| # We need to create a ghost pad for the source bin which will act as a proxy | |
| # for the video decoder src pad. The ghost pad will not have a target right | |
| # now. Once the decode bin creates the video decoder and generates the | |
| # cb_newpad callback, we will set the ghost pad target to the video decoder | |
| # src pad. | |
| Gst.Bin.add(nbin, uri_decode_bin) | |
| bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)) | |
| if not bin_pad: | |
| sys.stderr.write(" Failed to add ghost pad in source bin \n") | |
| return None | |
| return nbin | |
| def cb_newpad(self, decodebin, decoder_src_pad, data): | |
| print("In cb_newpad\n") | |
| caps = decoder_src_pad.get_current_caps() | |
| if not caps: | |
| caps = decoder_src_pad.query_caps() | |
| gststruct = caps.get_structure(0) | |
| gstname = gststruct.get_name() | |
| source_bin = data | |
| features = caps.get_features(0) | |
| # Need to check if the pad created by the decodebin is for video and not | |
| # audio. | |
| print("gstname=", gstname) | |
| if gstname.find("video") != -1: | |
| # Link the decodebin pad only if decodebin has picked nvidia | |
| # decoder plugin nvdec_*. We do this by checking if the pad caps contain | |
| # NVMM memory features. | |
| print("features=", features) | |
| if features.contains("memory:NVMM"): | |
| # Get the source bin ghost pad | |
| bin_ghost_pad = source_bin.get_static_pad("src") | |
| if not bin_ghost_pad.set_target(decoder_src_pad): | |
| sys.stderr.write( | |
| "Failed to link decoder src pad to source bin ghost pad\n" | |
| ) | |
| else: | |
| sys.stderr.write( | |
| " Error: Decodebin did not pick nvidia decoder plugin.\n" | |
| ) | |
| # def decodebin_child_added(child_proxy, Object, name, user_data ): | |
| def decodebin_child_added( | |
| self, child_proxy, Object, name, user_data, *args, **kwargs | |
| ): | |
| # deepstream_player_instance = kwargs.get("deepstream_player_instance") | |
| index = kwargs.get("index_val") | |
| print("[decodebin_child_added] Decodebin child added:", name, "\n") | |
| if name.find("decodebin") != -1: | |
| Object.connect( | |
| "child-added", | |
| partial(self.decodebin_child_added, index=index), | |
| user_data, | |
| ) | |
| # Connect "session manager" in 'rtspsrc' element, which is used to extract RTCP packets. | |
| # index==0, Just add one probe only | |
| if ("source" in name or "src" in name) and (index == 0): | |
| element_type = Object.get_factory().get_name() | |
| print( | |
| f"[decodebin_child_added] Check Element type of '{name}-{index:02d}': '{element_type}'" | |
| ) | |
| if element_type == "rtspsrc": | |
| try: | |
| # Use functools.partial to freeze the 'index' parameter for the callback | |
| # Object.connect("new-manager", new_manager_callback) | |
| Object.connect( | |
| "new-manager", partial(self.new_manager_callback, index=index) | |
| ) | |
| print( | |
| f"[decodebin_child_added] Successfully connected 'on-receiving-rtcp' with '{name}-{index:02d}'." | |
| ) | |
| except Exception as e: | |
| print( | |
| f"[decodebin_child_added] Object.connect Error: {e} with {name}" | |
| ) | |
| # Add Probe for *depay, which is used to extract video from RTP packets. | |
| # uridecodebin is a high-level element that dynamically creates its internal pipeline, including elements like rtph264depay, | |
| # based on the media it's given. This means that the rtph264depay element may not exist until the pipeline is set to the PAUSED | |
| # or PLAYING state and has detected that the incoming stream is H.264 encoded. | |
| # index==0, Just add one probe only | |
| if ("depay") in name and (index == 0): | |
| # if ("depay") in name : | |
| element_type = Object.get_factory().get_name() | |
| print( | |
| f"[decodebin_child_added] Check element_type '{element_type}-{index:02d}'." | |
| ) | |
| depay_sinkpad = Object.get_static_pad("sink") | |
| if depay_sinkpad: | |
| depay_sinkpad.add_probe( | |
| Gst.PadProbeType.BUFFER, | |
| self.rtsp_processor.rtp_depay_sink_pad_buffer_probe, | |
| 0, | |
| ) | |
| # depay_sinkpad.add_probe(Gst.PadProbeType.BUFFER, rtp_depay_sink_pad_buffer_probe, 0) | |
| print( | |
| f"[decodebin_child_added] Probe of 'depay_sinkpad-{index:02d}' added" | |
| ) | |
| else: | |
| sys.stderr.write( | |
| "[decodebin_child_added] Unable to create depay sink pad bin.\n" | |
| ) | |
| # Other helper Function =============================================================================================== | |
| def find_recent_file( | |
| file_prefix: str = "rtp_capture", dir_log: str = "logs_rtsp" | |
| ) -> str: | |
| matching_files = glob.glob(os.path.join(dir_log, f"{file_prefix}*.csv")) | |
| latest_file = max(matching_files, key=os.path.getmtime, default=None) | |
| if latest_file: | |
| print(f"Find the latest_file : {latest_file}") | |
| return latest_file | |
| else: | |
| print(f"No '{file_prefix}_*.csv' files found in the directory.") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment