subscribe-me

Event subscription CRUD and dispatcher. In an event-driven microservices architecture, it helps keep events generic while subscriptions stay specific.

Install

npm:

npm i subscribe-me

Yarn:

yarn add subscribe-me

Usage

import {createMemoryStorage, createSubscriber, createNotifier} from "subscribe-me";
import { from } from "rxjs";

// memory storage is intended for testing and experiments only
const storage = createMemoryStorage();
const subscriber = createSubscriber({storage});

// you can subscribe to an event
const id = subscriber.subscribe({
    event: "value-change",
    target: "my-api.com/my-endpoint",
    // you can specify criteria for the event payload
    // using MongoDB-style query syntax
    criteria: { previous: { $gte: 50 }, current: { $lt: 50 } }
});

// you can use bufferMilliseconds to set up an event time buffer
// this is useful to reduce the number of subscription storage queries
// it groups the buffered events by event type and performs only one query per event type
const notifier = createNotifier({
    storage,
    bufferMilliseconds: 10000,
});

// getNotifications accepts an rxjs stream of events and returns an rxjs stream of notifications
const notifications = notifier.getNotifications(
    from([
        // given the subscription criteria this event should be ignored
        {type: "value-change", payload: { previous: 60, current: 54 } },
        // this event should trigger a notification
        {type: "value-change", payload: { previous: 54, current: 49 } },
    ])
);

// dispatch notification to console log
// this subscribe is the rxjs Observable subscribe method
notifications.subscribe(console.log);

// you can unsubscribe from the event using the subscription id
subscriber.unsubscribe(id);
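
To see why the first event in the example is ignored and the second one triggers a notification, the criteria can be evaluated by hand. The sketch below is only an illustration: matchesCriteria and the operators table are hypothetical helpers written for this README, not part of subscribe-me, and they cover just the two operators used above.

```javascript
// Illustrative only: a tiny matcher for the two operators used in the example.
// This is NOT the library's criteria engine.
const operators = {
    $gte: (value, bound) => value >= bound,
    $lt: (value, bound) => value < bound,
};

// returns true when every payload field satisfies every operator in its clause
function matchesCriteria(criteria, payload) {
    return Object.entries(criteria).every(([field, clause]) =>
        Object.entries(clause).every(([op, bound]) => operators[op](payload[field], bound))
    );
}

const criteria = { previous: { $gte: 50 }, current: { $lt: 50 } };

// previous is >= 50 but current is not < 50, so there is no match
console.log(matchesCriteria(criteria, { previous: 60, current: 54 })); // false
// both conditions hold, so this event would trigger a notification
console.log(matchesCriteria(criteria, { previous: 54, current: 49 })); // true
```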

Events Time Buffer

As the example above shows, you can set up an event time buffer with the notifier's bufferMilliseconds setting. Suppose the subscriptions are stored in a Postgres database and the incoming stream carries 1000 events of 3 different event types over 10 seconds. With bufferMilliseconds set to 10000 (10 s), the notifier performs 3 queries against the database in those 10 seconds, one per event type. With bufferMilliseconds set to 5000 (5 s) and the events evenly distributed, each 5-second buffer holds roughly 500 events of those 3 types, so the notifier performs 3 queries every 5 seconds (6 every 10 seconds). A longer time buffer therefore reduces the number of storage accesses. The downside is that the buffer adds latency between incoming events and outgoing notifications.
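
The arithmetic above can be reproduced with a small model. This sketch assumes exactly one storage query per distinct event type per buffer window, as described in this section. countStorageQueries, the event type names, and the time field are hypothetical and exist only for this illustration; the library itself is not involved.

```javascript
// Illustrative model, not the library's implementation:
// one storage query per distinct event type per buffer window.
function countStorageQueries(events, bufferMilliseconds) {
    const typesPerWindow = new Map();
    for (const { type, time } of events) {
        const windowIndex = Math.floor(time / bufferMilliseconds);
        if (!typesPerWindow.has(windowIndex)) typesPerWindow.set(windowIndex, new Set());
        typesPerWindow.get(windowIndex).add(type);
    }
    let queries = 0;
    for (const types of typesPerWindow.values()) queries += types.size;
    return queries;
}

// 1000 events spread evenly over 10 seconds, cycling through 3 event types
const eventTypes = ["type-a", "type-b", "type-c"];
const events = Array.from({ length: 1000 }, (_, i) => ({
    type: eventTypes[i % 3],
    time: i * 10, // hypothetical arrival time in milliseconds
}));

console.log(countStorageQueries(events, 10000)); // 3
console.log(countStorageQueries(events, 5000)); // 6
```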

PostgreSQL Storage

You can create a Postgres storage by providing the Postgres connection configuration. Besides the Postgres-specific configuration settings, you can provide the subscription table name and the chunk size. The database is accessed through a cursor that fetches the data in chunks: with a chunk size of 1000, each chunk contains up to 1000 table rows.

import { createPostgresStorage } from "subscribe-me";

const storage = createPostgresStorage({
    user: "dbuser",
    host: "database.server.com",
    database: "mydb",
    password: "secretpassword",
    port: 3211,
    // besides the postgres settings you can optionally
    // provide the table name and the chunk size
    // the following values are the defaults
    table: "event_subscriptions",
    chunkSize: 1000,
})

You can also provide the connection pool directly, which, for instance, lets Postgres read its configuration from environment variables.

import { createPostgresStorage } from "subscribe-me";
import { Pool } from "pg";

const storage = createPostgresStorage({
    pool: new Pool(),
    table: "event_subscriptions",
    chunkSize: 1000,
})
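
As a rough picture of what chunkSize controls, the sketch below splits an in-memory array into chunks the way a cursor hands back rows. It is only an analogy: readInChunks and the rows array are hypothetical stand-ins for the database cursor and the subscriptions table, and no database is involved.

```javascript
// Illustrative only: `rows` stands in for the subscriptions table and
// readInChunks for a database cursor; neither is part of subscribe-me.
function* readInChunks(rows, chunkSize) {
    for (let offset = 0; offset < rows.length; offset += chunkSize) {
        yield rows.slice(offset, offset + chunkSize);
    }
}

// 2500 hypothetical subscription rows read with the default chunk size
const rows = Array.from({ length: 2500 }, (_, i) => ({ id: i }));
const chunkLengths = [...readInChunks(rows, 1000)].map((chunk) => chunk.length);

console.log(chunkLengths); // [ 1000, 1000, 500 ]
```

In this model a smaller chunk size means less data held at once and more chunks to read; a larger one means the opposite.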

API

createMemoryStorage

Creates a memory storage. It is intended only for testing and experiments.

Returns Object the storage to be used to create a notifier or a subscriber.

createPostgresStorage

Creates PostgreSQL storage.

Parameters

  • options Object Postgres configuration
    • options.user string db user
    • options.password string db user password
    • options.host string db host
    • options.port number db port
    • options.database string db name
    • options.pool Object a connection pool instance, which you can provide instead of the previous settings
    • options.chunkSize number the chunk size (optional, default 1000)
    • options.table string subscription table name (optional, default "event_subscriptions")

Returns Object The storage to be used to create a notifier or a subscriber.

createSubscriber

Creates the subscriber.

Parameters

  • options Object configuration options
    • options.storage Object storage object (e.g. memory storage)

Returns any The subscriber with the subscribe and unsubscribe methods.

createNotifier

Creates the notifier.

Parameters

  • options Object configuration options
    • options.storage Object storage object (e.g. memory storage)
    • options.bufferMilliseconds number sets up an event time buffer, useful for significantly reducing the number of subscription storage accesses

Returns any The notifier with the getNotifications method, which receives the input event stream and returns the notification stream (rxjs observable).

License

MIT © Agustin Lascialandare
