Last active
April 28, 2025 07:40
-
-
Save outman/1ae061e5d5fa9b16bc456b88ffc1fa9d to your computer and use it in GitHub Desktop.
Python async command CLI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from sqlalchemy import URL | |
| from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession | |
| from sqlalchemy.orm import DeclarativeBase | |
| from app.config.setting import settings | |
| """ | |
| main database | |
| """ | |
# Connection URL for the main database, assembled from application settings.
connection_url = URL.create(
    drivername='mysql+aiomysql',
    username=settings.database_user,
    password=settings.database_pass,
    host=settings.database_host,
    port=settings.database_port,
    database=settings.database_name,
    query={'charset': settings.database_charset},
)

# Async engine for the main database; SQL echo and pool sizing come from settings.
engine = create_async_engine(
    connection_url,
    future=True,
    echo=bool(settings.debug_sql),
    pool_size=settings.database_pool_size,
    max_overflow=settings.database_pool_max,
)

# Session factory bound to the main engine. Automatic flushing is disabled and
# instances are not expired on commit.
SessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    autoflush=False,
    expire_on_commit=False,
)
| """ | |
| game database | |
| """ | |
# Connection URL for the game database. Credentials and location use the
# ``database_game_*`` settings; the charset is the shared ``database_charset``.
connection_url_game = URL.create(
    drivername='mysql+aiomysql',
    username=settings.database_game_user,
    password=settings.database_game_pass,
    host=settings.database_game_host,
    port=settings.database_game_port,
    database=settings.database_game_name,
    query={'charset': settings.database_charset},
)

# Async engine for the game database; it reads the same echo and pool settings
# as the main engine.
engine_game = create_async_engine(
    connection_url_game,
    future=True,
    echo=bool(settings.debug_sql),
    pool_size=settings.database_pool_size,
    max_overflow=settings.database_pool_max,
)

# Session factory bound to the game engine. Automatic flushing is disabled and
# instances are not expired on commit.
SessionGameLocal = async_sessionmaker(
    engine_game,
    class_=AsyncSession,
    autoflush=False,
    expire_on_commit=False,
)
async def dispose_db_engine():
    """
    Dispose both database engines (main and game) on exit.

    The game engine is disposed even when disposing the main engine raises,
    so a failure on one engine cannot leave the other one's pool open. The
    error, if any, still propagates to the caller afterwards.

    :return: None
    """
    try:
        await engine.dispose()
    finally:
        # Runs regardless of the outcome above.
        await engine_game.dispose()
class Base(DeclarativeBase):
    """Declarative base class that all ORM models inherit from."""
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import importlib | |
| import signal | |
| from typing import Annotated | |
| import typer | |
| from app.component.logger import logger | |
| from app.database.db import dispose_db_engine | |
async def async_main(module: str, handle: str):
    """
    Run ``app.console.<module>.<handle>`` until it finishes or a shutdown
    signal (SIGINT / SIGTERM) arrives, then dispose the database engines.

    :param module: module name below the ``app.console`` package
    :param handle: name of the callable inside that module; calling it with no
        arguments must return a coroutine accepted by ``loop.create_task``
    :return: None
    :raises ImportError: if ``app.console.<module>`` cannot be imported
    :raises Exception: any exception raised by the handler is re-raised after
        cleanup instead of being left unretrieved on the task
    """
    module_path = f'app.console.{module}'
    import_module = importlib.import_module(module_path)
    if not hasattr(import_module, handle):
        logger.error(f'module {module} has no handler {handle}')
        return
    func = getattr(import_module, handle)
    if not callable(func):
        logger.error(f"module {module} {handle} is not callable")
        return

    loop = asyncio.get_running_loop()
    shutdown_event = asyncio.Event()

    def signal_handler(*args):
        logger.info(f'signal {args}')
        shutdown_event.set()

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, signal_handler, [sig, ])
        except NotImplementedError:
            # Event loops without signal support (e.g. on Windows) raise here;
            # the command still runs, just without graceful signal shutdown.
            logger.info(f'signal handler for {sig.name} is not supported on this event loop')

    try:
        main_task = loop.create_task(func())
        shutdown_task = loop.create_task(shutdown_event.wait())
        done, pending = await asyncio.wait(
            [main_task, shutdown_task],
            return_when=asyncio.FIRST_COMPLETED,
        )
        # Whichever task lost the race is cancelled: the handler on shutdown,
        # or the idle shutdown waiter when the handler finished by itself.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        # Retrieve the handler's outcome so a failure is reported to the caller
        # instead of staying on the task as a never-retrieved exception.
        if main_task in done and not main_task.cancelled():
            main_task.result()
    finally:
        await dispose_db_engine()
        # NOTE(review): fixed 5 s pause kept from the original; presumably it
        # gives the driver time to close connections -- confirm it is needed.
        await asyncio.sleep(5)
def main(
    module: Annotated[str, typer.Option(help="run module")],
    handle: Annotated[str, typer.Option(help="run handle")],
):
    # Synchronous CLI entry point: builds the async runner coroutine for the
    # requested module/handler and drives it to completion on a new event loop.
    runner = async_main(module, handle)
    asyncio.run(runner)


# Expose ``main`` as a typer command when the file is executed as a script.
if __name__ == "__main__":
    typer.run(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment