diff --git a/packages/host/src/lib/cpm-connector.ts b/packages/host/src/lib/cpm-connector.ts index 2c81d3966..20bd14967 100644 --- a/packages/host/src/lib/cpm-connector.ts +++ b/packages/host/src/lib/cpm-connector.ts @@ -2,7 +2,7 @@ import fs from "fs"; import { Readable } from "stream"; import * as http from "http"; -import { CPMMessageCode, InstanceMessageCode, SequenceMessageCode } from "@scramjet/symbols"; +import { CPMMessageCode, SequenceMessageCode } from "@scramjet/symbols"; import { STHRestAPI, CPMConnectorOptions, @@ -582,14 +582,12 @@ export class CPMConnector extends TypedEmitter { * @param {string} instance Instance details. * @param {SequenceMessageCode} instanceStatus Instance status. */ - async sendInstanceInfo(instance: Instance, instanceStatus: InstanceMessageCode): Promise { - this.logger.trace("Send instance status update", instanceStatus); + async sendInstanceInfo(instance: Instance): Promise { + this.logger.trace("Send instance status update", instance.status); await this.communicationStream?.whenWrote( - [CPMMessageCode.INSTANCE, { instance, status: instanceStatus }] + [CPMMessageCode.INSTANCE, { instance }] ); - - this.logger.trace("Instance status update sent", instanceStatus); } /** diff --git a/packages/host/src/lib/csi-controller.ts b/packages/host/src/lib/csi-controller.ts index 50408d7bd..87fec31ca 100644 --- a/packages/host/src/lib/csi-controller.ts +++ b/packages/host/src/lib/csi-controller.ts @@ -6,7 +6,7 @@ import { MessageUtilities } from "@scramjet/model"; import { development } from "@scramjet/sth-config"; -import { CommunicationChannel as CC, RunnerMessageCode } from "@scramjet/symbols"; +import { CommunicationChannel as CC, InstanceStatus, RunnerMessageCode } from "@scramjet/symbols"; import { APIRoute, AppConfig, @@ -19,7 +19,6 @@ import { ILifeCycleAdapterRun, InstanceLimits, InstanceStats, - InstanceStatus, IObjectLogger, MessageDataType, MonitoringMessageData, diff --git a/packages/host/src/lib/csi-dispatcher.ts b/packages/host/src/lib/csi-dispatcher.ts index 46bf443a4..b877b1965 100644 --- a/packages/host/src/lib/csi-dispatcher.ts +++ b/packages/host/src/lib/csi-dispatcher.ts @@ -1,8 +1,8 @@ import { getInstanceAdapter } from "@scramjet/adapters"; import { IDProvider } from "@scramjet/model"; import { ObjLogger } from "@scramjet/obj-logger"; -import { RunnerMessageCode } from "@scramjet/symbols"; -import { ContentType, EventMessageData, HostProxy, ICommunicationHandler, IObjectLogger, Instance, InstanceConfig, InstanceStatus, MessageDataType, PangMessageData, PingMessageData, STHConfiguration, STHRestAPI, SequenceInfo, SequenceInfoInstance } from "@scramjet/types"; +import { InstanceStatus, RunnerMessageCode } from "@scramjet/symbols"; +import { ContentType, EventMessageData, HostProxy, ICommunicationHandler, IObjectLogger, Instance, InstanceConfig, MessageDataType, PangMessageData, PingMessageData, STHConfiguration, STHRestAPI, SequenceInfo, SequenceInfoInstance } from "@scramjet/types"; import { TypedEmitter } from "@scramjet/utility"; import { CSIController, CSIControllerInfo } from "./csi-controller"; import { InstanceStore } from "./instance-store"; @@ -249,6 +249,8 @@ export class CSIDispatcher extends TypedEmitter { this.logger.debug("Dispatched. Waiting for connection...", id); + let established = false; + return await Promise.race([ new Promise((resolve, _reject) => { const resolveFunction = (instance: Instance) => { @@ -256,6 +258,7 @@ export class CSIDispatcher extends TypedEmitter { this.logger.debug("Established", id); this.off("established", resolveFunction); + established = true; resolve(); } }; @@ -271,13 +274,18 @@ export class CSIDispatcher extends TypedEmitter { sequence })), // handle fast fail - before connection is established. - Promise.resolve() - .then(() => instanceAdapter.waitUntilExit(undefined, id, sequence)) - .then(async (exitCode) => { - this.logger.info("Exited before established", id, exitCode); + Promise.resolve().then( + () => instanceAdapter.waitUntilExit(undefined, id, sequence) + .then(async (exitCode: number) => { + if (!established) { + this.logger.info("Exited before established", id, exitCode); + + return mapRunnerExitCode(exitCode, sequence); + } - return mapRunnerExitCode(exitCode, sequence); - }) + return undefined; + }) + ) ]); } } diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index f450b68df..a4a107d66 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -6,7 +6,7 @@ import { AddressInfo } from "net"; import { Duplex } from "stream"; import { CommunicationHandler, HostError, IDProvider } from "@scramjet/model"; -import { HostHeaders, InstanceMessageCode, RunnerMessageCode, SequenceMessageCode } from "@scramjet/symbols"; +import { HostHeaders, InstanceMessageCode, InstanceStatus, RunnerMessageCode, SequenceMessageCode } from "@scramjet/symbols"; import { APIExpose, CPMConnectorOptions, @@ -357,7 +357,7 @@ export class Host implements IComponent { await this.cpmConnector?.sendInstanceInfo({ id: instance.id, sequence: instance.sequence - }, InstanceMessageCode.INSTANCE_CONNECTED); + }); this.pushTelemetry("Instance connected", { id: instance.id, @@ -368,21 +368,22 @@ export class Host implements IComponent { /** * Pass information about ended instance to monitoring and platform services. * - * @param {DispatcherInstanceEndEventData} eventData Event details. + * @param {DispatcherInstanceEndEventData} instance Event details. */ - async handleDispatcherEndEvent(eventData: DispatcherInstanceEndEventData) { - this.auditor.auditInstance(eventData.id, InstanceMessageCode.INSTANCE_ENDED); + async handleDispatcherEndEvent(instance: DispatcherInstanceEndEventData) { + this.auditor.auditInstance(instance.id, InstanceMessageCode.INSTANCE_ENDED); await this.cpmConnector?.sendInstanceInfo({ - id: eventData.id, - sequence: eventData.sequence - }, InstanceMessageCode.INSTANCE_ENDED); + id: instance.id, + status: InstanceStatus.GONE, + sequence: instance.sequence + }); this.pushTelemetry("Instance ended", { - executionTime: eventData.info.executionTime.toString(), - id: eventData.id, - code: eventData.code.toString(), - seqId: eventData.sequence.id + executionTime: instance.info.executionTime.toString(), + id: instance.id, + code: instance.code.toString(), + seqId: instance.sequence.id }); } @@ -1089,7 +1090,7 @@ export class Host implements IComponent { try { const runner = await this.csiDispatcher.startRunner(sequence, payload); - if ("id" in runner) { + if (runner && "id" in runner) { this.logger.debug("Instance limits", runner.limits); this.auditor.auditInstanceStart(runner.id, req as AuditedRequest, runner.limits); this.pushTelemetry("Instance started", { id: runner.id, language: runner.sequence.config.language, seqId: runner.sequence.id }); @@ -1099,9 +1100,11 @@ export class Host implements IComponent { message: `Sequence ${runner.id} starting`, id: runner.id }; - } else { + } else if (runner) { throw runner; } + + throw Error("Unexpected startup error"); } catch (error: any) { this.pushTelemetry("Instance start failed", { error: error.message }, "error"); this.logger.error(error.message); diff --git a/packages/symbols/src/index.ts b/packages/symbols/src/index.ts index e6db6a71b..83d0796ad 100644 --- a/packages/symbols/src/index.ts +++ b/packages/symbols/src/index.ts @@ -8,5 +8,6 @@ export { SequenceMessageCode } from "./sequence-status-code"; export { OpRecordCode } from "./op-record-code"; export { APIErrorCode } from "./api-error-codes"; export { DisconnectHubErrors } from "./disconnect-error-codes"; +export { InstanceStatus } from "./instance-status"; export * from "./headers"; diff --git a/packages/symbols/src/instance-status.ts b/packages/symbols/src/instance-status.ts new file mode 100644 index 000000000..4fe1f0ae3 --- /dev/null +++ b/packages/symbols/src/instance-status.ts @@ -0,0 +1,10 @@ +export const enum InstanceStatus { + INITIALIZING = "initializing", + STARTING = "starting", + RUNNING = "running", + STOPPING = "stopping", + KILLING = "killing", + COMPLETED ="completed", + ERRORED = "errored", + GONE = "gone" +} diff --git a/packages/types/src/instance.ts b/packages/types/src/instance.ts index abef13195..d0b7eb229 100644 --- a/packages/types/src/instance.ts +++ b/packages/types/src/instance.ts @@ -2,16 +2,6 @@ export type InstanceId = string; export type InstanceArgs = any[]; -export const enum InstanceStatus { - INITIALIZING = "initializing", - STARTING = "starting", - RUNNING = "running", - STOPPING = "stopping", - KILLING = "killing", - COMPLETED ="completed", - ERRORED = "errored", -} - export type InstanceConnectionInfo = { }