egg-nodekafka
近期不断完善, 目前仅支持 kafka: Producer 和 KafkaClient.
Install
$ npm i egg-nodekafka --save
Usage
// {app_root}/config/plugin.jsexportskafka = enable: true package: 'egg-nodekafka';
Configuration
// {app_root}/config/config.default.jsexportskafka = client: host: '127.0.0.1:9092, 127.0.0.1:9093' // 多个ip可用逗号分隔 agent: true // 支持在agent中使用kafka对象,如agent.kafka(名称视具体配置) app: true // 同上;
see config/config.default.js for more detail.
Example
'use strict'; const Controller = Controller; async { const app logger ctx = this; const producer = appkafka; let result; try result = await producer; catch error logger; ctxstatus = 201; return; logger; ctxstatus = 200; ctxbody = result: 'success' value: JSON ; } moduleexports = HomeController;
sendAsync(payloads)
-
payloads: Array,array of ProduceRequest, ProduceRequest is a JSON object like:
-
更多参考:https://github.com/SOHU-Co/kafka-node#sendpayloads-cb
-
说明: 本方法已将kafka-node官方api中send(payloads, cb),回调函数进行封装,可直接 await producer.sendAsync(payloads) 进行生产,并可得到返回结果。
topic: 'topicName' messages: 'message body' // multi messages should be a array, single message can be just a string or a KeyedMessage instance key: 'theKey' // string or buffer, only needed when using keyed partitioner partition: 0 // default 0 attributes: 2 // default: 0 timestamp: Date // <-- defaults to Date.now() (only available with kafka v0.10+)
目前插件支持的对象
- producer
- client
以上两个对象可以通过 app.kafka.producer(options: objectjson) 和 app.kafka.client 获取。
app.kafka.client 的options配置选项即为config.{$env}.js中exports.kafka的配置信息,options参数配置选项请查看 https://github.com/SOHU-Co/kafka-node#options ,配置方式如下:
// {app_root}/config/config.{$env}.jsexportskafka = client: host: '127.0.0.1:9092, 127.0.0.1:9093' // 多个ip可用逗号分隔 connectTimeout: 10000 requestTimeout: 30000 sslOptions: {} // 更多参考: https://github.com/SOHU-Co/kafka-node#options agent: true // 支持在agent中使用kafka对象,如agent.kafka(名称视具体配置) app: true // 同上;
Questions & Suggestions
Please open an issue here.