|
from __future__ import annotations |
|
|
|
import argparse |
|
import os |
|
import re |
|
import requests |
|
import subprocess |
|
import sys |
|
from pathlib import Path |
|
from typing import List, Tuple, Optional, Callable, Dict |
|
|
|
import google.generativeai as genai |
|
# Retrieve the API key from the environment. We avoid importing |
|
# ``app.py`` from the Flask implementation because the project may be |
|
# running under Django instead. If the ``GOOGLE_API_KEY`` environment |
|
# variable is not set, API calls to Gemini will fail gracefully. |
|
API_KEY = os.getenv("GOOGLE_API_KEY") |
|
|
|
# ========= Config ========= |
|
DEFAULT_OUTPUT_DIR = Path(r"/mnt/c/Users/ben/Niagara4.13/JENEsys") |
|
|
|
# --- MODIFICATION: Tiered Model Configuration --- |
|
# Use a fast, lightweight model for initial generation and standard fixes. |
|
MODEL_NAME_LITE = "gemini-2.5-flash" |
|
# Use a powerful, advanced model for a single, high-quality fix attempt. |
|
MODEL_NAME_PRO = "gemini-2.5-pro" |
|
# The total number of attempts the agent will make. |
|
MAX_ITERS_DEFAULT = 5 |
|
# The specific attempt number on which to escalate to the PRO model. |
|
PRO_ESCALATION_ATTEMPT = ( |
|
3 # Lite-First: Try with Lite model first, escalate on 3rd attempt |
|
) |
|
# --- End MODIFICATION --- |
|
|
|
# Global metrics, reset for each session |
|
LLM_CALLS = 0 |
|
LLM_INPUT_TOKENS = 0 |
|
LLM_OUTPUT_TOKENS = 0 |
|
|
|
# Toggle remote fetch via env: set DISABLE_REMOTE_CONTEXT=1 to force local-only |
|
DISABLE_REMOTE = os.getenv("DISABLE_REMOTE_CONTEXT", "").lower() in ("1", "true", "yes") |
|
|
|
# Optional: override the branch/path via env if you ever need to |
|
REMOTE_BASE = os.getenv( |
|
"REMOTE_CONTEXT_BASE", |
|
"https://raw.githubusercontent.com/bbartling/pybog/develop/context", |
|
) |
|
|
|
REMOTE_MAP = { |
|
"kitControlIntrol.txt": f"{REMOTE_BASE}/kitControlIntrol.txt", |
|
"llms.txt": f"{REMOTE_BASE}/llms.txt", |
|
"llms-full.txt": f"{REMOTE_BASE}/llms-full.txt", |
|
} |
|
|
|
|
|
def _safe_read(path_obj: Path) -> str: |
|
try: |
|
return path_obj.read_text(encoding="utf-8", errors="ignore") |
|
except Exception: |
|
return "" |
|
|
|
|
|
def _fetch_remote(url: str, timeout_s: float = 5.0) -> str | None: |
|
try: |
|
r = requests.get( |
|
url, |
|
headers={"User-Agent": "BogMaker/1.0 (+https://example.local)"}, |
|
timeout=timeout_s, |
|
) |
|
if r.status_code == 200 and r.text: |
|
print(f" - Remote fetch SUCCESS)") |
|
return r.text |
|
print(f" - Remote fetch non-200 or empty: {url} (code={r.status_code})") |
|
except Exception as e: |
|
print(f" - Remote fetch failed: {url} ({e})") |
|
return None |
|
|
|
|
|
def _load_and_cache_examples() -> str: |
|
""" |
|
Loads key context files in a fixed order with remote-first fallback. |
|
""" |
|
print("📦 Starting context caching (remote-first, local fallback)...") |
|
|
|
try: |
|
script_dir = Path(__file__).resolve().parent |
|
context_dir = script_dir / "context" |
|
if not context_dir.is_dir(): |
|
print(f"⚠️ 'context' directory not found at {context_dir}") |
|
except NameError: |
|
print("⚠️ Could not determine script path. Using CWD instead.") |
|
context_dir = Path.cwd() / "context" |
|
|
|
files_to_load = ["kitControlIntrol.txt", "llms.txt", "llms-full.txt"] |
|
chunks, loaded_files, missing_files = [], [], [] |
|
|
|
print(f"🔎 Searching for files in {context_dir}...\n") |
|
|
|
for idx, filename in enumerate(files_to_load, start=1): |
|
print(f"[{idx}/{len(files_to_load)}] Loading '{filename}'...") |
|
text = None |
|
|
|
if not DISABLE_REMOTE: |
|
remote_url = REMOTE_MAP.get(filename) |
|
if remote_url: |
|
print(f" 🌐 Trying remote → {remote_url}") |
|
text = _fetch_remote(remote_url) |
|
|
|
if not text: |
|
file_path = context_dir / filename |
|
if file_path.is_file(): |
|
print(f" 📂 Using local copy at {file_path}") |
|
text = _safe_read(file_path) |
|
else: |
|
print(f" ❌ Local file not found: {file_path}") |
|
|
|
if text: |
|
chunks.append(f"\n=== FILE: {filename} ===\n{text}\n=== END FILE ===\n") |
|
loaded_files.append(filename) |
|
print(f" ✅ Successfully loaded '{filename}'\n") |
|
else: |
|
missing_files.append(filename) |
|
print(f" ⚠️ Skipped '{filename}' (not found remotely or locally)\n") |
|
|
|
print("📊 Context file load summary:") |
|
print(f" ✅ Loaded: {', '.join(loaded_files) if loaded_files else 'None'}") |
|
print(f" ❌ Missing: {', '.join(missing_files) if missing_files else 'None'}") |
|
|
|
return "".join(chunks) |
|
|
|
|
|
CACHED_EXAMPLES_BLOB: str = _load_and_cache_examples() |
|
|
|
|
|
# ============================================================= |
|
def init_gemini(model_name: str) -> Optional[genai.GenerativeModel]: |
|
api_key = API_KEY |
|
|
|
if not api_key: |
|
print("❌ GOOGLE_API_KEY environment variable not set.") |
|
return None |
|
try: |
|
genai.configure(api_key=api_key) |
|
return genai.GenerativeModel(model_name) |
|
except Exception as e: |
|
print(f"❌ Failed to configure Gemini model '{model_name}': {e}") |
|
return None |
|
|
|
|
|
# ========= LLM prompts & helpers ========= |
|
def _extract_python_code(text: str) -> str: |
|
if not text: |
|
return "" |
|
t = text.strip() |
|
fence = re.search(r"```(?:python)?\s*(.*?)```", t, flags=re.S) |
|
if fence: |
|
return fence.group(1).strip() |
|
return t |
|
|
|
|
|
# ========= Intake Helpers ========= |
|
def build_intake_system_prompt() -> str: |
|
return ( |
|
"You are a Niagara .bog builder assistant. Reflect the user's HVAC request " |
|
"back in 1 short paragraph. Then ask exactly two things: " |
|
"1) Which kitControl widgets should be used (offer common ones), " |
|
"2) Any specific variable names and default values. " |
|
"Be concise. Do not start building code yet." |
|
) |
|
|
|
|
|
def build_intake_user_prompt(user_freeform: str, kitcontrol_catalog_text: str) -> str: |
|
return ( |
|
"User request:\n" |
|
f"{user_freeform}\n\n" |
|
"Reference kitControl catalog (not all tested):\n" |
|
f"{kitcontrol_catalog_text}\n\n" |
|
"Follow Niagara naming rules and ms strings for time.\n" |
|
"Reply with:\n" |
|
"- Acknowledge and restate the intended .bog\n" |
|
"- Ask: which kitControl widgets do you want?\n" |
|
"- Ask: any variable names and default values?\n" |
|
) |
|
|
|
|
|
def consolidate_final_prompt(initial_user_prompt: str, intake_user_reply: str) -> str: |
|
return ( |
|
"Final build intent:\n" |
|
f"{initial_user_prompt}\n\n" |
|
"User intake answers:\n" |
|
f"{intake_user_reply}\n\n" |
|
"Please generate exactly one Python builder script that uses BogFolderBuilder " |
|
"and the requested kitControl widgets, mirroring proven examples. " |
|
"Honor naming rules and link semantics. Use tested blocks where possible." |
|
) |
|
|
|
|
|
def run_pipeline_with_intake( |
|
initial_user_prompt: str, |
|
kitcontrol_catalog_text: str, |
|
log_callback: Optional[Callable[[str], None]] = None, |
|
max_iters: int = MAX_ITERS_DEFAULT, |
|
) -> Dict[str, object]: |
|
""" |
|
Execute the intake round using the LITE model. |
|
""" |
|
log_callback = log_callback or print |
|
model = init_gemini(MODEL_NAME_LITE) |
|
if model is None: |
|
msg = ( |
|
f"Gemini Lite model ({MODEL_NAME_LITE}) unavailable. Check GOOGLE_API_KEY." |
|
) |
|
log_callback(f"❌ {msg}") |
|
return {"error": msg} |
|
|
|
system_prompt = build_intake_system_prompt() |
|
user_prompt = build_intake_user_prompt(initial_user_prompt, kitcontrol_catalog_text) |
|
full_prompt = f"{system_prompt}\n\n{user_prompt}" |
|
|
|
global LLM_CALLS |
|
LLM_CALLS += 1 |
|
try: |
|
resp = model.generate_content(full_prompt) |
|
_record_and_print_usage( |
|
resp, label=f"intake ({MODEL_NAME_LITE})", log_callback=log_callback |
|
) |
|
content = (resp.text or "").strip() |
|
except Exception as e: |
|
log_callback(f"❌ Intake LLM call failed: {e}") |
|
return {"error": str(e)} |
|
|
|
return { |
|
"intake_message_for_user": content, |
|
"max_iters": max_iters, |
|
} |
|
|
|
|
|
def continue_pipeline_after_intake( |
|
initial_user_prompt: str, |
|
intake_user_reply: str, |
|
bog_file_name: str, |
|
output_dir: Path, |
|
log_callback: Optional[Callable[[str], None]] = None, |
|
max_iters: int = MAX_ITERS_DEFAULT, |
|
) -> Dict: |
|
final_prompt = consolidate_final_prompt(initial_user_prompt, intake_user_reply) |
|
log_callback = log_callback or print |
|
return run_agent_session( |
|
description=final_prompt, |
|
bog_file_name=bog_file_name, |
|
output_dir=output_dir, |
|
max_iters=max_iters, |
|
log_callback=log_callback, |
|
) |
|
|
|
|
|
def _record_and_print_usage( |
|
resp, label: str, log_callback: Callable[[str], None] |
|
) -> None: |
|
global LLM_INPUT_TOKENS, LLM_OUTPUT_TOKENS |
|
meta = getattr(resp, "usage_metadata", None) |
|
in_tok = int(getattr(meta, "prompt_token_count", 0) or 0) if meta else 0 |
|
out_tok = int(getattr(meta, "candidates_token_count", 0) or 0) if meta else 0 |
|
tot_tok = ( |
|
int(getattr(meta, "total_token_count", in_tok + out_tok) or (in_tok + out_tok)) |
|
if meta |
|
else (in_tok + out_tok) |
|
) |
|
LLM_INPUT_TOKENS += in_tok |
|
LLM_OUTPUT_TOKENS += out_tok |
|
log_line = f"[TOKENS] {label} prompt={in_tok} output={out_tok} total={tot_tok}" |
|
log_callback(log_line) |
|
|
|
|
|
def llm_generate_script( |
|
model: genai.GenerativeModel, |
|
description: str, |
|
examples_blob: str, |
|
bog_file_name: str, |
|
log_callback: Callable[[str], None], |
|
label: str = "generate", |
|
) -> str: |
|
global LLM_CALLS |
|
LLM_CALLS += 1 |
|
prompt = f""" |
|
You are an expert Niagara / HVAC controls code generator to assist in the human HVAC controls engineer on the Niagara4 platform. |
|
Your task is to produce a single, complete, runnable Python script using the BogFolderBuilder API. |
|
|
|
**Requirements for the Python script:** |
|
- It must import `BogFolderBuilder` from `bog_builder`. |
|
- It must define a `main()` function. |
|
- It must save exactly one `.bog` file named '{bog_file_name}.bog' TO THE CURRENT WORKING DIRECTORY. |
|
- It must print the absolute path to the created `.bog` file upon success. |
|
- The script should be self-contained and use only the Python standard library besides `bog_builder`. |
|
- The script MUST NOT accept any command-line arguments for the output path. |
|
|
|
**User's Description:** |
|
{description} |
|
|
|
**Reference Examples (BogFolderBuilder API usage):** |
|
{examples_blob} |
|
""" |
|
resp = model.generate_content(prompt) |
|
_record_and_print_usage(resp, label=label, log_callback=log_callback) |
|
return _extract_python_code((resp.text or "").strip()) |
|
|
|
|
|
def llm_fix_script( |
|
model, |
|
description, |
|
prev_code, |
|
error_text, |
|
examples_blob, |
|
bog_file_name, |
|
log_callback, |
|
label="fix", |
|
) -> str: |
|
global LLM_CALLS |
|
LLM_CALLS += 1 |
|
prompt = f""" |
|
The previous Python script you generated for the description "{description}" failed with: |
|
|
|
--- |
|
{error_text} |
|
--- |
|
|
|
Please return a complete, corrected script. It must: |
|
- import BogFolderBuilder from bog_builder only (no extra third-party imports) |
|
- save '{bog_file_name}.bog' to the CURRENT WORKING DIRECTORY |
|
- not accept any CLI args |
|
|
|
# API Reference (excerpt) |
|
{examples_blob} |
|
""" |
|
resp = model.generate_content(prompt) |
|
_record_and_print_usage(resp, label=label, log_callback=log_callback) |
|
return _extract_python_code((resp.text or "").strip()) |
|
|
|
|
|
def llm_verify_and_refine_script( |
|
model: genai.GenerativeModel, |
|
description: str, |
|
code_to_review: str, |
|
bog_file_name: str, |
|
log_callback: Callable[[str], None], |
|
label: str = "verify", |
|
) -> str: |
|
"""Asks a powerful model to review and refine a working script.""" |
|
global LLM_CALLS |
|
LLM_CALLS += 1 |
|
prompt = f""" |
|
You are an expert Niagara / HVAC controls code generator acting as a quality assurance specialist. |
|
A junior script was generated to fulfill the user's request. Your task is to review it and provide a final, production-ready version. |
|
|
|
**User's Original Description:** |
|
{description} |
|
|
|
**Working Script to Review:** |
|
```python |
|
{code_to_review} |
|
``` |
|
|
|
**Requirements:** |
|
|
|
- Review the script for any bugs, inefficiencies, or deviations from best practices. |
|
- Ensure it correctly saves a file named '{bog_file_name}.bog' to the current directory. |
|
- If the script is already excellent and correct, return it exactly as is, without any extra text or explanation. |
|
- If you find areas for improvement, return a single, complete, corrected Python script inside a ` python ... ` block. |
|
""" |
|
resp = model.generate_content(prompt) |
|
_record_and_print_usage(resp, label=label, log_callback=log_callback) |
|
return _extract_python_code((resp.text or "").strip()) |
|
|
|
|
|
# ========= Script Execution ========= |
|
|
|
|
|
def write_script(code: str, path: Path) -> None: |
|
path.write_text(code, encoding="utf-8") |
|
try: |
|
path.chmod(0o755) |
|
except Exception: |
|
pass |
|
|
|
|
|
def run_script( |
|
script_path: Path, out_dir: Path, timeout_sec: int = 120 |
|
) -> Tuple[bool, str, str]: |
|
cmd = [sys.executable, str(script_path)] |
|
proc = subprocess.Popen( |
|
cmd, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
text=True, |
|
encoding="utf-8", |
|
errors="replace", |
|
cwd=out_dir, |
|
) |
|
try: |
|
stdout, stderr = proc.communicate(timeout=timeout_sec) |
|
except subprocess.TimeoutExpired: |
|
proc.kill() |
|
stdout, stderr = proc.communicate() |
|
stderr += "\n[TimeoutExpired]" |
|
ok = proc.returncode == 0 |
|
return ok, stdout, stderr |
|
|
|
|
|
# ========= Core Agent Logic (Callable) ========= |
|
def run_agent_session( |
|
description: str, |
|
bog_file_name: str, |
|
output_dir: Path, |
|
max_iters: int = MAX_ITERS_DEFAULT, |
|
log_callback: Optional[Callable[[str], None]] = None, |
|
) -> Dict: |
|
""" |
|
Runs the full agent process with tiered model logic and a final verification step. |
|
""" |
|
log_callback = log_callback or print |
|
|
|
global LLM_CALLS, LLM_INPUT_TOKENS, LLM_OUTPUT_TOKENS |
|
LLM_CALLS, LLM_INPUT_TOKENS, LLM_OUTPUT_TOKENS = 0, 0, 0 |
|
|
|
# --- Initialize both models at the start of the session --- |
|
log_callback(f"Initializing Lite model: {MODEL_NAME_LITE}") |
|
model_lite = init_gemini(MODEL_NAME_LITE) |
|
log_callback(f"Initializing Pro model: {MODEL_NAME_PRO}") |
|
model_pro = init_gemini(MODEL_NAME_PRO) |
|
|
|
if not model_lite or not model_pro: |
|
msg = "Gemini unavailable. Could not initialize one or both models. Check GOOGLE_API_KEY." |
|
log_callback(f"❌ {msg}") |
|
return {"success": False, "error": msg} |
|
|
|
log_callback("Using cached examples for context...") |
|
examples_blob = CACHED_EXAMPLES_BLOB |
|
output_dir.mkdir(parents=True, exist_ok=True) |
|
script_path = output_dir / f"{bog_file_name}_script.py" |
|
|
|
created_bog_path: Optional[Path] = None |
|
attempts = 0 |
|
last_code = "" |
|
last_err = "" |
|
# Initialize model_label here to have it available after the loop |
|
model_label = "" |
|
|
|
while attempts < max_iters: |
|
attempts += 1 |
|
log_callback(f"\n--- Attempt {attempts}/{max_iters} ---") |
|
|
|
# --- Tiered model selection logic --- |
|
if attempts == PRO_ESCALATION_ATTEMPT: |
|
log_callback( |
|
f"⚠️ Escalating to {MODEL_NAME_PRO} for a high-quality fix attempt..." |
|
) |
|
current_model = model_pro |
|
model_label = MODEL_NAME_PRO |
|
else: |
|
current_model = model_lite |
|
model_label = MODEL_NAME_LITE |
|
|
|
if attempts == 1: |
|
code = llm_generate_script( |
|
current_model, |
|
description, |
|
examples_blob, |
|
bog_file_name, |
|
log_callback, |
|
label=f"generate (attempt {attempts} using {model_label})", |
|
) |
|
else: |
|
code = llm_fix_script( |
|
current_model, |
|
description, |
|
last_code, |
|
last_err, |
|
examples_blob, |
|
bog_file_name, |
|
log_callback, |
|
label=f"fix (attempt {attempts} using {model_label})", |
|
) |
|
|
|
if not code.strip(): |
|
log_callback("❌ LLM returned empty code.") |
|
last_err = "LLM returned empty code." |
|
continue |
|
|
|
write_script(code, script_path) |
|
log_callback(f"Running generated script: {script_path.name}") |
|
ok, stdout, stderr = run_script(script_path, output_dir) |
|
last_code, last_err = code, (stderr or stdout or "") |
|
|
|
if stdout: |
|
log_callback(f"SCRIPT STDOUT:\n{stdout}") |
|
if stderr: |
|
log_callback(f"SCRIPT STDERR:\n{stderr}") |
|
|
|
candidate = output_dir / f"{bog_file_name}.bog" |
|
if ok and candidate.exists(): |
|
created_bog_path = candidate |
|
log_callback( |
|
f"✅ Script executed successfully and created {candidate.name}" |
|
) |
|
break |
|
else: |
|
log_callback("❌ Run failed. Preparing to fix...") |
|
|
|
# Only run verification if the Pro model wasn't the one that produced the successful script. |
|
if created_bog_path and model_label != MODEL_NAME_PRO: |
|
log_callback("\n--- Final Verification Pass ---") |
|
log_callback( |
|
f"✅ Lite model succeeded. Submitting to {MODEL_NAME_PRO} for final review." |
|
) |
|
|
|
successful_code = script_path.read_text(encoding="utf-8") |
|
|
|
verified_code = llm_verify_and_refine_script( |
|
model=model_pro, |
|
description=description, |
|
code_to_review=successful_code, |
|
bog_file_name=bog_file_name, |
|
log_callback=log_callback, |
|
label=f"verify (using {MODEL_NAME_PRO})", |
|
) |
|
|
|
if verified_code and verified_code.strip() and verified_code != successful_code: |
|
log_callback( |
|
f"✅ {MODEL_NAME_PRO} provided refinements. Running final verified script." |
|
) |
|
write_script(verified_code, script_path) |
|
# Re-run the script with the Pro model's changes |
|
ok, stdout, stderr = run_script(script_path, output_dir) |
|
if not ok: |
|
log_callback( |
|
f"⚠️ Pro-verified script failed to execute! Falling back to original Lite version. Error:\n{stderr or stdout}" |
|
) |
|
# Revert to the last known good code |
|
write_script(successful_code, script_path) |
|
else: |
|
log_callback("✅ Pro verification pass successful.") |
|
else: |
|
log_callback(f"✅ {MODEL_NAME_PRO} approved the script without changes.") |
|
elif created_bog_path and model_label == MODEL_NAME_PRO: |
|
log_callback( |
|
f"\n✅ Skipping final verification as {MODEL_NAME_PRO} already produced the successful script." |
|
) |
|
|
|
stats = { |
|
"gemini_calls": LLM_CALLS, |
|
"attempts": attempts, |
|
"total_input_tokens": LLM_INPUT_TOKENS, |
|
"total_output_tokens": LLM_OUTPUT_TOKENS, |
|
"total_tokens": LLM_INPUT_TOKENS + LLM_OUTPUT_TOKENS, |
|
} |
|
log_callback(f"\n—— Stats ——\n{stats}") |
|
|
|
if created_bog_path: |
|
return { |
|
"success": True, |
|
"bog_file_path": str(created_bog_path), |
|
"script_path": str(script_path), |
|
"stats": stats, |
|
} |
|
else: |
|
log_callback("❌ Failed to generate a working .bog file.") |
|
return { |
|
"success": False, |
|
"error": "Agent failed to generate a working BOG file after all attempts.", |
|
"last_error_log": last_err, |
|
"script_path": str(script_path), |
|
"stats": stats, |
|
} |
|
|
|
|
|
def run_correction_pipeline( |
|
description: str, |
|
previous_code: str, |
|
user_correction_notes: str, |
|
bog_file_name: str, |
|
output_dir: Path, |
|
log_callback: Optional[Callable[[str], None]] = None, |
|
) -> Dict: |
|
""" |
|
Runs a single-shot correction pass using the PRO model. |
|
""" |
|
log_callback = log_callback or print |
|
|
|
log_callback(f"Initializing Pro model for correction: {MODEL_NAME_PRO}") |
|
model_pro = init_gemini(MODEL_NAME_PRO) |
|
if not model_pro: |
|
return {"success": False, "error": "Pro model unavailable."} |
|
|
|
examples_blob = CACHED_EXAMPLES_BLOB |
|
script_path = output_dir / f"{bog_file_name}_script_corrected.py" |
|
|
|
# Combine the user's notes with a standard error message |
|
error_text = ( |
|
"The previous script did not fully meet the requirements.\n" |
|
"User Correction Notes:\n" |
|
f"{user_correction_notes}" |
|
) |
|
|
|
log_callback(f"--- Running Correction Attempt on '{bog_file_name}' ---") |
|
corrected_code = llm_fix_script( |
|
model_pro, |
|
description, |
|
previous_code, |
|
error_text, |
|
examples_blob, |
|
bog_file_name, |
|
log_callback, |
|
label=f"correction (using {MODEL_NAME_PRO})", |
|
) |
|
|
|
if not corrected_code.strip(): |
|
return {"success": False, "error": "Correction model returned empty code."} |
|
|
|
write_script(corrected_code, script_path) |
|
log_callback(f"Running corrected script: {script_path.name}") |
|
ok, stdout, stderr = run_script(script_path, output_dir) |
|
|
|
if ok and (output_dir / f"{bog_file_name}.bog").exists(): |
|
log_callback("✅ Correction successful.") |
|
return { |
|
"success": True, |
|
"bog_file_path": str(output_dir / f"{bog_file_name}.bog"), |
|
"script_path": str(script_path), |
|
} |
|
else: |
|
log_callback(f"❌ Corrected script failed to run. Error: {stderr or stdout}") |
|
return { |
|
"success": False, |
|
"error": "The corrected script also failed to execute.", |
|
"last_error_log": stderr or stdout, |
|
"script_path": str(script_path), |
|
} |
|
|
|
|
|
# ========= Standalone CLI Runner ========= |
|
def main() -> None: |
|
parser = argparse.ArgumentParser( |
|
description="Synthesize a Python builder from natural language, run it, and iterate on errors." |
|
) |
|
parser.add_argument( |
|
"--output_dir", |
|
default=str(DEFAULT_OUTPUT_DIR), |
|
help="Directory to store the final .bog file.", |
|
) |
|
parser.add_argument( |
|
"--max-iters", |
|
type=int, |
|
default=MAX_ITERS_DEFAULT, |
|
help=f"Max attempts (default {MAX_ITERS_DEFAULT}).", |
|
) |
|
args = parser.parse_args() |
|
|
|
print("Please describe the HVAC control system you wish to build.") |
|
try: |
|
description = input("Description: ").strip() |
|
except EOFError: |
|
description = "" |
|
if not description: |
|
print("No description provided. Exiting.") |
|
return |
|
|
|
print( |
|
"\nPlease give a name for the bog file to generate (e.g., 'central_plant_sequencing')." |
|
) |
|
try: |
|
bog_file_name = input("Bog File Name: ").strip() |
|
except EOFError: |
|
bog_file_name = "" |
|
if not bog_file_name: |
|
print("No bog file name provided. Exiting.") |
|
return |
|
|
|
result = run_agent_session( |
|
description=description, |
|
bog_file_name=bog_file_name, |
|
output_dir=Path(args.output_dir), |
|
max_iters=args.max_iters, |
|
) |
|
|
|
if result["success"]: |
|
print(f"\n✅ Generated .bog file at: {result['bog_file_path']}") |
|
else: |
|
print(f"\n❌ Error: {result['error']}") |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |