@fine-js/channels

0.0.2 • Public • Published

Channels.js

A library for asynchronous in-process communication based on Clojure's excellent core.async. Read through the introductory blog post or watch the Strange Loop presentation for more details and inspiration.

It's queues, essentially, except there are Promises on top because JavaScript.

npm install @fine-js/channels

Supported Platforms

The library's source code is plain JavaScript with no runtime dependencies, and it works in Node.js and modern browsers.

Node.js

After installing the dependency, require the module as usual:

const {chan} = require('@fine-js/channels')
console.dir(chan())

Browsers

A build for browsers is available as a single-file download and via UNPKG (docs). Here is an example of embedding version 0.0.2 in particular:

<script src="https://unpkg.com/@fine-js/channels@0.0.2/browser.js"
        integrity="sha384-aLUgfMcOf6P0qxZ4k0e084VdlxfruOqU0zXhYBSZS28Y07u7Zoo1NBbYnNpNynck"
        crossorigin="anonymous"></script>

<script>
  const {chan} = window.finejs.channels
  console.dir(chan())
</script>

Alternatively, you can build your own version with Browserify or any other bundler, as well as serve require.resolve('@fine-js/channels/browser.js') from your server directly.

Overview

TODO…

Terminology

A channel is a plain object with two operations:

  • put(ch, val) puts a value onto the channel
  • take(ch) takes a value from the channel

A port is a description of either operation:

  • ch is a port representing taking a value from channel ch
  • [ch, val] is a port representing putting val onto the channel ch
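The two port shapes can be told apart by checking for an array. The following is a minimal sketch only: describePort is a hypothetical helper that is not part of the library, and the channel is a stand-in plain object.

```javascript
// Hypothetical helper, not part of @fine-js/channels: classifies a port.
function describePort(port) {
  if (Array.isArray(port)) {
    const [ch, val] = port
    return {op: 'put', ch, val}
  }
  return {op: 'take', ch: port}
}

// Stand-in for a real channel; any non-array object works for this sketch.
const ch = {name: 'ch'}

console.log(describePort(ch).op)       // take
console.log(describePort([ch, 42]).op) // put
```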

Each channel can be backed by a buffer of given capacity.

Parking means creating and returning a promise that may never resolve, or that resolves at some indefinite time in the future. This is in contrast to "immediately", which this document uses to roughly mean guaranteed resolution in the near future (often after a couple of microtask ticks).
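To make parking concrete, here is a toy unbuffered channel written from scratch. It is an illustration under simplifying assumptions (no buffers, no closing), not the library's implementation, and parkingChan is a made-up name.

```javascript
// Toy unbuffered channel used only to illustrate parking; it ignores
// buffers and closing and is not how the library is implemented.
function parkingChan() {
  const takers = []  // resolve functions of parked takes
  const putters = [] // {val, resolve} records of parked puts
  return {
    put(val) {
      return new Promise((resolve) => {
        const taker = takers.shift()
        if (taker) {
          taker(val)
          resolve(true)
        } else {
          putters.push({val, resolve}) // parked until a take arrives
        }
      })
    },
    take() {
      return new Promise((resolve) => {
        const putter = putters.shift()
        if (putter) {
          putter.resolve(true)
          resolve(putter.val)
        } else {
          takers.push(resolve) // parked until a put arrives
        }
      })
    },
  }
}

async function demo() {
  const ch = parkingChan()
  const log = []
  const taken = ch.take().then((val) => log.push(`took ${val}`))
  await Promise.resolve()  // a microtask tick passes...
  log.push('still parked') // ...and the take has not resolved yet
  await ch.put(42)         // a matching put releases it
  await taken
  return log
}

demo().then((log) => console.log(log)) // [ 'still parked', 'took 42' ]
```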

API

Basics
  • chan() creates a channel
  • put() puts a value onto the channel
  • take() takes a value from the channel
  • close() closes a channel
Core
  • alt() runs at most one of the given operations returning result of handler
  • alts() runs at most one of the given ports
  • poll() takes a value from channel, when immediately possible
  • offer() puts a value onto channel, when immediately possible
  • timeout() creates a self-closing channel
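Buffers
  • unbuffered() makes a channel a synchronisation point between readers and writers
  • buffer() creates a blocking buffer
  • dropping() creates a dropping non-blocking buffer
  • sliding() creates a sliding non-blocking buffer
JavaScript Extras
  • ch[Symbol.asyncIterator]() returns an async iterator over values put onto the channel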

chan()

Creates a channel with an optional buffer:

  • chan() unbuffered channel
  • chan(capacity) channel with a blocking buffer of given capacity
  • chan(buffer) channel backed by the passed buffer

chan()
chan(10)
chan(sliding(1))

put()

Puts a value onto the channel. null or undefined are not allowed. Will park if no buffer space is available. Resolves to true unless channel is already closed.

put(ch, 42)
ch.put(42)

take()

Takes a value from the channel. Will park if nothing is available. Resolves to a value taken or null if channel is already closed.

take(ch)
ch.take()

close()

Closes given channel. The channel will no longer accept any puts (they will be ignored). Data in the channel remains available for taking, until exhausted, after which takes will return null.

If there are any pending takes, they will be dispatched with null. Any parked puts will remain so until a taker releases them. Closing a closed channel is a no-op. Returns null.

close(ch)
ch.close()
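The closing rules above can be modelled with a small stand-in. This is a sketch of the described behaviour only, not the library's code: closableChan is a made-up name, and puts never park here because the toy buffer is unbounded.

```javascript
// Toy model of close() semantics only; not the library's implementation.
// Puts never park here (the array is unbounded) to keep the sketch short.
function closableChan() {
  const values = [] // buffered values
  const takers = [] // resolve functions of parked takes
  let closed = false
  return {
    async put(val) {
      if (closed) return false // ignored once closed
      const taker = takers.shift()
      if (taker) taker(val)
      else values.push(val)
      return true
    },
    take() {
      if (values.length > 0) return Promise.resolve(values.shift())
      if (closed) return Promise.resolve(null)
      return new Promise((resolve) => takers.push(resolve))
    },
    close() {
      closed = true
      while (takers.length > 0) takers.shift()(null) // release pending takes
      return null
    },
  }
}

async function demo() {
  const ch = closableChan()
  await ch.put('a')
  ch.close()
  // The late put is ignored, the buffered value survives, then takes see null.
  return [await ch.put('b'), await ch.take(), await ch.take()]
}

demo().then((result) => console.log(result)) // [ false, 'a', null ]
```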

timeout()

Returns a channel that will close after the given number of milliseconds.

timeout(100)
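Because a take from a closed channel resolves to null, a timeout channel can serve as a deadline. The sketch below imitates that idea with plain promises; toyTimeout and withDeadline are hypothetical names, and the real timeout() returns a full channel rather than this stand-in.

```javascript
// Toy stand-in for timeout(ms): an object whose take() resolves to null
// once `ms` milliseconds have passed, mimicking a take from a closed channel.
function toyTimeout(ms) {
  const closed = new Promise((resolve) => setTimeout(() => resolve(null), ms))
  return {take: () => closed}
}

// Hypothetical helper: waits for `work`, but gives up after `ms` milliseconds.
function withDeadline(work, ms) {
  return Promise.race([work, toyTimeout(ms).take().then(() => 'timed out')])
}

const slow = new Promise((resolve) => setTimeout(() => resolve('done'), 200))
withDeadline(slow, 10).then((result) => console.log(result)) // timed out
```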

alt()

Executes one of several channel operations via alts() and passes its result to a handler. Accepts a list of operations, each followed by its corresponding handler, and options to pass to alts(), where:

  • each operation is either
    • a single channel to take from
    • an array of ports
  • each handler is either
    • a function accepting (val, channel)
    • anything else

Each channel may only appear once.

Resolves to handler's result if it's a function, handler itself otherwise.

alt([
  a,
  (val) => console.log('read %o from a', val),

  [b, c],
  (val, ch) => console.log('read %o from %o', val, ch),

  [[ch, 42]],
  'wrote 42 to ch',
])

alt([
  [[worker1, task], [worker2, task], [worker3, task]],
  `task ${task.id} queued`,
], {default: () => `try again later in ${waittime()} seconds`})

alt([
  results,
  (val) => ({status: 200, text: val}),

  timeout(150),
  {status: 504},
], {priority: true})
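The handler rule ("function is called, anything else is returned as is") and the flat operation/handler list can be sketched as follows. pairUp and applyHandler are hypothetical helpers written for illustration; the library's internals may look different.

```javascript
// Hypothetical helpers illustrating how alt()'s flat argument list could be
// interpreted; the library's internals may differ.

// Splits [op1, handler1, op2, handler2, ...] into [op, handler] pairs.
function pairUp(clauses) {
  const pairs = []
  for (let i = 0; i < clauses.length; i += 2)
    pairs.push([clauses[i], clauses[i + 1]])
  return pairs
}

// A function handler is called with (val, ch); anything else is returned as is.
function applyHandler(handler, val, ch) {
  return typeof handler === 'function' ? handler(val, ch) : handler
}

const pairs = pairUp(['a', (val) => `read ${val}`, 'timer', {status: 504}])
console.log(applyHandler(pairs[0][1], 42, 'a'))       // read 42
console.log(applyHandler(pairs[1][1], null, 'timer')) // { status: 504 }
```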

alts()

Completes at most one of several ports.

Unless the priority option is true, if more than one port is ready, a non-deterministic choice will be made. If no operation is ready and a default value is supplied, [default-val, alts.default] will be returned. Otherwise alts will park until the first operation to become ready completes.

Resolves to [val, port] of the completed operation, where val is the value taken for takes, and a boolean (true unless already closed, as per ch.put) for puts.

alts([ch1, ch2, ch3])

alts([
  [worker1, task],
  [worker2, task],
  [worker3, task],
], {default: 'try again later'})

alts([
  results,
  timeout(150),
], {priority: true})
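The selection rule can be summarised in a few lines. This is a hypothetical sketch, not the library's implementation: choosePort, PARK and DEFAULT are made-up names, and "priority means first ready port in the order given" is an assumption carried over from core.async.

```javascript
// Hypothetical sketch of the selection rule, not the library's implementation.
// `ready` lists the indexes of ports that could complete right now.
const PARK = Symbol('park')       // stands for "park until something is ready"
const DEFAULT = Symbol('default') // stands for "return the default value"

function choosePort(ready, {priority = false, hasDefault = false} = {}, random = Math.random) {
  if (ready.length === 0) return hasDefault ? DEFAULT : PARK
  if (priority) return ready[0] // assumed: first ready port in the order given
  return ready[Math.floor(random() * ready.length)] // any ready port
}

console.log(choosePort([1, 2], {priority: true}))           // 1
console.log(choosePort([], {hasDefault: true}) === DEFAULT) // true
console.log(choosePort([]) === PARK)                        // true
```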

poll()

Takes a value from the channel, when immediately possible. Resolves to the value taken if successful, null otherwise.

poll(ch)

offer()

Puts a value onto the channel, when immediately possible. Resolves to true if the offer succeeds.

offer(ch, 42)
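The difference from put() and take() is that neither operation ever parks. A toy bounded queue shows the idea; toyQueue is a made-up name, and resolving to false on a failed offer is an assumption, since the text above only specifies the successful case.

```javascript
// Toy bounded queue, not the library's implementation. Both operations
// settle right away instead of parking.
function toyQueue(capacity) {
  const values = []
  return {
    // Assumed: resolves to false when there is no room.
    offer: async (val) => {
      if (values.length >= capacity) return false
      values.push(val)
      return true
    },
    // Resolves to the oldest value, or null when nothing is available.
    poll: async () => (values.length > 0 ? values.shift() : null),
  }
}

async function demo() {
  const q = toyQueue(1)
  return [await q.offer('a'), await q.offer('b'), await q.poll(), await q.poll()]
}

demo().then((result) => console.log(result)) // [ true, false, 'a', null ]
```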

unbuffered()

Giving this buffer to a channel will make it act as a synchronisation point between readers and writers: puts are parked until the value is taken by consumer.

This is the buffer a channel gets when you call chan() or chan(0).

buffer()

Creates a blocking buffer:

  • buffer() with capacity 1
  • buffer(capacity) with given capacity

When full, puts will be parked.

chan(16)
chan(buffer(16))

dropping()

Creates a dropping non-blocking buffer:

  • dropping() with capacity 1
  • dropping(capacity) with given capacity

When full, puts will complete, but their values will be discarded.

chan(dropping(3000))

sliding()

Creates a sliding non-blocking buffer:

  • sliding() with capacity 1
  • sliding(capacity) with given capacity

When full, puts will complete and their values will be buffered, but the oldest elements in the buffer will be dropped.

chan(sliding(1))
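The two non-blocking policies differ only in which value is lost on overflow. Here is a small side-by-side sketch; toyDropping and toySliding are illustrative stand-ins, not the library's dropping() and sliding().

```javascript
// Toy buffers illustrating the two overflow policies; not the library's code.
function toyDropping(capacity = 1) {
  const items = []
  return {
    items,
    add(val) {
      if (items.length < capacity) items.push(val) // new value discarded when full
    },
  }
}

function toySliding(capacity = 1) {
  const items = []
  return {
    items,
    add(val) {
      if (items.length >= capacity) items.shift() // oldest value dropped when full
      items.push(val)
    },
  }
}

const d = toyDropping(2)
const s = toySliding(2)
for (const val of [1, 2, 3, 4]) {
  d.add(val)
  s.add(val)
}
console.log(d.items) // [ 1, 2 ]
console.log(s.items) // [ 3, 4 ]
```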

ch[Symbol.asyncIterator]()

Returns an async iterator over values being put onto the channel. Not meant to be used directly, instead:

// Simply consuming everything until the channel closes.
for await (const message of ch)
  console.log(message)

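// Note that the loop's body will NOT see every single message being put onto the channel,
// since, effectively, this calls `take()` on each iteration and is roughly equivalent to
// the following (`take()` resolves to null once the channel is closed and drained):
let message
while ((message = await ch.take()) !== null)
  console.log(message)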

// Reading until we see a sentinel value
for await (const byte of ch)
  if (byte === 0)
    break;
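For readers curious how such an iterator could be built on top of take(), here is a sketch using an async generator. toySource is a made-up stand-in for a closed channel, and the "keep taking until null" loop is an assumption about the iterator's shape, not the library's actual code.

```javascript
// Toy channel-like object: take() resolves to the next value, or to null once
// everything has been consumed. Only a stand-in for a real, closed channel.
function toySource(values) {
  const pending = [...values]
  return {
    take: async () => (pending.length > 0 ? pending.shift() : null),
    // Assumed shape of the iterator: keep taking until a null shows up.
    async *[Symbol.asyncIterator]() {
      for (let val = await this.take(); val !== null; val = await this.take())
        yield val
    },
  }
}

async function collect(source) {
  const seen = []
  for await (const val of source) seen.push(val)
  return seen
}

collect(toySource([1, 2, 3])).then((seen) => console.log(seen)) // [ 1, 2, 3 ]
```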

Read more about for await...of and async iteration over at MDN.

License

MIT

Collaborators

  • elmigranto