stanchionjs
1.2.1 • Public • Published

StanchionJS

A simple and fast job queue done right. Backed by Redis, with support for auto-reconnect, TypeScript, Promises and Rxjs.

Features

  • Fast. Jobs are enqueued with RPUSH and fetched with BLPOP, nothing fancier, which keeps things simple and fast.

  • Auto-Reconnect.

  • Works with Promises. Unleash the power of async / await and say bye-bye to callbacks.

  • Better error handling.

  • Written in TypeScript. No hacky code, and a better experience if you are using TypeScript.

  • Reactive. If you don't like making promises, there are reactive APIs too. Every API comes in two versions: a Promise one and an Rxjs one (suffixed with `$`).
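
The RPUSH / BLPOP design amounts to a FIFO list: producers append to the tail, and workers block on the head until something arrives. The in-memory model below is a sketch of those semantics only. It does not talk to Redis and is not StanchionJS's implementation; `InMemoryList`, `rpush` and `blpop` are hypothetical names chosen to mirror the Redis commands.

```javascript
// In-memory model of the RPUSH / BLPOP pattern (illustration only, no Redis).
class InMemoryList {
    constructor() {
        this.items = [];   // queued values, head at index 0
        this.waiters = []; // resolvers of consumers blocked in blpop()
    }

    // Like RPUSH: append values to the tail of the list.
    rpush(...values) {
        for (const value of values) {
            const waiter = this.waiters.shift();
            if (waiter) {
                waiter(value); // hand the value straight to a blocked consumer
            } else {
                this.items.push(value);
            }
        }
    }

    // Like BLPOP without a timeout: resolve with the head, waiting if empty.
    blpop() {
        if (this.items.length > 0) {
            return Promise.resolve(this.items.shift());
        }
        return new Promise(resolve => this.waiters.push(resolve));
    }
}
```

Values come out in the order they were pushed, and a consumer that calls `blpop()` on an empty list simply waits until the next `rpush()`.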

Installation

$ npm i stanchionjs

Examples

Initialize

const { Stanchion } = require('stanchionjs');
 
const stanchion = new Stanchion({
    redis: {
        host: '127.0.0.1',
        port: 6379,
        db: '1',
    },
    concurrency: 20,
});
 

How To Create a Job

const { Stanchion } = require('stanchionjs');
 
const stanchion = new Stanchion();
 
// Payload will be serialized using `JSON.stringify`
const job_1 = {
    thing: 'apple',
};
const job_2 = {
    thing: 'banana',
};
 
//
// Promise way:
//
 
stanchion.push(job_1, job_2).then(() => {
    console.log('jobs created.');
});
 
//
// Rxjs way:
//
 
stanchion.push$(job_1, job_2).subscribe(() => {
    console.log('jobs created.');
});
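
Because payloads go through `JSON.stringify`, a worker receives whatever survives a JSON round trip, not the original object. The snippet below is a small sketch of that effect using plain `JSON.stringify` and `JSON.parse`. `roundTrip` is a hypothetical helper, and it assumes the consuming side decodes the payload with `JSON.parse`, which is not stated explicitly here.

```javascript
// Hypothetical helper: what a payload looks like after JSON serialization.
function roundTrip(payload) {
    return JSON.parse(JSON.stringify(payload));
}

const original = {
    thing: 'apple',
    createdAt: new Date(0), // Date objects become ISO strings
    note: undefined,        // undefined properties are dropped
    weight: NaN,            // NaN becomes null
};

const received = roundTrip(original);
console.log(received);
```

Converting dates to timestamps or ISO strings before pushing keeps the payload shape explicit on both sides.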
 

How To Process a Job

When a job has finished processing, you must tell StanchionJS so it can fetch the next job for you. StanchionJS provides several ways to do that:

const { Stanchion } = require('stanchionjs');
const { Observable } = require('rxjs'); // Only needed for the Rxjs way.
 
const stanchion = new Stanchion();
 
//
// Promise way:
//
 
stanchion.process(job => new Promise((resolve, reject) => {
    console.log('Got a job:', job);
 
    resolve();
}));
 
//
// Async / Await way:
//
 
stanchion.process(async job => {
    console.log('Got a job:', job);
});
 
//
// Rxjs way:
//
 
stanchion.process$(
    job => Observable.of(job)
        .map(job => {
            console.log('Got a job:', job);
        })
).subscribe(); // Don't forget to subscribe!
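
The contract described in this section, where a settled promise frees a slot for the next job, can be modelled without Redis. The sketch below is not StanchionJS's implementation. `runWithConcurrency` is a hypothetical function that drains an in-memory array with at most `concurrency` processors in flight, which is roughly what the `concurrency` option is described as controlling.

```javascript
// Hypothetical model: fetch the next job only after a processor settles.
async function runWithConcurrency(jobs, processor, concurrency) {
    const pending = jobs.slice(); // jobs not yet handed to a processor
    let inFlight = 0;
    let peak = 0;                 // highest number of simultaneous processors

    async function worker() {
        while (pending.length > 0) {
            const job = pending.shift();
            inFlight += 1;
            peak = Math.max(peak, inFlight);
            try {
                await processor(job); // resolving here frees the slot
            } finally {
                inFlight -= 1;
            }
        }
    }

    // Start `concurrency` workers that share the same pending list.
    const workers = [];
    for (let i = 0; i < concurrency; i += 1) {
        workers.push(worker());
    }
    await Promise.all(workers);
    return peak;
}
```

A processor whose promise never settles would hold its slot forever in this model, which is why signalling completion matters.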
 

Error Handling

Every exception (including those from Redis) can be obtained by attaching a handler to the Stanchion instance:

const { Stanchion } = require('stanchionjs');
 
const stanchion = new Stanchion();
 
//
// Callback handler:
//
 
stanchion.onError(err => {
    console.log('error occurred', err);
});
 
//
// Rxjs stream:
//
 
stanchion.onError$().subscribe(err => {
    console.log('error occurred', err);
});
 

How To Exit

const { Stanchion } = require('stanchionjs');
 
const stanchion = new Stanchion();
 
//
// Promise way:
//
 
stanchion.shutdown().then(() => {
    console.log('Stanchion exited.');
});
 
//
// Rxjs way:
//
 
stanchion.shutdown$().subscribe(() => {
    console.log('Stanchion exited.');
});
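
A common place to call `shutdown()` is a signal handler, so that the worker stops when the process is asked to terminate. The sketch below uses only `shutdown()` from the API shown here plus Node's standard event emitter interface. `shutdownOnSignal` is a hypothetical helper, and the choice of `SIGINT` and `SIGTERM` is an assumption about the deployment.

```javascript
// Hypothetical helper: call stanchion.shutdown() once when a signal arrives.
// `emitter` defaults to the Node `process` object; any EventEmitter works.
function shutdownOnSignal(stanchion, emitter = process, signals = ['SIGINT', 'SIGTERM']) {
    let shutdownPromise = null;

    const handler = () => {
        if (!shutdownPromise) {
            // Only the first signal triggers shutdown(); later ones reuse it.
            shutdownPromise = stanchion.shutdown();
        }
        return shutdownPromise;
    };

    for (const signal of signals) {
        emitter.on(signal, handler);
    }
    return handler;
}
```

Calling `shutdownOnSignal(stanchion);` right after creating the instance is enough to wire it up.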
 

When Exited

const { Stanchion } = require('stanchionjs');
 
const stanchion = new Stanchion();
 
//
// Callback way:
//
 
stanchion.onShutdowned(() => {
    console.log('Stanchion exited.');
});
 
//
// Rxjs way:
//
 
stanchion.onShutdowned$().subscribe(() => {
    console.log('Stanchion exited.');
});
 

Reference

Interfaces

ConstructOptions

Default value:

{
    // Redis configuration.
    redis: {
        host: '127.0.0.1',
        port: 6379,
    },
 
    // If you have lots of I/O intensive jobs, increasing this may help.
    concurrency: 10,
    
    // Redis key for this queue.
    // Stanchion also supports multiple keys, just use:
    //   redisKey: ['stanchion_queue1', 'stanchion_queue2']
    redisKey: 'stanchion_queue',
    
    // How many times Stanchion should try to reconnect when the connection is lost.
    retryAttempts: 6,
}
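
In deployments these options often come from the environment. The sketch below builds a `ConstructOptions`-shaped object from environment variables and leaves out anything that is not set, on the assumption that omitted keys fall back to the defaults listed above. Whether a partial `redis` object is merged with the default one is not stated, so the sketch only emits `redis` when both host and port are given. The variable names (`STANCHION_REDIS_HOST` and so on) and `optionsFromEnv` are hypothetical, not part of StanchionJS.

```javascript
// Hypothetical helper: build Stanchion options from environment variables.
function optionsFromEnv(env = process.env) {
    const options = {};

    // Emit `redis` only when both parts are present (see assumption above).
    if (env.STANCHION_REDIS_HOST && env.STANCHION_REDIS_PORT) {
        options.redis = {
            host: env.STANCHION_REDIS_HOST,
            port: Number(env.STANCHION_REDIS_PORT),
        };
    }
    if (env.STANCHION_CONCURRENCY) {
        options.concurrency = Number(env.STANCHION_CONCURRENCY);
    }
    if (env.STANCHION_REDIS_KEY) {
        options.redisKey = env.STANCHION_REDIS_KEY;
    }
    if (env.STANCHION_RETRY_ATTEMPTS) {
        options.retryAttempts = Number(env.STANCHION_RETRY_ATTEMPTS);
    }
    return options;
}
```

The result can then be passed to the constructor, for example `new Stanchion(optionsFromEnv())`.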

Stanchion

Stanchion#constructor

constructor(options: ConstructOptions)

Stanchion#push

push(...jobs: any[]): Promise<void>

Stanchion#push$

push$(...jobs: any[]): Observable<void>

Stanchion#getSize

getSize(): Promise<number>

Stanchion#getSize$

getSize$(): Observable<number>
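
`getSize()` can be used to keep a producer from flooding the queue. The sketch below assumes `getSize()` resolves with the number of pending jobs (the reference gives only its type) and uses only `getSize()` and `push()` from the signatures above. `pushWhenBelow`, `maxSize` and `pollMs` are hypothetical.

```javascript
// Hypothetical helper: wait until the queue is below maxSize, then push.
async function pushWhenBelow(stanchion, jobs, maxSize, pollMs = 100) {
    // Poll the queue size until there is room.
    while ((await stanchion.getSize()) >= maxSize) {
        await new Promise(resolve => setTimeout(resolve, pollMs));
    }
    await stanchion.push(...jobs);
}
```

Polling is the simplest approach; the size can still change between the check and the push, so `maxSize` is a soft limit.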

Stanchion#onError

onError(handler: ErrorHandler): Subscription

Stanchion#onError$

onError$(): Subject<any>

Stanchion#process

process(processor: PromiseProcessor): Promise<void>

Stanchion#process$

process$(processor: ObservableProcessor): Observable<void>

Stanchion#shutdown

shutdown(): Promise<void>

Stanchion#shutdown$

shutdown$(): Observable<void>

Stanchion#isShutdowned

isShutdowned(): boolean

Stanchion#onShutdowned

onShutdowned(cb: VoidFunction): Subscription

Stanchion#onShutdowned$

onShutdowned$(): Observable<void>

TODOs

  • Tests. There is no test suite yet, although Stanchion is battle-tested, handling about 26K jobs per second.

  • Multiple queue (Redis key) support.

  • "unprocess" method.

  • Key-sharding support for cluster usage.

Credits

Awesome icon by Freepik, licensed under the Creative Commons BY 3.0.

Inspired by Neamar/featureless-job-queue.

Version

1.2.1

License

Apache-2.0

Collaborators

  • aaronjan