Skip to content

Commit

Permalink
Passing page limit to cach config instead of override. (aws#452)
Browse files Browse the repository at this point in the history
* passing page limit to cache config

* adding error log to optimized list watcher
  • Loading branch information
yash97 committed Sep 10, 2024
1 parent 2e255c8 commit 2380bab
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
11 changes: 6 additions & 5 deletions controllers/custom/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,16 @@ func (b *Builder) Complete(reconciler Reconciler) (healthz.Checker, error) {
workqueue.DefaultControllerRateLimiter(), b.options.Name)

optimizedListWatch := newOptimizedListWatcher(b.ctx, b.clientSet.CoreV1().RESTClient(),
b.converter.Resource(), b.options.Namespace, b.options.PageLimit, b.converter)
b.converter.Resource(), b.options.Namespace, b.converter, b.log.WithName("listWatcher"))

// Create the config for low level controller with the custom converter
// list and watch
config := &cache.Config{
Queue: cache.NewDeltaFIFO(b.converter.Indexer, b.dataStore),
ListerWatcher: optimizedListWatch,
ObjectType: b.converter.ResourceType(),
FullResyncPeriod: b.options.ResyncPeriod,
Queue: cache.NewDeltaFIFO(b.converter.Indexer, b.dataStore),
ListerWatcher: optimizedListWatch,
WatchListPageSize: int64(b.options.PageLimit),
ObjectType: b.converter.ResourceType(),
FullResyncPeriod: b.options.ResyncPeriod,
Process: func(obj interface{}, _ bool) error {
// from oldest to newest
for _, d := range obj.(cache.Deltas) {
Expand Down
27 changes: 20 additions & 7 deletions controllers/custom/custom_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition"
"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -178,23 +179,26 @@ func (c *CustomController) WaitForCacheSync(controller cache.Controller) {

// newOptimizedListWatcher returns a list watcher with a custom list function that converts the
// response for each page using the converter function and returns a general watcher
func newOptimizedListWatcher(ctx context.Context, restClient cache.Getter, resource string, namespace string, limit int,
converter Converter) *cache.ListWatch {
func newOptimizedListWatcher(ctx context.Context, restClient cache.Getter, resource string, namespace string,
converter Converter, log logr.Logger) *cache.ListWatch {

listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
list, err := restClient.Get().
Namespace(namespace).
Resource(resource).
// This needs to be done because just setting the limit using option's
// Limit is being overridden and the response is returned without pagination.
VersionedParams(&metav1.ListOptions{
Limit: int64(limit),
Limit: options.Limit,
Continue: options.Continue,
}, metav1.ParameterCodec).
Do(ctx).
Get()
if err != nil {
return list, err
if statusErr, ok := err.(*apierrors.StatusError); ok {
log.Error(err, "List operation error", "code", statusErr.Status().Code)
} else {
log.Error(err, "List operation error")
}
return nil, err
}
// Strip down the the list before passing the paginated response back to
// the pager function
Expand All @@ -206,11 +210,20 @@ func newOptimizedListWatcher(ctx context.Context, restClient cache.Getter, resou
// before storing the object in the data store.
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
return restClient.Get().
watch, err := restClient.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch(ctx)
if err != nil {
if statusErr, ok := err.(*apierrors.StatusError); ok {
log.Error(err, "Watch operation error", "code", statusErr.Status().Code)
} else {
log.Error(err, "Watch operation error")
}
return nil, err
}
return watch, err
}
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
Expand Down

0 comments on commit 2380bab

Please sign in to comment.