From 752621168172445a6fde5f7faba6414c2766dd8b Mon Sep 17 00:00:00 2001 From: patuwwy Date: Mon, 18 Sep 2023 16:33:15 +0000 Subject: [PATCH] Reconnect. k8s adapter run --- .../src/kubernetes-instance-adapter.ts | 32 +++++++++---------- .../adapters/src/process-instance-adapter.ts | 9 +++--- packages/model/src/stream-handler.ts | 1 + packages/runner/src/runner.ts | 15 +++++---- 4 files changed, 30 insertions(+), 27 deletions(-) diff --git a/packages/adapters/src/kubernetes-instance-adapter.ts b/packages/adapters/src/kubernetes-instance-adapter.ts index 05e13125d..39add3c03 100644 --- a/packages/adapters/src/kubernetes-instance-adapter.ts +++ b/packages/adapters/src/kubernetes-instance-adapter.ts @@ -39,6 +39,8 @@ IComponent { private adapterConfig: K8SAdapterConfiguration; private _limits?: InstanceLimits = {}; + stdErrorStream?: PassThrough; + get limits() { return this._limits || {} as InstanceLimits; } private set limits(value: InstanceLimits) { this._limits = value; } @@ -89,17 +91,13 @@ IComponent { } }; } - async dispatch(_config: InstanceConfig, _instancesServerPort: number, _instanceId: string, _sequenceInfo: SequenceInfo, _payload: RunnerConnectInfo): Promise { - throw Error("not implemented"); - } - - async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo): Promise { + async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, _payload: RunnerConnectInfo): Promise { if (config.type !== "kubernetes") { throw new Error(`Invalid config type for kubernetes adapter: ${config.type}`); } if (this.adapterConfig.quotaName && await this.kubeClient.isPodsLimitReached(this.adapterConfig.quotaName)) { - return RunnerExitCode.PODS_LIMIT_REACHED; + throw Error(RunnerExitCode.PODS_LIMIT_REACHED.toString()); } this.limits = config.limits; @@ -154,25 +152,27 @@ IComponent { // This means runner pod was unable to start. So it went from "Pending" to "Failed" state directly. // Return 1 which is Linux exit code for "General Error" since we are not able // to determine what happened exactly. - return startPodStatus.code || 137; + return; } this.logger.debug("Copy sequence files to Runner"); const compressedStream = createReadStream(path.join(config.sequenceDir, "compressed.tar.gz")); - const stdErrorStream = new PassThrough(); - stdErrorStream.on("data", (data) => { this.logger.error("POD stderr", data.toString()); }); + this.stdErrorStream = new PassThrough(); + this.stdErrorStream.on("data", (data) => { this.logger.error("POD stderr", data.toString()); }); - await this.kubeClient.exec(runnerName, runnerName, ["unpack.sh", "/package"], process.stdout, stdErrorStream, compressedStream, 2); + await this.kubeClient.exec(runnerName, runnerName, ["unpack.sh", "/package"], process.stdout, this.stdErrorStream, compressedStream, 2); + } - const exitPodStatus = await this.kubeClient.waitForPodStatus(runnerName, ["Succeeded", "Failed", "Unknown"]); + async waitUntilExit(_config: InstanceConfig, _instanceId: string, _sequenceInfo: SequenceInfo): Promise { + const exitPodStatus = await this.kubeClient.waitForPodStatus(this._runnerName!, ["Succeeded", "Failed", "Unknown"]); - stdErrorStream.end(); + this.stdErrorStream?.end(); if (exitPodStatus.status !== "Succeeded") { this.logger.error("Runner stopped incorrectly", exitPodStatus); - this.logger.error("Container failure reason is: ", await this.kubeClient.getPodTerminatedContainerReason(runnerName)); + this.logger.error("Container failure reason is: ", await this.kubeClient.getPodTerminatedContainerReason(this._runnerName!)); return exitPodStatus.code || 137; } @@ -185,9 +185,9 @@ IComponent { return 0; } - async waitUntilExit(_config: InstanceConfig, _instanceId:string, _sequenceInfo: SequenceInfo): Promise { - this.logger.debug("WaitUntilExit", [_config, _instanceId, _sequenceInfo]); - throw Error("Not implemented"); + async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { + await this.dispatch(config, instancesServerPort, instanceId, sequenceInfo, payload); + return this.waitUntilExit(config, instanceId, sequenceInfo); } async cleanup(): Promise { diff --git a/packages/adapters/src/process-instance-adapter.ts b/packages/adapters/src/process-instance-adapter.ts index 05f6849d7..45dfb6eb3 100644 --- a/packages/adapters/src/process-instance-adapter.ts +++ b/packages/adapters/src/process-instance-adapter.ts @@ -60,7 +60,10 @@ class ProcessInstanceAdapter implements if (!runnerProcess) { // Runner process not initialized yet - return msg; + return { + ...msg, + processId: this.processPID + }; } return { @@ -170,10 +173,6 @@ class ProcessInstanceAdapter implements this.logger.trace("Runner process is running", runnerProcess.pid); - // @todo exit here with pid - // then promise waiting for process with given pid finish (endOfRun) - // how to connect to a process knowing id of it? - this.runnerProcess = runnerProcess; } diff --git a/packages/model/src/stream-handler.ts b/packages/model/src/stream-handler.ts index 2b5d258e7..e4a30d2c5 100644 --- a/packages/model/src/stream-handler.ts +++ b/packages/model/src/stream-handler.ts @@ -163,6 +163,7 @@ export class CommunicationHandler implements ICommunicationHandler { this.logger.error("pipeMessageStreams called twice"); throw new Error("pipeMessageStreams called twice"); } + this._piped = true; if (!this.downstreams || !this.upstreams) { diff --git a/packages/runner/src/runner.ts b/packages/runner/src/runner.ts index c588afd63..a9e13adc2 100644 --- a/packages/runner/src/runner.ts +++ b/packages/runner/src/runner.ts @@ -39,6 +39,15 @@ import { mapToInputDataStream, readInputStreamHeaders } from "./input-stream"; import { MessageUtils } from "./message-utils"; import { RunnerAppContext, RunnerProxy } from "./runner-app-context"; +process.once("beforeExit", (code) => { + const filepath = `/tmp/runner-${process.pid.toString()}`; + + writeFileSync(filepath, code.toString()); + + // eslint-disable-next-line no-console + console.log("Runner exit"); +}); + // async function flushStream(source: Readable | undefined, target: Writable) { // if (!source) return; @@ -163,12 +172,6 @@ export class Runner implements IComponent { throw e; }); - - process.on("beforeExit", (code) => { - const filepath = `/tmp/runner-${process.pid.toString()}`; - - writeFileSync(filepath, code.toString()); - }); } get context(): RunnerAppContext {