Skip to content

Instantly share code, notes, and snippets.

@nandordudas
Last active January 22, 2026 20:00
Show Gist options
  • Select an option

  • Save nandordudas/f0adcc943b963f49cc7cc61209edb468 to your computer and use it in GitHub Desktop.

Select an option

Save nandordudas/f0adcc943b963f49cc7cc61209edb468 to your computer and use it in GitHub Desktop.

RabbitMQ Message Flow & Error Handling Guide

Complete guide to understanding message flow, publisher/consumer patterns, and error handling in this RabbitMQ setup.


Table of Contents

  1. Message Publishing Flow
  2. Message Consumption Flow
  3. Dead-Letter Queue (DLQ) Workflow
  4. Error Handling & Edge Cases
  5. PHP Implementation Examples
  6. Monitoring & Debugging

Message Publishing Flow

Diagram

graph LR
    P[Publisher] -->|"publish()"| E[product.exchange<br/>topic]
    E -->|"product.create"| Q1[product.create<br/>queue]
    E -->|"product.update"| Q2[product.update<br/>queue]
    E -->|"product.delete"| Q3[product.delete<br/>queue]
    E -->|"product.inventory.*"| Q4[product.inventory<br/>queue]
    E -.->|"no route match"| P

    Q1 --> C1[Consumer]
    Q2 --> C2[Consumer]
    Q3 --> C3[Consumer]
    Q4 --> C4[Consumer]

    style E fill:#ff9800
    style Q1 fill:#4caf50
    style Q2 fill:#4caf50
    style Q3 fill:#4caf50
    style Q4 fill:#4caf50
    style P fill:#2196f3
Loading

Step-by-Step Process

Publisher → product.exchange → Routing Key Match → Queue → Consumer
                ↓ (on routing failure)
           Returns to Publisher (if mandatory=true)

1. Publisher Connects

$connection = new AMQPConnection([
    'host' => getenv('RABBITMQ_HOST'),
    'port' => 5672,
    'vhost' => '/product_sync',
    'login' => getenv('RABBITMQ_PUBLISHER_USER'),  // 'publisher'
    'password' => getenv('RABBITMQ_PUBLISHER_PASS')
]);
$connection->connect();

$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('product.exchange');

2. Publisher Sends Message

// Publish to product.create queue
$message = json_encode([
    'product_id' => 12345,
    'name' => 'Example Product',
    'price' => 29.99,
    'timestamp' => time()
]);

$routingKey = 'product.create';

$exchange->publish(
    $message,
    $routingKey,
    AMQP_NOPARAM,
    [
        'delivery_mode' => 2,        // Persistent (survives broker restart)
        'content_type' => 'application/json',
        'timestamp' => time(),
        'app_id' => 'product-sync-publisher'
    ]
);

3. Exchange Routes Message

The product.exchange (type: topic) matches the routing key:

Routing Key Destination Queue
product.create product.create
product.update product.update
product.delete product.delete
product.inventory.* product.inventory

4. Message Stored in Queue

Quorum Queue Properties:

  • Durable: Message survives broker restart
  • Replicated: (For multi-node clusters)
  • Disk-backed: x-max-in-memory-length: 0 forces disk storage
  • TTL: 24 hours (86400000ms) before auto-deletion
  • Max Length: 100,000 messages
  • Overflow: reject-publish (publisher gets error if queue full)

Message Consumption Flow

Diagram

graph TD
    Q[Queue] -->|"fetch message"| C[Consumer]
    C -->|"process"| D{Processing<br/>Result}

    D -->|"success"| ACK[ACK]
    D -->|"temporary error"| NACK[NACK<br/>requeue]
    D -->|"permanent error"| REJ[REJECT<br/>no requeue]

    ACK -->|"remove from queue"| DONE[✓ Complete]
    NACK -->|"retry"| Q
    REJ -->|"dead-letter"| DLX[product.dlx]

    Q -.->|"5 delivery attempts"| DLX
    Q -.->|"24h TTL expired"| DLX

    DLX --> DLQ[product.failed<br/>DLQ]

    style D fill:#ff9800
    style ACK fill:#4caf50
    style NACK fill:#ffc107
    style REJ fill:#f44336
    style DLX fill:#9c27b0
    style DLQ fill:#e91e63
Loading

Step-by-Step Process

Queue → Consumer Fetches → Process Message → ACK/NACK/REJECT
                                    ↓ (on NACK/REJECT)
                            Requeue or Dead-Letter

1. Consumer Connects

$connection = new AMQPConnection([
    'host' => getenv('RABBITMQ_HOST'),
    'port' => 5672,
    'vhost' => '/product_sync',
    'login' => getenv('RABBITMQ_CONSUMER_USER'),  // 'consumer'
    'password' => getenv('RABBITMQ_CONSUMER_PASS')
]);
$connection->connect();

$channel = new AMQPChannel($connection);
$queue = new AMQPQueue($channel);
$queue->setName('product.create');

2. Consumer Fetches Message

// Option A: Blocking consume (recommended for workers)
$queue->consume(function(AMQPEnvelope $message, AMQPQueue $queue) {
    processMessage($message, $queue);
}, AMQP_NOPARAM);

// Option B: Get single message (for testing)
$message = $queue->get(AMQP_AUTOACK);

3. Consumer Processes Message

function processMessage(AMQPEnvelope $message, AMQPQueue $queue) {
    try {
        $data = json_decode($message->getBody(), true);

        // Validate message
        if (!isset($data['product_id'])) {
            throw new InvalidArgumentException('Missing product_id');
        }

        // Process the product creation
        createProductInDatabase($data);

        // SUCCESS: Acknowledge message (removes from queue)
        $queue->ack($message->getDeliveryTag());

        echo "✓ Processed product #{$data['product_id']}\n";

    } catch (InvalidArgumentException $e) {
        // PERMANENT ERROR: Reject without requeue (goes to DLQ)
        $queue->reject($message->getDeliveryTag(), AMQP_NOPARAM);

        echo "✗ Invalid message: {$e->getMessage()}\n";

    } catch (Exception $e) {
        // TEMPORARY ERROR: Nack with requeue (retry)
        $queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);

        echo "⚠ Temporary error: {$e->getMessage()} - will retry\n";
    }
}

4. Acknowledgment Actions

Action PHP Command Effect Use Case
ACK $queue->ack($deliveryTag) Remove message from queue Successfully processed
NACK $queue->nack($deliveryTag, AMQP_REQUEUE) Requeue message for retry Temporary failure (DB down, network issue)
REJECT $queue->reject($deliveryTag, AMQP_NOPARAM) Send to DLQ (no requeue) Permanent failure (invalid data, unrecoverable)
AUTO-ACK AMQP_AUTOACK flag Auto-remove on delivery ⚠️ DANGEROUS - no error handling

Dead-Letter Queue (DLQ) Workflow

Diagram

graph TD
    Q1[product.create<br/>queue] --> DLX{Dead-Letter<br/>Triggers}
    Q2[product.update<br/>queue] --> DLX
    Q3[product.delete<br/>queue] --> DLX
    Q4[product.inventory<br/>queue] --> DLX

    DLX -->|"1. x-delivery-limit: 5"| T1[5 failed attempts]
    DLX -->|"2. explicit reject"| T2[REJECT no requeue]
    DLX -->|"3. TTL expired"| T3[24h unprocessed]

    T1 --> DLXE[product.dlx<br/>fanout exchange]
    T2 --> DLXE
    T3 --> DLXE

    DLXE --> DLQ[product.failed<br/>queue]

    DLQ -->|"manual review"| INSPECT[Inspect & Fix]
    INSPECT -->|"option 1"| REPUB[Republish to<br/>original queue]
    INSPECT -->|"option 2"| DISC[Discard message]
    INSPECT -->|"option 3"| LOG[Log & alert]

    DLQ -.->|"7d TTL"| AUTO[Auto-delete]
    DLQ -.->|"50k max-length"| DROP[Drop oldest]

    style DLX fill:#9c27b0
    style DLXE fill:#9c27b0
    style DLQ fill:#e91e63
    style T1 fill:#f44336
    style T2 fill:#f44336
    style T3 fill:#f44336
Loading

When Messages Go to DLQ

Messages are sent to product.failed queue via product.dlx exchange in these cases:

1. Maximum Delivery Attempts Reached

Queue Config: x-delivery-limit: 5

If a consumer NACKs or REJECTs (with requeue) a message 5 times, it's automatically dead-lettered.

Example:

// First 4 attempts: NACK with requeue
$queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);

// 5th attempt: Automatically goes to product.failed (dead-letter exchange)

2. Explicit Rejection Without Requeue

// Consumer decides message is permanently invalid
$queue->reject($message->getDeliveryTag(), AMQP_NOPARAM);
// → Immediately goes to product.failed

3. Message TTL Expired

Queue TTL: 24 hours (86400000ms)

If a message sits in product.create for 24 hours unprocessed:

product.create (24h expired) → product.dlx → product.failed

4. Queue Full (Max Length Exceeded)

Queue Max Length: 100,000 messages
Overflow: reject-publish

Note: With reject-publish, publisher gets an error. This doesn't trigger DLQ - the message never enters the queue.

DLQ Properties

Property Value Purpose
TTL 7 days (604800000ms) Keep failed messages for investigation
Max Length 50,000 messages Prevent infinite growth
Overflow drop-head Drop oldest messages when full
No DLX Not configured Prevents circular dead-lettering

Processing Failed Messages

// Connect as consumer
$connection = new AMQPConnection([
    'host' => getenv('RABBITMQ_HOST'),
    'port' => 5672,
    'vhost' => '/product_sync',
    'login' => getenv('RABBITMQ_CONSUMER_USER'),
    'password' => getenv('RABBITMQ_CONSUMER_PASS')
]);
$connection->connect();

$channel = new AMQPChannel($connection);
$queue = new AMQPQueue($channel);
$queue->setName('product.failed');

// Inspect failed messages
while ($message = $queue->get(AMQP_NOPARAM)) {
    echo "Failed Message:\n";
    echo "  Body: {$message->getBody()}\n";
    echo "  Routing Key: {$message->getRoutingKey()}\n";
    echo "  Exchange: {$message->getExchangeName()}\n";

    // Headers contain original queue info
    $headers = $message->getHeaders();
    if (isset($headers['x-first-death-queue'])) {
        echo "  Original Queue: {$headers['x-first-death-queue']}\n";
    }

    // Option 1: Fix and republish
    $data = json_decode($message->getBody(), true);
    $data['retry_count'] = ($data['retry_count'] ?? 0) + 1;
    republishToOriginalQueue($data, $headers['x-first-death-queue']);
    $queue->ack($message->getDeliveryTag());

    // Option 2: Log and discard
    logToFile($message);
    $queue->ack($message->getDeliveryTag());

    // Option 3: Keep for manual review (don't ACK)
    break;
}

Error Handling & Edge Cases

1. Consumer Crashes Mid-Processing

Scenario: Consumer receives message but crashes before ACK/NACK.

$queue->consume(function(AMQPEnvelope $message, AMQPQueue $queue) {
    // Message delivered, consumer has it
    $data = json_decode($message->getBody(), true);

    // CRASH! Process killed here
    // exit(1);

    // Never reached ACK
    $queue->ack($message->getDeliveryTag());
});

What Happens:

  1. RabbitMQ marks message as "unacked"
  2. When TCP connection closes, RabbitMQ requeues the message
  3. Another consumer (or same consumer after restart) gets it again

Prevention:

// Use prefetch to limit unacked messages
$channel->setPrefetchCount(1);  // Only fetch 1 message at a time

// Implement graceful shutdown
pcntl_signal(SIGTERM, function() use ($queue, $connection) {
    echo "Shutting down gracefully...\n";
    $queue->cancel();  // Stop consuming
    $connection->disconnect();
    exit(0);
});

2. Network Partition During Publish

Scenario: Publisher sends message but network drops before confirmation.

try {
    $exchange->publish($message, $routingKey, AMQP_NOPARAM, [
        'delivery_mode' => 2
    ]);
    // Network failure here - did message arrive?
} catch (AMQPConnectionException $e) {
    echo "Connection lost: {$e->getMessage()}\n";
    // Should we retry? Is it a duplicate?
}

What Happens:

  • Without Publisher Confirms: You don't know if message was stored
  • With Publisher Confirms: (Requires advanced setup) RabbitMQ ACKs receipt

Solution: Idempotent Messages

// Include unique ID in message
$message = json_encode([
    'message_id' => uniqid('msg_', true),  // Unique identifier
    'product_id' => 12345,
    'name' => 'Example Product'
]);

// Consumer checks for duplicates
function processMessage(AMQPEnvelope $message, AMQPQueue $queue) {
    $data = json_decode($message->getBody(), true);

    // Check if already processed
    if (isDuplicateMessage($data['message_id'])) {
        echo "Duplicate message, skipping\n";
        $queue->ack($message->getDeliveryTag());
        return;
    }

    // Process and record
    createProductInDatabase($data);
    recordProcessedMessage($data['message_id']);
    $queue->ack($message->getDeliveryTag());
}

3. Queue Full (Max Length Exceeded)

Scenario: Queue has 100,000 messages and new message arrives.

// Publisher tries to send when queue is full
try {
    $exchange->publish($message, 'product.create', AMQP_NOPARAM, [
        'delivery_mode' => 2
    ]);
} catch (AMQPChannelException $e) {
    // Error: basic.publish: queue 'product.create' in vhost '/product_sync'
    // has a max-length policy and the queue is full

    echo "Queue full: {$e->getMessage()}\n";

    // Options:
    // 1. Wait and retry
    sleep(5);
    retry();

    // 2. Use different queue (if available)
    $exchange->publish($message, 'product.create.overflow', ...);

    // 3. Store locally and replay later
    saveToLocalQueue($message);
}

Why This Happens:

  • Policy: overflow: reject-publish
  • Queue is at 100,000 messages
  • Consumers are slow or stopped

Prevention:

# Monitor queue depth
docker exec rabbitmq rabbitmqctl list_queues name messages

# Scale consumers horizontally
docker compose scale consumer=5

4. Message Format Invalid (Poison Message)

Scenario: Consumer receives unparseable or invalid data.

$queue->consume(function(AMQPEnvelope $message, AMQPQueue $queue) {
    try {
        // Attempt to parse
        $data = json_decode($message->getBody(), true);

        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new RuntimeException('Invalid JSON: ' . json_last_error_msg());
        }

        if (!isset($data['product_id'])) {
            throw new InvalidArgumentException('Missing required field: product_id');
        }

        processProduct($data);
        $queue->ack($message->getDeliveryTag());

    } catch (InvalidArgumentException $e) {
        // POISON MESSAGE: Will never be valid, don't retry
        echo "Poison message detected: {$e->getMessage()}\n";
        echo "Body: {$message->getBody()}\n";

        // Send to DLQ immediately
        $queue->reject($message->getDeliveryTag(), AMQP_NOPARAM);

        // Optionally log for investigation
        file_put_contents(
            '/var/log/poison_messages.log',
            date('Y-m-d H:i:s') . " - {$e->getMessage()} - {$message->getBody()}\n",
            FILE_APPEND
        );
    }
});

5. Database Transaction Failure

Scenario: Message is valid but database operation fails.

function processMessage(AMQPEnvelope $message, AMQPQueue $queue) {
    $pdo = new PDO(/* ... */);

    try {
        $data = json_decode($message->getBody(), true);

        $pdo->beginTransaction();

        // Insert product
        $stmt = $pdo->prepare('INSERT INTO products (id, name, price) VALUES (?, ?, ?)');
        $stmt->execute([$data['product_id'], $data['name'], $data['price']]);

        // Update inventory
        $stmt = $pdo->prepare('UPDATE inventory SET stock = stock + ? WHERE product_id = ?');
        $stmt->execute([$data['quantity'], $data['product_id']]);

        $pdo->commit();

        // Only ACK after successful DB commit
        $queue->ack($message->getDeliveryTag());

    } catch (PDOException $e) {
        $pdo->rollBack();

        // Check if temporary or permanent error
        if ($e->getCode() == '23000') {
            // Duplicate key - probably already processed
            echo "Duplicate entry, skipping\n";
            $queue->ack($message->getDeliveryTag());

        } elseif ($e->getCode() == 'HY000') {
            // Connection error - retry
            echo "DB connection error, requeueing\n";
            $queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);

        } else {
            // Unknown error - send to DLQ for investigation
            echo "DB error: {$e->getMessage()}\n";
            $queue->reject($message->getDeliveryTag(), AMQP_NOPARAM);
        }
    }
}

6. Consumer Memory Leak

Scenario: Long-running consumer accumulates memory.

$processedCount = 0;
$maxMessages = 1000;  // Restart after processing 1000 messages

$queue->consume(function(AMQPEnvelope $message, AMQPQueue $queue) use (&$processedCount, $maxMessages) {
    processMessage($message, $queue);

    $processedCount++;

    // Periodic restart to prevent memory leaks
    if ($processedCount >= $maxMessages) {
        echo "Processed {$processedCount} messages, restarting consumer...\n";
        $queue->cancel();  // Stop consuming
        return false;  // Exit consume loop
    }

    // Force garbage collection periodically
    if ($processedCount % 100 === 0) {
        gc_collect_cycles();
    }
}, AMQP_NOPARAM);

// Disconnect and let process manager restart
$connection->disconnect();
exit(0);  // Systemd/supervisor will restart

7. RabbitMQ Server Restart

Scenario: RabbitMQ restarts during operations.

Publisher Impact:

try {
    $exchange->publish($message, $routingKey, AMQP_NOPARAM, ['delivery_mode' => 2]);
} catch (AMQPConnectionException $e) {
    echo "Connection lost: {$e->getMessage()}\n";

    // Reconnect with exponential backoff
    $retries = 0;
    while ($retries < 5) {
        sleep(pow(2, $retries));  // 1s, 2s, 4s, 8s, 16s

        try {
            $connection->reconnect();
            $exchange->publish($message, $routingKey, AMQP_NOPARAM, ['delivery_mode' => 2]);
            echo "Reconnected successfully\n";
            break;
        } catch (AMQPConnectionException $e) {
            $retries++;
            echo "Reconnection attempt {$retries} failed\n";
        }
    }
}

Consumer Impact:

  • All unacked messages are requeued when connection drops
  • Consumer must reconnect and resume

Queue Survival:

  • Durable queues survive restart
  • Persistent messages (delivery_mode: 2) survive restart
  • Non-durable queues are deleted
  • Non-persistent messages are lost

PHP Implementation Examples

Complete Publisher Example

<?php
// publisher.php - Publish product creation event

require_once __DIR__ . '/vendor/autoload.php';

function publishProductCreated(array $productData): bool {
    try {
        // Connect as publisher
        $connection = new AMQPConnection([
            'host' => getenv('RABBITMQ_HOST'),
            'port' => (int) getenv('RABBITMQ_PORT'),
            'vhost' => '/product_sync',
            'login' => getenv('RABBITMQ_PUBLISHER_USER'),
            'password' => getenv('RABBITMQ_PUBLISHER_PASS'),
            'read_timeout' => 3,
            'write_timeout' => 3,
            'connect_timeout' => 5
        ]);

        if (!$connection->connect()) {
            throw new RuntimeException('Failed to connect to RabbitMQ');
        }

        $channel = new AMQPChannel($connection);
        $exchange = new AMQPExchange($channel);
        $exchange->setName('product.exchange');
        $exchange->setType(AMQP_EX_TYPE_TOPIC);

        // Build message
        $message = json_encode([
            'message_id' => uniqid('prod_', true),
            'product_id' => $productData['id'],
            'name' => $productData['name'],
            'price' => $productData['price'],
            'timestamp' => time(),
            'source' => 'api'
        ]);

        // Publish with persistence
        $published = $exchange->publish(
            $message,
            'product.create',  // Routing key
            AMQP_NOPARAM,
            [
                'delivery_mode' => 2,  // Persistent
                'content_type' => 'application/json',
                'timestamp' => time(),
                'app_id' => 'product-api'
            ]
        );

        $connection->disconnect();

        echo "✓ Published product #{$productData['id']}\n";
        return true;

    } catch (AMQPConnectionException $e) {
        error_log("RabbitMQ connection error: {$e->getMessage()}");
        return false;

    } catch (AMQPChannelException $e) {
        error_log("RabbitMQ channel error: {$e->getMessage()}");
        return false;

    } catch (AMQPExchangeException $e) {
        error_log("RabbitMQ exchange error: {$e->getMessage()}");
        return false;
    }
}

// Usage
$product = [
    'id' => 12345,
    'name' => 'Example Product',
    'price' => 29.99
];

if (publishProductCreated($product)) {
    echo "Message sent successfully\n";
} else {
    echo "Failed to send message\n";
}

Complete Consumer Example

<?php
// consumer.php - Consume product creation events

require_once __DIR__ . '/vendor/autoload.php';

class ProductConsumer {
    private $connection;
    private $channel;
    private $queue;
    private $processedCount = 0;

    public function __construct() {
        $this->connection = new AMQPConnection([
            'host' => getenv('RABBITMQ_HOST'),
            'port' => (int) getenv('RABBITMQ_PORT'),
            'vhost' => '/product_sync',
            'login' => getenv('RABBITMQ_CONSUMER_USER'),
            'password' => getenv('RABBITMQ_CONSUMER_PASS'),
            'read_timeout' => 0,  // Infinite for long-running consumers
            'write_timeout' => 3,
            'connect_timeout' => 5
        ]);

        if (!$this->connection->connect()) {
            throw new RuntimeException('Failed to connect to RabbitMQ');
        }

        $this->channel = new AMQPChannel($this->connection);
        $this->channel->setPrefetchCount(1);  // Process 1 message at a time

        $this->queue = new AMQPQueue($this->channel);
        $this->queue->setName('product.create');

        // Setup signal handlers for graceful shutdown
        pcntl_signal(SIGTERM, [$this, 'shutdown']);
        pcntl_signal(SIGINT, [$this, 'shutdown']);
    }

    public function start() {
        echo "Starting consumer for product.create queue...\n";

        $this->queue->consume(
            [$this, 'processMessage'],
            AMQP_NOPARAM
        );
    }

    public function processMessage(AMQPEnvelope $message, AMQPQueue $queue): bool {
        // Allow signal handling
        pcntl_signal_dispatch();

        echo "\n--- New Message ---\n";
        echo "Delivery Tag: {$message->getDeliveryTag()}\n";
        echo "Routing Key: {$message->getRoutingKey()}\n";

        try {
            // Parse message
            $data = json_decode($message->getBody(), true);

            if (json_last_error() !== JSON_ERROR_NONE) {
                throw new InvalidArgumentException('Invalid JSON: ' . json_last_error_msg());
            }

            // Validate required fields
            $required = ['message_id', 'product_id', 'name', 'price'];
            foreach ($required as $field) {
                if (!isset($data[$field])) {
                    throw new InvalidArgumentException("Missing field: {$field}");
                }
            }

            // Process the product
            $this->createProduct($data);

            // SUCCESS
            $queue->ack($message->getDeliveryTag());
            echo "✓ Processed message #{$data['message_id']}\n";

            $this->processedCount++;

            // Periodic restart (prevent memory leaks)
            if ($this->processedCount >= 1000) {
                echo "Processed 1000 messages, shutting down for restart...\n";
                $this->shutdown();
                return false;
            }

            return true;

        } catch (InvalidArgumentException $e) {
            // POISON MESSAGE - permanent error
            echo "✗ Invalid message: {$e->getMessage()}\n";
            $this->logFailedMessage($message, $e);
            $queue->reject($message->getDeliveryTag(), AMQP_NOPARAM);
            return true;

        } catch (PDOException $e) {
            // DATABASE ERROR
            echo "⚠ Database error: {$e->getMessage()}\n";

            // Check error type
            if (in_array($e->getCode(), ['23000'])) {
                // Duplicate - already processed
                echo "Duplicate, acknowledging\n";
                $queue->ack($message->getDeliveryTag());
            } else {
                // Temporary error - retry
                echo "Temporary error, requeueing\n";
                $queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);
            }

            return true;

        } catch (Exception $e) {
            // UNKNOWN ERROR - retry
            echo "⚠ Unexpected error: {$e->getMessage()}\n";
            $queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);
            return true;
        }
    }

    private function createProduct(array $data): void {
        // Simulate database operation
        $pdo = new PDO(
            "mysql:host=localhost;dbname=products",
            "user",
            "password"
        );

        $pdo->beginTransaction();

        try {
            $stmt = $pdo->prepare(
                'INSERT INTO products (id, name, price, created_at) VALUES (?, ?, ?, NOW())'
            );
            $stmt->execute([
                $data['product_id'],
                $data['name'],
                $data['price']
            ]);

            $pdo->commit();

        } catch (PDOException $e) {
            $pdo->rollBack();
            throw $e;
        }
    }

    private function logFailedMessage(AMQPEnvelope $message, Exception $e): void {
        $logEntry = [
            'timestamp' => date('Y-m-d H:i:s'),
            'error' => $e->getMessage(),
            'routing_key' => $message->getRoutingKey(),
            'body' => $message->getBody(),
            'headers' => $message->getHeaders()
        ];

        file_put_contents(
            '/var/log/rabbitmq_failed_messages.log',
            json_encode($logEntry) . "\n",
            FILE_APPEND
        );
    }

    public function shutdown(): void {
        echo "\nShutting down gracefully...\n";
        $this->queue->cancel();
        $this->connection->disconnect();
        exit(0);
    }
}

// Run consumer
try {
    $consumer = new ProductConsumer();
    $consumer->start();
} catch (Exception $e) {
    echo "Consumer error: {$e->getMessage()}\n";
    exit(1);
}

DLQ Inspector Example

<?php
// inspect-dlq.php - Inspect and process failed messages

require_once __DIR__ . '/vendor/autoload.php';

function inspectFailedMessages(): void {
    $connection = new AMQPConnection([
        'host' => getenv('RABBITMQ_HOST'),
        'port' => (int) getenv('RABBITMQ_PORT'),
        'vhost' => '/product_sync',
        'login' => getenv('RABBITMQ_CONSUMER_USER'),
        'password' => getenv('RABBITMQ_CONSUMER_PASS')
    ]);

    $connection->connect();

    $channel = new AMQPChannel($connection);
    $queue = new AMQPQueue($channel);
    $queue->setName('product.failed');

    echo "Inspecting DLQ: product.failed\n";
    echo "Total messages: " . $queue->declareQueue() . "\n\n";

    $count = 0;

    while ($message = $queue->get(AMQP_NOPARAM)) {
        $count++;

        echo "=== Failed Message #{$count} ===\n";
        echo "Body: {$message->getBody()}\n";
        echo "Routing Key: {$message->getRoutingKey()}\n";

        $headers = $message->getHeaders();

        if (isset($headers['x-first-death-reason'])) {
            echo "Failure Reason: {$headers['x-first-death-reason']}\n";
        }

        if (isset($headers['x-first-death-queue'])) {
            echo "Original Queue: {$headers['x-first-death-queue']}\n";
        }

        if (isset($headers['x-death'])) {
            echo "Death History:\n";
            print_r($headers['x-death']);
        }

        echo "\nOptions:\n";
        echo "1. Republish to original queue\n";
        echo "2. Discard message\n";
        echo "3. Skip for manual review\n";
        echo "Choose: ";

        $choice = trim(fgets(STDIN));

        switch ($choice) {
            case '1':
                republishMessage($message, $headers);
                $queue->ack($message->getDeliveryTag());
                echo "✓ Republished\n\n";
                break;

            case '2':
                $queue->ack($message->getDeliveryTag());
                echo "✓ Discarded\n\n";
                break;

            case '3':
                echo "⊘ Skipped (message will remain in DLQ)\n\n";
                break;

            default:
                echo "Invalid choice, skipping\n\n";
        }
    }

    echo "Inspected {$count} failed messages\n";
    $connection->disconnect();
}

function republishMessage(AMQPEnvelope $message, array $headers): void {
    // Extract original queue from headers
    $originalQueue = $headers['x-first-death-queue'] ?? 'product.create';

    $connection = new AMQPConnection([
        'host' => getenv('RABBITMQ_HOST'),
        'port' => (int) getenv('RABBITMQ_PORT'),
        'vhost' => '/product_sync',
        'login' => getenv('RABBITMQ_PUBLISHER_USER'),
        'password' => getenv('RABBITMQ_PUBLISHER_PASS')
    ]);

    $connection->connect();

    $channel = new AMQPChannel($connection);
    $exchange = new AMQPExchange($channel);
    $exchange->setName('product.exchange');
    $exchange->setType(AMQP_EX_TYPE_TOPIC);

    // Republish to original routing key
    $exchange->publish(
        $message->getBody(),
        $originalQueue,
        AMQP_NOPARAM,
        ['delivery_mode' => 2]
    );

    $connection->disconnect();
}

inspectFailedMessages();

Monitoring & Debugging

Check Queue Status

# List all queues with message counts
docker exec rabbitmq rabbitmqctl list_queues name messages messages_ready messages_unacknowledged

# Detailed queue info
docker exec rabbitmq rabbitmqctl list_queues name durable auto_delete arguments policy

# Check dead-letter queue
docker exec rabbitmq rabbitmqctl list_queues | grep failed

Monitor Consumer Status

# List active consumers
docker exec rabbitmq rabbitmqctl list_consumers

# Check consumer prefetch
docker exec rabbitmq rabbitmqctl list_channels name prefetch_count

View Message Flow

# Management UI
http://localhost:15672
# Login: admin / aUGqMcaKHE/5GVdmuoXVxfb1unkl6Vp5MzUK3rdjJzU=

# View queues → Click queue name → Get messages

Debug Connection Issues

// Enable debug logging in PHP
ini_set('display_errors', 1);
error_reporting(E_ALL);

// Check connection parameters
var_dump([
    'host' => getenv('RABBITMQ_HOST'),
    'port' => getenv('RABBITMQ_PORT'),
    'vhost' => getenv('RABBITMQ_VHOST'),
    'user' => getenv('RABBITMQ_CONSUMER_USER')
]);

// Test connection
try {
    $connection = new AMQPConnection([
        'host' => getenv('RABBITMQ_HOST'),
        'port' => 5672,
        'vhost' => '/product_sync',
        'login' => getenv('RABBITMQ_CONSUMER_USER'),
        'password' => getenv('RABBITMQ_CONSUMER_PASS')
    ]);

    if ($connection->connect()) {
        echo "✓ Connected successfully\n";
    } else {
        echo "✗ Connection failed\n";
    }
} catch (Exception $e) {
    echo "Error: {$e->getMessage()}\n";
}

Performance Metrics

# Message rates
docker exec rabbitmq rabbitmqctl list_queues name messages_ready message_stats

# Connection stats
docker exec rabbitmq rabbitmqctl list_connections name state channels

# Memory usage
docker exec rabbitmq rabbitmqctl status | grep -A 10 "Memory"

Best Practices Summary

Publisher

  • ✅ Use delivery_mode: 2 for persistence
  • ✅ Include unique message_id for idempotency
  • ✅ Handle connection errors with retry logic
  • ✅ Set reasonable timeouts
  • ❌ Don't use AUTO-ACK mode

Consumer

  • ✅ Use prefetch_count: 1 for even distribution
  • ✅ ACK only after successful processing
  • ✅ Use NACK for temporary errors (retry)
  • ✅ Use REJECT for permanent errors (DLQ)
  • ✅ Implement graceful shutdown
  • ✅ Restart periodically to prevent memory leaks
  • ❌ Don't use AUTO-ACK mode in production

Error Handling

  • ✅ Distinguish temporary vs permanent errors
  • ✅ Log poison messages before rejecting
  • ✅ Monitor DLQ regularly
  • ✅ Implement database transaction rollback
  • ✅ Use exponential backoff for retries
  • ❌ Don't requeue infinitely (use x-delivery-limit)

Monitoring

  • ✅ Monitor queue depth
  • ✅ Track message rates
  • ✅ Alert on DLQ growth
  • ✅ Check consumer lag
  • ✅ Monitor memory usage

Quick Reference

Message Lifecycle

PUBLISH → ROUTE → QUEUE → CONSUME → PROCESS → ACK/NACK/REJECT
                                           ↓
                                      (on reject)
                                           ↓
                                    DEAD-LETTER EXCHANGE
                                           ↓
                                    FAILED QUEUE (DLQ)

Acknowledgment Decision Tree

graph TD
    START[Message Received] --> CHECK{Processing<br/>Outcome}

    CHECK -->|"success"| ACK[✓ ACK<br/>remove from queue]
    CHECK -->|"invalid format/data"| REJ[✗ REJECT<br/>send to DLQ immediately]
    CHECK -->|"database error"| DB{Error<br/>Type}
    CHECK -->|"network error"| NACK1[⚠ NACK<br/>requeue for retry]
    CHECK -->|"unknown error"| NACK2[⚠ NACK<br/>requeue for retry]

    DB -->|"duplicate key"| ACK2[✓ ACK<br/>already processed]
    DB -->|"connection error"| NACK3[⚠ NACK<br/>requeue for retry]

    REJ --> DLQ1[product.failed]
    NACK1 -.->|"after 5 attempts"| DLQ2[product.failed]
    NACK2 -.->|"after 5 attempts"| DLQ3[product.failed]
    NACK3 -.->|"after 5 attempts"| DLQ4[product.failed]

    style ACK fill:#4caf50
    style ACK2 fill:#4caf50
    style REJ fill:#f44336
    style NACK1 fill:#ffc107
    style NACK2 fill:#ffc107
    style NACK3 fill:#ffc107
    style DLQ1 fill:#e91e63
    style DLQ2 fill:#e91e63
    style DLQ3 fill:#e91e63
    style DLQ4 fill:#e91e63
Loading
Message Received
    ├─ Valid & Processed Successfully → ACK
    ├─ Invalid Format/Data → REJECT (no requeue) → DLQ
    ├─ Database Error
    │   ├─ Duplicate Key → ACK (already processed)
    │   └─ Connection Error → NACK (requeue)
    ├─ Network Error → NACK (requeue)
    └─ Unknown Error → NACK (requeue) → eventually DLQ after 5 attempts

Environment Variables

# Connection
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_VHOST=/product_sync

# Admin (full access)
RABBITMQ_USER=admin
RABBITMQ_PASS=aUGqMcaKHE/5GVdmuoXVxfb1unkl6Vp5MzUK3rdjJzU=

# Publisher (write-only)
RABBITMQ_PUBLISHER_USER=publisher
RABBITMQ_PUBLISHER_PASS=a0Eif5UY77MPOQelIi5lbCCHRIOPgiy4Q+EPHiHnKHc=

# Consumer (read-only)
RABBITMQ_CONSUMER_USER=consumer
RABBITMQ_CONSUMER_PASS=2QqdzOYVf2PbeIFfZm5PhNcf1ry3e9GL18vLglDvWFQ=
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment