Imagine Kafka as a magical post office that never loses letters! 📮
🏠 House A (Producer) ──── 📬 Kafka Post Office ──── 🏠 House B (Consumer)
Sends letters Stores letters safely Receives letters
- Fast: Like a super-fast mail delivery service ⚡
- Reliable: Never loses your messages 🛡️
- Scalable: Can handle millions of messages 📈
- Real-time: Messages are typically delivered within milliseconds ⏱️
- 🎵 Spotify: Sends song recommendations
- 🛒 Amazon: Tracks your orders
- 🚗 Uber: Connects drivers with riders
- 📱 Netflix: Suggests movies you might like
┌─────────────────────────────────────────────────────────────┐
│ 🏢 KAFKA OFFICE │
├─────────────────────────────────────────────────────────────┤
│ │
│ 📥 PRODUCERS 📚 TOPICS 📤 CONSUMERS │
│ (Message Senders) (Message Boxes) (Message Receivers) │
│ │
│ 🏠 App A ────────▶ 📦 user-events ────────▶ 🏠 App B │
│ 🏠 App C ────────▶ 📦 orders ────────▶ 🏠 App D │
│ 🏠 App E ────────▶ 📦 payments ────────▶ 🏠 App F │
│ │
└─────────────────────────────────────────────────────────────┘
Step 1: Producer creates a message
┌─────────────┐
│ 🏠 App A │ ──── "User John logged in"
└─────────────┘
Step 2: Message goes to Kafka Topic
┌─────────────┐ ┌─────────────┐
│ 🏠 App A │ ──── │ 📦 Topic │
└─────────────┘ │ user-events │
└─────────────┘
Step 3: Consumer reads the message
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 🏠 App A │ ──── │ 📦 Topic │ ──── │ 🏠 App B │
└─────────────┘ │ user-events │ │ (Analytics) │
└─────────────┘ └─────────────┘
- Overview
- Prerequisites
- EC2 Instance Setup
- Kafka Installation & Configuration
- Performance Optimization
- Kafka UI Setup
- TypeScript/Node.js Integration
- Monitoring & Maintenance
- Security Best Practices
- Cost Optimization
- Troubleshooting
This guide provides a complete setup for deploying Apache Kafka on EC2 instances optimized for:
- High Throughput: 100K+ messages/second
- Low Latency: Sub-millisecond response times
- Cost Efficiency: Self-hosted to reduce AWS MSK costs
- Production Ready: Monitoring, security, and reliability
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Producer │ │ Kafka Broker │ │ Consumer │
│ (Node.js/TS) │───▶│ (EC2) │───▶│ (Node.js/TS) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
┌─────────────────┐
│ Kafka UI │
│ (Web Admin) │
└─────────────────┘
- AWS CLI configured with appropriate permissions
- VPC with public/private subnets
- Security groups configured for Kafka ports
- Node.js 18+ and npm/yarn
- TypeScript 4.5+
- Docker (optional, for local testing)
# Instance Type: m5.2xlarge or c5.2xlarge
# CPU: 8 vCPUs
# Memory: 32 GB RAM
# Network: Up to 10 Gbps
# Storage: 2x 500GB gp3 SSD (RAID 0)

# Instance Type: t3.large
# CPU: 2 vCPUs
# Memory: 8 GB RAM
# Network: Up to 5 Gbps
# Storage: 1x 200GB gp3 SSD

- Create Launch Template
aws ec2 create-launch-template \
--launch-template-name kafka-production \
--version-description "Kafka production template" \
--launch-template-data '{
"ImageId": "ami-0c02fb55956c7d316",
"InstanceType": "m5.2xlarge",
"KeyName": "your-key-pair",
"SecurityGroupIds": ["sg-xxxxxxxxx"],
"IamInstanceProfile": {
"Name": "kafka-instance-profile"
},
"BlockDeviceMappings": [
{
"DeviceName": "/dev/xvda",
"Ebs": {
"VolumeSize": 50,
"VolumeType": "gp3"
}
},
{
"DeviceName": "/dev/xvdf",
"Ebs": {
"VolumeSize": 500,
"VolumeType": "gp3",
"Iops": 3000
}
}
],
"UserData": "'"$(base64 -w 0 user-data.sh)"'"
}'

- User Data Script (user-data.sh)
#!/bin/bash
yum update -y
yum install -y java-11-amazon-corretto-headless
# Install monitoring tools
yum install -y htop iotop nethogs
# Format and mount data volume
mkfs.ext4 /dev/xvdf
mkdir -p /kafka-data
mount /dev/xvdf /kafka-data
echo '/dev/xvdf /kafka-data ext4 defaults,nofail 0 2' >> /etc/fstab
# Create kafka user
useradd -r -s /bin/false kafka
chown -R kafka:kafka /kafka-data
# Set system limits
cat << 'EOF' >> /etc/security/limits.conf
kafka soft nofile 65536
kafka hard nofile 65536
kafka soft nproc 32768
kafka hard nproc 32768
EOF
# Kernel optimizations
cat << 'EOF' >> /etc/sysctl.conf
# Network optimizations
net.core.rmem_max = 134217728
net.core.wmem_max = 134217728
net.ipv4.tcp_rmem = 4096 87380 134217728
net.ipv4.tcp_wmem = 4096 65536 134217728
net.core.netdev_max_backlog = 5000
# File system optimizations
vm.dirty_ratio = 5
vm.dirty_background_ratio = 2
vm.swappiness = 1
EOF
sysctl -p

# Download Kafka
cd /opt
# Older releases are served from the Apache archive; downloads.apache.org only hosts current versions
wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
tar -xzf kafka_2.13-2.8.2.tgz
ln -s kafka_2.13-2.8.2 kafka
chown -R kafka:kafka kafka*
# Create directories
mkdir -p /kafka-data/logs
mkdir -p /kafka-data/zookeeper
chown -R kafka:kafka /kafka-data

Create /opt/kafka/config/zookeeper.properties:
# Zookeeper configuration for production
dataDir=/kafka-data/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
# Performance settings
tickTime=2000
initLimit=10
syncLimit=5
maxSessionTimeout=40000
# Purge settings
autopurge.snapRetainCount=3
autopurge.purgeInterval=24

Create /opt/kafka/config/server.properties:
# Broker configuration
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
# Note: Kafka does not expand shell substitutions inside .properties files. If you template this
# file from a provisioning script, resolve the address there (for example with
# curl -s http://169.254.169.254/latest/meta-data/public-ipv4) and write the literal value:
advertised.listeners=PLAINTEXT://<instance-public-ip>:9092
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Log configuration
log.dirs=/kafka-data/logs
num.partitions=8
default.replication.factor=1
min.insync.replicas=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# Performance optimizations
log.flush.interval.messages=10000
log.flush.interval.ms=1000
replica.fetch.max.bytes=1048576
message.max.bytes=1048576
# Compression
compression.type=lz4
# Memory settings
log.cleaner.enable=true
log.cleanup.policy=delete
# Network settings
replica.socket.timeout.ms=30000
controller.socket.timeout.ms=30000
# Zookeeper connection
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000

Zookeeper Service (/etc/systemd/system/zookeeper.service):
[Unit]
Description=Apache Zookeeper
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
Group=kafka
Environment="KAFKA_HEAP_OPTS=-Xmx2G -Xms2G"
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target

Kafka Service (/etc/systemd/system/kafka.service):
[Unit]
Description=Apache Kafka
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
Group=kafka
Environment="KAFKA_HEAP_OPTS=-Xmx8G -Xms8G -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target

Start Services:
systemctl daemon-reload
systemctl enable zookeeper kafka
systemctl start zookeeper
sleep 10
systemctl start kafka

Create /opt/kafka/bin/kafka-server-start-optimized.sh:
#!/bin/bash
export KAFKA_HEAP_OPTS="-Xmx8G -Xms8G"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
# Java 11+ uses unified GC logging; the old -XX:+PrintGC* flags were removed
export KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=/kafka-data/logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M"
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

# Set CPU governor to performance
echo performance | tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor
# Disable swap
swapoff -a
sed -i '/ swap / s/^/#/' /etc/fstab
# Set file system mount options for performance
mount -o remount,noatime,nodiratime /kafka-data

# Install Docker
yum install -y docker
systemctl start docker
systemctl enable docker
# Run Kafka UI (host networking so the container can reach the broker at localhost:9092)
docker run -d \
--name kafka-ui \
--restart unless-stopped \
--network host \
-e KAFKA_CLUSTERS_0_NAME=production \
-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=localhost:9092 \
-e KAFKA_CLUSTERS_0_ZOOKEEPER=localhost:2181 \
-e AUTH_TYPE=LOGIN_FORM \
-e SPRING_SECURITY_USER_NAME=admin \
-e SPRING_SECURITY_USER_PASSWORD=your-secure-password \
provectuslabs/kafka-ui:latest

- URL: http://your-ec2-public-ip:8080
- Username: admin
- Password: your-secure-password
# Initialize new project
mkdir kafka-typescript-client
cd kafka-typescript-client
npm init -y
# Install dependencies
npm install kafkajs
npm install -D typescript @types/node ts-node nodemon
# Create tsconfig.json
cat << 'EOF' > tsconfig.json
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"lib": ["ES2020"],
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}
EOF

Create src/config/kafka.ts:
import { Kafka, KafkaConfig } from "kafkajs";
export interface KafkaClientConfig {
brokers: string[];
clientId: string;
connectionTimeout?: number;
requestTimeout?: number;
retry?: {
retries: number;
initialRetryTime: number;
maxRetryTime: number;
};
}
export class KafkaClient {
private kafka: Kafka;
private config: KafkaClientConfig;
constructor(config: KafkaClientConfig) {
this.config = config;
const kafkaConfig: KafkaConfig = {
clientId: config.clientId,
brokers: config.brokers,
connectionTimeout: config.connectionTimeout || 1000,
requestTimeout: config.requestTimeout || 30000,
retry: config.retry || {
retries: 8,
initialRetryTime: 100,
maxRetryTime: 30000,
},
};
this.kafka = new Kafka(kafkaConfig);
}
get client(): Kafka {
return this.kafka;
}
async createTopics(
topics: Array<{
topic: string;
numPartitions: number;
replicationFactor: number;
}>,
) {
const admin = this.kafka.admin();
await admin.connect();
try {
await admin.createTopics({
topics: topics.map((t) => ({
topic: t.topic,
numPartitions: t.numPartitions,
replicationFactor: t.replicationFactor,
configEntries: [
{ name: "compression.type", value: "lz4" },
{ name: "min.insync.replicas", value: "1" },
{ name: "segment.ms", value: "86400000" }, // 24 hours
],
})),
});
} finally {
await admin.disconnect();
}
}
}

Create src/producer/high-performance-producer.ts:
import { CompressionTypes, Producer, ProducerRecord, RecordMetadata } from "kafkajs";
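// NOTE: kafkajs only bundles a GZIP codec. To actually produce with LZ4 (used as the default
// below), a codec has to be registered once at startup, for example with the community
// kafkajs-lz4 package (an assumption; install it separately, any compatible codec works):
//
//   import { CompressionCodecs } from "kafkajs";
//   import LZ4 from "kafkajs-lz4";
//   CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec;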
import { KafkaClient } from "../config/kafka";
export interface ProducerConfig {
maxInFlightRequests?: number;
idempotent?: boolean;
transactionTimeout?: number;
acks?: -1 | 0 | 1;
compression?: CompressionTypes;
batchSize?: number;
lingerMs?: number;
}
export class HighPerformanceProducer {
private producer: Producer;
private isConnected = false;
constructor(
private kafkaClient: KafkaClient,
private config: ProducerConfig = {},
) {
this.producer = this.kafkaClient.client.producer({
maxInFlightRequests: config.maxInFlightRequests ?? 5,
// ?? keeps an explicit `idempotent: false`; kafkajs' idempotent mode also expects acks=-1
idempotent: config.idempotent ?? true,
transactionTimeout: config.transactionTimeout || 30000,
allowAutoTopicCreation: false,
retry: {
retries: 5,
initialRetryTime: 100,
maxRetryTime: 30000,
},
});
}
async connect(): Promise<void> {
if (!this.isConnected) {
await this.producer.connect();
this.isConnected = true;
}
}
async disconnect(): Promise<void> {
if (this.isConnected) {
await this.producer.disconnect();
this.isConnected = false;
}
}
async send(record: ProducerRecord): Promise<RecordMetadata[]> {
await this.connect();
return this.producer.send({
...record,
acks: this.config.acks ?? -1,
compression: this.config.compression ?? CompressionTypes.LZ4,
});
}
async sendBatch(records: ProducerRecord[]): Promise<RecordMetadata[]> {
await this.connect();
// acks and compression apply to the batch as a whole, not to each topic entry
return this.producer.sendBatch({
acks: this.config.acks ?? -1,
compression: this.config.compression ?? CompressionTypes.LZ4,
topicMessages: records.map((record) => ({
topic: record.topic,
messages: record.messages,
})),
});
}
// High-throughput streaming method
async sendStream(
topic: string,
messages: Array<{
key?: string;
value: string;
partition?: number;
timestamp?: string;
}>,
batchSize: number = 1000,
): Promise<void> {
await this.connect();
const batches = this.chunkArray(messages, batchSize);
for (const batch of batches) {
const record: ProducerRecord = {
topic,
messages: batch,
acks: this.config.acks ?? -1,
compression: this.config.compression ?? CompressionTypes.LZ4,
};
await this.producer.send(record);
}
}
private chunkArray<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
}

Create src/consumer/high-performance-consumer.ts:
import {
Consumer,
ConsumerSubscribeTopic,
EachMessagePayload,
KafkaMessage,
} from "kafkajs";
import { KafkaClient } from "../config/kafka";
export interface ConsumerConfig {
groupId: string;
sessionTimeout?: number;
rebalanceTimeout?: number;
heartbeatInterval?: number;
maxBytesPerPartition?: number;
minBytes?: number;
maxBytes?: number;
maxWaitTimeInMs?: number;
allowAutoTopicCreation?: boolean;
}
export interface MessageHandler {
(payload: EachMessagePayload): Promise<void>;
}
export class HighPerformanceConsumer {
private consumer: Consumer;
private isConnected = false;
private isRunning = false;
constructor(
private kafkaClient: KafkaClient,
private config: ConsumerConfig,
) {
this.consumer = this.kafkaClient.client.consumer({
groupId: config.groupId,
sessionTimeout: config.sessionTimeout || 30000,
rebalanceTimeout: config.rebalanceTimeout || 60000,
heartbeatInterval: config.heartbeatInterval || 3000,
maxBytesPerPartition: config.maxBytesPerPartition || 1048576,
minBytes: config.minBytes || 1,
maxBytes: config.maxBytes || 10485760,
maxWaitTimeInMs: config.maxWaitTimeInMs || 5000,
allowAutoTopicCreation: config.allowAutoTopicCreation || false,
retry: {
retries: 5,
initialRetryTime: 100,
maxRetryTime: 30000,
},
});
}
async connect(): Promise<void> {
if (!this.isConnected) {
await this.consumer.connect();
this.isConnected = true;
}
}
async disconnect(): Promise<void> {
if (this.isConnected) {
await this.consumer.disconnect();
this.isConnected = false;
}
}
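// Note: the single-topic subscribe shape used below matches kafkajs 1.x. On kafkajs 2.x the
// call takes an array instead, e.g. consumer.subscribe({ topics: ["my-topic"], fromBeginning: true }).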
async subscribe(subscription: ConsumerSubscribeTopic): Promise<void> {
await this.connect();
await this.consumer.subscribe(subscription);
}
async run(messageHandler: MessageHandler): Promise<void> {
if (this.isRunning) {
throw new Error("Consumer is already running");
}
await this.connect();
this.isRunning = true;
await this.consumer.run({
autoCommit: false,
partitionsConsumedConcurrently: 3,
eachMessage: async (payload: EachMessagePayload) => {
try {
await messageHandler(payload);
await this.consumer.commitOffsets([
{
topic: payload.topic,
partition: payload.partition,
offset: (
parseInt(payload.message.offset) + 1
).toString(),
},
]);
} catch (error) {
console.error("Error processing message:", error);
// Implement your error handling strategy here
// You might want to send to a dead letter queue
}
},
});
}
async runBatch(
batchHandler: (
messages: KafkaMessage[],
topic: string,
partition: number,
) => Promise<void>,
batchSize: number = 100,
): Promise<void> {
if (this.isRunning) {
throw new Error("Consumer is already running");
}
await this.connect();
this.isRunning = true;
await this.consumer.run({
autoCommit: false,
partitionsConsumedConcurrently: 3,
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
try {
const messages = batch.messages;
const chunks = this.chunkArray(messages, batchSize);
for (const chunk of chunks) {
await batchHandler(chunk, batch.topic, batch.partition);
// Mark progress and commit up to the last message in the chunk
// (with autoCommit disabled, resolveOffset alone does not commit offsets)
const lastMessage = chunk[chunk.length - 1];
resolveOffset(lastMessage.offset);
await this.consumer.commitOffsets([
{
topic: batch.topic,
partition: batch.partition,
offset: (parseInt(lastMessage.offset) + 1).toString(),
},
]);
await heartbeat();
}
} catch (error) {
console.error("Error processing batch:", error);
// Implement your error handling strategy here
}
},
});
}
async stop(): Promise<void> {
if (this.isRunning) {
await this.consumer.stop();
this.isRunning = false;
}
}
private chunkArray<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
}

Create src/examples/producer-example.ts:
import { CompressionTypes } from "kafkajs";
import { KafkaClient } from "../config/kafka";
import { HighPerformanceProducer } from "../producer/high-performance-producer";
async function runProducerExample() {
const kafkaClient = new KafkaClient({
brokers: ["your-ec2-public-ip:9092"],
clientId: "high-performance-producer",
});
const producer = new HighPerformanceProducer(kafkaClient, {
compression: CompressionTypes.LZ4,
acks: -1,
idempotent: true,
});
try {
// Create topic
await kafkaClient.createTopics([
{ topic: "test-topic", numPartitions: 8, replicationFactor: 1 },
]);
// Single message
await producer.send({
topic: "test-topic",
messages: [
{
key: "key1",
value: JSON.stringify({ id: 1, message: "Hello Kafka!" }),
timestamp: Date.now().toString(),
},
],
});
// Batch messages for high throughput
const messages = Array.from({ length: 10000 }, (_, i) => ({
key: `key-${i}`,
value: JSON.stringify({
id: i,
message: `Message ${i}`,
timestamp: Date.now(),
}),
}));
await producer.sendStream("test-topic", messages, 1000);
console.log("Messages sent successfully");
} catch (error) {
console.error("Error:", error);
} finally {
await producer.disconnect();
}
}
runProducerExample().catch(console.error);

Create src/examples/consumer-example.ts:
import { KafkaClient } from "../config/kafka";
import { HighPerformanceConsumer } from "../consumer/high-performance-consumer";
async function runConsumerExample() {
const kafkaClient = new KafkaClient({
brokers: ["your-ec2-public-ip:9092"],
clientId: "high-performance-consumer",
});
const consumer = new HighPerformanceConsumer(kafkaClient, {
groupId: "test-group",
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576,
});
try {
await consumer.subscribe({ topic: "test-topic", fromBeginning: true });
await consumer.run(async ({ topic, partition, message }) => {
const key = message.key?.toString();
const value = message.value?.toString();
const offset = message.offset;
console.log(
`Received message: ${key} = ${value} (offset: ${offset})`,
);
// Process your message here
const data = JSON.parse(value || "{}");
// ... business logic
});
} catch (error) {
console.error("Error:", error);
}
}
runConsumerExample().catch(console.error);

Update your package.json:
{
"scripts": {
"build": "tsc",
"start": "node dist/index.js",
"dev": "ts-node src/index.ts",
"producer": "ts-node src/examples/producer-example.ts",
"consumer": "ts-node src/examples/consumer-example.ts"
}
}

Create /opt/kafka/config/jmx.properties:
# Enable JMX
com.sun.management.jmxremote=true
com.sun.management.jmxremote.port=9999
com.sun.management.jmxremote.authenticate=false
com.sun.management.jmxremote.ssl=false

Add to Kafka startup script:
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

Create /opt/kafka/bin/health-check.sh:
#!/bin/bash
# Kafka health check script
KAFKA_HOME="/opt/kafka"
KAFKA_HOST="localhost:9092"
# Check if Kafka is responding
if $KAFKA_HOME/bin/kafka-broker-api-versions.sh --bootstrap-server $KAFKA_HOST > /dev/null 2>&1; then
echo "Kafka is healthy"
exit 0
else
echo "Kafka is not responding"
exit 1
fi
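If you prefer an application-level check, a rough TypeScript equivalent using the kafkajs admin client could look like this (broker address and client id are placeholders):

import { Kafka } from "kafkajs";

async function checkKafkaHealth(broker: string): Promise<boolean> {
  const kafka = new Kafka({ clientId: "health-check", brokers: [broker] });
  const admin = kafka.admin();
  try {
    await admin.connect();
    // If the broker answers metadata requests, consider it healthy
    const topics = await admin.listTopics();
    console.log(`Kafka is healthy (${topics.length} topics visible)`);
    return true;
  } catch (error) {
    console.error("Kafka is not responding:", error);
    return false;
  } finally {
    await admin.disconnect().catch(() => undefined);
  }
}

checkKafkaHealth("localhost:9092").then((ok) => process.exit(ok ? 0 : 1));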
Create /etc/logrotate.d/kafka:

/kafka-data/logs/*.log {
daily
rotate 7
compress
delaycompress
missingok
notifempty
create 0644 kafka kafka
postrotate
/bin/kill -HUP `cat /var/run/kafka.pid 2> /dev/null` 2> /dev/null || true
endscript
}
Generate certificates:
# Create CA
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -nodes
# Create server certificate
openssl req -keyout server-key -out server-cert-request -nodes -new
openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-request -out server-cert -days 365 -CAcreateserial

Update server.properties:
# SSL Configuration
# Note: Kafka expects JKS keystores; import the certificate and key generated above into
# server.keystore.jks / server.truststore.jks (for example with keytool) before enabling this listener.
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
ssl.keystore.location=/opt/kafka/config/server.keystore.jks
ssl.keystore.password=your-password
ssl.key.password=your-password
ssl.truststore.location=/opt/kafka/config/server.truststore.jks
ssl.truststore.password=your-password

Configure AWS Security Groups:
# Kafka broker port
aws ec2 authorize-security-group-ingress \
--group-id sg-xxxxxxxxx \
--protocol tcp \
--port 9092 \
--source-group sg-xxxxxxxxx
# Zookeeper port (only from Kafka brokers)
aws ec2 authorize-security-group-ingress \
--group-id sg-xxxxxxxxx \
--protocol tcp \
--port 2181 \
--source-group sg-xxxxxxxxx
# Kafka UI (restrict to your IP)
aws ec2 authorize-security-group-ingress \
--group-id sg-xxxxxxxxx \
--protocol tcp \
--port 8080 \
--cidr your-ip/32

Create Auto Scaling Group:
aws autoscaling create-auto-scaling-group \
--auto-scaling-group-name kafka-asg \
--launch-template LaunchTemplateName=kafka-production,Version=1 \
--min-size 1 \
--max-size 3 \
--desired-capacity 1 \
--vpc-zone-identifier subnet-xxxxxxxxx \
--health-check-type ELB \
--health-check-grace-period 300

Use Spot Instances for development:
aws ec2 request-spot-instances \
--spot-price "0.20" \
--instance-count 1 \
--type "one-time" \
--launch-specification '{
"ImageId": "ami-0c02fb55956c7d316",
"InstanceType": "m5.large",
"KeyName": "your-key-pair",
"SecurityGroups": ["kafka-sg"]
}'

Set up CloudWatch billing alerts:
aws cloudwatch put-metric-alarm \
--alarm-name "Kafka-Monthly-Cost" \
--alarm-description "Monitor Kafka monthly costs" \
--metric-name EstimatedCharges \
--namespace AWS/Billing \
--statistic Maximum \
--period 86400 \
--threshold 100 \
--comparison-operator GreaterThanThreshold \
--dimensions Name=Currency,Value=USD \
--evaluation-periods 1 \
--alarm-actions arn:aws:sns:us-east-1:123456789012:billing-alarm

Right-sizing script (/opt/kafka/bin/optimize-resources.sh):
#!/bin/bash
# Optimize Kafka resources based on load
# Monitor CPU usage
CPU_USAGE=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1)
MEMORY_USAGE=$(free | grep Mem | awk '{printf "%.2f", $3/$2 * 100.0}')
echo "Current CPU Usage: $CPU_USAGE%"
echo "Current Memory Usage: $MEMORY_USAGE%"
# Adjust JVM heap based on memory usage
if (( $(echo "$MEMORY_USAGE > 80" | bc -l) )); then
echo "High memory usage detected. Consider increasing instance size."
fi
# Check disk usage
DISK_USAGE=$(df /kafka-data | tail -1 | awk '{print $5}' | cut -d'%' -f1)
if [ "$DISK_USAGE" -gt 80 ]; then
echo "High disk usage: $DISK_USAGE%. Consider cleanup or expansion."
fi

Diagnosis:
# Check network latency
ping -c 10 your-kafka-broker
# Monitor JVM garbage collection
tail -f /kafka-data/logs/kafkaServer-gc.log
# Check I/O wait
iostat -x 1

Solutions:
# Optimize network buffer sizes
echo 'net.core.rmem_max = 134217728' >> /etc/sysctl.conf
echo 'net.core.wmem_max = 134217728' >> /etc/sysctl.conf
sysctl -p
# Optimize JVM GC settings
export KAFKA_HEAP_OPTS="-Xmx8G -Xms8G -XX:+UseG1GC -XX:MaxGCPauseMillis=10"

Diagnosis:
# Test producer throughput
/opt/kafka/bin/kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 100000 \
--record-size 1000 \
--throughput 10000 \
--producer-props bootstrap.servers=localhost:9092
# Test consumer throughput
/opt/kafka/bin/kafka-consumer-perf-test.sh \
--topic test-topic \
--bootstrap-server localhost:9092 \
--messages 100000

Solutions:
# Producer-side batching (these are producer configs, not server.properties)
batch.size=65536
linger.ms=10
compression.type=lz4
# Broker-side: increase default partitions for new topics (server.properties)
num.partitions=16

Automatic cleanup script (/opt/kafka/bin/cleanup-logs.sh):
#!/bin/bash
# Clean up old Kafka logs
LOG_DIR="/kafka-data/logs"
RETENTION_DAYS=7
# Remove old log segments
# Caution: Kafka normally deletes segments itself via log.retention.* settings; only remove
# files manually for stopped brokers/topics or after adjusting retention.
find $LOG_DIR -name "*.log" -type f -mtime +$RETENTION_DAYS -delete
find $LOG_DIR -name "*.index" -type f -mtime +$RETENTION_DAYS -delete
find $LOG_DIR -name "*.timeindex" -type f -mtime +$RETENTION_DAYS -delete
# Clean up old GC logs
find /kafka-data/logs -name "kafkaServer-gc.log.*" -type f -mtime +$RETENTION_DAYS -delete
echo "Cleanup completed at $(date)"

Add to crontab:
# Run cleanup daily at 2 AM
0 2 * * * /opt/kafka/bin/cleanup-logs.sh >> /var/log/kafka-cleanup.log 2>&1

Memory optimization script (/opt/kafka/bin/optimize-memory.sh):
#!/bin/bash
# Optimize Kafka memory usage
# Get system memory
TOTAL_MEM=$(free -g | grep Mem | awk '{print $2}')
AVAILABLE_MEM=$(free -g | grep Mem | awk '{print $7}')
# Calculate optimal heap size (50% of total memory)
HEAP_SIZE=$((TOTAL_MEM / 2))
echo "Total Memory: ${TOTAL_MEM}G"
echo "Available Memory: ${AVAILABLE_MEM}G"
echo "Recommended Heap Size: ${HEAP_SIZE}G"
# Update Kafka service with optimal memory settings
sed -i "s/-Xmx[0-9]*G -Xms[0-9]*G/-Xmx${HEAP_SIZE}G -Xms${HEAP_SIZE}G/" /etc/systemd/system/kafka.service
systemctl daemon-reload
# Restart Kafka for the new heap size to take effect: systemctl restart kafka

JVM Profiling:
# Enable JFR (Java Flight Recorder)
export KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS -XX:+FlightRecorder -XX:StartFlightRecording=duration=60s,filename=/tmp/kafka-profile.jfr"
# Analyze with jcmd (kafka.Kafka is the broker's main class, so pgrep matches only the broker)
jcmd $(pgrep -f kafka.Kafka) JFR.dump filename=/tmp/kafka-dump.jfr

Network Profiling:
# Monitor network connections
netstat -an | grep :9092
# Monitor network traffic
tcpdump -i eth0 port 9092 -w kafka-traffic.pcap
# Analyze with iftop
iftop -i eth0 -P -f "port 9092"

Automated log analysis script (/opt/kafka/bin/analyze-logs.sh):
#!/bin/bash
# Analyze Kafka logs for issues
LOG_FILE="/kafka-data/logs/server.log"
ALERT_FILE="/tmp/kafka-alerts.log"
# Check for common error patterns
grep -i "error\|exception\|failed\|timeout" $LOG_FILE | tail -20 > $ALERT_FILE
# Check for performance issues
grep -i "gc\|memory\|slow" $LOG_FILE | tail -10 >> $ALERT_FILE
# Check for replication issues
grep -i "replica\|sync\|lag" $LOG_FILE | tail -10 >> $ALERT_FILE
if [ -s $ALERT_FILE ]; then
echo "Kafka issues detected:"
cat $ALERT_FILE
# Optional: Send alerts via SNS
# aws sns publish --topic-arn arn:aws:sns:region:account:kafka-alerts --message file://$ALERT_FILE
fi

For high availability, set up multiple brokers:
Broker 1 Configuration:
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker1-ip:9092
log.dirs=/kafka-data/logs-1

Broker 2 Configuration:
broker.id=2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker2-ip:9092
log.dirs=/kafka-data/logs-2

Broker 3 Configuration:
broker.id=3
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker3-ip:9092
log.dirs=/kafka-data/logs-3

High-throughput topic:
/opt/kafka/bin/kafka-topics.sh \
--create \
--bootstrap-server localhost:9092 \
--topic high-throughput-topic \
--partitions 16 \
--replication-factor 3 \
--config compression.type=lz4 \
--config min.insync.replicas=2 \
--config segment.ms=86400000 \
--config retention.ms=604800000

Low-latency topic:
/opt/kafka/bin/kafka-topics.sh \
--create \
--bootstrap-server localhost:9092 \
--topic low-latency-topic \
--partitions 8 \
--replication-factor 3 \
--config compression.type=none \
--config min.insync.replicas=2 \
--config segment.ms=3600000 \
--config flush.ms=100
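The same topics can also be created programmatically with the kafkajs admin client, mirroring the createTopics helper from earlier (broker address is a placeholder):

import { Kafka } from "kafkajs";

async function createTunedTopics() {
  const kafka = new Kafka({ clientId: "topic-setup", brokers: ["your-ec2-public-ip:9092"] });
  const admin = kafka.admin();
  await admin.connect();
  try {
    await admin.createTopics({
      topics: [
        {
          topic: "high-throughput-topic",
          numPartitions: 16,
          replicationFactor: 3,
          configEntries: [
            { name: "compression.type", value: "lz4" },
            { name: "min.insync.replicas", value: "2" },
            { name: "segment.ms", value: "86400000" },
            { name: "retention.ms", value: "604800000" },
          ],
        },
        {
          topic: "low-latency-topic",
          numPartitions: 8,
          replicationFactor: 3,
          configEntries: [
            { name: "compression.type", value: "none" },
            { name: "min.insync.replicas", value: "2" },
            { name: "segment.ms", value: "3600000" },
            { name: "flush.ms", value: "100" },
          ],
        },
      ],
    });
  } finally {
    await admin.disconnect();
  }
}

createTunedTopics().catch(console.error);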
Create src/patterns/event-driven.ts:

import { EventEmitter } from "events";
import { KafkaClient } from "../config/kafka";
import { HighPerformanceProducer } from "../producer/high-performance-producer";
import { HighPerformanceConsumer } from "../consumer/high-performance-consumer";
export interface DomainEvent {
eventId: string;
eventType: string;
aggregateId: string;
version: number;
timestamp: Date;
data: Record<string, any>;
}
export class EventStore extends EventEmitter {
private producer: HighPerformanceProducer;
private consumer: HighPerformanceConsumer;
constructor(private kafkaClient: KafkaClient, private eventTopic: string) {
super();
this.producer = new HighPerformanceProducer(kafkaClient);
this.consumer = new HighPerformanceConsumer(kafkaClient, {
groupId: "event-store-consumer",
});
}
async publishEvent(event: DomainEvent): Promise<void> {
await this.producer.send({
topic: this.eventTopic,
messages: [
{
key: event.aggregateId,
value: JSON.stringify(event),
partition: this.getPartition(event.aggregateId),
},
],
});
}
async subscribeToEvents(): Promise<void> {
await this.consumer.subscribe({ topic: this.eventTopic });
await this.consumer.run(async ({ message }) => {
const event: DomainEvent = JSON.parse(
message.value?.toString() || "{}",
);
this.emit("event", event);
this.emit(event.eventType, event);
});
}
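// Note: computing a partition by hand is optional. When a message has a key and no explicit
// partition, kafkajs' default partitioner hashes the key, so events for the same aggregate
// already land on the same partition.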
private getPartition(aggregateId: string): number {
// Simple hash-based partitioning
let hash = 0;
for (let i = 0; i < aggregateId.length; i++) {
hash = (hash << 5) - hash + aggregateId.charCodeAt(i);
hash = hash & hash; // Convert to 32-bit integer
}
return Math.abs(hash) % 16; // Assuming 16 partitions
}
}
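A short usage sketch for the event store above (broker address, topic name and the event payload are placeholders):

import { KafkaClient } from "../config/kafka";
import { EventStore, DomainEvent } from "./event-driven";

async function demoEventStore() {
  const kafkaClient = new KafkaClient({
    brokers: ["your-ec2-public-ip:9092"],
    clientId: "event-store-demo",
  });
  const store = new EventStore(kafkaClient, "domain-events");

  // React to a specific event type as it is consumed from the topic
  store.on("UserRegistered", (event: DomainEvent) => {
    console.log(`User ${event.aggregateId} registered at ${event.timestamp}`);
  });
  await store.subscribeToEvents();

  // Publish a domain event; events with the same aggregateId keep their ordering
  await store.publishEvent({
    eventId: "evt-1",
    eventType: "UserRegistered",
    aggregateId: "user-123",
    version: 1,
    timestamp: new Date(),
    data: { email: "user@example.com" },
  });
}

demoEventStore().catch(console.error);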
Create src/patterns/pipeline.ts:

export interface MessageProcessor<T, R> {
process(message: T): Promise<R>;
}
export class ProcessingPipeline<T> {
private processors: MessageProcessor<any, any>[] = [];
addProcessor<R>(processor: MessageProcessor<T, R>): ProcessingPipeline<R> {
this.processors.push(processor);
return this as any;
}
async execute(message: T): Promise<any> {
let result = message;
for (const processor of this.processors) {
result = await processor.process(result);
}
return result;
}
}
// Example processors
export class ValidationProcessor implements MessageProcessor<any, any> {
async process(message: any): Promise<any> {
if (!message.id) {
throw new Error("Message must have an ID");
}
return message;
}
}
export class EnrichmentProcessor implements MessageProcessor<any, any> {
async process(message: any): Promise<any> {
return {
...message,
processedAt: new Date(),
enrichedData: await this.enrichData(message),
};
}
private async enrichData(message: any): Promise<any> {
// Simulate data enrichment
return { additionalInfo: `enriched-${message.id}` };
}
}
export class TransformationProcessor implements MessageProcessor<any, any> {
async process(message: any): Promise<any> {
return {
id: message.id,
data: message.data,
metadata: {
processedAt: message.processedAt,
version: "1.0",
},
};
}
}
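A small sketch of composing the processors above, for example inside a consumer's message handler:

import {
  ProcessingPipeline,
  ValidationProcessor,
  EnrichmentProcessor,
  TransformationProcessor,
} from "./pipeline";

async function demoPipeline() {
  const pipeline = new ProcessingPipeline<any>()
    .addProcessor(new ValidationProcessor())
    .addProcessor(new EnrichmentProcessor())
    .addProcessor(new TransformationProcessor());

  // In practice the input would come from a consumed Kafka message
  const result = await pipeline.execute({ id: "42", data: { amount: 10 } });
  console.log(result); // { id, data, metadata: { processedAt, version: "1.0" } }
}

demoPipeline().catch(console.error);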
Create infrastructure/kafka-stack.yaml:

AWSTemplateFormatVersion: "2010-09-09"
Description: "Kafka Cluster on EC2"
Parameters:
InstanceType:
Type: String
Default: m5.2xlarge
Description: EC2 instance type for Kafka brokers
KeyPairName:
Type: AWS::EC2::KeyPair::KeyName
Description: Key pair for SSH access
VpcId:
Type: AWS::EC2::VPC::Id
Description: VPC ID for the Kafka cluster
SubnetIds:
Type: List<AWS::EC2::Subnet::Id>
Description: Subnet IDs for the Kafka cluster
Resources:
KafkaSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Security group for Kafka cluster
VpcId: !Ref VpcId
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 9092
ToPort: 9092
SourceSecurityGroupId: !Ref KafkaSecurityGroup
- IpProtocol: tcp
FromPort: 2181
ToPort: 2181
SourceSecurityGroupId: !Ref KafkaSecurityGroup
- IpProtocol: tcp
FromPort: 8080
ToPort: 8080
CidrIp: 0.0.0.0/0 # Kafka UI; restrict to your IP range in production
- IpProtocol: tcp
FromPort: 22
ToPort: 22
CidrIp: 0.0.0.0/0 # SSH; restrict to your IP range in production
KafkaLaunchTemplate:
Type: AWS::EC2::LaunchTemplate
Properties:
LaunchTemplateName: kafka-production-template
LaunchTemplateData:
ImageId: ami-0c02fb55956c7d316
InstanceType: !Ref InstanceType
KeyName: !Ref KeyPairName
SecurityGroupIds:
- !Ref KafkaSecurityGroup
IamInstanceProfile:
Name: !Ref KafkaInstanceProfile
BlockDeviceMappings:
- DeviceName: /dev/xvda
Ebs:
VolumeSize: 50
VolumeType: gp3
- DeviceName: /dev/xvdf
Ebs:
VolumeSize: 500
VolumeType: gp3
Iops: 3000
UserData:
Fn::Base64: !Sub |
#!/bin/bash
yum update -y
yum install -y java-11-amazon-corretto-headless
# Download and install Kafka
cd /opt
wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
tar -xzf kafka_2.13-2.8.2.tgz
ln -s kafka_2.13-2.8.2 kafka
# Setup data directory
mkfs.ext4 /dev/xvdf
mkdir -p /kafka-data
mount /dev/xvdf /kafka-data
echo '/dev/xvdf /kafka-data ext4 defaults,nofail 0 2' >> /etc/fstab
# Create kafka user and set permissions
useradd -r -s /bin/false kafka
chown -R kafka:kafka /opt/kafka*
chown -R kafka:kafka /kafka-data
# Configure and start services (assumes the zookeeper/kafka systemd units from the installation section are also installed here)
systemctl enable kafka
systemctl start kafka
KafkaInstanceProfile:
Type: AWS::IAM::InstanceProfile
Properties:
Roles:
- !Ref KafkaRole
KafkaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: ec2.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/CloudWatchAgentServerPolicy
Policies:
- PolicyName: KafkaPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- cloudwatch:PutMetricData
- ec2:DescribeInstances
- ec2:DescribeVolumes
Resource: "*"
KafkaAutoScalingGroup:
Type: AWS::AutoScaling::AutoScalingGroup
Properties:
LaunchTemplate:
LaunchTemplateId: !Ref KafkaLaunchTemplate
Version: !GetAtt KafkaLaunchTemplate.LatestVersionNumber
MinSize: 1
MaxSize: 5
DesiredCapacity: 3
VPCZoneIdentifier: !Ref SubnetIds
HealthCheckType: EC2
HealthCheckGracePeriod: 300
Tags:
- Key: Name
Value: Kafka-Broker
PropagateAtLaunch: true
Outputs:
SecurityGroupId:
Description: Security Group ID for Kafka cluster
Value: !Ref KafkaSecurityGroup
Export:
Name: !Sub "${AWS::StackName}-SecurityGroup"
LaunchTemplateId:
Description: Launch Template ID for Kafka
Value: !Ref KafkaLaunchTemplate
Export:
Name: !Sub "${AWS::StackName}-LaunchTemplate"

Create scripts/deploy.sh:
#!/bin/bash
# Kafka deployment script
set -e
# Configuration
STACK_NAME="kafka-production"
REGION="us-east-1"
INSTANCE_TYPE="m5.2xlarge"
KEY_PAIR="your-key-pair"
VPC_ID="vpc-xxxxxxxxx"
SUBNET_IDS="subnet-xxxxxxxxx,subnet-yyyyyyyyy"
echo "Deploying Kafka stack..."
# Deploy CloudFormation stack
aws cloudformation deploy \
--template-file infrastructure/kafka-stack.yaml \
--stack-name $STACK_NAME \
--parameter-overrides \
InstanceType=$INSTANCE_TYPE \
KeyPairName=$KEY_PAIR \
VpcId=$VPC_ID \
SubnetIds=$SUBNET_IDS \
--capabilities CAPABILITY_IAM \
--region $REGION
# Wait for stack to complete
aws cloudformation wait stack-create-complete \
--stack-name $STACK_NAME \
--region $REGION
echo "Kafka stack deployed successfully!"
# Get instance IPs
INSTANCE_IPS=$(aws ec2 describe-instances \
--filters "Name=tag:Name,Values=Kafka-Broker" "Name=instance-state-name,Values=running" \
--query 'Reservations[*].Instances[*].PublicIpAddress' \
--output text \
--region $REGION)
echo "Kafka brokers deployed at:"
for ip in $INSTANCE_IPS; do
echo " $ip:9092"
done
echo "Kafka UI available at:"
for ip in $INSTANCE_IPS; do
echo " http://$ip:8080"
done

This comprehensive guide provides everything needed to deploy a production-ready Kafka cluster on EC2 with optimal performance, cost efficiency, and TypeScript/Node.js integration. The setup includes:
- High Performance: Optimized for 100K+ messages/second with sub-millisecond latency
- Cost Efficiency: Self-hosted solution saving 60-70% compared to managed services
- Production Ready: Monitoring, security, and reliability features
- Developer Friendly: Complete TypeScript SDK with examples
- Scalable: Auto-scaling and multi-broker support
- Maintainable: Automated deployment and maintenance scripts
Expected performance:

- Throughput: 100,000+ messages/second
- Latency: < 1ms for producer, < 5ms for consumer
- Availability: 99.9% uptime with proper configuration
- Cost: 60-70% reduction compared to AWS MSK
Next steps:

- Customize configurations for your specific use case
- Implement monitoring and alerting
- Set up backup and disaster recovery
- Scale horizontally as needed
- Optimize based on your message patterns
Remember to always test in a staging environment before deploying to production, and monitor performance metrics to ensure optimal operation.
📱 Producer App → 🏢 Kafka → 📱 Consumer App
│ │ │
├─ Create Message ├─ Store Topic ├─ Process Message
├─ Send to Topic ├─ Replicate ├─ Send Ack
└─ Get Confirmation └─ Forward └─ Complete
🌐 KAFKA ECOSYSTEM
├── 🟨 Node.js/TypeScript (Web apps, APIs)
│ ├── kafkajs library
│ ├── High-level API
│ └── Great for microservices
│
├── ☕ Java/Spring Boot (Enterprise apps)
│ ├── kafka-clients library
│ ├── Native Kafka support
│ └── Best performance
│
└── 🐍 Python (Data processing, ML)
├── kafka-python library
├── Simple syntax
└── Perfect for analytics
# Create project
mkdir kafka-nodejs && cd kafka-nodejs
npm init -y && npm install kafkajs
# Create simple producer
cat > producer.js << 'EOF'
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
async function run() {
await producer.connect();
await producer.send({
topic: 'test-topic',
messages: [{ value: 'Hello Kafka from Node.js!' }]
});
await producer.disconnect();
console.log('✅ Message sent!');
}
run().catch(console.error);
EOF
# Run it
node producer.js
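A matching consumer for this quickstart might look like this (same local broker and topic assumed; uses the kafkajs v2 subscribe syntax):

# Create simple consumer
cat > consumer.js << 'EOF'
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'quickstart-group' });
async function run() {
  await consumer.connect();
  await consumer.subscribe({ topics: ['test-topic'], fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ message }) => {
      console.log(`📨 Received: ${message.value.toString()}`);
    }
  });
}
run().catch(console.error);
EOF
# Run it (Ctrl+C to stop)
node consumer.js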
# Create project
mkdir kafka-python && cd kafka-python
pip install kafka-python
# Create simple producer
cat > producer.py << 'EOF'
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
producer.send('test-topic', {'message': 'Hello from Python!'})
producer.flush()
print('✅ Message sent!')
EOF
# Run it
python producer.py

# Create Maven project
mvn archetype:generate -DgroupId=com.example -DartifactId=kafka-java -DinteractiveMode=false
# Add Kafka dependency to pom.xml (copy from Java section above)
# Create KafkaProducer class (copy from Java section above)
# Run with: mvn exec:java

# Create topic manually
kafka-topics.sh --create --bootstrap-server localhost:9092 \
--topic your-topic-name --partitions 4 --replication-factor 1

# Debug with console consumer
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic your-topic --from-beginning

# Optimize producer settings
batch.size=65536
linger.ms=10
compression.type=lz4
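These are Java-client producer properties; the TypeScript client (kafkajs) has no batch.size or linger.ms options, so the rough equivalent is to batch messages per send() call and enable compression (a sketch; assumes a local broker and an LZ4 codec registered as noted earlier):

import { Kafka, CompressionTypes, Message } from "kafkajs";

const kafka = new Kafka({ clientId: "tuned-producer", brokers: ["localhost:9092"] });
const producer = kafka.producer();

async function sendBuffered(topic: string, bufferedMessages: Message[]): Promise<void> {
  await producer.connect();
  await producer.send({
    topic,
    compression: CompressionTypes.LZ4, // analogous to compression.type=lz4
    acks: -1,
    messages: bufferedMessages, // accumulate e.g. a few hundred messages, then send once
  });
  await producer.disconnect();
}

sendBuffered("test-topic", [{ key: "k", value: "v" }]).catch(console.error);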