From 946ca626066489006290b1f3674e4a1a4fe39cdf Mon Sep 17 00:00:00 2001 From: zhouhao_yewu Date: Tue, 28 May 2024 10:36:21 +0800 Subject: [PATCH] support immediate mode storage Signed-off-by: zhouhao_yewu --- .../controllers/pod/root_pod_controller.go | 90 ++++++++++++++++++- .../controllers/pv/leaf_pv_controller.go | 6 +- .../controllers/pvc/leaf_pvc_controller.go | 2 +- .../controllers/pvc/root_pvc_controller.go | 20 +++-- pkg/utils/constants.go | 1 + pkg/utils/k8s.go | 61 +++++++++---- 6 files changed, 151 insertions(+), 29 deletions(-) diff --git a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go index f75433446..24fb2b7a1 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go @@ -282,6 +282,93 @@ func (r *RootPodReconciler) SetupWithManager(mgr manager.Manager) error { Complete(r) } +func (r *RootPodReconciler) createPvcInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pvcs []string, rootpod *corev1.Pod, cn *leafUtils.ClusterNode) error { + for _, pvcName := range pvcs { + rootPVC := &corev1.PersistentVolumeClaim{} + err := r.RootClient.Get(ctx, types.NamespacedName{Namespace: rootpod.Namespace, Name: pvcName}, rootPVC) + if err != nil { + return fmt.Errorf("could not get pvc %s from root cluster: %v", pvcName, err) + } + anno := rootPVC.GetAnnotations() + if rootPVC.Status.Phase == corev1.ClaimBound { + if _, ok := anno[utils.KosmosResourceOwnersAnnotations]; !ok { + anno[utils.KosmosPvcImmediateMode] = "true" + } + } + anno = utils.AddResourceClusters(anno, lr.ClusterName) + anno[utils.KosmosGlobalLabel] = "true" + + rootPVC.SetAnnotations(anno) + err = r.RootClient.Update(ctx, rootPVC) + if err != nil { + return fmt.Errorf("could not update pvc %s/%s in host cluster: %v", rootpod.Namespace, pvcName, err) + } + + leafPvc, err := lr.Clientset.CoreV1().PersistentVolumeClaims(rootpod.Namespace).Get(ctx, pvcName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + leafPvc = rootPVC.DeepCopy() + err = utils.ResetMetadata(leafPvc) + if err != nil { + return err + } + delete(anno, utils.PVCSelectedNodeKey) + leafPvc.SetAnnotations(anno) + leafPvc, err = lr.Clientset.CoreV1().PersistentVolumeClaims(rootpod.Namespace).Create(ctx, leafPvc, metav1.CreateOptions{}) + if err != nil { + klog.Errorf("Failed to create pvc %s/%s err: %v", rootpod.Namespace, pvcName, err) + return err + } + klog.V(4).Infof("Create pvc %s/%s success", rootpod.Namespace, pvcName) + if _, ok := anno[utils.KosmosPvcImmediateMode]; ok { + return r.createImmediateModePvInLeafCluster(ctx, lr, leafPvc) + } + } + return fmt.Errorf("could not check pvc %s/%s in external cluster: %v", rootpod.Namespace, pvcName, err) + } else { + if utils.IsImmediateModePvc(leafPvc.Annotations) && leafPvc.Status.Phase != corev1.ClaimBound { + _, err := lr.Clientset.CoreV1().PersistentVolumes().Get(ctx, leafPvc.Spec.VolumeName, metav1.GetOptions{}) + if errors.IsNotFound(err) { + return r.createImmediateModePvInLeafCluster(ctx, lr, leafPvc) + } + } + } + } + return nil +} + +func (r *RootPodReconciler) createImmediateModePvInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, leafPvc *corev1.PersistentVolumeClaim) error { + klog.V(4).Infof("Creating PV %s in leaf cluster %s", leafPvc.Spec.VolumeName, lr.ClusterName) + rootPV := &corev1.PersistentVolume{} + err := r.RootClient.Get(ctx, types.NamespacedName{Namespace: "", Name: leafPvc.Spec.VolumeName}, rootPV) + if err != nil { + return fmt.Errorf("could not get pv %s from root cluster: %v", leafPvc.Spec.VolumeName, err) + } + pv := rootPV.DeepCopy() + anno := pv.Annotations + if anno == nil { + anno = make(map[string]string) + } + anno[utils.KosmosGlobalLabel] = "true" + anno[utils.KosmosPvcImmediateMode] = "true" + pv.SetAnnotations(anno) + err = utils.ResetMetadata(pv) + if err != nil { + return err + } + pv.Spec.ClaimRef.ResourceVersion = leafPvc.ResourceVersion + pv.Spec.ClaimRef.UID = leafPvc.UID + _, err = lr.Clientset.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{}) + if err != nil { + if errors.IsAlreadyExists(err) { + return nil + } + klog.Errorf("Failed to create pv %s err: %v", pv.Name, err) + return err + } + return nil +} + func (r *RootPodReconciler) createStorageInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, gvr schema.GroupVersionResource, resourcenames []string, rootpod *corev1.Pod, cn *leafUtils.ClusterNode) error { ns := rootpod.Namespace storageHandler, err := NewStorageHandler(gvr) @@ -769,10 +856,11 @@ func (r *RootPodReconciler) createVolumes(ctx context.Context, lr *leafUtils.Lea return false, err } klog.V(4).Info("Trying to creating dependent pvc") - if err := r.createStorageInLeafCluster(ctx, lr, utils.GVR_PVC, pvcsWithoutEs, basicPod, clusterNodeInfo); err != nil { + if err := r.createPvcInLeafCluster(ctx, lr, pvcsWithoutEs, basicPod, clusterNodeInfo); err != nil { klog.Error(err) return false, nil } + klog.V(4).Infof("Create pvc %v of %v/%v success", pvcsWithoutEs, basicPod.Namespace, basicPod.Name) // } return true, nil diff --git a/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go b/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go index 0723515bc..fa284ddec 100644 --- a/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go @@ -177,15 +177,15 @@ func (l *LeafPVController) SetupWithManager(mgr manager.Manager) error { For(&v1.PersistentVolume{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(createEvent event.CreateEvent) bool { curr := createEvent.Object.(*v1.PersistentVolume) - return !podutils.IsOneWayPV(curr) + return !podutils.IsOneWayPV(curr) && !utils.IsImmediateModePvc(curr.Annotations) }, UpdateFunc: func(updateEvent event.UpdateEvent) bool { curr := updateEvent.ObjectNew.(*v1.PersistentVolume) - return !podutils.IsOneWayPV(curr) + return !podutils.IsOneWayPV(curr) && !utils.IsImmediateModePvc(curr.Annotations) }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { curr := deleteEvent.Object.(*v1.PersistentVolume) - return !podutils.IsOneWayPV(curr) + return !podutils.IsOneWayPV(curr) && !utils.IsImmediateModePvc(curr.Annotations) }, GenericFunc: func(genericEvent event.GenericEvent) bool { return false diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go index eb7597af2..75f1b7663 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go @@ -125,7 +125,7 @@ func (l *LeafPVCController) SetupWithManager(mgr manager.Manager) error { }, UpdateFunc: func(updateEvent event.UpdateEvent) bool { pvc := updateEvent.ObjectOld.(*v1.PersistentVolumeClaim) - return utils.IsObjectGlobal(&pvc.ObjectMeta) && !podutils.IsOneWayPVC(pvc) + return utils.IsObjectGlobal(&pvc.ObjectMeta) && !podutils.IsOneWayPVC(pvc) && !utils.IsImmediateModePvc(pvc.Annotations) }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return false diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go index d065f74d3..e6668aa53 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go @@ -133,17 +133,19 @@ func (r *RootPVCController) cleanupPvc(pvc *v1.PersistentVolumeClaim) (reconcile return reconcile.Result{}, nil } - lr, err := r.GlobalLeafManager.GetLeafResource(clusters[0]) - if err != nil { - klog.Warningf("pvc leaf %q: %q doesn't existed in LeafResources", pvc.GetNamespace(), pvc.GetName()) - return reconcile.Result{}, nil - } - - if err = lr.Clientset.CoreV1().PersistentVolumeClaims(pvc.GetNamespace()).Delete(context.TODO(), pvc.GetName(), metav1.DeleteOptions{}); err != nil { - if !errors.IsNotFound(err) { - klog.Errorf("delete pvc from leaf cluster failed, %q: %q, error: %v", pvc.GetNamespace(), pvc.GetName(), err) + for _, cluster := range clusters { + lr, err := r.GlobalLeafManager.GetLeafResource(cluster) + if err != nil { + klog.Errorf("Delete pvc %s/%s from leaf cluster %s, get leafresource error %s.", pvc.GetNamespace(), pvc.GetName(), cluster, err.Error()) return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, err } + + if err = lr.Clientset.CoreV1().PersistentVolumeClaims(pvc.GetNamespace()).Delete(context.TODO(), pvc.GetName(), metav1.DeleteOptions{}); err != nil { + if !errors.IsNotFound(err) { + klog.Errorf("delete pvc from leaf cluster failed, %q: %q, error: %v", pvc.GetNamespace(), pvc.GetName(), err) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, err + } + } } return reconcile.Result{}, nil } diff --git a/pkg/utils/constants.go b/pkg/utils/constants.go index 2de906e57..bcc4b4dc9 100644 --- a/pkg/utils/constants.go +++ b/pkg/utils/constants.go @@ -111,6 +111,7 @@ const ( KosmosNodeTaintEffect = "NoSchedule" KosmosPodLabel = "kosmos-io/pod" KosmosGlobalLabel = "kosmos.io/global" + KosmosPvcImmediateMode = "kosmos/pvc-immediate-mode" KosmosSelectorKey = "kosmos.io/cluster-selector" KosmosTrippedLabels = "kosmos-io/tripped" KosmosConvertLabels = "kosmos-io/convert-policy" diff --git a/pkg/utils/k8s.go b/pkg/utils/k8s.go index f5de46db0..dd36146de 100644 --- a/pkg/utils/k8s.go +++ b/pkg/utils/k8s.go @@ -2,6 +2,8 @@ package utils import ( "encoding/json" + "fmt" + "reflect" "strings" jsonpatch "github.com/evanphx/json-patch" @@ -133,30 +135,59 @@ func IsObjectUnstructuredGlobal(obj map[string]string) bool { return false } +func ResetMetadata(obj interface{}) error { + value := reflect.ValueOf(obj) + if value.Kind() != reflect.Ptr { + return fmt.Errorf("obj must be a pointer") + } + + value = value.Elem() + metaField := value.FieldByName("ObjectMeta") + if !metaField.IsValid() || metaField.Kind() != reflect.Struct { + return fmt.Errorf("obj does not have an ObjectMeta field") + } + metaField.FieldByName("ResourceVersion").SetString("") + metaField.FieldByName("UID").SetString("") + metaField.FieldByName("Generation").SetInt(0) + metaField.FieldByName("SelfLink").SetString("") + ownerRefsField := metaField.FieldByName("OwnerReferences") + if ownerRefsField.IsValid() && ownerRefsField.Kind() == reflect.Slice && ownerRefsField.CanSet() { + ownerRefsField.Set(reflect.MakeSlice(ownerRefsField.Type(), 0, 0)) + } + return nil +} + +func IsImmediateModePvc(annotations map[string]string) bool { + if _, ok := annotations[KosmosPvcImmediateMode]; ok { + return true + } + return false +} + func AddResourceClusters(anno map[string]string, clusterName string) map[string]string { if anno == nil { - anno = map[string]string{} + anno = make(map[string]string) + } + + ownerStr := anno[KosmosResourceOwnersAnnotations] + if ownerStr == "" { + anno[KosmosResourceOwnersAnnotations] = clusterName + return anno } - owners := strings.Split(anno[KosmosResourceOwnersAnnotations], ",") - newowners := make([]string, 0) - flag := false + owners := strings.Split(ownerStr, ",") + found := false for _, v := range owners { - if len(v) == 0 { - continue - } - newowners = append(newowners, v) - if v == clusterName { - // already existed - flag = true + if strings.TrimSpace(v) == clusterName { + found = true + break } } - if !flag { - newowners = append(newowners, clusterName) + if !found { + owners = append(owners, clusterName) + anno[KosmosResourceOwnersAnnotations] = strings.Join(owners, ",") } - - anno[KosmosResourceOwnersAnnotations] = strings.Join(newowners, ",") return anno }