Skip to content

Commit

Permalink
Initial implementation (#1)
Browse files Browse the repository at this point in the history
* Start implementing a Sparkplug App.
Using Rx to handle the async communication.

Set up an Observable to watch and decode incoming MQTT packets. Set up
another to handle requested subscriptions to Nodes we want to watch.

* Need a temp directory.

* Resolve aliases.

Create an Observable returning the latest birth. Use this to resolve
aliases in data packets.

* Depend on released version of 'utilities'.
  • Loading branch information
amrc-benmorrow authored Jul 11, 2023
1 parent 0d88023 commit 53e2864
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 8 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
*.swp
node_modules/
tmp/
config.mk
.idea
11 changes: 11 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/* ACS Sparkplug App library.
* Library exports.
* Copyright 2023 AMRC.
*/

export { SPAppError } from "./spapp-error.js";
export { SparkplugApp } from "./sparkplug-app.js";




10 changes: 10 additions & 0 deletions lib/spapp-error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/* ACS Sparkplug App library.
* Sparkplug App exception class.
* Copyright 2023 AMRC.
*/

export class SPAppError extends Error {
constructor (msg, opts) {
super(msg, opts);
}
}
148 changes: 148 additions & 0 deletions lib/sparkplug-app.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/* ACS Sparkplug App library.
* Sparkplug App class.
* Copyright 2023 AMRC.
*/

import rx from "rxjs";

import { Address, SpB, Topic, UUIDs } from "@amrc-factoryplus/utilities";

import { SPAppError } from "./spapp-error.js";

class SparkplugDevice {
constructor (app, opts) {
this.app = app;
this.log = this.app.debug.log.bind(this.app.debug, "device");

if (opts.device) {
this.device = opts.device;
}
else if (opts.address) {
this.address = opts.address;
}
else {
throw new SPAppError("Must supply device or address to constructor");
}
}

async _address () {
const addr = this.address;
if (addr instanceof Address) return addr;
if (addr != null) return Address.parse(addr);

const resolved = await this.app.fplus.Directory.get_device_address(this.device);
if (!resolved)
throw new SPAppError(`Can't resolve device ${this.device}`);
return resolved;
}

_setup_births (addr) {
const cmdesc = this.app.fplus.CmdEsc;

/* XXX We map aliases to strings here. It would be better to map
* to BigInts, or to have the protobuf decoder decode to BigInts
* in the first place, but the only way to convert Longs to
* BigInts is via a string. */
const births = this.packets.pipe(
rx.filter(p => p.type == "BIRTH"),
rx.map(birth => ({
factoryplus: birth.uuid == UUIDs.FactoryPlus,
metrics: new Map(
birth.metrics.map(m => [m.name, m])),
aliases: new Map(
birth.metrics
.filter(m => "alias" in m)
.map(m => [m.alias.toString(), m.name])),
})),
rx.shareReplay(1),
rx.tap(b => this.log("Birth for %s", addr)));

/* XXX The initial delay here is crude. We should be reacting to
* successful subscription instead. */
rx.timer(20, 2000)
.pipe(
rx.takeUntil(births),
rx.tap(() => this.log("Rebirthing %s", addr)),
rx.mergeMap(n => rx.from(cmdesc.rebirth(addr))))
.subscribe();

return births;
}

_setup_metrics () {
/* XXX This resolves aliases on all metrics. We could probably
* avoid this by instead keeping track of the current alias of
* each metric we are interested in. */
return this.packets.pipe(
rx.mergeMap(p => rx.from(p.metrics)),
rx.withLatestFrom(this.births,
(m, b) => {
const rv = {...m};
if (m.alias)
rv.name = b.aliases.get(m.alias.toString());
return rv;
}),
rx.share());
}

async init () {
const { fplus, mqtt } = this.app;

const addr = await this._address();
this.log("Watching %s", addr);

this.packets = this.app.watch_address(addr).pipe(rx.share());
this.births = this._setup_births(addr);
this.metrics = this._setup_metrics();

return this;
}

metric (tag) {
return this.metrics.pipe(
rx.filter(m => m.name == tag),
rx.map(m => m.value));
}
}

export class SparkplugApp {
constructor (opts) {
this.fplus = opts.fplus;
this.debug = opts.fplus.debug;
}

async init () {
const fplus = this.fplus;

const mqtt = this.mqtt = await fplus.MQTT.mqtt_client({});

this.packets = rx.fromEvent(mqtt, "message", (t, m) => [t, m])
.pipe(
rx.map(([t, p]) => ({
topic: Topic.parse(t),
payload: SpB.decodePayload(p),
})));

this.watch = new rx.Subject();
this.watch.pipe(
rx.flatMap(addr => rx.from(
["BIRTH", "DEATH", "DATA"]
.map(t => addr.topic(t)))),
rx.tap(v => this.debug.log("watch", "%o", v)))
.subscribe(topic => mqtt.subscribe(topic));

return this;
}

watch_address (addr) {
this.watch.next(addr);
return this.packets.pipe(
rx.filter(m => m.topic.address.equals(addr)),
rx.map(({topic, payload}) => ({type: topic.type, ...payload})));
}

async device (opts) {
const dev = new SparkplugDevice(this, opts);
return await dev.init();
}
}
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"author": "AMRC",
"license": "MIT",
"dependencies": {
"@amrc-factoryplus/utilities": "1.0.9-bmz1",
"@amrc-factoryplus/utilities": "^1.0.9",
"rxjs": "^7.8.1"
}
}

0 comments on commit 53e2864

Please sign in to comment.