Skip to content

Instantly share code, notes, and snippets.

@ascopes
Created January 4, 2026 12:05
Show Gist options
  • Select an option

  • Save ascopes/82e4fb94578a33b5a7f6cc523ff70d7c to your computer and use it in GitHub Desktop.

Select an option

Save ascopes/82e4fb94578a33b5a7f6cc523ff70d7c to your computer and use it in GitHub Desktop.
Python functional stream operations
import abc
import itertools
import typing as t
type MapFn[T, U] = t.Callable[[T], U]
type PredicateFn[T] = t.Callable[[T], bool]
type FlatMapFn[T, U] = t.Callable[[T], Stream[U]]
type ReduceFn[C, T] = t.Callable[[C, T], C]
class Stream[T](abc.ABC):
__slots__: t.Sequence[str] = ()
@abc.abstractmethod
def __iter__(self) -> t.Iterator[T]: ...
@abc.abstractmethod
def _create_sub_stream[U](self, source: t.Iterator[U]) -> "Stream[U]": ...
def drop(self, n: int) -> "Stream[T]":
i = 0
@self.drop_while
def dropper(item: T) -> bool:
nonlocal i
will_drop = i < n
i += 1
return will_drop
return dropper
def drop_while(self, pred: PredicateFn[T]) -> "Stream[T]":
return self._create_sub_stream(itertools.dropwhile(pred, self))
def flat_map[U](self, fn: FlatMapFn[T, U]) -> "Stream[U]":
return self._create_sub_stream(itertools.chain.from_iterable(map(fn, self)))
def filter(self, pred: PredicateFn[T]) -> "Stream[T]":
return self._create_sub_stream(filter(pred, self))
def first(self) -> T | None:
try:
return next(self)
except StopIteration:
return None
def map[U](self, fn: MapFn[T, U]) -> "Stream[U]":
return self._create_sub_stream(map(fn, self))
def reduce[C](self, initial: C, fn: ReduceFn[C, T]) -> C:
current = initial
for item in self:
current = fn(current, item)
return current
def take(self, n: int) -> "Stream[T]":
i = 0
@self.take_while
def taker(item: T) -> bool:
nonlocal i
will_take = i < n
i += 1
return will_take
return taker
def take_while(self, pred: PredicateFn[T]) -> "Stream[T]":
return self._create_sub_stream(itertools.takewhile(pred, self))
def transform[U](self, fn: MapFn[t.Self, U]) -> U:
return fn(self)
def to_sequence(self) -> t.Sequence[T]:
return tuple(self)
def to_set(self) -> t.AbstractSet[T]:
return frozenset(self)
@staticmethod
def of(source: t.Iterator[T]) -> "Stream[T]":
return _IteratorStream(source)
class _IteratorStream[T](Stream[T]):
__slots__ = ("_source",)
def __init__(self, source: t.Iterator[T]) -> None:
self._source = source
def __iter__(self) -> t.Iterator[T]:
return iter(self._source)
def _create_sub_stream[U](self, source: t.Iterator[U]) -> "_IteratorStream[U]":
return _IteratorStream(source)
# Demo pipeline: keep the odd values of range(100), double each one, take
# the first 15 results and collect them with to_set() before printing.
numbers = (
    Stream.of(range(100))
    .filter(lambda value: value % 2 == 1)
    .map(lambda odd: odd * 2)
    .take(15)
    .to_set()
)
print(numbers)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment