Created January 17, 2017 09:19
var Q = require('q');
var domain = require('domain');
var http = require('http');
var util = require('util');
var appUtils = require('lib/utils/appUtils');
var _ = require('lodash');

module.exports.start = function (logger, consumer, inMemoryBus, registerHandlers, options) {
  return new Worker(logger, consumer, inMemoryBus, registerHandlers, options);
};
var DEFAULTS = {
  maxAttempts: 5,
  healthCheckPort: null,
  skipMessage: null,
  onMessage: null,
  onStarted: null,
  onFailure: null,
  onMessageProcessed: null,
  onMessageFailed: null
};
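// Usage sketch (an assumption, not part of the original gist): `logger`, `kafkaConsumer`,
// `orderedBus` and `registerHandlers` are hypothetical objects supplied by the caller, and
// './worker' is a placeholder path for this file; only the option names come from DEFAULTS above.
//
//   var worker = require('./worker').start(logger, kafkaConsumer, orderedBus,
//     registerHandlers, { maxAttempts: 3, healthCheckPort: 0 });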
function Worker (logger, consumer, inMemoryBus, registerHandlers, options) {
  var self = this;

  global.SyncWorkerContext = {};

  // create a top-level domain for the worker.
  // The domain helps catch unexpected errors and shut down gracefully.
  // 1. https://nodejs.org/api/process.html#process_event_uncaughtexception
  // 2. https://nodejs.org/api/domain.html#domain_warning_don_t_ignore_errors
  var workerDomain = domain.create();
  function failWorker (err, evt) {
    self.logger.error('Worker [' + consumer.getConsumerGroupId() + '] failed.\n', err.stack, err.message);
    self.safeExit(1, consumer.getConsumerGroupId());
  }

  function messageFailed (err, evt) {
    self.logger.error('Worker [' + consumer.getConsumerGroupId() + '] failed.\n', err.stack, err.message);
    if (evt) {
      options.onMessageFailed(evt, err);
    } else {
      self.safeExit(1);
    }
  }

  workerDomain.on('error', messageFailed);
  // extend into a fresh object so the shared DEFAULTS are not mutated
  options = _.extend({}, DEFAULTS, options);
  this.options = options;

  if (!options.onFailure) {
    options.onFailure = messageFailed;
  }
  if (!options.onMessageFailed) {
    options.onMessageFailed = _.noop;
  }
  this.consumer = consumer;
  this.inMemoryBus = inMemoryBus;
  this.inMemoryBus.setOptions({
    maxQueueSize: 50,
    maxAttempts: this.options.maxAttempts
  });
  this.logger = logger;
  this.isClosing = false;
  this.topicPayloads = null;
  workerDomain.run(function () {
    // The Kafka node.js consumer may receive 100 commands at once,
    // so we process them through an ordered in-memory bus.
    consumer.on('message', function (msg) {
      if (self.isClosing) {
        return;
      }

      var message = appUtils.parseJSON(msg.value);
      message.kafka = {
        topic: msg.topic,
        offset: msg.offset,
        partition: msg.partition
      };

      if (message.metadata && message.metadata.name) {
        logger.debug('Received [%s] from [%s:%s:%s]. ', message.metadata.name, msg.topic, msg.partition, msg.offset);
        console.log('Received [%s] from [%s:%s:%s]. ', message.metadata.name, msg.topic, msg.partition, msg.offset);

        return Q.fcall(function () {
          if (options.skipMessage) {
            return options.skipMessage(message);
          }
          return false;
        })
          .then(function (skip) {
            if (skip) {
              message._skipped = true;
              inMemoryBus.emit('event-processed', message);
            } else {
              if (options.onMessage) {
                options.onMessage(message);
              }
              if (inMemoryBus.hasListenersFor(message.metadata.name)) {
                inMemoryBus.emit(message.metadata.name, message);
              } else {
                message._hasNoListeners = true;
                inMemoryBus.emit('event-processed', message);
              }
            }
          });
      } else {
        logger.warn('Received message with unknown structure, not sure how to process', message);
      }
    });
    self.inMemoryBus.onInternal('event-processing-started', function (message) {
      console.log('Processing message [%s] from [%s:%s:%s]. ', message.metadata.name, message.kafka.topic, message.kafka.partition, message.kafka.offset);
      global.SyncWorkerContext.message = message;
      if (options.onMessageProcessingStarted) {
        options.onMessageProcessingStarted(message);
      }
    });
    if (options.onMessageProcessed) {
      self.inMemoryBus.onInternal('event-processed', function (message) {
        if (!message._skipped && !message.failed) {
          return options.onMessageProcessed(message);
        }
      });
    }

    self.keepInMemoryQueueSmall();

    var intervalId = self.autoCommitOffsets();
    self.autoCommitIntervalId = intervalId;
    workerDomain.add(intervalId);
    workerDomain.add(self.consumer);

    Q.fcall(function () {
      if (process.env.SYNCWORKER_SKIP_HANDLERS === "1") {
        logger.warn('SYNCWORKER HANDLERS DISABLED');
        return;
      }
      return registerHandlers();
    })
      .then(function () {
        self.consumer.connect();
      }).done();

    self.rebalancing = true;

    self.consumer.on('rebalancing', function () {
      self.rebalancing = true;
      self.consumer.pause();
    });

    self.consumer.once('rebalanced', function () {
      if (options.healthCheckPort) {
        self.healthCheck = new HealthCheck(logger, consumer.getId(), options.healthCheckPort, () => {
          return {
            queueSize: self.inMemoryBus.getQueueSize()
          };
        });
      }
      if (options.onStarted) {
        options.onStarted();
      }
    });
    self.consumer.on('rebalanced', function () {
      function commitOffsetAndFetch () {
        function updateOffsetsPartitionsAfterRebalancedAndFetch () {
          // a rebalance may happen again while offsets were being committed
          if (!self.consumer.isRebalancing()) {
            self.topicPayloads = _.cloneDeep(self.consumer.getTopicPayloads());
            self.logger.debug('topic payloads after rebalanced', self.topicPayloads);
            self.rebalancing = false;
            if (self.consumer.isPaused()) {
              self.consumer.resume(); // fetch
            }
          }
        }

        if (self.topicPayloads) { // not the initial rebalancing
          self.commitOffsets(function (err) {
            if (err) {
              throw err;
            }
            updateOffsetsPartitionsAfterRebalancedAndFetch();
          });
        } else {
          updateOffsetsPartitionsAfterRebalancedAndFetch();
        }
      }

      if (inMemoryBus.isEmpty()) {
        commitOffsetAndFetch();
      } else {
        inMemoryBus.onceInternal('queue-empty', commitOffsetAndFetch);
      }
    });
    self.consumer.on('partition-offset-reset', function (data) {
      var topicPayload = _.find(self.topicPayloads, function (topicPayload) {
        return topicPayload.topic == data.topic && topicPayload.partition == data.partition;
      });
      topicPayload.offset = data.offset;

      if (!_.find(self.topicPayloads, { offset: 0 })) {
        if (self.consumer.isPaused()) {
          self.consumer.resume();
        }
      }
    });

    self.handleUnexpectedErrors();
    self.handleProcessErrors();
  });
}
/**
 * Pause consuming new messages when the in-memory queue grows beyond the configured
 * maxQueueSize (see inMemoryBus.setOptions above); resume once the queue is drained.
 */
Worker.prototype.keepInMemoryQueueSmall = function () {
  var self = this;

  self.inMemoryBus.onInternal('queue-empty', function () {
    if (self.consumer.isPaused()) {
      self.consumer.resume();
    }
  });

  self.inMemoryBus.onInternal('max-queue-size-reached', function () {
    self.consumer.pause();
  });
};
/**
 * Commit offsets of processed messages every 10 seconds.
 */
Worker.prototype.autoCommitOffsets = function () {
  var self = this;

  // Track events which have already been processed
  self.inMemoryBus.onInternal('event-processed', function (evt) {
    evt.processedOn = new Date();
    var topicPayload = _.find(self.topicPayloads, function (topicPayload) {
      return (topicPayload.topic == evt.kafka.topic) && (topicPayload.partition == evt.kafka.partition);
    });
    // In Kafka, the committed offset is the offset of the next message to start processing from
    topicPayload.offset = evt.kafka.offset + 1;
  });
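  // Worked example (illustration only; 'events' is a hypothetical topic name): after the
  // message at offset 41 of partition 0 is processed, the matching entry becomes
  //   { topic: 'events', partition: 0, offset: 42 }
  // so the next commit tells Kafka to resume from the first unprocessed message.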
  return setInterval(function () {
    if (!self.rebalancing && !self.commiting) {
      self.consumer.pause();

      function commit () {
        self.commitOffsets(function (err) {
          if (err) {
            throw err;
          }
          self.commiting = false;
          if (self.consumer.isPaused()) {
            self.consumer.resume();
          }
        });
      }

      self.commiting = true;
      if (self.inMemoryBus.isEmpty()) {
        commit();
      } else {
        self.inMemoryBus.onceInternal('queue-empty', commit);
      }
    }
  }, 10000);
};
Worker.prototype.commitOffsets = function (callback) {
  var self = this;
  self.consumer.commitOffsets(self.topicPayloads, function (err, data) {
    if (err) {
      self.logger.warn('Failed to commit offset to kafka', err);
      if (callback) {
        return callback(err);
      }
      throw err;
    }

    if (data && data.notCommitedDueToNoChanges) {
      //self.logger.debug('Committing offsets ignored: no changes', self.topicPayloads);
    } else {
      self.logger.debug('Committed offsets to kafka', self.topicPayloads);
    }

    if (callback) {
      callback(err, data);
    }
  });
};
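// Assumed shape of `topicPayloads` (inferred from the getTopicPayloads()/commitOffsets()
// usage above, not confirmed by the gist): an array of per-partition entries such as
//   [{ topic: 'events', partition: 0, offset: 42 }, { topic: 'events', partition: 1, offset: 17 }]
// where `offset` is the next offset to consume, as advanced in autoCommitOffsets().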
/**
 * Handle unexpected exceptions and retry processing the command.
 * If the command cannot be processed, log the failed event to Loggly.
 */
Worker.prototype.handleUnexpectedErrors = function () {
  var self = this;
  self.inMemoryBus.onInternal('error', function (data) {
    var err = data.err;
    var evt = data.evt;
    self.options.onFailure(err, evt);
  });
};
/**
 * Pause consumer, remove all handlers from inMemoryBus, commit offsets and exit.
 * Force exit after 30 seconds.
 *
 * @param exitCode
 */
Worker.prototype.safeExit = function (exitCode) {
  var self = this;
  self.consumer.pause();
  self.inMemoryBus.stop();

  if (!self.isClosing) {
    // Stop offsets auto-commit
    clearInterval(self.autoCommitIntervalId);

    self.logger.info('Committing offsets to kafka before closing consumer ', self.consumer.getConsumerGroupId(), self.topicPayloads);
    self.isClosing = true;

    self.commitOffsets(function (err) {
      if (err) {
        self.logger.warn('Failed to commit offsets to kafka before closing consumer ', self.consumer.getConsumerGroupId(), err);
      } else {
        self.logger.info('Committed offsets to kafka before closing consumer ', self.consumer.getConsumerGroupId());
      }

      self.consumer.close(function () {
        self.logger.info('Closed kafka consumer ', self.consumer.getConsumerGroupId());
        // give 3 seconds to persist loggly message
        setTimeout(function () {
          process.exit(exitCode);
        }, 3000);
      });
    });
  }

  // Exit process anyway in 30 seconds if something went wrong.
  setTimeout(function () {
    self.logger.warn('Forced closing worker ', self.consumer.getConsumerGroupId());
    process.exit(1);
  }, 30000);
};
Worker.prototype.handleProcessErrors = function () {
  var self = this;

  // Async operations do not work here; we use domains instead, as node.js suggests:
  // https://nodejs.org/api/process.html#process_event_uncaughtexception
  process.on('uncaughtException', function (err) {
    console.error((new Date()).toUTCString() + ' uncaughtException:', err.message);
    self.logger.error('Fatal error, kafka consumer failed', err);
    process.exit(1);
  });

  // catches the ctrl+c event
  // IMPORTANT: in Kafka we should always close the consumer before closing the application.
  // Consumers won't de-register automatically, which leads to ConsumerRebalance errors.
  process.on('SIGINT', function () {
    console.log('received SIGINT.');
    self.safeExit(0);
  });

  process.on('SIGTERM', function () {
    console.log('received SIGTERM from marathon, closing consumer before shutting down application.');
    self.logger.warn('received SIGTERM from marathon, closing consumer before shutting down application.');
    self.safeExit(0);
  });

  process.on('SIGUSR2', function () {
    console.log('received SIGUSR2 from nodemon, closing consumer before shutting down application.');
    self.logger.warn('received SIGUSR2 from nodemon, closing consumer before shutting down application.');
    self.safeExit(0);
  });

  process.on('beforeExit', function () {
    self.safeExit(1);
  });
};
function HealthCheck (logger, name, port, getStatusFn) {
  var self = this;

  if (port === 0) {
    port = self.generatePort();
  }

  self.html = '<html><head></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;">{"consumerId": "%s", "status": "%s"}</pre></body></html>';

  var server = http.createServer(function (request, response) {
    response.writeHead(200, { 'Content-Type': 'text/html' });
    var status = _.isFunction(getStatusFn) ? getStatusFn() : {};
    response.end(util.format(self.html, name, JSON.stringify(status, null, 4)));
  }).listen(port);
  server.on('error', function (err) {
    logger.error(new Date() + ': Health Check server error for consumer [%s], port %d. Error: %s', name, port, JSON.stringify(err));
  });

  logger.info(new Date() + ': Health Check for consumer [%s] started on port %d', name, port);
}
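// Example (illustrative values only): with a consumer id of 'sync-worker-1' and an empty
// queue, a GET to the health-check port returns the self.html template with the consumer id
// and the JSON-stringified status filled into the two "%s" placeholders, roughly:
//   {"consumerId": "sync-worker-1", "status": "{ "queueSize": 0 }"}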
HealthCheck.prototype.generatePort = function () {
  var min = 4000,
      max = 5000;
  return Math.floor(Math.random() * (max - min)) + min;
};