Skip to content

Commit

Permalink
Support killing a child worker when mem usage above defined percent
Browse files Browse the repository at this point in the history
  • Loading branch information
alanhamlett committed Sep 29, 2023
1 parent a2c78ea commit 4e1ed3b
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 12 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ For the original Python version, see [WakaQ for Python][wakaq python].
- [Broadcast][broadcast] a task to all workers
- Task [soft][soft timeout] and [hard][hard timeout] timeout limits
- Optionally retry tasks on soft timeout
- Combat memory leaks with `maxTasksPerWorker`
- Combat memory leaks with `maxMemPercent` or `maxTasksPerWorker`
- Super minimal

Want more features like rate limiting, task deduplication, etc? Too bad, feature PRs are not accepted. Maximal features belong in your app’s worker tasks.
Expand Down
19 changes: 19 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"devDependencies": {
"@types/eslint": "^8.44.2",
"@types/node": "^20.5.3",
"@types/pidusage": "^2.0.3",
"@types/prettier": "^2.7.3",
"@types/yargs": "^17.0.24",
"@typescript-eslint/eslint-plugin": "6.4.1",
Expand All @@ -52,6 +53,7 @@
"dependencies": {
"cron-parser": "^4.9.0",
"ioredis": "^5.3.2",
"pidusage": "^3.0.2",
"ts-duration": "^1.1.0",
"winston": "^3.10.0",
"yargs": "^17.7.2"
Expand Down
40 changes: 29 additions & 11 deletions src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { Redis } from 'ioredis';
import { fork } from 'node:child_process';
import * as net from 'node:net';
import process from 'node:process';
import * as os from 'os';
import pidusage from 'pidusage';
import { Duration } from 'ts-duration';
import { type Logger } from 'winston';
import { Child } from './child.js';
Expand Down Expand Up @@ -58,7 +60,7 @@ export class WakaWorker {

while (!this._stopProcessing) {
this._respawnMissingChildren();
this._enqueueReadyEtaTasks();
await this._enqueueReadyEtaTasks();
this._checkChildRuntimes();
await this.wakaq.sleep(Duration.millisecond(500));
}
Expand All @@ -67,6 +69,7 @@ export class WakaWorker {
this.logger.info('shutting down...');
while (this.children.length > 0) {
this._stopAllChildren();
await this._checkChildMemoryUsages();
this._checkChildRuntimes();
await this.wakaq.sleep(Duration.millisecond(500));
}
Expand Down Expand Up @@ -134,16 +137,31 @@ export class WakaWorker {
child.softTimeoutReached = false;
}

_enqueueReadyEtaTasks() {
this.wakaq.queues.forEach(async (q) => {
const results = await this.wakaq.broker.getetatasks(q.brokerEtaKey, String(Math.round(Date.now() / 1000)));
results.forEach((result) => {
const payload = deserialize(result);
const taskName = payload.name;
const args = payload.args;
this.wakaq.enqueueAtFront(taskName, args, q);
});
});
async _enqueueReadyEtaTasks() {
await Promise.all(
this.wakaq.queues.map(async (q) => {
const results = await this.wakaq.broker.getetatasks(q.brokerEtaKey, String(Math.round(Date.now() / 1000)));
await Promise.all(
results.map(async (result) => {
const payload = deserialize(result);
const taskName = payload.name;
const args = payload.args;
await this.wakaq.enqueueAtFront(taskName, args, q);
}),
);
}),
);
}

async _checkChildMemoryUsages() {
if (!this.wakaq.maxMemPercent) return;
const totalMem = os.totalmem();
const percent = ((totalMem - os.freemem()) / totalMem) * 100;
if (percent < this.wakaq.maxMemPercent) return;
const usages = await Promise.all(this.children.map(async (child) => (await pidusage(child.process.pid ?? 0)).memory || 0));
const maxIndex = usages.reduce((iMax, x, i, arr) => (x > (arr[iMax] ?? 0) ? i : iMax), 0);
const child = this.children.at(maxIndex);
child?.sigterm();
}

_checkChildRuntimes() {
Expand Down

0 comments on commit 4e1ed3b

Please sign in to comment.