Last active
December 23, 2021 15:35
-
-
Save elbaro/77e669306b847e31566315d688c17757 to your computer and use it in GitHub Desktop.
RabbitMQ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pika | |
| from pika.adapters.blocking_connection import BlockingConnection, BlockingChannel | |
| import time | |
| import atexit | |
| from typing import List, Iterator, Tuple | |
| from multiprocessing import Queue, Process | |
| import queue | |
| class RabbitConsumer(object): | |
| """ | |
| A RabbitMQ consumer that provides data in batch. | |
| If channel is given, host and port are ignored. | |
| If channel is not given, host and port are used to create a new channel. | |
| Args: | |
| channel (BlockingChannel): the channel instance with ack enabled. | |
| queue (str): the name of a queue. | |
| """ | |
| def __init__(self, queue: str, host: str = 'localhost', port: int = 5672, channel: BlockingChannel = None): | |
| if channel is None: | |
| self.conn = BlockingConnection(pika.ConnectionParameters(host=host, port=port)) | |
| self.channel = self.conn.channel() | |
| else: | |
| self.conn = None | |
| self.channel = channel | |
| self.channel.queue_declare(queue=queue) | |
| self.queue = queue | |
| self.buf = Queue() | |
| # start receiving messages | |
| self.channel.basic_consume(self._handle_receive, queue=queue) | |
| self.process = Process(target=self.channel.start_consuming) | |
| self.process.start() | |
| atexit.register(self.close) | |
| def close(self): | |
| """ | |
| Close the connection. | |
| If a msg is not acked until the disconnect, it is considered not delivered. | |
| """ | |
| if self.process is not None: | |
| self.process.terminate() | |
| self.process.join() | |
| self.process = None | |
| self.conn.close() | |
| def _handle_receive(self, _channel, method, _header, body): | |
| body = body.decode() | |
| self.buf.put((body, method.delivery_tag)) | |
| def one(self, timeout=None): | |
| """ | |
| Consume one message. (blocking) | |
| After processing the message, you should call consumer.ack(ack_tag). | |
| Returns: | |
| A tuple (msg, ack_tag). | |
| Raise: | |
| raise on timeout. | |
| """ | |
| return self.buf.get(block=True, timeout=timeout) | |
| def one_batch(self, max_batch_size:int) -> Tuple[List, List]: | |
| """ | |
| Consume up to `max_batch_size` messages. | |
| Wait until at least one msg is available. | |
| After processing the message, you should call consumer.ack. | |
| Args: | |
| max_batch_size: the number of messages to consume. | |
| Returns: (List,List). | |
| The first is a list of messages. | |
| The second is a list of ack tags. | |
| """ | |
| l = [self.buf.get(block=True)] | |
| for i in range(max_batch_size-1): | |
| try: | |
| l.append(self.buf.get(block=False)) | |
| except queue.Empty: | |
| break | |
| msgs = [x[0] for x in l] | |
| acks = [x[1] for x in l] | |
| return (msgs, acks) | |
| def iter_batch(self, max_batch_size:int) -> Iterator[Tuple[List,List]]: | |
| while True: | |
| yield self.one_batch(max_batch_size) | |
| def ack(self, ack_tags: List): | |
| """ | |
| Report that you successfully processed messages. | |
| Args: | |
| ack_tags: a single ack tag or a list of ack tags of successful messages. | |
| """ | |
| if isinstance(ack_tags, list): | |
| for tag in ack_tags: | |
| self.channel.basic_ack(tag) | |
| else: | |
| self.channel.basic_ack(ack_tags) | |
| def ack_upto(self, ack_tag): | |
| """ | |
| Report that you successfully processed all messages up to `ack_tag`. | |
| Args: | |
| ack_tag: the ack tag of the last successful message. | |
| """ | |
| self.channel.basic_ack(ack_tag, multiple=True) | |
| def nack(self, ack_tags: List, requeue): | |
| """ | |
| Report that you fail to process messages. | |
| Args: | |
| ack_tags: a list of ack tags of successful messages. | |
| """ | |
| if isinstance(ack_tags, list): | |
| for tag in ack_tags: | |
| self.channel.basic_nack(tag) | |
| else: | |
| self.channel.basic_nack(ack_tags) | |
| def nack_upto(self, ack_tag, requeue): | |
| """ | |
| Report that you fail to process all messages up to `ack_tag`. | |
| Args: | |
| ack_tag: the ack tag of the last successful message. | |
| """ | |
| self.channel.basic_nack(ack_tag, multiple=True, requeue=requeue) | |
| class RabbitProducer(object): | |
| """ | |
| A RabbitMQ consumer that provides data in batch. | |
| If channel is given, host and port are ignored. | |
| If channel is not given, host and port are used to create a new channel. | |
| Args: | |
| channel (BlockingChannel): the channel instance with ack enabled. | |
| queue (str): the name of a queue. | |
| """ | |
| def __init__(self, queue: str, host: str = 'localhost', port: int = 5672, | |
| channel: BlockingChannel = None): | |
| if channel is None: | |
| self.conn = BlockingConnection(pika.ConnectionParameters(host=host, port=port)) | |
| self.channel = self.conn.channel() | |
| else: | |
| self.conn = None | |
| self.channel = channel | |
| self.queue = queue | |
| self.channel.queue_declare(queue=queue) | |
| # make sure deliveries | |
| self.channel.confirm_delivery() | |
| def send(self, msg: str) -> bool: | |
| """ | |
| send the message. (sync) | |
| Args: | |
| msg: a single msg(str) | |
| Return: | |
| bool: True on success | |
| """ | |
| return self.channel.basic_publish( | |
| '', | |
| self.queue, | |
| msg, | |
| properties=pika.BasicProperties(content_type='text/plain', | |
| delivery_mode=2), # persistent | |
| mandatory=True) | |
| def close(self): | |
| """ | |
| Close the connection. | |
| After the call to this method, you cannot `send`. | |
| """ | |
| if self.conn is not None: | |
| self.conn.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment