Skip to content

Commit

Permalink
Handle unsubscription correctly (#11)
Browse files Browse the repository at this point in the history
This pulls out the parts of #10 which are not concerned with
reconnecting to MQTT. Reconnecting is simply not working; the logic to
attempt to handle it is getting more and more complicated, and we need a
different solution for now.

* Make sure we don't keep attempting to rebirth a device we aren't
interested in any more.
* Track MQTT subscriptions. Unsubscribe from a topic which has no more
consumers.
* We were incorrectly trying to resolve aliases on a BIRTH.
* Make available a sequence of failed subscription attempts. This will
allow clients to handle these as they see fit.
  • Loading branch information
amrc-benmorrow authored Dec 20, 2023
2 parents ace219f + 9f693d4 commit a1141a6
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 139 deletions.
209 changes: 72 additions & 137 deletions lib/sparkplug-app.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,161 +5,96 @@

import rx from "rxjs";

import { Address, SpB, Topic, UUIDs } from "@amrc-factoryplus/utilities";
import { SpB, Topic } from "@amrc-factoryplus/utilities";

import { SPAppError } from "./spapp-error.js";
import { SparkplugDevice } from "./sparkplug-device.js";

class SparkplugDevice {
constructor (app, opts) {
this.app = app;
this.log = this.app.debug.log.bind(this.app.debug, "device");

this.address = this._setup_address(opts);
this.packets = this._setup_packets();
this.births = this._setup_births(opts.rebirth);
this.metrics = this._setup_metrics();

return this;
}

/* XXX This should be more dynamic. In both cases we should be
* tracking the relevant source of information. */
_setup_address (opts) {
const fp = this.app.fplus;

const resolver =
opts.address ? rx.of(Address.parse(opts.address))
: opts.node ? fp.ConfigDB
.get_config(UUIDs.App.SparkplugAddress, opts.node)
.then(add => add && new Address(add.group_id, add.node_id))
: opts.device ? fp.Directory.get_device_address(opts.device)
: null;
if (resolver == null)
throw new SPAppError("You must provide a device to watch");

/* Return an endless sequence. This is for future compat when we
* track the device's address. */
return rx.concat(resolver, rx.NEVER).pipe(rx.share());
export class SparkplugApp {
constructor (opts) {
this.fplus = opts.fplus;
this.debug = opts.fplus.debug;
}

_setup_packets () {
return this.address.pipe(
rx.tap(addr => this.log("Watching %s", addr)),
rx.switchMap(addr => this.app.watch_address(addr)),
rx.share(),
);
}
async init () {
const fplus = this.fplus;

_setup_births (opts) {
const cmdesc = this.app.fplus.CmdEsc;

const timeout = opts?.timeout ?? 5*60*1000;
const rebirth = opts?.interval ?? 2000;

/* XXX We map aliases to strings here. It would be better to map
* to BigInts, or to have the protobuf decoder decode to BigInts
* in the first place, but the only way to convert Longs to
* BigInts is via a string. */
const births = this.packets.pipe(
rx.filter(p => p.type == "BIRTH"),
rx.map(birth => ({
address: birth.address,
factoryplus: birth.uuid == UUIDs.FactoryPlus,
metrics: new Map(
birth.metrics.map(m => [m.name, m])),
aliases: new Map(
birth.metrics
.filter(m => "alias" in m)
.map(m => [m.alias.toString(), m.name])),
})),
rx.timeout({ first: timeout }),
rx.shareReplay(1),
rx.tap(b => this.log("Birth for %s", b.address)));

/* XXX The initial delay here is crude. We should be reacting to
* successful MQTT subscription instead. */
const rebirths = rx.timer(20, rebirth)
.pipe(
rx.takeUntil(births),
rx.withLatestFrom(this.address),
)
.subscribe(([ix, addr]) => {
this.log("Rebirthing %s", addr);
cmdesc.rebirth(addr)
.catch(e => this.log("Error rebirthing %s: %s", addr, e));
});

return births;
}
const mqtt = this.mqtt = await fplus.MQTT.mqtt_client({});

_setup_metrics () {
/* XXX This resolves aliases on all metrics. We could probably
* avoid this by instead keeping track of the current alias of
* each metric we are interested in. */
return this.packets.pipe(
rx.mergeMap(p => rx.from(p.metrics)),
rx.withLatestFrom(this.births,
(m, b) => {
const rv = {...m};
if (m.alias)
rv.name = b.aliases.get(m.alias.toString());
return rv;
}),
rx.share());
}
this.packets = this._init_packets();
this.topics = new Map();
this.failed_subscriptions = new rx.Subject();

async init () {
return this;
}

metric (tag) {
return this.metrics.pipe(
rx.filter(m => m.name == tag),
rx.map(m => m.value));
/* Sparkplug packets received from the broker */
_init_packets () {
return rx.fromEvent(this.mqtt, "message",
(t, p) => ({
topic: t,
payload: SpB.decodePayload(p),
}))
.pipe(rx.share());
}
}

export class SparkplugApp {
constructor (opts) {
this.fplus = opts.fplus;
this.debug = opts.fplus.debug;
subscribe (topic) {
this.debug.log("topic", "Subscribing to %s", topic);
this.mqtt.subscribeAsync(topic)
.then(grs => grs.filter(gr => gr.qos & 0x80)
.map(gr => gr.topic)
.forEach(t => this.failed_subscriptions.next(t)));
}

async init () {
const fplus = this.fplus;

const mqtt = this.mqtt = await fplus.MQTT.mqtt_client({});
unsubscribe (topic) {
this.debug.log("topic", "Unubscribing from %s", topic);
this.mqtt.unsubscribe(topic);
}

this.packets = rx.fromEvent(mqtt, "message", (t, m) => [t, m])
.pipe(
rx.map(([t, p]) => ({
topic: Topic.parse(t),
payload: SpB.decodePayload(p),
})));

this.watch = new rx.Subject();
this.watch.pipe(
rx.flatMap(addr => rx.from(
["BIRTH", "DEATH", "DATA"]
.map(t => addr.topic(t)))),
rx.tap(v => this.debug.log("watch", "%o", v)))
.subscribe(topic => mqtt.subscribe(topic));
/* Return a sequence following a particular topic. These sequences
* are shared and cached so we can track the subscriptions we need. */
watch_topic (topic) {
return rx.defer(() => this._watch_topic(topic));
}

return this;
/* Build (or reuse) the shared sequence of packets for one topic.
 * Tracks the MQTT subscription: subscribes when the sequence is
 * first created, unsubscribes when its last consumer goes away. */
_watch_topic (topic) {
    const log = this.debug.log.bind(this.debug, "topic");

    /* Reuse an existing shared sequence if one is live. */
    if (this.topics.has(topic)) {
        log("Using cached seq for %s", topic);
        return this.topics.get(topic);
    }

    log("Starting new seq for %s", topic);
    const seq = this.packets.pipe(
        /* NOTE(review): exact string match — assumes MQTT topics
         * arrive verbatim, no wildcard subscriptions. */
        rx.filter(p => p.topic == topic),
        /* Teardown when we have no more consumers */
        rx.finalize(() => {
            this.topics.delete(topic);
            this.unsubscribe(topic);
        }),
        /* Delay 5s after we have no subscribers in case we get some
         * more. This avoids churn in our MQTT subscriptions. */
        rx.share({ resetOnRefCountZero: () => rx.timer(5000) }),
    );
    /* Subscribe eagerly and cache before returning so concurrent
     * callers share one MQTT subscription. */
    this.subscribe(topic);
    this.topics.set(topic, seq);
    return seq;
}

/* This subscribes to MQTT when the method is called. This is
* incorrect: our MQTT subscriptions should be driven by the
* sequence subscriptions. */
watch_address (addr) {
this.watch.next(addr);
return this.packets.pipe(
rx.filter(m => m.topic.address.equals(addr)),
rx.map(({topic, payload}) => ({
type: topic.type,
address: topic.address,
...payload,
})));
/* Return a sequence of all packets from this address. types is a
* list of Sparkplug topic types. */
watch_address (addr, types) {
types ??= ["BIRTH", "DEATH", "DATA"];

const topics = types
.map(typ => addr.topic(typ))
.map(top => this.watch_topic(top));
return rx.merge(...topics).pipe(
rx.map(p => {
const top = Topic.parse(p.topic);
return { ...top, ...p.payload };
}),
);
}

device (opts) {
Expand Down
146 changes: 146 additions & 0 deletions lib/sparkplug-device.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* ACS Sparkplug App library.
* Class representing an individual Sparkplug device we are watching.
* Copyright 2023 AMRC
*/

import rx from "rxjs";

import * as rxx from "@amrc-factoryplus/rx-util";
import { Address, UUIDs } from "@amrc-factoryplus/utilities";

import { SPAppError } from "./spapp-error.js";

/*
 * A single Sparkplug device watched through the owning SparkplugApp.
 *
 * Exposes rx sequences:
 *   address — the device's Sparkplug address (endless, replays last)
 *   packets — decoded packets for that address
 *   births  — parsed BIRTH packets with an alias map, latest replayed
 *   metrics — individual metrics, with aliases resolved on DATA
 */
export class SparkplugDevice {
    /* app is the owning SparkplugApp. opts selects the device via
     * exactly one of: opts.address (Sparkplug address string),
     * opts.node (ConfigDB node UUID) or opts.device (Directory
     * device UUID). opts.rebirth is the ms to wait for a BIRTH
     * before requesting a rebirth (default 5000). */
    constructor (app, opts) {
        this.app = app;
        this.log = this.app.debug.log.bind(this.app.debug, "device");

        this.address = this._setup_address(opts);
        this.packets = this._setup_packets();
        this.births = this._setup_births(opts.rebirth);
        this.metrics = this._setup_metrics();
    }

    /* XXX This should be more dynamic. In both cases we should be
     * tracking the relevant source of information. */
    _setup_address (opts) {
        const fp = this.app.fplus;

        /* Resolve the address from whichever option was supplied;
         * each branch yields an ObservableInput of Address. */
        const resolver =
            opts.address ? rx.of(Address.parse(opts.address))
            : opts.node ? fp.ConfigDB
                .get_config(UUIDs.App.SparkplugAddress, opts.node)
                .then(add => add && new Address(add.group_id, add.node_id))
            : opts.device ? fp.Directory.get_device_address(opts.device)
            : null;
        if (resolver == null)
            throw new SPAppError("You must provide a device to watch");

        /* Return an endless sequence. This is for future compat when we
         * track the device's address. */
        return rx.concat(resolver, rx.NEVER).pipe(
            /* Ensure new subscribers see the last address. This uses
             * shareReplay for the moment, as we never update the
             * address. When we do it should use rxx.shareLatest. */
            rx.shareReplay(1),
        );
    }

    /* XXX This should maybe not exist, or should be a rx.merge of
     * sequences directly from app.watch_topic. Perhaps
     * app.watch_address shouldn't exist at all? We probably mostly want
     * the packet types separately. */
    _setup_packets () {
        return this.address.pipe(
            rx.tap(addr => this.log("Watching %s", addr)),
            rx.switchMap(addr => this.app.watch_address(addr)),
            rx.share(),
        );
    }

    /* Build the births sequence: BIRTH packets parsed into
     * { address, factoryplus, metrics, aliases }. rebirth is the ms
     * to wait for a BIRTH before requesting one and retrying. */
    _setup_births (rebirth) {
        rebirth ??= 5000;

        /* XXX We map aliases to strings here. It would be better to map
         * to BigInts, or to have the protobuf decoder decode to BigInts
         * in the first place, but the only way to convert Longs to
         * BigInts is via a string. */
        const births = this.packets.pipe(
            rx.filter(p => p.type === "BIRTH"),
            /* If we don't get a birth, rebirth and retry */
            rx.timeout({ first: rebirth }),
            rx.retry({ delay: e => this.rebirth() }),
            rx.map(birth => ({
                address: birth.address,
                factoryplus: birth.uuid === UUIDs.FactoryPlus,
                metrics: birth.metrics,
                aliases: new Map(
                    birth.metrics
                        .filter(m => "alias" in m)
                        .map(m => [m.alias.toString(), m.name])),
            })),
            rxx.shareLatest(),
            rx.tap(b => this.log("Birth for %s", b.address)),
        );

        return births;
    }

    _setup_metrics () {
        /* XXX This resolves aliases on all metrics. We could probably
         * avoid this by instead keeping track of the current alias of
         * each metric we are interested in. */

        const births = this.births;
        /* Resolve aliases on DATA packets */
        const data = this.packets.pipe(
            rx.filter(p => p.type === "DATA"),
            /* we can't decode data packets if we don't have a birth */
            rx.skipUntil(births),
            rx.withLatestFrom(births, (data, birth) => {
                const aliases = birth.aliases;
                const metrics = data.metrics.map(m => {
                    /* NOTE(review): truthiness test — assumes a
                     * metric alias is never the falsy value 0. */
                    if (m.alias) {
                        const name = aliases.get(m.alias.toString());
                        if (name) return { ...m, name };
                        this.log("Can't resolve alias %s for %s",
                            m.alias, data.address);
                    }
                    return m;
                });
                return { ...data, metrics };
            }),
        );

        /* Flatten BIRTH and resolved DATA packets into one
         * per-metric sequence. */
        return rx.merge(births, data).pipe(
            rx.mergeMap(p => rx.from(p.metrics)),
            rx.share(),
        );
    }

    /* No async setup is currently needed; kept so callers can
     * uniformly await init(). */
    async init () {
        return this;
    }

    /* Send a rebirth request to the device's current address.
     * Errors are logged, never thrown. */
    async rebirth () {
        const addr = await rx.firstValueFrom(this.address);
        this.log("Rebirthing %s", addr);
        try {
            await this.app.fplus.CmdEsc.rebirth(addr);
        }
        catch (e) {
            this.log("Error rebirthing %s: %s", addr, e);
        }
    }

    /* Sequence of values of the named metric. */
    metric (tag) {
        return this.metrics.pipe(
            rx.filter(m => m.name === tag),
            rx.map(m => m.value));
    }
}

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@amrc-factoryplus/sparkplug-app",
"version": "0.0.2",
"version": "0.0.3",
"description": "A Javascript library providing Sparkplug Application functionality",
"main": "lib/index.js",
"type": "module",
Expand All @@ -11,7 +11,8 @@
"author": "AMRC",
"license": "MIT",
"dependencies": {
"@amrc-factoryplus/utilities": "^1.0.9",
"@amrc-factoryplus/rx-util": "^0.0.1",
"@amrc-factoryplus/utilities": "^1.2.2",
"rxjs": "^7.8.1"
}
}

0 comments on commit a1141a6

Please sign in to comment.