Created
January 4, 2026 12:05
-
-
Save ascopes/82e4fb94578a33b5a7f6cc523ff70d7c to your computer and use it in GitHub Desktop.
Python functional stream operations
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import abc | |
| import itertools | |
| import typing as t | |
| type MapFn[T, U] = t.Callable[[T], U] | |
| type PredicateFn[T] = t.Callable[[T], bool] | |
| type FlatMapFn[T, U] = t.Callable[[T], Stream[U]] | |
| type ReduceFn[C, T] = t.Callable[[C, T], C] | |
| class Stream[T](abc.ABC): | |
| __slots__: t.Sequence[str] = () | |
| @abc.abstractmethod | |
| def __iter__(self) -> t.Iterator[T]: ... | |
| @abc.abstractmethod | |
| def _create_sub_stream[U](self, source: t.Iterator[U]) -> "Stream[U]": ... | |
| def drop(self, n: int) -> "Stream[T]": | |
| i = 0 | |
| @self.drop_while | |
| def dropper(item: T) -> bool: | |
| nonlocal i | |
| will_drop = i < n | |
| i += 1 | |
| return will_drop | |
| return dropper | |
| def drop_while(self, pred: PredicateFn[T]) -> "Stream[T]": | |
| return self._create_sub_stream(itertools.dropwhile(pred, self)) | |
| def flat_map[U](self, fn: FlatMapFn[T, U]) -> "Stream[U]": | |
| return self._create_sub_stream(itertools.chain.from_iterable(map(fn, self))) | |
| def filter(self, pred: PredicateFn[T]) -> "Stream[T]": | |
| return self._create_sub_stream(filter(pred, self)) | |
| def first(self) -> T | None: | |
| try: | |
| return next(self) | |
| except StopIteration: | |
| return None | |
| def map[U](self, fn: MapFn[T, U]) -> "Stream[U]": | |
| return self._create_sub_stream(map(fn, self)) | |
| def reduce[C](self, initial: C, fn: ReduceFn[C, T]) -> C: | |
| current = initial | |
| for item in self: | |
| current = fn(current, item) | |
| return current | |
| def take(self, n: int) -> "Stream[T]": | |
| i = 0 | |
| @self.take_while | |
| def taker(item: T) -> bool: | |
| nonlocal i | |
| will_take = i < n | |
| i += 1 | |
| return will_take | |
| return taker | |
| def take_while(self, pred: PredicateFn[T]) -> "Stream[T]": | |
| return self._create_sub_stream(itertools.takewhile(pred, self)) | |
| def transform[U](self, fn: MapFn[t.Self, U]) -> U: | |
| return fn(self) | |
| def to_sequence(self) -> t.Sequence[T]: | |
| return tuple(self) | |
| def to_set(self) -> t.AbstractSet[T]: | |
| return frozenset(self) | |
| @staticmethod | |
| def of(source: t.Iterator[T]) -> "Stream[T]": | |
| return _IteratorStream(source) | |
| class _IteratorStream[T](Stream[T]): | |
| __slots__ = ("_source",) | |
| def __init__(self, source: t.Iterator[T]) -> None: | |
| self._source = source | |
| def __iter__(self) -> t.Iterator[T]: | |
| return iter(self._source) | |
| def _create_sub_stream[U](self, source: t.Iterator[U]) -> "_IteratorStream[U]": | |
| return _IteratorStream(source) | |
| numbers = (Stream.of(range(100)) | |
| .filter(lambda n: n % 2 == 1) | |
| .map(lambda n: n * 2) | |
| .take(15) | |
| .to_set()) | |
| print(numbers) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment