Skip to content

Instantly share code, notes, and snippets.

@atifaziz
Last active November 26, 2025 08:33
Show Gist options
  • Select an option

  • Save atifaziz/b152ba95c490b87f9db947785e50166e to your computer and use it in GitHub Desktop.

Select an option

Save atifaziz/b152ba95c490b87f9db947785e50166e to your computer and use it in GitHub Desktop.
LINQ-like pipeline module for Python (for illustration only)
# Copyright (c) 2025 Atif Aziz
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the “Software”), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
LINQ-like pipeline module for Python.
Enables writing functional transformation pipelines using the pipe operator:
result = seq.range(10) | map(lambda x: x * 2) | filter(lambda x: x > 5) | to_list()
"""
from collections.abc import Callable, Iterable, Iterator
from functools import reduce as functools_reduce
from typing import Any, overload
class Seq[T]:
"""
A lazy sequence wrapper that supports pipeline operations via the | operator.
Wraps an iterable and provides chainable transformations that are evaluated lazily.
"""
def __init__(self, iterable: Iterable[T]) -> None:
self._iterable = iterable
def __iter__(self) -> Iterator[T]:
return iter(self._iterable)
@overload
def __or__[U](self, operation: "Transformer[T, U]") -> "Seq[U]": ...
@overload
def __or__[U](self, operation: "Reducer[T, U]") -> U: ...
def __or__(self, operation: "Transformer[T, Any] | Reducer[T, Any]") -> "Seq[Any] | Any":
"""Apply a transformation or reducer using the pipe operator.
- Transformer: lazy, returns Seq[U]
- Reducer: eager, returns U directly
"""
return operation(self)
def to_list(self) -> list[T]:
"""Materialize the sequence into a list.
Example:
>>> seq.of(1, 2, 3).to_list()
[1, 2, 3]
"""
return list(self._iterable)
def to_tuple(self) -> tuple[T, ...]:
"""Materialize the sequence into a tuple.
Example:
>>> seq.of(1, 2, 3).to_tuple()
(1, 2, 3)
"""
return tuple(self._iterable)
def to_set(self) -> set[T]:
"""Materialize the sequence into a set.
Example:
>>> seq.of(1, 2, 2, 3).to_set()
{1, 2, 3}
"""
return set(self._iterable)
def first(self) -> T:
"""Get the first element. Raises StopIteration if empty.
Example:
>>> seq.of(1, 2, 3).first()
1
"""
return next(iter(self._iterable))
def first_or_default(self, default: T) -> T:
"""Get the first element or a default value if empty.
Example:
>>> seq.of(1, 2, 3).first_or_default(0)
1
>>> seq.empty().first_or_default(0)
0
"""
return next(iter(self._iterable), default)
def count(self) -> int:
"""Count the number of elements in the sequence.
Example:
>>> seq.of(1, 2, 3).count()
3
"""
return sum(1 for _ in self._iterable)
def __repr__(self) -> str:
return f"Seq({self.to_list()!r})"
class Transformer[T, U]:
"""
A transformation that can be applied to a Seq using the | operator.
This is a callable wrapper that takes a Seq[T] and returns a Seq[U].
Transformations are lazy - they return a new Seq without evaluating.
"""
def __init__(self, func: Callable[[Seq[T]], Seq[U]]) -> None:
self._func = func
def __call__(self, seq: Seq[T]) -> Seq[U]:
return self._func(seq)
class Reducer[T, U]:
"""
A reducer that can be applied to a Seq using the | operator.
This is a callable wrapper that takes a Seq[T] and returns a U directly.
Reducers are eager - they immediately evaluate the sequence and return the result.
"""
def __init__(self, func: Callable[[Seq[T]], U]) -> None:
self._func = func
def __call__(self, seq: Seq[T]) -> U:
return self._func(seq)
# --- Sequence Constructors ---
class SeqConstructor:
"""Factory class for creating sequences."""
@staticmethod
def of[T](*items: T) -> Seq[T]:
"""Create a sequence from individual items.
Example:
>>> seq.of(1, 2, 3).to_list()
[1, 2, 3]
"""
return Seq(items)
@staticmethod
def from_iterable[T](iterable: Iterable[T]) -> Seq[T]:
"""Create a sequence from an iterable.
Example:
>>> seq.from_iterable([1, 2, 3]).to_list()
[1, 2, 3]
"""
return Seq(iterable)
@staticmethod
def range(start_or_stop: int, stop: int | None = None, step: int = 1) -> Seq[int]:
"""Create a sequence from a range of integers.
Example:
>>> seq.range(5).to_list()
[0, 1, 2, 3, 4]
>>> seq.range(1, 6, 2).to_list()
[1, 3, 5]
"""
if stop is None:
return Seq(range(start_or_stop))
return Seq(range(start_or_stop, stop, step))
@staticmethod
def empty() -> Seq[Any]:
"""Create an empty sequence.
Example:
>>> seq.empty().to_list()
[]
"""
return Seq([])
@staticmethod
def repeat[T](value: T, count: int) -> Seq[T]:
"""Create a sequence that repeats a value count times.
Example:
>>> seq.repeat("x", 3).to_list()
['x', 'x', 'x']
"""
def generator() -> Iterator[T]:
for _ in range(count):
yield value
return Seq(generator())
# Global instance for convenient access
seq = SeqConstructor()
# --- Transformation Functions ---
def map[T, U](func: Callable[[T], U]) -> Transformer[T, U]:
"""
Create a transform that applies a function to each element.
Example:
seq.of(1, 2, 3) | map(lambda x: x * 2) # Seq([2, 4, 6])
"""
def transform(s: Seq[T]) -> Seq[U]:
return Seq(func(item) for item in s)
return Transformer(transform)
def filter[T](predicate: Callable[[T], bool]) -> Transformer[T, T]:
"""
Create a transform that filters elements based on a predicate.
Example:
seq.of(1, 2, 3, 4) | filter(lambda x: x % 2 == 0) # Seq([2, 4])
"""
def transform(s: Seq[T]) -> Seq[T]:
return Seq(item for item in s if predicate(item))
return Transformer(transform)
def flat_map[T, U](func: Callable[[T], Iterable[U]]) -> Transformer[T, U]:
"""
Create a transform that maps and flattens the result.
Example:
seq.of([1, 2], [3, 4]) | flat_map(lambda x: x) # Seq([1, 2, 3, 4])
"""
def transform(s: Seq[T]) -> Seq[U]:
def generator() -> Iterator[U]:
for item in s:
yield from func(item)
return Seq(generator())
return Transformer(transform)
def flatten[T]() -> Transformer[Iterable[T], T]:
"""
Create a transform that flattens nested iterables.
Example:
seq.of([1, 2], [3, 4]) | flatten() # Seq([1, 2, 3, 4])
"""
return flat_map(lambda x: x)
def take[T](n: int) -> Transformer[T, T]:
"""
Create a transform that takes the first n elements.
Example:
seq.range(10) | take(3) # Seq([0, 1, 2])
"""
def transform(s: Seq[T]) -> Seq[T]:
def generator() -> Iterator[T]:
count = 0
for item in s:
if count >= n:
break
yield item
count += 1
return Seq(generator())
return Transformer(transform)
def skip[T](n: int) -> Transformer[T, T]:
"""
Create a transform that skips the first n elements.
Example:
seq.range(5) | skip(2) # Seq([2, 3, 4])
"""
def transform(s: Seq[T]) -> Seq[T]:
def generator() -> Iterator[T]:
count = 0
for item in s:
if count >= n:
yield item
count += 1
return Seq(generator())
return Transformer(transform)
def take_while[T](predicate: Callable[[T], bool]) -> Transformer[T, T]:
"""
Create a transform that takes elements while the predicate is true.
Example:
seq.of(1, 2, 3, 4, 1) | take_while(lambda x: x < 4) # Seq([1, 2, 3])
"""
def transform(s: Seq[T]) -> Seq[T]:
def generator() -> Iterator[T]:
for item in s:
if not predicate(item):
break
yield item
return Seq(generator())
return Transformer(transform)
def skip_while[T](predicate: Callable[[T], bool]) -> Transformer[T, T]:
"""
Create a transform that skips elements while the predicate is true.
Example:
seq.of(1, 2, 3, 4, 1) | skip_while(lambda x: x < 3) # Seq([3, 4, 1])
"""
def transform(s: Seq[T]) -> Seq[T]:
def generator() -> Iterator[T]:
skipping = True
for item in s:
if skipping and predicate(item):
continue
skipping = False
yield item
return Seq(generator())
return Transformer(transform)
def distinct[T]() -> Transformer[T, T]:
"""
Create a transform that removes duplicate elements (preserves order).
Example:
seq.of(1, 2, 2, 3, 1) | distinct() # Seq([1, 2, 3])
"""
def transform(s: Seq[T]) -> Seq[T]:
def generator() -> Iterator[T]:
seen: set[T] = set()
for item in s:
if item not in seen:
seen.add(item)
yield item
return Seq(generator())
return Transformer(transform)
def sorted_by[T](
key: Callable[[T], Any] | None = None, *, reverse: bool = False
) -> Transformer[T, T]:
"""
Create a transform that sorts elements.
Example:
seq.of(3, 1, 2) | sorted_by() # Seq([1, 2, 3])
seq.of("bb", "a", "ccc") | sorted_by(len) # Seq(["a", "bb", "ccc"])
"""
def transform(s: Seq[T]) -> Seq[T]:
return Seq(sorted(s, key=key, reverse=reverse)) # type: ignore[arg-type]
return Transformer(transform)
def reverse[T]() -> Transformer[T, T]:
"""
Create a transform that reverses the sequence.
Example:
seq.of(1, 2, 3) | reverse() # Seq([3, 2, 1])
"""
def transform(s: Seq[T]) -> Seq[T]:
return Seq(reversed(list(s)))
return Transformer(transform)
def enumerate_seq[T](start: int = 0) -> Transformer[T, tuple[int, T]]:
"""
Create a transform that enumerates elements with their index.
Example:
seq.of("a", "b") | enumerate_seq() # Seq([(0, "a"), (1, "b")])
"""
def transform(s: Seq[T]) -> Seq[tuple[int, T]]:
return Seq(enumerate(s, start=start))
return Transformer(transform)
def zip_with[T, U](other: Iterable[U]) -> Transformer[T, tuple[T, U]]:
"""
Create a transform that zips with another iterable.
Example:
seq.of(1, 2) | zip_with(["a", "b"]) # Seq([(1, "a"), (2, "b")])
"""
def transform(s: Seq[T]) -> Seq[tuple[T, U]]:
return Seq(zip(s, other))
return Transformer(transform)
def group_by[T, U](key_func: Callable[[T], U]) -> Transformer[T, tuple[U, list[T]]]:
"""
Create a transform that groups elements by a key function.
Example:
seq.of(1, 2, 3, 4) | group_by(lambda x: x % 2)
# Seq([(1, [1, 3]), (0, [2, 4])])
"""
def transform(s: Seq[T]) -> Seq[tuple[U, list[T]]]:
groups: dict[U, list[T]] = {}
for item in s:
k = key_func(item)
if k not in groups:
groups[k] = []
groups[k].append(item)
return Seq(groups.items())
return Transformer(transform)
def chunk[T](size: int) -> Transformer[T, list[T]]:
"""
Create a transform that splits the sequence into chunks of the given size.
Example:
seq.range(5) | chunk(2) # Seq([[0, 1], [2, 3], [4]])
"""
def transform(s: Seq[T]) -> Seq[list[T]]:
def generator() -> Iterator[list[T]]:
current_chunk: list[T] = []
for item in s:
current_chunk.append(item)
if len(current_chunk) == size:
yield current_chunk
current_chunk = []
if current_chunk:
yield current_chunk
return Seq(generator())
return Transformer(transform)
# --- Terminal Operations as Transforms ---
def reduce[T, U](func: Callable[[U, T], U], initial: U) -> Transformer[T, U]:
"""
Create a transform that reduces the sequence to a single value (wrapped in Seq).
Example:
result = seq.of(1, 2, 3) | reduce(lambda acc, x: acc + x, 0)
result.first() # 6
"""
def transform(s: Seq[T]) -> Seq[U]:
result = functools_reduce(func, s, initial)
return Seq([result])
return Transformer(transform)
def do[T](func: Callable[[T], None]) -> Transformer[T, T]:
"""
Create a transform that applies a side-effect function to each element.
Returns the original sequence unchanged.
Example:
seq.of(1, 2, 3) | do(print) | to_list()
"""
def transform(s: Seq[T]) -> Seq[T]:
def generator() -> Iterator[T]:
for item in s:
func(item)
yield item
return Seq(generator())
return Transformer(transform)
# --- Reducers (Materialization with immediate evaluation) ---
def to_list[T]() -> Reducer[T, list[T]]:
"""
Create a reducer that materializes the sequence into a list.
Example:
>>> seq.of(1, 2, 3) | to_list()
[1, 2, 3]
"""
return Reducer(lambda s: list(s))
def to_tuple[T]() -> Reducer[T, tuple[T, ...]]:
"""
Create a reducer that materializes the sequence into a tuple.
Example:
>>> seq.of(1, 2, 3) | to_tuple()
(1, 2, 3)
"""
return Reducer(lambda s: tuple(s))
def to_set[T]() -> Reducer[T, set[T]]:
"""
Create a reducer that materializes the sequence into a set.
Example:
>>> seq.of(1, 2, 2, 3) | to_set()
{1, 2, 3}
"""
return Reducer(lambda s: set(s))
def first[T]() -> Reducer[T, T]:
"""
Create a reducer that gets the first element.
Raises StopIteration if empty.
Example:
>>> seq.of(1, 2, 3) | first()
1
"""
return Reducer(lambda s: next(iter(s)))
def first_or_default[T](default: T) -> Reducer[T, T]:
"""
Create a reducer that gets the first element or a default value if empty.
Example:
>>> seq.of(1, 2, 3) | first_or_default(0)
1
>>> seq.empty() | first_or_default(0)
0
"""
return Reducer(lambda s: next(iter(s), default))
def count() -> Reducer[Any, int]:
"""
Create a reducer that counts the number of elements.
Example:
>>> seq.of(1, 2, 3) | count()
3
"""
return Reducer(lambda s: sum(1 for _ in s))
# --- Convenience aliases that shadow builtins ---
# Users can import these explicitly if they want to use them
# from linq import map, filter # These shadow builtins
__all__ = [
# Core types
"Seq",
"Transformer",
"Reducer",
# Constructor
"seq",
"SeqConstructor",
# Transforms
"map",
"filter",
"flat_map",
"flatten",
"take",
"skip",
"take_while",
"skip_while",
"distinct",
"sorted_by",
"reverse",
"enumerate_seq",
"zip_with",
"group_by",
"chunk",
"reduce",
"do",
# Reducers (materialization with immediate evaluation)
"to_list",
"to_tuple",
"to_set",
"first",
"first_or_default",
"count",
]
def main() -> None:
"""Demonstrate all functions, transformations, and operations."""
print("=== LINQ-style Pipeline Library Demo ===\n")
# --- Sequence Constructors ---
print("--- Sequence Constructors ---\n")
print("seq.of(1, 2, 3) | to_list()")
print(f" → {seq.of(1, 2, 3) | to_list()}\n")
print("seq.from_iterable([1, 2, 3]) | to_list()")
print(f" → {seq.from_iterable([1, 2, 3]) | to_list()}\n")
print("seq.range(5) | to_list()")
print(f" → {seq.range(5) | to_list()}\n")
print("seq.range(1, 6, 2) | to_list()")
print(f" → {seq.range(1, 6, 2) | to_list()}\n")
print("seq.empty() | to_list()")
print(f" → {seq.empty() | to_list()}\n")
print('seq.repeat("x", 3) | to_list()')
print(f" → {seq.repeat('x', 3) | to_list()}\n")
# --- Reducers (Materialization) ---
print("--- Reducers (Materialization) ---\n")
print("seq.of(1, 2, 3) | to_list()")
print(f" → {seq.of(1, 2, 3) | to_list()}\n")
print("seq.of(1, 2, 3) | to_tuple()")
print(f" → {seq.of(1, 2, 3) | to_tuple()}\n")
print("seq.of(1, 2, 2, 3) | to_set()")
print(f" → {seq.of(1, 2, 2, 3) | to_set()}\n")
print("seq.of(1, 2, 3) | first()")
print(f" → {seq.of(1, 2, 3) | first()}\n")
print("seq.of(1, 2, 3) | first_or_default(0)")
print(f" → {seq.of(1, 2, 3) | first_or_default(0)}\n")
print("seq.empty() | first_or_default(0)")
print(f" → {seq.empty() | first_or_default(0)}\n")
print("seq.of(1, 2, 3) | count()")
print(f" → {seq.of(1, 2, 3) | count()}\n")
# --- Transformations ---
print("--- Transformations ---\n")
print("seq.of(1, 2, 3) | map(lambda x: x * 2) | to_list()")
print(f" → {seq.of(1, 2, 3) | map(lambda x: x * 2) | to_list()}\n")
print("seq.of(1, 2, 3, 4) | filter(lambda x: x % 2 == 0) | to_list()")
print(f" → {seq.of(1, 2, 3, 4) | filter(lambda x: x % 2 == 0) | to_list()}\n")
print("seq.of([1, 2], [3, 4]) | flat_map(lambda x: x) | to_list()")
print(f" → {seq.of([1, 2], [3, 4]) | flat_map(lambda x: x) | to_list()}\n")
print("seq.of([1, 2], [3, 4]) | flatten() | to_list()")
print(f" → {seq.of([1, 2], [3, 4]) | flatten() | to_list()}\n")
print("seq.range(10) | take(3) | to_list()")
print(f" → {seq.range(10) | take(3) | to_list()}\n")
print("seq.range(5) | skip(2) | to_list()")
print(f" → {seq.range(5) | skip(2) | to_list()}\n")
print("seq.of(1, 2, 3, 4, 1) | take_while(lambda x: x < 4) | to_list()")
print(f" → {seq.of(1, 2, 3, 4, 1) | take_while(lambda x: x < 4) | to_list()}\n")
print("seq.of(1, 2, 3, 4, 1) | skip_while(lambda x: x < 3) | to_list()")
print(f" → {seq.of(1, 2, 3, 4, 1) | skip_while(lambda x: x < 3) | to_list()}\n")
print("seq.of(1, 2, 2, 3, 1) | distinct() | to_list()")
print(f" → {seq.of(1, 2, 2, 3, 1) | distinct() | to_list()}\n")
print("seq.of(3, 1, 2) | sorted_by() | to_list()")
print(f" → {seq.of(3, 1, 2) | sorted_by() | to_list()}\n")
print('seq.of("bb", "a", "ccc") | sorted_by(len) | to_list()')
print(f" → {seq.of('bb', 'a', 'ccc') | sorted_by(len) | to_list()}\n")
print("seq.of(1, 2, 3) | reverse() | to_list()")
print(f" → {seq.of(1, 2, 3) | reverse() | to_list()}\n")
print('seq.of("a", "b") | enumerate_seq() | to_list()')
print(f" → {seq.of('a', 'b') | enumerate_seq() | to_list()}\n")
print('seq.of(1, 2) | zip_with(["a", "b"]) | to_list()')
print(f" → {seq.of(1, 2) | zip_with(['a', 'b']) | to_list()}\n")
print("seq.of(1, 2, 3, 4) | group_by(lambda x: x % 2) | to_list()")
print(f" → {seq.of(1, 2, 3, 4) | group_by(lambda x: x % 2) | to_list()}\n")
print("seq.range(5) | chunk(2) | to_list()")
print(f" → {seq.range(5) | chunk(2) | to_list()}\n")
print("seq.of(1, 2, 3) | reduce(lambda acc, x: acc + x, 0) | first()")
result = seq.of(1, 2, 3) | reduce(lambda acc, x: acc + x, 0) | first()
print(f" → {result}\n")
print("seq.of(1, 2, 3) | do(print) | to_list() # prints each element")
print(" → ", end="")
seq.of(1, 2, 3) | do(lambda x: print(x, end=" ")) | to_list()
print("\n")
# --- Chained Pipeline Example ---
print("--- Chained Pipeline Example ---\n")
print(
"seq.range(10) | map(lambda x: x * 2) | filter(lambda x: x % 4 == 0) | take(3) | to_list()"
)
result = (
seq.range(10) | map(lambda x: x * 2) | filter(lambda x: x % 4 == 0) | take(3) | to_list()
)
print(f" → {result}\n")
print("=== Demo Complete ===")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment