Skip to content

Instantly share code, notes, and snippets.

@outman
Last active April 28, 2025 07:40
Show Gist options
  • Select an option

  • Save outman/1ae061e5d5fa9b16bc456b88ffc1fa9d to your computer and use it in GitHub Desktop.

Select an option

Save outman/1ae061e5d5fa9b16bc456b88ffc1fa9d to your computer and use it in GitHub Desktop.
python async command cli
from sqlalchemy import URL
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase
from app.config.setting import settings
"""
main database
"""
# Connection URL for the main database, assembled from application settings.
connection_url = URL.create(
    'mysql+aiomysql',
    username=settings.database_user,
    password=settings.database_pass,
    host=settings.database_host,
    port=settings.database_port,
    database=settings.database_name,
    query={'charset': settings.database_charset},
)

# Async engine for the main database; SQL echo follows settings.debug_sql and
# the pool limits come from the shared pool settings.
engine = create_async_engine(
    connection_url,
    echo=bool(settings.debug_sql),
    pool_size=settings.database_pool_size,
    max_overflow=settings.database_pool_max,
    future=True,
)

# Session factory bound to the main engine. Sessions are created with
# expire_on_commit and autoflush both switched off.
SessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
)
"""
game database
"""
# Connection URL for the game database. Credentials and location come from the
# database_game_* settings; the charset is the same setting the main database uses.
connection_url_game = URL.create(
    'mysql+aiomysql',
    username=settings.database_game_user,
    password=settings.database_game_pass,
    host=settings.database_game_host,
    port=settings.database_game_port,
    database=settings.database_game_name,
    query={'charset': settings.database_charset},
)

# Async engine for the game database; echo and pool limits reuse the same
# settings as the main engine.
engine_game = create_async_engine(
    connection_url_game,
    echo=bool(settings.debug_sql),
    pool_size=settings.database_pool_size,
    max_overflow=settings.database_pool_max,
    future=True,
)

# Session factory bound to the game engine. Sessions are created with
# expire_on_commit and autoflush both switched off.
SessionGameLocal = async_sessionmaker(
    bind=engine_game,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
)
async def dispose_db_engine():
    """
    Dispose both database engines when the application exits.

    The game engine is disposed in a ``finally`` clause so that a failure
    while disposing the main engine cannot leave the game engine's pool open.
    Any exception raised by either dispose call still propagates to the caller.

    :return: None
    """
    try:
        await engine.dispose()
    finally:
        await engine_game.dispose()
class Base(DeclarativeBase):
    """Shared declarative base class for all models."""
import asyncio
import importlib
import signal
from typing import Annotated
import typer
from app.component.logger import logger
from app.database.db import dispose_db_engine
async def async_main(module: str, handle: str):
    """
    Import ``app.console.<module>`` and run its ``handle`` callable as a task
    until it finishes or SIGINT/SIGTERM is received.

    :param module: name of the module below ``app.console`` to import
    :param handle: attribute of that module to run; it is called without
        arguments and its result is scheduled as an asyncio task
    :return: None. A missing or non-callable handler is logged and the
        function returns without running anything.
    :raises Exception: an exception raised by the handler task is re-raised
        after cleanup instead of being silently dropped
    """
    module_path = f'app.console.{module}'
    import_module = importlib.import_module(module_path)
    if not hasattr(import_module, handle):
        logger.error(f'module {module} has no handler {handle}')
        return
    func = getattr(import_module, handle)
    if not callable(func):
        logger.error(f"module {module} {handle} is not callable")
        return

    loop = asyncio.get_running_loop()
    shutdown_event = asyncio.Event()

    def signal_handler(*args):
        # Only flags the shutdown; the actual cancellation happens below.
        logger.info(f'signal {args}')
        shutdown_event.set()

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, signal_handler, [sig])
        except NotImplementedError:
            # Some event loops do not implement add_signal_handler; keep
            # running without graceful-shutdown support in that case.
            logger.info(f'signal handler for {sig.name} is not supported by this event loop')

    try:
        main_task = loop.create_task(func())
        shutdown_task = loop.create_task(shutdown_event.wait())
        done, pending = await asyncio.wait(
            [main_task, shutdown_task],
            return_when=asyncio.FIRST_COMPLETED,
        )
        # Whichever task is still pending is no longer needed: the handler on
        # shutdown, or the shutdown waiter once the handler has finished.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        # Retrieve the handler's outcome so a failure propagates to the caller
        # rather than being lost as a never-retrieved task exception.
        if main_task in done and not main_task.cancelled():
            main_task.result()
    finally:
        await dispose_db_engine()
        # NOTE(review): fixed 5 second pause after disposing the engines;
        # presumably gives connections time to close - confirm it is needed.
        await asyncio.sleep(5)
def main(
    module: Annotated[str, typer.Option(help="run module")],
    handle: Annotated[str, typer.Option(help="run handle")],
):
    # CLI entry point: runs the requested module/handle pair on a fresh
    # asyncio event loop and blocks until async_main returns.
    # NOTE: documented with comments instead of a docstring because Typer may
    # surface a function docstring as the command's help text.
    runner = async_main(module, handle)
    asyncio.run(runner)
# Hand the command-line arguments to Typer only when this file is executed
# directly as a script.
if __name__ == "__main__":
    typer.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment