Last active
May 30, 2025 17:52
-
-
Save GGontijo/0a1902fa50bd71b9b2dd1040f5e0e736 to your computer and use it in GitHub Desktop.
RabbitMQ consumer and publisher example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import logging | |
| import time | |
| from typing import Callable | |
| import pika | |
| from pika import ( | |
| BasicProperties, | |
| ConnectionParameters, | |
| PlainCredentials, | |
| SelectConnection, | |
| ) | |
| from pika.channel import Channel | |
| from pika.exceptions import AMQPChannelError, AMQPConnectionError | |
class RMQPublisher:
    """Publish messages to RabbitMQ over a pika ``BlockingConnection``.

    The connection is opened eagerly in ``__init__`` (blocking until the
    broker accepts it) and is re-established when a publish or queue
    declaration finds the connection/channel closed or fails with an AMQP
    error.
    """

    # Seconds to wait between failed connection attempts.
    RECONNECT_DELAY_SECONDS = 5
    # Extra attempts ``publish_message`` makes after its first failure.
    MAX_PUBLISH_RETRIES = 1

    def __init__(
        self,
        host: str,
        port: int,
        username: str,
        password: str,
        heartbeat: int = 60,
        blocked_connection_timeout: int = 60,
    ) -> None:
        """Store the broker settings and connect immediately.

        Args:
            host: Broker host name.
            port: Broker port.
            username: User name for ``PlainCredentials``.
            password: Password for ``PlainCredentials``.
            heartbeat: Value forwarded to ``ConnectionParameters.heartbeat``.
            blocked_connection_timeout: Value forwarded to
                ``ConnectionParameters.blocked_connection_timeout``.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.heartbeat = heartbeat
        self.blocked_connection_timeout = blocked_connection_timeout
        self.connection = None
        self.channel = None
        self._connect()

    def _discard_connection(self) -> None:
        """Forget the current connection/channel, closing it if still open."""
        stale, self.connection, self.channel = self.connection, None, None
        if stale is None or not stale.is_open:
            return
        try:
            stale.close()
        except Exception as e:
            # Best-effort cleanup: the connection is being replaced anyway,
            # so a failure to close it must not abort the reconnect.
            logging.debug(f"Ignoring error while closing stale connection: {e}")

    def _connect(self) -> None:
        """Open a fresh connection and channel, retrying until it succeeds."""
        while True:
            # Without this, every reconnect (including a failed
            # ``connection.channel()`` on a freshly opened connection) would
            # leave the previous connection open on the broker.
            self._discard_connection()
            try:
                credentials = PlainCredentials(
                    username=self.username, password=self.password
                )
                parameters = ConnectionParameters(
                    host=self.host,
                    port=self.port,
                    credentials=credentials,
                    heartbeat=self.heartbeat,
                    blocked_connection_timeout=self.blocked_connection_timeout,
                )
                self.connection = pika.BlockingConnection(parameters)
                self.channel = self.connection.channel()
            except (AMQPConnectionError, AMQPChannelError) as e:
                logging.error(
                    f"Connection failed: {e}. "
                    f"Retrying in {self.RECONNECT_DELAY_SECONDS} seconds..."
                )
                time.sleep(self.RECONNECT_DELAY_SECONDS)
            else:
                logging.info("Successfully connected to RabbitMQ for publishing")
                return

    def _ensure_connected(self) -> None:
        """Reconnect when either the connection or its channel is unusable."""
        connection_down = self.connection is None or self.connection.is_closed
        # A channel-level error closes only the channel; the connection can
        # still report itself as open.
        channel_down = self.channel is None or self.channel.is_closed
        if connection_down or channel_down:
            self._connect()

    def publish_message(
        self,
        queue: str,
        message: "str | bytes | dict | list",
        properties: "BasicProperties | None" = None,
    ):
        """Declare ``queue`` and publish ``message`` to it via the default exchange.

        Args:
            queue: Queue name; also used as the routing key. It is declared
                durable with ``x-max-priority`` 99 before publishing.
            message: Payload. ``dict`` and ``list`` values are JSON-encoded;
                anything else is handed to pika unchanged.
            properties: Message properties. When omitted, persistent JSON
                properties (``delivery_mode=2``) are used.

        Raises:
            AMQPConnectionError, AMQPChannelError: If publishing still fails
                after ``MAX_PUBLISH_RETRIES`` reconnect-and-retry rounds.
        """
        body = json.dumps(message) if isinstance(message, (dict, list)) else message
        if properties is None:
            properties = BasicProperties(
                delivery_mode=2,
                headers={"resource": "core"},
                content_type="application/json",
                app_id="BotVerseCore",
            )

        # Bounded loop instead of recursion: a failure that reconnecting
        # cannot cure (e.g. a rejected queue declaration) must surface to the
        # caller rather than recurse until RecursionError.
        attempts = self.MAX_PUBLISH_RETRIES + 1
        for attempt in range(1, attempts + 1):
            try:
                self._ensure_connected()
                self.channel.queue_declare(
                    queue=queue, durable=True, arguments={"x-max-priority": 99}
                )
                self.channel.basic_publish(
                    exchange="",
                    routing_key=queue,
                    body=body,
                    properties=properties,
                )
            except (
                AMQPConnectionError,
                AMQPChannelError,
                pika.exceptions.StreamLostError,
            ) as e:
                if attempt == attempts:
                    logging.error(
                        f"Publish failed: {e}. Giving up after {attempts} attempts."
                    )
                    raise
                logging.error(f"Publish failed: {e}. Attempting reconnect and retry...")
                self._connect()
            else:
                logging.info(f"Published message to queue '{queue}'")
                return

    def close(self):
        """Close the connection if it is currently open."""
        if self.connection and self.connection.is_open:
            self.connection.close()
            logging.info("Publisher connection closed")

    def queue_declare(self, **kwargs):
        """Declare a queue, reconnecting and retrying once on AMQP errors.

        Args:
            **kwargs: Forwarded unchanged to the channel's ``queue_declare``.

        Returns:
            Whatever the channel's ``queue_declare`` returns.

        Raises:
            AMQPConnectionError, AMQPChannelError: If the single retry after
                reconnecting fails as well.
        """
        try:
            self._ensure_connected()
            return self.channel.queue_declare(**kwargs)
        except (
            AMQPConnectionError,
            AMQPChannelError,
            pika.exceptions.StreamLostError,
        ) as e:
            logging.error(
                f"Queue declare failed: {e}. Attempting reconnect and retry..."
            )
            self._connect()
            # Retry once after reconnect; a second failure propagates.
            return self.channel.queue_declare(**kwargs)
class RMQConsumer:
    """Consume messages from RabbitMQ over a pika ``SelectConnection``.

    Queues are registered with ``add_queue`` and consumed once
    ``start_consuming`` runs the connection's ioloop. When ``auto_reconnect``
    is true, a lost connection is replaced after a short delay; otherwise
    ``start_consuming`` returns when the ioloop stops.
    """

    # Seconds to wait before reconnecting or resuming after an error.
    RECONNECT_DELAY_SECONDS = 5

    def __init__(
        self,
        host: str,
        port: int,
        username: str,
        password: str,
        blocked_connection_timeout: int = 60,
        heartbeat: int = 60,
        auto_reconnect: bool = True,
    ) -> None:
        """Store the broker settings; no connection is opened here.

        Args:
            host: Broker host name.
            port: Broker port.
            username: User name for ``PlainCredentials``.
            password: Password for ``PlainCredentials``.
            blocked_connection_timeout: Value forwarded to
                ``ConnectionParameters.blocked_connection_timeout``.
            heartbeat: Value forwarded to ``ConnectionParameters.heartbeat``.
            auto_reconnect: Whether ``start_consuming`` reconnects after the
                ioloop stops.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.heartbeat = heartbeat
        self.blocked_connection_timeout = blocked_connection_timeout
        self.auto_reconnect = auto_reconnect
        self.connection = None
        self.channel: Channel = None
        self.queues: list[tuple[str, Callable]] = []
        # Set by stop_consuming() so the reconnect loop knows to exit.
        self._stopping = False
        # True only while start_consuming() is inside ioloop.start().
        self._ioloop_running = False

    def _open_connection(self) -> None:
        """Create a new ``SelectConnection`` with open/error/close callbacks."""
        self.connection = None
        self.channel = None
        credentials = PlainCredentials(
            username=self.username, password=self.password
        )
        connection_parameters = ConnectionParameters(
            host=self.host,
            port=self.port,
            credentials=credentials,
            heartbeat=self.heartbeat,
            blocked_connection_timeout=self.blocked_connection_timeout,
        )
        # The error/close callbacks stop the ioloop; without them
        # ioloop.start() never returns after a connection loss and the
        # reconnect loop in start_consuming() is never reached.
        self.connection = SelectConnection(
            connection_parameters,
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed,
        )

    def connect(self) -> None:
        """Start connecting to RabbitMQ.

        The connection is opened asynchronously: it only becomes usable once
        the ioloop runs and ``on_connection_open`` fires. AMQP errors raised
        while creating the connection are logged and followed by a delay
        instead of being propagated; ``self.connection`` is ``None`` then.
        """
        try:
            self._open_connection()
        except (AMQPConnectionError, AMQPChannelError) as e:
            logging.error(
                f"Connection failed: {e}. "
                f"Retrying in {self.RECONNECT_DELAY_SECONDS} seconds..."
            )
            time.sleep(self.RECONNECT_DELAY_SECONDS)

    def on_connection_open(self, connection):
        """Connection-open callback: log success and open a channel."""
        logging.info("Successfully connected to RabbitMQ (Consumer)")
        self.channel = connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_open_error(self, connection, error):
        """Connection-open-error callback: stop the ioloop so the caller can retry."""
        logging.error(f"Consumer connection could not be opened: {error}")
        connection.ioloop.stop()

    def on_connection_closed(self, connection, reason):
        """Connection-closed callback: stop the ioloop so start_consuming() resumes."""
        self.channel = None
        if self._stopping:
            logging.info("Consumer connection closed")
        else:
            logging.warning(f"Consumer connection closed unexpectedly: {reason}")
        connection.ioloop.stop()

    def on_channel_open(self, channel: "Channel"):
        """Channel-open callback: declare every registered queue and consume it."""
        self.channel = channel
        # A broker-side channel error closes only the channel; tear the
        # connection down as well so the reconnect logic can take over.
        channel.add_on_close_callback(self.on_channel_closed)
        for queue, callback in self.queues:
            # NOTE(review): RMQPublisher.publish_message declares its queues
            # with arguments={"x-max-priority": 99}; if the same queue is
            # consumed here, this argument-less declaration may be rejected
            # by the broker as inequivalent -- confirm against actual usage.
            channel.queue_declare(queue=queue, durable=True)
            channel.basic_consume(queue=queue, on_message_callback=callback)
        logging.info(f"Consumer ready. Consuming messages from {self.queues}...")

    def on_channel_closed(self, channel, reason):
        """Channel-closed callback: close the connection unless already closing."""
        logging.warning(f"Consumer channel closed: {reason}")
        self.channel = None
        self._close_connection()

    def add_queue(self, queue: str, callback: Callable):
        """Register a queue name and the message callback that consumes it."""
        self.queues.append((queue, callback))

    def _close_connection(self) -> None:
        """Begin closing the connection unless it is absent, closing or closed."""
        connection = self.connection
        if connection is None or connection.is_closing or connection.is_closed:
            return
        connection.close()

    def start_consuming(self):
        """Run the consumer ioloop until stopped.

        The loop body always runs at least once, so a consumer created with
        ``auto_reconnect=False`` still consumes; it simply returns when the
        ioloop stops instead of reconnecting. With ``auto_reconnect=True`` the
        loop reconnects after ``RECONNECT_DELAY_SECONDS`` until
        ``stop_consuming`` is called.
        """
        self._stopping = False
        while True:
            try:
                if self.connection is None or not self.connection.is_open:
                    self._open_connection()
                self._ioloop_running = True
                try:
                    self.connection.ioloop.start()
                finally:
                    self._ioloop_running = False
            except (AMQPConnectionError, AMQPChannelError) as e:
                logging.error(
                    f"Consumption error: {e}. "
                    f"Reconnecting in {self.RECONNECT_DELAY_SECONDS} seconds..."
                )
            except Exception as e:
                logging.error(f"Unexpected error during consumption: {e}")
            if self._stopping or not self.auto_reconnect:
                break
            # Also reached when the ioloop returned normally after a lost
            # connection; the pause keeps a down broker from being hammered.
            time.sleep(self.RECONNECT_DELAY_SECONDS)

    def stop_consuming(self):
        """Request a clean shutdown of the consumer.

        Marks the consumer as stopping so ``start_consuming`` will not
        reconnect, then closes the connection. While the ioloop is running
        the close is scheduled through ``add_callback_threadsafe`` so this
        method may be called from another thread; the ioloop is stopped by
        the connection-closed callback once the close completes.
        """
        self._stopping = True
        connection = self.connection
        if connection is None or connection.is_closed:
            logging.info("Consumer not running or already closed.")
            return
        logging.info("Stopping consumer...")
        try:
            if self._ioloop_running:
                connection.ioloop.add_callback_threadsafe(self._close_connection)
            else:
                self._close_connection()
        except Exception as e:
            logging.error(f"Error while stopping consumer: {e}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment