K8s discovery (#511)
* Basic K8s watcher in discovery pipeline

* WIP fake tests

* added tests with official k8s fake client

* creation tests

* custom YAML parsing for criteria

* starting watcher->matcher integrationtest

* watcher-matcher integration test

* Added some comments and cleanup

* Fixed race condition and improved readability

* integrated common k8s metadata provider

* updated vendor

* starting integration tests

* working version

* fix imports
mariomac authored Dec 20, 2023
1 parent d9269ab commit 69bcc58
Showing 281 changed files with 28,332 additions and 308 deletions.
29 changes: 12 additions & 17 deletions devdocs/pipeline-map.md
@@ -10,35 +10,30 @@ Check the in-code documentation for more information about each symbol.

```mermaid
flowchart TD
classDef optional stroke-dasharray: 3 3;
subgraph discovery.Finder pipeline
W(Watcher) --> |new/removed processes| CM(CriteriaMatcher)
CM --> |processes matching the selection criteria| ET(ExecTyper)
PW(ProcessWatcher) --> |new/removed processes| KWE
KWE(WatcherKubeEnricher):::optional --> |process enriched with k8s metadata| CM
CM(CriteriaMatcher) --> |processes matching the selection criteria| ET(ExecTyper)
ET --> |ELFs and its metadata| CU
CU(ContainerDBUpdater) --> |ELFs and its metadata| TA
CU(ContainerDBUpdater):::optional --> |ELFs and its metadata| TA
TA(TraceAttacher) -.-> EBPF1(ebpf.Tracer)
TA -.-> |creates one per executable| EBPF2(ebpf.Tracer)
TA -.-> EBPF3(ebpf.Tracer)
style CU stroke-dasharray: 3 3;
end
subgraph Decoration and forwarding pipeline
EBPF1 -.-> TR
EBPF2 -.-> |"[]request.Span"| TR
EBPF3 -.-> TR
TR(traces.ReadDecorator) --> ROUT(Routes<br/>decorator)
ROUT --> KD(Kubernetes<br/>decorator)
ROUT:::optional --> KD(Kubernetes<br/>decorator)
KD --> OTELM(OTEL<br/> metrics<br/> exporter)
KD --> OTELT(OTEL<br/> traces<br/> exporter)
KD --> PROM(Prometheus<br/>HTTP<br/>endpoint)
style ROUT stroke-dasharray: 3 3;
style KD stroke-dasharray: 3 3;
style OTELM stroke-dasharray: 3 3;
style OTELT stroke-dasharray: 3 3;
style PROM stroke-dasharray: 3 3;
KD:::optional --> OTELM(OTEL<br/> metrics<br/> exporter):::optional
KD --> OTELT(OTEL<br/> traces<br/> exporter):::optional
KD --> PROM(Prometheus<br/>HTTP<br/>endpoint):::optional
end
CU -.-> |New PIDs| KDB
KDB(KubeDatabase) <-.- | Aggregated & indexed Pod info | KD
IF(Informer<br/>&lpar;Kube API&rpar;) -.-> |Pods & ReplicaSets status| KDB
style IF stroke-dasharray: 3 3;
style KDB stroke-dasharray: 3 3;
KDB(KubeDatabase):::optional <-.- | Aggregated & indexed Pod info | KD
IF("Informer<br/>(Kube API)"):::optional -.-> |Pods & ReplicaSets status| KDB
IF -.-> |new Kube objects| KWE
```
16 changes: 8 additions & 8 deletions docs/sources/configure/options.md
@@ -199,9 +199,9 @@ Example of YAML file allowing the selection of multiple groups of services:
```yaml
discovery:
services:
- exe_path_regexp: (worker)|(backend)|(frontend)
- exe_path: (worker)|(backend)|(frontend)
namespace: MyApplication
- exe_path_regexp: loadgen
- exe_path: loadgen
namespace: testing
name: "TestLoadGenerator"
```
@@ -234,16 +234,16 @@ open_port: 80,443,8000-8999
```
Would make Beyla select any executable that opens port 80, port 443, or any port between 8000 and 8999, both included.

If the `exe_path_regexp` property is set, the executables to be selected need to match both properties.
If the `exe_path` property is set, the executables to be selected need to match both properties.

If an executable opens multiple ports, only one of the ports needs to be specified
for Beyla **to instrument all the
HTTP/S and GRPC requests on all application ports**. At the moment, there is no way to
restrict the instrumentation only to the methods exposed through a specific port.
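
As an illustration of the `open_port` format described above, here is a minimal Go sketch that parses a comma-separated list of ports and inclusive ranges and checks whether a given port is selected. The names `portRange`, `parsePorts` and `matchesAny` are hypothetical and are not taken from the Beyla code base; the actual parser may differ.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// portRange is an inclusive range of ports. A single port has Start == End.
// Hypothetical type, used only for this illustration.
type portRange struct {
	Start, End int
}

// parsePorts parses a comma-separated list such as "80,443,8000-8999".
func parsePorts(spec string) ([]portRange, error) {
	var ranges []portRange
	for _, item := range strings.Split(spec, ",") {
		item = strings.TrimSpace(item)
		startStr, endStr, isRange := strings.Cut(item, "-")
		start, err := strconv.Atoi(startStr)
		if err != nil {
			return nil, fmt.Errorf("invalid port %q: %w", item, err)
		}
		end := start
		if isRange {
			if end, err = strconv.Atoi(endStr); err != nil {
				return nil, fmt.Errorf("invalid port range %q: %w", item, err)
			}
		}
		if start > end {
			return nil, fmt.Errorf("invalid port range %q: start is greater than end", item)
		}
		ranges = append(ranges, portRange{Start: start, End: end})
	}
	return ranges, nil
}

// matchesAny reports whether port falls inside any of the ranges.
func matchesAny(ranges []portRange, port int) bool {
	for _, r := range ranges {
		if port >= r.Start && port <= r.End {
			return true
		}
	}
	return false
}

func main() {
	ranges, err := parsePorts("80,443,8000-8999")
	if err != nil {
		panic(err)
	}
	for _, p := range []int{80, 8080, 9000} {
		fmt.Println(p, matchesAny(ranges, p))
	}
}
```

With the example value, ports 80 and 8080 are selected and port 9000 is not.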

| YAML | Env var | Type | Default |
|-------------------|---------|--------|---------|
| `exe_path_regexp` | -- | string | (unset) |
| YAML | Env var | Type | Default |
|------------|---------|--------|---------|
| `exe_path` | -- | string | (unset) |

Selects the processes to instrument by their executable name path. This property accepts
a regular expression to be matched against the full executable command line, including the directory
@@ -252,15 +252,15 @@ where the executable resides on the file system.
If the `open_port` property is set, the executables to be selected need to match both properties.

Beyla will try to instrument all the processes with an executable path matching this property.
For example, setting `exe_path_regexp: .*` will make Beyla try to instrument all the
For example, setting `exe_path: .*` will make Beyla try to instrument all the
executables in the host.

| YAML | Env var | Type | Default |
|--------|---------|--------|------------------------------------------|
| `name` | -- | string | Name of the instrumented executable file |

Defines a name for the instrumented service. If unset, it will take the name of the executable process.
If set, and multiple processes match the above `open_port` or `exe_path_regexp` selectors,
If set, and multiple processes match the above `open_port` or `exe_path` selectors,
the metrics and traces for all the instances will share the same service name.

| YAML | Env var | Type | Default |
5 changes: 3 additions & 2 deletions go.mod
@@ -11,7 +11,7 @@ require (
github.com/go-logr/logr v1.2.4
github.com/goccy/go-json v0.10.2
github.com/gorilla/mux v1.8.0
github.com/grafana/go-offsets-tracker v0.1.6
github.com/grafana/go-offsets-tracker v0.1.5
github.com/hashicorp/golang-lru/v2 v2.0.2
github.com/mariomac/guara v0.0.0-20230621100729-42bd7716e524
github.com/mariomac/pipes v0.9.0
@@ -35,7 +35,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.41.0
go.opentelemetry.io/otel/trace v1.19.0
golang.org/x/arch v0.3.0
golang.org/x/mod v0.14.0
golang.org/x/mod v0.8.0
golang.org/x/net v0.12.0
golang.org/x/sys v0.12.0
google.golang.org/grpc v1.58.3
@@ -55,6 +55,7 @@ require (
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
14 changes: 8 additions & 6 deletions go.sum
@@ -31,6 +31,8 @@ github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhF
github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA=
@@ -112,8 +114,8 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/go-offsets-tracker v0.1.6 h1:zCFkH2CslFfxkmH3dBYSFDKQhKbgXI9/T6l1jpuKr3I=
github.com/grafana/go-offsets-tracker v0.1.6/go.mod h1:qcQdu7zlUKIFNUdBJlLyNHuJGW0SKWKjkrN6jtt+jds=
github.com/grafana/go-offsets-tracker v0.1.5 h1:abTdxqjjWYlAYxJSBDgoI4DvgmHJjERRD7eUbrDQ7e8=
github.com/grafana/go-offsets-tracker v0.1.5/go.mod h1:kMOoQRWLsRz5tecLQXB3skxgEqexj6bWwySL9nZSohw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
@@ -268,8 +270,8 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -317,8 +319,8 @@ golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBn
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4=
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
58 changes: 45 additions & 13 deletions pkg/beyla/beyla.go
@@ -8,12 +8,16 @@ import (
"io"
"log/slog"

"k8s.io/client-go/kubernetes"

"github.com/grafana/beyla/pkg/internal/connector"
"github.com/grafana/beyla/pkg/internal/discover"
"github.com/grafana/beyla/pkg/internal/imetrics"
kube2 "github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/transform"
"github.com/grafana/beyla/pkg/internal/transform/kube"
)

@@ -133,20 +137,12 @@ func buildContextInfo(config *pipe.Config) *global.ContextInfo {
promMgr := &connector.PrometheusManager{}
k8sCfg := &config.Attributes.Kubernetes
ctxInfo := &global.ContextInfo{
ReportRoutes: config.Routes != nil,
Prometheus: promMgr,
K8sDecoration: k8sCfg.Enabled(),
ReportRoutes: config.Routes != nil,
Prometheus: promMgr,
K8sEnabled: k8sCfg.Enabled(),
}
if ctxInfo.K8sDecoration {
// Creating a common Kubernetes database that needs to be accessed from different points
// in the Beyla pipeline
var err error
if ctxInfo.K8sDatabase, err = kube.StartDatabase(k8sCfg.KubeconfigPath, k8sCfg.InformersSyncTimeout); err != nil {
slog.Error("can't setup Kubernetes database. Your traces won't be decorated with Kubernetes metadata",
"error", err)
ctxInfo.K8sDecoration = false
}

if ctxInfo.K8sEnabled {
setupKubernetes(k8sCfg, ctxInfo)
}
if config.InternalMetrics.Prometheus.Port != 0 {
slog.Debug("reporting internal metrics as Prometheus")
@@ -160,3 +156,39 @@ func buildContextInfo(config *pipe.Config) *global.ContextInfo {
}
return ctxInfo
}

// setupKubernetes sets up common Kubernetes database and API clients that need to be accessed
// from different stages in the Beyla pipeline
func setupKubernetes(k8sCfg *transform.KubernetesDecorator, ctxInfo *global.ContextInfo) {

config, err := kube2.LoadConfig(k8sCfg.KubeconfigPath)
if err != nil {
slog.Error("can't read kubernetes config. You can't setup Kubernetes discovery and your"+
" traces won't be decorated with Kubernetes metadata", "error", err)
ctxInfo.K8sEnabled = false
return
}

kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
slog.Error("can't init Kubernetes client. You can't setup Kubernetes discovery and your"+
" traces won't be decorated with Kubernetes metadata", "error", err)
ctxInfo.K8sEnabled = false
return
}

ctxInfo.K8sInformer = &kube2.Metadata{}
if err := ctxInfo.K8sInformer.InitFromClient(kubeClient, k8sCfg.InformersSyncTimeout); err != nil {
slog.Error("can't init Kubernetes informer. You can't setup Kubernetes discovery and your"+
" traces won't be decorated with Kubernetes metadata", "error", err)
ctxInfo.K8sInformer = nil
ctxInfo.K8sEnabled = false
return
}

if ctxInfo.K8sDatabase, err = kube.StartDatabase(ctxInfo.K8sInformer); err != nil {
slog.Error("can't setup Kubernetes database. Your traces won't be decorated with Kubernetes metadata",
"error", err)
ctxInfo.K8sEnabled = false
}
}
35 changes: 20 additions & 15 deletions pkg/internal/discover/finder.go
@@ -20,25 +20,24 @@ import (
)

// ProcessFinder pipeline architecture. It uses the Pipes library to instantiate and connect all the nodes.
// Nodes tagged as "forwardTo" are optional nodes that might not be instantiated. In that case, any
// information directed to them will be automatically forwarded to the next pipeline stage.
// For example WatcherKubeEnricher and ContainerDBUpdater will be only enabled
// (non-nil values) if Kubernetes decoration is enabled
type ProcessFinder struct {
Watcher `sendTo:"CriteriaMatcher"`
CriteriaMatcher `sendTo:"ExecTyper"`
ExecTyper `sendTo:"ContainerDBUpdater"`
// ContainerDBUpdater will be only enabled (non-nil value) if Kubernetes decoration is enabled
*ContainerDBUpdater `forwardTo:"TraceAttacher"`
ProcessWatcher `sendTo:"WatcherKubeEnricher"`
*WatcherKubeEnricher `forwardTo:"CriteriaMatcher"`
CriteriaMatcher `sendTo:"ExecTyper"`
ExecTyper `sendTo:"ContainerDBUpdater"`
*ContainerDBUpdater `forwardTo:"TraceAttacher"`
TraceAttacher
}

func NewProcessFinder(ctx context.Context, cfg *pipe.Config, ctxInfo *global.ContextInfo) *ProcessFinder {
var cntDB *ContainerDBUpdater
if ctxInfo.K8sDecoration {
cntDB = &ContainerDBUpdater{DB: ctxInfo.K8sDatabase}
}
return &ProcessFinder{
Watcher: Watcher{Ctx: ctx, Cfg: cfg},
CriteriaMatcher: CriteriaMatcher{Cfg: cfg},
ExecTyper: ExecTyper{Cfg: cfg, Metrics: ctxInfo.Metrics},
ContainerDBUpdater: cntDB,
processFinder := ProcessFinder{
ProcessWatcher: ProcessWatcher{Ctx: ctx, Cfg: cfg},
CriteriaMatcher: CriteriaMatcher{Cfg: cfg},
ExecTyper: ExecTyper{Cfg: cfg, Metrics: ctxInfo.Metrics},
TraceAttacher: TraceAttacher{
Cfg: cfg,
Ctx: ctx,
@@ -47,13 +46,19 @@ func NewProcessFinder(ctx context.Context, cfg *pipe.Config, ctxInfo *global.Con
Metrics: ctxInfo.Metrics,
},
}
if ctxInfo.K8sEnabled {
processFinder.ContainerDBUpdater = &ContainerDBUpdater{DB: ctxInfo.K8sDatabase}
processFinder.WatcherKubeEnricher = &WatcherKubeEnricher{Informer: ctxInfo.K8sInformer}
}
return &processFinder
}

// Start the ProcessFinder pipeline in background. It returns a channel where each new discovered
// ebpf.ProcessTracer will be notified.
func (pf *ProcessFinder) Start(cfg *pipe.Config) (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) {
gb := graph.NewBuilder(node.ChannelBufferLen(cfg.ChannelBufferLen))
graph.RegisterStart(gb, WatcherProvider)
graph.RegisterStart(gb, ProcessWatcherProvider)
graph.RegisterMiddle(gb, WatcherKubeEnricherProvider)
graph.RegisterMiddle(gb, CriteriaMatcherProvider)
graph.RegisterMiddle(gb, ExecTyperProvider)
graph.RegisterMiddle(gb, ContainerDBUpdaterProvider)
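
The `forwardTo` behaviour described in the `ProcessFinder` comment (an optional node that is not instantiated is bypassed, and its input goes to the next stage) can be pictured with plain Go channels. This is only a sketch of the idea and does not use or reproduce the Pipes library API; `stage`, `connect`, `label` and `run` are hypothetical names.

```go
package main

import "fmt"

// stage consumes an input channel and returns its output channel.
// A nil stage stands for an optional node that was not instantiated.
type stage func(in <-chan string) <-chan string

// connect chains the stages in order. Nil stages are bypassed, so the
// output of the previous stage reaches the next instantiated stage.
func connect(src <-chan string, stages ...stage) <-chan string {
	out := src
	for _, s := range stages {
		if s != nil {
			out = s(out)
		}
	}
	return out
}

// label returns a stage that appends its name to every item it receives.
func label(name string) stage {
	return func(in <-chan string) <-chan string {
		out := make(chan string)
		go func() {
			defer close(out)
			for item := range in {
				out <- item + " > " + name
			}
		}()
		return out
	}
}

// run pushes one item through the pipeline and returns what arrives at the end.
func run(k8sEnabled bool) string {
	var enricher stage // stays nil unless Kubernetes is enabled
	if k8sEnabled {
		enricher = label("enricher")
	}
	src := make(chan string, 1)
	src <- "pid-1"
	close(src)
	return <-connect(src, enricher, label("matcher"))
}

func main() {
	fmt.Println(run(false)) // pid-1 > matcher
	fmt.Println(run(true))  // pid-1 > enricher > matcher
}
```

In the commit, the same effect is obtained by leaving `WatcherKubeEnricher` and `ContainerDBUpdater` as nil pointers when `ctxInfo.K8sEnabled` is false.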