Skip to content

Instantly share code, notes, and snippets.

@IharKrasnik
Created January 17, 2017 09:19
Show Gist options
  • Select an option

  • Save IharKrasnik/1c98ad3f6a68f698b54705c654053437 to your computer and use it in GitHub Desktop.

Select an option

Save IharKrasnik/1c98ad3f6a68f698b54705c654053437 to your computer and use it in GitHub Desktop.
var Q = require('q');
var domain = require('domain');
var http = require('http');
var util = require('util');
var appUtils = require('lib/utils/appUtils');
var _ = require('lodash');
/**
 * Factory for the worker: constructs a Worker with the given collaborators
 * and hands the new instance back to the caller.
 *
 * @param logger - logger forwarded to the Worker constructor
 * @param consumer - kafka consumer forwarded to the Worker constructor
 * @param inMemoryBus - in-memory bus forwarded to the Worker constructor
 * @param registerHandlers - handler registration callback forwarded to the Worker constructor
 * @param options - worker options forwarded to the Worker constructor
 * @returns {Worker} the newly created worker
 */
module.exports.start = function (logger, consumer, inMemoryBus, registerHandlers, options) {
  var worker = new Worker(logger, consumer, inMemoryBus, registerHandlers, options);
  return worker;
};
/**
 * Default values for the `options` argument of the Worker constructor.
 * Every hook is optional; `null` means "not provided".
 */
var DEFAULTS = {
  // Forwarded to inMemoryBus.setOptions() as `maxAttempts`.
  maxAttempts: 5,
  // When truthy, a HealthCheck HTTP server is started on this port after the
  // first 'rebalanced' event of the consumer.
  healthCheckPort: null,
  // function (message): a truthy result (returned directly or via a promise)
  // marks the message as skipped instead of dispatching it.
  skipMessage: null,
  // function (message): called for every non-skipped message before it is
  // emitted on the in-memory bus.
  onMessage: null,
  // function (): called once, after the first 'rebalanced' event.
  onStarted: null,
  // function (err, evt): called for 'error' events of the in-memory bus.
  // The Worker installs its own handler when this is not provided.
  onFailure: null,
  // function (message): called on 'event-processed' for messages that were
  // neither skipped nor failed.
  onMessageProcessed: null,
  // function (evt, err): called by the Worker's built-in failure handler when
  // the failure is tied to an event. Replaced by a no-op when not provided.
  onMessageFailed: null,
  // function (message): called on 'event-processing-started'. The Worker
  // already reads this option, so it is listed with the other hooks.
  onMessageProcessingStarted: null
};
/**
 * Kafka consumer worker.
 *
 * Connects a kafka `consumer` to an `inMemoryBus`: every consumed message is
 * parsed, tagged with its kafka coordinates and emitted on the bus under
 * `message.metadata.name`. The constructor also wires queue back-pressure,
 * periodic offset commits, rebalance handling and process-level shutdown
 * hooks (see the prototype methods it calls), then registers the handlers and
 * connects the consumer.
 *
 * @param logger - logger exposing debug/info/warn/error
 * @param consumer - kafka consumer (event emitter with connect/pause/resume/...)
 * @param inMemoryBus - bus used to process messages
 * @param {Function} registerHandlers - called before connecting, unless the
 *   SYNCWORKER_SKIP_HANDLERS environment variable equals "1"; may return a promise
 * @param {Object} [options] - overrides for DEFAULTS
 * @constructor
 */
function Worker (logger, consumer, inMemoryBus, registerHandlers, options) {
  var self = this;

  // `GLOBAL` is only a deprecated alias of `global` and is missing from newer
  // Node.js releases; both names refer to the same object where both exist.
  global.SyncWorkerContext = {};

  // create a top-level domain for the worker
  // domain helps catch unexpected errors and shutdown gracefully.
  // 1. https://nodejs.org/api/process.html#process_event_uncaughtexception
  // 2. https://nodejs.org/api/domain.html#domain_warning_don_t_ignore_errors
  var workerDomain = domain.create();

  // Default failure handler: report to the caller when the failure belongs to
  // an event, otherwise shut the worker down.
  function messageFailed (err, evt) {
    self.logger.error('Worker [' + consumer.getConsumerGroupId() + '] failed.\n', err.stack, err.message);
    if (evt) {
      options.onMessageFailed(evt, err);
    } else {
      self.safeExit(1);
    }
  }

  workerDomain.on('error', messageFailed);

  // Merge into a fresh object. Extending DEFAULTS itself would leak one
  // worker's options (and the failure handler installed below) into every
  // worker created afterwards.
  options = _.extend({}, DEFAULTS, options);
  this.options = options;
  if (!options.onFailure) {
    options.onFailure = messageFailed;
  }
  if (!options.onMessageFailed) {
    options.onMessageFailed = _.noop;
  }

  this.consumer = consumer;
  this.inMemoryBus = inMemoryBus;
  this.inMemoryBus.setOptions({
    maxQueueSize: 50,
    maxAttempts: this.options.maxAttempts
  });
  this.logger = logger;
  this.isClosing = false;
  // Offsets per topic/partition; stays null until the first rebalance completes.
  this.topicPayloads = null;

  workerDomain.run(function () {
    //Kafka node.js consumer may receive 100 commands at once,
    //so we are using ordered in memory bus
    consumer.on('message', function (msg) {
      if (self.isClosing) {
        return;
      }
      var message = appUtils.parseJSON(msg.value);
      // A payload that does not parse to an object cannot carry the kafka
      // coordinates below; treat it like any other unknown message.
      if (!message || typeof message !== 'object') {
        logger.warn('Received message with unknown structure, not sure how to process', message);
        return;
      }
      message.kafka = {
        topic: msg.topic,
        offset: msg.offset,
        partition: msg.partition
      };
      if (message.metadata && message.metadata.name) {
        logger.debug('Received [%s] from [%s:%s:%s]. ', message.metadata.name, msg.topic, msg.partition, msg.offset);
        console.log('Received [%s] from [%s:%s:%s]. ', message.metadata.name, msg.topic, msg.partition, msg.offset);
        return Q.fcall(function () {
          if (options.skipMessage) {
            return options.skipMessage(message);
          }
          return false;
        })
          .then(function (skip) {
            if (skip) {
              message._skipped = true;
              inMemoryBus.emit('event-processed', message);
            } else {
              if (options.onMessage) {
                options.onMessage(message);
              }
              if (inMemoryBus.hasListenersFor(message.metadata.name)) {
                inMemoryBus.emit(message.metadata.name, message);
              } else {
                message._hasNoListeners = true;
                inMemoryBus.emit('event-processed', message);
              }
            }
          })
          .then(null, function (err) {
            // Nothing consumes this promise, so without a rejection handler an
            // error thrown by skipMessage/onMessage/emit would vanish silently.
            logger.error('Failed to dispatch message [%s] from [%s:%s:%s]. ', message.metadata.name, msg.topic, msg.partition, msg.offset, err);
          });
      }
      else {
        logger.warn('Received message with unknown structure, not sure how to process', message);
      }
    });

    self.inMemoryBus.onInternal('event-processing-started', function (message) {
      console.log('Processing message [%s] from [%s:%s:%s]. ', message.metadata.name, message.kafka.topic, message.kafka.partition, message.kafka.offset);
      // Expose the message currently being processed through the global context.
      global.SyncWorkerContext.message = message;
      if (options.onMessageProcessingStarted) {
        options.onMessageProcessingStarted(message);
      }
    });

    if (options.onMessageProcessed) {
      self.inMemoryBus.onInternal('event-processed', function (message) {
        if (!message._skipped && !message.failed) {
          return options.onMessageProcessed(message);
        }
      });
    }

    self.keepInMemoryQueueSmall();
    var intervalId = self.autoCommitOffsets();
    self.autoCommitIntervalId = intervalId;
    // Bind the commit timer and the consumer to the domain explicitly so that
    // errors raised from them reach the domain's 'error' handler.
    workerDomain.add(intervalId);
    workerDomain.add(self.consumer);

    Q.fcall(function () {
      if (process.env.SYNCWORKER_SKIP_HANDLERS === "1") {
        logger.warn('SYNCWORKER HANDLERS DISABLED');
        return;
      }
      return registerHandlers();
    })
      .then(function () {
        self.consumer.connect();
      }).done();

    // Treated as rebalancing until the first 'rebalanced' event has been handled.
    self.rebalancing = true;
    self.consumer.on('rebalancing', function () {
      self.rebalancing = true;
      self.consumer.pause();
    });

    // One-time startup work after the first rebalance.
    self.consumer.once('rebalanced', function () {
      if (options.healthCheckPort) {
        self.healthCheck = new HealthCheck(logger, consumer.getId(), options.healthCheckPort, () => {
          return {
            queueSize: self.inMemoryBus.getQueueSize()
          };
        });
      }
      if (options.onStarted) {
        options.onStarted();
      }
    });

    self.consumer.on('rebalanced', function () {
      function commitOffsetAndFetch () {
        function updateOffsetsPartitionsAfterRebalancedAndFetch () {
          //rebalance may happen again while offsets were commiting
          if (!self.consumer.isRebalancing()) {
            self.topicPayloads = _.cloneDeep(self.consumer.getTopicPayloads());
            self.logger.debug('topic payloads after rebalanced', self.topicPayloads);
            self.rebalancing = false;
            if (self.consumer.isPaused()) {
              self.consumer.resume(); //fetch
            }
          }
        }
        if (self.topicPayloads) { //not initial rebalancing
          self.commitOffsets(function (err) {
            if (err) {
              throw err;
            }
            updateOffsetsPartitionsAfterRebalancedAndFetch();
          });
        } else {
          updateOffsetsPartitionsAfterRebalancedAndFetch();
        }
      }
      // Let queued messages finish before committing and re-reading the payloads.
      if (inMemoryBus.isEmpty()) {
        commitOffsetAndFetch();
      } else {
        inMemoryBus.onceInternal('queue-empty', commitOffsetAndFetch);
      }
    });

    self.consumer.on('partition-offset-reset', function (data) {
      // NOTE(review): assumes a payload for data.topic/data.partition exists;
      // _.find yields undefined otherwise and the assignment below throws.
      var topicPayload = _.find(self.topicPayloads, function (topicPayload) {
        return topicPayload.topic == data.topic && topicPayload.partition == data.partition;
      });
      topicPayload.offset = data.offset;
      // Resume only once no payload is left at offset 0.
      if (!_.find(self.topicPayloads, { offset: 0 })) {
        if (self.consumer.isPaused()) {
          self.consumer.resume();
        }
      }
    });

    self.handleUnexpectedErrors();
    self.handleProcessErrors();
  });
}
/**
 * Back-pressure between the kafka consumer and the in-memory bus:
 * resume a paused consumer when the bus reports 'queue-empty', and pause the
 * consumer when the bus reports 'max-queue-size-reached'.
 */
Worker.prototype.keepInMemoryQueueSmall = function () {
  var worker = this;

  function resumeIfPaused () {
    if (worker.consumer.isPaused()) {
      worker.consumer.resume();
    }
  }

  function pauseConsumer () {
    worker.consumer.pause();
  }

  worker.inMemoryBus.onInternal('queue-empty', resumeIfPaused);
  worker.inMemoryBus.onInternal('max-queue-size-reached', pauseConsumer);
};
/**
 * Track processed offsets and commit them to kafka every 10 seconds.
 *
 * Registers an 'event-processed' listener that records the next offset for
 * the event's topic/partition, then starts the commit timer. Each tick is
 * skipped while a rebalance or a previous commit is still in flight;
 * otherwise the consumer is paused and offsets are committed once the
 * in-memory queue is empty.
 *
 * @returns the interval handle of the commit timer
 */
Worker.prototype.autoCommitOffsets = function () {
  var worker = this;

  // Commit, clear the in-flight flag and let the consumer fetch again.
  function commitThenResume () {
    worker.commitOffsets(function (err) {
      if (err) {
        throw err;
      }
      worker.commiting = false;
      if (worker.consumer.isPaused()) {
        worker.consumer.resume();
      }
    });
  }

  function onTick () {
    if (worker.rebalancing || worker.commiting) {
      return;
    }
    worker.consumer.pause();
    worker.commiting = true;
    if (worker.inMemoryBus.isEmpty()) {
      commitThenResume();
    } else {
      worker.inMemoryBus.onceInternal('queue-empty', commitThenResume);
    }
  }

  //Track events which already processed
  worker.inMemoryBus.onInternal('event-processed', function (evt) {
    evt.processedOn = new Date();
    var payload = _.find(worker.topicPayloads, function (candidate) {
      return (candidate.topic == evt.kafka.topic) && (candidate.partition == evt.kafka.partition);
    });
    //In kafka, stored offset is offset of the message from which start processing
    payload.offset = evt.kafka.offset + 1;
  });

  return setInterval(onTick, 10000);
};
/**
 * Commit the tracked topic payloads through the kafka consumer.
 *
 * On failure a warning is logged and the error is passed to `callback`, or
 * thrown when no callback was given. On success the commit is logged at debug
 * level unless the consumer reports `notCommitedDueToNoChanges`, and
 * `callback(err, data)` is invoked when a callback was given.
 *
 * @param {Function} [callback] - node-style callback (err, data)
 */
Worker.prototype.commitOffsets = function (callback) {
  var worker = this;

  worker.consumer.commitOffsets(worker.topicPayloads, function onCommitResult (err, data) {
    if (err) {
      worker.logger.warn('Failed to commit offset to kafka', err);
      if (!callback) {
        throw err;
      }
      return callback(err);
    }

    var nothingChanged = data && data.notCommitedDueToNoChanges;
    if (!nothingChanged) {
      worker.logger.debug('Committed offsets to kafka', worker.topicPayloads);
    }

    if (callback) {
      callback(err, data);
    }
  });
};
/**
 * Forward 'error' events of the in-memory bus to the configured
 * `options.onFailure(err, evt)` hook. The event payload is expected to carry
 * the error as `err` and the related event (if any) as `evt`.
 */
Worker.prototype.handleUnexpectedErrors = function () {
  var worker = this;

  worker.inMemoryBus.onInternal('error', function forwardFailure (failure) {
    worker.options.onFailure(failure.err, failure.evt);
  });
};
/**
 * Graceful shutdown: pause the consumer, stop the in-memory bus and, on the
 * first call only, stop the auto-commit timer, commit offsets, close the
 * consumer and exit with `exitCode` three seconds later.
 * Every call also arms a 30 second timer that force-exits with code 1.
 *
 * @param exitCode - process exit code used by the graceful path
 */
Worker.prototype.safeExit = function (exitCode) {
  var worker = this;

  function exitAfterLogsFlushed () {
    //give 3 seconds to persist loggly message
    setTimeout(function () {
      process.exit(exitCode);
    }, 3000);
  }

  function closeConsumerAndExit () {
    worker.consumer.close(function () {
      worker.logger.info('Closed kafka consumer ', worker.consumer.getConsumerGroupId());
      exitAfterLogsFlushed();
    });
  }

  function onFinalCommit (err) {
    if (err) {
      worker.logger.warn('Failed to commit offsets to kafka before closing consumer ', worker.consumer.getConsumerGroupId(), err);
    } else {
      worker.logger.info('Committed offsets to kafka before closing consumer ', worker.consumer.getConsumerGroupId());
    }
    closeConsumerAndExit();
  }

  function forceExit () {
    worker.logger.warn('Forced closing worker ', worker.consumer.getConsumerGroupId());
    process.exit(1);
  }

  worker.consumer.pause();
  worker.inMemoryBus.stop();

  if (!worker.isClosing) {
    //Stop offsets auto-commit
    clearInterval(worker.autoCommitIntervalId);
    worker.logger.info('Committing offsets to kafka before closing consumer ', worker.consumer.getConsumerGroupId(), worker.topicPayloads);
    worker.isClosing = true;
    worker.commitOffsets(onFinalCommit);
  }

  //Exit process anyway in 30 seconds if something went wrong.
  setTimeout(forceExit, 30000);
};
/**
 * Install process-level hooks:
 *  - 'uncaughtException': log the error and exit immediately with code 1;
 *  - SIGINT / SIGTERM / SIGUSR2: announce the signal and run safeExit(0);
 *  - 'beforeExit': run safeExit(1).
 */
Worker.prototype.handleProcessErrors = function () {
  var worker = this;

  // Register a graceful shutdown for `signal`; `alsoWarn` mirrors the notice
  // to the worker logger in addition to the console.
  function shutdownOn (signal, notice, alsoWarn) {
    process.on(signal, function () {
      console.log(notice);
      if (alsoWarn) {
        worker.logger.warn(notice);
      }
      worker.safeExit(0);
    });
  }

  //ASYNC operations does not work here, we use domains instead as node.js suggest: https://nodejs.org/api/process.html#process_event_uncaughtexception
  process.on('uncaughtException', function (fatal) {
    var stamp = (new Date()).toUTCString();
    console.error(stamp + ' uncaughtException:', fatal.message);
    worker.logger.error('Fatal error, kafka consumer failed', fatal);
    process.exit(1);
  });

  //IMPORTANT: In kafka we should always close consumer, before close application. Consumers won't de-register automatically
  //This is lead to the ConsumerRebalance errors
  //catches ctrl+c event
  shutdownOn('SIGINT', 'received SIGINT.', false);
  shutdownOn('SIGTERM', 'received SIGTERM from marathon, closing consumer before shutting down application.', true);
  shutdownOn('SIGUSR2', 'received SIGUSR2 from nodemon, closing consumer before shutting down application.', true);

  process.on('beforeExit', function () {
    worker.safeExit(1);
  });
};
/**
 * Minimal HTTP health-check endpoint for a consumer.
 *
 * Every request is answered with status 200 and a small HTML page containing
 * the consumer name and the JSON-serialised result of `getStatusFn()` (an
 * empty object when no function was supplied).
 *
 * @param logger - logger exposing info/error
 * @param name - consumer identifier shown in the response and in log lines
 * @param {number} port - port to listen on; `0` selects a random port via generatePort()
 * @param {Function} [getStatusFn] - returns the status object to report
 * @constructor
 */
function HealthCheck (logger, name, port, getStatusFn) {
  var self = this;
  if (port === 0) {
    port = self.generatePort();
  }
  self.html = '<html><head></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;">{"consumerId": "%s", "status": "%s"}</pre></body></html>';

  var server = http.createServer(function (request, response) {
    response.writeHead(200, { 'Content-Type': 'text/html' });
    var status = _.isFunction(getStatusFn) ? getStatusFn() : {};
    response.end(util.format(self.html, name, JSON.stringify(status, null, 4)));
  });

  // Attach the handler before listening so that bind failures are reported.
  server.on('error', function (err) {
    // `%e` is not a format specifier, and JSON.stringify() of an Error drops
    // its message and stack, so log the stack (or message) through `%s`.
    var details = (err && (err.stack || err.message)) || String(err);
    logger.error(new Date() + ': Health Check server error for consumer [%s], port %d. Error: %s', name, port, details);
  });

  // Announce the server only once the port is actually bound.
  server.listen(port, function () {
    logger.info(new Date() + ': Health Check for consumer [%s] started on port %d', name, port);
  });
}

/**
 * Pick a random port for the health-check server.
 *
 * @returns {number} an integer in the range [4000, 5000)
 */
HealthCheck.prototype.generatePort = function () {
  var min = 4000,
    max = 5000;
  return Math.floor(Math.random() * (max - min)) + min;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment