Test 1)
Forcing the seed broker to fail with ECONNRESET. The consumer example on the repo was changed to:
const kafka = new Kafka({
brokers: ['127.0.0.1:9094'],
// other configs
})

Test 1)
Forcing the seed broker to fail with ECONNRESET. The consumer example on the repo was changed to:
const kafka = new Kafka({
brokers: ['127.0.0.1:9094'],
// other configs
})

| const createProducer = require('../../producer') | |
| const createConsumer = require('../index') | |
| const { Types, Codecs } = require('../../protocol/message/compression') | |
| const LZ4 = require('kafkajs-lz4') | |
| Codecs[Types.LZ4] = new LZ4().codec | |
| const { | |
| secureRandom, | |
| createCluster, |
| const fs = require('fs') | |
| const ip = require('ip') | |
| const cluster = require('cluster') | |
| const { Kafka, logLevel } = require('../index') | |
| const errorTypes = ['unhandledRejection', 'uncaughtException'] | |
| const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'] | |
| const host = process.env.HOST_IP || ip.address() |
| const { promisify } = require('util') | |
| const snappy = require('snappy') | |
| const snappyCompress = promisify(snappy.compress) | |
| const snappyDecompress = promisify(snappy.uncompress) | |
| const XERIAL_HEADER = Buffer.from([130, 83, 78, 65, 80, 80, 89, 0]) | |
| const SIZE_BYTES = 4 | |
| const SIZE_OFFSET = 16 |
| $ phobos start | |
| [2016-08-13T17:29:59:218+0200Z] INFO -- Phobos : <Hash> {:message=>"Phobos configured", | |
| :env=>"development"} | |
| ______ _ _ | |
| | ___ \ | | | | |
| | |_/ / |__ ___ | |__ ___ ___ | |
| | __/| '_ \ / _ \| '_ \ / _ \/ __| | |
| | | | | | | (_) | |_) | (_) \__ \ | |
| \_| |_| |_|\___/|_.__/ \___/|___/ |
| class MyHandler | |
| include PhobosDBCheckpoint::Handler | |
| def consume(payload, metadata) | |
| my_event = JSON.parse(payload) | |
| # <-- your logic (which possibly skips messages) here | |
| ack(my_event['id'], Time.now) | |
| end | |
| end |
| class MyHandler | |
| include Phobos::Handler | |
| def self.start(kafka_client) | |
| # setup handler | |
| end | |
| def self.stop | |
| # teardown | |
| end |
| class MyHandler | |
| include Phobos::Handler | |
| def consume(payload, metadata) | |
| # payload - This is the content of your Kafka message, Phobos does not attempt to | |
| # parse this content, it is delivered raw to you | |
| # metadata - A hash with useful information about this event, it contains: key, | |
| # partition, offset, retry_count, topic, group_id, and listener_id | |
| end | |
| end |
| # Gemfile | |
| platforms :ruby do | |
| gem 'byebug' | |
| end | |
| platforms :jruby do | |
| gem 'pry' | |
| end |
| # ... | |
| Gem::Specification.new do |spec| | |
| # ... | |
| if RUBY_PLATFORM =~ /java/ | |
| spec.platform = 'java' | |
| spec.add_dependency 'activerecord-jdbcpostgresql-adapter' | |
| else | |
| spec.add_dependency 'pg' | |
| end | |
| end |