-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathBankTransactionProducer.java
91 lines (79 loc) · 3.73 KB
/
BankTransactionProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package com.github.programmingwithmati;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.programmingwithmati.model.BankTransaction;
import lombok.SneakyThrows;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
 * Command-line demo that publishes a fixed sequence of {@link BankTransaction}s to Kafka.
 *
 * <p>Each transaction is serialized to JSON with Jackson and sent to the
 * {@code bank-transactions} topic, keyed by the transaction's balance id. Records are
 * sent one at a time, and each send blocks until its future completes, so they are
 * handed to the broker in the order listed in {@link #main(String[])}.
 */
public class BankTransactionProducer {

    /** Shared JSON mapper used to serialize every outgoing transaction. */
    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Topic that all transactions are published to. */
    private static final String TOPIC = "bank-transactions";

    /** Address of the Kafka broker this demo connects to. */
    private static final String BOOTSTRAP_SERVERS = "localhost:29092";

    /**
     * Publishes the sample transactions and then releases the producer.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Map<String, Object> producerConfig = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
        );

        List<BankTransaction> data1 = List.of(
                transaction(1L, 500),
                transaction(2L, 3000),
                transaction(1L, 500),
                transaction(4L, 2000),
                transaction(4L, -2500),
                transaction(3L, 1000),
                transaction(1L, -500),
                transaction(2L, -4000),
                transaction(3L, -500)
        );

        // try-with-resources guarantees the producer (sockets, I/O thread, buffers) is
        // closed even when a send fails; previously it was never closed at all.
        try (KafkaProducer<Long, String> bankTransactionProducer = new KafkaProducer<>(producerConfig)) {
            for (BankTransaction bankTransaction : data1) {
                send(bankTransactionProducer, toRecord(bankTransaction));
            }

            // Sent separately after the batch: a single large negative amount for balance 3.
            // NOTE(review): presumably meant to exercise a rejection path in the consumer - confirm.
            send(bankTransactionProducer, toRecord(transaction(3L, -10_000)));
        }
    }

    /**
     * Builds a transaction for the given balance, timestamped with the current time.
     *
     * @param balanceId id of the balance the transaction applies to
     * @param amount    signed whole-number amount of the transaction
     * @return the newly built transaction
     */
    private static BankTransaction transaction(long balanceId, int amount) {
        return BankTransaction.builder()
                .balanceId(balanceId)
                .time(new Date())
                .amount(new BigDecimal(amount))
                .build();
    }

    /**
     * Wraps a transaction in a record for {@link #TOPIC}, keyed by its balance id and
     * carrying the transaction's JSON form as the value.
     *
     * @param bankTransaction transaction to publish
     * @return the record ready to be sent
     */
    private static ProducerRecord<Long, String> toRecord(BankTransaction bankTransaction) {
        return new ProducerRecord<>(TOPIC, bankTransaction.getBalanceId(), toJson(bankTransaction));
    }

    /**
     * Sends one record and blocks until the future returned by the producer completes.
     * Checked exceptions from the wait are rethrown undeclared via {@code @SneakyThrows}.
     *
     * @param bankTransactionProducer producer to send with
     * @param record                  record to send
     */
    @SneakyThrows
    private static void send(KafkaProducer<Long, String> bankTransactionProducer, ProducerRecord<Long, String> record) {
        bankTransactionProducer.send(record).get();
    }

    /**
     * Serializes a transaction to a JSON string using {@link #OBJECT_MAPPER}.
     * Checked serialization exceptions are rethrown undeclared via {@code @SneakyThrows}.
     *
     * @param bankTransaction transaction to serialize
     * @return JSON representation of the transaction
     */
    @SneakyThrows
    private static String toJson(BankTransaction bankTransaction) {
        return OBJECT_MAPPER.writeValueAsString(bankTransaction);
    }
}