From 08b71fe7b0201559e07aec52d0dcfbeeec382540 Mon Sep 17 00:00:00 2001 From: patuwwy Date: Mon, 18 Sep 2023 11:31:54 +0000 Subject: [PATCH] Reconnect. Fix lint issues --- .../src/kubernetes-instance-adapter.ts | 16 ++--- .../adapters/src/process-instance-adapter.ts | 20 +++--- packages/host/src/lib/csi-controller.ts | 50 +++++++-------- packages/host/src/lib/csi-dispatcher.ts | 25 +++++--- packages/host/src/lib/host.ts | 41 +++++++------ packages/model/src/stream-handler.ts | 4 +- packages/runner/src/bin/start-runner.ts | 1 - packages/runner/src/host-client.ts | 10 +-- packages/runner/src/runner.ts | 61 ++++++++++--------- packages/types/src/messages/monitor-reply.ts | 9 +++ packages/utility/src/index.ts | 17 +++--- 11 files changed, 141 insertions(+), 113 deletions(-) create mode 100644 packages/types/src/messages/monitor-reply.ts diff --git a/packages/adapters/src/kubernetes-instance-adapter.ts b/packages/adapters/src/kubernetes-instance-adapter.ts index 6f5f508f0..05e13125d 100644 --- a/packages/adapters/src/kubernetes-instance-adapter.ts +++ b/packages/adapters/src/kubernetes-instance-adapter.ts @@ -13,15 +13,15 @@ import { STHConfiguration, } from "@scramjet/types"; -import path from "path"; import { ObjLogger } from "@scramjet/obj-logger"; +import { RunnerExitCode } from "@scramjet/symbols"; +import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect"; import { createReadStream } from "fs"; +import path from "path"; +import { PassThrough } from "stream"; +import { getRunnerEnvEntries } from "./get-runner-env"; import { KubernetesClientAdapter } from "./kubernetes-client-adapter"; import { adapterConfigDecoder } from "./kubernetes-config-decoder"; -import { getRunnerEnvEntries } from "./get-runner-env"; -import { PassThrough } from "stream"; -import { RunnerExitCode } from "@scramjet/symbols"; -import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect"; /** * Adapter for running Instance by Runner executed in separate process. @@ -89,7 +89,7 @@ IComponent { } }; } - async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { + async dispatch(_config: InstanceConfig, _instancesServerPort: number, _instanceId: string, _sequenceInfo: SequenceInfo, _payload: RunnerConnectInfo): Promise { throw Error("not implemented"); } @@ -185,11 +185,11 @@ IComponent { return 0; } - async waitUntilExit(config: InstanceConfig, instanceId:string, _sequenceInfo: SequenceInfo): Promise { + async waitUntilExit(_config: InstanceConfig, _instanceId:string, _sequenceInfo: SequenceInfo): Promise { + this.logger.debug("WaitUntilExit", [_config, _instanceId, _sequenceInfo]); throw Error("Not implemented"); } - async cleanup(): Promise { await this.remove(this.adapterConfig.timeout); } diff --git a/packages/adapters/src/process-instance-adapter.ts b/packages/adapters/src/process-instance-adapter.ts index 8765fc4cd..05f6849d7 100644 --- a/packages/adapters/src/process-instance-adapter.ts +++ b/packages/adapters/src/process-instance-adapter.ts @@ -180,10 +180,10 @@ class ProcessInstanceAdapter implements getRunnerInfo(): RunnerConnectInfo["system"] { return { processPID: this.processPID.toString() - } + }; } - async waitUntilExit(config: InstanceConfig, instanceId: string, _sequenceInfo: SequenceInfo): Promise { + async waitUntilExit(_config: InstanceConfig, _instanceId: string, _sequenceInfo: SequenceInfo): Promise { if (this.runnerProcess) { const [statusCode, signal] = await new Promise<[number | null, NodeJS.Signals | null]>( (res) => this.runnerProcess?.on("exit", (code, sig) => res([code, sig])) @@ -207,34 +207,34 @@ class ProcessInstanceAdapter implements // When no process reference Wait for file created by runner return new Promise((res, reject) => { - const interval = setInterval(async() => { + const interval = setInterval(async () => { if (this.processPID < 1) return; const filePath = `/tmp/runner-${this.processPID}`; try { - await access(filePath, constants.F_OK) + await access(filePath, constants.F_OK); clearInterval(interval); - const data = await readFile(filePath, 'utf8').catch((readErr) => { + const data = await readFile(filePath, "utf8").catch((readErr) => { this.logger.error(`Cant' read runner exit code from: ${readErr}`); reject(readErr); return; - }) + }); this.logger.debug("exitCode saved to file by runner:", data, filePath); rm(filePath).then(() => { this.logger.debug("File removed"); - }, (err) => { - this.logger.error("Can't remove exitcode file"); - }) + }, (err: any) => { + this.logger.error("Can't remove exitcode file", err); + }); res(parseInt(data!, 10)); } catch (err) { /** file not exists */ - }; + } }, 1000); }); } diff --git a/packages/host/src/lib/csi-controller.ts b/packages/host/src/lib/csi-controller.ts index 4be838fb0..fcc4360af 100644 --- a/packages/host/src/lib/csi-controller.ts +++ b/packages/host/src/lib/csi-controller.ts @@ -1,49 +1,49 @@ +import { + AppError, + CSIControllerError, + HostError, + InstanceAdapterError, + MessageUtilities +} from "@scramjet/model"; +import { development } from "@scramjet/sth-config"; +import { CommunicationChannel as CC, RunnerExitCode, RunnerMessageCode } from "@scramjet/symbols"; import { APIRoute, AppConfig, DownstreamStreamsConfig, EncodedMessage, HandshakeAcknowledgeMessage, - ParsedMessage, - PassThroughStreamsConfig, - ReadableStream, - SequenceInfo, - WritableStream, + HostProxy, + ICommunicationHandler, ILifeCycleAdapterRun, - MessageDataType, - IObjectLogger, - STHRestAPI, - STHConfiguration, InstanceLimits, + InstanceStats, InstanceStatus, + IObjectLogger, + MessageDataType, MonitoringMessageData, - InstanceStats, OpResponse, + ParsedMessage, + PassThroughStreamsConfig, + ReadableStream, + SequenceInfo, + STHConfiguration, + STHRestAPI, StopSequenceMessageData, - HostProxy, - ICommunicationHandler + WritableStream } from "@scramjet/types"; -import { - AppError, - CSIControllerError, - HostError, - MessageUtilities, - InstanceAdapterError -} from "@scramjet/model"; -import { CommunicationChannel as CC, RunnerExitCode, RunnerMessageCode } from "@scramjet/symbols"; import { Duplex, PassThrough, Readable } from "stream"; -import { development } from "@scramjet/sth-config"; -import { DataStream } from "scramjet"; +import { DuplexStream, getRouter } from "@scramjet/api-server"; import { EventEmitter, once } from "events"; import { ServerResponse } from "http"; -import { DuplexStream, getRouter } from "@scramjet/api-server"; +import { DataStream } from "scramjet"; import { getInstanceAdapter } from "@scramjet/adapters"; -import { cancellableDefer, CancellablePromise, defer, promiseTimeout, TypedEmitter } from "@scramjet/utility"; import { ObjLogger } from "@scramjet/obj-logger"; -import { ReasonPhrases } from "http-status-codes"; import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect"; +import { cancellableDefer, CancellablePromise, defer, promiseTimeout, TypedEmitter } from "@scramjet/utility"; +import { ReasonPhrases } from "http-status-codes"; /** * @TODO: Runner exits after 10secs and k8s client checks status every 500ms so we need to give it some time diff --git a/packages/host/src/lib/csi-dispatcher.ts b/packages/host/src/lib/csi-dispatcher.ts index dd6a69c1b..6a779596d 100644 --- a/packages/host/src/lib/csi-dispatcher.ts +++ b/packages/host/src/lib/csi-dispatcher.ts @@ -7,8 +7,8 @@ import { StartSequencePayload } from "@scramjet/types/src/rest-api-sth"; import { TypedEmitter } from "@scramjet/utility"; import { CSIController } from "./csi-controller"; import { InstanceStore } from "./instance-store"; -import { SocketServer } from "./socket-server"; import SequenceStore from "./sequenceStore"; +import { SocketServer } from "./socket-server"; type errorEventData = {id:string, err: any } type endEventData = {id:string, code:number } @@ -43,7 +43,7 @@ export class CSIDispatcher extends TypedEmitter { communicationHandler: ICommunicationHandler, config: STHConfiguration, instanceProxy: HostProxy) { - sequenceInfo.instances = sequenceInfo.instances || new Set(); + sequenceInfo.instances = sequenceInfo.instances || []; const csiController = new CSIController({ id, sequenceInfo, payload }, communicationHandler, config, instanceProxy, this.STHConfig.runtimeAdapter); @@ -58,10 +58,15 @@ export class CSIDispatcher extends TypedEmitter { this.emit("error", { id, err }); }); + // eslint-disable-next-line complexity csiController.on("pang", async (data) => { this.logger.trace("PANG received", data); - if (data.requires && !csiController.inputRouted) { + if ((data.requires || data.provides) && !data.contentType) { + this.logger.warn("Missing topic content-type"); + } + + if (data.requires && !csiController.inputRouted && data.contentType) { this.logger.trace("Routing Sequence input to topic", data.requires); // await this.serviceDiscovery.routeTopicToStream( @@ -76,7 +81,7 @@ export class CSIDispatcher extends TypedEmitter { // }); } - if (data.provides && !csiController.outputRouted) { + if (data.provides && !csiController.outputRouted && data.contentType) { this.logger.trace("Routing Sequence output to topic", data.provides); // await this.serviceDiscovery.routeStreamToTopic( // csiController.getOutputStream(), @@ -108,7 +113,9 @@ export class CSIDispatcher extends TypedEmitter { delete InstanceStore[csiController.id]; - sequenceInfo.instances.filter(a => a !== id); + sequenceInfo.instances = sequenceInfo.instances.filter(item => { + return item !== id; + }); // await this.cpmConnector?.sendInstanceInfo({ // id: csiController.id, @@ -130,7 +137,9 @@ export class CSIDispatcher extends TypedEmitter { // this.auditor.auditInstance(id, InstanceMessageCode.INSTANCE_ENDED); // this.pushTelemetry("Instance ended", { - // executionTime: csiController.info.ended && csiController.info.started ? ((csiController.info.ended?.getTime() - csiController.info.started.getTime()) / 1000).toString() : "-1", + // executionTime: csiController.info.ended && csiController.info.started + // ? ((csiController.info.ended?.getTime() - csiController.info.started.getTime()) / 1000).toString() + // : "-1", // id: csiController.id, // code: code.toString(), // seqId: csiController.sequence.id @@ -138,7 +147,9 @@ export class CSIDispatcher extends TypedEmitter { this.emit("terminated", { id, code }); }); - csiController.start().then(() => {}, () => {}); + csiController.start().catch(() => { + //@TODO: handle start error; + }); this.logger.trace("csiController started", id); diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index 97a735058..a12d8739e 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -20,10 +20,10 @@ import { OpResponse, ParsedMessage, PublicSTHConfiguration, - SequenceInfo, - StartSequenceDTO, STHConfiguration, - STHRestAPI + STHRestAPI, + SequenceInfo, + StartSequenceDTO } from "@scramjet/types"; import { getSequenceAdapter, initializeRuntimeAdapters } from "@scramjet/adapters"; @@ -42,17 +42,17 @@ import { readFileSync } from "fs"; import { cpus, totalmem } from "os"; import { DataStream } from "scramjet"; import { inspect } from "util"; -import { Auditor } from "./auditor"; +import { AuditedRequest, Auditor } from "./auditor"; import { auditMiddleware, logger as auditMiddlewareLogger } from "./middlewares/audit"; import { corsMiddleware } from "./middlewares/cors"; import { optionsMiddleware } from "./middlewares/options"; import { S3Client } from "./s3-client"; import { ServiceDiscovery } from "./serviceDiscovery/sd-adapter"; import { SocketServer } from "./socket-server"; + import SequenceStore from "./sequenceStore"; import TopicRouter from "./serviceDiscovery/topicRouter"; -import { GetSequenceResponse } from "@scramjet/types/src/rest-api-sth"; import { loadModule, logger as loadModuleLogger } from "@scramjet/module-loader"; import { CSIDispatcher } from "./csi-dispatcher"; @@ -185,6 +185,7 @@ export class Host implements IComponent { constructor(apiServer: APIExpose, socketServer: SocketServer, sthConfig: STHConfiguration) { this.config = sthConfig; this.publicConfig = ConfigService.getConfigInfo(sthConfig); + this.sequenceStore = new SequenceStore(); this.logger = new ObjLogger( this, @@ -672,7 +673,7 @@ export class Host implements IComponent { this.logger.trace("Sequence removed:", id); // eslint-disable-next-line max-len - await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_DELETED, sequenceInfo as unknown as GetSequenceResponse); + await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_DELETED, sequenceInfo as unknown as STHRestAPI.GetSequenceResponse); this.auditor.auditSequence(id, SequenceMessageCode.SEQUENCE_DELETED); return { @@ -805,7 +806,7 @@ export class Host implements IComponent { this.logger.trace(`Sequence identified: ${config.id}`); // eslint-disable-next-line max-len - await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_CREATED, config as unknown as GetSequenceResponse); + await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_CREATED, config as unknown as STHRestAPI.GetSequenceResponse); this.auditor.auditSequence(id, SequenceMessageCode.SEQUENCE_CREATED); this.pushTelemetry("Sequence uploaded", { language: config.language.toLowerCase(), seqId: id }); @@ -955,21 +956,25 @@ export class Host implements IComponent { const runner = await this.csiDispatcher.startRunner(sequence, payload); // @todo more info - // await this.cpmConnector?.sendInstanceInfo({ // id: runner.id, - // appConfig: payload.appConfig, - // args: payload.args, - // sequence: sequence.id, - // // ports: runner.info.ports - // // created: csic.info.created, - // // started: csic.info.started, - // // status: csic.status, + // appConfig: runner.appConfig, + // args: runner.args, + // sequence: (info => { + // // eslint-disable-next-line @typescript-eslint/no-unused-vars + // const { instances, ...rest } = info; + + // return rest; + // })(sequence), + // ports: runner.info.ports, + // created: csic.info.created, + // started: csic.info.started, + // status: csic.status, // }, InstanceMessageCode.INSTANCE_STARTED); - //this.logger.debug("Instance limits", runner.limits); - //this.auditor.auditInstanceStart(runner.id, req as AuditedRequest, runner.limits); - //this.pushTelemetry("Instance started", { id: runner.id, language: runner.sequence.config.language, seqId: runner.sequence.id }); + this.logger.debug("Instance limits", runner.limits); + this.auditor.auditInstanceStart(runner.id, req as AuditedRequest, runner.limits); + this.pushTelemetry("Instance started", { id: runner.id, language: runner.sequence.config.language, seqId: runner.sequence.id }); // csic.on("hourChime", () => { // this.pushTelemetry("Instance hour chime", { id: csic.id, language: csic.sequence.config.language, seqId: csic.sequence.id }); diff --git a/packages/model/src/stream-handler.ts b/packages/model/src/stream-handler.ts index 79c96c297..2b5d258e7 100644 --- a/packages/model/src/stream-handler.ts +++ b/packages/model/src/stream-handler.ts @@ -1,3 +1,4 @@ +import { ObjLogger } from "@scramjet/obj-logger"; import { CommunicationChannel as CC, CPMMessageCode, RunnerMessageCode } from "@scramjet/symbols"; import { ControlMessageCode, @@ -18,7 +19,6 @@ import { UpstreamStreamsConfig, WritableStream } from "@scramjet/types"; -import { ObjLogger } from "@scramjet/obj-logger"; import { DataStream, StringStream } from "scramjet"; import { PassThrough, Readable, Writable } from "stream"; @@ -155,7 +155,7 @@ export class CommunicationHandler implements ICommunicationHandler { this.addMonitoringHandler(RunnerMessageCode.PING, (msg) => { res(msg); }); - }) + }); } pipeMessageStreams() { diff --git a/packages/runner/src/bin/start-runner.ts b/packages/runner/src/bin/start-runner.ts index 1eae1764b..63f1803b2 100755 --- a/packages/runner/src/bin/start-runner.ts +++ b/packages/runner/src/bin/start-runner.ts @@ -25,7 +25,6 @@ try { process.exit(RunnerExitCode.INVALID_ENV_VARS); } - try { if (!sequenceInfo) throw new Error("Connection JSON is required."); connectInfo = JSON.parse(sequenceInfo); diff --git a/packages/runner/src/host-client.ts b/packages/runner/src/host-client.ts index aec42528a..213169a28 100644 --- a/packages/runner/src/host-client.ts +++ b/packages/runner/src/host-client.ts @@ -1,9 +1,9 @@ /* eslint-disable dot-notation */ -import { IHostClient, IObjectLogger, UpstreamStreamsConfig, } from "@scramjet/types"; -import { CommunicationChannel as CC } from "@scramjet/symbols"; -import net, { createConnection, Socket } from "net"; import { ObjLogger } from "@scramjet/obj-logger"; +import { CommunicationChannel as CC } from "@scramjet/symbols"; +import { IHostClient, IObjectLogger, UpstreamStreamsConfig, } from "@scramjet/types"; import { Agent } from "http"; +import net, { Socket, createConnection } from "net"; type HostOpenConnections = [ net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket @@ -68,7 +68,9 @@ class HostClient implements IHostClient { return connection; }); }) - ).catch((e) => {}); + ).catch((_e) => { + //@TODO: handle error. + }); this._streams = openConnections as HostOpenConnections; diff --git a/packages/runner/src/runner.ts b/packages/runner/src/runner.ts index f88aae7af..c588afd63 100644 --- a/packages/runner/src/runner.ts +++ b/packages/runner/src/runner.ts @@ -30,20 +30,14 @@ import { Readable, Writable } from "stream"; import { HostClient as HostApiClient } from "@scramjet/api-client"; import { ClientUtilsCustomAgent } from "@scramjet/client-utils"; -<<<<<<< HEAD import { ManagerClient } from "@scramjet/manager-api-client"; -||||||| constructed merge base -======= + import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect"; -<<<<<<< HEAD ->>>>>>> Reconnect. Fix starting instance -||||||| constructed merge base -======= + import { writeFileSync } from "fs"; import { mapToInputDataStream, readInputStreamHeaders } from "./input-stream"; import { MessageUtils } from "./message-utils"; import { RunnerAppContext, RunnerProxy } from "./runner-app-context"; ->>>>>>> Reconnect. Write runner exitcode to file. read in process-adapter // async function flushStream(source: Readable | undefined, target: Writable) { // if (!source) return; @@ -73,6 +67,23 @@ export function isSynchronousStreamable(obj: SynchronousStreamable | Primit const overrideMap: Map = new Map(); +function revertStandardStream(oldStream: Writable) { + if (overrideMap.has(oldStream)) { + const { write, drainCb, errorCb } = overrideMap.get(oldStream) as OverrideConfig; + + // @ts-ignore - this is ok, we're doing this on purpose! + delete oldStream.write; + + // if prototypic write is there, then no change needed + if (oldStream.write !== write) + oldStream.write = write; + + oldStream.off("drain", drainCb); + oldStream.off("error", errorCb); + overrideMap.delete(oldStream); + } +} + function overrideStandardStream(oldStream: Writable, newStream: Writable) { if (overrideMap.has(oldStream)) { //throw new Error("Attempt to override stream more than once"); @@ -97,23 +108,6 @@ function overrideStandardStream(oldStream: Writable, newStream: Writable) { overrideMap.set(oldStream, { write, drainCb, errorCb }); } -function revertStandardStream(oldStream: Writable) { - if (overrideMap.has(oldStream)) { - const { write, drainCb, errorCb } = overrideMap.get(oldStream) as OverrideConfig; - - // @ts-ignore - this is ok, we're doing this on purpose! - delete oldStream.write; - - // if prototypic write is there, then no change needed - if (oldStream.write !== write) - oldStream.write = write; - - oldStream.off("drain", drainCb); - oldStream.off("error", errorCb); - overrideMap.delete(oldStream); - } -} - /** * Runtime environment for sequence code. * Communicates with Host with data transferred to/from Sequence, health info, @@ -170,7 +164,7 @@ export class Runner implements IComponent { throw e; }); - process.on("beforeExit", (code)=> { + process.on("beforeExit", (code) => { const filepath = `/tmp/runner-${process.pid.toString()}`; writeFileSync(filepath, code.toString()); @@ -266,15 +260,17 @@ export class Runner implements IComponent { [RunnerMessageCode.MONITORING, { healthy }], this.hostClient.monitorStream ); - this.monitoringMessageReplyTimeout = setTimeout(() => { - this.handleDisconnect(); + this.monitoringMessageReplyTimeout = setTimeout(async () => { + await this.handleDisconnect(); }, 500); } async handleDisconnect() { this.logger.info("Reinitializing...."); - this.premain(); + this.premain().catch((e) => { + this.logger.error("Premain error", e); + }); } async handleKillRequest(): Promise { @@ -333,8 +329,13 @@ export class Runner implements IComponent { try { await this.hostClient.init(this.instanceId); } catch (e) { + this.logger.error("hostClient init error", e); + await defer(2000); - this.premain(); + + this.premain().catch((err: any) => { + this.logger.error("Premain error", err); + }); } this.redirectOutputs(); diff --git a/packages/types/src/messages/monitor-reply.ts b/packages/types/src/messages/monitor-reply.ts new file mode 100644 index 000000000..699b35f0a --- /dev/null +++ b/packages/types/src/messages/monitor-reply.ts @@ -0,0 +1,9 @@ +import { RunnerMessageCode } from "@scramjet/symbols"; + +export type MonitoringReplyMessageData = {}; + +/** + * Message instructing Runner how often to emit monitoring messages. + * This message type is sent from CSIController. + */ +export type MonitoringReplyMessage = { msgCode: RunnerMessageCode.MONITORING_REPLY} & MonitoringReplyMessageData; diff --git a/packages/utility/src/index.ts b/packages/utility/src/index.ts index 44d9406c6..8028bc519 100644 --- a/packages/utility/src/index.ts +++ b/packages/utility/src/index.ts @@ -1,15 +1,16 @@ +export * from "./config"; +export * from "./constants"; export * from "./defer"; +export * from "./file"; export * from "./free-ports-finder"; +export * from "./keygen"; export * from "./merge"; +export * from "./normalize-url"; export * from "./promise-timeout"; -export * from "./read-streamed-json"; -export * from "./typeguards"; -export * from "./typed-emitter"; export * from "./read-json-file"; -export * from "./normalize-url"; +export * from "./read-streamed-json"; export * from "./stream-to-string"; -export * from "./config"; -export * from "./file"; -export * from "./constants"; +export * from "./typed-emitter"; +export * from "./typeguards"; export * from "./validators"; -export * from "./keygen"; +