Opserver
A developer-friendly MongoDB oplog event aggregator
About
The MongoDB oplog stores a record of all transactions (i.e., changes) that occur on your MongoDB server. These records are designed to have a minimal footprint, so they use shorthand key names and values whose shape changes depending on the transaction type. Inevitably, any developer attempting to tail the oplog for their own purposes will spend a lot of time searching for what the keys mean and why the values change. Opserver was created to ease this pain.
Put simply, Opserver is an event emitter that tails the MongoDB oplog and emits developer-friendly events. Besides normalizing and expanding the key names and values of oplog events, Opserver has a few tricks up its sleeve: it can filter up front by database, collection, and/or properties of collection records, and it automatically retrieves the affected record for all transaction types.
To use the MongoDB oplog you must be running your MongoDB instance as a replica set, even if you are not actually replicating your instance.
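To illustrate the shorthand keys mentioned above, here is a minimal sketch of expanding a raw oplog entry into friendlier fields. It assumes the classic entry shape, where `op` is a one-letter code, `ns` is the namespace, `o` is the operation payload, and `o2` carries the target `_id` for updates; the exact payload for updates can vary by server version. `OP_NAMES` and `describeOplogEntry` are hypothetical names for this example and are not Opserver's actual implementation.

```javascript
// Map the oplog's single-letter operation codes to readable names.
const OP_NAMES = { i: 'insert', u: 'update', d: 'delete' };

// Hypothetical helper (not part of Opserver): expand a raw oplog entry.
function describeOplogEntry(entry) {
  // `ns` is "database.collection"; collection names may contain dots,
  // so split only on the first one.
  const dot = entry.ns.indexOf('.');
  return {
    type: OP_NAMES[entry.op] || 'other',
    database: entry.ns.slice(0, dot),
    collection: entry.ns.slice(dot + 1),
    // Updates keep the target _id in `o2`; inserts and deletes keep it in `o`.
    id: entry.op === 'u' ? entry.o2._id : entry.o._id,
  };
}

// A hand-written sample entry (not captured from a real server).
const sample = {
  op: 'u',
  ns: 'mydb.users',
  o2: { _id: 'abc123' },
  o: { $set: { email: 'sam@example.com' } },
};
const described = describeOplogEntry(sample);
console.log(described.type, described.database, described.collection, described.id);
```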
Examples
    // Get an opserver factory back
    var opserverFactory = require('opserver');

    var opserverOptions = {
        mongoAdminURI: 'mongodb://127.0.0.1:27017/admin?replicaSet=test',
        mongoAdminUser: 'admin',
        mongoAdminPass: 'adminPass',
        mongoReadOnlyUser: 'readUser',
        mongoReadOnlyPass: 'readPass'
    };

    // Get the opserver via callback or Promise (omit the callback argument)
    opserverFactory(opserverOptions, function(opserver) {

        // Listen for new inserts into a collection
        opserver.on('insert:mydb.users', function(eventData) {
            // Access the id of the inserted document
            console.log(eventData.id);

            // Access the full inserted document
            console.log(eventData.doc.firstName);

            // Access the doc that was sent in the insert
            console.log(eventData.operation);
        });

        // Update events give you even more useful information
        opserver.on('update:mydb.users.email', function(eventData) {
            // `eventData.operation` for update events is the MongoDB update command that was executed
            console.log(eventData.operation);

            // Also have access to the raw MongoDB oplog document
            console.log(eventData.oplogDoc);

            // Quick access to the value of the property that was updated
            // (the property is the last segment after the dot, e.g. email, firstName)
            console.log(eventData.setVal);

            // You can also look at the exact query used to update
            console.log(eventData.query);
        });
    });
Initialization
Initializing an Opserver requires three key arguments:
- A mongo connection string for the admin database of your MongoDB installation
- The options needed to connect to that database (either in object or connection string format)
- The options needed to authenticate as a read-only user to the databases you want to listen for events on. This would just be the same user/pass set up on all the databases that you want Opserver to be able to access
Opserver will initialize in the following way:
- Connect to your admin database using the given admin credentials
- List all the databases that are in your MongoDB installation
- Attempt to connect to all the databases with the read only user credentials
- Begin to tail the Mongo oplog
- Start emitting events
Note: The admin user must have the 'clusterAdmin' and 'readAnyDatabase' roles in the admin database
The full options object used for initialization is listed below, with a description and default value for each option:
    let opserverOptions = {
        mongoURI: undefined,                 // URI to database
        mongoAdminUser: undefined,           // Admin user
        mongoAdminPass: undefined,           // Admin user password
        mongoAdminConnectionOptions: {},     // Options object passed to `mongo.connect()`
        mongoReadOnlyUser: undefined,        // readOnly user
        mongoReadOnlyPass: undefined,        // readOnly password
        mongoReadOnlyConnectionOptions: {},  // Options object passed to `mongo.connect()`
        debugMode: false,                    // Whether or not to print debugging messages

        // Optionally pass in your own logger functions for info, warn, debug,
        // and error messages when in debug mode
        loggerFunctions: {
            info() {
                console.log(...arguments);
            },
            warn() {
                console.log(...arguments);
            },
            debug() {
                console.log(...arguments);
            },
            error() {
                console.log(...arguments);
            }
        },

        // These are paths to exclude from their respective events. Pass in a string
        // to any of these arrays and if that string is found anywhere inside one of
        // the event strings, that event will be ignored.
        // ex. 'update:mydb.users.firstName' will ignore any updates of users
        // firstName fields in the mydb database. 'firstName' would ignore any
        // update event that had 'firstName' anywhere in its event string.
        excludes: {
            updatePaths: [],
            insertPaths: [],
            deletePaths: []
        }
    };
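The substring rule described for excludes can be sketched as follows. This is only an illustration of the documented behavior ("found anywhere inside one of the event strings"); `isExcluded` is a hypothetical helper, not a function exported by Opserver, and the real check may differ in detail.

```javascript
// Hypothetical helper: an event is excluded when any exclude string
// occurs anywhere inside its event string.
function isExcluded(eventString, excludePaths) {
  return excludePaths.some((path) => eventString.includes(path));
}

const updatePaths = ['update:mydb.users.firstName'];

// A full event string excludes only that exact field.
console.log(isExcluded('update:mydb.users.firstName', updatePaths)); // true
console.log(isExcluded('update:mydb.users.email', updatePaths)); // false

// A bare field name excludes it in every database and collection.
console.log(isExcluded('update:otherdb.people.firstName', ['firstName'])); // true
```

Because the match is a plain substring test, a short string such as 'name' would also exclude 'firstName' and 'lastName' events, so more specific strings are the safer choice.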
Event Types
There are two types of events in Opserver: Insert/Delete and Update.
- Insert/Delete event strings are formatted like so: 'insert:DATABASE_NAME.COLLECTION' / 'delete:DATABASE_NAME.COLLECTION'
These are straightforward: the name of the operation, a colon, the name of the database, and then a collection within that database.
- Update event strings are formatted like so: 'update:DATABASE_NAME.COLLECTION.PATH.TO.FIELD'
These are more interesting and let you listen for updates to specific fields or subdocuments within a collection. Using update events combined with Opserver#onMatched, you are able to construct powerful triggers on changes to your data fairly easily.
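To make the two formats concrete, here is a small sketch that splits an event string into its parts. `parseEventString` is a hypothetical helper for illustration only, and it assumes that neither the database name nor the collection name contains a dot.

```javascript
// Hypothetical helper: split 'TYPE:DATABASE.COLLECTION[.PATH.TO.FIELD]'
// into its parts. Assumes database and collection names contain no dots.
function parseEventString(eventString) {
  const colon = eventString.indexOf(':');
  const type = eventString.slice(0, colon);
  const [database, collection, ...path] = eventString.slice(colon + 1).split('.');
  // Insert/delete strings have no field path, so `path` is '' for them.
  return { type, database, collection, path: path.join('.') };
}

const update = parseEventString('update:mydb.users.preferences.timezone');
console.log(update.collection, update.path); // users preferences.timezone

const insert = parseEventString('insert:mydb.users');
console.log(insert.type, insert.database, insert.collection); // insert mydb users
```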
Here are the event types taken straight from the code's JSDoc comments:
    /**
     * Insert event. Listen on 'insert:DATABASE' to get insert events for that database.
     * @example 'insert:mydb.users'
     * @event Opserver#insert:DATABASE
     * @type {OpserverEventData}
     */

    /**
     * Delete event. Listen on 'delete:DATABASE' to get delete events for that database.
     * @example 'delete:mydb.users'
     * @event Opserver#delete:DATABASE
     * @type {OpserverEventData}
     */

    /**
     * Update event. Listen on 'update:DATABASE.PROPERTY_PATH' to get update events for that database.
     * @example 'update:mydb.users.preferences.timezone'
     * @event Opserver#update:DATABASE.PROPERTY_PATH
     * @type {OpserverEventData}
     */
Event Data
Every listener callback, and every promise returned by Opserver's APIs, receives Event Data. This data includes the following fields:
    /**
     * This is the structure of the data that is sent along with Opserver events
     * @typedef {object} OpserverEventData
     * @prop {object} oplogDoc - passed in oplogDoc
     * @prop {string} id - The _id of the mongo doc involved in the operation.
     * @prop {string} type - The type of MongoDB operation - e.g. insert, update, delete
     * @prop {object} doc - The actual mongo doc that was involved in the operation.
     * @prop {object} operation - For inserts/deletes this is the actual doc; for updates it is the command
     *                            to update old document.
     * @prop {*} setVal - The value of the path that is being observed, only for updates.
     * @prop {string} event - The event that this data was attached to
     * @prop {object} query - Only for updates. The selection query for the operation.
     * @prop {string} changedPropPath - Only for updates. The path that is being observed.
     */
Methods
EventEmitter3 (which Opserver extends) gives most of the usual EventEmitter functions such as .on, .removeListener, etc. Please refer to the EventEmitter3 and Node EventEmitter docs for these standard Node event methods. The methods below are in addition to those standard methods.
Opserver#onAny(<String[]> eventStrings, <Function> listener) => {Function}
onAny() allows you to register listeners for multiple different events that all share the same callback. It returns a function that, when called, removes all of those event listeners.
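The behavior described for onAny() can be sketched on a plain emitter. This example uses Node's built-in `events` module as a stand-in (Opserver itself extends EventEmitter3), and the free function `onAny` below is a hypothetical illustration rather than Opserver's own code.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-in for Opserver#onAny on a plain Node EventEmitter.
function onAny(emitter, eventStrings, listener) {
  eventStrings.forEach((eventString) => emitter.on(eventString, listener));
  // The returned function detaches the listener from every event.
  return function removeAll() {
    eventStrings.forEach((eventString) => emitter.removeListener(eventString, listener));
  };
}

const emitter = new EventEmitter();
const seen = [];
const stop = onAny(emitter, ['insert:mydb.users', 'delete:mydb.users'], (eventData) => {
  seen.push(eventData.type);
});

emitter.emit('insert:mydb.users', { type: 'insert' });
emitter.emit('delete:mydb.users', { type: 'delete' });
stop();
emitter.emit('insert:mydb.users', { type: 'insert' }); // no longer heard

console.log(seen); // [ 'insert', 'delete' ]
```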
Opserver#onceMatched(<String> eventString, <Object> valuesToMatch, <Function> [cb]) => {Promise<EventData>}
Register a listener which will trigger only once, the first time the EventData of an event matches the valuesToMatch object. The valuesToMatch object must be constructed in the form { 'OBJECT.PATH': VALUE }, e.g., { 'doc.firstName': 'Sam' }. The callback is optional; the returned Promise resolves with the matching EventData.
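A minimal sketch of how a valuesToMatch object could be checked against EventData follows. It assumes each key is a dotted path resolved from the top of the EventData object and compared with strict equality; Opserver's real comparison rules are not documented here, and `getPath` and `matchesValues` are hypothetical names.

```javascript
// Resolve a dotted path such as 'doc.firstName' against an object,
// returning undefined as soon as a segment is missing.
function getPath(obj, path) {
  return path
    .split('.')
    .reduce((value, key) => (value == null ? undefined : value[key]), obj);
}

// Hypothetical matcher: every path in valuesToMatch must be strictly
// equal to the value found at that path in the event data.
function matchesValues(eventData, valuesToMatch) {
  return Object.keys(valuesToMatch).every(
    (path) => getPath(eventData, path) === valuesToMatch[path]
  );
}

const eventData = { type: 'insert', doc: { firstName: 'Sam', lastName: 'Lee' } };

console.log(matchesValues(eventData, { 'doc.firstName': 'Sam' })); // true
console.log(matchesValues(eventData, { 'doc.firstName': 'Alex' })); // false
console.log(matchesValues(eventData, { 'doc.address.city': 'Oslo' })); // false
```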
Opserver#onMatched(<String> eventString, <Object> valuesToMatch, <Function> listener) => {Function}
Same as Opserver#onceMatched but will trigger multiple times until the listener is removed. Returns a function that, when called, removes the listener from the event.
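The difference between the two methods can be sketched on a plain Node EventEmitter. To keep the example short, these hypothetical stand-ins take a predicate function (`isMatch`) in place of a valuesToMatch object; that is a simplification for illustration and not Opserver's signature.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-in for onMatched: fires on every matching event
// and returns a function that removes the listener.
function onMatched(emitter, eventString, isMatch, listener) {
  const wrapped = (eventData) => {
    if (isMatch(eventData)) listener(eventData);
  };
  emitter.on(eventString, wrapped);
  return () => emitter.removeListener(eventString, wrapped);
}

// Hypothetical stand-in for onceMatched: resolves (and calls the optional
// callback) on the first matching event, then removes itself.
function onceMatched(emitter, eventString, isMatch, cb) {
  return new Promise((resolve) => {
    const remove = onMatched(emitter, eventString, isMatch, (eventData) => {
      remove();
      if (cb) cb(eventData);
      resolve(eventData);
    });
  });
}

const emitter = new EventEmitter();
const isSam = (eventData) => eventData.doc.firstName === 'Sam';
const heard = [];

const stop = onMatched(emitter, 'insert:mydb.users', isSam, (e) => heard.push(e.id));
const first = onceMatched(emitter, 'insert:mydb.users', isSam);

emitter.emit('insert:mydb.users', { id: '1', doc: { firstName: 'Alex' } });
emitter.emit('insert:mydb.users', { id: '2', doc: { firstName: 'Sam' } });
emitter.emit('insert:mydb.users', { id: '3', doc: { firstName: 'Sam' } });
stop();

first.then((eventData) => console.log(eventData.id, heard)); // 2 [ '2', '3' ]
```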
TODO
- Completely document code in JSDoc format
- Build and serve JSDoc output on github.io
- Write tests
- Optimize further
- ???