Skip to content

Instantly share code, notes, and snippets.

@nagetsum
Created October 15, 2017 05:05
Show Gist options
  • Select an option

  • Save nagetsum/974777602eaeb6ce549c148a57de93fa to your computer and use it in GitHub Desktop.

Select an option

Save nagetsum/974777602eaeb6ce549c148a57de93fa to your computer and use it in GitHub Desktop.
Kafka Consumer Sample
package kafka;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Properties;
/**
 * Minimal Kafka consumer sample.
 *
 * <p>Subscribes to the {@code my-topic} topic, logs every received record and
 * commits offsets synchronously after each polled batch (auto-commit is
 * disabled in {@link #properties()}). A JVM shutdown hook wakes the consumer up
 * so the poll loop terminates and the consumer is closed before the process exits.
 */
public class ConsumerClient {

    private static final Logger LOG = LoggerFactory.getLogger(ConsumerClient.class);

    /**
     * Entry point: creates a client and runs the poll loop until the JVM is shut down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        new ConsumerClient().run();
    }

    /**
     * Runs the poll/log/commit loop on the calling thread.
     *
     * <p>The loop only ends when a {@link WakeupException} is raised (triggered by the
     * shutdown hook calling {@code wakeup()}) or when another exception propagates.
     * In every case the consumer is closed by the try-with-resources statement.
     */
    private void run() {
        // try-with-resources guarantees close() even if subscribe() or hook registration fails.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties())) {
            shutdownHook(consumer);
            consumer.subscribe(Arrays.asList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                records.forEach(record ->
                        LOG.info("partition = {}, offset = {}, key = {}, value = {}",
                                record.partition(), record.offset(), record.key(), record.value()));
                // Auto-commit is off, so offsets are committed explicitly once per batch.
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // Expected: this is how the shutdown hook interrupts the loop, not an error.
            LOG.info("Consumer woken up, shutting down");
        }
    }

    /**
     * Registers a JVM shutdown hook that wakes up the given consumer and then waits
     * for the thread that called this method to finish, so that thread has the chance
     * to close the consumer before the JVM exits.
     *
     * @param consumer the consumer to wake up on shutdown
     */
    private void shutdownHook(KafkaConsumer<?, ?> consumer) {
        final Thread pollingThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                pollingThread.join();
            } catch (InterruptedException e) {
                // Restore the interrupt status instead of swallowing it.
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while waiting for the consumer thread to finish", e);
            }
        }));
    }

    /**
     * Builds the consumer configuration: broker address, group and client ids,
     * auto-commit disabled, and String deserializers for keys and values.
     *
     * @return a new {@link Properties} instance holding the consumer settings
     */
    private static Properties properties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.1:9092");
        props.put("group.id", "consumer-group1");
        props.put("client.id", "consumer-1");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment