Skip to content

Commit

Permalink
Fail. runner not exits
Browse files Browse the repository at this point in the history
  • Loading branch information
patuwwy committed Sep 19, 2023
1 parent a3e50ec commit e233b91
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 57 deletions.
6 changes: 6 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
"outFiles": [
"${workspaceFolder}/**/*.js"
]
},
{
"name": "Attach to Process",
"type": "node",
"request": "attach",
"processId": "${command:PickProcess}"
}
]
}
5 changes: 3 additions & 2 deletions packages/adapters/src/process-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class ProcessInstanceAdapter implements
...debugFlags,
path.resolve(__dirname,
process.env.ESBUILD

Check warning on line 107 in packages/adapters/src/process-instance-adapter.ts

View workflow job for this annotation

GitHub Actions / analyze-code / Analyze the source code

Do not nest ternary expressions

Check warning on line 107 in packages/adapters/src/process-instance-adapter.ts

View workflow job for this annotation

GitHub Actions / analyze-code / Analyze the source code

Do not nest ternary expressions
? "../../runner/bin/start-runner.js"
? !isTSNode ? "../../runner/bin/start-runner.js" : "../../runner/src/bin/start-runner.ts"
: require.resolve("@scramjet/runner")
)
];
Expand Down Expand Up @@ -172,7 +172,8 @@ class ProcessInstanceAdapter implements
this.crashLogStreams = Promise.all([runnerProcess.stdout, runnerProcess.stderr].map(streamToString));

this.logger.trace("Runner process is running", runnerProcess.pid);

this.runnerProcess?.stderr?.pipe(process.stdout);
this.runnerProcess?.stdout?.pipe(process.stdout);
this.runnerProcess = runnerProcess;
}

Expand Down
4 changes: 2 additions & 2 deletions packages/host/src/lib/csi-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ export class CSIController extends TypedEmitter<Events> {

async main() {
this.status = InstanceStatus.RUNNING;
this.logger.trace("Instance started");
this.logger.trace("Instance status: RUNNING");

let code = -1;

Expand Down Expand Up @@ -545,7 +545,7 @@ export class CSIController extends TypedEmitter<Events> {
}

this.info.started = new Date();
this.logger.info("Instance started", JSON.stringify(message, undefined));
this.logger.info("Instance started", JSON.stringify(message));
}

async handleInstanceConnect(streams: DownstreamStreamsConfig) {
Expand Down
7 changes: 6 additions & 1 deletion packages/runner/src/host-client.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-console */
/* eslint-disable dot-notation */
import { ObjLogger } from "@scramjet/obj-logger";
import { CommunicationChannel as CC } from "@scramjet/symbols";
Expand Down Expand Up @@ -48,7 +49,7 @@ class HostClient implements IHostClient {

try {
connection = net.createConnection(this.instancesServerPort, this.instancesServerHost);
connection.on("error", () => {});
connection.on("error", (error) => { this.logger.error("Connection error", error); });
connection.setNoDelay(true);
} catch (e) {
return Promise.reject(e);
Expand Down Expand Up @@ -118,17 +119,21 @@ class HostClient implements IHostClient {
const streamsExitedPromised: Promise<void>[] = this.streams.map((stream, i) =>
new Promise(
(res) => {
console.log("Disconnecting", i);
if ("writable" in stream!) {
stream
.on("error", (e) => {
console.error("Error on stream", i, e.stack);
res();
})
.on("close", () => {
console.log("Closed", i);
res();
})
.end();
} else {
stream!.destroy();
console.log("destroyed", i);
res();
}
}
Expand Down
23 changes: 17 additions & 6 deletions packages/runner/src/runner-app-context.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@

import { ObjLogger } from "@scramjet/obj-logger";
import {
EventMessageData, KeepAliveMessageData, MonitoringMessageFromRunnerData,
AppConfig, AppError, AppErrorConstructor, AppContext, WritableStream,
FunctionDefinition, KillHandler, StopHandler, MonitoringHandler, IObjectLogger, HostClient, ManagerClient
AppConfig,
AppContext,
AppError, AppErrorConstructor,
EventMessageData,
FunctionDefinition,
HostClient,
IObjectLogger,
KeepAliveMessageData,
KillHandler,
ManagerClient,
MonitoringHandler,
MonitoringMessageFromRunnerData,
StopHandler,
WritableStream
} from "@scramjet/types";
import { EventEmitter } from "events";

Expand Down Expand Up @@ -36,14 +47,14 @@ implements AppContext<AppConfigType, State> {
space: ManagerClient;
instanceId: string;

constructor(config: AppConfigType, monitorStream: WritableStream<any>,
emitter: EventEmitter, runner: RunnerProxy, hostClient: HostClient, spaceClient: ManagerClient, id: string) {
constructor(id: string, config: AppConfigType, monitorStream: WritableStream<any>,
emitter: EventEmitter, runner: RunnerProxy, hostClient: HostClient, spaceClient?: ManagerClient) {
this.config = config;
this.monitorStream = monitorStream;
this.emitter = emitter;
this.runner = runner;
this.hub = hostClient;
this.space = spaceClient;
this.space = spaceClient!;
this.instanceId = id;
}

Expand Down
81 changes: 35 additions & 46 deletions packages/runner/src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,12 @@ import { Readable, Writable } from "stream";

import { HostClient as HostApiClient } from "@scramjet/api-client";
import { ClientUtilsCustomAgent } from "@scramjet/client-utils";
import { ManagerClient } from "@scramjet/manager-api-client";
import { RunnerConnectInfo } from "@scramjet/types/src/runner-connect";
import { writeFileSync } from "fs";
import { mapToInputDataStream, readInputStreamHeaders } from "./input-stream";
import { MessageUtils } from "./message-utils";
import { RunnerAppContext, RunnerProxy } from "./runner-app-context";

process.once("beforeExit", (code) => {
const filepath = `/tmp/runner-${process.pid.toString()}`;

writeFileSync(filepath, code.toString());

// eslint-disable-next-line no-console
console.log("Runner exit");
});

// async function flushStream(source: Readable | undefined, target: Writable) {
// if (!source) return;

Expand Down Expand Up @@ -74,26 +64,10 @@ export function isSynchronousStreamable(obj: SynchronousStreamable<any> | Primit

const overrideMap: Map<Writable, OverrideConfig> = new Map();

function revertStandardStream(oldStream: Writable) {
if (overrideMap.has(oldStream)) {
const { write, drainCb, errorCb } = overrideMap.get(oldStream) as OverrideConfig;

// @ts-ignore - this is ok, we're doing this on purpose!
delete oldStream.write;

// if prototypic write is there, then no change needed
if (oldStream.write !== write)
oldStream.write = write;

oldStream.off("drain", drainCb);
oldStream.off("error", errorCb);
overrideMap.delete(oldStream);
}
}

function overrideStandardStream(oldStream: Writable, newStream: Writable) {
if (overrideMap.has(oldStream)) {
//throw new Error("Attempt to override stream more than once");
// eslint-disable-next-line no-use-before-define
revertStandardStream(oldStream);
}

Expand All @@ -115,6 +89,23 @@ function overrideStandardStream(oldStream: Writable, newStream: Writable) {
overrideMap.set(oldStream, { write, drainCb, errorCb });
}

function revertStandardStream(oldStream: Writable) {
if (overrideMap.has(oldStream)) {
const { write, drainCb, errorCb } = overrideMap.get(oldStream) as OverrideConfig;

// @ts-ignore - this is ok, we're doing this on purpose!
delete oldStream.write;

// if prototypic write is there, then no change needed
if (oldStream.write !== write)
oldStream.write = write;

oldStream.off("drain", drainCb);
oldStream.off("error", errorCb);
overrideMap.delete(oldStream);
}
}

/**
* Runtime environment for sequence code.
* Communicates with Host with data transferred to/from Sequence, health info,
Expand Down Expand Up @@ -165,11 +156,17 @@ export class Runner<X extends AppConfig> implements IComponent {
throw e;
});

this.outputDataStream = new DataStream({ highWaterMark: 0 }).catch((e: any) => {
this.outputDataStream = new DataStream().catch((e: any) => {
this.logger.error("Error during input data stream", e);

throw e;
});

process.on("beforeExit", (code) => {
const filepath = `/tmp/runner-${process.pid.toString()}`;

writeFileSync(filepath, code.toString());
});
}

get context(): RunnerAppContext<X, any> {
Expand Down Expand Up @@ -261,17 +258,17 @@ export class Runner<X extends AppConfig> implements IComponent {
[RunnerMessageCode.MONITORING, { healthy }], this.hostClient.monitorStream
);

this.monitoringMessageReplyTimeout = setTimeout(async () => {
await this.handleDisconnect();
this.monitoringMessageReplyTimeout = setTimeout(() => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.handleDisconnect();
}, 500);
}

async handleDisconnect() {
this.logger.info("Reinitializing....");

this.premain().catch((e) => {
this.logger.error("Premain error", e);
});
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.premain();
}

async handleKillRequest(): Promise<void> {
Expand Down Expand Up @@ -330,13 +327,9 @@ export class Runner<X extends AppConfig> implements IComponent {
try {
await this.hostClient.init(this.instanceId);
} catch (e) {
this.logger.error("hostClient init error", e);

await defer(2000);

this.premain().catch((err: any) => {
this.logger.error("Premain error", err);
});
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.premain();
}

this.redirectOutputs();
Expand Down Expand Up @@ -480,9 +473,6 @@ export class Runner<X extends AppConfig> implements IComponent {
const hostClientUtils = new ClientUtilsCustomAgent("http://scramjet-host/api/v1", this.hostClient.getAgent());
const hostApiClient = new HostApiClient("http://scramjet-host/api/v1", hostClientUtils);

const managerClientUtils = new ClientUtilsCustomAgent("http://scramjet-host/api/v1/cpm/api/v1", this.hostClient.getAgent());
const managerApiClient = new ManagerClient("http://scramjet-host/api/v1/cpm/api/v1", managerClientUtils);

const runner: RunnerProxy = {
keepAliveIssued: () => this.keepAliveIssued(),
sendStop: (err?: Error) => {
Expand All @@ -493,13 +483,12 @@ export class Runner<X extends AppConfig> implements IComponent {
};

this._context = new RunnerAppContext(
this.instanceId,
config,
this.hostClient.monitorStream,
this.emitter,
runner,
hostApiClient as HostClient,
managerApiClient as ManagerClient,
this.instanceId
hostApiClient as HostClient
);
this._context.logger.pipe(this.logger);

Expand Down Expand Up @@ -612,7 +601,7 @@ export class Runner<X extends AppConfig> implements IComponent {
if (intermediate instanceof Readable) {
stream = intermediate;
} else if (intermediate !== undefined && isSynchronousStreamable(intermediate)) {
stream = Object.assign(DataStream.from(intermediate as Readable, { highWaterMark: 0 }), {
stream = Object.assign(DataStream.from(intermediate as Readable), {
topic: intermediate.topic,
contentType: intermediate.contentType
});
Expand Down

0 comments on commit e233b91

Please sign in to comment.