Skip to content

Instantly share code, notes, and snippets.

@YunghuiHsu
Last active November 9, 2025 18:10
Show Gist options
  • Select an option

  • Save YunghuiHsu/e5222cf09815833ce9db95a598dd9014 to your computer and use it in GitHub Desktop.

Select an option

Save YunghuiHsu/e5222cf09815833ce9db95a598dd9014 to your computer and use it in GitHub Desktop.
demo for RTP and RTCP Buffer parse with GStreamer
demo for RTP and RTCP Buffer parse with GStreamer
1. This example program is designed to work with the deepstream / gstreamer python API
to calculate the latency of an rtsp transfer.
2. The gstreamer element example is modified from NVIDIA-AI-IOT/deepstream_python_apps/
deepstream_test1_rtsp_in_rtsp_out.py.
##################################################################################################
# 1. This example program uses the DeepStream / GStreamer Python API
#    to measure the latency of an RTSP stream.
# 2. The GStreamer element setup is adapted from NVIDIA-AI-IOT/deepstream_python_apps/
#    deepstream_test1_rtsp_in_rtsp_out.py.
##################################################################################################
import os
import sys
import glob
import time
from time import ctime
import datetime
import csv
from typing import Union
from functools import partial
from typing import Any, List, Tuple, Union, Optional
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")
enable_debug = True
from threading import Event, Lock
from pathlib import Path
import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstRtp", "1.0")
from gi.repository import Gst, GstRtp
# Number of seconds between the NTP and UNIX epochs.
SEC_BETWEEN_EPOCHS = 2208988800  # ((70 - 17) * 365 + 17 * 366) * 24 * 60 * 60

# RTSP latency log setup ------------------------------------------------------
# Wall-clock reference captured at import time; "time_elapsed" is measured
# against it.
start_time = time.time()
# Timestamp string embedded in the CSV file name.
time_start = datetime.datetime.now().strftime("%y%m%d_%H%M%S")
start_time_program = 0

# The log directory is created at import time if it does not exist yet.
dir_log = Path("./latency_logs")
dir_log.mkdir(exist_ok=True, parents=True)
file_rtsp = dir_log / f"rtsp_latency_{time_start}.csv"

# CSV columns filled from an RTCP packet, bracketed by two host-side
# time columns.
rtcp_header_cols = [
    "time_elapsed",
    "ssrc_rtcp",
    "ntp_timestamp",
    "rtp_timestamp_rtcp",
    "packet_count",
    "octet_count",
    "ntp_timestamp_host",
]

# CSV columns filled from an RTP packet, followed by the host receive time.
rtp_header_cols = [
    "rtp_timestamp_rtp",
    "ssrc_rtp",
    "payload_type",
    "marker",
    "sequence",
    "time_recieve_host",
]

# Full CSV header: RTCP columns, RTP columns, then the computed latency.
header_cols = [*rtcp_header_cols, *rtp_header_cols, "latency"]
# For RTSP Latency =================================
class RtspProcessor():
    """
    RTSP Processor for handling RTP and RTCP packets.

    The RTP probe keeps the most recent marker-bit RTP packet of the tracked
    SSRC. Whenever an RTCP Sender Report (SR) is received, the SR is paired
    with the first stored RTP packet whose RTP timestamp is newer than the
    SR's RTP timestamp, and the resulting latency is computed and appended
    to the CSV log.
    """

    TIMEOUT_DURATION = 10  # Seconds to wait for an RTP packet newer than the SR
    POLL_INTERVAL = 0.001  # Seconds to sleep between polls while waiting

    def __init__(self):
        super().__init__()
        self.__rtp_data = None  # Latest RTP row written by the pad probe
        self.ssrc = None  # SSRC taken from the last Sender Report
        self.latency = 0  # Last successfully computed latency (ms)
        self.__lock = Lock()  # Guards __rtp_data between probe and RTCP callback

    def __del__(self):
        print("-----==================RTPProcessor del=======================-----")

    # Process RTCP Packets -----------------------------------------------------------
    def on_receiving_rtcp_callback(self, session, buffer: "Gst.Buffer"):
        """
        Handle an incoming RTCP buffer.

        Walks every packet in the compound RTCP buffer; for each Sender
        Report it builds a row matching ``rtcp_header_cols`` (elapsed time,
        the SR sender info, host receive time) and tries to match it with an
        RTP packet to calculate latency.

        Parameters:
        - session: The object that emitted the signal (unused).
        - buffer (Gst.Buffer): The RTCP buffer to parse.
        Returns:
        - Gst.PadProbeReturn.PASS in every case.
        """
        rtcp_buffer = GstRtp.RTCPBuffer()
        if not GstRtp.RTCPBuffer.map(buffer, Gst.MapFlags.READ, rtcp_buffer):
            logging.warning(
                "\t\t[on_receiving_rtcp_callback] : Unable to map RTCP buffer"
            )
            return Gst.PadProbeReturn.PASS
        try:
            rtcp_packet = GstRtp.RTCPPacket()
            if not rtcp_buffer.get_first_packet(rtcp_packet):
                logging.warning(
                    "\t\t[on_receiving_rtcp_callback] : Unable to get the first RTCP packet"
                )
                return Gst.PadProbeReturn.PASS
            while True:
                if rtcp_packet.get_type() == GstRtp.RTCPType.SR:
                    ntp_timestamp_host = time.time()
                    time_elapsed = ntp_timestamp_host - start_time
                    rtcp_data = [
                        time_elapsed,
                        *rtcp_packet.sr_get_sender_info(),
                        ntp_timestamp_host,
                    ]
                    # Index 0 is the elapsed time; the SSRC is the first
                    # element of the sender info, i.e. index 1.
                    self.ssrc = rtcp_data[1]
                    info = f"\t[on_receiving_rtcp_callback] Get RTCP Packets. time_elapsed: {time_elapsed:4,.1f} sec"
                    info += " | "
                    info += ", ".join(
                        f"{col} : {data:,d}"
                        for col, data in zip(rtcp_header_cols[2:4], rtcp_data[2:4])
                    )
                    print(info)
                    self.__calculate_latency_from_rtcp(rtcp_data)
                if not rtcp_packet.move_to_next():
                    break
        finally:
            # Always unmap, including on the early return above.
            GstRtp.RTCPBuffer.unmap(rtcp_buffer)
        return Gst.PadProbeReturn.PASS

    # Process RTP Packets and Calculate RTSP Latency -----------------------------
    def rtp_depay_sink_pad_buffer_probe(
        self, pad: "Gst.Pad", info: "Gst.PadProbeInfo", u_data: Any
    ) -> "Gst.PadProbeReturn":
        """
        Handle the sink pad buffer probe for RTP depay.

        Records the RTP header of the buffer (if it qualifies) and lets the
        buffer continue downstream.
        """
        self.__fetch_rtp_buffer(pad, info)
        return Gst.PadProbeReturn.OK

    def __should_add_to_batch(self, rtp_buffer: "GstRtp.RTPBuffer") -> bool:
        """
        Determine whether the RTP buffer should be recorded.

        True only once an SSRC is known from a Sender Report, the packet
        carries that same SSRC, and its marker bit is set.
        """
        return bool(
            self.ssrc is not None
            and rtp_buffer.get_ssrc() == self.ssrc
            and rtp_buffer.get_marker() == 1
        )

    def __fetch_rtp_buffer(self, pad: "Gst.Pad", info: "Gst.PadProbeInfo") -> None:
        """
        Read the RTP header of the probed buffer and store it as the latest
        RTP sample when it qualifies.

        Parameters:
        - pad (Gst.Pad): The pad the probe fired on.
        - info (Gst.PadProbeInfo): Probe info carrying the buffer.
        Returns:
        - None
        """
        buffer = info.get_buffer()
        if buffer is None:
            return
        res, rtp_buffer = GstRtp.RTPBuffer.map(buffer, Gst.MapFlags.READ)
        if not res:
            # A buffer that failed to map must be neither read nor unmapped.
            logging.warning("\t\t[fetch_rtp_buffer] : Unable to map RTP buffer")
            return
        time_recieve_host = time.time()
        try:
            if self.__should_add_to_batch(rtp_buffer):
                # Order matches rtp_header_cols.
                rtp_data = [
                    rtp_buffer.get_timestamp(),
                    rtp_buffer.get_ssrc(),
                    rtp_buffer.get_payload_type(),
                    rtp_buffer.get_marker(),
                    rtp_buffer.get_seq(),
                    time_recieve_host,
                ]
                with self.__lock:
                    self.__rtp_data = rtp_data
        finally:
            rtp_buffer.unmap()

    def __process_latency_and_log(
        self, rtcp_data: List[Union[int, float]], rtp_data: List[Union[int, float]]
    ) -> None:
        """
        Calculate the latency for a matched RTCP/RTP pair and log the row.

        When the latency cannot be computed, nothing is logged and
        ``self.latency`` keeps its previous value.
        """
        rtp_timestamp_receive, *_, time_recieve_host = rtp_data
        _, _, last_ntp_timestamp, last_rtp_timestamp, *_ = rtcp_data
        time_sender = ntp2unix(last_ntp_timestamp)
        latency = get_rtsp_latency(
            time_sender, time_recieve_host, last_rtp_timestamp, rtp_timestamp_receive
        )
        if latency is None:
            # Formatting None with ':.3f' would raise, so stop here.
            logging.warning(
                "\t\t[process_latency_and_log] Latency could not be computed; row skipped"
            )
            return
        self.latency = latency
        rtsp_data = rtcp_data + rtp_data + [latency]
        self.log_rtsp(header_cols, *rtsp_data)
        logging.debug(
            "\t\t[rtp_depay_sink_pad_buffer_probe] Latency: %.3f ms", latency
        )

    def __calculate_latency_from_rtcp(self, rtcp_data: List[Union[int, float]]) -> None:
        """
        Wait for an RTP packet newer than the Sender Report and process it.

        Polls the latest stored RTP sample until its RTP timestamp exceeds
        the SR's RTP timestamp, or until TIMEOUT_DURATION seconds elapsed.
        The timeout is checked on every iteration, including while no RTP
        sample exists yet, and the loop sleeps POLL_INTERVAL between polls
        instead of spinning.
        """
        # NOTE(review): timestamps are compared with a plain '>', so a
        # wrapped RTP timestamp would not match until the timeout expires.
        _, self.ssrc, _, last_rtp_timestamp_rtcp, *_ = rtcp_data
        start_time_calculate_latency = time.time()
        cnt = 0  # Polls that saw an RTP sample not newer than the SR
        while True:
            if time.time() - start_time_calculate_latency > self.TIMEOUT_DURATION:
                logging.debug(
                    "\t\t[Mapping rtp_timestamp timeout] Wait more than %s sec.",
                    self.TIMEOUT_DURATION,
                )
                break
            with self.__lock:
                rtp_data_tmp = self.__rtp_data
            if rtp_data_tmp is not None:
                if rtp_data_tmp[0] > last_rtp_timestamp_rtcp:
                    if cnt >= 1:
                        info = f"\t[calculate_latency_from_rtcp] Condition met, cnt: {cnt}"
                        info += " | "
                        info += f"{rtp_header_cols[0]} : {rtp_data_tmp[0]:,d}, {rtp_header_cols[-1]} : {rtp_data_tmp[-1]:,.2f}"
                        print(info)
                    self.__process_latency_and_log(rtcp_data, rtp_data_tmp)
                    break
                cnt += 1
            time.sleep(self.POLL_INTERVAL)

    def log_rtsp(self, header_cols, *data_cols, **kws):
        """
        Append one row of RTSP data (RTCP + RTP + latency) to the CSV log.

        The header row is written first when the log file does not exist yet.

        Parameters:
        - header_cols: Column names for the header row.
        - *data_cols: Values of the row to append.
        - **kws: Accepted for compatibility; unused.
        """
        write_header = not os.path.exists(file_rtsp)
        with open(file_rtsp, "a", newline="") as f:
            writer = csv.writer(f)
            if write_header:
                writer.writerow(header_cols)
            writer.writerow(data_cols)
# RTSP Related Function ===============================================================================================
def get_rtsp_latency(
    time_sender: float,
    time_receive: float,
    rtp_timestamp_sender: Union[str, float, int],
    rtp_timestamp_receive: Union[str, float, int],
    sample_rate: float = 9e4,
) -> Optional[float]:
    """
    Calculate RTSP latency from an RTCP reference point and a received RTP packet.

    The sender's clock is advanced by the RTP timestamp difference (converted
    to seconds with ``sample_rate``) and then compared with the receive time.

    Parameters:
    - time_sender : float
        UNIX timestamp of the sender obtained from the RTCP capture.
    - time_receive : float
        UNIX timestamp at which the RTP packet was received.
    - rtp_timestamp_sender : Union[str, float, int]
        RTP timestamp from the sender obtained from the RTCP capture.
    - rtp_timestamp_receive : Union[str, float, int]
        RTP timestamp of the received RTP packet.
    - sample_rate : float
        RTP clock rate in ticks per second. Defaults to 90,000.
    Returns:
    - Optional[float]
        Latency in milliseconds, or None when it cannot be computed
        (non-numeric input, an RTP timestamp that is not newer than the
        RTCP one, or a zero sample rate). The reason is printed.
    """
    try:
        rtp_ts_diff = float(rtp_timestamp_receive) - float(rtp_timestamp_sender)
        if rtp_ts_diff <= 0:
            raise ValueError("RTP_Timestamp Error : RTP < RTCP")
        # Sender-side time at which the received RTP packet was stamped.
        time_sender_mapped = time_sender + rtp_ts_diff / sample_rate
        return (time_receive - time_sender_mapped) * 1e3  # seconds -> milliseconds
    except (TypeError, ValueError, ArithmeticError) as e:
        # Only conversion and arithmetic failures are expected here; anything
        # else is a programming error and should surface.
        print(f"RTSP Latency error : {e}")
        return None
def ntp32to64(msw: Union[str, int], lsw: Union[str, int]) -> int:
    """
    Combine the two 32-bit words of an NTP timestamp into one 64-bit integer.

    Parameters:
    - msw: Most significant word (integer seconds) of the NTP timestamp.
    - lsw: Least significant word (fractional part) of the NTP timestamp.
      Both may be given as integers or as decimal strings.
    Returns:
    - 64-bit integer NTP timestamp (msw in the upper 32 bits, lsw in the lower).
    eg: (3900966726, 895446429) -> 16754524511849639325
    """
    # Strings are accepted per the signature, so convert before shifting.
    msw = int(msw)
    lsw = int(lsw)
    # << is the bitwise left shift operator and | is the bitwise OR operator.
    # This is the fixed-point form of msw + lsw / 2**32 (scaled by 2**32).
    return (msw << 32) | lsw
def ntp2unix(ntp_ts_64: Union[str, float, int]) -> float:
    """
    Convert a 64-bit NTP timestamp to a UNIX timestamp.

    Parameters:
    - ntp_ts_64: 64-bit integer NTP timestamp (upper 32 bits: seconds,
      lower 32 bits: fraction of a second).
    Returns:
    - UNIX timestamp in seconds.
    eg: 16760860267217432274 -> 1693453084.2088041
    """
    # Make sure the timestamp is an integer
    ntp_ts_64 = int(ntp_ts_64)
    # Upper 32 bits: whole seconds.
    # eg: 0xabcdefgh >> 32 keeps only the upper half (0x0000abcd in this notation)
    msw = ntp_ts_64 >> 32
    # Lower 32 bits: fractional part.
    # eg: 0xabcdefgh & 0xFFFFFFFF keeps only the lower half (0x0000efgh)
    lsw = ntp_ts_64 & 0xFFFFFFFF
    # The fraction counts units of 1 / 2**32 second, so divide by 2**32.
    # Dividing by 0xFFFFFFFF would map the largest fraction to a full second.
    fraction = lsw / 2**32
    # Shift from the NTP epoch to the UNIX epoch.
    return msw - SEC_BETWEEN_EPOCHS + fraction
