This package has been deprecated

Author message:

unstable

couch-kafka-elastic-streaming

1.0.2 • Public • Published

couch-kafka-elastic-streaming

This module is use for data streaming from couchdb to kafka and kafka to elasticSearch. This module devided into two part which is.

  • Couchdb Source
  • ElasticSearch Sink

Couchdb Source

Couchdb use as a source. Whenever changes happend in couchdb, couchdb source detect those changes and send to kafka.

Pre-requisite

  • kafka of kafka-cluster setup
  • Couchdb setup
  • Source properties

Source properties

const Logger = require("log4bro");
const config = {

    kafka: {
        kafkaHost: "0.0.0.0:19092,0.0.0.0:29092,0.0.0.0:39092",
        logger: new Logger(),
        groupId: "kc-test",
        clientName: "kc-test-name",
        workerPerPartition: 1,
        options: {
            sessionTimeout: 8000,
            protocol: ["roundrobin"],
            fromOffset: "earliest", //latest
            fetchMaxBytes: 1024 * 100,
            fetchMinBytes: 1,
            fetchMaxWaitMs: 10,
            heartbeatInterval: 250,
            retryMinTimeout: 250,
            requireAcks: 0,
            autocommit: true
            //ackTimeoutMs: 100,
            //partitionerType: 3
        }
    },
    topic: "kf-connector",
    partitions: 1,
    maxTasks: 1,
    pollInterval: 250,
    produceKeyed: true,
    produceCompressionType: 0,
    connector: {
        host : "localhost",
        port : 5985,
        protocol : 'http',
        auth : {
            user : 'my-secret-user',
            pass : 'my-secret-pass'
        },
        database : 'test',
        maxPollCount: 1,
        currentOffset: "4-g1AAAAFReJzLYWBg4MhgTmHgzcvPy09JdcjLz8gvLskBCjMlMiTJ____PyuRAYeCJAUgmWQPUpPBnMiSC-SxG1qmmSSmWqKrx2WCA8iEePy2JIDU1ONVk8cCJBkagBRQ2XxC6hZA1O0npO4ARN19QuoeQNSB3JcFAKA2a6s"
    },
    http: {
        port: 3145,
        middlewares: []
    },
    enableMetrics: false
};

Use of Couchdb source

const { runSourceConnector} = require("couch-kafka-elastic-streaming");

const test = () => {
    return runSourceConnector(sourceProperties, [], null).then(_config => {
        config = _config;
        config.on("record-read", id => console.log("READ :: " + id));
        config.on("error", error => console.log("ERROR :: ",error));
        return true;
    })
    .catch((err) => {
        console.log("ERROR :: ",err);
    });
}

test();

ElasticSearch Sink

ElasticSearch use as a sink. Whenever data send to kafka, elasticSearch sink consume those data from kafka consumer.

Pre-requisite

  • kafka of kafka-cluster setup
  • ElasticSearch setup
  • Sink properties

Sink properties

const Logger = require("log4bro");

const config = {

    kafka: {
        kafkaHost: "0.0.0.0:19092,0.0.0.0:29092,0.0.0.0:39092",
        logger: new Logger(),
        groupId: "kc-test",
        clientName: "kc-test-name",
        workerPerPartition: 1,
        options: {
            sessionTimeout: 8000,
            protocol: ["roundrobin"],
            fromOffset: "latest", //latest
            fetchMaxBytes: 1024 * 100,
            fetchMinBytes: 1,
            fetchMaxWaitMs: 10,
            heartbeatInterval: 250,
            retryMinTimeout: 250,
            requireAcks: 0,
            //ackTimeoutMs: 100,
            //partitionerType: 3
        }
    },
    topic: "kf-connector",
    partitions: 1,
    maxTasks: 1,
    pollInterval: 250,
    produceKeyed: true,
    produceCompressionType: 0,
    awaitRetry: 2000,
    connector: {
        clientNode : "http://elastic:secret@localhost:9200",
        index : 'my-index',
        type : 'my-topic'
    },
    http: {
        port: 3149,
        middlewares: []
    },
    enableMetrics: false
};

Use ElasticSearch Sink

const { runSinkConnector} = require("couch-kafka-elastic-streaming");

const test = () => {
    return runSinkConnector(sinkProperties, [], null).then(_config => {
        config = _config;
        config.on("record-read", id => console.log("READ :: " + id));
        config.on("error", error => console.log("ERROR :: ",error));
        return true;
    })
    .catch((err) => {
        console.log("ERROR :: ",err);
    });
}

test();

Package Sidebar

Install

npm i couch-kafka-elastic-streaming

Weekly Downloads

2

Version

1.0.2

License

ISC

Unpacked Size

20 kB

Total Files

18

Last publish

Collaborators

  • ankur039943