Skip to content

Instantly share code, notes, and snippets.

@akhundMurad
Created July 18, 2022 08:37
Show Gist options
  • Select an option

  • Save akhundMurad/ad3a24295f75de454365cdc94851051c to your computer and use it in GitHub Desktop.

Select an option

Save akhundMurad/ad3a24295f75de454365cdc94851051c to your computer and use it in GitHub Desktop.
# providers.py
def provide_dao() -> SomeDAO:
return SomeDAO()
def provide_service(dao_provider: Callable([], SomeDAO)) -> Service:
return Service(dao=dao_provider())
# tasks.py
def some_task(service_provider: Callable[[], Service]) -> None:
service = service_provider()
service.calling_some_method()
# celery.py
from functools import partial
def build_celery() -> Celery:
app = Celery()
nodepends_provide_service = partial(provide_service, dao_provider=provide_dao)
nodepends_some_task = partial(some_task, service_provider=nodepends_provide_service)
app.registry.add_task(nodepends_some_task)
return app
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment