Skip to content

Instantly share code, notes, and snippets.

@s3u
Last active January 12, 2026 19:03
Show Gist options
  • Select an option

  • Save s3u/6a9801871528f9aa85bcda709b62aa24 to your computer and use it in GitHub Desktop.

Select an option

Save s3u/6a9801871528f9aa85bcda709b62aa24 to your computer and use it in GitHub Desktop.
ECR Inventory Analysis
#!/usr/bin/env python3
"""
ECR Image Inventory and Cleanup Script for AWS ECS Environments
A comprehensive tool for analyzing Docker images in Amazon ECR repositories,
showing which images are currently in use by ECS services and which are
candidates for cleanup. Now includes safe garbage collection functionality.
Features:
- Analyzes ECS services, task definitions, and CodeDeploy deployments
- Categorizes images as Referenced, Unreferenced, or Garbage candidates
- Supports both table and CSV output formats
- Configurable garbage age threshold
- Safe garbage collection with human confirmation
- Dry-run mode for testing cleanup operations
- Handles missing CodeDeploy applications gracefully
Author: Muppet Platform Team, aka AI
License: MIT
GitHub: https://github.com/your-org/muppet-platform
"""
import boto3
import json
import sys
import csv
from datetime import datetime, timedelta
from typing import Dict, List, Set, Tuple
import argparse
from tabulate import tabulate
class ECRInventoryManager:
    """Inventory and cleanup helper for ECR images referenced by ECS/CodeDeploy."""

    def __init__(self, region: str = 'us-west-2', quiet: bool = False):
        """Create the AWS service clients used by this manager.

        Args:
            region: AWS region all clients operate in.
            quiet: When True, warning messages are suppressed.
        """
        self.region = region
        self.quiet = quiet
        self.ecs = boto3.client('ecs', region_name=region)
        self.ecr = boto3.client('ecr', region_name=region)
        self.codedeploy = boto3.client('codedeploy', region_name=region)
def _print_warning(self, message: str):
"""Print warning message unless in quiet mode."""
if not self.quiet:
print(message)
def get_image_usage_details(self) -> Dict[str, Dict]:
"""Get detailed usage information for all images."""
usage_details = {}
# Get all ECS clusters (muppet clusters)
try:
clusters = self.ecs.list_clusters()['clusterArns']
except Exception as e:
self._print_warning(f"Error listing clusters: {e}")
return usage_details
for cluster_arn in clusters:
cluster_name = cluster_arn.split('/')[-1]
if not cluster_name.endswith('-cluster'):
continue
muppet_name = cluster_name.replace('-cluster', '')
print(f"Analyzing muppet: {muppet_name}")
# Get usage from ECS service
self._analyze_ecs_service(cluster_name, muppet_name, usage_details)
# Get usage from recent task definitions
self._analyze_task_definitions(muppet_name, usage_details)
# Get usage from CodeDeploy deployments
self._analyze_codedeploy_deployments(muppet_name, usage_details)
return usage_details
def _analyze_ecs_service(self, cluster_name: str, service_name: str, usage_details: Dict):
"""Analyze ECS service for image usage."""
try:
services = self.ecs.describe_services(
cluster=cluster_name,
services=[service_name]
)['services']
if not services:
return
service = services[0]
task_def_arn = service['taskDefinition']
# Get current task definition
task_def = self.ecs.describe_task_definition(
taskDefinition=task_def_arn
)['taskDefinition']
# Extract images from container definitions
for container in task_def['containerDefinitions']:
image = container['image']
if image not in usage_details:
usage_details[image] = {
'used_by': [],
'task_definitions': [],
'deployments': [],
'status': 'unused'
}
usage_details[image]['used_by'].append(f"ECS Service: {cluster_name}/{service_name}")
usage_details[image]['task_definitions'].append({
'arn': task_def_arn,
'revision': task_def['revision'],
'status': 'ACTIVE_SERVICE'
})
usage_details[image]['status'] = 'active'
except Exception as e:
self._print_warning(f" Error analyzing ECS service {service_name}: {e}")
def _analyze_task_definitions(self, muppet_name: str, usage_details: Dict, keep_revisions: int = 10):
"""Analyze recent task definitions for image usage."""
try:
# Get recent task definitions
task_defs = self.ecs.list_task_definitions(
familyPrefix=muppet_name,
status='ACTIVE',
sort='DESC',
maxResults=keep_revisions
)['taskDefinitionArns']
for i, task_def_arn in enumerate(task_defs):
task_def = self.ecs.describe_task_definition(
taskDefinition=task_def_arn
)['taskDefinition']
for container in task_def['containerDefinitions']:
image = container['image']
if image not in usage_details:
usage_details[image] = {
'used_by': [],
'task_definitions': [],
'deployments': [],
'status': 'unused'
}
# Determine status based on recency
if i == 0: # Most recent
status = 'current'
elif i < 3: # Last 3 revisions
status = 'recent'
else:
status = 'old'
usage_details[image]['task_definitions'].append({
'arn': task_def_arn,
'revision': task_def['revision'],
'status': status.upper()
})
if usage_details[image]['status'] == 'unused' or status == 'current':
usage_details[image]['status'] = status
usage_details[image]['used_by'].append(f"Task Def: {muppet_name}:{task_def['revision']} ({status})")
except Exception as e:
self._print_warning(f" Error analyzing task definitions for {muppet_name}: {e}")
def _analyze_codedeploy_deployments(self, muppet_name: str, usage_details: Dict):
    """Analyze CodeDeploy deployments for image usage.

    Looks at the last 30 days of deployments (newest 5) for the CodeDeploy
    application and deployment group both named after the muppet. Inline
    AppSpec JSON is parsed to find the referenced task definition, whose
    container images are recorded into ``usage_details`` (mutated in place).
    Missing applications/groups and externally-stored AppSpecs are skipped
    silently, as these are normal conditions.
    """
    try:
        # First check if CodeDeploy application exists
        try:
            self.codedeploy.get_application(applicationName=muppet_name)
        except self.codedeploy.exceptions.ApplicationDoesNotExistException:
            # Application doesn't exist, skip silently (this is normal for many muppets)
            return
        except Exception as e:
            self._print_warning(f" Warning: Could not check CodeDeploy application {muppet_name}: {e}")
            return
        # Check if deployment group exists
        try:
            self.codedeploy.get_deployment_group(
                applicationName=muppet_name,
                deploymentGroupName=muppet_name
            )
        except self.codedeploy.exceptions.DeploymentGroupDoesNotExistException:
            # Deployment group doesn't exist, skip silently
            return
        except Exception as e:
            self._print_warning(f" Warning: Could not check CodeDeploy deployment group {muppet_name}: {e}")
            return
        # Get recent deployments (last 30 days)
        deployments = self.codedeploy.list_deployments(
            applicationName=muppet_name,
            deploymentGroupName=muppet_name,
            includeOnlyStatuses=['Created', 'InProgress', 'Queued', 'Ready', 'Succeeded'],
            createTimeRange={
                'start': datetime.now() - timedelta(days=30),
                'end': datetime.now()
            }
        )['deployments']
        for deployment_id in deployments[:5]:  # Check last 5 deployments
            try:
                deployment = self.codedeploy.get_deployment(
                    deploymentId=deployment_id
                )['deploymentInfo']
                # Extract task definition from AppSpec content
                if 'revision' in deployment:
                    revision = deployment['revision']
                    # Check for different revision types and content structures
                    appspec_content = None
                    # Try different possible locations for AppSpec content
                    if 'appSpecContent' in revision:
                        if 'content' in revision['appSpecContent']:
                            appspec_content = revision['appSpecContent']['content']
                        elif 'sha256' in revision['appSpecContent']:
                            # AppSpec is stored as a hash reference, skip this deployment
                            continue
                    elif 'gitHubLocation' in revision or 's3Location' in revision:
                        # AppSpec is stored externally, we can't easily access it
                        continue
                    if appspec_content:
                        try:
                            appspec = json.loads(appspec_content)
                            for resource in appspec.get('Resources', []):
                                if 'TargetService' in resource:
                                    target_service = resource['TargetService']
                                    if 'Properties' in target_service and 'TaskDefinition' in target_service['Properties']:
                                        task_def_arn = target_service['Properties']['TaskDefinition']
                                        # Get task definition and extract image
                                        try:
                                            task_def = self.ecs.describe_task_definition(
                                                taskDefinition=task_def_arn
                                            )['taskDefinition']
                                            for container in task_def['containerDefinitions']:
                                                image = container['image']
                                                if image not in usage_details:
                                                    usage_details[image] = {
                                                        'used_by': [],
                                                        'task_definitions': [],
                                                        'deployments': [],
                                                        'status': 'unused'
                                                    }
                                                usage_details[image]['deployments'].append({
                                                    'id': deployment_id,
                                                    'status': deployment['status'],
                                                    'created': deployment['createTime']
                                                })
                                                usage_details[image]['used_by'].append(f"CodeDeploy: {deployment_id} ({deployment['status']})")
                                                # In-flight deployments override any earlier status;
                                                # finished ones only upgrade images from 'unused'.
                                                if deployment['status'] in ['InProgress', 'Created', 'Queued', 'Ready']:
                                                    usage_details[image]['status'] = 'deploying'
                                                elif usage_details[image]['status'] == 'unused':
                                                    usage_details[image]['status'] = 'recent_deployment'
                                        except Exception as e:
                                            self._print_warning(f" Warning: Could not get task definition {task_def_arn} for deployment {deployment_id}: {e}")
                                    else:
                                        # Missing TaskDefinition in Properties
                                        continue
                        except json.JSONDecodeError as e:
                            self._print_warning(f" Warning: Could not parse AppSpec JSON for deployment {deployment_id}: Invalid JSON")
                        except Exception as e:
                            self._print_warning(f" Warning: Error processing AppSpec for deployment {deployment_id}: {e}")
                    else:
                        # No inline AppSpec content available, skip silently
                        continue
            except Exception as e:
                self._print_warning(f" Warning: Could not get deployment details for {deployment_id}: {e}")
    except Exception as e:
        # Only print error for unexpected exceptions, not missing applications/groups
        if "ApplicationDoesNotExistException" not in str(e) and "DeploymentGroupDoesNotExistException" not in str(e):
            self._print_warning(f" Warning: Error analyzing CodeDeploy deployments for {muppet_name}: {e}")
def get_ecr_inventory(self) -> List[Dict]:
"""Get complete inventory of all ECR images."""
inventory = []
try:
# Get all ECR repositories
repos = self.ecr.describe_repositories()['repositories']
for repo in repos:
repo_name = repo['repositoryName']
print(f"Scanning ECR repository: {repo_name}")
try:
# Get all images in repository
images = self.ecr.describe_images(
repositoryName=repo_name
)['imageDetails']
for image in images:
# Build image URIs
registry_id = image.get('registryId', '')
image_tags = image.get('imageTags', [])
image_digest = image['imageDigest']
pushed_at = image['imagePushedAt']
size_bytes = image.get('imageSizeInBytes', 0)
# Create image URIs
if image_tags:
for tag in image_tags:
inventory.append({
'repository': repo_name,
'tag': tag,
'digest': image_digest[:12] + '...', # Shortened for display
'full_digest': image_digest,
'pushed_at': pushed_at,
'size_mb': round(size_bytes / (1024 * 1024), 1),
'size_bytes': size_bytes,
'image_uri': f"{registry_id}.dkr.ecr.{self.region}.amazonaws.com/{repo_name}:{tag}",
'is_tagged': True
})
else:
# Untagged image
inventory.append({
'repository': repo_name,
'tag': '<untagged>',
'digest': image_digest[:12] + '...',
'full_digest': image_digest,
'pushed_at': pushed_at,
'size_mb': round(size_bytes / (1024 * 1024), 1),
'size_bytes': size_bytes,
'image_uri': f"{registry_id}.dkr.ecr.{self.region}.amazonaws.com/{repo_name}@{image_digest}",
'is_tagged': False
})
except Exception as e:
self._print_warning(f" Error scanning repository {repo_name}: {e}")
except Exception as e:
self._print_warning(f"Error listing ECR repositories: {e}")
return inventory
def cleanup_garbage_images(self, garbage_age_days: int = 7, dry_run: bool = True) -> None:
    """Identify and optionally delete garbage images with human confirmation.

    An image is a garbage candidate when it is not referenced anywhere
    (per :meth:`get_image_usage_details`) and is at least
    ``garbage_age_days`` old. In deletion mode the user must confirm
    twice on stdin ('yes', then 'DELETE') before any image is removed.

    Args:
        garbage_age_days: Minimum age in days for an unreferenced image
            to be treated as garbage.
        dry_run: When True (default), only report candidates; delete nothing.
    """
    print("🗑️ ECR Garbage Collection")
    print(f"Region: {self.region}")
    print(f"Garbage age threshold: {garbage_age_days} days")
    print(f"Mode: {'DRY RUN' if dry_run else 'DELETION MODE'}")
    print("=" * 80)
    print("\n📊 Analyzing image usage...")
    usage_details = self.get_image_usage_details()
    print("\n📦 Scanning ECR repositories...")
    inventory = self.get_ecr_inventory()
    # Find garbage candidates
    garbage_candidates = []
    total_garbage_size = 0
    for item in inventory:
        image_uri = item['image_uri']
        usage = usage_details.get(image_uri, {'used_by': []})
        # Determine age in days (pushed_at is timezone-aware, so compare
        # against a "now" carrying the same tzinfo)
        age_days = (datetime.now(item['pushed_at'].tzinfo) - item['pushed_at']).days
        # Check if it's garbage (unreferenced and old enough)
        is_referenced = len(usage['used_by']) > 0
        if not is_referenced and age_days >= garbage_age_days:
            garbage_candidates.append({
                'repository': item['repository'],
                'tag': item['tag'],
                # NOTE: 'digest' here holds the FULL digest (needed for deletion)
                'digest': item['full_digest'],
                'age_days': age_days,
                'size_mb': item['size_mb'],
                'size_bytes': item['size_bytes'],
                'image_uri': image_uri,
                'pushed_at': item['pushed_at']
            })
            total_garbage_size += item['size_bytes']
    if not garbage_candidates:
        print("\n✅ No garbage images found! Your ECR repositories are clean.")
        return
    # Display garbage candidates
    print(f"\n🔍 Found {len(garbage_candidates)} garbage candidates:")
    print(f"💾 Total size: {total_garbage_size / (1024**3):.2f} GB")
    print()
    # Group by repository for better display
    by_repo = {}
    for item in garbage_candidates:
        repo = item['repository']
        if repo not in by_repo:
            by_repo[repo] = []
        by_repo[repo].append(item)
    for repo, images in sorted(by_repo.items()):
        repo_size = sum(img['size_bytes'] for img in images)
        print(f"📦 {repo} ({len(images)} images, {repo_size / (1024**3):.2f} GB)")
        # Oldest candidates first within each repository
        for img in sorted(images, key=lambda x: x['age_days'], reverse=True):
            tag_display = img['tag'] if img['tag'] != '<untagged>' else f"<untagged:{img['digest'][:12]}>"
            print(f" └─ {tag_display:40} {img['age_days']:3d}d {img['size_mb']:6.1f} MB")
    if dry_run:
        print(f"\n🔍 DRY RUN MODE - No images will be deleted")
        print(f"💡 To perform actual cleanup, run with --cleanup --no-dry-run")
        return
    # Confirmation for actual deletion
    print(f"\n⚠️ DELETION MODE ACTIVE")
    print(f"🚨 This will permanently delete {len(garbage_candidates)} images ({total_garbage_size / (1024**3):.2f} GB)")
    print(f"🚨 This action cannot be undone!")
    # Multiple confirmation steps
    print(f"\n❓ Are you sure you want to delete these {len(garbage_candidates)} garbage images?")
    confirm1 = input(" Type 'yes' to continue: ").strip().lower()
    if confirm1 != 'yes':
        print("❌ Cleanup cancelled by user")
        return
    print(f"\n❓ Final confirmation: Delete {len(garbage_candidates)} images totaling {total_garbage_size / (1024**3):.2f} GB?")
    confirm2 = input(" Type 'DELETE' to confirm: ").strip()
    if confirm2 != 'DELETE':
        print("❌ Cleanup cancelled by user")
        return
    # Perform deletion
    print(f"\n🗑️ Starting deletion of {len(garbage_candidates)} images...")
    deleted_count = 0
    deleted_size = 0
    failed_deletions = []
    for item in garbage_candidates:
        try:
            repo = item['repository']
            digest = item['digest']
            print(f" Deleting {repo}@{digest[:12]}... ", end='', flush=True)
            # Delete the image (by full digest, so all tags of it go too)
            self.ecr.batch_delete_image(
                repositoryName=repo,
                imageIds=[{'imageDigest': digest}]
            )
            deleted_count += 1
            deleted_size += item['size_bytes']
            print("✅")
        except Exception as e:
            print(f"❌ Failed: {e}")
            failed_deletions.append({
                'item': item,
                'error': str(e)
            })
    # Summary
    print(f"\n📊 Cleanup Summary:")
    print(f" ✅ Successfully deleted: {deleted_count} images")
    print(f" 💾 Space reclaimed: {deleted_size / (1024**3):.2f} GB")
    if failed_deletions:
        print(f" ❌ Failed deletions: {len(failed_deletions)}")
        print(f"\n🔍 Failed Deletions:")
        for failure in failed_deletions:
            item = failure['item']
            error = failure['error']
            print(f" └─ {item['repository']}@{item['digest'][:12]}: {error}")
    print(f"\n✅ Cleanup completed!")
def create_inventory_report(self, output_format: str = 'table', output_file: str = None, garbage_age_days: int = 7) -> None:
"""Create a comprehensive inventory report."""
print("πŸ” ECR Image Inventory for AWS ECS Environment")
print(f"Region: {self.region}")
print("=" * 80)
print("\nπŸ“Š Analyzing image usage...")
usage_details = self.get_image_usage_details()
print("\nπŸ“¦ Scanning ECR repositories...")
inventory = self.get_ecr_inventory()
# Combine inventory with usage details
report_data = []
total_size = 0
status_counts = {'referenced': 0, 'unreferenced': 0, 'garbage': 0}
for item in inventory:
image_uri = item['image_uri']
usage = usage_details.get(image_uri, {
'used_by': [],
'status': 'unused'
})
# Determine age in days
age_days = (datetime.now(item['pushed_at'].tzinfo) - item['pushed_at']).days
# Determine reference status - simple and clear
is_referenced = len(usage['used_by']) > 0
if is_referenced:
final_status = 'REFERENCED'
status_counts['referenced'] += 1
elif age_days >= garbage_age_days:
final_status = 'GARBAGE'
status_counts['garbage'] += 1
else:
final_status = 'UNREFERENCED'
status_counts['unreferenced'] += 1
# Get primary usage
primary_usage = usage['used_by'][0] if usage['used_by'] else 'None'
if len(usage['used_by']) > 1:
primary_usage += f" (+{len(usage['used_by'])-1} more)"
# Prepare row data
row_data = {
'repository': item['repository'],
'tag': item['tag'],
'digest': item['digest'],
'full_digest': item['full_digest'],
'age_days': age_days,
'size_mb': item['size_mb'],
'size_bytes': item['size_bytes'],
'status': final_status,
'is_referenced': is_referenced,
'primary_usage': primary_usage,
'all_usage': '; '.join(usage['used_by']) if usage['used_by'] else 'None',
'image_uri': image_uri,
'pushed_at': item['pushed_at'].isoformat(),
'is_tagged': item['is_tagged']
}
report_data.append(row_data)
total_size += item['size_bytes']
# Sort by status (Referenced first, then Unreferenced, then Garbage), then by repository, then by age
status_order = {'REFERENCED': 0, 'UNREFERENCED': 1, 'GARBAGE': 2}
report_data.sort(key=lambda x: (status_order[x['status']], x['repository'], x['age_days']))
# Output based on format
if output_format == 'csv':
self._output_csv(report_data, output_file, total_size, status_counts, garbage_age_days)
else:
self._output_table(report_data, usage_details, total_size, status_counts, garbage_age_days)
def _output_csv(self, report_data: List[Dict], output_file: str, total_size: int, status_counts: Dict, garbage_age_days: int):
"""Output inventory as CSV format."""
csv_filename = output_file or f"ecr_inventory_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
# CSV headers
csv_headers = [
'repository', 'tag', 'digest', 'full_digest', 'age_days', 'size_mb', 'size_bytes',
'status', 'is_referenced', 'primary_usage', 'all_usage', 'image_uri', 'pushed_at', 'is_tagged'
]
try:
with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
writer.writeheader()
for row in report_data:
writer.writerow(row)
print(f"\nπŸ“„ CSV report saved to: {csv_filename}")
print(f"πŸ“‹ Total images: {len(report_data)}")
print(f"πŸ’Ύ Total size: {total_size / (1024**3):.2f} GB")
# Print summary statistics
print(f"\nπŸ“Š Image Status Summary (garbage age threshold: {garbage_age_days} days):")
print(f" 🟒 Referenced images: {status_counts['referenced']:3d} ({status_counts['referenced']/len(report_data)*100:.1f}%) - Currently in use")
print(f" 🟑 Unreferenced images: {status_counts['unreferenced']:3d} ({status_counts['unreferenced']/len(report_data)*100:.1f}%) - Not in use, age < {garbage_age_days}d")
print(f" πŸ”΄ Garbage candidates: {status_counts['garbage']:3d} ({status_counts['garbage']/len(report_data)*100:.1f}%) - Not in use, age β‰₯ {garbage_age_days}d")
# Calculate garbage cleanup potential
garbage_size = sum(row['size_bytes'] for row in report_data if row['status'] == 'GARBAGE')
if garbage_size > 0:
print(f"\nπŸ’° Garbage cleanup potential: {garbage_size / (1024**3):.2f} GB ({status_counts['garbage']} images)")
except Exception as e:
print(f"❌ Error writing CSV file: {e}")
sys.exit(1)
def _output_table(self, report_data: List[Dict], usage_details: Dict, total_size: int, status_counts: Dict, garbage_age_days: int):
    """Print the inventory as a formatted grid table plus summary statistics.

    Fix: the summary percentages divided by ``len(report_data)`` and
    raised ZeroDivisionError when no images were found; now guarded.

    Args:
        report_data: Sorted rows from create_inventory_report.
        usage_details: Image URI -> usage dict, for the per-image details.
        total_size: Sum of image sizes in bytes (printed only).
        status_counts: Counts keyed 'referenced'/'unreferenced'/'garbage'.
        garbage_age_days: Threshold echoed in the summary text.
    """
    # Convert rows to table cells with a colour-coded status column
    table_data = []
    for row in report_data:
        status_display = row['status']
        if row['status'] == 'REFERENCED':
            status_display = '🟢 REF'
        elif row['status'] == 'UNREFERENCED':
            status_display = '🟡 UNREF'
        elif row['status'] == 'GARBAGE':
            status_display = '🔴 GARBAGE'
        table_data.append([
            row['repository'],
            row['tag'],
            row['digest'],
            f"{row['age_days']}d",
            f"{row['size_mb']} MB",
            status_display,
            # Truncate long usage strings so the grid stays readable
            row['primary_usage'][:50] + ('...' if len(row['primary_usage']) > 50 else '')
        ])
    # Print summary table
    print(f"\n📋 ECR Image Inventory ({len(report_data)} images)")
    print(f"Total size: {total_size / (1024**3):.2f} GB")
    print(f"Garbage age threshold: {garbage_age_days} days")
    print()
    headers = ['Repository', 'Tag', 'Digest', 'Age', 'Size', 'Status', 'Used By']
    print(tabulate(table_data, headers=headers, tablefmt='grid'))
    total_images = len(report_data)

    def pct(count: int) -> float:
        # Guard: an empty registry previously caused ZeroDivisionError here.
        return count / total_images * 100 if total_images else 0.0

    # Print summary statistics
    print(f"\n📊 Image Status Summary:")
    print(f" 🟢 Referenced images: {status_counts['referenced']:3d} ({pct(status_counts['referenced']):.1f}%) - Currently in use by ECS/TaskDef")
    print(f" 🟡 Unreferenced images: {status_counts['unreferenced']:3d} ({pct(status_counts['unreferenced']):.1f}%) - Not in use, age < {garbage_age_days} days")
    print(f" 🔴 Garbage candidates: {status_counts['garbage']:3d} ({pct(status_counts['garbage']):.1f}%) - Not in use, age ≥ {garbage_age_days} days")
    # Calculate garbage cleanup potential
    garbage_size = sum(row['size_bytes'] for row in report_data if row['status'] == 'GARBAGE')
    if garbage_size > 0:
        print(f"\n💰 Garbage cleanup potential: {garbage_size / (1024**3):.2f} GB ({status_counts['garbage']} images)")
    # Print detailed usage for referenced images only
    print(f"\n🔍 Referenced Images Details:")
    referenced_images = [(uri, details) for uri, details in usage_details.items()
                         if len(details['used_by']) > 0]
    if referenced_images:
        for image_uri, details in sorted(referenced_images):
            print(f"\n📦 {image_uri}")
            for usage in details['used_by']:
                print(f" └─ {usage}")
    else:
        print(" No referenced images found.")
def main():
    """CLI entry point: parse arguments and run a report or cleanup.

    Fix: cleanup_garbage_images tells users to re-run with
    ``--cleanup --no-dry-run``, but those flags were never defined here;
    they are now wired up (backward-compatible — default behavior, the
    inventory report, is unchanged).
    """
    parser = argparse.ArgumentParser(
        description='ECR Image Inventory for AWS ECS Environment',
        epilog='''
Image Classifications:
🟢 REFERENCED Images currently in use by ECS services or task definitions
🟡 UNREFERENCED Images not in use, but younger than garbage age threshold
🔴 GARBAGE Images not in use and older than garbage age threshold (safe to delete)
Examples:
%(prog)s --region us-west-2
%(prog)s --format csv --output inventory.csv --garbage-age 14
%(prog)s --quiet --garbage-age 3
%(prog)s --cleanup --garbage-age 14
%(prog)s --cleanup --no-dry-run
''',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('--region', default='us-west-2', help='AWS region')
    parser.add_argument('--format', choices=['table', 'csv'], default='table',
                        help='Output format (default: table)')
    parser.add_argument('--output', '-o', help='Output file name (for CSV format)')
    parser.add_argument('--quiet', '-q', action='store_true',
                        help='Suppress warning messages')
    parser.add_argument('--garbage-age', type=int, default=7,
                        help='Age threshold in days for garbage candidates (default: 7)')
    # New flags: expose the garbage-collection path the class already
    # implements (and whose dry-run hint references these options).
    parser.add_argument('--cleanup', action='store_true',
                        help='Run garbage collection instead of the inventory report')
    parser.add_argument('--no-dry-run', action='store_true',
                        help='With --cleanup: actually delete images (default is a dry run)')
    args = parser.parse_args()
    try:
        inventory_manager = ECRInventoryManager(region=args.region, quiet=args.quiet)
        if args.cleanup:
            inventory_manager.cleanup_garbage_images(
                garbage_age_days=args.garbage_age,
                dry_run=not args.no_dry_run
            )
        else:
            inventory_manager.create_inventory_report(
                output_format=args.format,
                output_file=args.output,
                garbage_age_days=args.garbage_age
            )
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Error: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment