Skip to content

Commit

Permalink
Simplify methods by using async
Browse files Browse the repository at this point in the history
  • Loading branch information
Americas authored and diogotorres97 committed Apr 10, 2024
1 parent 57793fc commit 8aa18cf
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 138 deletions.
78 changes: 36 additions & 42 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ class ProcessManager {

constructor() {
this.errors = [];
this.forceShutdown = utils.deferred();
this.hooks = [];
this.log = utils.getDefaultLogger();
this.running = [];
this.running = new Set();
this.startedShutdown = false;
this.terminating = false;
this.timeout = 30000;
}
Expand Down Expand Up @@ -109,16 +109,14 @@ class ProcessManager {
* Handle a loop routine.
*/

loop(fn, { interval = 0 } = {}) {
return (async () => {
while (!this.terminating) {
await this.run(fn, { exit: false });
async loop(fn, { interval = 0 } = {}) {
while (!this.terminating) {
await this.run(fn, { exit: false });

if (!this.terminating) {
await utils.timeout(interval);
}
if (!this.terminating) {
await utils.timeout(interval);
}
})();
}
}

/**
Expand All @@ -134,71 +132,67 @@ class ProcessManager {
*/

once(fn) {
return this.run(fn);
this.run(fn);
}

/**
* Routine handler.
*/

run(fn, { args = [], exit = true } = {}) {
async run(fn, { args = [], exit = true } = {}) {
if (this.terminating) {
return;
}

const id = Symbol();
const chain = utils.reflect(fn, args).then(error => {
this.running.splice(
this.running.findIndex(chain => chain.id === id),
1
);

if (error || exit) {
this.shutdown({ error });
}
});

chain.id = id;
this.running.add(id);

this.running.push(chain);
const error = await utils.reflect(fn, args);

return chain;
this.running.delete(id);

if (error || exit || this.terminating) {
await this.shutdown({ error });
}
}

/**
* Shutdown process.
*/

shutdown({ error, force = false } = {}) {
async shutdown({ error, force = false } = {}) {
if (error) {
this.errors.push(error);
}

if (force) {
this.forceShutdown.reject();
this.log.warn('Forced shutdown, skipped waiting');

return this.exit();
}

if (this.terminating) {
this.terminating = true;

if (this.running.size || this.startedShutdown) {
return;
}

this.terminating = true;
this.startedShutdown = true;

this.log.info('Starting shutdown');

const gracefulShutdown = Promise.all(this.running)
.then(() => this.log.info('All running instances have stopped'))
.then(() => this.hook('drain'))
.then(() => this.log.info(`${this.hooks.filter(hook => hook.type === 'drain').length} server(s) drained`))
.then(() => this.hook('disconnect'))
.then(() =>
this.log.info(`${this.hooks.filter(hook => hook.type === 'disconnect').length} service(s) disconnected`)
)
.then(() => this.hook('exit', this.errors));

Promise.race([gracefulShutdown, this.forceShutdown.promise])
.catch(() => this.log.warn('Forced shutdown, skipped waiting'))
.then(() => this.exit());
await this.hook('drain');

this.log.info(`${this.hooks.filter(({ type }) => type === 'drain').length} server(s) drained`);

await this.hook('disconnect');

this.log.info(`${this.hooks.filter(({ type }) => type === 'disconnect').length} service(s) disconnected`);

await this.hook('exit', this.errors);

this.exit();
}
}

Expand Down
154 changes: 58 additions & 96 deletions test/index.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
/* eslint no-console: 0 */
'use strict';

/**
* Module dependencies.
*/

const utils = require('../src/utils');

/**
* Test `ProcessManager`.
Expand All @@ -25,13 +32,8 @@ describe('ProcessManager', () => {
describe('constructor()', () => {
test('sets the initial state', () => {
expect(processManager.errors).toEqual([]);
expect(processManager.forceShutdown).toMatchObject({
promise: expect.any(Promise),
reject: expect.any(Function),
resolve: expect.any(Function)
});
expect(processManager.hooks).toEqual([]);
expect(processManager.running).toEqual([]);
expect(processManager.running).toEqual(new Set());
expect(processManager.terminating).toEqual(false);
expect(processManager.timeout).toEqual(30000);
});
Expand Down Expand Up @@ -245,123 +247,95 @@ describe('ProcessManager', () => {
});

describe('shutdown()', () => {
test('sets `terminating` to true', () => {
test('sets `processManager.terminating` to true', () => {
processManager.shutdown();

expect(processManager.terminating).toBe(true);
});

test('creates `forceShutdown` promise', () => {
processManager.shutdown();

expect(processManager.forceShutdown.promise).toBeInstanceOf(Promise);
});
test('calls `processManager.exit()` if `force` is set to `true`', async () => {
jest.spyOn(processManager, 'exit').mockImplementation(() => {});

test('with `force` set to `true` it creates `forceShutdown` promise in reject state', done => {
processManager.shutdown({ force: true });
await processManager.shutdown({ force: true });

processManager.forceShutdown.promise.catch(done);
expect(processManager.exit).toHaveBeenCalledTimes(1);
});

test('calls hook `drain`', done => {
jest.spyOn(processManager, 'hook');
test('calls hook `drain`', async () => {
jest.spyOn(processManager, 'hook').mockImplementation(() => {});

processManager.addHook({
handler() {
expect(processManager.hook).toHaveBeenCalledWith('drain');
processManager.addHook({ handler() {}, type: 'drain' });

done();
},
type: 'drain'
});
processManager.configure({ timeout: 1 });
await processManager.shutdown();

processManager.shutdown();
expect(processManager.hook).toHaveBeenCalledWith('drain');
});

test('calls hook `disconnect`', done => {
jest.spyOn(processManager, 'hook');
test('calls hook `disconnect`', async () => {
jest.spyOn(processManager, 'hook').mockImplementation(() => {});

processManager.addHook({
handler() {
expect(processManager.hook).toHaveBeenCalledWith('disconnect');
processManager.addHook({ handler() {}, type: 'disconnect' });

done();
},
type: 'disconnect'
});
processManager.configure({ timeout: 1 });
await processManager.shutdown();

processManager.shutdown();
expect(processManager.hook).toHaveBeenCalledWith('disconnect');
});

test('calls hook `exit`', done => {
jest.spyOn(processManager, 'hook');
test('calls hook `exit`', async () => {
jest.spyOn(processManager, 'hook').mockImplementation(() => {});

processManager.addHook({
handler() {
expect(processManager.hook).toHaveBeenCalledWith('exit', []);
processManager.addHook({ handler() {}, type: 'exit' });

done();
},
type: 'exit'
});
processManager.configure({ timeout: 1 });
await processManager.shutdown();

processManager.shutdown();
expect(processManager.hook).toHaveBeenCalledWith('exit', []);
});

test('calls `processManager.exit`', done => {
jest.spyOn(processManager, 'exit').mockImplementation(() => {
done();
});
test('calls `processManager.exit()`', async () => {
jest.spyOn(processManager, 'exit').mockImplementation(() => {});

processManager.shutdown();
await processManager.shutdown();

expect(processManager.exit).toHaveBeenCalledTimes(1);
});

test('adds error to `processManager.errors`', done => {
test('adds error to `processManager.errors`', async () => {
const error = new Error();

jest.spyOn(processManager, 'exit').mockImplementation(() => {
expect(processManager.errors).toHaveLength(1);
expect(processManager.errors).toContain(error);

expect(processManager.exit).toHaveBeenCalled();
expect(processManager.exit).toHaveBeenCalledTimes(1);
jest.spyOn(processManager, 'exit').mockImplementation(() => {});

done();
});
await processManager.shutdown({ error });

processManager.shutdown({ error });
expect(processManager.errors).toHaveLength(1);
expect(processManager.errors).toContain(error);
});

test('adds errors to `processManager.errors` if called more than once', done => {
test('adds errors to `processManager.errors` if called more than once', async () => {
const [e1, e2] = [new Error(), new Error()];

jest.spyOn(processManager, 'exit').mockImplementation(() => {
expect(processManager.errors).toHaveLength(2);
expect(processManager.errors).toContain(e1);
expect(processManager.errors).toContain(e2);

expect(processManager.exit).toHaveBeenCalled();
expect(processManager.exit).toHaveBeenCalledTimes(1);
jest.spyOn(processManager, 'exit').mockImplementation(() => {});

done();
});
await Promise.all([processManager.shutdown({ error: e1 }), processManager.shutdown({ error: e2 })]);

processManager.shutdown({ error: e1 });
processManager.shutdown({ error: e2 });
expect(processManager.errors).toHaveLength(2);
expect(processManager.errors).toContain(e1);
expect(processManager.errors).toContain(e2);
});

test('forces shutdown if `processManager.shutdown` is called with force `true`', done => {
jest.spyOn(processManager, 'exit').mockImplementation(() => {
processManager.forceShutdown.promise.catch(done);
test('forces shutdown if `processManager.shutdown()` is called with force `true`', async () => {
const deferred = utils.deferred();

jest.spyOn(processManager, 'exit').mockImplementation(() => {});

processManager.once(async () => {
await deferred.promise;
});

processManager.loop(async () => {}, { interval: 1000 });
await processManager.shutdown({ force: true });

processManager.shutdown();
processManager.shutdown({ force: true });
expect(processManager.exit).toHaveBeenCalledTimes(1);

deferred.resolve();
});
});

Expand Down Expand Up @@ -449,29 +423,17 @@ describe('ProcessManager', () => {
});

describe('run()', () => {
test('does nothing if `processManager` is terminating', () => {
test('does nothing if `processManager.terminating` is true', async () => {
const fn = jest.fn();

processManager.terminating = true;

const result = processManager.run(() => fn());
await processManager.run(fn);

expect(fn).not.toHaveBeenCalled();
expect(result).toBeUndefined();
});

test('returns the coroutine', done => {
jest.spyOn(processManager, 'shutdown').mockImplementation(() => {
done();
});

const chain = processManager.run(() => {});

expect(chain.then).toBeDefined();
expect(typeof chain.id).toBe('symbol');
});

test('calls `shutdown` with error if an error is thrown while running the function', async () => {
test('calls `processManager.shutdown()` with error if an error is thrown while running the function', async () => {
const error = new Error();

jest.spyOn(processManager, 'shutdown');
Expand Down

0 comments on commit 8aa18cf

Please sign in to comment.