Skip to content

Commit

Permalink
fix: add missing cluster metadata to exported metrics (#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
harelmo-lumigo authored Dec 12, 2024
1 parent 7bf554c commit 54263a7
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 24 deletions.
15 changes: 13 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Install [minikube](https://minikube.sigs.k8s.io/docs/start/), [Helm](https://hel

Set up your Docker engine to use insecure registries (on Mac OS with Docker Desktop for Mac, the file to edit is `~/.docker/daemon.json`):

```json
```js
{
...
"insecure-registries" : [
Expand Down Expand Up @@ -75,6 +75,7 @@ To avoid strange issues with Docker caching the wrong images in your test enviro
```sh
export IMG_VERSION=1 # Increment this every time you try a deploy
make docker-build docker-push
helm dependency build
helm upgrade --install lumigo charts/lumigo-operator --namespace lumigo-system --create-namespace --set "controllerManager.manager.image.tag=${IMG_VERSION}" --set "controllerManager.telemetryProxy.image.tag=${IMG_VERSION}" --set "debug.enabled=true"
```
Expand Down Expand Up @@ -203,4 +204,14 @@ make docker-build docker-push
(cd tests/kubernetes-distros/kind && go test)
```
**Note:** The build of the `controller` and `telemetry-proxy` images assume the local repository setup documented in the [Local testing with Minikube](#local-testing-with-minikube) section.
#### Running specific tests
If you're focusing on a specific set of tests in a file, you can run only those using the following syntax:
```sh
export IMG_VERSION=<incremental_number> # Avoid image cache issues
make docker-build docker-push
(cd tests/kubernetes-distros/kind && go test -run TestLumigoOperatorInfraMetrics -assess "some text")
```
That will run only the tests under the test functions named `TestLumigoOperatorInfraMetrics`, and only the ones having `some text` in their title.
**Note:** The build of the `controller` and `telemetry-proxy` images assume the local repository setup documented in the [Local testing with Minikube](#local-testing-with-minikube) section.
23 changes: 14 additions & 9 deletions telemetryproxy/docker/etc/config.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ exporters:
otlphttp/lumigo_metrics:
endpoint: {{ env.Getenv "LUMIGO_METRICS_ENDPOINT" "https://ga-otlp.lumigo-tracer-edge.golumigo.com" }}
headers:
# We cannot use headers_setter/lumigo since it assumes the headers are already set by the sender, and in this case -
# since we're scraping Prometheus metrics and not receiving any metrics from customer code - we don't have any incoming headers.
Authorization: "LumigoToken {{ $infraMetricsToken }}"
{{- end }}
{{- if $debug }}
Expand Down Expand Up @@ -217,10 +219,10 @@ processors:
- context: resource
statements:
- set(attributes["k8s.cluster.name"], "{{ $clusterName }}")
# metric_statements:
# - context: resource
# statements:
# - set(attributes["k8s.cluster.name"], "{{ $clusterName }}")
metric_statements:
- context: resource
statements:
- set(attributes["k8s.cluster.name"], "{{ $clusterName }}")
log_statements:
- context: resource
statements:
Expand Down Expand Up @@ -257,11 +259,11 @@ processors:
statements:
- set(attributes["lumigo.k8s_operator.version"], "{{ $config.operator.version }}")
- set(attributes["lumigo.k8s_operator.deployment_method"], "{{ $config.operator.deployment_method }}")
# metric_statements:
# - context: resource
# statements:
# - set(attributes["lumigo.k8s_operator.version"], "{{ $config.operator.version }}")
# - set(attributes["lumigo.k8s_operator.deployment_method"], "{{ $config.operator.deployment_method }}")
metric_statements:
- context: resource
statements:
- set(attributes["lumigo.k8s_operator.version"], "{{ $config.operator.version }}")
- set(attributes["lumigo.k8s_operator.deployment_method"], "{{ $config.operator.deployment_method }}")
log_statements:
- context: resource
statements:
Expand All @@ -286,6 +288,9 @@ service:
- prometheus
processors:
- filter/filter-prom-metrics
- k8sdataenricherprocessor
- transform/inject_operator_details_into_resource
- transform/add_cluster_name
exporters:
- otlphttp/lumigo_metrics
{{- if $debug }}
Expand Down
41 changes: 36 additions & 5 deletions telemetryproxy/src/processor/k8sdataenricherprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func NewFactory() processor.Factory {
createDefaultConfig,
processor.WithTraces(createTracesProcessor, stability),
processor.WithLogs(createLogsProcessor, stability),
processor.WithMetrics(createMetricsProcessor, stability),
)
}

Expand All @@ -59,7 +60,7 @@ func createDefaultConfig() component.Config {

func createTracesProcessor(
ctx context.Context,
params processor.CreateSettings,
params processor.Settings,
cfg component.Config,
next consumer.Traces,
) (processor.Traces, error) {
Expand All @@ -68,7 +69,7 @@ func createTracesProcessor(

func createTracesProcessorWithOptions(
ctx context.Context,
set processor.CreateSettings,
set processor.Settings,
cfg component.Config,
next consumer.Traces,
) (processor.Traces, error) {
Expand All @@ -89,7 +90,7 @@ func createTracesProcessorWithOptions(

func createLogsProcessor(
ctx context.Context,
params processor.CreateSettings,
params processor.Settings,
cfg component.Config,
nextLogsConsumer consumer.Logs,
) (processor.Logs, error) {
Expand All @@ -98,7 +99,7 @@ func createLogsProcessor(

func createLogsProcessorWithOptions(
ctx context.Context,
set processor.CreateSettings,
set processor.Settings,
cfg component.Config,
nextLogsConsumer consumer.Logs,
) (processor.Logs, error) {
Expand All @@ -117,8 +118,38 @@ func createLogsProcessorWithOptions(
}
}

// createMetricsProcessor is the factory entry point for the metrics flavor of
// the k8s data enricher; it delegates straight to
// createMetricsProcessorWithOptions.
func createMetricsProcessor(
	ctx context.Context,
	settings processor.Settings,
	config component.Config,
	next consumer.Metrics,
) (processor.Metrics, error) {
	return createMetricsProcessorWithOptions(ctx, settings, config, next)
}

// createMetricsProcessorWithOptions builds the Kubernetes enricher and wraps
// it in a metrics processor whose per-batch work is kp.processMetrics, wiring
// the enricher's Start/Shutdown into the collector component lifecycle.
//
// Returns a nil processor and the underlying error if the Kubernetes
// processor cannot be created.
func createMetricsProcessorWithOptions(
	ctx context.Context,
	set processor.Settings,
	cfg component.Config,
	nextMetricsConsumer consumer.Metrics,
) (processor.Metrics, error) {
	// Prefer early return over if/else so the happy path stays left-aligned.
	kp, err := createKubernetesProcessor(set, cfg)
	if err != nil {
		return nil, err
	}

	return processorhelper.NewMetricsProcessor(
		ctx,
		set,
		cfg,
		nextMetricsConsumer,
		kp.processMetrics,
		processorhelper.WithCapabilities(consumerCapabilities),
		processorhelper.WithStart(kp.Start),
		processorhelper.WithShutdown(kp.Shutdown))
}

func createKubernetesProcessor(
params processor.CreateSettings,
params processor.Settings,
cfg component.Config,
) (*kubernetesprocessor, error) {
apiConfig := cfg.(*Config).APIConfig
Expand Down
21 changes: 19 additions & 2 deletions telemetryproxy/src/processor/k8sdataenricherprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"go.uber.org/zap"
Expand Down Expand Up @@ -79,11 +80,15 @@ func (kp *kubernetesprocessor) processTraces(ctx context.Context, tr ptrace.Trac
return tr, nil
}

// addResourceClusterAttributes stamps the cluster-wide identity attributes
// (cloud provider ID and cluster UID) onto the given resource attribute map.
func (kp *kubernetesprocessor) addResourceClusterAttributes(attrs pcommon.Map) {
	attrs.PutStr(K8SProviderIdKey, kp.kube.GetProviderId())
	attrs.PutStr(K8SClusterUIDKey, string(kp.clusterUid))
}

func (kp *kubernetesprocessor) addResourceAttributes(ctx context.Context, resource pcommon.Resource) {
resourceAttributes := resource.Attributes()

resourceAttributes.PutStr(K8SProviderIdKey, kp.kube.GetProviderId())
resourceAttributes.PutStr(K8SClusterUIDKey, string(kp.clusterUid))
kp.addResourceClusterAttributes(resourceAttributes)

pod, found := kp.getPod(ctx, &resource)
if !found {
Expand Down Expand Up @@ -226,6 +231,18 @@ func (kp *kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (p
return ld, nil
}

// processMetrics adds cluster-level resource attributes (provider ID and
// cluster UID) to every resource in the metrics payload. Only cluster
// metadata is attached here — no per-pod enrichment is attempted, since these
// metrics are scraped rather than emitted by instrumented workloads.
// It never fails; the error return satisfies the processorhelper signature.
func (kp *kubernetesprocessor) processMetrics(ctx context.Context, metrics pmetric.Metrics) (pmetric.Metrics, error) {
	resourceMetrics := metrics.ResourceMetrics()

	for i := 0; i < resourceMetrics.Len(); i++ {
		// Dropped the stray trailing semicolon here to keep the file gofmt-clean.
		kp.addResourceClusterAttributes(resourceMetrics.At(i).Resource().Attributes())
	}

	return metrics, nil
}

func (kp *kubernetesprocessor) processResourceLogs(ctx context.Context, resourceLogs *plog.ResourceLogs) {
scopeLogs := resourceLogs.ScopeLogs()

Expand Down
50 changes: 44 additions & 6 deletions tests/kubernetes-distros/kind/lumigooperator_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -51,7 +52,29 @@ func TestLumigoOperatorInfraMetrics(t *testing.T) {
exportRequest := pmetricotlp.NewExportRequest()
exportRequest.UnmarshalJSON([]byte(exportRequestJson))

if m, err := exportRequestToMetricRecords(exportRequest); err != nil {
allResourceMetrics, err := exportRequestToResourceMetrics(exportRequest);
if err != nil {
return false, fmt.Errorf("Cannot extract resource metrics from export request: %v", err)
}

for _, resourceMetrics := range allResourceMetrics {
clusterName, exists := resourceMetrics.Resource().Attributes().Get("k8s.cluster.name")
if !exists {
return false, fmt.Errorf("Cannot find cluster name in resource metrics")
}
if clusterName.AsString() != ctx.Value(internal.ContextKeyKubernetesClusterName) {
return false, fmt.Errorf("Cluster name mismatch: actual %v, expected: %v", clusterName, internal.ContextKeyKubernetesClusterName)
}
clusterUid, exists := resourceMetrics.Resource().Attributes().Get("k8s.cluster.uid")
if !exists {
return false, fmt.Errorf("Cannot find cluster UID in resource metrics")
}
if !isValidUUID(clusterUid.AsString()) {
return false, fmt.Errorf("Invalid cluster UID: %v", clusterUid)
}
}

if m, err := extractMetrics(allResourceMetrics); err != nil {
t.Fatalf("Cannot extract metrics from export request: %v", err)
} else {
metrics = append(metrics, m...)
Expand Down Expand Up @@ -124,13 +147,22 @@ func TestLumigoOperatorInfraMetrics(t *testing.T) {
testEnv.Test(t, testAppDeploymentFeature)
}

func exportRequestToMetricRecords(exportRequest pmetricotlp.ExportRequest) ([]pmetric.Metric, error) {
allMetrics := make([]pmetric.Metric, 0)
func exportRequestToResourceMetrics(exportRequest pmetricotlp.ExportRequest) ([]pmetric.ResourceMetrics, error) {
allResourceMetrics := make([]pmetric.ResourceMetrics, 0)

for i := 0; i < exportRequest.Metrics().ResourceMetrics().Len(); i++ {
resourceMetric := exportRequest.Metrics().ResourceMetrics().At(i)
for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
scopeMetric := resourceMetric.ScopeMetrics().At(j)
allResourceMetrics = append(allResourceMetrics, exportRequest.Metrics().ResourceMetrics().At(i))
}

return allResourceMetrics, nil
}

func extractMetrics(resourceMetricsList []pmetric.ResourceMetrics) ([]pmetric.Metric, error) {
allMetrics := make([]pmetric.Metric, 0)

for i := 0; i < len(resourceMetricsList); i++ {
for j := 0; j < resourceMetricsList[i].ScopeMetrics().Len(); j++ {
scopeMetric := resourceMetricsList[i].ScopeMetrics().At(j)
for k := 0; k < scopeMetric.Metrics().Len(); k++ {
metric := scopeMetric.Metrics().At(k)
allMetrics = append(allMetrics, metric)
Expand All @@ -140,3 +172,9 @@ func exportRequestToMetricRecords(exportRequest pmetricotlp.ExportRequest) ([]pm

return allMetrics, nil
}

// uuidRegexp matches the canonical 8-4-4-4-12 hex form of an RFC 4122 UUID
// (versions 1-5, variant digit in [89abAB]). Compiled once at package init
// rather than on every call.
var uuidRegexp = regexp.MustCompile(`^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[1-5][a-fA-F0-9]{3}-[89abAB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}$`)

// isValidUUID reports whether uuid is a well-formed RFC 4122 UUID string.
func isValidUUID(uuid string) bool {
	return uuidRegexp.MatchString(uuid)
}

0 comments on commit 54263a7

Please sign in to comment.