Skip to content

Instantly share code, notes, and snippets.

@CallumJHays
Last active January 20, 2026 19:24
Show Gist options
  • Select an option

  • Save CallumJHays/0841c5fdb7b2774d2a0b9b8233689761 to your computer and use it in GitHub Desktop.

Select an option

Save CallumJHays/0841c5fdb7b2774d2a0b9b8233689761 to your computer and use it in GitHub Desktop.
lazy_executor_map

lazy-executor-map

This gist provides a function that acts like concurrent.futures.ThreadPoolExecutor.map(fn, it), but it consumes the input iterator lazily rather than eagerly.

Works with both ThreadPoolExecutor and ProcessPoolExecutor.

Usage

from lazy_executor_map import lazy_executor_map
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as ex:
  for output in lazy_executor_map(work, out_of_core_inputs, ex):
    ...

Why?

This is useful for out-of-core iterator pipelines where the entire data source behind out_of_core_inputs doesn't fit in memory.

One way to implement efficient io multitasking is via ThreadPoolExecutor, which is an implementation of concurrent.futures.Executor. As concurrent.futures.Executor.map(fn, it) collects it eagerly, calling pool.map(fn, dataset) will collect the entire dataset into memory even if dataset is a streaming iterator.

This makes it unsuitable for out-of-core operation. However, it is possible to get this to behave lazily instead. This gist implements that functionality.

from concurrent.futures import Executor, Future, wait, FIRST_COMPLETED
from typing import Callable, Iterable, Iterator, TypeVar
from typing_extensions import TypeVar
In = TypeVar("In")
Out = TypeVar("Out")


def lazy_executor_map(
    fn: Callable[[In], Out],
    it: Iterable[In],
    ex: Executor,
    # probably want this to be equal to the n_threads/n_processes
    n_concurrent: int = 6,
) -> Iterator[Out]:
    """Lazily map *fn* over *it* on executor *ex*, yielding results in input order.

    Unlike ``Executor.map``, which consumes the whole input iterable up
    front, this keeps at most ``n_concurrent`` items in flight at a time,
    so it is safe for out-of-core / streaming iterators.

    Args:
        fn: callable applied to each element.
        it: input iterable; consumed incrementally.
        ex: executor (thread or process pool) to submit work to.
        n_concurrent: maximum number of simultaneously submitted tasks.

    Yields:
        ``fn(el)`` for each element, in the order the elements arrived.

    Raises:
        Whatever ``fn`` raises, re-raised from ``Future.result()``.
    """
    queue: list[Future[Out]] = []      # not-yet-yielded futures, input order
    in_progress: set[Future[Out]] = set()
    itr = iter(it)
    try:
        while True:
            # Top the pool back up to n_concurrent in-flight tasks.
            for _ in range(n_concurrent - len(in_progress)):
                el = next(itr)  # raises StopIteration when the input is
                # exhausted - caught by the except clause below
                fut = ex.submit(fn, el)
                queue.append(fut)
                in_progress.add(fut)
            # Block until at least one in-flight task completes.
            _, in_progress = wait(in_progress, return_when=FIRST_COMPLETED)
            # Yield whatever is ready at the head of the queue, preserving
            # input order (a later-finished task waits behind earlier ones).
            while queue and queue[0].done():
                yield queue.pop(0).result()
    except StopIteration:
        # Input exhausted: drain every remaining future in order.
        wait(queue)
        for fut in queue:
            yield fut.result()
    finally:
        # BUG FIX: if the consumer abandons the generator early (break /
        # close()), cancel any queued, not-yet-started work so it does not
        # keep running on the pool. No-op for already-finished futures.
        for fut in queue:
            fut.cancel()
from unittest import TestCase
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from lazy_executor_map import lazy_executor_map
from time import time
class TestLazyExecutorMap(TestCase):
    """Behavioural tests for lazy_executor_map."""

    def test_threadpool_basic(self):
        # Round-trip through a thread pool preserves values and order.
        data = list(range(100))
        with ThreadPoolExecutor() as pool:
            result = list(lazy_executor_map(_identity, data, pool))
        assert data == result

    def test_processpool_basic(self):
        # Same contract holds for a process pool.
        data = list(range(100))
        with ProcessPoolExecutor() as pool:
            result = list(lazy_executor_map(_identity, data, pool))
        assert data == result

    def test_performance_acceptable(self):
        # The lazy variant should be comparable in speed to the eager
        # Executor.map (within +- 10%).
        data = range(100000)
        with ThreadPoolExecutor() as pool:
            t0 = time()
            for _ in pool.map(_identity, data):
                pass
            eager_elapsed = time() - t0

            t0 = time()
            for _ in lazy_executor_map(_identity, data, pool):
                pass
            lazy_elapsed = time() - t0
        assert 0.9 < lazy_elapsed / eager_elapsed < 1.1

    def test_generator_chain(self):
        # Chaining after another generator must stay lazy: breaking out
        # after the first result means the source was not fully drained.
        drained = False

        def source():
            nonlocal drained
            yield from range(10)
            drained = True

        with ThreadPoolExecutor() as pool:
            for _ in lazy_executor_map(_identity, source(), pool, n_concurrent=6):
                assert not drained
                break
def _identity(x: int):
return x
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment