Last active
January 22, 2026 08:24
-
-
Save hotchpotch/4f5c2f5514ab3c4205aac461a42213ed to your computer and use it in GitHub Desktop.
pdf2ja script (PEP 723, updated)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # /// script | |
| # requires-python = ">=3.12,<3.13" | |
| # dependencies = [ | |
| # "plamo-translate-cli", | |
| # "pdf2zh-next", | |
| # ] | |
| # /// | |
| # NOTE: PDFMathTranslate-next is published on PyPI as pdf2zh-next. | |
| # License: MIT | |
| """Translate PDFs to Japanese using pdf2zh_next + plamo-translate. | |
| Defaults: | |
| - If no inputs are provided, scans ~/Downloads for PDFs. | |
| - Outputs are written next to each input PDF unless --output-dir is specified. | |
| - Skips files that already have a translated output (unless --force). | |
| - Uses pdf2zh_next with plamo-translate via CLI, Japanese output, | |
| no watermark, and alternating bilingual pages. | |
| - Use --dry-run to print planned commands and outputs without running. | |
| - Use --no-title-slug to disable title-based suffixes. | |
| - Use --no-strip-paren-index to keep "(1)"-style suffixes intact. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import atexit | |
| import os | |
| import re | |
| import shlex | |
| import shutil | |
| import signal | |
| import socket | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| from pathlib import Path | |
| from typing import Iterable, List, Optional, Tuple | |
# Command lines for the two external tools. Each one can be overridden through
# the environment variable of the same name.
PLAMO_CLI_COMMAND = os.environ.get("PLAMO_CLI_COMMAND", "uvx plamo-translate")
PDF2ZH_COMMAND = os.environ.get("PDF2ZH_COMMAND", "uvx pdf2zh_next")

# Directory scanned for PDFs when no inputs are given on the command line.
DEFAULT_AUTO_TRANSLATE_DIR = Path.home() / "Downloads"

# Default pdf2zh_next options; each option is kept on one line with its value.
PDF2ZH_ARGS = [
    "--lang-in", "en",
    "--lang-out", "ja",
    "--clitranslator",
    "--clitranslator-command", PLAMO_CLI_COMMAND,
    "--watermark-output-mode", "no_watermark",
    "--use-alternating-pages-dual",
]
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the translator.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point uses;
            passing an explicit list allows the parser to be driven
            programmatically.

    Returns:
        Namespace with the attributes ``pdf``, ``target_pdf``, ``output``,
        ``output_dir``, ``auto_translate_dir``, ``force``, ``dry_run``,
        ``no_title_slug`` and ``no_strip_paren_index``.
    """
    parser = argparse.ArgumentParser(
        description="Translate PDFs to Japanese with pdf2zh_next via uvx."
    )
    parser.add_argument(
        "pdf",
        nargs="*",
        type=Path,
        help="PDF files to translate (positional).",
    )
    parser.add_argument(
        "-t",
        "--target-pdf",
        nargs="+",
        action="extend",
        type=Path,
        default=[],
        help="One or more PDF files to translate (repeatable).",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=None,
        help="Output PDF file path (only valid with a single input).",
    )
    parser.add_argument(
        "-o",
        "--output-dir",
        type=Path,
        default=None,
        help=(
            "Output directory for translated PDFs. "
            "If omitted, outputs are written next to each input PDF."
        ),
    )
    parser.add_argument(
        "-a",
        "--auto-translate-dir",
        type=Path,
        default=DEFAULT_AUTO_TRANSLATE_DIR,
        help="Directory to scan for PDFs when no inputs are specified.",
    )
    parser.add_argument(
        "-f",
        "--force",
        action="store_true",
        help="Overwrite existing translated files.",
    )
    parser.add_argument(
        "-n",
        "--dry-run",
        action="store_true",
        help="Print planned commands and outputs without running translation.",
    )
    parser.add_argument(
        "--no-title-slug",
        action="store_true",
        help="Do not append a title-based suffix to output filenames.",
    )
    parser.add_argument(
        "--no-strip-paren-index",
        action="store_true",
        help='Keep "(1)"-style suffixes instead of stripping them.',
    )
    return parser.parse_args(argv)
def ensure_command_available(command: str) -> None:
    """Exit with status 1 unless the executable named by *command* is on PATH.

    Only the first shell-style token of *command* is looked up; the remaining
    tokens are treated as arguments and ignored here.

    Args:
        command: Command line as a single string, e.g. ``"uvx pdf2zh_next"``.

    Raises:
        SystemExit: With code 1 when the command cannot be tokenized, is
            empty, or its executable is not found in PATH.
    """
    try:
        parts = shlex.split(command)
    except ValueError as exc:
        # Unbalanced quotes (typically from an environment override).
        sys.stderr.write(f"Invalid command {command!r}: {exc}\n")
        sys.exit(1)
    if not parts:
        # An empty or whitespace-only command would otherwise raise IndexError.
        sys.stderr.write("Command is empty; nothing to run.\n")
        sys.exit(1)
    exe = parts[0]
    if shutil.which(exe) is None:
        sys.stderr.write(f"Command not found in PATH: {exe}\n")
        sys.exit(1)
def load_pdf2zh_args() -> list[str]:
    """Return the pdf2zh_next argument list.

    A non-empty ``PDF2ZH_ARGS`` environment variable is split shell-style and
    used as-is; otherwise a fresh copy of the module-level ``PDF2ZH_ARGS``
    defaults is returned.
    """
    override = os.environ.get("PDF2ZH_ARGS")
    return shlex.split(override) if override else list(PDF2ZH_ARGS)
def adjust_pdf2zh_args(args: list[str]) -> list[str]:
    """Drop the alternating-pages flag when ``--no-dual`` is requested.

    Returns *args* itself when ``--no-dual`` is absent; otherwise a new list
    without any ``--use-alternating-pages-dual`` entries.
    """
    if "--no-dual" not in args:
        return args
    dual_flag = "--use-alternating-pages-dual"
    return [item for item in args if item != dual_flag]
def strip_trailing_paren_index(stem: str) -> str:
    """Return *stem* without a trailing "(digits)" group and surrounding spaces."""
    trailing_index = re.compile(r"\s*\(\d+\)\s*$")
    return trailing_index.sub("", stem)
def normalize_base(stem: str, strip_paren_index: bool) -> str:
    """Return *stem*, with a trailing "(n)" index removed when requested."""
    if not strip_paren_index:
        return stem
    return strip_trailing_paren_index(stem)
def find_pdfs(directory: Path) -> Iterable[Path]:
    """Yield the ``*.pdf`` entries directly inside *directory*, sorted by path."""
    yield from sorted(directory.glob("*.pdf"))
def select_latest_by_base(
    paths: Iterable[Path], strip_paren_index: bool
) -> List[Tuple[Path, str, str]]:
    """Keep only the most recently modified file for each normalized stem.

    Returns ``(path, raw_stem, normalized_stem)`` tuples, ordered by the first
    appearance of each normalized stem. Paths that no longer exist are ignored.
    """
    newest: dict[str, Tuple[Path, float, str, str]] = {}
    for candidate in paths:
        raw_stem = candidate.stem
        clean_stem = normalize_base(raw_stem, strip_paren_index)
        try:
            modified = candidate.stat().st_mtime
        except FileNotFoundError:
            continue
        current = newest.get(clean_stem)
        # Strictly newer wins; on a tie the earlier entry is kept.
        if current is None or modified > current[1]:
            newest[clean_stem] = (candidate, modified, raw_stem, clean_stem)
    return [(entry[0], entry[2], entry[3]) for entry in newest.values()]
def is_port_open(port: int, host: str = "127.0.0.1", timeout: float = 0.3) -> bool:
    """Return True when a TCP connection to ``host:port`` can be established.

    Any ``OSError`` raised while connecting (refused, timed out, ...) is
    reported as ``False``.
    """
    address = (host, port)
    try:
        with socket.create_connection(address, timeout=timeout):
            pass
    except OSError:
        return False
    return True
def wait_for_port(port: int, timeout: float = 10.0) -> bool:
    """Poll until *port* accepts connections or *timeout* seconds elapse.

    The deadline is measured with ``time.monotonic()`` so that wall-clock
    adjustments (NTP corrections, manual changes) cannot shorten or extend
    the wait.

    Args:
        port: TCP port probed through ``is_port_open``.
        timeout: Maximum time to wait, in seconds. A value <= 0 returns
            ``False`` without probing.

    Returns:
        ``True`` as soon as the port is reachable, ``False`` on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_port_open(port):
            return True
        time.sleep(0.2)
    return False
class PlamoServerManager:
    """Start, supervise, and stop a local plamo-translate server.

    The server counts as "up" when ``is_port_open(port)`` succeeds. If it is
    not up, ``ensure_running`` launches ``<PLAMO_CLI_COMMAND> server`` in the
    background. An optional daemon thread periodically runs a tiny translation
    and restarts the server when that fails.

    Attributes (all set in ``__init__``):
        port: TCP port the server is expected to listen on.
        check_interval: Seconds between health checks.
        check_timeout: Seconds a single health-check translation may take.
        proc: ``Popen`` handle of the server started by this manager, if any.
        started_by_us: True while a server launched by this manager is
            considered active; only such a server is stopped on cleanup.
    """

    def __init__(
        self,
        port: int = 30000,
        check_interval: float = 60.0,
        check_timeout: float = 20.0,
    ) -> None:
        self.port = port
        self.check_interval = check_interval
        self.check_timeout = check_timeout
        self.proc: Optional[subprocess.Popen] = None
        self.started_by_us = False
        # Set to ask the health thread to stop; also blocks further restarts.
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        # Guards proc / started_by_us against the health thread.
        self._lock = threading.Lock()

    def ensure_running(self) -> bool:
        """Make sure a server is reachable, starting one if necessary.

        Returns:
            True when the port is reachable; False when a freshly started
            server did not come up within 15 + 60 seconds (in which case the
            started process is stopped again).
        """
        if is_port_open(self.port):
            print(f"plamo-translate server is already running on port {self.port}.")
            return True
        print(f"plamo-translate server not detected on port {self.port}.")
        print("Starting plamo-translate server in the background...")
        with self._lock:
            self.proc = self._start_process()
            self.started_by_us = True
        if not wait_for_port(self.port, timeout=15.0):
            print(
                "Waiting for plamo-translate server to start; will wait another 60 seconds."
            )
            if not wait_for_port(self.port, timeout=60.0):
                sys.stderr.write(
                    f"plamo-translate server did not start on port {self.port}.\n"
                )
                sys.stderr.write(
                    "Please start it manually with:\n  uvx plamo-translate server\n"
                )
                sys.stderr.write(
                    "Verify it starts successfully, then re-run this command.\n"
                )
                self.stop_if_started()
                return False
        return True

    def start_health_monitor(self) -> None:
        """Start the background health-check thread if it is not running."""
        if self._thread and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._health_loop, daemon=True)
        self._thread.start()

    def stop_health_monitor(self) -> None:
        """Ask the health thread to stop and wait up to 2 seconds for it."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=2)

    def stop_if_started(self) -> None:
        """Stop the server, but only if this manager launched it."""
        with self._lock:
            if not self.started_by_us:
                return
            self._stop_process(self.proc)
            self.proc = None
            self.started_by_us = False

    def restart(self, reason: str) -> None:
        """Kill whatever listens on the port and start a fresh server.

        Does nothing once ``stop_health_monitor`` has been called (until
        ``start_health_monitor`` is called again). Without this guard, a
        health check still in flight during shutdown could launch a new
        server after the caller already stopped the old one, leaving it
        orphaned.

        Args:
            reason: Sentence printed in front of the restart notice.
        """
        with self._lock:
            if self._stop_event.is_set():
                return
            print(f"{reason} Restarting plamo-translate server...")
            self._kill_port_processes()
            self.proc = self._start_process()
            self.started_by_us = True
        if not wait_for_port(self.port, timeout=30.0):
            sys.stderr.write(
                f"plamo-translate server did not start on port {self.port}.\n"
            )

    def _health_loop(self) -> None:
        """Run a health check every ``check_interval`` seconds until stopped."""
        while not self._stop_event.wait(self.check_interval):
            self._health_check_once()

    def _health_check_once(self) -> None:
        """Probe the port, then run a one-word translation; restart on failure."""
        if not is_port_open(self.port):
            self.restart("plamo-translate server is not reachable on the port.")
            return
        cmd = shlex.split(PLAMO_CLI_COMMAND) + [
            "--input",
            "hello",
            "--from",
            "English",
            "--to",
            "Japanese",
            "--no-stream",
        ]
        try:
            result = subprocess.run(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=self.check_timeout,
            )
        except subprocess.TimeoutExpired:
            # Report the configured limit instead of a hard-coded "20s".
            self.restart(
                f"plamo-translate health check timed out (>{self.check_timeout:g}s)."
            )
            return
        if result.returncode != 0:
            self.restart(
                f"plamo-translate health check failed (exit {result.returncode})."
            )

    def _start_process(self) -> subprocess.Popen:
        """Launch ``<PLAMO_CLI_COMMAND> server`` detached, with output discarded."""
        cmd = shlex.split(PLAMO_CLI_COMMAND) + ["server"]
        cmd_display = " ".join(shlex.quote(part) for part in cmd)
        print(f"Starting plamo-translate server with: {cmd_display}")
        return subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
        )

    def _stop_process(self, proc: Optional[subprocess.Popen]) -> None:
        """Terminate *proc* (SIGTERM, then SIGKILL after 5 s) and reap it.

        A ``None`` or already-exited process is ignored; ``poll()`` also
        reaps a child that has exited in the meantime.
        """
        if proc is None or proc.poll() is not None:
            return
        print("Stopping plamo-translate server...")
        proc.terminate()
        try:
            proc.wait(timeout=5)
            return
        except subprocess.TimeoutExpired:
            proc.kill()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # Best effort: report instead of crashing the caller's cleanup.
            sys.stderr.write(
                f"Warning: plamo-translate server (pid {proc.pid}) did not exit.\n"
            )

    def _kill_port_processes(self) -> None:
        """Terminate every process listening on the port, plus our own child.

        Listener PIDs come from ``lsof``; without ``lsof`` only the process
        started by this manager is stopped. Our own child always goes through
        ``_stop_process`` so that it is waited on: an un-reaped child stays
        visible to ``os.kill(pid, 0)`` and would look alive for the whole
        timeout.
        """
        lsof = shutil.which("lsof")
        if lsof is None:
            self._stop_process(self.proc)
            return
        result = subprocess.run(
            [lsof, "-n", "-P", f"-iTCP:{self.port}", "-sTCP:LISTEN", "-t"],
            capture_output=True,
            text=True,
        )
        own_pid = self.proc.pid if self.proc is not None else None
        for line in result.stdout.splitlines():
            try:
                pid = int(line.strip())
            except ValueError:
                # Blank or non-numeric line.
                continue
            if pid == own_pid:
                self._stop_process(self.proc)
            else:
                self._terminate_pid(pid)
        # Stop/reap the previous child even when it was not the listener
        # itself (e.g. a wrapper process), so it is not leaked on restart.
        self._stop_process(self.proc)

    def _terminate_pid(self, pid: int) -> None:
        """Send SIGTERM to *pid*, escalating to SIGKILL after 5 seconds.

        Missing processes and permission errors are silently ignored.
        """
        try:
            os.kill(pid, signal.SIGTERM)
        except (ProcessLookupError, PermissionError):
            return
        if self._wait_pid_exit(pid, timeout=5.0):
            return
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            return

    def _wait_pid_exit(self, pid: int, timeout: float) -> bool:
        """Return True if *pid* disappears within *timeout* seconds.

        Returns False on timeout or when the process cannot be signalled
        (``PermissionError``). Uses a monotonic deadline so clock changes do
        not affect the wait.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                os.kill(pid, 0)  # signal 0 only checks for existence
            except ProcessLookupError:
                return True
            except PermissionError:
                return False
            time.sleep(0.2)
        return False
def is_generated_output(name: str) -> bool:
    """Return True when *name* contains a marker used in translated-output names."""
    markers = (".ja.", ".translated", "no_watermark", ".mono.", ".dual.")
    for marker in markers:
        if marker in name:
            return True
    return False
def cleanup_extras(
    base_prefixes: Iterable[str],
    output_dir: Path,
    keep: Path,
    protected: Iterable[Path],
) -> None:
    """Delete leftover generated PDFs belonging to the given base names.

    A file in *output_dir* is removed only when all of the following hold:
      * its name starts with one of *base_prefixes* followed by ``"."``,
      * ``is_generated_output`` recognises the name,
      * it is neither *keep* nor one of the *protected* paths.

    The ``"."`` boundary keeps a short stem such as ``"a"`` from matching
    another document's output such as ``"ab.ja.pdf"``. Empty prefixes are
    ignored, because they would match every file in the directory.
    NOTE(review): this assumes generated files are named
    ``<stem>.<suffixes>.pdf``, which is consistent with the dotted markers
    checked by ``is_generated_output`` and with the ``<base>.ja...pdf`` names
    this script writes; confirm against pdf2zh_next's naming.

    Args:
        base_prefixes: Filename stems whose generated outputs may be removed.
        output_dir: Directory to clean (non-recursive).
        keep: The final translated file; never removed.
        protected: Additional paths that must never be removed.
    """
    prefixes = tuple(f"{prefix}." for prefix in base_prefixes if prefix)
    if not prefixes:
        return
    # Resolve the untouchable paths once instead of once per directory entry.
    spared = set()
    for path in (keep, *protected):
        try:
            spared.add(path.resolve())
        except FileNotFoundError:
            continue
    for path in output_dir.glob("*.pdf"):
        name = path.name
        if not name.startswith(prefixes):
            continue
        if not is_generated_output(name):
            continue
        try:
            if path.resolve() in spared:
                continue
        except FileNotFoundError:
            continue
        try:
            path.unlink()
        except OSError as exc:
            sys.stderr.write(f"Warning: failed to remove {path}: {exc}\n")
def make_title_slug(pdf_path: Path) -> Optional[str]:
    """Derive a filename-safe slug from the text on the first page of a PDF.

    The first five lines printed by ``pdftotext`` for page 1 are joined, and
    the longest run of ASCII letters, digits and spaces is taken, truncated to
    50 characters, with spaces replaced by underscores.

    Args:
        pdf_path: PDF to inspect.

    Returns:
        The slug, or ``None`` when ``pdftotext`` is unavailable, cannot be
        run, fails, or yields no usable text.
    """
    if shutil.which("pdftotext") is None:
        sys.stderr.write("Warning: pdftotext not found; skipping title extraction.\n")
        return None
    try:
        result = subprocess.run(
            ["pdftotext", "-f", "1", "-l", "1", str(pdf_path), "-"],
            capture_output=True,
            text=True,
            # Decode explicitly instead of with the locale encoding: bytes that
            # are not valid UTF-8 must not raise. Only ASCII runs are used
            # below, so replacement characters never reach the slug.
            encoding="utf-8",
            errors="replace",
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        sys.stderr.write(
            f"Warning: pdftotext failed ({exc.returncode}); skipping title extraction.\n"
        )
        return None
    except OSError as exc:
        # The executable vanished or is not runnable after the which() check.
        sys.stderr.write(
            f"Warning: could not run pdftotext ({exc}); skipping title extraction.\n"
        )
        return None
    lines = result.stdout.splitlines()[:5]
    text = re.sub(r"\s+", " ", " ".join(lines)).strip()
    if not text:
        return None
    matches: list[str] = re.findall(r"[A-Za-z0-9 ]+", text)
    if not matches:
        return None
    candidate = max(matches, key=len).strip()
    if not candidate:
        return None
    return candidate[:50].replace(" ", "_")
def pick_output_file(paths: Iterable[Path], stem: str) -> Optional[Path]:
    """Choose the best translated PDF among *paths*.

    Preference order: names containing "no_watermark", then names containing
    "dual", then names containing *stem*, then the most recently modified
    file. Non-PDF paths are ignored; returns ``None`` when no PDF is left.
    """
    candidates = [item for item in paths if item.suffix.lower() == ".pdf"]
    if not candidates:
        return None

    def rank(candidate: Path) -> tuple:
        # False sorts before True, so a "missing" marker ranks a file lower.
        filename = candidate.name
        return (
            "no_watermark" not in filename,
            "dual" not in filename,
            stem not in filename,
            -candidate.stat().st_mtime,
        )

    return min(candidates, key=rank)
def _pdf_fingerprints(directory: Path) -> dict[Path, Tuple[int, int]]:
    """Map each ``*.pdf`` in *directory* to its (mtime_ns, size) pair."""
    fingerprints: dict[Path, Tuple[int, int]] = {}
    for path in directory.glob("*.pdf"):
        try:
            info = path.stat()
        except OSError:
            continue
        fingerprints[path] = (info.st_mtime_ns, info.st_size)
    return fingerprints


def translate_pdf(
    pdf_path: Path,
    output_dir: Path,
    target_path: Path,
    base_prefix: str,
    base_clean: str,
    pdf2zh_args: list[str],
) -> bool:
    """Run pdf2zh_next on one PDF and move its output to *target_path*.

    Only PDFs that were created or modified in *output_dir* by this run are
    considered as the translation result, and the source PDF is never a
    candidate. Without that restriction a missing or unexpectedly named
    output could cause the source file, or another document's translation,
    to be moved onto *target_path*.

    Args:
        pdf_path: Source PDF.
        output_dir: Directory handed to pdf2zh_next via ``--output``.
        target_path: Final location of the translated PDF (overwritten).
        base_prefix: Raw stem of the source; used to clean up leftovers.
        base_clean: Normalized stem of the source; used to clean up leftovers.
        pdf2zh_args: Extra arguments for pdf2zh_next.

    Returns:
        True on success; False when pdf2zh_next fails, no fresh output is
        found, or the output cannot be moved.
    """
    cmd = shlex.split(PDF2ZH_COMMAND) + pdf2zh_args + ["--output", str(output_dir), str(pdf_path)]
    print(f"[translate] {pdf_path.name} -> {target_path.name}")
    # Snapshot the directory so files written by this run can be told apart.
    before = _pdf_fingerprints(output_dir)
    result = subprocess.run(cmd)
    if result.returncode != 0:
        sys.stderr.write(
            f"pdf2zh_next failed for {pdf_path.name} (exit {result.returncode}).\n"
        )
        return False
    source_resolved = pdf_path.resolve()
    fresh = [
        path
        for path, fingerprint in _pdf_fingerprints(output_dir).items()
        if before.get(path) != fingerprint and path.resolve() != source_resolved
    ]
    output_file = pick_output_file(fresh, pdf_path.stem)
    if output_file is None:
        sys.stderr.write(f"Translated output for {pdf_path.name} not found in {output_dir}.\n")
        return False
    try:
        output_file.replace(target_path)
    except OSError as exc:
        sys.stderr.write(f"Failed to move output {output_file} -> {target_path}: {exc}\n")
        return False
    cleanup_extras(
        [base_prefix, base_clean],
        output_dir,
        target_path,
        protected=[pdf_path],
    )
    return True
def main() -> int:
    """Run the CLI: collect inputs, translate each PDF, and print a summary.

    Inputs come from the command line or, when none are given, from scanning
    the auto-translate directory. In scan mode, files that look like outputs
    of an earlier run (per ``is_generated_output``) are not treated as
    inputs, so translations are not translated again on the next run.

    Returns:
        Exit status: 0 on success (including dry runs and "nothing to do");
        1 for invalid arguments, a server that cannot be started, or when at
        least one translation failed.
    """
    args = parse_args()
    print(
        "This CLI uses the PLaMo model; you must agree to the PLaMo license to proceed."
    )
    if args.output and args.output_dir:
        sys.stderr.write("--output and --output-dir cannot be used together.\n")
        return 1
    targets_input = [path.expanduser() for path in (args.pdf + args.target_pdf)]
    strip_paren_index = not args.no_strip_paren_index
    if targets_input:
        missing = [path for path in targets_input if not path.is_file()]
        if missing:
            sys.stderr.write("The following target PDFs were not found:\n")
            for path in missing:
                sys.stderr.write(f" - {path}\n")
            return 1
        targets: List[Tuple[Path, str, str]] = []
        for pdf_path in targets_input:
            base_raw = pdf_path.stem
            base_clean = normalize_base(base_raw, strip_paren_index)
            targets.append((pdf_path, base_raw, base_clean))
    else:
        if args.output:
            sys.stderr.write("--output requires exactly one input PDF.\n")
            return 1
        auto_dir = args.auto_translate_dir.expanduser()
        if not auto_dir.is_dir():
            sys.stderr.write(f"Auto-translate directory not found: {auto_dir}\n")
            return 1
        # Skip our own earlier outputs; explicitly named files are unaffected.
        candidates = [
            path for path in find_pdfs(auto_dir) if not is_generated_output(path.name)
        ]
        targets = select_latest_by_base(candidates, strip_paren_index)
    if not targets:
        print("No PDFs found to translate.")
        return 0
    if args.output and len(targets) != 1:
        sys.stderr.write("--output is only valid with a single input PDF.\n")
        return 1
    ensure_command_available(PDF2ZH_COMMAND)
    ensure_command_available(PLAMO_CLI_COMMAND)
    pdf2zh_args = adjust_pdf2zh_args(load_pdf2zh_args())
    translated: List[Tuple[Path, Path]] = []
    skipped: List[Tuple[Path, Path]] = []
    failed: List[Path] = []
    planned: List[Tuple[Path, Path, str]] = []
    server_manager = PlamoServerManager()

    def cleanup() -> None:
        # Registered with atexit as a safety net; stop_if_started is
        # idempotent, so the call in the finally block below is harmless.
        server_manager.stop_if_started()

    try:
        if not args.dry_run:
            if not server_manager.ensure_running():
                return 1
            server_manager.start_health_monitor()
            atexit.register(cleanup)
        for pdf_path, base_raw, base_clean in targets:
            if args.output:
                dest = args.output.expanduser()
                dest_dir = dest.parent
            else:
                dest_dir = args.output_dir.expanduser() if args.output_dir else pdf_path.parent
                title_slug = None if args.no_title_slug else make_title_slug(pdf_path)
                if title_slug:
                    dest_name = f"{base_clean}.ja.{title_slug}.pdf"
                else:
                    dest_name = f"{base_clean}.ja.pdf"
                dest = dest_dir / dest_name
            if not args.dry_run:
                # A dry run must not create directories.
                dest_dir.mkdir(parents=True, exist_ok=True)
            if dest.exists() and not args.force:
                print(f"[skip] {dest.name} already exists.")
                skipped.append((pdf_path, dest))
                continue
            if dest.exists():
                print(f"[retranslate] Overwriting {dest.name}.")
            cmd = shlex.split(PDF2ZH_COMMAND) + pdf2zh_args + [
                "--output",
                str(dest_dir),
                str(pdf_path),
            ]
            cmd_display = " ".join(shlex.quote(part) for part in cmd)
            if args.dry_run:
                print(f"[dry-run] {cmd_display}")
                planned.append((pdf_path, dest, cmd_display))
                continue
            success = translate_pdf(
                pdf_path,
                dest_dir,
                dest,
                base_raw,
                base_clean,
                pdf2zh_args,
            )
            if success:
                print(f"[done] {dest}")
                translated.append((pdf_path, dest))
            else:
                print(f"[fail] {pdf_path.name}")
                failed.append(pdf_path)
    finally:
        server_manager.stop_health_monitor()
        server_manager.stop_if_started()
    print("\nSummary:")
    if args.dry_run:
        if planned:
            print("Planned translations:")
            for src, out, cmd_display in planned:
                print(f" {src} -> {out}")
                print(f" {cmd_display}")
        if skipped:
            print("Skipped (already exists):")
            for src, out in skipped:
                print(f" {src} -> {out}")
        return 0
    if translated:
        print("Translated:")
        for src, out in translated:
            print(f" {src} -> {out}")
    if skipped:
        print("Skipped (already exists):")
        for src, out in skipped:
            print(f" {src} -> {out}")
    if failed:
        print("Failed:")
        for src in failed:
            print(f" {src}")
    # Report failures through the exit status so callers can detect them.
    return 1 if failed else 0
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment