-
Notifications
You must be signed in to change notification settings - Fork 3
/
app.js
112 lines (86 loc) · 3.39 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
var args = require('yargs').argv,
zk = require('node-zookeeper-client'),
kafka = require('kafka-node'),
Table = require('cli-table'),
linq = require('node-linq').LINQ;
if (!args.zkConnect) {
console.error('zkConnect not specified');
return;
}
if (!args.consumers) {
console.error('consumers not specified');
return;
}
var zkClient = zk.createClient(args.zkConnect),
kafkaClient = new kafka.Client(args.zkConnect, 'lagger'),
kafkaOffset = new kafka.Offset(kafkaClient),
poll = function (cb) {
var ops = 1,
start = Date.now(),
result = [];
args.consumers.split(',').forEach(function (c) {
ops++;
zkClient.getChildren('/consumers/' + c + '/offsets', function (e, d, z) {
if (e) return ops--;
d.forEach(function (t) {
ops++;
zkClient.getChildren('/consumers/' + c + '/offsets/' + t, function (e, d, z) {
if (e) return ops--;
d.forEach(function (p) {
ops++;
zkClient.getData('/consumers/' + c + '/offsets/' + t + '/' + p, function (e, d, z) {
if (e) return ops--;
ops++;
var offset = d.toString();
kafkaOffset.fetch([{
topic: t,
partition: p,
time: -1,
maxNum: 1
}], function (e, d) {
if (e) return ops--;
var value = d[t][p];
if (value.length) value = value[0];
else value = undefined;
result.push({
consumer: c,
topic: t,
partition: p,
end: value,
offset: offset,
lag: value - offset
});
ops--;
});
ops--;
});
});
ops--;
});
});
ops--;
});
ops--;
});
var interval = setInterval(function () {
if (ops > 0) return;
cb(result);
clearInterval(interval);
}, 10);
};
zkClient.once('connected', function () {
var go = function () {
poll(function (result) {
var table = new Table({ head : ['Consumer', 'Topic', 'Partition', 'End', 'Offset', 'Lag']});
new linq(result)
.OrderBy(function(r) { return r.consumer + '__' + r.topic + '__' + r.partition; })
.ToArray()
.forEach(function (r) { table.push([ r.consumer, r.topic, r.partition, r.end, r.offset, r.lag ]); });
console.log(table.toString());
console.log(Date.now());
go();
});
};
go();
});
zkClient.connect();