Skip to content

Commit

Permalink
Reconnect. k8s adapter run
Browse files Browse the repository at this point in the history
  • Loading branch information
patuwwy committed Sep 25, 2023
1 parent 08b71fe commit 7526211
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 27 deletions.
32 changes: 16 additions & 16 deletions packages/adapters/src/kubernetes-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ IComponent {
private adapterConfig: K8SAdapterConfiguration;
private _limits?: InstanceLimits = {};

stdErrorStream?: PassThrough;

get limits() { return this._limits || {} as InstanceLimits; }
private set limits(value: InstanceLimits) { this._limits = value; }

Expand Down Expand Up @@ -89,17 +91,13 @@ IComponent {
}
};
}
async dispatch(_config: InstanceConfig, _instancesServerPort: number, _instanceId: string, _sequenceInfo: SequenceInfo, _payload: RunnerConnectInfo): Promise<void> {
throw Error("not implemented");
}

async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo): Promise<ExitCode> {
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, _payload: RunnerConnectInfo): Promise<void> {
if (config.type !== "kubernetes") {
throw new Error(`Invalid config type for kubernetes adapter: ${config.type}`);
}

if (this.adapterConfig.quotaName && await this.kubeClient.isPodsLimitReached(this.adapterConfig.quotaName)) {
return RunnerExitCode.PODS_LIMIT_REACHED;
throw Error(RunnerExitCode.PODS_LIMIT_REACHED.toString());
}

this.limits = config.limits;
Expand Down Expand Up @@ -154,25 +152,27 @@ IComponent {
// This means runner pod was unable to start. So it went from "Pending" to "Failed" state directly.
// Return 1 which is Linux exit code for "General Error" since we are not able
// to determine what happened exactly.
return startPodStatus.code || 137;
return;
}

this.logger.debug("Copy sequence files to Runner");

const compressedStream = createReadStream(path.join(config.sequenceDir, "compressed.tar.gz"));
const stdErrorStream = new PassThrough();

stdErrorStream.on("data", (data) => { this.logger.error("POD stderr", data.toString()); });
this.stdErrorStream = new PassThrough();
this.stdErrorStream.on("data", (data) => { this.logger.error("POD stderr", data.toString()); });

await this.kubeClient.exec(runnerName, runnerName, ["unpack.sh", "/package"], process.stdout, stdErrorStream, compressedStream, 2);
await this.kubeClient.exec(runnerName, runnerName, ["unpack.sh", "/package"], process.stdout, this.stdErrorStream, compressedStream, 2);
}

const exitPodStatus = await this.kubeClient.waitForPodStatus(runnerName, ["Succeeded", "Failed", "Unknown"]);
async waitUntilExit(_config: InstanceConfig, _instanceId: string, _sequenceInfo: SequenceInfo): Promise<ExitCode> {
const exitPodStatus = await this.kubeClient.waitForPodStatus(this._runnerName!, ["Succeeded", "Failed", "Unknown"]);

stdErrorStream.end();
this.stdErrorStream?.end();

if (exitPodStatus.status !== "Succeeded") {
this.logger.error("Runner stopped incorrectly", exitPodStatus);
this.logger.error("Container failure reason is: ", await this.kubeClient.getPodTerminatedContainerReason(runnerName));
this.logger.error("Container failure reason is: ", await this.kubeClient.getPodTerminatedContainerReason(this._runnerName!));

return exitPodStatus.code || 137;
}
Expand All @@ -185,9 +185,9 @@ IComponent {
return 0;
}

async waitUntilExit(_config: InstanceConfig, _instanceId:string, _sequenceInfo: SequenceInfo): Promise<number> {
this.logger.debug("WaitUntilExit", [_config, _instanceId, _sequenceInfo]);
throw Error("Not implemented");
async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<ExitCode> {
await this.dispatch(config, instancesServerPort, instanceId, sequenceInfo, payload);
return this.waitUntilExit(config, instanceId, sequenceInfo);
}

async cleanup(): Promise<void> {
Expand Down
9 changes: 4 additions & 5 deletions packages/adapters/src/process-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ class ProcessInstanceAdapter implements

if (!runnerProcess) {
// Runner process not initialized yet
return msg;
return {
...msg,
processId: this.processPID
};
}

return {
Expand Down Expand Up @@ -170,10 +173,6 @@ class ProcessInstanceAdapter implements

this.logger.trace("Runner process is running", runnerProcess.pid);

// @todo exit here with pid
// then promise waiting for process with given pid finish (endOfRun)
// how to connect to a process knowing id of it?

this.runnerProcess = runnerProcess;
}

Expand Down
1 change: 1 addition & 0 deletions packages/model/src/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ export class CommunicationHandler implements ICommunicationHandler {
this.logger.error("pipeMessageStreams called twice");
throw new Error("pipeMessageStreams called twice");
}

this._piped = true;

if (!this.downstreams || !this.upstreams) {
Expand Down
15 changes: 9 additions & 6 deletions packages/runner/src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ import { mapToInputDataStream, readInputStreamHeaders } from "./input-stream";
import { MessageUtils } from "./message-utils";
import { RunnerAppContext, RunnerProxy } from "./runner-app-context";

process.once("beforeExit", (code) => {
const filepath = `/tmp/runner-${process.pid.toString()}`;

writeFileSync(filepath, code.toString());

// eslint-disable-next-line no-console
console.log("Runner exit");
});

// async function flushStream(source: Readable | undefined, target: Writable) {
// if (!source) return;

Expand Down Expand Up @@ -163,12 +172,6 @@ export class Runner<X extends AppConfig> implements IComponent {

throw e;
});

process.on("beforeExit", (code) => {
const filepath = `/tmp/runner-${process.pid.toString()}`;

writeFileSync(filepath, code.toString());
});
}

get context(): RunnerAppContext<X, any> {
Expand Down

0 comments on commit 7526211

Please sign in to comment.