Skip to content

Instantly share code, notes, and snippets.

@jmlon
Created January 6, 2026 21:34
Show Gist options
  • Select an option

  • Save jmlon/1efbe0eeb7d89d639e8759746abe30cc to your computer and use it in GitHub Desktop.

Select an option

Save jmlon/1efbe0eeb7d89d639e8759746abe30cc to your computer and use it in GitHub Desktop.
Minimalist Code Assistant with Human-in-the-Loop — a code assistant demonstrating the use of the LangGraph v0.3 Functional API with human-in-the-loop approval workflows.
# Minimalist Code Assistant with Human-in-the-Loop
# LangGraph v0.3 Functional API
# ============================================================================
# INSTALLATION & SETUP INSTRUCTIONS
# ============================================================================
#
# 1. Initialize UV project (if not already done):
# uv init
#
# 2. Install required packages:
# uv add langgraph langchain langchain-core python-dotenv
#
# And install your preferred model provider:
# uv add langchain-groq # For Groq
# uv add langchain-google-genai # For Google Gemini
# uv add langchain-openai # For OpenAI
# uv add langchain-anthropic # For Anthropic Claude
# uv add langchain-mistralai # For Mistral AI
#
# 3. Create a .env file in the same directory (see .env-example):
#
# # Choose your model provider (uncomment one):
# MODEL_PROVIDER=google_genai
# # MODEL_PROVIDER=groq
# # MODEL_PROVIDER=openai
# # MODEL_PROVIDER=anthropic
# # MODEL_PROVIDER=mistralai
# # MODEL_PROVIDER=ollama
#
# # Add the corresponding API key and model for your provider:
#
# # For Groq:
# GROQ_API_KEY=gsk_...
# GROQ_MODEL=llama-3.3-70b-versatile
#
# # For Google Gemini:
# GOOGLE_API_KEY=...
# GEMINI_MODEL=gemini-2.0-flash-exp
#
# # For OpenAI:
# OPENAI_API_KEY=sk-proj-...
# OPENAI_MODEL=gpt-4o-mini
#
# # For Anthropic:
# ANTHROPIC_API_KEY=sk-ant-...
# ANTHROPIC_MODEL=claude-3-5-sonnet-20241022
#
# # For Mistral:
# MISTRAL_API_KEY=...
# MISTRAL_MODEL=mistral-large-latest
#
# 4. Run the assistant:
# uv run minimalistCodeAssistant.py
#
# ============================================================================
# USAGE
# ============================================================================
#
# - Reference files with @filename (e.g., '@script.py')
# - Type '/quit' or '/exit' to exit
# - Type '/loglevel <level>' to change logging (debug/info/warning/error)
# - When prompted, type 'yes' to apply changes, 'no' to reject, or 'refine' to modify
#
# Example:
# Your request: Help me complete the code in @example.py
# Your request: Create a function to calculate fibonacci numbers
# Your request: /loglevel debug
#
# ============================================================================
import time
import os
import re
import uuid
import logging
from pathlib import Path
from langchain.chat_models import init_chat_model
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import Command, interrupt
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv
load_dotenv()

# Configure logging: default to WARNING so only warnings/errors surface;
# the '/loglevel' command can raise verbosity at runtime.
logging.basicConfig(
    level=logging.WARNING,  # Default: only warnings and errors
    format='%(levelname)s: %(message)s'
)
logger = logging.getLogger(__name__)

# Map each supported provider to the environment variable that holds its
# model name (mirrors the .env examples in the header comments).
_PROVIDER_MODEL_VARS = {
    "google_genai": "GEMINI_MODEL",
    "groq": "GROQ_MODEL",
    "openai": "OPENAI_MODEL",
    "anthropic": "ANTHROPIC_MODEL",
    "mistralai": "MISTRAL_MODEL",
    "ollama": "OLLAMA_MODEL",
}

# BUG FIX: the model was previously read from GEMINI_MODEL regardless of
# MODEL_PROVIDER, so selecting e.g. groq ignored GROQ_MODEL. The provider
# now selects its own *_MODEL variable; GEMINI_MODEL remains the fallback
# for backward compatibility with existing .env files.
_provider = os.getenv("MODEL_PROVIDER")
_model_var = _PROVIDER_MODEL_VARS.get(_provider or "", "GEMINI_MODEL")
llm = init_chat_model(os.getenv(_model_var) or os.getenv("GEMINI_MODEL"),
                      model_provider=_provider)
@task
def read_file(filename: str) -> str:
    """Read and return the contents of *filename* as UTF-8 text.

    On failure, returns a bracketed error string instead of raising, so the
    assistant can surface the problem inside the LLM context.

    BUG FIX: log and error strings previously contained the literal text
    '(unknown)' where the filename should have been interpolated.
    """
    try:
        logger.debug(f"Reading file: {filename}")
        with open(filename, "r", encoding="utf-8") as f:
            code = f.read()
        logger.debug(f"Successfully read {len(code)} characters from {filename}")
        return code
    except FileNotFoundError:
        logger.error(f"File not found: {filename}")
        return f"[File {filename} not found]"
    except Exception as e:
        logger.error(f"Error reading {filename}: {str(e)}")
        return f"[Error reading {filename}: {str(e)}]"
@task
def write_file(filename: str, code: str):
    """Write *code* to *filename*, creating parent directories as needed.

    Returns True on success, False on any error (errors are logged, not
    raised, so the approval loop can report per-file results).

    BUG FIX: log messages previously contained the literal text '(unknown)'
    where the filename should have been interpolated.
    """
    try:
        logger.debug(f"Writing to file: {filename}")
        # Create directory if it doesn't exist
        Path(filename).parent.mkdir(parents=True, exist_ok=True)
        with open(filename, "w", encoding="utf-8") as f:
            f.write(code)
        logger.info(f"Successfully wrote to {filename}")
        return True
    except Exception as e:
        logger.error(f"Error writing to {filename}: {str(e)}")
        return False
def extract_file_references(prompt: str) -> list[str]:
    """Extract file references from *prompt*.

    Files are referenced as @filename or @path/to/file.py.
    Returns the unique filenames in first-occurrence order.

    BUG FIX: deduplication previously went through ``list(set(...))``, which
    made the result order nondeterministic; ``dict.fromkeys`` removes
    duplicates while preserving the order the user wrote them in.
    """
    # Match @filename or @path/to/file.ext
    pattern = r'@([\w\-./]+\.?\w*)'
    matches = re.findall(pattern, prompt)
    return list(dict.fromkeys(matches))  # Remove duplicates, keep order
def parse_code_changes(llm_response: str) -> dict[str, str]:
    """Extract ``{filename: code}`` pairs from an LLM answer.

    Recognized layouts (checked in this order; a later match for the same
    filename overwrites an earlier one):
      1. ``**filename**`` followed by a fenced code block
      2. a fence whose info string is itself a filename (```file.py)
      3. a fenced block whose first line is a filename
    """
    changes: dict[str, str] = {}

    # Layout 1: markdown-bold filename, then ```lang ... ```
    bold_then_block = r'\*\*([^\*]+\.\w+)\*\*\s*```(?:\w+)?\n(.*?)```'
    for name, body in re.findall(bold_then_block, llm_response, re.DOTALL):
        changes[name.strip()] = body.strip()

    # Layout 2: the fence label itself looks like a filename (```example.py)
    language_ids = {'python', 'javascript', 'java', 'cpp', 'c', 'rust',
                    'go', 'typescript', 'js', 'ts', 'jsx', 'tsx'}
    labeled_block = r'```([^\s]+\.\w+)\n(.*?)```'
    for name, body in re.findall(labeled_block, llm_response, re.DOTALL):
        # A label such as ```python is a language tag, not a filename.
        if name.lower() not in language_ids:
            changes[name.strip()] = body.strip()

    # Layout 3: filename appears on the first line inside the fence
    first_line_name = (r'```(?:python|javascript|java|cpp|c|rust|go'
                       r'|typescript|js|ts|jsx|tsx)?\n([^\n]+\.\w+)\n(.*?)```')
    for name, body in re.findall(first_line_name, llm_response, re.DOTALL):
        candidate = name.strip()
        # Accept only strings shaped like a plain path (no spaces or other
        # punctuation), so an ordinary first line of code is not mistaken
        # for a filename.
        if re.match(r'^[\w\-./]+\.\w+$', candidate):
            changes[candidate] = body.strip()

    return changes
@task
def assist(prompt: str) -> dict:
    """Attend to the user assistance request.

    Reads any @file references, builds an LLM prompt with their contents,
    and parses proposed code changes out of the response.

    Returns a dict with keys:
      - 'response': raw LLM text
      - 'code_changes': {filename: code} parsed from the response
      - 'referenced_files': list of @-referenced filenames
    """
    logger.debug(f"Processing assistance request: {prompt[:100]}...")
    # Extract the problem (remove @file references for cleaner problem statement)
    problem = re.sub(r'@[\w\-./]+', '', prompt).strip()
    # Extract list of programs referenced in the prompt
    referenced_files = extract_file_references(prompt)
    if referenced_files:
        logger.debug(f"Referenced files: {referenced_files}")
    # Read the content of referenced files (read_file is a @task -> future)
    file_contents = {}
    for filename in referenced_files:
        content = read_file(filename).result()
        file_contents[filename] = content
    # Build context with file contents
    files_context = ""
    if file_contents:
        files_context = "\n\nReferenced files:\n"
        for filename, content in file_contents.items():
            # BUG FIX: the section header previously printed the literal
            # text '(unknown)' instead of interpolating the filename.
            files_context += f"\n--- {filename} ---\n{content}\n"
    # Create the prompt for the LLM.
    # NOTE(review): the few-shot example's indentation looked lost in the
    # pasted source; restored so the LLM sees valid example code.
    prompt_template = PromptTemplate.from_template("""You are a helpful code assistant.
User request: {problem}
{files_context}
IMPORTANT: When providing code, use EXACTLY this format:
**filename.ext**
```python
code here
```
Example:
**calculator.py**
```python
def add(a, b):
    return a + b
```
Be concise and provide working code.""")
    logger.debug("Calling LLM for assistance...")
    chain = prompt_template | llm | StrOutputParser()
    result = chain.invoke({
        "problem": problem,
        "files_context": files_context
    })
    logger.debug(f"LLM response received ({len(result)} characters)")
    # Parse code changes from response
    code_changes = parse_code_changes(result)
    if code_changes:
        logger.debug(f"Parsed {len(code_changes)} file change(s): {list(code_changes.keys())}")
    else:
        logger.warning("No code changes detected in LLM response")
    return {
        "response": result,
        "code_changes": code_changes,
        "referenced_files": referenced_files
    }
@entrypoint(checkpointer=InMemorySaver())
def workflow(prompt: str) -> dict:
    """
    Workflow for the code assistant.
    Processes a single prompt with an approval loop:
    propose -> interrupt for human approval -> apply / refine / reject.
    """
    while True:
        # Get assistance (assist is a @task; .result() resolves its future)
        result = assist(prompt).result()
        # Request approval - workflow pauses here until resumed
        is_approved = interrupt({
            "response": result["response"],
            "code_changes": result["code_changes"],
            "action": "approve",
            "message": "Do you approve these changes?",
        })
        # If approved, apply changes and return
        if is_approved and str(is_approved).lower() in ["yes", "y", "true", "1", "apply"]:
            logger.debug("Changes approved by user")
            # Apply code changes
            applied_files = []
            if result["code_changes"]:
                logger.info(f"Applying changes to {len(result['code_changes'])} file(s)")
                for filename, code in result["code_changes"].items():
                    # BUG FIX: these log lines previously contained the
                    # literal text '(unknown)' instead of the filename.
                    logger.debug(f"Writing to {filename}...")
                    success = write_file(filename, code).result()
                    logger.debug(f"Write result for {filename}: {success}")
                    if success:
                        applied_files.append(filename)
            else:
                logger.warning("No code changes detected to apply")
            return {
                "response": result["response"],
                "code_changes": result["code_changes"],
                "applied_files": applied_files,
                "is_approved": True,
                "prompt": prompt,
            }
        # If user wants to refine, get new prompt
        elif is_approved and str(is_approved).lower() in ["refine", "r", "modify", "m"]:
            logger.debug("User requested refinement")
            refinement = interrupt({
                "action": "refine",
                "message": "How should I modify the solution?",
                "original_response": result["response"]
            })
            # Update prompt with refinement
            prompt = f"{prompt}\n\nAdditional instructions: {refinement}"
            logger.debug(f"Refining solution based on: {refinement}")
            continue
        # If not approved, ask what to do
        else:
            logger.debug("Changes rejected by user")
            next_action = interrupt({
                "action": "rejected",
                "message": "Would you like to: (r)efine the solution, or (q)uit?",
            })
            if next_action and next_action.lower() in ["quit", "q", "exit"]:
                logger.info("Session ended by user")
                return {
                    "response": "Session ended by user",
                    "is_approved": False,
                    "prompt": prompt
                }
            elif next_action and next_action.lower() in ["refine", "r"]:
                logger.debug("User chose to refine")
                refinement = interrupt({
                    "action": "refine",
                    "message": "How should I modify the solution?",
                })
                prompt = f"{prompt}\n\nAdditional instructions: {refinement}"
                continue
            # Any other answer falls through: the while-loop re-runs assist
            # with the unchanged prompt (original behavior).
def set_log_level(level: str):
    """Set the logging level dynamically from a level name (e.g. 'debug')."""
    name = level.lower()
    levels = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL,
    }
    if name not in levels:
        print(f"Invalid log level: {level}. Choose from: debug, info, warning, error, critical")
        return
    # Adjust both this module's logger and the root logger so that
    # basicConfig-handled output follows the new level too.
    logger.setLevel(levels[name])
    logging.getLogger().setLevel(levels[name])
    print(f"Log level set to {level.upper()}")
