Skip to content

Commit

Permalink
fix: Resource version incorrectly overridden for wfInformer list requ…
Browse files Browse the repository at this point in the history
  • Loading branch information
terrytangyuan authored and jiachengxu committed Nov 17, 2023
1 parent e05a5e5 commit 9db0bd0
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 14 deletions.
12 changes: 6 additions & 6 deletions util/unstructured/unstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@ import (
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewUnstructuredInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredUnstructuredInformer(resource, client, namespace, resyncPeriod, indexers, nil)
return NewFilteredUnstructuredInformer(resource, client, namespace, resyncPeriod, indexers, nil, nil)
}

// NewFilteredUnstructuredInformer constructs a new informer for Unstructured type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredUnstructuredInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
func NewFilteredUnstructuredInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListRequestListOptions internalinterfaces.TweakListOptionsFunc, tweakWatchRequestListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
ctx := context.Background()
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
if tweakListRequestListOptions != nil {
tweakListRequestListOptions(&options)
}
return client.Resource(resource).Namespace(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
if tweakWatchRequestListOptions != nil {
tweakWatchRequestListOptions(&options)
}
return client.Resource(resource).Namespace(namespace).Watch(ctx, options)
},
Expand Down
10 changes: 8 additions & 2 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
WithField("podCleanup", podCleanupWorkers).
Info("Current Worker Numbers")

wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListOptions, indexers)
wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace)
wfc.wfTaskSetInformer = wfc.newWorkflowTaskSetInformer()
wfc.artGCTaskInformer = wfc.newArtGCTaskInformer()
Expand Down Expand Up @@ -809,7 +809,7 @@ func (wfc *WorkflowController) enqueueWfFromPodLabel(obj interface{}) error {
return nil
}

func (wfc *WorkflowController) tweakListOptions(options *metav1.ListOptions) {
func (wfc *WorkflowController) tweakListRequestListOptions(options *metav1.ListOptions) {
labelSelector := labels.NewSelector().
Add(util.InstanceIDRequirement(wfc.Config.InstanceID))
options.LabelSelector = labelSelector.String()
Expand All @@ -818,6 +818,12 @@ func (wfc *WorkflowController) tweakListOptions(options *metav1.ListOptions) {
options.ResourceVersion = ""
}

func (wfc *WorkflowController) tweakWatchRequestListOptions(options *metav1.ListOptions) {
labelSelector := labels.NewSelector().
Add(util.InstanceIDRequirement(wfc.Config.InstanceID))
options.LabelSelector = labelSelector.String()
}

func getWfPriority(obj interface{}) (int32, time.Time) {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl

// always compare to WorkflowController.Run to see what this block of code should be doing
{
wfc.wfInformer = util.NewWorkflowInformer(dynamicClient, "", 0, wfc.tweakListOptions, indexers)
wfc.wfInformer = util.NewWorkflowInformer(dynamicClient, "", 0, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
wfc.wfTaskSetInformer = informerFactory.Argoproj().V1alpha1().WorkflowTaskSets()
wfc.artGCTaskInformer = informerFactory.Argoproj().V1alpha1().WorkflowArtifactGCTasks()
wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer()
Expand Down
7 changes: 4 additions & 3 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ func (cc *Controller) Run(ctx context.Context) {
}).ForResource(schema.GroupVersionResource{Group: workflow.Group, Version: workflow.Version, Resource: workflow.CronWorkflowPlural})
cc.addCronWorkflowInformerHandler()

wfInformer := util.NewWorkflowInformer(cc.dynamicInterface, cc.managedNamespace, cronWorkflowResyncPeriod, func(options *v1.ListOptions) {
wfInformerListOptionsFunc(options, cc.instanceId)
}, cache.Indexers{})
wfInformer := util.NewWorkflowInformer(cc.dynamicInterface, cc.managedNamespace, cronWorkflowResyncPeriod,
func(options *v1.ListOptions) { wfInformerListOptionsFunc(options, cc.instanceId) },
func(options *v1.ListOptions) { wfInformerListOptionsFunc(options, cc.instanceId) },
cache.Indexers{})
go wfInformer.Run(ctx.Done())

cc.wfLister = util.NewWorkflowLister(wfInformer)
Expand Down
5 changes: 3 additions & 2 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import (
// objects. We no longer return WorkflowInformer due to:
// https://github.com/kubernetes/kubernetes/issues/57705
// https://github.com/argoproj/argo-workflows/issues/632
func NewWorkflowInformer(dclient dynamic.Interface, ns string, resyncPeriod time.Duration, tweakListOptions internalinterfaces.TweakListOptionsFunc, indexers cache.Indexers) cache.SharedIndexInformer {
func NewWorkflowInformer(dclient dynamic.Interface, ns string, resyncPeriod time.Duration, tweakListRequestListOptions internalinterfaces.TweakListOptionsFunc, tweakWatchRequestListOptions internalinterfaces.TweakListOptionsFunc, indexers cache.Indexers) cache.SharedIndexInformer {
resource := schema.GroupVersionResource{
Group: workflow.Group,
Version: "v1alpha1",
Expand All @@ -71,7 +71,8 @@ func NewWorkflowInformer(dclient dynamic.Interface, ns string, resyncPeriod time
ns,
resyncPeriod,
indexers,
tweakListOptions,
tweakListRequestListOptions,
tweakWatchRequestListOptions,
)
return informer
}
Expand Down

0 comments on commit 9db0bd0

Please sign in to comment.