Skip to content

Instantly share code, notes, and snippets.

@graingert
Created February 28, 2026 06:49
Show Gist options
  • Select an option

  • Save graingert/9dc56c649cef82c4cc21a5c24b8d0dcf to your computer and use it in GitHub Desktop.

Select an option

Save graingert/9dc56c649cef82c4cc21a5c24b8d0dcf to your computer and use it in GitHub Desktop.
@app.get("/news-and-weather")
@contextlib.asynccontextmanager
async def news_and_weather() -> AsyncGenerator[MemoryObjectReceiveStream[bytes]]:
    # Route handler written as an async context manager: it yields the
    # receive end of a memory object stream that both producer tasks feed.
    # (Comments are used instead of a docstring because FastAPI publishes a
    # route's docstring as its OpenAPI description.)
    async with anyio.create_task_group() as tg:
        tx, rx = anyio.create_memory_object_stream[bytes]()
        with tx, rx:
            # Each producer task is handed its own clone of the send stream.
            tg.start_soon(ws_stream, "ws://example.com/news", tx.clone())
            tg.start_soon(ws_stream, "ws://example.com/weather", tx.clone())
            # Close the original send handle; only the two clones stay open.
            tx.close()
            yield rx
@tiangolo
Copy link

Would this work?

Modified to be a full working example that can run as is:

import random

import anyio
from anyio.streams.memory import MemoryObjectSendStream
from beartype.typing import AsyncIterable
from fastapi import FastAPI

app = FastAPI()


async def fake_stream(url: str, tx: MemoryObjectSendStream[bytes]) -> None:
    """Send numbered fake messages for *url* on *tx* until stopped.

    Each message is the encoded text ``"<url> message <n>"`` with ``n``
    counting up from 1. After every send the task sleeps for a random
    duration between one and two seconds.

    Args:
        url: Label embedded in every message.
        tx: Send stream owned by this task; it is closed when the task exits.
    """
    # The loop only ends through cancellation or a failed send; closing the
    # stream on the way out releases this sender's handle instead of leaking it.
    async with tx:
        i = 0
        while True:
            i += 1
            await tx.send(f"{url} message {i}".encode())
            await anyio.sleep(1 + 1 * random.random())


@app.get("/news-and-weather")
async def news_and_weather() -> AsyncIterable[bytes]:
    # Streams messages from two fake producers, merged through a single
    # memory object stream.
    # NOTE(review): this async generator yields while a task group is open.
    # Yielding inside a TaskGroup is reported to be safe only under
    # @contextlib.asynccontextmanager (see PEP 789), so timeouts or errors
    # raised in fake_stream may not cancel this task correctly - confirm
    # before relying on this shape.
    async with anyio.create_task_group() as tg:
        tx, rx = anyio.create_memory_object_stream[bytes]()
        async with tx, rx:
            # Each producer task is handed its own clone of the send stream.
            tg.start_soon(fake_stream, "ws://example.com/news", tx.clone())
            tg.start_soon(fake_stream, "ws://example.com/weather", tx.clone())
            # Close the original send handle; only the two clones stay open.
            tx.close()
            # Relay each message in the order it is received.
            async for message in rx:
                yield message

Then:

$ curl -X 'GET' \
  'http://127.0.0.1:8000/news-and-weather' \
  -H 'accept: application/jsonl'

"ws://example.com/news message 1"
"ws://example.com/weather message 1"
"ws://example.com/weather message 2"
"ws://example.com/news message 2"
"ws://example.com/weather message 3"
"ws://example.com/news message 3"
"ws://example.com/news message 4"
"ws://example.com/weather message 4"
"ws://example.com/weather message 5"
"ws://example.com/news message 5"
"ws://example.com/weather message 6"
"ws://example.com/news message 6"
"ws://example.com/weather message 7"
"ws://example.com/news message 7"
"ws://example.com/weather message 8"
"ws://example.com/news message 8"
"ws://example.com/weather message 9"
"ws://example.com/news message 9"
"ws://example.com/weather message 10"
"ws://example.com/news message 10"
"ws://example.com/weather message 11"
"ws://example.com/weather message 12"
"ws://example.com/news message 11"
"ws://example.com/news message 12"
"ws://example.com/weather message 13"
"ws://example.com/news message 13"
"ws://example.com/weather message 14"

@graingert
Copy link
Author

graingert commented Feb 28, 2026

No! It's illegal to yield inside a TaskGroup unless you're in an @contextlib.asynccontextmanager; see https://peps.python.org/pep-0789/

See what happens if you add a with anyio.move_on_after(5): block, or if you add a raise to fake_stream: the task group can't cancel the current task correctly.

https://claude.ai/share/9cfb34b5-fadf-4213-82a8-90703bc08002

@graingert
Copy link
Author

@tiangolo
Copy link

tiangolo commented Mar 1, 2026

Thank you! I'm learning about all this.

Thanks for the Claude link, and the PEP, I just read it all.

I iterated a few times with LLMs, trying to find something that would work (in theory) but still keep the style I wanted to have in FastAPI.

Would this work?

import random
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import anyio
from anyio.abc import ObjectReceiveStream
from anyio.streams.memory import MemoryObjectSendStream
from fastapi import FastAPI

app = FastAPI()


async def fake_stream(url: str, tx: MemoryObjectSendStream[bytes]) -> None:
    """Send numbered fake messages for *url* on *tx* until stopped.

    Each message is the encoded text ``"<url> message <n>"`` with ``n``
    counting up from 1. After every send the task sleeps for a random
    duration between one and two seconds.

    Args:
        url: Label embedded in every message.
        tx: Send stream owned by this task; it is closed when the task exits.
    """
    # The loop only ends through cancellation or a failed send; closing the
    # stream on the way out releases this sender's handle instead of leaking it.
    async with tx:
        i = 0
        while True:
            i += 1
            await tx.send(f"{url} message {i}".encode())
            await anyio.sleep(1 + 1 * random.random())


@asynccontextmanager
async def combined_stream() -> AsyncIterator[ObjectReceiveStream[bytes]]:
    """Yield a receive stream fed by the news and weather producers.

    Both producers are started in a task group that stays open while the
    caller uses the stream. Once the caller's block finishes normally, the
    group's cancel scope is cancelled.
    """
    async with anyio.create_task_group() as group:
        sender, receiver = anyio.create_memory_object_stream[bytes]()
        async with sender, receiver:
            for url in ("ws://example.com/news", "ws://example.com/weather"):
                # Every producer is handed its own clone of the send stream.
                group.start_soon(fake_stream, url, sender.clone())
            # Close the original send handle; only the clones stay open.
            sender.close()
            yield receiver
            group.cancel_scope.cancel()


@app.get("/news-and-weather")
async def news_and_weather() -> AsyncIterator[bytes]:
    # Relay every message from the combined stream to the response.
    # NOTE(review): the `yield` below still runs while the task group opened
    # inside combined_stream() is active - confirm this is safe with respect
    # to PEP 789 before relying on it.
    async with combined_stream() as receiver:
        async for chunk in receiver:
            yield chunk

@tiangolo
Copy link

tiangolo commented Mar 1, 2026

Just for completeness, in case anyone else comes here looking for the outcome of this discussion (which we finished on chat), here's the version that should work fine.

The trick is that the task group is created in a dependency with yield, which FastAPI converts to an async context manager. Its exit code runs after the response is done, so nothing ever "yields out" of the task group:

import random
from collections.abc import AsyncIterator
from typing import Annotated

import anyio
from anyio.abc import TaskGroup
from anyio.streams.memory import MemoryObjectSendStream
from fastapi import Depends, FastAPI

app = FastAPI()


async def fake_stream(url: str, tx: MemoryObjectSendStream[bytes]) -> None:
    """Send numbered fake messages for *url* on *tx* until stopped.

    Each message is the encoded text ``"<url> message <n>"`` with ``n``
    counting up from 1. After every send the task sleeps for a random
    duration between one and two seconds.

    Args:
        url: Label embedded in every message.
        tx: Send stream owned by this task; it is closed when the task exits.
    """
    # The loop only ends through cancellation or a failed send; closing the
    # stream on the way out releases this sender's handle instead of leaking it.
    async with tx:
        i = 0
        while True:
            i += 1
            await tx.send(f"{url} message {i}".encode())
            await anyio.sleep(1 + 1 * random.random())


async def task_group() -> AsyncIterator[TaskGroup]:
    """Provide an open anyio task group to the caller.

    The group is yielded while its context is active. When control returns
    normally from the ``yield``, the group's cancel scope is cancelled before
    the context exits.
    """
    async with anyio.create_task_group() as group:
        yield group
        group.cancel_scope.cancel()


@app.get("/news-and-weather")
async def news_and_weather(
    tg: Annotated[TaskGroup, Depends(task_group, scope="request")],
) -> AsyncIterator[bytes]:
    # The task group is supplied by the `task_group` dependency, so this
    # generator does not open one around its own `yield`.
    sender, receiver = anyio.create_memory_object_stream[bytes]()
    async with sender, receiver:
        for url in ("ws://example.com/news", "ws://example.com/weather"):
            # Every producer is handed its own clone of the send stream.
            tg.start_soon(fake_stream, url, sender.clone())
        # Close the original send handle; only the clones stay open.
        sender.close()
        # Relay each message in the order it is received.
        async for chunk in receiver:
            yield chunk

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment