Skip to content

Commit

Permalink
Get HTTP platform connection status
Browse files Browse the repository at this point in the history
  • Loading branch information
patuwwy committed Mar 27, 2023
1 parent 2f23dde commit 82f13c6
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 17 deletions.
37 changes: 23 additions & 14 deletions packages/host/src/lib/cpm-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { TypedEmitter, generateSTHKey, normalizeUrl } from "@scramjet/utility";
import { ObjLogger } from "@scramjet/obj-logger";
import { ReasonPhrases } from "http-status-codes";
import { DuplexStream } from "@scramjet/api-server";
import { VerserClientConnection } from "@scramjet/verser/src/types";

type STHInformation = {
id?: string;
Expand Down Expand Up @@ -223,7 +224,7 @@ export class CPMConnector extends TypedEmitter<Events> {
* @returns {string} Host id.
*/
getId(): string | undefined {
return this.info.id;
return this.config.id;
}

/**
Expand All @@ -249,6 +250,8 @@ export class CPMConnector extends TypedEmitter<Events> {
};
}

await this.setLoadCheckMessageSender();

StringStream.from(duplex.input as Readable)
.JSONParse()
.map(async (message: EncodedControlMessage) => {
Expand Down Expand Up @@ -292,9 +295,10 @@ export class CPMConnector extends TypedEmitter<Events> {
[CPMMessageCode.NETWORK_INFO, await this.getNetworkInfo()]
);

this.emit("connect");

await this.setLoadCheckMessageSender();


this.emit("connect");

return new Promise((resolve, reject) => {
duplex.on("end", () => {
Expand Down Expand Up @@ -332,11 +336,17 @@ export class CPMConnector extends TypedEmitter<Events> {
this.verserClient.updateHeaders({ "x-sth-id": this.info.id });
}

let connection;
let connection: VerserClientConnection;

try {
this.logger.trace("Connecting to Manager", this.cpmUrl, this.cpmId);
connection = await this.verserClient.connect();

connection.socket
.once("close", async () => {
this.logger.warn("CLOSE STATUS", connection.res.statusCode)
await this.handleConnectionClose(connection.res.statusCode || -1);
});
} catch (error: any) {
this.logger.error("Can not connect to Manager", this.cpmUrl, this.cpmId, error.message);

Expand All @@ -345,12 +355,7 @@ export class CPMConnector extends TypedEmitter<Events> {
return;
}

this.logger.info("Connected to Manager");

connection.socket
.once("close", async () => {
await this.handleConnectionClose();
});
this.logger.info("Connected...");

/**
* @TODO: Distinguish existing `connect` request and started communication (Manager handled this host
Expand All @@ -361,7 +366,7 @@ export class CPMConnector extends TypedEmitter<Events> {
this.connected = true;
this.connectionAttempts = 0;

connection.req.once("error", async (error: any) => {
connection.res.once("error", async (error: any) => {
this.logger.error("Request error", error);

try {
Expand All @@ -386,7 +391,7 @@ export class CPMConnector extends TypedEmitter<Events> {
* Handles connection close.
* Tries to reconnect.
*/
async handleConnectionClose() {
async handleConnectionClose(connectionStatusCode: number) {
this.handleCommunicationRequestEnd();

this.connection?.removeAllListeners();
Expand All @@ -400,6 +405,10 @@ export class CPMConnector extends TypedEmitter<Events> {
clearInterval(this.loadInterval);
}

if (connectionStatusCode === 403) {
this.isAbandoned = true;
}

await this.reconnect();
}

Expand All @@ -426,9 +435,9 @@ export class CPMConnector extends TypedEmitter<Events> {
this.isReconnecting = true;

await new Promise<void>((resolve, reject) => {
setTimeout(async () => {
this.logger.info("Connection lost, retrying", this.connectionAttempts);
this.logger.info("Connection lost, retrying", this.connectionAttempts);

setTimeout(async () => {
await this.connect().then(resolve, reject);
}, this.config.reconnectionDelay);
});
Expand Down
6 changes: 4 additions & 2 deletions packages/verser/src/lib/verser-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const BPMux = require("bpmux").BPMux;

type Events = {
error: (err: Error) => void;
close: (reason: string) => void;
};

/**
Expand Down Expand Up @@ -112,11 +113,12 @@ export class VerserClient extends TypedEmitter<Events> {
reject(err);
});

connectRequest.on("connect", (req, socket) => {
connectRequest.once("connect", (res, socket, head) => {
this.logger.info("HEAD", head.toString());
this.socket = socket;
this.mux();

resolve({ req, socket });
resolve({ res, socket });
});

connectRequest.flushHeaders();
Expand Down
2 changes: 1 addition & 1 deletion packages/verser/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export type VerserClientConnection = {
/**
* Connection request object.
*/
req: IncomingMessage;
res: IncomingMessage;
};

export type VerserRequestResult = { incomingMessage: IncomingMessage; clientRequest: ClientRequest }
Expand Down

0 comments on commit 82f13c6

Please sign in to comment.