Created
February 27, 2026 05:11
-
-
Save kausmeows/e2988deabdb3d597ac986934a469ab69 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Example: CustomEvents with Teams (respond_directly=False) | |
| This demonstrates how CustomEvents from member agents are streamed | |
| to the user-facing stream even when respond_directly=False. | |
| """ | |
| import time | |
| from agno.agent import Agent | |
| from agno.team import Team | |
| from agno.models.openai import OpenAIChat | |
| from agno.run.team import CustomEvent | |
def data_processor(query: str):
    """Tool generator that streams UI-oriented CustomEvents while "processing".

    Yields, in order: a 0% progress event, a 50% progress event, a bar-chart
    event carrying quarterly figures, and a 100% progress event. A 0.5 second
    sleep stands in for real work after each of the first two events.

    Args:
        query: Text of the request; it is only echoed in the summary string.

    Returns:
        The generator's return value is a summary string naming ``query``.
    """
    # NOTE(review): the summary is a generator *return* value (carried on
    # StopIteration) -- confirm the tool runner actually consumes it.

    def progress_event(percent, text):
        # Every progress update shares the same shape; only these two vary.
        return CustomEvent(ui_type="progress", progress=percent, message=text)

    quarterly_figures = {
        "labels": ["Q1", "Q2", "Q3", "Q4"],
        "values": [100, 150, 200, 175],
    }

    yield progress_event(0, "Starting data processing...")
    time.sleep(0.5)  # simulated work

    yield progress_event(50, "Processing halfway done...")
    time.sleep(0.5)  # more simulated work

    # Chart payload intended for the frontend to render.
    yield CustomEvent(
        ui_type="chart",
        chart_type="bar",
        data=quarterly_figures,
        title="Quarterly Results",
    )

    yield progress_event(100, "Processing complete!")

    return f"Successfully processed query: {query}"
def notification_sender(message: str):
    """Tool generator that emits a pair of notification CustomEvents.

    First yields an "info" notification announcing the work, sleeps for 0.3
    seconds, then yields a "success" notification.

    Args:
        message: Text describing what is being worked on; it is embedded in
            the first notification and in the summary string.

    Returns:
        The generator's return value is a summary string naming ``message``.
    """
    started = CustomEvent(
        ui_type="notification",
        notification_type="info",
        title="Processing Started",
        message=f"Working on: {message}",
    )
    yield started

    time.sleep(0.3)  # simulated delivery delay

    finished = CustomEvent(
        ui_type="notification",
        notification_type="success",
        title="Task Complete",
        message="The operation completed successfully",
    )
    yield finished

    return f"Notification sent for: {message}"
# Model id shared by the team leader and both members; each still gets its
# own OpenAIChat instance.
_DEMO_MODEL_ID = "gpt-4o-mini"

# Member agents: each one owns a single tool that emits CustomEvents.
data_agent = Agent(
    name="Data Processor",
    model=OpenAIChat(id=_DEMO_MODEL_ID),
    tools=[data_processor],
    instructions="You are a data processing agent. Use the data_processor tool to process data requests.",
)

notification_agent = Agent(
    name="Notification Agent",
    model=OpenAIChat(id=_DEMO_MODEL_ID),
    tools=[notification_sender],
    instructions="You are a notification agent. Use the notification_sender tool to send notifications.",
)

# The team itself, configured with respond_directly=False (coordinate mode).
team = Team(
    name="Custom Events Demo Team",
    model=OpenAIChat(id=_DEMO_MODEL_ID),
    members=[data_agent, notification_agent],
    instructions=[
        "You coordinate between the Data Processor and Notification Agent.",
        "For data processing requests, delegate to the Data Processor.",
        "For notification requests, delegate to the Notification Agent.",
    ],
    # Key settings for CustomEvents:
    stream_member_events=True,  # Default: True - ensures member events bubble up
    respond_directly=False,
)
def main():
    """Run the team synchronously with streaming and report each CustomEvent.

    Iterates the event stream returned by ``team.run``. Events whose ``event``
    attribute equals "CustomEvent" are collected and their known attributes
    printed; any other event with truthy ``content`` is echoed inline. A count
    and a numbered summary of the collected events are printed at the end.
    """
    heavy_rule = "=" * 60
    light_rule = "-" * 60

    # (attribute name, printed label, suffix) in the order they are reported.
    detail_fields = (
        ("progress", "Progress", "%"),
        ("message", "Message", ""),
        ("data", "Data", ""),
        ("chart_type", "Chart Type", ""),
        ("title", "Title", ""),
    )

    print(heavy_rule)
    print("Testing CustomEvents with Team (respond_directly=False)")
    print(heavy_rule)
    print()

    # Run the team with streaming enabled.
    print("Running team with stream=True, stream_events=True...")
    print(light_rule)

    custom_events_received = []
    event_stream = team.run(
        "Process some quarterly sales data and show me a chart",
        stream=True,
        stream_events=True,
    )

    for event in event_stream:
        if getattr(event, "event", None) == "CustomEvent":
            custom_events_received.append(event)

            # Report whichever of the known attributes this event carries.
            print("\n[CustomEvent Received]")
            print(f" UI Type: {getattr(event, 'ui_type', 'unknown')}")
            for attr_name, label, suffix in detail_fields:
                if hasattr(event, attr_name):
                    print(f" {label}: {getattr(event, attr_name)}{suffix}")
            continue

        # Anything else with content is part of the actual response text.
        chunk = getattr(event, "content", None)
        if chunk:
            print(chunk, end="", flush=True)

    print()
    print(light_rule)
    print(f"\nTotal CustomEvents received: {len(custom_events_received)}")
    print()

    # Numbered recap of the UI types seen.
    print("CustomEvents Summary:")
    for position, received in enumerate(custom_events_received, start=1):
        print(f" {position}. {getattr(received, 'ui_type', 'unknown')}")
async def main_async():
    """Async counterpart of ``main`` that consumes ``team.arun``.

    Collects events whose ``event`` attribute equals "CustomEvent", printing
    each one's ``ui_type``; other events with truthy ``content`` are echoed
    inline. The total number of collected events is printed at the end.
    """
    banner_rule = "=" * 60
    print(banner_rule)
    print("Testing CustomEvents with Team (ASYNC)")
    print(banner_rule)
    print()

    custom_events_received = []
    event_stream = team.arun(
        "Send a notification about the new feature release",
        stream=True,
        stream_events=True,
    )

    async for event in event_stream:
        if getattr(event, "event", None) == "CustomEvent":
            custom_events_received.append(event)
            print(f"\n[CustomEvent] ui_type={getattr(event, 'ui_type', '?')}")
            continue

        chunk = getattr(event, "content", None)
        if chunk:
            print(chunk, end="", flush=True)

    print(f"\n\nTotal CustomEvents: {len(custom_events_received)}")
if __name__ == "__main__":
    # Script entry point: run the synchronous demo by default.
    main()

    # To exercise the async variant as well, uncomment the lines below:
    # import asyncio
    # asyncio.run(main_async())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment