Last active
September 17, 2025 04:47
-
-
Save jasalt/efb955a7ad7af854ebba07c5dca137c5 to your computer and use it in GitHub Desktop.
Plot tabular time-series data as overlay to a video (matplotlib + ffmpeg)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Author: Jarkko Saltiola | |
| # License: BSD-3 | |
| # | |
| # Convert TSV time-series to SRT subtitles or burn overlays into a video aligned to --video-start. | |
| # | |
| # Also supports a real-time rolling plot overlay of selected columns via --plot-columns. | |
| # | |
| ## Example input TSV data format: | |
| # id timestamp device_id p1 p2 p3 w1 w2 w3 w-total | |
| # 53285 2025-09-14 13:01:12 1 3.6 3.61 3.73 828 830.3 857.9 2516.2 | |
| # 53287 2025-09-14 13:01:15 1 0.45 0.36 0.56 103.5 82.8 128.8 315.1 | |
| # 53289 2025-09-14 13:01:17 1 0 0 0 0 0 0 0 | |
| # 53291 2025-09-14 13:01:20 1 0 0 0 0 0 0 0 | |
| # 53305 2025-09-14 13:01:38 1 3.34 3.26 3.05 768.2 749.8 701.5 2219.5 | |
| # 53307 2025-09-14 13:01:41 1 6.06 6.15 6.4 1393.8 1414.5 1472 4280.3 | |
| # | |
| # Requirements: click, matplotlib, numpy, pandas (Python packages); ffmpeg and ffprobe must be available on PATH | |
| # | |
| ## Command line interface (Click): | |
| # | |
| # cd video_embed_stats | |
| # uv run video_embed_stats.py --input-tsv=<tsv-file> [--video-start=<timestamp>] [--output-srt=<srt-file>] | |
| # uv run video_embed_stats.py --input-tsv=<tsv-file> --input-video=<video-file> --output-video=<output-video-file> --video-start=<timestamp> [--plot-columns=<c1,c2,...>] [--plot-window-duration=<seconds>] [--plot-width=<percent>] [--plot-height=<percent>] [--plot-limit-y=<float>] [--plot-type=<line|step>] [--ui-scale=<float>] [--cycle-gap=<rows>] [--integral=<all|c1,c2,...>] [--integral-unit=<str>] [--integral-convert=<Wh|kWh>] | |
| # | |
| # Timestamp formats accepted for --video-start: 20250914T161319, 20250914T13:19:12, or 2025-09-14T13:19:12 | |
| # | |
| ### Examples: | |
| # | |
| ## Create SRT aligned to a video start time: | |
| # uv run video_embed_stats.py --input-tsv=250914_demodata.tsv --video-start=20250914T131912 --output-srt=250914_demodata.srt | |
| # | |
| ## Burn static subtitles (no plot) into a video: | |
| # uv run video_embed_stats.py --input-video=IMG_2927.MOV --input-tsv=250914_demodata.tsv --output-video=IMG_2927_subs.mp4 --video-start=20250914T131912 | |
| # | |
| ## Burn a rolling plot (last 10s) of columns p1,p2,p3 at the bottom of the video: | |
| # uv run video_embed_stats.py --input-video=IMG_2927.MOV --input-tsv=250914_demodata.tsv --output-video=IMG_2927_plot.mp4 --video-start=20250914T131912 --plot-columns=p1,p2,p3 | |
| # | |
| ## Customize plot window, size and style (15s window, 80% width, 25% height, step line chart, fixed Y up to 100), scale UI for mobile, break cycles after 3 zero rows: | |
| # uv run video_embed_stats.py --input-video=IMG_2927.MOV --input-tsv=250914_demodata.tsv --output-video=IMG_2927_plot_custom.mp4 --video-start=20250914T131912 --plot-columns=p1,p2,p3 --plot-window-duration=15 --plot-width=80 --plot-height=25 --plot-type=step --plot-limit-y=100 --ui-scale=2 --cycle-gap=3 | |
| # | |
| ## Show integrated area under the curve (AUC) for all plotted columns and display in Wh with a unit label: | |
| # uv run video_embed_stats.py --input-video=IMG_2927.MOV --input-tsv=250914_demodata.tsv --output-video=IMG_2927_plot_auc.mp4 --video-start=20250914T131912 --plot-columns=p1,p2,p3 --integral=all --integral-convert=Wh --integral-unit=Wh | |
| # | |
| ## uv run video_embed_stats.py --help  # print the Click help text | |
| # See the Click decorators on main() for the full list of options and their defaults. | |
| import click | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import matplotlib.dates as mdates | |
| from datetime import datetime, timedelta | |
| import subprocess | |
| import tempfile | |
| import os | |
| from pathlib import Path | |
| import re | |
| import multiprocessing as mp | |
| from functools import partial | |
| import numpy as np | |
def parse_isotime_format(timestamp_str):
    """Parse an ISO-like timestamp string into a naive ``datetime``.

    Accepted layouts:
      * compact ``YYYYMMDDTHHMMSS``      (e.g. ``20250914T161319``)
      * mixed   ``YYYYMMDDTH[H]:MM:SS``  (e.g. ``20250914T13:19:12``)
      * anything ``datetime.fromisoformat`` accepts once ``T`` has been
        replaced by a space (e.g. ``2025-09-14T13:19:12``)

    Args:
        timestamp_str: The timestamp text; surrounding whitespace is ignored.

    Returns:
        The parsed ``datetime``.

    Raises:
        ValueError: If the text matches none of the layouts, or matches one
            but encodes an impossible date/time (month 13, hour 25, ...).
    """
    text = timestamp_str.strip()
    error_message = f"Unable to parse timestamp format: {timestamp_str}"

    # Both custom layouts capture (year, month, day, hour, minute, second).
    # The fields are converted explicitly rather than re-formatted and handed
    # to fromisoformat(), which rejects the one-digit hour the mixed layout
    # allows.
    custom_layouts = (
        r'(\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})',
        r'(\d{4})(\d{2})(\d{2})T(\d{1,2}):(\d{2}):(\d{2})',
    )
    for layout in custom_layouts:
        match = re.fullmatch(layout, text)
        if match is None:
            continue
        try:
            return datetime(*(int(field) for field in match.groups()))
        except ValueError as exc:
            raise ValueError(error_message) from exc

    # Fallback: standard ISO format.
    try:
        return datetime.fromisoformat(text.replace('T', ' '))
    except ValueError as exc:
        raise ValueError(error_message) from exc
def parse_tsv_data(tsv_file):
    """Load a tab-separated file into a DataFrame with datetime timestamps.

    The ``timestamp`` column is converted with ``pd.to_datetime``; every other
    column is left as ``pd.read_csv`` inferred it.
    """
    frame = pd.read_csv(tsv_file, sep='\t')
    parsed_timestamps = pd.to_datetime(frame['timestamp'])
    frame['timestamp'] = parsed_timestamps
    return frame
def create_srt_content(df, data_columns=None, video_start_time=None):
    """Convert DataFrame rows to SRT subtitle text.

    Each row becomes one cue showing ``"<col>: <value>"`` pairs (two decimals)
    joined by ``" | "``. A cue lasts until the next row's timestamp; the last
    cue lasts 3 seconds.

    Args:
        df: DataFrame with a datetime ``timestamp`` column.
        data_columns: Column names to display; names missing from ``df`` are
            ignored. Defaults to ``['p1', 'p2', 'p3']``.
        video_start_time: If given, cue times are offsets from this instant
            and rows before it are dropped. Otherwise offsets are measured
            from the first row's timestamp.

    Returns:
        The SRT document as a string ("" when no rows remain).
    """
    if data_columns is None:
        # Built per call instead of sharing one mutable default list.
        data_columns = ['p1', 'p2', 'p3']

    origin = None
    if video_start_time:
        origin = pd.to_datetime(video_start_time)
        df = df[df['timestamp'] >= origin]
    if df.empty:
        return ""

    # Positional access below must not depend on the caller's index labels.
    df = df.reset_index(drop=True)
    if origin is None:
        origin = df['timestamp'].iloc[0]

    shown_columns = [col for col in data_columns if col in df.columns]
    column_values = {col: df[col].tolist() for col in shown_columns}
    offsets = (df['timestamp'] - origin).tolist()
    last_position = len(offsets) - 1

    srt_lines = []
    cue_number = 0
    for position, start_time in enumerate(offsets):
        # Rows that precede the origin (possible with unsorted data) get no cue.
        if start_time.total_seconds() < 0:
            continue

        if position < last_position:
            end_time = offsets[position + 1]
        else:
            end_time = start_time + timedelta(seconds=3)

        cue_number += 1
        subtitle_text = " | ".join(
            f"{col}: {column_values[col][position]:.2f}" for col in shown_columns
        )
        srt_lines.extend((
            str(cue_number),
            f"{format_srt_time(start_time)} --> {format_srt_time(end_time)}",
            subtitle_text,
            "",
        ))

    return "\n".join(srt_lines)
def format_srt_time(td):
    """Format a timedelta as an SRT timestamp (``HH:MM:SS,mmm``).

    The duration is rounded to the nearest millisecond before being split
    into fields, so binary floating-point error cannot drop a millisecond
    (1.001 s renders as ``00:00:01,001``). Negative durations, which SRT
    cannot express, are clamped to zero.

    Args:
        td: Any object with ``total_seconds()`` (``datetime.timedelta`` or
            ``pandas.Timedelta``).

    Returns:
        The formatted timestamp string.
    """
    total_ms = max(0, int(round(td.total_seconds() * 1000)))
    whole_seconds, milliseconds = divmod(total_ms, 1000)
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"
def _probe_stdout(cmd):
    """Run *cmd* and return its stdout, or None if it fails or cannot be started."""
    try:
        completed = subprocess.run(cmd, check=True, capture_output=True, text=True)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing or non-executable ffprobe binary.
        return None
    return completed.stdout


def _coerce(value, cast):
    """Return ``cast(value)``, or None when the value is absent or malformed."""
    try:
        return cast(value)
    except (ValueError, TypeError):
        return None


def _stream_rotation(stream):
    """Return the rotation (degrees) recorded on an ffprobe stream dict, else 0."""
    candidates = []
    tags = stream.get('tags')
    if isinstance(tags, dict) and 'rotate' in tags:
        candidates.append(tags['rotate'])
    side_data = stream.get('side_data_list')
    if isinstance(side_data, list):
        candidates.extend(
            entry['rotation'] for entry in side_data
            if isinstance(entry, dict) and 'rotation' in entry
        )
    if not candidates:
        return 0
    try:
        # Via float() so values such as "-90.0" are accepted too.
        return int(round(float(str(candidates[0]).strip())))
    except (ValueError, TypeError, OverflowError):
        return 0


def get_video_info(video_file):
    """Get video duration, width, and height using ffprobe.

    A single JSON probe is tried first; simpler per-field ffprobe queries are
    used as fallbacks for whatever is still unknown. When the stream carries
    a 90/270 degree rotation, width and height are swapped.

    Args:
        video_file: Path of the video to inspect.

    Returns:
        ``(duration, width, height)``. Any element is None when it could not
        be determined, including when ffprobe is missing or fails.
    """
    import json

    duration = width = height = None
    rotation = 0

    # Preferred: one call that returns everything as JSON.
    probe_output = _probe_stdout([
        'ffprobe', '-v', 'quiet', '-print_format', 'json',
        '-show_format', '-show_streams', '-select_streams', 'v:0', video_file
    ])
    try:
        data = json.loads(probe_output) if probe_output else {}
    except json.JSONDecodeError:
        data = {}
    if not isinstance(data, dict):
        data = {}

    # Container-level duration is tried first, then the stream's own value.
    container = data.get('format')
    if isinstance(container, dict):
        duration = _coerce(container.get('duration'), float)

    streams = data.get('streams')
    stream = streams[0] if isinstance(streams, list) and streams else None
    if isinstance(stream, dict):
        probed_width = _coerce(stream.get('width'), int)
        probed_height = _coerce(stream.get('height'), int)
        if probed_width is not None and probed_height is not None:
            width, height = probed_width, probed_height
        rotation = _stream_rotation(stream)
        if duration is None:
            duration = _coerce(stream.get('duration'), float)

    # Fallback: plain-text duration query ("N/A" and "" coerce to None).
    if duration is None:
        duration_output = _probe_stdout([
            'ffprobe', '-v', 'quiet', '-show_entries', 'format=duration',
            '-of', 'default=noprint_wrappers=1:nokey=1', video_file
        ])
        if duration_output is not None:
            duration = _coerce(duration_output.strip(), float)

    # Fallback: plain-text "<width>x<height>" query.
    if width is None or height is None:
        size_output = _probe_stdout([
            'ffprobe', '-v', 'quiet', '-select_streams', 'v:0',
            '-show_entries', 'stream=width,height', '-of', 'csv=s=x:p=0', video_file
        ])
        if size_output and 'x' in size_output:
            width_text, _, height_text = size_output.strip().partition('x')
            probed_width = _coerce(width_text, int)
            probed_height = _coerce(height_text, int)
            if probed_width is not None and probed_height is not None:
                width, height = probed_width, probed_height

    # Adjust for display rotation (swap width/height when rotated 90/270).
    rot_norm = rotation % 360
    if rot_norm in (90, 270) and width is not None and height is not None:
        width, height = height, width

    click.echo(f"Debug: Detected duration={duration}, width={width}, height={height}, rotation={rot_norm}")
    return duration, width, height
def get_video_duration(video_file):
    """Return only the duration (in seconds) reported by ``get_video_info``."""
    video_info = get_video_info(video_file)
    return video_info[0]
# Lookup failures in the precomputed helper dicts degrade to "feature off"
# for the frame instead of aborting the whole render.
_HELPER_LOOKUP_ERRORS = (KeyError, IndexError, TypeError, ValueError)


def _cycle_start_index(cycle_helpers, cur_np_time):
    """Return the row index where the cycle containing *cur_np_time* began, or None."""
    times = cycle_helpers['times']
    # Last break (end of a long-enough all-zero run) at or before now.
    j = int(np.searchsorted(cycle_helpers['break_times'], cur_np_time, side='right')) - 1
    if j >= 0:
        start_idx = cycle_helpers['first_nonzero_after_break'][j]
    else:
        start_idx = cycle_helpers['first_nonzero_global_idx']
    if start_idx is not None and times[start_idx] <= cur_np_time:
        return start_idx
    return None


def _cumulative_area_at(integral_helpers, cur_np_time):
    """Return ``(row_index, cumulative_area)`` of the last sample at or before *cur_np_time*.

    ``row_index`` is -1 (and the area 0.0) before the first sample.
    """
    idx = int(np.searchsorted(integral_helpers['times'], cur_np_time, side='right')) - 1
    area = float(integral_helpers['cum_area'][idx]) if idx >= 0 else 0.0
    return idx, area


def _draw_badge(ax, x, text, color, ui_scale):
    """Draw a boxed text label at the top of the axes, *x* in axes coordinates."""
    ax.text(x, 0.95, text,
            transform=ax.transAxes,
            color=color,
            fontsize=int(22*ui_scale),
            verticalalignment='top',
            horizontalalignment='left',
            bbox=dict(facecolor='black', alpha=0.3, edgecolor=color, boxstyle='round,pad=0.3'))


def create_single_plot_frame(args):
    """Create a single plot frame - designed for multiprocessing.

    Renders the rolling-window plot for one video frame and writes it to
    ``<frames_dir>/frame_<frame_num:06d>.png``.

    Args:
        args: One tuple (so the function can be mapped over a process pool):
            ``(frame_num, df_filtered, plot_columns, video_start, fps,
            window_seconds_default, frames_dir, colors, plot_width,
            plot_height, plot_limit_y, plot_type, ui_scale, cycle_gap,
            cycle_helpers, integral_helpers, integral_factor, integral_unit)``.
            ``plot_width``/``plot_height`` are figure sizes in inches;
            ``cycle_helpers``/``integral_helpers`` are the precomputed lookup
            dicts (or None to disable the feature).

    Returns:
        ``frame_num``, so the caller can track progress.
    """
    (frame_num, df_filtered, plot_columns, video_start, fps, window_seconds_default,
     frames_dir, colors, plot_width, plot_height, plot_limit_y, plot_type, ui_scale,
     cycle_gap, cycle_helpers, integral_helpers, integral_factor, integral_unit) = args

    # Wall-clock instant shown by this frame.
    current_timestamp = pd.Timestamp(video_start) + timedelta(seconds=frame_num / fps)
    cur_np_time = np.datetime64(current_timestamp.to_pydatetime())

    # Default rolling window: the last N seconds up to "now".
    window_end = current_timestamp
    window_start = current_timestamp - timedelta(seconds=window_seconds_default)

    # Cycle-based windowing: a cycle starts at the first non-zero sample after
    # the last run of `cycle_gap` all-zero rows. The window is extended back to
    # the cycle start but never shrunk below the default.
    cycle_start_idx = None
    cycle_start_time = None
    if cycle_gap and cycle_gap > 0 and cycle_helpers is not None:
        try:
            cycle_start_idx = _cycle_start_index(cycle_helpers, cur_np_time)
            if cycle_start_idx is not None:
                cycle_start_time = pd.Timestamp(cycle_helpers['times'][cycle_start_idx])
        except _HELPER_LOOKUP_ERRORS:
            cycle_start_idx = cycle_start_time = None
    if cycle_start_time is not None and cycle_start_time < window_start:
        window_start = cycle_start_time

    in_window = (df_filtered['timestamp'] >= window_start) & (df_filtered['timestamp'] <= window_end)
    window_data = df_filtered[in_window]
    drawn_columns = []

    fig, ax = plt.subplots(figsize=(plot_width, plot_height))
    try:
        fig.patch.set_facecolor('black')
        ax.set_facecolor('black')

        # NOTE(review): the overlay labels are drawn only when the window
        # contains samples; confirm that is the intended behaviour for idle
        # stretches of the recording.
        if not window_data.empty:
            draw_series = ax.step if plot_type == 'step' else ax.plot
            series_kwargs = {'where': 'post'} if plot_type == 'step' else {}
            for i, col in enumerate(plot_columns):
                if col not in window_data.columns:
                    continue
                # The colour follows the column's position in plot_columns, so
                # it stays stable even when another column is missing.
                draw_series(window_data['timestamp'], window_data[col],
                            color=colors[i % len(colors)], linewidth=4*ui_scale,
                            label=col, alpha=0.8, **series_kwargs)
                drawn_columns.append(col)

            # Current time indicator.
            ax.axvline(x=current_timestamp, color='white', linestyle='--',
                       alpha=0.7, linewidth=2*ui_scale)

            unit_suffix = f" {integral_unit}" if integral_unit else ""

            # One lookup serves both the running total and the per-cycle value.
            cur_idx = None
            total_area = 0.0
            if integral_helpers is not None:
                try:
                    cur_idx, total_area = _cumulative_area_at(integral_helpers, cur_np_time)
                except _HELPER_LOOKUP_ERRORS:
                    cur_idx = None

            total_label_drawn = False
            if cur_idx is not None:
                total_auc = total_area * integral_factor
                _draw_badge(ax, 0.01, f"Total: {total_auc:.2f}{unit_suffix}", 'cyan', ui_scale)
                total_label_drawn = True

            if cycle_start_time is not None:
                elapsed_ms = int((current_timestamp - cycle_start_time).total_seconds() * 1000)
                secs, ms = divmod(elapsed_ms, 1000)
                cycle_text = f"Cycle {secs}:{ms:03d}"
                if cur_idx is not None:
                    cycle_auc = 0.0
                    if cur_idx >= 0 and cur_idx >= cycle_start_idx:
                        try:
                            start_area = float(integral_helpers['cum_area'][cycle_start_idx])
                            cycle_auc = (total_area - start_area) * integral_factor
                        except _HELPER_LOOKUP_ERRORS:
                            cycle_auc = None
                    if cycle_auc is not None:
                        cycle_text += f" | AUC: {cycle_auc:.2f}{unit_suffix}"
                # Shift right when the total label already occupies the corner.
                cycle_x = 0.30 if total_label_drawn else 0.01
                _draw_badge(ax, cycle_x, cycle_text, 'yellow', ui_scale)

        # Styling
        ax.set_xlabel('Time', color='white', fontsize=int(25*ui_scale))
        ax.set_ylabel('Values', color='white', fontsize=int(25*ui_scale))
        ax.tick_params(colors='white', labelsize=int(20*ui_scale))
        # A legend without labelled lines only produces a matplotlib warning.
        if drawn_columns:
            ax.legend(loc='upper right', fontsize=int(20*ui_scale))
        ax.grid(True, alpha=0.3)

        # Consistent x range regardless of where samples fall in the window.
        ax.set_xlim(window_start, window_end)

        if plot_limit_y is not None:
            ax.set_ylim(0, plot_limit_y)
        elif drawn_columns:
            values = np.concatenate([
                pd.to_numeric(window_data[col], errors='coerce').to_numpy(dtype=float)
                for col in drawn_columns
            ])
            values = values[np.isfinite(values)]
            if values.size:
                y_min, y_max = float(values.min()), float(values.max())
                y_range = y_max - y_min
                # A flat signal would give identical limits; pad it explicitly.
                padding = y_range * 0.1 if y_range > 0 else (abs(y_max) * 0.1 or 1.0)
                ax.set_ylim(y_min - padding, y_max + padding)

        # Format x-axis; tick_params also covers ticks created at draw time.
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
        ax.tick_params(axis='x', labelrotation=45)
        fig.tight_layout()

        frame_path = os.path.join(frames_dir, f'frame_{frame_num:06d}.png')
        fig.savefig(frame_path, facecolor='black', dpi=100)
    finally:
        # Always release the figure; pool workers render many frames each.
        plt.close(fig)

    return frame_num
def _build_cycle_helpers(df_filtered, plot_columns, cycle_gap):
    """Precompute lookup arrays for cycle-based windowing.

    A "break" is the last row of a run of at least ``cycle_gap`` consecutive
    rows in which every selected column equals zero.
    """
    all_zero = np.ones(len(df_filtered), dtype=bool)
    for col in plot_columns:
        if col in df_filtered.columns:
            all_zero &= (df_filtered[col].values == 0)
    times = df_filtered['timestamp'].values.astype('datetime64[ns]')
    row_count = len(all_zero)

    break_indices = []
    run_len = 0
    for i, is_zero in enumerate(all_zero):
        if is_zero:
            run_len += 1
            continue
        if run_len >= cycle_gap:
            break_indices.append(i - 1)  # end index of the zero-run
        run_len = 0
    # A zero-run that reaches the end of the data is a break as well.
    if run_len >= cycle_gap and row_count > 0:
        break_indices.append(row_count - 1)

    break_times = times[break_indices] if break_indices else np.array([], dtype='datetime64[ns]')

    # A break ends its zero-run, so the row right after it (if any) is the
    # first non-zero row of the next cycle.
    first_nonzero_after_break = [
        bi + 1 if bi + 1 < row_count else None for bi in break_indices
    ]
    nonzero_rows = np.flatnonzero(~all_zero)
    first_nonzero_global_idx = int(nonzero_rows[0]) if nonzero_rows.size else None

    return {
        'times': times,
        'break_times': break_times,
        'break_indices': break_indices,
        'first_nonzero_after_break': first_nonzero_after_break,
        'first_nonzero_global_idx': first_nonzero_global_idx,
    }


