worker.js 🔧
A library best suited to data parallelism within a series of orchestrated tasks that run in worker threads. It works in both node.js and the browser (including with browserify and webpack) through an abstraction that unifies Worker and ChildProcess.
Features
- Enables the use of pseudo-Transferable objects in node.js between threads by using shared memory when available (meaning faster messaging with less overhead when passing large objects).
- Serializes messages in node.js using the Structured Clone Algorithm, meaning you can pass actual JavaScript objects (not just JSON, as with process.send) between threads.
- Allows subworkers (i.e., the ability for a worker to spawn its own nested worker) in all environments, including a workaround for browsers that do not support them natively.
- Supports transmitting/receiving streams across threads (including special treatment for File and Blob in the browser).
- Compatible with Browserify and webpack.
When running on node.js, v8.0.0 or higher is required (for the v8 serializer).
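To see why structured cloning matters, compare a JSON round trip with one through Node's built-in v8 serializer (the reason for the v8.0.0 requirement). This sketch calls only v8.serialize() and v8.deserialize() directly; it is not worker.js's own messaging code, and the variable names are illustrative.

```javascript
const v8 = require('v8');

// A value containing types that JSON cannot represent faithfully.
const original = {
  when: new Date(0),
  tags: new Map([['lang', 'js']]),
  bytes: new Uint8Array([1, 2, 3]),
};

// JSON round trip: Date becomes a string, Map becomes {}, Uint8Array becomes a plain object.
const viaJson = JSON.parse(JSON.stringify(original));

// Structured clone round trip via Node's v8 serializer: the types survive.
const viaV8 = v8.deserialize(v8.serialize(original));

console.log(typeof viaJson.when, viaJson.tags); // string {}
console.log(viaV8.when instanceof Date, viaV8.tags.get('lang')); // true js
```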
Example
Let's take a long list of words, reverse the letters in each word, and then sort that list. Pretty straightforward. Here's a single-threaded way to do it:
serial.js:
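const fs = require('fs');

// load a few hundred thousand words into an array (word-list path is an assumption)
let a_words = fs.readFileSync('/usr/share/dict/words', 'utf8').split('\n');

// start the timer
console.time('serially');

// reverse each word, then sort that list
let a_sorted_words_reversed = a_words
	.map(s_word => s_word.split('').reverse().join(''))
	.sort((s_a, s_b) => s_a.localeCompare(s_b));

// write to disk (output path is illustrative)
fs.writeFile('./reversed.txt', a_sorted_words_reversed.join('\n'), (e_write) => {
	if(e_write) throw e_write;
	console.timeEnd('serially');
});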
Parallelize
In this scenario, we can save a bit of time by breaking down the transformation into multiple units, and then dividing the workload among multiple cores.
First, we define tasks in the-worker.js:
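const worker = require('worker');

const F_SORT_ALPHABETICAL = (s_a, s_b) => s_a.localeCompare(s_b);

// task names are illustrative; 'merge' is assumed to receive two adjacent results
worker.dedicated({
	reverse_letters: a_words => a_words.map(s => s.split('').reverse().join('')),
	sort: a_words => a_words.sort(F_SORT_ALPHABETICAL),
	merge: (a_left, a_right) => worker.merge_sorted(a_left, a_right, F_SORT_ALPHABETICAL),
});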
Then, we define how to use those tasks in the-master.js:
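const fs = require('fs');
const worker = require('worker');

// load a few hundred thousand words into an array (word-list path is an assumption)
let a_words = fs.readFileSync('/usr/share/dict/words', 'utf8').split('\n');

// start the timer
console.time('parallel');

// create a group of workers (size defaults to os.cpus().length)
let k_group = worker.group('./the-worker.js');

// processing pipeline (task names match the illustrative ones in the-worker.js)
k_group
	// bind data from our list, dividing array evenly among workers
	.data(a_words)
	// send data to workers and push them thru the first transform
	.map('reverse_letters')
	// as soon as each worker finishes its previous task, forward each result
	// to a new task in the same thread (keeping data in the same thread)
	.thru('sort')
	// reduce multiple results into a single one
	.reduce('merge').then((a_sorted_words_reversed) => {
		console.timeEnd('parallel');
	});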
Results
This is just a demonstration of how to use the library, but the results from my machine show the potential benefit of dividing such a task:
serially: 2328.937ms
parallel: 1355.964ms
Intro
Data Parallelism
One form of parallelization is to divide a large set of data across multiple processors by spawning workers that run identical code but are assigned different subsets of the data.
Task Parallelism
Another form of parallelization is to assign a different task to each processor. An effective approach to parallelism can combine both forms. With worker.js, you can use a Group to employ both forms cooperatively (i.e., cores perform data parallelism while individually advancing to the next serial task) in order to make full use of all available processing power.
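The data-parallel half of this idea can be sketched without worker.js at all, using Node's built-in worker_threads module: every thread runs identical code on a different subset, and the results are rejoined in subset order. The helper names (divide, runSubset, reverseWordsInParallel) are hypothetical, and this is not how worker.js itself spawns or schedules its workers.

```javascript
const { Worker } = require('worker_threads');
const os = require('os');

// Code that every worker runs: reverse each word in the subset it was given.
const WORKER_SOURCE = `
const { parentPort, workerData } = require('worker_threads');
parentPort.postMessage(workerData.map((s) => s.split('').reverse().join('')));
`;

// Split `items` into at most `count` contiguous, non-empty subsets.
function divide(items, count) {
  const size = Math.ceil(items.length / count);
  const subsets = [];
  for (let i = 0; i < count; i++) {
    subsets.push(items.slice(i * size, (i + 1) * size));
  }
  return subsets.filter((subset) => subset.length > 0);
}

// Run one subset in its own thread and resolve with that thread's result.
function runSubset(subset) {
  return new Promise((resolve, reject) => {
    const thread = new Worker(WORKER_SOURCE, { eval: true, workerData: subset });
    thread.once('message', resolve);
    thread.once('error', reject);
  });
}

// Same code, different data per thread; results are rejoined in subset order.
async function reverseWordsInParallel(words, count = Math.max(1, os.cpus().length)) {
  const results = await Promise.all(divide(words, count).map(runSubset));
  return results.flat();
}

const done = reverseWordsInParallel(['alpha', 'beta', 'gamma', 'delta', 'eps'], 2);
done.then((words) => console.log(words));
```

Dividing into contiguous slices, rather than interleaving, is what lets a simple flat() restore the original order.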
Pseudo-Datatypes:
Throughout this API document, the following pseudo-datatypes describe the expectations placed on primitive-typed function parameters, on primitives used in class methods, and so forth:
key
- a string used for accessing an arbitrary value in a plain object
path
- a string that conforms to an expected syntax (e.g., URL, file path, etc.)
struct
- an interface for a plain object (i.e., one that has expected key names)
hash
- a plain object whose keys are arbitrary (i.e., defined by you, the user)
list
- a one-dimensional array containing only elements of the same type/class
uint
- a non-negative integer
any
- any object or primitive data type that is serializable via the Structured Clone Algorithm
API Documentation
- Factory
- Worker
- Pool
- Group
- WorkerOptions
- EventHash
- Manifest
- Streams
- TaskHandler
- SharedMemory
- DivisionStrategies
Factory
The module's main export is the Factory.
const worker = require('worker');
Constructors:
worker.spawn(source: path[, options: WorkerOptions])
-- spawn a single worker at the given path. Additional options can be specified in options.
- returns a new Worker
- example: let k_worker = worker.spawn('./my-worker.js'); // path is illustrative
worker.pool(source: path[, limit: int][, options: WorkerOptions])
-- create a pool that will spawn up to limit workers (defaults to the number of cores, i.e., navigator.hardwareConcurrency or os.cpus().length). If limit is negative, it indicates how many cores to attempt to reserve (such as os.cpus().length - 1), but if there are not enough cores, the final number of workers in the pool will always be at least 1. Each worker will be spawned from the same source. Additional options for spawning the workers can be specified in options.
- returns a new Pool
- example: let k_pool = worker.pool('./my-worker.js', -1); // path is illustrative
worker.group(source: path[, count: int][, options: WorkerOptions])
-- create a group (i.e., a cooperative pool) that will spawn up to count workers (defaults to the number of cores, i.e., navigator.hardwareConcurrency or os.cpus().length). If count is negative, it indicates how many cores to attempt to reserve (such as os.cpus().length - 1), but if there are not enough cores, the final number of workers in the group will always be at least 1. Each worker will be spawned from the same source. Additional options for spawning the workers can be specified in options.
- returns a new Group
- example: let k_group = worker.group('./my-worker.js'); // path is illustrative
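The sizing rule shared by worker.pool() and worker.group() can be restated as a small function. resolveWorkerCount is a hypothetical helper written for illustration, not part of worker.js; since the text does not say how a positive count interacts with the number of cores, it is passed through unchanged here.

```javascript
const os = require('os');

// Hypothetical restatement of the documented sizing rule:
// - omitted: one worker per core
// - negative: reserve that many cores, but never drop below one worker
// - positive: passed through unchanged (interaction with core count is unspecified)
function resolveWorkerCount(requested, cores = os.cpus().length) {
  if (requested === undefined) return Math.max(1, cores);
  if (requested < 0) return Math.max(1, cores + requested);
  return requested;
}

console.log(resolveWorkerCount(undefined, 8)); // 8
console.log(resolveWorkerCount(-1, 8)); // 7
console.log(resolveWorkerCount(-4, 2)); // 1
```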
worker.manifest(args: Array<any>[, transfers: list<Paths>])
-- create an object that encapsulates the serializable objects in args, optionally declaring a list of those objects that are (a) instances of stream or (b) Transferable (for the browser) or SharedMemory (for node.js). If transfers is omitted or true, then each item in args will be exhaustively searched to find all streams/Transferable/SharedMemory objects. Providing a list of transfers spares the extra computation.
- returns a new Manifest
- example:
// assumes Uint8ArrayS is in scope (see worker.globals()); the task name is illustrative
let h_album = {
	name: 'pics',
	images: [new Uint8ArrayS(800*600), new Uint8ArrayS(1024*768)],
};
let km_args = worker.manifest([h_album]);
k_worker.run('edit_album', km_args);
Methods:
worker.dedicated(tasks: hash{name => TaskHandler})
-- declare the current thread as a dedicated worker, passing a hash of tasks that maps each task's name to its corresponding TaskHandler.
- returns undefined
- example:
// i-am-a-worker.js
worker.dedicated({
	// hypothetical task
	double: a_numbers => a_numbers.map(x => x * 2),
});
worker.globals([scope: Object])
-- get, and optionally set on scope, the identifiers for SharedMemory, such as Uint8ArrayS.
- returns scope, or a new struct
- example:
worker.globals(global); // or `window`, e.g.
let at_test = new Uint8ArrayS(1024); // length is illustrative
worker.merge_sorted(left: list<any>, right: list<any>[, sort: callback])
-- helper function for merging two sorted lists; useful on the worker side.
- returns a list<any> that is the sorted combination of left and right
- example: see the main example.
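For reference, merging two sorted lists is a linear-time, two-pointer walk. mergeSorted below is a hypothetical stand-in that assumes the optional sort callback behaves like an Array.prototype.sort comparator; it is not worker.js's merge_sorted source.

```javascript
// Hypothetical stand-in for a sorted merge; runs in O(left.length + right.length).
function mergeSorted(left, right, sort = (a, b) => (a < b ? -1 : a > b ? 1 : 0)) {
  const out = [];
  let i = 0;
  let j = 0;
  while (i < left.length && j < right.length) {
    // Take from `left` on ties so equal elements keep their left-before-right order.
    if (sort(left[i], right[j]) <= 0) out.push(left[i++]);
    else out.push(right[j++]);
  }
  // One side is exhausted; append whatever remains of the other.
  while (i < left.length) out.push(left[i++]);
  while (j < right.length) out.push(right[j++]);
  return out;
}

console.log(mergeSorted(['ant', 'cat', 'eel'], ['bee', 'dog'])); // [ 'ant', 'bee', 'cat', 'dog', 'eel' ]
```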
class Worker
An abstraction of a WebWorker or ChildProcess. Create an instance by using the Factory method worker.spawn().
Methods:
.run(taskName: string[, args: Array<any> | Manifest[, events: EventHash]])
-- run the task given by taskName on the worker, optionally passing args and events. To determine whether you need to create a Manifest object, see the Manifest documentation.
- returns a new Promise
- example:
(async () => {
	// path and task names are illustrative
	let k_worker = worker.spawn('./my-worker.js');
	// example of a simple call with an arg, using await to get return value
	let x_result_1 = await k_worker.run('square', [4]);
	// example with event callbacks
	await k_worker.run('countdown', [3], {
		tick(x_remaining) { console.log(x_remaining); },
	});
})();
.kill([signal='SIGTERM': string | uint])
-- terminate the worker, optionally sending a kill signal to the child process if running on node.js
- returns a new Promise
- example: k_worker.kill();
class Pool
A pool of Workers for simple task parallelism. Create an instance by using the Factory method worker.pool().
Methods:
.run(taskName: string[, args: Array<any> | Manifest[, events: EventHash]])
-- pull a single Worker out of the pool and assign it this task, or queue the task if all workers in the pool are busy.
- returns a new Promise
.start()
-- mark a new start point in the queue; a later call to .stop() waits from this point until all further queued tasks complete.
- returns undefined
.stop()
-- wait until all previously queued tasks (starting at the start point and ending here) have completed. This also implicitly calls .start(), resetting the start point to here.
- returns a new Promise
.kill([signal='SIGTERM': string | uint])
-- terminate all workers in the pool, optionally sending a kill signal to the child processes if running on node.js.
- returns a new Promise
Example:
// path, task names, and a_downloads (an array of URLs) are illustrative
let k_pool = worker.pool('./network.js');
// start a task on one worker
k_pool.run('check_status');
// a series of tasks is about to be queued
k_pool.start();
// start several more tasks as workers become available
a_downloads.forEach(p_url => k_pool.run('download', [p_url]));
// as soon as the last download is done, start uploading
k_pool.stop().then(() => k_pool.run('upload'));
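The start()/stop() checkpoint can be modeled with plain Promises. CheckpointQueue is a hypothetical class written for illustration: it runs every task immediately rather than limiting concurrency to a pool of workers, and it only tracks which tasks were queued since the last checkpoint.

```javascript
// Hypothetical model of Pool's start()/stop() checkpoint semantics (no worker limit).
class CheckpointQueue {
  constructor() {
    this.sinceStart = [];
  }

  // Queue a task (a function returning a value or a Promise) and return its Promise.
  run(task) {
    const p = Promise.resolve().then(task);
    this.sinceStart.push(p);
    return p;
  }

  // Begin a new checkpoint: forget tasks queued before this point.
  start() {
    this.sinceStart = [];
  }

  // Resolve once every task queued since the last start() has fulfilled
  // (rejects on the first failure), then implicitly start() again.
  stop() {
    const batch = this.sinceStart;
    this.start();
    return Promise.all(batch);
  }
}

const q = new CheckpointQueue();
q.run(() => 'warm-up'); // queued before the checkpoint, so stop() ignores it
q.start();
['a.txt', 'b.txt'].forEach((name) => q.run(() => `downloaded ${name}`));
const batchDone = q.stop(); // resolves with only the two downloads
batchDone.then((results) => console.log(results));
```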
class Group
A group is a cooperative pool of Workers that are spawned from the same source. Create an instance of this class by using the Factory method worker.group().
Methods:
.data(items: Array[, strategy: DivisionStrategy])
-- divide items into multiple subsets using the EqualDivisionStrategy by default, or using strategy if one is specified.
- returns a new ArmedGroup
.use(subsets: list<any>)
-- assign each item in subsets to a worker. The length of subsets must be less than or equal to the number of workers in this group.
- returns a new ArmedGroup
.wait
-- declare an event listener, or a list of events that should subsequently be triggered, for the given event(s)
- .wait(lock|s: string|list<string>, unlocked: callback)
-- calls unlocked once the event named lock, or each and every event listed in locks, is triggered.
- .wait(lock|s: string|list<string>, dependency|ies: string|list<string>)
-- triggers the event named dependency, or each and every event listed in dependencies, once the event named lock, or each and every event listed in locks, is triggered.
- returns this
.unlock
-- triggers any callbacks that are currently waiting for the given event(s), as well as any callbacks that attach a listener later. In other words, event binding via .wait() can occur before or after .unlock() and the results will be the same.
- .unlock(lock: string)
-- triggers the event named lock.
- .unlock(locks: list<string>)
-- triggers each event listed in locks.
- returns this
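Because .wait() and .unlock() may be called in either order, the bookkeeping amounts to a set of unlocked events plus a list of pending listeners. Latches is a hypothetical sketch of those semantics under that reading, not the Group implementation.

```javascript
// Hypothetical sketch of order-independent wait()/unlock() semantics.
class Latches {
  constructor() {
    this.unlocked = new Set();
    this.waiting = []; // entries of { locks: string[], fire: () => void }
  }

  // `then` is either a callback or the name(s) of dependent locks to unlock.
  wait(locks, then) {
    const fire = typeof then === 'function' ? then : () => this.unlock(then);
    this.waiting.push({ locks: [].concat(locks), fire });
    this._flush();
    return this;
  }

  unlock(locks) {
    for (const lock of [].concat(locks)) this.unlocked.add(lock);
    this._flush();
    return this;
  }

  // Fire (exactly once) every listener whose locks are now all unlocked.
  _flush() {
    const ready = this.waiting.filter((w) => w.locks.every((l) => this.unlocked.has(l)));
    this.waiting = this.waiting.filter((w) => !ready.includes(w));
    ready.forEach((w) => w.fire());
  }
}

const log = [];
const g = new Latches();
g.wait('parsed', 'indexed'); // dependency: unlocking 'parsed' unlocks 'indexed'
g.wait(['indexed', 'loaded'], () => log.push('ready'));
g.unlock('loaded');
g.unlock('parsed');
g.wait('loaded', () => log.push('late listener')); // attached after unlock, fires at once
console.log(log); // [ 'ready', 'late listener' ]
```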
class ArmedGroup
A Group that has data attached but has not yet been assigned any tasks.
Methods:
.map(taskName: string[, args: Array<any> | Manifest[, events: EventHash]])
-- dispatch workers to run the task given by taskName on the currently bound data, in parallel. The first argument to each task's call will be its corresponding subset, followed by args. To determine whether you need to create a Manifest object, see the Manifest documentation.
- returns a new ActiveGroup
- example:
let a_sequence = [1, 2, 3, 4];
worker.group('./my-worker.js').data(a_sequence).map('multiply', [2]); // path and task name are illustrative
class ActiveGroup
A Group that has data attached and has been assigned at least one task.
Methods:
.thru(taskName: string[, args: Array<any> | Manifest[, events: EventHash]])
-- rather than passing the results back to the master thread, keep each worker's result data in its own thread and simply forward it to another task. To determine whether you need to create a Manifest object, see the Manifest documentation.
- returns a new ActiveGroup
.each(each: TaskResultCallback[, then: callback(error=null)])
-- handle each task result as soon as it completes, calling each for every subset of data in whatever order the subsets finish. If one of the workers throws an error, or once all tasks complete, then will be called.
- returns a new ArmedGroup
.series(each: TaskResultCallback[, then: callback(error=null)])
-- handle each task result in order, calling each for each subset of data once it has been processed by the preceding task. If one of the workers throws an error, or once all tasks complete, then will be called.
- returns a new ArmedGroup
.end([then: callback(error)])
-- once all the previous tasks end, call then if it is given. This essentially ignores the results returned by the workers. The returned Promise resolves after the last worker has finished and, if then is given, after await then().
- returns a new Promise
.reduce(taskName: string[, args: Array<any> | Manifest[, events: EventHash]])
-- merge adjacent task results until a single result remains. To determine whether you need to create a Manifest object, see the Manifest documentation.
- returns a new Promise
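One plausible reading of "merge adjacent task results" is a pairwise tree reduction in subset order. The text does not say how worker.js actually pairs results (for instance, whether it merges neighbours as soon as both are ready), so reduceAdjacent is a hypothetical, single-threaded sketch of the ordering only.

```javascript
// Hypothetical pairwise reduction: each round merges neighbours (0,1), (2,3), ...
// so k results need about log2(k) rounds, and subset order is preserved.
function reduceAdjacent(results, merge) {
  if (results.length === 0) throw new Error('nothing to reduce');
  let round = results;
  while (round.length > 1) {
    const next = [];
    for (let i = 0; i < round.length; i += 2) {
      // An odd element out at the end is carried into the next round unchanged.
      next.push(i + 1 < round.length ? merge(round[i], round[i + 1]) : round[i]);
    }
    round = next;
  }
  return round[0];
}

console.log(reduceAdjacent([[1], [2], [3], [4], [5]], (a, b) => a.concat(b))); // [ 1, 2, 3, 4, 5 ]
```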
callback TaskResultCallback
A function, implemented on the 'master' side, that gets called with the result a task returned once that task successfully completes. As indicated by the signature, this can optionally be an async function that returns a Promise.
Signature: [async] function(result: any, subsetIndex: uint)
Parameters:
0. result: any
-- what the worker returned
1. subsetIndex: uint
-- the index (from 0 to # workers - 1) of the subset that this result derives from
Returning one of the following will carry the result downstream:
- A Promise that resolves with something that is neither an instance of Error nor undefined.
- Anything that is not undefined.
Returning one of the following will take whatever action is defined by the task stream for handling errors:
interface WorkerOptions
A struct that specifies options for spawning a worker.
Keys for Worker:
Keys for ChildProcess:
args: Array<string>
-- Arguments to append to the ChildProcess#spawn args list for the script.
env: hash
-- Extend/overwrite a clone of the current process.env object before sending it to the worker.
exec: string
-- Defaults to process.execPath.
cwd: string
-- Defaults to process.cwd().
node_args: Array<string>
-- Arguments to inject into the ChildProcess#spawn args list for the executable (e.g., '--max-old-space-size=8192').
inspect: struct
-- Enables the debugger on the spawned worker(s).
interface EventHash
A hash that binds callback listener functions to arbitrary event names, which can be emitted by the worker.
Signature: [eventName] => callback([subsetIndex: uint, ]...arguments: any)
Arguments:
0. subsetIndex: uint
-- the index (from 0 to # workers - 1) of the subset that this event was emitted from (only present for instances of Group).
rest. arguments: any
-- argument data emitted by the worker.
Example:
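// task name, argument, and event name are illustrative
k_worker.run('crawl', [p_url], {
	progress(x_percent) {
		console.log(`${x_percent}% complete`);
	},
});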
class Manifest
An array of objects to transmit/receive between master and worker threads such that special objects are handled properly, i.e., so that streams can be transmitted across threads, Transferable objects can be handled by the browser, and SharedMemory objects can be exchanged over IPC. Create an instance by using the Factory method worker.manifest().
For convenience, any method that accepts a Manifest argument can also accept an Array<any>. In that case, the Array must not contain any special objects nested below its top level. In other words, as long as you pass special objects only at the top level of the Array, or no special objects at all, you do not need to create a Manifest.
Example:
// task names and file paths are illustrative
// without explicitly creating a Manifest (the stream sits at the top level):
k_worker.run('upload', [fs.createReadStream('./a.png')]);
// with a Manifest, we can nest special objects
k_worker.run('upload_all', worker.manifest([{files: [fs.createReadStream('./a.png'), fs.createReadStream('./b.png')]}]));
// ...
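The top-level rule can be expressed as a check over the arguments. needsManifest and isSpecial are hypothetical helpers; treating streams, ArrayBuffers, and typed arrays as the "special" objects is an assumption for illustration, and only plain objects and arrays are traversed.

```javascript
const { Stream } = require('stream');

// Assumed classification of "special" objects: streams plus raw or shared buffers and views.
function isSpecial(value) {
  return value instanceof Stream ||
    value instanceof ArrayBuffer ||
    (typeof SharedArrayBuffer !== 'undefined' && value instanceof SharedArrayBuffer) ||
    ArrayBuffer.isView(value);
}

// Hypothetical check: does any special object appear below the top level of `args`?
// If so, a plain Array is not enough and a Manifest would be needed.
function needsManifest(args) {
  const seen = new Set(); // guards against cyclic structures
  const nested = (value) => {
    if (value === null || typeof value !== 'object' || seen.has(value)) return false;
    seen.add(value);
    return Object.values(value).some((child) => isSpecial(child) || nested(child));
  };
  return args.some((arg) => !isSpecial(arg) && nested(arg));
}

const bytes = new Uint8Array(16);
console.log(needsManifest([bytes, 'label'])); // false: special object at the top level
console.log(needsManifest([{ images: [bytes] }])); // true: nested below the top level
```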
Streams
When working in node.js, instances of ReadableStream and WritableStream can be passed between threads through worker's standard messaging interface. Each event and method call on such streams is transmitted across threads, so keep in mind the additional overhead this may incur when streaming data. See Manifest for an example.
When deploying in the browser, the same rules apply to streams as with node.js since a reliable stream module can be used by both parties.
function TaskHandler
A function to implement on the 'worker' side that accepts an input subset as its first argument and any user-defined values for the rest of its arguments.
Signature: [async] function(subset: Array[, ...args])
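Must return one of the following:
- An instance of a Response.
- A Promise that itself resolves to one of these options.
- Anything supported by the Structured Clone Algorithm.
Example:
worker.dedicated({
	// hypothetical task: the first argument is the subset, the rest are user-defined args
	async scale(a_subset, x_factor) {
		return a_subset.map(x => x * x_factor);
	},
});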
Each time a task handler function is called, its this will have the following fields:
- Properties:
.events: hash{name => 1}
-- a 'simple set' of the events that the user has requested to be notified about. Useful for determining whether a certain event even needs to be emitted.
- Methods:
.put(key: string, data: any)
- stores data under the given key in a hash store that will be available to future tasks in the current pipeline running on the same thread.
- Note: this hash store is safe to use even if the current thread is reassigned to repeat the same task on a different subset; i.e., it protects against task-level collisions.
.get(key: string)
- retrieves data from the previous task(s) in the current thread pipeline.
- returns data set by the user during a call to .put
.emit(eventName: string[, ...args])
- emits the event given by eventName along with the given args by sending a message to the current worker's master thread.
- Note: each of the args must be serializable by the Structured Clone Algorithm.
- example:
// worker.js
worker.dedicated({
	// hypothetical task; only emits if the master listens for 'progress'
	encode(a_codes) {
		if(this.events.progress) this.emit('progress', a_codes.length);
		return a_codes.map(s_code => s_code.toUpperCase());
	},
});
// master.js (using single worker)
k_worker.run('encode', [a_codes], {
	progress(n_codes) { console.log(n_codes); },
});
// -- or --
// master.js (using worker group)
k_group.data(a_codes).map('encode', [], {
	progress(i_subset, n_codes) { console.log(i_subset, n_codes); },
}).end();
SharedMemory
In node.js, this library spawns a new process for each worker. Communication between processes (IPC) is normally done via pipes, which requires outgoing data to be serialized and copied, duplicating it in memory. To reduce this overhead when transferring larger objects between processes, you can allocate ArrayBuffers and TypedArrays in shared memory space before filling them with data.
The following classes are provided to ease the process of creating shared memory. They behave the same as their corresponding TypedArray constructors:
ArrayBufferS
Int8ArrayS
Uint8ArrayS
Uint8ClampedArrayS
Int16ArrayS
Uint16ArrayS
Int32ArrayS
Uint32ArrayS
Float32ArrayS
Float64ArrayS
In the browser, each of these classes will invoke the SharedArrayBuffer constructor to create shared memory.
In node.js, each of these classes will attempt to create shared memory through the operating system (currently supported only on systems that provide POSIX shm and mmap).
Alternatively, you may wish to transfer objects in the browser rather than creating shared memory. In node.js however, IPC still requires shared memory. For this scenario, you can use the transfer-if-able TypedArray constructors:
ArrayBufferT
Int8ArrayT
Uint8ArrayT
Uint8ClampedArrayT
Int16ArrayT
Uint16ArrayT
Int32ArrayT
Uint32ArrayT
Float32ArrayT
Float64ArrayT
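The core idea behind these shared-memory classes, several parties reading and writing the same bytes instead of copying them, can be demonstrated with Node's built-in SharedArrayBuffer and worker_threads. Note the difference: worker_threads share memory between threads of one process, whereas worker.js spawns separate processes and relies on POSIX shm. This sketch does not use worker.js.

```javascript
const { Worker } = require('worker_threads');

// Allocate shared memory once and view it as four 32-bit integers.
const shared = new SharedArrayBuffer(4 * Int32Array.BYTES_PER_ELEMENT);
const view = new Int32Array(shared);

// The worker receives the same memory, not a copy, and fills it in place.
const thread = new Worker(`
const { workerData } = require('worker_threads');
const cells = new Int32Array(workerData);
for (let i = 0; i < cells.length; i++) Atomics.store(cells, i, (i + 1) * 10);
`, { eval: true, workerData: shared });

// After the worker exits, its writes are visible through the main thread's view.
const filled = new Promise((resolve, reject) => {
  thread.once('error', reject);
  thread.once('exit', () => resolve(Array.from(view, (_, i) => Atomics.load(view, i))));
});

filled.then((values) => console.log(values)); // [ 10, 20, 30, 40 ]
```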
DivisionStrategies
A division strategy is an algorithm that divides a list of elements into multiple subsets, so as to share the load among workers in a data parallelism paradigm.
EqualDivisionStrategy
The default division strategy, which attempts to divide the input data into equal subsets over the number of workers in the current Group. When the input cannot be divided evenly by the number of workers, the subsets nearer the end will be slightly larger. This increases the likelihood that the workers assigned the first subsets finish before those assigned the last subsets, an outcome intended to favor consumers that process results in their original input order.
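The described division can be sketched as follows. equalDivide is a hypothetical function that follows the rule above (near-equal contiguous subsets, with the leftover elements assigned to the subsets at the end); worker.js's actual EqualDivisionStrategy may differ in its details.

```javascript
// Hypothetical sketch: split `items` into `count` contiguous subsets of near-equal
// size, giving the leftover elements to the last subsets so the first ones are smallest.
function equalDivide(items, count) {
  const base = Math.floor(items.length / count);
  const extra = items.length % count; // how many trailing subsets get one more element
  const subsets = [];
  let start = 0;
  for (let i = 0; i < count; i++) {
    const size = base + (i >= count - extra ? 1 : 0);
    subsets.push(items.slice(start, start + size));
    start += size;
  }
  return subsets;
}

console.log(equalDivide([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 4).map((s) => s.length)); // [ 2, 2, 3, 3 ]
```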
Building with browserify or webpack for the browser
This module works with browserify out of the box. Just npm install worker in your project and require it normally.
For webpack, however, you will need to add the following to your webpack.config.js:
module.exports = {
	resolve: {
		aliasFields: ['browser'],
	},
};
This will let webpack know to use the browser key/value pair of worker's package.json file.