diff --git a/packages/adapter-process/.eslintrc.js b/packages/adapter-process/.eslintrc.js new file mode 100644 index 000000000..0c111683c --- /dev/null +++ b/packages/adapter-process/.eslintrc.js @@ -0,0 +1,7 @@ +module.exports = { + ignorePatterns: [".eslintrc.js"], + parserOptions:{ + project: "./tsconfig.json", + tsconfigRootDir: __dirname + } +}; diff --git a/packages/adapter-process/.nycrc.json b/packages/adapter-process/.nycrc.json new file mode 100644 index 000000000..7797303b5 --- /dev/null +++ b/packages/adapter-process/.nycrc.json @@ -0,0 +1,13 @@ +{ + "all": true, + "include": [ + "dist/**/*" + ], + "exclude": [ + "**/*.spec.ts" + ], + "reporter": [ + "lcovonly", + "text" + ] +} diff --git a/packages/adapter-process/README.md b/packages/adapter-process/README.md new file mode 100644 index 000000000..36b74543f --- /dev/null +++ b/packages/adapter-process/README.md @@ -0,0 +1,61 @@ +# Scramjet Transform Hub Adapters + +This module holds two types of adapters utilized by Scramjet Transform Hub: Instance Adapter and Sequence Adapter. These Adapters allows for running the Sequence identification and Instance execution in two basic modes: as a non containerized standalone processes or in a Docker container. + +The adapter provides two main exports: + +* [DockerSequenceAdapter](https://github.com/scramjetorg/transform-hub/tree/HEAD/packages/adapters/src/docker-sequence-adapter.ts) - An adapter for preparing Sequence to be run in Docker container. +* [DockerInstanceAdapter](https://github.com/scramjetorg/transform-hub/tree/HEAD/packages/adapters/src/docker-instance-adapter.ts) - An adapter for running Instance by Runner executed in Docker container. + +## Docs + +See the code documentation here: [scramjetorg/transform-hub/docs/adapters/modules.md](https://github.com/scramjetorg/transform-hub/tree/HEAD/docs/adapters/modules.md) + +## Scramjet Transform Hub + +This package is part of [Scramjet Transform Hub](https://www.npmjs.org/package/@scramjet/sth). + +Scramjet Transform Hub is a deployment and execution platform. Once installed on a server, it will allow you to start your programs and keep them running on a remote machine. You will be able to start programs in the background or connect to them and see their output directly on your terminal. You will be able to pipe your local data to the program, as if it was running from your terminal. You can start your server in AWS, Google Cloud or Azure, start it on your local machine, install it on a Raspberry Pi or wherever else you'd like. + +## Use cases + +There's no limit what you can use it for. You want a stock checker? A chat bot? Maybe you'd like to automate your home? Retrieve sensor data? Maybe you have a lot of data and want to transfer and wrangle it? You have a database of cities and you'd like to enrich your data? You do machine learning and you want to train your set while the data is fetched in real time? Hey, you want to use it for something else and ask us if that's a good use? Ask us [via email](mailto:get@scramjet.org) or hop on our [Scramjet Discord](https://scr.je/join-community-mg1)! + +## Some important links + +* Scramjet, the company behind [Transform Hub](https://scramjet.org) +* The [Scramjet Framework - functional reactive stream processing framework](https://framework.scramjet.org) +* The [Transform Hub repo on github](https://github.com/scramjetorg/transform-hub) +* You can see the [Scramjet Transform Hub API docs here](https://github.com/scramjetorg/transform-hub/tree/HEAD/docs/api-client/README.md) +* You can see the [CLI documentation here](https://github.com/scramjetorg/transform-hub/tree/HEAD/packages/cli/README.md), but `si help` should also be quite effective. +* Don't forget to ⭐ this repo if you like it, `subscribe` to releases and keep visiting us for new versions and updates. +* You can [open an issue - file a bug report or a feature request here](https://github.com/scramjetorg/transform-hub/issues/new/choose) + +## License and contributions + +This module is licensed under AGPL-3.0 license. + +The Scramjet Transform Hub project is dual-licensed under the AGPL-3.0 and MIT licenses. Parts of the project that are linked with your programs are MIT licensed, the rest is AGPL. + +## Contributions + +We accept valid contributions and we will be publishing a more specific project roadmap so contributors can propose features and also help us implement them. We kindly ask you that contributed commits are Signed-Off `git commit --sign-off`. + +We provide support for contributors via test cases. If you expect a certain type of workflow to be officially supported, please specify and implement a test case in `Gherkin` format in `bdd` directory and include it in your pull request. More info about our BDD test you will find [here](https://github.com/scramjetorg/transform-hub/tree/HEAD/bdd/README.md). + +### Help wanted 👩‍🎓🧑👱‍♀️ + +The project need's your help! There's lots of work to do and we have a lot of plans. If you want to help and be part of the Scramjet team, please reach out to us, [on discord](https://scr.je/join-community-mg1) or email us: [opensource@scramjet.org](mailto:opensource@scramjet.org). + +### Donation 💸 + +Do you like this project? It helped you to reduce time spent on delivering your solution? You are welcome to buy us a coffee ☕ Thanks a lot! 😉 + +[You can sponsor us on github](https://github.com/sponsors/scramjetorg) + +* There's also a Paypal donation link if you prefer that: + +[![paypal](https://www.paypalobjects.com/en_US/i/btn/btn_donateCC_LG.gif)](https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=7F7V65C43EBMW) + + + diff --git a/packages/adapter-process/package.json b/packages/adapter-process/package.json new file mode 100644 index 000000000..6dab7772a --- /dev/null +++ b/packages/adapter-process/package.json @@ -0,0 +1,60 @@ +{ + "name": "@scramjet/adapter-process", + "version": "0.34.4", + "description": "This package is part of Scramjet Transform Hub. This module holds the docker adapters utilized by Scramjet Transform Hub", + "main": "./src/index.ts", + "scripts": { + "start": "ts-node ./src/index", + "build": "../../scripts/build-all.js --config-name=tsconfig.build.json --copy-dist", + "build:docs": "typedoc", + "clean": "rm -rf ./dist .bic_cache", + "cloc": "cloc src --fullpath --include-lang TypeScript --not-match-d \"(node_modules|test|dist|bdd)\" --by-percent cm", + "test": "nyc ava", + "prepack": "node ../../scripts/publish.js" + }, + "author": "Scramjet ", + "license": "AGPL-3.0", + "dependencies": { + "@scramjet/model": "^0.34.4", + "@scramjet/obj-logger": "^0.34.4", + "@scramjet/pre-runner": "^0.34.4", + "@scramjet/python-runner": "^0.34.4", + "@scramjet/runner": "^0.34.4", + "@scramjet/sth-config": "^0.34.4", + "@scramjet/symbols": "^0.34.4", + "@scramjet/utility": "^0.34.4", + "@scramjet/adapters-utils": "^0.34.4", + "scramjet": "^4.36.9", + "shell-escape": "^0.2.0", + "systeminformation": "^5.12.7", + "ts.data.json": "^2.2.0" + }, + "devDependencies": { + "@scramjet/types": "^0.34.4", + "@types/js-yaml": "4.0.5", + "@types/node": "15.12.5", + "@types/request": "2.48.8", + "@types/shell-escape": "^0.2.1", + "@types/ws": "8.5.3", + "ava": "^3.15.0", + "ts-node": "^10.9.1", + "typedoc": "^0.23.17", + "typedoc-plugin-markdown": "^3.13.6", + "typescript": "~4.7.4" + }, + "ava": { + "extensions": [ + "ts" + ], + "files": [ + "**/*.spec.ts" + ], + "require": [ + "ts-node/register" + ] + }, + "repository": { + "type": "git", + "url": "https://github.com/scramjetorg/transform-hub.git" + } +} diff --git a/packages/adapter-process/src/index.ts b/packages/adapter-process/src/index.ts new file mode 100644 index 000000000..f8cc30093 --- /dev/null +++ b/packages/adapter-process/src/index.ts @@ -0,0 +1,8 @@ +/** + * Adapter module must provide SequenceAdapter, InstanceAdapter classes and name field. + */ + +export { ProcessSequenceAdapter as SequenceAdapter } from "./process-sequence-adapter"; +export { ProcessInstanceAdapter as InstanceAdapter } from "./process-instance-adapter"; + +export const name = "process"; diff --git a/packages/adapter-process/src/process-instance-adapter.ts b/packages/adapter-process/src/process-instance-adapter.ts new file mode 100644 index 000000000..583c1b8e2 --- /dev/null +++ b/packages/adapter-process/src/process-instance-adapter.ts @@ -0,0 +1,201 @@ +import { STHConfiguration, + ExitCode, + IComponent, + ILifeCycleAdapterMain, + ILifeCycleAdapterRun, + InstanceConfig, + InstanceLimits, + IObjectLogger, + MonitoringMessageData, + SequenceConfig +} from "@scramjet/types"; +import { ObjLogger } from "@scramjet/obj-logger"; +import { streamToString } from "@scramjet/utility"; +import { getRunnerEnvVariables } from "@scramjet/adapters-utils"; +import { ChildProcess, spawn } from "child_process"; + +import path from "path"; + +const isTSNode = !!(process as any)[Symbol.for("ts-node.register.instance")]; +const gotPython = "\n _ \n __ _____ _ __ ___ ___| |\n \\ \\ /\\ / / _ \\| '_ \\/ __|_ / |\n \\ V V / (_) | | | \\__ \\/ /|_|\n \\_/\\_/ \\___/|_| |_|___/___(_) 🐍\n"; + +/** + * Adapter for running Instance by Runner executed in separate process. + */ +class ProcessInstanceAdapter implements + ILifeCycleAdapterMain, + ILifeCycleAdapterRun, + IComponent { + logger: IObjectLogger; + sthConfig: STHConfiguration; + + private runnerProcess?: ChildProcess; + private crashLogStreams?: Promise; + private _limits?: InstanceLimits = {}; + + get limits() { return this._limits || {} as InstanceLimits; } + private set limits(value: InstanceLimits) { + this._limits = value; + this.logger.warn("Limits are not yet supported in process runner"); + } + + constructor(config: STHConfiguration) { + this.logger = new ObjLogger(this); + this.sthConfig = config; + } + + async init(): Promise { + // noop + } + + async stats(msg: MonitoringMessageData): Promise { + const { runnerProcess } = this; + + if (!runnerProcess) { + // Runner process not initialized yet + return msg; + } + + return { + // @TODO: Provide stats and limits + ...msg, + processId: this.runnerProcess?.pid + }; + } + + getRunnerCmd(config: SequenceConfig) { + const engines = Object.keys(config.engines); + let debugFlags: string[] = []; + + if (engines.length > 1) { + throw new Error("Incorrect config passed to SequenceConfig," + + "'engines' field can't contain more than one element"); + } + + if ("python3" in config.engines) { + this.logger.trace(gotPython); + const runnerPath = path.resolve(__dirname, require.resolve("@scramjet/python-runner")); + + if (this.sthConfig.debug) + debugFlags = ["-m", "pdb", "-c", "continue"]; + + return [ + "/usr/bin/env", + "python3", + ...debugFlags, + path.resolve(__dirname, runnerPath), + "./python-runner-startup.log", + ]; + } + if (this.sthConfig.debug) + debugFlags = ["--inspect-brk=9229"]; + + return [ + isTSNode ? "ts-node" : process.execPath, + ...debugFlags, + path.resolve(__dirname, + process.env.ESBUILD + ? "../../runner/bin/start-runner.js" + : require.resolve("@scramjet/runner") + ) + ]; + } + + getPythonpath(sequenceDir: string) { + // This is for running from source. When the package is built, dependencies + // are installed next to runner.py script (rather than in __pypackages__), + // but that directory is automatically included in PYTHONPATH. + let pythonpath = path.resolve( + __dirname, require.resolve("@scramjet/python-runner"), "../__pypackages__" + ); + + if (process.env.PYTHONPATH) pythonpath += `:${process.env.PYTHONPATH}`; + + pythonpath += `:${sequenceDir}/__pypackages__`; + + return pythonpath; + } + + async run(config: InstanceConfig, instancesServerPort: number, instanceId: string): Promise { + if (config.type !== "process") { + throw new Error("Process instance adapter run with invalid runner config"); + } + + this.logger.info("Instance preparation done"); + + this.logger.trace("Starting Runner", config.id); + + const runnerCommand = this.getRunnerCmd(config); + const sequencePath = path.join( + config.sequenceDir, + config.entrypointPath + ); + const env = getRunnerEnvVariables({ + sequencePath, + instancesServerHost: "127.0.0.1", + instancesServerPort, + instanceId, + pipesPath: "" + }, { + PYTHONPATH: this.getPythonpath(config.sequenceDir), + }); + + this.logger.debug("Spawning Runner process with command", runnerCommand); + this.logger.trace("Runner process environment", env); + + const runnerProcess = spawn(runnerCommand[0], runnerCommand.slice(1), { env }); + + this.crashLogStreams = Promise.all([runnerProcess.stdout, runnerProcess.stderr].map(streamToString)); + + this.logger.trace("Runner process is running", runnerProcess.pid); + + this.runnerProcess = runnerProcess; + + const [statusCode, signal] = await new Promise<[number | null, NodeJS.Signals | null]>( + (res) => runnerProcess.on("exit", (code, sig) => res([code, sig])) + ); + + this.logger.trace("Runner process exited", runnerProcess.pid); + + if (statusCode === null) { + this.logger.warn("Runner was killed by a signal, and didn't return a status code", signal); + + // Probably SIGIKLL + return 137; + } + + if (statusCode > 0) { + this.logger.debug("Process returned non-zero status code", statusCode); + } + + return statusCode; + } + + /** + * Performs cleanup after Runner end. + * Removes fifos used to communication with runner. + */ + async cleanup(): Promise { + //noop + } + + // @ts-ignore + monitorRate(_rps: number): this { + /** ignore */ + } + + /** + * Forcefully stops Runner process. + */ + async remove() { + this.runnerProcess?.kill(); + } + + async getCrashLog(): Promise { + if (!this.crashLogStreams) return []; + + return this.crashLogStreams; + } +} + +export { ProcessInstanceAdapter }; diff --git a/packages/adapter-process/src/process-sequence-adapter.ts b/packages/adapter-process/src/process-sequence-adapter.ts new file mode 100644 index 000000000..ff3b7d1af --- /dev/null +++ b/packages/adapter-process/src/process-sequence-adapter.ts @@ -0,0 +1,165 @@ +import { + ProcessSequenceConfig, + ISequenceAdapter, + STHConfiguration, + SequenceConfig, + IObjectLogger, +} from "@scramjet/types"; +import { ObjLogger } from "@scramjet/obj-logger"; +import { isDefined, readStreamedJSON } from "@scramjet/utility"; +import { detectLanguage, sequencePackageJSONDecoder } from "@scramjet/adapters-utils"; +import { SequenceAdapterError } from "@scramjet/model"; + +import { Readable } from "stream"; +import { createReadStream } from "fs"; +import fs from "fs/promises"; +import path from "path"; +import { exec } from "child_process"; + +/** + * Returns existing Sequence configuration. + * + * @param {string} sequencesRoot Folder where sequences are located. + * @param {string} id Sequence Id. + * @returns {ProcessSequenceConfig} Sequence configuration. + */ +// eslint-disable-next-line complexity +async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string): Promise { + const sequenceDir = path.join(sequencesRoot, id); + const packageJsonPath = path.join(sequenceDir, "package.json"); + const packageJson = await readStreamedJSON(createReadStream(packageJsonPath)); + + const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(packageJson); + const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {}; + + return { + type: "process", + engines, + entrypointPath: validPackageJson.main, + version: validPackageJson.version ?? "", + name: validPackageJson.name ?? "", + id, + sequenceDir, + description: validPackageJson.description, + author: validPackageJson.author, + keywords: validPackageJson.keywords, + args: validPackageJson.args, + repository: validPackageJson.repository, + language: detectLanguage(validPackageJson) + }; +} + +/** + * Adapter for preparing Sequence to be run in process. + */ +class ProcessSequenceAdapter implements ISequenceAdapter { + logger: IObjectLogger; + + name = "ProcessSequenceAdapter"; + config: NonNullable; + + constructor(config: STHConfiguration) { + this.logger = new ObjLogger(this); + this.config = config.adapters["@scramjet/adapter-process"]!; + } + + /** + * Initializes adapter. + * + * @returns {Promise} Promise resolving after initialization. + */ + async init(): Promise { + await fs.access(this.config.sequencesRoot) + .catch(() => fs.mkdir(this.config.sequencesRoot)); + + this.logger.info("Proces adapter initialized with options", { + "sequence root": this.config.sequencesRoot + }); + } + + /** + * Finds existing sequences. + * + * @returns {Promise} Promise resolving to array of identified sequences. + */ + async list(): Promise { + const storedSequencesIds = await fs.readdir(this.config.sequencesRoot); + const sequencesConfigs = (await Promise.all( + storedSequencesIds + .map((id) => getRunnerConfigForStoredSequence(this.config.sequencesRoot, id)) + .map((configPromised) => configPromised.catch(() => null)) + )) + .filter(isDefined); + + this.logger.debug(`Found ${sequencesConfigs.length} stored sequences`); + + return sequencesConfigs; + } + + /** + * Unpacks and identifies sequence. + * + * @param {Readable} stream Stream with packed sequence. + * @param {string} id Sequence Id. + * @param {boolean} override Removes previous sequence + * @returns {Promise} Promise resolving to identified sequence configuration. + */ + async identify(stream: Readable, id: string, override = false): Promise { + const sequenceDir = path.join(this.config.sequencesRoot, id); + + if (override) { + await fs.rm(sequenceDir, { recursive: true, force: true }); + } + + await fs.mkdir(sequenceDir, { recursive: true }); + + const uncompressingProc = exec(`tar zxf - -C ${sequenceDir} >/dev/null 2>&1 || echo >&2 '{"error":"Invalid pkg tar.gz archive"}' && exit 1`); + + stream.pipe(uncompressingProc.stdin!); + + const stderrChunks: string[] = []; + + uncompressingProc.stderr!.on("data", (chunk: Buffer) => { + stderrChunks.push(chunk.toString()); + }); + + await new Promise(res => uncompressingProc.on("close", res)); + + const stderrOutput = stderrChunks.join(""); + + if (stderrOutput) { + let preRunnenrError; + + try { + preRunnenrError = JSON.parse(stderrOutput); + this.logger.error("Unpacking sequence failed", stderrOutput); + } catch (e) { + throw new SequenceAdapterError("PRERUNNER_ERROR", `Error parsing ${stderrOutput}`); + } + + throw new SequenceAdapterError("PRERUNNER_ERROR", preRunnenrError.error); + } + + this.logger.debug("Unpacking sequence succeeded", stderrOutput); + + return getRunnerConfigForStoredSequence(this.config.sequencesRoot, id); + } + + /** + * Removes directory used to store sequence. + * + * @param {SequenceConfig} config Sequence configuration. + * @returns {Promise} Promise resolving after directory deletion. + */ + async remove(config: SequenceConfig) { + if (config.type !== "process") { + throw new Error(`Incorrect SequenceConfig passed to ProcessSequenceAdapter: ${config.type}`); + } + + const sequenceDir = path.join(this.config.sequencesRoot, config.id); + + return fs.rm(sequenceDir, { recursive: true }); + } +} + +export { ProcessSequenceAdapter }; diff --git a/packages/adapter-process/src/readme.mtpl b/packages/adapter-process/src/readme.mtpl new file mode 100644 index 000000000..237e68061 --- /dev/null +++ b/packages/adapter-process/src/readme.mtpl @@ -0,0 +1,18 @@ +# Scramjet Transform Hub Adapters + +This module holds two types of adapters utilized by Scramjet Transform Hub: Instance Adapter and Sequence Adapter. These Adapters allows for running the Sequence identification and Instance execution in standalone processes. + +The adapter provides two main exports: + +* [SequenceAdapter](./src/process-sequence-adapter.ts) - An adapter for preparing Sequence to be run in subprocess. +* [InstanceAdapter](./src/process-instance-adapter.ts) - An adapter for running Instance by Runner executed in subprocess. + +## Docs + +>!docs adapters/modules.md & + +>@sth +>@use-cases +>@links +>@license-agpl +>@contrib diff --git a/packages/adapter-process/test/pass.spec.ts b/packages/adapter-process/test/pass.spec.ts new file mode 100644 index 000000000..57dc6c738 --- /dev/null +++ b/packages/adapter-process/test/pass.spec.ts @@ -0,0 +1,5 @@ +import test from "ava"; + +test("Passing test", (t) => { + t.pass(); +}); diff --git a/packages/adapter-process/tsconfig.build.json b/packages/adapter-process/tsconfig.build.json new file mode 100644 index 000000000..e0848624e --- /dev/null +++ b/packages/adapter-process/tsconfig.build.json @@ -0,0 +1,9 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "./dist" + }, + "include": [ + "src/**/*" + ] +} diff --git a/packages/adapter-process/tsconfig.json b/packages/adapter-process/tsconfig.json new file mode 100644 index 000000000..aee20079b --- /dev/null +++ b/packages/adapter-process/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "outDir": "./dist" + }, + "extends": "../../tsconfig.base.json", + "include": [ + "./src", + "./test" + ], + "exclude": [ + "node_modules" + ], + "typedocOptions": { + "entryPoints": [ + "src/index.ts" + ], + "out": "../../docs/adapters", + "plugin": "typedoc-plugin-markdown", + "gitRevision": "HEAD", + "sort": "alphabetical" + } +} diff --git a/packages/adapters-utils/.eslintrc.js b/packages/adapters-utils/.eslintrc.js new file mode 100644 index 000000000..0c111683c --- /dev/null +++ b/packages/adapters-utils/.eslintrc.js @@ -0,0 +1,7 @@ +module.exports = { + ignorePatterns: [".eslintrc.js"], + parserOptions:{ + project: "./tsconfig.json", + tsconfigRootDir: __dirname + } +}; diff --git a/packages/adapters-utils/.nycrc.json b/packages/adapters-utils/.nycrc.json new file mode 100644 index 000000000..7797303b5 --- /dev/null +++ b/packages/adapters-utils/.nycrc.json @@ -0,0 +1,13 @@ +{ + "all": true, + "include": [ + "dist/**/*" + ], + "exclude": [ + "**/*.spec.ts" + ], + "reporter": [ + "lcovonly", + "text" + ] +} diff --git a/packages/adapters-utils/README.md b/packages/adapters-utils/README.md new file mode 100644 index 000000000..36b74543f --- /dev/null +++ b/packages/adapters-utils/README.md @@ -0,0 +1,61 @@ +# Scramjet Transform Hub Adapters + +This module holds two types of adapters utilized by Scramjet Transform Hub: Instance Adapter and Sequence Adapter. These Adapters allows for running the Sequence identification and Instance execution in two basic modes: as a non containerized standalone processes or in a Docker container. + +The adapter provides two main exports: + +* [DockerSequenceAdapter](https://github.com/scramjetorg/transform-hub/tree/HEAD/packages/adapters/src/docker-sequence-adapter.ts) - An adapter for preparing Sequence to be run in Docker container. +* [DockerInstanceAdapter](https://github.com/scramjetorg/transform-hub/tree/HEAD/packages/adapters/src/docker-instance-adapter.ts) - An adapter for running Instance by Runner executed in Docker container. + +## Docs + +See the code documentation here: [scramjetorg/transform-hub/docs/adapters/modules.md](https://github.com/scramjetorg/transform-hub/tree/HEAD/docs/adapters/modules.md) + +## Scramjet Transform Hub + +This package is part of [Scramjet Transform Hub](https://www.npmjs.org/package/@scramjet/sth). + +Scramjet Transform Hub is a deployment and execution platform. Once installed on a server, it will allow you to start your programs and keep them running on a remote machine. You will be able to start programs in the background or connect to them and see their output directly on your terminal. You will be able to pipe your local data to the program, as if it was running from your terminal. You can start your server in AWS, Google Cloud or Azure, start it on your local machine, install it on a Raspberry Pi or wherever else you'd like. + +## Use cases + +There's no limit what you can use it for. You want a stock checker? A chat bot? Maybe you'd like to automate your home? Retrieve sensor data? Maybe you have a lot of data and want to transfer and wrangle it? You have a database of cities and you'd like to enrich your data? You do machine learning and you want to train your set while the data is fetched in real time? Hey, you want to use it for something else and ask us if that's a good use? Ask us [via email](mailto:get@scramjet.org) or hop on our [Scramjet Discord](https://scr.je/join-community-mg1)! + +## Some important links + +* Scramjet, the company behind [Transform Hub](https://scramjet.org) +* The [Scramjet Framework - functional reactive stream processing framework](https://framework.scramjet.org) +* The [Transform Hub repo on github](https://github.com/scramjetorg/transform-hub) +* You can see the [Scramjet Transform Hub API docs here](https://github.com/scramjetorg/transform-hub/tree/HEAD/docs/api-client/README.md) +* You can see the [CLI documentation here](https://github.com/scramjetorg/transform-hub/tree/HEAD/packages/cli/README.md), but `si help` should also be quite effective. +* Don't forget to ⭐ this repo if you like it, `subscribe` to releases and keep visiting us for new versions and updates. +* You can [open an issue - file a bug report or a feature request here](https://github.com/scramjetorg/transform-hub/issues/new/choose) + +## License and contributions + +This module is licensed under AGPL-3.0 license. + +The Scramjet Transform Hub project is dual-licensed under the AGPL-3.0 and MIT licenses. Parts of the project that are linked with your programs are MIT licensed, the rest is AGPL. + +## Contributions + +We accept valid contributions and we will be publishing a more specific project roadmap so contributors can propose features and also help us implement them. We kindly ask you that contributed commits are Signed-Off `git commit --sign-off`. + +We provide support for contributors via test cases. If you expect a certain type of workflow to be officially supported, please specify and implement a test case in `Gherkin` format in `bdd` directory and include it in your pull request. More info about our BDD test you will find [here](https://github.com/scramjetorg/transform-hub/tree/HEAD/bdd/README.md). + +### Help wanted 👩‍🎓🧑👱‍♀️ + +The project need's your help! There's lots of work to do and we have a lot of plans. If you want to help and be part of the Scramjet team, please reach out to us, [on discord](https://scr.je/join-community-mg1) or email us: [opensource@scramjet.org](mailto:opensource@scramjet.org). + +### Donation 💸 + +Do you like this project? It helped you to reduce time spent on delivering your solution? You are welcome to buy us a coffee ☕ Thanks a lot! 😉 + +[You can sponsor us on github](https://github.com/sponsors/scramjetorg) + +* There's also a Paypal donation link if you prefer that: + +[![paypal](https://www.paypalobjects.com/en_US/i/btn/btn_donateCC_LG.gif)](https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=7F7V65C43EBMW) + + + diff --git a/packages/adapters-utils/package.json b/packages/adapters-utils/package.json new file mode 100644 index 000000000..1cf4172b8 --- /dev/null +++ b/packages/adapters-utils/package.json @@ -0,0 +1,59 @@ +{ + "name": "@scramjet/adapters-utils", + "version": "0.34.4", + "description": "This package is part of Scramjet Transform Hub. This module holds the docker adapters utilized by Scramjet Transform Hub", + "main": "./src/index.ts", + "scripts": { + "start": "ts-node ./src/index", + "build": "../../scripts/build-all.js --config-name=tsconfig.build.json --copy-dist", + "build:docs": "typedoc", + "clean": "rm -rf ./dist .bic_cache", + "cloc": "cloc src --fullpath --include-lang TypeScript --not-match-d \"(node_modules|test|dist|bdd)\" --by-percent cm", + "test": "nyc ava", + "prepack": "node ../../scripts/publish.js" + }, + "author": "Scramjet ", + "license": "AGPL-3.0", + "dependencies": { + "@scramjet/model": "^0.34.4", + "@scramjet/obj-logger": "^0.34.4", + "@scramjet/pre-runner": "^0.34.4", + "@scramjet/python-runner": "^0.34.4", + "@scramjet/runner": "^0.34.4", + "@scramjet/sth-config": "^0.34.4", + "@scramjet/symbols": "^0.34.4", + "@scramjet/utility": "^0.34.4", + "scramjet": "^4.36.9", + "shell-escape": "^0.2.0", + "systeminformation": "^5.12.7", + "ts.data.json": "^2.2.0" + }, + "devDependencies": { + "@scramjet/types": "^0.34.4", + "@types/js-yaml": "4.0.5", + "@types/node": "15.12.5", + "@types/request": "2.48.8", + "@types/shell-escape": "^0.2.1", + "@types/ws": "8.5.3", + "ava": "^3.15.0", + "ts-node": "^10.9.1", + "typedoc": "^0.23.17", + "typedoc-plugin-markdown": "^3.13.6", + "typescript": "~4.7.4" + }, + "ava": { + "extensions": [ + "ts" + ], + "files": [ + "**/*.spec.ts" + ], + "require": [ + "ts-node/register" + ] + }, + "repository": { + "type": "git", + "url": "https://github.com/scramjetorg/transform-hub.git" + } +} diff --git a/packages/adapters-utils/src/get-runner-env.ts b/packages/adapters-utils/src/get-runner-env.ts new file mode 100644 index 000000000..e658ac99e --- /dev/null +++ b/packages/adapters-utils/src/get-runner-env.ts @@ -0,0 +1,40 @@ +import path from "path"; +import { RunnerEnvConfig, RunnerEnvironmentVariables } from "./types"; + +/** + * Genrates the required runner env variables + * + * @param conf main parameters + * @param extra any extra parameters + * @returns env vars + */ +export function getRunnerEnvVariables({ + sequencePath, instancesServerPort, instancesServerHost, instanceId, pipesPath, paths = "posix" +}: RunnerEnvConfig, extra: Record = {}): RunnerEnvironmentVariables { + const join = path[paths].join; + + return { + PATH: process.env.PATH, + DEVELOPMENT: process.env.DEVELOPMENT, + PRODUCTION: process.env.PRODUCTION, + SEQUENCE_PATH: sequencePath, + INSTANCES_SERVER_PORT: `${instancesServerPort}`, + INSTANCES_SERVER_HOST: instancesServerHost, + INSTANCE_ID: instanceId, + PIPES_LOCATION: pipesPath, + CRASH_LOG: join(pipesPath, "crash_log"), + ...extra + }; +} + +/** + * Genrates the required runner env variables as Object.entries + * + * @param conf main parameters + * @param extra any extra parameters + * @returns env vars as entries + */ +export function getRunnerEnvEntries(conf: RunnerEnvConfig, extra: Record = {}) { + return Object.entries(getRunnerEnvVariables(conf, extra)); +} + diff --git a/packages/adapters-utils/src/index.ts b/packages/adapters-utils/src/index.ts new file mode 100644 index 000000000..bb8daff3a --- /dev/null +++ b/packages/adapters-utils/src/index.ts @@ -0,0 +1,4 @@ +export * from "./get-runner-env"; +export * from "./types"; +export * from "./utils"; +export * from "./validate-sequence-package-json"; diff --git a/packages/adapters-utils/src/readme.mtpl b/packages/adapters-utils/src/readme.mtpl new file mode 100644 index 000000000..237e68061 --- /dev/null +++ b/packages/adapters-utils/src/readme.mtpl @@ -0,0 +1,18 @@ +# Scramjet Transform Hub Adapters + +This module holds two types of adapters utilized by Scramjet Transform Hub: Instance Adapter and Sequence Adapter. These Adapters allows for running the Sequence identification and Instance execution in standalone processes. + +The adapter provides two main exports: + +* [SequenceAdapter](./src/process-sequence-adapter.ts) - An adapter for preparing Sequence to be run in subprocess. +* [InstanceAdapter](./src/process-instance-adapter.ts) - An adapter for running Instance by Runner executed in subprocess. + +## Docs + +>!docs adapters/modules.md & + +>@sth +>@use-cases +>@links +>@license-agpl +>@contrib diff --git a/packages/adapters-utils/src/types.ts b/packages/adapters-utils/src/types.ts new file mode 100644 index 000000000..fca681f76 --- /dev/null +++ b/packages/adapters-utils/src/types.ts @@ -0,0 +1,27 @@ +import { InstanceId } from "@scramjet/types"; + +export type InstanceAdapterOptions = { + exitDelay: number; +} + +export type RunnerEnvConfig = { + paths?: "posix" | "win32" + sequencePath: string; + pipesPath: string; + instancesServerPort: number; + instancesServerHost: string; + instanceId: InstanceId; +} + +export type RunnerEnvironmentVariables = Partial<{ + PATH: string; + DEVELOPMENT: string; + PRODUCTION: string; + SEQUENCE_PATH: string; + INSTANCES_SERVER_PORT: string; + INSTANCES_SERVER_HOST: string; + INSTANCE_ID: string; + PIPES_LOCATION: string; + CRASH_LOG: string; + [key: string]: string; +}>; diff --git a/packages/adapters-utils/src/utils.ts b/packages/adapters-utils/src/utils.ts new file mode 100644 index 000000000..ce2e872cd --- /dev/null +++ b/packages/adapters-utils/src/utils.ts @@ -0,0 +1,16 @@ +/** + * Determines Sequence language by checking file extension in `main` field. + * When failed, checks `engines` field for `node`. + * Returns "unknown" when language can't be determined with this methods. + * + * @param {any} packageJson package.json contents + * @returns {string} Detected language or "unknown" + */ +export const detectLanguage = (packageJson: {[key: string]: any}) => { + if (packageJson.engines) { + if ("python3" in packageJson.engines) return "py"; + if ("node" in packageJson.engines) return "js"; + } + + return (packageJson.main?.match(/(?:\.)([^.\\/:*?"<>|\r\n]+$)/) || { 1: undefined })[1] || "unknown"; +}; diff --git a/packages/adapters-utils/src/validate-sequence-package-json.ts b/packages/adapters-utils/src/validate-sequence-package-json.ts new file mode 100644 index 000000000..fca2df88b --- /dev/null +++ b/packages/adapters-utils/src/validate-sequence-package-json.ts @@ -0,0 +1,52 @@ +import { + PortConfig, + SequencePackageJSON, + SequencePackageJSONScramjetConfig, + SequencePackageJSONScramjetSection +} from "@scramjet/types"; + +import { Err, JsonDecoder, Ok } from "ts.data.json"; + +const enginesDecoder = JsonDecoder.dictionary(JsonDecoder.string, "EnginesDecoder"); + +const portDecoder = new JsonDecoder.Decoder((val) => { + const isValid = typeof val === "string" && (/^\d{3,5}\/(tcp|udp)$/).test(val); + + return isValid ? new Ok(val as PortConfig) : new Err("Invalid port format"); +}); + +const configDecoder = JsonDecoder.object({ + ports: JsonDecoder.optional(JsonDecoder.array(portDecoder, "PortsDecoder")) +}, "ConfigDecoder"); + +const scramjetDecoder = JsonDecoder.object({ + config: JsonDecoder.optional(configDecoder) +}, "ScramjetSectionDecoder"); + +type RepositoryObject = { + type: string; + url: string; + directory?: string; +} + +const repositoryDecoder = JsonDecoder.oneOf([ + JsonDecoder.string, + JsonDecoder.object({ + type: JsonDecoder.string, + url: JsonDecoder.string, + directory: JsonDecoder.optional(JsonDecoder.string) + }, "repositoryObjectDecoder") +], "repositoryDecoder"); + +export const sequencePackageJSONDecoder = JsonDecoder.object({ + name: JsonDecoder.optional(JsonDecoder.string), + version: JsonDecoder.optional(JsonDecoder.string), + main: JsonDecoder.string, + engines: JsonDecoder.optional(enginesDecoder), + scramjet: JsonDecoder.optional(scramjetDecoder), + description: JsonDecoder.optional(JsonDecoder.string), + author: JsonDecoder.optional(JsonDecoder.string), + keywords: JsonDecoder.optional(JsonDecoder.array(JsonDecoder.string, "keywordsDecoder")), + repository: JsonDecoder.optional(repositoryDecoder), + args: JsonDecoder.optional(JsonDecoder.array(JsonDecoder.succeed, "argsDecoder")), +}, "SequencePackageJSON"); diff --git a/packages/adapters-utils/test/pass.spec.ts b/packages/adapters-utils/test/pass.spec.ts new file mode 100644 index 000000000..57dc6c738 --- /dev/null +++ b/packages/adapters-utils/test/pass.spec.ts @@ -0,0 +1,5 @@ +import test from "ava"; + +test("Passing test", (t) => { + t.pass(); +}); diff --git a/packages/adapters-utils/tsconfig.build.json b/packages/adapters-utils/tsconfig.build.json new file mode 100644 index 000000000..e0848624e --- /dev/null +++ b/packages/adapters-utils/tsconfig.build.json @@ -0,0 +1,9 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "./dist" + }, + "include": [ + "src/**/*" + ] +} diff --git a/packages/adapters-utils/tsconfig.json b/packages/adapters-utils/tsconfig.json new file mode 100644 index 000000000..aee20079b --- /dev/null +++ b/packages/adapters-utils/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "outDir": "./dist" + }, + "extends": "../../tsconfig.base.json", + "include": [ + "./src", + "./test" + ], + "exclude": [ + "node_modules" + ], + "typedocOptions": { + "entryPoints": [ + "src/index.ts" + ], + "out": "../../docs/adapters", + "plugin": "typedoc-plugin-markdown", + "gitRevision": "HEAD", + "sort": "alphabetical" + } +} diff --git a/packages/adapters/src/process-sequence-adapter.ts b/packages/adapters/src/process-sequence-adapter.ts index 955b19a50..02c287480 100644 --- a/packages/adapters/src/process-sequence-adapter.ts +++ b/packages/adapters/src/process-sequence-adapter.ts @@ -56,9 +56,11 @@ class ProcessSequenceAdapter implements ISequenceAdapter { logger: IObjectLogger; name = "ProcessSequenceAdapter"; + config: NonNullable; - constructor(private config: STHConfiguration) { + constructor(config: STHConfiguration) { this.logger = new ObjLogger(this); + this.config = config.adapters["@scramjet/adapter-process"]!; } /** diff --git a/packages/host/src/lib/adapter-manager.ts b/packages/host/src/lib/adapter-manager.ts new file mode 100644 index 000000000..e923b27d2 --- /dev/null +++ b/packages/host/src/lib/adapter-manager.ts @@ -0,0 +1,47 @@ +import { ObjLogger } from "@scramjet/obj-logger"; +import { IRuntimeAdapter, STHConfiguration } from "@scramjet/types"; + +export class AdapterManager { + adapters: { [key: string]: IRuntimeAdapter } = {}; + sthConfig: STHConfiguration; + + logger = new ObjLogger(this); + + constructor(sthConfig: STHConfiguration) { + this.sthConfig = sthConfig; + } + + async init() { + this.logger.info("Loading adapters...", Object.keys(this.sthConfig.adapters)); + + this.adapters = (await Promise.all( + Object.keys(this.sthConfig.adapters).map(async (pkgName: string) => ({ name: pkgName, pkg: await import(pkgName) }) + ))).reduce((acc, pkg) => { + if (!AdapterManager.validateAdapter(pkg.pkg)) { + throw new Error(`Invalid adapter provided ${pkg.name}`); + }; + + if (acc[pkg.name]) throw new Error("Invalid adapters configuration, duplicated adapter name"); + + acc[pkg.name] = pkg.pkg; + + return acc; + }, {} as { [key: string]: IRuntimeAdapter }); + + const adaptersCount = Object.keys(this.adapters).length; + + if (adaptersCount) { + this.logger.info(`${adaptersCount} Adapters available:`, this.adapters); + } else { + this.logger.warn("No adapters defined. Sequences and Instances unsupported."); + } + } + + static validateAdapter(adapter: IRuntimeAdapter): boolean { + return !!(adapter.name.trim() && ["SequenceAdapter", "InstanceAdapter"].every((className: string) => className in adapter)); + } + + getAdapterByName(name: string): IRuntimeAdapter | undefined { + return Object.values(this.adapters).find(a => a.name === name); + } +} diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index 99110f3e7..4aea77bd9 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -48,6 +48,7 @@ import { cpus, totalmem } from "os"; import { S3Client } from "./s3-client"; import { DuplexStream } from "@scramjet/api-server"; import { readFileSync } from "fs"; +import { AdapterManager } from "./adapter-manager"; const buildInfo = readJsonFile("build.info", __dirname, ".."); const packageFile = findPackage(__dirname).next(); @@ -103,6 +104,7 @@ export class Host implements IComponent { * Instance of CPMConnector used to communicate with Manager. */ cpmConnector?: CPMConnector; + adapterManager!: AdapterManager; /** * Object to store CSIControllers. @@ -314,6 +316,11 @@ export class Host implements IComponent { await this.identifyExistingSequences(); } + this.adapterManager = new AdapterManager(this.config); + this.adapterManager.logger.pipe(this.logger); + + await this.adapterManager.init(); + const adapter = await initializeRuntimeAdapters(this.config); this.adapterName = adapter; diff --git a/packages/sth-config/src/config-service.ts b/packages/sth-config/src/config-service.ts index 6e987b850..30c358791 100644 --- a/packages/sth-config/src/config-service.ts +++ b/packages/sth-config/src/config-service.ts @@ -50,7 +50,6 @@ const _defaultConfig: STHConfiguration = { }, safeOperationLimit: 512, runtimeAdapter: "detect", - sequencesRoot: path.join(homedir(), ".scramjet_sequences"), kubernetes: { namespace: "default", authConfigPath: undefined, @@ -78,6 +77,11 @@ const _defaultConfig: STHConfiguration = { replaceTimestamp: true, labels: { module: "host", job: "telemetry" } } + }, + adapters: { + "@scramjet/adapter-process": { + sequencesRoot: path.join(homedir(), ".scramjet_sequences"), + } } }; @@ -119,7 +123,6 @@ export class ConfigService { static getConfigInfo(config: STHConfiguration): PublicSTHConfiguration { const { kubernetes: kubeFull, - sequencesRoot: optionsSequencesRoot2, cpmSslCaPath: optionsCpmSslCaPath, ...safe } = config; diff --git a/packages/sth/src/bin/hub.ts b/packages/sth/src/bin/hub.ts index 5391e01af..111f06cf5 100755 --- a/packages/sth/src/bin/hub.ts +++ b/packages/sth/src/bin/hub.ts @@ -133,7 +133,6 @@ const options: OptionValues & STHCommandOptions = program id: options.id }, runtimeAdapter: getRuntimeAdapterOption(options), - sequencesRoot: resolveFile(options.sequencesRoot), startupConfig: resolveFile(options.startupConfig), identifyExisting: options.identifyExisting, exitWithLastInstance: options.exitWithLastInstance, diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 8172d2238..5a686fff2 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -47,7 +47,7 @@ export { STHRestAPI }; export * from "./sequence-package-json"; export * from "./sequence-adapter"; - +export * from "./runtime-adapter"; export * from "./dto/index"; export * from "./rest-api-error/rest-api-error"; diff --git a/packages/types/src/runtime-adapter.ts b/packages/types/src/runtime-adapter.ts new file mode 100644 index 000000000..467a7a096 --- /dev/null +++ b/packages/types/src/runtime-adapter.ts @@ -0,0 +1,7 @@ +import { ILifeCycleAdapterMain, ILifeCycleAdapterRun, IComponent, ISequenceAdapter } from "./index" + +export interface IRuntimeAdapter extends ILifeCycleAdapterMain, ILifeCycleAdapterRun, IComponent { + SequenceAdapter: ISequenceAdapter; + InstanceAdapter: ILifeCycleAdapterMain & ILifeCycleAdapterRun & IComponent; + name: string; +} diff --git a/packages/types/src/sth-configuration.ts b/packages/types/src/sth-configuration.ts index 4983b145d..fdf6ae989 100644 --- a/packages/types/src/sth-configuration.ts +++ b/packages/types/src/sth-configuration.ts @@ -68,7 +68,14 @@ export type HostConfig = { * Host information filepath. */ infoFilePath: string; -} +}; + +export type ProcessAdapterConfiguration = { + /** + * Where should ProcessSequenceAdapter save new Sequences + */ + sequencesRoot: string; +}; export type K8SAdapterConfiguration = { /** @@ -123,12 +130,12 @@ export type STHConfiguration = { /** * Logging level. */ - logLevel: LogLevel + logLevel: LogLevel; /** * Enable colors in logging. */ - logColors: boolean, + logColors: boolean; /** * CPM url. @@ -146,8 +153,8 @@ export type STHConfiguration = { cpmId: string; cpm: { - maxReconnections: number, - reconnectionDelay: number + maxReconnections: number; + reconnectionDelay: number; }; platform?: { @@ -170,17 +177,17 @@ export type STHConfiguration = { /** * PreRunner container configuration. */ - prerunner: PreRunnerContainerConfiguration, + prerunner: PreRunnerContainerConfiguration; /** * Runner container configuration. */ - runner: RunnerContainerConfiguration, + runner: RunnerContainerConfiguration; runnerImages: { - python3: string, - node: string, - }, - }, + python3: string; + node: string; + } + }; /** * Host configuration. @@ -221,29 +228,23 @@ export type STHConfiguration = { * Which sequence and instance adapters should STH use. * One of 'docker', 'process', 'kubernetes', 'detect' */ - runtimeAdapter: string, + runtimeAdapter: string; /** * Kubernetes adapter configuration */ - kubernetes: Partial, - - /** - * Only used when `noDocker` is true - * Where should ProcessSequenceAdapter save new Sequences - */ - sequencesRoot: string, + kubernetes: Partial; /** * Provides the location of a config file with the list of sequences * to be started along with the host */ - startupConfig: string, + startupConfig: string; /** * Should the hub exit when the last instance ends */ - exitWithLastInstance: boolean, + exitWithLastInstance: boolean; /** * Various timeout and interval configurations @@ -252,7 +253,7 @@ export type STHConfiguration = { /** * Heartbeat interval in miliseconds */ - heartBeatInterval: number, + heartBeatInterval: number; /** * Time to wait after Runner container exit. @@ -266,9 +267,14 @@ export type STHConfiguration = { instanceLifetimeExtensionDelay: number; }; - telemetry: TelemetryConfig + telemetry: TelemetryConfig; + + adapters: { + "@scramjet/adapter-k8s"?: K8SAdapterConfiguration; + "@scramjet/adapter-process"?: ProcessAdapterConfiguration; + } } -export type PublicSTHConfiguration = Omit, "cpmSslCaPath">, "kubernetes"> & { +export type PublicSTHConfiguration = Omit, "kubernetes"> & { kubernetes: Omit, "authConfigPath">, "sequencesRoot"> };