Created
October 6, 2022 16:29
-
-
Save AntreasAntoniou/8af1200766705b595f7b06f47a340929 to your computer and use it in GitHub Desktop.
multi_process_arrow_parquet_example.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import shutil | |
| import concurrent | |
| import pyarrow.parquet as pq | |
| import pyarrow as pa | |
| import pyarrow.dataset as ds | |
| import numpy as np | |
| import pandas as pd | |
| import pathlib | |
| from rich import print | |
| import tqdm | |
# Number of entries (one parquet file each) that the script produces.
total_iters = 10000

# Path constants stay at module level because worker processes need
# `dataset_folder` to be defined when they import this module.
cwd = pathlib.Path.cwd()
dataset_folder = cwd / "test"

# Only the process that runs this file as a script may reset the dataset
# folder. Under the "spawn"/"forkserver" start methods every pool worker
# re-imports this module; without the guard each worker would delete the
# folder while other workers are writing into it.
if __name__ == "__main__":
    print(f"CWD is {cwd}")
    if dataset_folder.exists():
        print("Dataset folder exists, deleting it")
        shutil.rmtree(dataset_folder)
    dataset_folder.mkdir(parents=True, exist_ok=True)
def sample(index: int):
    """Return the 2nd, 3rd and 4th powers of *index*, each in its own list.

    The result has the shape ``[[index**2], [index**3], [index**4]]``.
    """
    return [[index**exponent] for exponent in (2, 3, 4)]
def compute_and_add_entry_with_index(index: int):
    """Make sure ``<dataset_folder>/<index>.parquet`` holds a valid entry.

    An existing, readable file is loaded and written back as-is. If the
    file is missing or cannot be read, a fresh single-row table is built
    with the columns ``key``, ``square``, ``cube``, ``fourth_power`` and
    ``last_key`` (``index``, its 2nd/3rd/4th powers, and ``index`` again)
    and written to that path.

    Args:
        index: Entry number; used for both the file name and the row values.

    Returns:
        Always ``True``.
    """
    entry_filepath = dataset_folder / f"{index}.parquet"

    try:
        # EAFP: attempt the read directly instead of checking `exists()`
        # first, which avoids the check-then-read race.
        table = pq.read_table(entry_filepath)
    except (OSError, pa.ArrowException):
        # OSError covers a missing file and I/O failures; ArrowException is
        # the base class of pyarrow's own errors (e.g. invalid/corrupt data).
        table = pa.table(
            [[index], [index**2], [index**3], [index**4], [index]],
            names=["key", "square", "cube", "fourth_power", "last_key"],
        )

    pq.write_table(table, entry_filepath)
    return True
# Every entry index that has to be generated.
indexes = list(range(total_iters))

# The pool must only be started by the process that runs this file as a
# script: with the "spawn"/"forkserver" start methods each worker re-imports
# this module, and an unguarded pool would be re-created inside every worker.
if __name__ == "__main__":
    # `import concurrent` by itself does not load the `futures` submodule.
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:
        with tqdm.tqdm(total=total_iters) as pbar:
            # Results are consumed only to advance the progress bar.
            for _ in executor.map(compute_and_add_entry_with_index, indexes):
                pbar.update(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment