Skip to content

Instantly share code, notes, and snippets.

@venil7
Created December 22, 2025 15:11
Show Gist options
  • Select an option

  • Save venil7/2c96716a16784a63199730507efc6e36 to your computer and use it in GitHub Desktop.

Select an option

Save venil7/2c96716a16784a63199730507efc6e36 to your computer and use it in GitHub Desktop.
import asyncio
import random
from collections.abc import Callable
from faker import Faker
from sqlalchemy.ext.asyncio import (
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import MappedColumn, declarative_base, mapped_column
Base = declarative_base()


class User(Base):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = "user"

    # Primary-key column.
    id: MappedColumn[int] = mapped_column(primary_key=True)
    # The two payload columns are both declared NOT NULL.
    name: MappedColumn[str] = mapped_column(nullable=False)
    age: MappedColumn[int] = mapped_column(nullable=False)

    def __repr__(self):
        """Return the tuple-style text ``(id, name, age)``."""
        # Spelled out as an f-string; the result is the same text that
        # ``str()`` of the three-element tuple would give.
        return f"({self.id!r}, {self.name!r}, {self.age!r})"
# Source of fabricated names/numbers for the simulated server responses.
faker = Faker()

# Hand-off queues between pipeline stages. Both are created without a
# maxsize; a ``None`` item is used as the end-of-stream marker.
server_queue, database_queue = asyncio.Queue(), asyncio.Queue()

# NOTE(review): the URL is a redacted placeholder and must be replaced with a
# real asyncpg DSN before this script can reach a database.
db_url = "postgresql+asyncpg://----"
engine = create_async_engine(db_url)

# ``expire_on_commit`` and ``autoflush`` are intentionally not passed, so the
# session factory uses the library defaults for both.
session_factory = async_sessionmaker(engine)
async def stream_handler(
    instream: Callable,
    handler: Callable,
    buffer_size=1,
):
    """Batch items awaited from ``instream`` and pass each batch to ``handler``.

    Items are pulled one at a time and collected until ``buffer_size`` of them
    are pending, at which point the batch is handed to ``handler``. A ``None``
    item ends the stream: any items still pending are flushed, then
    ``handler(None)`` is awaited exactly once as the end-of-stream signal and
    the coroutine returns.

    Args:
        instream: Zero-argument callable returning an awaitable that yields
            the next item, or ``None`` once the stream is exhausted.
        handler: One-argument callable returning an awaitable. It receives
            either a non-empty list of items or ``None``.
        buffer_size: Number of items collected before a flush. Any value
            below 1 results in a flush after every single item.
    """
    pending = []
    while (item := await instream()) is not None:
        pending.append(item)
        if len(pending) >= buffer_size:
            await handler(pending)
            # Rebind instead of clearing: the handler may still hold a
            # reference to the list it was given.
            pending = []

    # Flush the tail only when there is one. Passing an empty list would be
    # indistinguishable from the sentinel for handlers that test truthiness,
    # which made the end-of-stream signal fire twice.
    if pending:
        await handler(pending)
    await handler(None)
async def get_from_server(id: int) -> User:
    """Simulate fetching the user with the given id from a slow server.

    Sleeps for a random fraction of a second, then builds a ``User`` carrying
    the requested id together with Faker-generated name and age values.
    """
    latency = random.random()
    await asyncio.sleep(latency)

    fake_name = faker.name()
    # NOTE(review): a Faker port number stands in for the age, so the values
    # are arbitrary integers rather than realistic ages.
    fake_age = faker.port_number()
    return User(id=id, name=fake_name, age=fake_age)
async def save_database(users: list[User]):
    """Add a batch of users to a new session and commit it.

    Args:
        users: Instances to persist. An empty batch is a no-op: no session
            is opened and nothing is logged.

    Returns:
        The result of ``session.commit()``, or ``None`` for an empty batch.
    """
    if not users:
        # The progress line below indexes the first and last element, which
        # would raise IndexError on an empty list.
        return None

    async with session_factory() as session:
        print(f"saving {(users[0], users[-1])}")
        session.add_all(users)
        return await session.commit()
async def server_handler1(buffer_size: int):
    """Run the fetch stage: read ids from ``server_queue``, emit users.

    Ids are consumed in batches of ``buffer_size`` via ``stream_handler``.
    Every id in a batch is fetched concurrently with ``get_from_server`` and
    the resulting user is put on ``database_queue``. When the upstream
    ``None`` sentinel arrives, a single ``None`` is forwarded downstream.

    Args:
        buffer_size: Number of ids gathered into one concurrent batch.
    """

    async def handle_item(user_id: int):
        # Fetch one user and hand it to the database stage.
        await database_queue.put(await get_from_server(user_id))

    async def handler(ids: list[int] | None):
        if ids is None:
            # Only the real sentinel ends the downstream stream. An empty
            # batch must not, otherwise the end marker is forwarded twice.
            await database_queue.put(None)
        elif ids:
            await asyncio.gather(*(handle_item(user_id) for user_id in ids))

    return await stream_handler(
        instream=server_queue.get,
        handler=handler,
        buffer_size=buffer_size,
    )
async def database_handler1(buffer_size: int):
    """Run the persistence stage: read users from ``database_queue``, save them.

    Users are consumed in batches of ``buffer_size`` via ``stream_handler``
    and each non-empty batch is passed to ``save_database``.

    Args:
        buffer_size: Number of users written per ``save_database`` call.
    """

    async def persist_batch(users: list[User] | None):
        # Both the ``None`` sentinel and an empty batch require no work.
        if not users:
            return
        await save_database(users)

    outcome = await stream_handler(
        instream=database_queue.get,
        handler=persist_batch,
        buffer_size=buffer_size,
    )
    return outcome
async def main():
    """Drive the whole pipeline: feed ids, wait for both stages, clean up.

    Starts the fetch and persistence stages as tasks (batch size 200 each),
    enqueues one million ids followed by the ``None`` end-of-stream marker,
    and waits for both stages to finish. The engine's connection pool is
    disposed afterwards, whether or not the pipeline succeeded, so pooled
    connections are closed while the event loop is still running.
    """
    try:
        server_task = asyncio.create_task(server_handler1(200))
        database_task = asyncio.create_task(database_handler1(200))

        for user_id in range(1_000_000):
            await server_queue.put(user_id)
        await server_queue.put(None)

        await asyncio.gather(server_task, database_task)
    finally:
        await engine.dispose()
# Script entry point: run the pipeline only when executed directly, not when
# this module is imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment