Created
August 30, 2023 03:49
-
-
Save AutMaple/2044b9479515f515765ae627aa01c889 to your computer and use it in GitHub Desktop.
[kafka消费者和生产者] #kafka,#python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/home/autmaple/Tools/personal/kafka/.venv/bin/python | |
| from confluent_kafka import Consumer, KafkaError | |
| import argparse | |
| parser = argparse.ArgumentParser(description='Kafka Producer') | |
| parser.add_argument('--broker', required=True, help='Kafka broker address') | |
| parser.add_argument('--topic', required=True, help='Kafka topic to produce to') | |
| args = parser.parse_args() | |
| # Create Kafka consumer configuration | |
| conf = { | |
| 'bootstrap.servers': args.broker, | |
| 'group.id': args.topic + '-consumer-group', | |
| 'auto.offset.reset': 'earliest' | |
| } | |
| # Create Kafka consumer instance | |
| consumer = Consumer(conf) | |
| # Subscribe to the topic | |
| consumer.subscribe([args.topic]) | |
| # Setup loop to consume messages | |
| running = True | |
| try: | |
| while running: | |
| msg = consumer.poll(1.0) | |
| if msg is None: | |
| continue | |
| if msg.error(): | |
| if msg.error().code() == KafkaError._PARTITION_EOF: | |
| print('Reached end of partition') | |
| else: | |
| print('Error: {}'.format(msg.error())) | |
| else: | |
| print('Received message: {}'.format(msg.value().decode('utf-8'))) | |
| except KeyboardInterrupt: | |
| # Close the consumer | |
| consumer.close() | |
| print("\nExiting...") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/home/autmaple/Tools/personal/kafka/.venv/bin/python | |
| from confluent_kafka import Producer | |
| import argparse | |
| import readline # Import readline module for enhanced input | |
| def main(): | |
| parser = argparse.ArgumentParser(description='Kafka Producer') | |
| parser.add_argument('--broker', required=True, help='Kafka broker address') | |
| parser.add_argument('--topic', required=True, help='Kafka topic to produce to') | |
| args = parser.parse_args() | |
| conf = { | |
| 'bootstrap.servers': args.broker | |
| } | |
| producer = Producer(conf) | |
| try: | |
| while True: | |
| # Use readline to get user input with history and editing support | |
| message = input_with_history("Enter a message (Ctrl+C to exit): ") | |
| # Produce the message to Kafka | |
| producer.produce(args.topic, key=None, value=message.encode('utf-8')) | |
| producer.flush() # Wait for the message to be delivered | |
| print("Message sent successfully!") | |
| except KeyboardInterrupt: | |
| print("\nExiting...") | |
| def input_with_history(prompt): | |
| line = input(prompt) | |
| readline.add_history(line) | |
| return line | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment