Skip to content

Instantly share code, notes, and snippets.

@pepijndevos
Created August 28, 2025 15:36
Show Gist options
  • Select an option

  • Save pepijndevos/f03cc26701705806a00a9af2a5c96dbd to your computer and use it in GitHub Desktop.

Select an option

Save pepijndevos/f03cc26701705806a00a9af2a5c96dbd to your computer and use it in GitHub Desktop.
process high speed footage into low speed HDR footage
#!/usr/bin/env python3
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import numpy as np
from collections import deque
if len(sys.argv) != 3:
print("Usage: python gst_hdr.py input.mp4 output.mkv")
sys.exit(1)
input_path = sys.argv[1]
output_path = sys.argv[2]
# HDR parameters
window_size = 4
input_fps = 120.0
output_fps = 30.0
frame_skip = int(input_fps / output_fps)
gamma = 3
print(f"HDR Config: Window={window_size}, Skip every {frame_skip} frames, Gamma={gamma}")
# Initialize GStreamer
Gst.init(None)
class HDRProcessor:
def __init__(self, input_file, output_file):
self.input_file = input_file
self.output_file = output_file
self.frame_count = 0
self.output_count = 0
self.frames_since_output = 0
# HDR frame buffer
self.frame_buffer = deque(maxlen=window_size)
# Timestamp tracking
self.timestamp = 0
self.duration = int(Gst.SECOND / output_fps)
# Create main loop
self.loop = GLib.MainLoop()
# Create source pipeline (decoder -> appsink)
self.src_pipeline = Gst.parse_launch(f"""
filesrc location={input_file} !
decodebin !
videoconvert !
video/x-raw,format=ARGB64 !
appsink name=sink emit-signals=true sync=false
""")
# Create sink pipeline (appsrc -> encoder -> file)
self.sink_pipeline = Gst.parse_launch(f"""
appsrc name=source emit-signals=false !
video/x-raw,format=ARGB64 !
videoconvert !
x265enc !
h265parse !
matroskamux !
filesink location={output_file}
""")
# Get appsink and appsrc elements
self.appsink = self.src_pipeline.get_by_name('sink')
self.appsrc = self.sink_pipeline.get_by_name('source')
# Configure appsink
self.appsink.set_property('emit-signals', True)
self.appsink.connect('new-sample', self.on_new_sample)
# Configure appsrc
self.appsrc.set_property('emit-signals', True)
self.appsrc.set_property('is-live', False)
self.appsrc.set_property('format', Gst.Format.TIME)
self.appsrc.set_property('max-bytes', 50 * 1024 * 1024)
self.appsrc.set_property('block', True)
# Set up bus message handlers
self.src_bus = self.src_pipeline.get_bus()
self.src_bus.add_signal_watch()
self.src_bus.connect('message', self.on_src_message)
self.sink_bus = self.sink_pipeline.get_bus()
self.sink_bus.add_signal_watch()
self.sink_bus.connect('message', self.on_sink_message)
# Video properties (will be set from first frame)
self.width = None
self.height = None
self.caps_set = False
def timeout_quit(self):
print("Timeout reached - forcing quit")
self.loop.quit()
return False
def buffer_to_numpy(self, buffer):
"""Convert GStreamer buffer to numpy array (ARGB64 format)"""
# Get buffer data
success, map_info = buffer.map(Gst.MapFlags.READ)
if not success:
return None
# ARGB64 format: 16-bit per channel, 4 channels (A,R,G,B)
array = np.frombuffer(map_info.data, dtype=np.uint16)
array = array.reshape((self.height, self.width, 4))
# Extract RGB channels (skip alpha channel 0, take channels 1,2,3)
rgb_array = array[:, :, 1:4] # Skip alpha, take R,G,B
buffer.unmap(map_info)
return rgb_array.copy()
def numpy_to_buffer(self, array):
"""Convert numpy array back to GStreamer buffer (ARGB64 format)"""
# Ensure array is uint16
if array.dtype != np.uint16:
array = array.astype(np.uint16)
# Add alpha channel back (set to max value)
alpha_channel = np.full((self.height, self.width, 1), 65535, dtype=np.uint16)
argb_array = np.concatenate([alpha_channel, array], axis=2) # ARGB format
# Create buffer from array data - use correct method
data = argb_array.tobytes()
buffer = Gst.Buffer.new_wrapped(data)
return buffer
def process_hdr_frame(self):
"""Process accumulated frames with HDR tone mapping"""
if len(self.frame_buffer) < window_size:
return None
# Sum all frames in buffer
summed = np.zeros((self.height, self.width, 3), dtype=np.int64)
for buf_frame in self.frame_buffer:
summed += buf_frame.astype(np.int64)
# Apply tone mapping
tone_mapped = np.power(summed, 1.0/gamma)
# Normalize and keep 16-bit precision
mapped_min = np.min(tone_mapped)
mapped_max = np.max(tone_mapped)
if mapped_max > mapped_min:
output_frame = ((tone_mapped - mapped_min) / (mapped_max - mapped_min) * 65535).astype(np.uint16)
else:
output_frame = np.zeros_like(tone_mapped, dtype=np.uint16)
return output_frame
def on_new_sample(self, appsink):
# Pull sample from appsink
sample = appsink.emit('pull-sample')
if sample:
# Get the buffer and caps
buffer = sample.get_buffer()
caps = sample.get_caps()
# Set video properties from first frame
if not self.caps_set:
structure = caps.get_structure(0)
self.width = structure.get_int('width')[1]
self.height = structure.get_int('height')[1]
# Create output caps with framerate
output_caps = Gst.Caps.from_string(f"video/x-raw,format=ARGB64,width={self.width},height={self.height},framerate={int(output_fps)}/1")
self.appsrc.set_property('caps', output_caps)
print(f"Input caps: {caps.to_string()}")
print(f"Output caps: {output_caps.to_string()}")
print(f"Video size: {self.width}x{self.height}")
self.sink_pipeline.set_state(Gst.State.PLAYING)
self.caps_set = True
# Convert buffer to numpy array
frame_array = self.buffer_to_numpy(buffer)
if frame_array is not None:
# Add frame to HDR buffer
self.frame_buffer.append(frame_array)
self.frame_count += 1
self.frames_since_output += 1
if self.frame_count % 100 == 0:
print(f"Processed input frame {self.frame_count}")
# Generate output frame?
if self.frames_since_output >= frame_skip:
hdr_frame = self.process_hdr_frame()
if hdr_frame is not None:
# Convert back to GStreamer buffer and push
output_buffer = self.numpy_to_buffer(hdr_frame)
# Set timestamp and duration
output_buffer.pts = self.timestamp
output_buffer.duration = self.duration
self.timestamp += self.duration
print(f"Created buffer size: {output_buffer.get_size()}, HDR frame shape: {hdr_frame.shape}")
ret = self.appsrc.emit('push-buffer', output_buffer)
self.output_count += 1
self.frames_since_output = 0
print(f"Output HDR frame {self.output_count} (from input {self.frame_count}), push result: {ret}")
if ret != Gst.FlowReturn.OK:
print(f"Error pushing HDR buffer: {ret}")
return ret
else:
print("HDR frame was None - not enough frames in buffer yet")
return Gst.FlowReturn.OK
def on_src_message(self, bus, message):
if message.type == Gst.MessageType.EOS:
print("Source pipeline reached EOS")
# Send EOS to appsrc
ret = self.appsrc.emit('end-of-stream')
print(f"Sent EOS to appsrc: {ret}")
# Set a timeout to quit if sink doesn't finish
GLib.timeout_add_seconds(5, self.timeout_quit)
elif message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"Source pipeline error: {err}, {debug}")
self.loop.quit()
def on_sink_message(self, bus, message):
if message.type == Gst.MessageType.EOS:
print("Sink pipeline reached EOS")
self.loop.quit()
elif message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"Sink pipeline error: {err}, {debug}")
self.loop.quit()
def run(self):
print(f"Starting HDR processing: {self.input_file} -> {self.output_file}")
# Start both pipelines (sink will wait for caps from first frame)
self.sink_pipeline.set_state(Gst.State.READY)
self.src_pipeline.set_state(Gst.State.PLAYING)
# Run the main loop
try:
self.loop.run()
except KeyboardInterrupt:
print("Interrupted by user - sending EOS to finish processing...")
# Send EOS to source pipeline instead of abrupt stop
self.src_pipeline.send_event(Gst.Event.new_eos())
# Continue running loop to process remaining frames
try:
self.loop.run()
except KeyboardInterrupt:
print("Second interrupt - force stopping")
# Clean up
self.src_pipeline.set_state(Gst.State.NULL)
self.sink_pipeline.set_state(Gst.State.NULL)
print(f"Done: {self.frame_count} input frames, {self.output_count} HDR output frames")
if __name__ == "__main__":
processor = HDRProcessor(input_path, output_path)
processor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment