Magnetico Two-Stage Pornographic Torrent Filter
#!/usr/bin/env python3
"""
Magnetico Two-Stage Pornographic Torrent Filter

This script implements a two-stage classification system to identify and delete
pornographic torrents from a magnetico PostgreSQL database using local Ollama LLMs.

Installation:
    pip install psycopg2-binary requests

Usage:
    export DB_PASSWORD=your_password
    python3 filter_torrents.py [options]

Dependencies:
    - psycopg2-binary: PostgreSQL database adapter
    - requests: HTTP library for Ollama API calls
"""
import sys
import os
import argparse
import copy
import time
from typing import List, Tuple, Optional

import psycopg2
from psycopg2 import OperationalError

import requests
# =============================================================================
# PROMPT TEMPLATES - Edit these to adjust classification behavior
# =============================================================================
CLASSIFICATION_PROMPT = """Is each torrent title pornographic? (A title is pornographic only when it mentions XXX, well-known porn stars, sexual acts, or sexually explicit terms)
Below is a list of torrent titles to classify. REJECT if pornographic, ACCEPT otherwise. No commentary. ONLY RETURN CLASSIFICATIONS.
Example:
Titles:
1:Ferngully_1080.mkv
2:Ass-Blasters-XXX-720p.avi
Classification:
1:ACCEPT
2:REJECT
Titles:
{title_list}
Classification:
"""

VERIFICATION_PROMPT = """Is each torrent title pornographic? (A title is pornographic only when it mentions XXX, well-known porn stars, sexual acts, or sexually explicit terms)
Below is a list of torrent titles to review. REJECT if pornographic, ACCEPT otherwise. No commentary.
Example:
Titles:
1:Ferngully_1080.mkv
2:Ass-Blasters-XXX-720p.avi
Classification:
1:ACCEPT
2:REJECT
Titles:
{title_list}
Classification:
"""
# =============================================================================
# CONFIGURATION CONSTANTS
# =============================================================================
# Database defaults
DEFAULT_DB_HOST = "localhost"
DEFAULT_DB_PORT = 5432
DEFAULT_DB_NAME = "magnetico"
DEFAULT_DB_USER = "magnetico"
DEFAULT_DB_PASSWORD = "magnetico"

# Ollama defaults
DEFAULT_OLLAMA_URL = "http://localhost:11434"
DEFAULT_CLASSIFICATION_MODEL = "gemma3:4b-it-qat"
DEFAULT_VERIFICATION_MODEL = "qwen3:30b-a3b-instruct-2507-q4_K_M"
DEFAULT_CTX_WINDOW = 1024

# Processing defaults
DEFAULT_BATCH_SIZE = 100
DEFAULT_TITLES_PER_CALL = 5
DEFAULT_RATE_LIMIT = 0.0

# Resume state file
STATE_FILE = ".magnetico_filter_state"
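# The state file holds a single integer: the highest torrent ID already
# processed. Delete it to start over, or seed it by hand to resume from a
# known point (illustrative):
#
#   echo 12345 > .magnetico_filter_state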
# =============================================================================
# ERROR CODES
# =============================================================================
ERR_DB_CONNECTION = 1
ERR_OLLAMA_CONNECTION = 2
ERR_INTERRUPTED = 130
# =============================================================================
# STATE MANAGEMENT
# =============================================================================
def read_offset_from_file() -> Optional[int]:
    """Read the last processed ID from the state file."""
    if os.path.exists(STATE_FILE):
        try:
            with open(STATE_FILE, 'r') as f:
                offset = f.read().strip()
                if offset:
                    return int(offset)
        except (ValueError, IOError) as e:
            print(f"Warning: Could not read state file: {e}", file=sys.stderr)
    return None

def write_offset_to_file(offset: int) -> None:
    """Write the last processed ID to the state file."""
    try:
        with open(STATE_FILE, 'w') as f:
            f.write(str(offset))
        print(f"Saved resume state: last processed ID = {offset}", file=sys.stderr)
    except IOError as e:
        print(f"Warning: Could not write state file: {e}", file=sys.stderr)

def cleanup_and_save_state(offset: int, cursor=None, conn=None) -> None:
    """Clean up database connections and save state."""
    # Close database connections
    if cursor:
        cursor.close()
        print("Database cursor closed", file=sys.stderr)
    if conn:
        conn.close()
        print("Database connection closed", file=sys.stderr)
    # Save state
    if offset >= 0:
        write_offset_to_file(offset)
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def parse_arguments() -> argparse.Namespace:
    """Parse command line arguments with environment variable fallbacks."""
    parser = argparse.ArgumentParser(
        description="Two-stage pornographic torrent filter for magnetico database",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    # Database connection arguments
    db_group = parser.add_argument_group("Database Connection")
    db_group.add_argument(
        "--db-host",
        default=os.getenv("DB_HOST", DEFAULT_DB_HOST),
        help="PostgreSQL host (or env DB_HOST)"
    )
    db_group.add_argument(
        "--db-port",
        type=int,
        default=int(os.getenv("DB_PORT", DEFAULT_DB_PORT)),
        help="PostgreSQL port (or env DB_PORT)"
    )
    db_group.add_argument(
        "--db-name",
        default=os.getenv("DB_NAME", DEFAULT_DB_NAME),
        help="PostgreSQL database name (or env DB_NAME)"
    )
    db_group.add_argument(
        "--db-user",
        default=os.getenv("DB_USER", DEFAULT_DB_USER),
        help="PostgreSQL user (or env DB_USER)"
    )
    db_group.add_argument(
        "--db-password",
        default=os.getenv("DB_PASSWORD", DEFAULT_DB_PASSWORD),
        help="PostgreSQL password (or env DB_PASSWORD)"
    )

    # Ollama configuration
    ollama_group = parser.add_argument_group("Ollama Configuration")
    ollama_group.add_argument(
        "--ollama-url",
        default=os.getenv("OLLAMA_URL", DEFAULT_OLLAMA_URL),
        help="Ollama API base URL"
    )
    ollama_group.add_argument(
        "--classification-model",
        default=os.getenv("CLASSIFICATION_MODEL", DEFAULT_CLASSIFICATION_MODEL),
        help="Model for initial classification"
    )
    ollama_group.add_argument(
        "--verification-model",
        default=os.getenv("VERIFICATION_MODEL", DEFAULT_VERIFICATION_MODEL),
        help="Model for verification stage"
    )
    ollama_group.add_argument(
        "--classification-ctx",
        type=int,
        default=int(os.getenv("CLASSIFICATION_CTX", DEFAULT_CTX_WINDOW)),
        help="Context window size for classification model"
    )
    ollama_group.add_argument(
        "--verification-ctx",
        type=int,
        default=int(os.getenv("VERIFICATION_CTX", DEFAULT_CTX_WINDOW)),
        help="Context window size for verification model"
    )

    # Processing configuration
    proc_group = parser.add_argument_group("Processing Configuration")
    proc_group.add_argument(
        "--batch-size",
        type=int,
        default=DEFAULT_BATCH_SIZE,
        help="Number of torrents to fetch per database batch"
    )
    proc_group.add_argument(
        "--titles-per-call",
        type=int,
        default=DEFAULT_TITLES_PER_CALL,
        help="Number of titles per API call"
    )
    proc_group.add_argument(
        "--rate-limit",
        type=float,
        default=DEFAULT_RATE_LIMIT,
        help="Seconds to wait between API calls"
    )
    proc_group.add_argument(
        "--max-batches",
        type=int,
        help="Optional limit on number of batches to process"
    )
    proc_group.add_argument(
        "--dry-run",
        action="store_true",
        help="Perform classification but skip deletion"
    )
    proc_group.add_argument(
        "--offset",
        type=int,
        help="Starting offset (ID) for resume; if not provided, reads from the state file or starts from 0"
    )

    args = parser.parse_args()

    # Guard against an empty password (e.g. DB_PASSWORD set to "")
    if not args.db_password:
        parser.error("--db-password is required (or set DB_PASSWORD environment variable)")

    # Determine offset
    if args.offset is None:
        # Try to read from the state file
        file_offset = read_offset_from_file()
        if file_offset is not None:
            args.offset = file_offset
            print(f"Resuming from saved state: ID {args.offset}", file=sys.stderr)
        else:
            args.offset = 0
            print("Starting from beginning (no state file found)", file=sys.stderr)
    else:
        print(f"Using manual offset: ID {args.offset}", file=sys.stderr)

    return args
def connect_db(args: argparse.Namespace):
    """Establish PostgreSQL connection with error handling."""
    try:
        conn = psycopg2.connect(
            host=args.db_host,
            port=args.db_port,
            dbname=args.db_name,
            user=args.db_user,
            password=args.db_password
        )
        return conn
    except OperationalError as e:
        print(f"Database connection failed: {e}", file=sys.stderr)
        sys.exit(ERR_DB_CONNECTION)

def test_ollama_models(args: argparse.Namespace) -> None:
    """Test Ollama API connectivity and model availability."""
    try:
        response = requests.get(
            f"{args.ollama_url}/api/tags",
            timeout=10
        )
        response.raise_for_status()
        models_data = response.json()
        available_models = [m["name"] for m in models_data.get("models", [])]
        required_models = [
            args.classification_model,
            args.verification_model
        ]
        for model in required_models:
            if model not in available_models:
                print(
                    f"Required model '{model}' not found in Ollama. "
                    f"Run: ollama pull {model}",
                    file=sys.stderr
                )
                sys.exit(ERR_OLLAMA_CONNECTION)
        print("Ollama API and required models verified", file=sys.stderr)
    except requests.RequestException as e:
        print(f"Ollama API connection failed: {e}", file=sys.stderr)
        sys.exit(ERR_OLLAMA_CONNECTION)
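# For reference, GET /api/tags returns JSON shaped roughly like
# {"models": [{"name": "gemma3:4b-it-qat", ...}, ...]}; only each entry's
# "name" field is consulted above.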
def call_ollama_api(
    args: argparse.Namespace,
    prompt: str,
    model: str,
    ctx_window: int
) -> Optional[str]:
    """Call the Ollama generate API; retries for invalid responses are handled by the caller."""
    url = f"{args.ollama_url}/api/generate"
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {
            "num_ctx": ctx_window
        }
    }
    try:
        response = requests.post(
            url,
            json=payload,
            timeout=60
        )
        response.raise_for_status()
        response_data = response.json()
        response_text = response_data.get("response", "")
        print("+++ START PROMPT +++", file=sys.stderr)
        print(prompt, file=sys.stderr)
        print("+++ END PROMPT +++", file=sys.stderr)
        print("", file=sys.stderr)
        print("--- RAW RESPONSE ---", file=sys.stderr)
        print(response_text, file=sys.stderr)
        print("--- END RESPONSE ---", file=sys.stderr)
        print("=" * 60, file=sys.stderr)
        return response_text
    except requests.RequestException as e:
        print(f"API call failed: {e}", file=sys.stderr)
        return None
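# A successful non-streaming /api/generate reply is JSON shaped roughly like
# {"model": "...", "response": "1:ACCEPT\n2:REJECT", "done": true, ...};
# only the "response" field is consumed here.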
def parse_api_response(
    response_text: str,
    expected_count: int
) -> Optional[List[str]]:
    """
    Parse and validate API response.

    Returns:
        List of classifications, or None on validation failure
    """
    lines = [line for line in response_text.strip().split('\n') if line.strip()]
    # Check that the line count matches the number of titles sent
    if len(lines) != expected_count:
        print(
            f"Response line count mismatch: expected {expected_count}, got {len(lines)}",
            file=sys.stderr
        )
        return None
    results = []
    for line in lines:
        parts = line.split(":", 1)
        if len(parts) != 2:
            print(f"Malformed response line: '{line}'", file=sys.stderr)
            return None
        classification = parts[1].strip().upper()
        # Check classification is valid
        if classification not in ["ACCEPT", "REJECT"]:
            print(f"Invalid classification value: '{classification}'", file=sys.stderr)
            return None
        results.append(classification)
    return results
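# Illustrative round trip:
#   parse_api_response("1:ACCEPT\n2:REJECT", expected_count=2) -> ["ACCEPT", "REJECT"]
# A count mismatch, a malformed line, or a stray value such as "2:MAYBE"
# returns None, which triggers a retry in process_batch below.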
def process_batch(
    args: argparse.Namespace,
    torrents: List[Tuple[int, str]],
    prompt_template: str,
    model: str,
    ctx_window: int
) -> List[Tuple[int, str, str]]:
    """
    Process a batch of torrents through the LLM.

    Returns:
        List of (id, title, classification) tuples
    """
    results = []
    # Split into sub-batches for API calls
    for i in range(0, len(torrents), args.titles_per_call):
        sub_batch = torrents[i:i + args.titles_per_call]
        title_list = '\n'.join(
            f"{n}:{title}" for n, (_, title) in enumerate(sub_batch, start=1)
        )
        prompt = prompt_template.format(title_list=title_list)
        print(
            f"  Processing API sub-batch of {len(sub_batch)} titles...",
            file=sys.stderr
        )
        # Retry loop for invalid responses
        max_retries = 3
        for attempt in range(max_retries):
            response_text = call_ollama_api(args, prompt, model, ctx_window)
            if response_text is None:
                print(f"  API call failed, retrying... (attempt {attempt + 1})", file=sys.stderr)
                time.sleep(args.rate_limit * 2)
                continue
            parsed = parse_api_response(response_text, len(sub_batch))
            if parsed is None:
                print(f"  Invalid response format, retrying... (attempt {attempt + 1})", file=sys.stderr)
                time.sleep(args.rate_limit * 2)
                continue
            # Map results back to torrent IDs
            for (torrent_id, title), classification in zip(sub_batch, parsed):
                results.append((torrent_id, title, classification))
            break
        else:
            print(f"  Failed after {max_retries} attempts, skipping batch", file=sys.stderr)
            # Mark all as ACCEPT to avoid false-positive deletions
            for torrent_id, title in sub_batch:
                results.append((torrent_id, title, "ACCEPT"))
        # Rate limiting between sub-batches
        if i + args.titles_per_call < len(torrents):
            time.sleep(args.rate_limit)
    return results
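# Illustrative call and result shape:
#   process_batch(args, [(42, "Ferngully_1080.mkv")], CLASSIFICATION_PROMPT,
#                 args.classification_model, args.classification_ctx)
#   -> [(42, "Ferngully_1080.mkv", "ACCEPT")]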
def main():
    """Main execution function."""
    args = parse_arguments()

    # Verify Ollama models before starting
    print("Verifying Ollama models...", file=sys.stderr)
    test_ollama_models(args)

    # Connect to database
    print("Connecting to database...", file=sys.stderr)
    conn = connect_db(args)
    cursor = conn.cursor()

    # Statistics
    total_processed = 0
    phase1_rejects = 0
    phase2_accepts = 0
    confirmed_rejects = 0
    interrupted = False
    last_processed_id = args.offset
    # Initialized before the try block so the finally clause can always reference it
    batch_offset = args.offset

    try:
        # Get total count for progress tracking
        cursor.execute("SELECT COUNT(*) FROM torrents")
        total_torrents = cursor.fetchone()[0]
        print(f"Total torrents in database: {total_torrents}", file=sys.stderr)
        print("\n=== BEGINNING BATCH PROCESSING ===", file=sys.stderr)
        print("Each batch is classified, verified, and deleted before the next is fetched", file=sys.stderr)
        print("Ctrl+C will only lose progress on the current batch", file=sys.stderr)
        print("=" * 60, file=sys.stderr)

        batch_num = 0
        while True:
            if args.max_batches and batch_num >= args.max_batches:
                print(f"\nReached max batches limit ({args.max_batches})", file=sys.stderr)
                break

            # Fetch batch from database starting from offset
            print(f"\n--- FETCHING BATCH {batch_num + 1} (starting from ID {batch_offset}) ---", file=sys.stderr)
            cursor.execute(
                "SELECT id, name FROM torrents WHERE id > %s ORDER BY id LIMIT %s",
                (batch_offset, args.batch_size)
            )
            torrents = cursor.fetchall()
            if not torrents:
                print("\nNo more torrents to process", file=sys.stderr)
                break

            batch_num += 1
            print(
                f"\n=== PROCESSING BATCH {batch_num} ({len(torrents)} torrents) ===",
                file=sys.stderr
            )

            # Phase 1: Classification for this batch
            print("\nPhase 1: Classifying batch...", file=sys.stderr)
            results = process_batch(
                args,
                torrents,
                CLASSIFICATION_PROMPT,
                args.classification_model,
                args.classification_ctx
            )

            # Collect rejects for this batch
            batch_rejects = []
            batch_processed = 0
            for torrent_id, title, classification in results:
                batch_processed += 1
                total_processed += 1
                last_processed_id = max(last_processed_id, torrent_id)
                if classification == "REJECT":
                    batch_rejects.append((torrent_id, title))
                    phase1_rejects += 1
            print(f"  Batch {batch_num}: Found {len(batch_rejects)} potential rejects", file=sys.stderr)

            # Phase 2: Verification for this batch's rejects
            batch_confirmed = 0
            final_rejects = []
            if batch_rejects:
                print(f"\nPhase 2: Verifying {len(batch_rejects)} potential rejects from batch...", file=sys.stderr)
                # The verifier takes larger sub-batches, so copy args rather than mutating them
                verify_args = copy.deepcopy(args)
                verify_args.titles_per_call *= 2
                results = process_batch(
                    verify_args,
                    batch_rejects,
                    VERIFICATION_PROMPT,
                    args.verification_model,
                    args.verification_ctx
                )
                # Collect confirmed rejects
                for torrent_id, title, classification in results:
                    last_processed_id = max(last_processed_id, torrent_id)
                    if classification == "REJECT":
                        final_rejects.append(torrent_id)
                        confirmed_rejects += 1
                        batch_confirmed += 1
                    else:
                        phase2_accepts += 1
                print(f"  Batch {batch_num}: Confirmed {batch_confirmed} rejects for deletion", file=sys.stderr)

            # Deletion phase for this batch
            if not args.dry_run and final_rejects:
                print(f"  Batch {batch_num}: Deleting {len(final_rejects)} torrents (cascade to files)...", file=sys.stderr)
                # Delete in chunks to avoid overly large queries
                chunk_size = 100
                deleted_count = 0
                for i in range(0, len(final_rejects), chunk_size):
                    chunk = final_rejects[i:i + chunk_size]
                    placeholders = ','.join(['%s'] * len(chunk))
                    cursor.execute(
                        f"DELETE FROM torrents WHERE id IN ({placeholders})",
                        chunk
                    )
                    conn.commit()
                    deleted_count += len(chunk)
                    print(f"    Deleted {deleted_count}/{len(final_rejects)} from batch...", file=sys.stderr)
                print(f"  Batch {batch_num}: Successfully deleted all {deleted_count} torrents", file=sys.stderr)
            elif args.dry_run and final_rejects:
                print(f"  Batch {batch_num}: Dry run - would delete {len(final_rejects)} torrents", file=sys.stderr)

            # Update the offset for the next batch (highest ID in this batch)
            batch_offset = max(t[0] for t in torrents)

            # Log batch completion and save state
            print(f"\n--- BATCH {batch_num} COMPLETE ---", file=sys.stderr)
            print(f"Processed: {batch_processed} | Potential rejects: {len(batch_rejects)} | Confirmed: {batch_confirmed}", file=sys.stderr)
            write_offset_to_file(batch_offset)

            # Rate limiting between batches (a full batch implies more may remain)
            if batch_processed >= args.batch_size:
                print(f"Waiting {args.rate_limit} seconds before next batch...", file=sys.stderr)
                time.sleep(args.rate_limit)
    except KeyboardInterrupt:
        interrupted = True
        print("\n\n" + "=" * 60, file=sys.stderr)
        print("OPERATION CANCELLED BY USER (Ctrl+C)", file=sys.stderr)
        print("=" * 60, file=sys.stderr)
    except psycopg2.Error as e:
        print(f"\nDatabase error: {e}", file=sys.stderr)
        sys.exit(ERR_DB_CONNECTION)
    finally:
        # Clean up database connections and save final state
        cleanup_and_save_state(
            last_processed_id if interrupted else batch_offset,
            cursor if 'cursor' in locals() else None,
            conn if 'conn' in locals() else None
        )

        # Always print final summary
        print("\n" + "=" * 60, file=sys.stderr)
        print("=== FINAL SUMMARY ===", file=sys.stderr)
        print("=" * 60, file=sys.stderr)
        print(f"Total torrents processed: {total_processed}", file=sys.stderr)
        print(f"Phase 1 potential rejects: {phase1_rejects}", file=sys.stderr)
        print(f"Phase 2 false positives: {phase2_accepts}", file=sys.stderr)
        print(f"Final confirmed rejects: {confirmed_rejects}", file=sys.stderr)
        print(f"Dry run: {args.dry_run}", file=sys.stderr)
        print(f"Last processed ID: {last_processed_id if interrupted else batch_offset}", file=sys.stderr)
        if interrupted:
            print("Status: INTERRUPTED - Current batch was not completed", file=sys.stderr)
        else:
            print("Status: COMPLETED - All batches processed", file=sys.stderr)

        # Print final deletion count to STDOUT
        print(f"\n{confirmed_rejects}", file=sys.stdout)

    if interrupted:
        sys.exit(ERR_INTERRUPTED)

if __name__ == "__main__":
    main()