Skip to content

Instantly share code, notes, and snippets.

@RasmusRynell
Created March 28, 2025 13:35
Show Gist options
  • Select an option

  • Save RasmusRynell/f96a823b168a2c22e37e9fec4af03d3d to your computer and use it in GitHub Desktop.

Select an option

Save RasmusRynell/f96a823b168a2c22e37e9fec4af03d3d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
GStreamer & GLib Multiprocessing/Threading Template with RTSP Data Relay
-------------------------------------------------------------------------
This template demonstrates how to safely run GStreamer pipelines using
GLib.MainLoop/GLib.MainContext in Python. It covers:
- Initialization of GStreamer in the proper process/thread.
- Creating a dedicated GLib.MainContext and MainLoop for each pipeline.
- Handling GStreamer bus messages (EOS, ERROR) to trigger a graceful shutdown.
- Best practices for running pipelines in separate threads and processes.
- Detailed cleanup and shutdown procedures to avoid lingering threads,
inherited file descriptors, and deadlocks.
- Additional logging to trace state transitions and process/thread boundaries.
- Safely relaying data (e.g. from an RTSP stream via appsink) back to the main process.
Key Points:
- **Initialization:** Always call Gst.init(None) in the process that runs the pipeline.
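- **Main Context Isolation:** Create a new GLib.MainContext and push it as the thread-default context
(in the thread that runs the loop) before adding GLib sources such as bus watches, so that they are
attached to the event loop that is actually iterated.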
- **Shutdown Sequence:** On EOS or ERROR, send an EOS event, quit the main loop, set the pipeline
to NULL (which stops internal GStreamer threads), and remove any signal watches.
- **Multiprocessing & Spawn Start Method:**
- Use multiprocessing.set_start_method('spawn', force=True) to avoid inheriting GLib state from the parent.
- Drawbacks: The 'spawn' method has higher startup overhead compared to 'fork'. In large projects,
you may need to weigh this cost against the benefits of isolation.
- **Thread vs Process:** Do not share GStreamer or GLib objects across threads or processes.
- **Data Relay:** When using an RTSP pipeline, use an appsink to capture output and a multiprocessing.Queue
to send simple, pickleable data (e.g. frames or metadata) back to the main process. This avoids sharing
complex objects and ensures that the main process can handle data as needed.
This template is intended as a starting point; adjust the timeouts, logging, and shutdown logic as needed
for your application.
"""
import datetime
import multiprocessing
import os
import queue
import threading
import time
import gi
# Ensure we have the correct version of GStreamer bindings
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
def log(message):
    """Print *message* prefixed with the current process ID and a wall-clock timestamp."""
    pid = os.getpid()
    stamp = datetime.datetime.now().strftime("%H:%M:%S.%f")
    print(f"[{pid}] {stamp} - {message}")
###############################################################################
# Basic Video Pipeline (No Data Relay)
###############################################################################
class VideoPipeline:
    """
    GStreamer pipeline driven by its own GLib.MainContext and GLib.MainLoop.

    The pipeline is built in __init__(), executed by the blocking run() call and
    shut down either by stop() or by an EOS/ERROR message arriving on the bus.
    """

    def __init__(self, pipeline_description):
        """
        Initialize the GStreamer pipeline and set up a dedicated GLib MainLoop.

        Args:
            pipeline_description: Textual pipeline description handed to
                Gst.parse_launch().
        """
        # IMPORTANT: Call Gst.init() in the process that uses the pipeline.
        Gst.init(None)
        log("Initializing VideoPipeline")
        # Create the pipeline from a textual description.
        self.pipeline = Gst.parse_launch(pipeline_description)
        # Create a dedicated GLib MainContext and MainLoop for this pipeline.
        self.context = GLib.MainContext()
        self.loop = GLib.MainLoop.new(self.context, False)
        self.stop_event = threading.Event()
        # Only the handler is connected here. The signal watch itself is added in
        # run(), after self.context has been pushed as the thread-default context;
        # a watch added here would be attached to whatever context is the default
        # for the constructing thread, which self.loop never iterates.
        self.bus = self.pipeline.get_bus()
        self.bus.connect("message", self.on_message)
        log("VideoPipeline initialized")

    def on_message(self, bus, message):
        """
        Handle GStreamer bus messages.

        Every message type is logged; ERROR and EOS additionally trigger stop().
        """
        msg_type = message.type
        log(f"Bus message received: {msg_type}")
        if msg_type == Gst.MessageType.ERROR:
            err, dbg = message.parse_error()
            log(f"ERROR: {err.message}, Debug: {dbg}")
            self.stop()
        elif msg_type == Gst.MessageType.EOS:
            log("EOS received")
            self.stop()

    def run(self):
        """
        Run the GStreamer pipeline using the dedicated GLib MainLoop.

        Blocks until the loop is quit (via stop() or an EOS/ERROR bus message),
        then sets the pipeline to NULL and removes the bus watch. The teardown
        also runs if the loop exits with an exception.
        """
        log("Setting pipeline state to PLAYING")
        state_change_return = self.pipeline.set_state(Gst.State.PLAYING)
        log(f"Set state to PLAYING, return value: {state_change_return}")
        start_time = time.time()
        # Use our custom context for any GLib operations in this thread. The bus
        # watch must be added *after* the push so that it is attached to
        # self.context, the context self.loop iterates.
        GLib.MainContext.push_thread_default(self.context)
        self.bus.add_signal_watch()
        try:
            if state_change_return == Gst.StateChangeReturn.FAILURE:
                # Nothing will ever play, so do not block in the main loop.
                log("ERROR: Pipeline failed to reach PLAYING; skipping main loop")
            elif self.stop_event.is_set():
                # A loop.quit() issued before loop.run() does not stop the later
                # run(), so honour a stop() request that arrived early.
                log("Shutdown was requested before the main loop started; skipping main loop")
            else:
                self.loop.run()  # Blocks until loop.quit() is called.
        finally:
            # Remove the bus watch to avoid late callbacks.
            self.bus.remove_signal_watch()
            GLib.MainContext.pop_thread_default(self.context)
            elapsed = time.time() - start_time
            log(f"Exiting main loop after {elapsed:.2f} seconds")
            # Transition pipeline to NULL to ensure all GStreamer threads are terminated.
            log("Setting pipeline state to NULL")
            state_change_return_null = self.pipeline.set_state(Gst.State.NULL)
            log(f"Set state to NULL, return value: {state_change_return_null}")

    def stop(self):
        """
        Request a graceful shutdown of the pipeline.

        Only the first call has an effect; later calls just log that shutdown
        is already in progress.
        """
        if not self.stop_event.is_set():
            log("Initiating pipeline shutdown")
            self.stop_event.set()
            # Sending EOS allows internal elements to finish cleanly. Note that
            # the loop is quit immediately afterwards, so run() does not wait
            # for the EOS to travel through the pipeline.
            self.pipeline.send_event(Gst.Event.new_eos())
            # Quit the main loop so that run() can exit.
            self.loop.quit()
        else:
            log("stop() called but shutdown already in progress")
###############################################################################
# RTSP Pipeline with Data Relay (via appsink and multiprocessing.Queue)
###############################################################################
class RTSPPipeline:
    """
    GStreamer pipeline with an appsink whose samples are relayed through a queue.

    Like VideoPipeline it owns a dedicated GLib.MainContext/MainLoop; in addition
    every sample arriving at the element named "appsink" results in a small
    dict being put on the queue given to __init__().
    """

    def __init__(self, pipeline_description, data_queue):
        """
        Initialize the RTSP GStreamer pipeline, configure an appsink to
        emit signals for new samples, and set up a dedicated GLib MainLoop.

        Args:
            pipeline_description: Textual pipeline description handed to
                Gst.parse_launch(); it must contain an element named "appsink"
                for data to be relayed.
            data_queue: Queue object forwarded to on_new_sample(); only its
                put() method is used.
        """
        Gst.init(None)
        log("Initializing RTSPPipeline")
        # Create the pipeline from the description.
        self.pipeline = Gst.parse_launch(pipeline_description)
        # Retrieve the appsink element (must be named "appsink" in the pipeline).
        self.appsink = self.pipeline.get_by_name("appsink")
        if self.appsink is None:
            log("Warning: No appsink element found; data cannot be relayed")
        else:
            # Enable signals and connect the callback.
            self.appsink.set_property("emit-signals", True)
            self.appsink.connect("new-sample", self.on_new_sample, data_queue)
        self.context = GLib.MainContext()
        self.loop = GLib.MainLoop.new(self.context, False)
        self.stop_event = threading.Event()
        # Only the handler is connected here. The signal watch itself is added in
        # run(), after self.context has been pushed as the thread-default context;
        # a watch added here would be attached to whatever context is the default
        # for the constructing thread, which self.loop never iterates.
        self.bus = self.pipeline.get_bus()
        self.bus.connect("message", self.on_message)
        log("RTSPPipeline initialized")

    def on_new_sample(self, sink, data_queue):
        """
        Called when a new sample is available at the appsink.
        Relay simple, pickleable data (e.g. metadata or frame info) back to the main process.

        Args:
            sink: The appsink element that emitted "new-sample".
            data_queue: Queue that receives one dict per pulled sample.

        Returns:
            Gst.FlowReturn.OK in every case; queue errors are logged, not raised.
        """
        sample = sink.emit("pull-sample")
        if sample:
            # Here, perform any conversion necessary. For demonstration, we send a simple dict.
            data = {"timestamp": time.time(), "info": "sample data"}
            try:
                data_queue.put(data, block=False)
            except Exception as e:
                # Best effort: a failed put must not propagate out of the signal handler.
                log(f"Queue put error: {e}")
        return Gst.FlowReturn.OK

    def on_message(self, bus, message):
        """
        Handle GStreamer bus messages.

        Every message type is logged; ERROR and EOS additionally trigger stop().
        """
        msg_type = message.type
        log(f"Bus message received: {msg_type}")
        if msg_type == Gst.MessageType.ERROR:
            err, dbg = message.parse_error()
            log(f"ERROR: {err.message}, Debug: {dbg}")
            self.stop()
        elif msg_type == Gst.MessageType.EOS:
            log("EOS received")
            self.stop()

    def run(self):
        """
        Run the RTSP pipeline using the dedicated GLib MainLoop.

        Blocks until the loop is quit (via stop() or an EOS/ERROR bus message),
        then sets the pipeline to NULL and removes the bus watch. The teardown
        also runs if the loop exits with an exception.
        """
        log("Setting RTSP pipeline state to PLAYING")
        state_change_return = self.pipeline.set_state(Gst.State.PLAYING)
        # The bus watch must be added *after* the push so that it is attached to
        # self.context, the context self.loop iterates.
        GLib.MainContext.push_thread_default(self.context)
        self.bus.add_signal_watch()
        try:
            if state_change_return == Gst.StateChangeReturn.FAILURE:
                # Nothing will ever play, so do not block in the main loop.
                log("ERROR: RTSP pipeline failed to reach PLAYING; skipping main loop")
            elif self.stop_event.is_set():
                # A loop.quit() issued before loop.run() does not stop the later
                # run(), so honour a stop() request that arrived early.
                log("RTSP shutdown was requested before the main loop started; skipping main loop")
            else:
                self.loop.run()
        finally:
            self.bus.remove_signal_watch()
            GLib.MainContext.pop_thread_default(self.context)
            log("Exiting RTSP main loop")
            self.pipeline.set_state(Gst.State.NULL)

    def stop(self):
        """
        Request a graceful shutdown of the RTSP pipeline.

        Only the first call has an effect; later calls just log that shutdown
        is already in progress.
        """
        if not self.stop_event.is_set():
            log("Initiating RTSP pipeline shutdown")
            self.stop_event.set()
            # The loop is quit right after the EOS event is sent, so run() does
            # not wait for the EOS to travel through the pipeline.
            self.pipeline.send_event(Gst.Event.new_eos())
            self.loop.quit()
        else:
            log("RTSP stop() called but shutdown already in progress")
###############################################################################
# Example Runner Functions
###############################################################################
def run_pipeline_in_thread(pipeline_description, duration_sec=5):
    """
    Run a VideoPipeline in a separate thread.

    Args:
        pipeline_description: Pipeline description passed to VideoPipeline.
        duration_sec: Maximum number of seconds to let the pipeline run before
            stop() is requested.
    """
    log("[THREAD] Starting pipeline in thread")
    vp = VideoPipeline(pipeline_description)
    thread = threading.Thread(target=vp.run)
    thread.start()
    try:
        # Waiting on the thread (instead of sleeping) returns as soon as run()
        # finishes on its own, while still capping the run at duration_sec.
        thread.join(timeout=duration_sec)
    finally:
        # Also reached when the wait is interrupted (e.g. KeyboardInterrupt), so
        # the non-daemon pipeline thread is never left running.
        if thread.is_alive():
            log("[THREAD] Requesting pipeline stop from thread")
            vp.stop()
        thread.join()
    log("[THREAD] Pipeline thread has been stopped")
def pipeline_process_main(pipeline_description, duration_sec):
    """
    Main function to run inside a separate process for a VideoPipeline.

    Args:
        pipeline_description: Pipeline description passed to VideoPipeline.
        duration_sec: Seconds after which stop() is called on the pipeline if
            run() has not returned by then.
    """
    log("[PROCESS] Entered child process for pipeline")
    vp = VideoPipeline(pipeline_description)
    # A Timer (unlike a thread that sleeps) can be cancelled, so the process
    # does not linger until duration_sec once run() has already returned.
    stopper = threading.Timer(duration_sec, vp.stop)
    stopper.start()
    run_start = time.time()
    try:
        vp.run()
    finally:
        stopper.cancel()
        stopper.join()
    elapsed_run = time.time() - run_start
    log(f"[PROCESS] Pipeline run finished after {elapsed_run:.2f} seconds")
def run_pipeline_in_process(pipeline_description, duration_sec=5):
    """
    Start pipeline_process_main in a child process and wait for it to finish.

    The child is given duration_sec + 10 seconds; if it is still alive after
    that, it is terminated and joined.
    """
    log("[MAIN] [PROCESS] Starting pipeline process")
    join_timeout = duration_sec + 10
    child = multiprocessing.Process(
        target=pipeline_process_main,
        args=(pipeline_description, duration_sec),
    )
    child.start()
    child.join(timeout=join_timeout)
    if child.is_alive():
        log("[MAIN] [PROCESS] ERROR: Process still alive after timeout. Forcing termination.")
        child.terminate()
        child.join()
    log("[MAIN] [PROCESS] Pipeline process has ended")
def run_multiple_processes_simultaneously(pipeline_description, num_procs=3, duration_sec=5):
    """
    Run multiple VideoPipeline processes concurrently.

    Args:
        pipeline_description: Pipeline description passed to each child.
        num_procs: Number of child processes to start.
        duration_sec: Run time requested from each child; every child is given
            duration_sec + 10 seconds to exit before it is terminated.
    """
    log(f"[MAIN] Launching {num_procs} parallel pipeline processes")
    processes = []
    for _ in range(num_procs):
        p = multiprocessing.Process(
            target=pipeline_process_main,
            args=(pipeline_description, duration_sec),
        )
        p.start()
        processes.append(p)
    for i, p in enumerate(processes):
        p.join(timeout=duration_sec + 10)
        if p.is_alive():
            log(f"[MAIN] [PROCESS-{i}] ERROR: Still alive after timeout. Forcing termination.")
            p.terminate()
            p.join()
            log(f"[MAIN] [PROCESS-{i}] Terminated (exit code {p.exitcode})")
        elif p.exitcode == 0:
            # Report success only for a child that exited on its own with code 0.
            log(f"[MAIN] [PROCESS-{i}] Finished successfully")
        else:
            log(f"[MAIN] [PROCESS-{i}] ERROR: Exited with code {p.exitcode}")
def run_mixed_concurrent(pipeline_description, duration_sec=5):
    """
    Run a mix of a threaded pipeline and multiple process pipelines concurrently.

    One thread executes run_pipeline_in_thread while two child processes
    execute pipeline_process_main; the thread is joined first, then each child
    is given duration_sec + 10 seconds to exit before it is terminated.
    """
    log("[MAIN] Launching mixed concurrent pipelines (thread + processes)")
    thread = threading.Thread(target=run_pipeline_in_thread, args=(pipeline_description, duration_sec))
    thread.start()
    processes = []
    for _ in range(2):
        p = multiprocessing.Process(
            target=pipeline_process_main,
            args=(pipeline_description, duration_sec),
        )
        p.start()
        processes.append(p)
    thread.join()
    for i, p in enumerate(processes):
        p.join(timeout=duration_sec + 10)
        if p.is_alive():
            log(f"[MAIN] [PROCESS-{i}] ERROR: Still alive after timeout. Forcing termination.")
            p.terminate()
            p.join()
            log(f"[MAIN] [PROCESS-{i}] Terminated (exit code {p.exitcode})")
        elif p.exitcode == 0:
            # Report success only for a child that exited on its own with code 0.
            log(f"[MAIN] [PROCESS-{i}] Finished successfully")
        else:
            log(f"[MAIN] [PROCESS-{i}] ERROR: Exited with code {p.exitcode}")
def rtsp_pipeline_process_main(pipeline_description, duration_sec, data_queue):
    """
    Main function to run inside a separate process for an RTSPPipeline.

    Args:
        pipeline_description: Pipeline description passed to RTSPPipeline.
        duration_sec: Seconds after which stop() is called on the pipeline if
            run() has not returned by then.
        data_queue: Queue handed to RTSPPipeline for relaying sample data.
    """
    log("[PROCESS] Entered child process for RTSP pipeline")
    rtsp = RTSPPipeline(pipeline_description, data_queue)
    # A Timer (unlike a thread that sleeps) can be cancelled, so the process
    # does not linger until duration_sec once run() has already returned.
    stopper = threading.Timer(duration_sec, rtsp.stop)
    stopper.start()
    run_start = time.time()
    try:
        rtsp.run()
    finally:
        stopper.cancel()
        stopper.join()
    elapsed_run = time.time() - run_start
    log(f"[PROCESS] RTSP Pipeline run finished after {elapsed_run:.2f} seconds")
def run_rtsp_pipeline_in_process(pipeline_description, duration_sec=5):
    """
    Launch an RTSPPipeline in a separate process with a data relay queue.

    The calling process consumes the queue while the child is alive (or the
    queue still holds data) and logs every item received.

    Args:
        pipeline_description: Pipeline description passed to the child.
        duration_sec: Run time requested from the child; the child is
            terminated if it is still alive duration_sec + 10 seconds after
            being started.
    """
    log("[MAIN] [PROCESS] Starting RTSP pipeline process")
    data_queue = multiprocessing.Queue()
    p = multiprocessing.Process(
        target=rtsp_pipeline_process_main,
        args=(pipeline_description, duration_sec, data_queue),
    )
    p.start()
    # Without a deadline a hung child would keep this loop (and the caller)
    # blocked forever; the grace period matches the join() timeout below.
    deadline = time.monotonic() + duration_sec + 10
    # Main process handles data produced by the RTSP pipeline.
    try:
        while p.is_alive() or not data_queue.empty():
            if p.is_alive() and time.monotonic() >= deadline:
                log("[MAIN] [PROCESS] ERROR: RTSP Process exceeded its deadline. Forcing termination.")
                p.terminate()
                break
            try:
                data = data_queue.get(timeout=1)
            except queue.Empty:
                # Nothing arrived within the poll interval; re-check the child.
                continue
            except Exception as exc:
                # Keep consuming, but do not hide unexpected queue failures.
                log(f"[MAIN] Failed to read RTSP data: {exc}")
                continue
            log(f"[MAIN] Received RTSP data: {data}")
            # Process the data as needed.
    except KeyboardInterrupt:
        log("[MAIN] KeyboardInterrupt: Terminating RTSP process.")
        p.terminate()
    p.join(timeout=duration_sec + 10)
    if p.is_alive():
        log("[MAIN] [PROCESS] ERROR: RTSP Process still alive after timeout. Forcing termination.")
        p.terminate()
        p.join()
    log("[MAIN] [PROCESS] RTSP pipeline process has ended")
###############################################################################
# Main Execution
###############################################################################
if __name__ == "__main__":
    # Child processes must be started with 'spawn' so that they begin from a
    # fresh interpreter instead of a copy of this process.
    multiprocessing.set_start_method("spawn", force=True)

    # Test-source pipeline used by all non-RTSP examples.
    pipeline_description = "videotestsrc num-buffers=100 ! autovideosink"
    # RTSP pipeline; RTSPPipeline looks up its sink by the name "appsink".
    rtsp_pipeline_description = " ! ".join([
        "rtspsrc location=rtsp://example.com/stream",
        "decodebin",
        "videoconvert",
        "videoscale",
        "appsink name=appsink",
    ])

    # One example per concurrency model, executed one after another.
    video_examples = (
        (run_pipeline_in_thread, {"duration_sec": 5}),
        (run_pipeline_in_process, {"duration_sec": 5}),
        (run_multiple_processes_simultaneously, {"num_procs": 3, "duration_sec": 5}),
        (run_mixed_concurrent, {"duration_sec": 5}),
    )
    for example, options in video_examples:
        example(pipeline_description, **options)

    # RTSP example with data relayed back to this process.
    run_rtsp_pipeline_in_process(rtsp_pipeline_description, duration_sec=10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment