Last active
January 7, 2026 18:10
-
-
Save DiTo97/bee863029a8e4a18900a9427ab5d042c to your computer and use it in GitHub Desktop.
A collection of utilities for safely executing asynchronous coroutines
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import atexit | |
| import functools | |
| import threading | |
| from typing import Any, Callable, Coroutine, ParamSpec | |
| P = ParamSpec("P") | |
class _AsyncThread(threading.Thread):
    """Helper thread that runs one coroutine to completion on its own event loop.

    After the thread has finished, exactly one of ``result`` / ``exception``
    describes the outcome: ``result`` holds the coroutine's return value, or
    ``exception`` holds whatever was raised while running it.
    """

    def __init__(self, coroutine: Coroutine[Any, Any, Any]):
        super().__init__(daemon=True)
        self.coroutine = coroutine
        self.result: Any = None
        self.exception: BaseException | None = None

    def run(self) -> None:
        """Execute the coroutine with ``asyncio.run`` and record the outcome."""
        try:
            self.result = asyncio.run(self.coroutine)
        except BaseException as exc:
            # BaseException rather than Exception: asyncio.CancelledError does
            # not derive from Exception, and letting it escape the thread would
            # leave result=None / exception=None, which looks like a successful
            # run to whoever joins this thread.
            self.exception = exc
| def run_async_safely[T](coroutine: Coroutine[Any, Any, T], timeout: float | None = None) -> T: | |
| """safely runs a coroutine with handling of an existing event loop. | |
| This function detects if there's already a running event loop and uses | |
| a separate thread if needed to avoid the "asyncio.run() cannot be called | |
| from a running event loop" error. This is particularly useful in environments | |
| like Jupyter notebooks, FastAPI applications, or other async frameworks. | |
| Args: | |
| coroutine: The coroutine to run | |
| timeout: max seconds to wait for. None means hanging forever | |
| Returns: | |
| The result of the coroutine | |
| Raises: | |
| Any exception raised by the coroutine | |
| """ | |
| try: | |
| loop = asyncio.get_running_loop() | |
| except RuntimeError: | |
| loop = None | |
| if loop and loop.is_running(): | |
| # There's a running loop, use a separate thread | |
| thread = _AsyncThread(coroutine) | |
| thread.start() | |
| thread.join(timeout=timeout) | |
| if thread.is_alive(): | |
| raise TimeoutError("The operation timed out after %f seconds" % timeout) | |
| if thread.exception: | |
| raise thread.exception | |
| return thread.result | |
| else: | |
| if timeout: | |
| coroutine = asyncio.wait_for(coroutine, timeout) | |
| return asyncio.run(coroutine) | |
| def make_sync(timeout: float | None = None): | |
| """decorator to convert an async function into a sync function. | |
| @make_sync, @make_sync(), or @make_sync(timeout=1.0) | |
| """ | |
| def decorator[T](f: Callable[P, Coroutine[Any, Any, T]]) -> Callable[P, T]: | |
| @functools.wraps(f) | |
| def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: | |
| return run_async_safely(f(*args, **kwargs), timeout=timeout) | |
| return wrapper | |
| # use @make_sync without parentheses | |
| if callable(timeout): | |
| f = timeout | |
| timeout = None | |
| return decorator(f) | |
| return decorator | |
| class AsyncRunner: | |
| """class to run coroutines in a background event loop running in a separate thread""" | |
| def __init__(self): | |
| self._loop = asyncio.new_event_loop() | |
| self._thread = threading.Thread(target=self._loop.run_forever, daemon=True) | |
| self._thread.start() | |
| atexit.register(self._shutdown) | |
| def _shutdown(self): | |
| if self._loop and self._loop.is_running(): | |
| self._loop.call_soon_threadsafe(self._loop.stop) | |
| if self._thread and self._thread.is_alive(): | |
| self._thread.join(timeout=1.0) | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, type, value, traceback): | |
| atexit.unregister(self._shutdown) # closing manually | |
| self._shutdown() | |
| def run[T](self, coroutine: Coroutine[Any, Any, T], timeout: float | None = None) -> T: | |
| """safely submits a coroutine to the background event loop | |
| Args: | |
| coroutine: The coroutine to run | |
| timeout: max seconds to wait for. None means hanging forever | |
| Returns: | |
| The result of the coroutine | |
| """ | |
| if not self._loop or not self._loop.is_running(): | |
| raise RuntimeError("The async runner is not active.") | |
| if timeout: | |
| coroutine = asyncio.wait_for(coroutine, timeout) | |
| future = asyncio.run_coroutine_threadsafe(coroutine, self._loop) | |
| try: | |
| return future.result() | |
| except KeyboardInterrupt: | |
| future.cancel() | |
| raise | |
| except Exception as e: | |
| raise e |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment