Last active
March 6, 2026 02:04
-
-
Save hasansezertasan/bbfcb7fa39cd44ed15707bb70d21fdaf to your computer and use it in GitHub Desktop.
I love Litestar! https://docs.litestar.dev/latest/usage/events.html
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations

import asyncio
import json
import logging
from functools import partial
from typing import Any

from aio_pika import DeliveryMode
from aio_pika import ExchangeType
from aio_pika import IncomingMessage
from aio_pika import Message
from aio_pika import connect_robust
from litestar import Litestar
from litestar import Request
from litestar import get
from litestar.events import BaseEventEmitterBackend
from litestar.events import EventListener
from litestar.events import listener
class RabbitMQEventEmitterBackend(BaseEventEmitterBackend):
    """Litestar event emitter backend that routes events through RabbitMQ.

    ``emit`` puts events on an in-process queue. A background task publishes
    them as JSON messages to a durable topic exchange, using the event id as
    the routing key. A consumer on a durable queue bound to every registered
    event id decodes incoming messages and awaits the matching listeners.

    Args:
        listeners: Event listeners to dispatch consumed messages to.
        amqp_url: URL handed to ``aio_pika.connect_robust``.
        exchange_name: Name of the topic exchange events are published to.
        queue_name: Name of the queue this backend consumes from.
    """

    _logger = logging.getLogger(__name__)

    def __init__(
        self,
        listeners: list[EventListener],
        amqp_url: str,
        exchange_name: str = "litestar.events",
        queue_name: str = "litestar.listeners",
    ) -> None:
        super().__init__(listeners)
        self.amqp_url = amqp_url
        self.exchange_name = exchange_name
        self.queue_name = queue_name

        # Pending events; ``None`` is the sentinel that stops the publisher.
        self._emit_queue: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue()

        # Broker handles, populated in ``__aenter__``.
        self._connection = None
        self._channel = None
        self._exchange = None
        self._queue = None
        self._publisher_task: asyncio.Task[None] | None = None
        self._consumer_tag: str | None = None

        # Event id -> listeners, in registration order.
        self._listeners_by_event: dict[str, list[EventListener]] = {}
        for event_listener in listeners:
            for event_id in event_listener.event_ids:
                self._listeners_by_event.setdefault(event_id, []).append(event_listener)

    async def __aenter__(self) -> RabbitMQEventEmitterBackend:
        """Connect, declare the topology, and start publishing and consuming.

        If any step fails, everything acquired so far is released before the
        error propagates, because ``__aexit__`` is not invoked when
        ``__aenter__`` raises.
        """
        try:
            self._connection = await connect_robust(self.amqp_url)
            self._channel = await self._connection.channel()
            await self._channel.set_qos(prefetch_count=32)
            self._exchange = await self._channel.declare_exchange(
                self.exchange_name,
                ExchangeType.TOPIC,
                durable=True,
            )
            self._queue = await self._channel.declare_queue(
                self.queue_name,
                durable=True,
            )
            for event_id in self._listeners_by_event:
                await self._queue.bind(self._exchange, routing_key=event_id)
            self._publisher_task = asyncio.create_task(self._publisher_loop())
            self._consumer_tag = await self._queue.consume(self._handle_message)
        except BaseException:
            await self._shutdown()
            raise
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: Any,
    ) -> None:
        """Flush pending events, stop consuming, and close the connection."""
        await self._shutdown()

    async def _shutdown(self) -> None:
        """Stop the publisher, then release broker resources unconditionally."""
        try:
            await self._stop_publisher()
        finally:
            await self._release_broker_resources()

    async def _stop_publisher(self) -> None:
        """Let the publisher drain already-queued events and wait for it to exit."""
        task = self._publisher_task
        if task is None:
            # Nothing is reading the queue; enqueuing the sentinel here would
            # make a later publisher stop as soon as it started.
            return
        self._publisher_task = None
        await self._emit_queue.put(None)
        await task

    async def _release_broker_resources(self) -> None:
        """Cancel the consumer and close the channel and connection."""
        connection = self._connection
        channel = self._channel
        queue = self._queue
        consumer_tag = self._consumer_tag
        # Forget the handles first so a repeated shutdown is a no-op.
        self._connection = self._channel = self._exchange = self._queue = None
        self._consumer_tag = None
        try:
            if queue is not None and consumer_tag is not None:
                await queue.cancel(consumer_tag)
            if channel is not None:
                await channel.close()
        finally:
            # Always attempt this, even if the graceful steps above failed.
            if connection is not None:
                await connection.close()

    def emit(self, event_id: str, *args: Any, **kwargs: Any) -> None:
        """Queue an event for publication without blocking.

        Args:
            event_id: Event name; also used as the AMQP routing key.
            *args: Positional arguments for the listeners. They are
                JSON-encoded, so tuples reach the listeners as lists.
            **kwargs: Keyword arguments for the listeners (JSON-encoded).
        """
        payload = {
            "event_id": event_id,
            "args": args,
            "kwargs": kwargs,
        }
        self._emit_queue.put_nowait(payload)

    async def _publisher_loop(self) -> None:
        """Publish queued events until the ``None`` sentinel is received.

        A failure to encode or publish one event is logged and that event is
        dropped; the loop keeps running so later events are still delivered.
        """
        while True:
            item = await self._emit_queue.get()
            if item is None:
                break
            try:
                body = json.dumps(item).encode("utf-8")
                message = Message(
                    body=body,
                    content_type="application/json",
                    delivery_mode=DeliveryMode.PERSISTENT,
                )
                await self._exchange.publish(
                    message,
                    routing_key=item["event_id"],
                )
            except Exception:
                self._logger.exception(
                    "Failed to publish event %r", item.get("event_id")
                )

    async def _handle_message(self, message: IncomingMessage) -> None:
        """Decode a consumed message and run every listener for its event.

        The body runs inside ``message.process(requeue=False)``, so an
        undecodable payload raises out of the context manager instead of
        being handled here. Listener failures are logged individually and do
        not propagate.
        """
        async with message.process(requeue=False):
            payload = json.loads(message.body.decode("utf-8"))
            event_id = payload["event_id"]
            args = payload.get("args", [])
            kwargs = payload.get("kwargs", {})
            listeners = self._listeners_by_event.get(event_id, [])
            if not listeners:
                return
            await asyncio.gather(
                *(
                    self._run_listener(event_listener, event_id, args, kwargs)
                    for event_listener in listeners
                )
            )

    async def _run_listener(
        self,
        event_listener: EventListener,
        event_id: str,
        args: list[Any],
        kwargs: dict[str, Any],
    ) -> None:
        """Await one listener, logging any error so sibling listeners still run."""
        try:
            # The call itself sits inside the ``try``: a signature mismatch
            # raises ``TypeError`` at call time, before anything is awaited.
            await event_listener.fn(*args, **kwargs)
        except Exception:
            self._logger.exception("Listener error for %s", event_id)
