Skip to content

Instantly share code, notes, and snippets.

@GGontijo
Last active May 30, 2025 17:52
Show Gist options
  • Select an option

  • Save GGontijo/0a1902fa50bd71b9b2dd1040f5e0e736 to your computer and use it in GitHub Desktop.

Select an option

Save GGontijo/0a1902fa50bd71b9b2dd1040f5e0e736 to your computer and use it in GitHub Desktop.
rabbitmq consumer and publisher example
import json
import logging
import time
from typing import Callable
import pika
from pika import (
BasicProperties,
ConnectionParameters,
PlainCredentials,
SelectConnection,
)
from pika.channel import Channel
from pika.exceptions import AMQPChannelError, AMQPConnectionError
class RMQPublisher:
"""
Classe especializada para enviar mensagens ao RabbitMQ usando BlockingConnection.
"""
def __init__(
self,
host: str,
port: int,
username: str,
password: str,
heartbeat: int = 60,
blocked_connection_timeout: int = 60,
) -> None:
self.host = host
self.port = port
self.username = username
self.password = password
self.heartbeat = heartbeat
self.blocked_connection_timeout = blocked_connection_timeout
self.connection = None
self.channel = None
self._connect()
def _connect(self):
while True:
try:
credentials = PlainCredentials(
username=self.username, password=self.password
)
parameters = ConnectionParameters(
host=self.host,
port=self.port,
credentials=credentials,
heartbeat=self.heartbeat,
blocked_connection_timeout=self.blocked_connection_timeout,
)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
logging.info("Successfully connected to RabbitMQ for publishing")
break
except (AMQPConnectionError, AMQPChannelError) as e:
logging.error(f"Connection failed: {e}. Retrying in 5 seconds...")
time.sleep(5)
def publish_message(
self, queue: str, message: str, properties: BasicProperties = None
):
try:
if not self.connection or self.connection.is_closed:
self._connect()
self.channel.queue_declare(
queue=queue, durable=True, arguments={"x-max-priority": 99}
)
self.channel.basic_publish(
exchange="",
routing_key=queue,
body=json.dumps(message) if isinstance(message, dict) else message,
properties=properties
or BasicProperties(
delivery_mode=2,
headers={"resource": "core"},
content_type="application/json",
app_id="BotVerseCore",
),
)
logging.info(f"Published message to queue '{queue}'")
except (
AMQPConnectionError,
AMQPChannelError,
pika.exceptions.StreamLostError,
) as e:
logging.error(f"Publish failed: {e}. Attempting reconnect and retry...")
self._connect()
# Retry publishing once after reconnect
self.publish_message(queue, message, properties)
def close(self):
if self.connection and self.connection.is_open:
self.connection.close()
logging.info("Publisher connection closed")
def queue_declare(self, **kwargs):
"""Wrapper para declaração de fila com reconexão automática"""
try:
if not self.connection or self.connection.is_closed:
self._connect()
return self.channel.queue_declare(**kwargs)
except (
AMQPConnectionError,
AMQPChannelError,
pika.exceptions.StreamLostError,
) as e:
logging.error(
f"Queue declare failed: {e}. Attempting reconnect and retry..."
)
self._connect()
# Retry once after reconnect
return self.channel.queue_declare(**kwargs)
class RMQConsumer:
"""
Classe especializada para consumir mensagens do RabbitMQ usando SelectConnection.
"""
def __init__(
self,
host: str,
port: int,
username: str,
password: str,
blocked_connection_timeout: int = 60,
heartbeat: int = 60,
auto_reconnect: bool = True,
) -> None:
self.host = host
self.port = port
self.username = username
self.password = password
self.heartbeat = heartbeat
self.blocked_connection_timeout = blocked_connection_timeout
self.auto_reconnect = auto_reconnect
self.connection = None
self.channel: Channel = None
self.queues: list[tuple[str, Callable]] = []
def connect(self) -> None:
"""Estabelece conexão com o RabbitMQ."""
try:
credentials = PlainCredentials(
username=self.username, password=self.password
)
connection_parameters = ConnectionParameters(
host=self.host,
port=self.port,
credentials=credentials,
heartbeat=self.heartbeat,
blocked_connection_timeout=self.blocked_connection_timeout,
)
self.connection = SelectConnection(
connection_parameters, on_open_callback=self.on_connection_open
)
logging.info("Successfully connected to RabbitMQ (Consumer)")
except (AMQPConnectionError, AMQPChannelError) as e:
logging.error(f"Connection failed: {e}. Retrying in 5 seconds...")
time.sleep(5)
def on_connection_open(self, connection):
"""Callback quando a conexão é aberta."""
self.channel = connection.channel(on_open_callback=self.on_channel_open)
def on_channel_open(self, channel: Channel):
"""Callback quando o canal é aberto."""
self.channel = channel
for queue, callback in self.queues:
self.channel.queue_declare(queue=queue, durable=True)
self.channel.basic_consume(queue=queue, on_message_callback=callback)
logging.info(f"Consumer ready. Consuming messages from {self.queues}...")
def add_queue(self, queue: str, callback: Callable):
"""Adiciona uma fila e callback de consumo."""
self.queues.append((queue, callback))
def start_consuming(self):
"""Inicia o consumo contínuo com reconexão automática."""
while self.auto_reconnect:
try:
if not self.connection or not self.connection.is_open:
self.connect()
self.connection.ioloop.start()
except (AMQPConnectionError, AMQPChannelError) as e:
logging.error(f"Consumption error: {e}. Reconnecting in 5 seconds...")
time.sleep(5)
except Exception as e:
logging.error(f"xUnexpected error during consumption: {e}")
time.sleep(5)
def stop_consuming(self):
"""Encerra o consumo de forma segura."""
if self.connection and self.connection.is_open:
logging.info("Stopping consumer...")
try:
self.connection.ioloop.stop()
self.connection.close()
except Exception as e:
logging.error(f"Error while stopping consumer: {e}")
else:
logging.info("Consumer not running or already closed.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment