promise-queue-observable
A Queue implemented on top of the simple SetQueue package. It implements a two-sided queue using promises where any number of publishers can dispatch events to any number of consumers.
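To make the idea of a two-sided queue concrete, here is a minimal sketch written from scratch. It is not this package's implementation, and `TwoSidedQueue`, `values`, `waiters`, and `received` are hypothetical names chosen for illustration. It assumes published values are buffered when no consumer is waiting.

```javascript
// Hypothetical sketch of a two-sided promise queue (not the package's code).
class TwoSidedQueue {
  constructor() {
    this.values = []  // published values waiting for a consumer
    this.waiters = [] // resolve functions of consumers waiting for a value
  }

  // Publisher side: hand the value to the oldest waiting consumer,
  // or buffer it when nobody is waiting (an assumption of this sketch).
  publish(value) {
    const resolve = this.waiters.shift()
    if (resolve) resolve(value)
    else this.values.push(value)
  }

  // Consumer side: take a buffered value, or wait for the next publish.
  next() {
    if (this.values.length > 0) return Promise.resolve(this.values.shift())
    return new Promise(resolve => this.waiters.push(resolve))
  }
}

const queue = new TwoSidedQueue()
const received = []

// Two consumers ask for values before anything has been published.
queue.next().then(value => received.push(['first consumer', value]))
queue.next().then(value => received.push(['second consumer', value]))

// Three publishes: the first two satisfy the waiting consumers,
// the third is buffered until another consumer asks.
queue.publish('a')
queue.publish('b')
queue.publish('c')

queue.next().then(value => received.push(['third consumer', value]))
```

Keeping one list per side is what lets any number of publishers and consumers meet without knowing about each other.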
    const observer = new PromiseQueue({
      /* Default Configuration */

      // Feel free to provide a different Promise implementation.
      promise: Promise,

      /*
        Optional callback made when the queue is cancelled through
        observer.cancel(). The callback is made with the PromiseQueue
        instance as its `this` binding.
      */
      onCancel: undefined,

      /*
        queueStyle allows you to adjust how the queue is handled.
        When 'next' (default), any calls to observer.next() will return
        the same promise until a new value has been resolved.
        If queueStyle is 'shift', each call to observer.next() will
        return a new promise that resolves after all other promises
        ahead of it have been resolved.
      */
      queueStyle: 'next',

      /*
        Any time a new promise is created, if promiseFactory is given,
        the promise is sent to the factory with 'pull' or 'push' and the
        promise instance. The factory is expected to return the promise.
        This is used to modify the promise so that it works with
        3rd party libraries, such as redux-saga's cancellation.
        (type, Promise)
      */
      promiseFactory: undefined,

      /* Log errors? */
      log: false
    })
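The difference between the two `queueStyle` values can be sketched in isolation. The helper below is a hypothetical illustration written for this README, not the package's code. `createStyleSketch` and its variables are assumed names, and unlike a real queue it simply drops values published while nobody is waiting.

```javascript
// Hypothetical illustration of the 'next' and 'shift' styles.
function createStyleSketch(queueStyle) {
  let shared = null  // 'next': the promise currently handed to every caller
  const waiters = [] // pending resolve functions, in call order

  function next() {
    if (queueStyle === 'shift') {
      // Every call gets its own promise, resolved in FIFO order.
      return new Promise(resolve => waiters.push(resolve))
    }
    // 'next': reuse the same promise until a value has been resolved.
    if (!shared) {
      shared = new Promise(resolve => waiters.push(resolve))
    }
    return shared
  }

  function publish(value) {
    const resolve = waiters.shift()
    if (queueStyle !== 'shift') shared = null // later calls get a fresh promise
    if (resolve) resolve(value) // values with no waiter are dropped in this sketch
  }

  return { next, publish }
}

const nextStyle = createStyleSketch('next')
const sameA = nextStyle.next()
const sameB = nextStyle.next()
console.log(sameA === sameB) // true

nextStyle.publish('x') // resolves the shared promise for both callers
const afterPublish = nextStyle.next()
console.log(afterPublish === sameA) // false

const shiftStyle = createStyleSketch('shift')
const firstShift = shiftStyle.next()
const secondShift = shiftStyle.next()
console.log(firstShift === secondShift) // false
```

With 'next', one published value reaches every consumer that asked since the last value. With 'shift', each published value reaches exactly one consumer.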
Simple Example
    // Create an Observable
    const observable = new PromiseQueue()

    // Consumers request promises. Each promise will resolve when a
    // publisher provides a value.
    observable.next()
    observable.next()

    // ... sometime later
    observable.publish(...)
    observable.publish(...)
    observable.publish(...)

    observable.next()
    observable.next()
Shift Example
    // Create an Observable
    const observable = new PromiseQueue({ queueStyle: 'shift' })

    // Consumers request promises. Each promise will resolve when a
    // publisher provides a value.
    observable.next()
    observable.next()

    // ... sometime later
    observable.publish(...)
    observable.publish(...)
    observable.publish(...)

    observable.next()
    observable.next()
Promisifying Callbacks
Below we iterate over the observable's values and compose them over time until the observable is cancelled (or until we stop returning getNextEvent). The initial caller receives the final result.
    // Create an Observable
    const observable = new PromiseQueue({
      log: true,
      onCancel() {
        // handle cancellation however needed
      }
    })

    const getNextEvent = ... observable ... ? prev : observable.next() ...

    // works great for things like window.addEventListener, etc.

    /*
      value: [ 1 ]
      prev: undefined
      value: [ 3 ]
      prev: [ 1 ]
      value: [ 2 ]
      prev: [ 1, 3 ]
      result [ 1, 3, 2 ]
      [SagaObservable]: Publish Received after Cancellation undefined [ 4 ]
    */
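The pattern of composing values until cancellation can be sketched end to end with a small stand-in. Everything below is a hypothetical illustration and not this package's API: `createSketchObservable`, `collect`, `onEvent`, and the `{ done, value }` event shape are assumptions made for the example. The published values 1, 3, 2, 4 are taken from the sample output above.

```javascript
// Hypothetical stand-in for the observable: next(), publish(), cancel().
function createSketchObservable() {
  const buffered = [] // values published while no consumer was waiting
  const waiters = []  // resolve functions of waiting consumers
  let cancelled = false
  return {
    next() {
      if (buffered.length > 0) {
        return Promise.resolve({ done: false, value: buffered.shift() })
      }
      if (cancelled) return Promise.resolve({ done: true })
      return new Promise(resolve => waiters.push(resolve))
    },
    publish(value) {
      if (cancelled) return // publishes after cancellation are ignored
      const resolve = waiters.shift()
      if (resolve) resolve({ done: false, value })
      else buffered.push(value)
    },
    cancel() {
      cancelled = true
      waiters.splice(0).forEach(resolve => resolve({ done: true }))
    }
  }
}

const sketch = createSketchObservable()

// Recursively collect values; the initial caller receives the final array.
function collect(prev = []) {
  return sketch.next().then(event =>
    event.done ? prev : collect([...prev, event.value])
  )
}

const finalResult = collect()
finalResult.then(result => console.log('result', result))
// logs: result [ 1, 3, 2 ]

// A plain callback, as could be handed to an event listener,
// feeds the queue.
const onEvent = value => sketch.publish(value)
onEvent(1)
onEvent(3)
onEvent(2)
sketch.cancel()
onEvent(4) // arrives after cancellation and is dropped
```

Because each step returns the promise for the next step, the caller only ever deals with a single promise for the whole sequence.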