- π Overview
- π Prerequisites
- βοΈ EC2 Instance Setup
- π§ RabbitMQ Installation
- β‘ Single Instance Setup
- π Cluster Setup
- π‘οΈ Security & Management
- π» Code Implementation
- π Monitoring & Troubleshooting
- π° Cost Optimization
π― What is RabbitMQ? Think of RabbitMQ as a super-smart postal service for your applications! It helps different parts of your software talk to each other by sending messages back and forth.
- β‘ Fast: Sub-millisecond message delivery
- π’ High Volume: Handle 100,000+ messages per second
- π° Cost Effective: Save 60-80% vs managed services
- π Scalable: Grow from 1 to 100+ servers easily
- π₯ Multi-Language: Works with Java, Node.js, Python, and more!
By the end of this guide, you'll be able to:
- β Set up RabbitMQ on AWS EC2
- β Create both single and cluster deployments
- β Write code in Java, Node.js/TypeScript, and Python
- β Monitor and troubleshoot your setup
- β Optimize for performance and cost
Don't worry! You just need to know:
- π±οΈ How to use a computer and web browser
- π Very basic typing in terminal/command prompt
- π What a website is (we'll teach the rest!)
- π§ AWS CLI configured
- π SSH key pair setup
- π» Basic Linux commands
- π¦ Package managers (npm, pip, maven)
- π₯οΈ Computer with internet connection
- π AWS Account (free tier is fine!)
- π Text editor (VS Code recommended)
- β Your favorite beverage for motivation!
Think of EC2 instances like different sizes of computers you can rent from Amazon!
| π·οΈ Instance Type | π§ CPU | πΎ RAM | π Best For | π° Cost/Month |
|---|---|---|---|---|
t3.medium |
2 cores | 4 GB | πΆ Learning/Testing | ~$30 |
c5n.large |
2 cores | 5.25 GB | π Small Production | ~$80 |
c5n.xlarge |
4 cores | 10.5 GB | π’ Medium Business | ~$160 |
c5n.2xlarge |
8 cores | 21 GB | π High Traffic | ~$320 |
c5n.4xlarge |
16 cores | 42 GB | π Enterprise | ~$640 |
Start with t3.medium for learning, then upgrade to c5n.large for production!
Start with t3.medium for learning, then upgrade to c5n.large for production!
A security group is like a firewall that controls who can talk to your server.
- π Go to AWS EC2 Console
- π Click "Security Groups" in the left menu
- β Click "Create Security Group"
- π Fill in these details:
Security Group Name: rabbitmq-sg
Description: RabbitMQ Security Group
- β Add these rules by clicking "Add Rule":
| πͺ Type | π Port | π Source | π Description |
|---|---|---|---|
| SSH | 22 | My IP | For remote access |
| Custom TCP | 5672 | Anywhere (0.0.0.0/0) | RabbitMQ AMQP |
| Custom TCP | 15672 | Anywhere (0.0.0.0/0) | RabbitMQ Management UI |
| Custom TCP | 25672 | Anywhere (0.0.0.0/0) | RabbitMQ Cluster |
# Create security group
aws ec2 create-security-group \
--group-name rabbitmq-sg \
--description "RabbitMQ Security Group"
# Add SSH access (replace YOUR_IP with your actual IP)
aws ec2 authorize-security-group-ingress \
--group-name rabbitmq-sg \
--protocol tcp \
--port 22 \
--cidr YOUR_IP/32
# Add RabbitMQ AMQP port
aws ec2 authorize-security-group-ingress \
--group-name rabbitmq-sg \
--protocol tcp \
--port 5672 \
--cidr 0.0.0.0/0
# Add Management UI port
aws ec2 authorize-security-group-ingress \
--group-name rabbitmq-sg \
--protocol tcp \
--port 15672 \
--cidr 0.0.0.0/0
# Add Cluster communication port
aws ec2 authorize-security-group-ingress \
--group-name rabbitmq-sg \
--protocol tcp \
--port 25672 \
--cidr 0.0.0.0/0- π Go to AWS EC2 Console
- π Click "Launch Instance"
- πΌοΈ Choose "Amazon Linux 2 AMI" (it's free!)
- π» Select your instance type (t3.medium for beginners)
- π Choose your security group (rabbitmq-sg)
- π Create or select a key pair
- πΎ Configure storage (20 GB is fine to start)
- π Click "Launch Instance"
# Launch instance
aws ec2 run-instances \
--image-id ami-0abcdef1234567890 \
--count 1 \
--instance-type t3.medium \
--key-name your-key-pair \
--security-groups rabbitmq-sg \
--tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=RabbitMQ-Server}]'Once your instance is running (this takes 2-3 minutes), you can connect to it:
- π₯ Download PuTTY
- π Convert your .pem key to .ppk using PuTTYgen
- π Connect using your instance's public IP
# Make your key file secure
chmod 400 your-key.pem
# Connect to your instance
ssh -i your-key.pem ec2-user@YOUR_INSTANCE_IPGreat! π Now you're connected to your server. Let's install RabbitMQ!
# Update all packages (this is like updating your phone!)
sudo yum update -y
# Install useful tools
sudo yum install -y wget curl vim htop net-toolsRabbitMQ is built on Erlang (think of it as RabbitMQ's engine):
# Add Erlang Solutions repository
wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm
sudo rpm -Uvh erlang-solutions-2.0-1.noarch.rpm
# Install Erlang
sudo yum install -y erlang
# Verify installation
erl -version# Add RabbitMQ repository
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
# Install RabbitMQ
sudo yum install -y rabbitmq-server
# Start RabbitMQ service
sudo systemctl start rabbitmq-server
# Make it start automatically when server restarts
sudo systemctl enable rabbitmq-server
# Check if it's running
sudo systemctl status rabbitmq-serverThe Management UI is like a dashboard for your RabbitMQ:
# Enable the management plugin
sudo rabbitmq-plugins enable rabbitmq_management
# Create an admin user
sudo rabbitmqctl add_user admin mypassword123
# Make the user an administrator
sudo rabbitmqctl set_user_tags admin administrator
# Give the user permissions
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# Remove the default guest user (for security)
sudo rabbitmqctl delete_user guest
# Restart RabbitMQ to apply changes
sudo systemctl restart rabbitmq-server- π Open your web browser
- π Go to:
http://YOUR_INSTANCE_IP:15672 - π Login with:
- Username:
admin - Password:
mypassword123
- Username:
You should see a beautiful dashboard! π
Perfect for most applications and beginners! This setup can handle thousands of messages per second.
Let's optimize your single RabbitMQ instance:
# Create RabbitMQ configuration file
sudo tee /etc/rabbitmq/rabbitmq.conf << 'EOF'
# π Network Configuration
listeners.tcp.default = 5672
management.tcp.port = 15672
management.tcp.ip = 0.0.0.0
# β‘ Performance Settings
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.8
disk_free_limit.relative = 2.0
# π Logging
log.console = true
log.console.level = info
log.file = /var/log/rabbitmq/rabbitmq.log
log.file.level = info
# πͺ Performance Tuning
collect_statistics_interval = 10000
delegate_count = 16
EOF
# Restart to apply changes
sudo systemctl restart rabbitmq-serverMake your single instance super fast:
# System optimizations
sudo tee -a /etc/security/limits.conf << 'EOF'
rabbitmq soft nofile 65536
rabbitmq hard nofile 65536
EOF
# Network optimizations
sudo tee -a /etc/sysctl.conf << 'EOF'
# Better network performance
net.core.rmem_default = 262144
net.core.rmem_max = 16777216
net.core.wmem_default = 262144
net.core.wmem_max = 16777216
vm.swappiness = 10
EOF
# Apply changes
sudo sysctl -p# Check if RabbitMQ is working
sudo rabbitmqctl status
# Check if you can create a queue
sudo rabbitmqctl list_queues
# Check memory usage
sudo rabbitmqctl status | grep memoryWant to handle millions of messages? Let's create a RabbitMQ cluster! πͺ
- π Higher Performance: Distribute load across multiple servers
- π‘οΈ High Availability: If one server dies, others keep working
- π Scalability: Add more servers as you grow
π Load Balancer
|
βββ΄ββ
β β
π° Node1 π° Node2 π° Node3
Follow the same steps as single instance setup, but create 3 instances:
rabbitmq-node1rabbitmq-node2rabbitmq-node3
# Set the same Erlang cookie (this is like a shared password)
sudo systemctl stop rabbitmq-server
sudo tee /var/lib/rabbitmq/.erlang.cookie << 'EOF'
MYSECRETCOOKIESTRING12345
EOF
sudo chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
sudo chmod 600 /var/lib/rabbitmq/.erlang.cookie# Start RabbitMQ
sudo systemctl start rabbitmq-server
# This will be our main node
sudo rabbitmqctl cluster_status# Start RabbitMQ
sudo systemctl start rabbitmq-server
# Join the cluster (replace NODE1_IP with actual IP)
sudo rabbitmqctl stop_app
sudo rabbitmqctl join_cluster rabbit@NODE1_PRIVATE_IP
sudo rabbitmqctl start_app
# Check cluster status
sudo rabbitmqctl cluster_statusCreate optimized configuration for cluster:
# On all nodes
sudo tee /etc/rabbitmq/rabbitmq.conf << 'EOF'
# π Cluster Configuration
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
# π Network settings
listeners.tcp.default = 5672
management.tcp.port = 15672
# β‘ Performance for cluster
vm_memory_high_watermark.relative = 0.6
channel_max = 2047
heartbeat = 60
# π Cluster communication
cluster_formation.classic_config.nodes.1 = rabbit@NODE1_PRIVATE_IP
cluster_formation.classic_config.nodes.2 = rabbit@NODE2_PRIVATE_IP
cluster_formation.classic_config.nodes.3 = rabbit@NODE3_PRIVATE_IP
EOFInstall on a separate instance:
# Install HAProxy
sudo yum install -y haproxy
# Configure HAProxy
sudo tee /etc/haproxy/haproxy.cfg << 'EOF'
global
daemon
defaults
mode http
timeout connect 5000ms
timeout client 50000ms
timeout server 50000ms
# π RabbitMQ Management UI
frontend rabbitmq_management
bind *:15672
default_backend rabbitmq_management_backend
backend rabbitmq_management_backend
balance roundrobin
server rabbit1 NODE1_IP:15672 check
server rabbit2 NODE2_IP:15672 check
server rabbit3 NODE3_IP:15672 check
# π° RabbitMQ AMQP
frontend rabbitmq_amqp
bind *:5672
mode tcp
default_backend rabbitmq_amqp_backend
backend rabbitmq_amqp_backend
mode tcp
balance roundrobin
server rabbit1 NODE1_IP:5672 check
server rabbit2 NODE2_IP:5672 check
server rabbit3 NODE3_IP:5672 check
EOF
# Start HAProxy
sudo systemctl start haproxy
sudo systemctl enable haproxyFor production, always use encryption:
# Create SSL directory
sudo mkdir -p /etc/rabbitmq/ssl
# Generate self-signed certificate (for testing)
sudo openssl req -new -x509 -days 365 -nodes \
-out /etc/rabbitmq/ssl/server_certificate.pem \
-keyout /etc/rabbitmq/ssl/server_key.pem \
-subj "/C=US/ST=CA/L=SF/O=MyCompany/CN=rabbitmq.mycompany.com"
# Set permissions
sudo chown -R rabbitmq:rabbitmq /etc/rabbitmq/ssl
sudo chmod 600 /etc/rabbitmq/ssl/server_key.pem
# Update configuration to use SSL
sudo tee -a /etc/rabbitmq/rabbitmq.conf << 'EOF'
# π SSL Configuration
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ssl/server_certificate.pem
ssl_options.certfile = /etc/rabbitmq/ssl/server_certificate.pem
ssl_options.keyfile = /etc/rabbitmq/ssl/server_key.pem
ssl_options.verify = verify_none
ssl_options.fail_if_no_peer_cert = false
# π Management UI SSL
management.ssl.port = 15671
management.ssl.cacertfile = /etc/rabbitmq/ssl/server_certificate.pem
management.ssl.certfile = /etc/rabbitmq/ssl/server_certificate.pem
management.ssl.keyfile = /etc/rabbitmq/ssl/server_key.pem
EOF
# Restart RabbitMQ
sudo systemctl restart rabbitmq-serverCreate different users for different purposes:
# Create application user
sudo rabbitmqctl add_user app_user secure_password123
sudo rabbitmqctl set_permissions -p / app_user ".*" ".*" ".*"
# Create read-only user for monitoring
sudo rabbitmqctl add_user monitor_user monitor_pass123
sudo rabbitmqctl set_user_tags monitor_user monitoring
sudo rabbitmqctl set_permissions -p / monitor_user "" "" ".*"
# List all users
sudo rabbitmqctl list_usersNow for the fun part! Let's write code to use RabbitMQ in different languages.
First, add RabbitMQ dependency to your project:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.18.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>dependencies {
implementation 'com.rabbitmq:amqp-client:5.18.0'
implementation 'org.slf4j:slf4j-simple:2.0.7'
}// RabbitMQConnection.java
package com.example.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConnection {
private Connection connection;
private Channel channel;
private ConnectionFactory factory;
public RabbitMQConnection(String host, int port, String username, String password) {
factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
// π Performance optimizations
factory.setRequestedHeartbeat(60);
factory.setConnectionTimeout(10000);
factory.setRequestedChannelMax(2047);
factory.setRequestedFrameMax(131072);
}
public void connect() throws IOException, TimeoutException {
System.out.println("π Connecting to RabbitMQ...");
connection = factory.newConnection();
channel = connection.createChannel();
// π Optimize channel
channel.basicQos(1000); // Prefetch count
System.out.println("β
Connected successfully!");
}
public void disconnect() throws IOException, TimeoutException {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
System.out.println("π Disconnected from RabbitMQ");
}
public Channel getChannel() {
return channel;
}
}// MessagePublisher.java
package com.example.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class MessagePublisher {
private RabbitMQConnection connection;
private Channel channel;
public MessagePublisher(RabbitMQConnection connection) {
this.connection = connection;
this.channel = connection.getChannel();
}
public void publishToQueue(String queueName, String message) throws IOException {
// ποΈ Create queue if it doesn't exist
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
args.put("x-queue-mode", "lazy"); // π For high throughput
channel.queueDeclare(queueName, true, false, false, args);
// π€ Publish message
channel.basicPublish("", queueName,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes(StandardCharsets.UTF_8));
System.out.println("π€ Sent: " + message);
}
public void publishToExchange(String exchangeName, String routingKey, String message)
throws IOException {
// ποΈ Create exchange if it doesn't exist
channel.exchangeDeclare(exchangeName, "topic", true);
// π€ Publish message
channel.basicPublish(exchangeName, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes(StandardCharsets.UTF_8));
System.out.println("π€ Sent to exchange '" + exchangeName + "': " + message);
}
}// MessageConsumer.java
package com.example.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class MessageConsumer {
private RabbitMQConnection connection;
private Channel channel;
public MessageConsumer(RabbitMQConnection connection) {
this.connection = connection;
this.channel = connection.getChannel();
}
public void consumeFromQueue(String queueName, MessageHandler handler) throws IOException {
// ποΈ Create queue if it doesn't exist
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
args.put("x-queue-mode", "lazy");
channel.queueDeclare(queueName, true, false, false, args);
System.out.println("π Waiting for messages from queue: " + queueName);
// π₯ Set up consumer
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
try {
System.out.println("π₯ Received: " + message);
// π Process message
handler.handle(message);
// β
Acknowledge message
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
System.err.println("β Error processing message: " + e.getMessage());
// π Reject and requeue
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true);
}
};
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
}
@FunctionalInterface
public interface MessageHandler {
void handle(String message) throws Exception;
}
}// Main.java
package com.example.rabbitmq;
public class Main {
public static void main(String[] args) {
try {
// π Connect to RabbitMQ
RabbitMQConnection connection = new RabbitMQConnection(
"YOUR_INSTANCE_IP", 5672, "admin", "mypassword123"
);
connection.connect();
// π€ Publisher example
MessagePublisher publisher = new MessagePublisher(connection);
publisher.publishToQueue("hello-queue", "Hello from Java! π");
// π₯ Consumer example
MessageConsumer consumer = new MessageConsumer(connection);
consumer.consumeFromQueue("hello-queue", message -> {
// π Your business logic here
System.out.println("π Processing: " + message);
Thread.sleep(1000); // Simulate work
System.out.println("β
Processed: " + message);
});
// π Keep program running
System.out.println("Press CTRL+C to exit...");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
connection.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}));
} catch (Exception e) {
e.printStackTrace();
}
}
}// BatchPublisher.java
package com.example.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class BatchPublisher {
private RabbitMQConnection connection;
private Channel channel;
private List<String> messageQueue;
private ScheduledExecutorService scheduler;
private int batchSize = 100;
private int flushInterval = 10; // milliseconds
public BatchPublisher(RabbitMQConnection connection) {
this.connection = connection;
this.channel = connection.getChannel();
this.messageQueue = new ArrayList<>();
this.scheduler = Executors.newScheduledThreadPool(1);
// π Schedule periodic flush
scheduler.scheduleAtFixedRate(this::flushBatch, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
}
public synchronized void publishBatch(String queueName, String message) throws IOException {
messageQueue.add(queueName + ":" + message);
if (messageQueue.size() >= batchSize) {
flushBatch();
}
}
private synchronized void flushBatch() {
if (messageQueue.isEmpty()) return;
try {
List<String> batch = new ArrayList<>(messageQueue);
messageQueue.clear();
for (String item : batch) {
String[] parts = item.split(":", 2);
String queueName = parts[0];
String message = parts[1];
channel.basicPublish("", queueName,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes(StandardCharsets.UTF_8));
}
System.out.println("π Published batch of " + batch.size() + " messages");
} catch (IOException e) {
System.err.println("β Error flushing batch: " + e.getMessage());
}
}
public void shutdown() {
flushBatch();
scheduler.shutdown();
}
}First, let's create a new Node.js project:
# Create new project directory
mkdir rabbitmq-nodejs
cd rabbitmq-nodejs
# Initialize npm project
npm init -y
# Install dependencies
npm install amqplib
npm install -D @types/amqplib typescript ts-node @types/node
# Create TypeScript config
npx tsc --initrabbitmq-nodejs/
βββ package.json
βββ tsconfig.json
βββ src/
β βββ connection.ts
β βββ publisher.ts
β βββ consumer.ts
β βββ examples/
β βββ simple-example.ts
β βββ advanced-example.ts
// src/connection.ts
import amqp, { Connection, Channel } from "amqplib";
export interface RabbitMQConfig {
host: string;
port: number;
username: string;
password: string;
vhost?: string;
ssl?: boolean;
heartbeat?: number;
connectionTimeout?: number;
}
export class RabbitMQConnection {
private connection: Connection | null = null;
private channel: Channel | null = null;
private config: RabbitMQConfig;
constructor(config: RabbitMQConfig) {
this.config = {
vhost: "/",
ssl: false,
heartbeat: 60,
connectionTimeout: 10000,
...config,
};
}
async connect(): Promise<void> {
try {
console.log("π Connecting to RabbitMQ...");
const connectionUrl = this.buildConnectionUrl();
this.connection = await amqp.connect(connectionUrl, {
heartbeat: this.config.heartbeat,
timeout: this.config.connectionTimeout,
});
// π Event listeners
this.connection.on("error", (err) => {
console.error("β RabbitMQ connection error:", err);
});
this.connection.on("close", () => {
console.log("π RabbitMQ connection closed");
});
this.channel = await this.connection.createChannel();
// π Optimize channel settings
await this.channel.prefetch(1000);
console.log("β
Successfully connected to RabbitMQ");
} catch (error) {
console.error("β Failed to connect to RabbitMQ:", error);
throw error;
}
}
async disconnect(): Promise<void> {
try {
if (this.channel) {
await this.channel.close();
}
if (this.connection) {
await this.connection.close();
}
console.log("π Disconnected from RabbitMQ");
} catch (error) {
console.error("β Error disconnecting from RabbitMQ:", error);
}
}
getChannel(): Channel {
if (!this.channel) {
throw new Error("β Channel not available. Call connect() first.");
}
return this.channel;
}
private buildConnectionUrl(): string {
const protocol = this.config.ssl ? "amqps" : "amqp";
return `${protocol}://${this.config.username}:${this.config.password}@${this.config.host}:${this.config.port}${this.config.vhost}`;
}
}// src/publisher.ts
import { Channel } from "amqplib";
import { RabbitMQConnection } from "./connection";
export interface PublishOptions {
persistent?: boolean;
priority?: number;
expiration?: string;
messageId?: string;
timestamp?: number;
headers?: Record<string, any>;
}
export class MessagePublisher {
private connection: RabbitMQConnection;
private channel: Channel;
constructor(connection: RabbitMQConnection) {
this.connection = connection;
this.channel = connection.getChannel();
}
async publishToQueue(
queueName: string,
message: any,
options: PublishOptions = {},
): Promise<boolean> {
try {
// ποΈ Ensure queue exists
await this.channel.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
"x-max-priority": 10,
"x-queue-mode": "lazy", // π For high throughput
},
});
const messageBuffer = Buffer.from(JSON.stringify(message));
const publishOptions = {
persistent: true,
deliveryMode: 2,
timestamp: Date.now(),
messageId: this.generateMessageId(),
...options,
};
const result = this.channel.sendToQueue(
queueName,
messageBuffer,
publishOptions,
);
if (result) {
console.log("π€ Message sent to queue:", queueName);
}
return result;
} catch (error) {
console.error("β Failed to publish message:", error);
throw error;
}
}
async publishToExchange(
exchangeName: string,
routingKey: string,
message: any,
options: PublishOptions = {},
): Promise<boolean> {
try {
// ποΈ Ensure exchange exists
await this.channel.assertExchange(exchangeName, "topic", {
durable: true,
autoDelete: false,
});
const messageBuffer = Buffer.from(JSON.stringify(message));
const publishOptions = {
persistent: true,
deliveryMode: 2,
timestamp: Date.now(),
messageId: this.generateMessageId(),
...options,
};
const result = this.channel.publish(
exchangeName,
routingKey,
messageBuffer,
publishOptions,
);
if (result) {
console.log(
`π€ Message sent to exchange '${exchangeName}' with routing key '${routingKey}'`,
);
}
return result;
} catch (error) {
console.error("β Failed to publish message to exchange:", error);
throw error;
}
}
private generateMessageId(): string {
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
}// src/consumer.ts
import { Channel, Message } from "amqplib";
import { RabbitMQConnection } from "./connection";
export interface ConsumeOptions {
noAck?: boolean;
exclusive?: boolean;
priority?: number;
consumerTag?: string;
}
export type MessageHandler = (
message: any,
originalMessage: Message,
) => Promise<void>;
export class MessageConsumer {
private connection: RabbitMQConnection;
private channel: Channel;
constructor(connection: RabbitMQConnection) {
this.connection = connection;
this.channel = connection.getChannel();
}
async consumeFromQueue(
queueName: string,
handler: MessageHandler,
options: ConsumeOptions = {},
): Promise<void> {
try {
// ποΈ Ensure queue exists
await this.channel.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
"x-max-priority": 10,
"x-queue-mode": "lazy",
},
});
const consumeOptions = {
noAck: false,
...options,
};
console.log(`π Waiting for messages from queue: ${queueName}`);
await this.channel.consume(
queueName,
async (msg) => {
if (msg) {
try {
const messageContent = JSON.parse(
msg.content.toString(),
);
console.log("π₯ Received message:", messageContent);
// π Process the message
await handler(messageContent, msg);
if (!consumeOptions.noAck) {
this.channel.ack(msg);
console.log("β
Message acknowledged");
}
} catch (error) {
console.error(
"β Error processing message:",
error,
);
if (!consumeOptions.noAck) {
// π Reject and requeue the message
this.channel.reject(msg, true);
console.log("π Message rejected and requeued");
}
}
}
},
consumeOptions,
);
} catch (error) {
console.error("β Failed to start consuming:", error);
throw error;
}
}
}// src/examples/simple-example.ts
import { RabbitMQConnection, MessagePublisher, MessageConsumer } from "../";
async function simpleExample() {
// π Configuration
const config = {
host: "YOUR_INSTANCE_IP",
port: 5672,
username: "admin",
password: "mypassword123",
};
// π Create connection
const connection = new RabbitMQConnection(config);
await connection.connect();
// π€ Create publisher
const publisher = new MessagePublisher(connection);
// π₯ Create consumer
const consumer = new MessageConsumer(connection);
// π€ Send a message
await publisher.publishToQueue("hello-queue", {
message: "Hello from Node.js! π",
timestamp: new Date(),
from: "Node.js App",
});
// π₯ Consume messages
await consumer.consumeFromQueue("hello-queue", async (message) => {
// π Your business logic here
console.log("π Processing message:", message);
// Simulate some work
await new Promise((resolve) => setTimeout(resolve, 1000));
console.log("β
Message processed successfully!");
});
// π Graceful shutdown
process.on("SIGINT", async () => {
console.log("\nπ Shutting down gracefully...");
await connection.disconnect();
process.exit(0);
});
console.log("π Simple example running! Press CTRL+C to exit...");
}
// Run the example
simpleExample().catch(console.error);// src/examples/advanced-example.ts
import { RabbitMQConnection, MessagePublisher, MessageConsumer } from "../";
class HighPerformanceBatchPublisher {
private publisher: MessagePublisher;
private messageQueue: Array<{ queueName: string; message: any }> = [];
private batchSize = 100;
private batchTimeout = 10; // milliseconds
private batchTimer: NodeJS.Timeout | null = null;
constructor(publisher: MessagePublisher) {
this.publisher = publisher;
}
async publishBatch(queueName: string, message: any): Promise<void> {
this.messageQueue.push({ queueName, message });
if (this.messageQueue.length >= this.batchSize) {
await this.flushBatch();
} else if (!this.batchTimer) {
this.batchTimer = setTimeout(async () => {
await this.flushBatch();
}, this.batchTimeout);
}
}
private async flushBatch(): Promise<void> {
if (this.batchTimer) {
clearTimeout(this.batchTimer);
this.batchTimer = null;
}
if (this.messageQueue.length === 0) return;
const batch = this.messageQueue.splice(0, this.batchSize);
try {
const publishPromises = batch.map(({ queueName, message }) =>
this.publisher.publishToQueue(queueName, message),
);
await Promise.all(publishPromises);
console.log(`π Published batch of ${batch.length} messages`);
} catch (error) {
console.error("β Error publishing batch:", error);
}
}
async shutdown(): Promise<void> {
await this.flushBatch();
}
}
async function highPerformanceExample() {
const config = {
host: "YOUR_INSTANCE_IP",
port: 5672,
username: "admin",
password: "mypassword123",
};
const connection = new RabbitMQConnection(config);
await connection.connect();
const publisher = new MessagePublisher(connection);
const batchPublisher = new HighPerformanceBatchPublisher(publisher);
console.log("π Starting high-performance publishing...");
// Publish 10,000 messages rapidly
const startTime = Date.now();
for (let i = 0; i < 10000; i++) {
await batchPublisher.publishBatch("high-throughput-queue", {
id: i,
timestamp: Date.now(),
data: `High-speed message ${i}`,
batch: Math.floor(i / 100),
});
if (i % 1000 === 0) {
console.log(`π Queued ${i} messages...`);
}
}
await batchPublisher.shutdown();
const endTime = Date.now();
const duration = endTime - startTime;
const messagesPerSecond = Math.round(10000 / (duration / 1000));
console.log(`π Published 10,000 messages in ${duration}ms`);
console.log(`π Rate: ${messagesPerSecond} messages/second`);
await connection.disconnect();
}
// Run the example
highPerformanceExample().catch(console.error);First, let's set up our Python environment:
# Create virtual environment
python3 -m venv rabbitmq-python
cd rabbitmq-python
# Activate virtual environment
# On Windows:
Scripts\activate
# On Mac/Linux:
source bin/activate
# Install dependencies
pip install pika asyncio-pika aiofiles
# Create project structure
mkdir src examples
touch src/__init__.py
touch src/connection.py src/publisher.py src/consumer.pyrabbitmq-python/
βββ requirements.txt
βββ src/
β βββ __init__.py
β βββ connection.py
β βββ publisher.py
β βββ consumer.py
βββ examples/
βββ simple_example.py
βββ async_example.py
# src/connection.py
import pika
import logging
from typing import Optional, Dict, Any
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RabbitMQConnection:
"""
π° RabbitMQ Connection Manager
Simple and reliable connection handling for RabbitMQ
"""
def __init__(self, host: str, port: int = 5672, username: str = 'guest',
password: str = 'guest', virtual_host: str = '/'):
self.host = host
self.port = port
self.username = username
self.password = password
self.virtual_host = virtual_host
self.connection: Optional[pika.BlockingConnection] = None
self.channel: Optional[pika.channel.Channel] = None
def connect(self) -> None:
"""π Connect to RabbitMQ"""
try:
logger.info("π Connecting to RabbitMQ...")
# π Create credentials
credentials = pika.PlainCredentials(self.username, self.password)
# βοΈ Connection parameters
parameters = pika.ConnectionParameters(
host=self.host,
port=self.port,
virtual_host=self.virtual_host,
credentials=credentials,
heartbeat=600, # 10 minutes
blocked_connection_timeout=300, # 5 minutes
socket_timeout=10,
retry_delay=5,
connection_attempts=3
)
# π Establish connection
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
# π Optimize channel
self.channel.basic_qos(prefetch_count=1000)
logger.info("β
Successfully connected to RabbitMQ")
except Exception as e:
logger.error(f"β Failed to connect to RabbitMQ: {e}")
raise
def disconnect(self) -> None:
"""π Disconnect from RabbitMQ"""
try:
if self.channel and not self.channel.is_closed:
self.channel.close()
if self.connection and not self.connection.is_closed:
self.connection.close()
logger.info("π Disconnected from RabbitMQ")
except Exception as e:
logger.error(f"β Error disconnecting from RabbitMQ: {e}")
def get_channel(self) -> pika.channel.Channel:
"""π‘ Get the channel"""
if not self.channel or self.channel.is_closed:
raise Exception("β Channel not available. Call connect() first.")
return self.channel
def __enter__(self):
"""π Context manager entry"""
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""π Context manager exit"""
self.disconnect()# src/publisher.py
import json
import uuid
from datetime import datetime
from typing import Any, Dict, Optional
from .connection import RabbitMQConnection, logger
class MessagePublisher:
"""
π€ RabbitMQ Message Publisher
Easy-to-use publisher for sending messages
"""
def __init__(self, connection: RabbitMQConnection):
self.connection = connection
self.channel = connection.get_channel()
def publish_to_queue(self, queue_name: str, message: Any,
priority: int = 0, persistent: bool = True) -> bool:
"""π€ Publish message to a queue"""
try:
# ποΈ Declare queue with optimizations
self.channel.queue_declare(
queue=queue_name,
durable=True,
auto_delete=False,
arguments={
'x-max-priority': 10,
'x-queue-mode': 'lazy' # π High throughput mode
}
)
# π¦ Prepare message
message_body = json.dumps({
'id': str(uuid.uuid4()),
'timestamp': datetime.now().isoformat(),
'data': message
})
# π€ Publish message
result = self.channel.basic_publish(
exchange='',
routing_key=queue_name,
body=message_body,
properties=pika.BasicProperties(
delivery_mode=2 if persistent else 1, # Persistent or not
priority=priority,
message_id=str(uuid.uuid4()),
timestamp=int(datetime.now().timestamp())
)
)
logger.info(f"π€ Message sent to queue: {queue_name}")
return True
except Exception as e:
logger.error(f"β Failed to publish message: {e}")
return False
def publish_to_exchange(self, exchange_name: str, routing_key: str,
message: Any, priority: int = 0) -> bool:
"""π€ Publish message to an exchange"""
try:
# ποΈ Declare exchange
self.channel.exchange_declare(
exchange=exchange_name,
exchange_type='topic',
durable=True,
auto_delete=False
)
# π¦ Prepare message
message_body = json.dumps({
'id': str(uuid.uuid4()),
'timestamp': datetime.now().isoformat(),
'routing_key': routing_key,
'data': message
})
# π€ Publish message
self.channel.basic_publish(
exchange=exchange_name,
routing_key=routing_key,
body=message_body,
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
priority=priority,
message_id=str(uuid.uuid4()),
timestamp=int(datetime.now().timestamp())
)
)
logger.info(f"π€ Message sent to exchange '{exchange_name}' with routing key '{routing_key}'")
return True
except Exception as e:
logger.error(f"β Failed to publish to exchange: {e}")
return False# src/consumer.py
import json
import signal
import sys
from typing import Callable, Any
from .connection import RabbitMQConnection, logger
MessageHandler = Callable[[Dict[str, Any]], None]
class MessageConsumer:
"""
π₯ RabbitMQ Message Consumer
Easy-to-use consumer for receiving messages
"""
def __init__(self, connection: RabbitMQConnection):
self.connection = connection
self.channel = connection.get_channel()
self.consuming = False
def consume_from_queue(self, queue_name: str, handler: MessageHandler,
auto_ack: bool = False) -> None:
"""π₯ Consume messages from a queue"""
try:
# ποΈ Declare queue
self.channel.queue_declare(
queue=queue_name,
durable=True,
auto_delete=False,
arguments={
'x-max-priority': 10,
'x-queue-mode': 'lazy'
}
)
def callback(ch, method, properties, body):
"""π Message callback handler"""
try:
# π¦ Parse message
message = json.loads(body.decode('utf-8'))
logger.info(f"π₯ Received message: {message.get('id', 'unknown')}")
# π Process message
handler(message)
# β
Acknowledge message
if not auto_ack:
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info("β
Message acknowledged")
except json.JSONDecodeError as e:
logger.error(f"β Failed to parse message: {e}")
if not auto_ack:
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
logger.error(f"β Error processing message: {e}")
if not auto_ack:
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
# π₯ Set up consumer
self.channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=auto_ack
)
logger.info(f"π Waiting for messages from queue: {queue_name}")
logger.info("π Press CTRL+C to exit")
# π Start consuming
self.consuming = True
# π Handle graceful shutdown
def signal_handler(sig, frame):
logger.info("π Stopping consumer...")
self.consuming = False
self.channel.stop_consuming()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# π Start consuming loop
while self.consuming:
try:
self.channel.start_consuming()
except KeyboardInterrupt:
logger.info("π KeyboardInterrupt received")
break
except Exception as e:
logger.error(f"β Consumer error: {e}")
break
logger.info("π Consumer stopped")
except Exception as e:
logger.error(f"β Failed to start consuming: {e}")
raise# examples/simple_example.py
import sys
import os
import time
# Add src to path
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src'))
from connection import RabbitMQConnection
from publisher import MessagePublisher
from consumer import MessageConsumer
def message_handler(message):
"""π Handle incoming messages"""
print(f"π Processing message: {message}")
# Simulate some work
time.sleep(1)
print(f"β
Processed message: {message.get('id', 'unknown')}")
def main():
"""π Main function"""
# π Configuration
config = {
'host': 'YOUR_INSTANCE_IP',
'port': 5672,
'username': 'admin',
'password': 'mypassword123'
}
# π Create connection using context manager
with RabbitMQConnection(**config) as connection:
# π€ Create publisher and send a message
publisher = MessagePublisher(connection)
success = publisher.publish_to_queue('hello-queue', {
'message': 'Hello from Python! π',
'from': 'Python App',
'version': '1.0'
})
if success:
print("π€ Message sent successfully!")
# π₯ Create consumer and start listening
consumer = MessageConsumer(connection)
consumer.consume_from_queue('hello-queue', message_handler)
if __name__ == '__main__':
main()# examples/async_example.py
import asyncio
import aiofiles
import json
from datetime import datetime
from asyncio_pika import connect_robust, Message, ExchangeType
from asyncio_pika.abc import AbstractIncomingMessage
class AsyncRabbitMQClient:
"""β‘ High-performance async RabbitMQ client"""
def __init__(self, connection_url: str):
self.connection_url = connection_url
self.connection = None
self.channel = None
async def connect(self):
"""π Connect to RabbitMQ"""
print("π Connecting to RabbitMQ (async)...")
self.connection = await connect_robust(
self.connection_url,
heartbeat=600,
client_properties={"connection_name": "async-python-client"}
)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=1000)
print("β
Connected successfully (async)!")
async def disconnect(self):
"""π Disconnect from RabbitMQ"""
if self.connection:
await self.connection.close()
print("π Disconnected (async)")
async def publish_batch(self, queue_name: str, messages: list):
"""π Publish multiple messages efficiently"""
try:
# ποΈ Declare queue
queue = await self.channel.declare_queue(
queue_name,
durable=True,
arguments={
'x-max-priority': 10,
'x-queue-mode': 'lazy'
}
)
# π€ Send messages in batch
tasks = []
for i, msg_data in enumerate(messages):
message_body = json.dumps({
'id': f'batch-{i}',
'timestamp': datetime.now().isoformat(),
'data': msg_data
})
message = Message(
message_body.encode(),
delivery_mode=2, # Persistent
priority=5
)
tasks.append(queue.publish(message))
# π Execute all publishes concurrently
await asyncio.gather(*tasks)
print(f"π Published {len(messages)} messages in batch")
except Exception as e:
print(f"β Batch publish error: {e}")
async def consume_async(self, queue_name: str, handler):
"""π₯ Async message consumer"""
try:
# ποΈ Declare queue
queue = await self.channel.declare_queue(
queue_name,
durable=True,
arguments={
'x-max-priority': 10,
'x-queue-mode': 'lazy'
}
)
async def process_message(message: AbstractIncomingMessage):
"""π Process incoming message"""
try:
# π¦ Parse message
body = json.loads(message.body.decode())
print(f"π₯ Received (async): {body.get('id', 'unknown')}")
# π Process with handler
await handler(body)
# β
Acknowledge
await message.ack()
print(f"β
Processed (async): {body.get('id', 'unknown')}")
except Exception as e:
print(f"β Error processing message: {e}")
await message.reject(requeue=True)
# π₯ Start consuming
await queue.consume(process_message)
print(f"π Listening for messages (async): {queue_name}")
except Exception as e:
print(f"β Consumer error: {e}")
async def async_message_handler(message):
"""β‘ Async message handler"""
print(f"π Processing async message: {message}")
# Simulate async work
await asyncio.sleep(0.1)
print(f"β
Async processing complete: {message.get('id', 'unknown')}")
async def main():
"""π Async main function"""
connection_url = "amqp://admin:mypassword123@YOUR_INSTANCE_IP:5672/"
client = AsyncRabbitMQClient(connection_url)
try:
await client.connect()
# π Batch publishing example
messages = [
{'type': 'order', 'id': i, 'amount': i * 10}
for i in range(1000)
]
start_time = asyncio.get_event_loop().time()
await client.publish_batch('async-queue', messages)
end_time = asyncio.get_event_loop().time()
duration = end_time - start_time
rate = len(messages) / duration
print(f"π Async publish rate: {rate:.0f} messages/second")
# π₯ Start consuming
await client.consume_async('async-queue', async_message_handler)
# Keep running
await asyncio.sleep(60) # Run for 1 minute
finally:
await client.disconnect()
if __name__ == '__main__':
asyncio.run(main())Before diving deep, let's do some quick checks to see if everything is working:
# π Check if RabbitMQ is running
sudo systemctl status rabbitmq-server
# π Check cluster status (if using cluster)
sudo rabbitmqctl cluster_status
# π Check node status
sudo rabbitmqctl status
# π List all queues
sudo rabbitmqctl list_queues name messages consumers
# π₯ List connections
sudo rabbitmqctl list_connections name peer_host peer_port state# π Test AMQP port (5672)
telnet YOUR_INSTANCE_IP 5672
# π Test Management UI port (15672)
curl -I http://YOUR_INSTANCE_IP:15672
# π Check listening ports
sudo netstat -tlnp | grep -E "(5672|15672|25672)"# πΎ Memory usage
free -h
# π» CPU usage
top -bn1 | grep "Cpu(s)"
# π½ Disk usage
df -h
# π Disk I/O
iostat -x 1 5
# π Network statistics
ifstat 1 5# π Queue statistics with details
sudo rabbitmqctl list_queues name messages messages_ready messages_unacknowledged consumers memory
# π Connection details
sudo rabbitmqctl list_connections recv_oct send_oct state
# π‘ Channel information
sudo rabbitmqctl list_channels connection messages_unacknowledged
# π’ Exchange information
sudo rabbitmqctl list_exchanges name type durable auto_deleteCreate a comprehensive monitoring script:
# Create monitoring script
sudo tee /usr/local/bin/rabbitmq-health-check.sh << 'EOF'
#!/bin/bash
# π¨ Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
echo "π° RabbitMQ Health Check Report"
echo "================================"
echo "π
Date: $(date)"
echo ""
# π₯ Service Health
echo -e "${BLUE}π₯ Service Health${NC}"
if systemctl is-active --quiet rabbitmq-server; then
echo -e "β
RabbitMQ service is ${GREEN}running${NC}"
else
echo -e "β RabbitMQ service is ${RED}not running${NC}"
exit 1
fi
# πΎ Memory Usage
echo -e "\n${BLUE}πΎ Memory Usage${NC}"
MEMORY_TOTAL=$(free -m | grep Mem | awk '{print $2}')
MEMORY_USED=$(free -m | grep Mem | awk '{print $3}')
MEMORY_PERCENT=$(echo "scale=1; $MEMORY_USED * 100 / $MEMORY_TOTAL" | bc)
echo "Total Memory: ${MEMORY_TOTAL}MB"
echo "Used Memory: ${MEMORY_USED}MB (${MEMORY_PERCENT}%)"
if (( $(echo "$MEMORY_PERCENT > 80" | bc -l) )); then
echo -e "β οΈ ${YELLOW}High memory usage!${NC}"
fi
# π½ Disk Usage
echo -e "\n${BLUE}π½ Disk Usage${NC}"
DISK_USAGE=$(df -h /var/lib/rabbitmq | awk 'NR==2 {print $5}' | sed 's/%//')
echo "RabbitMQ Data Directory: ${DISK_USAGE}% used"
if (( DISK_USAGE > 80 )); then
echo -e "β οΈ ${YELLOW}High disk usage!${NC}"
fi
# π Connection Count
echo -e "\n${BLUE}π Connections${NC}"
CONNECTION_COUNT=$(rabbitmqctl list_connections 2>/dev/null | grep -v "Listing connections" | wc -l)
echo "Active connections: $CONNECTION_COUNT"
# π Queue Information
echo -e "\n${BLUE}π Top 10 Queues by Message Count${NC}"
rabbitmqctl list_queues name messages 2>/dev/null | grep -v "Listing queues" | sort -k2 -nr | head -10
# π Performance Metrics
echo -e "\n${BLUE}π Performance Metrics${NC}"
RABBIT_STATUS=$(rabbitmqctl status 2>/dev/null)
echo "Node uptime: $(echo "$RABBIT_STATUS" | grep uptime | head -1)"
echo ""
echo "β
Health check completed!"
EOF
# Make executable
sudo chmod +x /usr/local/bin/rabbitmq-health-check.sh
# Run the health check
sudo /usr/local/bin/rabbitmq-health-check.shπ Symptoms:
- Connection refused errors
- Timeouts when connecting
π οΈ Solutions:
# 1. Check if service is running
sudo systemctl status rabbitmq-server
# 2. If not running, start it
sudo systemctl start rabbitmq-server
# 3. Check firewall
sudo iptables -L | grep 5672
# 4. Check logs for errors
sudo tail -f /var/log/rabbitmq/rabbit@$(hostname).log
# 5. Test local connection
rabbitmqctl statusπ Symptoms:
- System becomes slow
- RabbitMQ stops accepting connections
- Memory alarms triggered
π οΈ Solutions:
# 1. Check memory watermark
rabbitmqctl status | grep memory
# 2. Lower memory threshold temporarily
rabbitmqctl set_vm_memory_high_watermark 0.4
# 3. Purge unnecessary queues
rabbitmqctl purge_queue QUEUE_NAME
# 4. Restart RabbitMQ if critically low
sudo systemctl restart rabbitmq-serverπ Symptoms:
- Slow message processing
- High queue depths
- Consumer lag
π οΈ Solutions:
# 1. Check queue modes
rabbitmqctl list_queues name arguments | grep queue-mode
# 2. Enable lazy queues for high throughput
rabbitmqctl set_policy lazy-queue ".*" '{"queue-mode":"lazy"}' --apply-to queues
# 3. Check consumer prefetch
rabbitmqctl list_channels pid prefetch_count
# 4. Monitor message rates
watch -n 1 'rabbitmqctl list_queues name messages_ready messages_unacknowledged'# Install performance testing tool
wget https://github.com/rabbitmq/rabbitmq-perf-test/releases/latest/download/perf-test-latest.jar
# Simple performance test
java -jar perf-test-latest.jar \
--uri amqp://admin:mypassword123@YOUR_INSTANCE_IP:5672 \
--producers 10 \
--consumers 10 \
--rate 1000 \
--time 60
# High-throughput test
java -jar perf-test-latest.jar \
--uri amqp://admin:mypassword123@YOUR_INSTANCE_IP:5672 \
--producers 50 \
--consumers 50 \
--rate 10000 \
--time 300 \
--queue-pattern 'perf-test-%d' \
--queue-pattern-from 1 \
--queue-pattern-to 10Create a simple web dashboard to monitor your RabbitMQ:
# Install Node.js monitoring dashboard
sudo yum install -y nodejs npm
# Create monitoring app
mkdir rabbitmq-monitor
cd rabbitmq-monitor
npm init -y
npm install express axios
# Create simple dashboard
cat > app.js << 'EOF'
const express = require('express');
const axios = require('axios');
const app = express();
// RabbitMQ Management API
const RABBITMQ_API = 'http://admin:mypassword123@localhost:15672/api';
app.get('/', (req, res) => {
res.send(`
<html>
<head>
<title>π° RabbitMQ Monitor</title>
<meta http-equiv="refresh" content="5">
<style>
body { font-family: Arial; margin: 20px; }
.metric { background: #f5f5f5; padding: 10px; margin: 10px 0; border-radius: 5px; }
.healthy { border-left: 5px solid green; }
.warning { border-left: 5px solid orange; }
.error { border-left: 5px solid red; }
</style>
</head>
<body>
<h1>π° RabbitMQ Dashboard</h1>
<div id="metrics"></div>
<script>
fetch('/api/overview')
.then(r => r.json())
.then(data => {
document.getElementById('metrics').innerHTML =
'<div class="metric healthy">π Total Queues: ' + data.object_totals.queues + '</div>' +
'<div class="metric healthy">π Connections: ' + data.object_totals.connections + '</div>' +
'<div class="metric healthy">π¨ Messages: ' + data.queue_totals.messages + '</div>';
});
</script>
</body>
</html>
`);
});
app.get('/api/overview', async (req, res) => {
try {
const response = await axios.get(`${RABBITMQ_API}/overview`);
res.json(response.data);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.listen(8080, () => {
console.log('π Dashboard running at http://localhost:8080');
});
EOF
# Run the dashboard
node app.js &Set up automated alerts for critical issues:
# Create alert script
sudo tee /usr/local/bin/rabbitmq-alerts.sh << 'EOF'
#!/bin/bash
# π§ Email settings (configure these)
EMAIL="your-email@example.com"
SUBJECT_PREFIX="π¨ RabbitMQ Alert"
# π― Thresholds
MEMORY_THRESHOLD=80
DISK_THRESHOLD=80
QUEUE_DEPTH_THRESHOLD=10000
# πΎ Check memory usage
MEMORY_PERCENT=$(free | grep Mem | awk '{printf "%.0f", $3/$2 * 100.0}')
if [ $MEMORY_PERCENT -gt $MEMORY_THRESHOLD ]; then
echo "High memory usage: ${MEMORY_PERCENT}%" | mail -s "${SUBJECT_PREFIX}: High Memory" $EMAIL
fi
# π½ Check disk usage
DISK_PERCENT=$(df -h /var/lib/rabbitmq | awk 'NR==2 {print $5}' | sed 's/%//')
if [ $DISK_PERCENT -gt $DISK_THRESHOLD ]; then
echo "High disk usage: ${DISK_PERCENT}%" | mail -s "${SUBJECT_PREFIX}: High Disk" $EMAIL
fi
# π Check queue depths
rabbitmqctl list_queues name messages 2>/dev/null | while read queue messages; do
if [[ $messages =~ ^[0-9]+$ ]] && [ $messages -gt $QUEUE_DEPTH_THRESHOLD ]; then
echo "Queue $queue has $messages messages" | mail -s "${SUBJECT_PREFIX}: High Queue Depth" $EMAIL
fi
done
EOF
sudo chmod +x /usr/local/bin/rabbitmq-alerts.sh
# Add to crontab (check every 5 minutes)
echo "*/5 * * * * /usr/local/bin/rabbitmq-alerts.sh" | sudo crontab -Understanding your actual usage is key to saving money:
# Monitor actual resource usage over time
cat > monitor-usage.sh << 'EOF'
#!/bin/bash
LOG_FILE="/tmp/rabbitmq-usage.log"
echo "$(date),$(free | grep Mem | awk '{printf "%.1f", $3/$2 * 100}'),$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1),$(df -h /var/lib/rabbitmq | awk 'NR==2 {print $5}' | sed 's/%//')" >> $LOG_FILE
EOF
chmod +x monitor-usage.sh
# Run every minute for a week to gather data
echo "* * * * * /path/to/monitor-usage.sh" | crontab -# Create cost calculator script
cat > cost-calculator.sh << 'EOF'
#!/bin/bash
# π° AWS EC2 On-Demand Pricing (USD per hour, us-east-1)
declare -A ON_DEMAND_PRICES=(
["t3.medium"]=0.0416
["c5n.large"]=0.108
["c5n.xlarge"]=0.216
["c5n.2xlarge"]=0.432
["c5n.4xlarge"]=0.864
)
# π΅ Reserved Instance Pricing (1-year term, no upfront)
declare -A RESERVED_PRICES=(
["t3.medium"]=0.0301
["c5n.large"]=0.078
["c5n.xlarge"]=0.156
["c5n.2xlarge"]=0.312
["c5n.4xlarge"]=0.624
)
# π― Spot Instance Pricing (approximate)
declare -A SPOT_PRICES=(
["t3.medium"]=0.0125
["c5n.large"]=0.0324
["c5n.xlarge"]=0.0648
["c5n.2xlarge"]=0.1296
["c5n.4xlarge"]=0.2592
)
echo "π° RabbitMQ Cost Calculator"
echo "=========================="
for instance in "${!ON_DEMAND_PRICES[@]}"; do
on_demand_monthly=$(echo "scale=2; ${ON_DEMAND_PRICES[$instance]} * 24 * 30" | bc)
reserved_monthly=$(echo "scale=2; ${RESERVED_PRICES[$instance]} * 24 * 30" | bc)
spot_monthly=$(echo "scale=2; ${SPOT_PRICES[$instance]} * 24 * 30" | bc)
reserved_savings=$(echo "scale=1; ($on_demand_monthly - $reserved_monthly) / $on_demand_monthly * 100" | bc)
spot_savings=$(echo "scale=1; ($on_demand_monthly - $spot_monthly) / $on_demand_monthly * 100" | bc)
echo ""
echo "π₯οΈ $instance:"
echo " On-Demand: \$${on_demand_monthly}/month"
echo " Reserved: \$${reserved_monthly}/month (${reserved_savings}% savings)"
echo " Spot: \$${spot_monthly}/month (${spot_savings}% savings)"
done
EOF
chmod +x cost-calculator.sh
./cost-calculator.shSet up automatic scaling based on queue depth:
# Create auto-scaling policy
aws autoscaling create-auto-scaling-group \
--auto-scaling-group-name rabbitmq-asg \
--launch-template LaunchTemplateName=rabbitmq-template,Version=1 \
--min-size 1 \
--max-size 5 \
--desired-capacity 2 \
--target-group-arns arn:aws:elasticloadbalancing:region:account:targetgroup/rabbitmq-targets/50dc6c495c0c9188
# Create scaling policies
aws autoscaling put-scaling-policy \
--auto-scaling-group-name rabbitmq-asg \
--policy-name scale-up \
--policy-type StepScaling \
--adjustment-type ChangeInCapacity \
--step-adjustments MetricIntervalLowerBound=0,ScalingAdjustment=1
# Create CloudWatch alarm
aws cloudwatch put-metric-alarm \
--alarm-name rabbitmq-high-queue-depth \
--alarm-description "Scale up when queue depth is high" \
--metric-name QueueDepth \
--namespace AWS/RabbitMQ \
--statistic Average \
--period 300 \
--threshold 1000 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 2# Use gp3 volumes for better cost/performance
aws ec2 create-volume \
--size 100 \
--volume-type gp3 \
--iops 3000 \
--throughput 125 \
--availability-zone us-east-1a \
--tag-specifications 'ResourceType=volume,Tags=[{Key=Name,Value=rabbitmq-data}]'
# Enable EBS optimization
aws ec2 modify-instance-attribute \
--instance-id i-1234567890abcdef0 \
--ebs-optimizedFor predictable workloads, use scheduled scaling:
# Scale up during business hours
aws autoscaling put-scheduled-update-group-action \
--auto-scaling-group-name rabbitmq-asg \
--scheduled-action-name scale-up-business-hours \
--recurrence "0 8 * * MON-FRI" \
--desired-capacity 3
# Scale down after hours
aws autoscaling put-scheduled-update-group-action \
--auto-scaling-group-name rabbitmq-asg \
--scheduled-action-name scale-down-after-hours \
--recurrence "0 18 * * MON-FRI" \
--desired-capacity 1For non-critical workloads, use spot instances to save up to 90%:
# Create spot fleet configuration
cat > spot-fleet-config.json << 'EOF'
{
"SpotFleetRequestConfig": {
"IamFleetRole": "arn:aws:iam::account:role/aws-ec2-spot-fleet-tagging-role",
"AllocationStrategy": "diversified",
"TargetCapacity": 2,
"SpotPrice": "0.05",
"LaunchSpecifications": [
{
"ImageId": "ami-0abcdef1234567890",
"InstanceType": "c5n.large",
"KeyName": "my-key-pair",
"SecurityGroups": [{"GroupId": "sg-12345678"}],
"SubnetId": "subnet-12345678",
"UserData": "base64-encoded-startup-script"
}
]
}
}
EOF
# Request spot fleet
aws ec2 request-spot-fleet --spot-fleet-request-config file://spot-fleet-config.jsonCreate a cost tracking dashboard:
# Create cost monitoring script
cat > cost-monitor.sh << 'EOF'
#!/bin/bash
# Get current month's usage
CURRENT_MONTH=$(date +%Y-%m)
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
# Get cost data
aws ce get-cost-and-usage \
--time-period Start=${CURRENT_MONTH}-01,End=$(date +%Y-%m-%d) \
--granularity MONTHLY \
--metrics BlendedCost \
--group-by Type=DIMENSION,Key=SERVICE \
--filter file://cost-filter.json
# Cost filter for EC2 only
cat > cost-filter.json << 'FILTER'
{
"Dimensions": {
"Key": "SERVICE",
"Values": ["Amazon Elastic Compute Cloud - Compute"]
}
}
FILTER
echo "π° Current month EC2 costs:"
aws ce get-cost-and-usage \
--time-period Start=${CURRENT_MONTH}-01,End=$(date +%Y-%m-%d) \
--granularity MONTHLY \
--metrics BlendedCost \
--filter file://cost-filter.json \
--query 'ResultsByTime[0].Total.BlendedCost.Amount' \
--output text
# Clean up
rm cost-filter.json
EOF
chmod +x cost-monitor.sh- Use appropriate EC2 instance types (c5n family for network performance)
- Configure proper queue arguments (
x-queue-mode: lazyfor high throughput) - Implement message batching for bulk operations
- Use connection pooling for high-concurrency applications
- Configure appropriate prefetch values
- Always use SSL/TLS in production
- Implement proper authentication and authorization
- Use VPC security groups to restrict access
- Regularly update RabbitMQ and dependencies
- Monitor for suspicious activity
- Implement proper error handling and retry logic
- Use message acknowledgments appropriately
- Set up monitoring and alerting
- Implement health checks
- Plan for disaster recovery
- Right-size your instances based on actual usage
- Use spot instances for non-critical workloads
- Consider reserved instances for predictable workloads
- Monitor resource utilization regularly
- Implement auto-scaling where appropriate
- Set up comprehensive logging
- Monitor queue depths and processing rates
- Track connection counts and resource usage
- Implement alerting for critical metrics
- Regular performance benchmarking
Congratulations! π You've now learned how to:
- β Set up RabbitMQ on AWS EC2 (both single and cluster)
- β Write applications in Java, Node.js/TypeScript, and Python
- β Monitor and troubleshoot your deployment
- β Optimize for performance and cost
- π Performance: Properly configured RabbitMQ on EC2 can handle 100k+ messages/second
- π° Cost: Self-hosting can save 60-80% compared to managed services
- π Scalability: Easy to scale horizontally with clustering
- π‘οΈ Reliability: Proper configuration ensures high availability
- π§ͺ Test Everything: Start with a small instance and test your application
- π Monitor: Set up monitoring before going to production
- π Secure: Implement proper security measures
- π Scale: Use the patterns you've learned to scale as needed
- π RabbitMQ Documentation
- π¬ RabbitMQ Community
- π GitHub Issues
Remember: Always test thoroughly in a staging environment before deploying to production, and continuously monitor your setup to ensure optimal performance! π―
Happy messaging! π°β¨