Complete guide to understanding message flow, publisher/consumer patterns, and error handling in this RabbitMQ setup.
- Message Publishing Flow
- Message Consumption Flow
- Dead-Letter Queue (DLQ) Workflow
- Error Handling & Edge Cases
- PHP Implementation Examples
- Monitoring & Debugging
graph LR
P[Publisher] -->|"publish()"| E[product.exchange<br/>topic]
E -->|"product.create"| Q1[product.create<br/>queue]
E -->|"product.update"| Q2[product.update<br/>queue]
E -->|"product.delete"| Q3[product.delete<br/>queue]
E -->|"product.inventory.*"| Q4[product.inventory<br/>queue]
E -.->|"no route match"| P
Q1 --> C1[Consumer]
Q2 --> C2[Consumer]
Q3 --> C3[Consumer]
Q4 --> C4[Consumer]
style E fill:#ff9800
style Q1 fill:#4caf50
style Q2 fill:#4caf50
style Q3 fill:#4caf50
style Q4 fill:#4caf50
style P fill:#2196f3
Publisher → product.exchange → Routing Key Match → Queue → Consumer
↓ (on routing failure)
Returns to Publisher (if mandatory=true)
$connection = new AMQPConnection([
'host' => getenv('RABBITMQ_HOST'),
'port' => 5672,
'vhost' => '/product_sync',
'login' => getenv('RABBITMQ_PUBLISHER_USER'), // 'publisher'
'password' => getenv('RABBITMQ_PUBLISHER_PASS')
]);
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('product.exchange');
// Publish to product.create queue
$message = json_encode([
'product_id' => 12345,
'name' => 'Example Product',
'price' => 29.99,
'timestamp' => time()
]);
$routingKey = 'product.create';
$exchange->publish(
$message,
$routingKey,
AMQP_NOPARAM,
[
'delivery_mode' => 2, // Persistent (survives broker restart)
'content_type' => 'application/json',
'timestamp' => time(),
'app_id' => 'product-sync-publisher'
]
);

The product.exchange (type: topic) matches the routing key against the queue bindings:
| Routing Key | Destination Queue |
|---|---|
| product.create | product.create |
| product.update | product.update |
| product.delete | product.delete |
| product.inventory.* | product.inventory |
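To make the `product.inventory.*` binding concrete, the sketch below models how a topic exchange compares a routing key with a binding pattern. The broker does this matching itself. The function names `topicMatches()` and `matchWords()` are illustrative only and not part of any library. The rules modelled are the standard ones: `*` matches exactly one word, `#` matches zero or more words.

```php
<?php
// Illustrative model of topic-exchange matching; not a library API.
function topicMatches(string $pattern, string $routingKey): bool
{
    $patternWords = $pattern === '' ? [] : explode('.', $pattern);
    $keyWords = $routingKey === '' ? [] : explode('.', $routingKey);

    return matchWords($patternWords, $keyWords);
}

function matchWords(array $patternWords, array $keyWords): bool
{
    if ($patternWords === []) {
        // Pattern exhausted: match only if the key is exhausted too
        return $keyWords === [];
    }

    $head = $patternWords[0];
    $rest = array_slice($patternWords, 1);

    if ($head === '#') {
        // '#' may absorb zero or more words of the routing key
        for ($i = 0; $i <= count($keyWords); $i++) {
            if (matchWords($rest, array_slice($keyWords, $i))) {
                return true;
            }
        }
        return false;
    }

    if ($keyWords === []) {
        return false;
    }

    if ($head === '*' || $head === $keyWords[0]) {
        // '*' consumes exactly one word; a literal must be equal
        return matchWords($rest, array_slice($keyWords, 1));
    }

    return false;
}
```

Under these rules `topicMatches('product.inventory.*', 'product.inventory.adjust')` is true, while the bare key `product.inventory` does not match because `*` requires a third word.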
Quorum Queue Properties:
- Durable: Messages survive a broker restart
- Replicated: Across nodes (for multi-node clusters)
- Disk-backed: x-max-in-memory-length: 0 forces disk storage
- TTL: 24 hours (86400000ms), after which an unconsumed message expires and is dead-lettered
- Max Length: 100,000 messages
- Overflow: reject-publish (new messages are refused while the queue is full; the publisher is only notified if publisher confirms are enabled)
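The properties above could be expressed as queue arguments roughly like this. It is a sketch under assumptions: this guide does not show whether the values are set as queue arguments or through a policy, the function names `productQueueArguments()` and `hoursToMilliseconds()` are hypothetical, and the dead-letter exchange name is taken from the DLQ workflow described elsewhere in this guide.

```php
<?php
// Convert a duration in hours to the millisecond values RabbitMQ expects.
function hoursToMilliseconds(int $hours): int
{
    return $hours * 60 * 60 * 1000;
}

// Hypothetical helper: the argument set for one of the product.* work queues.
function productQueueArguments(): array
{
    return [
        'x-queue-type'           => 'quorum',
        'x-message-ttl'          => hoursToMilliseconds(24), // 86400000
        'x-max-length'           => 100000,
        'x-overflow'             => 'reject-publish',
        'x-delivery-limit'       => 5,
        'x-dead-letter-exchange' => 'product.dlx',
        'x-max-in-memory-length' => 0,
    ];
}
```

With ext-amqp, an array like this can be handed to `AMQPQueue::setArguments()` before the queue is declared. The same helper gives the 7-day DLQ retention as `hoursToMilliseconds(24 * 7)`, which is 604800000.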
graph TD
Q[Queue] -->|"fetch message"| C[Consumer]
C -->|"process"| D{Processing<br/>Result}
D -->|"success"| ACK[ACK]
D -->|"temporary error"| NACK[NACK<br/>requeue]
D -->|"permanent error"| REJ[REJECT<br/>no requeue]
ACK -->|"remove from queue"| DONE[✓ Complete]
NACK -->|"retry"| Q
REJ -->|"dead-letter"| DLX[product.dlx]
Q -.->|"5 delivery attempts"| DLX
Q -.->|"24h TTL expired"| DLX
DLX --> DLQ[product.failed<br/>DLQ]
style D fill:#ff9800
style ACK fill:#4caf50
style NACK fill:#ffc107
style REJ fill:#f44336
style DLX fill:#9c27b0
style DLQ fill:#e91e63
Queue → Consumer Fetches → Process Message → ACK/NACK/REJECT
↓ (on NACK/REJECT)
Requeue or Dead-Letter
$connection = new AMQPConnection([
'host' => getenv('RABBITMQ_HOST'),
'port' => 5672,
'vhost' => '/product_sync',
'login' => getenv('RABBITMQ_CONSUMER_USER'), // 'consumer'
'password' => getenv('RABBITMQ_CONSUMER_PASS')
]);
$connection->connect();
$channel = new AMQPChannel($connection);
$queue = new AMQPQueue($channel);
$queue->setName('product.create');
// Option A: Blocking consume (recommended for workers)
$queue->consume(function(AMQPEnvelope $message, AMQPQueue $queue) {
processMessage($message, $queue);
}, AMQP_NOPARAM);
// Option B: Get single message (for testing)
$message = $queue->get(AMQP_AUTOACK);

function processMessage(AMQPEnvelope $message, AMQPQueue $queue) {
try {
$data = json_decode($message->getBody(), true);
// Validate message
if (!isset($data['product_id'])) {
throw new InvalidArgumentException('Missing product_id');
}
// Process the product creation
createProductInDatabase($data);
// SUCCESS: Acknowledge message (removes from queue)
$queue->ack($message->getDeliveryTag());
echo "✓ Processed product #{$data['product_id']}\n";
} catch (InvalidArgumentException $e) {
// PERMANENT ERROR: Reject without requeue (goes to DLQ)
$queue->reject($message->getDeliveryTag(), AMQP_NOPARAM);
echo "✗ Invalid message: {$e->getMessage()}\n";
} catch (Exception $e) {
// TEMPORARY ERROR: Nack with requeue (retry)
$queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);
echo "⚠ Temporary error: {$e->getMessage()} - will retry\n";
}
}

| Action | PHP Command | Effect | Use Case |
|---|---|---|---|
| ACK | $queue->ack($deliveryTag) | Remove message from queue | Successfully processed |
| NACK | $queue->nack($deliveryTag, AMQP_REQUEUE) | Requeue message for retry | Temporary failure (DB down, network issue) |
| REJECT | $queue->reject($deliveryTag, AMQP_NOPARAM) | Send to DLQ (no requeue) | Permanent failure (invalid data, unrecoverable) |
| AUTO-ACK | AMQP_AUTOACK flag | Auto-remove on delivery | Testing only; not for production |
graph TD
Q1[product.create<br/>queue] --> DLX{Dead-Letter<br/>Triggers}
Q2[product.update<br/>queue] --> DLX
Q3[product.delete<br/>queue] --> DLX
Q4[product.inventory<br/>queue] --> DLX
DLX -->|"1. x-delivery-limit: 5"| T1[5 failed attempts]
DLX -->|"2. explicit reject"| T2[REJECT no requeue]
DLX -->|"3. TTL expired"| T3[24h unprocessed]
T1 --> DLXE[product.dlx<br/>fanout exchange]
T2 --> DLXE
T3 --> DLXE
DLXE --> DLQ[product.failed<br/>queue]
DLQ -->|"manual review"| INSPECT[Inspect & Fix]
INSPECT -->|"option 1"| REPUB[Republish to<br/>original queue]
INSPECT -->|"option 2"| DISC[Discard message]
INSPECT -->|"option 3"| LOG[Log & alert]
DLQ -.->|"7d TTL"| AUTO[Auto-delete]
DLQ -.->|"50k max-length"| DROP[Drop oldest]
style DLX fill:#9c27b0
style DLXE fill:#9c27b0
style DLQ fill:#e91e63
style T1 fill:#f44336
style T2 fill:#f44336
style T3 fill:#f44336
Messages are sent to product.failed queue via product.dlx exchange in these cases:
Queue Config: x-delivery-limit: 5
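Each time a message is returned to the queue (NACK or REJECT with requeue, or a consumer that disconnects before acknowledging), its delivery count goes up. Once the count reaches the limit of 5, RabbitMQ dead-letters the message instead of redelivering it.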
Example:
// First 4 attempts: NACK with requeue
$queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);
// 5th attempt: Automatically goes to product.failed (dead-letter exchange)

A consumer can also dead-letter a message explicitly by rejecting it without requeue:
// Consumer decides message is permanently invalid
$queue->reject($message->getDeliveryTag(), AMQP_NOPARAM);
// → Immediately goes to product.failed

Queue TTL: 24 hours (86400000ms)
If a message sits in product.create for 24 hours unprocessed:
product.create (24h expired) → product.dlx → product.failed
Queue Max Length: 100,000 messages
Overflow: reject-publish
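Note: With reject-publish, the broker refuses the new message, so it never enters the queue and is not dead-lettered. The publisher is only told about the rejection (via basic.nack) when publisher confirms are enabled on its channel; without confirms the message is dropped silently.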
| Property | Value | Purpose |
|---|---|---|
| TTL | 7 days (604800000ms) | Keep failed messages for investigation |
| Max Length | 50,000 messages | Prevent infinite growth |
| Overflow | drop-head | Drop oldest messages when full |
| No DLX | Not configured | Prevents circular dead-lettering |
// Connect as consumer
$connection = new AMQPConnection([
'host' => getenv('RABBITMQ_HOST'),
'port' => 5672,
'vhost' => '/product_sync',
'login' => getenv('RABBITMQ_CONSUMER_USER'),
'password' => getenv('RABBITMQ_CONSUMER_PASS')
]);
$connection->connect();
$channel = new AMQPChannel($connection);
$queue = new AMQPQueue($channel);
$queue->setName('product.failed');
// Inspect failed messages
while ($message = $queue->get(AMQP_NOPARAM)) {
echo "Failed Message:\n";
echo " Body: {$message->getBody()}\n";
echo " Routing Key: {$message->getRoutingKey()}\n";
echo " Exchange: {$message->getExchangeName()}\n";
// Headers contain original queue info
$headers = $message->getHeaders();
if (isset($headers['x-first-death-queue'])) {
echo " Original Queue: {$headers['x-first-death-queue']}\n";
}
// Pick ONE option per message. Acknowledging the same delivery tag twice
// makes the broker close the channel with a PRECONDITION_FAILED error.
// Option 1: Fix and republish
$data = json_decode($message->getBody(), true);
$data['retry_count'] = ($data['retry_count'] ?? 0) + 1;
republishToOriginalQueue($data, $headers['x-first-death-queue']);
$queue->ack($message->getDeliveryTag());
// Option 2: Log and discard
// logToFile($message);
// $queue->ack($message->getDeliveryTag());
// Option 3: Keep for manual review (don't ACK)
// break;
}

Scenario: Consumer receives message but crashes before ACK/NACK.
$queue->consume(function(AMQPEnvelope $message, AMQPQueue $queue) {
// Message delivered, consumer has it
$data = json_decode($message->getBody(), true);
// CRASH! Process killed here
// exit(1);
// Never reached ACK
$queue->ack($message->getDeliveryTag());
});

What Happens:
- RabbitMQ marks message as "unacked"
- When TCP connection closes, RabbitMQ requeues the message
- Another consumer (or same consumer after restart) gets it again
Prevention:
// Use prefetch to limit unacked messages
$channel->setPrefetchCount(1); // Only fetch 1 message at a time
// Implement graceful shutdown
// Without async signals (or explicit pcntl_signal_dispatch() calls) the handler never runs
pcntl_async_signals(true); // PHP >= 7.1
pcntl_signal(SIGTERM, function() use ($queue, $connection) {
echo "Shutting down gracefully...\n";
$queue->cancel(); // Stop consuming
$connection->disconnect();
exit(0);
});

Scenario: Publisher sends message but network drops before confirmation.
try {
$exchange->publish($message, $routingKey, AMQP_NOPARAM, [
'delivery_mode' => 2
]);
// Network failure here - did message arrive?
} catch (AMQPConnectionException $e) {
echo "Connection lost: {$e->getMessage()}\n";
// Should we retry? Is it a duplicate?
}

What Happens:
- Without Publisher Confirms: You don't know if message was stored
- With Publisher Confirms: (Requires advanced setup) RabbitMQ ACKs receipt
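As an illustration of the publisher-confirms option, the sketch below publishes one message and waits for the broker's answer. It assumes ext-amqp's `AMQPChannel::confirmSelect()`, `setConfirmCallback()` and `waitForConfirm()`. The function name `publishConfirmed()` and the 5-second default timeout are assumptions for this example, and the parameters are left untyped only so the function can be exercised with stand-in objects.

```php
<?php
/**
 * Publish one persistent message and report whether the broker confirmed it.
 *
 * @param AMQPChannel  $channel  channel the exchange was created on
 * @param AMQPExchange $exchange e.g. product.exchange
 */
function publishConfirmed($channel, $exchange, string $body, string $routingKey, float $timeoutSeconds = 5.0): bool
{
    $outcome = null;

    // Put the channel into confirm mode; the broker then answers every publish
    $channel->confirmSelect();
    $channel->setConfirmCallback(
        function ($deliveryTag, $multiple) use (&$outcome) {
            $outcome = 'ack';   // broker took responsibility for the message
            return false;       // returning false stops the wait loop
        },
        function ($deliveryTag, $multiple, $requeue) use (&$outcome) {
            $outcome = 'nack';  // broker refused it (for example, queue full)
            return false;
        }
    );

    $exchange->publish($body, $routingKey, AMQP_NOPARAM, ['delivery_mode' => 2]);

    // Blocks until a callback returns false; a timeout surfaces as an exception
    $channel->waitForConfirm($timeoutSeconds);

    return $outcome === 'ack';
}
```

A `false` return means the broker explicitly refused the message. An exception from `waitForConfirm()` means the outcome is unknown, which is the case the idempotent-message approach is meant to cover.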
Solution: Idempotent Messages
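The consumer code in this section calls `isDuplicateMessage()` and `recordProcessedMessage()`, which this guide does not define. One possible shape for them is sketched here, assuming a PDO connection in exception mode and a hypothetical `processed_messages` table keyed by message ID. Unlike the calls in this section, the sketch takes the PDO handle as an explicit first argument.

```php
<?php
// Hypothetical table: one row per message ID that has been fully processed.
function createProcessedMessagesTable(PDO $pdo): void
{
    $pdo->exec(
        'CREATE TABLE IF NOT EXISTS processed_messages (message_id VARCHAR(64) PRIMARY KEY)'
    );
}

// True when this message ID has already been recorded.
function isDuplicateMessage(PDO $pdo, string $messageId): bool
{
    $stmt = $pdo->prepare('SELECT 1 FROM processed_messages WHERE message_id = ?');
    $stmt->execute([$messageId]);

    return $stmt->fetchColumn() !== false;
}

// Records the ID; returns false if another worker recorded it first.
function recordProcessedMessage(PDO $pdo, string $messageId): bool
{
    try {
        $stmt = $pdo->prepare('INSERT INTO processed_messages (message_id) VALUES (?)');
        $stmt->execute([$messageId]);
        return true;
    } catch (PDOException $e) {
        // SQLSTATE 23000: integrity constraint violation (duplicate primary key)
        if ((string) $e->getCode() === '23000') {
            return false;
        }
        throw $e;
    }
}
```

The primary key is what makes the check safe when two consumers race on the same message: only one insert can succeed, so the return value of `recordProcessedMessage()` is a more reliable signal than the earlier read.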
// Include unique ID in message
$message = json_encode([
'message_id' => uniqid('msg_', true), // Unique identifier
'product_id' => 12345,
'name' => 'Example Product'
]);
// Consumer checks for duplicates
function processMessage(AMQPEnvelope $message, AMQPQueue $queue) {
$data = json_decode($message->getBody(), true);
// Check if already processed
if (isDuplicateMessage($data['message_id'])) {
echo "Duplicate message, skipping\n";
$queue->ack($message->getDeliveryTag());
return;
}
// Process and record
createProductInDatabase($data);
recordProcessedMessage($data['message_id']);
$queue->ack($message->getDeliveryTag());
}

Scenario: Queue has 100,000 messages and new message arrives.
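// Publisher tries to send when queue is full.
// With overflow: reject-publish the broker answers with basic.nack, but only
// when publisher confirms are enabled. Without confirms the message is dropped
// silently and publish() does not throw.
$channel->confirmSelect();
$rejected = false;
$channel->setConfirmCallback(
function () { return false; }, // basic.ack: message accepted
function () use (&$rejected) { $rejected = true; return false; } // basic.nack: queue full
);
$exchange->publish($message, 'product.create', AMQP_NOPARAM, [
'delivery_mode' => 2
]);
$channel->waitForConfirm(5.0); // Wait up to 5 seconds for the broker's answer
if ($rejected) {
echo "Queue full: message was rejected by the broker\n";
// Options (pick one):
// 1. Wait and retry
// sleep(5); then publish again
// 2. Use different queue (if available)
// $exchange->publish($message, 'product.create.overflow', ...);
// 3. Store locally and replay later
saveToLocalQueue($message);
}

Why This Happens: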
- Policy: overflow: reject-publish
- Queue is at 100,000 messages
- Consumers are slow or stopped
Prevention:
# Monitor queue depth (-p selects the /product_sync vhost; the default is "/")
docker exec rabbitmq rabbitmqctl list_queues -p /product_sync name messages
# Scale consumers horizontally
docker compose scale consumer=5

Scenario: Consumer receives unparseable or invalid data.
$queue->consume(function(AMQPEnvelope $message, AMQPQueue $queue) {
try {
// Attempt to parse
$data = json_decode($message->getBody(), true);
if (json_last_error() !== JSON_ERROR_NONE) {
// Must be InvalidArgumentException: the catch block below handles only that type
throw new InvalidArgumentException('Invalid JSON: ' . json_last_error_msg());
}
if (!isset($data['product_id'])) {
throw new InvalidArgumentException('Missing required field: product_id');
}
processProduct($data);
$queue->ack($message->getDeliveryTag());
} catch (InvalidArgumentException $e) {
// POISON MESSAGE: Will never be valid, don't retry
echo "Poison message detected: {$e->getMessage()}\n";
echo "Body: {$message->getBody()}\n";
// Send to DLQ immediately
$queue->reject($message->getDeliveryTag(), AMQP_NOPARAM);
// Optionally log for investigation
file_put_contents(
'/var/log/poison_messages.log',
date('Y-m-d H:i:s') . " - {$e->getMessage()} - {$message->getBody()}\n",
FILE_APPEND
);
}
});

Scenario: Message is valid but database operation fails.
function processMessage(AMQPEnvelope $message, AMQPQueue $queue) {
$pdo = new PDO(/* ... */);
try {
$data = json_decode($message->getBody(), true);
$pdo->beginTransaction();
// Insert product
$stmt = $pdo->prepare('INSERT INTO products (id, name, price) VALUES (?, ?, ?)');
$stmt->execute([$data['product_id'], $data['name'], $data['price']]);
// Update inventory
$stmt = $pdo->prepare('UPDATE inventory SET stock = stock + ? WHERE product_id = ?');
$stmt->execute([$data['quantity'], $data['product_id']]);
$pdo->commit();
// Only ACK after successful DB commit
$queue->ack($message->getDeliveryTag());
} catch (PDOException $e) {
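// rollBack() throws when no transaction is active (e.g. beginTransaction() itself failed)
if ($pdo->inTransaction()) {
$pdo->rollBack();
}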
// Check if temporary or permanent error
if ($e->getCode() == '23000') {
// Duplicate key - probably already processed
echo "Duplicate entry, skipping\n";
$queue->ack($message->getDeliveryTag());
} elseif ($e->getCode() == 'HY000') {
// Connection error - retry
echo "DB connection error, requeueing\n";
$queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);
} else {
// Unknown error - send to DLQ for investigation
echo "DB error: {$e->getMessage()}\n";
$queue->reject($message->getDeliveryTag(), AMQP_NOPARAM);
}
}
}

Scenario: Long-running consumer accumulates memory.
$processedCount = 0;
$maxMessages = 1000; // Restart after processing 1000 messages
$queue->consume(function(AMQPEnvelope $message, AMQPQueue $queue) use (&$processedCount, $maxMessages) {
processMessage($message, $queue);
$processedCount++;
// Periodic restart to prevent memory leaks
if ($processedCount >= $maxMessages) {
echo "Processed {$processedCount} messages, restarting consumer...\n";
$queue->cancel(); // Stop consuming
return false; // Exit consume loop
}
// Force garbage collection periodically
if ($processedCount % 100 === 0) {
gc_collect_cycles();
}
}, AMQP_NOPARAM);
// Disconnect and let process manager restart
$connection->disconnect();
exit(0); // Systemd/supervisor will restart

Scenario: RabbitMQ restarts during operations.
Publisher Impact:
try {
$exchange->publish($message, $routingKey, AMQP_NOPARAM, ['delivery_mode' => 2]);
} catch (AMQPConnectionException $e) {
echo "Connection lost: {$e->getMessage()}\n";
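// Reconnect with exponential backoff
$retries = 0;
while ($retries < 5) {
sleep(pow(2, $retries)); // 1s, 2s, 4s, 8s, 16s
try {
$connection->reconnect();
// Channels do not survive a reconnect, so rebuild the channel and exchange
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('product.exchange');
$exchange->publish($message, $routingKey, AMQP_NOPARAM, ['delivery_mode' => 2]);
echo "Reconnected successfully\n";
break;
} catch (AMQPConnectionException $retryError) {
$retries++;
echo "Reconnection attempt {$retries} failed\n";
}
}
}

Consumer Impact: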
- All unacked messages are requeued when connection drops
- Consumer must reconnect and resume
Queue Survival:
- Durable queues survive restart
- Persistent messages (delivery_mode: 2) survive restart
- Non-durable queues are deleted
- Non-persistent messages are lost
<?php
// publisher.php - Publish product creation event
require_once __DIR__ . '/vendor/autoload.php';
function publishProductCreated(array $productData): bool {
try {
// Connect as publisher
$connection = new AMQPConnection([
'host' => getenv('RABBITMQ_HOST'),
'port' => (int) getenv('RABBITMQ_PORT'),
'vhost' => '/product_sync',
'login' => getenv('RABBITMQ_PUBLISHER_USER'),
'password' => getenv('RABBITMQ_PUBLISHER_PASS'),
'read_timeout' => 3,
'write_timeout' => 3,
'connect_timeout' => 5
]);
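$connection->connect(); // Throws AMQPConnectionException on failure
// Check isConnected() rather than connect()'s return value, which not every ext-amqp version provides
if (!$connection->isConnected()) {
throw new RuntimeException('Failed to connect to RabbitMQ');
}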
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('product.exchange');
$exchange->setType(AMQP_EX_TYPE_TOPIC);
// Build message
$message = json_encode([
'message_id' => uniqid('prod_', true),
'product_id' => $productData['id'],
'name' => $productData['name'],
'price' => $productData['price'],
'timestamp' => time(),
'source' => 'api'
]);
// Publish with persistence
$published = $exchange->publish(
$message,
'product.create', // Routing key
AMQP_NOPARAM,
[
'delivery_mode' => 2, // Persistent
'content_type' => 'application/json',
'timestamp' => time(),
'app_id' => 'product-api'
]
);
$connection->disconnect();
echo "✓ Published product #{$productData['id']}\n";
return true;
} catch (AMQPConnectionException $e) {
error_log("RabbitMQ connection error: {$e->getMessage()}");
return false;
} catch (AMQPChannelException $e) {
error_log("RabbitMQ channel error: {$e->getMessage()}");
return false;
} catch (AMQPExchangeException $e) {
error_log("RabbitMQ exchange error: {$e->getMessage()}");
return false;
}
}
// Usage
$product = [
'id' => 12345,
'name' => 'Example Product',
'price' => 29.99
];
if (publishProductCreated($product)) {
echo "Message sent successfully\n";
} else {
echo "Failed to send message\n";
}

<?php
// consumer.php - Consume product creation events
require_once __DIR__ . '/vendor/autoload.php';
class ProductConsumer {
private $connection;
private $channel;
private $queue;
private $processedCount = 0;
public function __construct() {
$this->connection = new AMQPConnection([
'host' => getenv('RABBITMQ_HOST'),
'port' => (int) getenv('RABBITMQ_PORT'),
'vhost' => '/product_sync',
'login' => getenv('RABBITMQ_CONSUMER_USER'),
'password' => getenv('RABBITMQ_CONSUMER_PASS'),
'read_timeout' => 0, // Infinite for long-running consumers
'write_timeout' => 3,
'connect_timeout' => 5
]);
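$this->connection->connect(); // Throws AMQPConnectionException on failure
// Check isConnected() rather than connect()'s return value, which not every ext-amqp version provides
if (!$this->connection->isConnected()) {
throw new RuntimeException('Failed to connect to RabbitMQ');
}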
$this->channel = new AMQPChannel($this->connection);
$this->channel->setPrefetchCount(1); // Process 1 message at a time
$this->queue = new AMQPQueue($this->channel);
$this->queue->setName('product.create');
// Setup signal handlers for graceful shutdown
pcntl_signal(SIGTERM, [$this, 'shutdown']);
pcntl_signal(SIGINT, [$this, 'shutdown']);
}
public function start() {
echo "Starting consumer for product.create queue...\n";
$this->queue->consume(
[$this, 'processMessage'],
AMQP_NOPARAM
);
}
public function processMessage(AMQPEnvelope $message, AMQPQueue $queue): bool {
// Allow signal handling
pcntl_signal_dispatch();
echo "\n--- New Message ---\n";
echo "Delivery Tag: {$message->getDeliveryTag()}\n";
echo "Routing Key: {$message->getRoutingKey()}\n";
try {
// Parse message
$data = json_decode($message->getBody(), true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new InvalidArgumentException('Invalid JSON: ' . json_last_error_msg());
}
// Validate required fields
$required = ['message_id', 'product_id', 'name', 'price'];
foreach ($required as $field) {
if (!isset($data[$field])) {
throw new InvalidArgumentException("Missing field: {$field}");
}
}
// Process the product
$this->createProduct($data);
// SUCCESS
$queue->ack($message->getDeliveryTag());
echo "✓ Processed message #{$data['message_id']}\n";
$this->processedCount++;
// Periodic restart (prevent memory leaks)
if ($this->processedCount >= 1000) {
echo "Processed 1000 messages, shutting down for restart...\n";
$this->shutdown();
return false;
}
return true;
} catch (InvalidArgumentException $e) {
// POISON MESSAGE - permanent error
echo "✗ Invalid message: {$e->getMessage()}\n";
$this->logFailedMessage($message, $e);
$queue->reject($message->getDeliveryTag(), AMQP_NOPARAM);
return true;
} catch (PDOException $e) {
// DATABASE ERROR
echo "⚠ Database error: {$e->getMessage()}\n";
// Check error type
if (in_array($e->getCode(), ['23000'])) {
// Duplicate - already processed
echo "Duplicate, acknowledging\n";
$queue->ack($message->getDeliveryTag());
} else {
// Temporary error - retry
echo "Temporary error, requeueing\n";
$queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);
}
return true;
} catch (Exception $e) {
// UNKNOWN ERROR - retry
echo "⚠ Unexpected error: {$e->getMessage()}\n";
$queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);
return true;
}
}
private function createProduct(array $data): void {
// Simulate database operation
$pdo = new PDO(
"mysql:host=localhost;dbname=products",
"user",
"password"
);
$pdo->beginTransaction();
try {
$stmt = $pdo->prepare(
'INSERT INTO products (id, name, price, created_at) VALUES (?, ?, ?, NOW())'
);
$stmt->execute([
$data['product_id'],
$data['name'],
$data['price']
]);
$pdo->commit();
} catch (PDOException $e) {
$pdo->rollBack();
throw $e;
}
}
private function logFailedMessage(AMQPEnvelope $message, Exception $e): void {
$logEntry = [
'timestamp' => date('Y-m-d H:i:s'),
'error' => $e->getMessage(),
'routing_key' => $message->getRoutingKey(),
'body' => $message->getBody(),
'headers' => $message->getHeaders()
];
file_put_contents(
'/var/log/rabbitmq_failed_messages.log',
json_encode($logEntry) . "\n",
FILE_APPEND
);
}
public function shutdown(): void {
echo "\nShutting down gracefully...\n";
$this->queue->cancel();
$this->connection->disconnect();
exit(0);
}
}
// Run consumer
try {
$consumer = new ProductConsumer();
$consumer->start();
} catch (Exception $e) {
echo "Consumer error: {$e->getMessage()}\n";
exit(1);
}

<?php
// inspect-dlq.php - Inspect and process failed messages
require_once __DIR__ . '/vendor/autoload.php';
function inspectFailedMessages(): void {
$connection = new AMQPConnection([
'host' => getenv('RABBITMQ_HOST'),
'port' => (int) getenv('RABBITMQ_PORT'),
'vhost' => '/product_sync',
'login' => getenv('RABBITMQ_CONSUMER_USER'),
'password' => getenv('RABBITMQ_CONSUMER_PASS')
]);
$connection->connect();
$channel = new AMQPChannel($connection);
$queue = new AMQPQueue($channel);
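$queue->setName('product.failed');
// Passive declare: look up the existing queue without redefining it, so that
// declareQueue() below returns the message count instead of failing on mismatched settings
$queue->setFlags(AMQP_PASSIVE);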
echo "Inspecting DLQ: product.failed\n";
echo "Total messages: " . $queue->declareQueue() . "\n\n";
$count = 0;
while ($message = $queue->get(AMQP_NOPARAM)) {
$count++;
echo "=== Failed Message #{$count} ===\n";
echo "Body: {$message->getBody()}\n";
echo "Routing Key: {$message->getRoutingKey()}\n";
$headers = $message->getHeaders();
if (isset($headers['x-first-death-reason'])) {
echo "Failure Reason: {$headers['x-first-death-reason']}\n";
}
if (isset($headers['x-first-death-queue'])) {
echo "Original Queue: {$headers['x-first-death-queue']}\n";
}
if (isset($headers['x-death'])) {
echo "Death History:\n";
print_r($headers['x-death']);
}
echo "\nOptions:\n";
echo "1. Republish to original queue\n";
echo "2. Discard message\n";
echo "3. Skip for manual review\n";
echo "Choose: ";
$choice = trim(fgets(STDIN));
switch ($choice) {
case '1':
republishMessage($message, $headers);
$queue->ack($message->getDeliveryTag());
echo "✓ Republished\n\n";
break;
case '2':
$queue->ack($message->getDeliveryTag());
echo "✓ Discarded\n\n";
break;
case '3':
echo "⊘ Skipped (message will remain in DLQ)\n\n";
break;
default:
echo "Invalid choice, skipping\n\n";
}
}
echo "Inspected {$count} failed messages\n";
$connection->disconnect();
}
function republishMessage(AMQPEnvelope $message, array $headers): void {
// Extract original queue from headers
$originalQueue = $headers['x-first-death-queue'] ?? 'product.create';
$connection = new AMQPConnection([
'host' => getenv('RABBITMQ_HOST'),
'port' => (int) getenv('RABBITMQ_PORT'),
'vhost' => '/product_sync',
'login' => getenv('RABBITMQ_PUBLISHER_USER'),
'password' => getenv('RABBITMQ_PUBLISHER_PASS')
]);
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('product.exchange');
$exchange->setType(AMQP_EX_TYPE_TOPIC);
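// Republish with the original routing key recorded in the x-death header.
// The queue name is only a fallback: it equals the routing key for
// product.create/update/delete, but product.inventory is bound with
// product.inventory.*, which the bare queue name does not match.
$exchange->publish(
$message->getBody(),
$headers['x-death'][0]['routing-keys'][0] ?? $originalQueue,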
AMQP_NOPARAM,
['delivery_mode' => 2]
);
$connection->disconnect();
}
inspectFailedMessages();

# List all queues with message counts (-p selects the /product_sync vhost; the default is "/")
docker exec rabbitmq rabbitmqctl list_queues -p /product_sync name messages messages_ready messages_unacknowledged
# Detailed queue info
docker exec rabbitmq rabbitmqctl list_queues -p /product_sync name durable auto_delete arguments policy
# Check dead-letter queue
docker exec rabbitmq rabbitmqctl list_queues -p /product_sync | grep failed
# List active consumers
docker exec rabbitmq rabbitmqctl list_consumers -p /product_sync
# Check consumer prefetch
docker exec rabbitmq rabbitmqctl list_channels name prefetch_count
# Management UI
http://localhost:15672
# Login: admin / aUGqMcaKHE/5GVdmuoXVxfb1unkl6Vp5MzUK3rdjJzU=
# View queues → Click queue name → Get messages

// Enable debug logging in PHP
ini_set('display_errors', 1);
error_reporting(E_ALL);
// Check connection parameters
var_dump([
'host' => getenv('RABBITMQ_HOST'),
'port' => getenv('RABBITMQ_PORT'),
'vhost' => getenv('RABBITMQ_VHOST'),
'user' => getenv('RABBITMQ_CONSUMER_USER')
]);
// Test connection
try {
$connection = new AMQPConnection([
'host' => getenv('RABBITMQ_HOST'),
'port' => 5672,
'vhost' => '/product_sync',
'login' => getenv('RABBITMQ_CONSUMER_USER'),
'password' => getenv('RABBITMQ_CONSUMER_PASS')
]);
$connection->connect(); // Throws on failure
if ($connection->isConnected()) {
echo "✓ Connected successfully\n";
} else {
echo "✗ Connection failed\n";
}
} catch (Exception $e) {
echo "Error: {$e->getMessage()}\n";
}

# Queue backlog and consumer counts (message rates are shown in the Management UI)
docker exec rabbitmq rabbitmqctl list_queues -p /product_sync name messages_ready messages_unacknowledged consumers
# Connection stats
docker exec rabbitmq rabbitmqctl list_connections name state channels
# Memory usage
docker exec rabbitmq rabbitmqctl status | grep -A 10 "Memory"

- ✅ Use delivery_mode: 2 for persistence
- ✅ Include unique message_id for idempotency
- ✅ Handle connection errors with retry logic
- ✅ Set reasonable timeouts
- ❌ Don't use AUTO-ACK mode
- ✅ Use prefetch_count: 1 for even distribution
- ✅ Use NACK for temporary errors (retry)
- ✅ Use REJECT for permanent errors (DLQ)
- ✅ Implement graceful shutdown
- ✅ Restart periodically to prevent memory leaks
- ❌ Don't use AUTO-ACK mode in production
- ✅ Distinguish temporary vs permanent errors
- ✅ Log poison messages before rejecting
- ✅ Monitor DLQ regularly
- ✅ Implement database transaction rollback
- ✅ Use exponential backoff for retries
- ❌ Don't requeue infinitely (use x-delivery-limit)
- ✅ Monitor queue depth
- ✅ Track message rates
- ✅ Alert on DLQ growth
- ✅ Check consumer lag
- ✅ Monitor memory usage
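The queue-depth and DLQ-growth checks above can be automated with a small script. The sketch below parses text in the two-column shape produced by `rabbitmqctl list_queues name messages` and flags queues above a threshold. The function names and the threshold values in the usage note are assumptions, and the parser simply skips any line that is not a name followed by a whole number.

```php
<?php
// Turn "name<whitespace>count" lines into ['queue' => count]; other lines are ignored.
function parseQueueDepths(string $output): array
{
    $depths = [];
    foreach (preg_split('/\R/', trim($output)) as $line) {
        $columns = preg_split('/\s+/', trim($line));
        if (count($columns) === 2 && ctype_digit($columns[1])) {
            $depths[$columns[0]] = (int) $columns[1];
        }
    }

    return $depths;
}

// Return the queues whose depth exceeds its configured limit.
function queuesOverThreshold(array $depths, array $thresholds): array
{
    $alerts = [];
    foreach ($thresholds as $queue => $limit) {
        if (($depths[$queue] ?? 0) > $limit) {
            $alerts[$queue] = $depths[$queue];
        }
    }

    return $alerts;
}
```

For example, `queuesOverThreshold($depths, ['product.create' => 80000, 'product.failed' => 0])` would flag the work queue before it reaches its 100,000-message limit and flag the DLQ as soon as it holds anything. Both numbers are illustrative.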
PUBLISH → ROUTE → QUEUE → CONSUME → PROCESS → ACK/NACK/REJECT
↓
(on reject)
↓
DEAD-LETTER EXCHANGE
↓
FAILED QUEUE (DLQ)
graph TD
START[Message Received] --> CHECK{Processing<br/>Outcome}
CHECK -->|"success"| ACK[✓ ACK<br/>remove from queue]
CHECK -->|"invalid format/data"| REJ[✗ REJECT<br/>send to DLQ immediately]
CHECK -->|"database error"| DB{Error<br/>Type}
CHECK -->|"network error"| NACK1[⚠ NACK<br/>requeue for retry]
CHECK -->|"unknown error"| NACK2[⚠ NACK<br/>requeue for retry]
DB -->|"duplicate key"| ACK2[✓ ACK<br/>already processed]
DB -->|"connection error"| NACK3[⚠ NACK<br/>requeue for retry]
REJ --> DLQ1[product.failed]
NACK1 -.->|"after 5 attempts"| DLQ2[product.failed]
NACK2 -.->|"after 5 attempts"| DLQ3[product.failed]
NACK3 -.->|"after 5 attempts"| DLQ4[product.failed]
style ACK fill:#4caf50
style ACK2 fill:#4caf50
style REJ fill:#f44336
style NACK1 fill:#ffc107
style NACK2 fill:#ffc107
style NACK3 fill:#ffc107
style DLQ1 fill:#e91e63
style DLQ2 fill:#e91e63
style DLQ3 fill:#e91e63
style DLQ4 fill:#e91e63
Message Received
├─ Valid & Processed Successfully → ACK
├─ Invalid Format/Data → REJECT (no requeue) → DLQ
├─ Database Error
│ ├─ Duplicate Key → ACK (already processed)
│ └─ Connection Error → NACK (requeue)
├─ Network Error → NACK (requeue)
└─ Unknown Error → NACK (requeue) → eventually DLQ after 5 attempts
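The decision tree can be condensed into a single function that maps a failure to an action. This is a sketch: the name `classifyFailure()` and the action constants are made up for the example, and it assumes that validation problems are raised as `InvalidArgumentException` and database problems as `PDOException`, as in the consumer examples in this guide.

```php
<?php
const ACTION_ACK = 'ack';
const ACTION_NACK_REQUEUE = 'nack-requeue';
const ACTION_REJECT = 'reject';

// Map a processing failure to the acknowledgement action from the decision tree.
function classifyFailure(Throwable $error): string
{
    if ($error instanceof InvalidArgumentException) {
        // Invalid format/data: retrying cannot help, send to the DLQ
        return ACTION_REJECT;
    }

    if ($error instanceof PDOException) {
        // SQLSTATE 23000 (duplicate key) means the work was already done
        return (string) $error->getCode() === '23000'
            ? ACTION_ACK
            : ACTION_NACK_REQUEUE;
    }

    // Network and unknown errors: requeue and let the delivery limit
    // move the message to the DLQ if it keeps failing
    return ACTION_NACK_REQUEUE;
}
```

A consumer callback could catch `Throwable`, call `classifyFailure()`, and then issue `ack()`, `nack(..., AMQP_REQUEUE)` or `reject(..., AMQP_NOPARAM)` accordingly, which keeps the policy in one place.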
# Connection
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_VHOST=/product_sync
# Admin (full access)
RABBITMQ_USER=admin
RABBITMQ_PASS=aUGqMcaKHE/5GVdmuoXVxfb1unkl6Vp5MzUK3rdjJzU=
# Publisher (write-only)
RABBITMQ_PUBLISHER_USER=publisher
RABBITMQ_PUBLISHER_PASS=a0Eif5UY77MPOQelIi5lbCCHRIOPgiy4Q+EPHiHnKHc=
# Consumer (read-only)
RABBITMQ_CONSUMER_USER=consumer
RABBITMQ_CONSUMER_PASS=2QqdzOYVf2PbeIFfZm5PhNcf1ry3e9GL18vLglDvWFQ=