def _build_integral_helpers(df_filtered, columns):
    """Precompute the cumulative trapezoidal area of the summed *columns* over time.

    Returns None when none of *columns* exists in the data.
    """
    valid_cols = [c for c in columns if c in df_filtered.columns]
    if not valid_cols:
        return None

    y_sum = np.zeros(len(df_filtered), dtype=float)
    for c in valid_cols:
        # Non-numeric cells contribute 0 to the integral.
        y_sum += pd.to_numeric(df_filtered[c], errors='coerce').fillna(0).values.astype(float)

    times = df_filtered['timestamp'].values.astype('datetime64[ns]')
    # Seconds since the first timestamp.
    t_ns = times.astype('int64')
    t_secs = (t_ns - t_ns[0]) / 1e9

    cum_area = np.zeros(len(y_sum), dtype=float)
    if len(y_sum) > 1:
        y_mid = 0.5 * (y_sum[:-1] + y_sum[1:])
        cum_area[1:] = np.cumsum(y_mid * np.diff(t_secs))
    return {'times': times, 't_secs': t_secs, 'cum_area': cum_area}


def create_dynamic_plot_frames(df, plot_columns, video_start_time, temp_dir, video_file, window_seconds=10, fps=30, plot_width_percent=100, plot_height_percent=10, plot_limit_y=None, plot_type='line', ui_scale=1.0, cycle_gap=0, integral_columns=None, integral_unit='', integral_factor=1.0):
    """Create multiple plot frames for dynamic overlay showing rolling window of data.

    One PNG per video frame is rendered in parallel by
    ``create_single_plot_frame`` into ``<temp_dir>/frames``.

    Args:
        df: DataFrame with a datetime ``timestamp`` column and the data columns.
        plot_columns: Column names to draw.
        video_start_time: Timestamp matching the first video frame; rows
            before it are ignored.
        temp_dir: Directory in which the ``frames`` sub-directory is created.
        video_file: Video whose duration and dimensions size the overlay.
        window_seconds: Length of the rolling window in seconds.
        fps: Frames rendered per second of video.
        plot_width_percent, plot_height_percent: Overlay size as a percentage
            of the video dimensions.
        plot_limit_y: Optional fixed upper Y limit (the axis then starts at 0).
        plot_type: ``'line'`` or ``'step'``.
        ui_scale: Scale factor for line widths and font sizes.
        cycle_gap: Number of consecutive all-zero rows that ends a cycle;
            0 disables cycle-based windowing.
        integral_columns: Columns summed for the area-under-curve labels;
            defaults to ``plot_columns``.
        integral_unit: Unit text appended to displayed integrals.
        integral_factor: Multiplier applied to integrals before display.

    Returns:
        Path of the frames directory, or None when there is no data at or
        after the video start or no frame to render.
    """
    video_start = pd.to_datetime(video_start_time)

    # The per-frame lookups use np.searchsorted, which needs ascending
    # timestamps, and positional indices, which need a 0-based index.
    df_filtered = (
        df[df['timestamp'] >= video_start]
        .sort_values('timestamp', kind='stable')
        .reset_index(drop=True)
    )
    if df_filtered.empty:
        click.echo("Warning: No data found from video start time")
        return None

    video_duration, video_width, video_height = get_video_info(video_file)
    if video_duration is None:
        click.echo("Warning: Could not determine video duration, using data duration")
        video_duration = (df_filtered['timestamp'].max() - video_start).total_seconds()
    if video_width is None or video_height is None:
        click.echo("Warning: Could not determine video dimensions, using defaults")
        video_width, video_height = 1920, 1080

    # Overlay size in pixels, then in inches for matplotlib (frames are saved at 100 DPI).
    plot_width_pixels = int(video_width * plot_width_percent / 100)
    plot_height_pixels = int(video_height * plot_height_percent / 100)
    plot_width_fig = plot_width_pixels / 100
    plot_height_fig = plot_height_pixels / 100

    total_frames = int(video_duration * fps)
    if total_frames <= 0:
        click.echo("Warning: Video is too short to render any plot frame")
        return None

    colors = ['red', 'green', 'blue', 'yellow', 'cyan', 'magenta']
    frames_dir = os.path.join(temp_dir, 'frames')
    os.makedirs(frames_dir, exist_ok=True)

    # Precompute cycle helpers if dynamic cycle windowing is enabled.
    cycle_helpers = None
    if cycle_gap and cycle_gap > 0:
        try:
            cycle_helpers = _build_cycle_helpers(df_filtered, plot_columns, cycle_gap)
        except (TypeError, ValueError, KeyError) as exc:
            click.echo(f"Warning: cycle detection disabled ({exc})")

    # Precompute integral helpers for the AUC labels.
    integral_helpers = None
    try:
        integral_helpers = _build_integral_helpers(
            df_filtered, integral_columns if integral_columns else plot_columns
        )
    except (TypeError, ValueError, KeyError) as exc:
        click.echo(f"Warning: integral display disabled ({exc})")

    # Use about 90% of the cores, but never more workers than frames.
    num_processes = max(1, min(int(mp.cpu_count() * 0.9), total_frames))
    click.echo(f"Video dimensions: {video_width}x{video_height}")
    click.echo(f"Plot size: {plot_width_pixels}x{plot_height_pixels} pixels ({plot_width_percent}% x {plot_height_percent}%)")
    click.echo(f"Generating {total_frames} plot frames for {video_duration:.1f}s video using {num_processes} processes...")

    frame_args = [
        (frame_num, df_filtered, plot_columns, video_start, fps, window_seconds, frames_dir, colors, plot_width_fig, plot_height_fig, plot_limit_y, plot_type, ui_scale, cycle_gap, cycle_helpers, integral_helpers, integral_factor, integral_unit)
        for frame_num in range(total_frames)
    ]

    # Report roughly every 10%; the step must be at least 1, because a video
    # with fewer than 10 frames would otherwise cause a modulo by zero.
    progress_step = max(1, total_frames // 10)
    # Tasks are sent to workers in batches; presumably this lets the shared
    # DataFrame be serialised once per batch rather than once per frame.
    chunksize = max(1, min(32, total_frames // (num_processes * 4)))

    with mp.Pool(processes=num_processes) as pool:
        completed_frames = 0
        for _ in pool.imap(create_single_plot_frame, frame_args, chunksize=chunksize):
            completed_frames += 1
            if completed_frames % progress_step == 0 or completed_frames == total_frames:
                progress = (completed_frames / total_frames) * 100
                click.echo(f"Progress: {progress:.0f}% ({completed_frames}/{total_frames} frames)")

    return frames_dir
def detect_hardware_encoder():
    """Detect available hardware encoders and return the best option.

    Each hardware H.264 encoder is probed, in priority order, by asking
    ffmpeg to encode a one-second synthetic test clip to the null muxer.

    Returns:
        ``(ffmpeg_codec_name, human_readable_label)`` of the first encoder
        whose probe succeeds, or the libx264 software encoder if none does.
    """
    hardware_candidates = (
        ('h264_amf', 'AMD AMF'),
        ('h264_nvenc', 'NVIDIA NVENC'),
        ('h264_qsv', 'Intel QuickSync'),
    )
    for codec, label in hardware_candidates:
        probe_cmd = ['ffmpeg', '-hide_banner', '-f', 'lavfi', '-i', 'testsrc=duration=1:size=320x240:rate=1',
                     '-c:v', codec, '-t', '1', '-f', 'null', '-']
        try:
            subprocess.run(probe_cmd, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError:
            # Encoder not usable on this machine; try the next candidate.
            continue
        return codec, label

    # Fallback to software encoding
    return 'libx264', 'Software (libx264)'
def _ffmpeg_filter_quote(path):
    """Quote *path* for use as an option value inside an ffmpeg filtergraph.

    The value is wrapped in single quotes (taken literally by the filtergraph
    parser), while ``:`` and ``\\``, which are still special to the filter's
    own option parser, are escaped. An apostrophe in the path closes the
    quote, is emitted escaped, and reopens it.
    """
    text = str(path)
    if os.sep == '\\':
        # Windows: ffmpeg accepts forward slashes, which need no escaping.
        text = text.replace('\\', '/')
    else:
        text = text.replace('\\', '\\\\')
    text = text.replace(':', '\\:')
    text = text.replace("'", "'\\\\\\''")
    return f"'{text}'"


def burn_subtitles_to_video(input_video, srt_file, output_video):
    """Use ffmpeg to burn subtitles into video.

    The video is re-encoded with the encoder chosen by
    ``detect_hardware_encoder``; the audio stream is copied unchanged.

    Args:
        input_video: Path of the source video.
        srt_file: Path of the SRT file to render onto the picture.
        output_video: Destination path (overwritten if it exists).

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    encoder, encoder_name = detect_hardware_encoder()
    click.echo(f"Using encoder for subtitles: {encoder_name}")

    # Encoder-specific quality settings; anything unlisted gets libx264's.
    quality_args = {
        'h264_amf': ['-quality', 'balanced', '-rc', 'cqp', '-qp_i', '20', '-qp_p', '20'],
        'h264_nvenc': ['-preset', 'medium', '-cq', '20'],
        'h264_qsv': ['-preset', 'medium', '-global_quality', '20'],
    }.get(encoder, ['-preset', 'medium', '-crf', '20'])

    cmd = [
        'ffmpeg', '-i', input_video,
        # The path is quoted because an unescaped ':' or '\' in it (e.g. a
        # Windows drive letter) would be parsed as filter syntax.
        '-vf', f'subtitles={_ffmpeg_filter_quote(srt_file)}',
        '-c:v', encoder, '-c:a', 'copy',
        *quality_args,
        '-y', output_video,
    ]

    try:
        subprocess.run(cmd, check=True, text=True)
    except subprocess.CalledProcessError:
        click.echo("Error burning subtitles")
        raise
    click.echo(f"Successfully created video with subtitles: {output_video}")
def _encoder_quality_args(encoder, level, fast):
    """Return the preset and constant-quality flags for *encoder*.

    ``level`` is the quantiser/quality value; ``fast`` selects the quicker
    preset used for the intermediate plot video.
    """
    level = str(level)
    if encoder == 'h264_amf':
        return ['-quality', 'speed' if fast else 'balanced', '-rc', 'cqp', '-qp_i', level, '-qp_p', level]
    preset = 'fast' if fast else 'medium'
    if encoder == 'h264_nvenc':
        return ['-preset', preset, '-cq', level]
    if encoder == 'h264_qsv':
        return ['-preset', preset, '-global_quality', level]
    return ['-preset', preset, '-crf', level]  # libx264


def burn_dynamic_plot_to_video(input_video, frames_dir, output_video, fps=30, plot_width_percent=100, plot_height_percent=10):
    """Use ffmpeg to overlay dynamic plot frames on video.

    Works in two passes: the PNG frames are first encoded into an
    intermediate ``plot_video.mp4`` next to ``frames_dir``, which is then
    overlaid near the bottom-left corner of the input video. The intermediate
    file is removed afterwards, whether or not either pass succeeded.

    Args:
        input_video: Path of the source video.
        frames_dir: Directory containing ``frame_%06d.png`` images.
        output_video: Destination path (overwritten if it exists).
        fps: Frame rate at which the PNG sequence is read.
        plot_width_percent: Unused here; kept for interface compatibility
            (the frames are already rendered at their final size).
        plot_height_percent: Unused here; kept for interface compatibility.

    Raises:
        subprocess.CalledProcessError: If either ffmpeg pass fails.
    """
    encoder, encoder_name = detect_hardware_encoder()
    click.echo(f"Using encoder: {encoder_name}")

    plot_video = os.path.join(os.path.dirname(frames_dir), 'plot_video.mp4')

    # Pass 1: PNG sequence -> intermediate video. The scale filter rounds both
    # dimensions down to even numbers, as required for yuv420p output.
    frames_cmd = [
        'ffmpeg', '-r', str(fps), '-i', os.path.join(frames_dir, 'frame_%06d.png'),
        '-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
        '-c:v', encoder, '-pix_fmt', 'yuv420p',
        *_encoder_quality_args(encoder, 23, fast=True),
        '-y', plot_video,
    ]

    # Pass 2: overlay 10 px from the left and bottom edges; stop at the
    # shorter of the two inputs.
    overlay_cmd = [
        'ffmpeg', '-i', input_video, '-i', plot_video,
        '-filter_complex', '[0:v][1:v]overlay=10:main_h-overlay_h-10',
        '-c:v', encoder, '-c:a', 'copy', '-shortest',
        *_encoder_quality_args(encoder, 20, fast=False),
        '-y', output_video,
    ]

    try:
        try:
            subprocess.run(frames_cmd, check=True, text=True)
        except subprocess.CalledProcessError:
            click.echo("Error creating plot video")
            raise
        click.echo("Created plot video from frames")

        try:
            subprocess.run(overlay_cmd, check=True, text=True)
        except subprocess.CalledProcessError:
            click.echo("Error overlaying dynamic plot")
            raise
        click.echo(f"Successfully created video with dynamic plot overlay: {output_video}")
    finally:
        # Clean up the intermediate file, including a partial one left by a
        # failed first pass.
        if os.path.exists(plot_video):
            os.remove(plot_video)
| @click.command() | |
| @click.option('--input-tsv', required=True, help='Input TSV file with data') | |
| @click.option('--output-srt', help='Output SRT subtitle file') | |
| @click.option('--input-video', help='Input video file') | |
| @click.option('--output-video', help='Output video file with embedded data') | |
| @click.option('--video-start', help='Video start timestamp (formats: 20250914T131912, 20250914T13:19:12, or 2025-09-14T13:19:12)') | |
| @click.option('--plot-columns', help='Comma-separated column names to plot (e.g., p1,p2,p3)') | |
| @click.option('--plot-window-duration', type=int, default=10, help='Duration of data shown in real-time overlay plot in seconds (default: 10)') | |
| @click.option('--plot-width', type=int, default=100, help='Plot window width as percentage of video width (default: 100)') | |
| @click.option('--plot-height', type=int, default=10, help='Plot window height as percentage of video height (default: 10)') | |
| @click.option('--plot-limit-y', type=float, help='Static Y-axis max value; set fixed range from 0 to this value for the plot') | |
| @click.option('--plot-type', type=click.Choice(['line', 'step']), default='line', show_default=True, help='Plot style: line or step') | |
| @click.option('--ui-scale', type=float, default=1.0, help='Scale UI elements (lines, fonts, ticks) by this factor (e.g., 1.5 for mobile)') | |
| @click.option('--cycle-gap', type=int, default=0, help='Number of consecutive zero-value rows across all selected columns that breaks a single cycle (0 disables cycle-based window)') | |
| @click.option('--integral', default='all', help='Columns to integrate: comma-separated list like --plot-columns or "all" to sum all selected plot columns (default)') | |
| @click.option('--integral-unit', default='', help='Unit label appended to AUC values') | |
| @click.option('--integral-convert', type=click.Choice(['Wh', 'kWh'], case_sensitive=False), help='Convert computed integral (J) to Wh or kWh before display') | |
def main(input_tsv, output_srt, input_video, output_video, video_start, plot_columns, plot_window_duration, plot_width, plot_height, plot_limit_y, plot_type, ui_scale, cycle_gap, integral, integral_unit, integral_convert):
    """Convert TSV data to SRT subtitles or burn into video as captions/plots."""
    # NOTE: Click shows the docstring above as the command's --help text, so
    # the extended documentation lives in comments instead.
    #
    # Flow:
    #   1. Load the TSV and resolve which columns are displayed / integrated.
    #   2. Validate numeric options and parse --video-start.
    #   3. Write the SRT file when --output-srt is given.
    #   4. When --input-video is given, burn either a rolling plot
    #      (--plot-columns present) or the subtitles into --output-video.
    #
    # Parameters mirror the Click options one-to-one:
    #   input_tsv            path of the TSV file to read
    #   output_srt           path of the SRT file to write (optional)
    #   input_video          video to overlay; enables video processing
    #   output_video         destination video; required with input_video
    #   video_start          timestamp string of the first video frame
    #   plot_columns         comma-separated column names; enables plot mode
    #   plot_window_duration, plot_width, plot_height, plot_limit_y,
    #   plot_type, ui_scale, cycle_gap
    #                        forwarded unchanged to create_dynamic_plot_frames
    #   integral             "all" or comma-separated columns to integrate
    #   integral_unit        unit label; defaults to integral_convert if empty
    #   integral_convert     'Wh' / 'kWh' (case-insensitive) scaling of the integral
    #
    # Every fatal problem is reported on stderr and terminates the command
    # with exit status 1, so shell scripts and CI can detect the failure.

    # Validate input file
    if not Path(input_tsv).exists():
        _cli_fail(f"Error: Input TSV file '{input_tsv}' not found")

    # Parse TSV data (top-level boundary: any parser failure becomes a CLI error)
    try:
        df = parse_tsv_data(input_tsv)
    except Exception as e:
        _cli_fail(f"Error parsing TSV file: {e}")
    click.echo(f"Loaded {len(df)} rows from {input_tsv}")

    # Determine data columns to use. The defaults are validated as well, so a
    # TSV without p1/p2/p3 yields a clear error instead of a later failure.
    if plot_columns:
        requested_columns = _cli_split_columns(plot_columns)
    else:
        requested_columns = ['p1', 'p2', 'p3']  # Default columns
    data_columns = _cli_existing_columns(requested_columns, df.columns, "Columns")
    if not data_columns:
        _cli_fail("Error: No valid data columns found")

    # Determine columns to integrate for AUC
    integral_spec = (integral or '').strip()
    if not integral_spec or integral_spec.lower() == 'all':
        integral_columns = list(data_columns)
    else:
        integral_columns = _cli_existing_columns(
            _cli_split_columns(integral_spec), df.columns, "Integral columns")
        if not integral_columns:
            click.echo("Warning: No valid integral columns; defaulting to plotted columns", err=True)
            integral_columns = list(data_columns)

    # Validate UI scale
    if ui_scale <= 0:
        _cli_fail("Error: --ui-scale must be greater than 0")

    # Validate cycle gap
    if cycle_gap < 0:
        _cli_fail("Error: --cycle-gap must be >= 0")

    # Apply integral conversion factor and default unit if needed.
    # The factors divide by 3600 (Wh) or 3,600,000 (kWh), per the option help
    # ("Convert computed integral (J) to Wh or kWh").
    conversion_factors = {'wh': 1.0 / 3600.0, 'kwh': 1.0 / 3600000.0}
    integral_factor = 1.0
    if integral_convert:
        integral_factor = conversion_factors.get(integral_convert.lower(), 1.0)
    if not integral_unit:
        integral_unit = integral_convert or ''

    # Parse video start time if provided
    video_start_dt = None
    if video_start:
        try:
            video_start_dt = parse_isotime_format(video_start)
        except ValueError as e:
            click.echo(f"Error: Invalid video-start format. {e}", err=True)
            _cli_fail("Supported formats: 20250914T161319, 20250914T13:19:12, or 2025-09-14T13:19:12")

    # Check the video arguments up front so nothing is written to disk when
    # the invocation cannot succeed.
    if input_video:
        if not Path(input_video).exists():
            _cli_fail(f"Error: Input video file '{input_video}' not found")
        if not output_video:
            _cli_fail("Error: --output-video required when --input-video is specified")
        if not video_start:
            _cli_fail("Error: --video-start required when processing video")

    # Generate SRT content
    srt_content = create_srt_content(df, data_columns, video_start_dt)

    # Write the SRT whenever it was requested, including alongside a video.
    if output_srt:
        _cli_write_srt(output_srt, srt_content)
        click.echo(f"SRT file created: {output_srt}")

    if not input_video:
        return

    # Video processing; intermediate files live in a self-cleaning temp dir.
    with tempfile.TemporaryDirectory() as temp_dir:
        if plot_columns:
            # Create dynamic plot overlay
            frames_dir = create_dynamic_plot_frames(
                df, data_columns, video_start_dt, temp_dir, input_video,
                window_seconds=plot_window_duration,
                plot_width_percent=plot_width,
                plot_height_percent=plot_height,
                plot_limit_y=plot_limit_y,
                plot_type=plot_type,
                ui_scale=ui_scale,
                cycle_gap=cycle_gap,
                integral_columns=integral_columns,
                integral_unit=integral_unit,
                integral_factor=integral_factor,
            )
            if not frames_dir:
                _cli_fail("Failed to create plot frames")
            burn_dynamic_plot_to_video(
                input_video, frames_dir, output_video,
                plot_width_percent=plot_width, plot_height_percent=plot_height)
        else:
            # Create subtitle overlay (srt_content already filtered by video_start_dt)
            srt_path = os.path.join(temp_dir, 'subtitles.srt')
            _cli_write_srt(srt_path, srt_content)
            burn_subtitles_to_video(input_video, srt_path, output_video)


def _cli_fail(message):
    """Print *message* to stderr and stop the command with exit status 1."""
    click.echo(message, err=True)
    raise SystemExit(1)


def _cli_split_columns(spec):
    """Split a comma-separated column spec into stripped, non-empty names."""
    return [name.strip() for name in spec.split(',') if name.strip()]


def _cli_existing_columns(requested, available, label):
    """Return the names in *requested* found in *available*; warn about the rest."""
    missing = [name for name in requested if name not in available]
    if missing:
        click.echo(f"Warning: {label} not found in data: {missing}", err=True)
    return [name for name in requested if name in available]


def _cli_write_srt(path, content):
    """Write SRT text to *path* as UTF-8, independent of the platform locale."""
    # NOTE(review): the subtitle burner presumably hands this file to ffmpeg,
    # whose subtitle reader assumes UTF-8 unless told otherwise - confirm.
    with open(path, 'w', encoding='utf-8') as f:
        f.write(content)
# Script entry point: run the Click command only when this file is executed
# directly, so importing the module has no side effects.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment