Skip to content

Instantly share code, notes, and snippets.

@shawngraham
Last active February 24, 2026 19:10
Show Gist options
  • Select an option

  • Save shawngraham/623b74639b4aece14a4a02c8ec8e91fd to your computer and use it in GitHub Desktop.

Select an option

Save shawngraham/623b74639b4aece14a4a02c8ec8e91fd to your computer and use it in GitHub Desktop.
Use ocr_keyword_search.py when you already have a folder of OCR'd text and a file of keywords; use groq-ocr.py when you still need to produce that text in the first place.
"""
groq_ocr.py
Processes newspaper images using Groq's vision API and extracts individual
articles to a CSV. Each row represents one article with associated page metadata.
Usage:
python newspaper_ocr.py --input_dir processed_output/images --output ocr_results.csv
Requirements:
pip install groq
"""
import argparse
import base64
import csv
import json
import os
import time
from pathlib import Path
from groq import Groq
# ── Configuration ──────────────────────────────────────────────────────────────
# Vision-capable Groq model used to analyse each page image.
GROQ_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct"
# Image file extensions picked up from --input_dir (compared lower-cased).
SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png"}
SLEEP_BETWEEN_REQUESTS = 1  # seconds – be kind to the API
# ── Prompts ────────────────────────────────────────────────────────────────────
# Asks the model for bare JSON: page-level metadata plus one entry per article.
# call_groq() still strips stray markdown fences from the reply defensively.
EXTRACTION_PROMPT = """
You are a newspaper digitisation assistant. Analyse this newspaper page image and
extract every distinct article you can see.
Return ONLY a JSON object with the following structure — no markdown, no extra text:
{
"page_metadata": {
"publication_name": "...",
"publication_date": "YYYY-MM-DD or best guess, empty string if unknown",
"edition": "e.g. Morning, Evening, or empty string",
"page_number": "integer or empty string",
"section": "e.g. News, Sport, Business, or empty string",
"language": "ISO 639-1 code, e.g. en"
},
"articles": [
{
"headline": "Article headline or title",
"byline": "Author name(s) or empty string",
"column": "Column position on page, e.g. 1-3 or empty string",
"body_text": "Full article text, preserving paragraphs with \\n",
"article_type": "news | opinion | advertisement | obituary | letter | other",
"notes": "Any digitisation caveats (partial text, poor scan, etc.) or empty string"
}
]
}
If no articles are found return {"page_metadata": {}, "articles": []}.
""".strip()
# ── Helpers ────────────────────────────────────────────────────────────────────
def encode_image(image_path: Path) -> tuple[str, str]:
    """Read an image file and return (base64_string, mime_type).

    JPEG extensions map to image/jpeg; everything else is reported as
    image/png (only .png reaches here given SUPPORTED_EXTENSIONS).
    """
    ext = image_path.suffix.lower()
    mime_type = "image/jpeg" if ext in (".jpg", ".jpeg") else "image/png"
    raw = image_path.read_bytes()
    return base64.b64encode(raw).decode("utf-8"), mime_type
def call_groq(client: Groq, image_path: Path) -> dict:
"""Send one image to Groq and return the parsed JSON response."""
b64, mime = encode_image(image_path)
response = client.chat.completions.create(
model=GROQ_MODEL,
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": EXTRACTION_PROMPT},
{
"type": "image_url",
"image_url": {"url": f"data:{mime};base64,{b64}"},
},
],
}
],
temperature=0.0, # deterministic for structured extraction
)
raw = response.choices[0].message.content.strip()
# Strip accidental markdown fences
if raw.startswith("```"):
raw = raw.split("```")[1]
if raw.startswith("json"):
raw = raw[4:]
raw = raw.strip()
return json.loads(raw)
def flatten_to_rows(image_path: Path, parsed: dict) -> list[dict]:
    """Flatten one page's nested JSON into CSV-ready dicts.

    Emits one row per article with the page metadata repeated on every
    row. A page with no extracted articles still yields a single
    placeholder row so it is recorded in the output.
    """
    meta = parsed.get("page_metadata", {})
    page_fields = {"source_file": image_path.name}
    for field in ("publication_name", "publication_date", "edition",
                  "page_number", "section", "language"):
        page_fields[field] = meta.get(field, "")

    articles = parsed.get("articles", [])
    article_keys = ("headline", "byline", "column",
                    "body_text", "article_type", "notes")

    if not articles:
        # Record the page even when extraction produced nothing.
        placeholder = dict.fromkeys(article_keys, "")
        placeholder["notes"] = "no articles extracted"
        return [{**page_fields, **placeholder}]

    return [
        {**page_fields, **{key: art.get(key, "") for key in article_keys}}
        for art in articles
    ]
# Column order for the output CSV: page-level metadata first, then the
# article-level fields produced by flatten_to_rows().
CSV_FIELDNAMES = [
    "source_file", "publication_name", "publication_date", "edition",
    "page_number", "section", "language",
    "headline", "byline", "column", "body_text", "article_type", "notes",
]
# ── Main ───────────────────────────────────────────────────────────────────────
def _error_row(img_path: Path, note: str) -> dict:
    """Build a CSV row recording a failed page: empty fields plus a note."""
    row = {key: "" for key in CSV_FIELDNAMES}
    row["source_file"] = img_path.name
    row["notes"] = note
    return row


def main():
    """CLI entry point: OCR every image in --input_dir into one CSV.

    Processes images one at a time, flushing after each page so a crash
    partway through still leaves the completed pages on disk. Failed
    pages are recorded as error rows rather than aborting the run.
    """
    parser = argparse.ArgumentParser(description="OCR newspaper images → CSV via Groq")
    parser.add_argument(
        "--input_dir", default="processed_output/images",
        help="Directory containing .jpg / .jpeg / .png newspaper scans",
    )
    parser.add_argument(
        "--output", default="ocr_results.csv",
        help="Path for the output CSV file",
    )
    parser.add_argument(
        "--api_key", default=None,
        help="Groq API key (falls back to GROQ_API_KEY env var)",
    )
    args = parser.parse_args()

    api_key = args.api_key or os.environ.get("GROQ_API_KEY")
    if not api_key:
        raise ValueError(
            "Groq API key not found. Set GROQ_API_KEY env var or pass --api_key."
        )

    input_dir = Path(args.input_dir)
    if not input_dir.is_dir():
        raise FileNotFoundError(f"Image directory not found: {input_dir}")

    images = sorted(
        p for p in input_dir.iterdir()
        if p.suffix.lower() in SUPPORTED_EXTENSIONS
    )
    if not images:
        print(f"No supported image files found in {input_dir}")
        return

    client = Groq(api_key=api_key)
    output_path = Path(args.output)
    total_articles = 0

    with open(output_path, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=CSV_FIELDNAMES)
        writer.writeheader()
        for i, img_path in enumerate(images, 1):
            print(f"[{i}/{len(images)}] Processing {img_path.name} …", end=" ", flush=True)
            try:
                parsed = call_groq(client, img_path)
                rows = flatten_to_rows(img_path, parsed)
                writer.writerows(rows)
                csvfile.flush()  # write incrementally in case of crash
                # Count real articles, not CSV rows: a page with no articles
                # still emits one placeholder row which must not be counted.
                n_articles = len(parsed.get("articles", []) or [])
                total_articles += n_articles
                print(f"→ {n_articles} article(s) extracted")
            except json.JSONDecodeError as e:
                # Model returned something unparseable; record and continue.
                print(f"⚠ JSON parse error: {e}")
                writer.writerow(_error_row(img_path, f"JSON parse error: {e}"))
            except Exception as e:
                # Any other failure (network, API) — record and continue.
                print(f"⚠ Error: {e}")
                writer.writerow(_error_row(img_path, f"Error: {e}"))
            if i < len(images):
                time.sleep(SLEEP_BETWEEN_REQUESTS)

    print(f"\nDone. {total_articles} articles written to {output_path}")
if __name__ == "__main__":
main()
"""
ocr_keyword_search.py
─────────────────────
Fuzzy keyword search across OCR'd text files.
Usage:
python ocr_keyword_search.py [options]
Options:
--text-dir DIR Directory containing .txt files (default: ./texts)
--keywords FILE Keyword list, one per line (default: keywords.txt)
--output FILE Plain-text results map (default: output.txt)
--csv FILE CSV results (sortable) (default: output.csv)
--threshold 0-100 Minimum fuzzy match score (default: 75)
--context N Words of context around hit (default: 12)
--encoding ENC Text file encoding (default: utf-8-sig)
--debug Print diagnostics for first file and exit
Requirements:
None beyond the standard library.
"""
import argparse
import csv
import difflib
import re
import sys
import textwrap
from pathlib import Path
from datetime import datetime
# Name of the fuzzy-matching implementation, reported in the output headers.
FUZZY_BACKEND = "difflib"
def fuzzy_ratio(a: str, b: str) -> float:
    """Whole-string similarity of *a* vs *b* as a 0-100 score."""
    matcher = difflib.SequenceMatcher(None, a, b)
    return 100 * matcher.ratio()
def fuzzy_partial(a: str, b: str) -> float:
    """Slide `a` across `b` and return the best window score (0-100).

    Compares `a` against every len(a)-sized substring of `b`; falls back
    to a plain whole-string ratio when `a` is longer than `b`.
    """
    len_a, len_b = len(a), len(b)
    if not (len_a and len_b):
        return 0.0
    if len_a > len_b:
        return fuzzy_ratio(a, b)
    return max(
        difflib.SequenceMatcher(None, a, b[start:start + len_a]).ratio() * 100
        for start in range(len_b - len_a + 1)
    )
# ── OCR normalisation ─────────────────────────────────────────────────────────
# Characters OCR commonly mis-reads, mapped to their most likely letter.
_CHAR_CONFUSIONS = {
    "0": "o", "1": "l", "|": "l", "!": "i", "@": "a",
    "$": "s", "5": "s", "8": "b", "6": "g",
}
OCR_FIXES = str.maketrans(_CHAR_CONFUSIONS)
_RN_RE = re.compile(r"rn")


def ocr_normalise(text: str) -> str:
    """Lower-case *text* and collapse common OCR confusions.

    Single-character substitutions (e.g. '0'→'o', '1'→'l') are applied
    via str.translate, then the classic 'rn'→'m' mis-read is collapsed.
    """
    lowered = text.lower()
    fixed = lowered.translate(OCR_FIXES)
    return _RN_RE.sub("m", fixed)
# ── Helpers ───────────────────────────────────────────────────────────────────
def load_keywords(path: Path, encoding: str = "utf-8-sig") -> list[str]:
    """Load one keyword (or phrase) per line from *path*.

    Blank lines and lines starting with ``#`` (comments) are skipped.
    The default ``utf-8-sig`` encoding silently strips a Windows BOM
    (\\ufeff); *encoding* is now a parameter so callers honouring the
    script's --encoding option can pass it through.

    Exits the program with an error if no keywords remain — an empty
    keyword list would make every search silently useless.
    """
    keywords: list[str] = []
    with open(path, encoding=encoding) as fh:
        for line in fh:
            kw = line.strip()
            if kw and not kw.startswith("#"):
                keywords.append(kw)
    if not keywords:
        sys.exit("ERROR: keyword file is empty.")
    return keywords
def tokenise(text: str) -> list[tuple[int, str]]:
    """Return (start_offset, word) for every \\w+ run in *text*."""
    matches = re.finditer(r"\w+", text)
    return [(match.start(), match.group()) for match in matches]
def get_context(text: str, char_pos: int, n_words: int = 12) -> str:
    """Return roughly *n_words* of context on each side of *char_pos*.

    Walks the whitespace-delimited words of *text* until one starts at
    or after *char_pos*, then joins the surrounding window, prefixing or
    suffixing '...' when the window is clipped.
    """
    words = text.split()
    # Find the first whitespace word starting at or after char_pos.
    word_idx = 0
    search_from = 0
    for idx, word in enumerate(words):
        search_from = text.find(word, search_from)
        if search_from >= char_pos:
            word_idx = idx
            break
        search_from += len(word)
    lo = max(0, word_idx - n_words)
    hi = min(len(words), word_idx + n_words + 1)
    prefix = "..." if lo > 0 else ""
    suffix = "..." if hi < len(words) else ""
    return prefix + " ".join(words[lo:hi]) + suffix
def score_match(keyword: str, token: str) -> float:
    """Fuzzy similarity (0-100) of *keyword* vs *token* after OCR normalisation.

    Two cheap pre-filters avoid expensive SequenceMatcher calls: pairs
    whose lengths differ by more than 2x, or that share no characters at
    all, score 0 immediately. Multi-word keywords use the sliding
    partial score only; single words take the best of whole-string and
    partial scores.
    """
    kw_len = len(keyword)
    tok_len = len(token)
    if not (kw_len and tok_len):
        return 0.0
    # Pre-filter 1: reject wildly different lengths.
    ratio = kw_len / tok_len
    if not (0.5 <= ratio <= 2.0):
        return 0.0
    kw_norm = ocr_normalise(keyword)
    tok_norm = ocr_normalise(token)
    # Pre-filter 2: reject pairs with zero character overlap.
    if set(kw_norm).isdisjoint(tok_norm):
        return 0.0
    if " " in keyword:
        return fuzzy_partial(kw_norm, tok_norm)
    return max(fuzzy_ratio(kw_norm, tok_norm),
               fuzzy_partial(kw_norm, tok_norm))
def search_file(text: str, keywords: list[str], threshold: float,
                context_words: int) -> list[dict]:
    """Return every fuzzy keyword hit in *text*, highest score first.

    Single-word keywords are scored against individual tokens; multi-word
    keywords against a sliding window of the same word count. Duplicate
    (keyword, start offset) pairs are reported only once.
    """
    tokens = tokenise(text)
    hits: list[dict] = []
    seen: set[tuple[str, int]] = set()

    def record(keyword: str, position: int, matched: str, score: float) -> None:
        # De-duplicate on (keyword, start offset) before recording a hit.
        if (keyword, position) in seen:
            return
        seen.add((keyword, position))
        hits.append({
            "keyword": keyword,
            "score": round(score, 1),
            "char_pos": position,
            "matched_text": matched,
            "context": get_context(text, position, context_words),
        })

    for kw in keywords:
        n_words = len(kw.split())
        if n_words == 1:
            for position, token in tokens:
                score = score_match(kw, token)
                if score >= threshold:
                    record(kw, position, token, score)
        else:
            # Slide an n_words-wide window across the token stream.
            for start in range(len(tokens) - n_words + 1):
                phrase = " ".join(word for _, word in tokens[start:start + n_words])
                score = score_match(kw, phrase)
                if score >= threshold:
                    record(kw, tokens[start][0], phrase, score)

    hits.sort(key=lambda hit: hit["score"], reverse=True)
    return hits
def run_diagnostics(txt_files: list[Path], keywords: list[str],
                    encoding: str, threshold: float) -> None:
    """
    Print detailed debug info then exit. Run with --debug to activate.
    Inspects the first text file only.

    Shows the loaded keywords (raw and normalised), the first file's size
    and leading raw bytes, its token count, and the top-5 fuzzy scores per
    keyword so the user can see why a keyword does or does not clear the
    threshold. Always terminates via sys.exit.
    """
    print("=" * 60)
    print("DEBUG MODE")
    print("=" * 60)
    # Show exactly what the keyword file produced — repr() reveals
    # invisible characters like BOM (\ufeff) or stray \r
    print(f"\nKeywords loaded ({len(keywords)}):")
    for i, kw in enumerate(keywords):
        print(f" [{i}] repr={repr(kw)} normalised={repr(ocr_normalise(kw))}")
    if not txt_files:
        print("\nNo text files found.")
        sys.exit(0)
    first = txt_files[0]
    print(f"\nFirst file : {first.name}")
    try:
        # Read raw bytes first so encoding problems are visible in the dump.
        raw_bytes = first.read_bytes()
        print(f" Size : {len(raw_bytes)} bytes")
        print(f" First 80B : {repr(raw_bytes[:80])}")
        # errors="replace" so a wrong --encoding still yields inspectable text
        text = raw_bytes.decode(encoding, errors="replace")
    except Exception as e:
        print(f" ERROR reading file: {e}")
        sys.exit(1)
    tokens = tokenise(text)
    print(f" Tokens : {len(tokens)} total")
    print(f" First 20 : {[t for _, t in tokens[:20]]}")
    print(f"\nBest scores per keyword (threshold={threshold}):")
    for kw in keywords:
        # Score every token against this keyword and show the five best.
        scores = [(tok, score_match(kw, tok)) for _, tok in tokens]
        scores.sort(key=lambda x: x[1], reverse=True)
        top = scores[:5]
        hit = "HIT" if top[0][1] >= threshold else "MISS"
        print(f" kw={repr(kw):30s} [{hit}] top: "
              + ", ".join(f"{t!r}={s:.1f}" for t, s in top))
    print("\nDone. Remove --debug to run normally.")
    sys.exit(0)
# ── Main ──────────────────────────────────────────────────────────────────────
def parse_args():
    """Build the command-line interface and parse sys.argv."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--text-dir", default="texts", metavar="DIR")
    parser.add_argument("--keywords", default="keywords.txt", metavar="FILE")
    parser.add_argument("--output", default="output.txt", metavar="FILE")
    parser.add_argument("--csv", default="output.csv", metavar="FILE")
    parser.add_argument("--threshold", default=75, type=float, metavar="0-100")
    parser.add_argument("--context", default=12, type=int, metavar="N")
    parser.add_argument("--encoding", default="utf-8-sig", metavar="ENC")
    parser.add_argument("--debug", action="store_true",
                        help="Print diagnostics for first file and exit")
    return parser.parse_args()
def main():
    """CLI entry point: fuzzy-search every .txt file, write TXT + CSV reports.

    Validates inputs, loads keywords, searches each file, then emits a
    human-readable plain-text map, a sortable CSV, and a console summary.
    """
    args = parse_args()
    text_dir = Path(args.text_dir)
    kw_file = Path(args.keywords)
    out_txt = Path(args.output)
    out_csv = Path(args.csv)
    if not text_dir.is_dir():
        sys.exit(f"ERROR: text directory not found: {text_dir}")
    if not kw_file.is_file():
        sys.exit(f"ERROR: keyword file not found: {kw_file}")
    txt_files = sorted(text_dir.glob("*.txt"))
    if not txt_files:
        sys.exit(f"ERROR: no .txt files found in {text_dir}")
    keywords = load_keywords(kw_file)
    # ── Debug mode ────────────────────────────────────────────────────────────
    if args.debug:
        # Prints diagnostics for the first file and exits the process.
        run_diagnostics(txt_files, keywords, args.encoding, args.threshold)
    print(f"Fuzzy backend : {FUZZY_BACKEND}")
    print(f"Keywords : {len(keywords)} -> {keywords}")
    print(f"Text files : {len(txt_files)}")
    print(f"Threshold : {args.threshold}")
    print(f"Context words : {args.context}")
    print(f"Encoding : {args.encoding}")
    print()
    all_results: list[dict] = []   # one dict per individual hit, across all files
    file_hit_summary: dict = {}    # filename -> set of keywords that matched in it
    for txt_path in txt_files:
        print(f" Searching {txt_path.name} ...", end=" ", flush=True)
        try:
            # errors="replace" keeps a badly-encoded file searchable.
            text = txt_path.read_text(encoding=args.encoding, errors="replace")
        except Exception as e:
            print(f"[SKIP] {e}")
            continue
        hits = search_file(text, keywords, args.threshold, args.context)
        if hits:
            kws_found = {h["keyword"] for h in hits}
            file_hit_summary[txt_path.name] = kws_found
            for h in hits:
                all_results.append({
                    "file": txt_path.name,
                    "keyword": h["keyword"],
                    "score": h["score"],
                    "char_pos": h["char_pos"],
                    "matched_text": h["matched_text"],
                    "context": h["context"],
                    # flags hits in files where more than one keyword matched
                    "multi_hit": len(kws_found) > 1,
                })
            flag = " *" if len(kws_found) > 1 else ""
            print(f"{len(hits):3d} hit(s) keywords: {', '.join(sorted(kws_found))}{flag}")
        else:
            print("no hits")
    # ── Plain-text output ─────────────────────────────────────────────────────
    run_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    divider = "-" * 80
    files_with_hits = sorted(set(r["file"] for r in all_results))
    # Stable re-sort: multi-keyword files first, then alphabetical.
    files_with_hits.sort(
        key=lambda f: (not len(file_hit_summary.get(f, set())) > 1, f)
    )
    with open(out_txt, "w", encoding="utf-8") as fh:
        fh.write("OCR Keyword Search Results\n")
        fh.write(f"Generated : {run_time}\n")
        fh.write(f"Keywords : {', '.join(keywords)}\n")
        fh.write(f"Threshold : {args.threshold}\n")
        fh.write(f"Backend : {FUZZY_BACKEND}\n")
        fh.write(divider + "\n\n")
        for fname in files_with_hits:
            file_hits = [r for r in all_results if r["file"] == fname]
            kws = file_hit_summary.get(fname, set())
            multi = " * MULTIPLE KEYWORDS" if len(kws) > 1 else ""
            fh.write(f"FILE: {fname}{multi}\n")
            fh.write(f" Keywords matched: {', '.join(sorted(kws))}\n")
            for h in file_hits:
                fh.write(f" [{h['score']:5.1f}%] kw='{h['keyword']}' "
                         f"pos={h['char_pos']} matched='{h['matched_text']}'\n")
                # Wrap the context snippet for readability in the report.
                context_wrapped = textwrap.fill(
                    h["context"], width=72,
                    initial_indent=" ",
                    subsequent_indent=" "
                )
                fh.write(context_wrapped + "\n\n")
            fh.write(divider + "\n\n")
        multi_files = [f for f, kws in file_hit_summary.items() if len(kws) > 1]
        fh.write("SUMMARY\n")
        fh.write(f" Files searched : {len(txt_files)}\n")
        fh.write(f" Files with hits: {len(files_with_hits)}\n")
        fh.write(f" Total hits : {len(all_results)}\n")
        fh.write(f" Multi-kw files : {len(multi_files)}\n")
    # ── CSV output ────────────────────────────────────────────────────────────
    csv_fields = ["file", "keyword", "score", "char_pos",
                  "matched_text", "multi_hit", "context"]
    with open(out_csv, "w", encoding="utf-8", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=csv_fields)
        writer.writeheader()
        # highest-scoring hits first so the CSV is useful without re-sorting
        for row in sorted(all_results, key=lambda r: r["score"], reverse=True):
            writer.writerow({k: row[k] for k in csv_fields})
    # ── Console summary ───────────────────────────────────────────────────────
    multi_files = [f for f, kws in file_hit_summary.items() if len(kws) > 1]
    print()
    print(divider)
    print(f"Files searched : {len(txt_files)}")
    print(f"Files with hits : {len(files_with_hits)}")
    print(f"Total hits : {len(all_results)}")
    print(f"Multi-keyword * : {len(multi_files)}")
    print(f"Plain-text map -> {out_txt}")
    print(f"CSV (sortable) -> {out_csv}")
    print(divider)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment