diff --git a/controllers/custom/builder.go b/controllers/custom/builder.go index 181c9bc8..232f1d42 100644 --- a/controllers/custom/builder.go +++ b/controllers/custom/builder.go @@ -113,15 +113,16 @@ func (b *Builder) Complete(reconciler Reconciler) (healthz.Checker, error) { workqueue.DefaultControllerRateLimiter(), b.options.Name) optimizedListWatch := newOptimizedListWatcher(b.ctx, b.clientSet.CoreV1().RESTClient(), - b.converter.Resource(), b.options.Namespace, b.options.PageLimit, b.converter) + b.converter.Resource(), b.options.Namespace, b.converter, b.log.WithName("listWatcher")) // Create the config for low level controller with the custom converter // list and watch config := &cache.Config{ - Queue: cache.NewDeltaFIFO(b.converter.Indexer, b.dataStore), - ListerWatcher: optimizedListWatch, - ObjectType: b.converter.ResourceType(), - FullResyncPeriod: b.options.ResyncPeriod, + Queue: cache.NewDeltaFIFO(b.converter.Indexer, b.dataStore), + ListerWatcher: optimizedListWatch, + WatchListPageSize: int64(b.options.PageLimit), + ObjectType: b.converter.ResourceType(), + FullResyncPeriod: b.options.ResyncPeriod, Process: func(obj interface{}, _ bool) error { // from oldest to newest for _, d := range obj.(cache.Deltas) { diff --git a/controllers/custom/custom_controller.go b/controllers/custom/custom_controller.go index b3bfeee5..df98b968 100644 --- a/controllers/custom/custom_controller.go +++ b/controllers/custom/custom_controller.go @@ -21,6 +21,7 @@ import ( "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition" "github.com/go-logr/logr" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -178,23 +179,26 @@ func (c *CustomController) WaitForCacheSync(controller cache.Controller) { // newOptimizedListWatcher returns a list watcher with a custom list function that converts the // response for each page using the converter function and returns a general watcher -func newOptimizedListWatcher(ctx context.Context, restClient cache.Getter, resource string, namespace string, limit int, - converter Converter) *cache.ListWatch { +func newOptimizedListWatcher(ctx context.Context, restClient cache.Getter, resource string, namespace string, + converter Converter, log logr.Logger) *cache.ListWatch { listFunc := func(options metav1.ListOptions) (runtime.Object, error) { list, err := restClient.Get(). Namespace(namespace). Resource(resource). - // This needs to be done because just setting the limit using option's - // Limit is being overridden and the response is returned without pagination. VersionedParams(&metav1.ListOptions{ - Limit: int64(limit), + Limit: options.Limit, Continue: options.Continue, }, metav1.ParameterCodec). Do(ctx). Get() if err != nil { - return list, err + if statusErr, ok := err.(*apierrors.StatusError); ok { + log.Error(err, "List operation error", "code", statusErr.Status().Code) + } else { + log.Error(err, "List operation error") + } + return nil, err } // Strip down the the list before passing the paginated response back to // the pager function @@ -206,11 +210,20 @@ func newOptimizedListWatcher(ctx context.Context, restClient cache.Getter, resou // before storing the object in the data store. watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { options.Watch = true - return restClient.Get(). + watch, err := restClient.Get(). Namespace(namespace). Resource(resource). VersionedParams(&options, metav1.ParameterCodec). Watch(ctx) + if err != nil { + if statusErr, ok := err.(*apierrors.StatusError); ok { + log.Error(err, "Watch operation error", "code", statusErr.Status().Code) + } else { + log.Error(err, "Watch operation error") + } + return nil, err + } + return watch, err } return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc} }