Skip to content

Commit

Permalink
Reconnect. Fix lint issues
Browse files Browse the repository at this point in the history
  • Loading branch information
patuwwy committed Sep 25, 2023
1 parent b7a7a70 commit 08b71fe
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 113 deletions.
16 changes: 8 additions & 8 deletions packages/adapters/src/kubernetes-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import {
STHConfiguration,
} from "@scramjet/types";

import path from "path";
import { ObjLogger } from "@scramjet/obj-logger";
import { RunnerExitCode } from "@scramjet/symbols";
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";
import { createReadStream } from "fs";
import path from "path";
import { PassThrough } from "stream";
import { getRunnerEnvEntries } from "./get-runner-env";
import { KubernetesClientAdapter } from "./kubernetes-client-adapter";
import { adapterConfigDecoder } from "./kubernetes-config-decoder";
import { getRunnerEnvEntries } from "./get-runner-env";
import { PassThrough } from "stream";
import { RunnerExitCode } from "@scramjet/symbols";
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";

/**
* Adapter for running Instance by Runner executed in separate process.
Expand Down Expand Up @@ -89,7 +89,7 @@ IComponent {
}
};
}
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<void> {
async dispatch(_config: InstanceConfig, _instancesServerPort: number, _instanceId: string, _sequenceInfo: SequenceInfo, _payload: RunnerConnectInfo): Promise<void> {
throw Error("not implemented");
}

Expand Down Expand Up @@ -185,11 +185,11 @@ IComponent {
return 0;
}

async waitUntilExit(config: InstanceConfig, instanceId:string, _sequenceInfo: SequenceInfo): Promise<number> {
async waitUntilExit(_config: InstanceConfig, _instanceId:string, _sequenceInfo: SequenceInfo): Promise<number> {
this.logger.debug("WaitUntilExit", [_config, _instanceId, _sequenceInfo]);
throw Error("Not implemented");
}


async cleanup(): Promise<void> {
await this.remove(this.adapterConfig.timeout);
}
Expand Down
20 changes: 10 additions & 10 deletions packages/adapters/src/process-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,10 @@ class ProcessInstanceAdapter implements
getRunnerInfo(): RunnerConnectInfo["system"] {
return {
processPID: this.processPID.toString()
}
};
}

async waitUntilExit(config: InstanceConfig, instanceId: string, _sequenceInfo: SequenceInfo): Promise<ExitCode> {
async waitUntilExit(_config: InstanceConfig, _instanceId: string, _sequenceInfo: SequenceInfo): Promise<ExitCode> {
if (this.runnerProcess) {
const [statusCode, signal] = await new Promise<[number | null, NodeJS.Signals | null]>(
(res) => this.runnerProcess?.on("exit", (code, sig) => res([code, sig]))
Expand All @@ -207,34 +207,34 @@ class ProcessInstanceAdapter implements

// When no process reference Wait for file created by runner
return new Promise<ExitCode>((res, reject) => {
const interval = setInterval(async() => {
const interval = setInterval(async () => {
if (this.processPID < 1) return;

const filePath = `/tmp/runner-${this.processPID}`;

try {
await access(filePath, constants.F_OK)
await access(filePath, constants.F_OK);

clearInterval(interval);

const data = await readFile(filePath, 'utf8').catch((readErr) => {
const data = await readFile(filePath, "utf8").catch((readErr) => {
this.logger.error(`Cant' read runner exit code from: ${readErr}`);
reject(readErr);
return;
})
});

this.logger.debug("exitCode saved to file by runner:", data, filePath);

rm(filePath).then(() => {
this.logger.debug("File removed");
}, (err) => {
this.logger.error("Can't remove exitcode file");
})
}, (err: any) => {
this.logger.error("Can't remove exitcode file", err);
});

res(parseInt(data!, 10));
} catch (err) {
/** file not exists */
};
}
}, 1000);
});
}
Expand Down
50 changes: 25 additions & 25 deletions packages/host/src/lib/csi-controller.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,49 @@
import {
AppError,
CSIControllerError,
HostError,
InstanceAdapterError,
MessageUtilities
} from "@scramjet/model";
import { development } from "@scramjet/sth-config";
import { CommunicationChannel as CC, RunnerExitCode, RunnerMessageCode } from "@scramjet/symbols";
import {
APIRoute,
AppConfig,
DownstreamStreamsConfig,
EncodedMessage,
HandshakeAcknowledgeMessage,
ParsedMessage,
PassThroughStreamsConfig,
ReadableStream,
SequenceInfo,
WritableStream,
HostProxy,
ICommunicationHandler,
ILifeCycleAdapterRun,
MessageDataType,
IObjectLogger,
STHRestAPI,
STHConfiguration,
InstanceLimits,
InstanceStats,
InstanceStatus,
IObjectLogger,
MessageDataType,
MonitoringMessageData,
InstanceStats,
OpResponse,
ParsedMessage,
PassThroughStreamsConfig,
ReadableStream,
SequenceInfo,
STHConfiguration,
STHRestAPI,
StopSequenceMessageData,
HostProxy,
ICommunicationHandler
WritableStream
} from "@scramjet/types";
import {
AppError,
CSIControllerError,
HostError,
MessageUtilities,
InstanceAdapterError
} from "@scramjet/model";
import { CommunicationChannel as CC, RunnerExitCode, RunnerMessageCode } from "@scramjet/symbols";
import { Duplex, PassThrough, Readable } from "stream";
import { development } from "@scramjet/sth-config";

import { DataStream } from "scramjet";
import { DuplexStream, getRouter } from "@scramjet/api-server";
import { EventEmitter, once } from "events";
import { ServerResponse } from "http";
import { DuplexStream, getRouter } from "@scramjet/api-server";
import { DataStream } from "scramjet";

import { getInstanceAdapter } from "@scramjet/adapters";
import { cancellableDefer, CancellablePromise, defer, promiseTimeout, TypedEmitter } from "@scramjet/utility";
import { ObjLogger } from "@scramjet/obj-logger";
import { ReasonPhrases } from "http-status-codes";
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";
import { cancellableDefer, CancellablePromise, defer, promiseTimeout, TypedEmitter } from "@scramjet/utility";
import { ReasonPhrases } from "http-status-codes";

/**
* @TODO: Runner exits after 10secs and k8s client checks status every 500ms so we need to give it some time
Expand Down
25 changes: 18 additions & 7 deletions packages/host/src/lib/csi-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import { StartSequencePayload } from "@scramjet/types/src/rest-api-sth";
import { TypedEmitter } from "@scramjet/utility";
import { CSIController } from "./csi-controller";
import { InstanceStore } from "./instance-store";
import { SocketServer } from "./socket-server";
import SequenceStore from "./sequenceStore";
import { SocketServer } from "./socket-server";

type errorEventData = {id:string, err: any }
type endEventData = {id:string, code:number }
Expand Down Expand Up @@ -43,7 +43,7 @@ export class CSIDispatcher extends TypedEmitter<Events> {
communicationHandler: ICommunicationHandler,
config: STHConfiguration,
instanceProxy: HostProxy) {
sequenceInfo.instances = sequenceInfo.instances || new Set();
sequenceInfo.instances = sequenceInfo.instances || [];

const csiController = new CSIController({ id, sequenceInfo, payload }, communicationHandler, config, instanceProxy, this.STHConfig.runtimeAdapter);

Expand All @@ -58,10 +58,15 @@ export class CSIDispatcher extends TypedEmitter<Events> {
this.emit("error", { id, err });
});

// eslint-disable-next-line complexity
csiController.on("pang", async (data) => {
this.logger.trace("PANG received", data);

if (data.requires && !csiController.inputRouted) {
if ((data.requires || data.provides) && !data.contentType) {
this.logger.warn("Missing topic content-type");
}

if (data.requires && !csiController.inputRouted && data.contentType) {
this.logger.trace("Routing Sequence input to topic", data.requires);

// await this.serviceDiscovery.routeTopicToStream(
Expand All @@ -76,7 +81,7 @@ export class CSIDispatcher extends TypedEmitter<Events> {
// });
}

if (data.provides && !csiController.outputRouted) {
if (data.provides && !csiController.outputRouted && data.contentType) {
this.logger.trace("Routing Sequence output to topic", data.provides);
// await this.serviceDiscovery.routeStreamToTopic(
// csiController.getOutputStream(),
Expand Down Expand Up @@ -108,7 +113,9 @@ export class CSIDispatcher extends TypedEmitter<Events> {

delete InstanceStore[csiController.id];

sequenceInfo.instances.filter(a => a !== id);
sequenceInfo.instances = sequenceInfo.instances.filter(item => {
return item !== id;
});

// await this.cpmConnector?.sendInstanceInfo({
// id: csiController.id,
Expand All @@ -130,15 +137,19 @@ export class CSIDispatcher extends TypedEmitter<Events> {

// this.auditor.auditInstance(id, InstanceMessageCode.INSTANCE_ENDED);
// this.pushTelemetry("Instance ended", {
// executionTime: csiController.info.ended && csiController.info.started ? ((csiController.info.ended?.getTime() - csiController.info.started.getTime()) / 1000).toString() : "-1",
// executionTime: csiController.info.ended && csiController.info.started
// ? ((csiController.info.ended?.getTime() - csiController.info.started.getTime()) / 1000).toString()
// : "-1",
// id: csiController.id,
// code: code.toString(),
// seqId: csiController.sequence.id
// });
this.emit("terminated", { id, code });
});

csiController.start().then(() => {}, () => {});
csiController.start().catch(() => {
//@TODO: handle start error;
});

this.logger.trace("csiController started", id);

Expand Down
41 changes: 23 additions & 18 deletions packages/host/src/lib/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import {
OpResponse,
ParsedMessage,
PublicSTHConfiguration,
SequenceInfo,
StartSequenceDTO,
STHConfiguration,
STHRestAPI
STHRestAPI,
SequenceInfo,
StartSequenceDTO
} from "@scramjet/types";

import { getSequenceAdapter, initializeRuntimeAdapters } from "@scramjet/adapters";
Expand All @@ -42,17 +42,17 @@ import { readFileSync } from "fs";
import { cpus, totalmem } from "os";
import { DataStream } from "scramjet";
import { inspect } from "util";
import { Auditor } from "./auditor";

import { AuditedRequest, Auditor } from "./auditor";
import { auditMiddleware, logger as auditMiddlewareLogger } from "./middlewares/audit";
import { corsMiddleware } from "./middlewares/cors";
import { optionsMiddleware } from "./middlewares/options";
import { S3Client } from "./s3-client";
import { ServiceDiscovery } from "./serviceDiscovery/sd-adapter";
import { SocketServer } from "./socket-server";

import SequenceStore from "./sequenceStore";
import TopicRouter from "./serviceDiscovery/topicRouter";
import { GetSequenceResponse } from "@scramjet/types/src/rest-api-sth";
import { loadModule, logger as loadModuleLogger } from "@scramjet/module-loader";
import { CSIDispatcher } from "./csi-dispatcher";

Expand Down Expand Up @@ -185,6 +185,7 @@ export class Host implements IComponent {
constructor(apiServer: APIExpose, socketServer: SocketServer, sthConfig: STHConfiguration) {
this.config = sthConfig;
this.publicConfig = ConfigService.getConfigInfo(sthConfig);
this.sequenceStore = new SequenceStore();

this.logger = new ObjLogger(
this,
Expand Down Expand Up @@ -672,7 +673,7 @@ export class Host implements IComponent {

this.logger.trace("Sequence removed:", id);
// eslint-disable-next-line max-len
await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_DELETED, sequenceInfo as unknown as GetSequenceResponse);
await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_DELETED, sequenceInfo as unknown as STHRestAPI.GetSequenceResponse);
this.auditor.auditSequence(id, SequenceMessageCode.SEQUENCE_DELETED);

return {
Expand Down Expand Up @@ -805,7 +806,7 @@ export class Host implements IComponent {
this.logger.trace(`Sequence identified: ${config.id}`);

// eslint-disable-next-line max-len
await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_CREATED, config as unknown as GetSequenceResponse);
await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_CREATED, config as unknown as STHRestAPI.GetSequenceResponse);

this.auditor.auditSequence(id, SequenceMessageCode.SEQUENCE_CREATED);
this.pushTelemetry("Sequence uploaded", { language: config.language.toLowerCase(), seqId: id });
Expand Down Expand Up @@ -955,21 +956,25 @@ export class Host implements IComponent {
const runner = await this.csiDispatcher.startRunner(sequence, payload);

// @todo more info

// await this.cpmConnector?.sendInstanceInfo({
// id: runner.id,
// appConfig: payload.appConfig,
// args: payload.args,
// sequence: sequence.id,
// // ports: runner.info.ports
// // created: csic.info.created,
// // started: csic.info.started,
// // status: csic.status,
// appConfig: runner.appConfig,
// args: runner.args,
// sequence: (info => {
// // eslint-disable-next-line @typescript-eslint/no-unused-vars
// const { instances, ...rest } = info;

// return rest;
// })(sequence),
// ports: runner.info.ports,
// created: csic.info.created,
// started: csic.info.started,
// status: csic.status,
// }, InstanceMessageCode.INSTANCE_STARTED);

//this.logger.debug("Instance limits", runner.limits);
//this.auditor.auditInstanceStart(runner.id, req as AuditedRequest, runner.limits);
//this.pushTelemetry("Instance started", { id: runner.id, language: runner.sequence.config.language, seqId: runner.sequence.id });
this.logger.debug("Instance limits", runner.limits);
this.auditor.auditInstanceStart(runner.id, req as AuditedRequest, runner.limits);
this.pushTelemetry("Instance started", { id: runner.id, language: runner.sequence.config.language, seqId: runner.sequence.id });

// csic.on("hourChime", () => {
// this.pushTelemetry("Instance hour chime", { id: csic.id, language: csic.sequence.config.language, seqId: csic.sequence.id });
Expand Down
4 changes: 2 additions & 2 deletions packages/model/src/stream-handler.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { ObjLogger } from "@scramjet/obj-logger";
import { CommunicationChannel as CC, CPMMessageCode, RunnerMessageCode } from "@scramjet/symbols";
import {
ControlMessageCode,
Expand All @@ -18,7 +19,6 @@ import {
UpstreamStreamsConfig,
WritableStream
} from "@scramjet/types";
import { ObjLogger } from "@scramjet/obj-logger";

import { DataStream, StringStream } from "scramjet";
import { PassThrough, Readable, Writable } from "stream";
Expand Down Expand Up @@ -155,7 +155,7 @@ export class CommunicationHandler implements ICommunicationHandler {
this.addMonitoringHandler(RunnerMessageCode.PING, (msg) => {
res(msg);
});
})
});
}

pipeMessageStreams() {
Expand Down
1 change: 0 additions & 1 deletion packages/runner/src/bin/start-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ try {
process.exit(RunnerExitCode.INVALID_ENV_VARS);
}


try {
if (!sequenceInfo) throw new Error("Connection JSON is required.");
connectInfo = JSON.parse(sequenceInfo);
Expand Down
Loading

0 comments on commit 08b71fe

Please sign in to comment.