-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathBankBalanceTopologyTest.java
132 lines (103 loc) · 5.04 KB
/
BankBalanceTopologyTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package com.github.programmingwithmati.topology;
import com.github.programmingwithmati.model.BankBalance;
import com.github.programmingwithmati.model.BankTransaction;
import com.github.programmingwithmati.model.JsonSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
class BankBalanceTopologyTest {
TopologyTestDriver testDriver;
private TestInputTopic<Long, BankTransaction> bankTransactionTopic;
private TestOutputTopic<Long, BankBalance> bankBalanceTopic;
private TestOutputTopic<Long, BankTransaction> rejectedBankTransactionTopic;
@BeforeEach
void setup() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
testDriver = new TopologyTestDriver(BankBalanceTopology.buildTopology(), props);
var bankBalanceJsonSerde = new JsonSerde<>(BankBalance.class);
var bankTransactionJsonSerde = new JsonSerde<>(BankTransaction.class);
bankTransactionTopic = testDriver.createInputTopic(BankBalanceTopology.BANK_TRANSACTIONS, Serdes.Long().serializer(), bankTransactionJsonSerde.serializer());
bankBalanceTopic = testDriver.createOutputTopic(BankBalanceTopology.BANK_BALANCES, Serdes.Long().deserializer(), bankBalanceJsonSerde.deserializer());
rejectedBankTransactionTopic = testDriver.createOutputTopic(BankBalanceTopology.REJECTED_TRANSACTIONS, Serdes.Long().deserializer(), bankTransactionJsonSerde.deserializer());
}
@AfterEach
void teardown() {
testDriver.close();
}
@Test
void testTopology() {
List.of(
BankTransaction.builder()
.balanceId(1L)
.time(new Date())
.amount(new BigDecimal(500))
.build(),
BankTransaction.builder()
.balanceId(2L)
.time(new Date())
.amount(new BigDecimal(3000)).build(),
BankTransaction.builder()
.balanceId(1L)
.time(new Date())
.amount(new BigDecimal(500)).build()
)
.forEach(bankTransaction -> bankTransactionTopic.pipeInput(bankTransaction.getBalanceId(), bankTransaction));
var firstBalance = bankBalanceTopic.readValue();
assertEquals(1L, firstBalance.getId());
assertEquals(new BigDecimal(500), firstBalance.getAmount());
var secondBalance = bankBalanceTopic.readValue();
assertEquals(2L, secondBalance.getId());
assertEquals(new BigDecimal(3000), secondBalance.getAmount());
var thirdBalance = bankBalanceTopic.readValue();
assertEquals(1L, thirdBalance.getId());
assertEquals(new BigDecimal(1000), thirdBalance.getAmount());
assertTrue(rejectedBankTransactionTopic.isEmpty());
}
@Test
void testTopologyWhenRejection() {
List.of(
BankTransaction.builder()
.id(1L)
.balanceId(1L)
.time(new Date())
.amount(new BigDecimal(-500))
.build(),
BankTransaction.builder()
.id(2L)
.balanceId(2L)
.time(new Date())
.amount(new BigDecimal(3000)).build(),
BankTransaction.builder()
.id(3L)
.balanceId(1L)
.time(new Date())
.amount(new BigDecimal(500)).build()
)
.forEach(bankTransaction -> bankTransactionTopic.pipeInput(bankTransaction.getBalanceId(), bankTransaction));
var firstBalance = bankBalanceTopic.readValue();
assertEquals(1L, firstBalance.getId());
assertEquals(new BigDecimal(0), firstBalance.getAmount());
var secondBalance = bankBalanceTopic.readValue();
assertEquals(2L, secondBalance.getId());
assertEquals(new BigDecimal(3000), secondBalance.getAmount());
var thirdBalance = bankBalanceTopic.readValue();
assertEquals(1L, thirdBalance.getId());
assertEquals(new BigDecimal(500), thirdBalance.getAmount());
var bankTransaction = rejectedBankTransactionTopic.readValue();
assertEquals(1L, bankTransaction.getId());
assertEquals(BankTransaction.BankTransactionState.REJECTED, bankTransaction.getState());
}
}