Skip to content

Instantly share code, notes, and snippets.

@pavlovmilen
Created July 27, 2025 11:46
Show Gist options
  • Select an option

  • Save pavlovmilen/da8948fd7fe336ae0e0d11b5c696e4de to your computer and use it in GitHub Desktop.

Select an option

Save pavlovmilen/da8948fd7fe336ae0e0d11b5c696e4de to your computer and use it in GitHub Desktop.
Shared tracing code
"""
Shared tracing utilities for AutoGen agent teams.
Provides common OpenTelemetry setup and helper functions to avoid code duplication.
"""
import os
import logging
# Tracing is optional: degrade gracefully when OpenTelemetry is not installed.
# (Restored try/except indentation that was lost in extraction.)
try:
    from opentelemetry import trace
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
    from opentelemetry.instrumentation.openai import OpenAIInstrumentor
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    TRACING_AVAILABLE = True
except ImportError:
    TRACING_AVAILABLE = False
    print("WARNING: OpenTelemetry not available - tracing disabled")
class InvokeAgentFilter(logging.Filter):
    """Logging filter that only lets agent-invocation related records through."""

    # Markers looked for in the raw log payload (includes the OpenTelemetry
    # GenAI semantic-convention attribute name).
    _MSG_PATTERNS = (
        'invoke_agent',
        'agent.invoke',
        'agent_invocation',
        'trace_invoke_agent',
        'gen_ai.operation.name',
    )
    # Markers looked for in the formatted message text.
    _TEXT_PATTERNS = (
        'invoke_agent',
        'agent invocation',
        'calling agent',
        'agent execution',
    )

    def filter(self, record):
        # LLMCallEvent records always pass so token counting keeps working.
        if hasattr(record, 'msg') and hasattr(record.msg, '__class__'):
            from autogen_core.logging import LLMCallEvent
            if isinstance(record.msg, LLMCallEvent):
                return True
        # A raw msg payload mentioning any invoke-agent marker also passes.
        if hasattr(record, 'msg'):
            lowered = str(record.msg).lower()
            if any(marker in lowered for marker in self._MSG_PATTERNS):
                return True
        # Fall back to scanning the fully formatted message text.
        text = str(record.getMessage()).lower()
        return any(marker in text for marker in self._TEXT_PATTERNS)
class SelectiveSpanProcessor(BatchSpanProcessor):
    """Span processor that forwards only agent/tool related spans to Jaeger."""

    # Noise spans emitted by the AutoGen runtime that are never exported.
    _EXCLUDE_MARKERS = (
        'autogen publish',
        'autogen process',
        'autogen ack',
        'autogen create group_topic',
    )
    # Markers identifying spans worth keeping (agent, workflow, tool, LLM,
    # and OpenTelemetry GenAI activity).
    _INCLUDE_MARKERS = (
        'invoke_agent',
        'agent_initialization',
        'agent_execution',
        'survey_hub_agents_workflow',
        'workflow',
        'tool_call',
        'openai',
        'chat_completion',
        'gen_ai',
    )
    # Spans anchoring the workflow hierarchy: always forwarded by exact name.
    _ALWAYS_FORWARD = ('survey_hub_agents_workflow', 'agent_initialization', 'agent_execution')

    def __init__(self, span_exporter, **kwargs):
        super().__init__(span_exporter, **kwargs)

    def on_end(self, span):
        name = span.name.lower() if span.name else ""
        # Drop runtime noise outright - never reaches Jaeger.
        if any(marker in name for marker in self._EXCLUDE_MARKERS):
            return
        # Match the span name, then its attributes, against the keep-list.
        keep = any(marker in name for marker in self._INCLUDE_MARKERS)
        if not keep and getattr(span, 'attributes', None):
            attr_text = str(span.attributes).lower()
            keep = any(marker in attr_text for marker in self._INCLUDE_MARKERS)
        # Forward workflow-hierarchy spans and any agent-related span.
        if name in self._ALWAYS_FORWARD or keep:
            super().on_end(span)
        # Anything else is silently ignored.
def setup_focused_logging():
    """Configure logging to surface tool/API calls while silencing runtime noise."""
    # The core runtime loggers flood output with internal message-flow logs.
    for noisy_name in ('autogen_core', 'autogen_core.events'):
        logging.getLogger(noisy_name).setLevel(logging.ERROR)
    # Keep HTTP requests, OpenAI SDK chatter and instrumentation visible.
    for useful_name in ('httpx', 'openai', 'openai.chat',
                        'opentelemetry.instrumentation.openai'):
        logging.getLogger(useful_name).setLevel(logging.INFO)
    # Tool loggers stay at INFO so tool calls show up.
    for tool_logger in ('autogen_agents.app.tools', 'tools'):
        logging.getLogger(tool_logger).setLevel(logging.INFO)
    # AutoGen event logger: enabled, but filtered down to invoke_agent events.
    from autogen_core import EVENT_LOGGER_NAME
    event_logger = logging.getLogger(EVENT_LOGGER_NAME)
    event_logger.setLevel(logging.INFO)
    event_logger.addFilter(InvokeAgentFilter())
    print("INFO: Configured logging to capture only invoke_agent events from AutoGen")
    # Install a basic root handler only when nothing is configured yet.
    root_logger = logging.getLogger()
    if not root_logger.handlers:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        root_logger.addHandler(stream_handler)
        root_logger.setLevel(logging.INFO)
def setup_tracing(service_instance_id):
    """Setup OpenTelemetry tracing for AutoGen agents.

    The OTLP endpoint can be overridden with the standard
    OTEL_EXPORTER_OTLP_ENDPOINT environment variable; otherwise it defaults
    to the in-cluster Jaeger service on Kubernetes, or localhost elsewhere.

    Args:
        service_instance_id (str): Unique identifier for this service instance

    Returns:
        tracer: OpenTelemetry tracer instance or None if tracing unavailable
    """
    if not TRACING_AVAILABLE:
        return None
    try:
        # Detect the deployment environment once and reuse it below.
        in_kubernetes = bool(os.getenv('KUBERNETES_SERVICE_HOST'))
        default_endpoint = "http://jaeger-service:4317" if in_kubernetes else "http://localhost:4317"
        # Honour the standard OTel env var before falling back to defaults.
        otlp_endpoint = os.getenv('OTEL_EXPORTER_OTLP_ENDPOINT', default_endpoint)
        resource = Resource.create({
            "service.name": "autogen-agents",
            "service.version": "1.0.0",
            "service.instance.id": service_instance_id,
            "deployment.environment": "kubernetes" if in_kubernetes else "local"
        })
        trace_provider = TracerProvider(resource=resource)
        # Register globally so AutoGen's built-in invoke_agent spans attach here.
        trace.set_tracer_provider(trace_provider)
        # Instrument the OpenAI SDK against the same provider.
        OpenAIInstrumentor().instrument(tracer_provider=trace_provider)
        otlp_exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
        # Selective processor: only agent/tool related spans are sent to Jaeger.
        span_processor = SelectiveSpanProcessor(
            otlp_exporter,
            max_queue_size=100,
            schedule_delay_millis=5000,
            max_export_batch_size=50
        )
        trace_provider.add_span_processor(span_processor)
        print(f"SUCCESS: Selective AutoGen tracing enabled - endpoint: {otlp_endpoint}")
        print(" Mode: Capturing only invoke_agent and tool-related events")
        return trace_provider.get_tracer(__name__)
    except Exception as e:
        # Best-effort: tracing failures must never break the application.
        print(f"WARNING: Failed to setup tracing: {e}")
        return None
class NoOpSpan:
    """Do-nothing stand-in for a span when tracing is disabled.

    Implements the context-manager protocol and set_attribute so callers
    need no special-casing when no tracer is configured.
    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returning None lets any exception propagate, same as a real span.
        pass

    def set_attribute(self, key, value):
        # Attributes are intentionally discarded.
        pass
def start_span(tracer, name):
    """Open a span via *tracer* if tracing is available, else return a no-op.

    Args:
        tracer: OpenTelemetry tracer instance (can be None)
        name (str): Name of the span to create

    Returns:
        Span context manager, or NoOpSpan when tracing is unavailable
    """
    if not tracer:
        return NoOpSpan()
    return tracer.start_as_current_span(name)
def get_tracing_status():
    """Get the current tracing configuration status.

    Returns:
        dict: Status information about tracing configuration
    """
    in_kubernetes = bool(os.getenv('KUBERNETES_SERVICE_HOST'))
    default_endpoint = "http://jaeger-service:4317" if in_kubernetes else "http://localhost:4317"
    return {
        "tracing_enabled": TRACING_AVAILABLE,
        # The standard OTel env var takes precedence over the computed default,
        # so the reported endpoint matches what setup would actually use.
        "jaeger_endpoint": os.getenv('OTEL_EXPORTER_OTLP_ENDPOINT', default_endpoint),
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment