kafka-node-topic-consumer

A wrapper around kafka-node's HighLevelConsumer that provides error handling and message-processing concurrency control via fastq (a lightweight, fast queue modeled after async's queue).

Installing

npm install --save kafka-node kafka-node-topic-consumer

Purpose

There are two main motivations for this module:

  1. There are known issues with the high level consumer api in kafka 0.8. Starting a consumer too soon after a failure, or too close in time to another member of the same group, often triggers rebalancing issues. To help alleviate these issues, the TopicConsumer self-heals whenever the underlying HighLevelConsumer encounters an error: it first attempts to close the existing consumer, then removes it and schedules a rebuild at a random time in the near future (by default, between 35 seconds and 2 minutes; see the rebuild options below). The rebuild process is infinite: if a rebuild fails, the healing process starts over.
  2. Although kafka guarantees ordering within a partition, kafka-node's HighLevelConsumer resembles a firehose, emitting messages as soon as they arrive, regardless of how fast the application can process them. To address this, the TopicConsumer implements an in-memory queue that processes a single batch of messages at a time. As soon as the underlying consumer emits the first message of a newly received batch, the TopicConsumer pauses the consumer and pushes all messages into the queue; once the last message has been processed, it resumes consuming (see the sketch after this list).
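
The following is a minimal sketch of the pause/push/resume pattern described in (2), written directly against kafka-node and fastq. It illustrates the technique rather than the module's actual implementation, and assumes a hypothetical handleMessage function that returns a promise:

import kafka from 'kafka-node';
import fastq from 'fastq';
 
const client = new kafka.Client(process.env.ZOOKEEPER_HOST);
const consumer = new kafka.HighLevelConsumer(client, [{ topic: 'my-topic' }], {
  groupId: 'my-consumer-group',
});
 
// a queue that processes one message at a time
const queue = fastq((msg, done) => {
  // handleMessage is a hypothetical worker that returns a promise
  handleMessage(msg).then(() => done(null), done);
}, 1);
 
consumer.on('message', (msg) => {
  consumer.pause(); // stop the firehose while the queue drains
  queue.push(msg, () => {});
});
 
// resume fetching once the last queued message has been processed
queue.drain = () => consumer.resume();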

Getting Started

import TopicConsumer from 'kafka-node-topic-consumer';
 
// create a new TopicConsumer
const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: { groupId: 'my-consumer-group' },
  topic: 'my-topic',
});
 
consumer.registerWorker((msg) => {
  console.log(msg);
  return Promise.resolve();
});
 
consumer.on('message:error', (err, msg) => {
  console.error(err, msg);
});
 
consumer.connect();

API

constructor(options) => TopicConsumer

instantiate a new topic consumer

Params

| name | type | description |
| --- | --- | --- |
| options | Object | constructor options |
| [options.concurrency] | Number | maximum number of queue tasks processed concurrently; default is 1 |
| options.consumer | Object | consumer options |
| options.consumer.groupId | String | consumer group id |
| options.host | String | zookeeper connection string |
| [options.parse] | Function | a function (raw) => Promise for parsing raw kafka messages before they are pushed into the queue; the default parse function attempts to parse the raw message's value attribute as utf-8 stringified JSON and adds the result as the parsedValue attribute on the message |
| [options.rebuild] | Object | rebuild configuration |
| [options.rebuild.closing] | Object | valid retry options for closing failed consumers |
| [options.rebuild.maxDelay] | Number, String | the maximum time to wait before rebuilding; default is 2m |
| [options.rebuild.minDelay] | Number, String | the minimum time to wait before rebuilding; default is 35s |
| options.topic | String, Object | topic name or payload |
| [options.validate] | Function | a function (parsed) => Promise for validating queue messages; messages that fail validation will not be processed by workers |
Example
import Bluebird from 'bluebird';
import joi from 'joi';
import TopicConsumer from 'kafka-node-topic-consumer';
 
const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: {
    groupId: 'my-group-id'
  },
  topic: 'my-topic',
  parse(raw) {
    return Bluebird.try(() => {
      return JSON.parse(raw.value.toString('utf8'));
    });
  },
  validate(parsed) {
    const schema = joi.object({
      id: joi.string().guid().required(),
      action: joi.string().valid('create', 'destroy', 'update').required(),
      data: joi.object().required(),
    });
    const result = joi.validate(parsed, schema);
    if (result.error) {
      return Promise.reject(result.error);
    }
    return Promise.resolve(result.value);
  },
});

connect([done]) => Promise

connect and wait for a new consumer to register

Params

| name | type | description |
| --- | --- | --- |
| done | Function | optional callback |
Example
consumer.connect(err => {});
 
consumer.connect()
.then(() => {})
.catch(err => {});

consumer

the underlying HighLevelConsumer instance
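
For example, you can attach listeners directly to it (keeping in mind that the instance is replaced whenever the TopicConsumer rebuilds):

// listen directly on the underlying kafka-node consumer,
// in addition to the wrapped lifecycle events
consumer.consumer.on('error', (err) => {
  console.error('underlying consumer error', err);
});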


queue

the underlying fastq queue instance
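
fastq queues expose a few introspection helpers, for example:

// number of tasks waiting to be processed
console.log(consumer.queue.length());
 
// true when no tasks are waiting or being processed
console.log(consumer.queue.idle());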


getStatus() => Object

get current status

Returns
{
  "consumer": {
    "groupId": "my-consumer-group",
    "initialized": false,
    "ready": true,
    "closing": false,
    "paused": false,
    "rebalancing": false,
    "topicPayloads": [
      {
        "topic": "my-topic",
        "partition": "6",
        "offset": 39,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "7",
        "offset": 19,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "8",
        "offset": 16,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "9",
        "offset": 28,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "10",
        "offset": 14,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "11",
        "offset": 33,
        "maxBytes": 1048576,
        "metadata": "m"
      }
    ]
  },
  "queue": {
    "idle": true,
    "length": 0
  },
  "status": "up"
}
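
The top-level status field makes this convenient for health checks. A sketch using express (illustrative only; express is not a dependency of this module, and consumer is the TopicConsumer from the examples above):

import express from 'express';
 
const app = express();
 
// report 200 while the consumer is up, 503 otherwise
app.get('/health', (req, res) => {
  const status = consumer.getStatus();
  res.status(status.status === 'up' ? 200 : 503).json(status);
});
 
app.listen(3000);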

registerWorker(worker)

register a new worker function

Params

| name | type | description |
| --- | --- | --- |
| worker | Function | a function worker(parsed) => Promise that is passed every (valid) message for processing |
Example
consumer.registerWorker(parsed => {
  return Promise.resolve();
});

Events

the TopicConsumer extends the EventEmitter class and emits the following lifecycle events:

| event | description |
| --- | --- |
| consumer:closing-error | fired (err) when all attempts to close a failed consumer have failed |
| consumer:commit-error | fired (err) when an error is encountered committing offsets |
| consumer:connecting | fired when a new consumer instance is waiting to connect/register |
| consumer:error | fired (err) anytime the underlying consumer emits an error |
| consumer:offset-out-of-range | fired when the underlying consumer encounters an OffsetOutOfRangeError |
| consumer:pausing | fired when the first message is pushed into the queue and the underlying consumer is paused |
| consumer:rebuild-initiated | fired when the rebuild process has been initiated |
| consumer:rebuild-scheduled | fired (delayInSeconds) when a rebuild has been scheduled |
| consumer:rebuild-started | fired when the rebuild has started |
| consumer:resuming | fired when the last task in the queue has been processed and the underlying consumer is resuming |
| consumer:starting | fired after a new consumer has registered and is beginning to fetch messages |
| message:processing | fired (parsed) when the queue has started processing a message |
| message:skipped | fired (parsed, reason) when a message fails validation |
| message:success | fired (parsed, results) when a message has been successfully processed |
| message:error | fired (err, parsed) when a worker rejects |
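
For example, to log rebuild scheduling and skipped messages:

consumer.on('consumer:rebuild-scheduled', (delayInSeconds) => {
  console.log(`consumer rebuild scheduled in ${delayInSeconds}s`);
});
 
consumer.on('message:skipped', (parsed, reason) => {
  console.warn('message skipped (failed validation):', reason);
});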

Testing

Requires docker 1.8+ and docker-compose 1.12+

docker-compose up

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

License

Copyright (c) 2016 Gaia

Licensed under the MIT license
