Skip to content

Commit

Permalink
Dispatching error code handling
Browse files Browse the repository at this point in the history
  • Loading branch information
patuwwy committed Jan 24, 2024
1 parent 10be6f2 commit 0afc685
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 13 deletions.
4 changes: 3 additions & 1 deletion packages/adapters/src/docker-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ IComponent {
}

// eslint-disable-next-line complexity
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<void> {
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<ExitCode> {
if (!(config.type === "docker" && "container" in config)) {
throw new Error("Docker instance adapter run with invalid runner config");
}
Expand Down Expand Up @@ -241,6 +241,8 @@ IComponent {
this.resources.containerId = containerId; // doesnt matter

this.logger.trace("Container is running", containerId);

return 0;
}

async waitUntilExit(config: InstanceConfig, instanceId:string, _sequenceInfo: SequenceInfo): Promise<number> {
Expand Down
8 changes: 5 additions & 3 deletions packages/adapters/src/kubernetes-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ IComponent {
}
};
}
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<void> {
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<number> {
if (config.type !== "kubernetes") {
throw new Error(`Invalid config type for kubernetes adapter: ${config.type}`);
}

if (this.adapterConfig.quotaName && await this.kubeClient.isPodsLimitReached(this.adapterConfig.quotaName)) {
throw Error(RunnerExitCode.PODS_LIMIT_REACHED.toString());
return RunnerExitCode.PODS_LIMIT_REACHED;
}

this.limits = config.limits;
Expand Down Expand Up @@ -163,7 +163,7 @@ IComponent {
// This means runner pod was unable to start. So it went from "Pending" to "Failed" state directly.
// Return 1 which is Linux exit code for "General Error" since we are not able
// to determine what happened exactly.
return;
return RunnerExitCode.UNCAUGHT_EXCEPTION;
}

this.logger.debug("Copy sequence files to Runner");
Expand All @@ -176,6 +176,8 @@ IComponent {
await this.kubeClient.exec(runnerName, runnerName, ["unpack.sh", "/package"], process.stdout, this.stdErrorStream, compressedStream, 2);

this.logger.debug("Copy command done");

return 0;
}

async waitUntilExit(_config: InstanceConfig, instanceId: string, _sequenceInfo: SequenceInfo): Promise<ExitCode> {
Expand Down
4 changes: 3 additions & 1 deletion packages/adapters/src/process-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class ProcessInstanceAdapter implements
return this.waitUntilExit(config, instanceId, sequenceInfo);
}

async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<void> {
async dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<ExitCode> {
if (config.type !== "process") {
throw new Error("Process instance adapter run with invalid runner config");
}
Expand Down Expand Up @@ -182,6 +182,8 @@ class ProcessInstanceAdapter implements
this.runnerProcess = runnerProcess;

this.logger.trace("Runner process is running", runnerProcess.pid);

return 0;
}

getRunnerInfo(): RunnerConnectInfo["system"] {
Expand Down
5 changes: 3 additions & 2 deletions packages/host/src/lib/csi-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,14 +406,14 @@ export class CSIController extends TypedEmitter<Events> {
this.communicationHandler.addMonitoringHandler(RunnerMessageCode.PING, async (message) => {
const { status, payload } = message[1];

this.status = status || InstanceStatus.RUNNING;

if (!payload) {
this.emit("error", "No payload in ping!");

return null;
}

this.status = status || InstanceStatus.RUNNING;

this.args = payload.args;
this.info.created = new Date(message[1].created);

Expand Down Expand Up @@ -517,6 +517,7 @@ export class CSIController extends TypedEmitter<Events> {
this.logger.info("Handshake", JSON.stringify(message, undefined));
}

//@TODO: ! unhookup ! set proper state for reconnecting !
async handleInstanceConnect(streams: DownstreamStreamsConfig) {
try {
this.hookupStreams(streams);
Expand Down
12 changes: 8 additions & 4 deletions packages/host/src/lib/csi-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,16 +234,20 @@ export class CSIDispatcher extends TypedEmitter<Events> {

this.logger.debug("Dispatching...");

await instanceAdapter.dispatch(
const dispatchResultCode = await instanceAdapter.dispatch(
instanceConfig,
this.STHConfig.host.instancesServerPort,
id,
sequence,
payload
);

this.logger.debug("Dispatched.");
this.logger.debug("Waiting for connection...");
if (dispatchResultCode !== 0) {
this.logger.warn("Dispatch result code:", dispatchResultCode);
throw await mapRunnerExitCode(dispatchResultCode, sequence);
}

this.logger.debug("Dispatched. Waiting for connection...", id);

return await Promise.race([
new Promise<void>((resolve, _reject) => {
Expand All @@ -266,7 +270,7 @@ export class CSIDispatcher extends TypedEmitter<Events> {
limits,
sequence
})),
// handle failed start
// handle fast fail - before connection is established.
Promise.resolve()
.then(() => instanceAdapter.waitUntilExit(undefined, id, sequence))
.then(async (exitCode) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/host/src/lib/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ export class Host implements IComponent {
this.logger.debug("Instance connecting", id);

if (!this.instancesStore[id]) {
this.logger.info("creating new CSIController unknown istance");
this.logger.info("Creating new CSIController for unknown Instance");

await this.csiDispatcher.createCSIController(
id,
Expand Down
1 change: 1 addition & 0 deletions packages/host/src/lib/serviceDiscovery/sd-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ export class ServiceDiscovery {
const topic = this.createTopicIfNotExist(topicData);

topic.acceptPipe(source);

await this.cpmConnector?.sendTopicInfo({
provides: topicData.topic.toString(),
topicName: topicData.topic.toString(),
Expand Down
2 changes: 1 addition & 1 deletion packages/types/src/lifecycle-adapters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export interface ILifeCycleAdapterRun extends ILifeCycleAdapterMain {
* @param {InstanceConfig} Runner configuration.
* @returns {ExitCode} Runner exit code.
*/
dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<void>;
dispatch(config: InstanceConfig, instancesServerPort: number, instanceId: string, sequenceInfo: SequenceInfo, payload: RunnerConnectInfo): Promise<number>;

/**
* Starts Runner - in essence does `dispatch` and then `waitUntilExit`.
Expand Down

0 comments on commit 0afc685

Please sign in to comment.