Skip to content

Instantly share code, notes, and snippets.

@zxkane
Last active November 7, 2025 08:30
Show Gist options
  • Select an option

  • Save zxkane/2b9d7da3cdb08b4ea91bcbc7235ef6f0 to your computer and use it in GitHub Desktop.

Select an option

Save zxkane/2b9d7da3cdb08b4ea91bcbc7235ef6f0 to your computer and use it in GitHub Desktop.
invoke MCP hosted on AWS AgentCore Runtime/Gateway. Companion code for blog post https://kane.mx/posts/2025/invoke-mcp-hosted-on-aws-agentcore/
# Generic OAuth 2.0 MCP Client Configuration
# Copy this file to .env.local and fill in your actual values
# ==============================================================================
# Required Configuration
# ==============================================================================
# OAuth 2.0 Provider Configuration
OAUTH_DISCOVERY_URL=https://accounts.google.com/.well-known/openid-configuration
OAUTH_CLIENT_ID=your_oauth_client_id
# OAuth Scopes (Optional - overrides auto-discovery)
# Leave commented unless gateway requires specific resource server scopes
# The client will auto-discover supported scopes from the OAuth provider
# Set this ONLY for gateways with custom resource server scopes (like AgentCore gateways)
# Format: space-separated list including standard scopes + gateway-specific scopes
# Example: openid email https://GATEWAY_URL/mcp/read https://GATEWAY_URL/mcp/write
# OAUTH_SCOPES=
# ==============================================================================
# MCP Server Endpoint Configuration (choose ONE of the following)
# ==============================================================================
# Option 1: Direct MCP Endpoint (RECOMMENDED for flexibility)
# Works with ANY MCP server: AgentCore runtime, gateway, or third-party
#
# When OAUTH_ENABLE_RFC8707=true (see below; disabled by default), the MCP endpoint
# URL is included as the 'resource' parameter in OAuth requests per RFC 8707
# (Resource Indicators for OAuth 2.0).
# This enables resource-scoped tokens for better security.
#
# Examples:
# - AgentCore Gateway: https://GATEWAY_NAME.gateway.bedrock-agentcore.REGION.amazonaws.com/mcp
# - AgentCore Runtime: https://bedrock-agentcore.REGION.amazonaws.com/runtimes/ARN/invocations?qualifier=DEFAULT
# - Third-party MCP: https://your-mcp-server.example.com/mcp
# MCP_ENDPOINT=
# Option 2: AgentCore Runtime ARN (legacy, auto-constructs URL)
# Format: arn:aws:bedrock-agentcore:{region}:{account id}:runtime/{runtime id}
# Note: If MCP_ENDPOINT is set, this will be ignored
# AGENTCORE_RUNTIME_ARN=
# ==============================================================================
# Optional Configuration (has defaults)
# ==============================================================================
# AWS Configuration
AGENTCORE_REGION=us-west-2
AWS_PROFILE=default
# RFC 8707 Resource Indicators (Optional)
# Enable RFC 8707 support to include 'resource' parameter in OAuth requests
# This provides better token scoping but is not supported by all OAuth providers
# Default: false (disabled for maximum compatibility)
# Set to 'true' only if your OAuth provider supports RFC 8707
# OAUTH_ENABLE_RFC8707=false
# ==============================================================================
# Mode-Specific Configuration (optional, depends on authentication mode)
# ==============================================================================
# Quick Mode Configuration (Mode 2)
# ⚠️ IMPORTANT: Quick Mode only works with AWS Cognito providers
# ⚠️ Your Cognito client MUST have USER_PASSWORD_AUTH flow enabled
# Required only for testing with existing user credentials
# Uncomment to enable when using Mode 2
# OAUTH_TEST_USERNAME=test@example.com
# OAUTH_TEST_PASSWORD=SecurePass123!
# M2M Mode Configuration (Mode 3)
# Required only for Machine-to-Machine authentication (client_credentials flow)
# Uncomment to enable when using Mode 3
# OAUTH_CLIENT_SECRET=your_oauth_client_secret
# M2M Scopes Configuration
# Space-separated list of resource scopes for client_credentials grant type
# These are different from interactive OAuth scopes (openid email profile)
# Examples: mcp-server/read mcp-server/write or api:read api:write
# Uncomment to enable when using Mode 3
# OAUTH_M2M_SCOPES="mcp-server/read mcp-server/write"
# ==============================================================================
# OAuth 2.0 Provider Examples
# ==============================================================================
# Google OAuth 2.0
# OAUTH_DISCOVERY_URL=https://accounts.google.com/.well-known/openid-configuration
# OAUTH_CLIENT_ID=your_google_client_id.googleusercontent.com
# Microsoft Azure AD
# OAUTH_DISCOVERY_URL=https://login.microsoftonline.com/{tenant-id}/v2.0/.well-known/openid-configuration
# OAUTH_CLIENT_ID=your_azure_application_id
# Auth0
# OAUTH_DISCOVERY_URL=https://your-domain.auth0.com/.well-known/openid-configuration
# OAUTH_CLIENT_ID=your_auth0_client_id
# AWS Cognito (as OAuth 2.0 provider)
# OAUTH_DISCOVERY_URL=https://cognito-idp.region.amazonaws.com/user-pool-id/.well-known/openid-configuration
# OAUTH_CLIENT_ID=your_cognito_client_id
#
# Note: For AWS Cognito, use the full OpenID Connect discovery URL:
# https://cognito-idp.{region}.amazonaws.com/{user-pool-id}/.well-known/openid-configuration
#
# Example:
# OAUTH_DISCOVERY_URL=https://cognito-idp.us-west-2.amazonaws.com/us-west-2_AbCdEfGhI/.well-known/openid-configuration
# OAUTH_CLIENT_ID=client_id
#
# ⚠️ The issuer URL (from discovery response) should match the "iss" claim in JWT tokens from Cognito
# ⚠️ This enables proper OpenID Connect discovery and endpoint configuration
#
# ⚠️ For Quick Mode (Mode 2), ensure your Cognito User Pool Client has:
# - USER_PASSWORD_AUTH flow enabled
# - Explicit auth flows: ALLOW_USER_PASSWORD_AUTH checked
#
# AWS CLI command to enable USER_PASSWORD_AUTH:
# aws cognito-idp update-user-pool-client \
# --user-pool-id your-pool-id \
# --client-id your-client-id \
# --explicit-auth-flows ALLOW_USER_PASSWORD_AUTH ALLOW_REFRESH_TOKEN_AUTH
# Okta
# OAUTH_DISCOVERY_URL=https://your-domain.okta.com/oauth2/default/.well-known/openid-configuration
# OAUTH_CLIENT_ID=your_okta_client_id
# Generic OpenID Connect Provider
# OAUTH_DISCOVERY_URL=https://your-oidc-provider.com/.well-known/openid-configuration
# OAUTH_CLIENT_ID=your_client_id
# ==============================================================================
# Usage Instructions
# ==============================================================================
# 1. Copy this file: cp .env.example .env.local
# 2. Fill in your OAuth 2.0 provider details in .env.local
# 3. Load environment: source .env.local (or use python-dotenv)
# 4. Run the client: python mcp_oauth_standard_client.py
# Authentication Modes:
# - Mode 1 (Manual): Interactive OAuth flow with browser authorization (works with any OAuth 2.0 provider)
# - Mode 2 (Quick): Direct authentication using test user credentials (AWS Cognito ONLY, requires USER_PASSWORD_AUTH enabled)
# - Mode 3 (M2M): Client credentials flow for service-to-service authentication (works with any OAuth 2.0 provider)
# Features:
# - Auto-discovery of OAuth endpoints via /.well-known/openid-configuration
# - Proper OpenID Connect issuer URL validation (matches JWT "iss" claim)
# - Automatic scope detection (provider-specific optimizations)
# - Fallback support for providers without well-known endpoints
# - Works with any OAuth 2.0/OpenID Connect compliant provider
# For production deployments, use Mode 3 (M2M) for service-to-service authentication
# For development and testing, Mode 1 (Manual) provides step-by-step OAuth flow
# For quick testing with existing user, Mode 2 (Quick) bypasses browser interaction
# ==============================================================================
# Dynamic Tool Invocation Configuration (Optional)
# ==============================================================================
# Automatically invoke a specific MCP tool for testing
# Both OAuth and SigV4 clients support this feature
# Tool name to invoke (optional)
# Example: MCP_TEST_TOOL_NAME=echo
# MCP_TEST_TOOL_NAME=
# Tool parameters as JSON string (optional)
# Must be valid JSON format
# Example: MCP_TEST_TOOL_PARAMS={"message": "Hello from MCP!"}
# Example: MCP_TEST_TOOL_PARAMS={"action": "list", "limit": 10}
# MCP_TEST_TOOL_PARAMS=
# Usage Examples:
# 1. Echo tool test:
# MCP_TEST_TOOL_NAME=echo
# MCP_TEST_TOOL_PARAMS={"message": "Test message"}
#
# 2. List operation:
# MCP_TEST_TOOL_NAME=list_items
# MCP_TEST_TOOL_PARAMS={"category": "all", "max_results": 5}
#
# 3. Search operation:
# MCP_TEST_TOOL_NAME=search
# MCP_TEST_TOOL_PARAMS={"query": "example", "fields": ["title", "description"]}
.env.local*
__pycache__/

MCP Client Implementations

This directory contains different MCP client authentication implementations for connecting to the MCP servers through AWS AgentCore Runtime.

Available Clients

1. mcp_oauth_standard_client.py ⭐ RECOMMENDED

  • Authentication: OAuth 2.0 with OpenID Connect
  • Features:
    • Multiple authentication modes (manual, quick, M2M, native SDK)
    • Environment variable configuration (no hardcoded values)
    • Flexible endpoint support via MCP_ENDPOINT (gateway, runtime, or any MCP server)
    • Smart scope selection: Automatically uses provider's supported scopes from well-known config
    • RFC 8707 support (optional): Includes resource parameter in OAuth requests
    • Enhanced error handling and debugging
    • Supports both 401 and 403 response handling
    • Browser-based OAuth flow with callback handling
    • Native MCP SDK OAuth integration with automatic token refresh
    • Automatic .env file loading for configuration
    • Dynamic tool invocation support
  • Use Case: Production OAuth authentication with user interaction
  • Configuration: Uses .env.example as reference for required environment variables
  • Modes:
    • Mode 1 - Manual: Interactive OAuth with browser redirect and step-by-step debugging
    • Mode 2 - Quick: Direct authentication using test credentials (AWS Cognito only)
    • Mode 3 - M2M: Client credentials flow for service-to-service authentication
    • Mode 4 - Native SDK: ⭐ NEW - Uses official MCP SDK OAuth with AgentCore compatibility (auto-detects M2M vs Interactive)

2. mcp_sigv4_client.py

  • Authentication: AWS SigV4 (IAM credentials)
  • Features:
    • Uses AWS IAM credentials (access keys, profiles, roles)
    • No user interaction required
    • Direct HTTP request testing
    • CLI-based configuration with flexible endpoint options
    • Supports both AgentCore runtime ARN and direct MCP gateway endpoint
    • Supports custom AWS regions
    • Verbose mode (--verbose / -v) for detailed debugging
    • Automatic .env file loading for configuration
    • Dynamic tool invocation support
  • Use Case: Service-to-service authentication using IAM
  • Requirements:
    • Proper IAM permissions for AgentCore runtime access
    • Either AgentCore runtime ARN or direct MCP endpoint URL (one required)

Usage

OAuth Authentication (Recommended)

  1. Set up configuration:
# Copy example configuration
cp .env.example .env.local

# Edit .env.local with your actual values
# At minimum, set these required variables:
# OAUTH_DISCOVERY_URL=https://your-oauth-provider/.well-known/openid-configuration
# OAUTH_CLIENT_ID=your_oauth_client_id

# Option 1 (Recommended): Use direct MCP endpoint
# MCP_ENDPOINT=https://your-mcp-server.example.com/mcp
# Works with: gateway, runtime, or any MCP server

# Option 2: Use AgentCore runtime ARN (auto-constructs URL)
# AGENTCORE_RUNTIME_ARN=arn:aws:bedrock-agentcore:region:account:runtime/agent_name

For Gateway mode:

# Copy gateway configuration
cp .env.example .env.local

# Edit .env.local with your OAuth provider and gateway endpoint
  2. Load environment and run:
# Load environment variables
source .env.local

# Run the client
python mcp_oauth_standard_client.py
  3. Select authentication mode:
    • Mode 1 (Manual): Interactive OAuth flow with browser redirect and detailed debugging
    • Mode 2 (Quick): Uses existing test user credentials (AWS Cognito only)
    • Mode 3 (M2M): Machine-to-machine client_credentials flow (requires client secret)
    • Mode 4 (Native SDK): ⭐ RECOMMENDED - Official MCP SDK OAuth with automatic token management (auto-detects M2M vs Interactive based on client_secret)

IAM Authentication

# Ensure AWS credentials are configured
aws configure  # or use AWS_PROFILE, IAM roles, etc.

# Optional: Set AWS profile (defaults to 'default' if not set)
export AWS_PROFILE=your-profile-name

# Option 1: Using AgentCore runtime ARN
python mcp_sigv4_client.py --agent-runtime-arn arn:aws:bedrock-agentcore:us-west-2:YOUR_ACCOUNT:runtime/YOUR_AGENT

# With custom region
python mcp_sigv4_client.py --agent-runtime-arn arn:aws:bedrock-agentcore:us-east-1:YOUR_ACCOUNT:runtime/YOUR_AGENT --region us-east-1

# Option 2: Using direct MCP gateway endpoint
python mcp_sigv4_client.py --mcp-endpoint https://GATEWAY_NAME.gateway.bedrock-agentcore.us-west-2.amazonaws.com/mcp

# With verbose output (shows full tool descriptions and schemas)
python mcp_sigv4_client.py --mcp-endpoint https://GATEWAY_NAME.gateway.bedrock-agentcore.us-west-2.amazonaws.com/mcp --verbose

# Show help for all options
python mcp_sigv4_client.py --help

Configuration

OAuth Client Configuration

OAuth clients use environment variables defined in .env.local:

  • OAUTH_DISCOVERY_URL: OAuth 2.0 provider's OpenID Connect discovery URL
  • OAUTH_CLIENT_ID: OAuth 2.0 client ID
  • OAUTH_SCOPES (optional): Custom OAuth scopes - required for gateways with resource server scopes
  • MCP_ENDPOINT: Direct MCP server URL (recommended - supports any MCP server)
  • AGENTCORE_RUNTIME_ARN: AgentCore runtime ARN (alternative, auto-constructs URL)
  • OAUTH_CLIENT_SECRET: Client secret (required for M2M mode only)
  • OAUTH_M2M_SCOPES: Scopes for M2M mode (overrides OAUTH_SCOPES in M2M)
  • OAUTH_TEST_USERNAME/PASSWORD: Test credentials (required for quick mode only)
  • OAUTH_ENABLE_RFC8707 (optional): Enable RFC 8707 resource parameter (default: false)

Important:

  • Specify either MCP_ENDPOINT (recommended) or AGENTCORE_RUNTIME_ARN. If both are set, MCP_ENDPOINT takes priority.
  • For AgentCore gateways with OAuth, OAUTH_SCOPES is optional. The gateway validates token issuer and client_id. Gateway-specific resource server scopes (like .../mcp/read) are only needed for fine-grained permission control.

MCP_ENDPOINT Examples:

  • Gateway: https://GATEWAY_NAME.gateway.bedrock-agentcore.REGION.amazonaws.com/mcp
  • Runtime: https://bedrock-agentcore.REGION.amazonaws.com/runtimes/ENCODED_ARN/invocations?qualifier=DEFAULT
  • Third-party: https://your-mcp-server.example.com/mcp

IAM Client Configuration

IAM clients require CLI arguments (exactly one of the following):

  • --agent-runtime-arn: AgentCore runtime ARN (for runtime mode)
  • --mcp-endpoint: Direct MCP endpoint URL (for gateway mode)
  • --region: AWS region (optional, defaults to us-west-2)

AWS Profile:

  • Uses AWS_PROFILE environment variable to select AWS profile
  • Falls back to 'default' profile if AWS_PROFILE is not set
  • Example: export AWS_PROFILE=my-profile

Important: You must specify exactly one of --agent-runtime-arn or --mcp-endpoint. Specifying both or neither will result in an error.

Example formats:

  • Runtime ARN: arn:aws:bedrock-agentcore:REGION:ACCOUNT:runtime/AGENT_NAME
  • Gateway endpoint: https://GATEWAY_NAME.gateway.bedrock-agentcore.REGION.amazonaws.com/mcp

OAuth Client Setup Requirements

AWS Cognito Configuration

To use OAuth authentication modes, your AWS Cognito User Pool App Client must be configured correctly:

Required App Client Settings

  1. Allowed callback URLs:

    • http://localhost:3000
  2. Allowed sign-out URLs:

    • http://localhost:3000
  3. OAuth 2.0 grant types:

    • ✅ Authorization code grant (required for modes 1 and 4)
    • ✅ Client credentials (required for mode 3 only)
  4. OAuth 2.0 scopes:

    • ✅ openid
    • ✅ email
    • ✅ aws.cognito.signin.user.admin

Mode-Specific Requirements

| Mode | Client Secret | User Pool | Additional Requirements |
|------|---------------|-----------|-------------------------|
| Mode 1 (Manual) | Optional | Yes | Test user account |
| Mode 2 (Quick) | No | Yes | Test user credentials in .env.local |
| Mode 3 (M2M) | Required | No | Resource server with custom scopes (optional) |
| Mode 4 (Native SDK) | Optional/Required* | Yes/No* | Auto-detects: Interactive if no client_secret, M2M if client_secret provided |

AWS CLI Configuration Commands

# Enable required OAuth flows
aws cognito-idp update-user-pool-client \
  --user-pool-id us-west-2_YourPoolId \
  --client-id your_client_id \
  --allowed-o-auth-flows "code" "client_credentials" \
  --allowed-o-auth-scopes "openid" "email" "aws.cognito.signin.user.admin" \
  --callback-urls "http://localhost:3000" \
  --logout-urls "http://localhost:3000" \
  --allowed-o-auth-flows-user-pool-client

# Generate client secret (for M2M mode)
aws cognito-idp update-user-pool-client \
  --user-pool-id us-west-2_YourPoolId \
  --client-id your_client_id \
  --generate-secret

Generic OAuth 2.0 Provider Configuration

For non-Cognito providers, ensure:

  1. OpenID Connect Discovery endpoint is accessible
  2. Redirect URIs configured: http://localhost:3000
  3. Response type: code
  4. Grant types: authorization_code, refresh_token (and client_credentials for M2M)
  5. Scopes: openid, email, profile (adjust as needed)

Troubleshooting

OAuth Issues

Common Configuration Issues

  • ✅ Callback URLs: http://localhost:3000 must be configured as allowed callback URL
  • ✅ OAuth Flows: Ensure authorization_code is enabled for interactive modes
  • ✅ Scopes: Verify openid email aws.cognito.signin.user.admin are allowed
  • ✅ Client Secret: Required for Mode 3 (M2M), optional for others

Mode-Specific Troubleshooting

Mode 1 (Manual):

  • Check browser console for CORS issues
  • Verify callback URL is correctly copied from browser address bar
  • Ensure test user exists and password is correct

Mode 2 (Quick):

  • Requires OAUTH_TEST_USERNAME and OAUTH_TEST_PASSWORD in .env.local
  • Only works with AWS Cognito (not generic OAuth providers)
  • User must exist in Cognito User Pool

Mode 3 (M2M):

  • Client secret is mandatory - generate using AWS CLI or console
  • May require custom resource server scopes for advanced use cases
  • Verify client_credentials flow is enabled

Mode 4 (Native SDK) ⭐:

  • Auto-detects authentication mode: Interactive (no client_secret) or M2M (with client_secret)
  • Interactive mode: Browser-based authentication with automatic redirect handling
  • M2M mode: Client credentials flow with no user interaction required
  • Handles AgentCore's 403 responses automatically for both modes
  • Works with cross-domain OAuth (Cognito) and MCP (AgentCore) servers
  • Automatic token refresh: uses refresh_token (Interactive) or client_credentials (M2M)
  • Protected resource metadata is configured automatically

IAM Issues

  • Check AWS credentials: aws sts get-caller-identity
  • Verify AWS profile: Check AWS_PROFILE environment variable or use --profile with AWS CLI
  • Ensure IAM user/role has AgentCore runtime permissions
  • Verify region configuration
  • Test with specific profile: AWS_PROFILE=my-profile python mcp_sigv4_client.py ...

Dynamic Tool Invocation

Both clients support dynamic tool invocation for testing via environment variables or .env files:

# In .env.local or as environment variables
MCP_TEST_TOOL_NAME=your_tool_name
MCP_TEST_TOOL_PARAMS={"param1": "value1", "param2": "value2"}

When these are configured, the clients will:

  1. List all available tools from the MCP server
  2. Automatically invoke the specified tool with the provided parameters
  3. Display the result or error information

Example Configuration:

# .env.local
MCP_TEST_TOOL_NAME=echo
MCP_TEST_TOOL_PARAMS={"message": "Hello from MCP!"}

Both clients automatically load environment variables from .env.local or .env files if python-dotenv is installed.

📖 For detailed examples and troubleshooting, see TOOL_TESTING.md

Shared Utilities

The project includes a unified shared module mcp_session_manager.py that provides all common functionality:

Environment and Configuration

  • load_env_files(): Automatic loading of .env files
  • get_test_tool_config(): Parse tool configuration from environment
  • print_tool_invocation_help(): Display help for tool testing

Tool Invocation

  • invoke_test_tool(): Dynamic tool invocation based on environment config

Session Management

  • list_and_display_tools(): List and display MCP tools with optional verbose output
  • list_and_display_resources(): List and display MCP resources with optional verbose output
  • initialize_and_test_session(): Initialize MCP session and run basic operations
  • print_session_summary(): Print formatted session test summary

Both OAuth and SigV4 clients use this shared module for consistent behavior and reduced code duplication.

Dependencies

Both clients require the same core dependencies:

pip install boto3 httpx mcp

# Optional but recommended for .env file support
pip install python-dotenv

Note: Both clients only use standard library modules beyond these core dependencies:

  • OAuth client: Uses asyncio, json, os, webbrowser, urllib.parse
  • SigV4 client: Uses argparse, asyncio, json, os, sys, plus botocore (included with boto3)
  • Shared utilities: mcp_session_manager.py module for session management and tool testing

Smart Scope Selection

Automatic Scope Discovery

The OAuth client automatically selects appropriate scopes based on the OAuth provider's capabilities discovered from the well-known configuration endpoint.

How it works:

  1. Discover supported scopes: Reads scopes_supported from .well-known/openid-configuration
  2. Select available scopes: Requests common scopes (openid, email, profile) that are supported
  3. Fallback gracefully: If no standard scopes are available, uses all provider-supported scopes
  4. Provider-specific defaults: Falls back to provider-specific defaults if discovery doesn't return scopes

Example with Cognito:

// From well-known configuration
{
  "scopes_supported": ["openid", "email", "phone", "profile"]
}

// Client will request: "openid email profile"
// (intersection of requested and supported scopes)

Benefits:

  • ✅ Maximum compatibility: Works with any OAuth provider
  • ✅ No hardcoded scopes: Uses provider's actual capabilities
  • ✅ Graceful degradation: Falls back to defaults if discovery fails
  • ✅ Future-proof: Adapts to provider changes automatically

Fallback behavior:

  • AWS Cognito (no discovery): openid email
  • Generic providers (no discovery): openid email profile

RFC 8707 Support (Optional)

Resource Indicators for OAuth 2.0

The OAuth client supports RFC 8707 which allows clients to explicitly indicate the target resource server in OAuth requests.

⚠️ Important: RFC 8707 is disabled by default for maximum compatibility. Many OAuth providers (including AWS Cognito) don't support the resource parameter and will reject requests with "Invalid request" or invalid_grant errors.

For Native SDK Mode (Mode 4): RFC 8707 is automatically disabled unless explicitly enabled via OAUTH_ENABLE_RFC8707=true. This prevents MCP SDK from adding the resource parameter to OAuth requests.

Enable RFC 8707:

# In .env.local
OAUTH_ENABLE_RFC8707=true

How it works when enabled:

The client includes the resource parameter in:

  1. Authorization requests (authorization endpoint)
  2. Token requests (token endpoint)
  3. M2M token requests (client_credentials flow)

The resource parameter contains the MCP server URL, allowing the OAuth provider to:

  • Issue tokens scoped specifically for that resource server
  • Validate the client has permission to access that resource
  • Enable resource-specific token claims and audience restrictions

Example OAuth request with resource parameter:

GET /authorize?
  response_type=code
  &client_id=your_client_id
  &redirect_uri=http://localhost:3000
  &scope=openid+email
  &resource=https://gateway.bedrock-agentcore.us-west-2.amazonaws.com/mcp
  &state=random_state

Benefits (when supported):

  • ✅ More secure: Tokens are explicitly scoped to the MCP server
  • ✅ Better token validation: OAuth provider can verify resource access
  • ✅ Multi-resource support: Different tokens for different MCP servers
  • ✅ Standards-compliant: Follows OAuth 2.0 best practices

Compatibility:

  • ❌ AWS Cognito: Does NOT support resource parameter in authorization_code flow
  • ✅ Auth0: Supports RFC 8707 (audience parameter)
  • ✅ Okta: Supports RFC 8707
  • ⚠️ Others: Check your OAuth provider's documentation

Recommendation: Leave RFC 8707 disabled unless you know your OAuth provider supports it.

#!/usr/bin/env python3
"""
MCP Client using Standard OAuth Flow with AWS Cognito
This script provides multiple OAuth authentication methods for connecting to
MCP servers through AWS AgentCore Runtime.
"""
import asyncio
import os
import webbrowser
from urllib.parse import parse_qs, urlparse, urlencode, urljoin
import boto3
import httpx
from mcp import ClientSession
from mcp.client.auth import TokenStorage, OAuthClientProvider, OAuthTokenError
from mcp.client.streamable_http import streamablehttp_client, MCP_PROTOCOL_VERSION
from mcp.shared.auth import OAuthClientInformationFull, OAuthToken, OAuthClientMetadata
import logging
from pydantic import ValidationError
from collections.abc import AsyncGenerator
# Import shared utilities
from mcp_session_manager import load_env_files, invoke_test_tool
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# For native SDK OAuth support: AnyUrl is imported under a guard so that a
# missing pydantic falls back to plain strings instead of aborting the script.
# NOTE(review): pydantic is already imported unconditionally above
# (`from pydantic import ValidationError`), so this ImportError branch looks
# unreachable in practice - confirm before relying on the fallback.
try:
from pydantic import AnyUrl
except ImportError:
print("⚠️ pydantic not available - native SDK mode will be limited")
AnyUrl = str # Fallback: treat URLs as plain strings when pydantic is missing
# Load .env files as soon as the module is imported (helper comes from
# mcp_session_manager), so later os.environ lookups see the configured values.
load_env_files()
class DebugTokenStorage(TokenStorage):
    """In-memory ``TokenStorage`` that reports every read and write on stdout.

    Nothing is persisted: tokens and client registration data live only on the
    instance. The access token itself is never printed; a fixed run of
    asterisks is shown in its place.
    """

    def __init__(self):
        # Both slots start empty and are filled by the matching set_* coroutine.
        self.client_info: OAuthClientInformationFull | None = None
        self.tokens: OAuthToken | None = None

    async def get_tokens(self) -> OAuthToken | None:
        """Return the stored tokens (or ``None``), printing which case applied."""
        if not self.tokens:
            print("❌ No tokens in storage")
            return self.tokens
        print(f"πŸ”‘ Retrieved tokens from storage: access_token={'*' * 20}... (expires: {getattr(self.tokens, 'expires_in', 'unknown')})")
        return self.tokens

    async def set_tokens(self, tokens: OAuthToken) -> None:
        """Remember *tokens*, printing only the token type."""
        print(f"πŸ’Ύ Storing tokens: access_token={'*' * 20}... (type: {getattr(tokens, 'token_type', 'unknown')})")
        self.tokens = tokens

    async def get_client_info(self) -> OAuthClientInformationFull | None:
        """Return the stored client registration; prints only when one exists."""
        if not self.client_info:
            return self.client_info
        print(f"ℹ️ Retrieved client info: client_id={getattr(self.client_info, 'client_id', 'unknown')}")
        return self.client_info

    async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
        """Remember *client_info*, printing its client_id."""
        print(f"πŸ’Ύ Storing client info: client_id={getattr(client_info, 'client_id', 'unknown')}")
        self.client_info = client_info
class OAuth2Handler:
"""Generic OAuth 2.0 handler with well-known endpoint discovery.
Supports RFC 8707 resource parameter for indicating target resource server.
"""
def __init__(self, discovery_url: str, client_id: str, client_secret: str = None,
resource_server: str = None, enable_rfc8707: bool = False, custom_scopes: str = None):
self.discovery_url = discovery_url.rstrip('/')
self.client_id = client_id
self.client_secret = client_secret
self.resource_server = resource_server # RFC 8707: target resource server URL
self.enable_rfc8707 = enable_rfc8707 # Flag to enable RFC 8707 resource parameter
self.custom_scopes = custom_scopes # Custom OAuth scopes (overrides auto-discovery)
self.redirect_uri = "http://localhost:3000"
self.well_known_config = None
async def discover_endpoints(self) -> dict:
    """Fetch the provider's discovery document and cache it on the instance.

    The configured ``discovery_url`` is requested as-is (no path is appended).
    On success the parsed JSON is stored in ``self.well_known_config`` and a
    short summary of the discovered endpoints is printed.

    Returns:
        The parsed discovery document.

    Raises:
        ValueError: If the request fails, the response status is not 200, or
            the body is not a JSON object. The original error is chained as
            ``__cause__``.
    """
    # Use the provided discovery URL directly - no assumptions about path
    well_known_url = self.discovery_url
    print(f"πŸ” Discovering OAuth 2.0 endpoints from: {well_known_url}")
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(well_known_url, timeout=10.0)
            if response.status_code != 200:
                raise ValueError(f"Discovery failed: HTTP {response.status_code}")
            config = response.json()
            # Validate before caching so a malformed document is never stored.
            if not isinstance(config, dict):
                raise ValueError("Discovery document is not a JSON object")
        except Exception as e:
            print(f"❌ OAuth 2.0 discovery failed: {e}")
            print("πŸ”„ Cannot discover endpoints from provided URL")
            print("πŸ’‘ Please provide valid discovery URL or configure endpoints manually")
            # Callers see a single ValueError; chain the cause for debugging.
            raise ValueError(f"Discovery failed and no fallback available: {e}") from e

    self.well_known_config = config
    print("βœ… Discovery successful!")
    print(f" Authorization endpoint: {config.get('authorization_endpoint', 'Not found')}")
    print(f" Token endpoint: {config.get('token_endpoint', 'Not found')}")
    print(f" Issuer: {config.get('issuer', 'Not found')}")
    print(f" Supported scopes: {config.get('scopes_supported', 'Not listed')}")
    return config
async def get_authorization_url(self) -> str:
"""Generate the authorization URL using discovered endpoints."""
if not self.well_known_config:
await self.discover_endpoints()
auth_endpoint = self.well_known_config.get('authorization_endpoint')
if not auth_endpoint:
raise ValueError("No authorization endpoint found in OAuth 2.0 configuration")
# Determine scopes - priority order:
# 1. Custom scopes (from configuration) - highest priority
# 2. Provider's supported scopes (from well-known config)
# 3. Provider-specific defaults (fallback)
if self.custom_scopes:
# Use explicitly configured custom scopes
scope = self.custom_scopes
print(f"πŸ”§ Using custom configured scopes: {scope}")
else:
# Auto-discover scopes
scopes_supported = self.well_known_config.get('scopes_supported', [])
if scopes_supported:
# Use scopes from well-known configuration
# Request common OpenID Connect scopes that are supported
requested_scopes = ['openid', 'email', 'profile']
available_scopes = [s for s in requested_scopes if s in scopes_supported]
if not available_scopes:
# If none of our requested scopes are available, use all supported scopes
available_scopes = scopes_supported
scope = ' '.join(available_scopes)
print(f"πŸ”§ Using provider's supported scopes: {scope}")
else:
# Fallback to provider-specific defaults
if 'cognito-idp' in self.discovery_url.lower() or 'cognito' in self.discovery_url.lower() or 'amazoncognito' in self.discovery_url.lower():
scope = 'openid email'
print("πŸ”§ Detected AWS Cognito - using Cognito-specific scopes")
else:
scope = 'openid email profile'
print("πŸ”§ Using standard OpenID Connect scopes")
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'scope': scope,
'state': 'random_state_12345' # In production, use a secure random state
}
# RFC 8707: Add resource parameter to indicate target resource server (if enabled)
if self.enable_rfc8707 and self.resource_server:
params['resource'] = self.resource_server
print(f"πŸ”§ RFC 8707: Including resource server in auth request: {self.resource_server}")
auth_url = f"{auth_endpoint}?{urlencode(params)}"
return auth_url
async def exchange_code_for_tokens(self, authorization_code: str) -> dict:
    """Exchange an authorization code for tokens at the discovered token endpoint.

    Runs endpoint discovery first when no configuration is cached, then sends
    a form-encoded ``authorization_code`` grant to the token endpoint.

    Args:
        authorization_code: Code taken from the OAuth redirect URL.

    Returns:
        The decoded JSON body of the token response.

    Raises:
        ValueError: If the configuration has no ``token_endpoint`` or the
            endpoint answers with a non-200 status.
        httpx.HTTPError: If the HTTP request itself fails (logged, then
            re-raised).
    """
    if not self.well_known_config:
        await self.discover_endpoints()
    token_endpoint = self.well_known_config.get('token_endpoint')
    if not token_endpoint:
        raise ValueError("No token endpoint found in OAuth 2.0 configuration")

    data = {
        'grant_type': 'authorization_code',
        'client_id': self.client_id,
        'code': authorization_code,
        'redirect_uri': self.redirect_uri
    }
    # RFC 8707: Include resource parameter in token request (if enabled)
    include_resource = bool(self.enable_rfc8707 and self.resource_server)
    if include_resource:
        data['resource'] = self.resource_server
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    print(f"πŸ”„ Exchanging authorization code for tokens...")
    print(f" Token endpoint: {token_endpoint}")
    # Only a prefix of the code is echoed to keep the full value out of logs.
    print(f" Authorization code: {authorization_code[:20]}...")
    if include_resource:
        print(f" Resource (RFC 8707): {self.resource_server}")

    async with httpx.AsyncClient() as client:
        # Only the request is guarded: a transport failure is the one case
        # that deserves the "Network error" label. Non-200 answers are
        # reported separately below.
        try:
            response = await client.post(token_endpoint, data=data, headers=headers)
        except httpx.HTTPError as e:
            print(f"❌ Network error: {e}")
            raise

    print(f" Response status: {response.status_code}")
    if response.status_code != 200:
        error_text = response.text
        print(f"❌ Token exchange failed:")
        print(f" Status code: {response.status_code}")
        print(f" Error response: {error_text}")
        raise ValueError(f"Token exchange failed: {response.status_code} - {error_text}")

    tokens = response.json()
    print(f"βœ… Token exchange successful!")
    # Token values are masked; only metadata is printed.
    print(f" Access token: {'*' * 20}... (expires_in: {tokens.get('expires_in')})")
    print(f" Token type: {tokens.get('token_type')}")
    if 'refresh_token' in tokens:
        print(f" Refresh token: {'*' * 20}...")
    return tokens
async def get_m2m_token(self) -> dict:
    """Get an M2M access token using the ``client_credentials`` grant.

    Scope selection, in priority order:
      1. scopes configured on the handler (``custom_scopes``), if any;
      2. no scope at all for AWS Cognito discovery URLs;
      3. ``'openid profile'`` for any other provider.

    Returns:
        The decoded JSON body of the token response.

    Raises:
        ValueError: If no client secret is set, the configuration has no
            ``token_endpoint``, or the endpoint answers with a non-200 status.
        httpx.HTTPError: If the HTTP request itself fails (logged, then
            re-raised).
    """
    if not self.client_secret:
        raise ValueError("Client secret is required for M2M authentication")
    if not self.well_known_config:
        await self.discover_endpoints()
    token_endpoint = self.well_known_config.get('token_endpoint')
    if not token_endpoint:
        raise ValueError("No token endpoint found in OAuth 2.0 configuration")

    # NOTE(review): the constructor is called with ``custom_scopes=``; this
    # assumes it stores the value as ``self.custom_scopes`` - confirm. When the
    # attribute is absent or empty the previous defaults apply unchanged.
    configured_scopes = getattr(self, 'custom_scopes', None)
    if configured_scopes:
        scope = configured_scopes
    elif 'cognito' in self.discovery_url.lower():
        # For AWS Cognito M2M, we need resource server scopes (not user pool scopes).
        # With nothing configured, send no scope and let Cognito apply its defaults.
        scope = None
    else:
        # For generic OAuth providers, use standard scopes
        scope = 'openid profile'

    data = {
        'grant_type': 'client_credentials',
        'client_id': self.client_id,
        'client_secret': self.client_secret,
    }
    # Only add scope if we have one
    if scope:
        data['scope'] = scope
    # RFC 8707: Include resource parameter in M2M token request (if enabled)
    include_resource = bool(self.enable_rfc8707 and self.resource_server)
    if include_resource:
        data['resource'] = self.resource_server
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    print(f"πŸ€– Requesting M2M token...")
    print(f" Token endpoint: {token_endpoint}")
    print(f" Client ID: {self.client_id}")
    print(f" Grant type: client_credentials")
    print(f" Scope: {scope or 'none (using default)'}")
    if include_resource:
        print(f" Resource (RFC 8707): {self.resource_server}")

    async with httpx.AsyncClient() as client:
        # Guard only the request so that a non-200 answer (reported below) is
        # not mislabelled as a network error.
        try:
            response = await client.post(token_endpoint, data=data, headers=headers)
        except httpx.HTTPError as e:
            print(f"❌ M2M authentication network error: {e}")
            raise

    print(f" Response status: {response.status_code}")
    if response.status_code != 200:
        error_text = response.text
        print(f"❌ M2M token request failed:")
        print(f" Status code: {response.status_code}")
        print(f" Error response: {error_text}")
        raise ValueError(f"M2M token request failed: {response.status_code} - {error_text}")

    tokens = response.json()
    print(f"βœ… M2M token request successful!")
    print(f" Access token: {'*' * 20}... (expires_in: {tokens.get('expires_in')} seconds)")
    print(f" Token type: {tokens.get('token_type')}")
    print(f" Scope: {tokens.get('scope', 'unknown')}")

    # Best-effort peek at the JWT claims (for debugging). The signature is NOT
    # verified and any failure here is reported but never fatal.
    try:
        import base64
        import json
        access_token = tokens['access_token']
        # JWT has 3 parts separated by dots: header.payload.signature
        parts = access_token.split('.')
        if len(parts) >= 2:
            payload = parts[1]
            # Restore the stripped base64 padding: 0-3 '=' characters.
            payload += '=' * (-len(payload) % 4)
            claims = json.loads(base64.urlsafe_b64decode(payload))
            print(f" Token claims:")
            print(f" - aud (audience): {claims.get('aud', 'unknown')}")
            print(f" - scope: {claims.get('scope', 'unknown')}")
            print(f" - client_id: {claims.get('client_id', 'unknown')}")
            print(f" - token_use: {claims.get('token_use', 'unknown')}")
    except Exception as decode_error:
        print(f" (Could not decode token for inspection: {decode_error})")
    return tokens
async def handle_redirect_with_browser(auth_url: str) -> None:
    """Open the authorization URL in the user's browser, with a manual fallback.

    Args:
        auth_url: Fully built authorization URL to visit.
    """
    print(f"\n🌐 Opening browser for authorization...")
    print(f"πŸ”— Authorization URL: {auth_url}")
    # webbrowser.open() signals failure by returning False rather than by
    # raising, so the return value decides which message is shown. Opening the
    # browser stays best-effort: any error falls back to manual instructions.
    try:
        opened = webbrowser.open(auth_url)
    except Exception:
        opened = False
    if opened:
        print(f"βœ… Browser opened, please complete login")
    else:
        print(f"⚠️ Unable to open browser automatically, please visit manually:")
        print(f" {auth_url}")
    print(f"\nπŸ“‹ After authorization, you will be redirected to: http://localhost:3000")
async def handle_callback_interactive() -> tuple[str, str | None]:
"""Handle OAuth callback by prompting for the callback URL."""
print(f"\nπŸ“‹ After authorization completion, copy the full URL from browser address bar:")
print(f"πŸ’‘ Hint: URL should start with 'http://localhost:3000/?code='")
while True:
callback_url = input("\nCallback URL: ").strip()
if not callback_url:
print("❌ URL cannot be empty, please try again")
continue
try:
parsed_url = urlparse(callback_url)
params = parse_qs(parsed_url.query)
if "code" not in params:
print("❌ Authorization code not found in URL (missing code parameter)")
print("πŸ’‘ Please ensure you copied the complete redirect URL")
continue
authorization_code = params["code"][0]
state = params.get("state", [None])[0]
print(f"βœ… Authorization code parsed: {authorization_code[:20]}...")
if state:
print(f" State parameter: {state}")
return authorization_code, state
except Exception as e:
print(f"❌ URL parsing error: {e}")
print("πŸ’‘ Please check URL format")
continue
async def test_mcp_with_manual_token(mcp_server_url: str, access_token: str):
    """Test an MCP connection using an already obtained access token.

    Connects over streamable HTTP with a Bearer header, initializes a client
    session, lists the server's tools and hands them to ``invoke_test_tool``.

    Args:
        mcp_server_url: URL of the MCP server endpoint.
        access_token: Access token sent as ``Authorization: Bearer <token>``.

    Returns:
        ``True`` only when session initialization and tool listing completed;
        ``False`` otherwise. Errors are printed, never raised.
    """
    import traceback

    print(f"\nπŸ”— Connecting to MCP server with token...")
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    # Outcome of the initialize/list-tools exchange. Tracked explicitly so a
    # later session-teardown error can neither hide a failure nor turn a
    # failed initialization into a reported success.
    success = False
    try:
        async with streamablehttp_client(mcp_server_url, headers) as (read, write, _):
            print("βœ… MCP connection successful!")
            try:
                async with ClientSession(read, write) as session:
                    print("πŸ”„ Initializing session...")
                    try:
                        await session.initialize()
                        print("βœ… Session initialization successful!")
                        # List available tools
                        print("πŸ” Listing available tools...")
                        tools_result = await session.list_tools()
                        tools = tools_result.tools
                        if tools:
                            print(f"\nπŸ“š Found {len(tools)} tools:")
                            for i, tool in enumerate(tools, 1):
                                print(f" {i}. {tool.name}")
                                if tool.description:
                                    print(f" Description: {tool.description[:100]}...")
                            # Use shared tool tester for dynamic invocation
                            await invoke_test_tool(session, tools)
                        else:
                            print("\n❌ No available tools")
                        success = True
                    except Exception as init_error:
                        print(f"❌ Session initialization failed: {init_error}")
                        print(f" Error type: {type(init_error).__name__}")
                        # Try to get more details
                        print("πŸ” Full error traceback:")
                        traceback.print_exc()
            except Exception as session_cleanup_error:
                # Ignore session termination errors (common with servers that don't implement session/end)
                if "404" in str(session_cleanup_error) or "termination" in str(session_cleanup_error).lower():
                    print("ℹ️ Session cleanup skipped (server doesn't support session termination)")
                else:
                    print(f"⚠️ Session cleanup error: {session_cleanup_error}")
            # Success is reported only if the main operation actually worked.
            return success
    except Exception as e:
        print(f"❌ MCP connection failed: {e}")
        print(f" Error type: {type(e).__name__}")
        print("πŸ” Full error traceback:")
        traceback.print_exc()
        return False
def load_config() -> dict:
    """Build the runtime configuration dictionary from environment variables.

    ``OAUTH_DISCOVERY_URL`` and ``OAUTH_CLIENT_ID`` are mandatory, as is one of
    ``MCP_ENDPOINT`` / ``AGENTCORE_RUNTIME_ARN`` (the endpoint wins when both
    are set). The resolved server URL is stored under ``mcp_server_url``.

    Returns:
        Dictionary of OAuth, MCP server, RFC 8707, test-user and AWS settings.

    Raises:
        ValueError: If a mandatory variable is missing.
    """
    env = os.getenv

    # Required OAuth configuration
    missing_oauth_vars = [
        name for name in ('OAUTH_DISCOVERY_URL', 'OAUTH_CLIENT_ID') if not env(name)
    ]
    if missing_oauth_vars:
        print("❌ Missing required OAuth environment variables:")
        for name in missing_oauth_vars:
            print(f" {name}")
        print("\nπŸ’‘ Please check .env.example for reference values")
        raise ValueError(f"Missing required OAuth variables: {missing_oauth_vars}")

    # MCP endpoint configuration - at least one is required
    mcp_endpoint = env('MCP_ENDPOINT')
    agentcore_runtime_arn = env('AGENTCORE_RUNTIME_ARN')
    if not (mcp_endpoint or agentcore_runtime_arn):
        print("❌ Missing MCP server endpoint configuration!")
        print(" Please set one of the following:")
        print(" - MCP_ENDPOINT: Direct MCP server URL (any MCP server)")
        print(" - AGENTCORE_RUNTIME_ARN: AgentCore runtime ARN (for runtime mode)")
        print("\nπŸ’‘ Please check .env.example for reference values")
        raise ValueError("Either MCP_ENDPOINT or AGENTCORE_RUNTIME_ARN must be provided")
    if mcp_endpoint and agentcore_runtime_arn:
        print("⚠️ Warning: Both MCP_ENDPOINT and AGENTCORE_RUNTIME_ARN are set")
        print(" Using MCP_ENDPOINT (ignoring AGENTCORE_RUNTIME_ARN)")

    discovery_url = env('OAUTH_DISCOVERY_URL')
    region = env('AGENTCORE_REGION', 'us-west-2')
    rfc8707_flag = env('OAUTH_ENABLE_RFC8707', 'false').lower()

    config = {
        # OAuth 2.0 Configuration
        'discovery_url': discovery_url,
        'client_id': env('OAUTH_CLIENT_ID'),
        'client_secret': env('OAUTH_CLIENT_SECRET'),
        'custom_scopes': env('OAUTH_SCOPES'),  # Custom scopes (overrides auto-discovery)
        'm2m_scopes': env('OAUTH_M2M_SCOPES'),
        # MCP Server Configuration
        'mcp_endpoint': mcp_endpoint,
        'agentcore_runtime_arn': agentcore_runtime_arn,
        'agentcore_region': region,
        # RFC 8707 Configuration
        'enable_rfc8707': rfc8707_flag in ('true', '1', 'yes'),
        # Test User Configuration (for quick mode)
        'test_username': env('OAUTH_TEST_USERNAME'),
        'test_password': env('OAUTH_TEST_PASSWORD'),
        # AWS Profile
        'aws_profile': env('AWS_PROFILE', 'default'),
    }

    # Auto-detect provider type for better user experience. Every Cognito URL
    # form ('cognito-idp', 'amazoncognito') contains the substring 'cognito'.
    if 'cognito' in discovery_url.lower():
        print("πŸ›οΈ Detected AWS Cognito OAuth 2.0 provider")
    else:
        print("πŸ”— Using generic OAuth 2.0 provider")

    # Construct MCP server URL
    if not mcp_endpoint:
        # Percent-encode the ARN's ':' and '/' so it fits in one path segment
        encoded_arn = agentcore_runtime_arn.replace('/', '%2F').replace(':', '%3A')
        config['mcp_server_url'] = (
            f"https://bedrock-agentcore.{region}.amazonaws.com"
            f"/runtimes/{encoded_arn}/invocations?qualifier=DEFAULT"
        )
        print(f"πŸ”— Using AgentCore runtime ARN: {agentcore_runtime_arn}")
    else:
        config['mcp_server_url'] = mcp_endpoint
        print(f"πŸ”— Using direct MCP endpoint: {mcp_endpoint}")
    return config
async def test_manual_oauth_flow(config: dict):
    """Walk through the authorization-code flow by hand, then probe the MCP server.

    Steps: build the authorization URL, open it in a browser, read the pasted
    callback URL, exchange the code for tokens, and test the MCP connection
    with the resulting access token. Failures inside the flow are printed with
    a traceback instead of being raised.

    Args:
        config: Configuration dictionary as produced by ``load_config``.
    """
    import traceback

    print(f"\nπŸ”§ === Manual OAuth Flow - Step-by-step debugging ===")
    handler_args = (config['discovery_url'], config['client_id'])
    server_url = config['mcp_server_url']
    oauth_handler = OAuth2Handler(
        *handler_args,
        resource_server=server_url,
        enable_rfc8707=config.get('enable_rfc8707', False),
        custom_scopes=config.get('custom_scopes'),
    )
    try:
        # 1. Generate authorization URL and send the user to it
        await handle_redirect_with_browser(await oauth_handler.get_authorization_url())
        # 2. Get authorization code (the returned state is not inspected here)
        authorization_code, _state = await handle_callback_interactive()
        # 3. Exchange for tokens
        tokens = await oauth_handler.exchange_code_for_tokens(authorization_code)
        access_token = tokens['access_token']
        # 4. Test MCP connection
        connected = await test_mcp_with_manual_token(server_url, access_token)
        if not connected:
            print(f"\n⚠️ OAuth successful but MCP connection failed")
        else:
            print(f"\nπŸŽ‰ Manual OAuth flow completely successful!")
            print(f"βœ… Access token is valid, MCP server connection normal")
    except Exception as e:
        print(f"❌ Manual OAuth flow failed: {e}")
        traceback.print_exc()
async def test_quick_mode(config: dict):
    """Quick mode: fetch a Cognito token with test-user credentials, then test MCP.

    Uses boto3 ``initiate_auth`` with the ``USER_PASSWORD_AUTH`` flow, so it
    only works when the discovery URL points at AWS Cognito. All failures are
    printed together with hints; nothing is raised to the caller.

    Args:
        config: Configuration dictionary as produced by ``load_config``; reads
            ``test_username``, ``test_password``, ``discovery_url``,
            ``aws_profile``, ``client_id`` and ``mcp_server_url``.
    """
    print(f"\nπŸš€ === Quick Mode - Using existing credentials ===")
    # Check if credentials are available
    if not config['test_username'] or not config['test_password']:
        print("❌ Quick mode requires test user credentials")
        print("πŸ’‘ Please set environment variables:")
        print(" export OAUTH_TEST_USERNAME='your_username'")
        print(" export OAUTH_TEST_PASSWORD='your_password'")
        print("\nπŸ“ Or check .env.example for configuration reference")
        return

    discovery_url = config['discovery_url']
    # Check if this is AWS Cognito (Quick mode only works with Cognito direct auth).
    # 'cognito-idp' and 'amazoncognito' both contain 'cognito', so one test suffices.
    if 'cognito' not in discovery_url.lower():
        print("❌ Quick mode only works with AWS Cognito providers")
        print("πŸ’‘ For other OAuth 2.0 providers, use:")
        print(" Mode 1 (Manual): Interactive OAuth flow")
        print(" Mode 3 (M2M): Client credentials flow")
        return

    print("Using test user credentials for quick token retrieval...")
    try:
        # Extract region from discovery URL for Cognito
        # New format: https://cognito-idp.us-west-2.amazonaws.com/us-west-2_UserPoolId/.well-known/openid_configuration
        # Legacy format: https://domain.auth.us-west-2.amazoncognito.com/.well-known/openid_configuration
        import re
        region_match = (
            re.search(r'cognito-idp\.([^.]+)\.amazonaws\.com', discovery_url)
            or re.search(r'\.auth\.([^.]+)\.amazoncognito\.com', discovery_url)
        )
        if not region_match:
            raise ValueError("Cannot extract AWS region from discovery URL. Quick mode only works with AWS Cognito.")
        region = region_match.group(1)
        print(f"πŸ“ Extracted AWS region: {region}")

        # Use boto3 for quick token retrieval
        session = boto3.Session(profile_name=config['aws_profile'])
        cognito_client = session.client('cognito-idp', region_name=region)
        response = cognito_client.initiate_auth(
            ClientId=config['client_id'],
            AuthFlow='USER_PASSWORD_AUTH',
            AuthParameters={
                'USERNAME': config['test_username'],
                'PASSWORD': config['test_password']
            }
        )
        # When Cognito answers with a challenge (e.g. a forced password change)
        # the response carries 'ChallengeName' and no 'AuthenticationResult'.
        # Report that explicitly instead of failing with a bare KeyError.
        auth_result = response.get('AuthenticationResult')
        if not auth_result:
            challenge = response.get('ChallengeName', 'unknown')
            raise ValueError(
                f"Cognito returned challenge '{challenge}' instead of tokens; "
                "quick mode cannot complete authentication challenges"
            )
        access_token = auth_result['AccessToken']
        print(f"βœ… Token retrieval successful: {'*' * 20}...")

        # Test MCP connection
        success = await test_mcp_with_manual_token(config['mcp_server_url'], access_token)
        if success:
            print(f"\nπŸŽ‰ Quick mode completely successful!")
            print(f"βœ… This confirms your Cognito configuration and MCP server are working properly")
        else:
            print(f"\n⚠️ Token retrieval successful but MCP connection failed")
    except Exception as e:
        print(f"❌ Quick mode failed: {e}")
        print(f"πŸ’‘ Please ensure:")
        print(f" - AWS_PROFILE={config['aws_profile']} points to correct account")
        print(f" - Test user {config['test_username']} exists with correct password")
async def test_m2m_mode(config: dict):
    """Test M2M (machine-to-machine) authentication with client credentials.

    Takes the client secret from ``config['client_secret']`` or, when it is
    missing, offers to read it interactively without echoing it. Then requests
    a ``client_credentials`` token and tests the MCP connection with it.
    Failures are printed with troubleshooting hints; nothing is raised.

    Args:
        config: Configuration dictionary as produced by ``load_config``.
    """
    print(f"\n🏭 === M2M Mode - Machine-to-machine authentication ===")
    client_secret = config['client_secret']
    if not client_secret:
        print("πŸ”‘ M2M authentication requires client secret")
        print("πŸ’‘ Set environment variable: export OAUTH_CLIENT_SECRET='your_secret'")
        print(" - Enter manually (not recommended for production)")
        print()
        user_choice = input("Enter client secret manually? (y/N): ").strip().lower()
        if user_choice != 'y':
            print("❌ No client secret provided, cannot continue M2M authentication")
            print("πŸ’‘ Hint: You can generate a secret in AWS Cognito console for your client")
            return
        # Read the secret with getpass so it is not echoed to the terminal.
        import getpass
        client_secret = getpass.getpass("Please enter client secret: ").strip()
        if not client_secret:
            print("❌ Client secret cannot be empty")
            return

    try:
        oauth_handler = OAuth2Handler(
            config['discovery_url'],
            config['client_id'],
            client_secret,
            resource_server=config['mcp_server_url'],
            enable_rfc8707=config.get('enable_rfc8707', False),
            custom_scopes=config.get('m2m_scopes')  # M2M mode uses m2m_scopes
        )
        # Get M2M token
        tokens = await oauth_handler.get_m2m_token()
        access_token = tokens['access_token']
        # Test MCP connection
        success = await test_mcp_with_manual_token(config['mcp_server_url'], access_token)
        if success:
            print(f"\nπŸŽ‰ M2M mode completely successful!")
            print(f"βœ… Client credentials authentication working properly, no user interaction required")
            print(f"🏭 This is the ideal authentication method for service-to-service communication")
            print(f"⏱️ Token validity: {tokens.get('expires_in', 'unknown')} seconds")
        else:
            print(f"\n⚠️ M2M authentication successful but MCP connection failed")
    except Exception as e:
        print(f"❌ M2M authentication failed: {e}")
        print(f"\nπŸ”§ Troubleshooting suggestions:")
        print(f" 1. Ensure Cognito client has client_credentials flow enabled")
        print(f" 2. Verify client secret is correct")
        print(f" 3. Check client has appropriate OAuth scopes")
        print(f"\nπŸ“š Configuration commands:")
        print(f" # Enable client_credentials flow (adjust user-pool-id as needed)")
        print(f" AWS_PROFILE={config['aws_profile']} aws cognito-idp update-user-pool-client \\")
        print(f" --user-pool-id <your-user-pool-id> \\")
        print(f" --client-id {config['client_id']} \\")
        print(f" --allowed-o-auth-flows \"client_credentials\" \\")
        print(f" --generate-secret")
async def handle_native_sdk_redirect(auth_url: str) -> None:
    """Handle the OAuth redirect for native SDK mode by opening a browser.

    Args:
        auth_url: Authorization URL to visit.
    """
    print(f"\n🌐 Opening browser for authorization (Native SDK)...")
    print(f"πŸ”— Authorization URL: {auth_url}")
    # webbrowser.open() reports failure through a False return value, not an
    # exception, so check it before claiming the browser was opened. Errors
    # are still tolerated: the manual instructions are printed instead.
    try:
        launched = webbrowser.open(auth_url)
    except Exception:
        launched = False
    if launched:
        print(f"βœ… Browser opened, please complete login")
    else:
        print(f"⚠️ Unable to open browser automatically, please visit manually:")
        print(f" {auth_url}")
    print(f"\nπŸ“‹ After authorization, you will be redirected to: http://localhost:3000/")
class AgentCoreOAuthClientProvider(OAuthClientProvider):
    """Custom OAuth provider that triggers on 403 (not just 401) for AgentCore compatibility.

    Supports both interactive OAuth flows and M2M (client_credentials) flows.
    The mode is chosen in ``async_auth_flow``: a client secret on the stored
    client info selects M2M, otherwise the authorization-code flow is used.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent provider and start in interactive mode."""
        super().__init__(*args, **kwargs)
        self.is_m2m_mode = False  # Will be set after client info is available

    def _detect_m2m_mode(self) -> bool:
        """Return True when a client secret exists and no redirect URIs are configured.

        The conditions are evaluated as explicit guards. Written as a single
        ``a and b and c and not d or not e`` expression, the trailing ``or``
        bound last, so empty redirect URIs alone produced ``True`` even
        without any client secret.

        Note that ``async_auth_flow`` does not call this helper; it applies
        the simpler "client secret present" test directly.
        """
        client_info = self.context.client_info
        if not (client_info and client_info.client_secret):
            return False
        if not hasattr(self.context, 'client_metadata'):
            return False
        metadata = self.context.client_metadata
        return not hasattr(metadata, 'redirect_uris') or not metadata.redirect_uris

    def can_refresh_token(self) -> bool:
        """Check if token can be refreshed - supports both interactive and M2M modes.

        NOTE(review): ``async_auth_flow`` consults
        ``self.context.can_refresh_token()``, not this method - confirm which
        of the two is meant to gate the refresh step.
        """
        if self.is_m2m_mode:
            # M2M can always "refresh" by re-authenticating with client_credentials
            return bool(
                self.context.client_info and
                self.context.client_info.client_secret and
                self.context.client_info.client_id
            )
        else:
            # Interactive mode uses parent's refresh_token logic
            return super().can_refresh_token()

    async def _get_m2m_token(self) -> httpx.Request:
        """Build (but do not send) the client_credentials token request.

        Returns:
            A form-encoded POST request for the token endpoint.

        Raises:
            OAuthTokenError: If no client info or client secret is available.
        """
        if not self.context.client_info or not self.context.client_info.client_secret:
            raise OAuthTokenError("Client secret required for M2M authentication")
        # Use discovered token endpoint or fallback
        if self.context.oauth_metadata and self.context.oauth_metadata.token_endpoint:
            token_url = str(self.context.oauth_metadata.token_endpoint)
        else:
            auth_base_url = self.context.get_authorization_base_url(self.context.auth_server_url or self.context.server_url)
            token_url = urljoin(auth_base_url, "/token")
        token_data = {
            "grant_type": "client_credentials",
            "client_id": self.context.client_info.client_id,
            "client_secret": self.context.client_info.client_secret,
        }
        # Add scope if specified
        if self.context.client_metadata.scope:
            token_data["scope"] = self.context.client_metadata.scope
        # Add resource parameter if conditions are met (RFC 8707)
        if self.context.should_include_resource_param(self.context.protocol_version):
            token_data["resource"] = self.context.get_resource_url()
        # Debug logging for M2M request (the secret itself is never printed)
        print(f"πŸ”„ Preparing M2M token request:")
        print(f" Token endpoint: {token_url}")
        print(f" Grant type: {token_data['grant_type']}")
        print(f" Client ID: {token_data['client_id']}")
        print(f" Client secret: {'*' * 20}... (redacted)")
        print(f" Scope: {token_data.get('scope', 'none')}")
        if 'resource' in token_data:
            print(f" Resource: {token_data['resource']}")
        return httpx.Request(
            "POST",
            token_url,
            data=token_data,
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )

    async def _handle_m2m_token_response(self, response: httpx.Response) -> None:
        """Handle M2M token response with detailed error logging.

        Raises:
            OAuthTokenError: If the response status is not 200.
        """
        print(f"πŸ”„ M2M token response: {response.status_code}")
        if response.status_code != 200:
            # Get detailed error information
            try:
                error_body = await response.aread()
                error_text = error_body.decode('utf-8')
                print(f"❌ M2M token request failed:")
                print(f" Status code: {response.status_code}")
                print(f" Error response: {error_text}")
                # Try to parse as JSON for structured error info
                try:
                    import json
                    error_json = json.loads(error_text)
                    if 'error' in error_json:
                        print(f" Error type: {error_json.get('error')}")
                        print(f" Error description: {error_json.get('error_description', 'N/A')}")
                except json.JSONDecodeError:
                    print(f" Raw error (not JSON): {error_text}")
            except Exception as e:
                print(f" Could not read error response: {e}")
            # Logging above is best-effort; the failure itself is always raised
            raise OAuthTokenError(f"M2M token request failed: {response.status_code}")
        # Success case - delegate to parent for token parsing
        await super()._handle_token_response(response)

    async def _refresh_token(self) -> httpx.Request:
        """Build token refresh request - routes to M2M or interactive flow."""
        if self.is_m2m_mode:
            # For M2M, "refresh" means getting a new token with client_credentials
            return await self._get_m2m_token()
        else:
            # For interactive, use parent's refresh_token flow
            return await super()._refresh_token()

    async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.Request, httpx.Response]:
        """HTTPX auth flow integration with 403 support and M2M mode.

        This is a generator: every ``yield`` hands a request to the HTTP
        client and receives its response, so the statement order below is
        significant.
        """
        async with self.context.lock:
            if not self._initialized:
                await self._initialize()
                # Detect M2M mode after initialization
                self.is_m2m_mode = bool(
                    self.context.client_info and
                    self.context.client_info.client_secret
                )
            # Capture protocol version from request headers
            self.context.protocol_version = request.headers.get(MCP_PROTOCOL_VERSION)
            if not self.context.is_token_valid() and self.context.can_refresh_token():
                # Try to refresh token (routes to appropriate flow based on mode)
                refresh_request = await self._refresh_token()
                refresh_response = yield refresh_request
                if not await self._handle_refresh_response(refresh_response):
                    # Refresh failed, need full re-authentication
                    self._initialized = False
            if self.context.is_token_valid():
                self._add_auth_header(request)
            response = yield request
            # CUSTOM FIX: Trigger OAuth flow on 403 OR 401 (AgentCore returns 403)
            if response.status_code in (401, 403):
                # Perform full OAuth flow (same as original, but triggered on 403 too)
                try:
                    # OAuth flow must be inline due to generator constraints
                    # Step 1: Skip protected resource discovery since we manually configured it
                    # Step 2: Discover OAuth metadata (with fallback for legacy servers)
                    discovery_urls = self._get_discovery_urls()
                    for url in discovery_urls:
                        oauth_metadata_request = self._create_oauth_metadata_request(url)
                        oauth_metadata_response = yield oauth_metadata_request
                        if oauth_metadata_response.status_code == 200:
                            try:
                                await self._handle_oauth_metadata_response(oauth_metadata_response)
                                break
                            except ValidationError:
                                continue
                        elif oauth_metadata_response.status_code < 400 or oauth_metadata_response.status_code >= 500:
                            break  # Non-4XX error, stop trying
                    # Step 3: Register client if needed
                    registration_request = await self._register_client()
                    if registration_request:
                        registration_response = yield registration_request
                        await self._handle_registration_response(registration_response)
                    # Step 4: Perform authorization - different for M2M vs interactive
                    if self.is_m2m_mode:
                        # M2M mode: Use client_credentials directly, no browser interaction
                        token_request = await self._get_m2m_token()
                        token_response = yield token_request
                        await self._handle_m2m_token_response(token_response)
                    else:
                        # Interactive mode: Use authorization code flow
                        auth_code, code_verifier = await self._perform_authorization()
                        # Step 5: Exchange authorization code for tokens
                        token_request = await self._exchange_token(auth_code, code_verifier)
                        token_response = yield token_request
                        await self._handle_token_response(token_response)
                except Exception:
                    logger.exception("OAuth flow error")
                    raise
                # Retry with new tokens
                self._add_auth_header(request)
                yield request
async def handle_native_sdk_callback() -> tuple[str, str | None]:
"""Handle OAuth callback for native SDK mode."""
print(f"\nπŸ“‹ After authorization completion, copy the full URL from browser address bar:")
print(f"πŸ’‘ Hint: URL should start with 'http://localhost:3000/?code='")
while True:
callback_url = input("\nCallback URL: ").strip()
if not callback_url:
print("❌ URL cannot be empty, please try again")
continue
try:
parsed_url = urlparse(callback_url)
params = parse_qs(parsed_url.query)
if "code" not in params:
print("❌ Authorization code not found in URL (missing code parameter)")
print("πŸ’‘ Please ensure you copied the complete redirect URL")
continue
authorization_code = params["code"][0]
state = params.get("state", [None])[0]
print(f"βœ… Authorization code parsed: {authorization_code[:20]}...")
if state:
print(f" State parameter: {state}")
return authorization_code, state
except Exception as e:
print(f"❌ URL parsing error: {e}")
print("πŸ’‘ Please check URL format")
continue
async def test_native_sdk_oauth_flow(config: dict):
"""Test native MCP SDK OAuth flow with auto-detection of M2M vs interactive mode."""
# Detect M2M mode based on client_secret presence
is_m2m_mode = bool(config.get('client_secret'))
if is_m2m_mode:
print(f"\n🏭 === Native SDK Mode (M2M) - Client Credentials Flow ===")
print(f"πŸ”§ Detected client_secret - using M2M authentication")
print(f"πŸš€ No user interaction required - fully automated")
# Remind user about M2M scopes configuration
m2m_scopes = config.get('m2m_scopes')
if m2m_scopes:
print(f"🎯 Using configured M2M scopes: {m2m_scopes}")
else:
print(f"⚠️ No M2M scopes configured - authentication may fail!")
print(f"πŸ’‘ Add OAUTH_M2M_SCOPES=\"mcp-server/read mcp-server/write\" to .env if needed")
else:
print(f"\nπŸ” === Native SDK Mode (Interactive) - Authorization Code Flow ===")
print(f"πŸ”§ No client_secret detected - using interactive authentication")
print(f"🌐 Browser-based user authentication required")
try:
# Prepare redirect URIs (required by constructor but not used in M2M mode)
redirect_uris = [
AnyUrl("http://localhost:3000")
]
# Determine appropriate scopes based on provider and mode
# Priority: custom_scopes > m2m_scopes (for M2M) > defaults
if is_m2m_mode:
# M2M mode: use m2m_scopes
scope = config.get('m2m_scopes')
if scope:
print(f"πŸ”§ Using configured M2M scopes: {scope}")
else:
if 'cognito-idp' in config['discovery_url'].lower() or 'cognito' in config['discovery_url'].lower() or 'amazoncognito' in config['discovery_url'].lower():
print("πŸ”§ Detected AWS Cognito M2M - no scopes configured (may work for some setups)")
print("πŸ’‘ Tip: Set OAUTH_M2M_SCOPES in .env if authentication fails")
else:
print("πŸ”§ Using generic M2M without scopes")
print("πŸ’‘ Tip: Set OAUTH_M2M_SCOPES in .env if authentication fails")
scope = None
else:
# Interactive mode: use custom_scopes if configured, otherwise use defaults
if config.get('custom_scopes'):
scope = config['custom_scopes']
print(f"πŸ”§ Using configured custom scopes: {scope}")
elif 'cognito-idp' in config['discovery_url'].lower() or 'cognito' in config['discovery_url'].lower() or 'amazoncognito' in config['discovery_url'].lower():
scope = 'openid email'
print("πŸ”§ Detected AWS Cognito Interactive - using Cognito-specific scopes")
else:
scope = 'openid email profile'
print("πŸ”§ Using standard OpenID Connect scopes")
# Note: OAuthClientMetadata enforces interactive OAuth constraints
# For M2M mode, we use interactive-compatible metadata but override behavior in the provider
if is_m2m_mode:
client_name = "MCP AgentCore OAuth Client (Native SDK - M2M)"
print(f" Note: Using interactive-compatible metadata, M2M logic handled in provider")
else:
client_name = "MCP AgentCore OAuth Client (Native SDK - Interactive)"
# Create OAuth client metadata (always use interactive-compatible values)
client_metadata = OAuthClientMetadata(
client_name=client_name,
redirect_uris=redirect_uris, # Always provide redirect URIs (required by validation)
grant_types=["authorization_code", "refresh_token"], # Always use interactive grant types
response_types=["code"], # Always provide response types
scope=scope,
)
print(f"πŸ“‹ Client metadata configured:")
print(f" Client name: {client_metadata.client_name}")
print(f" Grant types: {client_metadata.grant_types}")
print(f" Scope: {client_metadata.scope}")
if not is_m2m_mode:
print(f" Redirect URIs: {[str(uri) for uri in client_metadata.redirect_uris]}")
# Create token storage
token_storage = DebugTokenStorage()
# Pre-configure client info to skip registration (AWS Cognito doesn't support dynamic registration)
print(f"πŸ”§ Pre-configuring client info to skip registration...")
# We'll populate the endpoints after OAuth metadata discovery, but set basic info now
from mcp.shared.auth import OAuthClientInformationFull
client_info = OAuthClientInformationFull(
client_id=config['client_id'],
client_secret=config.get('client_secret'),
authorization_endpoint="", # Will be populated during OAuth metadata discovery
token_endpoint="", # Will be populated during OAuth metadata discovery
redirect_uris=redirect_uris
)
await token_storage.set_client_info(client_info)
print(f" βœ… Client info pre-configured with ID: {config['client_id']}")
if is_m2m_mode:
print(f" πŸ”‘ Client secret configured for M2M authentication")
# Create dummy handlers for M2M mode (required by constructor but won't be used)
async def dummy_redirect_handler(auth_url: str) -> None:
"""Dummy handler for M2M mode - should never be called."""
print(f"⚠️ WARNING: Redirect handler called in M2M mode - this shouldn't happen!")
async def dummy_callback_handler() -> tuple[str, str | None]:
"""Dummy handler for M2M mode - should never be called."""
print(f"⚠️ WARNING: Callback handler called in M2M mode - this shouldn't happen!")
return "", None
# Choose appropriate handlers based on mode
if is_m2m_mode:
redirect_handler = dummy_redirect_handler
callback_handler = dummy_callback_handler
else:
redirect_handler = handle_native_sdk_redirect
callback_handler = handle_native_sdk_callback
# Create custom OAuth client provider with 403 support and M2M compatibility
oauth_auth = AgentCoreOAuthClientProvider(
server_url=config['mcp_server_url'], # Use MCP server URL
client_metadata=client_metadata,
storage=token_storage,
redirect_handler=redirect_handler,
callback_handler=callback_handler,
)
# Extract OAuth server URL from discovery URL
oauth_server_url = config['discovery_url'].replace('/.well-known/openid_configuration', '').replace('/.well-known/openid-configuration', '')
# Only set protected resource metadata if RFC 8707 is enabled
# For providers like Cognito that don't support RFC 8707, skip this
if config.get('enable_rfc8707', False):
print(f"\nπŸ”§ RFC 8707 ENABLED: Configuring protected resource metadata...")
from mcp.shared.auth import ProtectedResourceMetadata
from pydantic import AnyUrl as PydanticUrl
# Create protected resource metadata
protected_metadata = ProtectedResourceMetadata(
resource=PydanticUrl(config['mcp_server_url']),
authorization_servers=[PydanticUrl(oauth_server_url)]
)
print(f" Resource: {protected_metadata.resource}")
print(f" Authorization servers: {[str(s) for s in protected_metadata.authorization_servers]}")
# Manually inject the metadata into the OAuth context
oauth_auth.context.protected_resource_metadata = protected_metadata
else:
print(f"\nπŸ”§ RFC 8707 DISABLED: Skipping resource parameter (for Cognito compatibility)")
# Always set auth_server_url for OAuth metadata discovery
oauth_auth.context.auth_server_url = oauth_server_url
print(f"\nπŸ”„ Testing native SDK OAuth with custom AgentCore provider...")
print(f" MCP Server URL: {config['mcp_server_url']}")
print(f" OAuth Server URL: {oauth_server_url}")
print(f" Custom feature: Triggers OAuth flow on 403 (not just 401)")
# Use the OAuth provider with streamable HTTP client
async with streamablehttp_client(config['mcp_server_url'], auth=oauth_auth) as (read, write, _):
print("βœ… MCP connection with native SDK OAuth successful!")
async with ClientSession(read, write) as session:
print("πŸ”„ Initializing session...")
try:
await session.initialize()
print("βœ… Session initialization successful!")
# List available tools
print("πŸ” Listing available tools...")
tools_result = await session.list_tools()
tools = tools_result.tools
if tools:
print(f"\nπŸ“š Found {len(tools)} tools:")
for i, tool in enumerate(tools, 1):
print(f" {i}. {tool.name}")
if tool.description:
print(f" Description: {tool.description[:100]}...")
# Use shared tool tester for dynamic invocation
await invoke_test_tool(session, tools)
else:
print("\n❌ No available tools")
if is_m2m_mode:
print(f"\nπŸŽ‰ Native SDK M2M OAuth flow completely successful!")
print(f"βœ… Using official MCP SDK OAuth implementation (M2M mode)")
print(f"🏭 Client credentials authentication working properly")
print(f"πŸš€ No user interaction required - fully automated")
print(f"πŸ”„ Automatic token refresh via client_credentials")
else:
print(f"\nπŸŽ‰ Native SDK Interactive OAuth flow completely successful!")
print(f"βœ… Using official MCP SDK OAuth implementation (Interactive mode)")
print(f"🌐 Browser-based authentication completed")
print(f"πŸ”„ Automatic token refresh via refresh_token")
print(f"πŸ”§ Custom AgentCore compatibility (403 β†’ OAuth trigger)")
print(f"πŸ” Protected resource metadata configured for cross-domain")
print(f"πŸš€ Automatic token refresh and session management enabled")
return True
except Exception as init_error:
print(f"❌ Session initialization failed: {init_error}")
print(f" Error type: {type(init_error).__name__}")
# Try to get more details
import traceback
print("πŸ” Full error traceback:")
traceback.print_exc()
return False
except Exception as e:
print(f"❌ Native SDK OAuth flow failed: {e}")
print(f" Error type: {type(e).__name__}")
import traceback
print("πŸ” Full error traceback:")
traceback.print_exc()
print(f"\nπŸ”§ Troubleshooting suggestions:")
print(f" 1. Check that client ID and discovery URL are correct")
if is_m2m_mode:
print(f" 2. Verify client_secret is valid and properly configured")
print(f" 3. Ensure client_credentials grant is enabled in OAuth provider")
print(f" 4. Check that client has appropriate scopes for M2M access")
else:
print(f" 2. Verify redirect URI is configured in OAuth provider")
print(f" 3. Complete the OAuth flow in browser when prompted")
print(f" 4. Ensure authorization_code grant is enabled")
print(f" 5. Ensure AWS credentials are valid for the MCP server")
print(f" 6. This implementation handles AgentCore's 403 responses correctly")
return False
def print_config(config: dict):
    """Write a human-readable summary of the loaded configuration to stdout.

    Args:
        config: Configuration mapping. The keys 'discovery_url', 'client_id'
            and 'mcp_server_url' are required; 'mcp_endpoint',
            'agentcore_runtime_arn' and 'enable_rfc8707' are optional.
            'agentcore_region' and 'aws_profile' are read only when the
            endpoint was derived from a runtime ARN.
    """
    print("πŸ“‹ Configuration:")
    required_fields = (
        ("OAuth Discovery URL", "discovery_url"),
        ("Client ID", "client_id"),
        ("MCP Server URL", "mcp_server_url"),
    )
    for label, key in required_fields:
        print(f" {label}: {config[key]}")

    # Report where the MCP endpoint came from (direct URL wins over ARN).
    if config.get('mcp_endpoint'):
        print(" Endpoint Source: Direct MCP_ENDPOINT")
    elif config.get('agentcore_runtime_arn'):
        print(" Endpoint Source: AgentCore Runtime ARN")
        print(f" Region: {config['agentcore_region']}")
        print(f" AWS Profile: {config['aws_profile']}")

    # Report whether the RFC 8707 resource parameter is switched on.
    rfc8707_line = (
        " RFC 8707: βœ… ENABLED (resource parameter in OAuth requests)"
        if config.get('enable_rfc8707')
        else " RFC 8707: ❌ DISABLED (set OAUTH_ENABLE_RFC8707=true to enable)"
    )
    print(rfc8707_line)
    print()
async def main():
    """Interactive entry point: show the configuration, then run one OAuth mode.

    Loads the configuration, prints it, asks the user on stdin which
    authentication mode to exercise (an empty answer selects mode 2) and
    awaits the matching test coroutine with the loaded configuration.
    """
    print("Generic OAuth 2.0 MCP Client")
    print("=" * 60)

    # Load and display configuration before prompting.
    config = load_config()
    print_config(config)

    menu = (
        "Select OAuth authentication mode:",
        "1. πŸ”§ Manual Mode - Manual OAuth flow (step-by-step debugging)",
        "2. πŸš€ Quick Mode - Using existing user credentials",
        "3. 🏭 M2M Mode - Machine-to-machine authentication (client_credentials)",
        "4. πŸ” Native SDK Mode - MCP SDK OAuth (Auto-detects M2M vs Interactive)",
    )
    for menu_line in menu:
        print(menu_line)

    choice = input("\nSelect mode (1/2/3/4) [default: 2]: ").strip() or "2"

    # Map each menu number to the coroutine that implements that mode.
    flows = {
        "1": test_manual_oauth_flow,
        "2": test_quick_mode,
        "3": test_m2m_mode,
        "4": test_native_sdk_oauth_flow,
    }
    selected_flow = flows.get(choice)
    if selected_flow is None:
        print("❌ Invalid selection")
        return
    await selected_flow(config)
# Script entry point: run the interactive OAuth client only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())
#!/usr/bin/env python3
"""
Shared MCP Session Management and Tool Testing Utilities
This module provides shared utilities for:
- Loading environment configuration
- Managing MCP sessions
- Listing tools and resources
- Testing tool invocations dynamically
"""
import json
import os
from typing import Any, Optional
from mcp import ClientSession
# ==============================================================================
# Environment and Configuration Management
# ==============================================================================
def load_env_files() -> bool:
    """
    Load environment variables from the first existing .env file.

    Candidates are checked in priority order ('.env.local', then '.env')
    relative to the current working directory; only the first one found
    is loaded. If python-dotenv is not installed, a hint is printed instead.

    Returns:
        bool: True if a .env file was loaded, False otherwise
    """
    try:
        from dotenv import load_dotenv

        candidates = ('.env.local', '.env')
        # Lazily pick the first candidate that exists on disk.
        found = next((name for name in candidates if os.path.exists(name)), None)
        if found is None:
            return False
        load_dotenv(found)
        print(f"πŸ“ Loaded environment from {found}")
        return True
    except ImportError:
        # dotenv is optional; tell the user how to load the environment.
        print("πŸ’‘ Hint: Install python-dotenv for automatic .env file loading: pip install python-dotenv")
        print(" Or manually load environment: source .env.local")
        return False
def get_test_tool_config() -> tuple[str | None, dict | None]:
"""
Get test tool configuration from environment variables.
Returns:
tuple: (tool_name, tool_params) or (None, None) if not configured
"""
test_tool_name = os.getenv('MCP_TEST_TOOL_NAME')
test_tool_params_str = os.getenv('MCP_TEST_TOOL_PARAMS')
if not test_tool_name or not test_tool_params_str:
return None, None
try:
params = json.loads(test_tool_params_str)
return test_tool_name, params
except json.JSONDecodeError as e:
print(f"❌ Invalid JSON in MCP_TEST_TOOL_PARAMS: {e}")
print(f" Value: {test_tool_params_str}")
return None, None
def print_tool_invocation_help():
    """Explain on stdout how to configure a test tool invocation.

    Shows both the shell `export` form and the .env.local form of the
    MCP_TEST_TOOL_NAME / MCP_TEST_TOOL_PARAMS settings.
    """
    help_lines = (
        "\nπŸ’‘ To test tool invocation, set environment variables:",
        " export MCP_TEST_TOOL_NAME='tool_name'",
        " export MCP_TEST_TOOL_PARAMS='{\"param1\": \"value1\", \"param2\": \"value2\"}'",
        "\n Or add to .env.local:",
        " MCP_TEST_TOOL_NAME=tool_name",
        ' MCP_TEST_TOOL_PARAMS={"param1": "value1"}',
    )
    for help_line in help_lines:
        print(help_line)
# ==============================================================================
# Tool Invocation
# ==============================================================================
async def invoke_test_tool(session: Any, tools: list) -> bool:
    """
    Invoke a test tool dynamically based on environment variables.

    The tool name and arguments come from get_test_tool_config(). The call
    is skipped when nothing is configured or when the configured tool is
    not among `tools`.

    Args:
        session: MCP ClientSession instance
        tools: List of available tools from session.list_tools()

    Returns:
        bool: True if the tool was called and its result is not flagged as
        an error, False otherwise (not configured, unknown tool, exception
        raised by the call, or a result with isError set)
    """
    test_tool_name, test_tool_params = get_test_tool_config()
    if not test_tool_name or test_tool_params is None:
        print("ℹ️ No test tool configured (MCP_TEST_TOOL_NAME and MCP_TEST_TOOL_PARAMS not set)")
        return False

    print(f"\nπŸ”§ Dynamic tool invocation requested: {test_tool_name}")

    # Check if the requested tool exists
    tool_names = [tool.name for tool in tools]
    if test_tool_name not in tool_names:
        print(f"❌ Tool '{test_tool_name}' not found in available tools")
        print(f" Available tools: {tool_names}")
        return False

    try:
        print(f"πŸš€ Invoking tool '{test_tool_name}' with parameters: {test_tool_params}")
        # Call the tool
        result = await session.call_tool(test_tool_name, test_tool_params)
    except Exception as te:
        print(f"❌ Tool invocation failed: {te}")
        print(f" Error type: {type(te).__name__}")
        import traceback
        print("πŸ” Tool invocation error traceback:")
        traceback.print_exc()
        return False

    # Tool-level failures are reported inside the result (isError flag)
    # rather than raised, so a returned result is not automatically a success.
    if getattr(result, 'isError', False):
        print(f"❌ Tool '{test_tool_name}' returned an error result")
        print(f"πŸ“„ Result: {result}")
        return False

    print(f"βœ… Tool invocation successful!")
    print(f"πŸ“„ Result: {result}")
    return True
# ==============================================================================
# Session Management
# ==============================================================================
async def list_and_display_tools(session: ClientSession, verbose: bool = False) -> list:
    """
    Fetch the server's tool list and print it.

    Args:
        session: MCP ClientSession instance
        verbose: If True, print full descriptions and input schemas and
            separate tools with a blank line; otherwise descriptions longer
            than 100 characters are truncated with "..."

    Returns:
        list: The tools reported by session.list_tools()
    """
    print("\nπŸ” Listing available tools...")
    listing = await session.list_tools()
    tools = listing.tools

    if not tools:
        print("πŸ“­ No tools available on this server")
        return tools

    print(f"\nπŸ“š Found {len(tools)} tools:")
    for index, tool in enumerate(tools, 1):
        print(f" {index}. {tool.name}")

        summary = tool.description
        if summary:
            # Only the non-verbose listing shortens long descriptions.
            if not verbose and len(summary) > 100:
                summary = summary[:100] + "..."
            print(f" Description: {summary}")

        # The schema is shown only in verbose mode and only when present.
        if verbose and hasattr(tool, 'inputSchema') and tool.inputSchema:
            print(f" Input Schema: {json.dumps(tool.inputSchema, indent=6)}")

        if verbose:
            print()  # Blank line between tools in verbose mode
    return tools
async def list_and_display_resources(session: ClientSession, verbose: bool = False) -> Optional[list]:
    """
    Fetch the server's resource list and print it.

    Any exception raised while listing is reported as a warning instead of
    being propagated.

    Args:
        session: MCP ClientSession instance
        verbose: If True, print full descriptions and full error text;
            otherwise descriptions are truncated to 100 characters and
            error text to 50 characters

    Returns:
        Optional[list]: The resources reported by session.list_resources(),
        or None if listing raised an exception
    """
    print("\nπŸ“‚ Listing available resources...")
    try:
        listing = await session.list_resources()
        resources = listing.resources

        if not resources:
            print("πŸ“­ No resources available on this server")
            return resources

        print(f"\nπŸ“ Found {len(resources)} resources:")
        for index, resource in enumerate(resources, 1):
            print(f" {index}. {resource.uri}")
            if resource.name:
                print(f" Name: {resource.name}")

            summary = resource.description
            if summary:
                # Only the non-verbose listing shortens long descriptions.
                if not verbose and len(summary) > 100:
                    summary = summary[:100] + "..."
                print(f" Description: {summary}")

            if verbose:
                print()  # Blank line between resources in verbose mode
        return resources
    except Exception as e:
        # Best effort: report the failure and signal it with None.
        detail = e if verbose else str(e)[:50]
        print(f"⚠️ Could not list resources: {detail}")
        return None
async def initialize_and_test_session(
    session: ClientSession,
    verbose: bool = False,
    invoke_tool_callback: Any = None
) -> bool:
    """
    Initialize an MCP session and exercise its basic operations.

    Runs, in order: session.initialize(), tool listing, the optional tool
    callback (only when tools were found), and resource listing. An
    exception from any of these steps is reported and turns the result
    into False.

    Args:
        session: MCP ClientSession instance
        verbose: If True, display detailed information, including the error
            type and traceback on failure
        invoke_tool_callback: Optional async callback(session, tools) for dynamic tool invocation

    Returns:
        bool: True if every step completed without raising, False otherwise
    """
    print("πŸ”„ Initializing MCP session...")
    try:
        await session.initialize()
        print("βœ… Session initialization successful!")

        available_tools = await list_and_display_tools(session, verbose)

        # The callback is only useful when there is at least one tool.
        if invoke_tool_callback and available_tools:
            await invoke_tool_callback(session, available_tools)

        await list_and_display_resources(session, verbose)
    except Exception as init_error:
        print(f"❌ Session initialization failed: {init_error}")
        if not verbose:
            return False
        print(f" Error type: {type(init_error).__name__}")
        import traceback
        print("πŸ” Full error traceback:")
        traceback.print_exc()
        return False
    else:
        return True
def print_session_summary(success: bool, server_info: Optional[dict] = None, verbose: bool = False):
    """
    Print session test summary.

    On failure, verbose runs get the troubleshooting checklist, while
    non-verbose runs get a pointer to --verbose (suggesting --verbose to a
    run that already uses it is pointless, so the two are exclusive).

    Args:
        success: Whether the session test was successful
        server_info: Optional server information from initialization; its
            'name' and 'version' entries are shown in verbose mode
        verbose: If True, display detailed information
    """
    print("\n" + "=" * 50)

    if success:
        print("βœ… MCP connection with SigV4 authentication successful!")
        print("πŸŽ‰ AgentCore MCP server is accessible and responding correctly.")
        if server_info and verbose:
            print(f"\nπŸ“Š Server Information:")
            print(f" Name: {server_info.get('name', 'Unknown')}")
            print(f" Version: {server_info.get('version', 'Unknown')}")
        print("You can now use this client pattern to integrate with MCP-enabled applications.")
        return

    print("❌ MCP connection with SigV4 authentication failed")
    print("Please check your AWS credentials, AgentCore ARN/endpoint, and network connectivity.")
    if verbose:
        print("\nπŸ”§ Troubleshooting tips:")
        print(" 1. Verify AWS credentials: aws sts get-caller-identity")
        print(" 2. Check IAM permissions for bedrock-agentcore:InvokeGateway or InvokeRuntime")
        print(" 3. Verify endpoint URL or ARN format")
    else:
        # Only users who are not yet in verbose mode benefit from this hint.
        print("Run with --verbose for detailed error information")
#!/usr/bin/env python3
"""
MCP Client for AWS AgentCore using SigV4 Authentication
This script uses AWS SigV4 authentication (the default for AgentCore)
instead of JWT/OAuth tokens.
"""
import argparse
import asyncio
import boto3
import os
import sys
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
import httpx
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
# Import shared utilities
from mcp_session_manager import (
load_env_files,
invoke_test_tool,
initialize_and_test_session,
print_session_summary
)
# Try to load .env files automatically
load_env_files()
class HTTPXSigV4Auth(httpx.Auth):
    """HTTPX authentication handler that signs each request with AWS SigV4.

    The signature covers the request body hash, the Host header and, for
    POST requests with a body, Content-Type and Content-Length.
    """

    # The signature includes a SHA-256 of the payload, so ask httpx to
    # buffer the request body (for sync and async clients alike) before
    # auth_flow runs.
    requires_request_body = True

    def __init__(self, credentials, service: str, region: str):
        """
        Args:
            credentials: Credentials object handed to botocore's SigV4Auth
            service: AWS service name used in the signing scope
            region: AWS region used in the signing scope
        """
        self.credentials = credentials
        self.service = service
        self.region = region

    def auth_flow(self, request: httpx.Request):
        """Sign `request` in place with SigV4 and yield it for sending."""
        import hashlib

        # With requires_request_body set, httpx has already buffered the
        # body, so read() just returns it. The fallback covers direct calls
        # to auth_flow with a request that cannot be read synchronously.
        try:
            body = request.read()
        except Exception:
            body = self._get_request_body(request)

        # Build a botocore request carrying only the data that gets signed.
        aws_request = AWSRequest(
            method=request.method,
            url=str(request.url),
            data=body,
            headers={}
        )

        # Sign the header values that will actually be sent: the Host header
        # of the outgoing request (which may carry a port) and its real
        # Content-Type, falling back to the previous hard-coded values.
        essential_headers = {
            'Host': request.headers.get('Host', request.url.host)
        }
        if request.method == 'POST' and body:
            essential_headers['Content-Type'] = request.headers.get('Content-Type', 'application/json')
            essential_headers['Content-Length'] = str(len(body))

        for name, value in essential_headers.items():
            aws_request.headers[name] = value

        # Calculate and set content hash - required for bedrock-agentcore
        aws_request.headers['X-Amz-Content-Sha256'] = hashlib.sha256(body).hexdigest()

        # Sign the request
        signer = SigV4Auth(self.credentials, self.service, self.region)
        signer.add_auth(aws_request)

        # Copy every signed header (including the ones the signer added)
        # onto the outgoing HTTPX request.
        for name, value in aws_request.headers.items():
            request.headers[name] = value

        yield request

    def _get_request_body(self, request: httpx.Request) -> bytes:
        """Best-effort extraction of already-buffered body bytes for signing.

        Returns b'' when the body has not been read. The stream is never
        consumed here, because doing so would leave nothing to send.
        """
        try:
            content = request.content
        except httpx.RequestNotRead:
            # Body was never buffered; sign as an empty payload.
            return b''

        if content is None:
            return b''
        if isinstance(content, bytes):
            return content
        if isinstance(content, str):
            return content.encode('utf-8')
        # For other types (like dict), serialize to JSON
        import json
        return json.dumps(content).encode('utf-8')
class SigV4AgentCoreMCPClient:
    """MCP client for AWS AgentCore using SigV4 authentication."""

    def __init__(self, region: str = "us-west-2", profile_name: str = None,
                 agent_runtime_arn: str = None, mcp_endpoint: str = None,
                 verbose: bool = False):
        """
        Initialize the MCP client.

        Args:
            region: AWS region (default: us-west-2)
            profile_name: AWS profile name (default: the AWS_PROFILE env var
                if set, otherwise boto3's default credential chain)
            agent_runtime_arn: AgentCore runtime ARN (for runtime mode)
            mcp_endpoint: Direct MCP endpoint URL (for gateway mode)
            verbose: Enable verbose output (default: False)
        """
        self.agent_runtime_arn = agent_runtime_arn
        self.mcp_endpoint = mcp_endpoint
        self.region = region
        self.verbose = verbose

        # Do not force the literal 'default' profile: naming a profile
        # explicitly makes boto3 require that it exists, which fails when
        # credentials come only from environment variables or an instance
        # role. Passing None lets boto3 resolve credentials itself.
        if profile_name is None:
            profile_name = os.getenv('AWS_PROFILE') or None
        self.session = boto3.Session(profile_name=profile_name)
        self.credentials = self.session.get_credentials()

    def get_mcp_url(self) -> str:
        """Return the MCP URL: the direct endpoint if set, else one built from the ARN.

        Raises:
            ValueError: If neither mcp_endpoint nor agent_runtime_arn is set
        """
        # If direct endpoint is provided, use it
        if self.mcp_endpoint:
            return self.mcp_endpoint

        # Otherwise, construct URL from runtime ARN
        if self.agent_runtime_arn:
            # The full ARN goes into the URL path, so ':' and '/' are
            # percent-encoded.
            encoded_arn = self.agent_runtime_arn.replace(':', '%3A').replace('/', '%2F')
            return f"https://bedrock-agentcore.{self.region}.amazonaws.com/runtimes/{encoded_arn}/invocations?qualifier=DEFAULT"

        raise ValueError("Either agent_runtime_arn or mcp_endpoint must be provided")

    def get_auth_handler(self) -> HTTPXSigV4Auth:
        """Create SigV4 auth handler for the 'bedrock-agentcore' service."""
        return HTTPXSigV4Auth(self.credentials, 'bedrock-agentcore', self.region)

    async def test_simple_http_auth(self):
        """Send one signed MCP 'initialize' request over plain HTTP.

        In verbose mode an unsigned request is sent first for comparison.

        Returns:
            bool: True if the signed request returned HTTP 200, else False
        """
        mcp_url = self.get_mcp_url()
        if self.verbose:
            print(f"πŸ§ͺ Testing simple HTTP auth to: {mcp_url}")

        # Create SigV4 auth handler
        auth = self.get_auth_handler()

        # Simple test payload
        test_payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {"roots": {"listChanged": False}},
                "clientInfo": {"name": "test-client", "version": "1.0.0"}
            }
        }

        if self.verbose:
            # Debug: Try with no auth first
            print("πŸ” Testing without auth first...")
            async with httpx.AsyncClient() as client:
                try:
                    response = await client.post(
                        mcp_url,
                        json=test_payload,
                        headers={'Accept': 'application/json, text/event-stream'},
                        timeout=30.0
                    )
                    print(f"No auth response: {response.status_code} - {response.text[:100]}...")
                except Exception as e:
                    print(f"No auth test failed: {e}")
            print("πŸ” Testing with SigV4 auth...")

        async with httpx.AsyncClient(auth=auth) as client:
            try:
                response = await client.post(
                    mcp_url,
                    json=test_payload,
                    headers={'Accept': 'application/json, text/event-stream'},
                    timeout=30.0
                )
                if response.status_code == 200:
                    if self.verbose:
                        print(f"βœ… HTTP Response: {response.status_code}")
                        print(f"βœ… Response: {response.text[:200]}...")
                    return True
                else:
                    print(f"❌ HTTP Response: {response.status_code}")
                    print(f"❌ Error: {response.text}")
                    return False
            except Exception as e:
                print(f"❌ HTTP request failed: {e}")
                if self.verbose:
                    import traceback
                    traceback.print_exc()
                return False

    async def test_mcp_connection(self):
        """Test MCP connection with SigV4 authentication using streamablehttp_client.

        Returns:
            bool: Result of initialize_and_test_session, or False if the
            connection raised an exception
        """
        mcp_url = self.get_mcp_url()
        if self.verbose:
            print(f"πŸ”— Connecting to MCP server: {mcp_url}")
            print("πŸ”‘ Using streamablehttp_client with SigV4 auth handler")

        # Create SigV4 auth handler
        auth = self.get_auth_handler()

        # Create headers - let MCP client handle Content-Type
        headers = {
            'Accept': 'application/json, text/event-stream'
        }

        try:
            async with streamablehttp_client(
                url=mcp_url,
                headers=headers,
                auth=auth,
                timeout=30.0
            ) as (read, write, _):
                if not self.verbose:
                    print("βœ… MCP connection established!")
                async with ClientSession(read, write) as session:
                    # Use shared session manager for all session operations
                    success = await initialize_and_test_session(
                        session,
                        verbose=self.verbose,
                        invoke_tool_callback=invoke_test_tool
                    )
                    return success
        except Exception as e:
            print(f"❌ MCP connection failed: {e}")
            if self.verbose:
                import traceback
                print(f"Full error details: {traceback.format_exc()}")
            return False
def parse_args():
    """Build the command-line parser, parse sys.argv and validate the result.

    Exactly one of --agent-runtime-arn and --mcp-endpoint must be given;
    otherwise the parser reports an error and exits.

    Returns:
        argparse.Namespace: Parsed arguments (agent_runtime_arn,
        mcp_endpoint, region, verbose)
    """
    cli = argparse.ArgumentParser(
        description="MCP Client for AWS AgentCore using SigV4 Authentication",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Using runtime ARN:
python mcp_sigv4_client.py --agent-runtime-arn arn:aws:bedrock-agentcore:us-west-2:123456789012:runtime/my-agent
python mcp_sigv4_client.py --agent-runtime-arn arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/my-agent --region us-east-1
# Using direct MCP gateway endpoint:
python mcp_sigv4_client.py --mcp-endpoint https://GATEWAY_NAME.gateway.bedrock-agentcore.us-west-2.amazonaws.com/mcp
"""
    )

    # (flags, keyword options) for every supported argument, in help order.
    option_specs = (
        (("--agent-runtime-arn",), {
            "help": "AWS AgentCore runtime ARN (e.g., arn:aws:bedrock-agentcore:us-west-2:123456789012:runtime/my-agent)",
        }),
        (("--mcp-endpoint",), {
            "help": "Direct MCP endpoint URL (e.g., https://GATEWAY_NAME.gateway.bedrock-agentcore.us-west-2.amazonaws.com/mcp)",
        }),
        (("--region", "-r"), {
            "default": "us-west-2",
            "help": "AWS region (default: us-west-2)",
        }),
        (("--verbose", "-v"), {
            "action": "store_true",
            "help": "Enable verbose output (show full tool descriptions, schemas, and debug traces)",
        }),
    )
    for flags, options in option_specs:
        cli.add_argument(*flags, **options)

    parsed = cli.parse_args()

    # Exactly one endpoint source is allowed.
    has_arn = bool(parsed.agent_runtime_arn)
    has_endpoint = bool(parsed.mcp_endpoint)
    if has_arn and has_endpoint:
        cli.error("Cannot specify both --agent-runtime-arn and --mcp-endpoint. Please use only one.")
    if not (has_arn or has_endpoint):
        cli.error("Must specify either --agent-runtime-arn or --mcp-endpoint")
    return parsed
async def main():
    """Main function to test SigV4 authentication.

    Steps: parse arguments, validate the ARN prefix, verify AWS credentials
    via STS, run a plain signed HTTP request, then run the full MCP session
    test and print a summary. Returns early (None) on any failed step.
    """
    args = parse_args()

    print("AWS AgentCore MCP Client with SigV4 Authentication")
    print("==================================================")

    # Validate ARN format if using runtime ARN
    if args.agent_runtime_arn:
        if not args.agent_runtime_arn.startswith("arn:aws:bedrock-agentcore:"):
            print(f"❌ Invalid ARN format: {args.agent_runtime_arn}")
            print("ARN should start with: arn:aws:bedrock-agentcore:")
            return
        print(f"Agent Runtime ARN: {args.agent_runtime_arn}")
        print(f"Region: {args.region}")
    else:
        print(f"MCP Endpoint: {args.mcp_endpoint}")
        print(f"Region: {args.region}")

    # Use AWS_PROFILE when set; otherwise pass None so boto3 walks its
    # default credential chain. Forcing the literal 'default' profile makes
    # boto3 require that profile to exist, which breaks setups that rely
    # only on AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY or an instance role.
    profile_name = os.getenv('AWS_PROFILE') or None
    print(f"Using AWS profile: {profile_name or 'default credential chain'}")

    try:
        session = boto3.Session(profile_name=profile_name)
        credentials = session.get_credentials()
        if not credentials:
            print("❌ AWS credentials not found!")
            print("Please configure AWS credentials using:")
            print(" aws configure")
            print(" or set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY")
            print(" or set AWS_PROFILE environment variable")
            return

        # Get caller identity to verify credentials
        sts = session.client('sts')
        identity = sts.get_caller_identity()
        print(f"βœ“ AWS credentials configured for: {identity.get('Arn', 'Unknown')}")

        # The client opens its own boto3 session, so build it here: a
        # profile or credential problem is then reported by the handler
        # below instead of escaping as a traceback.
        client = SigV4AgentCoreMCPClient(
            region=args.region,
            profile_name=profile_name,
            agent_runtime_arn=args.agent_runtime_arn,
            mcp_endpoint=args.mcp_endpoint,
            verbose=args.verbose
        )
    except Exception as e:
        print(f"❌ AWS credentials error: {e}")
        return

    # Test simple HTTP auth first
    if not args.verbose:
        print("\nTesting simple HTTP authentication...")
    http_success = await client.test_simple_http_auth()
    if not http_success:
        print("❌ Simple HTTP auth failed, skipping MCP test")
        return

    # Test the MCP connection with proper SigV4 transport
    if not args.verbose:
        print("\nTesting MCP connection with SigV4 authentication...")
    success = await client.test_mcp_connection()

    # Use shared summary printer
    print_session_summary(success, verbose=args.verbose)
# Script entry point: verify dependencies, then run the SigV4 test client.
if __name__ == "__main__":
    # Check if required packages are available
    # NOTE(review): boto3, httpx and mcp are also imported at module level
    # above, so a missing package presumably raises there before this
    # friendlier check can run - confirm whether this guard is reachable.
    try:
        import boto3
        import httpx
        from mcp import ClientSession
        from mcp.client.streamable_http import streamablehttp_client
    except ImportError as e:
        print(f"❌ Required package missing: {e}")
        print("Install with: pip install boto3 httpx mcp")
        # Exit with a non-zero status so callers can detect the failure.
        sys.exit(1)
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment