-
Notifications
You must be signed in to change notification settings - Fork 1
/
code-listings.txt
34 lines (34 loc) · 1.25 KB
/
code-listings.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# Flink Watermark API
env.addSource(source).assignTimestampsAndWatermarks(watermarkStrategy).keyBy(key).window(windowType).reduce(reduceFunction)...
# Keyed Watermark API
env.addSource(source).keyBy(key).assignTimestampsAndKeyedWatermarks(keyedWatermarkStrategy).window(windowType).reduce(reduceFunction)...
# Flink Watermark Generation
onPeriodicEmit():
emitWatermark(watermark(maxTimestamp-outOfOrderness-1))
emitWatermark():
if mark > currentWatermark:
currentWatermark = mark
emit(watermark(mark))
# Keyed Watermark Generation
onPeriodicEmit():
for each k in maxTimestamps:
emitWatermark(watermark(maxTimestamps.get(k),k))
emitWatermark(mark, key):
if mark > keysAndWatermarks.get(key):
keysAndWatermarks.put(key, mark)
emitWatermark(watermark(mark, key))
# Flink Watermark Firing
advanceWatermark(mark):
currentWatermark = mark
while queue != empty:
if queue.timer.ts <= mark:
pop(timer)
onEventTime(timer)
# Keyed Watermark Firing
advanceKeyedWatermark(mark, key):
watermarksAndKeys.put(key, mark)
queue = hashmapOfQueue.get(key)
while queue != empty:
if queue.timer.ts <= mark:
pop(timer)
onEventTime(timer) #trigger to fire the window