Skip to content

Instantly share code, notes, and snippets.

@dhruvilp
Created October 21, 2025 03:56
Show Gist options
  • Select an option

  • Save dhruvilp/dc8502d3e4c36c50af744a7ae46dc444 to your computer and use it in GitHub Desktop.

Select an option

Save dhruvilp/dc8502d3e4c36c50af744a7ae46dc444 to your computer and use it in GitHub Desktop.
docling plain parallel processing

Granite Docling Document Converter

A high-performance, parallel-processing library for converting documents to Markdown, JSON, and DocTags using the Granite Docling model. No FastAPI, Flask, or web frameworks required - pure Python library with sync and async support.

🚀 Features

  • No Web Framework Required: Pure Python library - use it directly in your code
  • Parallel Processing: Process large PDFs with multiple workers for maximum speed
  • Async Support: Full async/await support for non-blocking operations
  • Multiple Output Formats: Convert to Markdown, JSON, DocTags
  • Streaming Responses: Get results as they're generated
  • Bounding Box Visualization: Automatic detection and annotation of document elements
  • Memory Efficient: Stream processing for extremely large documents
  • Resumable Processing: Save checkpoints and resume interrupted jobs
  • Local Model Loading: Load models from local path (no HuggingFace download)

📦 Installation

pip install torch transformers Pillow numpy docling-core PyMuPDF tqdm

Or use the requirements file:

pip install -r requirements.txt

🎯 Quick Start

Basic Usage

from converter import DocumentConverter

# Initialize converter with local model path
converter = DocumentConverter("/path/to/granite-docling-258M")

# Convert document to markdown
result = converter.convert_to_markdown("document.png")
if result["success"]:
    print(result["content"])
    
# Query a document
result = converter.query_document(
    "document.png", 
    "What is the main topic of this document?"
)
print(f"Answer: {result['answer']}")

Parallel PDF Processing

from parallel_processor import ParallelPDFProcessor, OutputFormat

# Initialize with 8 workers
processor = ParallelPDFProcessor(
    model_path="/path/to/granite-docling-258M",
    max_workers=8,
    dpi=200
)

# Process entire PDF in parallel
results = processor.process_pdf_parallel(
    pdf_path="large_document.pdf",
    output_format=OutputFormat.MARKDOWN
)

# Save results
processor.save_results(
    results=results,
    output_path="output/document.md",
    output_format=OutputFormat.MARKDOWN
)

📊 Performance Comparison

Processing a 100-page PDF on a system with 8 cores:

Method Time Throughput Speedup
Sequential 500s 0.2 pages/s 1x
Parallel (4 workers) 150s 0.67 pages/s 3.3x
Parallel (8 workers) 85s 1.18 pages/s 5.9x

🔧 Core Components

1. DocumentConverter

Main class for document conversion operations.

from converter import DocumentConverter

converter = DocumentConverter("/path/to/model")

# Synchronous methods
converter.convert_to_markdown(image)
converter.convert_to_json(image)
converter.convert_to_doctags(image)
converter.query_document(image, question)
converter.convert_with_bounding_boxes(image)

# Async methods
await converter.convert_to_markdown_async(image)
await converter.convert_to_json_async(image)
await converter.query_document_async(image, question)

# Streaming
for chunk in converter.generate_response_streaming(question, image):
    print(chunk, end='', flush=True)

2. ParallelPDFProcessor

High-performance parallel processing for PDFs.

from parallel_processor import ParallelPDFProcessor, OutputFormat

processor = ParallelPDFProcessor(
    model_path="/path/to/model",
    max_workers=8,  # Number of parallel workers
    dpi=200         # Resolution for PDF conversion
)

# Process entire PDF
results = processor.process_pdf_parallel(
    pdf_path="document.pdf",
    output_format=OutputFormat.MARKDOWN,
    start_page=0,   # Optional: start from specific page
    end_page=99     # Optional: end at specific page
)

# Process with async/await
results = await processor.process_pdf_async(
    pdf_path="document.pdf",
    batch_size=10   # Process 10 pages concurrently
)

💡 Advanced Examples

Example 1: Process Large PDF (500+ pages)

import os
from parallel_processor import ParallelPDFProcessor, OutputFormat

# Auto-detect optimal worker count
cpu_count = os.cpu_count() or 4
optimal_workers = max(1, cpu_count - 1)

processor = ParallelPDFProcessor(
    model_path="/path/to/granite-docling-258M",
    max_workers=optimal_workers,
    dpi=200
)

# Process large PDF
results = processor.process_pdf_parallel(
    pdf_path="large_report_500_pages.pdf",
    output_format=OutputFormat.MARKDOWN
)

processor.save_results(results, "output/report.md", OutputFormat.MARKDOWN)

Example 2: Chunked Processing for Massive PDFs (1000+ pages)

from parallel_processor import ParallelPDFProcessor, OutputFormat
import fitz

processor = ParallelPDFProcessor(
    model_path="/path/to/granite-docling-258M",
    max_workers=8,
    dpi=150  # Lower DPI for very large files
)

# Get total page count
doc = fitz.open("massive_document.pdf")
total_pages = len(doc)
doc.close()

# Process in chunks of 100 pages
chunk_size = 100
all_results = []

for chunk_start in range(0, total_pages, chunk_size):
    chunk_end = min(chunk_start + chunk_size - 1, total_pages - 1)
    
    print(f"Processing pages {chunk_start+1}-{chunk_end+1}")
    
    results = processor.process_pdf_parallel(
        pdf_path="massive_document.pdf",
        start_page=chunk_start,
        end_page=chunk_end
    )
    
    all_results.extend(results)
    
    # Save intermediate results
    processor.save_results(
        results=results,
        output_path=f"output/chunk_{chunk_start+1}_{chunk_end+1}.md",
        output_format=OutputFormat.MARKDOWN
    )

# Combine all chunks
processor.save_results(
    all_results, 
    "output/complete_document.md", 
    OutputFormat.MARKDOWN
)

Example 3: Process Multiple PDFs Concurrently

import asyncio
from parallel_processor import ParallelPDFProcessor, OutputFormat
from pathlib import Path

async def process_multiple_pdfs(pdf_paths):
    async def process_one(pdf_path):
        processor = ParallelPDFProcessor(
            model_path="/path/to/granite-docling-258M",
            max_workers=4
        )
        
        loop = asyncio.get_event_loop()
        results = await loop.run_in_executor(
            None,
            processor.process_pdf_parallel,
            pdf_path,
            OutputFormat.MARKDOWN
        )
        
        output = f"output/{Path(pdf_path).stem}.md"
        processor.save_results(results, output, OutputFormat.MARKDOWN)
        
        return {
            "pdf": pdf_path,
            "pages": len(results),
            "successful": sum(1 for r in results if r.success)
        }
    
    tasks = [process_one(pdf) for pdf in pdf_paths]
    return await asyncio.gather(*tasks)

# Process 5 PDFs concurrently
pdf_files = ["doc1.pdf", "doc2.pdf", "doc3.pdf", "doc4.pdf", "doc5.pdf"]
results = asyncio.run(process_multiple_pdfs(pdf_files))

for r in results:
    print(f"{r['pdf']}: {r['successful']}/{r['pages']} pages")

Example 4: Resume Interrupted Processing

import json
import os
from parallel_processor import ParallelPDFProcessor, OutputFormat, PageResult
from pathlib import Path

checkpoint_file = "processing_checkpoint.json"
output_dir = Path("output/resume_example")
output_dir.mkdir(parents=True, exist_ok=True)

processor = ParallelPDFProcessor(
    model_path="/path/to/granite-docling-258M",
    max_workers=4
)

# Load checkpoint if exists
completed_pages = set()
if os.path.exists(checkpoint_file):
    with open(checkpoint_file, 'r') as f:
        completed_pages = set(json.load(f)['completed_pages'])
    print(f"Resuming from page {len(completed_pages) + 1}")

# Process remaining pages
images = processor.pdf_to_images("mixed_content.pdf")

for page_num, image in enumerate(images):
    # Detect page type
    detection = converter.query_document(
        image,
        "What type of content is on this page? Table, chart, code, or text?"
    )
    
    page_type = detection['answer'].lower()
    
    # Choose appropriate prompt
    if 'table' in page_type:
        prompt = "Convert this table to OTSL."
    elif 'chart' in page_type or 'graph' in page_type:
        prompt = "Convert chart to OTSL."
    elif 'code' in page_type:
        prompt = "Convert code to text."
    else:
        prompt = "Convert this page to docling."
    
    print(f"Page {page_num + 1}: {page_type} -> {prompt}")
    result = converter.convert_to_markdown(image, prompt)
    # Process result...

Example 7: Smart Hardware-Optimized Processing

import torch
from parallel_processor import ParallelPDFProcessor, OutputFormat

# Auto-detect hardware and optimize settings
has_gpu = torch.cuda.is_available()
gpu_memory = torch.cuda.get_device_properties(0).total_memory if has_gpu else 0

if has_gpu:
    if gpu_memory > 16 * 1024**3:  # > 16GB GPU
        max_workers = 8
        dpi = 300
    elif gpu_memory > 8 * 1024**3:  # > 8GB GPU
        max_workers = 4
        dpi = 200
    else:  # < 8GB GPU
        max_workers = 2
        dpi = 150
else:  # CPU only
    max_workers = 2
    dpi = 150

print(f"GPU: {'Yes' if has_gpu else 'No'}")
if has_gpu:
    print(f"GPU Memory: {gpu_memory / 1024**3:.1f}GB")
print(f"Workers: {max_workers}, DPI: {dpi}")

processor = ParallelPDFProcessor(
    model_path="/path/to/granite-docling-258M",
    max_workers=max_workers,
    dpi=dpi
)

results = processor.process_pdf_parallel(
    pdf_path="document.pdf",
    output_format=OutputFormat.MARKDOWN
)

Example 8: Benchmark Different Configurations

import itertools
from parallel_processor import ParallelPDFProcessor, OutputFormat

worker_counts = [1, 2, 4, 8]
dpi_values = [150, 200, 300]

print(f"{'Workers':<10}{'DPI':<10}{'Time (s)':<12}{'Pages/s':<12}")
print("-" * 50)

best_config = None
best_throughput = 0

for workers, dpi in itertools.product(worker_counts, dpi_values):
    processor = ParallelPDFProcessor(
        model_path="/path/to/granite-docling-258M",
        max_workers=workers,
        dpi=dpi
    )
    
    import time
    start = time.time()
    results = processor.process_pdf_parallel(
        pdf_path="benchmark.pdf",
        output_format=OutputFormat.MARKDOWN,
        start_page=0,
        end_page=9  # Test first 10 pages
    )
    elapsed = time.time() - start
    throughput = len(results) / elapsed
    
    print(f"{workers:<10}{dpi:<10}{elapsed:<12.2f}{throughput:<12.2f}")
    
    if throughput > best_throughput:
        best_throughput = throughput
        best_config = {'workers': workers, 'dpi': dpi}

print(f"\nBest: {best_config['workers']} workers at {best_config['dpi']} DPI")
print(f"Throughput: {best_throughput:.2f} pages/s")

🎨 Output Formats

Markdown

result = converter.convert_to_markdown("document.png")
# Returns: {'success': True, 'format': 'markdown', 'content': '# Title\n\n...'}

JSON (Structured Document)

result = converter.convert_to_json("document.png")
# Returns: {'success': True, 'format': 'json', 'content': {...}}

DocTags (XML-based)

result = converter.convert_to_doctags("document.png")
# Returns: {'success': True, 'format': 'doctags', 'content': '<doctag>...'}

With Bounding Boxes

result = converter.convert_with_bounding_boxes("document.png", return_base64=True)
# Returns: {
#   'success': True,
#   'content': '<doctag>...',
#   'annotated_image': 'base64_string...',
#   'has_bounding_boxes': True
# }

🔍 Supported Document Types

  • Full Documents: PDF pages, scanned documents, reports
  • Tables: Extract to OTSL format or markdown tables
  • Mathematical Formulas: Convert to LaTeX
  • Code: Extract code from screenshots
  • Charts & Graphs: Extract chart data
  • Multi-language: Arabic, Japanese, Chinese, and more
  • Mixed Content: Documents with tables, images, text, and code

📝 Common Prompts

# Document conversion
"Convert this page to docling."

# Table extraction
"Convert this table to OTSL."

# Formula recognition
"Convert formula to latex."

# Code extraction
"Convert code to text."

# Chart extraction
"Convert chart to OTSL."

# Image description
"Describe this image."

# Specific questions
"What is the title of this document?"
"Does the document contain tables?"
"Extract the 2nd section header."
"What element is located at <loc_84><loc_403><loc_238><loc_419>"

⚙️ Configuration Options

DocumentConverter

converter = DocumentConverter(
    model_path="/path/to/granite-docling-258M"  # Local path to model
)

ParallelPDFProcessor

processor = ParallelPDFProcessor(
    model_path="/path/to/granite-docling-258M",  # Local path to model
    max_workers=8,        # Number of parallel workers (None = auto)
    dpi=200              # Resolution for PDF to image conversion
)

🚦 Error Handling

All methods return dictionaries with a success field:

result = converter.convert_to_markdown("document.png")

if result["success"]:
    # Process successful result
    markdown_content = result["content"]
    print(markdown_content)
else:
    # Handle error
    print(f"Error: {result['message']}")
    # Raw output may still be available
    if result["raw_output"]:
        print(f"Raw output: {result['raw_output']}")

📈 Performance Tips

  1. Worker Count: Set to CPU cores - 1 for optimal performance
  2. DPI Settings:
    • 150 DPI: Fast processing, lower quality
    • 200 DPI: Balanced (recommended)
    • 300 DPI: High quality, slower
  3. Batch Processing: Use chunking for very large PDFs (1000+ pages)
  4. Memory Management: Use streaming for extremely large documents
  5. GPU Usage: Automatically detects and uses GPU if available
  6. Async Operations: Use async methods for I/O-bound operations

🔧 Command-Line Usage

Basic Conversion

python converter.py /path/to/model document.png

Parallel PDF Processing

python parallel_processor.py /path/to/model document.pdf markdown 8

Advanced Examples

python advanced_examples.py
# Then select from menu:
# 1. Large PDF with optimal workers
# 2. Chunked processing for massive PDFs
# 3. Hybrid async + parallel
# etc.

🐛 Troubleshooting

Out of Memory Errors

  • Reduce DPI: Use 150 instead of 200 or 300
  • Reduce workers: Use fewer parallel workers
  • Use chunked processing: Process in smaller batches
  • Use streaming: Process one page at a time

Slow Processing

  • Increase workers: Match your CPU core count
  • Check GPU: Ensure GPU is detected and used
  • Optimize DPI: 200 DPI is usually sufficient
  • Use parallel processing: Don't process sequentially

Model Loading Issues

  • Ensure model path is correct
  • Check model files exist locally
  • Verify sufficient disk space
  • Use local_files_only=True (already set)

📚 API Reference

DocumentConverter Methods

# Synchronous
convert_to_markdown(image, prompt) -> Dict
convert_to_json(image, prompt) -> Dict
convert_to_doctags(image, prompt) -> Dict
convert_with_bounding_boxes(image, prompt, return_base64) -> Dict
query_document(image, question) -> Dict
generate_response(question, image) -> str

# Asynchronous
convert_to_markdown_async(image, prompt) -> Dict
convert_to_json_async(image, prompt) -> Dict
convert_to_doctags_async(image, prompt) -> Dict
convert_with_bounding_boxes_async(image, prompt, return_base64) -> Dict
query_document_async(image, question) -> Dict
generate_response_async(question, image) -> str

# Streaming
generate_response_streaming(question, image) -> Generator[str]

# Utility
clean_model_response(text) -> str
draw_bounding_boxes(image, response_text, is_doctag_response) -> Image
image_to_base64(image) -> str
base64_to_image(base64_str) -> Image

ParallelPDFProcessor Methods

# Processing
process_pdf_parallel(pdf_path, output_format, prompt, start_page, end_page) -> List[PageResult]
process_pdf_async(pdf_path, output_format, prompt, start_page, end_page, batch_size) -> List[PageResult]
process_page(args) -> PageResult

# Utilities
pdf_to_images(pdf_path) -> List[Image]
save_results(results, output_path, output_format) -> None

🏗️ Project Structure

.
├── converter.py              # Main DocumentConverter class
├── parallel_processor.py     # ParallelPDFProcessor for PDFs
├── advanced_examples.py      # Advanced usage examples
├── usage_examples.py         # Basic usage examples
├── requirements.txt          # Python dependencies
└── output/                   # Output directory (auto-created)

📄 License

This code is provided as-is for use with the Granite Docling model.

🤝 Contributing

Feel free to extend and modify for your specific needs. Common extensions:

  • Add support for other input formats (Word, Excel, etc.)
  • Implement custom post-processing pipelines
  • Add support for distributed processing across multiple machines
  • Integrate with databases or cloud storage

💬 Support

For issues with:

  • Model: Check Granite Docling documentation
  • Dependencies: Verify PyTorch, Transformers versions
  • Performance: See Performance Tips section above

🎯 Use Cases

  1. Document Digitization: Convert scanned PDFs to searchable markdown
  2. Data Extraction: Extract tables and charts from reports
  3. Knowledge Base: Convert documentation to structured formats
  4. Academic Research: Extract formulas and citations from papers
  5. Code Documentation: Extract code from screenshots/images
  6. Multi-language Processing: Handle documents in various languages
  7. Batch Processing: Process thousands of documents overnight

⚡ Quick Tips

# Tip 1: Process specific pages only
results = processor.process_pdf_parallel(
    "document.pdf",
    start_page=10,
    end_page=20
)

# Tip 2: Use PIL Image directly (skip file I/O)
from PIL import Image
img = Image.open("doc.png")
result = converter.convert_to_markdown(img)

# Tip 3: Stream for real-time feedback
for chunk in converter.generate_response_streaming("Convert this", image):
    print(chunk, end='', flush=True)

# Tip 4: Save intermediate results
for i in range(0, total_pages, 100):
    results = processor.process_pdf_parallel(
        pdf, start_page=i, end_page=i+99
    )
    processor.save_results(results, f"output/chunk_{i}.md", OutputFormat.MARKDOWN)

Ready to process documents at scale! 🚀 = processor.pdf_to_images("large_document.pdf")

for page_num, image in enumerate(images): if page_num in completed_pages: continue

try:
    result = processor.process_page((
        page_num, image, OutputFormat.MARKDOWN, "Convert this page to docling."
    ))
    
    # Save individual page
    page_output = output_dir / f"page_{page_num + 1:04d}.md"
    if result.success:
        with open(page_output, 'w') as f:
            f.write(result.content)
    
    # Update checkpoint
    completed_pages.add(page_num)
    with open(checkpoint_file, 'w') as f:
        json.dump({'completed_pages': list(completed_pages)}, f)

except KeyboardInterrupt:
    print(f"\nInterrupted! Resume with: python script.py")
    break

### Example 5: Memory-Efficient Streaming (for 10,000+ page PDFs)

```python
import fitz
from converter import DocumentConverter
from PIL import Image

converter = DocumentConverter("/path/to/granite-docling-258M")
pdf_path = "extremely_large_document.pdf"
output_file = "output/streamed_output.md"

doc = fitz.open(pdf_path)
total_pages = len(doc)

with open(output_file, 'w') as out_file:
    out_file.write(f"# Document Conversion ({total_pages} pages)\n\n")
    
    for page_num in range(total_pages):
        print(f"Processing page {page_num + 1}/{total_pages}...", end='')
        
        # Extract single page
        page = doc[page_num]
        mat = fitz.Matrix(200 / 72, 200 / 72)
        pix = page.get_pixmap(matrix=mat)
        image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        
        # Convert and write immediately
        result = converter.convert_to_markdown(image)
        
        if result['success']:
            out_file.write(f"## Page {page_num + 1}\n\n{result['content']}\n\n---\n\n")
            out_file.flush()
            print(" ✓")
        else:
            print(" ✗")
        
        # Clear from memory
        del image, pix

doc.close()
print(f"\n✓ Saved to {output_file}")

Example 6: Custom Prompts Based on Page Type

from converter import DocumentConverter
from parallel_processor import ParallelPDFProcessor

converter = DocumentConverter("/path/to/granite-docling-258M")
processor = ParallelPDFProcessor(model_path="/path/to/granite-docling-258M", max_workers=1)

images
"""Advanced examples for parallel PDF processing."""
import asyncio
from pathlib import Path
from typing import List
import time
from parallel_processor import ParallelPDFProcessor, OutputFormat, PageResult
from converter import DocumentConverter
def example_large_pdf_optimal_workers():
"""
Process large PDF with optimal number of workers.
Automatically determines best worker count based on CPU cores.
"""
print("=== LARGE PDF PROCESSING (AUTO-OPTIMIZED) ===\n")
import os
# Determine optimal worker count
cpu_count = os.cpu_count() or 4
optimal_workers = max(1, cpu_count - 1) # Leave 1 core free
print(f"Detected {cpu_count} CPU cores")
print(f"Using {optimal_workers} workers for optimal performance\n")
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=optimal_workers,
dpi=200
)
# Process large PDF (e.g., 500+ pages)
results = processor.process_pdf_parallel(
pdf_path="large_report_500_pages.pdf",
output_format=OutputFormat.MARKDOWN,
prompt="Convert this page to docling."
)
# Save results
processor.save_results(
results=results,
output_path="output/large_report.md",
output_format=OutputFormat.MARKDOWN
)
# Print failed pages if any
failed_pages = [r for r in results if not r.success]
if failed_pages:
print(f"\nFailed pages ({len(failed_pages)}):")
for r in failed_pages:
print(f" Page {r.page_number + 1}: {r.error}")
def example_chunked_processing():
"""
Process PDF in chunks to manage memory for very large files.
Useful for PDFs with 1000+ pages.
"""
print("=== CHUNKED PROCESSING FOR MASSIVE PDFs ===\n")
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=8,
dpi=150 # Lower DPI for very large files
)
pdf_path = "massive_document_1000_pages.pdf"
chunk_size = 100 # Process 100 pages at a time
# Get total page count
import fitz
doc = fitz.open(pdf_path)
total_pages = len(doc)
doc.close()
print(f"Total pages: {total_pages}")
print(f"Chunk size: {chunk_size}\n")
all_results = []
for chunk_start in range(0, total_pages, chunk_size):
chunk_end = min(chunk_start + chunk_size - 1, total_pages - 1)
print(f"\nProcessing chunk: pages {chunk_start+1}-{chunk_end+1}")
results = processor.process_pdf_parallel(
pdf_path=pdf_path,
output_format=OutputFormat.MARKDOWN,
start_page=chunk_start,
end_page=chunk_end
)
all_results.extend(results)
# Save intermediate results
output_path = f"output/chunk_{chunk_start+1}_{chunk_end+1}.md"
processor.save_results(
results=results,
output_path=output_path,
output_format=OutputFormat.MARKDOWN
)
print(f"Chunk saved to: {output_path}")
# Combine all chunks into final output
print(f"\nCombining all {len(all_results)} pages...")
processor.save_results(
results=all_results,
output_path="output/complete_document.md",
output_format=OutputFormat.MARKDOWN
)
def example_mixed_async_parallel():
"""
Hybrid approach: Use asyncio for I/O and ProcessPool for CPU-intensive tasks.
Best for processing multiple PDFs concurrently.
"""
print("=== HYBRID ASYNC + PARALLEL PROCESSING ===\n")
async def process_multiple_pdfs(pdf_paths: List[str]):
"""Process multiple PDFs concurrently."""
async def process_single_pdf(pdf_path: str):
"""Process one PDF with parallel workers."""
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=4 # Use fewer workers per PDF to allow multiple PDFs
)
# Run parallel processing in executor
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(
None,
processor.process_pdf_parallel,
pdf_path,
OutputFormat.MARKDOWN,
"Convert this page to docling.",
None,
None
)
# Save results
output_path = f"output/{Path(pdf_path).stem}.md"
processor.save_results(results, output_path, OutputFormat.MARKDOWN)
return {
"pdf": pdf_path,
"total_pages": len(results),
"successful": sum(1 for r in results if r.success)
}
# Process all PDFs concurrently
tasks = [process_single_pdf(pdf) for pdf in pdf_paths]
results = await asyncio.gather(*tasks)
return results
# Example: Process 5 PDFs concurrently
pdf_files = [
"document1.pdf",
"document2.pdf",
"document3.pdf",
"document4.pdf",
"document5.pdf"
]
start_time = time.time()
results = asyncio.run(process_multiple_pdfs(pdf_files))
total_time = time.time() - start_time
print(f"\n{'='*60}")
print(f"Processed {len(pdf_files)} PDFs in {total_time:.2f}s")
for result in results:
print(f" {result['pdf']}: {result['successful']}/{result['total_pages']} pages")
print(f"{'='*60}")
def example_smart_batch_processing():
"""
Smart batching based on GPU/CPU availability and memory.
"""
print("=== SMART BATCH PROCESSING ===\n")
import torch
# Detect hardware
has_gpu = torch.cuda.is_available()
gpu_memory = torch.cuda.get_device_properties(0).total_memory if has_gpu else 0
# Adjust parameters based on hardware
if has_gpu:
if gpu_memory > 16 * 1024**3: # > 16GB GPU
max_workers = 8
dpi = 300
batch_size = 20
elif gpu_memory > 8 * 1024**3: # > 8GB GPU
max_workers = 4
dpi = 200
batch_size = 10
else: # < 8GB GPU
max_workers = 2
dpi = 150
batch_size = 5
else: # CPU only
max_workers = 2
dpi = 150
batch_size = 5
print(f"Hardware detected:")
print(f" GPU: {'Yes' if has_gpu else 'No'}")
if has_gpu:
print(f" GPU Memory: {gpu_memory / 1024**3:.1f}GB")
print(f" Optimized settings:")
print(f" Workers: {max_workers}")
print(f" DPI: {dpi}")
print(f" Batch size: {batch_size}\n")
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=max_workers,
dpi=dpi
)
results = processor.process_pdf_parallel(
pdf_path="document.pdf",
output_format=OutputFormat.MARKDOWN
)
processor.save_results(results, "output/document.md", OutputFormat.MARKDOWN)
def example_resume_interrupted_processing():
"""
Resume processing from where it left off if interrupted.
Useful for very long-running jobs.
"""
print("=== RESUME INTERRUPTED PROCESSING ===\n")
import json
import os
checkpoint_file = "processing_checkpoint.json"
output_dir = Path("output/resume_example")
output_dir.mkdir(parents=True, exist_ok=True)
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=4
)
pdf_path = "large_document.pdf"
# Check for existing checkpoint
completed_pages = set()
if os.path.exists(checkpoint_file):
print("Found checkpoint file, loading progress...")
with open(checkpoint_file, 'r') as f:
checkpoint = json.load(f)
completed_pages = set(checkpoint['completed_pages'])
print(f"Resuming from page {len(completed_pages) + 1}\n")
# Get total pages
import fitz
doc = fitz.open(pdf_path)
total_pages = len(doc)
doc.close()
# Process only remaining pages
images = processor.pdf_to_images(pdf_path)
results = []
for page_num, image in enumerate(images):
if page_num in completed_pages:
print(f"Skipping page {page_num + 1} (already processed)")
continue
print(f"Processing page {page_num + 1}/{total_pages}...")
try:
result = processor.process_page((
page_num,
image,
OutputFormat.MARKDOWN,
"Convert this page to docling."
))
results.append(result)
# Save individual page
page_output = output_dir / f"page_{page_num + 1:04d}.md"
if result.success:
with open(page_output, 'w') as f:
f.write(result.content)
# Update checkpoint
completed_pages.add(page_num)
with open(checkpoint_file, 'w') as f:
json.dump({'completed_pages': list(completed_pages)}, f)
except KeyboardInterrupt:
print("\nProcessing interrupted! Progress saved.")
print(f"Run again to resume from page {len(completed_pages) + 1}")
return
except Exception as e:
print(f"Error on page {page_num + 1}: {e}")
continue
# Combine all pages
print("\nCombining all pages...")
all_results = []
for page_num in range(total_pages):
page_file = output_dir / f"page_{page_num + 1:04d}.md"
if page_file.exists():
with open(page_file, 'r') as f:
content = f.read()
all_results.append(PageResult(
page_number=page_num,
success=True,
content=content,
processing_time=0
))
processor.save_results(all_results, "output/complete_document.md", OutputFormat.MARKDOWN)
# Clean up checkpoint
if os.path.exists(checkpoint_file):
os.remove(checkpoint_file)
print("✓ Processing complete, checkpoint removed")
def example_quality_vs_speed():
"""
Compare different quality/speed tradeoffs.
"""
print("=== QUALITY VS SPEED COMPARISON ===\n")
pdf_path = "sample_document.pdf"
configs = [
{"name": "Fast", "dpi": 100, "workers": 8},
{"name": "Balanced", "dpi": 200, "workers": 4},
{"name": "High Quality", "dpi": 300, "workers": 2},
]
for config in configs:
print(f"\n{config['name']} Mode (DPI={config['dpi']}, Workers={config['workers']}):")
print("-" * 60)
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=config['workers'],
dpi=config['dpi']
)
start_time = time.time()
results = processor.process_pdf_parallel(
pdf_path=pdf_path,
output_format=OutputFormat.MARKDOWN,
start_page=0,
end_page=9 # Test first 10 pages
)
elapsed = time.time() - start_time
successful = sum(1 for r in results if r.success)
avg_time = sum(r.processing_time for r in results) / len(results)
print(f" Time: {elapsed:.2f}s")
print(f" Success rate: {successful}/{len(results)}")
print(f" Avg per page: {avg_time:.2f}s")
print(f" Throughput: {len(results)/elapsed:.2f} pages/s")
def example_error_recovery():
"""
Process PDF with automatic retry on failures.
"""
print("=== ERROR RECOVERY EXAMPLE ===\n")
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=4
)
# First attempt
print("First processing attempt...")
results = processor.process_pdf_parallel(
pdf_path="document.pdf",
output_format=OutputFormat.MARKDOWN
)
# Find failed pages
failed_pages = [r for r in results if not r.success]
if failed_pages:
print(f"\n{len(failed_pages)} pages failed, retrying...")
# Extract images for failed pages only
images = processor.pdf_to_images("document.pdf")
retry_results = []
for failed_result in failed_pages:
page_num = failed_result.page_number
print(f" Retrying page {page_num + 1}...")
# Retry with single worker (more stable)
retry_processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=1
)
retry_result = retry_processor.process_page((
page_num,
images[page_num],
OutputFormat.MARKDOWN,
"Convert this page to docling."
))
if retry_result.success:
# Replace failed result with successful retry
results[page_num] = retry_result
print(f" ✓ Success")
else:
print(f" ✗ Failed again: {retry_result.error}")
# Save final results
processor.save_results(results, "output/document.md", OutputFormat.MARKDOWN)
final_success = sum(1 for r in results if r.success)
print(f"\nFinal result: {final_success}/{len(results)} pages successful")
def example_custom_prompts_per_page():
"""
Use different prompts for different page types.
"""
print("=== CUSTOM PROMPTS PER PAGE TYPE ===\n")
from converter import DocumentConverter
converter = DocumentConverter("/path/to/granite-docling-258M")
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=1 # Use 1 for this example
)
# Extract images
images = processor.pdf_to_images("mixed_content_document.pdf")
results = []
for page_num, image in enumerate(images):
print(f"Processing page {page_num + 1}...")
# First, detect page type
detection_result = converter.query_document(
image,
"What type of content is on this page? Is it text, table, chart, or code?"
)
page_type = detection_result['answer'].lower()
# Choose prompt based on page type
if 'table' in page_type:
prompt = "Convert this table to OTSL."
print(f" Detected: Table")
elif 'chart' in page_type or 'graph' in page_type:
prompt = "Convert chart to OTSL."
print(f" Detected: Chart")
elif 'code' in page_type:
prompt = "Convert code to text."
print(f" Detected: Code")
else:
prompt = "Convert this page to docling."
print(f" Detected: Text")
# Process with appropriate prompt
result = converter.convert_to_markdown(image, prompt)
results.append(PageResult(
page_number=page_num,
success=result['success'],
content=result['content'],
processing_time=0
))
processor.save_results(results, "output/mixed_content.md", OutputFormat.MARKDOWN)
def example_parallel_with_post_processing():
"""
Parallel processing with post-processing pipeline.
"""
print("=== PARALLEL + POST-PROCESSING PIPELINE ===\n")
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=6
)
# Step 1: Parallel conversion
print("Step 1: Converting PDF pages...")
results = processor.process_pdf_parallel(
pdf_path="document.pdf",
output_format=OutputFormat.MARKDOWN
)
# Step 2: Post-processing
print("\nStep 2: Post-processing...")
processed_results = []
for result in results:
if result.success:
content = result.content
# Clean up common OCR errors
content = content.replace('|', 'I') # Example cleanup
content = content.replace('0', 'O') # In specific contexts
# Add page headers
content = f"<!-- Page {result.page_number + 1} -->\n\n{content}"
# Create new result
processed_results.append(PageResult(
page_number=result.page_number,
success=True,
content=content,
processing_time=result.processing_time
))
else:
processed_results.append(result)
# Step 3: Save with table of contents
print("\nStep 3: Generating table of contents...")
toc = ["# Table of Contents\n\n"]
for result in processed_results:
if result.success:
# Extract first heading as chapter title
lines = result.content.split('\n')
for line in lines:
if line.startswith('#'):
title = line.lstrip('#').strip()
toc.append(f"- [Page {result.page_number + 1}]" +
f"(#page-{result.page_number + 1}): {title}\n")
break
toc.append("\n---\n\n")
# Combine TOC with content
with open("output/document_with_toc.md", 'w') as f:
f.write(''.join(toc))
for result in processed_results:
if result.success:
f.write(f"\n\n<a name='page-{result.page_number + 1}'></a>\n\n")
f.write(result.content)
f.write("\n\n---\n\n")
print("✓ Document saved with table of contents")
def example_memory_efficient_streaming():
"""
Memory-efficient processing for extremely large PDFs.
Processes and saves pages one at a time.
"""
print("=== MEMORY-EFFICIENT STREAMING ===\n")
import fitz
from converter import DocumentConverter
converter = DocumentConverter("/path/to/granite-docling-258M")
pdf_path = "extremely_large_document.pdf"
output_file = "output/streamed_output.md"
# Open PDF
doc = fitz.open(pdf_path)
total_pages = len(doc)
print(f"Processing {total_pages} pages (streaming mode)...")
print(f"Output: {output_file}\n")
# Create output file
with open(output_file, 'w') as out_file:
out_file.write(f"# Document Conversion\n\n")
out_file.write(f"Total pages: {total_pages}\n\n")
out_file.write("---\n\n")
# Process pages one at a time
for page_num in range(total_pages):
print(f"Processing page {page_num + 1}/{total_pages}...", end='', flush=True)
# Extract single page
page = doc[page_num]
mat = fitz.Matrix(200 / 72, 200 / 72)
pix = page.get_pixmap(matrix=mat)
image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
# Convert page
result = converter.convert_to_markdown(image)
# Write immediately to file
if result['success']:
out_file.write(f"## Page {page_num + 1}\n\n")
out_file.write(result['content'])
out_file.write("\n\n---\n\n")
out_file.flush() # Force write to disk
print(" ✓")
else:
out_file.write(f"## Page {page_num + 1}\n\n")
out_file.write(f"*Error: {result['message']}*\n\n---\n\n")
out_file.flush()
print(" ✗")
# Clear image from memory
del image
del pix
doc.close()
print(f"\n✓ Complete! Output saved to {output_file}")
def example_benchmark_configuration():
"""
Benchmark different configurations to find optimal settings.
"""
print("=== BENCHMARK CONFIGURATIONS ===\n")
import itertools
pdf_path = "benchmark_document.pdf"
# Test different configurations
worker_counts = [1, 2, 4, 8]
dpi_values = [150, 200, 300]
results_table = []
print(f"{'Workers':<10}{'DPI':<10}{'Time (s)':<12}{'Pages/s':<12}{'Success Rate':<15}")
print("-" * 60)
for workers, dpi in itertools.product(worker_counts, dpi_values):
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=workers,
dpi=dpi
)
start_time = time.time()
results = processor.process_pdf_parallel(
pdf_path=pdf_path,
output_format=OutputFormat.MARKDOWN,
start_page=0,
end_page=9 # Test first 10 pages
)
elapsed = time.time() - start_time
successful = sum(1 for r in results if r.success)
throughput = len(results) / elapsed
success_rate = f"{successful}/{len(results)}"
print(f"{workers:<10}{dpi:<10}{elapsed:<12.2f}{throughput:<12.2f}{success_rate:<15}")
results_table.append({
'workers': workers,
'dpi': dpi,
'time': elapsed,
'throughput': throughput,
'success_rate': successful / len(results)
})
# Find best configuration
best = max(results_table, key=lambda x: x['throughput'])
print(f"\n{'='*60}")
print(f"Best configuration:")
print(f" Workers: {best['workers']}")
print(f" DPI: {best['dpi']}")
print(f" Throughput: {best['throughput']:.2f} pages/s")
print(f"{'='*60}")
if __name__ == "__main__":
print("Advanced PDF Processing Examples")
print("=" * 60)
print("\nChoose an example:")
print("1. Large PDF with optimal workers")
print("2. Chunked processing for massive PDFs")
print("3. Hybrid async + parallel")
print("4. Smart batch processing")
print("5. Resume interrupted processing")
print("6. Quality vs speed comparison")
print("7. Error recovery")
print("8. Custom prompts per page")
print("9. Parallel with post-processing")
print("10. Memory-efficient streaming")
print("11. Benchmark configurations")
choice = input("\nEnter number (1-11): ").strip()
examples = {
'1': example_large_pdf_optimal_workers,
'2': example_chunked_processing,
'3': example_mixed_async_parallel,
'4': example_smart_batch_processing,
'5': example_resume_interrupted_processing,
'6': example_quality_vs_speed,
'7': example_error_recovery,
'8': example_custom_prompts_per_page,
'9': example_parallel_with_post_processing,
'10': example_memory_efficient_streaming,
'11': example_benchmark_configuration,
}
if choice in examples:
print("\n")
examples[choice]()
else:
print("Invalid choice!")
"""Parallel PDF processing with DocumentConverter."""
import asyncio
import concurrent.futures
from pathlib import Path
from typing import List, Dict, Any, Optional
import time
from dataclasses import dataclass
from enum import Enum
from PIL import Image
import fitz # PyMuPDF
from tqdm import tqdm
from converter import DocumentConverter
class OutputFormat(Enum):
"""Output format options."""
MARKDOWN = "markdown"
JSON = "json"
DOCTAGS = "doctags"
@dataclass
class PageResult:
"""Result for a single page."""
page_number: int
success: bool
content: Any
processing_time: float
error: Optional[str] = None
class ParallelPDFProcessor:
"""Process PDF files in parallel using multiple workers."""
def __init__(
self,
model_path: str,
max_workers: Optional[int] = None,
dpi: int = 200
):
"""
Initialize the parallel PDF processor.
Args:
model_path: Path to the Granite Docling model
max_workers: Maximum number of parallel workers (None = CPU count)
dpi: DPI for PDF to image conversion
"""
self.model_path = model_path
self.max_workers = max_workers
self.dpi = dpi
self.converter = None
def _initialize_converter(self):
"""Initialize converter (called in each worker process)."""
if self.converter is None:
self.converter = DocumentConverter(self.model_path)
def pdf_to_images(self, pdf_path: str) -> List[Image.Image]:
"""
Convert PDF pages to PIL Images.
Args:
pdf_path: Path to PDF file
Returns:
List of PIL Image objects
"""
print(f"Converting PDF to images (DPI={self.dpi})...")
doc = fitz.open(pdf_path)
images = []
for page_num in tqdm(range(len(doc)), desc="Extracting pages"):
page = doc[page_num]
# Render page to image
mat = fitz.Matrix(self.dpi / 72, self.dpi / 72)
pix = page.get_pixmap(matrix=mat)
# Convert to PIL Image
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
images.append(img)
doc.close()
print(f"Extracted {len(images)} pages from PDF")
return images
def process_page(
self,
args: tuple
) -> PageResult:
"""
Process a single page (called by worker).
Args:
args: Tuple of (page_number, image, output_format, prompt)
Returns:
PageResult object
"""
page_num, image, output_format, prompt = args
# Initialize converter in this worker if needed
self._initialize_converter()
start_time = time.time()
try:
if output_format == OutputFormat.MARKDOWN:
result = self.converter.convert_to_markdown(image, prompt)
elif output_format == OutputFormat.JSON:
result = self.converter.convert_to_json(image, prompt)
elif output_format == OutputFormat.DOCTAGS:
result = self.converter.convert_to_doctags(image, prompt)
else:
raise ValueError(f"Unknown format: {output_format}")
processing_time = time.time() - start_time
if result["success"]:
return PageResult(
page_number=page_num,
success=True,
content=result["content"],
processing_time=processing_time
)
else:
return PageResult(
page_number=page_num,
success=False,
content=None,
processing_time=processing_time,
error=result["message"]
)
except Exception as e:
processing_time = time.time() - start_time
return PageResult(
page_number=page_num,
success=False,
content=None,
processing_time=processing_time,
error=str(e)
)
def process_pdf_parallel(
self,
pdf_path: str,
output_format: OutputFormat = OutputFormat.MARKDOWN,
prompt: str = "Convert this page to docling.",
start_page: Optional[int] = None,
end_page: Optional[int] = None
) -> List[PageResult]:
"""
Process PDF in parallel using ProcessPoolExecutor.
Args:
pdf_path: Path to PDF file
output_format: Output format (markdown, json, or doctags)
prompt: Conversion prompt
start_page: Starting page number (0-indexed, None = start from beginning)
end_page: Ending page number (0-indexed, None = process until end)
Returns:
List of PageResult objects
"""
# Extract images from PDF
images = self.pdf_to_images(pdf_path)
# Apply page range if specified
if start_page is not None or end_page is not None:
start = start_page or 0
end = end_page + 1 if end_page is not None else len(images)
images = images[start:end]
page_offset = start
else:
page_offset = 0
print(f"\nProcessing {len(images)} pages with {self.max_workers or 'auto'} workers...")
# Prepare arguments for each page
args_list = [
(i + page_offset, img, output_format, prompt)
for i, img in enumerate(images)
]
# Process pages in parallel
start_time = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor:
results = list(tqdm(
executor.map(self.process_page, args_list),
total=len(args_list),
desc="Processing pages"
))
total_time = time.time() - start_time
# Print statistics
successful = sum(1 for r in results if r.success)
failed = len(results) - successful
avg_time = sum(r.processing_time for r in results) / len(results)
print(f"\n{'='*60}")
print(f"Processing complete!")
print(f"Total pages: {len(results)}")
print(f"Successful: {successful}")
print(f"Failed: {failed}")
print(f"Total time: {total_time:.2f}s")
print(f"Average time per page: {avg_time:.2f}s")
print(f"Throughput: {len(results)/total_time:.2f} pages/second")
print(f"{'='*60}")
return results
async def process_pdf_async(
self,
pdf_path: str,
output_format: OutputFormat = OutputFormat.MARKDOWN,
prompt: str = "Convert this page to docling.",
start_page: Optional[int] = None,
end_page: Optional[int] = None,
batch_size: int = 10
) -> List[PageResult]:
"""
Process PDF asynchronously using asyncio.
Args:
pdf_path: Path to PDF file
output_format: Output format (markdown, json, or doctags)
prompt: Conversion prompt
start_page: Starting page number (0-indexed)
end_page: Ending page number (0-indexed)
batch_size: Number of pages to process concurrently
Returns:
List of PageResult objects
"""
# Initialize converter
self._initialize_converter()
# Extract images from PDF
images = self.pdf_to_images(pdf_path)
# Apply page range if specified
if start_page is not None or end_page is not None:
start = start_page or 0
end = end_page + 1 if end_page is not None else len(images)
images = images[start:end]
page_offset = start
else:
page_offset = 0
print(f"\nProcessing {len(images)} pages asynchronously (batch_size={batch_size})...")
async def process_page_async(page_num: int, image: Image.Image) -> PageResult:
"""Process single page asynchronously."""
start_time = time.time()
try:
if output_format == OutputFormat.MARKDOWN:
result = await self.converter.convert_to_markdown_async(image, prompt)
elif output_format == OutputFormat.JSON:
result = await self.converter.convert_to_json_async(image, prompt)
elif output_format == OutputFormat.DOCTAGS:
result = await self.converter.convert_to_doctags_async(image, prompt)
else:
raise ValueError(f"Unknown format: {output_format}")
processing_time = time.time() - start_time
if result["success"]:
return PageResult(
page_number=page_num,
success=True,
content=result["content"],
processing_time=processing_time
)
else:
return PageResult(
page_number=page_num,
success=False,
content=None,
processing_time=processing_time,
error=result["message"]
)
except Exception as e:
processing_time = time.time() - start_time
return PageResult(
page_number=page_num,
success=False,
content=None,
processing_time=processing_time,
error=str(e)
)
# Process in batches
results = []
start_time = time.time()
for i in range(0, len(images), batch_size):
batch = images[i:i + batch_size]
batch_tasks = [
process_page_async(j + page_offset, img)
for j, img in enumerate(batch, start=i)
]
batch_results = await asyncio.gather(*batch_tasks)
results.extend(batch_results)
print(f"Processed pages {i+1}-{min(i+batch_size, len(images))}/{len(images)}")
total_time = time.time() - start_time
# Print statistics
successful = sum(1 for r in results if r.success)
failed = len(results) - successful
avg_time = sum(r.processing_time for r in results) / len(results)
print(f"\n{'='*60}")
print(f"Processing complete!")
print(f"Total pages: {len(results)}")
print(f"Successful: {successful}")
print(f"Failed: {failed}")
print(f"Total time: {total_time:.2f}s")
print(f"Average time per page: {avg_time:.2f}s")
print(f"Throughput: {len(results)/total_time:.2f} pages/second")
print(f"{'='*60}")
return results
def save_results(
self,
results: List[PageResult],
output_path: str,
output_format: OutputFormat
):
"""
Save processing results to file.
Args:
results: List of PageResult objects
output_path: Path to save results
output_format: Format of the output
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if output_format == OutputFormat.MARKDOWN:
# Combine all markdown pages
markdown_content = []
for result in sorted(results, key=lambda x: x.page_number):
if result.success:
markdown_content.append(f"# Page {result.page_number + 1}\n\n")
markdown_content.append(result.content)
markdown_content.append("\n\n---\n\n")
else:
markdown_content.append(f"# Page {result.page_number + 1}\n\n")
markdown_content.append(f"*Error: {result.error}*\n\n---\n\n")
with open(output_path, "w", encoding="utf-8") as f:
f.write("".join(markdown_content))
print(f"✓ Markdown saved to: {output_path}")
elif output_format == OutputFormat.JSON:
import json
json_output = {
"total_pages": len(results),
"successful_pages": sum(1 for r in results if r.success),
"pages": []
}
for result in sorted(results, key=lambda x: x.page_number):
page_data = {
"page_number": result.page_number + 1,
"success": result.success,
"processing_time": result.processing_time
}
if result.success:
page_data["content"] = result.content
else:
page_data["error"] = result.error
json_output["pages"].append(page_data)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(json_output, f, indent=2, ensure_ascii=False)
print(f"✓ JSON saved to: {output_path}")
elif output_format == OutputFormat.DOCTAGS:
# Combine all doctags
doctags_content = ['<?xml version="1.0" encoding="UTF-8"?>\n<document>\n']
for result in sorted(results, key=lambda x: x.page_number):
if result.success:
doctags_content.append(f' <page number="{result.page_number + 1}">\n')
doctags_content.append(f" {result.content}\n")
doctags_content.append(' </page>\n')
else:
doctags_content.append(f' <page number="{result.page_number + 1}" error="{result.error}"/>\n')
doctags_content.append('</document>')
with open(output_path, "w", encoding="utf-8") as f:
f.write("".join(doctags_content))
print(f"✓ DocTags saved to: {output_path}")
def example_parallel_processing():
"""Example: Process PDF with parallel workers."""
print("=== PARALLEL PROCESSING EXAMPLE ===\n")
# Initialize processor with 4 workers
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=4,
dpi=200
)
# Process PDF in parallel
results = processor.process_pdf_parallel(
pdf_path="large_document.pdf",
output_format=OutputFormat.MARKDOWN,
prompt="Convert this page to docling."
)
# Save results
processor.save_results(
results=results,
output_path="output/document.md",
output_format=OutputFormat.MARKDOWN
)
def example_async_processing():
"""Example: Process PDF with async/await."""
print("=== ASYNC PROCESSING EXAMPLE ===\n")
async def process():
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
dpi=200
)
# Process PDF asynchronously
results = await processor.process_pdf_async(
pdf_path="large_document.pdf",
output_format=OutputFormat.MARKDOWN,
batch_size=10
)
# Save results
processor.save_results(
results=results,
output_path="output/document.md",
output_format=OutputFormat.MARKDOWN
)
# Run async function
asyncio.run(process())
def example_page_range():
"""Example: Process specific page range."""
print("=== PAGE RANGE EXAMPLE ===\n")
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=4
)
# Process only pages 10-20
results = processor.process_pdf_parallel(
pdf_path="large_document.pdf",
output_format=OutputFormat.JSON,
start_page=10,
end_page=20
)
processor.save_results(
results=results,
output_path="output/pages_10_20.json",
output_format=OutputFormat.JSON
)
def example_multiple_formats():
"""Example: Process PDF to multiple formats."""
print("=== MULTIPLE FORMATS EXAMPLE ===\n")
processor = ParallelPDFProcessor(
model_path="/path/to/granite-docling-258M",
max_workers=6
)
pdf_path = "document.pdf"
# Process to markdown
print("\n1. Converting to Markdown...")
md_results = processor.process_pdf_parallel(
pdf_path=pdf_path,
output_format=OutputFormat.MARKDOWN
)
processor.save_results(md_results, "output/document.md", OutputFormat.MARKDOWN)
# Process to JSON
print("\n2. Converting to JSON...")
json_results = processor.process_pdf_parallel(
pdf_path=pdf_path,
output_format=OutputFormat.JSON
)
processor.save_results(json_results, "output/document.json", OutputFormat.JSON)
# Process to DocTags
print("\n3. Converting to DocTags...")
doctags_results = processor.process_pdf_parallel(
pdf_path=pdf_path,
output_format=OutputFormat.DOCTAGS
)
processor.save_results(doctags_results, "output/document.xml", OutputFormat.DOCTAGS)
def example_comparison():
"""Example: Compare sequential vs parallel processing."""
print("=== PERFORMANCE COMPARISON ===\n")
import time
from converter import DocumentConverter
pdf_path = "document.pdf"
# Sequential processing
print("1. Sequential Processing:")
start = time.time()
converter = DocumentConverter("/path/to/granite-docling-258M")
processor = ParallelPDFProcessor("/path/to/granite-docling-258M", max_workers=1)
images = processor.pdf_to_images(pdf_path)
sequential_results = []
for i, img in enumerate(images[:10]): # Process first 10 pages
result = converter.convert_to_markdown(img)
sequential_results.append(result)
print(f" Page {i+1}/10 processed")
sequential_time = time.time() - start
print(f" Time: {sequential_time:.2f}s\n")
# Parallel processing
print("2. Parallel Processing (4 workers):")
start = time.time()
processor = ParallelPDFProcessor(
"/path/to/granite-docling-258M",
max_workers=4
)
parallel_results = processor.process_pdf_parallel(
pdf_path=pdf_path,
output_format=OutputFormat.MARKDOWN,
start_page=0,
end_page=9
)
parallel_time = time.time() - start
print(f" Time: {parallel_time:.2f}s\n")
# Comparison
speedup = sequential_time / parallel_time
print(f"{'='*60}")
print(f"Speedup: {speedup:.2f}x faster with parallel processing")
print(f"{'='*60}")
if __name__ == "__main__":
import sys
if len(sys.argv) < 3:
print("Usage: python parallel_processor.py <model_path> <pdf_path> [output_format] [max_workers]")
print("\nExamples:")
print(" python parallel_processor.py /path/to/model document.pdf")
print(" python parallel_processor.py /path/to/model document.pdf markdown 8")
print(" python parallel_processor.py /path/to/model document.pdf json 4")
sys.exit(1)
model_path = sys.argv[1]
pdf_path = sys.argv[2]
output_format = OutputFormat(sys.argv[3]) if len(sys.argv) > 3 else OutputFormat.MARKDOWN
max_workers = int(sys.argv[4]) if len(sys.argv) > 4 else None
# Run parallel processing
processor = ParallelPDFProcessor(
model_path=model_path,
max_workers=max_workers,
dpi=200
)
results = processor.process_pdf_parallel(
pdf_path=pdf_path,
output_format=output_format
)
# Save results
output_ext = {
OutputFormat.MARKDOWN: ".md",
OutputFormat.JSON: ".json",
OutputFormat.DOCTAGS: ".xml"
}
output_path = f"output/{Path(pdf_path).stem}{output_ext[output_format]}"
processor.save_results(results, output_path, output_format)
fastapi==0.104.1
uvicorn[standard]==0.24.0
python-multipart==0.0.6
torch>=2.0.0
transformers>=4.40.0
Pillow>=10.0.0
numpy>=1.24.0
docling-core>=1.0.0
pydantic>=2.0.0
PyMuPDF>=1.23.0
tqdm>=4.66.0
"""Document converter library using Granite Docling model."""
import base64
import html
import io
import re
from pathlib import Path
from typing import Optional, Union, Dict, Any, List
from threading import Thread
import asyncio
import torch
import numpy as np
from PIL import Image, ImageDraw, ImageOps
from docling_core.types.doc import DoclingDocument
from docling_core.types.doc.document import DocTagsDocument
from transformers import (
AutoProcessor,
Idefics3ForConditionalGeneration,
TextIteratorStreamer,
)
class DocumentConverter:
"""Main class for document conversion operations."""
def __init__(self, model_path: str):
"""
Initialize the document converter.
Args:
model_path: Local path to the Granite Docling model
"""
self.model_path = model_path
self.processor = None
self.model = None
self.device = None
self._initialize_model()
def _initialize_model(self):
"""Initialize the model and processor from local path."""
self.device = torch.device(
"cuda" if torch.cuda.is_available()
else "mps" if torch.backends.mps.is_available()
else "cpu"
)
print(f"Loading model from {self.model_path}...")
self.processor = AutoProcessor.from_pretrained(
self.model_path,
local_files_only=True
)
self.model = Idefics3ForConditionalGeneration.from_pretrained(
self.model_path,
device_map=self.device,
torch_dtype=torch.bfloat16,
local_files_only=True
)
if not torch.cuda.is_available():
self.model = self.model.to(self.device)
print(f"Model loaded successfully on {self.device}")
@staticmethod
def clean_model_response(text: str) -> str:
"""Clean up model response by removing special tokens."""
if not text:
return "No response generated."
special_tokens = [
"<|end_of_text|>",
"<|end|>",
"<|assistant|>",
"<|user|>",
"<|system|>",
"<pad>",
"</s>",
"<s>",
]
cleaned = text
for token in special_tokens:
cleaned = cleaned.replace(token, "")
cleaned = cleaned.strip()
if not cleaned or len(cleaned) == 0:
return "The model generated a response, but it appears to be empty."
return cleaned
@staticmethod
def draw_bounding_boxes(
image: Image.Image,
response_text: str,
is_doctag_response: bool = False
) -> Image.Image:
"""Draw bounding boxes on the image based on loc tags."""
try:
image = image.copy()
draw = ImageDraw.Draw(image)
width, height = image.size
# Color mapping for different classes
class_colors = {
"caption": "#FFCC99",
"footnote": "#C8C8FF",
"formula": "#C0C0C0",
"list_item": "#9999FF",
"page_footer": "#CCFFCC",
"page_header": "#CCFFCC",
"picture": "#FFCCA4",
"chart": "#FFCCA4",
"section_header": "#FF9999",
"table": "#FFCCCC",
"text": "#FFFF99",
"title": "#FF9999",
"document_index": "#DCDCDC",
"code": "#7D7D7D",
"paragraph": "#FFFF99",
}
doctag_class_pattern = r"<([^>]+)><loc_(\d+)><loc_(\d+)><loc_(\d+)><loc_(\d+)>[^<]*</[^>]+>"
doctag_matches = re.findall(doctag_class_pattern, response_text)
class_pattern = r"<([^>]+)><loc_(\d+)><loc_(\d+)><loc_(\d+)><loc_(\d+)>"
class_matches = re.findall(class_pattern, response_text)
seen_coords = set()
all_class_matches = []
for match in doctag_matches:
coords = (match[1], match[2], match[3], match[4])
if coords not in seen_coords:
seen_coords.add(coords)
all_class_matches.append(match)
for match in class_matches:
coords = (match[1], match[2], match[3], match[4])
if coords not in seen_coords:
seen_coords.add(coords)
all_class_matches.append(match)
for class_name, xmin, ymin, xmax, ymax in all_class_matches:
if is_doctag_response:
color = class_colors.get(class_name.lower(), "#808080")
else:
color = "#E0115F"
x1 = int((int(xmin) / 500) * width)
y1 = int((int(ymin) / 500) * height)
x2 = int((int(xmax) / 500) * width)
y2 = int((int(ymax) / 500) * height)
draw.rectangle([x1, y1, x2, y2], outline=color, width=3)
return image
except Exception:
return image
@staticmethod
def image_to_base64(image: Image.Image) -> str:
"""Convert PIL Image to base64 string."""
buffered = io.BytesIO()
image.save(buffered, format="PNG")
return base64.b64encode(buffered.getvalue()).decode()
@staticmethod
def base64_to_image(base64_str: str) -> Image.Image:
"""Convert base64 string to PIL Image."""
image_data = base64.b64decode(base64_str)
return Image.open(io.BytesIO(image_data))
def generate_response(
self,
question: str,
image: Union[str, Path, Image.Image]
) -> str:
"""
Generate response using the model.
Args:
question: The question or prompt
image: Path to image file or PIL Image object
Returns:
Model response as string
"""
if isinstance(image, (str, Path)):
image = Image.open(image)
image = image.convert("RGB")
messages = [
{
"role": "user",
"content": [
{"type": "image"},
{"type": "text", "text": question},
],
}
]
prompt = self.processor.apply_chat_template(messages, add_generation_prompt=True)
temperature = 0.0
inputs = self.processor(text=prompt, images=[image], return_tensors="pt")
inputs = {k: v.to(self.device) for k, v in inputs.items()}
with torch.no_grad():
generated_ids = self.model.generate(
**inputs,
max_new_tokens=4096,
temperature=temperature,
do_sample=temperature > 0,
pad_token_id=self.processor.tokenizer.eos_token_id,
)
generated_texts = self.processor.batch_decode(
generated_ids[:, inputs["input_ids"].shape[1]:],
skip_special_tokens=False,
)[0]
return self.clean_model_response(generated_texts)
async def generate_response_async(
self,
question: str,
image: Union[str, Path, Image.Image]
) -> str:
"""
Async version of generate_response.
Args:
question: The question or prompt
image: Path to image file or PIL Image object
Returns:
Model response as string
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self.generate_response, question, image)
def generate_response_streaming(
self,
question: str,
image: Union[str, Path, Image.Image]
):
"""
Generate response with streaming (generator).
Args:
question: The question or prompt
image: Path to image file or PIL Image object
Yields:
Chunks of the response as they are generated
"""
if isinstance(image, (str, Path)):
image = Image.open(image)
image = image.convert("RGB")
messages = [
{
"role": "user",
"content": [
{"type": "image"},
{"type": "text", "text": question},
],
}
]
prompt = self.processor.apply_chat_template(messages, add_generation_prompt=True)
temperature = 0.0
inputs = self.processor(text=prompt, images=[image], return_tensors="pt")
inputs = {k: v.to(self.device) for k, v in inputs.items()}
streamer = TextIteratorStreamer(
self.processor,
skip_prompt=True,
skip_special_tokens=False
)
generation_args = dict(
inputs,
streamer=streamer,
max_new_tokens=4096,
temperature=temperature,
do_sample=temperature > 0,
pad_token_id=self.processor.tokenizer.eos_token_id,
)
thread = Thread(target=self.model.generate, kwargs=generation_args)
thread.start()
for new_text in streamer:
yield new_text
thread.join()
def convert_to_markdown(
self,
image: Union[str, Path, Image.Image],
prompt: str = "Convert this page to docling."
) -> Dict[str, Any]:
"""
Convert document to markdown.
Args:
image: Path to image file or PIL Image object
prompt: Conversion prompt
Returns:
Dictionary with conversion results
"""
if isinstance(image, (str, Path)):
image = Image.open(image)
raw_output = self.generate_response(prompt, image)
try:
doctags_doc = DocTagsDocument.from_doctags_and_image_pairs(
[raw_output],
[image]
)
doc = DoclingDocument.load_from_doctags(
doctags_doc,
document_name="Document"
)
markdown_output = doc.export_to_markdown()
return {
"success": True,
"format": "markdown",
"content": markdown_output,
"raw_output": raw_output,
"message": "Successfully converted to markdown"
}
except Exception as e:
return {
"success": False,
"format": "markdown",
"content": "",
"raw_output": raw_output,
"message": f"Error converting to markdown: {str(e)}"
}
async def convert_to_markdown_async(
self,
image: Union[str, Path, Image.Image],
prompt: str = "Convert this page to docling."
) -> Dict[str, Any]:
"""
Async version of convert_to_markdown.
Args:
image: Path to image file or PIL Image object
prompt: Conversion prompt
Returns:
Dictionary with conversion results
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self.convert_to_markdown, image, prompt)
def convert_to_doctags(
self,
image: Union[str, Path, Image.Image],
prompt: str = "Convert this page to docling."
) -> Dict[str, Any]:
"""
Convert document to doctags.
Args:
image: Path to image file or PIL Image object
prompt: Conversion prompt
Returns:
Dictionary with conversion results
"""
if isinstance(image, (str, Path)):
image = Image.open(image)
raw_output = self.generate_response(prompt, image)
return {
"success": True,
"format": "doctags",
"content": raw_output,
"raw_output": raw_output,
"message": "Successfully generated doctags"
}
async def convert_to_doctags_async(
self,
image: Union[str, Path, Image.Image],
prompt: str = "Convert this page to docling."
) -> Dict[str, Any]:
"""
Async version of convert_to_doctags.
Args:
image: Path to image file or PIL Image object
prompt: Conversion prompt
Returns:
Dictionary with conversion results
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self.convert_to_doctags, image, prompt)
def convert_to_json(
self,
image: Union[str, Path, Image.Image],
prompt: str = "Convert this page to docling."
) -> Dict[str, Any]:
"""
Convert document to JSON dict.
Args:
image: Path to image file or PIL Image object
prompt: Conversion prompt
Returns:
Dictionary with conversion results
"""
if isinstance(image, (str, Path)):
image = Image.open(image)
raw_output = self.generate_response(prompt, image)
try:
doctags_doc = DocTagsDocument.from_doctags_and_image_pairs(
[raw_output],
[image]
)
doc = DoclingDocument.load_from_doctags(
doctags_doc,
document_name="Document"
)
json_output = doc.export_to_dict()
return {
"success": True,
"format": "json",
"content": json_output,
"raw_output": raw_output,
"message": "Successfully converted to JSON"
}
except Exception as e:
return {
"success": False,
"format": "json",
"content": {},
"raw_output": raw_output,
"message": f"Error converting to JSON: {str(e)}"
}
async def convert_to_json_async(
self,
image: Union[str, Path, Image.Image],
prompt: str = "Convert this page to docling."
) -> Dict[str, Any]:
"""
Async version of convert_to_json.
Args:
image: Path to image file or PIL Image object
prompt: Conversion prompt
Returns:
Dictionary with conversion results
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self.convert_to_json, image, prompt)
def convert_with_bounding_boxes(
self,
image: Union[str, Path, Image.Image],
prompt: str = "Convert this page to docling.",
return_base64: bool = False
) -> Dict[str, Any]:
"""
Convert document and return with bounding boxes visualization.
Args:
image: Path to image file or PIL Image object
prompt: Conversion prompt
return_base64: If True, return annotated image as base64 string
Returns:
Dictionary with conversion results and annotated image
"""
if isinstance(image, (str, Path)):
image = Image.open(image)
raw_output = self.generate_response(prompt, image)
# Check for location tags
has_doctag = "<doctag>" in raw_output
class_loc_pattern = r"<([^>]+)><loc_(\d+)><loc_(\d+)><loc_(\d+)><loc_(\d+)>"
loc_only_pattern = r"<loc_(\d+)><loc_(\d+)><loc_(\d+)><loc_(\d+)>"
has_loc_tags = bool(
re.findall(class_loc_pattern, raw_output) or
re.findall(loc_only_pattern, raw_output)
)
annotated_image = None
if has_loc_tags:
annotated_image = self.draw_bounding_boxes(image, raw_output, has_doctag)
if return_base64:
annotated_image = self.image_to_base64(annotated_image)
return {
"success": True,
"content": raw_output,
"annotated_image": annotated_image,
"has_bounding_boxes": has_loc_tags
}
async def convert_with_bounding_boxes_async(
self,
image: Union[str, Path, Image.Image],
prompt: str = "Convert this page to docling.",
return_base64: bool = False
) -> Dict[str, Any]:
"""
Async version of convert_with_bounding_boxes.
Args:
image: Path to image file or PIL Image object
prompt: Conversion prompt
return_base64: If True, return annotated image as base64 string
Returns:
Dictionary with conversion results and annotated image
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
self.convert_with_bounding_boxes,
image,
prompt,
return_base64
)
def query_document(
self,
image: Union[str, Path, Image.Image],
question: str
) -> Dict[str, Any]:
"""
Query a document with a specific question.
Args:
image: Path to image file or PIL Image object
question: Question to ask about the document
Returns:
Dictionary with question and answer
"""
if isinstance(image, (str, Path)):
image = Image.open(image)
answer = self.generate_response(question, image)
return {
"success": True,
"question": question,
"answer": answer
}
async def query_document_async(
self,
image: Union[str, Path, Image.Image],
question: str
) -> Dict[str, Any]:
"""
Async version of query_document.
Args:
image: Path to image file or PIL Image object
question: Question to ask about the document
Returns:
Dictionary with question and answer
"""
if isinstance(image, (str, Path)):
image = Image.open(image)
answer = await self.generate_response_async(question, image)
return {
"success": True,
"question": question,
"answer": answer
}
# Example usage
if __name__ == "__main__":
import sys
if len(sys.argv) < 3:
print("Usage: python converter.py <model_path> <image_path>")
print("Example: python converter.py /path/to/granite-docling-258M /path/to/document.png")
sys.exit(1)
model_path = sys.argv[1]
image_path = sys.argv[2]
# Initialize converter
converter = DocumentConverter(model_path)
# Example: Convert to markdown
print("\n=== Converting to Markdown ===")
result = converter.convert_to_markdown(image_path)
if result["success"]:
print(result["content"])
else:
print(f"Error: {result['message']}")
# Example: Query document
print("\n=== Querying Document ===")
result = converter.query_document(image_path, "What is this document about?")
print(f"Q: {result['question']}")
print(f"A: {result['answer']}")
# Example: Convert to JSON
print("\n=== Converting to JSON ===")
result = converter.convert_to_json(image_path)
if result["success"]:
print(f"JSON keys: {list(result['content'].keys())}")
else:
print(f"Error: {result['message']}")
# Example: With bounding boxes
print("\n=== Converting with Bounding Boxes ===")
result = converter.convert_with_bounding_boxes(image_path)
if result["has_bounding_boxes"] and result["annotated_image"]:
result["annotated_image"].save("output_with_boxes.png")
print("Annotated image saved to: output_with_boxes.png")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment