@nperez
Created January 10, 2026 15:34
Magnetico Two-Stage Pornographic Torrent Filter
#!/usr/bin/env python3
"""
Magnetico Two-Stage Pornographic Torrent Filter
This script implements a two-stage classification system to identify and delete
pornographic torrents from a magnetico PostgreSQL database using local Ollama LLMs.
Installation:
pip install psycopg2-binary requests
Usage:
export DB_PASSWORD=your_password
python3 filter_torrents.py [options]
Dependencies:
- psycopg2-binary: PostgreSQL database adapter
- requests: HTTP library for Ollama API calls
"""
import sys
import os
import argparse
import copy
import time
import json
from typing import List, Tuple, Optional
import psycopg2
from psycopg2 import OperationalError
import requests
# =============================================================================
# PROMPT TEMPLATES - Edit these to adjust classification behavior
# =============================================================================
CLASSIFICATION_PROMPT = """Is each torrent title pornographic? (A title is pornographic only when it mentions XXX, well-known porn stars, sexual acts, or sexually explicit terms)
Below is a list of torrent titles to classify. REJECT if pornographic, ACCEPT otherwise. No commentary. ONLY RETURN CLASSIFICATIONS.
Example:
Titles:
1:Ferngully_1080.mkv
2:Ass-Blasters-XXX-720p.avi
Classification:
1:ACCEPT
2:REJECT
Titles:
{title_list}
Classification:
"""
VERIFICATION_PROMPT = """Is each torrent title pornographic? (A title is pornographic only when it mentions XXX, well-known porn stars, sexual acts, or sexually explicit terms)
Below is a list of torrent titles to review. REJECT if pornographic, ACCEPT otherwise. No commentary.
Example:
Titles:
1:Ferngully_1080.mkv
2:Ass-Blasters-XXX-720p.avi
Classification:
1:ACCEPT
2:REJECT
Titles:
{title_list}
Classification:
"""
# =============================================================================
# CONFIGURATION CONSTANTS
# =============================================================================
# Database defaults
DEFAULT_DB_HOST = "localhost"
DEFAULT_DB_PORT = 5432
DEFAULT_DB_NAME = "magnetico"
DEFAULT_DB_USER = "magnetico"
DEFAULT_DB_PASSWORD = "magnetico"
# Ollama defaults
DEFAULT_OLLAMA_URL = "http://localhost:11434"
DEFAULT_CLASSIFICATION_MODEL = "gemma3:4b-it-qat"
DEFAULT_VERIFICATION_MODEL = "qwen3:30b-a3b-instruct-2507-q4_K_M"
DEFAULT_CTX_WINDOW = 1024
# Processing defaults
DEFAULT_BATCH_SIZE = 100
DEFAULT_TITLES_PER_CALL = 5
DEFAULT_RATE_LIMIT = 0.0
# Resume state file
STATE_FILE = ".magnetico_filter_state"
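# The state file holds a single integer: the highest torrent id processed so far.
# A file containing, say, "184233" (an illustrative id) makes the next run resume
# with the query WHERE id > 184233.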
# =============================================================================
# ERROR CODES
# =============================================================================
ERR_DB_CONNECTION = 1
ERR_OLLAMA_CONNECTION = 1
ERR_INTERRUPTED = 130
# =============================================================================
# STATE MANAGEMENT
# =============================================================================
def read_offset_from_file() -> Optional[int]:
"""Read the last processed ID from state file."""
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE, 'r') as f:
offset = f.read().strip()
if offset:
return int(offset)
except (ValueError, IOError) as e:
print(f"Warning: Could not read state file: {e}", file=sys.stderr)
return None
def write_offset_to_file(offset: int) -> None:
"""Write the last processed ID to state file."""
try:
with open(STATE_FILE, 'w') as f:
f.write(str(offset))
print(f"Saved resume state: last processed ID = {offset}", file=sys.stderr)
except IOError as e:
print(f"Warning: Could not write state file: {e}", file=sys.stderr)
def cleanup_and_save_state(offset: int, cursor=None, conn=None) -> None:
"""Clean up database connections and save state."""
# Close database connections
if cursor:
cursor.close()
print("Database cursor closed", file=sys.stderr)
if conn:
conn.close()
print("Database connection closed", file=sys.stderr)
# Save state
if offset >= 0:
write_offset_to_file(offset)
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def parse_arguments() -> argparse.Namespace:
"""Parse command line arguments with environment variable fallbacks."""
parser = argparse.ArgumentParser(
description="Two-stage pornographic torrent filter for magnetico database",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
# Database connection arguments
db_group = parser.add_argument_group("Database Connection")
db_group.add_argument(
"--db-host",
default=os.getenv("DB_HOST", DEFAULT_DB_HOST),
help="PostgreSQL host (or env DB_HOST)"
)
db_group.add_argument(
"--db-port",
type=int,
default=int(os.getenv("DB_PORT", DEFAULT_DB_PORT)),
help="PostgreSQL port (or env DB_PORT)"
)
db_group.add_argument(
"--db-name",
default=os.getenv("DB_NAME", DEFAULT_DB_NAME),
help="PostgreSQL database name (or env DB_NAME)"
)
db_group.add_argument(
"--db-user",
default=os.getenv("DB_USER", DEFAULT_DB_USER),
help="PostgreSQL user (or env DB_USER)"
)
db_group.add_argument(
"--db-password",
default=os.getenv("DB_PASSWORD", DEFAULT_DB_PASSWORD),
help="PostgreSQL password (required, or env DB_PASSWORD)"
)
# Ollama configuration
ollama_group = parser.add_argument_group("Ollama Configuration")
ollama_group.add_argument(
"--ollama-url",
default=os.getenv("OLLAMA_URL", DEFAULT_OLLAMA_URL),
help="Ollama API base URL"
)
ollama_group.add_argument(
"--classification-model",
default=os.getenv("CLASSIFICATION_MODEL", DEFAULT_CLASSIFICATION_MODEL),
help="Model for initial classification"
)
ollama_group.add_argument(
"--verification-model",
default=os.getenv("VERIFICATION_MODEL", DEFAULT_VERIFICATION_MODEL),
help="Model for verification stage"
)
ollama_group.add_argument(
"--classification-ctx",
type=int,
default=int(os.getenv("CLASSIFICATION_CTX", DEFAULT_CTX_WINDOW)),
help="Context window size for classification model"
)
ollama_group.add_argument(
"--verification-ctx",
type=int,
default=int(os.getenv("VERIFICATION_CTX", DEFAULT_CTX_WINDOW)),
help="Context window size for verification model"
)
# Processing configuration
proc_group = parser.add_argument_group("Processing Configuration")
proc_group.add_argument(
"--batch-size",
type=int,
default=DEFAULT_BATCH_SIZE,
help="Number of torrents to fetch per database batch"
)
proc_group.add_argument(
"--titles-per-call",
type=int,
default=DEFAULT_TITLES_PER_CALL,
help="Number of titles per API call"
)
proc_group.add_argument(
"--rate-limit",
type=float,
default=DEFAULT_RATE_LIMIT,
help="Seconds to wait between API calls"
)
proc_group.add_argument(
"--max-batches",
type=int,
help="Optional limit on number of batches to process"
)
proc_group.add_argument(
"--dry-run",
action="store_true",
help="Perform classification but skip deletion"
)
proc_group.add_argument(
"--offset",
type=int,
help="Starting offset (ID) for resume. If not provided, reads from state file or starts from 0"
)
args = parser.parse_args()
# Validate required arguments
if not args.db_password:
parser.error("--db-password is required (or set DB_PASSWORD environment variable)")
# Determine offset
if args.offset is None:
# Try to read from state file
file_offset = read_offset_from_file()
if file_offset is not None:
args.offset = file_offset
print(f"Resuming from saved state: ID {args.offset}", file=sys.stderr)
else:
args.offset = 0
print("Starting from beginning (no state file found)", file=sys.stderr)
else:
print(f"Using manual offset: ID {args.offset}", file=sys.stderr)
return args
def connect_db(args: argparse.Namespace):
"""Establish PostgreSQL connection with error handling."""
try:
conn = psycopg2.connect(
host=args.db_host,
port=args.db_port,
dbname=args.db_name,
user=args.db_user,
password=args.db_password
)
return conn
except OperationalError as e:
print(f"Database connection failed: {e}", file=sys.stderr)
sys.exit(ERR_DB_CONNECTION)
def test_ollama_models(args: argparse.Namespace) -> None:
"""Test Ollama API connectivity and model availability."""
try:
response = requests.get(
f"{args.ollama_url}/api/tags",
timeout=10
)
response.raise_for_status()
models_data = response.json()
available_models = [m["name"] for m in models_data.get("models", [])]
required_models = [
args.classification_model,
args.verification_model
]
for model in required_models:
if model not in available_models:
print(
f"Required model '{model}' not found in Ollama. "
f"Run: ollama pull {model}",
file=sys.stderr
)
sys.exit(ERR_OLLAMA_CONNECTION)
print("Ollama API and required models verified", file=sys.stderr)
except requests.RequestException as e:
print(f"Ollama API connection failed: {e}", file=sys.stderr)
sys.exit(ERR_OLLAMA_CONNECTION)
def call_ollama_api(
args: argparse.Namespace,
prompt: str,
model: str,
ctx_window: int
) -> Optional[str]:
"""Call Ollama generate API with retry logic for invalid responses."""
url = f"{args.ollama_url}/api/generate"
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"options": {
"num_predict": ctx_window
}
}
try:
response = requests.post(
url,
json=payload,
timeout=60
)
response.raise_for_status()
response_data = response.json()
response_text = response_data.get("response", "")
print("+++ START PROMPT +++", file=sys.stderr)
print(prompt, file=sys.stderr)
print("+++ END PROMPT +++", file=sys.stderr)
print("")
print("--- RAW RESPONSE ---", file=sys.stderr)
print(response_text, file=sys.stderr)
print("--- END RESPONSE ---", file=sys.stderr)
print("=" * 60, file=sys.stderr)
return response_text
except requests.RequestException as e:
print(f"API call failed: {e}", file=sys.stderr)
return None
def parse_api_response(
response_text: str,
expected_count: int
) -> Optional[List[str]]:
"""
Parse and validate API response.
Returns:
List of classifications or None on validation failure
"""
lines = response_text.strip().split('\n')
    # Check that the number of returned lines matches the number of titles sent
if len(lines) != expected_count:
print(
f"Response line count mismatch: expected {expected_count}, got {len(lines)}",
file=sys.stderr
)
return None
    results = []
    for line in lines:
        # Each line should look like "2:REJECT"; bail out on anything malformed
        # instead of crashing, so the caller's retry loop can handle it
        parts = line.split(":", 1)
        if len(parts) != 2:
            print(f"Malformed response line: '{line}'", file=sys.stderr)
            return None
        classification = parts[1].strip().upper()
        # Check classification is valid
        if classification not in ["ACCEPT", "REJECT"]:
            print(f"Invalid classification value: '{classification}'", file=sys.stderr)
            return None
        results.append(classification)
return results
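# A well-formed response for a sub-batch of three titles looks like this (illustrative):
#
#   1:ACCEPT
#   2:REJECT
#   3:ACCEPT
#
# Anything else (extra commentary, a wrong line count, or an unknown label) makes
# parse_api_response return None, which triggers the retry loop in process_batch.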
def process_batch(
args: argparse.Namespace,
torrents: List[Tuple[int, str]],
prompt_template: str,
model: str,
ctx_window: int
) -> List[Tuple[int, str, str]]:
"""
Process a batch of torrents through the LLM.
Returns:
List of (id, title, classification) tuples
"""
results = []
# Split into sub-batches for API calls
for i in range(0, len(torrents), args.titles_per_call):
sub_batch = torrents[i:i + args.titles_per_call]
        title_list = '\n'.join(
            f"{idx}:{title}" for idx, (_, title) in enumerate(sub_batch, start=1)
        )
prompt = prompt_template.format(title_list=title_list)
print(
f" Processing API sub-batch of {len(sub_batch)} titles...",
file=sys.stderr
)
# Retry loop for invalid responses
max_retries = 3
for attempt in range(max_retries):
response_text = call_ollama_api(args, prompt, model, ctx_window)
if response_text is None:
print(f" API call failed, retrying... (attempt {attempt + 1})", file=sys.stderr)
time.sleep(args.rate_limit * 2)
continue
parsed = parse_api_response(response_text, len(sub_batch))
if parsed is None:
print(f" Invalid response format, retrying... (attempt {attempt + 1})", file=sys.stderr)
time.sleep(args.rate_limit * 2)
continue
# Map results back to torrent IDs
            for (torrent_id, title), classification in zip(sub_batch, parsed):
results.append((torrent_id, title, classification))
break
else:
print(f" Failed after {max_retries} attempts, skipping batch", file=sys.stderr)
# Add all as ACCEPT to avoid false positives
for torrent_id, title in sub_batch:
results.append((torrent_id, title, "ACCEPT"))
# Rate limiting
if i + args.titles_per_call < len(torrents):
time.sleep(args.rate_limit)
return results
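# process_batch yields one (id, title, classification) tuple per input torrent, e.g.
# (ids and titles below are illustrative):
#   [(101, "Some.Linux.Distro.iso", "ACCEPT"), (102, "XXX-Clip-720p.avi", "REJECT")]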
def main():
"""Main execution function."""
args = parse_arguments()
# Verify Ollama models before starting
print("Verifying Ollama models...", file=sys.stderr)
test_ollama_models(args)
# Connect to database
print("Connecting to database...", file=sys.stderr)
conn = connect_db(args)
cursor = conn.cursor()
# Statistics
total_processed = 0
phase1_rejects = 0
phase2_accepts = 0
confirmed_rejects = 0
interrupted = False
    last_processed_id = args.offset
    # Initialise batch_offset before the try block so the finally clause can
    # reference it even if processing stops before the first batch is fetched.
    batch_offset = args.offset
try:
# Get total count for progress tracking
cursor.execute("SELECT COUNT(*) FROM torrents")
total_torrents = cursor.fetchone()[0]
print(f"Total torrents in database: {total_torrents}", file=sys.stderr)
print("\n=== BEGINNING BATCH PROCESSING ===", file=sys.stderr)
print("Each batch will be classified, verified, and deleted in a single transaction", file=sys.stderr)
print("Ctrl+C will only lose progress on the current batch", file=sys.stderr)
print("=" * 60, file=sys.stderr)
batch_offset = args.offset
batch_num = 0
while True:
if args.max_batches and batch_num >= args.max_batches:
print(f"\nReached max batches limit ({args.max_batches})", file=sys.stderr)
break
# Fetch batch from database starting from offset
print(f"\n--- FETCHING BATCH {batch_num + 1} (starting from ID {batch_offset}) ---", file=sys.stderr)
cursor.execute(
"SELECT id, name FROM torrents WHERE id > %s ORDER BY id LIMIT %s",
(batch_offset, args.batch_size)
)
torrents = cursor.fetchall()
if not torrents:
print("\nNo more torrents to process", file=sys.stderr)
break
batch_num += 1
print(
f"\n=== PROCESSING BATCH {batch_num} ({len(torrents)} torrents) ===",
file=sys.stderr
)
# Phase 1: Classification for this batch
print("\nPhase 1: Classifying batch...", file=sys.stderr)
results = process_batch(
args,
torrents,
CLASSIFICATION_PROMPT,
args.classification_model,
args.classification_ctx
)
# Collect rejects for this batch
batch_rejects = []
batch_processed = 0
for torrent_id, title, classification in results:
batch_processed += 1
total_processed += 1
last_processed_id = max(last_processed_id, torrent_id)
if classification == "REJECT":
batch_rejects.append((torrent_id, title))
phase1_rejects += 1
print(f" Batch {batch_num}: Found {len(batch_rejects)} potential rejects", file=sys.stderr)
# Phase 2: Verification for this batch's rejects
batch_confirmed = 0
final_rejects = []
if batch_rejects:
print(f"\nPhase 2: Verifying {len(batch_rejects)} potential rejects from batch...", file=sys.stderr)
                # Verification runs on a copy of args with the sub-batch size doubled.
                deep_copy = copy.deepcopy(args)
                deep_copy.titles_per_call *= 2
results = process_batch(
deep_copy,
batch_rejects,
VERIFICATION_PROMPT,
args.verification_model,
args.verification_ctx
)
# Collect confirmed rejects
for torrent_id, title, classification in results:
last_processed_id = max(last_processed_id, torrent_id)
if classification == "REJECT":
final_rejects.append(torrent_id)
confirmed_rejects += 1
batch_confirmed += 1
else:
phase2_accepts += 1
print(f" Batch {batch_num}: Confirmed {batch_confirmed} rejects for deletion", file=sys.stderr)
# Deletion phase for this batch
if not args.dry_run and final_rejects:
print(f" Batch {batch_num}: Deleting {len(final_rejects)} torrents (cascade to files)...", file=sys.stderr)
# Delete in chunks to avoid overly large queries
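                    # NOTE: removal of associated file entries assumes the schema cascades
                    # deletes from torrents to the files table (as the "cascade to files"
                    # message above implies); confirm the foreign key uses ON DELETE CASCADE
                    # before running without --dry-run.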
chunk_size = 100
deleted_count = 0
for i in range(0, len(final_rejects), chunk_size):
chunk = final_rejects[i:i + chunk_size]
placeholders = ','.join(['%s'] * len(chunk))
cursor.execute(
f"DELETE FROM torrents WHERE id IN ({placeholders})",
chunk
)
conn.commit()
deleted_count += len(chunk)
print(f" Deleted {deleted_count}/{len(final_rejects)} from batch...", file=sys.stderr)
print(f" Batch {batch_num}: Successfully deleted all {deleted_count} torrents", file=sys.stderr)
elif args.dry_run and final_rejects:
print(f" Batch {batch_num}: Dry run - would delete {len(final_rejects)} torrents", file=sys.stderr)
# Update offset for next batch (use the highest ID from this batch)
if torrents:
batch_offset = max(t[0] for t in torrents)
# Log batch completion and save state
print(f"\n--- BATCH {batch_num} COMPLETE ---", file=sys.stderr)
print(f"Processed: {batch_processed} | Potential rejects: {len(batch_rejects)} | Confirmed: {batch_confirmed}", file=sys.stderr)
write_offset_to_file(batch_offset)
# Rate limiting between batches
if batch_processed >= args.batch_size:
print(f"Waiting {args.rate_limit} seconds before next batch...", file=sys.stderr)
time.sleep(args.rate_limit)
except KeyboardInterrupt:
interrupted = True
print("\n\n" + "=" * 60, file=sys.stderr)
print("OPERATION CANCELLED BY USER (Ctrl+C)", file=sys.stderr)
print("=" * 60, file=sys.stderr)
except psycopg2.Error as e:
print(f"\nDatabase error: {e}", file=sys.stderr)
sys.exit(ERR_DB_CONNECTION)
finally:
# Clean up database connections and save final state
cleanup_and_save_state(
last_processed_id if interrupted else batch_offset,
cursor if 'cursor' in locals() else None,
conn if 'conn' in locals() else None
)
# Always print final summary
print("\n" + "=" * 60, file=sys.stderr)
print("=== FINAL SUMMARY ===", file=sys.stderr)
print("=" * 60, file=sys.stderr)
print(f"Total torrents processed: {total_processed}", file=sys.stderr)
print(f"Phase 1 potential rejects: {phase1_rejects}", file=sys.stderr)
print(f"Phase 2 false positives: {phase2_accepts}", file=sys.stderr)
print(f"Final confirmed rejects: {confirmed_rejects}", file=sys.stderr)
print(f"Dry run: {args.dry_run}", file=sys.stderr)
print(f"Last processed ID: {last_processed_id if interrupted else batch_offset}", file=sys.stderr)
if interrupted:
print("Status: INTERRUPTED - Current batch was not completed", file=sys.stderr)
else:
print("Status: COMPLETED - All batches processed", file=sys.stderr)
# Print final deletion count to STDOUT
print(f"\n{confirmed_rejects}", file=sys.stdout)
if interrupted:
sys.exit(ERR_INTERRUPTED)
if __name__ == "__main__":
main()