Skip to content

Commit

Permalink
Revert "Add metrics to measure latency of k8s informer (#1200)"
Browse files Browse the repository at this point in the history
This reverts commit 6960d77.
  • Loading branch information
marctc committed Oct 1, 2024
1 parent 91f8aeb commit ee08741
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 95 deletions.
2 changes: 1 addition & 1 deletion pkg/internal/appolly/appolly.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func setupKubernetes(ctx context.Context, ctxInfo *global.ContextInfo) {
return
}

if ctxInfo.AppO11y.K8sDatabase, err = kube.StartDatabase(informer, ctxInfo.Metrics); err != nil {
if ctxInfo.AppO11y.K8sDatabase, err = kube.StartDatabase(informer); err != nil {
slog.Error("can't setup Kubernetes database. Your traces won't be decorated with Kubernetes metadata",
"error", err)
ctxInfo.K8sInformer.ForceDisable()
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (pf *ProcessFinder) Start() (<-chan *ebpf.Instrumentable, <-chan *ebpf.Inst
gb := pipe.NewBuilder(&nodesMap{}, pipe.ChannelBufferLen(pf.cfg.ChannelBufferLen))
pipe.AddStart(gb, processWatcher, ProcessWatcherFunc(pf.ctx, pf.cfg))
pipe.AddMiddleProvider(gb, ptrWatcherKubeEnricher,
WatcherKubeEnricherProvider(pf.ctx, pf.ctxInfo.K8sInformer, pf.ctxInfo.Metrics))
WatcherKubeEnricherProvider(pf.ctx, pf.ctxInfo.K8sInformer))
pipe.AddMiddleProvider(gb, criteriaMatcher, CriteriaMatcherProvider(pf.cfg))
pipe.AddMiddleProvider(gb, execTyper, ExecTyperProvider(pf.cfg, pf.ctxInfo.Metrics))
pipe.AddMiddleProvider(gb, containerDBUpdater,
Expand Down
13 changes: 2 additions & 11 deletions pkg/internal/discover/watcher_kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/mariomac/pipes/pipe"
"k8s.io/client-go/tools/cache"

attr "github.com/grafana/beyla/pkg/export/attributes/names"
"github.com/grafana/beyla/pkg/internal/helpers/container"
"github.com/grafana/beyla/pkg/internal/helpers/maps"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/services"
)
Expand All @@ -34,7 +32,7 @@ type watcherKubeEnricher struct {
informer kubeMetadata

log *slog.Logger
m imetrics.Reporter

// cached system objects
containerByPID map[PID]container.Info
processByContainer map[string]processAttrs
Expand Down Expand Up @@ -62,7 +60,6 @@ type kubeMetadataProvider interface {
func WatcherKubeEnricherProvider(
ctx context.Context,
informerProvider kubeMetadataProvider,
m imetrics.Reporter,
) pipe.MiddleProvider[[]Event[processAttrs], []Event[processAttrs]] {
return func() (pipe.MiddleFunc[[]Event[processAttrs], []Event[processAttrs]], error) {
if !informerProvider.IsKubeEnabled() {
Expand All @@ -72,7 +69,7 @@ func WatcherKubeEnricherProvider(
if err != nil {
return nil, fmt.Errorf("instantiating WatcherKubeEnricher: %w", err)
}
wk := watcherKubeEnricher{informer: informer, m: m}
wk := watcherKubeEnricher{informer: informer}
if err := wk.init(); err != nil {
return nil, err
}
Expand All @@ -91,16 +88,10 @@ func (wk *watcherKubeEnricher) init() error {
wk.podsInfoCh = make(chan Event[*kube.PodInfo], 10)
if err := wk.informer.AddPodEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*kube.PodInfo)
d := time.Since(pod.CreationTimestamp.Time)
wk.podsInfoCh <- Event[*kube.PodInfo]{Type: EventCreated, Obj: obj.(*kube.PodInfo)}
wk.m.InformerAddDuration("pod", d)
},
UpdateFunc: func(_, newObj interface{}) {
pod := newObj.(*kube.PodInfo)
d := time.Since(pod.CreationTimestamp.Time)
wk.podsInfoCh <- Event[*kube.PodInfo]{Type: EventCreated, Obj: newObj.(*kube.PodInfo)}
wk.m.InformerUpdateDuration("pod", d)
},
DeleteFunc: func(obj interface{}) {
wk.podsInfoCh <- Event[*kube.PodInfo]{Type: EventDeleted, Obj: obj.(*kube.PodInfo)}
Expand Down
12 changes: 2 additions & 10 deletions pkg/internal/discover/watcher_kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/helpers/container"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/testutil"
"github.com/grafana/beyla/pkg/services"
Expand Down Expand Up @@ -74,7 +73,7 @@ func TestWatcherKubeEnricher(t *testing.T) {
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute))
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer}, fakeInternalMetrics{})()
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer})()
require.NoError(t, err)
inputCh, outputCh := make(chan []Event[processAttrs], 10), make(chan []Event[processAttrs], 10)
defer close(inputCh)
Expand Down Expand Up @@ -120,7 +119,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) {
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute))
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer}, fakeInternalMetrics{})()
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer})()
require.NoError(t, err)
pipeConfig := beyla.Config{}
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
Expand Down Expand Up @@ -310,13 +309,6 @@ func fakeProcessInfo(pp processAttrs) (*services.ProcessInfo, error) {
}, nil
}

type fakeInternalMetrics struct {
imetrics.NoopReporter
}

func (fakeInternalMetrics) InformerAddDuration(_ string, _ time.Duration) {}
func (fakeInternalMetrics) InformerUpdateDuration(_ string, _ time.Duration) {}

type informerProvider struct {
informer *kube.Metadata
}
Expand Down
25 changes: 9 additions & 16 deletions pkg/internal/imetrics/imetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package imetrics

import (
"context"
"time"
)

// Config options for the different metrics exporters
Expand Down Expand Up @@ -33,23 +32,17 @@ type Reporter interface {
InstrumentProcess(processName string)
// UninstrumentProcess is invoked every time a process is removed from the instrumented processed
UninstrumentProcess(processName string)
// InformerAddDuration is invoked every time a kubernetes object is added to the informer
InformerAddDuration(kind string, d time.Duration)
// InformerUpdateDuration is invoked every time a kubernetes object is updated in the informer
InformerUpdateDuration(kind string, d time.Duration)
}

// NoopReporter is a metrics Reporter that just does nothing
type NoopReporter struct{}

func (n NoopReporter) Start(_ context.Context) {}
func (n NoopReporter) TracerFlush(_ int) {}
func (n NoopReporter) OTELMetricExport(_ int) {}
func (n NoopReporter) OTELMetricExportError(_ error) {}
func (n NoopReporter) OTELTraceExport(_ int) {}
func (n NoopReporter) OTELTraceExportError(_ error) {}
func (n NoopReporter) PrometheusRequest(_, _ string) {}
func (n NoopReporter) InstrumentProcess(_ string) {}
func (n NoopReporter) UninstrumentProcess(_ string) {}
func (n NoopReporter) InformerAddDuration(_ string, _ time.Duration) {}
func (n NoopReporter) InformerUpdateDuration(_ string, _ time.Duration) {}
func (n NoopReporter) Start(_ context.Context) {}
func (n NoopReporter) TracerFlush(_ int) {}
func (n NoopReporter) OTELMetricExport(_ int) {}
func (n NoopReporter) OTELMetricExportError(_ error) {}
func (n NoopReporter) OTELTraceExport(_ int) {}
func (n NoopReporter) OTELTraceExportError(_ error) {}
func (n NoopReporter) PrometheusRequest(_, _ string) {}
func (n NoopReporter) InstrumentProcess(_ string) {}
func (n NoopReporter) UninstrumentProcess(_ string) {}
52 changes: 11 additions & 41 deletions pkg/internal/imetrics/iprom.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@ type PrometheusConfig struct {

// PrometheusReporter is an internal metrics Reporter that exports to Prometheus
type PrometheusReporter struct {
connector *connector.PrometheusManager
tracerFlushes prometheus.Histogram
otelMetricExports prometheus.Counter
otelMetricExportErrs *prometheus.CounterVec
otelTraceExports prometheus.Counter
otelTraceExportErrs *prometheus.CounterVec
prometheusRequests *prometheus.CounterVec
instrumentedProcesses *prometheus.GaugeVec
beylaInfo prometheus.Gauge
informerAddDuration *prometheus.HistogramVec
informerUpdateDuration *prometheus.HistogramVec
connector *connector.PrometheusManager
tracerFlushes prometheus.Histogram
otelMetricExports prometheus.Counter
otelMetricExportErrs *prometheus.CounterVec
otelTraceExports prometheus.Counter
otelTraceExportErrs *prometheus.CounterVec
prometheusRequests *prometheus.CounterVec
instrumentedProcesses *prometheus.GaugeVec
beylaInfo prometheus.Gauge
}

func NewPrometheusReporter(cfg *PrometheusConfig, manager *connector.PrometheusManager, registry *prometheus.Registry) *PrometheusReporter {
Expand Down Expand Up @@ -83,22 +81,6 @@ func NewPrometheusReporter(cfg *PrometheusConfig, manager *connector.PrometheusM
"revision": buildinfo.Revision,
},
}),
informerAddDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "beyla_k8s_informer_add_duration_seconds",
Help: "Duration of the object add event in the Kubernetes informer",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"kind"}),
informerUpdateDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "beyla_k8s_informer_update_duration_seconds",
Help: "Duration of the object update event in the Kubernetes informer",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"kind"}),
}
if registry != nil {
registry.MustRegister(pr.tracerFlushes,
Expand All @@ -108,9 +90,7 @@ func NewPrometheusReporter(cfg *PrometheusConfig, manager *connector.PrometheusM
pr.otelTraceExportErrs,
pr.prometheusRequests,
pr.instrumentedProcesses,
pr.beylaInfo,
pr.informerAddDuration,
pr.informerUpdateDuration)
pr.beylaInfo)
} else {
manager.Register(cfg.Port, cfg.Path,
pr.tracerFlushes,
Expand All @@ -120,9 +100,7 @@ func NewPrometheusReporter(cfg *PrometheusConfig, manager *connector.PrometheusM
pr.otelTraceExportErrs,
pr.prometheusRequests,
pr.instrumentedProcesses,
pr.beylaInfo,
pr.informerAddDuration,
pr.informerUpdateDuration)
pr.beylaInfo)
}

return pr
Expand Down Expand Up @@ -166,11 +144,3 @@ func (p *PrometheusReporter) InstrumentProcess(processName string) {
func (p *PrometheusReporter) UninstrumentProcess(processName string) {
p.instrumentedProcesses.WithLabelValues(processName).Dec()
}

func (p *PrometheusReporter) InformerAddDuration(kind string, d time.Duration) {
p.informerAddDuration.WithLabelValues(kind).Observe(d.Seconds())
}

func (p *PrometheusReporter) InformerUpdateDuration(kind string, d time.Duration) {
p.informerUpdateDuration.WithLabelValues(kind).Observe(d.Seconds())
}
16 changes: 1 addition & 15 deletions pkg/internal/transform/kube/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"fmt"
"log/slog"
"sync"
"time"

"k8s.io/client-go/tools/cache"

"github.com/grafana/beyla/pkg/internal/helpers/container"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/kube"
)

Expand Down Expand Up @@ -56,7 +54,7 @@ func CreateDatabase(kubeMetadata *kube.Metadata) Database {
}
}

func StartDatabase(kubeMetadata *kube.Metadata, m imetrics.Reporter) (*Database, error) {
func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) {
db := CreateDatabase(kubeMetadata)
db.informer.AddContainerEventHandler(&db)

Expand All @@ -75,17 +73,11 @@ func StartDatabase(kubeMetadata *kube.Metadata, m imetrics.Reporter) (*Database,
}
if err := db.informer.AddServiceIPEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
svc := obj.(*kube.ServiceInfo)
d := time.Since(svc.CreationTimestamp.Time)
db.UpdateNewServicesByIPIndex(obj.(*kube.ServiceInfo))
m.InformerAddDuration("service", d)
},
UpdateFunc: func(oldObj, newObj interface{}) {
svc := newObj.(*kube.ServiceInfo)
d := time.Since(svc.CreationTimestamp.Time)
db.UpdateDeletedServicesByIPIndex(oldObj.(*kube.ServiceInfo))
db.UpdateNewServicesByIPIndex(newObj.(*kube.ServiceInfo))
m.InformerUpdateDuration("service", d)
},
DeleteFunc: func(obj interface{}) {
db.UpdateDeletedServicesByIPIndex(obj.(*kube.ServiceInfo))
Expand All @@ -95,17 +87,11 @@ func StartDatabase(kubeMetadata *kube.Metadata, m imetrics.Reporter) (*Database,
}
if err := db.informer.AddNodeEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
n := obj.(*kube.NodeInfo)
d := time.Since(n.CreationTimestamp.Time)
db.UpdateNewNodesByIPIndex(obj.(*kube.NodeInfo))
m.InformerAddDuration("node", d)
},
UpdateFunc: func(oldObj, newObj interface{}) {
n := newObj.(*kube.NodeInfo)
d := time.Since(n.CreationTimestamp.Time)
db.UpdateDeletedNodesByIPIndex(oldObj.(*kube.NodeInfo))
db.UpdateNewNodesByIPIndex(newObj.(*kube.NodeInfo))
m.InformerUpdateDuration("node", d)
},
DeleteFunc: func(obj interface{}) {
db.UpdateDeletedNodesByIPIndex(obj.(*kube.NodeInfo))
Expand Down

0 comments on commit ee08741

Please sign in to comment.