Skip to content

Commit

Permalink
Support DaemonSet and StatefulSet in discovery and decoration (1.2 ba…
Browse files Browse the repository at this point in the history
…ckport) (#572)

* Support DaemonSet and StatefulSet in discovery and decoration (#571)

* Integration tests for daemonset and statefulset metadata decoration

* Support replicaset and daemonset decoration and discovery

* Prometheus exporter: add k8s_replicaset_name, k8s_daemonset_name and k8s_statefulset_name attributes
  • Loading branch information
mariomac authored Jan 25, 2024
1 parent 129a6d8 commit 24c2493
Show file tree
Hide file tree
Showing 15 changed files with 524 additions and 88 deletions.
24 changes: 16 additions & 8 deletions pkg/internal/discover/services/criteria.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,26 @@ import (
)

const (
AttrNamespace = "k8s_namespace"
AttrPodName = "k8s_pod_name"
AttrDeploymentName = "k8s_deployment_name"
AttrReplicaSetName = "k8s_replicaset_name"
AttrNamespace = "k8s_namespace"
AttrPodName = "k8s_pod_name"
AttrDeploymentName = "k8s_deployment_name"
AttrReplicaSetName = "k8s_replicaset_name"
AttrDaemonSetName = "k8s_daemonset_name"
AttrStatefulSetName = "k8s_statefulset_name"
// AttrOwnerName would be a generic search criteria that would
// match against deployment, replicaset, daemonset and statefulset names
AttrOwnerName = "k8s_owner_name"
)

// any attribute name not in this set will cause an error during the YAML unmarshalling
var allowedAttributeNames = map[string]struct{}{
AttrNamespace: {},
AttrPodName: {},
AttrDeploymentName: {},
AttrReplicaSetName: {},
AttrNamespace: {},
AttrPodName: {},
AttrDeploymentName: {},
AttrReplicaSetName: {},
AttrDaemonSetName: {},
AttrStatefulSetName: {},
AttrOwnerName: {},
}

// ProcessInfo stores some relevant information about a running process
Expand Down
33 changes: 22 additions & 11 deletions pkg/internal/discover/watcher_kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,10 @@ func (wk *WatcherKubeEnricher) onNewReplicaSet(rsInfo *kube.ReplicaSetInfo) []Ev
for _, pod := range podInfos {
for _, containerID := range pod.ContainerIDs {
if procInfo, ok := wk.processByContainer[containerID]; ok {
pod.ReplicaSetName = rsInfo.Name
pod.DeploymentName = rsInfo.DeploymentName
pod.Owner = &kube.Owner{Type: kube.OwnerReplicaSet, Name: rsInfo.Name}
if rsInfo.DeploymentName != "" {
pod.Owner.Owner = &kube.Owner{Type: kube.OwnerDeployment, Name: rsInfo.DeploymentName}
}
allProcesses = append(allProcesses, Event[processAttrs]{
Type: EventCreated,
Obj: withMetadata(procInfo, pod),
Expand Down Expand Up @@ -280,14 +282,14 @@ func (wk *WatcherKubeEnricher) getReplicaSetPods(namespace, name string) []*kube
}

func (wk *WatcherKubeEnricher) updateNewPodsByOwnerIndex(pod *kube.PodInfo) {
if pod.ReplicaSetName != "" {
wk.podsByOwner.Put(nsName{namespace: pod.Namespace, name: pod.ReplicaSetName}, pod.Name, pod)
if pod.Owner != nil {
wk.podsByOwner.Put(nsName{namespace: pod.Namespace, name: pod.Owner.Name}, pod.Name, pod)
}
}

func (wk *WatcherKubeEnricher) updateDeletedPodsByOwnerIndex(pod *kube.PodInfo) {
if pod.ReplicaSetName != "" {
wk.podsByOwner.Delete(nsName{namespace: pod.Namespace, name: pod.ReplicaSetName}, pod.Name)
if pod.Owner != nil {
wk.podsByOwner.Delete(nsName{namespace: pod.Namespace, name: pod.Owner.Name}, pod.Name)
}
}

Expand All @@ -298,11 +300,20 @@ func withMetadata(pp processAttrs, info *kube.PodInfo) processAttrs {
services.AttrNamespace: info.Namespace,
services.AttrPodName: info.Name,
}
if info.DeploymentName != "" {
ret.metadata[services.AttrDeploymentName] = info.DeploymentName
}
if info.ReplicaSetName != "" {
ret.metadata[services.AttrReplicaSetName] = info.ReplicaSetName
owner := info.Owner
for owner != nil {
ret.metadata[services.AttrOwnerName] = owner.Name
switch owner.Type {
case kube.OwnerDaemonSet:
ret.metadata[services.AttrDaemonSetName] = owner.Name
case kube.OwnerReplicaSet:
ret.metadata[services.AttrReplicaSetName] = owner.Name
case kube.OwnerDeployment:
ret.metadata[services.AttrDeploymentName] = owner.Name
case kube.OwnerStatefulSet:
ret.metadata[services.AttrStatefulSetName] = owner.Name
}
owner = owner.Owner
}
return ret
}
35 changes: 21 additions & 14 deletions pkg/internal/export/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/grafana/beyla/pkg/internal/connector"
"github.com/grafana/beyla/pkg/internal/export/otel"
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/transform"
)

// using labels and names that are equivalent names to the OTEL attributes
Expand Down Expand Up @@ -45,12 +45,15 @@ const (
rpcSystemGRPC = "rpc_system"
DBOperationKey = "db_operation"

k8sNamespaceName = "k8s_namespace_name"
k8sPodName = "k8s_pod_name"
k8sDeploymentName = "k8s_deployment_name"
k8sNodeName = "k8s_node_name"
k8sPodUID = "k8s_pod_uid"
k8sPodStartTime = "k8s_pod_start_time"
k8sNamespaceName = "k8s_namespace_name"
k8sPodName = "k8s_pod_name"
k8sDeploymentName = "k8s_deployment_name"
k8sStatefulSetName = "k8s_statefulset_name"
k8sReplicaSetName = "k8s_replicaset_name"
k8sDaemonSetName = "k8s_daemonset_name"
k8sNodeName = "k8s_node_name"
k8sPodUID = "k8s_pod_uid"
k8sPodStartTime = "k8s_pod_start_time"
)

// TODO: TLS
Expand Down Expand Up @@ -305,19 +308,23 @@ func (r *metricsReporter) labelValuesHTTP(span *request.Span) []string {
}

func appendK8sLabelNames(names []string) []string {
names = append(names, k8sNamespaceName, k8sDeploymentName, k8sPodName, k8sNodeName, k8sPodUID, k8sPodStartTime)
names = append(names, k8sNamespaceName, k8sPodName, k8sNodeName, k8sPodUID, k8sPodStartTime,
k8sDeploymentName, k8sReplicaSetName, k8sStatefulSetName, k8sDaemonSetName)
return names
}

func appendK8sLabelValues(values []string, span *request.Span) []string {
// must follow the order in appendK8sLabelNames
values = append(values,
span.ServiceID.Metadata[transform.NamespaceName],
span.ServiceID.Metadata[transform.DeploymentName],
span.ServiceID.Metadata[transform.PodName],
span.ServiceID.Metadata[transform.NodeName],
span.ServiceID.Metadata[transform.PodUID],
span.ServiceID.Metadata[transform.PodStartTime],
span.ServiceID.Metadata[kube.NamespaceName],
span.ServiceID.Metadata[kube.PodName],
span.ServiceID.Metadata[kube.NodeName],
span.ServiceID.Metadata[kube.PodUID],
span.ServiceID.Metadata[kube.PodStartTime],
span.ServiceID.Metadata[kube.DeploymentName],
span.ServiceID.Metadata[kube.ReplicaSetName],
span.ServiceID.Metadata[kube.StatefulSetName],
span.ServiceID.Metadata[kube.DaemonSetName],
)
return values
}
46 changes: 22 additions & 24 deletions pkg/internal/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,10 @@ type Metadata struct {
type PodInfo struct {
// Informers need that internal object is an ObjectMeta instance
metav1.ObjectMeta
NodeName string
ReplicaSetName string
// Pod Info includes the ReplicaSet as owner reference, and ReplicaSet info
// has Deployment as owner reference. We initially do a two-steps lookup to
// get the Pod's Deployment, but then cache the Deployment value here
DeploymentName string
NodeName string

Owner *Owner

// StartTimeStr caches value of ObjectMeta.StartTimestamp.String()
StartTimeStr string
ContainerIDs []string
Expand All @@ -81,6 +79,11 @@ var podIndexer = cache.Indexers{
},
}

// usually all the data required by the discovery and enrichement is inside
// te v1.Pod object. However, when the Pod object has a ReplicaSet as owner,
// if the ReplicaSet is owned by a Deployment, the reported Pod Owner should
// be the Deployment, as the Replicaset is just an intermediate entity
// used by the Deployment that it's actually defined by the user
var replicaSetIndexer = cache.Indexers{
IndexReplicaSetNames: func(obj interface{}) ([]string, error) {
rs := obj.(*ReplicaSetInfo)
Expand Down Expand Up @@ -131,18 +134,11 @@ func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFacto
rmContainerIDSchema(pod.Status.EphemeralContainerStatuses[i].ContainerID))
}

var replicaSet string
for i := range pod.OwnerReferences {
or := &pod.OwnerReferences[i]
if or.APIVersion == "apps/v1" && or.Kind == "ReplicaSet" {
replicaSet = or.Name
break
}
}
owner := OwnerFromPodInfo(pod)
startTime := pod.GetCreationTimestamp().String()
if log.Enabled(context.TODO(), slog.LevelDebug) {
log.Debug("inserting pod", "name", pod.Name, "namespace", pod.Namespace,
"uid", pod.UID, "replicaSet", replicaSet,
"uid", pod.UID, "owner", owner,
"node", pod.Spec.NodeName, "startTime", startTime,
"containerIDs", containerIDs)
}
Expand All @@ -152,10 +148,10 @@ func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFacto
Namespace: pod.Namespace,
UID: pod.UID,
},
ReplicaSetName: replicaSet,
NodeName: pod.Spec.NodeName,
StartTimeStr: startTime,
ContainerIDs: containerIDs,
Owner: owner,
NodeName: pod.Spec.NodeName,
StartTimeStr: startTime,
ContainerIDs: containerIDs,
}, nil
}); err != nil {
return fmt.Errorf("can't set pods transform: %w", err)
Expand Down Expand Up @@ -309,12 +305,14 @@ func (k *Metadata) initInformers(client kubernetes.Interface, timeout time.Durat
}
}

// FetchPodOwnerInfo updates the passed pod with the owner Desployment info, if required and
// if it exists.
// FetchPodOwnerInfo updates the pod owner with the Deployment information, if it exists.
// Pod Info might include a ReplicaSet as owner, and ReplicaSet info
// usually has a Deployment as owner reference, which is the one that we'd really like
// to report as owner.
func (k *Metadata) FetchPodOwnerInfo(pod *PodInfo) {
if pod.DeploymentName == "" && pod.ReplicaSetName != "" {
if rsi, ok := k.GetReplicaSetInfo(pod.Namespace, pod.ReplicaSetName); ok {
pod.DeploymentName = rsi.DeploymentName
if pod.Owner != nil && pod.Owner.Type == OwnerReplicaSet {
if rsi, ok := k.GetReplicaSetInfo(pod.Namespace, pod.Owner.Name); ok {
pod.Owner.Owner = &Owner{Type: OwnerDeployment, Name: rsi.DeploymentName}
}
}
}
Expand Down
89 changes: 89 additions & 0 deletions pkg/internal/kube/owner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package kube

import (
"strings"

v1 "k8s.io/api/core/v1"
)

const (
NamespaceName = "k8s.namespace.name"
PodName = "k8s.pod.name"
DeploymentName = "k8s.deployment.name"
ReplicaSetName = "k8s.replicaset.name"
DaemonSetName = "k8s.daemonset.name"
StatefulSetName = "k8s.statefulset.name"
NodeName = "k8s.node.name"
PodUID = "k8s.pod.uid"
PodStartTime = "k8s.pod.start_time"
)

type OwnerType int

const (
OwnerUnknown = OwnerType(iota)
OwnerReplicaSet
OwnerDeployment
OwnerStatefulSet
OwnerDaemonSet
)

func (o OwnerType) LabelName() string {
switch o {
case OwnerReplicaSet:
return ReplicaSetName
case OwnerDeployment:
return DeploymentName
case OwnerStatefulSet:
return StatefulSetName
case OwnerDaemonSet:
return DaemonSetName
default:
return "k8s.unknown.owner"
}
}

type Owner struct {
Type OwnerType
Name string
// Owner of the owner. For example, a ReplicaSet might be owned by a Deployment
Owner *Owner
}

// OwnerFromPodInfo returns the pod Owner reference. It might be
// null if the Pod does not have any owner
func OwnerFromPodInfo(pod *v1.Pod) *Owner {
for i := range pod.OwnerReferences {
or := &pod.OwnerReferences[i]
if or.APIVersion != "apps/v1" {
continue
}
switch or.Kind {
case "ReplicaSet":
return &Owner{Type: OwnerReplicaSet, Name: or.Name}
case "Deployment":
return &Owner{Type: OwnerDeployment, Name: or.Name}
case "StatefulSet":
return &Owner{Type: OwnerStatefulSet, Name: or.Name}
case "DaemonSet":
return &Owner{Type: OwnerDaemonSet, Name: or.Name}
}
}
return nil
}

func (o *Owner) String() string {
sb := strings.Builder{}
o.string(&sb)
return sb.String()
}

func (o *Owner) string(sb *strings.Builder) {
if o.Owner != nil {
o.Owner.string(sb)
sb.WriteString("->")
}
sb.WriteString(o.Type.LabelName())
sb.WriteByte(':')
sb.WriteString(o.Name)
}
14 changes: 14 additions & 0 deletions pkg/internal/kube/owner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kube

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestOwnerString(t *testing.T) {
owner := Owner{Type: OwnerReplicaSet, Name: "rs"}
assert.Equal(t, "k8s.replicaset.name:rs", owner.String())
owner.Owner = &Owner{Type: OwnerDeployment, Name: "dep"}
assert.Equal(t, "k8s.deployment.name:dep->k8s.replicaset.name:rs", owner.String())
}
Loading

0 comments on commit 24c2493

Please sign in to comment.