@ayaksvals
Created September 23, 2024 16:39
Merge Sort on Pandas (Unix utility)
{
"cells": [
{
"cell_type": "code",
"execution_count": 44,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"import os\n",
"import tempfile\n",
"import pyarrow.parquet as pq\n",
"\n",
"import concurrent.futures\n",
"from functools import partial\n",
"from concurrent.futures import as_completed"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\"\"\" 1.4G comressed file \"\"\"\n",
"\n",
"\" Pairtools \"\n",
"# real 7m31.938s\n",
"# user 19m48.562s\n",
"# sys 13m21.282s\n",
"\n",
"\n",
"\" Pandas merge sort \" \n",
"# 15 min just to chunk data and to sort blocks\n",
"# 85 min to merge files\n",
"# 99\n",
"\n",
"\" Parquet merge sort \"\n",
"# 10 min to chunk data + sort blocks\n",
"# 5 min to merge\n",
"# 15\n",
"\"\"\n",
"\n",
"\" Parquet merge sort parallel\"\n",
"# 5 min to chunk data + sort blocks\n",
"# 5 min to merge\n",
"# 10\n",
"\n",
"\"\""
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>read_id</th>\n",
" <th>chr1</th>\n",
" <th>pos1</th>\n",
" <th>chr2</th>\n",
" <th>pos2</th>\n",
" <th>strand1</th>\n",
" <th>strand2</th>\n",
" <th>pairs_type</th>\n",
" <th>score1</th>\n",
" <th>score2</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003235</td>\n",
" <td>chr1</td>\n",
" <td>3003446</td>\n",
" <td>+</td>\n",
" <td>-</td>\n",
" <td>DD</td>\n",
" <td>60</td>\n",
" <td>60</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003235</td>\n",
" <td>chr1</td>\n",
" <td>3003446</td>\n",
" <td>+</td>\n",
" <td>-</td>\n",
" <td>DD</td>\n",
" <td>60</td>\n",
" <td>60</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003256</td>\n",
" <td>chr1</td>\n",
" <td>3005828</td>\n",
" <td>-</td>\n",
" <td>+</td>\n",
" <td>DD</td>\n",
" <td>57</td>\n",
" <td>60</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003264</td>\n",
" <td>chr1</td>\n",
" <td>3003342</td>\n",
" <td>-</td>\n",
" <td>-</td>\n",
" <td>DD</td>\n",
" <td>60</td>\n",
" <td>60</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003279</td>\n",
" <td>chr1</td>\n",
" <td>88256867</td>\n",
" <td>-</td>\n",
" <td>-</td>\n",
" <td>DD</td>\n",
" <td>60</td>\n",
" <td>60</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" read_id chr1 pos1 chr2 pos2 strand1 strand2 pairs_type score1 \\\n",
"0 . chr1 3003235 chr1 3003446 + - DD 60 \n",
"1 . chr1 3003235 chr1 3003446 + - DD 60 \n",
"2 . chr1 3003256 chr1 3005828 - + DD 57 \n",
"3 . chr1 3003264 chr1 3003342 - - DD 60 \n",
"4 . chr1 3003279 chr1 88256867 - - DD 60 \n",
"\n",
" score2 \n",
"0 60 \n",
"1 60 \n",
"2 60 \n",
"3 60 \n",
"4 60 "
]
},
"execution_count": 51,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"file_path='large_input.csv'\n",
"df = pd.read_csv(file_path, compression='gzip', nrows=5, skiprows=500, delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], header=None)\n",
"df"
]
},
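{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Added sketch (not part of the original gist): the cells above hard-code\n",
"# skiprows=500 for the header of the compressed input. Assuming the header is\n",
"# the leading run of lines starting with '#', it can be counted instead:\n",
"import gzip\n",
"\n",
"def count_header_lines(path):\n",
"    n = 0\n",
"    with gzip.open(path, 'rt') as fh:\n",
"        for line in fh:\n",
"            if not line.startswith('#'):\n",
"                break\n",
"            n += 1\n",
"    return n\n",
"\n",
"# skiprows = count_header_lines('large_input.csv')"
]
},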
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# PARQUET PARALLEL VERSION"
]
},
{
"cell_type": "code",
"execution_count": 60,
"metadata": {},
"outputs": [],
"source": [
"def process_block(block, block_idx, temp_dir, merge_chunk_size, sort_columns):\n",
" # Sort the block\n",
" block_sorted = block.sort_values(by=sort_columns)\n",
" \n",
" # Write to temp file as Parquet\n",
" temp_file = os.path.join(temp_dir, f\"block_{block_idx}.parquet\")\n",
" block_sorted.to_parquet(temp_file, index=False, engine='pyarrow', row_group_size=merge_chunk_size)\n",
" \n",
" print(f\" Processed block {block_idx}\")\n",
" return temp_file # Return the temp file name for tracking\n"
]
},
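{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Added note: row_group_size controls how many rows the merge step later pulls\n",
"# from each temp file per read_row_group() call. An optional way to check how a\n",
"# written block was laid out (block_0.parquet is just an example path):\n",
"#\n",
"# pf = pq.ParquetFile(os.path.join(temp_dir, 'block_0.parquet'))\n",
"# print(pf.num_row_groups, pf.metadata.num_rows)"
]
},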
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [],
"source": [
"def process_in_parallel(input_file, temp_dir, input_chunk_size, merge_chunk_size, sort_columns, n_jobs=4):\n",
" print(f\"Temporary directory created at: {temp_dir}\")\n",
" \n",
" \n",
" # Step 1: Initial Chunking and Sorting\n",
" print(\"Step 1: Chunking and initial sorting...\")\n",
" \n",
"\n",
" # engine Pyarrow add \n",
" reader = pd.read_csv(input_file, chunksize=input_chunk_size, compression='gzip', skiprows=500, \n",
" delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \n",
" \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], \n",
" header=None)\n",
" temp_files = []\n",
" # Use a ProcessPoolExecutor to parallelize the processing\n",
" with concurrent.futures.ProcessPoolExecutor(max_workers=n_jobs) as executor:\n",
" futures = []\n",
" block_idx = 0\n",
" \n",
" for block in reader:\n",
" # Submit the task to the executor\n",
" futures.append(executor.submit(process_block, block, block_idx, temp_dir, merge_chunk_size, sort_columns))\n",
" print(f\"Added block: {block_idx}\")\n",
" block_idx += 1\n",
" \n",
" # Collect the results as they complete\n",
" \n",
" for futureTemp in as_completed(futures):\n",
" temp_files.append(futureTemp.result()) # Append the temp file name\n",
" \n",
" return temp_files"
]
},
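{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Added sketch (not part of the original gist): process_in_parallel above submits\n",
"# every chunk before collecting results, so unprocessed chunks can pile up in\n",
"# memory. A minimal variant that keeps at most ~n_jobs blocks in flight, assuming\n",
"# the process_block() defined above and a pandas chunk reader:\n",
"from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED\n",
"\n",
"def process_in_parallel_bounded(reader, temp_dir, merge_chunk_size, sort_columns, n_jobs=4):\n",
"    temp_files = []\n",
"    with ProcessPoolExecutor(max_workers=n_jobs) as executor:\n",
"        pending = set()\n",
"        for block_idx, block in enumerate(reader):\n",
"            # wait for a worker to free up before submitting the next chunk\n",
"            while len(pending) >= n_jobs:\n",
"                done, pending = wait(pending, return_when=FIRST_COMPLETED)\n",
"                temp_files.extend(f.result() for f in done)\n",
"            pending.add(executor.submit(process_block, block, block_idx,\n",
"                                        temp_dir, merge_chunk_size, sort_columns))\n",
"        done, _ = wait(pending)\n",
"        temp_files.extend(f.result() for f in done)\n",
"    return temp_files"
]
},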
{
"cell_type": "code",
"execution_count": 62,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Temporary directory created at: /users/slavska.olesia/scratch/slavska.olesia\n",
"Step 1: Chunking and initial sorting...\n",
"Added block: 0\n",
"Added block: 1\n",
" Processed block 0\n",
"Added block: 2\n",
" Processed block 1\n",
"Added block: 3\n",
" Processed block 2\n",
"Added block: 4\n",
" Processed block 3\n",
"Added block: 5\n",
" Processed block 4\n",
"Added block: 6\n",
" Processed block 5\n",
"Added block: 7\n",
" Processed block 6\n",
"Added block: 8\n",
" Processed block 7\n",
"Added block: 9\n",
" Processed block 8\n",
"Added block: 10\n",
" Processed block 9\n",
"Added block: 11\n",
" Processed block 10\n",
"Added block: 12\n",
" Processed block 11\n",
"Added block: 13\n",
" Processed block 12\n",
"Added block: 14\n",
" Processed block 13\n",
"Added block: 15\n",
" Processed block 14\n",
"Added block: 16\n",
" Processed block 15\n",
"Added block: 17\n",
" Processed block 16\n",
"Added block: 18\n",
" Processed block 17\n",
"Added block: 19\n",
" Processed block 18\n",
"Added block: 20\n",
"Added block: 21\n",
" Processed block 21\n",
" Processed block 19\n",
" Processed block 20\n"
]
}
],
"source": [
"input_file = 'large_input.csv'\n",
"temp_dir = 'temp_dir/'\n",
"input_chunk_size = 10000000 \n",
"merge_chunk_size = input_chunk_size // 10\n",
"sort_columns = [\"chr1\", \"pos1\", \"chr2\", \"pos2\"] # Columns to sort by\n",
"n_jobs = 4 \n",
"\n",
"\n",
"# Run the parallelized processing\n",
"temp_files = process_in_parallel(input_file, temp_dir, input_chunk_size, merge_chunk_size, sort_columns, n_jobs)\n"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Temporary directory created at: /users/slavska.olesia/scratch/slavska.olesia\n",
"Step 1: Chunking and initial sorting...\n",
" Processed chunk 1\n",
" Processed chunk 2\n",
" Processed chunk 3\n",
" Processed chunk 4\n",
" Processed chunk 5\n",
" Processed chunk 6\n",
" Processed chunk 7\n",
" Processed chunk 8\n",
" Processed chunk 9\n",
" Processed chunk 10\n",
" Processed chunk 11\n",
" Processed chunk 12\n",
" Processed chunk 13\n",
" Processed chunk 14\n",
" Processed chunk 15\n",
" Processed chunk 16\n",
" Processed chunk 17\n",
" Processed chunk 18\n",
" Processed chunk 19\n",
" Processed chunk 20\n",
" Processed chunk 21\n",
" Processed chunk 22\n"
]
}
],
"source": [
"# Not a parallel Version of first step, use as alternative\n",
"\n",
"# Read csv, divide into blocks, sort and save to parquet\n",
"\n",
"temp_dir = 'temp_dir/'\n",
"print(f\"Temporary directory created at: {temp_dir}\")\n",
"temp_files = []\n",
"\n",
"# Step 1: Initial Chunking and Sorting\n",
"print(\"Step 1: Chunking and initial sorting...\")\n",
"reader = pd.read_csv(input_file, chunksize=input_chunk_size, compression='gzip', skiprows=500, delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], header=None)\n",
"block_idx = 0\n",
"\n",
"merge_chunk_size = input_chunk_size // 10\n",
"\n",
"for block in reader:\n",
" # Sort the block\n",
" block_sorted = block.sort_values(by=sort_columns)\n",
" # Write to temp file without 'last_in_chunk'\n",
" temp_file = os.path.join(temp_dir, f\"block_{block_idx}.csv\")\n",
" block_sorted.to_parquet(temp_file, index=False, engine='pyarrow', row_group_size=merge_chunk_size)\n",
" temp_files.append(temp_file)\n",
" block_idx += 1\n",
" print(f\" Processed block {block_idx}\")"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Step 2: Merging sorted chunks...\n"
]
}
],
"source": [
"print(\"Step 2: Merging sorted chunks...\")\n",
"# Initialize readers for each temp file\n",
"# adding the iterator itself to the blocks list, not the actual data\n",
"blocks = []\n",
"\n",
"for temp_file in temp_files:\n",
" block = pq.ParquetFile(temp_file)\n",
" blocks.append(block)\n",
"\n",
"# Initialize current chunks (the current data from each temp file)\n",
"current_chunks = []\n",
"active_files = len(blocks)\n",
"\n",
"\n",
"for block in blocks:\n",
" try:\n",
" current_chunk = block.read_row_group(0).to_pandas()\n",
" #print(f\"Current_chunk: {current_chunk}\")\n",
" # Add 'last_in_chunk' column here\n",
" current_chunk['last_in_chunk'] = False\n",
" current_chunk.iloc[-1, current_chunk.columns.get_loc('last_in_chunk')] = True\n",
" current_chunks.append(current_chunk)\n",
" except StopIteration:\n",
" current_chunks.append(pd.DataFrame())\n",
" active_files -= 1\n",
"\n",
"carryover_data = pd.DataFrame()\n",
"\n",
"max_row_groups = max([block.num_row_groups for block in blocks]) \n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with open(output_file, 'w') as outfile:\n",
" # This flag is used to write the header only once\n",
" header_written = False\n",
" print(\"File opened\")\n",
" for row_group_index in range(1, max_row_groups):\n",
" print(f\"More files to come {active_files}\")\n",
" # Concatenate current chunks and carryover data\n",
" data_frames = [df for df in current_chunks if not df.empty]\n",
" data_frames.append(carryover_data)\n",
" merged_chunk = pd.concat(data_frames, ignore_index=True)\n",
" \n",
" # If merged_chunk is empty, break the loop\n",
" if merged_chunk.empty:\n",
" break\n",
"\n",
" # Sort the merged chunk\n",
" merged_chunk.sort_values(by=sort_columns, inplace=True)\n",
" merged_chunk.reset_index(drop=True, inplace=True)\n",
" \n",
" # Find the position of the first 'last_in_chunk' == True\n",
" last_in_chunk_idx = merged_chunk.index[merged_chunk['last_in_chunk'] == True]\n",
" print(f\"Last in chunk id: {last_in_chunk_idx}\")\n",
" if not last_in_chunk_idx.empty:\n",
" split_idx = last_in_chunk_idx[0] + 1\n",
" # Data to write to output\n",
" data_to_output = merged_chunk.iloc[:split_idx]\n",
" print(f\"Output in file: {data_to_output}\")\n",
" # Data to carry over to next iteration\n",
" carryover_data = merged_chunk.iloc[split_idx:].copy()\n",
" print(f\"Continue to work with: {carryover_data}\")\n",
" carryover_data['last_in_chunk'] = False # Reset 'last_in_chunk' flags\n",
" else:\n",
" # If no 'last_in_chunk' found, carry all data to next iteration\n",
" data_to_output = pd.DataFrame()\n",
" carryover_data = merged_chunk\n",
"\n",
" # Write data_to_output to the output file\n",
"\n",
" if not data_to_output.empty:\n",
" if not header_written:\n",
" print(\"First Output\")\n",
" data_to_output.to_csv(outfile, index=False, header=True)\n",
" header_written = True\n",
" else:\n",
" data_to_output.to_csv(\n",
" outfile, index=False, header=False)\n",
" # Read next chunks from temp files\n",
" \n",
" for i in range(len(blocks)):\n",
" if (not current_chunks[i].empty) and (row_group_index < blocks[i].num_row_groups):\n",
" try:\n",
" # Read the next chunk\n",
" print(f\"There are more chunks to process: {i}\")\n",
" print(\"Before: \")\n",
" print(current_chunks[i])\n",
"\n",
" current_chunks[i] = blocks[i].read_row_group(row_group_index).to_pandas() \n",
" print(\"After: \")\n",
" print(current_chunks[i])\n",
"\n",
"\n",
" # Add 'last_in_chunk' column here\n",
" current_chunks[i]['last_in_chunk'] = False\n",
" current_chunks[i].iloc[-1, current_chunks[i].columns.get_loc('last_in_chunk')] = True\n",
" \n",
" except StopIteration:\n",
" current_chunks[i] = pd.DataFrame()\n",
" active_files -= 1\n",
" break \n",
"\n",
" # Write any remaining carryover data\n",
" if not carryover_data.empty:\n",
" carryover_data.drop(columns=['last_in_chunk'], inplace=True)\n",
" if not header_written:\n",
" carryover_data.to_csv(outfile, index=False, header=True)\n",
" else:\n",
" carryover_data.to_csv(outfile, index=False, header=False)\n",
" \n",
"\n",
"# Clean up temporary files\n",
"print(\"Cleaning up temporary files...\")\n",
"for temp_file in temp_files:\n",
" os.remove(temp_file)\n",
"#os.rmdir(temp_dir)"
]
},
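{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Added sketch (not from the original gist): a streaming sanity check that the\n",
"# merged output really is globally sorted. It assumes the header written by the\n",
"# merge loop above and the same sort_columns.\n",
"def is_globally_sorted(path, sort_columns, chunksize=1_000_000):\n",
"    prev_tail = None\n",
"    for chunk in pd.read_csv(path, chunksize=chunksize):\n",
"        keys = chunk[sort_columns].reset_index(drop=True)\n",
"        if prev_tail is not None:\n",
"            # include the last row of the previous chunk to check the boundary\n",
"            keys = pd.concat([prev_tail, keys], ignore_index=True)\n",
"        if not keys.equals(keys.sort_values(by=sort_columns, ignore_index=True)):\n",
"            return False\n",
"        prev_tail = keys.tail(1)\n",
"    return True\n",
"\n",
"# is_globally_sorted('sorted_output.csv', [\"chr1\", \"pos1\", \"chr2\", \"pos2\"])"
]
},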
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Parquet temp version\n",
"\n",
"def external_sort(input_file, output_file, sort_columns, input_chunk_size=10000000):\n",
" \"\"\"\n",
" Sorts a large CSV file using external sorting with pandas.\n",
" \n",
" Parameters:\n",
" - input_file: Path to the input CSV file.\n",
" - output_file: Path where the sorted CSV file will be saved.\n",
" - sort_columns: List of column names to sort by.\n",
" - input_chunk_size: Number of rows to read at a time during initial chunking.\n",
" - merge_chunk_size: Number of rows to read from each temp file during merging.\n",
" \"\"\"\n",
" temp_dir = ''\n",
" print(f\"Temporary directory created at: {temp_dir}\")\n",
" temp_files = []\n",
"\n",
" # Step 1: Initial Chunking and Sorting\n",
" print(\"Step 1: Chunking and initial sorting...\")\n",
" reader = pd.read_csv(input_file, chunksize=input_chunk_size, compression='gzip', skiprows=500, delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], header=None)\n",
" chunk_idx = 0\n",
"\n",
" merge_chunk_size = input_chunk_size // 10\n",
"\n",
" for chunk in reader:\n",
" # Sort the chunk\n",
" chunk_sorted = chunk.sort_values(by=sort_columns)\n",
" # Write to temp file without 'last_in_chunk'\n",
" temp_file = os.path.join(temp_dir, f\"chunk_{chunk_idx}.csv\")\n",
" chunk_sorted.to_parquet(temp_file, index=False, engine='pyarrow', row_group_size=merge_chunk_size)\n",
" temp_files.append(temp_file)\n",
" chunk_idx += 1\n",
" print(f\" Processed chunk {chunk_idx}\")\n",
"\n",
" # Step 2: Merging Sorted Chunks\n",
" print(\"Step 2: Merging sorted chunks...\")\n",
" \n",
" # Initialize readers for each temp file\n",
" # adding the iterator itself to the blocks list, not the actual data\n",
" blocks = []\n",
" \n",
" for temp_file in temp_files:\n",
" block = pq.ParquetFile(temp_file)\n",
" blocks.append(block)\n",
"\n",
"\n",
"\n",
" # Initialize current chunks (the current data from each temp file)\n",
" current_chunks = []\n",
" active_files = len(blocks)\n",
"\n",
"\n",
" for block in blocks:\n",
" try:\n",
" current_chunk = block.read_row_group(0).to_pandas()\n",
" #print(f\"Current_chunk: {current_chunk}\")\n",
" # Add 'last_in_chunk' column here\n",
" current_chunk['last_in_chunk'] = False\n",
" current_chunk.iloc[-1, current_chunk.columns.get_loc('last_in_chunk')] = True\n",
" current_chunks.append(current_chunk)\n",
" except StopIteration:\n",
" current_chunks.append(pd.DataFrame())\n",
" active_files -= 1\n",
"\n",
" carryover_data = pd.DataFrame()\n",
"\n",
" max_row_groups = max([block.num_row_groups for block in blocks]) \n",
"\n",
" # Open the output file for writing\n",
" with open(output_file, 'w') as outfile:\n",
" # This flag is used to write the header only once\n",
" header_written = False\n",
" print(\"File opened\")\n",
" for row_group_index in range(1, max_row_groups):\n",
" print(f\"More files to come {active_files}\")\n",
" # Concatenate current chunks and carryover data\n",
" data_frames = [df for df in current_chunks if not df.empty]\n",
" data_frames.append(carryover_data)\n",
" merged_chunk = pd.concat(data_frames, ignore_index=True)\n",
" \n",
" # If merged_chunk is empty, break the loop\n",
" if merged_chunk.empty:\n",
" break\n",
"\n",
" # Sort the merged chunk\n",
" merged_chunk.sort_values(by=sort_columns, inplace=True)\n",
" merged_chunk.reset_index(drop=True, inplace=True)\n",
" \n",
" # Find the position of the first 'last_in_chunk' == True\n",
" last_in_chunk_idx = merged_chunk.index[merged_chunk['last_in_chunk'] == True]\n",
" print(f\"Last in chunk id: {last_in_chunk_idx}\")\n",
" if not last_in_chunk_idx.empty:\n",
" split_idx = last_in_chunk_idx[0] + 1\n",
" # Data to write to output\n",
" data_to_output = merged_chunk.iloc[:split_idx]\n",
" print(f\"Output in file: {data_to_output}\")\n",
" # Data to carry over to next iteration\n",
" carryover_data = merged_chunk.iloc[split_idx:].copy()\n",
" print(f\"Continue to work with: {carryover_data}\")\n",
" carryover_data['last_in_chunk'] = False # Reset 'last_in_chunk' flags\n",
" else:\n",
" # If no 'last_in_chunk' found, carry all data to next iteration\n",
" data_to_output = pd.DataFrame()\n",
" carryover_data = merged_chunk\n",
"\n",
" # Write data_to_output to the output file\n",
"\n",
" if not data_to_output.empty:\n",
" if not header_written:\n",
" print(\"First Output\")\n",
" data_to_output.to_csv(outfile, index=False, header=True)\n",
" header_written = True\n",
" else:\n",
" data_to_output.to_csv(\n",
" outfile, index=False, header=False)\n",
" # Read next chunks from temp files\n",
" \n",
" for i in range(len(blocks)):\n",
" if not current_chunks[i].empty:\n",
" try:\n",
" # Read the next chunk\n",
"\n",
" print(f\"There are more chunks to process: {i}\")\n",
" print(\"Before: \")\n",
" print(current_chunks[i])\n",
"\n",
" current_chunk[i] = blocks[i].read_row_group(row_group_index).to_pandas() \n",
" print(\"After: \")\n",
" print(current_chunks[i])\n",
"\n",
"\n",
" # Add 'last_in_chunk' column here\n",
" current_chunks[i]['last_in_chunk'] = False\n",
" current_chunks[i].iloc[-1, current_chunks[i].columns.get_loc('last_in_chunk')] = True\n",
" \n",
" except StopIteration:\n",
" current_chunks[i] = pd.DataFrame()\n",
" active_files -= 1\n",
" break \n",
"\n",
" # Write any remaining carryover data\n",
" if not carryover_data.empty:\n",
" carryover_data.drop(columns=['last_in_chunk'], inplace=True)\n",
" if not header_written:\n",
" carryover_data.to_csv(outfile, index=False, header=True)\n",
" else:\n",
" carryover_data.to_csv(outfile, index=False, header=False)\n",
" \n",
"\n",
" # Clean up temporary files\n",
" print(\"Cleaning up temporary files...\")\n",
" for temp_file in temp_files:\n",
" os.remove(temp_file)\n",
" os.rmdir(temp_dir)"
]
},
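{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Added sketch of an alternative Step 2 (not the approach used above): a classic\n",
"# k-way merge of the sorted Parquet blocks with heapq.merge, streaming one row\n",
"# group at a time from each temp file. It is row-by-row and therefore slower,\n",
"# but simple and memory-light. Column names and the tab-separated output are\n",
"# assumptions carried over from the cells above.\n",
"import heapq\n",
"\n",
"def iter_sorted_rows(parquet_path, sort_columns):\n",
"    pf = pq.ParquetFile(parquet_path)\n",
"    for rg in range(pf.num_row_groups):\n",
"        df = pf.read_row_group(rg).to_pandas()\n",
"        for row in df.itertuples(index=False):\n",
"            # yield (sort key, full row) so the merge compares on the key only\n",
"            yield tuple(getattr(row, c) for c in sort_columns), row\n",
"\n",
"def kway_merge(temp_files, output_file, sort_columns):\n",
"    streams = [iter_sorted_rows(f, sort_columns) for f in temp_files]\n",
"    with open(output_file, 'w') as out:\n",
"        header_written = False\n",
"        for _, row in heapq.merge(*streams, key=lambda item: item[0]):\n",
"            if not header_written:\n",
"                out.write('\\t'.join(row._fields) + '\\n')\n",
"                header_written = True\n",
"            out.write('\\t'.join(map(str, row)) + '\\n')\n",
"\n",
"# kway_merge(temp_files, 'sorted_output.tsv', [\"chr1\", \"pos1\", \"chr2\", \"pos2\"])"
]
},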
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"external_sort('large_input.csv', 'sorted_output.csv', sort_columns=['chr1', 'chr2', 'pos1', 'pos2'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# CSV temp version\n",
"\n",
"def external_sort(input_file, output_file, sort_columns, input_chunk_size=10000000):\n",
" \"\"\"\n",
" Sorts a large CSV file using external sorting with pandas.\n",
" \n",
" Parameters:\n",
" - input_file: Path to the input CSV file.\n",
" - output_file: Path where the sorted CSV file will be saved.\n",
" - sort_columns: List of column names to sort by.\n",
" - input_chunk_size: Number of rows to read at a time during initial chunking.\n",
" - merge_chunk_size: Number of rows to read from each temp file during merging.\n",
" \"\"\"\n",
" #temp_dir = tempfile.mkdtemp()\n",
" temp_dir = '/users/slavska.olesia/scratch/slavska.olesia'\n",
" print(f\"Temporary directory created at: {temp_dir}\")\n",
" temp_files = []\n",
"\n",
" # Step 1: Initial Chunking and Sorting\n",
" print(\"Step 1: Chunking and initial sorting...\")\n",
" reader = pd.read_csv(input_file, chunksize=input_chunk_size, compression='gzip', skiprows=500, delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], header=None)\n",
" chunk_idx = 0\n",
"\n",
" merge_chunk_size=input_chunk_size // chunk_idx\n",
"\n",
" for chunk in reader:\n",
" # Sort the chunk\n",
" chunk_sorted = chunk.sort_values(by=sort_columns)\n",
" # Write to temp file without 'last_in_chunk'\n",
" temp_file = os.path.join(temp_dir, f\"chunk_{chunk_idx}.csv\")\n",
" chunk_sorted.to_csv(temp_file, index=False)\n",
" temp_files.append(temp_file)\n",
" chunk_idx += 1\n",
" print(f\" Processed chunk {chunk_idx}\")\n",
"\n",
" # Step 2: Merging Sorted Chunks\n",
" print(\"Step 2: Merging sorted chunks...\")\n",
" \n",
" \n",
" \n",
" \n",
" # Initialize readers for each temp file\n",
"\n",
"\n",
" # adding the iterator itself to the blocks list, not the actual data\n",
" blocks = []\n",
" \n",
" for temp_file in temp_files:\n",
" block = pd.read_csv(temp_file) #returns iterator\n",
" blocks.append(block)\n",
"\n",
" # Initialize current chunks (the current data from each temp file)\n",
" current_chunks = []\n",
" active_files = len(blocks)\n",
" \n",
" for reader in blocks:\n",
" try:\n",
" current_chunk = next(reader)\n",
" print(f\"Current_chunk: {current_chunk}\")\n",
" # Add 'last_in_chunk' column here\n",
" current_chunk['last_in_chunk'] = False\n",
" current_chunk.iloc[-1, current_chunk.columns.get_loc('last_in_chunk')] = True\n",
" current_chunks.append(current_chunk)\n",
" except StopIteration:\n",
" current_chunks.append(pd.DataFrame())\n",
" active_files -= 1\n",
"\n",
" carryover_data = pd.DataFrame()\n",
"\n",
" # Open the output file for writing\n",
" with open(output_file, 'w') as outfile:\n",
" # This flag is used to write the header only once\n",
" header_written = False\n",
" print(\"File opened\")\n",
" while active_files > 0:\n",
" print(f\"More files to come {active_files}\")\n",
" # Concatenate current chunks and carryover data\n",
" data_frames = [df for df in current_chunks if not df.empty]\n",
" data_frames.append(carryover_data)\n",
" merged_chunk = pd.concat(data_frames, ignore_index=True)\n",
" \n",
" # If merged_chunk is empty, break the loop\n",
" if merged_chunk.empty:\n",
" break\n",
" # Sort the merged chunk\n",
" merged_chunk.sort_values(by=sort_columns, inplace=True)\n",
" merged_chunk.reset_index(drop=True, inplace=True)\n",
"\n",
" \n",
" \n",
" # Find the position of the first 'last_in_chunk' == True\n",
" last_in_chunk_idx = merged_chunk.index[merged_chunk['last_in_chunk'] == True]\n",
" print(f\"Last in chunk id: {last_in_chunk_idx}\")\n",
" if not last_in_chunk_idx.empty:\n",
" split_idx = last_in_chunk_idx[0] + 1\n",
" # Data to write to output\n",
" data_to_output = merged_chunk.iloc[:split_idx]\n",
" print(f\"Output in file: {data_to_output}\")\n",
" # Data to carry over to next iteration\n",
" carryover_data = merged_chunk.iloc[split_idx:].copy()\n",
" print(f\"Continue to work with: {carryover_data}\")\n",
" carryover_data['last_in_chunk'] = False # Reset 'last_in_chunk' flags\n",
" else:\n",
" # If no 'last_in_chunk' found, carry all data to next iteration\n",
" data_to_output = pd.DataFrame()\n",
" carryover_data = merged_chunk\n",
"\n",
" # Write data_to_output to the output file\n",
"\n",
" if not data_to_output.empty:\n",
" if not header_written:\n",
" print(\"First Output\")\n",
" data_to_output.to_csv(outfile, index=False, header=True)\n",
" header_written = True\n",
" else:\n",
" data_to_output.to_csv(\n",
" outfile, index=False, header=False)\n",
" # Read next chunks from temp files\n",
" \n",
" for i in range(len(blocks)):\n",
" if not current_chunks[i].empty:\n",
" try:\n",
" # Read the next chunk\n",
" print(f\"There are more chunks to process: {i}\")\n",
" print(\"Before: \")\n",
" print(current_chunks[i])\n",
"\n",
" current_chunks[i] = next(blocks[i])\n",
" \n",
" print(\"After: \")\n",
" print(current_chunks[i])\n",
"\n",
"\n",
" # Add 'last_in_chunk' column here\n",
" current_chunks[i]['last_in_chunk'] = False\n",
" current_chunks[i].iloc[-1, current_chunks[i].columns.get_loc('last_in_chunk')] = True\n",
" \n",
" except StopIteration:\n",
" current_chunks[i] = pd.DataFrame()\n",
" active_files -= 1\n",
" \n",
"\n",
" # Write any remaining carryover data\n",
" if not carryover_data.empty:\n",
" carryover_data.drop(columns=['last_in_chunk'], inplace=True)\n",
" if not header_written:\n",
" carryover_data.to_csv(outfile, index=False, header=True)\n",
" else:\n",
" carryover_data.to_csv(outfile, index=False, header=False)\n",
" \n",
"\n",
" # Clean up temporary files\n",
" print(\"Cleaning up temporary files...\")\n",
" for temp_file in temp_files:\n",
" os.remove(temp_file)\n",
" os.rmdir(temp_dir)\n",
"\n",
"# Example usage:\n",
"# external_sort('large_input.csv', 'sorted_output.csv', sort_columns=['column_to_sort_by'])"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Generate sample data\n",
"\n",
"num_rows = 1000000 # Adjust the size as needed for testing\n",
"data = {\n",
" 'id': np.arange(num_rows),\n",
" 'value': np.random.randint(0, 100000, size=num_rows)\n",
"}\n",
"df = pd.DataFrame(data)\n",
"df.to_csv('large_input.csv', index=False)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}