From d1617303a41c9fb472e29550acd16dc4cbd1320d Mon Sep 17 00:00:00 2001
From: yusank
Date: Fri, 19 Aug 2022 15:42:59 +0800
Subject: [PATCH] feat: sync cluster-level resource status #2394

Signed-off-by: yusank
---
 .../cluster_resource_binding_controller.go |  48 +++++++
 .../defaultinterpreter/aggregatestatus.go  |  54 ++++++++
 .../aggregatestatus_test.go                | 129 ++++++++++++++++++
 pkg/util/constants.go                      |   2 +
 4 files changed, 233 insertions(+)

diff --git a/pkg/controllers/binding/cluster_resource_binding_controller.go b/pkg/controllers/binding/cluster_resource_binding_controller.go
index 350d1907f16b..a63b214daaa5 100644
--- a/pkg/controllers/binding/cluster_resource_binding_controller.go
+++ b/pkg/controllers/binding/cluster_resource_binding_controller.go
@@ -3,11 +3,14 @@ package binding
 import (
 	"context"
 	"fmt"
+	"reflect"
 
 	corev1 "k8s.io/api/core/v1"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/client-go/dynamic"
@@ -21,6 +24,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	"sigs.k8s.io/controller-runtime/pkg/source"
 
+	configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
@@ -30,6 +34,7 @@ import (
 	"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
 	"github.com/karmada-io/karmada/pkg/util/helper"
 	"github.com/karmada-io/karmada/pkg/util/overridemanager"
+	"github.com/karmada-io/karmada/pkg/util/restmapper"
 )
 
 // ClusterResourceBindingControllerName is the controller name that will be used when reporting events.
@@ -131,6 +136,11 @@ func (c *ClusterResourceBindingController) syncBinding(binding *workv1alpha2.Clu
 		return controllerruntime.Result{Requeue: true}, errors.NewAggregate(errs)
 	}
 
+	err = c.updateResourceStatus(binding)
+	if err != nil {
+		return controllerruntime.Result{Requeue: true}, err
+	}
+
 	return controllerruntime.Result{}, nil
 }
 
@@ -152,6 +162,44 @@ func (c *ClusterResourceBindingController) removeOrphanWorks(binding *workv1alph
 	return nil
 }
 
+// updateResourceStatus will try to calculate the summary status and update it to the
+// original object that the ClusterResourceBinding refers to.
+func (c *ClusterResourceBindingController) updateResourceStatus(binding *workv1alpha2.ClusterResourceBinding) error {
+	resource := binding.Spec.Resource
+	gvr, err := restmapper.GetGroupVersionResource(
+		c.RESTMapper, schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind),
+	)
+	if err != nil {
+		klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion, resource.Kind, err)
+		return err
+	}
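+	// Fetch the resource template that the binding refers to; the interpreter's
+	// aggregated status will be written back to this object's status below.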
Error: %v", resource.APIVersion, resource.Kind, err) + return err + } + obj, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, resource) + if err != nil { + klog.Errorf("Failed to get resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err) + return err + } + + if !c.ResourceInterpreter.HookEnabled(obj.GroupVersionKind(), configv1alpha1.InterpreterOperationAggregateStatus) { + return nil + } + newObj, err := c.ResourceInterpreter.AggregateStatus(obj, binding.Status.AggregatedStatus) + if err != nil { + klog.Errorf("AggregateStatus for resource(%s/%s/%s) failed: %v", resource.Kind, resource.Namespace, resource.Name, err) + return err + } + if reflect.DeepEqual(obj, newObj) { + klog.V(3).Infof("ignore update resource(%s/%s/%s) status as up to date", resource.Kind, resource.Namespace, resource.Name) + return nil + } + + if _, err = c.DynamicClient.Resource(gvr).Namespace(resource.Namespace).UpdateStatus(context.TODO(), newObj, metav1.UpdateOptions{}); err != nil { + klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err) + return err + } + klog.V(3).Infof("update resource status successfully for resource(%s/%s/%s)", resource.Kind, resource.Namespace, resource.Name) + return nil +} + // SetupWithManager creates a controller and register to controller manager. func (c *ClusterResourceBindingController) SetupWithManager(mgr controllerruntime.Manager) error { workFn := handler.MapFunc( diff --git a/pkg/resourceinterpreter/defaultinterpreter/aggregatestatus.go b/pkg/resourceinterpreter/defaultinterpreter/aggregatestatus.go index aea5ebcb8c72..098ef1e89779 100644 --- a/pkg/resourceinterpreter/defaultinterpreter/aggregatestatus.go +++ b/pkg/resourceinterpreter/defaultinterpreter/aggregatestatus.go @@ -29,6 +29,7 @@ func getAllDefaultAggregateStatusInterpreter() map[schema.GroupVersionKind]aggre s[appsv1.SchemeGroupVersion.WithKind(util.DaemonSetKind)] = aggregateDaemonSetStatus s[appsv1.SchemeGroupVersion.WithKind(util.StatefulSetKind)] = aggregateStatefulSetStatus s[corev1.SchemeGroupVersion.WithKind(util.PodKind)] = aggregatePodStatus + s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeKind)] = aggregatePersistentVolumeStatus s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeClaimKind)] = aggregatePersistentVolumeClaimStatus return s } @@ -364,6 +365,59 @@ func aggregatePodStatus(object *unstructured.Unstructured, aggregatedStatusItems return helper.ToUnstructured(pod) } +func aggregatePersistentVolumeStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) { + pv := &corev1.PersistentVolume{} + err := helper.ConvertToTypedObject(object, pv) + if err != nil { + return nil, err + } + + newStatus := &corev1.PersistentVolumeStatus{} + volumePhases := sets.NewString() + for _, item := range aggregatedStatusItems { + if item.Status == nil { + // maybe volume's status hasn't been collected yet, assume it's in pending state. + // Otherwise, it may affect the final aggregated state. 
+			volumePhases.Insert(string(corev1.VolumePending))
+			continue
+		}
+
+		temp := &corev1.PersistentVolumeStatus{}
+		if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
+			return nil, err
+		}
+		klog.V(3).Infof("Grab persistentVolume(%s) status from cluster(%s), phase: %s",
+			pv.Name, item.ClusterName, temp.Phase)
+
+		volumePhases.Insert(string(temp.Phase))
+	}
+
+	// Check the final phase in order: VolumeFailed-->VolumePending-->VolumeAvailable-->VolumeBound-->VolumeReleased.
+	// For more details please refer to: https://github.com/karmada-io/karmada/issues/2394
+	switch {
+	case volumePhases.Has(string(corev1.VolumeFailed)):
+		newStatus.Phase = corev1.VolumeFailed
+	case volumePhases.Has(string(corev1.VolumePending)):
+		newStatus.Phase = corev1.VolumePending
+	case volumePhases.Has(string(corev1.VolumeAvailable)):
+		newStatus.Phase = corev1.VolumeAvailable
+	case volumePhases.Has(string(corev1.VolumeBound)):
+		newStatus.Phase = corev1.VolumeBound
+	case volumePhases.Has(string(corev1.VolumeReleased)):
+		newStatus.Phase = corev1.VolumeReleased
+	default:
+		klog.Errorf("SHOULD-NEVER-HAPPEN, the volumes may be in a new phase that Karmada doesn't know about.")
+	}
+
+	if reflect.DeepEqual(pv.Status, *newStatus) {
+		klog.V(3).Infof("Ignore updating persistentVolume(%s) status as it is up to date", pv.Name)
+		return object, nil
+	}
+
+	pv.Status = *newStatus
+	return helper.ToUnstructured(pv)
+}
+
 func aggregatePersistentVolumeClaimStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
 	pvc := &corev1.PersistentVolumeClaim{}
 	err := helper.ConvertToTypedObject(object, pvc)
diff --git a/pkg/resourceinterpreter/defaultinterpreter/aggregatestatus_test.go b/pkg/resourceinterpreter/defaultinterpreter/aggregatestatus_test.go
index f9a673e4fc64..23c2f406e398 100644
--- a/pkg/resourceinterpreter/defaultinterpreter/aggregatestatus_test.go
+++ b/pkg/resourceinterpreter/defaultinterpreter/aggregatestatus_test.go
@@ -714,3 +714,132 @@ func TestAggregatePVCStatus(t *testing.T) {
 		assert.Equal(t, tt.expectedObj, actualObj)
 	}
 }
+
+func TestAggregatePVStatus(t *testing.T) {
+	statusAvailableMap := map[string]interface{}{
+		"phase": corev1.VolumeAvailable,
+	}
+	// Available status
+	availableRaw1, _ := helper.BuildStatusRawExtension(statusAvailableMap)
+	availableRaw2, _ := helper.BuildStatusRawExtension(statusAvailableMap)
+
+	aggregatedStatusItems1 := []workv1alpha2.AggregatedStatusItem{
+		{ClusterName: "member1", Status: availableRaw1, Applied: true},
+		{ClusterName: "member2", Status: availableRaw2, Applied: true},
+	}
+
+	statusReleasedMap := map[string]interface{}{
+		"phase": corev1.VolumeReleased,
+	}
+	// Released status
+	releasedRaw1, _ := helper.BuildStatusRawExtension(statusReleasedMap)
+	releasedRaw2, _ := helper.BuildStatusRawExtension(statusReleasedMap)
+
+	aggregatedStatusItems2 := []workv1alpha2.AggregatedStatusItem{
+		{ClusterName: "member1", Status: releasedRaw1, Applied: true},
+		{ClusterName: "member2", Status: releasedRaw2, Applied: true},
+	}
+
+	statusBoundMap := map[string]interface{}{
+		"phase": corev1.VolumeBound,
+	}
+	// Bound status (mixed with Released: Bound should win)
+	boundRaw1, _ := helper.BuildStatusRawExtension(statusBoundMap)
+	boundRaw2, _ := helper.BuildStatusRawExtension(statusReleasedMap)
+
+	aggregatedStatusItems3 := []workv1alpha2.AggregatedStatusItem{
+		{ClusterName: "member1", Status: boundRaw1, Applied: true},
+		{ClusterName: "member2", Status: boundRaw2, Applied: true},
+	}
+
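+	// The next two fixtures also mix phases on purpose: Failed beats Released and
+	// Pending beats Available, matching the priority order checked in
+	// aggregatePersistentVolumeStatus.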
+	// Failed status
+	statusFailedMap := map[string]interface{}{
+		"phase": corev1.VolumeFailed,
+	}
+	failedRaw1, _ := helper.BuildStatusRawExtension(statusReleasedMap)
+	failedRaw2, _ := helper.BuildStatusRawExtension(statusFailedMap)
+
+	aggregatedStatusItems4 := []workv1alpha2.AggregatedStatusItem{
+		{ClusterName: "member1", Status: failedRaw1, Applied: true},
+		{ClusterName: "member2", Status: failedRaw2, Applied: true},
+	}
+
+	// Pending status
+	statusPendingMap := map[string]interface{}{
+		"phase": corev1.VolumePending,
+	}
+	pendingRaw1, _ := helper.BuildStatusRawExtension(statusAvailableMap)
+	pendingRaw2, _ := helper.BuildStatusRawExtension(statusPendingMap)
+
+	aggregatedStatusItems5 := []workv1alpha2.AggregatedStatusItem{
+		{ClusterName: "member1", Status: pendingRaw1, Applied: true},
+		{ClusterName: "member2", Status: pendingRaw2, Applied: true},
+	}
+
+	// test aggregatePersistentVolumeStatus function
+	oldPV := &corev1.PersistentVolume{}
+	oldObj, _ := helper.ToUnstructured(oldPV)
+
+	availableNewPV := &corev1.PersistentVolume{Status: corev1.PersistentVolumeStatus{Phase: corev1.VolumeAvailable}}
+	newAvailablePVObj, _ := helper.ToUnstructured(availableNewPV)
+
+	boundNewPV := &corev1.PersistentVolume{Status: corev1.PersistentVolumeStatus{Phase: corev1.VolumeBound}}
+	newBoundPVObj, _ := helper.ToUnstructured(boundNewPV)
+
+	releasedNewPV := &corev1.PersistentVolume{Status: corev1.PersistentVolumeStatus{Phase: corev1.VolumeReleased}}
+	newReleasedPVObj, _ := helper.ToUnstructured(releasedNewPV)
+
+	failedNewPV := &corev1.PersistentVolume{Status: corev1.PersistentVolumeStatus{Phase: corev1.VolumeFailed}}
+	newFailedPVObj, _ := helper.ToUnstructured(failedNewPV)
+
+	pendingNewPV := &corev1.PersistentVolume{Status: corev1.PersistentVolumeStatus{Phase: corev1.VolumePending}}
+	newPendingPVObj, _ := helper.ToUnstructured(pendingNewPV)
+
+	tests := []struct {
+		name                  string
+		curObj                *unstructured.Unstructured
+		aggregatedStatusItems []workv1alpha2.AggregatedStatusItem
+		expectedObj           *unstructured.Unstructured
+	}{
+		{
+			name:                  "update pv status to Available",
+			curObj:                oldObj,
+			aggregatedStatusItems: aggregatedStatusItems1,
+			expectedObj:           newAvailablePVObj,
+		},
+		{
+			name:                  "update pv status to Released",
+			curObj:                oldObj,
+			aggregatedStatusItems: aggregatedStatusItems2,
+			expectedObj:           newReleasedPVObj,
+		},
+		{
+			name:                  "update pv status to Bound",
+			curObj:                oldObj,
+			aggregatedStatusItems: aggregatedStatusItems3,
+			expectedObj:           newBoundPVObj,
+		},
+		{
+			name:                  "update pv status to Failed",
+			curObj:                oldObj,
+			aggregatedStatusItems: aggregatedStatusItems4,
+			expectedObj:           newFailedPVObj,
+		},
+		{
+			name:                  "update pv status to Pending",
+			curObj:                oldObj,
+			aggregatedStatusItems: aggregatedStatusItems5,
+			expectedObj:           newPendingPVObj,
+		},
+		{
+			name:                  "ignore updating pv status as it is up to date",
+			curObj:                newAvailablePVObj,
+			aggregatedStatusItems: aggregatedStatusItems1,
+			expectedObj:           newAvailablePVObj,
+		},
+	}
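+	// Run each scenario through the default interpreter and compare the result
+	// with the expected unstructured object.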
+	for _, tt := range tests {
+		actualObj, _ := aggregatePersistentVolumeStatus(tt.curObj, tt.aggregatedStatusItems)
+		assert.Equal(t, tt.expectedObj, actualObj, tt.name)
+	}
+}
diff --git a/pkg/util/constants.go b/pkg/util/constants.go
index 634439b1a3ae..41edb8fb5d58 100644
--- a/pkg/util/constants.go
+++ b/pkg/util/constants.go
@@ -96,6 +96,8 @@ const (
 	EndpointSliceKind = "EndpointSlice"
 	// PersistentVolumeClaimKind indicated the target resource is a persistentvolumeclaim
 	PersistentVolumeClaimKind = "PersistentVolumeClaim"
+	// PersistentVolumeKind indicates the target resource is a persistentvolume
+	PersistentVolumeKind = "PersistentVolume"
 	// HorizontalPodAutoscalerKind indicated the target resource is a horizontalpodautoscaler
 	HorizontalPodAutoscalerKind = "HorizontalPodAutoscaler"