def unix2ntp(unix_timestamp: Union[str, float, int]) -> float:
    """
    Convert a UNIX timestamp to an NTP timestamp expressed in seconds.

    Parameters:
    - unix_timestamp: UNIX timestamp (seconds since the UNIX epoch), as a
      number or a string convertible to float.
    Returns:
    - NTP timestamp as a floating-point number of seconds since the NTP
      epoch (integer part: seconds, fractional part: sub-second portion).
    Raises:
    - ValueError: if a string input cannot be converted to float.
    """
    try:
        unix_timestamp = float(unix_timestamp)
    except ValueError as err:
        raise ValueError(
            "Invalid input for UNIX timestamp. It should be convertible to float."
        ) from err
    # The NTP epoch precedes the UNIX epoch, so the offset is added
    # (the inverse of the subtraction done in ntp2unix).
    return unix_timestamp + SEC_BETWEEN_EPOCHS
def unix_to_local(unix_timestamp: Union[str, float, int]) -> str:
    """
    Render a UNIX timestamp as a local date-time string.

    Parameters:
    - unix_timestamp: UNIX timestamp (seconds since epoch), as a number or a
      string convertible to float.
    Returns:
    - Local datetime formatted as 'YYYY-MM-DD HH:MM:SS.mmm' (millisecond
      precision).
    Raises:
    - ValueError: if a string input cannot be converted to float.
    """
    try:
        seconds = float(unix_timestamp)
    except ValueError:
        raise ValueError(
            "Invalid input for UNIX timestamp. It should be convertible to float."
        )
    local_dt = datetime.datetime.fromtimestamp(seconds)
    with_micros = local_dt.strftime("%Y-%m-%d %H:%M:%S.%f")
    # %f yields six digits; dropping the last three leaves milliseconds.
    return with_micros[:-3]