@listener("user.created")
async def send_welcome_email(email: str, **kwargs: object) -> None:
    """Demo listener for ``user.created``: print a welcome-email notice.

    Args:
        email: Address to mention in the printed notice.
        **kwargs: Any other keyword arguments of the event; unused.
    """
    notice = f"Send welcome email to {email}"
    print(notice)
# sync_to_thread=False is explicit on purpose: the handler does no blocking
# work, and the event backend's ``emit`` calls ``asyncio.Queue.put_nowait``,
# which is not thread-safe and must run on the event-loop thread. Setting the
# flag also avoids Litestar's warning about sync handlers that leave it unset.
@get("/users/create", sync_to_thread=False)
def create_user(request: Request) -> dict[str, str]:
    """Emit a ``user.created`` event and report that it was queued.

    Args:
        request: Incoming request; its ``app`` is used to emit the event.

    Returns:
        ``{"status": "queued"}`` once the event has been handed to the app.
    """
    request.app.emit("user.created", email="alice@example.com")
    return {"status": "queued"}
# Bind the broker settings up front. ``listeners`` is deliberately left
# unbound so it can be supplied when the backend is instantiated.
_rabbitmq_backend_factory = partial(
    RabbitMQEventEmitterBackend,
    amqp_url="amqp://guest:guest@localhost/",
    exchange_name="litestar.events",
    queue_name="litestar.listeners",
)

# Application wiring: one route that emits the event, one listener for it,
# and the RabbitMQ-backed emitter as the transport between them.
app = Litestar(
    route_handlers=[create_user],
    listeners=[send_welcome_email],
    event_emitter_backend=_rabbitmq_backend_factory,
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment