egg-mqc
Install
$ npm i egg-mqc --save
Usage
// {app_root}/config/plugin.jsexportsmqc = enable: true package: 'egg-mqc';
Configuration
// {app_root}/config/config.default.jsexportsamqp = protocol: 'amqp' hostname: 'localhost' port: 5672 username: 'Thomas' password: 'Thomas.A.Edison' vhost: '/' // http://www.squaremobius.net/amqp.node/ssl.html // opts: { // cert: certificateAsBuffer, client cert // key: privateKeyAsBuffer, client key // passphrase: 'MySecretPassword', passphrase for key // ca: [caCertAsBuffer], array of trusted CA certs // }, pool: max: 10 // maximum size of the pool min: 2 // minimum size of the pool acquireTimeoutMillis?: number ;
see config/config.default.js for more detail.
Example
- app/extends/application.ts
/**
* publish data to topic
* @param this
* @param topicKey
* @param data
*/
mqPublish(this: Application, topicKey: string, data: Buffer) {
const exchange = 'amq.topic';
return this.amqp()
.then(conn => conn.createChannel())
.then(channel => {
const result = channel.publish(exchange, topicKey, data, { persistent: true });
channel.close();
return result;
})
.catch(() => false);
}
- publish message to topic
const rst = await this.app.mqPublish('this_is_your_topic', Buffer.from('hello there'));
- consume in app.ts
async serverDidReady() {
// Server is listening.
subscribe(this.app, "#", "fund.queue", (channel: Channel, msg: ConsumeMessage | null) => {
if (msg) {
let text = msg.content.toString();
let { exchange, routingKey } = msg.fields;
this.app.logger.info(` - [${exchange}, ${routingKey}] ${text}`);
channel.ack(msg);
}
});
}
define AmqpConsumer in file app/lib/amqpconsumer
import { Connection, ConsumeMessage, Channel } from 'amqplib';
import { Application } from 'egg';
export class AmqpConsumer {
private readonly tag: string;
private readonly app: Application;
private handler: (conn: Connection, tag: string) => {};
constructor(app: Application, handler, tag?: string) {
this.tag = tag && `[${tag}]` || '';
this.app = app;
this.handler = handler;
this.process();
}
public process() {
this.app.logger.info(`${this.tag} start consume...`);
this.app.amqp().then(conn => {
conn.on('close', () => {
this.app.logger.info(`${this.tag} consume is closed!!!`);
this.process();
});
return conn;
}).then(conn => {
this.app.logger.info(`${this.tag} consuming`);
this.handler(conn, this.tag);
}).catch(() => {
this.app.logger.info(`${this.tag} error!`);
this.process();
});
}
}
/**
* 订阅消息
* 使用示例
* subscribe(this.app, "#", "fund.queue", (channel: Channel, msg: ConsumeMessage | null) => {
if (msg) {
let text = msg.content.toString();
let { exchange, routingKey } = msg.fields;
this.app.logger.info(`[${exchange}, ${routingKey}] ${text}`);
channel.ack(msg);
}
});
* @param app
* @param topic 主题名字
* @param queue 消息队列名字
* @param handler 订阅处理,处理完消息务必调用channel.ack(msg)
*/
export function subscribe(app: Application, topic: string, queue: string, handler: (channel: Channel, msg: ConsumeMessage | null) => void) {
new AmqpConsumer(app, (conn) => {
const exchange = 'amq.topic';
conn.createChannel().then((channel => {
// 消费时,绑定队列到exchange
channel.assertExchange(exchange, 'topic', { durable: true });
channel.assertQueue(queue, { durable: true });
channel.bindQueue(queue, exchange, topic);
channel.consume(queue, (msg) => {
try {
handler(channel, msg);
} catch (err) {
app.logger.error(`consume ${queue} error ${err}`);
}
});
}))
}, topic);
}
Questions & Suggestions
Please open an issue here.