
@jparklev
Created January 18, 2026 21:21

Mission Control VM Bridge Code Review - OpenProse Tauri + Claude Code Integration

Mission Control VM Bridge - Code Review Request

You are reviewing a Tauri desktop app called "Mission Control" that acts as a frontend for the Claude Code CLI. The app uses Claude as a VM to execute programs written in OpenProse, a custom language.

Architecture Overview

The key integration points are:

  1. Tauri backend (lib.rs) spawns Claude CLI processes and manages state
  2. prose-runtime crate contains VmExecutor, which spawns claude -p --output-format stream-json and parses its output
  3. MCP server provides HTTP endpoints for Claude to call back for user feedback and puppet mode
  4. Frontend (React/TypeScript) receives events via Tauri's emit system and displays execution state
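Claude Code's stream-json output, like the trace.jsonl files this app writes, is newline-delimited JSON, so any single read can end in the middle of a line. A minimal, std-only sketch of byte-level framing follows; LineFramer is a hypothetical name and is not how prose-runtime's parser is known to work. It buffers raw bytes, emits only complete lines, and decodes UTF-8 per line, so a multi-byte character split across two reads is never corrupted.

```rust
/// Accumulates raw bytes and yields complete newline-terminated lines.
/// Hypothetical helper; not taken from prose-runtime.
#[derive(Default)]
pub struct LineFramer {
    buf: Vec<u8>,
}

impl LineFramer {
    /// Feed a chunk of bytes and return every complete, non-blank line it closes.
    /// Invalid UTF-8 inside a line is decoded lossily rather than dropping the line.
    pub fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.buf.extend_from_slice(chunk);
        let mut lines = Vec::new();
        // Drain everything up to and including the last '\n'; keep the trailing fragment.
        if let Some(last_nl) = self.buf.iter().rposition(|&b| b == b'\n') {
            let complete: Vec<u8> = self.buf.drain(..=last_nl).collect();
            for raw in complete.split(|&b| b == b'\n') {
                let decoded = String::from_utf8_lossy(raw);
                let line = decoded.trim(); // also strips a trailing '\r'
                if !line.is_empty() {
                    lines.push(line.to_string());
                }
            }
        }
        lines
    }

    /// Number of bytes still waiting for a terminating newline.
    pub fn pending(&self) -> usize {
        self.buf.len()
    }
}
```

Working on bytes is the design choice that matters here: Read::read_to_string returns an error when the data is not valid UTF-8, which can happen if a reader reaches EOF while a writer is mid-way through a multi-byte character.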

What I Need You To Review

Please do extensive research on:

  • Claude Code CLI architecture and best practices (the official Anthropic CLI tool)
  • Similar Tauri + CLI bridging patterns in production apps
  • Rust async process management patterns
  • MCP (Model Context Protocol) server implementations

Then review this codebase for:

1. Bridging Logic

  • Is the process spawning and stream parsing robust?
  • Are there race conditions between stdout parsing and process exit?
  • Is the JSON stream parsing (stream-json format) handling edge cases correctly?
  • Are there potential deadlocks in the channel/oneshot communication?
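On the race and deadlock questions, a common shape for child-process bridges is: drain stderr on its own thread so a full pipe cannot stall the child, read stdout to EOF, and only then call wait(), so the exit status is never observed before the last line has been consumed. The sketch below uses blocking std::process for brevity (the actual VmExecutor code is not shown in this file and may well use tokio::process); run_and_capture and Captured are hypothetical names, and the usage assumes a Unix sh.

```rust
use std::io::{self, BufRead, BufReader, Read};
use std::process::{Command, ExitStatus, Stdio};
use std::thread;

/// Output captured from a finished child process (hypothetical type).
pub struct Captured {
    pub stdout_lines: Vec<String>,
    pub stderr: String,
    pub status: ExitStatus,
}

/// Spawn `program`, consume stdout line by line, and drain stderr concurrently.
pub fn run_and_capture(program: &str, args: &[&str]) -> io::Result<Captured> {
    let mut child = Command::new(program)
        .args(args)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    // Take the pipes out of the Child so it can still be waited on later.
    let stdout = child.stdout.take().expect("stdout is piped");
    let mut stderr = child.stderr.take().expect("stderr is piped");

    // If nobody reads stderr, a chatty child can block on a full stderr pipe
    // while we block reading stdout: a classic deadlock.
    let stderr_thread = thread::spawn(move || {
        let mut bytes = Vec::new();
        let _ = stderr.read_to_end(&mut bytes);
        String::from_utf8_lossy(&bytes).into_owned()
    });

    // Read stdout to EOF. A real bridge would parse and forward each line here.
    let mut stdout_lines = Vec::new();
    let mut read_err = None;
    for line in BufReader::new(stdout).lines() {
        match line {
            Ok(l) => stdout_lines.push(l),
            Err(e) => {
                read_err = Some(e);
                break;
            }
        }
    }
    // On a read error, kill the child so wait() below cannot hang.
    if read_err.is_some() {
        let _ = child.kill();
    }

    // Only after both pipes reach EOF do we reap the child: no output is lost.
    let stderr = stderr_thread.join().unwrap_or_default();
    let status = child.wait()?;
    match read_err {
        Some(e) => Err(e),
        None => Ok(Captured { stdout_lines, stderr, status }),
    }
}
```

An async version with tokio::process would keep the same ordering: spawn a task for stderr, consume stdout until EOF, then await the child's wait().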

2. Event Emissions

  • Is the Tauri event emission pattern correct and efficient?
  • Are there potential memory leaks from event listeners?
  • Is the watch_run polling mechanism efficient? Could it miss events?
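On whether watch_run can miss events: it starts tailing at the file's length when its task starts, while the frontend loads the baseline separately through read_run_events. If the frontend calls read_run_events first, as the comment in watch_run implies, anything appended between the two calls is never emitted. One way to close that gap, sketched here under the assumption that the initial load could also return the byte offset it stopped at (read_from is a hypothetical helper, not an existing command), is to have the snapshot and the tail share one offset that only advances past complete lines.

```rust
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom};
use std::path::Path;

/// Read complete lines from `path` starting at byte `offset` (hypothetical helper).
/// Returns the lines plus the offset just past the last complete line, so a
/// trailing partial line is re-read on the next call instead of being lost.
pub fn read_from(path: &Path, offset: u64) -> io::Result<(Vec<String>, u64)> {
    let mut f = File::open(path)?;
    let len = f.metadata()?.len();
    // A file shorter than our offset was truncated or replaced: start over.
    let start = if len < offset { 0 } else { offset };
    f.seek(SeekFrom::Start(start))?;
    let mut bytes = Vec::new();
    f.read_to_end(&mut bytes)?;

    // Consume only up to the last newline; any partial line stays on disk.
    let consumed = bytes.iter().rposition(|&b| b == b'\n').map_or(0, |i| i + 1);
    let lines: Vec<String> = String::from_utf8_lossy(&bytes[..consumed])
        .lines()
        .map(str::trim)
        .filter(|l| !l.is_empty())
        .map(str::to_string)
        .collect();
    Ok((lines, start + consumed as u64))
}
```

With this shape, the initial load would hand its offset to the watcher, which would start from that value instead of from metadata().len().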

3. Claude Code Handling

  • Is the CLI invocation correct? Are we missing important flags?
  • Is the MCP server integration following best practices?
  • Is the feedback/puppet mode implementation robust?
  • Are timeouts appropriate (5 min for feedback, 30 min for puppet mode)?
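For the feedback and puppet-mode round trip, a pattern that avoids holding a lock while waiting is a registry of pending requests keyed by request_id: the waiting side registers a one-shot channel, releases the lock, and blocks with a timeout; the responding side removes the entry and sends. The std-only sketch below is hypothetical (PendingRequests, WaitError, and their methods are illustrative names, and McpServer::respond's internals are not shown in this file); the timeout is a parameter so the 5 min and 30 min values could be passed per request kind.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::sync::Mutex;
use std::time::Duration;

/// Hypothetical registry mapping request ids to the channel a waiter blocks on.
#[derive(Default)]
pub struct PendingRequests {
    waiters: Mutex<HashMap<String, Sender<String>>>,
}

#[derive(Debug, PartialEq)]
pub enum WaitError {
    TimedOut,
    Cancelled,
}

impl PendingRequests {
    /// Register `id`, then wait up to `timeout` for a response.
    /// The lock is held only while inserting or removing, never while waiting.
    pub fn wait(&self, id: &str, timeout: Duration) -> Result<String, WaitError> {
        let (tx, rx) = mpsc::channel();
        self.waiters.lock().unwrap().insert(id.to_string(), tx);
        let result = rx.recv_timeout(timeout);
        // Always deregister so a late response cannot target a dead waiter.
        self.waiters.lock().unwrap().remove(id);
        match result {
            Ok(v) => Ok(v),
            // A response that won the race against deregistration is still delivered.
            Err(RecvTimeoutError::Timeout) => rx.try_recv().map_err(|_| WaitError::TimedOut),
            Err(RecvTimeoutError::Disconnected) => Err(WaitError::Cancelled),
        }
    }

    /// Deliver a response; returns false if nobody is waiting on `id`.
    pub fn respond(&self, id: &str, value: String) -> bool {
        // Remove and send under one lock so this is atomic w.r.t. deregistration.
        let mut waiters = self.waiters.lock().unwrap();
        match waiters.remove(id) {
            Some(tx) => tx.send(value).is_ok(),
            None => false,
        }
    }

    /// Drop every pending waiter (e.g. when the VM exits); they see Cancelled.
    pub fn cancel_all(&self) {
        self.waiters.lock().unwrap().clear();
    }
}
```

Sending while the lock is held in respond is deliberate: an unbounded std channel send never blocks, and it guarantees that a response racing a timeout is either delivered or reported as undeliverable, never silently lost.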

4. Error Handling & Robustness

  • What happens if Claude crashes mid-execution?
  • What happens if the MCP server receives malformed requests?
  • Are there cleanup issues (orphaned processes, leaked file handles)?
  • Is the stderr capture sufficient for debugging?

5. Security Considerations

  • Any concerns with how we spawn processes?
  • MCP server security (binding to localhost, request validation)?
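On MCP server security: binding to the loopback interface on an OS-assigned port keeps the server off the network, but any local process can still connect, so a per-run secret checked on every request is a common complement. Whether McpServer binds this way or checks any token is not visible in this file; bind_loopback and token_matches are hypothetical std-only helpers.

```rust
use std::io;
use std::net::{Ipv4Addr, SocketAddr, TcpListener};

/// Bind to 127.0.0.1 on an ephemeral port chosen by the OS (hypothetical helper).
pub fn bind_loopback() -> io::Result<(TcpListener, SocketAddr)> {
    let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0))?;
    let addr = listener.local_addr()?;
    // Defensive check: refuse to serve if the socket is not on loopback.
    if !addr.ip().is_loopback() {
        return Err(io::Error::new(io::ErrorKind::Other, "refusing non-loopback bind"));
    }
    Ok((listener, addr))
}

/// Compare a presented token with the expected one without returning early
/// on the first mismatching byte (hypothetical helper).
pub fn token_matches(expected: &[u8], presented: &[u8]) -> bool {
    if expected.len() != presented.len() {
        return false;
    }
    expected
        .iter()
        .zip(presented)
        .fold(0u8, |acc, (a, b)| acc | (a ^ b))
        == 0
}
```

The XOR fold avoids an early exit, but the compiler gives no constant-time guarantee; a vetted crate such as subtle (ConstantTimeEq) is the safer choice for production use.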

6. Improvements

  • What patterns from similar apps should we adopt?
  • Any Rust idioms we're missing?
  • Performance optimizations?
  • Better error recovery mechanisms?
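On recovery and shared state: the task spawned by run_prose_vm clears VmState.run_id and mcp_server when its run completes or its channel closes, without checking which run the state currently belongs to. If two runs can overlap, the first run's cleanup would drop the second run's MCP server. A compare-and-clear guard scopes cleanup to the run that owns the state; the sketch uses a simplified stand-in for VmState, and VmStateSketch, McpHandle, and clear_if_current are hypothetical names.

```rust
/// Simplified, hypothetical stand-in for VmState; McpHandle replaces McpServer.
#[derive(Default)]
pub struct VmStateSketch {
    pub run_id: Option<String>,
    pub mcp_server: Option<McpHandle>,
}

#[derive(Debug, PartialEq)]
pub struct McpHandle(pub u16);

impl VmStateSketch {
    /// Clear the state only if it still belongs to `run_id`.
    /// Returns true if this call performed the cleanup.
    pub fn clear_if_current(&mut self, run_id: &str) -> bool {
        if self.run_id.as_deref() == Some(run_id) {
            self.run_id = None;
            self.mcp_server = None;
            true
        } else {
            false
        }
    }
}
```

In the Tauri task, the equivalent check would run under the existing lock, comparing state.run_id with the run id the task captured when it was spawned before clearing anything.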

Please be thorough and specific. Reference similar open source projects where relevant. If you find issues, provide concrete code suggestions for fixes.


File: ide/apps/mission-control/src-tauri/src/lib.rs

use fs2::FileExt;
use prose_checkpoint::git as checkpoints;
use prose_runtime::{ExecutorConfig, Executor, McpServer, VmConfig, VmEvent, VmExecutionHandle, VmExecutor};
use prose_trace::redaction;
use prose_trace::event::TraceEvent;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tauri::Emitter;
use tokio::sync::Mutex;

#[cfg(target_os = "macos")]
use cocoa::base::{id, nil};
#[cfg(target_os = "macos")]
use objc::{class, msg_send, sel, sel_impl};

/// Get the one-off storage directory (~/.mission-control/)
fn oneoff_root() -> PathBuf {
    dirs::home_dir()
        .unwrap_or_else(|| PathBuf::from("/tmp"))
        .join(".mission-control")
}

/// Determine the effective workspace root for runs
fn effective_workspace(workspace_root: Option<&str>) -> PathBuf {
    match workspace_root {
        Some(ws) if !ws.is_empty() => PathBuf::from(ws),
        _ => oneoff_root(),
    }
}

#[derive(Debug, Clone, Serialize)]
struct RunSummary {
    run_id: String,
    trace_path: String,
    program_path: String,
    status: String, // "running" | "completed" | "failed"
}

/// Quick check of trace file for completion status (scans only the last 20 lines, but still reads the whole file into memory)
fn get_run_status(trace_path: &Path) -> String {
    // Read the file and check for completion events
    if let Ok(content) = std::fs::read_to_string(trace_path) {
        // Check from the end for efficiency
        for line in content.lines().rev().take(20) {
            if line.contains("\"RunCompleted\"") {
                return "completed".to_string();
            }
            if line.contains("\"RunFailed\"") {
                return "failed".to_string();
            }
        }
    }
    "running".to_string()
}

fn run_dir(workspace_root: &Path, run_id: &str) -> PathBuf {
    workspace_root.join(".prose").join("runs").join(run_id)
}

fn trace_path(workspace_root: &Path, run_id: &str) -> PathBuf {
    run_dir(workspace_root, run_id).join("trace.jsonl")
}

fn program_path(workspace_root: &Path, run_id: &str) -> PathBuf {
    run_dir(workspace_root, run_id).join("program.prose")
}

fn ensure_safe_component(what: &str, s: &str) -> Result<(), String> {
    if s.is_empty() {
        return Err(format!("{what} must not be empty"));
    }
    if !s
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
    {
        return Err(format!("{what} contains invalid characters"));
    }
    Ok(())
}

fn ensure_safe_session_id(s: &str) -> Result<(), String> {
    ensure_safe_component("session_id", s)
}

/// Maximum events to read from a trace file (safety limit against unbounded growth)
const MAX_TRACE_EVENTS: usize = 100_000;

fn read_events(path: &Path) -> anyhow::Result<Vec<TraceEvent>> {
    let f = File::open(path)?;
    let r = BufReader::new(f);
    let mut out = Vec::new();
    for line in r.lines() {
        if out.len() >= MAX_TRACE_EVENTS {
            break; // Safety limit reached
        }
        let line = line?;
        if line.trim().is_empty() {
            continue;
        }
        if let Ok(ev) = serde_json::from_str::<TraceEvent>(&line) {
            out.push(ev);
        }
    }
    Ok(out)
}

#[tauri::command]
fn list_runs(workspace_root: String) -> Result<Vec<RunSummary>, String> {
    let root = PathBuf::from(&workspace_root);
    let dir = root.join(".prose").join("runs");
    let mut out = Vec::new();
    let entries = std::fs::read_dir(&dir).map_err(|e| format!("read {}: {e}", dir.display()))?;
    for ent in entries {
        let ent = ent.map_err(|e| e.to_string())?;
        if !ent.file_type().map_err(|e| e.to_string())?.is_dir() {
            continue;
        }
        let Some(run_id) = ent.file_name().to_str().map(|s| s.to_string()) else {
            continue;
        };
        let trace = trace_path(&root, &run_id);
        if !trace.exists() {
            continue;
        }
        let program = program_path(&root, &run_id);
        let status = get_run_status(&trace);
        out.push(RunSummary {
            run_id,
            trace_path: trace.display().to_string(),
            program_path: program.display().to_string(),
            status,
        });
    }
    out.sort_by(|a, b| b.run_id.cmp(&a.run_id));
    Ok(out)
}

#[tauri::command]
fn read_run_events(workspace_root: String, run_id: String) -> Result<Vec<TraceEvent>, String> {
    ensure_safe_component("run_id", &run_id)?;
    let root = PathBuf::from(&workspace_root);
    let path = trace_path(&root, &run_id);
    read_events(&path).map_err(|e| format!("read {}: {e}", path.display()))
}

#[tauri::command]
fn read_run_program(workspace_root: String, run_id: String) -> Result<String, String> {
    ensure_safe_component("run_id", &run_id)?;
    let root = PathBuf::from(&workspace_root);
    let path = program_path(&root, &run_id);
    std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))
}

#[tauri::command]
fn read_run_binding(workspace_root: String, run_id: String, name: String) -> Result<String, String> {
    if name.trim().is_empty() {
        return Ok(String::new());
    }
    ensure_safe_component("run_id", &run_id)?;
    let name = name.trim();
    ensure_safe_component("binding name", name)?;
    let root = PathBuf::from(&workspace_root);
    let path = run_dir(&root, &run_id).join("bindings").join(format!("{name}.md"));
    std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))
}

#[tauri::command]
fn append_intervention(workspace_root: String, run_id: String, note: String) -> Result<(), String> {
    ensure_safe_component("run_id", &run_id)?;
    let root = PathBuf::from(&workspace_root);
    let path = trace_path(&root, &run_id);
    let f = File::options()
        .create(true)
        .append(true)
        .open(&path)
        .map_err(|e| format!("open {}: {e}", path.display()))?;
    let _ = f.lock_exclusive();
    let mut w = BufWriter::new(f);
    let mut ev = TraceEvent::new(&run_id, "HumanInterventionApplied");
    let (note, hits) = redaction::redact_text(note.trim());
    ev.put_str("note", &note);
    if !hits.is_empty() {
        ev.put_json("redaction_hits", serde_json::json!(hits));
    }
    serde_json::to_writer(&mut w, &ev).map_err(|e| e.to_string())?;
    w.write_all(b"\n").map_err(|e| e.to_string())?;
    w.flush().map_err(|e| e.to_string())?;
    Ok(())
}

#[tauri::command]
fn fork_run(workspace_root: String, run_id: String) -> Result<String, String> {
    ensure_safe_component("run_id", &run_id)?;
    prose_trace::writer::fork_run(Path::new(&workspace_root), &run_id).map_err(|e| e.to_string())
}

#[tauri::command]
fn blame_session(workspace_root: String, run_id: String, session_id: String) -> Result<String, String> {
    ensure_safe_component("run_id", &run_id)?;
    ensure_safe_session_id(&session_id)?;
    prose_trace::writer::blame_session(Path::new(&workspace_root), &run_id, &session_id)
        .map_err(|e| e.to_string())
}

#[tauri::command]
fn checkpoint_list(workspace_root: String) -> Result<Vec<checkpoints::CheckpointInfo>, String> {
    checkpoints::list_checkpoints(Path::new(&workspace_root)).map_err(|e| e.to_string())
}

#[tauri::command]
fn checkpoint_create(workspace_root: String, label: Option<String>) -> Result<String, String> {
    let label = label.as_deref().filter(|s| !s.trim().is_empty());
    checkpoints::create_checkpoint(Path::new(&workspace_root), label).map_err(|e| e.to_string())
}

#[tauri::command]
fn checkpoint_restore(workspace_root: String, checkpoint_id: String) -> Result<(), String> {
    checkpoints::restore_checkpoint(Path::new(&workspace_root), &checkpoint_id).map_err(|e| e.to_string())
}

#[tauri::command]
fn checkpoint_diff(workspace_root: String, checkpoint_id: String) -> Result<String, String> {
    checkpoints::diff_checkpoint(Path::new(&workspace_root), &checkpoint_id).map_err(|e| e.to_string())
}

#[derive(Default)]
struct WatchState {
    stop: Option<Arc<AtomicBool>>,
    task: Option<tauri::async_runtime::JoinHandle<()>>,
    run_id: Option<String>,
}

/// State for active VM execution with MCP server for feedback/puppet mode
struct VmState {
    run_id: Option<String>,
    mcp_server: Option<McpServer>,
}

impl Default for VmState {
    fn default() -> Self {
        Self {
            run_id: None,
            mcp_server: None,
        }
    }
}

#[tauri::command]
async fn watch_run(
    app: tauri::AppHandle,
    state: tauri::State<'_, Mutex<WatchState>>,
    workspace_root: String,
    run_id: String,
) -> Result<(), String> {
    ensure_safe_component("run_id", &run_id)?;
    let mut s = state.lock().await;

    if let Some(stop) = s.stop.take() {
        stop.store(true, Ordering::SeqCst);
    }
    if let Some(task) = s.task.take() {
        task.abort();
    }

    let stop = Arc::new(AtomicBool::new(false));
    let stop2 = stop.clone();
    let app2 = app.clone();
    let ws = workspace_root.clone();
    let rid = run_id.clone();

    s.stop = Some(stop);
    s.run_id = Some(run_id);

    s.task = Some(tauri::async_runtime::spawn(async move {
        let path = trace_path(Path::new(&ws), &rid);
        let mut pos = 0u64;
        let mut leftover = String::new(); // Keep partial lines between iterations

        // Start at end so the initial load via `read_run_events` owns the baseline.
        if let Ok(md) = std::fs::metadata(&path) {
            pos = md.len();
        }

        loop {
            if stop2.load(Ordering::SeqCst) {
                break;
            }

            let mut f = match File::open(&path) {
                Ok(f) => f,
                Err(_) => {
                    tokio::time::sleep(std::time::Duration::from_millis(250)).await;
                    continue;
                }
            };

            let len = match f.metadata() {
                Ok(m) => m.len(),
                Err(_) => {
                    tokio::time::sleep(std::time::Duration::from_millis(250)).await;
                    continue;
                }
            };
            if len < pos {
                pos = 0;
                leftover.clear(); // File was truncated, reset leftover
            }
            if f.seek(SeekFrom::Start(pos)).is_ok() {
                let mut buf = std::mem::take(&mut leftover); // Start with leftover from previous read
                if f.read_to_string(&mut buf).is_ok() {
                    pos = f.stream_position().unwrap_or(len);

                    // Split on newlines, keeping the last fragment if it doesn't end with \n
                    let ends_with_newline = buf.ends_with('\n');
                    let lines: Vec<&str> = buf.split('\n').collect();
                    let (complete_lines, partial) = if ends_with_newline || lines.is_empty() {
                        (lines.as_slice(), "")
                    } else {
                        let (complete, last) = lines.split_at(lines.len() - 1);
                        (complete, last.first().copied().unwrap_or(""))
                    };

                    // Process complete lines
                    for line in complete_lines {
                        let line = line.trim();
                        if line.is_empty() {
                            continue;
                        }
                        if let Ok(ev) = serde_json::from_str::<TraceEvent>(line) {
                            let _ = app2.emit("trace_event", ev);
                        }
                    }

                    // Keep partial line for next iteration
                    leftover = partial.to_string();
                }
            }

            tokio::time::sleep(std::time::Duration::from_millis(250)).await;
        }
    }));

    Ok(())
}

#[tauri::command]
async fn watch_stop(state: tauri::State<'_, Mutex<WatchState>>) -> Result<(), String> {
    let mut s = state.lock().await;
    if let Some(stop) = s.stop.take() {
        stop.store(true, Ordering::SeqCst);
    }
    if let Some(task) = s.task.take() {
        task.abort();
    }
    s.run_id = None;
    Ok(())
}

#[tauri::command]
fn save_file(file_path: String, content: String) -> Result<(), String> {
    let path = PathBuf::from(&file_path);
    // Security: reject `..` components (absolute paths are still accepted as-is)
    if path.components().any(|c| matches!(c, std::path::Component::ParentDir)) {
        return Err("Path traversal not allowed".to_string());
    }
    std::fs::write(&path, content).map_err(|e| format!("write {}: {e}", path.display()))
}

#[tauri::command]
fn open_file(file_path: String) -> Result<String, String> {
    let path = PathBuf::from(&file_path);
    std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))
}

#[derive(Serialize)]
struct LintError {
    line: usize,
    col: usize,
    message: String,
}

#[derive(Serialize)]
struct LintResult {
    success: bool,
    message: Option<String>,
    errors: Option<Vec<LintError>>,
}

#[tauri::command]
fn lint_prose(content: String) -> LintResult {
    match prose_core::parse_program(&content) {
        Ok(program) => {
            // Run linter on the parsed program
            let lint_errors = prose_core::lint_program(&program);
            if lint_errors.is_empty() {
                LintResult {
                    success: true,
                    message: None,
                    errors: None,
                }
            } else {
                // Filter to only errors (not warnings) for failure
                let error_count = lint_errors
                    .iter()
                    .filter(|e| e.level == prose_core::LintLevel::Error)
                    .count();
                LintResult {
                    success: error_count == 0,
                    message: if error_count > 0 {
                        Some(format!("{} lint error(s)", error_count))
                    } else {
                        Some(format!("{} warning(s)", lint_errors.len()))
                    },
                    errors: Some(
                        lint_errors
                            .into_iter()
                            .map(|e| LintError {
                                line: e.line,
                                col: e.col,
                                message: e.message,
                            })
                            .collect(),
                    ),
                }
            }
        }
        Err(e) => {
            // Parse error - extract line/col if possible
            let msg = e.to_string();
            let (line, col) = parse_error_location(&msg).unwrap_or((1, 1));
            LintResult {
                success: false,
                message: Some(msg.clone()),
                errors: Some(vec![LintError { line, col, message: msg }]),
            }
        }
    }
}

fn parse_error_location(msg: &str) -> Option<(usize, usize)> {
    // Parse "line:col: message" format
    let parts: Vec<&str> = msg.splitn(3, ':').collect();
    if parts.len() >= 2 {
        let line = parts[0].trim().parse().ok()?;
        let col = parts[1].trim().parse().ok()?;
        return Some((line, col));
    }
    None
}

/// Ensure OpenProse telemetry is disabled in the workspace.
/// This speeds up execution by skipping telemetry notices and analytics POST requests.
fn ensure_telemetry_disabled(workspace: &Path) -> Result<(), String> {
    let prose_dir = workspace.join(".prose");
    let env_path = prose_dir.join(".env");

    // Create .prose directory if needed
    if !prose_dir.exists() {
        std::fs::create_dir_all(&prose_dir)
            .map_err(|e| format!("Failed to create .prose directory: {}", e))?;
    }

    // Read existing .env or start fresh
    let existing = std::fs::read_to_string(&env_path).unwrap_or_default();

    // Check if telemetry is already disabled
    if existing.contains("OPENPROSE_TELEMETRY=disabled") {
        return Ok(());
    }

    // Update or add the telemetry setting
    let new_content = if existing.contains("OPENPROSE_TELEMETRY=") {
        // Replace existing setting
        existing
            .lines()
            .map(|line| {
                if line.starts_with("OPENPROSE_TELEMETRY=") {
                    "OPENPROSE_TELEMETRY=disabled"
                } else {
                    line
                }
            })
            .collect::<Vec<_>>()
            .join("\n")
    } else {
        // Add new setting
        if existing.is_empty() {
            "OPENPROSE_TELEMETRY=disabled".to_string()
        } else {
            format!("{}\nOPENPROSE_TELEMETRY=disabled", existing.trim_end())
        }
    };

    std::fs::write(&env_path, new_content)
        .map_err(|e| format!("Failed to write .prose/.env: {}", e))?;

    Ok(())
}

#[derive(Serialize)]
struct RunProseResult {
    run_id: String,
    success: bool,
    message: Option<String>,
}

/// Run a prose program through the executor
#[tauri::command]
async fn run_prose(
    app: tauri::AppHandle,
    workspace_root: Option<String>,
    program_content: String,
    model: Option<String>,
    skip_permissions: Option<bool>,
) -> Result<RunProseResult, String> {
    // Parse the program first
    let program = prose_core::parse_program(&program_content)
        .map_err(|e| format!("Parse error: {}", e))?;

    // Determine effective workspace
    let effective_ws = effective_workspace(workspace_root.as_deref());

    // Generate run ID
    let run_id = format!(
        "run-{}",
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis()
    );

    // Ensure run directory exists
    let run_dir = run_dir(&effective_ws, &run_id);
    std::fs::create_dir_all(&run_dir)
        .map_err(|e| format!("Failed to create run dir: {}", e))?;

    // Save the program
    let prog_path = program_path(&effective_ws, &run_id);
    std::fs::write(&prog_path, &program_content)
        .map_err(|e| format!("Failed to write program: {}", e))?;

    // Create trace file
    let trace_path = trace_path(&effective_ws, &run_id);
    let trace_file = File::create(&trace_path)
        .map_err(|e| format!("Failed to create trace: {}", e))?;

    // Configure executor
    let config = ExecutorConfig {
        puppet_mode_enabled: false,
        workspace_root: effective_ws.clone(),
        model: model.unwrap_or_else(|| "sonnet".to_string()),
        skip_permissions: skip_permissions.unwrap_or(true), // Default to true for now (no approval UI yet)
        mock_execution: false,
    };

    // Run the executor
    let executor = Executor::new(config);
    match executor.run(program).await {
        Ok(mut handle) => {
            let app_clone = app.clone();
            let run_id_clone = run_id.clone();
            let trace_file = Arc::new(Mutex::new(trace_file));

            // Process events
            while let Some(event) = handle.events.recv().await {
                // Convert to TraceEvent for consistency
                let trace_event = runtime_event_to_trace(&run_id_clone, &event);

                // Write to trace file
                {
                    let mut f = trace_file.lock().await;
                    if let Ok(json) = serde_json::to_string(&trace_event) {
                        let _ = writeln!(f, "{}", json);
                    }
                }

                // Emit to frontend (emit TraceEvent, not raw RuntimeEvent)
                let _ = app_clone.emit("runtime_event", &trace_event);
            }

            Ok(RunProseResult {
                run_id,
                success: true,
                message: None,
            })
        }
        Err(e) => Ok(RunProseResult {
            run_id,
            success: false,
            message: Some(e.to_string()),
        }),
    }
}

/// Convert a RuntimeEvent to a TraceEvent for logging
fn runtime_event_to_trace(run_id: &str, event: &prose_runtime::RuntimeEvent) -> TraceEvent {
    let mut trace = TraceEvent::new(run_id, "RuntimeEvent");
    trace.put_json("event", serde_json::to_value(event).unwrap_or_default());
    trace
}

/// Run a prose program using Claude-as-VM approach
/// This sends the program + prose.md to Claude, which becomes the VM
#[tauri::command]
async fn run_prose_vm(
    app: tauri::AppHandle,
    vm_state: tauri::State<'_, Arc<Mutex<VmState>>>,
    workspace_root: Option<String>,
    program_content: String,
    model: Option<String>,
    skip_permissions: Option<bool>,
    enable_mcp: Option<bool>,
    resume_id: Option<String>,
    run_id: Option<String>,
) -> Result<RunProseResult, String> {
    // Determine effective workspace
    let effective_ws = effective_workspace(workspace_root.as_deref());

    // Ensure telemetry is disabled for faster execution
    // This prevents the VM from showing telemetry notices and sending analytics
    ensure_telemetry_disabled(&effective_ws)?;

    // Configure VM executor
    let config = VmConfig {
        prose_md_path: effective_ws.join("prose.md"),
        workspace_root: effective_ws.clone(),
        model: model.unwrap_or_else(|| "sonnet".to_string()),
        skip_permissions: skip_permissions.unwrap_or(true), // Default to true for now (no approval UI yet)
        resume_id: resume_id.clone(),
        enable_mcp: enable_mcp.unwrap_or(true),
    };

    let executor = VmExecutor::new(config);

    // Create event channel
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<VmEvent>(100);

    // Start execution (non-blocking - returns immediately)
    // If run_id is provided (e.g., from fork), use that existing run directory
    let handle = executor
        .execute(&program_content, event_tx, run_id)
        .await
        .map_err(|e| format!("Failed to start VM: {}", e))?;

    let run_id = handle.run_id.clone();

    // Store MCP server for feedback/puppet mode responses
    {
        let mut state = vm_state.lock().await;
        state.run_id = Some(run_id.clone());
        state.mcp_server = handle.mcp_server;
    }

    // Spawn task to process events and forward to frontend
    let app_clone = app.clone();
    let effective_ws_clone = effective_ws.clone();
    let vm_state_clone = vm_state.inner().clone();

    tokio::spawn(async move {
        let mut trace_file: Option<File> = None;
        let mut saw_completion = false;

        while let Some(event) = event_rx.recv().await {
            // Convert VmEvent to TraceEvent for consistency
            let trace_event = vm_event_to_trace(&event);

            // Create trace file on first event (VmStarted)
            if let VmEvent::VmStarted { ref run_id, .. } = event {
                let path = trace_path(&effective_ws_clone, run_id);
                trace_file = File::create(&path).ok();
            }

            // Write to trace file (flush immediately for real-time streaming)
            if let Some(ref mut f) = trace_file {
                if let Ok(json) = serde_json::to_string(&trace_event) {
                    let _ = writeln!(f, "{}", json);
                    let _ = f.flush(); // Ensure event is written immediately
                }
            }

            // Emit to frontend
            let _ = app_clone.emit("vm_event", &event);
            let _ = app_clone.emit("runtime_event", &trace_event);

            // Clear VM state on completion
            if matches!(event, VmEvent::VmCompleted { .. }) {
                saw_completion = true;
                let mut state = vm_state_clone.lock().await;
                state.run_id = None;
                state.mcp_server = None;
            }
        }

        // Channel closed without VmCompleted - emit error and cleanup
        if !saw_completion {
            let mut state = vm_state_clone.lock().await;
            if let Some(ref run_id) = state.run_id {
                let mut error_trace = TraceEvent::new(run_id, "ErrorOccurred");
                error_trace.put_str("message", "VM event channel closed unexpectedly");
                error_trace.put_str("error_type", "ChannelClosed");

                // Write to trace file
                if let Some(ref mut f) = trace_file {
                    if let Ok(json) = serde_json::to_string(&error_trace) {
                        let _ = writeln!(f, "{}", json);
                        let _ = f.flush();
                    }
                }

                // Emit to frontend
                let _ = app_clone.emit("runtime_event", &error_trace);
            }
            state.run_id = None;
            state.mcp_server = None;
        }
    });

    Ok(RunProseResult {
        run_id,
        success: true,
        message: None,
    })
}

/// Submit feedback response to an MCP request (vm_ask_user or vm_suspend)
#[tauri::command]
async fn submit_feedback(
    vm_state: tauri::State<'_, Arc<Mutex<VmState>>>,
    request_id: String,
    response: serde_json::Value,
) -> Result<(), String> {
    let state = vm_state.lock().await;

    if let Some(ref mcp_server) = state.mcp_server {
        mcp_server
            .respond(&request_id, response)
            .await
            .map_err(|e| format!("Failed to submit feedback: {}", e))
    } else {
        Err("No active VM session".to_string())
    }
}

/// Convert a VmEvent to a TraceEvent for logging and UI
fn vm_event_to_trace(event: &VmEvent) -> TraceEvent {
    match event {
        VmEvent::VmStarted { run_id, program_file, resume_id } => {
            let mut trace = TraceEvent::new(run_id, "RunStarted");
            trace.put_str("program_file", program_file);
            if let Some(rid) = resume_id {
                trace.put_str("resume_id", rid);
            }
            trace
        }
        VmEvent::Thought { run_id, text, position } => {
            let mut trace = TraceEvent::new(run_id, "Thought");
            trace.put_str("text", text);
            if let Some(pos) = position {
                trace.put_str("position", pos);
            }
            trace
        }
        VmEvent::ContextAssembled { run_id, session_id, token_estimate, prompt_tokens } => {
            let mut trace = TraceEvent::new(run_id, "ContextAssembled");
            trace.session_id = Some(session_id.clone());
            trace.put_json("token_estimate", serde_json::json!(*token_estimate));
            // Add a synthetic "prompt" binding for the Context tab
            trace.put_json("included_bindings", serde_json::json!([{
                "name": "prompt",
                "binding_id": "prompt",
                "token_count": *prompt_tokens,
                "reason": "required",
                "reason_detail": "Session prompt"
            }]));
            trace
        }
        VmEvent::SessionSpawned { run_id, session_id, agent, prompt_preview } => {
            let mut trace = TraceEvent::new(run_id, "SessionStarted");
            trace.session_id = Some(session_id.clone());
            if let Some(a) = agent {
                trace.put_str("agent_id", a);
            }
            trace.put_str("prompt_preview", prompt_preview);
            trace
        }
        VmEvent::SessionCompleted { run_id, session_id, output_preview } => {
            let mut trace = TraceEvent::new(run_id, "SessionCompleted");
            trace.session_id = Some(session_id.clone());
            trace.put_str("output_preview", output_preview);
            trace
        }
        VmEvent::SessionFailed { run_id, session_id, error } => {
            let mut trace = TraceEvent::new(run_id, "SessionFailed");
            trace.session_id = Some(session_id.clone());
            trace.put_str("error", error);
            trace
        }
        VmEvent::BindingCreated { run_id, name, path } => {
            let mut trace = TraceEvent::new(run_id, "BindingMaterialized");
            trace.put_str("name", name);
            trace.put_str("path", path);
            // Use path as stable binding_id (since we don't have a content hash)
            trace.put_str("binding_id", path);
            trace
        }
        VmEvent::ToolCall { run_id, tool_id, tool_name, input_preview } => {
            let mut trace = TraceEvent::new(run_id, "ToolCall");
            trace.put_str("tool_id", tool_id);
            trace.put_str("tool_name", tool_name);
            trace.put_str("input_preview", input_preview);
            trace
        }
        VmEvent::ToolResult { run_id, tool_id, is_error, output_preview } => {
            let mut trace = TraceEvent::new(run_id, "ToolResult");
            trace.put_str("tool_id", tool_id);
            trace.put_json("is_error", serde_json::json!(*is_error));
            trace.put_str("output_preview", output_preview);
            trace
        }
        VmEvent::TextOutput { run_id, text } => {
            let mut trace = TraceEvent::new(run_id, "TextOutput");
            trace.put_str("text", text);
            trace
        }
        VmEvent::Processing { run_id } => {
            TraceEvent::new(run_id, "Processing")
        }
        VmEvent::StreamingText { run_id, text, is_complete } => {
            let mut trace = TraceEvent::new(run_id, "StreamingText");
            trace.put_str("text", text);
            trace.put_json("is_complete", serde_json::json!(*is_complete));
            trace
        }
        VmEvent::FeedbackRequested { run_id, request_id, summary, question, options } => {
            let mut trace = TraceEvent::new(run_id, "HumanInterventionRequested");
            trace.put_str("request_id", request_id);
            trace.put_str("summary", summary);
            if let Some(q) = question {
                trace.put_str("question", q);
            }
            trace.put_json("options", serde_json::json!(options));
            trace
        }
        VmEvent::PuppetModeEntered { run_id, request_id, session_id, context_summary } => {
            let mut trace = TraceEvent::new(run_id, "PuppetModeEntered");
            trace.put_str("request_id", request_id);
            trace.session_id = Some(session_id.clone());
            trace.put_str("context_summary", context_summary);
            trace
        }
        VmEvent::VmCompleted { run_id, success, error, resume_id, stderr_tail } => {
            let event_type = if *success { "RunCompleted" } else { "RunFailed" };
            let mut trace = TraceEvent::new(run_id, event_type);
            trace.put_json("success", serde_json::json!(*success));
            if let Some(e) = error {
                trace.put_str("message", e); // Use "message" for error details (consistent with RunFailed)
            }
            if let Some(rid) = resume_id {
                trace.put_str("resume_id", rid);
            }
            if let Some(tail) = stderr_tail {
                trace.put_str("stderr_tail", tail);
            }
            trace
        }
    }
}

/// List prose files in a workspace
#[tauri::command]
fn list_prose_files(workspace_root: Option<String>) -> Result<Vec<String>, String> {
    let root = effective_workspace(workspace_root.as_deref());
    let mut files = Vec::new();

    // Look for .prose files in the workspace
    let patterns = [
        root.join("*.prose"),
        root.join("**/*.prose"),
    ];

    for pattern in &patterns {
        if let Ok(entries) = glob::glob(&pattern.to_string_lossy()) {
            for entry in entries.flatten() {
                if let Some(path_str) = entry.to_str() {
                    files.push(path_str.to_string());
                }
            }
        }
    }

    files.sort();
    files.dedup();
    Ok(files)
}
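With the glob crate's default options, `**` also matches the directory it starts in, so `root/**/*.prose` already covers `root/*.prose` (hence the `dedup`). Those defaults also do not require a literal leading dot, so the recursive pattern descends into `.prose/runs/` and lists every run's `program.prose` copy next to the user's sources. A workspace path containing glob metacharacters such as `[` would also be misread as part of the pattern. The std-only sketch below walks the tree directly and skips dot-directories. It assumes that excluding run copies is the desired scope, and `find_prose_files` is a hypothetical name.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical replacement for the glob scan: recursively collect `*.prose`
/// files under `root`, skipping directories whose names start with '.'
/// (e.g. `.prose/`, `.git/`). Symlinks are skipped entirely, because
/// `DirEntry::file_type` does not follow them (this also avoids cycles).
fn find_prose_files(root: &Path) -> io::Result<Vec<PathBuf>> {
    let mut found = Vec::new();
    let mut pending = vec![root.to_path_buf()];
    while let Some(dir) = pending.pop() {
        for entry in fs::read_dir(&dir)? {
            let entry = entry?;
            let path = entry.path();
            let file_type = entry.file_type()?;
            if file_type.is_dir() {
                let hidden = entry.file_name().to_string_lossy().starts_with('.');
                if !hidden {
                    pending.push(path);
                }
            } else if file_type.is_file()
                && path.extension().map_or(false, |ext| ext == "prose")
            {
                found.push(path);
            }
        }
    }
    // Sorted output; no dedup needed because each file is visited once.
    found.sort();
    Ok(found)
}
```

The command could then map the result with `to_string_lossy` and `map_err(|e| e.to_string())`. Whether hidden user directories should also be skipped is a product decision this sketch does not settle.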

/// Clear all runs in a workspace
#[tauri::command]
fn clear_runs(workspace_root: Option<String>) -> Result<usize, String> {
    let root = effective_workspace(workspace_root.as_deref());
    let runs_dir = root.join(".prose").join("runs");

    if !runs_dir.exists() {
        return Ok(0);
    }

    let mut count = 0;
    let entries = std::fs::read_dir(&runs_dir)
        .map_err(|e| format!("Failed to read runs directory: {}", e))?;

    for entry in entries.flatten() {
        if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
            if std::fs::remove_dir_all(entry.path()).is_ok() {
                count += 1;
            }
        }
    }

    Ok(count)
}

/// Delete a specific run
#[tauri::command]
fn delete_run(workspace_root: String, run_id: String) -> Result<(), String> {
    ensure_safe_component("run_id", &run_id)?;
    let root = PathBuf::from(&workspace_root);
    let dir = run_dir(&root, &run_id);

    if !dir.exists() {
        return Err(format!("Run {} not found", run_id));
    }

    std::fs::remove_dir_all(&dir)
        .map_err(|e| format!("Failed to delete run: {}", e))
}

// =============================================================================
// E2E Testing: Interaction simulation (focus-free)
// =============================================================================

/// Interaction types for E2E testing - executed via webview.eval() so no focus stealing
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Interaction {
    Click { selector: String },
    ClickXy { x: f64, y: f64 },
    Type { selector: String, text: String },
    Clear { selector: String },
    Focus { selector: String },
}

#[tauri::command]
async fn simulate_interaction(
    webview: tauri::Webview,
    interaction: Interaction,
) -> Result<String, String> {
    let js = match interaction {
        Interaction::Click { selector } => {
            format!(
                r#"(function() {{
                    const el = document.querySelector({:?});
                    if (!el) return JSON.stringify({{ ok: false, error: "not found: " + {:?} }});
                    el.scrollIntoView({{ block: "center" }});
                    el.click();
                    return JSON.stringify({{ ok: true }});
                }})()"#,
                selector, selector
            )
        }
        Interaction::ClickXy { x, y } => {
            format!(
                r#"(function() {{
                    const el = document.elementFromPoint({}, {});
                    if (!el) return JSON.stringify({{ ok: false, error: "no element at ({}, {})" }});
                    el.dispatchEvent(new MouseEvent('click', {{
                        view: window, bubbles: true, cancelable: true, clientX: {}, clientY: {}
                    }}));
                    return JSON.stringify({{ ok: true, tag: el.tagName }});
                }})()"#,
                x, y, x, y, x, y
            )
        }
        Interaction::Type { selector, text } => {
            format!(
                r#"(function() {{
                    const el = document.querySelector({:?});
                    if (!el) return JSON.stringify({{ ok: false, error: "not found: " + {:?} }});
                    el.focus();
                    if (el.tagName === 'INPUT' || el.tagName === 'TEXTAREA') {{
                        el.value = {:?};
                        el.dispatchEvent(new Event('input', {{ bubbles: true }}));
                        el.dispatchEvent(new Event('change', {{ bubbles: true }}));
                    }} else if (el.isContentEditable) {{
                        el.textContent = {:?};
                        el.dispatchEvent(new Event('input', {{ bubbles: true }}));
                    }} else {{
                        return JSON.stringify({{ ok: false, error: "element not editable" }});
                    }}
                    return JSON.stringify({{ ok: true }});
                }})()"#,
                selector, selector, text, text
            )
        }
        Interaction::Clear { selector } => {
            format!(
                r#"(function() {{
                    const el = document.querySelector({:?});
                    if (!el) return JSON.stringify({{ ok: false, error: "not found: " + {:?} }});
                    if (el.tagName === 'INPUT' || el.tagName === 'TEXTAREA') {{
                        el.value = '';
                        el.dispatchEvent(new Event('input', {{ bubbles: true }}));
                    }} else if (el.isContentEditable) {{
                        el.textContent = '';
                        el.dispatchEvent(new Event('input', {{ bubbles: true }}));
                    }}
                    return JSON.stringify({{ ok: true }});
                }})()"#,
                selector, selector
            )
        }
        Interaction::Focus { selector } => {
            format!(
                r#"(function() {{
                    const el = document.querySelector({:?});
                    if (!el) return JSON.stringify({{ ok: false, error: "not found: " + {:?} }});
                    el.focus();
                    return JSON.stringify({{ ok: true }});
                }})()"#,
                selector, selector
            )
        }
    };

    webview.eval(&js).map_err(|e| e.to_string())?;
    // `eval` is fire-and-forget: the JSON string returned by the IIFE
    // (including `ok: false` for a missing element) is discarded, so this
    // only confirms the script was dispatched, not that the interaction succeeded.
    Ok(r#"{"ok":true}"#.to_string())
}
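The scripts above interpolate selectors and text with `{:?}`, Rust's `Debug` format. That currently produces mostly JS-compatible string literals, but the standard library documents `Debug` output for its own types as not stable across Rust versions. Because the crate already depends on `serde_json`, `serde_json::to_string(&selector)` is the simplest replacement. The std-only sketch below shows what such an encoder has to escape. `js_string_literal` is a hypothetical helper, not part of the codebase.

```rust
/// Hypothetical helper: encode `s` as a double-quoted JavaScript string literal
/// using JSON-compatible escapes, instead of relying on `{:?}` formatting.
fn js_string_literal(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Other control characters, plus U+2028/U+2029, which pre-ES2019
            // engines treat as line terminators inside string literals.
            c if (c as u32) < 0x20 || c == '\u{2028}' || c == '\u{2029}' => {
                out.push_str(&format!("\\u{:04x}", c as u32));
            }
            c => out.push(c),
        }
    }
    out.push('"');
    out
}
```

Each `{:?}` placeholder would become `{}` with `js_string_literal(&selector)` (or `&text`) as the argument. Note that `serde_json::to_string` does not escape U+2028/U+2029, which only matters for older engines.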

// =============================================================================
// Snapshot for E2E testing (macOS only)
// =============================================================================

const SNAPSHOT_PATH: &str = "/tmp/mission-control-snapshot.png";

#[tauri::command]
async fn capture_snapshot(webview: tauri::Webview) -> Result<String, String> {
    #[cfg(target_os = "macos")]
    {
        use block::ConcreteBlock;

        let (tx, rx) = tokio::sync::oneshot::channel();

        let tx_mutex = std::sync::Arc::new(std::sync::Mutex::new(Some::<
            tokio::sync::oneshot::Sender<Result<String, String>>,
        >(tx)));

        webview
            .with_webview(move |webview_ptr| {
                unsafe {
                    let webview_id = webview_ptr.inner() as id;
                    let config: id = msg_send![class!(WKSnapshotConfiguration), new];

                    let tx_clone = tx_mutex.clone();
                    let block = ConcreteBlock::new(move |image: id, _error: id| {
                        let mut tx_lock = match tx_clone.lock() {
                            Ok(guard) => guard,
                            Err(poisoned) => poisoned.into_inner(), // Recover from poisoned mutex
                        };
                        if let Some(tx) = tx_lock.take() {
                            if image == nil {
                                let _ = tx.send(Err("Snapshot returned nil image".to_string()));
                                return;
                            }

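                            let tiff_data: id = msg_send![image, TIFFRepresentation];
                            let bitmap_rep: id =
                                msg_send![class!(NSBitmapImageRep), imageRepWithData:tiff_data];
                            // 4 == NSBitmapImageFileTypePNG
                            let png_data: id =
                                msg_send![bitmap_rep, representationUsingType:4 properties:nil];
                            // Guard against a nil NSData: from_raw_parts must never
                            // receive a null pointer.
                            if png_data == nil {
                                let _ = tx.send(Err("Failed to encode snapshot as PNG".to_string()));
                                return;
                            }

                            let bytes: *const u8 = msg_send![png_data, bytes];
                            let length: usize = msg_send![png_data, length];
                            let data = std::slice::from_raw_parts(bytes, length);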

                            match std::fs::write(SNAPSHOT_PATH, data) {
                                Ok(_) => {
                                    let _ = tx.send(Ok(SNAPSHOT_PATH.to_string()));
                                }
                                Err(e) => {
                                    let _ = tx.send(Err(e.to_string()));
                                }
                            }
                        }
                    });

                    let block_copy = block.copy();
                    let _: () = msg_send![webview_id, takeSnapshotWithConfiguration:config completionHandler:block_copy];
                }
            })
            .map_err(|e| e.to_string())?;

        rx.await
            .map_err(|e: tokio::sync::oneshot::error::RecvError| e.to_string())?
    }

    #[cfg(not(target_os = "macos"))]
    {
        Err("Snapshot only supported on macOS".to_string())
    }
}
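`capture_snapshot` awaits `rx` with no upper bound. If the sender is dropped, `rx` resolves with an error. If WebKit instead keeps the completion block alive but never invokes it (an assumed failure mode, not one observed here), the command never resolves. In the async code, `tokio::time::timeout(Duration::from_secs(..), rx)` would bound the wait. The sketch below shows the same take-once `Arc<Mutex<Option<Sender>>>` pattern plus a bounded wait using only `std::sync::mpsc`, so it runs without tokio. `once_callback` and `wait_for_result` are hypothetical names.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;

/// Wraps a one-shot sender so a callback that may fire more than once
/// (possibly from another thread) delivers at most one result.
fn once_callback<T: Send + 'static>(tx: mpsc::Sender<T>) -> impl Fn(T) + Send + Sync + 'static {
    let slot = Arc::new(Mutex::new(Some(tx)));
    move |value: T| {
        // Recover from a poisoned mutex, as the original completion block does.
        let mut guard = slot.lock().unwrap_or_else(|p| p.into_inner());
        if let Some(tx) = guard.take() {
            let _ = tx.send(value);
        }
    }
}

/// Waits for the callback's result, failing after `timeout` instead of hanging.
/// A dropped callback (sender gone) fails immediately with a disconnect error.
fn wait_for_result<T>(rx: mpsc::Receiver<T>, timeout: Duration) -> Result<T, String> {
    rx.recv_timeout(timeout)
        .map_err(|e| format!("snapshot callback did not complete: {e}"))
}
```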

pub fn run() {
    let mut builder = tauri::Builder::default()
        .plugin(tauri_plugin_dialog::init())
        .plugin(tauri_plugin_opener::init())
        .manage(Mutex::new(WatchState::default()))
        .manage(Arc::new(Mutex::new(VmState::default())))
        .invoke_handler(tauri::generate_handler![
            list_runs,
            read_run_events,
            read_run_program,
            read_run_binding,
            append_intervention,
            fork_run,
            blame_session,
            checkpoint_list,
            checkpoint_create,
            checkpoint_restore,
            checkpoint_diff,
            watch_run,
            watch_stop,
            save_file,
            open_file,
            lint_prose,
            run_prose,
            list_prose_files,
            clear_runs,
            delete_run,
            run_prose_vm,
            submit_feedback,
            capture_snapshot,
            simulate_interaction
        ]);

    // E2E testing: MCP plugin for CLI interaction (debug builds only)
    #[cfg(debug_assertions)]
    {
        let socket_path = std::path::PathBuf::from("/tmp/mission-control-mcp.sock");
        if socket_path.exists() {
            let _ = std::fs::remove_file(&socket_path);
        }
        builder = builder.plugin(tauri_plugin_mcp::init_with_config(
            tauri_plugin_mcp::PluginConfig::new("mission-control".to_string())
                .socket_path(socket_path)
                .start_socket_server(true),
        ));
    }

    builder
        .run(tauri::generate_context!())
        .expect("error while running tauri application");
}

File: ide/crates/prose-runtime/src/lib.rs

mod claude;
mod executor;
mod mcp_server;
mod rpc;
mod suspension;
mod vm;

pub use claude::{run_claude, ClaudeConfig, ClaudeEvent};
pub use executor::{Executor, ExecutorConfig, RuntimeEvent};
pub use mcp_server::{McpEvent, McpResponse, McpServer};
pub use rpc::{RuntimeServer, RuntimeClient};
pub use suspension::{ExecutionState, ResumeData, SuspensionPoint};
pub use vm::{VmConfig, VmEvent, VmExecutionHandle, VmExecutor, SessionState};

File: ide/crates/prose-runtime/src/vm.rs

//! Claude-as-VM executor.
//!
//! Incorporates lessons from conductor codebase:
//! - Stderr tail capture for debugging
//! - Session state persistence with resume tokens
//! - Robust process exit handling
//! - Atomic JSON line parsing

use crate::mcp_server::{McpEvent, McpServer};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;

/// Maximum stderr lines to keep for debugging (from conductor).
const STDERR_TAIL_LINES: usize = 200;

/// Events emitted during VM execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event_type")]
pub enum VmEvent {
    /// VM started executing a program.
    VmStarted {
        run_id: String,
        program_file: String,
        resume_id: Option<String>,
    },
    /// VM reported a thought/reasoning step.
    Thought {
        run_id: String,
        text: String,
        position: Option<String>,
    },
    /// A session was spawned via Task tool.
    SessionSpawned {
        run_id: String,
        session_id: String,
        agent: Option<String>,
        prompt_preview: String,
    },
    /// Context was assembled for a session (token budget tracking).
    ContextAssembled {
        run_id: String,
        session_id: String,
        token_estimate: u32,
        prompt_tokens: u32,
    },
    /// A session completed successfully.
    SessionCompleted {
        run_id: String,
        session_id: String,
        output_preview: String,
    },
    /// A session failed with an error.
    SessionFailed {
        run_id: String,
        session_id: String,
        error: String,
    },
    /// A binding was created/materialized.
    BindingCreated {
        run_id: String,
        name: String,
        path: String,
    },
    /// A tool was called (generic, for visibility).
    ToolCall {
        run_id: String,
        tool_id: String,
        tool_name: String,
        input_preview: String,
    },
    /// A tool returned a result.
    ToolResult {
        run_id: String,
        tool_id: String,
        is_error: bool,
        output_preview: String,
    },
    /// Claude output text (narration, etc).
    TextOutput {
        run_id: String,
        text: String,
    },
    /// Claude has started processing (message_start received).
    Processing {
        run_id: String,
    },
    /// Streaming text update (partial, for progress indication).
    StreamingText {
        run_id: String,
        text: String,
        is_complete: bool,
    },
    /// Feedback requested (suspension point) - from MCP vm_ask_user.
    FeedbackRequested {
        run_id: String,
        request_id: String,
        summary: String,
        question: Option<String>,
        options: Vec<String>,
    },
    /// Puppet mode suspension requested - from MCP vm_suspend.
    PuppetModeEntered {
        run_id: String,
        request_id: String,
        session_id: String,
        context_summary: String,
    },
    /// VM execution completed.
    VmCompleted {
        run_id: String,
        success: bool,
        error: Option<String>,
        resume_id: Option<String>,
        stderr_tail: Option<String>,
    },
}

/// Session state for persistence (like conductor's session.json).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionState {
    pub run_id: String,
    pub resume_id: Option<String>,
    pub model: String,
    pub started_at: String,
    pub updated_at: String,
}

/// Configuration for VM execution.
#[derive(Debug, Clone)]
pub struct VmConfig {
    /// Path to prose.md (VM specification).
    pub prose_md_path: PathBuf,
    /// Working directory for execution.
    pub workspace_root: PathBuf,
    /// Model to use (sonnet, opus, haiku).
    pub model: String,
    /// Skip permission prompts.
    pub skip_permissions: bool,
    /// Resume ID from previous session (for continuity).
    pub resume_id: Option<String>,
    /// Enable MCP server for feedback/puppet mode.
    pub enable_mcp: bool,
}

impl Default for VmConfig {
    fn default() -> Self {
        Self {
            prose_md_path: PathBuf::from("prose.md"),
            workspace_root: std::env::current_dir().unwrap_or_default(),
            model: "sonnet".to_string(),
            skip_permissions: false,
            resume_id: None,
            enable_mcp: true,
        }
    }
}

/// Result of VM execution, including MCP handle for responding to requests.
pub struct VmExecutionHandle {
    pub run_id: String,
    pub mcp_server: Option<McpServer>,
}

/// The VM executor - runs Claude as the OpenProse VM.
pub struct VmExecutor {
    config: VmConfig,
}

impl VmExecutor {
    pub fn new(config: VmConfig) -> Self {
        Self { config }
    }

    /// Execute a .prose program using Claude as the VM.
    /// Returns a handle with the run_id and optional MCP server for responding to requests.
    /// The execution runs in the background - listen for VmCompleted event for completion.
    /// If `existing_run_id` is provided, uses that run directory (from fork) instead of creating new.
    pub async fn execute(
        &self,
        program_content: &str,
        event_tx: mpsc::Sender<VmEvent>,
        existing_run_id: Option<String>,
    ) -> Result<VmExecutionHandle> {
        // Use provided run_id or generate new one
        let run_id = existing_run_id.unwrap_or_else(generate_run_id);
        let run_dir = self
            .config
            .workspace_root
            .join(".prose")
            .join("runs")
            .join(&run_id);

        // Create run directory structure (may already exist if forked)
        std::fs::create_dir_all(run_dir.join("bindings"))?;
        std::fs::create_dir_all(run_dir.join("agents"))?;

        // Save/overwrite the program
        std::fs::write(run_dir.join("program.prose"), program_content)?;

        // Read prose.md (VM specification)
        let prose_md = std::fs::read_to_string(&self.config.prose_md_path)
            .unwrap_or_else(|_| include_str!("../../../../skills/open-prose/prose.md").to_string());

        // Build the VM prompt
        let prompt = build_vm_prompt(
            &prose_md,
            program_content,
            &self.config.workspace_root,
            &run_id,
        );

        // Start MCP server if enabled
        let (mcp_server, mcp_config_path) = if self.config.enable_mcp {
            let (server, mcp_event_rx) = McpServer::start().await?;

            // Write MCP config to temp file
            let config_path = run_dir.join("mcp-config.json");
            let config = server.mcp_config();
            std::fs::write(&config_path, serde_json::to_string_pretty(&config)?)?;

            // Forward MCP events to VmEvent channel
            let event_tx_mcp = event_tx.clone();
            let run_id_mcp = run_id.clone();

            tokio::spawn(async move {
                let mut rx = mcp_event_rx;
                while let Some(mcp_event) = rx.recv().await {
                    let vm_event = match mcp_event {
                        McpEvent::AskUser {
                            request_id,
                            summary,
                            question,
                            options,
                        } => VmEvent::FeedbackRequested {
                            run_id: run_id_mcp.clone(),
                            request_id,
                            summary,
                            question,
                            options,
                        },
                        McpEvent::Suspend {
                            request_id,
                            session_id,
                            context_summary,
                        } => VmEvent::PuppetModeEntered {
                            run_id: run_id_mcp.clone(),
                            request_id,
                            session_id,
                            context_summary,
                        },
                    };
                    if event_tx_mcp.send(vm_event).await.is_err() {
                        break;
                    }
                }
            });

            (Some(server), Some(config_path))
        } else {
            (None, None)
        };

        // Emit start event (with existing resume_id if resuming)
        event_tx
            .send(VmEvent::VmStarted {
                run_id: run_id.clone(),
                program_file: "program.prose".to_string(),
                resume_id: self.config.resume_id.clone(),
            })
            .await?;

        // Build claude CLI args
        let mut args = vec![
            "-p".to_string(),
            "--output-format".to_string(),
            "stream-json".to_string(),
            "--verbose".to_string(),
            "--model".to_string(),
            self.config.model.clone(),
        ];

        if self.config.skip_permissions {
            args.push("--dangerously-skip-permissions".to_string());
        }

        if let Some(ref resume) = self.config.resume_id {
            args.push("--resume".to_string());
            args.push(resume.clone());
        }

        // Add MCP config if enabled
        if let Some(ref config_path) = mcp_config_path {
            args.push("--mcp-config".to_string());
            args.push(config_path.display().to_string());
        }

        args.push("--".to_string());
        args.push(prompt);

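        // Spawn claude CLI. stdin is closed explicitly: the prompt is passed as an
        // argument, and an inherited stdin could be read or waited on by the CLI.
        let mut child = Command::new("claude")
            .args(&args)
            .current_dir(&self.config.workspace_root)
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?;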

        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| anyhow::anyhow!("Failed to capture stdout"))?;

        let stderr = child
            .stderr
            .take()
            .ok_or_else(|| anyhow::anyhow!("Failed to capture stderr"))?;

        // Parse stdout stream (each line is complete JSON - conductor pattern)
        let event_tx_stdout = event_tx.clone();
        let run_id_stdout = run_id.clone();
        let run_dir_stdout = run_dir.clone();
        let stdout_task = tokio::spawn(async move {
            let mut reader = BufReader::new(stdout).lines();
            let mut parser = VmOutputParser::new(run_id_stdout, run_dir_stdout);

            while let Ok(Some(line)) = reader.next_line().await {
                // Each line is atomic JSON (conductor pattern)
                if let Ok(value) = serde_json::from_str::<Value>(&line) {
                    for event in parser.parse(&value) {
                        if event_tx_stdout.send(event).await.is_err() {
                            break;
                        }
                    }
                }
            }
            parser.resume_id
        });

        // Capture stderr tail for debugging (conductor pattern: last 200 lines)
        let stderr_task = tokio::spawn(async move {
            let mut reader = BufReader::new(stderr).lines();
            let mut tail: VecDeque<String> = VecDeque::with_capacity(STDERR_TAIL_LINES);

            while let Ok(Some(line)) = reader.next_line().await {
                if tail.len() >= STDERR_TAIL_LINES {
                    tail.pop_front();
                }
                tail.push_back(line);
            }

            tail.into_iter().collect::<Vec<_>>().join("\n")
        });

        // Spawn background task for process completion
        let event_tx_completion = event_tx;
        let run_id_completion = run_id.clone();
        let model = self.config.model.clone();
        // Record the start time now (the CLI was just spawned); computing it
        // inside the completion task would make it equal to updated_at.
        let started_at = chrono::Utc::now().to_rfc3339();
        tokio::spawn(async move {
            // Wait for process exit
            let status = child.wait().await;
            let resume_id = stdout_task.await.ok().flatten();
            let stderr_tail = stderr_task.await.unwrap_or_default();

            // Save session state for resumption (conductor pattern)
            let session_state = SessionState {
                run_id: run_id_completion.clone(),
                resume_id: resume_id.clone(),
                model,
                started_at,
                updated_at: chrono::Utc::now().to_rfc3339(),
            };
            let session_path = run_dir.join("session.json");
            if let Ok(json) = serde_json::to_string_pretty(&session_state) {
                let _ = std::fs::write(&session_path, json);
            }

            let (success, error) = match status {
                Ok(s) if s.success() => (true, None),
                Ok(s) => {
                    let err_msg = format!("Exit code: {:?}", s.code());
                    let error = if !stderr_tail.is_empty() {
                        Some(format!("{}\n\nStderr:\n{}", err_msg, stderr_tail))
                    } else {
                        Some(err_msg)
                    };
                    (false, error)
                }
                Err(e) => (false, Some(e.to_string())),
            };

            let _ = event_tx_completion
                .send(VmEvent::VmCompleted {
                    run_id: run_id_completion,
                    success,
                    error,
                    resume_id,
                    stderr_tail: if stderr_tail.is_empty() {
                        None
                    } else {
                        Some(stderr_tail)
                    },
                })
                .await;
        });

        Ok(VmExecutionHandle {
            run_id,
            mcp_server,
        })
    }
}
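The stdout task in `execute` reads with `while let Ok(Some(line))`, so the first read error ends the loop. tokio's `next_line` reports invalid UTF-8 as such an error. Once the task returns, the pipe's read end is dropped and further writes by the CLI hit a closed pipe. Lines that fail to parse as JSON are dropped without a trace. The inner `break` only leaves the `for`, so the loop keeps reading after the receiver is gone, which happens to keep the pipe drained.

The sketch below is blocking and std-only, and it makes those policies explicit. `drain_lines`, `DrainStats`, and the `parse`/`deliver` closures are hypothetical stand-ins for `serde_json::from_str` and `event_tx.send`. The same structure should carry over to tokio via `AsyncBufReadExt::read_until`.

```rust
use std::io::{self, BufRead};

/// Counters for one pass over the child's stdout.
#[derive(Debug, Default, PartialEq)]
struct DrainStats {
    delivered: usize,
    skipped: usize,
}

/// Reads newline-delimited records until EOF. Invalid UTF-8 costs one record
/// (lossy decoding) rather than ending the stream, and reading continues after
/// the consumer goes away so the child never blocks on a full pipe.
fn drain_lines<R, P, D, T>(mut reader: R, mut parse: P, mut deliver: D) -> io::Result<DrainStats>
where
    R: BufRead,
    P: FnMut(&str) -> Option<T>,
    D: FnMut(T) -> bool, // returns false once the receiver is gone
{
    let mut stats = DrainStats::default();
    let mut consumer_alive = true;
    let mut buf = Vec::new();
    loop {
        buf.clear();
        if reader.read_until(b'\n', &mut buf)? == 0 {
            return Ok(stats); // EOF: the child closed stdout
        }
        if !consumer_alive {
            continue; // keep draining, but skip parsing
        }
        let line = String::from_utf8_lossy(&buf);
        match parse(line.trim_end()) {
            Some(record) => {
                if deliver(record) {
                    stats.delivered += 1;
                } else {
                    consumer_alive = false;
                }
            }
            None => stats.skipped += 1,
        }
    }
}
```

Surfacing `skipped` (or logging the first few bad lines) would reveal stream-format changes that the current `if let Ok(value)` silently hides.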

/// Build the prompt that makes Claude become the VM.
fn build_vm_prompt(prose_md: &str, program: &str, workspace: &Path, run_id: &str) -> String {
    format!(
        r#"You are the OpenProse VM. Read and internalize the VM specification below, then execute the program that follows.

CRITICAL: You ARE the virtual machine. This is not a simulation—you perform real execution:
- Each `session` statement spawns a real subagent via the Task tool
- Write bindings to the state directory as specified
- Use the narration protocol to track your execution state
- Evaluate discretion conditions (`**...**`) with your intelligence

=== VM SPECIFICATION ===
{prose_md}

=== PROGRAM TO EXECUTE ===
```prose
{program}
```

=== EXECUTION CONTEXT ===
Working directory: {workspace}
Run ID: {run_id}
State directory: {workspace}/.prose/runs/{run_id}/
Bindings directory: {workspace}/.prose/runs/{run_id}/bindings/

=== INSTRUCTIONS ===
1. Parse the program structure (imports, agents, blocks, statements)
2. Execute statements in order
3. For each session statement, use the Task tool to spawn a subagent
4. Write outputs to bindings/ as markdown files
5. Track your position using narration: [Position] statement N, [Binding] name, etc.

Begin execution now. Report your reasoning as you go."#,
        prose_md = prose_md,
        program = program,
        workspace = workspace.display(),
        run_id = run_id,
    )
}
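`build_vm_prompt` wraps the program in a fixed three-backtick `prose` fence. If a program itself contains a run of three backticks, for example a Markdown snippet inside a prompt string, the model may read the block as ending early. CommonMark requires a closing fence to be at least as long as its opening fence, which suggests a simple remedy: pick a fence longer than any backtick run in the body. `fence` below is a hypothetical helper, not part of the codebase.

```rust
/// Wraps `body` in a backtick fence that is longer than any run of backticks
/// inside it (minimum three), so embedded fence lines cannot close it early.
fn fence(info: &str, body: &str) -> String {
    let mut longest = 0;
    let mut run = 0;
    for c in body.chars() {
        if c == '`' {
            run += 1;
            longest = longest.max(run);
        } else {
            run = 0;
        }
    }
    let ticks = "`".repeat((longest + 1).max(3));
    format!("{ticks}{info}\n{body}\n{ticks}")
}
```

In `build_vm_prompt`, the two literal fence lines around `{program}` would then become a single `{program_block}` placeholder, with `program_block = fence("prose", program)` passed to `format!`.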

/// Persist resume_id immediately when captured (for crash recovery).
/// This spawns a blocking task to avoid blocking the async runtime.
fn persist_resume_id(run_dir: PathBuf, resume_id: String) {
    // Spawn blocking task to avoid blocking async runtime
    tokio::task::spawn_blocking(move || {
        let session_path = run_dir.join("session.json");

        // Read existing session state or create minimal one
        let mut state: serde_json::Value = if let Ok(content) = std::fs::read_to_string(&session_path) {
            serde_json::from_str(&content).unwrap_or_else(|_| serde_json::json!({}))
        } else {
            serde_json::json!({})
        };

        // Update resume_id and timestamp
        state["resume_id"] = serde_json::json!(resume_id);
        state["updated_at"] = serde_json::json!(chrono::Utc::now().to_rfc3339());

        // Write atomically (write to temp then rename)
        let tmp_path = session_path.with_extension("json.tmp");
        if let Ok(json) = serde_json::to_string_pretty(&state) {
            if std::fs::write(&tmp_path, &json).is_ok() {
                let _ = std::fs::rename(&tmp_path, &session_path);
            }
        }
    });
}
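`persist_resume_id` does a read-modify-write of `session.json` on a blocking thread. Meanwhile, the completion task in `execute` overwrites the same file with a plain, non-atomic `std::fs::write`. Nothing orders the two. For a very short run, the early update could in principle read the file before the final write and then rename its older snapshot over the complete state. One way to close that gap, assuming every writer lives in this process, is to route all updates through one helper. That helper holds a process-wide lock across the whole read-modify-write and always replaces the file via temp-file-plus-rename. The std-only sketch below uses the hypothetical names `SESSION_FILE_LOCK` and `update_file_atomically`.

```rust
use std::fs;
use std::io;
use std::path::Path;
use std::sync::Mutex;

/// Hypothetical process-wide lock serializing every session.json update.
static SESSION_FILE_LOCK: Mutex<()> = Mutex::new(());

/// Applies `update` to the current contents (None if the file is missing or
/// unreadable) and replaces the file via write-to-temp + rename.
fn update_file_atomically<F>(path: &Path, update: F) -> io::Result<()>
where
    F: FnOnce(Option<String>) -> String,
{
    // Held for the whole read-modify-write, so updates cannot interleave.
    let _guard = SESSION_FILE_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let current = fs::read_to_string(path).ok();
    let next = update(current);
    let tmp = path.with_extension("json.tmp");
    fs::write(&tmp, next)?;
    // On Unix, rename within one filesystem replaces the target atomically.
    fs::rename(&tmp, path)
}
```

Both the resume-id persist (inside `spawn_blocking`) and the final `SessionState` write would call this helper, each merging its fields into the existing JSON rather than overwriting it.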

/// Parser for Claude's stream-json output, mapping to VM events.
struct VmOutputParser {
    run_id: String,
    run_dir: PathBuf,
    resume_id: Option<String>,
    pending_tools: HashMap<String, ToolInfo>,
    session_counter: u32,
    stream_state: StreamState,
}

struct ToolInfo {
    name: String,
    #[allow(dead_code)]
    input: Value,
    session_id: Option<String>, // For Task tools, the assigned session_id
}

/// Streaming state for real-time progress during Claude processing.
#[derive(Default)]
struct StreamState {
    /// Currently active content block type (text, thinking, tool_use).
    current_block_type: Option<String>,
    /// Buffer for accumulating text deltas.
    text_buffer: String,
    /// Count of deltas since last emit.
    delta_count: usize,
    /// Whether we've emitted the Processing event.
    processing_emitted: bool,
}

impl VmOutputParser {
    fn new(run_id: String, run_dir: PathBuf) -> Self {
        Self {
            run_id,
            run_dir,
            resume_id: None,
            pending_tools: HashMap::new(),
            session_counter: 0,
            stream_state: StreamState::default(),
        }
    }

fn parse(&mut self, value: &Value) -> Vec<VmEvent> {
    let mut events = Vec::new();
    let event_type = value.get("type").and_then(Value::as_str).unwrap_or("");

    match event_type {
        "system" => {
            // Extract resume token from init event (conductor pattern)
            if value.get("subtype").and_then(Value::as_str) == Some("init") {
                if let Some(sid) = value.get("session_id").and_then(Value::as_str) {
                    self.resume_id = Some(sid.to_string());
                    // Persist immediately for crash recovery (non-blocking)
                    persist_resume_id(self.run_dir.clone(), sid.to_string());
                }
            }
        }

        // Streaming events - for real-time progress indication
        "message_start" => {
            if !self.stream_state.processing_emitted {
                self.stream_state.processing_emitted = true;
                events.push(VmEvent::Processing {
                    run_id: self.run_id.clone(),
                });
            }
        }

        "content_block_start" => {
            // Track what type of content block is starting
            if let Some(block) = value.get("content_block").and_then(Value::as_object) {
                let block_type = block.get("type").and_then(Value::as_str).unwrap_or("");
                self.stream_state.current_block_type = Some(block_type.to_string());
                self.stream_state.text_buffer.clear();
                self.stream_state.delta_count = 0;
            }
        }

        "content_block_delta" => {
            if let Some(delta) = value.get("delta").and_then(Value::as_object) {
                let delta_type = delta.get("type").and_then(Value::as_str).unwrap_or("");

                let text = match delta_type {
                    "text_delta" => delta.get("text").and_then(Value::as_str),
                    "thinking_delta" => delta.get("thinking").and_then(Value::as_str),
                    _ => None,
                };

                if let Some(text) = text {
                    self.stream_state.text_buffer.push_str(text);
                    self.stream_state.delta_count += 1;

                    // Emit periodic updates (every 15 deltas or once the buffer reaches 500 bytes)
                    if self.stream_state.delta_count >= 15
                        || self.stream_state.text_buffer.len() >= 500
                    {
                        let is_thinking =
                            self.stream_state.current_block_type.as_deref() == Some("thinking");

                        if is_thinking {
                            events.push(VmEvent::Thought {
                                run_id: self.run_id.clone(),
                                text: self.stream_state.text_buffer.clone(),
                                position: None,
                            });
                        } else {
                            events.push(VmEvent::StreamingText {
                                run_id: self.run_id.clone(),
                                text: self.stream_state.text_buffer.clone(),
                                is_complete: false,
                            });
                        }

                        self.stream_state.text_buffer.clear();
                        self.stream_state.delta_count = 0;
                    }
                }
            }
        }

        "content_block_stop" => {
            // Flush any remaining buffered text
            if !self.stream_state.text_buffer.is_empty() {
                let is_thinking =
                    self.stream_state.current_block_type.as_deref() == Some("thinking");

                if is_thinking {
                    events.push(VmEvent::Thought {
                        run_id: self.run_id.clone(),
                        text: std::mem::take(&mut self.stream_state.text_buffer),
                        position: None,
                    });
                } else {
                    events.push(VmEvent::StreamingText {
                        run_id: self.run_id.clone(),
                        text: std::mem::take(&mut self.stream_state.text_buffer),
                        is_complete: true,
                    });
                }
            }
            self.stream_state.current_block_type = None;
        }

        "assistant" => {
            if let Some(message) = value.get("message").and_then(Value::as_object) {
                if let Some(content) = message.get("content").and_then(Value::as_array) {
                    for block in content {
                        let block_type =
                            block.get("type").and_then(Value::as_str).unwrap_or("");

                        match block_type {
                            "tool_use" => {
                                events.extend(self.parse_tool_use(block));
                            }
                            "tool_result" => {
                                events.extend(self.parse_tool_result(block));
                            }
                            "text" => {
                                if let Some(text) = block.get("text").and_then(Value::as_str) {
                                    events.push(VmEvent::TextOutput {
                                        run_id: self.run_id.clone(),
                                        text: text.to_string(),
                                    });

                                    // Extract narration markers
                                    events.extend(self.extract_narration(text));
                                }
                            }
                            "thinking" => {
                                if let Some(text) =
                                    block.get("thinking").and_then(Value::as_str)
                                {
                                    events.push(VmEvent::Thought {
                                        run_id: self.run_id.clone(),
                                        text: text.to_string(),
                                        position: None,
                                    });
                                }
                            }
                            _ => {}
                        }
                    }
                }
            }
        }

        "user" => {
            // User messages contain tool_result blocks (from internal tool execution)
            if let Some(message) = value.get("message").and_then(Value::as_object) {
                if let Some(content) = message.get("content").and_then(Value::as_array) {
                    for block in content {
                        let block_type =
                            block.get("type").and_then(Value::as_str).unwrap_or("");

                        if block_type == "tool_result" {
                            events.extend(self.parse_tool_result(block));
                        }
                    }
                }
            }
        }

        "result" => {
            // Completion event: usage stats could be extracted here,
            // but the event is currently ignored.
        }

        _ => {}
    }

    events
}

fn parse_tool_use(&mut self, block: &Value) -> Vec<VmEvent> {
    let Some(tool_id) = block.get("id").and_then(Value::as_str) else {
        return vec![];
    };
    let Some(name) = block.get("name").and_then(Value::as_str) else {
        return vec![];
    };
    let input = block.get("input").cloned().unwrap_or(Value::Null);

    // Check if this is a Task call (session spawn)
    if name == "Task" {
        self.session_counter += 1;
        let session_id = format!("s{:04}", self.session_counter);

        // Store with session_id for later matching with result
        self.pending_tools.insert(
            tool_id.to_string(),
            ToolInfo {
                name: name.to_string(),
                input: input.clone(),
                session_id: Some(session_id.clone()),
            },
        );

        let full_prompt = input
            .get("prompt")
            .and_then(Value::as_str)
            .unwrap_or_default();
        let prompt_preview = truncate(full_prompt, 100);
        let agent = input
            .get("subagent_type")
            .and_then(Value::as_str)
            .map(String::from);

        // Estimate tokens: ~4 chars per token
        // Don't add artificial overhead - let the breakdown match the total
        let prompt_tokens = (full_prompt.len() / 4).max(1) as u32;
        let token_estimate = prompt_tokens;

        return vec![
            VmEvent::ContextAssembled {
                run_id: self.run_id.clone(),
                session_id: session_id.clone(),
                token_estimate,
                prompt_tokens,
            },
            VmEvent::SessionSpawned {
                run_id: self.run_id.clone(),
                session_id,
                agent,
                prompt_preview,
            },
        ];
    }

    // Non-Task tool: store for later matching with result
    self.pending_tools.insert(
        tool_id.to_string(),
        ToolInfo {
            name: name.to_string(),
            input: input.clone(),
            session_id: None,
        },
    );

    // Generic tool call
    vec![VmEvent::ToolCall {
        run_id: self.run_id.clone(),
        tool_id: tool_id.to_string(),
        tool_name: name.to_string(),
        input_preview: truncate(&input.to_string(), 200),
    }]
}

fn parse_tool_result(&mut self, block: &Value) -> Vec<VmEvent> {
    let mut events = Vec::new();
    let tool_use_id = block
        .get("tool_use_id")
        .and_then(Value::as_str)
        .unwrap_or("");
    let is_error = block.get("is_error").and_then(Value::as_bool) == Some(true);

    let output_preview = extract_result_preview(block.get("content"));

    // Check if this was a Task call
    if let Some(tool_info) = self.pending_tools.remove(tool_use_id) {
        if tool_info.name == "Task" {
            // Use the stored session_id (or fallback to counter-based for legacy)
            let session_id = tool_info
                .session_id
                .unwrap_or_else(|| format!("s{:04}", self.session_counter));
            if is_error {
                events.push(VmEvent::SessionFailed {
                    run_id: self.run_id.clone(),
                    session_id,
                    error: output_preview.clone(),
                });
            } else {
                events.push(VmEvent::SessionCompleted {
                    run_id: self.run_id.clone(),
                    session_id,
                    output_preview: output_preview.clone(),
                });
            }
        }
    }

    events.push(VmEvent::ToolResult {
        run_id: self.run_id.clone(),
        tool_id: tool_use_id.to_string(),
        is_error,
        output_preview,
    });

    events
}

fn extract_narration(&self, text: &str) -> Vec<VmEvent> {
    let mut events = Vec::new();

    // Only [Binding] markers are extracted here; [Position] and [Thought] markers are not handled
    for line in text.lines() {
        let line = line.trim();

        if line.starts_with("[Binding]") {
            let rest = line.strip_prefix("[Binding]").unwrap_or("").trim();
            if let Some(name) = rest.split_whitespace().next() {
                events.push(VmEvent::BindingCreated {
                    run_id: self.run_id.clone(),
                    name: name.to_string(),
                    path: format!("bindings/{}.md", name),
                });
            }
        }
    }

    events
}

}
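The delta-batching policy in `parse` (flush after 15 deltas or once the buffer reaches 500 bytes, then flush the remainder on `content_block_stop`) can be unit-tested without any JSON by pulling it into a small state machine. A minimal std-only sketch; `DeltaBatcher`, `push`, and `finish` are hypothetical names, not part of the codebase:

```rust
/// Hypothetical, std-only model of the flush policy used for
/// `content_block_delta` / `content_block_stop` in `VmOutputParser::parse`.
#[derive(Default)]
pub struct DeltaBatcher {
    buffer: String,
    delta_count: usize,
}

impl DeltaBatcher {
    /// Same thresholds as the parser: 15 deltas or 500 bytes of buffered text.
    const MAX_DELTAS: usize = 15;
    const MAX_BYTES: usize = 500;

    /// Append one delta; returns the batched text when a threshold is reached.
    pub fn push(&mut self, delta: &str) -> Option<String> {
        self.buffer.push_str(delta);
        self.delta_count += 1;
        if self.delta_count >= Self::MAX_DELTAS || self.buffer.len() >= Self::MAX_BYTES {
            self.delta_count = 0;
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }

    /// Flush whatever is left at `content_block_stop`; `None` if the buffer is empty.
    pub fn finish(&mut self) -> Option<String> {
        self.delta_count = 0;
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}
```

One property this makes easy to check: a single delta longer than 500 bytes is emitted immediately and whole, so a batch can exceed the threshold.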

fn extract_result_preview(content: Option<&Value>) -> String {
    // Use a larger limit so expanded view shows meaningful content
    const MAX_OUTPUT: usize = 4000;
    match content {
        None => String::new(),
        Some(Value::String(text)) => truncate(text, MAX_OUTPUT),
        Some(Value::Array(items)) => {
            let mut parts = Vec::new();
            for item in items {
                if let Some(text) = item.get("text").and_then(Value::as_str) {
                    if !text.is_empty() {
                        parts.push(text.to_string());
                    }
                }
            }
            truncate(&parts.join("\n"), MAX_OUTPUT)
        }
        Some(other) => truncate(&other.to_string(), MAX_OUTPUT),
    }
}

fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        s.to_string()
    } else {
        // Find a valid UTF-8 boundary at or before max
        let mut end = max;
        while end > 0 && !s.is_char_boundary(end) {
            end -= 1;
        }
        format!("{}...", &s[..end])
    }
}
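`truncate` compares `s.len()`, which is a byte count, so `max` is a byte budget: `truncate("héllo", 2)` backs off to the char boundary at byte 1 and returns `"h..."`. If the preview limits are meant as character counts, a char-based variant is one option. A sketch assuming Unicode scalar values (`char`s) are the intended unit; `truncate_chars` is a hypothetical name:

```rust
/// Hypothetical char-count variant of `truncate`: `max` counts `char`s, not bytes.
pub fn truncate_chars(s: &str, max: usize) -> String {
    // `char_indices().nth(max)` yields the byte offset of the (max+1)-th char, if any
    match s.char_indices().nth(max) {
        // At most `max` chars: return the input unchanged
        None => s.to_string(),
        // Cut at a byte offset that is always a valid char boundary
        Some((byte_idx, _)) => format!("{}...", &s[..byte_idx]),
    }
}
```

For ASCII input it behaves like `truncate`; it differs only when multi-byte characters precede the cut.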

fn generate_run_id() -> String {
    use chrono::Utc;
    let now = Utc::now();
    let random: String = uuid::Uuid::new_v4().to_string()[..6].to_string();
    format!("{}-{}", now.format("%Y%m%d-%H%M%S"), random)
}

#[cfg(test)]
mod tests {
    use super::*;

#[test]
fn test_generate_run_id() {
    let id = generate_run_id();
    assert!(id.len() > 15); // YYYYMMDD-HHMMSS-random
    assert!(id.contains('-'));
}

#[test]
fn test_truncate() {
    assert_eq!(truncate("hello", 10), "hello");
    assert_eq!(truncate("hello world", 5), "hello...");
}

}
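`parse_tool_use` assigns each `Task` call a sequential `sNNNN` id and stores it under the `tool_use_id`, so `parse_tool_result` recovers the right session even when parallel `Task` calls finish out of order. A std-only sketch of just that bookkeeping, handy as a unit test of the matching logic; `SessionTracker` and its methods are hypothetical:

```rust
use std::collections::HashMap;

/// Hypothetical model of the Task-session bookkeeping in `VmOutputParser`.
#[derive(Default)]
pub struct SessionTracker {
    counter: u32,
    /// tool_use_id -> assigned session id
    pending: HashMap<String, String>,
}

impl SessionTracker {
    /// Called on a `tool_use` block named "Task"; returns the new session id.
    pub fn on_task_use(&mut self, tool_use_id: &str) -> String {
        self.counter += 1;
        let session_id = format!("s{:04}", self.counter);
        self.pending.insert(tool_use_id.to_string(), session_id.clone());
        session_id
    }

    /// Called on a `tool_result` block; returns the session it completes, if any.
    /// Removing the entry means a duplicate result for the same id is ignored.
    pub fn on_result(&mut self, tool_use_id: &str) -> Option<String> {
        self.pending.remove(tool_use_id)
    }
}
```

Sessions whose result never arrives (for example, if the CLI process is killed mid-run) simply stay pending; the UI would need a separate signal, such as process exit, to mark them as ended.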


## File: ide/crates/prose-runtime/src/mcp_server.rs

//! MCP Server for VM tools (`vm_ask_user`, `vm_suspend`).
//!
//! This module provides an HTTP-based MCP server that Claude CLI can connect to
//! for feedback blocks and puppet mode. Both tools park the HTTP request on a
//! oneshot channel until the UI responds or the wait times out.

use axum::{extract::State, http::StatusCode, routing::post, Json, Router};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex};

/// Events emitted by the MCP server to notify the UI.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum McpEvent {
    /// User question requested - UI should show prompt.
    AskUser {
        request_id: String,
        summary: String,
        question: Option<String>,
        options: Vec<String>,
    },
    /// Puppet mode suspension requested.
    Suspend {
        request_id: String,
        session_id: String,
        context_summary: String,
    },
}

/// Response from UI for a pending request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpResponse {
    pub request_id: String,
    pub response: Value,
}

/// Shared state for the MCP server.
#[derive(Default)]
pub struct McpServerState {
    /// Pending requests waiting for UI response.
    pending: HashMap<String, oneshot::Sender<Value>>,
}

/// MCP Server handle.
pub struct McpServer {
    /// Port the server is running on.
    pub port: u16,
    /// Shared state for responding to requests.
    state: Arc<Mutex<McpServerState>>,
    /// Shutdown signal.
    shutdown_tx: Option<oneshot::Sender<()>>,
}

impl McpServer {
/// Start the MCP server on a random available port.
/// Returns the server handle and a receiver for MCP events.
pub async fn start() -> anyhow::Result<(Self, mpsc::Receiver<McpEvent>)> {
    let state = Arc::new(Mutex::new(McpServerState::default()));
    let (event_tx, event_rx) = mpsc::channel::<McpEvent>(100);
    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();

    let app_state = AppState {
        state: state.clone(),
        event_tx,
    };

    let app = Router::new()
        .route("/mcp", post(handle_mcp_request))
        .with_state(app_state);

    // Bind to random port
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
    let port = listener.local_addr()?.port();

    // Spawn server task
    tokio::spawn(async move {
        axum::serve(listener, app)
            .with_graceful_shutdown(async {
                let _ = shutdown_rx.await;
            })
            .await
            .ok();
    });

    Ok((
        Self {
            port,
            state,
            shutdown_tx: Some(shutdown_tx),
        },
        event_rx,
    ))
}

/// Get the MCP config JSON for Claude CLI.
pub fn mcp_config(&self) -> Value {
    json!({
        "mcpServers": {
            "mission-control": {
                "command": "curl",
                "args": ["-s", "-X", "POST", "-H", "Content-Type: application/json",
                         "-d", "@-", format!("http://127.0.0.1:{}/mcp", self.port)]
            }
        }
    })
}

/// Respond to a pending request.
pub async fn respond(&self, request_id: &str, response: Value) -> anyhow::Result<()> {
    let mut state = self.state.lock().await;
    if let Some(tx) = state.pending.remove(request_id) {
        tx.send(response).map_err(|_| anyhow::anyhow!("Response channel closed"))?;
    }
    Ok(())
}

/// Shutdown the server.
pub fn shutdown(&mut self) {
    if let Some(tx) = self.shutdown_tx.take() {
        let _ = tx.send(());
    }
}

}

impl Drop for McpServer {
    fn drop(&mut self) {
        self.shutdown();
    }
}
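`mcp_config` registers the server as a stdio `command` that pipes stdin into a single `curl` POST. `curl -d @-` reads stdin to EOF before sending, while an MCP stdio server must exchange newline-delimited JSON-RPC messages over a long-lived process, so this wrapper is unlikely to complete the `initialize` handshake. Claude Code documents HTTP server entries (`"type": "http"` plus a `"url"`) for MCP configs; the sketch below emits that shape, assuming the installed CLI version supports it (verify against the Claude Code MCP documentation). `http_mcp_config` is a hypothetical helper, and pointing the CLI at the endpoint directly also means the Axum handler must follow the MCP HTTP transport rules, such as answering JSON-RPC notifications (which carry no `id`) with `202 Accepted` and no body:

```rust
/// Hypothetical helper: build an MCP config that points Claude Code at the
/// local HTTP endpoint directly instead of wrapping it in a `curl` command.
/// Assumes the CLI accepts `"type": "http"` server entries.
pub fn http_mcp_config(port: u16) -> String {
    // `port` is numeric, so no JSON escaping is needed in the URL
    format!(
        r#"{{"mcpServers":{{"mission-control":{{"type":"http","url":"http://127.0.0.1:{}/mcp"}}}}}}"#,
        port
    )
}
```

Inside the crate, the same value would more naturally be built with `json!`, as `mcp_config` already does; the string form just keeps the sketch dependency-free.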

#[derive(Clone)]
struct AppState {
    state: Arc<Mutex<McpServerState>>,
    event_tx: mpsc::Sender<McpEvent>,
}

/// MCP JSON-RPC request.
#[derive(Debug, Deserialize)]
struct McpRequest {
    #[allow(dead_code)]
    jsonrpc: String,
    id: Value,
    method: String,
    #[serde(default)]
    params: Value,
}

/// MCP JSON-RPC response.
#[derive(Debug, Serialize)]
struct McpRpcResponse {
    jsonrpc: String,
    id: Value,
    #[serde(skip_serializing_if = "Option::is_none")]
    result: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<McpError>,
}

#[derive(Debug, Serialize)]
struct McpError {
    code: i32,
    message: String,
}

async fn handle_mcp_request(
    State(app_state): State<AppState>,
    Json(request): Json<McpRequest>,
) -> Result<Json<McpRpcResponse>, StatusCode> {
let response = match request.method.as_str() {
    "initialize" => {
        // MCP initialization
        McpRpcResponse {
            jsonrpc: "2.0".to_string(),
            id: request.id,
            result: Some(json!({
                "protocolVersion": "2024-11-05",
                "capabilities": { "tools": {} },
                "serverInfo": { "name": "mission-control", "version": "0.1.0" }
            })),
            error: None,
        }
    }
    "tools/list" => {
        // List available tools
        McpRpcResponse {
            jsonrpc: "2.0".to_string(),
            id: request.id,
            result: Some(json!({
                "tools": [
                    {
                        "name": "vm_ask_user",
                        "description": "Ask the user a question and wait for their response. Use for feedback blocks.",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "summary": { "type": "string", "description": "Brief summary of current state/progress" },
                                "question": { "type": "string", "description": "The question to ask the user" },
                                "options": { "type": "array", "items": { "type": "string" }, "description": "Optional list of choices for the user" }
                            },
                            "required": ["summary"]
                        }
                    },
                    {
                        "name": "vm_suspend",
                        "description": "Suspend execution and let the human take control (puppet mode).",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "session_id": { "type": "string", "description": "The session to suspend" },
                                "context_summary": { "type": "string", "description": "Summary of current context for the human" }
                            },
                            "required": ["session_id", "context_summary"]
                        }
                    }
                ]
            })),
            error: None,
        }
    }
    "tools/call" => {
        // Execute a tool
        let tool_name = request.params.get("name").and_then(Value::as_str).unwrap_or("");
        let arguments = request.params.get("arguments").cloned().unwrap_or(json!({}));

        match tool_name {
            "vm_ask_user" => {
                handle_ask_user(&app_state, request.id, arguments).await
            }
            "vm_suspend" => {
                handle_suspend(&app_state, request.id, arguments).await
            }
            _ => {
                McpRpcResponse {
                    jsonrpc: "2.0".to_string(),
                    id: request.id,
                    result: None,
                    error: Some(McpError {
                        code: -32601,
                        message: format!("Unknown tool: {}", tool_name),
                    }),
                }
            }
        }
    }
    _ => {
        McpRpcResponse {
            jsonrpc: "2.0".to_string(),
            id: request.id,
            result: None,
            error: Some(McpError {
                code: -32601,
                message: format!("Method not found: {}", request.method),
            }),
        }
    }
};

Ok(Json(response))

}

async fn handle_ask_user(
    app_state: &AppState,
    id: Value,
    arguments: Value,
) -> McpRpcResponse {
let summary = arguments.get("summary").and_then(Value::as_str).unwrap_or("").to_string();
let question = arguments.get("question").and_then(Value::as_str).map(String::from);
let options: Vec<String> = arguments
    .get("options")
    .and_then(Value::as_array)
    .map(|arr| arr.iter().filter_map(Value::as_str).map(String::from).collect())
    .unwrap_or_default();

let request_id = uuid::Uuid::new_v4().to_string();

// Create oneshot channel for response
let (tx, rx) = oneshot::channel::<Value>();

// Register pending request
{
    let mut state = app_state.state.lock().await;
    state.pending.insert(request_id.clone(), tx);
}

// Emit event to UI
let _ = app_state.event_tx.send(McpEvent::AskUser {
    request_id: request_id.clone(),
    summary,
    question,
    options,
}).await;

// Wait for response (with timeout)
match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
    Ok(Ok(response)) => {
        McpRpcResponse {
            jsonrpc: "2.0".to_string(),
            id,
            result: Some(json!({
                "content": [{
                    "type": "text",
                    "text": response.to_string()
                }]
            })),
            error: None,
        }
    }
    Ok(Err(_)) => {
        // Channel closed (request cancelled)
        McpRpcResponse {
            jsonrpc: "2.0".to_string(),
            id,
            result: Some(json!({
                "content": [{
                    "type": "text",
                    "text": "Request cancelled by user"
                }],
                "isError": true
            })),
            error: None,
        }
    }
    Err(_) => {
        // Timeout
        // Clean up pending request
        {
            let mut state = app_state.state.lock().await;
            state.pending.remove(&request_id);
        }
        McpRpcResponse {
            jsonrpc: "2.0".to_string(),
            id,
            result: Some(json!({
                "content": [{
                    "type": "text",
                    "text": "Request timed out (5 minutes)"
                }],
                "isError": true
            })),
            error: None,
        }
    }
}

}
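`handle_ask_user` and `handle_suspend` share one pattern: register a one-shot sender under a fresh `request_id`, notify the UI, wait with a timeout, and drop the entry if the wait expires. Because `respond` returns `Ok(())` for an unknown id, a UI answer that arrives after the timeout is discarded silently. A std-only sketch of the same pattern, using `std::sync::mpsc::sync_channel(1)` and `Receiver::recv_timeout` as stand-ins for tokio's `oneshot` and `time::timeout`, in which `respond` reports whether anyone was still waiting; `PendingRequests` is a hypothetical name:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Mutex;
use std::time::Duration;

/// Hypothetical, std-only analogue of `McpServerState::pending`.
#[derive(Default)]
pub struct PendingRequests {
    pending: Mutex<HashMap<String, SyncSender<String>>>,
}

impl PendingRequests {
    /// Register a request and return the receiver the handler will wait on.
    pub fn register(&self, request_id: &str) -> Receiver<String> {
        // Capacity 1: the single response is buffered even if nobody is waiting yet
        let (tx, rx) = sync_channel(1);
        self.pending.lock().unwrap().insert(request_id.to_string(), tx);
        rx
    }

    /// Deliver a UI response. Returns false if the request is unknown or expired,
    /// so the caller can tell the UI its answer arrived too late.
    /// The send happens while the lock is held (the guard lives for the whole match).
    pub fn respond(&self, request_id: &str, response: String) -> bool {
        match self.pending.lock().unwrap().remove(request_id) {
            Some(tx) => tx.send(response).is_ok(),
            None => false,
        }
    }

    /// Wait for the response; on timeout, unregister so later answers are rejected.
    pub fn wait(&self, request_id: &str, rx: &Receiver<String>, timeout: Duration) -> Option<String> {
        if let Ok(response) = rx.recv_timeout(timeout) {
            return Some(response);
        }
        // Unregister first, then pick up a response that raced in just before removal
        self.pending.lock().unwrap().remove(request_id);
        rx.try_recv().ok()
    }
}
```

The final `try_recv` closes the window between the timeout firing and the entry being removed: any response that `respond` managed to send is still returned rather than lost.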

async fn handle_suspend(
    app_state: &AppState,
    id: Value,
    arguments: Value,
) -> McpRpcResponse {
let session_id = arguments.get("session_id").and_then(Value::as_str).unwrap_or("unknown").to_string();
let context_summary = arguments.get("context_summary").and_then(Value::as_str).unwrap_or("").to_string();

let request_id = uuid::Uuid::new_v4().to_string();

// Create oneshot channel for response
let (tx, rx) = oneshot::channel::<Value>();

// Register pending request
{
    let mut state = app_state.state.lock().await;
    state.pending.insert(request_id.clone(), tx);
}

// Emit event to UI
let _ = app_state.event_tx.send(McpEvent::Suspend {
    request_id: request_id.clone(),
    session_id,
    context_summary,
}).await;

// Wait for response (puppet mode can be longer - 30 minutes)
match tokio::time::timeout(std::time::Duration::from_secs(1800), rx).await {
    Ok(Ok(response)) => {
        // Response contains action and possibly injected content
        let action = response.get("action").and_then(Value::as_str).unwrap_or("continue");
        let injected = response.get("response").and_then(Value::as_str);

        let result_text = if action == "inject" && injected.is_some() {
            format!("Human took control and provided: {}", injected.unwrap())
        } else {
            "Human released control. Continue execution.".to_string()
        };

        McpRpcResponse {
            jsonrpc: "2.0".to_string(),
            id,
            result: Some(json!({
                "content": [{
                    "type": "text",
                    "text": result_text
                }]
            })),
            error: None,
        }
    }
    Ok(Err(_)) => {
        McpRpcResponse {
            jsonrpc: "2.0".to_string(),
            id,
            result: Some(json!({
                "content": [{
                    "type": "text",
                    "text": "Puppet mode cancelled"
                }],
                "isError": true
            })),
            error: None,
        }
    }
    Err(_) => {
        {
            let mut state = app_state.state.lock().await;
            state.pending.remove(&request_id);
        }
        McpRpcResponse {
            jsonrpc: "2.0".to_string(),
            id,
            result: Some(json!({
                "content": [{
                    "type": "text",
                    "text": "Puppet mode timed out (30 minutes)"
                }],
                "isError": true
            })),
            error: None,
        }
    }
}

}

#[cfg(test)]
mod tests {
    use super::*;

#[tokio::test]
async fn test_mcp_server_starts() {
    let (server, _event_rx) = McpServer::start().await.unwrap();
    assert!(server.port > 0);
    println!("MCP server started on port {}", server.port);
}

}


## File: ide/crates/prose-runtime/src/claude.rs

//! Claude Code CLI integration.
//!
//! Spawns claude CLI with stream-json output and parses events.

use anyhow::Result;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;

/// Events emitted from Claude Code execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ClaudeEvent {
    Started {
        session_id: String,
        model: Option<String>,
    },
    Message {
        text: String,
    },
    ToolUse {
        id: String,
        name: String,
        input: Value,
    },
    ToolResult {
        id: String,
        is_error: bool,
        result_preview: String,
    },
    Completed {
        ok: bool,
        answer: String,
        resume_id: Option<String>,
        error: Option<String>,
    },
    Error {
        message: String,
    },
}

/// Configuration for Claude invocation.
#[derive(Debug, Clone)]
pub struct ClaudeConfig {
    /// Model to use (sonnet, opus, haiku).
    pub model: String,
    /// Working directory.
    pub cwd: std::path::PathBuf,
    /// Resume token from previous session.
    pub resume_id: Option<String>,
    /// Skip permission prompts (dangerous).
    pub skip_permissions: bool,
}

impl Default for ClaudeConfig {
    fn default() -> Self {
        Self {
            model: "sonnet".to_string(),
            cwd: std::env::current_dir().unwrap_or_default(),
            resume_id: None,
            skip_permissions: false,
        }
    }
}

/// State for parsing Claude's stream-json output.
#[derive(Default)]
struct ClaudeParser {
    resume: Option<String>,
    pending: HashMap<String, Value>,
}

/// Run Claude Code with a prompt and stream events.
pub async fn run_claude(
    prompt: &str,
    config: ClaudeConfig,
    event_tx: mpsc::Sender<ClaudeEvent>,
) -> Result<()> {
let mut args = vec![
    "-p".to_string(),
    "--output-format".to_string(),
    "stream-json".to_string(),
    "--verbose".to_string(),
];

if config.skip_permissions {
    args.push("--dangerously-skip-permissions".to_string());
}

args.push("--model".to_string());
args.push(config.model.clone());

if let Some(ref resume) = config.resume_id {
    args.push("--resume".to_string());
    args.push(resume.clone());
}

args.push("--".to_string());
args.push(prompt.to_string());

let mut child = Command::new("claude")
    .args(&args)
    .current_dir(&config.cwd)
    .stdout(Stdio::piped())
    .stderr(Stdio::piped())
    .spawn()?;

let stdout = child
    .stdout
    .take()
    .ok_or_else(|| anyhow::anyhow!("Failed to capture stdout"))?;

let stderr = child.stderr.take();

let event_tx_clone = event_tx.clone();
let stdout_task = tokio::spawn(async move {
    let mut reader = BufReader::new(stdout).lines();
    let mut parser = ClaudeParser::default();

    while let Ok(Some(line)) = reader.next_line().await {
        let value: Value = match serde_json::from_str(&line) {
            Ok(v) => v,
            Err(_) => continue,
        };

        if let Some(events) = parse_claude_event(&value, &mut parser) {
            for event in events {
                if event_tx_clone.send(event).await.is_err() {
                    break;
                }
            }
        }
    }
    parser.resume
});

// Capture stderr for error messages
let stderr_task = if let Some(stderr) = stderr {
    let event_tx_clone = event_tx.clone();
    Some(tokio::spawn(async move {
        let mut reader = BufReader::new(stderr).lines();
        let mut stderr_lines = Vec::new();
        while let Ok(Some(line)) = reader.next_line().await {
            tracing::debug!("claude stderr: {}", line);
            stderr_lines.push(line);
        }
        // Forward any stderr output as an Error event (sent whenever stderr is non-empty, not only on failure)
        if !stderr_lines.is_empty() {
            let _ = event_tx_clone
                .send(ClaudeEvent::Error {
                    message: stderr_lines.join("\n"),
                })
                .await;
        }
    }))
} else {
    None
};

let exit_status = child.wait().await?;
let resume = stdout_task.await?;
if let Some(task) = stderr_task {
    let _ = task.await;
}

let ok = exit_status.success();
if !ok {
    let _ = event_tx
        .send(ClaudeEvent::Completed {
            ok: false,
            answer: String::new(),
            resume_id: resume,
            error: Some(format!("Claude exited with code {:?}", exit_status.code())),
        })
        .await;
}

Ok(())

}
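In `run_claude`, the stdout task's `while let Ok(Some(line))` loop exits on the first `Err` from `next_line()`, which tokio's `Lines` returns for a line that is not valid UTF-8. After that, nothing drains stdout; the child can block writing to a full pipe, and `child.wait()` may never return. One way to keep draining is to read raw bytes up to each newline and decode lossily. A std-only sketch using `BufRead::read_until` (tokio's `AsyncBufReadExt` provides an async `read_until` of the same shape); `drain_lines` is a hypothetical helper:

```rust
use std::io::BufRead;

/// Hypothetical helper: collect every newline-delimited line from `reader`,
/// replacing invalid UTF-8 with U+FFFD instead of stopping at the first bad line.
pub fn drain_lines<R: BufRead>(mut reader: R) -> std::io::Result<Vec<String>> {
    let mut lines = Vec::new();
    let mut buf = Vec::new();
    loop {
        buf.clear();
        // Returns 0 only at EOF; the trailing b'\n' (if any) is kept in `buf`
        if reader.read_until(b'\n', &mut buf)? == 0 {
            break;
        }
        // Strip the line terminator (\n or \r\n) before decoding
        while matches!(buf.last(), Some(&b'\n') | Some(&b'\r')) {
            buf.pop();
        }
        lines.push(String::from_utf8_lossy(&buf).into_owned());
    }
    Ok(lines)
}
```

In an async version, each decoded line would go through `serde_json::from_str` exactly as before, so malformed lines are still skipped by the existing `Err(_) => continue` arm; only the read loop changes.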

fn parse_claude_event(value: &Value, state: &mut ClaudeParser) -> Option<Vec<ClaudeEvent>> {
let event_type = value.get("type")?.as_str()?;

match event_type {
    "system" => {
        if value.get("subtype").and_then(Value::as_str) != Some("init") {
            return Some(vec![]);
        }
        let session_id = value.get("session_id").and_then(Value::as_str)?;
        state.resume = Some(session_id.to_string());
        let model = value.get("model").and_then(Value::as_str).map(String::from);
        Some(vec![ClaudeEvent::Started {
            session_id: session_id.to_string(),
            model,
        }])
    }
    "assistant" => {
        let message = value.get("message").and_then(Value::as_object)?;
        let content = message.get("content").and_then(Value::as_array)?;
        let mut events = Vec::new();
        let mut text_parts = Vec::new();

        for block in content {
            let block_type = block.get("type").and_then(Value::as_str).unwrap_or("");
            match block_type {
                "tool_use" => {
                    let tool_id = block.get("id").and_then(Value::as_str)?;
                    let name = block
                        .get("name")
                        .and_then(Value::as_str)
                        .unwrap_or("tool");
                    let input = block.get("input").cloned().unwrap_or(Value::Null);

                    state.pending.insert(tool_id.to_string(), block.clone());
                    events.push(ClaudeEvent::ToolUse {
                        id: tool_id.to_string(),
                        name: name.to_string(),
                        input,
                    });
                }
                "tool_result" => {
                    let tool_use_id = block.get("tool_use_id").and_then(Value::as_str)?;
                    let is_error =
                        block.get("is_error").and_then(Value::as_bool) == Some(true);
                    let preview = result_preview(block.get("content"));
                    state.pending.remove(tool_use_id);
                    events.push(ClaudeEvent::ToolResult {
                        id: tool_use_id.to_string(),
                        is_error,
                        result_preview: preview,
                    });
                }
                "text" => {
                    if let Some(text) = block.get("text").and_then(Value::as_str) {
                        text_parts.push(text.to_string());
                    }
                }
                _ => {}
            }
        }

        if !text_parts.is_empty() {
            events.push(ClaudeEvent::Message {
                text: text_parts.join("\n"),
            });
        }

        Some(events)
    }
    "result" => {
        let ok = value.get("is_error").and_then(Value::as_bool) != Some(true);
        let answer = value
            .get("result")
            .and_then(Value::as_str)
            .unwrap_or("")
            .to_string();
        let error = if ok { None } else { Some(answer.clone()) };
        Some(vec![ClaudeEvent::Completed {
            ok,
            answer,
            resume_id: state.resume.clone(),
            error,
        }])
    }
    _ => None,
}

}

fn result_preview(content: Option<&Value>) -> String {
    match content {
        None => String::new(),
        Some(Value::String(text)) => truncate(text, 500),
        Some(Value::Array(items)) => {
            let mut parts = Vec::new();
            for item in items {
                if let Some(text) = item.get("text").and_then(Value::as_str) {
                    if !text.is_empty() {
                        parts.push(text.to_string());
                    }
                }
            }
            truncate(&parts.join("\n"), 500)
        }
        Some(Value::Object(obj)) => obj
            .get("text")
            .and_then(Value::as_str)
            .map(|s| truncate(s, 500))
            .unwrap_or_default(),
        Some(other) => truncate(&other.to_string(), 500),
    }
}

fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        s.to_string()
    } else {
        // Find a valid UTF-8 boundary at or before max
        let mut end = max;
        while end > 0 && !s.is_char_boundary(end) {
            end -= 1;
        }
        format!("{}...", &s[..end])
    }
}

#[cfg(test)]
mod tests {
    use super::*;

#[test]
fn test_parse_system_init() {
    let json = r#"{"type":"system","subtype":"init","session_id":"abc123","model":"claude-sonnet"}"#;
    let value: Value = serde_json::from_str(json).unwrap();
    let mut state = ClaudeParser::default();
    let events = parse_claude_event(&value, &mut state).unwrap();
    assert_eq!(events.len(), 1);
    match &events[0] {
        ClaudeEvent::Started { session_id, model } => {
            assert_eq!(session_id, "abc123");
            assert_eq!(model.as_deref(), Some("claude-sonnet"));
        }
        _ => panic!("Expected Started event"),
    }
    assert_eq!(state.resume, Some("abc123".to_string()));
}

}
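
`truncate` (defined identically in `claude.rs` and `executor.rs`) measures `max` in bytes, not characters, and backs `end` off to the nearest `str::is_char_boundary` so that `&s[..end]` cannot panic on multi-byte UTF-8. The std-only restatement below is a sketch of that behavior with the edge cases spelled out; note that the appended `...` lets the result exceed `max` by three bytes.

```rust
/// Truncate `s` to at most `max` bytes, backing off to the nearest
/// UTF-8 char boundary so the slice never splits a multi-byte character.
/// An ellipsis is appended whenever anything was cut off.
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    let mut end = max;
    // `is_char_boundary(0)` is always true, so this loop always terminates.
    while end > 0 && !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &s[..end])
}
```

For example, `truncate("héllo", 2)` yields `"h..."`: `é` occupies bytes 1 and 2, so a cut at byte 2 would split it and the loop backs off to byte 1.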


## File: ide/crates/prose-runtime/src/executor.rs

use crate::claude::{run_claude, ClaudeConfig, ClaudeEvent};
use crate::suspension::{ExecutionState, ResumeData, SuspensionPoint};
use anyhow::Result;
use prose_core::ast::{FeedbackBlock, Program, SessionHead, Stmt};
use prose_core::ir::{lower_program, IrProgram, IrStmt};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex};

/// Configuration for the executor.
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Enable puppet mode (allow human takeover of sessions).
    pub puppet_mode_enabled: bool,
    /// Working directory for file operations.
    pub workspace_root: std::path::PathBuf,
    /// Default model for sessions (sonnet, opus, haiku).
    pub model: String,
    /// Skip permission prompts (dangerous, for automated runs).
    pub skip_permissions: bool,
    /// Use mock execution instead of real Claude (for testing).
    pub mock_execution: bool,
}

impl Default for ExecutorConfig {
    fn default() -> Self {
        Self {
            puppet_mode_enabled: false,
            workspace_root: std::env::current_dir().unwrap_or_default(),
            model: "sonnet".to_string(),
            skip_permissions: false,
            mock_execution: false,
        }
    }
}

/// Event emitted during execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event_type")]
pub enum RuntimeEvent {
    RunStarted {
        run_id: String,
        program_file: String,
    },
    SessionStarted {
        session_id: String,
        agent: Option<String>,
    },
    SessionCompleted {
        session_id: String,
        output: Option<String>,
        tokens_used: u64,
    },
    BindingCreated {
        name: String,
        value_preview: String,
    },
    FeedbackRequested {
        event_id: String,
        summary: String,
        question: Option<String>,
        options: Vec<String>,
    },
    FeedbackProvided {
        event_id: String,
        selected_option: Option<usize>,
        user_input: Option<String>,
    },
    PuppetModeEntered {
        session_id: String,
    },
    PuppetModeInput {
        session_id: String,
        human_response: String,
    },
    PuppetModeExited {
        session_id: String,
    },
    RunCompleted {
        run_id: String,
        success: bool,
    },
    Error {
        message: String,
    },
}

/// Handle to control a running execution.
pub struct ExecutionHandle {
    /// Receives events from the executor.
    pub events: mpsc::Receiver<RuntimeEvent>,
    /// Current state (may be suspended). Public to allow RPC server to share state.
    pub state: Arc<Mutex<ExecutionState>>,
}

impl ExecutionHandle {
/// Resume execution from a suspension point.
pub async fn resume(&self, data: ResumeData) -> Result<()> {
    let mut state = self.state.lock().await;
    match std::mem::replace(&mut *state, ExecutionState::Running) {
        ExecutionState::Suspended { resume_tx, .. } => {
            let _ = resume_tx.send(data);
            Ok(())
        }
        other => {
            *state = other;
            anyhow::bail!("Not suspended");
        }
    }
}

/// Check if execution is suspended.
pub async fn get_suspension(&self) -> Option<SuspensionPoint> {
    let state = self.state.lock().await;
    match &*state {
        ExecutionState::Suspended { point, .. } => Some(point.clone()),
        _ => None,
    }
}

}

/// The main execution engine.
pub struct Executor {
    config: ExecutorConfig,
}

impl Executor {
pub fn new(config: ExecutorConfig) -> Self {
    Self { config }
}

/// Run a program and return a handle for control.
pub async fn run(&self, program: Program) -> Result<ExecutionHandle> {
    let ir = lower_program(&program)?;
    let (event_tx, event_rx) = mpsc::channel(100);
    let state = Arc::new(Mutex::new(ExecutionState::Running));

    let run_id = format!("run_{}", chrono_lite_id());
    event_tx
        .send(RuntimeEvent::RunStarted {
            run_id: run_id.clone(),
            program_file: "input".to_string(),
        })
        .await?;

    let state_clone = state.clone();
    let event_tx_clone = event_tx.clone();
    let config = self.config.clone();

    tokio::spawn(async move {
        let result = execute_ir(&ir, &config, &event_tx_clone, &state_clone).await;
        let success = result.is_ok();

        if let Err(e) = &result {
            let _ = event_tx_clone
                .send(RuntimeEvent::Error {
                    message: e.to_string(),
                })
                .await;
        }

        let _ = event_tx_clone
            .send(RuntimeEvent::RunCompleted {
                run_id,
                success,
            })
            .await;

        let mut state = state_clone.lock().await;
        *state = if success {
            ExecutionState::Completed
        } else {
            ExecutionState::Failed(result.unwrap_err())
        };
    });

    Ok(ExecutionHandle {
        events: event_rx,
        state,
    })
}

}

struct ExecCtx<'a> {
    config: &'a ExecutorConfig,
    events: &'a mpsc::Sender<RuntimeEvent>,
    state: &'a Arc<Mutex<ExecutionState>>,
    vars: HashMap<String, String>,
    session_counter: usize,
}

async fn execute_ir(
    ir: &IrProgram,
    config: &ExecutorConfig,
    events: &mpsc::Sender<RuntimeEvent>,
    state: &Arc<Mutex<ExecutionState>>,
) -> Result<()> {
let mut ctx = ExecCtx {
    config,
    events,
    state,
    vars: HashMap::new(),
    session_counter: 0,
};

for stmt in &ir.body {
    execute_ir_stmt(stmt, &mut ctx).await?;
}

Ok(())

}

async fn execute_ir_stmt(stmt: &IrStmt, ctx: &mut ExecCtx<'_>) -> Result<()> {
match stmt {
    IrStmt::Input(_) => {
        // Inputs should already be bound from CLI args or environment
    }
    IrStmt::Session(ir_session) => {
        let session_id = ir_session.id.clone();
        let agent = match &ir_session.stmt.head {
            SessionHead::Agent(a) => Some(a.clone()),
            SessionHead::NamedAgent { agent, .. } => Some(agent.clone()),
            SessionHead::Prompt(_) => None,
        };

        ctx.events
            .send(RuntimeEvent::SessionStarted {
                session_id: session_id.clone(),
                agent: agent.clone(),
            })
            .await?;

        // Check if puppet mode should be triggered
        if ctx.config.puppet_mode_enabled {
            let (resume_tx, resume_rx) = oneshot::channel();
            {
                let mut s = ctx.state.lock().await;
                *s = ExecutionState::Suspended {
                    point: SuspensionPoint::PuppetMode {
                        session_id: session_id.clone(),
                        agent: agent.clone().unwrap_or_default(),
                        context_summary: "Session context".to_string(),
                        last_output: None,
                    },
                    resume_tx,
                };
            }

            ctx.events
                .send(RuntimeEvent::PuppetModeEntered {
                    session_id: session_id.clone(),
                })
                .await?;

            // Wait for human input
            let resume_data = resume_rx.await?;

            match resume_data {
                ResumeData::PuppetResponse { human_response } => {
                    ctx.events
                        .send(RuntimeEvent::PuppetModeInput {
                            session_id: session_id.clone(),
                            human_response: human_response.clone(),
                        })
                        .await?;
                }
                ResumeData::PuppetRelease => {
                    // Human released control, continue with normal execution
                }
                ResumeData::Cancel => {
                    anyhow::bail!("Execution cancelled");
                }
                ResumeData::FeedbackResponse { .. } => {
                    // Unexpected feedback response in puppet mode context
                }
            }

            ctx.events
                .send(RuntimeEvent::PuppetModeExited {
                    session_id: session_id.clone(),
                })
                .await?;
        }

        // Execute the session with Claude
        let output = if ctx.config.mock_execution {
            // Mock mode for testing
            "[Mock session output]".to_string()
        } else {
            // Build prompt from session head
            let prompt = match &ir_session.stmt.head {
                SessionHead::Agent(agent) => format!("You are acting as agent: {}", agent),
                SessionHead::NamedAgent { agent, .. } => format!("You are acting as agent: {}", agent),
                SessionHead::Prompt(p) => p.clone(),
            };

            // Call Claude Code
            let (claude_tx, mut claude_rx) = mpsc::channel::<ClaudeEvent>(100);
            let claude_config = ClaudeConfig {
                model: ctx.config.model.clone(),
                cwd: ctx.config.workspace_root.clone(),
                resume_id: None,
                skip_permissions: ctx.config.skip_permissions,
            };

            let events_clone = ctx.events.clone();

            // Spawn Claude execution
            let claude_handle = tokio::spawn(async move {
                run_claude(&prompt, claude_config, claude_tx).await
            });

            // Process Claude events
            let mut answer = String::new();
            while let Some(event) = claude_rx.recv().await {
                match event {
                    ClaudeEvent::Message { text } => {
                        answer.push_str(&text);
                    }
                    ClaudeEvent::ToolUse { id, name, input } => {
                        let _ = events_clone
                            .send(RuntimeEvent::BindingCreated {
                                name: format!("tool:{}:{}", name, id),
                                value_preview: truncate(&input.to_string(), 100),
                            })
                            .await;
                    }
                    ClaudeEvent::Completed { ok, answer: final_answer, error, .. } => {
                        if !ok {
                            if let Some(err) = error {
                                let _ = events_clone
                                    .send(RuntimeEvent::Error { message: err })
                                    .await;
                            }
                        }
                        answer = final_answer;
                    }
                    ClaudeEvent::Error { message } => {
                        let _ = events_clone
                            .send(RuntimeEvent::Error { message })
                            .await;
                    }
                    _ => {}
                }
            }

            // Wait for Claude to finish
            let _ = claude_handle.await;
            answer
        };

        ctx.events
            .send(RuntimeEvent::SessionCompleted {
                session_id,
                output: Some(output),
                tokens_used: 100, // TODO: get from Claude usage
            })
            .await?;
    }
    IrStmt::Let(b) | IrStmt::Const(b) | IrStmt::Assign(b) | IrStmt::Output(b) => {
        let value = format!("[value of {}]", b.name);
        ctx.vars.insert(b.name.clone(), value.clone());

        ctx.events
            .send(RuntimeEvent::BindingCreated {
                name: b.name.clone(),
                value_preview: truncate(&value, 50),
            })
            .await?;
    }
    IrStmt::Feedback(f) => {
        execute_feedback(f, ctx).await?;
    }
    IrStmt::Parallel(p) => {
        for br in &p.branches {
            execute_ast_stmt(&br.stmt, ctx).await?;
        }
    }
    IrStmt::Repeat(r) => {
        for i in 0..r.times {
            if let Some(idx) = &r.index {
                ctx.vars.insert(idx.clone(), i.to_string());
            }
            for s in &r.body {
                execute_ast_stmt(s, ctx).await?;
            }
        }
    }
    IrStmt::ForEach(f) => {
        let mock_items = vec!["item1", "item2", "item3"];
        for (i, item) in mock_items.iter().enumerate() {
            ctx.vars.insert(f.item.clone(), item.to_string());
            if let Some(idx) = &f.index {
                ctx.vars.insert(idx.clone(), i.to_string());
            }
            for s in &f.body {
                execute_ast_stmt(s, ctx).await?;
            }
        }
    }
    IrStmt::Loop(l) => {
        let max = l.max.unwrap_or(10);
        for i in 0..max {
            if let Some(idx) = &l.index {
                ctx.vars.insert(idx.clone(), i.to_string());
            }
            for s in &l.body {
                execute_ast_stmt(s, ctx).await?;
            }
        }
    }
    IrStmt::Try(t) => {
        let result: Result<()> = async {
            for s in &t.body {
                execute_ast_stmt(s, ctx).await?;
            }
            Ok(())
        }
        .await;

        if let Err(e) = result {
            if let Some(c) = &t.catch {
                if let Some(err_var) = &c.err {
                    ctx.vars.insert(err_var.clone(), e.to_string());
                }
                for s in &c.body {
                    execute_ast_stmt(s, ctx).await?;
                }
            }
        }

        if let Some(finally) = &t.finally {
            for s in finally {
                execute_ast_stmt(s, ctx).await?;
            }
        }
    }
    IrStmt::Choice(c) => {
        if let Some(opt) = c.options.first() {
            for s in &opt.body {
                execute_ast_stmt(s, ctx).await?;
            }
        }
    }
    IrStmt::If(i) => {
        for s in &i.then_body {
            execute_ast_stmt(s, ctx).await?;
        }
    }
    IrStmt::Do(d) => {
        for s in &d.body {
            execute_ast_stmt(s, ctx).await?;
        }
    }
    IrStmt::Throw(t) => {
        let msg = t.message.clone().unwrap_or_else(|| "Throw".to_string());
        anyhow::bail!("{}", msg);
    }
    IrStmt::Raw(r) => {
        for s in &r.body {
            execute_ast_stmt(s, ctx).await?;
        }
    }
    IrStmt::Resume(_) | IrStmt::OutputExpr(_) => {}
}

Ok(())

}

/// Execute an AST Stmt (found inside IR block bodies).
fn execute_ast_stmt<'a>(
    stmt: &'a Stmt,
    ctx: &'a mut ExecCtx<'_>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move {
match stmt {
    Stmt::Input(_) | Stmt::Agent(_) | Stmt::Block(_) | Stmt::Use(_) => {}
    Stmt::Session(s) => {
        ctx.session_counter += 1;
        let session_id = format!("s{:04}", ctx.session_counter);
        let agent = match &s.head {
            SessionHead::Agent(a) => Some(a.clone()),
            SessionHead::NamedAgent { agent, .. } => Some(agent.clone()),
            SessionHead::Prompt(_) => None,
        };

        ctx.events
            .send(RuntimeEvent::SessionStarted {
                session_id: session_id.clone(),
                agent,
            })
            .await?;

        ctx.events
            .send(RuntimeEvent::SessionCompleted {
                session_id,
                output: Some("[Mock session output]".to_string()),
                tokens_used: 100,
            })
            .await?;
    }
    Stmt::Resume(_) => {}
    Stmt::Let(b) | Stmt::Const(b) | Stmt::Assign(b) | Stmt::Output(b) => {
        let value = format!("[value of {}]", b.name);
        ctx.vars.insert(b.name.clone(), value.clone());

        ctx.events
            .send(RuntimeEvent::BindingCreated {
                name: b.name.clone(),
                value_preview: truncate(&value, 50),
            })
            .await?;
    }
    Stmt::OutputExpr(_) => {}
    Stmt::Parallel(p) => {
        for br in &p.branches {
            execute_ast_stmt(&br.stmt, ctx).await?;
        }
    }
    Stmt::Repeat(r) => {
        for i in 0..r.times {
            if let Some(idx) = &r.index {
                ctx.vars.insert(idx.clone(), i.to_string());
            }
            for s in &r.body {
                execute_ast_stmt(s, ctx).await?;
            }
        }
    }
    Stmt::ForEach(f) => {
        let mock_items = vec!["item1", "item2", "item3"];
        for (i, item) in mock_items.iter().enumerate() {
            ctx.vars.insert(f.item.clone(), item.to_string());
            if let Some(idx) = &f.index {
                ctx.vars.insert(idx.clone(), i.to_string());
            }
            for s in &f.body {
                execute_ast_stmt(s, ctx).await?;
            }
        }
    }
    Stmt::Loop(l) => {
        let max = l.max.unwrap_or(10);
        for i in 0..max {
            if let Some(idx) = &l.index {
                ctx.vars.insert(idx.clone(), i.to_string());
            }
            for s in &l.body {
                execute_ast_stmt(s, ctx).await?;
            }
        }
    }
    Stmt::Try(t) => {
        let result: Result<()> = async {
            for s in &t.body {
                execute_ast_stmt(s, ctx).await?;
            }
            Ok(())
        }
        .await;

        if let Err(e) = result {
            if let Some(c) = &t.catch {
                if let Some(err_var) = &c.err {
                    ctx.vars.insert(err_var.clone(), e.to_string());
                }
                for s in &c.body {
                    execute_ast_stmt(s, ctx).await?;
                }
            }
        }

        if let Some(finally) = &t.finally {
            for s in finally {
                execute_ast_stmt(s, ctx).await?;
            }
        }
    }
    Stmt::Choice(c) => {
        if let Some(opt) = c.options.first() {
            for s in &opt.body {
                execute_ast_stmt(s, ctx).await?;
            }
        }
    }
    Stmt::If(i) => {
        for s in &i.then_body {
            execute_ast_stmt(s, ctx).await?;
        }
    }
    Stmt::Do(d) => {
        for s in &d.body {
            execute_ast_stmt(s, ctx).await?;
        }
    }
    Stmt::Throw(t) => {
        let msg = t.message.clone().unwrap_or_else(|| "Throw".to_string());
        anyhow::bail!("{}", msg);
    }
    Stmt::Raw(r) => {
        for s in &r.body {
            execute_ast_stmt(s, ctx).await?;
        }
    }
    Stmt::Feedback(f) => {
        execute_feedback(f, ctx).await?;
    }
}

Ok(())
})

}

async fn execute_feedback(f: &FeedbackBlock, ctx: &mut ExecCtx<'_>) -> Result<()> {
let event_id = format!("fb_{}", chrono_lite_id());

// Record the suspension before announcing it, as the puppet-mode branch does:
// a client that calls `ExecutionHandle::resume` as soon as it sees
// FeedbackRequested must find the state already `Suspended`.
let (resume_tx, resume_rx) = oneshot::channel();
{
    let mut s = ctx.state.lock().await;
    *s = ExecutionState::Suspended {
        point: SuspensionPoint::Feedback {
            event_id: event_id.clone(),
            summary: f.summary.clone(),
            question: f.question.clone(),
            options: f.options.clone(),
            display_binding: f.display.clone(),
            display_value: f.display.as_ref().and_then(|d| ctx.vars.get(d).cloned()),
        },
        resume_tx,
    };
}

ctx.events
    .send(RuntimeEvent::FeedbackRequested {
        event_id: event_id.clone(),
        summary: f.summary.clone(),
        question: f.question.clone(),
        options: f.options.clone(),
    })
    .await?;

let resume_data = resume_rx.await?;

match resume_data {
    ResumeData::FeedbackResponse {
        selected_option,
        user_input,
    } => {
        ctx.events
            .send(RuntimeEvent::FeedbackProvided {
                event_id,
                selected_option,
                user_input,
            })
            .await?;
    }
    ResumeData::Cancel => {
        anyhow::bail!("Execution cancelled at feedback point");
    }
    _ => {}
}

{
    let mut s = ctx.state.lock().await;
    *s = ExecutionState::Running;
}

Ok(())

}

fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        s.to_string()
    } else {
        // Find a valid UTF-8 boundary at or before max
        let mut end = max;
        while end > 0 && !s.is_char_boundary(end) {
            end -= 1;
        }
        format!("{}...", &s[..end])
    }
}

fn chrono_lite_id() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    format!("{:x}", now)
}
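
`chrono_lite_id` encodes wall-clock milliseconds in hex, so two calls within the same millisecond return the same string: two back-to-back `feedback` blocks could then share an `event_id`, and two quickly started runs a `run_id`. One way to keep the IDs short but unique, sketched here with a hypothetical `unique_id` function and `ID_SEQ` counter that are not part of the codebase, is to append a process-wide atomic sequence number.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical process-wide sequence counter (not in the original code).
static ID_SEQ: AtomicU64 = AtomicU64::new(0);

/// Hypothetical variant of `chrono_lite_id`: hex milliseconds plus a
/// sequence number, so two calls in the same millisecond still differ.
fn unique_id() -> String {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    // `fetch_add` is atomic under any ordering; only uniqueness is needed,
    // not ordering relative to other memory, so `Relaxed` suffices.
    let seq = ID_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("{:x}-{:x}", millis, seq)
}
```

A UUID would also work; the counter keeps the sketch dependency-free and leaves the timestamp prefix unchanged.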

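The suspend/resume handshake depends on ordering. `ExecutionHandle::resume` only succeeds when the shared state is already `ExecutionState::Suspended`; otherwise it restores the state and bails with "Not suspended". The executor should therefore store the suspension (with its `resume_tx`) before emitting the event that prompts a human to respond, as the puppet-mode branch of `execute_ir_stmt` does. The sketch below models that handshake with std threads, using `std::sync::mpsc` as a stand-in for `tokio::sync::oneshot`; `State`, `resume`, and `suspend_and_wait` are hypothetical simplifications, not the crate's API.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Simplified stand-in for `ExecutionState`.
enum State {
    Running,
    Suspended { resume_tx: Sender<String> },
}

/// Mirrors `ExecutionHandle::resume`: take the sender only if suspended,
/// otherwise put the previous state back and report an error.
fn resume(state: &Mutex<State>, data: String) -> Result<(), String> {
    let mut guard = state.lock().unwrap();
    match std::mem::replace(&mut *guard, State::Running) {
        State::Suspended { resume_tx } => resume_tx.send(data).map_err(|e| e.to_string()),
        other => {
            *guard = other;
            Err("Not suspended".to_string())
        }
    }
}

/// Executor side: record the suspension first, then announce it, then block.
fn suspend_and_wait(state: &Mutex<State>, events: &Sender<&'static str>) -> String {
    let (resume_tx, resume_rx): (Sender<String>, Receiver<String>) = channel();
    {
        let mut guard = state.lock().unwrap();
        *guard = State::Suspended { resume_tx };
    } // lock released before the event goes out
    events.send("FeedbackRequested").unwrap();
    resume_rx.recv().unwrap()
}
```

The test below plays a client that calls `resume` the moment it sees the event: because the state is written first, that call can never observe `Running`.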

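In the session branch of `execute_ir_stmt`, `while let Some(event) = claude_rx.recv().await` only ends once every sender is dropped, which happens when `run_claude` returns and drops `claude_tx`; events sent before the CLI exits are therefore drained before `claude_handle.await`, so the loop itself does not race process exit. However, `let _ = claude_handle.await;` discards both a panic in the task and whatever `run_claude` returned, so an error that is not also sent as a `ClaudeEvent::Error` is lost. A std-only sketch of the same drain-then-join shape that propagates the worker's error follows; `Event`, `fake_run`, and `run_session` are hypothetical, and it assumes `run_claude` returns a `Result`.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical stand-in for `ClaudeEvent`.
enum Event {
    Message(String),
    Completed(String),
}

/// Hypothetical stand-in for `run_claude`: streams events, then reports
/// success or failure through its return value.
fn fake_run(tx: Sender<Event>, fail: bool) -> Result<(), String> {
    tx.send(Event::Message("partial".to_string()))
        .map_err(|e| e.to_string())?;
    if fail {
        // Returning drops `tx`, which closes the channel for the consumer.
        return Err("claude exited with status 1".to_string());
    }
    tx.send(Event::Completed("final answer".to_string()))
        .map_err(|e| e.to_string())?;
    Ok(())
}

/// Drain events until the channel closes, then join the worker and
/// propagate its error instead of discarding it.
fn run_session(fail: bool) -> Result<String, String> {
    let (tx, rx) = channel();
    let worker = thread::spawn(move || fake_run(tx, fail));

    let mut answer = String::new();
    // `recv` only fails once every Sender is dropped, i.e. after `fake_run`
    // has returned, so nothing sent before the worker exits is lost.
    while let Ok(event) = rx.recv() {
        match event {
            Event::Message(text) => answer.push_str(&text),
            Event::Completed(final_answer) => answer = final_answer,
        }
    }

    // First `?`: the worker panicked. Second `?`: the worker returned Err.
    worker
        .join()
        .map_err(|_| "worker panicked".to_string())??;
    Ok(answer)
}
```
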
## File: ide/apps/mission-control/src/lib/api.ts

import { invoke } from "@tauri-apps/api/core";
import type { CheckpointInfo, RunSummary, TraceEvent } from "../types";

async function tauriInvoke<T>(cmd: string, args?: Record<string, unknown>): Promise<T> {
  return invoke<T>(cmd, args);
}

export const api = { listRuns: (workspaceRoot: string) => tauriInvoke<RunSummary[]>("list_runs", { workspaceRoot }),

readRunEvents: (workspaceRoot: string, runId: string) => tauriInvoke<TraceEvent[]>("read_run_events", { workspaceRoot, runId }),

readProgram: (workspaceRoot: string, runId: string) => tauriInvoke("read_run_program", { workspaceRoot, runId }),

readBinding: (workspaceRoot: string, runId: string, name: string) => tauriInvoke("read_run_binding", { workspaceRoot, runId, name }),

appendIntervention: (workspaceRoot: string, runId: string, note: string) => tauriInvoke("append_intervention", { workspaceRoot, runId, note }),

forkRun: (workspaceRoot: string, runId: string) => tauriInvoke("fork_run", { workspaceRoot, runId }),

blameSession: (workspaceRoot: string, runId: string, sessionId: string) => tauriInvoke("blame_session", { workspaceRoot, runId, sessionId }),

checkpoints: {
  list: (workspaceRoot: string) => tauriInvoke<CheckpointInfo[]>("checkpoint_list", { workspaceRoot }),
  create: (workspaceRoot: string, label?: string) =>
    tauriInvoke("checkpoint_create", { workspaceRoot, label: label ?? null }),
  restore: (workspaceRoot: string, checkpointId: string) =>
    tauriInvoke("checkpoint_restore", { workspaceRoot, checkpointId }),
  diff: (workspaceRoot: string, checkpointId: string) =>
    tauriInvoke("checkpoint_diff", { workspaceRoot, checkpointId }),
},

watch: {
  start: (workspaceRoot: string, runId: string) => tauriInvoke("watch_run", { workspaceRoot, runId }),
  stop: () => tauriInvoke("watch_stop"),
},

// File operations
saveFile: (filePath: string, content: string) => tauriInvoke("save_file", { filePath, content }),

openFile: (filePath: string) => tauriInvoke("open_file", { filePath }),

// Lint via prose CLI
lint: async (content: string): Promise<{
  success: boolean;
  message?: string;
  errors?: Array<{ line: number; col: number; message: string }>;
}> => {
  try {
    const result = await tauriInvoke<{
      success: boolean;
      message?: string;
      errors?: Array<{ line: number; col: number; message: string }>;
    }>("lint_prose", { content });
    return result;
  } catch (err) {
    return { success: false, message: String(err) };
  }
},

// Run a prose program (legacy executor)
runProse: async (
  programContent: string,
  workspaceRoot?: string,
  model?: string,
  skipPermissions?: boolean
): Promise<{ run_id: string; success: boolean; message?: string }> => {
  return tauriInvoke("run_prose", {
    programContent,
    workspaceRoot: workspaceRoot ?? null,
    model: model ?? null,
    skipPermissions: skipPermissions ?? null,
  });
},

// Run a prose program using the Claude-as-VM approach (new!)
runProseVm: async (
  programContent: string,
  workspaceRoot?: string,
  model?: string,
  skipPermissions?: boolean,
  resumeId?: string,
  runId?: string
): Promise<{ run_id: string; success: boolean; message?: string }> => {
  return tauriInvoke("run_prose_vm", {
    programContent,
    workspaceRoot: workspaceRoot ?? null,
    model: model ?? null,
    skipPermissions: skipPermissions ?? null,
    resumeId: resumeId ?? null,
    runId: runId ?? null,
  });
},

// List prose files in workspace
listProseFiles: (workspaceRoot?: string) =>
  tauriInvoke<string[]>("list_prose_files", { workspaceRoot: workspaceRoot ?? null }),

// Clear all runs in workspace
clearRuns: (workspaceRoot?: string) => tauriInvoke("clear_runs", { workspaceRoot: workspaceRoot ?? null }),

// Delete a specific run
deleteRun: (workspaceRoot: string, runId: string) => tauriInvoke("delete_run", { workspaceRoot, runId }),

// Submit feedback response to an MCP request (vm_ask_user or vm_suspend)
submitFeedback: (requestId: string, response: { selected_option?: number; user_input?: string }) =>
  tauriInvoke("submit_feedback", { requestId, response }),
};

// WebSocket client for interactive runtime
export type RuntimeEvent = {
  event_type: string;
  run_id?: string;
  session_id?: string;
  agent?: string;
  output?: string;
  tokens_used?: number;
  name?: string;
  value_preview?: string;
  event_id?: string;
  summary?: string;
  question?: string;
  options?: string[];
  selected_option?: number;
  user_input?: string;
  human_response?: string;
  success?: boolean;
  message?: string;
};

export type ResumeData = | { type: "FeedbackResponse"; selected_option?: number; user_input?: string } | { type: "PuppetResponse"; human_response: string } | { type: "PuppetRelease" } | { type: "Cancel" };

export class RuntimeConnection {
private ws: WebSocket | null = null;
private requestId = 0;
private pendingRequests = new Map<number, { resolve: (v: unknown) => void; reject: (e: Error) => void }>();

onEvent?: (event: RuntimeEvent) => void;
onConnect?: () => void;
onDisconnect?: () => void;

connect(url: string = "ws://127.0.0.1:9999"): Promise<void> {
return new Promise<void>((resolve, reject) => {
  this.ws = new WebSocket(url);

  this.ws.onopen = () => {
    this.onConnect?.();
    resolve();
  };

  this.ws.onerror = (event) => {
    reject(new Error(`WebSocket error: ${event.type}`));
  };

  this.ws.onclose = () => {
    // Reject all pending requests to prevent memory leaks
    for (const [id, { reject }] of this.pendingRequests) {
      reject(new Error("Connection closed"));
    }
    this.pendingRequests.clear();
    this.onDisconnect?.();
    this.ws = null;
  };

  this.ws.onmessage = (msg) => {
    try {
      const data = JSON.parse(msg.data);

      // Check if it's a response to a request
      if (data.id !== undefined && this.pendingRequests.has(data.id)) {
        const { resolve, reject } = this.pendingRequests.get(data.id)!;
        this.pendingRequests.delete(data.id);
        if (data.error) {
          reject(new Error(data.error.message));
        } else {
          resolve(data.result);
        }
        return;
      }

      // Otherwise it's a notification (event)
      if (data.method === "event" && data.params) {
        this.onEvent?.(data.params as RuntimeEvent);
      }
    } catch (e) {
      console.error("Failed to parse runtime message:", e);
    }
  };
});

}

disconnect(): void { this.ws?.close(); this.ws = null; }

private async rpc<T>(method: string, params: Record<string, unknown>): Promise<T> {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
  throw new Error("Not connected");
}

const id = ++this.requestId;
return new Promise<T>((resolve, reject) => {
  // Set up timeout that clears on success/failure
  const timeoutId = setTimeout(() => {
    if (this.pendingRequests.has(id)) {
      this.pendingRequests.delete(id);
      reject(new Error("Request timeout"));
    }
  }, 30000);

  this.pendingRequests.set(id, {
    resolve: (v: unknown) => {
      clearTimeout(timeoutId);
      resolve(v as T);
    },
    reject: (e: Error) => {
      clearTimeout(timeoutId);
      reject(e);
    },
  });

  this.ws!.send(JSON.stringify({ id, method, params }));
});

}

async run(source: string): Promise<{ status: string }> { return this.rpc("run", { source }); }

async resume(data: ResumeData): Promise<{ status: string }> { return this.rpc("resume", data); }

async cancel(): Promise<{ status: string }> { return this.rpc("cancel", {}); }

async getSuspension(): Promise<{ type: string; event_id?: string; session_id?: string; summary?: string; question?: string; options?: string[]; } | null> { return this.rpc("get_suspension", {}); } }
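
`RuntimeConnection.rpc` correlates requests and responses through a client-side counter: each call stores its `resolve`/`reject` pair under a fresh `id`, the `onmessage` handler settles whichever entry matches `data.id`, a 30-second timer removes entries that never get an answer, and `onclose` rejects everything still pending. The same bookkeeping, written in Rust to match the runtime crates and using std channels in place of promises, might look like this; `Pending` and its methods are hypothetical names, and the WebSocket transport is left out.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical bookkeeping mirroring `requestId` and `pendingRequests`.
#[derive(Default)]
struct Pending {
    next_id: u64,
    waiters: HashMap<u64, Sender<Result<String, String>>>,
}

impl Pending {
    /// Allocate an id and a receiver the caller can wait on.
    fn register(&mut self) -> (u64, Receiver<Result<String, String>>) {
        self.next_id += 1;
        let (tx, rx) = channel();
        self.waiters.insert(self.next_id, tx);
        (self.next_id, rx)
    }

    /// Route a response by id; returns false for unknown or expired ids.
    fn complete(&mut self, id: u64, result: Result<String, String>) -> bool {
        match self.waiters.remove(&id) {
            Some(tx) => tx.send(result).is_ok(),
            None => false,
        }
    }

    /// Drop a waiter whose caller gave up (the `setTimeout` branch).
    fn forget(&mut self, id: u64) {
        self.waiters.remove(&id);
    }

    /// On connection close, fail every outstanding request (the `onclose` handler).
    fn close_all(&mut self) {
        for (_, tx) in self.waiters.drain() {
            let _ = tx.send(Err("Connection closed".to_string()));
        }
    }
}
```

A caller would block with `rx.recv_timeout(Duration::from_secs(30))` and call `forget(id)` on timeout, so a late response for that id is ignored rather than delivered to nobody.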


## File: ide/apps/mission-control/src/lib/trace.ts

import type { TraceEvent } from "../types";

export function timeShort(ts: string): string { const m = ts.match(/T(\d\d:\d\d:\d\d)/); return m ? m[1] : ts; }

export type IncludedBinding = { name: string; binding_id: string; token_count?: number; reason?: "explicit" | "inferred" | "required" | "memory"; reason_detail?: string; };

export type Citation = { binding_id: string; chunk_id?: string; excerpt?: string; };

export type ToolOutput = { tool: string; result: string; };

export type VerdictEvidence = { citations?: Citation[]; tool_outputs?: ToolOutput[]; };

export type VerdictRubric = { criteria?: string[]; thresholds?: Record<string, number>; };

export type Verdict = { event_id: string; event_type: string; condition?: string; result?: boolean; note?: string; rubric?: VerdictRubric; evidence?: VerdictEvidence; confidence?: number; uncertainties?: string[]; };

export type SessionView = { sessionId: string; head?: string; tokenEstimate?: number; includedBindings: IncludedBinding[]; producedBindings: { name?: string; binding_id: string }[]; events: TraceEvent[]; verdicts: Verdict[]; tokens?: number; cost?: number; promptPreview?: string; outputPreview?: string; };

export function sessionsFromEvents(events: TraceEvent[]): SessionView[] {
const bySession = new Map<string, TraceEvent[]>();
for (const ev of events) {
  const sid = ev.session_id ?? "-";
  const v = bySession.get(sid) ?? [];
  v.push(ev);
  bySession.set(sid, v);
}

const out: SessionView[] = [];
for (const [sessionId, sesEvents] of bySession.entries()) {
sesEvents.sort((a, b) => a.timestamp.localeCompare(b.timestamp));
const view: SessionView = {
  sessionId,
  includedBindings: [],
  producedBindings: [],
  events: sesEvents,
  verdicts: [],
};
for (const ev of sesEvents) {
  if (ev.event_type === "SessionPlanned" && typeof ev.head === "string") {
    view.head = ev.head;
  }
  if (ev.event_type === "SessionStarted") {
    if (typeof ev.prompt_preview === "string") {
      view.promptPreview = ev.prompt_preview;
    }
  }
  if (ev.event_type === "ContextAssembled") {
    if (typeof ev.token_estimate === "number") view.tokenEstimate = ev.token_estimate;
    const included = ev.included_bindings;
    if (Array.isArray(included)) {
      view.includedBindings = included
        .map((x) => {
          if (!x || typeof x !== "object") return null;
          const obj = x as Record<string, unknown>;
          const name = obj.name;
          const bindingId = obj.binding_id;
          if (typeof name !== "string" || typeof bindingId !== "string") return null;
          const binding: IncludedBinding = { name, binding_id: bindingId };
          if (typeof obj.token_count === "number") binding.token_count = obj.token_count;
          if (typeof obj.reason === "string") binding.reason = obj.reason as IncludedBinding["reason"];
          if (typeof obj.reason_detail === "string") binding.reason_detail = obj.reason_detail;
          return binding;
        })
        .filter(Boolean) as IncludedBinding[];
    }
  }
  if (ev.event_type === "BindingMaterialized") {
    const bindingId = ev.binding_id;
    const name = ev.name;
    if (typeof bindingId === "string") {
      view.producedBindings.push({
        binding_id: bindingId,
        name: typeof name === "string" ? name : undefined,
      });
    }
  }
  if (ev.event_type === "SessionCompleted") {
    if (typeof ev.tokens_delta === "number") view.tokens = ev.tokens_delta;
    if (typeof ev.cost_delta === "number") view.cost = ev.cost_delta;
    if (typeof ev.output_preview === "string") {
      view.outputPreview = ev.output_preview;
    }
  }
  // Parse verdicts from DiscretionEvaluated events
  if (ev.event_type === "DiscretionEvaluated") {
    const verdict: Verdict = {
      event_id: ev.event_id,
      event_type: ev.event_type,
    };
    if (ev.condition !== undefined) verdict.condition = ev.condition;
    if (ev.result !== undefined) verdict.result = ev.result;
    if (ev.note !== undefined) verdict.note = ev.note;
    if (ev.confidence !== undefined) verdict.confidence = ev.confidence;
    if (ev.uncertainties) {
      verdict.uncertainties = ev.uncertainties;
    }
    // Parse rubric
    if (ev.rubric) {
      verdict.rubric = {};
      if (ev.rubric.criteria) {
        verdict.rubric.criteria = ev.rubric.criteria;
      }
      if (ev.rubric.thresholds) {
        verdict.rubric.thresholds = ev.rubric.thresholds;
      }
    }
    // Parse evidence
    if (ev.evidence) {
      verdict.evidence = {};
      if (ev.evidence.citations) {
        verdict.evidence.citations = ev.evidence.citations.map((c) => ({
          binding_id: c.binding_id,
          chunk_id: c.chunk_id,
          excerpt: c.excerpt,
        }));
      }
      if (ev.evidence.tool_outputs) {
        verdict.evidence.tool_outputs = ev.evidence.tool_outputs.map((t) => ({
          tool: t.tool,
          result: t.result,
        }));
      }
    }
    view.verdicts.push(verdict);
  }
}

// If no ContextAssembled event but we have a prompt, estimate tokens
// This ensures the Context tab shows something for simple sessions
if (view.tokenEstimate === undefined && view.promptPreview) {
  // Rough estimate: ~4 chars per token
  const promptTokens = Math.max(1, Math.ceil(view.promptPreview.length / 4));
  view.tokenEstimate = promptTokens;

  // Add a synthetic binding for the prompt so the Context tab shows it
  view.includedBindings = [{
    name: "prompt",
    binding_id: "prompt",
    token_count: promptTokens,
    reason: "required",
    reason_detail: "Session prompt",
  }];
}

out.push(view);

}

out.sort((a, b) => a.sessionId.localeCompare(b.sessionId)); return out; }

export function runTotals(events: TraceEvent[]) {
  let tokens = 0;
  let cost = 0;
  let sessions = 0;
  for (const ev of events) {
    if (ev.event_type === "SessionCompleted") sessions += 1;
    if (typeof ev.tokens_delta === "number") tokens += ev.tokens_delta;
    if (typeof ev.cost_delta === "number") cost += ev.cost_delta;
  }
  return { sessions, tokens, cost };
}

export type PreflightView = {
  mode?: string;
  profile?: string;
  sandbox?: boolean;
  secretStatus?: "ok" | "fail";
  secretHits?: string[];
  capStatus?: "ok" | "fail";
  required?: string[];
  allowed?: string[];
  missing?: string[];
};

export function preflightFromEvents(events: TraceEvent[]): PreflightView {
  const out: PreflightView = {};
  for (const e of events) {
    if (e.event_type === "RunBudgetSet") {
      if (typeof e.mode === "string") out.mode = e.mode;
      if (typeof e.profile === "string") out.profile = e.profile;
      if (typeof e.sandbox === "boolean") out.sandbox = e.sandbox;
    }
    if (e.event_type === "SecretScanPerformed") {
      if (typeof e.status === "string" && (e.status === "ok" || e.status === "fail")) out.secretStatus = e.status;
      if (Array.isArray(e.hits)) out.secretHits = e.hits.filter((x) => typeof x === "string") as string[];
    }
    if (e.event_type === "CapabilityCheckPerformed") {
      if (typeof e.status === "string" && (e.status === "ok" || e.status === "fail")) out.capStatus = e.status;
      if (Array.isArray(e.required)) out.required = e.required.filter((x) => typeof x === "string") as string[];
      if (Array.isArray(e.allowed)) out.allowed = e.allowed.filter((x) => typeof x === "string") as string[];
      if (Array.isArray(e.missing)) out.missing = e.missing.filter((x) => typeof x === "string") as string[];
      if (typeof e.profile === "string") out.profile = e.profile;
      if (typeof e.sandbox === "boolean") out.sandbox = e.sandbox;
    }
  }
  return out;
}

export type Alert = { level: "info" | "warn" | "error"; message: string };

export function alertsFromEvents(events: TraceEvent[]): Alert[] {
  const pf = preflightFromEvents(events);
  const out: Alert[] = [];

  // Only show warnings for actual failures, not missing events (which may be expected for older runtimes)
  if (pf.secretStatus === "fail") out.push({ level: "error", message: `Secret scan failed (${pf.secretHits?.length ?? 0} hit(s))` });
  if (pf.capStatus === "fail") out.push({ level: "error", message: `Missing capabilities: ${(pf.missing ?? []).join(", ")}` });

  let redacted = 0;
  for (const e of events) {
    if (e.redacted === true) redacted += 1;
  }
  if (redacted > 0) out.push({ level: "warn", message: `${redacted} event(s) had redactions applied` });

  const totals = runTotals(events);
  if (totals.tokens >= 50_000) out.push({ level: "warn", message: `High token usage (${totals.tokens})` });
  if (totals.cost >= 2.0) out.push({ level: "warn", message: `High estimated cost ($${totals.cost.toFixed(2)})` });

  if (pf.sandbox) out.push({ level: "info", message: "Sandbox mode enabled (bash/network/write denied in preflight)" });
  if (pf.profile === "untrusted") out.push({ level: "info", message: "Untrusted profile active" });

  return out;
}

export function bindingNamesFromEvents(events: TraceEvent[]): string[] {
  const names = new Set<string>();
  for (const e of events) {
    if (e.event_type === "BindingMaterialized" && typeof e.name === "string" && e.name.trim()) {
      names.add(e.name);
    }
  }
  return Array.from(names).sort();
}

// Memory card types and functions
export type MemoryCardType = "goal" | "preference" | "decision" | "fact" | "warning";
export type MemoryRetention = "ephemeral" | "project" | "global";

export type MemoryCard = {
  card_id: string;
  agent_id: string;
  card_type: MemoryCardType;
  content: string;
  retention: MemoryRetention;
  provenance?: {
    session_id?: string;
    binding_ids?: string[];
  };
  created_at: string;
  updated_at?: string;
  reference_count: number;
  is_pruned: boolean;
  prune_reason?: string;
};

export function memoryCardsFromEvents(events: TraceEvent[]): MemoryCard[] {
  const cards = new Map<string, MemoryCard>();

  for (const e of events) {
    if (e.event_type === "MemoryCardCreated") {
      const cardId = e.card_id;
      if (typeof cardId !== "string") continue;
      cards.set(cardId, {
        card_id: cardId,
        agent_id: typeof e.agent_id === "string" ? e.agent_id : "",
        card_type: (typeof e.card_type === "string" ? e.card_type : "fact") as MemoryCardType,
        content: typeof e.content === "string" ? e.content : "",
        retention: (typeof e.retention === "string" ? e.retention : "project") as MemoryRetention,
        provenance: e.provenance as MemoryCard["provenance"],
        created_at: e.timestamp,
        reference_count: 0,
        is_pruned: false,
      });
    }
    if (e.event_type === "MemoryCardUpdated") {
      const cardId = e.card_id;
      if (typeof cardId !== "string") continue;
      const card = cards.get(cardId);
      if (card && typeof e.new_content === "string") {
        card.content = e.new_content;
        card.updated_at = e.timestamp;
      }
    }
    if (e.event_type === "MemoryCardPruned") {
      const cardId = e.card_id;
      if (typeof cardId !== "string") continue;
      const card = cards.get(cardId);
      if (card) {
        card.is_pruned = true;
        card.prune_reason = typeof e.reason === "string" ? e.reason : undefined;
      }
    }
    if (e.event_type === "MemoryCardReferenced") {
      const cardId = e.card_id;
      if (typeof cardId !== "string") continue;
      const card = cards.get(cardId);
      if (card) {
        card.reference_count += 1;
      }
    }
  }

  return Array.from(cards.values()).sort((a, b) => b.reference_count - a.reference_count);
}

// Watcher alert types and functions
export type WatcherAlertType =
  | "runaway_loop"
  | "cost_anomaly"
  | "repeated_failure"
  | "context_bloat"
  | "unsafe_write"
  | "stale_memory";

export type WatcherAlert = {
  event_id: string;
  timestamp: string;
  watcher_id: string;
  alert_type: WatcherAlertType;
  severity: "info" | "warning" | "error";
  message: string;
  details?: Record<string, unknown>;
  recommendation?: string;
};

// Configurable thresholds for pattern detectors
export interface WatcherThresholds {
  loopIterationsWarn: number;
  loopIterationsError: number;
  contextTokensWarn: number;
  contextTokensError: number;
  consecutiveFailuresWarn: number;
  consecutiveFailuresError: number;
  agentFailureCount: number;
  costSpikeMultiplier: number; // Alert if cost > avg * multiplier
  costMinSampleSize: number; // Min sessions before spike detection
}

export const DEFAULT_THRESHOLDS: WatcherThresholds = {
  loopIterationsWarn: 10,
  loopIterationsError: 50,
  contextTokensWarn: 50_000,
  contextTokensError: 100_000,
  consecutiveFailuresWarn: 3,
  consecutiveFailuresError: 5,
  agentFailureCount: 5,
  costSpikeMultiplier: 3,
  costMinSampleSize: 5,
};

export function watcherAlertsFromEvents(
  events: TraceEvent[],
  thresholds: WatcherThresholds = DEFAULT_THRESHOLDS
): WatcherAlert[] {
  const alerts: WatcherAlert[] = [];

  for (const e of events) {
    if (e.event_type === "WatcherAlertRaised") {
      alerts.push({
        event_id: e.event_id,
        timestamp: e.timestamp,
        watcher_id: typeof e.watcher_id === "string" ? e.watcher_id : "",
        alert_type: (typeof e.alert_type === "string" ? e.alert_type : "cost_anomaly") as WatcherAlertType,
        severity: (typeof e.severity === "string" ? e.severity : "warning") as WatcherAlert["severity"],
        message: typeof e.message === "string" ? e.message : "",
        details: typeof e.details === "object" ? e.details as Record<string, unknown> : undefined,
        recommendation: typeof e.recommendation === "string" ? e.recommendation : undefined,
      });
    }
  }

  // Also run pattern-based detection (rule-based watchers)
  alerts.push(...detectRunawayLoops(events, thresholds));
  alerts.push(...detectContextBloat(events, thresholds));
  alerts.push(...detectRepeatedFailures(events, thresholds));
  alerts.push(...detectCostAnomalies(events, thresholds));

  return alerts.sort((a, b) => {
    const severityOrder = { error: 0, warning: 1, info: 2 };
    return severityOrder[a.severity] - severityOrder[b.severity];
  });
}

// Pattern-based watcher detectors
function detectRunawayLoops(events: TraceEvent[], thresholds: WatcherThresholds): WatcherAlert[] {
  const alerts: WatcherAlert[] = [];
  const loopCounts = new Map<string, number>();
  const alertedLoops = new Set<string>();

  for (const e of events) {
    if (e.event_type === "LoopIterationStarted" && e.parent_event_id) {
      const count = (loopCounts.get(e.parent_event_id) ?? 0) + 1;
      loopCounts.set(e.parent_event_id, count);

      // Warn at threshold
      if (count === thresholds.loopIterationsWarn && !alertedLoops.has(`warn-${e.parent_event_id}`)) {
        alertedLoops.add(`warn-${e.parent_event_id}`);
        alerts.push({
          event_id: `watcher-runaway-${e.parent_event_id}`,
          timestamp: e.timestamp,
          watcher_id: "runaway_loop_detector",
          alert_type: "runaway_loop",
          severity: "warning",
          message: `Loop has reached ${count} iterations`,
          details: { loop_id: e.parent_event_id, iterations: count, threshold: thresholds.loopIterationsWarn },
          recommendation: "Consider adding a max: constraint or more specific exit condition",
        });
      }

      // Error at threshold
      if (count === thresholds.loopIterationsError && !alertedLoops.has(`error-${e.parent_event_id}`)) {
        alertedLoops.add(`error-${e.parent_event_id}`);
        alerts.push({
          event_id: `watcher-runaway-critical-${e.parent_event_id}`,
          timestamp: e.timestamp,
          watcher_id: "runaway_loop_detector",
          alert_type: "runaway_loop",
          severity: "error",
          message: `Loop has reached ${count} iterations - possible runaway`,
          details: { loop_id: e.parent_event_id, iterations: count, threshold: thresholds.loopIterationsError },
          recommendation: "Strongly consider terminating this loop or reviewing the exit condition",
        });
      }
    }
  }

  return alerts;
}

function detectContextBloat(events: TraceEvent[], thresholds: WatcherThresholds): WatcherAlert[] {
  const alerts: WatcherAlert[] = [];

  for (const e of events) {
    if (e.event_type === "ContextAssembled" && e.token_estimate !== undefined) {
      if (e.token_estimate > thresholds.contextTokensError) {
        alerts.push({
          event_id: `watcher-context-bloat-${e.event_id}`,
          timestamp: e.timestamp,
          watcher_id: "context_bloat_detector",
          alert_type: "context_bloat",
          severity: "error",
          message: `Context assembled with ${e.token_estimate.toLocaleString()} tokens (very large)`,
          details: { token_count: e.token_estimate, session_id: e.session_id, threshold: thresholds.contextTokensError },
          recommendation: "Reduce context by passing only required bindings or summarizing large inputs",
        });
      } else if (e.token_estimate > thresholds.contextTokensWarn) {
        alerts.push({
          event_id: `watcher-context-bloat-${e.event_id}`,
          timestamp: e.timestamp,
          watcher_id: "context_bloat_detector",
          alert_type: "context_bloat",
          severity: "warning",
          message: `Context assembled with ${e.token_estimate.toLocaleString()} tokens (large)`,
          details: { token_count: e.token_estimate, session_id: e.session_id, threshold: thresholds.contextTokensWarn },
          recommendation: "Consider reducing context size for better performance and cost",
        });
      }
    }
  }

  return alerts;
}

function detectRepeatedFailures(events: TraceEvent[], thresholds: WatcherThresholds): WatcherAlert[] {
  const alerts: WatcherAlert[] = [];
  const failuresByAgent = new Map<string, number>();
  let consecutiveFailures = 0;
  let lastFailureTime = "";
  let warnAlertEmitted = false;
  let errorAlertEmitted = false;

  for (const e of events) {
    // Both SessionFailed and RunFailed count as failures
    if (e.event_type === "SessionFailed" || e.event_type === "RunFailed") {
      consecutiveFailures += 1;
      lastFailureTime = e.timestamp;

      if (e.event_type === "SessionFailed") {
        const agentId = e.agent_id ?? "unknown";
        failuresByAgent.set(agentId, (failuresByAgent.get(agentId) ?? 0) + 1);
      }

      if (consecutiveFailures === thresholds.consecutiveFailuresWarn && !warnAlertEmitted) {
        warnAlertEmitted = true;
        alerts.push({
          event_id: `watcher-repeated-failure-${e.event_id}`,
          timestamp: e.timestamp,
          watcher_id: "repeated_failure_detector",
          alert_type: "repeated_failure",
          severity: "warning",
          message: `${consecutiveFailures} consecutive failures detected`,
          details: { consecutive_count: consecutiveFailures, threshold: thresholds.consecutiveFailuresWarn },
          recommendation: "Review recent session inputs and consider intervention",
        });
      } else if (consecutiveFailures === thresholds.consecutiveFailuresError && !errorAlertEmitted) {
        errorAlertEmitted = true;
        alerts.push({
          event_id: `watcher-repeated-failure-critical-${e.event_id}`,
          timestamp: e.timestamp,
          watcher_id: "repeated_failure_detector",
          alert_type: "repeated_failure",
          severity: "error",
          message: `${consecutiveFailures} consecutive failures - run may be stuck`,
          details: { consecutive_count: consecutiveFailures, threshold: thresholds.consecutiveFailuresError },
          recommendation: "Consider pausing the run and investigating the failure pattern",
        });
      }
    } else if (e.event_type === "SessionCompleted") {
      // Only reset on successful completion
      consecutiveFailures = 0;
      warnAlertEmitted = false;
      errorAlertEmitted = false;
    }
  }

  // Check for agent-specific failure patterns
  for (const [agentId, count] of failuresByAgent) {
    if (count >= thresholds.agentFailureCount) {
      alerts.push({
        event_id: `watcher-agent-failures-${agentId}`,
        timestamp: lastFailureTime,
        watcher_id: "repeated_failure_detector",
        alert_type: "repeated_failure",
        severity: "warning",
        message: `Agent "${agentId}" has failed ${count} times in this run`,
        details: { agent_id: agentId, failure_count: count, threshold: thresholds.agentFailureCount },
        recommendation: "Review agent configuration and task complexity",
      });
    }
  }

  return alerts;
}

/**
 * Reconstruct memory card state at a specific timestamp.
 * Used for the memory timeline scrubber to show historical state.
 */
export function reconstructMemoryAt(events: TraceEvent[], targetTime: string): MemoryCard[] {
  const cards = new Map<string, MemoryCard>();

  for (const e of events) {
    // Stop processing once we're past the target time
    // (the early break assumes `events` is already sorted by timestamp)
    if (e.timestamp > targetTime) break;

    if (e.event_type === "MemoryCardCreated") {
      const cardId = e.card_id;
      if (typeof cardId !== "string") continue;
      cards.set(cardId, {
        card_id: cardId,
        agent_id: typeof e.agent_id === "string" ? e.agent_id : "",
        card_type: (typeof e.card_type === "string" ? e.card_type : "fact") as MemoryCardType,
        content: typeof e.content === "string" ? e.content : "",
        retention: (typeof e.retention === "string" ? e.retention : "project") as MemoryRetention,
        provenance: e.provenance as MemoryCard["provenance"],
        created_at: e.timestamp,
        reference_count: 0,
        is_pruned: false,
      });
    }

    if (e.event_type === "MemoryCardUpdated") {
      const cardId = e.card_id;
      if (typeof cardId !== "string") continue;
      const card = cards.get(cardId);
      if (card && typeof e.new_content === "string") {
        card.content = e.new_content;
        card.updated_at = e.timestamp;
      }
    }

    if (e.event_type === "MemoryCardPruned") {
      const cardId = e.card_id;
      if (typeof cardId !== "string") continue;
      const card = cards.get(cardId);
      if (card) {
        card.is_pruned = true;
        card.prune_reason = typeof e.reason === "string" ? e.reason : undefined;
      }
    }

    if (e.event_type === "MemoryCardReferenced") {
      const cardId = e.card_id;
      if (typeof cardId !== "string") continue;
      const card = cards.get(cardId);
      if (card) {
        card.reference_count += 1;
      }
    }
  }

  return Array.from(cards.values()).sort((a, b) => b.reference_count - a.reference_count);
}

/**
 * Get the time range (first and last timestamps) for a set of events.
 */
export function getTimeRange(events: TraceEvent[]): { start: string; end: string } | null {
  if (events.length === 0) return null;

  const sorted = [...events].sort((a, b) => a.timestamp.localeCompare(b.timestamp));
  return {
    start: sorted[0].timestamp,
    end: sorted[sorted.length - 1].timestamp,
  };
}

function detectCostAnomalies(events: TraceEvent[], thresholds: WatcherThresholds): WatcherAlert[] {
  const alerts: WatcherAlert[] = [];
  const sessionCosts: number[] = [];

  for (const e of events) {
    if (e.event_type === "SessionCompleted" && e.cost_delta !== undefined && e.cost_delta !== null) {
      sessionCosts.push(e.cost_delta);

      // Check for cost spike relative to average (no hardcoded $ floor)
      // Only after we have enough samples for a meaningful comparison
      if (sessionCosts.length >= thresholds.costMinSampleSize) {
        const previousCosts = sessionCosts.slice(0, -1);
        const avg = previousCosts.reduce((a, b) => a + b, 0) / previousCosts.length;

        // Only alert if both the multiplier is exceeded AND the average is non-trivial
        // (avoids alerts when avg is near-zero and any cost looks like a spike)
        if (avg > 0.001 && e.cost_delta > avg * thresholds.costSpikeMultiplier) {
          const multiplier = e.cost_delta / avg;
          alerts.push({
            event_id: `watcher-cost-anomaly-${e.event_id}`,
            timestamp: e.timestamp,
            watcher_id: "cost_anomaly_detector",
            alert_type: "cost_anomaly",
            severity: "warning",
            message: `Session cost $${e.cost_delta.toFixed(4)} is ${multiplier.toFixed(1)}x the average ($${avg.toFixed(4)})`,
            details: {
              session_id: e.session_id,
              cost: e.cost_delta,
              average: avg,
              multiplier: multiplier,
              threshold_multiplier: thresholds.costSpikeMultiplier,
            },
            recommendation: "Investigate why this session was more expensive than typical",
          });
        }
      }
    }
  }

  return alerts;
}
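The spike rule in `detectCostAnomalies` is easy to misread: it waits for `costMinSampleSize` completed sessions *including* the current one, so with the defaults the first comparison averages only four earlier costs. The sketch below isolates that arithmetic in a hypothetical `isCostSpike` helper (not part of the codebase), assuming costs are supplied in completion order as the loop above accumulates them.

```typescript
// Hypothetical helper mirroring the spike rule in detectCostAnomalies.
// `costs` holds every SessionCompleted cost seen so far, current session last.
function isCostSpike(
  costs: number[],
  multiplier: number,
  minSampleSize: number,
  minAverage = 0.001, // same near-zero guard on the average as the original code
): boolean {
  // Mirrors `sessionCosts.length >= thresholds.costMinSampleSize`
  if (costs.length < minSampleSize) return false;
  const previous = costs.slice(0, -1);
  const current = costs[costs.length - 1];
  // With minSampleSize = 1, `previous` is empty and avg is NaN, so no alert fires
  const avg = previous.reduce((a, b) => a + b, 0) / previous.length;
  return avg > minAverage && current > avg * multiplier;
}

// With minSampleSize = 5, the first check averages four prior sessions.
console.log(isCostSpike([0.5, 0.5, 0.5, 0.5, 2.0], 3, 5)); // true: 2.0 > 0.5 * 3
console.log(isCostSpike([0.5, 0.5, 0.5, 0.5, 1.5], 3, 5)); // false: 1.5 is not > 1.5
console.log(isCostSpike([0.5, 0.5, 0.5, 0.5], 3, 5)); // false: only 4 samples
```

Keeping the rule in a pure function like this would also let the thresholds be unit-tested without constructing `TraceEvent` fixtures.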


## File: ide/apps/mission-control/src/types.ts

export type RunSummary = {
  run_id: string;
  trace_path: string;
  program_path: string;
  status: "running" | "completed" | "failed";
};

// Base fields present on all trace events
type TraceEventBase = {
  event_id: string;
  run_id: string;
  timestamp: string;
  parent_event_id?: string | null;
  session_id?: string | null;
  cost_delta?: number | null;
  tokens_delta?: number | null;
  redaction_policy_version: number;
  redacted?: boolean;
  redaction_hits?: string[];
};

// Discriminated union for type-safe event handling
export type TraceEvent =
  | (TraceEventBase & { event_type: "RunStarted" })
  | (TraceEventBase & { event_type: "RunCompleted" })
  | (TraceEventBase & { event_type: "RunFailed"; error_type?: string; message?: string })
  | (TraceEventBase & { event_type: "RunBudgetSet"; mode?: string; profile?: string; sandbox?: boolean })
  | (TraceEventBase & { event_type: "SecretScanPerformed"; status?: "ok" | "fail"; hits?: string[] })
  | (TraceEventBase & {
      event_type: "CapabilityCheckPerformed";
      status?: "ok" | "fail";
      required?: string[];
      allowed?: string[];
      missing?: string[];
      profile?: string;
      sandbox?: boolean;
    })
  | (TraceEventBase & { event_type: "SessionPlanned"; head?: string; kind?: string })
  | (TraceEventBase & { event_type: "SessionStarted"; agent_id?: string; prompt_preview?: string })
  | (TraceEventBase & { event_type: "SessionCompleted"; agent_id?: string; output_preview?: string })
  | (TraceEventBase & { event_type: "SessionFailed"; agent_id?: string; error_type?: string; message?: string })
  | (TraceEventBase & {
      event_type: "ContextAssembled";
      token_estimate?: number;
      included_bindings?: Array<{
        name: string;
        binding_id: string;
        token_count?: number;
        reason?: "explicit" | "inferred" | "required" | "memory";
        reason_detail?: string;
      }>;
      summarization?: {
        applied?: boolean;
        original_tokens?: number;
        compressed_tokens?: number;
        bindings_affected?: string[];
      };
    })
  | (TraceEventBase & { event_type: "BindingMaterialized"; name?: string; binding_id?: string; path?: string })
  | (TraceEventBase & { event_type: "BindingReferenced"; name?: string; binding_id?: string })
  | (TraceEventBase & {
      event_type: "DiscretionEvaluated";
      condition?: string;
      result?: boolean;
      note?: string;
      rubric?: {
        criteria?: string[];
        thresholds?: Record<string, number>;
      };
      evidence?: {
        citations?: Array<{ binding_id: string; chunk_id?: string; excerpt?: string }>;
        tool_outputs?: Array<{ tool: string; result: string }>;
      };
      confidence?: number;
      uncertainties?: string[];
    })
  | (TraceEventBase & { event_type: "LoopIterationStarted"; iteration?: number })
  | (TraceEventBase & { event_type: "LoopIterationCompleted"; iteration?: number })
  | (TraceEventBase & { event_type: "ParallelBlockStarted" })
  | (TraceEventBase & { event_type: "ParallelBlockCompleted" })
  | (TraceEventBase & {
      event_type: "HumanInterventionRequested";
      prompt?: string;
      request_id?: string;
      summary?: string;
      question?: string;
      options?: string[];
    })
  | (TraceEventBase & { event_type: "HumanInterventionApplied"; note?: string })
  | (TraceEventBase & { event_type: "CheckpointCreated"; checkpoint_id?: string; label?: string })
  | (TraceEventBase & { event_type: "CheckpointRestored"; checkpoint_id?: string })
  | (TraceEventBase & { event_type: "ErrorOccurred"; error_type?: string; message?: string; stack?: string })
  | (TraceEventBase & {
      event_type: "MemoryCardCreated";
      card_id?: string;
      agent_id?: string;
      card_type?: "goal" | "preference" | "decision" | "fact" | "warning";
      content?: string;
      retention?: "ephemeral" | "project" | "global";
      provenance?: { session_id?: string; binding_ids?: string[] };
    })
  | (TraceEventBase & {
      event_type: "MemoryCardUpdated";
      card_id?: string;
      agent_id?: string;
      previous_content?: string;
      new_content?: string;
      update_reason?: string;
    })
  | (TraceEventBase & {
      event_type: "MemoryCardPruned";
      card_id?: string;
      agent_id?: string;
      reason?: "manual" | "stale" | "retention_policy" | "compaction";
    })
  | (TraceEventBase & {
      event_type: "MemoryCardReferenced";
      card_id?: string;
      agent_id?: string;
      referenced_by_session?: string;
    })
  | (TraceEventBase & {
      event_type: "WatcherAlertRaised";
      watcher_id?: string;
      alert_type?: "runaway_loop" | "cost_anomaly" | "repeated_failure" | "context_bloat" | "unsafe_write" | "stale_memory";
      severity?: "info" | "warning" | "error";
      message?: string;
      details?: Record<string, unknown>;
      recommendation?: string;
    })
  // VM events (Claude-as-VM execution)
  | (TraceEventBase & { event_type: "Thought"; text?: string; position?: string })
  | (TraceEventBase & { event_type: "ToolCall"; tool_id?: string; tool_name?: string; input_preview?: string })
  | (TraceEventBase & { event_type: "ToolResult"; tool_id?: string; is_error?: boolean; output_preview?: string })
  | (TraceEventBase & { event_type: "TextOutput"; text?: string })
  | (TraceEventBase & { event_type: "Processing" })
  | (TraceEventBase & { event_type: "StreamingText"; text?: string; is_complete?: boolean })
  | (TraceEventBase & { event_type: "PuppetModeEntered"; request_id?: string; context_summary?: string })
  | (TraceEventBase & { event_type: "RuntimeEvent"; event?: Record<string, unknown> });

// Type guard helpers for narrowing
export function isEventType<T extends TraceEvent["event_type"]>(
  event: TraceEvent,
  type: T
): event is Extract<TraceEvent, { event_type: T }> {
  return event.event_type === type;
}

export type CheckpointInfo = {
  id: string;
  label?: string | null;
  refname: string;
  commit: string;
  created_at: string;
};

// VM Event types (Claude-as-VM execution)
export type VmEvent =
  | { event_type: "VmStarted"; run_id: string; program_file: string; resume_id?: string }
  | { event_type: "Thought"; run_id: string; text: string; position?: string }
  | { event_type: "SessionSpawned"; run_id: string; session_id: string; agent?: string; prompt_preview: string }
  | { event_type: "SessionCompleted"; run_id: string; session_id: string; output_preview: string }
  | { event_type: "BindingCreated"; run_id: string; name: string; path: string }
  | { event_type: "ToolCall"; run_id: string; tool_id: string; tool_name: string; input_preview: string }
  | { event_type: "ToolResult"; run_id: string; tool_id: string; is_error: boolean; output_preview: string }
  | { event_type: "TextOutput"; run_id: string; text: string }
  | {
      event_type: "FeedbackRequested";
      run_id: string;
      request_id: string;
      session_id?: string;
      summary: string;
      question?: string;
      options?: string[];
      display_binding?: string;
    }
  | {
      event_type: "PuppetModeEntered";
      run_id: string;
      request_id: string;
      session_id: string;
      agent?: string;
      context_summary: string;
      last_output?: string;
    }
  | { event_type: "SuspensionResolved"; run_id: string; request_id: string }
  | {
      event_type: "VmCompleted";
      run_id: string;
      success: boolean;
      error?: string;
      resume_id?: string;
      stderr_tail?: string;
    };
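`isEventType` narrows a `TraceEvent` to the union member whose `event_type` matches, using `Extract` in the return-type predicate. A self-contained sketch of the same pattern on a hypothetical two-member `MiniEvent` union (not part of `types.ts`) shows what the guard buys: after the check, fields that exist only on the matching member type-check without a cast. In the real `TraceEvent`, `tokens_delta` lives on `TraceEventBase` and needs no narrowing; the mini union deliberately puts it on one member so the effect is visible.

```typescript
// Hypothetical trimmed-down union in the same style as TraceEvent.
type MiniEvent =
  | { event_type: "SessionCompleted"; tokens_delta?: number | null }
  | { event_type: "ToolCall"; tool_name?: string };

// Same shape as isEventType in types.ts, specialised to MiniEvent.
function isMiniEventType<T extends MiniEvent["event_type"]>(
  event: MiniEvent,
  type: T,
): event is Extract<MiniEvent, { event_type: T }> {
  return event.event_type === type;
}

// Sums tokens_delta over SessionCompleted events only; after the guard,
// `ev.tokens_delta` is accessible because ev is narrowed to that member.
function sumSessionTokens(events: MiniEvent[]): number {
  let total = 0;
  for (const ev of events) {
    if (isMiniEventType(ev, "SessionCompleted") && typeof ev.tokens_delta === "number") {
      total += ev.tokens_delta;
    }
  }
  return total;
}

const sample: MiniEvent[] = [
  { event_type: "SessionCompleted", tokens_delta: 120 },
  { event_type: "ToolCall", tool_name: "Read" },
  { event_type: "SessionCompleted", tokens_delta: null },
];
console.log(sumSessionTokens(sample)); // 120
```

A direct `ev.event_type === "SessionCompleted"` comparison narrows just as well; the generic guard mainly saves restating the `Extract` type when the event type arrives as a type parameter.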


## File: ide/apps/mission-control/src/App.tsx

import { listen, emit } from "@tauri-apps/api/event";
import React, { useEffect, useState, useCallback, useRef, useMemo, lazy, Suspense } from "react";
import { StoreProvider, useAppState, useDerived, useDispatch, useSuspension } from "./store";
import { useTrace } from "./hooks/useTrace";
import { useKeyboard, SHORTCUTS } from "./hooks/useKeyboard";
import { ExecutionFeed } from "./components/Feed";
import { SessionInspector } from "./components/Inspector";
import { FeedbackInput, type FeedbackRequestData } from "./components/Inspector/FeedbackInput";
import { PuppetInterface, type PuppetModeData } from "./components/Inspector/PuppetInterface";
import { RunList } from "./components/Sidebar";
import { StatusBar } from "./components/StatusBar";
import { NotificationBell, RunNotification } from "./components/NotificationBell";
import { Button } from "./components/common";
import { api, RuntimeConnection, RuntimeEvent } from "./lib/api";
import type { TraceEvent, VmEvent } from "./types";

// Lazy load heavy components (~150KB CodeMirror deferred until needed)
const BlameGraph = lazy(() => import("./components/BlameGraph").then(m => ({ default: m.BlameGraph })));
const ProseEditor = lazy(() => import("./components/Editor").then(m => ({ default: m.ProseEditor })));

function AppContent() {
  const state = useAppState();
  const derived = useDerived();
  const dispatch = useDispatch();
  const trace = useTrace();
  const suspension = useSuspension();

  const [showBlameGraph, setShowBlameGraph] = useState(false);
  const [showShortcuts, setShowShortcuts] = useState(false);
  const [showEditor, setShowEditor] = useState(false);
  const [dismissedNotifications, setDismissedNotifications] = useState<Set<string>>(new Set());

  // Build notifications from runs - use backend-provided status
  const notifications: RunNotification[] = useMemo(() => {
    return state.runs
      .filter(r => !dismissedNotifications.has(r.run_id))
      .slice(0, 10) // Limit to 10 most recent
      .map(run => {
        // For the currently selected run, we can count sessions from loaded events
        const isSelected = run.run_id === state.selectedRunId;
        const sessionCount = isSelected
          ? state.events.filter(e => e.event_type === "SessionStarted").length
          : 0;

        return {
          runId: run.run_id,
          status: run.status, // Use backend-provided status from trace file
          startedAt: new Date().toISOString(), // We don't have this without loading events
          completedAt: run.status !== "running" ? new Date().toISOString() : undefined,
          sessionCount,
        };
      });
  }, [state.runs, state.selectedRunId, state.events, dismissedNotifications]);

  const handleNotificationRunClick = useCallback((runId: string) => {
    dispatch({ type: "SET_SELECTED_RUN", payload: runId });
  }, [dispatch]);

  const handleClearNotifications = useCallback(() => {
    const allRunIds = new Set(state.runs.map(r => r.run_id));
    setDismissedNotifications(allRunIds);
  }, [state.runs]);

  // Runtime connection state
  const [runtimeConnected, setRuntimeConnected] = useState(false);
  const [puppetMode, setPuppetMode] = useState<{
    active: boolean;
    sessionId?: string;
    agent?: string;
    contextSummary?: string;
    lastOutput?: string;
  }>({ active: false });
  const [puppetInput, setPuppetInput] = useState("");
  const runtimeRef = useRef<RuntimeConnection | null>(null);

  // Event selection handlers
  const handleEventSelect = useCallback((eventId: string, sessionId: string) => {
    dispatch({ type: "SET_SELECTED_EVENT", payload: eventId });
    dispatch({ type: "SET_SELECTED_SESSION", payload: sessionId });

    // Update focused index
    const idx = derived.sortedEvents.findIndex((e) => e.event_id === eventId);
    if (idx >= 0) dispatch({ type: "SET_FOCUSED_INDEX", payload: idx });
  }, [dispatch, derived.sortedEvents]);

  const handleSessionSelect = useCallback((sessionId: string) => {
    dispatch({ type: "SET_SELECTED_SESSION", payload: sessionId });
  }, [dispatch]);

  // Keyboard navigation
  const navigateUp = useCallback(() => {
    const newIdx = Math.max(0, state.focusedIndex - 1);
    dispatch({ type: "SET_FOCUSED_INDEX", payload: newIdx });
    const ev = derived.sortedEvents[newIdx];
    if (ev) {
      dispatch({ type: "SET_SELECTED_EVENT", payload: ev.event_id });
      dispatch({ type: "SET_SELECTED_SESSION", payload: ev.session_id ?? "-" });
    }
  }, [state.focusedIndex, derived.sortedEvents, dispatch]);

  const navigateDown = useCallback(() => {
    const newIdx = Math.min(derived.sortedEvents.length - 1, state.focusedIndex + 1);
    dispatch({ type: "SET_FOCUSED_INDEX", payload: newIdx });
    const ev = derived.sortedEvents[newIdx];
    if (ev) {
      dispatch({ type: "SET_SELECTED_EVENT", payload: ev.event_id });
      dispatch({ type: "SET_SELECTED_SESSION", payload: ev.session_id ?? "-" });
    }
  }, [state.focusedIndex, derived.sortedEvents, dispatch]);

  const handleSelect = useCallback(() => {
    // Expand details / trigger preview for current selection
    if (derived.selectedSession?.producedBindings.length) {
      const first = derived.selectedSession.producedBindings[0];
      if (first.name) void trace.previewBinding(first.name);
    }
  }, [derived.selectedSession, trace]);

  useKeyboard({
    onNavigateUp: navigateUp,
    onNavigateDown: navigateDown,
    onSelect: handleSelect,
    onBlame: () => void trace.blameSelectedSession(),
    onFork: () => void trace.forkSelectedRun(),
    onCheckpoint: () => void trace.createCheckpoint(),
    onRefresh: () => void trace.refreshRuns(),
    onCommandPalette: () => setShowShortcuts((s) => !s),
    onEscape: () => {
      setShowShortcuts(false);
      setShowBlameGraph(false);
      setShowEditor(false);
    },
    onToggleEditor: () => {
      if (state.program) setShowEditor((s) => !s);
    },
  }, true);

  // E2E Testing: listen for MCP execute-js commands
  useEffect(() => {
    let unlisten: (() => void) | null = null;

    const setup = async () => {
      unlisten = await listen<string>("execute-js", async (event) => {
        const code = event.payload;
        try {
          // Execute the JS code and capture result (handles both sync and async)
          // eslint-disable-next-line no-eval
          let result = eval(code);
          // If result is a Promise, await it
          if (result instanceof Promise) {
            result = await result;
          }
          const resultStr = result === undefined ? "undefined" : JSON.stringify(result);
          await emit("execute-js-response", { result: resultStr, type: typeof result });
        } catch (err) {
          await emit("execute-js-response", { error: String(err) });
        }
      });
    };

    void setup();

    return () => {
      if (unlisten) unlisten();
    };
  }, []);

  // Load workspace on mount
  useEffect(() => {
    void trace.refreshRuns();
  }, [state.workspaceRoot]);

  useEffect(() => {
    if (state.workspaceRoot) void trace.refreshCheckpoints();
  }, [state.workspaceRoot]);

  // Listen for trace events
  useEffect(() => {
    let unlisten: (() => void) | null = null;
    let alive = true;

    (async () => {
      unlisten = await listen<TraceEvent>("trace_event", (event) => {
        if (!alive) return;
        dispatch({ type: "ADD_EVENT", payload: event.payload });
      });
    })();

    return () => {
      alive = false;
      if (unlisten) unlisten();
      void api.watch.stop();
    };
  }, [dispatch]);

  const handleShowBlameGraph = useCallback(async () => {
    await trace.blameSelectedSession();
    setShowBlameGraph(true);
  }, [trace]);

  const handleBlameNodeClick = useCallback((nodeId: string, type: "session" | "binding") => {
    if (type === "session") {
      dispatch({ type: "SET_SELECTED_SESSION", payload: nodeId });
    } else {
      void trace.previewBinding(nodeId);
    }
  }, [dispatch, trace]);

// Runtime connection handlers
const connectToRuntime = useCallback(async () => {
  if (runtimeRef.current) return;

const runtime = new RuntimeConnection();
runtime.onConnect = () => setRuntimeConnected(true);
runtime.onDisconnect = () => {
  setRuntimeConnected(false);
  setPuppetMode({ active: false });
};
runtime.onEvent = (event: RuntimeEvent) => {
  // Handle puppet mode events
  if (event.event_type === "puppet_mode_entered") {
    setPuppetMode({
      active: true,
      sessionId: event.session_id,
      agent: event.agent,
      contextSummary: event.summary,
      lastOutput: event.output,
    });
  } else if (event.event_type === "puppet_mode_exited") {
    setPuppetMode({ active: false });
  }
  // Also dispatch as trace event for the Loom
  dispatch({ type: "ADD_EVENT", payload: event as unknown as TraceEvent });
};

try {
  await runtime.connect();
  runtimeRef.current = runtime;
} catch (err) {
  console.error("Failed to connect to runtime:", err);
  dispatch({ type: "SET_ERROR", payload: `Runtime connection failed: ${err}` });
}

}, [dispatch]);
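In `connectToRuntime`, `runtimeRef.current` is only assigned after `await runtime.connect()` resolves. Two calls made while the first connection is still pending (a double click, for example) therefore both pass the `if (runtimeRef.current) return;` guard and open two connections.

Below is a minimal sketch of a single-flight wrapper that makes concurrent callers share one pending promise. `singleFlight` is a hypothetical helper, and it assumes callers are fine reusing the first call's result:

```typescript
// Hypothetical helper: while a call is pending, further calls return the same promise.
function singleFlight<T>(fn: () => Promise<T>): () => Promise<T> {
  let inFlight: Promise<T> | null = null;
  return () => {
    if (inFlight === null) {
      // Clear the slot once settled (success or failure) so later calls can retry
      inFlight = fn().finally(() => {
        inFlight = null;
      });
    }
    return inFlight;
  };
}
```

With this shape, the guard no longer depends on when `runtimeRef` is assigned. A failed connect also clears the slot, so the next click retries instead of returning a stale rejection.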

const disconnectRuntime = useCallback(() => {
  runtimeRef.current?.disconnect();
  runtimeRef.current = null;
  setRuntimeConnected(false);
  setPuppetMode({ active: false });
}, []);

const handleTakeControl = useCallback(async () => {
  if (!runtimeRef.current || !derived.selectedSession) return;
  // Send puppet mode request - the runtime will respond with a puppet_mode_entered event
  try {
    await runtimeRef.current.resume({ type: "PuppetResponse", human_response: "TAKE_CONTROL" });
  } catch (err) {
    console.error("Failed to take control:", err);
  }
}, [derived.selectedSession]);

const handleReleasePuppet = useCallback(async () => {
  if (!runtimeRef.current) return;
  try {
    await runtimeRef.current.resume({ type: "PuppetRelease" });
    setPuppetMode({ active: false });
    setPuppetInput("");
  } catch (err) {
    console.error("Failed to release puppet:", err);
  }
}, []);

const handleSendPuppetResponse = useCallback(async () => {
  if (!runtimeRef.current || !puppetInput.trim()) return;
  try {
    await runtimeRef.current.resume({ type: "PuppetResponse", human_response: puppetInput });
    setPuppetInput("");
  } catch (err) {
    console.error("Failed to send puppet response:", err);
  }
}, [puppetInput]);

// Feedback submission handler (integrated with store suspension state).
// Uses the Tauri command for VM execution and falls back to the WebSocket for CLI --interactive mode.
const handleFeedbackSubmit = useCallback(
  async (response: { selectedOption?: number; userInput?: string }) => {
    if (!suspension || suspension.type !== "feedback") return;
    try {
      // Try the Tauri command first (for run_prose_vm execution)
      await api.submitFeedback(suspension.requestId, {
        selected_option: response.selectedOption,
        user_input: response.userInput,
      });
      dispatch({ type: "SET_SUSPENSION", payload: null });
    } catch (err) {
      // Fall back to the WebSocket if the Tauri command fails (for --interactive mode)
      if (runtimeRef.current) {
        try {
          await runtimeRef.current.resume({
            type: "FeedbackResponse",
            selected_option: response.selectedOption,
            user_input: response.userInput,
          });
          dispatch({ type: "SET_SUSPENSION", payload: null });
          return;
        } catch (wsErr) {
          console.error("WebSocket fallback also failed:", wsErr);
        }
      }
      console.error("Failed to submit feedback:", err);
      dispatch({ type: "SET_ERROR", payload: `Feedback submission failed: ${err}` });
    }
  },
  [suspension, dispatch],
);
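`handleFeedbackSubmit` tries the Tauri `submitFeedback` command and, on any failure, falls back to the WebSocket `resume` call. When both fail, only the Tauri error reaches `SET_ERROR`.

Below is a sketch of the same two-transport strategy with both transports injected as plain async functions, so it can be tested without Tauri or a live runtime. `submitWithFallback` and `Transport` are hypothetical names, and this version reports both failures:

```typescript
type Transport = () => Promise<void>;

// Hypothetical helper: try `primary`, fall back to `fallback` (if any),
// and report which transport succeeded.
async function submitWithFallback(
  primary: Transport,
  fallback: Transport | null,
): Promise<"primary" | "fallback"> {
  try {
    await primary();
    return "primary";
  } catch (primaryErr) {
    if (fallback === null) throw primaryErr;
    try {
      await fallback();
      return "fallback";
    } catch (fallbackErr) {
      // Surface both failures instead of only the first one
      throw new Error(
        `primary failed: ${String(primaryErr)}; fallback failed: ${String(fallbackErr)}`,
      );
    }
  }
}
```

Returning which transport succeeded also makes it easy to log how often the `--interactive` WebSocket path is actually used.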

// Puppet mode handlers using store suspension state
const handlePuppetSubmit = useCallback(
  async (response: string) => {
    if (!runtimeRef.current || !suspension || suspension.type !== "puppet") return;
    try {
      await runtimeRef.current.resume({ type: "PuppetResponse", human_response: response });
      // Don't clear suspension - wait for SuspensionResolved event
    } catch (err) {
      console.error("Failed to send puppet response:", err);
      dispatch({ type: "SET_ERROR", payload: `Puppet response failed: ${err}` });
    }
  },
  [suspension, dispatch],
);

const handlePuppetRelease = useCallback(async () => {
  if (!runtimeRef.current) return;
  try {
    await runtimeRef.current.resume({ type: "PuppetRelease" });
    dispatch({ type: "SET_SUSPENSION", payload: null });
  } catch (err) {
    console.error("Failed to release puppet:", err);
  }
}, [dispatch]);

// Clean up the runtime connection on unmount
useEffect(() => {
  return () => {
    runtimeRef.current?.disconnect();
  };
}, []);

// Listen for VM events from Claude-as-VM execution
useEffect(() => {
  // Keep the promise so cleanup can unlisten even if it runs before listen() resolves
  const unlistenPromise = listen<VmEvent>("vm_event", (event) => {
    dispatch({ type: "ADD_VM_EVENT", payload: event.payload });
  });

  return () => {
    void unlistenPromise.then((unlisten) => unlisten());
  };
}, [dispatch]);

// Listen for runtime events from the Tauri backend (for live progress).
// The backend emits TraceEvents directly via vm_event_to_trace().
// Events are batched to avoid excessive re-renders (dispatched every 50ms).
const [isExecuting, setIsExecuting] = useState(false);
// Boot phase tracks startup progress: waiting → connected → thinking → null (done)
const [bootPhase, setBootPhase] = useState<'waiting' | 'connected' | 'thinking' | null>(null);
const eventBufferRef = useRef<TraceEvent[]>([]);

// Track boot phase based on events - stays in boot mode until "meaningful" content arrives
useEffect(() => {
  if (state.error) {
    setBootPhase(null);
    return;
  }
  if (bootPhase === null) return; // Already exited boot mode

// Check what events we have to determine phase
for (const ev of state.events) {
  const evType = ev.event_type;
  // These events mean we have real content - exit boot mode
  if (evType === 'SessionStarted' || evType === 'ToolCall' ||
      evType === 'TextOutput' || evType === 'SessionCompleted' ||
      evType === 'RunCompleted' || evType === 'RunFailed') {
    setBootPhase(null);
    return;
  }
}

// Still in boot mode - update phase based on what we've seen
const hasStreaming = state.events.some(e => e.event_type === 'StreamingText');
const hasThought = state.events.some(e => e.event_type === 'Thought');
const hasProcessing = state.events.some(e => e.event_type === 'Processing');
const hasRunStarted = state.events.some(e => e.event_type === 'RunStarted');

// Progress through phases: waiting → connected → thinking
if ((hasStreaming || hasThought) && bootPhase !== 'thinking') {
  setBootPhase('thinking');
} else if ((hasProcessing || hasRunStarted) && bootPhase === 'waiting') {
  setBootPhase('connected');
}

}, [state.events, state.error, bootPhase]);
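The boot-phase effect walks `state.events` once in the loop and then four more times with `.some()`, on every change to the event list. The same transition rules can be written as a pure function that makes a single pass.

`nextBootPhase` and `CONTENT_EVENT_TYPES` are hypothetical names. The sketch assumes events only need an `event_type` string, and it leaves the `state.error` check to the caller:

```typescript
type BootPhase = "waiting" | "connected" | "thinking" | null;

// Event types that mean real content has arrived, so boot mode is over
const CONTENT_EVENT_TYPES: ReadonlySet<string> = new Set([
  "SessionStarted",
  "ToolCall",
  "TextOutput",
  "SessionCompleted",
  "RunCompleted",
  "RunFailed",
]);

// Hypothetical pure version of the boot-phase rules:
// waiting -> connected -> thinking -> null (done), in a single pass over events.
function nextBootPhase(
  current: BootPhase,
  events: ReadonlyArray<{ event_type: string }>,
): BootPhase {
  if (current === null) return null; // already exited boot mode
  let sawThinking = false;
  let sawConnected = false;
  for (const ev of events) {
    if (CONTENT_EVENT_TYPES.has(ev.event_type)) return null;
    if (ev.event_type === "StreamingText" || ev.event_type === "Thought") sawThinking = true;
    if (ev.event_type === "Processing" || ev.event_type === "RunStarted") sawConnected = true;
  }
  if (sawThinking) return "thinking";
  // As in the effect, "connected" is only entered from "waiting"
  if (sawConnected && current === "waiting") return "connected";
  return current;
}
```

Inside the effect, this could replace the loop and the `.some()` checks with `setBootPhase(nextBootPhase(bootPhase, state.events))`. React skips re-rendering when the new state is identical (by `Object.is`) to the current one.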

useEffect(() => {
  // Flush buffered events every 50ms
  const flushInterval = setInterval(() => {
    if (eventBufferRef.current.length > 0) {
      dispatch({ type: "ADD_EVENTS_BATCH", payload: eventBufferRef.current });
      eventBufferRef.current = [];
    }
  }, 50);

  // The runtime_event payload is already a TraceEvent from vm_event_to_trace().
  // Keep the promise so cleanup can unlisten even if it runs before listen() resolves.
  const unlistenPromise = listen<TraceEvent>("runtime_event", (event) => {
    const traceEvent = event.payload;
    setIsExecuting(true);

    // Buffer the event for batch dispatch
    eventBufferRef.current.push(traceEvent);

    // Check for completion events
    if (traceEvent.event_type === "RunCompleted" ||
        traceEvent.event_type === "RunFailed" ||
        traceEvent.event_type === "ErrorOccurred") {
      // Flush immediately on completion
      if (eventBufferRef.current.length > 0) {
        dispatch({ type: "ADD_EVENTS_BATCH", payload: eventBufferRef.current });
        eventBufferRef.current = [];
      }
      setIsExecuting(false);
      // Refresh the runs list to show the new run
      void trace.refreshRuns();
    }
  });

  return () => {
    clearInterval(flushInterval);
    // Flush any remaining events on cleanup
    if (eventBufferRef.current.length > 0) {
      dispatch({ type: "ADD_EVENTS_BATCH", payload: eventBufferRef.current });
      eventBufferRef.current = [];
    }
    void unlistenPromise.then((unlisten) => unlisten());
  };
}, [dispatch, trace]);
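The `runtime_event` effect combines three concerns inside a React effect, which makes it hard to test: buffering, a 50 ms flush timer, and an immediate flush on `RunCompleted`, `RunFailed` or `ErrorOccurred`.

The sketch below pulls the buffering into a small class with an injected flush callback. `EventBatcher` is hypothetical, and its timer only starts when `start()` is called, so the logic can be exercised synchronously:

```typescript
interface Batchable {
  event_type: string;
}

// Hypothetical batcher: buffers events and hands them to `onFlush` in batches,
// either on a fixed interval or immediately when a terminal event arrives.
class EventBatcher<E extends Batchable> {
  private buffer: E[] = [];
  private timer: ReturnType<typeof setInterval> | null = null;
  private readonly onFlush: (batch: E[]) => void;
  private readonly terminalTypes: ReadonlySet<string>;
  private readonly intervalMs: number;

  constructor(onFlush: (batch: E[]) => void, terminalTypes: ReadonlySet<string>, intervalMs = 50) {
    this.onFlush = onFlush;
    this.terminalTypes = terminalTypes;
    this.intervalMs = intervalMs;
  }

  start(): void {
    if (this.timer === null) {
      this.timer = setInterval(() => this.flush(), this.intervalMs);
    }
  }

  push(event: E): void {
    this.buffer.push(event);
    if (this.terminalTypes.has(event.event_type)) this.flush();
  }

  flush(): void {
    if (this.buffer.length === 0) return;
    // Swap in a fresh array before calling out, so the consumer owns `batch`
    const batch = this.buffer;
    this.buffer = [];
    this.onFlush(batch);
  }

  // Stops the timer and delivers anything still buffered
  stop(): void {
    if (this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
    this.flush();
  }
}
```

In the effect, the listener would call `batcher.push(event.payload)` and the cleanup would call `batcher.stop()` before unlistening. Completion-specific side effects such as `setIsExecuting(false)` would stay in the listener.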

const isLive = state.vmRunning || isExecuting;

// Debug: expose test functions for manual testing in dev
useEffect(() => {
  if (import.meta.env.DEV) {
    (window as any).__testFeedback = () => {
      dispatch({
        type: "SET_SUSPENSION",
        payload: {
          type: "feedback",
          requestId: "test-feedback-001",
          sessionId: "test-session",
          summary: "I've completed the initial research phase and found 3 potential approaches.",
          question: "Which approach should I pursue?",
          options: [
            "Approach A: Fast but less thorough",
            "Approach B: Balanced speed/quality",
            "Approach C: Comprehensive but slow",
          ],
        },
      });
    };
    (window as any).__testPuppet = () => {
      dispatch({
        type: "SET_SUSPENSION",
        payload: {
          type: "puppet",
          requestId: "test-puppet-001",
          sessionId: "test-session",
          agent: "researcher",
          contextSummary: "Currently analyzing the codebase structure. Found 15 files matching the pattern.",
          lastOutput: "Found: src/api.ts, src/auth.ts, src/db.ts...",
        },
      });
    };
    (window as any).__clearSuspension = () => {
      dispatch({ type: "SET_SUSPENSION", payload: null });
    };
    console.log("🧪 Test hooks available: __testFeedback(), __testPuppet(), __clearSuspension()");
  }
}, [dispatch]);

return (

OpenProse {isLive && Running}
{state.workspaceRoot || "(no workspace)"}
<Button variant="primary" onClick={() => void trace.chooseWorkspace()}>
  Open
</Button>
<Button onClick={() => void trace.refreshRuns()} disabled={!state.workspaceRoot} shortcut="r">
  Refresh
</Button>
<Button
  onClick={() => setShowEditor((s) => !s)}
  disabled={!state.program}
  variant={showEditor ? "primary" : undefined}
  shortcut="e"
>
  {showEditor ? "Close" : "Editor"}
</Button>
<button
  className="btn btn--icon"
  onClick={() => setShowShortcuts((s) => !s)}
  title="Keyboard shortcuts (Cmd+K)"
>
  ⌘
</button>
{state.error &&
{state.error}
}

  {/* Keyboard shortcuts modal */}
  {showShortcuts && (
    <div className="shortcuts-modal" onClick={() => setShowShortcuts(false)}>
      <div className="shortcuts-modal__content" onClick={(e) => e.stopPropagation()}>
        <div className="shortcuts-modal__title">Keyboard Shortcuts</div>
        <div className="shortcuts-modal__list">
          {SHORTCUTS.map((s) => (
            <div key={s.key} className="shortcuts-modal__item">
              <kbd>{s.key}</kbd>
              <span>{s.description}</span>
            </div>
          ))}
        </div>
      </div>
    </div>
  )}

  <div className="layout">
    <aside className="sidebar">
      <RunList
        runs={state.runs}
        selectedRunId={state.selectedRunId}
        workspaceRoot={state.workspaceRoot || "~/.mission-control"}
        onRunSelect={(id) => void trace.loadRun(id)}
        onFork={() => void trace.forkSelectedRun()}
        onClearRuns={() => trace.clearRuns()}
      />
    </aside>

    <main className="main">
      {showEditor && state.program ? (
        <Suspense fallback={<div className="execution-feed__empty"><p>Loading editor...</p></div>}>
          <ProseEditor
            initialContent={state.program}
            filePath={state.selectedRunId ? `${state.workspaceRoot}/.prose/runs/${state.selectedRunId}/program.prose` : undefined}
            workspaceRoot={state.workspaceRoot || undefined}
            onContentChange={(content) => dispatch({ type: "SET_PROGRAM", payload: content })}
            onRun={(runId) => {
              setBootPhase('waiting');
              setShowEditor(false); // Close editor to show execution feed
              void trace.loadRun(runId);
            }}
          />
        </Suspense>
      ) : showBlameGraph ? (
        <Suspense fallback={<div className="execution-feed__empty"><p>Loading graph...</p></div>}>
          <BlameGraph
            blameText={state.bindingPreview}
            onClose={() => setShowBlameGraph(false)}
            onNodeClick={handleBlameNodeClick}
          />
        </Suspense>
      ) : (
        <ExecutionFeed
          events={state.events}
          selectedEventId={state.selectedEventId}
          focusedIndex={state.focusedIndex}
          isLive={isLive}
          bootPhase={bootPhase}
          onEventSelect={handleEventSelect}
        />
      )}
      <StatusBar
        sessions={derived.totals.sessions}
        tokens={derived.totals.tokens}
        cost={derived.totals.cost}
        profile={derived.preflight.profile}
        sandbox={derived.preflight.sandbox}
        isLive={isLive}
      />
    </main>

    <aside className="detail">
      <SessionInspector
        session={derived.selectedSession}
        alerts={derived.alerts}
        preflight={derived.preflight}
        bindingNames={derived.bindingNames}
        checkpoints={state.checkpoints}
        runs={state.runs}
        selectedRunId={state.selectedRunId}
        compareRunId={state.compareRunId}
        compareTotals={state.compareTotals}
        interventionNote={state.interventionNote}
        checkpointLabel={state.checkpointLabel}
        bindingPreview={state.bindingPreview}
        memoryCards={derived.memoryCards}
        watcherAlerts={derived.watcherAlerts}
        onPreviewBinding={(name) => void trace.previewBinding(name)}
        onCreateCheckpoint={() => void trace.createCheckpoint()}
        onRestoreCheckpoint={(id) => void trace.restoreCheckpoint(id)}
        onDiffCheckpoint={(id) => void trace.diffCheckpoint(id)}
        onSendIntervention={() => void trace.sendIntervention()}
        onInterventionNoteChange={(v) => dispatch({ type: "SET_INTERVENTION_NOTE", payload: v })}
        onCheckpointLabelChange={(v) => dispatch({ type: "SET_CHECKPOINT_LABEL", payload: v })}
        onCompareRunChange={(v) => void trace.loadCompareRun(v)}
        onShowBlameGraph={handleShowBlameGraph}
      />
    </aside>
  </div>

  {/* Suspension overlay - shows when feedback or puppet mode is active */}
  {suspension && (
    <div className="suspension-overlay">
      {suspension.type === "feedback" ? (
        <FeedbackInput
          request={{
            requestId: suspension.requestId,
            summary: suspension.summary ?? "",
            question: suspension.question,
            options: suspension.options ?? [],
            displayBinding: suspension.displayBinding,
          }}
          onSubmit={handleFeedbackSubmit}
        />
      ) : (
        <PuppetInterface
          data={{
            requestId: suspension.requestId,
            sessionId: suspension.sessionId ?? "",
            agent: suspension.agent,
            contextSummary: suspension.contextSummary ?? "",
            lastOutput: suspension.lastOutput,
          }}
          onSubmit={handlePuppetSubmit}
          onRelease={handlePuppetRelease}
        />
      )}
    </div>
  )}
</div>

); }

export default function App() { return ( ); }
