From 3d84e4c9e0e206e137a29784e9554df70ac09050 Mon Sep 17 00:00:00 2001 From: free6om Date: Mon, 30 Sep 2024 14:34:13 +0800 Subject: [PATCH] re-org code --- ...iew.kubeblocks.io_reconciliationviews.yaml | 10 - controllers/view/change_capture_store.go | 27 ++- ...alculation.go => current_state_handler.go} | 4 +- controllers/view/deletion_handler.go | 48 +++++ ...evaluation.go => desired_state_handler.go} | 45 ++++- controllers/view/dry_run_handler.go | 158 ++++++++++++++++ controllers/view/finalizer_handler.go | 52 ++++++ ...resources.go => i18n_resources_manager.go} | 0 controllers/view/informer_manager.go | 116 ++++-------- .../{plan_generation.go => plan_generator.go} | 172 ++++++------------ .../view/reconciliationview_controller.go | 15 +- ...esources_loader.go => resources_loader.go} | 0 ...es_validator.go => resources_validator.go} | 2 +- controllers/view/type.go | 2 + ...iew.kubeblocks.io_reconciliationviews.yaml | 10 - go.mod | 2 +- 16 files changed, 414 insertions(+), 249 deletions(-) rename controllers/view/{view_calculation.go => current_state_handler.go} (97%) create mode 100644 controllers/view/deletion_handler.go rename controllers/view/{view_state_evaluation.go => desired_state_handler.go} (83%) create mode 100644 controllers/view/dry_run_handler.go create mode 100644 controllers/view/finalizer_handler.go rename controllers/view/{i18n_resources.go => i18n_resources_manager.go} (100%) rename controllers/view/{plan_generation.go => plan_generator.go} (53%) rename controllers/view/{view_resources_loader.go => resources_loader.go} (100%) rename controllers/view/{view_resources_validator.go => resources_validator.go} (95%) diff --git a/config/crd/bases/view.kubeblocks.io_reconciliationviews.yaml b/config/crd/bases/view.kubeblocks.io_reconciliationviews.yaml index 322dc3737e1..cef9c5995bf 100644 --- a/config/crd/bases/view.kubeblocks.io_reconciliationviews.yaml +++ b/config/crd/bases/view.kubeblocks.io_reconciliationviews.yaml @@ -244,9 +244,6 @@ spec: Revision can be compared globally between all ObjectChanges of all Objects, to build a total order object change sequence. format: int64 type: integer - state: - description: State represents the state calculated by StateEvaluationExpression. - type: string timestamp: description: |- Timestamp is a timestamp representing the ReconciliationView Controller time when this change occurred. @@ -474,9 +471,6 @@ spec: Revision can be compared globally between all ObjectChanges of all Objects, to build a total order object change sequence. format: int64 type: integer - state: - description: State represents the state calculated by StateEvaluationExpression. - type: string timestamp: description: |- Timestamp is a timestamp representing the ReconciliationView Controller time when this change occurred. @@ -729,10 +723,6 @@ spec: Revision can be compared globally between all ObjectChanges of all Objects, to build a total order object change sequence. format: int64 type: integer - state: - description: State represents the state calculated by - StateEvaluationExpression. - type: string timestamp: description: |- Timestamp is a timestamp representing the ReconciliationView Controller time when this change occurred. diff --git a/controllers/view/change_capture_store.go b/controllers/view/change_capture_store.go index 1c49c49c981..92f5710d8f3 100644 --- a/controllers/view/change_capture_store.go +++ b/controllers/view/change_capture_store.go @@ -24,7 +24,6 @@ import ( "sync/atomic" "golang.org/x/exp/slices" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -46,13 +45,11 @@ type ChangeCaptureStore interface { } type changeCaptureStore struct { - scheme *runtime.Scheme - i18nResources *corev1.ConfigMap - defaultLocale string - locale *string - store map[model.GVKNObjKey]client.Object - clock atomic.Int64 - changes []viewv1.ObjectChange + scheme *runtime.Scheme + formatter descriptionFormatter + store map[model.GVKNObjKey]client.Object + clock atomic.Int64 + changes []viewv1.ObjectChange } func (s *changeCaptureStore) Load(objects ...client.Object) error { @@ -148,11 +145,11 @@ func (s *changeCaptureStore) GetChanges() []viewv1.ObjectChange { return s.changes } -func newChangeCaptureStore(scheme *runtime.Scheme, resource *corev1.ConfigMap) ChangeCaptureStore { +func newChangeCaptureStore(scheme *runtime.Scheme, formatter descriptionFormatter) ChangeCaptureStore { return &changeCaptureStore{ - scheme: scheme, - i18nResources: resource, - store: make(map[model.GVKNObjKey]client.Object), + scheme: scheme, + store: make(map[model.GVKNObjKey]client.Object), + formatter: formatter, } } @@ -164,7 +161,7 @@ func (s *changeCaptureStore) captureCreation(objectRef *model.GVKNObjKey, object changes := buildChanges( make(map[model.GVKNObjKey]client.Object), map[model.GVKNObjKey]client.Object{*objectRef: object}, - buildDescriptionFormatter(s.i18nResources, s.defaultLocale, s.locale)) + s.formatter) s.changes = append(s.changes, changes...) } @@ -172,7 +169,7 @@ func (s *changeCaptureStore) captureUpdate(objectRef *model.GVKNObjKey, obj clie changes := buildChanges( map[model.GVKNObjKey]client.Object{*objectRef: obj}, map[model.GVKNObjKey]client.Object{*objectRef: object}, - buildDescriptionFormatter(s.i18nResources, s.defaultLocale, s.locale)) + s.formatter) s.changes = append(s.changes, changes...) } @@ -180,7 +177,7 @@ func (s *changeCaptureStore) captureDeletion(objectRef *model.GVKNObjKey, object changes := buildChanges( map[model.GVKNObjKey]client.Object{*objectRef: object}, make(map[model.GVKNObjKey]client.Object), - buildDescriptionFormatter(s.i18nResources, s.defaultLocale, s.locale)) + s.formatter) s.changes = append(s.changes, changes...) } diff --git a/controllers/view/view_calculation.go b/controllers/view/current_state_handler.go similarity index 97% rename from controllers/view/view_calculation.go rename to controllers/view/current_state_handler.go index e5c269828a3..2b5ee738b73 100644 --- a/controllers/view/view_calculation.go +++ b/controllers/view/current_state_handler.go @@ -21,11 +21,11 @@ package view import ( "context" - "k8s.io/apimachinery/pkg/util/sets" "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" @@ -184,7 +184,7 @@ func filterEvents(eventLister func() ([]client.Object, error), objectMap map[mod return matchedEventMap, nil } -func viewCalculation(ctx context.Context, cli client.Client, scheme *runtime.Scheme, store ObjectRevisionStore) kubebuilderx.Reconciler { +func updateCurrentState(ctx context.Context, cli client.Client, scheme *runtime.Scheme, store ObjectRevisionStore) kubebuilderx.Reconciler { return &viewCalculator{ ctx: ctx, cli: cli, diff --git a/controllers/view/deletion_handler.go b/controllers/view/deletion_handler.go new file mode 100644 index 00000000000..7050b689d70 --- /dev/null +++ b/controllers/view/deletion_handler.go @@ -0,0 +1,48 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package view + +import ( + "github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx" + "github.com/apecloud/kubeblocks/pkg/controller/model" +) + +type deletionHandler struct{} + +func (h *deletionHandler) PreCondition(tree *kubebuilderx.ObjectTree) *kubebuilderx.CheckResult { + if tree.GetRoot() == nil || !model.IsObjectDeleting(tree.GetRoot()) { + return kubebuilderx.ConditionUnsatisfied + } + return kubebuilderx.ConditionSatisfied +} + +func (h *deletionHandler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.Result, error) { + // TODO(free6om) + // unwatch resources + // store cleanup + // remove finalizer + return kubebuilderx.Commit, nil +} + +func handleDeletion() kubebuilderx.Reconciler { + return &deletionHandler{} +} + +var _ kubebuilderx.Reconciler = &deletionHandler{} diff --git a/controllers/view/view_state_evaluation.go b/controllers/view/desired_state_handler.go similarity index 83% rename from controllers/view/view_state_evaluation.go rename to controllers/view/desired_state_handler.go index 0b8a40dd1cf..bb333075964 100644 --- a/controllers/view/view_state_evaluation.go +++ b/controllers/view/desired_state_handler.go @@ -27,6 +27,7 @@ import ( "github.com/google/cel-go/checker/decls" "github.com/google/cel-go/common/types" "google.golang.org/protobuf/types/known/structpb" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -39,7 +40,7 @@ import ( type stateEvaluation struct { ctx context.Context - reader client.Reader + cli client.Client store ObjectRevisionStore scheme *runtime.Scheme } @@ -53,6 +54,11 @@ func (s *stateEvaluation) PreCondition(tree *kubebuilderx.ObjectTree) *kubebuild func (s *stateEvaluation) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.Result, error) { view, _ := tree.GetRoot().(*viewv1.ReconciliationView) + objs := tree.List(&corev1.ConfigMap{}) + var i18nResource *corev1.ConfigMap + if len(objs) > 0 { + i18nResource, _ = objs[0].(*corev1.ConfigMap) + } // build new object set from cache root := &kbappsv1.Cluster{} @@ -63,7 +69,7 @@ func (s *stateEvaluation) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx Name: view.Spec.TargetObject.Name, } } - if err := s.reader.Get(s.ctx, objectKey, root); err != nil { + if err := s.cli.Get(s.ctx, objectKey, root); err != nil { return kubebuilderx.Commit, err } @@ -79,6 +85,7 @@ func (s *stateEvaluation) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx Kind: kbappsv1.ClusterKind, } latestReconciliationCycleStart := 0 + var initialRoot *kbappsv1.Cluster for i := len(view.Status.CurrentState.Changes) - 1; i >= 0; i-- { change := view.Status.CurrentState.Changes[i] objType := objectReferenceToType(&change.ObjectReference) @@ -107,6 +114,7 @@ func (s *stateEvaluation) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx } if state && firstFalseStateFound { latestReconciliationCycleStart = i + initialRoot, _ = obj.(*kbappsv1.Cluster) break } } @@ -125,10 +133,21 @@ func (s *stateEvaluation) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx // build new InitialObjectTree var err error - view.Status.InitialObjectTree, err = getObjectTreeWithRevision(root, kbOwnershipRules, s.store, view.Status.CurrentState.Changes[latestReconciliationCycleStart].Revision, s.scheme) + view.Status.InitialObjectTree, err = getObjectTreeWithRevision(initialRoot, kbOwnershipRules, s.store, view.Status.CurrentState.Changes[latestReconciliationCycleStart].Revision, s.scheme) + if err != nil { + return kubebuilderx.Commit, err + } + + // update desired state + generator := newPlanGenerator(s.ctx, s.cli, s.scheme, + treeObjectLoader(view.Status.InitialObjectTree, s.store, s.scheme), + buildDescriptionFormatter(i18nResource, defaultLocale, view.Spec.Locale)) + patch := client.MergeFrom(root) + plan, err := generator.generatePlan(initialRoot, patch) if err != nil { return kubebuilderx.Commit, err } + view.Status.DesiredState = &plan.Plan // delete unused object revisions for i := 0; i < latestReconciliationCycleStart; i++ { @@ -138,12 +157,21 @@ func (s *stateEvaluation) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx } // TODO(free6om): delete unused event revisions - // truncate view + // truncate outage changes view.Status.CurrentState.Changes = view.Status.CurrentState.Changes[latestReconciliationCycleStart:] return kubebuilderx.Continue, nil } +func updateDesiredState(ctx context.Context, cli client.Client, scheme *runtime.Scheme, store ObjectRevisionStore) kubebuilderx.Reconciler { + return &stateEvaluation{ + ctx: ctx, + cli: cli, + scheme: scheme, + store: store, + } +} + func doStateEvaluation(object client.Object, expression viewv1.StateEvaluationExpression) (bool, error) { if expression.CELExpression == nil { return false, fmt.Errorf("CEL expression can't be empty") @@ -199,12 +227,9 @@ func doStateEvaluation(object client.Object, expression viewv1.StateEvaluationEx return result, nil } -func viewStateEvaluation(ctx context.Context, reader client.Reader, scheme *runtime.Scheme, store ObjectRevisionStore) kubebuilderx.Reconciler { - return &stateEvaluation{ - ctx: ctx, - reader: reader, - scheme: scheme, - store: store, +func treeObjectLoader(tree *viewv1.ObjectTreeNode, store ObjectRevisionStore, scheme *runtime.Scheme) objectLoader { + return func() (map[model.GVKNObjKey]client.Object, error) { + return getObjectsFromTree(tree, store, scheme) } } diff --git a/controllers/view/dry_run_handler.go b/controllers/view/dry_run_handler.go new file mode 100644 index 00000000000..915d256658a --- /dev/null +++ b/controllers/view/dry_run_handler.go @@ -0,0 +1,158 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package view + +import ( + "context" + "fmt" + "hash/fnv" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/yaml" + + kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" + viewv1 "github.com/apecloud/kubeblocks/apis/view/v1" + "github.com/apecloud/kubeblocks/pkg/constant" + "github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx" + "github.com/apecloud/kubeblocks/pkg/controller/model" +) + +type dryRunner struct { + ctx context.Context + cli client.Client + scheme *runtime.Scheme +} + +func (r *dryRunner) PreCondition(tree *kubebuilderx.ObjectTree) *kubebuilderx.CheckResult { + if tree.GetRoot() == nil || model.IsObjectDeleting(tree.GetRoot()) { + return kubebuilderx.ConditionUnsatisfied + } + v, _ := tree.GetRoot().(*viewv1.ReconciliationView) + if isDesiredSpecChanged(v) { + return kubebuilderx.ConditionSatisfied + } + return kubebuilderx.ConditionUnsatisfied +} + +func (r *dryRunner) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.Result, error) { + view, _ := tree.GetRoot().(*viewv1.ReconciliationView) + objs := tree.List(&corev1.ConfigMap{}) + var i18nResource *corev1.ConfigMap + if len(objs) > 0 { + i18nResource, _ = objs[0].(*corev1.ConfigMap) + } + + root := &kbappsv1.Cluster{} + objectKey := client.ObjectKeyFromObject(view) + if view.Spec.TargetObject != nil { + objectKey = client.ObjectKey{ + Namespace: view.Spec.TargetObject.Namespace, + Name: view.Spec.TargetObject.Name, + } + } + if err := r.cli.Get(r.ctx, objectKey, root); err != nil { + return kubebuilderx.Commit, err + } + + generator := newPlanGenerator(r.ctx, r.cli, r.scheme, + cacheObjectLoader(r.ctx, r.cli, root, kbOwnershipRules), + buildDescriptionFormatter(i18nResource, defaultLocale, view.Spec.Locale)) + + plan, err := generator.generatePlan(root, strategicMergeFrom(view.Spec.DryRun.DesiredSpec)) + if err != nil { + return kubebuilderx.Commit, err + } + plan.DesiredSpecRevision = getDesiredSpecRevision(view.Spec.DryRun.DesiredSpec) + view.Status.DryRunResult = plan + + return kubebuilderx.Continue, nil +} + +func dryRun(ctx context.Context, cli client.Client, scheme *runtime.Scheme) kubebuilderx.Reconciler { + return &dryRunner{ + ctx: context.WithValue(ctx, constant.DryRunContextKey, true), + cli: cli, + scheme: scheme, + } +} + +func isDesiredSpecChanged(v *viewv1.ReconciliationView) bool { + if v.Spec.DryRun == nil && v.Status.DryRunResult == nil { + return false + } + if v.Spec.DryRun == nil || v.Status.DryRunResult == nil { + return true + } + revision := getDesiredSpecRevision(v.Spec.DryRun.DesiredSpec) + return revision != v.Status.DryRunResult.DesiredSpecRevision +} + +func getDesiredSpecRevision(desiredSpec string) string { + hf := fnv.New32() + _, _ = hf.Write([]byte(desiredSpec)) + return rand.SafeEncodeString(fmt.Sprint(hf.Sum32())) +} + +func cacheObjectLoader(ctx context.Context, cli client.Client, root *kbappsv1.Cluster, rules []OwnershipRule) objectLoader { + return func() (map[model.GVKNObjKey]client.Object, error) { + return getObjectsFromCache(ctx, cli, root, rules) + } +} + +type stringStrategicMergeFromPatch struct { + from string +} + +func (p *stringStrategicMergeFromPatch) Type() types.PatchType { + return types.StrategicMergePatchType +} + +func (p *stringStrategicMergeFromPatch) Data(obj client.Object) ([]byte, error) { + // Convert the desiredSpec YAML string to a map + specMap := make(map[string]interface{}) + if err := yaml.Unmarshal([]byte(p.from), &specMap); err != nil { + return nil, fmt.Errorf("failed to unmarshal desiredSpec: %w", err) + } + + // Extract the current spec and apply the patch + currentSpec, err := getSpecFieldAsStruct(obj) + if err != nil { + return nil, fmt.Errorf("failed to get current spec: %w", err) + } + + // Create a strategic merge patch + return strategicpatch.CreateTwoWayMergePatch( + specMapToJSON(currentSpec), + specMapToJSON(specMap), + currentSpec, + ) +} + +func strategicMergeFrom(desiredSpec string) client.Patch { + return &stringStrategicMergeFromPatch{from: desiredSpec} +} + +var _ kubebuilderx.Reconciler = &dryRunner{} +var _ client.Patch = &stringStrategicMergeFromPatch{} diff --git a/controllers/view/finalizer_handler.go b/controllers/view/finalizer_handler.go new file mode 100644 index 00000000000..4a00a76a5bc --- /dev/null +++ b/controllers/view/finalizer_handler.go @@ -0,0 +1,52 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package view + +import ( + "github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx" + "github.com/apecloud/kubeblocks/pkg/controller/model" +) + +type finalizerHandler struct{} + +func (f *finalizerHandler) PreCondition(tree *kubebuilderx.ObjectTree) *kubebuilderx.CheckResult { + if tree.GetRoot() == nil || model.IsObjectDeleting(tree.GetRoot()) { + return kubebuilderx.ConditionUnsatisfied + } + for _, f := range tree.GetRoot().GetFinalizers() { + if f == finalizer { + return kubebuilderx.ConditionUnsatisfied + } + } + return kubebuilderx.ConditionSatisfied +} + +func (f *finalizerHandler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.Result, error) { + finalizers := tree.GetRoot().GetFinalizers() + finalizers = append(finalizers, finalizer) + tree.GetRoot().SetFinalizers(finalizers) + return kubebuilderx.Commit, nil +} + +func assureFinalizer() kubebuilderx.Reconciler { + return &finalizerHandler{} +} + +var _ kubebuilderx.Reconciler = &finalizerHandler{} diff --git a/controllers/view/i18n_resources.go b/controllers/view/i18n_resources_manager.go similarity index 100% rename from controllers/view/i18n_resources.go rename to controllers/view/i18n_resources_manager.go diff --git a/controllers/view/informer_manager.go b/controllers/view/informer_manager.go index e3634b34ad1..12b69f99e7e 100644 --- a/controllers/view/informer_manager.go +++ b/controllers/view/informer_manager.go @@ -37,15 +37,10 @@ import ( viewv1 "github.com/apecloud/kubeblocks/apis/view/v1" "github.com/apecloud/kubeblocks/pkg/constant" - "github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx" - "github.com/apecloud/kubeblocks/pkg/controller/model" ) type InformerManager interface { - SetContext(ctx context.Context) - Start() error - Watch(watcher client.Object, watched schema.GroupVersionKind) error - UnWatch(watcher client.Object, watched schema.GroupVersionKind) error + Start(context.Context) error } type informerManager struct { @@ -53,12 +48,12 @@ type informerManager struct { eventChan chan event.GenericEvent - informerRefCounter map[schema.GroupVersionKind]sets.Set[model.GVKNObjKey] - refCounterLock sync.Mutex + informerSet sets.Set[schema.GroupVersionKind] + informerSetLock sync.Mutex cache cache.Cache - ctx context.Context cli client.Client + ctx context.Context handler handler.EventHandler @@ -69,11 +64,13 @@ type informerManager struct { scheme *runtime.Scheme } -func (m *informerManager) SetContext(ctx context.Context) { +func (m *informerManager) Start(ctx context.Context) error { m.ctx = ctx -} -func (m *informerManager) Start() error { + if err := m.watchKubeBlocksRelatedResources(); err != nil { + return err + } + m.once.Do(func() { go func() { for m.processNextWorkItem() { @@ -84,47 +81,33 @@ func (m *informerManager) Start() error { return nil } -func (m *informerManager) Watch(watcher client.Object, watched schema.GroupVersionKind) error { - m.refCounterLock.Lock() - defer m.refCounterLock.Unlock() +func (m *informerManager) watch(resource schema.GroupVersionKind) error { + m.informerSetLock.Lock() + defer m.informerSetLock.Unlock() - watchers, ok := m.informerRefCounter[watched] - if !ok { - watchers = sets.New[model.GVKNObjKey]() - m.informerRefCounter[watched] = watchers - } - watcherRef, err := getObjectRef(watcher, m.scheme) - if err != nil { - return err - } - if watchers.Has(*watcherRef) { + if _, ok := m.informerSet[resource]; ok { return nil } - if err := m.createInformer(watched); err != nil { - return nil + if err := m.createInformer(resource); err != nil { + return err } - watchers.Insert(*watcherRef) + m.informerSet.Insert(resource) + return nil } -func (m *informerManager) UnWatch(watcher client.Object, watched schema.GroupVersionKind) error { - m.refCounterLock.Lock() - defer m.refCounterLock.Unlock() +func (m *informerManager) unWatch(resource schema.GroupVersionKind) error { + m.informerSetLock.Lock() + defer m.informerSetLock.Unlock() - watchers, ok := m.informerRefCounter[watched] - if !ok { + if _, ok := m.informerSet[resource]; !ok { return nil } - watcherRef, err := getObjectRef(watcher, m.scheme) - if err != nil { + if err := m.deleteInformer(resource); err != nil { return err } - watchers.Delete(*watcherRef) - if watchers.Len() == 0 { - if err := m.deleteInformer(watched); err != nil { - return err - } - } + m.informerSet.Delete(resource) + return nil } @@ -208,27 +191,19 @@ func (e *eventProxy) Generic(ctx context.Context, evt event.GenericEvent, q work func NewInformerManager(cli client.Client, cache cache.Cache, scheme *runtime.Scheme, eventChan chan event.GenericEvent) InformerManager { return &informerManager{ - cli: cli, - cache: cache, - scheme: scheme, - eventChan: eventChan, - handler: &eventProxy{}, - informerRefCounter: make(map[schema.GroupVersionKind]sets.Set[model.GVKNObjKey]), + cli: cli, + cache: cache, + scheme: scheme, + eventChan: eventChan, + handler: &eventProxy{}, + informerSet: sets.New[schema.GroupVersionKind](), queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{ Name: "informer-manager", }), } } -type informerManagerReconciler struct { - manager InformerManager -} - -func (r *informerManagerReconciler) PreCondition(tree *kubebuilderx.ObjectTree) *kubebuilderx.CheckResult { - return kubebuilderx.ConditionSatisfied -} - -func (r *informerManagerReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.Result, error) { +func (m *informerManager) watchKubeBlocksRelatedResources() error { gvks := sets.New[schema.GroupVersionKind]() parseGVK := func(ot *viewv1.ObjectType) error { gvk, err := objectTypeToGVK(ot) @@ -243,40 +218,25 @@ func (r *informerManagerReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (ku APIVersion: corev1.SchemeGroupVersion.String(), Kind: constant.EventKind, }); err != nil { - return kubebuilderx.Commit, err + return err } for _, rule := range kbOwnershipRules { if err := parseGVK(&rule.Primary); err != nil { - return kubebuilderx.Commit, err + return err } for _, resource := range rule.OwnedResources { if err := parseGVK(&resource.Secondary); err != nil { - return kubebuilderx.Commit, err + return err } } } - v, _ := tree.GetRoot().(*viewv1.ReconciliationView) - if model.IsObjectDeleting(tree.GetRoot()) { - for gvk, _ := range gvks { - if err := r.manager.UnWatch(v, gvk); err != nil { - return kubebuilderx.Commit, err - } - } - } else { - for gvk, _ := range gvks { - if err := r.manager.Watch(v, gvk); err != nil { - return kubebuilderx.Commit, err - } + for gvk, _ := range gvks { + if err := m.watch(gvk); err != nil { + return err } } - - return kubebuilderx.Continue, nil -} - -func updateInformerManager(manager InformerManager) kubebuilderx.Reconciler { - return &informerManagerReconciler{manager: manager} + return nil } var _ InformerManager = &informerManager{} -var _ kubebuilderx.Reconciler = &informerManagerReconciler{} var _ handler.EventHandler = &eventProxy{} diff --git a/controllers/view/plan_generation.go b/controllers/view/plan_generator.go similarity index 53% rename from controllers/view/plan_generation.go rename to controllers/view/plan_generator.go index a82f32126c7..7f6eac54f7f 100644 --- a/controllers/view/plan_generation.go +++ b/controllers/view/plan_generator.go @@ -23,85 +23,45 @@ import ( "context" "encoding/json" "fmt" - "hash/fnv" "reflect" "time" + jsonpatch "github.com/evanphx/json-patch/v5" "github.com/google/go-cmp/cmp" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/yaml" kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" viewv1 "github.com/apecloud/kubeblocks/apis/view/v1" - "github.com/apecloud/kubeblocks/pkg/constant" - "github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx" "github.com/apecloud/kubeblocks/pkg/controller/model" ) -type planGenerator struct { - ctx context.Context - cli client.Client -} - -func (g *planGenerator) PreCondition(tree *kubebuilderx.ObjectTree) *kubebuilderx.CheckResult { - if tree.GetRoot() == nil || model.IsObjectDeleting(tree.GetRoot()) { - return kubebuilderx.ConditionUnsatisfied - } - v, _ := tree.GetRoot().(*viewv1.ReconciliationView) - if isDesiredSpecChanged(v) { - return kubebuilderx.ConditionSatisfied - } - return kubebuilderx.ConditionUnsatisfied +type PlanGenerator interface { + generatePlan(root *kbappsv1.Cluster, patch client.Patch) (*viewv1.DryRunResult, error) } -func isDesiredSpecChanged(v *viewv1.ReconciliationView) bool { - if v.Spec.DryRun == nil && v.Status.DryRunResult == nil { - return false - } - if v.Spec.DryRun == nil || v.Status.DryRunResult == nil { - return true - } - revision := getDesiredSpecRevision(v.Spec.DryRun.DesiredSpec) - return revision != v.Status.DryRunResult.DesiredSpecRevision -} +type objectLoader func() (map[model.GVKNObjKey]client.Object, error) +type descriptionFormatter func(client.Object, client.Object, viewv1.ObjectChangeType, *schema.GroupVersionKind) (string, *string) -func getDesiredSpecRevision(desiredSpec string) string { - hf := fnv.New32() - _, _ = hf.Write([]byte(desiredSpec)) - return rand.SafeEncodeString(fmt.Sprint(hf.Sum32())) +type planGenerator struct { + ctx context.Context + cli client.Client + scheme *runtime.Scheme + loader objectLoader + formatter descriptionFormatter } -func (g *planGenerator) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.Result, error) { - view, _ := tree.GetRoot().(*viewv1.ReconciliationView) - objs := tree.List(&corev1.ConfigMap{}) - var i18nResource *corev1.ConfigMap - if len(objs) > 0 { - i18nResource, _ = objs[0].(*corev1.ConfigMap) - } - - root := &kbappsv1.Cluster{} - objectKey := client.ObjectKeyFromObject(view) - if view.Spec.TargetObject != nil { - objectKey = client.ObjectKey{ - Namespace: view.Spec.TargetObject.Namespace, - Name: view.Spec.TargetObject.Name, - } - } - if err := g.cli.Get(g.ctx, objectKey, root); err != nil { - return kubebuilderx.Commit, err - } - +func (g *planGenerator) generatePlan(root *kbappsv1.Cluster, patch client.Patch) (*viewv1.DryRunResult, error) { // create mock client and mock event recorder // kbagent client is running in dry-run mode by setting context key-value pair: dry-run=true - store := newChangeCaptureStore(g.cli.Scheme(), i18nResource) + store := newChangeCaptureStore(g.scheme, g.formatter) mClient, err := newMockClient(g.cli, store, kbOwnershipRules) if err != nil { - return kubebuilderx.Commit, err + return nil, err } mEventRecorder := newMockEventRecorder(store) @@ -111,22 +71,22 @@ func (g *planGenerator) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.R // 3. encapsulate KB controller as reconciler reconcilerTree, err := newReconcilerTree(g.ctx, mClient, mEventRecorder, kbOwnershipRules) if err != nil { - return kubebuilderx.Commit, err + return nil, err } // load current object tree into store - if err = loadCurrentObjectTree(g.ctx, g.cli, root, kbOwnershipRules, store); err != nil { - return kubebuilderx.Commit, err + if err = loadCurrentObjectTree(g.loader, store); err != nil { + return nil, err } initialObjectMap := store.GetAll() // apply dryRun.desiredSpec to target cluster object var specDiff string - if specDiff, err = applyDesiredSpec(view.Spec.DryRun.DesiredSpec, root); err != nil { - return kubebuilderx.Commit, err + if specDiff, err = applyPatch(root, patch); err != nil { + return nil, err } if err = mClient.Update(g.ctx, root); err != nil { - return kubebuilderx.Commit, err + return nil, err } // generate plan with timeout @@ -154,14 +114,8 @@ func (g *planGenerator) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.R } // update dry-run result - // // update spec info - dryRunResult := view.Status.DryRunResult - if dryRunResult == nil { - dryRunResult = &viewv1.DryRunResult{} - view.Status.DryRunResult = dryRunResult - } - dryRunResult.DesiredSpecRevision = getDesiredSpecRevision(view.Spec.DryRun.DesiredSpec) + dryRunResult := &viewv1.DryRunResult{} dryRunResult.ObservedTargetGeneration = root.Generation dryRunResult.SpecDiff = specDiff @@ -180,89 +134,81 @@ func (g *planGenerator) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.R } // update plan - if err = g.cli.Get(g.ctx, objectKey, root); err != nil { - return kubebuilderx.Commit, err - } - desiredRoot := &kbappsv1.Cluster{} - if err = mClient.Get(g.ctx, objectKey, desiredRoot); err != nil { - return kubebuilderx.Commit, err - } - desiredTree, err := getObjectTreeFromCache(g.ctx, mClient, desiredRoot, kbOwnershipRules) + desiredTree, err := getObjectTreeFromCache(g.ctx, mClient, root, kbOwnershipRules) if err != nil { - return kubebuilderx.Commit, err + return nil, err } dryRunResult.Plan.ObjectTree = desiredTree dryRunResult.Plan.Changes = store.GetChanges() newObjectMap := store.GetAll() dryRunResult.Plan.Summary.ObjectSummaries = buildObjectSummaries(initialObjectMap, newObjectMap) - return kubebuilderx.Continue, nil + return dryRunResult, nil } -func planGeneration(ctx context.Context, cli client.Client) kubebuilderx.Reconciler { +func newPlanGenerator(ctx context.Context, cli client.Client, scheme *runtime.Scheme, loader objectLoader, formatter descriptionFormatter) PlanGenerator { return &planGenerator{ - ctx: context.WithValue(ctx, constant.DryRunContextKey, true), - cli: cli, + ctx: ctx, + cli: cli, + scheme: scheme, + loader: loader, + formatter: formatter, } } -func applyDesiredSpec(desiredSpec string, obj client.Object) (string, error) { - // Convert the desiredSpec YAML string to a map - specMap := make(map[string]interface{}) - if err := yaml.Unmarshal([]byte(desiredSpec), &specMap); err != nil { - return "", fmt.Errorf("failed to unmarshal desiredSpec: %w", err) - } - +func applyPatch(obj client.Object, patch client.Patch) (string, error) { // Extract the current spec and apply the patch currentSpec, err := getSpecFieldAsStruct(obj) if err != nil { - return "", fmt.Errorf("failed to get current spec: %w", err) + return "", err } - // Create a strategic merge patch - patch, err := strategicpatch.CreateTwoWayMergePatch( - specMapToJSON(currentSpec), - specMapToJSON(specMap), - currentSpec, - ) + patchData, err := patch.Data(obj) if err != nil { - return "", fmt.Errorf("failed to create merge patch: %w", err) + return "", err } - // Apply the patch to the current spec - modifiedSpec, err := strategicpatch.StrategicMergePatch( - specMapToJSON(currentSpec), - patch, - currentSpec, - ) - if err != nil { - return "", fmt.Errorf("failed to apply merge patch: %w", err) + var modifiedSpec []byte + if patch.Type() == types.StrategicMergePatchType { + modifiedSpec, err = strategicpatch.StrategicMergePatch( + specMapToJSON(currentSpec), + patchData, + currentSpec, + ) + if err != nil { + return "", err + } + } else if patch.Type() == types.MergePatchType { + modifiedSpec, err = jsonpatch.MergePatch(specMapToJSON(currentSpec), patchData) + if err != nil { + return "", err + } } modifiedSpecMap := make(map[string]interface{}) if err = json.Unmarshal(modifiedSpec, &modifiedSpecMap); err != nil { - return "", fmt.Errorf("failed to unmarshal final spec: %w", err) + return "", err } // Convert the object to an unstructured map objMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { - return "", fmt.Errorf("failed to convert object to unstructured: %w", err) + return "", err } // Extract the initial spec initialSpec, _, err := unstructured.NestedMap(objMap, "spec") if err != nil { - return "", fmt.Errorf("failed to get current spec: %w", err) + return "", err } // Update the spec in the object map if err := unstructured.SetNestedField(objMap, modifiedSpecMap, "spec"); err != nil { - return "", fmt.Errorf("failed to set modified spec: %w", err) + return "", err } // Convert the modified map back to the original object if err := runtime.DefaultUnstructuredConverter.FromUnstructured(objMap, obj); err != nil { - return "", fmt.Errorf("failed to convert back to object: %w", err) + return "", err } // build the spec change @@ -291,8 +237,8 @@ func getSpecFieldAsStruct(obj client.Object) (interface{}, error) { return specField.Interface(), nil } -func loadCurrentObjectTree(ctx context.Context, cli client.Client, root *kbappsv1.Cluster, ownershipRules []OwnershipRule, store ChangeCaptureStore) error { - objectMap, err := getObjectsFromCache(ctx, cli, root, ownershipRules) +func loadCurrentObjectTree(loader objectLoader, store ChangeCaptureStore) error { + objectMap, err := loader() if err != nil { return err } @@ -304,4 +250,4 @@ func loadCurrentObjectTree(ctx context.Context, cli client.Client, root *kbappsv return nil } -var _ kubebuilderx.Reconciler = &planGenerator{} +var _ PlanGenerator = &planGenerator{} diff --git a/controllers/view/reconciliationview_controller.go b/controllers/view/reconciliationview_controller.go index 88a25ce7b37..79d216c2324 100644 --- a/controllers/view/reconciliationview_controller.go +++ b/controllers/view/reconciliationview_controller.go @@ -62,12 +62,12 @@ func (r *ReconciliationViewReconciler) Reconcile(ctx context.Context, req ctrl.R res, err := kubebuilderx.NewController(ctx, r.Client, req, r.Recorder, logger). Prepare(viewResources()). - Do(viewResourcesValidation(ctx, r.Client)). - // TODO(free6om): do store cleanup at view deletion - Do(updateInformerManager(r.InformerManager)). - Do(viewCalculation(ctx, r.Client, r.Scheme, r.ObjectStore)). - Do(viewStateEvaluation(ctx, r.Client, r.Scheme, r.ObjectStore)). - Do(planGeneration(ctx, r.Client)). + Do(resourcesValidation(ctx, r.Client)). + Do(assureFinalizer()). + Do(handleDeletion()). + Do(dryRun(ctx, r.Client, r.Scheme)). + Do(updateCurrentState(ctx, r.Client, r.Scheme, r.ObjectStore)). + Do(updateDesiredState(ctx, r.Client, r.Scheme, r.ObjectStore)). Commit() // TODO(free6om): err handling @@ -80,9 +80,6 @@ func (r *ReconciliationViewReconciler) SetupWithManager(mgr ctrl.Manager) error r.ObjectStore = NewObjectStore(r.Scheme) r.ObjectTreeRootFinder = NewObjectTreeRootFinder(r.Client) r.InformerManager = NewInformerManager(r.Client, mgr.GetCache(), r.Scheme, r.ObjectTreeRootFinder.GetEventChannel()) - if err := r.InformerManager.Start(); err != nil { - return err - } return ctrl.NewControllerManagedBy(mgr). For(&viewv1.ReconciliationView{}). diff --git a/controllers/view/view_resources_loader.go b/controllers/view/resources_loader.go similarity index 100% rename from controllers/view/view_resources_loader.go rename to controllers/view/resources_loader.go diff --git a/controllers/view/view_resources_validator.go b/controllers/view/resources_validator.go similarity index 95% rename from controllers/view/view_resources_validator.go rename to controllers/view/resources_validator.go index f31f405de50..3917e26711f 100644 --- a/controllers/view/view_resources_validator.go +++ b/controllers/view/resources_validator.go @@ -61,7 +61,7 @@ func (r *resourcesValidator) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuild return kubebuilderx.Continue, nil } -func viewResourcesValidation(ctx context.Context, reader client.Reader) kubebuilderx.Reconciler { +func resourcesValidation(ctx context.Context, reader client.Reader) kubebuilderx.Reconciler { return &resourcesValidator{ctx: ctx, reader: reader} } diff --git a/controllers/view/type.go b/controllers/view/type.go index 8f159c0df7b..7632d27914e 100644 --- a/controllers/view/type.go +++ b/controllers/view/type.go @@ -47,6 +47,8 @@ import ( dptypes "github.com/apecloud/kubeblocks/pkg/dataprotection/types" ) +const finalizer = "view.kubeblocks.io/finalizer" + var ( clusterCriteria = OwnershipCriteria{ LabelCriteria: map[string]string{ diff --git a/deploy/helm/crds/view.kubeblocks.io_reconciliationviews.yaml b/deploy/helm/crds/view.kubeblocks.io_reconciliationviews.yaml index 322dc3737e1..cef9c5995bf 100644 --- a/deploy/helm/crds/view.kubeblocks.io_reconciliationviews.yaml +++ b/deploy/helm/crds/view.kubeblocks.io_reconciliationviews.yaml @@ -244,9 +244,6 @@ spec: Revision can be compared globally between all ObjectChanges of all Objects, to build a total order object change sequence. format: int64 type: integer - state: - description: State represents the state calculated by StateEvaluationExpression. - type: string timestamp: description: |- Timestamp is a timestamp representing the ReconciliationView Controller time when this change occurred. @@ -474,9 +471,6 @@ spec: Revision can be compared globally between all ObjectChanges of all Objects, to build a total order object change sequence. format: int64 type: integer - state: - description: State represents the state calculated by StateEvaluationExpression. - type: string timestamp: description: |- Timestamp is a timestamp representing the ReconciliationView Controller time when this change occurred. @@ -729,10 +723,6 @@ spec: Revision can be compared globally between all ObjectChanges of all Objects, to build a total order object change sequence. format: int64 type: integer - state: - description: State represents the state calculated by - StateEvaluationExpression. - type: string timestamp: description: |- Timestamp is a timestamp representing the ReconciliationView Controller time when this change occurred. diff --git a/go.mod b/go.mod index b5a15da2302..c2a45e1fc17 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/containers/common v0.55.4 github.com/docker/docker v25.0.6+incompatible github.com/evanphx/json-patch v5.6.0+incompatible + github.com/evanphx/json-patch/v5 v5.8.0 github.com/fasthttp/router v1.4.20 github.com/fsnotify/fsnotify v1.7.0 github.com/go-logr/logr v1.4.1 @@ -115,7 +116,6 @@ require ( github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/emicklei/proto v1.10.0 // indirect - github.com/evanphx/json-patch/v5 v5.8.0 // indirect github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect github.com/fatih/camelcase v1.0.0 // indirect github.com/fatih/color v1.16.0 // indirect