You are reviewing a Tauri desktop app called "Mission Control" that acts as a frontend for the Claude Code CLI. The app uses Claude as a VM to execute programs written in a custom language called OpenProse.
The key integration points are:
- Tauri backend (lib.rs) spawns Claude CLI processes and manages state
- prose-runtime crate contains VmExecutor, which spawns `claude -p --output-format stream-json` and parses its output
- MCP server provides HTTP endpoints for Claude to call back for user feedback and puppet mode
- Frontend (React/TypeScript) receives events via Tauri's emit system and displays execution state
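For reference while reviewing the stream parsing, here is a minimal sketch of the line-buffering step, assuming the stream-json output is one JSON object per newline-terminated line. `LineBuffer` is a hypothetical helper written for illustration, not the actual VmExecutor code, and the JSON payloads are placeholders. It buffers raw bytes so that a JSON object (or a multi-byte character) split across two reads is only handed to the parser once the full line has arrived.

```rust
/// Accumulates raw stdout chunks and yields only complete, newline-terminated lines.
/// Hypothetical helper for illustration; not part of prose-runtime.
#[derive(Default)]
struct LineBuffer {
    pending: Vec<u8>,
}

impl LineBuffer {
    /// Append a chunk and return every complete, non-empty line it finishes.
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(idx) = self.pending.iter().position(|&b| b == b'\n') {
            // Drain the line including its trailing newline, then drop the newline.
            let raw: Vec<u8> = self.pending.drain(..=idx).collect();
            let line = String::from_utf8_lossy(&raw[..idx]).trim().to_string();
            if !line.is_empty() {
                lines.push(line);
            }
        }
        lines
    }

    /// Call once at EOF: returns a final line that never received its newline, if any.
    fn finish(&mut self) -> Option<String> {
        let raw = std::mem::take(&mut self.pending);
        let line = String::from_utf8_lossy(&raw).trim().to_string();
        if line.is_empty() {
            None
        } else {
            Some(line)
        }
    }
}

fn main() {
    let mut buf = LineBuffer::default();
    // One JSON object split across two reads, then a final line with no newline.
    let mut out = buf.push(b"{\"type\":\"fir");
    out.extend(buf.push(b"st\"}\n{\"type\":\"second\"}"));
    out.extend(buf.finish());
    for line in &out {
        println!("{line}");
    }
}
```

Calling `finish` only after stdout reaches EOF, and before acting on the exit status, is one way to avoid losing a last event when the process exits right after writing it.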
Please do extensive research on:
- Claude Code CLI architecture and best practices (the official Anthropic CLI tool)
- Similar Tauri + CLI bridging patterns in production apps
- Rust async process management patterns
- MCP (Model Context Protocol) server implementations
Then review this codebase for:
- Is the process spawning and stream parsing robust?
- Are there race conditions between stdout parsing and process exit?
- Is the JSON stream parsing (stream-json format) handling edge cases correctly?
- Are there potential deadlocks in the channel/oneshot communication?
- Is the Tauri event emission pattern correct and efficient?
- Are there potential memory leaks from event listeners?
- Is the watch_run polling mechanism efficient? Could it miss events?
- Is the CLI invocation correct? Are we missing important flags?
- Is the MCP server integration following best practices?
- Is the feedback/puppet mode implementation robust?
- Are timeouts appropriate (5min for feedback, 30min for puppet)?
- What happens if Claude crashes mid-execution?
- What happens if the MCP server receives malformed requests?
- Are there cleanup issues (orphaned processes, leaked file handles)?
- Is the stderr capture sufficient for debugging?
- Any concerns with how we spawn processes?
- MCP server security (binding to localhost, request validation)?
- What patterns from similar apps should we adopt?
- Any Rust idioms we're missing?
- Performance optimizations?
- Better error recovery mechanisms?
Please be thorough and specific. Reference similar open source projects where relevant. If you find issues, provide concrete code suggestions for fixes.
use fs2::FileExt;
use prose_checkpoint::git as checkpoints;
use prose_runtime::{ExecutorConfig, Executor, McpServer, VmConfig, VmEvent, VmExecutionHandle, VmExecutor};
use prose_trace::redaction;
use prose_trace::event::TraceEvent;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tauri::Emitter;
use tokio::sync::Mutex;
#[cfg(target_os = "macos")]
use cocoa::base::{id, nil};
#[cfg(target_os = "macos")]
use objc::{class, msg_send, sel, sel_impl};
/// Get the one-off storage directory (~/.mission-control/)
fn oneoff_root() -> PathBuf {
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("/tmp"))
.join(".mission-control")
}
/// Determine the effective workspace root for runs
fn effective_workspace(workspace_root: Option<&str>) -> PathBuf {
match workspace_root {
Some(ws) if !ws.is_empty() => PathBuf::from(ws),
_ => oneoff_root(),
}
}
#[derive(Debug, Clone, Serialize)]
struct RunSummary {
run_id: String,
trace_path: String,
program_path: String,
status: String, // "running" | "completed" | "failed"
}
/// Quick check of trace file for completion status.
/// Note: this reads the whole file into memory, then scans only the last 20 lines.
fn get_run_status(trace_path: &Path) -> String {
if let Ok(content) = std::fs::read_to_string(trace_path) {
// Scan from the end: completion events are normally among the last lines written
for line in content.lines().rev().take(20) {
if line.contains("\"RunCompleted\"") {
return "completed".to_string();
}
if line.contains("\"RunFailed\"") {
return "failed".to_string();
}
}
}
"running".to_string()
}
fn run_dir(workspace_root: &Path, run_id: &str) -> PathBuf {
workspace_root.join(".prose").join("runs").join(run_id)
}
fn trace_path(workspace_root: &Path, run_id: &str) -> PathBuf {
run_dir(workspace_root, run_id).join("trace.jsonl")
}
fn program_path(workspace_root: &Path, run_id: &str) -> PathBuf {
run_dir(workspace_root, run_id).join("program.prose")
}
fn ensure_safe_component(what: &str, s: &str) -> Result<(), String> {
if s.is_empty() {
return Err(format!("{what} must not be empty"));
}
if !s
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
{
return Err(format!("{what} contains invalid characters"));
}
Ok(())
}
fn ensure_safe_session_id(s: &str) -> Result<(), String> {
ensure_safe_component("session_id", s)
}
/// Maximum events to read from a trace file (safety limit against unbounded growth)
const MAX_TRACE_EVENTS: usize = 100_000;
fn read_events(path: &Path) -> anyhow::Result<Vec<TraceEvent>> {
let f = File::open(path)?;
let r = BufReader::new(f);
let mut out = Vec::new();
for line in r.lines() {
if out.len() >= MAX_TRACE_EVENTS {
break; // Safety limit reached
}
let line = line?;
if line.trim().is_empty() {
continue;
}
if let Ok(ev) = serde_json::from_str::<TraceEvent>(&line) {
out.push(ev);
}
}
Ok(out)
}
#[tauri::command]
fn list_runs(workspace_root: String) -> Result<Vec<RunSummary>, String> {
let root = PathBuf::from(&workspace_root);
let dir = root.join(".prose").join("runs");
let mut out = Vec::new();
let entries = std::fs::read_dir(&dir).map_err(|e| format!("read {}: {e}", dir.display()))?;
for ent in entries {
let ent = ent.map_err(|e| e.to_string())?;
if !ent.file_type().map_err(|e| e.to_string())?.is_dir() {
continue;
}
let Some(run_id) = ent.file_name().to_str().map(|s| s.to_string()) else {
continue;
};
let trace = trace_path(&root, &run_id);
if !trace.exists() {
continue;
}
let program = program_path(&root, &run_id);
let status = get_run_status(&trace);
out.push(RunSummary {
run_id,
trace_path: trace.display().to_string(),
program_path: program.display().to_string(),
status,
});
}
out.sort_by(|a, b| b.run_id.cmp(&a.run_id));
Ok(out)
}
#[tauri::command]
fn read_run_events(workspace_root: String, run_id: String) -> Result<Vec<TraceEvent>, String> {
ensure_safe_component("run_id", &run_id)?;
let root = PathBuf::from(&workspace_root);
let path = trace_path(&root, &run_id);
read_events(&path).map_err(|e| format!("read {}: {e}", path.display()))
}
#[tauri::command]
fn read_run_program(workspace_root: String, run_id: String) -> Result<String, String> {
ensure_safe_component("run_id", &run_id)?;
let root = PathBuf::from(&workspace_root);
let path = program_path(&root, &run_id);
std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))
}
#[tauri::command]
fn read_run_binding(workspace_root: String, run_id: String, name: String) -> Result<String, String> {
// Trim once so the validated name is the same value used to build the path
let name = name.trim();
if name.is_empty() {
return Ok(String::new());
}
ensure_safe_component("run_id", &run_id)?;
ensure_safe_component("binding name", name)?;
let root = PathBuf::from(&workspace_root);
let path = run_dir(&root, &run_id).join("bindings").join(format!("{name}.md"));
std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))
}
#[tauri::command]
fn append_intervention(workspace_root: String, run_id: String, note: String) -> Result<(), String> {
ensure_safe_component("run_id", &run_id)?;
let root = PathBuf::from(&workspace_root);
let path = trace_path(&root, &run_id);
let f = File::options()
.create(true)
.append(true)
.open(&path)
.map_err(|e| format!("open {}: {e}", path.display()))?;
let _ = f.lock_exclusive();
let mut w = BufWriter::new(f);
let mut ev = TraceEvent::new(&run_id, "HumanInterventionApplied");
let (note, hits) = redaction::redact_text(note.trim());
ev.put_str("note", &note);
if !hits.is_empty() {
ev.put_json("redaction_hits", serde_json::json!(hits));
}
serde_json::to_writer(&mut w, &ev).map_err(|e| e.to_string())?;
w.write_all(b"\n").map_err(|e| e.to_string())?;
w.flush().map_err(|e| e.to_string())?;
Ok(())
}
#[tauri::command]
fn fork_run(workspace_root: String, run_id: String) -> Result<String, String> {
ensure_safe_component("run_id", &run_id)?;
prose_trace::writer::fork_run(Path::new(&workspace_root), &run_id).map_err(|e| e.to_string())
}
#[tauri::command]
fn blame_session(workspace_root: String, run_id: String, session_id: String) -> Result<String, String> {
ensure_safe_component("run_id", &run_id)?;
ensure_safe_session_id(&session_id)?;
prose_trace::writer::blame_session(Path::new(&workspace_root), &run_id, &session_id)
.map_err(|e| e.to_string())
}
#[tauri::command]
fn checkpoint_list(workspace_root: String) -> Result<Vec<checkpoints::CheckpointInfo>, String> {
checkpoints::list_checkpoints(Path::new(&workspace_root)).map_err(|e| e.to_string())
}
#[tauri::command]
fn checkpoint_create(workspace_root: String, label: Option<String>) -> Result<String, String> {
let label = label.as_deref().filter(|s| !s.trim().is_empty());
checkpoints::create_checkpoint(Path::new(&workspace_root), label).map_err(|e| e.to_string())
}
#[tauri::command]
fn checkpoint_restore(workspace_root: String, checkpoint_id: String) -> Result<(), String> {
checkpoints::restore_checkpoint(Path::new(&workspace_root), &checkpoint_id).map_err(|e| e.to_string())
}
#[tauri::command]
fn checkpoint_diff(workspace_root: String, checkpoint_id: String) -> Result<String, String> {
checkpoints::diff_checkpoint(Path::new(&workspace_root), &checkpoint_id).map_err(|e| e.to_string())
}
#[derive(Default)]
struct WatchState {
stop: Option<Arc<AtomicBool>>,
task: Option<tauri::async_runtime::JoinHandle<()>>,
run_id: Option<String>,
}
/// State for active VM execution with MCP server for feedback/puppet mode
#[derive(Default)]
struct VmState {
run_id: Option<String>,
mcp_server: Option<McpServer>,
}
#[tauri::command]
async fn watch_run(
app: tauri::AppHandle,
state: tauri::State<'_, Mutex<WatchState>>,
workspace_root: String,
run_id: String,
) -> Result<(), String> {
ensure_safe_component("run_id", &run_id)?;
let mut s = state.lock().await;
if let Some(stop) = s.stop.take() {
stop.store(true, Ordering::SeqCst);
}
if let Some(task) = s.task.take() {
task.abort();
}
let stop = Arc::new(AtomicBool::new(false));
let stop2 = stop.clone();
let app2 = app.clone();
let ws = workspace_root.clone();
let rid = run_id.clone();
s.stop = Some(stop);
s.run_id = Some(run_id);
s.task = Some(tauri::async_runtime::spawn(async move {
let path = trace_path(Path::new(&ws), &rid);
let mut pos = 0u64;
let mut leftover = String::new(); // Keep partial lines between iterations
// Start at end so the initial load via `read_run_events` owns the baseline.
if let Ok(md) = std::fs::metadata(&path) {
pos = md.len();
}
loop {
if stop2.load(Ordering::SeqCst) {
break;
}
let mut f = match File::open(&path) {
Ok(f) => f,
Err(_) => {
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
continue;
}
};
let len = match f.metadata() {
Ok(m) => m.len(),
Err(_) => {
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
continue;
}
};
if len < pos {
pos = 0;
leftover.clear(); // File was truncated, reset leftover
}
if f.seek(SeekFrom::Start(pos)).is_ok() {
let mut buf = std::mem::take(&mut leftover); // Start with leftover from previous read
if f.read_to_string(&mut buf).is_ok() {
pos = f.stream_position().unwrap_or(len);
// Split on newlines, keeping the last fragment if it doesn't end with \n
let ends_with_newline = buf.ends_with('\n');
let lines: Vec<&str> = buf.split('\n').collect();
let (complete_lines, partial) = if ends_with_newline || lines.is_empty() {
(lines.as_slice(), "")
} else {
let (complete, last) = lines.split_at(lines.len() - 1);
(complete, last.first().copied().unwrap_or(""))
};
// Process complete lines
for line in complete_lines {
let line = line.trim();
if line.is_empty() {
continue;
}
if let Ok(ev) = serde_json::from_str::<TraceEvent>(line) {
let _ = app2.emit("trace_event", ev);
}
}
// Keep partial line for next iteration
leftover = partial.to_string();
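} else {
// On a read error (e.g. invalid or partially written UTF-8), `buf` still holds
// the leftover taken above; put it back instead of dropping it.
leftover = buf;
}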
}
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
}
}));
Ok(())
}
#[tauri::command]
async fn watch_stop(state: tauri::State<'_, Mutex<WatchState>>) -> Result<(), String> {
let mut s = state.lock().await;
if let Some(stop) = s.stop.take() {
stop.store(true, Ordering::SeqCst);
}
if let Some(task) = s.task.take() {
task.abort();
}
s.run_id = None;
Ok(())
}
#[tauri::command]
fn save_file(file_path: String, content: String) -> Result<(), String> {
let path = PathBuf::from(&file_path);
// Security: ensure path doesn't escape expected directories
if path.components().any(|c| matches!(c, std::path::Component::ParentDir)) {
return Err("Path traversal not allowed".to_string());
}
std::fs::write(&path, content).map_err(|e| format!("write {}: {e}", path.display()))
}
#[tauri::command]
fn open_file(file_path: String) -> Result<String, String> {
let path = PathBuf::from(&file_path);
std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))
}
#[derive(Serialize)]
struct LintError {
line: usize,
col: usize,
message: String,
}
#[derive(Serialize)]
struct LintResult {
success: bool,
message: Option<String>,
errors: Option<Vec<LintError>>,
}
#[tauri::command]
fn lint_prose(content: String) -> LintResult {
match prose_core::parse_program(&content) {
Ok(program) => {
// Run linter on the parsed program
let lint_errors = prose_core::lint_program(&program);
if lint_errors.is_empty() {
LintResult {
success: true,
message: None,
errors: None,
}
} else {
// Filter to only errors (not warnings) for failure
let error_count = lint_errors
.iter()
.filter(|e| e.level == prose_core::LintLevel::Error)
.count();
LintResult {
success: error_count == 0,
message: if error_count > 0 {
Some(format!("{} lint error(s)", error_count))
} else {
Some(format!("{} warning(s)", lint_errors.len()))
},
errors: Some(
lint_errors
.into_iter()
.map(|e| LintError {
line: e.line,
col: e.col,
message: e.message,
})
.collect(),
),
}
}
}
Err(e) => {
// Parse error - extract line/col if possible
let msg = e.to_string();
let (line, col) = parse_error_location(&msg).unwrap_or((1, 1));
LintResult {
success: false,
message: Some(msg.clone()),
errors: Some(vec![LintError { line, col, message: msg }]),
}
}
}
}
fn parse_error_location(msg: &str) -> Option<(usize, usize)> {
// Parse "line:col: message" format
let parts: Vec<&str> = msg.splitn(3, ':').collect();
if parts.len() >= 2 {
let line = parts[0].trim().parse().ok()?;
let col = parts[1].trim().parse().ok()?;
return Some((line, col));
}
None
}
/// Ensure OpenProse telemetry is disabled in the workspace.
/// This speeds up execution by skipping telemetry notices and analytics POST requests.
fn ensure_telemetry_disabled(workspace: &Path) -> Result<(), String> {
let prose_dir = workspace.join(".prose");
let env_path = prose_dir.join(".env");
// Create .prose directory if needed
if !prose_dir.exists() {
std::fs::create_dir_all(&prose_dir)
.map_err(|e| format!("Failed to create .prose directory: {}", e))?;
}
// Read existing .env or start fresh
let existing = std::fs::read_to_string(&env_path).unwrap_or_default();
// Check if telemetry is already disabled
if existing.contains("OPENPROSE_TELEMETRY=disabled") {
return Ok(());
}
// Update or add the telemetry setting
let new_content = if existing.contains("OPENPROSE_TELEMETRY=") {
// Replace existing setting
existing
.lines()
.map(|line| {
if line.starts_with("OPENPROSE_TELEMETRY=") {
"OPENPROSE_TELEMETRY=disabled"
} else {
line
}
})
.collect::<Vec<_>>()
.join("\n")
} else {
// Add new setting
if existing.is_empty() {
"OPENPROSE_TELEMETRY=disabled".to_string()
} else {
format!("{}\nOPENPROSE_TELEMETRY=disabled", existing.trim_end())
}
};
std::fs::write(&env_path, new_content)
.map_err(|e| format!("Failed to write .prose/.env: {}", e))?;
Ok(())
}
#[derive(Serialize)]
struct RunProseResult {
run_id: String,
success: bool,
message: Option<String>,
}
/// Run a prose program through the executor
#[tauri::command]
async fn run_prose(
app: tauri::AppHandle,
workspace_root: Option<String>,
program_content: String,
model: Option<String>,
skip_permissions: Option<bool>,
) -> Result<RunProseResult, String> {
// Parse the program first
let program = prose_core::parse_program(&program_content)
.map_err(|e| format!("Parse error: {}", e))?;
// Determine effective workspace
let effective_ws = effective_workspace(workspace_root.as_deref());
// Generate run ID
let run_id = format!(
"run-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
);
// Ensure run directory exists
let run_dir = run_dir(&effective_ws, &run_id);
std::fs::create_dir_all(&run_dir)
.map_err(|e| format!("Failed to create run dir: {}", e))?;
// Save the program
let prog_path = program_path(&effective_ws, &run_id);
std::fs::write(&prog_path, &program_content)
.map_err(|e| format!("Failed to write program: {}", e))?;
// Create trace file
let trace_path = trace_path(&effective_ws, &run_id);
let trace_file = File::create(&trace_path)
.map_err(|e| format!("Failed to create trace: {}", e))?;
// Configure executor
let config = ExecutorConfig {
puppet_mode_enabled: false,
workspace_root: effective_ws.clone(),
model: model.unwrap_or_else(|| "sonnet".to_string()),
skip_permissions: skip_permissions.unwrap_or(true), // Default to true for now (no approval UI yet)
mock_execution: false,
};
// Run the executor
let executor = Executor::new(config);
match executor.run(program).await {
Ok(mut handle) => {
let app_clone = app.clone();
let run_id_clone = run_id.clone();
let trace_file = Arc::new(Mutex::new(trace_file));
// Process events
while let Some(event) = handle.events.recv().await {
// Convert to TraceEvent for consistency
let trace_event = runtime_event_to_trace(&run_id_clone, &event);
// Write to trace file
{
let mut f = trace_file.lock().await;
if let Ok(json) = serde_json::to_string(&trace_event) {
let _ = writeln!(f, "{}", json);
}
}
// Emit to frontend (emit TraceEvent, not raw RuntimeEvent)
let _ = app_clone.emit("runtime_event", &trace_event);
}
Ok(RunProseResult {
run_id,
success: true,
message: None,
})
}
Err(e) => Ok(RunProseResult {
run_id,
success: false,
message: Some(e.to_string()),
}),
}
}
/// Convert a RuntimeEvent to a TraceEvent for logging
fn runtime_event_to_trace(run_id: &str, event: &prose_runtime::RuntimeEvent) -> TraceEvent {
let mut trace = TraceEvent::new(run_id, "RuntimeEvent");
trace.put_json("event", serde_json::to_value(event).unwrap_or_default());
trace
}
/// Run a prose program using Claude-as-VM approach
/// This sends the program + prose.md to Claude, which becomes the VM
#[tauri::command]
async fn run_prose_vm(
app: tauri::AppHandle,
vm_state: tauri::State<'_, Arc<Mutex<VmState>>>,
workspace_root: Option<String>,
program_content: String,
model: Option<String>,
skip_permissions: Option<bool>,
enable_mcp: Option<bool>,
resume_id: Option<String>,
run_id: Option<String>,
) -> Result<RunProseResult, String> {
// Determine effective workspace
let effective_ws = effective_workspace(workspace_root.as_deref());
// Ensure telemetry is disabled for faster execution
// This prevents the VM from showing telemetry notices and sending analytics
ensure_telemetry_disabled(&effective_ws)?;
// Configure VM executor
let config = VmConfig {
prose_md_path: effective_ws.join("prose.md"),
workspace_root: effective_ws.clone(),
model: model.unwrap_or_else(|| "sonnet".to_string()),
skip_permissions: skip_permissions.unwrap_or(true), // Default to true for now (no approval UI yet)
resume_id: resume_id.clone(),
enable_mcp: enable_mcp.unwrap_or(true),
};
let executor = VmExecutor::new(config);
// Create event channel
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<VmEvent>(100);
// Start execution (non-blocking - returns immediately)
// If run_id is provided (e.g., from fork), use that existing run directory
let handle = executor
.execute(&program_content, event_tx, run_id)
.await
.map_err(|e| format!("Failed to start VM: {}", e))?;
let run_id = handle.run_id.clone();
// Store MCP server for feedback/puppet mode responses
{
let mut state = vm_state.lock().await;
state.run_id = Some(run_id.clone());
state.mcp_server = handle.mcp_server;
}
// Spawn task to process events and forward to frontend
let app_clone = app.clone();
let effective_ws_clone = effective_ws.clone();
let vm_state_clone = vm_state.inner().clone();
tokio::spawn(async move {
let mut trace_file: Option<File> = None;
let mut saw_completion = false;
while let Some(event) = event_rx.recv().await {
// Convert VmEvent to TraceEvent for consistency
let trace_event = vm_event_to_trace(&event);
// Create trace file on first event (VmStarted)
if let VmEvent::VmStarted { ref run_id, .. } = event {
let path = trace_path(&effective_ws_clone, run_id);
trace_file = File::create(&path).ok();
}
// Write to trace file (flush immediately for real-time streaming)
if let Some(ref mut f) = trace_file {
if let Ok(json) = serde_json::to_string(&trace_event) {
let _ = writeln!(f, "{}", json);
let _ = f.flush(); // Ensure event is written immediately
}
}
// Emit to frontend
let _ = app_clone.emit("vm_event", &event);
let _ = app_clone.emit("runtime_event", &trace_event);
// Clear VM state on completion
if matches!(event, VmEvent::VmCompleted { .. }) {
saw_completion = true;
let mut state = vm_state_clone.lock().await;
state.run_id = None;
state.mcp_server = None;
}
}
// Channel closed without VmCompleted - emit error and cleanup
if !saw_completion {
let mut state = vm_state_clone.lock().await;
if let Some(ref run_id) = state.run_id {
let mut error_trace = TraceEvent::new(run_id, "ErrorOccurred");
error_trace.put_str("message", "VM event channel closed unexpectedly");
error_trace.put_str("error_type", "ChannelClosed");
// Write to trace file
if let Some(ref mut f) = trace_file {
if let Ok(json) = serde_json::to_string(&error_trace) {
let _ = writeln!(f, "{}", json);
let _ = f.flush();
}
}
// Emit to frontend
let _ = app_clone.emit("runtime_event", &error_trace);
}
state.run_id = None;
state.mcp_server = None;
}
});
Ok(RunProseResult {
run_id,
success: true,
message: None,
})
}
/// Submit feedback response to an MCP request (vm_ask_user or vm_suspend)
#[tauri::command]
async fn submit_feedback(
vm_state: tauri::State<'_, Arc<Mutex<VmState>>>,
request_id: String,
response: serde_json::Value,
) -> Result<(), String> {
let state = vm_state.lock().await;
if let Some(ref mcp_server) = state.mcp_server {
mcp_server
.respond(&request_id, response)
.await
.map_err(|e| format!("Failed to submit feedback: {}", e))
} else {
Err("No active VM session".to_string())
}
}
/// Convert a VmEvent to a TraceEvent for logging and UI
fn vm_event_to_trace(event: &VmEvent) -> TraceEvent {
match event {
VmEvent::VmStarted { run_id, program_file, resume_id } => {
let mut trace = TraceEvent::new(run_id, "RunStarted");
trace.put_str("program_file", program_file);
if let Some(rid) = resume_id {
trace.put_str("resume_id", rid);
}
trace
}
VmEvent::Thought { run_id, text, position } => {
let mut trace = TraceEvent::new(run_id, "Thought");
trace.put_str("text", text);
if let Some(pos) = position {
trace.put_str("position", pos);
}
trace
}
VmEvent::ContextAssembled { run_id, session_id, token_estimate, prompt_tokens } => {
let mut trace = TraceEvent::new(run_id, "ContextAssembled");
trace.session_id = Some(session_id.clone());
trace.put_json("token_estimate", serde_json::json!(*token_estimate));
// Add a synthetic "prompt" binding for the Context tab
trace.put_json("included_bindings", serde_json::json!([{
"name": "prompt",
"binding_id": "prompt",
"token_count": *prompt_tokens,
"reason": "required",
"reason_detail": "Session prompt"
}]));
trace
}
VmEvent::SessionSpawned { run_id, session_id, agent, prompt_preview } => {
let mut trace = TraceEvent::new(run_id, "SessionStarted");
trace.session_id = Some(session_id.clone());
if let Some(a) = agent {
trace.put_str("agent_id", a);
}
trace.put_str("prompt_preview", prompt_preview);
trace
}
VmEvent::SessionCompleted { run_id, session_id, output_preview } => {
let mut trace = TraceEvent::new(run_id, "SessionCompleted");
trace.session_id = Some(session_id.clone());
trace.put_str("output_preview", output_preview);
trace
}
VmEvent::SessionFailed { run_id, session_id, error } => {
let mut trace = TraceEvent::new(run_id, "SessionFailed");
trace.session_id = Some(session_id.clone());
trace.put_str("error", error);
trace
}
VmEvent::BindingCreated { run_id, name, path } => {
let mut trace = TraceEvent::new(run_id, "BindingMaterialized");
trace.put_str("name", name);
trace.put_str("path", path);
// Use path as stable binding_id (since we don't have a content hash)
trace.put_str("binding_id", path);
trace
}
VmEvent::ToolCall { run_id, tool_id, tool_name, input_preview } => {
let mut trace = TraceEvent::new(run_id, "ToolCall");
trace.put_str("tool_id", tool_id);
trace.put_str("tool_name", tool_name);
trace.put_str("input_preview", input_preview);
trace
}
VmEvent::ToolResult { run_id, tool_id, is_error, output_preview } => {
let mut trace = TraceEvent::new(run_id, "ToolResult");
trace.put_str("tool_id", tool_id);
trace.put_json("is_error", serde_json::json!(*is_error));
trace.put_str("output_preview", output_preview);
trace
}
VmEvent::TextOutput { run_id, text } => {
let mut trace = TraceEvent::new(run_id, "TextOutput");
trace.put_str("text", text);
trace
}
VmEvent::Processing { run_id } => {
TraceEvent::new(run_id, "Processing")
}
VmEvent::StreamingText { run_id, text, is_complete } => {
let mut trace = TraceEvent::new(run_id, "StreamingText");
trace.put_str("text", text);
trace.put_json("is_complete", serde_json::json!(*is_complete));
trace
}
VmEvent::FeedbackRequested { run_id, request_id, summary, question, options } => {
let mut trace = TraceEvent::new(run_id, "HumanInterventionRequested");
trace.put_str("request_id", request_id);
trace.put_str("summary", summary);
if let Some(q) = question {
trace.put_str("question", q);
}
trace.put_json("options", serde_json::json!(options));
trace
}
VmEvent::PuppetModeEntered { run_id, request_id, session_id, context_summary } => {
let mut trace = TraceEvent::new(run_id, "PuppetModeEntered");
trace.put_str("request_id", request_id);
trace.session_id = Some(session_id.clone());
trace.put_str("context_summary", context_summary);
trace
}
VmEvent::VmCompleted { run_id, success, error, resume_id, stderr_tail } => {
let event_type = if *success { "RunCompleted" } else { "RunFailed" };
let mut trace = TraceEvent::new(run_id, event_type);
trace.put_json("success", serde_json::json!(*success));
if let Some(e) = error {
trace.put_str("message", e); // Use "message" for error details (consistent with RunFailed)
}
if let Some(rid) = resume_id {
trace.put_str("resume_id", rid);
}
if let Some(tail) = stderr_tail {
trace.put_str("stderr_tail", tail);
}
trace
}
}
}
/// List prose files in a workspace
#[tauri::command]
fn list_prose_files(workspace_root: Option<String>) -> Result<Vec<String>, String> {
let root = effective_workspace(workspace_root.as_deref());
let mut files = Vec::new();
// Look for .prose files in the workspace
let patterns = [
root.join("*.prose"),
root.join("**/*.prose"),
];
for pattern in &patterns {
if let Ok(entries) = glob::glob(&pattern.to_string_lossy()) {
for entry in entries.flatten() {
if let Some(path_str) = entry.to_str() {
files.push(path_str.to_string());
}
}
}
}
files.sort();
files.dedup();
Ok(files)
}
/// Clear all runs in a workspace
#[tauri::command]
fn clear_runs(workspace_root: Option<String>) -> Result<usize, String> {
let root = effective_workspace(workspace_root.as_deref());
let runs_dir = root.join(".prose").join("runs");
if !runs_dir.exists() {
return Ok(0);
}
let mut count = 0;
let entries = std::fs::read_dir(&runs_dir)
.map_err(|e| format!("Failed to read runs directory: {}", e))?;
for entry in entries.flatten() {
if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
if std::fs::remove_dir_all(entry.path()).is_ok() {
count += 1;
}
}
}
Ok(count)
}
/// Delete a specific run
#[tauri::command]
fn delete_run(workspace_root: String, run_id: String) -> Result<(), String> {
ensure_safe_component("run_id", &run_id)?;
let root = PathBuf::from(&workspace_root);
let dir = run_dir(&root, &run_id);
if !dir.exists() {
return Err(format!("Run {} not found", run_id));
}
std::fs::remove_dir_all(&dir)
.map_err(|e| format!("Failed to delete run: {}", e))
}
// =============================================================================
// E2E Testing: Interaction simulation (focus-free)
// =============================================================================
/// Interaction types for E2E testing - executed via webview.eval() so no focus stealing
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Interaction {
Click { selector: String },
ClickXy { x: f64, y: f64 },
Type { selector: String, text: String },
Clear { selector: String },
Focus { selector: String },
}
#[tauri::command]
async fn simulate_interaction(
webview: tauri::Webview,
interaction: Interaction,
) -> Result<String, String> {
let js = match interaction {
Interaction::Click { selector } => {
format!(
r#"(function() {{
const el = document.querySelector({:?});
if (!el) return JSON.stringify({{ ok: false, error: "not found: " + {:?} }});
el.scrollIntoView({{ block: "center" }});
el.click();
return JSON.stringify({{ ok: true }});
}})()"#,
selector, selector
)
}
Interaction::ClickXy { x, y } => {
format!(
r#"(function() {{
const el = document.elementFromPoint({}, {});
if (!el) return JSON.stringify({{ ok: false, error: "no element at ({}, {})" }});
el.dispatchEvent(new MouseEvent('click', {{
view: window, bubbles: true, cancelable: true, clientX: {}, clientY: {}
}}));
return JSON.stringify({{ ok: true, tag: el.tagName }});
}})()"#,
x, y, x, y, x, y
)
}
Interaction::Type { selector, text } => {
format!(
r#"(function() {{
const el = document.querySelector({:?});
if (!el) return JSON.stringify({{ ok: false, error: "not found: " + {:?} }});
el.focus();
if (el.tagName === 'INPUT' || el.tagName === 'TEXTAREA') {{
el.value = {:?};
el.dispatchEvent(new Event('input', {{ bubbles: true }}));
el.dispatchEvent(new Event('change', {{ bubbles: true }}));
}} else if (el.isContentEditable) {{
el.textContent = {:?};
el.dispatchEvent(new Event('input', {{ bubbles: true }}));
}} else {{
return JSON.stringify({{ ok: false, error: "element not editable" }});
}}
return JSON.stringify({{ ok: true }});
}})()"#,
selector, selector, text, text
)
}
Interaction::Clear { selector } => {
format!(
r#"(function() {{
const el = document.querySelector({:?});
if (!el) return JSON.stringify({{ ok: false, error: "not found: " + {:?} }});
if (el.tagName === 'INPUT' || el.tagName === 'TEXTAREA') {{
el.value = '';
el.dispatchEvent(new Event('input', {{ bubbles: true }}));
}} else if (el.isContentEditable) {{
el.textContent = '';
el.dispatchEvent(new Event('input', {{ bubbles: true }}));
}}
return JSON.stringify({{ ok: true }});
}})()"#,
selector, selector
)
}
Interaction::Focus { selector } => {
format!(
r#"(function() {{
const el = document.querySelector({:?});
if (!el) return JSON.stringify({{ ok: false, error: "not found: " + {:?} }});
el.focus();
return JSON.stringify({{ ok: true }});
}})()"#,
selector, selector
)
}
};
webview.eval(&js).map_err(|e| e.to_string())?;
// `eval` is fire-and-forget: it does not return the script's result, so a failed
// lookup (e.g. selector not found) is not reported here. For now this only confirms
// that the script was dispatched to the webview.
Ok(r#"{"ok":true}"#.to_string())
}
// =============================================================================
// Snapshot for E2E testing (macOS only)
// =============================================================================
const SNAPSHOT_PATH: &str = "/tmp/mission-control-snapshot.png";
#[tauri::command]
async fn capture_snapshot(webview: tauri::Webview) -> Result<String, String> {
#[cfg(target_os = "macos")]
{
use block::ConcreteBlock;
let (tx, rx) = tokio::sync::oneshot::channel();
let tx_mutex = std::sync::Arc::new(std::sync::Mutex::new(Some::<
tokio::sync::oneshot::Sender<Result<String, String>>,
>(tx)));
webview
.with_webview(move |webview_ptr| {
unsafe {
let webview_id = webview_ptr.inner() as id;
let config: id = msg_send![class!(WKSnapshotConfiguration), new];
let tx_clone = tx_mutex.clone();
let block = ConcreteBlock::new(move |image: id, _error: id| {
let mut tx_lock = match tx_clone.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(), // Recover from poisoned mutex
};
if let Some(tx) = tx_lock.take() {
if image == nil {
let _ = tx.send(Err("Snapshot returned nil image".to_string()));
return;
}
let tiff_data: id = msg_send![image, TIFFRepresentation];
let bitmap_rep: id =
msg_send![class!(NSBitmapImageRep), imageRepWithData:tiff_data];
let png_data: id =
msg_send![bitmap_rep, representationUsingType:4 properties:nil];
let bytes: *const u8 = msg_send![png_data, bytes];
let length: usize = msg_send![png_data, length];
let data = std::slice::from_raw_parts(bytes, length);
match std::fs::write(SNAPSHOT_PATH, data) {
Ok(_) => {
let _ = tx.send(Ok(SNAPSHOT_PATH.to_string()));
}
Err(e) => {
let _ = tx.send(Err(e.to_string()));
}
}
}
});
let block_copy = block.copy();
let _: () = msg_send![webview_id, takeSnapshotWithConfiguration:config completionHandler:block_copy];
}
})
.map_err(|e| e.to_string())?;
rx.await
.map_err(|e: tokio::sync::oneshot::error::RecvError| e.to_string())?
}
#[cfg(not(target_os = "macos"))]
{
Err("Snapshot only supported on macOS".to_string())
}
}
pub fn run() {
let mut builder = tauri::Builder::default()
.plugin(tauri_plugin_dialog::init())
.plugin(tauri_plugin_opener::init())
.manage(Mutex::new(WatchState::default()))
.manage(Arc::new(Mutex::new(VmState::default())))
.invoke_handler(tauri::generate_handler![
list_runs,
read_run_events,
read_run_program,
read_run_binding,
append_intervention,
fork_run,
blame_session,
checkpoint_list,
checkpoint_create,
checkpoint_restore,
checkpoint_diff,
watch_run,
watch_stop,
save_file,
open_file,
lint_prose,
run_prose,
list_prose_files,
clear_runs,
delete_run,
run_prose_vm,
submit_feedback,
capture_snapshot,
simulate_interaction
]);
// E2E testing: MCP plugin for CLI interaction (debug builds only)
#[cfg(debug_assertions)]
{
let socket_path = std::path::PathBuf::from("/tmp/mission-control-mcp.sock");
if socket_path.exists() {
let _ = std::fs::remove_file(&socket_path);
}
builder = builder.plugin(tauri_plugin_mcp::init_with_config(
tauri_plugin_mcp::PluginConfig::new("mission-control".to_string())
.socket_path(socket_path)
.start_socket_server(true),
));
}
builder
.run(tauri::generate_context!())
.expect("error while running tauri application");
}
mod claude;
mod executor;
mod mcp_server;
mod rpc;
mod suspension;
mod vm;
pub use claude::{run_claude, ClaudeConfig, ClaudeEvent};
pub use executor::{Executor, ExecutorConfig, RuntimeEvent};
pub use mcp_server::{McpEvent, McpResponse, McpServer};
pub use rpc::{RuntimeServer, RuntimeClient};
pub use suspension::{ExecutionState, ResumeData, SuspensionPoint};
pub use vm::{VmConfig, VmEvent, VmExecutionHandle, VmExecutor, SessionState};
//! Claude-as-VM executor.
//!
//! Incorporates lessons from conductor codebase:
//! - Stderr tail capture for debugging
//! - Session state persistence with resume tokens
//! - Robust process exit handling
//! - Atomic JSON line parsing
use crate::mcp_server::{McpEvent, McpServer};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
/// Maximum stderr lines to keep for debugging (from conductor).
const STDERR_TAIL_LINES: usize = 200;
/// Events emitted during VM execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event_type")]
pub enum VmEvent {
/// VM started executing a program.
VmStarted {
run_id: String,
program_file: String,
resume_id: Option<String>,
},
/// VM reported a thought/reasoning step.
Thought {
run_id: String,
text: String,
position: Option<String>,
},
/// A session was spawned via Task tool.
SessionSpawned {
run_id: String,
session_id: String,
agent: Option<String>,
prompt_preview: String,
},
/// Context was assembled for a session (token budget tracking).
ContextAssembled {
run_id: String,
session_id: String,
token_estimate: u32,
prompt_tokens: u32,
},
/// A session completed successfully.
SessionCompleted {
run_id: String,
session_id: String,
output_preview: String,
},
/// A session failed with an error.
SessionFailed {
run_id: String,
session_id: String,
error: String,
},
/// A binding was created/materialized.
BindingCreated {
run_id: String,
name: String,
path: String,
},
/// A tool was called (generic, for visibility).
ToolCall {
run_id: String,
tool_id: String,
tool_name: String,
input_preview: String,
},
/// A tool returned a result.
ToolResult {
run_id: String,
tool_id: String,
is_error: bool,
output_preview: String,
},
/// Claude output text (narration, etc).
TextOutput {
run_id: String,
text: String,
},
/// Claude has started processing (message_start received).
Processing {
run_id: String,
},
/// Streaming text update (partial, for progress indication).
StreamingText {
run_id: String,
text: String,
is_complete: bool,
},
/// Feedback requested (suspension point) - from MCP vm_ask_user.
FeedbackRequested {
run_id: String,
request_id: String,
summary: String,
question: Option<String>,
options: Vec<String>,
},
/// Puppet mode suspension requested - from MCP vm_suspend.
PuppetModeEntered {
run_id: String,
request_id: String,
session_id: String,
context_summary: String,
},
/// VM execution completed.
VmCompleted {
run_id: String,
success: bool,
error: Option<String>,
resume_id: Option<String>,
stderr_tail: Option<String>,
},
}
/// Session state for persistence (like conductor's session.json).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionState {
pub run_id: String,
pub resume_id: Option<String>,
pub model: String,
pub started_at: String,
pub updated_at: String,
}
/// Configuration for VM execution.
#[derive(Debug, Clone)]
pub struct VmConfig {
/// Path to prose.md (VM specification).
pub prose_md_path: PathBuf,
/// Working directory for execution.
pub workspace_root: PathBuf,
/// Model to use (sonnet, opus, haiku).
pub model: String,
/// Skip permission prompts.
pub skip_permissions: bool,
/// Resume ID from previous session (for continuity).
pub resume_id: Option<String>,
/// Enable MCP server for feedback/puppet mode.
pub enable_mcp: bool,
}
impl Default for VmConfig {
fn default() -> Self {
Self {
prose_md_path: PathBuf::from("prose.md"),
workspace_root: std::env::current_dir().unwrap_or_default(),
model: "sonnet".to_string(),
skip_permissions: false,
resume_id: None,
enable_mcp: true,
}
}
}
/// Result of VM execution, including MCP handle for responding to requests.
pub struct VmExecutionHandle {
pub run_id: String,
pub mcp_server: Option<McpServer>,
}
/// The VM executor - runs Claude as the OpenProse VM.
pub struct VmExecutor {
config: VmConfig,
}
impl VmExecutor {
pub fn new(config: VmConfig) -> Self {
Self { config }
}
/// Execute a .prose program using Claude as the VM.
/// Returns a handle with the run_id and optional MCP server for responding to requests.
/// The execution runs in the background - listen for VmCompleted event for completion.
/// If `existing_run_id` is provided, uses that run directory (from fork) instead of creating new.
pub async fn execute(
&self,
program_content: &str,
event_tx: mpsc::Sender<VmEvent>,
existing_run_id: Option<String>,
) -> Result<VmExecutionHandle> {
// Use provided run_id or generate new one
let run_id = existing_run_id.unwrap_or_else(generate_run_id);
let run_dir = self
.config
.workspace_root
.join(".prose")
.join("runs")
.join(&run_id);
// Create run directory structure (may already exist if forked)
std::fs::create_dir_all(run_dir.join("bindings"))?;
std::fs::create_dir_all(run_dir.join("agents"))?;
// Save/overwrite the program
std::fs::write(run_dir.join("program.prose"), program_content)?;
// Read prose.md (VM specification)
let prose_md = std::fs::read_to_string(&self.config.prose_md_path)
.unwrap_or_else(|_| include_str!("../../../../skills/open-prose/prose.md").to_string());
// Build the VM prompt
let prompt = build_vm_prompt(
&prose_md,
program_content,
&self.config.workspace_root,
&run_id,
);
// Start MCP server if enabled
let (mcp_server, mcp_config_path) = if self.config.enable_mcp {
let (server, mcp_event_rx) = McpServer::start().await?;
// Write MCP config to temp file
let config_path = run_dir.join("mcp-config.json");
let config = server.mcp_config();
std::fs::write(&config_path, serde_json::to_string_pretty(&config)?)?;
// Forward MCP events to VmEvent channel
let event_tx_mcp = event_tx.clone();
let run_id_mcp = run_id.clone();
tokio::spawn(async move {
let mut rx = mcp_event_rx;
while let Some(mcp_event) = rx.recv().await {
let vm_event = match mcp_event {
McpEvent::AskUser {
request_id,
summary,
question,
options,
} => VmEvent::FeedbackRequested {
run_id: run_id_mcp.clone(),
request_id,
summary,
question,
options,
},
McpEvent::Suspend {
request_id,
session_id,
context_summary,
} => VmEvent::PuppetModeEntered {
run_id: run_id_mcp.clone(),
request_id,
session_id,
context_summary,
},
};
if event_tx_mcp.send(vm_event).await.is_err() {
break;
}
}
});
(Some(server), Some(config_path))
} else {
(None, None)
};
// Emit start event (with existing resume_id if resuming)
event_tx
.send(VmEvent::VmStarted {
run_id: run_id.clone(),
program_file: "program.prose".to_string(),
resume_id: self.config.resume_id.clone(),
})
.await?;
// Build claude CLI args
let mut args = vec![
"-p".to_string(),
"--output-format".to_string(),
"stream-json".to_string(),
"--verbose".to_string(),
"--model".to_string(),
self.config.model.clone(),
];
if self.config.skip_permissions {
args.push("--dangerously-skip-permissions".to_string());
}
if let Some(ref resume) = self.config.resume_id {
args.push("--resume".to_string());
args.push(resume.clone());
}
// Add MCP config if enabled
if let Some(ref config_path) = mcp_config_path {
args.push("--mcp-config".to_string());
args.push(config_path.display().to_string());
}
args.push("--".to_string());
args.push(prompt);
// Spawn claude CLI
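let mut child = Command::new("claude")
.args(&args)
.current_dir(&self.config.workspace_root)
// The prompt is passed as an argument, so close stdin instead of inheriting it.
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
// Kill the child if its handle is dropped (e.g. on runtime shutdown) to avoid orphans.
.kill_on_drop(true)
.spawn()?;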
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow::anyhow!("Failed to capture stdout"))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| anyhow::anyhow!("Failed to capture stderr"))?;
// Parse stdout stream (each line is complete JSON - conductor pattern)
let event_tx_stdout = event_tx.clone();
let run_id_stdout = run_id.clone();
let run_dir_stdout = run_dir.clone();
let stdout_task = tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
let mut parser = VmOutputParser::new(run_id_stdout, run_dir_stdout);
while let Ok(Some(line)) = reader.next_line().await {
// Each line is atomic JSON (conductor pattern)
if let Ok(value) = serde_json::from_str::<Value>(&line) {
for event in parser.parse(&value) {
if event_tx_stdout.send(event).await.is_err() {
break;
}
}
}
}
parser.resume_id
});
// Capture stderr tail for debugging (conductor pattern: last 200 lines)
let stderr_task = tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
let mut tail: VecDeque<String> = VecDeque::with_capacity(STDERR_TAIL_LINES);
while let Ok(Some(line)) = reader.next_line().await {
if tail.len() >= STDERR_TAIL_LINES {
tail.pop_front();
}
tail.push_back(line);
}
tail.into_iter().collect::<Vec<_>>().join("\n")
});
// Spawn background task for process completion
let event_tx_completion = event_tx;
let run_id_completion = run_id.clone();
let model = self.config.model.clone();
// Capture the start time here; taking it inside the task would record the exit time.
let started_at = chrono::Utc::now().to_rfc3339();
tokio::spawn(async move {
// Wait for process exit
let status = child.wait().await;
let resume_id = stdout_task.await.ok().flatten();
let stderr_tail = stderr_task.await.unwrap_or_default();
// Save session state for resumption (conductor pattern)
let session_state = SessionState {
run_id: run_id_completion.clone(),
resume_id: resume_id.clone(),
model,
started_at,
updated_at: chrono::Utc::now().to_rfc3339(),
};
let session_path = run_dir.join("session.json");
if let Ok(json) = serde_json::to_string_pretty(&session_state) {
let _ = std::fs::write(&session_path, json);
}
let (success, error) = match status {
Ok(s) if s.success() => (true, None),
Ok(s) => {
let err_msg = format!("Exit code: {:?}", s.code());
let error = if !stderr_tail.is_empty() {
Some(format!("{}\n\nStderr:\n{}", err_msg, stderr_tail))
} else {
Some(err_msg)
};
(false, error)
}
Err(e) => (false, Some(e.to_string())),
};
let _ = event_tx_completion
.send(VmEvent::VmCompleted {
run_id: run_id_completion,
success,
error,
resume_id,
stderr_tail: if stderr_tail.is_empty() {
None
} else {
Some(stderr_tail)
},
})
.await;
});
Ok(VmExecutionHandle {
run_id,
mcp_server,
})
}
}
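The completion task above reports failures as `Exit code: {:?}`, which prints `Exit code: None` when the child was killed by a signal rather than exiting on its own. A minimal sketch of a more descriptive message follows. It assumes a Unix target, and `describe_exit` is a hypothetical helper that does not exist in the codebase.

```rust
use std::os::unix::process::ExitStatusExt; // Unix-only: provides `signal()` and `from_raw()`
use std::process::ExitStatus;

/// Hypothetical helper: describe why a child process failed.
/// Returns `None` when the process exited successfully.
fn describe_exit(status: ExitStatus) -> Option<String> {
    if status.success() {
        return None;
    }
    match (status.code(), status.signal()) {
        // The process called exit() with a non-zero code.
        (Some(code), _) => Some(format!("exited with code {code}")),
        // No exit code: the process was terminated by a signal (crash, kill, OOM).
        (None, Some(signal)) => Some(format!("terminated by signal {signal}")),
        (None, None) => Some("terminated abnormally".to_string()),
    }
}
```

The same function accepts the status returned by `child.wait().await`, since tokio returns a `std::process::ExitStatus`. On non-Unix targets only `code()` is available, so the signal branch would need to sit behind `#[cfg(unix)]`.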
/// Build the prompt that makes Claude become the VM.
fn build_vm_prompt(prose_md: &str, program: &str, workspace: &Path, run_id: &str) -> String {
format!(
r#"You are the OpenProse VM. Read and internalize the VM specification below, then execute the program that follows.
CRITICAL: You ARE the virtual machine. This is not a simulation—you perform real execution:
- Each `session` statement spawns a real subagent via the Task tool
- Write bindings to the state directory as specified
- Use the narration protocol to track your execution state
- Evaluate discretion conditions (`**...**`) with your intelligence
=== VM SPECIFICATION ===
{prose_md}
=== PROGRAM TO EXECUTE ===
```prose
{program}
```
=== EXECUTION CONTEXT ===
Working directory: {workspace}
Run ID: {run_id}
State directory: {workspace}/.prose/runs/{run_id}/
Bindings directory: {workspace}/.prose/runs/{run_id}/bindings/
=== INSTRUCTIONS ===
- Parse the program structure (imports, agents, blocks, statements)
- Execute statements in order
- For each `session` statement, use the Task tool to spawn a subagent
- Write outputs to bindings/ as markdown files
- Track your position using narration: [Position] statement N, [Binding] name, etc.
Begin execution now. Report your reasoning as you go."#,
        prose_md = prose_md,
        program = program,
        workspace = workspace.display(),
        run_id = run_id,
    )
}
/// Persist resume_id immediately when captured (for crash recovery).
/// This spawns a blocking task to avoid blocking the async runtime.
fn persist_resume_id(run_dir: PathBuf, resume_id: String) {
    // Spawn blocking task to avoid blocking async runtime
    tokio::task::spawn_blocking(move || {
        let session_path = run_dir.join("session.json");
        // Read existing session state or create minimal one
        let mut state: serde_json::Value = if let Ok(content) = std::fs::read_to_string(&session_path) {
            serde_json::from_str(&content).unwrap_or_else(|_| serde_json::json!({}))
        } else {
            serde_json::json!({})
        };
        // Update resume_id and timestamp
        state["resume_id"] = serde_json::json!(resume_id);
        state["updated_at"] = serde_json::json!(chrono::Utc::now().to_rfc3339());
        // Write atomically (write to temp then rename)
        let tmp_path = session_path.with_extension("json.tmp");
        if let Ok(json) = serde_json::to_string_pretty(&state) {
            if std::fs::write(&tmp_path, &json).is_ok() {
                let _ = std::fs::rename(&tmp_path, &session_path);
            }
        }
    });
}
/// Parser for Claude's stream-json output, mapping to VM events.
struct VmOutputParser {
    run_id: String,
    run_dir: PathBuf,
    resume_id: Option<String>,
    pending_tools: HashMap<String, ToolInfo>,
    session_counter: u32,
    stream_state: StreamState,
}
struct ToolInfo {
    name: String,
    #[allow(dead_code)]
    input: Value,
    session_id: Option<String>, // For Task tools, the assigned session_id
}
/// Streaming state for real-time progress during Claude processing.
#[derive(Default)]
struct StreamState {
    /// Currently active content block type (text, thinking, tool_use).
    current_block_type: Option<String>,
    /// Buffer for accumulating text deltas.
    text_buffer: String,
    /// Count of deltas since last emit.
    delta_count: usize,
    /// Whether we've emitted the Processing event.
    processing_emitted: bool,
}
impl VmOutputParser {
    fn new(run_id: String, run_dir: PathBuf) -> Self {
        Self {
            run_id,
            run_dir,
            resume_id: None,
            pending_tools: HashMap::new(),
            session_counter: 0,
            stream_state: StreamState::default(),
        }
    }
fn parse(&mut self, value: &Value) -> Vec<VmEvent> {
let mut events = Vec::new();
let event_type = value.get("type").and_then(Value::as_str).unwrap_or("");
match event_type {
"system" => {
// Extract resume token from init event (conductor pattern)
if value.get("subtype").and_then(Value::as_str) == Some("init") {
if let Some(sid) = value.get("session_id").and_then(Value::as_str) {
self.resume_id = Some(sid.to_string());
// Persist immediately for crash recovery (non-blocking)
persist_resume_id(self.run_dir.clone(), sid.to_string());
}
}
}
// Streaming events - for real-time progress indication
"message_start" => {
if !self.stream_state.processing_emitted {
self.stream_state.processing_emitted = true;
events.push(VmEvent::Processing {
run_id: self.run_id.clone(),
});
}
}
"content_block_start" => {
// Track what type of content block is starting
if let Some(block) = value.get("content_block").and_then(Value::as_object) {
let block_type = block.get("type").and_then(Value::as_str).unwrap_or("");
self.stream_state.current_block_type = Some(block_type.to_string());
self.stream_state.text_buffer.clear();
self.stream_state.delta_count = 0;
}
}
"content_block_delta" => {
if let Some(delta) = value.get("delta").and_then(Value::as_object) {
let delta_type = delta.get("type").and_then(Value::as_str).unwrap_or("");
let text = match delta_type {
"text_delta" => delta.get("text").and_then(Value::as_str),
"thinking_delta" => delta.get("thinking").and_then(Value::as_str),
_ => None,
};
if let Some(text) = text {
self.stream_state.text_buffer.push_str(text);
self.stream_state.delta_count += 1;
// Emit periodic updates (every 15 deltas or ~500 chars)
if self.stream_state.delta_count >= 15
|| self.stream_state.text_buffer.len() >= 500
{
let is_thinking =
self.stream_state.current_block_type.as_deref() == Some("thinking");
if is_thinking {
events.push(VmEvent::Thought {
run_id: self.run_id.clone(),
text: self.stream_state.text_buffer.clone(),
position: None,
});
} else {
events.push(VmEvent::StreamingText {
run_id: self.run_id.clone(),
text: self.stream_state.text_buffer.clone(),
is_complete: false,
});
}
self.stream_state.text_buffer.clear();
self.stream_state.delta_count = 0;
}
}
}
}
"content_block_stop" => {
// Flush any remaining buffered text
if !self.stream_state.text_buffer.is_empty() {
let is_thinking =
self.stream_state.current_block_type.as_deref() == Some("thinking");
if is_thinking {
events.push(VmEvent::Thought {
run_id: self.run_id.clone(),
text: std::mem::take(&mut self.stream_state.text_buffer),
position: None,
});
} else {
events.push(VmEvent::StreamingText {
run_id: self.run_id.clone(),
text: std::mem::take(&mut self.stream_state.text_buffer),
is_complete: true,
});
}
}
self.stream_state.current_block_type = None;
}
"assistant" => {
if let Some(message) = value.get("message").and_then(Value::as_object) {
if let Some(content) = message.get("content").and_then(Value::as_array) {
for block in content {
let block_type =
block.get("type").and_then(Value::as_str).unwrap_or("");
match block_type {
"tool_use" => {
events.extend(self.parse_tool_use(block));
}
"tool_result" => {
events.extend(self.parse_tool_result(block));
}
"text" => {
if let Some(text) = block.get("text").and_then(Value::as_str) {
events.push(VmEvent::TextOutput {
run_id: self.run_id.clone(),
text: text.to_string(),
});
// Extract narration markers
events.extend(self.extract_narration(text));
}
}
"thinking" => {
if let Some(text) =
block.get("thinking").and_then(Value::as_str)
{
events.push(VmEvent::Thought {
run_id: self.run_id.clone(),
text: text.to_string(),
position: None,
});
}
}
_ => {}
}
}
}
}
}
"user" => {
// User messages contain tool_result blocks (from internal tool execution)
if let Some(message) = value.get("message").and_then(Value::as_object) {
if let Some(content) = message.get("content").and_then(Value::as_array) {
for block in content {
let block_type =
block.get("type").and_then(Value::as_str).unwrap_or("");
if block_type == "tool_result" {
events.extend(self.parse_tool_result(block));
}
}
}
}
}
"result" => {
// Completion event - could extract usage stats here
// For now, just note that we've seen completion
}
_ => {}
}
events
}
fn parse_tool_use(&mut self, block: &Value) -> Vec<VmEvent> {
let Some(tool_id) = block.get("id").and_then(Value::as_str) else {
return vec![];
};
let Some(name) = block.get("name").and_then(Value::as_str) else {
return vec![];
};
let input = block.get("input").cloned().unwrap_or(Value::Null);
// Check if this is a Task call (session spawn)
if name == "Task" {
self.session_counter += 1;
let session_id = format!("s{:04}", self.session_counter);
// Store with session_id for later matching with result
self.pending_tools.insert(
tool_id.to_string(),
ToolInfo {
name: name.to_string(),
input: input.clone(),
session_id: Some(session_id.clone()),
},
);
let full_prompt = input
.get("prompt")
.and_then(Value::as_str)
.unwrap_or_default();
let prompt_preview = truncate(full_prompt, 100);
let agent = input
.get("subagent_type")
.and_then(Value::as_str)
.map(String::from);
// Estimate tokens: ~4 chars per token
// Don't add artificial overhead - let the breakdown match the total
let prompt_tokens = (full_prompt.len() / 4).max(1) as u32;
let token_estimate = prompt_tokens;
return vec![
VmEvent::ContextAssembled {
run_id: self.run_id.clone(),
session_id: session_id.clone(),
token_estimate,
prompt_tokens,
},
VmEvent::SessionSpawned {
run_id: self.run_id.clone(),
session_id,
agent,
prompt_preview,
},
];
}
// Non-Task tool: store for later matching with result
self.pending_tools.insert(
tool_id.to_string(),
ToolInfo {
name: name.to_string(),
input: input.clone(),
session_id: None,
},
);
// Generic tool call
vec![VmEvent::ToolCall {
run_id: self.run_id.clone(),
tool_id: tool_id.to_string(),
tool_name: name.to_string(),
input_preview: truncate(&input.to_string(), 200),
}]
}
fn parse_tool_result(&mut self, block: &Value) -> Vec<VmEvent> {
let mut events = Vec::new();
let tool_use_id = block
.get("tool_use_id")
.and_then(Value::as_str)
.unwrap_or("");
let is_error = block.get("is_error").and_then(Value::as_bool) == Some(true);
let output_preview = extract_result_preview(block.get("content"));
// Check if this was a Task call
if let Some(tool_info) = self.pending_tools.remove(tool_use_id) {
if tool_info.name == "Task" {
// Use the stored session_id (or fallback to counter-based for legacy)
let session_id = tool_info
.session_id
.unwrap_or_else(|| format!("s{:04}", self.session_counter));
if is_error {
events.push(VmEvent::SessionFailed {
run_id: self.run_id.clone(),
session_id,
error: output_preview.clone(),
});
} else {
events.push(VmEvent::SessionCompleted {
run_id: self.run_id.clone(),
session_id,
output_preview: output_preview.clone(),
});
}
}
}
events.push(VmEvent::ToolResult {
run_id: self.run_id.clone(),
tool_id: tool_use_id.to_string(),
is_error,
output_preview,
});
events
}
fn extract_narration(&self, text: &str) -> Vec<VmEvent> {
let mut events = Vec::new();
// Only [Binding] markers are extracted here; [Position] and [Thought] markers are not parsed yet.
for line in text.lines() {
let line = line.trim();
if line.starts_with("[Binding]") {
let rest = line.strip_prefix("[Binding]").unwrap_or("").trim();
if let Some(name) = rest.split_whitespace().next() {
events.push(VmEvent::BindingCreated {
run_id: self.run_id.clone(),
name: name.to_string(),
path: format!("bindings/{}.md", name),
});
}
}
}
events
}
}
fn extract_result_preview(content: Option<&Value>) -> String {
    // Use a larger limit so expanded view shows meaningful content
    const MAX_OUTPUT: usize = 4000;
    match content {
        None => String::new(),
        Some(Value::String(text)) => truncate(text, MAX_OUTPUT),
        Some(Value::Array(items)) => {
            let mut parts = Vec::new();
            for item in items {
                if let Some(text) = item.get("text").and_then(Value::as_str) {
                    if !text.is_empty() {
                        parts.push(text.to_string());
                    }
                }
            }
            truncate(&parts.join("\n"), MAX_OUTPUT)
        }
        Some(other) => truncate(&other.to_string(), MAX_OUTPUT),
    }
}
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        s.to_string()
    } else {
        // Find a valid UTF-8 boundary at or before max
        let mut end = max;
        while end > 0 && !s.is_char_boundary(end) {
            end -= 1;
        }
        format!("{}...", &s[..end])
    }
}
fn generate_run_id() -> String {
    use chrono::Utc;
    let now = Utc::now();
    let random: String = uuid::Uuid::new_v4().to_string()[..6].to_string();
    format!("{}-{}", now.format("%Y%m%d-%H%M%S"), random)
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_generate_run_id() {
        let id = generate_run_id();
        assert!(id.len() > 15); // YYYYMMDD-HHMMSS-random
        assert!(id.contains('-'));
    }
    #[test]
    fn test_truncate() {
        assert_eq!(truncate("hello", 10), "hello");
        assert_eq!(truncate("hello world", 5), "hello...");
    }
}
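The `truncate` test above only covers ASCII input, where byte offsets and character offsets coincide. The sketch below reproduces the helper unchanged so that it stands alone, to show how it behaves on multi-byte text. Two properties are worth noting: `max` is a byte count rather than a character count, and the result can be up to three bytes longer than `max` because of the appended `...`.

```rust
/// Copy of the `truncate` helper from the file above, repeated so the example is self-contained.
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        s.to_string()
    } else {
        // Walk back from `max` until the cut lands on a UTF-8 character boundary,
        // so slicing never panics in the middle of a multi-byte character.
        let mut end = max;
        while end > 0 && !s.is_char_boundary(end) {
            end -= 1;
        }
        format!("{}...", &s[..end])
    }
}
```

For example, `"héllo"` is six bytes long because `é` takes two bytes, so `truncate("héllo", 2)` backs off to byte 1 and keeps only `h`. When `max` is smaller than the first character, as in `truncate("日本語", 2)`, the loop backs off to zero and only the ellipsis remains.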
## File: ide/crates/prose-runtime/src/mcp_server.rs
//! MCP Server for VM tools (vm.ask_user, vm.suspend).
//!
//! This module provides an HTTP-based MCP server that Claude CLI can connect to
//! for feedback blocks and puppet mode. Uses oneshot channels for suspension.
use axum::{
    extract::State,
    http::StatusCode,
    routing::post,
    Json, Router,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex};
/// Events emitted by the MCP server to notify the UI.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum McpEvent {
    /// User question requested - UI should show prompt.
    AskUser {
        request_id: String,
        summary: String,
        question: Option<String>,
        options: Vec<String>,
    },
    /// Puppet mode suspension requested.
    Suspend {
        request_id: String,
        session_id: String,
        context_summary: String,
    },
}
/// Response from UI for a pending request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpResponse {
    pub request_id: String,
    pub response: Value,
}
/// Shared state for the MCP server.
#[derive(Default)]
pub struct McpServerState {
    /// Pending requests waiting for UI response.
    pending: HashMap<String, oneshot::Sender<Value>>,
}
/// MCP Server handle.
pub struct McpServer {
    /// Port the server is running on.
    pub port: u16,
    /// Shared state for responding to requests.
    state: Arc<Mutex<McpServerState>>,
    /// Shutdown signal.
    shutdown_tx: Option<oneshot::Sender<()>>,
}
impl McpServer {
    /// Start the MCP server on a random available port.
    /// Returns the server handle and a receiver for MCP events.
    pub async fn start() -> anyhow::Result<(Self, mpsc::Receiver<McpEvent>)> {
        let state = Arc::new(Mutex::new(McpServerState::default()));
        let (event_tx, event_rx) = mpsc::channel::<McpEvent>(100);
        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let app_state = AppState {
state: state.clone(),
event_tx,
};
let app = Router::new()
.route("/mcp", post(handle_mcp_request))
.with_state(app_state);
// Bind to random port
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let port = listener.local_addr()?.port();
// Spawn server task
tokio::spawn(async move {
axum::serve(listener, app)
.with_graceful_shutdown(async {
let _ = shutdown_rx.await;
})
.await
.ok();
});
Ok((
Self {
port,
state,
shutdown_tx: Some(shutdown_tx),
},
event_rx,
))
}
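/// Get the MCP config JSON for Claude CLI.
///
/// The server speaks JSON-RPC over HTTP POST, so it is registered as an HTTP endpoint.
/// A stdio entry that launches `curl -d @-` cannot work: curl reads stdin to EOF before
/// sending anything, while a stdio MCP client keeps stdin open and waits for a reply.
/// Assumption: the installed CLI accepts `"type": "http"` entries in `--mcp-config`.
pub fn mcp_config(&self) -> Value {
json!({
"mcpServers": {
"mission-control": {
"type": "http",
"url": format!("http://127.0.0.1:{}/mcp", self.port)
}
}
})
}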
/// Respond to a pending request.
pub async fn respond(&self, request_id: &str, response: Value) -> anyhow::Result<()> {
let mut state = self.state.lock().await;
if let Some(tx) = state.pending.remove(request_id) {
tx.send(response).map_err(|_| anyhow::anyhow!("Response channel closed"))?;
}
Ok(())
}
/// Shutdown the server.
pub fn shutdown(&mut self) {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
}
}
}
impl Drop for McpServer {
    fn drop(&mut self) {
        self.shutdown();
    }
}
#[derive(Clone)]
struct AppState {
    state: Arc<Mutex<McpServerState>>,
    event_tx: mpsc::Sender<McpEvent>,
}
/// MCP JSON-RPC request.
#[derive(Debug, Deserialize)]
struct McpRequest {
    #[allow(dead_code)]
    jsonrpc: String,
    id: Value,
    method: String,
    #[serde(default)]
    params: Value,
}
/// MCP JSON-RPC response.
#[derive(Debug, Serialize)]
struct McpRpcResponse {
    jsonrpc: String,
    id: Value,
    #[serde(skip_serializing_if = "Option::is_none")]
    result: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<McpError>,
}
#[derive(Debug, Serialize)]
struct McpError {
    code: i32,
    message: String,
}
async fn handle_mcp_request(
    State(app_state): State<AppState>,
    Json(request): Json<McpRequest>,
) -> Result<Json<McpRpcResponse>, StatusCode> {
    let response = match request.method.as_str() {
        "initialize" => {
            // MCP initialization
            McpRpcResponse {
                jsonrpc: "2.0".to_string(),
                id: request.id,
                result: Some(json!({
                    "protocolVersion": "2024-11-05",
                    "capabilities": {
                        "tools": {}
                    },
                    "serverInfo": {
                        "name": "mission-control",
                        "version": "0.1.0"
                    }
                })),
                error: None,
            }
        }
        "tools/list" => {
            // List available tools
            McpRpcResponse {
                jsonrpc: "2.0".to_string(),
                id: request.id,
                result: Some(json!({
                    "tools": [
                        {
                            "name": "vm_ask_user",
                            "description": "Ask the user a question and wait for their response. Use for feedback blocks.",
                            "inputSchema": {
                                "type": "object",
                                "properties": {
                                    "summary": {
                                        "type": "string",
                                        "description": "Brief summary of current state/progress"
                                    },
                                    "question": {
                                        "type": "string",
                                        "description": "The question to ask the user"
                                    },
                                    "options": {
                                        "type": "array",
                                        "items": { "type": "string" },
                                        "description": "Optional list of choices for the user"
                                    }
                                },
                                "required": ["summary"]
                            }
                        },
                        {
                            "name": "vm_suspend",
                            "description": "Suspend execution and let the human take control (puppet mode).",
                            "inputSchema": {
                                "type": "object",
                                "properties": {
                                    "session_id": {
                                        "type": "string",
                                        "description": "The session to suspend"
                                    },
                                    "context_summary": {
                                        "type": "string",
                                        "description": "Summary of current context for the human"
                                    }
                                },
                                "required": ["session_id", "context_summary"]
                            }
                        }
                    ]
                })),
                error: None,
            }
        }
        "tools/call" => {
            // Execute a tool
            let tool_name = request.params.get("name").and_then(Value::as_str).unwrap_or("");
            let arguments = request.params.get("arguments").cloned().unwrap_or(json!({}));
match tool_name {
"vm_ask_user" => {
handle_ask_user(&app_state, request.id, arguments).await
}
"vm_suspend" => {
handle_suspend(&app_state, request.id, arguments).await
}
_ => {
McpRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: None,
error: Some(McpError {
code: -32601,
message: format!("Unknown tool: {}", tool_name),
}),
}
}
}
}
_ => {
McpRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: None,
error: Some(McpError {
code: -32601,
message: format!("Method not found: {}", request.method),
}),
}
}
};
Ok(Json(response))
}
async fn handle_ask_user(
    app_state: &AppState,
    id: Value,
    arguments: Value,
) -> McpRpcResponse {
    let summary = arguments.get("summary").and_then(Value::as_str).unwrap_or("").to_string();
    let question = arguments.get("question").and_then(Value::as_str).map(String::from);
    let options: Vec<String> = arguments
        .get("options")
        .and_then(Value::as_array)
        .map(|arr| arr.iter().filter_map(Value::as_str).map(String::from).collect())
        .unwrap_or_default();
let request_id = uuid::Uuid::new_v4().to_string();
// Create oneshot channel for response
let (tx, rx) = oneshot::channel::<Value>();
// Register pending request
{
let mut state = app_state.state.lock().await;
state.pending.insert(request_id.clone(), tx);
}
// Emit event to UI
let _ = app_state.event_tx.send(McpEvent::AskUser {
request_id: request_id.clone(),
summary,
question,
options,
}).await;
// Wait for response (with timeout)
match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
Ok(Ok(response)) => {
McpRpcResponse {
jsonrpc: "2.0".to_string(),
id,
result: Some(json!({
"content": [{
"type": "text",
"text": response.to_string()
}]
})),
error: None,
}
}
Ok(Err(_)) => {
// Channel closed (request cancelled)
McpRpcResponse {
jsonrpc: "2.0".to_string(),
id,
result: Some(json!({
"content": [{
"type": "text",
"text": "Request cancelled by user"
}],
"isError": true
})),
error: None,
}
}
Err(_) => {
// Timeout
// Clean up pending request
{
let mut state = app_state.state.lock().await;
state.pending.remove(&request_id);
}
McpRpcResponse {
jsonrpc: "2.0".to_string(),
id,
result: Some(json!({
"content": [{
"type": "text",
"text": "Request timed out (5 minutes)"
}],
"isError": true
})),
error: None,
}
}
}
}
async fn handle_suspend(
    app_state: &AppState,
    id: Value,
    arguments: Value,
) -> McpRpcResponse {
    let session_id = arguments.get("session_id").and_then(Value::as_str).unwrap_or("unknown").to_string();
    let context_summary = arguments.get("context_summary").and_then(Value::as_str).unwrap_or("").to_string();
let request_id = uuid::Uuid::new_v4().to_string();
// Create oneshot channel for response
let (tx, rx) = oneshot::channel::<Value>();
// Register pending request
{
let mut state = app_state.state.lock().await;
state.pending.insert(request_id.clone(), tx);
}
// Emit event to UI
let _ = app_state.event_tx.send(McpEvent::Suspend {
request_id: request_id.clone(),
session_id,
context_summary,
}).await;
// Wait for response (puppet mode can be longer - 30 minutes)
match tokio::time::timeout(std::time::Duration::from_secs(1800), rx).await {
Ok(Ok(response)) => {
// Response contains action and possibly injected content
let action = response.get("action").and_then(Value::as_str).unwrap_or("continue");
let injected = response.get("response").and_then(Value::as_str);
let result_text = match (action, injected) {
    ("inject", Some(text)) => format!("Human took control and provided: {}", text),
    _ => "Human released control. Continue execution.".to_string(),
};
McpRpcResponse {
jsonrpc: "2.0".to_string(),
id,
result: Some(json!({
"content": [{
"type": "text",
"text": result_text
}]
})),
error: None,
}
}
Ok(Err(_)) => {
McpRpcResponse {
jsonrpc: "2.0".to_string(),
id,
result: Some(json!({
"content": [{
"type": "text",
"text": "Puppet mode cancelled"
}],
"isError": true
})),
error: None,
}
}
Err(_) => {
{
let mut state = app_state.state.lock().await;
state.pending.remove(&request_id);
}
McpRpcResponse {
jsonrpc: "2.0".to_string(),
id,
result: Some(json!({
"content": [{
"type": "text",
"text": "Puppet mode timed out (30 minutes)"
}],
"isError": true
})),
error: None,
}
}
}
}
#[cfg(test)] mod tests { use super::*;
#[tokio::test]
async fn test_mcp_server_starts() {
let (server, _event_rx) = McpServer::start().await.unwrap();
assert!(server.port > 0);
println!("MCP server started on port {}", server.port);
}
}
## File: ide/crates/prose-runtime/src/claude.rs
//! Claude Code CLI integration.
//!
//! Spawns claude CLI with stream-json output and parses events.
use anyhow::Result;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
/// Events emitted from Claude Code execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ClaudeEvent {
    Started {
        session_id: String,
        model: Option<String>,
    },
    Message {
        text: String,
    },
    ToolUse {
        id: String,
        name: String,
        input: Value,
    },
    ToolResult {
        id: String,
        is_error: bool,
        result_preview: String,
    },
    Completed {
        ok: bool,
        answer: String,
        resume_id: Option<String>,
        error: Option<String>,
    },
    Error {
        message: String,
    },
}
/// Configuration for Claude invocation.
#[derive(Debug, Clone)]
pub struct ClaudeConfig {
    /// Model to use (sonnet, opus, haiku).
    pub model: String,
    /// Working directory.
    pub cwd: std::path::PathBuf,
    /// Resume token from previous session.
    pub resume_id: Option<String>,
    /// Skip permission prompts (dangerous).
    pub skip_permissions: bool,
}
impl Default for ClaudeConfig {
    fn default() -> Self {
        Self {
            model: "sonnet".to_string(),
            cwd: std::env::current_dir().unwrap_or_default(),
            resume_id: None,
            skip_permissions: false,
        }
    }
}
/// State for parsing Claude's stream-json output.
#[derive(Default)]
struct ClaudeParser {
    resume: Option<String>,
    pending: HashMap<String, Value>,
}
/// Run Claude Code with a prompt and stream events.
pub async fn run_claude(
    prompt: &str,
    config: ClaudeConfig,
    event_tx: mpsc::Sender<ClaudeEvent>,
) -> Result<()> {
    let mut args = vec![
        "-p".to_string(),
        "--output-format".to_string(),
        "stream-json".to_string(),
        "--verbose".to_string(),
    ];
if config.skip_permissions {
args.push("--dangerously-skip-permissions".to_string());
}
args.push("--model".to_string());
args.push(config.model.clone());
if let Some(ref resume) = config.resume_id {
args.push("--resume".to_string());
args.push(resume.clone());
}
args.push("--".to_string());
args.push(prompt.to_string());
let mut child = Command::new("claude")
.args(&args)
.current_dir(&config.cwd)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow::anyhow!("Failed to capture stdout"))?;
let stderr = child.stderr.take();
let event_tx_clone = event_tx.clone();
let stdout_task = tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
let mut parser = ClaudeParser::default();
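'read: while let Ok(Some(line)) = reader.next_line().await {
    let value: Value = match serde_json::from_str(&line) {
        Ok(v) => v,
        Err(err) => {
            // Skip non-JSON lines, but leave a trace for debugging
            tracing::debug!("skipping non-JSON stdout line: {} ({})", line, err);
            continue;
        }
    };
    if let Some(events) = parse_claude_event(&value, &mut parser) {
        for event in events {
            if event_tx_clone.send(event).await.is_err() {
                // Receiver dropped: stop reading, not just the inner loop
                break 'read;
            }
        }
    }
}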
parser.resume
});
// Capture stderr for error messages
let stderr_task = if let Some(stderr) = stderr {
let event_tx_clone = event_tx.clone();
Some(tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
let mut stderr_lines = Vec::new();
while let Ok(Some(line)) = reader.next_line().await {
tracing::debug!("claude stderr: {}", line);
stderr_lines.push(line);
}
// If there's an error, send it
if !stderr_lines.is_empty() {
let _ = event_tx_clone
.send(ClaudeEvent::Error {
message: stderr_lines.join("\n"),
})
.await;
}
}))
} else {
None
};
let exit_status = child.wait().await?;
let resume = stdout_task.await?;
if let Some(task) = stderr_task {
let _ = task.await;
}
let ok = exit_status.success();
if !ok {
let _ = event_tx
.send(ClaudeEvent::Completed {
ok: false,
answer: String::new(),
resume_id: resume,
error: Some(format!("Claude exited with code {:?}", exit_status.code())),
})
.await;
}
Ok(())
}
fn parse_claude_event(value: &Value, state: &mut ClaudeParser) -> Option<Vec<ClaudeEvent>> {
    let event_type = value.get("type")?.as_str()?;
match event_type {
"system" => {
if value.get("subtype").and_then(Value::as_str) != Some("init") {
return Some(vec![]);
}
let session_id = value.get("session_id").and_then(Value::as_str)?;
state.resume = Some(session_id.to_string());
let model = value.get("model").and_then(Value::as_str).map(String::from);
Some(vec![ClaudeEvent::Started {
session_id: session_id.to_string(),
model,
}])
}
"assistant" => {
let message = value.get("message").and_then(Value::as_object)?;
let content = message.get("content").and_then(Value::as_array)?;
let mut events = Vec::new();
let mut text_parts = Vec::new();
for block in content {
let block_type = block.get("type").and_then(Value::as_str).unwrap_or("");
match block_type {
"tool_use" => {
let tool_id = block.get("id").and_then(Value::as_str)?;
let name = block
.get("name")
.and_then(Value::as_str)
.unwrap_or("tool");
let input = block.get("input").cloned().unwrap_or(Value::Null);
state.pending.insert(tool_id.to_string(), block.clone());
events.push(ClaudeEvent::ToolUse {
id: tool_id.to_string(),
name: name.to_string(),
input,
});
}
"tool_result" => {
let tool_use_id = block.get("tool_use_id").and_then(Value::as_str)?;
let is_error =
block.get("is_error").and_then(Value::as_bool) == Some(true);
let preview = result_preview(block.get("content"));
state.pending.remove(tool_use_id);
events.push(ClaudeEvent::ToolResult {
id: tool_use_id.to_string(),
is_error,
result_preview: preview,
});
}
"text" => {
if let Some(text) = block.get("text").and_then(Value::as_str) {
text_parts.push(text.to_string());
}
}
_ => {}
}
}
if !text_parts.is_empty() {
events.push(ClaudeEvent::Message {
text: text_parts.join("\n"),
});
}
Some(events)
}
"result" => {
let ok = value.get("is_error").and_then(Value::as_bool) != Some(true);
let answer = value
.get("result")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let error = if ok { None } else { Some(answer.clone()) };
Some(vec![ClaudeEvent::Completed {
ok,
answer,
resume_id: state.resume.clone(),
error,
}])
}
_ => None,
}
}
fn result_preview(content: Option<&Value>) -> String {
    match content {
        None => String::new(),
        Some(Value::String(text)) => truncate(text, 500),
        Some(Value::Array(items)) => {
            let mut parts = Vec::new();
            for item in items {
                if let Some(text) = item.get("text").and_then(Value::as_str) {
                    if !text.is_empty() {
                        parts.push(text.to_string());
                    }
                }
            }
            truncate(&parts.join("\n"), 500)
        }
        Some(Value::Object(obj)) => obj
            .get("text")
            .and_then(Value::as_str)
            .map(|s| truncate(s, 500))
            .unwrap_or_default(),
        Some(other) => truncate(&other.to_string(), 500),
    }
}
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        s.to_string()
    } else {
        // Find a valid UTF-8 boundary at or before max
        let mut end = max;
        while end > 0 && !s.is_char_boundary(end) {
            end -= 1;
        }
        format!("{}...", &s[..end])
    }
}
#[cfg(test)] mod tests { use super::*;
#[test]
fn test_parse_system_init() {
let json = r#"{"type":"system","subtype":"init","session_id":"abc123","model":"claude-sonnet"}"#;
let value: Value = serde_json::from_str(json).unwrap();
let mut state = ClaudeParser::default();
let events = parse_claude_event(&value, &mut state).unwrap();
assert_eq!(events.len(), 1);
match &events[0] {
ClaudeEvent::Started { session_id, model } => {
assert_eq!(session_id, "abc123");
assert_eq!(model.as_deref(), Some("claude-sonnet"));
}
_ => panic!("Expected Started event"),
}
assert_eq!(state.resume, Some("abc123".to_string()));
}
}
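The `truncate` helper in this file treats `max` as a byte budget, not a character count, and backs up to the nearest UTF-8 boundary before slicing. A minimal standalone sketch of that behaviour follows; `truncate_preview` is a hypothetical copy made for illustration and is not part of the codebase.

```rust
/// Hypothetical standalone copy of the `truncate` helper, for illustration only.
fn truncate_preview(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    // `max` counts bytes: back up until the cut lands on a char boundary,
    // so slicing never panics in the middle of a multi-byte character.
    let mut end = max;
    while end > 0 && !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &s[..end])
}
```

With this sketch, `truncate_preview("héllo", 2)` yields `h...`, because byte index 2 falls inside the two-byte `é` and the cut moves back to index 1.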
## File: ide/crates/prose-runtime/src/executor.rs
use crate::claude::{run_claude, ClaudeConfig, ClaudeEvent}; use crate::suspension::{ExecutionState, ResumeData, SuspensionPoint}; use anyhow::Result; use prose_core::ast::{FeedbackBlock, Program, SessionHead, Stmt}; use prose_core::ir::{lower_program, IrProgram, IrStmt}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::{mpsc, oneshot, Mutex};
/// Configuration for the executor.
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Enable puppet mode (allow human takeover of sessions).
    pub puppet_mode_enabled: bool,
    /// Working directory for file operations.
    pub workspace_root: std::path::PathBuf,
    /// Default model for sessions (sonnet, opus, haiku).
    pub model: String,
    /// Skip permission prompts (dangerous, for automated runs).
    pub skip_permissions: bool,
    /// Use mock execution instead of real Claude (for testing).
    pub mock_execution: bool,
}
impl Default for ExecutorConfig {
    fn default() -> Self {
        Self {
            puppet_mode_enabled: false,
            workspace_root: std::env::current_dir().unwrap_or_default(),
            model: "sonnet".to_string(),
            skip_permissions: false,
            mock_execution: false,
        }
    }
}
/// Event emitted during execution. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "event_type")] pub enum RuntimeEvent { RunStarted { run_id: String, program_file: String, }, SessionStarted { session_id: String, agent: Option, }, SessionCompleted { session_id: String, output: Option, tokens_used: u64, }, BindingCreated { name: String, value_preview: String, }, FeedbackRequested { event_id: String, summary: String, question: Option, options: Vec, }, FeedbackProvided { event_id: String, selected_option: Option, user_input: Option, }, PuppetModeEntered { session_id: String, }, PuppetModeInput { session_id: String, human_response: String, }, PuppetModeExited { session_id: String, }, RunCompleted { run_id: String, success: bool, }, Error { message: String, }, }
/// Handle to control a running execution.
pub struct ExecutionHandle {
    /// Receives events from the executor.
    pub events: mpsc::Receiver<RuntimeEvent>,
    /// Current state (may be suspended). Public to allow RPC server to share state.
    pub state: Arc<Mutex<ExecutionState>>,
}
impl ExecutionHandle {
    /// Resume execution from a suspension point.
    pub async fn resume(&self, data: ResumeData) -> Result<()> {
        let mut state = self.state.lock().await;
        match std::mem::replace(&mut *state, ExecutionState::Running) {
            ExecutionState::Suspended { resume_tx, .. } => {
                let _ = resume_tx.send(data);
                Ok(())
            }
            other => {
                *state = other;
                anyhow::bail!("Not suspended");
            }
        }
    }
/// Check if execution is suspended.
pub async fn get_suspension(&self) -> Option<SuspensionPoint> {
let state = self.state.lock().await;
match &*state {
ExecutionState::Suspended { point, .. } => Some(point.clone()),
_ => None,
}
}
}
/// The main execution engine.
pub struct Executor {
    config: ExecutorConfig,
}
impl Executor {
    pub fn new(config: ExecutorConfig) -> Self {
        Self { config }
    }
/// Run a program and return a handle for control.
pub async fn run(&self, program: Program) -> Result<ExecutionHandle> {
let ir = lower_program(&program)?;
let (event_tx, event_rx) = mpsc::channel(100);
let state = Arc::new(Mutex::new(ExecutionState::Running));
let run_id = format!("run_{}", chrono_lite_id());
event_tx
.send(RuntimeEvent::RunStarted {
run_id: run_id.clone(),
program_file: "input".to_string(),
})
.await?;
let state_clone = state.clone();
let event_tx_clone = event_tx.clone();
let config = self.config.clone();
tokio::spawn(async move {
let result = execute_ir(&ir, &config, &event_tx_clone, &state_clone).await;
let success = result.is_ok();
if let Err(e) = &result {
let _ = event_tx_clone
.send(RuntimeEvent::Error {
message: e.to_string(),
})
.await;
}
let _ = event_tx_clone
.send(RuntimeEvent::RunCompleted {
run_id,
success,
})
.await;
let mut state = state_clone.lock().await;
*state = if success {
ExecutionState::Completed
} else {
ExecutionState::Failed(result.unwrap_err())
};
});
Ok(ExecutionHandle {
events: event_rx,
state,
})
}
}
struct ExecCtx<'a> {
    config: &'a ExecutorConfig,
    events: &'a mpsc::Sender<RuntimeEvent>,
    state: &'a Arc<Mutex<ExecutionState>>,
    vars: HashMap<String, String>,
    session_counter: usize,
}
async fn execute_ir(
    ir: &IrProgram,
    config: &ExecutorConfig,
    events: &mpsc::Sender<RuntimeEvent>,
    state: &Arc<Mutex<ExecutionState>>,
) -> Result<()> {
    let mut ctx = ExecCtx {
        config,
        events,
        state,
        vars: HashMap::new(),
        session_counter: 0,
    };
for stmt in &ir.body {
execute_ir_stmt(stmt, &mut ctx).await?;
}
Ok(())
}
async fn execute_ir_stmt(stmt: &IrStmt, ctx: &mut ExecCtx<'_>) -> Result<()> {
    match stmt {
        IrStmt::Input(_) => {
            // Inputs should already be bound from CLI args or environment
        }
        IrStmt::Session(ir_session) => {
            let session_id = ir_session.id.clone();
            let agent = match &ir_session.stmt.head {
                SessionHead::Agent(a) => Some(a.clone()),
                SessionHead::NamedAgent { agent, .. } => Some(agent.clone()),
                SessionHead::Prompt(_) => None,
            };
ctx.events
.send(RuntimeEvent::SessionStarted {
session_id: session_id.clone(),
agent: agent.clone(),
})
.await?;
// Check if puppet mode should be triggered
if ctx.config.puppet_mode_enabled {
let (resume_tx, resume_rx) = oneshot::channel();
{
let mut s = ctx.state.lock().await;
*s = ExecutionState::Suspended {
point: SuspensionPoint::PuppetMode {
session_id: session_id.clone(),
agent: agent.clone().unwrap_or_default(),
context_summary: "Session context".to_string(),
last_output: None,
},
resume_tx,
};
}
ctx.events
.send(RuntimeEvent::PuppetModeEntered {
session_id: session_id.clone(),
})
.await?;
// Wait for human input
let resume_data = resume_rx.await?;
match resume_data {
ResumeData::PuppetResponse { human_response } => {
ctx.events
.send(RuntimeEvent::PuppetModeInput {
session_id: session_id.clone(),
human_response: human_response.clone(),
})
.await?;
}
ResumeData::PuppetRelease => {
// Human released control, continue with normal execution
}
ResumeData::Cancel => {
anyhow::bail!("Execution cancelled");
}
ResumeData::FeedbackResponse { .. } => {
// Unexpected feedback response in puppet mode context
}
}
ctx.events
.send(RuntimeEvent::PuppetModeExited {
session_id: session_id.clone(),
})
.await?;
}
// Execute the session with Claude
let output = if ctx.config.mock_execution {
// Mock mode for testing
"[Mock session output]".to_string()
} else {
// Build prompt from session head
let prompt = match &ir_session.stmt.head {
SessionHead::Agent(agent) => format!("You are acting as agent: {}", agent),
SessionHead::NamedAgent { agent, .. } => format!("You are acting as agent: {}", agent),
SessionHead::Prompt(p) => p.clone(),
};
// Call Claude Code
let (claude_tx, mut claude_rx) = mpsc::channel::<ClaudeEvent>(100);
let claude_config = ClaudeConfig {
model: ctx.config.model.clone(),
cwd: ctx.config.workspace_root.clone(),
resume_id: None,
skip_permissions: ctx.config.skip_permissions,
};
let events_clone = ctx.events.clone();
// Spawn Claude execution
let claude_handle = tokio::spawn(async move {
run_claude(&prompt, claude_config, claude_tx).await
});
// Process Claude events
let mut answer = String::new();
while let Some(event) = claude_rx.recv().await {
match event {
ClaudeEvent::Message { text } => {
answer.push_str(&text);
}
ClaudeEvent::ToolUse { id, name, input } => {
let _ = events_clone
.send(RuntimeEvent::BindingCreated {
name: format!("tool:{}:{}", name, id),
value_preview: truncate(&input.to_string(), 100),
})
.await;
}
ClaudeEvent::Completed { ok, answer: final_answer, error, .. } => {
if !ok {
if let Some(err) = error {
let _ = events_clone
.send(RuntimeEvent::Error { message: err })
.await;
}
}
answer = final_answer;
}
ClaudeEvent::Error { message } => {
let _ = events_clone
.send(RuntimeEvent::Error { message })
.await;
}
_ => {}
}
}
// Wait for Claude to finish
let _ = claude_handle.await;
answer
};
ctx.events
.send(RuntimeEvent::SessionCompleted {
session_id,
output: Some(output),
tokens_used: 100, // TODO: get from Claude usage
})
.await?;
}
IrStmt::Let(b) | IrStmt::Const(b) | IrStmt::Assign(b) | IrStmt::Output(b) => {
let value = format!("[value of {}]", b.name);
ctx.vars.insert(b.name.clone(), value.clone());
ctx.events
.send(RuntimeEvent::BindingCreated {
name: b.name.clone(),
value_preview: truncate(&value, 50),
})
.await?;
}
IrStmt::Feedback(f) => {
execute_feedback(f, ctx).await?;
}
IrStmt::Parallel(p) => {
for br in &p.branches {
execute_ast_stmt(&br.stmt, ctx).await?;
}
}
IrStmt::Repeat(r) => {
for i in 0..r.times {
if let Some(idx) = &r.index {
ctx.vars.insert(idx.clone(), i.to_string());
}
for s in &r.body {
execute_ast_stmt(s, ctx).await?;
}
}
}
IrStmt::ForEach(f) => {
let mock_items = vec!["item1", "item2", "item3"];
for (i, item) in mock_items.iter().enumerate() {
ctx.vars.insert(f.item.clone(), item.to_string());
if let Some(idx) = &f.index {
ctx.vars.insert(idx.clone(), i.to_string());
}
for s in &f.body {
execute_ast_stmt(s, ctx).await?;
}
}
}
IrStmt::Loop(l) => {
let max = l.max.unwrap_or(10);
for i in 0..max {
if let Some(idx) = &l.index {
ctx.vars.insert(idx.clone(), i.to_string());
}
for s in &l.body {
execute_ast_stmt(s, ctx).await?;
}
}
}
IrStmt::Try(t) => {
let result: Result<()> = async {
for s in &t.body {
execute_ast_stmt(s, ctx).await?;
}
Ok(())
}
.await;
if let Err(e) = result {
if let Some(c) = &t.catch {
if let Some(err_var) = &c.err {
ctx.vars.insert(err_var.clone(), e.to_string());
}
for s in &c.body {
execute_ast_stmt(s, ctx).await?;
}
}
}
if let Some(finally) = &t.finally {
for s in finally {
execute_ast_stmt(s, ctx).await?;
}
}
}
IrStmt::Choice(c) => {
if let Some(opt) = c.options.first() {
for s in &opt.body {
execute_ast_stmt(s, ctx).await?;
}
}
}
IrStmt::If(i) => {
for s in &i.then_body {
execute_ast_stmt(s, ctx).await?;
}
}
IrStmt::Do(d) => {
for s in &d.body {
execute_ast_stmt(s, ctx).await?;
}
}
IrStmt::Throw(t) => {
let msg = t.message.clone().unwrap_or_else(|| "Throw".to_string());
anyhow::bail!("{}", msg);
}
IrStmt::Raw(r) => {
for s in &r.body {
execute_ast_stmt(s, ctx).await?;
}
}
IrStmt::Resume(_) | IrStmt::OutputExpr(_) => {}
}
Ok(())
}
/// Execute an AST Stmt (found inside IR block bodies).
fn execute_ast_stmt<'a>(
    stmt: &'a Stmt,
    ctx: &'a mut ExecCtx<'_>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    Box::pin(async move {
        match stmt {
            Stmt::Input(_) | Stmt::Agent(_) | Stmt::Block(_) | Stmt::Use(_) => {}
            Stmt::Session(s) => {
                ctx.session_counter += 1;
                let session_id = format!("s{:04}", ctx.session_counter);
                let agent = match &s.head {
                    SessionHead::Agent(a) => Some(a.clone()),
                    SessionHead::NamedAgent { agent, .. } => Some(agent.clone()),
                    SessionHead::Prompt(_) => None,
                };
ctx.events
.send(RuntimeEvent::SessionStarted {
session_id: session_id.clone(),
agent,
})
.await?;
ctx.events
.send(RuntimeEvent::SessionCompleted {
session_id,
output: Some("[Mock session output]".to_string()),
tokens_used: 100,
})
.await?;
}
Stmt::Resume(_) => {}
Stmt::Let(b) | Stmt::Const(b) | Stmt::Assign(b) | Stmt::Output(b) => {
let value = format!("[value of {}]", b.name);
ctx.vars.insert(b.name.clone(), value.clone());
ctx.events
.send(RuntimeEvent::BindingCreated {
name: b.name.clone(),
value_preview: truncate(&value, 50),
})
.await?;
}
Stmt::OutputExpr(_) => {}
Stmt::Parallel(p) => {
for br in &p.branches {
execute_ast_stmt(&br.stmt, ctx).await?;
}
}
Stmt::Repeat(r) => {
for i in 0..r.times {
if let Some(idx) = &r.index {
ctx.vars.insert(idx.clone(), i.to_string());
}
for s in &r.body {
execute_ast_stmt(s, ctx).await?;
}
}
}
Stmt::ForEach(f) => {
let mock_items = vec!["item1", "item2", "item3"];
for (i, item) in mock_items.iter().enumerate() {
ctx.vars.insert(f.item.clone(), item.to_string());
if let Some(idx) = &f.index {
ctx.vars.insert(idx.clone(), i.to_string());
}
for s in &f.body {
execute_ast_stmt(s, ctx).await?;
}
}
}
Stmt::Loop(l) => {
let max = l.max.unwrap_or(10);
for i in 0..max {
if let Some(idx) = &l.index {
ctx.vars.insert(idx.clone(), i.to_string());
}
for s in &l.body {
execute_ast_stmt(s, ctx).await?;
}
}
}
Stmt::Try(t) => {
let result: Result<()> = async {
for s in &t.body {
execute_ast_stmt(s, ctx).await?;
}
Ok(())
}
.await;
if let Err(e) = result {
if let Some(c) = &t.catch {
if let Some(err_var) = &c.err {
ctx.vars.insert(err_var.clone(), e.to_string());
}
for s in &c.body {
execute_ast_stmt(s, ctx).await?;
}
}
}
if let Some(finally) = &t.finally {
for s in finally {
execute_ast_stmt(s, ctx).await?;
}
}
}
Stmt::Choice(c) => {
if let Some(opt) = c.options.first() {
for s in &opt.body {
execute_ast_stmt(s, ctx).await?;
}
}
}
Stmt::If(i) => {
for s in &i.then_body {
execute_ast_stmt(s, ctx).await?;
}
}
Stmt::Do(d) => {
for s in &d.body {
execute_ast_stmt(s, ctx).await?;
}
}
Stmt::Throw(t) => {
let msg = t.message.clone().unwrap_or_else(|| "Throw".to_string());
anyhow::bail!("{}", msg);
}
Stmt::Raw(r) => {
for s in &r.body {
execute_ast_stmt(s, ctx).await?;
}
}
Stmt::Feedback(f) => {
execute_feedback(f, ctx).await?;
}
}
Ok(())
})
}
async fn execute_feedback(f: &FeedbackBlock, ctx: &mut ExecCtx<'_>) -> Result<()> {
    let event_id = format!("fb_{}", chrono_lite_id());
ctx.events
.send(RuntimeEvent::FeedbackRequested {
event_id: event_id.clone(),
summary: f.summary.clone(),
question: f.question.clone(),
options: f.options.clone(),
})
.await?;
let (resume_tx, resume_rx) = oneshot::channel();
{
let mut s = ctx.state.lock().await;
*s = ExecutionState::Suspended {
point: SuspensionPoint::Feedback {
event_id: event_id.clone(),
summary: f.summary.clone(),
question: f.question.clone(),
options: f.options.clone(),
display_binding: f.display.clone(),
display_value: f.display.as_ref().and_then(|d| ctx.vars.get(d).cloned()),
},
resume_tx,
};
}
let resume_data = resume_rx.await?;
match resume_data {
ResumeData::FeedbackResponse {
selected_option,
user_input,
} => {
ctx.events
.send(RuntimeEvent::FeedbackProvided {
event_id,
selected_option,
user_input,
})
.await?;
}
ResumeData::Cancel => {
anyhow::bail!("Execution cancelled at feedback point");
}
_ => {}
}
{
let mut s = ctx.state.lock().await;
*s = ExecutionState::Running;
}
Ok(())
}
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        s.to_string()
    } else {
        // Find a valid UTF-8 boundary at or before max
        let mut end = max;
        while end > 0 && !s.is_char_boundary(end) {
            end -= 1;
        }
        format!("{}...", &s[..end])
    }
}
fn chrono_lite_id() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    format!("{:x}", now)
}
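`chrono_lite_id` derives its result from the millisecond clock alone, so two calls within the same millisecond return the same string. One possible way to disambiguate them is sketched below, assuming a process-wide counter is acceptable; `unique_lite_id` and `ID_COUNTER` are hypothetical names that do not exist in the codebase.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical process-wide sequence counter; not part of the original code.
static ID_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Sketch: hex millisecond timestamp plus a monotonically increasing suffix,
/// so IDs generated within the same millisecond still differ.
fn unique_lite_id() -> String {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    let seq = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{:x}-{:x}", now, seq)
}
```

The suffix only guarantees uniqueness within one process; IDs that must be unique across restarts or machines would need a different scheme, such as the `uuid` crate the MCP server already uses for request IDs.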
## File: ide/apps/mission-control/src/lib/api.ts
import { invoke } from "@tauri-apps/api/core"; import type { CheckpointInfo, RunSummary, TraceEvent } from "../types";
async function tauriInvoke<T>(cmd: string, args?: Record<string, unknown>): Promise<T> {
  return invoke<T>(cmd, args);
}
export const api = { listRuns: (workspaceRoot: string) => tauriInvoke<RunSummary[]>("list_runs", { workspaceRoot }),
readRunEvents: (workspaceRoot: string, runId: string) => tauriInvoke<TraceEvent[]>("read_run_events", { workspaceRoot, runId }),
readProgram: (workspaceRoot: string, runId: string) => tauriInvoke("read_run_program", { workspaceRoot, runId }),
readBinding: (workspaceRoot: string, runId: string, name: string) => tauriInvoke("read_run_binding", { workspaceRoot, runId, name }),
appendIntervention: (workspaceRoot: string, runId: string, note: string) => tauriInvoke("append_intervention", { workspaceRoot, runId, note }),
forkRun: (workspaceRoot: string, runId: string) => tauriInvoke("fork_run", { workspaceRoot, runId }),
blameSession: (workspaceRoot: string, runId: string, sessionId: string) => tauriInvoke("blame_session", { workspaceRoot, runId, sessionId }),
checkpoints: { list: (workspaceRoot: string) => tauriInvoke<CheckpointInfo[]>("checkpoint_list", { workspaceRoot }), create: (workspaceRoot: string, label?: string) => tauriInvoke("checkpoint_create", { workspaceRoot, label: label ?? null }), restore: (workspaceRoot: string, checkpointId: string) => tauriInvoke("checkpoint_restore", { workspaceRoot, checkpointId }), diff: (workspaceRoot: string, checkpointId: string) => tauriInvoke("checkpoint_diff", { workspaceRoot, checkpointId }), },
watch: { start: (workspaceRoot: string, runId: string) => tauriInvoke("watch_run", { workspaceRoot, runId }), stop: () => tauriInvoke("watch_stop"), },
  // File operations
  saveFile: (filePath: string, content: string) =>
    tauriInvoke("save_file", { filePath, content }),
  openFile: (filePath: string) => tauriInvoke("open_file", { filePath }),
  // Lint via prose CLI
  lint: async (content: string): Promise<{
    success: boolean;
    message?: string;
    errors?: Array<{ line: number; col: number; message: string }>;
  }> => {
    try {
      const result = await tauriInvoke<{
        success: boolean;
        message?: string;
        errors?: Array<{ line: number; col: number; message: string }>;
      }>("lint_prose", { content });
      return result;
    } catch (err) {
      return { success: false, message: String(err) };
    }
  },
  // Run a prose program (legacy executor)
  runProse: async (
    programContent: string,
    workspaceRoot?: string,
    model?: string,
    skipPermissions?: boolean
  ): Promise<{
    run_id: string;
    success: boolean;
    message?: string;
  }> => {
    return tauriInvoke("run_prose", {
      programContent,
      workspaceRoot: workspaceRoot ?? null,
      model: model ?? null,
      skipPermissions: skipPermissions ?? null,
    });
  },
  // Run a prose program using Claude-as-VM approach (new!)
  runProseVm: async (
    programContent: string,
    workspaceRoot?: string,
    model?: string,
    skipPermissions?: boolean,
    resumeId?: string,
    runId?: string
  ): Promise<{
    run_id: string;
    success: boolean;
    message?: string;
  }> => {
    return tauriInvoke("run_prose_vm", {
      programContent,
      workspaceRoot: workspaceRoot ?? null,
      model: model ?? null,
      skipPermissions: skipPermissions ?? null,
      resumeId: resumeId ?? null,
      runId: runId ?? null,
    });
  },
  // List prose files in workspace
  listProseFiles: (workspaceRoot?: string) =>
    tauriInvoke<string[]>("list_prose_files", { workspaceRoot: workspaceRoot ?? null }),
  // Clear all runs in workspace
  clearRuns: (workspaceRoot?: string) =>
    tauriInvoke("clear_runs", { workspaceRoot: workspaceRoot ?? null }),
  // Delete a specific run
  deleteRun: (workspaceRoot: string, runId: string) =>
    tauriInvoke("delete_run", { workspaceRoot, runId }),
  // Submit feedback response to an MCP request (vm_ask_user or vm_suspend)
  submitFeedback: (requestId: string, response: { selected_option?: number; user_input?: string }) =>
    tauriInvoke("submit_feedback", { requestId, response }),
};
// WebSocket client for interactive runtime
export type RuntimeEvent = {
  event_type: string;
  run_id?: string;
  session_id?: string;
  agent?: string;
  output?: string;
  tokens_used?: number;
  name?: string;
  value_preview?: string;
  event_id?: string;
  summary?: string;
  question?: string;
  options?: string[];
  selected_option?: number;
  user_input?: string;
  human_response?: string;
  success?: boolean;
  message?: string;
};
export type ResumeData = | { type: "FeedbackResponse"; selected_option?: number; user_input?: string } | { type: "PuppetResponse"; human_response: string } | { type: "PuppetRelease" } | { type: "Cancel" };
export class RuntimeConnection {
  private ws: WebSocket | null = null;
  private requestId = 0;
  private pendingRequests = new Map<number, { resolve: (v: unknown) => void; reject: (e: Error) => void }>();
  onEvent?: (event: RuntimeEvent) => void;
  onConnect?: () => void;
  onDisconnect?: () => void;
  connect(url: string = "ws://127.0.0.1:9999"): Promise<void> {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(url);
this.ws.onopen = () => {
this.onConnect?.();
resolve();
};
this.ws.onerror = (event) => {
reject(new Error(`WebSocket error: ${event.type}`));
};
this.ws.onclose = () => {
// Reject all pending requests to prevent memory leaks
for (const { reject } of this.pendingRequests.values()) {
reject(new Error("Connection closed"));
}
this.pendingRequests.clear();
this.onDisconnect?.();
this.ws = null;
};
this.ws.onmessage = (msg) => {
try {
const data = JSON.parse(msg.data);
// Check if it's a response to a request
if (data.id !== undefined && this.pendingRequests.has(data.id)) {
const { resolve, reject } = this.pendingRequests.get(data.id)!;
this.pendingRequests.delete(data.id);
if (data.error) {
reject(new Error(data.error.message));
} else {
resolve(data.result);
}
return;
}
// Otherwise it's a notification (event)
if (data.method === "event" && data.params) {
this.onEvent?.(data.params as RuntimeEvent);
}
} catch (e) {
console.error("Failed to parse runtime message:", e);
}
};
});
}
  disconnect(): void {
    this.ws?.close();
    this.ws = null;
  }
  private async rpc<T>(method: string, params: Record<string, unknown>): Promise<T> {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      throw new Error("Not connected");
    }
const id = ++this.requestId;
return new Promise((resolve, reject) => {
// Set up timeout that clears on success/failure
const timeoutId = setTimeout(() => {
if (this.pendingRequests.has(id)) {
this.pendingRequests.delete(id);
reject(new Error("Request timeout"));
}
}, 30000);
this.pendingRequests.set(id, {
resolve: (v: unknown) => {
clearTimeout(timeoutId);
resolve(v as T);
},
reject: (e: Error) => {
clearTimeout(timeoutId);
reject(e);
},
});
this.ws!.send(JSON.stringify({ id, method, params }));
});
}
async run(source: string): Promise<{ status: string }> { return this.rpc("run", { source }); }
async resume(data: ResumeData): Promise<{ status: string }> { return this.rpc("resume", data); }
async cancel(): Promise<{ status: string }> { return this.rpc("cancel", {}); }
async getSuspension(): Promise<{ type: string; event_id?: string; session_id?: string; summary?: string; question?: string; options?: string[]; } | null> { return this.rpc("get_suspension", {}); } }
## File: ide/apps/mission-control/src/lib/trace.ts
import type { TraceEvent } from "../types";
export function timeShort(ts: string): string { const m = ts.match(/T(\d\d:\d\d:\d\d)/); return m ? m[1] : ts; }
export type IncludedBinding = { name: string; binding_id: string; token_count?: number; reason?: "explicit" | "inferred" | "required" | "memory"; reason_detail?: string; };
export type Citation = { binding_id: string; chunk_id?: string; excerpt?: string; };
export type ToolOutput = { tool: string; result: string; };
export type VerdictEvidence = { citations?: Citation[]; tool_outputs?: ToolOutput[]; };
export type VerdictRubric = { criteria?: string[]; thresholds?: Record<string, number>; };
export type Verdict = { event_id: string; event_type: string; condition?: string; result?: boolean; note?: string; rubric?: VerdictRubric; evidence?: VerdictEvidence; confidence?: number; uncertainties?: string[]; };
export type SessionView = { sessionId: string; head?: string; tokenEstimate?: number; includedBindings: IncludedBinding[]; producedBindings: { name?: string; binding_id: string }[]; events: TraceEvent[]; verdicts: Verdict[]; tokens?: number; cost?: number; promptPreview?: string; outputPreview?: string; };
export function sessionsFromEvents(events: TraceEvent[]): SessionView[] { const bySession = new Map<string, TraceEvent[]>(); for (const ev of events) { const sid = ev.session_id ?? "-"; const v = bySession.get(sid) ?? []; v.push(ev); bySession.set(sid, v); }
const out: SessionView[] = []; for (const [sessionId, sesEvents] of bySession.entries()) { sesEvents.sort((a, b) => a.timestamp.localeCompare(b.timestamp)); const view: SessionView = { sessionId, includedBindings: [], producedBindings: [], events: sesEvents, verdicts: [], }; for (const ev of sesEvents) { if (ev.event_type === "SessionPlanned" && typeof ev.head === "string") { view.head = ev.head; } if (ev.event_type === "SessionStarted") { if (typeof ev.prompt_preview === "string") { view.promptPreview = ev.prompt_preview; } } if (ev.event_type === "ContextAssembled") { if (typeof ev.token_estimate === "number") view.tokenEstimate = ev.token_estimate; const included = ev.included_bindings; if (Array.isArray(included)) { view.includedBindings = included .map((x) => { if (!x || typeof x !== "object") return null; const obj = x as Record<string, unknown>; const name = obj.name; const bindingId = obj.binding_id; if (typeof name !== "string" || typeof bindingId !== "string") return null; const binding: IncludedBinding = { name, binding_id: bindingId }; if (typeof obj.token_count === "number") binding.token_count = obj.token_count; if (typeof obj.reason === "string") binding.reason = obj.reason as IncludedBinding["reason"]; if (typeof obj.reason_detail === "string") binding.reason_detail = obj.reason_detail; return binding; }) .filter(Boolean) as IncludedBinding[]; } } if (ev.event_type === "BindingMaterialized") { const bindingId = ev.binding_id; const name = ev.name; if (typeof bindingId === "string") { view.producedBindings.push({ binding_id: bindingId, name: typeof name === "string" ? 
name : undefined, }); } }
if (ev.event_type === "SessionCompleted") { if (typeof ev.tokens_delta === "number") view.tokens = ev.tokens_delta; if (typeof ev.cost_delta === "number") view.cost = ev.cost_delta; if (typeof ev.output_preview === "string") { view.outputPreview = ev.output_preview; } }
// Parse verdicts from DiscretionEvaluated events
if (ev.event_type === "DiscretionEvaluated") {
const verdict: Verdict = { event_id: ev.event_id, event_type: ev.event_type, };
if (ev.condition !== undefined) verdict.condition = ev.condition;
if (ev.result !== undefined) verdict.result = ev.result;
if (ev.note !== undefined) verdict.note = ev.note;
if (ev.confidence !== undefined) verdict.confidence = ev.confidence;
if (ev.uncertainties) { verdict.uncertainties = ev.uncertainties; }
// Parse rubric
if (ev.rubric) {
verdict.rubric = {};
if (ev.rubric.criteria) { verdict.rubric.criteria = ev.rubric.criteria; }
if (ev.rubric.thresholds) { verdict.rubric.thresholds = ev.rubric.thresholds; }
}
// Parse evidence
if (ev.evidence) {
verdict.evidence = {};
if (ev.evidence.citations) { verdict.evidence.citations = ev.evidence.citations.map((c) => ({ binding_id: c.binding_id, chunk_id: c.chunk_id, excerpt: c.excerpt, })); }
if (ev.evidence.tool_outputs) { verdict.evidence.tool_outputs = ev.evidence.tool_outputs.map((t) => ({ tool: t.tool, result: t.result, })); }
}
view.verdicts.push(verdict);
}
}
// If no ContextAssembled event but we have a prompt, estimate tokens
// This ensures the Context tab shows something for simple sessions
if (view.tokenEstimate === undefined && view.promptPreview) {
// Rough estimate: ~4 chars per token
const promptTokens = Math.max(1, Math.ceil(view.promptPreview.length / 4));
view.tokenEstimate = promptTokens;
// Add a synthetic binding for the prompt so the Context tab shows it
view.includedBindings = [{
name: "prompt",
binding_id: "prompt",
token_count: promptTokens,
reason: "required",
reason_detail: "Session prompt",
}];
}
out.push(view);
}
out.sort((a, b) => a.sessionId.localeCompare(b.sessionId)); return out; }
export function runTotals(events: TraceEvent[]) { let tokens = 0; let cost = 0; let sessions = 0; for (const ev of events) { if (ev.event_type === "SessionCompleted") sessions += 1; if (typeof ev.tokens_delta === "number") tokens += ev.tokens_delta; if (typeof ev.cost_delta === "number") cost += ev.cost_delta; } return { sessions, tokens, cost }; }
export type PreflightView = { mode?: string; profile?: string; sandbox?: boolean; secretStatus?: "ok" | "fail"; secretHits?: string[]; capStatus?: "ok" | "fail"; required?: string[]; allowed?: string[]; missing?: string[]; };
export function preflightFromEvents(events: TraceEvent[]): PreflightView { const out: PreflightView = {}; for (const e of events) { if (e.event_type === "RunBudgetSet") { if (typeof e.mode === "string") out.mode = e.mode; if (typeof e.profile === "string") out.profile = e.profile; if (typeof e.sandbox === "boolean") out.sandbox = e.sandbox; } if (e.event_type === "SecretScanPerformed") { if (typeof e.status === "string" && (e.status === "ok" || e.status === "fail")) out.secretStatus = e.status; if (Array.isArray(e.hits)) out.secretHits = e.hits.filter((x) => typeof x === "string") as string[]; } if (e.event_type === "CapabilityCheckPerformed") { if (typeof e.status === "string" && (e.status === "ok" || e.status === "fail")) out.capStatus = e.status; if (Array.isArray(e.required)) out.required = e.required.filter((x) => typeof x === "string") as string[]; if (Array.isArray(e.allowed)) out.allowed = e.allowed.filter((x) => typeof x === "string") as string[]; if (Array.isArray(e.missing)) out.missing = e.missing.filter((x) => typeof x === "string") as string[]; if (typeof e.profile === "string") out.profile = e.profile; if (typeof e.sandbox === "boolean") out.sandbox = e.sandbox; } } return out; }
export type Alert = { level: "info" | "warn" | "error"; message: string };
export function alertsFromEvents(events: TraceEvent[]): Alert[] { const pf = preflightFromEvents(events); const out: Alert[] = [];
// Only show warnings for actual failures, not missing events (which may be expected for older runtimes)
if (pf.secretStatus === "fail") out.push({ level: "error", message: `Secret scan failed (${pf.secretHits?.length ?? 0} hit(s))` });
if (pf.capStatus === "fail") out.push({ level: "error", message: `Missing capabilities: ${(pf.missing ?? []).join(", ")}` });
let redacted = 0;
for (const e of events) {
if (e.redacted === true) redacted += 1;
}
if (redacted > 0) out.push({ level: "warn", message: `${redacted} event(s) had redactions applied` });
const totals = runTotals(events);
if (totals.tokens >= 50_000) out.push({ level: "warn", message: `High token usage (${totals.tokens})` });
if (totals.cost >= 2.0) out.push({ level: "warn", message: `High estimated cost ($${totals.cost.toFixed(2)})` });
if (pf.sandbox) out.push({ level: "info", message: "Sandbox mode enabled (bash/network/write denied in preflight)" }); if (pf.profile === "untrusted") out.push({ level: "info", message: "Untrusted profile active" });
return out; }
export function bindingNamesFromEvents(events: TraceEvent[]): string[] { const names = new Set<string>(); for (const e of events) { if (e.event_type === "BindingMaterialized" && typeof e.name === "string" && e.name.trim()) { names.add(e.name); } } return Array.from(names).sort(); }
// Memory card types and functions
export type MemoryCardType = "goal" | "preference" | "decision" | "fact" | "warning";
export type MemoryRetention = "ephemeral" | "project" | "global";
export type MemoryCard = { card_id: string; agent_id: string; card_type: MemoryCardType; content: string; retention: MemoryRetention; provenance?: { session_id?: string; binding_ids?: string[]; }; created_at: string; updated_at?: string; reference_count: number; is_pruned: boolean; prune_reason?: string; };
export function memoryCardsFromEvents(events: TraceEvent[]): MemoryCard[] { const cards = new Map<string, MemoryCard>();
for (const e of events) { if (e.event_type === "MemoryCardCreated") { const cardId = e.card_id; if (typeof cardId !== "string") continue; cards.set(cardId, { card_id: cardId, agent_id: typeof e.agent_id === "string" ? e.agent_id : "", card_type: (typeof e.card_type === "string" ? e.card_type : "fact") as MemoryCardType, content: typeof e.content === "string" ? e.content : "", retention: (typeof e.retention === "string" ? e.retention : "project") as MemoryRetention, provenance: e.provenance as MemoryCard["provenance"], created_at: e.timestamp, reference_count: 0, is_pruned: false, }); } if (e.event_type === "MemoryCardUpdated") { const cardId = e.card_id; if (typeof cardId !== "string") continue; const card = cards.get(cardId); if (card && typeof e.new_content === "string") { card.content = e.new_content; card.updated_at = e.timestamp; } } if (e.event_type === "MemoryCardPruned") { const cardId = e.card_id; if (typeof cardId !== "string") continue; const card = cards.get(cardId); if (card) { card.is_pruned = true; card.prune_reason = typeof e.reason === "string" ? e.reason : undefined; } } if (e.event_type === "MemoryCardReferenced") { const cardId = e.card_id; if (typeof cardId !== "string") continue; const card = cards.get(cardId); if (card) { card.reference_count += 1; } } }
return Array.from(cards.values()).sort((a, b) => b.reference_count - a.reference_count); }
// Watcher alert types and functions
export type WatcherAlertType = "runaway_loop" | "cost_anomaly" | "repeated_failure" | "context_bloat" | "unsafe_write" | "stale_memory";
export type WatcherAlert = { event_id: string; timestamp: string; watcher_id: string; alert_type: WatcherAlertType; severity: "info" | "warning" | "error"; message: string; details?: Record<string, unknown>; recommendation?: string; };
// Configurable thresholds for pattern detectors
export interface WatcherThresholds {
  loopIterationsWarn: number;
  loopIterationsError: number;
  contextTokensWarn: number;
  contextTokensError: number;
  consecutiveFailuresWarn: number;
  consecutiveFailuresError: number;
  agentFailureCount: number;
  costSpikeMultiplier: number; // Alert if cost > avg * multiplier
  costMinSampleSize: number; // Min sessions before spike detection
}
export const DEFAULT_THRESHOLDS: WatcherThresholds = { loopIterationsWarn: 10, loopIterationsError: 50, contextTokensWarn: 50_000, contextTokensError: 100_000, consecutiveFailuresWarn: 3, consecutiveFailuresError: 5, agentFailureCount: 5, costSpikeMultiplier: 3, costMinSampleSize: 5, };
export function watcherAlertsFromEvents( events: TraceEvent[], thresholds: WatcherThresholds = DEFAULT_THRESHOLDS ): WatcherAlert[] { const alerts: WatcherAlert[] = [];
for (const e of events) { if (e.event_type === "WatcherAlertRaised") { alerts.push({ event_id: e.event_id, timestamp: e.timestamp, watcher_id: typeof e.watcher_id === "string" ? e.watcher_id : "", alert_type: (typeof e.alert_type === "string" ? e.alert_type : "cost_anomaly") as WatcherAlertType, severity: (typeof e.severity === "string" ? e.severity : "warning") as WatcherAlert["severity"], message: typeof e.message === "string" ? e.message : "", details: typeof e.details === "object" ? e.details as Record<string, unknown> : undefined, recommendation: typeof e.recommendation === "string" ? e.recommendation : undefined, }); } }
// Also run pattern-based detection (rule-based watchers)
alerts.push(...detectRunawayLoops(events, thresholds));
alerts.push(...detectContextBloat(events, thresholds));
alerts.push(...detectRepeatedFailures(events, thresholds));
alerts.push(...detectCostAnomalies(events, thresholds));
return alerts.sort((a, b) => { const severityOrder = { error: 0, warning: 1, info: 2 }; return severityOrder[a.severity] - severityOrder[b.severity]; }); }
// Pattern-based watcher detectors
function detectRunawayLoops(events: TraceEvent[], thresholds: WatcherThresholds): WatcherAlert[] { const alerts: WatcherAlert[] = []; const loopCounts = new Map<string, number>(); const alertedLoops = new Set<string>();
for (const e of events) { if (e.event_type === "LoopIterationStarted" && e.parent_event_id) { const count = (loopCounts.get(e.parent_event_id) ?? 0) + 1; loopCounts.set(e.parent_event_id, count);
// Warn at threshold
if (count === thresholds.loopIterationsWarn && !alertedLoops.has(`warn-${e.parent_event_id}`)) {
alertedLoops.add(`warn-${e.parent_event_id}`);
alerts.push({
event_id: `watcher-runaway-${e.parent_event_id}`,
timestamp: e.timestamp,
watcher_id: "runaway_loop_detector",
alert_type: "runaway_loop",
severity: "warning",
message: `Loop has reached ${count} iterations`,
details: { loop_id: e.parent_event_id, iterations: count, threshold: thresholds.loopIterationsWarn },
recommendation: "Consider adding a max: constraint or more specific exit condition",
});
}
// Error at threshold
if (count === thresholds.loopIterationsError && !alertedLoops.has(`error-${e.parent_event_id}`)) {
alertedLoops.add(`error-${e.parent_event_id}`);
alerts.push({
event_id: `watcher-runaway-critical-${e.parent_event_id}`,
timestamp: e.timestamp,
watcher_id: "runaway_loop_detector",
alert_type: "runaway_loop",
severity: "error",
message: `Loop has reached ${count} iterations - possible runaway`,
details: { loop_id: e.parent_event_id, iterations: count, threshold: thresholds.loopIterationsError },
recommendation: "Strongly consider terminating this loop or reviewing the exit condition",
});
}
}
}
return alerts; }
function detectContextBloat(events: TraceEvent[], thresholds: WatcherThresholds): WatcherAlert[] { const alerts: WatcherAlert[] = [];
for (const e of events) {
if (e.event_type === "ContextAssembled" && e.token_estimate !== undefined) {
if (e.token_estimate > thresholds.contextTokensError) {
alerts.push({
event_id: `watcher-context-bloat-${e.event_id}`,
timestamp: e.timestamp,
watcher_id: "context_bloat_detector",
alert_type: "context_bloat",
severity: "error",
message: `Context assembled with ${e.token_estimate.toLocaleString()} tokens (very large)`,
details: { token_count: e.token_estimate, session_id: e.session_id, threshold: thresholds.contextTokensError },
recommendation: "Reduce context by passing only required bindings or summarizing large inputs",
});
} else if (e.token_estimate > thresholds.contextTokensWarn) {
alerts.push({
event_id: `watcher-context-bloat-${e.event_id}`,
timestamp: e.timestamp,
watcher_id: "context_bloat_detector",
alert_type: "context_bloat",
severity: "warning",
message: `Context assembled with ${e.token_estimate.toLocaleString()} tokens (large)`,
details: { token_count: e.token_estimate, session_id: e.session_id, threshold: thresholds.contextTokensWarn },
recommendation: "Consider reducing context size for better performance and cost",
});
}
}
}
return alerts; }
function detectRepeatedFailures(events: TraceEvent[], thresholds: WatcherThresholds): WatcherAlert[] { const alerts: WatcherAlert[] = []; const failuresByAgent = new Map<string, number>(); let consecutiveFailures = 0; let lastFailureTime = ""; let warnAlertEmitted = false; let errorAlertEmitted = false;
for (const e of events) {
// Both SessionFailed and RunFailed count as failures
if (e.event_type === "SessionFailed" || e.event_type === "RunFailed") { consecutiveFailures += 1; lastFailureTime = e.timestamp;
if (e.event_type === "SessionFailed") {
const agentId = e.agent_id ?? "unknown";
failuresByAgent.set(agentId, (failuresByAgent.get(agentId) ?? 0) + 1);
}
if (consecutiveFailures === thresholds.consecutiveFailuresWarn && !warnAlertEmitted) {
warnAlertEmitted = true;
alerts.push({
event_id: `watcher-repeated-failure-${e.event_id}`,
timestamp: e.timestamp,
watcher_id: "repeated_failure_detector",
alert_type: "repeated_failure",
severity: "warning",
message: `${consecutiveFailures} consecutive failures detected`,
details: { consecutive_count: consecutiveFailures, threshold: thresholds.consecutiveFailuresWarn },
recommendation: "Review recent session inputs and consider intervention",
});
} else if (consecutiveFailures === thresholds.consecutiveFailuresError && !errorAlertEmitted) {
errorAlertEmitted = true;
alerts.push({
event_id: `watcher-repeated-failure-critical-${e.event_id}`,
timestamp: e.timestamp,
watcher_id: "repeated_failure_detector",
alert_type: "repeated_failure",
severity: "error",
message: `${consecutiveFailures} consecutive failures - run may be stuck`,
details: { consecutive_count: consecutiveFailures, threshold: thresholds.consecutiveFailuresError },
recommendation: "Consider pausing the run and investigating the failure pattern",
});
}
} else if (e.event_type === "SessionCompleted") {
// Only reset on successful completion
consecutiveFailures = 0;
warnAlertEmitted = false;
errorAlertEmitted = false;
}
}
// Check for agent-specific failure patterns
for (const [agentId, count] of failuresByAgent) {
if (count >= thresholds.agentFailureCount) {
alerts.push({
event_id: `watcher-agent-failures-${agentId}`,
timestamp: lastFailureTime,
watcher_id: "repeated_failure_detector",
alert_type: "repeated_failure",
severity: "warning",
message: `Agent "${agentId}" has failed ${count} times in this run`,
details: { agent_id: agentId, failure_count: count, threshold: thresholds.agentFailureCount },
recommendation: "Review agent configuration and task complexity",
});
}
}
return alerts; }
/**
 * Reconstruct memory card state at a specific timestamp.
 * Used for the memory timeline scrubber to show historical state.
 */
export function reconstructMemoryAt(events: TraceEvent[], targetTime: string): MemoryCard[] { const cards = new Map<string, MemoryCard>();
for (const e of events) {
// Stop processing once we're past the target time.
// Note: the early break is only correct if events are already in timestamp order.
if (e.timestamp > targetTime) break;
if (e.event_type === "MemoryCardCreated") {
const cardId = e.card_id;
if (typeof cardId !== "string") continue;
cards.set(cardId, {
card_id: cardId,
agent_id: typeof e.agent_id === "string" ? e.agent_id : "",
card_type: (typeof e.card_type === "string" ? e.card_type : "fact") as MemoryCardType,
content: typeof e.content === "string" ? e.content : "",
retention: (typeof e.retention === "string" ? e.retention : "project") as MemoryRetention,
provenance: e.provenance as MemoryCard["provenance"],
created_at: e.timestamp,
reference_count: 0,
is_pruned: false,
});
}
if (e.event_type === "MemoryCardUpdated") {
const cardId = e.card_id;
if (typeof cardId !== "string") continue;
const card = cards.get(cardId);
if (card && typeof e.new_content === "string") {
card.content = e.new_content;
card.updated_at = e.timestamp;
}
}
if (e.event_type === "MemoryCardPruned") {
const cardId = e.card_id;
if (typeof cardId !== "string") continue;
const card = cards.get(cardId);
if (card) {
card.is_pruned = true;
card.prune_reason = typeof e.reason === "string" ? e.reason : undefined;
}
}
if (e.event_type === "MemoryCardReferenced") {
const cardId = e.card_id;
if (typeof cardId !== "string") continue;
const card = cards.get(cardId);
if (card) {
card.reference_count += 1;
}
}
}
return Array.from(cards.values()).sort((a, b) => b.reference_count - a.reference_count); }
/**
 * Get the time range (first and last timestamps) for a set of events.
 */
export function getTimeRange(events: TraceEvent[]): { start: string; end: string } | null { if (events.length === 0) return null;
const sorted = [...events].sort((a, b) => a.timestamp.localeCompare(b.timestamp)); return { start: sorted[0].timestamp, end: sorted[sorted.length - 1].timestamp, }; }
function detectCostAnomalies(events: TraceEvent[], thresholds: WatcherThresholds): WatcherAlert[] { const alerts: WatcherAlert[] = []; const sessionCosts: number[] = [];
for (const e of events) { if (e.event_type === "SessionCompleted" && e.cost_delta !== undefined && e.cost_delta !== null) { sessionCosts.push(e.cost_delta);
// Check for cost spike relative to average (no hardcoded $ floor)
// Only after we have enough samples for a meaningful comparison
if (sessionCosts.length >= thresholds.costMinSampleSize) {
const previousCosts = sessionCosts.slice(0, -1);
const avg = previousCosts.reduce((a, b) => a + b, 0) / previousCosts.length;
// Only alert if both the multiplier is exceeded AND the average is non-trivial
// (avoids alerts when avg is near-zero and any cost looks like a spike)
if (avg > 0.001 && e.cost_delta > avg * thresholds.costSpikeMultiplier) {
const multiplier = e.cost_delta / avg;
alerts.push({
event_id: `watcher-cost-anomaly-${e.event_id}`,
timestamp: e.timestamp,
watcher_id: "cost_anomaly_detector",
alert_type: "cost_anomaly",
severity: "warning",
message: `Session cost $${e.cost_delta.toFixed(4)} is ${multiplier.toFixed(1)}x the average ($${avg.toFixed(4)})`,
details: {
session_id: e.session_id,
cost: e.cost_delta,
average: avg,
multiplier: multiplier,
threshold_multiplier: thresholds.costSpikeMultiplier,
},
recommendation: "Investigate why this session was more expensive than typical",
});
}
}
}
}
return alerts; }
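
The spike rule in `detectCostAnomalies` compares each completed session against the mean of the sessions before it. A standalone sketch of that rule is given here so the arithmetic can be checked in isolation. `isCostSpike` is a hypothetical helper, not part of the original file; its defaults mirror `DEFAULT_THRESHOLDS`, and the extra `costs.length < 2` guard is an addition for the case where `minSamples` is set to 1.

```typescript
// Hypothetical helper mirroring the rule in detectCostAnomalies.
function isCostSpike(
  costs: number[], // per-session costs in completion order, latest last
  multiplier = 3, // mirrors DEFAULT_THRESHOLDS.costSpikeMultiplier
  minSamples = 5, // mirrors DEFAULT_THRESHOLDS.costMinSampleSize
): boolean {
  // Need enough samples, and at least one earlier session to average over.
  if (costs.length < minSamples || costs.length < 2) return false;
  const latest = costs[costs.length - 1];
  const previous = costs.slice(0, -1);
  const avg = previous.reduce((a, b) => a + b, 0) / previous.length;
  // Skip near-zero baselines, where any cost would look like a spike.
  return avg > 0.001 && latest > avg * multiplier;
}
```

For example, four sessions at $0.01 followed by one at $0.05 give a baseline of about $0.01 and a threshold of about $0.03, so the fifth session is flagged. A fifth session at $0.02 is not.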
## File: ide/apps/mission-control/src/types.ts
export type RunSummary = { run_id: string; trace_path: string; program_path: string; status: "running" | "completed" | "failed"; };
// Base fields present on all trace events
type TraceEventBase = { event_id: string; run_id: string; timestamp: string; parent_event_id?: string | null; session_id?: string | null; cost_delta?: number | null; tokens_delta?: number | null; redaction_policy_version: number; redacted?: boolean; redaction_hits?: string[]; };
// Discriminated union for type-safe event handling
export type TraceEvent =
  | (TraceEventBase & { event_type: "RunStarted" })
  | (TraceEventBase & { event_type: "RunCompleted" })
  | (TraceEventBase & { event_type: "RunFailed"; error_type?: string; message?: string })
  | (TraceEventBase & { event_type: "RunBudgetSet"; mode?: string; profile?: string; sandbox?: boolean })
  | (TraceEventBase & { event_type: "SecretScanPerformed"; status?: "ok" | "fail"; hits?: string[] })
  | (TraceEventBase & { event_type: "CapabilityCheckPerformed"; status?: "ok" | "fail"; required?: string[]; allowed?: string[]; missing?: string[]; profile?: string; sandbox?: boolean; })
  | (TraceEventBase & { event_type: "SessionPlanned"; head?: string; kind?: string })
  | (TraceEventBase & { event_type: "SessionStarted"; agent_id?: string; prompt_preview?: string })
  | (TraceEventBase & { event_type: "SessionCompleted"; agent_id?: string; output_preview?: string })
  | (TraceEventBase & { event_type: "SessionFailed"; agent_id?: string; error_type?: string; message?: string })
  | (TraceEventBase & {
      event_type: "ContextAssembled";
      token_estimate?: number;
      included_bindings?: Array<{ name: string; binding_id: string; token_count?: number; reason?: "explicit" | "inferred" | "required" | "memory"; reason_detail?: string; }>;
      summarization?: { applied?: boolean; original_tokens?: number; compressed_tokens?: number; bindings_affected?: string[]; };
    })
  | (TraceEventBase & { event_type: "BindingMaterialized"; name?: string; binding_id?: string; path?: string })
  | (TraceEventBase & { event_type: "BindingReferenced"; name?: string; binding_id?: string })
  | (TraceEventBase & {
      event_type: "DiscretionEvaluated";
      condition?: string;
      result?: boolean;
      note?: string;
      rubric?: { criteria?: string[]; thresholds?: Record<string, number>; };
      evidence?: { citations?: Array<{ binding_id: string; chunk_id?: string; excerpt?: string }>; tool_outputs?: Array<{ tool: string; result: string }>; };
      confidence?: number;
      uncertainties?: string[];
    })
  | (TraceEventBase & { event_type: "LoopIterationStarted"; iteration?: number })
  | (TraceEventBase & { event_type: "LoopIterationCompleted"; iteration?: number })
  | (TraceEventBase & { event_type: "ParallelBlockStarted" })
  | (TraceEventBase & { event_type: "ParallelBlockCompleted" })
  | (TraceEventBase & { event_type: "HumanInterventionRequested"; prompt?: string; request_id?: string; summary?: string; question?: string; options?: string[]; })
  | (TraceEventBase & { event_type: "HumanInterventionApplied"; note?: string })
  | (TraceEventBase & { event_type: "CheckpointCreated"; checkpoint_id?: string; label?: string })
  | (TraceEventBase & { event_type: "CheckpointRestored"; checkpoint_id?: string })
  | (TraceEventBase & { event_type: "ErrorOccurred"; error_type?: string; message?: string; stack?: string })
  | (TraceEventBase & { event_type: "MemoryCardCreated"; card_id?: string; agent_id?: string; card_type?: "goal" | "preference" | "decision" | "fact" | "warning"; content?: string; retention?: "ephemeral" | "project" | "global"; provenance?: { session_id?: string; binding_ids?: string[] }; })
  | (TraceEventBase & { event_type: "MemoryCardUpdated"; card_id?: string; agent_id?: string; previous_content?: string; new_content?: string; update_reason?: string; })
  | (TraceEventBase & { event_type: "MemoryCardPruned"; card_id?: string; agent_id?: string; reason?: "manual" | "stale" | "retention_policy" | "compaction"; })
  | (TraceEventBase & { event_type: "MemoryCardReferenced"; card_id?: string; agent_id?: string; referenced_by_session?: string; })
  | (TraceEventBase & { event_type: "WatcherAlertRaised"; watcher_id?: string; alert_type?: "runaway_loop" | "cost_anomaly" | "repeated_failure" | "context_bloat" | "unsafe_write" | "stale_memory"; severity?: "info" | "warning" | "error"; message?: string; details?: Record<string, unknown>; recommendation?: string; })
  // VM events (Claude-as-VM execution)
  | (TraceEventBase & { event_type: "Thought"; text?: string; position?: string })
  | (TraceEventBase & { event_type: "ToolCall"; tool_id?: string; tool_name?: string; input_preview?: string })
  | (TraceEventBase & { event_type: "ToolResult"; tool_id?: string; is_error?: boolean; output_preview?: string })
  | (TraceEventBase & { event_type: "TextOutput"; text?: string })
  | (TraceEventBase & { event_type: "Processing" })
  | (TraceEventBase & { event_type: "StreamingText"; text?: string; is_complete?: boolean })
  | (TraceEventBase & { event_type: "PuppetModeEntered"; request_id?: string; context_summary?: string })
  | (TraceEventBase & { event_type: "RuntimeEvent"; event?: Record<string, unknown> });
// Type guard helpers for narrowing
export function isEventType<T extends TraceEvent["event_type"]>( event: TraceEvent, type: T ): event is Extract<TraceEvent, { event_type: T }> { return event.event_type === type; }
export type CheckpointInfo = { id: string; label?: string | null; refname: string; commit: string; created_at: string; };
// VM Event types (Claude-as-VM execution)
export type VmEvent =
  | { event_type: "VmStarted"; run_id: string; program_file: string; resume_id?: string }
  | { event_type: "Thought"; run_id: string; text: string; position?: string }
  | { event_type: "SessionSpawned"; run_id: string; session_id: string; agent?: string; prompt_preview: string }
  | { event_type: "SessionCompleted"; run_id: string; session_id: string; output_preview: string }
  | { event_type: "BindingCreated"; run_id: string; name: string; path: string }
  | { event_type: "ToolCall"; run_id: string; tool_id: string; tool_name: string; input_preview: string }
  | { event_type: "ToolResult"; run_id: string; tool_id: string; is_error: boolean; output_preview: string }
  | { event_type: "TextOutput"; run_id: string; text: string }
  | { event_type: "FeedbackRequested"; run_id: string; request_id: string; session_id?: string; summary: string; question?: string; options?: string[]; display_binding?: string }
  | { event_type: "PuppetModeEntered"; run_id: string; request_id: string; session_id: string; agent?: string; context_summary: string; last_output?: string }
  | { event_type: "SuspensionResolved"; run_id: string; request_id: string }
  | { event_type: "VmCompleted"; run_id: string; success: boolean; error?: string; resume_id?: string; stderr_tail?: string };
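
The `isEventType` guard above narrows a `TraceEvent` to one member of the union, so fields specific to that event type can be read without casts. A reduced sketch of the pattern follows, using a two-member union so it runs on its own. `MiniEvent` and `failureMessages` are hypothetical names introduced only for illustration, and the guard is re-declared against `MiniEvent` rather than the real `TraceEvent`.

```typescript
// Hypothetical two-member union standing in for TraceEvent.
type MiniBase = { event_id: string; timestamp: string };
type MiniEvent =
  | (MiniBase & { event_type: "SessionFailed"; agent_id?: string; message?: string })
  | (MiniBase & { event_type: "SessionCompleted"; output_preview?: string });

// Same shape as isEventType in types.ts, declared against MiniEvent.
function isEventType<T extends MiniEvent["event_type"]>(
  event: MiniEvent,
  type: T,
): event is Extract<MiniEvent, { event_type: T }> {
  return event.event_type === type;
}

// Collects "agent: message" strings for failed sessions only.
function failureMessages(events: MiniEvent[]): string[] {
  const out: string[] = [];
  for (const ev of events) {
    if (isEventType(ev, "SessionFailed")) {
      // Inside this branch ev.agent_id and ev.message are typed fields.
      out.push(`${ev.agent_id ?? "unknown"}: ${ev.message ?? ""}`);
    }
  }
  return out;
}
```

The same narrowing is available inline through `ev.event_type === "SessionFailed"`. The helper mainly pays off when the event type is passed around as a value, for instance in a generic filter.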
## File: ide/apps/mission-control/src/App.tsx
import { listen, emit } from "@tauri-apps/api/event"; import React, { useEffect, useState, useCallback, useRef, useMemo, lazy, Suspense } from "react"; import { StoreProvider, useAppState, useDerived, useDispatch, useSuspension } from "./store"; import { useTrace } from "./hooks/useTrace"; import { useKeyboard, SHORTCUTS } from "./hooks/useKeyboard"; import { ExecutionFeed } from "./components/Feed"; import { SessionInspector } from "./components/Inspector"; import { FeedbackInput, type FeedbackRequestData } from "./components/Inspector/FeedbackInput"; import { PuppetInterface, type PuppetModeData } from "./components/Inspector/PuppetInterface"; import { RunList } from "./components/Sidebar"; import { StatusBar } from "./components/StatusBar"; import { NotificationBell, RunNotification } from "./components/NotificationBell"; import { Button } from "./components/common"; import { api, RuntimeConnection, RuntimeEvent } from "./lib/api"; import type { TraceEvent, VmEvent } from "./types";
// Lazy load heavy components (~150KB CodeMirror deferred until needed)
const BlameGraph = lazy(() => import("./components/BlameGraph").then(m => ({ default: m.BlameGraph })));
const ProseEditor = lazy(() => import("./components/Editor").then(m => ({ default: m.ProseEditor })));
function AppContent() { const state = useAppState(); const derived = useDerived(); const dispatch = useDispatch(); const trace = useTrace(); const suspension = useSuspension();
const [showBlameGraph, setShowBlameGraph] = useState(false); const [showShortcuts, setShowShortcuts] = useState(false); const [showEditor, setShowEditor] = useState(false); const [dismissedNotifications, setDismissedNotifications] = useState<Set<string>>(new Set());
// Build notifications from runs - use backend-provided status
const notifications: RunNotification[] = useMemo(() => {
return state.runs
.filter(r => !dismissedNotifications.has(r.run_id))
.slice(0, 10) // Limit to 10 most recent
.map(run => {
// For the currently selected run, we can count sessions from loaded events
const isSelected = run.run_id === state.selectedRunId;
const sessionCount = isSelected ? state.events.filter(e => e.event_type === "SessionStarted").length : 0;
return {
runId: run.run_id,
status: run.status, // Use backend-provided status from trace file
startedAt: new Date().toISOString(), // We don't have this without loading events
completedAt: run.status !== "running" ? new Date().toISOString() : undefined,
sessionCount,
};
});
}, [state.runs, state.selectedRunId, state.events, dismissedNotifications]);
const handleNotificationRunClick = useCallback((runId: string) => { dispatch({ type: "SET_SELECTED_RUN", payload: runId }); }, [dispatch]);
const handleClearNotifications = useCallback(() => { const allRunIds = new Set(state.runs.map(r => r.run_id)); setDismissedNotifications(allRunIds); }, [state.runs]);
// Runtime connection state
const [runtimeConnected, setRuntimeConnected] = useState(false); const [puppetMode, setPuppetMode] = useState<{ active: boolean; sessionId?: string; agent?: string; contextSummary?: string; lastOutput?: string; }>({ active: false }); const [puppetInput, setPuppetInput] = useState(""); const runtimeRef = useRef<RuntimeConnection | null>(null);
// Event selection handlers
const handleEventSelect = useCallback((eventId: string, sessionId: string) => { dispatch({ type: "SET_SELECTED_EVENT", payload: eventId }); dispatch({ type: "SET_SELECTED_SESSION", payload: sessionId });
// Update focused index
const idx = derived.sortedEvents.findIndex((e) => e.event_id === eventId);
if (idx >= 0) dispatch({ type: "SET_FOCUSED_INDEX", payload: idx });
}, [dispatch, derived.sortedEvents]);
const handleSessionSelect = useCallback((sessionId: string) => { dispatch({ type: "SET_SELECTED_SESSION", payload: sessionId }); }, [dispatch]);
// Keyboard navigation
const navigateUp = useCallback(() => {
  const newIdx = Math.max(0, state.focusedIndex - 1);
  dispatch({ type: "SET_FOCUSED_INDEX", payload: newIdx });
  const ev = derived.sortedEvents[newIdx];
  if (ev) {
    dispatch({ type: "SET_SELECTED_EVENT", payload: ev.event_id });
    dispatch({ type: "SET_SELECTED_SESSION", payload: ev.session_id ?? "-" });
  }
}, [state.focusedIndex, derived.sortedEvents, dispatch]);
const navigateDown = useCallback(() => {
  const newIdx = Math.min(derived.sortedEvents.length - 1, state.focusedIndex + 1);
  dispatch({ type: "SET_FOCUSED_INDEX", payload: newIdx });
  const ev = derived.sortedEvents[newIdx];
  if (ev) {
    dispatch({ type: "SET_SELECTED_EVENT", payload: ev.event_id });
    dispatch({ type: "SET_SELECTED_SESSION", payload: ev.session_id ?? "-" });
  }
}, [state.focusedIndex, derived.sortedEvents, dispatch]);
const handleSelect = useCallback(() => {
  // Expand details / trigger preview for current selection
  if (derived.selectedSession?.producedBindings.length) {
    const first = derived.selectedSession.producedBindings[0];
    if (first.name) void trace.previewBinding(first.name);
  }
}, [derived.selectedSession, trace]);
useKeyboard({ onNavigateUp: navigateUp, onNavigateDown: navigateDown, onSelect: handleSelect, onBlame: () => void trace.blameSelectedSession(), onFork: () => void trace.forkSelectedRun(), onCheckpoint: () => void trace.createCheckpoint(), onRefresh: () => void trace.refreshRuns(), onCommandPalette: () => setShowShortcuts((s) => !s), onEscape: () => { setShowShortcuts(false); setShowBlameGraph(false); setShowEditor(false); }, onToggleEditor: () => { if (state.program) setShowEditor((s) => !s); }, }, true);
// E2E Testing: listen for MCP execute-js commands
useEffect(() => {
  let unlisten: (() => void) | null = null;
  let cancelled = false;
  const setup = async () => {
    const stop = await listen<string>("execute-js", async (event) => {
      const code = event.payload;
      try {
        // Execute the JS code and capture result (handles both sync and async)
        // eslint-disable-next-line no-eval
        let result = eval(code);
        // If result is a Promise, await it
        if (result instanceof Promise) {
          result = await result;
        }
        const resultStr = result === undefined ? "undefined" : JSON.stringify(result);
        await emit("execute-js-response", { result: resultStr, type: typeof result });
      } catch (err) {
        await emit("execute-js-response", { error: String(err) });
      }
    });
    // If cleanup ran while listen() was still pending, unregister right away
    // so the listener is not leaked
    if (cancelled) stop();
    else unlisten = stop;
  };
  void setup();
  return () => {
    cancelled = true;
    if (unlisten) unlisten();
  };
}, []);
// Load workspace on mount
useEffect(() => {
  void trace.refreshRuns();
}, [state.workspaceRoot]);
useEffect(() => { if (state.workspaceRoot) void trace.refreshCheckpoints(); }, [state.workspaceRoot]);
// Listen for trace events
useEffect(() => {
  let unlisten: (() => void) | null = null;
  let alive = true;
  void (async () => {
    const stop = await listen<TraceEvent>("trace_event", (event) => {
      if (!alive) return;
      dispatch({ type: "ADD_EVENT", payload: event.payload });
    });
    // If cleanup ran while listen() was still pending, unregister right away
    // so the listener is not leaked
    if (alive) unlisten = stop;
    else stop();
  })();
  return () => {
    alive = false;
    if (unlisten) unlisten();
    void api.watch.stop();
  };
}, [dispatch]);
const handleShowBlameGraph = useCallback(async () => { await trace.blameSelectedSession(); setShowBlameGraph(true); }, [trace]);
const handleBlameNodeClick = useCallback((nodeId: string, type: "session" | "binding") => { if (type === "session") { dispatch({ type: "SET_SELECTED_SESSION", payload: nodeId }); } else { void trace.previewBinding(nodeId); } }, [dispatch, trace]);
// Runtime connection handlers
const connectToRuntime = useCallback(async () => {
if (runtimeRef.current) return;
const runtime = new RuntimeConnection();
runtime.onConnect = () => setRuntimeConnected(true);
runtime.onDisconnect = () => {
setRuntimeConnected(false);
setPuppetMode({ active: false });
};
runtime.onEvent = (event: RuntimeEvent) => {
// Handle puppet mode events
if (event.event_type === "puppet_mode_entered") {
setPuppetMode({
active: true,
sessionId: event.session_id,
agent: event.agent,
contextSummary: event.summary,
lastOutput: event.output,
});
} else if (event.event_type === "puppet_mode_exited") {
setPuppetMode({ active: false });
}
// Also dispatch as trace event for the Loom
dispatch({ type: "ADD_EVENT", payload: event as unknown as TraceEvent });
};
try {
await runtime.connect();
runtimeRef.current = runtime;
} catch (err) {
console.error("Failed to connect to runtime:", err);
dispatch({ type: "SET_ERROR", payload: `Runtime connection failed: ${err}` });
}
}, [dispatch]);
const disconnectRuntime = useCallback(() => { runtimeRef.current?.disconnect(); runtimeRef.current = null; setRuntimeConnected(false); setPuppetMode({ active: false }); }, []);
const handleTakeControl = useCallback(async () => {
  if (!runtimeRef.current || !derived.selectedSession) return;
  // Send puppet mode request - the runtime will respond with puppet_mode_entered event
  try {
    await runtimeRef.current.resume({ type: "PuppetResponse", human_response: "TAKE_CONTROL" });
  } catch (err) {
    console.error("Failed to take control:", err);
  }
}, [derived.selectedSession]);
const handleReleasePuppet = useCallback(async () => { if (!runtimeRef.current) return; try { await runtimeRef.current.resume({ type: "PuppetRelease" }); setPuppetMode({ active: false }); setPuppetInput(""); } catch (err) { console.error("Failed to release puppet:", err); } }, []);
const handleSendPuppetResponse = useCallback(async () => { if (!runtimeRef.current || !puppetInput.trim()) return; try { await runtimeRef.current.resume({ type: "PuppetResponse", human_response: puppetInput }); setPuppetInput(""); } catch (err) { console.error("Failed to send puppet response:", err); } }, [puppetInput]);
// Feedback submission handler (integrated with store suspension state)
// Uses Tauri command for VM execution, WebSocket for CLI --interactive mode
const handleFeedbackSubmit = useCallback(async (response: { selectedOption?: number; userInput?: string }) => {
if (!suspension || suspension.type !== "feedback") return;
try {
// Try Tauri command first (for run_prose_vm execution)
await api.submitFeedback(suspension.requestId, {
selected_option: response.selectedOption,
user_input: response.userInput,
});
dispatch({ type: "SET_SUSPENSION", payload: null });
} catch (err) {
// Fallback to WebSocket if Tauri command fails (for --interactive mode)
if (runtimeRef.current) {
try {
await runtimeRef.current.resume({
type: "FeedbackResponse",
selected_option: response.selectedOption,
user_input: response.userInput,
});
dispatch({ type: "SET_SUSPENSION", payload: null });
return;
} catch (wsErr) {
console.error("WebSocket fallback also failed:", wsErr);
}
}
console.error("Failed to submit feedback:", err);
dispatch({ type: "SET_ERROR", payload: `Feedback submission failed: ${err}` });
}
}, [suspension, dispatch]);
// Puppet mode handlers using store suspension state
const handlePuppetSubmit = useCallback(async (response: string) => {
if (!runtimeRef.current || !suspension || suspension.type !== "puppet") return;
try {
await runtimeRef.current.resume({ type: "PuppetResponse", human_response: response });
// Don't clear suspension - wait for SuspensionResolved event
} catch (err) {
console.error("Failed to send puppet response:", err);
dispatch({ type: "SET_ERROR", payload: `Puppet response failed: ${err}` });
}
}, [suspension, dispatch]);
const handlePuppetRelease = useCallback(async () => { if (!runtimeRef.current) return; try { await runtimeRef.current.resume({ type: "PuppetRelease" }); dispatch({ type: "SET_SUSPENSION", payload: null }); } catch (err) { console.error("Failed to release puppet:", err); } }, [dispatch]);
// Cleanup runtime connection on unmount
useEffect(() => {
  return () => {
    runtimeRef.current?.disconnect();
  };
}, []);
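// Listen for VM events from Claude-as-VM execution
useEffect(() => {
  let unlisten: (() => void) | null = null;
  let cancelled = false;
  const setupListener = async () => {
    const stop = await listen<VmEvent>("vm_event", (event) => {
      dispatch({ type: "ADD_VM_EVENT", payload: event.payload });
    });
    // If cleanup ran while listen() was still pending, unregister right away
    // so the listener is not leaked
    if (cancelled) stop();
    else unlisten = stop;
  };
  void setupListener();
  return () => {
    cancelled = true;
    if (unlisten) unlisten();
  };
}, [dispatch]);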
// Listen for runtime events from Tauri backend (for live progress)
// The backend emits TraceEvents directly via vm_event_to_trace()
// Batched to avoid excessive re-renders (dispatch every 50ms)
const [isExecuting, setIsExecuting] = useState(false);
// Boot phase tracks startup progress: waiting → connected → thinking → null (done)
const [bootPhase, setBootPhase] = useState<'waiting' | 'connected' | 'thinking' | null>(null);
const eventBufferRef = useRef<TraceEvent[]>([]);
// Track boot phase based on events - stays in boot mode until "meaningful" content arrives
useEffect(() => {
if (state.error) {
setBootPhase(null);
return;
}
if (bootPhase === null) return; // Already exited boot mode
// Check what events we have to determine phase
for (const ev of state.events) {
const evType = ev.event_type;
// These events mean we have real content - exit boot mode
if (evType === 'SessionStarted' || evType === 'ToolCall' ||
evType === 'TextOutput' || evType === 'SessionCompleted' ||
evType === 'RunCompleted' || evType === 'RunFailed') {
setBootPhase(null);
return;
}
}
// Still in boot mode - update phase based on what we've seen
const hasStreaming = state.events.some(e => e.event_type === 'StreamingText');
const hasThought = state.events.some(e => e.event_type === 'Thought');
const hasProcessing = state.events.some(e => e.event_type === 'Processing');
const hasRunStarted = state.events.some(e => e.event_type === 'RunStarted');
// Progress through phases: waiting → connected → thinking
if ((hasStreaming || hasThought) && bootPhase !== 'thinking') {
setBootPhase('thinking');
} else if ((hasProcessing || hasRunStarted) && bootPhase === 'waiting') {
setBootPhase('connected');
}
}, [state.events, state.error, bootPhase]);
useEffect(() => {
  let unlisten: (() => void) | null = null;
  let cancelled = false;
  // Dispatch everything buffered so far as a single batch
  const flush = () => {
    if (eventBufferRef.current.length > 0) {
      dispatch({ type: "ADD_EVENTS_BATCH", payload: eventBufferRef.current });
      eventBufferRef.current = [];
    }
  };
  // Flush buffered events every 50ms
  const flushInterval = setInterval(flush, 50);
  const setupListener = async () => {
    // The runtime_event is already a TraceEvent from vm_event_to_trace()
    const stop = await listen<TraceEvent>("runtime_event", (event) => {
      const traceEvent = event.payload;
      setIsExecuting(true);
      // Buffer the event for batch dispatch
      eventBufferRef.current.push(traceEvent);
      // Check for completion events
      if (traceEvent.event_type === "RunCompleted" ||
          traceEvent.event_type === "RunFailed" ||
          traceEvent.event_type === "ErrorOccurred") {
        // Flush immediately on completion
        flush();
        setIsExecuting(false);
        // Refresh runs list to show the new run
        void trace.refreshRuns();
      }
    });
    // If cleanup ran while listen() was still pending, unregister right away
    // so the listener is not leaked
    if (cancelled) stop();
    else unlisten = stop;
  };
  void setupListener();
  return () => {
    cancelled = true;
    clearInterval(flushInterval);
    // Flush any remaining events on cleanup
    flush();
    if (unlisten) unlisten();
  };
}, [dispatch, trace]);
const isLive = state.vmRunning || isExecuting;
// Debug: expose test functions for manual testing in dev
useEffect(() => {
  if (import.meta.env.DEV) {
    (window as any).__testFeedback = () => {
      dispatch({
        type: "SET_SUSPENSION",
        payload: {
          type: "feedback",
          requestId: "test-feedback-001",
          sessionId: "test-session",
          summary: "I've completed the initial research phase and found 3 potential approaches.",
          question: "Which approach should I pursue?",
          options: [
            "Approach A: Fast but less thorough",
            "Approach B: Balanced speed/quality",
            "Approach C: Comprehensive but slow",
          ],
        },
      });
    };
    (window as any).__testPuppet = () => {
      dispatch({
        type: "SET_SUSPENSION",
        payload: {
          type: "puppet",
          requestId: "test-puppet-001",
          sessionId: "test-session",
          agent: "researcher",
          contextSummary: "Currently analyzing the codebase structure. Found 15 files matching the pattern.",
          lastOutput: "Found: src/api.ts, src/auth.ts, src/db.ts...",
        },
      });
    };
    (window as any).__clearSuspension = () => {
      dispatch({ type: "SET_SUSPENSION", payload: null });
    };
    console.log("🧪 Test hooks available: __testFeedback(), __testPuppet(), __clearSuspension()");
  }
}, [dispatch]);
return (
{/* Keyboard shortcuts modal */}
{showShortcuts && (
<div className="shortcuts-modal" onClick={() => setShowShortcuts(false)}>
<div className="shortcuts-modal__content" onClick={(e) => e.stopPropagation()}>
<div className="shortcuts-modal__title">Keyboard Shortcuts</div>
<div className="shortcuts-modal__list">
{SHORTCUTS.map((s) => (
<div key={s.key} className="shortcuts-modal__item">
<kbd>{s.key}</kbd>
<span>{s.description}</span>
</div>
))}
</div>
</div>
</div>
)}
<div className="layout">
<aside className="sidebar">
<RunList
runs={state.runs}
selectedRunId={state.selectedRunId}
workspaceRoot={state.workspaceRoot || "~/.mission-control"}
onRunSelect={(id) => void trace.loadRun(id)}
onFork={() => void trace.forkSelectedRun()}
onClearRuns={() => trace.clearRuns()}
/>
</aside>
<main className="main">
{showEditor && state.program ? (
<Suspense fallback={<div className="execution-feed__empty"><p>Loading editor...</p></div>}>
<ProseEditor
initialContent={state.program}
filePath={state.selectedRunId ? `${state.workspaceRoot}/.prose/runs/${state.selectedRunId}/program.prose` : undefined}
workspaceRoot={state.workspaceRoot || undefined}
onContentChange={(content) => dispatch({ type: "SET_PROGRAM", payload: content })}
onRun={(runId) => {
setBootPhase('waiting');
setShowEditor(false); // Close editor to show execution feed
void trace.loadRun(runId);
}}
/>
</Suspense>
) : showBlameGraph ? (
<Suspense fallback={<div className="execution-feed__empty"><p>Loading graph...</p></div>}>
<BlameGraph
blameText={state.bindingPreview}
onClose={() => setShowBlameGraph(false)}
onNodeClick={handleBlameNodeClick}
/>
</Suspense>
) : (
<ExecutionFeed
events={state.events}
selectedEventId={state.selectedEventId}
focusedIndex={state.focusedIndex}
isLive={isLive}
bootPhase={bootPhase}
onEventSelect={handleEventSelect}
/>
)}
<StatusBar
sessions={derived.totals.sessions}
tokens={derived.totals.tokens}
cost={derived.totals.cost}
profile={derived.preflight.profile}
sandbox={derived.preflight.sandbox}
isLive={isLive}
/>
</main>
<aside className="detail">
<SessionInspector
session={derived.selectedSession}
alerts={derived.alerts}
preflight={derived.preflight}
bindingNames={derived.bindingNames}
checkpoints={state.checkpoints}
runs={state.runs}
selectedRunId={state.selectedRunId}
compareRunId={state.compareRunId}
compareTotals={state.compareTotals}
interventionNote={state.interventionNote}
checkpointLabel={state.checkpointLabel}
bindingPreview={state.bindingPreview}
memoryCards={derived.memoryCards}
watcherAlerts={derived.watcherAlerts}
onPreviewBinding={(name) => void trace.previewBinding(name)}
onCreateCheckpoint={() => void trace.createCheckpoint()}
onRestoreCheckpoint={(id) => void trace.restoreCheckpoint(id)}
onDiffCheckpoint={(id) => void trace.diffCheckpoint(id)}
onSendIntervention={() => void trace.sendIntervention()}
onInterventionNoteChange={(v) => dispatch({ type: "SET_INTERVENTION_NOTE", payload: v })}
onCheckpointLabelChange={(v) => dispatch({ type: "SET_CHECKPOINT_LABEL", payload: v })}
onCompareRunChange={(v) => void trace.loadCompareRun(v)}
onShowBlameGraph={handleShowBlameGraph}
/>
</aside>
</div>
{/* Suspension overlay - shows when feedback or puppet mode is active */}
{suspension && (
<div className="suspension-overlay">
{suspension.type === "feedback" ? (
<FeedbackInput
request={{
requestId: suspension.requestId,
summary: suspension.summary ?? "",
question: suspension.question,
options: suspension.options ?? [],
displayBinding: suspension.displayBinding,
}}
onSubmit={handleFeedbackSubmit}
/>
) : (
<PuppetInterface
data={{
requestId: suspension.requestId,
sessionId: suspension.sessionId ?? "",
agent: suspension.agent,
contextSummary: suspension.contextSummary ?? "",
lastOutput: suspension.lastOutput,
}}
onSubmit={handlePuppetSubmit}
onRelease={handlePuppetRelease}
/>
)}
</div>
)}
</div>
); }
export default function App() { return ( ); }