Skip to content

Instantly share code, notes, and snippets.

@kausmeows
Created February 27, 2026 05:11
Show Gist options
  • Select an option

  • Save kausmeows/e2988deabdb3d597ac986934a469ab69 to your computer and use it in GitHub Desktop.

Select an option

Save kausmeows/e2988deabdb3d597ac986934a469ab69 to your computer and use it in GitHub Desktop.
"""
Example: CustomEvents with Teams (respond_directly=False)
This demonstrates how CustomEvents from member agents are streamed
to the user-facing stream even when respond_directly=False.
"""
import time
from agno.agent import Agent
from agno.team import Team
from agno.models.openai import OpenAIChat
from agno.run.team import CustomEvent
def data_processor(query: str):
"""A tool that emits CustomEvents for frontend UI rendering."""
# Emit a progress event
yield CustomEvent(
ui_type="progress",
progress=0,
message="Starting data processing..."
)
# Simulate some work
time.sleep(0.5)
# Emit progress update
yield CustomEvent(
ui_type="progress",
progress=50,
message="Processing halfway done..."
)
# Simulate more work
time.sleep(0.5)
# Emit a chart event for the frontend
yield CustomEvent(
ui_type="chart",
chart_type="bar",
data={
"labels": ["Q1", "Q2", "Q3", "Q4"],
"values": [100, 150, 200, 175]
},
title="Quarterly Results"
)
# Emit completion progress
yield CustomEvent(
ui_type="progress",
progress=100,
message="Processing complete!"
)
return f"Successfully processed query: {query}"
def notification_sender(message: str):
"""A tool that emits notification events."""
# Emit a notification event
yield CustomEvent(
ui_type="notification",
notification_type="info",
title="Processing Started",
message=f"Working on: {message}"
)
time.sleep(0.3)
# Emit success notification
yield CustomEvent(
ui_type="notification",
notification_type="success",
title="Task Complete",
message="The operation completed successfully"
)
return f"Notification sent for: {message}"
# Create member agents with tools that emit CustomEvents
data_agent = Agent(
name="Data Processor",
model=OpenAIChat(id="gpt-4o-mini"),
tools=[data_processor],
instructions="You are a data processing agent. Use the data_processor tool to process data requests.",
)
notification_agent = Agent(
name="Notification Agent",
model=OpenAIChat(id="gpt-4o-mini"),
tools=[notification_sender],
instructions="You are a notification agent. Use the notification_sender tool to send notifications.",
)
# Create a team with respond_directly=False (coordinate mode)
team = Team(
name="Custom Events Demo Team",
model=OpenAIChat(id="gpt-4o-mini"),
members=[data_agent, notification_agent],
instructions=[
"You coordinate between the Data Processor and Notification Agent.",
"For data processing requests, delegate to the Data Processor.",
"For notification requests, delegate to the Notification Agent.",
],
# Key settings for CustomEvents:
stream_member_events=True, # Default: True - ensures member events bubble up
respond_directly=False,
)
def main():
print("=" * 60)
print("Testing CustomEvents with Team (respond_directly=False)")
print("=" * 60)
print()
# Run the team with streaming
print("Running team with stream=True, stream_events=True...")
print("-" * 60)
custom_events_received = []
for event in team.run(
"Process some quarterly sales data and show me a chart",
stream=True,
stream_events=True,
):
# Check if this is a CustomEvent
if hasattr(event, 'event') and event.event == "CustomEvent":
custom_events_received.append(event)
# Print the custom event details
print(f"\n[CustomEvent Received]")
print(f" UI Type: {getattr(event, 'ui_type', 'unknown')}")
if hasattr(event, 'progress'):
print(f" Progress: {event.progress}%")
if hasattr(event, 'message'):
print(f" Message: {event.message}")
if hasattr(event, 'data'):
print(f" Data: {event.data}")
if hasattr(event, 'chart_type'):
print(f" Chart Type: {event.chart_type}")
if hasattr(event, 'title'):
print(f" Title: {event.title}")
# Print content events (the actual response)
elif hasattr(event, 'content') and event.content:
print(event.content, end="", flush=True)
print()
print("-" * 60)
print(f"\nTotal CustomEvents received: {len(custom_events_received)}")
print()
# Summary of events
print("CustomEvents Summary:")
for i, evt in enumerate(custom_events_received, 1):
ui_type = getattr(evt, 'ui_type', 'unknown')
print(f" {i}. {ui_type}")
async def main_async():
"""Async version of the test."""
print("=" * 60)
print("Testing CustomEvents with Team (ASYNC)")
print("=" * 60)
print()
custom_events_received = []
async for event in team.arun(
"Send a notification about the new feature release",
stream=True,
stream_events=True,
):
if hasattr(event, 'event') and event.event == "CustomEvent":
custom_events_received.append(event)
print(f"\n[CustomEvent] ui_type={getattr(event, 'ui_type', '?')}")
elif hasattr(event, 'content') and event.content:
print(event.content, end="", flush=True)
print(f"\n\nTotal CustomEvents: {len(custom_events_received)}")
if __name__ == "__main__":
main()
# Uncomment to test async version:
# import asyncio
# asyncio.run(main_async())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment