
Why You Should Use AnyIO (And Why You Might Already Have It)

Abstract

Async Python is fragmented—but you probably already have the solution installed. AnyIO is a portability layer for asyncio and Trio that fixes critical cancellation bugs and provides structured concurrency. If you use httpx, FastAPI, or Jupyter, AnyIO is already in your environment powering your HTTP clients, web frameworks, and notebooks.

This talk reveals AnyIO's level-triggered cancellation (fixing asyncio's dangerous edge-triggered behavior that causes silent hangs), demonstrates structured concurrency patterns with task groups, and shows practical tools like memory object streams for producer-consumer workflows. You'll learn why major libraries chose AnyIO—and how to use it directly for more reliable concurrent code.

Target audience: Intermediate Python developers working with async code who want portable, maintainable concurrent programs.

Description

Talk Structure (40 minutes)

1. The Problem with asyncio.create_task() (9 min)

  • Go statements break everything: one-way jumps, no guaranteed cleanup
  • Four core problems: functions aren't black boxes, resource cleanup breaks, error handling breaks, can't tell if code is finished
  • The root cause: unstructured concurrency
  • Real-world consequences in data pipelines
  • The solution: structured concurrency with task groups

2. Level vs Edge Cancellation (9 min)

  • Edge-triggered cancellation in asyncio: cancellation is a one-shot signal that can be swallowed
  • Live bug demo: asyncio code that hangs vs AnyIO code that fails fast
  • Why this causes production failures: timeouts that don't timeout, cleanup that never happens
  • How AnyIO's level-triggered cancellation ensures cancelled tasks stay cancelled

3. Building Real Applications (10 min)

  • Memory object streams: producer-consumer with backpressure by default (vs asyncio.Queue)
  • Getting results from task groups: nonlocal pattern vs memory streams
  • Buffered byte streams: why AnyIO adds value even on Trio
  • anyio.Path: truly async file operations
  • pytest plugin: test under both asyncio and Trio

4. Ecosystem & Migration (6 min)

  • Major adopters revealed by pipdeptree
  • Migration patterns: asyncio.gather() → task groups, asyncio.Queue → memory streams
  • Why PyPI distribution matters: bugfixes on all Python versions
  • Why data scientists should care

5. The Hidden Dependency Reveal (5 min)

  • Live demo: pipdeptree -p anyio -r reveals dozens of packages depend on AnyIO
  • Why httpx, FastAPI, Jupyter, and Anthropic's MCP SDK chose AnyIO
  • You already have it installed—now you know how to use it directly

6. Q&A (1 min)

Prior Knowledge Expected

  • Basic understanding of async/await syntax
  • Familiarity with asyncio (having used asyncio.create_task() or asyncio.gather())
  • Experience with data processing or web development helpful but not required

Key Takeaways

  1. Understanding why edge-triggered cancellation causes bugs and how level-triggered cancellation fixes them
  2. Practical patterns for getting results from concurrent tasks (nonlocal vs memory streams)
  3. Migration paths from common asyncio patterns to AnyIO
  4. Awareness that AnyIO is already powering much of the Python async ecosystem

Why This Matters to PyData

Async programming appears in data workflows everywhere: parallel data fetching, ETL pipelines, ML model serving, streaming data processing. AnyIO's cancellation semantics and memory streams provide battle-tested patterns for these workflows. The talk bridges "I write async/await sometimes" and "I understand why my async code has weird timeout bugs."

Code examples will be available in a public repository before the conference.

Speaker Notes

I am diagnosed with schizoaffective disorder and autism spectrum disorder. I would appreciate a mid-day time slot (when my antipsychotics are least sedating) and would welcome any mentorship support available for speakers. This will be my second technical talk ever.

I would also appreciate a laser pointer to highlight blocks of example code during the presentation.

---
marp: true
---

Why You Should Use AnyIO (and Why You Already Have It Installed)

⚡ Lightning Talk Edition


https://graingert.co.uk/why-anyio-already · graingert

  • Core developer of AnyIO, Twisted, and Trio
  • Contributed to asyncio happy eyeballs + TaskGroup fixes

Misconception: asyncio == async/await

  • async/await is syntactic sugar over generators — decoupled from asyncio
  • Twisted, Trio, Curio all use the same syntax with different event loops
  • AnyIO uses sniffio to detect which framework is running, dispatches to the right API

asyncio.create_task() Is the goto of Concurrency

asyncio.create_task(myfunc())  # Fire and forget!
# Parent returns immediately, child runs unsupervised
# No guaranteed reunion point

One-way jump → same problems as goto:

  • ❌ Functions aren't black boxes — hidden background tasks survive returns
  • ❌ Resource cleanup breaks — async with can't track orphaned tasks
  • ❌ Errors silently drop — no stack to propagate up
  • ❌ Return statements lie — "done" doesn't mean done

The Fix: Structured Concurrency with Task Groups

# asyncio — UNSTRUCTURED 😰
async def unstructured():
    asyncio.create_task(myfunc())  # fire and forget
    asyncio.create_task(other())   # errors go nowhere
    return  # are we done? who knows!

# AnyIO — STRUCTURED 😌
async def structured():
    async with anyio.create_task_group() as tg:
        tg.start_soon(myfunc)
        tg.start_soon(other)
    # BLOCKED until ALL tasks finish
    # Errors propagate. Cleanup happens. Actually done.

Task groups = if/while/for of concurrency


Level Cancellation vs Edge Cancellation

AnyIO — level-triggered ✅

with anyio.fail_after(0):
    try:
        await anyio.sleep(1)
        # raises CancelledError
    finally:
        await anyio.sleep(1000)
        # ALSO raises CancelledError

asyncio — edge-triggered ❌

async with asyncio.timeout(0):
    try:
        await asyncio.sleep(1)
        # raises CancelledError
    finally:
        await asyncio.sleep(1000)
        # waits 1000 seconds! 😱

Edge cancellation = your 0s timeout becomes a 1000s timeout. With level cancellation every await in a cancelled scope fails. No surprises.


Shielded Cancel Scopes > asyncio.shield

# asyncio.shield — wraps ONE await, orphans the inner task, edge-triggered 😬
await asyncio.shield(db.execute(INSERT, data))

# AnyIO — shields an entire block, no orphans, level-triggered 😎
with anyio.CancelScope(shield=True):
    await anyio.to_thread.run_sync(db_blocking_write, data)
    await anyio.to_thread.run_sync(db_blocking_flush, data)
# Pending cancellation reliably re-raised here

Batteries Included

  • Memory object streams — like asyncio.Queue but with backpressure, clone(), structured shutdown, and async for
  • BufferedByteReceiveStream — receive_until(delimiter), receive_exactly(n) — even Trio doesn't have this
  • anyio.Path — async drop-in for pathlib (no more blocking the event loop)
  • Built-in pytest plugin — @pytest.mark.anyio, no need for pytest-asyncio
  • Networking — TCP/UDP/TLS/subprocesses with Happy Eyeballs built in
  • PyPI-shipped bugfixes — don't wait for a new Python release to fix TaskGroup

pip install anyio

You already have it.

anyio==4.12.1
├── starlette → fastapi, mcp
├── httpx → mcp, jupyterlab
├── jupyter_server → notebook, jupyterlab
├── mcp
└── sse-starlette → mcp

httpx? FastAPI? Jupyter? MCP? AnyIO is already in your virtualenv.


TL;DR

  1. asyncio.create_task() is goto — use task groups instead
  2. Level-triggered cancellation prevents real bugs that edge cancellation causes
  3. Batteries included — streams, paths, pytest, networking
  4. You already have it installed — start using it on purpose

graingert on GitHub · graingert.co.uk/why-anyio-already

---
marp: true
---

Why you should use AnyIO and why you might already have it installed


https://graingert.co.uk/why-anyio-already · graingert


  • I am a core developer of AnyIO, Twisted and Trio (and a few non-async libraries)
  • I have made contributions to the asyncio happy eyeballs support and fixes to TaskGroup

<style scoped>section{font-size:22px;}</style>
  • misconception: asyncio == async/await
  • the problems with asyncio.create_task
  • why you should use structured concurrency
  • edge cancellation vs level cancellation
  • asyncio.shield vs shielded CancelScopes
  • some of my favourite AnyIO features
    • channels (memory object streams) > asyncio.Queue
    • BufferedByteReceiveStream AnyIO > Trio
    • anyio.Path
    • pytest plugin built in
    • summary of features not covered so far
  • The advantages of being pip installable
  • why you already have AnyIO installed

asyncio != async/await

  • async/await is syntactic sugar over generators — completely decoupled from any event loop
  • Twisted, Trio, and Curio all use async/await with their own event loops
  • You can even use async/await with no event loop at all

No event loop required

import types

@types.coroutine
def _async_yield(v):
    return (yield v)

async def async_range():
    await _async_yield(1)
    await _async_yield(2)
    await _async_yield(3)

It's generators all the way down

coro = async_range()
gen = coro.__await__()
list(gen)  # [1, 2, 3]  — no asyncio, no event loop

For more generator tricks see also:


This means libraries like AnyIO can call either the asyncio API or the Trio API depending on what library is currently in use:

This is similar in approach to libraries like six which let you write code compatible with Python 2 and Python 3


import asyncio
import trio
from sniffio import current_async_library

# (set_fut_result_soon and set_event_soon are illustrative helpers)
async def sleep_for_one_loop_cycle():
    if current_async_library() == "asyncio":
        fut = asyncio.Future()
        asyncio.create_task(set_fut_result_soon(fut))
        await fut  # calls fut.__await__().send(None)
    elif current_async_library() == "trio":
        event = trio.Event()
        trio.lowlevel.spawn_system_task(set_event_soon, event)
        await event
        """
        this call calls
            (
                _async_yield(
                    WaitTaskRescheduled(abort_func)
                )
                .__await__()
                .send(outcome.Value(None))
            )
        """
    else:  # Twisted?
        raise RuntimeError("unsupported async framework")

The Problem with asyncio.create_task()

It's a "go statement" - and go statements break everything


What's a "go statement"?

# asyncio
asyncio.create_task(myfunc())  # Fire and forget!
# Control returns immediately, myfunc() runs in background
# Golang
go myfunc()  // Same thing
# Python threads  
threading.Thread(target=myfunc).start()  # Also same

Key problem: Control flow splits with one-way jump

  • Parent returns immediately
  • Child jumps to myfunc and runs unsupervised
  • No guaranteed reunion point

Problem 1: Functions Aren't Black Boxes Anymore

async def process_data(data):
    # Does this function spawn background tasks?
    # Are they still running after it returns?
    # You have NO IDEA without reading all the source code!
    await some_library_function(data)
    # Function returned... but is it done? 🤷

You can't reason locally about control flow

Every function call might secretly spawn tasks that outlive the function


Problem 2: Resource Cleanup Breaks

# This LOOKS safe... (pseudocode)
async with aopen("data.csv") as f:
    await process_file(f)

# File closed here... right?
# But what if process_file did this:
async def process_file(f):
    asyncio.create_task(read_data(f))  # Background task!
    return  # Function returns immediately
# Now: file is CLOSED while background task still uses it
# 💥 Error! (if you're lucky)

The language can't help you with automatic cleanup


Problem 3: Error Handling Breaks

async def background_task():
    raise ValueError("Something went wrong!")
# Start background task
task = asyncio.create_task(background_task())
# Error happens... but where does it go?
# Answer: NOWHERE! It's silently dropped!
# (Maybe printed to console if you're lucky)

Exceptions can't propagate because there's no stack to unwind

Compare to regular Python:

def my_function():
    raise ValueError("Something went wrong!")
my_function()  # Exception propagates to caller automatically
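The dropped exception is easy to demonstrate with the standard library alone. In this runnable sketch (names like background_task are invented for the demo), the error never reaches the caller's control flow; it is only discoverable if you happened to keep a reference to the task and poll it:

```python
import asyncio

async def background_task():
    raise ValueError("Something went wrong!")

async def main():
    task = asyncio.create_task(background_task())
    await asyncio.sleep(0.05)        # the task has already crashed...
    print("main finished normally")  # ...but no exception reaches this code
    # the error only exists on the task object, if you kept a reference
    return task.exception()

err = asyncio.run(main())
print(type(err).__name__)  # ValueError
```

If you drop the `task` reference, the exception is reported out-of-band via the loop's exception handler at garbage collection time, which is even easier to miss.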

Problem 4: You Can't Tell If Code Is Finished

async def mystery_function():
    await do_something()
    return "done"

result = await mystery_function()
# Is mystery_function actually done?
# Or did it spawn tasks that are still running?
# NO WAY TO KNOW!

The "return" statement lies to you


The Root Cause: Unstructured Concurrency

create_task runs off on its own: a one-way jump means no guaranteed cleanup, no error propagation, no completion tracking


Real-World Consequences

In asyncio programs:

  • Resource leaks - Files/sockets stay open because cleanup is manual
  • Silent failures - Errors in background tasks get dropped
  • Shutdown hangs - Can't wait for "done" because tasks are invisible
  • Operations on closed files - Tasks outlive the data they operate on

In data pipelines specifically:

  • Timeouts don't work - Can't cancel tasks you've lost track of
  • Can't reason about code - Every function is a potential landmine


The Solution: Structured Concurrency with Task Groups

# asyncio - UNSTRUCTURED (bad)
async def unstructured():
    asyncio.create_task(myfunc())  # Fire and forget
    asyncio.create_task(other())   # Where do errors go?
    return  # Are we done? Who knows!

# AnyIO - STRUCTURED (good)
async def structured():
    async with anyio.create_task_group() as tg:
        tg.start_soon(myfunc)
        tg.start_soon(other)
    # BLOCKED HERE until all tasks finish
    # All errors propagated automatically
    # All cleanup happens automatically
    return  # NOW we're actually done

Task groups enforce: tasks must complete before you can continue


(diagram: anyio create task group)


Dijkstra Was Right (Again)

In 1968, Dijkstra showed that goto statements break abstraction

In 2018, we learned that go statements do the same thing

| goto (1960s) | asyncio.create_task() (2010s) |
| --- | --- |
| One-way jump | One-way jump |
| Breaks function boundaries | Breaks function boundaries |
| No automatic cleanup | No automatic cleanup |
| No error propagation | No error propagation |
| Makes code impossible to reason about | Makes code impossible to reason about |

  • Solution then: Remove goto, add structured control flow (if/while/functions)
  • Solution now: Remove create_task, add structured concurrency (task groups)

Key Takeaway

asyncio.create_task() is the goto of concurrency

  • It's powerful
  • It seems convenient
  • It breaks everything

AnyIO task groups are the if/while/for of concurrency

  • They're structured
  • They preserve abstractions
  • They make the language features work again
  • They let you reason about your code

Further Reading

Nathaniel J. Smith (Trio author):
"Notes on structured concurrency, or: Go statement considered harmful"
graingert.co.uk/trio-sc

Original Dijkstra paper:
"Go To Statement Considered Harmful" (1968)
graingert.co.uk/dijkstra68


Two most important reasons to use AnyIO

  • you can mix it with asyncio and optionally/incrementally add Trio support
  • cancellations are level-triggered.

With level cancellation, every async operation inside a cancelled CancelScope fails with CancelledError:

import anyio
async def example():
    with anyio.fail_after(0):
        try:
            await anyio.sleep(1)  # raises CancelledError
        finally:
            await anyio.sleep(1000)  # also raises CancelledError
    # raises TimeoutError as you leave the scope
anyio.run(example)

in asyncio

import asyncio
async def example():
    async with asyncio.timeout(0):
        try:
            await asyncio.sleep(1)   # raises CancelledError
        finally:
            await asyncio.sleep(1000)  # waits 1000 seconds
    # raises TimeoutError.... eventually
asyncio.run(example())

Edge cancellation can result in deadlocks on asyncio

For example, the following program hangs:


import asyncio
async def main():
    never = asyncio.Future()
    async def task_with_finally():
        try:
            print("task_with_finally running")
            await asyncio.sleep(10)
        finally:
            print("task_with_finally in finally")
            print("awaiting never-completing future (WILL HANG)")
            await never
            print("never reached")
    async def crash_soon():
        await asyncio.sleep(1)
        print("crash_soon raising")
        raise RuntimeError("boom")
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task_with_finally())
        tg.create_task(crash_soon())
asyncio.run(main())

output

$ python demo.py
task_with_finally running
crash_soon raising
task_with_finally in finally
awaiting never-completing future (WILL HANG)

Output after hitting Ctrl+C a few times:

task_with_finally running
crash_soon raising
task_with_finally in finally
awaiting never-completing future (WILL HANG)
^C^Cunhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-1' coro=<main() done, defined at /home/graingert/projects/django/demo.py:4> exception=ExceptionGroup('unhandled errors in a TaskGroup', [RuntimeError('boom')])>
  + Exception Group Traceback (most recent call last):
  |   File "/home/graingert/projects/django/demo.py", line 22, in main
  |     async with asyncio.TaskGroup() as tg:
  |   File "/usr/lib/python3.12/asyncio/taskgroups.py", line 145, in __aexit__
  |     raise me from None
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/graingert/projects/django/demo.py", line 20, in crash_soon
    |     raise RuntimeError("boom")
    | RuntimeError: boom
    +------------------------------------
Traceback (most recent call last):
  File "/home/graingert/projects/django/demo.py", line 27, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/base_events.py", line 674, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 641, in run_forever
    self._run_once()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 1949, in _run_once
    event_list = self._selector.select(timeout)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/runners.py", line 157, in _on_sigint
    raise KeyboardInterrupt()
KeyboardInterrupt

example with anyio

import asyncio
import anyio
async def main():
    never = asyncio.Future()
    async def task_with_finally():
        try:
            print("task_with_finally running")
            await asyncio.sleep(10)
        finally:
            print("task_with_finally in finally")
            print("awaiting never-completing future (WILL NOT HANG)")
            await never
            print("never reached")
    async def crash_soon():
        await asyncio.sleep(1)
        print("crash_soon raising")
        raise RuntimeError("boom")
    async with anyio.create_task_group() as tg:
        tg.start_soon(task_with_finally)
        tg.start_soon(crash_soon)
asyncio.run(main())

output:

$ python demo_anyio.py
task_with_finally running
crash_soon raising
task_with_finally in finally
awaiting never-completing future (WILL NOT HANG)
  + Exception Group Traceback (most recent call last):
  |   File "/home/graingert/projects/django/demo.py", line 27, in <module>
  |     asyncio.run(main())
  |   File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
  |     return runner.run(main)
  |            ^^^^^^^^^^^^^^^^
  |   File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
  |     return self._loop.run_until_complete(task)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/usr/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
  |     return future.result()
  |            ^^^^^^^^^^^^^^^
  |   File "/home/graingert/projects/django/demo.py", line 22, in main
  |     async with anyio.create_task_group() as tg:
  |   File "/home/graingert/.virtualenvs/anyio_pipdeptree/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 783, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/graingert/projects/django/demo.py", line 20, in crash_soon
    |     raise RuntimeError("boom")
    | RuntimeError: boom
    +------------------------------------

This is a real problem in practice, for example when consuming WebSockets over TLS:

async def consume_ws():
    async with await connect_ws("wss://example.com/news") as ws:
        async for message in ws:
            await process(message)  # cancellation happens here
    # the cancellation isn't re-delivered as we `__aexit__()` the context manager
async def example():
    async with asyncio.timeout(10):
        await consume_ws()  # could hang forever

Shielding from Cancellation

When you can't cancel — even if you want to

Some I/O operations are uncancellable by nature, or you must wait for the cancel to be processed by the OS:

  • Waiting for a thread to finish (loop.run_in_executor, anyio.to_thread.run_sync)
  • On Windows IOCP (Proactor)
    • To cancel pending I/O operations in an IOCP (I/O Completion Port) server, use CancelIoEx to target specific operations, or closesocket(handle) to cancel all pending I/O on a socket. Canceled operations complete with ERROR_OPERATION_ABORTED, and you must wait for the completion packet before freeing memory.

The async framework can raise CancelledError in your coroutine, but the underlying thread keeps running, or the OS still owns memory that must not be freed until the cancellation has been acknowledged.

You're not cancelling the work — you're just abandoning the future that was watching it. 👻


# IOCP
buffer = allocate_buffer(1024)
try:
    await write_buffer_to_socket(buffer, socket)
finally:
    # if write_buffer_to_socket is cancelled and we don't wait for the cancel
    # signal the OS could still be using the buffer and we send undefined
    # bytes to the socket.
    clear(buffer)

asyncio.shield — The Duct-Tape Approach

async def save_to_db(data):
    # We don't want cancellation to interrupt this
    await asyncio.shield(db.execute(INSERT, data))
    # ⚠️ If cancelled, shield absorbs the cancel...
    # ... but db.execute() keeps running as an orphaned task
    # ⚠️ Edge cancellation means the *next* await might
    # succeed even though we're "cancelled"

Problems

Edge-triggered: a CancelledError sneaks through on the next checkpoint after the shield exits
Orphaned inner task: the shielded work keeps running with no owner
No scope: shield applies to one await, not a logical block of work
Thread can't be shielded: run_in_executor inside a shield still abandons the thread
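A stdlib-only sketch of the orphaning problem (slow_write and save are invented names): cancelling the caller does not stop the shielded work, which keeps running with no owner.

```python
import asyncio

events = []

async def slow_write():
    await asyncio.sleep(0.1)
    events.append("write finished after caller was cancelled (orphan)")

async def save():
    await asyncio.shield(slow_write())

async def main():
    t = asyncio.create_task(save())
    await asyncio.sleep(0.01)
    t.cancel()                   # cancel the caller...
    try:
        await t
    except asyncio.CancelledError:
        events.append("save() cancelled")
    await asyncio.sleep(0.2)     # ...but the shielded work runs on, ownerless

asyncio.run(main())
print(events)
```

The caller observes CancelledError immediately, yet the inner task finishes later with nobody supervising it — exactly the "abandoned future" problem described above.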


AnyIO Shielded Cancel Scopes — The Structured Approach

async def save_to_db(data):
    # Level-triggered: cancellation is *held* until we exit the shield
    with anyio.CancelScope(shield=True):
        await anyio.to_thread.run_sync(db_blocking_write, data)
        # Thread runs to completion — no orphan, no abandonment
        await anyio.to_thread.run_sync(db_blocking_flush, data)
        # Still shielded — the *whole scope* is protected
    # Pending cancellation is re-raised here, reliably
# Works correctly even when called inside a task group under timeout:
async def example():
    with anyio.fail_after(5):
        async with anyio.create_task_group() as tg:
            tg.start_soon(save_to_db, important_data)

Why it works

Level-triggered: cancellation is deferred, not lost — re-fires when you leave the scope
Thread-aware: run_sync joins the thread; the shield keeps the join alive
Scoped: protect a whole logical block, not just one await
No orphans: structured concurrency means every task has an owner


Shielding in Detail


asyncio.shield()

CancelScope(shield=True)


Comparison

| | asyncio.shield | anyio.CancelScope(shield=True) |
| --- | --- | --- |
| Cancellation model | Edge (one-shot) | Level (persistent, deferred) |
| Scope | Single await | Entire with block |
| Thread safety | ❌ Abandons thread | ✅ Joins thread to completion |
| Cancellation after exit | ⚠️ Maybe (edge, unreliable) | ✅ Always re-raised |
| Orphaned tasks | ❌ Yes | ✅ Never |
| Composable | ❌ Not really | ✅ Nests with task groups |

More AnyIO Features


Backpressure by Default. Structured. Composable.

  • AnyIO provides MemoryObjectSendStream and MemoryObjectReceiveStream
  • like asyncio.Queue but you don't need to keep count of how many producers and consumers you have
    • you just make a clone for each producer/consumer
    • use a with or async with to close the clones.
    • Once all the clones of one end of the memory object stream are closed iterating the other end will raise StopAsyncIteration.

import anyio
async def consume_ws(url, stream):
    async with stream, await connect_ws(url) as ws:
        async for msg in ws:
            await stream.send(msg)
async def news_and_weather():
    tx, rx = anyio.create_memory_object_stream[bytes]()  # default buffer size = 0
    async with tx, rx, anyio.create_task_group() as tg:
        tg.start_soon(consume_ws, "ws://example.com/news", tx.clone())
        tg.start_soon(consume_ws, "ws://example.com/weather", tx.clone())
        tx.close()
        async for item in rx:
            print(item)
anyio.run(news_and_weather)

Key properties

  • Buffer size defaults to 0 → automatic backpressure

  • async for works naturally

  • aclose() signals end-of-stream

  • clone() enables multiple consumers safely

  • ✅ Structured shutdown


asyncio.Queue Comparison

import asyncio
async def main():
    q = asyncio.Queue()  # unbounded by default (!)
    async def producer():
        for i in range(3):
            print("put", i)
            await q.put(i)
        # How do we signal completion?
    async def consumer():
        while True:
            item = await q.get()
            print("got", item)

Problems

  • ❌ Unbounded by default (no backpressure)

  • ❌ No async iteration

  • ❌ No built-in structured close (historically)

  • ❌ No clone() so requires sentinel values or custom shutdown logic
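For contrast, here is roughly what the manual shutdown dance looks like with asyncio.Queue. The SENTINEL marker and the bounded maxsize are things you must remember to add yourself; nothing enforces them:

```python
import asyncio

SENTINEL = object()  # you must invent your own end-of-stream marker

async def main():
    q = asyncio.Queue(maxsize=1)  # bound it yourself to get backpressure
    results = []

    async def producer():
        for i in range(3):
            await q.put(i)
        await q.put(SENTINEL)     # manual completion signal

    async def consumer():
        while True:               # no async-for: hand-rolled loop
            item = await q.get()
            if item is SENTINEL:  # every consumer must know to check
                break
            results.append(item)

    await asyncio.gather(producer(), consumer())
    return results

print(asyncio.run(main()))  # [0, 1, 2]
```

With multiple consumers you would need one sentinel per consumer (or a shared shutdown flag), which is exactly the bookkeeping that clone() and stream closing make unnecessary.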


asyncio.Queue.shutdown() (3.13+)

Python 3.13 introduces:

q.shutdown()

But:

  • Only on new Python

  • Not widely deployed yet

  • Still no cloning

  • Still no structured fan-out


Conceptual Comparison

Feature AnyIO Stream asyncio.Queue
Default backpressure ✅ (buffer=0) ❌ (unbounded)
Async iteration
Structured close ⚠️ (3.13+)
Clone receivers
Trio-compatible

AnyIO streams compose with structured concurrency. asyncio.Queue predates it.


"If I'm already using Trio, I don't need AnyIO."

Most people assume this. But AnyIO adds real value even on the Trio backend.

What AnyIO adds on top of Trio

  • ✅ Backend portability (asyncio, Trio)

  • ✅ A stable public API for libraries

  • ✅ High-level stream utilities

  • ✅ Memory object streams

  • ✅ Buffered byte streams

  • ✅ Stapled streams

  • ✅ Thread/subprocess helpers


Trio vs AnyIO

  • Trio is a deliberately minimal framework — it only provides what a network framework absolutely must
  • AnyIO is a portability + abstraction layer with batteries included.

If you write a library directly against Trio:

  • You lock out asyncio users.

If you write against AnyIO:

  • Trio users still get full Trio semantics.

  • asyncio users can incrementally adopt level cancellation or structured concurrency.

  • You get a bunch of cool extra tools


Buffered Byte Streams (AnyIO Feature Trio Lacks)

Trio provides SendStream / ReceiveStream.

But it does not provide:

  • Buffered reads

  • receive_exactly(n)

  • receive_until(delimiter)

  • Automatic read buffering

AnyIO does.


Demo: AnyIO Buffered Byte Streams

import anyio
from anyio.streams.buffered import BufferedByteReceiveStream

async def main():
    send, receive = anyio.create_memory_object_stream[bytes]()
    async def producer():
        await send.send(b"hello\nworld\n")
        await send.aclose()
    async def consumer():
        buffered = BufferedByteReceiveStream(receive)
        line1 = await buffered.receive_until(b"\n", 4096)
        print("line1:", line1)
        line2 = await buffered.receive_until(b"\n", 4096)
        print("line2:", line2)
    async with anyio.create_task_group() as tg:
        tg.start_soon(producer)
        tg.start_soon(consumer)
anyio.run(main)

Output + What Just Happened

$ python demo_buffered_bytes.py
line1: b'hello'
line2: b'world'
  • The producer sent both lines in one chunk.
  • The consumer parsed them cleanly by delimiter.
  • No manual buffering. No partial read bookkeeping.

How You'd Do This in Trio

Trio gives you raw receive:

data = await stream.receive_some(1024)

But you must manually:

  • Accumulate into a buffer

  • Search for delimiters

  • Slice the buffer

  • Handle partial frames

  • Handle EOF correctly


Example (simplified):

I asked ChatGPT and it gave me this — can you spot the bug?

buffer = bytearray()
while True:
    chunk = await stream.receive_some(1024)
    if not chunk:
        break
    buffer.extend(chunk)
    while b"\n" in buffer:
        line, _, rest = buffer.partition(b"\n")
        print(line + b"\n")
        buffer = bytearray(rest)

Quadratic performance in the inner loop

Every iteration of while b"\n" in buffer does buffer = bytearray(rest), copying the remaining data each time. If you receive a chunk with many newlines, this is O(n²) in the number of bytes.
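A linear-time fix is to scan with find() from a moving offset and delete the consumed prefix once per chunk, not once per line. This standalone sketch (my illustration of the idea, not AnyIO's actual implementation) frames lines out of arbitrary chunks:

```python
def split_lines(chunks):
    """Linear-time delimiter framing: scan with find() from an offset and
    delete the consumed prefix once per chunk, instead of copying the
    remaining buffer after every line."""
    buffer = bytearray()
    lines = []
    for chunk in chunks:
        buffer.extend(chunk)
        start = 0
        while (idx := buffer.find(b"\n", start)) != -1:
            lines.append(bytes(buffer[start:idx]))
            start = idx + 1
        del buffer[:start]  # one amortized copy per chunk, not per line
    return lines

print(split_lines([b"hel", b"lo\nwor", b"ld\nrest"]))  # [b'hello', b'world']
```

Getting this right by hand every time is exactly the bookkeeping receive_until() does for you.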


What AnyIO Adds Here

BufferedByteReceiveStream gives you:

  • receive_exactly(n)

  • receive_until(delimiter)

  • Proper EOF semantics

  • Efficient internal buffering

  • Works on both Trio and asyncio backends

This is a real ergonomic upgrade.


  • Trio gives you safety.
  • AnyIO gives you safety plus portability and batteries included.

citation:


anyio.Path: Async File Operations

The Problem with pathlib

from pathlib import Path
async def amain():
    # ⚠️ These all BLOCK the event loop!
    path = Path("data.txt")
    path.write_text("Hello!")      # Blocks
    content = path.read_text()     # Blocks
    exists = path.exists()         # Blocks

The Solution: anyio.Path

import anyio
async def amain():
    # ✅ All truly async - doesn't block!
    path = anyio.Path("data.txt")
    await path.write_text("Hello!")    # Async
    content = await path.read_text()  # Async
    exists = await path.exists()      # Async

Same API as pathlib, but async-native


Real Power: Parallel File Operations

# Process multiple files in parallel
data_dir = anyio.Path("training_data")
results = []
async def process_and_append(p):
    content = await p.read_text()
    results.append(parse_csv(content))
async with anyio.create_task_group() as tg:
    async for path in data_dir.iterdir():
        if await path.is_file() and path.suffix == '.csv':
            tg.start_soon(process_and_append, path)
# All files processed concurrently!

Key Features

| Feature | pathlib | anyio.Path |
| --- | --- | --- |
| Async operations | ❌ Blocks | ✅ async with threads |
| Parallel I/O | ❌ Sequential | ✅ Works with task groups |
| Event loop friendly | ❌ Blocks | ✅ Non-blocking |
| API compatibility | ✅ Standard | ✅ Same interface but async |
| Type hints | ✅ Yes | ✅ Yes |

Key Takeaway: Drop-in replacement for pathlib that actually respects async/await


pytest plugin

  • AnyIO ships with a pytest plugin that it uses to test itself.
  • This means if you already depend on AnyIO you don't need pytest-asyncio as well.
@pytest.mark.anyio
async def test_something():
    assert await something() == "result"

  • By default the plugin runs your tests under both asyncio and Trio
  • If you're still gradually migrating to AnyIO and only support asyncio, you can run your tests in asyncio mode only by adding the following to your root conftest.py:
@pytest.fixture
def anyio_backend():
    return 'asyncio'

But await there's more! — Networking & I/O

| Feature | Benefit |
| --- | --- |
| TCP/UDP/UNIX sockets | Happy Eyeballs built in; async/await UDP (no Transports/Protocols) |
| TLS streams | TLSStream wraps any byte stream with TLS, not just sockets |
| Subprocesses | run_process() / open_process() with async stream I/O on stdin/stdout/stderr |
| Signal handling | open_signal_receiver() — async iterator over OS signals |
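
A minimal run_process() sketch, using the current Python interpreter as the child so it runs anywhere:

```python
import sys
import anyio

async def main() -> bytes:
    # run_process waits for the command to finish; by default it captures
    # stdout/stderr and raises CalledProcessError on a non-zero exit status
    result = await anyio.run_process([sys.executable, "-c", "print('hi')"])
    return result.stdout

stdout = anyio.run(main)
print(stdout)
```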

But await there's more! — Streams & Concurrency

| Feature | Benefit |
| --- | --- |
| TextReceiveStream | Incremental UTF-8 decoding over any byte stream |
| StapledStream | Combine separate send/receive streams into one bidirectional stream |
| tg.start() | await tg.start(server_fn) — blocks until the task signals it's ready |
| Synchronization primitives | Lock, Condition, Event, Semaphore, CapacityLimiter — portable across backends |

But await there's more! — Threads, Testing & Beyond

| Feature | Benefit |
| --- | --- |
| to_thread / from_thread | Bidirectional sync↔async bridging with structured cancellation |
| Subinterpreters | to_interpreters module for true parallelism (Python 3.13+) |
| Async functools | anyio.functools.lru_cache for async functions |

The Advantage of Being on PyPI

In Python 3.13, a number of bug-fixes were applied to asyncio.TaskGroup but they were considered breaking changes so were not backported to 3.11 or 3.12:

https://docs.python.org/3/whatsnew/3.13.html#asyncio

Improve the behavior of TaskGroup when an external cancellation collides with an internal cancellation. For example, when two task groups are nested and both experience an exception in a child task simultaneously, it was possible that the outer task group would hang, because its internal cancellation was swallowed by the inner task group.


  • To get the fixed asyncio you need to use the latest version of Python
  • Because AnyIO is hosted on PyPI you get bugfixes on all supported Python versions
  • AnyIO currently still supports the EOL Python 3.9 (as of v4.12.1)

Asyncio is not bad

  • it's better than Twisted (I once spent a week fixing a missing six call — a whole class of bug that doesn't exist with asyncio)
  • but try making an LDAP server without Twisted!
  • some of the mistakes Twisted made were copied into asyncio
  • Curio is good! Unfortunately it's archived
  • Trio isn't perfect: it's slower than asyncio, especially when asyncio runs on uvloop
  • AnyIO gives you options and batteries to play with

Normally at this stage of my talk I'd ask you to go run

pip install anyio

but if you're in this room you probably already have it in your virtual environments!


$ pip install httpx fastapi jupyter mcp pipdeptree
$ pipdeptree -p anyio -r  # reverse dependencies (dependants) of anyio

Watch the tree unfold: loads of packages you use daily depend on AnyIO


output:

anyio==4.12.1
├── starlette==0.52.1 [requires: anyio>=3.6.2,<5]
│   ├── fastapi==0.129.0 [requires: starlette>=0.40.0,<1.0.0]
│   ├── mcp==1.26.0 [requires: starlette>=0.27]
│   └── sse-starlette==3.2.0 [requires: starlette>=0.49.1]
│       └── mcp==1.26.0 [requires: sse-starlette>=1.6.1]
├── httpx==0.28.1 [requires: anyio]
│   ├── mcp==1.26.0 [requires: httpx>=0.27.1]
│   └── jupyterlab==4.5.4 [requires: httpx>=0.25.0,<1]
│       ├── notebook==7.5.3 [requires: jupyterlab>=4.5.3,<4.6]
│       │   └── jupyter==1.1.1 [requires: notebook]
│       └── jupyter==1.1.1 [requires: jupyterlab]
├── mcp==1.26.0 [requires: anyio>=4.5]
├── sse-starlette==3.2.0 [requires: anyio>=4.7.0]
│   └── mcp==1.26.0 [requires: sse-starlette>=1.6.1]
└── jupyter_server==2.17.0 [requires: anyio>=3.1.0]
    ├── notebook==7.5.3 [requires: jupyter_server>=2.4.0,<3]
    │   └── jupyter==1.1.1 [requires: notebook]
    ├── jupyter-lsp==2.3.0 [requires: jupyter_server>=1.1.2]
    │   └── jupyterlab==4.5.4 [requires: jupyter-lsp>=2.0.0]
    │       ├── notebook==7.5.3 [requires: jupyterlab>=4.5.3,<4.6]
    │       │   └── jupyter==1.1.1 [requires: notebook]
    │       └── jupyter==1.1.1 [requires: jupyterlab]
    ├── jupyterlab==4.5.4 [requires: jupyter_server>=2.4.0,<3]
    │   ├── notebook==7.5.3 [requires: jupyterlab>=4.5.3,<4.6]
    │   │   └── jupyter==1.1.1 [requires: notebook]
    │   └── jupyter==1.1.1 [requires: jupyterlab]
    ├── jupyterlab_server==2.28.0 [requires: jupyter_server>=1.21,<3]
    │   ├── notebook==7.5.3 [requires: jupyterlab_server>=2.28.0,<3]
    │   │   └── jupyter==1.1.1 [requires: notebook]
    │   └── jupyterlab==4.5.4 [requires: jupyterlab_server>=2.28.0,<3]
    │       ├── notebook==7.5.3 [requires: jupyterlab>=4.5.3,<4.6]
    │       │   └── jupyter==1.1.1 [requires: notebook]
    │       └── jupyter==1.1.1 [requires: jupyterlab]
    └── notebook_shim==0.2.4 [requires: jupyter_server>=1.8,<3]
        ├── notebook==7.5.3 [requires: notebook_shim>=0.2,<0.3]
        │   └── jupyter==1.1.1 [requires: notebook]
        └── jupyterlab==4.5.4 [requires: notebook_shim>=0.2]
            ├── notebook==7.5.3 [requires: jupyterlab>=4.5.3,<4.6]
            │   └── jupyter==1.1.1 [requires: notebook]
            └── jupyter==1.1.1 [requires: jupyterlab]

  • I've given you a whistle-stop tour of some of my favourite features; there's loads more
    • and more being added all the time
  • I hope I've persuaded you to give AnyIO a try
  • you might as well give it a go if you already have it installed

Any questions?

@graingert

Image to go along with talk:

Screenshot from 2026-02-11 08-26-10

@graingert

graingert commented Feb 11, 2026

asyncio != async/await

Coroutines in Python are generator-based, and async/await is totally decoupled from asyncio. This means frameworks like Twisted, Curio, Trio and more can support async function interfaces while being completely unrelated to asyncio. You can even use async/await to make your own generators:

>>> import types
>>> @types.coroutine
... def _async_yield(v):
...     return (yield v)
...     
>>> async def coro_fn():
...     await _async_yield(1)
...     await _async_yield(2)
...     await _async_yield(3)
...     
>>> coro = coro_fn()
>>> gen = coro.__await__()
>>> list(gen)
[1, 2, 3]
>>> 

For more generator tricks see also https://www.dabeaz.com/generators/ https://www.dabeaz.com/coroutines/ https://www.dabeaz.com/finalgenerator/

This means libraries like AnyIO can call either the asyncio API or the Trio API, depending on which library is currently in use:

This is similar in approach to libraries like six which let you write code compatible with Python 2 and Python 3

import asyncio
import trio
from sniffio import current_async_library

async def sleep_for_one_loop_cycle():
    if current_async_library() == "asyncio":
        fut = asyncio.Future()
        asyncio.create_task(set_fut_result_soon(fut))
        await fut  # calls fut.__await__().send(None)
    elif current_async_library() == "trio":
        event = trio.Event()
        trio.lowlevel.spawn_system_task(set_event_soon, event)
        await event  # calls _async_yield(WaitTaskRescheduled(abort_func)).__await__().send(outcome.Value(None))
    else:  # Twisted?
        raise RuntimeError("unsupported async framework")
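
A minimal check of the detection itself (plain asyncio, no AnyIO needed):

```python
import asyncio
import sniffio

async def main() -> str:
    # sniffio inspects the running framework from the ambient context
    return sniffio.current_async_library()

library = asyncio.run(main())
print(library)
```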

The Problem with asyncio.create_task()

It's a "go statement" - and go statements break everything


What's a "go statement"?

# asyncio
asyncio.create_task(myfunc())  # Fire and forget!
# Control returns immediately, myfunc() runs in background

# Golang
go myfunc()  // Same thing

# Python threads  
threading.Thread(target=myfunc).start()  # Also same

Key problem: Control flow splits with one-way jump

  • Parent returns immediately
  • Child jumps to myfunc and runs unsupervised
  • No guaranteed reunion point

Problem 1: Functions Aren't Black Boxes Anymore

async def process_data(data):
    # Does this function spawn background tasks?
    # Are they still running after it returns?
    # You have NO IDEA without reading all the source code!
    await some_library_function(data)
    # Function returned... but is it done? 🤷

You can't reason locally about control flow

Every function call might secretly spawn tasks that outlive the function


Problem 2: Resource Cleanup Breaks

# This LOOKS safe...
async with open("data.csv") as f:
    await process_file(f)
# File closed here... right?

# But what if process_file did this:
async def process_file(f):
    asyncio.create_task(read_data(f))  # Background task!
    return  # Function returns immediately
    
# Now: file is CLOSED while background task still uses it
# 💥 Error! (if you're lucky)

The language can't help you with automatic cleanup


Problem 3: Error Handling Breaks

async def background_task():
    raise ValueError("Something went wrong!")

# Start background task
task = asyncio.create_task(background_task())

# Error happens... but where does it go?
# Answer: NOWHERE! It's silently dropped!
# (Maybe printed to console if you're lucky)

Exceptions can't propagate because there's no stack to unwind

Compare to regular Python:

def my_function():
    raise ValueError("Something went wrong!")
    
my_function()  # Exception propagates to caller automatically

Problem 4: You Can't Tell If Code Is Finished

async def mystery_function():
    await do_something()
    return "done"

result = await mystery_function()
# Is mystery_function actually done?
# Or did it spawn tasks that are still running?
# NO WAY TO KNOW!

The "return" statement lies to you


The Root Cause: Unstructured Concurrency

asyncio.create_task() control flow:

    ┌─────────────┐
    │   Parent    │
    │   task      │
    └──────┬──────┘
           │
    create_task()
           │
      ┌────┴────┐
      │         │ 
   Parent    Child ──→ myfunc()
   returns    (orphaned, unsupervised)

One-way jump = no guaranteed cleanup, no error propagation, no completion tracking


Real-World Consequences

In asyncio programs:

Backpressure problems - Can't tell how many tasks are running
Resource leaks - Files/sockets stay open because cleanup is manual
Silent failures - Errors in background tasks get dropped
Shutdown hangs - Can't wait for "done" because tasks are invisible
Race conditions - Tasks outlive the data they operate on

In data pipelines specifically:

Timeouts don't work - Can't cancel tasks you've lost track of
Parallel processing breaks - No way to collect results safely
Can't reason about code - Every function is a potential landmine


The Solution: Structured Concurrency with Task Groups

# asyncio - UNSTRUCTURED (bad)
async def unstructured():
    asyncio.create_task(myfunc())  # Fire and forget
    asyncio.create_task(other())   # Where do errors go?
    return  # Are we done? Who knows!

# AnyIO - STRUCTURED (good)
async def structured():
    async with anyio.create_task_group() as tg:
        tg.start_soon(myfunc)
        tg.start_soon(other)
    # BLOCKED HERE until all tasks finish
    # All errors propagated automatically
    # All cleanup happens automatically
    return  # NOW we're actually done

Task groups enforce: tasks must complete before you can continue


Dijkstra Was Right (Again)

In 1968, Dijkstra showed that goto statements break abstraction

In 2018, we learned that go statements do the same thing

| goto (1960s) | asyncio.create_task() (2010s) |
| --- | --- |
| One-way jump | One-way jump |
| Breaks function boundaries | Breaks function boundaries |
| No automatic cleanup | No automatic cleanup |
| No error propagation | No error propagation |
| Makes code impossible to reason about | Makes code impossible to reason about |

Solution then: Remove goto, add structured control flow (if/while/functions)
Solution now: Remove create_task, add structured concurrency (task groups)


Key Takeaway

asyncio.create_task() is the goto of concurrency

  • It's powerful
  • It seems convenient
  • It breaks everything

AnyIO task groups are the if/while/for of concurrency

  • They're structured
  • They preserve abstractions
  • They make the language features work again
  • They let you reason about your code

Further Reading

Nathaniel J. Smith (Trio author):
["Notes on structured concurrency, or: Go statement considered harmful"](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/)

Original Dijkstra paper:
["Go To Statement Considered Harmful" (1968)](https://homepages.cwi.nl/~storm/teaching/reader/Dijkstra68.pdf)

@graingert

graingert commented Feb 11, 2026

In my personal opinion, the two most important reasons to use AnyIO are that you can mix it with asyncio (and optionally/incrementally add Trio support), and that cancellations are level-triggered.

With level-triggered cancellation, every async operation inside a cancelled CancelScope fails with a CancelledError:

import anyio

async def example():
    with anyio.fail_after(0) as scope:
        try:
            await anyio.sleep(1)  # raises CancelledError
        finally:
            await anyio.sleep(1000)  # also raises CancelledError

    # raises TimeoutError as you leave the scope


anyio.run(example)

Compare the same program on stock asyncio, where cancellation is edge-triggered:

import asyncio

async def example():
    async with asyncio.timeout(0):
        try:
            await asyncio.sleep(1)   # raises CancelledError
        finally:
            await asyncio.sleep(1000)  # waits 1000 seconds
    # raises TimeoutError.... eventually


asyncio.run(example())

edge cancellation can result in deadlocks on asyncio for example the following program hangs

import asyncio


async def main():
    never = asyncio.Future()

    async def task_with_finally():
        try:
            print("task_with_finally running")
            await asyncio.sleep(10)
        finally:
            print("task_with_finally in finally")
            print("awaiting never-completing future (WILL HANG)")
            # This is a bit of an unfair example, but StreamWriter.wait_closed()
            # used to hang in this case. That's fixed on Python 3.13+, though it's
            # still not fixed for async websockets. https://github.com/python/cpython/issues/104344
            await never
            print("never reached")

    async def crash_soon():
        await asyncio.sleep(1)
        print("crash_soon raising")
        raise RuntimeError("boom")

    async with asyncio.TaskGroup() as tg:
        tg.create_task(task_with_finally())
        tg.create_task(crash_soon())


asyncio.run(main())

output:

task_with_finally running
crash_soon raising
task_with_finally in finally
awaiting never-completing future (WILL HANG)
^C^Cunhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-1' coro=<main() done, defined at /home/graingert/projects/django/demo.py:4> exception=ExceptionGroup('unhandled errors in a TaskGroup', [RuntimeError('boom')])>
  + Exception Group Traceback (most recent call last):
  |   File "/home/graingert/projects/django/demo.py", line 22, in main
  |     async with asyncio.TaskGroup() as tg:
  |   File "/usr/lib/python3.12/asyncio/taskgroups.py", line 145, in __aexit__
  |     raise me from None
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/graingert/projects/django/demo.py", line 20, in crash_soon
    |     raise RuntimeError("boom")
    | RuntimeError: boom
    +------------------------------------
Traceback (most recent call last):
  File "/home/graingert/projects/django/demo.py", line 27, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/base_events.py", line 674, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 641, in run_forever
    self._run_once()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 1949, in _run_once
    event_list = self._selector.select(timeout)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/runners.py", line 157, in _on_sigint
    raise KeyboardInterrupt()
KeyboardInterrupt
The same program with an AnyIO task group does not hang:

import asyncio
import anyio

async def main():
    never = asyncio.Future()

    async def task_with_finally():
        try:
            print("task_with_finally running")
            await asyncio.sleep(10)
        finally:
            print("task_with_finally in finally")
            print("awaiting never-completing future (WILL NOT HANG)")
            await never
            print("never reached")

    async def crash_soon():
        await asyncio.sleep(1)
        print("crash_soon raising")
        raise RuntimeError("boom")

    async with anyio.create_task_group() as tg:
        tg.start_soon(task_with_finally)
        tg.start_soon(crash_soon)


asyncio.run(main())

output:

task_with_finally running
crash_soon raising
task_with_finally in finally
awaiting never-completing future (WILL NOT HANG)
  + Exception Group Traceback (most recent call last):
  |   File "/home/graingert/projects/django/demo.py", line 27, in <module>
  |     asyncio.run(main())
  |   File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
  |     return runner.run(main)
  |            ^^^^^^^^^^^^^^^^
  |   File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
  |     return self._loop.run_until_complete(task)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/usr/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
  |     return future.result()
  |            ^^^^^^^^^^^^^^^
  |   File "/home/graingert/projects/django/demo.py", line 22, in main
  |     async with anyio.create_task_group() as tg:
  |   File "/home/graingert/.virtualenvs/anyio_pipdeptree/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 783, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/graingert/projects/django/demo.py", line 20, in crash_soon
    |     raise RuntimeError("boom")
    | RuntimeError: boom
    +------------------------------------

This is still a problem when using websockets over TLS

async def consume_ws():
    async with await connect_ws("wss://example.com/news") as ws:
        async for message in ws:
            await process(message)  # cancellation happens here
     # the cancellation is not re-delivered while we `__aexit__()` the context manager, so it can hang

async def example():
    async with asyncio.timeout(10):
        await consume_ws()  # could hang forever

@graingert

graingert commented Feb 11, 2026

Backpressure by Default. Structured. Composable.

AnyIO provides MemoryObjectSendStream and MemoryObjectReceiveStream instead of queues. You don't need to keep a count of how many producers/consumers you have: you just make a clone for each one and use a with or async with block to close each clone. Once all the clones of one end of the memory object stream are closed, iteration of the other end stops.

import anyio

async def consume_ws(url, stream):
    async with stream, await connect_ws(url) as ws:
        async for msg in ws:
            await stream.send(msg)

async def news_and_weather():
    tx, rx = anyio.create_memory_object_stream[bytes]()  # default buffer size = 0
    async with tx, rx, anyio.create_task_group() as tg:
        tg.start_soon(consume_ws, "ws://example.com/news", tx.clone())
        tg.start_soon(consume_ws, "ws://example.com/weather", tx.clone())
        tx.close()
        async for item in rx:
            print(item)

anyio.run(news_and_weather)

Key properties

  • Buffer size defaults to 0 → automatic backpressure

  • async for works naturally

  • aclose() signals end-of-stream

  • clone() enables multiple consumers safely

  • ✅ Structured shutdown

Slide 2 — asyncio.Queue Comparison

import asyncio

async def main():
    q = asyncio.Queue()  # unbounded by default (!)

    async def producer():
        for i in range(3):
            print("put", i)
            await q.put(i)
        # How do we signal completion?

    async def consumer():
        while True:
            item = await q.get()
            print("got", item)

Problems

  • ❌ Unbounded by default (no backpressure)

  • ❌ No async iteration

  • ❌ No built-in structured close (historically)

  • ❌ No clone()

  • ❌ Requires sentinel values or custom shutdown logic


Slide 4 — asyncio.Queue.shutdown() (3.13+)

Python 3.13 introduces:

q = asyncio.Queue()
q.shutdown()

But:

  • Only on new Python

  • Not widely deployed yet

  • Need to manually keep track of the number of consumers and producers


Slide 5 — Conceptual Comparison

| Feature | AnyIO Stream | asyncio.Queue |
| --- | --- | --- |
| Default backpressure | ✅ (buffer=0) | ❌ (unbounded) |
| Async iteration | ✅ | ❌ |
| Structured close | ✅ | ⚠️ (3.13+) |
| Clone receivers | ✅ | ❌ |
| Cancellation semantics | Level | Edge |
| Trio-compatible | ✅ | ❌ |

AnyIO streams compose with structured concurrency. asyncio.Queue predates it.

@graingert

graingert commented Feb 11, 2026

citation:

Excellent — this is a very strong positioning moment in a talk.

Most people assume:

“If I’m already using Trio, I don’t need AnyIO.”

But AnyIO adds real value even on the Trio backend.

Below are two ready-to-use slide sections:


Slide 1 — Why Use AnyIO If You’re Already Using Trio?

Trio gives you structured concurrency.

AnyIO gives you portability + batteries.

What AnyIO adds on top of Trio

  • ✅ Backend portability (asyncio, Trio)

  • ✅ A stable public API for libraries

  • ✅ High-level stream utilities

  • ✅ Memory object streams

  • ✅ Buffered byte streams

  • ✅ Stapled streams

  • ✅ Thread/subprocess helpers

  • ✅ Ecosystem compatibility (FastAPI, httpx, etc.)

  • ✅ a pytest plugin


The key idea

Trio is a runtime.

AnyIO is a portability + abstraction layer with extra primitives.

If you write a library directly against Trio:

  • You lock out asyncio users.

If you write against AnyIO:

  • Trio users still get full Trio semantics.

  • asyncio users can adopt you incrementally.


The practical takeaway

If you're building:

  • A framework

  • A reusable library

  • Infrastructure code

You should strongly prefer AnyIO APIs, even when running on Trio.


Slide 2 — Buffered Byte Streams (AnyIO Feature Trio Lacks)

Trio provides SendStream / ReceiveStream.

But it does not provide:

  • Buffered reads

  • receive_exactly(n)

  • receive_until(delimiter)

AnyIO does.


Demo — AnyIO Buffered Byte Streams

From:
https://anyio.readthedocs.io/en/stable/streams.html#buffered-byte-streams

import anyio.streams.buffered

async def main():
    send, receive = anyio.create_memory_object_stream[bytes]()

    async def producer():
        await send.send(b"hello\nworld\n")
        await send.aclose()

    async def consumer():
        buffered = anyio.streams.buffered.BufferedByteReceiveStream(receive)

        line1 = await buffered.receive_until(b"\n", max_bytes=1024)
        print("line1:", line1)

        line2 = await buffered.receive_until(b"\n", max_bytes=1024)
        print("line2:", line2)

    async with anyio.create_task_group() as tg:
        tg.start_soon(producer)
        tg.start_soon(consumer)

anyio.run(main)

Output

line1: b'hello'
line2: b'world'

What Just Happened?

  • The producer sent both lines in one chunk.

  • The consumer parsed them cleanly by delimiter.

  • No manual buffering.

  • No partial read bookkeeping.


How You’d Do This in Trio

Trio gives you raw receive:

data = await stream.receive_some(1024)

But you must manually:

  • Accumulate into a buffer

  • Search for delimiters

  • Slice the buffer

  • Handle partial frames

  • Handle EOF correctly

Example (simplified):

I'm not even sure how to do it correctly. I asked ChatGPT and it gave me this, and I'm pretty sure it's wrong or inefficient:

buffer = bytearray()

while True:
    chunk = await stream.receive_some(1024)
    if not chunk:
        break
    buffer.extend(chunk)

    while b"\n" in buffer:
        line, _, rest = buffer.partition(b"\n")
        print(line + b"\n")
        buffer = bytearray(rest)

What AnyIO Adds Here

BufferedByteReceiveStream gives you:

  • receive_exactly(n)

  • receive_until(delimiter)

  • Proper EOF semantics

  • Works on both Trio and asyncio backends

This is a real ergonomic upgrade.

Strong Closing Line

Trio gives you safety.

AnyIO gives you safety plus portability and higher-level primitives.

@graingert

graingert commented Feb 11, 2026

anyio.Path: Truly Async File Operations

The Problem with pathlib

from pathlib import Path

# ⚠️ These all BLOCK the event loop!
path = Path("data.txt")
path.write_text("Hello!")      # Blocks
content = path.read_text()     # Blocks
exists = path.exists()         # Blocks

The Solution: anyio.Path

# ✅ All truly async - doesn't block!
path = anyio.Path("data.txt")
await path.write_text("Hello!")    # Async
content = await path.read_text()  # Async
exists = await path.exists()      # Async

Same API as pathlib, but async-native


Real Power: Parallel File Operations

# Process multiple files in parallel
data_dir = anyio.Path("training_data")

results = []

async def process_and_append(p):
    content = await p.read_text()
    results.append(parse_csv(content))  # parse_csv: your own parser

async with anyio.create_task_group() as tg:
    async for path in data_dir.iterdir():
        if await path.is_file() and path.suffix == '.csv':
            tg.start_soon(process_and_append, path)

# All files processed concurrently!

Key Features

| Feature | pathlib | anyio.Path |
| --- | --- | --- |
| Async operations | ❌ Blocks | ✅ Async (via threads) |
| Parallel I/O | ❌ Sequential | ✅ Works with task groups |
| Event loop friendly | ❌ Blocks | ✅ Non-blocking |
| API compatibility | ✅ Standard | ✅ Same interface |
| Type hints | ✅ Yes | ✅ Yes |

Key Takeaway: Drop-in replacement for pathlib that actually respects async/await

@graingert

graingert commented Feb 11, 2026

pytest plugin

AnyIO ships with a pytest plugin that it uses to test itself. This means if you already depend on AnyIO you don't need pytest-asyncio as well.

By default the plugin runs your tests under both asyncio and Trio, so if you're still gradually migrating to AnyIO and still require asyncio support, you can run your tests in asyncio mode only by adding the following to your root conftest.py:

import pytest

@pytest.fixture
def anyio_backend():
    return 'asyncio'

Advantages of AnyIO Being on PyPI

In Python 3.13 a number of bug fixes were applied to asyncio.TaskGroup, but they were considered breaking changes, so they were not backported to 3.11 or 3.12:

https://docs.python.org/3/whatsnew/3.13.html#asyncio

Improve the behavior of TaskGroup when an external cancellation collides with an internal cancellation. For example, when two task groups are nested and both experience an exception in a child task simultaneously, it was possible that the outer task group would hang, because its internal cancellation was swallowed by the inner task group.

This means that to use structured concurrency with asyncio reliably, you need to be on the latest Python version. Because AnyIO is hosted on PyPI, you get these bugfixes on all supported Python versions just by updating AnyIO, without needing to upgrade your whole interpreter. In fact, AnyIO currently still supports the EOL Python 3.9.

@graingert

graingert commented Feb 11, 2026

Normally at this stage of my talk I'd ask you to go run

pip install anyio

but if you're in this room you probably already have it in your virtual environments!

Watch the tree unfold: dozens of packages you use daily depend on AnyIO:

pip install httpx starlette jupyter mcp pipdeptree
pipdeptree -p anyio -r  # reverse dependencies (dependants) of anyio
anyio==4.12.1
├── starlette==0.52.1 [requires: anyio>=3.6.2,<5]
│   ├── mcp==1.26.0 [requires: starlette>=0.27]
│   └── sse-starlette==3.2.0 [requires: starlette>=0.49.1]
│       └── mcp==1.26.0 [requires: sse-starlette>=1.6.1]
├── httpx==0.28.1 [requires: anyio]
│   ├── mcp==1.26.0 [requires: httpx>=0.27.1]
│   └── jupyterlab==4.5.4 [requires: httpx>=0.25.0,<1]
│       ├── notebook==7.5.3 [requires: jupyterlab>=4.5.3,<4.6]
│       │   └── jupyter==1.1.1 [requires: notebook]
│       └── jupyter==1.1.1 [requires: jupyterlab]
├── mcp==1.26.0 [requires: anyio>=4.5]
├── sse-starlette==3.2.0 [requires: anyio>=4.7.0]
│   └── mcp==1.26.0 [requires: sse-starlette>=1.6.1]
└── jupyter_server==2.17.0 [requires: anyio>=3.1.0]
    ├── notebook==7.5.3 [requires: jupyter_server>=2.4.0,<3]
    │   └── jupyter==1.1.1 [requires: notebook]
    ├── jupyter-lsp==2.3.0 [requires: jupyter_server>=1.1.2]
    │   └── jupyterlab==4.5.4 [requires: jupyter-lsp>=2.0.0]
    │       ├── notebook==7.5.3 [requires: jupyterlab>=4.5.3,<4.6]
    │       │   └── jupyter==1.1.1 [requires: notebook]
    │       └── jupyter==1.1.1 [requires: jupyterlab]
    ├── jupyterlab==4.5.4 [requires: jupyter_server>=2.4.0,<3]
    │   ├── notebook==7.5.3 [requires: jupyterlab>=4.5.3,<4.6]
    │   │   └── jupyter==1.1.1 [requires: notebook]
    │   └── jupyter==1.1.1 [requires: jupyterlab]
    ├── jupyterlab_server==2.28.0 [requires: jupyter_server>=1.21,<3]
    │   ├── notebook==7.5.3 [requires: jupyterlab_server>=2.28.0,<3]
    │   │   └── jupyter==1.1.1 [requires: notebook]
    │   └── jupyterlab==4.5.4 [requires: jupyterlab_server>=2.28.0,<3]
    │       ├── notebook==7.5.3 [requires: jupyterlab>=4.5.3,<4.6]
    │       │   └── jupyter==1.1.1 [requires: notebook]
    │       └── jupyter==1.1.1 [requires: jupyterlab]
    └── notebook_shim==0.2.4 [requires: jupyter_server>=1.8,<3]
        ├── notebook==7.5.3 [requires: notebook_shim>=0.2,<0.3]
        │   └── jupyter==1.1.1 [requires: notebook]
        └── jupyterlab==4.5.4 [requires: notebook_shim>=0.2]
            ├── notebook==7.5.3 [requires: jupyterlab>=4.5.3,<4.6]
            │   └── jupyter==1.1.1 [requires: notebook]
            └── jupyter==1.1.1 [requires: jupyterlab]

@graingert

Any questions?

@graingert

Add a slide about who this talk is aimed at

@graingert

[image: asyncio_create_task]

@graingert

graingert commented Feb 21, 2026


Shielding from Cancellation

When you can't cancel — even if you want to

Some I/O operations are uncancellable by nature:

  • Waiting for a thread to finish (loop.run_in_executor, anyio.to_thread.run_sync)
  • Sending data that's already been handed to the OS kernel
  • Third-party blocking calls that ignore signals

The async framework can raise CancelledError in your coroutine, but the underlying thread keeps running.

You're not cancelling the work — you're just abandoning the future that was watching it. 👻


asyncio.shield — The Duct-Tape Approach

async def save_to_db(data):
    # We don't want cancellation to interrupt this
    await asyncio.shield(db.execute(INSERT, data))
    # ⚠️ If cancelled, shield absorbs the cancel...
    # ... but db.execute() keeps running as an orphaned task
    # ⚠️ Edge cancellation means the *next* await might
    # succeed even though we're "cancelled"

Problems

  • Edge-triggered: a CancelledError sneaks through at the next checkpoint after the shield exits
  • Orphaned inner task: the shielded work keeps running with no owner
  • No scope: shield applies to one await, not a logical block of work
  • Thread can't be shielded: run_in_executor inside a shield still abandons the thread


AnyIO Shielded Cancel Scopes — The Structured Approach

async def save_to_db(data):
    # Level-triggered: cancellation is *held* until we exit the shield
    with anyio.CancelScope(shield=True):
        await anyio.to_thread.run_sync(db_blocking_write, data)
        # Thread runs to completion — no orphan, no abandonment
        await anyio.to_thread.run_sync(db_blocking_flush, data)
        # Still shielded — the *whole scope* is protected
    # Pending cancellation is re-raised here, reliably

# Works correctly even when called inside a task group under timeout:
async def example():
    with anyio.fail_after(5):
        async with anyio.create_task_group() as tg:
            tg.start_soon(save_to_db, important_data)

Why it works

  • Level-triggered: cancellation is deferred, not lost; it re-fires when you leave the scope
  • Thread-aware: run_sync joins the thread; the shield keeps the join alive
  • Scoped: protect a whole logical block, not just one await
  • No orphans: structured concurrency means every task has an owner


The Thread Problem in Detail

asyncio.shield + run_in_executor          anyio CancelScope(shield=True)
                                          + to_thread.run_sync
─────────────────────────────────         ──────────────────────────────
coroutine  ──shield──► Future             coroutine ──shield scope──►
               │                                         │
           cancelled ✗◄── outer cancel              outer cancel
               │           arrives                   arrives
           executor │                                    │
           thread   │◄─── still running!            deferred ⏸
           (orphan) ▼     nobody waiting             thread   │
                  result                             finishes ▼
                  → void  (lost!)                   result delivered ✓
                                                    cancel re-raised ✓

asyncio.shield is a one-way valve. AnyIO's shield is a pressure vessel — it holds the cancellation until you're ready to handle it safely.


Comparison

| | asyncio.shield | anyio.CancelScope(shield=True) |
| --- | --- | --- |
| Cancellation model | Edge (one-shot) | Level (persistent, deferred) |
| Scope | Single await | Entire with block |
| Thread safety | ❌ Abandons thread | ✅ Joins thread to completion |
| Cancellation after exit | ⚠️ Maybe (edge, unreliable) | ✅ Always re-raised |
| Orphaned tasks | ❌ Yes | ✅ Never |
| Composable | ❌ Not really | ✅ Nests with task groups |

If asyncio.shield is a raincoat with holes,
CancelScope(shield=True) is a proper airlock. 🚀

@graingert

[image: shield_diagram]

@graingert

graingert commented Feb 21, 2026

[images: shield_asyncio, shield_anyio, shield_anyio(1)]

@graingert

graingert commented Feb 21, 2026

[images: shield_asyncio(1), shield_anyio(2), shield_anyio(3), shield_anyio(4)]

@graingert

[images: shield_anyio(5), shield_asyncio(2)]

@graingert

[image: shield_asyncio(3)]

@graingert

graingert commented Feb 22, 2026

When building an agentic team with MCP, AnyIO is the best way to manage concurrent models.

You should spend some time learning AnyIO, because it will pay off when you build your own application made up of a team of agents running concurrently.

Demonstrate concurrent streams of models, compared with asyncio.

@MarkKoz

MarkKoz commented Mar 4, 2026

Wrote down my thoughts and feedback as I went through the slides. I realise that you'll be talking over these to fill in some of the gaps I found, so some points may be moot.

  • Add a blank line between blocks of code to improve readability
  • Remove links since the audience won't go to them during your talk and is unlikely to remember the URLs.
  • Consider cutting the code that shows calling different backends. I think folks can understand the concept without it.
  • You call it "the problem" (singular) and state the problem is that it's a "go statement". Then, you go on to list 4 other problems. I found this confusing to read, but it may not be confusing when it's presented.
  • Problem 4 repeats what Problem 1 shows
  • Dijkstra Was Right section is redundant
  • With a task group, the function becomes responsible for the lifecycle of the tasks. This isn't always desirable. For example, a web server that has asynchronous operations (spawns a background task and sends a response without waiting). In such cases (which aren't particularly uncommon), presenting task groups as the solution is not as clear. It may be outside the scope of the talk, but I can see the audience asking "but what if I do want to return without waiting?"
  • Definition of structured concurrency wasn't clear to me. What is "structured" about it?
  • "They're structured" - this reads as an implication there are inherent benefits to task groups being structured, but it's not explicitly clear what those benefits are. List those benefits instead of letting the term "structured" do the heavy lifting.
  • "you can mix it with asyncio and optionally/incrementally add Trio support" - why would someone want to do that? We don't have to focus on asyncio or trio specifically. More generally, why is it important for an application to be decoupled from an async/await backend?
  • You mention a CancelScope without defining what it is (until several slides later). Is there a way to explain level cancellation without also explaining cancellation scopes?
  • Your anyio deadlock example code says "WILL HANG", but your output says "WILL NOT HANG"
  • WebSocket example may be redundant. I appreciate a more real-world example, but it's less clear here why it deadlocks compared to your previous example.
  • The explanations for level and edge cancellation feel scattered across slides.
  • Consider cutting buffered byte streams or queues
  • You didn't cover "why you already have AnyIO installed"
  • Parts of this seem AI-generated, and I don't like its writing style; it doesn't feel authentic.
