-
-
Notifications
You must be signed in to change notification settings - Fork 11
/
noroutine.js
143 lines (128 loc) · 3.96 KB
/
noroutine.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
'use strict';
const { Worker } = require('worker_threads');
const path = require('path');
// Lifecycle states stored in `balancer.status`.
const STATUS_NOT_INITIALIZED = 0;
const STATUS_INITIALIZATION = 1;
const STATUS_INITIALIZED = 2;
const STATUS_FINALIZATION = 3;
const STATUS_FINALIZED = 4;

// Script handed to every `Worker` that `init` creates.
const WORKER_PATH = path.join(__dirname, 'lib', 'worker.js');

// Fallbacks used by `init` when the matching option is missing or falsy.
const DEFAULT_POOL_SIZE = 5;
// `wait` is validated and stored by `init`; nothing else in this file reads it.
const DEFAULT_THREAD_WAIT = 2000;
// Milliseconds: passed to `setTimeout` in `invoke`.
const DEFAULT_TIMEOUT = 5000;
// Milliseconds: passed to `setInterval` in `init`.
const DEFAULT_MON_INTERVAL = 5000;

// Option names that `init` requires to hold integer values.
const OPTIONS_INT = [
  'pool',
  'wait',
  'timeout',
  'monitoring',
];
// Shared mutable state of the worker pool, read and updated by the pool
// functions below (monitoring, invoke, workerResults, register, init,
// finalize).
const balancer = {
// Normalized settings object assigned by `init`.
options: null,
// Worker instances, appended by `register`.
pool: [],
// Not read or written anywhere else in the visible code.
modules: null,
// One of the STATUS_* constants.
status: STATUS_NOT_INITIALIZED,
// Handle of the `monitoring` interval created by `init`.
timer: null,
// Latest event loop utilization sample per worker, same index as `pool`.
elu: [],
// Worker that `invoke` posts messages to; re-selected by `monitoring`.
current: null,
// Next task id handed out by `invoke`.
id: 1,
// Task id -> { resolve, reject, timer } for calls awaiting a worker reply.
tasks: new Map(),
// Module file paths resolved through `findModule` in `init`.
targets: null,
};
/**
 * Interval callback that re-selects the worker used for new calls.
 *
 * For each of the first `balancer.options.pool` workers it takes a fresh
 * event loop utilization sample, computes the utilization between the
 * previously stored sample and the fresh one, and stores the fresh sample in
 * `balancer.elu`. The worker with the lowest utilization becomes
 * `balancer.current`; when no worker is below 1 the first worker is kept.
 */
const monitoring = () => {
  const { options, pool, elu } = balancer;
  let lowest = 1;
  let chosen = 0;
  let slot = 0;
  while (slot < options.pool) {
    const { performance } = pool[slot];
    const sample = performance.eventLoopUtilization();
    // Utilization over the window between the stored sample and this one.
    const { utilization } = performance.eventLoopUtilization(
      sample,
      elu[slot],
    );
    if (utilization < lowest) {
      chosen = slot;
      lowest = utilization;
    }
    elu[slot] = sample;
    slot += 1;
  }
  balancer.current = pool[chosen];
};
/**
 * Sends a method call to the currently selected worker and waits for the
 * reply that `workerResults` delivers.
 *
 * @param {string} method - Name of the function to run in the worker.
 * @param {Array} args - Arguments posted to the worker with the call.
 * @returns {Promise<*>} Resolves with the worker's `result`. Rejects with the
 *   worker's `error`, with a timeout error after `balancer.options.timeout`
 *   milliseconds, or with whatever `postMessage` throws.
 */
const invoke = async (method, args) => {
  const id = balancer.id++;
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => {
      // Forget the task, otherwise the map grows forever when a worker
      // never answers.
      balancer.tasks.delete(id);
      reject(new Error(`Timeout execution for method '${method}'`));
    }, balancer.options.timeout);
    balancer.tasks.set(id, { resolve, reject, timer });
    try {
      balancer.current.postMessage({ id, method, args });
    } catch (error) {
      // Nothing was sent, so no reply will ever come: undo the bookkeeping
      // instead of leaving a live timer and a stale map entry behind.
      clearTimeout(timer);
      balancer.tasks.delete(id);
      reject(error);
    }
  });
};

/**
 * 'message' handler for pool workers: settles the pending task that matches
 * the reply.
 *
 * @param {{ id: number, error: *, result: * }} message - Reply posted by a
 *   worker; a truthy `error` rejects the task, otherwise `result` resolves it.
 */
const workerResults = ({ id, error, result }) => {
  const task = balancer.tasks.get(id);
  // The task is gone when it already timed out (or the id is unknown);
  // a late reply must not throw inside the event handler.
  if (!task) return;
  clearTimeout(task.timer);
  balancer.tasks.delete(id);
  if (error) task.reject(error);
  else task.resolve(result);
};
/**
 * Adds a worker to the pool and subscribes to its replies.
 *
 * @param {Worker} worker - Worker to append to `balancer.pool`.
 */
const register = (worker) => {
  const { pool, elu } = balancer;
  pool.push(worker);
  // Baseline sample stored at the same index as the worker; `monitoring`
  // diffs its next sample against it.
  elu.push(worker.performance.eventLoopUtilization());
  worker.on('message', workerResults);
};
/**
 * Looks up the file a loaded module came from by comparing its exports
 * object, by identity, with every entry of `require.cache`.
 *
 * @param {object} module - Exports object of an already required module.
 * @returns {string} Cache key (file name) of the first matching entry.
 * @throws {Error} 'Unknown module' when no cache entry exports this object.
 */
const findModule = (module) => {
  const match = Object.entries(require.cache).find(
    ([, cached]) => cached.exports === module,
  );
  if (match === undefined) throw new Error('Unknown module');
  const [file] = match;
  return file;
};
/**
 * Replaces, in place, every own enumerable function property of `module`
 * with an async proxy that forwards the call through `invoke`.
 * Non-function properties are left untouched.
 *
 * Only the property name is forwarded, not the module it belongs to.
 *
 * @param {object} module - Exports object to mutate.
 */
const wrapModule = (module) => {
  Object.keys(module).forEach((name) => {
    if (typeof module[name] === 'function') {
      module[name] = async (...args) => invoke(name, args);
    }
  });
};
/**
 * Starts the worker pool and redirects the functions of the given modules
 * to it.
 *
 * All validation happens before any state is changed, so a rejected call
 * leaves the balancer untouched and `init` can be called again with
 * corrected options.
 *
 * @param {object} options
 * @param {object[]} options.modules - Exports objects of already required
 *   modules; their function properties are replaced by `wrapModule`.
 * @param {number} [options.pool] - Number of workers
 *   (falsy -> DEFAULT_POOL_SIZE).
 * @param {number} [options.wait] - Validated and stored; not used elsewhere
 *   in this file (falsy -> DEFAULT_THREAD_WAIT).
 * @param {number} [options.timeout] - Per-call timeout in milliseconds, also
 *   passed to the workers as `workerData.timeout` (falsy -> DEFAULT_TIMEOUT).
 * @param {number} [options.monitoring] - Interval in milliseconds between
 *   `monitoring` runs (falsy -> DEFAULT_MON_INTERVAL).
 * @throws {Error} When called after the pool has been initialized, when a
 *   module is not a non-null object, when a numeric option is not a positive
 *   integer, or when a module is not found in `require.cache`.
 */
const init = (options) => {
  if (balancer.status !== STATUS_NOT_INITIALIZED) {
    throw new Error('Can not initialize noroutine more than once');
  }

  // Validation phase: nothing on `balancer` is modified until it passes.
  for (const module of options.modules) {
    // `typeof null` is 'object' too, but null has no interface to wrap.
    if (typeof module !== 'object' || module === null) {
      throw new Error('Module should export an interface');
    }
  }
  const settings = {
    modules: options.modules,
    pool: options.pool || DEFAULT_POOL_SIZE,
    wait: options.wait || DEFAULT_THREAD_WAIT,
    timeout: options.timeout || DEFAULT_TIMEOUT,
    monitoring: options.monitoring || DEFAULT_MON_INTERVAL,
  };
  for (const key of OPTIONS_INT) {
    const value = settings[key];
    if (!Number.isInteger(value)) {
      throw new Error(`Norutine.init: options.${key} should be integer`);
    }
    // Zero already fell back to the default above; a negative pool size
    // would leave the balancer without any worker to post to.
    if (value < 1) {
      throw new Error(`Norutine.init: options.${key} should be positive`);
    }
  }
  const targets = options.modules.map((module) => findModule(module));

  // Commit phase.
  balancer.status = STATUS_INITIALIZATION;
  balancer.options = settings;
  balancer.targets = targets;
  for (const module of options.modules) {
    wrapModule(module);
  }
  const workerData = {
    modules: targets,
    timeout: settings.timeout,
  };
  for (let i = 0; i < settings.pool; i++) {
    register(new Worker(WORKER_PATH, { workerData }));
  }
  // Until the first `monitoring` run every call goes to the first worker.
  balancer.current = balancer.pool[0];
  balancer.timer = setInterval(monitoring, settings.monitoring);
  balancer.status = STATUS_INITIALIZED;
};
/**
 * Stops the monitoring interval, terminates every worker in the pool and
 * rejects the calls that are still waiting for a reply.
 *
 * Calling it before `init` is a no-op and leaves the status untouched, so
 * `init` can still be called afterwards.
 *
 * @returns {Promise<void>} Resolves once all `terminate()` promises have
 *   settled (failures to terminate are ignored) and pending calls have been
 *   rejected.
 */
const finalize = async () => {
  if (balancer.status === STATUS_NOT_INITIALIZED) return;
  balancer.status = STATUS_FINALIZATION;
  clearInterval(balancer.timer);
  const finals = balancer.pool.map((worker) => worker.terminate());
  await Promise.allSettled(finals);
  // No reply can arrive for these calls any more. Settle them now and drop
  // their timers so they do not keep the event loop alive until the timeout.
  // Done only after termination so the 'message' handler never races with
  // the cleanup.
  for (const { reject, timer } of balancer.tasks.values()) {
    clearTimeout(timer);
    reject(new Error('Task aborted: noroutine is finalized'));
  }
  balancer.tasks.clear();
  balancer.status = STATUS_FINALIZED;
};
// Public API: `init` starts the worker pool and wraps the given modules,
// `finalize` terminates the pool's workers.
module.exports = { init, finalize };