Skip to content

Instantly share code, notes, and snippets.

@Somberor
Last active January 25, 2026 23:20
Show Gist options
  • Select an option

  • Save Somberor/7bc8ff399cb082e04a27103d4ac7c37a to your computer and use it in GitHub Desktop.

Select an option

Save Somberor/7bc8ff399cb082e04a27103d4ac7c37a to your computer and use it in GitHub Desktop.
GPU OCR Server - EasyOCR with concurrent processing (50 workers)
"""
GPU OCR Server - FastAPI + EasyOCR
Run on a GPU server (RTX 3090, 4090, etc.) to provide fast OCR processing.
Usage:
pip install fastapi uvicorn easyocr pillow python-multipart
python server.py
The server will listen on port 8765 by default.
"""
import io
import time
import asyncio
from typing import List, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import easyocr
import numpy as np
from PIL import Image
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
import uvicorn
# FastAPI application object; the route and startup decorators in this file attach to it.
app = FastAPI(title="GPU OCR Server", version="1.0.0")
# Global OCR reader. Stays None until the startup hook assigns it; the OCR
# endpoints answer 503 while it is still None.
ocr_reader: Optional[easyocr.Reader] = None
# Size of the thread pool used for running blocking OCR calls.
# Author's note: the default is deliberately high on the assumption that the
# GPU throttles naturally when saturated; reduce it only if you see OOM errors
# (considered unlikely with 20GB VRAM).
# When the file is run as a script, --workers overrides this value.
OCR_WORKERS = 50
# Executor that runs the blocking OCR calls; created by the startup hook.
ocr_executor: Optional[ThreadPoolExecutor] = None
@app.on_event("startup")
async def load_model():
    """Startup hook: create the shared EasyOCR reader and the OCR thread pool.

    Assigns the module-level ``ocr_reader`` and ``ocr_executor`` and prints
    how long initialisation took.
    """
    global ocr_reader, ocr_executor

    print("[OCR Server] Loading EasyOCR model with GPU...")
    began_at = time.time()

    # English-only reader with GPU enabled and EasyOCR's verbose output off.
    ocr_reader = easyocr.Reader(['en'], gpu=True, verbose=False)
    # Pool sized from OCR_WORKERS as it stands when startup runs.
    ocr_executor = ThreadPoolExecutor(max_workers=OCR_WORKERS)

    elapsed = time.time() - began_at
    print(f"[OCR Server] Model loaded in {elapsed:.1f}s (workers={OCR_WORKERS})")
@app.get("/health")
async def health_check():
    """Report service status.

    Returns a dict with a constant ``status`` of "ok", ``gpu_loaded`` (True
    once the module-level OCR reader has been created) and ``workers`` (the
    configured OCR worker count).
    """
    reader_ready = ocr_reader is not None
    return dict(status="ok", gpu_loaded=reader_ready, workers=OCR_WORKERS)
def _run_ocr_sync(img_np: np.ndarray):
    """Blocking OCR call, intended to be submitted to the thread pool.

    Passes ``img_np`` to the module-level reader's ``readtext`` and returns
    whatever it produces.
    """
    detections = ocr_reader.readtext(img_np)
    return detections
@app.post("/ocr")
async def process_ocr(file: UploadFile = File(...)):
    """
    Process an uploaded image and return every text region EasyOCR reports.

    Args:
        file: Uploaded image file; it must be decodable by Pillow.

    Returns:
        A dict with:
        - results: list of detected regions, each with
            - text: The detected text
            - confidence: Confidence score reported by EasyOCR, as a float
            - bbox: The region's corner points as [[x, y], ...] integers
            - center: [x, y] midpoint between the first and third corner
        - timing: read_ms, ocr_ms and total_ms durations in milliseconds
        - count: Number of regions in ``results``

    Raises:
        HTTPException: 503 if the OCR model is not loaded yet; 400 if the
            upload cannot be decoded as an image.
    """
    if ocr_reader is None:
        raise HTTPException(status_code=503, detail="OCR model not loaded")

    start_time = time.time()

    # Read and decode the upload.
    try:
        contents = await file.read()
        with Image.open(io.BytesIO(contents)) as image:
            # Palette ("P"), bilevel ("1"), "LA", CMYK, etc. do not map onto
            # the grayscale / 3-channel arrays the OCR reader is given for
            # ordinary images (e.g. a "P" image yields palette indices, not
            # intensities), so normalise every other mode to RGB first.
            if image.mode not in ("L", "RGB"):
                image = image.convert("RGB")
            img_np = np.array(image)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid image: {e}") from e

    read_time = time.time()

    # Run OCR in the thread pool so the blocking call does not stall the
    # event loop; this lets several OCR requests be in flight at once.
    loop = asyncio.get_running_loop()
    results = await loop.run_in_executor(ocr_executor, _run_ocr_sync, img_np)

    ocr_time = time.time()

    # Convert each (bbox, text, confidence) triple into JSON-friendly values.
    formatted_results = []
    for bbox, text, confidence in results:
        # Corners 0 and 2 are treated as opposite corners of the box.
        x1, y1 = int(bbox[0][0]), int(bbox[0][1])
        x2, y2 = int(bbox[2][0]), int(bbox[2][1])
        formatted_results.append({
            "text": text,
            "confidence": float(confidence),
            "bbox": [[int(p[0]), int(p[1])] for p in bbox],
            "center": [(x1 + x2) // 2, (y1 + y2) // 2],
        })

    total_time = time.time() - start_time

    return {
        "results": formatted_results,
        "timing": {
            "read_ms": int((read_time - start_time) * 1000),
            "ocr_ms": int((ocr_time - read_time) * 1000),
            "total_ms": int(total_time * 1000),
        },
        "count": len(formatted_results),
    }
@app.post("/ocr/find")
async def find_text(
    file: UploadFile = File(...),
    search: str = "",
    exact: bool = False
):
    """
    Find specific text in an image.

    Matching is case-insensitive and ignores leading/trailing whitespace on
    both the search term and the detected text. The first matching region,
    in the order the OCR reader returns them, wins.

    Args:
        file: Image file; it must be decodable by Pillow.
        search: Text to search for. Must contain at least one
            non-whitespace character.
        exact: If True, the whole detected text must equal ``search``;
            if False, ``search`` only has to occur inside it.

    Returns:
        A dict with ``found`` (the matching region's text, confidence, center
        and bbox, or None if nothing matched), the ``search`` and ``exact``
        values as received, and ``timing_ms``.

    Raises:
        HTTPException: 503 if the OCR model is not loaded yet; 400 if
            ``search`` is empty/blank or the upload is not a valid image.
    """
    if ocr_reader is None:
        raise HTTPException(status_code=503, detail="OCR model not loaded")

    # Validate the normalised term: a whitespace-only search would otherwise
    # strip down to "" and, as a substring, "match" the first region found.
    search_lower = search.lower().strip()
    if not search_lower:
        raise HTTPException(status_code=400, detail="search parameter required")

    start_time = time.time()

    # Read and decode the upload.
    try:
        contents = await file.read()
        with Image.open(io.BytesIO(contents)) as image:
            # Palette ("P"), bilevel ("1"), "LA", CMYK, etc. do not map onto
            # the grayscale / 3-channel arrays the OCR reader is given for
            # ordinary images, so normalise every other mode to RGB first.
            if image.mode not in ("L", "RGB"):
                image = image.convert("RGB")
            img_np = np.array(image)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid image: {e}") from e

    # Run OCR in the thread pool to avoid blocking the event loop.
    loop = asyncio.get_running_loop()
    results = await loop.run_in_executor(ocr_executor, _run_ocr_sync, img_np)

    # Scan the detections and stop at the first match.
    found = None
    for bbox, text, confidence in results:
        text_lower = text.lower().strip()
        if exact:
            match = text_lower == search_lower
        else:
            match = search_lower in text_lower
        if not match:
            continue

        # Corners 0 and 2 are treated as opposite corners of the box.
        x1, y1 = int(bbox[0][0]), int(bbox[0][1])
        x2, y2 = int(bbox[2][0]), int(bbox[2][1])
        found = {
            "text": text,
            "confidence": float(confidence),
            "center": [(x1 + x2) // 2, (y1 + y2) // 2],
            "bbox": [[int(p[0]), int(p[1])] for p in bbox],
        }
        break

    return {
        "found": found,
        "search": search,
        "exact": exact,
        "timing_ms": int((time.time() - start_time) * 1000),
    }
if __name__ == "__main__":
    # Script entry point: parse CLI options, apply the worker count and
    # serve the app with uvicorn.
    import argparse

    parser = argparse.ArgumentParser(description="GPU OCR Server")
    parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
    parser.add_argument("--port", type=int, default=8765, help="Port to bind to")
    # The default comes from the module constant so the two cannot drift apart.
    parser.add_argument(
        "--workers",
        type=int,
        default=OCR_WORKERS,
        help=f"Number of concurrent OCR workers (default: {OCR_WORKERS})",
    )
    args = parser.parse_args()

    # ThreadPoolExecutor rejects max_workers <= 0, which would otherwise only
    # surface as a failure during server startup; reject it here instead.
    if args.workers < 1:
        parser.error("--workers must be a positive integer")

    # Update worker count from command line (module-level assignment); the
    # startup hook reads this global when it builds the thread pool.
    OCR_WORKERS = args.workers

    print(f"[OCR Server] Starting on {args.host}:{args.port} with {OCR_WORKERS} workers")
    uvicorn.run(app, host=args.host, port=args.port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment