Plot tabular time-series data as an overlay on a video (matplotlib + ffmpeg)
# Author: Jarkko Saltiola
# License: BSD-3
#
# Convert TSV time-series to SRT subtitles or burn overlays into a video aligned to --video-start.
#
# Also supports a real-time rolling plot overlay of selected columns via --plot-columns.
#
## Example input TSV data format:
# id timestamp device_id p1 p2 p3 w1 w2 w3 w-total
# 53285 2025-09-14 13:01:12 1 3.6 3.61 3.73 828 830.3 857.9 2516.2
# 53287 2025-09-14 13:01:15 1 0.45 0.36 0.56 103.5 82.8 128.8 315.1
# 53289 2025-09-14 13:01:17 1 0 0 0 0 0 0 0
# 53291 2025-09-14 13:01:20 1 0 0 0 0 0 0 0
# 53305 2025-09-14 13:01:38 1 3.34 3.26 3.05 768.2 749.8 701.5 2219.5
# 53307 2025-09-14 13:01:41 1 6.06 6.15 6.4 1393.8 1414.5 1472 4280.3
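# (tab-separated with a header row; a 'timestamp' column parseable by pandas is required)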
#
# Requirements: click, pandas, matplotlib, numpy
#
## Command line interface (Click):
#
# cd video_embed_stats
# uv run video_embed_stats.py --input-tsv=<tsv-file> [--video-start=<timestamp>] [--output-srt=<srt-file>]
# uv run video_embed_stats.py --input-tsv=<tsv-file> --input-video=<video-file> --output-video=<output-video-file> --video-start=<timestamp> [--plot-columns=<c1,c2,...>] [--plot-window-duration=<seconds>] [--plot-width=<percent>] [--plot-height=<percent>] [--plot-limit-y=<float>] [--plot-type=<line|step>] [--ui-scale=<float>] [--cycle-gap=<rows>] [--integral=<all|c1,c2,...>] [--integral-unit=<str>] [--integral-convert=<Wh|kWh>]
#
# Timestamp formats accepted for --video-start: 20250914T161319, 20250914T13:19:12, or 2025-09-14T13:19:12
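# e.g. parse_isotime_format("20250914T161319") -> datetime(2025, 9, 14, 16, 13, 19)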
#
### Examples:
#
## Create SRT aligned to a video start time:
# uv run video_embed_stats.py --input-tsv=250914_demodata.tsv --video-start=20250914T131912 --output-srt=250914_demodata.srt
#
## Burn static subtitles (no plot) into a video:
# uv run video_embed_stats.py --input-video=IMG_2927.MOV --input-tsv=250914_demodata.tsv --output-video=IMG_2927_subs.mp4 --video-start=20250914T131912
#
## Burn a rolling plot (last 10s) of columns p1,p2,p3 at the bottom of the video:
# uv run video_embed_stats.py --input-video=IMG_2927.MOV --input-tsv=250914_demodata.tsv --output-video=IMG_2927_plot.mp4 --video-start=20250914T131912 --plot-columns=p1,p2,p3
#
## Customize the plot window, size, and style (15 s window, 80% width, 25% height, step chart, Y axis fixed from 0 to 100), scale the UI for mobile, and break cycles after 3 consecutive zero rows:
# uv run video_embed_stats.py --input-video=IMG_2927.MOV --input-tsv=250914_demodata.tsv --output-video=IMG_2927_plot_custom.mp4 --video-start=20250914T131912 --plot-columns=p1,p2,p3 --plot-window-duration=15 --plot-width=80 --plot-height=25 --plot-type=step --plot-limit-y=100 --ui-scale=2 --cycle-gap=3
#
## Show integrated area under the curve (AUC) for all plotted columns and display in Wh with a unit label:
# uv run video_embed_stats.py --input-video=IMG_2927.MOV --input-tsv=250914_demodata.tsv --output-video=IMG_2927_plot_auc.mp4 --video-start=20250914T131912 --plot-columns=p1,p2,p3 --integral=all --integral-convert=Wh --integral-unit=Wh
#
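# Note: the AUC is accumulated in value-seconds, so if the selected columns are
# instantaneous power in watts the raw integral is in joules (W*s);
# --integral-convert divides by 3600 for Wh or by 3,600,000 for kWh before display.
#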
## uv run video_embed_stats.py --help  # print the Click help
# See the Click option decorators on main() for the full option reference.
import click
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta
import subprocess
import tempfile
import os
from pathlib import Path
import re
import multiprocessing as mp
from functools import partial
import numpy as np
def parse_isotime_format(timestamp_str):
"""Parse ISOTIME-like format (e.g., 20250914T161319 or 20250914T13:19:12)."""
# Handle format like 20250914T161319 (compact)
compact_match = re.match(r'(\d{8})T(\d{6})$', timestamp_str)
if compact_match:
date_part = compact_match.group(1)
time_part = compact_match.group(2)
# Convert YYYYMMDD to YYYY-MM-DD
formatted_date = f"{date_part[:4]}-{date_part[4:6]}-{date_part[6:8]}"
# Convert HHMMSS to HH:MM:SS
formatted_time = f"{time_part[:2]}:{time_part[2:4]}:{time_part[4:6]}"
return datetime.fromisoformat(f"{formatted_date} {formatted_time}")
# Handle format like 20250914T13:19:12 (mixed)
mixed_match = re.match(r'(\d{8})T(\d{1,2}:\d{2}:\d{2})$', timestamp_str)
if mixed_match:
date_part = mixed_match.group(1)
time_part = mixed_match.group(2)
# Convert YYYYMMDD to YYYY-MM-DD
formatted_date = f"{date_part[:4]}-{date_part[4:6]}-{date_part[6:8]}"
return datetime.fromisoformat(f"{formatted_date} {time_part}")
# Fallback to standard ISO format parsing
try:
return datetime.fromisoformat(timestamp_str.replace('T', ' '))
except ValueError:
raise ValueError(f"Unable to parse timestamp format: {timestamp_str}")
def parse_tsv_data(tsv_file):
"""Parse TSV file and return DataFrame with parsed timestamps."""
df = pd.read_csv(tsv_file, sep='\t')
df['timestamp'] = pd.to_datetime(df['timestamp'])
return df
def create_srt_content(df, data_columns=['p1', 'p2', 'p3'], video_start_time=None):
"""Convert DataFrame to SRT subtitle format."""
# If video_start_time is provided, filter data to start from that time
if video_start_time:
video_start = pd.to_datetime(video_start_time)
# Filter data to only include timestamps at or after video start
df = df[df['timestamp'] >= video_start].copy()
if df.empty:
return ""
# Reset index for proper subtitle numbering
df = df.reset_index(drop=True)
srt_content = []
for i, row in df.iterrows():
# Calculate time offset from video start (or first timestamp if no video_start)
if video_start_time:
start_time = row['timestamp'] - pd.to_datetime(video_start_time)
else:
if i == 0:
start_time = timedelta(0)
else:
start_time = row['timestamp'] - df.iloc[0]['timestamp']
# End time is start of next subtitle or +3 seconds for last one
if i < len(df) - 1:
if video_start_time:
end_time = df.iloc[i + 1]['timestamp'] - pd.to_datetime(video_start_time)
else:
end_time = df.iloc[i + 1]['timestamp'] - df.iloc[0]['timestamp']
else:
end_time = start_time + timedelta(seconds=3)
# Skip subtitles with negative start times
if start_time.total_seconds() < 0:
continue
# Format timestamps for SRT
start_srt = format_srt_time(start_time)
end_srt = format_srt_time(end_time)
# Create subtitle text with data values
values = [f"{col}: {row[col]:.2f}" for col in data_columns if col in df.columns]
subtitle_text = " | ".join(values)
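        # Subtitle index: count the numeric index lines already appended and add one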
srt_content.append(f"{len([x for x in srt_content if x.isdigit()]) + 1}")
srt_content.append(f"{start_srt} --> {end_srt}")
srt_content.append(subtitle_text)
srt_content.append("")
return "\n".join(srt_content)
def format_srt_time(td):
"""Format timedelta to SRT time format (HH:MM:SS,mmm)."""
total_seconds = int(td.total_seconds())
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
milliseconds = int((td.total_seconds() - total_seconds) * 1000)
return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"
def get_video_info(video_file):
"""Get video duration, width, and height using ffprobe."""
# Try to get all info in one command with JSON output for better parsing
info_cmd = [
'ffprobe', '-v', 'quiet', '-print_format', 'json',
'-show_format', '-show_streams', '-select_streams', 'v:0', video_file
]
duration = None
width = None
height = None
rotation = 0
try:
import json
result = subprocess.run(info_cmd, check=True, capture_output=True, text=True)
data = json.loads(result.stdout)
# Try to get duration from format first (most reliable)
if 'format' in data and 'duration' in data['format']:
try:
duration = float(data['format']['duration'])
except (ValueError, TypeError):
pass
# Try to get width/height from video stream
if 'streams' in data and len(data['streams']) > 0:
stream = data['streams'][0]
if 'width' in stream and 'height' in stream:
try:
width = int(stream['width'])
height = int(stream['height'])
except (ValueError, TypeError):
pass
# Detect rotation metadata to compute display dimensions
try:
rot_candidates = []
if isinstance(stream.get('tags'), dict) and 'rotate' in stream['tags']:
rot_candidates.append(int(str(stream['tags']['rotate']).strip()))
if isinstance(stream.get('side_data_list'), list):
for sd in stream['side_data_list']:
if isinstance(sd, dict) and 'rotation' in sd:
rot_candidates.append(int(str(sd['rotation']).strip()))
if rot_candidates:
rotation = rot_candidates[0]
except Exception:
pass
# If duration not found in format, try stream duration
if duration is None and 'duration' in stream:
try:
duration = float(stream['duration'])
except (ValueError, TypeError):
pass
except (subprocess.CalledProcessError, json.JSONDecodeError, ImportError):
# Fallback to original method if JSON parsing fails
pass
# If still no duration, try the simple format duration command
if duration is None:
format_cmd = [
'ffprobe', '-v', 'quiet', '-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1', video_file
]
try:
result = subprocess.run(format_cmd, check=True, capture_output=True, text=True)
duration_str = result.stdout.strip()
if duration_str and duration_str != 'N/A' and duration_str != '':
duration = float(duration_str)
except (subprocess.CalledProcessError, ValueError):
pass
# If still no width/height, try simple approach
if width is None or height is None:
size_cmd = [
'ffprobe', '-v', 'quiet', '-select_streams', 'v:0',
'-show_entries', 'stream=width,height', '-of', 'csv=s=x:p=0', video_file
]
try:
result = subprocess.run(size_cmd, check=True, capture_output=True, text=True)
size_str = result.stdout.strip()
if 'x' in size_str:
w_str, h_str = size_str.split('x')
width = int(w_str)
height = int(h_str)
except (subprocess.CalledProcessError, ValueError):
pass
# Adjust for display rotation (swap width/height when rotated 90/270)
try:
rot_norm = int(rotation) % 360
except Exception:
rot_norm = 0
if rot_norm in (90, 270):
if width is not None and height is not None:
width, height = height, width
# Debug output to see what we got
click.echo(f"Debug: Detected duration={duration}, width={width}, height={height}, rotation={rot_norm}")
return duration, width, height
def get_video_duration(video_file):
"""Get video duration in seconds using ffprobe."""
duration, _, _ = get_video_info(video_file)
return duration
def create_single_plot_frame(args):
"""Create a single plot frame - designed for multiprocessing."""
(frame_num, df_filtered, plot_columns, video_start, fps, window_seconds_default, frames_dir, colors, plot_width, plot_height, plot_limit_y, plot_type, ui_scale, cycle_gap, cycle_helpers, integral_helpers, integral_factor, integral_unit) = args
# Calculate current video time
current_video_time = frame_num / fps
current_timestamp = video_start + timedelta(seconds=current_video_time)
# Define rolling window (show last N seconds of data)
default_window_start = current_timestamp - timedelta(seconds=window_seconds_default)
window_start = default_window_start
window_end = current_timestamp
# If cycle-based dynamic windowing is enabled, extend window to cycle start
# A cycle starts at the first non-zero sample after the last stretch of
# `cycle_gap` consecutive rows where all selected columns are zero.
dynamic_window_start = None
cycle_active = False
cycle_start_idx = None
total_label_drawn = False
if cycle_gap and cycle_gap > 0 and cycle_helpers is not None:
try:
times = cycle_helpers['times']
break_times = cycle_helpers['break_times']
first_nonzero_after_break = cycle_helpers['first_nonzero_after_break']
first_nonzero_global_idx = cycle_helpers['first_nonzero_global_idx']
# Find the last break (end of a >= cycle_gap zero-run) at or before current time
cur_np_time = np.datetime64(current_timestamp.to_pydatetime())
j = int(np.searchsorted(break_times, cur_np_time, side='right') - 1)
if j >= 0:
start_idx = first_nonzero_after_break[j]
else:
start_idx = first_nonzero_global_idx
if start_idx is not None and times[start_idx] <= cur_np_time:
dynamic_window_start = pd.to_datetime(str(times[start_idx]))
cycle_active = True
cycle_start_idx = start_idx
# Extend the window to cover the entire ongoing cycle (but never shrink below default)
if cycle_active and dynamic_window_start is not None and dynamic_window_start < default_window_start:
window_start = dynamic_window_start
except Exception:
# Fallback silently to default window if any issue occurs
pass
# Get data within the rolling window
mask = (df_filtered['timestamp'] >= window_start) & (df_filtered['timestamp'] <= window_end)
window_data = df_filtered[mask].copy()
# Create the plot with custom dimensions for video encoding
fig, ax = plt.subplots(figsize=(plot_width, plot_height))
fig.patch.set_facecolor('black')
ax.set_facecolor('black')
if not window_data.empty:
# Plot each column
for i, col in enumerate(plot_columns):
if col in window_data.columns:
color = colors[i % len(colors)]
if plot_type == 'step':
ax.step(window_data['timestamp'], window_data[col], where='post',
color=color, linewidth=4*ui_scale, label=col, alpha=0.8)
else:
ax.plot(window_data['timestamp'], window_data[col],
color=color, linewidth=4*ui_scale, label=col, alpha=0.8)
# Add current time indicator
ax.axvline(x=current_timestamp, color='white', linestyle='--', alpha=0.7, linewidth=2*ui_scale)
# Compute and display integrals
if integral_helpers is not None:
try:
times_i = integral_helpers['times']
cum_area = integral_helpers['cum_area']
cur_np_time = np.datetime64(current_timestamp.to_pydatetime())
cur_idx = int(np.searchsorted(times_i, cur_np_time, side='right') - 1)
if cur_idx < 0:
total_auc = 0.0
else:
total_auc = float(cum_area[cur_idx]) * integral_factor
unit_suffix = f" {integral_unit}" if integral_unit else ""
total_text = f"Total: {total_auc:.2f}{unit_suffix}"
ax.text(0.01, 0.95, total_text,
transform=ax.transAxes,
color='cyan',
fontsize=int(22*ui_scale),
verticalalignment='top',
horizontalalignment='left',
bbox=dict(facecolor='black', alpha=0.3, edgecolor='cyan', boxstyle='round,pad=0.3'))
total_label_drawn = True
except Exception:
pass
# Show cycle duration indicator if cycle-based windowing is active
if 'cycle_active' in locals() and cycle_active:
duration_td = current_timestamp - (dynamic_window_start or current_timestamp)
total_ms = int(duration_td.total_seconds() * 1000)
secs = total_ms // 1000
ms = total_ms % 1000
cycle_text = f"Cycle {secs}:{ms:03d}"
# If integral available, show cycle AUC next to time
if integral_helpers is not None:
try:
times_i = integral_helpers['times']
cum_area = integral_helpers['cum_area']
cur_np_time = np.datetime64(current_timestamp.to_pydatetime())
cur_idx = int(np.searchsorted(times_i, cur_np_time, side='right') - 1)
if cur_idx >= 0 and cycle_start_idx is not None and cur_idx >= cycle_start_idx:
cycle_auc = (float(cum_area[cur_idx]) - float(cum_area[cycle_start_idx])) * integral_factor
else:
cycle_auc = 0.0
unit_suffix = f" {integral_unit}" if integral_unit else ""
cycle_text += f" | AUC: {cycle_auc:.2f}{unit_suffix}"
except Exception:
pass
cycle_x = 0.30 if 'total_label_drawn' in locals() and total_label_drawn else 0.01
ax.text(cycle_x, 0.95, cycle_text,
transform=ax.transAxes,
color='yellow',
fontsize=int(22*ui_scale),
verticalalignment='top',
horizontalalignment='left',
bbox=dict(facecolor='black', alpha=0.3, edgecolor='yellow', boxstyle='round,pad=0.3'))
# Styling
ax.set_xlabel('Time', color='white', fontsize=int(25*ui_scale))
ax.set_ylabel('Values', color='white', fontsize=int(25*ui_scale))
ax.tick_params(colors='white', labelsize=int(20*ui_scale))
# Only create legend if there are plot lines with data
if not window_data.empty:
ax.legend(loc='upper right', fontsize=int(20*ui_scale))
ax.grid(True, alpha=0.3)
# Set consistent axis limits
ax.set_xlim(window_start, window_end)
# Set y-axis limits
if plot_limit_y is not None:
ax.set_ylim(0, plot_limit_y)
elif not window_data.empty:
y_values = []
for col in plot_columns:
if col in window_data.columns:
y_values.extend(window_data[col].values)
if y_values:
y_min, y_max = min(y_values), max(y_values)
y_range = y_max - y_min
ax.set_ylim(y_min - y_range * 0.1, y_max + y_range * 0.1)
# Format x-axis
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
plt.xticks(rotation=45)
plt.tight_layout()
# Save frame with even dimensions
frame_path = os.path.join(frames_dir, f'frame_{frame_num:06d}.png')
plt.savefig(frame_path, facecolor='black', dpi=100)
plt.close()
return frame_num
def create_dynamic_plot_frames(df, plot_columns, video_start_time, temp_dir, video_file, window_seconds=10, fps=30, plot_width_percent=100, plot_height_percent=10, plot_limit_y=None, plot_type='line', ui_scale=1.0, cycle_gap=0, integral_columns=None, integral_unit='', integral_factor=1.0):
"""Create multiple plot frames for dynamic overlay showing rolling window of data."""
video_start = pd.to_datetime(video_start_time)
# Filter data to only include timestamps at or after video start
df_filtered = df[df['timestamp'] >= video_start].copy()
if df_filtered.empty:
click.echo("Warning: No data found from video start time")
return None
# Get actual video info
video_duration, video_width, video_height = get_video_info(video_file)
if video_duration is None:
click.echo("Warning: Could not determine video duration, using data duration")
video_duration = (df_filtered['timestamp'].max() - video_start).total_seconds()
if video_width is None or video_height is None:
click.echo("Warning: Could not determine video dimensions, using defaults")
video_width, video_height = 1920, 1080
# Calculate actual plot dimensions in pixels
plot_width_pixels = int(video_width * plot_width_percent / 100)
plot_height_pixels = int(video_height * plot_height_percent / 100)
# Convert to matplotlib figure size (assuming 100 DPI)
plot_width_fig = plot_width_pixels / 100
plot_height_fig = plot_height_pixels / 100
total_frames = int(video_duration * fps)
colors = ['red', 'green', 'blue', 'yellow', 'cyan', 'magenta']
frames_dir = os.path.join(temp_dir, 'frames')
os.makedirs(frames_dir, exist_ok=True)
# Precompute cycle helpers if dynamic cycle windowing is enabled
cycle_helpers = None
if cycle_gap and cycle_gap > 0:
try:
# Compute boolean mask where all selected columns are exactly zero
all_zero = np.ones(len(df_filtered), dtype=bool)
for col in plot_columns:
if col in df_filtered.columns:
all_zero &= (df_filtered[col].values == 0)
times = df_filtered['timestamp'].values.astype('datetime64[ns]')
# Identify indices that end zero-runs with length >= cycle_gap
break_indices = []
run_len = 0
for i, is_zero in enumerate(all_zero):
if is_zero:
run_len += 1
else:
if run_len >= cycle_gap:
break_indices.append(i - 1) # end index of the zero-run
run_len = 0
# Handle trailing zero-run at the end
if run_len >= cycle_gap and len(all_zero) > 0:
break_indices.append(len(all_zero) - 1)
break_times = times[break_indices] if break_indices else np.array([], dtype='datetime64[ns]')
# Map each break to the first non-zero index after it
first_nonzero_after_break = []
for bi in break_indices:
k = bi + 1
while k < len(all_zero) and all_zero[k]:
k += 1
first_nonzero_after_break.append(k if k < len(all_zero) else None)
# Global first non-zero index (before any breaks)
k = 0
while k < len(all_zero) and all_zero[k]:
k += 1
first_nonzero_global_idx = k if k < len(all_zero) else None
cycle_helpers = {
'times': times,
'break_times': break_times,
'break_indices': break_indices,
'first_nonzero_after_break': first_nonzero_after_break,
'first_nonzero_global_idx': first_nonzero_global_idx,
}
except Exception:
cycle_helpers = None
# Precompute integral helpers for AUC calculations
integral_helpers = None
try:
cols_to_integrate = integral_columns if integral_columns else plot_columns
valid_integral_cols = [c for c in cols_to_integrate if c in df_filtered.columns]
if valid_integral_cols:
y_sum = np.zeros(len(df_filtered), dtype=float)
for c in valid_integral_cols:
y_sum += pd.to_numeric(df_filtered[c], errors='coerce').fillna(0).values.astype(float)
times_i = df_filtered['timestamp'].values.astype('datetime64[ns]')
# Convert to seconds since first timestamp
t_ns = times_i.astype('int64')
t_secs = (t_ns - t_ns[0]) / 1e9
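            # Cumulative trapezoidal integral of the summed columns; units are value-seconds (e.g. W -> J)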
cum_area = np.zeros(len(y_sum), dtype=float)
if len(y_sum) > 1:
dt = np.diff(t_secs)
y_mid = 0.5 * (y_sum[:-1] + y_sum[1:])
cum_area[1:] = np.cumsum(y_mid * dt)
integral_helpers = {'times': times_i, 't_secs': t_secs, 'cum_area': cum_area}
except Exception:
integral_helpers = None
# Calculate number of processes to use (90% of available cores)
num_cores = mp.cpu_count()
num_processes = max(1, int(num_cores * 0.9))
click.echo(f"Video dimensions: {video_width}x{video_height}")
click.echo(f"Plot size: {plot_width_pixels}x{plot_height_pixels} pixels ({plot_width_percent}% x {plot_height_percent}%)")
click.echo(f"Generating {total_frames} plot frames for {video_duration:.1f}s video using {num_processes} processes...")
# Prepare arguments for multiprocessing
frame_args = [
(frame_num, df_filtered, plot_columns, video_start, fps, window_seconds, frames_dir, colors, plot_width_fig, plot_height_fig, plot_limit_y, plot_type, ui_scale, cycle_gap, cycle_helpers, integral_helpers, integral_factor, integral_unit)
for frame_num in range(total_frames)
]
# Use multiprocessing to create frames in parallel
with mp.Pool(processes=num_processes) as pool:
# Use imap to get progress updates
completed_frames = 0
for result in pool.imap(create_single_plot_frame, frame_args):
completed_frames += 1
            if completed_frames % max(1, total_frames // 10) == 0 or completed_frames == total_frames:
progress = (completed_frames / total_frames) * 100
click.echo(f"Progress: {progress:.0f}% ({completed_frames}/{total_frames} frames)")
return frames_dir
def detect_hardware_encoder():
"""Detect available hardware encoders and return the best option."""
# Test for AMD AMF encoder (Ryzen 7000 series integrated graphics)
test_cmd = ['ffmpeg', '-hide_banner', '-f', 'lavfi', '-i', 'testsrc=duration=1:size=320x240:rate=1',
'-c:v', 'h264_amf', '-t', '1', '-f', 'null', '-']
try:
subprocess.run(test_cmd, check=True, capture_output=True, text=True)
return 'h264_amf', 'AMD AMF'
except subprocess.CalledProcessError:
pass
# Test for NVIDIA NVENC
test_cmd = ['ffmpeg', '-hide_banner', '-f', 'lavfi', '-i', 'testsrc=duration=1:size=320x240:rate=1',
'-c:v', 'h264_nvenc', '-t', '1', '-f', 'null', '-']
try:
subprocess.run(test_cmd, check=True, capture_output=True, text=True)
return 'h264_nvenc', 'NVIDIA NVENC'
except subprocess.CalledProcessError:
pass
# Test for Intel QuickSync
test_cmd = ['ffmpeg', '-hide_banner', '-f', 'lavfi', '-i', 'testsrc=duration=1:size=320x240:rate=1',
'-c:v', 'h264_qsv', '-t', '1', '-f', 'null', '-']
try:
subprocess.run(test_cmd, check=True, capture_output=True, text=True)
return 'h264_qsv', 'Intel QuickSync'
except subprocess.CalledProcessError:
pass
# Fallback to software encoding
return 'libx264', 'Software (libx264)'
def burn_subtitles_to_video(input_video, srt_file, output_video):
"""Use ffmpeg to burn subtitles into video."""
# Detect best available encoder
encoder, encoder_name = detect_hardware_encoder()
click.echo(f"Using encoder for subtitles: {encoder_name}")
cmd = [
'ffmpeg', '-i', input_video,
'-vf', f'subtitles={srt_file}',
'-c:v', encoder, '-c:a', 'copy'
]
# Add encoder-specific options
if encoder == 'h264_amf':
cmd.extend(['-quality', 'balanced', '-rc', 'cqp', '-qp_i', '20', '-qp_p', '20'])
elif encoder == 'h264_nvenc':
cmd.extend(['-preset', 'medium', '-cq', '20'])
elif encoder == 'h264_qsv':
cmd.extend(['-preset', 'medium', '-global_quality', '20'])
else: # libx264
cmd.extend(['-preset', 'medium', '-crf', '20'])
cmd.extend(['-y', output_video])
try:
subprocess.run(cmd, check=True, text=True)
click.echo(f"Successfully created video with subtitles: {output_video}")
except subprocess.CalledProcessError as e:
click.echo(f"Error burning subtitles")
raise
def burn_dynamic_plot_to_video(input_video, frames_dir, output_video, fps=30, plot_width_percent=100, plot_height_percent=10):
"""Use ffmpeg to overlay dynamic plot frames on video."""
# Detect best available encoder
encoder, encoder_name = detect_hardware_encoder()
click.echo(f"Using encoder: {encoder_name}")
# Create video from plot frames
plot_video = os.path.join(os.path.dirname(frames_dir), 'plot_video.mp4')
# First, create a video from the plot frames with proper scaling
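    # The scale filter rounds width/height down to even values, as required by yuv420p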
frames_cmd = [
'ffmpeg', '-r', str(fps), '-i', os.path.join(frames_dir, 'frame_%06d.png'),
'-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
'-c:v', encoder, '-pix_fmt', 'yuv420p'
]
# Add encoder-specific options
if encoder == 'h264_amf':
frames_cmd.extend(['-quality', 'speed', '-rc', 'cqp', '-qp_i', '23', '-qp_p', '23'])
elif encoder == 'h264_nvenc':
frames_cmd.extend(['-preset', 'fast', '-cq', '23'])
elif encoder == 'h264_qsv':
frames_cmd.extend(['-preset', 'fast', '-global_quality', '23'])
else: # libx264
frames_cmd.extend(['-preset', 'fast', '-crf', '23'])
frames_cmd.extend(['-y', plot_video])
try:
subprocess.run(frames_cmd, check=True, text=True)
click.echo("Created plot video from frames")
except subprocess.CalledProcessError as e:
click.echo(f"Error creating plot video")
raise
    # Then overlay the plot video on the main video, anchored 10 px from the left and bottom edges
overlay_cmd = [
'ffmpeg', '-i', input_video, '-i', plot_video,
'-filter_complex', '[0:v][1:v]overlay=10:main_h-overlay_h-10',
'-c:v', encoder, '-c:a', 'copy', '-shortest'
]
# Add encoder-specific options for final output
if encoder == 'h264_amf':
overlay_cmd.extend(['-quality', 'balanced', '-rc', 'cqp', '-qp_i', '20', '-qp_p', '20'])
elif encoder == 'h264_nvenc':
overlay_cmd.extend(['-preset', 'medium', '-cq', '20'])
elif encoder == 'h264_qsv':
overlay_cmd.extend(['-preset', 'medium', '-global_quality', '20'])
else: # libx264
overlay_cmd.extend(['-preset', 'medium', '-crf', '20'])
overlay_cmd.extend(['-y', output_video])
try:
subprocess.run(overlay_cmd, check=True, text=True)
click.echo(f"Successfully created video with dynamic plot overlay: {output_video}")
except subprocess.CalledProcessError as e:
click.echo(f"Error overlaying dynamic plot")
raise
finally:
# Clean up temporary plot video
if os.path.exists(plot_video):
os.remove(plot_video)
@click.command()
@click.option('--input-tsv', required=True, help='Input TSV file with data')
@click.option('--output-srt', help='Output SRT subtitle file')
@click.option('--input-video', help='Input video file')
@click.option('--output-video', help='Output video file with embedded data')
@click.option('--video-start', help='Video start timestamp (formats: 20250914T131912, 20250914T13:19:12, or 2025-09-14T13:19:12)')
@click.option('--plot-columns', help='Comma-separated column names to plot (e.g., p1,p2,p3)')
@click.option('--plot-window-duration', type=int, default=10, help='Duration of data shown in real-time overlay plot in seconds (default: 10)')
@click.option('--plot-width', type=int, default=100, help='Plot window width as percentage of video width (default: 100)')
@click.option('--plot-height', type=int, default=10, help='Plot window height as percentage of video height (default: 10)')
@click.option('--plot-limit-y', type=float, help='Static Y-axis max value; set fixed range from 0 to this value for the plot')
@click.option('--plot-type', type=click.Choice(['line', 'step']), default='line', show_default=True, help='Plot style: line or step')
@click.option('--ui-scale', type=float, default=1.0, help='Scale UI elements (lines, fonts, ticks) by this factor (e.g., 1.5 for mobile)')
@click.option('--cycle-gap', type=int, default=0, help='Number of consecutive zero-value rows across all selected columns that breaks a single cycle (0 disables cycle-based window)')
@click.option('--integral', default='all', help='Columns to integrate: comma-separated list like --plot-columns or "all" to sum all selected plot columns (default)')
@click.option('--integral-unit', default='', help='Unit label appended to AUC values')
@click.option('--integral-convert', type=click.Choice(['Wh', 'kWh'], case_sensitive=False), help='Convert computed integral (J) to Wh or kWh before display')
def main(input_tsv, output_srt, input_video, output_video, video_start, plot_columns, plot_window_duration, plot_width, plot_height, plot_limit_y, plot_type, ui_scale, cycle_gap, integral, integral_unit, integral_convert):
"""Convert TSV data to SRT subtitles or burn into video as captions/plots."""
# Validate input file
if not Path(input_tsv).exists():
click.echo(f"Error: Input TSV file '{input_tsv}' not found")
return
# Parse TSV data
try:
df = parse_tsv_data(input_tsv)
click.echo(f"Loaded {len(df)} rows from {input_tsv}")
except Exception as e:
click.echo(f"Error parsing TSV file: {e}")
return
# Determine data columns to use
data_columns = ['p1', 'p2', 'p3'] # Default columns
if plot_columns:
data_columns = [col.strip() for col in plot_columns.split(',')]
# Validate columns exist
missing_cols = [col for col in data_columns if col not in df.columns]
if missing_cols:
click.echo(f"Warning: Columns not found in data: {missing_cols}")
data_columns = [col for col in data_columns if col in df.columns]
if not data_columns:
click.echo("Error: No valid data columns found")
return
# Determine columns to integrate for AUC
if (integral or '').strip().lower() == 'all':
integral_columns = data_columns.copy()
else:
integral_columns = [c.strip() for c in integral.split(',')] if integral else data_columns.copy()
missing_integral = [c for c in integral_columns if c not in df.columns]
if missing_integral:
click.echo(f"Warning: Integral columns not found in data: {missing_integral}")
integral_columns = [c for c in integral_columns if c in df.columns]
if not integral_columns:
click.echo("Warning: No valid integral columns; defaulting to plotted columns")
integral_columns = data_columns.copy()
# Validate UI scale
if ui_scale <= 0:
click.echo("Error: --ui-scale must be greater than 0")
return
# Validate cycle gap
if cycle_gap < 0:
click.echo("Error: --cycle-gap must be >= 0")
return
# Apply integral conversion factor and default unit if needed
integral_factor = 1.0
if integral_convert:
if integral_convert.lower() == 'wh':
integral_factor = 1.0 / 3600.0
elif integral_convert.lower() == 'kwh':
integral_factor = 1.0 / 3600000.0
if not integral_unit:
integral_unit = integral_convert or ''
# Parse video start time if provided
video_start_dt = None
if video_start:
try:
video_start_dt = parse_isotime_format(video_start)
except ValueError as e:
click.echo(f"Error: Invalid video-start format. {e}")
click.echo("Supported formats: 20250914T161319, 20250914T13:19:12, or 2025-09-14T13:19:12")
return
# Generate SRT content
srt_content = create_srt_content(df, data_columns, video_start_dt)
# If only SRT output requested
if output_srt and not input_video:
with open(output_srt, 'w') as f:
f.write(srt_content)
click.echo(f"SRT file created: {output_srt}")
return
# Video processing
if input_video:
if not Path(input_video).exists():
click.echo(f"Error: Input video file '{input_video}' not found")
return
if not output_video:
click.echo("Error: --output-video required when --input-video is specified")
return
if not video_start:
click.echo("Error: --video-start required when processing video")
return
# Create temporary files
with tempfile.TemporaryDirectory() as temp_dir:
if plot_columns:
# Create dynamic plot overlay
frames_dir = create_dynamic_plot_frames(df, data_columns, video_start_dt, temp_dir, input_video,
window_seconds=plot_window_duration, plot_width_percent=plot_width, plot_height_percent=plot_height, plot_limit_y=plot_limit_y, plot_type=plot_type, ui_scale=ui_scale, cycle_gap=cycle_gap, integral_columns=integral_columns, integral_unit=integral_unit, integral_factor=integral_factor)
if frames_dir:
burn_dynamic_plot_to_video(input_video, frames_dir, output_video, plot_width_percent=plot_width, plot_height_percent=plot_height)
else:
click.echo("Failed to create plot frames")
return
else:
# Create subtitle overlay (srt_content already filtered by video_start_dt)
srt_path = os.path.join(temp_dir, 'subtitles.srt')
with open(srt_path, 'w') as f:
f.write(srt_content)
burn_subtitles_to_video(input_video, srt_path, output_video)
if __name__ == '__main__':
main()