# Dreams Intelligence - Complete Build Prompt for LLM
## System Context
You are building **Dreams Intelligence**, a production-grade semantic code search system for the Dyagnosys mental health platform. The system runs locally on Apple Silicon (M2 Max) with Metal GPU acceleration.
### Tech Stack
- **Embeddings**: MLX framework with nomic-embed-text-v1.5 model
- **Vector DB**: LanceDB with cosine similarity (properly normalized 0.0-1.0)
- **Code Analysis**: tree-sitter + NetworkX for dependency graphs
- **MCP Integration**: FastMCP server for Claude Desktop integration
- **GPU**: Apple Metal (MPS) acceleration for semantic searches on the order of 100 ms
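Since the stack above promises cosine similarity "properly normalized 0.0-1.0", it helps to see that normalization concretely. A minimal pure-Python sketch, assuming a linear map from raw cosine similarity in [-1, 1]; LanceDB's own distance-to-score conversion may differ:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Raw cosine similarity in [-1.0, 1.0]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

def normalized_score(a: list[float], b: list[float]) -> float:
    """Map cosine similarity from [-1, 1] into [0.0, 1.0]."""
    return (cosine_similarity(a, b) + 1.0) / 2.0

# Identical vectors score 1.0; opposite vectors score 0.0.
print(normalized_score([1.0, 0.0], [1.0, 0.0]))   # 1.0
print(normalized_score([1.0, 0.0], [-1.0, 0.0]))  # 0.0
```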
```
┌───────────────────────────────────────────────────────────────────┐
│                           USER PROMPT                             │
│        "Build X" | "Search Y" | "Analyze Z" | "Create W"          │
└─────────────────────────────┬─────────────────────────────────────┘
                              │
                              ▼
┌───────────────────────────────────────────────────────────────────┐
│                     PLANNER AGENT (Real LLM)                      │
│  • Anthropic Claude API for task decomposition                    │
│  • Generates dependency graph (DAG)                               │
│  • Outputs: worker definitions + commands + gates                 │
│  • Identifies parallel execution opportunities                    │
└─────────────────────────────┬─────────────────────────────────────┘
                              │
                              ▼
           ┌──────────────────┴──────────────────┐
           │                                     │
  ┌────────▼─────────┐                ┌──────────▼──────────┐
  │     BUILDER      │  SSH Reports   │     COORDINATOR     │
  │   (Local/Mac)    │ ─────────────> │   (Remote/Alpine)   │
  │                  │  • Gate files  │                     │
  │  Worker A ──┐    │  • Timing logs │  • WORKDIR          │
  │  Worker B   │    │  • Status JSON │  • TMUX SESSION     │
  │  Worker C   │    │                │  • LOG file         │
  │  Worker D   │    │                │  • inotify watcher  │
  │  ...        │    │                │  • State tracking   │
  │  Worker N ──┘    │                │                     │
  │                  │                │  tmux windows:      │
  │  Parallel +      │                │  [A] [B] [C] [N]    │
  │  Error Handling  │                │  Event-driven       │
  └──────────────────┘                └─────────────────────┘
       Runs Tasks                        Tracks Progress
```
### Key Enhancements

- ✅ **Real LLM Integration**: Anthropic Claude API replaces mock decomposition
- ✅ **Parallel Execution**: Workers execute concurrently when dependencies allow
- ✅ **Proper Error Handling**: Comprehensive try-catch with rollback mechanisms
- ✅ **Secure Configuration**: Environment variables and SSH keys (no hardcoded credentials)
- ✅ **Event-Driven Gates**: inotify-based watching replaces polling
- ✅ **State Persistence**: JSON state file tracks execution history
- ✅ **Non-Root Access**: Uses a dedicated user account
- ✅ **Caching System**: Reduces redundant package installations
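To make "workers execute concurrently when dependencies allow" concrete, here is a small illustrative sketch (worker IDs and edges are hypothetical, not from a real run) that groups a dependency DAG into waves a scheduler could launch in parallel:

```python
def parallel_waves(workers: dict[str, list[str]]) -> list[list[str]]:
    """Group workers into waves; every worker in a wave can run in parallel."""
    indegree = {w: len(deps) for w, deps in workers.items()}
    dependents: dict[str, list[str]] = {w: [] for w in workers}
    for w, deps in workers.items():
        for d in deps:
            dependents[d].append(w)
    waves = []
    ready = sorted(w for w, n in indegree.items() if n == 0)
    while ready:
        waves.append(ready)
        next_ready = []
        for w in ready:
            for child in dependents[w]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)
    return waves

# A and B share no dependencies, so they form one wave; C waits for both.
print(parallel_waves({"A": [], "B": [], "C": ["A", "B"]}))
# [['A', 'B'], ['C']]
```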
**Save as: `orchestrator.conf`**

```bash
# Remote coordinator settings
COORDINATOR_HOST="${COORDINATOR_HOST:-alpine-server}"
COORDINATOR_USER="${COORDINATOR_USER:-taskrunner}"
COORDINATOR_PORT="${COORDINATOR_PORT:-22}"
# Use $HOME rather than ~ so the path expands when quoted (e.g. ssh -i "$COORDINATOR_KEY")
COORDINATOR_KEY="${COORDINATOR_KEY:-$HOME/.ssh/orchestrator_key}"

# LLM settings
ANTHROPIC_API_KEY="${ANTHROPIC_API_KEY:-}"
LLM_MODEL="${LLM_MODEL:-claude-3-5-sonnet-20241022}"
LLM_MAX_TOKENS="${LLM_MAX_TOKENS:-8000}"

# Execution settings
MAX_PARALLEL_WORKERS="${MAX_PARALLEL_WORKERS:-4}"
GATE_CHECK_TIMEOUT="${GATE_CHECK_TIMEOUT:-300}"
ENABLE_ROLLBACK="${ENABLE_ROLLBACK:-true}"
ENABLE_CACHE="${ENABLE_CACHE:-true}"
```

**Save as: `decompose_task.py`**
```python
#!/usr/bin/env python3
"""Decompose a natural-language prompt into an executable worker DAG."""
import json
import os
import sys

import anthropic


def decompose_task(prompt: str) -> dict:
    """Use the Anthropic API for real task decomposition."""
    if not os.environ.get("ANTHROPIC_API_KEY"):
        return create_fallback_dag(prompt)

    client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

    system_prompt = """You are an expert task decomposition system. Break down user prompts into executable workers for a MacBook/Linux environment.

Return ONLY valid JSON (no markdown, no explanation):
{
  "workers": [
    {
      "id": "A",
      "name": "ShortDescriptiveName",
      "commands": ["bash command 1", "bash command 2"],
      "depends_on": [],
      "gate": "unique_gate_name",
      "rollback_commands": ["cleanup command 1"],
      "cache_key": "optional_cache_identifier"
    }
  ]
}

Requirements:
- Use sequential IDs: A, B, C, D, etc.
- Commands must be valid bash for macOS/Linux
- depends_on: array of worker IDs that must complete first (empty array if no dependencies)
- Workers with no shared dependencies can run in parallel
- Include rollback_commands for cleanup if task fails
- Add cache_key for cacheable operations (npm install, pip install, docker build, etc.)
- Keep worker names short (2-3 words max)
- Each worker should take 10-120 seconds ideally
- Use absolute paths or cd into directories as needed
- For interactive commands, add --yes or equivalent flags
"""

    content = ""  # initialized so the JSONDecodeError handler can always print it
    try:
        response = client.messages.create(
            model=os.environ.get("LLM_MODEL", "claude-3-5-sonnet-20241022"),
            max_tokens=int(os.environ.get("LLM_MAX_TOKENS", "8000")),
            system=system_prompt,
            messages=[{
                "role": "user",
                "content": f"Decompose this task into 3-8 executable workers: {prompt}",
            }],
        )
        content = response.content[0].text.strip()

        # Remove markdown code blocks if present
        if content.startswith("```"):
            lines = content.split("\n")
            # Drop the first and last ``` marker lines
            start_idx = 1 if lines[0].startswith("```") else 0
            end_idx = len(lines) - 1 if lines[-1].strip() == "```" else len(lines)
            content = "\n".join(lines[start_idx:end_idx]).strip()
        if content.startswith("json"):
            content = content[4:].strip()

        dag = json.loads(content)

        # Validate structure
        if "workers" not in dag or not isinstance(dag["workers"], list):
            raise ValueError("Invalid DAG structure: missing 'workers' array")
        if len(dag["workers"]) == 0:
            raise ValueError("Invalid DAG: no workers defined")

        # Validate each worker
        for worker in dag["workers"]:
            for field in ("id", "name", "commands", "depends_on", "gate"):
                if field not in worker:
                    raise ValueError(f"Worker missing required field: {field}")
        return dag
    except anthropic.APIError as e:
        print(f"Anthropic API Error: {e}", file=sys.stderr)
        return create_fallback_dag(prompt)
    except json.JSONDecodeError as e:
        print(f"JSON Parse Error: {e}", file=sys.stderr)
        print(f"Content was: {content[:500]}", file=sys.stderr)
        return create_fallback_dag(prompt)
    except Exception as e:
        print(f"Error in LLM decomposition: {e}", file=sys.stderr)
        return create_fallback_dag(prompt)


def create_fallback_dag(prompt: str) -> dict:
    """Single-worker fallback DAG used when LLM decomposition fails."""
    return {
        "workers": [{
            "id": "A",
            "name": "ExecuteTask",
            "commands": [
                f"echo 'Task: {prompt}'",
                "echo 'LLM decomposition unavailable - please check API key and refine prompt'",
                "echo 'This is a fallback single-worker execution'",
            ],
            "depends_on": [],
            "gate": "execute",
            "rollback_commands": [],
            "cache_key": "",
        }]
    }


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: decompose_task.py <prompt>", file=sys.stderr)
        sys.exit(1)
    prompt = " ".join(sys.argv[1:])
    print(json.dumps(decompose_task(prompt), indent=2))
```

**Save as: `orchestrate.sh`**
```bash
#!/bin/bash
set -euo pipefail
# ============================================================================
# GENERIC ORCHESTRATOR - Enhanced Version
# Usage: ./orchestrate.sh "Your prompt here"
# ============================================================================
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
CONFIG_FILE="${SCRIPT_DIR}/orchestrator.conf"

# Load configuration
if [ -f "$CONFIG_FILE" ]; then
    source "$CONFIG_FILE"
else
    echo "❌ Configuration file not found: $CONFIG_FILE"
    echo "📝 Creating default configuration..."
    cat > "$CONFIG_FILE" << 'EOF'
COORDINATOR_HOST="${COORDINATOR_HOST:-alpine-server}"
COORDINATOR_USER="${COORDINATOR_USER:-taskrunner}"
COORDINATOR_PORT="${COORDINATOR_PORT:-22}"
COORDINATOR_KEY="${COORDINATOR_KEY:-$HOME/.ssh/orchestrator_key}"
ANTHROPIC_API_KEY="${ANTHROPIC_API_KEY:-}"
LLM_MODEL="${LLM_MODEL:-claude-3-5-sonnet-20241022}"
LLM_MAX_TOKENS="${LLM_MAX_TOKENS:-8000}"
MAX_PARALLEL_WORKERS="${MAX_PARALLEL_WORKERS:-4}"
GATE_CHECK_TIMEOUT="${GATE_CHECK_TIMEOUT:-300}"
ENABLE_ROLLBACK="${ENABLE_ROLLBACK:-true}"
ENABLE_CACHE="${ENABLE_CACHE:-true}"
EOF
    source "$CONFIG_FILE"
fi

# Validate required settings
if [ -z "${ANTHROPIC_API_KEY:-}" ]; then
    echo "⚠️  ANTHROPIC_API_KEY not set - will use fallback mode"
fi

# ${1:-} instead of $1: under `set -u`, a bare $1 aborts before the usage message
PROMPT="${1:-}"
if [ -z "$PROMPT" ]; then
    echo "Usage: $0 \"<prompt>\""
    exit 1
fi

TIMESTAMP=$(date +%Y%m%d_%H%M%S)
REMOTE_CONN="${COORDINATOR_USER}@${COORDINATOR_HOST}"
WORKDIR="task_${TIMESTAMP}"
SESSION="task_${TIMESTAMP}"
LOGFILE="log_${TIMESTAMP}.txt"
STATEFILE="state_${TIMESTAMP}.json"
DAG_FILE="/tmp/dag_${TIMESTAMP}.json"

echo "🚀 Starting orchestration for prompt: $PROMPT"
echo "⏱️  Timestamp: $TIMESTAMP"
echo "🖥️  Coordinator: $REMOTE_CONN"

# ============================================================================
# ERROR HANDLING & CLEANUP
# ============================================================================
cleanup() {
    local exit_code=$?
    if [ $exit_code -ne 0 ] && [ "${ENABLE_ROLLBACK:-true}" = "true" ]; then
        echo "⚠️  Error detected (exit code: $exit_code). Initiating rollback..."
        rollback_execution || true
    fi
}
trap cleanup EXIT

rollback_execution() {
    echo "🔄 Rolling back execution..."
    ssh -i "$COORDINATOR_KEY" -p "$COORDINATOR_PORT" "$REMOTE_CONN" \
        "[ -f ~/$WORKDIR/rollback.sh ] && bash ~/$WORKDIR/rollback.sh" 2>/dev/null || true
    echo "✅ Rollback complete"
}

# ============================================================================
# PHASE 0: Bootstrap Coordinator
# ============================================================================
echo "📦 Phase 0: Setting up coordinator..."

# The remote script reads $1..$4; they are passed via `bash -s --`, since a
# plain heredoc never populates positional parameters on the remote side.
ssh -i "$COORDINATOR_KEY" -p "$COORDINATOR_PORT" "$REMOTE_CONN" \
    "bash -s -- '$WORKDIR' '$LOGFILE' '$STATEFILE' '$SESSION'" << 'REMOTE_SETUP'
set -e
WORKDIR="$1"
LOGFILE="$2"
STATEFILE="$3"
SESSION="$4"
mkdir -p ~/$WORKDIR
touch ~/$WORKDIR/$LOGFILE
echo '{"workers":{}}' > ~/$WORKDIR/$STATEFILE

# Install inotify-tools if not present
if ! command -v inotifywait &> /dev/null; then
    if command -v apk &> /dev/null; then
        sudo apk add --no-cache inotify-tools 2>/dev/null || echo "Could not install inotify-tools"
    elif command -v apt-get &> /dev/null; then
        sudo apt-get update && sudo apt-get install -y inotify-tools 2>/dev/null || echo "Could not install inotify-tools"
    fi
fi

# Create helper functions. The heredoc is quoted, so helpers.sh expects
# WORKDIR/LOGFILE/STATEFILE to be exported by whichever session sources it.
cat > ~/$WORKDIR/helpers.sh << 'HELPERS_EOF'
#!/bin/bash
log_step() {
    local worker="$1"
    local duration="$2"
    local status="${3:-success}"
    local timestamp=$(date +%s)
    echo "[$timestamp] $worker: ${duration}s [$status]" >> ~/$WORKDIR/$LOGFILE
    # Update state file (create if it doesn't exist)
    if [ ! -f ~/$WORKDIR/$STATEFILE ]; then
        echo '{"workers":{}}' > ~/$WORKDIR/$STATEFILE
    fi
    if command -v jq &> /dev/null; then
        jq --arg w "$worker" --arg d "$duration" --arg s "$status" --arg t "$timestamp" \
            '.workers[$w] = {duration: $d, status: $s, timestamp: $t}' \
            ~/$WORKDIR/$STATEFILE > ~/$WORKDIR/$STATEFILE.tmp && \
            mv ~/$WORKDIR/$STATEFILE.tmp ~/$WORKDIR/$STATEFILE
    fi
}

create_gate() {
    local gate_name="$1"
    touch ~/$WORKDIR/.ok_$gate_name
    echo "[$(date +%s)] Gate created: $gate_name" >> ~/$WORKDIR/$LOGFILE
}

watch_gate() {
    local gate_name="$1"
    local timeout="${2:-300}"
    local elapsed=0
    # Return immediately if the gate already exists
    [ -f ~/$WORKDIR/.ok_$gate_name ] && return 0
    while [ ! -f ~/$WORKDIR/.ok_$gate_name ]; do
        if command -v inotifywait &> /dev/null; then
            # Event-driven: block (up to 5s) until something lands in WORKDIR
            inotifywait -qq -t 5 -e create,moved_to ~/$WORKDIR 2>/dev/null || true
            elapsed=$((elapsed + 5))
        else
            # Fallback to polling
            sleep 1
            elapsed=$((elapsed + 1))
        fi
        if [ $elapsed -ge $timeout ]; then
            echo "[$(date +%s)] ⏱️ Timeout waiting for gate: $gate_name" >> ~/$WORKDIR/$LOGFILE
            return 1
        fi
    done
    return 0
}
HELPERS_EOF
chmod +x ~/$WORKDIR/helpers.sh

# Create rollback script
cat > ~/$WORKDIR/rollback.sh << 'ROLLBACK_EOF'
#!/bin/bash
echo "[$(date +%s)] Executing rollback procedures..."
# Rollback commands will be appended dynamically
ROLLBACK_EOF
chmod +x ~/$WORKDIR/rollback.sh

# Start tmux session
if command -v tmux &> /dev/null; then
    tmux new-session -d -s "$SESSION" -n "Overview" \
        "watch -n 1 'echo \"=== Task: $WORKDIR ===\" && \
        echo \"\" && \
        echo \"Gates:\" && \
        ls -lh ~/$WORKDIR/.ok_* 2>/dev/null | tail -10 || echo \"No gates yet\" && \
        echo \"\" && \
        echo \"Recent Log:\" && \
        tail -15 ~/$WORKDIR/$LOGFILE 2>/dev/null || echo \"No logs yet\"'"
else
    echo "tmux not available, skipping session creation"
fi
echo "Coordinator setup complete"
REMOTE_SETUP

# ============================================================================
# PHASE 1: Generate DAG from Prompt using Real LLM
# ============================================================================
echo "🧠 Phase 1: Decomposing task with Claude API..."
python3 "${SCRIPT_DIR}/decompose_task.py" "$PROMPT" > "$DAG_FILE"

if [ ! -f "$DAG_FILE" ] || [ ! -s "$DAG_FILE" ]; then
    echo "❌ Failed to generate DAG"
    exit 1
fi

echo "📋 Generated DAG:"
head -50 "$DAG_FILE"

# Validate JSON
if ! jq empty "$DAG_FILE" 2>/dev/null; then
    echo "❌ Invalid JSON in DAG file"
    exit 1
fi

# ============================================================================
# PHASE 2: Create TMUX Monitoring Windows
# ============================================================================
echo "📺 Phase 2: Creating tmux monitoring windows..."
jq -r '.workers[] | "\(.id)-\(.name)"' "$DAG_FILE" | while IFS= read -r window_name; do
    worker_id=$(echo "$window_name" | cut -d'-' -f1)
    # Plain COMPLETE/Waiting text keeps the triply-nested quoting manageable
    ssh -i "$COORDINATOR_KEY" -p "$COORDINATOR_PORT" "$REMOTE_CONN" \
        "command -v tmux &>/dev/null && tmux new-window -t $SESSION -n '$window_name' \
        'watch -n 0.5 \"if [ -f ~/$WORKDIR/.ok_$worker_id ]; then echo COMPLETE; else echo Waiting...; fi; \
        echo; grep $worker_id ~/$WORKDIR/$LOGFILE 2>/dev/null || true\"' 2>/dev/null || true"
done

# ============================================================================
# PHASE 3: Execute Workers with Parallel Support
# ============================================================================
echo "⚙️  Phase 3: Executing workers (max parallel: $MAX_PARALLEL_WORKERS)..."

# Create temporary directory for worker outputs
mkdir -p "/tmp/orchestrator_${TIMESTAMP}"

# Function to execute a single worker
execute_worker() {
    local worker_json="$1"
    local worker_id=$(echo "$worker_json" | jq -r '.id')
    local worker_name=$(echo "$worker_json" | jq -r '.name')
    local worker_gate=$(echo "$worker_json" | jq -r '.gate')
    local cache_key=$(echo "$worker_json" | jq -r '.cache_key // empty')
    local log_file="/tmp/orchestrator_${TIMESTAMP}/worker_${worker_id}.log"
    {
        echo "🔹 Starting Worker $worker_id: $worker_name"

        # Check dependencies
        local dependencies=$(echo "$worker_json" | jq -r '.depends_on[]?' 2>/dev/null || echo "")
        local dep waited dep_status
        for dep in $dependencies; do
            echo "⏳ Waiting for dependency: Worker $dep"
            waited=0
            while ! ssh -i "$COORDINATOR_KEY" -p "$COORDINATOR_PORT" "$REMOTE_CONN" \
                "[ -f ~/$WORKDIR/.ok_$dep ]" 2>/dev/null; do
                sleep 2
                waited=$((waited + 2))
                if [ $waited -ge ${GATE_CHECK_TIMEOUT} ]; then
                    echo "❌ Timeout waiting for Worker $dep"
                    echo "failed" > "/tmp/orchestrator_${TIMESTAMP}/status_${worker_id}"
                    return 1
                fi
            done
            # Skip this worker if the dependency failed
            if [ -f "/tmp/orchestrator_${TIMESTAMP}/status_$dep" ]; then
                dep_status=$(cat "/tmp/orchestrator_${TIMESTAMP}/status_$dep")
                if [ "$dep_status" = "failed" ]; then
                    echo "❌ Dependency $dep failed, skipping Worker $worker_id"
                    echo "skipped" > "/tmp/orchestrator_${TIMESTAMP}/status_${worker_id}"
                    return 1
                fi
            fi
        done

        # Check cache
        if [ "${ENABLE_CACHE}" = "true" ] && [ -n "$cache_key" ]; then
            if ssh -i "$COORDINATOR_KEY" -p "$COORDINATOR_PORT" "$REMOTE_CONN" \
                "[ -f ~/$WORKDIR/.cache_$cache_key ]" 2>/dev/null; then
                echo "💾 Cache hit for $cache_key"
                ssh -i "$COORDINATOR_KEY" -p "$COORDINATOR_PORT" "$REMOTE_CONN" \
                    "export WORKDIR=$WORKDIR LOGFILE=$LOGFILE STATEFILE=$STATEFILE; source ~/$WORKDIR/helpers.sh && create_gate $worker_gate" 2>/dev/null
                echo "cached" > "/tmp/orchestrator_${TIMESTAMP}/status_${worker_id}"
                return 0
            fi
        fi

        # Execute commands. Process substitution (not a pipe) keeps the loop in
        # this shell so cmd_success=false survives past the loop.
        local start_time=$(date +%s)
        local cmd_success=true
        local cmd
        while IFS= read -r cmd; do
            echo "💻 Running: $cmd"
            if eval "$cmd" 2>&1; then
                echo "✅ Command succeeded"
            else
                echo "❌ Command failed: $cmd"
                cmd_success=false
                break
            fi
        done < <(echo "$worker_json" | jq -r '.commands[]')
        local end_time=$(date +%s)
        local duration=$((end_time - start_time))

        if [ "$cmd_success" = true ]; then
            echo "✅ Worker $worker_id completed in ${duration}s"
            ssh -i "$COORDINATOR_KEY" -p "$COORDINATOR_PORT" "$REMOTE_CONN" << REMOTE_LOG
export WORKDIR=$WORKDIR LOGFILE=$LOGFILE STATEFILE=$STATEFILE
source ~/$WORKDIR/helpers.sh
log_step '$worker_name' '$duration' 'success'
create_gate $worker_gate
REMOTE_LOG
            # Create cache marker
            if [ "${ENABLE_CACHE}" = "true" ] && [ -n "$cache_key" ]; then
                ssh -i "$COORDINATOR_KEY" -p "$COORDINATOR_PORT" "$REMOTE_CONN" \
                    "touch ~/$WORKDIR/.cache_$cache_key" 2>/dev/null || true
            fi
            echo "success" > "/tmp/orchestrator_${TIMESTAMP}/status_${worker_id}"
            return 0
        else
            echo "❌ Worker $worker_id failed after ${duration}s"
            ssh -i "$COORDINATOR_KEY" -p "$COORDINATOR_PORT" "$REMOTE_CONN" \
                "export WORKDIR=$WORKDIR LOGFILE=$LOGFILE STATEFILE=$STATEFILE; source ~/$WORKDIR/helpers.sh && log_step '$worker_name' '$duration' 'failed'" 2>/dev/null || true
            # Queue this worker's rollback commands
            echo "$worker_json" | jq -r '.rollback_commands[]?' 2>/dev/null | while IFS= read -r rollback_cmd; do
                [ -n "$rollback_cmd" ] && ssh -i "$COORDINATOR_KEY" -p "$COORDINATOR_PORT" "$REMOTE_CONN" \
                    "echo '$rollback_cmd' >> ~/$WORKDIR/rollback.sh" 2>/dev/null || true
            done
            echo "failed" > "/tmp/orchestrator_${TIMESTAMP}/status_${worker_id}"
            return 1
        fi
    } 2>&1 | tee "$log_file"
}
export -f execute_worker
export COORDINATOR_KEY COORDINATOR_PORT REMOTE_CONN WORKDIR LOGFILE STATEFILE
export GATE_CHECK_TIMEOUT ENABLE_CACHE TIMESTAMP

# Execute workers: xargs -P handles parallelism, dependencies are resolved
# inside execute_worker. NUL delimiters (-0) keep the JSON's quotes intact;
# plain xargs would mangle them with its own quote processing.
jq -c '.workers[]' "$DAG_FILE" | tr '\n' '\000' | \
    xargs -0 -P "${MAX_PARALLEL_WORKERS}" -I {} bash -c 'execute_worker "$@"' _ {}

# ============================================================================
# PHASE 4: Final Report
# ============================================================================
echo ""
echo "═══════════════════════════════════════════════════════════"
echo "✅ ORCHESTRATION COMPLETE"
echo "═══════════════════════════════════════════════════════════"
echo "📁 Coordinator WORKDIR: ~/$WORKDIR"
echo "📺 TMUX Session: $SESSION"
echo "📋 Log File: ~/$WORKDIR/$LOGFILE"
echo "💾 State File: ~/$WORKDIR/$STATEFILE"
echo ""

# Collect status from all workers
echo "📊 Worker Status:"
failed_workers=0
for status_file in /tmp/orchestrator_${TIMESTAMP}/status_*; do
    if [ -f "$status_file" ]; then
        worker_id=$(basename "$status_file" | sed 's/status_//')
        status=$(cat "$status_file")
        echo "  $worker_id: $status"
        # Plain arithmetic: ((failed_workers++)) returns 1 on the first
        # increment and would abort the script under `set -e`
        [ "$status" = "failed" ] && failed_workers=$((failed_workers + 1))
    fi
done

echo ""
echo "🐢 Top 10 Slowest Steps:"
ssh -i "$COORDINATOR_KEY" -p "$COORDINATOR_PORT" "$REMOTE_CONN" \
    "sort -t: -k2 -rn ~/$WORKDIR/$LOGFILE 2>/dev/null | head -10" || echo "No timing data available"
echo ""
echo "💡 To attach to monitoring: ssh $REMOTE_CONN -t tmux attach -t $SESSION"
echo "💡 Worker logs available in: /tmp/orchestrator_${TIMESTAMP}/"
echo "═══════════════════════════════════════════════════════════"

# Exit with error if any workers failed
if [ $failed_workers -gt 0 ]; then
    echo ""
    echo "⚠️  $failed_workers worker(s) failed - check logs for details"
    exit 1
fi
echo ""
echo "🎉 All workers completed successfully!"
```
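For reference, the state file that `log_step` maintains ends up shaped roughly like this; the sketch below mirrors the jq update in pure Python (worker names and timings are illustrative):

```python
import json

state = {"workers": {}}

def log_step(state: dict, worker: str, duration: str, status: str, timestamp: str) -> None:
    """Mirror of the jq update in helpers.sh: one record per worker."""
    state["workers"][worker] = {
        "duration": duration,
        "status": status,
        "timestamp": timestamp,
    }

log_step(state, "SetupProject", "12", "success", "1737100000")
log_step(state, "RunTests", "45", "failed", "1737100060")
print(json.dumps(state, indent=2))
```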
### Setup

```shell
# Create SSH key for coordinator
ssh-keygen -t ed25519 -f ~/.ssh/orchestrator_key -N ""

# Copy it to the coordinator machine
ssh-copy-id -i ~/.ssh/orchestrator_key taskrunner@alpine-server

# Install dependencies
pip3 install anthropic

# Make scripts executable
chmod +x orchestrate.sh decompose_task.py

# Set API key
export ANTHROPIC_API_KEY="your-api-key-here"
```

### Example Prompts

```shell
./orchestrate.sh "Build a simple Express.js API with 3 endpoints"
./orchestrate.sh "Create a Next.js 14 app with TypeScript, Tailwind, Prisma SQLite, and basic auth"
./orchestrate.sh "Build Python data pipeline: fetch CSV from URL, clean data, run analysis, generate report"
./orchestrate.sh "Setup Docker compose with nginx, postgres, redis, and a Node.js app"
```
### Monitoring & Debugging

```shell
# Attach to tmux session
ssh taskrunner@alpine-server -t tmux attach -t task_20260117_123456
# View a specific worker (window 2)
tmux select-window -t :2
# Detach: Ctrl+B then D

# View state JSON
ssh taskrunner@alpine-server "cat ~/task_*/state_*.json | jq"
# Check gates
ssh taskrunner@alpine-server "ls -lh ~/task_*/.ok_* ~/task_*/.cache_*"
# View logs
ssh taskrunner@alpine-server "tail -50 ~/task_*/log_*.txt"
# Local worker logs
ls -lh /tmp/orchestrator_*/
cat /tmp/orchestrator_*/worker_A.log

# Create gate manually to unblock
ssh taskrunner@alpine-server "touch ~/task_20260117_123456/.ok_A"
# Execute rollback
ssh taskrunner@alpine-server "bash ~/task_20260117_123456/rollback.sh"
# Clean up old tasks
ssh taskrunner@alpine-server "rm -rf ~/task_2026*"
```

### Security Checklist

- SSH key authentication configured
- Non-root user (taskrunner) on coordinator
- ANTHROPIC_API_KEY in environment (not hardcoded)
- Coordinator SSH port changed from 22 (optional)
- Firewall rules restricting SSH access
- Regular key rotation policy
- Audit logs enabled on coordinator
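Several of the checklist items can be enforced in the coordinator's `authorized_keys` itself. An illustrative entry (the source network and key material are placeholders; `pty` is re-enabled after `restrict` because the tmux monitoring needs one):

```
# ~/.ssh/authorized_keys on the coordinator (illustrative entry)
restrict,pty,from="203.0.113.0/24" ssh-ed25519 AAAA... orchestrator
```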
### Performance Tuning

| Workload Type | MAX_PARALLEL_WORKERS | GATE_CHECK_TIMEOUT |
|---|---|---|
| I/O Heavy (API calls, downloads) | 8-12 | 600 |
| CPU Heavy (builds, compilation) | 2-4 | 1800 |
| Mixed | 4-6 | 300 |
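`GATE_CHECK_TIMEOUT` bounds how long a worker blocks on a gate file. Stripped of SSH, the polling fallback in `watch_gate` reduces to a loop like this local sketch (a temp directory stands in for the remote WORKDIR, and another process creates the gate):

```shell
WORKDIR=$(mktemp -d)
# a stand-in for some other worker: it creates the gate one second from now
( sleep 1; touch "$WORKDIR/.ok_build" ) &

elapsed=0; timeout=10
while [ ! -f "$WORKDIR/.ok_build" ] && [ "$elapsed" -lt "$timeout" ]; do
    sleep 1
    elapsed=$((elapsed + 1))
done

if [ -f "$WORKDIR/.ok_build" ]; then
    echo "gate open after ~${elapsed}s"
else
    echo "timed out"
fi
wait
rm -rf "$WORKDIR"
```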
### Caching

Automatically caches:
- Package installations (npm, pip, cargo, etc.)
- Docker image builds
- Database migrations
- Asset compilation
Cache invalidation: Change cache_key value in worker definition.
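The cache hits above come down to marker files: a `.cache_<key>` file in the WORKDIR means the work is already done. A local demonstration of the same pattern (cache key and paths are illustrative):

```shell
WORKDIR=$(mktemp -d)
cache_key="npm_install_api"

run_cached() {
    if [ -f "$WORKDIR/.cache_$cache_key" ]; then
        echo "cache hit: skipping $cache_key"
    else
        echo "cache miss: running $cache_key"
        # ... the real work (npm install, docker build, ...) would run here ...
        touch "$WORKDIR/.cache_$cache_key"   # mark as done
    fi
}

first=$(run_cached)
second=$(run_cached)
echo "$first"    # cache miss: running npm_install_api
echo "$second"   # cache hit: skipping npm_install_api
rm -rf "$WORKDIR"
```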
### Original vs. Enhanced

| Feature | Original | Enhanced |
|---|---|---|
| Task Decomposition | Mock if/else logic | Real Claude API |
| Execution Model | Sequential only | Parallel with DAG |
| Error Handling | Silent failures | Comprehensive try-catch with rollback |
| Configuration | Hardcoded values | Environment + config file |
| Gate Checking | Polling every 0.5s | Event-driven inotify |
| State Management | None | JSON state file |
| Security | Root access, hardcoded IPs | Non-root, SSH keys, env vars |
| Caching | None | Intelligent cache system |
| Observability | Basic tmux + logs | tmux + logs + state + per-worker logs |
### Troubleshooting

**API key not set**

```shell
# Set in current session
export ANTHROPIC_API_KEY="sk-ant-..."
# Or add to shell profile
echo 'export ANTHROPIC_API_KEY="sk-ant-..."' >> ~/.bashrc
source ~/.bashrc
```

**SSH connection problems**

```shell
# Test connection
ssh -i ~/.ssh/orchestrator_key taskrunner@alpine-server "echo connected"
# Check SSH service on coordinator
ssh root@alpine-server "service sshd status"
# Verify key permissions
chmod 600 ~/.ssh/orchestrator_key
```

**Workers stuck waiting on gates**

```shell
# Check which gates exist
ssh taskrunner@alpine-server "ls ~/task_*/.ok_*"
# Check worker dependencies
jq '.workers[] | {id, depends_on}' /tmp/dag_*.json
# View worker logs
cat /tmp/orchestrator_*/worker_*.log
# Create gate manually
ssh taskrunner@alpine-server "touch ~/task_20260117_123456/.ok_B"
```

**Parallel execution not happening**

```shell
# Verify xargs supports the -P flag
xargs --help | grep -- -P
# Check MAX_PARALLEL_WORKERS setting
grep MAX_PARALLEL_WORKERS orchestrator.conf
# Monitor parallel execution
watch -n 1 'ps aux | grep execute_worker'
```
### Summary

- ✅ **Production-Ready**: Real LLM, error handling, rollback, state persistence
- ✅ **Secure**: SSH keys, non-root, environment-based secrets
- ✅ **Performant**: Parallel execution, event-driven gates, intelligent caching
- ✅ **Observable**: Multi-level logging, tmux monitoring, JSON state
- ✅ **Maintainable**: Configuration-driven, modular design, clear separation of concerns
- ✅ **Portable**: Works with any SSH-accessible coordinator, adaptable to different infrastructure
- ✅ **Reliable**: Comprehensive error handling, automatic rollback, dependency management
Made with ⚡ by Carlos for production-grade AI orchestration