# Gstreamer Elements for RTSP ===========================================================================================
class CreateSourcebin_func():
    """
    Builds a uridecodebin-based source bin and wires an RtspProcessor into it.

    While the decodebin assembles its children, this class hooks the
    "new-manager" signal of the rtspsrc element (to receive RTCP packets)
    and installs a buffer probe on the sink pad of the RTP depayloader
    (to observe RTP packets). Both hooks are installed for source index 0 only.
    """

    def __init__(self):
        super().__init__()
        self.rtsp_processor = RtspProcessor()

    def __del__(self):
        self.release()

    def release(self):
        """Drop the reference to the RTSP processor; safe to call repeatedly."""
        # getattr guards the case where __init__ never got to set the attribute.
        if getattr(self, "rtsp_processor", None) is not None:
            self.rtsp_processor = None

    def new_manager_callback(self, rtspsrc, manager, index, *args):
        """
        Manage RTP sessions for accessing RTCP packets.

        Called when rtspsrc creates its session manager. For an rtpbin
        manager, requests the RTCP sink pad and connects the processor's
        RTCP callback to the "on-receiving-rtcp" signal of internal session 0.

        Parameters:
        - rtspsrc: The element that emitted "new-manager".
        - manager: The session manager element that was created.
        - index: Source index, used for the pad name and log messages.
        """
        element_name = manager.get_factory().get_name()
        if element_name != "rtpbin":
            print(
                f"\t[new_manager_callback] : Manager is of type {element_name}, not rtpbin. "
            )
            return
        print(f"\t[new_manager_callback] Find out '{element_name}_{index:02d}'")
        # Establish RTP session in src pad for fetching RTCP packets
        sinkpad = manager.get_request_pad(f"recv_rtcp_sink_{index:02d}")
        if sinkpad:
            print(
                f"\t[new_manager_callback] Successfully Created '{element_name}_{index:02d}' sinkpad"
            )
        else:
            sys.stderr.write(
                f"\t[new_manager_callback] : Failed '{element_name}_{index:02d}' sinkpad creation. \n"
            )
        session = manager.emit("get-internal-session", 0)
        if session:
            session.connect(
                "on-receiving-rtcp", self.rtsp_processor.on_receiving_rtcp_callback
            )
            print(
                f"\t[new_manager_callback] : Successfully Connected 'recv_rtcp_sink_{index:02d}' with RTPSession"
            )
        else:
            print(
                f"\t[new_manager_callback] : Failed 'recv_rtcp_sink_{index:02d}' Connect with RTPSession.\n"
            )

    def create_source_bin(
        self,
        index,
        uri,
        *args,
        **kwargs,
    ):
        """
        Create a source bin wrapping a uridecodebin for ``uri``.

        Parameters:
        - index: Source index; used in the bin name and forwarded to the
          child-added handler.
        - uri: URI handed to the uridecodebin.
        Returns:
        - The new Gst.Bin, or None when any element or the ghost pad could
          not be created.
        """
        print("Creating source bin")
        # Create a source GstBin to abstract this bin's content from the rest of the
        # pipeline
        bin_name = "source-bin-%02d" % index
        print(bin_name)
        nbin = Gst.Bin.new(bin_name)
        if not nbin:
            sys.stderr.write(" Unable to create source bin \n")
            return None
        # Source element for reading from the uri.
        # We will use decodebin and let it figure out the container format of the
        # stream and the codec and plug the appropriate demux and decode plugins.
        uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
        if not uri_decode_bin:
            sys.stderr.write(" Unable to create uri decode bin \n")
            return None
        # We set the input uri to the source element
        uri_decode_bin.set_property("uri", uri)
        # Connect to the "pad-added" signal of the decodebin which generates a
        # callback once a new pad for raw data has been created by the decodebin
        uri_decode_bin.connect("pad-added", self.cb_newpad, nbin)
        uri_decode_bin.connect(
            "child-added", partial(self.decodebin_child_added, index_val=index), nbin
        )
        # We need to create a ghost pad for the source bin which will act as a proxy
        # for the video decoder src pad. The ghost pad will not have a target right
        # now. Once the decode bin creates the video decoder and generates the
        # cb_newpad callback, we will set the ghost pad target to the video decoder
        # src pad.
        Gst.Bin.add(nbin, uri_decode_bin)
        bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
        if not bin_pad:
            sys.stderr.write(" Failed to add ghost pad in source bin \n")
            return None
        return nbin

    def cb_newpad(self, decodebin, decoder_src_pad, data):
        """
        Link a newly added decodebin video pad to the source bin's ghost pad.

        Only pads whose caps name contains "video" and whose caps features
        contain "memory:NVMM" are linked; ``data`` is the source bin.
        """
        print("In cb_newpad\n")
        caps = decoder_src_pad.get_current_caps()
        if not caps:
            caps = decoder_src_pad.query_caps()
        gststruct = caps.get_structure(0)
        gstname = gststruct.get_name()
        source_bin = data
        features = caps.get_features(0)
        # Need to check if the pad created by the decodebin is for video and not
        # audio.
        print("gstname=", gstname)
        if gstname.find("video") != -1:
            # Link the decodebin pad only if decodebin has picked nvidia
            # decoder plugin nvdec_*. We do this by checking if the pad caps contain
            # NVMM memory features.
            print("features=", features)
            if features.contains("memory:NVMM"):
                # Get the source bin ghost pad
                bin_ghost_pad = source_bin.get_static_pad("src")
                if not bin_ghost_pad.set_target(decoder_src_pad):
                    sys.stderr.write(
                        "Failed to link decoder src pad to source bin ghost pad\n"
                    )
            else:
                sys.stderr.write(
                    " Error: Decodebin did not pick nvidia decoder plugin.\n"
                )

    def decodebin_child_added(
        self, child_proxy, Object, name, user_data, *args, **kwargs
    ):
        """
        React to children being added inside the (nested) decodebin.

        - Nested decodebins get this same handler connected again, carrying
          the source index along.
        - An rtspsrc child of source 0 gets "new-manager" connected.
        - A depayloader child of source 0 gets the RTP buffer probe.

        The source index is read from the ``index_val`` keyword
        (``index`` is accepted as a fallback).
        """
        index = kwargs.get("index_val", kwargs.get("index"))
        print("[decodebin_child_added] Decodebin child added:", name, "\n")
        if name.find("decodebin") != -1:
            # Forward the index under the keyword this handler reads;
            # otherwise nested children would see index=None and be skipped.
            Object.connect(
                "child-added",
                partial(self.decodebin_child_added, index_val=index),
                user_data,
            )
        # Connect "session manager" in 'rtspsrc' element, which is used to extract RTCP packets.
        # index==0, Just add one probe only
        if ("source" in name or "src" in name) and (index == 0):
            element_type = Object.get_factory().get_name()
            print(
                f"[decodebin_child_added] Check Element type of '{name}-{index:02d}': '{element_type}'"
            )
            if element_type == "rtspsrc":
                try:
                    # Use functools.partial to freeze the 'index' parameter for the callback
                    Object.connect(
                        "new-manager", partial(self.new_manager_callback, index=index)
                    )
                    print(
                        f"[decodebin_child_added] Successfully connected 'on-receiving-rtcp' with '{name}-{index:02d}'."
                    )
                except Exception as e:
                    print(
                        f"[decodebin_child_added] Object.connect Error: {e} with {name}"
                    )
        # Add Probe for *depay, which is used to extract video from RTP packets.
        # uridecodebin creates its internal elements dynamically, so the
        # depayloader only shows up here once the stream type is known.
        # index==0, Just add one probe only
        if ("depay") in name and (index == 0):
            element_type = Object.get_factory().get_name()
            print(
                f"[decodebin_child_added] Check element_type '{element_type}-{index:02d}'."
            )
            depay_sinkpad = Object.get_static_pad("sink")
            if depay_sinkpad:
                depay_sinkpad.add_probe(
                    Gst.PadProbeType.BUFFER,
                    self.rtsp_processor.rtp_depay_sink_pad_buffer_probe,
                    0,
                )
                print(
                    f"[decodebin_child_added] Probe of 'depay_sinkpad-{index:02d}' added"
                )
            else:
                sys.stderr.write(
                    "[decodebin_child_added] Unable to create depay sink pad bin.\n"
                )
# Other helper Function ===============================================================================================
def find_recent_file(
    file_prefix: str = "rtp_capture", dir_log: str = "logs_rtsp"
) -> str:
    """
    Return the most recently modified CSV file matching a prefix.

    Parameters:
    - file_prefix: Leading part of the file name; files matching
      '<file_prefix>*.csv' are considered.
    - dir_log: Directory to search.
    Returns:
    - Path of the newest matching file (by modification time), or None when
      nothing matches. The outcome is printed in both cases.
    """
    pattern = os.path.join(dir_log, f"{file_prefix}*.csv")
    newest = max(glob.glob(pattern), key=os.path.getmtime, default=None)
    if not newest:
        print(f"No '{file_prefix}_*.csv' files found in the directory.")
        return None
    print(f"Find the latest_file : {newest}")
    return newest
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment