Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save AntreasAntoniou/8af1200766705b595f7b06f47a340929 to your computer and use it in GitHub Desktop.

Select an option

Save AntreasAntoniou/8af1200766705b595f7b06f47a340929 to your computer and use it in GitHub Desktop.
multi_process_arrow_parquet_example.py
import shutil
import concurrent
import pyarrow.parquet as pq
import pyarrow as pa
import pyarrow.dataset as ds
import numpy as np
import pandas as pd
import pathlib
from rich import print
import tqdm
# Number of entries (one parquet file per index) the script produces.
total_iters = 10000
cwd = pathlib.Path.cwd()
# Output directory; module-level so worker processes can see it too.
dataset_folder = cwd / "test"


def reset_dataset_folder(folder: pathlib.Path) -> None:
    """Delete *folder* (and its contents) if it exists, then recreate it empty.

    Missing parent directories are created as well.
    """
    if folder.exists():
        print("Dataset folder exists, deleting it")
        shutil.rmtree(folder)
    folder.mkdir(parents=True, exist_ok=True)


# Guarded: process-pool workers started with the "spawn" method re-import the
# main module, and must not wipe the folder other workers are writing into.
if __name__ == "__main__":
    print(f"CWD is {cwd}")
    reset_dataset_folder(dataset_folder)
def sample(index: int):
    """Return the square, cube and fourth power of *index*, each in its own list."""
    return [[index ** exponent] for exponent in (2, 3, 4)]
def _build_entry_table(index: int) -> "pa.Table":
    """Build the one-row table stored for *index* (key, its 2nd-4th powers, key again)."""
    return pa.table(
        [[index], [index**2], [index**3], [index**4], [index]],
        names=["key", "square", "cube", "fourth_power", "last_key"],
    )


def compute_and_add_entry_with_index(index: int) -> bool:
    """Ensure ``dataset_folder / f"{index}.parquet"`` holds a readable entry table.

    If the file already exists and pyarrow can read it, it is left untouched.
    If it is missing, or exists but cannot be read (e.g. a previous run was
    interrupted mid-write), a freshly built one-row table is written in its place.

    Args:
        index: Integer key of the entry; also determines the file name.

    Returns:
        Always ``True``; the caller only uses it as a completion signal.
    """
    entry_filepath = dataset_folder / f"{index}.parquet"
    if entry_filepath.exists():
        try:
            pq.read_table(entry_filepath)
        except (pa.ArrowException, OSError):
            # Unreadable/corrupt file: fall through and regenerate it below.
            pass
        else:
            # Already valid; rewriting identical content would be wasted I/O.
            return True
    pq.write_table(_build_entry_table(index), entry_filepath)
    return True
if __name__ == "__main__":
    # `import concurrent` alone does not guarantee the `futures` submodule is loaded.
    import concurrent.futures

    # Guarded so worker processes that re-import this module ("spawn" start
    # method) do not try to start pools of their own.
    indexes = range(total_iters)
    with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor, tqdm.tqdm(
        total=total_iters
    ) as pbar:
        # Each task is tiny, so batch them to cut inter-process overhead; the
        # return values are only completion signals for the progress bar.
        for _ in executor.map(compute_and_add_entry_with_index, indexes, chunksize=64):
            pbar.update(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment