From 58638d1e5ce9324afd05789cd63fd67fb47c9f5f Mon Sep 17 00:00:00 2001 From: patuwwy Date: Fri, 26 Jan 2024 01:23:08 +0000 Subject: [PATCH] Fix input & not-serialized output on reconnect --- .../adapters/src/process-instance-adapter.ts | 4 +- packages/host/src/lib/csi-controller.ts | 18 ++-- packages/host/src/lib/csi-dispatcher.ts | 10 +- packages/host/src/lib/host.ts | 2 +- .../src/lib/serviceDiscovery/sd-adapter.ts | 3 +- packages/host/src/lib/socket-server.ts | 1 + packages/runner/src/host-client.ts | 37 +++++++- packages/runner/src/runner.ts | 95 +++++++++++++------ packages/types/src/messages/handshake.ts | 7 +- .../types/src/rest-api-sth/start-sequence.ts | 2 +- 10 files changed, 126 insertions(+), 53 deletions(-) diff --git a/packages/adapters/src/process-instance-adapter.ts b/packages/adapters/src/process-instance-adapter.ts index b4f6822e6..4a286aa11 100644 --- a/packages/adapters/src/process-instance-adapter.ts +++ b/packages/adapters/src/process-instance-adapter.ts @@ -254,7 +254,9 @@ class ProcessInstanceAdapter implements process.kill(this.processPID, 0); } catch (e) { this.logger.error("Runner process not exists", e); - /** process not exists */ + + clearInterval(interval); + reject("pid not exists"); } } diff --git a/packages/host/src/lib/csi-controller.ts b/packages/host/src/lib/csi-controller.ts index 87fec31ca..e5f08422f 100644 --- a/packages/host/src/lib/csi-controller.ts +++ b/packages/host/src/lib/csi-controller.ts @@ -116,6 +116,7 @@ export class CSIController extends TypedEmitter { apiInputEnabled = true; executionTime: number = -1; + inputHeadersSent = false; /** * Topic to which the output stream should be routed @@ -403,9 +404,10 @@ export class CSIController extends TypedEmitter { .pipe(this.upStreams[CC.CONTROL]); this.communicationHandler.addMonitoringHandler(RunnerMessageCode.PING, async (message) => { - const { status, payload } = message[1]; + const { status, payload, inputHeadersSent } = message[1]; this.status = status || InstanceStatus.RUNNING; + this.inputHeadersSent = inputHeadersSent; if (!payload) { this.emit("error", "No payload in ping!"); @@ -485,6 +487,10 @@ export class CSIController extends TypedEmitter { this.logger.trace("Received a PING message with ports config"); } + this.inputHeadersSent = !!message[1].inputHeadersSent; + + this.logger.info("Headers already sent for input?", this.inputHeadersSent); + if (this.instanceAdapter.setRunner) { await this.instanceAdapter.setRunner({ ...message[1].payload.system, @@ -537,8 +543,6 @@ export class CSIController extends TypedEmitter { } createInstanceAPIRouter() { - let inputHeadersSent = false; - if (!this.upStreams) { throw new AppError("UNATTACHED_STREAMS"); } @@ -551,11 +555,11 @@ export class CSIController extends TypedEmitter { * @experimental */ this.router.duplex("/inout", (duplex, _headers) => { - if (!inputHeadersSent) { + if (!this.inputHeadersSent) { this.downStreams![CC.IN].write(`Content-Type: ${_headers["content-type"]}\r\n`); this.downStreams![CC.IN].write("\r\n"); - inputHeadersSent = true; + this.inputHeadersSent = true; } (duplex as unknown as DuplexStream).input.pipe(this.downStreams![CC.IN], { end: false }); @@ -597,7 +601,7 @@ export class CSIController extends TypedEmitter { const contentType = req.headers["content-type"]; // @TODO: Check if subsequent requests have the same content-type. - if (!inputHeadersSent) { + if (!this.inputHeadersSent) { if (contentType === undefined) { return { opStatus: ReasonPhrases.NOT_ACCEPTABLE, error: "Content-Type must be defined" }; } @@ -605,7 +609,7 @@ export class CSIController extends TypedEmitter { stream.write(`Content-Type: ${contentType}\r\n`); stream.write("\r\n"); - inputHeadersSent = true; + this.inputHeadersSent = true; } return stream; diff --git a/packages/host/src/lib/csi-dispatcher.ts b/packages/host/src/lib/csi-dispatcher.ts index b877b1965..dafc5a344 100644 --- a/packages/host/src/lib/csi-dispatcher.ts +++ b/packages/host/src/lib/csi-dispatcher.ts @@ -66,12 +66,14 @@ export class CSIDispatcher extends TypedEmitter { id, sequenceInfo, payload, - status: InstanceStatus.INITIALIZING + status: InstanceStatus.INITIALIZING, + inputHeadersSent: false }, communicationHandler, config, instanceProxy, this.STHConfig.runtimeAdapter); this.logger.trace("CSIController created", id, sequenceInfo); csiController.logger.pipe(this.logger, { end: false }); + communicationHandler.logger.pipe(this.logger, { end: false }); csiController @@ -99,15 +101,15 @@ export class CSIDispatcher extends TypedEmitter { this.logger.warn("Missing topic content-type"); } - if (data.requires && !csiController.inputRouted && data.contentType) { - this.logger.trace("Routing topic to Sequence input", data.requires); + if (data.requires && data.contentType) { + this.logger.trace("Routing topic to Instance input", data.requires); await this.serviceDiscovery.routeTopicToStream( { topic: new TopicId(data.requires), contentType: data.contentType as ContentType }, csiController.getInputStream() ); - csiController.inputRouted = true; + csiController.inputHeadersSent = true; await this.serviceDiscovery.update({ requires: data.requires, contentType: data.contentType, topicName: data.requires, status: "add" diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index a4a107d66..e4436d906 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -340,7 +340,7 @@ export class Host implements IComponent { const seq = this.sequenceStore.getById(instance.sequence.id); - if (!seq) { + if (!seq && this.cpmConnector?.connected) { this.logger.info("Sequence not found. Checking Store..."); try { diff --git a/packages/host/src/lib/serviceDiscovery/sd-adapter.ts b/packages/host/src/lib/serviceDiscovery/sd-adapter.ts index 43c54e833..e5ea3b3de 100644 --- a/packages/host/src/lib/serviceDiscovery/sd-adapter.ts +++ b/packages/host/src/lib/serviceDiscovery/sd-adapter.ts @@ -179,9 +179,8 @@ export class ServiceDiscovery { } async update(data: STHTopicEventData) { - this.logger.trace("Topic update. Send topic info to CPM", data); - if (this.cpmConnector?.connected) { + this.logger.trace("Topic update. Send topic info to CPM", data); await this.cpmConnector?.sendTopicInfo(data); } } diff --git a/packages/host/src/lib/socket-server.ts b/packages/host/src/lib/socket-server.ts index b17017180..400d5d1c9 100644 --- a/packages/host/src/lib/socket-server.ts +++ b/packages/host/src/lib/socket-server.ts @@ -77,6 +77,7 @@ export class SocketServer extends TypedEmitter implements IComponent { this.server! .listen(this.port, this.hostname, () => { this.logger.info("SocketServer on", this.server?.address()); + res(); }) .on("error", rej); diff --git a/packages/runner/src/host-client.ts b/packages/runner/src/host-client.ts index 1bd4267d7..93aeb2104 100644 --- a/packages/runner/src/host-client.ts +++ b/packages/runner/src/host-client.ts @@ -2,8 +2,10 @@ import { ObjLogger } from "@scramjet/obj-logger"; import { CommunicationChannel as CC } from "@scramjet/symbols"; import { IHostClient, IObjectLogger, UpstreamStreamsConfig, } from "@scramjet/types"; +import { defer } from "@scramjet/utility"; import { Agent } from "http"; import net, { Socket, createConnection } from "net"; +import { PassThrough } from "stream"; type HostOpenConnections = [ net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket @@ -42,20 +44,24 @@ class HostClient implements IHostClient { async init(id: string): Promise { const openConnections = await Promise.all( Array.from(Array(9)) - .map(() => { + .map((_e: any, i: number) => { // Error handling for each connection is process crash for now let connection: Socket; try { connection = net.createConnection(this.instancesServerPort, this.instancesServerHost); - connection.on("error", () => {}); + connection.on("error", () => { + this.logger.warn(`${i} Stream error`); + }); connection.setNoDelay(true); } catch (e) { return Promise.reject(e); } return new Promise(res => { - connection.on("connect", () => res(connection)); + connection.on("connect", () => { + res(connection); + }); }); }) .map((connPromised, index) => { @@ -74,6 +80,26 @@ class HostClient implements IHostClient { this._streams = openConnections as HostOpenConnections; + const input = this._streams[CC.IN]; + + const inputTarget = new PassThrough({ emitClose: false }); + + input.on("end", async () => { + await defer(500); + + if ((this._streams![CC.CONTROL] as net.Socket).readableEnded) { + this.logger.info("Input end. Control is also ended... We are disconnected."); + } else { + this.logger.info("Input end. Control not ended. We are online. Desired input end."); + inputTarget.end(); + } + }); + + input.pipe(inputTarget, { end: false }); + + this._streams[CC.IN] = inputTarget; + //this._streams[CC.STDIN] = this._streams[CC.STDIN].pipe(new PassThrough({ emitClose: false }), { end: false }); + try { this.bpmux = new BPMux(this._streams[CC.PACKAGE]); } catch (e) { @@ -118,6 +144,11 @@ class HostClient implements IHostClient { const streamsExitedPromised: Promise[] = this.streams.map((stream, i) => new Promise( (res) => { + if ([CC.IN, CC.STDIN, CC.CONTROL].includes(i)) { + res(); + return; + } + if (!hard && "writable" in stream!) { stream .on("error", (e) => { diff --git a/packages/runner/src/runner.ts b/packages/runner/src/runner.ts index 5543b57cd..83f000cfb 100644 --- a/packages/runner/src/runner.ts +++ b/packages/runner/src/runner.ts @@ -32,7 +32,7 @@ import { ManagerClient } from "@scramjet/manager-api-client"; import { BufferStream, DataStream, StringStream } from "scramjet"; import { EventEmitter } from "events"; -import { createWriteStream, writeFileSync } from "fs"; +import { WriteStream, createWriteStream, writeFileSync } from "fs"; import { Readable, Writable } from "stream"; import { RunnerAppContext, RunnerProxy } from "./runner-app-context"; @@ -52,6 +52,7 @@ function onBeforeExit(code: number) { } function onException(_error: Error) { + console.error(_error); onBeforeExit(RunnerExitCode.UNCAUGHT_EXCEPTION); } @@ -156,12 +157,17 @@ export class Runner implements IComponent { private provides?: string; private providesContentType?: string; + private inputContentType: string = ""; + private shouldSerialize = false; private status: InstanceStatus = InstanceStatus.STARTING; + private logFile?: WriteStream; private runnerConnectInfo: RunnerConnectInfo = { appConfig: {} }; + instanceOutput?: Readable & HasTopicInformation | void; + constructor( private sequencePath: string, private hostClient: IHostClient, @@ -184,7 +190,9 @@ export class Runner implements IComponent { } if (process.env.RUNNER_LOG_FILE) { - this.logger.addOutput(createWriteStream(process.env.RUNNER_LOG_FILE)); + this.logFile ||= createWriteStream(process.env.RUNNER_LOG_FILE); + this.logFile.write("\n\n------------- \n\n"); + this.logger.addOutput(this.logFile); } this.inputDataStream = new DataStream().catch((e: any) => { @@ -253,11 +261,11 @@ export class Runner implements IComponent { } async setInputContentType(headers: any) { - const contentType = headers["content-type"]; + this.inputContentType ||= headers["content-type"]; - this.logger.debug("Content-Type", contentType); + this.logger.debug("Content-Type", this.inputContentType); - mapToInputDataStream(this.hostClient.inputStream, contentType) + mapToInputDataStream(this.hostClient.inputStream, this.inputContentType) .catch((error: any) => { this.logger.error("mapToInputDataStream", error); // TODO: we should be doing some error handling here: @@ -266,6 +274,8 @@ export class Runner implements IComponent { } async handleMonitoringRequest(data: MonitoringRateMessageData): Promise { + this.logger.info("handleMonitoringRequest"); + if (this.monitoringInterval) { clearInterval(this.monitoringInterval); } @@ -273,22 +283,22 @@ export class Runner implements IComponent { let working = false; this.monitoringInterval = setInterval(async () => { + this.logger.info("working", working); + if (working) { - return; + //return; } working = true; await this.reportHealth(1000); working = false; - }, 1000 / data.monitoringRate).unref(); + }, 1000 / data.monitoringRate);//.unref(); } private async reportHealth(timeout?: number) { - const { healthy } = await this.context.monitor(); + this.logger.info("Report health"); - MessageUtils.writeMessageOnStream( - [RunnerMessageCode.MONITORING, { healthy }], this.hostClient.monitorStream - ); + const { healthy } = await this.context.monitor(); if (timeout) { this.monitoringMessageReplyTimeout = setTimeout(async () => { @@ -297,6 +307,10 @@ export class Runner implements IComponent { await this.handleDisconnect(); }, timeout); } + + MessageUtils.writeMessageOnStream( + [RunnerMessageCode.MONITORING, { healthy }], this.hostClient.monitorStream + ); } async handleDisconnect() { @@ -344,6 +358,7 @@ export class Runner implements IComponent { this.logger.trace("Exiting (expected)"); this.status = InstanceStatus.STOPPING; + return this.exit(RunnerExitCode.STOPPED); } @@ -395,6 +410,9 @@ export class Runner implements IComponent { await promiseTimeout(this.hostClient.init(this.instanceId), 2000); this.logger.debug("connected"); this.connected = true; + + this.hostClient.inputStream.pipe(this.logFile!); + await this.handleMonitoringRequest({ monitoringRate: 1 }); } catch (e) { this.connected = false; @@ -411,6 +429,10 @@ export class Runner implements IComponent { this.logger.debug("Defining control stream"); this.defineControlStream(); + if (this.inputContentType) { + await this.setInputContentType({ headers: { "content-type": this.inputContentType } }); + } + this.hostClient.stdinStream .on("data", (chunk) => process.stdin.unshift(chunk)) .on("end", () => process.stdin.emit("end")); @@ -546,10 +568,20 @@ export class Runner implements IComponent { private redirectOutputs() { this.logger.pipe(this.hostClient.logStream, { stringified: true }); + + if (!this.shouldSerialize) { + this.instanceOutput?.pipe(this.hostClient.outputStream); + } + this.outputDataStream .JSONStringify() .pipe(this.hostClient.outputStream); + if (process.env.PRINT_TO_STDOUT) { + process.stdout.pipe(this.logFile!); + process.stderr.pipe(this.logFile!); + } + overrideStandardStream(process.stdout, this.hostClient.stdoutStream); overrideStandardStream(process.stderr, this.hostClient.stderrStream); } @@ -608,7 +640,8 @@ export class Runner implements IComponent { processPID: process.pid.toString() } }, - status: this.status + status: this.status, + inputHeadersSent: !!this.inputContentType }], this.hostClient.monitorStream); this.logger.trace("Handshake sent"); @@ -648,9 +681,9 @@ export class Runner implements IComponent { * * Pass the input stream to stream instead of creating new DataStream(); */ - let stream: Readable & HasTopicInformation | void = this.inputDataStream; + this.instanceOutput = this.inputDataStream; let itemsLeftInSequence = sequence.length; - let intermediate: SynchronousStreamable | void = stream; + let intermediate: SynchronousStreamable | void = this.instanceOutput; for (const func of sequence) { itemsLeftInSequence--; @@ -664,7 +697,7 @@ export class Runner implements IComponent { out = func.call( this.context, - stream, + this.instanceOutput, ...args ); @@ -691,11 +724,11 @@ export class Runner implements IComponent { } else if (typeof intermediate === "object" && intermediate instanceof DataStream) { this.logger.debug("Sequence function returned DataStream.", sequence.length - itemsLeftInSequence - 1); - stream = intermediate; + this.instanceOutput = intermediate; } else { this.logger.debug("Sequence function returned readable", sequence.length - itemsLeftInSequence - 1); // TODO: what if this is not a DataStream, but BufferStream stream!!!! - stream = DataStream.from(intermediate as Readable); + this.instanceOutput = DataStream.from(intermediate as Readable); } } else { this.logger.info("All Sequences processed."); @@ -703,17 +736,17 @@ export class Runner implements IComponent { intermediate = await out; if (intermediate instanceof Readable) { - stream = intermediate; + this.instanceOutput = intermediate; } else if (intermediate !== undefined && isSynchronousStreamable(intermediate)) { - stream = Object.assign(DataStream.from(intermediate as Readable, { highWaterMark: 0 }), { + this.instanceOutput = Object.assign(DataStream.from(intermediate as Readable, { highWaterMark: 0 }), { topic: intermediate.topic, contentType: intermediate.contentType }); } else { - stream = undefined; + this.instanceOutput = undefined; } - this.logger.debug("Stream type is", typeof stream); + this.logger.debug("Stream type is", typeof this.instanceOutput); } } @@ -735,27 +768,27 @@ export class Runner implements IComponent { this.sendPang({ provides: "", contentType: "" }); res(); - } else if (stream && this.hostClient.outputStream) { - this.logger.trace("Piping Sequence output", typeof stream); + } else if (this.instanceOutput && this.hostClient.outputStream) { + this.logger.trace("Piping Sequence output", typeof this.instanceOutput); - const shouldSerialize = stream.contentType && - ["application/x-ndjson", "text/x-ndjson"].includes(stream.contentType) || - stream instanceof DataStream && !( - stream instanceof StringStream || stream instanceof BufferStream + this.shouldSerialize = this.instanceOutput.contentType && + ["application/x-ndjson", "text/x-ndjson"].includes(this.instanceOutput.contentType) || + this.instanceOutput instanceof DataStream && !( + this.instanceOutput instanceof StringStream || this.instanceOutput instanceof BufferStream ); - stream + this.instanceOutput .once("end", () => { this.logger.debug("Sequence stream ended"); res(); }) - .pipe(shouldSerialize + .pipe(this.shouldSerialize ? this.outputDataStream : this.hostClient.outputStream ); - this.provides = intermediate.topic || ""; - this.providesContentType = intermediate.contentType || ""; + this.provides = intermediate.topic || ""; + this.providesContentType = intermediate.contentType || ""; this.sendPang({ provides: this.provides, contentType: this.providesContentType }); } else { diff --git a/packages/types/src/messages/handshake.ts b/packages/types/src/messages/handshake.ts index a4791c842..338d9c4af 100644 --- a/packages/types/src/messages/handshake.ts +++ b/packages/types/src/messages/handshake.ts @@ -21,10 +21,11 @@ export type PingMessageData = { sequenceInfo: SequenceInfo; created: number; status: InstanceStatus; + inputHeadersSent: boolean; }; export type PangMessageData = { - requires?: string, - contentType?: string, - provides?: string + requires?: string; + contentType?: string; + provides?: string; }; diff --git a/packages/types/src/rest-api-sth/start-sequence.ts b/packages/types/src/rest-api-sth/start-sequence.ts index d8c17bb5a..fc9454200 100644 --- a/packages/types/src/rest-api-sth/start-sequence.ts +++ b/packages/types/src/rest-api-sth/start-sequence.ts @@ -2,4 +2,4 @@ import { RunnerConnectInfo } from "../runner-connect"; export type StartSequenceResponse = { id: string }; -export type StartSequencePayload = Omit; +export type StartSequencePayload = Omit, "inputContentType">;