Skip to content

Instantly share code, notes, and snippets.

@provencher
Last active January 29, 2026 09:55
Show Gist options
  • Select an option

  • Save provencher/5f76fe0472fbe8be01ff40f8337dd626 to your computer and use it in GitHub Desktop.

Select an option

Save provencher/5f76fe0472fbe8be01ff40f8337dd626 to your computer and use it in GitHub Desktop.
Sentry Issue triage + RP-CLI planning
#!/usr/bin/env python3
"""
Sentry Triage Script - Template
================================
A script for triaging Sentry issues and optionally generating fix plans
using AI-assisted code analysis.
Usage: ./sentry-triage-template.py [version...] [--output-dir DIR] [--plan] [--plan-only]
Examples:
./sentry-triage-template.py 1.6.4 # Fetch stacktraces only
./sentry-triage-template.py 1.6.4 --plan # Fetch + generate fix plans
./sentry-triage-template.py 1.6.4 --plan-only # Generate plans for existing stacktraces
./sentry-triage-template.py 1.6.4 --output-dir ./out
Setup:
1. Install sentry-cli: brew install getsentry/tools/sentry-cli
2. Authenticate: sentry-cli login
3. Update the Configuration section below with your org/project details
"""
import argparse
import concurrent.futures
import json
import os
import re
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Optional
# =============================================================================
# CONFIGURATION - Update these for your project
# =============================================================================
# Your Sentry organization slug (from your Sentry URL: https://YOUR_ORG.sentry.io)
ORG = "your-sentry-org"
# Your Sentry project slug
PROJECT = "your-project"
# Your app's release identifier prefix (e.g., bundle ID for iOS/macOS apps)
# Examples: "com.company.appname@", "your-app@"
RELEASE_PREFIX = "com.example.yourapp@"
# Delay between parallel plan invocations (to avoid rate limits).
# Seconds; each parallel worker sleeps (its index * this value) before starting.
PLAN_DELAY_SECONDS = 3
@dataclass
class Issue:
    """One Sentry issue row, as parsed from `sentry-cli issues list` table output."""
    id: str         # numeric issue ID (kept as the string from the CLI table)
    short_id: str   # human-readable ID, e.g. "PROJECT-123"
    title: str      # issue title (parser truncates to 100 chars)
    level: str      # severity, e.g. "fatal" / "error" / "warning"
    status: str     # status column from the CLI table
    count: int = 0        # total event count; filled in lazily via the API
    user_count: int = 0   # distinct affected users; filled in lazily via the API
    release: str = ""     # full release identifier this issue was fetched for
def get_token() -> str:
    """Read the Sentry auth token from ~/.sentryclirc.

    Exits the process with a message on stderr if the file or the token
    entry is missing.
    """
    config_path = Path.home() / ".sentryclirc"
    if not config_path.exists():
        print("Error: ~/.sentryclirc not found. Run: sentry-cli login", file=sys.stderr)
        sys.exit(1)
    with open(config_path) as f:
        for line in f:
            # Tolerate INI-style whitespace ("token = xxx") as well as the
            # bare "token=xxx" form; the original startswith("token=") check
            # missed the spaced variant.
            key, sep, value = line.partition("=")
            if sep and key.strip() == "token":
                return value.strip()
    print("Error: No token found in ~/.sentryclirc", file=sys.stderr)
    sys.exit(1)
def api_get(token: str, endpoint: str) -> dict:
    """GET a Sentry API endpoint and return the parsed JSON body.

    Any failure (network, HTTP status, JSON decode) is reported to stderr
    and an empty dict is returned so callers can degrade gracefully.
    """
    import urllib.request

    request = urllib.request.Request(
        f"https://sentry.io/api/0/{endpoint}",
        headers={"Authorization": f"Bearer {token}"},
    )
    try:
        with urllib.request.urlopen(request) as response:
            return json.loads(response.read().decode())
    except Exception as err:
        print(f"API error for {endpoint}: {err}", file=sys.stderr)
        return {}
def get_releases(token: str) -> dict[str, str]:
    """Get mapping of version -> full release identifier.

    Parses the table printed by `sentry-cli releases list`. The release
    pattern is derived from RELEASE_PREFIX so the template works for any
    app; the original hard-coded "com.pvncher.repoprompt@" and silently
    ignored the configured prefix.

    Note: `token` is unused (sentry-cli uses its own stored auth) but kept
    for interface compatibility.
    """
    result = subprocess.run(
        ["sentry-cli", "releases", "list", "--org", ORG, "--project", PROJECT],
        capture_output=True, text=True
    )
    # Matches e.g. "| (unreleased) | com.example.yourapp@1.6.4+258 | ..."
    pattern = re.compile(re.escape(RELEASE_PREFIX) + r'([\d.]+)\+(\d+)')
    releases: dict[str, str] = {}
    for line in result.stdout.split("\n"):
        match = pattern.search(line)
        if match:
            version = match.group(1)
            releases[version] = f"{RELEASE_PREFIX}{version}+{match.group(2)}"
    return releases
def get_issues_for_release(release_id: str) -> list[Issue]:
    """List every issue Sentry reports for one release.

    Parses the table printed by `sentry-cli issues list`, whose data rows
    look like: | ID | SHORT_ID | TITLE | ... | STATUS | LEVEL |
    """
    proc = subprocess.run(
        ["sentry-cli", "issues", "list", "--org", ORG, "--project", PROJECT,
         "--all", "--query", f"release:{release_id}"],
        capture_output=True, text=True
    )
    parsed: list[Issue] = []
    for row in proc.stdout.split("\n"):
        cells = [cell.strip() for cell in row.split("|")]
        # Real data rows carry a numeric issue ID in the first table column;
        # header/separator lines do not.
        if len(cells) < 7 or not cells[1].isdigit():
            continue
        parsed.append(Issue(
            id=cells[1],
            short_id=cells[2],
            title=cells[3][:100],
            status=cells[5],
            level=cells[6],
            release=release_id,
        ))
    return parsed
def get_issue_stats(token: str, issue_id: str) -> tuple[int, int]:
    """Get (event count, user count) for an issue.

    Coerces both values to int as the annotation promises: the Sentry API
    reports "count" as a JSON string, and downstream code sorts/compares
    these numerically (e.g. `i.count > 10`).
    """
    data = api_get(token, f"organizations/{ORG}/issues/{issue_id}/")
    return int(data.get("count", 0) or 0), int(data.get("userCount", 0) or 0)
def format_crash_stacktrace(data: dict, issue: Issue) -> str:
    """Format crash stacktrace as markdown.

    Args:
        data: raw Sentry event payload (from the events/latest endpoint).
        issue: the triaged issue; supplies counts for the report header.

    Returns:
        Markdown text for a *-stacktrace.md file.
    """
    lines = [
        f"# {issue.short_id} - Crash",
        f"# Events: {issue.count} | Users: {issue.user_count}",
        "",
    ]
    # Get exception data
    entries = data.get("entries", [])
    exc_entry = next((e for e in entries if e.get("type") == "exception"), None)
    if not exc_entry:
        lines.append("No exception data found")
        return "\n".join(lines)
    # Guard against a present-but-empty "values" list: the original used
    # .get("values", [{}])[0], which raises IndexError when the key exists
    # with an empty list.
    values = exc_entry.get("data", {}).get("values") or [{}]
    exc = values[0]
    lines.extend([
        "## Exception",
        f"Type: {exc.get('type', '?')}",
        f"Value: {exc.get('value', '?')}",
    ])
    # Mechanism info (how the crash was detected, plus any POSIX signal)
    mech = exc.get("mechanism", {})
    if mech:
        lines.append(f"Mechanism: {mech.get('type', '?')}")
        meta = mech.get("meta", {})
        if meta.get("signal"):
            sig = meta["signal"]
            lines.append(f"Signal: {sig.get('name')} ({sig.get('number')})")
    lines.extend(["", "## Full Stacktrace", ""])
    # Frames are printed reversed so the most recent call appears first
    # (presumably Sentry stores them innermost-last — confirm against API).
    frames = exc.get("stacktrace", {}).get("frames", [])
    for i, frame in enumerate(reversed(frames)):
        pkg = frame.get("package", "").split("/")[-1] if frame.get("package") else "?"
        fn = frame.get("function", "?")
        inapp = "[APP]" if frame.get("inApp") else " "
        fname = frame.get("filename") or ""
        line_no = frame.get("lineNo") or ""
        loc = f" ({fname}:{line_no})" if fname else ""
        addr = frame.get("instructionAddr", "")
        lines.append(f"{i:3d} {inapp} {pkg}")
        lines.append(f" {fn}{loc}")
        if addr:
            lines.append(f" addr: {addr}")
        # Blank line between frames for readability.
        lines.append("")
    return "\n".join(lines)
def format_hang_stacktrace(data: dict, issue: Issue) -> str:
    """Format hang stacktrace (all threads) as markdown.

    Unlike a crash there is no single exception; the report lists every
    thread with its stack so the blocked thread can be spotted.

    Args:
        data: raw Sentry event payload (from the events/latest endpoint).
        issue: the triaged issue; supplies counts for the report header.

    Returns:
        Markdown text for a *-stacktrace.md file.
    """
    lines = [
        f"# {issue.short_id} - App Hang",
        f"# Events: {issue.count} | Users: {issue.user_count}",
        "",
        "## Event Info",
        f"Event ID: {data.get('eventID', '?')}",
        f"Timestamp: {data.get('dateCreated', '?')}",
    ]
    # Tags arrive as a list of {"key": ..., "value": ...} pairs; flatten to a dict.
    tags = {t["key"]: t["value"] for t in data.get("tags", [])}
    lines.append(f"Release: {tags.get('release', '?')}")
    lines.append(f"OS: {tags.get('os', '?')}")
    lines.append("")
    # Find threads entry
    entries = data.get("entries", [])
    threads_entry = next((e for e in entries if e.get("type") == "threads"), None)
    if not threads_entry:
        lines.append("No thread data found")
        return "\n".join(lines)
    threads = threads_entry.get("data", {}).get("values", [])
    lines.append(f"## Threads ({len(threads)} total)")
    lines.append("")
    for t in threads:
        crashed = t.get("crashed")
        current = t.get("current")
        marker = ""
        if crashed:
            marker = " [CRASHED]"
        elif current:
            # For hangs, the "current" thread is the one that was blocked.
            marker = " [CURRENT/HUNG]"
        lines.append(f"### Thread {t.get('id')}{marker}")
        lines.append(f"Name: {t.get('name', '(unnamed)')}")
        lines.append("")
        st = t.get("stacktrace")
        if not st:
            lines.append("(no stacktrace)")
            lines.append("")
            continue
        frames = st.get("frames", [])
        if not frames:
            lines.append("(no frames)")
            lines.append("")
            continue
        # Frames are printed reversed so the most recent call appears first
        # (presumably Sentry stores them innermost-last — confirm against API).
        for i, f in enumerate(reversed(frames)):
            pkg = f.get("package", "").split("/")[-1] if f.get("package") else "?"
            fn = f.get("function", "?")
            inapp = "[APP]" if f.get("inApp") else " "
            file = f.get("filename") or ""
            line_no = f.get("lineNo") or ""
            loc = f" ({file}:{line_no})" if file else ""
            lines.append(f"{i:3d} {inapp} {pkg}: {fn}{loc}")
        lines.append("")
    return "\n".join(lines)
def generate_summary(all_issues: dict[str, list[Issue]], output_dir: Path) -> str:
    """Generate the summary triage report as markdown.

    Args:
        all_issues: mapping of version string -> issues for that version.
        output_dir: unused here; kept for interface compatibility.

    Returns:
        Markdown text for the TRIAGE_*.md summary file.
    """
    lines = [
        "# Sentry Triage Report",
        "",
        # date.today() replaces the original shell-out to `date`, which was
        # non-portable (no `date +%Y-%m-%d` on Windows) and spawned a process
        # just to format a date.
        f"**Generated**: {date.today():%Y-%m-%d}",
        "",
        "## Summary by Version",
        "",
        "| Version | Total | Crashes | Hangs | Other |",
        "|---------|-------|---------|-------|-------|",
    ]
    for version, issues in sorted(all_issues.items()):
        crashes = len([i for i in issues if i.level == "fatal"])
        hangs = len([i for i in issues if "hanging" in i.title.lower()])
        other = len(issues) - crashes - hangs
        lines.append(f"| {version} | {len(issues)} | {crashes} | {hangs} | {other} |")
    lines.extend(["", "## Crashes (Fatal)", ""])
    crashes = []
    for version, issues in all_issues.items():
        for i in issues:
            if i.level == "fatal":
                crashes.append((version, i))
    if crashes:
        lines.append("| Issue | Version | Events | Users | Title |")
        lines.append("|-------|---------|--------|-------|-------|")
        for version, i in crashes:
            lines.append(f"| {i.short_id} | {version} | {i.count} | {i.user_count} | {i.title[:50]} |")
    else:
        lines.append("No crashes found.")
    lines.extend(["", "## High-Impact Hangs", ""])
    hangs = []
    for version, issues in all_issues.items():
        for i in issues:
            if "hanging" in i.title.lower() and i.count > 10:
                hangs.append((version, i))
    hangs.sort(key=lambda x: x[1].count, reverse=True)
    if hangs:
        lines.append("| Issue | Version | Events | Users | Title |")
        lines.append("|-------|---------|--------|-------|-------|")
        for version, i in hangs[:20]:  # Top 20
            lines.append(f"| {i.short_id} | {version} | {i.count} | {i.user_count} | {i.title[:50]} |")
    else:
        lines.append("No high-impact hangs found.")
    lines.extend([
        "",
        "## Stacktrace Files",
        "",
        "See individual `REPOPROMPT-*-stacktrace.md` files for full details.",
        "",
        "## Sentry Links",
        "",
    ])
    # De-duplicate: the same issue can appear under several versions, and the
    # original emitted one link line per occurrence.
    linked: set[str] = set()
    for version, issues in all_issues.items():
        for i in issues:
            if (i.level == "fatal" or i.count > 100) and i.id not in linked:
                linked.add(i.id)
                lines.append(f"- [{i.short_id}](https://{ORG}.sentry.io/issues/{i.id}/)")
    return "\n".join(lines)
# ============================================================================
# Plan Generation via rp-cli
# ============================================================================
def get_rp_window_id() -> Optional[str]:
    """
    Resolve the RepoPrompt window ID.

    Returns "" when the app runs in single-window mode (no -w flag needed)
    and None when RepoPrompt is not reachable at all.
    """
    proc = subprocess.run(
        ["rp-cli", "-e", "windows"],
        capture_output=True, text=True
    )
    # rp-cli reports single-window mode on stdout regardless of exit status.
    single_window = "single-window mode" in proc.stdout
    if proc.returncode != 0:
        if single_window:
            return ""  # Empty string = no -w needed
        print(f"Error getting windows: {proc.stderr}", file=sys.stderr)
        return None
    if single_window:
        return ""  # Empty string = no -w needed
    # Multi-window mode: prefer the JSON shape, fall back to text parsing.
    try:
        windows = json.loads(proc.stdout).get("windows", [])
    except json.JSONDecodeError:
        # Text output looks like "Window 1: ..."
        hit = re.search(r'Window (\d+):', proc.stdout)
        return hit.group(1) if hit else ""
    if windows:
        return str(windows[0].get("id"))
    return ""  # Default to no -w
def generate_plan_for_stacktrace(stacktrace_path: Path, window_id: str, delay_index: int = 0) -> bool:
    """
    Generate a fix plan for a stacktrace using rp-cli context_builder.

    Passes the stacktrace file path so context_builder can read and analyze
    it. rp-cli's stdout is written straight into the plan file while stderr
    stays attached to the terminal to show progress.

    Args:
        stacktrace_path: the *-stacktrace.md file to analyze.
        window_id: RepoPrompt window ID, or "" in single-window mode.
        delay_index: stagger multiplier for parallel execution.

    Returns True on success.
    """
    # Stagger start times when running from the thread pool to avoid
    # hammering rp-cli / rate limits.
    if delay_index > 0:
        time.sleep(delay_index * PLAN_DELAY_SECONDS)
    # Extract issue ID from filename
    issue_id = stacktrace_path.stem.replace("-stacktrace", "")
    # Output path for the plan
    plan_path = stacktrace_path.parent / stacktrace_path.name.replace("-stacktrace.md", "-plan.md")
    # Get absolute path for the stacktrace
    abs_stacktrace_path = stacktrace_path.resolve()
    # Peek at the header to classify the issue (crash vs hang).
    with open(stacktrace_path) as fh:
        header = fh.read(500)
    is_crash = "Crash" in header or "Exception" in header
    issue_type = "crash" if is_crash else "app hang"
    # Build the prompt - reference the file path so context_builder can read it
    prompt = f"""Analyze this {issue_type} from Sentry and create a fix plan.
<task>
Create a detailed implementation plan to fix {issue_type} {issue_id}.
Read and analyze the stacktrace at: {abs_stacktrace_path}
</task>
<context>
The stacktrace file contains the full crash/hang details from Sentry including:
- Exception type and message (for crashes)
- Thread states (for hangs)
- Full symbolicated stacktrace with file:line references
</context>
<guidelines>
- Read the stacktrace file first to understand the issue
- Identify the root cause from the stacktrace frames
- Find the relevant source files in the RepoPrompt codebase
- Propose specific code changes to fix the issue
- Consider edge cases and potential regressions
- For crashes: focus on null checks, bounds validation, race conditions
- For hangs: focus on main thread blocking, async/await issues, deadlocks
</guidelines>
"""
    print(f" Generating plan for {issue_id}...", file=sys.stderr)
    try:
        json_args = json.dumps({
            "instructions": prompt,
            "response_type": "plan"
        })
        # Build the argv list directly instead of a shell string: the
        # original single-quoted the JSON inside `sh -c`, which broke (and
        # was shell-injectable) as soon as the prompt or a path contained a
        # quote. stdout is redirected to the plan file; stderr still shows
        # progress on the terminal.
        cmd = ["rp-cli"]
        if window_id:
            cmd += ["-w", window_id]
        cmd += ["-c", "context_builder", "-j", json_args]
        with open(plan_path, "w") as out:
            result = subprocess.run(
                cmd,
                stdout=out,
                text=True,
                timeout=2700  # 45 minute timeout
            )
        if result.returncode != 0:
            # The original printed result.stderr[:200], but stderr was never
            # captured (it is None), so every failure raised TypeError here.
            print(f" Error: rp-cli exited with code {result.returncode}", file=sys.stderr)
            plan_path.unlink(missing_ok=True)
            return False
        # Check if file was created with content
        if plan_path.exists() and plan_path.stat().st_size > 0:
            print(f" Saved {plan_path.name}", file=sys.stderr)
            return True
        print(f" No output generated for {issue_id}", file=sys.stderr)
        plan_path.unlink(missing_ok=True)
        return False
    except subprocess.TimeoutExpired:
        print(f" Timeout generating plan for {issue_id}", file=sys.stderr)
        plan_path.unlink(missing_ok=True)
        return False
def generate_plans_parallel(stacktrace_files: list[Path], window_id: str, max_workers: int = 3) -> int:
    """
    Fan plan generation out across a thread pool with staggered starts.

    Returns the number of plans generated successfully.
    """
    completed = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        # The enumerate index doubles as the stagger multiplier so workers
        # do not all invoke rp-cli at the same instant.
        pending = {
            pool.submit(generate_plan_for_stacktrace, path, window_id, idx): path
            for idx, path in enumerate(stacktrace_files)
        }
        for done in concurrent.futures.as_completed(pending):
            source = pending[done]
            try:
                if done.result():
                    completed += 1
            except Exception as err:
                print(f" Exception for {source.name}: {err}", file=sys.stderr)
    return completed
def find_stacktraces_without_plans(output_dir: Path) -> list[Path]:
    """Find stacktrace files that lack a corresponding -plan.md file."""
    return [
        trace
        for trace in output_dir.glob("*-stacktrace.md")
        if not (trace.parent / trace.name.replace("-stacktrace.md", "-plan.md")).exists()
    ]
def run_plan_generation(output_dir: Path, stacktrace_files: Optional[list[Path]] = None,
                        max_workers: int = 3):
    """Run plan generation for stacktraces.

    Args:
        output_dir: directory scanned for *-stacktrace.md files when an
            explicit list is not supplied.
        stacktrace_files: optional explicit list of stacktrace files.
        max_workers: parallelism forwarded to generate_plans_parallel;
            new backward-compatible parameter (defaults to the previously
            hard-coded value of 3) so callers can honor --plan-workers.
    """
    # Get window ID (empty string = single-window mode, None = not running)
    window_id = get_rp_window_id()
    if window_id is None:
        print("Error: No RepoPrompt window found. Launch the app first.", file=sys.stderr)
        print(" Run: rp-cli --launch-app", file=sys.stderr)
        return
    if window_id:
        print(f"Using RepoPrompt window {window_id}", file=sys.stderr)
    else:
        print("Using RepoPrompt (single-window mode)", file=sys.stderr)
    # Find stacktraces to process
    if stacktrace_files is None:
        stacktrace_files = find_stacktraces_without_plans(output_dir)
    if not stacktrace_files:
        print("No stacktraces need plans.", file=sys.stderr)
        return
    print(f"Generating plans for {len(stacktrace_files)} stacktraces...", file=sys.stderr)
    print(f" (parallel with {PLAN_DELAY_SECONDS}s stagger between starts)", file=sys.stderr)
    # Generate plans in parallel (files are written directly by each worker)
    success_count = generate_plans_parallel(stacktrace_files, window_id, max_workers=max_workers)
    print(f"\nGenerated {success_count}/{len(stacktrace_files)} plans.", file=sys.stderr)
# ============================================================================
# Main
# ============================================================================
def main():
    """CLI entry point.

    Fetches issues for the requested release versions, writes per-issue
    stacktrace markdown files plus a triage summary, and optionally drives
    rp-cli plan generation.
    """
    parser = argparse.ArgumentParser(description="Triage Sentry issues for RepoPrompt")
    parser.add_argument("versions", nargs="*", help="Version(s) to triage (e.g., 1.6.4)")
    parser.add_argument("--output-dir", "-o", default=None, help="Output directory")
    parser.add_argument("--crashes-only", action="store_true", help="Only fetch crashes")
    parser.add_argument("--dry-run", action="store_true", help="List issues without fetching details")
    parser.add_argument("--plan", action="store_true", help="Generate fix plans after fetching stacktraces")
    parser.add_argument("--plan-only", action="store_true", help="Only generate plans for existing stacktraces (no Sentry fetch)")
    # NOTE(review): --plan-workers is parsed but never forwarded to the plan
    # pipeline; wiring it through requires run_plan_generation to accept it.
    parser.add_argument("--plan-workers", type=int, default=3, help="Max parallel plan generations (default: 3)")
    args = parser.parse_args()

    def version_key(v: str) -> list[int]:
        # Numeric ordering so "1.10" sorts after "1.9".
        return [int(x) for x in v.split(".")]

    # Determine output directory
    if args.output_dir:
        output_dir = Path(args.output_dir)
    elif args.versions:
        max_version = max(args.versions, key=version_key)
        output_dir = Path(__file__).parent / max_version
    else:
        # Default to current directory for --plan-only without versions
        output_dir = Path(__file__).parent
    output_dir.mkdir(parents=True, exist_ok=True)
    # Plan-only mode: just generate plans for existing stacktraces
    if args.plan_only:
        print(f"Plan-only mode. Scanning {output_dir} for stacktraces...", file=sys.stderr)
        run_plan_generation(output_dir)
        return
    # Normal mode: need versions
    if not args.versions:
        parser.error("versions required (or use --plan-only)")
    token = get_token()
    # Get release mappings
    print("Fetching releases...", file=sys.stderr)
    releases = get_releases(token)
    # Validate versions
    for v in args.versions:
        if v not in releases:
            print(f"Warning: Version {v} not found in releases. Available: {', '.join(sorted(releases.keys())[-10:])}", file=sys.stderr)
    print(f"Output directory: {output_dir}", file=sys.stderr)
    # Collect all issues
    all_issues: dict[str, list[Issue]] = {}
    for version in args.versions:
        if version not in releases:
            continue
        release_id = releases[version]
        print(f"Fetching issues for {version} ({release_id})...", file=sys.stderr)
        issues = get_issues_for_release(release_id)
        print(f" Found {len(issues)} issues", file=sys.stderr)
        all_issues[version] = issues
    if args.dry_run:
        for version, issues in all_issues.items():
            print(f"\n=== {version} ===")
            for i in issues:
                print(f" {i.short_id} [{i.level}] {i.title[:60]}")
        return
    # Fetch details and save stacktraces
    processed = set()
    saved_stacktraces = []
    for version, issues in all_issues.items():
        for issue in issues:
            # Skip duplicates (same issue in multiple releases)
            if issue.id in processed:
                continue
            processed.add(issue.id)
            # Skip non-crashes if --crashes-only
            if args.crashes_only and issue.level != "fatal":
                continue
            # Skip low-impact hangs
            is_hang = "hanging" in issue.title.lower()
            if is_hang and issue.level != "fatal":
                # Get stats first to decide if worth fetching
                issue.count, issue.user_count = get_issue_stats(token, issue.id)
                if issue.count < 5:
                    continue
            print(f" Fetching {issue.short_id}...", file=sys.stderr)
            # Get event counts if not already fetched
            if issue.count == 0:
                issue.count, issue.user_count = get_issue_stats(token, issue.id)
            # Get latest event
            event_data = api_get(token, f"organizations/{ORG}/issues/{issue.id}/events/latest/")
            if not event_data:
                continue
            # Format stacktrace
            if issue.level == "fatal":
                content = format_crash_stacktrace(event_data, issue)
            else:
                content = format_hang_stacktrace(event_data, issue)
            # Save to file
            filename = f"{issue.short_id}-stacktrace.md"
            filepath = output_dir / filename
            with open(filepath, "w") as f:
                f.write(content)
            saved_stacktraces.append(filepath)
            # The original printed a literal "(unknown)" placeholder here;
            # report the file that was actually written.
            print(f" Saved {filename}", file=sys.stderr)
    # Update issue counts for summary
    for version, issues in all_issues.items():
        for issue in issues:
            if issue.count == 0:
                issue.count, issue.user_count = get_issue_stats(token, issue.id)
    # Generate summary
    print("Generating summary...", file=sys.stderr)
    summary = generate_summary(all_issues, output_dir)
    # Pick range endpoints numerically; the original used plain min()/max(),
    # which orders "1.10" before "1.9" lexicographically.
    if len(args.versions) > 1:
        ordered = sorted(args.versions, key=version_key)
        version_range = f"{ordered[0]}-{ordered[-1]}"
    else:
        version_range = args.versions[0]
    summary_path = output_dir / f"TRIAGE_{version_range}.md"
    with open(summary_path, "w") as f:
        f.write(summary)
    print(f"Saved {summary_path}", file=sys.stderr)
    print(f"\nDone! Files saved to {output_dir}", file=sys.stderr)
    # Generate plans if requested
    if args.plan and saved_stacktraces:
        print("\n" + "="*50, file=sys.stderr)
        print("Starting plan generation...", file=sys.stderr)
        run_plan_generation(output_dir, saved_stacktraces)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment