Skip to content

Commit

Permalink
Fix input & not-serialized output on reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
patuwwy committed Jan 26, 2024
1 parent d840bcf commit 58638d1
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 53 deletions.
4 changes: 3 additions & 1 deletion packages/adapters/src/process-instance-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ class ProcessInstanceAdapter implements
process.kill(this.processPID, 0);
} catch (e) {
this.logger.error("Runner process not exists", e);
/** process not exists */

clearInterval(interval);

reject("pid not exists");
}
}
Expand Down
18 changes: 11 additions & 7 deletions packages/host/src/lib/csi-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export class CSIController extends TypedEmitter<Events> {
apiInputEnabled = true;

executionTime: number = -1;
inputHeadersSent = false;

/**
* Topic to which the output stream should be routed
Expand Down Expand Up @@ -403,9 +404,10 @@ export class CSIController extends TypedEmitter<Events> {
.pipe(this.upStreams[CC.CONTROL]);

this.communicationHandler.addMonitoringHandler(RunnerMessageCode.PING, async (message) => {
const { status, payload } = message[1];
const { status, payload, inputHeadersSent } = message[1];

this.status = status || InstanceStatus.RUNNING;
this.inputHeadersSent = inputHeadersSent;

if (!payload) {
this.emit("error", "No payload in ping!");
Expand Down Expand Up @@ -485,6 +487,10 @@ export class CSIController extends TypedEmitter<Events> {
this.logger.trace("Received a PING message with ports config");
}

this.inputHeadersSent = !!message[1].inputHeadersSent;

this.logger.info("Headers already sent for input?", this.inputHeadersSent);

if (this.instanceAdapter.setRunner) {
await this.instanceAdapter.setRunner({
...message[1].payload.system,
Expand Down Expand Up @@ -537,8 +543,6 @@ export class CSIController extends TypedEmitter<Events> {
}

createInstanceAPIRouter() {
let inputHeadersSent = false;

if (!this.upStreams) {
throw new AppError("UNATTACHED_STREAMS");
}
Expand All @@ -551,11 +555,11 @@ export class CSIController extends TypedEmitter<Events> {
* @experimental
*/
this.router.duplex("/inout", (duplex, _headers) => {
if (!inputHeadersSent) {
if (!this.inputHeadersSent) {
this.downStreams![CC.IN].write(`Content-Type: ${_headers["content-type"]}\r\n`);
this.downStreams![CC.IN].write("\r\n");

inputHeadersSent = true;
this.inputHeadersSent = true;
}

(duplex as unknown as DuplexStream).input.pipe(this.downStreams![CC.IN], { end: false });
Expand Down Expand Up @@ -597,15 +601,15 @@ export class CSIController extends TypedEmitter<Events> {
const contentType = req.headers["content-type"];

// @TODO: Check if subsequent requests have the same content-type.
if (!inputHeadersSent) {
if (!this.inputHeadersSent) {
if (contentType === undefined) {
return { opStatus: ReasonPhrases.NOT_ACCEPTABLE, error: "Content-Type must be defined" };
}

stream.write(`Content-Type: ${contentType}\r\n`);
stream.write("\r\n");

inputHeadersSent = true;
this.inputHeadersSent = true;
}

return stream;
Expand Down
10 changes: 6 additions & 4 deletions packages/host/src/lib/csi-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,14 @@ export class CSIDispatcher extends TypedEmitter<Events> {
id,
sequenceInfo,
payload,
status: InstanceStatus.INITIALIZING
status: InstanceStatus.INITIALIZING,
inputHeadersSent: false
}, communicationHandler, config, instanceProxy, this.STHConfig.runtimeAdapter);

this.logger.trace("CSIController created", id, sequenceInfo);

csiController.logger.pipe(this.logger, { end: false });

communicationHandler.logger.pipe(this.logger, { end: false });

csiController
Expand Down Expand Up @@ -99,15 +101,15 @@ export class CSIDispatcher extends TypedEmitter<Events> {
this.logger.warn("Missing topic content-type");
}

if (data.requires && !csiController.inputRouted && data.contentType) {
this.logger.trace("Routing topic to Sequence input", data.requires);
if (data.requires && data.contentType) {
this.logger.trace("Routing topic to Instance input", data.requires);

await this.serviceDiscovery.routeTopicToStream(
{ topic: new TopicId(data.requires), contentType: data.contentType as ContentType },
csiController.getInputStream()
);

csiController.inputRouted = true;
csiController.inputHeadersSent = true;

await this.serviceDiscovery.update({
requires: data.requires, contentType: data.contentType, topicName: data.requires, status: "add"
Expand Down
2 changes: 1 addition & 1 deletion packages/host/src/lib/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ export class Host implements IComponent {

const seq = this.sequenceStore.getById(instance.sequence.id);

if (!seq) {
if (!seq && this.cpmConnector?.connected) {
this.logger.info("Sequence not found. Checking Store...");

try {
Expand Down
3 changes: 1 addition & 2 deletions packages/host/src/lib/serviceDiscovery/sd-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,8 @@ export class ServiceDiscovery {
}

async update(data: STHTopicEventData) {
this.logger.trace("Topic update. Send topic info to CPM", data);

if (this.cpmConnector?.connected) {
this.logger.trace("Topic update. Send topic info to CPM", data);
await this.cpmConnector?.sendTopicInfo(data);
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/host/src/lib/socket-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export class SocketServer extends TypedEmitter<Events> implements IComponent {
this.server!
.listen(this.port, this.hostname, () => {
this.logger.info("SocketServer on", this.server?.address());

res();
})
.on("error", rej);
Expand Down
37 changes: 34 additions & 3 deletions packages/runner/src/host-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import { ObjLogger } from "@scramjet/obj-logger";
import { CommunicationChannel as CC } from "@scramjet/symbols";
import { IHostClient, IObjectLogger, UpstreamStreamsConfig, } from "@scramjet/types";
import { defer } from "@scramjet/utility";
import { Agent } from "http";
import net, { Socket, createConnection } from "net";
import { PassThrough } from "stream";

type HostOpenConnections = [
net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket
Expand Down Expand Up @@ -42,20 +44,24 @@ class HostClient implements IHostClient {
async init(id: string): Promise<void> {
const openConnections = await Promise.all(
Array.from(Array(9))
.map(() => {
.map((_e: any, i: number) => {
// Error handling for each connection is process crash for now
let connection: Socket;

try {
connection = net.createConnection(this.instancesServerPort, this.instancesServerHost);
connection.on("error", () => {});
connection.on("error", () => {
this.logger.warn(`${i} Stream error`);
});
connection.setNoDelay(true);
} catch (e) {
return Promise.reject(e);
}

return new Promise<net.Socket>(res => {
connection.on("connect", () => res(connection));
connection.on("connect", () => {
res(connection);
});
});
})
.map((connPromised, index) => {
Expand All @@ -74,6 +80,26 @@ class HostClient implements IHostClient {

this._streams = openConnections as HostOpenConnections;

const input = this._streams[CC.IN];

const inputTarget = new PassThrough({ emitClose: false });

input.on("end", async () => {
await defer(500);

if ((this._streams![CC.CONTROL] as net.Socket).readableEnded) {
this.logger.info("Input end. Control is also ended... We are disconnected.");
} else {
this.logger.info("Input end. Control not ended. We are online. Desired input end.");
inputTarget.end();
}
});

input.pipe(inputTarget, { end: false });

this._streams[CC.IN] = inputTarget;
//this._streams[CC.STDIN] = this._streams[CC.STDIN].pipe(new PassThrough({ emitClose: false }), { end: false });

try {
this.bpmux = new BPMux(this._streams[CC.PACKAGE]);
} catch (e) {
Expand Down Expand Up @@ -118,6 +144,11 @@ class HostClient implements IHostClient {
const streamsExitedPromised: Promise<void>[] = this.streams.map((stream, i) =>
new Promise(
(res) => {
if ([CC.IN, CC.STDIN, CC.CONTROL].includes(i)) {
res();
return;
}

if (!hard && "writable" in stream!) {
stream
.on("error", (e) => {
Expand Down
Loading

0 comments on commit 58638d1

Please sign in to comment.