Last active
August 18, 2025 22:52
-
-
Save zone559/c57e2628aef2e075d5b8e8378cef7bcd to your computer and use it in GitHub Desktop.
tungsten.run downloader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import requests | |
| import re | |
| import os | |
| import sys | |
| from pathlib import Path | |
| from urllib.parse import urlparse | |
| from datetime import datetime | |
def get_model_name_from_url(session, url):
    """Fetch *url* and return the first ``"name":"..."`` value in the page text.

    Args:
        session: Object exposing ``get(url, timeout=...)`` (the script passes
            a ``requests.Session``).
        url: Page address to fetch.

    Returns:
        The captured name, or ``"unknown_model"`` when the pattern is absent
        or any exception is raised while fetching/parsing (the error is
        printed, not propagated).
    """
    fallback = "unknown_model"
    try:
        page = session.get(url, timeout=10)
        page.raise_for_status()
        found = re.search(r'"name":"([^"]+)"', page.text)
        if found is None:
            return fallback
        return found.group(1)
    except Exception as err:
        print(f"Error getting model name: {err}")
        return fallback
def get_username_from_url(session, url):
    """Fetch *url* and return the first ``"username":"..."`` value found.

    Args:
        session: Object exposing ``get(url, timeout=...)``.
        url: Page address to fetch.

    Returns:
        The captured username, or ``"unknown_user"`` if the pattern does not
        occur or anything raises along the way (the error is printed).
    """
    try:
        resp = session.get(url, timeout=10)
        resp.raise_for_status()
        if (hit := re.search(r'"username":"([^"]+)"', resp.text)):
            return hit.group(1)
        return "unknown_user"
    except Exception as exc:
        print(f"Error getting username: {exc}")
        return "unknown_user"
def get_uuid_from_model_url(session, url):
    """Fetch *url* and extract a UUID string from the page text.

    The ``"modelVersionUUID"`` field is preferred; a plain ``"uuid"`` field is
    used only when the former is not present.

    Args:
        session: Object exposing ``get(url, timeout=...)``.
        url: Page address to fetch.

    Returns:
        The captured UUID string, or ``None`` when neither pattern matches or
        an exception occurs (the error is printed).
    """
    try:
        resp = session.get(url, timeout=10)
        resp.raise_for_status()
        # Patterns are tried in priority order; the first hit wins.
        for pattern in (r'"modelVersionUUID":"([^"]+)"', r'"uuid":"([^"]+)"'):
            hit = re.search(pattern, resp.text)
            if hit:
                return hit.group(1)
        return None
    except Exception as exc:
        print(f"Error getting UUID: {exc}")
        return None
def get_image_url_from_post(session, url):
    """Fetch a post page and return its first ``"original_url":"..."`` value.

    Args:
        session: Object exposing ``get(url, timeout=...)``.
        url: Post page address to fetch.

    Returns:
        The captured URL text exactly as it appears in the page, or ``None``
        if the pattern is missing or an exception occurs (the error is
        printed).
    """
    image_url = None
    try:
        page = session.get(url, timeout=10)
        page.raise_for_status()
        found = re.search(r'"original_url":"([^"]+)"', page.text)
        if found:
            image_url = found.group(1)
    except Exception as err:
        print(f"Error getting image URL: {err}")
        return None
    return image_url
def extract_uuid_from_user_url(session, url):
    """Fetch a user page and return the UUID embedded as ``"user":{"uuid":"..."``.

    Only purely alphanumeric identifiers are accepted by the pattern.

    Args:
        session: Object exposing ``get(url, timeout=...)``.
        url: User page address to fetch.

    Returns:
        The captured identifier, or ``None`` when no match is found or an
        exception occurs (the error is printed).
    """
    try:
        reply = session.get(url, timeout=10)
        reply.raise_for_status()
        hit = re.compile(r'"user":\{"uuid":"([a-zA-Z0-9]+)"').search(reply.text)
        return None if hit is None else hit.group(1)
    except Exception as problem:
        print(f"Error getting user UUID: {problem}")
        return None
def get_extension_from_content_type(content_type):
    """Map an HTTP ``Content-Type`` header value to a file extension.

    Args:
        content_type: Raw header value, e.g. ``"image/png"`` or
            ``"image/jpeg; charset=binary"``. ``None`` and ``""`` are accepted.

    Returns:
        ``'.webp'``, ``'.jpg'``, ``'.png'`` or ``'.gif'`` for the matching
        image media type (case-insensitive), otherwise ``'.bin'``.
    """
    extensions = {
        'image/webp': '.webp',
        'image/jpeg': '.jpg',
        'image/png': '.png',
        'image/gif': '.gif',
    }
    # A header may carry parameters after ';' and stray whitespace; only the
    # bare media type should take part in the lookup.
    media_type = (content_type or '').split(';', 1)[0].strip().lower()
    return extensions.get(media_type, '.bin')
def download_image(session, url, save_dir, filename):
    """Download *url* into *save_dir* as ``<filename><ext>``.

    The extension is chosen from the response's ``Content-Type`` header. If a
    file named ``<filename>`` with any of the known extensions already exists,
    nothing is downloaded.

    The body is streamed into ``<filename><ext>.part`` and renamed to its
    final name only after the whole stream has been written, so an
    interrupted transfer never leaves a truncated file that a later run would
    mistake for a finished download.

    Args:
        session: Object exposing ``get(url, stream=..., timeout=...)`` that
            returns a context-manager response (a ``requests.Session``).
        url: Address of the image.
        save_dir: Target directory; created (with parents) if missing.
        filename: Base name without extension.

    Returns:
        ``(True, "File exists")`` when skipped, ``(True, filepath)`` on a
        completed download, or ``(False, error_message)`` if any exception
        was raised.
    """
    try:
        # Ensure directory exists
        Path(save_dir).mkdir(parents=True, exist_ok=True)

        # Check for existing files with all possible extensions
        known_exts = ('.webp', '.jpg', '.png', '.gif', '.bin')
        if any(os.path.exists(os.path.join(save_dir, f"{filename}{ext}")) for ext in known_exts):
            print(f"Skipping {filename} - file already exists")
            return True, "File exists"

        with session.get(url, stream=True, timeout=15) as response:
            response.raise_for_status()
            ext = get_extension_from_content_type(response.headers.get('Content-Type', ''))
            filepath = os.path.join(save_dir, f"{filename}{ext}")
            # ".part" is not one of the known extensions, so a leftover
            # partial file is never treated as an existing download.
            partial_path = f"{filepath}.part"
            try:
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                os.replace(partial_path, filepath)
            except BaseException:
                # Also clean up on Ctrl-C, then let the error continue upward.
                try:
                    if os.path.exists(partial_path):
                        os.remove(partial_path)
                except OSError:
                    pass
                raise
        return True, filepath
    except Exception as e:
        return False, str(e)
def fetch_all_model_posts(session, uuid):
    """Collect every post listed by the API for one model version.

    Pages of 20 posts (sorted ``top_all_time``) are requested one after
    another, starting at page 1, until a page comes back empty or an error
    occurs. Progress and errors are printed.

    Args:
        session: Object exposing ``get(url, timeout=...)``.
        uuid: Model version UUID placed in the ``model_version_uuid`` query
            parameter.

    Returns:
        A list with the decoded JSON items of all pages fetched before the
        loop stopped (possibly empty).
    """
    collected = []
    page_no = 1
    keep_going = True
    while keep_going:
        try:
            print(f"Fetching page {page_no}...", end=' ', flush=True)
            endpoint = (
                f"https://api.tungsten.run/v1/posts?sort=top_all_time&page={page_no}&per_page=20"
                f"&tweakable_only=false&following=false&model_version_uuid={uuid}"
            )
            reply = session.get(endpoint, timeout=15)
            reply.raise_for_status()
            batch = reply.json()
            if batch:
                collected.extend(batch)
                print(f"Found {len(batch)} posts")
                page_no += 1
            else:
                print("No more posts found.")
                keep_going = False
        except requests.exceptions.RequestException as err:
            print(f"\nError fetching page {page_no}: {err}")
            keep_going = False
        except Exception as err:
            print(f"\nUnexpected error: {err}")
            keep_going = False
    return collected
def fetch_all_user_posts(session, uuid):
    """Collect every post listed by the API for one user.

    Pages of 40 posts (sorted ``newest``) are requested in sequence from
    page 1 until an empty page is returned or an error occurs. Progress and
    errors are printed.

    Args:
        session: Object exposing ``get(url, timeout=...)``.
        uuid: User UUID placed in the request path.

    Returns:
        A list with the decoded JSON items of all pages fetched before the
        loop stopped (possibly empty).
    """
    gathered = []
    current = 1
    while True:
        try:
            endpoint = f"https://api.tungsten.run/v1/users/{uuid}/posts?page={current}&per_page=40&sort=newest"
            print(f"Fetching page {current}...", end=' ', flush=True)
            reply = session.get(endpoint, timeout=15)
            reply.raise_for_status()
            batch = reply.json()
            if batch and len(batch) != 0:
                gathered.extend(batch)
                print(f"Found {len(batch)} posts")
                current += 1
                continue
            print("No more posts found.")
            break
        except requests.exceptions.RequestException as err:
            print(f"\nError fetching page {current}: {err}")
            break
        except Exception as err:
            print(f"\nUnexpected error: {err}")
            break
    return gathered
def format_date(iso_date):
    """Render an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM:SS``.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. The wall-clock
    fields are printed as given; no time-zone conversion is performed.

    Args:
        iso_date: Timestamp string, or a falsy value when unknown.

    Returns:
        ``"N/A"`` for falsy input, the formatted string when parsing
        succeeds, otherwise *iso_date* unchanged.
    """
    if not iso_date:
        return "N/A"
    try:
        dt = datetime.fromisoformat(iso_date.replace('Z', '+00:00'))
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except (AttributeError, TypeError, ValueError):
        # Non-string or unparsable input: show the raw value. Narrower than a
        # bare ``except`` so KeyboardInterrupt/SystemExit still propagate.
        return iso_date
def display_posts_info(posts_data):
    """Print a human-readable summary of each post in *posts_data*.

    Args:
        posts_data: A non-empty list of dict-like posts. Anything else (empty,
            ``None``, or not a ``list``) produces a single notice and nothing
            more.

    For every post the title, UUID, creation date, like/view/comment counters,
    original and resized URLs and the NSFW flag are printed, with defaults
    substituted for absent keys.
    """
    if not (posts_data and isinstance(posts_data, list)):
        print("No posts found or invalid data format.")
        return

    print(f"\nFound {len(posts_data)} total posts:")
    print("=" * 80)
    for number, entry in enumerate(posts_data, start=1):
        read = entry.get
        print(f"Post #{number}")
        print(f"Title: {read('title', 'No title')}")
        print(f"UUID: {read('uuid', 'N/A')}")
        print(f"Created: {format_date(read('created_at'))}")
        print(f"Likes: {read('like_count', 0)} | Views: {read('view_count', 0)} | Comments: {read('comment_count', 0)}")
        print(f"Original: {read('original_url', 'N/A')}")
        print(f"Resized: {read('resized_url', 'N/A')}")
        print(f"NSFW: {'Yes' if read('nsfw') else 'No'}")
        print("-" * 80)
def download_posts(session, posts, save_dir):
    """Download the original image of every post into *save_dir*.

    Args:
        session: Passed through to ``download_image``.
        posts: Iterable of dict-like posts; each is read for ``uuid`` (used as
            the file base name) and ``original_url``.
        save_dir: Destination directory handed to ``download_image``.

    Posts without an ``original_url`` are skipped with a notice. A running
    tally of successful results (including files that already existed) is
    printed at the end.
    """
    if not posts:
        print("No posts to download.")
        return

    total = len(posts)
    print(f"\nFound {total} total posts. Starting image downloads...")

    saved = 0
    for entry in posts:
        post_uuid, source = entry.get('uuid'), entry.get('original_url')
        if source:
            print(f"Downloading {post_uuid}...", end=' ', flush=True)
            ok, outcome = download_image(session, source, save_dir, post_uuid)
            if not ok:
                print(f"Failed: {outcome}")
                continue
            # "File exists" is the skip marker; the skip was already reported.
            if outcome != "File exists":
                print(f"Saved as {os.path.basename(outcome)}")
            saved += 1
        else:
            print(f"Skipping post {post_uuid} - no original URL")

    print(f"\nDownload complete. Successfully downloaded {saved}/{total} images.")
def handle_model_url(session, url):
    """Download all post images for the model page at *url*.

    Steps: resolve the UUID from the page, derive a directory name
    ``tungsten-model-<name>`` from the model name (keeping only alphanumerics,
    spaces, ``-`` and ``_``), fetch every post for that UUID, print a summary
    and download the images. Stops early with a message if the UUID cannot be
    obtained or no posts are returned.

    Args:
        session: HTTP session forwarded to the helper functions.
        url: Model page address.
    """
    uuid = get_uuid_from_model_url(session, url)
    if not uuid:
        print("Failed to get UUID. Exiting.")
        return

    permitted_extra = (' ', '-', '_')
    kept_chars = [
        ch for ch in get_model_name_from_url(session, url)
        if ch.isalnum() or ch in permitted_extra
    ]
    safe_model_name = "".join(kept_chars).rstrip()
    download_dir = f"tungsten-model-{safe_model_name}"
    Path(download_dir).mkdir(exist_ok=True)

    print(f"Using UUID: {uuid}")
    all_posts = fetch_all_model_posts(session, uuid)
    if all_posts:
        display_posts_info(all_posts)
        download_posts(session, all_posts, download_dir)
    else:
        print("No posts were fetched.")
def handle_user_url(session, url):
    """Download all post images for the user page at *url*.

    Steps: resolve the user's UUID from the page, derive a directory name
    ``tungsten-user-<username>`` (keeping only alphanumerics, spaces, ``-``
    and ``_``), fetch every post of that user, print a summary and download
    the images. Stops early with a message if the UUID cannot be obtained or
    no posts are returned.

    Args:
        session: HTTP session forwarded to the helper functions.
        url: User page address.
    """
    user_uuid = extract_uuid_from_user_url(session, url)
    if not user_uuid:
        print("Failed to get user UUID. Exiting.")
        return

    raw_name = get_username_from_url(session, url)
    safe_username = "".join(
        filter(lambda ch: ch.isalnum() or ch in (' ', '-', '_'), raw_name)
    ).rstrip()
    download_dir = f"tungsten-user-{safe_username}"
    Path(download_dir).mkdir(exist_ok=True)

    print(f"Using user UUID: {user_uuid}")
    all_posts = fetch_all_user_posts(session, user_uuid)
    if all_posts:
        display_posts_info(all_posts)
        download_posts(session, all_posts, download_dir)
    else:
        print("No posts were fetched.")
def handle_post_url(session, url):
    """Download the original image of the single post at *url*.

    The post ID is the last non-empty segment of the URL *path*, so a
    trailing slash, query string or fragment does not end up in the ID (and
    therefore not in the saved file name). A path with fewer than two
    segments (e.g. ``/post/``) carries no ID and is rejected.

    Args:
        session: HTTP session forwarded to the helper functions.
        url: Post page address.

    The image is saved into the ``tungsten-posts`` directory under the post
    ID; progress and failures are printed.
    """
    # Extract post ID from the URL path only (ignores ?query and #fragment)
    segments = [part for part in urlparse(url).path.split('/') if part]
    post_id = segments[-1] if len(segments) >= 2 else ''
    if not post_id:
        print("Invalid post URL format")
        return

    # Create default download directory for posts (only once the URL is valid)
    download_dir = "tungsten-posts"
    Path(download_dir).mkdir(exist_ok=True)

    print(f"Processing post {post_id}...")
    if not (image_url := get_image_url_from_post(session, url)):
        print("Failed to get image URL from post")
        return

    print(f"Found image URL: {image_url}")
    print("Downloading...", end=' ', flush=True)
    success, result = download_image(session, image_url, download_dir, post_id)
    if success:
        # "File exists" is the skip marker; the skip was already reported.
        if result != "File exists":
            print(f"Saved as {os.path.basename(result)}")
        print("Download completed successfully!")
    else:
        print(f"Failed: {result}")
def main():
    """Command-line entry point: route one tungsten.run URL to its handler.

    Expects exactly one argument. The URL must start with
    ``https://tungsten.run/`` and contain ``/model/``, ``/user/`` or
    ``/post/`` (checked in that order); otherwise a message is printed and
    nothing is downloaded.
    """
    args = sys.argv
    if len(args) != 2:
        print("Usage: python script.py <tungsten.run_url>")
        return

    user_url = args[1].strip()
    if not user_url.startswith("https://tungsten.run/"):
        print("Invalid URL - must be from tungsten.run domain")
        return

    # Create session for connection pooling
    with requests.Session() as session:
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
        })

        # Markers are tested in this order; the first one present wins.
        routes = (
            ("/model/", handle_model_url),
            ("/user/", handle_user_url),
            ("/post/", handle_post_url),
        )
        for marker, handler in routes:
            if marker in user_url:
                handler(session, user_url)
                break
        else:
            print("Unsupported URL type - must be a model, user, or post URL")


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment