Datapumps: Simple ETL for node.js
Overview
Use pumps to import, export, transform or transfer data. A data pump will read from its input stream, array or datapumps Buffer and will write to its output buffers. A pump will finish when all data has been consumed from its output buffers. Make a group of pumps to handle complex ETL tasks.
Installation
```
$ npm install datapumps --save
```
Usage example: export mongodb to excel
```js
// connection url, collection and file names are illustrative
var datapumps = require('datapumps'),
    Pump = datapumps.Pump,
    MongodbMixin = datapumps.mixin.MongodbMixin,
    ExcelWriterMixin = datapumps.mixin.ExcelWriterMixin,
    pump = new Pump();

pump
  .mixin(MongodbMixin('mongodb://localhost/marketing'))
  .useCollection('Contact')
  .from(pump.find({ country: 'US' }))

  .mixin(ExcelWriterMixin(function() {
    this.createWorkbook('contacts.xlsx');
    this.createWorksheet('Contacts');
    this.writeHeaders([ 'Name', 'Email' ]);
  }))
  .process(function(contact) {
    return pump.writeRow([ contact.name, contact.email ]);
  })

  .logErrorsToConsole()
  .run()
    .then(function() {
      console.log('Done writing contacts to file');
    });
```
Usage example with more details:
- First, we create a pump and set up reading from mongodb:

  ```js
  var pump = new Pump();
  pump
    .mixin(MongodbMixin('mongodb://localhost/marketing'))
    .useCollection('Contact')
    .from(pump.find({ country: 'US' }));
  ```

  Mixins extend the functionality of a pump. The MongodbMixin adds a `.find()` method which executes a query on the collection specified with the `.useCollection()` method. The pump will read the query results and control the data flow, i.e. it pauses reading when it cannot write excel rows.
- Write data to excel with ExcelWriterMixin:

  ```js
  pump
    .mixin(ExcelWriterMixin(function() {
      this.createWorkbook('contacts.xlsx');
      this.createWorksheet('Contacts');
      this.writeHeaders([ 'Name', 'Email' ]);
    }))
    .process(function(contact) {
      return pump.writeRow([ contact.name, contact.email ]);
    });
  ```

  The excel workbook, worksheet and header row are created after adding ExcelWriterMixin to the pump. Each pump has a `.process()` callback that may transform or filter data. The callback is called for every data item of the buffer and should return a promise (we use the bluebird library) that fulfills when the data is processed. In this example, the default processing callback (which simply copies data to the output buffer) is overridden to write rows to the excel worksheet.
- Finally, start the pump and write to console when it's done:

  ```js
  pump
    .logErrorsToConsole()
    .run()
      .then(function() {
        console.log('Done writing contacts to file');
      });
  ```

  The `.logErrorsToConsole()` method will log any error to the console, surprisingly. The pump will start on calling `.run()`, which returns a promise that resolves when the pump has finished.
Pump
A pump reads data from its input buffer or stream and copies it to the output buffer by default:
```js
var datapumps = require('datapumps'),
    pump = new datapumps.Pump();

pump
  .from(<node stream, array or datapumps Buffer>)
  .start();
```
To access the output buffer, use the `.buffer()` method, which returns a Buffer instance:

```js
buffer = pump.buffer();
buffer = pump.buffer('output'); // equivalent with previous, as the default buffer
                                // of the pump is called 'output'
```
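The returned buffer can be used to connect pumps: a downstream pump reads from it with `.from()`. A minimal sketch (the `downstream` pump is illustrative):

```js
var downstream = new datapumps.Pump();
downstream
  .from(pump.buffer()) // read whatever the first pump outputs
  .start();
```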
Use the `.buffers()` method when you need to write data into multiple output buffers:

```js
ticketsPump
  .buffers({
    openTickets: new datapumps.Buffer(),
    closedTickets: new datapumps.Buffer()
  });

reminderMailer = new datapumps.Pump();
reminderMailer
  .from(ticketsPump.buffer('openTickets'))
  ...
```
Note that the ticketsPump pump has two output buffers: openTickets and closedTickets. The reminderMailer pump reads data from the openTickets buffer of ticketsPump.
Transforming data
Use the `.process()` method to set the function which processes data:

```js
ticketsPump
  .process(function(ticket) {
    // route open and closed tickets to different output buffers
    return this.buffer(ticket.isOpen ? 'openTickets' : 'closedTickets')
      .writeAsync(ticket);
  });
```
The argument of `.process()` is a function that will be executed after the pump reads a data item. The function is executed in the context of the pump object, i.e. `this` refers to the pump itself. The function should return a Promise that fulfills when the data is processed (i.e. written into a buffer or stored elsewhere).
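For example, a sketch of a processing callback that transforms each item before writing it to the default output buffer (the item fields are hypothetical):

```js
pump
  .process(function(item) {
    // 'this' is the pump; writeAsync returns a promise that fulfills
    // when the transformed item has been written to the output buffer.
    return this.buffer().writeAsync({
      name: item.name.trim(),
      email: item.email.toLowerCase()
    });
  });
```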
Start and end of pumping
A pump is started by calling the `.start()` method. The `end` event will be emitted when the input stream or buffer has ended and all output buffers became empty:

```js
pump
  .from(<node stream, array or datapumps Buffer>)
  .on('end', function() {
    console.log('Finished pumping');
  })
  .start();
```
Pump group
You often need multiple pumps to complete an ETL task. Pump groups help start multiple pumps in one step, and also enable handling the event when every pump has ended:
```js
// pump names are illustrative
sendMails = datapumps.group();
sendMails.addPump('mailsToSend')
  ...;
sendMails.addPump('sendMail')
  ...;
sendMails
  .start()
  .whenFinished().then(function() {
    console.log('All mails sent');
  });
```
The `.addPump()` method creates a new pump with the given name and returns it for configuration. `.start()` will start all pumps in the group, while `.whenFinished()` returns a Promise that fulfills when every pump has ended (note: the `end` event is also emitted).
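For instance, a minimal sketch of a complete group where the second pump reads from the first pump's output buffer (names and data are illustrative):

```js
var group = datapumps.group();

group.addPump('generate')
  .from([ 1, 2, 3 ]); // a plain array also works as input

group.addPump('double')
  .from(group.pump('generate').buffer())
  .process(function(number) {
    return this.buffer().writeAsync(2 * number);
  });

group
  .start()
  .whenFinished().then(function() {
    console.log('Group finished');
  });
```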
Encapsulation
Sometimes you wish to encapsulate a part of an ETL process and also use it elsewhere. It is possible to set an input pump and expose buffers from the group, so it will provide the same interface as a simple pump (i.e. it has `.from()`, `.start()` and `.buffer()` methods and emits the `end` event).
Most likely, you want to extend the `datapumps.Group` class (the example is written in CoffeeScript):

```coffee
{ Group, mixin: { MysqlMixin } } = require 'datapumps'

class Notifier extends Group
  constructor: ->
    super

    @addPump 'emailLookup'
      .mixin MysqlMixin connection # an existing mysql connection
      .process (data) ->
        @query 'SELECT email FROM user where username = ?', data.username
          .then (result) =>
            data.emailAddress = result.email
            @buffer().writeAsync data

    @addPump 'sendMail'
      .from @pump 'emailLookup'
      .process (data) ->
        ... # send email to data.emailAddress
        @buffer().writeAsync
          recipient:
            name: data.name
            email: data.emailAddress

    @setInputPump 'emailLookup'
    @expose 'output', 'sendMail/output'
```
The Notifier will behave like a pump, but on the inside it looks up email addresses using mysql and sends mail to those addresses. The output buffer of the sendMail pump is filled with recipient data.
Use the created class like this:
```coffee
etlProcess = datapumps.group()

etlProcess.addPump 'notifier', new Notifier
  .from <node stream or datapumps buffer>

etlProcess.addPump 'logger'
  .from etlProcess.pump('notifier').buffer()
  .process (data) ->
    console.log "Email sent to #{data.recipient.name} (#{data.recipient.email})"
```
Please note that you cannot use the `.process()` method on a group.
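If you need per-item processing, set the callback on one of the group's member pumps instead (a sketch reusing the logger pump from the example above):

```js
// configure the member pump, not the group
etlProcess.pump('logger')
  .process(function(data) {
    console.log(data);
    return this.buffer().writeAsync(data);
  });
```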
Error handling
Errors may occur while data is transferred between systems. Most of the time, you don't want to stop
on the first error but complete the transfer and re-run after fixing problems. Therefore
the pump group has an error buffer (`.errorBuffer()`) which can hold ten error messages by default. When the error buffer fills up, the `error` event is triggered and the `.whenFinished()` promise is rejected:
```js
group
  .start()
  .whenFinished()
    .then(function() {
      console.log('ETL finished without errors');
    })
    .catch(function() {
      console.log('ETL finished with errors:');
      console.log(group.errorBuffer().getContent());
    });
```
The `.logErrorsToConsole()` helper method configures the pump or group to print errors when processing has finished:
```js
group
  .logErrorsToConsole()
  .start();
```
The `.logErrorsToLogger()` helper method configures the pump or group to print errors to a logger when processing has finished:
```js
group
  .logErrorsToLogger(logger)
  .start();
```
This is useful for running the ETL on a server. The logger can be any logging object that has an `.error()` method, such as Winston, Log4js, etc.
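For example, a sketch assuming the winston package is installed; any object with an `.error()` method works the same way:

```js
// winston's default logger exposes .error() directly
var winston = require('winston');

group
  .logErrorsToLogger(winston)
  .start();
```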
Debugging
The following example shows fingers-crossed type logging, i.e. debug logging is turned on after the first error occurred:
```coffee
{ group } = require 'datapumps'

d = group()
d.addPump 'test'
  .from d.createBuffer
    sealed: true
    content: [ 'first', 'second', 'third', 'fourth' ]
  .process (data) ->
    throw new Error 'Start debugging' if data == 'second'
    @copy data

# after the first error, start logging every write to the output buffer
d.errorBuffer().on 'write', (data) ->
  console.log data
  d.buffer('test/output').on 'write', (data) ->
    console.log "#{data} was written to test/output buffer"

d.start()
```
The output:
```
{ message: [Error: Start debugging], pump: 'test' }
third was written to test/output buffer
fourth was written to test/output buffer
```

Note that 'first' is not logged: it was written to the output buffer before the error occurred, while debug logging was still off.
Mixins
The core components of datapumps are only responsible for passing data in a flow-controlled manner. The features required for import, export or transfer are provided by mixins:
- BatchMixin - Processes input in batches. Useful with MysqlMixin or other database writing mixins (batch insert can be much faster than inserting one by one).
- MergeMixin - Enables a pump to read from multiple input buffers.
- ObjectTransformMixin - Common object transformation and validation methods
- CsvWriterMixin - Writes csv files using the fast-csv package
- ExcelWriterMixin - Writes excel xlsx workbooks
- ExcelReaderMixin - Reads excel xlsx workbooks
- MysqlMixin - Queries and writes mysql databases
- PostgresqlMixin - Queries and writes postgresql databases
- MongodbMixin - Queries and writes mongodb
- RestMixin - Interact with REST services
When you implement new mixins, please fork datapumps and make a pull request.