
0.0.8 • Public • Published


A plugin for RoadieJS

A configurable ETL pipeline, based on Node.js streams.




Registers a new import stream, and goes on to start importing.


POST /streams

  "namespace": "roadietest",
  "blueprintName": "importPlanets",
  "blueprintVersion": 1,
  "localVersion": 0,
  "importStream": "planetCsv",
  "missingAction": "error",
  "source": {
    "type": "file",
    "options": {
      "paths": "./test/planets/import_files/advanced.csv"
Name Notes
namespace Namespace of the blueprint which contains the import element.
blueprintName Name of the blueprint which contains the import element.
blueprintVersion Version number of the blueprint which contains the import element.
localVersion Local version number of the blueprint which contains the import element.
importStream The id of an importStream element, that's defined in the identified blueprint.
missingAction Controls the behaviour if trying to update/delete a document that doesn't exist. Valid values are warning, error.
source An object to configure the source of the import.

Data can be streamed from multiple sources. The source object is therefore mandatory, and has two common keys:

Name Notes
type The type of import (e.g. file).
options An object containing config specific to the type of import (see below).


Imports data from files stored on the local file system.

  "type": "file",
  "options": {
    "paths": "./test/school/import_files/full_school_dump.csv"
Name Notes
paths Defines which files should be loaded. Supports paths to single files, * wildcards, glob-style ** (for directory recursion) and arrays of multiple strings.

Status 201

  "_id": "557c3834f0f1c14e25220e8b",
  "_created": "2015-06-13T14:03:32.571Z",
  "namespace": "roadietest",
  "blueprintName": "importPlanets",
  "blueprintVersion": 1,
  "blueprintLocalVersion": 0,
  "importStream": "planetCsv",
  "totalSize": 1147,
  "status": "starting",
  "processedSize": 0,
  "count": 0,
  "warnings": 0,
  "failures": 0,
  "notDealtWith": 0
Name Notes
_id The unique database-generated id for the import process.
_created A timestamp of when the import was registered.
namespace Namespace of the blueprint, as supplied in the request.
blueprintName Name of the blueprint, as supplied in the request.
blueprintVersion Version of the blueprint, as supplied in the request.
blueprintLocalVersion Local version of the blueprint, as supplied in the request.
importStream The id of an importStream element, as supplied in the request.
totalSize The number of units the import is estimated to be. Most likely number of bytes.
status Current status of the import (expect starting).
processedSize How many units have been imported so far (expect 0 at this point).
count Total count of documents which have been processed (regardless of whether they succeeded or failed).
warnings Number of documents, within the overall count, that have raised a warning.
failures Number of documents, within the overall count, that have failed.
notDealtWith Number of documents, within the overall count, that did not match any record pattern.


Get the latest status of a flow.


GET /streams/:id

Name Notes
id The id that uniquely identifies an import (e.g. the _id value returned from createImportStream).

Status 200

  "_id": "557c6f1487a62fff374fa2ed",
  "_created": "2015-06-13T17:57:40.707Z",
  "namespace": "roadietest",
  "blueprintName": "importSchools",
  "blueprintVersion": 1,
  "blueprintLocalVersion": 0,
  "importStream": "studentCsv",
  "totalSize": 364,
  "status": "succeeded",
  "finished": "2015-06-13T17:57:40.820Z",
  "processedSize": 364,
  "count": 6,
  "warnings": 0,
  "failures": 0,
Name Notes
_id The unique database-generated id for the import process (e.g. the id provided as a parameter as part of the request).
_created A timestamp of when the import was registered.
namespace Namespace of the blueprint, as supplied in the request.
blueprintName Name of the blueprint, as supplied in the request.
blueprintVersion Version of the blueprint, as supplied in the request.
blueprintLocalVersion Local version of the blueprint, as supplied in the request.
importStream The id of an importStream element, as supplied in the request.
totalSize The number of units the import is estimated to be. Most likely number of bytes.
status Current status of the import, valid values are starting, succeeding, warning, failing, warned, failed, succeeded.
finished Timestamp of when the flow finished (not present if it's still running).
processedSize How many units have been imported so far
count Total count of documents which have been processed (regardless of whether they succeeded or failed).
warnings Number of documents, within the overall count, that have raised a warning.
failures Number of documents, within the overall count, that have failed.
notDealtWith Number of documents, within the overall count, that did not match any record pattern.


Returns an array of messages that have been generated by the specified flow (ordered-by creation timestamp ascending).


GET /streams/:id/messages

Name Notes
id The id that uniquely identifies an import (e.g. the _id value returned from createImportStream).

Status 200

  { "_id": "557c7680f09749d93b88619f",
    "transactionId": "557c7680f09749d93b88619a",
    "schemaName": "students",
    "namespace": "roadietest",
    "blueprintName": "importSchools",
    "blueprintVersion": 1,
    "blueprintLocalVersion": 0,
    "type": "warning",
    "name": "noDoc",
    "message": "Unable to find document"
Name Notes
_id A unique value to identify the message.
transactionId The unique database-generated id for the import process (e.g. the id provided as a parameter as part of the request).
schemaName The id of a schema related to the message.
namespace Namespace of the blueprint responsible for the import.
blueprintName Name of the blueprint responsible for the import.
blueprintVersion Version of the blueprint responsible for the import.
blueprintLocalVersion Local version of the blueprint responsible for the import.
type Type of message: a value from info, warning, error or exception.
name Name (e.g. code) of the message.
message Short message content
body Data to support the message (content specific to the type/name of message)



Registers a new import (e.g. a way of importing data into schemas within the blueprint).

  "id": "planetCsv",
  "element": "importStream",
  "config": {
    "parser": {
      "type": "csv",
      "options": {
        "delimiter": ",",
        "qualifier": "\\"
    "target": {
      "type": "data"
Name Type Notes
parser object An object that should contain a type string (e.g. csv) for identifying a parser, and an options object for configuring the parser.
target object An object that configures a supported target for the import. The object must include a type value to identify a target.

A parser takes the raw data stream from a source (configured via createImportStream) and turns it into a usable object for passing onto an adaptor.


The csv parser expects a source that can provide individual chunks of data (typically a line from a file).

  • Internally, parsing is handled via the csv-parse package.
  • The options defined for the parser are passed through to a csv-parse parser. More information here.

An adaptor takes the output of a parser and maps it to fields in a schema.

  • There's no need to explicitly define an adaptor.
  • If an adaptor hasn't been defined, then an adaptor with the same name as the parser is used.
  • The behaviour of an adaptor depends on its type.


The csv adaptor expects one or more csvRecord elements to be defined as a child element of the importStream element.


A target is the final destination in the import pipeline, and does something with the output of an adaptor.


  • Hooks into roadiejs-data so the object produced out of the adaptor can persisted.


  • Outputs the object produced out of the adaptor to the console.


If an importStream element has a parser of type csv, then one or more csvRecord child elements should be defined for it.

  • The purpose of a csvRecord is to transform the output of a csv parser to a schema/field structure.
  • Multiple csvRecord elements can be configured under an importStream element - as it's possible to 'identify' a suitable schema from the available csv columns.
  • A special csv array will be accessible when evaluating expressions, this reflects the parsed columns from the underlying CSV data.
  "id": "craterRecord",
  "element": "csvRecord",
  "parent": "importStream.planetCsv",
  "config": {
    "schemaId": "planets",
    "recordIdentification": "csv[0]=='crater'",
    "actionIdentification": {
      "post": "csv[1]=='I'",
      "put": "csv[1]=='U'",
      "upsert": "csv[1]=='M'",
      "del": "csv[1]=='D'"
    "paramMap": [
    "data": {
      "title": "csv[3]",
      "diameter": "csv[4]"
Name Type Notes
schemaId string The id of a schema defined within the blueprint that the CSV data will be ultimately persisted..
recordIdentification string Optional. An expression. If it evaluates to true then the config of this csvRecord element will be used to transform the CSV data into a field structure.
actionIdentification object Optional. Maps an action (e.g. post, put, upsert or del) to an expression. If it evaluates to true then that action will be used to persist/delete the transformed data.
paramMap [String] Optional. An array of strings. Maps parameters (starting at docId) of a /data route to the contents of the CSV record. It is therefore possible target sub-docs.
data object Maps a field name to an expression. The result of the expression will then be used as the value for that field.


A simple way to populate with data - useful for supplying reference/lookup data from within a blueprint definition.

  • Ensure a populate element is a child of the a schema element you wish to populate.
  • Schemas will only ever be populated once, and will not be re-asserted every time the blueprint is used
  "id": "statesPopulator",
  "element": "populate",
  "parent": "schema.states",
  "config": {
    "map": [
    "data": [
      ["ALABAMA", "AL", "Montgomery", "Birmingham", 4708708, 52423],
      ["ALASKA", "AK", "Juneau", "Anchorage", 698473, 656425],
      ["ARIZONA", "AZ", "Phoenix", "Phoenix", 6595778, 114006],
      ["ARKANSAS", "AR", "Little Rock", "Little Rock", 2889450, 53182],
      ["CALIFORNIA", "CA", "Sacramento", "Los Angeles", 36961664, 163707],
      ["COLORADO", "CO", "Denver", "Denver", 5024748, 104100],
      ["CONNECTICUT", "CT", "Hartford", "Bridgeport", 3518288, 5544]
Name Type Notes
map [string] An array of strings, each a field name within the schema you wish to populate. The order is important...
data [array] An array of arrays - mimicking a record/field structure. The values of each 'record' should be in the same order as defined in map.





Package Sidebar


npm i roadiejs-import

Weekly Downloads






Last publish


  • timneedham