From 38ee414fd76dfdc6d6ef625d7d5067657637e15d Mon Sep 17 00:00:00 2001 From: patuwwy Date: Mon, 13 Nov 2023 12:12:25 +0000 Subject: [PATCH] Changes in adapters in order to store parentID --- .../adapters/src/docker-instance-adapter.ts | 4 +- .../adapters/src/docker-sequence-adapter.ts | 30 +++++++++++---- .../adapters/src/dockerode-docker-helper.ts | 5 ++- .../src/kubernetes-sequence-adapter.ts | 33 +++++++++++++---- .../adapters/src/process-sequence-adapter.ts | 35 +++++++++++++----- packages/adapters/src/types.ts | 2 +- packages/host/src/lib/csi-controller.ts | 3 +- packages/host/src/lib/host.ts | 37 ++++++++++++------- packages/types/src/messages/sequence.ts | 3 +- .../src/rest-api-manager/get-sequence.ts | 1 + .../types/src/rest-api-sth/get-sequence.ts | 1 + packages/types/src/runner-config.ts | 1 + packages/types/src/sequence-adapter.ts | 3 +- 13 files changed, 115 insertions(+), 43 deletions(-) diff --git a/packages/adapters/src/docker-instance-adapter.ts b/packages/adapters/src/docker-instance-adapter.ts index 4d88cde88..9a5063480 100644 --- a/packages/adapters/src/docker-instance-adapter.ts +++ b/packages/adapters/src/docker-instance-adapter.ts @@ -186,6 +186,8 @@ IComponent { const networkSetup = await this.getNetworkSetup(); + const volumeId = config.id + "_" + config.parent_id; + const envs = getRunnerEnvEntries({ sequencePath: path.join(config.sequenceDir, config.entrypointPath), instancesServerPort, @@ -200,7 +202,7 @@ IComponent { imageName: config.container.image, volumes: [ ...extraVolumes, - { mountPoint: config.sequenceDir, volume: config.id, writeable: false } + { mountPoint: config.sequenceDir, volume: volumeId, writeable: false } ], labels: { "scramjet.sequence.name": config.name diff --git a/packages/adapters/src/docker-sequence-adapter.ts b/packages/adapters/src/docker-sequence-adapter.ts index a68360c42..369e81dae 100644 --- a/packages/adapters/src/docker-sequence-adapter.ts +++ b/packages/adapters/src/docker-sequence-adapter.ts @@ -108,6 +108,7 @@ class DockerSequenceAdapter implements ISequenceAdapter { this.logger.debug("Identify started", volume, this.config.docker.prerunner?.maxMem || 0); const ret = await this.parsePackage(streams, wait, volume); + const [, parentId] = volume.split("_"); if (!ret.id) { return undefined; @@ -115,6 +116,10 @@ class DockerSequenceAdapter implements ISequenceAdapter { this.logger.info("Identified image for volume", { volume, image: ret.container?.image }); + if (parentId) { + ret.parent_id = parentId; + } + return ret; } catch (e: any) { this.logger.error("Docker failed", e.message, volume); @@ -132,17 +137,18 @@ class DockerSequenceAdapter implements ISequenceAdapter { * @param {Readable} stream Stream containing sequence to be identified. * @param {string} id Id for the new docker volume where sequence will be stored. * @param {boolean} override Removes previous sequence + * @param {string} parentId Id which indicates sequence's source. + * @returns {Promise} Promise resolving to sequence config. */ - async identify(stream: Readable, id: string, override = false): Promise { + async identify(stream: Readable, id: string, override = false, parentId: string): Promise { const volStart = new Date(); if (override) { await this.dockerHelper.removeVolume(id); } - const volumeId = await this.createVolume(id); - + const volumeId = await this.createVolume(id, parentId); const volSecs = (new Date().getTime() - volStart.getTime()) / 1000; appendFile("timing-log.ndjson", JSON.stringify({ @@ -189,6 +195,10 @@ class DockerSequenceAdapter implements ISequenceAdapter { await this.fetch(config.container.image); + if (parentId) { + config.parent_id = parentId; + } + return config; } catch (err: any) { this.logger.error("Identify failed on volume", id); @@ -204,11 +214,12 @@ class DockerSequenceAdapter implements ISequenceAdapter { * Creates volume with provided id. * * @param {string} id Volume id. + * @param {string} parentId Sequence's parentId. * @returns {DockerVolume} Created volume. */ - private async createVolume(id: string): Promise { + private async createVolume(id: string, parentId?: string): Promise { try { - return await this.dockerHelper.createVolume(id); + return await this.dockerHelper.createVolume(id, parentId); } catch (error: any) { this.logger.error("Error creating volume", id); @@ -233,7 +244,6 @@ class DockerSequenceAdapter implements ISequenceAdapter { const parseStart = new Date(); const [preRunnerResult] = (await Promise.all([readStreamedJSON(streams.stdout as Readable), wait])) as any; - const parseSecs = (new Date().getTime() - parseStart.getTime()) / 1000; appendFile("timing-log.ndjson", JSON.stringify({ @@ -252,6 +262,7 @@ class DockerSequenceAdapter implements ISequenceAdapter { const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(preRunnerResult); const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {}; const config = validPackageJson.scramjet?.config ? { ...validPackageJson.scramjet.config } : {}; + const [id, parentId] = volumeId.split("_"); const container = Object.assign({}, this.config.docker.runner); @@ -268,7 +279,8 @@ class DockerSequenceAdapter implements ISequenceAdapter { config, sequenceDir: PACKAGE_DIR, entrypointPath: validPackageJson.main, - id: volumeId, + id: id, + parent_id: parentId, description: validPackageJson.description, author: validPackageJson.author, keywords: validPackageJson.keywords, @@ -288,7 +300,9 @@ class DockerSequenceAdapter implements ISequenceAdapter { throw new Error(`Incorrect SequenceConfig passed to DockerSequenceAdapter: ${config.type}`); } - await this.dockerHelper.removeVolume(config.id); + const volumeId = config.id + "_" + config.parent_id; + + await this.dockerHelper.removeVolume(volumeId); this.logger.debug("Volume removed", config.id); } diff --git a/packages/adapters/src/dockerode-docker-helper.ts b/packages/adapters/src/dockerode-docker-helper.ts index f0c2db067..f853ba5e8 100644 --- a/packages/adapters/src/dockerode-docker-helper.ts +++ b/packages/adapters/src/dockerode-docker-helper.ts @@ -261,9 +261,12 @@ export class DockerodeDockerHelper implements IDockerHelper { * Creates docker volume. * * @param name Volume name. Optional. If not provided, volume will be named with unique name. + * @param parentId Volume parentId. Optional. If not provided, volume will be named the same as the name. * @returns Volume name. */ - async createVolume(name: string = ""): Promise { + async createVolume(name: string = "", parentId: string = name): Promise { + name += "_" + parentId; + return this.dockerode.createVolume({ Name: name, Labels: { diff --git a/packages/adapters/src/kubernetes-sequence-adapter.ts b/packages/adapters/src/kubernetes-sequence-adapter.ts index 5fc3d97d4..24c16c30f 100644 --- a/packages/adapters/src/kubernetes-sequence-adapter.ts +++ b/packages/adapters/src/kubernetes-sequence-adapter.ts @@ -16,6 +16,7 @@ import { isDefined, readStreamedJSON } from "@scramjet/utility"; import { sequencePackageJSONDecoder } from "./validate-sequence-package-json"; import { adapterConfigDecoder } from "./kubernetes-config-decoder"; import { detectLanguage } from "./utils"; +import { IDProvider } from "@scramjet/model"; /** * Returns existing Sequence configuration. @@ -24,8 +25,23 @@ import { detectLanguage } from "./utils"; * @param {string} id Sequence Id. * @returns {ProcessSequenceConfig} Sequence configuration. */ -async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string): Promise { - const sequenceDir = path.join(sequencesRoot, id); +// eslint-disable-next-line max-len +async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string, parentId?: string): Promise { + let sequenceDir: string; + + if (parentId) { + sequenceDir = path.join(sequencesRoot, id + "_" + parentId); + } else { + [id, parentId] = id.split("_"); + const valid = IDProvider.isValid(id); + + if (valid) { + sequenceDir = path.join(sequencesRoot, id + "_" + parentId); + } else { + sequenceDir = path.join(sequencesRoot, id); + } + } + const packageJsonPath = path.join(sequenceDir, "package.json"); const packageJson = await readStreamedJSON(createReadStream(packageJsonPath)); @@ -38,6 +54,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin version: validPackageJson.version ?? "", name: validPackageJson.name ?? "", id, + parent_id: parentId || id, sequenceDir, engines, description: validPackageJson.description, @@ -110,13 +127,15 @@ class KubernetesSequenceAdapter implements ISequenceAdapter { * * @param {Readable} stream Stream with packed sequence. * @param {string} id Sequence Id. - * @param {boolean} override Removes previous sequence + * @param {boolean} override Removes previous sequence. + * @param {string} parentId Sequence's parentId. + * @returns {Promise} Promise resolving to identified sequence configuration. */ - async identify(stream: Readable, id: string, override = false): Promise { + async identify(stream: Readable, id: string, override = false, parentId = id): Promise { // 1. Unpack package.json to stdout and map to config // 2. Create compressed package on the disk - const sequenceDir = path.join(this.adapterConfig.sequencesRoot, id); + const sequenceDir = path.join(this.adapterConfig.sequencesRoot, id + "_" + parentId); if (override) { await fs.rm(sequenceDir, { recursive: true, force: true }); @@ -134,7 +153,7 @@ class KubernetesSequenceAdapter implements ISequenceAdapter { await new Promise(res => uncompressingProc.on("close", res)); - return getRunnerConfigForStoredSequence(this.adapterConfig.sequencesRoot, id); + return getRunnerConfigForStoredSequence(this.adapterConfig.sequencesRoot, id, parentId); } /** @@ -148,7 +167,7 @@ class KubernetesSequenceAdapter implements ISequenceAdapter { throw new Error(`Incorrect SequenceConfig passed to KubernetesSequenceAdapter: ${config.type}`); } - const sequenceDir = path.join(this.adapterConfig.sequencesRoot, config.id); + const sequenceDir = config.sequenceDir; this.logger.debug("Removing sequence directory...", sequenceDir); diff --git a/packages/adapters/src/process-sequence-adapter.ts b/packages/adapters/src/process-sequence-adapter.ts index 955b19a50..c046d5b49 100644 --- a/packages/adapters/src/process-sequence-adapter.ts +++ b/packages/adapters/src/process-sequence-adapter.ts @@ -13,7 +13,7 @@ import path from "path"; import { exec } from "child_process"; import { isDefined, readStreamedJSON } from "@scramjet/utility"; import { sequencePackageJSONDecoder } from "./validate-sequence-package-json"; -import { SequenceAdapterError } from "@scramjet/model"; +import { IDProvider, SequenceAdapterError } from "@scramjet/model"; import { detectLanguage } from "./utils"; /** @@ -23,12 +23,25 @@ import { detectLanguage } from "./utils"; * @param {string} id Sequence Id. * @returns {ProcessSequenceConfig} Sequence configuration. */ -// eslint-disable-next-line complexity -async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string): Promise { - const sequenceDir = path.join(sequencesRoot, id); +// eslint-disable-next-line complexity, max-len +async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string, parentId?: string): Promise { + let sequenceDir: string; + + if (parentId) { + sequenceDir = path.join(sequencesRoot, id + "_" + parentId); + } else { + [id, parentId] = id.split("_"); + const valid = IDProvider.isValid(id); + + if (valid) { + sequenceDir = path.join(sequencesRoot, id + "_" + parentId); + } else { + sequenceDir = path.join(sequencesRoot, id); + } + } + const packageJsonPath = path.join(sequenceDir, "package.json"); const packageJson = await readStreamedJSON(createReadStream(packageJsonPath)); - const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(packageJson); const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {}; @@ -39,6 +52,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin version: validPackageJson.version ?? "", name: validPackageJson.name ?? "", id, + parent_id: parentId || id, sequenceDir, description: validPackageJson.description, author: validPackageJson.author, @@ -82,6 +96,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter { */ async list(): Promise { const storedSequencesIds = await fs.readdir(this.config.sequencesRoot); + const sequencesConfigs = (await Promise.all( storedSequencesIds .map((id) => getRunnerConfigForStoredSequence(this.config.sequencesRoot, id)) @@ -100,10 +115,12 @@ class ProcessSequenceAdapter implements ISequenceAdapter { * @param {Readable} stream Stream with packed sequence. * @param {string} id Sequence Id. * @param {boolean} override Removes previous sequence + * @param {string} parentId Sequence's parentId. + * @returns {Promise} Promise resolving to identified sequence configuration. */ - async identify(stream: Readable, id: string, override = false): Promise { - const sequenceDir = path.join(this.config.sequencesRoot, id); + async identify(stream: Readable, id: string, override = false, parentId = id): Promise { + const sequenceDir = path.join(this.config.sequencesRoot, id + "_" + parentId); if (override) { await fs.rm(sequenceDir, { recursive: true, force: true }); @@ -140,7 +157,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter { this.logger.debug("Unpacking sequence succeeded", stderrOutput); - return getRunnerConfigForStoredSequence(this.config.sequencesRoot, id); + return getRunnerConfigForStoredSequence(this.config.sequencesRoot, id, parentId); } /** @@ -154,7 +171,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter { throw new Error(`Incorrect SequenceConfig passed to ProcessSequenceAdapter: ${config.type}`); } - const sequenceDir = path.join(this.config.sequencesRoot, config.id); + const sequenceDir = config.sequenceDir; return fs.rm(sequenceDir, { recursive: true }); } diff --git a/packages/adapters/src/types.ts b/packages/adapters/src/types.ts index 71ffcec83..af67f24db 100644 --- a/packages/adapters/src/types.ts +++ b/packages/adapters/src/types.ts @@ -260,7 +260,7 @@ export interface IDockerHelper { * * @returns {Promise} Created volume. */ - createVolume: (name?: string) => Promise; + createVolume: (name?: string, parentId?: string) => Promise; /** * Removes volume. diff --git a/packages/host/src/lib/csi-controller.ts b/packages/host/src/lib/csi-controller.ts index eaab41dd6..cebe8509e 100644 --- a/packages/host/src/lib/csi-controller.ts +++ b/packages/host/src/lib/csi-controller.ts @@ -790,7 +790,8 @@ export class CSIController extends TypedEmitter { id: this.sequence.id, config: this.sequence.config, name: this.sequence.name, - location : this.sequence.location + location : this.sequence.location, + parent_id: this.sequence.parent_id }, ports: this.info.ports, created: this.info.created, diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index 6c837948b..a61751eeb 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -747,9 +747,9 @@ export class Host implements IComponent { if (this.config.host.id) { // eslint-disable-next-line max-len - this.sequenceStore.set({ id: config.id, config: config, instances: [], location: this.config.host.id }); + this.sequenceStore.set({ id: config.id, parent_id: config.parent_id, config, instances: [], location: this.config.host.id }); } else { - this.sequenceStore.set({ id: config.id, config: config, instances: [], location: "STH" }); + this.sequenceStore.set({ id: config.id, parent_id: config.parent_id, config, instances: [], location: "STH" }); } } this.logger.info(` ${configs.length} sequences identified`); @@ -760,10 +760,10 @@ export class Host implements IComponent { async handleIncomingSequence( stream: ParsedMessage, - id: string + id: string, + external?: boolean ): Promise> { stream.params ||= {}; - const sequenceName = stream.params.id_name || stream.headers["x-name"]; this.logger.info("New Sequence incoming", { name: sequenceName }); @@ -795,21 +795,30 @@ export class Host implements IComponent { } } - const config = await sequenceAdapter.identify(stream, id); + const parentId = id; + + if (external) { + id = IDProvider.generate(); + } + + const config = await sequenceAdapter.identify(stream, id, false, parentId); config.packageSize = stream.socket?.bytesRead; if (this.config.host.id) { // eslint-disable-next-line max-len - this.sequenceStore.set({ id, config, instances: [], name: sequenceName, location: this.config.host.id }); + this.sequenceStore.set({ id, parent_id: config.parent_id, config, instances: [], name: sequenceName, location: this.config.host.id }); } else { - this.sequenceStore.set({ id, config, instances: [], name: sequenceName, location: "STH" }); + this.sequenceStore.set({ id, parent_id: config.parent_id, config, instances: [], name: sequenceName, location: "STH" }); } this.logger.trace(`Sequence identified: ${config.id}`); - // eslint-disable-next-line max-len - await this.cpmConnector?.sendSequenceInfo(id, SequenceMessageCode.SEQUENCE_CREATED, config as unknown as GetSequenceResponse); + await this.cpmConnector?.sendSequenceInfo( + id, + SequenceMessageCode.SEQUENCE_CREATED, + config as unknown as GetSequenceResponse + ); this.auditor.auditSequence(id, SequenceMessageCode.SEQUENCE_CREATED); this.pushTelemetry("Sequence uploaded", { language: config.language.toLowerCase(), seqId: id }); @@ -854,9 +863,10 @@ export class Host implements IComponent { * * @param {IncomingMessage} stream Stream of packaged Sequence. * @param {string} id Sequence id. + * @param {boolean} external Define the source of sequence. * @returns {Promise} Promise resolving to operation result. */ - async handleNewSequence(stream: ParsedMessage, id = IDProvider.generate()): + async handleNewSequence(stream: ParsedMessage, id = IDProvider.generate(), external?: boolean): Promise> { const sequenceName = stream.headers["x-name"] as string; @@ -872,8 +882,7 @@ export class Host implements IComponent { }; } } - - return this.handleIncomingSequence(stream, id); + return this.handleIncomingSequence(stream, id, external); } async getExternalSequence(id: string): Promise { @@ -895,7 +904,8 @@ export class Host implements IComponent { const result = (await this.handleNewSequence( packageStream as ParsedMessage, - id + id, + true )) as STHRestAPI.SendSequenceResponse; return this.sequenceStore.getById(result.id)!; @@ -1154,6 +1164,7 @@ export class Host implements IComponent { return { opStatus: ReasonPhrases.OK, id: sequence.id, + parent_id: sequence.parent_id, name: sequence.name, config: sequence.config, location: sequence.location, diff --git a/packages/types/src/messages/sequence.ts b/packages/types/src/messages/sequence.ts index 86da1d40b..60d9af6d5 100644 --- a/packages/types/src/messages/sequence.ts +++ b/packages/types/src/messages/sequence.ts @@ -4,7 +4,8 @@ import { SequenceConfig } from "../runner-config"; export type SequenceMessageData = { id: string, status: SequenceMessageCode, - config: SequenceConfig + config: SequenceConfig, + parent_id?: string } export type SequenceMessage = { msgCode: CPMMessageCode.SEQUENCE } & SequenceMessageData; diff --git a/packages/types/src/rest-api-manager/get-sequence.ts b/packages/types/src/rest-api-manager/get-sequence.ts index ed7231c0d..43cde6293 100644 --- a/packages/types/src/rest-api-manager/get-sequence.ts +++ b/packages/types/src/rest-api-manager/get-sequence.ts @@ -4,6 +4,7 @@ export type GetSequenceResponse = { instances: string[]; name?: string; id: string; + parent_id: string; config: SequenceConfig; location : string; } diff --git a/packages/types/src/rest-api-sth/get-sequence.ts b/packages/types/src/rest-api-sth/get-sequence.ts index 781ba9a1d..e10df9447 100644 --- a/packages/types/src/rest-api-sth/get-sequence.ts +++ b/packages/types/src/rest-api-sth/get-sequence.ts @@ -2,6 +2,7 @@ import { SequenceConfig } from "../runner-config"; export type GetSequenceResponse = { id: string; + parent_id: string; name?: string; config: SequenceConfig, location: string, diff --git a/packages/types/src/runner-config.ts b/packages/types/src/runner-config.ts index 251341012..27a0f2003 100644 --- a/packages/types/src/runner-config.ts +++ b/packages/types/src/runner-config.ts @@ -9,6 +9,7 @@ type CommonSequenceConfig = { type: string; engines: Record; id: string; + parent_id: string; /** * Relative path from sequence package root to JS file */ diff --git a/packages/types/src/sequence-adapter.ts b/packages/types/src/sequence-adapter.ts index 86a2d966e..345407341 100644 --- a/packages/types/src/sequence-adapter.ts +++ b/packages/types/src/sequence-adapter.ts @@ -8,6 +8,7 @@ export type SequenceInfo = { instances: string[]; location : string; name? : string; + parent_id: string; } export type SequenceInfoInstance = Omit; @@ -28,7 +29,7 @@ export interface ISequenceAdapter { /** * Identifies new Sequence */ - identify(stream: Readable, id: string, override?: boolean): Promise; + identify(stream: Readable, id: string, override?: boolean, parent_id?: string,): Promise; remove(conifg: SequenceConfig): Promise