Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes in adapters in order to store parentID #985

Open
wants to merge 1 commit into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/adapters/src/docker-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ IComponent {

const networkSetup = await this.getNetworkSetup();

const volumeId = config.id + "_" + config.parent_id;

const envs = getRunnerEnvEntries({
sequencePath: path.join(config.sequenceDir, config.entrypointPath),
instancesServerPort,
Expand All @@ -200,7 +202,7 @@ IComponent {
imageName: config.container.image,
volumes: [
...extraVolumes,
{ mountPoint: config.sequenceDir, volume: config.id, writeable: false }
{ mountPoint: config.sequenceDir, volume: volumeId, writeable: false }
],
labels: {
"scramjet.sequence.name": config.name
Expand Down
30 changes: 22 additions & 8 deletions packages/adapters/src/docker-sequence-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,18 @@ class DockerSequenceAdapter implements ISequenceAdapter {
this.logger.debug("Identify started", volume, this.config.docker.prerunner?.maxMem || 0);

const ret = await this.parsePackage(streams, wait, volume);
const [, parentId] = volume.split("_");

if (!ret.id) {
return undefined;
}

this.logger.info("Identified image for volume", { volume, image: ret.container?.image });

if (parentId) {
ret.parent_id = parentId;
}

return ret;
} catch (e: any) {
this.logger.error("Docker failed", e.message, volume);
Expand All @@ -132,17 +137,18 @@ class DockerSequenceAdapter implements ISequenceAdapter {
* @param {Readable} stream Stream containing sequence to be identified.
* @param {string} id Id for the new docker volume where sequence will be stored.
* @param {boolean} override Removes previous sequence
* @param {string} parentId Id which indicates sequence's source.

* @returns {Promise<SequenceConfig>} Promise resolving to sequence config.
*/
async identify(stream: Readable, id: string, override = false): Promise<SequenceConfig> {
async identify(stream: Readable, id: string, override = false, parentId: string): Promise<SequenceConfig> {
const volStart = new Date();

if (override) {
await this.dockerHelper.removeVolume(id);
}

const volumeId = await this.createVolume(id);

const volumeId = await this.createVolume(id, parentId);
const volSecs = (new Date().getTime() - volStart.getTime()) / 1000;

appendFile("timing-log.ndjson", JSON.stringify({
Expand Down Expand Up @@ -189,6 +195,10 @@ class DockerSequenceAdapter implements ISequenceAdapter {

await this.fetch(config.container.image);

if (parentId) {
config.parent_id = parentId;
}

return config;
} catch (err: any) {
this.logger.error("Identify failed on volume", id);
Expand All @@ -204,11 +214,12 @@ class DockerSequenceAdapter implements ISequenceAdapter {
* Creates volume with provided id.
*
* @param {string} id Volume id.
* @param {string} parentId Sequence's parentId.
* @returns {DockerVolume} Created volume.
*/
private async createVolume(id: string): Promise<DockerVolume> {
private async createVolume(id: string, parentId?: string): Promise<DockerVolume> {
try {
return await this.dockerHelper.createVolume(id);
return await this.dockerHelper.createVolume(id, parentId);
} catch (error: any) {
this.logger.error("Error creating volume", id);

Expand All @@ -233,7 +244,6 @@ class DockerSequenceAdapter implements ISequenceAdapter {
const parseStart = new Date();

const [preRunnerResult] = (await Promise.all([readStreamedJSON(streams.stdout as Readable), wait])) as any;

const parseSecs = (new Date().getTime() - parseStart.getTime()) / 1000;

appendFile("timing-log.ndjson", JSON.stringify({
Expand All @@ -252,6 +262,7 @@ class DockerSequenceAdapter implements ISequenceAdapter {
const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(preRunnerResult);
const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {};
const config = validPackageJson.scramjet?.config ? { ...validPackageJson.scramjet.config } : {};
const [id, parentId] = volumeId.split("_");

const container = Object.assign({}, this.config.docker.runner);

Expand All @@ -268,7 +279,8 @@ class DockerSequenceAdapter implements ISequenceAdapter {
config,
sequenceDir: PACKAGE_DIR,
entrypointPath: validPackageJson.main,
id: volumeId,
id: id,
parent_id: parentId,
description: validPackageJson.description,
author: validPackageJson.author,
keywords: validPackageJson.keywords,
Expand All @@ -288,7 +300,9 @@ class DockerSequenceAdapter implements ISequenceAdapter {
throw new Error(`Incorrect SequenceConfig passed to DockerSequenceAdapter: ${config.type}`);
}

await this.dockerHelper.removeVolume(config.id);
const volumeId = config.id + "_" + config.parent_id;

await this.dockerHelper.removeVolume(volumeId);

this.logger.debug("Volume removed", config.id);
}
Expand Down
5 changes: 4 additions & 1 deletion packages/adapters/src/dockerode-docker-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,12 @@ export class DockerodeDockerHelper implements IDockerHelper {
* Creates docker volume.
*
* @param name Volume name. Optional. If not provided, volume will be named with unique name.
* @param parentId Volume parentId. Optional. If not provided, volume will be named the same as the name.
* @returns Volume name.
*/
async createVolume(name: string = ""): Promise<DockerVolume> {
async createVolume(name: string = "", parentId: string = name): Promise<DockerVolume> {
name += "_" + parentId;

return this.dockerode.createVolume({
Name: name,
Labels: {
Expand Down
33 changes: 26 additions & 7 deletions packages/adapters/src/kubernetes-sequence-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { isDefined, readStreamedJSON } from "@scramjet/utility";
import { sequencePackageJSONDecoder } from "./validate-sequence-package-json";
import { adapterConfigDecoder } from "./kubernetes-config-decoder";
import { detectLanguage } from "./utils";
import { IDProvider } from "@scramjet/model";

/**
* Returns existing Sequence configuration.
Expand All @@ -24,8 +25,23 @@ import { detectLanguage } from "./utils";
* @param {string} id Sequence Id.
* @returns {ProcessSequenceConfig} Sequence configuration.
*/
async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string): Promise<KubernetesSequenceConfig> {
const sequenceDir = path.join(sequencesRoot, id);
// eslint-disable-next-line max-len
async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string, parentId?: string): Promise<KubernetesSequenceConfig> {
let sequenceDir: string;

if (parentId) {
sequenceDir = path.join(sequencesRoot, id + "_" + parentId);
} else {
[id, parentId] = id.split("_");
const valid = IDProvider.isValid(id);

if (valid) {
sequenceDir = path.join(sequencesRoot, id + "_" + parentId);
} else {
sequenceDir = path.join(sequencesRoot, id);
}
}

const packageJsonPath = path.join(sequenceDir, "package.json");
const packageJson = await readStreamedJSON(createReadStream(packageJsonPath));

Expand All @@ -38,6 +54,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin
version: validPackageJson.version ?? "",
name: validPackageJson.name ?? "",
id,
parent_id: parentId || id,
sequenceDir,
engines,
description: validPackageJson.description,
Expand Down Expand Up @@ -110,13 +127,15 @@ class KubernetesSequenceAdapter implements ISequenceAdapter {
*
* @param {Readable} stream Stream with packed sequence.
* @param {string} id Sequence Id.
* @param {boolean} override Removes previous sequence
* @param {boolean} override Removes previous sequence.
* @param {string} parentId Sequence's parentId.

* @returns {Promise<SequenceConfig>} Promise resolving to identified sequence configuration.
*/
async identify(stream: Readable, id: string, override = false): Promise<SequenceConfig> {
async identify(stream: Readable, id: string, override = false, parentId = id): Promise<SequenceConfig> {
// 1. Unpack package.json to stdout and map to config
// 2. Create compressed package on the disk
const sequenceDir = path.join(this.adapterConfig.sequencesRoot, id);
const sequenceDir = path.join(this.adapterConfig.sequencesRoot, id + "_" + parentId);

if (override) {
await fs.rm(sequenceDir, { recursive: true, force: true });
Expand All @@ -134,7 +153,7 @@ class KubernetesSequenceAdapter implements ISequenceAdapter {

await new Promise(res => uncompressingProc.on("close", res));

return getRunnerConfigForStoredSequence(this.adapterConfig.sequencesRoot, id);
return getRunnerConfigForStoredSequence(this.adapterConfig.sequencesRoot, id, parentId);
}

/**
Expand All @@ -148,7 +167,7 @@ class KubernetesSequenceAdapter implements ISequenceAdapter {
throw new Error(`Incorrect SequenceConfig passed to KubernetesSequenceAdapter: ${config.type}`);
}

const sequenceDir = path.join(this.adapterConfig.sequencesRoot, config.id);
const sequenceDir = config.sequenceDir;

this.logger.debug("Removing sequence directory...", sequenceDir);

Expand Down
35 changes: 26 additions & 9 deletions packages/adapters/src/process-sequence-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import path from "path";
import { exec } from "child_process";
import { isDefined, readStreamedJSON } from "@scramjet/utility";
import { sequencePackageJSONDecoder } from "./validate-sequence-package-json";
import { SequenceAdapterError } from "@scramjet/model";
import { IDProvider, SequenceAdapterError } from "@scramjet/model";
import { detectLanguage } from "./utils";

/**
Expand All @@ -23,12 +23,25 @@ import { detectLanguage } from "./utils";
* @param {string} id Sequence Id.
* @returns {ProcessSequenceConfig} Sequence configuration.
*/
// eslint-disable-next-line complexity
async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string): Promise<ProcessSequenceConfig> {
const sequenceDir = path.join(sequencesRoot, id);
// eslint-disable-next-line complexity, max-len
async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string, parentId?: string): Promise<ProcessSequenceConfig> {
let sequenceDir: string;

if (parentId) {
sequenceDir = path.join(sequencesRoot, id + "_" + parentId);
} else {
[id, parentId] = id.split("_");
const valid = IDProvider.isValid(id);

if (valid) {
sequenceDir = path.join(sequencesRoot, id + "_" + parentId);
} else {
sequenceDir = path.join(sequencesRoot, id);
}
}

const packageJsonPath = path.join(sequenceDir, "package.json");
const packageJson = await readStreamedJSON(createReadStream(packageJsonPath));

const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(packageJson);
const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {};

Expand All @@ -39,6 +52,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin
version: validPackageJson.version ?? "",
name: validPackageJson.name ?? "",
id,
parent_id: parentId || id,
sequenceDir,
description: validPackageJson.description,
author: validPackageJson.author,
Expand Down Expand Up @@ -82,6 +96,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter {
*/
async list(): Promise<SequenceConfig[]> {
const storedSequencesIds = await fs.readdir(this.config.sequencesRoot);

const sequencesConfigs = (await Promise.all(
storedSequencesIds
.map((id) => getRunnerConfigForStoredSequence(this.config.sequencesRoot, id))
Expand All @@ -100,10 +115,12 @@ class ProcessSequenceAdapter implements ISequenceAdapter {
* @param {Readable} stream Stream with packed sequence.
* @param {string} id Sequence Id.
* @param {boolean} override Removes previous sequence
* @param {string} parentId Sequence's parentId.

* @returns {Promise<SequenceConfig>} Promise resolving to identified sequence configuration.
*/
async identify(stream: Readable, id: string, override = false): Promise<SequenceConfig> {
const sequenceDir = path.join(this.config.sequencesRoot, id);
async identify(stream: Readable, id: string, override = false, parentId = id): Promise<SequenceConfig> {
const sequenceDir = path.join(this.config.sequencesRoot, id + "_" + parentId);

if (override) {
await fs.rm(sequenceDir, { recursive: true, force: true });
Expand Down Expand Up @@ -140,7 +157,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter {

this.logger.debug("Unpacking sequence succeeded", stderrOutput);

return getRunnerConfigForStoredSequence(this.config.sequencesRoot, id);
return getRunnerConfigForStoredSequence(this.config.sequencesRoot, id, parentId);
}

/**
Expand All @@ -154,7 +171,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter {
throw new Error(`Incorrect SequenceConfig passed to ProcessSequenceAdapter: ${config.type}`);
}

const sequenceDir = path.join(this.config.sequencesRoot, config.id);
const sequenceDir = config.sequenceDir;

return fs.rm(sequenceDir, { recursive: true });
}
Expand Down
2 changes: 1 addition & 1 deletion packages/adapters/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ export interface IDockerHelper {
*
* @returns {Promise<DockerVolume>} Created volume.
*/
createVolume: (name?: string) => Promise<DockerVolume>;
createVolume: (name?: string, parentId?: string) => Promise<DockerVolume>;

/**
* Removes volume.
Expand Down
3 changes: 2 additions & 1 deletion packages/host/src/lib/csi-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,8 @@ export class CSIController extends TypedEmitter<Events> {
id: this.sequence.id,
config: this.sequence.config,
name: this.sequence.name,
location : this.sequence.location
location : this.sequence.location,
parent_id: this.sequence.parent_id
},
ports: this.info.ports,
created: this.info.created,
Expand Down
Loading
Loading