Skip to content

Instantly share code, notes, and snippets.

@AutMaple
Created August 30, 2023 03:49
Show Gist options
  • Select an option

  • Save AutMaple/2044b9479515f515765ae627aa01c889 to your computer and use it in GitHub Desktop.

Select an option

Save AutMaple/2044b9479515f515765ae627aa01c889 to your computer and use it in GitHub Desktop.
[kafka消费者和生产者] #kafka,#python
#!/home/autmaple/Tools/personal/kafka/.venv/bin/python
from confluent_kafka import Consumer, KafkaError
import argparse
parser = argparse.ArgumentParser(description='Kafka Producer')
parser.add_argument('--broker', required=True, help='Kafka broker address')
parser.add_argument('--topic', required=True, help='Kafka topic to produce to')
args = parser.parse_args()
# Create Kafka consumer configuration
conf = {
'bootstrap.servers': args.broker,
'group.id': args.topic + '-consumer-group',
'auto.offset.reset': 'earliest'
}
# Create Kafka consumer instance
consumer = Consumer(conf)
# Subscribe to the topic
consumer.subscribe([args.topic])
# Setup loop to consume messages
running = True
try:
while running:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print('Reached end of partition')
else:
print('Error: {}'.format(msg.error()))
else:
print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
# Close the consumer
consumer.close()
print("\nExiting...")
#!/home/autmaple/Tools/personal/kafka/.venv/bin/python
from confluent_kafka import Producer
import argparse
import readline # Import readline module for enhanced input
def main():
parser = argparse.ArgumentParser(description='Kafka Producer')
parser.add_argument('--broker', required=True, help='Kafka broker address')
parser.add_argument('--topic', required=True, help='Kafka topic to produce to')
args = parser.parse_args()
conf = {
'bootstrap.servers': args.broker
}
producer = Producer(conf)
try:
while True:
# Use readline to get user input with history and editing support
message = input_with_history("Enter a message (Ctrl+C to exit): ")
# Produce the message to Kafka
producer.produce(args.topic, key=None, value=message.encode('utf-8'))
producer.flush() # Wait for the message to be delivered
print("Message sent successfully!")
except KeyboardInterrupt:
print("\nExiting...")
def input_with_history(prompt):
line = input(prompt)
readline.add_history(line)
return line
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment