Skip to content

Commit

Permalink
Implement a Driver connection type (#280)
Browse files Browse the repository at this point in the history
This sits alongside the other connection types (for now) and
communicates with an external driver using the new protocol.

Configuration, polling and data packets are implemented. Command packets
and error handling is not.

The Device subclasses have been removed. Where they had functionality
this has been moved into the DeviceConnections. The Devices now purely
handle the Sparkplug side of things, leaving the DeviceConnection to
handle the southbound details.
  • Loading branch information
amrc-benmorrow authored Jun 20, 2024
2 parents e0d69fc + 0b10386 commit 9f5dc98
Show file tree
Hide file tree
Showing 22 changed files with 837 additions and 335 deletions.
6 changes: 6 additions & 0 deletions acs-edge/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@ repo?=acs-edge
# Don't set k8s.deployment, the deployment doesn't have a fixed name.

include ${mk}/acs.js.mk

local.build:
npx tsc --project tsconfig.json

local.run: local.build
node build/app.js
14 changes: 9 additions & 5 deletions acs-edge/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
* Copyright 2023 AMRC
*/

import {ServiceClient, UUIDs} from "@amrc-factoryplus/utilities";
import {Translator} from "./lib/translator.js";
import {log} from "./lib/helpers/log.js";
import {GIT_VERSION} from "./lib/git-version.js";
import * as dotenv from 'dotenv';
import sourceMapSupport from 'source-map-support'

import {ServiceClient, UUIDs} from "@amrc-factoryplus/utilities";

import {DriverBroker} from "./lib/driverBroker.js";
import {GIT_VERSION} from "./lib/git-version.js";
import {log} from "./lib/helpers/log.js";
import {Translator} from "./lib/translator.js";

sourceMapSupport.install()
dotenv.config({path: '../.env'});

Expand All @@ -20,9 +23,10 @@ async function run() {

const pollInt = parseInt(process.env.POLL_INT) || 30;
const fplus = await new ServiceClient({ env: process.env }).init();
const broker = new DriverBroker(process.env);

// Once a configuration has been loaded then start up the translator
let transApp = new Translator(fplus, pollInt);
let transApp = new Translator(fplus, pollInt, broker);
process.once('SIGTERM', () => {
log('🔪️SIGTERM RECEIVED');
transApp.stop(true);
Expand Down
37 changes: 37 additions & 0 deletions acs-edge/docs/TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# TODO list for edge-split work

- [x] The Device subclasses need to go. Where they do work this needs to
move into the Connection. In particular some Devices handle
subscription tasks which should move to `startSubscription`.

- [ ] `DeviceConnection.readMetrics` accepts payload format / delimiter
arguments. I don't think any of the drivers use them? This belongs
EA-side.

- [ ] `writeMetrics` also accepts format/delimiter. I'm not clear yet
that it isn't used for this code path. Ideally we want all device
writes to accept a plain Buffer as might be provided from a read.

- [x] The Connection currently handles the poll loop, as part of
`startSubscription`.
* For simple connections this is implemented in the base class and
should be handled by the Device (EA-side).
* Some Connections (MTConnect, OPCUA) can request polling in the
southbound protocol. The driver protocol needs extending to handle
this case.

- [ ] More generally, the Connections shouldn't see the Metrics at all.
They should operate entirely on addresses.

- [x] Multiple Devices may subscribe to a single Connection. The EA-side
Connection will need to track the current list of addresses we are
interested in and push it down to the driver.

- [ ] Devices are currently linked to a single Connection. This is not
necessary, but means we need to:
* Add a `connection` property to each metric definition.
* Change the Device to poll via a central connection manifold rather
than via an individual connection.
* Supply data topic names to a Device in return for addresses when
it subscribes to the connection manifold.
* Poll the manifold using connection/datatopic pairs.
56 changes: 44 additions & 12 deletions acs-edge/lib/device.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import {
log
log, logf
} from "./helpers/log.js";
import * as fs from "fs";
import {
Expand Down Expand Up @@ -51,6 +51,7 @@ export abstract class DeviceConnection extends EventEmitter {
#intHandles: {
[index: string]: ReturnType<typeof setInterval>
}
#subHandles: Map<string, any>

/**
* Basic class constructor, doesn't do much. Must emit a 'ready' event when complete.
Expand All @@ -63,7 +64,10 @@ export abstract class DeviceConnection extends EventEmitter {
this._type = type;
// Define object of polling interval handles for each device
this.#intHandles = {};
// Collection of subscription handles
this.#subHandles = new Map();
// Emit ready event
/* XXX this has no listeners */
this.emit('ready');
}

Expand Down Expand Up @@ -100,6 +104,24 @@ export abstract class DeviceConnection extends EventEmitter {
writeCallback(err);
}

/**
* Perform any setup needed to read from certain addresses, e.g. set
* up MQTT subscriptions. This does not attempt to detect duplicate
* requests.
* @param addresses Addresses to start watching
*/
async subscribe (addresses: string[]): Promise<any> {
return null;
}

/**
* Undo any setup performed by `subscribe`.
* @param addresses Addresses to stop watching
*/
async unsubscribe (handle: any): Promise<void> {
return;
}

/**
*
* @param metrics Metrics object to watch
Expand All @@ -109,8 +131,9 @@ export abstract class DeviceConnection extends EventEmitter {
* @param deviceId The device ID whose metrics are to be watched
* @param subscriptionStartCallback A function to call once the subscription has been setup
*/
startSubscription(metrics: Metrics, payloadFormat: serialisationType, delimiter: string, interval: number, deviceId: string, subscriptionStartCallback: Function) {

async startSubscription(metrics: Metrics, payloadFormat: serialisationType, delimiter: string, interval: number, deviceId: string, subscriptionStartCallback: Function) {
this.#subHandles.set(deviceId,
await this.subscribe(metrics.addresses));
this.#intHandles[deviceId] = setInterval(() => {
this.readMetrics(metrics, payloadFormat, delimiter);

Expand All @@ -123,9 +146,11 @@ export abstract class DeviceConnection extends EventEmitter {
* @param deviceId The device ID we are cancelling the subscription for
* @param stopSubCallback A function to call once the subscription has been cancelled
*/
stopSubscription(deviceId: string, stopSubCallback: Function) {
async stopSubscription(deviceId: string, stopSubCallback: Function) {
clearInterval(this.#intHandles[deviceId]);
delete this.#intHandles[deviceId];
await this.unsubscribe(this.#subHandles.get(deviceId));
this.#subHandles.delete(deviceId);
stopSubCallback();
}

Expand All @@ -142,7 +167,7 @@ export abstract class DeviceConnection extends EventEmitter {
/**
* Device class represents both the proprietary connection and Sparkplug connections for a device
*/
export abstract class Device {
export class Device {

#spClient: SparkplugNode // The sparkplug client
#devConn: DeviceConnection // The associated device connection to this device
Expand All @@ -151,7 +176,7 @@ export abstract class Device {
_defaultMetrics: sparkplugMetric[] // The default metrics common to all devices
#isAlive: boolean // Whether this device is alive or not
_isConnected: boolean // Whether this device is ready to publish or not
#deathTimer: ReturnType<typeof setTimeout> // A "dead mans handle" or "watchdog" timer which triggers a DDEATH
//#deathTimer: ReturnType<typeof setTimeout> // A "dead mans handle" or "watchdog" timer which triggers a DDEATH
// if allowed to time out
_payloadFormat: serialisationType // The format of the payloads produced by this device
_delimiter: string // String specifying the delimiter character if needed
Expand Down Expand Up @@ -236,8 +261,8 @@ export abstract class Device {
this.#populateTemplates(options.templates);
}
// Add default metrics to the device metrics object
// To be populated further by child class as custom manipulations need to take place
this._metrics = new Metrics(this._defaultMetrics);
this._metrics.add(options.metrics);
// Flag to keep track of device online status
this.#isAlive = false;

Expand All @@ -246,9 +271,9 @@ export abstract class Device {

// Create watchdog timer which, if allowed to elapse, will set the device as offline
// This watchdog is kicked by several read/write functions below
this.#deathTimer = setTimeout(() => {
this.#publishDDeath();
}, 10000);
//this.#deathTimer = setTimeout(() => {
// this.#publishDDeath();
//}, 10000);

//What to do when the device is ready
//We Just need to sub to metric changes
Expand All @@ -266,16 +291,19 @@ export abstract class Device {
}

_handleData(obj: { [p: string]: any }, parseVals: boolean) {
logf("_handleData %s (%s) %O", this._name, parseVals, obj);
// Array to keep track of values that changed
let changedMetrics: sparkplugMetric[] = [];
// Iterate through each key in obj
for (let addr in obj) {
// Get all payload paths registered for this address
const paths = this._metrics.getPathsForAddr(addr);
logf("paths for %s: %s", addr, paths);
// Iterate through each path
paths.forEach((path) => {
// Get the complete metric according to its address and path
const metric = this._metrics.getByAddrPath(addr, path);
logf("metric for %s:%s: %O", addr, path, metric);
// If the metric can be read i.e. GET method
if (typeof metric.properties !== "undefined" && (metric.properties.method.value as string).search(
/^GET/g) > -1) {
Expand All @@ -290,6 +318,8 @@ export abstract class Device {
this._delimiter
) : obj[addr];

logf("parsed new val: %O", newVal);

// Test if the value is a bigint and convert it to a Long. This is a hack to ensure that the
// Tahu library works - it only accepts Longs, not bigints.
if (typeof newVal === "bigint") {
Expand All @@ -309,6 +339,8 @@ export abstract class Device {
this._payloadFormat,
this._delimiter
);
logf("updating metric %s:%s ts %s val %O",
addr, path, timestamp, newVal);

// Update the metric value and push it to the array of changed metrics
changedMetrics.push(this._metrics.setValueByAddrPath(addr, path, newVal, timestamp));
Expand All @@ -330,7 +362,7 @@ export abstract class Device {
// Kick the watchdog timer to prevent the device dying
_refreshDeathTimer() {
// Reset timeout to it's initial value
this.#deathTimer.refresh();
//this.#deathTimer.refresh();
}

/**
Expand Down Expand Up @@ -487,7 +519,7 @@ export abstract class Device {
this._stopMetricSubscription();

// Stop the watchdog timer so that we can instantly stop
clearTimeout(this.#deathTimer);
//clearTimeout(this.#deathTimer);
}


Expand Down
22 changes: 0 additions & 22 deletions acs-edge/lib/devices/EtherNetIP.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,25 +116,3 @@ export class EtherNetIPConnection extends DeviceConnection {
await this.#client.disconnect();
}
}


/**
* Define the device
*/
export class EtherNetIPDevice extends Device {
#devConn: EtherNetIPConnection

constructor(spClient: SparkplugNode, devConn: EtherNetIPConnection, options: deviceOptions) {

// Force fixed buffer for EtherNet/IP connection before calling super
options.payloadFormat = serialisationType.fixedBuffer;

super(spClient, devConn, options);

// Assign device connection to class attribute
this.#devConn = devConn;

// Add metrics from options argument
this._metrics.add(options.metrics);
}
}
57 changes: 19 additions & 38 deletions acs-edge/lib/devices/MQTT.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,24 @@ export class MQTTConnection extends DeviceConnection {
this.emit('data', {});
}

async subscribe(topic: string) {
this.#client.subscribe(topic, (err) => {
if (err) {
console.log(err);
}
});
async subscribe (addresses: string[]) {
const topics = addresses.filter(t => t);
const granted = await this.#client.subscribeAsync(topics);
const failed = granted
.filter(g => g.qos == 128)
.map(g => g.topic)
.join(", ");
if (failed)
log(`⚠️ Could not subscribe to southbound topics: ${failed}`);
return granted
.filter(g => g.qos != 128)
.map(g => g.topic);
}

/* This accepts the return value from `subscribe`. */
async unsubscribe (handle: any) {
const topics = handle as string[];
await this.#client.unsubscribeAsync(topics);
}

/**
Expand All @@ -107,7 +119,7 @@ export class MQTTConnection extends DeviceConnection {
* @param delimiter
*/
writeMetrics(metrics: Metrics, writeCallback: Function, payloadFormat?: string, delimiter?: string) {
let err = null;
let err: Error|null = null;
metrics.addresses.forEach((addr) => {
let payload = writeValuesToPayload(metrics.getByAddress(addr), payloadFormat || "");
if (payload && payload.length) {
Expand All @@ -128,34 +140,3 @@ export class MQTTConnection extends DeviceConnection {
}
}


export class MQTTDevice extends Device {
#devConn: MQTTConnection

constructor(spClient: SparkplugNode, devConn: MQTTConnection, options: deviceOptions) {
super(spClient, devConn, options);
this.#devConn = devConn;

this._metrics.add(options.metrics);
this._metrics.addresses.forEach((topic) => {
if (topic) this.#devConn.subscribe(topic);
})

// Define function for handling data pushed to device asynchronously
// this.#devConn.on("asyncData", async (topic: string, msg: any) => {
// let changedMetrics: sparkplugMetric[] = [];
// this._metrics.getPathsForAddr(topic).forEach((path) => {
// const targetMetric = this._metrics.getByAddrPath(topic, path);
// const newVal = parseValueFromPayload(msg, targetMetric, this._payloadFormat, this._delimiter);
// if (!util.isDeepStrictEqual(targetMetric.value, newVal)) {
// this._metrics.setValueByAddrPath(topic, path, newVal);
// changedMetrics.push(targetMetric);
// }
// })
// if (changedMetrics.length) {
// this.onConnData(changedMetrics);
// }
// });
}

}
Loading

0 comments on commit 9f5dc98

Please sign in to comment.