Skip to content

Commit

Permalink
Restore piping instance
Browse files Browse the repository at this point in the history
  • Loading branch information
patuwwy committed Sep 25, 2023
1 parent 12a087e commit 94d5e34
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 54 deletions.
14 changes: 6 additions & 8 deletions packages/host/src/lib/csi-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Events = {

const BPMux = require("bpmux").BPMux;

export type CSIControllerInfo = { ports?: any; created?: Date; started?: Date; ended?: Date; };
/**
* Handles all Instance lifecycle, exposes instance's HTTP API.
*
Expand Down Expand Up @@ -95,7 +96,7 @@ export class CSIController extends TypedEmitter<Events> {
args: Array<any> | undefined;
controlDataStream?: DataStream;
router?: APIRoute;
info: { ports?: any; created?: Date; started?: Date; ended?: Date; } = {};
info: CSIControllerInfo = {};
status: InstanceStatus;
terminated?: { exitcode: number; reason: string; };
provides?: string;
Expand All @@ -111,6 +112,8 @@ export class CSIController extends TypedEmitter<Events> {
apiOutput = new PassThrough();
apiInputEnabled = true;

executionTime: number = -1;

/**
* Topic to which the output stream should be routed
*/
Expand Down Expand Up @@ -186,11 +189,6 @@ export class CSIController extends TypedEmitter<Events> {

this.logger = new ObjLogger(this, { id: this.id });

this.logger.debug("Constructor executed", arguments);

// eslint-disable-next-line no-console
console.log("Constructor executed", arguments);

this.status = InstanceStatus.INITIALIZING;

this.upStreams = [
Expand All @@ -205,8 +203,8 @@ export class CSIController extends TypedEmitter<Events> {
];
}

async start() {
const i = new Promise((res, rej) => {
async start(): Promise<void> {
const i = new Promise<void>((res, rej) => {
this.initResolver = { res, rej };
this.startInstance();
});
Expand Down
105 changes: 61 additions & 44 deletions packages/host/src/lib/csi-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import { RunnerMessageCode } from "@scramjet/symbols";
import { HostProxy, ICommunicationHandler, IObjectLogger, InstanceConfig, MessageDataType, STHConfiguration, STHRestAPI, SequenceInfo } from "@scramjet/types";
import { StartSequencePayload } from "@scramjet/types/src/rest-api-sth";
import { TypedEmitter } from "@scramjet/utility";
import { CSIController } from "./csi-controller";
import { CSIController, CSIControllerInfo } from "./csi-controller";
import { InstanceStore } from "./instance-store";
import SequenceStore from "./sequenceStore";
import { SocketServer } from "./socket-server";
import { ServiceDiscovery } from "./serviceDiscovery/sd-adapter";
import { ContentType } from "./serviceDiscovery/contentType";
import TopicId from "./serviceDiscovery/topicId";
import { Readable, Writable } from "stream";

type errorEventData = {id:string, err: any }
type endEventData = {id:string, code:number }
type endEventData = { id: string, code: number, info: CSIControllerInfo & { executionTime: number }, sequence: SequenceInfo};

type Events = {
pang: (payload: MessageDataType<RunnerMessageCode.PANG>) => void;
hourChime: () => void;
Expand All @@ -24,16 +27,16 @@ type Events = {

export class CSIDispatcher extends TypedEmitter<Events> {
public logger: IObjectLogger;
//private socketServer: SocketServer;
public instancesStore: typeof InstanceStore;
private STHConfig: STHConfiguration;
private serviceDiscovery: ServiceDiscovery;

constructor(_socketServer: SocketServer, instancesStore: typeof InstanceStore, _sequenceStore: SequenceStore, STHConfig: STHConfiguration) {
constructor(instancesStore: typeof InstanceStore, serviceDiscovery: ServiceDiscovery, STHConfig: STHConfiguration) {
super();
this.logger = new ObjLogger(this);
//this.socketServer = socketServer;
this.instancesStore = instancesStore;
this.STHConfig = STHConfig;
this.serviceDiscovery = serviceDiscovery;
}

async createCSIController(
Expand All @@ -53,7 +56,6 @@ export class CSIDispatcher extends TypedEmitter<Events> {
communicationHandler.logger.pipe(this.logger, { end: false });

csiController.on("error", (err) => {
//this.pushTelemetry("Instance error", { ...err }, "error");
this.logger.error("CSIController errored", err.message, err.exitcode);
this.emit("error", { id, err });
});
Expand All @@ -67,47 +69,46 @@ export class CSIDispatcher extends TypedEmitter<Events> {
}

if (data.requires && !csiController.inputRouted && data.contentType) {
this.logger.trace("Routing Sequence input to topic", data.requires);
this.logger.trace("Routing Sequence topic to input", data.requires);

// await this.serviceDiscovery.routeTopicToStream(
// { topic: data.requires, contentType: data.contentType! },
// csiController.getInputStream()
// );
await this.serviceDiscovery.routeTopicToStream(
{ topic: new TopicId(data.requires), contentType: data.contentType as ContentType },
csiController.getInputStream()
);

csiController.inputRouted = true;

// await this.serviceDiscovery.update({
// requires: data.requires, contentType: data.contentType!, topicName: data.requires
// });
await this.serviceDiscovery.update({
requires: data.requires, contentType: data.contentType, topicName: data.requires
});
}

if (data.provides && !csiController.outputRouted && data.contentType) {
this.logger.trace("Routing Sequence output to topic", data.provides);
// await this.serviceDiscovery.routeStreamToTopic(
// csiController.getOutputStream(),
// { topic: data.provides, contentType: data.contentType! },
// csiController.id
// );
await this.serviceDiscovery.routeStreamToTopic(
csiController.getOutputStream(),
{ topic: new TopicId(data.provides), contentType: data.contentType as ContentType }
);

csiController.outputRouted = true;

// await this.serviceDiscovery.update({
// provides: data.provides, contentType: data.contentType!, topicName: data.provides
// });
await this.serviceDiscovery.update({
provides: data.provides, contentType: data.contentType!, topicName: data.provides
});
}
});

csiController.on("end", async (code) => {
this.logger.trace("csiControllerontrolled ended", `Exit code: ${code}`);

// if (csiController.provides && csiController.provides !== "") {
// csiController.getOutputStream()!.unpipe(this.serviceDiscovery.getData(
// {
// topic: csiController.provides,
// contentType: ""
// }
// ) as Writable);
// }
if (csiController.provides && csiController.provides !== "") {
csiController.getOutputStream().unpipe(this.serviceDiscovery.getData(
{
topic: new TopicId(csiController.provides),
contentType: "" as ContentType
}
) as Writable);
}

csiController.logger.unpipe(this.logger);

Expand All @@ -123,17 +124,24 @@ export class CSIDispatcher extends TypedEmitter<Events> {
// }, InstanceMessageCode.INSTANCE_ENDED);

// this.auditor.auditInstance(id, InstanceMessageCode.INSTANCE_ENDED);
this.emit("end", { id, code });
this.emit("end", {
id,
code,
info: {
executionTime: csiController.executionTime
},
sequence: csiController.sequence
});
});

csiController.once("terminated", (code) => {
// if (csiController.requires && csiController.requires !== "") {
// (this.serviceDiscovery.getData({
// topic: csiController.requires,
// contentType: "",
// }) as Readable
// ).unpipe(csiController.getInputStream()!);
// }
if (csiController.requires && csiController.requires !== "") {
(this.serviceDiscovery.getData({
topic: new TopicId(csiController.requires),
contentType: "" as ContentType,
}) as Readable
).unpipe(csiController.getInputStream()!);
}

// this.auditor.auditInstance(id, InstanceMessageCode.INSTANCE_ENDED);
// this.pushTelemetry("Instance ended", {
Expand All @@ -144,11 +152,20 @@ export class CSIDispatcher extends TypedEmitter<Events> {
// code: code.toString(),
// seqId: csiController.sequence.id
// });
this.emit("terminated", { id, code });

this.emit("terminated", {
id,
code,
info: {
executionTime: csiController.executionTime
},
sequence: csiController.sequence
});
});

csiController.start().catch(() => {
//@TODO: handle start error;
csiController.start().catch((e) => {
this.logger.error("CSIC start error", csiController.id, e);
throw new Error("CSIC start error");
});

this.logger.trace("csiController started", id);
Expand Down Expand Up @@ -189,8 +206,8 @@ export class CSIDispatcher extends TypedEmitter<Events> {
await new Promise<void>((resolve, _reject) => {
const resolveFunction = (eventId: string) => {
if (eventId === id) {
resolve();
this.off("established", resolveFunction);
resolve();
}
};

Expand Down
6 changes: 4 additions & 2 deletions packages/host/src/lib/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ export class Host implements IComponent {
this.instanceBase = `${this.config.host.apiBase}/instance`;
this.topicsBase = `${this.config.host.apiBase}/topic`;

this.csiDispatcher = new CSIDispatcher(this.socketServer, this.instancesStore, this.sequenceStore, sthConfig);
this.csiDispatcher = new CSIDispatcher(this.instancesStore, this.serviceDiscovery, sthConfig);

this.csiDispatcher.logger.pipe(this.logger);

Expand Down Expand Up @@ -273,7 +273,9 @@ export class Host implements IComponent {
}

attachDispatcherEvents() {
//this.csiDispatcher.on();
this.csiDispatcher.on("error", (errorData) => {
this.pushTelemetry("Instance error", { ...errorData }, "error");
});
}

getId() {
Expand Down
2 changes: 2 additions & 0 deletions packages/host/src/lib/serviceDiscovery/sd-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ export class ServiceDiscovery {
const topic = this.topicsController.get(topicName);

if (topic) {
config.contentType ||= topic.contentType;

if (topic.contentType !== config.contentType) {
this.logger.error("Content-type mismatch, existing and requested ", topic.contentType, config.contentType);
throw new Error("Content-type mismatch");
Expand Down

0 comments on commit 94d5e34

Please sign in to comment.