que
简介
一个基于redis
的任务队列,支持分布式(基于http),可横向拓展,错误警告与重试。
benchmark
在自己的最低配阿里云上(单核CPU,1GB内存,1M带宽),利用ab
发起10k并发任务请求(script):
ab -n 10000 -c 10000 -p 'post.txt' -T 'application/json' http://127.0.0.1:8083/task
#outputServer Software:Server Hostname: 127.0.0.1Server Port: 8083 Document Path: /taskDocument Length: 22 bytes Concurrency Level: 10000Time taken for tests: 13.323 secondsComplete requests: 10000Failed requests: 0Write errors: 0Total transferred: 1640000 bytesTotal POSTed: 1594670HTML transferred: 220000 bytesRequests per second: 750.58 [#/sec] (mean)Time per request: 13322.956 [ms] (mean)Time per request: 1.332 [ms] (mean, across all concurrent requests)Transfer rate: 120.21 [Kbytes/sec] received 116.89 kb/s sent 237.10 kb/s total Connection Times (ms) min mean[+/-sd] median maxConnect: 0 1653 2119.5 722 7005Processing: 100 863 906.1 470 6949Waiting: 94 863 906.1 469 6949Total: 254 2517 2139.3 1797 11185 Percentage of the requests served within a certain time (ms) 50% 1797 66% 3245 75% 3371 80% 3438 90% 7294 95% 7513 98% 7627 99% 7952 100% 11185 (longest request)
使用
安装
直接通过npm:
Que使用了ES6的相关特性,请在运行时加上
harmony
选项
npm install node-que --save
例子
#单机模式 Que = require 'node-que' queue = 'myTaskQue'queueon 'done' if err then consoleerror err consolelog "done! result: " = #对传入数据进行自定义操作... resolve taskDatadata #指定处理函数 queueprocessor handler for i in 0..10 queuepush data: 'myData'
#分布模式 #master,调度分配节点 Que = require 'node-que'request = require 'superagent' masterQue = 'myTaskQue'masterQuemaster'http://localhost:8081''http://localhost:8082'listen 8083masterQueon 'done' if err then consoleerror err consolelog "done! result: " #salve,工作节点 = #对传入数据进行自定义操作... resolve taskDatadata salveQue1 = 'myTaskQue'salveQue2 = 'myTaskQue'salveQue1salvehandlerlisten 8081salveQue2salvehandlerlisten 8082 #从脚本中向队列推入数据 for i in 0..10 masterQuepush data: 'by script'#通过http api向队列推入数据 request post 'http://localhost:8083/task' send data: 'by http api' set 'Accept''application/json' end if err then consoleerror err consolelog resstatus
API
new Que(queueName)
queueName: 赋予任务队列的名字,用于区分不同队列,在分布模式下,master
/salve
队列的名字必须相同
生成一个Que实例
push(taskData)
taskData: 待处理数据
将待处理数据推入任务队列(redis list
),暂只支持本地redis
processor(handler)
handler(taskData): 数据的处理函数,参数既是队列中的一个待处理数据,必须返回一个Promise实例
指定数据的处理函数
错误处理与重试
队列中的每个任务在处理出现错误时,Que
都会对其进行重试,若重试5次
仍然未成功,则放弃这个任务
getNumberOfProcessed()
获取队列中已经处理完成的任务数
getNumberOfRejected()
获取队列中重试5次仍未成功后被放弃的任务数
master(salves).listen(port)
salves: 分布模式中,所有salve工作节点
的地址数组
port: 此master分配调度节点
的监听端口
启动分布模式,将此Que作为master节点,并指定所有salves
salve(handler).listen(port)
handler(taskData): 数据的处理函数,参数既是队列中的一个待处理数据,必须返回一个Promise实例
port: 此salve工作节点
的监听端口
stop()
关闭队列
http API
POST /task
将待处理数据推入处理队列
GET /task/processed
获取队列中已经处理完成的任务数
GET /task/rejected
获取队列中重试5次仍未成功后被放弃的任务数