Skip to content

Commit

Permalink
feat: sync cluster level resource status #2394
Browse files Browse the repository at this point in the history
Signed-off-by: yusank <yusankurban@gmail.com>
  • Loading branch information
yusank committed Aug 22, 2022
1 parent 78c299c commit d161730
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 0 deletions.
48 changes: 48 additions & 0 deletions pkg/controllers/binding/cluster_resource_binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package binding
import (
"context"
"fmt"
"reflect"

corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/dynamic"
Expand All @@ -21,6 +24,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
Expand All @@ -30,6 +34,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)

// ClusterResourceBindingControllerName is the controller name that will be used when reporting events.
Expand Down Expand Up @@ -131,6 +136,11 @@ func (c *ClusterResourceBindingController) syncBinding(binding *workv1alpha2.Clu
return controllerruntime.Result{Requeue: true}, errors.NewAggregate(errs)
}

err = c.updateResourceStatus(binding)
if err != nil {
return controllerruntime.Result{Requeue: true}, err
}

return controllerruntime.Result{}, nil
}

Expand All @@ -152,6 +162,44 @@ func (c *ClusterResourceBindingController) removeOrphanWorks(binding *workv1alph
return nil
}

// updateResourceStatus will try to calculate the summary status and update to original object
// that the ResourceBinding refer to.
func (c *ClusterResourceBindingController) updateResourceStatus(binding *workv1alpha2.ClusterResourceBinding) error {
resource := binding.Spec.Resource
gvr, err := restmapper.GetGroupVersionResource(
c.RESTMapper, schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind),
)
if err != nil {
klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion, resource.Kind, err)
return err
}
obj, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, resource)
if err != nil {
klog.Errorf("Failed to get resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}

if !c.ResourceInterpreter.HookEnabled(obj.GroupVersionKind(), configv1alpha1.InterpreterOperationAggregateStatus) {
return nil
}
newObj, err := c.ResourceInterpreter.AggregateStatus(obj, binding.Status.AggregatedStatus)
if err != nil {
klog.Errorf("AggregateStatus for resource(%s/%s/%s) failed: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}
if reflect.DeepEqual(obj, newObj) {
klog.V(3).Infof("ignore update resource(%s/%s/%s) status as up to date", resource.Kind, resource.Namespace, resource.Name)
return nil
}

if _, err = c.DynamicClient.Resource(gvr).Namespace(resource.Namespace).UpdateStatus(context.TODO(), newObj, metav1.UpdateOptions{}); err != nil {
klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}
klog.V(3).Infof("update resource status successfully for resource(%s/%s/%s)", resource.Kind, resource.Namespace, resource.Name)
return nil
}

// SetupWithManager creates a controller and register to controller manager.
func (c *ClusterResourceBindingController) SetupWithManager(mgr controllerruntime.Manager) error {
workFn := handler.MapFunc(
Expand Down
54 changes: 54 additions & 0 deletions pkg/resourceinterpreter/defaultinterpreter/aggregatestatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func getAllDefaultAggregateStatusInterpreter() map[schema.GroupVersionKind]aggre
s[appsv1.SchemeGroupVersion.WithKind(util.DaemonSetKind)] = aggregateDaemonSetStatus
s[appsv1.SchemeGroupVersion.WithKind(util.StatefulSetKind)] = aggregateStatefulSetStatus
s[corev1.SchemeGroupVersion.WithKind(util.PodKind)] = aggregatePodStatus
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeKind)] = aggregatePersistentVolumeStatus
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeClaimKind)] = aggregatePersistentVolumeClaimStatus
return s
}
Expand Down Expand Up @@ -364,6 +365,59 @@ func aggregatePodStatus(object *unstructured.Unstructured, aggregatedStatusItems
return helper.ToUnstructured(pod)
}

func aggregatePersistentVolumeStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
pv := &corev1.PersistentVolume{}
err := helper.ConvertToTypedObject(object, pv)
if err != nil {
return nil, err
}

