Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Store full Schema and Instance nesting with InfluxDB records (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexGodbehere authored Aug 2, 2023
1 parent cf54c57 commit c08e0e1
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 348 deletions.
152 changes: 130 additions & 22 deletions lib/mqttclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
import {ServiceClient, SpB, Topic, UUIDs} from "@amrc-factoryplus/utilities";
import {Reader} from "protobufjs";
import {logger} from "../bin/ingester.js";
import Long from "long";
import {InfluxDB, Point} from '@influxdata/influxdb-client'
import {Agent} from 'http'

let dotenv: any = null;
try {
dotenv = await import ('dotenv')
} catch (e) {
}
import Long from "long";
import {InfluxDB, Point, DEFAULT_WriteOptions} from '@influxdata/influxdb-client'
import {Agent} from 'http'

dotenv?.config()

Expand Down Expand Up @@ -64,7 +64,7 @@ let interval: any;

/* points/lines are batched in order to minimize networking and increase performance */

const writeApi = influxDB.getWriteApi(influxOrganisation, 'default', 'ns', {
const writeApi = influxDB.getWriteApi(influxOrganisation, process.env.INFLUX_BUCKET || 'default', 'ns', {
/* the maximum points/lines to send in a single batch to InfluxDB server */
batchSize: batchSize + 1, // don't let automatically flush data
/* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */
Expand Down Expand Up @@ -181,28 +181,138 @@ export default class MQTTClient {
// Don't handle Node births
if (!topic.address.device) return;

let instance = null;
let schema = null;
let topLevelSchema = null;
let schemaUUIDMapping = {};

let topLevelInstance = null;
let instanceUUIDMapping = {};

// Check if this is a Factory+ birth
// If we have a Factory+ payload then process the Schema UUIDs and Instance UUIDs
if (payload.uuid === UUIDs.FactoryPlus) {
instance = payload.metrics.find((metric) => metric.name === "Instance_UUID")?.value;
schema = payload.metrics.find((metric) => metric.name === "Schema_UUID")?.value;

// Schema_UUIDs
payload.metrics.forEach((metric) => {
// If the name ends in Schema_UUID (e.g. Phases/1/Schema_UUID)
if (metric.name.endsWith("Schema_UUID")) {
// Then get the entire string before the /Schema_UUID
let schemaPath = metric.name.split("/").slice(0, -1).join("/");

// If the schemaPath is empty then this is the top level Schema_UUID
if (schemaPath === "") {
// So set the topLevelSchema to this value
topLevelSchema = metric.value;
} else {
// Add this mapping to the schemaUUIDMapping array. This means
// that all metrics that contain this schema path in their name
// will have
schemaUUIDMapping[schemaPath] = metric.value;
}
}
})

// Instance_UUIDs
payload.metrics.forEach((metric) => {
// If the name ends in Instance_UUID (e.g. Phases/1/Instance_UUID)
if (metric.name.endsWith("Instance_UUID")) {
// Then get the entire string before the /Instance_UUID
let instancePath = metric.name.split("/").slice(0, -1).join("/");

// If the instancePath is empty then this is the top level Instance_UUID
if (instancePath === "") {
// So set the topLevelInstance to this value
topLevelInstance = metric.value;
} else {
// Add this mapping to the instanceUUIDMapping array. This means
// that all metrics that contain this instance path in their name
// will have
instanceUUIDMapping[instancePath] = metric.value;
}
}
})
}

// If we've already seen this birth, update it
if (this.aliasResolver?.[topic.address.group]?.[topic.address.node]?.[topic.address.device]) {
logger.info(`🔄 Received updated birth certificate for ${topic.address.group}/${topic.address.node}/${topic.address.device} with Instance_UUID ${instance}`);
logger.info(`🔄 Received updated birth certificate for ${topic.address.group}/${topic.address.node}/${topic.address.device} with Instance_UUID ${topLevelInstance} using Schema_UUID ${topLevelSchema}`);
} else {
logger.info(`👶 Received birth certificate for ${topic.address.group}/${topic.address.node}/${topic.address.device} with Instance_UUID ${instance}`);
logger.info(`👶 Received birth certificate for ${topic.address.group}/${topic.address.node}/${topic.address.device} with Instance_UUID ${topLevelInstance} using Schema_UUID ${topLevelSchema}`);
}

// Store the birth certificate mapping in the alias resolver. This uses the alias as the key and a simplified object containing the name and type as the value.
this.setNestedValue(this.aliasResolver, [topic.address.group, topic.address.node, topic.address.device], payload.metrics.reduce(function (acc, obj) {
let alias = Long.isLong(obj.alias) ? obj.alias.toNumber() : obj.alias;

// Work out all schemas that are involved in this metric.
//
// e.g. Assume the current metric is CNC/Axes/1/BaseAxis/Position/Actual
//
// The schemaUUIDMapping object will contain the following (non-relevant emitted):
// - Axes/1: e39007e9-1427-4867-9d72-1c00c663db15
// - Axes/1/BaseAxis: 777dd941-f426-4355-8130-e144530b1376
// - Axes/1/BaseAxis/Position: 1a2c3594-d311-4f6b-865b-b97db3fa6d42

// Schema_UUIDs

let schemas = [];
// Get all entries in the schemaUUIDMapping object that have keys that fit in the current obj.name
Object.entries(schemaUUIDMapping).forEach(([schemaPath, schemaUUID]) => {
// If the current obj.name contains the schemaPath then add the schemaUUID to the schemas array
if (obj.name.includes(schemaPath)) {
schemas.push(schemaUUID);
}
});

// Get the bottom level schema by finding the the entry in the schemaUUIDMapping object that has
// the key with the most slashes that fits in the current obj.name
let bottomLevelSchema = Object.entries(schemaUUIDMapping).reduce((acc, [schemaPath, schemaUUID]) => {
if (obj.name.includes(schemaPath) && schemaPath.split("/").length > acc.split("/").length) {
return schemaUUID;
} else {
return acc;
}
}, "");

// Instance_UUIDs

let instances = [];
// Get all entries in the instanceUUIDMapping object that have keys that fit in the current obj.name
Object.entries(instanceUUIDMapping).forEach(([instancePath, instanceUUID]) => {
// If the current obj.name contains the instancePath then add the instanceUUID to the instances array
if (obj.name.includes(instancePath)) {
instances.push(instanceUUID);
}
});

// Get the bottom level instance by finding the the entry in the instanceUUIDMapping object that has
// the key with the most slashes that fits in the current obj.name
let bottomLevelInstance = Object.entries(instanceUUIDMapping).reduce((acc, [instancePath, instanceUUID]) => {
if (obj.name.includes(instancePath) && instancePath.split("/").length > acc.split("/").length) {
return instanceUUID;
} else {
return acc;
}
}, "");

acc[alias] = {
instance: instance,
schema: schema,
instance: {
// The top level instance for this device
top: topLevelInstance,

// The last instance before this metric
bottom: bottomLevelInstance,

// All instances between the top and bottom, inclusive
full: instances,
},
schema: {
// The top level schema for this device
top: topLevelSchema,

// The last schema before this metric
bottom: bottomLevelSchema,

// All schemas between the top and bottom, inclusive
full: schemas,
},
name: obj.name,
type: obj.type,
alias: alias
Expand Down Expand Up @@ -296,21 +406,19 @@ export default class MQTTClient {
// Get the path as everything behind the last /
let path = birth.name.substring(0, birth.name.lastIndexOf("/"));

// Here we only currently store the InstanceUUID and SchemaUUID
// of the top-level birth certificate. It would be nice if we could
// also store the InstanceUUID and SchemaUUID of all nested schemas
// but unsure how to handle this in InfluxDB.

writeApi.useDefaultTags({
instance: birth.instance,
schema: birth.schema,
topLevelInstance: birth.instance.top,
bottomLevelInstance: birth.instance.bottom,
usesInstances: birth.instance.top + ':' + birth.instance.full.join(':'),
topLevelSchema: birth.schema.top,
bottomLevelSchema: birth.schema.bottom,
usesSchemas: birth.schema.top + ':' + birth.schema.full.join(':'),
group: topic.address.group,
node: topic.address.node,
device: topic.address.device,
path: path,
});


let numVal = null;

switch (birth.type) {
Expand Down
Loading

0 comments on commit c08e0e1

Please sign in to comment.