Created
December 22, 2025 15:11
-
-
Save venil7/2c96716a16784a63199730507efc6e36 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import random | |
| from collections.abc import Callable | |
| from faker import Faker | |
| from sqlalchemy.ext.asyncio import ( | |
| async_sessionmaker, | |
| create_async_engine, | |
| ) | |
| from sqlalchemy.orm import MappedColumn, declarative_base, mapped_column | |
| Base = declarative_base() | |
| class User(Base): | |
| __tablename__ = "user" | |
| id: MappedColumn[int] = mapped_column(primary_key=True) | |
| name: MappedColumn[str] = mapped_column(nullable=False) | |
| age: MappedColumn[int] = mapped_column(nullable=False) | |
| def __repr__(self): | |
| return str((self.id, self.name, self.age)) | |
| faker = Faker() | |
| server_queue = asyncio.Queue() | |
| database_queue = asyncio.Queue() | |
| db_url = "postgresql+asyncpg://----" | |
| engine = create_async_engine(db_url) | |
| session_factory = async_sessionmaker( | |
| engine, | |
| # expire_on_commit=False, | |
| # autoflush=False, | |
| ) | |
| async def stream_handler( | |
| instream: Callable, | |
| handler: Callable, | |
| buffer_size=1, | |
| ): | |
| ins = [] | |
| while True: | |
| inx = await instream() | |
| if inx is None: | |
| await handler(ins) | |
| await handler(None) | |
| return | |
| ins.append(inx) | |
| if len(ins) >= buffer_size: | |
| await handler(ins) | |
| ins = [] | |
| async def get_from_server(id: int) -> User: | |
| await asyncio.sleep(random.random()) | |
| # print(f"received {id} from server") | |
| return User(id=id, name=faker.name(), age=faker.port_number()) | |
| async def save_database(users: list[User]): | |
| async with session_factory() as session: | |
| print(f"saving {(users[0], users[-1])}") | |
| session.add_all(users) | |
| return await session.commit() | |
| async def server_handler1(buffer_size: int): | |
| async def handle_item(id: int): | |
| await database_queue.put(await get_from_server(id)) | |
| async def handler(ids: list[int] | None): | |
| if ids: | |
| await asyncio.gather(*map(handle_item, ids)) | |
| else: | |
| await database_queue.put(None) | |
| return await stream_handler( | |
| instream=server_queue.get, | |
| handler=handler, | |
| buffer_size=buffer_size, | |
| ) | |
| async def database_handler1(buffer_size: int): | |
| async def handler(users: list[User] | None): | |
| if users: | |
| await save_database(users) | |
| return await stream_handler( | |
| instream=database_queue.get, | |
| handler=handler, | |
| buffer_size=buffer_size, | |
| ) | |
| async def main(): | |
| s, d = ( | |
| asyncio.create_task(server_handler1(200)), | |
| asyncio.create_task(database_handler1(200)), | |
| ) | |
| for id in range(1000_000): | |
| await server_queue.put(id) | |
| await server_queue.put(None) | |
| await asyncio.gather(s, d) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment