Skip to content

Commit

Permalink
Add metrics to measure latency of k8s informer (#1200)
Browse files Browse the repository at this point in the history
  • Loading branch information
marctc authored Sep 26, 2024
1 parent f9f1c5e commit 6960d77
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pkg/internal/appolly/appolly.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func setupKubernetes(ctx context.Context, ctxInfo *global.ContextInfo) {
return
}

if ctxInfo.AppO11y.K8sDatabase, err = kube.StartDatabase(informer); err != nil {
if ctxInfo.AppO11y.K8sDatabase, err = kube.StartDatabase(informer, ctxInfo.Metrics); err != nil {
slog.Error("can't setup Kubernetes database. Your traces won't be decorated with Kubernetes metadata",
"error", err)
ctxInfo.K8sInformer.ForceDisable()
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (pf *ProcessFinder) Start() (<-chan *ebpf.Instrumentable, <-chan *ebpf.Inst
gb := pipe.NewBuilder(&nodesMap{}, pipe.ChannelBufferLen(pf.cfg.ChannelBufferLen))
pipe.AddStart(gb, processWatcher, ProcessWatcherFunc(pf.ctx, pf.cfg))
pipe.AddMiddleProvider(gb, ptrWatcherKubeEnricher,
WatcherKubeEnricherProvider(pf.ctx, pf.ctxInfo.K8sInformer))
WatcherKubeEnricherProvider(pf.ctx, pf.ctxInfo.K8sInformer, pf.ctxInfo.Metrics))
pipe.AddMiddleProvider(gb, criteriaMatcher, CriteriaMatcherProvider(pf.cfg))
pipe.AddMiddleProvider(gb, execTyper, ExecTyperProvider(pf.cfg, pf.ctxInfo.Metrics))
pipe.AddMiddleProvider(gb, containerDBUpdater,
Expand Down
19 changes: 17 additions & 2 deletions pkg/internal/discover/watcher_kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/mariomac/pipes/pipe"
"k8s.io/client-go/tools/cache"

"github.com/grafana/beyla/pkg/internal/helpers/container"
"github.com/grafana/beyla/pkg/internal/helpers/maps"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/services"
)
Expand All @@ -33,7 +35,7 @@ type watcherKubeEnricher struct {
informer kubeMetadata

log *slog.Logger

m imetrics.Reporter
// cached system objects
containerByPID map[PID]container.Info
processByContainer map[string]processAttrs
Expand Down Expand Up @@ -62,6 +64,7 @@ type kubeMetadataProvider interface {
func WatcherKubeEnricherProvider(
ctx context.Context,
informerProvider kubeMetadataProvider,
m imetrics.Reporter,
) pipe.MiddleProvider[[]Event[processAttrs], []Event[processAttrs]] {
return func() (pipe.MiddleFunc[[]Event[processAttrs], []Event[processAttrs]], error) {
if !informerProvider.IsKubeEnabled() {
Expand All @@ -71,7 +74,7 @@ func WatcherKubeEnricherProvider(
if err != nil {
return nil, fmt.Errorf("instantiating WatcherKubeEnricher: %w", err)
}
wk := watcherKubeEnricher{informer: informer}
wk := watcherKubeEnricher{informer: informer, m: m}
if err := wk.init(); err != nil {
return nil, err
}
Expand All @@ -90,10 +93,16 @@ func (wk *watcherKubeEnricher) init() error {
wk.podsInfoCh = make(chan Event[*kube.PodInfo], 10)
if err := wk.informer.AddPodEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*kube.PodInfo)
d := time.Since(pod.CreationTimestamp.Time)
wk.podsInfoCh <- Event[*kube.PodInfo]{Type: EventCreated, Obj: obj.(*kube.PodInfo)}
wk.m.InformerAddDuration("pod", d)
},
UpdateFunc: func(_, newObj interface{}) {
pod := newObj.(*kube.PodInfo)
d := time.Since(pod.CreationTimestamp.Time)
wk.podsInfoCh <- Event[*kube.PodInfo]{Type: EventCreated, Obj: newObj.(*kube.PodInfo)}
wk.m.InformerUpdateDuration("pod", d)
},
DeleteFunc: func(obj interface{}) {
wk.podsInfoCh <- Event[*kube.PodInfo]{Type: EventDeleted, Obj: obj.(*kube.PodInfo)}
Expand All @@ -106,10 +115,16 @@ func (wk *watcherKubeEnricher) init() error {
wk.rsInfoCh = make(chan Event[*kube.ReplicaSetInfo], 10)
if err := wk.informer.AddReplicaSetEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
rs := obj.(*kube.ReplicaSetInfo)
d := time.Since(rs.CreationTimestamp.Time)
wk.rsInfoCh <- Event[*kube.ReplicaSetInfo]{Type: EventCreated, Obj: obj.(*kube.ReplicaSetInfo)}
wk.m.InformerAddDuration("replicaset", d)
},
UpdateFunc: func(_, newObj interface{}) {
rs := newObj.(*kube.ReplicaSetInfo)
d := time.Since(rs.CreationTimestamp.Time)
wk.rsInfoCh <- Event[*kube.ReplicaSetInfo]{Type: EventCreated, Obj: newObj.(*kube.ReplicaSetInfo)}
wk.m.InformerUpdateDuration("replicaset", d)
},
DeleteFunc: func(obj interface{}) {
wk.rsInfoCh <- Event[*kube.ReplicaSetInfo]{Type: EventDeleted, Obj: obj.(*kube.ReplicaSetInfo)}
Expand Down
12 changes: 10 additions & 2 deletions pkg/internal/discover/watcher_kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/helpers/container"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/testutil"
"github.com/grafana/beyla/pkg/services"
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestWatcherKubeEnricher(t *testing.T) {
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute))
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer})()
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer}, fakeInternalMetrics{})()
require.NoError(t, err)
inputCh, outputCh := make(chan []Event[processAttrs], 10), make(chan []Event[processAttrs], 10)
defer close(inputCh)
Expand Down Expand Up @@ -119,7 +120,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) {
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute))
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer})()
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer}, fakeInternalMetrics{})()
require.NoError(t, err)
pipeConfig := beyla.Config{}
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
Expand Down Expand Up @@ -309,6 +310,13 @@ func fakeProcessInfo(pp processAttrs) (*services.ProcessInfo, error) {
}, nil
}

// fakeInternalMetrics is the metrics reporter handed to the components under
// test. It embeds imetrics.NoopReporter, whose no-op InformerAddDuration and
// InformerUpdateDuration methods (like every other Reporter method) are
// promoted to this type, so no explicit overrides are required here.
type fakeInternalMetrics struct {
	imetrics.NoopReporter
}

type informerProvider struct {
informer *kube.Metadata
}
Expand Down
25 changes: 16 additions & 9 deletions pkg/internal/imetrics/imetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package imetrics

import (
"context"
"time"
)

// Config options for the different metrics exporters
Expand Down Expand Up @@ -32,17 +33,23 @@ type Reporter interface {
InstrumentProcess(processName string)
// UninstrumentProcess is invoked every time a process is removed from the instrumented processes
UninstrumentProcess(processName string)
// InformerAddDuration is invoked every time a kubernetes object is added to the informer
InformerAddDuration(kind string, d time.Duration)
// InformerUpdateDuration is invoked every time a kubernetes object is updated in the informer
InformerUpdateDuration(kind string, d time.Duration)
}

// NoopReporter is a metrics Reporter that just does nothing
type NoopReporter struct{}

func (n NoopReporter) Start(_ context.Context) {}
func (n NoopReporter) TracerFlush(_ int) {}
func (n NoopReporter) OTELMetricExport(_ int) {}
func (n NoopReporter) OTELMetricExportError(_ error) {}
func (n NoopReporter) OTELTraceExport(_ int) {}
func (n NoopReporter) OTELTraceExportError(_ error) {}
func (n NoopReporter) PrometheusRequest(_, _ string) {}
func (n NoopReporter) InstrumentProcess(_ string) {}
func (n NoopReporter) UninstrumentProcess(_ string) {}
func (n NoopReporter) Start(_ context.Context) {}
func (n NoopReporter) TracerFlush(_ int) {}
func (n NoopReporter) OTELMetricExport(_ int) {}
func (n NoopReporter) OTELMetricExportError(_ error) {}
func (n NoopReporter) OTELTraceExport(_ int) {}
func (n NoopReporter) OTELTraceExportError(_ error) {}
func (n NoopReporter) PrometheusRequest(_, _ string) {}
func (n NoopReporter) InstrumentProcess(_ string) {}
func (n NoopReporter) UninstrumentProcess(_ string) {}
func (n NoopReporter) InformerAddDuration(_ string, _ time.Duration) {}
func (n NoopReporter) InformerUpdateDuration(_ string, _ time.Duration) {}
52 changes: 41 additions & 11 deletions pkg/internal/imetrics/iprom.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@ type PrometheusConfig struct {

// PrometheusReporter is an internal metrics Reporter that exports to Prometheus
type PrometheusReporter struct {
connector *connector.PrometheusManager
tracerFlushes prometheus.Histogram
otelMetricExports prometheus.Counter
otelMetricExportErrs *prometheus.CounterVec
otelTraceExports prometheus.Counter
otelTraceExportErrs *prometheus.CounterVec
prometheusRequests *prometheus.CounterVec
instrumentedProcesses *prometheus.GaugeVec
beylaInfo prometheus.Gauge
connector *connector.PrometheusManager
tracerFlushes prometheus.Histogram
otelMetricExports prometheus.Counter
otelMetricExportErrs *prometheus.CounterVec
otelTraceExports prometheus.Counter
otelTraceExportErrs *prometheus.CounterVec
prometheusRequests *prometheus.CounterVec
instrumentedProcesses *prometheus.GaugeVec
beylaInfo prometheus.Gauge
informerAddDuration *prometheus.HistogramVec
informerUpdateDuration *prometheus.HistogramVec
}

func NewPrometheusReporter(cfg *PrometheusConfig, manager *connector.PrometheusManager, registry *prometheus.Registry) *PrometheusReporter {
Expand Down Expand Up @@ -81,6 +83,22 @@ func NewPrometheusReporter(cfg *PrometheusConfig, manager *connector.PrometheusM
"revision": buildinfo.Revision,
},
}),
informerAddDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "beyla_k8s_informer_add_duration_seconds",
Help: "Duration of the object add event in the Kubernetes informer",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"kind"}),
informerUpdateDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "beyla_k8s_informer_update_duration_seconds",
Help: "Duration of the object update event in the Kubernetes informer",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"kind"}),
}
if registry != nil {
registry.MustRegister(pr.tracerFlushes,
Expand All @@ -90,7 +108,9 @@ func NewPrometheusReporter(cfg *PrometheusConfig, manager *connector.PrometheusM
pr.otelTraceExportErrs,
pr.prometheusRequests,
pr.instrumentedProcesses,
pr.beylaInfo)
pr.beylaInfo,
pr.informerAddDuration,
pr.informerUpdateDuration)
} else {
manager.Register(cfg.Port, cfg.Path,
pr.tracerFlushes,
Expand All @@ -100,7 +120,9 @@ func NewPrometheusReporter(cfg *PrometheusConfig, manager *connector.PrometheusM
pr.otelTraceExportErrs,
pr.prometheusRequests,
pr.instrumentedProcesses,
pr.beylaInfo)
pr.beylaInfo,
pr.informerAddDuration,
pr.informerUpdateDuration)
}

return pr
Expand Down Expand Up @@ -144,3 +166,11 @@ func (p *PrometheusReporter) InstrumentProcess(processName string) {
// UninstrumentProcess decrements the instrumented-processes gauge for the
// series labelled with processName.
func (p *PrometheusReporter) UninstrumentProcess(processName string) {
	gauge := p.instrumentedProcesses.WithLabelValues(processName)
	gauge.Dec()
}

// InformerAddDuration observes d, expressed in seconds, in the informer
// "add" duration histogram, under the series labelled with kind.
func (p *PrometheusReporter) InformerAddDuration(kind string, d time.Duration) {
	histogram := p.informerAddDuration.WithLabelValues(kind)
	histogram.Observe(d.Seconds())
}

// InformerUpdateDuration observes d, expressed in seconds, in the informer
// "update" duration histogram, under the series labelled with kind.
//
// NOTE(review): the event handlers visible in this change compute d as the
// time elapsed since the object's CreationTimestamp, so for updates the value
// looks like the object's age rather than the latency of the update itself —
// confirm that this is the intended measurement.
func (p *PrometheusReporter) InformerUpdateDuration(kind string, d time.Duration) {
	histogram := p.informerUpdateDuration.WithLabelValues(kind)
	histogram.Observe(d.Seconds())
}
16 changes: 15 additions & 1 deletion pkg/internal/transform/kube/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"fmt"
"log/slog"
"sync"
"time"

"k8s.io/client-go/tools/cache"

"github.com/grafana/beyla/pkg/internal/helpers/container"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/kube"
)

Expand Down Expand Up @@ -54,7 +56,7 @@ func CreateDatabase(kubeMetadata *kube.Metadata) Database {
}
}

func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) {
func StartDatabase(kubeMetadata *kube.Metadata, m imetrics.Reporter) (*Database, error) {
db := CreateDatabase(kubeMetadata)
db.informer.AddContainerEventHandler(&db)

Expand All @@ -73,11 +75,17 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) {
}
if err := db.informer.AddServiceIPEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
svc := obj.(*kube.ServiceInfo)
d := time.Since(svc.CreationTimestamp.Time)
db.UpdateNewServicesByIPIndex(obj.(*kube.ServiceInfo))
m.InformerAddDuration("service", d)
},
UpdateFunc: func(oldObj, newObj interface{}) {
svc := newObj.(*kube.ServiceInfo)
d := time.Since(svc.CreationTimestamp.Time)
db.UpdateDeletedServicesByIPIndex(oldObj.(*kube.ServiceInfo))
db.UpdateNewServicesByIPIndex(newObj.(*kube.ServiceInfo))
m.InformerUpdateDuration("service", d)
},
DeleteFunc: func(obj interface{}) {
db.UpdateDeletedServicesByIPIndex(obj.(*kube.ServiceInfo))
Expand All @@ -87,11 +95,17 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) {
}
if err := db.informer.AddNodeEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
n := obj.(*kube.NodeInfo)
d := time.Since(n.CreationTimestamp.Time)
db.UpdateNewNodesByIPIndex(obj.(*kube.NodeInfo))
m.InformerAddDuration("node", d)
},
UpdateFunc: func(oldObj, newObj interface{}) {
n := newObj.(*kube.NodeInfo)
d := time.Since(n.CreationTimestamp.Time)
db.UpdateDeletedNodesByIPIndex(oldObj.(*kube.NodeInfo))
db.UpdateNewNodesByIPIndex(newObj.(*kube.NodeInfo))
m.InformerUpdateDuration("node", d)
},
DeleteFunc: func(obj interface{}) {
db.UpdateDeletedNodesByIPIndex(obj.(*kube.NodeInfo))
Expand Down

0 comments on commit 6960d77

Please sign in to comment.