def run_code_assistant():
    """
    Client function to run the code assistant with proper interrupt handling.
    Supports continuous interaction until user quits.

    BUG FIX: two print lines emitted the literal text '(unknown)' instead of
    the interpolated filename.
    NOTE(review): emoji in the banner/messages were mojibake in the pasted
    source (e.g. 'πŸ’»'); restored to the intended UTF-8 characters — confirm
    against the original gist.
    """
    print("="*70)
    print(" 💻 MINIMALIST CODE ASSISTANT 💻")
    print("="*70)
    print("\nCommands:")
    print(" - Reference files with @filename (e.g., '@script.py')")
    print(" - Type '/quit' to exit")
    print(" - Type '/loglevel <level>' to change logging (debug/info/warning/error)")
    print(" - When prompted, type 'yes' to apply changes")
    print(" - Type 'refine' to modify the solution")
    print("="*70)
    # Main interaction loop
    session_active = True
    while session_active:
        print("\n")
        prompt = input("Your request: ").strip()
        # Check for quit command
        if prompt.lower() in ['/quit', '/exit', 'quit', 'exit']:
            print("\n👋 Goodbye!")
            break
        # Check for log level command
        if prompt.lower().startswith('/loglevel '):
            level = prompt.split(None, 1)[1] if len(prompt.split(None, 1)) > 1 else ''
            set_log_level(level)
            continue
        if not prompt:
            continue
        logger.info(f"Processing request: {prompt[:50]}...")
        print(f"\n🔧 Working on your request...\n")
        # Create a new thread for each request
        thread_id = str(uuid.uuid4())
        config = {"configurable": {"thread_id": thread_id}}
        logger.debug(f"Created new workflow thread: {thread_id}")
        # Start the workflow with the prompt
        current_input = prompt
        workflow_completed = False
        while not workflow_completed:
            for event in workflow.stream(current_input, config=config, stream_mode="updates"):
                # Handle interrupt
                if "__interrupt__" in event:
                    interrupts = event["__interrupt__"]
                    for interrupt_item in interrupts:
                        interrupt_value = interrupt_item.value
                        action = interrupt_value.get("action", "")
                        # Display the response/changes
                        if action == "approve":
                            print("="*70)
                            print("📝 PROPOSED SOLUTION:")
                            print("="*70)
                            print(f"\n{interrupt_value['response']}\n")
                            if interrupt_value.get('code_changes'):
                                print("="*70)
                                print("📁 FILES TO BE MODIFIED/CREATED:")
                                print("="*70)
                                for filename in interrupt_value['code_changes'].keys():
                                    # BUG FIX: interpolate the filename
                                    print(f" • {filename}")
                                print()
                            print("="*70)
                            # Get user approval
                            user_input = input(interrupt_value['message'] + " (yes/no/refine): ").strip()
                        elif action == "refine":
                            print("\n" + "="*70)
                            user_input = input(interrupt_value['message'] + "\n> ").strip()
                        elif action == "rejected":
                            print("\n" + "="*70)
                            user_input = input(interrupt_value['message'] + " ").strip()
                        else:
                            user_input = input(interrupt_value.get('message', 'Continue? ') + " ").strip()
                    # Resume workflow with user's response
                    print(f"\n⚙️ Processing...\n")
                    current_input = Command(resume=user_input)
                    break  # Break to resume workflow
                # Handle final result
                elif "workflow" in event:
                    result = event["workflow"]
                    if result.get('is_approved'):
                        print("="*70)
                        print("✅ CHANGES APPLIED!")
                        print("="*70)
                        if result.get('applied_files'):
                            print("\nModified files:")
                            for filename in result['applied_files']:
                                # BUG FIX: interpolate the filename
                                print(f" ✓ {filename}")
                    else:
                        print("\n❌ Session ended without applying changes")
                    workflow_completed = True
                    break
            # If we hit an interrupt, continue the outer loop to process
            # the resumed workflow; otherwise stop.
            if not workflow_completed:
                continue
            else:
                break
def run_single_request():
    """
    Simplified version for a single request (useful for testing).

    Streams the workflow once on a fixed thread id, resuming through
    interrupts with console input, and returns the final workflow result.
    """
    config = {"configurable": {"thread_id": "single-request"}}
    pending = input("Enter your request: ")
    while True:
        for event in workflow.stream(pending, config=config, stream_mode="updates"):
            if "__interrupt__" in event:
                payload = event["__interrupt__"][0].value
                print(f"\n{payload['response']}\n")
                proposed = payload.get('code_changes')
                if proposed:
                    print(f"Files to modify: {list(proposed.keys())}")
                answer = input(payload['message'] + " (yes/no): ")
                # Resume the paused workflow with the user's answer.
                pending = Command(resume=answer)
                break
            if "workflow" in event:
                outcome = event["workflow"]
                if outcome.get('applied_files'):
                    print(f"Applied changes to: {outcome['applied_files']}")
                return outcome
if __name__ == "__main__":
    # Launch the full interactive assistant; swap in run_single_request()
    # below for a one-shot test run.
    run_code_assistant()
    # run_single_request()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment