Skip to content

Commit

Permalink
Merge pull request #900 from scramjetorg/feat/reconnect
Browse files Browse the repository at this point in the history
Runner Reconnect: Connection Process Revamp
  • Loading branch information
patuwwy authored Mar 6, 2024
2 parents ef536df + cc146bf commit 3a351ae
Show file tree
Hide file tree
Showing 44 changed files with 1,501 additions and 641 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ indent_style = space
indent_size = 4
trim_trailing_whitespace = true
insert_final_newline = true
max_line_length = 120
max_line_length = 180
end_of_line = lf
charset = utf-8

Expand Down
4 changes: 2 additions & 2 deletions .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@
"max-len": [
"warn",
{
"code": 120,
"code": 180,
"tabWidth": 4,
"comments": 120,
"comments": 180,
"ignoreComments": false,
"ignoreTrailingComments": true,
"ignoreUrls": true,
Expand Down
9 changes: 9 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@
"outFiles": [
"${workspaceFolder}/**/*.js"
]
},
{
"name": "Attach by Process ID",
"processId": "${command:PickProcess}",
"request": "attach",
"skipFiles": [
"<node_internals>/**"
],
"type": "node"
}
]
}
1 change: 1 addition & 0 deletions bdd/features/e2e/E2E-010-cli.feature
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Feature: CLI tests

@ci-api @cli
Scenario: E2E-010 TC-006 Test Sequence 'prune --force' option
Given I set config for local Hub
When I execute CLI with "seq send ../packages/checksum-sequence.tar.gz"
When I execute CLI with "seq send ../packages/csv-transform.tar.gz"
When I execute CLI with "seq list"
Expand Down
1 change: 1 addition & 0 deletions bdd/features/e2e/E2E-011-cli-topic.feature
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ This feature checks topic functionalities over CLI

@ci-topic @cli
Scenario: E2E-011 TC-003 API to Instance
# Given I set config for local Hub
When I execute CLI with "topic send avengers data/data.json" without waiting for the end
When I execute CLI with "seq send ../packages/hello-input-out.tar.gz"
When I execute CLI with "seq start - --input-topic avengers "
Expand Down
1 change: 1 addition & 0 deletions bdd/features/e2e/E2E-015-unified.feature
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Feature: Test our shiny new Python runner
Given host is running
When find and upload sequence "debug-args.tar.gz"
And instance started with arguments "foo 3"
And wait for "1000" ms
Then "output" is "{\"first_arg\":\"foo\",\"second_arg\":\"3\"}"
And host is still running

Expand Down
108 changes: 75 additions & 33 deletions bdd/step-definitions/e2e/host-steps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,23 +99,42 @@ const waitForProcessToEnd = async (pid: number) => {
}
};

const killRunner = async () => {
if (process.env.RUNTIME_ADAPTER === "kubernetes") {
// @TODO
return;
// const killRunner = async () => {
// if (process.env.RUNTIME_ADAPTER === "kubernetes") {
// // @TODO
// return;
// }

// if (process.env.RUNTIME_ADAPTER === "process" && processId) {
// try {
// process.kill(processId);
// await waitForProcessToEnd(processId);
// } catch (e) {
// console.error("Couldn't kill runner", e);
// }
// }

// if (process.env.RUNTIME_ADAPTER === "docker" && containerId) {
// await dockerode.getContainer(containerId).kill();
// }
// };

const killAllRunners = async () => {
if (process.env.RUNTIME_ADAPTER === "process") {
exec("killall runner");
}

if (process.env.RUNTIME_ADAPTER === "process" && processId) {
try {
process.kill(processId);
await waitForProcessToEnd(processId);
} catch (e) {
console.error("Couldn't kill runner", e);
}
}
if (process.env.RUNTIME_ADAPTER === "docker") {
await Promise.all(
(await dockerode.listContainers())
.map(async container => {
if (container.Labels["scramjet.instance.id"]) {
return dockerode.getContainer(container.Id).kill();
}

if (process.env.RUNTIME_ADAPTER === "docker" && containerId) {
await dockerode.getContainer(containerId).kill();
return Promise.resolve();
})
);
}
};

Expand Down Expand Up @@ -183,7 +202,20 @@ Before(() => {
streams = {};
});

After({ tags: "@runner-cleanup" }, killRunner);
After({ tags: "@runner-cleanup" }, killAllRunners);
After({}, async () => {
let insts = [];

try {
insts = await hostClient.listInstances();
} catch (_e) {
return;
}

await Promise.all(
insts.map(i => hostClient.getInstanceClient(i.id).kill({ removeImmediately: true }).catch(_e => {}))
);
});

Before({ tags: "@test-si-init" }, function() {
createDirectory("data/template_seq");
Expand Down Expand Up @@ -499,29 +531,39 @@ When("send kill message to instance", async function(this: CustomWorld) {
assert.ok(resp);
});

// eslint-disable-next-line complexity
When("get runner PID", { timeout: 31000 }, async function(this: CustomWorld) {
let success: any;
let tries = 0;

while (!success && tries < 3) {
if (process.env.RUNTIME_ADAPTER === "kubernetes") {
// @TODO
return;
}

if (process.env.RUNTIME_ADAPTER === "process") {
const res = (await this.resources.instance?.getHealth())?.processId;

if (res) {
processId = success = res;
console.log("Process is identified.", processId);
}
} else {
containerId = success = (await this.resources.instance?.getHealth())?.containerId!;
const adapter = process.env.RUNTIME_ADAPTER;

if (containerId) {
console.log("Container is identified.", containerId);
}
while (!success && tries < 3) {
const health = await this.resources.instance?.getHealth();

console.log("Health", health);

switch (adapter) {
case "kubernetes":
return;
case "docker":

containerId = success = health?.containerId!;

if (containerId) {
console.log("Container is identified.", containerId);
}
break;
case "process":
const res = health?.processId;

if (res) {
processId = success = res;
console.log("Process is identified.", processId);
}
break;
default:
break;
}

tries++;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"lint": "TIMING=1 NODE_OPTIONS=\"--max-old-space-size=2048\" scripts/run-script.js -w modules -j 4 -e \"! ls .eslintrc* > /dev/null || npx eslint ./ --ext .ts --ext .js --cache --cache-strategy=content\"",
"lint:uncached": "find . -name .eslintcache -delete && yarn lint",
"start": "DEVELOPMENT=true node dist/sth/bin/hub.js",
"start:dev": "DEVELOPMENT=true ts-node packages/sth/src/bin/hub.ts",
"start:dev": "ts-node packages/sth/src/bin/hub.ts",
"start:dev:cli": "DEVELOPMENT=true ts-node packages/cli/src/bin/index.ts",
"install:clean": "yarn clean && yarn clean:modules && yarn install",
"postinstall": "scripts/run-script.js -v -w modules install:deps",
Expand Down
64 changes: 45 additions & 19 deletions packages/adapters/src/docker-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
RunnerContainerConfiguration,
InstanceLimits,
STHConfiguration,
SequenceInfo,
} from "@scramjet/types";
import path from "path";
import { DockerodeDockerHelper } from "./dockerode-docker-helper";
Expand All @@ -21,6 +22,7 @@ import { STH_DOCKER_NETWORK, isHostSpawnedInDockerContainer, getHostname } from
import { ObjLogger } from "@scramjet/obj-logger";
import { getRunnerEnvEntries } from "./get-runner-env";
import { Readable } from "stream";
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";

/**
* Adapter for running Instance by Runner executed in Docker container.
Expand Down Expand Up @@ -114,22 +116,22 @@ IComponent {
* @returns {Promise<MonitoringMessageData>} Promise resolved with container statistics.
*/
async stats(msg: MonitoringMessageData): Promise<MonitoringMessageData> {
if (this.resources.containerId) {
const stats = await this.dockerHelper.stats(this.resources.containerId)!;
this.logger.debug("STATS. Container id:", this.resources.containerId);

return {
cpuTotalUsage: stats.cpu_stats?.cpu_usage?.total_usage,
healthy: msg.healthy,
limit: stats.memory_stats?.limit,
memoryMaxUsage: stats.memory_stats?.max_usage,
memoryUsage: stats.memory_stats?.usage,
networkRx: stats.networks?.eth0?.rx_bytes,
networkTx: stats.networks?.eth0?.tx_bytes,
containerId: this.resources.containerId
};
}
this.resources.containerId ||= await this.dockerHelper.getContainerIdByLabel("scramjet.instance.id", this.id);

const stats = await this.dockerHelper.stats(this.resources.containerId)!;

return msg;
return {
cpuTotalUsage: stats.cpu_stats?.cpu_usage?.total_usage,
healthy: msg.healthy,
limit: stats.memory_stats?.limit,
memoryMaxUsage: stats.memory_stats?.max_usage,
memoryUsage: stats.memory_stats?.usage,
networkRx: stats.networks?.eth0?.rx_bytes,
networkTx: stats.networks?.eth0?.tx_bytes,
containerId: this.resources.containerId
};
}

private async getNetworkSetup(): Promise<{ network: string, host: string }> {
Expand Down Expand Up @@ -169,8 +171,21 @@ IComponent {
};
}

async setRunner(system: Record<string, string>): Promise<void> {
const containerId = await this.dockerHelper.getContainerIdByLabel("scramjet.instance.id", system.id);

this.logger.debug("Container id restored", containerId);

this.resources.containerId = containerId;
}

async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<ExitCode> {
await this.dispatch(config, instancesServerPort, instanceId, sequenceInfo, payload);
return this.waitUntilExit(config, instanceId, sequenceInfo);
}

// eslint-disable-next-line complexity
async run(config: InstanceConfig, instancesServerPort: number, instanceId: string): Promise<ExitCode> {
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<ExitCode> {
if (!(config.type === "docker" && "container" in config)) {
throw new Error("Docker instance adapter run with invalid runner config");
}
Expand All @@ -193,7 +208,9 @@ IComponent {
instancesServerPort,
instancesServerHost: networkSetup.host,
instanceId,
pipesPath: ""
pipesPath: "",
sequenceInfo,
payload
}, {
...this.sthConfig.runnerEnvs
}).map(([k, v]) => `${k}=${v}`);
Expand All @@ -207,7 +224,8 @@ IComponent {
{ mountPoint: config.sequenceDir, volume: config.id, writeable: false }
],
labels: {
"scramjet.sequence.name": config.name
"scramjet.sequence.name": config.name,
"scramjet.instance.id": instanceId
},
ports: this.resources.ports,
publishAllPorts: true,
Expand All @@ -220,12 +238,20 @@ IComponent {

this.crashLogStreams = Promise.all(([streams.stdout, streams.stderr] as Readable[]).map(streamToString));

this.resources.containerId = containerId;
this.resources.containerId = containerId; // doesnt matter

this.logger.trace("Container is running", containerId);

return 0;
}

async waitUntilExit(config: InstanceConfig, instanceId:string, _sequenceInfo: SequenceInfo): Promise<number> {
try {
const { statusCode } = await this.dockerHelper.wait(containerId);
this.resources.containerId = this.resources.containerId || await this.dockerHelper.getContainerIdByLabel("scramjet.instance.id", instanceId);

this.logger.debug("Wait for container exit...", this.resources.containerId);

const { statusCode } = await this.dockerHelper.wait(this.resources.containerId);

this.logger.debug("Container exited", statusCode);

Expand Down
6 changes: 6 additions & 0 deletions packages/adapters/src/dockerode-docker-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ export class DockerodeDockerHelper implements IDockerHelper {
return id;
}

async getContainerIdByLabel(label: string, value: string): Promise<DockerContainer> {
const result = await this.dockerode.listContainers({ label: `${label}=${value}` });

return result[0]!.Id;
}

/**
* Start container with provided id.
*
Expand Down
4 changes: 3 additions & 1 deletion packages/adapters/src/get-runner-env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { RunnerEnvConfig, RunnerEnvironmentVariables } from "./types";
* @returns env vars
*/
export function getRunnerEnvVariables({
sequencePath, instancesServerPort, instancesServerHost, instanceId, pipesPath, paths = "posix"
sequencePath, instancesServerPort, instancesServerHost, instanceId, pipesPath, paths = "posix", sequenceInfo, payload
}: RunnerEnvConfig, extra: Record<string, string> = {}): RunnerEnvironmentVariables {
const join = path[paths].join;

Expand All @@ -23,6 +23,8 @@ export function getRunnerEnvVariables({
INSTANCE_ID: instanceId,
PIPES_LOCATION: pipesPath,
CRASH_LOG: join(pipesPath, "crash_log"),
SEQUENCE_INFO: JSON.stringify(sequenceInfo),
RUNNER_CONNECT_INFO: JSON.stringify(payload),
...extra
};
}
Expand Down
Loading

0 comments on commit 3a351ae

Please sign in to comment.