diff --git a/packages/host/src/lib/cpm-connector.ts b/packages/host/src/lib/cpm-connector.ts index 6035ab032..ca041a69b 100644 --- a/packages/host/src/lib/cpm-connector.ts +++ b/packages/host/src/lib/cpm-connector.ts @@ -23,6 +23,7 @@ import { TypedEmitter, generateSTHKey, normalizeUrl } from "@scramjet/utility"; import { ObjLogger } from "@scramjet/obj-logger"; import { ReasonPhrases } from "http-status-codes"; import { DuplexStream } from "@scramjet/api-server"; +import { VerserClientConnection } from "@scramjet/verser/src/types"; type STHInformation = { id?: string; @@ -223,7 +224,7 @@ export class CPMConnector extends TypedEmitter { * @returns {string} Host id. */ getId(): string | undefined { - return this.info.id; + return this.config.id; } /** @@ -249,6 +250,8 @@ export class CPMConnector extends TypedEmitter { }; } + await this.setLoadCheckMessageSender(); + StringStream.from(duplex.input as Readable) .JSONParse() .map(async (message: EncodedControlMessage) => { @@ -292,9 +295,10 @@ export class CPMConnector extends TypedEmitter { [CPMMessageCode.NETWORK_INFO, await this.getNetworkInfo()] ); - this.emit("connect"); - await this.setLoadCheckMessageSender(); + + + this.emit("connect"); return new Promise((resolve, reject) => { duplex.on("end", () => { @@ -332,11 +336,17 @@ export class CPMConnector extends TypedEmitter { this.verserClient.updateHeaders({ "x-sth-id": this.info.id }); } - let connection; + let connection: VerserClientConnection; try { this.logger.trace("Connecting to Manager", this.cpmUrl, this.cpmId); connection = await this.verserClient.connect(); + + connection.socket + .once("close", async () => { + this.logger.warn("CLOSE STATUS", connection.res.statusCode) + await this.handleConnectionClose(connection.res.statusCode || -1); + }); } catch (error: any) { this.logger.error("Can not connect to Manager", this.cpmUrl, this.cpmId, error.message); @@ -345,12 +355,7 @@ export class CPMConnector extends TypedEmitter { return; } - this.logger.info("Connected to Manager"); - - connection.socket - .once("close", async () => { - await this.handleConnectionClose(); - }); + this.logger.info("Connected..."); /** * @TODO: Distinguish existing `connect` request and started communication (Manager handled this host @@ -361,7 +366,7 @@ export class CPMConnector extends TypedEmitter { this.connected = true; this.connectionAttempts = 0; - connection.req.once("error", async (error: any) => { + connection.res.once("error", async (error: any) => { this.logger.error("Request error", error); try { @@ -386,7 +391,7 @@ export class CPMConnector extends TypedEmitter { * Handles connection close. * Tries to reconnect. */ - async handleConnectionClose() { + async handleConnectionClose(connectionStatusCode: number) { this.handleCommunicationRequestEnd(); this.connection?.removeAllListeners(); @@ -400,6 +405,10 @@ export class CPMConnector extends TypedEmitter { clearInterval(this.loadInterval); } + if (connectionStatusCode === 403) { + this.isAbandoned = true; + } + await this.reconnect(); } @@ -426,9 +435,9 @@ export class CPMConnector extends TypedEmitter { this.isReconnecting = true; await new Promise((resolve, reject) => { - setTimeout(async () => { - this.logger.info("Connection lost, retrying", this.connectionAttempts); + this.logger.info("Connection lost, retrying", this.connectionAttempts); + setTimeout(async () => { await this.connect().then(resolve, reject); }, this.config.reconnectionDelay); }); diff --git a/packages/verser/src/lib/verser-client.ts b/packages/verser/src/lib/verser-client.ts index e4d37b1ba..568007edd 100644 --- a/packages/verser/src/lib/verser-client.ts +++ b/packages/verser/src/lib/verser-client.ts @@ -13,6 +13,7 @@ const BPMux = require("bpmux").BPMux; type Events = { error: (err: Error) => void; + close: (reason: string) => void; }; /** @@ -112,11 +113,12 @@ export class VerserClient extends TypedEmitter { reject(err); }); - connectRequest.on("connect", (req, socket) => { + connectRequest.once("connect", (res, socket, head) => { + this.logger.info("HEAD", head.toString()); this.socket = socket; this.mux(); - resolve({ req, socket }); + resolve({ res, socket }); }); connectRequest.flushHeaders(); diff --git a/packages/verser/src/types/index.ts b/packages/verser/src/types/index.ts index 21e6496b4..d6d8c195f 100644 --- a/packages/verser/src/types/index.ts +++ b/packages/verser/src/types/index.ts @@ -40,7 +40,7 @@ export type VerserClientConnection = { /** * Connection request object. */ - req: IncomingMessage; + res: IncomingMessage; }; export type VerserRequestResult = { incomingMessage: IncomingMessage; clientRequest: ClientRequest }