|
#!/usr/bin/env python3 |
|
""" |
|
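Playwright automation to search FU Berlin Primo and capture results.

For every search a full-page screenshot is saved, together with the page
HTML under the same name with an ".html" suffix.

Usage:
    python primo_search.py "your search query"
    python primo_search.py "your search query" --output results.png
    python primo_search.py "your search query" --headless
    python primo_search.py "Author, A. (2020). Some title." --citation
    python primo_search.py --from-file citations.txt --json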
|
""" |
|
|
|
import argparse |
|
import json |
|
import re |
|
import sys |
|
import time |
|
from datetime import datetime |
|
from pathlib import Path |
|
|
|
|
|
def safe_filename(s: str, max_len: int = 80) -> str:
    """Turn arbitrary text into a string that is safe to use as a filename.

    Every character that is not a word character, whitespace or a hyphen is
    dropped, and each run of whitespace is collapsed into one underscore.
    The result is cut to ``max_len`` characters and then stripped of leading
    and trailing underscores, so it may end up shorter than ``max_len`` (or
    empty when nothing usable remains).
    """
    without_unsafe = re.compile(r'[^\w\s-]').sub('', s)
    underscored = re.compile(r'[\s]+').sub('_', without_unsafe)
    truncated = underscored[:max_len]
    return truncated.strip('_')
|
|
|
|
|
def parse_citation(citation: str) -> dict:
    """Pull the author, year and title out of a citation string.

    Returns a dict with the keys ``author``, ``year``, ``title`` and ``raw``.
    ``raw`` always holds the unmodified input; the other fields stay empty
    strings when the corresponding part cannot be found.
    """
    parsed = {"author": "", "year": "", "title": "", "raw": citation}

    # The year is the first four-digit number enclosed in parentheses.
    year_hit = re.search(r'\((\d{4})\)', citation)
    if year_hit is not None:
        parsed["year"] = year_hit.group(1)

        # The author is whatever precedes the year; only the part before the
        # first comma (the last name) is kept.
        before_year = citation[:year_hit.start()].strip().rstrip(',')
        last_name, _, _ = before_year.partition(',')
        parsed["author"] = last_name.strip()

    # The title follows "<year>). " and runs up to the next period or slash.
    title_hit = re.search(r'\d{4}\)\.\s*([^/\.]+)', citation)
    if title_hit is not None:
        parsed["title"] = title_hit.group(1).strip()

    return parsed
|
|
|
|
|
def build_query(citation: str) -> str:
    """Derive a compact search query from a citation string.

    The query is made of the parsed author and year (when present) followed
    by up to five title words longer than three characters. When nothing
    could be parsed, the first 100 characters of the citation are used.
    """
    fields = parse_citation(citation)

    terms = [fields[key] for key in ("author", "year") if fields[key]]

    title = fields["title"]
    if title:
        # Short words are skipped; only the first five remaining words are kept.
        significant = [word for word in title.split() if len(word) > 3]
        terms += significant[:5]

    if not terms:
        return citation[:100]
    return " ".join(terms)
|
|
|
|
|
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout |
|
|
|
|
|
# Page that search_primo() opens before typing the query into the search box.
# NOTE(review): the "vid" parameter presumably selects the FU Berlin view of
# the Primo discovery interface - confirm before changing it.
PRIMO_URL = "https://fu-berlin.primo.exlibrisgroup.com/discovery/search?vid=49KOBV_FUB:FUB"
|
|
|
|
|
def search_primo(query: str, output: str = "results.png", headless: bool = False,
                 timeout: int = 30000, delay: float = 2.0) -> "tuple[Path, str]":
    """
    Search FU Berlin Primo with a query and capture the results page.

    A full-page screenshot is written to ``output`` and the page HTML is
    saved next to it under the same name with an ``.html`` suffix (UTF-8).

    Args:
        query: Search query string
        output: Output filename for screenshot
        headless: Run browser without GUI
        timeout: Page load timeout in milliseconds
        delay: Seconds to wait after search for results to load

    Returns:
        A ``(path, status)`` tuple: the path of the saved screenshot and an
        ANSI-colored status string. The status contains "no such title" when
        the page HTML includes the German "no results" message and "OK"
        otherwise.

    Raises:
        Exceptions raised by Playwright during launch, navigation or
        interaction (other than the timeouts handled below) and by the file
        write are not caught here and propagate to the caller.
    """
    output_path = Path(output)

    # The search input has various possible selectors on Primo; they are
    # tried in this order, each with its own 5 s timeout.
    search_selectors = [
        "input#searchBar",
        "input[id*='search']",
        "input[name='query']",
        "input[type='search']",
        "input[placeholder*='Search']",
        "prm-search-bar input",
    ]

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        # try/finally so the browser is closed even when navigation or page
        # interaction raises.
        try:
            context = browser.new_context(
                viewport={"width": 1920, "height": 1080}
            )
            page = context.new_page()

            print("Navigating to Primo...")
            page.goto(PRIMO_URL, wait_until="networkidle", timeout=timeout)

            # Wait for the search input to be ready
            search_input = None
            for selector in search_selectors:
                try:
                    search_input = page.wait_for_selector(selector, timeout=5000)
                except PlaywrightTimeout:
                    continue
                if search_input:
                    print(f"Found search input with selector: {selector}")
                    break

            if not search_input:
                # Try a more aggressive approach - use the first text input
                search_input = page.locator("input[type='text']").first
                print("Using fallback: first text input found")

            print(f"Entering query: {query}")
            search_input.fill(query)

            # Submit the search
            search_input.press("Enter")

            # Wait for results to load
            print(f"Waiting {delay}s for results to load...")
            time.sleep(delay)

            # Also wait for network to settle; a busy network is tolerated
            try:
                page.wait_for_load_state("networkidle", timeout=10000)
            except PlaywrightTimeout:
                print("Network still active, continuing anyway...")

            # Take screenshot
            print(f"Capturing screenshot to: {output_path}")
            page.screenshot(path=str(output_path), full_page=True)

            # Also save the page HTML for later analysis. An explicit encoding
            # keeps the write from failing on platforms whose default
            # encoding cannot represent every character of the page.
            html_path = output_path.with_suffix(".html")
            html_content = page.content()
            html_path.write_text(html_content, encoding="utf-8")
            print(f"Saved HTML to: {html_path}")

            # Check for results.
            # NOTE(review): only the German "no results" text is detected; a
            # page rendered in another UI language would be reported as OK.
            if "Es gibt keine Ergebnisse" in html_content:
                status = "\033[31mno such title\033[0m"
            else:
                status = "\033[32mOK\033[0m"
        finally:
            browser.close()

    return output_path, status
|
|
|
|
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the Primo search tool."""
    parser = argparse.ArgumentParser(
        description="Search FU Berlin Primo and capture results page"
    )
    parser.add_argument(
        "query",
        nargs="?",
        help="Search query or full citation (use --citation to parse)"
    )
    parser.add_argument(
        "-c", "--citation",
        action="store_true",
        help="Parse input as citation and extract author/year/title"
    )
    parser.add_argument(
        "-f", "--from-file",
        type=str,
        metavar="FILE",
        help="Read citations from file (one per line, use - for stdin)"
    )
    parser.add_argument(
        "-j", "--json",
        action="store_true",
        help="Output results as JSON (batch mode only)"
    )
    parser.add_argument(
        "-o", "--output",
        default=None,
        help="Output filename for screenshot (default: based on query)"
    )
    parser.add_argument(
        "--headless",
        action="store_true",
        help="Run browser in headless mode (no GUI)"
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=30000,
        help="Page load timeout in milliseconds (default: 30000)"
    )
    parser.add_argument(
        "--delay",
        type=float,
        default=3.0,
        help="Seconds to wait for results to load (default: 3.0)"
    )
    return parser


def _read_entries(source: str) -> list:
    """Return the stripped, non-blank lines of ``source`` ("-" reads stdin)."""
    if source == "-":
        lines = sys.stdin.read().splitlines()
    else:
        # Citations often contain non-ASCII names, so do not rely on the
        # platform default encoding; "utf-8-sig" also drops a leading BOM
        # that would otherwise end up inside the first entry.
        with open(source, encoding="utf-8-sig") as f:
            lines = f.read().splitlines()
    return [line.strip() for line in lines if line.strip()]


def _run_batch(entries: list, args: argparse.Namespace) -> None:
    """Search every entry as a citation and print one result line for each."""
    import contextlib

    for entry in entries:
        query = build_query(entry)
        ts = datetime.now().strftime("%Y%m%d%H%M%S")
        output = f"result-{ts}-{safe_filename(entry)}.png"

        # In JSON mode stdout must carry nothing but JSON lines, so the
        # progress messages printed by search_primo are sent to stderr.
        if args.json:
            progress = contextlib.redirect_stdout(sys.stderr)
        else:
            progress = contextlib.nullcontext()

        try:
            with progress:
                _, status_str = search_primo(
                    query=query,
                    output=output,
                    headless=args.headless,
                    timeout=args.timeout,
                    delay=args.delay
                )
        except Exception as e:
            # Batch mode is best-effort: report the failure and move on.
            if args.json:
                print(json.dumps({"entry": entry, "query": query, "found": False, "error": str(e)}))
            else:
                print(f"{entry}\t\033[31mError: {e}\033[0m", file=sys.stderr)
        else:
            found = "no such title" not in status_str
            if args.json:
                print(json.dumps({"entry": entry, "query": query, "found": found}))
            else:
                print(f"{entry}\t{status_str}")


def main() -> None:
    """Command-line entry point.

    With ``--from-file`` every non-blank line of the file (or of stdin for
    ``-``) is treated as a citation, turned into a query and searched; one
    result line is printed per entry, as JSON when ``--json`` is given.
    Otherwise the positional query is searched once (parsed as a citation
    first when ``--citation`` is given) and the status is printed. A failed
    single search prints the error to stderr and exits with status 1.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    # Batch mode from file
    if args.from_file:
        try:
            entries = _read_entries(args.from_file)
        except (OSError, UnicodeDecodeError) as e:
            # Report an unreadable input file as a usage error, not a traceback.
            parser.error(f"cannot read {args.from_file}: {e}")
        _run_batch(entries, args)
        return

    # Single query mode
    if not args.query:
        parser.error("query is required unless using --from-file")

    query = args.query
    if args.citation:
        query = build_query(args.query)
        print(f"Parsed citation -> query: {query}")

    # The default filename is derived from the original input, not the
    # parsed query.
    ts = datetime.now().strftime("%Y%m%d%H%M%S")
    output = args.output or f"result-{ts}-{safe_filename(args.query)}.png"

    try:
        _, status = search_primo(
            query=query,
            output=output,
            headless=args.headless,
            timeout=args.timeout,
            delay=args.delay
        )
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    print(status)
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()