Ensure we are not blocking if the operator is not able to get the Pod client (FoundationDB#1532)
johscheuer authored Mar 7, 2023
1 parent e0b1434 commit 0d5177f
Showing 3 changed files with 144 additions and 62 deletions.
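Summary of the change: previously, (updatePods).reconcile returned early whenever a single Pod could not be processed, for example when the Pod client could not be created, which blocked the remaining reconciliation steps. The Pod-selection logic now lives in a separate getPodsToUpdate helper that logs and skips the affected Pod, and the remaining hard-error paths requeue with delayedRequeue: true so the other subreconcilers still get to run. Below is a minimal, self-contained sketch of that skip-and-continue pattern; the item, getClient, and collectUpdates names are hypothetical stand-ins for illustration, not the operator's API.

package main

import (
	"errors"
	"fmt"
)

// item is a hypothetical stand-in for a process group and its Pod.
type item struct {
	name string
	zone string
}

// getClient stands in for the operator's getPodClient: it can fail for a
// single item without the rest of the pass being affected.
func getClient(it item) (string, error) {
	if it.zone == "" {
		return "", errors.New("no client available")
	}
	return "client-for-" + it.name, nil
}

// collectUpdates mirrors the shape of the new getPodsToUpdate helper: it
// returns a fault-domain -> items map and only returns an error when
// continuing would be unsafe. Per-item failures are logged and skipped.
func collectUpdates(items []item) (map[string][]item, error) {
	updates := make(map[string][]item)
	for _, it := range items {
		if _, err := getClient(it); err != nil {
			// Before this commit the equivalent step returned early and
			// blocked the whole reconciliation; now the item is skipped.
			fmt.Printf("skipping %s: %v\n", it.name, err)
			continue
		}
		updates[it.zone] = append(updates[it.zone], it)
	}
	return updates, nil
}

func main() {
	updates, err := collectUpdates([]item{
		{name: "storage-1", zone: "zone-a"},
		{name: "storage-2", zone: ""}, // no client: skipped, not blocking
		{name: "storage-3", zone: "zone-a"},
	})
	fmt.Println(updates, err)
}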
controllers/suite_test.go: 1 addition & 0 deletions
@@ -155,6 +155,7 @@ func reconcileObject(reconciler reconcile.Reconciler, metadata metav1.ObjectMeta
 			log.Info("Reconciliation successful")
 		}
 	}
+
 	return result, err
 }
controllers/update_pods.go: 84 additions & 59 deletions
@@ -3,7 +3,7 @@
  *
  * This source file is part of the FoundationDB open source project
  *
- * Copyright 2019-2021 Apple Inc. and the FoundationDB project authors
+ * Copyright 2019-2023 Apple Inc. and the FoundationDB project authors
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -44,11 +44,48 @@ func (updatePods) reconcile(ctx context.Context, r *FoundationDBClusterReconcile
 
 	pods, err := r.PodLifecycleManager.GetPods(ctx, r, cluster, internal.GetPodListOptions(cluster, "", "")...)
 	if err != nil {
-		return &requeue{curError: err}
+		return &requeue{curError: err, delayedRequeue: true}
 	}
 
+	updates, err := getPodsToUpdate(logger, r, cluster, internal.CreatePodMap(cluster, pods))
+	if err != nil {
+		return &requeue{curError: err, delay: podSchedulingDelayDuration, delayedRequeue: true}
+	}
+
+	if len(updates) > 0 {
+		if cluster.Spec.AutomationOptions.PodUpdateStrategy == fdbv1beta2.PodUpdateStrategyReplacement {
+			logger.Info("Requeuing reconciliation to replace pods")
+			return &requeue{message: "Requeueing reconciliation to replace pods"}
+		}
+
+		if r.PodLifecycleManager.GetDeletionMode(cluster) == fdbv1beta2.PodUpdateModeNone {
+			r.Recorder.Event(cluster, corev1.EventTypeNormal,
+				"NeedsPodsDeletion", "Spec require deleting some pods, but deleting pods is disabled")
+			cluster.Status.Generations.NeedsPodDeletion = cluster.ObjectMeta.Generation
+			err = r.updateOrApply(ctx, cluster)
+			if err != nil {
+				logger.Error(err, "Error updating cluster status")
+			}
+			return &requeue{message: "Pod deletion is disabled"}
+		}
+	}
+
+	if len(updates) == 0 {
+		return nil
+	}
+
+	adminClient, err := r.getDatabaseClientProvider().GetAdminClient(cluster, r.Client)
+	if err != nil {
+		return &requeue{curError: err, delayedRequeue: true}
+	}
+	defer adminClient.Close()
+
+	return deletePodsForUpdates(ctx, r, cluster, adminClient, updates, logger)
+}
+
+// getPodsToUpdate returns a map of Zone to Pods mapping. The map has the fault domain as key and all Pods in that fault domain will be present as a slice of *corev1.Pod.
+func getPodsToUpdate(logger logr.Logger, reconciler *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster, podMap map[fdbv1beta2.ProcessGroupID]*corev1.Pod) (map[string][]*corev1.Pod, error) {
 	updates := make(map[string][]*corev1.Pod)
-	podMap := internal.CreatePodMap(cluster, pods)
 
 	for _, processGroup := range cluster.Status.ProcessGroups {
 		if processGroup.IsMarkedForRemoval() {
@@ -74,90 +111,78 @@ func (updatePods) reconcile(ctx context.Context, r *FoundationDBClusterReconcile
 			logger.V(1).Info("Could not find Pod for process group ID",
 				"processGroupID", processGroup.ProcessGroupID)
 			continue
+			// TODO should not be continue but rather be a requeue?
 		}
 
 		if shouldRequeueDueToTerminatingPod(pod, cluster, processGroup.ProcessGroupID) {
-			return &requeue{message: "Cluster has pod that is pending deletion", delay: podSchedulingDelayDuration, delayedRequeue: true}
+			return nil, fmt.Errorf("cluster has Pod %s that is pending deletion", pod.Name)
 		}
 
 		_, idNum, err := podmanager.ParseProcessGroupID(processGroup.ProcessGroupID)
 		if err != nil {
-			return &requeue{curError: err}
+			logger.Info("Skipping Pod due to error parsing Process Group ID",
+				"processGroupID", processGroup.ProcessGroupID,
+				"error", err.Error())
+			continue
 		}
 
 		processClass, err := podmanager.GetProcessClass(cluster, pod)
 		if err != nil {
-			return &requeue{curError: err}
+			logger.Info("Skipping Pod due to error fetching process class",
+				"processGroupID", processGroup.ProcessGroupID,
+				"error", err.Error())
+			continue
 		}
 
 		specHash, err := internal.GetPodSpecHash(cluster, processClass, idNum, nil)
 		if err != nil {
-			return &requeue{curError: err}
-		}
-
-		if pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey] != specHash {
-			logger.Info("Update Pod",
+			logger.Info("Skipping Pod due to error generating spec hash",
 				"processGroupID", processGroup.ProcessGroupID,
-				"reason", fmt.Sprintf("specHash has changed from %s to %s", specHash, pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey]))
-
-			podClient, message := r.getPodClient(cluster, pod)
-			if podClient == nil {
-				return &requeue{message: message, delay: podSchedulingDelayDuration}
-			}
-
-			substitutions, err := podClient.GetVariableSubstitutions()
-			if err != nil {
-				return &requeue{curError: err}
-			}
+				"error", err.Error())
+			continue
+		}
 
-			if substitutions == nil {
-				logger.Info("Skipping pod due to missing locality information",
-					"processGroupID", processGroup.ProcessGroupID)
-				continue
-			}
+		// The Pod is updated, so we can continue.
+		if pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey] == specHash {
+			continue
+		}
 
-			zone := substitutions["FDB_ZONE_ID"]
-			if r.InSimulation {
-				zone = "simulation"
-			}
+		logger.Info("Update Pod",
+			"processGroupID", processGroup.ProcessGroupID,
+			"reason", fmt.Sprintf("specHash has changed from %s to %s", specHash, pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey]))
 
-			if updates[zone] == nil {
-				updates[zone] = make([]*corev1.Pod, 0)
-			}
-			updates[zone] = append(updates[zone], pod)
+		podClient, message := reconciler.getPodClient(cluster, pod)
+		if podClient == nil {
+			logger.Info("Skipping Pod due to missing Pod client information",
+				"processGroupID", processGroup.ProcessGroupID,
+				"message", message)
+			continue
 		}
-	}
 
-	if len(updates) > 0 {
-		if cluster.Spec.AutomationOptions.PodUpdateStrategy == fdbv1beta2.PodUpdateStrategyReplacement {
-			logger.Info("Requeuing reconciliation to replace pods")
-			return &requeue{message: "Requeueing reconciliation to replace pods"}
+		substitutions, err := podClient.GetVariableSubstitutions()
+		if err != nil {
+			logger.Info("Skipping Pod due to missing variable substitutions",
+				"processGroupID", processGroup.ProcessGroupID)
+			continue
 		}
 
-		if r.PodLifecycleManager.GetDeletionMode(cluster) == fdbv1beta2.PodUpdateModeNone {
-			r.Recorder.Event(cluster, corev1.EventTypeNormal,
-				"NeedsPodsDeletion", "Spec require deleting some pods, but deleting pods is disabled")
-			cluster.Status.Generations.NeedsPodDeletion = cluster.ObjectMeta.Generation
-			err = r.updateOrApply(ctx, cluster)
-			if err != nil {
-				logger.Error(err, "Error updating cluster status")
-			}
-			return &requeue{message: "Pod deletion is disabled"}
+		if substitutions == nil {
+			logger.Info("Skipping Pod due to missing locality information",
+				"processGroupID", processGroup.ProcessGroupID)
+			continue
 		}
-	}
 
-	if len(updates) == 0 {
-		return nil
-	}
+		zone := substitutions["FDB_ZONE_ID"]
+		if reconciler.InSimulation {
+			zone = "simulation"
+		}
 
-	adminClient, err := r.getDatabaseClientProvider().GetAdminClient(cluster, r.Client)
-	if err != nil {
-		return &requeue{curError: err}
+		if updates[zone] == nil {
+			updates[zone] = make([]*corev1.Pod, 0)
+		}
+		updates[zone] = append(updates[zone], pod)
 	}
-	defer adminClient.Close()
 
-	return deletePodsForUpdates(ctx, r, cluster, adminClient, updates, logger)
+	return updates, nil
 }
 
 func shouldRequeueDueToTerminatingPod(pod *corev1.Pod, cluster *fdbv1beta2.FoundationDBCluster, processGroupID fdbv1beta2.ProcessGroupID) bool {
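After this refactor, reconcile is reduced to orchestration: list the Pods, build a fault-domain map with getPodsToUpdate, and hand that map together with an admin client to deletePodsForUpdates. The sketch below shows roughly how such a zone map can be consumed one fault domain at a time; deleteBatch is a hypothetical stand-in for the operator's deletePodsForUpdates, which additionally honors the configured deletion mode and checks the cluster through the admin client before removing anything.

package main

import "fmt"

type pod struct{ name string }

// deleteBatch is a hypothetical helper; the real work in the operator
// happens in deletePodsForUpdates.
func deleteBatch(zone string, pods []pod) {
	fmt.Printf("would delete %d pod(s) in fault domain %q\n", len(pods), zone)
}

func main() {
	// Shape of the map returned by getPodsToUpdate: fault domain -> Pods.
	updates := map[string][]pod{
		"zone-a": {{name: "storage-1"}, {name: "storage-3"}},
		"zone-b": {{name: "storage-2"}},
	}

	// Processing one fault domain at a time keeps replicas in the other
	// zones serving while a zone is updated.
	for zone, pods := range updates {
		deleteBatch(zone, pods)
	}
}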
controllers/update_pods_test.go: 59 additions & 3 deletions
@@ -21,9 +21,13 @@
 package controllers
 
 import (
+	"context"
 	"fmt"
 	"time"
 
+	"github.com/FoundationDB/fdb-kubernetes-operator/internal"
+	ctrlClient "sigs.k8s.io/controller-runtime/pkg/client"
+
 	"k8s.io/utils/pointer"
 
 	fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2"
@@ -112,9 +116,10 @@ var _ = Describe("update_pods", func() {
 	Context("Validating shouldRequeueDueToTerminatingPod", func() {
 		var processGroup = fdbv1beta2.ProcessGroupID("")
 
-		When("pod is without deletionTimestamp", func() {
+		When("Pod is without deletionTimestamp", func() {
 			var cluster *fdbv1beta2.FoundationDBCluster
 			var pod *corev1.Pod
+
 			BeforeEach(func() {
 				cluster = &fdbv1beta2.FoundationDBCluster{}
 				pod = &corev1.Pod{
@@ -129,9 +134,10 @@
 			})
 		})
 
-		When("pod with deletionTimestamp less than ignore limit", func() {
+		When("Pod with deletionTimestamp less than ignore limit", func() {
			var cluster *fdbv1beta2.FoundationDBCluster
 			var pod *corev1.Pod
+
 			BeforeEach(func() {
 				cluster = &fdbv1beta2.FoundationDBCluster{}
 				pod = &corev1.Pod{
@@ -147,9 +153,10 @@
 			})
 		})
 
-		When("pod with deletionTimestamp more than ignore limit", func() {
+		When("Pod with deletionTimestamp more than ignore limit", func() {
 			var cluster *fdbv1beta2.FoundationDBCluster
 			var pod *corev1.Pod
+
 			BeforeEach(func() {
 				cluster = &fdbv1beta2.FoundationDBCluster{}
 				pod = &corev1.Pod{
@@ -169,6 +176,7 @@
 		When("with configured IgnoreTerminatingPodsSeconds", func() {
 			var cluster *fdbv1beta2.FoundationDBCluster
 			var pod *corev1.Pod
+
 			BeforeEach(func() {
 				cluster = &fdbv1beta2.FoundationDBCluster{
 					Spec: fdbv1beta2.FoundationDBClusterSpec{
@@ -191,4 +199,52 @@
 			})
 		})
 	})
+
+	When("fetching all Pods that needs an update", func() {
+		var cluster *fdbv1beta2.FoundationDBCluster
+		var updates map[string][]*corev1.Pod
+		var expectedError bool
+
+		BeforeEach(func() {
+			cluster = internal.CreateDefaultCluster()
+			Expect(k8sClient.Create(context.TODO(), cluster)).NotTo(HaveOccurred())
+			result, err := reconcileCluster(cluster)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result.Requeue).To(BeFalse())
+			Expect(k8sClient.Get(context.TODO(), ctrlClient.ObjectKeyFromObject(cluster), cluster)).NotTo(HaveOccurred())
+		})
+
+		JustBeforeEach(func() {
+			pods, err := clusterReconciler.PodLifecycleManager.GetPods(context.TODO(), k8sClient, cluster, internal.GetPodListOptions(cluster, "", "")...)
+			Expect(err).NotTo(HaveOccurred())
+
+			updates, err = getPodsToUpdate(log, clusterReconciler, cluster, internal.CreatePodMap(cluster, pods))
+			if !expectedError {
+				Expect(err).NotTo(HaveOccurred())
+			} else {
+				Expect(err).To(HaveOccurred())
+			}
+		})
+
+		When("the cluster has no changes", func() {
+			It("should return no errors and an empty map", func() {
+				Expect(updates).To(HaveLen(0))
+			})
+		})
+
+		When("there is a spec change for all processes", func() {
+			BeforeEach(func() {
+				storageSettings := cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral]
+				storageSettings.PodTemplate.Spec.NodeSelector = map[string]string{"test": "test"}
+				cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral] = storageSettings
+
+				Expect(k8sClient.Update(context.TODO(), cluster)).NotTo(HaveOccurred())
+			})
+
+			It("should return no errors and a map with one zone", func() {
+				// We only have one zone in this case, the simulation zone
+				Expect(updates).To(HaveLen(1))
+			})
+		})
+	})
 })
