Skip to content

Instantly share code, notes, and snippets.

@sjlongland
Last active October 26, 2025 00:39
Show Gist options
  • Select an option

  • Save sjlongland/e4419cc83922286a62a00366697a2bce to your computer and use it in GitHub Desktop.

Select an option

Save sjlongland/e4419cc83922286a62a00366697a2bce to your computer and use it in GitHub Desktop.
Running a background AsyncIO thread without blocking the main thread
#!/usr/bin/env python3
import threading
import functools
import logging
import asyncio
import time
import sys
class BackgroundAsyncThread(object):
    """
    Own a private asyncio event loop that runs in a dedicated thread.

    Callables handed to :meth:`run` are executed as plain callbacks inside
    the background loop, so a callable that blocks stalls the background
    loop only; the loop that awaited :meth:`run` keeps servicing its own
    tasks until the result comes back.

    Life-cycle: ``await start()`` -> any number of ``await run(fn)`` ->
    ``await stop()``.  The instance may be started again after a stop.
    """

    def __init__(self, name="BGThread"):
        """
        :param name: Name given to the background ``threading.Thread``.
        """
        self._name = name
        # Thread object while running, otherwise None.
        self._thread = None
        # Event loop owned by the background thread, otherwise None.
        self._loop = None
        # Hand-shake flag: set by the background thread when the loop is
        # ready and again when it has stopped; cleared by start()/stop().
        self._evt = threading.Event()
        # sys.exc_info() tuple captured if the background thread failed.
        self._exc_info = None
        self._log = logging.getLogger(self.__class__.__name__)

    async def start(self):
        """
        Spawn the background thread and wait until its loop is iterating.

        NOTE: the wait uses a ``threading.Event`` and therefore blocks the
        calling event loop for the (normally very short) start-up time.

        :raises AssertionError: if a loop/thread already exists.
        :raises BaseException: whatever the background thread raised while
            setting up its loop, re-raised with the original traceback.
        """
        # Ensure an existing loop is not present and not being
        # constructed already.
        assert self._loop is None, "Loop already present"
        assert self._thread is None, "Thread already present"
        assert not self._evt.is_set(), "Thread start/stop in progress"

        # Create and start a new thread
        self._log.debug("Starting background thread")
        self._thread = threading.Thread(
            name=self._name, target=self._thread_main
        )
        self._thread.start()

        # Wait for it to notify us on our readiness
        self._log.debug("Waiting for start-up")
        self._evt.wait()

        # Clear the event flag for later use.
        self._log.debug("Clearing event flag")
        self._evt.clear()

        # If there was an exception in the thread loop, report it
        if self._exc_info is not None:
            self._log.debug("Background thread failed to start")
            # The thread has finished (or is about to); reap it and reset
            # our state so that start() can be attempted again.
            self._thread.join()
            self._thread = None
            _, exc, traceback = self._exc_info
            raise exc.with_traceback(traceback)

        self._log.debug("Thread is now started")

    async def stop(self):
        """
        Ask the background loop to stop and wait for the thread to exit.

        NOTE: like :meth:`start`, this blocks the calling event loop while
        waiting; the wait lasts until the background loop has finished the
        callback it is currently executing.

        :raises AssertionError: if no loop/thread is running.
        :raises BaseException: whatever escaped the background loop,
            re-raised with the original traceback.
        """
        # Ensure a loop is present for us to stop
        assert self._loop is not None, "Loop not present"
        assert self._thread is not None, "Thread not present"
        assert not self._evt.is_set(), "Thread start/stop in progress"

        # Schedule a callback that calls loop.stop()
        self._log.debug("Requesting shutdown")
        self._loop.call_soon_threadsafe(self._loop.stop)

        # Wait for the event loop thread to tell us it's done
        self._log.debug("Waiting for shutdown")
        self._evt.wait()

        # Clean up the thread
        self._log.debug("Waiting for thread to stop")
        self._thread.join()
        self._evt.clear()
        self._thread = None
        self._log.debug("Thread is now stopped")

        # If there was an exception in the thread loop, report it
        if self._exc_info is not None:
            self._log.debug("Background thread encountered an exception")
            _, exc, traceback = self._exc_info
            raise exc.with_traceback(traceback)

    async def run(self, fn):
        """
        Execute ``fn()`` inside the background loop and await its outcome.

        :param fn: Zero-argument callable (use ``functools.partial`` to
            bind arguments).  It is invoked synchronously as a loop
            callback in the background thread.
        :returns: Whatever ``fn()`` returned.
        :raises AssertionError: if the background loop is not running.
        :raises BaseException: whatever ``fn()`` raised, with its original
            traceback attached.
        """
        assert self._loop is not None, "No loop running"
        assert not self._evt.is_set(), "Thread start/stop in progress"

        # The loop that is awaiting us; completion must be signalled *on
        # this loop*, because asyncio primitives are not thread-safe.
        caller_loop = asyncio.get_running_loop()

        # Set up a container for results
        result = {}
        done = asyncio.Event()

        # Create a function wrapper (runs in the background thread)
        def _run_fn():
            try:
                result["ret"] = fn()
            except BaseException as exc:
                # Everything is captured on purpose: the caller decides
                # what to do with it once it is re-raised over there.
                result["exc"] = exc
            finally:
                # Calling done.set() directly from this thread would not
                # wake the caller's loop; hand it over thread-safely.
                caller_loop.call_soon_threadsafe(done.set)

        # Schedule the wrapper to run in the event loop
        self._loop.call_soon_threadsafe(_run_fn)

        # Wait for it
        await done.wait()

        # Did it work?
        if "exc" in result:
            # It failed; the instance still carries its __traceback__
            raise result["exc"]

        # It worked
        return result["ret"]

    def _thread_main(self):
        """
        Body of the background thread: create, run and dispose of the loop.

        Any exception is stored in ``self._exc_info`` for start()/stop()
        to re-raise; the hand-shake event is always set on the way out so
        that a waiting start()/stop() cannot be left hanging.
        """
        # Clear the exception info (if it failed in a previous run)
        self._exc_info = None
        loop = None

        try:
            # Create a new event loop and register it
            self._log.debug("Creating event loop")
            loop = asyncio.new_event_loop()
            self._loop = loop
            asyncio.set_event_loop(loop)

            # Set up a function to run the moment the loop starts iterating
            self._log.debug("Scheduling init function")
            loop.call_soon(self._on_loop_ready)

            # Run the loop until we are stopped
            self._log.debug("Entering loop")
            loop.run_forever()
        except BaseException:
            # Capture the exception information for later
            self._log.debug("Exception in event loop", exc_info=1)
            self._exc_info = sys.exc_info()
        finally:
            # Release the loop's selector / self-pipe instead of leaking
            # them every time the thread is stopped.
            try:
                if loop is not None:
                    loop.close()
            except Exception:
                self._log.debug("Failed to close event loop", exc_info=1)

            # Loop has stopped, so clean up and notify
            self._loop = None
            self._evt.set()
            self._log.debug("Background loop has stopped")

    def _on_loop_ready(self):
        """First callback run by the background loop: report readiness."""
        # Signal to the main thread that we're ready
        self._evt.set()
        self._log.debug("Background loop is ready")
def do_something(a, b, delay=5):
    """
    Example of a function that makes a blocking call before answering.

    :param a: Left operand.
    :param b: Right operand.
    :param delay: Seconds to block in ``time.sleep`` before computing the
        result; defaults to the original five seconds.
    :returns: ``a + b``.
    :raises TypeError: if ``a`` and ``b`` cannot be added together.
    """
    # Stand-in for real blocking work (slow I/O, a C library call, ...)
    time.sleep(delay)
    return a + b
mainlog = logging.getLogger("main")


def main_loop_task(n=0, limit=30):
    """
    Log a heartbeat and re-schedule itself one second later.

    Must be called from code running inside an event loop (a coroutine or
    a loop callback) whenever ``n < limit``, because the follow-up call is
    scheduled on the running loop.

    :param n: Number of the current run; logged as-is.
    :param limit: Stop re-scheduling once ``n`` reaches this value.
    :raises RuntimeError: if ``n < limit`` and no event loop is running.
    """
    mainlog.info("Main loop task run %d", n)
    if n < limit:
        # Call ourselves in a second.  get_running_loop() is used so that
        # a call from outside a loop fails loudly instead of scheduling on
        # a loop that nobody is running.
        asyncio.get_running_loop().call_later(
            1, main_loop_task, n + 1, limit
        )
async def main():
    """
    Demonstrate BackgroundAsyncThread from the main event loop.

    Starts the periodic main-loop task, spins up the background thread,
    then runs one call that succeeds and one that raises ``TypeError``.
    The background thread is always stopped on the way out.
    """
    # Periodic log line emitted by the main loop itself
    main_loop_task()

    mainlog.info("Starting background thread")
    worker = BackgroundAsyncThread()
    await worker.start()

    mainlog.info("Starting tasks")
    good_call = functools.partial(do_something, 12, 34)
    bad_call = functools.partial(do_something, 123, "a string")

    try:
        # Happy path: the sum comes back from the background thread
        total = await worker.run(good_call)
        mainlog.info("First call returns: %r", total)
        assert total == 46

        # Unhappy path: the TypeError must surface here, in the caller
        try:
            await worker.run(bad_call)
        except TypeError:
            mainlog.info("Second call throws error", exc_info=1)
        else:
            assert False, "We should not get here"
    finally:
        await worker.stop()
# Only run the demo when executed as a script, so that importing this
# module does not reconfigure logging or start an event loop.
if __name__ == "__main__":
    # Set up logging; the thread name is included so the log shows which
    # thread emitted each record.
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s %(threadName)s "
        "%(levelname)s %(name)s: %(message)s",
    )

    # Run our main function
    asyncio.run(main())
$ python3 asyncioexample.py
2025-10-26 10:37:36,019 MainThread DEBUG asyncio: Using selector: EpollSelector
2025-10-26 10:37:36,019 MainThread INFO main: Main loop task run 0
2025-10-26 10:37:36,020 MainThread INFO main: Starting background thread
2025-10-26 10:37:36,020 MainThread DEBUG BackgroundAsyncThread: Starting background thread
2025-10-26 10:37:36,020 BGThread DEBUG BackgroundAsyncThread: Creating event loop
2025-10-26 10:37:36,021 BGThread DEBUG asyncio: Using selector: EpollSelector
2025-10-26 10:37:36,021 MainThread DEBUG BackgroundAsyncThread: Waiting for start-up
2025-10-26 10:37:36,021 BGThread DEBUG BackgroundAsyncThread: Scheduling init function
2025-10-26 10:37:36,021 BGThread DEBUG BackgroundAsyncThread: Entering loop
2025-10-26 10:37:36,022 BGThread DEBUG BackgroundAsyncThread: Background loop is ready
2025-10-26 10:37:36,022 MainThread DEBUG BackgroundAsyncThread: Clearing event flag
2025-10-26 10:37:36,022 MainThread DEBUG BackgroundAsyncThread: Thread is now started
2025-10-26 10:37:36,022 MainThread INFO main: Starting tasks
2025-10-26 10:37:37,022 MainThread INFO main: Main loop task run 1
2025-10-26 10:37:38,023 MainThread INFO main: Main loop task run 2
2025-10-26 10:37:39,025 MainThread INFO main: Main loop task run 3
2025-10-26 10:37:40,027 MainThread INFO main: Main loop task run 4
2025-10-26 10:37:41,028 MainThread INFO main: First call returns: 46
2025-10-26 10:37:41,029 MainThread INFO main: Main loop task run 5
2025-10-26 10:37:42,030 MainThread INFO main: Main loop task run 6
2025-10-26 10:37:43,032 MainThread INFO main: Main loop task run 7
2025-10-26 10:37:44,033 MainThread INFO main: Main loop task run 8
2025-10-26 10:37:45,034 MainThread INFO main: Main loop task run 9
2025-10-26 10:37:46,036 MainThread INFO main: Second call throws error
Traceback (most recent call last):
File "/tmp/asyncioexample.py", line 174, in main
await bgloop.run(functools.partial(do_something, 123, "a string"))
File "/tmp/asyncioexample.py", line 105, in run
raise exc.with_traceback(traceback)
File "/tmp/asyncioexample.py", line 89, in _run_fn
result["ret"] = fn()
~~^^
File "/tmp/asyncioexample.py", line 146, in do_something
return a + b
~~^~~
TypeError: unsupported operand type(s) for +: 'int' and 'str'
2025-10-26 10:37:46,038 MainThread DEBUG BackgroundAsyncThread: Requesting shutdown
2025-10-26 10:37:46,038 MainThread DEBUG BackgroundAsyncThread: Waiting for shutdown
2025-10-26 10:37:46,039 BGThread DEBUG BackgroundAsyncThread: Background loop has stopped
2025-10-26 10:37:46,039 MainThread DEBUG BackgroundAsyncThread: Waiting for thread to stop
2025-10-26 10:37:46,039 MainThread DEBUG BackgroundAsyncThread: Thread is now stopped
2025-10-26 10:37:46,039 MainThread INFO main: Main loop task run 10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment