stream-chain

stream-chain creates a chain of streams out of regular functions, asynchronous functions, generator functions, and existing streams, while properly handling backpressure. The resulting chain is represented as a Duplex stream, which can be combined with other streams in the usual way. It eliminates boilerplate, helping you concentrate on functionality without losing performance, and makes it especially easy to build object mode data processing pipelines.

Originally stream-chain was used internally with stream-fork and stream-json to create flexible data processing pipelines.

stream-chain is a lightweight micro-package with no dependencies. It is distributed under the New BSD license.

Intro

const Chain = require('stream-chain');

const fs = require('fs');
const zlib = require('zlib');
const {Transform} = require('stream');

// the chain will work on a stream of number objects
const chain = new Chain([
  // transforms a value
  x => x * x,
  // returns several values
  x => [x - 1, x, x + 1],
  // waits for an asynchronous operation
  async x => await getTotalFromDatabaseByKey(x),
  // returns multiple values with a generator
  function* (x) {
    for (let i = x; i > 0; --i) {
      yield i;
    }
    return 0;
  },
  // filters out even values
  x => x % 2 ? x : null,
  // uses an arbitrary transform stream
  new Transform({
    writableObjectMode: true,
    transform(x, _, callback) {
      // transform to text
      callback(null, x.toString());
    }
  }),
  // compress
  zlib.createGzip()
]);
// log errors
chain.on('error', error => console.log(error));
// use the chain, and save the result to a file
dataSource.pipe(chain).pipe(fs.createWriteStream('output.txt.gz'));

Making processing pipelines appears to be easy: just chain functions one after another, and we are done. Real-life pipelines filter objects out and/or produce more objects out of a few. On top of that, we have to deal with asynchronous operations while processing or producing data: networking, databases, files, user responses, and so on. An unequal number of values per stage and unequal throughput of stages introduce problems like backpressure, which require algorithms implemented by streams.
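
To make the backpressure problem concrete, here is a minimal sketch of handling it by hand, using only Node's built-in stream and events modules: the writer has to stop when write() returns false and wait for the 'drain' event. The names makeSlowWriter and writeAll are made up for this illustration and are not part of stream-chain.

```javascript
const {Writable} = require('stream');
const {once} = require('events');

// hypothetical slow consumer: buffers at most 2 objects, handles each one asynchronously
const makeSlowWriter = received =>
  new Writable({
    objectMode: true,
    highWaterMark: 2,
    write(value, _, callback) {
      received.push(value);
      setImmediate(callback);
    }
  });

// writes all values, pausing whenever the consumer's buffer is full
const writeAll = async (writable, values) => {
  let pauses = 0;
  for (const value of values) {
    if (!writable.write(value)) {
      ++pauses;
      await once(writable, 'drain'); // wait until the buffer drains
    }
  }
  writable.end();
  await once(writable, 'finish');
  return pauses;
};

const received = [];
writeAll(makeSlowWriter(received), [1, 2, 3, 4, 5]).then(pauses =>
  console.log(`received ${received.length} values, paused ${pauses} times`)
);
```

Piping streams together does this bookkeeping automatically, which is why a chain built out of streams can let stages run at different speeds.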

While a lot of API improvements were made to make streams easy to use, in reality, a lot of boilerplate is required when creating a pipeline. stream-chain eliminates most of it.
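
For comparison, this is roughly what three simple stages (square a number, emit its neighbors, keep odd values) look like when written by hand with Node's built-in stream module. It is only a sketch of the boilerplate in question: Readable.from() stands in for a real data source, and all helper names are hypothetical.

```javascript
const {Readable, Transform, pipeline} = require('stream');

// x => x * x
const makeSquare = () =>
  new Transform({
    objectMode: true,
    transform(x, _, callback) {
      callback(null, x * x);
    }
  });

// x => [x - 1, x, x + 1]: one input, three outputs
const makeNeighbors = () =>
  new Transform({
    objectMode: true,
    transform(x, _, callback) {
      this.push(x - 1);
      this.push(x);
      this.push(x + 1);
      callback();
    }
  });

// x => x % 2 ? x : null: keeps odd values only
const makeOddOnly = () =>
  new Transform({
    objectMode: true,
    transform(x, _, callback) {
      if (x % 2) this.push(x);
      callback();
    }
  });

// runs the pipeline on an array of numbers and collects the output
const runByHand = values =>
  new Promise((resolve, reject) => {
    const results = [];
    const output = pipeline(
      Readable.from(values),
      makeSquare(),
      makeNeighbors(),
      makeOddOnly(),
      error => (error ? reject(error) : resolve(results))
    );
    output.on('data', x => results.push(x));
  });

runByHand([1, 2, 3]).then(results => console.log(results)); // [1, 3, 5, 9]
```

With stream-chain, the same three stages are the one-line functions x => x * x, x => [x - 1, x, x + 1], and x => x % 2 ? x : null used in the intro example.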

Installation

npm i --save stream-chain
# or: yarn add stream-chain

Documentation

Chain, which is returned by require('stream-chain'), is based on Duplex. It chains its dependents in a single pipeline, optionally binding error events.

Many details about this package can be discovered by looking at test files located in tests/ and in the source code (index.js).

Constructor: new Chain(fns[, options])

The constructor accepts the following arguments:

  • fns is an array of functions, arrays, or stream instances.
    • If a value is a function, a Transform stream is created, which calls this function with two parameters: chunk (an object), and an optional encoding. See Node's documentation for more details on those parameters. The function will be called in the context of the created stream.
      • If it is a regular function, it can return:
        • Regular value:
          • (deprecated since 2.1.0) Array of values to pass several or zero values to the next stream as they are.
            // produces no values:
            x => []
            // produces two values:
            x => [x, x + 1]
            // produces one array value:
            x => [[x, x + 1]]
          • Single value.
            • If it is undefined or null, no value shall be passed.
            • Otherwise, the value will be passed to the next stream.
            // produces no values:
            x => null
            x => undefined
            // produces one value:
            x => x
        • Special value:
          • If it is an instance of Promise or "thenable" (an object with a method called then()), it will be waited for. Its result should be a regular value.
            // delays by 0.5s:
            x => new Promise(
              resolve => setTimeout(() => resolve(x), 500))
          • If it is an instance of a generator or "nextable" (an object with a method called next()), it will be iterated according to the generator protocol. The results should be regular values.
            // produces multiple values:
            class Nextable {
              constructor(x) {
                this.x = x;
                this.i = -1;
              }
              next() {
                return {
                  done:  this.i > 1,
                  value: this.x + this.i++
                };
              }
            }
            x => new Nextable(x)
            next() can return a Promise according to the asynchronous generator protocol.
        • Any thrown exception will be caught and passed to a callback function effectively generating an error event.
          // fails
          x => { throw new Error('Bad!'); }
    • If it is an asynchronous function, it can return a regular value.
      • In essence, it is covered under "special values" as a function that returns a promise.
      // delays by 0.5s:
      async x => {
        await new Promise(resolve => setTimeout(() => resolve(), 500));
        return x;
      }
    • If it is a generator function, each yield should produce a regular value.
      • In essence, it is covered under "special values" as a function that returns a generator object.
      // produces multiple values:
      function* (x) {
        for (let i = -1; i <= 1; ++i) {
          if (i) yield x + i;
        }
        return x;
      }
    • (since 2.2.0) If it is an asynchronous generator function, each yield should produce a regular value.
      • In essence, it is covered under "special values" as a function that returns a generator object.
      // produces multiple values:
      async function* (x) {
        for (let i = -1; i <= 1; ++i) {
          if (i) {
            await new Promise(resolve => setTimeout(() => resolve(), 50));
            yield x + i;
          }
        }
        return x;
      }
    • (since 2.1.0) If a value is an array, it is assumed to be an array of regular functions. Their values are passed along in a chain. All values (including null, undefined, and arrays) are allowed and passed without modifications. Only the value returned by the last function is subject to the processing defined above for regular functions.
      • Empty arrays are ignored.
      • If any function returns a value produced by Chain.final(value) (see below), it terminates the chain using value as the final value of the chain.
      • This feature bypasses streams. It is implemented for performance reasons.
    • If a value is a valid stream, it is included as is in the pipeline.
      • Transform.
      • Duplex.
      • The very first stream can be Readable.
        • In this case a Chain instance ignores all possible writes to the front, and ends when the first stream ends.
      • The very last stream can be Writable.
        • In this case a Chain instance does not produce any output, and finishes when the last stream finishes.
        • Because the 'data' event is not used in this case, the instance resumes itself automatically. Read about it in Node's documentation.
  • options is an optional object detailed in Node's documentation.
    • If options is not specified, or falsy, it is assumed to be:
      {writableObjectMode: true, readableObjectMode: true}
    • Always make sure that writableObjectMode is the same as the corresponding object mode of the first stream, and readableObjectMode is the same as the corresponding object mode of the last stream.
      • In principle, both modes could be deduced, but Node does not define a standard way to determine them, so currently it cannot be done reliably.
    • Additionally the following custom properties are recognized:
      • skipEvents is an optional flag. If it is falsy (the default), 'error' events from all streams are forwarded to the created instance. If it is truthy, no events are forwarded. A user can always do so externally or in a constructor of a derived class.
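
The rules above for function return values can be pictured with a small sketch built on Node's Transform alone. This is an illustration of the described behavior under simplifying assumptions, not the library's actual implementation: fromFunction, pushValue, and collect are hypothetical names, and arrays and asynchronous generators are left out to keep it short.

```javascript
const {Readable, Transform} = require('stream');

// a single regular value: undefined and null mean "no value"
const pushValue = (stream, value) => {
  if (value !== undefined && value !== null) stream.push(value);
};

// hypothetical helper: wraps a function in an object mode Transform stream
const fromFunction = fn =>
  new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      try {
        // the function runs in the context of the stream
        const result = fn.call(this, chunk, encoding);
        if (result && typeof result.then == 'function') {
          // a promise or "thenable": wait for it, then pass its result on
          result.then(value => {
            pushValue(this, value);
            callback();
          }, callback);
          return;
        }
        if (result && typeof result.next == 'function') {
          // a generator object or "nextable": iterate it
          for (let item = result.next(); !item.done; item = result.next()) {
            pushValue(this, item.value);
          }
        } else {
          pushValue(this, result);
        }
      } catch (error) {
        callback(error); // becomes an 'error' event
        return;
      }
      callback();
    }
  });

// feeds values through fn and collects the output
const collect = (values, fn) =>
  new Promise((resolve, reject) => {
    const results = [];
    Readable.from(values)
      .pipe(fromFunction(fn))
      .on('data', x => results.push(x))
      .on('error', reject)
      .on('end', () => resolve(results));
  });

collect([1, 2, 3, 4], x => (x % 2 ? x * 10 : null)).then(console.log); // [10, 30]
```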

An instance can be used to attach handlers for stream events.

const chain = new Chain([x => x * x, x => [x - 1, x, x + 1]]);
chain.on('error', error => console.error(error));
dataSource.pipe(chain);
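
When skipEvents is truthy, error forwarding has to be wired up externally. A minimal sketch of such wiring with plain Node streams follows; forwardErrors is a hypothetical helper written for this example, and PassThrough streams stand in for the parts of a real pipeline.

```javascript
const {PassThrough} = require('stream');

// hypothetical helper: re-emits 'error' events from every stream on the target
const forwardErrors = (streams, target) => {
  for (const stream of streams) {
    stream.on('error', error => target.emit('error', error));
  }
  return target;
};

// stand-ins for the stages of a pipeline and for the object that represents it
const stages = [new PassThrough(), new PassThrough()];
const whole = forwardErrors(stages, new PassThrough());

whole.on('error', error => console.error('pipeline failed:', error.message));
stages[1].emit('error', new Error('Bad!'));
```

Assuming the streams property described in this document, calling forwardErrors(chain.streams, chain) on an instance created with skipEvents set to true should give roughly the default behavior.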

Properties

The following public properties are available:

  • streams is an array of streams created by the constructor. Its values are either Transform streams that use the corresponding functions from the constructor parameter, or user-provided streams. All streams are piped sequentially starting from the beginning.
  • input is the beginning of the pipeline. Effectively it is the first item of streams.
  • output is the end of the pipeline. Effectively it is the last item of streams.
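
How the three properties relate can be sketched with plain Node streams. The helper names connect, makeStage, and run are assumptions made for this example; the real class does more, such as presenting the whole pipeline as a single Duplex.

```javascript
const {Transform} = require('stream');

// hypothetical helper: an object mode Transform that applies fn to every value
const makeStage = fn =>
  new Transform({
    objectMode: true,
    transform(x, _, callback) {
      callback(null, fn(x));
    }
  });

// pipes the streams one after another and names both ends of the pipeline
const connect = streams => {
  streams.reduce((previous, next) => previous.pipe(next));
  return {streams, input: streams[0], output: streams[streams.length - 1]};
};

// writes values to the input, collects whatever comes out of the output
const run = values =>
  new Promise(resolve => {
    const {input, output} = connect([makeStage(x => x + 1), makeStage(x => 2 * x)]);
    const results = [];
    output.on('data', x => results.push(x));
    output.on('end', () => resolve(results));
    values.forEach(value => input.write(value));
    input.end();
  });

run([1, 2, 3]).then(results => console.log(results)); // [4, 6, 8]
```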

Generally, a Chain instance should be used to represent a chain:

const chain = new Chain([
  x => x * x,
  x => [x - 1, x, x + 1],
  new Transform({
    writableObjectMode: true,
    transform(chunk, _, callback) {
      callback(null, chunk.toString());
    }
  })
]);
dataSource
  .pipe(chain)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));

But in some cases input and output provide better control over how a data processing pipeline should be organized:

chain.output
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
dataSource.pipe(chain.input);

Pick one of these two styles, and never mix them on the same object.

Static methods

The following static methods are available:

  • chain(fns[, options]) is a helper factory function, which has the same arguments as the constructor and returns a Chain instance.
    const {chain} = require('stream-chain');
    
    // simple
    dataSource
      .pipe(chain([x => x * x, x => [x - 1, x, x + 1]]));
    
    // all inclusive
    chain([
      dataSource,
      x => x * x,
      x => [x - 1, x, x + 1],
      zlib.createGzip(),
      fs.createWriteStream('output.txt.gz')
    ])
  • (since 2.1.0) final(value) is a helper factory function, which can be used by chained functions (see the array of functions above). It returns a special value, which terminates the chain and uses the passed value as the result of the chain.
    const {chain, final} = require('stream-chain');
    
    // simple
    dataSource
      .pipe(chain([[x => x * x, x => 2 * x + 1]]));
    // faster than [x => x * x, x => 2 * x + 1]
    
    // final
    dataSource
      .pipe(chain([[
        x => x * x,
        x => final(x),
        x => 2 * x + 1
      ]]));
    // the same as [[x => x * x, x => x]]
    // the same as [[x => x * x]]
    // the same as [x => x * x]
    
    // final as a terminator
    dataSource
      .pipe(chain([[
        x => x * x,
        x => final(),
        x => 2 * x + 1
      ]]));
    // produces no values, because the final value is undefined,
    // which is interpreted as "no value shall be passed"
    // see the doc above
    
    // final() as a filter
    dataSource
      .pipe(chain([[
        x => x * x,
        x => x % 2 ? final() : x,
        x => 2 * x + 1
      ]]));
    // only even values are passed, odd values are ignored
    
    // if you want to be really performant...
    const none = final();
    dataSource
      .pipe(chain([[
        x => x * x,
        x => x % 2 ? none : x,
        x => 2 * x + 1
      ]]));
  • (since 2.1.0) many(array) is a helper factory function, which is used to wrap arrays to be interpreted as multiple values returned from a function. At the moment it is redundant: you can use a simple array to indicate that, but a naked array is being deprecated, and in future versions it will be passed as is. The thinking is that using many() better indicates the intention. Additionally, in future versions it will be used by arrays of functions (see above).
    const {chain, many} = require('stream-chain');
    
    dataSource
      .pipe(chain([x => many([x, x + 1, x + 2])]));
    // currently the same as [x => [x, x + 1, x + 2]]
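
The array-of-functions form and final() can be pictured as plain function composition with an early exit. The sketch below illustrates that idea only and is not the library's code: compose, the marker symbol, and this local final are stand-ins written for the example.

```javascript
// local stand-in for final(): wraps a value in a recognizable marker object
const marker = Symbol('final');
const final = value => ({[marker]: true, value});
const isFinal = x => x !== null && typeof x == 'object' && x[marker] === true;

// composes functions left to right; a final value skips the remaining ones
const compose = fns => x => {
  let value = x;
  for (const fn of fns) {
    value = fn(value);
    if (isFinal(value)) return value.value;
  }
  return value;
};

const plain = compose([x => x * x, x => 2 * x + 1]);
const filtered = compose([x => x * x, x => (x % 2 ? final() : x), x => 2 * x + 1]);

console.log(plain(3)); // 19
console.log(filtered(2)); // 9
console.log(filtered(3)); // undefined, i.e. no value is passed on
```

Because the composed function is an ordinary function call, no intermediate streams are involved, which matches the performance motivation given for this feature.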

Release History

  • 2.2.5 Relaxed the definition of a stream (thx Rich Hodgkins).
  • 2.2.4 Bugfix: wrong const-ness in the async generator branch (thx Patrick Pang).
  • 2.2.3 Technical release. No need to upgrade.
  • 2.2.2 Technical release. No need to upgrade.
  • 2.2.1 Technical release: new symbols namespace, explicit license (thx Keen Yee Liau), added Greenkeeper.
  • 2.2.0 Added utilities: take, takeWhile, skip, skipWhile, fold, scan, Reduce, comp.
  • 2.1.0 Added simple transducers, dropped Node 6.
  • 2.0.3 Added TypeScript typings and the badge.
  • 2.0.2 Workaround for Node 6: use 'finish' event instead of _final().
  • 2.0.1 Improved documentation.
  • 2.0.0 Upgraded to use Duplex instead of EventEmitter as the base.
  • 1.0.3 Improved documentation.
  • 1.0.2 Better README.
  • 1.0.1 Fixed the README.
  • 1.0.0 The initial release.
