# Kafka Queue
Wrapper around Kafka with built-in assumptions to make a keyed-message queuing system a little easier to build. This wrapper was built to support an IoT-like system, where devices in the outside world communicate with a cloud service. Each device has a unique `deviceId`. All messages with the same `deviceId` should always be consumed by the same process, and those messages should arrive in order.
This wrapper uses the no-kafka library, which automatically retries on connection failures. This wrapper was developed and tested using this Docker container.
## Usage
### Producer (enqueue)
A minimal sketch of enqueuing a message (the module name and the exact `connect()`/`send()` signatures are assumptions):
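```js
// Sketch only: the require name, connect(), and send() shapes are assumptions.
let config = require('./config.json');
let Q = require('kafka-queue')(config);

Q.producer.connect(function (err) {
  if (err) throw err;
  // The configured keyField ("deviceId") in each message becomes its
  // partition key, so one device's messages stay ordered on one partition.
  Q.producer.send('ingest', { deviceId: 'device-1', temperature: 21.5 }, function (err) {
    if (err) console.error('enqueue failed:', err);
  });
});
```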
### Consumer (dequeue)
A matching sketch of dequeuing (again, the method names are assumptions):
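```js
// Sketch only: connect() and the handler-registration shape are assumptions.
let config = require('./config.json');
let Q = require('kafka-queue')(config);

// The groupId can be passed here or come from config (see below).
Q.consumer.connect('example-group', function (err) {
  if (err) throw err;
  Q.consumer.subscribe('ingest', function (message, cb) {
    console.log('device', message.deviceId, 'sent', message);
    cb();       // commit the message
    // cb(err)  // an err as the first argument cancels the commit
  });
});
```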
Calling the `cb()` passed to your message handler controls the message commit. Calling it with an `err` as the first argument cancels the commit.
You can explicitly handle commits, although you shouldn't normally have to. A sketch, assuming the consumer exposes a `commit()` method:
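```js
// Sketch only: commit() and its arguments are assumptions;
// doWork is a hypothetical async processing function.
Q.consumer.subscribe('ingest', function (message, cb) {
  doWork(message, function (err) {
    if (err) return cb(err);     // cancels the commit
    Q.consumer.commit(message);  // commit explicitly (assumed API)
    cb();
  });
});
```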
The `groupId` argument to `connect()` is optional. If it is not specified, then your `config` should contain a `groupId`. If `groupId` is explicitly passed, it overrides any value specified in the config.
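For example (shapes as assumed in the sketches above):

```js
// groupId comes from config.groupId:
Q.consumer.connect(function (err) { /* ... */ });

// an explicit groupId overrides config.groupId:
Q.consumer.connect('pipeline-prod', function (err) { /* ... */ });
```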
## Config
This wrapper expects `config` to look like:

```json
{
  "keyField": "deviceId",
  "connectionString": "192.168.99.103:9092",
  "logger": {
    "logLevel": 1
  }
}
```
The config is generally the same as documented here. The `keyField` is required and is the name of the field in the incoming messages (being passed to `producer.send()`) that contains the device id that you want to use as a key.
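For example, with `"keyField": "deviceId"`, a message like the following (the payload shape is illustrative) is keyed by `"device-42"`, so it lands on the same partition as every other message from that device:

```json
{
  "deviceId": "device-42",
  "temperature": 21.5,
  "ts": 1467321600000
}
```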
## Example Application
You can run a simple test in this directory. The test environment consists of three scripts:

- `ingest.js` emulates three devices sending messages into the system; these messages get sent to the "ingest" queue.
- `relayer.js` reads from the ingest queue and duplicates the incoming messages to a "staging" queue and a "prod" queue.
- `pipeline.js` reads from the staging or prod queue (specified on the command line) and prints the messages to stdout.
If you create the "ingest" queue with one partition, you can run one instance of `relayer.js`; if you create it with two partitions, you can run two instances, and so forth. The same goes for the other queues. Say you create all three queues (ingest, staging, prod) with 2 partitions each. Then you can run 2 instances of `relayer.js` and four instances of `pipeline.js`: two with "staging" as an argument and two with "prod" as an argument. Then run `ingest.js` and you'll see messages flow through the system, being duplicated into the two stacks and being "worked on" in the pipeline scripts.
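For example, assuming the queues are created with Kafka's bundled `kafka-topics.sh` (adjust the zookeeper address for your setup) and the scripts run under `node`:

```sh
# Create the three queues with 2 partitions each.
for topic in ingest staging prod; do
  kafka-topics.sh --create --zookeeper 192.168.99.103:2181 \
    --replication-factor 1 --partitions 2 --topic $topic
done

# Two relayers, plus two pipeline workers per downstream queue.
node relayer.js &
node relayer.js &
node pipeline.js staging &
node pipeline.js staging &
node pipeline.js prod &
node pipeline.js prod &

# Start the simulated devices.
node ingest.js
```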
If you kill one of the instances in a pair, you'll see the other instance begin to take over the processing of the killed instance. If you restart the killed instance, it'll begin to process its own messages again.
You should see that messages with a given deviceId 'X' always get sent to a consistent instance of the `pipeline.js` script, except when that instance dies, in which case 'X' starts getting processed by a remaining instance.
## Setting up Kafka for this example
Create a docker machine to host zookeeper and kafka. Execute `eval $(docker-machine env MACHINE)`, then execute `sh RUN.sh`.
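Put together, the setup looks roughly like this (the virtualbox driver is an assumption; substitute your machine name for MACHINE):

```sh
# Create a VM to host zookeeper and kafka.
docker-machine create --driver virtualbox MACHINE

# Point the docker client at that machine, then start the containers.
eval $(docker-machine env MACHINE)
sh RUN.sh
```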