ECR Inventory Analysis
#!/usr/bin/env python3
"""
ECR Image Inventory and Cleanup Script for AWS ECS Environments

A comprehensive tool for analyzing Docker images in Amazon ECR repositories,
showing which images are currently in use by ECS services and which are
candidates for cleanup. Now includes safe garbage-collection functionality.

Features:
- Analyzes ECS services, task definitions, and CodeDeploy deployments
- Categorizes images as Referenced, Unreferenced, or Garbage candidates
- Supports both table and CSV output formats
- Configurable garbage age threshold
- Safe garbage collection with human confirmation
- Dry-run mode for testing cleanup operations
- Handles missing CodeDeploy applications gracefully

Author: Muppet Platform Team, aka AI
License: MIT
GitHub: https://github.com/your-org/muppet-platform
"""

import argparse
import csv
import json
import sys
from datetime import datetime, timedelta
from typing import Dict, List

import boto3
from tabulate import tabulate


class ECRInventoryManager:
    def __init__(self, region: str = 'us-west-2', quiet: bool = False):
        self.ecs = boto3.client('ecs', region_name=region)
        self.ecr = boto3.client('ecr', region_name=region)
        self.codedeploy = boto3.client('codedeploy', region_name=region)
        self.region = region
        self.quiet = quiet

    def _print_warning(self, message: str):
        """Print a warning message unless in quiet mode."""
        if not self.quiet:
            print(message)

    def get_image_usage_details(self) -> Dict[str, Dict]:
        """Get detailed usage information for all images."""
        usage_details = {}

        # Get all ECS clusters (muppet clusters)
        try:
            clusters = self.ecs.list_clusters()['clusterArns']
        except Exception as e:
            self._print_warning(f"Error listing clusters: {e}")
            return usage_details

        for cluster_arn in clusters:
            cluster_name = cluster_arn.split('/')[-1]
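            # Naming convention assumed throughout this script: each "muppet" runs in an
            # ECS cluster named "<muppet>-cluster", with an ECS service, task-definition
            # family, CodeDeploy application, and deployment group all named "<muppet>".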
            if not cluster_name.endswith('-cluster'):
                continue
            muppet_name = cluster_name.replace('-cluster', '')
            print(f"Analyzing muppet: {muppet_name}")

            # Get usage from the ECS service
            self._analyze_ecs_service(cluster_name, muppet_name, usage_details)
            # Get usage from recent task definitions
            self._analyze_task_definitions(muppet_name, usage_details)
            # Get usage from CodeDeploy deployments
            self._analyze_codedeploy_deployments(muppet_name, usage_details)

        return usage_details

    def _analyze_ecs_service(self, cluster_name: str, service_name: str, usage_details: Dict):
        """Analyze an ECS service for image usage."""
        try:
            services = self.ecs.describe_services(
                cluster=cluster_name,
                services=[service_name]
            )['services']
            if not services:
                return

            service = services[0]
            task_def_arn = service['taskDefinition']

            # Get the current task definition
            task_def = self.ecs.describe_task_definition(
                taskDefinition=task_def_arn
            )['taskDefinition']

            # Extract images from the container definitions
            for container in task_def['containerDefinitions']:
                image = container['image']
                if image not in usage_details:
                    usage_details[image] = {
                        'used_by': [],
                        'task_definitions': [],
                        'deployments': [],
                        'status': 'unused'
                    }
                usage_details[image]['used_by'].append(f"ECS Service: {cluster_name}/{service_name}")
                usage_details[image]['task_definitions'].append({
                    'arn': task_def_arn,
                    'revision': task_def['revision'],
                    'status': 'ACTIVE_SERVICE'
                })
                usage_details[image]['status'] = 'active'
        except Exception as e:
            self._print_warning(f"  Error analyzing ECS service {service_name}: {e}")

    def _analyze_task_definitions(self, muppet_name: str, usage_details: Dict, keep_revisions: int = 10):
        """Analyze recent task definitions for image usage."""
        try:
            # Get recent task definitions, newest first
            task_defs = self.ecs.list_task_definitions(
                familyPrefix=muppet_name,
                status='ACTIVE',
                sort='DESC',
                maxResults=keep_revisions
            )['taskDefinitionArns']
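            # Only the newest `keep_revisions` ACTIVE revisions are inspected, so images
            # referenced solely by older revisions will still be reported as unreferenced.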

            for i, task_def_arn in enumerate(task_defs):
                task_def = self.ecs.describe_task_definition(
                    taskDefinition=task_def_arn
                )['taskDefinition']
                for container in task_def['containerDefinitions']:
                    image = container['image']
                    if image not in usage_details:
                        usage_details[image] = {
                            'used_by': [],
                            'task_definitions': [],
                            'deployments': [],
                            'status': 'unused'
                        }

                    # Determine status based on recency
                    if i == 0:  # Most recent revision
                        status = 'current'
                    elif i < 3:  # Within the last three revisions
                        status = 'recent'
                    else:
                        status = 'old'

                    usage_details[image]['task_definitions'].append({
                        'arn': task_def_arn,
                        'revision': task_def['revision'],
                        'status': status.upper()
                    })
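                    # Status precedence: 'current' always wins; otherwise the first
                    # (i.e. most recent, given the DESC sort) status seen is kept.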
                    if usage_details[image]['status'] == 'unused' or status == 'current':
                        usage_details[image]['status'] = status
                    usage_details[image]['used_by'].append(f"Task Def: {muppet_name}:{task_def['revision']} ({status})")
        except Exception as e:
            self._print_warning(f"  Error analyzing task definitions for {muppet_name}: {e}")

    def _analyze_codedeploy_deployments(self, muppet_name: str, usage_details: Dict):
        """Analyze CodeDeploy deployments for image usage."""
        try:
            # First check whether the CodeDeploy application exists
            try:
                self.codedeploy.get_application(applicationName=muppet_name)
            except self.codedeploy.exceptions.ApplicationDoesNotExistException:
                # Application doesn't exist; skip silently (this is normal for many muppets)
                return
            except Exception as e:
                self._print_warning(f"  Warning: Could not check CodeDeploy application {muppet_name}: {e}")
                return

            # Check whether the deployment group exists
            try:
                self.codedeploy.get_deployment_group(
                    applicationName=muppet_name,
                    deploymentGroupName=muppet_name
                )
            except self.codedeploy.exceptions.DeploymentGroupDoesNotExistException:
                # Deployment group doesn't exist; skip silently
                return
            except Exception as e:
                self._print_warning(f"  Warning: Could not check CodeDeploy deployment group {muppet_name}: {e}")
                return

            # Get recent deployments (last 30 days)
            deployments = self.codedeploy.list_deployments(
                applicationName=muppet_name,
                deploymentGroupName=muppet_name,
                includeOnlyStatuses=['Created', 'InProgress', 'Queued', 'Ready', 'Succeeded'],
                createTimeRange={
                    'start': datetime.now() - timedelta(days=30),
                    'end': datetime.now()
                }
            )['deployments']

            for deployment_id in deployments[:5]:  # Check the last 5 deployments
                try:
                    deployment = self.codedeploy.get_deployment(
                        deploymentId=deployment_id
                    )['deploymentInfo']

                    # Extract the task definition from the AppSpec content
                    if 'revision' in deployment:
                        revision = deployment['revision']

                        # Check the different revision types and content structures
                        appspec_content = None

                        # Try the possible locations for the AppSpec content
                        if 'appSpecContent' in revision:
                            if 'content' in revision['appSpecContent']:
                                appspec_content = revision['appSpecContent']['content']
                            elif 'sha256' in revision['appSpecContent']:
                                # AppSpec is stored as a hash reference; skip this deployment
                                continue
                        elif 'gitHubLocation' in revision or 's3Location' in revision:
                            # AppSpec is stored externally; we can't easily access it
                            continue
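
                        # An inline AppSpec for an ECS blue/green deployment is typically a
                        # JSON document shaped like (abbreviated):
                        #   {"Resources": [{"TargetService": {"Type": "AWS::ECS::Service",
                        #       "Properties": {"TaskDefinition": "<task definition ARN>", ...}}}]}
                        # which is the structure the parsing below walks.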
                        if appspec_content:
                            try:
                                appspec = json.loads(appspec_content)
                                for resource in appspec.get('Resources', []):
                                    if 'TargetService' in resource:
                                        target_service = resource['TargetService']
                                        if 'Properties' in target_service and 'TaskDefinition' in target_service['Properties']:
                                            task_def_arn = target_service['Properties']['TaskDefinition']
                                            # Get the task definition and extract its images
                                            try:
                                                task_def = self.ecs.describe_task_definition(
                                                    taskDefinition=task_def_arn
                                                )['taskDefinition']
                                                for container in task_def['containerDefinitions']:
                                                    image = container['image']
                                                    if image not in usage_details:
                                                        usage_details[image] = {
                                                            'used_by': [],
                                                            'task_definitions': [],
                                                            'deployments': [],
                                                            'status': 'unused'
                                                        }
                                                    usage_details[image]['deployments'].append({
                                                        'id': deployment_id,
                                                        'status': deployment['status'],
                                                        'created': deployment['createTime']
                                                    })
                                                    usage_details[image]['used_by'].append(f"CodeDeploy: {deployment_id} ({deployment['status']})")
                                                    if deployment['status'] in ['InProgress', 'Created', 'Queued', 'Ready']:
                                                        usage_details[image]['status'] = 'deploying'
                                                    elif usage_details[image]['status'] == 'unused':
                                                        usage_details[image]['status'] = 'recent_deployment'
                                            except Exception as e:
                                                self._print_warning(f"  Warning: Could not get task definition {task_def_arn} for deployment {deployment_id}: {e}")
                                        else:
                                            # Missing TaskDefinition in Properties
                                            continue
                            except json.JSONDecodeError:
                                self._print_warning(f"  Warning: Could not parse AppSpec JSON for deployment {deployment_id}: invalid JSON")
                            except Exception as e:
                                self._print_warning(f"  Warning: Error processing AppSpec for deployment {deployment_id}: {e}")
                        else:
                            # No inline AppSpec content available; skip silently
                            continue
                except Exception as e:
                    self._print_warning(f"  Warning: Could not get deployment details for {deployment_id}: {e}")
        except Exception as e:
            # Only report unexpected exceptions, not missing applications/groups
            if "ApplicationDoesNotExistException" not in str(e) and "DeploymentGroupDoesNotExistException" not in str(e):
                self._print_warning(f"  Warning: Error analyzing CodeDeploy deployments for {muppet_name}: {e}")

    def get_ecr_inventory(self) -> List[Dict]:
        """Get a complete inventory of all ECR images."""
        inventory = []
        try:
            # Get all ECR repositories (paginated so large registries aren't truncated)
            repos = []
            for page in self.ecr.get_paginator('describe_repositories').paginate():
                repos.extend(page['repositories'])

            for repo in repos:
                repo_name = repo['repositoryName']
                print(f"Scanning ECR repository: {repo_name}")
                try:
                    # Get all images in the repository (also paginated)
                    images = []
                    for page in self.ecr.get_paginator('describe_images').paginate(
                            repositoryName=repo_name):
                        images.extend(page['imageDetails'])

                    for image in images:
                        # Pull out the fields used to build the inventory rows
                        registry_id = image.get('registryId', '')
                        image_tags = image.get('imageTags', [])
                        image_digest = image['imageDigest']
                        pushed_at = image['imagePushedAt']
                        size_bytes = image.get('imageSizeInBytes', 0)
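
                        # Tagged images are addressed as <registry>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>;
                        # untagged images can only be addressed by digest, i.e. .../<repo>@sha256:<hash>.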
                        # Create the image URI(s)
                        if image_tags:
                            for tag in image_tags:
                                inventory.append({
                                    'repository': repo_name,
                                    'tag': tag,
                                    'digest': image_digest[:12] + '...',  # Shortened for display
                                    'full_digest': image_digest,
                                    'pushed_at': pushed_at,
                                    'size_mb': round(size_bytes / (1024 * 1024), 1),
                                    'size_bytes': size_bytes,
                                    'image_uri': f"{registry_id}.dkr.ecr.{self.region}.amazonaws.com/{repo_name}:{tag}",
                                    'is_tagged': True
                                })
                        else:
                            # Untagged image
                            inventory.append({
                                'repository': repo_name,
                                'tag': '<untagged>',
                                'digest': image_digest[:12] + '...',
                                'full_digest': image_digest,
                                'pushed_at': pushed_at,
                                'size_mb': round(size_bytes / (1024 * 1024), 1),
                                'size_bytes': size_bytes,
                                'image_uri': f"{registry_id}.dkr.ecr.{self.region}.amazonaws.com/{repo_name}@{image_digest}",
                                'is_tagged': False
                            })
                except Exception as e:
                    self._print_warning(f"  Error scanning repository {repo_name}: {e}")
        except Exception as e:
            self._print_warning(f"Error listing ECR repositories: {e}")
        return inventory

    def cleanup_garbage_images(self, garbage_age_days: int = 7, dry_run: bool = True) -> None:
        """Identify and optionally delete garbage images, with human confirmation."""
        print("🗑️ ECR Garbage Collection")
        print(f"Region: {self.region}")
        print(f"Garbage age threshold: {garbage_age_days} days")
        print(f"Mode: {'DRY RUN' if dry_run else 'DELETION MODE'}")
        print("=" * 80)

        print("\n🔍 Analyzing image usage...")
        usage_details = self.get_image_usage_details()

        print("\n📦 Scanning ECR repositories...")
        inventory = self.get_ecr_inventory()

        # Find garbage candidates
        garbage_candidates = []
        total_garbage_size = 0
        for item in inventory:
            image_uri = item['image_uri']
            usage = usage_details.get(image_uri, {'used_by': []})

            # Determine the image's age in days
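            # (imagePushedAt is a timezone-aware datetime, so datetime.now() is given the
            # same tzinfo; subtracting a naive "now" would raise a TypeError)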
            age_days = (datetime.now(item['pushed_at'].tzinfo) - item['pushed_at']).days

            # Garbage = unreferenced and old enough
            is_referenced = len(usage['used_by']) > 0
            if not is_referenced and age_days >= garbage_age_days:
                garbage_candidates.append({
                    'repository': item['repository'],
                    'tag': item['tag'],
                    'digest': item['full_digest'],
                    'age_days': age_days,
                    'size_mb': item['size_mb'],
                    'size_bytes': item['size_bytes'],
                    'image_uri': image_uri,
                    'pushed_at': item['pushed_at']
                })
                total_garbage_size += item['size_bytes']

        if not garbage_candidates:
            print("\n✅ No garbage images found! Your ECR repositories are clean.")
            return

        # Display the garbage candidates
        print(f"\n🔍 Found {len(garbage_candidates)} garbage candidates:")
        print(f"💾 Total size: {total_garbage_size / (1024**3):.2f} GB")
        print()

        # Group by repository for better display
        by_repo = {}
        for item in garbage_candidates:
            repo = item['repository']
            if repo not in by_repo:
                by_repo[repo] = []
            by_repo[repo].append(item)

        for repo, images in sorted(by_repo.items()):
            repo_size = sum(img['size_bytes'] for img in images)
            print(f"📦 {repo} ({len(images)} images, {repo_size / (1024**3):.2f} GB)")
            for img in sorted(images, key=lambda x: x['age_days'], reverse=True):
                tag_display = img['tag'] if img['tag'] != '<untagged>' else f"<untagged:{img['digest'][:12]}>"
                print(f"  └─ {tag_display:40} {img['age_days']:3d}d {img['size_mb']:6.1f} MB")

        if dry_run:
            print("\n🔍 DRY RUN MODE - no images will be deleted")
            print("💡 To perform the actual cleanup, run with --cleanup --no-dry-run")
            return

        # Multiple confirmation steps before actual deletion
        print("\n⚠️ DELETION MODE ACTIVE")
        print(f"🚨 This will permanently delete {len(garbage_candidates)} images ({total_garbage_size / (1024**3):.2f} GB)")
        print("🚨 This action cannot be undone!")

        print(f"\n❓ Are you sure you want to delete these {len(garbage_candidates)} garbage images?")
        confirm1 = input("   Type 'yes' to continue: ").strip().lower()
        if confirm1 != 'yes':
            print("❌ Cleanup cancelled by user")
            return

        print(f"\n❓ Final confirmation: Delete {len(garbage_candidates)} images totaling {total_garbage_size / (1024**3):.2f} GB?")
        confirm2 = input("   Type 'DELETE' to confirm: ").strip()
        if confirm2 != 'DELETE':
            print("❌ Cleanup cancelled by user")
            return

        # Perform the deletion
        print(f"\n🗑️ Starting deletion of {len(garbage_candidates)} images...")
        deleted_count = 0
        deleted_size = 0
        failed_deletions = []
        for item in garbage_candidates:
            try:
                repo = item['repository']
                digest = item['digest']  # the full digest (stored above)
                print(f"  Deleting {repo}@{digest[:12]}... ", end='', flush=True)
                # Delete the image
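                # Deleting by digest removes the image manifest itself, so every tag
                # pointing at that digest is removed along with it.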
                self.ecr.batch_delete_image(
                    repositoryName=repo,
                    imageIds=[{'imageDigest': digest}]
                )
                deleted_count += 1
                deleted_size += item['size_bytes']
                print("✅")
            except Exception as e:
                print(f"❌ Failed: {e}")
                failed_deletions.append({
                    'item': item,
                    'error': str(e)
                })

        # Summary
        print("\n📊 Cleanup Summary:")
        print(f"  ✅ Successfully deleted: {deleted_count} images")
        print(f"  💾 Space reclaimed: {deleted_size / (1024**3):.2f} GB")
        if failed_deletions:
            print(f"  ❌ Failed deletions: {len(failed_deletions)}")
            print("\n📋 Failed Deletions:")
            for failure in failed_deletions:
                item = failure['item']
                error = failure['error']
                print(f"  └─ {item['repository']}@{item['digest'][:12]}: {error}")
        print("\n✅ Cleanup completed!")

    def create_inventory_report(self, output_format: str = 'table', output_file: str = None, garbage_age_days: int = 7) -> None:
        """Create a comprehensive inventory report."""
        print("📊 ECR Image Inventory for AWS ECS Environment")
        print(f"Region: {self.region}")
        print("=" * 80)

        print("\n🔍 Analyzing image usage...")
        usage_details = self.get_image_usage_details()

        print("\n📦 Scanning ECR repositories...")
        inventory = self.get_ecr_inventory()

        # Combine the inventory with the usage details
        report_data = []
        total_size = 0
        status_counts = {'referenced': 0, 'unreferenced': 0, 'garbage': 0}
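
        # Classification rules: an image is REFERENCED if any ECS service, task
        # definition, or CodeDeploy deployment mentions it; otherwise it is
        # UNREFERENCED until it ages past the threshold and becomes GARBAGE.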
        for item in inventory:
            image_uri = item['image_uri']
            usage = usage_details.get(image_uri, {
                'used_by': [],
                'status': 'unused'
            })

            # Determine the image's age in days
            age_days = (datetime.now(item['pushed_at'].tzinfo) - item['pushed_at']).days

            # Determine the reference status - simple and clear
            is_referenced = len(usage['used_by']) > 0
            if is_referenced:
                final_status = 'REFERENCED'
                status_counts['referenced'] += 1
            elif age_days >= garbage_age_days:
                final_status = 'GARBAGE'
                status_counts['garbage'] += 1
            else:
                final_status = 'UNREFERENCED'
                status_counts['unreferenced'] += 1

            # Get the primary usage
            primary_usage = usage['used_by'][0] if usage['used_by'] else 'None'
            if len(usage['used_by']) > 1:
                primary_usage += f" (+{len(usage['used_by']) - 1} more)"

            # Prepare the row data
            row_data = {
                'repository': item['repository'],
                'tag': item['tag'],
                'digest': item['digest'],
                'full_digest': item['full_digest'],
                'age_days': age_days,
                'size_mb': item['size_mb'],
                'size_bytes': item['size_bytes'],
                'status': final_status,
                'is_referenced': is_referenced,
                'primary_usage': primary_usage,
                'all_usage': '; '.join(usage['used_by']) if usage['used_by'] else 'None',
                'image_uri': image_uri,
                'pushed_at': item['pushed_at'].isoformat(),
                'is_tagged': item['is_tagged']
            }
            report_data.append(row_data)
            total_size += item['size_bytes']

        # Guard against an empty registry (the summary percentages divide by the count)
        if not report_data:
            print("\nNo ECR images found.")
            return

        # Sort by status (Referenced, then Unreferenced, then Garbage), then repository, then age
        status_order = {'REFERENCED': 0, 'UNREFERENCED': 1, 'GARBAGE': 2}
        report_data.sort(key=lambda x: (status_order[x['status']], x['repository'], x['age_days']))

        # Output in the requested format
        if output_format == 'csv':
            self._output_csv(report_data, output_file, total_size, status_counts, garbage_age_days)
        else:
            self._output_table(report_data, usage_details, total_size, status_counts, garbage_age_days)

    def _output_csv(self, report_data: List[Dict], output_file: str, total_size: int, status_counts: Dict, garbage_age_days: int):
        """Output the inventory in CSV format."""
        csv_filename = output_file or f"ecr_inventory_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        csv_headers = [
            'repository', 'tag', 'digest', 'full_digest', 'age_days', 'size_mb', 'size_bytes',
            'status', 'is_referenced', 'primary_usage', 'all_usage', 'image_uri', 'pushed_at', 'is_tagged'
        ]
        try:
            with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
                writer.writeheader()
                for row in report_data:
                    writer.writerow(row)

            print(f"\n📁 CSV report saved to: {csv_filename}")
            print(f"📊 Total images: {len(report_data)}")
            print(f"💾 Total size: {total_size / (1024**3):.2f} GB")

            # Print the summary statistics
            print(f"\n📊 Image Status Summary (garbage age threshold: {garbage_age_days} days):")
            print(f"  🟢 Referenced images:   {status_counts['referenced']:3d} ({status_counts['referenced'] / len(report_data) * 100:.1f}%) - Currently in use")
            print(f"  🟡 Unreferenced images: {status_counts['unreferenced']:3d} ({status_counts['unreferenced'] / len(report_data) * 100:.1f}%) - Not in use, age < {garbage_age_days}d")
            print(f"  🔴 Garbage candidates:  {status_counts['garbage']:3d} ({status_counts['garbage'] / len(report_data) * 100:.1f}%) - Not in use, age ≥ {garbage_age_days}d")

            # Calculate the garbage cleanup potential
            garbage_size = sum(row['size_bytes'] for row in report_data if row['status'] == 'GARBAGE')
            if garbage_size > 0:
                print(f"\n💰 Garbage cleanup potential: {garbage_size / (1024**3):.2f} GB ({status_counts['garbage']} images)")
        except Exception as e:
            print(f"❌ Error writing CSV file: {e}")
            sys.exit(1)

    def _output_table(self, report_data: List[Dict], usage_details: Dict, total_size: int, status_counts: Dict, garbage_age_days: int):
        """Output the inventory as a formatted table."""
        table_data = []
        for row in report_data:
            # Color-code the status
            status_display = row['status']
            if row['status'] == 'REFERENCED':
                status_display = '🟢 REF'
            elif row['status'] == 'UNREFERENCED':
                status_display = '🟡 UNREF'
            elif row['status'] == 'GARBAGE':
                status_display = '🔴 GARBAGE'

            table_data.append([
                row['repository'],
                row['tag'],
                row['digest'],
                f"{row['age_days']}d",
                f"{row['size_mb']} MB",
                status_display,
                row['primary_usage'][:50] + ('...' if len(row['primary_usage']) > 50 else '')
            ])

        # Print the summary table
        print(f"\n📊 ECR Image Inventory ({len(report_data)} images)")
        print(f"Total size: {total_size / (1024**3):.2f} GB")
        print(f"Garbage age threshold: {garbage_age_days} days")
        print()
        headers = ['Repository', 'Tag', 'Digest', 'Age', 'Size', 'Status', 'Used By']
        print(tabulate(table_data, headers=headers, tablefmt='grid'))

        # Print the summary statistics
        print("\n📊 Image Status Summary:")
        print(f"  🟢 Referenced images:   {status_counts['referenced']:3d} ({status_counts['referenced'] / len(report_data) * 100:.1f}%) - Currently in use by ECS/TaskDef")
        print(f"  🟡 Unreferenced images: {status_counts['unreferenced']:3d} ({status_counts['unreferenced'] / len(report_data) * 100:.1f}%) - Not in use, age < {garbage_age_days} days")
        print(f"  🔴 Garbage candidates:  {status_counts['garbage']:3d} ({status_counts['garbage'] / len(report_data) * 100:.1f}%) - Not in use, age ≥ {garbage_age_days} days")

        # Calculate the garbage cleanup potential
        garbage_size = sum(row['size_bytes'] for row in report_data if row['status'] == 'GARBAGE')
        if garbage_size > 0:
            print(f"\n💰 Garbage cleanup potential: {garbage_size / (1024**3):.2f} GB ({status_counts['garbage']} images)")

        # Print detailed usage for referenced images only
        print("\n📋 Referenced Images Details:")
        referenced_images = [(uri, details) for uri, details in usage_details.items()
                             if details['used_by']]
        if referenced_images:
            for image_uri, details in sorted(referenced_images):
                print(f"\n📦 {image_uri}")
                for usage in details['used_by']:
                    print(f"  └─ {usage}")
        else:
            print("  No referenced images found.")


def main():
    parser = argparse.ArgumentParser(
        description='ECR Image Inventory for AWS ECS Environment',
        epilog='''
Image Classifications:
  🟢 REFERENCED    Images currently in use by ECS services or task definitions
  🟡 UNREFERENCED  Images not in use, but younger than the garbage age threshold
  🔴 GARBAGE       Images not in use and older than the garbage age threshold (safe to delete)

Examples:
  %(prog)s --region us-west-2
  %(prog)s --format csv --output inventory.csv --garbage-age 14
  %(prog)s --quiet --garbage-age 3
  %(prog)s --cleanup                  (dry-run garbage collection)
  %(prog)s --cleanup --no-dry-run     (actually delete garbage images)
''',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('--region', default='us-west-2', help='AWS region')
    parser.add_argument('--format', choices=['table', 'csv'], default='table',
                        help='Output format (default: table)')
    parser.add_argument('--output', '-o', help='Output file name (for CSV format)')
    parser.add_argument('--quiet', '-q', action='store_true',
                        help='Suppress warning messages')
    parser.add_argument('--garbage-age', type=int, default=7,
                        help='Age threshold in days for garbage candidates (default: 7)')
    # These two flags wire up cleanup_garbage_images(), which the help text above
    # already refers to but which was previously unreachable from the CLI.
    parser.add_argument('--cleanup', action='store_true',
                        help='Run garbage collection instead of the inventory report')
    parser.add_argument('--no-dry-run', action='store_true',
                        help='With --cleanup, actually delete images (default is a dry run)')

    args = parser.parse_args()

    try:
        inventory_manager = ECRInventoryManager(region=args.region, quiet=args.quiet)
        if args.cleanup:
            inventory_manager.cleanup_garbage_images(
                garbage_age_days=args.garbage_age,
                dry_run=not args.no_dry_run
            )
        else:
            inventory_manager.create_inventory_report(
                output_format=args.format,
                output_file=args.output,
                garbage_age_days=args.garbage_age
            )
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Error: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()