Python Parallelization Example

This script demonstrates how to upload multiple PDF documents to DocumentCloud in parallel across your CPU cores using Python. It reads a list of PDF URLs from a CSV file, uploads each document to DocumentCloud, and saves the resulting document IDs back to the CSV.

It was created to demonstrate the use of the parallelize function for concurrent processing, which can significantly speed up tasks. In practice, our data team at Reuters uses this function to spread tens of thousands of operations across dozens of computer cores.

A more basic example looks like the following:

from main import parallelize

def your_function(item):
    # Process the item
    return item * 2

items = [1, 2, 3, 4, 5]

results = parallelize(items, your_function)
print(results)  # Output: [2, 4, 6, 8, 10] (order may vary)
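
One caveat before the full script: ProcessPoolExecutor pickles the function and each item to ship them to worker processes, so the function must be defined at module level, and on platforms that spawn workers (Windows, macOS) the call should sit behind an if __name__ == "__main__": guard. Here is a minimal, self-contained sketch, assuming the parallelize function below is saved in main.py; slow_square is a hypothetical stand-in for real work.

import time

from main import parallelize

def slow_square(x: int) -> int:
    # A hypothetical stand-in for real work
    time.sleep(0.5)
    return x * x

if __name__ == "__main__":
    # Results arrive in completion order, not submission order
    results = parallelize(list(range(10)), slow_square, max_workers=4)
    print(sorted(results))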
main.py

import concurrent.futures
import io
import os
import typing
from pathlib import Path

import httpx
import pandas as pd
from bs4 import BeautifulSoup
from documentcloud import DocumentCloud
from rich import print

def parallelize(
    item_list: list[typing.Any],
    func: typing.Callable,
    func_kwargs: dict[str, typing.Any] | None = None,
    max_workers: int | None = None,
) -> list[typing.Any]:
    """Execute a function in parallel across multiple items.

    Args:
        item_list: List of items to process
        func: The function to execute for each item
        func_kwargs: Additional keyword arguments to pass to func
        max_workers: Maximum number of worker processes (defaults to CPU count)

    Returns:
        List of results in the order they completed

    Example:
        >>> def square(x):
        ...     return x * x
        >>> items = [1, 2, 3, 4, 5]
        >>> results = parallelize(items, square)
        >>> print(results)  # Output: [1, 4, 9, 16, 25] (order may vary)
    """
    # Get the number of items to process
    n = len(item_list)
    if n == 0:
        print("[red]No items to process.[/red]")
        return []

    # If no keyword arguments were passed for the function, use an empty dict
    if func_kwargs is None:
        func_kwargs = {}

    # If max_workers is not set, use the CPU count or the number of items, whichever is smaller
    if max_workers is None:
        max_workers = min(os.cpu_count() or 1, n)

    # Announce what we're doing
    print(
        f"[green]Processing {n} tasks in parallel using {max_workers} workers...[/green]"
    )

    # Create a list to hold the results
    results = []

    # Use a ProcessPoolExecutor to parallelize the function execution across CPU cores
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Bundle up each item with the function call
        future_to_item = {
            executor.submit(func, item, **func_kwargs): item for item in item_list
        }

        # Process results as they complete
        for i, future in enumerate(concurrent.futures.as_completed(future_to_item), 1):
            try:
                result = future.result()
                results.append(result)
                print(f"[yellow]Finished #{i}/{n}: {result}[/yellow]")
            except Exception as e:
                item = future_to_item[future]
                print(f"[red]Error processing {item}: {e}[/red]")
                raise

    print(f"[green]Completed processing {len(results)} tasks[/green]")
    return results

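# Note: ProcessPoolExecutor pickles the function and each item to send
# them to the worker processes, so any function passed to parallelize()
# must be defined at module level; lambdas and locally defined functions
# will fail to pickle.
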
def scrape_pdf_list() -> Path:
    """Scrape all of the annual finance reports from the Boone County site.

    Returns:
        Path to the CSV file containing the list of PDF URLs.
    """
    # Get the URL
    url = "https://www.showmeboone.com/auditor/financial-reports/"
    response = httpx.get(url)

    # Parse the HTML
    soup = BeautifulSoup(response.text, "html.parser")

    # Get all PDF links to annual comprehensive financial reports
    pdf_links = soup.find_all("a", href=lambda href: href and href.endswith(".pdf"))
    pdf_urls = [link["href"] for link in pdf_links if "acfr" in link["href"]]

    # Get the CSV
    df = pd.read_csv("pdfs.csv")

    # Add any URLs that aren't already in the CSV
    new_pdfs = [u for u in pdf_urls if u not in df["url"].values]
    if new_pdfs:
        new_df = pd.DataFrame(new_pdfs, columns=["url"])
        df = pd.concat([df, new_df], ignore_index=True)
        df.to_csv("pdfs.csv", index=False)
        print(f"[green]Added {len(new_pdfs)} new PDFs to pdfs.csv[/green]")

    # Return the path to the CSV
    return Path("pdfs.csv")

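# Note: scrape_pdf_list() assumes a pdfs.csv with a url column already
# exists. If you are starting from scratch, create an empty one first,
# e.g. pd.DataFrame(columns=["url"]).to_csv("pdfs.csv", index=False)
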
def upload_to_documentcloud(pdf_url: str) -> dict:
    """Upload a PDF to DocumentCloud.

    Args:
        pdf_url: URL of the PDF to upload.

    Returns:
        A dictionary with the PDF's URL and its DocumentCloud document ID.
    """
    # Download the PDF into a file-like object.
    # verify=False disables TLS certificate verification, presumably to work
    # around certificate trouble on the source site; drop it if you don't need it.
    print(f"[blue]Downloading {pdf_url}...[/blue]")
    response = httpx.get(pdf_url, verify=False)
    pdf_data = response.content
    pdf_file = io.BytesIO(pdf_data)

    # Name the file
    pdf_file.name = os.path.basename(pdf_url)

    # Initialize the DocumentCloud client with credentials from the environment
    dc = DocumentCloud(
        username=os.getenv("DOCUMENTCLOUD_USER"),
        password=os.getenv("DOCUMENTCLOUD_PASSWORD"),
    )

    # Upload the document
    doc = dc.documents.upload(pdf_file)
    print(f"[blue]Uploaded {pdf_url} to DocumentCloud with ID {doc.id}[/blue]")
    return {
        "url": pdf_url,
        "documentcloud_id": doc.id,
    }

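# Note: the DocumentCloud client is created inside upload_to_documentcloud()
# so each worker process authenticates on its own. Credentials are read from
# the DOCUMENTCLOUD_USER and DOCUMENTCLOUD_PASSWORD environment variables,
# which must be set before running the script.
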
def main():
    # Download the list of PDFs
    scrape_pdf_list()

    # Upload them to DocumentCloud in parallel
    df = pd.read_csv("pdfs.csv")
    pdf_urls = df["url"].tolist()
    dict_list = parallelize(
        item_list=pdf_urls,
        func=upload_to_documentcloud,
        # Limiting to 4 workers to avoid overloading DocumentCloud.
        # It's possible you could go higher, but I'm being cautious.
        max_workers=4,
    )

    # Save the results back to the CSV with a documentcloud_id column
    df = pd.DataFrame(dict_list)
    df.to_csv("pdfs.csv", index=False)


if __name__ == "__main__":
    main()
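
Because parallelize() returns results in the order they complete, main() rewrites pdfs.csv in completion order rather than the original row order, which you can see in the results file below. If you would rather preserve the original order, here is a sketch of an alternative ending for main() that merges the results back onto the original frame by the url column; it assumes every row uploaded successfully.

# Inside main(), instead of rebuilding pdfs.csv from dict_list directly:
results_df = pd.DataFrame(dict_list)
df = pd.read_csv("pdfs.csv").drop(columns=["documentcloud_id"], errors="ignore")
df = df.merge(results_df, on="url", how="left")
df.to_csv("pdfs.csv", index=False)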
pdfs.csv

url,documentcloud_id
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2024acfr.pdf,26325826
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2022acfr.pdf,26325827
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2023acfr.pdf,26325828
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2021acfr.pdf,26325829
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2020acfr.pdf,26325830
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2019acfr.pdf,26325833
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2018acfr.pdf,26325832
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2017acfr.pdf,26325834
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2016acfr.pdf,26325835
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2014acfr.pdf,26325837
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2015acfr.pdf,26325836
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2011acfr.pdf,26325841
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2013acfr.pdf,26325838
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2012acfr.pdf,26325840
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2010acfr.pdf,26325842
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2009acfr.pdf,26325843
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2008acfr.pdf,26325844
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2006acfr.pdf,26325846
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2007acfr.pdf,26325845
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2005acfr.pdf,26325847
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2004acfr.pdf,26325848
https://www.showmeboone.com/auditor/common/pdf/BooneCounty2003acfr.pdf,26325849
pyproject.toml

[project]
name = "parallelize-example"
version = "0.1.0"
description = "How to use the `parallelize` function for concurrent processing"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "bs4>=0.0.2",
    "httpx>=0.28.1",
    "jupyterlab>=4.5.0",
    "pandas>=2.3.3",
    "python-documentcloud>=4.5.0",
    "rich>=14.2.0",
]