@flowcore/sdk-data-pump-client

1.3.0 • Public • Published

Flowcore SDK Module - Data Pump Client

A Flowcore SDK module that provides a lightweight client for fetching events from the Flowcore platform.

Installation

Install with npm:

npm install @flowcore/sdk-data-pump-client @flowcore/sdk-oidc-client

or with yarn:

yarn add @flowcore/sdk-data-pump-client @flowcore/sdk-oidc-client

Usage

Create a new instance of the Data Pump client:

import {DataPump} from "@flowcore/sdk-data-pump-client";
import {OidcClient} from "@flowcore/sdk-oidc-client";

// OIDC client used to authenticate against the Flowcore platform
const client = new OidcClient("your client id", "your client secret", "your well-known endpoint");

// Data Pump client pointed at the Flowcore GraphQL API
const dataPump = new DataPump("https://graph.api.flowcore.io/graphql", client);

You can configure the page size with the last argument of the constructor; it defaults to 1000.

Then create an RxJS Subject to receive the events:

import {Subject} from "rxjs";
import {SourceEvent} from "@flowcore/sdk-data-pump-client";

// The data pump pushes events into the Subject; your code subscribes to it
const subject = new Subject<SourceEvent>();

subject.subscribe({
  next: (event) => {
    console.log(event);
  },
  complete: () => {
    console.log("completed");
  },
});

Then you can fetch all events with the fetchAllEvents method:

await dataPump.fetchAllEvents(subject, {
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventTypes: ["your event type"],
});

This loops through all events for the specified event types and pushes them to the subject.

The parallel argument controls how many time buckets are fetched in parallel; the default is 1.
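The SDK handles this parallelism internally. As a rough, library-independent illustration of what running N time buckets in parallel means, the sketch below processes a list of items with at most limit workers in flight. forEachWithLimit and the bucket names are hypothetical and are not part of @flowcore/sdk-data-pump-client.

```typescript
// Hypothetical helper (not part of the SDK): runs `worker` over `items`
// with at most `limit` invocations in flight at the same time.
async function forEachWithLimit<T>(
  items: T[],
  limit: number,
  worker: (item: T) => Promise<void>,
): Promise<void> {
  let next = 0;
  const runnerCount = Math.max(1, Math.min(limit, items.length));
  // Each runner keeps claiming the next unprocessed item until none are left.
  const runners = Array.from({ length: runnerCount }, async () => {
    while (next < items.length) {
      const item = items[next++];
      await worker(item);
    }
  });
  await Promise.all(runners);
}

// Usage: "fetch" three hypothetical time buckets, two at a time.
forEachWithLimit(["bucket-1", "bucket-2", "bucket-3"], 2, async (bucket) => {
  console.log(`fetching ${bucket}`);
});
```

With a limit of 1 this degenerates to processing the buckets one after another, which corresponds to the documented default.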

To fetch the events of a single time bucket, use the fetchEvents method:

await dataPump.fetchEvents(subject, {
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventTypes: ["your event type"],
  timeBucket: "your time bucket",
});

This will fetch all events for the specified time bucket and push them to the observable.

To close the observable after fetching, set the last argument of the fetchEvents method to true.

Limits

You can specify from and to event IDs to fetch only the events within a specific range:

await dataPump.fetchEvents(subject, {
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventTypes: ["your event type"],
  timeBucket: "your time bucket",
  afterEventId: "your from event id",
  beforeEventId: "your to event id",
});

Both bounds are exclusive: the events with the specified IDs are not included in the result. Either or both can be omitted.
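The exclusive semantics can be illustrated locally over an ordered list of event IDs. This is a hypothetical sketch: selectRange and RangeOptions are illustration-only names, and the real filtering is done by Flowcore, not by walking a list in the client.

```typescript
// Hypothetical illustration of exclusive bounds over an ordered list of event IDs.
interface RangeOptions {
  afterEventId?: string; // exclusive lower bound, optional
  beforeEventId?: string; // exclusive upper bound, optional
}

function selectRange(eventIds: string[], { afterEventId, beforeEventId }: RangeOptions): string[] {
  const positionOf = (id: string): number => {
    const index = eventIds.indexOf(id);
    if (index === -1) throw new Error(`unknown event id: ${id}`);
    return index;
  };
  // Start one past afterEventId and stop before beforeEventId, so neither bound is returned.
  const start = afterEventId === undefined ? 0 : positionOf(afterEventId) + 1;
  const end = beforeEventId === undefined ? eventIds.length : positionOf(beforeEventId);
  return eventIds.slice(start, end);
}

const ids = ["e1", "e2", "e3", "e4"];
console.log(selectRange(ids, { afterEventId: "e1", beforeEventId: "e4" })); // [ 'e2', 'e3' ]
```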

Indexes

You can fetch time buckets for a specific event type with the fetchIndexes method:

const timeBuckets = await dataPump.fetchIndexes({
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventType: "your event type",
});

This will return a list of time buckets for the specified event type.

You can also pass the from and to arguments to return only the time buckets within that range.
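A local sketch of range filtering on time buckets, under two assumptions this README does not confirm: that time buckets are fixed-width, zero-padded timestamp strings (so string order equals time order), and that from and to are inclusive. bucketsInRange and the bucket values are hypothetical.

```typescript
// Hypothetical sketch: assumes fixed-width timestamp strings and inclusive bounds.
function bucketsInRange(buckets: string[], from?: string, to?: string): string[] {
  return buckets
    .filter((bucket) => (from === undefined || bucket >= from) && (to === undefined || bucket <= to))
    .sort(); // filter() returns a new array, so the input is left untouched
}

const buckets = ["20240103000000", "20240101000000", "20240102000000"];
console.log(bucketsInRange(buckets, "20240102000000")); // [ '20240102000000', '20240103000000' ]
```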

Pumping Events

You can also pump events to a destination with the pumpEvents method:

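const abortController = new AbortController();

await dataPump.pumpEvents(
  "cache-key", // key under which the last processed event ID is stored
  subject, // the Subject created above
  {
    dataCoreId: "your data core id",
    aggregator: "your aggregator",
    eventTypes: ["your event type"],
  },
  abortController // optional: call abortController.abort() to stop pumping
);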

This pumps all existing events using backfilling, then switches to live mode and pumps new events as they arrive. The cache key is used to store the last event ID in the cache, so the client can resume from that event if it is restarted. The abort controller can be used to stop the pumping.
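The resume behaviour can be pictured with a small checkpoint sketch. It does not use the SDK: the Map stands in for the cache, and pumpFrom and PumpedEvent are hypothetical names. It assumes events arrive in a stable order and that the cache stores the ID of the last delivered event per cache key, as described above.

```typescript
// Hypothetical sketch, not the SDK implementation.
interface PumpedEvent {
  eventId: string;
}

// Stand-in for the cache: cache key -> ID of the last delivered event.
const lastEventIds = new Map<string, string>();

// Deliver the events after the stored checkpoint, updating it as we go.
// An unknown checkpoint (findIndex returns -1) restarts from the beginning.
function pumpFrom(cacheKey: string, events: PumpedEvent[], handle: (event: PumpedEvent) => void): number {
  const lastId = lastEventIds.get(cacheKey);
  const start = lastId === undefined ? 0 : events.findIndex((e) => e.eventId === lastId) + 1;
  let delivered = 0;
  for (const event of events.slice(start)) {
    handle(event);
    lastEventIds.set(cacheKey, event.eventId); // checkpoint after each handled event
    delivered++;
  }
  return delivered;
}

const seen: string[] = [];
const stream: PumpedEvent[] = [{ eventId: "a" }, { eventId: "b" }];
pumpFrom("cache-key", stream, (e) => seen.push(e.eventId)); // delivers a, b
stream.push({ eventId: "c" });
pumpFrom("cache-key", stream, (e) => seen.push(e.eventId)); // resumes: delivers only c
```

Checkpointing after every event keeps redelivery after a crash to at most one event, at the cost of one cache write per event.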

Note: The default cache is in memory. You can implement your own cache by extending the SimpleCache class and passing it to the DataPump constructor via the options object.

Note: You can also set the time bucket to start from with the from argument, and control backfilling parallelism with the parallel argument.

Note: If no abort controller is passed, the data pump runs only once and stops after it has fetched all events currently present in the data container.
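The difference between the two modes can be sketched as a polling loop. This is a hypothetical model, not the SDK's implementation: runPump, fetchNew and pollMs are illustration-only names, and the assumption is that live mode simply keeps checking for new events until the signal is aborted.

```typescript
// Hypothetical sketch: without a signal the loop drains what is available and returns;
// with a signal it keeps polling until the signal is aborted.
async function runPump(
  fetchNew: () => string[], // returns unseen event IDs, or [] when caught up
  handle: (eventId: string) => void,
  signal?: AbortSignal,
  pollMs = 100,
): Promise<void> {
  for (;;) {
    // Drain everything that is currently available.
    for (let batch = fetchNew(); batch.length > 0; batch = fetchNew()) {
      batch.forEach((id) => handle(id));
    }
    // No signal: single run. Aborted signal: stop.
    if (!signal || signal.aborted) return;
    // Live mode: wait before polling again.
    await new Promise((resolve) => setTimeout(resolve, pollMs));
  }
}

// Usage: one-shot run over whatever is present right now.
const pending = ["e1", "e2"];
runPump(() => pending.splice(0), (id) => console.log(id));
```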

Resetting the data pump

You can reset the data pump with the reset method:

await dataPump.reset("cache-key");

Note: The data pump can only be reset when it is not running. You can check whether it is running with the isRunning method; to stop it, use the abort controller.
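A minimal sketch of the reset rule, assuming that resetting means forgetting the last event ID stored under the cache key. PumpState, setRunning, checkpoint and lastEventId are hypothetical names for illustration; only isRunning and reset are mentioned in this README, and their real implementations may differ.

```typescript
// Hypothetical state holder illustrating the rule above; it is not the SDK's class.
class PumpState {
  private running = false;
  private readonly lastEventIds = new Map<string, string>();

  isRunning(): boolean {
    return this.running;
  }

  setRunning(running: boolean): void {
    this.running = running;
  }

  checkpoint(cacheKey: string, eventId: string): void {
    this.lastEventIds.set(cacheKey, eventId);
  }

  lastEventId(cacheKey: string): string | undefined {
    return this.lastEventIds.get(cacheKey);
  }

  reset(cacheKey: string): void {
    // Refuse while pumping: the running loop could otherwise re-save a stale checkpoint.
    if (this.running) throw new Error("cannot reset the data pump while it is running");
    this.lastEventIds.delete(cacheKey);
  }
}

const state = new PumpState();
state.checkpoint("cache-key", "e42");
state.setRunning(false); // e.g. after abortController.abort() has stopped the pump
state.reset("cache-key"); // the next run starts from the beginning
```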

Creating your own pump

You can create your own pump by manually calling the pumpPage method:

let cursor: string | undefined = undefined;
do {
  const result = await dataPump.pumpPage({
    dataCoreId: "your data core id",
    aggregator: "your aggregator",
    eventTypes: ["your event type"],
    timeBucket: "your time bucket",
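    afterEventId: undefined, // optional exclusive lower bound, e.g. "your from event id"
    beforeEventId: undefined, // optional exclusive upper bound, e.g. "your to event id"
    cursor, // undefined for the first page, then the cursor returned by the previous page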
  });

  for (const event of result.events) {
    console.log(event);
  }
  cursor = result.cursor;
} while(cursor);

This lets you control the flow of events and prevents the pump from pulling too many events at once.
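The loop above requests the next page only after the current one has been handled. To try that pattern without a Flowcore account, the sketch below replaces dataPump.pumpPage with a hypothetical in-memory stand-in (makeFakePumpPage); the Page shape with events and an optional cursor is an assumption modelled on the example above.

```typescript
// Assumed page shape, modelled on the example above.
interface Page {
  events: string[];
  cursor?: string; // undefined once the last page has been returned
}

type PumpPage = (args: { cursor?: string }) => Promise<Page>;

// Hypothetical stand-in for dataPump.pumpPage that serves `pageSize` events per call.
function makeFakePumpPage(all: string[], pageSize: number): PumpPage {
  return async ({ cursor }) => {
    const start = cursor === undefined ? 0 : Number(cursor);
    const end = start + pageSize;
    return { events: all.slice(start, end), cursor: end < all.length ? String(end) : undefined };
  };
}

// Pull pages one at a time; the next page is only requested after the handler
// has finished with every event in the current one.
async function drain(pumpPage: PumpPage, handle: (event: string) => Promise<void> | void): Promise<void> {
  let cursor: string | undefined = undefined;
  do {
    const result: Page = await pumpPage({ cursor });
    for (const event of result.events) {
      await handle(event);
    }
    cursor = result.cursor;
  } while (cursor);
}

drain(makeFakePumpPage(["a", "b", "c", "d", "e"], 2), (event) => console.log(event));
```

Because the handler is awaited, a slow consumer naturally slows down page requests instead of buffering events in memory.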

Development

yarn install

or with npm:

npm install

License

MIT

Collaborators

  • jbiskur