#Topics
- similar to a database table
- topics are identified by their name
#Partitions
- Topics are split into partitions
- messages are ordered within a partition
#Offset
- each message inside a partition gets an incremental id called an offset
- offsets are ordered only within a partition (there is no ordering across partitions)
- data is kept only for a limited time (one week by default)
- data once written cannot be changed; it is immutable
#Broker
- a kafka cluster consists of multiple brokers
- a broker is a server; a cluster is a group of brokers
- each broker has its own identity (an integer id)
- each broker contains only certain topic partitions
- after connecting to any broker (a bootstrap broker), you are connected to the entire cluster
- 3 brokers is a good starting number, but some companies run 100+ brokers
#Brokers and Topics
- if Topic A has 3 partitions and there are 3 brokers,
1. broker 101 has partition 0
2. broker 102 has partition 1
3. broker 103 has partition 2
#Topic Replication Factor
- Topics should have a replication factor > 1 (usually 2 or 3)
- If a broker is down, another broker can serve the data
#Leader
- Only one broker can be the leader for a given partition
- Only the leader can receive and serve data for that partition
- The other brokers synchronize the data
- Each partition has one leader and multiple ISRs (in-sync replicas)
- Eg with replication factor 3:
1. Topic A partition 0 has its leader on broker 101
2. Topic A partition 1 has its leader on broker 102
3. Topic A partition 2 has its leader on broker 103
For each partition, one broker is the leader and the other brokers hold its in-sync replicas
- leaders and ISRs are determined with the help of zookeeper
#Producers
- Producers write data to topics (which are made of partitions)
- Producers automatically know which broker and partition to write to
- In case of broker failures, producers automatically recover
- When no key is provided, the producer load-balances writes across partitions (round robin)
- Producers can choose to receive an ack of data writes (a minimal producer sketch follows this list)
1. acks = 0 (no ack; possible data loss)
2. acks = 1 (producer waits for the leader to ack; limited data loss)
3. acks = all (leader and replicas ack; no data loss)
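A minimal Java producer sketch, assuming a local broker; the broker address and topic name are placeholders for this example:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");               // any bootstrap broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");                                        // leader + in-sync replicas ack
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("first_topic", "hello kafka")); // producer picks the partition
        producer.flush();
        producer.close();
    }
}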
#Producer:Message Keys
- Producers can choose to send a key with the message (string, number, etc.)
- If the key is null, data is sent round robin
- If a key is sent, all messages for that key go to the same partition
- Send a key whenever you need message ordering for a specific field, eg: truck_id (see the short example below)
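For illustration (the topic, key and value names here are made up):
// no key: records are spread across partitions (round robin)
new ProducerRecord<String, String>("truck_positions", "some position data");
// with key: every message for truck_123 goes to the same partition, so per-truck ordering is preserved
new ProducerRecord<String, String>("truck_positions", "truck_123", "some position data");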
#Consumers
- read data from a topic
- consumers automatically know which broker to read from
- in case of broker failures, consumers know how to recover
- data is read in order within each partition (by offset)
- a consumer can read from multiple brokers/partitions, but there is no ordering guarantee across partitions
- a single topic can be read by multiple consumers
- as a rule of thumb, have as many consumers as partitions
#Consumer Groups
- consumers read data in groups
- each consumer within a group reads from exclusive partitions
- if there are more consumers than partitions, some consumers will be inactive
- consumers automatically use a GroupCoordinator and a ConsumerCoordinator to assign consumers to partitions (a minimal consumer sketch follows)
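A minimal Java consumer sketch using a consumer group; the group id, topic and broker address are placeholders:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("group.id", "my-first-group");   // consumers sharing this id split the partitions
        props.put("auto.offset.reset", "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("first_topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + " -> " + record.value() + " @ offset " + record.offset());
            }
        }
    }
}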
#Consumer Offsets
- Kafka stores the offsets at which a consumer group has been reading
- Committed offsets live in a kafka topic named __consumer_offsets
- When a consumer in a group has processed data received from kafka, it should commit the offsets
- If a consumer crashes, it can resume from where it left off using the committed offsets
#Delivery Semantics
- Consumers choose when to commit offsets
- there are 3 delivery semantics
1. At most once
2. At least once (preferred)
3. Exactly once
#Kafka Broker Discovery
- every kafka broker is also called a bootstrap server
- you only need to connect to one broker to connect to the entire cluster
- Each broker knows about all brokers, topics and partitions (metadata)
#Zookeeper
- Manages brokers (keeps a list of them)
- helps in performing leader election for partitions
- sends notifications to kafka in case of changes (eg: new topic, topic deleted, broker crash, broker comes up, etc)
- Kafka cannot work without zookeeper
- zookeeper runs with an odd number of servers (3, 5, 7)
- zookeeper has a leader (handles writes) and the rest of the servers are followers (handle reads)
- zookeeper does not store consumer offsets with kafka > v0.10
- it is isolated from kafka
- kafka cluster will be connected to the zookeeper cluster
#Kafka Programming
- consumer group rebalance happens automatically when a consumer is added or removed
#Bi-directional compatibility
- kafka broker version and client version can be different
- an old client can talk to a new broker and vice versa
- so always use the latest client version
#producer acks
- default is acks=1
- acks=all -> leader + replicas ack
- there is added latency but it is safe
- min.insync.replicas=2 is the most common setting (with replication factor 3, writes still succeed if one broker is down)
#producer retries
- if retries kick in, messages may end up out of order (unless max.in.flight.requests.per.connection=1 or the idempotent producer is used)
#max.in.flight.requests.per.connection
- default value is 5
#idempotent producer
- the producer can introduce duplicate messages in kafka due to network errors (eg: an ack is lost and the producer retries)
- an idempotent producer does not introduce duplicates on network errors
- great for guaranteeing a safe pipeline (a configuration sketch follows)
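A sketch of a "safe" producer configuration along the lines of these notes; the values shown are common choices, not the only valid ones, and min.insync.replicas is a broker/topic setting rather than a producer one:
// producer side (fragment; add to the Properties of a producer like the one above)
props.put("enable.idempotence", "true");                     // no duplicates on retries caused by network errors
props.put("acks", "all");                                    // leader + in-sync replicas must ack
props.put("retries", Integer.toString(Integer.MAX_VALUE));   // retry instead of dropping messages
props.put("max.in.flight.requests.per.connection", "5");     // ordering is kept when idempotence is enabled
// broker/topic side (set on the topic or broker, not on the producer):
// min.insync.replicas=2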
#Message Compression
- compression is applied on the producer side
- it is enabled at the producer level and does not require any config change in brokers or consumers
- compression.type can be none (default), gzip, lz4, snappy
- when compression is enabled, messages are compressed and sent in batches
- smaller producer request size (up to 4x)
- faster transmission of data over the network
- better disk utilization in kafka
- snappy or lz4 give the best speed/compression trade-off
settings (see the config sketch below)
- linger.ms (how many ms the producer waits before sending a batch)
- batch.size (default 16 KB; maximum number of bytes included in a batch)
- by default the producer sends up to 5 in-flight requests at the same time
- once that limit is reached, the producer batches further messages and sends them together
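A producer configuration fragment for compression and batching; the values here are illustrative, not the defaults:
props.put("compression.type", "snappy");               // or lz4 / gzip; default is none
props.put("linger.ms", "20");                          // wait up to 20 ms so more messages join a batch
props.put("batch.size", Integer.toString(32 * 1024));  // 32 KB batches instead of the 16 KB default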
#kafka producer metrics
- monitor the average batch size metric
#Key hash
- keys are hashed using the murmur2 algorithm
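Roughly, the default partitioner picks the partition for a keyed record like this (a sketch of the formula, not the exact library code):
// partition = toPositive(murmur2(serialized key bytes)) % number of partitions
// so the same key always maps to the same partition,
// as long as the number of partitions does not change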
#buffer memory
settings
1. max.block.ms
2. buffer.memory
- if the producer produces faster than the broker can take, the records are buffered in the producer's memory
- each producer has a 32 MB buffer by default (buffer.memory)
- if the buffer is full, the .send() method starts to block
- this wait is controlled by max.block.ms (default 60000 ms)
#Delivery_Semantics
- At most once: offsets are committed as soon as the message batch is received. If the processing goes wrong, the message is lost (it won't be read again)
- At least once: offsets are committed after the message is processed. If the processing goes wrong, the message is read again. This can result in duplicate processing, so make sure the processing is idempotent
- Exactly once: only for kafka => kafka workflows, using the Kafka Streams API
- at least once is the preferred choice for most applications
#Consumer_poll_behaviour
- kafka consumers use a poll model, while many other messaging systems use a push model
- this gives consumers control over how fast they consume and lets them replay events
#fetch.min.bytes (default 1 byte)
- controls the minimum amount of data the broker returns per fetch request
- helps improve throughput and decrease the number of requests
- at the cost of higher latency
#max.poll.records (default 500)
- controls how many records are returned per poll request
- increase the value if your messages are small and plenty of RAM is available
- it is good to monitor how many records are polled per request
#max.partition.fetch.bytes (default 1 MB)
- maximum data returned by the broker per partition
- if you read from 100 partitions, you need a lot of memory (RAM)
#fetch.max.bytes (default 50 MB)
- maximum data returned per fetch request (covers multiple partitions)
- the consumer performs multiple fetches in parallel
- do not change these settings unless your consumer already maxes out throughput (a tuning sketch follows)
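A consumer configuration fragment showing these fetch settings at their default values; only change them after measuring:
props.put("fetch.min.bytes", "1");
props.put("max.poll.records", "500");
props.put("max.partition.fetch.bytes", Integer.toString(1024 * 1024));   // 1 MB per partition
props.put("fetch.max.bytes", Integer.toString(50 * 1024 * 1024));        // 50 MB per request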
#Consumer_offset_strategy
- 2 common patterns for committing offsets
1. enable.auto.commit = true (synchronous processing of batches)
- offsets are committed automatically at regular intervals (auto.commit.interval.ms=5000 by default)
- if you don't process synchronously, you effectively get "at-most-once" behaviour, because offsets can be committed before your data is processed
- hence it is risky
2. enable.auto.commit = false (manual commit of offsets, see the sketch below)
- you control when you commit offsets and what the condition for committing them is
- eg: accumulate records into a buffer, flush the buffer to a database, and commit the offsets then
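A sketch of the manual-commit (at-least-once) pattern, assuming a consumer configured like the earlier example:
props.put("enable.auto.commit", "false");    // we commit ourselves, after processing
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(java.util.Collections.singletonList("first_topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // process the record here, eg: buffer it and flush the buffer to a database
    }
    consumer.commitSync();                   // commit only after the whole batch has been processed
}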
#consumer offset reset behaviour
- the consumer behaviour is set with auto.offset.reset:
auto.offset.reset=latest (read from the end of the log)
auto.offset.reset=earliest (read from the start of the log)
auto.offset.reset=none (throw an exception if no offset is found)
- if a consumer has not read new data or committed offsets in 7 days,
its committed offsets are deleted and it loses its place in the log
- this is controlled by the broker setting offsets.retention.minutes
#Replay data for consumers
- take all the consumers of a specific group down
- use the kafka-consumer-groups command to set the offsets to what you want (example below)
- restart the consumers
- data retention and offset retention periods can be set to any value (default is 7 days)
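For example (the group and topic names are placeholders; on some installations the script is called kafka-consumer-groups.sh):
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-first-group --topic first_topic --reset-offsets --to-earliest --dry-run
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-first-group --topic first_topic --reset-offsets --to-earliest --execute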
#Consumer_Liveliness
- consumers in a group talk to a consumer coordinator and send heartbeats to it via a dedicated heartbeat thread
- consumers talk to the kafka broker through the poll thread
- the poll thread and the heartbeat thread are separate
- consumers are encouraged to process data fast and poll often to avoid rebalancing
#session.timeout.ms (default 10 seconds)
- heartbeats are sent periodically to the broker
- if no heartbeat is sent during that period, then the consumer is considered dead
#heartbeat.interval.ms (default 3 seconds)
- how often to send heartbeats
- usually set to 1/3 of session.timeout.ms
#max.poll.interval.ms (default 5 minutes)
- maximum allowed time between two poll() calls before the consumer is considered stuck and a rebalance is triggered
#kafka_connect and streams
use cases
1. Source => Kafka : Producer API (Kafka Connect Source)
2. Kafka => Kafka : Consumer + Producer API (Kafka Streams)
3. Kafka => Sink : Consumer API (Kafka Connect Sink)
4. Kafka => App : Consumer API
- simplify and improve getting data in and out of kafka
- simplify transforming data within kafka without relying on external libs
#Why kafka connect
- programmers often want to import data from the same sources, eg: databases, twitter, etc
- programmers often want to store data in the same sinks, eg: elasticsearch, S3, MongoDB, etc
- it is hard to achieve fault tolerance, idempotence, distribution and ordering on your own
- other programmers may have already implemented it and provided it as a kafka connect connector
#high_level
- Source connectors to get data from common data sources
- Sink connectors to publish that data to common data sinks
- makes it easy for non-experienced devs to quickly get data reliably into kafka
- part of many company-wide ETL pipelines
- highly scalable
- re-usable code
#Kafka_streams
- standard java application
- no need to create a separate cluster
- highly scalable, elastic and fault tolerant
- exactly-once capabilities
- one record at a time processing
- works for any application size
- a serious contender to other processing frameworks like Apache Spark, Flink or NiFi (a minimal sketch follows)
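A minimal Kafka Streams sketch; the application id, topic names and broker address are made up for this example:
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
public class SimpleStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "notes-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input_topic")
               .mapValues(value -> value.toUpperCase())   // one record at a time
               .to("output_topic");
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}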
#Schema_Registry
- kafka takes bytes as input and publishes them
- we need data to be self-describable
- we need to be able to evolve data without breaking downstream consumers
- so we need schemas and a schema registry
- kafka does not parse or read the data
- it just takes bytes as input without even loading them into memory (this is called zero copy)
- kafka does not know the data type
- so the schema registry must be a separate component
- producers and consumers need to be able to talk to it
- the schema registry must be able to reject bad data
- Confluent Schema Registry is a common choice
- it uses the apache avro data format (see the config fragment below)
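A producer configuration fragment for the Confluent Schema Registry with Avro; the registry URL is a placeholder and the serializer class comes from Confluent's kafka-avro-serializer dependency:
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");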
#kafka_Security
- Authentication, Authorization and Encryption
#Encryption
- data exchanged between clients and brokers is encrypted, so routers along the way cannot read it
- similar to https
- kafka with encryption typically listens on port 9093 with SSL
- the client encrypts data before sending; the kafka broker decrypts it and stores it on disk
#Authentication
- the client needs to prove its identity to kafka
- similar to a username/password login
- "we are who we say we are"
- SSL authentication: clients authenticate using SSL certificates
- SASL authentication:
1. PLAIN: clients authenticate using username/password
2. Kerberos: such as Microsoft Active Directory
3. SCRAM: username/password
#Authorization
- controls which user can access which topic
- this is done with ACLs (access control lists)
#Other ways to start kafka
#Confluent CLI
- intended for development, not for production
- starts zookeeper and kafka together
- it also loads kafka connect
#commands
confluent start
confluent stop
#multi-broker cluster
- copy server.properties multiple times (with a unique broker.id and port per copy) and start all of them
- if we connect to 1 broker, we are automatically connected to all the brokers
- if we have 3 brokers, we can have up to 3 replicas
#Advertised listeners
- A kafka broker has
1. a public ip
2. a private ip
3. an advertised host name (adv_host)
#advertised_host_name
- a unique name for the kafka broker that clients use to connect
- set it to an internal ip or internal dns host name
- set this property in server.properties (see the fragment below)
- DC
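A server.properties fragment as a sketch; the broker id and host name are placeholders:
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
# the internal dns name (or internal ip) that clients will use to connect
advertised.listeners=PLAINTEXT://kafka-1.internal.example.com:9092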