Skip to content

Instantly share code, notes, and snippets.

@relic-yuexi
Created January 21, 2026 09:42
Show Gist options
  • Select an option

  • Save relic-yuexi/f34672b01d3e07d886f9a625f2558ed3 to your computer and use it in GitHub Desktop.

Select an option

Save relic-yuexi/f34672b01d3e07d886f9a625f2558ed3 to your computer and use it in GitHub Desktop.
Convert PDF to Markdown based on HunyuanOCR
import os
import re
import argparse
from pathlib import Path
os.environ['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn'
from PIL import Image
from pdf2image import convert_from_path
from transformers import AutoProcessor
import torch.distributed as dist
def clean_repeated_substrings(text, min_length=8000, min_repeats=10):
    """Collapse a back-to-back repeated run at the very end of ``text``.

    Candidate units are the trailing substrings of length 2, 3, ... up to
    ``len(text) // min_repeats``.  For each candidate the function counts
    how many consecutive copies of it terminate the text.  The first
    (shortest) candidate that occurs at least ``min_repeats`` times wins,
    and every copy except one is removed.

    Args:
        text: The string to inspect.
        min_length: Texts shorter than this are returned untouched.
        min_repeats: Minimum number of consecutive copies of a unit at the
            end of the text required before the run is collapsed.

    Returns:
        ``text`` with the trailing run reduced to a single copy of the
        repeated unit, or ``text`` unchanged when no such run is found.

    Raises:
        ValueError: If ``min_repeats`` is smaller than 1.
    """
    if min_repeats < 1:
        raise ValueError("min_repeats must be at least 1")

    n = len(text)
    if n < min_length:
        return text

    # A unit of size ``length`` can only repeat ``min_repeats`` times when
    # ``length * min_repeats <= n``, which bounds the search range.
    for length in range(2, n // min_repeats + 1):
        candidate = text[-length:]
        count = 0
        i = n - length
        # Walk backwards one unit at a time while the unit keeps matching.
        while i >= 0 and text[i:i + length] == candidate:
            count += 1
            i -= length
        if count >= min_repeats:
            # Drop ``count - 1`` copies so exactly one copy remains.
            return text[:n - length * (count - 1)]
    return text
class PDF2MD:
    """Convert a PDF file to Markdown with an OCR model served through vLLM.

    The pipeline implemented by :meth:`convert` is: render every PDF page to
    a PNG, run OCR on each page, cut out the regions the model reports as
    ``(x1, y1), (x2, y2)`` coordinate pairs, replace those coordinates with
    Markdown image links, and write all pages into one Markdown file.
    """

    def __init__(self, model_path="tencent/HunyuanOCR"):
        """Store configuration only; the model is loaded by :meth:`init_model`.

        Args:
            model_path: Passed as ``model`` to ``vllm.LLM`` and to
                ``AutoProcessor.from_pretrained``.
        """
        self.model_path = model_path
        self.llm = None
        self.processor = None
        self.sampling_params = None
        # Regex matching a coordinate pair: (x1, y1), (x2, y2)
        self.coord_pattern = r'\(\s*(\d+)\s*,\s*(\d+)\s*\)\s*,\s*\(\s*(\d+)\s*,\s*(\d+)\s*\)'

    def init_model(self):
        """Create the vLLM engine, the processor and the sampling parameters."""
        # Imported lazily; presumably so the VLLM_WORKER_MULTIPROC_METHOD
        # environment variable set at module import time is already in place
        # -- TODO confirm.
        from vllm import LLM, SamplingParams

        print("正在加载模型...")
        self.llm = LLM(
            model=self.model_path,
            trust_remote_code=True,
            mm_processor_cache_gb=0,
            enable_prefix_caching=False,
        )
        self.processor = AutoProcessor.from_pretrained(self.model_path)
        self.sampling_params = SamplingParams(temperature=0, max_tokens=16384)
        print("模型加载完成")

    def cleanup(self):
        """Drop the engine reference and destroy the torch process group."""
        # Rebinding to None releases this object's reference to the engine.
        self.llm = None
        if dist.is_initialized():
            dist.destroy_process_group()

    def pdf_to_images(self, pdf_path, output_dir, dpi=300):
        """Render every page of the PDF and save it as a PNG.

        Args:
            pdf_path: Path of the PDF handed to ``convert_from_path``.
            output_dir: Directory under which a ``pages`` sub-directory is
                created; accepts ``str`` or ``Path``.
            dpi: Rendering resolution forwarded to ``convert_from_path``.

        Returns:
            A tuple ``(image_paths, images)``: the saved PNG paths and the
            in-memory page images, in page order.
        """
        pages_dir = Path(output_dir) / "pages"
        pages_dir.mkdir(parents=True, exist_ok=True)
        print(f"正在将PDF转换为图片 (DPI={dpi})...")
        images = convert_from_path(pdf_path, dpi=dpi)
        image_paths = []
        for i, img in enumerate(images):
            # File names use the zero-based page index.
            img_path = pages_dir / f"page_{i:03d}.png"
            img.save(img_path, "PNG")
            image_paths.append(img_path)
            print(f" 保存页面 {i+1}/{len(images)}: {img_path.name}")
        return image_paths, images

    def ocr_image(self, img_path, img):
        """Run OCR on one page image and return the generated text.

        Args:
            img_path: Path of the page image; its string form is placed in
                the chat message.
            img: The in-memory image passed to the engine as multi-modal data.

        Returns:
            The model output after :func:`clean_repeated_substrings`.
        """
        messages = [
            {"role": "system", "content": ""},
            {"role": "user", "content": [
                {"type": "image", "image": str(img_path)},
                {"type": "text", "text": (
                    "提取文档图片中正文的所有信息用markdown格式表示,"
                    "其中页眉、页脚部分忽略,表格用html格式表达,"
                    "文档中公式用latex格式表示,按照阅读顺序组织进行解析。"
                )},
            ]},
        ]
        prompt = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        inputs = {"prompt": prompt, "multi_modal_data": {"image": [img]}}
        output = self.llm.generate([inputs], self.sampling_params)[0]
        return clean_repeated_substrings(output.outputs[0].text)

    def extract_crops_and_replace_md(self, text, image, page_idx, crop_dir, rel_crop_dir):
        """Crop every coordinate pair found in ``text`` and link the crops.

        Each ``(x1, y1), (x2, y2)`` match is treated as a box on a 0-1000
        scale, converted to pixels of ``image``, cropped, saved as a PNG in
        ``crop_dir`` and replaced in the text by a Markdown image link.

        Args:
            text: OCR output that may contain coordinate pairs.
            image: Page image providing ``size`` and ``crop``.
            page_idx: Page index used in the crop file names.
            crop_dir: Directory the crops are written to (``str`` or ``Path``).
            rel_crop_dir: Directory prefix used inside the Markdown links.

        Returns:
            A tuple ``(new_text, crop_count)``.
        """
        img_width, img_height = image.size
        crop_dir = Path(crop_dir)
        crop_count = 0

        def replace_match(match):
            nonlocal crop_count
            try:
                x1, y1, x2, y2 = map(int, match.groups())
                # Scale from the 0-1000 coordinate system to integer pixels
                # and clamp to the image, so the emptiness check below sees
                # exactly the box that will be cropped.
                left = max(0, round(x1 * img_width / 1000))
                top = max(0, round(y1 * img_height / 1000))
                right = min(img_width, round(x2 * img_width / 1000))
                bottom = min(img_height, round(y2 * img_height / 1000))
                if right <= left or bottom <= top:
                    # Empty or inverted box: drop the coordinates entirely.
                    return ""
                crop_img = image.crop((left, top, right, bottom))
                crop_filename = f"page_{page_idx:03d}_crop_{crop_count:03d}.png"
                crop_img.save(crop_dir / crop_filename, "PNG")
                crop_count += 1
                return f"![图片]({rel_crop_dir}/{crop_filename})"
            except Exception as e:
                # Deliberate best effort: a failed crop must not abort the
                # whole page, so warn and keep the original text.
                print(f" 警告: 裁剪图片失败 - {e}")
                return match.group(0)

        result_text = re.sub(self.coord_pattern, replace_match, text)
        return result_text, crop_count

    def convert(self, pdf_path, output_dir, dpi=300):
        """Run the full PDF-to-Markdown conversion.

        Args:
            pdf_path: Input PDF file.
            output_dir: Directory receiving ``pages/``, ``crops/`` and the
                Markdown file named after the PDF stem.
            dpi: Resolution used when rendering the PDF pages.

        Returns:
            Path of the written Markdown file.

        Raises:
            FileNotFoundError: If ``pdf_path`` does not exist.
        """
        pdf_path = Path(pdf_path)
        output_dir = Path(output_dir)
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")

        output_dir.mkdir(parents=True, exist_ok=True)
        crop_dir = output_dir / "crops"
        crop_dir.mkdir(exist_ok=True)

        try:
            # Loading happens inside the try block so that cleanup() also
            # runs when initialization fails part-way through.
            self.init_model()

            # 1. Render the PDF pages to images.
            image_paths, images = self.pdf_to_images(pdf_path, output_dir, dpi=dpi)

            # 2. OCR every page and post-process its text.
            all_md_content = []
            total_crops = 0
            for page_idx, (img_path, img) in enumerate(zip(image_paths, images)):
                print(f"\n处理第 {page_idx + 1}/{len(images)} 页...")
                print(" 正在OCR识别...")
                md_text = self.ocr_image(img_path, img)
                print(" 正在提取图片坐标...")
                processed_text, crop_count = self.extract_crops_and_replace_md(
                    md_text, img, page_idx, crop_dir, "crops"
                )
                total_crops += crop_count
                print(f" 提取了 {crop_count} 张图片")
                # Mark the page boundary with an HTML comment.
                page_header = f"\n\n<!-- Page {page_idx + 1} -->\n\n"
                all_md_content.append(page_header + processed_text)

            # 3. Merge all pages and write the Markdown file.
            md_path = output_dir / (pdf_path.stem + ".md")
            with open(md_path, "w", encoding="utf-8") as f:
                f.write("\n".join(all_md_content))

            print("\n转换完成!")
            print(f" Markdown文件: {md_path}")
            print(f" 页面图片目录: {output_dir / 'pages'}")
            print(f" 裁剪图片目录: {crop_dir}")
            print(f" 总共提取图片: {total_crops} 张")
            return md_path
        finally:
            self.cleanup()
def main(argv=None):
    """Command-line entry point: parse the arguments and convert one PDF.

    Args:
        argv: Argument list to parse.  ``None`` (the default) lets argparse
            read the process command line.
    """
    parser = argparse.ArgumentParser(description="PDF转Markdown工具")
    parser.add_argument("pdf_path", help="输入PDF文件路径")
    parser.add_argument("output_dir", help="输出目录路径")
    parser.add_argument(
        "--model-path",
        default="tencent/HunyuanOCR",
        help="HunyuanOCR模型路径",
    )
    args = parser.parse_args(argv)

    converter = PDF2MD(model_path=args.model_path)
    converter.convert(args.pdf_path, args.output_dir)
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment