Last active
October 26, 2025 16:28
-
-
Save mkeywood1/d446e027e4264dbb563f847edd49096e to your computer and use it in GitHub Desktop.
Agentic Context Engineering (ACE) - Example Framework
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from dataclasses import dataclass, field | |
| from typing import List, Dict, Any, Tuple | |
| import uuid | |
| import math | |
| import requests | |
| import json | |
| from pathlib import Path | |
| root = "personal/Agentic Context Engineering (ACE)/" | |
| ############################################ | |
| # Define Tools | |
| def tool_list_files_pathlib(directory_path): | |
| """ | |
| List all files in the specified directory using pathlib. | |
| Args: | |
| directory_path (str): Path to the directory | |
| Returns: | |
| list: List of filenames in the directory | |
| """ | |
| try: | |
| directory = Path(directory_path) | |
| # Check if the directory exists | |
| if not directory.is_dir(): | |
| return f"Error: '{directory_path}' is not a valid directory" | |
| # Get all files in the directory | |
| files = [file.name for file in directory.iterdir() if file.is_file()] | |
| return files | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def tool_read_file_pathlib(file_path): | |
| """ | |
| Opens a file using pathlib and returns its content. | |
| Args: | |
| file_path (str): Path to the file to be read | |
| Returns: | |
| str: Content of the file or error message | |
| """ | |
| try: | |
| file = Path(file_path) | |
| # Check if the file exists | |
| if not file.is_file(): | |
| return f"Error: '{file_path}' does not exist or is not a file" | |
| # Read the file content | |
| content = file.read_text(encoding='utf-8') | |
| return content | |
| except UnicodeDecodeError: | |
| return f"Error: '{file_path}' appears to be a binary file and cannot be displayed as text" | |
| except Exception as e: | |
| return f"Error reading file: {str(e)}" | |
| # Define tools in the format expected by Ollama API | |
| tools = [ | |
| { | |
| "type": "function", | |
| "function": { | |
| "name": "tool_list_files_pathlib", | |
| "description": "List files", | |
| "parameters": { | |
| "type": "object", | |
| "required": ["directory_path"], | |
| "properties": { | |
| "directory_path": {"type": "string", "description": "The directory path"} | |
| } | |
| } | |
| } | |
| }, | |
| { | |
| "type": "function", | |
| "function": { | |
| "name": "tool_read_file_pathlib", | |
| "description": "Read a file", | |
| "parameters": { | |
| "type": "object", | |
| "required": ["file_path"], | |
| "properties": { | |
| "file_path": {"type": "string", "description": "The file path"} | |
| } | |
| } | |
| } | |
| } | |
| ] | |
| ############################################ | |
| ############################################ | |
| # LLM Helper Functions | |
| def get_embedding(text): | |
| response = requests.post('http://localhost:11434/api/embeddings', | |
| json={ | |
| 'model': 'nomic-embed-text:latest', | |
| 'prompt': text | |
| }) | |
| return response.json()['embedding'] | |
| def call_llm_vanilla(system_prompt, prompt, model="qwen3:1.7b"): | |
| messages = [{'role': 'system', 'content': system_prompt}, | |
| {'role': 'user', 'content': prompt}] | |
| return call_llm(messages, model=model) | |
| def call_llm(messages, model="qwen3:1.7b"): | |
| """ | |
| Generate text using Ollama API | |
| Args: | |
| messages (dict): The input system and user messages | |
| model (str): The model to use (default: "qwen3:1.7b") | |
| Returns: | |
| {"content": "", "reasoning": "", "used_bullets": []} | |
| """ | |
| url = "http://localhost:11434/api/chat" | |
| available_functions = { | |
| 'tool_list_files_pathlib': tool_list_files_pathlib, | |
| 'tool_read_file_pathlib': tool_read_file_pathlib, | |
| } | |
| payload = { | |
| "model": model, | |
| "messages": messages, | |
| "tools": tools, | |
| "stream": False | |
| } | |
| response = requests.post(url, json=payload) | |
| response_data = response.json() | |
| # Extract the model's response | |
| model_response = response_data.get("message", {}) | |
| tool_calls = model_response.get("tool_calls", []) | |
| output = None | |
| if tool_calls: | |
| # There may be multiple tool calls in the response | |
| for tool in tool_calls: | |
| function_name = tool.get("function", {}).get("name") | |
| try: | |
| function_args = json.loads(tool.get("function", {}).get("arguments", "{}")) | |
| except: | |
| function_args = tool.get("function", {}).get("arguments", "{}") | |
| # Ensure the function is available, and then call it | |
| if function_to_call := available_functions.get(function_name): | |
| # print('Calling function:', function_name) | |
| # print('Arguments:', function_args) | |
| output = function_to_call(**function_args) | |
| # print('Function output:', output) | |
| else: | |
| print('Function', function_name, 'not found') | |
| # Only needed to chat with the model using the tool call results | |
| if tool_calls: | |
| # Add the function response to messages for the model to use | |
| messages.append(model_response) | |
| # Add the tool response | |
| messages.append({ | |
| 'role': 'tool', | |
| 'content': str(output), | |
| 'tool_call_id': tool_calls[-1].get("id"), | |
| 'name': function_name | |
| }) | |
| # Get final response from model with function outputs | |
| final_payload = { | |
| "model": "qwen3:1.7b", | |
| "messages": messages, | |
| "stream": False | |
| } | |
| final_response = requests.post(url, json=final_payload) | |
| final_response_data = final_response.json() | |
| response_str = final_response_data.get("message", {}).get("content", "") | |
| else: | |
| response_str = model_response.get("content", "") | |
| thought = response_str.split("</think>")[0][7:].strip() | |
| answer = response_str if thought == "" else response_str.split("</think>")[1].strip() | |
| return {"content": answer, "reasoning": thought} | |
| ############################################ | |
| @dataclass | |
| class Bullet: | |
| id: str | |
| text: str | |
| helpful: int = 0 | |
| harmful: int = 0 | |
| tags: List[str] = field(default_factory=list) | |
| # lightweight embedding cache for dedup and retrieval | |
| embedding: List[float] = field(default_factory=list) | |
| class Playbook: | |
| def __init__(self, embed_fn, sim_threshold=0.85): | |
| self.bullets: Dict[str, Bullet] = {} | |
| self.embed = embed_fn | |
| self.sim_threshold = sim_threshold | |
| def add_bullet(self, text, tags=None): | |
| b = Bullet(id=str(uuid.uuid4()), text=text, tags=tags or []) | |
| b.embedding = self.embed(b.text) | |
| self.bullets[b.id] = b | |
| return b.id | |
| def update_counts(self, bullet_ids_helpful=None, bullet_ids_harmful=None): | |
| for bid in (bullet_ids_helpful or []): | |
| if bid in self.bullets: | |
| self.bullets[bid].helpful += 1 | |
| for bid in (bullet_ids_harmful or []): | |
| if bid in self.bullets: | |
| self.bullets[bid].harmful += 1 | |
| def retrieve(self, query, k=20): | |
| q = self.embed(query) | |
| scored = [] | |
| for b in self.bullets.values(): | |
| if not b.embedding: | |
| b.embedding = self.embed(b.text) | |
| sim = cosine_sim(q, b.embedding) | |
| scored.append((sim, b)) | |
| scored.sort(key=lambda x: x[0], reverse=True) | |
| return [b for _, b in scored[:k]] | |
| def dedup(self): | |
| ids = list(self.bullets.keys()) | |
| keep = set() | |
| for i, a in enumerate(ids): | |
| if a not in self.bullets: | |
| continue | |
| keep.add(a) | |
| ea = self.bullets[a].embedding or self.embed(self.bullets[a].text) | |
| for j in range(i+1, len(ids)): | |
| b = ids[j] | |
| if b not in self.bullets: | |
| continue | |
| eb = self.bullets[b].embedding or self.embed(self.bullets[b].text) | |
| if cosine_sim(ea, eb) >= self.sim_threshold: | |
| # merge counts and keep the longer / more specific text | |
| self.bullets[a].helpful += self.bullets[b].helpful | |
| self.bullets[a].harmful += self.bullets[b].harmful | |
| if len(self.bullets[b].text) > len(self.bullets[a].text): | |
| self.bullets[a].text = self.bullets[b].text | |
| self.bullets[a].embedding = eb | |
| del self.bullets[b] | |
| def cosine_sim(a, b): | |
| dot = sum(x*y for x, y in zip(a, b)) | |
| na = math.sqrt(sum(x*x for x in a)) + 1e-8 | |
| nb = math.sqrt(sum(y*y for y in b)) + 1e-8 | |
| return dot / (na * nb) | |
| def execute_tools_and_score(answer, task): | |
| # print("answer", answer) | |
| # print("task", task) | |
| prompt ='Does the following original answer correctly address the original task?\n' | |
| prompt += 'Original Answer: "' + answer + '".\n' | |
| prompt += 'Original Task: "' + task + '".\n\n' | |
| prompt += 'Identify SUCCESS, a true or false boolean indicating if the task was successfully achieved.\n' | |
| prompt += 'Identify ERROR, a succinct description of the error identified in the Original Answer leading to SUCCESS being false.\n' | |
| prompt += 'Respond with a JSON object, for example: {"success": SUCCESS, "errors": ERROR}' | |
| # print("prompt", prompt) | |
| check_answer = call_llm_vanilla("", prompt) | |
| # print() | |
| # print(check_answer) | |
| # print() | |
| return check_answer | |
| def generator(task, playbook: Playbook, k=20, m=12): | |
| retrieved = playbook.retrieve(task, k=k) | |
| # keep top-m to fit budget and avoid overload | |
| context_bullets = retrieved[:m] | |
| sys_prompt = "You are an expert agent. Use the playbook bullets when helpful. Do not hallucinate tools." | |
| context_text = "\n".join(f"- {b.text}" for b in context_bullets) | |
| messages = [ | |
| {"role": "system", "content": sys_prompt + "\nPlaybook:\n" + context_text}, | |
| {"role": "user", "content": task} | |
| ] | |
| # print("-----") | |
| # print("messages:", messages) | |
| # print("-----") | |
| out = call_llm(messages) | |
| # print("-----") | |
| # print(out) | |
| # print("-----") | |
| used_ids = [b.id for b in context_bullets] # keep it simple; you can log finer-grained usage | |
| exec_feedback = execute_tools_and_score(out["content"], task) | |
| return { | |
| "answer": out["content"], | |
| "trace": out.get("reasoning", ""), | |
| "used_bullet_ids": used_ids, | |
| "feedback": exec_feedback["content"] | |
| } | |
| def reflector(run): | |
| lessons = [] | |
| feedback = json.loads(run["feedback"]) | |
| if not feedback["success"]: | |
| # Example logic of known problem solviong approach. | |
| # In reality we could get these lessons generated through LLM as well, to solve the reason it errored. | |
| if "directory" in feedback["errors"]: | |
| lessons.append(f"Always ensure the the root path starts with C:/Users/<your_username>/Desktop/") | |
| # de-duplicate obvious repeats and return as delta bullets | |
| uniq = [] | |
| seen = set() | |
| for l in lessons: | |
| key = l.strip().lower() | |
| if key not in seen: | |
| seen.add(key) | |
| uniq.append(l) | |
| return [{"text": l, "tags": ["strategy"]} for l in uniq] | |
| def curator(playbook: Playbook, run, delta_bullets): | |
| # Update helpful/harmful counts for bullets the generator actually used | |
| feedback = json.loads(run["feedback"]) | |
| if feedback["success"]: | |
| playbook.update_counts(bullet_ids_helpful=run["used_bullet_ids"]) | |
| else: | |
| playbook.update_counts(bullet_ids_harmful=run["used_bullet_ids"]) | |
| # Integrate new lessons as itemized updates | |
| for d in delta_bullets: | |
| playbook.add_bullet(d["text"], tags=d.get("tags")) | |
| # Lazy refine only if we exceed budget (or every N updates) | |
| if len(playbook.bullets) % 50 == 0: | |
| playbook.dedup() | |
| # Example usage | |
| if __name__ == "__main__": | |
| playbook = Playbook(get_embedding) | |
| for _ in range(3): | |
| run = generator("list me the files in the directory " + root, playbook) | |
| print("run:", run) | |
| deltas = reflector(run) | |
| print("deltas:", deltas) | |
| curator(playbook, run, deltas) | |
| # print("playbook bullets", playbook.bullets) | |
| feedback = json.loads(run["feedback"]) | |
| if feedback["success"]: | |
| print("Final Answer:", run["answer"]) | |
| break |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment