From 156ab2b0ef61cc05d576b07e47d8b3f3e3c8390c Mon Sep 17 00:00:00 2001 From: patuwwy Date: Mon, 25 Sep 2023 22:24:20 +0000 Subject: [PATCH] Restore piping instance --- packages/host/src/lib/csi-controller.ts | 14 +-- packages/host/src/lib/csi-dispatcher.ts | 109 ++++++++++-------- packages/host/src/lib/host.ts | 6 +- .../src/lib/serviceDiscovery/sd-adapter.ts | 2 + 4 files changed, 75 insertions(+), 56 deletions(-) diff --git a/packages/host/src/lib/csi-controller.ts b/packages/host/src/lib/csi-controller.ts index f635c5920..0de68df81 100644 --- a/packages/host/src/lib/csi-controller.ts +++ b/packages/host/src/lib/csi-controller.ts @@ -63,6 +63,7 @@ type Events = { const BPMux = require("bpmux").BPMux; +export type CSIControllerInfo = { ports?: any; created?: Date; started?: Date; ended?: Date; }; /** * Handles all Instance lifecycle, exposes instance's HTTP API. * @@ -95,7 +96,7 @@ export class CSIController extends TypedEmitter { args: Array | undefined; controlDataStream?: DataStream; router?: APIRoute; - info: { ports?: any; created?: Date; started?: Date; ended?: Date; } = {}; + info: CSIControllerInfo = {}; status: InstanceStatus; terminated?: { exitcode: number; reason: string; }; provides?: string; @@ -111,6 +112,8 @@ export class CSIController extends TypedEmitter { apiOutput = new PassThrough(); apiInputEnabled = true; + executionTime: number = -1; + /** * Topic to which the output stream should be routed */ @@ -186,11 +189,6 @@ export class CSIController extends TypedEmitter { this.logger = new ObjLogger(this, { id: this.id }); - this.logger.debug("Constructor executed", arguments); - - // eslint-disable-next-line no-console - console.log("Constructor executed", arguments); - this.status = InstanceStatus.INITIALIZING; this.upStreams = [ @@ -205,8 +203,8 @@ export class CSIController extends TypedEmitter { ]; } - async start() { - const i = new Promise((res, rej) => { + async start(): Promise { + const i = new Promise((res, rej) => { this.initResolver = { res, rej }; this.startInstance(); }); diff --git a/packages/host/src/lib/csi-dispatcher.ts b/packages/host/src/lib/csi-dispatcher.ts index 6a779596d..39363bc0a 100644 --- a/packages/host/src/lib/csi-dispatcher.ts +++ b/packages/host/src/lib/csi-dispatcher.ts @@ -5,13 +5,16 @@ import { RunnerMessageCode } from "@scramjet/symbols"; import { HostProxy, ICommunicationHandler, IObjectLogger, InstanceConfig, MessageDataType, STHConfiguration, STHRestAPI, SequenceInfo } from "@scramjet/types"; import { StartSequencePayload } from "@scramjet/types/src/rest-api-sth"; import { TypedEmitter } from "@scramjet/utility"; -import { CSIController } from "./csi-controller"; +import { CSIController, CSIControllerInfo } from "./csi-controller"; import { InstanceStore } from "./instance-store"; -import SequenceStore from "./sequenceStore"; -import { SocketServer } from "./socket-server"; +import { ServiceDiscovery } from "./serviceDiscovery/sd-adapter"; +import { ContentType } from "./serviceDiscovery/contentType"; +import TopicId from "./serviceDiscovery/topicId"; +import { Readable, Writable } from "stream"; type errorEventData = {id:string, err: any } -type endEventData = {id:string, code:number } +type endEventData = { id: string, code: number, info: CSIControllerInfo & { executionTime: number }, sequence: SequenceInfo}; + type Events = { pang: (payload: MessageDataType) => void; hourChime: () => void; @@ -24,16 +27,16 @@ type Events = { export class CSIDispatcher extends TypedEmitter { public logger: IObjectLogger; - //private socketServer: SocketServer; public instancesStore: typeof InstanceStore; private STHConfig: STHConfiguration; + private serviceDiscovery: ServiceDiscovery; - constructor(_socketServer: SocketServer, instancesStore: typeof InstanceStore, _sequenceStore: SequenceStore, STHConfig: STHConfiguration) { + constructor(instancesStore: typeof InstanceStore, serviceDiscovery: ServiceDiscovery, STHConfig: STHConfiguration) { super(); this.logger = new ObjLogger(this); - //this.socketServer = socketServer; this.instancesStore = instancesStore; this.STHConfig = STHConfig; + this.serviceDiscovery = serviceDiscovery; } async createCSIController( @@ -53,7 +56,6 @@ export class CSIDispatcher extends TypedEmitter { communicationHandler.logger.pipe(this.logger, { end: false }); csiController.on("error", (err) => { - //this.pushTelemetry("Instance error", { ...err }, "error"); this.logger.error("CSIController errored", err.message, err.exitcode); this.emit("error", { id, err }); }); @@ -67,47 +69,46 @@ export class CSIDispatcher extends TypedEmitter { } if (data.requires && !csiController.inputRouted && data.contentType) { - this.logger.trace("Routing Sequence input to topic", data.requires); + this.logger.trace("Routing Sequence topic to input", data.requires); - // await this.serviceDiscovery.routeTopicToStream( - // { topic: data.requires, contentType: data.contentType! }, - // csiController.getInputStream() - // ); + await this.serviceDiscovery.routeTopicToStream( + { topic: new TopicId(data.requires), contentType: data.contentType as ContentType }, + csiController.getInputStream() + ); csiController.inputRouted = true; - // await this.serviceDiscovery.update({ - // requires: data.requires, contentType: data.contentType!, topicName: data.requires - // }); + await this.serviceDiscovery.update({ + requires: data.requires, contentType: data.contentType, topicName: data.requires + }); } if (data.provides && !csiController.outputRouted && data.contentType) { this.logger.trace("Routing Sequence output to topic", data.provides); - // await this.serviceDiscovery.routeStreamToTopic( - // csiController.getOutputStream(), - // { topic: data.provides, contentType: data.contentType! }, - // csiController.id - // ); + await this.serviceDiscovery.routeStreamToTopic( + csiController.getOutputStream(), + { topic: new TopicId(data.provides), contentType: data.contentType as ContentType } + ); csiController.outputRouted = true; - // await this.serviceDiscovery.update({ - // provides: data.provides, contentType: data.contentType!, topicName: data.provides - // }); + await this.serviceDiscovery.update({ + provides: data.provides, contentType: data.contentType!, topicName: data.provides + }); } }); csiController.on("end", async (code) => { - this.logger.trace("csiControllerontrolled ended", `Exit code: ${code}`); - - // if (csiController.provides && csiController.provides !== "") { - // csiController.getOutputStream()!.unpipe(this.serviceDiscovery.getData( - // { - // topic: csiController.provides, - // contentType: "" - // } - // ) as Writable); - // } + this.logger.trace("csiControllerontrolled ended", `id: ${csiController.id}`, `Exit code: ${code}`); + + if (csiController.provides && csiController.provides !== "") { + csiController.getOutputStream().unpipe(this.serviceDiscovery.getData( + { + topic: new TopicId(csiController.provides), + contentType: "" as ContentType + } + ) as Writable); + } csiController.logger.unpipe(this.logger); @@ -123,17 +124,24 @@ export class CSIDispatcher extends TypedEmitter { // }, InstanceMessageCode.INSTANCE_ENDED); // this.auditor.auditInstance(id, InstanceMessageCode.INSTANCE_ENDED); - this.emit("end", { id, code }); + this.emit("end", { + id, + code, + info: { + executionTime: csiController.executionTime + }, + sequence: csiController.sequence + }); }); csiController.once("terminated", (code) => { - // if (csiController.requires && csiController.requires !== "") { - // (this.serviceDiscovery.getData({ - // topic: csiController.requires, - // contentType: "", - // }) as Readable - // ).unpipe(csiController.getInputStream()!); - // } + if (csiController.requires && csiController.requires !== "") { + (this.serviceDiscovery.getData({ + topic: new TopicId(csiController.requires), + contentType: "" as ContentType, + }) as Readable + ).unpipe(csiController.getInputStream()!); + } // this.auditor.auditInstance(id, InstanceMessageCode.INSTANCE_ENDED); // this.pushTelemetry("Instance ended", { @@ -144,11 +152,20 @@ export class CSIDispatcher extends TypedEmitter { // code: code.toString(), // seqId: csiController.sequence.id // }); - this.emit("terminated", { id, code }); + + this.emit("terminated", { + id, + code, + info: { + executionTime: csiController.executionTime + }, + sequence: csiController.sequence + }); }); - csiController.start().catch(() => { - //@TODO: handle start error; + csiController.start().catch((e) => { + this.logger.error("CSIC start error", csiController.id, e); + throw new Error("CSIC start error"); }); this.logger.trace("csiController started", id); @@ -189,8 +206,8 @@ export class CSIDispatcher extends TypedEmitter { await new Promise((resolve, _reject) => { const resolveFunction = (eventId: string) => { if (eventId === id) { - resolve(); this.off("established", resolveFunction); + resolve(); } }; diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index a12d8739e..eb27da72e 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -239,7 +239,7 @@ export class Host implements IComponent { this.instanceBase = `${this.config.host.apiBase}/instance`; this.topicsBase = `${this.config.host.apiBase}/topic`; - this.csiDispatcher = new CSIDispatcher(this.socketServer, this.instancesStore, this.sequenceStore, sthConfig); + this.csiDispatcher = new CSIDispatcher(this.instancesStore, this.serviceDiscovery, sthConfig); this.csiDispatcher.logger.pipe(this.logger); @@ -273,7 +273,9 @@ export class Host implements IComponent { } attachDispatcherEvents() { - //this.csiDispatcher.on(); + this.csiDispatcher.on("error", (errorData) => { + this.pushTelemetry("Instance error", { ...errorData }, "error"); + }); } getId() { diff --git a/packages/host/src/lib/serviceDiscovery/sd-adapter.ts b/packages/host/src/lib/serviceDiscovery/sd-adapter.ts index 48fc7bf8b..f82a86e32 100644 --- a/packages/host/src/lib/serviceDiscovery/sd-adapter.ts +++ b/packages/host/src/lib/serviceDiscovery/sd-adapter.ts @@ -76,6 +76,8 @@ export class ServiceDiscovery { const topic = this.topicsController.get(topicName); if (topic) { + config.contentType ||= topic.contentType; + if (topic.contentType !== config.contentType) { this.logger.error("Content-type mismatch, existing and requested ", topic.contentType, config.contentType); throw new Error("Content-type mismatch");