Architecture Overview
Strands Agents integrates deeply with AWS services for compute, storage, AI/ML, and infrastructure management.
graph TB
subgraph "Strands Agent Runtime"
Agent[Agent Core]
SessionMgr[S3SessionManager]
KnowledgeBase[Bedrock KB]
end
subgraph "AWS Compute"
Lambda[AWS Lambda]
Fargate[ECS Fargate]
EC2[EC2 Instances]
end
subgraph "AWS AI/ML"
Bedrock[Amazon Bedrock]
SageMaker[SageMaker]
Comprehend[Comprehend]
end
subgraph "AWS Storage"
S3[S3 Buckets]
DynamoDB[DynamoDB]
OpenSearch[OpenSearch]
end
Agent --> SessionMgr
Agent --> KnowledgeBase
SessionMgr --> S3
KnowledgeBase --> Bedrock
Agent --> Lambda
Agent --> OpenSearch
Agent --> DynamoDB
(Diagram: Strands Agent Runtime components and their AWS compute, AI/ML, and storage integrations.)
Core Integration Patterns
# Enhanced S3 Session Manager with enterprise features
import json
import os
import uuid
from datetime import datetime, timedelta
from typing import Dict, List

import boto3

from strands.session.s3_session_manager import S3SessionManager
class EnterpriseS3SessionManager(S3SessionManager):
    """S3-backed session manager with KMS encryption, adaptive retries,
    metadata enrichment, and lifecycle management for conversation archives.

    NOTE(review): assumes the base ``S3SessionManager`` exposes
    ``self.session_id``, ``self.bucket`` and ``self.prefix`` and provides
    ``generate_conversation_tags`` — confirm against the strands SDK.
    """

    def __init__(self, session_id: str, bucket: str, prefix: str = "conversations/",
                 region_name: str = "us-west-2", encryption_config: dict = None):
        """Initialize the manager.

        Args:
            session_id: Unique conversation/session identifier.
            bucket: Target S3 bucket name.
            prefix: Key prefix under which conversations are stored.
            region_name: AWS region for the S3 client.
            encryption_config: Optional SSE settings; defaults to KMS with
                the ``alias/strands-agents-key`` key.
        """
        super().__init__(session_id, bucket, prefix, region_name)
        self.encryption_config = encryption_config or {
            "SSEAlgorithm": "aws:kms",
            "KMSKeyId": "alias/strands-agents-key",
        }
        # Dedicated S3 client with adaptive retries and a larger connection
        # pool for high-throughput conversation writes.
        self.s3_client = boto3.client(
            "s3",
            region_name=region_name,
            config=boto3.session.Config(
                retries={
                    "max_attempts": 10,
                    "mode": "adaptive",
                },
                max_pool_connections=50,
            ),
        )

    def save_conversation_with_metadata(self, conversation_data: dict,
                                        metadata: dict = None) -> str:
        """Persist a conversation to S3 with metadata, tags, and SSE.

        Args:
            conversation_data: The conversation payload to store.
            metadata: Optional caller-supplied metadata; ``type`` and
                ``model`` keys, when present, become S3 object metadata.

        Returns:
            The S3 object key the conversation was written to.
        """
        # fix: the original called metadata.get(...) below, which raised
        # AttributeError whenever metadata was left as None (its default).
        metadata = metadata or {}
        enhanced_data = {
            "conversation": conversation_data,
            "metadata": {
                "session_id": self.session_id,
                "timestamp": datetime.utcnow().isoformat(),
                "version": "1.0",
                "agent_version": os.environ.get("STRANDS_VERSION", "unknown"),
                "custom_metadata": metadata,
            },
            # NOTE(review): generate_conversation_tags is not defined in this
            # class — presumably inherited from the base class; confirm.
            "tags": self.generate_conversation_tags(conversation_data),
        }
        # fix: removed the stray literal spaces the original f-string embedded
        # between the prefix, session id, and timestamp segments of the key.
        key = f"{self.prefix}{self.session_id}/conversation_{datetime.utcnow().isoformat()}.json"
        put_kwargs = {
            "Bucket": self.bucket,
            "Key": key,
            "Body": json.dumps(enhanced_data, indent=2),
            "ServerSideEncryption": self.encryption_config["SSEAlgorithm"],
            "Metadata": {
                "session-id": self.session_id,
                "conversation-type": metadata.get("type", "standard"),
                "agent-model": metadata.get("model", "unknown"),
            },
            "Tagging": f"Environment=production&Project=strands-agents&SessionId={self.session_id}",
        }
        # fix: only send SSEKMSKeyId when a key is configured — boto3 rejects
        # a None parameter value with a ParamValidationError.
        kms_key = self.encryption_config.get("KMSKeyId")
        if kms_key:
            put_kwargs["SSEKMSKeyId"] = kms_key
        self.s3_client.put_object(**put_kwargs)
        return key

    def setup_lifecycle_management(self):
        """Apply tiered-storage lifecycle rules to the conversation prefix.

        Transitions objects to STANDARD_IA after 30 days, GLACIER after 90,
        DEEP_ARCHIVE after 365, and expires them after 7 years.
        """
        lifecycle_config = {
            "Rules": [
                {
                    "ID": "StrandsAgentConversations",
                    "Status": "Enabled",
                    "Filter": {"Prefix": self.prefix},
                    "Transitions": [
                        {"Days": 30, "StorageClass": "STANDARD_IA"},
                        {"Days": 90, "StorageClass": "GLACIER"},
                        {"Days": 365, "StorageClass": "DEEP_ARCHIVE"},
                    ],
                    "Expiration": {
                        "Days": 2555,  # 7 years retention
                    },
                }
            ]
        }
        self.s3_client.put_bucket_lifecycle_configuration(
            Bucket=self.bucket,
            LifecycleConfiguration=lifecycle_config,
        )
# Advanced Bedrock Integration with Knowledge Base
class EnterpriseBedrockIntegration:
    """Helpers around Amazon Bedrock Knowledge Bases: creation, ingestion
    of conversations, and retrieval of relevant context."""

    def __init__(self, region: str = "us-west-2"):
        """Create control-plane and runtime clients for the given region."""
        # fix: setup_knowledge_base reads self.region, but the original
        # never stored the region on the instance (AttributeError).
        self.region = region
        self.bedrock_client = boto3.client("bedrock-runtime", region_name=region)
        self.bedrock_agent_client = boto3.client("bedrock-agent", region_name=region)
        # fix: the Retrieve API is served by bedrock-agent-runtime; the
        # bedrock-agent client (control plane) has no such operation.
        self.bedrock_agent_runtime_client = boto3.client(
            "bedrock-agent-runtime", region_name=region
        )
        self.knowledge_base_id = os.environ.get("STRANDS_KNOWLEDGE_BASE_ID")

    def setup_knowledge_base(self, name: str, description: str,
                             opensearch_config: dict) -> str:
        """Create a vector Knowledge Base backed by OpenSearch Serverless.

        Args:
            name: Knowledge base name.
            description: Human-readable description.
            opensearch_config: Must contain ``collection_arn`` and
                ``index_name`` for the serverless collection.

        Returns:
            The new knowledge base id.
        """
        response = self.bedrock_agent_client.create_knowledge_base(
            name=name,
            description=description,
            roleArn=os.environ.get("BEDROCK_KB_ROLE_ARN"),
            knowledgeBaseConfiguration={
                "type": "VECTOR",
                "vectorKnowledgeBaseConfiguration": {
                    # fix: removed stray spaces the original embedded in the ARN
                    "embeddingModelArn": f"arn:aws:bedrock:{self.region}::foundation-model/amazon.titan-embed-text-v1"
                },
            },
            storageConfiguration={
                "type": "OPENSEARCH_SERVERLESS",
                "opensearchServerlessConfiguration": {
                    "collectionArn": opensearch_config["collection_arn"],
                    "vectorIndexName": opensearch_config["index_name"],
                    "fieldMapping": {
                        "vectorField": "vector",
                        "textField": "text",
                        "metadataField": "metadata",
                    },
                },
            },
        )
        return response["knowledgeBase"]["knowledgeBaseId"]

    def store_conversation_in_kb(self, conversation: dict, title: str) -> str:
        """Stage a conversation document and start a KB ingestion job.

        Raises:
            ValueError: If no knowledge base id is configured.

        Returns:
            The ingestion job id.
        """
        if not self.knowledge_base_id:
            raise ValueError("Knowledge Base ID not configured")
        # NOTE(review): format_conversation_for_kb, upload_to_s3_for_kb and
        # get_or_create_data_source are not defined in this class — presumably
        # implemented elsewhere in the real module; confirm before relying on this.
        content = self.format_conversation_for_kb(conversation, title)
        document_id = f"conversation_{uuid.uuid4().hex}"
        # Bedrock KBs ingest from a data source (here S3), so the document
        # must be staged in S3 before the ingestion job runs.
        self.upload_to_s3_for_kb(content, document_id)
        response = self.bedrock_agent_client.start_ingestion_job(
            knowledgeBaseId=self.knowledge_base_id,
            dataSourceId=self.get_or_create_data_source(),
            description=f"Ingestion job for conversation: {title}",
        )
        return response["ingestionJob"]["ingestionJobId"]

    def retrieve_relevant_context(self, query: str, max_results: int = 5) -> List[Dict]:
        """Hybrid-search the knowledge base for context relevant to *query*.

        Returns an empty list when no knowledge base is configured; otherwise
        a list of ``{content, score, source}`` dicts.
        """
        if not self.knowledge_base_id:
            return []
        # fix: call the runtime client — Retrieve is not a bedrock-agent op.
        response = self.bedrock_agent_runtime_client.retrieve(
            knowledgeBaseId=self.knowledge_base_id,
            retrievalQuery={"text": query},
            retrievalConfiguration={
                "vectorSearchConfiguration": {
                    "numberOfResults": max_results,
                    "overrideSearchType": "HYBRID",
                }
            },
        )
        return [
            {
                "content": result["content"]["text"],
                "score": result["score"],
                "source": result.get("location", {}),
            }
            for result in response["retrievalResults"]
        ]
OpenSearch Serverless Integration
# OpenSearch Serverless for Vector Search
class OpenSearchVectorStore:
    """Vector store on OpenSearch Serverless (service name ``aoss``) for
    agent conversation embeddings.

    NOTE(review): relies on ``AWS4Auth`` (requests-aws4auth) and
    ``OpenSearch``/``RequestsHttpConnection`` (opensearch-py), which are not
    imported anywhere in this file — confirm the real module imports them.
    """

    def __init__(self, collection_endpoint: str, region: str = "us-west-2"):
        """Build a SigV4-authenticated OpenSearch client for a collection.

        Args:
            collection_endpoint: Host name of the serverless collection.
            region: AWS region used for request signing.
        """
        self.collection_endpoint = collection_endpoint
        self.region = region
        # Sign requests for the OpenSearch Serverless service ("aoss").
        credentials = boto3.Session().get_credentials()
        auth = AWS4Auth(
            credentials.access_key,
            credentials.secret_key,
            region,
            "aoss",
            session_token=credentials.token,
        )
        self.client = OpenSearch(
            hosts=[{"host": collection_endpoint, "port": 443}],
            http_auth=auth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            pool_maxsize=20,
        )

    def create_vector_index(self, index_name: str, dimension: int = 1536):
        """Create a k-NN index (HNSW on Lucene, cosine similarity).

        Args:
            index_name: Name of the index to create.
            dimension: Embedding width; 1536 matches Titan text embeddings.
        """
        index_body = {
            "settings": {
                "index.knn": True,
                # NOTE(review): serverless collections manage sharding
                # themselves — confirm these settings are accepted on aoss.
                "number_of_shards": 2,
                "number_of_replicas": 1,
            },
            "mappings": {
                "properties": {
                    "vector": {
                        "type": "knn_vector",
                        "dimension": dimension,
                        "method": {
                            "name": "hnsw",
                            "space_type": "cosinesimil",
                            "engine": "lucene",
                            "parameters": {
                                "ef_construction": 128,
                                "m": 24,
                            },
                        },
                    },
                    "text": {"type": "text"},
                    "metadata": {
                        "properties": {
                            "session_id": {"type": "keyword"},
                            "timestamp": {"type": "date"},
                            "agent_model": {"type": "keyword"},
                            "conversation_type": {"type": "keyword"},
                        }
                    },
                }
            },
        }
        self.client.indices.create(index=index_name, body=index_body)

    def store_conversation_vector(self, conversation_id: str, text: str,
                                  vector: List[float], metadata: Dict,
                                  index_name: str = "agent-conversations"):
        """Index one conversation document with its embedding.

        Args:
            conversation_id: Document id.
            text: Raw conversation text.
            vector: Embedding for the text.
            metadata: Arbitrary metadata stored alongside.
            index_name: Target index (new parameter; defaults to the
                previously hard-coded "agent-conversations").

        NOTE(review): OpenSearch Serverless rejects client-supplied document
        ids on index operations — confirm the target is a provisioned domain,
        or drop the explicit id.
        """
        doc = {
            "text": text,
            "vector": vector,
            "metadata": metadata,
        }
        self.client.index(index=index_name, id=conversation_id, body=doc)

    def semantic_search(self, query_vector: List[float],
                        filters: Dict = None, k: int = 5,
                        index_name: str = "agent-conversations") -> List[Dict]:
        """k-NN search with optional exact-match metadata filters.

        Args:
            query_vector: Embedding to search with.
            filters: Optional ``{metadata_key: value}`` exact-match filters.
            k: Number of hits to return.
            index_name: Index to query (new parameter; defaults to the
                previously hard-coded "agent-conversations").

        Returns:
            A list of ``{id, score, text, metadata}`` dicts.
        """
        search_body = {
            "size": k,
            "query": {
                "bool": {
                    "must": [
                        {
                            "knn": {
                                "vector": {
                                    "vector": query_vector,
                                    "k": k,
                                }
                            }
                        }
                    ]
                }
            },
        }
        if filters:
            # fix: the original f-string produced field names with a trailing
            # space ("metadata.key ") so the filters could never match.
            search_body["query"]["bool"]["filter"] = [
                {"term": {f"metadata.{key}": value}}
                for key, value in filters.items()
            ]
        response = self.client.search(index=index_name, body=search_body)
        return [
            {
                "id": hit["_id"],
                "score": hit["_score"],
                "text": hit["_source"]["text"],
                "metadata": hit["_source"]["metadata"],
            }
            for hit in response["hits"]["hits"]
        ]
Production Deployment Patterns
Infrastructure as Code (CDK)
# AWS CDK Stack for Strands Agents Infrastructure
from aws_cdk import (
Stack , Duration ,
aws_s3 as s3 ,
aws_opensearchserverless as opensearch ,
aws_iam as iam ,
aws_lambda as lambda_ ,
aws_events as events ,
aws_events_targets as targets
)
class StrandsAgentsInfraStack(Stack):
    """CDK stack provisioning the AWS footprint for Strands agents:
    conversation bucket, OpenSearch Serverless vector collection, Bedrock
    Knowledge Base role, coordinator Lambda, and a daily schedule.
    """

    def __init__(self, scope, construct_id, **kwargs):
        super().__init__(scope, construct_id, **kwargs)
        # S3 bucket for conversation storage: KMS-encrypted, versioned,
        # with IA/Glacier lifecycle transitions for cost control.
        # NOTE(review): a hard-coded bucket_name must be globally unique;
        # a second deploy in another account/region will collide — confirm.
        self.conversation_bucket = s3.Bucket(
            self, "ConversationBucket",
            bucket_name="strands-agents-conversations",
            encryption=s3.BucketEncryption.KMS_MANAGED,
            versioned=True,
            lifecycle_rules=[
                s3.LifecycleRule(
                    id="ConversationLifecycle",
                    enabled=True,
                    transitions=[
                        s3.Transition(
                            storage_class=s3.StorageClass.INFREQUENT_ACCESS,
                            transition_after=Duration.days(30)
                        ),
                        s3.Transition(
                            storage_class=s3.StorageClass.GLACIER,
                            transition_after=Duration.days(90)
                        )
                    ]
                )
            ]
        )
        # OpenSearch Serverless collection used for vector search.
        self.vector_collection = opensearch.CfnCollection(
            self, "VectorCollection",
            name="strands-agents-vectors",
            type="VECTORSEARCH",
            description="Vector storage for agent conversations"
        )
        # Service role assumed by Bedrock Knowledge Bases.
        self.kb_role = iam.Role(
            self, "BedrockKBRole",
            assumed_by=iam.ServicePrincipal("bedrock.amazonaws.com"),
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name(
                    "AmazonBedrockFullAccess"
                )
            ]
        )
        # Let the KB role read/write staged conversation documents.
        self.conversation_bucket.grant_read_write(self.kb_role)
        # Lambda that coordinates agents; wired to the resources above via
        # environment variables. 15 minutes is the Lambda maximum timeout.
        self.coordination_lambda = lambda_.Function(
            self, "AgentCoordinator",
            runtime=lambda_.Runtime.PYTHON_3_11,
            handler="coordinator.handler",
            code=lambda_.Code.from_asset("lambda"),
            environment={
                "CONVERSATION_BUCKET": self.conversation_bucket.bucket_name,
                "VECTOR_COLLECTION": self.vector_collection.name,
                "KB_ROLE_ARN": self.kb_role.role_arn
            },
            timeout=Duration.minutes(15)
        )
        # EventBridge rule: trigger the coordinator daily at 08:00 (UTC for
        # EventBridge cron — confirm the intended local time).
        coordination_rule = events.Rule(
            self, "CoordinationSchedule",
            schedule=events.Schedule.cron(hour="8", minute="0")
        )
        coordination_rule.add_target(
            targets.LambdaFunction(self.coordination_lambda)
        )
Monitoring & Observability
# CloudWatch Integration
import boto3
from datetime import datetime
class AWSMonitoring:
    """CloudWatch metric and dashboard helpers for Strands agents."""

    def __init__(self):
        """Create CloudWatch metrics and logs clients (default region chain)."""
        self.cloudwatch = boto3.client('cloudwatch')
        self.logs = boto3.client('logs')

    def emit_agent_metrics(self, agent_id: str, metrics: Dict[str, float]):
        """Publish metrics under the ``StrandsAgents`` namespace.

        Each metric is dimensioned by AgentId and by the ``ENVIRONMENT``
        environment variable (default "dev").

        Args:
            agent_id: Identifier used for the AgentId dimension.
            metrics: Mapping of metric name to value (emitted as Unit=Count).
        """
        timestamp = datetime.utcnow()  # one timestamp for the whole batch
        metric_data = [
            {
                'MetricName': metric_name,
                'Dimensions': [
                    {'Name': 'AgentId', 'Value': agent_id},
                    {'Name': 'Environment', 'Value': os.environ.get('ENVIRONMENT', 'dev')}
                ],
                'Value': value,
                'Unit': 'Count',
                'Timestamp': timestamp
            }
            for metric_name, value in metrics.items()
        ]
        # fix: skip the call for an empty mapping — PutMetricData requires
        # at least one MetricData member.
        if metric_data:
            self.cloudwatch.put_metric_data(
                Namespace='StrandsAgents',
                MetricData=metric_data
            )

    def create_agent_dashboard(self) -> str:
        """Create (or replace) the production agent dashboard.

        Returns:
            The dashboard name. (fix: PutDashboard's response contains only
            ``DashboardValidationMessages`` — the original indexed a
            nonexistent ``DashboardArn`` key and always raised KeyError.)

        Raises:
            ValueError: If CloudWatch reports dashboard validation problems.
        """
        dashboard_body = {
            "widgets": [
                {
                    "type": "metric",
                    "properties": {
                        "metrics": [
                            ["StrandsAgents", "ConversationCount", "Environment", "production"],
                            [".", "ErrorRate", ".", "."],
                            [".", "ResponseTime", ".", "."]
                        ],
                        "period": 300,
                        "stat": "Average",
                        "region": "us-west-2",
                        "title": "Agent Performance Metrics"
                    }
                }
            ]
        }
        dashboard_name = 'StrandsAgents-Production'
        response = self.cloudwatch.put_dashboard(
            DashboardName=dashboard_name,
            DashboardBody=json.dumps(dashboard_body)
        )
        validation_messages = response.get('DashboardValidationMessages') or []
        if validation_messages:
            raise ValueError(f"Dashboard validation failed: {validation_messages}")
        return dashboard_name