stream-wrench

1.1.0 • Public • Published

🔧 Stream Wrench

Toolbox for stream manipulation.

StreamChopper

Implements a NodeJS Readable stream and consumes all written data.

API

const chopper = new StreamChopper();

Method: readUntil(delimiter)

const data = await chopper.readUntil(delimiter);

Reads from the input stream until the String or Buffer delimiter is found and returns all data that has been received until the occurrence of delimiter without the delimiter itself. The delimiter is also consumed.

Method: seekUntil(delimiter)

await chopper.seekUntil(delimiter);

Like readUntil() but drops all read data.

Example

Read lines from a text file:

import StreamChopper from 'stream-wrench/stream-chopper';
import {createReadStream} from 'node:fs';

// Open a text file generated by this shell call:
// echo -e "Knock knock!\nNeutrino\nHow's there?" >joke.txt
const src = createReadStream('./joke.txt');

const chopper = new StreamChopper();

src.pipe(chopper);

// Read the first line. Outputs: "Knock knock!"
const firstLine = await chopper.readUntil('\n');
console.log(firstLine.toString());

// Skip the second line.
await chopper.seekUntil('\n');

// Read the third line. Outputs: "How's there?"
const thirdLine = await chopper.readUntil('\n');
console.log(thirdLine.toString());

CircuitBreaker

API

Implements a NodeJS Transform stream which is transparent until a trigger sequence is found. Once it triggered it destroys itself.

const cb = new CircuitBreaker();

Method: setTrigger(sequences)

cb.setTrigger(sequences);

Sets the trigger sequence. Previously set sequences are overwritten.

sequences may be a Sequence or an Array of Sequence.

Sequence is a Buffer or a String or an Object with the following keys:

  • seq: Buffer or String of the trigger sequence
  • err: String with the error message that will be used if the circuit breaker triggers by the given seq

Method: protectPromise(promise)

const result = await cb.protectPromise(promise);

If the circuit breaker hasn't triggered, the settled state of promise will be transparently propagated.

Once the circuit breaker has triggered, protectPromise() will always reject.

Example

import CircuitBreaker from 'stream-wrench/circuit-breaker';

// Setup with trigger sequence 'STOP'.
// All passed data is written to console output.
// Once it triggers, it'll write the error message to the console.
const cb = new CircuitBreaker();
cb.setTrigger('STOP');
cb.on('error', (e) => console.log(e.message));
cb.pipe(process.stdout);

// Can be used to guard promises.
// It'll handle promises transparently if the circuit breaker hasn't triggered, yet.
// Outputs: 42
cb.protectPromise(Promise.resolve(42)).then(console.log);
// Outputs: 13
cb.protectPromise(Promise.reject(13)).catch(console.log);

// Prints 'Hello world!' to stdout
cb.write('Hello world!\n');
// Prints 'Found sequence: STOP' to stdout
cb.write('STOP');

// Promises can't pass the circuit breaker anymore
// Outputs: Found sequence: 'STOP'
cb.protectPromise(Promise.resolve(42)).catch((e) => console.log(e.message));
// Outputs: Found sequence: 'STOP'
cb.protectPromise(Promise.reject(13)).catch((e) => console.log(e.message));

Readme

Keywords

Package Sidebar

Install

npm i stream-wrench

Weekly Downloads

3

Version

1.1.0

License

MIT

Unpacked Size

16.4 kB

Total Files

12

Last publish

Collaborators

  • jue89