Created
February 2, 2026 07:11
-
-
Save kausmeows/b03e28717facc310ac5437c5f8ce422d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from typing import AsyncIterator, Union | |
| from agno.agent import Agent | |
| from agno.db.in_memory import InMemoryDb | |
| from agno.models.anthropic import Claude | |
| from agno.os import AgentOS | |
| from agno.run.workflow import WorkflowRunOutputEvent | |
| from agno.workflow import Step, Workflow | |
| from agno.workflow.steps import Steps | |
| from agno.workflow.types import StepInput, StepOutput | |
| # === AGENTS === | |
| product_lead_agent = Agent( | |
| name="Product Lead Agent", | |
| model=Claude(id="claude-sonnet-4-5"), | |
| db=InMemoryDb(), | |
| ) | |
| lead_engineer_agent = Agent( | |
| name="Lead Engineer Agent", | |
| model=Claude(id="claude-sonnet-4-5"), | |
| db=InMemoryDb(), | |
| ) | |
| software_engineer_agent = Agent( | |
| name="Software Engineer Agent", | |
| model=Claude(id="claude-sonnet-4-5"), | |
| db=InMemoryDb(), | |
| ) | |
| # === STEP FUNCTIONS === | |
| async def run_analysis( | |
| step_input: StepInput, | |
| ) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]: | |
| """Run analysis step with streaming events.""" | |
| try: | |
| response_iterator = product_lead_agent.arun( | |
| "Analyze this request...", stream=True, stream_events=True | |
| ) | |
| async for event in response_iterator: | |
| yield event | |
| response = product_lead_agent.get_last_run_output() | |
| yield StepOutput(content=response.content, success=True) | |
| except Exception as e: | |
| yield StepOutput(content=f"Analysis failed: {str(e)}", success=False) | |
| async def run_prd_creation( | |
| step_input: StepInput, | |
| ) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]: | |
| """Run PRD creation step with streaming events.""" | |
| try: | |
| response_iterator = product_lead_agent.arun( | |
| "Create PRD...", stream=True, stream_events=True | |
| ) | |
| async for event in response_iterator: | |
| yield event | |
| response = product_lead_agent.get_last_run_output() | |
| yield StepOutput(content=response.content, success=True) | |
| except Exception as e: | |
| yield StepOutput(content=f"PRD creation failed: {str(e)}", success=False) | |
| async def run_architecture( | |
| step_input: StepInput, | |
| ) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]: | |
| """Run architecture design step with streaming events.""" | |
| try: | |
| response_iterator = lead_engineer_agent.arun( | |
| "Design architecture...", stream=True, stream_events=True | |
| ) | |
| async for event in response_iterator: | |
| yield event | |
| response = lead_engineer_agent.get_last_run_output() | |
| yield StepOutput(content=response.content, success=True) | |
| except Exception as e: | |
| yield StepOutput( | |
| content=f"Architecture design failed: {str(e)}", success=False | |
| ) | |
| async def run_create_repo( | |
| step_input: StepInput, | |
| ) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]: | |
| """Software Engineer creates repo.""" | |
| try: | |
| response_iterator = software_engineer_agent.arun( | |
| "Create repo...", stream=True, stream_events=True | |
| ) | |
| async for event in response_iterator: | |
| yield event | |
| response = software_engineer_agent.get_last_run_output() | |
| yield StepOutput(content=response.content, success=True) | |
| except Exception as e: | |
| yield StepOutput(content=f"Create repo failed: {str(e)}", success=False) | |
| async def run_create_file( | |
| step_input: StepInput, | |
| ) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]: | |
| """Software Engineer creates file.""" | |
| try: | |
| response_iterator = software_engineer_agent.arun( | |
| "Create file...", stream=True, stream_events=True | |
| ) | |
| async for event in response_iterator: | |
| yield event | |
| response = software_engineer_agent.get_last_run_output() | |
| yield StepOutput(content=response.content, success=True) | |
| except Exception as e: | |
| yield StepOutput(content=f"Create file failed: {str(e)}", success=False) | |
| async def run_code_review( | |
| step_input: StepInput, | |
| ) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]: | |
| """Lead Engineer reviews code.""" | |
| try: | |
| response_iterator = lead_engineer_agent.arun( | |
| "Review code...", stream=True, stream_events=True | |
| ) | |
| async for event in response_iterator: | |
| yield event | |
| response = lead_engineer_agent.get_last_run_output() | |
| yield StepOutput(content=response.content, success=True) | |
| except Exception as e: | |
| yield StepOutput(content=f"Code review failed: {str(e)}", success=False) | |
| async def run_create_second_file( | |
| step_input: StepInput, | |
| ) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]: | |
| """Software Engineer creates second file.""" | |
| try: | |
| response_iterator = software_engineer_agent.arun( | |
| "Create second file...", stream=True, stream_events=True | |
| ) | |
| async for event in response_iterator: | |
| yield event | |
| response = software_engineer_agent.get_last_run_output() | |
| yield StepOutput(content=response.content, success=True) | |
| except Exception as e: | |
| yield StepOutput( | |
| content=f"Create second file failed: {str(e)}", success=False | |
| ) | |
| # === GROUPED STEPS === | |
| # Product Discovery - groups analysis and PRD creation steps | |
| product_discovery_steps = Steps( | |
| name="Product Discovery", | |
| steps=[ | |
| Step(name="analysis", executor=run_analysis), | |
| Step(name="prd_creation", executor=run_prd_creation), | |
| ], | |
| ) | |
| # Implementation Cycle - groups repo creation, file creation, review, and second file | |
| implementation_steps = Steps( | |
| name="Implementation Cycle", | |
| steps=[ | |
| Step(name="create_repo", executor=run_create_repo), | |
| Step(name="create_file", executor=run_create_file), | |
| Step(name="code_review", executor=run_code_review), | |
| Step(name="create_second_file", executor=run_create_second_file), | |
| ], | |
| ) | |
| # === MAIN WORKFLOW: Software Development === | |
| software_development_workflow = Workflow( | |
| name="Software Development", | |
| steps=[ | |
| product_discovery_steps, # Grouped steps for product discovery | |
| Step(name="architecture_design", executor=run_architecture), | |
| implementation_steps, # Grouped steps for implementation | |
| ], | |
| ) | |
| # Initialize the AgentOS with the workflows | |
| agent_os = AgentOS( | |
| description="Example OS setup", | |
| workflows=[software_development_workflow], | |
| ) | |
| app = agent_os.get_app() | |
| if __name__ == "__main__": | |
| agent_os.serve(app="test:app", reload=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment