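"""Scrape Boone County's annual financial report PDFs and upload them to DocumentCloud.

DocumentCloud credentials are read from the DOCUMENTCLOUD_USER and
DOCUMENTCLOUD_PASSWORD environment variables.
"""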
import concurrent.futures
import io
import os
import typing
from pathlib import Path

import httpx
import pandas as pd
from bs4 import BeautifulSoup
from documentcloud import DocumentCloud
from rich import print


def parallelize(
    item_list: list[typing.Any],
    func: typing.Callable,
    func_kwargs: dict[str, typing.Any] | None = None,
    max_workers: int | None = None,
) -> list[typing.Any]:
    """Execute a function in parallel across multiple items.

    Args:
        item_list: List of items to process
        func: The function to execute for each item
        func_kwargs: Additional keyword arguments to pass to func
        max_workers: Maximum number of worker processes (defaults to the CPU
            count or the number of items, whichever is smaller)

    Returns:
        List of results in the order they completed

    Example:
        >>> def square(x):
        ...     return x * x
        >>> items = [1, 2, 3, 4, 5]
        >>> results = parallelize(items, square)
        >>> print(results)  # Output (completion order may vary): [1, 4, 9, 16, 25]
    """
    # Get the number of items to process
    n = len(item_list)
    if n == 0:
        print("[red]No items to process.[/red]")
        return []

    # If no keyword arguments were provided for the function, use an empty dict
    if func_kwargs is None:
        func_kwargs = {}

    # If max_workers is not set, use the CPU count or the number of items, whichever is smaller
    if max_workers is None:
        max_workers = min(os.cpu_count() or 1, n)

    # Announce what we're doing
    print(
        f"[green]Processing {n} tasks in parallel using {max_workers} workers...[/green]"
    )

    # Create a list to hold the results
    results = []

    # Use a ProcessPoolExecutor to parallelize the function execution across CPU cores
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Bundle up each item with its function call, mapping futures back to items
        future_to_item = {
            executor.submit(func, item, **func_kwargs): item for item in item_list
        }

        # Collect results as they complete
        for i, future in enumerate(concurrent.futures.as_completed(future_to_item), 1):
            try:
                result = future.result()
                results.append(result)
                print(f"[yellow]Finished #{i}/{n}: {result}[/yellow]")
            except Exception as e:
                item = future_to_item[future]
                print(f"[red]Error processing {item}: {e}[/red]")
                raise

    print(f"[green]Completed processing {len(results)} tasks[/green]")
    return results


def scrape_pdf_list() -> Path:
    """Scrape the list of annual financial report PDFs from the Boone County site.

    Returns:
        Path to the CSV file containing the list of PDF URLs.
    """
    # Request the reports page
    url = "https://www.showmeboone.com/auditor/financial-reports/"
    response = httpx.get(url)

    # Parse the HTML
    soup = BeautifulSoup(response.text, "html.parser")

    # Get all PDF links, keeping only those with "acfr" in the URL
    pdf_links = soup.find_all("a", href=lambda href: href and href.endswith(".pdf"))
    pdf_urls = [link["href"] for link in pdf_links if "acfr" in link["href"]]

    # Read the CSV of previously seen PDFs, starting fresh if it doesn't exist yet
    csv_path = Path("pdfs.csv")
    if csv_path.exists():
        df = pd.read_csv(csv_path)
    else:
        df = pd.DataFrame(columns=["url"])

    # Add any URLs that aren't already in the CSV
    new_pdfs = [url for url in pdf_urls if url not in df["url"].values]
    if new_pdfs:
        new_df = pd.DataFrame(new_pdfs, columns=["url"])
        df = pd.concat([df, new_df], ignore_index=True)
        df.to_csv(csv_path, index=False)
        print(f"[green]Added {len(new_pdfs)} new PDFs to pdfs.csv[/green]")

    # Return the path to the CSV
    return csv_path


def upload_to_documentcloud(pdf_url: str) -> dict:
    """Upload a PDF to DocumentCloud.

    Args:
        pdf_url: URL of the PDF to upload.

    Returns:
        A dict pairing the PDF's URL with its DocumentCloud document ID.
    """
    # Download the PDF into an in-memory file-like object
    print(f"[blue]Downloading {pdf_url}...[/blue]")
    response = httpx.get(pdf_url, verify=False)
    pdf_data = response.content
    pdf_file = io.BytesIO(pdf_data)

    # Give the in-memory file a name based on the PDF's filename
    pdf_file.name = os.path.basename(pdf_url)

    # Initialize the DocumentCloud client with credentials from the environment
    dc = DocumentCloud(
        username=os.getenv("DOCUMENTCLOUD_USER"),
        password=os.getenv("DOCUMENTCLOUD_PASSWORD"),
    )

    # Upload the document
    doc = dc.documents.upload(pdf_file)

    print(f"[blue]Uploaded {pdf_url} to DocumentCloud with ID {doc.id}[/blue]")
    return {
        "url": pdf_url,
        "documentcloud_id": doc.id,
    }


def main():
    """Scrape the report list, then upload each PDF to DocumentCloud in parallel."""
    # Download the list of PDFs
    csv_path = scrape_pdf_list()

    # Upload them to DocumentCloud in parallel
    df = pd.read_csv(csv_path)
    pdf_urls = df["url"].tolist()
    dict_list = parallelize(
        item_list=pdf_urls,
        func=upload_to_documentcloud,
        # Limiting to 4 workers to avoid overloading DocumentCloud.
        # It's possible you could go higher, but I'm being cautious.
        max_workers=4,
    )

    # Write the results, including each document's DocumentCloud ID, back to the CSV
    df = pd.DataFrame(dict_list)
    df.to_csv(csv_path, index=False)


if __name__ == "__main__":
    main()