@rohittiwari-dev
Last active July 11, 2025 10:43
RabbitMQ High-Performance EC2 Deployment Guide

🐰 RabbitMQ Complete Deployment Guide

🚀 Self-Hosted on AWS EC2 for Beginners & Professionals

📋 Table of Contents

  1. 🌟 Overview
  2. 📚 Prerequisites
  3. ☁️ EC2 Instance Setup
  4. 🔧 RabbitMQ Installation
  5. ⚡ Single Instance Setup
  6. 🔗 Cluster Setup
  7. 🛡️ Security & Management
  8. 💻 Code Implementation
  9. 📊 Monitoring & Troubleshooting
  10. 💰 Cost Optimization

🌟 Overview

🎯 What is RabbitMQ? Think of RabbitMQ as a super-smart postal service for your applications! It helps different parts of your software talk to each other by sending messages back and forth.

🚀 Key Benefits

  • ⚡ Fast: Sub-millisecond message delivery is achievable on a well-tuned broker
  • 🔢 High Volume: Can handle 100,000+ messages per second on suitably sized hardware
  • 💰 Cost Effective: Can save 60-80% vs managed services, depending on workload
  • 📈 Scalable: Grow from 1 to 100+ servers easily
  • 👥 Multi-Language: Works with Java, Node.js, Python, and more!

🎪 What You'll Learn

By the end of this guide, you'll be able to:

  • ✅ Set up RabbitMQ on AWS EC2
  • ✅ Create both single and cluster deployments
  • ✅ Write code in Java, Node.js/TypeScript, and Python
  • ✅ Monitor and troubleshoot your setup
  • ✅ Optimize for performance and cost

📚 Prerequisites

👶 For Beginners

Don't worry! You just need to know:

  • 🖱️ How to use a computer and web browser
  • 📝 Very basic typing in terminal/command prompt
  • 🌐 What a website is (we'll teach the rest!)

🧑‍💻 For Professionals

  • 🔧 AWS CLI configured
  • 🔑 SSH key pair setup
  • 💻 Basic Linux commands
  • 📦 Package managers (npm, pip, maven)

🛠️ Tools You'll Need

  • 🖥️ Computer with internet connection
  • 🆔 AWS Account (free tier is fine!)
  • 📝 Text editor (VS Code recommended)
  • ☕ Your favorite beverage for motivation!

☁️ EC2 Instance Setup

🎯 Step 1: Choose Your Instance Size

Think of EC2 instances like different sizes of computers you can rent from Amazon!

🏠 Instance Size Guide

| 🏷️ Instance Type | 🧠 CPU | 💾 RAM | 📊 Best For | 💰 Cost/Month |
|---|---|---|---|---|
| t3.medium | 2 vCPUs | 4 GB | 👶 Learning/Testing | ~$30 |
| c5n.large | 2 vCPUs | 5.25 GB | 🚀 Small Production | ~$80 |
| c5n.xlarge | 4 vCPUs | 10.5 GB | 🏢 Medium Business | ~$160 |
| c5n.2xlarge | 8 vCPUs | 21 GB | 🏭 High Traffic | ~$320 |
| c5n.4xlarge | 16 vCPUs | 42 GB | 🌐 Enterprise | ~$640 |

💡 Beginner Tip

Start with t3.medium for learning, then upgrade to c5n.large for production!

🔐 Step 2: Create Security Group

A security group is like a firewall that controls who can talk to your server.

🌐 Using AWS Web Console (Easy Way)

  1. 🌐 Go to AWS EC2 Console
  2. 🔍 Click "Security Groups" in the left menu
  3. ➕ Click "Create Security Group"
  4. 📝 Fill in these details:
     Security Group Name: rabbitmq-sg
     Description: RabbitMQ Security Group
  5. ➕ Add these rules by clicking "Add Rule" (add the two rabbitmq-sg rules after the group has been created):

| 🚪 Type | 🔌 Port | 📍 Source | 📝 Description |
|---|---|---|---|
| SSH | 22 | My IP | For remote access |
| Custom TCP | 5672 | Anywhere (0.0.0.0/0) | RabbitMQ AMQP (narrow to your app's IP range in production) |
| Custom TCP | 15672 | My IP | RabbitMQ Management UI |
| Custom TCP | 4369 | rabbitmq-sg (this security group) | RabbitMQ node discovery (epmd), needed for clustering |
| Custom TCP | 25672 | rabbitmq-sg (this security group) | RabbitMQ Cluster |
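
A rule set that opens every RabbitMQ port to 0.0.0.0/0 exposes the dashboard and the cluster link to the whole internet. The sketch below is a hypothetical audit helper, not an AWS API: the `Rule` class, the `SENSITIVE_PORTS` set, and the sample addresses are assumptions made for illustration. It flags rules that expose a sensitive port to everyone.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rule:
    port: int
    source: str       # CIDR block or security group name
    description: str


# Ports that should not be reachable from the whole internet:
# SSH, epmd (node discovery), the management UI, and inter-node traffic.
SENSITIVE_PORTS = {22, 4369, 15672, 25672}


def world_open_sensitive(rules):
    """Return the rules that expose a sensitive port to 0.0.0.0/0."""
    return [r for r in rules if r.port in SENSITIVE_PORTS and r.source == "0.0.0.0/0"]


# Sample rule set with every RabbitMQ port open to the world (illustrative values)
rules = [
    Rule(22, "203.0.113.10/32", "SSH"),
    Rule(5672, "0.0.0.0/0", "RabbitMQ AMQP"),
    Rule(15672, "0.0.0.0/0", "RabbitMQ Management UI"),
    Rule(25672, "0.0.0.0/0", "RabbitMQ Cluster"),
]

for rule in world_open_sensitive(rules):
    print(f"Port {rule.port} ({rule.description}) is open to the world")
```

Port 5672 is deliberately left out of the sensitive set here because client applications may need to reach it; in production it is still worth restricting to known address ranges.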

💻 Using AWS CLI (Advanced Way)

# Create security group
aws ec2 create-security-group \
  --group-name rabbitmq-sg \
  --description "RabbitMQ Security Group"

# Add SSH access (replace YOUR_IP with your actual IP)
aws ec2 authorize-security-group-ingress \
  --group-name rabbitmq-sg \
  --protocol tcp \
  --port 22 \
  --cidr YOUR_IP/32

# Add RabbitMQ AMQP port
aws ec2 authorize-security-group-ingress \
  --group-name rabbitmq-sg \
  --protocol tcp \
  --port 5672 \
  --cidr 0.0.0.0/0

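# Add Management UI port (your IP only; the dashboard should not be public)
aws ec2 authorize-security-group-ingress \
  --group-name rabbitmq-sg \
  --protocol tcp \
  --port 15672 \
  --cidr YOUR_IP/32

# Add cluster ports (epmd 4369 and inter-node 25672), reachable only from
# other instances in the same security group
aws ec2 authorize-security-group-ingress \
  --group-name rabbitmq-sg \
  --protocol tcp \
  --port 4369 \
  --source-group rabbitmq-sg

aws ec2 authorize-security-group-ingress \
  --group-name rabbitmq-sg \
  --protocol tcp \
  --port 25672 \
  --source-group rabbitmq-sg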

🚀 Step 3: Launch Your Instance

🌐 Using AWS Web Console (Recommended for Beginners)

  1. 🌐 Go to AWS EC2 Console
  2. 🚀 Click "Launch Instance"
  3. 🖼️ Choose "Amazon Linux 2 AMI" (it's free!)
  4. 💻 Select your instance type (t3.medium for beginners)
  5. 🔐 Choose your security group (rabbitmq-sg)
  6. 🔑 Create or select a key pair
  7. 💾 Configure storage (20 GB is fine to start)
  8. 🚀 Click "Launch Instance"

💻 Using AWS CLI (Advanced)

# Launch instance (ami-0abcdef1234567890 is a placeholder; use the current
# Amazon Linux 2 AMI ID for your region)
aws ec2 run-instances \
  --image-id ami-0abcdef1234567890 \
  --count 1 \
  --instance-type t3.medium \
  --key-name your-key-pair \
  --security-groups rabbitmq-sg \
  --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=RabbitMQ-Server}]'

📱 Step 4: Connect to Your Instance

Once your instance is running (this takes 2-3 minutes), you can connect to it:

🪟 For Windows Users

  1. 📥 Download PuTTY
  2. 🔑 Convert your .pem key to .ppk using PuTTYgen
  3. 🌐 Connect using your instance's public IP

🍎 For Mac/Linux Users

# Make your key file secure
chmod 400 your-key.pem

# Connect to your instance
ssh -i your-key.pem ec2-user@YOUR_INSTANCE_IP

🔧 RabbitMQ Installation

Great! 🎉 Now you're connected to your server. Let's install RabbitMQ!

🛠️ Step 1: Update Your System

# Update all packages (this is like updating your phone!)
sudo yum update -y

# Install useful tools
sudo yum install -y wget curl vim htop net-tools

🐰 Step 2: Install Erlang

RabbitMQ is built on Erlang (think of it as RabbitMQ's engine):

# Add Erlang Solutions repository
wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm
sudo rpm -Uvh erlang-solutions-2.0-1.noarch.rpm

# Install Erlang
sudo yum install -y erlang

# Verify installation
erl -version

🚀 Step 3: Install RabbitMQ

# Add RabbitMQ repository
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash

# Install RabbitMQ
sudo yum install -y rabbitmq-server

# Start RabbitMQ service
sudo systemctl start rabbitmq-server

# Make it start automatically when server restarts
sudo systemctl enable rabbitmq-server

# Check if it's running
sudo systemctl status rabbitmq-server

🎛️ Step 4: Enable Management UI

The Management UI is like a dashboard for your RabbitMQ:

# Enable the management plugin
sudo rabbitmq-plugins enable rabbitmq_management

# Create an admin user (mypassword123 is a placeholder; choose a strong password)
sudo rabbitmqctl add_user admin mypassword123

# Make the user an administrator
sudo rabbitmqctl set_user_tags admin administrator

# Give the user permissions
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# Remove the default guest user (for security)
sudo rabbitmqctl delete_user guest

# Restart RabbitMQ to apply changes
sudo systemctl restart rabbitmq-server

🌐 Step 5: Access Management UI

  1. 🌐 Open your web browser
  2. 📍 Go to: http://YOUR_INSTANCE_IP:15672
  3. 🔑 Login with:
    • Username: admin
    • Password: mypassword123

You should see a beautiful dashboard! 🎉
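
The management plugin also serves an HTTP API on the same port, which is handy for checking the broker from a script instead of the browser. The sketch below only builds an authenticated request for the `/api/overview` endpoint using the standard library; the helper name `overview_request` is an assumption, and the credentials are the placeholder values from this guide.

```python
import base64
import urllib.request


def overview_request(host, username, password, port=15672):
    """Build a GET request for the management API's /api/overview endpoint."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    request = urllib.request.Request(f"http://{host}:{port}/api/overview")
    request.add_header("Authorization", f"Basic {token}")
    return request


request = overview_request("YOUR_INSTANCE_IP", "admin", "mypassword123")
print(request.full_url)

# To actually send it (requires a reachable broker):
# with urllib.request.urlopen(request, timeout=5) as response:
#     print(response.status)
```

Basic authentication over plain HTTP sends the password in an easily decoded form, so this is best kept to trusted networks until TLS is enabled.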


⚡ Single Instance Setup

Perfect for most applications and beginners! This setup can handle thousands of messages per second.

🔧 Basic Configuration

Let's optimize your single RabbitMQ instance:

# Create RabbitMQ configuration file
sudo tee /etc/rabbitmq/rabbitmq.conf << 'EOF'
# 🌐 Network Configuration
listeners.tcp.default = 5672
management.tcp.port = 15672
management.tcp.ip = 0.0.0.0

# ⚡ Performance Settings
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.8
disk_free_limit.relative = 2.0

# 📝 Logging
log.console = true
log.console.level = info
log.file = /var/log/rabbitmq/rabbitmq.log
log.file.level = info

# 💪 Performance Tuning
collect_statistics_interval = 10000
delegate_count = 16
EOF

# Restart to apply changes
sudo systemctl restart rabbitmq-server
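
The relative settings above are fractions or multiples of the machine's RAM, so the absolute thresholds depend on the instance size. The sketch below works the arithmetic for a 4 GB t3.medium; the function name `thresholds` and the dictionary keys are illustrative only, and the numbers are a rough guide since the broker measures the RAM it actually detects.

```python
GIB = 1024 ** 3


def thresholds(total_ram_bytes, watermark=0.6, paging_ratio=0.8, disk_relative=2.0):
    """Turn the relative rabbitmq.conf settings into absolute byte values."""
    high = total_ram_bytes * watermark
    return {
        # Publishers are blocked once the node uses more memory than this
        "memory_high_watermark": high,
        # Paging messages to disk starts at this fraction of the watermark
        "paging_starts": high * paging_ratio,
        # The disk alarm fires when free disk space drops below this
        "disk_free_limit": total_ram_bytes * disk_relative,
    }


for name, value in thresholds(4 * GIB).items():
    print(f"{name}: {value / GIB:.2f} GiB")
```

On a 4 GB instance this gives a 2.40 GiB memory watermark, paging from 1.92 GiB, and an 8 GiB free-disk requirement, which is a sizeable share of a 20 GB volume.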

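🚀 Performance Optimization

Make your single instance super fast:

# System optimizations
sudo tee -a /etc/security/limits.conf << 'EOF'
rabbitmq soft nofile 65536
rabbitmq hard nofile 65536
EOF

# Services started by systemd (as on Amazon Linux 2) do not read limits.conf,
# so also set the file descriptor limit with a systemd drop-in
sudo mkdir -p /etc/systemd/system/rabbitmq-server.service.d
sudo tee /etc/systemd/system/rabbitmq-server.service.d/limits.conf << 'EOF'
[Service]
LimitNOFILE=65536
EOF
sudo systemctl daemon-reload
sudo systemctl restart rabbitmq-server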

# Network optimizations
sudo tee -a /etc/sysctl.conf << 'EOF'
# Better network performance
net.core.rmem_default = 262144
net.core.rmem_max = 16777216
net.core.wmem_default = 262144
net.core.wmem_max = 16777216
vm.swappiness = 10
EOF

# Apply changes
sudo sysctl -p

✅ Test Your Setup

# Check if RabbitMQ is working
sudo rabbitmqctl status

# List existing queues (an empty list is normal on a fresh install)
sudo rabbitmqctl list_queues

# Check memory usage
sudo rabbitmqctl status | grep -i memory
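
The commands above check the broker from inside the instance. To confirm that the ports are also reachable over the network, a small TCP probe helps; the sketch below uses only the standard library, and the helper name `port_open` is an assumption made for illustration.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures
        return False


# 127.0.0.1 checks the broker from the instance itself; use the instance's
# public IP instead to test the security group rules from your own machine.
for port in (5672, 15672):
    state = "open" if port_open("127.0.0.1", port, timeout=1.0) else "unreachable"
    print(f"port {port}: {state}")
```

An open port only shows that something is listening; `rabbitmqctl status` remains the check that the broker itself is healthy.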

🔗 Cluster Setup

Want to handle millions of messages? Let's create a RabbitMQ cluster! 💪

🎯 Why Use a Cluster?

  • 🚀 Higher Performance: Distribute load across multiple servers
  • 🛡️ High Availability: If one server dies, others keep working
  • 📈 Scalability: Add more servers as you grow

🏗️ Architecture Overview

          🌐 Load Balancer
                 |
     ┌───────────┼───────────┐
     │           │           │
 🐰 Node1    🐰 Node2    🐰 Node3

🚀 Step 1: Launch Multiple Instances

Follow the same steps as single instance setup, but create 3 instances:

  • rabbitmq-node1
  • rabbitmq-node2
  • rabbitmq-node3

🔗 Step 2: Configure Cluster

On All Nodes

# Set the same Erlang cookie (this is like a shared password)
sudo systemctl stop rabbitmq-server
sudo tee /var/lib/rabbitmq/.erlang.cookie << 'EOF'
MYSECRETCOOKIESTRING12345
EOF
sudo chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
sudo chmod 600 /var/lib/rabbitmq/.erlang.cookie
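
Because the cookie works as a shared secret, a random value is safer than a memorable string like the one above. The sketch below generates one with Python's `secrets` module; the function name, the 32-character length, and the uppercase-plus-digits alphabet are assumptions chosen for illustration, not RabbitMQ requirements.

```python
import secrets
import string


def generate_erlang_cookie(length=32):
    """Return a random alphanumeric string suitable as a shared cookie value."""
    alphabet = string.ascii_uppercase + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


cookie = generate_erlang_cookie()
print(cookie)
```

Generate the value once and copy the same string to every node; each node generating its own cookie would prevent them from joining each other.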

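On Node 1 (Seed Node)

# Start RabbitMQ
sudo systemctl start rabbitmq-server

# The other nodes will join this one (cluster nodes are peers; there is no master)
sudo rabbitmqctl cluster_status

On Node 2 & 3

# Start RabbitMQ
sudo systemctl start rabbitmq-server

# Join the cluster. Node names use hostnames, not IP addresses: replace
# NODE1_HOSTNAME with node 1's short hostname (run `hostname -s` on node 1)
# and make sure it resolves from this node (DNS or /etc/hosts)
sudo rabbitmqctl stop_app
sudo rabbitmqctl join_cluster rabbit@NODE1_HOSTNAME
sudo rabbitmqctl start_app

# Check cluster status
sudo rabbitmqctl cluster_status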

🔧 Cluster Configuration

Create optimized configuration for cluster:

# On all nodes (node names use each node's short hostname, see `hostname -s`)
sudo tee /etc/rabbitmq/rabbitmq.conf << 'EOF'
# 🔗 Cluster Configuration
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config

# 🌐 Network settings
listeners.tcp.default = 5672
management.tcp.port = 15672

# ⚡ Performance for cluster
vm_memory_high_watermark.relative = 0.6
channel_max = 2047
heartbeat = 60

# 🔗 Cluster communication
cluster_formation.classic_config.nodes.1 = rabbit@NODE1_HOSTNAME
cluster_formation.classic_config.nodes.2 = rabbit@NODE2_HOSTNAME
cluster_formation.classic_config.nodes.3 = rabbit@NODE3_HOSTNAME
EOF
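
The node list has to be identical on every node, so generating it from one list of hostnames avoids copy-and-paste drift. The sketch below is a hypothetical helper (the function name and the sample hostnames are assumptions) that prints the peer discovery lines in the format used above.

```python
def classic_config_lines(hostnames, node_prefix="rabbit"):
    """Build the classic peer discovery lines for rabbitmq.conf."""
    lines = [
        "cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config"
    ]
    # Node entries are numbered from 1, one per cluster member
    for index, hostname in enumerate(hostnames, start=1):
        lines.append(
            f"cluster_formation.classic_config.nodes.{index} = {node_prefix}@{hostname}"
        )
    return lines


print("\n".join(classic_config_lines(["rabbitmq-node1", "rabbitmq-node2", "rabbitmq-node3"])))
```

The hostnames passed in should be the names each node actually reports, since a node is addressed as `rabbit@` followed by its hostname.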

⚖️ Step 3: Setup Load Balancer

Using HAProxy (Recommended)

Install on a separate instance:

# Install HAProxy
sudo yum install -y haproxy

# Configure HAProxy
sudo tee /etc/haproxy/haproxy.cfg << 'EOF'
global
    daemon

defaults
    mode http
    timeout connect 5000ms
    timeout client 50000ms
    timeout server 50000ms

# 🌐 RabbitMQ Management UI
frontend rabbitmq_management
    bind *:15672
    default_backend rabbitmq_management_backend

backend rabbitmq_management_backend
    balance roundrobin
    server rabbit1 NODE1_IP:15672 check
    server rabbit2 NODE2_IP:15672 check
    server rabbit3 NODE3_IP:15672 check

# 🐰 RabbitMQ AMQP
frontend rabbitmq_amqp
    bind *:5672
    mode tcp
    default_backend rabbitmq_amqp_backend

backend rabbitmq_amqp_backend
    mode tcp
    balance roundrobin
    server rabbit1 NODE1_IP:5672 check
    server rabbit2 NODE2_IP:5672 check
    server rabbit3 NODE3_IP:5672 check
EOF

# Start HAProxy
sudo systemctl start haproxy
sudo systemctl enable haproxy

🛡️ Security & Management

🔐 SSL/TLS Setup

For production, always use encryption:

# Create SSL directory
sudo mkdir -p /etc/rabbitmq/ssl

# Generate self-signed certificate (for testing)
sudo openssl req -new -x509 -days 365 -nodes \
  -out /etc/rabbitmq/ssl/server_certificate.pem \
  -keyout /etc/rabbitmq/ssl/server_key.pem \
  -subj "/C=US/ST=CA/L=SF/O=MyCompany/CN=rabbitmq.mycompany.com"

# Set permissions
sudo chown -R rabbitmq:rabbitmq /etc/rabbitmq/ssl
sudo chmod 600 /etc/rabbitmq/ssl/server_key.pem

# Update configuration to use SSL
sudo tee -a /etc/rabbitmq/rabbitmq.conf << 'EOF'
# 🔐 SSL Configuration
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ssl/server_certificate.pem
ssl_options.certfile = /etc/rabbitmq/ssl/server_certificate.pem
ssl_options.keyfile = /etc/rabbitmq/ssl/server_key.pem
ssl_options.verify = verify_none
ssl_options.fail_if_no_peer_cert = false

# 🌐 Management UI SSL
management.ssl.port = 15671
management.ssl.cacertfile = /etc/rabbitmq/ssl/server_certificate.pem
management.ssl.certfile = /etc/rabbitmq/ssl/server_certificate.pem
management.ssl.keyfile = /etc/rabbitmq/ssl/server_key.pem
EOF

# Restart RabbitMQ
sudo systemctl restart rabbitmq-server

👥 User Management

Create different users for different purposes:

# Create application user
sudo rabbitmqctl add_user app_user secure_password123
sudo rabbitmqctl set_permissions -p / app_user ".*" ".*" ".*"

# Create read-only user for monitoring
sudo rabbitmqctl add_user monitor_user monitor_pass123
sudo rabbitmqctl set_user_tags monitor_user monitoring
sudo rabbitmqctl set_permissions -p / monitor_user "" "" ".*"

# List all users
sudo rabbitmqctl list_users
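
The three quoted arguments to `set_permissions` are regular expressions for the configure, write, and read operations, matched against resource names such as queue and exchange names. The sketch below imitates that check in Python as a mental model only: it assumes unanchored matching and that an empty pattern behaves like `^$` (matching only the empty name), and the function name `allowed` is hypothetical.

```python
import re


def allowed(pattern, resource_name):
    """Approximate a permission check: does the pattern match the resource name?"""
    if pattern == "":
        pattern = "^$"  # assumption: an empty pattern matches only the empty name
    return re.search(pattern, resource_name) is not None


# Permissions given to monitor_user above: configure "", write "", read ".*"
monitor_permissions = {"configure": "", "write": "", "read": ".*"}

result = {op: allowed(p, "hello-queue") for op, p in monitor_permissions.items()}
print(result)
```

Under these assumptions the monitoring user can read from `hello-queue` but cannot declare it or publish to it, while a pattern such as `^app\.` would limit an application user to resources whose names start with `app.`.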

💻 Code Implementation

Now for the fun part! Let's write code to use RabbitMQ in different languages.

☕ Java Examples

🛠️ Setup

First, add RabbitMQ dependency to your project:

Maven (pom.xml)

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.7</version>
    </dependency>
</dependencies>

Gradle (build.gradle)

dependencies {
    implementation 'com.rabbitmq:amqp-client:5.18.0'
    implementation 'org.slf4j:slf4j-simple:2.0.7'
}

🔌 Connection Manager

// RabbitMQConnection.java
package com.example.rabbitmq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQConnection {
    private Connection connection;
    private Channel channel;
    private ConnectionFactory factory;

    public RabbitMQConnection(String host, int port, String username, String password) {
        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);

        // 🚀 Performance optimizations
        factory.setRequestedHeartbeat(60);
        factory.setConnectionTimeout(10000);
        factory.setRequestedChannelMax(2047);
        factory.setRequestedFrameMax(131072);
    }

    public void connect() throws IOException, TimeoutException {
        System.out.println("🔌 Connecting to RabbitMQ...");
        connection = factory.newConnection();
        channel = connection.createChannel();

        // 🚀 Optimize channel
        channel.basicQos(1000); // Prefetch count

        System.out.println("✅ Connected successfully!");
    }

    public void disconnect() throws IOException, TimeoutException {
        if (channel != null && channel.isOpen()) {
            channel.close();
        }
        if (connection != null && connection.isOpen()) {
            connection.close();
        }
        System.out.println("👋 Disconnected from RabbitMQ");
    }

    public Channel getChannel() {
        return channel;
    }
}

📤 Publisher (Sender)

// MessagePublisher.java
package com.example.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MessagePublisher {
    private RabbitMQConnection connection;
    private Channel channel;

    public MessagePublisher(RabbitMQConnection connection) {
        this.connection = connection;
        this.channel = connection.getChannel();
    }

    public void publishToQueue(String queueName, String message) throws IOException {
        // 🏗️ Create queue if it doesn't exist
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", 10);
        // Lazy mode keeps messages on disk to limit RAM use on long queues;
        // RabbitMQ 3.12 and later ignore this argument
        args.put("x-queue-mode", "lazy");

        channel.queueDeclare(queueName, true, false, false, args);

        // 📤 Publish message
        channel.basicPublish("", queueName,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes(StandardCharsets.UTF_8));

        System.out.println("📤 Sent: " + message);
    }

    public void publishToExchange(String exchangeName, String routingKey, String message)
            throws IOException {
        // 🏗️ Create exchange if it doesn't exist
        channel.exchangeDeclare(exchangeName, "topic", true);

        // 📤 Publish message
        channel.basicPublish(exchangeName, routingKey,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes(StandardCharsets.UTF_8));

        System.out.println("📤 Sent to exchange '" + exchangeName + "': " + message);
    }
}
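
`publishToExchange` declares a topic exchange, where queues receive a message only if their binding key matches the routing key. As an illustration of those matching rules (written in Python and not taken from any client library), the sketch below treats keys as dot-separated words, lets `*` stand for exactly one word and `#` for zero or more words; the function name `topic_matches` and the sample keys are assumptions.

```python
def topic_matches(binding_key, routing_key):
    """Return True if a topic binding key matches a routing key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        if p == len(pattern):
            # Pattern exhausted: succeed only if every word was consumed
            return w == len(words)
        if pattern[p] == "#":
            # '#' may match zero words (move on) or absorb one more word (stay)
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w < len(words) and pattern[p] in ("*", words[w]):
            # '*' or an identical word consumes exactly one word
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


for binding in ("orders.*.created", "orders.*", "orders.#"):
    print(binding, topic_matches(binding, "orders.eu.created"))
```

A queue bound with `orders.#` therefore sees every message whose routing key starts with `orders`, while `orders.*` only sees keys with exactly two words.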

📥 Consumer (Receiver)

// MessageConsumer.java
package com.example.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MessageConsumer {
    private RabbitMQConnection connection;
    private Channel channel;

    public MessageConsumer(RabbitMQConnection connection) {
        this.connection = connection;
        this.channel = connection.getChannel();
    }

    public void consumeFromQueue(String queueName, MessageHandler handler) throws IOException {
        // πŸ—οΈ Create queue if it doesn't exist
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", 10);
        args.put("x-queue-mode", "lazy");

        channel.queueDeclare(queueName, true, false, false, args);

        System.out.println("πŸ‘‚ Waiting for messages from queue: " + queueName);

        // πŸ“₯ Set up consumer
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            try {
                System.out.println("πŸ“₯ Received: " + message);

                // πŸ”„ Process message
                handler.handle(message);

                // βœ… Acknowledge message
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

            } catch (Exception e) {
                System.err.println("❌ Error processing message: " + e.getMessage());
                // πŸ”„ Reject and requeue
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true);
            }
        };

        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
    }

    @FunctionalInterface
    public interface MessageHandler {
        void handle(String message) throws Exception;
    }
}

πŸƒβ€β™‚οΈ Example Usage

// Main.java
package com.example.rabbitmq;

public class Main {
    public static void main(String[] args) {
        try {
            // 🔌 Connect to RabbitMQ
            RabbitMQConnection connection = new RabbitMQConnection(
                "YOUR_INSTANCE_IP", 5672, "admin", "mypassword123"
            );
            connection.connect();

            // 📤 Publisher example
            MessagePublisher publisher = new MessagePublisher(connection);
            publisher.publishToQueue("hello-queue", "Hello from Java! 🎉");

            // 📥 Consumer example
            MessageConsumer consumer = new MessageConsumer(connection);
            consumer.consumeFromQueue("hello-queue", message -> {
                // 🔄 Your business logic here
                System.out.println("🔄 Processing: " + message);
                Thread.sleep(1000); // Simulate work
                System.out.println("✅ Processed: " + message);
            });

            // 🛑 Keep program running
            System.out.println("Press CTRL+C to exit...");
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    connection.disconnect();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }));

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

🚀 High-Performance Batch Publisher

// BatchPublisher.java
package com.example.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BatchPublisher {
    // One buffered message and the queue it is destined for
    private static final class Pending {
        final String queueName;
        final String message;

        Pending(String queueName, String message) {
            this.queueName = queueName;
            this.message = message;
        }
    }

    private RabbitMQConnection connection;
    private Channel channel;
    private List<Pending> messageQueue;
    private ScheduledExecutorService scheduler;
    private int batchSize = 100;
    private int flushInterval = 10; // milliseconds

    public BatchPublisher(RabbitMQConnection connection) {
        this.connection = connection;
        this.channel = connection.getChannel();
        this.messageQueue = new ArrayList<>();
        this.scheduler = Executors.newScheduledThreadPool(1);

        // 🕒 Schedule periodic flush
        scheduler.scheduleAtFixedRate(this::flushBatch, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
    }

    public synchronized void publishBatch(String queueName, String message) throws IOException {
        // Keep queue name and body as separate fields so a ':' in a queue name cannot break parsing
        messageQueue.add(new Pending(queueName, message));

        if (messageQueue.size() >= batchSize) {
            flushBatch();
        }
    }

    private synchronized void flushBatch() {
        if (messageQueue.isEmpty()) return;

        List<Pending> batch = new ArrayList<>(messageQueue);
        messageQueue.clear();

        try {
            for (Pending item : batch) {
                channel.basicPublish("", item.queueName,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    item.message.getBytes(StandardCharsets.UTF_8));
            }

            System.out.println("🚀 Published batch of " + batch.size() + " messages");

        } catch (Exception e) {
            // Catch everything: an uncaught exception would cancel the scheduled flush task.
            // Messages in this batch that were not yet published are dropped.
            System.err.println("❌ Error flushing batch: " + e.getMessage());
        }
    }

    public void shutdown() {
        flushBatch();
        scheduler.shutdown();
    }
}
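
The core of the batch publisher is the size-triggered flush: messages accumulate in a buffer and are handed over together once the buffer is full. The sketch below isolates that idea in Python with no broker involved; the `BatchBuffer` class and its `sink` callback are hypothetical names, and the timer-based flush from the Java class is left out to keep the example deterministic.

```python
class BatchBuffer:
    """Collect (queue_name, message) pairs and hand them to a sink in batches."""

    def __init__(self, sink, batch_size=100):
        self._sink = sink              # callable that receives a list of pairs
        self._batch_size = batch_size
        self._pending = []

    def add(self, queue_name, message):
        # Store the pair as a tuple so no delimiter parsing is needed later
        self._pending.append((queue_name, message))
        if len(self._pending) >= self._batch_size:
            self.flush()

    def flush(self):
        if not self._pending:
            return
        # Swap the buffer out before calling the sink so new messages start a fresh batch
        batch, self._pending = self._pending, []
        self._sink(batch)


sent = []
buffer = BatchBuffer(sent.append, batch_size=3)
for i in range(7):
    buffer.add("demo:queue", f"message {i}")
buffer.flush()  # push out the final partial batch
print([len(batch) for batch in sent])
```

Seven messages with a batch size of three produce batches of 3, 3, and 1; the explicit final `flush()` plays the role of `shutdown()` in the Java class.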

🟢 Node.js/TypeScript Examples

🛠️ Setup

First, let's create a new Node.js project:

# Create new project directory
mkdir rabbitmq-nodejs
cd rabbitmq-nodejs

# Initialize npm project
npm init -y

# Install dependencies
npm install amqplib
npm install -D @types/amqplib typescript ts-node @types/node

# Create TypeScript config
npx tsc --init

📁 Project Structure

rabbitmq-nodejs/
├── package.json
├── tsconfig.json
├── src/
│   ├── connection.ts
│   ├── publisher.ts
│   ├── consumer.ts
│   └── examples/
│       ├── simple-example.ts
│       └── advanced-example.ts

🔌 Connection Manager

// src/connection.ts
import amqp, { Channel } from "amqplib";

export interface RabbitMQConfig {
	host: string;
	port: number;
	username: string;
	password: string;
	vhost?: string;
	ssl?: boolean;
	heartbeat?: number;
	connectionTimeout?: number;
}

export class RabbitMQConnection {
	// Derive the type from connect() itself, because some @types/amqplib
	// releases type its result as ChannelModel rather than Connection
	private connection: Awaited<ReturnType<typeof amqp.connect>> | null = null;
	private channel: Channel | null = null;
	private config: RabbitMQConfig;

	constructor(config: RabbitMQConfig) {
		this.config = {
			vhost: "/",
			ssl: false,
			heartbeat: 60,
			connectionTimeout: 10000,
			...config,
		};
	}

	async connect(): Promise<void> {
		try {
			console.log("🔌 Connecting to RabbitMQ...");

			// Pass settings as an object so credentials and vhost need no URL escaping.
			// heartbeat is a connection setting; timeout is a socket option.
			this.connection = await amqp.connect(
				{
					protocol: this.config.ssl ? "amqps" : "amqp",
					hostname: this.config.host,
					port: this.config.port,
					username: this.config.username,
					password: this.config.password,
					vhost: this.config.vhost,
					heartbeat: this.config.heartbeat,
				},
				{ timeout: this.config.connectionTimeout },
			);

			// 🔔 Event listeners
			this.connection.on("error", (err) => {
				console.error("❌ RabbitMQ connection error:", err);
			});

			this.connection.on("close", () => {
				console.log("👋 RabbitMQ connection closed");
			});

			this.channel = await this.connection.createChannel();

			// 🚀 Optimize channel settings
			await this.channel.prefetch(1000);

			console.log("✅ Successfully connected to RabbitMQ");
		} catch (error) {
			console.error("❌ Failed to connect to RabbitMQ:", error);
			throw error;
		}
	}

	async disconnect(): Promise<void> {
		try {
			if (this.channel) {
				await this.channel.close();
			}
			if (this.connection) {
				await this.connection.close();
			}
			console.log("👋 Disconnected from RabbitMQ");
		} catch (error) {
			console.error("❌ Error disconnecting from RabbitMQ:", error);
		}
	}

	getChannel(): Channel {
		if (!this.channel) {
			throw new Error("❌ Channel not available. Call connect() first.");
		}
		return this.channel;
	}
}
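
When connection settings are written as a single AMQP URI instead of separate fields, the username, password, and virtual host have to be percent-encoded, otherwise characters such as `@`, `:` or `/` change how the URI is parsed. The sketch below shows the encoding in Python; the function name `amqp_uri` and the sample password are assumptions made for illustration.

```python
from urllib.parse import quote


def amqp_uri(host, port, username, password, vhost="/", ssl=False, heartbeat=60):
    """Build an AMQP URI with percent-encoded credentials and virtual host."""
    scheme = "amqps" if ssl else "amqp"
    user = quote(username, safe="")
    secret = quote(password, safe="")
    # The default vhost "/" becomes "%2F" so it is not mistaken for a path separator
    path = quote(vhost, safe="")
    return f"{scheme}://{user}:{secret}@{host}:{port}/{path}?heartbeat={heartbeat}"


print(amqp_uri("YOUR_INSTANCE_IP", 5672, "admin", "p@ss:word/1"))
```

The heartbeat travels as a query parameter here, which keeps all connection-level settings in the one string.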

📤 Publisher

// src/publisher.ts
import { Channel } from "amqplib";
import { RabbitMQConnection } from "./connection";

export interface PublishOptions {
	persistent?: boolean;
	priority?: number;
	expiration?: string;
	messageId?: string;
	timestamp?: number;
	headers?: Record<string, any>;
}

export class MessagePublisher {
	private connection: RabbitMQConnection;
	private channel: Channel;

	constructor(connection: RabbitMQConnection) {
		this.connection = connection;
		this.channel = connection.getChannel();
	}

	async publishToQueue(
		queueName: string,
		message: any,
		options: PublishOptions = {},
	): Promise<boolean> {
		try {
			// πŸ—οΈ Ensure queue exists
			await this.channel.assertQueue(queueName, {
				durable: true,
				autoDelete: false,
				arguments: {
					"x-max-priority": 10,
					"x-queue-mode": "lazy", // πŸš€ For high throughput
				},
			});

			const messageBuffer = Buffer.from(JSON.stringify(message));

			const publishOptions = {
				persistent: true,
				deliveryMode: 2,
				timestamp: Date.now(),
				messageId: this.generateMessageId(),
				...options,
			};

			const result = this.channel.sendToQueue(
				queueName,
				messageBuffer,
				publishOptions,
			);

			if (result) {
				console.log("📤 Message sent to queue:", queueName);
			}

			return result;
		} catch (error) {
			console.error("❌ Failed to publish message:", error);
			throw error;
		}
	}

	async publishToExchange(
		exchangeName: string,
		routingKey: string,
		message: any,
		options: PublishOptions = {},
	): Promise<boolean> {
		try {
			// πŸ—οΈ Ensure exchange exists
			await this.channel.assertExchange(exchangeName, "topic", {
				durable: true,
				autoDelete: false,
			});

			const messageBuffer = Buffer.from(JSON.stringify(message));

			const publishOptions = {
				persistent: true,
				deliveryMode: 2,
				timestamp: Date.now(),
				messageId: this.generateMessageId(),
				...options,
			};

			const result = this.channel.publish(
				exchangeName,
				routingKey,
				messageBuffer,
				publishOptions,
			);

			if (result) {
				console.log(
					`📤 Message sent to exchange '${exchangeName}' with routing key '${routingKey}'`,
				);
			}

			return result;
		} catch (error) {
			console.error("❌ Failed to publish message to exchange:", error);
			throw error;
		}
	}

	private generateMessageId(): string {
		// slice() replaces the deprecated substr(); same 9-character random suffix
		return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
	}
}

📥 Consumer

// src/consumer.ts
import { Channel, Message } from "amqplib";
import { RabbitMQConnection } from "./connection";

export interface ConsumeOptions {
	noAck?: boolean;
	exclusive?: boolean;
	priority?: number;
	consumerTag?: string;
}

export type MessageHandler = (
	message: any,
	originalMessage: Message,
) => Promise<void>;

export class MessageConsumer {
	private connection: RabbitMQConnection;
	private channel: Channel;

	constructor(connection: RabbitMQConnection) {
		this.connection = connection;
		this.channel = connection.getChannel();
	}

	async consumeFromQueue(
		queueName: string,
		handler: MessageHandler,
		options: ConsumeOptions = {},
	): Promise<void> {
		try {
			// πŸ—οΈ Ensure queue exists
			await this.channel.assertQueue(queueName, {
				durable: true,
				autoDelete: false,
				arguments: {
					"x-max-priority": 10,
					"x-queue-mode": "lazy",
				},
			});

			const consumeOptions = {
				noAck: false,
				...options,
			};

			console.log(`👂 Waiting for messages from queue: ${queueName}`);

			await this.channel.consume(
				queueName,
				async (msg) => {
					if (msg) {
						try {
							const messageContent = JSON.parse(
								msg.content.toString(),
							);
							console.log("📥 Received message:", messageContent);

							// 🔄 Process the message
							await handler(messageContent, msg);

							if (!consumeOptions.noAck) {
								this.channel.ack(msg);
								console.log("✅ Message acknowledged");
							}
						} catch (error) {
							console.error(
								"❌ Error processing message:",
								error,
							);

							if (!consumeOptions.noAck) {
								// 🔄 Reject and requeue the message
								this.channel.reject(msg, true);
								console.log("🔄 Message rejected and requeued");
							}
						}
					}
				},
				consumeOptions,
			);
		} catch (error) {
			console.error("❌ Failed to start consuming:", error);
			throw error;
		}
	}
}

πŸƒβ€β™‚οΈ Simple Example

// src/examples/simple-example.ts
import { RabbitMQConnection, MessagePublisher, MessageConsumer } from "../";

async function simpleExample() {
	// 🔌 Configuration
	const config = {
		host: "YOUR_INSTANCE_IP",
		port: 5672,
		username: "admin",
		password: "mypassword123",
	};

	// 🔗 Create connection
	const connection = new RabbitMQConnection(config);
	await connection.connect();

	// 📤 Create publisher
	const publisher = new MessagePublisher(connection);

	// 📥 Create consumer
	const consumer = new MessageConsumer(connection);

	// 📤 Send a message
	await publisher.publishToQueue("hello-queue", {
		message: "Hello from Node.js! 🎉",
		timestamp: new Date(),
		from: "Node.js App",
	});

	// 📥 Consume messages
	await consumer.consumeFromQueue("hello-queue", async (message) => {
		// 🔄 Your business logic here
		console.log("🔄 Processing message:", message);

		// Simulate some work
		await new Promise((resolve) => setTimeout(resolve, 1000));

		console.log("✅ Message processed successfully!");
	});

	// 🛑 Graceful shutdown
	process.on("SIGINT", async () => {
		console.log("\n🛑 Shutting down gracefully...");
		await connection.disconnect();
		process.exit(0);
	});

	console.log("🚀 Simple example running! Press CTRL+C to exit...");
}

// Run the example
simpleExample().catch(console.error);

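🚀 High-Performance Example

// src/examples/advanced-example.ts
// Import from the individual modules; the project structure has no src/index.ts to re-export them
import { RabbitMQConnection } from "../connection";
import { MessagePublisher } from "../publisher";
import { MessageConsumer } from "../consumer";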

class HighPerformanceBatchPublisher {
	private publisher: MessagePublisher;
	private messageQueue: Array<{ queueName: string; message: any }> = [];
	private batchSize = 100;
	private batchTimeout = 10; // milliseconds
	private batchTimer: NodeJS.Timeout | null = null;

	constructor(publisher: MessagePublisher) {
		this.publisher = publisher;
	}

	async publishBatch(queueName: string, message: any): Promise<void> {
		this.messageQueue.push({ queueName, message });

		if (this.messageQueue.length >= this.batchSize) {
			await this.flushBatch();
		} else if (!this.batchTimer) {
			this.batchTimer = setTimeout(async () => {
				await this.flushBatch();
			}, this.batchTimeout);
		}
	}

	private async flushBatch(): Promise<void> {
		if (this.batchTimer) {
			clearTimeout(this.batchTimer);
			this.batchTimer = null;
		}

		if (this.messageQueue.length === 0) return;

		const batch = this.messageQueue.splice(0, this.batchSize);

		try {
			const publishPromises = batch.map(({ queueName, message }) =>
				this.publisher.publishToQueue(queueName, message),
			);

			await Promise.all(publishPromises);
			console.log(`πŸš€ Published batch of ${batch.length} messages`);
		} catch (error) {
			console.error("❌ Error publishing batch:", error);
		}
	}

	async shutdown(): Promise<void> {
		await this.flushBatch();
	}
}

async function highPerformanceExample() {
	const config = {
		host: "YOUR_INSTANCE_IP",
		port: 5672,
		username: "admin",
		password: "mypassword123",
	};

	const connection = new RabbitMQConnection(config);
	await connection.connect();

	const publisher = new MessagePublisher(connection);
	const batchPublisher = new HighPerformanceBatchPublisher(publisher);

	console.log("πŸš€ Starting high-performance publishing...");

	// Publish 10,000 messages rapidly
	const startTime = Date.now();

	for (let i = 0; i < 10000; i++) {
		await batchPublisher.publishBatch("high-throughput-queue", {
			id: i,
			timestamp: Date.now(),
			data: `High-speed message ${i}`,
			batch: Math.floor(i / 100),
		});

		if (i % 1000 === 0) {
			console.log(`πŸ“Š Queued ${i} messages...`);
		}
	}

	await batchPublisher.shutdown();

	const endTime = Date.now();
	const duration = endTime - startTime;
	const messagesPerSecond = Math.round(10000 / (duration / 1000));

	console.log(`πŸŽ‰ Published 10,000 messages in ${duration}ms`);
	console.log(`πŸ“Š Rate: ${messagesPerSecond} messages/second`);

	await connection.disconnect();
}

// Run the example
highPerformanceExample().catch(console.error);
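The batching rule used by `HighPerformanceBatchPublisher` (flush when the buffer is full, or when the oldest buffered message has waited too long) does not depend on RabbitMQ at all. Here is a minimal Python sketch of that same rule. The names `SizeOrAgeBatcher`, `flush_fn`, and `clock` are illustrative and not part of any library; a real publisher would pass a `flush_fn` that actually publishes the batch.

```python
import time
from typing import Any, Callable, List


class SizeOrAgeBatcher:
    """Buffers items and flushes when the buffer is full or too old."""

    def __init__(self, flush_fn: Callable[[List[Any]], None],
                 max_size: int = 100, max_age_s: float = 0.010,
                 clock: Callable[[], float] = time.monotonic):
        self._flush_fn = flush_fn
        self._max_size = max_size
        self._max_age_s = max_age_s
        self._clock = clock          # injectable so the rule can be tested
        self._items: List[Any] = []
        self._first_at = 0.0         # arrival time of the oldest buffered item

    def add(self, item: Any) -> None:
        if not self._items:
            self._first_at = self._clock()
        self._items.append(item)
        self.poll()

    def poll(self) -> None:
        """Flush if either limit is reached; call this periodically too."""
        if not self._items:
            return
        too_big = len(self._items) >= self._max_size
        too_old = self._clock() - self._first_at >= self._max_age_s
        if too_big or too_old:
            self.flush()

    def flush(self) -> None:
        """Hand the buffered items to flush_fn and start a new buffer."""
        if self._items:
            batch, self._items = self._items, []
            self._flush_fn(batch)
```

Unlike the TypeScript class, this sketch has no timer of its own: something has to call `poll()` regularly, and `flush()` once at shutdown, so that a partially filled buffer is not left behind.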

🐍 Python Examples

πŸ› οΈ Setup

First, let's set up our Python environment:

# Create virtual environment
python3 -m venv rabbitmq-python
cd rabbitmq-python

# Activate virtual environment
# On Windows:
Scripts\activate
# On Mac/Linux:
source bin/activate

# Install dependencies
pip install pika aio-pika aiofiles

# Create project structure
mkdir src examples
touch src/__init__.py
touch src/connection.py src/publisher.py src/consumer.py

📁 Project Structure

rabbitmq-python/
├── requirements.txt
├── src/
│   ├── __init__.py
│   ├── connection.py
│   ├── publisher.py
│   └── consumer.py
└── examples/
    ├── simple_example.py
    └── async_example.py

🔌 Synchronous Connection Manager

# src/connection.py
import pika
import logging
from typing import Optional, Dict, Any

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RabbitMQConnection:
    """
    🐰 RabbitMQ Connection Manager
    Simple and reliable connection handling for RabbitMQ
    """

    def __init__(self, host: str, port: int = 5672, username: str = 'guest',
                 password: str = 'guest', virtual_host: str = '/'):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.virtual_host = virtual_host

        self.connection: Optional[pika.BlockingConnection] = None
        self.channel: Optional[pika.channel.Channel] = None

    def connect(self) -> None:
        """πŸ”Œ Connect to RabbitMQ"""
        try:
            logger.info("πŸ”Œ Connecting to RabbitMQ...")

            # πŸ” Create credentials
            credentials = pika.PlainCredentials(self.username, self.password)

            # βš™οΈ Connection parameters
            parameters = pika.ConnectionParameters(
                host=self.host,
                port=self.port,
                virtual_host=self.virtual_host,
                credentials=credentials,
                heartbeat=600,  # 10 minutes
                blocked_connection_timeout=300,  # 5 minutes
                socket_timeout=10,
                retry_delay=5,
                connection_attempts=3
            )

            # πŸ”— Establish connection
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()

            # πŸš€ Optimize channel
            self.channel.basic_qos(prefetch_count=1000)

            logger.info("βœ… Successfully connected to RabbitMQ")

        except Exception as e:
            logger.error(f"❌ Failed to connect to RabbitMQ: {e}")
            raise

    def disconnect(self) -> None:
        """πŸ‘‹ Disconnect from RabbitMQ"""
        try:
            if self.channel and not self.channel.is_closed:
                self.channel.close()

            if self.connection and not self.connection.is_closed:
                self.connection.close()

            logger.info("πŸ‘‹ Disconnected from RabbitMQ")

        except Exception as e:
            logger.error(f"❌ Error disconnecting from RabbitMQ: {e}")

    def get_channel(self) -> pika.channel.Channel:
        """πŸ“‘ Get the channel"""
        if not self.channel or self.channel.is_closed:
            raise Exception("❌ Channel not available. Call connect() first.")
        return self.channel

    def __enter__(self):
        """πŸ”— Context manager entry"""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """πŸ”š Context manager exit"""
        self.disconnect()

📤 Publisher

# src/publisher.py
import json
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

import pika  # Needed for pika.BasicProperties below

from .connection import RabbitMQConnection, logger

class MessagePublisher:
    """
    πŸ“€ RabbitMQ Message Publisher
    Easy-to-use publisher for sending messages
    """

    def __init__(self, connection: RabbitMQConnection):
        self.connection = connection
        self.channel = connection.get_channel()

    def publish_to_queue(self, queue_name: str, message: Any,
                        priority: int = 0, persistent: bool = True) -> bool:
        """πŸ“€ Publish message to a queue"""
        try:
            # πŸ—οΈ Declare queue with optimizations
            self.channel.queue_declare(
                queue=queue_name,
                durable=True,
                auto_delete=False,
                arguments={
                    'x-max-priority': 10,
                    'x-queue-mode': 'lazy'  # Keeps messages on disk to limit RAM use (classic queues; ignored on RabbitMQ 3.12+)
                }
            )

            # πŸ“¦ Prepare message
            message_id = str(uuid.uuid4())  # One ID shared by the body and the AMQP properties
            message_body = json.dumps({
                'id': message_id,
                'timestamp': datetime.now().isoformat(),
                'data': message
            })

            # 📤 Publish message
            self.channel.basic_publish(
                exchange='',
                routing_key=queue_name,
                body=message_body,
                properties=pika.BasicProperties(
                    delivery_mode=2 if persistent else 1,  # 2 = persistent, 1 = transient
                    priority=priority,
                    message_id=message_id,
                    timestamp=int(datetime.now().timestamp())
                )
            )

            logger.info(f"πŸ“€ Message sent to queue: {queue_name}")
            return True

        except Exception as e:
            logger.error(f"❌ Failed to publish message: {e}")
            return False

    def publish_to_exchange(self, exchange_name: str, routing_key: str,
                           message: Any, priority: int = 0) -> bool:
        """πŸ“€ Publish message to an exchange"""
        try:
            # πŸ—οΈ Declare exchange
            self.channel.exchange_declare(
                exchange=exchange_name,
                exchange_type='topic',
                durable=True,
                auto_delete=False
            )

            # πŸ“¦ Prepare message
            message_body = json.dumps({
                'id': str(uuid.uuid4()),
                'timestamp': datetime.now().isoformat(),
                'routing_key': routing_key,
                'data': message
            })

            # πŸ“€ Publish message
            self.channel.basic_publish(
                exchange=exchange_name,
                routing_key=routing_key,
                body=message_body,
                properties=pika.BasicProperties(
                    delivery_mode=2,  # Persistent
                    priority=priority,
                    message_id=str(uuid.uuid4()),
                    timestamp=int(datetime.now().timestamp())
                )
            )

            logger.info(f"πŸ“€ Message sent to exchange '{exchange_name}' with routing key '{routing_key}'")
            return True

        except Exception as e:
            logger.error(f"❌ Failed to publish to exchange: {e}")
            return False
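The publisher wraps every payload in a small JSON envelope with `id`, `timestamp`, and `data` fields, and the consumer later reads it back with `json.loads`. The sketch below shows that round trip on its own, without a broker. `build_envelope` and `parse_envelope` are illustrative helper names, and the UTC timestamp is an assumption made here (the publisher above uses local time).

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Tuple


def build_envelope(payload: Any) -> Tuple[str, str]:
    """Return (message_id, JSON body) for a payload, using one shared ID."""
    message_id = str(uuid.uuid4())
    body = json.dumps({
        "id": message_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "data": payload,
    })
    return message_id, body


def parse_envelope(body: bytes) -> Dict[str, Any]:
    """Decode a body produced by build_envelope back into a dict."""
    return json.loads(body.decode("utf-8"))
```

Returning the ID alongside the body makes it easy to pass the same value as the AMQP `message_id` property, so a message can be traced by one identifier on both sides.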

📥 Consumer

# src/consumer.py
import json
import signal
import sys
from typing import Any, Callable, Dict
from .connection import RabbitMQConnection, logger

MessageHandler = Callable[[Dict[str, Any]], None]

class MessageConsumer:
    """
    πŸ“₯ RabbitMQ Message Consumer
    Easy-to-use consumer for receiving messages
    """

    def __init__(self, connection: RabbitMQConnection):
        self.connection = connection
        self.channel = connection.get_channel()
        self.consuming = False

    def consume_from_queue(self, queue_name: str, handler: MessageHandler,
                          auto_ack: bool = False) -> None:
        """πŸ“₯ Consume messages from a queue"""
        try:
            # πŸ—οΈ Declare queue
            self.channel.queue_declare(
                queue=queue_name,
                durable=True,
                auto_delete=False,
                arguments={
                    'x-max-priority': 10,
                    'x-queue-mode': 'lazy'
                }
            )

            def callback(ch, method, properties, body):
                """πŸ”„ Message callback handler"""
                try:
                    # πŸ“¦ Parse message
                    message = json.loads(body.decode('utf-8'))
                    logger.info(f"πŸ“₯ Received message: {message.get('id', 'unknown')}")

                    # πŸ”„ Process message
                    handler(message)

                    # βœ… Acknowledge message
                    if not auto_ack:
                        ch.basic_ack(delivery_tag=method.delivery_tag)
                        logger.info("βœ… Message acknowledged")

                except json.JSONDecodeError as e:
                    logger.error(f"❌ Failed to parse message: {e}")
                    if not auto_ack:
                        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

                except Exception as e:
                    logger.error(f"❌ Error processing message: {e}")
                    if not auto_ack:
                        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)

            # πŸ“₯ Set up consumer
            self.channel.basic_consume(
                queue=queue_name,
                on_message_callback=callback,
                auto_ack=auto_ack
            )

            logger.info(f"πŸ‘‚ Waiting for messages from queue: {queue_name}")
            logger.info("πŸ›‘ Press CTRL+C to exit")

            # πŸ”„ Start consuming
            self.consuming = True

            # πŸ›‘ Handle graceful shutdown
            def signal_handler(sig, frame):
                logger.info("πŸ›‘ Stopping consumer...")
                self.consuming = False
                self.channel.stop_consuming()

            signal.signal(signal.SIGINT, signal_handler)
            signal.signal(signal.SIGTERM, signal_handler)

            # πŸ”„ Start consuming loop
            while self.consuming:
                try:
                    self.channel.start_consuming()
                except KeyboardInterrupt:
                    logger.info("πŸ›‘ KeyboardInterrupt received")
                    break
                except Exception as e:
                    logger.error(f"❌ Consumer error: {e}")
                    break

            logger.info("πŸ‘‹ Consumer stopped")

        except Exception as e:
            logger.error(f"❌ Failed to start consuming: {e}")
            raise
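The callback above requeues a message every time the handler raises, so a message that always fails can loop forever. One simple way to bound this, sketched below, is to requeue only on the first failure. pika exposes the broker's redelivered flag as `method.redelivered` inside the callback; `decide_action` itself is an illustrative helper, not a pika function.

```python
import json
from typing import Optional


def decide_action(error: Optional[Exception], redelivered: bool) -> str:
    """Return 'ack', 'requeue', or 'drop' for one delivery."""
    if error is None:
        return "ack"
    if isinstance(error, json.JSONDecodeError):
        return "drop"      # a malformed body will never parse; do not requeue
    if redelivered:
        return "drop"      # already retried once; avoid an endless requeue loop
    return "requeue"       # first failure: give the message one more try
```

In the callback, `"requeue"` would map to `basic_reject(..., requeue=True)` and `"drop"` to `basic_reject(..., requeue=False)`. If the queue has a dead-letter exchange configured, dropped messages are routed there instead of being discarded.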

🏃‍♂️ Simple Example

# examples/simple_example.py
import sys
import os
import time

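# Add the project root to the path so that `src` is importable as a package
# (publisher.py and consumer.py use relative imports such as `from .connection import ...`)
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

from src.connection import RabbitMQConnection
from src.publisher import MessagePublisher
from src.consumer import MessageConsumer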

def message_handler(message):
    """πŸ”„ Handle incoming messages"""
    print(f"πŸ”„ Processing message: {message}")

    # Simulate some work
    time.sleep(1)

    print(f"βœ… Processed message: {message.get('id', 'unknown')}")

def main():
    """πŸš€ Main function"""
    # πŸ”Œ Configuration
    config = {
        'host': 'YOUR_INSTANCE_IP',
        'port': 5672,
        'username': 'admin',
        'password': 'mypassword123'
    }

    # πŸ”— Create connection using context manager
    with RabbitMQConnection(**config) as connection:

        # πŸ“€ Create publisher and send a message
        publisher = MessagePublisher(connection)

        success = publisher.publish_to_queue('hello-queue', {
            'message': 'Hello from Python! 🐍',
            'from': 'Python App',
            'version': '1.0'
        })

        if success:
            print("πŸ“€ Message sent successfully!")

        # πŸ“₯ Create consumer and start listening
        consumer = MessageConsumer(connection)
        consumer.consume_from_queue('hello-queue', message_handler)

if __name__ == '__main__':
    main()

⚑ Async Example

# examples/async_example.py
import asyncio
import json
from datetime import datetime
from aio_pika import connect_robust, Message
from aio_pika.abc import AbstractIncomingMessage

class AsyncRabbitMQClient:
    """⚑ High-performance async RabbitMQ client"""

    def __init__(self, connection_url: str):
        self.connection_url = connection_url
        self.connection = None
        self.channel = None

    async def connect(self):
        """πŸ”Œ Connect to RabbitMQ"""
        print("πŸ”Œ Connecting to RabbitMQ (async)...")

        self.connection = await connect_robust(
            self.connection_url,
            heartbeat=600,
            client_properties={"connection_name": "async-python-client"}
        )

        self.channel = await self.connection.channel()
        await self.channel.set_qos(prefetch_count=1000)

        print("βœ… Connected successfully (async)!")

    async def disconnect(self):
        """πŸ‘‹ Disconnect from RabbitMQ"""
        if self.connection:
            await self.connection.close()
            print("πŸ‘‹ Disconnected (async)")

    async def publish_batch(self, queue_name: str, messages: list):
        """πŸš€ Publish multiple messages efficiently"""
        try:
            # πŸ—οΈ Declare queue
            queue = await self.channel.declare_queue(
                queue_name,
                durable=True,
                arguments={
                    'x-max-priority': 10,
                    'x-queue-mode': 'lazy'
                }
            )

            # πŸ“€ Send messages in batch
            tasks = []
            for i, msg_data in enumerate(messages):
                message_body = json.dumps({
                    'id': f'batch-{i}',
                    'timestamp': datetime.now().isoformat(),
                    'data': msg_data
                })

                message = Message(
                    message_body.encode(),
                    delivery_mode=2,  # Persistent
                    priority=5
                )

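                # aio-pika queues have no publish method; publish through the
                # default exchange, using the queue name as the routing key
                tasks.append(
                    self.channel.default_exchange.publish(message, routing_key=queue_name)
                )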

            # πŸš€ Execute all publishes concurrently
            await asyncio.gather(*tasks)
            print(f"πŸš€ Published {len(messages)} messages in batch")

        except Exception as e:
            print(f"❌ Batch publish error: {e}")

    async def consume_async(self, queue_name: str, handler):
        """πŸ“₯ Async message consumer"""
        try:
            # πŸ—οΈ Declare queue
            queue = await self.channel.declare_queue(
                queue_name,
                durable=True,
                arguments={
                    'x-max-priority': 10,
                    'x-queue-mode': 'lazy'
                }
            )

            async def process_message(message: AbstractIncomingMessage):
                """πŸ”„ Process incoming message"""
                try:
                    # πŸ“¦ Parse message
                    body = json.loads(message.body.decode())
                    print(f"πŸ“₯ Received (async): {body.get('id', 'unknown')}")

                    # πŸ”„ Process with handler
                    await handler(body)

                    # βœ… Acknowledge
                    await message.ack()
                    print(f"βœ… Processed (async): {body.get('id', 'unknown')}")

                except Exception as e:
                    print(f"❌ Error processing message: {e}")
                    await message.reject(requeue=True)

            # πŸ“₯ Start consuming
            await queue.consume(process_message)
            print(f"πŸ‘‚ Listening for messages (async): {queue_name}")

        except Exception as e:
            print(f"❌ Consumer error: {e}")

async def async_message_handler(message):
    """⚑ Async message handler"""
    print(f"πŸ”„ Processing async message: {message}")

    # Simulate async work
    await asyncio.sleep(0.1)

    print(f"βœ… Async processing complete: {message.get('id', 'unknown')}")

async def main():
    """πŸš€ Async main function"""
    connection_url = "amqp://admin:mypassword123@YOUR_INSTANCE_IP:5672/"

    client = AsyncRabbitMQClient(connection_url)

    try:
        await client.connect()

        # πŸš€ Batch publishing example
        messages = [
            {'type': 'order', 'id': i, 'amount': i * 10}
            for i in range(1000)
        ]

        start_time = asyncio.get_event_loop().time()
        await client.publish_batch('async-queue', messages)
        end_time = asyncio.get_event_loop().time()

        duration = end_time - start_time
        rate = len(messages) / duration
        print(f"πŸ“Š Async publish rate: {rate:.0f} messages/second")

        # πŸ“₯ Start consuming
        await client.consume_async('async-queue', async_message_handler)

        # Keep running
        await asyncio.sleep(60)  # Run for 1 minute

    finally:
        await client.disconnect()

if __name__ == '__main__':
    asyncio.run(main())
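`publish_batch` creates one coroutine per message and gathers them all at once. For very large batches, it can help to cap how many publishes are in flight at the same time. The sketch below shows one way to do that with `asyncio.Semaphore`; `run_bounded` and `worker` are illustrative names, and `worker` stands in for whatever coroutine does the actual publish.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")


async def run_bounded(items: Iterable[T],
                      worker: Callable[[T], Awaitable[None]],
                      limit: int = 100) -> None:
    """Run worker(item) for every item with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: T) -> None:
        async with semaphore:      # waits here while `limit` workers are busy
            await worker(item)

    await asyncio.gather(*(guarded(item) for item in items))
```

The right `limit` depends on message size and broker capacity, so treat the default of 100 as a starting point to measure against rather than a recommendation.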

📊 Monitoring & Troubleshooting

🎯 Quick Health Checks

Before diving deep, let's do some quick checks to see if everything is working:

βœ… Basic Status Checks

# πŸ” Check if RabbitMQ is running
sudo systemctl status rabbitmq-server

# πŸ“Š Check cluster status (if using cluster)
sudo rabbitmqctl cluster_status

# πŸ“ˆ Check node status
sudo rabbitmqctl status

# πŸ“‹ List all queues
sudo rabbitmqctl list_queues name messages consumers

# πŸ‘₯ List connections
sudo rabbitmqctl list_connections name peer_host peer_port state

🌐 Network Connectivity Tests

# πŸ”— Test AMQP port (5672)
telnet YOUR_INSTANCE_IP 5672

# 🌐 Test Management UI port (15672)
curl -I http://YOUR_INSTANCE_IP:15672

# πŸ” Check listening ports
sudo netstat -tlnp | grep -E "(5672|15672|25672)"

📈 Performance Monitoring

🖥️ System Resource Monitoring

# πŸ’Ύ Memory usage
free -h

# πŸ’» CPU usage
top -bn1 | grep "Cpu(s)"

# πŸ’½ Disk usage
df -h

# πŸ“Š Disk I/O
iostat -x 1 5

# 🌐 Network statistics
ifstat 1 5

🐰 RabbitMQ Specific Monitoring

# πŸ“Š Queue statistics with details
sudo rabbitmqctl list_queues name messages messages_ready messages_unacknowledged consumers memory

# πŸ”— Connection details
sudo rabbitmqctl list_connections recv_oct send_oct state

# πŸ“‘ Channel information
sudo rabbitmqctl list_channels connection messages_unacknowledged

# πŸ”’ Exchange information
sudo rabbitmqctl list_exchanges name type durable auto_delete

🔧 Automated Monitoring Script

Create a comprehensive monitoring script:

# Create monitoring script
sudo tee /usr/local/bin/rabbitmq-health-check.sh << 'EOF'
#!/bin/bash

# 🎨 Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

echo "🐰 RabbitMQ Health Check Report"
echo "================================"
echo "πŸ“… Date: $(date)"
echo ""

# πŸ₯ Service Health
echo -e "${BLUE}πŸ₯ Service Health${NC}"
if systemctl is-active --quiet rabbitmq-server; then
    echo -e "βœ… RabbitMQ service is ${GREEN}running${NC}"
else
    echo -e "❌ RabbitMQ service is ${RED}not running${NC}"
    exit 1
fi

# πŸ’Ύ Memory Usage
echo -e "\n${BLUE}πŸ’Ύ Memory Usage${NC}"
MEMORY_TOTAL=$(free -m | grep Mem | awk '{print $2}')
MEMORY_USED=$(free -m | grep Mem | awk '{print $3}')
MEMORY_PERCENT=$(echo "scale=1; $MEMORY_USED * 100 / $MEMORY_TOTAL" | bc)

echo "Total Memory: ${MEMORY_TOTAL}MB"
echo "Used Memory: ${MEMORY_USED}MB (${MEMORY_PERCENT}%)"

if (( $(echo "$MEMORY_PERCENT > 80" | bc -l) )); then
    echo -e "⚠️  ${YELLOW}High memory usage!${NC}"
fi

# πŸ’½ Disk Usage
echo -e "\n${BLUE}πŸ’½ Disk Usage${NC}"
DISK_USAGE=$(df -h /var/lib/rabbitmq | awk 'NR==2 {print $5}' | sed 's/%//')
echo "RabbitMQ Data Directory: ${DISK_USAGE}% used"

if (( DISK_USAGE > 80 )); then
    echo -e "⚠️  ${YELLOW}High disk usage!${NC}"
fi

# πŸ”— Connection Count
echo -e "\n${BLUE}πŸ”— Connections${NC}"
CONNECTION_COUNT=$(rabbitmqctl list_connections -q --no-table-headers name 2>/dev/null | wc -l)
echo "Active connections: $CONNECTION_COUNT"

# πŸ“Š Queue Information
echo -e "\n${BLUE}πŸ“Š Top 10 Queues by Message Count${NC}"
rabbitmqctl list_queues -q --no-table-headers name messages 2>/dev/null | sort -k2 -nr | head -10

# πŸš€ Performance Metrics
echo -e "\n${BLUE}πŸš€ Performance Metrics${NC}"
RABBIT_STATUS=$(rabbitmqctl status 2>/dev/null)
echo "Node uptime: $(echo "$RABBIT_STATUS" | grep uptime | head -1)"

echo ""
echo "βœ… Health check completed!"
EOF

# Make executable
sudo chmod +x /usr/local/bin/rabbitmq-health-check.sh

# Run the health check
sudo /usr/local/bin/rabbitmq-health-check.sh

🔥 Common Issues & Solutions

❌ Problem: Can't Connect to RabbitMQ

🔍 Symptoms:

  • Connection refused errors
  • Timeouts when connecting

🛠️ Solutions:

# 1. Check if service is running
sudo systemctl status rabbitmq-server

# 2. If not running, start it
sudo systemctl start rabbitmq-server

# 3. Check firewall
sudo iptables -L | grep 5672

# 4. Check logs for errors
sudo tail -f /var/log/rabbitmq/rabbit@$(hostname).log

# 5. Test local connection
rabbitmqctl status

💾 Problem: High Memory Usage

🔍 Symptoms:

  • System becomes slow
  • RabbitMQ stops accepting connections
  • Memory alarms triggered

🛠️ Solutions:

# 1. Check memory watermark
rabbitmqctl status | grep memory

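# 2. Set the memory high watermark (a fraction of system RAM; the change
#    reverts to the configured value when the node restarts)
rabbitmqctl set_vm_memory_high_watermark 0.4

# 3. Purge queues whose messages are no longer needed (this deletes the messages)
rabbitmqctl purge_queue QUEUE_NAME

# 4. As a last resort, restart RabbitMQ (non-persistent messages are lost)
sudo systemctl restart rabbitmq-server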

🚀 Problem: Poor Performance

🔍 Symptoms:

  • Slow message processing
  • High queue depths
  • Consumer lag

🛠️ Solutions:

# 1. Check queue modes
rabbitmqctl list_queues name arguments | grep queue-mode

# 2. Enable lazy queues to move messages to disk and reduce RAM pressure
#    (classic queues only; the setting is ignored on RabbitMQ 3.12+)
rabbitmqctl set_policy lazy-queue ".*" '{"queue-mode":"lazy"}' --apply-to queues

# 3. Check consumer prefetch
rabbitmqctl list_channels pid prefetch_count

# 4. Monitor message rates
watch -n 1 'rabbitmqctl list_queues name messages_ready messages_unacknowledged'
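When a queue is deep, a quick estimate of how long it will take to drain helps decide whether to add consumers. The backlog shrinks at the consume rate minus the publish rate, so the drain time is the backlog divided by that difference. The sketch below does this arithmetic; `drain_time_seconds` is an illustrative helper, and the rates are whatever you observe from the commands above or the management UI.

```python
from typing import Optional


def drain_time_seconds(backlog: int, publish_rate: float,
                       consume_rate: float) -> Optional[float]:
    """Seconds until the queue is empty, or None if it never drains."""
    net_rate = consume_rate - publish_rate   # messages removed per second
    if backlog <= 0:
        return 0.0
    if net_rate <= 0:
        return None                          # consumers are not keeping up
    return backlog / net_rate
```

For instance, a backlog of 10,000 messages with 800 msg/s coming in and 1,000 msg/s going out drains in 10000 / 200 = 50 seconds. If the function returns `None`, the queue will keep growing until consumers get faster or publishers slow down.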

📊 Performance Benchmarking Tools

🧪 RabbitMQ PerfTest Tool

# Install performance testing tool
wget https://github.com/rabbitmq/rabbitmq-perf-test/releases/latest/download/perf-test-latest.jar

# Simple performance test
java -jar perf-test-latest.jar \
  --uri amqp://admin:mypassword123@YOUR_INSTANCE_IP:5672 \
  --producers 10 \
  --consumers 10 \
  --rate 1000 \
  --time 60

# High-throughput test
java -jar perf-test-latest.jar \
  --uri amqp://admin:mypassword123@YOUR_INSTANCE_IP:5672 \
  --producers 50 \
  --consumers 50 \
  --rate 10000 \
  --time 300 \
  --queue-pattern 'perf-test-%d' \
  --queue-pattern-from 1 \
  --queue-pattern-to 10

📈 Custom Monitoring Dashboard

Create a simple web dashboard to monitor your RabbitMQ:

# Install Node.js monitoring dashboard
sudo yum install -y nodejs npm

# Create monitoring app
mkdir rabbitmq-monitor
cd rabbitmq-monitor

npm init -y
npm install express axios

# Create simple dashboard
cat > app.js << 'EOF'
const express = require('express');
const axios = require('axios');
const app = express();

// RabbitMQ Management API
const RABBITMQ_API = 'http://admin:mypassword123@localhost:15672/api';

app.get('/', (req, res) => {
  res.send(`
    <html>
      <head>
        <title>🐰 RabbitMQ Monitor</title>
        <meta http-equiv="refresh" content="5">
        <style>
          body { font-family: Arial; margin: 20px; }
          .metric { background: #f5f5f5; padding: 10px; margin: 10px 0; border-radius: 5px; }
          .healthy { border-left: 5px solid green; }
          .warning { border-left: 5px solid orange; }
          .error { border-left: 5px solid red; }
        </style>
      </head>
      <body>
        <h1>🐰 RabbitMQ Dashboard</h1>
        <div id="metrics"></div>
        <script>
          fetch('/api/overview')
            .then(r => r.json())
            .then(data => {
              document.getElementById('metrics').innerHTML =
                '<div class="metric healthy">πŸ“Š Total Queues: ' + data.object_totals.queues + '</div>' +
                '<div class="metric healthy">πŸ”— Connections: ' + data.object_totals.connections + '</div>' +
                '<div class="metric healthy">πŸ“¨ Messages: ' + data.queue_totals.messages + '</div>';
            });
        </script>
      </body>
    </html>
  `);
});

app.get('/api/overview', async (req, res) => {
  try {
    const response = await axios.get(`${RABBITMQ_API}/overview`);
    res.json(response.data);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.listen(8080, () => {
  console.log('🌐 Dashboard running at http://localhost:8080');
});
EOF

# Run the dashboard
node app.js &
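The dashboard's stylesheet defines `healthy`, `warning`, and `error` classes, but the page always renders `healthy`. A small rule like the one sketched below could pick the class from the `queue_totals.messages` field of the `/api/overview` response. `classify_overview` and both thresholds are assumptions for illustration; tune them to your own workload.

```python
from typing import Any, Dict


def classify_overview(overview: Dict[str, Any], warn_at: int = 1000,
                      error_at: int = 10000) -> str:
    """Map total queued messages to one of the dashboard's CSS classes."""
    messages = overview.get("queue_totals", {}).get("messages", 0)
    if messages >= error_at:
        return "error"
    if messages >= warn_at:
        return "warning"
    return "healthy"
```

The same three-way comparison translates directly to JavaScript if you would rather keep the logic inside `app.js`.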

🚨 Alerting Setup

Set up automated alerts for critical issues:

# Create alert script
sudo tee /usr/local/bin/rabbitmq-alerts.sh << 'EOF'
#!/bin/bash

# πŸ“§ Email settings (configure these)
EMAIL="your-email@example.com"
SUBJECT_PREFIX="🚨 RabbitMQ Alert"

# 🎯 Thresholds
MEMORY_THRESHOLD=80
DISK_THRESHOLD=80
QUEUE_DEPTH_THRESHOLD=10000

# πŸ’Ύ Check memory usage
MEMORY_PERCENT=$(free | grep Mem | awk '{printf "%.0f", $3/$2 * 100.0}')
if [ $MEMORY_PERCENT -gt $MEMORY_THRESHOLD ]; then
    echo "High memory usage: ${MEMORY_PERCENT}%" | mail -s "${SUBJECT_PREFIX}: High Memory" $EMAIL
fi

# πŸ’½ Check disk usage
DISK_PERCENT=$(df -h /var/lib/rabbitmq | awk 'NR==2 {print $5}' | sed 's/%//')
if [ $DISK_PERCENT -gt $DISK_THRESHOLD ]; then
    echo "High disk usage: ${DISK_PERCENT}%" | mail -s "${SUBJECT_PREFIX}: High Disk" $EMAIL
fi

# πŸ“Š Check queue depths
rabbitmqctl list_queues name messages 2>/dev/null | while read queue messages; do
    if [[ $messages =~ ^[0-9]+$ ]] && [ $messages -gt $QUEUE_DEPTH_THRESHOLD ]; then
        echo "Queue $queue has $messages messages" | mail -s "${SUBJECT_PREFIX}: High Queue Depth" $EMAIL
    fi
done
EOF

sudo chmod +x /usr/local/bin/rabbitmq-alerts.sh

# Add to root's crontab without overwriting existing entries (check every 5 minutes)
(sudo crontab -l 2>/dev/null; echo "*/5 * * * * /usr/local/bin/rabbitmq-alerts.sh") | sudo crontab -
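Because the alert script runs every 5 minutes, a problem that lasts an hour produces a dozen identical emails. A cooldown per alert type is one way to keep that under control. The sketch below shows the rule only; `AlertCooldown` is an illustrative class, and since each cron run is a fresh process, the last-sent times would need to be saved somewhere between runs (a small file, for example).

```python
from typing import Dict, Optional


class AlertCooldown:
    """Suppresses repeats of the same alert within a cooldown window."""

    def __init__(self, cooldown_s: float = 3600.0):
        self._cooldown_s = cooldown_s
        self._last_sent: Dict[str, float] = {}   # alert key -> time last sent

    def should_send(self, key: str, now: float) -> bool:
        last: Optional[float] = self._last_sent.get(key)
        if last is not None and now - last < self._cooldown_s:
            return False                         # still inside the window
        self._last_sent[key] = now
        return True
```

Keys such as `"memory"`, `"disk"`, or a queue name keep the alert types independent, so a disk alert is not silenced by a recent memory alert.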

💰 Cost Optimization

💡 Smart Instance Sizing

📊 Right-Sizing Your Instances

Understanding your actual usage is key to saving money:

# Monitor actual resource usage over time
cat > monitor-usage.sh << 'EOF'
#!/bin/bash

LOG_FILE="/tmp/rabbitmq-usage.log"
echo "$(date),$(free | grep Mem | awk '{printf "%.1f", $3/$2 * 100}'),$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1),$(df -h /var/lib/rabbitmq | awk 'NR==2 {print $5}' | sed 's/%//')" >> $LOG_FILE
EOF

chmod +x monitor-usage.sh

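# Run every minute for a week to gather data (appends to the existing crontab;
# replace /path/to with the directory that contains the script)
(crontab -l 2>/dev/null; echo "* * * * * /path/to/monitor-usage.sh") | crontab -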

💰 Cost Comparison Calculator

# Create cost calculator script
cat > cost-calculator.sh << 'EOF'
#!/bin/bash

# πŸ’° AWS EC2 On-Demand Pricing (USD per hour, us-east-1)
declare -A ON_DEMAND_PRICES=(
    ["t3.medium"]=0.0416
    ["c5n.large"]=0.108
    ["c5n.xlarge"]=0.216
    ["c5n.2xlarge"]=0.432
    ["c5n.4xlarge"]=0.864
)

# πŸ’΅ Reserved Instance Pricing (1-year term, no upfront)
declare -A RESERVED_PRICES=(
    ["t3.medium"]=0.0301
    ["c5n.large"]=0.078
    ["c5n.xlarge"]=0.156
    ["c5n.2xlarge"]=0.312
    ["c5n.4xlarge"]=0.624
)

# 🎯 Spot Instance Pricing (approximate)
declare -A SPOT_PRICES=(
    ["t3.medium"]=0.0125
    ["c5n.large"]=0.0324
    ["c5n.xlarge"]=0.0648
    ["c5n.2xlarge"]=0.1296
    ["c5n.4xlarge"]=0.2592
)

echo "πŸ’° RabbitMQ Cost Calculator"
echo "=========================="

for instance in "${!ON_DEMAND_PRICES[@]}"; do
    on_demand_monthly=$(echo "scale=2; ${ON_DEMAND_PRICES[$instance]} * 24 * 30" | bc)
    reserved_monthly=$(echo "scale=2; ${RESERVED_PRICES[$instance]} * 24 * 30" | bc)
    spot_monthly=$(echo "scale=2; ${SPOT_PRICES[$instance]} * 24 * 30" | bc)

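    # Multiply before dividing: with scale=1, bc would otherwise truncate the ratio to one decimal (0.2) first
    reserved_savings=$(echo "scale=1; ($on_demand_monthly - $reserved_monthly) * 100 / $on_demand_monthly" | bc)
    spot_savings=$(echo "scale=1; ($on_demand_monthly - $spot_monthly) * 100 / $on_demand_monthly" | bc)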

    echo ""
    echo "πŸ–₯️  $instance:"
    echo "   On-Demand: \$${on_demand_monthly}/month"
    echo "   Reserved:  \$${reserved_monthly}/month (${reserved_savings}% savings)"
    echo "   Spot:      \$${spot_monthly}/month (${spot_savings}% savings)"
done
EOF

chmod +x cost-calculator.sh
./cost-calculator.sh
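The arithmetic behind the calculator is simple enough to check by hand: monthly cost is the hourly price times 24 × 30 hours, and savings are the price difference as a percentage of the On-Demand price. The sketch below restates it in Python; `monthly_cost` and `savings_percent` are illustrative helpers, and the prices are the ones listed in the script, which may not match current AWS pricing.

```python
HOURS_PER_MONTH = 24 * 30  # the same 30-day month the shell script assumes


def monthly_cost(hourly_usd: float) -> float:
    """Cost of running one instance for a 30-day month."""
    return hourly_usd * HOURS_PER_MONTH


def savings_percent(on_demand_hourly: float, other_hourly: float) -> float:
    """Percentage saved relative to On-Demand, rounded to one decimal."""
    return round((on_demand_hourly - other_hourly) * 100 / on_demand_hourly, 1)
```

With the t3.medium figures from the script (0.0416 On-Demand, 0.0301 Reserved, 0.0125 Spot), `savings_percent` gives 27.6 for Reserved and 70.0 for Spot.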

🎯 Optimization Strategies

πŸ”„ Auto-Scaling with CloudWatch

Set up automatic scaling based on queue depth:

# Create the Auto Scaling group
# (--vpc-zone-identifier takes the subnet ID(s) to launch into; the value below is a placeholder)
aws autoscaling create-auto-scaling-group \
  --auto-scaling-group-name rabbitmq-asg \
  --launch-template LaunchTemplateName=rabbitmq-template,Version=1 \
  --min-size 1 \
  --max-size 5 \
  --desired-capacity 2 \
  --vpc-zone-identifier "subnet-12345678" \
  --target-group-arns arn:aws:elasticloadbalancing:region:account:targetgroup/rabbitmq-targets/50dc6c495c0c9188

# Create scaling policies
aws autoscaling put-scaling-policy \
  --auto-scaling-group-name rabbitmq-asg \
  --policy-name scale-up \
  --policy-type StepScaling \
  --adjustment-type ChangeInCapacity \
  --step-adjustments MetricIntervalLowerBound=0,ScalingAdjustment=1

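# Create CloudWatch alarm
# Note: self-hosted RabbitMQ does not publish metrics to CloudWatch by itself.
# QueueDepth must be a custom metric that you publish (for example with
# `aws cloudwatch put-metric-data`); "Custom/RabbitMQ" is a placeholder namespace.
# To make the alarm trigger scaling, also pass --alarm-actions with the policy
# ARN returned by put-scaling-policy.
aws cloudwatch put-metric-alarm \
  --alarm-name rabbitmq-high-queue-depth \
  --alarm-description "Scale up when queue depth is high" \
  --metric-name QueueDepth \
  --namespace Custom/RabbitMQ \
  --statistic Average \
  --period 300 \
  --threshold 1000 \
  --comparison-operator GreaterThanThreshold \
  --evaluation-periods 2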
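A self-hosted RabbitMQ node does not send a queue-depth metric to CloudWatch on its own, so an alarm like the one above only has data if something pushes it. The sketch below builds the `aws cloudwatch put-metric-data` command for one data point. `put_metric_command` is an illustrative helper and `Custom/RabbitMQ` is a placeholder namespace; whatever namespace you choose has to match the alarm's `--namespace`, and names starting with `AWS/` are reserved for AWS services.

```python
from typing import List


def put_metric_command(depth: int,
                       namespace: str = "Custom/RabbitMQ") -> List[str]:
    """Build the argv for publishing one total queue-depth data point."""
    return [
        "aws", "cloudwatch", "put-metric-data",
        "--namespace", namespace,
        "--metric-name", "QueueDepth",
        "--value", str(depth),
        "--unit", "Count",
    ]
```

A cron job could sum the `messages` column from `rabbitmqctl list_queues` and run the result with `subprocess.run(put_metric_command(total), check=True)`. The metric is published without dimensions here so that it lines up with an alarm that does not specify any.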

💾 Storage Optimization

# Use gp3 volumes for better cost/performance
aws ec2 create-volume \
  --size 100 \
  --volume-type gp3 \
  --iops 3000 \
  --throughput 125 \
  --availability-zone us-east-1a \
  --tag-specifications 'ResourceType=volume,Tags=[{Key=Name,Value=rabbitmq-data}]'

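# Enable EBS optimization (stop the instance first; current-generation
# types such as c5n are already EBS-optimized by default)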
aws ec2 modify-instance-attribute \
  --instance-id i-1234567890abcdef0 \
  --ebs-optimized

🕐 Scheduled Scaling

For predictable workloads, use scheduled scaling. The --recurrence value is a cron expression evaluated in UTC unless you also pass --time-zone:

# Scale up during business hours
aws autoscaling put-scheduled-update-group-action \
  --auto-scaling-group-name rabbitmq-asg \
  --scheduled-action-name scale-up-business-hours \
  --recurrence "0 8 * * MON-FRI" \
  --desired-capacity 3

# Scale down after hours
aws autoscaling put-scheduled-update-group-action \
  --auto-scaling-group-name rabbitmq-asg \
  --scheduled-action-name scale-down-after-hours \
  --recurrence "0 18 * * MON-FRI" \
  --desired-capacity 1

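🎯 Spot Instance Strategy

For non-critical workloads, Spot Instances can save up to 90% compared to On-Demand. AWS can reclaim them with a two-minute warning, so keep them for nodes you can afford to lose: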

# Create spot fleet configuration
cat > spot-fleet-config.json << 'EOF'
{
    "SpotFleetRequestConfig": {
        "IamFleetRole": "arn:aws:iam::account:role/aws-ec2-spot-fleet-tagging-role",
        "AllocationStrategy": "diversified",
        "TargetCapacity": 2,
        "SpotPrice": "0.05",
        "LaunchSpecifications": [
            {
                "ImageId": "ami-0abcdef1234567890",
                "InstanceType": "c5n.large",
                "KeyName": "my-key-pair",
                "SecurityGroups": [{"GroupId": "sg-12345678"}],
                "SubnetId": "subnet-12345678",
                "UserData": "base64-encoded-startup-script"
            }
        ]
    }
}
EOF

# Request spot fleet (--cli-input-json expects the top-level
# SpotFleetRequestConfig key used in the file above)
aws ec2 request-spot-fleet --cli-input-json file://spot-fleet-config.json
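
The UserData field in the launch specification has to contain the startup script as base64 text. The following is a minimal sketch of producing that value, assuming a hypothetical two-line bootstrap script (replace it with your own):

```python
import base64

# Hypothetical startup script; replace with your own bootstrap commands
STARTUP_SCRIPT = """#!/bin/bash
systemctl enable --now rabbitmq-server
"""


def encode_user_data(script: str) -> str:
    """Return the script as single-line base64 text for the UserData field."""
    return base64.b64encode(script.encode("utf-8")).decode("ascii")


user_data = encode_user_data(STARTUP_SCRIPT)
print(user_data)
```

Paste the printed string into spot-fleet-config.json in place of the "base64-encoded-startup-script" placeholder.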

📊 Cost Monitoring Dashboard

Track month-to-date EC2 spend with a small Cost Explorer script:

# Create cost monitoring script
cat > cost-monitor.sh << 'EOF'
#!/bin/bash

# Month-to-date window. Cost Explorer treats End as exclusive, so use
# tomorrow's date to include today (GNU date syntax, as on Amazon Linux and Ubuntu).
START="$(date +%Y-%m)-01"
END="$(date -d tomorrow +%Y-%m-%d)"

# Cost filter for EC2 only (must exist before the queries below read it)
cat > cost-filter.json << 'FILTER'
{
    "Dimensions": {
        "Key": "SERVICE",
        "Values": ["Amazon Elastic Compute Cloud - Compute"]
    }
}
FILTER

# Get cost data
aws ce get-cost-and-usage \
  --time-period Start=${START},End=${END} \
  --granularity MONTHLY \
  --metrics BlendedCost \
  --group-by Type=DIMENSION,Key=SERVICE \
  --filter file://cost-filter.json

echo "💰 Current month EC2 costs:"
aws ce get-cost-and-usage \
  --time-period Start=${START},End=${END} \
  --granularity MONTHLY \
  --metrics BlendedCost \
  --filter file://cost-filter.json \
  --query 'ResultsByTime[0].Total.BlendedCost.Amount' \
  --output text

# Clean up
rm cost-filter.json
EOF

chmod +x cost-monitor.sh

🎯 Best Practices Summary

✅ Performance Optimization

  • Use appropriate EC2 instance types (c5n family for network performance)
  • Configure proper queue arguments (x-queue-mode: lazy keeps long backlogs on disk to reduce memory pressure; classic queues in RabbitMQ 3.12+ already behave this way)
  • Implement message batching for bulk operations
  • Use connection pooling for high-concurrency applications
  • Configure appropriate prefetch values

🔒 Security

  • Always use SSL/TLS in production
  • Implement proper authentication and authorization
  • Use VPC security groups to restrict access
  • Regularly update RabbitMQ and dependencies
  • Monitor for suspicious activity

🛡️ Reliability

  • Implement proper error handling and retry logic
  • Use message acknowledgments appropriately
  • Set up monitoring and alerting
  • Implement health checks
  • Plan for disaster recovery

💰 Cost Optimization

  • Right-size your instances based on actual usage
  • Use spot instances for non-critical workloads
  • Consider reserved instances for predictable workloads
  • Monitor resource utilization regularly
  • Implement auto-scaling where appropriate

📊 Monitoring

  • Set up comprehensive logging
  • Monitor queue depths and processing rates
  • Track connection counts and resource usage
  • Implement alerting for critical metrics
  • Run performance benchmarks regularly

🎉 Conclusion

Congratulations! 🎊 You've now learned how to:

  • ✅ Set up RabbitMQ on AWS EC2 (both single and cluster)
  • ✅ Write applications in Java, Node.js/TypeScript, and Python
  • ✅ Monitor and troubleshoot your deployment
  • ✅ Optimize for performance and cost

🌟 Key Takeaways

  • 🚀 Performance: Properly configured RabbitMQ on EC2 can reach 100k+ messages/second, depending on message size, durability, and acknowledgment settings
  • 💰 Cost: Self-hosting can save 60-80% compared to managed services
  • 📈 Scalability: Easy to scale horizontally with clustering
  • 🛡️ Reliability: Proper configuration ensures high availability

🚀 Next Steps

  1. 🧪 Test Everything: Start with a small instance and test your application
  2. 📊 Monitor: Set up monitoring before going to production
  3. 🔒 Secure: Implement proper security measures
  4. 📈 Scale: Use the patterns you've learned to scale as needed

🆘 Getting Help

Remember: Always test thoroughly in a staging environment before deploying to production, and continuously monitor your setup to ensure optimal performance! 🎯

Happy messaging! 🐰✨
