Last active
October 1, 2025 04:56
-
-
Save siddharthkrish/fb6629d89f93448b4f9062759611871e to your computer and use it in GitHub Desktop.
Get Amazon Bedrock usage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Usage: | |
| Install required packages: | |
| pip install boto3 pandas | |
| Configure AWS credentials: | |
| aws configure or any other method you use | |
| Run the script: | |
| python bedrock_usage.py | |
| """ | |
| import boto3 | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| from collections import defaultdict | |
def get_bedrock_usage(days=30, group_by='cost'):
    """
    Fetch Amazon Bedrock cost and usage from Cost Explorer, aggregated by model.

    Daily results are grouped by USAGE_TYPE, each usage type is mapped to a
    model name with extract_model_name(), and cost/quantity are summed per
    model over the whole period.

    Args:
        days: Number of days to look back (default: 30)
        group_by: Column to sort by, descending: 'cost' sorts by cost, any
            other value sorts by usage quantity (default: 'cost')

    Returns:
        DataFrame with columns 'Model', 'Cost (USD)', 'Tokens/Units' and
        'Usage Type' (the last usage type seen for that model). The frame is
        empty, but still has these columns, when no usage was reported.
        None if the request or the processing failed (the error is printed).
    """
    columns = ['Model', 'Cost (USD)', 'Tokens/Units', 'Usage Type']

    # Initialize client
    ce_client = boto3.client('ce')  # Cost Explorer

    # Calculate date range
    end_date = datetime.now().date()
    start_date = end_date - timedelta(days=days)

    request = {
        'TimePeriod': {
            # Cost Explorer expects YYYY-MM-DD; 'End' is exclusive
            'Start': start_date.strftime('%Y-%m-%d'),
            'End': end_date.strftime('%Y-%m-%d'),
        },
        'Granularity': 'DAILY',
        'Metrics': ['UnblendedCost', 'UsageQuantity'],
        'Filter': {
            'Dimensions': {
                'Key': 'SERVICE',
                'Values': ['Amazon Bedrock'],
            }
        },
        'GroupBy': [
            {
                'Type': 'DIMENSION',
                'Key': 'USAGE_TYPE',
            }
        ],
    }

    try:
        usage_data = defaultdict(
            lambda: {'cost': 0.0, 'tokens': 0.0, 'usage_type': ''}
        )

        # Results may span several pages; keep requesting until the service
        # stops returning a NextPageToken.
        while True:
            response = ce_client.get_cost_and_usage(**request)

            for result in response['ResultsByTime']:
                for group in result['Groups']:
                    usage_type = group['Keys'][0]
                    metrics = group['Metrics']

                    # Input and output usage types of one model collapse
                    # onto the same model name.
                    entry = usage_data[extract_model_name(usage_type)]
                    entry['cost'] += float(metrics['UnblendedCost']['Amount'])
                    entry['tokens'] += float(metrics['UsageQuantity']['Amount'])
                    entry['usage_type'] = usage_type

            next_token = response.get('NextPageToken')
            if not next_token:
                break
            request['NextPageToken'] = next_token

        # Convert to DataFrame
        df_data = [
            {
                'Model': model,
                'Cost (USD)': round(data['cost'], 4),
                'Tokens/Units': int(data['tokens']),
                'Usage Type': data['usage_type'],
            }
            for model, data in usage_data.items()
        ]

        # Explicit columns keep the frame sortable when there are no rows.
        df = pd.DataFrame(df_data, columns=columns)

        # Sort by the specified metric
        sort_column = 'Cost (USD)' if group_by == 'cost' else 'Tokens/Units'
        df = df.sort_values(sort_column, ascending=False)
        return df.reset_index(drop=True)

    except Exception as e:
        print(f"Error retrieving Bedrock usage: {str(e)}")
        return None
def extract_model_name(usage_type):
    """
    Extract model name from AWS usage type string.

    The string is split on '-'. When it has at least three parts, the first
    part (region prefix) is dropped, and the last part is dropped too if it
    is a direction marker ('Input' or 'Output', compared case-insensitively
    so that e.g. '-input' and '-Input' map to the same model). Strings with
    fewer than three parts are returned unchanged.

    Example usage types:
    - "USE1-Anthropic-Claude3-Sonnet-Input"  -> "Anthropic-Claude3-Sonnet"
    - "USE1-Amazon-TitanText-Output"         -> "Amazon-TitanText"

    Args:
        usage_type: Usage type string as reported by Cost Explorer

    Returns:
        The model name portion of the usage type, or the input unchanged
        when it is too short to contain a region and a model.
    """
    parts = usage_type.split('-')

    # Typical format: Region-Provider-Model-Direction
    if len(parts) < 3:
        return usage_type

    if parts[-1].lower() in ('input', 'output'):
        model_parts = parts[1:-1]
    else:
        model_parts = parts[1:]
    return '-'.join(model_parts)
def get_detailed_bedrock_usage(days=7, models=None):
    """
    Get detailed usage with CloudWatch Metrics (alternative method).
    This provides more granular token-level data.

    For every model ID, daily 'Sum' statistics from the AWS/Bedrock
    namespace are added up over the look-back window.

    Args:
        days: Number of days to look back (default: 7)
        models: Iterable of model IDs used as the 'ModelId' dimension.
            Defaults to a small built-in list of known models.

    Returns:
        DataFrame with columns 'Model', 'InputTokenCount',
        'OutputTokenCount' and 'InvocationCount', one row per model that had
        at least one invocation (empty, with the same columns, when none
        did). None if a CloudWatch request failed (the error is printed).
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    cloudwatch = boto3.client('cloudwatch')

    # Use an aware UTC timestamp: botocore interprets naive datetimes as
    # UTC, so a naive local time would shift the window by the UTC offset.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=days)

    # Output column -> CloudWatch metric name. The invocation metric is
    # published as 'Invocations'; the column keeps its 'InvocationCount'
    # name so existing callers are unaffected.
    metrics = {
        'InputTokenCount': 'InputTokenCount',
        'OutputTokenCount': 'OutputTokenCount',
        'InvocationCount': 'Invocations',
    }
    columns = ['Model', *metrics]

    if models is None:
        # List available models (you may need to hardcode known models)
        # NOTE(review): calls made through an inference profile may be
        # reported under the profile ID instead - confirm for your account.
        models = [
            'anthropic.claude-sonnet-4-5-20250929-v1:0',
            'openai.gpt-oss-120b-1:0',
            'amazon.titan-text-express-v1',
        ]

    usage_data = []
    try:
        for model_id in models:
            model_data = {'Model': model_id}

            for column, metric_name in metrics.items():
                response = cloudwatch.get_metric_statistics(
                    Namespace='AWS/Bedrock',
                    MetricName=metric_name,
                    Dimensions=[
                        {
                            'Name': 'ModelId',
                            'Value': model_id,
                        }
                    ],
                    StartTime=start_time,
                    EndTime=end_time,
                    Period=86400,  # 1 day
                    Statistics=['Sum'],
                )
                total = sum(point['Sum'] for point in response['Datapoints'])
                model_data[column] = int(total)

            # Only add if there's actual usage
            if model_data['InvocationCount'] > 0:
                usage_data.append(model_data)

        # Explicit columns so an empty result still has the expected schema.
        return pd.DataFrame(usage_data, columns=columns)

    except Exception as e:
        print(f"Error retrieving CloudWatch metrics: {str(e)}")
        return None
# Script entry point: print a cost report followed by a token report.
if __name__ == "__main__":
    separator = "=" * 80

    print("Fetching Amazon Bedrock Usage...\n")

    # Report 1: Cost Explorer data (cost per model)
    print(separator)
    print("BEDROCK USAGE BY COST (Last 30 days)")
    print(separator)

    cost_report = get_bedrock_usage(days=30, group_by='cost')
    if cost_report is None or cost_report.empty:
        print("No usage data found or error occurred.")
    else:
        print(cost_report.to_string(index=False))
        total_cost = cost_report['Cost (USD)'].sum()
        print(f"\nTotal Cost: ${total_cost:.4f}")

    print("\n")

    # Report 2: CloudWatch metrics (tokens and invocations per model)
    print(separator)
    print("BEDROCK USAGE BY TOKENS (Last 7 days)")
    print(separator)

    token_report = get_detailed_bedrock_usage(days=7)
    if token_report is None or token_report.empty:
        print("No token data found or error occurred.")
    else:
        print(token_report.to_string(index=False))
        input_total = token_report['InputTokenCount'].sum()
        output_total = token_report['OutputTokenCount'].sum()
        invocation_total = token_report['InvocationCount'].sum()
        print(f"\nTotal Input Tokens: {input_total:,}")
        print(f"Total Output Tokens: {output_total:,}")
        print(f"Total Invocations: {invocation_total:,}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment