Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runner Reconnect: Connection Process Revamp #900

Merged
merged 68 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
ceb8048
Todo comments and changes for review
MichalCz Jul 3, 2023
321f78d
Add needed info to Ping Interface
MichalCz Jul 3, 2023
05eb95e
Add runner exception
MichalCz Jul 3, 2023
1f3871e
csi dispatcher
piotrek6641 Jul 11, 2023
416580b
Start instance in dispatcher
piotrek6641 Jul 12, 2023
799ecb7
InstanceAdapter..dispatch method
MichalCz Jul 13, 2023
a532cd3
Refactoring runner connect start
MichalCz Jul 13, 2023
21118b9
pushing payload to runner/ docker run&disptach
piotrek6641 Aug 2, 2023
d70edba
Reconnect. Fix starting instance
patuwwy Sep 12, 2023
bd0b54f
Reconnect start seq with process adapter [wip]
patuwwy Sep 13, 2023
b1f37d3
Reconnect instance on STH restart (wip)
patuwwy Sep 13, 2023
00455b0
Reconnect. Write runner exitcode to file. read in process-adapter
patuwwy Sep 15, 2023
0b93064
Starting Instance wait for CSI create
patuwwy Sep 17, 2023
23735c4
Reconnect. Fix lint issues
patuwwy Sep 18, 2023
8cbbf0e
Reconnect. k8s adapter run
patuwwy Sep 18, 2023
e569f3e
Reconnect. logs. [wip]
patuwwy Sep 25, 2023
97ff56a
fix runner appConfig WIP
piotrek6641 Sep 25, 2023
91f0572
Restore piping instance
patuwwy Sep 25, 2023
6a4b319
reconnect. runner pid in docker, k8s [TEST]
patuwwy Sep 26, 2023
23cda86
Kill runner
patuwwy Sep 26, 2023
2cb372a
Fixing missing containerid
patuwwy Sep 29, 2023
66ee06d
Get container id on stats when missing
patuwwy Sep 29, 2023
bf71f9a
Kill all docker runners
patuwwy Sep 29, 2023
bf98b4c
Disable reconect
patuwwy Oct 2, 2023
d226c92
prune after scenario
patuwwy Oct 2, 2023
b9b8bfd
Kill runner will
patuwwy Oct 2, 2023
7ae246e
Fix overwriting topics inout
patuwwy Oct 2, 2023
a1ec49a
Reconnect. Cleanup
patuwwy Oct 5, 2023
5dff602
Reenable reconnect
patuwwy Oct 6, 2023
340df76
Connect py runner
patuwwy Oct 10, 2023
6970aa7
Store args from ping
patuwwy Oct 10, 2023
0b9a3a2
[WIP] Py runner reconnect
piotrek6641 Oct 11, 2023
369aa68
update python magic values
piotrek6641 Oct 11, 2023
019ad27
[WIP] py runner reconnect
piotrek6641 Oct 12, 2023
296ff21
Reconnect pyRunner. [no data after reconnecting
patuwwy Oct 12, 2023
a344150
Merge pull request #959 from scramjetorg/py-runner-reconnect
patuwwy Oct 13, 2023
a7907d2
Remove clock/ cleanup
piotrek6641 Oct 16, 2023
6369145
PyRunner. Reuseable redirecting inout
patuwwy Oct 16, 2023
3600784
Restore log stream
patuwwy Oct 16, 2023
88e8b01
console logs cleanup
piotrek6641 Oct 16, 2023
eb48a7a
Reconnect inout [wip].
patuwwy Oct 17, 2023
c782470
Merge branch 'devel' into feat/reconnect
patuwwy Jan 8, 2024
2df066f
Fix merge
patuwwy Jan 8, 2024
9ce523b
Fix reconnecting js runner
patuwwy Jan 9, 2024
742069d
Fix reconnecting docker
patuwwy Jan 10, 2024
2cab29d
Revert py runner to connect new way but with reconnect
patuwwy Jan 12, 2024
e95e921
Merge branch 'devel' into feat/reconnect
patuwwy Jan 12, 2024
8838905
Fix after devel merge
patuwwy Jan 12, 2024
9d2fabf
Merge branch 'devel' into feat/reconnect
patuwwy Jan 16, 2024
e6ef9da
Send Monitoring reply before stats
patuwwy Jan 16, 2024
21601ef
Restore 'created' field in InstanceInfo
patuwwy Jan 17, 2024
b4d0c84
Warning on unsuccesful connection. Fix status after reconnect
patuwwy Jan 18, 2024
ce5452b
Add logs and missing payload in k8s IA
patuwwy Jan 19, 2024
bce56bf
Init kubeClient on reconnect. Fix execTime on error
patuwwy Jan 19, 2024
15eda16
:resend PANGs after reconnect
patuwwy Jan 22, 2024
aa2e5d7
Send InstanceStatus in Pang
patuwwy Jan 22, 2024
cbc770c
Try to get instance from external source if not exists on Hosts
patuwwy Jan 23, 2024
891fa75
Fix sending sequences info on platform connect
patuwwy Jan 23, 2024
10be6f2
Handle failed start
patuwwy Jan 23, 2024
0afc685
Dispatching error code handling
patuwwy Jan 23, 2024
0704147
Fix sending sequences info on platform connect 2
patuwwy Jan 24, 2024
d840bcf
Move InstanceStatus to enums. Send instance event without saparate st…
patuwwy Jan 24, 2024
1895e1a
Fix input & not-serialized output on reconnect
patuwwy Jan 26, 2024
e79140d
Merge branch 'devel' into feat/reconnect
patuwwy Feb 6, 2024
e341423
Fix imports after merge
patuwwy Feb 6, 2024
2a73c90
Merge branch 'devel' into feat/reconnect
patuwwy Feb 20, 2024
a05c64f
Remove debugging code
patuwwy Feb 21, 2024
cc146bf
Remove dev characters from logs
patuwwy Mar 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ indent_style = space
indent_size = 4
trim_trailing_whitespace = true
insert_final_newline = true
max_line_length = 120
max_line_length = 180
end_of_line = lf
charset = utf-8

Expand Down
4 changes: 2 additions & 2 deletions .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@
"max-len": [
"warn",
{
"code": 120,
"code": 180,
"tabWidth": 4,
"comments": 120,
"comments": 180,
"ignoreComments": false,
"ignoreTrailingComments": true,
"ignoreUrls": true,
Expand Down
9 changes: 9 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@
"outFiles": [
"${workspaceFolder}/**/*.js"
]
},
{
"name": "Attach by Process ID",
"processId": "${command:PickProcess}",
"request": "attach",
"skipFiles": [
"<node_internals>/**"
],
"type": "node"
}
]
}
1 change: 1 addition & 0 deletions bdd/features/e2e/E2E-010-cli.feature
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Feature: CLI tests

@ci-api @cli
Scenario: E2E-010 TC-006 Test Sequence 'prune --force' option
Given I set config for local Hub
When I execute CLI with "seq send ../packages/checksum-sequence.tar.gz"
When I execute CLI with "seq send ../packages/csv-transform.tar.gz"
When I execute CLI with "seq list"
Expand Down
1 change: 1 addition & 0 deletions bdd/features/e2e/E2E-011-cli-topic.feature
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ This feature checks topic functionalities over CLI

@ci-topic @cli
Scenario: E2E-011 TC-003 API to Instance
# Given I set config for local Hub
When I execute CLI with "topic send avengers data/data.json" without waiting for the end
When I execute CLI with "seq send ../packages/hello-input-out.tar.gz"
When I execute CLI with "seq start - --input-topic avengers "
Expand Down
1 change: 1 addition & 0 deletions bdd/features/e2e/E2E-015-unified.feature
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Feature: Test our shiny new Python runner
Given host is running
When find and upload sequence "debug-args.tar.gz"
And instance started with arguments "foo 3"
And wait for "1000" ms
Then "output" is "{\"first_arg\":\"foo\",\"second_arg\":\"3\"}"
And host is still running

Expand Down
108 changes: 75 additions & 33 deletions bdd/step-definitions/e2e/host-steps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,23 +99,42 @@ const waitForProcessToEnd = async (pid: number) => {
}
};

const killRunner = async () => {
if (process.env.RUNTIME_ADAPTER === "kubernetes") {
// @TODO
return;
// const killRunner = async () => {
// if (process.env.RUNTIME_ADAPTER === "kubernetes") {
// // @TODO
// return;
// }

// if (process.env.RUNTIME_ADAPTER === "process" && processId) {
// try {
// process.kill(processId);
// await waitForProcessToEnd(processId);
// } catch (e) {
// console.error("Couldn't kill runner", e);
// }
// }

// if (process.env.RUNTIME_ADAPTER === "docker" && containerId) {
// await dockerode.getContainer(containerId).kill();
// }
// };

const killAllRunners = async () => {
if (process.env.RUNTIME_ADAPTER === "process") {
exec("killall runner");
}

if (process.env.RUNTIME_ADAPTER === "process" && processId) {
try {
process.kill(processId);
await waitForProcessToEnd(processId);
} catch (e) {
console.error("Couldn't kill runner", e);
}
}
if (process.env.RUNTIME_ADAPTER === "docker") {
await Promise.all(
(await dockerode.listContainers())
.map(async container => {
if (container.Labels["scramjet.instance.id"]) {
return dockerode.getContainer(container.Id).kill();
}

if (process.env.RUNTIME_ADAPTER === "docker" && containerId) {
await dockerode.getContainer(containerId).kill();
return Promise.resolve();
})
);
}
};

Expand Down Expand Up @@ -183,7 +202,20 @@ Before(() => {
streams = {};
});

After({ tags: "@runner-cleanup" }, killRunner);
After({ tags: "@runner-cleanup" }, killAllRunners);
After({}, async () => {
let insts = [];

try {
insts = await hostClient.listInstances();
} catch (_e) {
return;
}

await Promise.all(
insts.map(i => hostClient.getInstanceClient(i.id).kill({ removeImmediately: true }).catch(_e => {}))
);
});

Before({ tags: "@test-si-init" }, function() {
createDirectory("data/template_seq");
Expand Down Expand Up @@ -499,29 +531,39 @@ When("send kill message to instance", async function(this: CustomWorld) {
assert.ok(resp);
});

// eslint-disable-next-line complexity
When("get runner PID", { timeout: 31000 }, async function(this: CustomWorld) {
let success: any;
let tries = 0;

while (!success && tries < 3) {
if (process.env.RUNTIME_ADAPTER === "kubernetes") {
// @TODO
return;
}

if (process.env.RUNTIME_ADAPTER === "process") {
const res = (await this.resources.instance?.getHealth())?.processId;

if (res) {
processId = success = res;
console.log("Process is identified.", processId);
}
} else {
containerId = success = (await this.resources.instance?.getHealth())?.containerId!;
const adapter = process.env.RUNTIME_ADAPTER;

if (containerId) {
console.log("Container is identified.", containerId);
}
while (!success && tries < 3) {
const health = await this.resources.instance?.getHealth();

console.log("Health", health);

switch (adapter) {
case "kubernetes":
return;
case "docker":

containerId = success = health?.containerId!;

if (containerId) {
console.log("Container is identified.", containerId);
}
break;
case "process":
const res = health?.processId;

if (res) {
processId = success = res;
console.log("Process is identified.", processId);
}
break;
default:
break;
}

tries++;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"lint": "TIMING=1 NODE_OPTIONS=\"--max-old-space-size=2048\" scripts/run-script.js -w modules -j 4 -e \"! ls .eslintrc* > /dev/null || npx eslint ./ --ext .ts --ext .js --cache --cache-strategy=content\"",
"lint:uncached": "find . -name .eslintcache -delete && yarn lint",
"start": "DEVELOPMENT=true node dist/sth/bin/hub.js",
"start:dev": "DEVELOPMENT=true ts-node packages/sth/src/bin/hub.ts",
"start:dev": "ts-node packages/sth/src/bin/hub.ts",
"start:dev:cli": "DEVELOPMENT=true ts-node packages/cli/src/bin/index.ts",
"install:clean": "yarn clean && yarn clean:modules && yarn install",
"postinstall": "scripts/run-script.js -v -w modules install:deps",
Expand Down
64 changes: 45 additions & 19 deletions packages/adapters/src/docker-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
RunnerContainerConfiguration,
InstanceLimits,
STHConfiguration,
SequenceInfo,
} from "@scramjet/types";
import path from "path";
import { DockerodeDockerHelper } from "./dockerode-docker-helper";
Expand All @@ -21,6 +22,7 @@ import { STH_DOCKER_NETWORK, isHostSpawnedInDockerContainer, getHostname } from
import { ObjLogger } from "@scramjet/obj-logger";
import { getRunnerEnvEntries } from "./get-runner-env";
import { Readable } from "stream";
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";

/**
* Adapter for running Instance by Runner executed in Docker container.
Expand Down Expand Up @@ -114,22 +116,22 @@ IComponent {
* @returns {Promise<MonitoringMessageData>} Promise resolved with container statistics.
*/
async stats(msg: MonitoringMessageData): Promise<MonitoringMessageData> {
if (this.resources.containerId) {
const stats = await this.dockerHelper.stats(this.resources.containerId)!;
this.logger.debug("STATS. Container id:", this.resources.containerId);

return {
cpuTotalUsage: stats.cpu_stats?.cpu_usage?.total_usage,
healthy: msg.healthy,
limit: stats.memory_stats?.limit,
memoryMaxUsage: stats.memory_stats?.max_usage,
memoryUsage: stats.memory_stats?.usage,
networkRx: stats.networks?.eth0?.rx_bytes,
networkTx: stats.networks?.eth0?.tx_bytes,
containerId: this.resources.containerId
};
}
this.resources.containerId ||= await this.dockerHelper.getContainerIdByLabel("scramjet.instance.id", this.id);

const stats = await this.dockerHelper.stats(this.resources.containerId)!;

return msg;
return {
cpuTotalUsage: stats.cpu_stats?.cpu_usage?.total_usage,
healthy: msg.healthy,
limit: stats.memory_stats?.limit,
memoryMaxUsage: stats.memory_stats?.max_usage,
memoryUsage: stats.memory_stats?.usage,
networkRx: stats.networks?.eth0?.rx_bytes,
networkTx: stats.networks?.eth0?.tx_bytes,
containerId: this.resources.containerId
};
}

private async getNetworkSetup(): Promise<{ network: string, host: string }> {
Expand Down Expand Up @@ -169,8 +171,21 @@ IComponent {
};
}

async setRunner(system: Record<string, string>): Promise<void> {
const containerId = await this.dockerHelper.getContainerIdByLabel("scramjet.instance.id", system.id);

this.logger.debug("Container id restored", containerId);

this.resources.containerId = containerId;
}

async run(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<ExitCode> {
await this.dispatch(config, instancesServerPort, instanceId, sequenceInfo, payload);
return this.waitUntilExit(config, instanceId, sequenceInfo);
}

// eslint-disable-next-line complexity
async run(config: InstanceConfig, instancesServerPort: number, instanceId: string): Promise<ExitCode> {
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<ExitCode> {
if (!(config.type === "docker" && "container" in config)) {
throw new Error("Docker instance adapter run with invalid runner config");
}
Expand All @@ -193,7 +208,9 @@ IComponent {
instancesServerPort,
instancesServerHost: networkSetup.host,
instanceId,
pipesPath: ""
pipesPath: "",
sequenceInfo,
payload
}, {
...this.sthConfig.runnerEnvs
}).map(([k, v]) => `${k}=${v}`);
Expand All @@ -207,7 +224,8 @@ IComponent {
{ mountPoint: config.sequenceDir, volume: config.id, writeable: false }
],
labels: {
"scramjet.sequence.name": config.name
"scramjet.sequence.name": config.name,
"scramjet.instance.id": instanceId
},
ports: this.resources.ports,
publishAllPorts: true,
Expand All @@ -220,12 +238,20 @@ IComponent {

this.crashLogStreams = Promise.all(([streams.stdout, streams.stderr] as Readable[]).map(streamToString));

this.resources.containerId = containerId;
this.resources.containerId = containerId; // doesnt matter

this.logger.trace("Container is running", containerId);

return 0;
}

async waitUntilExit(config: InstanceConfig, instanceId:string, _sequenceInfo: SequenceInfo): Promise<number> {
try {
const { statusCode } = await this.dockerHelper.wait(containerId);
this.resources.containerId = this.resources.containerId || await this.dockerHelper.getContainerIdByLabel("scramjet.instance.id", instanceId);

this.logger.debug("Wait for container exit...", this.resources.containerId);

const { statusCode } = await this.dockerHelper.wait(this.resources.containerId);

this.logger.debug("Container exited", statusCode);

Expand Down
6 changes: 6 additions & 0 deletions packages/adapters/src/dockerode-docker-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ export class DockerodeDockerHelper implements IDockerHelper {
return id;
}

async getContainerIdByLabel(label: string, value: string): Promise<DockerContainer> {
const result = await this.dockerode.listContainers({ label: `${label}=${value}` });

return result[0]!.Id;
}

/**
* Start container with provided id.
*
Expand Down
4 changes: 3 additions & 1 deletion packages/adapters/src/get-runner-env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { RunnerEnvConfig, RunnerEnvironmentVariables } from "./types";
* @returns env vars
*/
export function getRunnerEnvVariables({
sequencePath, instancesServerPort, instancesServerHost, instanceId, pipesPath, paths = "posix"
sequencePath, instancesServerPort, instancesServerHost, instanceId, pipesPath, paths = "posix", sequenceInfo, payload
}: RunnerEnvConfig, extra: Record<string, string> = {}): RunnerEnvironmentVariables {
const join = path[paths].join;

Expand All @@ -23,6 +23,8 @@ export function getRunnerEnvVariables({
INSTANCE_ID: instanceId,
PIPES_LOCATION: pipesPath,
CRASH_LOG: join(pipesPath, "crash_log"),
SEQUENCE_INFO: JSON.stringify(sequenceInfo),
RUNNER_CONNECT_INFO: JSON.stringify(payload),
...extra
};
}
Expand Down
Loading
Loading