@bbartling
Last active October 23, 2025 12:47
AI Agent Activities
from __future__ import annotations
import argparse
import os
import re
import requests
import subprocess
import sys
from pathlib import Path
from typing import List, Tuple, Optional, Callable, Dict
import google.generativeai as genai
# Retrieve the API key from the environment. We avoid importing
# ``app.py`` from the Flask implementation because the project may be
# running under Django instead. If the ``GOOGLE_API_KEY`` environment
# variable is not set, API calls to Gemini will fail gracefully.
API_KEY = os.getenv("GOOGLE_API_KEY")
# ========= Config =========
DEFAULT_OUTPUT_DIR = Path(r"/mnt/c/Users/ben/Niagara4.13/JENEsys")
# --- MODIFICATION: Tiered Model Configuration ---
# Use a fast, lightweight model for initial generation and standard fixes.
MODEL_NAME_LITE = "gemini-2.5-flash"
# Use a powerful, advanced model for a single, high-quality fix attempt.
MODEL_NAME_PRO = "gemini-2.5-pro"
# The total number of attempts the agent will make.
MAX_ITERS_DEFAULT = 5
# The specific attempt number on which to escalate to the PRO model.
PRO_ESCALATION_ATTEMPT = (
    3  # Lite-First: Try with Lite model first, escalate on 3rd attempt
)
# --- End MODIFICATION ---
# Global metrics, reset for each session
LLM_CALLS = 0
LLM_INPUT_TOKENS = 0
LLM_OUTPUT_TOKENS = 0
# Toggle remote fetch via env: set DISABLE_REMOTE_CONTEXT=1 to force local-only
DISABLE_REMOTE = os.getenv("DISABLE_REMOTE_CONTEXT", "").lower() in ("1", "true", "yes")
# Optional: override the branch/path via env if you ever need to
REMOTE_BASE = os.getenv(
    "REMOTE_CONTEXT_BASE",
    "https://raw.githubusercontent.com/bbartling/pybog/develop/context",
)
REMOTE_MAP = {
    "kitControlIntrol.txt": f"{REMOTE_BASE}/kitControlIntrol.txt",
    "llms.txt": f"{REMOTE_BASE}/llms.txt",
    "llms-full.txt": f"{REMOTE_BASE}/llms-full.txt",
}
def _safe_read(path_obj: Path) -> str:
    try:
        return path_obj.read_text(encoding="utf-8", errors="ignore")
    except Exception:
        return ""
def _fetch_remote(url: str, timeout_s: float = 5.0) -> str | None:
    try:
        r = requests.get(
            url,
            headers={"User-Agent": "BogMaker/1.0 (+https://example.local)"},
            timeout=timeout_s,
        )
        if r.status_code == 200 and r.text:
            print(f" - Remote fetch SUCCESS: {url}")
            return r.text
        print(f" - Remote fetch non-200 or empty: {url} (code={r.status_code})")
    except Exception as e:
        print(f" - Remote fetch failed: {url} ({e})")
    return None
def _load_and_cache_examples() -> str:
    """
    Loads key context files in a fixed order with remote-first fallback.
    """
    print("📦 Starting context caching (remote-first, local fallback)...")
    try:
        script_dir = Path(__file__).resolve().parent
        context_dir = script_dir / "context"
        if not context_dir.is_dir():
            print(f"⚠️ 'context' directory not found at {context_dir}")
    except NameError:
        print("⚠️ Could not determine script path. Using CWD instead.")
        context_dir = Path.cwd() / "context"
    files_to_load = ["kitControlIntrol.txt", "llms.txt", "llms-full.txt"]
    chunks, loaded_files, missing_files = [], [], []
    print(f"🔎 Searching for files in {context_dir}...\n")
    for idx, filename in enumerate(files_to_load, start=1):
        print(f"[{idx}/{len(files_to_load)}] Loading '{filename}'...")
        text = None
        if not DISABLE_REMOTE:
            remote_url = REMOTE_MAP.get(filename)
            if remote_url:
                print(f" 🌐 Trying remote → {remote_url}")
                text = _fetch_remote(remote_url)
        if not text:
            file_path = context_dir / filename
            if file_path.is_file():
                print(f" 📂 Using local copy at {file_path}")
                text = _safe_read(file_path)
            else:
                print(f" ❌ Local file not found: {file_path}")
        if text:
            chunks.append(f"\n=== FILE: {filename} ===\n{text}\n=== END FILE ===\n")
            loaded_files.append(filename)
            print(f" ✅ Successfully loaded '{filename}'\n")
        else:
            missing_files.append(filename)
            print(f" ⚠️ Skipped '{filename}' (not found remotely or locally)\n")
    print("📊 Context file load summary:")
    print(f" ✅ Loaded: {', '.join(loaded_files) if loaded_files else 'None'}")
    print(f" ❌ Missing: {', '.join(missing_files) if missing_files else 'None'}")
    return "".join(chunks)
CACHED_EXAMPLES_BLOB: str = _load_and_cache_examples()
# =============================================================
def init_gemini(model_name: str) -> Optional[genai.GenerativeModel]:
    api_key = API_KEY
    if not api_key:
        print("❌ GOOGLE_API_KEY environment variable not set.")
        return None
    try:
        genai.configure(api_key=api_key)
        return genai.GenerativeModel(model_name)
    except Exception as e:
        print(f"❌ Failed to configure Gemini model '{model_name}': {e}")
        return None
# ========= LLM prompts & helpers =========
def _extract_python_code(text: str) -> str:
    if not text:
        return ""
    t = text.strip()
    fence = re.search(r"```(?:python)?\s*(.*?)```", t, flags=re.S)
    if fence:
        return fence.group(1).strip()
    return t
# ========= Intake Helpers =========
def build_intake_system_prompt() -> str:
    return (
        "You are a Niagara .bog builder assistant. Reflect the user's HVAC request "
        "back in 1 short paragraph. Then ask exactly two things: "
        "1) Which kitControl widgets should be used (offer common ones), "
        "2) Any specific variable names and default values. "
        "Be concise. Do not start building code yet."
    )
def build_intake_user_prompt(user_freeform: str, kitcontrol_catalog_text: str) -> str:
    return (
        "User request:\n"
        f"{user_freeform}\n\n"
        "Reference kitControl catalog (not all tested):\n"
        f"{kitcontrol_catalog_text}\n\n"
        "Follow Niagara naming rules and ms strings for time.\n"
        "Reply with:\n"
        "- Acknowledge and restate the intended .bog\n"
        "- Ask: which kitControl widgets do you want?\n"
        "- Ask: any variable names and default values?\n"
    )
def consolidate_final_prompt(initial_user_prompt: str, intake_user_reply: str) -> str:
    return (
        "Final build intent:\n"
        f"{initial_user_prompt}\n\n"
        "User intake answers:\n"
        f"{intake_user_reply}\n\n"
        "Please generate exactly one Python builder script that uses BogFolderBuilder "
        "and the requested kitControl widgets, mirroring proven examples. "
        "Honor naming rules and link semantics. Use tested blocks where possible."
    )
def run_pipeline_with_intake(
    initial_user_prompt: str,
    kitcontrol_catalog_text: str,
    log_callback: Optional[Callable[[str], None]] = None,
    max_iters: int = MAX_ITERS_DEFAULT,
) -> Dict[str, object]:
    """
    Execute the intake round using the LITE model.
    """
    log_callback = log_callback or print
    model = init_gemini(MODEL_NAME_LITE)
    if model is None:
        msg = (
            f"Gemini Lite model ({MODEL_NAME_LITE}) unavailable. Check GOOGLE_API_KEY."
        )
        log_callback(f"❌ {msg}")
        return {"error": msg}
    system_prompt = build_intake_system_prompt()
    user_prompt = build_intake_user_prompt(initial_user_prompt, kitcontrol_catalog_text)
    full_prompt = f"{system_prompt}\n\n{user_prompt}"
    global LLM_CALLS
    LLM_CALLS += 1
    try:
        resp = model.generate_content(full_prompt)
        _record_and_print_usage(
            resp, label=f"intake ({MODEL_NAME_LITE})", log_callback=log_callback
        )
        content = (resp.text or "").strip()
    except Exception as e:
        log_callback(f"❌ Intake LLM call failed: {e}")
        return {"error": str(e)}
    return {
        "intake_message_for_user": content,
        "max_iters": max_iters,
    }
def continue_pipeline_after_intake(
    initial_user_prompt: str,
    intake_user_reply: str,
    bog_file_name: str,
    output_dir: Path,
    log_callback: Optional[Callable[[str], None]] = None,
    max_iters: int = MAX_ITERS_DEFAULT,
) -> Dict:
    final_prompt = consolidate_final_prompt(initial_user_prompt, intake_user_reply)
    log_callback = log_callback or print
    return run_agent_session(
        description=final_prompt,
        bog_file_name=bog_file_name,
        output_dir=output_dir,
        max_iters=max_iters,
        log_callback=log_callback,
    )
def _record_and_print_usage(
    resp, label: str, log_callback: Callable[[str], None]
) -> None:
    global LLM_INPUT_TOKENS, LLM_OUTPUT_TOKENS
    meta = getattr(resp, "usage_metadata", None)
    in_tok = int(getattr(meta, "prompt_token_count", 0) or 0) if meta else 0
    out_tok = int(getattr(meta, "candidates_token_count", 0) or 0) if meta else 0
    tot_tok = (
        int(getattr(meta, "total_token_count", in_tok + out_tok) or (in_tok + out_tok))
        if meta
        else (in_tok + out_tok)
    )
    LLM_INPUT_TOKENS += in_tok
    LLM_OUTPUT_TOKENS += out_tok
    log_line = f"[TOKENS] {label} prompt={in_tok} output={out_tok} total={tot_tok}"
    log_callback(log_line)
def llm_generate_script(
    model: genai.GenerativeModel,
    description: str,
    examples_blob: str,
    bog_file_name: str,
    log_callback: Callable[[str], None],
    label: str = "generate",
) -> str:
    global LLM_CALLS
    LLM_CALLS += 1
    prompt = f"""
You are an expert Niagara / HVAC controls code generator assisting the human HVAC controls engineer on the Niagara4 platform.
Your task is to produce a single, complete, runnable Python script using the BogFolderBuilder API.
**Requirements for the Python script:**
- It must import `BogFolderBuilder` from `bog_builder`.
- It must define a `main()` function.
- It must save exactly one `.bog` file named '{bog_file_name}.bog' TO THE CURRENT WORKING DIRECTORY.
- It must print the absolute path to the created `.bog` file upon success.
- The script should be self-contained and use only the Python standard library besides `bog_builder`.
- The script MUST NOT accept any command-line arguments for the output path.
**User's Description:**
{description}
**Reference Examples (BogFolderBuilder API usage):**
{examples_blob}
"""
    resp = model.generate_content(prompt)
    _record_and_print_usage(resp, label=label, log_callback=log_callback)
    return _extract_python_code((resp.text or "").strip())
def llm_fix_script(
    model,
    description,
    prev_code,
    error_text,
    examples_blob,
    bog_file_name,
    log_callback,
    label="fix",
) -> str:
    global LLM_CALLS
    LLM_CALLS += 1
    prompt = f"""
The previous Python script you generated for the description "{description}" failed with:
---
{error_text}
---
Please return a complete, corrected script. It must:
- import BogFolderBuilder from bog_builder only (no extra third-party imports)
- save '{bog_file_name}.bog' to the CURRENT WORKING DIRECTORY
- not accept any CLI args
# API Reference (excerpt)
{examples_blob}
"""
    resp = model.generate_content(prompt)
    _record_and_print_usage(resp, label=label, log_callback=log_callback)
    return _extract_python_code((resp.text or "").strip())
def llm_verify_and_refine_script(
    model: genai.GenerativeModel,
    description: str,
    code_to_review: str,
    bog_file_name: str,
    log_callback: Callable[[str], None],
    label: str = "verify",
) -> str:
    """Asks a powerful model to review and refine a working script."""
    global LLM_CALLS
    LLM_CALLS += 1
    prompt = f"""
You are an expert Niagara / HVAC controls code generator acting as a quality assurance specialist.
A script generated by a junior model fulfills the user's request. Your task is to review it and provide a final, production-ready version.
**User's Original Description:**
{description}
**Working Script to Review:**
```python
{code_to_review}
```
**Requirements:**
- Review the script for any bugs, inefficiencies, or deviations from best practices.
- Ensure it correctly saves a file named '{bog_file_name}.bog' to the current directory.
- If the script is already excellent and correct, return it exactly as is, without any extra text or explanation.
- If you find areas for improvement, return a single, complete, corrected Python script inside a ```python ... ``` fenced block.
"""
    resp = model.generate_content(prompt)
    _record_and_print_usage(resp, label=label, log_callback=log_callback)
    return _extract_python_code((resp.text or "").strip())
# ========= Script Execution =========
def write_script(code: str, path: Path) -> None:
    path.write_text(code, encoding="utf-8")
    try:
        path.chmod(0o755)
    except Exception:
        pass
def run_script(
    script_path: Path, out_dir: Path, timeout_sec: int = 120
) -> Tuple[bool, str, str]:
    cmd = [sys.executable, str(script_path)]
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        encoding="utf-8",
        errors="replace",
        cwd=out_dir,
    )
    try:
        stdout, stderr = proc.communicate(timeout=timeout_sec)
    except subprocess.TimeoutExpired:
        proc.kill()
        stdout, stderr = proc.communicate()
        stderr += "\n[TimeoutExpired]"
    ok = proc.returncode == 0
    return ok, stdout, stderr
# ========= Core Agent Logic (Callable) =========
def run_agent_session(
    description: str,
    bog_file_name: str,
    output_dir: Path,
    max_iters: int = MAX_ITERS_DEFAULT,
    log_callback: Optional[Callable[[str], None]] = None,
) -> Dict:
    """
    Runs the full agent process with tiered model logic and a final verification step.
    """
    log_callback = log_callback or print
    global LLM_CALLS, LLM_INPUT_TOKENS, LLM_OUTPUT_TOKENS
    LLM_CALLS, LLM_INPUT_TOKENS, LLM_OUTPUT_TOKENS = 0, 0, 0
    # --- Initialize both models at the start of the session ---
    log_callback(f"Initializing Lite model: {MODEL_NAME_LITE}")
    model_lite = init_gemini(MODEL_NAME_LITE)
    log_callback(f"Initializing Pro model: {MODEL_NAME_PRO}")
    model_pro = init_gemini(MODEL_NAME_PRO)
    if not model_lite or not model_pro:
        msg = "Gemini unavailable. Could not initialize one or both models. Check GOOGLE_API_KEY."
        log_callback(f"❌ {msg}")
        return {"success": False, "error": msg}
    log_callback("Using cached examples for context...")
    examples_blob = CACHED_EXAMPLES_BLOB
    output_dir.mkdir(parents=True, exist_ok=True)
    script_path = output_dir / f"{bog_file_name}_script.py"
    created_bog_path: Optional[Path] = None
    attempts = 0
    last_code = ""
    last_err = ""
    # Initialize model_label here to have it available after the loop
    model_label = ""
    while attempts < max_iters:
        attempts += 1
        log_callback(f"\n--- Attempt {attempts}/{max_iters} ---")
        # --- Tiered model selection logic ---
        if attempts == PRO_ESCALATION_ATTEMPT:
            log_callback(
                f"⚠️ Escalating to {MODEL_NAME_PRO} for a high-quality fix attempt..."
            )
            current_model = model_pro
            model_label = MODEL_NAME_PRO
        else:
            current_model = model_lite
            model_label = MODEL_NAME_LITE
        if attempts == 1:
            code = llm_generate_script(
                current_model,
                description,
                examples_blob,
                bog_file_name,
                log_callback,
                label=f"generate (attempt {attempts} using {model_label})",
            )
        else:
            code = llm_fix_script(
                current_model,
                description,
                last_code,
                last_err,
                examples_blob,
                bog_file_name,
                log_callback,
                label=f"fix (attempt {attempts} using {model_label})",
            )
        if not code.strip():
            log_callback("❌ LLM returned empty code.")
            last_err = "LLM returned empty code."
            continue
        write_script(code, script_path)
        log_callback(f"Running generated script: {script_path.name}")
        ok, stdout, stderr = run_script(script_path, output_dir)
        last_code, last_err = code, (stderr or stdout or "")
        if stdout:
            log_callback(f"SCRIPT STDOUT:\n{stdout}")
        if stderr:
            log_callback(f"SCRIPT STDERR:\n{stderr}")
        candidate = output_dir / f"{bog_file_name}.bog"
        if ok and candidate.exists():
            created_bog_path = candidate
            log_callback(
                f"✅ Script executed successfully and created {candidate.name}"
            )
            break
        else:
            log_callback("❌ Run failed. Preparing to fix...")
    # Only run verification if the Pro model wasn't the one that produced the successful script.
    if created_bog_path and model_label != MODEL_NAME_PRO:
        log_callback("\n--- Final Verification Pass ---")
        log_callback(
            f"✅ Lite model succeeded. Submitting to {MODEL_NAME_PRO} for final review."
        )
        successful_code = script_path.read_text(encoding="utf-8")
        verified_code = llm_verify_and_refine_script(
            model=model_pro,
            description=description,
            code_to_review=successful_code,
            bog_file_name=bog_file_name,
            log_callback=log_callback,
            label=f"verify (using {MODEL_NAME_PRO})",
        )
        if verified_code and verified_code.strip() and verified_code != successful_code:
            log_callback(
                f"✅ {MODEL_NAME_PRO} provided refinements. Running final verified script."
            )
            write_script(verified_code, script_path)
            # Re-run the script with the Pro model's changes
            ok, stdout, stderr = run_script(script_path, output_dir)
            if not ok:
                log_callback(
                    f"⚠️ Pro-verified script failed to execute! Falling back to original Lite version. Error:\n{stderr or stdout}"
                )
                # Revert to the last known good code
                write_script(successful_code, script_path)
            else:
                log_callback("✅ Pro verification pass successful.")
        else:
            log_callback(f"✅ {MODEL_NAME_PRO} approved the script without changes.")
    elif created_bog_path and model_label == MODEL_NAME_PRO:
        log_callback(
            f"\n✅ Skipping final verification as {MODEL_NAME_PRO} already produced the successful script."
        )
    stats = {
        "gemini_calls": LLM_CALLS,
        "attempts": attempts,
        "total_input_tokens": LLM_INPUT_TOKENS,
        "total_output_tokens": LLM_OUTPUT_TOKENS,
        "total_tokens": LLM_INPUT_TOKENS + LLM_OUTPUT_TOKENS,
    }
    log_callback(f"\n—— Stats ——\n{stats}")
    if created_bog_path:
        return {
            "success": True,
            "bog_file_path": str(created_bog_path),
            "script_path": str(script_path),
            "stats": stats,
        }
    else:
        log_callback("❌ Failed to generate a working .bog file.")
        return {
            "success": False,
            "error": "Agent failed to generate a working BOG file after all attempts.",
            "last_error_log": last_err,
            "script_path": str(script_path),
            "stats": stats,
        }
def run_correction_pipeline(
    description: str,
    previous_code: str,
    user_correction_notes: str,
    bog_file_name: str,
    output_dir: Path,
    log_callback: Optional[Callable[[str], None]] = None,
) -> Dict:
    """
    Runs a single-shot correction pass using the PRO model.
    """
    log_callback = log_callback or print
    log_callback(f"Initializing Pro model for correction: {MODEL_NAME_PRO}")
    model_pro = init_gemini(MODEL_NAME_PRO)
    if not model_pro:
        return {"success": False, "error": "Pro model unavailable."}
    examples_blob = CACHED_EXAMPLES_BLOB
    script_path = output_dir / f"{bog_file_name}_script_corrected.py"
    # Combine the user's notes with a standard error message
    error_text = (
        "The previous script did not fully meet the requirements.\n"
        "User Correction Notes:\n"
        f"{user_correction_notes}"
    )
    log_callback(f"--- Running Correction Attempt on '{bog_file_name}' ---")
    corrected_code = llm_fix_script(
        model_pro,
        description,
        previous_code,
        error_text,
        examples_blob,
        bog_file_name,
        log_callback,
        label=f"correction (using {MODEL_NAME_PRO})",
    )
    if not corrected_code.strip():
        return {"success": False, "error": "Correction model returned empty code."}
    write_script(corrected_code, script_path)
    log_callback(f"Running corrected script: {script_path.name}")
    ok, stdout, stderr = run_script(script_path, output_dir)
    if ok and (output_dir / f"{bog_file_name}.bog").exists():
        log_callback("✅ Correction successful.")
        return {
            "success": True,
            "bog_file_path": str(output_dir / f"{bog_file_name}.bog"),
            "script_path": str(script_path),
        }
    else:
        log_callback(f"❌ Corrected script failed to run. Error: {stderr or stdout}")
        return {
            "success": False,
            "error": "The corrected script also failed to execute.",
            "last_error_log": stderr or stdout,
            "script_path": str(script_path),
        }
# ========= Standalone CLI Runner =========
def main() -> None:
    parser = argparse.ArgumentParser(
        description="Synthesize a Python builder from natural language, run it, and iterate on errors."
    )
    parser.add_argument(
        "--output_dir",
        default=str(DEFAULT_OUTPUT_DIR),
        help="Directory to store the final .bog file.",
    )
    parser.add_argument(
        "--max-iters",
        type=int,
        default=MAX_ITERS_DEFAULT,
        help=f"Max attempts (default {MAX_ITERS_DEFAULT}).",
    )
    args = parser.parse_args()
    print("Please describe the HVAC control system you wish to build.")
    try:
        description = input("Description: ").strip()
    except EOFError:
        description = ""
    if not description:
        print("No description provided. Exiting.")
        return
    print(
        "\nPlease give a name for the bog file to generate (e.g., 'central_plant_sequencing')."
    )
    try:
        bog_file_name = input("Bog File Name: ").strip()
    except EOFError:
        bog_file_name = ""
    if not bog_file_name:
        print("No bog file name provided. Exiting.")
        return
    result = run_agent_session(
        description=description,
        bog_file_name=bog_file_name,
        output_dir=Path(args.output_dir),
        max_iters=args.max_iters,
    )
    if result["success"]:
        print(f"\n✅ Generated .bog file at: {result['bog_file_path']}")
    else:
        print(f"\n❌ Error: {result['error']}")
if __name__ == "__main__":
    main()
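For web front ends (the Flask/Django integration mentioned at the top of the script), the intake helpers can be called directly instead of `main()`. A minimal sketch of that two-step flow, assuming the script is importable as a module named `bog_agent` (the module name, request text, and paths here are illustrative) and that `GOOGLE_API_KEY` is set:

```python
from pathlib import Path

import bog_agent  # hypothetical module name for the script above

request = "Single zone AHU with DX cooling and a discharge air temp PID loop."

# Round 1: the Lite model restates the request and asks its two intake questions.
intake = bog_agent.run_pipeline_with_intake(
    initial_user_prompt=request,
    kitcontrol_catalog_text=bog_agent.CACHED_EXAMPLES_BLOB,
)
print(intake.get("intake_message_for_user") or intake.get("error"))

# Round 2: feed the user's answers back in; this runs the full generate/fix loop.
result = bog_agent.continue_pipeline_after_intake(
    initial_user_prompt=request,
    intake_user_reply="Use a PID loop block; setpoint name DA_TEMP_SP, default 55.0.",
    bog_file_name="ahu_dat_loop",
    output_dir=Path("./bog_out"),
)
print(result)
```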

Prompt Chaining Activity Diagram

  • Note: This is very experimental but working, and it is subject to change as better methods are developed. TODO: research an MCP server to burn fewer tokens; currently the context files sent to the LLM service consume a LOT of tokens.
```mermaid
flowchart TD
  start([Start CLI]) --> askDesc[Prompt description]
  askDesc --> askName[Prompt bog file name]
  askName --> loadCtx[Load context files]
  loadCtx --> attempt{{Attempt == 1?}}

  attempt -- Yes --> gen[LLM generate script]
  attempt -- No  --> fix[LLM fix script - prev code + traceback]

  gen --> write[Write script to temp folder]
  fix --> write

  write --> run[Run script with cwd set to output dir]
  run --> success{Run ok AND file created?}

  success -- Yes --> done[Print success\nPrint stats\nExit]
  success -- No  --> cap[Capture stderr tail as traceback]
  cap --> retry{Attempt < max iters?}

  retry -- Yes --> incr[Increment attempt\nStore code and error]
  incr --> attempt

  retry -- No --> fail[Print failure\nPrint stats\nExit]
```
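The loop in the diagram maps to `run_agent_session`, which can also be driven programmatically without the intake round. A minimal sketch (the description and output directory are illustrative, the module name `bog_agent` is an assumption, and `GOOGLE_API_KEY` must be exported):

```python
from pathlib import Path

import bog_agent  # hypothetical module name for the script above

result = bog_agent.run_agent_session(
    description="Two-stage DX cooling with a 5 minute minimum on-time per stage.",
    bog_file_name="dx_staging",
    output_dir=Path("./bog_out"),
    max_iters=bog_agent.MAX_ITERS_DEFAULT,
)
if result["success"]:
    print("BOG file:", result["bog_file_path"])
else:
    print("Failed:", result["error"])
    print("Last error log:", result.get("last_error_log", ""))
print("Token stats:", result["stats"])
```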