newStatus := &corev1.PersistentVolumeStatus{}
volumePhases := sets.NewString()
for _, item := range aggregatedStatusItems {
if item.Status == nil {
// maybe volume's status hasn't been collected yet, assume it's in pending state.
// Otherwise, it may affect the final aggregated state.
volumePhases.Insert(string(corev1.VolumePending))
continue
}

temp := &corev1.PersistentVolumeStatus{}
if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
return nil, err
}
klog.V(3).Infof("Grab persistentVolume(%s/%s) status from cluster(%s), phase: %s",
pv.Namespace, pv.Name, item.ClusterName, temp.Phase)

volumePhases.Insert(string(temp.Phase))
}

// Check final phase in order: VolumeFailed-->VolumePending-->VolumeAvailable-->VolumeBound-->VolumeReleased
// More details please refer to: https://github.com/karmada-io/karmada/issues/2394
switch {
case volumePhases.Has(string(corev1.VolumeFailed)):
newStatus.Phase = corev1.VolumeFailed
case volumePhases.Has(string(corev1.VolumePending)):
newStatus.Phase = corev1.VolumePending
case volumePhases.Has(string(corev1.VolumeAvailable)):
newStatus.Phase = corev1.VolumeAvailable
case volumePhases.Has(string(corev1.VolumeBound)):
newStatus.Phase = corev1.VolumeBound
case volumePhases.Has(string(corev1.VolumeReleased)):
newStatus.Phase = corev1.VolumeReleased
default:
klog.Errorf("SHOULD-NEVER-HAPPEN, maybe volume added a new state that Karmada don't know about.")
}

if reflect.DeepEqual(pv.Status, *newStatus) {
klog.V(3).Infof("ignore update persistentVolume(%s/%s) status as up to date", pv.Namespace, pv.Name)
return object, nil
}

pv.Status = *newStatus
return helper.ToUnstructured(pv)
}

func aggregatePersistentVolumeClaimStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
pvc := &corev1.PersistentVolumeClaim{}
err := helper.ConvertToTypedObject(object, pvc)
Expand Down
129 changes: 129 additions & 0 deletions pkg/resourceinterpreter/defaultinterpreter/aggregatestatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,3 +714,132 @@ func TestAggregatePVCStatus(t *testing.T) {
assert.Equal(t, tt.expectedObj, actualObj)
}
}

func TestAggregatePVStatus(t *testing.T) {
statusAvailableMap := map[string]interface{}{
"phase": corev1.VolumeAvailable,
}
// Available status
availableRaw1, _ := helper.BuildStatusRawExtension(statusAvailableMap)
availableRaw2, _ := helper.BuildStatusRawExtension(statusAvailableMap)

aggregatedStatusItems1 := []workv1alpha2.AggregatedStatusItem{
{ClusterName: "member1", Status: availableRaw1, Applied: true},
{ClusterName: "member2", Status: availableRaw2, Applied: true},
}

statusReleasedMap := map[string]interface{}{
"phase": corev1.VolumeReleased,
}
// Release status
releasedRaw1, _ := helper.BuildStatusRawExtension(statusReleasedMap)
releasedRaw2, _ := helper.BuildStatusRawExtension(statusReleasedMap)

aggregatedStatusItems2 := []workv1alpha2.AggregatedStatusItem{
{ClusterName: "member1", Status: releasedRaw1, Applied: true},
{ClusterName: "member2", Status: releasedRaw2, Applied: true},
}

statusBoundMap := map[string]interface{}{
"phase": corev1.VolumeBound,
}
// Bound status
boundRaw1, _ := helper.BuildStatusRawExtension(statusBoundMap)
boundRaw2, _ := helper.BuildStatusRawExtension(statusReleasedMap)

aggregatedStatusItems3 := []workv1alpha2.AggregatedStatusItem{
{ClusterName: "member1", Status: boundRaw1, Applied: true},
{ClusterName: "member2", Status: boundRaw2, Applied: true},
}

// Failed status
statusFailedMap := map[string]interface{}{
"phase": corev1.VolumeFailed,
}
failedRaw1, _ := helper.BuildStatusRawExtension(releasedRaw1)
failedRaw2, _ := helper.BuildStatusRawExtension(statusFailedMap)

aggregatedStatusItems4 := []workv1alpha2.AggregatedStatusItem{
{ClusterName: "member1", Status: failedRaw1, Applied: true},
{ClusterName: "member2", Status: failedRaw2, Applied: true},
}

// Pending status
statusPendingMap := map[string]interface{}{
"phase": corev1.VolumePending,
}
pendingRaw1, _ := helper.BuildStatusRawExtension(statusAvailableMap)
pendingRaw2, _ := helper.BuildStatusRawExtension(statusPendingMap)

aggregatedStatusItems5 := []workv1alpha2.AggregatedStatusItem{
{ClusterName: "member1", Status: pendingRaw1, Applied: true},
{ClusterName: "member2", Status: pendingRaw2, Applied: true},
}

// test aggregatePersistentVolumeStatus function
oldPVC := &corev1.PersistentVolume{}
oldObj, _ := helper.ToUnstructured(oldPVC)

availableNewPv := &corev1.PersistentVolume{Status: corev1.PersistentVolumeStatus{Phase: corev1.VolumeAvailable}}
newAvailablePvObj, _ := helper.ToUnstructured(availableNewPv)

boundNewPV := &corev1.PersistentVolume{Status: corev1.PersistentVolumeStatus{Phase: corev1.VolumeBound}}
newBoundPVObj, _ := helper.ToUnstructured(boundNewPV)

releaseNewPV := &corev1.PersistentVolume{Status: corev1.PersistentVolumeStatus{Phase: corev1.VolumeReleased}}
newReleasePVObj, _ := helper.ToUnstructured(releaseNewPV)

failedNewPV := &corev1.PersistentVolume{Status: corev1.PersistentVolumeStatus{Phase: corev1.VolumeFailed}}
newFailedPVObj, _ := helper.ToUnstructured(failedNewPV)

pendingNewPV := &corev1.PersistentVolume{Status: corev1.PersistentVolumeStatus{Phase: corev1.VolumePending}}
newPendingPVObj, _ := helper.ToUnstructured(pendingNewPV)

tests := []struct {
name string
curObj *unstructured.Unstructured
aggregatedStatusItems []workv1alpha2.AggregatedStatusItem
expectedObj *unstructured.Unstructured
}{
{
name: "update pvc status1",
curObj: oldObj,
aggregatedStatusItems: aggregatedStatusItems1,
expectedObj: newAvailablePvObj,
},
{
name: "update pvc status2",
curObj: oldObj,
aggregatedStatusItems: aggregatedStatusItems2,
expectedObj: newReleasePVObj,
},
{
name: "update pvc status3",
curObj: oldObj,
aggregatedStatusItems: aggregatedStatusItems3,
expectedObj: newBoundPVObj,
},
{
name: "update pvc status4",
curObj: oldObj,
aggregatedStatusItems: aggregatedStatusItems4,
expectedObj: newFailedPVObj,
},
{
name: "update pvc status5",
curObj: oldObj,
aggregatedStatusItems: aggregatedStatusItems5,
expectedObj: newPendingPVObj,
},
{
name: "ignore update pvc status as up to date",
curObj: newAvailablePvObj,
aggregatedStatusItems: aggregatedStatusItems1,
expectedObj: newAvailablePvObj,
},
}
for _, tt := range tests {
actualObj, _ := aggregatePersistentVolumeStatus(tt.curObj, tt.aggregatedStatusItems)
assert.Equal(t, tt.expectedObj, actualObj, tt.name)
}
}
2 changes: 2 additions & 0 deletions pkg/util/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ const (
EndpointSliceKind = "EndpointSlice"
// PersistentVolumeClaimKind indicated the target resource is a persistentvolumeclaim
PersistentVolumeClaimKind = "PersistentVolumeClaim"
// PersistentVolumeKind indicates the target resource is a persistentvolume
PersistentVolumeKind = "PersistentVolume"
// HorizontalPodAutoscalerKind indicated the target resource is a horizontalpodautoscaler
HorizontalPodAutoscalerKind = "HorizontalPodAutoscaler"

Expand Down

0 comments on commit d161730

Please sign in to comment.