Event subscription CRUD and dispatcher. In an event-driven microservices architecture, it helps to keep events generic and subscriptions specific.
npm:
npm i subscribe-me
Yarn:
yarn add subscribe-me
import {createMemoryStorage, createSubscriber, createNotifier} from "subscribe-me";
import { from } from "rxjs";
// memory storage is meant for testing and experiments only
const storage = createMemoryStorage();
const subscriber = createSubscriber({storage});
// you can subscribe to an event
const id = subscriber.subscribe({
  event: "value-change",
  target: "my-api.com/my-endpoint",
  // you can specify criteria for the event payload
  // using the MongoDB where-clause style
  criteria: { previous: { $gte: 50 }, current: { $lt: 50 } }
});
// you can use bufferMilliseconds to set up an event time buffer
// this is useful to reduce the number of subscription storage queries
// it works by grouping the events by event type and performing only one query per event type
const notifier = createNotifier({
  storage,
  bufferMilliseconds: 10000,
});
// the getNotifications method accepts an rxjs stream of events and returns an rxjs stream of notifications
const notifications = notifier.getNotifications(
  from([
    // given the subscription criteria, this event should be ignored
    { type: "value-change", payload: { previous: 60, current: 54 } },
    // this event should trigger a notification
    { type: "value-change", payload: { previous: 54, current: 49 } },
  ])
);
// dispatch notification to console log
// this subscribe is the rxjs Observable subscribe method
notifications.subscribe(console.log);
// you can unsubscribe from the event using the subscription id
subscriber.unsubscribe(id);
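To make the criteria used in the subscription more concrete, here is a minimal sketch of how a MongoDB-style condition could be checked against an event payload. It is an illustration only, not the subscribe-me implementation: the matchesCriteria function and the operators table are hypothetical names, and only a few comparison operators are handled.

```javascript
// Hypothetical matcher, NOT the subscribe-me implementation.
// It supports only a handful of MongoDB-style comparison operators.
const operators = {
  $eq: (value, expected) => value === expected,
  $ne: (value, expected) => value !== expected,
  $gt: (value, bound) => value > bound,
  $gte: (value, bound) => value >= bound,
  $lt: (value, bound) => value < bound,
  $lte: (value, bound) => value <= bound,
};

function matchesCriteria(payload, criteria) {
  // every field listed in the criteria must satisfy all of its conditions
  return Object.entries(criteria).every(([field, condition]) => {
    const value = payload[field];
    // a plain value (no operator object) is treated as an equality check
    if (condition === null || typeof condition !== "object") {
      return value === condition;
    }
    return Object.entries(condition).every(([operator, operand]) => {
      const compare = operators[operator];
      if (!compare) throw new Error(`Unsupported operator: ${operator}`);
      return compare(value, operand);
    });
  });
}

const sampleCriteria = { previous: { $gte: 50 }, current: { $lt: 50 } };
// current is 54, which is not below 50, so this event is ignored
const ignored = matchesCriteria({ previous: 60, current: 54 }, sampleCriteria);
// previous is at least 50 and current dropped below 50, so this one matches
const notified = matchesCriteria({ previous: 54, current: 49 }, sampleCriteria);
```

Under these assumptions, only the second event of the usage example satisfies the criteria, which is why it is the one that triggers a notification.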
As the usage example shows, you can set up an event time buffer with the notifier's bufferMilliseconds setting. Say the subscriptions are stored in a postgres database and the incoming stream carries 1000 events of 3 different event types over 10 seconds. With bufferMilliseconds set to 10000 (10 s), the notifier performs 3 queries against the postgres database in those 10 seconds, one per event type. With bufferMilliseconds set to 5000 (5 s) and the events evenly distributed, each 5-second buffer holds roughly 500 events of those 3 types, so the notifier performs 3 queries every 5 seconds, or 6 every 10 seconds. A longer time buffer therefore reduces the number of storage accesses. The downside is that the time buffer adds latency between incoming events and outgoing notifications.
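The arithmetic above can be reproduced with a small model. This is a sketch under stated assumptions, not the notifier's actual code: countStorageQueries is a hypothetical helper, the timestamp field is added only for the simulation, and the two extra event type names are made up. It assumes each time window costs one storage query per distinct event type it contains.

```javascript
// Hypothetical model of the buffering trade-off, not the library's code:
// events are grouped into fixed time windows and each window costs
// one storage query per distinct event type it contains.
function countStorageQueries(events, bufferMilliseconds) {
  const typesPerWindow = new Map();
  for (const { type, timestamp } of events) {
    const windowIndex = Math.floor(timestamp / bufferMilliseconds);
    if (!typesPerWindow.has(windowIndex)) {
      typesPerWindow.set(windowIndex, new Set());
    }
    typesPerWindow.get(windowIndex).add(type);
  }
  let queries = 0;
  for (const types of typesPerWindow.values()) queries += types.size;
  return queries;
}

// 1000 events spread evenly over 10 seconds, cycling through 3 event types
// (the second and third type names are invented for this example)
const eventTypes = ["value-change", "value-created", "value-deleted"];
const sampleEvents = Array.from({ length: 1000 }, (_, i) => ({
  type: eventTypes[i % eventTypes.length],
  timestamp: i * 10, // milliseconds since the start of the stream
}));

const queriesWith10s = countStorageQueries(sampleEvents, 10000); // 3
const queriesWith5s = countStorageQueries(sampleEvents, 5000); // 6
```

Halving the buffer doubles the number of queries in this model, while also halving the worst-case delay before a notification is emitted.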
You can create a postgres storage by providing the postgres connection configuration. Besides the postgres-specific configuration settings, you can provide the subscription table name and the chunk size. The database is accessed through a cursor that fetches the data in chunks: if the chunk size is 1000, each chunk contains up to 1000 table rows.
import { createPostgresStorage } from "subscribe-me";
const storage = createPostgresStorage({
  user: "dbuser",
  host: "database.server.com",
  database: "mydb",
  password: "secretpassword",
  port: 3211,
  // besides the postgres settings you can optionally
  // provide the table name and the chunk size
  // the following values are the defaults
  table: "event_subscriptions",
  chunkSize: 1000,
});
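The effect of the chunk size can be illustrated without a database. The sketch below is hypothetical and stands in for the cursor with an in-memory array: readInChunks is not part of subscribe-me, and the row shape is invented for the example.

```javascript
// Hypothetical illustration of chunked reads; the real storage uses a
// PostgreSQL cursor, which is replaced here by an in-memory array.
function* readInChunks(rows, chunkSize) {
  for (let offset = 0; offset < rows.length; offset += chunkSize) {
    yield rows.slice(offset, offset + chunkSize);
  }
}

// 2500 fake subscription rows read with the default chunk size of 1000
const fakeRows = Array.from({ length: 2500 }, (_, i) => ({ id: i }));
const chunkLengths = [...readInChunks(fakeRows, 1000)].map(
  (chunk) => chunk.length
);
// chunkLengths is [1000, 1000, 500]
```

A smaller chunk size keeps fewer rows in memory at a time, at the cost of more round trips to fetch the same result set.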
You can also provide the connection pool directly, which, for instance, lets postgres read its configuration from environment variables.
import { createPostgresStorage } from "subscribe-me";
import { Pool } from "pg";
const storage = createPostgresStorage({
  pool: new Pool(),
  table: "event_subscriptions",
  chunkSize: 1000,
});
createMemoryStorage creates a memory storage, intended only for testing and experiments.
Returns Object the storage to be used to create a notifier or a subscriber.
createPostgresStorage creates a PostgreSQL storage.
options Object Postgres configuration
options.user string db user
options.password string db user password
options.host string db host
options.port number db port
options.database string db name
options.pool Object a connection pool instance, which you can provide instead of the previous settings
options.chunkSize number the chunk size (optional, default 1000)
options.table string subscription table name (optional, default "event_subscriptions")
Returns Object the storage to be used to create a notifier or a subscriber.
createSubscriber creates the subscriber.
Returns any the subscriber with the subscribe and unsubscribe methods.
createNotifier creates the notifier.
options Object configuration options
Returns any the notifier with the getNotifications method, which receives the input event stream and returns the notification stream (rxjs observable).
MIT © Agustin Lascialandare