timed-queue
Distributed timed job queue, backed by redis.
Features
- Supports redis cluster.
- Supports one or more timed-queue instances in a redis instance. Each timed-queue instance is segregated by the prefix option.
- Supports one or more job queues in a timed-queue instance.
- Supports one or more timed-queue clients for a timed-queue instance.
Demo
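
    const TimedQueue = require('timed-queue')
    const timedQueue = new TimedQueue({ prefix: 'TQ1', interval: 1000 * 60 })

    // connect to redis cluster.
    timedQueue.connect()

    // create 'event' job queue in timed-queue instance
    const eventQueue = timedQueue.queue('event')

    // add 'job' listener
    eventQueue.on('job', function (job) {
      // handle the job here
    })

    // add job to queue
    eventQueue.addjob('52b3b5f49c2238313600015d', Date.now() + 1000 * 60)(function (err, res) {
      console.log(err, res)
    })
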
Installation
npm install timed-queue
Job
Class: Job

    function Job (queue, job, timing, active, retryCount) {
      this.queue = queue
      this.job = job
      this.timing = timing
      this.active = active
      this.retryCount = retryCount
    }

- this.queue: {String} Queue name.
- this.job: {String} The job's name.
- this.timing: {Number} The time in milliseconds at which the job should be activated.
- this.active: {Number} The actual time in milliseconds at which the job was activated.
- this.retryCount: {Number} A job that has been activated but has not been ACKed within the retry time will be activated again. retryCount is the number of times the job has been re-activated.
API

    const TimedQueue = require('timed-queue')

new TimedQueue([options]) => timedQueue object
Return a timedQueue client. It is an EventEmitter instance.
- options.prefix: {String} Redis key prefix, or namespace. Defaults to "TIMEDQ".
- options.count: {Number} The maximum job count for the queue's getjobs method. Defaults to 64.
- options.interval: {Number} Interval time for scanning. Defaults to 1000 * 60 ms.
- options.retry: {Number} Retry time for a job. A job that has been activated but has not been ACKed within the retry time will be activated again. Defaults to interval / 2 ms.
- options.expire: {Number} Expiration time for a job. A job that has been activated but has not been ACKed within the expire time will be removed from the queue. Defaults to interval * 5 ms.
- options.accuracy: {Number} Scanning accuracy. Defaults to interval / 5.
- options.autoScan: {Boolean} Flag to enable or disable automatic scanning. Defaults to true. Set it to false if automatic scanning is not desired.

    const timedQueue = new TimedQueue({ prefix: 'TQ1', interval: 1000 * 60 })

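Several of the defaults listed above are derived from interval, so it can help to see the resolved values side by side. The following is a minimal sketch only: resolveOptions is a hypothetical helper, not part of timed-queue, and it simply mirrors the documented defaults.

```javascript
// Hypothetical helper, not part of timed-queue.
// It only mirrors the defaults documented for new TimedQueue([options]).
function resolveOptions (options = {}) {
  const interval = options.interval ?? 1000 * 60
  return {
    prefix: options.prefix ?? 'TIMEDQ',
    count: options.count ?? 64,
    interval,
    retry: options.retry ?? interval / 2,
    expire: options.expire ?? interval * 5,
    accuracy: options.accuracy ?? interval / 5,
    autoScan: options.autoScan ?? true
  }
}

// With a 10 second interval, the derived defaults follow from it.
const resolved = resolveOptions({ prefix: 'TQ1', interval: 1000 * 10 })
console.log(resolved.retry, resolved.expire, resolved.accuracy) // 5000 50000 2000
```

An explicitly passed retry, expire, or accuracy takes precedence over the derived value in this sketch.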
TimedQueue Events
- timedQueue.on('connect', function () {})
- timedQueue.on('error', function (error) {})
- timedQueue.on('close', function () {})
- timedQueue.on('scanStart', function (queuesLength) {})
- timedQueue.on('scanEnd', function (queuesLength, timeConsuming) {})
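Because a timedQueue client is an EventEmitter, the listener signatures above can be tried out without redis. This sketch uses a plain Node.js EventEmitter as a stand-in (fakeTimedQueue and scanLog are illustrative names), and it assumes timeConsuming is a duration in milliseconds, which this document does not state.

```javascript
const { EventEmitter } = require('events')

// Stand-in for a timedQueue client; a real client emits these events itself.
const fakeTimedQueue = new EventEmitter()
const scanLog = []

fakeTimedQueue.on('error', function (error) {
  scanLog.push('error: ' + error.message)
})
fakeTimedQueue.on('scanStart', function (queuesLength) {
  scanLog.push('scanning ' + queuesLength + ' queue(s)')
})
fakeTimedQueue.on('scanEnd', function (queuesLength, timeConsuming) {
  // Assumption: timeConsuming is in milliseconds.
  scanLog.push('scanned ' + queuesLength + ' queue(s) in ' + timeConsuming + ' ms')
})

// Emitted by hand here, only to exercise the listeners.
fakeTimedQueue.emit('scanStart', 2)
fakeTimedQueue.emit('scanEnd', 2, 15)
```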
TimedQueue.prototype.connect([host, options]) => this
TimedQueue.prototype.connect(redisClient) => this
Connect to redis. The arguments are the same as thunk-redis's createClient; alternatively, pass a thunk-redis instance.

    timedQueue.connect()

TimedQueue.prototype.scan() => this
Start scanning. It starts automatically after the connect method is called unless autoScan is set to false.

TimedQueue.prototype.stop() => this
Stop scanning.

TimedQueue.prototype.close() => this
Close the timedQueue. It closes the timedQueue's redis client accordingly.

TimedQueue.prototype.regulateFreq(factor) => this
Regulate the automatic scanning frequency.

TimedQueue.prototype.destroyQueue(queue[, options]) => this
Remove the queue. It deletes all data in the queue from redis.

TimedQueue.prototype.queue(queue[, options]) => Queue instance
Return the Queue instance if one exists; otherwise create a Queue instance and return it. A Queue instance is an EventEmitter instance.
- queue: {String} The queue's name.
- options.count: {Number} The maximum job count for the queue's getjobs method. Defaults to timedQueue's count.
- options.retry: {Number} Retry time for a job. A job that has been activated but has not been ACKed within the retry time will be activated again. Defaults to timedQueue's retry.
- options.expire: {Number} Expiration time for a job. A job that has been activated but has not been ACKed within the expire time will be removed from the queue. Defaults to timedQueue's expire.
- options.accuracy: {Number} Scanning accuracy. Defaults to timedQueue's accuracy.

    const eventQueue = timedQueue.queue('event')

Queue Events
- queue.on('job', function (job) {})
If there is no job listener on the queue, queue scanning will not run.
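The rule about missing listeners can be pictured with Node's own EventEmitter API. This is a sketch under assumptions: hasJobListener is a hypothetical helper, and how timed-queue performs this check internally is not shown in this document.

```javascript
const { EventEmitter } = require('events')

// Hypothetical check; timed-queue's internal logic may differ.
function hasJobListener (queue) {
  return queue.listenerCount('job') > 0
}

// Plain emitters standing in for Queue instances.
const idleQueue = new EventEmitter()
const busyQueue = new EventEmitter()
busyQueue.on('job', function (job) {
  // handle and ACK the job here
})

console.log(hasJobListener(idleQueue), hasJobListener(busyQueue)) // false true
```

In practice this means the 'job' listener should be attached before any scan is expected to deliver jobs.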
Queue.prototype.init([options]) => this
- options.count: {Number} The maximum job count for the queue's getjobs method. Defaults to timedQueue's count.
- options.retry: {Number} Retry time for a job. A job that has been activated but has not been ACKed within the retry time will be activated again. Defaults to timedQueue's retry.
- options.expire: {Number} Expiration time for a job. A job that has been activated but has not been ACKed within the expire time will be removed from the queue. Defaults to timedQueue's expire.
- options.accuracy: {Number} Scanning accuracy. Defaults to timedQueue's accuracy.

Queue.prototype.addjob(job, timing[, job, timing, ...]) => thunk function
Queue.prototype.addjob([job, timing, job, timing, ...]) => thunk function
Add one or more jobs to the queue. It can also be used to update a job's timing.
- job: {String} The job's name.
- timing: {Number} The time in milliseconds at which the job should be activated. It should be greater than Date.now().

    eventQueue.addjob('52b3b5f49c2238313600015d', 1441552050409)(function (err, res) {
      console.log(err, res)
      // null, 1
    })

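The two call forms of addjob carry the same information: a flat sequence of job and timing pairs. As an illustration only, the hypothetical helper toJobPairs below normalizes both forms and checks the documented constraints; whether timed-queue itself validates arguments this way is an assumption.

```javascript
// Hypothetical helper, not timed-queue's implementation.
function toJobPairs (...args) {
  // Accept either (job, timing, job, timing, ...) or ([job, timing, ...]).
  const flat = args.length === 1 && Array.isArray(args[0]) ? args[0] : args
  if (flat.length === 0 || flat.length % 2 !== 0) {
    throw new Error('expected one or more job, timing pairs')
  }
  const pairs = []
  for (let i = 0; i < flat.length; i += 2) {
    const job = flat[i]
    const timing = flat[i + 1]
    if (typeof job !== 'string' || job === '') {
      throw new TypeError('job must be a non-empty string')
    }
    if (!Number.isFinite(timing) || timing <= Date.now()) {
      throw new RangeError('timing must be a millisecond timestamp greater than Date.now()')
    }
    pairs.push([job, timing])
  }
  return pairs
}

// Both call forms yield the same pairs.
const inOneMinute = Date.now() + 1000 * 60
const fromArgs = toJobPairs('jobA', inOneMinute, 'jobB', inOneMinute + 1)
const fromArray = toJobPairs(['jobA', inOneMinute, 'jobB', inOneMinute + 1])
```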
Queue.prototype.show(job) => thunk function
Show the job info.
- job: {String} The job's name.

    eventQueue.show('52b3b5f49c2238313600015d')(function (err, res) {
      console.log(res)
      // {
      //   queue: 'event',
      //   job: '52b3b5f49c2238313600015d',
      //   timing: 1441552050409,
      //   active: 0,
      //   retryCount: 0
      // }
    })

Queue.prototype.deljob(job[, job, ...]) => thunk function
Queue.prototype.deljob([job, job, ...]) => thunk function
Delete one or more jobs.
- job: {String} The job's name.

    eventQueue.deljob('52b3b5f49c2238313600015d')(function (err, res) {
      console.log(err, res)
      // null, 1
    })

Queue.prototype.getjobs([scanActive]) => thunk function
It is called by Queue.prototype.scan. It should not be called explicitly unless you know what you are doing.

Queue.prototype.ackjob(job[, job, ...]) => thunk function
Queue.prototype.ackjob([job, job, ...]) => thunk function
ACK one or more jobs.
- job: {String} The job's name.

    eventQueue.ackjob('52b3b5f49c2238313600015d')(function (err, res) {
      console.log(err, res)
      // null, 1
    })

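What happens to a job that is never ACKed follows from the retry and expire options. The sketch below models those documented rules in memory. It is not timed-queue's code, and it rests on two labeled assumptions: job.active holds the first activation time (0 meaning not yet activated, as in the show output), and both the retry and expire windows are measured from that time.

```javascript
// Hypothetical model of the documented retry/expire rules.
// Assumption 1: job.active is the first activation time, 0 if never activated.
// Assumption 2: retry and expire windows are both measured from job.active.
function nextAction (job, now, windows) {
  if (job.active === 0) {
    return now >= job.timing ? 'activate' : 'wait'
  }
  const elapsed = now - job.active
  if (elapsed >= windows.expire) return 'remove'
  // Each re-activation is due one further retry window after the previous one.
  if (elapsed >= windows.retry * (job.retryCount + 1)) return 'reactivate'
  return 'wait'
}

// Default windows for interval = 1000 * 60: retry = interval / 2, expire = interval * 5.
const defaultWindows = { retry: 30000, expire: 300000 }
const unackedJob = { queue: 'event', job: 'jobA', timing: 1000, active: 1000, retryCount: 0 }

console.log(nextAction(unackedJob, 1000 + 30000, defaultWindows)) // reactivate
```

ACKing a job in the 'job' listener takes it out of this cycle, so it is neither re-activated nor left to expire.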
Queue.prototype.scan() => thunk function
It is called by TimedQueue.prototype.scan. It should not be called explicitly unless you know what you are doing.

Queue.prototype.len() => thunk function
Return the queue's length.

    eventQueue.len()(function (err, res) {
      console.log(err, res)
      // null, 3
    })

Queue.prototype.showActive() => thunk function
Return the activated jobs in the queue.

    eventQueue.showActive()(function (err, res) {
      console.log(err, res)
      // null, [jobs...]
    })

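The Queue methods above return thunk functions, which in the examples are invoked with a Node-style (err, res) callback. For code that prefers promises, a small generic wrapper is one option. This is a sketch, not something timed-queue provides: thunkToPromise and fakeLen are hypothetical names, and fakeLen stands in for a real thunk such as the one returned by eventQueue.len().

```javascript
// Generic helper: wrap a thunk (a function taking a Node-style callback) in a Promise.
function thunkToPromise (thunk) {
  return new Promise(function (resolve, reject) {
    thunk(function (err, res) {
      if (err != null) reject(err)
      else resolve(res)
    })
  })
}

// Stand-in thunk for illustration; it reports a queue length of 3.
function fakeLen (callback) {
  callback(null, 3)
}

thunkToPromise(fakeLen).then(function (length) {
  console.log(length) // 3
})
```

With the real library, the call would look like thunkToPromise(eventQueue.len()), assuming the returned thunk accepts a single callback as in the examples above.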