Skip to content

Instantly share code, notes, and snippets.

@rajeshpv
Created February 11, 2026 06:38
Show Gist options
  • Select an option

  • Save rajeshpv/20c13dc2c1eb84959aebcec9152cb66f to your computer and use it in GitHub Desktop.

Select an option

Save rajeshpv/20c13dc2c1eb84959aebcec9152cb66f to your computer and use it in GitHub Desktop.
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "kafka-python",
# ]
# ///
"""Connect to Kafka and read messages from the fcs topic."""
import os
from datetime import datetime
from kafka import KafkaConsumer
def safe_decode(b: bytes | None) -> str | None:
if b is None:
return None
return b.decode("utf-8", errors="replace")
def format_timestamp(ts) -> str:
if ts is None:
return "n/a"
if isinstance(ts, tuple):
ts = ts[1] # (timestamp_type, ms)
if ts <= 0:
return "n/a"
return datetime.utcfromtimestamp(ts / 1000.0).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
def print_message(msg, count: int) -> None:
key = safe_decode(msg.key)
value = safe_decode(msg.value)
ts = getattr(msg, "timestamp", None) or 0
meta = [f"Offset: {msg.offset}", f"Key: {key}", f"Timestamp: {format_timestamp(ts)}"]
header_pairs = []
if msg.headers:
for hkey, hval in msg.headers:
hkey_str = safe_decode(hkey) if isinstance(hkey, bytes) else str(hkey)
hval_str = safe_decode(hval) if isinstance(hval, bytes) else str(hval)
header_pairs.append(f"{hkey_str}: {hval_str}")
line = " | ".join(meta) + (" || " + " | ".join(header_pairs) if header_pairs else "")
print(line)
print(f"Value: {value}")
print()
BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092").split(",")
TOPIC = os.getenv("TOPIC_NAME", "UNKNOWN_TOPIC")
consumer = KafkaConsumer(
TOPIC,
bootstrap_servers=BOOTSTRAP_SERVERS,
group_id="kafka-test-consumer-1",
auto_offset_reset="earliest",
)
try:
count = 0
for msg in consumer:
if count >= 10:
break
count += 1
print_message(msg, count)
finally:
consumer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment