Version: Public Preview (October 2024+)
Repository: https://github.com/microsoft/agent-framework
Lab Examples: https://github.com/microsoft/agentic-ai-lab
Microsoft Agent Framework is the next-generation unified framework that combines AutoGen (research-focused multi-agent orchestration) and Semantic Kernel (enterprise-ready SDK) into a single, production-ready platform for building AI agent systems.
- Successor to: Both AutoGen and Semantic Kernel (built by the same teams)
- Current Status: Public Preview
- Languages: Python and .NET (C#) with consistent APIs
- Philosophy: Code-first, specification-driven development for agentic systems
- AI Agents: Autonomous LLM-powered agents with tool integration
- Workflows: Graph-based orchestration for multi-agent coordination
- Enterprise Features: Observability, security, durability, checkpointing
- Cloud Integration: Native Azure AI Foundry integration
pip install agent-framework --pre
This installs all sub-packages. Individual packages are available in python/packages/.
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.OpenAI --prerelease
Recommended: Azure CLI authentication
az login --tenant YOUR_TENANT_ID
az account show
AI Agents: Autonomous components powered by LLMs that can:
- Process user inputs
- Make decisions using reasoning
- Call tools and functions
- Manage conversation threads
- Generate responses
Workflows: Graph-based structures that:
- Connect multiple agents and functions
- Define execution flow with edges
- Support conditional routing
- Enable checkpointing and recovery
- Coordinate multi-agent orchestration
Executors: Processing units in workflows that can be:
- AI Agents (LLM-powered)
- Custom Functions (deterministic logic)
- External APIs (via MCP or direct integration)
Threads: State management for conversations:
- Store message history
- Maintain context across turns
- Support suspend/resume operations
- Can use custom storage backends (Redis, etc.)
Tools: Extensions that agents can call:
- Hosted Code Interpreter (Python execution)
- File Search (document retrieval)
- Bing Grounding (web search)
- Custom Functions (your code)
- MCP Servers (external tools)
Python - Azure OpenAI:
import asyncio
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
async def main():
# Create chat client
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Create agent
agent = chat_client.create_agent(
name="HelperBot",
instructions="You are a helpful assistant specialized in cloud architecture."
)
# Run agent
result = await agent.run("Explain Azure AKS networking options")
print(result.text)
if __name__ == "__main__":
asyncio.run(main())
Python - Azure AI Foundry:
from agent_framework import ChatAgent
from agent_framework.azure import AzureAIAgentClient
from azure.identity.aio import AzureCliCredential
async with (
AzureCliCredential() as credential,
ChatAgent(
chat_client=AzureAIAgentClient(async_credential=credential),
instructions="You are good at telling jokes."
) as agent,
):
result = await agent.run("Tell me a joke about Kubernetes.")
print(result.text)
.NET - Azure OpenAI:
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
AIAgent agent = new AzureOpenAIClient(
new Uri("https://your-resource.openai.azure.com/"),
new AzureCliCredential())
.GetChatClient("gpt-4o-mini")
.CreateAIAgent(instructions: "You are a cloud solution architect.");
Console.WriteLine(await agent.RunAsync("Design a multi-region AKS deployment"));
Code Interpreter:
from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.azure import AzureAIAgentClient
from azure.identity.aio import DefaultAzureCredential
async with (
DefaultAzureCredential() as credential,
ChatAgent(
chat_client=AzureAIAgentClient(async_credential=credential),
instructions="You are a helpful assistant that can execute Python code.",
tools=HostedCodeInterpreterTool()
) as agent
):
response = await agent.run("Calculate factorial of 100 using Python")
print(response.text)
Custom Function Tools:
from agent_framework import ChatAgent, function_tool
@function_tool
def get_cluster_status(cluster_name: str) -> str:
"""Get the current status of an AKS cluster."""
# Your implementation
return f"Cluster {cluster_name} is running"
agent = chat_client.create_agent(
name="AKSBot",
instructions="You help manage AKS clusters",
tools=[get_cluster_status]
)
File Search (RAG):
from agent_framework import ChatAgent, HostedFileSearchTool
agent = ChatAgent(
chat_client=client,
instructions="You answer questions based on uploaded documents.",
tools=HostedFileSearchTool()
)
Bing Grounding (Web Search):
from agent_framework import ChatAgent, HostedBingGroundingTool
agent = ChatAgent(
chat_client=client,
instructions="You search the web for current information.",
tools=HostedBingGroundingTool()
)
Service-Managed Thread (In-Memory):
from agent_framework import AgentThread
# Create thread (stores in-memory within process)
thread = AgentThread()
# Run multiple turns
result1 = await agent.run("What's Kubernetes?", thread=thread)
result2 = await agent.run("What are its benefits?", thread=thread)
# Context from first question is maintained
Custom Thread Storage (Redis):
from agent_framework import ChatMessageStoreProtocol
import redis.asyncio as redis
class RedisChatMessageStore(ChatMessageStoreProtocol):
def __init__(self, redis_client):
self.redis = redis_client
async def save_messages(self, thread_id: str, messages: list):
# Store in Redis
pass
async def load_messages(self, thread_id: str) -> list:
# Load from Redis
pass
# Use custom store
store = RedisChatMessageStore(redis_client)
thread = AgentThread(message_store=store)
Function-Based Middleware:
from agent_framework import ChatAgent, MiddlewareContext
async def logging_middleware(context: MiddlewareContext, next):
print(f"Agent called: {context.agent.name}")
result = await next()
print(f"Agent completed: {result}")
return result
agent = chat_client.create_agent(
name="MyAgent",
instructions="You are helpful",
middleware=[logging_middleware]
)
Class-Based Middleware:
from agent_framework import Middleware
class SecurityMiddleware(Middleware):
async def handle(self, context, next):
# Pre-processing
if self.is_safe(context.message):
result = await next()
# Post-processing
return self.sanitize(result)
else:
raise SecurityException("Unsafe input")
Workflow Core Concepts:
Executors: Individual processing units (agents or functions)
Edges: Connections between executors defining message flow
Workflow: Directed graph of executors + edges
Supersteps: Execution is organized into discrete processing steps
Superstep N:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Collect All │───▶│ Route Messages │───▶│ Execute All │
│ Pending │ │ Based on Type │ │ Target │
│ Messages │ │ & Conditions │ │ Executors │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ Emit Events & │
│ New Messages │
└─────────────────┘
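To make the executor/edge model concrete, here is a minimal sketch that wires two custom executors into a graph. It uses only the APIs shown elsewhere in this reference (Executor, @handler, WorkflowContext, WorkflowBuilder) and top-level await in notebook style; treat it as illustrative rather than canonical.

```python
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

class UpperCaser(Executor):
    """Executor that uppercases incoming text and forwards it."""
    @handler
    async def run(self, text: str, ctx: WorkflowContext) -> None:
        # send_message delivers the output to downstream executors in the next superstep
        await ctx.send_message(text.upper())

class Printer(Executor):
    """Executor that consumes the final message."""
    @handler
    async def run(self, text: str, ctx: WorkflowContext) -> None:
        print(f"Received: {text}")

builder = WorkflowBuilder()
upper = UpperCaser()
printer = Printer()
builder.set_start_executor(upper)   # entry point of the graph
builder.add_edge(upper, printer)    # edge: upper -> printer
workflow = builder.build()

result = await workflow.run("hello supersteps")
```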
Sequential: Agents execute in order, passing output to the next agent.
from agent_framework import SequentialBuilder
from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential
# Create agents
chat_client = AzureChatClient(credential=AzureCliCredential())
writer = chat_client.create_agent(
instructions="You write concise marketing copy.",
name="writer"
)
reviewer = chat_client.create_agent(
instructions="You review and improve marketing copy.",
name="reviewer"
)
# Build sequential workflow
workflow = SequentialBuilder().participants([writer, reviewer]).build()
# Execute workflow
from agent_framework import WorkflowCompletedEvent
completion = None
async for event in workflow.run_stream("Write tagline for cloud-native AKS"):
if isinstance(event, WorkflowCompletedEvent):
completion = event
if completion:
messages = completion.data
for msg in messages:
print(f"[{msg.author_name}]: {msg.text}")from agent_framework import WorkflowBuilder, Executor, WorkflowContext, handler
from agent_framework import ChatMessage
class DataValidator(Executor):
"""Custom executor for data validation"""
@handler
async def validate(self, data: str, ctx: WorkflowContext) -> None:
if self.is_valid(data):
await ctx.send_message(f"Validated: {data}")
else:
await ctx.send_message("Validation failed")
# Create agents and custom executors
processor = chat_client.create_agent(
name="processor",
instructions="Process and transform data"
)
validator = DataValidator()
formatter = chat_client.create_agent(
name="formatter",
instructions="Format data for output"
)
# Build workflow
builder = WorkflowBuilder()
builder.set_start_executor(processor)
builder.add_edge(processor, validator)
builder.add_edge(validator, formatter)
workflow = builder.build()
# Execute
result = await workflow.run("Process this AKS cluster config")Multiple agents work in parallel.
from agent_framework import ConcurrentBuilder
# Create specialized agents
security_agent = chat_client.create_agent(
name="security",
instructions="Analyze security aspects"
)
performance_agent = chat_client.create_agent(
name="performance",
instructions="Analyze performance aspects"
)
cost_agent = chat_client.create_agent(
name="cost",
instructions="Analyze cost aspects"
)
# Build concurrent workflow (all agents run in parallel)
workflow = ConcurrentBuilder().participants([
security_agent,
performance_agent,
cost_agent
]).build()
# Execute - all agents process input simultaneously
result = await workflow.run("Review this AKS architecture design")Agents pass control based on context.
from agent_framework import HandoffBuilder
# Create agents with handoff conditions
triage_agent = chat_client.create_agent(
name="triage",
instructions="Route issues to appropriate specialist"
)
network_agent = chat_client.create_agent(
name="network",
instructions="Handle networking issues"
)
security_agent = chat_client.create_agent(
name="security",
instructions="Handle security issues"
)
# Build handoff workflow
workflow = HandoffBuilder().participants([
triage_agent,
network_agent,
security_agent
]).build()
Group Chat: Agents collaborate in a shared conversation space.
from agent_framework import GroupChatBuilder
# Create agents with different specializations
architect = chat_client.create_agent(
name="architect",
instructions="Design system architecture"
)
developer = chat_client.create_agent(
name="developer",
instructions="Implement solutions"
)
reviewer = chat_client.create_agent(
name="reviewer",
instructions="Review and provide feedback"
)
# Build group chat
workflow = GroupChatBuilder().participants([
architect,
developer,
reviewer
]).build()
Checkpointing:
from agent_framework import CheckpointConfig
# Configure checkpointing
checkpoint_config = CheckpointConfig(
storage="azure_blob", # or "local", "redis"
checkpoint_interval=5 # Save every 5 steps
)
workflow = builder.build(checkpoint_config=checkpoint_config)
# Resume from checkpoint
workflow = WorkflowBuilder.load_from_checkpoint("checkpoint_id")
result = await workflow.resume()
Human-in-the-Loop:
from agent_framework import HumanApprovalExecutor
class ApprovalGate(HumanApprovalExecutor):
async def request_approval(self, data, ctx: WorkflowContext):
# Pause workflow and request approval
approval = await self.wait_for_human_input()
if approval:
await ctx.send_message(data)
else:
await ctx.terminate("Approval denied")
# Add to workflow
builder.add_executor(ApprovalGate(), position=2)
Built-in OpenTelemetry Integration:
from agent_framework.observability import configure_tracing
# Configure tracing
configure_tracing(
service_name="my-agent-system",
exporter="azure_monitor" # or "console", "otlp"
)
# Automatic tracing of:
# - Agent invocations
# - Tool calls
# - Workflow steps
# - Message passing
Access traces in Azure Monitor or Application Insights for debugging.
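Beyond the automatic spans, you can wrap your own logic in custom spans with the standard OpenTelemetry API so they nest under the framework's traces. This is a minimal sketch; the span and attribute names are arbitrary examples, not framework conventions.

```python
from opentelemetry import trace

tracer = trace.get_tracer("my-agent-system")

async def run_with_span(agent, prompt: str):
    # Custom span around a single agent call
    with tracer.start_as_current_span("agent.custom_run") as span:
        span.set_attribute("prompt.length", len(prompt))
        result = await agent.run(prompt)
        span.set_attribute("response.length", len(result.text))
        return result
```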
Agent as MCP Server:
from agent_framework.mcp import AgentMCPServer
# Expose agent as MCP server
server = AgentMCPServer(agent)
await server.start(port=8080)
Calling MCP Servers from Agents:
from agent_framework import MCPClient
mcp_client = MCPClient("http://external-mcp-server:8080")
agent = chat_client.create_agent(
name="mcp_caller",
instructions="You can call external MCP tools",
tools=[mcp_client]
)
Azure Ecosystem Integration:
- Models: Azure OpenAI, Azure AI Model Catalog
- Services: Azure AI Search, Bing Grounding, Content Safety
- Storage: Azure Blob Storage (for files, checkpoints)
- Monitoring: Application Insights, Azure Monitor
- Deployment: Azure Container Apps, AKS
import os
# Azure OpenAI
os.environ["AZURE_OPENAI_ENDPOINT"] = "https://your-resource.openai.azure.com/"
os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"] = "gpt-4o"
os.environ["AZURE_OPENAI_API_VERSION"] = "2024-08-01-preview"
# Azure AI Foundry Project
os.environ["AZURE_AI_PROJECT_ENDPOINT"] = "https://your-project.api.azureml.ms"# container-app.yaml
properties:
configuration:
ingress:
external: true
targetPort: 8080
template:
containers:
- name: agent-app
image: myregistry.azurecr.io/agent-app:latest
env:
- name: AZURE_OPENAI_ENDPOINT
secretRef: azure-openai-endpoint
scale:
minReplicas: 1
maxReplicas: 10
# Multi-agent chaos testing system
hypothesis_agent = chat_client.create_agent(
name="hypothesis",
instructions="Generate chaos engineering hypotheses for AKS clusters"
)
experiment_agent = chat_client.create_agent(
name="experiment",
instructions="Design and execute chaos experiments",
tools=[kubectl_tool, azure_cli_tool]
)
analysis_agent = chat_client.create_agent(
name="analysis",
instructions="Analyze experiment results and system behavior"
)
workflow = SequentialBuilder().participants([
hypothesis_agent,
experiment_agent,
analysis_agent
]).build()
# Platform engineering multi-agent system
provisioning_agent = chat_client.create_agent(
name="provisioning",
instructions="Provision Azure infrastructure using Terraform",
tools=[terraform_tool, azure_rm_tool]
)
security_agent = chat_client.create_agent(
name="security",
instructions="Apply security policies and compliance checks",
tools=[policy_tool, defender_tool]
)
monitoring_agent = chat_client.create_agent(
name="monitoring",
instructions="Configure monitoring and alerting",
tools=[monitor_tool, log_analytics_tool]
)
workflow = ConcurrentBuilder().participants([
provisioning_agent,
security_agent,
monitoring_agent
]).build()
# Enterprise workflow
validation_agent = chat_client.create_agent(
name="validation",
instructions="Validate customer data and requirements"
)
provisioning_agent = chat_client.create_agent(
name="provisioning",
instructions="Set up customer accounts and resources"
)
notification_agent = chat_client.create_agent(
name="notification",
instructions="Send welcome emails and setup instructions"
)
workflow = HandoffBuilder().participants([
validation_agent,
provisioning_agent,
notification_agent
]).with_human_approval(position=1) # Approve after validation
Best Practices:
- Single Responsibility: Each agent should have ONE clear purpose
- Clear Instructions: Be explicit about agent's role and capabilities
- Tool Selection: Only give agents tools they actually need
- Name Meaningfully: Use descriptive names for debugging
- Start Simple: Begin with sequential, add complexity as needed
- Type Safety: Use strong typing for message passing
- Error Handling: Implement retry logic and failure recovery
- Checkpointing: For workflows longer than 5 minutes
- Observability: Always enable tracing in production
- Thread Usage: Use threads for multi-turn conversations
- Custom Storage: Implement custom stores for distributed systems
- Context Limits: Monitor token usage in long conversations
- State Reset: Clear threads when starting new tasks
- Concurrent When Possible: Parallelize independent agent tasks
- Stream Results: Use streaming for real-time feedback
- Cache Aggressively: Cache tool results where appropriate
- Batch Operations: Group similar tasks together
- Input Validation: Validate all user inputs before agent processing (see the middleware sketch after this list)
- Tool Restrictions: Limit agent tool access by principle of least privilege
- Content Safety: Use Azure Content Safety for filtering
- Authentication: Always use Azure CLI or Managed Identity, never hardcode keys
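As an illustration of the input-validation guidance above, here is a minimal function-based middleware sketch. It assumes a chat_client created as in the quickstart; the blocklist check is a placeholder, and in production you would call Azure Content Safety or a proper validator instead.

```python
from agent_framework import MiddlewareContext

# Placeholder patterns, not a real security policy
BLOCKED_TERMS = {"drop table", "rm -rf"}

async def input_validation_middleware(context: MiddlewareContext, next):
    """Reject obviously unsafe input before it reaches the agent."""
    text = str(context.message).lower()
    if any(term in text for term in BLOCKED_TERMS):
        raise ValueError("Input rejected by validation middleware")
    return await next()

agent = chat_client.create_agent(
    name="ValidatedAgent",
    instructions="You are helpful",
    middleware=[input_validation_middleware],
)
```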
| Aspect | Agent Framework (Code-First) | Foundry Agent Service (No-Code) |
|---|---|---|
| Control | Full control over orchestration | Managed orchestration |
| Deployment | Self-hosted (Container Apps, AKS) | Hosted service |
| Workflows | Deterministic, graph-based | AI-driven, flexible |
| Use Case | Complex, custom workflows | Rapid prototyping, simple agents |
| Cost | Compute costs | Service + compute costs |
| Observability | Custom telemetry | Built-in monitoring |
Recommendation: Use Agent Framework when you need deterministic control and specification-driven development (your approach for chaos engineering).
From AutoGen:
- Replace `ConversableAgent` with `ChatAgent`
- Use `WorkflowBuilder` instead of `GroupChat`
- Tool calling syntax is standardized
- Thread management is more explicit
From Semantic Kernel:
- Replace `Kernel` with `ChatAgent`
- Plugins become `function_tool` decorators
- Planner patterns become Workflows
- Memory stores adapt to `ChatMessageStoreProtocol`
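As a rough before/after illustration of the AutoGen mapping: the "before" side reflects the classic AutoGen ConversableAgent API and is shown commented out only for comparison; verify both sides against the versions you actually use.

```python
# Before (AutoGen):
# from autogen import ConversableAgent
# assistant = ConversableAgent(
#     name="assistant",
#     system_message="You are a helpful assistant.",
#     llm_config={"model": "gpt-4o"},
# )

# After (Agent Framework):
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
assistant = chat_client.create_agent(
    name="assistant",
    instructions="You are a helpful assistant.",
)
result = await assistant.run("Hello!")
```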
- Official Docs: https://learn.microsoft.com/en-us/agent-framework/
- GitHub Repo: https://github.com/microsoft/agent-framework
- Lab Examples: https://github.com/microsoft/agentic-ai-lab
- Weekly Office Hours: Join via GitHub repo
- Discord: Community support channel
# Core agent classes
from agent_framework import ChatAgent, AIAgent
from agent_framework import AgentThread
# Azure clients
from agent_framework.azure import AzureChatClient
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.azure import AzureAIAgentClient
# Tools
from agent_framework import function_tool
from agent_framework import HostedCodeInterpreterTool
from agent_framework import HostedFileSearchTool
from agent_framework import HostedBingGroundingTool
# Workflows
from agent_framework import WorkflowBuilder
from agent_framework import SequentialBuilder
from agent_framework import ConcurrentBuilder
from agent_framework import HandoffBuilder
from agent_framework import GroupChatBuilder
# Core workflow components
from agent_framework import Executor, WorkflowContext, handler
# Events
from agent_framework import WorkflowCompletedEvent
from agent_framework import AgentRunUpdateEvent
from agent_framework import ChatMessage, Role
# Authentication
from azure.identity import AzureCliCredential
from azure.identity.aio import DefaultAzureCredential
# MCP
from agent_framework.mcp import AgentMCPServer, MCPClient
# Observability
from agent_framework.observability import configure_tracing
- Issue: Agent forgets earlier turns. Solution: Use AgentThread to maintain conversation state.
- Issue: Workflow hangs or never completes. Solution: Check for circular dependencies in the workflow graph and enable debug logging.
- Issue: Tool calls fail or are never invoked. Solution: Verify tool function signatures and check async/sync compatibility.
- Issue: Azure authentication errors. Solution: Run az login, verify subscription access, and check environment variables.
- Issue: Context window exceeded in long conversations. Solution: Implement thread cleanup, use summarization agents, and set token limits.
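For the context-window issue, one mitigation pattern is to periodically summarize and roll over to a fresh thread. This sketch uses only the agent and thread APIs shown above; the summarizer prompt and rollover trigger are placeholders you would tune.

```python
from agent_framework import AgentThread

async def rollover_thread(agent, old_thread: AgentThread) -> AgentThread:
    """Summarize a long conversation and continue in a fresh thread."""
    summary = await agent.run(
        "Summarize the key facts and decisions from our conversation so far.",
        thread=old_thread,
    )
    new_thread = AgentThread()
    # Seed the new thread by including the summary in its first user turn
    await agent.run(
        f"Context from the previous conversation: {summary.text}",
        thread=new_thread,
    )
    return new_thread
```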
The following sections detail all examples from the agent-framework directory in the agentic-ai-lab repository.
Located in: agent-framework/agents/azure_ai_agents/
File: azure_ai_with_explicit_settings.ipynb
from agent_framework.azure import AzureAIAgentClient
from azure.identity.aio import AzureCliCredential
import os
# Explicit configuration
async with AzureCliCredential() as credential:
client = AzureAIAgentClient(
async_credential=credential,
project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"]
)
agent = client.create_agent(
name="ExplicitConfigAgent",
instructions="You are a helpful assistant with explicit settings",
model="gpt-4o"
)
result = await agent.run("Hello!")
print(result.text)
Key Concept: Direct configuration of project endpoints and model settings.
File: azure_ai_with_existing_agent.ipynb
from agent_framework.azure import AzureAIAgentClient
# Reuse an existing agent by ID
client = AzureAIAgentClient(async_credential=credential)
# Load existing agent
agent_id = "existing-agent-id-from-azure"
agent = client.get_agent(agent_id)
# Use the existing agent
result = await agent.run("Continue our conversation")Key Concept: Reuse agents created in Azure AI Foundry portal or previous sessions.
File: azure_ai_with_existing_thread.ipynb
from agent_framework import AgentThread
# Create a new thread
thread = AgentThread()
thread_id = thread.id
# Later, resume with existing thread ID
existing_thread = AgentThread(thread_id=thread_id)
result = await agent.run(
"What did we discuss earlier?",
thread=existing_thread
)
Key Concept: Persist conversation state across sessions by storing thread IDs.
File: azure_ai_with_function_tools.ipynb
from agent_framework import function_tool
from agent_framework.azure import AzureAIAgentClient
@function_tool
def get_weather(location: str, unit: str = "celsius") -> str:
"""
Get current weather for a location.
Args:
location: City name or coordinates
unit: Temperature unit (celsius or fahrenheit)
Returns:
Weather information as string
"""
# Implement weather API call
return f"Weather in {location}: 22°{unit[0].upper()}, Sunny"
@function_tool
def search_flights(origin: str, destination: str, date: str) -> dict:
"""Search for available flights."""
return {
"flights": [
{"flight": "AA123", "price": 299, "departure": "10:00"},
{"flight": "DL456", "price": 325, "departure": "14:30"}
]
}
# Create agent with multiple tools
agent = client.create_agent(
name="TravelAgent",
instructions="You help users with travel planning",
tools=[get_weather, search_flights]
)
result = await agent.run(
"What's the weather in Paris and find flights from NYC to Paris on Dec 15?"
)
Key Concept: Use the @function_tool decorator for Python functions. The framework handles serialization, parameter validation, and tool calling.
File: azure_ai_with_code_interpreter.ipynb
from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.azure import AzureAIAgentClient
agent = ChatAgent(
chat_client=AzureAIAgentClient(async_credential=credential),
instructions="""You are a data analyst.
Use Python to analyze data and create visualizations.""",
tools=HostedCodeInterpreterTool()
)
# Agent can now execute Python code
result = await agent.run("""
Analyze this dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Calculate mean, median, and create a histogram.
""")
# The agent will write and execute Python code:
# import numpy as np
# import matplotlib.pyplot as plt
# data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# mean = np.mean(data)
# median = np.median(data)
# plt.hist(data)
Key Concept: Hosted execution environment for Python code. Supports numpy, pandas, matplotlib, etc.
File: azure_ai_with_file_search.ipynb
from agent_framework import ChatAgent, HostedFileSearchTool
from agent_framework.azure import AzureAIAgentClient
# Upload files to Azure AI (done via portal or SDK)
# Files are automatically indexed for semantic search
agent = ChatAgent(
chat_client=AzureAIAgentClient(async_credential=credential),
instructions="You answer questions based on the uploaded documents",
tools=HostedFileSearchTool()
)
result = await agent.run(
"What does the Q3 financial report say about revenue growth?"
)
# Agent automatically:
# 1. Searches uploaded documents using semantic similarity
# 2. Retrieves relevant chunks
# 3. Generates answer with citations
Key Concept: Built-in RAG with automatic document indexing and semantic search.
File: azure_ai_with_bing_grounding.ipynb
from agent_framework import ChatAgent, HostedBingGroundingTool
from agent_framework.azure import AzureAIAgentClient
agent = ChatAgent(
chat_client=AzureAIAgentClient(async_credential=credential),
instructions="You provide up-to-date information using web search",
tools=HostedBingGroundingTool()
)
result = await agent.run(
"What are the latest developments in quantum computing?"
)
# Agent uses Bing to search the web and ground responses in current information
Key Concept: Real-time web search capability for current events and information.
Located in: agent-framework/agents/mcp/
File: agent_as_mcp_server.py
from agent_framework.mcp import AgentMCPServer
from agent_framework.azure import AzureAIAgentClient
import asyncio
# Create agent
client = AzureAIAgentClient(async_credential=credential)
agent = client.create_agent(
name="DataAnalystAgent",
instructions="You analyze data and provide insights",
tools=[HostedCodeInterpreterTool()]
)
# Expose agent as MCP server
server = AgentMCPServer(
agent=agent,
name="data-analyst-mcp",
description="Data analysis agent via MCP"
)
# Start MCP server
await server.start(host="0.0.0.0", port=8080)
# Other applications can now call this agent via the MCP protocol
Key Concept: Turn any agent into an MCP server for inter-process/inter-service communication.
File: mcp_api_key_auth.py
from agent_framework.mcp import AgentMCPServer, APIKeyAuth
# Create MCP server with API key authentication
auth = APIKeyAuth(
api_keys=["your-secret-key-1", "your-secret-key-2"]
)
server = AgentMCPServer(
agent=agent,
name="secure-agent-mcp",
auth=auth
)
await server.start(port=8080)
# Clients must include API key in headers:
# Authorization: Bearer your-secret-key-1
Key Concept: Secure MCP servers with API key authentication for production deployments.
Located in: agent-framework/workflows/
Sequential Orchestration:
from agent_framework import SequentialBuilder
researcher = client.create_agent(
name="researcher",
instructions="Research the topic thoroughly"
)
writer = client.create_agent(
name="writer",
instructions="Write a comprehensive article"
)
editor = client.create_agent(
name="editor",
instructions="Edit for clarity and accuracy"
)
workflow = SequentialBuilder().participants([
researcher,
writer,
editor
]).build()
result = await workflow.run("Write about Kubernetes networking")Concurrent Orchestration:
from agent_framework import ConcurrentBuilder
security_analyst = client.create_agent(
name="security",
instructions="Analyze security vulnerabilities"
)
performance_analyst = client.create_agent(
name="performance",
instructions="Analyze performance bottlenecks"
)
cost_analyst = client.create_agent(
name="cost",
instructions="Analyze cost optimization opportunities"
)
workflow = ConcurrentBuilder().participants([
security_analyst,
performance_analyst,
cost_analyst
]).build()
result = await workflow.run("Analyze this AKS cluster configuration")Handoff Orchestration:
from agent_framework import HandoffBuilder
triage = client.create_agent(
name="triage",
instructions="Determine issue type and route to specialist"
)
network_specialist = client.create_agent(
name="network",
instructions="Handle network-related issues"
)
storage_specialist = client.create_agent(
name="storage",
instructions="Handle storage-related issues"
)
workflow = HandoffBuilder().participants([
triage,
network_specialist,
storage_specialist
]).build()
Group Chat Orchestration:
from agent_framework import GroupChatBuilder
architect = client.create_agent(
name="architect",
instructions="Design system architecture"
)
developer = client.create_agent(
name="developer",
instructions="Implement solutions"
)
tester = client.create_agent(
name="tester",
instructions="Test and validate"
)
workflow = GroupChatBuilder().participants([
architect,
developer,
tester
]).max_rounds(10).build()
Checkpointing:
from agent_framework import WorkflowBuilder, CheckpointConfig
from agent_framework.checkpointing import AzureBlobCheckpointer
# Configure Azure Blob Storage for checkpoints
checkpointer = AzureBlobCheckpointer(
connection_string=os.environ["AZURE_STORAGE_CONNECTION_STRING"],
container_name="workflow-checkpoints"
)
workflow = builder.build(
checkpoint_config=CheckpointConfig(
checkpointer=checkpointer,
checkpoint_interval=5, # Every 5 steps
enable_auto_resume=True
)
)
# Run workflow - automatically checkpoints progress
result = await workflow.run("Long running task")
# Resume from failure
if workflow.failed:
result = await workflow.resume_from_checkpoint()
Human-in-the-Loop Approval:
from datetime import datetime
from agent_framework import Executor, WorkflowContext, handler
from agent_framework import HumanApprovalRequired
class ApprovalGate(Executor):
"""Executor that requires human approval"""
@handler
async def process(self, data: dict, ctx: WorkflowContext):
# Request human approval
approval_request = {
"data": data,
"timestamp": datetime.now(),
"status": "pending"
}
# Emit event for external system to handle
await ctx.emit_event("approval_required", approval_request)
# Wait for approval (via external webhook or API)
approval = await ctx.wait_for_event("approval_response")
if approval["approved"]:
await ctx.send_message(data)
else:
await ctx.terminate("Workflow rejected by human reviewer")
# Add to workflow
builder = WorkflowBuilder()
builder.add_executor(processor_agent)
builder.add_executor(ApprovalGate())
builder.add_executor(deployment_agent)
Magentic Pattern: Manager agent dynamically creates and manages a task list with specialized workers.
from agent_framework import MagenticBuilder
manager = client.create_agent(
name="manager",
instructions="""You are a project manager.
Break down complex tasks into subtasks.
Assign subtasks to appropriate specialists.
Track progress and synthesize results."""
)
code_specialist = client.create_agent(
name="coder",
instructions="You write and debug code"
)
doc_specialist = client.create_agent(
name="documenter",
instructions="You write documentation"
)
test_specialist = client.create_agent(
name="tester",
instructions="You write and run tests"
)
workflow = MagenticBuilder(
manager_agent=manager
).workers([
code_specialist,
doc_specialist,
test_specialist
]).build()
# Manager dynamically assigns tasks to workers
result = await workflow.run(
"Create a REST API for user management with tests and documentation"
)
Located in: agent-framework/middleware/
File: 2-function_based_middleware.ipynb
from agent_framework import MiddlewareContext
async def logging_middleware(context: MiddlewareContext, next):
"""Log all agent interactions"""
print(f"[START] Agent: {context.agent.name}")
print(f"[INPUT] {context.message}")
result = await next()
print(f"[OUTPUT] {result.text}")
print(f"[TOKENS] {result.usage}")
print(f"[END] Agent: {context.agent.name}")
return result
agent = client.create_agent(
name="LoggedAgent",
instructions="You are helpful",
middleware=[logging_middleware]
)
File: 3-class_based_middleware.ipynb
from agent_framework import Middleware
class TokenLimitMiddleware(Middleware):
"""Enforce token limits on agent responses"""
def __init__(self, max_tokens: int = 1000):
self.max_tokens = max_tokens
async def handle(self, context: MiddlewareContext, next):
# Modify request
context.options.max_tokens = self.max_tokens
result = await next()
# Verify response
if result.usage.total_tokens > self.max_tokens:
raise Exception(f"Token limit exceeded: {result.usage.total_tokens}")
return result
agent = client.create_agent(
name="LimitedAgent",
instructions="Be concise",
middleware=[TokenLimitMiddleware(max_tokens=500)]
)
File: 4-decorator_middleware.ipynb
import asyncio
from agent_framework import middleware
@middleware
async def retry_middleware(context, next, max_retries=3):
"""Retry failed agent calls"""
for attempt in range(max_retries):
try:
return await next()
except Exception as e:
if attempt == max_retries - 1:
raise
print(f"Retry {attempt + 1}/{max_retries}")
await asyncio.sleep(2 ** attempt)
File: 5-chat_middleware.ipynb
class ContentFilterMiddleware(Middleware):
"""Filter inappropriate content"""
async def handle(self, context, next):
# Pre-filter input
if self.contains_inappropriate_content(context.message):
return ChatResponse(
text="I cannot respond to that request.",
flagged=True
)
result = await next()
# Post-filter output
if self.contains_inappropriate_content(result.text):
return ChatResponse(
text="Response filtered for content policy.",
flagged=True
)
return result
File: 6-exception_handling_with_middleware.ipynb
class GracefulErrorMiddleware(Middleware):
"""Handle errors gracefully"""
async def handle(self, context, next):
try:
return await next()
except TimeoutError:
return ChatResponse(
text="The request timed out. Please try again.",
error=True
)
except ValueError as e:
return ChatResponse(
text=f"Invalid input: {str(e)}",
error=True
)
except Exception as e:
# Log error for debugging
self.log_error(e, context)
return ChatResponse(
text="An unexpected error occurred. Please contact support.",
error=True
)
File: 7-middleware_termination.ipynb
class SafetyTerminationMiddleware(Middleware):
"""Terminate conversation on safety violations"""
async def handle(self, context, next):
result = await next()
if self.is_safety_violation(result.text):
# Terminate the conversation
context.terminate_conversation(
reason="Safety policy violation detected"
)
return ChatResponse(
text="This conversation has been terminated.",
terminated=True
)
return result
File: 8-override_result_with_middleware.ipynb
class TranslationMiddleware(Middleware):
"""Automatically translate responses"""
def __init__(self, target_language="de"):
self.target_language = target_language
async def handle(self, context, next):
result = await next()
# Override result with translation
translated = await self.translate(
result.text,
self.target_language
)
return ChatResponse(
text=translated,
original=result.text,
language=self.target_language
)
File: 9-shared_state_middleware.ipynb
class ConversationStateMiddleware(Middleware):
"""Maintain shared state across middleware"""
def __init__(self):
self.conversation_state = {
"turn_count": 0,
"topics": [],
"user_preferences": {}
}
async def handle(self, context, next):
# Access shared state
context.state.update(self.conversation_state)
result = await next()
# Update shared state
self.conversation_state["turn_count"] += 1
self.conversation_state["topics"].append(
self.extract_topic(context.message)
)
return result
Located in: agent-framework/threads/
File: 1-in_memory_thread.ipynb
from agent_framework import AgentThread
# Simple in-memory conversation
thread = AgentThread()
result1 = await agent.run("My name is Ivan", thread=thread)
result2 = await agent.run("What's my name?", thread=thread)
# Agent remembers: "Your name is Ivan"File: 2-custom_chat_message_store_thread.ipynb
from agent_framework import ChatMessageStoreProtocol, ChatMessage
from typing import List
class DatabaseMessageStore(ChatMessageStoreProtocol):
"""Store messages in SQL database"""
def __init__(self, db_connection):
self.db = db_connection
async def save_messages(
self,
thread_id: str,
messages: List[ChatMessage]
) -> None:
await self.db.execute(
"INSERT INTO messages (thread_id, role, content) VALUES (?, ?, ?)",
[(thread_id, m.role, m.text) for m in messages]
)
async def load_messages(
self,
thread_id: str
) -> List[ChatMessage]:
rows = await self.db.query(
"SELECT role, content FROM messages WHERE thread_id = ?",
[thread_id]
)
return [ChatMessage(role=r[0], text=r[1]) for r in rows]
async def delete_thread(self, thread_id: str) -> None:
await self.db.execute(
"DELETE FROM messages WHERE thread_id = ?",
[thread_id]
)
# Use custom store
store = DatabaseMessageStore(db_connection)
thread = AgentThread(message_store=store)
File: 3-redis_chat_message_store_thread.ipynb
import redis.asyncio as redis
from agent_framework import ChatMessageStoreProtocol, ChatMessage
import json
class RedisChatMessageStore(ChatMessageStoreProtocol):
"""Store messages in Redis"""
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
async def save_messages(self, thread_id: str, messages):
# Store as JSON list in Redis
messages_json = json.dumps([
{"role": m.role, "text": m.text} for m in messages
])
await self.redis.set(f"thread:{thread_id}", messages_json)
await self.redis.expire(f"thread:{thread_id}", 3600) # 1 hour TTL
async def load_messages(self, thread_id: str):
data = await self.redis.get(f"thread:{thread_id}")
if not data:
return []
messages_list = json.loads(data)
return [
ChatMessage(role=m["role"], text=m["text"])
for m in messages_list
]
# Use Redis store
store = RedisChatMessageStore("redis://localhost:6379")
thread = AgentThread(message_store=store)
File: 4-suspend_resume_thread.ipynb
from agent_framework import AgentThread
# Start conversation
thread = AgentThread()
thread_id = thread.id
result = await agent.run("Start researching AI safety", thread=thread)
# Suspend (save thread ID to database/storage)
save_to_database(user_id="user123", thread_id=thread_id)
# Later, resume from anywhere
saved_thread_id = load_from_database(user_id="user123")
resumed_thread = AgentThread(thread_id=saved_thread_id)
result = await agent.run(
"Continue the research and summarize findings",
thread=resumed_thread
)
Located in: agent-framework/observability/
File: 2-azure_ai_chat_client_with_observability.ipynb
from agent_framework.observability import configure_observability
from azure.monitor.opentelemetry import configure_azure_monitor
# Configure Azure Monitor (Application Insights)
configure_azure_monitor(
connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
)
# Configure framework observability
configure_observability(
service_name="my-agent-system",
service_version="1.0.0",
enable_tracing=True,
enable_metrics=True,
enable_logging=True
)
# All agent calls are now automatically traced
agent = client.create_agent(name="ObservableAgent", instructions="Help users")
result = await agent.run("Hello")
# Traces include:
# - Agent invocation span
# - LLM call spans
# - Tool call spans
# - Token usage metrics
# - Latency metrics
# - Error traces
What Gets Traced:
- Agent creation and configuration
- Every agent.run() invocation
- LLM API calls (including retries)
- Tool/function calls
- Workflow execution steps
- Message passing between agents
- Token usage and costs
- Errors and exceptions
Located in: agent-framework/devui/
The DevUI provides a web interface for testing and debugging agents locally.
from agent_framework.devui import start_devui
# Create your agents
agents = {
"helper": helper_agent,
"researcher": researcher_agent,
"analyst": analyst_agent
}
# Start dev UI
start_devui(
agents=agents,
port=3000,
enable_hot_reload=True
)
# Access at: http://localhost:3000
# Features:
# - Chat with any agent
# - View conversation history
# - Inspect tool calls
# - Monitor token usage
# - Debug workflows
# - Test different inputs
Based on the lab examples, here's a checklist for implementing Microsoft Agent Framework:
- Install the `agent-framework` package
- Configure Azure authentication (`az login`)
- Set environment variables (AZURE_OPENAI_ENDPOINT, etc.)
- Create Azure AI Foundry project
- Deploy models (gpt-4o, embeddings)
- Create basic agent with AzureAIAgentClient
- Add function tools with @function_tool decorator
- Implement code interpreter for data analysis
- Add file search for RAG scenarios
- Integrate Bing grounding for web search
- Manage conversation state with AgentThread
- Implement custom message store (Redis/Database)
- Build sequential workflows for step-by-step tasks
- Implement concurrent workflows for parallel analysis
- Create handoff workflows for routing
- Set up group chat for collaborative agents
- Implement Magentic orchestration for dynamic task management
- Add checkpointing for long-running workflows
- Implement human-in-the-loop approval gates
- Add logging middleware for debugging
- Implement retry middleware for resilience
- Add content filtering middleware
- Create token limit enforcement
- Implement custom error handling
- Add translation or transformation middleware
- Expose agent as MCP server
- Add API key authentication to MCP server
- Connect to external MCP servers from agents
- Implement MCP tool discovery
- Configure Azure Monitor/Application Insights
- Enable OpenTelemetry tracing
- Set up custom metrics collection
- Implement error tracking and alerting
- Create dashboard for agent monitoring
- Use DevUI for local testing
- Create unit tests for function tools (see the pytest sketch after this checklist)
- Test workflow paths and error handling
- Validate thread persistence and resumption
- Performance test with concurrent users
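For the unit-test item referenced above, a minimal pytest sketch might look like the following. It assumes a hypothetical module my_agent_tools containing the get_cluster_status tool from the earlier example, and that the @function_tool decorator keeps the wrapped function directly callable (verify against the installed package version).

```python
# test_tools.py (run with pytest)
from my_agent_tools import get_cluster_status  # hypothetical module holding the @function_tool

def test_get_cluster_status_mentions_cluster_name():
    # Call the tool function directly, bypassing the agent and the LLM
    result = get_cluster_status("prod-aks-01")
    assert "prod-aks-01" in result
```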
- Simple Agent: Single LLM with tools → `ChatAgent`
- Multi-turn Chat: Conversation with context → `AgentThread`
- Sequential Tasks: Step-by-step processing → `SequentialBuilder`
- Parallel Analysis: Independent tasks → `ConcurrentBuilder`
- Complex Orchestration: Custom logic → `WorkflowBuilder`
- Long-Running: Hours/days duration → Enable checkpointing
- Human Approval: Review steps → Human-in-the-loop pattern
- External Tools: MCP servers → `MCPClient`
- RAG/Knowledge: Document search → `HostedFileSearchTool`
- Code Execution: Data analysis → `HostedCodeInterpreterTool`
- Web Search: Current info → `HostedBingGroundingTool`
Your Use Case (Chaos Engineering):
- Multiple specialized agents (hypothesis, experiment, analysis)
- Sequential workflow with validation steps
- Custom tools for kubectl/Azure CLI
- Specification-driven (GitHub Speckit)
- Observability for experiment tracking
→ Use: `SequentialBuilder` with custom function tools, checkpointing, and Azure Monitor integration
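A condensed sketch of that recommendation, combining patterns already shown in this reference. It assumes a chat_client created as in the quickstart; the kubectl wrapper is a hypothetical helper, and error handling, checkpoint configuration, and tracing setup are omitted for brevity.

```python
import subprocess
from agent_framework import SequentialBuilder, function_tool

@function_tool
def run_kubectl(args: str) -> str:
    """Run a kubectl command against the target AKS cluster and return its output."""
    completed = subprocess.run(
        ["kubectl", *args.split()], capture_output=True, text=True
    )
    return completed.stdout or completed.stderr

hypothesis_agent = chat_client.create_agent(
    name="hypothesis",
    instructions="Generate chaos engineering hypotheses for AKS clusters",
)
experiment_agent = chat_client.create_agent(
    name="experiment",
    instructions="Design and execute chaos experiments",
    tools=[run_kubectl],
)
analysis_agent = chat_client.create_agent(
    name="analysis",
    instructions="Analyze experiment results and system behavior",
)

workflow = SequentialBuilder().participants(
    [hypothesis_agent, experiment_agent, analysis_agent]
).build()

result = await workflow.run("Test pod eviction resilience in the payments namespace")
```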
From github.com/microsoft/agentic-ai-lab/agent-framework/:
Agents:
- `azure_ai_with_explicit_settings.ipynb` - Direct configuration
- `azure_ai_with_existing_agent.ipynb` - Reuse agents
- `azure_ai_with_existing_thread.ipynb` - Conversation persistence
- `azure_ai_with_function_tools.ipynb` - Custom functions
- `azure_ai_with_code_interpreter.ipynb` - Python execution
- `azure_ai_with_file_search.ipynb` - RAG/document search
- `azure_ai_with_bing_grounding.ipynb` - Web search
MCP:
- `agent_as_mcp_server.py` - Expose agent as MCP
- `mcp_api_key_auth.py` - Secure MCP with auth
Workflows:
- `orchestration/` - Sequential, Concurrent, Handoff, GroupChat, Magentic
- `checkpointing/` - Save/resume long-running workflows
- `human-in-the-loop/` - Approval gates
- `magentic/` - Dynamic task management
Middleware:
- `2-function_based_middleware.ipynb` - Simple function middleware
- `3-class_based_middleware.ipynb` - Reusable class middleware
- `4-decorator_middleware.ipynb` - Decorator pattern
- `5-chat_middleware.ipynb` - Chat-specific middleware
- `6-exception_handling_with_middleware.ipynb` - Error handling
- `7-middleware_termination.ipynb` - Conversation termination
- `8-override_result_with_middleware.ipynb` - Result transformation
- `9-shared_state_middleware.ipynb` - Cross-middleware state
Threads:
- `1-in_memory_thread.ipynb` - Basic conversation state
- `2-custom_chat_message_store_thread.ipynb` - Custom storage
- `3-redis_chat_message_store_thread.ipynb` - Redis backend
- `4-suspend_resume_thread.ipynb` - Long-term persistence
Observability:
- `2-azure_ai_chat_client_with_observability.ipynb` - Full tracing
DevUI:
- Web interface for local agent testing
This comprehensive reference captures all examples and patterns from the microsoft/agentic-ai-lab agent-framework repository. Last updated based on public preview documentation.