This repository has been archived by the owner on Jun 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka.go
123 lines (105 loc) · 2.82 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main
import (
"encoding/json"
"log"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
type PostHogEventWrapper struct {
Uuid string `json:"uuid"`
DistinctId string `json:"distinct_id"`
Ip string `json:"ip"`
Data string `json:"data"`
}
type PostHogEvent struct {
Token string `json:"api_key,omitempty"`
Event string `json:"event"`
Properties map[string]interface{} `json:"properties"`
Timestamp string `json:"timestamp,omitempty"`
Uuid string
DistinctId string
Lat float64
Lng float64
}
type KafkaConsumer struct {
consumer *kafka.Consumer
topic string
geolocator *GeoLocator
outgoingChan chan PostHogEvent
statsChan chan PostHogEvent
}
func NewKafkaConsumer(brokers string, securityProtocol string, groupID string, topic string, geolocator *GeoLocator, outgoingChan chan PostHogEvent, statsChan chan PostHogEvent) (*KafkaConsumer, error) {
config := &kafka.ConfigMap{
"bootstrap.servers": brokers,
"group.id": groupID,
"auto.offset.reset": "latest",
"enable.auto.commit": false,
"security.protocol": securityProtocol,
}
consumer, err := kafka.NewConsumer(config)
if err != nil {
return nil, err
}
return &KafkaConsumer{
consumer: consumer,
topic: topic,
geolocator: geolocator,
outgoingChan: outgoingChan,
statsChan: statsChan,
}, nil
}
func (c *KafkaConsumer) Consume() {
err := c.consumer.SubscribeTopics([]string{c.topic}, nil)
if err != nil {
log.Fatalf("Failed to subscribe to topic: %v", err)
}
for {
msg, err := c.consumer.ReadMessage(-1)
if err != nil {
log.Printf("Error consuming message: %v", err)
continue
}
var wrapperMessage PostHogEventWrapper
err = json.Unmarshal(msg.Value, &wrapperMessage)
if err != nil {
log.Printf("Error decoding JSON: %v", err)
continue
}
var phEvent PostHogEvent
err = json.Unmarshal([]byte(wrapperMessage.Data), &phEvent)
if err != nil {
log.Printf("Error decoding JSON: %v", err)
continue
}
phEvent.Uuid = wrapperMessage.Uuid
phEvent.DistinctId = wrapperMessage.DistinctId
if phEvent.Timestamp == "" {
phEvent.Timestamp = time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
}
if phEvent.Token == "" {
if tokenValue, ok := phEvent.Properties["token"].(string); ok {
phEvent.Token = tokenValue
}
}
var ipStr string = ""
if ipValue, ok := phEvent.Properties["$ip"]; ok {
if ipProp, ok := ipValue.(string); ok {
if ipProp != "" {
ipStr = ipProp
}
}
} else {
if wrapperMessage.Ip != "" {
ipStr = wrapperMessage.Ip
}
}
if ipStr != "" {
phEvent.Lat, phEvent.Lng = c.geolocator.Lookup(ipStr)
}
c.outgoingChan <- phEvent
c.statsChan <- phEvent
}
}
func (c *KafkaConsumer) Close() {
c.consumer.Close()
}