Skip to content

Instantly share code, notes, and snippets.

@stephendolan
Last active February 17, 2026 22:06
Show Gist options
  • Select an option

  • Save stephendolan/8fa6ec68fc34abc90263f41c826afafa to your computer and use it in GitHub Desktop.

Select an option

Save stephendolan/8fa6ec68fc34abc90263f41c826afafa to your computer and use it in GitHub Desktop.
transcribe-call-ended
#!/usr/bin/env python3
"""
Tuple Trigger: Transcribe Call
Fires on call-recording-complete. Reads WAV files and Events.txt from the
artifacts directory, transcribes audio with whisper-cpp, and writes Summary.md.
Requires:
1. brew install whisper-cpp
2. Download a GGML model: curl -L -o ~/.local/share/whisper-cpp/models/ggml-large-v3.bin \
https://huggingface.co/ggerganov/whisper.cpp/resolve/main/ggml-large-v3.bin
Enable recording on staging:
defaults write app.tuple.staging callRecording true
Environment variables (from Tuple):
TUPLE_TRIGGER_CALL_ARTIFACTS_DIRECTORY - Path to the call artifacts directory
Optional environment variables:
WHISPER_MODEL - Path to a GGML model file (overrides default)
TUPLE_CALL_RECORDING_TRANSCRIPTION_WEBHOOK_URL - POST the summary here after writing (works with Slack incoming webhooks)
"""
import json
import os
import re
import subprocess
import sys
import tempfile
import urllib.request
from datetime import datetime, timedelta
from pathlib import Path
# --- Configuration ---
# Default GGML model location; can be overridden via the WHISPER_MODEL env var.
DEFAULT_MODEL = os.path.expanduser(
    "~/.local/share/whisper-cpp/models/ggml-large-v3.bin"
)
# Name of the whisper-cpp executable expected to be on PATH.
WHISPER_CLI = "whisper-cli"
# Skip WAV files smaller than this. At 48kHz mono Int16, 50KB ≈ 0.5 seconds.
# Shorter clips produce whisper hallucinations ("Thank you", etc).
MIN_WAV_BYTES = 50_000
# --- Parsing ---
def parse_wav_filename(filename):
    """Pull the user ID and recording-start timestamp out of a WAV filename.

    Expected format: User{id}@{yyyy-MM-dd_HH.mm.ss.SSS}.wav

    Returns a (user_id, datetime) pair, or (None, None) when the name does
    not match the expected pattern.
    """
    pattern = r"User(\d+)@(\d{4}-\d{2}-\d{2}_\d{2}\.\d{2}\.\d{2}\.\d{3})"
    m = re.match(pattern, filename)
    if m is None:
        return None, None
    uid, stamp = m.group(1), m.group(2)
    started_at = datetime.strptime(stamp, "%Y-%m-%d_%H.%M.%S.%f")
    return uid, started_at
def parse_event_line(line):
    """Split one Events.txt line into (timestamp, message).

    Lines look like "[yyyy/MM/dd HH:mm:ss.SSS] message". Returns
    (None, None) for lines that do not match that shape.
    """
    m = re.match(
        r"\[(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] (.+)", line.strip()
    )
    if m is None:
        return None, None
    stamp, message = m.groups()
    return datetime.strptime(stamp, "%Y/%m/%d %H:%M:%S.%f"), message
# --- Data Loading ---
def load_events(events_path):
    """Read Events.txt and return (events, user_names).

    events is a list of (datetime, event_type, data) tuples in file order;
    user_names maps user IDs to display names collected from join/presence
    events (a "left" event alone does not register a name).
    """
    prefix_to_type = {
        "This user is:": "self_identified",
        "Peer already on call:": "peer_present",
        "Peer joined call:": "peer_joined",
        "Peer left call:": "peer_left",
    }
    events = []
    user_names = {}
    with open(events_path) as handle:
        for raw_line in handle:
            dt, message = parse_event_line(raw_line)
            if dt is None:
                continue
            if "Call joined" in message:
                id_match = re.search(r"id = (.+)", message)
                call_id = id_match.group(1) if id_match else "unknown"
                events.append((dt, "call_start", {"call_id": call_id}))
                continue
            if "Call ended" in message:
                events.append((dt, "call_end", {}))
                continue
            user_match = re.search(r'UserID (\d+), "([^"]+)" <([^>]+)>', message)
            if user_match is None:
                continue
            uid, name, email = user_match.groups()
            for prefix, event_type in prefix_to_type.items():
                if prefix not in message:
                    continue
                # Departures shouldn't (re)register a participant name.
                if event_type != "peer_left":
                    user_names[uid] = name
                events.append(
                    (dt, event_type, {"user_id": uid, "name": name, "email": email})
                )
                break
    return events, user_names
def transcribe_wav(wav_path, model_path, output_dir):
    """Transcribe one WAV file with whisper-cli.

    Writes JSON output to output_dir/<wav stem>.json and returns that path,
    or None when whisper-cli fails or produced no output file.
    """
    output_base = os.path.join(output_dir, Path(wav_path).stem)
    cmd = [
        WHISPER_CLI,
        "-m", model_path,
        "-l", "en",
        "-np",              # suppress progress output
        "-oj",              # emit JSON transcription
        "-of", output_base,
        wav_path,
    ]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        print(
            f" Warning: whisper-cli failed on {Path(wav_path).name}: {proc.stderr.strip()}",
            file=sys.stderr,
        )
        return None
    json_path = output_base + ".json"
    if os.path.exists(json_path):
        return json_path
    return None
def load_transcription(json_path):
    """Turn one whisper JSON file into absolute-timestamped speech segments.

    Each segment is (datetime, "speech", {"user_id", "text"}). Returns an
    empty list when the filename carries no user/timestamp metadata.
    """
    user_id, file_start_dt = parse_wav_filename(Path(json_path).stem)
    if user_id is None:
        return []
    with open(json_path) as handle:
        payload = json.load(handle)
    segments = []
    for piece in payload.get("transcription", []):
        spoken = piece["text"].strip()
        if not spoken:
            continue
        # Whisper offsets are milliseconds from the start of this file's audio.
        when = file_start_dt + timedelta(milliseconds=piece["offsets"]["from"])
        segments.append((when, "speech", {"user_id": user_id, "text": spoken}))
    return segments
# --- Markdown Generation ---
def first_name(full_name):
    """Return the first whitespace-separated word of *full_name*.

    Falls back to the input unchanged when it contains no words (empty or
    all-whitespace string), where the original raised IndexError.
    """
    parts = full_name.split()
    return parts[0] if parts else full_name
def format_time(dt):
    """Format a datetime as HH:MM:SS for timeline display."""
    return f"{dt:%H:%M:%S}"
def generate_markdown(events, transcriptions, user_names):
    """Render the merged event + speech stream as a Markdown call summary.

    events and transcriptions are lists of (datetime, type, data) tuples;
    they are merged chronologically into a Participants section and a
    Timeline section. Returns the full Markdown text, newline-terminated.
    """
    # Merge both streams and sort by timestamp so speech interleaves with events.
    all_entries = sorted(events + transcriptions, key=lambda x: x[0])
    lines = []
    if all_entries:
        call_date = all_entries[0][0].strftime("%Y-%m-%d")
        call_time = all_entries[0][0].strftime("%H:%M")
    else:
        call_date = call_time = "Unknown"
    lines.append(f"# Call Summary - {call_date} {call_time}")
    lines.append("")
    if user_names:
        lines.append("## Participants")
        lines.append("")
        # Sorted by display name for a stable participant list.
        for uid, name in sorted(user_names.items(), key=lambda x: x[1]):
            lines.append(f"- {name}")
        lines.append("")
    lines.append("## Timeline")
    lines.append("")
    # Users flagged "already on call" also emit a later peer_joined event;
    # remember their IDs so the duplicate join line can be suppressed below.
    peers_at_start = {
        data["user_id"]
        for _, event_type, data in all_entries
        if event_type == "peer_present"
    }
    last_speaker = None       # first name of the previous speech line's speaker
    prev_was_event = False    # True when the last emitted line was an event marker
    for dt, event_type, data in all_entries:
        time_str = format_time(dt)
        if event_type == "peer_joined" and data["user_id"] in peers_at_start:
            continue  # duplicate of the earlier "was already on the call" entry
        if event_type in ("call_start", "call_end", "self_identified",
                          "peer_present", "peer_joined", "peer_left"):
            # Blank line separates a run of speech from the next event marker.
            if not prev_was_event and lines and lines[-1] != "":
                lines.append("")
            label = {
                "call_start": "Call started",
                "call_end": "Call ended",
                "self_identified": f"{data.get('name', '?')} joined",
                "peer_present": f"{data.get('name', '?')} was already on the call",
                "peer_joined": f"{data.get('name', '?')} joined",
                "peer_left": f"{data.get('name', '?')} left",
            }[event_type]
            lines.append(f"*{time_str} -- {label}*")
            prev_was_event = True
            # An event breaks any running same-speaker merge.
            last_speaker = None
        elif event_type == "speech":
            if prev_was_event:
                lines.append("")
            speaker_name = user_names.get(data["user_id"], f"User{data['user_id']}")
            speaker_first = first_name(speaker_name)
            text = data["text"]
            # Consecutive segments from the same speaker are appended to one line.
            # NOTE(review): the merge keys on first name only, so two
            # participants sharing a first name would be merged — confirm.
            if speaker_first == last_speaker:
                lines[-1] += " " + text
            else:
                lines.append(f"**{speaker_first}** [{time_str}]: {text}")
            last_speaker = speaker_first
            prev_was_event = False
    return "\n".join(lines) + "\n"
# --- Main ---
def _post_summary_to_webhook(webhook_url, markdown):
    """Best-effort POST of the summary JSON to a Slack-compatible webhook.

    Failures are reported to stderr but never abort the trigger — the
    summary has already been written to disk by the time this runs.
    """
    payload = json.dumps({"text": markdown}).encode()
    req = urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    try:
        # Use a context manager so the response is closed (the original leaked
        # it), and a timeout so a dead webhook can't hang the trigger forever.
        with urllib.request.urlopen(req, timeout=30):
            pass
        print("Posted summary to webhook")
    except Exception as e:
        print(f"Warning: webhook POST failed: {e}", file=sys.stderr)


def main():
    """Entry point: validate the environment, transcribe, write Summary.md.

    Exits 1 on missing configuration/artifacts, 0 when there is simply
    nothing to transcribe.
    """
    recording_dir = os.environ.get("TUPLE_TRIGGER_CALL_ARTIFACTS_DIRECTORY")
    if not recording_dir:
        print("Error: TUPLE_TRIGGER_CALL_ARTIFACTS_DIRECTORY not set", file=sys.stderr)
        sys.exit(1)
    if not os.path.isdir(recording_dir):
        print(f"Error: Recording directory not found: {recording_dir}", file=sys.stderr)
        sys.exit(1)
    model_path = os.environ.get("WHISPER_MODEL", DEFAULT_MODEL)
    if not os.path.exists(model_path):
        print(f"Error: Whisper model not found: {model_path}", file=sys.stderr)
        print("Install with: brew install whisper-cpp", file=sys.stderr)
        print(f"Download model to: {model_path}", file=sys.stderr)
        sys.exit(1)
    events_path = os.path.join(recording_dir, "Events.txt")
    if not os.path.exists(events_path):
        print(f"Error: Events.txt not found in {recording_dir}", file=sys.stderr)
        sys.exit(1)
    # Discover WAV files, filtering out clips too short to contain speech
    all_wavs = sorted(Path(recording_dir).glob("*.wav"))
    wav_files = [w for w in all_wavs if w.stat().st_size >= MIN_WAV_BYTES]
    skipped = len(all_wavs) - len(wav_files)
    if not wav_files:
        print("No WAV files found, nothing to transcribe.", file=sys.stderr)
        sys.exit(0)
    print(f"Transcribing {len(wav_files)} audio segments ({skipped} skipped < {MIN_WAV_BYTES // 1000}KB)...")
    # Transcribe into a temp directory so whisper's JSON output never
    # litters the artifacts directory.
    with tempfile.TemporaryDirectory(prefix="tuple-transcribe-") as tmp_dir:
        all_segments = []
        for i, wav in enumerate(wav_files, 1):
            print(f" [{i}/{len(wav_files)}] {wav.name}")
            json_path = transcribe_wav(str(wav), model_path, tmp_dir)
            if json_path:
                all_segments.extend(load_transcription(json_path))
    # Load events
    events, user_names = load_events(events_path)
    # Generate markdown
    markdown = generate_markdown(events, all_segments, user_names)
    output_path = os.path.join(recording_dir, "Summary.md")
    with open(output_path, "w") as f:
        f.write(markdown)
    print(f"Written to {output_path}")
    print(f" {len(events)} events, {len(all_segments)} speech segments")
    # POST to webhook if configured
    webhook_url = os.environ.get("TUPLE_CALL_RECORDING_TRANSCRIPTION_WEBHOOK_URL")
    if webhook_url:
        _post_summary_to_webhook(webhook_url, markdown)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment