Created
December 17, 2025 02:51
-
-
Save jamesacklin/46dba9c1ab861b583ce3987d88fb330c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Export COMPLETE message history from Tlon channels with pagination. | |
| This script works on both hosted and local ships. | |
| """ | |
| import argparse | |
| import requests | |
| from datetime import datetime | |
| from typing import List, Dict, Optional | |
| import sys | |
| import os | |
class UrbitClient:
    """Small HTTP client for reading channel data from a ship.

    A single ``requests.Session`` is created in ``__init__`` and reused for
    the login request and every later scry request.
    """

    # Seconds to wait on any single HTTP request. Kept as a class attribute so
    # it is also available on instances that did not go through __init__.
    timeout = 60

    def __init__(self, ship_url: str, access_code: str, timeout: Optional[float] = None):
        """Store the base URL, open a session and log in.

        Args:
            ship_url: Base URL of the ship; a trailing ``/`` is removed.
            access_code: Value posted as ``password`` to the login endpoint.
            timeout: Optional per-request timeout in seconds; defaults to the
                class-level value.

        Raises:
            Exception: If the login request does not return 200 or 204.
        """
        self.ship_url = ship_url.rstrip('/')
        if timeout is not None:
            self.timeout = timeout
        self.session = requests.Session()
        self.authenticate(access_code)

    def authenticate(self, access_code: str):
        """Authenticate using ship access code.

        Raises:
            Exception: If the response status is neither 200 nor 204.
        """
        url = f"{self.ship_url}/~/login"
        # Without an explicit timeout an unreachable ship would block forever.
        response = self.session.post(
            url, data={'password': access_code}, timeout=self.timeout
        )
        if response.status_code not in (200, 204):
            raise Exception(f"Authentication failed: {response.status_code} - {response.text}")
        print(f"β Authenticated to {self.ship_url}")

    def get_all_channels(self) -> Dict:
        """Get list of all channels.

        Returns:
            The decoded JSON body of the channels scry.

        Raises:
            Exception: If the response status is not 200.
        """
        url = f"{self.ship_url}/~/scry/channels/v4/channels.json"
        response = self.session.get(url, timeout=self.timeout)
        if response.status_code != 200:
            raise Exception(f"Failed to get channels: {response.status_code}")
        return response.json()

    def format_ud(self, num_str: str) -> str:
        """Format a number as @ud (dot-separated decimal).

        Digits are grouped in threes from the right, e.g. ``"1234567"``
        becomes ``"1.234.567"``. An empty string yields an empty string.
        """
        groups = []
        # Walk from the right so that the short group ends up leftmost.
        for end in range(len(num_str), 0, -3):
            groups.append(num_str[max(end - 3, 0):end])
        return '.'.join(reversed(groups))

    def get_channel_posts_page(self, channel_id: str, mode: str = 'newest',
                               cursor: Optional[str] = None, count: int = 100) -> Dict:
        """Fetch a single page of channel posts.

        The scry path is ``/v4/{channel_id}/posts/{mode}/{cursor?}/{count}/post``;
        the cursor segment is omitted when ``cursor`` is falsy.

        Args:
            channel_id: Channel identifier inserted verbatim into the path.
            mode: Paging mode segment (this class uses 'newest' and 'older').
            cursor: Position to page from; dot-grouped here if it has no dots.
            count: Page size segment.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: If the response status is not 200.
        """
        path_parts = ['v4', channel_id, 'posts', mode]
        if cursor:
            # Tolerate a numeric cursor: the dot check below needs a string.
            cursor = str(cursor)
            # Format cursor as @ud if it's not already
            if '.' not in cursor:
                cursor = self.format_ud(cursor)
            path_parts.append(cursor)
        path_parts.extend([str(count), 'post'])
        path = '/' + '/'.join(path_parts)
        url = f"{self.ship_url}/~/scry/channels{path}.json"
        response = self.session.get(url, timeout=self.timeout)
        if response.status_code != 200:
            raise Exception(f"Failed to fetch posts: {response.status_code} - {response.text[:200]}")
        return response.json()

    def get_all_channel_posts(self, channel_id: str) -> List[Dict]:
        """Fetch ALL posts from a channel by paginating.

        Starts with a 'newest' page, then keeps requesting 'older' pages using
        the ``older`` value of the previous response until a page has no posts,
        no ``older`` value is returned, or the cursor stops advancing.

        Returns:
            Every post value from every page, in the order received.
        """
        all_posts = []
        cursor = None
        page = 0
        while True:
            page += 1
            mode = 'older' if cursor else 'newest'
            print(f" Page {page}...", end='', flush=True)
            result = self.get_channel_posts_page(channel_id, mode=mode, cursor=cursor, count=100)
            posts = result.get('posts', {})
            if not posts:
                print(" (no posts)")
                break
            post_list = list(posts.values())
            all_posts.extend(post_list)
            print(f" {len(post_list)} messages")
            # Check if there are more pages
            next_cursor = result.get('older')
            if not next_cursor:
                print(f" β Reached end (total: {len(all_posts)} messages)")
                break
            # A cursor identical to the one just used would request the same
            # page again forever; compare with dots removed so formatting
            # differences do not hide the repeat.
            if cursor and str(next_cursor).replace('.', '') == str(cursor).replace('.', ''):
                print(" (cursor did not advance; stopping)")
                break
            cursor = next_cursor
        return all_posts
def parse_inline_element(element):
    """Render one inline element (or a list of them) as plain text.

    Handled shapes:
      * ``str``: returned unchanged.
      * ``list``: each item is rendered and the results are concatenated, so
        nested formatting such as ``{'bold': ['a', {'italics': ['b']}]}``
        becomes readable text rather than a Python list repr.
      * ``dict`` with one of the keys ``ship``, ``link``, ``break``, ``bold``,
        ``italics``, ``inline-code`` or ``blockquote``.

    Anything else is returned as ``str(element)``.
    """
    if isinstance(element, str):
        return element
    if isinstance(element, list):
        return ''.join(parse_inline_element(item) for item in element)
    if isinstance(element, dict):
        if 'ship' in element:
            return f"@{element['ship']}"
        if 'link' in element:
            link = element['link']
            if isinstance(link, dict):
                return link.get('href', '[link]')
            return '[link]'
        if 'break' in element:
            return '\n'
        # Formatting wrappers carry no text of their own; render their payload.
        if 'bold' in element:
            return parse_inline_element(element['bold'])
        if 'italics' in element:
            return parse_inline_element(element['italics'])
        if 'inline-code' in element:
            return f"`{element['inline-code']}`"
        if 'blockquote' in element:
            # Render the quoted payload too, so nested inlines are not dumped
            # as a repr after the "> " marker.
            return f"> {parse_inline_element(element['blockquote'])}"
    return str(element)
def _block_placeholder(block) -> Optional[str]:
    """Return the text stand-in for one block verse, or None if nothing is emitted."""
    if not isinstance(block, dict):
        return None
    if 'image' in block:
        image = block['image']
        source = image.get('src', '') if isinstance(image, dict) else ''
        return f"[image: {source}]"
    if 'cite' in block:
        return "[quote]"
    if 'listing' in block:
        listing = block['listing']
        has_code = isinstance(listing, dict) and 'code' in listing
        return f"[code: {listing['code']}]" if has_code else "[code]"
    if 'header' in block:
        header = block['header']
        if not isinstance(header, dict):
            return None
        return f"## {parse_story_content(header.get('content', []))}"
    # Unknown block: label it with its first key, or 'block' when it is empty.
    kind = next(iter(block), 'block')
    return f"[{kind}]"


def parse_story_content(story: List) -> str:
    """Flatten a story (a sequence of verses) into one line of plain text.

    String verses are used as-is. Dict verses contribute either their
    ``inline`` elements (rendered by ``parse_inline_element``) or a bracketed
    placeholder for their ``block``. All other verses are ignored. The pieces
    are concatenated and every run of whitespace, including newlines, is
    collapsed to a single space.

    Returns:
        The flattened text, or ``""`` when ``story`` is empty or falsy.
    """
    if not story:
        return ""
    pieces = []
    for verse in story:
        if isinstance(verse, str):
            pieces.append(verse)
            continue
        if not isinstance(verse, dict):
            continue
        if 'inline' in verse:
            inline = verse['inline']
            elements = inline if isinstance(inline, list) else [inline]
            pieces.extend(parse_inline_element(item) for item in elements)
        elif 'block' in verse:
            placeholder = _block_placeholder(verse['block'])
            if placeholder is not None:
                pieces.append(placeholder)
    # split()/join leaves no leading or trailing whitespace, so no strip needed.
    return ' '.join(''.join(pieces).split())
def format_timestamp(urbit_time: int) -> str:
    """Convert timestamp (milliseconds) to readable format.

    The value is divided by 1000 and passed to ``datetime.fromtimestamp``,
    so the result is a naive local-time string ``YYYY-MM-DD HH:MM:SS``.

    Returns:
        The formatted time, or ``"[invalid timestamp: <value>]"`` when the
        value cannot be converted (wrong type, NaN, out of range).
    """
    try:
        dt = datetime.fromtimestamp(urbit_time / 1000)
        return dt.strftime('%Y-%m-%d %H:%M:%S')
    # Only conversion failures are treated as "invalid"; a bare except would
    # also swallow KeyboardInterrupt/SystemExit during a long export.
    except (TypeError, ValueError, OverflowError, OSError):
        return f"[invalid timestamp: {urbit_time}]"
def export_channel_to_file(client: UrbitClient, channel_id: str, output_file: str) -> int:
    """Export a single channel to a text file.

    Fetches every post via ``client.get_all_channel_posts``, sorts them by
    the essay's ``sent`` value (oldest first) and writes a header followed by
    one line per post, with any replies found under ``seal.replies`` written
    beneath it. No file is written when there are no posts.

    Args:
        client: Client used to fetch the posts.
        channel_id: Channel to export; also written into the file header.
        output_file: Path of the text file to create or overwrite.

    Returns:
        The number of posts written (0 when nothing was found).
    """
    print(f"\nπ Exporting: {channel_id}")
    fetched = client.get_all_channel_posts(channel_id)
    # Entries that are not dicts (e.g. null placeholders) have no essay to
    # print and would otherwise crash the sort and the write loop below.
    posts = [post for post in fetched if isinstance(post, dict)]
    if not posts:
        print(" β οΈ No messages found")
        return 0

    def sent_of(post: dict):
        # `or` guards against an explicit null essay / sent value.
        return (post.get('essay') or {}).get('sent') or 0

    # Sort by timestamp (oldest first)
    posts.sort(key=sent_of)
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(f"Channel: {channel_id}\n")
        f.write(f"Exported: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Messages: {len(posts)}\n")
        f.write("=" * 80 + "\n\n")
        for post in posts:
            essay = post.get('essay') or {}
            author = essay.get('author', 'unknown')
            timestamp = format_timestamp(essay.get('sent', 0))
            message_text = parse_story_content(essay.get('content', []))
            f.write(f"{author} ({timestamp}): {message_text}\n")
            # Include replies
            seal = post.get('seal') or {}
            replies = seal.get('replies') or []
            for reply_tuple in replies:
                # Only dict entries carrying a dict 'memo' are understood here.
                if not (isinstance(reply_tuple, dict) and 'memo' in reply_tuple):
                    continue
                memo = reply_tuple['memo']
                if not isinstance(memo, dict):
                    continue
                reply_author = memo.get('author', 'unknown')
                reply_timestamp = format_timestamp(memo.get('sent', 0))
                reply_content = parse_story_content(memo.get('content', []))
                f.write(f" ββ {reply_author} ({reply_timestamp}): {reply_content}\n")
            f.write("\n")
    return len(posts)
def _safe_channel_name(channel_id: str) -> str:
    """Turn a channel id into a filename-friendly token ('/' and ' ' -> '_', '~' dropped)."""
    return channel_id.replace('/', '_').replace('~', '').replace(' ', '_')


def main():
    """Command-line entry point.

    Parses ``--ship-url``, ``--code``, ``--output-dir`` and ``--channel-id``,
    logs in, and exports either the one requested channel or every channel
    returned by ``get_all_channels``. In the all-channels case a
    ``_SUMMARY.txt`` file is written to the output directory. Exits with
    status 1 on Ctrl-C or on any unhandled error.
    """
    parser = argparse.ArgumentParser(
        description='Export complete message history from Tlon channels with pagination'
    )
    parser.add_argument('--ship-url', required=True,
                        help='Ship URL (e.g., http://localhost:8080 or https://ship.tlon.network)')
    parser.add_argument('--code', required=True,
                        help='Access code')
    parser.add_argument('--output-dir', default='full_export',
                        help='Output directory (default: full_export/)')
    parser.add_argument('--channel-id', default=None,
                        help='Export single channel (e.g., chat/~zod/general)')
    args = parser.parse_args()
    try:
        client = UrbitClient(args.ship_url, args.code)
        os.makedirs(args.output_dir, exist_ok=True)
        if args.channel_id:
            # Export single channel
            safe_name = _safe_channel_name(args.channel_id)
            output_file = os.path.join(args.output_dir, f"{safe_name}.txt")
            message_count = export_channel_to_file(client, args.channel_id, output_file)
            print(f"\nβ Export complete: {message_count} messages")
        else:
            # Export all channels
            print("\nπ Fetching channel list...")
            channels = client.get_all_channels()
            print(f"β Found {len(channels)} channels")
            total_messages = 0
            exported = []  # channels that actually produced a file
            skipped = []   # channels that were empty or raised during export
            for channel_id in channels:
                safe_name = _safe_channel_name(channel_id)
                output_file = os.path.join(args.output_dir, f"channel_{safe_name}.txt")
                try:
                    message_count = export_channel_to_file(client, channel_id, output_file)
                except Exception as e:
                    # One bad channel must not abort the whole export.
                    print(f" β Failed: {e}")
                    skipped.append(channel_id)
                    continue
                if message_count > 0:
                    total_messages += message_count
                    exported.append(channel_id)
                else:
                    skipped.append(channel_id)
            exported_count = len(exported)
            # Create summary
            summary_file = os.path.join(args.output_dir, "_SUMMARY.txt")
            with open(summary_file, 'w', encoding='utf-8') as f:
                f.write("Complete Channel Export Summary\n")
                f.write("=" * 80 + "\n\n")
                f.write(f"Ship URL: {args.ship_url}\n")
                f.write(f"Exported: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Total Channels: {exported_count}\n")
                f.write(f"Total Messages: {total_messages}\n\n")
                # List only what was really exported so this section agrees
                # with the "Total Channels" figure above.
                f.write("Exported channels:\n")
                for channel_id in sorted(exported):
                    f.write(f" - {channel_id}\n")
                if skipped:
                    f.write("\nChannels with no messages or errors:\n")
                    for channel_id in sorted(skipped):
                        f.write(f" - {channel_id}\n")
            print("\n" + "=" * 80)
            print("β Export complete!")
            print(f" Channels exported: {exported_count}")
            print(f" Total messages: {total_messages}")
            print(f" Output directory: {args.output_dir}")
    except KeyboardInterrupt:
        print("\n\nβ Export cancelled by user")
        sys.exit(1)
    except Exception as e:
        print(f"\nβ Error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment