-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathBankBalanceTopology.java
40 lines (33 loc) · 1.85 KB
/
BankBalanceTopology.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.github.programmingwithmati.topology;
import com.github.programmingwithmati.model.BankBalance;
import com.github.programmingwithmati.model.BankTransaction;
import com.github.programmingwithmati.model.JsonSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
public class BankBalanceTopology {
public static final String BANK_TRANSACTIONS = "bank-transactions";
public static final String BANK_BALANCES = "bank-balances";
public static final String REJECTED_TRANSACTIONS = "rejected-transactions";
public static Topology buildTopology() {
Serde<BankTransaction> bankTransactionSerdes = new JsonSerde<>(BankTransaction.class);
Serde<BankBalance> bankBalanceSerde = new JsonSerde<>(BankBalance.class);
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<Long, BankBalance> bankBalancesStream = streamsBuilder.stream(BANK_TRANSACTIONS,
Consumed.with(Serdes.Long(), bankTransactionSerdes))
.groupByKey()
.aggregate(BankBalance::new,
(key, value, aggregate) -> aggregate.process(value),
Materialized.with(Serdes.Long(), bankBalanceSerde))
.toStream();
bankBalancesStream
.to(BANK_BALANCES, Produced.with(Serdes.Long(), bankBalanceSerde));
bankBalancesStream
.mapValues((readOnlyKey, value) -> value.getLatestTransaction())
.filter((key, value) -> value.state == BankTransaction.BankTransactionState.REJECTED)
.to(REJECTED_TRANSACTIONS, Produced.with(Serdes.Long(), bankTransactionSerdes));
return streamsBuilder.build();
}
}