From 0d5177ff8f5a6b071385a03695832731b48d232f Mon Sep 17 00:00:00 2001
From: Johannes Scheuermann
Date: Tue, 7 Mar 2023 18:28:01 +0000
Subject: [PATCH] Ensure we are not blocking if the operator is not able to get the Pod client (#1532)

---
 controllers/suite_test.go       |   1 +
 controllers/update_pods.go      | 143 +++++++++++++++++++-------
 controllers/update_pods_test.go |  62 +++++++++++++-
 3 files changed, 144 insertions(+), 62 deletions(-)

diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index 39ed22d6d..0d7b399c5 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -155,6 +155,7 @@ func reconcileObject(reconciler reconcile.Reconciler, metadata metav1.ObjectMeta
 			log.Info("Reconciliation successful")
 		}
 	}
+
 	return result, err
 }
 
diff --git a/controllers/update_pods.go b/controllers/update_pods.go
index d7a4dcd62..1b9166e4c 100644
--- a/controllers/update_pods.go
+++ b/controllers/update_pods.go
@@ -3,7 +3,7 @@
  *
  * This source file is part of the FoundationDB open source project
  *
- * Copyright 2019-2021 Apple Inc. and the FoundationDB project authors
+ * Copyright 2019-2023 Apple Inc. and the FoundationDB project authors
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -44,11 +44,48 @@ func (updatePods) reconcile(ctx context.Context, r *FoundationDBClusterReconcile
 
 	pods, err := r.PodLifecycleManager.GetPods(ctx, r, cluster, internal.GetPodListOptions(cluster, "", "")...)
 	if err != nil {
-		return &requeue{curError: err}
+		return &requeue{curError: err, delayedRequeue: true}
+	}
+
+	updates, err := getPodsToUpdate(logger, r, cluster, internal.CreatePodMap(cluster, pods))
+	if err != nil {
+		return &requeue{curError: err, delay: podSchedulingDelayDuration, delayedRequeue: true}
+	}
+
+	if len(updates) > 0 {
+		if cluster.Spec.AutomationOptions.PodUpdateStrategy == fdbv1beta2.PodUpdateStrategyReplacement {
+			logger.Info("Requeuing reconciliation to replace pods")
+			return &requeue{message: "Requeuing reconciliation to replace pods"}
+		}
+
+		if r.PodLifecycleManager.GetDeletionMode(cluster) == fdbv1beta2.PodUpdateModeNone {
+			r.Recorder.Event(cluster, corev1.EventTypeNormal,
+				"NeedsPodsDeletion", "Spec requires deleting some pods, but deleting pods is disabled")
+			cluster.Status.Generations.NeedsPodDeletion = cluster.ObjectMeta.Generation
+			err = r.updateOrApply(ctx, cluster)
+			if err != nil {
+				logger.Error(err, "Error updating cluster status")
+			}
+			return &requeue{message: "Pod deletion is disabled"}
+		}
 	}
 
+	if len(updates) == 0 {
+		return nil
+	}
+
+	adminClient, err := r.getDatabaseClientProvider().GetAdminClient(cluster, r.Client)
+	if err != nil {
+		return &requeue{curError: err, delayedRequeue: true}
+	}
+	defer adminClient.Close()
+
+	return deletePodsForUpdates(ctx, r, cluster, adminClient, updates, logger)
+}
+
+// getPodsToUpdate returns a map from fault domain (zone) to the Pods in that zone that need to be updated, with each value being a slice of *corev1.Pod.
+func getPodsToUpdate(logger logr.Logger, reconciler *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster, podMap map[fdbv1beta2.ProcessGroupID]*corev1.Pod) (map[string][]*corev1.Pod, error) {
 	updates := make(map[string][]*corev1.Pod)
-	podMap := internal.CreatePodMap(cluster, pods)
 
 	for _, processGroup := range cluster.Status.ProcessGroups {
 		if processGroup.IsMarkedForRemoval() {
@@ -74,90 +111,78 @@ func (updatePods) reconcile(ctx context.Context, r *FoundationDBClusterReconcile
 			logger.V(1).Info("Could not find Pod for process group ID", "processGroupID", processGroup.ProcessGroupID)
 			continue
-			// TODO should not be continue but rather be a requeue?
 		}
 
 		if shouldRequeueDueToTerminatingPod(pod, cluster, processGroup.ProcessGroupID) {
-			return &requeue{message: "Cluster has pod that is pending deletion", delay: podSchedulingDelayDuration, delayedRequeue: true}
+			return nil, fmt.Errorf("cluster has Pod %s that is pending deletion", pod.Name)
 		}
 
 		_, idNum, err := podmanager.ParseProcessGroupID(processGroup.ProcessGroupID)
 		if err != nil {
-			return &requeue{curError: err}
+			logger.Info("Skipping Pod due to error parsing process group ID",
+				"processGroupID", processGroup.ProcessGroupID,
+				"error", err.Error())
+			continue
 		}
 
 		processClass, err := podmanager.GetProcessClass(cluster, pod)
 		if err != nil {
-			return &requeue{curError: err}
+			logger.Info("Skipping Pod due to error fetching process class",
+				"processGroupID", processGroup.ProcessGroupID,
+				"error", err.Error())
+			continue
 		}
 
 		specHash, err := internal.GetPodSpecHash(cluster, processClass, idNum, nil)
 		if err != nil {
-			return &requeue{curError: err}
-		}
-
-		if pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey] != specHash {
-			logger.Info("Update Pod",
+			logger.Info("Skipping Pod due to error generating spec hash",
 				"processGroupID", processGroup.ProcessGroupID,
-				"reason", fmt.Sprintf("specHash has changed from %s to %s", specHash, pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey]))
-
-			podClient, message := r.getPodClient(cluster, pod)
-			if podClient == nil {
-				return &requeue{message: message, delay: podSchedulingDelayDuration}
-			}
-
-			substitutions, err := podClient.GetVariableSubstitutions()
-			if err != nil {
-				return &requeue{curError: err}
-			}
+				"error", err.Error())
+			continue
+		}
 
-			if substitutions == nil {
-				logger.Info("Skipping pod due to missing locality information",
-					"processGroupID", processGroup.ProcessGroupID)
-				continue
-			}
+		// The Pod is already up to date, so we can skip it.
+		if pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey] == specHash {
+			continue
+		}
 
-			zone := substitutions["FDB_ZONE_ID"]
-			if r.InSimulation {
-				zone = "simulation"
-			}
+		logger.Info("Update Pod",
+			"processGroupID", processGroup.ProcessGroupID,
+			"reason", fmt.Sprintf("specHash has changed from %s to %s", pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey], specHash))
 
-			if updates[zone] == nil {
-				updates[zone] = make([]*corev1.Pod, 0)
-			}
-			updates[zone] = append(updates[zone], pod)
+		podClient, message := reconciler.getPodClient(cluster, pod)
+		if podClient == nil {
+			logger.Info("Skipping Pod due to missing Pod client information",
+				"processGroupID", processGroup.ProcessGroupID,
+				"message", message)
+			continue
 		}
-	}
 
-	if len(updates) > 0 {
-		if cluster.Spec.AutomationOptions.PodUpdateStrategy == fdbv1beta2.PodUpdateStrategyReplacement {
-			logger.Info("Requeuing reconciliation to replace pods")
-			return &requeue{message: "Requeueing reconciliation to replace pods"}
+		substitutions, err := podClient.GetVariableSubstitutions()
+		if err != nil {
+			logger.Info("Skipping Pod due to error fetching variable substitutions",
+				"processGroupID", processGroup.ProcessGroupID, "error", err.Error())
+			continue
 		}
 
-		if r.PodLifecycleManager.GetDeletionMode(cluster) == fdbv1beta2.PodUpdateModeNone {
-			r.Recorder.Event(cluster, corev1.EventTypeNormal,
-				"NeedsPodsDeletion", "Spec require deleting some pods, but deleting pods is disabled")
-			cluster.Status.Generations.NeedsPodDeletion = cluster.ObjectMeta.Generation
-			err = r.updateOrApply(ctx, cluster)
-			if err != nil {
-				logger.Error(err, "Error updating cluster status")
-			}
-			return &requeue{message: "Pod deletion is disabled"}
+		if substitutions == nil {
+			logger.Info("Skipping Pod due to missing locality information",
+				"processGroupID", processGroup.ProcessGroupID)
+			continue
 		}
-	}
 
-	if len(updates) == 0 {
-		return nil
-	}
+		zone := substitutions["FDB_ZONE_ID"]
+		if reconciler.InSimulation {
+			zone = "simulation"
+		}
 
-	adminClient, err := r.getDatabaseClientProvider().GetAdminClient(cluster, r.Client)
-	if err != nil {
-		return &requeue{curError: err}
+		if updates[zone] == nil {
+			updates[zone] = make([]*corev1.Pod, 0)
+		}
+		updates[zone] = append(updates[zone], pod)
 	}
-	defer adminClient.Close()
 
-	return deletePodsForUpdates(ctx, r, cluster, adminClient, updates, logger)
+	return updates, nil
 }
 
 func shouldRequeueDueToTerminatingPod(pod *corev1.Pod, cluster *fdbv1beta2.FoundationDBCluster, processGroupID fdbv1beta2.ProcessGroupID) bool {
diff --git a/controllers/update_pods_test.go b/controllers/update_pods_test.go
index 0b7f2d940..983b8ed94 100644
--- a/controllers/update_pods_test.go
+++ b/controllers/update_pods_test.go
@@ -21,9 +21,13 @@ package controllers
 
 import (
+	"context"
 	"fmt"
 	"time"
 
+	"github.com/FoundationDB/fdb-kubernetes-operator/internal"
+	ctrlClient "sigs.k8s.io/controller-runtime/pkg/client"
+
 	"k8s.io/utils/pointer"
 
 	fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2"
@@ -112,9 +116,10 @@ var _ = Describe("update_pods", func() {
 	Context("Validating shouldRequeueDueToTerminatingPod", func() {
 		var processGroup = fdbv1beta2.ProcessGroupID("")
 
-		When("pod is without deletionTimestamp", func() {
+		When("Pod is without deletionTimestamp", func() {
 			var cluster *fdbv1beta2.FoundationDBCluster
 			var pod *corev1.Pod
+
 			BeforeEach(func() {
 				cluster = &fdbv1beta2.FoundationDBCluster{}
 				pod = &corev1.Pod{
@@ -129,9 +134,10 @@ var _ = Describe("update_pods", func() {
 			})
 		})
 
-		When("pod with deletionTimestamp less than ignore limit", func() {
+		When("Pod with deletionTimestamp less than ignore limit", func() {
 			var cluster *fdbv1beta2.FoundationDBCluster
 			var pod *corev1.Pod
+
 			BeforeEach(func() {
 				cluster = &fdbv1beta2.FoundationDBCluster{}
 				pod = &corev1.Pod{
@@ -147,9 +153,10 @@ var _ = Describe("update_pods", func() {
 			})
 		})
 
-		When("pod with deletionTimestamp more than ignore limit", func() {
+		When("Pod with deletionTimestamp more than ignore limit", func() {
 			var cluster *fdbv1beta2.FoundationDBCluster
 			var pod *corev1.Pod
+
 			BeforeEach(func() {
 				cluster = &fdbv1beta2.FoundationDBCluster{}
 				pod = &corev1.Pod{
@@ -169,6 +176,7 @@ var _ = Describe("update_pods", func() {
 		When("with configured IgnoreTerminatingPodsSeconds", func() {
 			var cluster *fdbv1beta2.FoundationDBCluster
 			var pod *corev1.Pod
+
 			BeforeEach(func() {
 				cluster = &fdbv1beta2.FoundationDBCluster{
 					Spec: fdbv1beta2.FoundationDBClusterSpec{
@@ -191,4 +199,52 @@ var _ = Describe("update_pods", func() {
 			})
 		})
 	})
+
+	When("fetching all Pods that need an update", func() {
+		var cluster *fdbv1beta2.FoundationDBCluster
+		var updates map[string][]*corev1.Pod
+		var expectedError bool
+
+		BeforeEach(func() {
+			cluster = internal.CreateDefaultCluster()
+			Expect(k8sClient.Create(context.TODO(), cluster)).NotTo(HaveOccurred())
+			result, err := reconcileCluster(cluster)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result.Requeue).To(BeFalse())
+			Expect(k8sClient.Get(context.TODO(), ctrlClient.ObjectKeyFromObject(cluster), cluster)).NotTo(HaveOccurred())
+		})
+
+		JustBeforeEach(func() {
+			pods, err := clusterReconciler.PodLifecycleManager.GetPods(context.TODO(), k8sClient, cluster, internal.GetPodListOptions(cluster, "", "")...)
+			Expect(err).NotTo(HaveOccurred())
+
+			updates, err = getPodsToUpdate(log, clusterReconciler, cluster, internal.CreatePodMap(cluster, pods))
+			if !expectedError {
+				Expect(err).NotTo(HaveOccurred())
+			} else {
+				Expect(err).To(HaveOccurred())
+			}
+		})
+
+		When("the cluster has no changes", func() {
+			It("should return no errors and an empty map", func() {
+				Expect(updates).To(HaveLen(0))
+			})
+		})
+
+		When("there is a spec change for all processes", func() {
+			BeforeEach(func() {
+				generalSettings := cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral]
+				generalSettings.PodTemplate.Spec.NodeSelector = map[string]string{"test": "test"}
+				cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral] = generalSettings
+
+				Expect(k8sClient.Update(context.TODO(), cluster)).NotTo(HaveOccurred())
+			})
+
+			It("should return no errors and a map with one zone", func() {
+				// We only have one zone in this case, the simulation zone.
+				Expect(updates).To(HaveLen(1))
+			})
+		})
+	})
 })
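
-- 
Reviewer note: the heart of this change is that a failure for a single Pod inside the update loop, such as not being able to create the Pod client, is now logged and skipped instead of aborting the whole reconciliation. The following standalone Go sketch illustrates that skip-and-continue pattern; pod, getClient, and podsToUpdate are hypothetical stand-ins for illustration only, not the operator's actual types or API.

package main

import (
	"errors"
	"fmt"
)

// pod is a stand-in for *corev1.Pod: a name plus the fault domain (zone)
// the process runs in.
type pod struct {
	name string
	zone string
}

// getClient simulates reconciler.getPodClient: it fails when no client
// can be created for the Pod.
func getClient(p pod) (string, error) {
	if p.zone == "" {
		return "", errors.New("pod client is not available")
	}
	return "client-" + p.name, nil
}

// podsToUpdate mirrors the skip-and-continue pattern of getPodsToUpdate:
// a Pod whose client cannot be created is logged and skipped, instead of
// returning early and blocking the update of every other Pod.
func podsToUpdate(pods []pod) map[string][]pod {
	updates := make(map[string][]pod)

	for _, p := range pods {
		if _, err := getClient(p); err != nil {
			// Before this patch, a failure here requeued the whole
			// reconciliation; now the loop moves on to the next Pod.
			fmt.Printf("skipping Pod %s: %v\n", p.name, err)
			continue
		}

		updates[p.zone] = append(updates[p.zone], p)
	}

	return updates
}

func main() {
	updates := podsToUpdate([]pod{
		{name: "storage-1", zone: "zone-a"},
		{name: "storage-2"}, // empty zone: client creation fails, Pod is skipped
		{name: "storage-3", zone: "zone-b"},
	})

	for zone, pods := range updates {
		fmt.Printf("zone %s has %d Pod(s) pending an update\n", zone, len(pods))
	}
}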
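The delayedRequeue flag used throughout this patch deserves a similar note. Based only on the requeue fields visible in this diff (curError, message, delay, delayedRequeue), the sketch below shows one plausible driver loop in which a delayed requeue is remembered while the remaining subreconcilers still run; runSubReconcilers and subReconciler are hypothetical names, and the operator's real control loop may differ.

package main

import (
	"fmt"
	"time"
)

// requeue mirrors the fields visible in this patch.
type requeue struct {
	curError       error
	message        string
	delay          time.Duration
	delayedRequeue bool
}

type subReconciler func() *requeue

// runSubReconcilers stops immediately on a plain requeue, but remembers a
// delayed requeue and keeps executing the remaining subreconcilers first.
func runSubReconcilers(subs []subReconciler) *requeue {
	var delayed *requeue

	for i, sub := range subs {
		req := sub()
		if req == nil {
			continue
		}

		if req.delayedRequeue {
			// Remember the requeue but let the remaining subreconcilers run.
			fmt.Printf("subreconciler %d requested a delayed requeue: %s\n", i, req.message)
			if delayed == nil {
				delayed = req
			}
			continue
		}

		// A non-delayed requeue still stops the loop immediately.
		fmt.Printf("subreconciler %d requested an immediate requeue: %s\n", i, req.message)
		return req
	}

	return delayed
}

func main() {
	result := runSubReconcilers([]subReconciler{
		func() *requeue { return nil },
		func() *requeue {
			return &requeue{message: "pods pending update", delay: 15 * time.Second, delayedRequeue: true}
		},
		func() *requeue {
			fmt.Println("this subreconciler still ran after the delayed requeue")
			return nil
		},
	})

	fmt.Printf("final requeue: %+v\n", result)
}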