From c3485363c1a03b9165006bc1a3e03efadff632d5 Mon Sep 17 00:00:00 2001 From: patuwwy Date: Thu, 29 Jun 2023 10:21:48 +0000 Subject: [PATCH] @scramjet/adapter-docker --- packages/adapter-docker/.eslintrc.js | 7 + packages/adapter-docker/.nycrc.json | 13 + packages/adapter-docker/README.md | 61 +++ packages/adapter-docker/package.json | 61 +++ .../src/docker-instance-adapter.ts | 285 +++++++++++++ .../adapter-docker/src/docker-networking.ts | 41 ++ .../src/docker-sequence-adapter.ts | 298 +++++++++++++ .../src/dockerode-docker-helper.ts | 398 ++++++++++++++++++ packages/adapter-docker/src/index.ts | 11 + packages/adapter-docker/src/readme.mtpl | 18 + packages/adapter-docker/src/types.ts | 330 +++++++++++++++ packages/adapter-docker/test/pass.spec.ts | 5 + packages/adapter-docker/tsconfig.build.json | 9 + packages/adapter-docker/tsconfig.json | 22 + packages/adapter-process/src/index.ts | 7 +- .../src/process-instance-adapter.ts | 2 +- .../adapters/src/docker-sequence-adapter.ts | 34 +- packages/cli/src/lib/commands/sequence.ts | 6 +- packages/host/src/lib/adapter-manager.ts | 10 +- packages/host/src/lib/csi-controller.ts | 2 +- packages/host/src/lib/host.ts | 20 +- packages/sth-config/src/config-service.ts | 38 +- packages/sth/src/bin/hub.ts | 28 +- packages/types/src/index.ts | 2 +- packages/types/src/instance-adapter.ts | 4 + packages/types/src/runtime-adapter.ts | 6 +- packages/types/src/sth-configuration.ts | 36 +- 27 files changed, 1657 insertions(+), 97 deletions(-) create mode 100644 packages/adapter-docker/.eslintrc.js create mode 100644 packages/adapter-docker/.nycrc.json create mode 100644 packages/adapter-docker/README.md create mode 100644 packages/adapter-docker/package.json create mode 100644 packages/adapter-docker/src/docker-instance-adapter.ts create mode 100644 packages/adapter-docker/src/docker-networking.ts create mode 100644 packages/adapter-docker/src/docker-sequence-adapter.ts create mode 100644 packages/adapter-docker/src/dockerode-docker-helper.ts create mode 100644 packages/adapter-docker/src/index.ts create mode 100644 packages/adapter-docker/src/readme.mtpl create mode 100644 packages/adapter-docker/src/types.ts create mode 100644 packages/adapter-docker/test/pass.spec.ts create mode 100644 packages/adapter-docker/tsconfig.build.json create mode 100644 packages/adapter-docker/tsconfig.json create mode 100644 packages/types/src/instance-adapter.ts diff --git a/packages/adapter-docker/.eslintrc.js b/packages/adapter-docker/.eslintrc.js new file mode 100644 index 000000000..0c111683c --- /dev/null +++ b/packages/adapter-docker/.eslintrc.js @@ -0,0 +1,7 @@ +module.exports = { + ignorePatterns: [".eslintrc.js"], + parserOptions:{ + project: "./tsconfig.json", + tsconfigRootDir: __dirname + } +}; diff --git a/packages/adapter-docker/.nycrc.json b/packages/adapter-docker/.nycrc.json new file mode 100644 index 000000000..7797303b5 --- /dev/null +++ b/packages/adapter-docker/.nycrc.json @@ -0,0 +1,13 @@ +{ + "all": true, + "include": [ + "dist/**/*" + ], + "exclude": [ + "**/*.spec.ts" + ], + "reporter": [ + "lcovonly", + "text" + ] +} diff --git a/packages/adapter-docker/README.md b/packages/adapter-docker/README.md new file mode 100644 index 000000000..36b74543f --- /dev/null +++ b/packages/adapter-docker/README.md @@ -0,0 +1,61 @@ +# Scramjet Transform Hub Adapters + +This module holds two types of adapters utilized by Scramjet Transform Hub: Instance Adapter and Sequence Adapter. These Adapters allows for running the Sequence identification and Instance execution in two basic modes: as a non containerized standalone processes or in a Docker container. + +The adapter provides two main exports: + +* [DockerSequenceAdapter](https://github.com/scramjetorg/transform-hub/tree/HEAD/packages/adapters/src/docker-sequence-adapter.ts) - An adapter for preparing Sequence to be run in Docker container. +* [DockerInstanceAdapter](https://github.com/scramjetorg/transform-hub/tree/HEAD/packages/adapters/src/docker-instance-adapter.ts) - An adapter for running Instance by Runner executed in Docker container. + +## Docs + +See the code documentation here: [scramjetorg/transform-hub/docs/adapters/modules.md](https://github.com/scramjetorg/transform-hub/tree/HEAD/docs/adapters/modules.md) + +## Scramjet Transform Hub + +This package is part of [Scramjet Transform Hub](https://www.npmjs.org/package/@scramjet/sth). + +Scramjet Transform Hub is a deployment and execution platform. Once installed on a server, it will allow you to start your programs and keep them running on a remote machine. You will be able to start programs in the background or connect to them and see their output directly on your terminal. You will be able to pipe your local data to the program, as if it was running from your terminal. You can start your server in AWS, Google Cloud or Azure, start it on your local machine, install it on a Raspberry Pi or wherever else you'd like. + +## Use cases + +There's no limit what you can use it for. You want a stock checker? A chat bot? Maybe you'd like to automate your home? Retrieve sensor data? Maybe you have a lot of data and want to transfer and wrangle it? You have a database of cities and you'd like to enrich your data? You do machine learning and you want to train your set while the data is fetched in real time? Hey, you want to use it for something else and ask us if that's a good use? Ask us [via email](mailto:get@scramjet.org) or hop on our [Scramjet Discord](https://scr.je/join-community-mg1)! + +## Some important links + +* Scramjet, the company behind [Transform Hub](https://scramjet.org) +* The [Scramjet Framework - functional reactive stream processing framework](https://framework.scramjet.org) +* The [Transform Hub repo on github](https://github.com/scramjetorg/transform-hub) +* You can see the [Scramjet Transform Hub API docs here](https://github.com/scramjetorg/transform-hub/tree/HEAD/docs/api-client/README.md) +* You can see the [CLI documentation here](https://github.com/scramjetorg/transform-hub/tree/HEAD/packages/cli/README.md), but `si help` should also be quite effective. +* Don't forget to ⭐ this repo if you like it, `subscribe` to releases and keep visiting us for new versions and updates. +* You can [open an issue - file a bug report or a feature request here](https://github.com/scramjetorg/transform-hub/issues/new/choose) + +## License and contributions + +This module is licensed under AGPL-3.0 license. + +The Scramjet Transform Hub project is dual-licensed under the AGPL-3.0 and MIT licenses. Parts of the project that are linked with your programs are MIT licensed, the rest is AGPL. + +## Contributions + +We accept valid contributions and we will be publishing a more specific project roadmap so contributors can propose features and also help us implement them. We kindly ask you that contributed commits are Signed-Off `git commit --sign-off`. + +We provide support for contributors via test cases. If you expect a certain type of workflow to be officially supported, please specify and implement a test case in `Gherkin` format in `bdd` directory and include it in your pull request. More info about our BDD test you will find [here](https://github.com/scramjetorg/transform-hub/tree/HEAD/bdd/README.md). + +### Help wanted 👩‍🎓🧑👱‍♀️ + +The project need's your help! There's lots of work to do and we have a lot of plans. If you want to help and be part of the Scramjet team, please reach out to us, [on discord](https://scr.je/join-community-mg1) or email us: [opensource@scramjet.org](mailto:opensource@scramjet.org). + +### Donation 💸 + +Do you like this project? It helped you to reduce time spent on delivering your solution? You are welcome to buy us a coffee ☕ Thanks a lot! 😉 + +[You can sponsor us on github](https://github.com/sponsors/scramjetorg) + +* There's also a Paypal donation link if you prefer that: + +[![paypal](https://www.paypalobjects.com/en_US/i/btn/btn_donateCC_LG.gif)](https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=7F7V65C43EBMW) + + + diff --git a/packages/adapter-docker/package.json b/packages/adapter-docker/package.json new file mode 100644 index 000000000..b4d3f6a14 --- /dev/null +++ b/packages/adapter-docker/package.json @@ -0,0 +1,61 @@ +{ + "name": "@scramjet/adapter-docker", + "version": "0.34.4", + "description": "This package is part of Scramjet Transform Hub. This module holds the docker adapters utilized by Scramjet Transform Hub", + "main": "./src/index.ts", + "scripts": { + "start": "ts-node ./src/index", + "build": "../../scripts/build-all.js --config-name=tsconfig.build.json --copy-dist", + "build:docs": "typedoc", + "clean": "rm -rf ./dist .bic_cache", + "cloc": "cloc src --fullpath --include-lang TypeScript --not-match-d \"(node_modules|test|dist|bdd)\" --by-percent cm", + "test": "nyc ava", + "prepack": "node ../../scripts/publish.js" + }, + "author": "Scramjet ", + "license": "AGPL-3.0", + "dependencies": { + "@scramjet/model": "^0.34.4", + "@scramjet/obj-logger": "^0.34.4", + "@scramjet/pre-runner": "^0.34.4", + "@scramjet/python-runner": "^0.34.4", + "@scramjet/runner": "^0.34.4", + "@scramjet/sth-config": "^0.34.4", + "@scramjet/symbols": "^0.34.4", + "@scramjet/utility": "^0.34.4", + "@scramjet/adapters-utils": "^0.34.4", + "dockerode": "^3.3.4", + "scramjet": "^4.36.9", + "shell-escape": "^0.2.0", + "systeminformation": "^5.12.7", + "ts.data.json": "^2.2.0" + }, + "devDependencies": { + "@scramjet/types": "^0.34.4", + "@types/js-yaml": "4.0.5", + "@types/node": "15.12.5", + "@types/request": "2.48.8", + "@types/shell-escape": "^0.2.1", + "@types/ws": "8.5.3", + "ava": "^3.15.0", + "ts-node": "^10.9.1", + "typedoc": "^0.23.17", + "typedoc-plugin-markdown": "^3.13.6", + "typescript": "~4.7.4" + }, + "ava": { + "extensions": [ + "ts" + ], + "files": [ + "**/*.spec.ts" + ], + "require": [ + "ts-node/register" + ] + }, + "repository": { + "type": "git", + "url": "https://github.com/scramjetorg/transform-hub.git" + } +} diff --git a/packages/adapter-docker/src/docker-instance-adapter.ts b/packages/adapter-docker/src/docker-instance-adapter.ts new file mode 100644 index 000000000..1d575c912 --- /dev/null +++ b/packages/adapter-docker/src/docker-instance-adapter.ts @@ -0,0 +1,285 @@ +import { InstanceAdapterError } from "@scramjet/model"; +import { + ContainerConfiguration, + ContainerConfigurationWithExposedPorts, + ExitCode, + IComponent, + ILifeCycleAdapterMain, + ILifeCycleAdapterRun, + IObjectLogger, + MonitoringMessageData, + InstanceConfig, + RunnerContainerConfiguration, + InstanceLimits, + STHConfiguration, +} from "@scramjet/types"; +import path from "path"; +import { DockerodeDockerHelper } from "./dockerode-docker-helper"; +import { DockerAdapterResources, DockerAdapterRunPortsConfig, DockerAdapterVolumeConfig, IDockerHelper } from "./types"; +import { FreePortsFinder, defer, streamToString } from "@scramjet/utility"; +import { STH_DOCKER_NETWORK, isHostSpawnedInDockerContainer, getHostname } from "./docker-networking"; +import { ObjLogger } from "@scramjet/obj-logger"; +import { getRunnerEnvEntries } from "@scramjet/adapters-utils"; +import { Readable } from "stream"; + +/** + * Adapter for running Instance by Runner executed in Docker container. + */ +class DockerInstanceAdapter implements +ILifeCycleAdapterMain, +ILifeCycleAdapterRun, +IComponent { + private dockerHelper: IDockerHelper; + private _limits?: InstanceLimits = {}; + private resources: DockerAdapterResources = {}; + id: string = ""; + + logger: IObjectLogger; + + crashLogStreams?: Promise; + + get limits() { return this._limits || {} as InstanceLimits; } + private set limits(value: InstanceLimits) { this._limits = value; } + + constructor(_sthConfig: STHConfiguration, id: string = "") { + this.dockerHelper = new DockerodeDockerHelper(); + + this.logger = new ObjLogger(this, { id }); + this.dockerHelper.logger.updateBaseLog({ id }); + this.dockerHelper.logger.pipe(this.logger); + } + + async init(): Promise { + /** ignore */ + } + + /** + * Finds free port for every port requested in Sequence configuration and returns map of assigned ports. + * + * @param {string[]} declaredPorts Ports declared in sequence config. + * @param {ContainerConfigurationWithExposedPorts} containerConfig Container configuration + * extended with configuration for ports exposing. + * @param {boolean} [exposed=false] Defines configuration output type. Exposed ports when true or port bindings. + * + * @returns Promise resolving with map of ports mapping. + */ + private async preparePortBindingsConfig( + declaredPorts: string[], + containerConfig: ContainerConfiguration & ContainerConfigurationWithExposedPorts, + exposed: boolean = false): Promise<{ [key: string]: string; }> { + if (declaredPorts.every(entry => (/^\d{3,5}\/(tcp|udp)$/).test(entry))) { + const freePorts = exposed ? [] : await FreePortsFinder.getPorts( + declaredPorts.length, ...containerConfig.exposePortsRange + ); + + return declaredPorts.reduce((obj: { [ key: string ]: any }, entry: string) => { + if (entry) { + const { port, protocol } = entry.match(/^(?\d{3,5})\/(?(tcp|udp))$/)?.groups as { port: string, protocol: string }; + + obj[`${port}/${protocol}`] = exposed ? {} : [{ HostIp: containerConfig.hostIp, HostPort: freePorts?.pop()?.toString() }]; + } + + return obj; + }, {}); + } + + throw new InstanceAdapterError("INVALID_CONFIGURATION", "Incorrect ports configuration provided."); + } + + /** + * Prepares configuration for expose/bind ports from Docker container. + * + * @param {string[]} ports Ports requested to be accessible from container. + * @param {RunnerContainerConfiguration} containerConfig Runner container configuration. + * + * @returns Configuration for exposing and binding ports in Docker container. + */ + private async getPortsConfig( + ports: string[], containerConfig: RunnerContainerConfiguration + ): Promise { + const [ExposedPorts, PortBindings] = await Promise.all([ + this.preparePortBindingsConfig(ports, containerConfig, true), + this.preparePortBindingsConfig(ports, containerConfig, false) + ]); + + return { ExposedPorts, PortBindings }; + } + + /** + * Returns objects with statistics of docker container with running instance. + * + * @param {MonitoringMessageData} msg Message to be included in statistics message. + * @returns {Promise} Promise resolved with container statistics. + */ + async stats(msg: MonitoringMessageData): Promise { + if (this.resources.containerId) { + const stats = await this.dockerHelper.stats(this.resources.containerId)!; + + return { + cpuTotalUsage: stats.cpu_stats?.cpu_usage?.total_usage, + healthy: msg.healthy, + limit: stats.memory_stats?.limit, + memoryMaxUsage: stats.memory_stats?.max_usage, + memoryUsage: stats.memory_stats?.usage, + networkRx: stats.networks?.eth0?.rx_bytes, + networkTx: stats.networks?.eth0?.tx_bytes, + containerId: this.resources.containerId + }; + } + + return msg; + } + + private async getNetworkSetup(): Promise<{ network: string, host: string }> { + const interfaces = await this.dockerHelper.listNetworks(); + const sthDockerNetwork = interfaces.find(net => net.Name === STH_DOCKER_NETWORK); + + if (!sthDockerNetwork) { + // STH docker network should be created in Host initialization + throw new Error(`Couldn't find sth docker network: ${sthDockerNetwork}`); + } + + if (await isHostSpawnedInDockerContainer()) { + const hostname = getHostname(); + + // If Transform Hub runs in Docker container + // then this container should be connected to STH docker network in Host initialization + + this.logger.debug("Runner will connect to STH container with hostname", hostname); + + return { + network: STH_DOCKER_NETWORK, + host: hostname, + }; + } + // otherwise STH runs on Host OS so we Runner can just connect to the Gateway + const sthNetworkGateway = sthDockerNetwork?.IPAM?.Config?.[0]?.Gateway; + + if (!sthNetworkGateway) { + throw new Error(`Couldn't determine gateway for ${STH_DOCKER_NETWORK}`); + } + + this.logger.debug("Runner will connect to STH on host OS using gateway", sthNetworkGateway); + + return { + network: STH_DOCKER_NETWORK, + host: sthNetworkGateway + }; + } + + // eslint-disable-next-line complexity + async run(config: InstanceConfig, instancesServerPort: number, instanceId: string): Promise { + if (config.type !== "docker") { + throw new Error("Docker instance adapter run with invalid runner config"); + } + + this.limits = config.limits; + + this.resources.ports = + config.config?.ports ? await this.getPortsConfig(config.config.ports, config.container) : undefined; + + config.container.maxMem = config.limits.memory || config.container.maxMem; + + this.logger.info("Instance preparation done for config", config); + + const extraVolumes: DockerAdapterVolumeConfig[] = []; + + const networkSetup = await this.getNetworkSetup(); + + const envs = getRunnerEnvEntries({ + sequencePath: path.join(config.sequenceDir, config.entrypointPath), + instancesServerPort, + instancesServerHost: networkSetup.host, + instanceId, + pipesPath: "" + }).map(([k, v]) => `${k}=${v}`); + + this.logger.debug("Runner will start with envs", envs); + + const { containerId, streams } = await this.dockerHelper.run({ + imageName: config.container.image, + volumes: [ + ...extraVolumes, + { mountPoint: config.sequenceDir, volume: config.id, writeable: false } + ], + labels: { + "scramjet.sequence.name": config.name + }, + ports: this.resources.ports, + publishAllPorts: true, + envs, + autoRemove: true, + maxMem: config.container.maxMem, + networkMode: networkSetup.network + }); + + this.crashLogStreams = Promise.all(([streams.stdout, streams.stderr] as Readable[]).map(streamToString)); + + this.resources.containerId = containerId; + + this.logger.trace("Container is running", containerId); + + try { + const { statusCode } = await this.dockerHelper.wait(containerId); + + this.logger.debug("Container exited", statusCode); + + if (statusCode > 0) { + throw new InstanceAdapterError("RUNNER_NON_ZERO_EXITCODE", { statusCode }); + } else { + return 0; + } + } catch (error: any) { + if (error instanceof InstanceAdapterError && error.code === "RUNNER_NON_ZERO_EXITCODE" && error.data.statusCode) { + this.logger.debug("Container returned non-zero status code", error.data.statusCode); + + return error.data.statusCode; + } + + this.logger.debug("Container errored", error); + + throw error; + } + } + + /** + * Performs cleanup after container close. + * Removes volume used by sequence and fifos used to communication with runner. + */ + async cleanup(): Promise { + if (this.resources.volumeId) { + this.logger.debug("Volume will be removed in 60 sec"); + + await defer(60000); // @TODO: one sec? + await this.dockerHelper.removeVolume(this.resources.volumeId); + + this.logger.debug("Volume removed"); + } + } + + // @ts-ignore + monitorRate(_rps: number): this { + /** ignore */ + } + + /** + * Forcefully stops Runner container. + */ + async remove() { + if (this.resources.containerId) { + this.logger.debug("Forcefully stopping container", this.resources.containerId); + + await this.dockerHelper.stopContainer(this.resources.containerId); + + this.logger.debug("Container removed"); + } + } + + async getCrashLog(): Promise { + if (!this.crashLogStreams) return []; + + return this.crashLogStreams; + } +} + +export { DockerInstanceAdapter }; diff --git a/packages/adapter-docker/src/docker-networking.ts b/packages/adapter-docker/src/docker-networking.ts new file mode 100644 index 000000000..7533f80ef --- /dev/null +++ b/packages/adapter-docker/src/docker-networking.ts @@ -0,0 +1,41 @@ +import fs from "fs/promises"; +import os from "os"; +import { IDockerHelper } from "./types"; + +export const isHostSpawnedInDockerContainer = async () => await fs.access("/.dockerenv").then(() => true, () => false); + +export const getHostname = () => os.hostname(); + +export const STH_DOCKER_NETWORK = "transformhub0"; + +// @TODO this could be encapsulated into IInstanceAdapter for doing something on Transform Hub launch +export async function setupDockerNetworking(dockerHelper: IDockerHelper) { + const networkExists = await dockerHelper.inspectNetwork(STH_DOCKER_NETWORK).then(() => true, () => false); + + if (!networkExists) { + await dockerHelper.createNetwork({ + name: STH_DOCKER_NETWORK, + driver: "bridge", + options: { + "com.docker.network.bridge.host_binding_ipv4":"0.0.0.0", + "com.docker.network.bridge.enable_ip_masquerade":"true", + "com.docker.network.bridge.enable_icc":"true", + "com.docker.network.driver.mtu":"1500" + } + }); + } + + if (await isHostSpawnedInDockerContainer()) { + const { containers } = await dockerHelper.inspectNetwork(STH_DOCKER_NETWORK); + + const hostname = getHostname(); + + const isHostConnected = !!Object.entries(containers).find( + ([id, { Name }]: [string, any]) => id.startsWith(hostname) || Name === hostname + ); + + if (!isHostConnected) { + await dockerHelper.connectToNetwork(STH_DOCKER_NETWORK, hostname); + } + } +} diff --git a/packages/adapter-docker/src/docker-sequence-adapter.ts b/packages/adapter-docker/src/docker-sequence-adapter.ts new file mode 100644 index 000000000..53f80c88a --- /dev/null +++ b/packages/adapter-docker/src/docker-sequence-adapter.ts @@ -0,0 +1,298 @@ +import { SequenceAdapterError } from "@scramjet/model"; +import { + ISequenceAdapter, + SequenceConfig, + DockerSequenceConfig, + IObjectLogger, + DockerAdapterConfiguration +} from "@scramjet/types"; +import { Readable } from "stream"; +import { appendFile } from "fs"; +import { DockerodeDockerHelper } from "./dockerode-docker-helper"; +import { + DockerAdapterResources, + DockerAdapterRunResponse, + DockerAdapterStreams, + DockerVolume, + IDockerHelper +} from "./types"; +import { isDefined, readStreamedJSON } from "@scramjet/utility"; +import { ObjLogger } from "@scramjet/obj-logger"; +import { detectLanguage, sequencePackageJSONDecoder } from "@scramjet/adapters-utils"; + +const PACKAGE_DIR = "/package"; + +/** + * Adapter for preparing Sequence to be run in Docker container. + */ +class DockerSequenceAdapter implements ISequenceAdapter { + private dockerHelper: IDockerHelper; + private resources: DockerAdapterResources = {}; + private config: DockerAdapterConfiguration; + + public name = "DockerSequenceAdapter"; + + /** + * Instance of class providing logging utilities. + */ + logger: IObjectLogger; + + constructor(config: DockerAdapterConfiguration) { + this.config = config; + this.logger = new ObjLogger(this.name); + + this.dockerHelper = new DockerodeDockerHelper(); + this.dockerHelper.logger.pipe(this.logger); + } + + /** + * Initializes adapter. + */ + async init(): Promise { + this.logger.trace("Initializing"); + + await this.fetch(this.config.prerunner.image); + + this.logger.info("Docker adapter initialized with options", { + "python3 runner image": this.config.runnerImages.python3, + "node runner image": this.config.runnerImages.node, + "prerunner image": this.config.prerunner.image + }); + } + + /** + * Pulls image from registry. + * + * @param {string} name Docker image name + */ + async fetch(name: string) { + await this.dockerHelper.pullImage(name, true); + } + + /** + * Finds existing Docker volumes containing sequences. + * + * @returns {Promise} Promise resolving to array of identified sequences. + */ + async list(): Promise { + this.logger.trace("Listing exiting sequences"); + + const potentialVolumes = await this.dockerHelper.listVolumes(); + + const configs = await Promise.all( + potentialVolumes + .map(volume => this.identifyOnly(volume)) + .map(configPromised => configPromised.catch(() => null)) + ); + + return configs.filter(isDefined); + } + + /** + * Identifies sequence existing on Docker volume. + * + * @param {string} volume Volume id. + * @returns {SequenceConfig} Sequence configuration or undefined if sequence cannot be identified. + */ + private async identifyOnly(volume: string): Promise { + this.logger.info("Attempting to identify volume", volume); + + try { + const { streams, wait } = await this.dockerHelper.run({ + imageName: this.config.prerunner?.image || "", + volumes: [{ mountPoint: PACKAGE_DIR, volume, writeable: true }], + command: ["/opt/transform-hub/identify.sh"], + autoRemove: true, + maxMem: this.config.prerunner?.maxMem || 0 + }); + + this.logger.debug("Identify started", volume, this.config.prerunner?.maxMem || 0); + + const ret = await this.parsePackage(streams, wait, volume); + + if (!ret.id) { + return undefined; + } + + this.logger.info("Identified image for volume", { volume, image: ret.container?.image }); + + return ret; + } catch (e: any) { + this.logger.error("Docker failed", e.message, volume); + + throw e; + } + } + + /** + * Unpacks and identifies sequence in Docker volume. + * This is the main adapter method creating new Docker volume and starting Prerunner + * with created volume mounted to unpack sequence on it. + * When Prerunner finishes, it will return JSON with sequence information. + * + * @param {Readable} stream Stream containing sequence to be identified. + * @param {string} id Id for the new docker volume where sequence will be stored. + * @param {boolean} override Removes previous sequence + * @returns {Promise} Promise resolving to sequence config. + */ + async identify(stream: Readable, id: string, override = false): Promise { + const volStart = new Date(); + + if (override) { + await this.dockerHelper.removeVolume(id); + } + + const volumeId = await this.createVolume(id); + + const volSecs = (new Date().getTime() - volStart.getTime()) / 1000; + + appendFile("timing-log.ndjson", JSON.stringify({ + operation: "creating volume", + volumeId: volumeId, + time: volSecs, + }) + "\n", () => {}); + + this.resources.volumeId = volumeId; + + this.logger.info(`Volume created in ${volSecs}s`, volumeId); + + let runResult: DockerAdapterRunResponse; + const prerunnerStart = new Date(); + + this.logger.debug("Starting PreRunner", this.config.prerunner); + + try { + runResult = await this.dockerHelper.run({ + imageName: this.config.prerunner.image || "", + volumes: [{ mountPoint: PACKAGE_DIR, volume: volumeId, writeable: true }], + autoRemove: true, + maxMem: this.config.prerunner.maxMem || 0 + }); + } catch (err: any) { + this.logger.error(err); + + throw new SequenceAdapterError("DOCKER_ERROR"); + } + + const startSecs = (new Date().getTime() - prerunnerStart.getTime()) / 1000; + + appendFile("timing-log.ndjson", JSON.stringify({ + operation: "starting pre-runner", + time: startSecs, + }) + "\n", () => {}); + + try { + const { streams, wait } = runResult; + + stream.pipe(streams.stdin); + + const config = await this.parsePackage(streams, wait, volumeId); + + await this.fetch(config.container.image); + + return config; + } catch (err: any) { + this.logger.error("Identify failed on volume", id); + if (err instanceof SequenceAdapterError) { + throw err; + } else { + throw new SequenceAdapterError("PRERUNNER_ERROR", err); + } + } + } + + /** + * Creates volume with provided id. + * + * @param {string} id Volume id. + * @returns {DockerVolume} Created volume. + */ + private async createVolume(id: string): Promise { + try { + return await this.dockerHelper.createVolume(id); + } catch (error: any) { + this.logger.error("Error creating volume", id); + + throw new SequenceAdapterError("DOCKER_ERROR", "Error creating volume"); + } + } + + /** + * Parses PreRunner output and returns sequence configuration. + * + * @param {DockerAdapterStreams} streams Docker container std streams. + * @param {Function} wait TBD + * @param {DockerVolume} volumeId Id of the volume where sequence is stored. + * @returns {Promise} Promise resolving to sequence configuration. + */ + // eslint-disable-next-line complexity + private async parsePackage( + streams: DockerAdapterStreams, + wait: Function, + volumeId: DockerVolume + ): Promise { + const parseStart = new Date(); + + const [preRunnerResult] = (await Promise.all([readStreamedJSON(streams.stdout as Readable), wait])) as any; + + const parseSecs = (new Date().getTime() - parseStart.getTime()) / 1000; + + appendFile("timing-log.ndjson", JSON.stringify({ + operation: "waiting for pre-runner", + time: parseSecs, + }) + "\n", () => {}); + + this.logger.debug("PreRunner response", preRunnerResult); + + if (preRunnerResult && preRunnerResult.error) { + this.logger.error("PreRunner failed", preRunnerResult.error); + + throw new SequenceAdapterError("PRERUNNER_ERROR", preRunnerResult.error); + } + + const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(preRunnerResult); + const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {}; + const config = validPackageJson.scramjet?.config ? { ...validPackageJson.scramjet.config } : {}; + + const container = Object.assign({}, this.config.runner); + + container.image = "python3" in engines + ? this.config.runnerImages.python3 + : this.config.runnerImages.node; + + return { + type: "docker", + container, + name: validPackageJson.name || "", + version: validPackageJson.version || "", + engines, + config, + sequenceDir: PACKAGE_DIR, + entrypointPath: validPackageJson.main, + id: volumeId, + description: validPackageJson.description, + author: validPackageJson.author, + keywords: validPackageJson.keywords, + args: validPackageJson.args, + repository: validPackageJson.repository, + language: detectLanguage(validPackageJson) + }; + } + + /** + * Removes Docker volume used by Sequence. + * + * @param {SequenceConfig} config Sequence configuration. + */ + async remove(config: SequenceConfig) { + if (config.type !== "docker") { + throw new Error(`Incorrect SequenceConfig passed to DockerSequenceAdapter: ${config.type}`); + } + + await this.dockerHelper.removeVolume(config.id); + + this.logger.debug("Volume removed", config.id); + } +} + +export { DockerSequenceAdapter }; diff --git a/packages/adapter-docker/src/dockerode-docker-helper.ts b/packages/adapter-docker/src/dockerode-docker-helper.ts new file mode 100644 index 000000000..d87351d8f --- /dev/null +++ b/packages/adapter-docker/src/dockerode-docker-helper.ts @@ -0,0 +1,398 @@ +import Dockerode from "dockerode"; +import { PassThrough } from "stream"; +import { appendFile } from "fs"; + +import { + DockerAdapterRunConfig, + DockerAdapterRunResponse, + DockerAdapterStreams, DockerAdapterVolumeConfig, + DockerAdapterWaitOptions, + DockerContainer, + IDockerHelper, DockerImage, DockerVolume, ExitData, + DockerCreateNetworkConfig, DockerNetwork +} from "./types"; +import { ObjLogger } from "@scramjet/obj-logger"; + +/** + * Configuration for volumes to be mounted to container. + */ +type DockerodeVolumeMountConfig = { + /** + * Directory in container where volume has to be mounted. + */ + Target: string, + + /** + * Volume name. + */ + Source: string, + + /** + * Mounting mode. + */ + Type: "volume" | "bind", + + /** + * Access mode. + */ + ReadOnly: boolean +} + +let _isDockerConfigured: boolean|undefined; + +async function isDockerConfigured() { + try { + await new Dockerode().info(); + _isDockerConfigured = true; + } catch (e) { + _isDockerConfigured = false; + } + + return _isDockerConfigured; +} + +/** + * Communicates with Docker using Dockerode library. + */ +export class DockerodeDockerHelper implements IDockerHelper { + public dockerode: Dockerode = new Dockerode(); + + logger = new ObjLogger(this); + + /** + * Translates DockerAdapterVolumeConfig to volumes configuration that Docker API can understand. + * + * @param {DockerAdapterVolumeConfig[]} volumeConfigs Volumes configuration. + * @returns {DockerodeVolumeMountConfig[]} Translated volumes configuration. + */ + translateVolumesConfig(volumeConfigs: DockerAdapterVolumeConfig[]): DockerodeVolumeMountConfig[] { + return volumeConfigs.map(cfg => { + if ("bind" in cfg) { + return { + Target: cfg.mountPoint, + Source: cfg.bind, + Type: "bind", + ReadOnly: !cfg.writeable + }; + } + + return { + Target: cfg.mountPoint, + Source: cfg.volume, + Type: "volume", + ReadOnly: !cfg.writeable + }; + }); + } + + /** + * Creates container based on provided parameters. + * + * @param containerCfg Image to start container from. + * @returns {Promise} Promise resolving with created container id. + */ + async createContainer( + containerCfg: { + dockerImage: DockerImage, + volumes: DockerAdapterVolumeConfig[], + binds: string[], + ports: any, + envs: string[], + autoRemove: boolean, + maxMem: number, // TODO: Container configuration + command?: string[], + publishAllPorts: boolean, + labels: { [key: string]: string }, + networkMode?: string + } + ): Promise { + containerCfg.ports = { ...containerCfg.ports }; + const config: Dockerode.ContainerCreateOptions = { + Image: containerCfg.dockerImage, + AttachStdin: true, + AttachStdout: true, + AttachStderr: true, + Tty: false, + OpenStdin: true, + StdinOnce: true, + Env: containerCfg.envs, + ExposedPorts: containerCfg.ports.ExposedPorts, + HostConfig: { + Binds: containerCfg.binds, + Mounts: this.translateVolumesConfig(containerCfg.volumes), + AutoRemove: containerCfg.autoRemove || false, + Memory: containerCfg.maxMem, + MemorySwap: 0, + PortBindings: containerCfg.ports.PortBindings, + PublishAllPorts: containerCfg.publishAllPorts || false, + NetworkMode: containerCfg.networkMode + }, + Labels: containerCfg.labels || {}, + }; + + if (containerCfg.command) { + config.Cmd = [...containerCfg.command]; + } + + const { id } = await this.dockerode.createContainer(config); + + return id; + } + + /** + * Start container with provided id. + * + * @param containerId Container id. + * @returns Promise resolving when container has been started. + */ + startContainer(containerId: DockerContainer): Promise { + return this.dockerode.getContainer(containerId).start(); + } + + /** + * Stops container with provided id. + * + * @param containerId Container id. + * @returns Promise which resolves when the container has been stopped. + */ + stopContainer(containerId: DockerContainer): Promise { + return this.dockerode.getContainer(containerId).stop().catch((error: any) => { + this.logger.warn("Failed to stop container"); + + if (error.statusCode === 304) { + this.logger.warn("Container is already stopped"); + return; + } + + throw error; + }); + } + + /** + * Forcefully removes container with provided id. + * + * @param containerId Container id. + * @returns Promise which resolves when container has been removed. + */ + removeContainer(containerId: DockerContainer): Promise { + return this.dockerode.getContainer(containerId).remove(); + } + + /** + * Gets statistics from container with provided id. + * + * @param containerId Container id. + * @returns Promise which resolves with container statistics. + */ + async stats(containerId: DockerContainer): Promise { + return this.dockerode.getContainer(containerId).stats({ stream: false }); + } + + private async isImageInLocalRegistry(name: string): Promise { + return this.dockerode.getImage(name).get().then(() => true, () => false); + } + + private pulledImages: {[key: string]: Promise | undefined } = {}; + + async pullImage(name: string, fetchOnlyIfNotExists = true) { + if (fetchOnlyIfNotExists) { + const start = new Date(); + + this.logger.trace("Checking image", name); + + if (this.pulledImages[name]) { + this.logger.trace("Image already pulled"); + + return this.pulledImages[name]; + } + + if (await this.isImageInLocalRegistry(name)) { + this.logger.trace("Image found in local registry"); + this.pulledImages[name] = Promise.resolve(); + + const seconds = (new Date().getTime() - start.getTime()) / 1000; + + appendFile("timing-log.ndjson", JSON.stringify({ + operation: "checking image", + image: name, + time: seconds, + }) + "\n", () => {}); + + return this.pulledImages[name]; + } + } + + this.pulledImages[name] = (async () => { + const start = new Date(); + + this.logger.trace("Start pulling image", name); + + const pullStream = await this.dockerode.pull(name); + + // Wait for pull to finish + await new Promise(res => this.dockerode.modem.followProgress(pullStream, res)); + + const seconds = (new Date().getTime() - start.getTime()) / 1000; + + appendFile("timing-log.ndjson", JSON.stringify({ + operation: "docker pull", + image: name, + time: seconds, + }) + "\n", () => {}); + + this.logger.trace(`Image pulled in ${seconds}s`); + })(); + + return this.pulledImages[name]; + } + + /** + * Creates docker volume. + * + * @param name Volume name. Optional. If not provided, volume will be named with unique name. + * @returns Volume name. + */ + async createVolume(name: string = ""): Promise { + return this.dockerode.createVolume({ + Name: name, + Labels: { + "org.scramjet.host.is-sequence": "true" + } + }).then((volume: Dockerode.Volume) => { + return volume.name; + }); + } + + /** + * Removes volume with specific name. + * + * @param volumeName Volume name. + * @returns Promise which resolves when volume has been removed. + */ + async removeVolume(volumeName: DockerVolume): Promise { + return this.dockerode.getVolume(volumeName).remove(); + } + + async listVolumes() { + const { Volumes } = await this.dockerode.listVolumes({ + filters: { label: { "org.scramjet.host.is-sequence": true } } + }); + + return Volumes.map(volume => volume.Name); + } + + /** + * Attaches to container streams. + * + * @param container Container id. + * @param opts Attach options. + * @returns Object with container's standard I/O streams. + */ + async attach(container: DockerContainer, opts: any): Promise { + return this.dockerode.getContainer(container).attach(opts); + } + + /** + * Starts container. + * + * @param config Container configuration. + * @returns @see {DockerAdapterRunResponse} + */ + async run(config: DockerAdapterRunConfig): Promise { + const streams: DockerAdapterStreams = { + stdin: new PassThrough(), + stdout: new PassThrough(), + stderr: new PassThrough() + }; + // ------ + const container = await this.createContainer( + { + dockerImage: config.imageName, + volumes: config.volumes || [], + binds: config.binds || [], + ports: config.ports, + envs: config.envs || [], + autoRemove: config.autoRemove || false, + maxMem: (config.maxMem || 64) * 1024 * 1024, + command: config.command, + labels: config.labels || {}, + publishAllPorts: config.publishAllPorts || false, + networkMode: config.networkMode + } + ); + // ------ + const stream = await this.attach(container, { + stream: true, + stdin: true, + stdout: true, + stderr: true, + hijack: true + }); + + stream.on("close", () => { + streams.stdout.emit("end"); + streams.stderr.emit("end"); + }); + + await this.startContainer(container); + + streams.stdin.pipe(stream); + + this.dockerode.getContainer(container) + .modem.demuxStream(stream, streams.stdout, streams.stderr); + + return { + streams: streams, + containerId: container, + wait: async () => this.wait(container, { condition: "not-running" }) + }; + } + + /** + * Waits for container status change. + * + * @param container Container id. + * @param options Condition to be fulfilled. @see {DockerAdapterWaitOptions} + * @returns Container exit code. + */ + async wait(container: DockerContainer, options: DockerAdapterWaitOptions = {}): Promise { + const containerExitResult = await this.dockerode.getContainer(container).wait(options); + + return { statusCode: containerExitResult.StatusCode }; + } + + async listNetworks(): Promise { + // @TODO this + return this.dockerode.listNetworks(); + } + + async inspectNetwork(id: string): Promise { + const network = await this.dockerode.getNetwork(id).inspect(); + + const dockerodeContainers = network.Containers as Record; + + const containers = Object.fromEntries( + Object.entries(dockerodeContainers).map(([containerId, { Name }]) => [containerId, { name: Name }]) + ); + + return { + containers + }; + } + + async connectToNetwork(networkid: string, container: string): Promise { + await this.dockerode.getNetwork(networkid).connect({ Container: container }); + } + + async createNetwork(config: DockerCreateNetworkConfig): Promise { + await this.dockerode.createNetwork({ + Name: config.name, + Driver:config.driver, + Options: config.options + }); + } + + static async isDockerConfigured() { + return isDockerConfigured(); + } +} diff --git a/packages/adapter-docker/src/index.ts b/packages/adapter-docker/src/index.ts new file mode 100644 index 000000000..5119667c4 --- /dev/null +++ b/packages/adapter-docker/src/index.ts @@ -0,0 +1,11 @@ +/** + * Adapter module must provide SequenceAdapter, InstanceAdapter classes, init method and name field. + */ +export { DockerSequenceAdapter as SequenceAdapter } from "./docker-sequence-adapter"; +export { DockerInstanceAdapter as InstanceAdapter } from "./docker-instance-adapter"; + +export const init = (..._args: any[]) => { + return true; +}; + +export const name = "docker"; diff --git a/packages/adapter-docker/src/readme.mtpl b/packages/adapter-docker/src/readme.mtpl new file mode 100644 index 000000000..237e68061 --- /dev/null +++ b/packages/adapter-docker/src/readme.mtpl @@ -0,0 +1,18 @@ +# Scramjet Transform Hub Adapters + +This module holds two types of adapters utilized by Scramjet Transform Hub: Instance Adapter and Sequence Adapter. These Adapters allows for running the Sequence identification and Instance execution in standalone processes. + +The adapter provides two main exports: + +* [SequenceAdapter](./src/process-sequence-adapter.ts) - An adapter for preparing Sequence to be run in subprocess. +* [InstanceAdapter](./src/process-instance-adapter.ts) - An adapter for running Instance by Runner executed in subprocess. + +## Docs + +>!docs adapters/modules.md & + +>@sth +>@use-cases +>@links +>@license-agpl +>@contrib diff --git a/packages/adapter-docker/src/types.ts b/packages/adapter-docker/src/types.ts new file mode 100644 index 000000000..b5ec64fbc --- /dev/null +++ b/packages/adapter-docker/src/types.ts @@ -0,0 +1,330 @@ +import { ExitCode, InstanceId, IObjectLogger } from "@scramjet/types"; +import { ContainerStats, NetworkInspectInfo } from "dockerode"; +import { PathLike } from "fs"; +import { Stream, Writable } from "stream"; + +/** + * Docker image. + * + * @typedef {string} DockerImage + */ +export type DockerImage = string; + +/** + * Docker container. + * + * @typedef {string} DockerContainer + */ +export type DockerContainer = string; + +/** + * Docker volume. + * + * @typedef {string} DockerVolume + */ +export type DockerVolume = string; + +/** + * Volume mounting configuration. + * + * @typedef {object} DockerAdapterVolumeConfig + */ +export type DockerAdapterVolumeConfig = { + /** + * @property {string} mountPoint Mount point. + * */ + mountPoint: string; + + /** + * @property { boolean } writeable Mount mode. Container can write to the volume if set to true. + */ + writeable: boolean; +} & ( + { + /** + * @property {DockerVolume} volume Volume. + */ + volume: DockerVolume; + } | { + /** + * @property {string} bind A bind mount. + */ + bind: string; + } +); + +export type DockerAdapterRunPortsConfig = { + ExposedPorts: any, + PortBindings: any +} + +export type DockerNetwork = { containers: Record } + +export type DockerCreateNetworkConfig = { name: string, driver: string, options: Record } + +/** + * Configuration used to run command in container. + * + * @typedef {object} DockerAdapterRunConfig + */ +export type DockerAdapterRunConfig = { + /** + * @property {string} imageName Image name. + */ + imageName: string; + + /** + * Command with optional parameters. + * + * @property {string[]} command Command to be executed. + */ + command?: string[]; + + /** + * @property {DockerAdapterVolumeConfig[]} volumes Volumes configuration. + */ + volumes?: DockerAdapterVolumeConfig[], + + /** + * @property {string[]} binds Directories mount configuration. + */ + binds?: string[], + + /** + * @property {DockerAdapterRunPortsConfig} ports Docker ports configuration + */ + ports?: DockerAdapterRunPortsConfig + + /** + * @property {string[]} envs A list of environment variables + * to set inside the container in the form ```["VAR=value", ...]``` + */ + envs?: string[], + + /** + * @property {boolean} autoRemove If true container will be removed after container's process exit. + */ + autoRemove?: boolean, + + /** + * @property {number} maxMem Container memory limit (bytes). + */ + maxMem?: number, + + publishAllPorts?: boolean, + + labels?: { + [key: string]: string + }, + + networkMode?: string, +}; + +/** + * Standard streams connected with container. + */ +export type DockerAdapterStreams = { + /** + * @type {Writable} + */ + stdin: Writable, + + /** + * @type {Stream} + */ + stdout: Stream, + + /** + * @type {Stream} + */ + stderr: Stream +}; + +export type ExitData = { + statusCode: ExitCode +} + +export type DockerAdapterResources = { + containerId?: DockerContainer; + volumeId?: DockerVolume; + fifosDir?: PathLike; + ports?: DockerAdapterRunPortsConfig; +} + +export type DockerAdapterWaitOptions = { + condition?: "not-running" | "next-exit" | "removed" +} + +/** + * Result of running command in container. + */ +export type DockerAdapterRunResponse = { + /** + * @type {DockerAdapterStreams} Set of standard streams. + */ + streams: DockerAdapterStreams, + + /** + * @type {Function} Function which return promise resolving when container status changed. + * Used to wait for container end. + */ + wait: Function + + /** + * @type {DockerContainer} Docker container. + */ + containerId: DockerContainer +}; +export interface IDockerHelper { + logger: IObjectLogger; + + /** + * Converts pairs of mount path and volume name to DockerHelper specific volume configuration. + * + * @param {DockerAdapterVolumeConfig} volumeConfigs[] Volume configuration objects. + * + * @returns {any} DockerHelper volume configuration. + */ + translateVolumesConfig: (volumeConfigs: DockerAdapterVolumeConfig[]) => any; + + /** + * Creates Docker container from provided image with attached volumes and local directories. + * + * @param {DockerImage} dockerImage Docker image name. + * @param {DockerAdapterVolumeConfig[]} volumes Volumes to be mounted to container. + * @param {string[]} binds Directories to be mounted. + * @param {string[]} envs Environment variables. + * @param {boolean} autoRemove If true, container will be removed when finished. + * + * @returns {Promise} Created container. + */ + createContainer: ( + containerCfg: { + dockerImage: DockerImage, + volumes: DockerAdapterVolumeConfig[], + binds: string[], + ports: any, + envs: string[], + autoRemove: boolean, + maxMem: number, + publishAllPorts: boolean, + labels: { + [key: string]: string + }, + networkMode?: string + } + ) => Promise; + + /** + * Starts container. + * + * @param {DockerContainer} containerId Container to be started. + * + * @returns {Promise} + */ + startContainer: (containerId: DockerContainer) => Promise; + + /** + * Stops container. + * + * @param {DockerContainer} containerId Container id to be stopped. + * + * @returns {Promise} + */ + stopContainer: (containerId: DockerContainer) => Promise; + + stats: (containerId: DockerContainer) => Promise; + /** + * Removes container. + * + * @param {DockerContainer} containerId Container id. + * + * @returns {Promise} + */ + removeContainer: (containerId: DockerContainer) => Promise; + + /** + * Lists existing volumes + * + * @returns {Promise} List of existing volumes + */ + listVolumes: () => Promise; + + /** + * Creates volume. + * + * @param {string} name Volume name. + * + * @returns {Promise} Created volume. + */ + createVolume: (name?: string) => Promise; + + /** + * Removes volume. + * + * @param {DockerVolume} Volume. + * + * @returns {Promise} + */ + removeVolume: (volumeId: DockerVolume) => Promise; + + /** + * Executes command in container. + * + * @param {DockerAdapterRunConfig} config Execution configuration. + * + * @returns {Promise} + */ + run: (config: DockerAdapterRunConfig) => Promise; + + /** + * Waits until container exits + * + * @param {DockerContainer} container + * + * @returns {Promise} + */ + wait(container: DockerContainer, options?: DockerAdapterWaitOptions): Promise; + + /** + * Fetches the image from repo + * + * @param name the name of the image, eg. ubuntu:latest + * @param fetchOnlyIfNotExists fetch only if not exists (defaults to true) + */ + pullImage(name: string, fetchOnlyIfNotExists?: boolean): Promise + + listNetworks(): Promise + + inspectNetwork(id: string): Promise + + connectToNetwork(networkid: string, container: string): Promise + + createNetwork(config: DockerCreateNetworkConfig): Promise +} + +export type InstanceAdapterOptions = { + exitDelay: number; +} + +export type RunnerEnvConfig = { + paths?: "posix" | "win32" + sequencePath: string; + pipesPath: string; + instancesServerPort: number; + instancesServerHost: string; + instanceId: InstanceId; +} + +export type RunnerEnvironmentVariables = Partial<{ + PATH: string; + DEVELOPMENT: string; + PRODUCTION: string; + SEQUENCE_PATH: string; + INSTANCES_SERVER_PORT: string; + INSTANCES_SERVER_HOST: string; + INSTANCE_ID: string; + PIPES_LOCATION: string; + CRASH_LOG: string; + [key: string]: string; +}>; diff --git a/packages/adapter-docker/test/pass.spec.ts b/packages/adapter-docker/test/pass.spec.ts new file mode 100644 index 000000000..57dc6c738 --- /dev/null +++ b/packages/adapter-docker/test/pass.spec.ts @@ -0,0 +1,5 @@ +import test from "ava"; + +test("Passing test", (t) => { + t.pass(); +}); diff --git a/packages/adapter-docker/tsconfig.build.json b/packages/adapter-docker/tsconfig.build.json new file mode 100644 index 000000000..e0848624e --- /dev/null +++ b/packages/adapter-docker/tsconfig.build.json @@ -0,0 +1,9 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "./dist" + }, + "include": [ + "src/**/*" + ] +} diff --git a/packages/adapter-docker/tsconfig.json b/packages/adapter-docker/tsconfig.json new file mode 100644 index 000000000..aee20079b --- /dev/null +++ b/packages/adapter-docker/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "outDir": "./dist" + }, + "extends": "../../tsconfig.base.json", + "include": [ + "./src", + "./test" + ], + "exclude": [ + "node_modules" + ], + "typedocOptions": { + "entryPoints": [ + "src/index.ts" + ], + "out": "../../docs/adapters", + "plugin": "typedoc-plugin-markdown", + "gitRevision": "HEAD", + "sort": "alphabetical" + } +} diff --git a/packages/adapter-process/src/index.ts b/packages/adapter-process/src/index.ts index c3ff5178b..ffae3dba5 100644 --- a/packages/adapter-process/src/index.ts +++ b/packages/adapter-process/src/index.ts @@ -1,12 +1,11 @@ /** * Adapter module must provide SequenceAdapter, InstanceAdapter classes, init method and name field. */ - -export { ProcessSequenceAdapter as SequenceAdapter } from "./process-sequence-adapter"; +export { ProcessSequenceAdapter as SequenceAdapter } from "./process-sequence-adapter"; export { ProcessInstanceAdapter as InstanceAdapter } from "./process-instance-adapter"; -export const init = (...args: any[]) => { +export const init = (..._args: any[]) => { return true; -} +}; export const name = "process"; diff --git a/packages/adapter-process/src/process-instance-adapter.ts b/packages/adapter-process/src/process-instance-adapter.ts index 583c1b8e2..3a2b77920 100644 --- a/packages/adapter-process/src/process-instance-adapter.ts +++ b/packages/adapter-process/src/process-instance-adapter.ts @@ -198,4 +198,4 @@ class ProcessInstanceAdapter implements } } -export { ProcessInstanceAdapter }; +export { ProcessInstanceAdapter }; diff --git a/packages/adapters/src/docker-sequence-adapter.ts b/packages/adapters/src/docker-sequence-adapter.ts index 2c71d3038..3bec26f37 100644 --- a/packages/adapters/src/docker-sequence-adapter.ts +++ b/packages/adapters/src/docker-sequence-adapter.ts @@ -2,9 +2,9 @@ import { SequenceAdapterError } from "@scramjet/model"; import { ISequenceAdapter, SequenceConfig, - STHConfiguration, DockerSequenceConfig, - IObjectLogger + IObjectLogger, + DockerAdapterConfiguration } from "@scramjet/types"; import { Readable } from "stream"; import { appendFile } from "fs"; @@ -29,7 +29,7 @@ const PACKAGE_DIR = "/package"; class DockerSequenceAdapter implements ISequenceAdapter { private dockerHelper: IDockerHelper; private resources: DockerAdapterResources = {}; - private config: STHConfiguration; + private config: DockerAdapterConfiguration; public name = "DockerSequenceAdapter"; @@ -38,7 +38,7 @@ class DockerSequenceAdapter implements ISequenceAdapter { */ logger: IObjectLogger; - constructor(config: STHConfiguration) { + constructor(config: DockerAdapterConfiguration) { this.config = config; this.logger = new ObjLogger(this.name); @@ -52,12 +52,12 @@ class DockerSequenceAdapter implements ISequenceAdapter { async init(): Promise { this.logger.trace("Initializing"); - await this.fetch(this.config.docker.prerunner.image); + await this.fetch(this.config.prerunner.image); this.logger.info("Docker adapter initialized with options", { - "python3 runner image": this.config.docker.runnerImages.python3, - "node runner image": this.config.docker.runnerImages.node, - "prerunner image": this.config.docker.prerunner.image + "python3 runner image": this.config.runnerImages.python3, + "node runner image": this.config.runnerImages.node, + "prerunner image": this.config.prerunner.image }); } @@ -100,14 +100,14 @@ class DockerSequenceAdapter implements ISequenceAdapter { try { const { streams, wait } = await this.dockerHelper.run({ - imageName: this.config.docker.prerunner?.image || "", + imageName: this.config.prerunner?.image || "", volumes: [{ mountPoint: PACKAGE_DIR, volume, writeable: true }], command: ["/opt/transform-hub/identify.sh"], autoRemove: true, - maxMem: this.config.docker.prerunner?.maxMem || 0 + maxMem: this.config.prerunner?.maxMem || 0 }); - this.logger.debug("Identify started", volume, this.config.docker.prerunner?.maxMem || 0); + this.logger.debug("Identify started", volume, this.config.prerunner?.maxMem || 0); const ret = await this.parsePackage(streams, wait, volume); @@ -160,14 +160,14 @@ class DockerSequenceAdapter implements ISequenceAdapter { let runResult: DockerAdapterRunResponse; const prerunnerStart = new Date(); - this.logger.debug("Starting PreRunner", this.config.docker.prerunner); + this.logger.debug("Starting PreRunner", this.config.prerunner); try { runResult = await this.dockerHelper.run({ - imageName: this.config.docker.prerunner.image || "", + imageName: this.config.prerunner.image || "", volumes: [{ mountPoint: PACKAGE_DIR, volume: volumeId, writeable: true }], autoRemove: true, - maxMem: this.config.docker.prerunner.maxMem || 0 + maxMem: this.config.prerunner.maxMem || 0 }); } catch (err: any) { this.logger.error(err); @@ -255,11 +255,11 @@ class DockerSequenceAdapter implements ISequenceAdapter { const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {}; const config = validPackageJson.scramjet?.config ? { ...validPackageJson.scramjet.config } : {}; - const container = Object.assign({}, this.config.docker.runner); + const container = Object.assign({}, this.config.runner); container.image = "python3" in engines - ? this.config.docker.runnerImages.python3 - : this.config.docker.runnerImages.node; + ? this.config.runnerImages.python3 + : this.config.runnerImages.node; return { type: "docker", diff --git a/packages/cli/src/lib/commands/sequence.ts b/packages/cli/src/lib/commands/sequence.ts index e5666d49e..c7d3c546a 100644 --- a/packages/cli/src/lib/commands/sequence.ts +++ b/packages/cli/src/lib/commands/sequence.ts @@ -183,12 +183,12 @@ export const sequence: CommandDefinition = (program) => { .description("Removes the Sequence from the Hub") .action(async (id: string, { force }) => { await sequenceDelete(id, { force }).then( - (res => { displayObject(res, profileManager.getProfileConfig().format); }), - (error => { + res => { displayObject(res, profileManager.getProfileConfig().format); }, + error => { displayError( JSON.parse(error?.body || { body: "Unknown error" }) ); - }) + } ); }); diff --git a/packages/host/src/lib/adapter-manager.ts b/packages/host/src/lib/adapter-manager.ts index 7dd357f14..2392c39b3 100644 --- a/packages/host/src/lib/adapter-manager.ts +++ b/packages/host/src/lib/adapter-manager.ts @@ -15,11 +15,13 @@ export class AdapterManager { this.logger.info("Loading adapters...", Object.keys(this.sthConfig.adapters)); this.adapters = (await Promise.all( - Object.keys(this.sthConfig.adapters).map(async (pkgName: string) => ({ name: pkgName, pkg: await import(pkgName) }) - ))).reduce((acc, pkg) => { + Object.keys(this.sthConfig.adapters).map( + async (pkgName: string) => ({ name: pkgName, pkg: await import(pkgName) }) + ) + )).reduce((acc, pkg) => { if (!AdapterManager.validateAdapter(pkg.pkg)) { throw new Error(`Invalid adapter provided ${pkg.name}`); - }; + } if (acc[pkg.name]) throw new Error("Invalid adapters configuration, duplicated adapter name"); @@ -45,7 +47,7 @@ export class AdapterManager { const adapter = this.getAdapterByName(name); if (!adapter) { - return { error: "Adapter not found."}; + return { error: "Adapter not found." }; } return adapter.init(); diff --git a/packages/host/src/lib/csi-controller.ts b/packages/host/src/lib/csi-controller.ts index 854b1a0b0..42dbf4259 100644 --- a/packages/host/src/lib/csi-controller.ts +++ b/packages/host/src/lib/csi-controller.ts @@ -185,7 +185,7 @@ export class CSIController extends TypedEmitter { this.inputTopic = payload.inputTopic; this.hostProxy = hostProxy; this.limits = { - memory: payload.limits?.memory || sthConfig.docker.runner.maxMem + memory: payload.limits?.memory // @TODO: || sthConfig.docker.runner.maxMem }; this.instanceLifetimeExtensionDelay = +sthConfig.timings.instanceLifetimeExtensionDelay; diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index fe137a6e6..80db3591a 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -694,10 +694,7 @@ export class Host implements IComponent { const adapter = this.adapterManager.getAvailableAdapter(); if (!adapter) { - return { - opStatus: ReasonPhrases.FAILED_DEPENDENCY, - error: "Can't use adapter" - }; + throw new Error("Error identifying existing sequences. Adapter unavailable"); } const sequenceAdapter = new adapter!.SequenceAdapter(this.config); @@ -713,6 +710,7 @@ export class Host implements IComponent { this.logger.trace(`Sequence identified: ${config.id}`); this.sequencesStore.set(config.id, { id: config.id, config: config, instances: new Set() }); } + this.logger.info(` ${configs.length} sequences identified`); } catch (e: any) { this.logger.warn("Error while trying to identify existing sequences.", e); @@ -736,7 +734,7 @@ export class Host implements IComponent { return { opStatus: ReasonPhrases.FAILED_DEPENDENCY, error: "Can't initialize Adapter" - } + }; } const sequenceAdapter = new adapter.SequenceAdapter(this.config); @@ -777,19 +775,19 @@ export class Host implements IComponent { this.auditor.auditSequence(id, SequenceMessageCode.SEQUENCE_CREATED); this.pushTelemetry("Sequence uploaded", { language: config.language.toLowerCase(), seqId: id }); - - return { - id: config.id, - opStatus: ReasonPhrases.OK, - }; } catch (error: any) { this.logger.error("Error processing sequence", error); return { opStatus: ReasonPhrases.UNPROCESSABLE_ENTITY, - error, + error }; } + + return { + id, + opStatus: ReasonPhrases.OK, + }; } async handleSequenceUpdate(stream: ParsedMessage): Promise> { diff --git a/packages/sth-config/src/config-service.ts b/packages/sth-config/src/config-service.ts index 30c358791..49a67171f 100644 --- a/packages/sth-config/src/config-service.ts +++ b/packages/sth-config/src/config-service.ts @@ -19,22 +19,6 @@ const _defaultConfig: STHConfiguration = { reconnectionDelay: 2000, }, debug: false, - docker: { - prerunner: { - image: "", - maxMem: 128, - }, - runner: { - image: "", - maxMem: 512, - exposePortsRange: [30000, 32767], - hostIp: "0.0.0.0" - }, - runnerImages: { - python3: "", - node: "", - }, - }, identifyExisting: false, host: { apiBase: "/api/v1", @@ -81,15 +65,27 @@ const _defaultConfig: STHConfiguration = { adapters: { "@scramjet/adapter-process": { sequencesRoot: path.join(homedir(), ".scramjet_sequences"), + }, + "@scramjet/adapter-docker": { + prerunner: { + image: imageConfig.prerunner, + maxMem: 128 + }, + runner: { + image: "", + maxMem: 512, + exposePortsRange: [30000, 32767], + hostIp: "0.0.0.0" + }, + runnerImages: { + python3: imageConfig.runner.python, + node: imageConfig.runner.node + } } } }; merge(_defaultConfig, { - docker: { - prerunner: { image: imageConfig.prerunner }, - runnerImages: imageConfig.runner, - }, kubernetes: { runnerImages: imageConfig.runner, } @@ -113,7 +109,7 @@ export class ConfigService { } getDockerConfig() { - return this.config.docker; + return this.config.adapters["@scramjet/adapter-docker"]; } update(config: DeepPartial) { diff --git a/packages/sth/src/bin/hub.ts b/packages/sth/src/bin/hub.ts index 111f06cf5..b0df60524 100755 --- a/packages/sth/src/bin/hub.ts +++ b/packages/sth/src/bin/hub.ts @@ -111,20 +111,20 @@ const options: OptionValues & STHCommandOptions = program space: options.platformSpace, apiVersion: options.platformApiVersion }, - docker: { - prerunner: { - image: options.prerunnerImage, - maxMem: options.prerunnerMaxMem - }, - runner: { - maxMem: options.runnerMaxMem, - hostIp: options.exposeHostIp - }, - runnerImages: { - node: options.runnerImage, - python3: options.runnerPyImage - } - }, + // docker: { + // prerunner: { + // image: options.prerunnerImage, + // maxMem: options.prerunnerMaxMem + // }, + // runner: { + // maxMem: options.runnerMaxMem, + // hostIp: options.exposeHostIp + // }, + // runnerImages: { + // node: options.runnerImage, + // python3: options.runnerPyImage + // } + // }, host: { apiBase: "/api/v1", instancesServerPort: options.instancesServerPort ? parseInt(options.instancesServerPort, 10) : undefined, diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 93d587bc2..be13aad36 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -47,7 +47,7 @@ export { STHRestAPI }; export * from "./sequence-package-json"; export * from "./sequence-adapter"; -export * from "./runtime-adapter"; +export { IRuntimeAdapter } from "./runtime-adapter"; export * from "./instance-adapter"; export * from "./dto/index"; diff --git a/packages/types/src/instance-adapter.ts b/packages/types/src/instance-adapter.ts new file mode 100644 index 000000000..ff522b574 --- /dev/null +++ b/packages/types/src/instance-adapter.ts @@ -0,0 +1,4 @@ +import { ILifeCycleAdapterMain, ILifeCycleAdapterRun } from "./lifecycle-adapters"; +import { IComponent } from "./component"; + +export interface IInstanceAdapter extends ILifeCycleAdapterMain, ILifeCycleAdapterRun, IComponent {} diff --git a/packages/types/src/runtime-adapter.ts b/packages/types/src/runtime-adapter.ts index 348c73725..4cb728967 100644 --- a/packages/types/src/runtime-adapter.ts +++ b/packages/types/src/runtime-adapter.ts @@ -1,4 +1,8 @@ -import { ILifeCycleAdapterMain, ILifeCycleAdapterRun, IComponent, ISequenceAdapter, STHConfiguration, IInstanceAdapter } from "./index" +import { IComponent } from "./component"; +import { IInstanceAdapter } from "./instance-adapter"; +import { ILifeCycleAdapterMain, ILifeCycleAdapterRun } from "./lifecycle-adapters"; +import { ISequenceAdapter } from "./sequence-adapter"; +import { STHConfiguration } from "./sth-configuration"; export interface IRuntimeAdapter { SequenceAdapter: ISequenceAdapter & { new (config: STHConfiguration): ISequenceAdapter }; diff --git a/packages/types/src/sth-configuration.ts b/packages/types/src/sth-configuration.ts index fdf6ae989..b11a1f294 100644 --- a/packages/types/src/sth-configuration.ts +++ b/packages/types/src/sth-configuration.ts @@ -77,6 +77,22 @@ export type ProcessAdapterConfiguration = { sequencesRoot: string; }; +export type DockerAdapterConfiguration = { + /** + * PreRunner container configuration. + */ + prerunner: PreRunnerContainerConfiguration; + + /** + * Runner container configuration. + */ + runner: RunnerContainerConfiguration; + runnerImages: { + python3: string; + node: string; + } +} + export type K8SAdapterConfiguration = { /** * The Kubernetes namespace to use for running sequences @@ -170,25 +186,6 @@ export type STHConfiguration = { */ debug: boolean; - /** - * Docker related configuration. - */ - docker: { - /** - * PreRunner container configuration. - */ - prerunner: PreRunnerContainerConfiguration; - - /** - * Runner container configuration. - */ - runner: RunnerContainerConfiguration; - runnerImages: { - python3: string; - node: string; - } - }; - /** * Host configuration. */ @@ -272,6 +269,7 @@ export type STHConfiguration = { adapters: { "@scramjet/adapter-k8s"?: K8SAdapterConfiguration; "@scramjet/adapter-process"?: ProcessAdapterConfiguration; + "@scramjet/adapter-docker"?: DockerAdapterConfiguration; } }