Skip to content

Instantly share code, notes, and snippets.

@elbaro
Last active December 23, 2021 15:35
Show Gist options
  • Select an option

  • Save elbaro/77e669306b847e31566315d688c17757 to your computer and use it in GitHub Desktop.

Select an option

Save elbaro/77e669306b847e31566315d688c17757 to your computer and use it in GitHub Desktop.
RabbitMQ
import atexit
import queue
import time
from multiprocessing import Process, Queue
from typing import Iterator, List, Optional, Tuple

import pika
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
class RabbitConsumer(object):
    """
    A RabbitMQ consumer that provides data in batch.

    If channel is given, host and port are ignored and this object does not
    own a connection (``self.conn`` is None).
    If channel is not given, host and port are used to create a new
    connection and channel, which `close` tears down.

    Incoming messages are decoded to ``str`` and buffered in a
    ``multiprocessing.Queue`` as ``(body, delivery_tag)`` tuples; `one` and
    `one_batch` read from that buffer.

    Args:
        queue (str): the name of a queue.
        host (str): broker host, used only when `channel` is None.
        port (int): broker port, used only when `channel` is None.
        channel (BlockingChannel): the channel instance with ack enabled.
    """

    def __init__(self, queue: str, host: str = 'localhost', port: int = 5672,
                 channel: Optional["BlockingChannel"] = None):
        if channel is None:
            self.conn = BlockingConnection(pika.ConnectionParameters(host=host, port=port))
            self.channel = self.conn.channel()
        else:
            # The caller owns the connection behind this channel; close()
            # must leave it alone.
            self.conn = None
            self.channel = channel
        self.channel.queue_declare(queue=queue)
        self.queue = queue
        self.buf = Queue()

        # start receiving messages
        # NOTE(review): callback-first argument order looks like the pre-1.0
        # pika signature -- confirm against the installed pika version.
        self.channel.basic_consume(self._handle_receive, queue=queue)
        # NOTE(review): the consuming loop runs in a separate process while
        # ack/nack are issued from this process on the same channel object;
        # presumably this relies on the fork start method -- TODO confirm.
        self.process = Process(target=self.channel.start_consuming)
        self.process.start()
        atexit.register(self.close)

    def close(self):
        """
        Stop the consuming process and close the owned connection, if any.

        Safe to call more than once (it is also registered with `atexit`),
        and safe when the consumer was built from a caller-supplied channel,
        in which case there is no owned connection to close.
        If a msg is not acked until the disconnect, it is considered not delivered.
        """
        # Detach the handles before acting on them so a repeated call
        # (manual close followed by the atexit hook) becomes a no-op.
        process, self.process = self.process, None
        if process is not None:
            process.terminate()
            process.join()

        conn, self.conn = self.conn, None
        if conn is not None:
            conn.close()

    def _handle_receive(self, _channel, method, _header, body):
        """Consumer callback: buffer the decoded body with its delivery tag."""
        body = body.decode()
        self.buf.put((body, method.delivery_tag))

    def one(self, timeout=None):
        """
        Consume one message. (blocking)
        After processing the message, you should call consumer.ack(ack_tag).

        Args:
            timeout: seconds to wait for a message; None waits indefinitely.
        Returns:
            A tuple (msg, ack_tag).
        Raise:
            queue.Empty: if no message arrives within `timeout`.
        """
        return self.buf.get(block=True, timeout=timeout)

    def one_batch(self, max_batch_size: int) -> Tuple[List, List]:
        """
        Consume up to `max_batch_size` messages.
        Wait until at least one msg is available; the rest of the batch is
        filled only with messages that are already buffered.
        After processing the message, you should call consumer.ack.

        Args:
            max_batch_size: the maximum number of messages to consume. One
                message is always returned, even if this is less than 1.
        Returns: (List,List).
            The first is a list of messages.
            The second is a list of ack tags.
        """
        batch = [self.buf.get(block=True)]
        for _ in range(max_batch_size - 1):
            try:
                batch.append(self.buf.get(block=False))
            except queue.Empty:
                break
        msgs = [item[0] for item in batch]
        acks = [item[1] for item in batch]
        return (msgs, acks)

    def iter_batch(self, max_batch_size: int) -> Iterator[Tuple[List, List]]:
        """Yield batches from `one_batch` forever; each step blocks until a message is available."""
        while True:
            yield self.one_batch(max_batch_size)

    def ack(self, ack_tags: List):
        """
        Report that you successfully processed messages.

        Args:
            ack_tags: a single ack tag or a list of ack tags of successful messages.
        """
        if isinstance(ack_tags, list):
            for tag in ack_tags:
                self.channel.basic_ack(tag)
        else:
            self.channel.basic_ack(ack_tags)

    def ack_upto(self, ack_tag):
        """
        Report that you successfully processed all messages up to `ack_tag`.

        Args:
            ack_tag: the ack tag of the last successful message.
        """
        self.channel.basic_ack(ack_tag, multiple=True)

    def nack(self, ack_tags: List, requeue):
        """
        Report that you fail to process messages.

        Args:
            ack_tags: a single ack tag or a list of ack tags of failed messages.
            requeue: forwarded to `basic_nack` as its `requeue` argument.
        """
        if isinstance(ack_tags, list):
            for tag in ack_tags:
                self.channel.basic_nack(tag, requeue=requeue)
        else:
            self.channel.basic_nack(ack_tags, requeue=requeue)

    def nack_upto(self, ack_tag, requeue):
        """
        Report that you fail to process all messages up to `ack_tag`.

        Args:
            ack_tag: the ack tag of the last failed message.
            requeue: forwarded to `basic_nack` as its `requeue` argument.
        """
        self.channel.basic_nack(ack_tag, multiple=True, requeue=requeue)
class RabbitProducer(object):
    """
    A RabbitMQ producer that publishes messages to a single queue.

    If channel is given, host and port are ignored and this object does not
    own a connection (``self.conn`` is None).
    If channel is not given, host and port are used to create a new
    connection and channel, which `close` tears down.

    Args:
        queue (str): the name of a queue.
        host (str): broker host, used only when `channel` is None.
        port (int): broker port, used only when `channel` is None.
        channel (BlockingChannel): the channel instance to publish on.
    """

    def __init__(self, queue: str, host: str = 'localhost', port: int = 5672,
                 channel: Optional["BlockingChannel"] = None):
        if channel is None:
            self.conn = BlockingConnection(pika.ConnectionParameters(host=host, port=port))
            self.channel = self.conn.channel()
        else:
            # The caller owns the connection behind this channel; close()
            # must leave it alone.
            self.conn = None
            self.channel = channel
        self.queue = queue
        self.channel.queue_declare(queue=queue)
        # Enable publisher confirms on the channel so deliveries are confirmed.
        self.channel.confirm_delivery()

    def send(self, msg: str) -> bool:
        """
        send the message. (sync)

        The message is published to the default exchange ('') with the queue
        name as routing key, marked persistent (delivery_mode=2) and
        mandatory.

        Args:
            msg: a single msg(str)
        Return:
            Whatever `basic_publish` returns.
            NOTE(review): presumably a bool (True on success) on pre-1.0
            pika; newer versions may signal failure by raising instead --
            confirm against the installed pika version.
        """
        properties = pika.BasicProperties(content_type='text/plain',
                                          delivery_mode=2)  # persistent
        return self.channel.basic_publish(
            '',
            self.queue,
            msg,
            properties=properties,
            mandatory=True)

    def close(self):
        """
        Close the owned connection, if any.

        After the call to this method, you cannot `send`.
        Safe to call more than once; a producer built from a caller-supplied
        channel has no owned connection, so this is a no-op for it.
        """
        # Detach the handle first so a repeated call does not try to close
        # an already-closed connection.
        conn, self.conn = self.conn, None
        if conn is not None:
            conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment