Why You Should Use AnyIO (And Why You Might Already Have It)
Abstract
Async Python is fragmented—but you probably already have the solution installed. AnyIO is a portability layer for asyncio and Trio that fixes critical cancellation bugs and provides structured concurrency. If you use httpx, FastAPI, or Jupyter, AnyIO is already in your environment powering your HTTP clients, web frameworks, and notebooks.
This talk reveals AnyIO's level-triggered cancellation (fixing asyncio's dangerous edge-triggered behavior that causes silent hangs), demonstrates structured concurrency patterns with task groups, and shows practical tools like memory object streams for producer-consumer workflows. You'll learn why major libraries chose AnyIO—and how to use it directly for more reliable concurrent code.
Target audience: Intermediate Python developers working with async code who want portable, maintainable concurrent programs.
Description
Talk Structure (40 minutes)
1. The Problem with asyncio.create_task() (9 min)
Go statements break everything: one-way jumps, no guaranteed cleanup
Four core problems: functions aren't black boxes, resource cleanup breaks, error handling breaks, can't tell if code is finished
The root cause: unstructured concurrency
Real-world consequences in data pipelines
The solution: structured concurrency with task groups
2. Level vs Edge Cancellation (9 min)
Edge-triggered cancellation in asyncio: cancellation is a one-shot signal that can be swallowed
Live bug demo: asyncio code that hangs vs AnyIO code that fails fast
Why this causes production failures: timeouts that don't timeout, cleanup that never happens
How AnyIO's level-triggered cancellation ensures cancelled tasks stay cancelled
3. Building Real Applications (10 min)
Memory object streams: producer-consumer with backpressure by default (vs asyncio.Queue)
Getting results from task groups: nonlocal pattern vs memory streams
Buffered byte streams: why AnyIO adds value even on Trio
Why PyPI distribution matters: bugfixes on all Python versions
Why data scientists should care
5. The Hidden Dependency Reveal (5 min)
Live demo: pipdeptree -p anyio -r reveals dozens of packages depend on AnyIO
Why httpx, FastAPI, Jupyter, and Anthropic's MCP SDK chose AnyIO
You already have it installed—now you know how to use it directly
6. Q&A (1 min)
Prior Knowledge Expected
Basic understanding of async/await syntax
Familiarity with asyncio (having used asyncio.create_task() or asyncio.gather())
Experience with data processing or web development helpful but not required
Key Takeaways
Understanding why edge-triggered cancellation causes bugs and how level-triggered cancellation fixes them
Practical patterns for getting results from concurrent tasks (nonlocal vs memory streams)
Migration paths from common asyncio patterns to AnyIO
Awareness that AnyIO is already powering much of the Python async ecosystem
Why This Matters to PyData
Async programming appears in data workflows everywhere: parallel data fetching, ETL pipelines, ML model serving, streaming data processing. AnyIO's cancellation semantics and memory streams provide battle-tested patterns for these workflows. The talk bridges "I write async/await sometimes" and "I understand why my async code has weird timeout bugs."
Code examples will be available in a public repository before the conference.
Speaker Notes
I am diagnosed with schizoaffective disorder and autism spectrum disorder. I would appreciate a mid-day time slot (when my antipsychotics are least sedating) and would welcome any mentorship support available for speakers. This will be my second technical talk ever.
I would also appreciate a laser pointer to highlight blocks of example code during the presentation.
It's a "go statement" - and go statements break everything
What's a "go statement"?
```python
# asyncio
asyncio.create_task(myfunc())  # Fire and forget!
# Control returns immediately, myfunc() runs in background
```

```go
// Golang
go myfunc() // Same thing
```

```python
# Python threads
threading.Thread(target=myfunc).start()  # Also same
```
Key problem: Control flow splits with one-way jump
Parent returns immediately
Child jumps to myfunc and runs unsupervised
No guaranteed reunion point
Problem 1: Functions Aren't Black Boxes Anymore
```python
async def process_data(data):
    # Does this function spawn background tasks?
    # Are they still running after it returns?
    # You have NO IDEA without reading all the source code!
    await some_library_function(data)
    # Function returned... but is it done? 🤷
```
You can't reason locally about control flow
Every function call might secretly spawn tasks that outlive the function
Problem 2: Resource Cleanup Breaks
```python
# This LOOKS safe... (pseudocode)
async with aopen("data.csv") as f:
    await process_file(f)
# File closed here... right?

# But what if process_file did this:
async def process_file(f):
    asyncio.create_task(read_data(f))  # Background task!
    return  # Function returns immediately

# Now: file is CLOSED while background task still uses it
# 💥 Error! (if you're lucky)
```
The language can't help you with automatic cleanup
Problem 3: Error Handling Breaks
```python
async def background_task():
    raise ValueError("Something went wrong!")

# Start background task
task = asyncio.create_task(background_task())
# Error happens... but where does it go?
# Answer: NOWHERE! It's silently dropped!
# (Maybe printed to console if you're lucky)
```
Exceptions can't propagate because there's no stack to unwind
Compare to regular Python:
```python
def my_function():
    raise ValueError("Something went wrong!")

my_function()  # Exception propagates to caller automatically
```
Problem 4: You Can't Tell If Code Is Finished
```python
async def mystery_function():
    await do_something()
    return "done"

result = await mystery_function()
# Is mystery_function actually done?
# Or did it spawn tasks that are still running?
# NO WAY TO KNOW!
```
The "return" statement lies to you
The Root Cause: Unstructured Concurrency
One-way jump = no guaranteed cleanup, no error propagation, no completion tracking
Real-World Consequences
In asyncio programs:
❌ Backpressure problems - Can't tell how many tasks are running
❌ Resource leaks - Files/sockets stay open because cleanup is manual
❌ Silent failures - Errors in background tasks get dropped
❌ Shutdown hangs - Can't wait for "done" because tasks are invisible
❌ Race conditions - Tasks outlive the data they operate on (operations on closed files)
In data pipelines specifically:
❌ Timeouts don't work - Can't cancel tasks you've lost track of
❌ Parallel processing breaks - No way to collect results safely
❌ Can't reason about code - Every function is a potential landmine
The Solution: Structured Concurrency with Task Groups
```python
# asyncio - UNSTRUCTURED (bad)
async def unstructured():
    asyncio.create_task(myfunc())  # Fire and forget
    asyncio.create_task(other())   # Where do errors go?
    return  # Are we done? Who knows!

# AnyIO - STRUCTURED (good)
async def structured():
    async with anyio.create_task_group() as tg:
        tg.start_soon(myfunc)
        tg.start_soon(other)
        # BLOCKED HERE until all tasks finish
    # All errors propagated automatically
    # All cleanup happens automatically
    return  # NOW we're actually done
```
Task groups enforce: tasks must complete before you can continue
Dijkstra Was Right (Again)
In 1968, Dijkstra showed that goto statements break abstraction
In 2018, we learned that go statements do the same thing
| goto (1960s) | asyncio.create_task() (2010s) |
| --- | --- |
| One-way jump | One-way jump |
| Breaks function boundaries | Breaks function boundaries |
| No automatic cleanup | No automatic cleanup |
| No error propagation | No error propagation |
| Makes code impossible to reason about | Makes code impossible to reason about |

Solution then: Remove goto, add structured control flow (if/while/functions)
Shielding from Cancellation
When you can't cancel — even if you want to
Some I/O operations are uncancellable by nature, or you must wait for the cancel to be processed by the OS:
Waiting for a thread to finish (loop.run_in_executor, anyio.to_thread.run_sync)
On Windows IOCP (Proactor): to cancel pending I/O operations in an IOCP (I/O Completion Port) server, use CancelIoEx to target specific operations, or closesocket(handle) to cancel all pending I/O on a socket. Cancelled operations complete with ERROR_OPERATION_ABORTED, and you must wait for the completion packet before freeing memory.
The async framework can raise CancelledError in your coroutine, but the underlying thread keeps running, or something still needs to wait before the memory can be freed.
You're not cancelling the work — you're just abandoning the future that was watching it. 👻
```python
# IOCP
buffer = allocate_buffer(1024)
try:
    await write_buffer_to_socket(buffer, socket)
finally:
    # If write_buffer_to_socket is cancelled and we don't wait for the cancel
    # signal, the OS could still be using the buffer and we send undefined
    # bytes to the socket.
    clear(buffer)
```
asyncio.shield — The Duct-Tape Approach
```python
async def save_to_db(data):
    # We don't want cancellation to interrupt this
    await asyncio.shield(db.execute(INSERT, data))
    # ⚠️ If cancelled, shield absorbs the cancel...
    # ... but db.execute() keeps running as an orphaned task
    # ⚠️ Edge cancellation means the *next* await might
    # succeed even though we're "cancelled"
```
Problems
❌ Edge-triggered: a CancelledError sneaks through on the next checkpoint after the shield exits
❌ Orphaned inner task: the shielded work keeps running with no owner
❌ No scope: shield applies to one await, not a logical block of work
❌ Thread can't be shielded: run_in_executor inside a shield still abandons the thread
AnyIO Shielded Cancel Scopes — The Structured Approach
```python
async def save_to_db(data):
    # Level-triggered: cancellation is *held* until we exit the shield
    with anyio.CancelScope(shield=True):
        await anyio.to_thread.run_sync(db_blocking_write, data)
        # Thread runs to completion — no orphan, no abandonment
        await anyio.to_thread.run_sync(db_blocking_flush, data)
        # Still shielded — the *whole scope* is protected
    # Pending cancellation is re-raised here, reliably

# Works correctly even when called inside a task group under timeout:
async def example():
    with anyio.fail_after(5):
        async with anyio.create_task_group() as tg:
            tg.start_soon(save_to_db, important_data)
```
Why it works
✅ Level-triggered: cancellation is deferred, not lost — re-fires when you leave the scope
✅ Thread-aware: run_sync joins the thread; the shield keeps the join alive
✅ Scoped: protect a whole logical block, not just one await
✅ No orphans: structured concurrency means every task has an owner
Shielding in Detail

| | asyncio.shield | anyio.CancelScope(shield=True) |
| --- | --- | --- |
| Cancellation model | Edge (one-shot) | Level (persistent, deferred) |
| Scope | Single await | Entire with block |
| Thread safety | ❌ Abandons thread | ✅ Joins thread to completion |
| Cancellation after exit | ⚠️ Maybe (edge, unreliable) | ✅ Always re-raised |
| Orphaned tasks | ❌ Yes | ✅ Never |
| Composable | ❌ Not really | ✅ Nests with task groups |
More AnyIO Features
Backpressure by Default. Structured. Composable.
AnyIO provides MemoryObjectSendStream and MemoryObjectReceiveStream. They work like asyncio.Queue, but you don't need to keep a count of how many producers/consumers you have: you just make a clone for each producer/consumer and use a with or async with block to close the clones. Once all the clones of one end of the memory object stream are closed, iterating the other end raises StopAsyncIteration.
✅ Buffer size defaults to 0 → automatic backpressure
✅ async for works naturally
✅ aclose() signals end-of-stream
✅ clone() enables multiple consumers safely
✅ Structured shutdown
asyncio.Queue Comparison
```python
import asyncio

async def main():
    q = asyncio.Queue()  # unbounded by default (!)

    async def producer():
        for i in range(3):
            print("put", i)
            await q.put(i)
        # How do we signal completion?

    async def consumer():
        while True:
            item = await q.get()
            print("got", item)
```
Problems
❌ Unbounded by default (no backpressure)
❌ No async iteration
❌ No built-in structured close (historically)
❌ No clone(), so sentinel values or custom shutdown logic are required
asyncio.Queue.shutdown() (3.13+)
Python 3.13 introduces:

```python
q.shutdown()
```
But:
Only on new Python
Not widely deployed yet
Still no cloning
Still no structured fan-out
Conceptual Comparison

| Feature | AnyIO Stream | asyncio.Queue |
| --- | --- | --- |
| Default backpressure | ✅ (buffer=0) | ❌ (unbounded) |
| Async iteration | ✅ | ❌ |
| Structured close | ✅ | ⚠️ (3.13+) |
| Clone receivers | ✅ | ❌ |
| Trio-compatible | ✅ | ❌ |

AnyIO streams compose with structured concurrency. asyncio.Queue predates it.
"If I'm already using Trio, I don't need AnyIO."
Most people assume this. But AnyIO adds real value even on the Trio backend.
What AnyIO adds on top of Trio
✅ Backend portability (asyncio, Trio)
✅ A stable public API for libraries
✅ High-level stream utilities
✅ Memory object streams
✅ Buffered byte streams
✅ Stapled streams
✅ Thread/subprocess helpers
Trio vs AnyIO
Trio is a minimal framework: it gives you only what is mandatory for a network framework.
AnyIO is a portability + abstraction layer with batteries included.
If you write a library directly against Trio, you lock out asyncio users.
If you write against AnyIO:
Trio users still get full Trio semantics.
asyncio users can incrementally adopt level cancellation or structured concurrency.
In a naive hand-rolled line reader, every iteration of `while b"\n" in buffer` does `buffer = bytearray(rest)`, copying the remaining data each time. If you receive a chunk with many newlines, this is O(n²) in the number of bytes.
What AnyIO Adds Here
BufferedByteReceiveStream gives you:
receive_exactly(n)
receive_until(delimiter)
Proper EOF semantics
Efficient internal buffering
Works on both Trio and asyncio backends
This is a real ergonomic upgrade.
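A sketch of the buffered API, feeding a BufferedByteReceiveStream from an in-memory byte stream rather than a socket (the data is illustrative; BufferedByteReceiveStream accepts any byte-oriented receive stream):

```python
import anyio
from anyio.streams.buffered import BufferedByteReceiveStream

async def main():
    send, receive = anyio.create_memory_object_stream(max_buffer_size=10)
    buffered = BufferedByteReceiveStream(receive)

    await send.send(b"line one\nline two\nrest")
    await send.aclose()

    # receive_until returns the data up to (but not including) the delimiter
    print(await buffered.receive_until(b"\n", max_bytes=100))  # b'line one'
    print(await buffered.receive_until(b"\n", max_bytes=100))  # b'line two'
    # receive_exactly blocks until exactly n bytes are available
    print(await buffered.receive_exactly(4))                   # b'rest'

anyio.run(main)
```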
Trio gives you safety.
AnyIO gives you safety plus portability and batteries included.
Happy Eyeballs built in; async/await UDP (no Transports/Protocols)
TLS streams
TLSStream wraps any byte stream with TLS, not just sockets
Subprocesses
run_process() / open_process() with async stream I/O on stdin/stdout/stderr
Signal handling
open_signal_receiver() — async iterator over OS signals
But await there's more! — Streams & Concurrency

| Feature | Benefit |
| --- | --- |
| TextReceiveStream | Incremental UTF-8 decoding over any byte stream |
| StapledStream | Combine separate send/receive streams into one bidirectional stream |
| tg.start() | await tg.start(server_fn) blocks until the task signals it's ready |
| Synchronization primitives | Lock, Condition, Event, Semaphore, CapacityLimiter, portable across backends |
But await there's more! — Threads, Testing & Beyond

| Feature | Benefit |
| --- | --- |
| to_thread / from_thread | Bidirectional sync↔async bridging with structured cancellation |
| Subinterpreters | to_interpreters module for true parallelism (Python 3.13+) |
| Async functools | anyio.functools.lru_cache for async functions |
The Advantage of Being on PyPI
In Python 3.13, a number of bugfixes were applied to asyncio.TaskGroup, but they were considered breaking changes so were not backported to 3.11 or 3.12:
"Improve the behavior of TaskGroup when an external cancellation collides with an internal cancellation. For example, when two task groups are nested and both experience an exception in a child task simultaneously, it was possible that the outer task group would hang, because its internal cancellation was swallowed by the inner task group."
To get the fixed asyncio you need to use the latest version of Python. Because AnyIO is hosted on PyPI, you get bugfixes on all supported Python versions.
AnyIO is currently still supporting the EOL Python 3.9 (as of v4.12.1)
Asyncio is not bad
it's better than Twisted (I once spent a week fixing a missing six call, a whole class of bug that doesn't exist with asyncio)
but try making an LDAP server without Twisted!
some of the mistakes Twisted made were copied into asyncio
Curio is good! Unfortunately it's archived
Trio isn't perfect: it's slower than asyncio, especially asyncio with uvloop
AnyIO gives you options and batteries to play with
Normally at this stage of my talk I'd ask you to go run `pip install anyio`, but if you're in this room you probably already have it in your virtual environments!
Coroutines are generator-based in Python, which means asyncio is not the only framework that can use them: async/await is totally decoupled from asyncio, so frameworks like Twisted, Curio, Trio and more can support async function interfaces while being completely unrelated to asyncio. You can even use async/await to make your own generators:
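For example, you can drive an async function entirely by hand, with no event loop at all, using types.coroutine as the yield primitive (a minimal sketch; the names are illustrative):

```python
import types

@types.coroutine
def emit(value):
    # A bare "yield" primitive: suspends the coroutine and hands
    # `value` to whoever is driving it with .send()
    yield value

async def countdown(n):
    # An async function used as a generator: no event loop involved
    while n > 0:
        await emit(n)
        n -= 1

def drive(coro):
    # Manually drive the coroutine, collecting the yielded values
    values = []
    try:
        while True:
            values.append(coro.send(None))
    except StopIteration:
        pass
    return values

print(drive(countdown(3)))  # [3, 2, 1]
```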
In my personal opinion, the two most important reasons to use AnyIO are that you can mix it with asyncio and optionally/incrementally add Trio support, and that cancellations are level-triggered.
With level cancellation, every async operation in a CancelScope will fail with a CancelledError:

```python
import anyio

async def example():
    with anyio.fail_after(0) as scope:
        try:
            await anyio.sleep(1)  # raises CancelledError
        finally:
            await anyio.sleep(1000)  # also raises CancelledError
    # raises TimeoutError as you leave the scope

anyio.run(example)
```
Edge cancellation can result in deadlocks on asyncio. For example, the following program hangs:
```python
import asyncio

async def main():
    never = asyncio.Future()

    async def task_with_finally():
        try:
            print("task_with_finally running")
            await asyncio.sleep(10)
        finally:
            print("task_with_finally in finally")
            print("awaiting never-completing future (WILL HANG)")
            # This is a bit of an unfair example: StreamWriter.wait_closed()
            # used to hang like this, which is fixed on Python 3.13+, but it's
            # not fixed for async websockets.
            # https://github.com/python/cpython/issues/104344
            await never
            print("never reached")

    async def crash_soon():
        await asyncio.sleep(1)
        print("crash_soon raising")
        raise RuntimeError("boom")

    async with asyncio.TaskGroup() as tg:
        tg.create_task(task_with_finally())
        tg.create_task(crash_soon())

asyncio.run(main())
```
Output:

```
task_with_finally running
crash_soon raising
task_with_finally in finally
awaiting never-completing future (WILL HANG)
^C^Cunhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-1' coro=<main() done, defined at /home/graingert/projects/django/demo.py:4> exception=ExceptionGroup('unhandled errors in a TaskGroup', [RuntimeError('boom')])>
  + Exception Group Traceback (most recent call last):
  |   File "/home/graingert/projects/django/demo.py", line 22, in main
  |     async with asyncio.TaskGroup() as tg:
  |   File "/usr/lib/python3.12/asyncio/taskgroups.py", line 145, in __aexit__
  |     raise me from None
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/graingert/projects/django/demo.py", line 20, in crash_soon
    |     raise RuntimeError("boom")
    | RuntimeError: boom
    +------------------------------------
Traceback (most recent call last):
  File "/home/graingert/projects/django/demo.py", line 27, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/base_events.py", line 674, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 641, in run_forever
    self._run_once()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 1949, in _run_once
    event_list = self._selector.select(timeout)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/runners.py", line 157, in _on_sigint
    raise KeyboardInterrupt()
KeyboardInterrupt
```
Running the same program with anyio.create_task_group() in place of asyncio.TaskGroup (`python demo_anyio.py`), it fails fast instead of hanging:

```
task_with_finally running
crash_soon raising
task_with_finally in finally
awaiting never-completing future (WILL NOT HANG)
  + Exception Group Traceback (most recent call last):
  |   File "/home/graingert/projects/django/demo.py", line 27, in <module>
  |     asyncio.run(main())
  |   File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
  |     return runner.run(main)
  |            ^^^^^^^^^^^^^^^^
  |   File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
  |     return self._loop.run_until_complete(task)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/usr/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
  |     return future.result()
  |            ^^^^^^^^^^^^^^^
  |   File "/home/graingert/projects/django/demo.py", line 22, in main
  |     async with anyio.create_task_group() as tg:
  |   File "/home/graingert/.virtualenvs/anyio_pipdeptree/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 783, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/graingert/projects/django/demo.py", line 20, in crash_soon
    |     raise RuntimeError("boom")
    | RuntimeError: boom
    +------------------------------------
```
This is still a problem when using websockets over TLS
```python
async def consume_ws():
    async with await connect_ws("wss://example.com/news") as ws:
        async for message in ws:
            await process(message)  # cancellation happens here
    # cancellation doesn't happen as we `__aexit__()` the context manager

async def example():
    async with asyncio.timeout(10):
        await consume_ws()  # could hang forever
```
AnyIO provides MemoryObjectSendStream and MemoryObjectReceiveStream instead of Queues, but you don't need to keep a count of how many Queue producer/consumers you have you just make a clone for each producer/consumer and use a with or async with to close the clones. Once all the clones of one end of the memory object stream are closed iterating the other end will raise StopAsyncIteration.
✅ Buffer size defaults to 0 → automatic backpressure
✅ async for works naturally
✅ aclose() signals end-of-stream
✅ clone() enables multiple consumers safely
✅ Structured shutdown
Slide 2 — asyncio.Queue Comparison
import asyncio

async def main():
    q = asyncio.Queue()  # unbounded by default (!)

    async def producer():
        for i in range(3):
            print("put", i)
            await q.put(i)
        # How do we signal completion?

    async def consumer():
        while True:
            item = await q.get()
            print("got", item)
Problems
❌ Unbounded by default (no backpressure)
❌ No async iteration
❌ No built-in structured close (historically)
❌ No clone()
❌ Requires sentinel values or custom shutdown logic
Slide 4 — asyncio.Queue.shutdown() (3.13+)
Python 3.13 introduces:
q = asyncio.Queue()
q.shutdown()
But:
Only on new Python
Not widely deployed yet
You still need to manually keep track of the number of producers and consumers
from pathlib import Path

# ⚠️ These all BLOCK the event loop!
path = Path("data.txt")
path.write_text("Hello!")    # Blocks
content = path.read_text()   # Blocks
exists = path.exists()       # Blocks
AnyIO ships with a pytest plugin that it uses to test itself. This means if you already depend on AnyIO you don't need pytest-asyncio as well.
By default the plugin runs your tests under both asyncio and Trio, so if you're still gradually migrating to AnyIO and only need asyncio support, you can run your tests in asyncio mode only by adding the following to your root conftest.py:
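If I'm reading the AnyIO docs right, the switch is the anyio_backend fixture; a minimal root conftest.py that pins every test to asyncio might look like this:

```python
import pytest

@pytest.fixture
def anyio_backend():
    # Overrides AnyIO's backend fixture so tests marked with
    # @pytest.mark.anyio run on asyncio only, not Trio as well.
    return "asyncio"
```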
In Python 3.13, a number of bug fixes were applied to asyncio.TaskGroup, but they were considered breaking changes and so were not backported to 3.11 or 3.12:
Improve the behavior of TaskGroup when an external cancellation collides with an internal cancellation. For example, when two task groups are nested and both experience an exception in a child task simultaneously, it was possible that the outer task group would hang, because its internal cancellation was swallowed by the inner task group.
This means that to use structured concurrency with asyncio reliably, you need to be on the latest Python version. Because AnyIO is hosted on PyPI, you can get bug fixes on all supported Python versions just by updating AnyIO, without needing to upgrade your whole interpreter. In fact, AnyIO currently still supports the EOL Python 3.9.
Normally at this stage of my talk I'd ask you to go run
pip install anyio
but if you're in this room you probably already have it in your virtual environments!
Watch the tree unfold: dozens of packages you use daily depend on AnyIO
Waiting for a thread to finish (loop.run_in_executor, anyio.to_thread.run_sync)
Sending data that's already been handed to the OS kernel
Third-party blocking calls that ignore signals
The async framework can raise CancelledError in your coroutine, but the underlying thread keeps running.
You're not cancelling the work — you're just abandoning the future that was watching it. 👻
asyncio.shield — The Duct-Tape Approach
async def save_to_db(data):
    # We don't want cancellation to interrupt this
    await asyncio.shield(db.execute(INSERT, data))
    # ⚠️ If cancelled, shield absorbs the cancel...
    # ... but db.execute() keeps running as an orphaned task
    # ⚠️ Edge cancellation means the *next* await might
    #    succeed even though we're "cancelled"
Problems
❌ Edge-triggered: a CancelledError sneaks through on the next checkpoint after the shield exits
❌ Orphaned inner task: the shielded work keeps running with no owner
❌ No scope: shield applies to one await, not a logical block of work
❌ Thread can't be shielded: run_in_executor inside a shield still abandons the thread
AnyIO Shielded Cancel Scopes — The Structured Approach
async def save_to_db(data):
    # Level-triggered: cancellation is *held* until we exit the shield
    with anyio.CancelScope(shield=True):
        await anyio.to_thread.run_sync(db_blocking_write, data)
        # Thread runs to completion — no orphan, no abandonment
        await anyio.to_thread.run_sync(db_blocking_flush, data)
        # Still shielded — the *whole scope* is protected
    # Pending cancellation is re-raised here, reliably

# Works correctly even when called inside a task group under timeout:
async def example():
    with anyio.fail_after(5):
        async with anyio.create_task_group() as tg:
            tg.start_soon(save_to_db, important_data)
Why it works
✅ Level-triggered: cancellation is deferred, not lost — re-fires when you leave the scope
✅ Thread-aware: run_sync joins the thread; the shield keeps the join alive
✅ Scoped: protect a whole logical block, not just one await
✅ No orphans: structured concurrency means every task has an owner
Wrote down my thoughts and feedback as I went through the slides. I realise that you'll be talking over these to fill in some of the gaps I found, so some points may be moot.
Add a blank line between blocks of code to improve readability
Remove links since the audience won't go to them during your talk and is unlikely to remember the URLs.
Consider cutting the code that shows calling different backends. I think folks can understand the concept without it.
You call it "the problem" (singular) and state the problem is that it's a "go statement". Then, you go on to list 4 other problems. I found this confusing to read, but it may not be confusing when it's presented.
Problem 4 repeats what Problem 1 shows
Dijkstra Was Right section is redundant
With a task group, the function becomes responsible for the lifecycle of the tasks. This isn't always desirable. For example, a web server that has asynchronous operations (spawns a background task and sends a response without waiting). In such cases (which aren't particularly uncommon), presenting task groups as the solution is not as clear. It may be outside the scope of the talk, but I can see the audience asking "but what if I do want to return without waiting?"
Definition of structured concurrency wasn't clear to me. What is "structured" about it?
"They're structured" - this reads as an implication that there are inherent benefits to task groups being structured, but it's not explicitly clear what those benefits are. List those benefits instead of letting the term "structured" do the heavy lifting.
"you can mix it with asyncio and optionally/incrementally add Trio support" - why would someone want to do that? We don't have to focus on asyncio or trio specifically. More generally, why is it important for an application to be decoupled from an async/await backend?
You mention a CancelScope without defining what it is (until several slides later). Is there a way to explain level cancellation without also explaining cancellation scopes?
Your anyio deadlock example code says "WILL HANG", but your output says "WILL NOT HANG"
WebSocket example may be redundant. I appreciate a more real-world example, but it's less clear here why it deadlocks compared to your previous example.
The explanations for level and edge cancellation feel scattered across slides.
Consider cutting buffered byte streams or queues
You didn't cover "why you already have AnyIO installed"
Parts of this seem AI-generated, and I don't like its writing style; it doesn't feel authentic.
Image to go along with talk: