Skip to content

Instantly share code, notes, and snippets.

@zone559
Last active August 18, 2025 22:52
Show Gist options
  • Select an option

  • Save zone559/c57e2628aef2e075d5b8e8378cef7bcd to your computer and use it in GitHub Desktop.

Select an option

Save zone559/c57e2628aef2e075d5b8e8378cef7bcd to your computer and use it in GitHub Desktop.
tungsten.run downloader
import requests
import re
import os
import sys
from pathlib import Path
from urllib.parse import urlparse
from datetime import datetime
def get_model_name_from_url(session, url):
    """Fetch *url* and pull the first ``"name":"..."`` value out of the page.

    Returns the captured name, or ``"unknown_model"`` when the request
    fails or the page contains no such field. Any error is printed, never
    raised.
    """
    fallback = "unknown_model"
    try:
        page = session.get(url, timeout=10)
        page.raise_for_status()
        found = re.search(r'"name":"([^"]+)"', page.text)
    except Exception as e:
        print(f"Error getting model name: {e}")
        return fallback
    if found is None:
        return fallback
    return found.group(1)
def get_username_from_url(session, url):
    """Fetch *url* and extract the first ``"username":"..."`` value.

    Returns the captured username, or ``"unknown_user"`` if the request
    fails or nothing matches. Errors are printed rather than propagated.
    """
    username = "unknown_user"
    try:
        reply = session.get(url, timeout=10)
        reply.raise_for_status()
        hit = re.search(r'"username":"([^"]+)"', reply.text)
        if hit:
            username = hit.group(1)
    except Exception as e:
        print(f"Error getting username: {e}")
        return "unknown_user"
    return username
def get_uuid_from_model_url(session, url):
    """Fetch a model page and return the UUID embedded in it.

    A ``"modelVersionUUID"`` value is preferred; if the page has none, the
    first ``"uuid"`` value is used instead. Returns ``None`` when neither
    is present or the request fails (the error is printed).
    """
    patterns = (
        r'"modelVersionUUID":"([^"]+)"',
        r'"uuid":"([^"]+)"',
    )
    try:
        page = session.get(url, timeout=10)
        page.raise_for_status()
        # Try the patterns in priority order; the first hit wins.
        for pattern in patterns:
            hit = re.search(pattern, page.text)
            if hit:
                return hit.group(1)
        return None
    except Exception as e:
        print(f"Error getting UUID: {e}")
        return None
def get_image_url_from_post(session, url):
    """Fetch a post page and return its first ``"original_url"`` value.

    Returns ``None`` if the request fails (the error is printed) or the
    page contains no such field.
    """
    try:
        page = session.get(url, timeout=10)
        page.raise_for_status()
        body = page.text
    except Exception as e:
        print(f"Error getting image URL: {e}")
        return None
    located = re.search(r'"original_url":"([^"]+)"', body)
    if not located:
        return None
    return located.group(1)
def extract_uuid_from_user_url(session, url):
    """Fetch a user page and return the UUID from its ``"user":{"uuid":...`` blob.

    Only purely alphanumeric UUID values match. Returns ``None`` when the
    request fails (the error is printed) or no such value is found.
    """
    user_uuid = None
    try:
        reply = session.get(url, timeout=10)
        reply.raise_for_status()
        hit = re.search(r'"user":\{"uuid":"([a-zA-Z0-9]+)"', reply.text)
        if hit:
            user_uuid = hit.group(1)
    except Exception as e:
        print(f"Error getting user UUID: {e}")
        return None
    return user_uuid
def get_extension_from_content_type(content_type):
    """Map an HTTP ``Content-Type`` header value to a file extension.

    Only the media type is considered: any parameters after ``;`` (such as
    ``charset``) and surrounding whitespace are ignored, and matching is
    case-insensitive. Unknown, empty or missing values map to ``'.bin'``.
    """
    # Servers may send "image/jpeg; charset=binary" or padded values; an
    # exact-match lookup on the raw header would wrongly fall back to ".bin".
    media_type = (content_type or '').split(';', 1)[0].strip().lower()
    return {
        'image/webp': '.webp',
        'image/jpeg': '.jpg',
        'image/png': '.png',
        'image/gif': '.gif',
    }.get(media_type, '.bin')
def download_image(session, url, save_dir, filename):
    """Download *url* into *save_dir* as ``<filename><ext>``.

    The extension is chosen from the response's Content-Type header. If a
    file named *filename* with any known extension already exists, nothing
    is downloaded.

    Returns a ``(success, detail)`` tuple:
      * ``(True, "File exists")`` when the download was skipped,
      * ``(True, filepath)`` after a successful download,
      * ``(False, error_message)`` on any failure (never raises).
    """
    tmp_path = None
    try:
        # Ensure directory exists
        Path(save_dir).mkdir(parents=True, exist_ok=True)

        # Check for existing files with all possible extensions
        for ext in ('.webp', '.jpg', '.png', '.gif', '.bin'):
            if os.path.exists(os.path.join(save_dir, f"{filename}{ext}")):
                print(f"Skipping {filename} - file already exists")
                return True, "File exists"

        with session.get(url, stream=True, timeout=15) as response:
            response.raise_for_status()
            ext = get_extension_from_content_type(response.headers.get('Content-Type', ''))
            filepath = os.path.join(save_dir, f"{filename}{ext}")
            # Stream into a ".part" file first: if the transfer dies midway,
            # no truncated file is left under the final name, where the
            # existence check above would skip it on every later run.
            tmp_path = f"{filepath}.part"
            with open(tmp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        # Only a fully written file is given its final name.
        os.replace(tmp_path, filepath)
        return True, filepath
    except Exception as e:
        # Deliberate catch-all: callers rely on the (False, message) result
        # instead of exceptions. Remove any partial file before reporting.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass  # nothing was written, or it is already gone
        return False, str(e)
def fetch_all_model_posts(session, uuid):
    """Collect every post for a model version by walking the paginated API.

    Pages of 20 posts are requested in order, starting at page 1, until a
    page comes back empty. Paging also stops on a request error or when the
    payload is not a JSON list. Posts gathered up to that point are always
    returned; progress and errors are printed.
    """
    all_posts = []
    page = 1
    while True:
        try:
            print(f"Fetching page {page}...", end=' ', flush=True)
            response = session.get(
                f"https://api.tungsten.run/v1/posts?sort=top_all_time&page={page}&per_page=20"
                f"&tweakable_only=false&following=false&model_version_uuid={uuid}",
                timeout=15
            )
            response.raise_for_status()
            posts = response.json()
        except requests.exceptions.RequestException as e:
            print(f"\nError fetching page {page}: {e}")
            break
        except Exception as e:
            print(f"\nUnexpected error: {e}")
            break

        if not posts:
            print("No more posts found.")
            break
        if not isinstance(posts, list):
            # A non-list payload (e.g. an error object) must not be merged
            # into the results: extending with a dict would add its keys as
            # "posts", and the loop would never reach an empty page.
            print("Unexpected response format; stopping.")
            break

        all_posts.extend(posts)
        print(f"Found {len(posts)} posts")
        page += 1
    return all_posts
def fetch_all_user_posts(session, uuid):
    """Collect every post of a user by walking the paginated API.

    Pages of 40 posts (newest first) are requested in order, starting at
    page 1, until a page comes back empty. Paging also stops on a request
    error or when the payload is not a JSON list. Posts gathered up to that
    point are always returned; progress and errors are printed.
    """
    all_posts = []
    page = 1
    while True:
        try:
            api_url = f"https://api.tungsten.run/v1/users/{uuid}/posts?page={page}&per_page=40&sort=newest"
            print(f"Fetching page {page}...", end=' ', flush=True)
            response = session.get(api_url, timeout=15)
            response.raise_for_status()
            posts = response.json()
        except requests.exceptions.RequestException as e:
            print(f"\nError fetching page {page}: {e}")
            break
        except Exception as e:
            print(f"\nUnexpected error: {e}")
            break

        if not posts:
            print("No more posts found.")
            break
        if not isinstance(posts, list):
            # A non-list payload (e.g. an error object) must not be merged
            # into the results: extending with a dict would add its keys as
            # "posts", and the loop would never reach an empty page.
            print("Unexpected response format; stopping.")
            break

        all_posts.extend(posts)
        print(f"Found {len(posts)} posts")
        page += 1
    return all_posts
def format_date(iso_date):
    """Render an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM:SS``.

    A trailing ``Z`` is treated as ``+00:00``. The time is printed as given
    in the timestamp, with no timezone conversion. Falsy input yields
    ``"N/A"``; a value that cannot be parsed is returned unchanged.
    """
    if not iso_date:
        return "N/A"
    try:
        dt = datetime.fromisoformat(iso_date.replace('Z', '+00:00'))
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except (AttributeError, TypeError, ValueError):
        # Non-string or malformed input: show the raw value. Narrower than a
        # bare ``except`` so KeyboardInterrupt/SystemExit still propagate.
        return iso_date
def display_posts_info(posts_data):
    """Print a human-readable summary of each post in *posts_data*.

    Anything that is not a non-empty list produces a single "no posts"
    message. Otherwise each post's title, UUID, creation date, counters,
    image URLs and NSFW flag are printed, separated by rules.
    """
    if not (posts_data and isinstance(posts_data, list)):
        print("No posts found or invalid data format.")
        return

    heavy_rule = "=" * 80
    light_rule = "-" * 80

    print(f"\nFound {len(posts_data)} total posts:")
    print(heavy_rule)
    for number, entry in enumerate(posts_data, start=1):
        print(f"Post #{number}")
        print(f"Title: {entry.get('title', 'No title')}")
        print(f"UUID: {entry.get('uuid', 'N/A')}")
        print(f"Created: {format_date(entry.get('created_at'))}")
        print(f"Likes: {entry.get('like_count', 0)} | Views: {entry.get('view_count', 0)} | Comments: {entry.get('comment_count', 0)}")
        print(f"Original: {entry.get('original_url', 'N/A')}")
        print(f"Resized: {entry.get('resized_url', 'N/A')}")
        nsfw_label = 'Yes' if entry.get('nsfw') else 'No'
        print(f"NSFW: {nsfw_label}")
        print(light_rule)
def download_posts(session, posts, save_dir):
    """Download the original image of every post into *save_dir*.

    Each image is saved under its post UUID. Posts without an
    ``original_url`` are skipped. Files that already exist count as
    successes. Progress and a final tally are printed; nothing is returned.
    """
    if not posts:
        print("No posts to download.")
        return

    total = len(posts)
    print(f"\nFound {total} total posts. Starting image downloads...")

    saved = 0
    for entry in posts:
        entry_id = entry.get('uuid')
        source_url = entry.get('original_url')
        if source_url:
            print(f"Downloading {entry_id}...", end=' ', flush=True)
            ok, outcome = download_image(session, source_url, save_dir, entry_id)
            if not ok:
                print(f"Failed: {outcome}")
                continue
            # "File exists" is the skip marker; the skip message was
            # already printed by download_image.
            if outcome != "File exists":
                print(f"Saved as {os.path.basename(outcome)}")
            saved += 1
        else:
            print(f"Skipping post {entry_id} - no original URL")

    print(f"\nDownload complete. Successfully downloaded {saved}/{total} images.")
def handle_model_url(session, url):
    """Download every post image belonging to the model at *url*.

    Resolves the model's UUID and name from the page, creates a
    ``tungsten-model-<name>`` directory in the working directory, then
    fetches, lists and downloads all of the model's posts. Stops with a
    message if the UUID cannot be found or no posts are returned.
    """
    uuid = get_uuid_from_model_url(session, url)
    if not uuid:
        print("Failed to get UUID. Exiting.")
        return

    raw_name = get_model_name_from_url(session, url)
    # Keep only characters that are safe in a directory name.
    permitted_extras = (' ', '-', '_')
    kept = [ch for ch in raw_name if ch.isalnum() or ch in permitted_extras]
    safe_model_name = "".join(kept).rstrip()
    download_dir = f"tungsten-model-{safe_model_name}"
    Path(download_dir).mkdir(exist_ok=True)

    print(f"Using UUID: {uuid}")
    fetched = fetch_all_model_posts(session, uuid)
    if fetched:
        display_posts_info(fetched)
        download_posts(session, fetched, download_dir)
    else:
        print("No posts were fetched.")
def handle_user_url(session, url):
    """Download every post image belonging to the user at *url*.

    Resolves the user's UUID and username from the page, creates a
    ``tungsten-user-<username>`` directory in the working directory, then
    fetches, lists and downloads all of the user's posts. Stops with a
    message if the UUID cannot be found or no posts are returned.
    """
    uuid = extract_uuid_from_user_url(session, url)
    if not uuid:
        print("Failed to get user UUID. Exiting.")
        return

    raw_username = get_username_from_url(session, url)
    # Keep only characters that are safe in a directory name.
    permitted_extras = (' ', '-', '_')
    kept = [ch for ch in raw_username if ch.isalnum() or ch in permitted_extras]
    safe_username = "".join(kept).rstrip()
    download_dir = f"tungsten-user-{safe_username}"
    Path(download_dir).mkdir(exist_ok=True)

    print(f"Using user UUID: {uuid}")
    fetched = fetch_all_user_posts(session, uuid)
    if fetched:
        display_posts_info(fetched)
        download_posts(session, fetched, download_dir)
    else:
        print("No posts were fetched.")
def handle_post_url(session, url):
    """Download the image of the single post at *url* into ``tungsten-posts``.

    The post ID, used as the file name, is the last non-empty segment of
    the URL path. Query strings, fragments and trailing slashes are
    ignored. Prints progress and the outcome; returns nothing.
    """
    # Extract the post ID from the URL path only, so "?query" / "#fragment"
    # never end up in the file name and a trailing "/" does not yield an
    # empty ID.
    path_segments = [part for part in urlparse(url).path.split('/') if part]
    post_id = path_segments[-1] if path_segments else ''
    if not post_id:
        print("Invalid post URL format")
        return

    # Create the default download directory only once the URL is valid.
    download_dir = "tungsten-posts"
    Path(download_dir).mkdir(exist_ok=True)

    print(f"Processing post {post_id}...")
    if not (image_url := get_image_url_from_post(session, url)):
        print("Failed to get image URL from post")
        return

    print(f"Found image URL: {image_url}")
    print("Downloading...", end=' ', flush=True)
    success, result = download_image(session, image_url, download_dir, post_id)
    if success:
        # On "File exists" the skip message was already printed by
        # download_image.
        if result != "File exists":
            print(f"Saved as {os.path.basename(result)}")
        print("Download completed successfully!")
    else:
        print(f"Failed: {result}")
def main():
    """Command-line entry point: download from one tungsten.run URL.

    Expects exactly one argument, an ``https://tungsten.run/`` URL. The URL
    is routed to the model, user or post handler according to which path
    marker it contains. Usage and validation problems are printed.
    """
    args = sys.argv
    if len(args) != 2:
        print("Usage: python script.py <tungsten.run_url>")
        return

    target = args[1].strip()
    if not target.startswith("https://tungsten.run/"):
        print("Invalid URL - must be from tungsten.run domain")
        return

    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
    }

    # Create session for connection pooling
    with requests.Session() as session:
        session.headers.update(browser_headers)

        # Markers are checked in this order; the first one found wins.
        routes = (
            ("/model/", handle_model_url),
            ("/user/", handle_user_url),
            ("/post/", handle_post_url),
        )
        handler = next((fn for marker, fn in routes if marker in target), None)
        if handler is None:
            print("Unsupported URL type - must be a model, user, or post URL")
        else:
            handler(session, target)
# Run the downloader only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment