Last active
February 24, 2026 19:10
-
-
Save shawngraham/623b74639b4aece14a4a02c8ec8e91fd to your computer and use it in GitHub Desktop.
Use ocr_keyword_search.py when you have a folder of OCR'd text and a file of keywords; use groq-ocr.py when you need to get that text in the first place.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| groq_ocr.py | |
| Processes newspaper images using Groq's vision API and extracts individual | |
| articles to a CSV. Each row represents one article with associated page metadata. | |
| Usage: | |
| python newspaper_ocr.py --input_dir processed_output/images --output ocr_results.csv | |
| Requirements: | |
| pip install groq | |
| """ | |
| import argparse | |
| import base64 | |
| import csv | |
| import json | |
| import os | |
| import time | |
| from pathlib import Path | |
| from groq import Groq | |
# ── Configuration ──────────────────────────────────────────────────────────────
# Vision-capable Groq model used for every page; swap here to change models.
GROQ_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct"
# Image extensions picked up from --input_dir (compared lower-cased).
SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png"}
SLEEP_BETWEEN_REQUESTS = 1  # seconds – be kind to the API
# ── Prompts ────────────────────────────────────────────────────────────────────
# Asks the model for bare JSON matching the schema that flatten_to_rows() and
# CSV_FIELDNAMES mirror. The \\n in the body_text line is a literal
# backslash-n instruction to the model, not a newline.
EXTRACTION_PROMPT = """
You are a newspaper digitisation assistant. Analyse this newspaper page image and
extract every distinct article you can see.
Return ONLY a JSON object with the following structure — no markdown, no extra text:
{
"page_metadata": {
"publication_name": "...",
"publication_date": "YYYY-MM-DD or best guess, empty string if unknown",
"edition": "e.g. Morning, Evening, or empty string",
"page_number": "integer or empty string",
"section": "e.g. News, Sport, Business, or empty string",
"language": "ISO 639-1 code, e.g. en"
},
"articles": [
{
"headline": "Article headline or title",
"byline": "Author name(s) or empty string",
"column": "Column position on page, e.g. 1-3 or empty string",
"body_text": "Full article text, preserving paragraphs with \\n",
"article_type": "news | opinion | advertisement | obituary | letter | other",
"notes": "Any digitisation caveats (partial text, poor scan, etc.) or empty string"
}
]
}
If no articles are found return {"page_metadata": {}, "articles": []}.
""".strip()
| # ── Helpers ──────────────────────────────────────────────────────────────────── | |
def encode_image(image_path: Path) -> tuple[str, str]:
    """Read the image file and return its (base64_string, mime_type) pair."""
    ext = image_path.suffix.lower()
    if ext in {".jpg", ".jpeg"}:
        mime = "image/jpeg"
    else:
        mime = "image/png"
    raw = image_path.read_bytes()
    return base64.b64encode(raw).decode("utf-8"), mime
def call_groq(client: Groq, image_path: Path) -> dict:
    """Send one image to Groq and return the parsed JSON response."""
    b64, mime = encode_image(image_path)
    data_url = f"data:{mime};base64,{b64}"
    user_message = {
        "role": "user",
        "content": [
            {"type": "text", "text": EXTRACTION_PROMPT},
            {"type": "image_url", "image_url": {"url": data_url}},
        ],
    }
    response = client.chat.completions.create(
        model=GROQ_MODEL,
        messages=[user_message],
        temperature=0.0,  # deterministic for structured extraction
    )
    raw = response.choices[0].message.content.strip()
    # The model occasionally wraps its JSON in markdown fences despite
    # instructions; peel them (and an optional "json" language tag) off.
    if raw.startswith("```"):
        raw = raw.split("```")[1]
        if raw.startswith("json"):
            raw = raw[4:]
        raw = raw.strip()
    return json.loads(raw)
def flatten_to_rows(image_path: Path, parsed: dict) -> list[dict]:
    """
    Flatten one page's nested JSON into CSV-ready rows.

    One row per article, with the page-level metadata duplicated onto every
    row. A page with no articles still yields a single placeholder row so
    the page is recorded in the output.
    """
    meta = parsed.get("page_metadata", {})
    articles = parsed.get("articles", [])
    page_fields = ("publication_name", "publication_date", "edition",
                   "page_number", "section", "language")
    base_row = {"source_file": image_path.name}
    for field in page_fields:
        base_row[field] = meta.get(field, "")
    article_fields = ("headline", "byline", "column", "body_text",
                      "article_type")
    if not articles:
        placeholder = dict(base_row, notes="no articles extracted")
        for field in article_fields:
            placeholder[field] = ""
        return [placeholder]
    return [
        {**base_row,
         **{field: art.get(field, "") for field in article_fields},
         "notes": art.get("notes", "")}
        for art in articles
    ]
# Column order for the output CSV: page-level metadata first, then the
# per-article fields produced by flatten_to_rows().
CSV_FIELDNAMES = [
    "source_file", "publication_name", "publication_date", "edition",
    "page_number", "section", "language",
    "headline", "byline", "column", "body_text", "article_type", "notes",
]
| # ── Main ─────────────────────────────────────────────────────────────────────── | |
| def main(): | |
| parser = argparse.ArgumentParser(description="OCR newspaper images → CSV via Groq") | |
| parser.add_argument( | |
| "--input_dir", default="processed_output/images", | |
| help="Directory containing .jpg / .jpeg / .png newspaper scans", | |
| ) | |
| parser.add_argument( | |
| "--output", default="ocr_results.csv", | |
| help="Path for the output CSV file", | |
| ) | |
| parser.add_argument( | |
| "--api_key", default=None, | |
| help="Groq API key (falls back to GROQ_API_KEY env var)", | |
| ) | |
| args = parser.parse_args() | |
| api_key = args.api_key or os.environ.get("GROQ_API_KEY") | |
| if not api_key: | |
| raise ValueError( | |
| "Groq API key not found. Set GROQ_API_KEY env var or pass --api_key." | |
| ) | |
| input_dir = Path(args.input_dir) | |
| if not input_dir.is_dir(): | |
| raise FileNotFoundError(f"Image directory not found: {input_dir}") | |
| images = sorted( | |
| p for p in input_dir.iterdir() | |
| if p.suffix.lower() in SUPPORTED_EXTENSIONS | |
| ) | |
| if not images: | |
| print(f"No supported image files found in {input_dir}") | |
| return | |
| client = Groq(api_key=api_key) | |
| output_path = Path(args.output) | |
| total_articles = 0 | |
| with open(output_path, "w", newline="", encoding="utf-8") as csvfile: | |
| writer = csv.DictWriter(csvfile, fieldnames=CSV_FIELDNAMES) | |
| writer.writeheader() | |
| for i, img_path in enumerate(images, 1): | |
| print(f"[{i}/{len(images)}] Processing {img_path.name} …", end=" ", flush=True) | |
| try: | |
| parsed = call_groq(client, img_path) | |
| rows = flatten_to_rows(img_path, parsed) | |
| writer.writerows(rows) | |
| csvfile.flush() # write incrementally in case of crash | |
| print(f"→ {len(rows)} article(s) extracted") | |
| total_articles += len(rows) | |
| except json.JSONDecodeError as e: | |
| print(f"⚠ JSON parse error: {e}") | |
| writer.writerow({ | |
| "source_file": img_path.name, "notes": f"JSON parse error: {e}", | |
| **{k: "" for k in CSV_FIELDNAMES if k not in ("source_file", "notes")}, | |
| }) | |
| except Exception as e: | |
| print(f"⚠ Error: {e}") | |
| writer.writerow({ | |
| "source_file": img_path.name, "notes": f"Error: {e}", | |
| **{k: "" for k in CSV_FIELDNAMES if k not in ("source_file", "notes")}, | |
| }) | |
| if i < len(images): | |
| time.sleep(SLEEP_BETWEEN_REQUESTS) | |
| print(f"\nDone. {total_articles} articles written to {output_path}") | |
| if __name__ == "__main__": | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| ocr_keyword_search.py | |
| ───────────────────── | |
| Fuzzy keyword search across OCR'd text files. | |
| Usage: | |
| python ocr_keyword_search.py [options] | |
| Options: | |
| --text-dir DIR Directory containing .txt files (default: ./texts) | |
| --keywords FILE Keyword list, one per line (default: keywords.txt) | |
| --output FILE Plain-text results map (default: output.txt) | |
| --csv FILE CSV results (sortable) (default: output.csv) | |
| --threshold 0-100 Minimum fuzzy match score (default: 75) | |
| --context N Words of context around hit (default: 12) | |
| --encoding ENC Text file encoding (default: utf-8-sig) | |
| --debug Print diagnostics for first file and exit | |
| Requirements: | |
| None beyond the standard library. | |
| """ | |
| import argparse | |
| import csv | |
| import difflib | |
| import re | |
| import sys | |
| import textwrap | |
| from pathlib import Path | |
| from datetime import datetime | |
# Reported in output headers; the only backend implemented here is stdlib difflib.
FUZZY_BACKEND = "difflib"
def fuzzy_ratio(a: str, b: str) -> float:
    """Whole-string similarity of two strings as a 0-100 score."""
    matcher = difflib.SequenceMatcher(None, a, b)
    return matcher.ratio() * 100
def fuzzy_partial(a: str, b: str) -> float:
    """Slide `a` across `b` and return the best window score (0-100)."""
    la, lb = len(a), len(b)
    if not (la and lb):
        return 0.0
    if la > lb:
        # `a` cannot fit inside `b`: fall back to a whole-string comparison.
        return difflib.SequenceMatcher(None, a, b).ratio() * 100
    window_scores = (
        difflib.SequenceMatcher(None, a, b[start : start + la]).ratio() * 100
        for start in range(lb - la + 1)
    )
    return max(window_scores)
| # ── OCR normalisation ───────────────────────────────────────────────────────── | |
| OCR_FIXES = str.maketrans({ | |
| "0": "o", | |
| "1": "l", | |
| "|": "l", | |
| "!": "i", | |
| "@": "a", | |
| "$": "s", | |
| "5": "s", | |
| "8": "b", | |
| "6": "g", | |
| }) | |
| _RN_RE = re.compile(r"rn") | |
def ocr_normalise(text: str) -> str:
    """Lower-case *text* and canonicalise common OCR character confusions."""
    lowered = text.lower()
    return _RN_RE.sub("m", lowered.translate(OCR_FIXES))
| # ── Helpers ─────────────────────────────────────────────────────────────────── | |
def load_keywords(path: Path) -> list[str]:
    """
    Load keywords from *path*, one per line.

    Blank lines and '#' comment lines are skipped. Reading with utf-8-sig
    means a Windows BOM (\\ufeff) is silently stripped. Exits the program
    if no keywords remain after filtering.
    """
    with open(path, encoding="utf-8-sig") as fh:
        keywords = [
            stripped
            for stripped in (line.strip() for line in fh)
            if stripped and not stripped.startswith("#")
        ]
    if not keywords:
        sys.exit("ERROR: keyword file is empty.")
    return keywords
def tokenise(text: str) -> list[tuple[int, str]]:
    """Return (char_offset, word) pairs for every \\w+ run in *text*."""
    matches = re.finditer(r"\w+", text)
    return [(m.start(), m.group(0)) for m in matches]
def get_context(text: str, char_pos: int, n_words: int = 12) -> str:
    """
    Return up to *n_words* whitespace-delimited words either side of *char_pos*.

    The anchor is the word whose character span CONTAINS char_pos. The
    previous version anchored on the first word starting at or after
    char_pos, which skipped one word ahead whenever the hit word carried
    leading punctuation (tokenise() strips punctuation; text.split() does
    not — e.g. a hit inside "(foo)" anchored on the following word). It
    also fell back to the START of the text when char_pos lay past every
    word; the last word is the correct anchor there. Ellipses mark
    truncation at either end.
    """
    words = text.split()
    pos = 0
    # Fallback anchor: the final word, used when char_pos is beyond every span.
    word_idx = len(words) - 1 if words else 0
    for i, w in enumerate(words):
        pos = text.find(w, pos)
        if pos + len(w) > char_pos:  # this word's span contains char_pos
            word_idx = i
            break
        pos += len(w)
    start = max(0, word_idx - n_words)
    end = min(len(words), word_idx + n_words + 1)
    snippet = " ".join(words[start:end])
    if start > 0:
        snippet = "..." + snippet
    if end < len(words):
        snippet = snippet + "..."
    return snippet
def score_match(keyword: str, token: str) -> float:
    """Fuzzy-match *keyword* against *token* (0-100) with cheap pre-filters."""
    kw_len, tok_len = len(keyword), len(token)
    if kw_len == 0 or tok_len == 0:
        return 0.0
    # Pre-filter 1: wildly different lengths can never score well.
    length_ratio = kw_len / tok_len
    if not 0.5 <= length_ratio <= 2.0:
        return 0.0
    kw_norm = ocr_normalise(keyword)
    tok_norm = ocr_normalise(token)
    # Pre-filter 2: no shared characters at all means no meaningful match.
    if set(kw_norm).isdisjoint(tok_norm):
        return 0.0
    # Multi-word keywords slide across the candidate text; single words also
    # try the whole-string ratio and keep whichever scores higher.
    if " " in keyword:
        return fuzzy_partial(kw_norm, tok_norm)
    return max(fuzzy_ratio(kw_norm, tok_norm),
               fuzzy_partial(kw_norm, tok_norm))
def search_file(text: str, keywords: list[str], threshold: float,
                context_words: int) -> list[dict]:
    """
    Score every keyword against *text* and return hits at or above
    *threshold*, best score first.

    Single-word keywords are compared token-by-token; multi-word keywords
    slide an n-token window across the text. Duplicate (keyword, position)
    pairs are reported once.
    """
    tokens = tokenise(text)
    hits = []
    seen = set()

    def record(kw, score, char_pos, matched):
        # De-duplicate on (keyword, position) before appending.
        key = (kw, char_pos)
        if key in seen:
            return
        seen.add(key)
        hits.append({
            "keyword": kw,
            "score": round(score, 1),
            "char_pos": char_pos,
            "matched_text": matched,
            "context": get_context(text, char_pos, context_words),
        })

    for kw in keywords:
        n = len(kw.split())
        if n == 1:
            for char_pos, tok in tokens:
                score = score_match(kw, tok)
                if score >= threshold:
                    record(kw, score, char_pos, tok)
        else:
            for i in range(len(tokens) - n + 1):
                window = tokens[i : i + n]
                window_text = " ".join(tok for _, tok in window)
                score = score_match(kw, window_text)
                if score >= threshold:
                    record(kw, score, window[0][0], window_text)

    hits.sort(key=lambda h: h["score"], reverse=True)
    return hits
def run_diagnostics(txt_files: list[Path], keywords: list[str],
                    encoding: str, threshold: float) -> None:
    """
    Print detailed debug info then exit. Run with --debug to activate.
    Inspects the first text file only.

    Shows repr() of every keyword (revealing BOMs or stray \\r), the raw
    bytes and token count of the first file, and the five best-scoring
    tokens per keyword. Always terminates via sys.exit().
    """
    print("=" * 60)
    print("DEBUG MODE")
    print("=" * 60)
    # Show exactly what the keyword file produced — repr() reveals
    # invisible characters like BOM (\ufeff) or stray \r
    print(f"\nKeywords loaded ({len(keywords)}):")
    for i, kw in enumerate(keywords):
        print(f" [{i}] repr={repr(kw)} normalised={repr(ocr_normalise(kw))}")
    if not txt_files:
        print("\nNo text files found.")
        sys.exit(0)
    first = txt_files[0]
    print(f"\nFirst file : {first.name}")
    try:
        raw_bytes = first.read_bytes()
        print(f" Size : {len(raw_bytes)} bytes")
        print(f" First 80B : {repr(raw_bytes[:80])}")
        text = raw_bytes.decode(encoding, errors="replace")
    except Exception as e:
        print(f" ERROR reading file: {e}")
        sys.exit(1)
    tokens = tokenise(text)
    print(f" Tokens : {len(tokens)} total")
    print(f" First 20 : {[t for _, t in tokens[:20]]}")
    print(f"\nBest scores per keyword (threshold={threshold}):")
    for kw in keywords:
        scores = [(tok, score_match(kw, tok)) for _, tok in tokens]
        scores.sort(key=lambda x: x[1], reverse=True)
        top = scores[:5]
        # Guard: an empty/whitespace-only file yields no tokens, and the
        # previous top[0][1] would raise IndexError here.
        hit = "HIT" if top and top[0][1] >= threshold else "MISS"
        print(f" kw={repr(kw):30s} [{hit}] top: "
              + ", ".join(f"{t!r}={s:.1f}" for t, s in top))
    print("\nDone. Remove --debug to run normally.")
    sys.exit(0)
| # ── Main ────────────────────────────────────────────────────────────────────── | |
| def parse_args(): | |
| p = argparse.ArgumentParser(description=__doc__, | |
| formatter_class=argparse.RawDescriptionHelpFormatter) | |
| p.add_argument("--text-dir", default="texts", metavar="DIR") | |
| p.add_argument("--keywords", default="keywords.txt", metavar="FILE") | |
| p.add_argument("--output", default="output.txt", metavar="FILE") | |
| p.add_argument("--csv", default="output.csv", metavar="FILE") | |
| p.add_argument("--threshold", default=75, type=float, metavar="0-100") | |
| p.add_argument("--context", default=12, type=int, metavar="N") | |
| p.add_argument("--encoding", default="utf-8-sig", metavar="ENC") | |
| p.add_argument("--debug", action="store_true", | |
| help="Print diagnostics for first file and exit") | |
| return p.parse_args() | |
def main():
    """Fuzzy-search every .txt file for the keywords; write a text map and a CSV."""
    args = parse_args()
    text_dir = Path(args.text_dir)
    kw_file = Path(args.keywords)
    out_txt = Path(args.output)
    out_csv = Path(args.csv)
    # Fail fast with a readable message rather than a traceback.
    if not text_dir.is_dir():
        sys.exit(f"ERROR: text directory not found: {text_dir}")
    if not kw_file.is_file():
        sys.exit(f"ERROR: keyword file not found: {kw_file}")
    txt_files = sorted(text_dir.glob("*.txt"))
    if not txt_files:
        sys.exit(f"ERROR: no .txt files found in {text_dir}")
    # NOTE(review): load_keywords always reads utf-8-sig; --encoding applies
    # only to the OCR text files read below.
    keywords = load_keywords(kw_file)
    # ── Debug mode ──────────────────────────────────────────────────────────
    if args.debug:
        # run_diagnostics always calls sys.exit(); nothing below runs.
        run_diagnostics(txt_files, keywords, args.encoding, args.threshold)
    print(f"Fuzzy backend : {FUZZY_BACKEND}")
    print(f"Keywords : {len(keywords)} -> {keywords}")
    print(f"Text files : {len(txt_files)}")
    print(f"Threshold : {args.threshold}")
    print(f"Context words : {args.context}")
    print(f"Encoding : {args.encoding}")
    print()
    all_results: list[dict] = []   # one entry per hit, across all files
    file_hit_summary: dict = {}    # filename -> set of keywords that hit
    for txt_path in txt_files:
        print(f" Searching {txt_path.name} ...", end=" ", flush=True)
        try:
            # errors="replace" keeps going on files with mixed/bad encodings.
            text = txt_path.read_text(encoding=args.encoding, errors="replace")
        except Exception as e:
            print(f"[SKIP] {e}")
            continue
        hits = search_file(text, keywords, args.threshold, args.context)
        if hits:
            kws_found = {h["keyword"] for h in hits}
            file_hit_summary[txt_path.name] = kws_found
            for h in hits:
                all_results.append({
                    "file": txt_path.name,
                    "keyword": h["keyword"],
                    "score": h["score"],
                    "char_pos": h["char_pos"],
                    "matched_text": h["matched_text"],
                    "context": h["context"],
                    # True when this file matched more than one distinct keyword.
                    "multi_hit": len(kws_found) > 1,
                })
            flag = " *" if len(kws_found) > 1 else ""
            print(f"{len(hits):3d} hit(s) keywords: {', '.join(sorted(kws_found))}{flag}")
        else:
            print("no hits")
    # ── Plain-text output ───────────────────────────────────────────────────
    run_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    divider = "-" * 80
    files_with_hits = sorted(set(r["file"] for r in all_results))
    # Re-sort so multi-keyword files come first (not(len>1) is False for
    # them, and False sorts before True), alphabetical within each group.
    files_with_hits.sort(
        key=lambda f: (not len(file_hit_summary.get(f, set())) > 1, f)
    )
    with open(out_txt, "w", encoding="utf-8") as fh:
        fh.write("OCR Keyword Search Results\n")
        fh.write(f"Generated : {run_time}\n")
        fh.write(f"Keywords : {', '.join(keywords)}\n")
        fh.write(f"Threshold : {args.threshold}\n")
        fh.write(f"Backend : {FUZZY_BACKEND}\n")
        fh.write(divider + "\n\n")
        for fname in files_with_hits:
            file_hits = [r for r in all_results if r["file"] == fname]
            kws = file_hit_summary.get(fname, set())
            multi = " * MULTIPLE KEYWORDS" if len(kws) > 1 else ""
            fh.write(f"FILE: {fname}{multi}\n")
            fh.write(f" Keywords matched: {', '.join(sorted(kws))}\n")
            for h in file_hits:
                fh.write(f" [{h['score']:5.1f}%] kw='{h['keyword']}' "
                         f"pos={h['char_pos']} matched='{h['matched_text']}'\n")
                context_wrapped = textwrap.fill(
                    h["context"], width=72,
                    initial_indent=" ",
                    subsequent_indent=" "
                )
                fh.write(context_wrapped + "\n\n")
            fh.write(divider + "\n\n")
        multi_files = [f for f, kws in file_hit_summary.items() if len(kws) > 1]
        fh.write("SUMMARY\n")
        fh.write(f" Files searched : {len(txt_files)}\n")
        fh.write(f" Files with hits: {len(files_with_hits)}\n")
        fh.write(f" Total hits : {len(all_results)}\n")
        fh.write(f" Multi-kw files : {len(multi_files)}\n")
    # ── CSV output ──────────────────────────────────────────────────────────
    csv_fields = ["file", "keyword", "score", "char_pos",
                  "matched_text", "multi_hit", "context"]
    with open(out_csv, "w", encoding="utf-8", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=csv_fields)
        writer.writeheader()
        # Best matches first in the CSV as well.
        for row in sorted(all_results, key=lambda r: r["score"], reverse=True):
            writer.writerow({k: row[k] for k in csv_fields})
    # ── Console summary ─────────────────────────────────────────────────────
    multi_files = [f for f, kws in file_hit_summary.items() if len(kws) > 1]
    print()
    print(divider)
    print(f"Files searched : {len(txt_files)}")
    print(f"Files with hits : {len(files_with_hits)}")
    print(f"Total hits : {len(all_results)}")
    print(f"Multi-keyword * : {len(multi_files)}")
    print(f"Plain-text map -> {out_txt}")
    print(f"CSV (sortable) -> {out_csv}")
    print(divider)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment