# kafka-node-topic-consumer

A wrapper around kafka-node's HighLevelConsumer that provides error handling and message-processing concurrency control via fastq (a lightweight, fast queue modeled after `async`'s queue).
## Installing

```sh
npm install --save kafka-node kafka-node-topic-consumer
```
## Purpose
There are two main motivations for this module:
1. There are known issues with the high-level consumer API in Kafka 0.8. When consumers are started too quickly after a failure, or too near in time to another member of the same group, rebalancing issues often occur. To help alleviate these issues, the `TopicConsumer` will self-heal when the underlying HighLevelConsumer encounters an error: it first attempts to close the existing consumer, then removes it and schedules a rebuild at a random time in the near future (30-90 seconds). The rebuild process is infinite, in that if a rebuild fails, the healing process restarts.
2. Although Kafka guarantees ordering within a partition, kafka-node's HighLevelConsumer resembles a firehose, emitting messages as soon as they arrive, regardless of how fast the application can process them. To address this, the `TopicConsumer` implements an in-memory queue that processes a single batch of messages at a time. As soon as the underlying consumer emits the first message of a newly received batch, the `TopicConsumer` pauses the consumer and pushes all messages into the queue. Once the last message has been processed, it resumes consuming messages.
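The pause/queue/resume flow described above can be sketched roughly as follows. This is an illustration only, not the module's actual internals; the `consumer.pause()`/`consumer.resume()` calls mirror kafka-node's consumer API, and the worker is a placeholder:

```javascript
// Illustrative sketch: throttling a firehose-style consumer with an
// in-memory queue. Pause on the first message of a batch, resume once
// every queued message has been processed.
function makeBatchProcessor(consumer, worker, concurrency = 1) {
  let pending = 0;
  const tasks = [];

  function drain() {
    // resume the underlying consumer only once the whole batch is done
    if (pending === 0 && tasks.length === 0) consumer.resume();
  }

  function next() {
    while (pending < concurrency && tasks.length > 0) {
      pending += 1;
      const msg = tasks.shift();
      Promise.resolve(worker(msg)).finally(() => {
        pending -= 1;
        next();
        drain();
      });
    }
  }

  return function onMessage(message) {
    // first message of a newly received batch: pause the consumer
    if (pending === 0 && tasks.length === 0) consumer.pause();
    tasks.push(message);
    next();
  };
}
```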
## Getting Started

```js
const TopicConsumer = require('kafka-node-topic-consumer');

// create a new TopicConsumer
const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: {
    groupId: 'my-consumer-group',
  },
  topic: 'my-topic',
});

consumer.registerWorker((parsed) => {
  // handleMessage is a placeholder for your own processing logic
  return handleMessage(parsed);
});

consumer.on('message:error', (err) => console.error(err));

consumer.connect();
```
## API

### constructor(options) => TopicConsumer
instantiate a new topic consumer
Params
name | type | description |
---|---|---|
options | Object | constructor options |
[options.concurrency] | Number | number of tasks to be processed at any given time, default is 1 |
options.consumer | Object | consumer options |
options.consumer.groupId | String | consumer group id |
options.host | String | zookeeper connection string |
[options.parse] | Function | a function (raw) => Promise for parsing raw kafka messages before they are pushed into the queue. the default parse function will attempt to parse the raw message's value attribute as utf-8 stringified json and add it as the parsedValue attribute on the message |
[options.rebuild] | Object | rebuild configuration |
[options.rebuild.closing] | Object | valid retry options for closing failed consumers |
[options.rebuild.maxDelay] | Number, String | the maximum time to wait before rebuilding, default is 2m |
[options.rebuild.minDelay] | Number, String | the minimum time to wait before rebuilding, default is 35s |
options.topic | String, Object | topic name or payload |
[options.validate] | Function | a function (parsed) => Promise for validating queue messages. Messages that fail validation will not be processed by workers |
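For reference, the default parse behavior described in the table above can be sketched as follows (an illustration only; the module's actual implementation may differ):

```javascript
// illustrative sketch of the default parse function: parse the raw
// message's value attribute as utf-8 stringified json and attach the
// result as the parsedValue attribute on the message
function defaultParse(raw) {
  return new Promise((resolve, reject) => {
    try {
      const value = Buffer.isBuffer(raw.value)
        ? raw.value.toString('utf8')
        : raw.value;
      resolve(Object.assign({}, raw, { parsedValue: JSON.parse(value) }));
    } catch (err) {
      // rejecting here means the message never reaches the queue
      reject(err);
    }
  });
}
```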
Example

```js
const Bluebird = require('bluebird');
const joi = require('joi');
const TopicConsumer = require('kafka-node-topic-consumer');

const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: { groupId: 'my-group-id' },
  topic: 'my-topic',
  parse(raw) {
    // parse the raw message's value as utf-8 stringified json
    return Bluebird.try(() => JSON.parse(raw.value.toString('utf8')));
  },
  validate(parsed) {
    // example schema; adapt the rules to your own messages
    const schema = joi.object({
      id: joi.string(),
      action: joi.string(),
      data: joi.object(),
    });
    const result = joi.validate(parsed, schema);
    if (result.error) {
      return Promise.reject(result.error);
    }
    return Promise.resolve(parsed);
  },
});
```
### connect([done]) => Promise
Wait for a new consumer to register
Params
name | type | description |
---|---|---|
done | Function | optional callback |
Example

```js
// using the returned promise
consumer.connect().then(() => {
  console.log('consumer registered');
});

// or with a node-style callback
consumer.connect((err) => {
  if (err) console.error(err);
});
```
### consumer
the underlying HighLevelConsumer instance
### queue
the underlying queue instance
### getStatus() => Object
get current status
Returns
### registerWorker(worker)
register a new worker function
Params
name | type | description |
---|---|---|
worker | Function | a function worker(parsed) => Promise that is passed every (valid) message for processing |
Example

```js
consumer.registerWorker((parsed) => {
  // processMessage is a placeholder for your own handler; return a
  // promise that resolves once the message has been processed
  return processMessage(parsed);
});
```
## Events

The TopicConsumer extends the EventEmitter class and emits the following lifecycle events:
event | description |
---|---|
`consumer:closing-error` | fired (err) when all attempts to close a failed consumer have failed |
`consumer:commit-error` | fired (err) when an error is encountered committing offsets |
`consumer:connecting` | fired when a new consumer instance is waiting to connect/register |
`consumer:error` | fired (err) anytime the underlying consumer emits an error |
`consumer:offset-out-of-range` | fired when the underlying consumer encounters an OffsetOutOfRangeError |
`consumer:pausing` | fired when the first message is pushed into the queue and the underlying consumer is paused |
`consumer:rebuild-initiated` | fired when the rebuild process has been initiated |
`consumer:rebuild-scheduled` | fired (delayInSeconds) when the rebuild has been scheduled |
`consumer:rebuild-started` | fired when the rebuild has started |
`consumer:resuming` | fired when the last task in the queue has been processed and the underlying consumer is resuming |
`consumer:starting` | fired after a new consumer has registered and is beginning to fetch messages |
`message:processing` | fired (parsed) when the queue has started processing a message |
`message:skipped` | fired (parsed, reason) when a message fails validation |
`message:success` | fired (parsed, results) when a message has been successfully processed |
`message:error` | fired (err, parsed) when a worker rejects |
## Testing

Requires docker 1.8+ and docker-compose 1.12+

```sh
docker-compose up
```
## Contributing

- Fork it
- Create your feature branch (`git checkout -b my-new-feature`)
- Commit your changes (`git commit -am 'Add some feature'`)
- Push to the branch (`git push origin my-new-feature`)
- Create new Pull Request
## License
Copyright (c) 2016 Gaia
Licensed under the MIT license