Streamed
Streamed is an array-like stream. Its methods are similar to the Array ones. In many cases it can be used as a replacement for arrays, but with additional reactive features. "Reactive" means you can define a chain once and the stream will process already collected data and data arriving in the future in the same way.
Importing
streamed can be installed from NPM:
❯ npm install --save streamed
Then you can import it using ES syntax:
    import Streamed from 'streamed'
or using CommonJS:
    var Streamed = require('streamed')
It can also be imported with a regular <script> tag. This will expose the Streamed variable to the global scope.
Basic usage
Streams are reactive and can handle values pushed both before and after callbacks are attached.
    var stream = new Streamed(Infinity)
    stream.push(1, 2, 3)
    stream.forEach(console.log)
    stream.push(4, 5, 6)
outputs:
    1
    2
    3
    4
    5
    6
Stream size
The first and only argument of the Streamed class is its size (like in Array). It defines how much data will be buffered:
- new Streamed(Infinity): Infinite size. All data will be stored in the stream instance.
- new Streamed({Number}): Only the given number of most recent items will be saved. Older items will be destroyed.
- new Streamed(): No items will be buffered. Data will be passed down the stream but will not be stored in the instance.
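Examples:
Infinite:
    var stream = new Streamed(Infinity)
    stream.push(1, 2, 3, 4, 5, 6)
    stream.forEach(console.log) // logs: 1, 2, 3, 4, 5, 6
Limited:
    var stream = new Streamed(4)
    stream.push(1, 2, 3, 4, 5, 6)
    stream.forEach(console.log) // logs: 3, 4, 5, 6
Empty:
    var stream = new Streamed()
    stream.push(1, 2, 3, 4, 5, 6)
    stream.forEach(console.log) // no logs. But will log if something is pushed later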
Each option above suits different cases. Typically you want the last pushed value to be buffered, so you can create a stream of size 1. Handlers attached at any time in the future will handle only the last, most relevant value. This makes the stream act like an observable. See this example of tracking a geo-position:
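    var geoPositions = new Streamed(1)
    geoPositions.push({long: 12.7879991, lat: 35.778991}) // earlier position
    geoPositions.push({long: 12.7879992, lat: 35.778992}) // earlier position
    geoPositions.push({long: 12.7879993, lat: 35.778993})
    geoPositions.forEach(console.log) // logs: {long: 12.7879993, lat: 35.778993} - only the latest position, because earlier ones are obsolete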
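The buffering rule for the three sizes can be sketched without the library. The ReplayBuffer class and its subscribe method below are hypothetical names made up for this illustration; they are not the streamed source code and only model the behaviour described in this section: keep at most `size` recent values and replay them to handlers attached later.

```javascript
// Hypothetical sketch, not the streamed implementation.
class ReplayBuffer {
  constructor (size = 0) {
    this.size = size     // Infinity, a positive number, or 0
    this.items = []      // buffered values, oldest first
    this.handlers = []   // callbacks attached so far
  }

  push (...values) {
    for (const value of values) {
      if (this.size > 0) {
        this.items.push(value)
        // Drop the oldest value once the limit is exceeded
        if (this.items.length > this.size) this.items.shift()
      }
      this.handlers.forEach(handler => handler(value))
    }
    return this
  }

  subscribe (handler) {
    // Replay what is still buffered, then listen for future values
    this.items.forEach(item => handler(item))
    this.handlers.push(handler)
    return this
  }
}

// Push six values first, attach a handler afterwards, and collect what it sees
function collect (size) {
  const seen = []
  const buffer = new ReplayBuffer(size)
  buffer.push(1, 2, 3, 4, 5, 6)
  buffer.subscribe(value => seen.push(value))
  return seen
}

const infinite = collect(Infinity)
const limited = collect(4)
const empty = collect(0)

console.log(infinite) // [ 1, 2, 3, 4, 5, 6 ]
console.log(limited)  // [ 3, 4, 5, 6 ]
console.log(empty)    // []
```

A size of 1 falls out of the same rule: only the most recent value survives until a handler is attached.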
Chain methods
.push(value:Any)
Passes the next value to a stream. If no argument is provided, nothing is pushed. Returns the current stream. Accepts any number of arguments, whose values are pushed to the stream in order.
    var stream = new Streamed()
    stream.push(1)
    stream.push(2)
    stream.push(3)
    // or
    stream.push(1, 2, 3)
.forEach(callback:Function)
Similarly to arrays, the callback function is called on every item pushed. Only one argument, the item value, is passed because streamed items have no indexes. Returns a new stream with the same values, so it can be chained.
    var stream = new Streamed()
    var iterationStream = stream.forEach(console.log)
Inherits the size of the stream which it is attached to.
.map(callback:Function)
Similarly to arrays, the callback function is called on every item pushed. Only one argument, the item value, is passed because streamed items have no indexes. Returns a new stream with the values returned from the callback, so it can be chained.
    var stream = new Streamed()
    var types = stream
      .map(function (value) { return typeof value })
      .map(function (value) { return value.toUpperCase() })
    types.forEach(console.log)
    // we need to push items to `stream` so they can flow to `types`
    stream.push(1, 'text', undefined, {})
Output:
    NUMBER
    STRING
    UNDEFINED
    OBJECT
Inherits the size of the stream which it is attached to.
.filter(callback:Function)
Similarly to arrays, the callback function is called on every item pushed. Only one argument, the item value, is passed because streamed items have no indexes. Returns a new stream with only the values for which the callback returns a truthy value, so it can be chained.
    var stream = new Streamed()
    var positiveNumbers = stream
      .filter(function (value) { return typeof value === 'number' })
      .filter(function (value) { return value > 0 })
    positiveNumbers.forEach(console.log)
    // we need to push items to `stream` so they can flow to `positiveNumbers`
    stream.push(-1, 'text', 10, 0, Infinity)
Output:
    10
    Infinity
Inherits the size of the stream which it is attached to.
.buffer(size:Number)
Creates a new stream from the existing one but with the given size. All values are passed through unchanged but buffered up to the given limit. Returns a new stream.
    var stream = new Streamed() // 0 size
    var numbers = stream.buffer(3) // set necessary size
    stream.push(1, 2, 3, 4, 5)
    stream.forEach(console.log) // logs nothing
    numbers.forEach(console.log) // logs: 3, 4, 5
This method is convenient if you want to change the size of an existing stream by producing a new one, or if you want to keep some of the last values in memory when the original stream size is 0.
.reduce(callback:Function, initialValue)
Similarly to arrays, the callback function is called on every item pushed. It receives two arguments, the accumulated value and the current item value; no index is passed because streamed items have no indexes. Returns a new stream with the values returned from the callback, so it can be chained.
    var numbers = new Streamed()
    var sums = numbers
      .reduce(function (valueA, valueB) { return valueA + valueB })
      .forEach(console.log)
    // we need to push items to `numbers` so they can flow to `sums`
    numbers.push(1, 2, 3, 4, 5, 6, 7, 8)
Output:
    1
    3
    6
    10
    15
    21
    28
    36
The size of the result stream is always reduced to 0.
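An initial value can also be passed as the second argument. It is used as the first argument in the first call of the callback.
    var numbers = new Streamed()
    var sums = numbers
      .reduce(function (valueA, valueB) { return valueA + valueB }, 10) // pass 10 as an initial value
      .forEach(console.log)
    numbers.push(1, 2, 3, 4, 5, 6, 7, 8)
Output:
    11
    13
    16
    20
    25
    31
    38
    46
Reducing is optimized for each of the 3 types of streams: empty, limited and infinite. If the stream has a size, only the necessary amount of data will be computed. With a size of 4, each result below is the sum of at most the last 4 values, e.g.:
    var numbers = new Streamed(4)
    var sums = numbers
      .reduce(function (valueA, valueB) { return valueA + valueB })
      .forEach(console.log)
    numbers.push(1, 2, 3, 4, 5, 6, 7, 8)
Output:
    1
    3
    6
    10
    14
    18
    22
    26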
Keep in mind that if the size of the stream is 1 and there is no initial value, the callback will not be applied to any of the values.
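The three output sequences in this section can be reproduced with plain arrays, which may help to see where the numbers come from. The windowedReduce helper below is hypothetical and not part of streamed. It naively re-reduces the most recent `size` values after every push, whereas the library is described as computing only the necessary amount; a size of Infinity stands in here for the running total over all values.

```javascript
// Hypothetical helper for illustration only, not the streamed implementation.
// After each incoming value it reduces the last `size` values.
function windowedReduce (values, size, callback, ...initial) {
  const results = []
  const recent = []
  for (const value of values) {
    recent.push(value)
    if (recent.length > size) recent.shift()
    // Spread `initial` so that "no initial value" really means no argument
    results.push(recent.reduce(callback, ...initial))
  }
  return results
}

const add = (valueA, valueB) => valueA + valueB
const input = [1, 2, 3, 4, 5, 6, 7, 8]

const runningSums = windowedReduce(input, Infinity, add)
const withInitial = windowedReduce(input, Infinity, add, 10)
const lastFour = windowedReduce(input, 4, add)
const sizeOne = windowedReduce(input, 1, add)

console.log(runningSums.join(' ')) // 1 3 6 10 15 21 28 36
console.log(withInitial.join(' ')) // 11 13 16 20 25 31 38 46
console.log(lastFour.join(' '))    // 1 3 6 10 14 18 22 26
console.log(sizeOne.join(' '))     // 1 2 3 4 5 6 7 8
```

The last line shows the size 1 case: reducing a single value without an initial value returns that value unchanged, so the callback is never called.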
.merge(stream:Streamed)
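Joins the values passed to both streams into a single new stream. Returns a new stream.
    var streamA = new Streamed()
    var streamB = new Streamed()
    var streamTogether = streamA.merge(streamB)
    streamTogether.forEach(console.log)
    streamA.push(1)
    streamB.push(2)
    streamA.push(3)
Output:
    1
    2
    3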
This is not a sequenced stream, so there is no difference between streamA.merge(streamB) and streamB.merge(streamA). The data is pushed as it arrives.
Since this method returns a new stream, the result may be merged with another one. For example, you can merge several streams:
    streamTogether = streamA.merge(streamB).merge(streamC)
The size of the new stream is equal to the sum of the sizes of the 2 origin streams, so it can completely buffer the values buffered in both origin streams. E.g.:
    var streamA = new Streamed(2)
    var streamB = new Streamed(3)
    var streamTogether = streamA.merge(streamB)
    console.log(streamTogether.size) // 5
.pipe(stream:Streamed)
Forwards values from the existing stream to another stream instance, creating a connection between the 2 streams. Returns the instance that is passed as the argument.
    var streamA = new Streamed()
    var streamB = new Streamed()
    streamA.pipe(streamB)
    streamB.forEach(console.log)
    streamA.push('a', 'b', 'c')
Output:
    a
    b
    c
.unpipe(stream:Streamed)
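Destroys the connection between "piped" streams. Returns the instance that is passed as the argument.
    var streamA = new Streamed()
    var streamB = new Streamed()
    streamA.pipe(streamB)
    streamB.forEach(console.log)
    streamA.push('a')
    streamA.unpipe(streamB)
    streamA.push('b')
Output:
    a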
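One way to picture the connection that pipe creates and unpipe destroys is a set of target streams kept by the source. The MiniStream class below is a hypothetical sketch written for this explanation, not the streamed source code; it only mirrors the return values and forwarding behaviour described above.

```javascript
// Hypothetical sketch, not the streamed implementation.
class MiniStream {
  constructor () {
    this.targets = new Set() // streams that receive forwarded values
    this.received = []       // values seen by this stream, kept for inspection
  }

  push (...values) {
    for (const value of values) {
      this.received.push(value)
      this.targets.forEach(target => target.push(value))
    }
    return this
  }

  pipe (target) {
    this.targets.add(target)
    return target // the instance passed as the argument
  }

  unpipe (target) {
    this.targets.delete(target)
    return target
  }
}

const source = new MiniStream()
const sink = new MiniStream()

source.pipe(sink)
source.push('a')
source.unpipe(sink)
source.push('b') // no longer forwarded

console.log(sink.received)   // [ 'a' ]
console.log(source.received) // [ 'a', 'b' ]
```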
.unsubscribe()
Removes all piped and chained streams. Call this if you want to stop handling values in a stream and would like to destroy it. Returns the same stream.
    var stream = new Streamed()
    function getType (value) { return typeof value }
    stream.map(getType) // attach mapped stream
    stream.unsubscribe() // destroy mapped stream and any other handlers
    stream = undefined // delete reference
Lazy evaluation
By default the stream computation is lazy: data starts to flow through the chain only when there is a side effect at the end of it, in one of two forms:
- .forEach() at the end
- .pipe() at the end
For example, the console logs in this code will not be shown until forEach() is attached:
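    var values = new Streamed(Infinity)
    var processedValues = values
      .filter(function (value) { console.log('filter', value); return value > 3 })
      .map(function (value) { console.log('map', value); return value + 10 })
    values.push(1, 2, 3, 4, 5)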
This will not output anything, as the callbacks will not even be called. But as soon as we attach forEach() or pipe it to another stream, e.g.:
    processedValues.forEach(function (value) { console.log('forEach', value) })
it will output:
    filter 1
    filter 2
    filter 3
    filter 4
    map 4
    forEach 14
    filter 5
    map 5
    forEach 15
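Looking at the logs, you may notice that each value flows through the whole chain from top to bottom before the next value starts, instead of every step processing all values from left to right, as happens in regular Arrays.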
We also used new Streamed(Infinity) to not lose the data later. If you do not set the size and do not process the data at the moment it is pushed, it will be lost. Sometimes this is expected and sometimes it is not. You can manage it by passing the necessary size limit as the first argument of the Streamed class.
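The difference in log order between a regular Array chain and a chain that handles one value at a time can be reproduced in plain JavaScript. The two functions below are hypothetical helpers written for this comparison and do not use streamed. They assume a filter that keeps values greater than 3 and a map that adds 10, chosen to match the log shown in this section.

```javascript
// Hypothetical helpers for illustration only; they do not use streamed.

// Array style: each step finishes for all values before the next step starts
function runEager (input) {
  const log = []
  input
    .filter(value => { log.push('filter ' + value); return value > 3 })
    .map(value => { log.push('map ' + value); return value + 10 })
    .forEach(value => { log.push('forEach ' + value) })
  return log
}

// Per-value style: one value travels through the whole chain before the next
function runPerItem (input) {
  const log = []
  for (const value of input) {
    log.push('filter ' + value)
    if (!(value > 3)) continue
    log.push('map ' + value)
    const mapped = value + 10
    log.push('forEach ' + mapped)
  }
  return log
}

const eagerLog = runEager([1, 2, 3, 4, 5])
const perItemLog = runPerItem([1, 2, 3, 4, 5])

console.log(eagerLog.join(', '))
// filter 1, filter 2, filter 3, filter 4, filter 5, map 4, map 5, forEach 14, forEach 15
console.log(perItemLog.join(', '))
// filter 1, filter 2, filter 3, filter 4, map 4, forEach 14, filter 5, map 5, forEach 15
```

The second log has the same order as the stream output in this section.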
Cold and hot streams
If you are familiar with the terminology of hot and cold streams, it may be clearer if we say that:
- all Streamed instances with undefined size are cold streams by default
- all Streamed instances with a size greater than 0 are hot streams by default
- pipe() and forEach() methods return hot streams
- map(), filter(), reduce(), merge(), buffer(), unpipe() return cold streams
Inheritance
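Inheritance is usually used to provide a custom stream source. This may be UI input events, global events, server-sent events, etc. This example shows how to create a custom stream class of mouse moves in a browser:
    class MouseMoves extends Streamed {
      constructor (size) {
        super(size)
        window.addEventListener('mousemove', this.handleMove.bind(this))
      }
      handleMove (event) {
        this.push(event)
      }
    }
    var mouseMoves = new MouseMoves(1)
    mouseMoves.forEach(console.log)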