diff --git a/ecosystem.config.cjs b/ecosystem.config.cjs new file mode 100644 index 0000000..051cb08 --- /dev/null +++ b/ecosystem.config.cjs @@ -0,0 +1,26 @@ +const { config } = require('dotenv'); +config(); + +module.exports = + // ecosystem.js + { + apps: [ + { + name: 'Indexer', + script: 'dist/app.js', // name of the startup file + exec_mode: 'fork', // to turn on cluster mode; defaults to 'fork' mode + env: { + PORT: '10001', // the port on which the app should listen + }, + }, + { + name: 'HttpServer', + script: 'dist/HttpServer.js', + instances: 2, + exec_mode: 'cluster', + env: { + PORT: '10000', // the port on which the app should listen + }, + }, + ], + }; diff --git a/scripts/test.ts b/scripts/test.ts new file mode 100644 index 0000000..f14cfed --- /dev/null +++ b/scripts/test.ts @@ -0,0 +1,3 @@ +import { contiguousDataCacheTmpCleanupWorker } from '../src/system'; + +const batch = await contiguousDataCacheTmpCleanupWorker.getBatch('data'); diff --git a/src/HttpServer.ts b/src/HttpServer.ts index c13fc06..717c560 100644 --- a/src/HttpServer.ts +++ b/src/HttpServer.ts @@ -10,6 +10,36 @@ import { dataRouter } from './routes/data/index.js'; import { apolloServer } from './routes/graphql/index.js'; import { openApiRouter } from './routes/openapi.js'; import * as system from './system.js'; +import { GracefulShutdownController } from './utils/gracefulShutdown.js'; + +const logger = log.child({ name: 'HttpServer' }); +let isShuttingDown = false; +let manualShutdown = () => {}; + +const gracefulShutdown = async (): Promise => { + logger.warn('Shutting down...'); + Promise.all([manualShutdown()]) + .then((_) => { + process.exit(0); + }) + .catch((_) => process.exit(1)); +}; + +process.on('message', async (message) => { + if (message === 'shutdown') { + logger.verbose('received shutdown message'); + if (isShuttingDown) return; + isShuttingDown = true; + gracefulShutdown(); + } +}); + +process.on('SIGINT', () => { + logger.verbose('received signal sigint'); + if (isShuttingDown) return; + isShuttingDown = true; + gracefulShutdown(); +}); const app = express(); @@ -29,12 +59,47 @@ const apolloServerInstanceGql = apolloServer(system.db, { introspection: true, persistedQueries: false, }); -apolloServerInstanceGql.start().then(() => { +await apolloServerInstanceGql.start().then(() => { apolloServerInstanceGql.applyMiddleware({ app, path: '/graphql', }); - app.listen(config.PORT, () => { - log.info(`Listening on port ${config.PORT}`); - }); }); + +const server = app.listen(config.PORT); +logger.info(`Listening on port ${config.PORT}`); + +server.keepAliveTimeout = 61 * 1000; +server.headersTimeout = 62 * 1000; + +server.on('close', () => { + logger.debug(`closing...`); +}); + +// // eslint-disable-next-line @typescript-eslint/naming-convention +// async function onHTTPShutdown(): Promise { +// // insert cleanup operation(s) here +// const cleanup = async (): Promise => { +// await new Promise((r) => wss.close(r)); +// }; +// // await Promise.allSettled(wss.clients) +// await Promise.race([sleep(5_000), cleanup]); +// } + +const controller = new GracefulShutdownController({ + server, + preShutdown: async (): Promise => { + server.closeIdleConnections(); // TODO: test me!! + }, +}); +// overwrite manual shutdown noop method +manualShutdown = (): Promise => controller.shutdown(); + +if (process.send) { + process?.send?.('ready', undefined, undefined, (e) => + e ? logger.error(`Error sending ready message: ${e}`) : undefined, + ); // send ready pm2 message + logger.info(`Ready message sent`); +} else { + logger.warn(`Ready message NOT sent`); +} diff --git a/src/utils/gracefulShutdown.ts b/src/utils/gracefulShutdown.ts new file mode 100644 index 0000000..1bd9e96 --- /dev/null +++ b/src/utils/gracefulShutdown.ts @@ -0,0 +1,219 @@ +import type { ServerResponse } from 'http'; +import { type Server } from 'http'; +import { Logger } from 'winston'; + +import logger from '../log.js'; +import { sleep } from './utils.js'; + +export class GracefulShutdownController { + private timeout = 30_000; + private connections = new Map(); + private secureConnections = new Map(); + private server: Server; + private connectionCounter = 0; + private secureConnectionCounter = 0; + private shutdownPromise?: void | PromiseLike; + private preShutdown?: () => Promise; + protected log: Logger; + + constructor(opts: { server: Server; preShutdown?: () => Promise }) { + this.server = opts.server; + this.server.on('connection', this.connectionEventHandler.bind(this)); + this.server.on('request', this.requestEventHandler.bind(this)); + this.server.on('secureConnection', this.secureConnectionHandler.bind(this)); + this.log = logger.child({ class: this.constructor.name }); + } + + get isShuttingDown(): boolean { + return this.shutdownPromise !== undefined; + } + + requestEventHandler(req: any, res: ServerResponse): void { + req.socket._isIdle = false; + if (this.isShuttingDown) { + // this.lastConnection = performance.now(); + this.log.warn('Received request while shutting down'); + } + + if (this.isShuttingDown && !res.headersSent) { + res.setHeader('connection', 'close'); + } + + res.on('finish', function (this: ServerResponse) { + req.socket._isIdle = true; + this.destroy(req.socket); + }); + } + + connectionEventHandler(socket: any): void { + if (this.isShuttingDown) { + // this.log.warn(`REJECTED INCOMING CONNECTION`); + // socket.destroy(); + this.log.warn('Received request while shutting down'); + // this.lastConnection = performance.now(); + } + const id = this.connectionCounter++; + socket._isIdle = true; + socket._connectionId = id; + this.connections.set(id, socket); + + socket.once('close', () => { + this.connections.delete(socket._connectionId); + }); + } + + // destroy(socket: Socket & { _connectionId: string; _isIdle: boolean; server: any }, force = false): void { + // if ((socket._isIdle && this.isShuttingDown) || force) { + // this.log.info("DESTROY"); + // socket.destroy(); + // if (socket.server instanceof Server) { + // this.connections.delete(socket._connectionId); + // } else { + // this.secureConnections.delete(socket._connectionId); + // } + // } + // } + + secureConnectionHandler(socket: any): void { + // if (this.isShuttingDown) { + // this.log.warn(`REJECTED INCOMING CONNECTION`); + // socket.destroy(); + // } else { + const id = this.secureConnectionCounter++; + socket._isIdle = true; + socket._connectionId = id; + this.secureConnections.set(id, socket); + + socket.once('close', () => { + this.secureConnections.delete(socket._connectionId); + }); + // } + } + + // returns true if should force shut down. returns false for shut down without force + async waitForReadyToShutDown(totalNumInterval: number): Promise { + while (totalNumInterval-- > 0) { + this.log.debug(`waitForReadyToShutDown... ${totalNumInterval}`); + + if (totalNumInterval === 0) { + // timeout reached + this.log.warn( + `Could not close connections in time (${this.timeout}ms), will forcefully shut down`, + ); + return; + } + + // const symb = Object.getOwnPropertySymbols(this.server).find((v) => v.toString() === "Symbol(http.server.connections)"); + // const connectionsList = this.server[symb]; + + // const activeConnections = connectionsList.active(); + + // test all connections closed already? + const allConnectionsClosed = + this.connections.size === 0 && this.secureConnections.size === 0; + + if (allConnectionsClosed) { + this.log.debug('All connections closed. Continue to shutting down'); + // use this if issues persist. + + // if (cluster.isWorker) { + // const worker = cluster.worker; + // // console.log(worker); + // console.log("DISCONNECT"); + // worker.disconnect(); + // } + + // const timeSinceLastConn = performance.now() - this.lastConnection; + // // console.log("timeSinceLastConn", timeSinceLastConn); + + // while (performance.now() - this.lastConnection < 250) { + // await sleep(50); + // this.log.info(`BusyWait for no connections...`); + // } + // this.log.info("busywait done"); + + return; + } + + this.log.debug('Schedule the next waitForReadyToShutdown'); + await sleep(250); + } + } + + async destroyAllConnections(force = false): Promise { + // destroy empty and idle connections / all connections (if force = true) + this.log.debug( + 'Destroy Connections : ' + (force ? 'forced close' : 'close'), + ); + + const httpServerConnections = await new Promise((res, rej) => + this.server.getConnections((e, c) => { + if (e != undefined) rej(e); + if (c != undefined) res(c); + }), + ); + + this.log.debug( + `server has ${this.server.connections} (${httpServerConnections}) connections`, + ); + + for (const socket of this.connections.values()) { + const serverResponse = socket._httpMessage; + // send connection close header to open connections + if (serverResponse && !force) { + if (!serverResponse.headersSent) { + serverResponse.setHeader('connection', 'close'); + } + } + } + + this.log.debug('Connection Counter : ' + this.connectionCounter); + + for (const socket of this.secureConnections.values()) { + const serverResponse = socket._httpMessage; + // send connection close header to open connections + if (serverResponse && !force) { + if (!serverResponse.headersSent) { + serverResponse.setHeader('connection', 'close'); + } + } + } + + this.log.debug( + 'Secure Connection Counter : ' + this.secureConnectionCounter, + ); + // const symb = Object.getOwnPropertySymbols(this.server).find((v) => v.toString() === "Symbol(http.server.connections)"); + // const connectionsList = this.server[symb]; + + // const idleConnections = connectionsList.idle(); + + // for (const connection of idleConnections) { + // connection.socket.destroy(); + // } + } + + private async runShutdown(signal: string): Promise { + this.log.info(`shutting down with signal ${signal}`); + if (this.preShutdown instanceof Function) await this.preShutdown(); + + // if (cluster.isWorker) { + // // this *should* cause connection distribution to fail sending reqs to this worker + // // see https://github.com/nodejs/node/blob/33710e7e7d39d19449a75911537d630349110a0c/lib/internal/cluster/child.js#L236 + // this.server.maxConnections = null; + // } + + await this.destroyAllConnections(); + await this.waitForReadyToShutDown(Math.round(this.timeout / 250)); + this.log.verbose(`Closing server`); + await new Promise((res, rej) => + this.server.close((err) => (err ? rej(err) : res())), + ).catch((e) => + this.log.error(`Error closing the server: ${e.toString()} ${e.stack}`), + ); + } + + public async shutdown(signal = 'manual'): Promise { + if (!this.isShuttingDown) this.shutdownPromise = this.runShutdown(signal); + return this.shutdownPromise; + } +} diff --git a/src/utils/utils.ts b/src/utils/utils.ts new file mode 100644 index 0000000..a5bb0f6 --- /dev/null +++ b/src/utils/utils.ts @@ -0,0 +1,2 @@ +export const sleep = (ms: number): Promise => + new Promise((resolve) => setTimeout(resolve, ms));