Skip to content

Instantly share code, notes, and snippets.

@mkbabb
Last active August 14, 2024 17:51
Show Gist options
  • Select an option

  • Save mkbabb/4b6c1fe992a426d44992a1fb510971e8 to your computer and use it in GitHub Desktop.

Select an option

Save mkbabb/4b6c1fe992a426d44992a1fb510971e8 to your computer and use it in GitHub Desktop.
QueuePool

QueuePool: A multiprocessing Pool with Queue integration

This gist contains a Python implementation of QueuePool, a custom class that extends the functionality of multiprocessing.pool.Pool by integrating a shared queue for inter-process communication.

Features

  • Type-safe implementation using generics
  • Context manager support for easy resource management
  • Iterable results through a generator method
    • Works with apply_async, insofar as the generator blocks until all processes in the pool have completed
"""
Minimal QueuePool example with a pandas DataFrame, chunking thereof.
"""
import pandas as pd
from multiprocessing import Queue
import multiprocessing as mp
from typing import Any
import numpy as np
def process_row(row: pd.Series):
    """Return the tuple ``(int(row.name) + 2, row)``.

    NOTE(review): the ``+ 2`` offset may map a 0-based index label to a
    1-based spreadsheet row below a header line — confirm with the caller.
    """
    label = row.name
    return int(label) + 2, row  # type: ignore
def worker(
    chunk: pd.DataFrame,
    queue: Queue,
    *,
    sentinel: Any = None,
    **kwargs: Any,
) -> None:
    """Process every row of *chunk* and put each result on *queue*.

    Each row is passed through ``process_row(row=row, **kwargs)``. When the
    chunk is finished, or processing fails, *sentinel* is put on the queue.
    This way a consumer that counts sentinels, such as ``QueuePool.results``,
    is never left waiting forever.

    Args:
        chunk: The slice of the DataFrame handled by this task.
        queue: Queue shared with the parent process (e.g. ``QueuePool.queue``).
        sentinel: Completion marker. It must be the same object the consuming
            pool treats as its sentinel (``None`` by default on both sides).
        **kwargs: Extra keyword arguments forwarded to ``process_row``.
    """
    try:
        for _, row in chunk.iterrows():
            queue.put(process_row(row=row, **kwargs))
    finally:
        # Signal completion even on error. The exception itself still
        # propagates to the pool, where the task's AsyncResult carries it.
        queue.put(sentinel)
def main():
    """Split a DataFrame into one chunk per worker and consume the processed rows.

    Each chunk is handled by ``worker`` inside a ``QueuePool`` that uses the
    ``"spawn"`` start method. Results are consumed from ``qp.results()``.
    """
    df = ...  # placeholder: load the DataFrame to process here
    # Leave one core for the parent, but never drop below one worker:
    # cpu_count() - 1 is 0 on a single-core machine, and np.array_split
    # rejects 0 sections. Using one value for both the chunk count and the
    # pool size keeps them consistent.
    n_workers = max(mp.cpu_count() - 1, 1)
    # Split row *positions* and slice with iloc. Calling np.array_split on a
    # DataFrame directly goes through DataFrame.swapaxes, which pandas 2.1
    # deprecated.
    chunks = [
        df.iloc[positions]
        for positions in np.array_split(np.arange(len(df)), n_workers)
    ]
    # process_row yields (int, pd.Series) pairs, not dicts.
    with QueuePool[tuple[int, pd.Series]](
        processes=n_workers, context=mp.get_context("spawn")
    ) as qp:
        for chunk in chunks:
            qp.apply_async(worker, args=(chunk, qp.queue))
        for n, row in qp.results():
            ...
if __name__ == "__main__":
    # Required with the "spawn" start method used in main(). Spawned child
    # processes re-import this module, and the guard keeps them from
    # running main() again.
    main()
import multiprocessing as mp
from multiprocessing import Queue
from multiprocessing.managers import SyncManager
from multiprocessing.pool import Pool
from queue import Empty
from typing import *
T = TypeVar("T")
class QueuePool(Generic[T], Pool):
def __init__(
self: "QueuePool[T]",
*args,
sentinel: Any | None = None,
**kwargs,
):
super().__init__(*args, **kwargs)
self._qp_queue: Queue[T] | None = None
self._qp_sentinel = sentinel
self._qp_num_complete: int = 0
self._qp_manager: SyncManager | None = None
@property
def queue(self):
return self._qp_queue
def __enter__(self) -> "QueuePool[T]":
self._qp_manager = mp.Manager()
self._qp_queue = self._qp_manager.Queue() # type: ignore
self._qp_num_complete = 0
return super().__enter__()
def __exit__(self, *args, **kwargs):
self._qp_manager.__exit__(*args, **kwargs) # type: ignore
return super().__exit__(*args, **kwargs)
def results(self) -> Iterator[T]:
while self._qp_num_complete < self._processes: # type: ignore
try:
if (item := self._qp_queue.get()) is self._qp_sentinel: # type: ignore
self._qp_num_complete += 1
else:
yield item
except Empty:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment