Skip to content

Instantly share code, notes, and snippets.

@Somberor
Last active January 28, 2026 00:43
Show Gist options
  • Select an option

  • Save Somberor/0f79104089f156504a70e102efd49579 to your computer and use it in GitHub Desktop.

Select an option

Save Somberor/0f79104089f156504a70e102efd49579 to your computer and use it in GitHub Desktop.
NSFW Detection Server (OpenNSFW2) - One-liner deploy for RunPod/GPU
#!/bin/bash
# RunPod Full Setup - OCR Server (8765) + NSFW Server (8877)
# Usage: curl -sSL <gist_url>/raw | bash
# Saves to /workspace for persistence across restarts
set -e
cd /workspace
echo "============================================"
echo " RunPod Server Setup"
echo " OCR Server: port 8765"
echo " NSFW Server: port 8877"
echo "============================================"
echo ""
# Use /workspace/.local for pip to persist across restarts
export PYTHONUSERBASE=/workspace/.local
export PATH=$PYTHONUSERBASE/bin:$PATH
mkdir -p $PYTHONUSERBASE
echo "[1/4] Installing dependencies (cached in /workspace/.local)..."
pip install -q --user easyocr opennsfw2 tensorflow fastapi uvicorn python-multipart pillow
echo "[2/4] Creating OCR server..."
mkdir -p /workspace/ocr_server
cat > /workspace/ocr_server/server.py << 'OCREOF'
import io
import time
import asyncio
from typing import Optional
from concurrent.futures import ThreadPoolExecutor
import easyocr
import numpy as np
from PIL import Image
from fastapi import FastAPI, File, UploadFile, HTTPException
import uvicorn
app = FastAPI(title="GPU OCR Server", version="1.0.0")
ocr_reader: Optional[easyocr.Reader] = None
OCR_WORKERS = 50
ocr_executor: Optional[ThreadPoolExecutor] = None
@app.on_event("startup")
async def load_model():
global ocr_reader, ocr_executor
print("[OCR] Loading EasyOCR model...")
start = time.time()
ocr_reader = easyocr.Reader(['en'], gpu=True, verbose=False)
ocr_executor = ThreadPoolExecutor(max_workers=OCR_WORKERS)
print(f"[OCR] Model loaded in {time.time() - start:.1f}s")
@app.get("/health")
async def health_check():
return {"status": "ok", "gpu_loaded": ocr_reader is not None}
def _run_ocr_sync(img_np: np.ndarray):
return ocr_reader.readtext(img_np)
@app.post("/ocr")
async def process_ocr(file: UploadFile = File(...)):
if ocr_reader is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start_time = time.time()
try:
contents = await file.read()
image = Image.open(io.BytesIO(contents))
img_np = np.array(image)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid image: {e}")
read_time = time.time()
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(ocr_executor, _run_ocr_sync, img_np)
ocr_time = time.time()
formatted_results = []
for (bbox, text, confidence) in results:
x1, y1 = int(bbox[0][0]), int(bbox[0][1])
x2, y2 = int(bbox[2][0]), int(bbox[2][1])
formatted_results.append({
"text": text, "confidence": float(confidence),
"bbox": [[int(p[0]), int(p[1])] for p in bbox],
"center": [(x1 + x2) // 2, (y1 + y2) // 2]
})
return {
"results": formatted_results,
"timing": {"read_ms": int((read_time - start_time) * 1000), "ocr_ms": int((ocr_time - read_time) * 1000), "total_ms": int((time.time() - start_time) * 1000)},
"count": len(formatted_results)
}
@app.post("/ocr/find")
async def find_text(file: UploadFile = File(...), search: str = "", exact: bool = False):
if ocr_reader is None:
raise HTTPException(status_code=503, detail="Model not loaded")
if not search:
raise HTTPException(status_code=400, detail="search parameter required")
start_time = time.time()
try:
contents = await file.read()
image = Image.open(io.BytesIO(contents))
img_np = np.array(image)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid image: {e}")
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(ocr_executor, _run_ocr_sync, img_np)
search_lower = search.lower().strip()
found = None
for (bbox, text, confidence) in results:
text_lower = text.lower().strip()
match = (text_lower == search_lower) if exact else (search_lower in text_lower)
if match:
x1, y1 = int(bbox[0][0]), int(bbox[0][1])
x2, y2 = int(bbox[2][0]), int(bbox[2][1])
found = {"text": text, "confidence": float(confidence), "center": [(x1 + x2) // 2, (y1 + y2) // 2], "bbox": [[int(p[0]), int(p[1])] for p in bbox]}
break
return {"found": found, "search": search, "exact": exact, "timing_ms": int((time.time() - start_time) * 1000)}
if __name__ == "__main__":
print("[OCR Server] Starting on port 8765")
uvicorn.run(app, host="0.0.0.0", port=8765)
OCREOF
echo "[3/4] Creating NSFW server..."
cat > /workspace/nsfw_server.py << 'NSFWEOF'
import os
import time
import traceback
import asyncio
from typing import Dict, Any, List, Optional
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from pydantic import BaseModel
from concurrent.futures import ThreadPoolExecutor
import uuid
from datetime import datetime, timedelta
import threading
import gc
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
import tensorflow as tf
def setup_gpu():
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
try:
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
print(f"[NSFW] Found {len(gpus)} GPU(s)")
except RuntimeError as e:
print(f"[NSFW] GPU error: {e}")
else:
print("[NSFW] No GPUs found")
return gpus
gpus = setup_gpu()
try:
import opennsfw2 as n2
print("[NSFW] OpenNSFW2 loaded")
except ImportError as e:
print(f"[NSFW] OpenNSFW2 not installed: {e}")
n2 = None
class JobStatus:
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
class Job(BaseModel):
id: str
status: str = JobStatus.PENDING
created_at: datetime
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
class JobQueue:
def __init__(self, max_concurrent_jobs=64):
self.jobs: Dict[str, Job] = {}
self.queue: asyncio.Queue = asyncio.Queue()
self.max_concurrent = max_concurrent_jobs
self.active_jobs = 0
self.lock = threading.Lock()
self.max_job_age = timedelta(hours=24)
self.max_jobs_in_memory = 1000
self.last_cleanup = datetime.now()
async def add_job(self, job_id: str, image_path: str) -> Job:
await self.cleanup_old_jobs()
job = Job(id=job_id, created_at=datetime.now())
with self.lock:
if len(self.jobs) >= self.max_jobs_in_memory:
self._remove_oldest_completed_jobs(len(self.jobs) - self.max_jobs_in_memory + 1)
self.jobs[job_id] = job
await self.queue.put((job_id, image_path))
return job
def get_job(self, job_id: str) -> Optional[Job]:
return self.jobs.get(job_id)
def _remove_oldest_completed_jobs(self, count: int):
completed = [(jid, j) for jid, j in self.jobs.items() if j.status in [JobStatus.COMPLETED, JobStatus.FAILED]]
completed.sort(key=lambda x: x[1].completed_at or x[1].created_at)
for i in range(min(count, len(completed))):
del self.jobs[completed[i][0]]
async def cleanup_old_jobs(self):
now = datetime.now()
if (now - self.last_cleanup).seconds < 300:
return
with self.lock:
self.last_cleanup = now
cutoff = now - self.max_job_age
to_remove = [jid for jid, j in self.jobs.items()
if (j.status in [JobStatus.COMPLETED, JobStatus.FAILED] and j.completed_at and j.completed_at < cutoff)
or (j.status == JobStatus.PENDING and j.created_at < cutoff)]
for jid in to_remove:
del self.jobs[jid]
if to_remove:
gc.collect()
job_queue = JobQueue(max_concurrent_jobs=64)
executor = ThreadPoolExecutor(max_workers=64)
model_lock = threading.Lock()
app = FastAPI(title="NSFW Detection API", version="5.0.0")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
def predict_nsfw(image_path: str) -> float:
if not n2:
raise RuntimeError("OpenNSFW2 not available")
with model_lock:
return float(n2.predict_image(image_path))
async def job_processor():
loop = asyncio.get_event_loop()
while True:
try:
job_id, image_path = await job_queue.queue.get()
job = job_queue.get_job(job_id)
if job:
with job_queue.lock:
job.status = JobStatus.PROCESSING
job.started_at = datetime.now()
job_queue.active_jobs += 1
try:
prob = await loop.run_in_executor(executor, predict_nsfw, image_path)
job.status = JobStatus.COMPLETED
job.completed_at = datetime.now()
job.result = {"nsfw_probability": prob}
except Exception as e:
job.status = JobStatus.FAILED
job.error = str(e)
job.completed_at = datetime.now()
finally:
if os.path.exists(image_path):
os.remove(image_path)
with job_queue.lock:
job_queue.active_jobs -= 1
except Exception as e:
print(f"[NSFW] Error: {e}")
await asyncio.sleep(1)
@app.on_event("startup")
async def startup():
for _ in range(job_queue.max_concurrent):
asyncio.create_task(job_processor())
print(f"[NSFW Server] Ready on port 8877 | GPU: {len(gpus) > 0}")
@app.get("/")
async def root():
return {"status": "online", "model": "OpenNSFW2", "gpu": len(gpus) > 0,
"queue": {"active": job_queue.active_jobs, "pending": job_queue.queue.qsize()}}
@app.post("/testImageSync")
async def test_sync(file: UploadFile = File(...)):
if file.content_type and not file.content_type.startswith('image/'):
raise HTTPException(400, "File must be an image")
path = f"/tmp/nsfw_{uuid.uuid4()}"
try:
with open(path, 'wb') as f:
f.write(await file.read())
start = time.time()
prob = await asyncio.get_event_loop().run_in_executor(executor, predict_nsfw, path)
return {"nsfw_probability": prob, "processing_time": time.time() - start}
except Exception as e:
traceback.print_exc()
raise HTTPException(500, str(e))
finally:
if os.path.exists(path):
os.remove(path)
@app.post("/testImage")
async def test_async(file: UploadFile = File(...)):
if file.content_type and not file.content_type.startswith('image/'):
raise HTTPException(400, "File must be an image")
job_id = str(uuid.uuid4())
path = f"/tmp/nsfw_{job_id}"
try:
with open(path, 'wb') as f:
f.write(await file.read())
job = await job_queue.add_job(job_id, path)
return {"job_id": job_id, "status": job.status}
except Exception as e:
if os.path.exists(path):
os.remove(path)
raise HTTPException(500, str(e))
@app.get("/job/{job_id}")
async def get_job(job_id: str):
job = job_queue.get_job(job_id)
if not job:
raise HTTPException(404, "Job not found")
return job
if __name__ == "__main__":
print("[NSFW Server] Starting on port 8877")
uvicorn.run(app, host="0.0.0.0", port=8877)
NSFWEOF
echo "[4/4] Creating startup script..."
cat > /workspace/start_servers.sh << 'STARTEOF'
#!/bin/bash
# Start both servers - run this on each restart
# Add to /start.sh: bash /workspace/start_servers.sh
export PYTHONUSERBASE=/workspace/.local
export PATH=$PYTHONUSERBASE/bin:$PATH
cd /workspace
echo "Starting OCR server (port 8765)..."
nohup python /workspace/ocr_server/server.py > /workspace/ocr_server/server.log 2>&1 &
echo "OCR PID: $!"
echo "Starting NSFW server (port 8877)..."
nohup python /workspace/nsfw_server.py > /workspace/nsfw_server.log 2>&1 &
echo "NSFW PID: $!"
sleep 3
echo ""
echo "Servers started! Test with:"
echo " curl http://localhost:8765/health"
echo " curl http://localhost:8877/"
STARTEOF
chmod +x /workspace/start_servers.sh
echo ""
echo "============================================"
echo " Setup Complete!"
echo "============================================"
echo ""
echo "Files created:"
echo " /workspace/ocr_server/server.py (port 8765)"
echo " /workspace/nsfw_server.py (port 8877)"
echo " /workspace/start_servers.sh (startup script)"
echo ""
echo "Pip packages saved to /workspace/.local (persists across restarts)"
echo ""
echo "To auto-start on reboot, add to /start.sh:"
echo " bash /workspace/start_servers.sh"
echo ""
echo "Starting servers now..."
echo ""
bash /workspace/start_servers.sh
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment