Skip to content

Instantly share code, notes, and snippets.

@akhundMurad
Last active July 18, 2022 12:42
Show Gist options
  • Select an option

  • Save akhundMurad/f4dcf8d4f42052347da33737da7312e1 to your computer and use it in GitHub Desktop.

Select an option

Save akhundMurad/f4dcf8d4f42052347da33737da7312e1 to your computer and use it in GitHub Desktop.
def build_app() -> FastAPI:
app = FastAPI()
nodepends_provide_dao = lambda: provide_dao(provide_session_factory)
nodepends_provide_service = lambda: provide_service(dao=nodepends_provide_dao())
app.dependency_overrides[provide_service_stub] = nodepends_provide_service
return app
class SomeDAO:
def __init__(self, session_factory: Callable[[], AsyncSession]):
self.session_factory = session_factory
async def some_method(self) -> Any:
async with self.session_factory() as session:
return await session.execute(...) # Just example
def provide_session_factory() -> Callable[[], AsyncSession]:
engine = create_async_engine(
"postgresql+asyncpg://scott:tiger@localhost/test",
echo=True,
)
return sessionmaker(
engine, expire_on_commit=False, class_=AsyncSession
)
def provide_dao(session_factory: Callable[[], AsyncSession]) -> SomeDao:
return SomeDao(session_factory=session_factory)
def provide_service_stub():
...
def provide_service(dao: SomeDao) -> Service:
return Service(dao=dao)
class Service:
def __init__(self, dao: SomeDAO):
self.dao = dao
async def some_method(self) -> Any:
return await self.dao.some_method() # Just example
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment