Last active
August 14, 2025 15:29
-
-
Save zzstoatzz/c8a4ad709876c44cefd40e65ac983485 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --quiet --script | |
| # /// script | |
| # dependencies = ["prefect"] | |
| # /// | |
| """ | |
| Asset management script for Prefect Cloud. | |
| This script provides read and delete operations for managing assets in Prefect Cloud workspaces. | |
| Assets are created automatically by Prefect when events are emitted, but this script | |
| allows you to list and delete them programmatically. | |
| ## Prerequisites | |
| - uv must be installed | |
| - You must be authenticated to Prefect Cloud | |
| - You must have a workspace selected | |
| ## Setup | |
| 1. Install Prefect and authenticate: | |
| ```bash | |
| uvx prefect cloud login | |
| ``` | |
| 2. Select your workspace: | |
| ```bash | |
| uvx prefect cloud workspace set --workspace your-workspace-name | |
| ``` | |
| ## Usage | |
| ### List assets | |
| ```bash | |
| python asset_manager.py list | |
| python asset_manager.py list --limit 50 # Show only first 50 assets | |
| ``` | |
| ### Delete old assets | |
| ```bash | |
| # Dry run - see what would be deleted | |
| python asset_manager.py delete-old --days 7 --dry-run | |
| # Actually delete assets older than 7 days (with confirmation) | |
| python asset_manager.py delete-old --days 7 | |
| # Delete without confirmation prompt | |
| python asset_manager.py delete-old --days 7 --force | |
| # Delete ALL assets (older than 0 days) | |
| python asset_manager.py delete-old --days 0 --force | |
| ``` | |
| ### Test connection | |
| ```bash | |
| python asset_manager.py test | |
| ``` | |
| ## How it works | |
| - Assets are tracked by their "key" which is typically a URI (e.g., `slack://workspace/channel/123`) | |
| - The script uses the `/assets/latest-dependencies` endpoint to determine when assets were last updated | |
| - Assets without any materialization events are considered "old" and will be deleted | |
| - All operations use the Prefect client which handles authentication automatically | |
| ## Examples | |
| 1. Clean up assets older than 30 days: | |
| ```bash | |
| python asset_manager.py delete-old --days 30 | |
| ``` | |
| 2. Delete all assets in a workspace (use with caution!): | |
| ```bash | |
| python asset_manager.py delete-old --days 0 --force | |
| ``` | |
| 3. Check how many assets you have: | |
| ```bash | |
| python asset_manager.py list | grep "Found" | |
| ``` | |
| """ | |
| import argparse | |
| import asyncio | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Any | |
| from urllib.parse import quote | |
| from prefect import get_client | |
| # Core asset functions | |
async def list_assets(limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """Fetch one page of assets from the current workspace.

    Issues ``GET /assets/`` through the Prefect client's underlying HTTP
    client, passing ``limit`` and ``offset`` as query parameters.

    Args:
        limit: Page size forwarded to the API.
        offset: Number of assets to skip, forwarded to the API.

    Returns:
        * If the response body is a JSON list and ``offset`` is 0: that list,
          unmodified.
        * If the response body is a JSON list and ``offset`` is non-zero: an
          empty list.
        * Otherwise: the value under the body's ``"results"`` key, or an empty
          list when that key is missing.

    Raises:
        Whatever ``response.raise_for_status()`` raises for a non-success
        status (presumably an httpx status error -- confirm against the
        Prefect client in use).
    """
    query = {"limit": limit, "offset": offset}
    async with get_client() as client:
        resp = await client._client.get("/assets/", params=query)
        resp.raise_for_status()
        payload = resp.json()

        if not isinstance(payload, list):
            # Wrapped response: the page lives under "results".
            return payload.get("results", [])

        # A bare list is assumed to be the complete listing rather than a
        # single page, so it is only handed out for the first request; any
        # later offset reports "nothing more" to terminate callers' paging
        # loops.
        return payload if offset == 0 else []
async def get_asset(asset_key: str) -> dict[str, Any]:
    """Fetch a single asset by its key.

    Args:
        asset_key: The asset's key (for example a URI such as
            ``slack://workspace/channel/123``).

    Returns:
        The decoded JSON body of ``GET /assets/key/{encoded key}``.

    Raises:
        Whatever ``response.raise_for_status()`` raises for a non-success
        status.
    """
    # safe="" percent-encodes "/" as well, so the whole key travels as one
    # path segment.
    path = f"/assets/key/{quote(asset_key, safe='')}"
    async with get_client() as client:
        resp = await client._client.get(path)
        resp.raise_for_status()
        return resp.json()
async def delete_asset(asset_key: str) -> None:
    """Delete a single asset by its key.

    Args:
        asset_key: The asset's key (for example a URI such as
            ``slack://workspace/channel/123``).

    Raises:
        Whatever ``response.raise_for_status()`` raises for a non-success
        status.
    """
    # safe="" percent-encodes "/" as well, so the whole key travels as one
    # path segment.
    path = f"/assets/key/{quote(asset_key, safe='')}"
    async with get_client() as client:
        resp = await client._client.delete(path)
        resp.raise_for_status()
async def _get_latest_dependencies() -> list[dict[str, Any]]:
    """Return the decoded JSON body of ``GET /assets/latest-dependencies``.

    Internal helper: callers in this file read the ``"upstream"``,
    ``"downstream"`` and ``"occurred"`` fields of each returned record.

    Raises:
        Whatever ``response.raise_for_status()`` raises for a non-success
        status.
    """
    async with get_client() as client:
        resp = await client._client.get("/assets/latest-dependencies")
        resp.raise_for_status()
        records = resp.json()
        return records
| # Main functions for different operations | |
def _parse_occurred(value: str) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware datetime.

    A trailing ``Z`` is rewritten to ``+00:00`` because
    ``datetime.fromisoformat`` only accepts it natively on Python 3.11+.
    Timestamps that carry no offset are assumed to be UTC so that they can be
    compared with the (aware) cutoff instead of raising ``TypeError``.
    """
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _latest_update_times(dependencies: list[dict[str, Any]]) -> dict[str, datetime]:
    """Map each asset key to the most recent ``occurred`` time that mentions it."""
    latest: dict[str, datetime] = {}
    for dep in dependencies:
        occurred = dep.get("occurred")
        # Both ends of a dependency edge count as "touched" by the event.
        keys = [key for key in (dep.get("upstream"), dep.get("downstream")) if key]
        if not occurred or not keys:
            continue
        # Deliberately not catching ValueError: an unparseable timestamp must
        # abort the run rather than make the asset look never-materialized
        # (which would schedule it for deletion).
        occurred_time = _parse_occurred(occurred)
        for key in keys:
            previous = latest.get(key)
            if previous is None or occurred_time > previous:
                latest[key] = occurred_time
    return latest


def _select_assets_to_delete(
    all_assets: list[dict[str, Any]],
    last_updated_by_key: dict[str, datetime],
    cutoff_time: datetime,
) -> list[dict[str, Any]]:
    """Pick assets with no known update or an update before the cutoff, oldest first."""
    selected = []
    for asset in all_assets:
        asset_key = asset["key"]
        last_updated = last_updated_by_key.get(asset_key)
        # No materialization event found -> treated as old.
        if last_updated is None or last_updated < cutoff_time:
            selected.append({"key": asset_key, "last_updated": last_updated})
    # Assets without a timestamp sort first.
    oldest = datetime.min.replace(tzinfo=timezone.utc)
    selected.sort(key=lambda item: item["last_updated"] or oldest)
    return selected


async def _fetch_all_assets(page_size: int = 100) -> list[dict[str, Any]]:
    """Page through ``list_assets`` until an empty or short page is returned."""
    collected: list[dict[str, Any]] = []
    offset = 0
    page = 0
    while True:
        page += 1
        print(f" Fetching page {page} (offset {offset})...")
        assets = await list_assets(limit=page_size, offset=offset)
        if not assets:
            break
        collected.extend(assets)
        if len(assets) < page_size:
            break
        offset += page_size
    return collected


async def delete_old_assets(days: int, dry_run: bool = False, force: bool = False) -> None:
    """Delete assets whose last known update is older than ``days`` days.

    The last update time of an asset is the most recent ``occurred`` value of
    any latest-dependency record that names the asset as ``upstream`` or
    ``downstream``. Assets that appear in no such record are treated as old
    and are selected for deletion as well.

    Args:
        days: Age threshold in days; the cutoff is "now (UTC) minus ``days``".
        dry_run: When true, only print what would be deleted.
        force: When true, skip the interactive ``DELETE`` confirmation.

    Raises:
        ValueError: If a dependency record carries an unparseable ``occurred``
            timestamp. Nothing has been deleted at that point.
        Exception: Errors from listing assets or fetching dependencies
            propagate. Errors from individual deletions are reported and
            counted, not raised.
    """
    cutoff_time = datetime.now(timezone.utc) - timedelta(days=days)
    print(
        f"{'DRY RUN: ' if dry_run else ''}Deleting assets older than {cutoff_time.isoformat()}"
    )

    print("Fetching all assets...")
    all_assets = await _fetch_all_assets()
    print(f"Found {len(all_assets)} total assets")

    print("Fetching asset dependencies to determine last update times...")
    dependencies = await _get_latest_dependencies()
    asset_last_updated = _latest_update_times(dependencies)

    assets_to_delete = _select_assets_to_delete(
        all_assets, asset_last_updated, cutoff_time
    )
    print(f"\nFound {len(assets_to_delete)} assets to delete")
    if not assets_to_delete:
        print("No assets to delete")
        return

    # Preview only the first 10 so large workspaces do not flood the terminal.
    print("\nAssets to delete:")
    for asset in assets_to_delete[:10]:
        if asset["last_updated"]:
            print(
                f" - {asset['key']} (last updated: {asset['last_updated'].isoformat()})"
            )
        else:
            print(f" - {asset['key']} (no materialization events found)")
    if len(assets_to_delete) > 10:
        print(f" ... and {len(assets_to_delete) - 10} more")

    if dry_run:
        print("\nDRY RUN: No assets were deleted")
        return

    if not force:
        print(f"\n⚠️ WARNING: This will delete {len(assets_to_delete)} assets!")
        try:
            user_input = input("Type 'DELETE' to confirm: ")
        except EOFError:
            # stdin is closed (cron, CI, piped input): no confirmation can be
            # given, so treat it as a refusal instead of crashing.
            print()
            user_input = ""
        if user_input != "DELETE":
            print("Operation cancelled")
            return
    else:
        print(
            f"\n⚠️ WARNING: Force flag set - deleting {len(assets_to_delete)} assets without confirmation!"
        )

    deleted_count = 0
    failed_count = 0
    print("\nDeleting assets...")
    for asset in assets_to_delete:
        try:
            await delete_asset(asset["key"])
        except Exception as e:
            # Best effort: one failed deletion must not stop the rest; the
            # failure is reported here and in the final summary.
            print(f" ✗ Failed to delete {asset['key']}: {e}")
            failed_count += 1
        else:
            deleted_count += 1
            if deleted_count % 10 == 0:
                print(f" Deleted {deleted_count}/{len(assets_to_delete)} assets...")

    print(f"\n✓ Successfully deleted {deleted_count} assets")
    if failed_count > 0:
        print(f"✗ Failed to delete {failed_count} assets")
async def list_assets_main(limit: int = 100) -> None:
    """Print the keys of up to ``limit`` assets in the workspace.

    ``limit`` is forwarded to ``list_assets`` and is also enforced here when
    printing: ``list_assets`` hands back the response list unmodified, so if
    the server answers with more entries than requested the extra ones are
    summarised in a trailing "... and N more" line instead of being printed.
    The "Found N assets" line always reports how many assets were returned.

    Args:
        limit: Maximum number of asset keys to print.
    """
    print("Listing assets...")
    assets = await list_assets(limit=limit)
    print(f"Found {len(assets)} assets:")

    # max(..., 0) keeps a negative limit from turning into a "drop the last
    # N" slice.
    shown = assets[: max(limit, 0)]
    for asset in shown:
        print(f" - {asset['key']}")

    hidden = len(assets) - len(shown)
    if hidden > 0:
        print(f" ... and {hidden} more")
async def test_connection() -> None:
    """Print the API URL in use and a small sample of asset keys.

    Opens a Prefect client to report ``client.api_url``, then lists assets to
    confirm the workspace is reachable. At most five keys are printed even if
    the listing returns more, so the output matches the "showing up to 5"
    message; the "Found N assets" count reports everything that was returned.
    """
    sample_size = 5
    async with get_client() as client:
        print(f"Connected to: {client.api_url}")
        # List a few assets to confirm access.
        assets = await list_assets(limit=sample_size)
        print(f"\nFound {len(assets)} assets (showing up to 5)")
        for asset in assets[:sample_size]:
            print(f" - {asset['key']}")
def _non_negative_int(value: str) -> int:
    """argparse ``type=`` converter that accepts only integers >= 0."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 0:
        raise argparse.ArgumentTypeError(f"must be >= 0, got {number}")
    return number


def main():
    """Parse command-line arguments and run the selected sub-command.

    Sub-commands:
        delete-old: ``--days N`` (required, N >= 0), ``--dry-run``, ``--force``.
        list: ``--limit N`` (default 100).
        test: no options.

    With no sub-command the help text is printed and the function returns.
    Invalid arguments make argparse exit with status 2.
    """
    epilog = (
        "\n"
        "Examples:\n"
        "# Delete assets older than 30 days (dry run)\n"
        "%(prog)s delete-old --days 30 --dry-run\n"
        "# Delete assets older than 7 days\n"
        "%(prog)s delete-old --days 7\n"
        "# List assets\n"
        "%(prog)s list --limit 50\n"
        "# Test connection\n"
        "%(prog)s test\n"
    )
    parser = argparse.ArgumentParser(
        description="Manage Prefect Cloud assets",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog,
    )
    subparsers = parser.add_subparsers(dest="command", help="Command to run")

    # Delete old assets command
    delete_parser = subparsers.add_parser(
        "delete-old", help="Delete assets older than specified days"
    )
    delete_parser.add_argument(
        "--days",
        # A negative value would move the cutoff into the future and select
        # every asset for deletion; reject it up front. Use 0 to delete all.
        type=_non_negative_int,
        required=True,
        help="Delete assets older than this many days",
    )
    delete_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be deleted without actually deleting",
    )
    delete_parser.add_argument(
        "--force",
        action="store_true",
        help="Skip confirmation prompt (use with caution!)",
    )

    # List assets command
    list_parser = subparsers.add_parser("list", help="List assets in the workspace")
    list_parser.add_argument(
        "--limit",
        type=int,
        default=100,
        help="Maximum number of assets to list (default: 100)",
    )

    # Test connection command
    subparsers.add_parser("test", help="Test connection to Prefect Cloud")

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    # Each entry builds its coroutine lazily, so only the chosen command's
    # coroutine is ever created.
    handlers = {
        "delete-old": lambda: delete_old_assets(args.days, args.dry_run, args.force),
        "list": lambda: list_assets_main(args.limit),
        "test": lambda: test_connection(),
    }
    asyncio.run(handlers[args.command]())


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment