Skip to content

Commit

Permalink
Move InstanceStatus to enums. Send instance event without saparate st…
Browse files Browse the repository at this point in the history
…atus
  • Loading branch information
patuwwy committed Jan 24, 2024
1 parent 0704147 commit 17b1ee5
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 40 deletions.
10 changes: 4 additions & 6 deletions packages/host/src/lib/cpm-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import fs from "fs";
import { Readable } from "stream";
import * as http from "http";

import { CPMMessageCode, InstanceMessageCode, SequenceMessageCode } from "@scramjet/symbols";
import { CPMMessageCode, SequenceMessageCode } from "@scramjet/symbols";
import {
STHRestAPI,
CPMConnectorOptions,
Expand Down Expand Up @@ -582,14 +582,12 @@ export class CPMConnector extends TypedEmitter<Events> {
* @param {string} instance Instance details.
* @param {SequenceMessageCode} instanceStatus Instance status.
*/
async sendInstanceInfo(instance: Instance, instanceStatus: InstanceMessageCode): Promise<void> {
this.logger.trace("Send instance status update", instanceStatus);
async sendInstanceInfo(instance: Instance): Promise<void> {
this.logger.trace("Send instance status update", instance.status);

await this.communicationStream?.whenWrote(
[CPMMessageCode.INSTANCE, { instance, status: instanceStatus }]
[CPMMessageCode.INSTANCE, { instance }]
);

this.logger.trace("Instance status update sent", instanceStatus);
}

/**
Expand Down
3 changes: 1 addition & 2 deletions packages/host/src/lib/csi-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
MessageUtilities
} from "@scramjet/model";
import { development } from "@scramjet/sth-config";
import { CommunicationChannel as CC, RunnerMessageCode } from "@scramjet/symbols";
import { CommunicationChannel as CC, InstanceStatus, RunnerMessageCode } from "@scramjet/symbols";
import {
APIRoute,
AppConfig,
Expand All @@ -19,7 +19,6 @@ import {
ILifeCycleAdapterRun,
InstanceLimits,
InstanceStats,
InstanceStatus,
IObjectLogger,
MessageDataType,
MonitoringMessageData,
Expand Down
24 changes: 16 additions & 8 deletions packages/host/src/lib/csi-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { getInstanceAdapter } from "@scramjet/adapters";
import { IDProvider } from "@scramjet/model";
import { ObjLogger } from "@scramjet/obj-logger";
import { RunnerMessageCode } from "@scramjet/symbols";
import { ContentType, EventMessageData, HostProxy, ICommunicationHandler, IObjectLogger, Instance, InstanceConfig, InstanceStatus, MessageDataType, PangMessageData, PingMessageData, STHConfiguration, STHRestAPI, SequenceInfo, SequenceInfoInstance } from "@scramjet/types";
import { InstanceStatus, RunnerMessageCode } from "@scramjet/symbols";
import { ContentType, EventMessageData, HostProxy, ICommunicationHandler, IObjectLogger, Instance, InstanceConfig, MessageDataType, PangMessageData, PingMessageData, STHConfiguration, STHRestAPI, SequenceInfo, SequenceInfoInstance } from "@scramjet/types";
import { TypedEmitter } from "@scramjet/utility";
import { CSIController, CSIControllerInfo } from "./csi-controller";
import { InstanceStore } from "./instance-store";
Expand Down Expand Up @@ -249,13 +249,16 @@ export class CSIDispatcher extends TypedEmitter<Events> {

this.logger.debug("Dispatched. Waiting for connection...", id);

let established = false;

return await Promise.race([
new Promise<void>((resolve, _reject) => {
const resolveFunction = (instance: Instance) => {
if (instance.id === id) {
this.logger.debug("Established", id);

this.off("established", resolveFunction);
established = true;
resolve();
}
};
Expand All @@ -271,13 +274,18 @@ export class CSIDispatcher extends TypedEmitter<Events> {
sequence
})),
// handle fast fail - before connection is established.
Promise.resolve()
.then(() => instanceAdapter.waitUntilExit(undefined, id, sequence))
.then(async (exitCode) => {
this.logger.info("Exited before established", id, exitCode);
Promise.resolve().then(
() => instanceAdapter.waitUntilExit(undefined, id, sequence)
.then(async (exitCode: number) => {
if (!established) {
this.logger.info("Exited before established", id, exitCode);

return mapRunnerExitCode(exitCode, sequence);
}

return mapRunnerExitCode(exitCode, sequence);
})
return undefined;
})
)
]);
}
}
31 changes: 17 additions & 14 deletions packages/host/src/lib/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { AddressInfo } from "net";
import { Duplex } from "stream";

import { CommunicationHandler, HostError, IDProvider } from "@scramjet/model";
import { HostHeaders, InstanceMessageCode, RunnerMessageCode, SequenceMessageCode } from "@scramjet/symbols";
import { HostHeaders, InstanceMessageCode, InstanceStatus, RunnerMessageCode, SequenceMessageCode } from "@scramjet/symbols";
import {
APIExpose,
CPMConnectorOptions,
Expand Down Expand Up @@ -357,7 +357,7 @@ export class Host implements IComponent {
await this.cpmConnector?.sendInstanceInfo({
id: instance.id,
sequence: instance.sequence
}, InstanceMessageCode.INSTANCE_CONNECTED);
});

this.pushTelemetry("Instance connected", {
id: instance.id,
Expand All @@ -368,21 +368,22 @@ export class Host implements IComponent {
/**
* Pass information about ended instance to monitoring and platform services.
*
* @param {DispatcherInstanceEndEventData} eventData Event details.
* @param {DispatcherInstanceEndEventData} instance Event details.
*/
async handleDispatcherEndEvent(eventData: DispatcherInstanceEndEventData) {
this.auditor.auditInstance(eventData.id, InstanceMessageCode.INSTANCE_ENDED);
async handleDispatcherEndEvent(instance: DispatcherInstanceEndEventData) {
this.auditor.auditInstance(instance.id, InstanceMessageCode.INSTANCE_ENDED);

await this.cpmConnector?.sendInstanceInfo({
id: eventData.id,
sequence: eventData.sequence
}, InstanceMessageCode.INSTANCE_ENDED);
id: instance.id,
status: InstanceStatus.GONE,
sequence: instance.sequence
});

this.pushTelemetry("Instance ended", {
executionTime: eventData.info.executionTime.toString(),
id: eventData.id,
code: eventData.code.toString(),
seqId: eventData.sequence.id
executionTime: instance.info.executionTime.toString(),
id: instance.id,
code: instance.code.toString(),
seqId: instance.sequence.id
});
}

Expand Down Expand Up @@ -1089,7 +1090,7 @@ export class Host implements IComponent {
try {
const runner = await this.csiDispatcher.startRunner(sequence, payload);

if ("id" in runner) {
if (runner && "id" in runner) {
this.logger.debug("Instance limits", runner.limits);
this.auditor.auditInstanceStart(runner.id, req as AuditedRequest, runner.limits);
this.pushTelemetry("Instance started", { id: runner.id, language: runner.sequence.config.language, seqId: runner.sequence.id });
Expand All @@ -1099,9 +1100,11 @@ export class Host implements IComponent {
message: `Sequence ${runner.id} starting`,
id: runner.id
};
} else {
} else if (runner) {
throw runner;
}

throw Error("Unexpected startup error");
} catch (error: any) {
this.pushTelemetry("Instance start failed", { error: error.message }, "error");
this.logger.error(error.message);
Expand Down
1 change: 1 addition & 0 deletions packages/symbols/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ export { SequenceMessageCode } from "./sequence-status-code";
export { OpRecordCode } from "./op-record-code";
export { APIErrorCode } from "./api-error-codes";
export { DisconnectHubErrors } from "./disconnect-error-codes";
export { InstanceStatus } from "./instance-status";

export * from "./headers";
10 changes: 10 additions & 0 deletions packages/symbols/src/instance-status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export const enum InstanceStatus {
INITIALIZING = "initializing",
STARTING = "starting",
RUNNING = "running",
STOPPING = "stopping",
KILLING = "killing",
COMPLETED ="completed",
ERRORED = "errored",
GONE = "gone"
}
10 changes: 0 additions & 10 deletions packages/types/src/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,6 @@ export type InstanceId = string;

export type InstanceArgs = any[];

export const enum InstanceStatus {
INITIALIZING = "initializing",
STARTING = "starting",
RUNNING = "running",
STOPPING = "stopping",
KILLING = "killing",
COMPLETED ="completed",
ERRORED = "errored",
}

export type InstanceConnectionInfo = {

}

0 comments on commit 17b1ee5

Please sign in to comment.