rxjs-etl

Version 2.0.1

RxJS-ETL

RxJS-ETL is a modular platform that employs RxJS observables, allowing developers to build stream-based ETL (Extract, Transform, Load) pipelines complete with buffering, bulk insertions, notifications, and task dependencies.

Installation

npm install rxjs-etl

Usage

Require the RxJS-ETL library in the desired file to make it accessible.

const {Etl, extract, load} = require('rxjs-etl');

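Sample configuration: CSV -> MongoDB

    // transform1, transform2, mongoURI, collectionName, emailMessage,
    // textMessage, and task2 are placeholders for your own values;
    // task2 must be defined before it is passed to .next().
    const task1 = new Etl()
        .addExtractors(extract.fromCSV, 'SAMPLE_DATA.csv')     // read and parse the CSV file
        .addTransformers([transform1, transform2])             // apply user-supplied transforms
        .addLoaders(load.toMongoDB, mongoURI, collectionName)  // write results to MongoDB
        .combine()                                             // wire extract -> transform -> load
        .addEmailNotification(emailMessage)                    // optional: email on success
        .addTextNotification(textMessage)                      // optional: text message on success
        .addSchedule('0 * * * * *')                            // optional: run on a cron schedule
        .next(task2)                                           // optional: run task2 afterwards
        .start();                                              // subscribe and begin processing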

Extract

addExtractors(extractorFunction, connectStrOrFilePath, collectionOrTableName)

This method accepts one of the extractor helper methods as an argument and stores it in the Etl instance's state. The second parameter is either a file path or a connection URI, depending on the data source. The third parameter is optional and is the collection or table name when extracting from a Mongo or Postgres database.
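
Node's `fs` APIs do not expand `~` the way a shell does. Unless rxjs-etl resolves the tilde itself (this README does not say), a literal path such as `'~/EXAMPLE_DATA.csv'` may not point at your home directory. A minimal sketch of building an absolute path with Node's built-in `os` and `path` modules instead; the `dataPath` name is just for illustration.

```javascript
const os = require('os');
const path = require('path');

// Resolve the file against the home directory explicitly rather than
// relying on shell-style '~' expansion, which Node's fs does not perform.
const dataPath = path.join(os.homedir(), 'EXAMPLE_DATA.csv');

console.log(dataPath);

// With rxjs-etl installed, pass the resolved path to the extractor:
// new Etl().addExtractors(extract.fromCSV, dataPath)
```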

The following helper methods are available for use with addExtractors:

extract.fromCSV(filePath)

This method imports a CSV file from the file system, parses data into JSON, and wraps the data in an observable. The method takes a file path as its parameter.
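
As an illustration of what "parses data into JSON" means here, a minimal sketch that turns CSV text into an array of objects keyed by the header row. `csvToObjects` is hypothetical, not the parser rxjs-etl uses, and it deliberately ignores quoted fields and embedded commas, which a real CSV parser must handle.

```javascript
// Hypothetical helper (not part of rxjs-etl): convert simple CSV text into
// an array of objects keyed by the header row. Quoted fields and embedded
// commas are NOT handled; use a real CSV parser for production data.
function csvToObjects(csvText) {
  const lines = csvText.trim().split(/\r?\n/);
  const headers = lines[0].split(',').map((h) => h.trim());
  return lines.slice(1).map((line) => {
    const values = line.split(',');
    const record = {};
    headers.forEach((header, i) => {
      record[header] = (values[i] ?? '').trim();
    });
    return record;
  });
}

console.log(JSON.stringify(csvToObjects('id,name\n1,Ada\n2,Grace')));
// [{"id":"1","name":"Ada"},{"id":"2","name":"Grace"}]
```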

Example

.addExtractors(extract.fromCSV, '~/EXAMPLE_DATA.csv')

extract.fromJSON(filePath)

This method imports a JSON file from the file system and wraps the data in an observable. The method takes a file path as its parameter.

Example

.addExtractors(extract.fromJSON, '~/EXAMPLE_DATA.json')

extract.fromXML(filePath)

This method imports an XML file from the file system, parses the data into JSON, and wraps it in an observable. The method takes a file path as its parameter.

Example

.addExtractors(extract.fromXML, '~/EXAMPLE_DATA.xml')

extract.fromMongoDB(connectionString, collectionOrTableName)

This method connects to a Mongo database, imports the data from the desired collection, parses the data into JSON, and wraps it in an observable. The method takes a connection URI and collection name as its parameters.

Example

.addExtractors(extract.fromMongoDB, process.env.MONGO_URI, collectionOrTableName)

extract.fromPostgres(connectionString, collectionOrTableName)

This method connects to a Postgres database, imports the data from the desired table, parses the data into JSON, and wraps it in an observable. The method takes a connection URI and table name as its parameters.

Example

.addExtractors(extract.fromPostgres, process.env.POSTGRES_URI, collectionOrTableName)

Transform

addTransformers(transformArray)

This method accepts an array of functions supplied by the user and applies each function to the data stream.
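
A minimal sketch of two user-supplied transforms. It assumes each function receives a single record object and returns the transformed record; the README does not state the exact signature, so check this against your extractor's output. `trimName`, `parseId`, and `applyAll` are hypothetical names, and `applyAll` only mimics applying the functions one after another in plain JavaScript; it is not how rxjs-etl runs them internally.

```javascript
// Hypothetical transforms, assuming each receives one record and returns a new one.
const trimName = (record) => ({ ...record, name: record.name.trim() });
const parseId = (record) => ({ ...record, id: Number(record.id) });

// Plain-JavaScript stand-in for "apply each function to the data stream":
// the output of one transform is fed into the next.
const applyAll = (transforms, record) =>
  transforms.reduce((current, transform) => transform(current), record);

console.log(applyAll([trimName, parseId], { id: '7', name: '  Ada ' }));
// { id: 7, name: 'Ada' }
```

With rxjs-etl, the same functions would be passed as `.addTransformers([trimName, parseId])`.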


Load

addLoaders(loaderFunction, connectStrOrFilePath, collectionOrTableName)

This method accepts one of the loader helper methods as an argument and stores it in the Etl instance's state. The second parameter is either a file path or a connection URI, depending on the destination. The third parameter is optional and is the collection or table name when loading to a Mongo or Postgres database.

The following helper methods are available for use with addLoaders:

load.toCSV(filePath)

This method converts the transformed data to CSV and writes the data to the file system. The method takes a file path (with file name included) as its parameter.
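
To illustrate the kind of conversion `load.toCSV` performs, a minimal sketch that serializes flat records to CSV text, using the first record's keys as the header row and quoting values that contain commas, quotes, or line breaks in the style of RFC 4180. `objectsToCsv` is hypothetical, not rxjs-etl's implementation, and it does not write to disk.

```javascript
// Hypothetical helper (not rxjs-etl's implementation): serialize an array of
// flat objects to CSV, taking the header row from the first object's keys.
function objectsToCsv(records) {
  if (records.length === 0) return '';
  const headers = Object.keys(records[0]);
  // Quote a value if it contains a comma, quote, or line break,
  // doubling any embedded quotes.
  const escape = (value) => {
    const text = value === undefined || value === null ? '' : String(value);
    return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
  };
  const rows = records.map((record) => headers.map((h) => escape(record[h])).join(','));
  return [headers.map(escape).join(','), ...rows].join('\n');
}

console.log(objectsToCsv([{ id: 1, note: 'plain' }, { id: 2, note: 'has, comma' }]));
// id,note
// 1,plain
// 2,"has, comma"
```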

Example

.addLoaders(load.toCSV, '~/EXAMPLE_DATA.csv')

load.toJSON(filePath)

This method converts the transformed data to JSON and writes the data to the file system. The method takes a file path (with file name included) as its parameter.

Example

.addLoaders(load.toJSON, '~/EXAMPLE_DATA.json')

load.toXML(filePath)

This method converts the transformed data to XML and writes the data to the file system. The method takes a file path (with file name included) as its parameter.
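
Similarly, a minimal sketch of turning flat records into XML text. The `<records>`/`<record>` element names are assumptions, since the README does not describe the XML that `load.toXML` produces, and each key is assumed to be a valid XML element name. `recordsToXml` and `escapeXml` are hypothetical helpers.

```javascript
// Escape the five characters that are special in XML text and attributes.
function escapeXml(value) {
  return String(value)
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&apos;');
}

// Hypothetical serializer: one <record> element per object, one child
// element per key. Element names are assumptions, not rxjs-etl's format.
function recordsToXml(records) {
  const body = records
    .map((record) => {
      const fields = Object.entries(record)
        .map(([key, value]) => `<${key}>${escapeXml(value)}</${key}>`)
        .join('');
      return `  <record>${fields}</record>`;
    })
    .join('\n');
  return `<?xml version="1.0" encoding="UTF-8"?>\n<records>\n${body}\n</records>`;
}

console.log(recordsToXml([{ id: 1, name: 'A & B' }]));
```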

Example

.addLoaders(load.toXML, '~/EXAMPLE_DATA.xml')

load.toMongoDB(data, connectionString, collectionOrTableName)

This method connects to a Mongo database, converts the transformed data, and streams it to the desired Mongo collection. The method takes the transformed data, a connection URI, and collection name as its parameters.

Example

.addLoaders(load.toMongoDB, process.env.MONGO_URI, collectionOrTableName)

load.toPostgres(data, connectionString, collectionOrTableName)

This method connects to a Postgres database, converts the transformed data, and streams it to the desired Postgres table. The method takes the transformed data, a connection URI, and a table name as its parameters.

Example

.addLoaders(load.toPostgres, process.env.POSTGRES_URI, collectionOrTableName)

Notifications

addEmailNotification(email)

This optional method allows users to receive an email notification upon successful completion of the ETL task. It takes a message object as its argument, formatted like this:

const emailMessage = {
    to: 'to-email-address',
    from: 'from-email-address',
    subject: 'Your subject here',
    text: 'Your body message here',
    html: '<strong>Your html here</strong>',
};

Example

.addEmailNotification(emailMessage)

addTextNotification(textMessage)

This optional method allows users to receive a text notification upon successful completion of the ETL task. It takes a message object as its argument, formatted like this:

const textMessage = {
    to: 'to-phone-number',
    body: 'Your body message here',
};

Example

.addTextNotification(textMessage)

NOTE: You'll also need to create a .env file in the root of the project containing the following variables:

# Twilio account info
TWILIO_ACCOUNT_SID=
TWILIO_AUTH_TOKEN=
TWILIO_PHONE_NUMBER=

# SendGrid account info
SENDGRID_API_KEY= 
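
An empty value in this file is easy to overlook, so a quick startup check can help. A minimal sketch using the variable names from the template above; `missingEnvVars` is a hypothetical helper, and it assumes `.env` has already been loaded into `process.env` (for example with the `dotenv` package), which this README does not describe.

```javascript
// Variable names from the .env template; all four are needed only if you
// use both email and text notifications.
const REQUIRED_VARS = [
  'TWILIO_ACCOUNT_SID',
  'TWILIO_AUTH_TOKEN',
  'TWILIO_PHONE_NUMBER',
  'SENDGRID_API_KEY',
];

// Return the names of required variables that are unset, empty, or whitespace.
function missingEnvVars(env = process.env, required = REQUIRED_VARS) {
  return required.filter((name) => !env[name] || env[name].trim() === '');
}

const missing = missingEnvVars();
if (missing.length > 0) {
  console.warn(`Missing notification credentials: ${missing.join(', ')}`);
}
```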

Scheduling

.addSchedule(cronString)

This optional method allows users to schedule a task to be executed repeatedly using cron. It takes a cron string as a parameter.
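
The example expression has six fields. Assuming rxjs-etl follows the seconds-first layout used by common Node.js cron libraries such as node-cron (the README does not name its scheduler), `'0 * * * * *'` fires at second 0 of every minute, i.e. once per minute. A minimal sketch that labels the fields; `describeCron` is a hypothetical helper and does not validate the values.

```javascript
// Field order for a six-field, seconds-first cron expression (assumed).
const CRON_FIELDS = ['second', 'minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'];

// Split a cron string on whitespace and pair each part with its field name.
function describeCron(expression) {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== CRON_FIELDS.length) {
    throw new Error(`Expected 6 fields, got ${parts.length}`);
  }
  return Object.fromEntries(CRON_FIELDS.map((field, i) => [field, parts[i]]));
}

// second is '0' and every other field is '*': run once per minute.
console.log(describeCron('0 * * * * *'));
```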

Example

.addSchedule('0 * * * * *')

Chaining

.next(etlInstance)

This optional method allows users to trigger the execution of another ETL task upon successful completion of the current task. It takes an Etl object as an argument.

Example

.next(task2)

Misc

.combine()

This method combines the extractor, transformers, and loader, piping each segment into the next and wrapping the whole pipeline in an observable.

.start()

This method subscribes to the task observable stored in the Etl instance's state, which starts the ETL process.
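
RxJS observables are lazy: assembling a pipeline does no work until something subscribes. The sketch below imitates that split between `.combine()` and `.start()` with plain functions so it runs without any dependencies. `combinePipeline` and its arguments are hypothetical, it assumes transforms are applied per record, and it is not the library's implementation.

```javascript
// Hypothetical stand-in for combine(): compose extract -> transforms -> load
// into one function, but run nothing until the returned start() is called.
function combinePipeline(extractor, transformers, loader) {
  return function start() {
    const records = extractor();
    const transformed = records.map((record) =>
      transformers.reduce((acc, fn) => fn(acc), record),
    );
    return loader(transformed);
  };
}

const calls = [];
const start = combinePipeline(
  () => { calls.push('extract'); return [{ n: 1 }, { n: 2 }]; },
  [(r) => ({ ...r, n: r.n * 10 })],
  (rows) => { calls.push('load'); return rows; },
);

console.log(calls);   // [] (nothing has run yet)
console.log(start()); // [ { n: 10 }, { n: 20 } ]
console.log(calls);   // [ 'extract', 'load' ]
```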

Version

2.0.1

License

MIT

Unpacked Size

55.7 kB

Total Files

11

Collaborators

  • teamvelocirabbit