Last active
October 26, 2025 00:39
-
-
Save sjlongland/e4419cc83922286a62a00366697a2bce to your computer and use it in GitHub Desktop.
Running a background AsyncIO thread without blocking main thread
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import threading | |
| import functools | |
| import logging | |
| import asyncio | |
| import time | |
| import sys | |
class BackgroundAsyncThread(object):
    """
    Own an asyncio event loop that runs in a dedicated background thread.

    Callables passed to :meth:`run` are executed as callbacks inside that
    loop, so blocking work there does not stall the caller's own event
    loop.  A ``threading.Event`` is used by the background thread to tell
    the controlling thread "loop is ready" and "loop has stopped".
    """

    def __init__(self, name="BGThread"):
        """
        Prepare the wrapper; no thread is created until :meth:`start`.

        :param name: name assigned to the background thread.
        """
        self._name = name
        self._thread = None
        self._loop = None
        # Set by the background thread on "ready" and on "stopped";
        # cleared again by start()/stop() once they have observed it.
        self._evt = threading.Event()
        # sys.exc_info() triple captured if the background loop failed.
        self._exc_info = None
        self._log = logging.getLogger(self.__class__.__name__)

    async def start(self):
        """
        Spawn the background thread and wait until its loop is iterating.

        NOTE: the wait uses ``threading.Event.wait()``, so the calling
        event loop is blocked for the (normally very short) start-up time.

        :raises AssertionError: if a thread/loop already exists.
        :raises BaseException: whatever prevented the loop from starting.
        """
        # Ensure an existing loop is not present and not being
        # constructed already.
        assert self._loop is None, "Loop already present"
        assert self._thread is None, "Thread already present"
        assert not self._evt.is_set(), "Thread start/stop in progress"

        # Create and start a new thread
        self._log.debug("Starting background thread")
        self._thread = threading.Thread(
            name=self._name, target=self._thread_main
        )
        self._thread.start()

        # Wait for it to notify us on our readiness
        self._log.debug("Waiting for start-up")
        self._evt.wait()

        # Clear the event flag for later use.
        self._log.debug("Clearing event flag")
        self._evt.clear()

        # If there was an exception in the thread loop, report it
        if self._exc_info is not None:
            self._log.debug("Background thread failed to start")
            # The thread is already on its way out.  Reap it and forget
            # it, otherwise every later start() would trip the
            # "Thread already present" assertion.
            self._thread.join()
            self._thread = None
            _, exc, traceback = self._exc_info
            raise exc.with_traceback(traceback)

        self._log.debug("Thread is now started")

    async def stop(self):
        """
        Stop the background loop and join its thread.

        NOTE: like :meth:`start`, this waits synchronously, so the calling
        event loop is blocked until the background loop has wound down.

        :raises AssertionError: if no loop/thread is running.
        :raises BaseException: any exception that escaped the loop.
        """
        # Ensure a loop is present for us to stop.  Read the attribute
        # once: the background thread resets it to None when it exits.
        loop = self._loop
        assert loop is not None, "Loop not present"
        assert self._thread is not None, "Thread not present"
        assert not self._evt.is_set(), "Thread start/stop in progress"

        # Schedule a callback that calls loop.stop()
        self._log.debug("Requesting shutdown")
        loop.call_soon_threadsafe(loop.stop)

        # Wait for the event loop thread to tell us it's done
        self._log.debug("Waiting for shutdown")
        self._evt.wait()

        # Clean up the thread
        self._log.debug("Waiting for thread to stop")
        self._thread.join()
        self._evt.clear()
        self._thread = None
        self._log.debug("Thread is now stopped")

        # If there was an exception in the thread loop, report it
        if self._exc_info is not None:
            self._log.debug("Background thread encountered an exception")
            _, exc, traceback = self._exc_info
            raise exc.with_traceback(traceback)

    async def run(self, fn):
        """
        Call ``fn()`` inside the background loop and await its outcome.

        :param fn: zero-argument callable (use ``functools.partial`` to
                   bind arguments).  It runs as a plain callback in the
                   background loop's thread.
        :returns: whatever ``fn()`` returned.
        :raises AssertionError: if the background loop is not running.
        :raises BaseException: whatever ``fn()`` raised, re-raised here
                               with its original traceback.
        """
        # Read the attribute once; the background thread may reset it.
        bg_loop = self._loop
        assert bg_loop is not None, "No loop running"
        assert not self._evt.is_set(), "Thread start/stop in progress"

        # Set up a container for results
        outcome = {}
        done = asyncio.Event()
        caller_loop = asyncio.get_running_loop()

        # Create a function wrapper
        def _run_fn():
            try:
                outcome["ret"] = fn()
            except BaseException as exc:
                # Everything is forwarded to the awaiting caller.
                outcome["exc"] = exc
            finally:
                # asyncio.Event is not thread-safe: calling done.set()
                # directly from this thread would not wake the caller's
                # loop.  Hand the set() over to the loop that owns it.
                try:
                    caller_loop.call_soon_threadsafe(done.set)
                except RuntimeError:
                    # Caller's loop is already closed; nobody is waiting.
                    pass

        # Schedule the wrapper to run in the event loop
        bg_loop.call_soon_threadsafe(_run_fn)

        # Wait for it
        await done.wait()

        # Did it work?
        if "exc" in outcome:
            # It failed; the exception still carries its traceback.
            raise outcome["exc"]

        # It worked
        return outcome["ret"]

    def _thread_main(self):
        """
        Body of the background thread: create, run and dispose of the loop.

        Any exception is stored in ``self._exc_info`` for start()/stop()
        to re-raise; the event flag is always set on the way out.
        """
        # Clear the exception info (if it failed in a previous run)
        self._exc_info = None
        loop = None

        try:
            # Create a new event loop and register it
            self._log.debug("Creating event loop")
            loop = self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            # Set up a function to run the moment the loop starts iterating
            self._log.debug("Scheduling init function")
            loop.call_soon(self._on_loop_ready)

            # Run the loop until we are stopped
            self._log.debug("Entering loop")
            loop.run_forever()
        except BaseException:
            # Capture the exception information for later
            self._log.debug("Exception in event loop", exc_info=1)
            self._exc_info = sys.exc_info()
        finally:
            # Loop has stopped, so clean up and notify
            self._loop = None
            try:
                # Release the loop's selector and self-pipe instead of
                # leaving them to the garbage collector.
                if loop is not None:
                    loop.close()
            finally:
                # Must always fire, or start()/stop() would wait forever.
                self._evt.set()
                self._log.debug("Background loop has stopped")

    def _on_loop_ready(self):
        """First callback run by the background loop: announce readiness."""
        # Signal to the main thread that we're ready
        self._evt.set()
        self._log.debug("Background loop is ready")
def do_something(a, b, delay=5):
    """
    Example of a function that makes a blocking call before returning.

    :param a: left operand.
    :param b: right operand.
    :param delay: seconds to block in ``time.sleep`` before adding;
                  defaults to the original five seconds.
    :returns: ``a + b``.
    :raises TypeError: if ``a`` and ``b`` do not support ``+`` together.
    """
    # Stand-in for real blocking work (I/O, a slow library call, ...).
    time.sleep(delay)
    return a + b
# Logger shared by the demo code that runs in the main thread.
mainlog = logging.getLogger("main")


def main_loop_task(n=0):
    """
    Log one tick and re-arm itself one second later.

    The chain ends once ``n`` is no longer below 30.  Its purpose is to
    show that the main event loop keeps servicing timers while other
    work is in progress.

    :param n: tick counter for this invocation.
    """
    mainlog.info("Main loop task run %d", n)

    if not n < 30:
        # Last tick: do not schedule another one.
        return

    # Ask the current loop to call us again in a second with n + 1.
    loop = asyncio.get_event_loop()
    loop.call_later(1, main_loop_task, n + 1)
async def main():
    """
    Demo driver: exercise a BackgroundAsyncThread from the main loop.

    Starts the periodic main-loop ticker, then runs one successful and one
    failing blocking call on the background thread.  The background thread
    is stopped on the way out regardless of the outcome.
    """
    # Kick off the ticker chain on the main loop.
    main_loop_task()

    mainlog.info("Starting background thread")
    worker = BackgroundAsyncThread()
    await worker.start()

    mainlog.info("Starting tasks")
    good_call = functools.partial(do_something, 12, 34)
    bad_call = functools.partial(do_something, 123, "a string")
    try:
        # Happy path: the sum comes back to us.
        total = await worker.run(good_call)
        mainlog.info("First call returns: %r", total)
        assert total == 46

        # Failure path: int + str must surface here as TypeError.
        try:
            await worker.run(bad_call)
        except TypeError:
            mainlog.info("Second call throws error", exc_info=1)
        else:
            assert False, "We should not get here"
    finally:
        await worker.stop()
# Root logger configuration: emit DEBUG and above, and include the name of
# the emitting thread so main-loop and background-loop records can be told
# apart in the output.
logging.basicConfig(
    format="%(asctime)s %(threadName)s %(levelname)s %(name)s: %(message)s",
    level=logging.DEBUG,
)

# Run the demo coroutine to completion.
asyncio.run(main())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| $ python3 asyncioexample.py | |
| 2025-10-26 10:37:36,019 MainThread DEBUG asyncio: Using selector: EpollSelector | |
| 2025-10-26 10:37:36,019 MainThread INFO main: Main loop task run 0 | |
| 2025-10-26 10:37:36,020 MainThread INFO main: Starting background thread | |
| 2025-10-26 10:37:36,020 MainThread DEBUG BackgroundAsyncThread: Starting background thread | |
| 2025-10-26 10:37:36,020 BGThread DEBUG BackgroundAsyncThread: Creating event loop | |
| 2025-10-26 10:37:36,021 BGThread DEBUG asyncio: Using selector: EpollSelector | |
| 2025-10-26 10:37:36,021 MainThread DEBUG BackgroundAsyncThread: Waiting for start-up | |
| 2025-10-26 10:37:36,021 BGThread DEBUG BackgroundAsyncThread: Scheduling init function | |
| 2025-10-26 10:37:36,021 BGThread DEBUG BackgroundAsyncThread: Entering loop | |
| 2025-10-26 10:37:36,022 BGThread DEBUG BackgroundAsyncThread: Background loop is ready | |
| 2025-10-26 10:37:36,022 MainThread DEBUG BackgroundAsyncThread: Clearing event flag | |
| 2025-10-26 10:37:36,022 MainThread DEBUG BackgroundAsyncThread: Thread is now started | |
| 2025-10-26 10:37:36,022 MainThread INFO main: Starting tasks | |
| 2025-10-26 10:37:37,022 MainThread INFO main: Main loop task run 1 | |
| 2025-10-26 10:37:38,023 MainThread INFO main: Main loop task run 2 | |
| 2025-10-26 10:37:39,025 MainThread INFO main: Main loop task run 3 | |
| 2025-10-26 10:37:40,027 MainThread INFO main: Main loop task run 4 | |
| 2025-10-26 10:37:41,028 MainThread INFO main: First call returns: 46 | |
| 2025-10-26 10:37:41,029 MainThread INFO main: Main loop task run 5 | |
| 2025-10-26 10:37:42,030 MainThread INFO main: Main loop task run 6 | |
| 2025-10-26 10:37:43,032 MainThread INFO main: Main loop task run 7 | |
| 2025-10-26 10:37:44,033 MainThread INFO main: Main loop task run 8 | |
| 2025-10-26 10:37:45,034 MainThread INFO main: Main loop task run 9 | |
| 2025-10-26 10:37:46,036 MainThread INFO main: Second call throws error | |
| Traceback (most recent call last): | |
| File "/tmp/asyncioexample.py", line 174, in main | |
| await bgloop.run(functools.partial(do_something, 123, "a string")) | |
| File "/tmp/asyncioexample.py", line 105, in run | |
| raise exc.with_traceback(traceback) | |
| File "/tmp/asyncioexample.py", line 89, in _run_fn | |
| result["ret"] = fn() | |
| ~~^^ | |
| File "/tmp/asyncioexample.py", line 146, in do_something | |
| return a + b | |
| ~~^~~ | |
| TypeError: unsupported operand type(s) for +: 'int' and 'str' | |
| 2025-10-26 10:37:46,038 MainThread DEBUG BackgroundAsyncThread: Requesting shutdown | |
| 2025-10-26 10:37:46,038 MainThread DEBUG BackgroundAsyncThread: Waiting for shutdown | |
| 2025-10-26 10:37:46,039 BGThread DEBUG BackgroundAsyncThread: Background loop has stopped | |
| 2025-10-26 10:37:46,039 MainThread DEBUG BackgroundAsyncThread: Waiting for thread to stop | |
| 2025-10-26 10:37:46,039 MainThread DEBUG BackgroundAsyncThread: Thread is now stopped | |
| 2025-10-26 10:37:46,039 MainThread INFO main: Main loop task run 10 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment