From 4abd144e948fe40ad27c072140a14f4e9dc91122 Mon Sep 17 00:00:00 2001 From: patuwwy Date: Thu, 29 Jun 2023 11:29:00 +0000 Subject: [PATCH] adapter-k8s, remove old adapters package --- packages/adapter-k8s/.eslintrc.js | 7 + packages/adapter-k8s/.nycrc.json | 13 ++ packages/adapter-k8s/README.md | 61 +++++ packages/adapter-k8s/package.json | 61 +++++ packages/adapter-k8s/src/index.ts | 11 + .../src/kubernetes-client-adapter.ts | 212 +++++++++++++++++ .../src/kubernetes-config-decoder.ts | 19 ++ .../src/kubernetes-instance-adapter.ts | 216 ++++++++++++++++++ .../src/kubernetes-sequence-adapter.ts | 158 +++++++++++++ packages/adapter-k8s/src/readme.mtpl | 18 ++ packages/adapter-k8s/test/pass.spec.ts | 5 + packages/adapter-k8s/tsconfig.build.json | 9 + packages/adapter-k8s/tsconfig.json | 22 ++ 13 files changed, 812 insertions(+) create mode 100644 packages/adapter-k8s/.eslintrc.js create mode 100644 packages/adapter-k8s/.nycrc.json create mode 100644 packages/adapter-k8s/README.md create mode 100644 packages/adapter-k8s/package.json create mode 100644 packages/adapter-k8s/src/index.ts create mode 100644 packages/adapter-k8s/src/kubernetes-client-adapter.ts create mode 100644 packages/adapter-k8s/src/kubernetes-config-decoder.ts create mode 100644 packages/adapter-k8s/src/kubernetes-instance-adapter.ts create mode 100644 packages/adapter-k8s/src/kubernetes-sequence-adapter.ts create mode 100644 packages/adapter-k8s/src/readme.mtpl create mode 100644 packages/adapter-k8s/test/pass.spec.ts create mode 100644 packages/adapter-k8s/tsconfig.build.json create mode 100644 packages/adapter-k8s/tsconfig.json diff --git a/packages/adapter-k8s/.eslintrc.js b/packages/adapter-k8s/.eslintrc.js new file mode 100644 index 000000000..0c111683c --- /dev/null +++ b/packages/adapter-k8s/.eslintrc.js @@ -0,0 +1,7 @@ +module.exports = { + ignorePatterns: [".eslintrc.js"], + parserOptions:{ + project: "./tsconfig.json", + tsconfigRootDir: __dirname + } +}; diff --git a/packages/adapter-k8s/.nycrc.json b/packages/adapter-k8s/.nycrc.json new file mode 100644 index 000000000..7797303b5 --- /dev/null +++ b/packages/adapter-k8s/.nycrc.json @@ -0,0 +1,13 @@ +{ + "all": true, + "include": [ + "dist/**/*" + ], + "exclude": [ + "**/*.spec.ts" + ], + "reporter": [ + "lcovonly", + "text" + ] +} diff --git a/packages/adapter-k8s/README.md b/packages/adapter-k8s/README.md new file mode 100644 index 000000000..36b74543f --- /dev/null +++ b/packages/adapter-k8s/README.md @@ -0,0 +1,61 @@ +# Scramjet Transform Hub Adapters + +This module holds two types of adapters utilized by Scramjet Transform Hub: Instance Adapter and Sequence Adapter. These Adapters allows for running the Sequence identification and Instance execution in two basic modes: as a non containerized standalone processes or in a Docker container. + +The adapter provides two main exports: + +* [DockerSequenceAdapter](https://github.com/scramjetorg/transform-hub/tree/HEAD/packages/adapters/src/docker-sequence-adapter.ts) - An adapter for preparing Sequence to be run in Docker container. +* [DockerInstanceAdapter](https://github.com/scramjetorg/transform-hub/tree/HEAD/packages/adapters/src/docker-instance-adapter.ts) - An adapter for running Instance by Runner executed in Docker container. + +## Docs + +See the code documentation here: [scramjetorg/transform-hub/docs/adapters/modules.md](https://github.com/scramjetorg/transform-hub/tree/HEAD/docs/adapters/modules.md) + +## Scramjet Transform Hub + +This package is part of [Scramjet Transform Hub](https://www.npmjs.org/package/@scramjet/sth). + +Scramjet Transform Hub is a deployment and execution platform. Once installed on a server, it will allow you to start your programs and keep them running on a remote machine. You will be able to start programs in the background or connect to them and see their output directly on your terminal. You will be able to pipe your local data to the program, as if it was running from your terminal. You can start your server in AWS, Google Cloud or Azure, start it on your local machine, install it on a Raspberry Pi or wherever else you'd like. + +## Use cases + +There's no limit what you can use it for. You want a stock checker? A chat bot? Maybe you'd like to automate your home? Retrieve sensor data? Maybe you have a lot of data and want to transfer and wrangle it? You have a database of cities and you'd like to enrich your data? You do machine learning and you want to train your set while the data is fetched in real time? Hey, you want to use it for something else and ask us if that's a good use? Ask us [via email](mailto:get@scramjet.org) or hop on our [Scramjet Discord](https://scr.je/join-community-mg1)! + +## Some important links + +* Scramjet, the company behind [Transform Hub](https://scramjet.org) +* The [Scramjet Framework - functional reactive stream processing framework](https://framework.scramjet.org) +* The [Transform Hub repo on github](https://github.com/scramjetorg/transform-hub) +* You can see the [Scramjet Transform Hub API docs here](https://github.com/scramjetorg/transform-hub/tree/HEAD/docs/api-client/README.md) +* You can see the [CLI documentation here](https://github.com/scramjetorg/transform-hub/tree/HEAD/packages/cli/README.md), but `si help` should also be quite effective. +* Don't forget to ⭐ this repo if you like it, `subscribe` to releases and keep visiting us for new versions and updates. +* You can [open an issue - file a bug report or a feature request here](https://github.com/scramjetorg/transform-hub/issues/new/choose) + +## License and contributions + +This module is licensed under AGPL-3.0 license. + +The Scramjet Transform Hub project is dual-licensed under the AGPL-3.0 and MIT licenses. Parts of the project that are linked with your programs are MIT licensed, the rest is AGPL. + +## Contributions + +We accept valid contributions and we will be publishing a more specific project roadmap so contributors can propose features and also help us implement them. We kindly ask you that contributed commits are Signed-Off `git commit --sign-off`. + +We provide support for contributors via test cases. If you expect a certain type of workflow to be officially supported, please specify and implement a test case in `Gherkin` format in `bdd` directory and include it in your pull request. More info about our BDD test you will find [here](https://github.com/scramjetorg/transform-hub/tree/HEAD/bdd/README.md). + +### Help wanted 👩‍🎓🧑👱‍♀️ + +The project need's your help! There's lots of work to do and we have a lot of plans. If you want to help and be part of the Scramjet team, please reach out to us, [on discord](https://scr.je/join-community-mg1) or email us: [opensource@scramjet.org](mailto:opensource@scramjet.org). + +### Donation 💸 + +Do you like this project? It helped you to reduce time spent on delivering your solution? You are welcome to buy us a coffee ☕ Thanks a lot! 😉 + +[You can sponsor us on github](https://github.com/sponsors/scramjetorg) + +* There's also a Paypal donation link if you prefer that: + +[![paypal](https://www.paypalobjects.com/en_US/i/btn/btn_donateCC_LG.gif)](https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=7F7V65C43EBMW) + + + diff --git a/packages/adapter-k8s/package.json b/packages/adapter-k8s/package.json new file mode 100644 index 000000000..1c2f03050 --- /dev/null +++ b/packages/adapter-k8s/package.json @@ -0,0 +1,61 @@ +{ + "name": "@scramjet/adapter-k8s", + "version": "0.34.4", + "description": "This package is part of Scramjet Transform Hub. This module holds the docker adapters utilized by Scramjet Transform Hub", + "main": "./src/index.ts", + "scripts": { + "start": "ts-node ./src/index", + "build": "../../scripts/build-all.js --config-name=tsconfig.build.json --copy-dist", + "build:docs": "typedoc", + "clean": "rm -rf ./dist .bic_cache", + "cloc": "cloc src --fullpath --include-lang TypeScript --not-match-d \"(node_modules|test|dist|bdd)\" --by-percent cm", + "test": "nyc ava", + "prepack": "node ../../scripts/publish.js" + }, + "author": "Scramjet ", + "license": "AGPL-3.0", + "dependencies": { + "@scramjet/model": "^0.34.4", + "@scramjet/obj-logger": "^0.34.4", + "@scramjet/pre-runner": "^0.34.4", + "@scramjet/python-runner": "^0.34.4", + "@scramjet/runner": "^0.34.4", + "@scramjet/sth-config": "^0.34.4", + "@scramjet/symbols": "^0.34.4", + "@scramjet/utility": "^0.34.4", + "@scramjet/adapters-utils": "^0.34.4", + "@kubernetes/client-node": "^0.17.1", + "scramjet": "^4.36.9", + "shell-escape": "^0.2.0", + "systeminformation": "^5.12.7", + "ts.data.json": "^2.2.0" + }, + "devDependencies": { + "@scramjet/types": "^0.34.4", + "@types/js-yaml": "4.0.5", + "@types/node": "15.12.5", + "@types/request": "2.48.8", + "@types/shell-escape": "^0.2.1", + "@types/ws": "8.5.3", + "ava": "^3.15.0", + "ts-node": "^10.9.1", + "typedoc": "^0.23.17", + "typedoc-plugin-markdown": "^3.13.6", + "typescript": "~4.7.4" + }, + "ava": { + "extensions": [ + "ts" + ], + "files": [ + "**/*.spec.ts" + ], + "require": [ + "ts-node/register" + ] + }, + "repository": { + "type": "git", + "url": "https://github.com/scramjetorg/transform-hub.git" + } +} diff --git a/packages/adapter-k8s/src/index.ts b/packages/adapter-k8s/src/index.ts new file mode 100644 index 000000000..1e8f17476 --- /dev/null +++ b/packages/adapter-k8s/src/index.ts @@ -0,0 +1,11 @@ +/** + * Adapter module must provide SequenceAdapter, InstanceAdapter classes, init method and name field. + */ +export { KubernetesSequenceAdapter as SequenceAdapter } from "./kubernetes-sequence-adapter"; +export { KubernetesInstanceAdapter as InstanceAdapter } from "./kubernetes-instance-adapter"; + +export const init = (..._args: any[]) => { + return true; +}; + +export const name = "kubernetes"; diff --git a/packages/adapter-k8s/src/kubernetes-client-adapter.ts b/packages/adapter-k8s/src/kubernetes-client-adapter.ts new file mode 100644 index 000000000..6f8feabe1 --- /dev/null +++ b/packages/adapter-k8s/src/kubernetes-client-adapter.ts @@ -0,0 +1,212 @@ +import { IObjectLogger } from "@scramjet/types"; +import * as k8s from "@kubernetes/client-node"; +import { ObjLogger } from "@scramjet/obj-logger"; +import { defer } from "@scramjet/utility"; +import { Writable, Readable } from "stream"; +import http from "http"; +import { HttpError } from "@kubernetes/client-node"; + +const POD_STATUS_CHECK_INTERVAL_MS = 500; +const POD_STATUS_FAIL_LIMIT = 10; + +class KubernetesClientAdapter { + logger: IObjectLogger; + name = "KubernetesClientAdapter"; + + private _configPath: string; + private _config?: k8s.KubeConfig; + private _namespace: string; + + constructor(configPath: string = "", namespace: string = "default") { + this.logger = new ObjLogger(this.name); + this._configPath = configPath; + this._namespace = namespace; + } + + private get config(): k8s.KubeConfig { + if (!this._config) { + throw new Error("Kubernetes API client not initialized"); + } + + return this._config; + } + + public init() { + const kc = new k8s.KubeConfig(); + + try { + if (this._configPath && this._configPath.length) { + kc.loadFromFile(this._configPath); + } else { + kc.loadFromCluster(); + } + + this._config = kc; + } catch (err: any) { + this.logger.error("Unable to load kubeconfig", err); + } + } + + async createPod(metadata: k8s.V1ObjectMeta, spec: k8s.V1PodSpec, retries: number = 0) { + const kubeApi = this.config.makeApiClient(k8s.CoreV1Api); + + const result = await this.runWithRetries(retries, "Create Pod", () => + kubeApi.createNamespacedPod(this._namespace, { + apiVersion: "v1", + kind: "Pod", + metadata, + spec + }) + ); + + return result as { + response: http.IncomingMessage; + body: k8s.V1Pod; + }; + } + + async deletePod(podName: string, retries: number = 0) { + const kubeApi = this.config.makeApiClient(k8s.CoreV1Api); + + const result = await this.runWithRetries(retries, "Delete Pod", () => + kubeApi.deleteNamespacedPod(podName, this._namespace, undefined, undefined, 0) + ); + + return result as { + response: http.IncomingMessage; + body: k8s.V1Pod; + }; + } + + async exec(podName: string, containerName: string, command: string | string[], + stdout: Writable | null, stderr: Writable | null, stdin: Readable | null, retries: number = 0 + ) { + const exec = new k8s.Exec(this.config); + + await this.runWithRetries(retries, "Exec", () => + exec.exec(this._namespace, podName, containerName, command, stdout, stderr, stdin, false, + (...args) => this.logger.debug("exec status", ...args)) + ); + } + + async waitForPodStatus(podName: string, expectedStatuses: string[]): Promise<{ status: string, code?: number }> { + const kubeApi = this.config.makeApiClient(k8s.CoreV1Api); + + let failCount = 0; + + // eslint-disable-next-line no-constant-condition + while (true) { + try { + const response = await kubeApi.readNamespacedPodStatus(podName, this._namespace); + const status = response.body.status?.phase || ""; + + const container = (response.body.status?.containerStatuses || []).find(c => c.name === podName); + + if (expectedStatuses.includes(status)) { + return { + status, + code: container?.state?.terminated?.exitCode + }; + } + } catch (err: any) { + if (err instanceof HttpError) { + this.logger.error(`Status for "${podName}" pod responded with error`, err?.body?.message); + + if (err.statusCode === 404) { + this.logger.error("You have deleted this pod already! Try to increase runnerExitDelay in CSIController."); + } + } else { + this.logger.error(`Failed to get pod status: ${podName}.`, err); + } + + failCount++; + + if (failCount > POD_STATUS_FAIL_LIMIT) { + throw new Error("Reached the limit of failed pod status requests"); + } + } + + await defer(POD_STATUS_CHECK_INTERVAL_MS); + } + } + + async getPodLog(podName: string): Promise { + const kubeApi = this.config.makeApiClient(k8s.CoreV1Api); + const response = await kubeApi.readNamespacedPodLog( + podName, this._namespace, + undefined, false, undefined, undefined, + undefined, false, undefined, + 100, true + ); + + return [response.body]; + } + + async getPodTerminatedContainerReason(podName: string): Promise { + const kubeApi = this.config.makeApiClient(k8s.CoreV1Api); + const response = await kubeApi.readNamespacedPod(podName, this._namespace); + + return response.body.status?.containerStatuses?.[0].state?.terminated?.reason; + } + + async isPodsLimitReached(quotaName: string) { + const kubeApi = this.config.makeApiClient(k8s.CoreV1Api); + + try { + const getQuotaPromise = + await kubeApi.readNamespacedResourceQuota(quotaName, this._namespace); + + const responseBody = getQuotaPromise.body; + + if (responseBody) { + const used = parseInt(responseBody.status?.used?.pods || "", 10) || 0; + const hard = parseInt(responseBody.status?.hard?.pods || "", 10) || Infinity; + + this.logger.info("Pods limit quota", used, hard); + + return used >= hard; + } + } catch (e) { + this.logger.warn("Can't get quota object. "); + } + + return false; + } + + private async runWithRetries(retries: number, name: string, callback: any) { + let tries = 0; + let sleepMs = 1000; + let success = false; + let result: any = null; + + this.logger.debug(`Starting: ${name}...`); + + while (!success && tries <= retries) { + tries++; + + try { + result = await callback(); + + success = true; + } catch (err: any) { + if (err instanceof HttpError) { + this.logger.error(`Running "${name}" responded with error`, err?.body?.message); + } else { + this.logger.error(`Failed to run: ${name}.`, err); + } + + await defer(sleepMs); + + sleepMs *= 2; + } + } + + if (!success) { + throw new Error(`Failed to run: ${name} after ${tries} retries.`); + } + + return result; + } +} + +export { KubernetesClientAdapter }; diff --git a/packages/adapter-k8s/src/kubernetes-config-decoder.ts b/packages/adapter-k8s/src/kubernetes-config-decoder.ts new file mode 100644 index 000000000..d887a3a0e --- /dev/null +++ b/packages/adapter-k8s/src/kubernetes-config-decoder.ts @@ -0,0 +1,19 @@ +import { K8SAdapterConfiguration } from "@scramjet/types"; +import { JsonDecoder } from "ts.data.json"; + +export const adapterConfigDecoder = JsonDecoder.object({ + authConfigPath: JsonDecoder.optional(JsonDecoder.string), + namespace: JsonDecoder.string, + quotaName: JsonDecoder.optional(JsonDecoder.string), + sthPodHost: JsonDecoder.string, + runnerImages: JsonDecoder.object({ + python3: JsonDecoder.string, + node: JsonDecoder.string + }, "K8SImagesDecoder"), + sequencesRoot: JsonDecoder.string, + timeout: JsonDecoder.optional(JsonDecoder.string), + runnerResourcesRequestsMemory: JsonDecoder.optional(JsonDecoder.string), + runnerResourcesRequestsCpu: JsonDecoder.optional(JsonDecoder.string), + runnerResourcesLimitsMemory: JsonDecoder.optional(JsonDecoder.string), + runnerResourcesLimitsCpu: JsonDecoder.optional(JsonDecoder.string) +}, "K8SAdapterConfiguration"); diff --git a/packages/adapter-k8s/src/kubernetes-instance-adapter.ts b/packages/adapter-k8s/src/kubernetes-instance-adapter.ts new file mode 100644 index 000000000..e403de393 --- /dev/null +++ b/packages/adapter-k8s/src/kubernetes-instance-adapter.ts @@ -0,0 +1,216 @@ +/* eslint-disable no-console */ +import { + ExitCode, + IComponent, + ILifeCycleAdapterMain, + ILifeCycleAdapterRun, + InstanceConfig, + InstanceLimits, + IObjectLogger, + K8SAdapterConfiguration, + MonitoringMessageData, + STHConfiguration, +} from "@scramjet/types"; + +import path from "path"; +import { ObjLogger } from "@scramjet/obj-logger"; +import { createReadStream } from "fs"; +import { KubernetesClientAdapter } from "./kubernetes-client-adapter"; +import { adapterConfigDecoder } from "./kubernetes-config-decoder"; +import { getRunnerEnvEntries } from "@scramjet/adapters-utils"; +import { PassThrough } from "stream"; +import { RunnerExitCode } from "@scramjet/symbols"; + +/** + * Adapter for running Instance by Runner executed in separate process. + */ +class KubernetesInstanceAdapter implements +ILifeCycleAdapterMain, +ILifeCycleAdapterRun, +IComponent { + logger: IObjectLogger; + name = "KubernetesInstanceAdapter"; + + private _runnerName?: string; + private _kubeClient?: KubernetesClientAdapter; + + private adapterConfig: K8SAdapterConfiguration; + private _limits?: InstanceLimits = {}; + + get limits() { return this._limits || {} as InstanceLimits; } + private set limits(value: InstanceLimits) { this._limits = value; } + + constructor(sthConfig: STHConfiguration) { + // @TODO this is a redundant check (it was already checked in sequence adapter) + // We should move this to config service decoding: https://github.com/scramjetorg/transform-hub/issues/279 + const decodedAdapterConfig = adapterConfigDecoder.decode(sthConfig.kubernetes); + + if (!decodedAdapterConfig.isOk()) { + throw new Error("Invalid Kubernetes Adapter configuration"); + } + + this.adapterConfig = decodedAdapterConfig.value; + this.logger = new ObjLogger(this); + } + + private get kubeClient() { + if (!this._kubeClient) { + throw new Error("Kubernetes client not initialized"); + } + + return this._kubeClient; + } + + async init(): Promise { + this._kubeClient = new KubernetesClientAdapter(this.adapterConfig.authConfigPath, this.adapterConfig.namespace); + this.kubeClient.init(); + + this._kubeClient.logger.pipe(this.logger); + } + + async stats(msg: MonitoringMessageData): Promise { + return { + // @TODO: provide limits and stats + ...msg, + }; + } + + private get runnerResourcesConfig() { + return { + requests: { + memory: this.limits?.memory ? this.limits?.memory + "M" : this.adapterConfig.runnerResourcesRequestsMemory || "128M", + cpu: this.adapterConfig.runnerResourcesRequestsCpu || "250m" + }, + limits: { + memory: this.limits?.memory ? this.limits?.memory * 2 + "M" : this.adapterConfig.runnerResourcesLimitsMemory || "1G", + cpu: this.adapterConfig.runnerResourcesLimitsCpu || "1000m" + } + }; + } + + async run(config: InstanceConfig, instancesServerPort: number, instanceId: string): Promise { + if (config.type !== "kubernetes") { + throw new Error(`Invalid config type for kubernetes adapter: ${config.type}`); + } + + if (this.adapterConfig.quotaName && await this.kubeClient.isPodsLimitReached(this.adapterConfig.quotaName)) { + return RunnerExitCode.PODS_LIMIT_REACHED; + } + + this.limits = config.limits; + + const runnerName = this._runnerName = `runner-${ instanceId }`; + + this.logger.debug("Creating Runner Pod"); + + const env = + getRunnerEnvEntries({ + sequencePath: path.join("/package", config.entrypointPath), + instancesServerPort, + instancesServerHost: this.adapterConfig.sthPodHost, + instanceId, + pipesPath: "" + }).map(([name, value]) => ({ name, value })); + + const runnerImage = config.engines.python3 + ? this.adapterConfig.runnerImages.python3 + : this.adapterConfig.runnerImages.node; + + await this.kubeClient.createPod( + { + name: runnerName, + labels: { + app: "runner" + } + }, + { + containers: [{ + env, + name: runnerName, + image: runnerImage, + stdin: true, + command: ["wait-for-sequence-and-start.sh"], + imagePullPolicy: "Always", + resources: this.runnerResourcesConfig + }], + restartPolicy: "Never", + }, + 2 + ); + + const startPodStatus = await this.kubeClient.waitForPodStatus(runnerName, ["Running", "Failed"]); + + if (startPodStatus.status === "Failed") { + this.logger.error("Runner unable to start", startPodStatus); + + await this.remove(this.adapterConfig.timeout); + + // This means runner pod was unable to start. So it went from "Pending" to "Failed" state directly. + // Return 1 which is Linux exit code for "General Error" since we are not able + // to determine what happened exactly. + return startPodStatus.code || 137; + } + + this.logger.debug("Copy sequence files to Runner"); + + const compressedStream = createReadStream(path.join(config.sequenceDir, "compressed.tar.gz")); + const stdErrorStream = new PassThrough(); + + stdErrorStream.on("data", (data) => { this.logger.error("POD stderr", data.toString()); }); + + await this.kubeClient.exec(runnerName, runnerName, ["unpack.sh", "/package"], process.stdout, stdErrorStream, compressedStream, 2); + + const exitPodStatus = await this.kubeClient.waitForPodStatus(runnerName, ["Succeeded", "Failed", "Unknown"]); + + stdErrorStream.end(); + + if (exitPodStatus.status !== "Succeeded") { + this.logger.error("Runner stopped incorrectly", exitPodStatus); + this.logger.error("Container failure reason is: ", await this.kubeClient.getPodTerminatedContainerReason(runnerName)); + + return exitPodStatus.code || 137; + } + + this.logger.info("Runner stopped without issues"); + + await this.remove(this.adapterConfig.timeout); + + // @TODO handle error status + return 0; + } + + async cleanup(): Promise { + await this.remove(this.adapterConfig.timeout); + } + + // @ts-ignore + monitorRate(_rps: number): this { + /** ignore */ + } + + async timeout(ms: string) { + return new Promise(resolve => setTimeout(resolve, parseInt(ms, 10))); + } + + // Forcefully stops Runner process. + async remove(ms: string = "0") { + if (!this._runnerName) { + this.logger.error("Trying to stop non existent runner", this._runnerName); + } else { + await this.timeout(ms); + await this.kubeClient.deletePod(this._runnerName, 2); + + this._runnerName = undefined; + } + } + + async getCrashLog(): Promise { + if (this._kubeClient && this._runnerName) { + return this._kubeClient.getPodLog(this._runnerName); + } + + return ["Crashlog cannot be fetched"]; + } +} + +export { KubernetesInstanceAdapter }; diff --git a/packages/adapter-k8s/src/kubernetes-sequence-adapter.ts b/packages/adapter-k8s/src/kubernetes-sequence-adapter.ts new file mode 100644 index 000000000..e7b620be8 --- /dev/null +++ b/packages/adapter-k8s/src/kubernetes-sequence-adapter.ts @@ -0,0 +1,158 @@ +import { ObjLogger } from "@scramjet/obj-logger"; +import { + ISequenceAdapter, + STHConfiguration, + SequenceConfig, + IObjectLogger, + KubernetesSequenceConfig, + K8SAdapterConfiguration, +} from "@scramjet/types"; +import { Readable } from "stream"; +import { createReadStream, createWriteStream } from "fs"; +import fs from "fs/promises"; +import path from "path"; +import { exec } from "child_process"; +import { isDefined, readStreamedJSON } from "@scramjet/utility"; +import { detectLanguage, sequencePackageJSONDecoder } from "@scramjet/adapters-utils"; +import { adapterConfigDecoder } from "./kubernetes-config-decoder"; + +/** + * Returns existing Sequence configuration. + * + * @param {string} sequencesRoot Folder where sequences are located. + * @param {string} id Sequence Id. + * @returns {ProcessSequenceConfig} Sequence configuration. + */ +async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string): Promise { + const sequenceDir = path.join(sequencesRoot, id); + const packageJsonPath = path.join(sequenceDir, "package.json"); + const packageJson = await readStreamedJSON(createReadStream(packageJsonPath)); + + const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(packageJson); + const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {}; + + return { + type: "kubernetes", + entrypointPath: validPackageJson.main, + version: validPackageJson.version ?? "", + name: validPackageJson.name ?? "", + id, + sequenceDir, + engines, + description: validPackageJson.description, + author: validPackageJson.author, + keywords: validPackageJson.keywords, + args: validPackageJson.args, + repository: validPackageJson.repository, + language: detectLanguage(validPackageJson) + }; +} + +/** + * Adapter for preparing Sequence to be run in process. + */ +class KubernetesSequenceAdapter implements ISequenceAdapter { + logger: IObjectLogger; + + name = "KubernetesSequenceAdapter"; + + private adapterConfig: K8SAdapterConfiguration; + + constructor(sthConfig: STHConfiguration) { + const decodedAdapterConfig = adapterConfigDecoder.decode(sthConfig.kubernetes); + + if (!decodedAdapterConfig.isOk()) { + throw new Error("Invalid Kubernetes Adapter configuration"); + } + + this.adapterConfig = decodedAdapterConfig.value; + this.logger = new ObjLogger(this); + } + + /** + * Initializes adapter. + * + * @returns {Promise} Promise resolving after initialization. + */ + async init(): Promise { + await fs.access(this.adapterConfig.sequencesRoot) + .catch(() => fs.mkdir(this.adapterConfig.sequencesRoot)); + + this.logger.info("Kubernetes adapter initialized with options", { + "runner images": this.adapterConfig.runnerImages, + "sequences root": this.adapterConfig.sequencesRoot, + timeout: this.adapterConfig.timeout + }); + } + + /** + * Finds existing sequences. + * + * @returns {Promise} Promise resolving to array of identified sequences. + */ + async list(): Promise { + const storedSequencesIds = await fs.readdir(this.adapterConfig.sequencesRoot); + const sequencesConfigs = (await Promise.all( + storedSequencesIds + .map((id) => getRunnerConfigForStoredSequence(this.adapterConfig.sequencesRoot, id)) + .map((configPromised) => configPromised.catch(() => null)) + )) + .filter(isDefined); + + this.logger.debug(`Found ${sequencesConfigs.length} stored sequences`); + + return sequencesConfigs; + } + + /** + * Unpacks and identifies sequence. + * + * @param {Readable} stream Stream with packed sequence. + * @param {string} id Sequence Id. + * @param {boolean} override Removes previous sequence + * @returns {Promise} Promise resolving to identified sequence configuration. + */ + async identify(stream: Readable, id: string, override = false): Promise { + // 1. Unpack package.json to stdout and map to config + // 2. Create compressed package on the disk + const sequenceDir = path.join(this.adapterConfig.sequencesRoot, id); + + if (override) { + await fs.rm(sequenceDir, { recursive: true, force: true }); + } + + await fs.mkdir(sequenceDir); + + const compressedOut = createWriteStream(path.join(sequenceDir, "compressed.tar.gz")); + + // @TODO unpack only package.json + const uncompressingProc = exec(`tar zxf - -C ${sequenceDir}`); + + stream.pipe(uncompressingProc.stdin!); + stream.pipe(compressedOut); + + await new Promise(res => uncompressingProc.on("close", res)); + + return getRunnerConfigForStoredSequence(this.adapterConfig.sequencesRoot, id); + } + + /** + * Removes directory used to store sequence. + * + * @param {SequenceConfig} config Sequence configuration. + * @returns {Promise} Promise resolving after directory deletion. + */ + async remove(config: SequenceConfig) { + if (config.type !== "kubernetes") { + throw new Error(`Incorrect SequenceConfig passed to KubernetesSequenceAdapter: ${config.type}`); + } + + const sequenceDir = path.join(this.adapterConfig.sequencesRoot, config.id); + + this.logger.debug("Removing sequence directory...", sequenceDir); + + return fs.rm(sequenceDir, { recursive: true }); + } +} + +export { KubernetesSequenceAdapter }; diff --git a/packages/adapter-k8s/src/readme.mtpl b/packages/adapter-k8s/src/readme.mtpl new file mode 100644 index 000000000..237e68061 --- /dev/null +++ b/packages/adapter-k8s/src/readme.mtpl @@ -0,0 +1,18 @@ +# Scramjet Transform Hub Adapters + +This module holds two types of adapters utilized by Scramjet Transform Hub: Instance Adapter and Sequence Adapter. These Adapters allows for running the Sequence identification and Instance execution in standalone processes. + +The adapter provides two main exports: + +* [SequenceAdapter](./src/process-sequence-adapter.ts) - An adapter for preparing Sequence to be run in subprocess. +* [InstanceAdapter](./src/process-instance-adapter.ts) - An adapter for running Instance by Runner executed in subprocess. + +## Docs + +>!docs adapters/modules.md & + +>@sth +>@use-cases +>@links +>@license-agpl +>@contrib diff --git a/packages/adapter-k8s/test/pass.spec.ts b/packages/adapter-k8s/test/pass.spec.ts new file mode 100644 index 000000000..57dc6c738 --- /dev/null +++ b/packages/adapter-k8s/test/pass.spec.ts @@ -0,0 +1,5 @@ +import test from "ava"; + +test("Passing test", (t) => { + t.pass(); +}); diff --git a/packages/adapter-k8s/tsconfig.build.json b/packages/adapter-k8s/tsconfig.build.json new file mode 100644 index 000000000..e0848624e --- /dev/null +++ b/packages/adapter-k8s/tsconfig.build.json @@ -0,0 +1,9 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "./dist" + }, + "include": [ + "src/**/*" + ] +} diff --git a/packages/adapter-k8s/tsconfig.json b/packages/adapter-k8s/tsconfig.json new file mode 100644 index 000000000..aee20079b --- /dev/null +++ b/packages/adapter-k8s/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "outDir": "./dist" + }, + "extends": "../../tsconfig.base.json", + "include": [ + "./src", + "./test" + ], + "exclude": [ + "node_modules" + ], + "typedocOptions": { + "entryPoints": [ + "src/index.ts" + ], + "out": "../../docs/adapters", + "plugin": "typedoc-plugin-markdown", + "gitRevision": "HEAD", + "sort": "alphabetical" + } +}