Instantly share code, notes, and snippets.
Created
July 27, 2025 11:46
-
Star
0
(0)
You must be signed in to star a gist -
Fork
0
(0)
You must be signed in to fork a gist
-
-
Save pavlovmilen/da8948fd7fe336ae0e0d11b5c696e4de to your computer and use it in GitHub Desktop.
Shared tracing code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Shared tracing utilities for AutoGen agent teams. | |
| Provides common OpenTelemetry setup and helper functions to avoid code duplication. | |
| """ | |
| import os | |
| import logging | |
| try: | |
| from opentelemetry import trace | |
| from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter | |
| from opentelemetry.instrumentation.openai import OpenAIInstrumentor | |
| from opentelemetry.sdk.resources import Resource | |
| from opentelemetry.sdk.trace import TracerProvider | |
| from opentelemetry.sdk.trace.export import BatchSpanProcessor | |
| TRACING_AVAILABLE = True | |
| except ImportError: | |
| TRACING_AVAILABLE = False | |
| print("WARNING: OpenTelemetry not available - tracing disabled") | |
class InvokeAgentFilter(logging.Filter):
    """Logging filter that only lets agent-invocation events through.

    LLMCallEvent records are always passed (they are needed for token
    counting); any other record must mention an agent-invocation pattern
    either in its raw ``msg`` payload or in its formatted message text.
    """

    # Patterns matched against the record's raw msg (lower-cased).
    _MSG_PATTERNS = (
        'invoke_agent',
        'agent.invoke',
        'agent_invocation',
        'trace_invoke_agent',
        'gen_ai.operation.name',  # OpenTelemetry GenAI semantic convention
    )

    # Patterns matched against the fully formatted message (lower-cased).
    _MESSAGE_PATTERNS = (
        'invoke_agent',
        'agent invocation',
        'calling agent',
        'agent execution',
    )

    def filter(self, record):
        """Return True if *record* should be logged."""
        # Always allow LLMCallEvent messages through for token counting.
        # The import is local AND guarded so the filter degrades gracefully
        # (and the module stays importable) when autogen_core is missing.
        try:
            from autogen_core.logging import LLMCallEvent
        except ImportError:
            LLMCallEvent = None

        msg = getattr(record, 'msg', None)
        if LLMCallEvent is not None and isinstance(msg, LLMCallEvent):
            return True

        # Allow only the specific AutoGen events we care about.
        if msg is not None:
            msg_str = str(msg).lower()
            if any(pattern in msg_str for pattern in self._MSG_PATTERNS):
                return True

        # Fall back to the formatted message for agent-invocation context.
        message = record.getMessage().lower()
        return any(pattern in message for pattern in self._MESSAGE_PATTERNS)
# Only define the processor when OpenTelemetry imported successfully;
# otherwise referencing BatchSpanProcessor would raise NameError at import
# time and defeat the TRACING_AVAILABLE fallback above.
if TRACING_AVAILABLE:
    class SelectiveSpanProcessor(BatchSpanProcessor):
        """Batch span processor that exports only agent-related spans.

        AutoGen emits a large volume of internal runtime spans (publish /
        process / ack events). This processor drops that noise and forwards
        only spans related to agent invocations, tool calls and LLM activity
        to Jaeger.
        """

        # Span-name fragments identifying internal runtime noise to drop.
        _EXCLUDE_PATTERNS = (
            'autogen publish',
            'autogen process',
            'autogen ack',
            'autogen create group_topic',
        )

        # Fragments (checked in span name and attributes) that mark spans
        # worth exporting.
        _INCLUDE_PATTERNS = (
            'invoke_agent',
            'agent_initialization',
            'agent_execution',
            'survey_hub_agents_workflow',
            'workflow',          # include all workflow spans
            'tool_call',
            'openai',
            'chat_completion',
            'gen_ai',            # OpenTelemetry GenAI operations
        )

        def on_end(self, span):
            """Forward *span* to the exporter only if it is agent-related."""
            span_name = span.name.lower() if span.name else ""

            # Filter OUT autogen publish events and other noise.
            if any(pattern in span_name for pattern in self._EXCLUDE_PATTERNS):
                return  # don't forward to Jaeger

            # Match on the span name first...
            is_agent_related = any(
                pattern in span_name for pattern in self._INCLUDE_PATTERNS
            )
            # ...then fall back to the stringified span attributes.
            if not is_agent_related and getattr(span, 'attributes', None):
                attrs_str = str(span.attributes).lower()
                is_agent_related = any(
                    pattern in attrs_str for pattern in self._INCLUDE_PATTERNS
                )

            if is_agent_related:
                super().on_end(span)  # forward to Jaeger
            # Otherwise, ignore the span.
def setup_focused_logging():
    """Configure logging to show tool and API calls while silencing runtime noise.

    Silences the chatty autogen_core runtime loggers, raises HTTP/OpenAI/tool
    loggers to INFO, attaches an InvokeAgentFilter to AutoGen's event logger
    (when autogen_core is installed), and installs a basic stream handler on
    the root logger if none is configured yet.
    """
    # Silence noisy runtime loggers - these generate lots of internal
    # message-flow logs.
    logging.getLogger('autogen_core').setLevel(logging.ERROR)
    logging.getLogger('autogen_core.events').setLevel(logging.ERROR)

    # Enable useful loggers at INFO level.
    logging.getLogger('httpx').setLevel(logging.INFO)        # HTTP requests (OpenAI API calls)
    logging.getLogger('openai').setLevel(logging.INFO)       # OpenAI SDK logs
    logging.getLogger('openai.chat').setLevel(logging.INFO)  # OpenAI chat completions
    logging.getLogger('opentelemetry.instrumentation.openai').setLevel(logging.INFO)  # OpenAI instrumentation

    # Enable tool-related logging.
    for tool_logger in ['autogen_agents.app.tools', 'tools']:
        logging.getLogger(tool_logger).setLevel(logging.INFO)

    # Enable AutoGen agent events with a custom filter for invoke_agent only.
    # The import is guarded so the rest of the logging setup still applies
    # when autogen_core is not installed (mirrors the optional-dependency
    # handling used for OpenTelemetry at module import time).
    try:
        from autogen_core import EVENT_LOGGER_NAME
    except ImportError:
        print("WARNING: autogen_core not available - event filtering disabled")
    else:
        event_logger = logging.getLogger(EVENT_LOGGER_NAME)
        event_logger.setLevel(logging.INFO)
        # Only capture invoke_agent events from AutoGen.
        event_logger.addFilter(InvokeAgentFilter())
        print("INFO: Configured logging to capture only invoke_agent events from AutoGen")

    # Set up a basic formatter if not already configured.
    root_logger = logging.getLogger()
    if not root_logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        root_logger.addHandler(handler)
        root_logger.setLevel(logging.INFO)
def setup_tracing(service_instance_id):
    """Setup OpenTelemetry tracing for AutoGen agents.

    Args:
        service_instance_id (str): Unique identifier for this service instance.

    Returns:
        tracer: OpenTelemetry tracer instance, or None if tracing is
        unavailable or setup fails.
    """
    if not TRACING_AVAILABLE:
        return None
    try:
        # Endpoint resolution: explicit override via the standard OTel env
        # var first, then the in-cluster Jaeger service, then localhost.
        otlp_endpoint = os.getenv('OTEL_EXPORTER_OTLP_ENDPOINT')
        if not otlp_endpoint:
            if os.getenv('KUBERNETES_SERVICE_HOST'):
                otlp_endpoint = "http://jaeger-service:4317"
            else:
                otlp_endpoint = "http://localhost:4317"

        resource = Resource.create({
            "service.name": "autogen-agents",
            "service.version": "1.0.0",
            "service.instance.id": service_instance_id,
            "deployment.environment": "kubernetes" if os.getenv('KUBERNETES_SERVICE_HOST') else "local"
        })
        trace_provider = TracerProvider(resource=resource)
        # Set global tracer provider to enable AutoGen's built-in
        # invoke_agent events.
        trace.set_tracer_provider(trace_provider)
        # Set up OpenAI instrumentation with the tracer provider.
        OpenAIInstrumentor().instrument(tracer_provider=trace_provider)

        otlp_exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
        # Use selective span processor to only send agent-related spans
        # to Jaeger.
        span_processor = SelectiveSpanProcessor(
            otlp_exporter,
            max_queue_size=100,
            schedule_delay_millis=5000,
            max_export_batch_size=50
        )
        trace_provider.add_span_processor(span_processor)

        print(f"SUCCESS: Selective AutoGen tracing enabled - endpoint: {otlp_endpoint}")
        print("   Mode: Capturing only invoke_agent and tool-related events")
        return trace_provider.get_tracer(__name__)
    except Exception as e:
        # Tracing is best-effort: never let telemetry setup break the app.
        print(f"WARNING: Failed to setup tracing: {e}")
        return None
class NoOpSpan:
    """Do-nothing stand-in for a span, used when tracing is disabled.

    Implements the minimal span surface this codebase relies on: context
    management and attribute setting.
    """

    def __enter__(self):
        # Yield the no-op span itself so `with start_span(...) as span:` works.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Nothing to clean up; exceptions propagate normally.
        pass

    def set_attribute(self, key, value):
        # Attributes are silently discarded when tracing is off.
        pass
def start_span(tracer, name):
    """Open a span if tracing is available, otherwise return a no-op.

    Args:
        tracer: OpenTelemetry tracer instance (can be None).
        name (str): Name of the span to create.

    Returns:
        A context manager: the tracer's current-span context when a tracer
        is supplied, otherwise a NoOpSpan.
    """
    return tracer.start_as_current_span(name) if tracer else NoOpSpan()
def get_tracing_status():
    """Get the current tracing configuration status.

    Returns:
        dict: Status information about tracing configuration — whether
        tracing is enabled and which Jaeger OTLP endpoint would be used.
    """
    if os.getenv('KUBERNETES_SERVICE_HOST'):
        endpoint = "http://jaeger-service:4317"
    else:
        endpoint = "http://localhost:4317"
    return {
        "tracing_enabled": TRACING_AVAILABLE,
        "jaeger_endpoint": endpoint,
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment