Created
January 6, 2026 21:34
-
-
Save jmlon/1efbe0eeb7d89d639e8759746abe30cc to your computer and use it in GitHub Desktop.
Minimalist Code Assistant with Human-in-the-Loop. A code assistant demonstrating the usage of the LangGraph v0.3 Functional API with human-in-the-loop workflows
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Minimalist Code Assistant with Human-in-the-Loop | |
| # LangGraph v0.3 Functional API | |
| # ============================================================================ | |
| # INSTALLATION & SETUP INSTRUCTIONS | |
| # ============================================================================ | |
| # | |
| # 1. Initialize UV project (if not already done): | |
| # uv init | |
| # | |
| # 2. Install required packages: | |
| # uv add langgraph langchain langchain-core python-dotenv | |
| # | |
| # And install your preferred model provider: | |
| # uv add langchain-groq # For Groq | |
| # uv add langchain-google-genai # For Google Gemini | |
| # uv add langchain-openai # For OpenAI | |
| # uv add langchain-anthropic # For Anthropic Claude | |
| # uv add langchain-mistralai # For Mistral AI | |
| # uv add langchain-ollama # For Ollama (local models) | |
| # | |
| # 3. Create a .env file in the same directory (see .env-example): | |
| # | |
| # # Choose your model provider (uncomment one): | |
| # MODEL_PROVIDER=google_genai | |
| # # MODEL_PROVIDER=groq | |
| # # MODEL_PROVIDER=openai | |
| # # MODEL_PROVIDER=anthropic | |
| # # MODEL_PROVIDER=mistralai | |
| # # MODEL_PROVIDER=ollama | |
| # | |
| # # Add the corresponding API key and model for your provider: | |
| # | |
| # # For Groq: | |
| # GROQ_API_KEY=gsk_... | |
| # GROQ_MODEL=llama-3.3-70b-versatile | |
| # | |
| # # For Google Gemini: | |
| # GOOGLE_API_KEY=... | |
| # GEMINI_MODEL=gemini-2.0-flash-exp | |
| # | |
| # # For OpenAI: | |
| # OPENAI_API_KEY=sk-proj-... | |
| # OPENAI_MODEL=gpt-4o-mini | |
| # | |
| # # For Anthropic: | |
| # ANTHROPIC_API_KEY=sk-ant-... | |
| # ANTHROPIC_MODEL=claude-3-5-sonnet-20241022 | |
| # | |
| # # For Mistral: | |
| # MISTRAL_API_KEY=... | |
| # MISTRAL_MODEL=mistral-large-latest | |
| # | |
| # 4. Run the assistant: | |
| # uv run minimalistCodeAssistant.py | |
| # | |
| # ============================================================================ | |
| # USAGE | |
| # ============================================================================ | |
| # | |
| # - Reference files with @filename (e.g., '@script.py') | |
| # - Type '/quit' or '/exit' to exit | |
| # - Type '/loglevel <level>' to change logging (debug/info/warning/error) | |
| # - When prompted, type 'yes' to apply changes, 'no' to reject, or 'refine' to modify | |
| # | |
| # Example: | |
| # Your request: Help me complete the code in @example.py | |
| # Your request: Create a function to calculate fibonacci numbers | |
| # Your request: /loglevel debug | |
| # | |
| # ============================================================================ | |
| import time | |
| import os | |
| import re | |
| import uuid | |
| import logging | |
| from pathlib import Path | |
| from langchain.chat_models import init_chat_model | |
| from langgraph.checkpoint.memory import InMemorySaver | |
| from langgraph.func import entrypoint, task | |
| from langgraph.types import Command, interrupt | |
| from langchain_core.prompts import PromptTemplate | |
| from langchain_core.output_parsers import StrOutputParser | |
| from dotenv import load_dotenv | |
# Load variables from a local .env file into the process environment before
# any of them are read below.
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.WARNING,  # Default: only warnings and errors
    format='%(levelname)s: %(message)s'
)
logger = logging.getLogger(__name__)

# The setup notes give every provider its own "<NAME>_MODEL" variable, so the
# model name is looked up under the variable that matches MODEL_PROVIDER
# instead of always reading GEMINI_MODEL.
# NOTE(review): OLLAMA_MODEL is not listed in the setup notes; it is an
# assumed name and simply falls back to GEMINI_MODEL when unset.
_MODEL_ENV_VARS = {
    "google_genai": "GEMINI_MODEL",
    "groq": "GROQ_MODEL",
    "openai": "OPENAI_MODEL",
    "anthropic": "ANTHROPIC_MODEL",
    "mistralai": "MISTRAL_MODEL",
    "ollama": "OLLAMA_MODEL",
}
_model_provider = os.getenv("MODEL_PROVIDER")
# Fall back to GEMINI_MODEL (the previous behaviour) when the provider is
# unknown/unset or its dedicated variable is missing.
_model_name = (
    os.getenv(_MODEL_ENV_VARS.get(_model_provider or "", "GEMINI_MODEL"))
    or os.getenv("GEMINI_MODEL")
)
llm = init_chat_model(_model_name, model_provider=_model_provider)
@task
def read_file(filename: str) -> str:
    """Return the UTF-8 text stored in ``filename``.

    Failures are never raised to the caller: the error is logged and a
    bracketed placeholder message is returned in place of the contents.
    """
    try:
        logger.debug(f"Reading file: {filename}")
        with open(filename, "r", encoding="utf-8") as source:
            text = source.read()
    except FileNotFoundError:
        # Reported separately so the placeholder names the missing file.
        logger.error(f"File not found: {filename}")
        return f"[File {filename} not found]"
    except Exception as exc:
        logger.error(f"Error reading {filename}: {str(exc)}")
        return f"[Error reading {filename}: {str(exc)}]"
    else:
        logger.debug(f"Successfully read {len(text)} characters from {filename}")
        return text
@task
def write_file(filename: str, code: str):
    """Store ``code`` in ``filename`` as UTF-8, creating parent directories.

    Returns True when the file was written and False when any step failed;
    the failure is logged rather than raised.
    """
    try:
        logger.debug(f"Writing to file: {filename}")
        # Make sure the target directory exists before opening the file.
        parent_dir = Path(filename).parent
        parent_dir.mkdir(parents=True, exist_ok=True)
        with open(filename, "w", encoding="utf-8") as sink:
            sink.write(code)
    except Exception as exc:
        logger.error(f"Error writing to {filename}: {str(exc)}")
        return False
    logger.info(f"Successfully wrote to {filename}")
    return True
def extract_file_references(prompt: str) -> list[str]:
    """
    Extract file references from prompt.

    Files are referenced as @filename or @path/to/file.py.

    Args:
        prompt: Free-form user request that may contain ``@`` references.

    Returns:
        The referenced paths without the leading ``@``, de-duplicated and in
        order of first appearance.
    """
    unique_refs: dict[str, None] = {}
    for candidate in re.findall(r'@([\w\-./]+)', prompt):
        # '.' is a legal path character, so a reference that ends a sentence
        # ("fix @script.py.") is captured with the full stop attached; strip
        # trailing dots so the name matches the file on disk.
        name = candidate.rstrip('.')
        if name:
            # A dict keeps first-seen order, unlike a set.
            unique_refs[name] = None
    return list(unique_refs)
def parse_code_changes(llm_response: str) -> dict[str, str]:
    """
    Collect the file contents proposed in an LLM reply.

    Three layouts are recognised, tried in this order (a later layout
    overwrites an earlier entry for the same filename):
    1. a bold **filename** line followed by a fenced code block
    2. a fenced block whose info string is the filename
    3. a fenced block whose first line is the filename

    Returns a mapping of filename to the stripped code text.
    """
    language_tags = ['python', 'javascript', 'java', 'cpp', 'c', 'rust', 'go', 'typescript', 'js', 'ts', 'jsx', 'tsx']
    found: dict[str, str] = {}

    # Layout 1: **example.py** then ```python ... ```
    bold_header = r'\*\*([^\*]+\.\w+)\*\*\s*```(?:\w+)?\n(.*?)```'
    for hit in re.finditer(bold_header, llm_response, re.DOTALL):
        found[hit.group(1).strip()] = hit.group(2).strip()

    # Layout 2: ```filename.py ... ``` (the fence tag is the filename)
    fence_tag = r'```([^\s]+\.\w+)\n(.*?)```'
    for hit in re.finditer(fence_tag, llm_response, re.DOTALL):
        tag = hit.group(1)
        if tag.lower() in language_tags:
            # A bare language name is not a filename.
            continue
        found[tag.strip()] = hit.group(2).strip()

    # Layout 3: ```python, then the filename alone on the first line
    first_line_name = r'```(?:python|javascript|java|cpp|c|rust|go|typescript|js|ts|jsx|tsx)?\n([^\n]+\.\w+)\n(.*?)```'
    for hit in re.finditer(first_line_name, llm_response, re.DOTALL):
        name = hit.group(1).strip()
        # Accept only a plain path (word characters plus . - _ /) so that an
        # ordinary line of code is not mistaken for a filename.
        if re.match(r'^[\w\-./]+\.\w+$', name):
            found[name] = hit.group(2).strip()

    return found
@task
def assist(prompt: str) -> dict:
    """Answer one assistance request with the configured LLM.

    Args:
        prompt: The user's request; files are referenced as ``@filename``.

    Returns:
        A dict with the raw LLM ``response``, the parsed ``code_changes``
        (filename -> code) and the list of ``referenced_files``.
    """
    logger.debug(f"Processing assistance request: {prompt[:100]}...")

    # Drop only the '@' marker and keep the filename: deleting the whole
    # reference turns "move foo from @a.py to @b.py" into "move foo from  to",
    # which leaves the model unable to tell the files apart.
    problem = re.sub(r'@([\w\-./]+)', r'\1', prompt).strip()

    # Extract list of programs referenced in the prompt
    referenced_files = extract_file_references(prompt)
    if referenced_files:
        logger.debug(f"Referenced files: {referenced_files}")

    # Start every read task before waiting on any result, so the reads are
    # not serialised one behind the other.
    pending_reads = {filename: read_file(filename) for filename in referenced_files}
    file_contents = {filename: future.result() for filename, future in pending_reads.items()}

    # Build context with file contents
    files_context = ""
    if file_contents:
        files_context = "\n\nReferenced files:\n" + "".join(
            f"\n--- {filename} ---\n{content}\n"
            for filename, content in file_contents.items()
        )

    # Create the prompt for the LLM. The requested answer layout is the first
    # one parse_code_changes() recognises (bold filename + fenced block).
    prompt_template = PromptTemplate.from_template("""You are a helpful code assistant.
User request: {problem}
{files_context}
IMPORTANT: When providing code, use EXACTLY this format:
**filename.ext**
```python
code here
```
Example:
**calculator.py**
```python
def add(a, b):
    return a + b
```
Be concise and provide working code.""")

    logger.debug("Calling LLM for assistance...")
    chain = prompt_template | llm | StrOutputParser()
    result = chain.invoke({
        "problem": problem,
        "files_context": files_context
    })
    logger.debug(f"LLM response received ({len(result)} characters)")

    # Parse code changes from response
    code_changes = parse_code_changes(result)
    if code_changes:
        logger.debug(f"Parsed {len(code_changes)} file change(s): {list(code_changes.keys())}")
    else:
        logger.warning("No code changes detected in LLM response")

    return {
        "response": result,
        "code_changes": code_changes,
        "referenced_files": referenced_files
    }
@entrypoint(checkpointer=InMemorySaver())
def workflow(prompt: str) -> dict:
    """
    Workflow for the code assistant.

    Processes a single prompt with an approval loop: ask the LLM, pause for
    the user's verdict, then either write the proposed files, refine the
    prompt and try again, or end the session.

    Args:
        prompt: The user's request (may contain ``@file`` references).

    Returns:
        On approval: ``response``, ``code_changes``, ``applied_files``,
        ``is_approved=True`` and the final ``prompt``.
        On quit: ``response``, ``is_approved=False`` and ``prompt``.
    """
    approve_replies = ("yes", "y", "true", "1", "apply")
    refine_replies = ("refine", "r", "modify", "m")
    quit_replies = ("quit", "q", "exit")

    def normalize(reply) -> str:
        # Resume values are supplied by the caller and need not be strings
        # (or stripped), so coerce before comparing.
        return str(reply).strip().lower() if reply else ""

    while True:
        # Get assistance
        result = assist(prompt).result()

        # Request approval - workflow pauses here
        decision = normalize(interrupt({
            "response": result["response"],
            "code_changes": result["code_changes"],
            "action": "approve",
            "message": "Do you approve these changes?",
        }))

        # If approved, apply changes and return
        if decision in approve_replies:
            logger.debug("Changes approved by user")
            applied_files = []
            if result["code_changes"]:
                logger.info(f"Applying changes to {len(result['code_changes'])} file(s)")
                for filename, code in result["code_changes"].items():
                    logger.debug(f"Writing to {filename}...")
                    success = write_file(filename, code).result()
                    logger.debug(f"Write result for {filename}: {success}")
                    if success:
                        applied_files.append(filename)
            else:
                logger.warning("No code changes detected to apply")
            return {
                "response": result["response"],
                "code_changes": result["code_changes"],
                "applied_files": applied_files,
                "is_approved": True,
                "prompt": prompt,
            }

        if decision in refine_replies:
            logger.debug("User requested refinement")
        else:
            # Not approved: keep asking until the answer is refine or quit.
            # Falling through on an unrecognised reply would silently send
            # the unchanged prompt to the LLM again.
            logger.debug("Changes rejected by user")
            choice = ""
            while choice not in ("refine", "r"):
                choice = normalize(interrupt({
                    "action": "rejected",
                    "message": "Would you like to: (r)efine the solution, or (q)uit?",
                }))
                if choice in quit_replies:
                    logger.info("Session ended by user")
                    return {
                        "response": "Session ended by user",
                        "is_approved": False,
                        "prompt": prompt
                    }
            logger.debug("User chose to refine")

        # Both refine paths end here: collect the extra instructions and
        # loop back for a new proposal.
        refinement = interrupt({
            "action": "refine",
            "message": "How should I modify the solution?",
            "original_response": result["response"]
        })
        # Update prompt with refinement
        prompt = f"{prompt}\n\nAdditional instructions: {refinement}"
        logger.debug(f"Refining solution based on: {refinement}")
def set_log_level(level: str):
    """Switch this module's logger and the root logger to ``level``.

    ``level`` is matched case-insensitively against debug, info, warning,
    error and critical; anything else only prints an error message.
    """
    numeric_level = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL
    }.get(level.lower())

    if numeric_level is None:
        print(f"Invalid log level: {level}. Choose from: debug, info, warning, error, critical")
        return

    # Update both loggers so module messages and root-handled messages agree.
    logger.setLevel(numeric_level)
    logging.getLogger().setLevel(numeric_level)
    print(f"Log level set to {level.upper()}")
def _ask_user(interrupt_value: dict) -> str:
    """Show one workflow interrupt on the console and return the stripped reply."""
    action = interrupt_value.get("action", "")

    if action == "approve":
        # Display the response/changes before asking for a verdict
        print("="*70)
        print("π PROPOSED SOLUTION:")
        print("="*70)
        print(f"\n{interrupt_value['response']}\n")
        if interrupt_value.get('code_changes'):
            print("="*70)
            print("π FILES TO BE MODIFIED/CREATED:")
            print("="*70)
            for filename in interrupt_value['code_changes']:
                print(f" β’ {filename}")
            print()
        print("="*70)
        return input(interrupt_value['message'] + " (yes/no/refine): ").strip()

    if action == "refine":
        print("\n" + "="*70)
        return input(interrupt_value['message'] + "\n> ").strip()

    if action == "rejected":
        print("\n" + "="*70)
        return input(interrupt_value['message'] + " ").strip()

    return input(interrupt_value.get('message', 'Continue? ') + " ").strip()


def _report_outcome(result: dict) -> None:
    """Print the final result of one workflow run."""
    if result.get('is_approved'):
        print("="*70)
        print("β CHANGES APPLIED!")
        print("="*70)
        if result.get('applied_files'):
            print("\nModified files:")
            for filename in result['applied_files']:
                print(f" β {filename}")
    else:
        print("\nβ Session ended without applying changes")


def run_code_assistant():
    """
    Client function to run the code assistant with proper interrupt handling.

    Reads requests from the console until the user quits. Each request runs
    the workflow on a fresh thread and answers its interrupts interactively.
    """
    print("="*70)
    print(" π» MINIMALIST CODE ASSISTANT π»")
    print("="*70)
    print("\nCommands:")
    print(" - Reference files with @filename (e.g., '@script.py')")
    print(" - Type '/quit' to exit")
    print(" - Type '/loglevel <level>' to change logging (debug/info/warning/error)")
    print(" - When prompted, type 'yes' to apply changes")
    print(" - Type 'refine' to modify the solution")
    print("="*70)

    # Main interaction loop
    while True:
        print("\n")
        prompt = input("Your request: ").strip()
        if not prompt:
            continue

        # Check for quit command
        if prompt.lower() in ['/quit', '/exit', 'quit', 'exit']:
            print("\nπ Goodbye!")
            break

        # Check for log level command. Match on the first word so that a bare
        # "/loglevel" (the prompt is already stripped) is answered with the
        # usage message instead of being sent to the LLM as a request.
        words = prompt.split(None, 1)
        if words[0].lower() == '/loglevel':
            set_log_level(words[1] if len(words) > 1 else '')
            continue

        logger.info(f"Processing request: {prompt[:50]}...")
        print("\nπ§ Working on your request...\n")

        # Create a new thread for each request
        thread_id = str(uuid.uuid4())
        config = {"configurable": {"thread_id": thread_id}}
        logger.debug(f"Created new workflow thread: {thread_id}")

        # Start the workflow with the prompt; after every interrupt the
        # stream is restarted with a Command carrying the user's reply.
        current_input = prompt
        while True:
            resumed = False
            finished = False
            for event in workflow.stream(current_input, config=config, stream_mode="updates"):
                # Handle interrupt
                if "__interrupt__" in event:
                    for interrupt_item in event["__interrupt__"]:
                        user_input = _ask_user(interrupt_item.value)
                        # Resume workflow with user's response
                        print("\nβοΈ Processing...\n")
                        current_input = Command(resume=user_input)
                        resumed = True
                    break  # Break to resume workflow
                # Handle final result
                elif "workflow" in event:
                    _report_outcome(event["workflow"])
                    finished = True
                    break

            if finished:
                break
            if not resumed:
                # Neither a result nor a question arrived; re-streaming the
                # same input would repeat forever, so give up on this request.
                logger.error("Workflow stopped without a result or an approval request")
                break
def run_single_request():
    """
    Simplified version for a single request (useful for testing).

    Returns:
        The workflow's final result dict, or None if the stream ended
        without producing either a result or an interrupt.
    """
    thread_id = "single-request"
    config = {"configurable": {"thread_id": thread_id}}
    current_input = input("Enter your request: ")

    while True:
        resumed = False
        for event in workflow.stream(current_input, config=config, stream_mode="updates"):
            if "__interrupt__" in event:
                data = event["__interrupt__"][0].value
                # Only the approval interrupt carries 'response'; the
                # "rejected" and "refine" interrupts do not, so indexing it
                # unconditionally raised KeyError after a "no" answer.
                if "response" in data:
                    print(f"\n{data['response']}\n")
                if data.get('code_changes'):
                    print(f"Files to modify: {list(data['code_changes'].keys())}")
                # The yes/no hint only makes sense for the approval question.
                suffix = " (yes/no): " if data.get("action") == "approve" else " "
                user_input = input(data['message'] + suffix)
                current_input = Command(resume=user_input)
                resumed = True
                break
            elif "workflow" in event:
                result = event["workflow"]
                if result.get('applied_files'):
                    print(f"Applied changes to: {result['applied_files']}")
                return result

        if not resumed:
            # Nothing to answer and no result: stop instead of re-streaming
            # the same input indefinitely.
            return None
# Script entry point: runs only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    # Run the full interactive code assistant
    run_code_assistant()
    # Or use the simple version:
    # run_single_request()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment