Skip to content

Instantly share code, notes, and snippets.

@kausmeows
Created February 2, 2026 07:11
Show Gist options
  • Select an option

  • Save kausmeows/b03e28717facc310ac5437c5f8ce422d to your computer and use it in GitHub Desktop.

Select an option

Save kausmeows/b03e28717facc310ac5437c5f8ce422d to your computer and use it in GitHub Desktop.
from typing import AsyncIterator, Union
from agno.agent import Agent
from agno.db.in_memory import InMemoryDb
from agno.models.anthropic import Claude
from agno.os import AgentOS
from agno.run.workflow import WorkflowRunOutputEvent
from agno.workflow import Step, Workflow
from agno.workflow.steps import Steps
from agno.workflow.types import StepInput, StepOutput
# === AGENTS ===
def _build_agent(agent_name: str) -> Agent:
    """Create an agent with the given name, a Claude model and an in-memory db.

    Every call constructs a fresh ``Claude`` and a fresh ``InMemoryDb``, so
    agents built here do not share a model or database instance.
    """
    return Agent(
        name=agent_name,
        model=Claude(id="claude-sonnet-4-5"),
        db=InMemoryDb(),
    )


# One agent per role used by the step functions in this file.
product_lead_agent = _build_agent("Product Lead Agent")
lead_engineer_agent = _build_agent("Lead Engineer Agent")
software_engineer_agent = _build_agent("Software Engineer Agent")
# === STEP FUNCTIONS ===
async def run_analysis(
    step_input: StepInput,
) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
    """Run the analysis step, streaming the product lead agent's events.

    Each event produced by the agent run is forwarded unchanged, followed by
    a final ``StepOutput`` describing the outcome of the step.

    Args:
        step_input: Input supplied by the workflow. It is not read here; the
            agent is always given the fixed prompt below.

    Yields:
        The agent's streamed events, then a ``StepOutput`` with
        ``success=True`` and the last run's content, or ``success=False``
        and an error description if the run raised or no run output was
        available afterwards.
    """
    try:
        event_stream = product_lead_agent.arun(
            "Analyze this request...", stream=True, stream_events=True
        )
        async for event in event_stream:
            yield event

        last_run = product_lead_agent.get_last_run_output()
        if last_run is None:
            # The lookup may come back empty; report that explicitly instead
            # of failing with an opaque AttributeError on ``.content``.
            yield StepOutput(
                content="Analysis failed: agent returned no run output",
                success=False,
            )
        else:
            yield StepOutput(content=last_run.content, success=True)
    except Exception as exc:
        # Surface any failure as an unsuccessful step rather than raising.
        yield StepOutput(content=f"Analysis failed: {exc!s}", success=False)
async def run_prd_creation(
    step_input: StepInput,
) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
    """Run the PRD creation step, streaming the product lead agent's events.

    Each event produced by the agent run is forwarded unchanged, followed by
    a final ``StepOutput`` describing the outcome of the step.

    Args:
        step_input: Input supplied by the workflow. It is not read here; the
            agent is always given the fixed prompt below.

    Yields:
        The agent's streamed events, then a ``StepOutput`` with
        ``success=True`` and the last run's content, or ``success=False``
        and an error description if the run raised or no run output was
        available afterwards.
    """
    try:
        event_stream = product_lead_agent.arun(
            "Create PRD...", stream=True, stream_events=True
        )
        async for event in event_stream:
            yield event

        last_run = product_lead_agent.get_last_run_output()
        if last_run is None:
            # The lookup may come back empty; report that explicitly instead
            # of failing with an opaque AttributeError on ``.content``.
            yield StepOutput(
                content="PRD creation failed: agent returned no run output",
                success=False,
            )
        else:
            yield StepOutput(content=last_run.content, success=True)
    except Exception as exc:
        # Surface any failure as an unsuccessful step rather than raising.
        yield StepOutput(content=f"PRD creation failed: {exc!s}", success=False)
async def run_architecture(
    step_input: StepInput,
) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
    """Run the architecture design step, streaming the lead engineer's events.

    Each event produced by the agent run is forwarded unchanged, followed by
    a final ``StepOutput`` describing the outcome of the step.

    Args:
        step_input: Input supplied by the workflow. It is not read here; the
            agent is always given the fixed prompt below.

    Yields:
        The agent's streamed events, then a ``StepOutput`` with
        ``success=True`` and the last run's content, or ``success=False``
        and an error description if the run raised or no run output was
        available afterwards.
    """
    try:
        event_stream = lead_engineer_agent.arun(
            "Design architecture...", stream=True, stream_events=True
        )
        async for event in event_stream:
            yield event

        last_run = lead_engineer_agent.get_last_run_output()
        if last_run is None:
            # The lookup may come back empty; report that explicitly instead
            # of failing with an opaque AttributeError on ``.content``.
            yield StepOutput(
                content="Architecture design failed: agent returned no run output",
                success=False,
            )
        else:
            yield StepOutput(content=last_run.content, success=True)
    except Exception as exc:
        # Surface any failure as an unsuccessful step rather than raising.
        yield StepOutput(
            content=f"Architecture design failed: {exc!s}", success=False
        )
async def run_create_repo(
    step_input: StepInput,
) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
    """Run the repo creation step, streaming the software engineer's events.

    Each event produced by the agent run is forwarded unchanged, followed by
    a final ``StepOutput`` describing the outcome of the step.

    Args:
        step_input: Input supplied by the workflow. It is not read here; the
            agent is always given the fixed prompt below.

    Yields:
        The agent's streamed events, then a ``StepOutput`` with
        ``success=True`` and the last run's content, or ``success=False``
        and an error description if the run raised or no run output was
        available afterwards.
    """
    try:
        event_stream = software_engineer_agent.arun(
            "Create repo...", stream=True, stream_events=True
        )
        async for event in event_stream:
            yield event

        last_run = software_engineer_agent.get_last_run_output()
        if last_run is None:
            # The lookup may come back empty; report that explicitly instead
            # of failing with an opaque AttributeError on ``.content``.
            yield StepOutput(
                content="Create repo failed: agent returned no run output",
                success=False,
            )
        else:
            yield StepOutput(content=last_run.content, success=True)
    except Exception as exc:
        # Surface any failure as an unsuccessful step rather than raising.
        yield StepOutput(content=f"Create repo failed: {exc!s}", success=False)
async def run_create_file(
    step_input: StepInput,
) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
    """Run the file creation step, streaming the software engineer's events.

    Each event produced by the agent run is forwarded unchanged, followed by
    a final ``StepOutput`` describing the outcome of the step.

    Args:
        step_input: Input supplied by the workflow. It is not read here; the
            agent is always given the fixed prompt below.

    Yields:
        The agent's streamed events, then a ``StepOutput`` with
        ``success=True`` and the last run's content, or ``success=False``
        and an error description if the run raised or no run output was
        available afterwards.
    """
    try:
        event_stream = software_engineer_agent.arun(
            "Create file...", stream=True, stream_events=True
        )
        async for event in event_stream:
            yield event

        last_run = software_engineer_agent.get_last_run_output()
        if last_run is None:
            # The lookup may come back empty; report that explicitly instead
            # of failing with an opaque AttributeError on ``.content``.
            yield StepOutput(
                content="Create file failed: agent returned no run output",
                success=False,
            )
        else:
            yield StepOutput(content=last_run.content, success=True)
    except Exception as exc:
        # Surface any failure as an unsuccessful step rather than raising.
        yield StepOutput(content=f"Create file failed: {exc!s}", success=False)
async def run_code_review(
    step_input: StepInput,
) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
    """Run the code review step, streaming the lead engineer's events.

    Each event produced by the agent run is forwarded unchanged, followed by
    a final ``StepOutput`` describing the outcome of the step.

    Args:
        step_input: Input supplied by the workflow. It is not read here; the
            agent is always given the fixed prompt below.

    Yields:
        The agent's streamed events, then a ``StepOutput`` with
        ``success=True`` and the last run's content, or ``success=False``
        and an error description if the run raised or no run output was
        available afterwards.
    """
    try:
        event_stream = lead_engineer_agent.arun(
            "Review code...", stream=True, stream_events=True
        )
        async for event in event_stream:
            yield event

        last_run = lead_engineer_agent.get_last_run_output()
        if last_run is None:
            # The lookup may come back empty; report that explicitly instead
            # of failing with an opaque AttributeError on ``.content``.
            yield StepOutput(
                content="Code review failed: agent returned no run output",
                success=False,
            )
        else:
            yield StepOutput(content=last_run.content, success=True)
    except Exception as exc:
        # Surface any failure as an unsuccessful step rather than raising.
        yield StepOutput(content=f"Code review failed: {exc!s}", success=False)
async def run_create_second_file(
    step_input: StepInput,
) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
    """Run the second file creation step, streaming the software engineer's events.

    Each event produced by the agent run is forwarded unchanged, followed by
    a final ``StepOutput`` describing the outcome of the step.

    Args:
        step_input: Input supplied by the workflow. It is not read here; the
            agent is always given the fixed prompt below.

    Yields:
        The agent's streamed events, then a ``StepOutput`` with
        ``success=True`` and the last run's content, or ``success=False``
        and an error description if the run raised or no run output was
        available afterwards.
    """
    try:
        event_stream = software_engineer_agent.arun(
            "Create second file...", stream=True, stream_events=True
        )
        async for event in event_stream:
            yield event

        last_run = software_engineer_agent.get_last_run_output()
        if last_run is None:
            # The lookup may come back empty; report that explicitly instead
            # of failing with an opaque AttributeError on ``.content``.
            yield StepOutput(
                content="Create second file failed: agent returned no run output",
                success=False,
            )
        else:
            yield StepOutput(content=last_run.content, success=True)
    except Exception as exc:
        # Surface any failure as an unsuccessful step rather than raising.
        yield StepOutput(
            content=f"Create second file failed: {exc!s}", success=False
        )
# === GROUPED STEPS ===
# Product Discovery: the analysis step followed by the PRD creation step,
# grouped under a single named Steps container.
product_discovery_steps = Steps(
    name="Product Discovery",
    steps=[
        Step(name=step_name, executor=step_executor)
        for step_name, step_executor in (
            ("analysis", run_analysis),
            ("prd_creation", run_prd_creation),
        )
    ],
)
# Implementation Cycle: repo creation, first file, code review and second
# file, in that order, grouped under a single named Steps container.
implementation_steps = Steps(
    name="Implementation Cycle",
    steps=[
        Step(name=step_name, executor=step_executor)
        for step_name, step_executor in (
            ("create_repo", run_create_repo),
            ("create_file", run_create_file),
            ("code_review", run_code_review),
            ("create_second_file", run_create_second_file),
        )
    ],
)
# === MAIN WORKFLOW: Software Development ===
# Top-level workflow: two grouped Steps containers with a standalone
# architecture design step between them.
software_development_workflow = Workflow(
    steps=[
        # Grouped steps for product discovery.
        product_discovery_steps,
        # Standalone step, not part of either group.
        Step(executor=run_architecture, name="architecture_design"),
        # Grouped steps for implementation.
        implementation_steps,
    ],
    name="Software Development",
)
# Initialize the AgentOS with the workflows
agent_os = AgentOS(
    description="Example OS setup",
    workflows=[software_development_workflow],
)
# Module-level application object, referenced by the "<module>:app" import
# string passed to serve() below.
app = agent_os.get_app()

if __name__ == "__main__":
    from pathlib import Path

    # Derive the module part of the import string from this file's own name
    # instead of hard-coding "test", so the script still serves correctly if
    # it is saved under a different filename. For test.py this is "test:app".
    agent_os.serve(app=f"{Path(__file__).stem}:app", reload=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment