This module is used for data streaming from CouchDB to Kafka and from Kafka to Elasticsearch. The module is divided into two parts:
- Couchdb Source
- ElasticSearch Sink
CouchDB is used as the source. Whenever a change happens in CouchDB, the CouchDB source detects it and sends it to Kafka. Prerequisites:
- Kafka cluster setup
- Couchdb setup
- Source properties
const Logger = require("log4bro");

// Low-level Kafka consumer/producer tuning for the source connector.
const kafkaOptions = {
    sessionTimeout: 8000,
    protocol: ["roundrobin"],
    fromOffset: "earliest", // alternative: "latest"
    fetchMaxBytes: 1024 * 100,
    fetchMinBytes: 1,
    fetchMaxWaitMs: 10,
    heartbeatInterval: 250,
    retryMinTimeout: 250,
    requireAcks: 0,
    autocommit: true
    //ackTimeoutMs: 100,
    //partitionerType: 3
};

// CouchDB connection details; currentOffset resumes the _changes feed
// from a previously recorded sequence token.
const couchConnector = {
    host: "localhost",
    port: 5985,
    protocol: 'http',
    auth: {
        user: 'my-secret-user',
        pass: 'my-secret-pass'
    },
    database: 'test',
    maxPollCount: 1,
    currentOffset: "4-g1AAAAFReJzLYWBg4MhgTmHgzcvPy09JdcjLz8gvLskBCjMlMiTJ____PyuRAYeCJAUgmWQPUpPBnMiSC-SxG1qmmSSmWqKrx2WCA8iEePy2JIDU1ONVk8cCJBkagBRQ2XxC6hZA1O0npO4ARN19QuoeQNSB3JcFAKA2a6s"
};

// Full source-connector configuration consumed by runSourceConnector.
const config = {
    kafka: {
        kafkaHost: "0.0.0.0:19092,0.0.0.0:29092,0.0.0.0:39092",
        logger: new Logger(),
        groupId: "kc-test",
        clientName: "kc-test-name",
        workerPerPartition: 1,
        options: kafkaOptions
    },
    topic: "kf-connector",
    partitions: 1,
    maxTasks: 1,
    pollInterval: 250,
    produceKeyed: true,
    produceCompressionType: 0,
    connector: couchConnector,
    http: {
        port: 3145,
        middlewares: []
    },
    enableMetrics: false
};
const { runSourceConnector } = require("couch-kafka-elastic-streaming");

/**
 * Starts the CouchDB source connector using the `config` object defined above.
 * Attaches logging listeners to the running connector instance.
 * @returns {Promise<boolean|undefined>} resolves true on successful start;
 *          startup errors are logged and swallowed (resolves undefined).
 */
const test = () => {
    // BUGFIX: the original passed an undefined `sourceProperties`; the
    // configuration above is declared as `config`.
    return runSourceConnector(config, [], null).then((connector) => {
        // BUGFIX: the original reassigned the `const config`, which throws a
        // TypeError — keep the connector instance in a local binding instead.
        connector.on("record-read", id => console.log("READ :: " + id));
        connector.on("error", error => console.log("ERROR :: ", error));
        return true;
    })
    .catch((err) => {
        console.log("ERROR :: ", err);
    });
};
test();
Elasticsearch is used as the sink. Whenever data is sent to Kafka, the Elasticsearch sink consumes it via a Kafka consumer. Prerequisites:
- Kafka cluster setup
- ElasticSearch setup
- Sink properties
const Logger = require("log4bro");

// Low-level Kafka consumer tuning for the sink connector.
const kafkaOptions = {
    sessionTimeout: 8000,
    protocol: ["roundrobin"],
    fromOffset: "latest",
    fetchMaxBytes: 1024 * 100,
    fetchMinBytes: 1,
    fetchMaxWaitMs: 10,
    heartbeatInterval: 250,
    retryMinTimeout: 250,
    requireAcks: 0
    //ackTimeoutMs: 100,
    //partitionerType: 3
};

// Elasticsearch target: node URL (with basic auth), index and type.
const elasticConnector = {
    clientNode: "http://elastic:secret@localhost:9200",
    index: 'my-index',
    type: 'my-topic'
};

// Full sink-connector configuration consumed by runSinkConnector.
const config = {
    kafka: {
        kafkaHost: "0.0.0.0:19092,0.0.0.0:29092,0.0.0.0:39092",
        logger: new Logger(),
        groupId: "kc-test",
        clientName: "kc-test-name",
        workerPerPartition: 1,
        options: kafkaOptions
    },
    topic: "kf-connector",
    partitions: 1,
    maxTasks: 1,
    pollInterval: 250,
    produceKeyed: true,
    produceCompressionType: 0,
    awaitRetry: 2000,
    connector: elasticConnector,
    http: {
        port: 3149,
        middlewares: []
    },
    enableMetrics: false
};
const { runSinkConnector } = require("couch-kafka-elastic-streaming");

/**
 * Starts the Elasticsearch sink connector using the `config` object defined
 * above and attaches logging listeners to the running connector instance.
 * @returns {Promise<boolean|undefined>} resolves true on successful start;
 *          startup errors are logged and swallowed (resolves undefined).
 */
const test = () => {
    // BUGFIX: the original passed an undefined `sinkProperties`; the
    // configuration above is declared as `config`.
    return runSinkConnector(config, [], null).then((connector) => {
        // BUGFIX: the original reassigned the `const config`, which throws a
        // TypeError — keep the connector instance in a local binding instead.
        connector.on("record-read", id => console.log("READ :: " + id));
        connector.on("error", error => console.log("ERROR :: ", error));
        return true;
    })
    .catch((err) => {
        console.log("ERROR :: ", err);
    });
};
test();