
Commit

add limit for dependency resource
Signed-off-by: changzhen <changzhen5@huawei.com>
XiShanYongYe-Chang committed Oct 25, 2024
1 parent c6c20a9 commit ea0c003
Showing 4 changed files with 136 additions and 8 deletions.
16 changes: 9 additions & 7 deletions cmd/controller-manager/app/controllermanager.go
@@ -762,20 +762,22 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ConcurrentClusterPropagationPolicySyncs: opts.ConcurrentClusterPropagationPolicySyncs,
ConcurrentResourceTemplateSyncs: opts.ConcurrentResourceTemplateSyncs,
RateLimiterOptions: opts.RateLimiterOpts,
DisableMultiDependencyDistribution: opts.DisableMultiDependencyDistribution,
}

if err := mgr.Add(resourceDetector); err != nil {
klog.Fatalf("Failed to setup resource detector: %v", err)
}
if features.FeatureGate.Enabled(features.PropagateDeps) {
dependenciesDistributor := &dependenciesdistributor.DependenciesDistributor{
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
InformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
RESTMapper: mgr.GetRESTMapper(),
EventRecorder: mgr.GetEventRecorderFor("dependencies-distributor"),
RateLimiterOptions: opts.RateLimiterOpts,
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
InformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
RESTMapper: mgr.GetRESTMapper(),
EventRecorder: mgr.GetEventRecorderFor("dependencies-distributor"),
RateLimiterOptions: opts.RateLimiterOpts,
DisableMultiDependencyDistribution: opts.DisableMultiDependencyDistribution,
}
if err := dependenciesDistributor.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup dependencies distributor: %v", err)
14 changes: 14 additions & 0 deletions cmd/controller-manager/app/options/options.go
@@ -143,6 +143,17 @@ type Options struct {
// in scenario of dynamic replica assignment based on cluster free resources.
// Disable if it does not fit your cases for better performance.
EnableClusterResourceModeling bool

// DisableMultiDependencyDistribution disables a resource from being depended on by multiple
// resources, or from being distributed by PropagationPolicy/ClusterPropagationPolicy while it is depended on.
//
// Before v1.12, this capability was allowed by default. If you still wish to enable it, you can set
// this flag to false, but you will have to accept the side effects that come with it.
// For example, as described in https://github.com/karmada-io/karmada/pull/5717, when the primary resource is deleted,
// the cleanup does not take into account other resources that still depend on it, or any PropagationPolicy associated with it.
//
// It is recommended that you adapt your workloads accordingly and avoid continued use of this capability.
DisableMultiDependencyDistribution bool
}

// NewOptions builds an empty options.
@@ -224,6 +235,9 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers, disabledByDefau
flags.BoolVar(&o.EnableClusterResourceModeling, "enable-cluster-resource-modeling", true, "Enable means controller would build resource modeling for each cluster by syncing Nodes and Pods resources.\n"+
"The resource modeling might be used by the scheduler to make scheduling decisions in scenario of dynamic replica assignment based on cluster free resources.\n"+
"Disable if it does not fit your cases for better performance.")
flags.BoolVar(&o.DisableMultiDependencyDistribution, "disable-multi-dependency-distribution", true, "True means a resource can not be depended on by multiple resources, nor be distributed by PropagationPolicy/ClusterPropagationPolicy while it is depended on.\n"+
"Before v1.12, this capability was allowed by default. If you still wish to enable it, set this flag to false, but you will have to accept the side effects that come with it. For example, as described in https://github.com/karmada-io/karmada/pull/5717, when the primary resource is deleted, the cleanup does not take into account other resources that still depend on it, or any PropagationPolicy associated with it.\n"+
"It is recommended that you adapt your workloads accordingly and avoid continued use of this capability.")

o.RateLimiterOpts.AddFlags(flags)
o.ProfileOpts.AddFlags(flags)
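For reference, a minimal sketch (not part of this commit) of how the new flag behaves when parsed with pflag. The flag name and default mirror the registration above; the trimmed options struct and everything else here are illustrative only.

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// options keeps only the field relevant to this commit.
type options struct {
	DisableMultiDependencyDistribution bool
}

func main() {
	opts := &options{}
	fs := pflag.NewFlagSet("karmada-controller-manager", pflag.ExitOnError)
	// Same flag name and default (true) as registered in AddFlags.
	fs.BoolVar(&opts.DisableMultiDependencyDistribution, "disable-multi-dependency-distribution", true,
		"Disable a resource from being depended on by multiple resources, or from being distributed by a policy while it is depended on.")

	// Operators who still need the pre-v1.12 behavior would pass the flag explicitly:
	_ = fs.Parse([]string{"--disable-multi-dependency-distribution=false"})
	fmt.Println(opts.DisableMultiDependencyDistribution) // prints: false
}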
71 changes: 71 additions & 0 deletions pkg/dependenciesdistributor/dependencies_distributor.go
@@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -29,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
@@ -47,6 +49,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
@@ -100,6 +103,9 @@ type DependenciesDistributor struct {
RESTMapper meta.RESTMapper
ResourceInterpreter resourceinterpreter.ResourceInterpreter
RateLimiterOptions ratelimiterflag.Options
// DisableMultiDependencyDistribution disables a resource from being depended on by multiple
// resources, or from being distributed by PropagationPolicy/ClusterPropagationPolicy while it is depended on.
DisableMultiDependencyDistribution bool

eventHandler cache.ResourceEventHandler
resourceProcessor util.AsyncWorker
@@ -163,6 +169,12 @@ func (d *DependenciesDistributor) reconcileResourceTemplate(key util.QueueKey) e
return fmt.Errorf("invalid key")
}
klog.V(4).Infof("DependenciesDistributor start to reconcile object: %s", resourceTemplateKey)

if d.DisableMultiDependencyDistribution && resourceTemplateClaimedByPolicy(resourceTemplateKey.Labels) {
klog.V(4).Infof("Skip object(%s) as it has been claimed by PropagationPolicy/ClusterPropagationPolicy", resourceTemplateKey)
return nil
}

bindingList := &workv1alpha2.ResourceBindingList{}
err := d.Client.List(context.TODO(), bindingList, &client.ListOptions{
Namespace: resourceTemplateKey.Namespace,
@@ -190,6 +202,12 @@ func (d *DependenciesDistributor) reconcileResourceTemplate(key util.QueueKey) e
return nil
}

func resourceTemplateClaimedByPolicy(resourceLabels map[string]string) bool {
_, ppClaimed := resourceLabels[policyv1alpha1.PropagationPolicyPermanentIDLabel]
_, cppClaimed := resourceLabels[policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel]
return ppClaimed || cppClaimed
}

// matchesWithBindingDependencies tells if the given object(resource template) is matched
// with the dependencies of independent resourceBinding.
func matchesWithBindingDependencies(resourceTemplateKey *LabelsKey, independentBinding *workv1alpha2.ResourceBinding) bool {
@@ -333,6 +351,7 @@ func (d *DependenciesDistributor) handleDependentResource(
Namespace: dependent.Namespace,
Name: dependent.Name,
}
independentBindingID := independentBinding.Labels[workv1alpha2.ResourceBindingPermanentIDLabel]

switch {
case len(dependent.Name) != 0:
@@ -344,6 +363,17 @@ }
}
return err
}

if d.DisableMultiDependencyDistribution {
hasBeenPropagated, err := objHasBeenPropagatedByOther(d.Client, rawObject, independentBindingID)
if err != nil {
return err
}
if hasBeenPropagated {
return nil
}
}

attachedBinding := buildAttachedBinding(independentBinding, rawObject)
return d.createOrUpdateAttachedBinding(attachedBinding)
case dependent.LabelSelector != nil:
@@ -357,6 +387,16 @@ return err
return err
}
for _, rawObject := range rawObjects {
if d.DisableMultiDependencyDistribution {
hasBeenPropagated, err := objHasBeenPropagatedByOther(d.Client, rawObject, independentBindingID)
if err != nil {
return err
}
if hasBeenPropagated {
continue
}
}

attachedBinding := buildAttachedBinding(independentBinding, rawObject)
if err := d.createOrUpdateAttachedBinding(attachedBinding); err != nil {
return err
@@ -368,6 +408,37 @@
return fmt.Errorf("the Name and LabelSelector in the DependentObjectReference cannot be empty at the same time")
}

func objHasBeenPropagatedByOther(c client.Client, object *unstructured.Unstructured, independentBindingID string) (bool, error) {
if resourceTemplateClaimedByPolicy(object.GetLabels()) {
klog.Warningf("Skip object(%s,kind=%s %s/%s) as it has been claimed by PropagationPolicy/ClusterPropagationPolicy",
object.GetAPIVersion(), object.GetKind(), object.GetNamespace(), object.GetName())
return true, nil
}

bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
binding := &workv1alpha2.ResourceBinding{}
err := c.Get(context.TODO(), types.NamespacedName{Namespace: object.GetNamespace(), Name: bindingName}, binding)
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}

for k, v := range binding.Labels {
if !strings.HasPrefix(k, dependedByLabelKeyPrefix) {
continue
}

if v != independentBindingID {
klog.Warningf("Skip object(%s,kind=%s %s/%s) as it has been propagated by other dependent resource",
object.GetAPIVersion(), object.GetKind(), object.GetNamespace(), object.GetName())
return true, nil
}
}
return false, nil
}

func (d *DependenciesDistributor) syncScheduleResultToAttachedBindings(ctx context.Context, independentBinding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) (err error) {
defer func() {
if err != nil {
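The heart of objHasBeenPropagatedByOther is the scan over the attached binding's depended-by labels: a label with the depended-by prefix whose value is a different permanent ID means some other independent binding already propagates the object. Below is a minimal, client-free sketch of that scan; the prefix constant and label contents are assumptions for illustration, not the exact Karmada values.

package main

import (
	"fmt"
	"strings"
)

// Assumed shape of the label-key prefix used to record which independent
// bindings a dependency is propagated for; illustrative only.
const dependedByLabelKeyPrefix = "resourcebinding.karmada.io/depended-by-"

// propagatedByOther reports whether the binding's labels record a dependency
// owner other than the given independent binding ID.
func propagatedByOther(bindingLabels map[string]string, independentBindingID string) bool {
	for k, v := range bindingLabels {
		if !strings.HasPrefix(k, dependedByLabelKeyPrefix) {
			continue
		}
		if v != independentBindingID {
			return true
		}
	}
	return false
}

func main() {
	labels := map[string]string{dependedByLabelKeyPrefix + "1a2b": "owner-a"}
	fmt.Println(propagatedByOther(labels, "owner-a")) // false: same independent binding
	fmt.Println(propagatedByOther(labels, "owner-b")) // true: already owned by another binding
}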
43 changes: 42 additions & 1 deletion pkg/detector/detector.go
@@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
@@ -106,6 +107,10 @@ type ResourceDetector struct {
// the controller.
RateLimiterOptions ratelimiterflag.Options

// DisableMultiDependencyDistribution disables a resource from being depended on by multiple
// resources, or from being distributed by PropagationPolicy/ClusterPropagationPolicy while it is depended on.
DisableMultiDependencyDistribution bool

stopCh <-chan struct{}
}

@@ -245,10 +250,22 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
// currently we do that by setting owner reference to derived objects.
return nil
}
klog.Errorf("Failed to get unstructured object(%s), error: %v", clusterWideKeyWithConfig, err)
klog.Errorf("Failed to get unstructured object(%s), error: %v", clusterWideKeyWithConfig.ClusterWideKey, err)
return err
}

if d.DisableMultiDependencyDistribution {
skip, err := skipForPropagatedByDependent(d.Client, object)
if err != nil {
klog.Errorf("Failed to calc skipForPropagatedByDependent with object(%s), error: %v", clusterWideKeyWithConfig.ClusterWideKey, err)
return err
}
if skip {
klog.Warningf("Skip to propagate resource(%s) for it has been propagated by the dependency", clusterWideKeyWithConfig.ClusterWideKey)
return nil
}
}

resourceTemplateClaimedBy := util.GetLabelValue(object.GetLabels(), util.ResourceTemplateClaimedByLabel)
// If the resource lacks this label, it implies that the resource template can be propagated by Policy.
// For instance, once MultiClusterService takes over the Service, Policy cannot reclaim it.
@@ -260,6 +277,30 @@
return d.propagateResource(object, clusterWideKey, resourceChangeByKarmada)
}

func skipForPropagatedByDependent(c client.Client, object *unstructured.Unstructured) (bool, error) {
// Cluster-scoped resources can not be propagated as dependencies.
if object.GetNamespace() == "" {
return false, nil
}

_, ppClaimed := object.GetLabels()[policyv1alpha1.PropagationPolicyPermanentIDLabel]
_, cppClaimed := object.GetLabels()[policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel]
if ppClaimed || cppClaimed {
return false, nil
}

bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
binding := &workv1alpha2.ResourceBinding{}
err := c.Get(context.TODO(), types.NamespacedName{Namespace: object.GetNamespace(), Name: bindingName}, binding)
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
}

// EventFilter tells if an object should be taken care of.
//
// All objects under Karmada reserved namespace should be ignored:
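To summarize the detector-side change: skipForPropagatedByDependent applies three checks in order: cluster-scoped resources are never skipped, resources already claimed by a PropagationPolicy/ClusterPropagationPolicy are never skipped, and otherwise an existing ResourceBinding named after the resource (i.e. one created by the dependencies distributor) makes the detector back off. The sketch below mirrors that order with the API lookup replaced by a plain boolean; the struct and field names are hypothetical.

package main

import "fmt"

// resourceInfo is a hypothetical stand-in for the unstructured object plus
// the result of the ResourceBinding lookup.
type resourceInfo struct {
	Namespace          string // empty for cluster-scoped resources
	ClaimedByPolicy    bool   // has a PropagationPolicy/ClusterPropagationPolicy permanent-ID label
	HasAttachedBinding bool   // a ResourceBinding named after the resource already exists
}

// shouldSkip mirrors the decision order in skipForPropagatedByDependent.
func shouldSkip(r resourceInfo) bool {
	if r.Namespace == "" { // cluster-scoped: cannot be a dependency, never skip
		return false
	}
	if r.ClaimedByPolicy { // policy-claimed resources keep being handled by the detector
		return false
	}
	return r.HasAttachedBinding // already propagated as a dependency: skip
}

func main() {
	fmt.Println(shouldSkip(resourceInfo{Namespace: "", HasAttachedBinding: true}))        // false
	fmt.Println(shouldSkip(resourceInfo{Namespace: "default", ClaimedByPolicy: true}))    // false
	fmt.Println(shouldSkip(resourceInfo{Namespace: "default", HasAttachedBinding: true})) // true
}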
