From 7889e942dcfab994ff1da655ec0400f747a28235 Mon Sep 17 00:00:00 2001 From: Harel Moshe Date: Mon, 10 Jun 2024 09:57:31 +0300 Subject: [PATCH] feat: support K8S application logs (#97) --- CONTRIBUTING.md | 14 +- Makefile | 2 +- README.md | 24 ++- .../controller-deployment-and-webhooks.yaml | 2 + .../lumigo-operator/templates/lumigo-crd.yaml | 11 + charts/lumigo-operator/values.yaml | 3 +- .../bases/operator.lumigo.io_lumigoes.yaml | 11 + config/manager/manager.yaml | 2 + controller/src/api/v1alpha1/lumigo_types.go | 9 + .../api/v1alpha1/lumigo_webhook_suite_test.go | 4 +- .../src/controllers/lumigo_controller.go | 6 +- .../lumigo_controller_suite_test.go | 34 +-- controller/src/main.go | 29 +-- controller/src/mutation/matchers.go | 31 ++- controller/src/mutation/mutate.go | 44 +++- .../webhooks/defaulter/defaulter_webhook.go | 5 + .../defaulter/defaulter_webhook_suite_test.go | 9 +- .../src/webhooks/injector/injector_webhook.go | 11 +- .../injector/injector_webhook_suite_test.go | 39 ++-- helm.yaml | 13 ++ kustomize.yaml | 11 + telemetryproxy/docker/etc/config.yaml.tpl | 19 ++ .../k8sdataenricherprocessor/processor.go | 201 +++++++++--------- .../kind/apps/client/app.js | 11 +- .../kind/apps/client/package.json | 6 +- .../kind/apps/python/Dockerfile | 9 + .../kind/apps/python/app.py | 17 ++ .../kind/internal/build_docker_image.go | 1 + .../kind/internal/context.go | 1 + .../kind/internal/install_operator.go | 25 +-- .../kind/internal/lumigo.go | 5 +- .../kind/lumigooperator_logs_test.go | 100 +++++++-- .../kind/lumigooperator_traces_test.go | 2 +- tests/kubernetes-distros/kind/main_test.go | 9 +- .../kind/resources/kind-config.yaml.tpl | 8 +- 35 files changed, 513 insertions(+), 215 deletions(-) create mode 100644 tests/kubernetes-distros/kind/apps/python/Dockerfile create mode 100644 tests/kubernetes-distros/kind/apps/python/app.py diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a54337b..3462d04 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -30,6 
+30,8 @@ Start `minikube`: minikube start --insecure-registry "host.docker.internal:5000" ``` +* Note that if the minikube machine already exists, you will need to delete it first with `minikube delete` - otherwise the `--insecure-registry` parameter will be ignored (more details [here](https://stackoverflow.com/a/53937716)) + Start a local Docker registry: ```sh @@ -46,7 +48,7 @@ $ curl localhost:5000/v2/_catalog -v > Host: localhost:5000 > User-Agent: curl/7.77.0 > Accept: */* -> +> * Mark bundle as not supporting multiuse < HTTP/1.1 200 OK < Content-Type: application/json; charset=utf-8 @@ -54,7 +56,7 @@ $ curl localhost:5000/v2/_catalog -v < X-Content-Type-Options: nosniff < Date: Fri, 20 Jan 2023 08:10:32 GMT < Content-Length: 20 -< +< {"repositories":[]} * Connection #0 to host localhost left intact ``` @@ -100,7 +102,7 @@ Changing the target Lumigo backend can be done with a [`patchStrategicMerge`](ht echo -n "apiVersion: apps/v1 kind: Deployment metadata: - name: lumigo-controller-manager + name: lumigo-lumigo-operator-controller-manager spec: template: spec: @@ -109,6 +111,8 @@ spec: env: - name: LUMIGO_ENDPOINT value: \"https://my.lumigo.endpoint\" # Replace this! + - name: LUMIGO_LOGS_ENDPOINT + value: \"https://my.lumigo.endpoint\" # Replace this! 
" > lumigo-endpoint.patch.yaml kubectl patch --patch-file lumigo-endpoint.patch.yaml --type strategic -n lumigo-system --filename=lumigo-endpoint.patch.yaml ``` @@ -123,8 +127,8 @@ If you see the following, it's likely because and Mac OS has [squatted over port docker push host.docker.internal:5000/controller Using default tag: latest The push refers to repository [host.docker.internal:5000/controller] -377b701db379: Preparing -fba4381f2bb7: Preparing +377b701db379: Preparing +fba4381f2bb7: Preparing error parsing HTTP 403 response body: unexpected end of JSON input: "" ``` diff --git a/Makefile b/Makefile index 1d77c76..e5a027e 100644 --- a/Makefile +++ b/Makefile @@ -73,7 +73,7 @@ vet: ## Run go vet against code. .PHONY: test test: manifests generate fmt vet envtest ## Run tests. - (cd ./controller/src && KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" $(GOCMD) test . -coverprofile cover.out ) + (cd ./controller/src && KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" $(GOCMD) test ./... -coverprofile cover.out ) ##@ Build diff --git a/README.md b/README.md index 49a430a..4fef142 100644 --- a/README.md +++ b/README.md @@ -72,9 +72,9 @@ The Lumigo Kubernetes operator allows you to set a human-readable name using the You can check which version of the Lumigo Kubernetes operator you have deployed in your cluster as follows: ```sh -$ helm ls -A +$ helm ls -A NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION -lumigo lumigo-system 2 2023-07-10 09:20:04.233825 +0200 CEST deployed lumigo-operator-13 13 +lumigo lumigo-system 2 2023-07-10 09:20:04.233825 +0200 CEST deployed lumigo-operator-13 13 ``` The Lumigo Kubernetes operator is reported as `APP VERSION`. 
@@ -160,6 +160,26 @@ Status: UID: 93d6d809-ac2a-43a9-bc07-f0d4e314efcc ``` +#### Logging support + +The Lumigo Kubernetes operator can automatically forward logs emitted by traced pods to [Lumigo's log-management solution](https://lumigo.io/lp/log-management/), supporting several logging providers (currently `logging` for Python apps, `Winston` and `Bunyan` for Node.js apps). +Enabling log forwarding is done by adding the `spec.logging.enabled` field to the `Lumigo` resource: + +```yaml +apiVersion: operator.lumigo.io/v1alpha1 +kind: Lumigo +metadata: + labels: + app.kubernetes.io/name: lumigo + app.kubernetes.io/instance: lumigo + app.kubernetes.io/part-of: lumigo-operator + name: lumigo +spec: + lumigoToken: ... # same token used for tracing + logging: + enabled: true # enables log forwarding for pods with tracing injected +``` + #### Opting out for specific resources To prevent the Lumigo Kubernetes operator from injecting tracing to pods managed by some resource in a namespace that contains a `Lumigo` resource, add the `lumigo.auto-trace` label set to `false`: diff --git a/charts/lumigo-operator/templates/controller-deployment-and-webhooks.yaml b/charts/lumigo-operator/templates/controller-deployment-and-webhooks.yaml index c43230e..31544eb 100644 --- a/charts/lumigo-operator/templates/controller-deployment-and-webhooks.yaml +++ b/charts/lumigo-operator/templates/controller-deployment-and-webhooks.yaml @@ -231,6 +231,8 @@ spec: value: "{{ .Values.debug.enabled | default false }}" - name: LUMIGO_ENDPOINT value: "{{ .Values.endpoint.otlp.url }}" + - name: LUMIGO_LOGS_ENDPOINT + value: "{{ .Values.endpoint.otlp.logs_url }}" - name: LUMIGO_OPERATOR_VERSION value: "{{ $lumigoOperatorVersion }}" - name: LUMIGO_OPERATOR_DEPLOYMENT_METHOD diff --git a/charts/lumigo-operator/templates/lumigo-crd.yaml b/charts/lumigo-operator/templates/lumigo-crd.yaml index 7657eef..4b1cbf1 100644 --- a/charts/lumigo-operator/templates/lumigo-crd.yaml +++ 
b/charts/lumigo-operator/templates/lumigo-crd.yaml @@ -102,6 +102,17 @@ spec: required: - injection type: object + logging: + description: 'LoggingSpec specifies if logging should be set up by the operator' + properties: + enabled: + description: Whether Daemonsets, Deployments, ReplicaSets, + StatefulSets, CronJobs and Jobs that are created or updated + after the creation of the Lumigo resource and are injected will + have their logs sent to Lumigo. + If unspecified, defaults to `false` + type: boolean + type: object type: object status: description: LumigoStatus defines the observed state of Lumigo diff --git a/charts/lumigo-operator/values.yaml b/charts/lumigo-operator/values.yaml index f1652c7..e7d8bd3 100644 --- a/charts/lumigo-operator/values.yaml +++ b/charts/lumigo-operator/values.yaml @@ -61,4 +61,5 @@ metricsService: type: ClusterIP endpoint: otlp: - url: https://ga-otlp.lumigo-tracer-edge.golumigo.com \ No newline at end of file + url: https://ga-otlp.lumigo-tracer-edge.golumigo.com + logs_url: https://ga-otlp.lumigo-tracer-edge.golumigo.com \ No newline at end of file diff --git a/config/crd/bases/operator.lumigo.io_lumigoes.yaml b/config/crd/bases/operator.lumigo.io_lumigoes.yaml index a76d1c7..88ca1dc 100644 --- a/config/crd/bases/operator.lumigo.io_lumigoes.yaml +++ b/config/crd/bases/operator.lumigo.io_lumigoes.yaml @@ -102,6 +102,17 @@ spec: required: - injection type: object + logging: + description: 'LoggingSpec specifies if logging should be set up by the operator' + properties: + enabled: + description: Whether Daemonsets, Deployments, ReplicaSets, + StatefulSets, CronJobs and Jobs that are created or updated + after the creation of the Lumigo resource and are injected will + have their logs sent to Lumigo. 
+ If unspecified, defaults to `false` + type: boolean + type: object type: object status: description: LumigoStatus defines the observed state of Lumigo diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index f7a6451..a578699 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -131,6 +131,8 @@ spec: value: Kustomize - name: LUMIGO_ENDPOINT value: https://ga-otlp.lumigo-tracer-edge.golumigo.com + - name: LUMIGO_LOGS_ENDPOINT + value: https://ga-otlp.lumigo-tracer-edge.golumigo.com ports: - containerPort: 4318 name: otlphttp diff --git a/controller/src/api/v1alpha1/lumigo_types.go b/controller/src/api/v1alpha1/lumigo_types.go index 7d9b6a9..61780d1 100644 --- a/controller/src/api/v1alpha1/lumigo_types.go +++ b/controller/src/api/v1alpha1/lumigo_types.go @@ -54,6 +54,7 @@ type LumigoSpec struct { // https://docs.lumigo.io/docs/lumigo-tokens LumigoToken Credentials `json:"lumigoToken,omitempty"` Tracing TracingSpec `json:"tracing,omitempty"` + Logging LoggingSpec `json:"logging,omitempty"` Infrastructure InfrastructureSpec `json:"infrastructure,omitempty"` } @@ -77,6 +78,14 @@ type TracingSpec struct { Injection InjectionSpec `json:"injection"` } +type LoggingSpec struct { + // Whether Daemonsets, Deployments, ReplicaSets, StatefulSets, CronJobs and Jobs + // that are created or updated after the creation of the Lumigo resource and are injected will have their logs sent to Lumigo. + // If unspecified, defaults to `false`. + // +kubebuilder:validation:Optional + Enabled *bool `json:"enabled"` // Using a pointer to support cases where the value is not set (and it counts as disabled) +} + type InjectionSpec struct { // Whether Daemonsets, Deployments, ReplicaSets, StatefulSets, CronJobs and Jobs // that are created or updated after the creation of the Lumigo resource be injected. 
diff --git a/controller/src/api/v1alpha1/lumigo_webhook_suite_test.go b/controller/src/api/v1alpha1/lumigo_webhook_suite_test.go index 20a9749..0d07bd8 100644 --- a/controller/src/api/v1alpha1/lumigo_webhook_suite_test.go +++ b/controller/src/api/v1alpha1/lumigo_webhook_suite_test.go @@ -61,8 +61,8 @@ var _ = BeforeSuite(func() { By("bootstrapping test environment") testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, - ErrorIfCRDPathMissing: false, + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: true, WebhookInstallOptions: envtest.WebhookInstallOptions{ Paths: []string{filepath.Join("..", "..", "config", "webhook")}, }, diff --git a/controller/src/controllers/lumigo_controller.go b/controller/src/controllers/lumigo_controller.go index 25f165f..387cda3 100644 --- a/controller/src/controllers/lumigo_controller.go +++ b/controller/src/controllers/lumigo_controller.go @@ -77,6 +77,7 @@ type LumigoReconciler struct { LumigoOperatorVersion string LumigoInjectorImage string TelemetryProxyOtlpServiceUrl string + TelemetryProxyOtlpLogsServiceUrl string TelemetryProxyNamespaceConfigurationsPath string } @@ -489,7 +490,7 @@ func (r *LumigoReconciler) updateStatusIfNeeded(ctx context.Context, logger logr } func (r *LumigoReconciler) injectLumigoIntoResources(ctx context.Context, lumigo *operatorv1alpha1.Lumigo, log *logr.Logger) error { - mutator, err := mutation.NewMutator(log, &lumigo.Spec.LumigoToken, r.LumigoOperatorVersion, r.LumigoInjectorImage, r.TelemetryProxyOtlpServiceUrl) + mutator, err := mutation.NewMutator(log, &lumigo.Spec, r.LumigoOperatorVersion, r.LumigoInjectorImage, r.TelemetryProxyOtlpServiceUrl, r.TelemetryProxyOtlpLogsServiceUrl) if err != nil { return fmt.Errorf("cannot instantiate mutator: %w", err) } @@ -688,7 +689,8 @@ func (r *LumigoReconciler) injectLumigoIntoResources(ctx context.Context, lumigo func (r 
*LumigoReconciler) removeLumigoFromResources(ctx context.Context, lumigo *operatorv1alpha1.Lumigo, log *logr.Logger) error { namespace := lumigo.Namespace - mutator, err := mutation.NewMutator(log, nil, r.LumigoOperatorVersion, r.LumigoInjectorImage, r.TelemetryProxyOtlpServiceUrl) + + mutator, err := mutation.NewMutator(log, nil, r.LumigoOperatorVersion, r.LumigoInjectorImage, r.TelemetryProxyOtlpServiceUrl, r.TelemetryProxyOtlpLogsServiceUrl) if err != nil { return fmt.Errorf("cannot instantiate mutator: %w", err) } diff --git a/controller/src/controllers/lumigo_controller_suite_test.go b/controller/src/controllers/lumigo_controller_suite_test.go index 32d7b2b..c913e12 100644 --- a/controller/src/controllers/lumigo_controller_suite_test.go +++ b/controller/src/controllers/lumigo_controller_suite_test.go @@ -81,7 +81,7 @@ var _ = BeforeSuite(func() { By("bootstrapping test environment") testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")}, + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, ErrorIfCRDPathMissing: true, } @@ -197,7 +197,6 @@ var _ = Context("Lumigo controller", func() { }) Context("with one Lumigo instance", func() { - It("has an error if the referenced secret does not exist", func() { lumigoName := "lumigo" lumigo := newLumigo(namespaceName, lumigoName, operatorv1alpha1.Credentials{ @@ -205,7 +204,7 @@ var _ = Context("Lumigo controller", func() { Name: "lumigo-credentials", Key: "token", }, - }, true, true, true) + }, true, true, true, true) Expect(k8sClient.Create(ctx, lumigo)).Should(Succeed()) By("the Lumigo instance goes in an erroneous state", func() { @@ -253,7 +252,7 @@ var _ = Context("Lumigo controller", func() { Name: "lumigo-credentials", Key: expectedTokenKey, }, - }, true, true, true) + }, true, true, true, true) Expect(k8sClient.Create(ctx, lumigo)).Should(Succeed()) By("the Lumigo instance goes in an erroneous state", func() { @@ -333,7 
+332,7 @@ var _ = Context("Lumigo controller", func() { Name: "lumigo-credentials", Key: expectedTokenKey, }, - }, true, true, true) + }, true, true, true, true) Expect(k8sClient.Create(ctx, lumigo)).Should(Succeed()) By("the Lumigo instance goes in an erroneous state", func() { @@ -434,7 +433,7 @@ var _ = Context("Lumigo controller", func() { Name: lumigoSecretName, Key: expectedTokenKey, }, - }, true, false, false) + }, true, false, false, true) g.Expect(k8sClient.Create(ctx, lumigo)).Should(Succeed()) }, defaultTimeout, defaultInterval).Should(Succeed()) }) @@ -453,7 +452,7 @@ var _ = Context("Lumigo controller", func() { Name: deploymentName, }, deployment)).To(Succeed()) - Expect(deployment).NotTo(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl)) + Expect(deployment).NotTo(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl, false)) }) }) @@ -517,7 +516,7 @@ var _ = Context("Lumigo controller", func() { Name: lumigoSecretName, Key: expectedTokenKey, }, - }, true, true, false) + }, true, true, false, false) Expect(k8sClient.Create(ctx, lumigo)).Should(Succeed()) Eventually(func(g Gomega) { @@ -537,7 +536,7 @@ var _ = Context("Lumigo controller", func() { Name: deploymentName, }, deployment)).To(Succeed()) - g.Expect(deployment).To(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl)) + g.Expect(deployment).To(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl, false)) g.Expect(currentVersionOf(lumigo, g)).To(BeActive()) g.Expect(currentVersionOf(lumigo, g)).To(HaveInstrumentedObjectReferenceFor(deployment)) }, defaultTimeout, defaultInterval).Should(Succeed()) @@ -565,7 +564,7 @@ var _ = Context("Lumigo controller", func() { Name: deploymentName, }, deployment)).To(Succeed()) - 
Expect(deployment).To(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl)) + Expect(deployment).To(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl, false)) }) }) @@ -628,7 +627,7 @@ var _ = Context("Lumigo controller", func() { Name: lumigoSecretName, Key: expectedTokenKey, }, - }, true, true, true) + }, true, true, true, false) Expect(k8sClient.Create(ctx, lumigo)).Should(Succeed()) Eventually(func(g Gomega) { @@ -644,7 +643,7 @@ var _ = Context("Lumigo controller", func() { Name: deploymentName, }, deploymentAfter)).To(Succeed()) - Expect(deploymentAfter).To(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl)) + Expect(deploymentAfter).To(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl, false)) }) By("Deleting the Lumigo resource", func() { @@ -710,7 +709,7 @@ var _ = Context("Lumigo controller", func() { Name: lumigoSecretName, Key: expectedTokenKey, }, - }, true, true, false) + }, true, true, false, true) Expect(k8sClient.Create(ctx, lumigo)).Should(Succeed()) Eventually(func(g Gomega) { @@ -763,7 +762,7 @@ var _ = Context("Lumigo controller", func() { }, } - lumigo1 := newLumigo(namespaceName, "lumigo1", lumigoToken, true, true, true) + lumigo1 := newLumigo(namespaceName, "lumigo1", lumigoToken, true, true, true, true) Expect(k8sClient.Create(ctx, lumigo1)).Should(Succeed()) Eventually(func(g Gomega) { g.Expect(currentVersionOf(lumigo1, g)).To(BeActive()) @@ -771,7 +770,7 @@ var _ = Context("Lumigo controller", func() { Expect(telemetryProxyNamespacesFile).To(BeMonitoringNamespace(namespaceName)) - lumigo2 := newLumigo(namespaceName, "lumigo2", lumigoToken, true, true, true) + lumigo2 := newLumigo(namespaceName, "lumigo2", lumigoToken, true, true, true, true) By("adding a second Lumigo in the namespace", func() { 
Expect(k8sClient.Create(ctx, lumigo2)).Should(Succeed()) @@ -803,7 +802,7 @@ var _ = Context("Lumigo controller", func() { }) -func newLumigo(namespace string, name string, lumigoToken operatorv1alpha1.Credentials, injectionEnabled bool, injectLumigoIntoExistingResourcesOnCreation bool, removeLumigoFromResourcesOnDeletion bool) *operatorv1alpha1.Lumigo { +func newLumigo(namespace string, name string, lumigoToken operatorv1alpha1.Credentials, injectionEnabled bool, injectLumigoIntoExistingResourcesOnCreation bool, removeLumigoFromResourcesOnDeletion bool, loggingEnabled bool) *operatorv1alpha1.Lumigo { return &operatorv1alpha1.Lumigo{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, @@ -819,6 +818,9 @@ func newLumigo(namespace string, name string, lumigoToken operatorv1alpha1.Crede RemoveLumigoFromResourcesOnDeletion: &removeLumigoFromResourcesOnDeletion, }, }, + Logging: operatorv1alpha1.LoggingSpec{ + Enabled: &loggingEnabled, + }, }, } } diff --git a/controller/src/main.go b/controller/src/main.go index b1521db..5923b43 100644 --- a/controller/src/main.go +++ b/controller/src/main.go @@ -129,6 +129,7 @@ func startManager(metricsAddr string, probeAddr string, enableLeaderElection boo } telemetryProxyOtlpService := lumigoEndpoint + "/v1/traces" // TODO: Fix it when the distros use the Lumigo endpoint as root + telemetryProxyOtlpLogsService := lumigoEndpoint + "/v1/logs" namespaceConfigurationsPath, isSet := os.LookupEnv("LUMIGO_NAMESPACE_CONFIGURATIONS") if !isSet { @@ -151,14 +152,15 @@ func startManager(metricsAddr string, probeAddr string, enableLeaderElection boo } if err = (&controllers.LumigoReconciler{ - Client: mgr.GetClient(), - Clientset: clientset, - DynamicClient: dynamicClient, - EventRecorder: mgr.GetEventRecorderFor(fmt.Sprintf("lumigo-operator.v%s/controller", lumigoOperatorVersion)), - Scheme: mgr.GetScheme(), - LumigoOperatorVersion: lumigoOperatorVersion, - LumigoInjectorImage: lumigoInjectorImage, - TelemetryProxyOtlpServiceUrl: 
telemetryProxyOtlpService, + Client: mgr.GetClient(), + Clientset: clientset, + DynamicClient: dynamicClient, + EventRecorder: mgr.GetEventRecorderFor(fmt.Sprintf("lumigo-operator.v%s/controller", lumigoOperatorVersion)), + Scheme: mgr.GetScheme(), + LumigoOperatorVersion: lumigoOperatorVersion, + LumigoInjectorImage: lumigoInjectorImage, + TelemetryProxyOtlpServiceUrl: telemetryProxyOtlpService, + TelemetryProxyOtlpLogsServiceUrl: telemetryProxyOtlpLogsService, TelemetryProxyNamespaceConfigurationsPath: namespaceConfigurationsPath, Log: logger, }).SetupWithManager(mgr); err != nil { @@ -166,11 +168,12 @@ func startManager(metricsAddr string, probeAddr string, enableLeaderElection boo } if err = (&injector.LumigoInjectorWebhookHandler{ - EventRecorder: mgr.GetEventRecorderFor(fmt.Sprintf("lumigo-operator.v%s/injector-webhook", lumigoOperatorVersion)), - LumigoOperatorVersion: lumigoOperatorVersion, - LumigoInjectorImage: lumigoInjectorImage, - TelemetryProxyOtlpServiceUrl: telemetryProxyOtlpService, - Log: logger, + EventRecorder: mgr.GetEventRecorderFor(fmt.Sprintf("lumigo-operator.v%s/injector-webhook", lumigoOperatorVersion)), + LumigoOperatorVersion: lumigoOperatorVersion, + LumigoInjectorImage: lumigoInjectorImage, + TelemetryProxyOtlpServiceUrl: telemetryProxyOtlpService, + TelemetryProxyOtlpLogsServiceUrl: telemetryProxyOtlpLogsService, + Log: logger, }).SetupWebhookWithManager(mgr); err != nil { return fmt.Errorf("unable to create injector webhook: %w", err) } diff --git a/controller/src/mutation/matchers.go b/controller/src/mutation/matchers.go index f8892c8..1a0c982 100644 --- a/controller/src/mutation/matchers.go +++ b/controller/src/mutation/matchers.go @@ -3,6 +3,7 @@ package mutation import ( "fmt" "reflect" + "strconv" "github.com/onsi/gomega" "github.com/onsi/gomega/format" @@ -15,16 +16,19 @@ import ( var ( errAutotraceLabelNotFound = fmt.Errorf("'%s' label not found", LumigoAutoTraceLabelKey) - errLdPreloadEnvVarNotSet = fmt.Errorf("the 
environment variable '%s' is not set in the container's Env", LdPreloadEnvVarName) - errLumigoTracerTokenEnvVarNotSet = fmt.Errorf("the environment variable '%s' is not set in the container's Env", LumigoTracerTokenEnvVarName) - errLumigoEndpointEnvVarNotSet = fmt.Errorf("the environment variable '%s' is not set in the container's Env", LumigoEndpointEnvVarName) + errEnvVarMissingFormat = "the environment variable '%s' is not set in the container's Env" + errLdPreloadEnvVarNotSet = fmt.Errorf(errEnvVarMissingFormat, LdPreloadEnvVarName) + errLumigoTracerTokenEnvVarNotSet = fmt.Errorf(errEnvVarMissingFormat, LumigoTracerTokenEnvVarName) + errLumigoEndpointEnvVarNotSet = fmt.Errorf(errEnvVarMissingFormat, LumigoEndpointEnvVarName) + errLumigoEnableLogsEnvVarNotSet = fmt.Errorf(errEnvVarMissingFormat, LumigoEnableLogsEnvVarName) ) -func BeInstrumentedWithLumigo(lumigoOperatorVersion string, lumigoInjectorImage string, lumigoEndpointUrl string) types.GomegaMatcher { +func BeInstrumentedWithLumigo(lumigoOperatorVersion string, lumigoInjectorImage string, lumigoEndpointUrl string, lumigoLogsEnabled bool) types.GomegaMatcher { return &beInstrumentedWithLumigo{ lumigoOperatorVersion: lumigoOperatorVersion, lumigoInjectorImage: lumigoInjectorImage, lumigoEndpointUrl: lumigoEndpointUrl, + lumigoLogsEnabled: lumigoLogsEnabled, } } @@ -32,6 +36,7 @@ type beInstrumentedWithLumigo struct { lumigoOperatorVersion string lumigoInjectorImage string lumigoEndpointUrl string + lumigoLogsEnabled bool } func (m *beInstrumentedWithLumigo) Match(actual interface{}) (bool, error) { @@ -248,6 +253,8 @@ func (m *beInstrumentedWithLumigo) isContainerInstrumentedWithLumigo(container * ldPreloadEnvVarFound := false lumigoTracerTokenEnvVarFound := false lumigoEndpointEnvVarFound := false + lumigoEnableLogsEnvVarFound := false + for _, envVar := range container.Env { switch envVar.Name { case LdPreloadEnvVarName: @@ -267,6 +274,18 @@ func (m *beInstrumentedWithLumigo) 
isContainerInstrumentedWithLumigo(container * return false, fmt.Errorf("unexpected value for '%s' env var: expected '%s', found '%s'", LumigoEndpointEnvVarName, m.lumigoEndpointUrl, envVar.Value) } lumigoEndpointEnvVarFound = true + + case LumigoEnableLogsEnvVarName: + boolValue, err := strconv.ParseBool(envVar.Value) + + if err != nil { + return false, fmt.Errorf("unexpected value for boolean '%s' env var: '%s'", LumigoEnableLogsEnvVarName, envVar.Value) + } + + if boolValue != m.lumigoLogsEnabled { + return false, fmt.Errorf("unexpected value for '%s' env var: expected '%t', found '%s'", LumigoEnableLogsEnvVarName, m.lumigoLogsEnabled, envVar.Value) + } + lumigoEnableLogsEnvVarFound = true } } @@ -282,6 +301,10 @@ func (m *beInstrumentedWithLumigo) isContainerInstrumentedWithLumigo(container * return false, errLumigoEndpointEnvVarNotSet } + if !lumigoEnableLogsEnvVarFound { + return false, errLumigoEnableLogsEnvVarNotSet + } + volumeMountFound := false for _, volumeMount := range container.VolumeMounts { if volumeMount.Name == LumigoInjectorVolumeName { diff --git a/controller/src/mutation/mutate.go b/controller/src/mutation/mutate.go index ee68cc2..ece0c70 100644 --- a/controller/src/mutation/mutate.go +++ b/controller/src/mutation/mutate.go @@ -22,6 +22,7 @@ import ( "fmt" "reflect" + "strconv" "strings" "github.com/go-logr/logr" @@ -46,6 +47,8 @@ const LumigoInjectorVolumeName = "lumigo-injector" const LumigoInjectorVolumeMountPoint = "/opt/lumigo" const LumigoTracerTokenEnvVarName = "LUMIGO_TRACER_TOKEN" const LumigoEndpointEnvVarName = "LUMIGO_ENDPOINT" +const LumigoLogsEndpointEnvVarName = "LUMIGO_LOGS_ENDPOINT" +const LumigoEnableLogsEnvVarName = "LUMIGO_ENABLE_LOGS" const LumigoContainerNameEnvVarName = "LUMIGO_CONTAINER_NAME" const LdPreloadEnvVarName = "LD_PRELOAD" const LdPreloadEnvVarValue = LumigoInjectorVolumeMountPoint + "/injector/lumigo_injector.so" @@ -78,6 +81,8 @@ type mutatorImpl struct { log *logr.Logger lumigoAutotraceLabelValue string 
lumigoEndpoint string + lumigoLogsEndpoint string + lumigoEnableLogs bool lumigoToken *operatorv1alpha1.Credentials lumigoInjectorImage string } @@ -86,18 +91,30 @@ func (m *mutatorImpl) GetAutotraceLabelValue() string { return m.lumigoAutotraceLabelValue } -func NewMutator(Log *logr.Logger, LumigoToken *operatorv1alpha1.Credentials, LumigoOperatorVersion string, LumigoInjectorImage string, TelemetryProxyOtlpServiceUrl string) (Mutator, error) { +func NewMutator(Log *logr.Logger, LumigoSpec *operatorv1alpha1.LumigoSpec, LumigoOperatorVersion string, LumigoInjectorImage string, TelemetryProxyOtlpServiceUrl string, TelemetryProxyOtlpLogsServiceUrl string) (Mutator, error) { version := LumigoOperatorVersion if len(version) > 8 { version = version[0:7] // Label values have a limit of 63 characters, we stay well below that } + lumigoEnableLogs := false + if LumigoSpec != nil && LumigoSpec.Logging.Enabled != nil { + lumigoEnableLogs = *LumigoSpec.Logging.Enabled + } + + lumigoToken := &operatorv1alpha1.Credentials{} + if LumigoSpec != nil { + lumigoToken = &LumigoSpec.LumigoToken + } + return &mutatorImpl{ log: Log, lumigoAutotraceLabelValue: LumigoAutoTraceLabelVersionPrefixValue + version, lumigoEndpoint: TelemetryProxyOtlpServiceUrl, - lumigoToken: LumigoToken, + lumigoLogsEndpoint: TelemetryProxyOtlpLogsServiceUrl, + lumigoEnableLogs: lumigoEnableLogs, + lumigoToken: lumigoToken, lumigoInjectorImage: LumigoInjectorImage, }, nil } @@ -403,6 +420,29 @@ func (m *mutatorImpl) injectLumigoIntoPodSpec(podSpec *corev1.PodSpec) error { } else { envVars[lumigoEndpointEnvVarIndex] = *lumigoEndpointEnvVar } + + lumigoLogsEndpointEnvVar := &corev1.EnvVar{ + Name: LumigoLogsEndpointEnvVarName, + Value: m.lumigoLogsEndpoint, + } + lumigoLogsEndpointEnvVarIndex := slices.IndexFunc(envVars, func(c corev1.EnvVar) bool { return c.Name == LumigoLogsEndpointEnvVarName }) + if lumigoLogsEndpointEnvVarIndex < 0 { + envVars = append(envVars, *lumigoLogsEndpointEnvVar) + } else { + 
envVars[lumigoLogsEndpointEnvVarIndex] = *lumigoLogsEndpointEnvVar + } + + lumigoEnableLogsEnvVar := &corev1.EnvVar{ + Name: LumigoEnableLogsEnvVarName, + Value: strconv.FormatBool(m.lumigoEnableLogs), + } + lumigoEnableLogsEnvVarIndex := slices.IndexFunc(envVars, func(c corev1.EnvVar) bool { return c.Name == LumigoEnableLogsEnvVarName }) + if lumigoEnableLogsEnvVarIndex < 0 { + envVars = append(envVars, *lumigoEnableLogsEnvVar) + } else { + envVars[lumigoEnableLogsEnvVarIndex] = *lumigoEnableLogsEnvVar + } + lumigoContainerNameEnvVar := &corev1.EnvVar{ Name: LumigoContainerNameEnvVarName, Value: container.Name, diff --git a/controller/src/webhooks/defaulter/defaulter_webhook.go b/controller/src/webhooks/defaulter/defaulter_webhook.go index 23e1d2d..3a8b760 100644 --- a/controller/src/webhooks/defaulter/defaulter_webhook.go +++ b/controller/src/webhooks/defaulter/defaulter_webhook.go @@ -146,6 +146,11 @@ func (h *LumigoDefaulterWebhookHandler) Handle(ctx context.Context, request admi newLumigo.Spec.Infrastructure.KubeEvents.Enabled = &newTrue } + newFalse := false + if newLumigo.Spec.Logging.Enabled == nil { + newLumigo.Spec.Logging.Enabled = &newFalse + } + marshalled, err := json.Marshal(newLumigo) if err != nil { return admission.Errored(http.StatusInternalServerError, fmt.Errorf("cannot marshal object %w", err)) diff --git a/controller/src/webhooks/defaulter/defaulter_webhook_suite_test.go b/controller/src/webhooks/defaulter/defaulter_webhook_suite_test.go index 5af3635..c89008b 100644 --- a/controller/src/webhooks/defaulter/defaulter_webhook_suite_test.go +++ b/controller/src/webhooks/defaulter/defaulter_webhook_suite_test.go @@ -71,10 +71,10 @@ var _ = BeforeSuite(func() { By("bootstrapping test environment") testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, - ErrorIfCRDPathMissing: false, + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "..", "config", "crd", "bases")}, + 
ErrorIfCRDPathMissing: true, WebhookInstallOptions: envtest.WebhookInstallOptions{ - Paths: []string{filepath.Join("..", "..", "config", "webhooks")}, + Paths: []string{filepath.Join("..", "..", "..", "..", "config", "webhooks")}, }, } @@ -197,7 +197,7 @@ var _ = Context("Lumigo defaulter webhook", func() { Context("when creating the first Lumigo instance in the namespace", func() { - It("it sets defaults for Tracing.Injection.*", func() { + It("it sets defaults for Tracing.Injection.* and Logging", func() { newLumigo := operatorv1alpha1.Lumigo{ TypeMeta: metav1.TypeMeta{ Kind: "Lumigo", @@ -225,6 +225,7 @@ var _ = Context("Lumigo defaulter webhook", func() { Expect(newLumigo.Spec.Tracing.Injection.Enabled).To(&beBoolPointer{expectedValue: true}) Expect(newLumigo.Spec.Tracing.Injection.InjectLumigoIntoExistingResourcesOnCreation).To(&beBoolPointer{expectedValue: true}) Expect(newLumigo.Spec.Tracing.Injection.RemoveLumigoFromResourcesOnDeletion).To(&beBoolPointer{expectedValue: true}) + Expect(newLumigo.Spec.Logging.Enabled).To(&beBoolPointer{expectedValue: false}) }) It("it rejects instances with blank .LumigoToken.Spec.LumigoToken.SecretRef.Name", func() { diff --git a/controller/src/webhooks/injector/injector_webhook.go b/controller/src/webhooks/injector/injector_webhook.go index 760f171..47dae20 100644 --- a/controller/src/webhooks/injector/injector_webhook.go +++ b/controller/src/webhooks/injector/injector_webhook.go @@ -50,10 +50,11 @@ type LumigoInjectorWebhookHandler struct { client.Client record.EventRecorder *admission.Decoder - LumigoOperatorVersion string - LumigoInjectorImage string - TelemetryProxyOtlpServiceUrl string - Log logr.Logger + LumigoOperatorVersion string + LumigoInjectorImage string + TelemetryProxyOtlpServiceUrl string + TelemetryProxyOtlpLogsServiceUrl string + Log logr.Logger } func (h *LumigoInjectorWebhookHandler) SetupWebhookWithManager(mgr ctrl.Manager) error { @@ -132,7 +133,7 @@ func (h *LumigoInjectorWebhookHandler) Handle(ctx 
context.Context, request admis return admission.Allowed(fmt.Sprintf("The Lumigo object in the '%s' namespace is not active; resource will not be mutated", namespace)) } - mutator, err := mutation.NewMutator(&log, &lumigo.Spec.LumigoToken, h.LumigoOperatorVersion, h.LumigoInjectorImage, h.TelemetryProxyOtlpServiceUrl) + mutator, err := mutation.NewMutator(&log, &lumigo.Spec, h.LumigoOperatorVersion, h.LumigoInjectorImage, h.TelemetryProxyOtlpServiceUrl, h.TelemetryProxyOtlpLogsServiceUrl) if err != nil { return admission.Allowed(fmt.Errorf("cannot instantiate mutator: %w", err).Error()) } diff --git a/controller/src/webhooks/injector/injector_webhook_suite_test.go b/controller/src/webhooks/injector/injector_webhook_suite_test.go index bb872f2..198f875 100644 --- a/controller/src/webhooks/injector/injector_webhook_suite_test.go +++ b/controller/src/webhooks/injector/injector_webhook_suite_test.go @@ -64,6 +64,7 @@ var lumigoApiVersion = fmt.Sprintf("%s/%s", operatorv1alpha1.GroupVersion.Group, var lumigoOperatorVersion = "2b1e6b60ca871edee1d8f543c400f0b24663349144b78c79cfa006efaad6176a" // Unrealistically long, but we need to ensure we don't set label values too long var lumigoInjectorImage = "localhost:5000/lumigo-autotrace:test" var telemetryProxyOtlpServiceUrl = "lumigo-telemetry-proxy.lumigo-system.svc.cluster.local" +var telemetryProxyOtlpLogsServiceUrl = telemetryProxyOtlpServiceUrl var statusActive = operatorv1alpha1.LumigoStatus{ Conditions: []operatorv1alpha1.LumigoCondition{ @@ -117,10 +118,10 @@ var _ = BeforeSuite(func() { By("bootstrapping test environment") testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, - ErrorIfCRDPathMissing: false, + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: true, WebhookInstallOptions: envtest.WebhookInstallOptions{ - Paths: []string{filepath.Join("..", "..", "config", "webhooks")}, + 
Paths: []string{filepath.Join("..", "..", "..", "..", "config", "webhooks")}, }, } @@ -174,11 +175,12 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) err = (&LumigoInjectorWebhookHandler{ - EventRecorder: mgr.GetEventRecorderFor(fmt.Sprintf("lumigo-operator.v%s", lumigoOperatorVersion)), - LumigoOperatorVersion: lumigoOperatorVersion, - LumigoInjectorImage: lumigoInjectorImage, - TelemetryProxyOtlpServiceUrl: telemetryProxyOtlpServiceUrl, - Log: ctrl.Log.WithName("injector-webhook").WithName("Lumigo"), + EventRecorder: mgr.GetEventRecorderFor(fmt.Sprintf("lumigo-operator.v%s", lumigoOperatorVersion)), + LumigoOperatorVersion: lumigoOperatorVersion, + LumigoInjectorImage: lumigoInjectorImage, + TelemetryProxyOtlpServiceUrl: telemetryProxyOtlpServiceUrl, + TelemetryProxyOtlpLogsServiceUrl: telemetryProxyOtlpLogsServiceUrl, + Log: ctrl.Log.WithName("injector-webhook").WithName("Lumigo"), }).SetupWebhookWithManager(mgr) Expect(err).NotTo(HaveOccurred()) @@ -321,7 +323,7 @@ var _ = Context("Lumigo defaulter webhook", func() { Name: "DoesNot", Key: "Exist", }, - }, true) + }, true, true) Expect(k8sClient.Create(ctx, lumigo)).Should(Succeed()) lumigo.Status = statusErroneous @@ -386,7 +388,7 @@ var _ = Context("Lumigo defaulter webhook", func() { Name: "lumigosecret", Key: "token", }, - }, true) + }, true, true) Expect(k8sClient.Create(ctx, lumigo)).Should(Succeed()) lumigo.Status = statusActive @@ -432,7 +434,7 @@ var _ = Context("Lumigo defaulter webhook", func() { Expect(err).NotTo(HaveOccurred()) } - Expect(deploymentAfter).To(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl)) + Expect(deploymentAfter).To(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl, true)) }) It("should inject a deployment with containers running not as root", func() { @@ -441,7 +443,7 @@ var _ = Context("Lumigo defaulter webhook", func() { Name: "lumigosecret", 
Key: "token", }, - }, true) + }, true, false) Expect(k8sClient.Create(ctx, lumigo)).Should(Succeed()) lumigo.Status = statusActive @@ -491,7 +493,7 @@ var _ = Context("Lumigo defaulter webhook", func() { Expect(err).NotTo(HaveOccurred()) } - Expect(deploymentAfter).To(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl)) + Expect(deploymentAfter).To(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl, false)) Expect(deploymentAfter.Spec.Template.Spec.InitContainers[0].SecurityContext.RunAsNonRoot).To(Equal(&f)) }) @@ -501,7 +503,7 @@ var _ = Context("Lumigo defaulter webhook", func() { Name: "lumigosecret", Key: "token", }, - }, true) + }, true, false) Expect(k8sClient.Create(ctx, lumigo)).Should(Succeed()) lumigo.Status = statusActive @@ -562,7 +564,7 @@ var _ = Context("Lumigo defaulter webhook", func() { Expect(err).NotTo(HaveOccurred()) } - Expect(deploymentAfter).To(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl)) + Expect(deploymentAfter).To(mutation.BeInstrumentedWithLumigo(lumigoOperatorVersion, lumigoInjectorImage, telemetryProxyOtlpServiceUrl, false)) Expect(deploymentAfter.Spec.Template.Spec.InitContainers[0].SecurityContext.RunAsGroup).To(Equal(&group)) }) @@ -574,7 +576,7 @@ var _ = Context("Lumigo defaulter webhook", func() { Name: "doesnot", Key: "exist", }, - }, true) + }, true, true) Expect(k8sClient.Create(ctx, lumigo)).Should(Succeed()) lumigo.Status = statusActive @@ -636,7 +638,7 @@ var _ = Context("Lumigo defaulter webhook", func() { }) -func newLumigo(namespace string, name string, lumigoToken operatorv1alpha1.Credentials, injectionEnabled bool) *operatorv1alpha1.Lumigo { +func newLumigo(namespace string, name string, lumigoToken operatorv1alpha1.Credentials, injectionEnabled bool, loggingEnabled bool) *operatorv1alpha1.Lumigo { return &operatorv1alpha1.Lumigo{ TypeMeta: 
metav1.TypeMeta{ Kind: "Lumigo", @@ -654,6 +656,9 @@ func newLumigo(namespace string, name string, lumigoToken operatorv1alpha1.Crede Enabled: &injectionEnabled, }, }, + Logging: operatorv1alpha1.LoggingSpec{ + Enabled: &loggingEnabled, + }, }, } } diff --git a/helm.yaml b/helm.yaml index 227fc8b..c56e4e7 100644 --- a/helm.yaml +++ b/helm.yaml @@ -153,6 +153,17 @@ spec: required: - injection type: object + logging: + description: 'LoggingSpec specifies if logging should be set up by the operator' + properties: + enabled: + description: Whether Daemonsets, Deployments, ReplicaSets, + StatefulSets, CronJobs and Jobs that are created or updated + after the creation of the Lumigo resource and are injected will + have their logs sent to Lumigo. + If unspecified, defaults to `false` + type: boolean + type: object type: object status: description: LumigoStatus defines the observed state of Lumigo @@ -683,6 +694,8 @@ spec: readOnlyRootFilesystem: true runAsNonRoot: true - env: + - name: LUMIGO_LOGS_ENDPOINT + value: https://ga-otlp.lumigo-tracer-edge.golumigo.com - name: LUMIGO_ENDPOINT value: https://ga-otlp.lumigo-tracer-edge.golumigo.com - mountPath: /lumigo/etc/namespaces/ diff --git a/kustomize.yaml b/kustomize.yaml index 15fd8b6..f4f4296 100644 --- a/kustomize.yaml +++ b/kustomize.yaml @@ -96,6 +96,17 @@ spec: required: - injection type: object + logging: + description: 'LoggingSpec specifies if logging should be set up by the operator' + properties: + enabled: + description: Whether Daemonsets, Deployments, ReplicaSets, + StatefulSets, CronJobs and Jobs that are created or updated + after the creation of the Lumigo resource and are injected will + have their logs sent to Lumigo. 
+ If unspecified, defaults to `false` + type: boolean + type: object type: object status: description: LumigoStatus defines the observed state of Lumigo diff --git a/telemetryproxy/docker/etc/config.yaml.tpl b/telemetryproxy/docker/etc/config.yaml.tpl index 9e0f64e..73b6a19 100644 --- a/telemetryproxy/docker/etc/config.yaml.tpl +++ b/telemetryproxy/docker/etc/config.yaml.tpl @@ -93,6 +93,10 @@ exporters: endpoint: {{ env.Getenv "LUMIGO_ENDPOINT" "https://ga-otlp.lumigo-tracer-edge.golumigo.com" }} auth: authenticator: headers_setter/lumigo + otlphttp/lumigo_logs: + endpoint: {{ env.Getenv "LUMIGO_LOGS_ENDPOINT" "https://ga-otlp.lumigo-tracer-edge.golumigo.com" }} + auth: + authenticator: headers_setter/lumigo {{- if $debug }} logging: verbosity: detailed @@ -233,6 +237,21 @@ service: - logging {{- end }} - otlphttp/lumigo_ns_{{ $namespace.name }} + logs/application_logs_ns_{{ $namespace.name }}: + receivers: + - otlp + processors: + - k8sdataenricherprocessor + - transform/add_ns_attributes_ns_{{ $namespace.name }} +{{- if $clusterName }} + - transform/add_cluster_name +{{- end }} + - transform/inject_operator_details_into_resource + exporters: +{{- if $config.debug }} + - logging +{{- end }} + - otlphttp/lumigo_logs logs/k8s_objects_ns_{{ $namespace.name }}: receivers: - k8sobjects/objects_ns_{{ $namespace.name }} diff --git a/telemetryproxy/src/processor/k8sdataenricherprocessor/processor.go b/telemetryproxy/src/processor/k8sdataenricherprocessor/processor.go index ac68c2f..9419ba8 100644 --- a/telemetryproxy/src/processor/k8sdataenricherprocessor/processor.go +++ b/telemetryproxy/src/processor/k8sdataenricherprocessor/processor.go @@ -72,122 +72,127 @@ func (kp *kubernetesprocessor) processTraces(ctx context.Context, tr ptrace.Trac resourceSpanLength := resourceSpans.Len() for i := 0; i < resourceSpanLength; i++ { resource := resourceSpans.At(i).Resource() - resourceAttributes := resource.Attributes() - resourceAttributes.PutStr(K8SProviderIdKey, 
kp.kube.GetProviderId()) - resourceAttributes.PutStr(K8SClusterUIDKey, string(kp.clusterUid)) + kp.addResourceAttributes(ctx, resource) + } - pod, found := kp.getPod(ctx, &resource) - if !found { - kp.logger.Debug( - "Cannot find pod by 'k8s.pod.uid' of by connection ip", - zap.Any("resource-attributes", resourceAttributes.AsRaw()), - ) - continue - } - resourceAttributes.PutStr(K8SNodeNameKey, pod.Spec.NodeName) + return tr, nil +} - // Ensure 'k8s.pod.uid' is set (we might have found the pod via the network connection ip) - if _, found := resourceAttributes.Get(string(semconv.K8SPodUIDKey)); !found { - resourceAttributes.PutStr(string(semconv.K8SPodUIDKey), string(pod.UID)) - } +func (kp *kubernetesprocessor) addResourceAttributes(ctx context.Context, resource pcommon.Resource) { + resourceAttributes := resource.Attributes() - resourceAttributes.PutStr(string(semconv.K8SPodNameKey), pod.Name) + resourceAttributes.PutStr(K8SProviderIdKey, kp.kube.GetProviderId()) + resourceAttributes.PutStr(K8SClusterUIDKey, string(kp.clusterUid)) - resourceAttributes.PutStr(string(semconv.K8SNamespaceNameKey), pod.Namespace) - if namespace, nsFound := kp.kube.GetNamespaceByName(pod.Namespace); nsFound { - resourceAttributes.PutStr("k8s.namespace.uid", string(namespace.UID)) - } else { - kp.logger.Error( - "Cannot add namespace resource attributes to traces, namespace not found", - zap.String("namespace", pod.Namespace), - ) - } + pod, found := kp.getPod(ctx, &resource) + if !found { + kp.logger.Debug( + "Cannot find pod by 'k8s.pod.uid' of by connection ip", + zap.Any("resource-attributes", resourceAttributes.AsRaw()), + ) + return + } + resourceAttributes.PutStr(K8SNodeNameKey, pod.Spec.NodeName) - ownerObject, found, err := kp.kube.ResolveRelevantOwnerReference(ctx, pod) - if err != nil { - kp.logger.Error( - "Cannot look up owner reference for pod", - zap.Any("pod", pod), - zap.Error(err), - ) - } + // Ensure 'k8s.pod.uid' is set (we might have found the pod via the network 
connection ip) + if _, found := resourceAttributes.Get(string(semconv.K8SPodUIDKey)); !found { + resourceAttributes.PutStr(string(semconv.K8SPodUIDKey), string(pod.UID)) + } - if !found { - kp.logger.Debug( - "Pod has no owner reference we can use to add other resource attributes", - zap.Any("pod", pod), - ) - continue + resourceAttributes.PutStr(string(semconv.K8SPodNameKey), pod.Name) + + resourceAttributes.PutStr(string(semconv.K8SNamespaceNameKey), pod.Namespace) + if namespace, nsFound := kp.kube.GetNamespaceByName(pod.Namespace); nsFound { + resourceAttributes.PutStr("k8s.namespace.uid", string(namespace.UID)) + } else { + kp.logger.Error( + "Cannot add namespace resource attributes to traces, namespace not found", + zap.String("namespace", pod.Namespace), + ) + } + + ownerObject, found, err := kp.kube.ResolveRelevantOwnerReference(ctx, pod) + if err != nil { + kp.logger.Error( + "Cannot look up owner reference for pod", + zap.Any("pod", pod), + zap.Error(err), + ) + } + + if !found { + kp.logger.Debug( + "Pod has no owner reference we can use to add other resource attributes", + zap.Any("pod", pod), + ) + return + } + + switch podOwner := ownerObject.(type) { + case *appsv1.DaemonSet: + { + resourceAttributes.PutStr(string(semconv.K8SDaemonSetNameKey), podOwner.Name) + resourceAttributes.PutStr(string(semconv.K8SDaemonSetUIDKey), string(podOwner.UID)) } + case *appsv1.ReplicaSet: + { + resourceAttributes.PutStr(string(semconv.K8SReplicaSetNameKey), podOwner.Name) + resourceAttributes.PutStr(string(semconv.K8SReplicaSetUIDKey), string(podOwner.UID)) - switch podOwner := ownerObject.(type) { - case *appsv1.DaemonSet: - { - resourceAttributes.PutStr(string(semconv.K8SDaemonSetNameKey), podOwner.Name) - resourceAttributes.PutStr(string(semconv.K8SDaemonSetUIDKey), string(podOwner.UID)) - } - case *appsv1.ReplicaSet: - { - resourceAttributes.PutStr(string(semconv.K8SReplicaSetNameKey), podOwner.Name) - 
resourceAttributes.PutStr(string(semconv.K8SReplicaSetUIDKey), string(podOwner.UID)) - - if replicaSetOwner, found, err := kp.kube.ResolveRelevantOwnerReference(ctx, podOwner); found { - if deployment, ok := replicaSetOwner.(*appsv1.Deployment); ok { - resourceAttributes.PutStr(string(semconv.K8SDeploymentNameKey), deployment.Name) - resourceAttributes.PutStr(string(semconv.K8SDeploymentUIDKey), string(deployment.UID)) - } else { - kp.logger.Error( - "Cannot add deployment resource attributes to traces, replicaset's owner object is not a *apps/v1.Deployment", - zap.Any("owner-object", ownerObject), - zap.Error(err), - ) - } + if replicaSetOwner, found, err := kp.kube.ResolveRelevantOwnerReference(ctx, podOwner); found { + if deployment, ok := replicaSetOwner.(*appsv1.Deployment); ok { + resourceAttributes.PutStr(string(semconv.K8SDeploymentNameKey), deployment.Name) + resourceAttributes.PutStr(string(semconv.K8SDeploymentUIDKey), string(deployment.UID)) } else { - kp.logger.Debug( - "Cannot add deployment resource attributes to traces, replicaset has no deployment owner", - zap.Any("replicaset", podOwner), + kp.logger.Error( + "Cannot add deployment resource attributes to traces, replicaset's owner object is not a *apps/v1.Deployment", + zap.Any("owner-object", ownerObject), zap.Error(err), ) } + } else { + kp.logger.Debug( + "Cannot add deployment resource attributes to traces, replicaset has no deployment owner", + zap.Any("replicaset", podOwner), + zap.Error(err), + ) } - case *appsv1.StatefulSet: - { - resourceAttributes.PutStr(string(semconv.K8SStatefulSetNameKey), podOwner.Name) - resourceAttributes.PutStr(string(semconv.K8SStatefulSetUIDKey), string(podOwner.UID)) - } - case *batchv1.Job: - { - resourceAttributes.PutStr(string(semconv.K8SJobNameKey), podOwner.Name) - resourceAttributes.PutStr(string(semconv.K8SJobUIDKey), string(podOwner.UID)) - - if jobOwner, found, err := kp.kube.ResolveRelevantOwnerReference(ctx, podOwner); found { - if cronJob, ok := 
jobOwner.(*batchv1.CronJob); ok { - resourceAttributes.PutStr(string(semconv.K8SCronJobNameKey), cronJob.Name) - resourceAttributes.PutStr(string(semconv.K8SCronJobUIDKey), string(cronJob.UID)) - } else { - kp.logger.Error( - "Cannot add cronjob resource attributes to traces, job's object is not a *batch/v1.CronJob", - zap.Any("owner-object", ownerObject), - zap.Error(err), - ) - } + } + case *appsv1.StatefulSet: + { + resourceAttributes.PutStr(string(semconv.K8SStatefulSetNameKey), podOwner.Name) + resourceAttributes.PutStr(string(semconv.K8SStatefulSetUIDKey), string(podOwner.UID)) + } + case *batchv1.Job: + { + resourceAttributes.PutStr(string(semconv.K8SJobNameKey), podOwner.Name) + resourceAttributes.PutStr(string(semconv.K8SJobUIDKey), string(podOwner.UID)) + + if jobOwner, found, err := kp.kube.ResolveRelevantOwnerReference(ctx, podOwner); found { + if cronJob, ok := jobOwner.(*batchv1.CronJob); ok { + resourceAttributes.PutStr(string(semconv.K8SCronJobNameKey), cronJob.Name) + resourceAttributes.PutStr(string(semconv.K8SCronJobUIDKey), string(cronJob.UID)) } else { - kp.logger.Debug( - "Cannot add cronjob resource attributes to traces, job has no cronjob owner", - zap.Any("job", podOwner), + kp.logger.Error( + "Cannot add cronjob resource attributes to traces, job's object is not a *batch/v1.CronJob", + zap.Any("owner-object", ownerObject), zap.Error(err), ) } - } - default: - { - kp.logger.Error("Unexpected owner-object for pod", zap.Any("pod", pod), zap.Any("owner-object", ownerObject)) + } else { + kp.logger.Debug( + "Cannot add cronjob resource attributes to traces, job has no cronjob owner", + zap.Any("job", podOwner), + zap.Error(err), + ) } } + default: + { + kp.logger.Error("Unexpected owner-object for pod", zap.Any("pod", pod), zap.Any("owner-object", ownerObject)) + } } - - return tr, nil } func (kp *kubernetesprocessor) getPod(ctx context.Context, resource *pcommon.Resource) (*corev1.Pod, bool) { @@ -214,10 +219,7 @@ func (kp 
*kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (p for i := 0; i < resourceLogs.Len(); i++ { rl := resourceLogs.At(i) - resourceAttributes := rl.Resource().Attributes() - resourceAttributes.PutStr(K8SClusterUIDKey, string(kp.clusterUid)) - resourceAttributes.PutStr(K8SProviderIdKey, kp.kube.GetProviderId()) - + kp.addResourceAttributes(ctx, rl.Resource()); kp.processResourceLogs(ctx, &rl) } @@ -226,6 +228,7 @@ func (kp *kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (p func (kp *kubernetesprocessor) processResourceLogs(ctx context.Context, resourceLogs *plog.ResourceLogs) { scopeLogs := resourceLogs.ScopeLogs() + for i := 0; i < scopeLogs.Len(); i++ { sl := scopeLogs.At(i) diff --git a/tests/kubernetes-distros/kind/apps/client/app.js b/tests/kubernetes-distros/kind/apps/client/app.js index 9125008..1cd46e5 100644 --- a/tests/kubernetes-distros/kind/apps/client/app.js +++ b/tests/kubernetes-distros/kind/apps/client/app.js @@ -2,16 +2,21 @@ const axios = require('axios'); const { init } = require('@lumigo/opentelemetry'); const { SpanStatusCode, trace } = require('@opentelemetry/api'); +const winston = require('winston'); if (!process.env.TARGET_URL) { throw new Error("The required environment variable 'TARGET_URL' is not set") } -(async() => { +(async () => { const { tracerProvider } = await init; const tracer = trace.getTracer(__filename) - - await tracer.startActiveSpan('batch', async(rootSpan) => { + const logger = winston.createLogger({ + transports: [new winston.transports.Console()], + level: 'info' + }); + logger.info('Starting batch job...'); + await tracer.startActiveSpan('batch', async (rootSpan) => { try { const res = await axios.post(`${process.env.TARGET_URL}/api/checkout`, { "reference": "Order1234567", diff --git a/tests/kubernetes-distros/kind/apps/client/package.json b/tests/kubernetes-distros/kind/apps/client/package.json index 9def253..bdfb890 100644 --- 
a/tests/kubernetes-distros/kind/apps/client/package.json +++ b/tests/kubernetes-distros/kind/apps/client/package.json @@ -4,9 +4,11 @@ "license": "ISC", "dependencies": { "@opentelemetry/api": "^1.4.1", - "axios": "^1.1.2" + "axios": "^1.1.2", + "winston": "3.13.0", + "@opentelemetry/winston-transport": "0.3.0" }, "scripts": { "start": "node app.js" } -} +} \ No newline at end of file diff --git a/tests/kubernetes-distros/kind/apps/python/Dockerfile b/tests/kubernetes-distros/kind/apps/python/Dockerfile new file mode 100644 index 0000000..60008db --- /dev/null +++ b/tests/kubernetes-distros/kind/apps/python/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.9-slim + +WORKDIR /code + +COPY ./app.py /code/app.py + +LABEL distro-version='DEV' + +CMD ["python", "app.py", "something to say to the logs"] \ No newline at end of file diff --git a/tests/kubernetes-distros/kind/apps/python/app.py b/tests/kubernetes-distros/kind/apps/python/app.py new file mode 100644 index 0000000..f26ef26 --- /dev/null +++ b/tests/kubernetes-distros/kind/apps/python/app.py @@ -0,0 +1,17 @@ +import sys +import time +import logging +from lumigo_opentelemetry import logger_provider + +logger = logging.getLogger("test") +logger.setLevel(logging.INFO) + +# Non-mandatory in our OTEL setup, but recommended for troubleshooting - adds a console handler to see the logs in the console +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.INFO) +logger.addHandler(console_handler) + +while True: + logger.info(sys.argv[1] if len(sys.argv) > 1 else "Hello, World!") + logger_provider.force_flush() + time.sleep(5) \ No newline at end of file diff --git a/tests/kubernetes-distros/kind/internal/build_docker_image.go b/tests/kubernetes-distros/kind/internal/build_docker_image.go index b62fc8c..da41c30 100644 --- a/tests/kubernetes-distros/kind/internal/build_docker_image.go +++ b/tests/kubernetes-distros/kind/internal/build_docker_image.go @@ -17,6 +17,7 @@ import ( const ( 
DEFAULT_JS_CLIENT_IMG_NAME = "host.docker.internal:5000/test-apps/js/client" DEFAULT_JS_SERVER_IMG_NAME = "host.docker.internal:5000/test-apps/js/server" + DEFAULT_PYTHON_IMG_NAME = "host.docker.internal:5000/test-apps/python-app" ) func BuildDockerImageAndExportArchive(imageName, sourceFolder, imageArchivePath string, logger *log.Logger) env.Func { diff --git a/tests/kubernetes-distros/kind/internal/context.go b/tests/kubernetes-distros/kind/internal/context.go index c4c21c1..447ef9c 100644 --- a/tests/kubernetes-distros/kind/internal/context.go +++ b/tests/kubernetes-distros/kind/internal/context.go @@ -15,6 +15,7 @@ var ( ContextKeySendDataToLumigo = ContextKey("lumigo/upstream/send_data") ContextTestAppJsClientImageName = ContextKey("test-apps/js/client/image/name") ContextTestAppJsServerImageName = ContextKey("test-apps/js/server/image/name") + ContextTestAppPythonImageName = ContextKey("test-apps/python/image/name") ) func (c ContextKey) String() string { diff --git a/tests/kubernetes-distros/kind/internal/install_operator.go b/tests/kubernetes-distros/kind/internal/install_operator.go index cf494ab..5474354 100644 --- a/tests/kubernetes-distros/kind/internal/install_operator.go +++ b/tests/kubernetes-distros/kind/internal/install_operator.go @@ -6,7 +6,6 @@ import ( "os" "path/filepath" "strings" - "testing" "time" "github.com/go-logr/logr" @@ -14,7 +13,6 @@ import ( "sigs.k8s.io/e2e-framework/klient/wait" "sigs.k8s.io/e2e-framework/klient/wait/conditions" "sigs.k8s.io/e2e-framework/pkg/envconf" - "sigs.k8s.io/e2e-framework/pkg/features" "sigs.k8s.io/e2e-framework/third_party/helm" appsv1 "k8s.io/api/apps/v1" @@ -28,7 +26,7 @@ const ( DEFAULT_IMG_VERSION = "latest" ) -func installLumigoOperator(ctx context.Context, client klient.Client, kubeconfigFilePath string, lumigoNamespace string, otlpSinkUrl string, logger logr.Logger) (context.Context, error) { +func installLumigoOperator(ctx context.Context, client klient.Client, kubeconfigFilePath string, 
lumigoNamespace string, otlpSinkUrl string, otlpSinkLogsUrl string, logger logr.Logger) (context.Context, error) { controllerImageName, controllerImageTag := splitContainerImageNameAndTag(ctx.Value(ContextKeyOperatorControllerImage).(string)) telemetryProxyImageName, telemetryProxyImageTag := splitContainerImageNameAndTag(ctx.Value(ContextKeyOperatorTelemetryProxyImage).(string)) operatorDebug := ctx.Value(ContextKeyLumigoOperatorDebug).(bool) @@ -50,6 +48,7 @@ func installLumigoOperator(ctx context.Context, client klient.Client, kubeconfig helm.WithArgs(fmt.Sprintf("--set controllerManager.telemetryProxy.image.repository=%s", telemetryProxyImageName)), helm.WithArgs(fmt.Sprintf("--set controllerManager.telemetryProxy.image.tag=%s", telemetryProxyImageTag)), helm.WithArgs(fmt.Sprintf("--set endpoint.otlp.url=%s", otlpSinkUrl)), + helm.WithArgs(fmt.Sprintf("--set endpoint.otlp.logs_url=%s", otlpSinkUrl)), helm.WithArgs(fmt.Sprintf("--set debug.enabled=%v", operatorDebug)), // Operator debug logging at runtime helm.WithArgs("--debug"), // Helm debug output on install helm.WithWait(), @@ -70,33 +69,17 @@ func installLumigoOperator(ctx context.Context, client klient.Client, kubeconfig return ctx, nil } -func LumigoOperatorEnvFunc(lumigoNamespace string, otlpSinkUrl string, logger logr.Logger) func(ctx context.Context, cfg *envconf.Config) (context.Context, error) { +func LumigoOperatorEnvFunc(lumigoNamespace string, otlpSinkUrl string, otlpSinkLogsUrl string, logger logr.Logger) func(ctx context.Context, cfg *envconf.Config) (context.Context, error) { return func(ctx context.Context, config *envconf.Config) (context.Context, error) { client, err := config.NewClient() if err != nil { return ctx, err } - return installLumigoOperator(ctx, client, config.KubeconfigFile(), lumigoNamespace, otlpSinkUrl, logger) + return installLumigoOperator(ctx, client, config.KubeconfigFile(), lumigoNamespace, otlpSinkUrl, otlpSinkLogsUrl, logger) } } -func 
LumigoOperatorFeature(lumigoNamespace string, otlpSinkUrl string, logger logr.Logger) features.Feature { - return features.New("LumigoOperatorLocal").Setup(func(ctx context.Context, t *testing.T, config *envconf.Config) context.Context { - client, err := config.NewClient() - if err != nil { - t.Fatal(err) - } - - ctx, err = installLumigoOperator(ctx, client, config.KubeconfigFile(), lumigoNamespace, otlpSinkUrl, logger) - if err != nil { - t.Fatal(err) - } - - return ctx - }).Feature() -} - func splitContainerImageNameAndTag(imageName string) (string, string) { lastColonIndex := strings.LastIndex(imageName, ":") lastSlashIndex := strings.LastIndex(imageName, "/") diff --git a/tests/kubernetes-distros/kind/internal/lumigo.go b/tests/kubernetes-distros/kind/internal/lumigo.go index 71ef0b6..b9e4742 100644 --- a/tests/kubernetes-distros/kind/internal/lumigo.go +++ b/tests/kubernetes-distros/kind/internal/lumigo.go @@ -5,7 +5,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func NewLumigo(namespace, name, lumigoTokenSecretName, lumigoTokenSecretKey string, injectionEnabled bool) *operatorv1alpha1.Lumigo { +func NewLumigo(namespace, name, lumigoTokenSecretName, lumigoTokenSecretKey string, injectionEnabled bool, enableLogging bool) *operatorv1alpha1.Lumigo { return &operatorv1alpha1.Lumigo{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, @@ -24,6 +24,9 @@ func NewLumigo(namespace, name, lumigoTokenSecretName, lumigoTokenSecretKey stri Enabled: &injectionEnabled, }, }, + Logging: operatorv1alpha1.LoggingSpec{ + Enabled: &enableLogging, + }, }, } } diff --git a/tests/kubernetes-distros/kind/lumigooperator_logs_test.go b/tests/kubernetes-distros/kind/lumigooperator_logs_test.go index 8c4d9ec..dfc6c73 100644 --- a/tests/kubernetes-distros/kind/lumigooperator_logs_test.go +++ b/tests/kubernetes-distros/kind/lumigooperator_logs_test.go @@ -23,6 +23,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" 
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -77,7 +78,7 @@ func TestLumigoOperatorEventsAndObjects(t *testing.T) { t.Fatal(err) } - lumigo := internal.NewLumigo(namespaceName, "lumigo", lumigoTokenName, lumigoTokenKey, true) + lumigo := internal.NewLumigo(namespaceName, "lumigo", lumigoTokenName, lumigoTokenKey, true, true) r, err := resources.New(client.RESTConfig()) if err != nil { @@ -87,8 +88,6 @@ func TestLumigoOperatorEventsAndObjects(t *testing.T) { r.Create(ctx, lumigo) deploymentName := "testdeployment" - testImage := "python" - logOutput := "IT'S ALIIIIIIVE!" tr := true var g int64 = 5678 @@ -124,8 +123,15 @@ func TestLumigoOperatorEventsAndObjects(t *testing.T) { Containers: []corev1.Container{ { Name: "myapp", - Image: testImage, - Command: []string{"python", "-c", fmt.Sprintf("while True: print(\"%s\"); import time; time.sleep(5)", logOutput)}, + Image: ctx.Value(internal.ContextTestAppPythonImageName).(string), + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("768Mi"), + }, + }, }, }, }, @@ -429,6 +435,52 @@ func TestLumigoOperatorEventsAndObjects(t *testing.T) { return ctx }). 
+ Assess("Application logs are collected successfully and added k8s.* attributes", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + otlpSinkDataPath := ctx.Value(internal.ContextKeyOtlpSinkDataPath).(string) + logsPath := filepath.Join(otlpSinkDataPath, "logs.json") + + if err := apimachinerywait.PollImmediateUntilWithContext(ctx, time.Second*5, func(context.Context) (bool, error) { + logsBytes, err := os.ReadFile(logsPath) + if err != nil { + return false, err + } + + if len(logsBytes) < 1 { + return false, err + } + + applicationLogs := make([]plog.LogRecord, 0) + + /* + * Logs come in multiple lines, and two different scopes; we need to split by '\n'. + * bufio.NewScanner fails because our lines are "too long" (LOL). + */ + exportRequests := strings.Split(string(logsBytes), "\n") + for _, exportRequestJson := range exportRequests { + exportRequest := plogotlp.NewExportRequest() + exportRequest.UnmarshalJSON([]byte(exportRequestJson)) + + if appLogs, err := exportRequestToApplicationLogRecords(exportRequest); err != nil { + t.Fatalf("Cannot extract logs from export request: %v", err) + } else { + applicationLogs = append(applicationLogs, appLogs...) + } + } + + if len(applicationLogs) < 1 { + // No application logs received yet + t.Fatalf("No application logs found in '%s'. \r\nMake sure the application has LUMIGO_ENABLE_LOGS=true and is emitting logs using a supported logger", logsPath) + return false, nil + } + + t.Logf("Found application logs: %d", len(applicationLogs)) + return true, nil + }); err != nil { + t.Fatalf("Failed to wait for application logs: %v", err) + } + + return ctx + }). 
Feature() testEnv.Test(t, testAppDeploymentFeature) @@ -553,20 +605,44 @@ func exportRequestToHeartbeatLogRecords(exportRequest plogotlp.ExportRequest) ([ return eventLogs, nil } +func exportRequestToApplicationLogRecords(exportRequest plogotlp.ExportRequest) ([]plog.LogRecord, error) { + applicationLogs := make([]plog.LogRecord, 0) + logs := exportRequest.Logs() + + l := logs.ResourceLogs().Len() + for i := 0; i < l; i++ { + e, err := resourceLogsToApplicationLogRecords(logs.ResourceLogs().At(i)) + if err != nil { + return nil, err + } + + applicationLogs = append(applicationLogs, e...) + } + + return applicationLogs, nil +} + +func resourceLogsToApplicationLogRecords(resourceLogs plog.ResourceLogs) ([]plog.LogRecord, error) { + return resourceLogsToScopedLogRecords(resourceLogs, "opentelemetry.sdk._logs._internal") +} + func resourceLogsToHeartbeatLogRecords(resourceLogs plog.ResourceLogs) ([]plog.LogRecord, error) { + return resourceLogsToScopedLogRecords(resourceLogs, "lumigo-operator.namespace_heartbeat") +} + +func resourceLogsToScopedLogRecords(resourceLogs plog.ResourceLogs, filteredScopeName string) ([]plog.LogRecord, error) { l := resourceLogs.ScopeLogs().Len() - heartbeatLogRecords := make([]plog.LogRecord, 0) + filteredScopeLogRecords := make([]plog.LogRecord, 0) + for i := 0; i < l; i++ { scopeLogs := resourceLogs.ScopeLogs().At(i) scopeName := scopeLogs.Scope().Name() logRecords := scopeLogsToLogRecords(scopeLogs) - switch scopeName { - case "lumigo-operator.namespace_heartbeat": - { - heartbeatLogRecords = append(heartbeatLogRecords, logRecords...) - } + if scopeName == filteredScopeName { + filteredScopeLogRecords = append(filteredScopeLogRecords, logRecords...) 
} } - return heartbeatLogRecords, nil + + return filteredScopeLogRecords, nil } diff --git a/tests/kubernetes-distros/kind/lumigooperator_traces_test.go b/tests/kubernetes-distros/kind/lumigooperator_traces_test.go index 610915e..517e107 100644 --- a/tests/kubernetes-distros/kind/lumigooperator_traces_test.go +++ b/tests/kubernetes-distros/kind/lumigooperator_traces_test.go @@ -76,7 +76,7 @@ func TestLumigoOperatorTraces(t *testing.T) { t.Fatal(err) } - lumigo := internal.NewLumigo(namespaceName, "lumigo", lumigoTokenName, lumigoTokenKey, true) + lumigo := internal.NewLumigo(namespaceName, "lumigo", lumigoTokenName, lumigoTokenKey, true, false) r, err := resources.New(client.RESTConfig()) if err != nil { diff --git a/tests/kubernetes-distros/kind/main_test.go b/tests/kubernetes-distros/kind/main_test.go index 639af0e..6e63de8 100644 --- a/tests/kubernetes-distros/kind/main_test.go +++ b/tests/kubernetes-distros/kind/main_test.go @@ -191,6 +191,9 @@ func TestMain(m *testing.M) { testJsServerImageName := fmt.Sprintf("%s:%s", internal.DEFAULT_JS_SERVER_IMG_NAME, runId) testJsServerImageArchivePath := filepath.Join(tmpDir, "test-js-server.tgz") + testPythonImageName := fmt.Sprintf("%s:%s", internal.DEFAULT_PYTHON_IMG_NAME, runId) + testPythonImageArchivePath := filepath.Join(tmpDir, "test-python.tgz") + ctx := context.WithValue(context.Background(), internal.ContextKeyRunId, runId) ctx = context.WithValue(ctx, internal.ContextKeyKubernetesClusterName, kindClusterName) ctx = context.WithValue(ctx, internal.ContextKeyOtlpSinkConfigPath, dataSinkConfigDir) @@ -203,18 +206,20 @@ func TestMain(m *testing.M) { ctx = context.WithValue(ctx, internal.ContextKeyOperatorTelemetryProxyImage, telemetryProxyImageName) ctx = context.WithValue(ctx, internal.ContextTestAppJsClientImageName, testJsClientImageName) ctx = context.WithValue(ctx, internal.ContextTestAppJsServerImageName, testJsServerImageName) + ctx = context.WithValue(ctx, internal.ContextTestAppPythonImageName, 
testPythonImageName) testEnv = env.NewWithConfig(cfg).WithContext(ctx) logrWrapper := stdr.New(logger) otlpSinkFeature, otlpSinkK8sServiceUrl := internal.OtlpSinkEnvFunc(OTLP_SINK_NAMESPACE, "otlp-sink", OTLP_SINK_OTEL_COLLECTOR_IMAGE, logrWrapper) - lumigoOperatorFeature := internal.LumigoOperatorEnvFunc(LUMIGO_SYSTEM_NAMESPACE, otlpSinkK8sServiceUrl, logrWrapper) + lumigoOperatorFeature := internal.LumigoOperatorEnvFunc(LUMIGO_SYSTEM_NAMESPACE, otlpSinkK8sServiceUrl, otlpSinkK8sServiceUrl, logrWrapper) testEnv.Setup( internal.BuildDockerImageAndExportArchive(controllerImageName, filepath.Join(repoRoot, "controller"), controllerImageArchivePath, logger), internal.BuildDockerImageAndExportArchive(telemetryProxyImageName, filepath.Join(repoRoot, "telemetryproxy"), telemetryProxyImageArchivePath, logger), internal.BuildDockerImageAndExportArchive(testJsClientImageName, filepath.Join(cwd, "apps", "client"), testJsClientImageArchivePath, logger), internal.BuildDockerImageAndExportArchive(testJsServerImageName, filepath.Join(cwd, "apps", "server"), testJsServerImageArchivePath, logger), + internal.BuildDockerImageAndExportArchive(testPythonImageName, filepath.Join(cwd, "apps", "python"), testPythonImageArchivePath, logger), envfuncs.CreateKindClusterWithConfig(kindClusterName, kindNodeImageVal, kindConfigPath), @@ -222,6 +227,8 @@ func TestMain(m *testing.M) { internal.LoadDockerImageArchiveToCluster(kindClusterName, telemetryProxyImageArchivePath, logger), internal.LoadDockerImageArchiveToCluster(kindClusterName, testJsClientImageArchivePath, logger), internal.LoadDockerImageArchiveToCluster(kindClusterName, testJsServerImageArchivePath, logger), + internal.LoadDockerImageArchiveToCluster(kindClusterName, testPythonImageArchivePath, logger), + /* * Otel Collector image is on Docker hub, no need to pull it into Kind (pulling into Kind * works only for local image, in the local Docker daemon). 
diff --git a/tests/kubernetes-distros/kind/resources/kind-config.yaml.tpl b/tests/kubernetes-distros/kind/resources/kind-config.yaml.tpl index fa5aac3..2e88f9d 100644 --- a/tests/kubernetes-distros/kind/resources/kind-config.yaml.tpl +++ b/tests/kubernetes-distros/kind/resources/kind-config.yaml.tpl @@ -6,4 +6,10 @@ nodes: - hostPath: '{{OTLP_SINK_CONFIG_VOLUME_PATH}}' containerPath: /lumigo/otlp-sink/config - hostPath: '{{OTLP_SINK_DATA_VOLUME_PATH}}' - containerPath: /lumigo/otlp-sink/data \ No newline at end of file + containerPath: /lumigo/otlp-sink/data + kubeadmConfigPatches: + - | + kind: InitConfiguration + nodeRegistration: + kubeletExtraArgs: + system-reserved: memory=4Gi \ No newline at end of file