@cagataycali
Created November 8, 2025 08:20
Strands Agents Integration Patterns & Protocols - Complete Implementation Guide

Strands Agents Integration Patterns & Protocols

Phase 3: Ecosystem Integration Analysis. A complete implementation guide for integrating Strands Agents with existing systems and workflows.

📋 Table of Contents

  1. MCP (Model Context Protocol) Integration
  2. A2A (Agent-to-Agent) Protocol
  3. AWS Service Integration
  4. External System Integration
  5. Production Integration Patterns
  6. Decision Matrix & Best Practices

📖 Overview

This guide provides practical, implementation-ready patterns for integrating Strands Agents with enterprise systems. Each pattern includes:

  • Configuration templates
  • Code examples
  • Production considerations
  • Monitoring & observability
  • Security guidelines

Analysis Source: 44+ repositories from the Strands Agents ecosystem
Focus: Production-ready integration patterns
Target: Enterprise developers and integration architects


A2A (Agent-to-Agent) Protocol

🤝 Protocol Overview

Agent-to-Agent communication in Strands enables multi-agent orchestration, task delegation, and collaborative problem-solving.

Architecture Patterns

graph TB
    subgraph "Orchestrator Agent"
        MainAgent[Main Agent]
        TaskManager[Task Manager]
        SubAgentFactory[SubAgent Factory]
    end
    
    subgraph "Specialized Agents"
        SecurityAgent[Security Agent]
        AnalysisAgent[Analysis Agent]
        DocAgent[Documentation Agent]
    end
    
    subgraph "Communication Layer"
        GitHubAPI[GitHub API]
        WorkflowDispatch[Workflow Dispatch]
        MessageQueue[Message Queue]
    end
    
    MainAgent --> TaskManager
    TaskManager --> SubAgentFactory
    SubAgentFactory --> GitHubAPI
    GitHubAPI --> SecurityAgent
    GitHubAPI --> AnalysisAgent
    GitHubAPI --> DocAgent
    
    SecurityAgent --> MessageQueue
    AnalysisAgent --> MessageQueue
    DocAgent --> MessageQueue
    MessageQueue --> MainAgent

🔄 Implementation Patterns

1. SubAgent Creation Tool

# tools/create_subagent.py - Production A2A Implementation
from strands import tool
import json
import os
import uuid
from typing import Dict, Any, Optional

import requests

@tool
def create_subagent(
    repository: str,
    task: str,
    workflow_id: str = "agent.yml",
    model: str = "us.anthropic.claude-sonnet-4-20250514-v1:0",
    system_prompt: Optional[str] = None,
    tools: Optional[str] = None,
    context_data: Optional[Dict[str, Any]] = None,
    coordination_mode: str = "async",
    timeout_minutes: int = 30
) -> Dict[str, Any]:
    """
    Enterprise-grade subagent creation with coordination patterns.
    
    Args:
        repository: Target repository for agent execution
        task: Specific task for the subagent
        workflow_id: GitHub Actions workflow to trigger
        model: LLM model for the subagent
        system_prompt: Specialized instructions
        tools: Comma-separated tools list
        context_data: Structured context to pass
        coordination_mode: 'async', 'sync', or 'callback'
        timeout_minutes: Maximum execution time
    
    Returns:
        Agent coordination metadata
    """
    
    # Generate unique coordination ID
    coordination_id = f"agent_{uuid.uuid4().hex[:8]}"
    
    # Prepare agent configuration (workflow inputs are sent as strings)
    agent_config = {
        "task": task,
        "model": model,
        "system_prompt": system_prompt or f"Specialized agent for: {task}",
        "tools": tools or "",
        "coordination_id": coordination_id,
        "parent_agent": os.environ.get("GITHUB_RUN_ID", "unknown"),
        "timeout_minutes": str(timeout_minutes)
    }
    
    # Add context data if provided
    if context_data:
        agent_config["context_data"] = json.dumps(context_data)
    
    try:
        # Dispatch to GitHub Actions
        github_token = os.environ.get("GITHUB_TOKEN")
        if not github_token:
            raise ValueError("GITHUB_TOKEN not available")
        
        dispatch_result = dispatch_workflow(
            repository=repository,
            workflow_id=workflow_id,
            inputs=agent_config,
            token=github_token
        )
        
        # Setup coordination based on mode
        if coordination_mode == "sync":
            return wait_for_agent_completion(coordination_id, timeout_minutes)
        elif coordination_mode == "callback":
            return setup_agent_callback(coordination_id, agent_config)
        else:  # async
            return {
                "status": "dispatched",
                "coordination_id": coordination_id,
                "repository": repository,
                # Note: this holds the X-GitHub-Request-Id header value, not a workflow run ID
                "workflow_run_id": dispatch_result.get("id"),
                "tracking_url": f"https://github.com/{repository}/actions"
            }
            
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "coordination_id": coordination_id
        }

def dispatch_workflow(repository: str, workflow_id: str, inputs: Dict, token: str):
    """Dispatch GitHub Actions workflow for subagent"""
    url = f"https://api.github.com/repos/{repository}/actions/workflows/{workflow_id}/dispatches"
    headers = {
        "Authorization": f"token {token}",
        "Accept": "application/vnd.github.v3+json"
    }
    
    payload = {
        "ref": "main",  # Branch or tag that contains the workflow file
        "inputs": inputs
    }
    
    # Set a timeout so a stalled connection cannot hang the parent agent
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()
    
    # The request ID identifies this API call for tracing; it is not a workflow run ID
    return {"status": "dispatched", "id": response.headers.get("X-GitHub-Request-Id")}
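
The `sync` branch of `create_subagent` calls `wait_for_agent_completion`, which this guide does not define. One possible shape for it is a polling loop, sketched below. The `fetch_status` callable, the extra parameters, and the status strings (`completed`, `failed`, `timeout`) are all assumptions made for illustration; they are not part of Strands or the GitHub API, and the signature differs from the two-argument call used above.

```python
import time
from typing import Any, Callable, Dict


def wait_for_agent_completion(
    coordination_id: str,
    timeout_minutes: float,
    fetch_status: Callable[[str], Dict[str, Any]],  # hypothetical status lookup
    poll_seconds: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until the subagent reports a terminal status or the timeout elapses."""
    deadline = time.monotonic() + timeout_minutes * 60
    while True:
        status = fetch_status(coordination_id)
        # Assumed terminal states; adapt to whatever the subagent actually reports
        if status.get("status") in ("completed", "failed"):
            return {"coordination_id": coordination_id, **status}
        if time.monotonic() >= deadline:
            return {"coordination_id": coordination_id, "status": "timeout"}
        sleep(poll_seconds)
```

Injecting `fetch_status` and `sleep` keeps the loop independent of the transport (workflow runs, gists, a queue) and makes it easy to exercise without waiting in real time.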

2. Agent Coordination Patterns

# Advanced Agent Coordination System
import uuid
from queue import Queue
from typing import Any, Dict, List

class AgentCoordinator:
    def __init__(self, coordination_backend="github"):
        self.backend = coordination_backend
        self.active_agents = {}
        self.message_queue = Queue()
    
    def orchestrate_task(self, complex_task: str) -> Dict[str, Any]:
        """Break down complex task into agent specializations"""
        
        # Task analysis and decomposition
        subtasks = self.decompose_task(complex_task)
        
        agent_assignments = []
        
        for subtask in subtasks:
            # Determine optimal agent configuration
            agent_spec = self.select_agent_specialization(subtask)
            
            # Create specialized agent
            agent_id = self.create_specialized_agent(
                task=subtask['description'],
                specialization=agent_spec['type'],
                tools=agent_spec['tools'],
                model=agent_spec['model']
            )
            
            agent_assignments.append({
                "agent_id": agent_id,
                "subtask": subtask,
                "specialization": agent_spec
            })
        
        # Monitor and coordinate execution
        return self.coordinate_execution(agent_assignments)
    
    def select_agent_specialization(self, subtask: Dict) -> Dict[str, Any]:
        """Select optimal agent configuration for task type"""
        
        task_type = subtask['type']
        
        specializations = {
            "security_analysis": {
                "type": "security",
                "model": "us.anthropic.claude-opus-4-20250514-v1:0",
                "tools": "file_read,python_repl,shell,use_github",
                "system_prompt": "Expert security analyst focusing on vulnerability detection"
            },
            "data_analysis": {
                "type": "analyst",
                "model": "us.anthropic.claude-sonnet-4-20250514-v1:0",
                "tools": "python_repl,file_read,http_request,calculator",
                "system_prompt": "Data analysis specialist with statistical expertise"
            },
            "documentation": {
                "type": "documenter",
                # Confirm this model ID is available in your Bedrock region before use
                "model": "us.anthropic.claude-haiku-4-20250514-v1:0",
                "tools": "file_read,file_write,editor",
                "system_prompt": "Technical writing specialist for clear documentation"
            }
        }
        
        return specializations.get(task_type, specializations["data_analysis"])
    
    def coordinate_execution(self, agent_assignments: List[Dict]) -> Dict:
        """Dispatch all assigned agents and return the execution plan"""
        
        execution_plan = {
            "total_agents": len(agent_assignments),
            "coordination_id": f"coord_{uuid.uuid4().hex[:8]}",
            "status": "executing",
            "agents": agent_assignments
        }
        
        # Dispatch every agent; dependency ordering between subtasks is not implemented here
        for assignment in agent_assignments:
            self.execute_agent_async(assignment)
        
        return execution_plan
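
`coordinate_execution` dispatches every agent at once. If subtasks depend on each other, one way to order them is a topological sort that groups subtasks into waves, where each wave only depends on earlier waves. The sketch below uses the standard-library `graphlib` module (Python 3.9+). The `id` and `depends_on` fields are assumptions; the subtask dictionaries in this guide only show `type` and `description`.

```python
from graphlib import TopologicalSorter
from typing import Dict, List


def plan_execution_waves(subtasks: List[Dict]) -> List[List[str]]:
    """Group subtask ids into waves; each wave depends only on earlier waves."""
    # Map each subtask id to the set of ids it must wait for (hypothetical fields)
    graph = {t["id"]: set(t.get("depends_on", [])) for t in subtasks}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies

    waves: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted for a deterministic order
        waves.append(ready)
        sorter.done(*ready)
    return waves


subtasks = [
    {"id": "scan", "type": "security_analysis"},
    {"id": "stats", "type": "data_analysis"},
    {"id": "report", "type": "documentation", "depends_on": ["scan", "stats"]},
]
print(plan_execution_waves(subtasks))  # [['scan', 'stats'], ['report']]
```

Agents within a wave can be dispatched in parallel, and the next wave starts only after the current one has finished.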

3. Message Passing & Synchronization

# Inter-Agent Communication Protocol
import json
import uuid
from datetime import datetime
from typing import Any, Dict, List

class AgentMessageProtocol:
    def __init__(self):
        self.message_store = {}  # In production: Redis/database
        self.coordination_channels = {}
    
    def send_message(self, from_agent: str, to_agent: str, 
                    message: Dict[str, Any], message_type: str = "data"):
        """Send structured message between agents"""
        
        message_id = f"msg_{uuid.uuid4().hex[:8]}"
        
        message_envelope = {
            "id": message_id,
            "from": from_agent,
            "to": to_agent,
            "type": message_type,
            "timestamp": datetime.utcnow().isoformat(),
            "payload": message,
            "status": "sent"
        }
        
        # Store message (GitHub Issues/Gists in this implementation)
        self.store_message(message_envelope)
        
        # Notify receiving agent
        self.notify_agent(to_agent, message_id)
        
        return message_id
    
    def receive_messages(self, agent_id: str, message_type: str = None) -> List[Dict]:
        """Retrieve messages for an agent"""
        
        messages = self.get_messages_for_agent(agent_id)
        
        if message_type:
            messages = [m for m in messages if m['type'] == message_type]
        
        # Mark as received
        for msg in messages:
            msg['status'] = 'received'
            self.update_message_status(msg['id'], 'received')
        
        return messages
    
    def store_message(self, message: Dict[str, Any]):
        """Store message using GitHub as transport layer"""
        
        # Use GitHub Gist as message storage
        gist_content = {
            "message.json": json.dumps(message, indent=2)
        }
        
        gist_result = self.create_coordination_gist(
            description=f"Agent message {message['id']}",
            files=gist_content,
            public=False
        )
        
        message['storage_url'] = gist_result.get('html_url')
        return message
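
`AgentMessageProtocol` relies on `get_messages_for_agent` and `update_message_status`, which are not shown. For local testing, an in-memory store along the following lines could stand in for the GitHub-backed transport. The class name and its internal layout are assumptions for illustration only; the envelope fields match the ones built in `send_message`.

```python
from collections import defaultdict
from typing import Any, Dict, List


class InMemoryMessageStore:
    """Hypothetical stand-in for the storage helpers used by AgentMessageProtocol."""

    def __init__(self) -> None:
        self._by_id: Dict[str, Dict[str, Any]] = {}
        self._inbox: Dict[str, List[str]] = defaultdict(list)

    def store_message(self, envelope: Dict[str, Any]) -> None:
        # Index the envelope by id and append it to the recipient's inbox
        self._by_id[envelope["id"]] = envelope
        self._inbox[envelope["to"]].append(envelope["id"])

    def get_messages_for_agent(self, agent_id: str) -> List[Dict[str, Any]]:
        # Return only messages that have not been marked as received yet
        return [
            self._by_id[mid]
            for mid in self._inbox.get(agent_id, [])
            if self._by_id[mid]["status"] == "sent"
        ]

    def update_message_status(self, message_id: str, status: str) -> None:
        self._by_id[message_id]["status"] = status
```

Filtering on `status == "sent"` means a message is delivered once: after `receive_messages` marks it `received`, later calls no longer return it.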

🎯 Production Deployment

GitHub Actions Workflow for A2A

# .github/workflows/coordinated-agent.yml
name: Coordinated Agent System

on:
  workflow_dispatch:
    inputs:
      coordination_mode:
        description: 'Agent coordination mode'
        required: true
        type: choice
        options:
          - orchestrator
          - specialist
          - callback
      task_definition:
        description: 'JSON task definition'
        required: true
        type: string
      parent_coordination_id:
        description: 'Parent coordination ID for tracking'
        required: false
        type: string

jobs:
  coordinate-agents:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
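      - name: Parse Task Definition
        id: parse_task
        env:
          # Pass the input through the environment so it is never interpolated into the script
          TASK_DEFINITION: ${{ inputs.task_definition }}
        run: |
          printf '%s' "$TASK_DEFINITION" | jq '.' > task.json
          echo "task_type=$(jq -r '.type' task.json)" >> "$GITHUB_OUTPUT"
          echo "complexity=$(jq -r '.complexity' task.json)" >> "$GITHUB_OUTPUT"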
      
      - name: Execute Orchestrator Agent
        if: inputs.coordination_mode == 'orchestrator'
        uses: ./
        with:
          task: ${{ inputs.task_definition }}
          system_prompt: "You are an orchestrator agent coordinating multiple specialists"
          tools: "create_subagent,use_github,file_read,python_repl"
          coordination_id: ${{ inputs.parent_coordination_id }}
      
      - name: Execute Specialist Agent  
        if: inputs.coordination_mode == 'specialist'
        uses: ./
        with:
          task: ${{ inputs.task_definition }}
          system_prompt: ${{ fromJson(inputs.task_definition).system_prompt }}
          tools: ${{ fromJson(inputs.task_definition).tools }}
          model: ${{ fromJson(inputs.task_definition).model }}
          coordination_id: ${{ inputs.parent_coordination_id }}
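
The workflow above expects `task_definition` as a JSON string and reads `type`, `complexity`, `system_prompt`, `tools`, and `model` from it. A caller could assemble the dispatch payload roughly as follows. The helper name `build_dispatch_payload` is an assumption, and the sketch only builds the request body; sending it is left to whatever HTTP client is already in use.

```python
import json
from typing import Any, Dict, Optional

# Modes offered by the workflow's coordination_mode choice input
ALLOWED_MODES = {"orchestrator", "specialist", "callback"}


def build_dispatch_payload(
    task_definition: Dict[str, Any],
    coordination_mode: str,
    parent_coordination_id: Optional[str] = None,
    ref: str = "main",
) -> Dict[str, Any]:
    """Build the JSON body for a workflow_dispatch request (hypothetical helper)."""
    if coordination_mode not in ALLOWED_MODES:
        raise ValueError(f"Unsupported coordination_mode: {coordination_mode}")

    inputs = {
        "coordination_mode": coordination_mode,
        # The workflow declares task_definition as a string, so serialize it here
        "task_definition": json.dumps(task_definition),
    }
    if parent_coordination_id:
        inputs["parent_coordination_id"] = parent_coordination_id
    return {"ref": ref, "inputs": inputs}


payload = build_dispatch_payload(
    {"type": "security_analysis", "complexity": "high",
     "system_prompt": "Expert security analyst", "tools": "file_read,shell",
     "model": "us.anthropic.claude-sonnet-4-20250514-v1:0"},
    coordination_mode="specialist",
    parent_coordination_id="coord_1234abcd",
)
```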

Monitoring & Observability

# Agent Coordination Monitoring
import time
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class AgentMetrics:
    coordination_id: str
    total_agents: int
    completed_agents: int
    failed_agents: int
    total_execution_time: float
    message_count: int
    error_rate: float

class CoordinationMonitor:
    def __init__(self):
        self.active_coordinations = {}
        self.metrics_collector = MetricsCollector()
    
    def track_coordination(self, coordination_id: str, agent_count: int):
        """Start tracking a coordination session"""
        self.active_coordinations[coordination_id] = {
            'start_time': time.time(),
            'agent_count': agent_count,
            'completed': 0,
            'failed': 0,
            'messages': 0
        }
    
    def agent_completed(self, coordination_id: str, agent_id: str, success: bool):
        """Record agent completion"""
        if coordination_id in self.active_coordinations:
            coord = self.active_coordinations[coordination_id]
            if success:
                coord['completed'] += 1
            else:
                coord['failed'] += 1
            
            # Check if coordination is complete
            total_finished = coord['completed'] + coord['failed']
            if total_finished >= coord['agent_count']:
                self.finalize_coordination(coordination_id)
    
    def get_coordination_status(self, coordination_id: str) -> Dict:
        """Get real-time coordination status"""
        if coordination_id not in self.active_coordinations:
            return {"status": "not_found"}
        
        coord = self.active_coordinations[coordination_id]
        
        return {
            "status": "active",
            "progress": {
                "completed": coord['completed'],
                "failed": coord['failed'], 
                "total": coord['agent_count'],
                "completion_rate": (coord['completed'] / coord['agent_count']) if coord['agent_count'] else 0.0
            },
            "runtime_seconds": time.time() - coord['start_time']
        }
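
The `AgentMetrics` dataclass is declared above but never populated, and `finalize_coordination` is not shown. A minimal sketch of how the tracked counters might be turned into an `AgentMetrics` record follows. The function name `summarize_coordination` and the choice to compute `error_rate` over finished agents are assumptions; the dataclass is repeated only so the example is self-contained.

```python
import time
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class AgentMetrics:
    coordination_id: str
    total_agents: int
    completed_agents: int
    failed_agents: int
    total_execution_time: float
    message_count: int
    error_rate: float


def summarize_coordination(coordination_id: str, coord: Dict,
                           now: Optional[float] = None) -> AgentMetrics:
    """Convert a tracking entry from active_coordinations into AgentMetrics."""
    now = time.time() if now is None else now
    finished = coord["completed"] + coord["failed"]
    return AgentMetrics(
        coordination_id=coordination_id,
        total_agents=coord["agent_count"],
        completed_agents=coord["completed"],
        failed_agents=coord["failed"],
        total_execution_time=now - coord["start_time"],
        message_count=coord["messages"],
        # Assumed definition: share of finished agents that failed
        error_rate=coord["failed"] / finished if finished else 0.0,
    )
```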

AWS Service Integration

☁️ Architecture Overview

Strands Agents integrates deeply with AWS services for compute, storage, AI/ML, and infrastructure management.

graph TB
    subgraph "Strands Agent Runtime"
        Agent[Agent Core]
        SessionMgr[S3SessionManager]
        KnowledgeBase[Bedrock KB]
    end
    
    subgraph "AWS Compute"
        Lambda[AWS Lambda]
        Fargate[ECS Fargate]
        EC2[EC2 Instances]
    end
    
    subgraph "AWS AI/ML"
        Bedrock[Amazon Bedrock]
        SageMaker[SageMaker]
        Comprehend[Comprehend]
    end
    
    subgraph "AWS Storage"
        S3[S3 Buckets]
        DynamoDB[DynamoDB]
        OpenSearch[OpenSearch]
    end
    
    Agent --> SessionMgr
    Agent --> KnowledgeBase
    SessionMgr --> S3
    KnowledgeBase --> Bedrock
    
    Agent --> Lambda
    Agent --> OpenSearch
    Agent --> DynamoDB

🔧 Core Integration Patterns

1. S3 Session Management

# Enhanced S3 Session Manager with enterprise features
from strands.session.s3_session_manager import S3SessionManager
import boto3
import json
import os
from datetime import datetime, timedelta

class EnterpriseS3SessionManager(S3SessionManager):
    def __init__(self, session_id: str, bucket: str, prefix: str = "conversations/",
                 region_name: str = "us-west-2", encryption_config: dict = None):
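        # Keyword arguments keep this call independent of the parent's parameter order
        super().__init__(session_id=session_id, bucket=bucket, prefix=prefix, region_name=region_name)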
        
        self.encryption_config = encryption_config or {
            "SSEAlgorithm": "aws:kms",
            "KMSKeyId": "alias/strands-agents-key"
        }
        
        # Initialize enhanced S3 client with retry configuration
        self.s3_client = boto3.client(
            's3',
            region_name=region_name,
            config=boto3.session.Config(
                retries={
                    'max_attempts': 10,
                    'mode': 'adaptive'
                },
                max_pool_connections=50
            )
        )
    
    def save_conversation_with_metadata(self, conversation_data: dict, 
                                      metadata: dict = None) -> str:
        """Save conversation with comprehensive metadata"""
        
        # Enhance conversation data with metadata
        enhanced_data = {
            "conversation": conversation_data,
            "metadata": {
                "session_id": self.session_id,
                "timestamp": datetime.utcnow().isoformat(),
                "version": "1.0",
                "agent_version": os.environ.get("STRANDS_VERSION", "unknown"),
                "custom_metadata": metadata or {}
            },
            "tags": self.generate_conversation_tags(conversation_data)
        }
        
        # Save with server-side encryption
        key = f"{self.prefix}{self.session_id}/conversation_{datetime.utcnow().isoformat()}.json"
        
        # metadata defaults to None, so normalize it before reading fields from it
        metadata = metadata or {}
        
        self.s3_client.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=json.dumps(enhanced_data, indent=2),
            ServerSideEncryption=self.encryption_config["SSEAlgorithm"],
            SSEKMSKeyId=self.encryption_config.get("KMSKeyId"),
            Metadata={
                "session-id": self.session_id,
                "conversation-type": metadata.get("type", "standard"),
                "agent-model": metadata.get("model", "unknown")
            },
            Tagging=f"Environment=production&Project=strands-agents&SessionId={self.session_id}"
        )
        
        return key
    
    def setup_lifecycle_management(self):
        """Configure S3 lifecycle policies for cost optimization"""
        
        lifecycle_config = {
            'Rules': [
                {
                    'ID': 'StrandsAgentConversations',
                    'Status': 'Enabled',
                    'Filter': {'Prefix': self.prefix},
                    'Transitions': [
                        {
                            'Days': 30,
                            'StorageClass': 'STANDARD_IA'
                        },
                        {
                            'Days': 90,
                            'StorageClass': 'GLACIER'
                        },
                        {
                            'Days': 365,
                            'StorageClass': 'DEEP_ARCHIVE'
                        }
                    ],
                    'Expiration': {
                        'Days': 2555  # 7 years retention
                    }
                }
            ]
        }
        
        self.s3_client.put_bucket_lifecycle_configuration(
            Bucket=self.bucket,
            LifecycleConfiguration=lifecycle_config
        )
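
`save_conversation_with_metadata` builds the `Tagging` argument with an f-string, which breaks if a session ID contains characters such as spaces or slashes. Since `put_object` expects the tag set encoded as URL query parameters, the string can be built with `urllib.parse.urlencode` instead. The helper name `build_tagging` is an assumption; this is a sketch, not part of the Strands session manager.

```python
from typing import Dict
from urllib.parse import quote, urlencode


def build_tagging(tags: Dict[str, str]) -> str:
    """Encode tags as URL query parameters for the S3 Tagging argument."""
    # quote (rather than the default quote_plus) percent-encodes spaces as %20
    return urlencode(tags, quote_via=quote)


tagging = build_tagging({
    "Environment": "production",
    "Project": "strands-agents",
    "SessionId": "user 42/eu",
})
print(tagging)
# Environment=production&Project=strands-agents&SessionId=user%2042%2Feu
```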

2. Bedrock Integration

# Advanced Bedrock Integration with Knowledge Base
import os
import uuid
from typing import Dict, List

import boto3

class EnterpriseBedrockIntegration:
    def __init__(self, region: str = "us-west-2"):
        self.region = region  # Used when building the embedding model ARN
        self.bedrock_client = boto3.client('bedrock-runtime', region_name=region)
        self.bedrock_agent_client = boto3.client('bedrock-agent', region_name=region)
        self.knowledge_base_id = os.environ.get('STRANDS_KNOWLEDGE_BASE_ID')
        
    def setup_knowledge_base(self, name: str, description: str, 
                           opensearch_config: dict) -> str:
        """Create and configure Bedrock Knowledge Base"""
        
        # Create Knowledge Base
        response = self.bedrock_agent_client.create_knowledge_base(
            name=name,
            description=description,
            roleArn=os.environ.get('BEDROCK_KB_ROLE_ARN'),
            knowledgeBaseConfiguration={
                'type': 'VECTOR',
                'vectorKnowledgeBaseConfiguration': {
                    'embeddingModelArn': f'arn:aws:bedrock:{self.region}::foundation-model/amazon.titan-embed-text-v1'
                }
            },
            storageConfiguration={
                'type': 'OPENSEARCH_SERVERLESS',
                'opensearchServerlessConfiguration': {
                    'collectionArn': opensearch_config['collection_arn'],
                    'vectorIndexName': opensearch_config['index_name'],
                    'fieldMapping': {
                        'vectorField': 'vector',
                        'textField': 'text',
                        'metadataField': 'metadata'
                    }
                }
            }
        )
        
        return response['knowledgeBase']['knowledgeBaseId']
    
    def store_conversation_in_kb(self, conversation: dict, title: str) -> str:
        """Store conversation in Bedrock Knowledge Base"""
        
        if not self.knowledge_base_id:
            raise ValueError("Knowledge Base ID not configured")
        
        # Format conversation for knowledge base
        content = self.format_conversation_for_kb(conversation, title)
        
        # Create data source document
        document_id = f"conversation_{uuid.uuid4().hex}"
        
        # Upload to S3 first (required for Bedrock KB)
        s3_key = self.upload_to_s3_for_kb(content, document_id)
        
        # Trigger ingestion job
        response = self.bedrock_agent_client.start_ingestion_job(
            knowledgeBaseId=self.knowledge_base_id,
            dataSourceId=self.get_or_create_data_source(),
            description=f"Ingestion job for conversation: {title}"
        )
        
        return response['ingestionJob']['ingestionJobId']
    
    def retrieve_relevant_context(self, query: str, max_results: int = 5) -> List[Dict]:
        """Retrieve relevant context from Knowledge Base"""
        
        if not self.knowledge_base_id:
            return []
        
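        # retrieve() is exposed by the 'bedrock-agent-runtime' client, not 'bedrock-agent'
        runtime_client = boto3.client(
            'bedrock-agent-runtime',
            region_name=self.bedrock_client.meta.region_name
        )
        response = runtime_client.retrieve(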
            knowledgeBaseId=self.knowledge_base_id,
            retrievalQuery={'text': query},
            retrievalConfiguration={
                'vectorSearchConfiguration': {
                    'numberOfResults': max_results,
                    'overrideSearchType': 'HYBRID'
                }
            }
        )
        
        return [
            {
                'content': result['content']['text'],
                'score': result['score'],
                'source': result.get('location', {})
            }
            for result in response['retrievalResults']
        ]
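
`store_conversation_in_kb` calls `format_conversation_for_kb`, which is not defined in this guide. One plausible version flattens the conversation into plain text that an embedding model can index. The conversation layout assumed here (a `messages` list of `role`/`content` dictionaries) is a guess at the structure, so adjust it to the actual session format.

```python
from typing import Any, Dict


def format_conversation_for_kb(conversation: Dict[str, Any], title: str) -> str:
    """Flatten a conversation into plain text for ingestion (assumed layout)."""
    lines = [f"# {title}", ""]
    for message in conversation.get("messages", []):
        role = message.get("role", "unknown")
        text = str(message.get("content", "")).strip()
        if text:  # skip empty turns so they do not pollute the index
            lines.append(f"{role}: {text}")
    return "\n".join(lines)


doc = format_conversation_for_kb(
    {"messages": [
        {"role": "user", "content": "How do I rotate the KMS key?"},
        {"role": "assistant", "content": "  Enable automatic rotation.  "},
        {"role": "assistant", "content": ""},
    ]},
    title="Key rotation",
)
```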

3. OpenSearch Serverless Integration

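# OpenSearch Serverless for Vector Search
from typing import Dict, List

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

class OpenSearchVectorStore: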
    def __init__(self, collection_endpoint: str, region: str = "us-west-2"):
        self.collection_endpoint = collection_endpoint
        self.region = region
        
        # Setup authenticated client
        credentials = boto3.Session().get_credentials()
        auth = AWS4Auth(
            credentials.access_key,
            credentials.secret_key, 
            region,
            'aoss',
            session_token=credentials.token
        )
        
        self.client = OpenSearch(
            hosts=[{'host': collection_endpoint, 'port': 443}],
            http_auth=auth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            pool_maxsize=20
        )
    
    def create_vector_index(self, index_name: str, dimension: int = 1536):
        """Create optimized vector index for agent conversations"""
        
        index_body = {
            "settings": {
                "index.knn": True,
                "number_of_shards": 2,
                "number_of_replicas": 1
            },
            "mappings": {
                "properties": {
                    "vector": {
                        "type": "knn_vector",
                        "dimension": dimension,
                        "method": {
                            "name": "hnsw",
                            "space_type": "cosinesimil",
                            # Check engine support for your collection; serverless vector collections may not accept "lucene"
                            "engine": "lucene",
                            "parameters": {
                                "ef_construction": 128,
                                "m": 24
                            }
                        }
                    },
                    "text": {"type": "text"},
                    "metadata": {
                        "properties": {
                            "session_id": {"type": "keyword"},
                            "timestamp": {"type": "date"},
                            "agent_model": {"type": "keyword"},
                            "conversation_type": {"type": "keyword"}
                        }
                    }
                }
            }
        }
        
        self.client.indices.create(index=index_name, body=index_body)
    
    def store_conversation_vector(self, conversation_id: str, text: str, 
                                vector: List[float], metadata: Dict):
        """Store conversation with vector embedding"""
        
        doc = {
            "text": text,
            "vector": vector,
            "metadata": metadata
        }
        
        self.client.index(
            index="agent-conversations",
            id=conversation_id,
            body=doc
        )
    
    def semantic_search(self, query_vector: List[float], 
                       filters: Dict = None, k: int = 5) -> List[Dict]:
        """Perform semantic search with optional metadata filters"""
        
        search_body = {
            "size": k,
            "query": {
                "bool": {
                    "must": [
                        {
                            "knn": {
                                "vector": {
                                    "vector": query_vector,
                                    "k": k
                                }
                            }
                        }
                    ]
                }
            }
        }
        
        # Add metadata filters
        if filters:
            filter_clauses = []
            for key, value in filters.items():
                filter_clauses.append({"term": {f"metadata.{key}": value}})
            
            search_body["query"]["bool"]["filter"] = filter_clauses
        
        response = self.client.search(index="agent-conversations", body=search_body)
        
        return [
            {
                "id": hit["_id"],
                "score": hit["_score"],
                "text": hit["_source"]["text"],
                "metadata": hit["_source"]["metadata"]
            }
            for hit in response["hits"]["hits"]
        ]
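
The index above sets `space_type` to `cosinesimil`, so nearest neighbours are ranked by the cosine similarity between the query vector and each stored vector. The pure-Python sketch below shows what that measure computes. It is illustrative only: OpenSearch calculates it internally, and the `_score` it returns is derived from this measure rather than being guaranteed to equal the raw value.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors of equal length."""
    if len(a) != len(b):
        raise ValueError("Vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # convention chosen here for zero vectors
    return dot / (norm_a * norm_b)
```

Because the measure ignores vector length, two conversations that embed in the same direction score as similar regardless of magnitude.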

🚀 Production Deployment Patterns

Infrastructure as Code (CDK)

# AWS CDK Stack for Strands Agents Infrastructure
from aws_cdk import (
    Stack, Duration,
    aws_s3 as s3,
    aws_opensearchserverless as opensearch,
    aws_iam as iam,
    aws_lambda as lambda_,
    aws_events as events,
    aws_events_targets as targets
)

class StrandsAgentsInfraStack(Stack):
    def __init__(self, scope, construct_id, **kwargs):
        super().__init__(scope, construct_id, **kwargs)
        
        # S3 Bucket for conversation storage
        self.conversation_bucket = s3.Bucket(
            self, "ConversationBucket",
            bucket_name="strands-agents-conversations",
            encryption=s3.BucketEncryption.KMS_MANAGED,
            versioned=True,
            lifecycle_rules=[
                s3.LifecycleRule(
                    id="ConversationLifecycle",
                    enabled=True,
                    transitions=[
                        s3.Transition(
                            storage_class=s3.StorageClass.INFREQUENT_ACCESS,
                            transition_after=Duration.days(30)
                        ),
                        s3.Transition(
                            storage_class=s3.StorageClass.GLACIER,
                            transition_after=Duration.days(90)
                        )
                    ]
                )
            ]
        )
        
        # OpenSearch Serverless Collection
        self.vector_collection = opensearch.CfnCollection(
            self, "VectorCollection",
            name="strands-agents-vectors",
            type="VECTORSEARCH",
            description="Vector storage for agent conversations"
        )
        
        # IAM Role for Bedrock Knowledge Base
        self.kb_role = iam.Role(
            self, "BedrockKBRole",
            assumed_by=iam.ServicePrincipal("bedrock.amazonaws.com"),
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name(
                    "AmazonBedrockFullAccess"
                )
            ]
        )
        
        # Grant S3 read/write; OpenSearch Serverless access is not granted here
        # and needs its own data access policy
        self.conversation_bucket.grant_read_write(self.kb_role)
        
        # Lambda for agent coordination
        self.coordination_lambda = lambda_.Function(
            self, "AgentCoordinator",
            runtime=lambda_.Runtime.PYTHON_3_11,
            handler="coordinator.handler",
            code=lambda_.Code.from_asset("lambda"),
            environment={
                "CONVERSATION_BUCKET": self.conversation_bucket.bucket_name,
                "VECTOR_COLLECTION": self.vector_collection.name,
                "KB_ROLE_ARN": self.kb_role.role_arn
            },
            timeout=Duration.minutes(15)
        )
        
        # EventBridge rule for scheduled coordination
        coordination_rule = events.Rule(
            self, "CoordinationSchedule",
            schedule=events.Schedule.cron(hour="8", minute="0")
        )
        
        coordination_rule.add_target(
            targets.LambdaFunction(self.coordination_lambda)
        )

Monitoring & Observability

# CloudWatch Integration
import json
import os
from datetime import datetime
from typing import Dict

import boto3

class AWSMonitoring:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.logs = boto3.client('logs')
    
    def emit_agent_metrics(self, agent_id: str, metrics: Dict[str, float]):
        """Emit custom metrics to CloudWatch"""
        
        metric_data = []
        
        for metric_name, value in metrics.items():
            metric_data.append({
                'MetricName': metric_name,
                'Dimensions': [
                    {'Name': 'AgentId', 'Value': agent_id},
                    {'Name': 'Environment', 'Value': os.environ.get('ENVIRONMENT', 'dev')}
                ],
                'Value': value,
                'Unit': 'Count',
                'Timestamp': datetime.utcnow()
            })
        
        self.cloudwatch.put_metric_data(
            Namespace='StrandsAgents',
            MetricData=metric_data
        )
    
    def create_agent_dashboard(self) -> str:
        """Create CloudWatch dashboard for agent monitoring"""
        
        dashboard_body = {
            "widgets": [
                {
                    "type": "metric",
                    "properties": {
                        "metrics": [
                            ["StrandsAgents", "ConversationCount", "Environment", "production"],
                            [".", "ErrorRate", ".", "."],
                            [".", "ResponseTime", ".", "."]
                        ],
                        "period": 300,
                        "stat": "Average",
                        "region": "us-west-2",
                        "title": "Agent Performance Metrics"
                    }
                }
            ]
        }
        
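        dashboard_name = 'StrandsAgents-Production'
        
        # put_dashboard returns only validation messages, so the ARN is fetched separately
        self.cloudwatch.put_dashboard(
            DashboardName=dashboard_name,
            DashboardBody=json.dumps(dashboard_body)
        )
        
        return self.cloudwatch.get_dashboard(DashboardName=dashboard_name)['DashboardArn']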
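
The `emit_agent_metrics` method above tags every value with the `Count` unit, which reads oddly for latency or ratio metrics. A minimal sketch of building the `MetricData` list with a per-metric unit follows. The `METRIC_UNITS` mapping and the `build_metric_data` helper are hypothetical names, and the metric names are the ones used in the dashboard definition above.

```python
from datetime import datetime, timezone
from typing import Dict, List

# Hypothetical mapping from metric name to a CloudWatch unit.
METRIC_UNITS: Dict[str, str] = {
    "ConversationCount": "Count",
    "ErrorRate": "Percent",
    "ResponseTime": "Milliseconds",
}

def build_metric_data(agent_id: str, environment: str,
                      metrics: Dict[str, float]) -> List[dict]:
    """Build a MetricData list for put_metric_data with per-metric units."""
    now = datetime.now(timezone.utc)
    return [
        {
            "MetricName": name,
            "Dimensions": [
                {"Name": "AgentId", "Value": agent_id},
                {"Name": "Environment", "Value": environment},
            ],
            "Value": float(value),
            # Metrics without a known unit fall back to the unitless "None".
            "Unit": METRIC_UNITS.get(name, "None"),
            "Timestamp": now,
        }
        for name, value in metrics.items()
    ]
```

The returned list can be passed as `MetricData` to `put_metric_data`. CloudWatch treats each distinct set of dimensions as a separate metric, so a dashboard widget that filters only on `Environment` will likely not display values published with both `AgentId` and `Environment`.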

Decision Matrix & Best Practices

🎯 Integration Pattern Decision Matrix

When to Use Each Integration Pattern

| Use Case | MCP | A2A | AWS | External | Rationale |
|----------|-----|-----|-----|----------|-----------|
| Local Tool Integration | ✅ Primary | ❌ | ⚠️ Optional | ❌ | MCP stdio transport ideal for local tools |
| Multi-Agent Coordination | ⚠️ Helper | ✅ Primary | ⚠️ Infrastructure | ❌ | A2A enables proper orchestration patterns |
| Cloud-Native Deployment | ⚠️ Helper | ⚠️ Optional | ✅ Primary | ❌ | AWS services provide scalability & reliability |
| Enterprise API Integration | ❌ | ❌ | ⚠️ Auth/Config | ✅ Primary | External patterns handle diverse protocols |
| Real-time Collaboration | ❌ | ✅ Primary | ⚠️ Infrastructure | ⚠️ WebSocket | A2A + real-time protocols |
| Batch Processing | ❌ | ⚠️ Optional | ✅ Primary | ❌ | AWS services excel at batch workloads |
| Edge Computing | ✅ Primary | ❌ | ❌ | ⚠️ Optional | MCP works well with local/edge resources |
| Legacy System Integration | ❌ | ❌ | ❌ | ✅ Primary | External patterns handle legacy protocols |
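
The matrix above can also be consulted programmatically. The following is a minimal sketch that transcribes only the "Primary" column for each use case; the `PRIMARY_PATTERN` table and the `primary_pattern` function are hypothetical names introduced for illustration.

```python
from typing import Dict, Optional

# Primary pattern per use case, transcribed from the decision matrix above.
PRIMARY_PATTERN: Dict[str, str] = {
    "local_tool_integration": "MCP",
    "multi_agent_coordination": "A2A",
    "cloud_native_deployment": "AWS",
    "enterprise_api_integration": "External",
    "real_time_collaboration": "A2A",
    "batch_processing": "AWS",
    "edge_computing": "MCP",
    "legacy_system_integration": "External",
}

def primary_pattern(use_case: str) -> Optional[str]:
    """Return the primary integration pattern for a use case, or None if unknown."""
    # Normalize "Real-time Collaboration" style labels to dictionary keys.
    key = use_case.strip().lower().replace("-", "_").replace(" ", "_")
    return PRIMARY_PATTERN.get(key)
```

For example, `primary_pattern("Edge Computing")` looks up the `edge_computing` row. Helper and optional patterns from the other columns are deliberately left out of this sketch.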

📊 Technology Selection Framework

MCP Server Selection

# MCP Server Selection Decision Tree
from typing import Any, Dict

class MCPServerSelector:
    """Decision framework for MCP server selection"""
    
    def select_transport(self, requirements: Dict[str, Any]) -> str:
        """Select optimal MCP transport based on requirements"""
        
        # Decision tree for transport selection
        if requirements.get('local_execution', False):
            return 'stdio'  # Local process communication
        
        elif requirements.get('real_time', False):
            return 'sse'  # Server-Sent Events for real-time
        
        elif requirements.get('high_throughput', False):
            return 'streamable_http'  # HTTP for high throughput
        
        else:
            return 'stdio'  # Default to stdio for simplicity
    
    def select_server_configuration(self, use_case: str) -> Dict[str, Any]:
        """Recommend server configuration based on use case"""
        
        configurations = {
            'documentation': {
                'server': 'strands-agents-mcp-server',
                'transport': 'stdio',
                'command': 'uvx',
                'args': ['strands-agents-mcp-server'],
                'description': 'Strands documentation access'
            },
            
            'filesystem': {
                'server': 'filesystem-mcp-server',
                'transport': 'stdio', 
                'command': 'npx',
                'args': ['-y', '@modelcontextprotocol/server-filesystem', '/workspace'],
                'description': 'File system operations'
            },
            
            'web_api': {
                'server': 'custom-api-server',
                'transport': 'streamable_http',
                'url': 'https://api.example.com/mcp',
                'headers': {'Authorization': 'Bearer ${API_TOKEN}'},
                'description': 'Web API integration'
            },
            
            'database': {
                'server': 'database-mcp-server',
                'transport': 'stdio',
                'command': 'python',
                'args': ['-m', 'mcp_database_server'],
                'env': {'DATABASE_URL': '${DATABASE_URL}'},
                'description': 'Database operations'
            }
        }
        
        return configurations.get(use_case, configurations['documentation'])
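
The `web_api` and `database` configurations above contain `${API_TOKEN}` and `${DATABASE_URL}` placeholders, which need to be resolved before the configuration is handed to an MCP client. One way to do this, sketched here with the standard library only, is a recursive substitution pass. The `resolve_placeholders` helper is a hypothetical name, not part of Strands or MCP.

```python
import os
from string import Template
from typing import Any, Mapping, Optional

def resolve_placeholders(value: Any, env: Optional[Mapping[str, str]] = None) -> Any:
    """Recursively replace ${VAR} placeholders in strings, lists, and dicts."""
    env = os.environ if env is None else env
    if isinstance(value, str):
        # substitute() raises KeyError when a referenced variable is missing,
        # which surfaces configuration mistakes early.
        return Template(value).substitute(env)
    if isinstance(value, list):
        return [resolve_placeholders(item, env) for item in value]
    if isinstance(value, dict):
        return {key: resolve_placeholders(item, env) for key, item in value.items()}
    return value
```

Failing on a missing variable is a design choice: a literal `${API_TOKEN}` sent as a bearer token tends to produce a confusing authentication error later. Strings that contain a literal `$` would need escaping as `$$` with this approach.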

A2A Pattern Selection

# A2A Pattern Selection Framework
from typing import Any, Dict

class A2APatternSelector:
    """Framework for selecting appropriate A2A patterns"""
    
    def select_coordination_pattern(self, scenario: Dict[str, Any]) -> str:
        """Select coordination pattern based on scenario requirements"""
        
        complexity = scenario.get('task_complexity', 'simple')
        agent_count = scenario.get('agent_count', 1)
        latency_requirement = scenario.get('latency_requirement', 'standard')
        
        # Check latency first; otherwise the complexity checks shadow the real-time branch
        if latency_requirement == 'real_time':
            return 'streaming_coordination'
        
        elif complexity == 'simple' and agent_count <= 2:
            return 'direct_messaging'
        
        elif complexity == 'medium' and agent_count <= 5:
            return 'orchestrator_pattern'
        
        elif complexity == 'high' or agent_count > 5:
            return 'event_driven_workflow'
        
        else:
            return 'orchestrator_pattern'  # Safe default
    
    def get_pattern_implementation(self, pattern: str) -> Dict[str, Any]:
        """Get implementation details for selected pattern"""
        
        implementations = {
            'direct_messaging': {
                'transport': 'github_api',
                'message_format': 'json',
                'coordination_overhead': 'low',
                'suitable_for': ['simple delegation', 'request/response']
            },
            
            'orchestrator_pattern': {
                'transport': 'github_workflow_dispatch',
                'coordination_mechanism': 'central_orchestrator',
                'message_format': 'structured_json',
                'suitable_for': ['complex workflows', 'dependency management']
            },
            
            'event_driven_workflow': {
                'transport': 'temporal_workflow',
                'coordination_mechanism': 'event_sourcing',
                'state_management': 'durable',
                'suitable_for': ['long-running processes', 'fault tolerance']
            },
            
            'streaming_coordination': {
                'transport': 'websocket',
                'coordination_mechanism': 'real_time_messaging',
                'latency': 'sub_second',
                'suitable_for': ['real-time collaboration', 'live updates']
            }
        }
        
        return implementations.get(pattern, implementations['orchestrator_pattern'])

πŸ—οΈ Architecture Decision Records (ADRs)

ADR-001: MCP as Primary Tool Integration

Status: ✅ Accepted

Context: Need standardized approach for tool integration

Decision: Use MCP (Model Context Protocol) as primary integration method for external tools and services.

Consequences:

  • ✅ Standardized integration pattern
  • ✅ Multi-transport support (stdio, HTTP, SSE)
  • ✅ Easy local development
  • ❌ Additional complexity for simple tools
  • ❌ Dependency on MCP ecosystem

Implementation:

# Standard MCP integration pattern
mcp_config = {
    "mcpServers": {
        "primary-tool": {
            "command": "uvx",
            "args": ["tool-mcp-server"]
        }
    }
}

ADR-002: GitHub Actions for A2A Communication

Status: ✅ Accepted

Context: Need reliable agent-to-agent communication mechanism

Decision: Use GitHub Actions workflow dispatch as primary A2A communication transport.

Consequences:

  • ✅ Built-in authentication & authorization
  • ✅ Audit trail and observability
  • ✅ Scalable execution infrastructure
  • ❌ GitHub API rate limits
  • ❌ Latency not suitable for real-time use cases

Implementation:

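# A2A communication via GitHub Actions
import os
import requests
from typing import Dict

def dispatch_subagent(repository: str, task: str, config: Dict) -> bool:
    github_token = os.environ["GITHUB_TOKEN"]  # assumed to be set in the environment
    response = requests.post(
        f"https://api.github.com/repos/{repository}/actions/workflows/agent.yml/dispatches",
        headers={"Authorization": f"token {github_token}"},
        json={"ref": "main", "inputs": {**config, "task": task}},  # assumes agent.yml declares a `task` input
        timeout=30,
    )
    return response.status_code == 204  # GitHub answers 204 No Content on success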
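
Workflow dispatch inputs are flat key/value pairs, and the `github.event.inputs` context exposes their values as strings, so nested configuration does not map onto them directly. One option, sketched below, is to pass the whole configuration as a single JSON-encoded input. The `build_dispatch_inputs` helper is hypothetical, and the sketch assumes the target workflow declares `task` and `config` inputs.

```python
import json
from typing import Any, Dict

def build_dispatch_inputs(task: str, config: Dict[str, Any]) -> Dict[str, str]:
    """Flatten a task and its config into string-valued workflow inputs."""
    if not task.strip():
        raise ValueError("task must be a non-empty string")
    return {
        "task": task,
        # Nested configuration travels as one JSON string; the sub-agent decodes it.
        "config": json.dumps(config, sort_keys=True),
    }
```

The sub-agent workflow would then call `json.loads` on the `config` input. This keeps the number of declared inputs small, at the cost of losing per-field validation in the workflow definition.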

ADR-003: AWS as Cloud Infrastructure Provider

Status: ✅ Accepted

Context: Need scalable, secure cloud infrastructure

Decision: Use AWS as primary cloud provider for production deployments.

Consequences:

  • ✅ Comprehensive AI/ML service ecosystem (Bedrock, SageMaker)
  • ✅ Enterprise security and compliance features
  • ✅ Mature tooling and SDKs
  • ❌ Vendor lock-in
  • ❌ Complexity of service configuration

Implementation:

# AWS integration pattern
aws_config = {
    "compute": "ecs_fargate",
    "storage": "s3",
    "ai_services": "bedrock",
    "search": "opensearch_serverless",
    "monitoring": "cloudwatch"
}

📋 Best Practices Checklist

Development Phase

  • MCP Configuration

    • Define MCP servers in mcp.json
    • Test stdio transport locally
    • Implement retry logic for network transports
    • Add health checks for MCP connections
  • A2A Implementation

    • Design clear agent responsibilities
    • Implement coordination patterns
    • Add message validation
    • Test failure scenarios
  • AWS Integration

    • Configure IAM roles with least privilege
    • Implement S3 lifecycle policies
    • Set up Bedrock knowledge base
    • Configure monitoring and alerting
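
The "retry logic for network transports" item in the checklist above can be sketched as a small wrapper with exponential backoff and jitter. This is an illustrative helper, not part of Strands or MCP: `call_with_retry`, its parameters, and the choice of retryable exceptions are all assumptions.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")

def call_with_retry(
    operation: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation, retrying on transient errors with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error to the caller.
            # The delay doubles on each attempt; jitter spreads out simultaneous retries.
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay / 2))
    raise ValueError("attempts must be at least 1")
```

Only errors listed in `retry_on` are retried, so programming errors such as `KeyError` still fail immediately. The `sleep` parameter exists so tests can run without real delays.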

Production Deployment

  • Security

    • Enable encryption at rest and in transit
    • Implement authentication and authorization
    • Set up audit logging
    • Configure network security groups
  • Monitoring

    • Set up application metrics
    • Configure distributed tracing
    • Implement health checks
    • Set up alerting rules
  • Scalability

    • Configure auto-scaling policies
    • Implement connection pooling
    • Set up load balancing
    • Plan capacity requirements

Operations

  • Maintenance

    • Schedule regular dependency updates
    • Monitor resource utilization
    • Review and rotate secrets
    • Update documentation
  • Incident Response

    • Define runbooks for common issues
    • Set up on-call procedures
    • Test backup and recovery
    • Document escalation paths

🚀 Quick Start Templates

Minimal Production Setup

# 1. Clone and configure
git clone <your-strands-agents-repo>
cd strands-agents

# 2. Set environment variables
export STRANDS_PROVIDER="bedrock"
export STRANDS_MODEL_ID="us.anthropic.claude-sonnet-4-20250514-v1:0"
export MCP_SERVERS='{"mcpServers":{"docs":{"command":"uvx","args":["strands-agents-mcp-server"]}}}'

# 3. Install dependencies
uv pip install -r requirements.txt

# 4. Test locally
python agent_runner.py "Hello, test the integration"

# 5. Deploy to production
# Configure secrets in GitHub repository
# Push to main branch to trigger deployment

Enterprise Setup

# 1. Infrastructure setup
aws cloudformation deploy \
  --template-file infrastructure/strands-agents-stack.yaml \
  --stack-name strands-agents-prod \
  --capabilities CAPABILITY_IAM

# 2. Configure secrets
aws secretsmanager create-secret \
  --name "production/strands-agents/config" \
  --secret-string file://config/production-secrets.json

# 3. Deploy application
docker build -t strands-agents:production .
docker tag strands-agents:production $ECR_REPOSITORY:latest
docker push $ECR_REPOSITORY:latest

# 4. Update ECS service
aws ecs update-service \
  --cluster strands-agents-prod \
  --service strands-agents-api \
  --force-new-deployment

🔧 Troubleshooting Guide

Common Integration Issues

| Issue | Symptoms | Solution |
|-------|----------|----------|
| MCP Connection Failed | Tools not loading, timeout errors | Check command availability, verify permissions |
| A2A Communication Failed | Subagents not responding | Verify GitHub token permissions, check rate limits |
| AWS Service Errors | Authentication failures | Review IAM roles, check region configuration |
| High Latency | Slow response times | Implement caching, optimize queries |
| Memory Issues | OOM errors, crashes | Monitor memory usage, implement limits |
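
For the "MCP Connection Failed" row, checking command availability can be automated. The sketch below scans an `mcpServers` configuration of the shape used in this guide and reports servers whose launcher is not on `PATH`. The `missing_mcp_commands` function is a hypothetical helper.

```python
import shutil
from typing import Dict, List

def missing_mcp_commands(mcp_config: Dict[str, dict]) -> List[str]:
    """Return names of stdio MCP servers whose command is not found on PATH."""
    missing = []
    for name, server in mcp_config.get("mcpServers", {}).items():
        command = server.get("command")
        # Servers without a command (for example URL-based ones) are skipped.
        if command and shutil.which(command) is None:
            missing.append(name)
    return missing
```

An empty list means every configured launcher (such as `uvx` or `npx`) was found. It does not prove the server itself starts correctly, only that the executable exists.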

Debug Commands

# Test MCP connections
python -c "from tools.mcp_client import mcp_client; print(mcp_client('list_connections', connection_id=None))"

# Check AWS connectivity
aws sts get-caller-identity
aws s3 ls

# Verify agent configuration
python -c "from agent_runner import get_tools; print(list(get_tools().keys()))"

# Test health endpoints
curl http://localhost:8080/health
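
The `curl` command above assumes the agent process exposes a `/health` endpoint on port 8080. If none exists yet, a minimal sketch using only the standard library could look like this. The handler, the payload fields, and the `serve` function are illustrative assumptions, not part of Strands.

```python
import json
import time
from http.server import BaseHTTPRequestHandler, HTTPServer

START_TIME = time.time()

def health_payload() -> dict:
    """Build the body returned by the health endpoint."""
    return {"status": "ok", "uptime_seconds": round(time.time() - START_TIME, 1)}

class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Only /health is served; everything else is a 404.
        if self.path != "/health":
            self.send_error(404)
            return
        body = json.dumps(health_payload()).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

def serve(port: int = 8080) -> None:
    """Serve the health endpoint until interrupted."""
    HTTPServer(("0.0.0.0", port), HealthHandler).serve_forever()
```

Calling `serve()` blocks, so in practice it would run in a background thread or a separate process next to the agent. A production check would usually also verify dependencies such as MCP connections rather than report a static status.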

Next Steps: Choose the integration patterns that best fit your requirements and follow the implementation guides. Start with MCP for tool integration, then add A2A coordination and AWS services as needed.

Questions or Issues?

  • Review the troubleshooting guide
  • Check the Strands Agents documentation
  • Open an issue in the GitHub repository

External System Integration

🌐 Integration Architecture

Strands Agents supports diverse external system integrations through standardized patterns and protocols.

graph TB
    subgraph "Strands Agent Core"
        Agent[Agent Runtime]
        ToolRegistry[Tool Registry]
        ProtocolAdapters[Protocol Adapters]
    end
    
    subgraph "Framework Integrations"
        LangGraph[LangGraph]
        CrewAI[CrewAI]
        AutoGen[AutoGen]
        Haystack[Haystack]
    end
    
    subgraph "Workflow Systems"
        Temporal[Temporal.io]
        Airflow[Apache Airflow]
        Prefect[Prefect]
        GitHub[GitHub Actions]
    end
    
    subgraph "External APIs"
        REST[REST APIs]
        GraphQL[GraphQL]
        WebSocket[WebSocket]
        gRPC[gRPC Services]
    end
    
    subgraph "Specialized Systems"
        MLX[Apple MLX]
        BitChat[P2P Networks]
        Slack[Slack]
        Discord[Discord]
    end
    
    Agent --> ToolRegistry
    ToolRegistry --> ProtocolAdapters
    ProtocolAdapters --> LangGraph
    ProtocolAdapters --> Temporal
    ProtocolAdapters --> REST
    ProtocolAdapters --> MLX

🔧 Implementation Patterns

1. Framework Compatibility Layer

# Framework Integration Adapter
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
import asyncio
import json

class FrameworkAdapter(ABC):
    """Base adapter for external framework integration"""
    
    @abstractmethod
    async def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Execute task in external framework"""
        pass
    
    @abstractmethod
    def convert_to_framework_format(self, strands_task: Dict) -> Any:
        """Convert Strands task to framework-specific format"""
        pass
    
    @abstractmethod
    def convert_from_framework_format(self, framework_result: Any) -> Dict:
        """Convert framework result back to Strands format"""
        pass

class LangGraphAdapter(FrameworkAdapter):
    """LangGraph integration adapter"""
    
    def __init__(self, graph_config: Dict[str, Any]):
        self.graph_config = graph_config
        self.langgraph_app = None
        self._initialize_graph()
    
    def _initialize_graph(self):
        """Initialize LangGraph application"""
        try:
            from langgraph.graph import StateGraph
            from langchain_core.messages import HumanMessage
            
            # Create LangGraph workflow
            workflow = StateGraph(dict)
            
            # Add nodes based on configuration
            for node_name, node_config in self.graph_config.get('nodes', {}).items():
                workflow.add_node(node_name, self._create_node_function(node_config))
            
            # Add edges
            for edge in self.graph_config.get('edges', []):
                workflow.add_edge(edge['from'], edge['to'])
            
            # Set entry point
            workflow.set_entry_point(self.graph_config.get('entry_point', 'start'))
            workflow.set_finish_point(self.graph_config.get('finish_point', 'end'))
            
            self.langgraph_app = workflow.compile()
            
        except ImportError:
            raise ImportError("LangGraph not installed. Install with: pip install langgraph")
    
    def _create_node_function(self, node_config: Dict):
        """Create a LangGraph node function from configuration"""
        
        def node_function(state: Dict) -> Dict:
            # Execute Strands tool or agent based on configuration
            tool_name = node_config.get('tool')
            if tool_name:
                # Execute Strands tool
                result = self._execute_strands_tool(tool_name, state)
                return {**state, 'result': result}
            
            # Default processing
            return state
        
        return node_function
    
    async def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Execute task using LangGraph"""
        
        try:
            # Convert to LangGraph format
            graph_input = self.convert_to_framework_format(task)
            
            # Execute the graph
            result = await self.langgraph_app.ainvoke(graph_input)
            
            # Convert back to Strands format
            return self.convert_from_framework_format(result)
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "framework": "langgraph"
            }
    
    def convert_to_framework_format(self, strands_task: Dict) -> Dict:
        """Convert Strands task to LangGraph state format"""
        return {
            "input": strands_task.get("query", ""),
            "context": strands_task.get("context", {}),
            "tools": strands_task.get("available_tools", [])
        }
    
    def convert_from_framework_format(self, graph_result: Dict) -> Dict:
        """Convert LangGraph result to Strands format"""
        return {
            "status": "success",
            "result": graph_result.get("result", "No result"),
            "framework": "langgraph",
            "execution_path": graph_result.get("path", [])
        }

class CrewAIAdapter(FrameworkAdapter):
    """CrewAI integration adapter"""
    
    def __init__(self, crew_config: Dict[str, Any]):
        self.crew_config = crew_config
        self.crew = None
        self._initialize_crew()
    
    def _initialize_crew(self):
        """Initialize CrewAI crew"""
        try:
            from crewai import Crew, Agent as CrewAgent, Task
            
            # Create agents from configuration
            agents = []
            for agent_config in self.crew_config.get('agents', []):
                agent = CrewAgent(
                    role=agent_config['role'],
                    goal=agent_config['goal'],
                    backstory=agent_config.get('backstory', ''),
                    verbose=True
                )
                agents.append(agent)
            
            # Create tasks
            tasks = []
            for task_config in self.crew_config.get('tasks', []):
                task = Task(
                    description=task_config['description'],
                    agent=agents[task_config.get('agent_index', 0)]
                )
                tasks.append(task)
            
            # Initialize crew
            self.crew = Crew(
                agents=agents,
                tasks=tasks,
                verbose=True
            )
            
        except ImportError:
            raise ImportError("CrewAI not installed. Install with: pip install crewai")
    
    async def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Execute task using CrewAI"""
        
        try:
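            # Pass the current query as an input; task descriptions can reference it as {query}
            query = task.get("query", "")
            
            # Execute crew in a worker thread so the blocking call does not stall the event loop
            result = await asyncio.to_thread(self.crew.kickoff, inputs={"query": query})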
            
            return {
                "status": "success",
                "result": str(result),
                "framework": "crewai"
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "framework": "crewai"
            }
    
    def convert_to_framework_format(self, strands_task: Dict) -> Dict:
        """Convert Strands task to CrewAI format"""
        return strands_task  # CrewAI uses flexible dict format
    
    def convert_from_framework_format(self, crew_result: Any) -> Dict:
        """Convert CrewAI result to Strands format"""
        return {
            "status": "success",
            "result": str(crew_result),
            "framework": "crewai"
        }

2. Temporal.io Integration

# Temporal Workflow Integration
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.worker import Worker
from datetime import timedelta
from typing import Any, Dict
import asyncio
import time

@workflow.defn
class AgentWorkflow:
    """Temporal workflow for long-running agent tasks"""
    
    @workflow.run
    async def run(self, agent_config: Dict[str, Any]) -> Dict[str, Any]:
        """Main workflow execution"""
        
        # Step 1: Initialize agent environment
        setup_result = await workflow.execute_activity(
            setup_agent_environment,
            agent_config,
            start_to_close_timeout=timedelta(minutes=5)
        )
        
        if setup_result["status"] != "success":
            return setup_result
        
        # Step 2: Execute main agent task
        execution_result = await workflow.execute_activity(
            execute_agent_task,
            {
                "task": agent_config["task"],
                "context": setup_result["context"]
            },
            start_to_close_timeout=timedelta(hours=1),
            # RetryPolicy lives in temporalio.common, not in the workflow module
            retry_policy=RetryPolicy(
                maximum_attempts=3,
                initial_interval=timedelta(seconds=10)
            )
        )
        
        # Step 3: Process results and cleanup
        cleanup_result = await workflow.execute_activity(
            cleanup_agent_environment,
            {
                "execution_result": execution_result,
                "agent_id": agent_config["agent_id"]
            },
            start_to_close_timeout=timedelta(minutes=5)
        )
        
        return {
            "workflow_id": workflow.info().workflow_id,
            "execution_result": execution_result,
            "cleanup_status": cleanup_result["status"]
        }

@activity.defn
async def setup_agent_environment(config: Dict[str, Any]) -> Dict[str, Any]:
    """Set up agent execution environment"""
    
    try:
        # Initialize Strands Agent
        from strands import Agent
        from strands_tools.utils.models.model import create_model
        
        model = create_model(
            provider=config.get("provider", "bedrock"),
            model_id=config.get("model_id")
        )
        
        # Load tools
        tools = load_tools_for_agent(config.get("tools", []))
        
        # Create agent
        agent = Agent(
            model=model,
            system_prompt=config.get("system_prompt", ""),
            tools=tools
        )
        
        return {
            "status": "success",
            "context": {
                "agent_id": config["agent_id"],
                "model_config": config.get("model_config", {}),
                "environment": "temporal"
            }
        }
        
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

@activity.defn
async def execute_agent_task(task_config: Dict[str, Any]) -> Dict[str, Any]:
    """Execute the main agent task"""
    
    try:
        # Re-create agent (activities are stateless)
        agent = recreate_agent_from_context(task_config["context"])
        
        # Execute task with progress tracking
        task_result = agent(task_config["task"])
        
        return {
            "status": "success",
            "result": str(task_result),  # convert to text so Temporal can serialize the result
            "execution_time": time.time() - task_config.get("start_time", time.time())
        }
        
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

@activity.defn
async def cleanup_agent_environment(cleanup_config: Dict[str, Any]) -> Dict[str, Any]:
    """Clean up agent resources"""
    
    try:
        # Save execution artifacts
        if cleanup_config["execution_result"]["status"] == "success":
            save_agent_artifacts(cleanup_config)
        
        # Clean up temporary resources
        cleanup_temporary_files(cleanup_config["agent_id"])
        
        return {"status": "success"}
        
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

class TemporalAgentManager:
    """Manager for Temporal-based agent execution"""
    
    def __init__(self, temporal_address: str = "localhost:7233"):
        self.temporal_address = temporal_address
        self.client = None
    
    async def initialize(self):
        """Initialize Temporal client"""
        self.client = await Client.connect(self.temporal_address)
    
    async def start_agent_workflow(self, agent_config: Dict[str, Any]) -> str:
        """Start a new agent workflow"""
        
        if not self.client:
            await self.initialize()
        
        workflow_id = f"agent_{agent_config['agent_id']}_{int(time.time())}"
        
        handle = await self.client.start_workflow(
            AgentWorkflow.run,
            agent_config,
            id=workflow_id,
            task_queue="agent-task-queue"
        )
        
        return handle.id
    
    async def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
        """Get workflow execution status"""
        
        try:
            if not self.client:
                await self.initialize()
            
            handle = self.client.get_workflow_handle(workflow_id)
            # Note: result() waits until the workflow has finished
            result = await handle.result()
            
            return {
                "status": "completed",
                "result": result
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }

3. Apple MLX Integration

# Apple MLX Integration for local AI inference
class MLXIntegration:
    """Integration with Apple MLX for local model inference"""
    
    def __init__(self):
        self.models = {}
        self.tokenizers = {}
        self._check_mlx_availability()
    
    def _check_mlx_availability(self):
        """Check if MLX is available (Apple Silicon required)"""
        try:
            import mlx.core as mx
            import mlx.nn as nn
            self.mlx_available = True
        except ImportError:
            self.mlx_available = False
            print("MLX not available. Install with: pip install mlx")
    
    def load_model(self, model_path: str, model_name: str) -> bool:
        """Load MLX model for local inference"""
        
        if not self.mlx_available:
            return False
        
        try:
            import mlx.core as mx
            from mlx_lm import load, generate
            
            # Load model and tokenizer
            model, tokenizer = load(model_path)
            
            self.models[model_name] = model
            self.tokenizers[model_name] = tokenizer
            
            return True
            
        except Exception as e:
            print(f"Failed to load MLX model {model_name}: {e}")
            return False
    
    def generate_response(self, model_name: str, prompt: str, 
                         max_tokens: int = 512, temperature: float = 0.7) -> str:
        """Generate response using MLX model"""
        
        if model_name not in self.models:
            raise ValueError(f"Model {model_name} not loaded")
        
        try:
            from mlx_lm import generate
            
            model = self.models[model_name]
            tokenizer = self.tokenizers[model_name]
            
            # Generate response
            response = generate(
                model, tokenizer, prompt,
                max_tokens=max_tokens,
                temp=temperature
            )
            
            return response
            
        except Exception as e:
            raise RuntimeError(f"MLX generation failed: {e}")

# MLX Tool for Strands Agents
import os
from typing import Any, Dict

from strands import tool

@tool
def mlx_generate(
    prompt: str,
    model_name: str = "default",
    max_tokens: int = 512,
    temperature: float = 0.7
) -> Dict[str, Any]:
    """Generate text using local MLX models"""
    
    try:
        # Initialize MLX integration
        mlx_integration = MLXIntegration()
        
        # Check if model is loaded
        if model_name not in mlx_integration.models:
            # Try to load default model
            model_path = os.environ.get('MLX_MODEL_PATH', 'mlx-community/Llama-3.2-3B-Instruct-4bit')
            mlx_integration.load_model(model_path, model_name)
        
        # Generate response
        response = mlx_integration.generate_response(
            model_name, prompt, max_tokens, temperature
        )
        
        return {
            "status": "success",
            "response": response,
            "model": model_name,
            "inference_type": "local_mlx"
        }
        
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

4. P2P Network Integration (BitChat)

# P2P Mesh Network Integration
import asyncio
import json
import os
from typing import Any, Dict, List

from strands import tool

class P2PMeshIntegration:
    """Integration with P2P mesh networks for distributed agent communication"""
    
    def __init__(self, node_config: Dict[str, Any]):
        self.node_config = node_config
        self.peer_connections = {}
        self.message_handlers = {}
        self._initialize_node()
    
    def _initialize_node(self):
        """Initialize P2P node"""
        try:
            # Initialize libp2p or similar P2P library
            import asyncio
            from libp2p import new_host
            from libp2p.network.stream.net_stream_interface import INetStream
            from libp2p.peer.peerinfo import info_from_p2p_addr
            from multiaddr import Multiaddr
            
            self.host = None
            self.loop = asyncio.new_event_loop()
            
        except ImportError:
            print("P2P libraries not available. Install libp2p-py for P2P support")
    
    async def start_node(self):
        """Start P2P node"""
        try:
            from libp2p import new_host
            from multiaddr import Multiaddr
            
            self.host = new_host()
            await self.host.get_network().listen(
                Multiaddr(self.node_config.get('listen_addr', '/ip4/127.0.0.1/tcp/0'))
            )
            
            # Set up message handling
            await self.host.set_stream_handler(
                "/strands-agent/1.0.0",
                self._handle_incoming_stream
            )
            
            print(f"P2P node started. ID: {self.host.get_id()}")
            
        except Exception as e:
            print(f"Failed to start P2P node: {e}")
    
    async def _handle_incoming_stream(self, stream):
        """Handle incoming P2P messages"""
        try:
            # Read message
            message_bytes = await stream.read()
            message = json.loads(message_bytes.decode())
            
            # Process message based on type
            message_type = message.get('type')
            if message_type in self.message_handlers:
                response = await self.message_handlers[message_type](message)
                
                # Send response
                await stream.write(json.dumps(response).encode())
            
            await stream.close()
            
        except Exception as e:
            print(f"Error handling P2P stream: {e}")
    
    def register_message_handler(self, message_type: str, handler_func):
        """Register handler for specific message types"""
        self.message_handlers[message_type] = handler_func
    
    async def send_agent_message(self, peer_id: str, message: Dict[str, Any]) -> Dict:
        """Send message to another agent in the network"""
        try:
            # Connect to peer
            stream = await self.host.new_stream(
                peer_id, 
                ["/strands-agent/1.0.0"]
            )
            
            # Send message
            await stream.write(json.dumps(message).encode())
            
            # Read response
            response_bytes = await stream.read()
            response = json.loads(response_bytes.decode())
            
            await stream.close()
            
            return response
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    async def discover_peers(self) -> List[str]:
        """Discover other agent nodes in the network"""
        try:
            # Use DHT or discovery mechanism
            # This is a simplified version
            peers = []
            
            # Query known bootstrap nodes
            bootstrap_nodes = self.node_config.get('bootstrap_nodes', [])
            
            for node_addr in bootstrap_nodes:
                try:
                    # Connect and query for peers
                    peer_info = info_from_p2p_addr(Multiaddr(node_addr))
                    # Add peer discovery logic here
                    peers.append(str(peer_info.peer_id))
                except Exception:
                    continue
            
            return peers
            
        except Exception as e:
            print(f"Peer discovery failed: {e}")
            return []

# P2P Agent Communication Tool
@tool
def p2p_send_message(
    target_agent_id: str,
    message: Dict[str, Any],
    message_type: str = "task_request"
) -> Dict[str, Any]:
    """Send message to another agent via P2P network"""
    
    try:
        # Initialize P2P integration
        p2p_config = {
            'listen_addr': '/ip4/0.0.0.0/tcp/0',
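            # Skip empty entries so an unset variable yields [] rather than ['']
            'bootstrap_nodes': [
                addr.strip()
                for addr in os.environ.get('P2P_BOOTSTRAP_NODES', '').split(',')
                if addr.strip()
            ]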
        }
        
        p2p_integration = P2PMeshIntegration(p2p_config)
        
        # Send message
        result = asyncio.run(
            p2p_integration.send_agent_message(
                target_agent_id, 
                {
                    'type': message_type,
                    'payload': message,
                    'timestamp': datetime.utcnow().isoformat()
                }
            )
        )
        
        return {
            "status": "success",
            "response": result,
            "transport": "p2p_mesh"
        }
        
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }
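
The stream handlers above read one JSON document per stream with a bare `stream.read()`, which only works if each stream carries exactly one message. If a stream is reused for several messages, some framing is needed. The following is a minimal sketch, assuming a 4-byte big-endian length prefix; the helper names `encode_frame` and `decode_frames` are hypothetical and not part of libp2p or Strands.

```python
import json
import struct
from typing import Any, Dict, List, Tuple

# Assumed convention: every message is preceded by its length as a 4-byte big-endian integer
HEADER = struct.Struct(">I")


def encode_frame(message: Dict[str, Any]) -> bytes:
    """Serialize a message as JSON and prefix it with its byte length."""
    body = json.dumps(message, separators=(",", ":")).encode("utf-8")
    return HEADER.pack(len(body)) + body


def decode_frames(buffer: bytes) -> Tuple[List[Dict[str, Any]], bytes]:
    """Return every complete message in buffer plus the unconsumed trailing bytes."""
    messages = []
    offset = 0
    while len(buffer) - offset >= HEADER.size:
        (length,) = HEADER.unpack_from(buffer, offset)
        start = offset + HEADER.size
        end = start + length
        if end > len(buffer):
            break  # incomplete frame: keep the bytes and wait for more
        messages.append(json.loads(buffer[start:end].decode("utf-8")))
        offset = end
    return messages, buffer[offset:]
```

A receiver would append each chunk it reads to a buffer, call `decode_frames`, dispatch the returned messages, and keep the leftover bytes for the next read.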

🔌 API Integration Patterns

REST API Integration

# Enhanced REST API Integration
class RESTAPIIntegration:
    """Standardized REST API integration with retry and caching"""
    
    def __init__(self, base_url: str, auth_config: Dict = None):
        self.base_url = base_url
        self.auth_config = auth_config or {}
        self.session = self._create_session()
        self.cache = {}  # Simple in-memory cache
    
    def _create_session(self) -> requests.Session:
        """Create configured requests session"""
        session = requests.Session()
        
        # Add authentication
        if self.auth_config.get('type') == 'bearer':
            session.headers.update({
                'Authorization': f"Bearer {self.auth_config['token']}"
            })
        elif self.auth_config.get('type') == 'api_key':
            session.headers.update({
                self.auth_config['header']: self.auth_config['key']
            })
        
        # Add retry strategy
        from requests.adapters import HTTPAdapter
        from urllib3.util.retry import Retry
        
        retry_strategy = Retry(
            total=3,
            status_forcelist=[429, 500, 502, 503, 504],
            # allowed_methods replaces method_whitelist, which was removed in urllib3 2.0
            allowed_methods=["HEAD", "GET", "OPTIONS"]
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        
        return session
    
    def get(self, endpoint: str, params: Dict = None, cache_ttl: int = 300) -> Dict:
        """GET request with caching"""
        
        cache_key = f"GET:{endpoint}:{json.dumps(params or {}, sort_keys=True)}"
        
        # Check cache
        if cache_key in self.cache:
            cached_data, timestamp = self.cache[cache_key]
            if time.time() - timestamp < cache_ttl:
                return cached_data
        
        # Make request
        response = self.session.get(
            f"{self.base_url}/{endpoint.lstrip('/')}",
            params=params,
            timeout=30
        )
        response.raise_for_status()
        
        result = response.json()
        
        # Cache result
        self.cache[cache_key] = (result, time.time())
        
        return result
    
    def post(self, endpoint: str, data: Dict = None, json_data: Dict = None) -> Dict:
        """POST request"""
        response = self.session.post(
            f"{self.base_url}/{endpoint.lstrip('/')}",
            data=data,
            json=json_data,
            timeout=60
        )
        response.raise_for_status()
        return response.json()
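
The in-memory cache used by `get` above never removes entries, so a long-running agent that queries many distinct endpoints will grow it without bound. A minimal sketch of a bounded alternative follows, assuming a hypothetical `TTLCache` helper with an injectable clock; it is an illustration, not part of the original integration.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Hashable, Optional


class TTLCache:
    """Small in-memory cache with per-entry expiry and a size bound."""

    def __init__(self, ttl: float = 300.0, max_entries: int = 256,
                 clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.max_entries = max_entries
        self.clock = clock
        self._entries: "OrderedDict[Hashable, tuple]" = OrderedDict()

    def get(self, key: Hashable) -> Optional[Any]:
        """Return the cached value, or None if it is missing or expired."""
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if self.clock() - stored_at >= self.ttl:
            del self._entries[key]  # expired: drop it so memory is reclaimed
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key: Hashable, value: Any) -> None:
        """Store a value and evict the least recently used entries if over the bound."""
        self._entries[key] = (value, self.clock())
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)
```

Because `get` returns `None` for a miss, this sketch cannot distinguish a cached `None` from an absent key; that is acceptable for JSON responses that are always objects.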

MCP (Model Context Protocol) Integration

🔌 Protocol Analysis

MCP enables standardized communication between Strands Agents and external tools/services through multiple transport layers.

Core Architecture

graph TB
    Agent[Strands Agent] --> MCPClient[MCP Client]
    MCPClient --> Transport{Transport Layer}
    Transport --> StdIO[stdio_client]
    Transport --> SSE[sse_client]
    Transport --> HTTP[streamable_http]
    
    StdIO --> LocalTool[Local Tools]
    SSE --> RemoteAPI[Remote APIs]
    HTTP --> WebService[Web Services]
    
    MCPClient --> ToolRegistry[Agent Tool Registry]
    ToolRegistry --> Wrapper[MCPToolWrapper]

🚀 Implementation Patterns

1. Multi-Transport MCP Client

# tools/mcp_client.py - Production Implementation
from datetime import timedelta
from threading import Lock

from mcp import StdioServerParameters, stdio_client
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client
from strands.tools.mcp import MCPClient

class ProductionMCPManager:
    def __init__(self):
        self.connections = {}
        self.connection_lock = Lock()
    
    def connect_stdio_server(self, connection_id: str, command: str, args: list):
        """Connect to stdio-based MCP server (local tools)"""
        client = MCPClient(
            lambda: stdio_client(
                StdioServerParameters(
                    command=command, 
                    args=args
                )
            )
        )
        
        with self.connection_lock:
            self.connections[connection_id] = {
                'client': client,
                'transport': 'stdio',
                'status': 'connected'
            }
        
        return client
    
    def connect_http_server(self, connection_id: str, url: str, headers: dict = None):
        """Connect to HTTP-based MCP server (web services)"""
        client = MCPClient(
            lambda: streamablehttp_client(
                url=url,
                headers=headers or {},
                timeout=timedelta(seconds=30)
            )
        )
        
        with self.connection_lock:
            self.connections[connection_id] = {
                'client': client,
                'transport': 'http',
                'status': 'connected'
            }
        
        return client

2. Configuration Management

// mcp_config.json - Enterprise Configuration
{
  "mcpServers": {
    "strands-docs": {
      "command": "uvx",
      "args": ["strands-agents-mcp-server"],
      "env": {
        "DOCS_API_KEY": "${DOCS_API_KEY}"
      }
    },
    "enterprise-api": {
      "url": "https://api.company.com/mcp",
      "headers": {
        "Authorization": "Bearer ${ENTERPRISE_TOKEN}"
      },
      "timeout": 60,
      "sse_read_timeout": 300
    },
    "filesystem": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"]
    }
  }
}
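
The configuration above relies on `${DOCS_API_KEY}` and `${ENTERPRISE_TOKEN}` placeholders. The source does not say which component expands them, so the following is a minimal sketch that assumes the application does it after loading the JSON; `expand_placeholders` is a hypothetical helper name.

```python
import os
import re
from typing import Any, Mapping

# Matches ${NAME} where NAME is a conventional environment variable name
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_placeholders(value: Any, env: Mapping[str, str] = os.environ) -> Any:
    """Recursively replace ${VAR} in strings; raise KeyError if a variable is unset."""
    if isinstance(value, str):
        def substitute(match: "re.Match[str]") -> str:
            name = match.group(1)
            if name not in env:
                raise KeyError(f"Missing environment variable: {name}")
            return env[name]
        return _PLACEHOLDER.sub(substitute, value)
    if isinstance(value, dict):
        return {key: expand_placeholders(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_placeholders(item, env) for item in value]
    return value  # numbers, booleans and None pass through unchanged
```

Failing loudly on a missing variable is a deliberate choice here: a literal `Bearer ${ENTERPRISE_TOKEN}` header sent to a server is harder to diagnose than a startup error.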

3. Tool Wrapper Implementation

# Advanced MCP Tool Wrapper with Error Recovery
class EnterpriseToolWrapper(MCPToolWrapper):
    def __init__(self, connection_id: str, tool_name: str, tool_spec: ToolSpec, 
                 mcp_client: Any, retry_config: dict = None):
        super().__init__(connection_id, tool_name, tool_spec, mcp_client)
        self.retry_config = retry_config or {
            'max_retries': 3,
            'backoff_factor': 2,
            'timeout': 30
        }
    
    def invoke_with_retry(self, tool: ToolUse, *args, **kwargs) -> ToolResult:
        """Invoke tool with exponential backoff retry logic"""
        last_error = None
        
        for attempt in range(self.retry_config['max_retries']):
            try:
                return super().invoke(tool, *args, **kwargs)
            except Exception as e:
                last_error = e
                if attempt < self.retry_config['max_retries'] - 1:
                    wait_time = self.retry_config['backoff_factor'] ** attempt
                    time.sleep(wait_time)
                    logger.warning(f"Retry {attempt + 1} for {self.tool_name}: {e}")
        
        return {
            "toolUseId": tool.get("toolUseId", "unknown"),
            "status": "error",
            "content": [{
                "text": f"Tool failed after {self.retry_config['max_retries']} retries: {last_error}"
            }]
        }
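
The retry loop above sleeps `backoff_factor ** attempt` seconds between attempts, so the default configuration waits 1 s and then 2 s before giving up. A standalone sketch of the same schedule, with hypothetical helper names and an injectable `sleep` so the behavior can be checked without waiting:

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(max_retries: int, backoff_factor: float) -> List[float]:
    """Delays slept between attempts; there is no delay after the final attempt."""
    return [backoff_factor ** attempt for attempt in range(max_retries - 1)]


def call_with_retry(func: Callable[[], T], max_retries: int = 3,
                    backoff_factor: float = 2,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func until it succeeds or max_retries attempts have failed."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as exc:
            last_error = exc
            if attempt < max_retries - 1:
                sleep(backoff_factor ** attempt)
    raise RuntimeError(f"failed after {max_retries} attempts") from last_error
```

Unlike the wrapper above, this sketch raises on exhaustion instead of returning an error-shaped result; which is preferable depends on whether the caller expects a `ToolResult` or an exception.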

🔧 Production Deployment

Environment Variables

# MCP Server Configuration
export MCP_SERVERS='{"mcpServers":{"strands-docs":{"command":"uvx","args":["strands-agents-mcp-server"]}}}'

# Authentication
export DOCS_API_KEY="your-docs-api-key"
export ENTERPRISE_TOKEN="your-enterprise-token"

# Timeouts & Limits
export STRANDS_MCP_TIMEOUT="30.0"
export MCP_MAX_CONNECTIONS="10"
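
How these variables are consumed is not shown in the source. As a sketch, an application could parse them into typed settings at startup; the `MCPSettings` dataclass and `load_mcp_settings` function are hypothetical, and the defaults simply mirror the example values above.

```python
import json
import os
from dataclasses import dataclass, field
from typing import Any, Dict, Mapping


@dataclass
class MCPSettings:
    timeout: float = 30.0
    max_connections: int = 10
    servers: Dict[str, Any] = field(default_factory=dict)


def load_mcp_settings(env: Mapping[str, str] = os.environ) -> MCPSettings:
    """Parse the MCP-related environment variables into typed settings."""
    raw_servers = env.get("MCP_SERVERS", "")
    # MCP_SERVERS holds a JSON document with a top-level "mcpServers" object
    servers = json.loads(raw_servers).get("mcpServers", {}) if raw_servers else {}
    return MCPSettings(
        timeout=float(env.get("STRANDS_MCP_TIMEOUT", "30.0")),
        max_connections=int(env.get("MCP_MAX_CONNECTIONS", "10")),
        servers=servers,
    )
```

Parsing once at startup means a malformed value such as `MCP_MAX_CONNECTIONS="ten"` fails immediately with a `ValueError` rather than on the first connection attempt.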

Docker Integration

# Dockerfile for MCP-enabled Strands Agent
FROM python:3.11-slim

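# Install UV and Node.js for MCP servers (curl is not included in the slim image)
RUN pip install uv
RUN apt-get update && \
    apt-get install -y --no-install-recommends curl ca-certificates && \
    curl -fsSL https://deb.nodesource.com/setup_18.x | bash - && \
    apt-get install -y nodejs && \
    rm -rf /var/lib/apt/lists/*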

# Install Python dependencies
COPY requirements.txt .
RUN uv pip install --system -r requirements.txt

# Install MCP servers
# uvx only runs tools; `uv tool install` pre-installs one into the image
RUN uv tool install strands-agents-mcp-server
RUN npm install -g @modelcontextprotocol/server-filesystem

# Copy application
COPY . /app
WORKDIR /app

# Health check for MCP connections
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
  CMD python -c "from tools.mcp_client import test_connections; test_connections()"

CMD ["python", "agent_runner.py"]

🛡️ Security & Best Practices

Authentication Patterns

# Secure MCP Authentication
class SecureMCPClient:
    def __init__(self, credentials_manager):
        self.credentials = credentials_manager
    
    def get_authenticated_headers(self, server_name: str) -> dict:
        """Get authentication headers with credential rotation"""
        token = self.credentials.get_token(server_name)
        return {
            "Authorization": f"Bearer {token}",
            "X-Client-Version": "strands-agent-1.0",
            "X-Request-ID": str(uuid.uuid4())
        }
    
    def create_secure_transport(self, server_config: dict):
        """Create transport with security validation"""
        if server_config.get('url'):
            # Validate URL is allowlisted
            if not self.is_url_allowed(server_config['url']):
                raise SecurityError(f"URL not in allowlist: {server_config['url']}")
            
            headers = self.get_authenticated_headers(server_config['name'])
            return streamablehttp_client(
                url=server_config['url'],
                headers=headers
                # TLS certificates are verified by default by the underlying
                # HTTP client; streamablehttp_client takes no verify_ssl argument
            )
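
`create_secure_transport` calls `self.is_url_allowed`, which is not defined in the class above. One possible shape for it, written here as a standalone function, is sketched below. It assumes the allowlist is a collection of exact host names and that only `https` URLs are acceptable; both are assumptions for illustration.

```python
from typing import Iterable
from urllib.parse import urlsplit


def is_url_allowed(url: str, allowed_hosts: Iterable[str]) -> bool:
    """Allow only https URLs whose host exactly matches an allowlisted host."""
    parts = urlsplit(url)
    if parts.scheme != "https" or parts.hostname is None:
        return False
    allowed = {host.lower() for host in allowed_hosts}
    # Compare the parsed hostname, not a prefix of the raw URL string, so that
    # "api.company.com.evil.example" and "api.company.com@evil.example" are rejected
    return parts.hostname.lower() in allowed
```

Matching on the parsed hostname rather than with `str.startswith` is the important design choice: prefix checks on the raw URL are easy to bypass with crafted subdomains or userinfo components.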

Monitoring Integration

# MCP Connection Health Monitoring
from prometheus_client import Counter, Histogram, Gauge

mcp_requests_total = Counter('mcp_requests_total', 'Total MCP requests', ['server', 'tool', 'status'])
mcp_request_duration = Histogram('mcp_request_duration_seconds', 'MCP request duration')
mcp_active_connections = Gauge('mcp_active_connections', 'Active MCP connections', ['server'])

class MonitoredMCPWrapper(MCPToolWrapper):
    def invoke(self, tool: ToolUse, *args, **kwargs) -> ToolResult:
        start_time = time.time()
        
        try:
            result = super().invoke(tool, *args, **kwargs)
            mcp_requests_total.labels(
                server=self.connection_id,
                tool=self.original_tool_name,
                status='success'
            ).inc()
            return result
        except Exception as e:
            mcp_requests_total.labels(
                server=self.connection_id,
                tool=self.original_tool_name,
                status='error'
            ).inc()
            raise
        finally:
            duration = time.time() - start_time
            mcp_request_duration.observe(duration)

Production Integration Patterns

🚀 Production Deployment Architecture

graph TB
    subgraph "Load Balancer Layer"
        ALB[Application Load Balancer]
        CF[CloudFront CDN]
    end
    
    subgraph "Compute Layer"
        ECS[ECS Fargate]
        Lambda[Lambda Functions]
        EC2[EC2 Auto Scaling]
    end
    
    subgraph "Agent Runtime"
        Agent[Strands Agent]
        MCP[MCP Clients]
        Tools[Tool Registry]
    end
    
    subgraph "Data Layer"
        RDS[RDS PostgreSQL]
        S3[S3 Storage]
        OpenSearch[OpenSearch]
        Redis[ElastiCache Redis]
    end
    
    subgraph "Monitoring"
        CW[CloudWatch]
        XRay[X-Ray Tracing]
        Prometheus[Prometheus]
        Grafana[Grafana]
    end
    
    ALB --> ECS
    ALB --> Lambda
    ECS --> Agent
    Lambda --> Agent
    Agent --> MCP
    Agent --> Tools
    
    Agent --> RDS
    Agent --> S3
    Agent --> OpenSearch
    Agent --> Redis
    
    Agent --> CW
    Agent --> XRay
    
    style Agent fill:#2196f3
    style Monitoring fill:#4caf50

🛡️ Security & Compliance

1. Enterprise Authentication

# Enterprise Authentication & Authorization
class EnterpriseAuthManager:
    """Enterprise-grade authentication and authorization"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.jwt_secret = os.environ.get('JWT_SECRET_KEY')
        self.oauth_config = config.get('oauth', {})
        self.rbac_config = config.get('rbac', {})
    
    def authenticate_request(self, token: str) -> Dict[str, Any]:
        """Authenticate incoming request"""
        
        try:
            # Validate JWT token
            payload = jwt.decode(
                token, 
                self.jwt_secret, 
                algorithms=['HS256']
            )
            
            # Check token expiration
            if payload.get('exp', 0) < time.time():
                raise ValueError("Token expired")
            
            # Validate against identity provider
            user_info = self.validate_with_idp(payload)
            
            return {
                "status": "authenticated",
                "user_id": user_info['user_id'],
                "roles": user_info['roles'],
                "permissions": user_info['permissions']
            }
            
        except Exception as e:
            return {
                "status": "unauthorized",
                "error": str(e)
            }
    
    def authorize_action(self, user_info: Dict, resource: str, action: str) -> bool:
        """Check if user is authorized for action on resource"""
        
        user_roles = user_info.get('roles', [])
        user_permissions = user_info.get('permissions', [])
        
        # Check explicit permissions
        required_permission = f"{resource}:{action}"
        if required_permission in user_permissions:
            return True
        
        # Check role-based permissions
        for role in user_roles:
            role_permissions = self.rbac_config.get('roles', {}).get(role, [])
            if required_permission in role_permissions:
                return True
        
        return False
    
    def validate_with_idp(self, payload: Dict) -> Dict[str, Any]:
        """Validate token with identity provider"""
        
        # Implement validation with your IDP (Azure AD, Okta, etc.)
        # This is a simplified example
        
        user_id = payload.get('sub')
        
        # Query user information from IDP
        idp_response = requests.get(
            f"{self.oauth_config['idp_url']}/users/{user_id}",
            headers={
                'Authorization': f"Bearer {self.oauth_config['admin_token']}"
            },
            timeout=10  # avoid blocking authentication indefinitely if the IDP hangs
        )
        
        if idp_response.status_code == 200:
            user_data = idp_response.json()
            return {
                'user_id': user_id,
                'roles': user_data.get('roles', []),
                'permissions': user_data.get('permissions', [])
            }
        else:
            raise ValueError("User validation failed")

# Secure Agent Wrapper
class SecureAgentWrapper:
    """Security wrapper for Strands Agent"""
    
    def __init__(self, agent: Agent, auth_manager: EnterpriseAuthManager):
        self.agent = agent
        self.auth_manager = auth_manager
        self.audit_logger = self._setup_audit_logging()
    
    def execute_with_auth(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Execute agent request with authentication and authorization"""
        
        # Extract authentication token
        auth_token = request.get('auth_token')
        if not auth_token:
            return self._unauthorized_response("No auth token provided")
        
        # Authenticate user
        auth_result = self.auth_manager.authenticate_request(auth_token)
        if auth_result['status'] != 'authenticated':
            return self._unauthorized_response(auth_result.get('error', 'Authentication failed'))
        
        # Authorize action
        if not self.auth_manager.authorize_action(
            auth_result, 'agent', 'execute'
        ):
            return self._forbidden_response("Insufficient permissions")
        
        # Log request
        self.audit_logger.info({
            'user_id': auth_result['user_id'],
            'action': 'agent_execute',
            'query': request.get('query', '')[:100],  # Truncate for logs
            'timestamp': datetime.utcnow().isoformat()
        })
        
        try:
            # Execute agent
            result = self.agent(request['query'])
            
            # Log successful execution
            self.audit_logger.info({
                'user_id': auth_result['user_id'],
                'action': 'agent_execute_success',
                'timestamp': datetime.utcnow().isoformat()
            })
            
            return {
                'status': 'success',
                'result': result,
                'user_id': auth_result['user_id']
            }
            
        except Exception as e:
            # Log error
            self.audit_logger.error({
                'user_id': auth_result['user_id'],
                'action': 'agent_execute_error',
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            })
            
            return {
                'status': 'error',
                'error': 'Agent execution failed'
            }
    
    def _setup_audit_logging(self):
        """Setup audit logging"""
        logger = logging.getLogger('strands_audit')
        logger.setLevel(logging.INFO)
        
        # Add CloudWatch handler for production
        if os.environ.get('ENVIRONMENT') == 'production':
            import watchtower
            handler = watchtower.CloudWatchLogsHandler(
                log_group='strands-agents-audit',
                stream_name=f"agent-{socket.gethostname()}"
            )
            logger.addHandler(handler)
        
        return logger
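
`authorize_action` above expects permissions in `resource:action` form and an `rbac` configuration that maps role names to permission lists, but no example configuration is given. The sketch below restates the same check as a standalone function with a hypothetical configuration; the role names `operator` and `auditor` are made up for illustration.

```python
from typing import Any, Dict


def is_authorized(user_info: Dict[str, Any], rbac_config: Dict[str, Any],
                  resource: str, action: str) -> bool:
    """Return True if the user holds `resource:action` directly or through a role."""
    required = f"{resource}:{action}"
    if required in user_info.get("permissions", []):
        return True
    roles = rbac_config.get("roles", {})
    return any(required in roles.get(role, []) for role in user_info.get("roles", []))


# Hypothetical RBAC configuration: role name -> list of "resource:action" strings
RBAC_CONFIG = {
    "roles": {
        "operator": ["agent:execute"],
        "auditor": ["audit_log:read"],
    }
}
```

With this configuration a user whose roles include `operator` may execute the agent, while a user with only `auditor` is rejected unless `agent:execute` appears in their explicit permissions.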

2. Configuration Management

# Enterprise Configuration Management
class ConfigurationManager:
    """Centralized configuration management with environment-specific settings"""
    
    def __init__(self, environment: str = None):
        self.environment = environment or os.environ.get('ENVIRONMENT', 'development')
        self.config_cache = {}
        self.secret_manager = self._initialize_secret_manager()
    
    def _initialize_secret_manager(self):
        """Initialize AWS Secrets Manager or similar"""
        try:
            import boto3
            return boto3.client('secretsmanager')
        except Exception:
            return None
    
    def get_config(self, key: str, default: Any = None, sensitive: bool = False) -> Any:
        """Get configuration value with caching and secret management"""
        
        cache_key = f"{self.environment}:{key}"
        
        # Check cache first (for non-sensitive data)
        if not sensitive and cache_key in self.config_cache:
            return self.config_cache[cache_key]
        
        # Try environment variable first
        env_key = f"{self.environment.upper()}_{key.upper()}"
        value = os.environ.get(env_key)
        
        if value is None:
            # Try generic environment variable
            value = os.environ.get(key.upper())
        
        if value is None and sensitive and self.secret_manager:
            # Try AWS Secrets Manager
            try:
                secret_name = f"{self.environment}/strands-agents/{key}"
                response = self.secret_manager.get_secret_value(SecretId=secret_name)
                value = response['SecretString']
            except Exception:
                pass
        
        if value is None:
            value = default
        
        # Cache non-sensitive values
        if not sensitive and value is not None:
            self.config_cache[cache_key] = value
        
        return value
    
    def get_agent_config(self) -> Dict[str, Any]:
        """Get complete agent configuration for environment"""
        
        return {
            'model': {
                'provider': self.get_config('MODEL_PROVIDER', 'bedrock'),
                'model_id': self.get_config('MODEL_ID', 'us.anthropic.claude-sonnet-4-20250514-v1:0'),
                'max_tokens': int(self.get_config('MAX_TOKENS', '32768')),
                'temperature': float(self.get_config('TEMPERATURE', '0.7'))
            },
            'aws': {
                'region': self.get_config('AWS_REGION', 'us-west-2'),
                'role_arn': self.get_config('AWS_ROLE_ARN', sensitive=True),
                's3_bucket': self.get_config('S3_BUCKET', 'strands-agents-prod'),
                'knowledge_base_id': self.get_config('KNOWLEDGE_BASE_ID', sensitive=True)
            },
            'mcp': {
                'servers': self._load_mcp_config()
            },
            'security': {
                'jwt_secret': self.get_config('JWT_SECRET_KEY', sensitive=True),
                'api_key_header': self.get_config('API_KEY_HEADER', 'X-API-Key'),
                'rate_limit': int(self.get_config('RATE_LIMIT_RPM', '60'))
            },
            'monitoring': {
                'metrics_enabled': self.get_config('METRICS_ENABLED', 'true').lower() == 'true',
                'tracing_enabled': self.get_config('TRACING_ENABLED', 'true').lower() == 'true',
                'log_level': self.get_config('LOG_LEVEL', 'INFO')
            }
        }
    
    def _load_mcp_config(self) -> Dict[str, Any]:
        """Load MCP server configuration"""
        
        # Try environment variable first
        mcp_config_str = self.get_config('MCP_SERVERS')
        if mcp_config_str:
            try:
                return json.loads(mcp_config_str).get('mcpServers', {})
            except json.JSONDecodeError:
                pass
        
        # Load from file
        config_files = [
            f'mcp_config_{self.environment}.json',
            'mcp_config.json',
            'mcp.json'
        ]
        
        for config_file in config_files:
            if os.path.exists(config_file):
                try:
                    with open(config_file, 'r') as f:
                        config = json.load(f)
                        return config.get('mcpServers', {})
                except Exception:
                    continue
        
        # Default configuration
        return {
            'strands-docs': {
                'command': 'uvx',
                'args': ['strands-agents-mcp-server']
            }
        }
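
The lookup order in `get_config` is easy to miss when reading the class: an environment-prefixed variable wins over the generic one, which wins over the default. A minimal sketch of just that precedence, leaving out caching and Secrets Manager, with a hypothetical `resolve_setting` helper:

```python
import os
from typing import Mapping, Optional


def resolve_setting(key: str, environment: str, default: Optional[str] = None,
                    env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Look up <ENVIRONMENT>_<KEY>, then <KEY>, then fall back to default."""
    for candidate in (f"{environment.upper()}_{key.upper()}", key.upper()):
        value = env.get(candidate)
        if value is not None:
            return value
    return default
```

For example, with `PRODUCTION_LOG_LEVEL=WARNING` and `LOG_LEVEL=DEBUG` both set, a production deployment resolves `LOG_LEVEL` to `WARNING`, while any other environment falls through to `DEBUG`.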

📊 Monitoring & Observability

1. Comprehensive Monitoring Stack

# Production Monitoring Integration
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import structlog
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

class ProductionMonitoring:
    """Production monitoring and observability"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.setup_metrics()
        self.setup_logging()
        self.setup_tracing()
    
    def setup_metrics(self):
        """Setup Prometheus metrics"""
        
        self.metrics = {
            'agent_requests_total': Counter(
                'agent_requests_total',
                'Total agent requests',
                ['user_id', 'status', 'model']
            ),
            'agent_response_time': Histogram(
                'agent_response_time_seconds',
                'Agent response time in seconds',
                ['model', 'tool_count']
            ),
            'active_agents': Gauge(
                'active_agents',
                'Number of active agents'
            ),
            'mcp_connections': Gauge(
                'mcp_connections_active',
                'Active MCP connections',
                ['server_name']
            ),
            'tool_executions': Counter(
                'tool_executions_total',
                'Total tool executions',
                ['tool_name', 'status']
            )
        }
        
        # Start metrics server
        if self.config.get('metrics_port'):
            start_http_server(self.config['metrics_port'])
    
    def setup_logging(self):
        """Setup structured logging"""
        
        structlog.configure(
            processors=[
                structlog.stdlib.filter_by_level,
                structlog.stdlib.add_logger_name,
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer()
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )
        
        self.logger = structlog.get_logger()
    
    def setup_tracing(self):
        """Setup distributed tracing"""
        
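        if not self.config.get('tracing_enabled'):
            # Fall back to the default no-op tracer so track_agent_execution
            # can still open spans when tracing is disabled
            self.tracer = trace.get_tracer(__name__)
            return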
        
        # Setup Jaeger exporter
        jaeger_exporter = JaegerExporter(
            agent_host_name=self.config.get('jaeger_host', 'localhost'),
            agent_port=self.config.get('jaeger_port', 6831),
        )
        
        # Configure tracer
        trace.set_tracer_provider(TracerProvider())
        tracer = trace.get_tracer(__name__)
        
        span_processor = BatchSpanProcessor(jaeger_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)
        
        self.tracer = tracer
    
    def track_agent_execution(self, func):
        """Decorator to track agent execution metrics"""
        import functools
        
        @functools.wraps(func)  # preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            start_time = time.time()
            user_id = kwargs.get('user_id', 'unknown')
            model = kwargs.get('model', 'unknown')
            
            # Create trace span
            with self.tracer.start_as_current_span("agent_execution") as span:
                span.set_attribute("user_id", user_id)
                span.set_attribute("model", model)
                
                try:
                    # Execute function
                    result = func(*args, **kwargs)
                    
                    # Track success metrics
                    self.metrics['agent_requests_total'].labels(
                        user_id=user_id, status='success', model=model
                    ).inc()
                    
                    execution_time = time.time() - start_time
                    self.metrics['agent_response_time'].labels(
                        model=model, tool_count=len(kwargs.get('tools', []))
                    ).observe(execution_time)
                    
                    # Log successful execution
                    self.logger.info(
                        "Agent execution completed",
                        user_id=user_id,
                        model=model,
                        execution_time=execution_time,
                        result_length=len(str(result))
                    )
                    
                    return result
                    
                except Exception as e:
                    # Track error metrics
                    self.metrics['agent_requests_total'].labels(
                        user_id=user_id, status='error', model=model
                    ).inc()
                    
                    # Log error
                    self.logger.error(
                        "Agent execution failed",
                        user_id=user_id,
                        model=model,
                        error=str(e),
                        exc_info=True
                    )
                    
                    span.set_attribute("error", True)
                    span.set_attribute("error_message", str(e))
                    
                    raise
        
        return wrapper

# Health Check System
class HealthCheckManager:
    """Health check system for production deployment"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.checks = {}
        self.register_default_checks()
    
    def register_check(self, name: str, check_func, timeout: int = 30):
        """Register a health check"""
        self.checks[name] = {
            'function': check_func,
            'timeout': timeout
        }
    
    def register_default_checks(self):
        """Register default health checks"""
        
        self.register_check('database', self._check_database)
        self.register_check('s3', self._check_s3_connectivity)
        self.register_check('mcp_servers', self._check_mcp_servers)
        self.register_check('memory', self._check_memory_usage)
        self.register_check('disk', self._check_disk_space)
    
    async def run_all_checks(self) -> Dict[str, Any]:
        """Run all health checks"""
        
        results = {}
        overall_status = 'healthy'
        
        for check_name, check_config in self.checks.items():
            try:
                # Run check with timeout
                check_result = await asyncio.wait_for(
                    check_config['function'](),
                    timeout=check_config['timeout']
                )
                
                results[check_name] = {
                    'status': 'healthy',
                    'details': check_result
                }
                
            except asyncio.TimeoutError:
                results[check_name] = {
                    'status': 'unhealthy',
                    'error': 'Health check timeout'
                }
                overall_status = 'unhealthy'
                
            except Exception as e:
                results[check_name] = {
                    'status': 'unhealthy',
                    'error': str(e)
                }
                overall_status = 'unhealthy'
        
        return {
            'status': overall_status,
            'checks': results,
            'timestamp': datetime.utcnow().isoformat()
        }
    
    async def _check_database(self) -> Dict[str, Any]:
        """Check database connectivity"""
        # Placeholder: replace with a real probe (e.g. a `SELECT 1` round trip)
        # and report the measured latency instead of these fixed values
        return {'connection': 'ok', 'latency_ms': 5}
    
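    async def _check_s3_connectivity(self) -> Dict[str, Any]:
        """Check S3 connectivity"""
        try:
            import boto3
            s3 = boto3.client('s3')
            
            # boto3 is blocking: run the call in a worker thread so the event
            # loop stays responsive and the asyncio.wait_for timeout can fire
            response = await asyncio.to_thread(s3.list_buckets)
            return {
                'connection': 'ok',
                'bucket_count': len(response['Buckets'])
            }
        except Exception as e:
            # Chain the original exception so the traceback is preserved
            raise Exception(f"S3 connectivity failed: {e}") from e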
    
    async def _check_mcp_servers(self) -> Dict[str, Any]:
        """Check MCP server connectivity"""
        # Check MCP server connections
        from tools.mcp_client import _list_active_connections
        
        connections = _list_active_connections({})
        active_count = sum(
            1 for conn in connections.get('connections', [])
            if conn.get('is_active')
        )
        
        return {
            'active_connections': active_count,
            'total_connections': connections.get('total_connections', 0)
        }
    
    async def _check_memory_usage(self) -> Dict[str, Any]:
        """Check memory usage"""
        import psutil
        
        memory = psutil.virtual_memory()
        
        if memory.percent > 90:
            raise Exception(f"High memory usage: {memory.percent}%")
        
        return {
            'usage_percent': memory.percent,
            'available_mb': memory.available // 1024 // 1024
        }
    
    async def _check_disk_space(self) -> Dict[str, Any]:
        """Check disk space"""
        import shutil
        
        total, used, free = shutil.disk_usage("/")
        usage_percent = (used / total) * 100
        
        if usage_percent > 85:
            raise Exception(f"High disk usage: {usage_percent:.1f}%")
        
        return {
            'usage_percent': usage_percent,
            'free_gb': free // 1024 // 1024 // 1024
        }
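
`run_all_checks` above awaits each check in turn, so the total latency is the sum of all checks. The following is a minimal sketch of a concurrent variant, assuming the checks are independent coroutines. `run_checks_concurrently` and the three sample checks are hypothetical names introduced for illustration; they are not part of Strands or of the class above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

CheckFunc = Callable[[], Awaitable[Dict[str, Any]]]


async def _run_one(check: CheckFunc, timeout: float) -> Dict[str, Any]:
    """Run a single check and convert any failure into an 'unhealthy' result."""
    try:
        details = await asyncio.wait_for(check(), timeout=timeout)
        return {'status': 'healthy', 'details': details}
    except asyncio.TimeoutError:
        return {'status': 'unhealthy', 'error': 'Health check timeout'}
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}


async def run_checks_concurrently(
    checks: Dict[str, CheckFunc], timeout: float = 30
) -> Dict[str, Any]:
    """Run all checks at once; overall status is healthy only if every check is."""
    names = list(checks)
    outcomes = await asyncio.gather(*(_run_one(checks[n], timeout) for n in names))
    results = dict(zip(names, outcomes))
    all_ok = all(r['status'] == 'healthy' for r in outcomes)
    return {'status': 'healthy' if all_ok else 'unhealthy', 'checks': results}


# Hypothetical sample checks, for illustration only
async def ok_check() -> Dict[str, Any]:
    return {'connection': 'ok'}


async def failing_check() -> Dict[str, Any]:
    raise RuntimeError('dependency unavailable')


async def slow_check() -> Dict[str, Any]:
    await asyncio.sleep(1)  # longer than the timeout used below
    return {'connection': 'ok'}


if __name__ == '__main__':
    report = asyncio.run(run_checks_concurrently(
        {'ok': ok_check, 'failing': failing_check, 'slow': slow_check},
        timeout=0.05,
    ))
    print(report['status'])
```

With this shape the slowest check bounds the total time, and each failure is still isolated to its own entry in `checks`.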

🔄 CI/CD Pipeline Integration

# .github/workflows/production-deploy.yml
name: Production Deployment

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

env:
  AWS_REGION: us-west-2
  ECS_CLUSTER: strands-agents-prod
  ECR_REPOSITORY: strands-agents

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: |
          pip install uv
          uv pip install --system -r requirements.txt
          uv pip install --system -r requirements-test.txt
      
      - name: Run tests
        run: |
          pytest tests/ -v --cov=. --cov-report=xml
      
      - name: Security scan
        run: |
          bandit -r . -f json -o security-report.json
      
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

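  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    # role-to-assume without static access keys relies on GitHub OIDC,
    # which requires the id-token: write permission
    permissions:
      id-token: write
      contents: read
    outputs: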
      image-tag: ${{ steps.build-image.outputs.image-tag }}
    steps:
      - uses: actions/checkout@v4
      
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
          aws-region: ${{ env.AWS_REGION }}
      
      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1
      
      - name: Build and push Docker image
        id: build-image
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          IMAGE_TAG: ${{ github.sha }}
        run: |
          docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG .
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
          echo "image-tag=$IMAGE_TAG" >> $GITHUB_OUTPUT

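  deploy-staging:
    needs: build
    runs-on: ubuntu-latest
    environment: staging
    # Each job runs on a fresh runner, so AWS credentials must be configured again
    permissions:
      id-token: write
      contents: read
    steps:
      - uses: actions/checkout@v4
      
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
          aws-region: ${{ env.AWS_REGION }}
      
      - name: Deploy to staging
        env:
          IMAGE_TAG: ${{ needs.build.outputs.image-tag }}
        run: |
          # Restart the ECS service. Note: --force-new-deployment re-pulls the
          # image referenced by the current task definition; it does not apply
          # $IMAGE_TAG. If the task definition pins a tag, register a new
          # revision with $IMAGE_TAG before this call.
          aws ecs update-service \
            --cluster strands-agents-staging \
            --service strands-agents-api \
            --force-new-deployment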
      
      - name: Run integration tests
        run: |
          python integration_tests/test_staging.py

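  deploy-production:
    needs: [build, deploy-staging]
    runs-on: ubuntu-latest
    environment: production
    if: github.ref == 'refs/heads/main'
    # Each job runs on a fresh runner, so AWS credentials must be configured again
    permissions:
      id-token: write
      contents: read
    steps:
      - uses: actions/checkout@v4
      
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
          aws-region: ${{ env.AWS_REGION }}
      
      - name: Deploy to production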
        env:
          IMAGE_TAG: ${{ needs.build.outputs.image-tag }}
        run: |
          # Blue-green deployment
          python deployment/blue_green_deploy.py \
            --cluster $ECS_CLUSTER \
            --service strands-agents-api \
            --image-tag $IMAGE_TAG
      
      - name: Verify deployment
        run: |
          python deployment/verify_deployment.py
      
      - name: Notify deployment
        if: always()
        uses: 8398a7/action-slack@v3
        with:
          status: ${{ job.status }}
          text: 'Production deployment ${{ job.status }}'
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
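
The workflow calls `deployment/verify_deployment.py`, which is not shown in this guide. One plausible shape for such a script is to poll a health endpoint until it reports `healthy`, matching the `status` field returned by `run_all_checks`. The sketch below is an assumption, not the actual script: `fetch_health`, `wait_until_healthy`, and the idea of an HTTP health endpoint are hypothetical.

```python
import json
import sys
import time
import urllib.request
from typing import Any, Callable, Dict


def fetch_health(url: str, timeout: float = 5.0) -> Dict[str, Any]:
    """GET the (hypothetical) health endpoint and parse its JSON body."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode('utf-8'))


def wait_until_healthy(
    fetch: Callable[[], Dict[str, Any]],
    attempts: int = 10,
    delay_seconds: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll until the report says 'healthy' or the attempts run out."""
    for attempt in range(1, attempts + 1):
        try:
            if fetch().get('status') == 'healthy':
                return True
        except Exception as e:  # connection refused, bad JSON, HTTP error, ...
            print(f"attempt {attempt}: {e}", file=sys.stderr)
        if attempt < attempts:
            sleep(delay_seconds)  # no sleep after the final attempt
    return False
```

A CI step would wrap this as `sys.exit(0 if wait_until_healthy(lambda: fetch_health(url)) else 1)`, so that a service that never becomes healthy fails the job. Passing `fetch` and `sleep` as parameters keeps the polling logic testable without a network.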