Skip to content

Instantly share code, notes, and snippets.

@hasansezertasan
Last active March 6, 2026 02:04
Show Gist options
  • Select an option

  • Save hasansezertasan/bbfcb7fa39cd44ed15707bb70d21fdaf to your computer and use it in GitHub Desktop.

Select an option

Save hasansezertasan/bbfcb7fa39cd44ed15707bb70d21fdaf to your computer and use it in GitHub Desktop.
from __future__ import annotations
import asyncio
import json
from functools import partial
from typing import Any
from aio_pika import DeliveryMode
from aio_pika import ExchangeType
from aio_pika import IncomingMessage
from aio_pika import Message
from aio_pika import connect_robust
from litestar import Litestar
from litestar import Request
from litestar import get
from litestar.events import BaseEventEmitterBackend
from litestar.events import EventListener
from litestar.events import listener
class RabbitMQEventEmitterBackend(BaseEventEmitterBackend):
def __init__(
self,
listeners: list[EventListener],
amqp_url: str,
exchange_name: str = "litestar.events",
queue_name: str = "litestar.listeners",
) -> None:
super().__init__(listeners)
self.amqp_url = amqp_url
self.exchange_name = exchange_name
self.queue_name = queue_name
self._emit_queue: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue()
self._connection = None
self._channel = None
self._exchange = None
self._queue = None
self._publisher_task: asyncio.Task[None] | None = None
self._consumer_tag: str | None = None
self._listeners_by_event: dict[str, list[EventListener]] = {}
for event_listener in listeners:
for event_id in event_listener.event_ids:
self._listeners_by_event.setdefault(event_id, []).append(event_listener)
async def __aenter__(self) -> RabbitMQEventEmitterBackend:
self._connection = await connect_robust(self.amqp_url)
self._channel = await self._connection.channel()
await self._channel.set_qos(prefetch_count=32)
self._exchange = await self._channel.declare_exchange(
self.exchange_name,
ExchangeType.TOPIC,
durable=True,
)
self._queue = await self._channel.declare_queue(
self.queue_name,
durable=True,
)
for event_id in self._listeners_by_event:
await self._queue.bind(self._exchange, routing_key=event_id)
self._publisher_task = asyncio.create_task(self._publisher_loop())
self._consumer_tag = await self._queue.consume(self._handle_message)
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
await self._emit_queue.put(None)
if self._publisher_task is not None:
await self._publisher_task
if self._queue is not None and self._consumer_tag is not None:
await self._queue.cancel(self._consumer_tag)
if self._channel is not None:
await self._channel.close()
if self._connection is not None:
await self._connection.close()
def emit(self, event_id: str, *args: Any, **kwargs: Any) -> None:
payload = {
"event_id": event_id,
"args": args,
"kwargs": kwargs,
}
self._emit_queue.put_nowait(payload)
async def _publisher_loop(self) -> None:
while True:
item = await self._emit_queue.get()
if item is None:
break
body = json.dumps(item).encode("utf-8")
message = Message(
body=body,
content_type="application/json",
delivery_mode=DeliveryMode.PERSISTENT,
)
await self._exchange.publish(
message,
routing_key=item["event_id"],
)
async def _handle_message(self, message: IncomingMessage) -> None:
async with message.process(requeue=False):
payload = json.loads(message.body.decode("utf-8"))
event_id = payload["event_id"]
args = payload.get("args", [])
kwargs = payload.get("kwargs", {})
listeners = self._listeners_by_event.get(event_id, [])
if not listeners:
return
tasks = [
asyncio.create_task(event_listener.fn(*args, **kwargs))
for event_listener in listeners
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
# Replace with real logging
print(f"Listener error for {event_id}: {result}")
@listener("user.created")
async def send_welcome_email(email: str, **kwargs: object) -> None:
print(f"Send welcome email to {email}")
@get("/users/create")
def create_user(request: Request) -> dict[str, str]:
request.app.emit("user.created", email="alice@example.com")
return {"status": "queued"}
app = Litestar(
route_handlers=[create_user],
listeners=[send_welcome_email],
event_emitter_backend=partial(
RabbitMQEventEmitterBackend,
amqp_url="amqp://guest:guest@localhost/",
exchange_name="litestar.events",
queue_name="litestar.listeners",
),
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment