diff --git a/packages/adapters/src/docker-instance-adapter.ts b/packages/adapters/src/docker-instance-adapter.ts index 11b868b37..1f673e6ce 100644 --- a/packages/adapters/src/docker-instance-adapter.ts +++ b/packages/adapters/src/docker-instance-adapter.ts @@ -185,7 +185,7 @@ IComponent { } // eslint-disable-next-line complexity - async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { + async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { if (!(config.type === "docker" && "container" in config)) { throw new Error("Docker instance adapter run with invalid runner config"); } @@ -241,6 +241,8 @@ IComponent { this.resources.containerId = containerId; // doesnt matter this.logger.trace("Container is running", containerId); + + return 0; } async waitUntilExit(config: InstanceConfig, instanceId:string, _sequenceInfo: SequenceInfo): Promise { diff --git a/packages/adapters/src/kubernetes-instance-adapter.ts b/packages/adapters/src/kubernetes-instance-adapter.ts index d68e3c001..7be572307 100644 --- a/packages/adapters/src/kubernetes-instance-adapter.ts +++ b/packages/adapters/src/kubernetes-instance-adapter.ts @@ -95,13 +95,13 @@ IComponent { } }; } - async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { + async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { if (config.type !== "kubernetes") { throw new Error(`Invalid config type for kubernetes adapter: ${config.type}`); } if (this.adapterConfig.quotaName && await this.kubeClient.isPodsLimitReached(this.adapterConfig.quotaName)) { - throw Error(RunnerExitCode.PODS_LIMIT_REACHED.toString()); + return RunnerExitCode.PODS_LIMIT_REACHED; } this.limits = config.limits; @@ -163,7 +163,7 @@ IComponent { // This means runner pod was unable to start. So it went from "Pending" to "Failed" state directly. // Return 1 which is Linux exit code for "General Error" since we are not able // to determine what happened exactly. - return; + return RunnerExitCode.UNCAUGHT_EXCEPTION; } this.logger.debug("Copy sequence files to Runner"); @@ -176,6 +176,8 @@ IComponent { await this.kubeClient.exec(runnerName, runnerName, ["unpack.sh", "/package"], process.stdout, this.stdErrorStream, compressedStream, 2); this.logger.debug("Copy command done"); + + return 0; } async waitUntilExit(_config: InstanceConfig, instanceId: string, _sequenceInfo: SequenceInfo): Promise { diff --git a/packages/adapters/src/process-instance-adapter.ts b/packages/adapters/src/process-instance-adapter.ts index 6679a545b..b4f6822e6 100644 --- a/packages/adapters/src/process-instance-adapter.ts +++ b/packages/adapters/src/process-instance-adapter.ts @@ -138,7 +138,7 @@ class ProcessInstanceAdapter implements return this.waitUntilExit(config, instanceId, sequenceInfo); } - async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { + async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise { if (config.type !== "process") { throw new Error("Process instance adapter run with invalid runner config"); } @@ -182,6 +182,8 @@ class ProcessInstanceAdapter implements this.runnerProcess = runnerProcess; this.logger.trace("Runner process is running", runnerProcess.pid); + + return 0; } getRunnerInfo(): RunnerConnectInfo["system"] { diff --git a/packages/host/src/lib/csi-controller.ts b/packages/host/src/lib/csi-controller.ts index ec5a9dda0..50408d7bd 100644 --- a/packages/host/src/lib/csi-controller.ts +++ b/packages/host/src/lib/csi-controller.ts @@ -406,14 +406,14 @@ export class CSIController extends TypedEmitter { this.communicationHandler.addMonitoringHandler(RunnerMessageCode.PING, async (message) => { const { status, payload } = message[1]; + this.status = status || InstanceStatus.RUNNING; + if (!payload) { this.emit("error", "No payload in ping!"); return null; } - this.status = status || InstanceStatus.RUNNING; - this.args = payload.args; this.info.created = new Date(message[1].created); @@ -517,6 +517,7 @@ export class CSIController extends TypedEmitter { this.logger.info("Handshake", JSON.stringify(message, undefined)); } + //@TODO: ! unhookup ! set proper state for reconnecting ! async handleInstanceConnect(streams: DownstreamStreamsConfig) { try { this.hookupStreams(streams); diff --git a/packages/host/src/lib/csi-dispatcher.ts b/packages/host/src/lib/csi-dispatcher.ts index d0f8691d4..0b12db7b2 100644 --- a/packages/host/src/lib/csi-dispatcher.ts +++ b/packages/host/src/lib/csi-dispatcher.ts @@ -234,7 +234,7 @@ export class CSIDispatcher extends TypedEmitter { this.logger.debug("Dispatching..."); - await instanceAdapter.dispatch( + const dispatchResultCode = await instanceAdapter.dispatch( instanceConfig, this.STHConfig.host.instancesServerPort, id, @@ -242,8 +242,12 @@ export class CSIDispatcher extends TypedEmitter { payload ); - this.logger.debug("Dispatched."); - this.logger.debug("Waiting for connection..."); + if (dispatchResultCode !== 0) { + this.logger.warn("Dispatch result code:", dispatchResultCode); + throw await mapRunnerExitCode(dispatchResultCode, sequence); + } + + this.logger.debug("Dispatched. Waiting for connection...", id); return await Promise.race([ new Promise((resolve, _reject) => { @@ -266,7 +270,7 @@ export class CSIDispatcher extends TypedEmitter { limits, sequence })), - // handle failed start + // handle fast fail - before connection is established. Promise.resolve() .then(() => instanceAdapter.waitUntilExit(undefined, id, sequence)) .then(async (exitCode) => { diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index e469dcc53..6a167997e 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -1136,7 +1136,7 @@ export class Host implements IComponent { this.logger.debug("Instance connecting", id); if (!this.instancesStore[id]) { - this.logger.info("creating new CSIController unknown istance"); + this.logger.info("Creating new CSIController for unknown Instance"); await this.csiDispatcher.createCSIController( id, diff --git a/packages/host/src/lib/serviceDiscovery/sd-adapter.ts b/packages/host/src/lib/serviceDiscovery/sd-adapter.ts index ddb761c69..43c54e833 100644 --- a/packages/host/src/lib/serviceDiscovery/sd-adapter.ts +++ b/packages/host/src/lib/serviceDiscovery/sd-adapter.ts @@ -169,6 +169,7 @@ export class ServiceDiscovery { const topic = this.createTopicIfNotExist(topicData); topic.acceptPipe(source); + await this.cpmConnector?.sendTopicInfo({ provides: topicData.topic.toString(), topicName: topicData.topic.toString(), diff --git a/packages/types/src/lifecycle-adapters.ts b/packages/types/src/lifecycle-adapters.ts index 5397f1ad6..5d5d17ac4 100644 --- a/packages/types/src/lifecycle-adapters.ts +++ b/packages/types/src/lifecycle-adapters.ts @@ -46,7 +46,7 @@ export interface ILifeCycleAdapterRun extends ILifeCycleAdapterMain { * @param {InstanceConfig} Runner configuration. * @returns {ExitCode} Runner exit code. */ - dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise; + dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise; /** * Starts Runner - in essence does `dispatch` and then `waitUntilExit`.