This repository has been archived by the owner on Oct 29, 2024. It is now read-only.
forked from redpanda-data/kminion
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path main.go
89 lines (77 loc) · 2.82 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package main
import (
"github.com/google-cloud-tools/kafka-minion/collector"
"github.com/google-cloud-tools/kafka-minion/kafka"
"github.com/google-cloud-tools/kafka-minion/options"
"github.com/google-cloud-tools/kafka-minion/storage"
"github.com/kelseyhightower/envconfig"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"net"
"net/http"
"os"
"strconv"
)
// main boots kafka minion: it configures logging, parses options from
// environment variables, wires the storage, cluster and offset-consumer
// modules together via shared channels, registers the Prometheus
// collector, and serves the /metrics, /healthcheck and /readycheck
// HTTP endpoints until the process exits.
func main() {
	// Initialize logger (JSON to stdout so log collectors can parse it)
	log.SetOutput(os.Stdout)
	log.SetFormatter(&log.JSONFormatter{})

	// Parse and validate environment variables
	opts := options.NewOptions()
	if err := envconfig.Process("", opts); err != nil {
		log.Fatal("Error parsing env vars into opts. ", err)
	}

	// Set log level from environment variables
	level, err := log.ParseLevel(opts.LogLevel)
	if err != nil {
		log.Panicf("Loglevel could not be parsed. See logrus documentation for valid log level inputs. Given input was '%v'", opts.LogLevel)
	}
	log.SetLevel(level)
	log.Infof("Starting kafka minion version %v", opts.Version)

	// Create cross package shared dependencies: both channels feed
	// storage requests into the in-memory storage module below.
	consumerOffsetsCh := make(chan *kafka.StorageRequest, 1000)
	clusterCh := make(chan *kafka.StorageRequest, 200)

	// Create storage module
	cache := storage.NewMemoryStorage(opts, consumerOffsetsCh, clusterCh)
	cache.Start()

	// Create cluster module
	cluster := kafka.NewCluster(opts, clusterCh)
	cluster.Start()

	// Create kafka consumer
	consumer := kafka.NewOffsetConsumer(opts, consumerOffsetsCh)
	consumer.Start()

	// Create prometheus collector
	collector := collector.NewCollector(opts, cache)
	prometheus.MustRegister(collector)

	// Start listening on /metrics endpoint
	http.Handle("/metrics", promhttp.Handler())
	http.Handle("/healthcheck", healthCheck(cluster))
	http.Handle("/readycheck", readyCheck(cache))
	listenAddress := net.JoinHostPort(opts.TelemetryHost, strconv.Itoa(opts.TelemetryPort))
	// Fixed: the format string was missing its closing quote ("'%s").
	log.Infof("Listening on: '%s'", listenAddress)
	log.Fatal(http.ListenAndServe(listenAddress, nil))
}
// healthCheck builds the handler for the /healthcheck endpoint. It
// answers 200 with the body "Healthy" while cluster.IsHealthy() reports
// true, and 503 Service Unavailable otherwise.
func healthCheck(cluster *kafka.Cluster) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Guard clause: report failure first, happy path below.
		if !cluster.IsHealthy() {
			http.Error(w, "Healthcheck failed", http.StatusServiceUnavailable)
			return
		}
		w.Write([]byte("Healthy"))
	}
}
// readyCheck builds the handler for the /readycheck endpoint. It only
// returns 200 (body "Ready") once the storage reports that the
// __consumer_offsets topic has been initially consumed; until then it
// answers 503. Utilizing this ready check you can slow down rolling
// updates until a pod is ready to expose up-to-date consumer group
// metrics.
func readyCheck(storage *storage.MemoryStorage) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Guard clause: not ready yet -> 503 and bail out.
		if !storage.IsConsumed() {
			http.Error(w, "Offsets topic has not been consumed yet", http.StatusServiceUnavailable)
			return
		}
		w.Write([]byte("Ready"))
	}
}