This repository has been archived by the owner on Jan 26, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 13
/
broker.js
103 lines (87 loc) · 2.69 KB
/
broker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
module.exports = function (logger, inherits, EventEmitter, Client) {
// A Broker represents a Kafka server. It attempts to maintain a connection
// to the server until it is 'destroy'ed. If the connection is dropped,
// the Broker will attempt to reconnect, backing off exponentially on failure.
//
// id      - identifier for this broker; passed to Client and included in
//           every log line.
// options - connection options; `host` and `port` are copied onto the
//           instance and the whole object is handed to Client on connect().
//
// The constructor does not open a connection: `client` starts out as
// Client.nil until connect() is called.
function Broker(id, options) {
  this.id = id
  this.client = Client.nil
  this.reconnectAttempts = 0
  // Handle of the pending reconnect timeout, or null when none is scheduled.
  // Declared here so every piece of state that connect(), destroy() and the
  // 'end' handler touch exists from construction onwards.
  this.reconnectTimer = null
  this.options = options
  this.host = options.host
  this.port = options.port
  // Pre-bound callbacks: the same function references are needed later to
  // remove the listeners and to schedule reconnects.
  this.connector = this.connect.bind(this)
  this.onClientEnd = clientEnd.bind(this)
  this.onClientReady = clientReady.bind(this)
  this.onClientConnect = clientConnect.bind(this)
  EventEmitter.call(this)
}
inherits(Broker, EventEmitter)
// Creates a new Client for this broker and subscribes to its lifecycle
// events. 'connect' and 'end' are one-shot subscriptions; 'ready' stays
// attached until the client ends or the broker is destroyed.
//
// This is also the function the reconnect timer invokes (via
// `this.connector`).
Broker.prototype.connect = function () {
  var options = this.options
  // Cancel any scheduled reconnect before forgetting its handle. Without
  // this, calling connect() by hand during back-off would leave the timer
  // alive and it would later create a second client. Clearing a timer that
  // has already fired (the normal reconnect path) is a harmless no-op.
  clearTimeout(this.reconnectTimer)
  this.reconnectTimer = null
  logger.info(
    'connecting broker', this.id,
    'host', options.host,
    'port', options.port
  )
  this.client = new Client(this.id, options)
  this.client.once('connect', this.onClientConnect)
  this.client.once('end', this.onClientEnd)
  this.client.on('ready', this.onClientReady)
}
// Reports the current client's `ready` property as-is.
Broker.prototype.isReady = function () {
  var currentClient = this.client
  return currentClient.ready
}
// Forwards a fetch request for (topic, partition) to the current client.
// The result is delivered through `cb`; nothing is returned.
Broker.prototype.fetch = function (topic, partition, cb) {
  var currentClient = this.client
  currentClient.fetch(topic, partition, cb)
}
// Forwards `messages` to the current client for the given partition.
//
// partition - object whose `topic` and `id` properties identify the target.
// messages  - payload passed through untouched.
// cb        - callback passed through to the client.
//
// Returns whatever the client's write() returns.
Broker.prototype.write = function (partition, messages, cb) {
  var currentClient = this.client
  var outcome = currentClient.write(
    partition.topic,
    messages,
    partition.id,
    cb
  )
  return outcome
}
// Forwards a drain request to the current client; `cb` is handed through
// unchanged and nothing is returned.
Broker.prototype.drain = function (cb) {
  var currentClient = this.client
  currentClient.drain(cb)
}
// Permanently shuts the broker down: cancels any scheduled reconnect,
// unsubscribes from the current client, ends it, swaps in Client.nil and
// finally emits 'destroy'.
Broker.prototype.destroy = function () {
  var broker = this

  // Stop a pending reconnect first so no new client appears afterwards.
  clearTimeout(broker.reconnectTimer)
  broker.reconnectTimer = null

  // Unsubscribe before end() so the 'end' handler cannot schedule a
  // reconnect for a broker that is being torn down.
  var outgoing = broker.client
  var subscriptions = [
    ['connect', broker.onClientConnect],
    ['end', broker.onClientEnd],
    ['ready', broker.onClientReady]
  ]
  subscriptions.forEach(function (entry) {
    outgoing.removeListener(entry[0], entry[1])
  })
  outgoing.end()

  broker.client = Client.nil
  logger.info('broker destroyed', broker.id)
  broker.emit('destroy')
}
// Computes a randomised reconnect delay in milliseconds.
//
// The delay is drawn uniformly from [0, ceiling), where the ceiling doubles
// with every attempt (2^attempt * 10 ms) until it reaches `maxDelay`.
//
// attempt  - number of consecutive failed connection attempts.
// maxDelay - optional upper bound for the ceiling in milliseconds. Defaults
//            to 2147483647, the largest delay setTimeout accepts.
//
// Returns a non-negative integer strictly below the ceiling.
function exponentialBackoff(attempt, maxDelay) {
  // Node coerces any setTimeout delay above 2^31 - 1 down to 1 ms, so an
  // unbounded ceiling would turn long outages into a tight reconnect loop
  // (reachable from attempt 28 onwards). Clamping the ceiling, rather than
  // the final product, also keeps the result finite when 2^attempt
  // overflows to Infinity.
  var limit = maxDelay === undefined ? 2147483647 : maxDelay
  var ceiling = Math.min(Math.pow(2, attempt) * 10, limit)
  return Math.floor(Math.random() * ceiling)
}
// Event handlers
// 'connect' handler (invoked with the Broker as `this`).
// Logs the connection, resets the failure counter so the next outage starts
// backing off from scratch, then emits 'connect' (with the broker as its
// argument) followed by 'ready'.
function clientConnect() {
  var broker = this

  logger.info('broker connected', broker.id)
  broker.reconnectAttempts = 0

  broker.emit('connect', broker)
  broker.emit('ready')
}
// 'end' handler (invoked with the Broker as `this`).
// Counts the failure, logs it, unsubscribes from the ended client and
// schedules a reconnect after a randomised exponential back-off.
function clientEnd() {
  var broker = this

  broker.reconnectAttempts++
  logger.info(
    'broker ended', broker.id,
    'reconnects', broker.reconnectAttempts
  )

  // The ended client is abandoned; drop our subscriptions on it.
  var endedClient = broker.client
  endedClient.removeListener('connect', broker.onClientConnect)
  endedClient.removeListener('end', broker.onClientEnd)
  endedClient.removeListener('ready', broker.onClientReady)

  var delay = exponentialBackoff(broker.reconnectAttempts)
  broker.reconnectTimer = setTimeout(broker.connector, delay)
}
// 'ready' handler (invoked with the Broker as `this`).
// Logs the event and re-emits it from the broker without arguments.
function clientReady() {
  var broker = this

  logger.info('broker ready', broker.id)
  broker.emit('ready')
}
// Shared placeholder instance built with id -1 and empty options, so its
// `host` and `port` are undefined and its client is Client.nil. connect() is
// not called on it here.
// NOTE(review): presumably used as a null object where no real broker is
// known — confirm against callers.
Broker.nil = new Broker(-1, {})
return Broker
}