diff --git a/Makefile b/Makefile
index 232b32d8d..4df195953 100644
--- a/Makefile
+++ b/Makefile
@@ -15,6 +15,7 @@
 # Core Filestore CSI driver binary
 DRIVERBINARY=gcp-filestore-csi-driver
 WEBHOOKBINARY=gcp-filestore-csi-driver-webhook
+LOCKRELEASEBINARY=gcp-filestore-csi-driver-lockrelease

 $(info PULL_BASE_REF is $(PULL_BASE_REF))
 $(info PWD is $(PWD))
@@ -45,6 +46,15 @@ else
 endif
 $(info WEBHOOK_STAGINGIMAGE is $(WEBHOOK_STAGINGIMAGE))

+LOCKRELEASE_STAGINGIMAGE=
+ifdef GCP_FS_CSI_LOCKRELEASE_STAGING_IMAGE
+	LOCKRELEASE_STAGINGIMAGE=$(GCP_FS_CSI_LOCKRELEASE_STAGING_IMAGE)
+else
+	LOCKRELEASE_STAGINGIMAGE=gcr.io/$(PROJECT)/gcp-filestore-csi-driver-lockrelease
+endif
+$(info LOCKRELEASE_STAGINGIMAGE is $(LOCKRELEASE_STAGINGIMAGE))
+
+
 BINDIR?=bin

 # This flag is used only for csi-client and windows.
@@ -142,6 +152,27 @@ build-and-push-multi-arch: build-image-and-push-linux-arm64 build-image-and-push
 	docker manifest create --amend $(STAGINGIMAGE):$(STAGINGVERSION) $(STAGINGIMAGE):$(STAGINGVERSION)_linux_amd64 $(STAGINGIMAGE):$(STAGINGVERSION)_linux_arm64
 	docker manifest push -p $(STAGINGIMAGE):$(STAGINGVERSION)

+# Build the go binary for the CSI driver lock release controller.
+lockrelease:
+	mkdir -p ${BINDIR}
+	{ \
+	set -e ; \
+	CGO_ENABLED=0 go build -mod=vendor -a -ldflags '-X main.version=$(STAGINGVERSION) -extldflags "-static"' -o ${BINDIR}/${LOCKRELEASEBINARY} ./cmd/lockrelease/; \
+	}
+
+# Build the docker image for the lock release controller.
+lockrelease-image: init-buildx
+	{ \
+	set -e ; \
+	docker buildx build \
+		--platform linux/amd64 \
+		--build-arg STAGINGVERSION=$(STAGINGVERSION) \
+		--build-arg BUILDPLATFORM=linux/amd64 \
+		--build-arg TARGETPLATFORM=linux/amd64 \
+		-f ./cmd/lockrelease/Dockerfile \
+		-t $(LOCKRELEASE_STAGINGIMAGE):$(STAGINGVERSION) --push .; \
+	}
+
 # Build the go binary for the CSI driver.
 # STAGINGVERSION may contain multiple tags (e.g. canary, vX.Y.Z etc). Use one of the tags
 # for setting the driver version variable. For convenience we are using the first value.
diff --git a/cmd/lockrelease/Dockerfile b/cmd/lockrelease/Dockerfile
new file mode 100644
index 000000000..3550e9809
--- /dev/null
+++ b/cmd/lockrelease/Dockerfile
@@ -0,0 +1,28 @@
+# Copyright 2024 The Kubernetes Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM --platform=$BUILDPLATFORM golang:1.22.6 as builder
+
+# Declare STAGINGVERSION in this stage so the --build-arg passed by
+# `make lockrelease-image` is actually visible to the RUN step below.
+ARG STAGINGVERSION
+ARG TARGETPLATFORM
+
+WORKDIR /go/src/sigs.k8s.io/gcp-filestore-csi-driver
+ADD . .
+RUN GOARCH=$(echo $TARGETPLATFORM | cut -f2 -d '/') make lockrelease BINDIR=/bin GCP_FS_CSI_STAGING_VERSION=${STAGINGVERSION}
+
+FROM gcr.io/distroless/static
+ARG LOCKRELEASEBINARY=gcp-filestore-csi-driver-lockrelease
+COPY --from=builder /bin/${LOCKRELEASEBINARY} /${LOCKRELEASEBINARY}
+ENTRYPOINT ["/gcp-filestore-csi-driver-lockrelease"]
diff --git a/cmd/lockrelease/main.go b/cmd/lockrelease/main.go
new file mode 100644
index 000000000..aa32b05f3
--- /dev/null
+++ b/cmd/lockrelease/main.go
@@ -0,0 +1,122 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"flag"
+	"time"
+
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
+
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
+	releaselock "sigs.k8s.io/gcp-filestore-csi-driver/pkg/releaselock"
+	"sigs.k8s.io/gcp-filestore-csi-driver/pkg/util"
+)
+
+var (
+	lockReleaseSyncPeriod = flag.Duration("lock-release-sync-period", 3600*time.Second, "Duration, in seconds, the sync period of the lock release controller. Defaults to 3600 seconds.")
+
+	httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is an empty string.")
+	metricsPath  = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
+
+	leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.")
+	leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.")
+	leaderElectionRetryPeriod   = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.")
+
+	workQueueRateLimiterBaseDelay = flag.Duration("rate-limiter-base-delay", 5*time.Millisecond, "Base delay of the work queue rate limiter. Default is 5ms.")
+	workQueueRateLimiterMaxDelay  = flag.Duration("rate-limiter-max-delay", 1000*time.Second, "Max delay of the work queue rate limiter. Default is 1000s.")
+)
+
+func main() {
+	klog.InitFlags(nil)
+	flag.Parse()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		klog.Fatalf("Failed to create an in-cluster config: %v", err)
+	}
+	client, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		klog.Fatalf("Failed to create a new kubernetes client: %v", err)
+	}
+	lockReleaseConfig := &releaselock.LockReleaseControllerConfig{
+		LeaseDuration:                 *leaderElectionLeaseDuration,
+		RenewDeadline:                 *leaderElectionRenewDeadline,
+		RetryPeriod:                   *leaderElectionRetryPeriod,
+		SyncPeriod:                    *lockReleaseSyncPeriod,
+		WorkQueueRateLimiterBaseDelay: *workQueueRateLimiterBaseDelay,
+		WorkQueueRateLimiterMaxDelay:  *workQueueRateLimiterMaxDelay,
+		MetricEndpoint:                *httpEndpoint,
+		MetricPath:                    *metricsPath,
+	}
+	factory := informers.NewSharedInformerFactory(client, lockReleaseConfig.SyncPeriod)
+	nodeInformer := factory.Core().V1().Nodes().Informer()
+
+	c, err := releaselock.NewLockReleaseController(client, lockReleaseConfig, &nodeInformer)
+	if err != nil {
+		klog.Fatalf("Failed to create a lock release controller: %v", err)
+	}
+
+	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			klog.Infof("Node informer received node create event. %v", obj)
+			c.EnqueueCreateEventObject(obj)
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			klog.Infof("Node informer received node update event. old %v, new %v", oldObj, newObj)
+			c.EnqueueUpdateEventObject(oldObj, newObj)
+		},
+	})
+
+	run := func(ctx context.Context) {
+		klog.Infof("Lock release controller %s started leading on node %s", c.GetId(), c.GetHost())
+		factory.Start(ctx.Done())
+		c.Run(ctx)
+	}
+
+	rl, err := resourcelock.New(
+		resourcelock.LeasesResourceLock,
+		util.ManagedFilestoreCSINamespace,
+		releaselock.LeaseName,
+		nil,
+		c.GetClient().CoordinationV1(),
+		resourcelock.ResourceLockConfig{
+			Identity: c.GetId(),
+		})
+	if err != nil {
+		klog.Fatalf("Error creating resourcelock: %v", err)
+	}
+
+	// Use leader election so that, during a rolling upgrade, only one of this controller
+	// and the old version of the lock release controller is running.
+	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+		Lock:          rl,
+		LeaseDuration: lockReleaseConfig.LeaseDuration,
+		RenewDeadline: lockReleaseConfig.RenewDeadline,
+		RetryPeriod:   lockReleaseConfig.RetryPeriod,
+		Callbacks: leaderelection.LeaderCallbacks{
+			OnStartedLeading: run,
+			OnStoppedLeading: func() {
+				klog.Fatalf("%s no longer the leader", c.GetId())
+			},
+		},
+	})
+}
diff --git a/cmd/main.go b/cmd/main.go
index 5dfb4cd36..de421a228 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -52,8 +52,7 @@ var (
 	resourceTagsStr = flag.String("resource-tags", "", "Resource tags to attach to each volume created. It is a comma separated list of tags of the form '//...//' where, parentID is the ID of Organization or Project resource where tag key and value resources exist, tagKey is the shortName of the tag key resource, tagValue is the shortName of the tag value resource. See https://cloud.google.com/resource-manager/docs/tags/tags-creating-and-managing for more details.")

 	// Feature lock release specific parameters, only take effect when feature-lock-release is set to true.
-	featureLockRelease    = flag.Bool("feature-lock-release", false, "if set to true, the node driver will support Filestore lock release.")
-	lockReleaseSyncPeriod = flag.Duration("lock-release-sync-period", 60*time.Second, "Duration, in seconds, the sync period of the lock release controller. Defaults to 60 seconds.")
+	featureLockRelease = flag.Bool("feature-lock-release", false, "if set to true, the node driver will support Filestore lock release.")

 	// Feature configurable shares per Filestore instance specific parameters.
 	featureMaxSharePerInstance = flag.Bool("feature-max-shares-per-instance", false, "If this feature flag is enabled, allows the user to configure max shares packed per Filestore instance")
@@ -169,7 +168,6 @@ func main() {
 			LeaseDuration:  *leaderElectionLeaseDuration,
 			RenewDeadline:  *leaderElectionRenewDeadline,
 			RetryPeriod:    *leaderElectionRetryPeriod,
-			SyncPeriod:     *lockReleaseSyncPeriod,
 			MetricEndpoint: *httpEndpoint,
 			MetricPath:     *metricsPath,
 		},
diff --git a/deploy/kubernetes/overlays/lockrelease/configmap_rbac.yaml b/deploy/kubernetes/overlays/lockrelease/configmap_rbac.yaml
index 5983d449a..b016d7388 100644
--- a/deploy/kubernetes/overlays/lockrelease/configmap_rbac.yaml
+++ b/deploy/kubernetes/overlays/lockrelease/configmap_rbac.yaml
@@ -4,6 +4,13 @@ metadata:
   name: gke-managed-filestorecsi

 ---
+##### Lock release controller Service Account, Roles, RoleBindings
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: filestore-lockrelease-controller-sa
+  namespace: gcp-filestore-csi-driver
+---

 kind: ClusterRole
 apiVersion: rbac.authorization.k8s.io/v1
@@ -16,21 +23,6 @@ rules:

 ---

-kind: ClusterRoleBinding
-apiVersion: rbac.authorization.k8s.io/v1
-metadata:
-  name: filestorecsi-node-driver-cluster-role-binding
-subjects:
-- kind: ServiceAccount
-  name: gcp-filestore-csi-node-sa
-  namespace: gcp-filestore-csi-driver
-roleRef:
-  kind: ClusterRole
-  name: filestorecsi-node-driver-cluster-role
-  apiGroup: rbac.authorization.k8s.io
-
----
-
 kind: Role
 apiVersion: rbac.authorization.k8s.io/v1
 metadata:
@@ -39,7 +31,7 @@ metadata:
 rules:
 - apiGroups: [""]
   resources: ["configmaps"]
-  verbs: ["get", "list", "update", "create"]
+  verbs: ["get", "update", "create"]

 ---
@@ -55,6 +47,21 @@ rules:

 ---

+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: filestorecsi-lockrelease-controller-cluster-role-binding
+subjects:
+- kind: ServiceAccount
+  name: filestore-lockrelease-controller-sa
+  namespace: gcp-filestore-csi-driver
+roleRef:
+  kind: ClusterRole
+  name: filestorecsi-node-driver-cluster-role
+  apiGroup: rbac.authorization.k8s.io
+
+---
+
 kind: RoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
 metadata:
@@ -62,7 +69,7 @@ metadata:
   namespace: gke-managed-filestorecsi
 subjects:
 - kind: ServiceAccount
-  name: gcp-filestore-csi-node-sa
+  name: filestore-lockrelease-controller-sa
   namespace: gcp-filestore-csi-driver
 roleRef:
   kind: Role
@@ -80,6 +87,9 @@ subjects:
 - kind: ServiceAccount
   name: gcp-filestore-csi-node-sa
   namespace: gcp-filestore-csi-driver
+- kind: ServiceAccount
+  name: filestore-lockrelease-controller-sa
+  namespace: gcp-filestore-csi-driver
 roleRef:
   kind: Role
   name: filestorecsi-node-driver-role
diff --git a/deploy/kubernetes/overlays/lockrelease/kustomization.yaml b/deploy/kubernetes/overlays/lockrelease/kustomization.yaml
index 67946ff30..0b8f18d54 100644
--- a/deploy/kubernetes/overlays/lockrelease/kustomization.yaml
+++ b/deploy/kubernetes/overlays/lockrelease/kustomization.yaml
@@ -3,4 +3,4 @@ kind: Kustomization
 resources:
 - ../stable-master
 - configmap_rbac.yaml
-
+- lock_release_controller.yaml
diff --git a/deploy/kubernetes/overlays/lockrelease/lock_release_controller.yaml b/deploy/kubernetes/overlays/lockrelease/lock_release_controller.yaml
new file mode 100644
index 000000000..960560883
--- /dev/null
+++ b/deploy/kubernetes/overlays/lockrelease/lock_release_controller.yaml
@@ -0,0 +1,36 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: filestore-lock-release-controller
+  namespace: gcp-filestore-csi-driver
+  labels:
+    k8s-app: filestore-lock-release-controller
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      k8s-app: filestore-lock-release-controller
+  template:
+    metadata:
+      labels:
+        k8s-app: filestore-lock-release-controller
+        component: filestorecsi
+    spec:
+      priorityClassName: csi-gcp-fs-node
+      nodeSelector:
+        kubernetes.io/os: linux
+      containers:
+      - name: filestore-lock-release-controller
+        image: registry.k8s.io/sig-storage/filestore-lockrelease-controller
+        args:
+        - --v=6
+        resources:
+          requests:
+            cpu: 5m
+            memory: 10Mi
+      serviceAccountName: filestore-lockrelease-controller-sa
+      tolerations:
+      - key: "kubernetes.io/arch"
+        operator: "Equal"
+        value: "arm64"
+        effect: "NoSchedule"
diff --git a/pkg/csi_driver/gcfs_driver.go b/pkg/csi_driver/gcfs_driver.go
index 5e1cc05aa..c5fcfb21e 100644
--- a/pkg/csi_driver/gcfs_driver.go
+++ b/pkg/csi_driver/gcfs_driver.go
@@ -326,10 +326,6 @@ func (driver *GCFSDriver) Run(endpoint string) {
 	// Start the nonblocking GRPC.
 	s := NewNonBlockingGRPCServer()
 	s.Start(endpoint, driver.ids, driver.cs, driver.ns)
-	if driver.config.RunNode && driver.config.FeatureOptions.FeatureLockRelease.Enabled {
-		// Start the lock release controller on node driver.
-		driver.ns.(*nodeServer).lockReleaseController.Run(context.Background())
-	}
 	s.Wait()
 }
diff --git a/pkg/csi_driver/node.go b/pkg/csi_driver/node.go
index 3dc5e3afc..731f9579a 100644
--- a/pkg/csi_driver/node.go
+++ b/pkg/csi_driver/node.go
@@ -50,6 +50,7 @@ var (
 )

 // nodeServer handles mounting and unmounting of GCFS volumes on a node
+// TODO(b/375481562): refactor config map utils & remove node driver's dependency on lockReleaseController
 type nodeServer struct {
 	driver  *GCFSDriver
 	mounter mount.Interface
@@ -76,7 +77,7 @@ func newNodeServer(driver *GCFSDriver, mounter mount.Interface, metaService meta
 	if err != nil {
 		return nil, err
 	}
-	lc, err := lockrelease.NewLockReleaseController(client, ns.features.FeatureLockRelease.Config)
+	lc, err := lockrelease.NewLockReleaseController(client, ns.features.FeatureLockRelease.Config, nil)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/releaselock/controller.go b/pkg/releaselock/controller.go
index 84451537d..11989b17f 100644
--- a/pkg/releaselock/controller.go
+++ b/pkg/releaselock/controller.go
@@ -15,30 +15,39 @@ package lockrelease

 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"time"

+	"golang.org/x/time/rate"
+
 	corev1 "k8s.io/api/core/v1"
+	apiError "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/gcp-filestore-csi-driver/pkg/metrics"
 	"sigs.k8s.io/gcp-filestore-csi-driver/pkg/util"

-	"k8s.io/client-go/tools/leaderelection"
-	"k8s.io/client-go/tools/leaderelection/resourcelock"
+	cache "k8s.io/client-go/tools/cache"
 )

 const (
 	gceInstanceIDKey = "container.googleapis.com/instance_id"
-	leaseName        = "filestore-csi-storage-gke-io-node"
-	// Root CA configmap in each namespace.
-	rootCA = "kube-root-ca.crt"
+	LeaseName = "filestore-csi-storage-gke-io-node"
 )

+type NodeUpdatePair struct {
+	OldObj *corev1.Node
+	NewObj *corev1.Node
+}
+
 type LockReleaseController struct {
 	client kubernetes.Interface
@@ -50,18 +59,27 @@ type LockReleaseController struct {
 	config *LockReleaseControllerConfig

 	metricsManager *metrics.MetricsManager
+	nodeInformer   *cache.SharedIndexInformer
+
+	updateEventQueue workqueue.RateLimitingInterface
+	createEventQueue workqueue.RateLimitingInterface
 }

 type LockReleaseControllerConfig struct {
 	// Parameters of leaderelection.LeaderElectionConfig.
 	LeaseDuration, RenewDeadline, RetryPeriod time.Duration
+	// Parameters of workQueue rate limiters.
+	WorkQueueRateLimiterBaseDelay, WorkQueueRateLimiterMaxDelay time.Duration
 	// Reconcile loop frequency.
 	SyncPeriod time.Duration
 	// HTTP endpoint and path to emit NFS lock release metrics.
 	MetricEndpoint, MetricPath string
 }

-func NewLockReleaseController(client kubernetes.Interface, config *LockReleaseControllerConfig) (*LockReleaseController, error) {
+func NewLockReleaseController(
+	client kubernetes.Interface,
+	config *LockReleaseControllerConfig,
+	nodeInformer *cache.SharedIndexInformer) (*LockReleaseController, error) {
 	// Register rpc procedure for lock release.
 	if err := RegisterLockReleaseProcedure(); err != nil {
 		klog.Errorf("Error initializing lockrelease controller: %v", err)
@@ -75,12 +93,19 @@ func NewLockReleaseController(client kubernetes.Interface, config *LockReleaseCo
 	}
 	// Add a uniquifier so that two processes on the same host don't accidentally both become active.
 	id := hostname + "_" + string(uuid.NewUUID())
+	ratelimiter := workqueue.NewMaxOfRateLimiter(
+		workqueue.NewItemExponentialFailureRateLimiter(config.WorkQueueRateLimiterBaseDelay, config.WorkQueueRateLimiterMaxDelay),
+		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
+	)

 	lc := &LockReleaseController{
-		id:       id,
-		hostname: hostname,
-		client:   client,
-		config:   config,
+		id:               id,
+		hostname:         hostname,
+		client:           client,
+		config:           config,
+		nodeInformer:     nodeInformer,
+		updateEventQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		createEventQueue: workqueue.NewRateLimitingQueue(ratelimiter),
 	}

 	if config.MetricEndpoint != "" {
@@ -94,114 +119,242 @@ func NewLockReleaseController(client kubernetes.Interface, config *LockReleaseCo
 	return lc, nil
 }

-func (c *LockReleaseController) Run(ctx context.Context) {
-	run := func(ctx context.Context) {
-		klog.Infof("Lock release controller %s started leading on node %s", c.id, c.hostname)
-		wait.Forever(func() {
-			start := time.Now()
-			cmList, err := c.client.CoreV1().ConfigMaps(util.ManagedFilestoreCSINamespace).List(ctx, metav1.ListOptions{})
-			duration := time.Since(start)
-			c.RecordKubeAPIMetrics(err, metrics.ConfigMapResourceType, metrics.ListOpType, metrics.ReconcilerOpSource, duration)
-			if err != nil {
-				klog.Errorf("Failed to list configmap in namespace %s: %v", util.ManagedFilestoreCSINamespace, err)
-				return
-			}
-			klog.Infof("Listed %d configmaps in namespace %s", len(cmList.Items), util.ManagedFilestoreCSINamespace)
-
-			start = time.Now()
-			nodes, err := c.listNodes(ctx)
-			duration = time.Since(start)
-			c.RecordKubeAPIMetrics(err, metrics.NodeResourceType, metrics.ListOpType, metrics.ReconcilerOpSource, duration)
-			if err != nil {
-				klog.Errorf("Failed to list nodes: %v", err)
-				return
+func (c *LockReleaseController) Run(ctx context.Context) error {
+	defer utilruntime.HandleCrash()
+	defer c.updateEventQueue.ShutDown()
+	defer c.createEventQueue.ShutDown()
+	if !cache.WaitForCacheSync(ctx.Done(), (*c.nodeInformer).HasSynced) {
+		klog.Fatal("Timed out waiting for caches to sync")
+	}
+	klog.Info("Cache sync completed successfully.")
+	go wait.UntilWithContext(ctx, c.runCreateEventWorker, time.Second)
+	go wait.UntilWithContext(ctx, c.runUpdateEventWorker, time.Second)
+	klog.Info("Started workers")
+	<-ctx.Done()
+	klog.Info("Shutting down workers")
+	return nil
+}
+
+func (c *LockReleaseController) runCreateEventWorker(ctx context.Context) {
+	for c.processNextCreateEvent(ctx) {
+	}
+}
+
+// TODO(b/374327452): interface rpc calls for mocking and create unit tests for handleCreateEvent and handleUpdateEvent
+func (c *LockReleaseController) handleCreateEvent(ctx context.Context, obj interface{}) error {
+	node := obj.(*corev1.Node)
+	start := time.Now()
+	cmName := ConfigMapNamePrefix + node.Name
+	cm, err := c.client.CoreV1().ConfigMaps(util.ManagedFilestoreCSINamespace).Get(ctx, cmName, metav1.GetOptions{})
+	duration := time.Since(start)
+	c.RecordKubeAPIMetrics(err, metrics.ConfigMapResourceType, metrics.GetOpType, metrics.ReconcilerOpSource, duration)
+
+	if err != nil {
+		if apiError.IsNotFound(err) {
+			return nil
+		}
+		return fmt.Errorf("failed to get configmap in namespace %s: %w", util.ManagedFilestoreCSINamespace, err)
+	}
+	klog.Infof("Got configmap (%v) in namespace %s", cm, util.ManagedFilestoreCSINamespace)
+	data := cm.DeepCopy().Data
+
+	var configMapReconcileErrors []error
+	for key, filestoreIP := range data {
+		err = c.processConfigMapEntryOnNodeCreation(ctx, key, filestoreIP, node, cm)
+		if err != nil {
+			configMapReconcileErrors = append(configMapReconcileErrors, err)
+		}
+	}
+	if len(configMapReconcileErrors) > 0 {
+		return errors.Join(configMapReconcileErrors...)
+	}
+	return nil
+}
+
+func (c *LockReleaseController) processConfigMapEntryOnNodeCreation(ctx context.Context, key string, filestoreIP string, node *corev1.Node, cm *corev1.ConfigMap) error {
+	_, _, _, _, gceInstanceID, gkeNodeInternalIP, err := ParseConfigMapKey(key)
+	if err != nil {
+		return fmt.Errorf("failed to parse configmap key %s: %w", key, err)
+	}
+	klog.V(6).Infof("Verifying GKE node %s with nodeId %s nodeInternalIP %s exists or not", node.Name, gceInstanceID, gkeNodeInternalIP)
+	entryMatchesNode, err := c.verifyConfigMapEntry(node, gceInstanceID, gkeNodeInternalIP)
+	if err != nil {
+		return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", node.Name, gceInstanceID, gkeNodeInternalIP, err)
+	}
+	if entryMatchesNode {
+		klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s still exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP)
+		return nil
+	}
+
+	// Try to match the latest node, to prevent incorrectly releasing the lock in case of a lagging informer/watch.
+	latestNode, err := c.client.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
+	if err != nil {
+		if apiError.IsNotFound(err) {
+			opErr := ReleaseLock(filestoreIP, gkeNodeInternalIP)
+			c.RecordLockReleaseMetrics(opErr)
+			if opErr != nil {
+				return fmt.Errorf("failed to release lock: %w", opErr)
 			}
-			klog.Infof("Listed %d nodes", len(nodes))
-
-			for _, cm := range cmList.Items {
-				// Filter out root ca.
-				if cm.Name == rootCA {
-					continue
-				}
-				if err := c.syncLockInfo(ctx, &cm, nodes); err != nil {
-					klog.Errorf("Failed to sync lock info for configmap %s/%s: %v", cm.Namespace, cm.Name, err)
-				}
+			if err := c.RemoveKeyFromConfigMapWithRetry(ctx, cm, key); err != nil {
+				return fmt.Errorf("failed to remove key %s from configmap %s/%s: %w", key, cm.Namespace, cm.Name, err)
 			}
-		}, c.config.SyncPeriod)
-	}
-
-	rl, err := resourcelock.New(
-		resourcelock.LeasesResourceLock,
-		util.ManagedFilestoreCSINamespace,
-		leaseName,
-		nil,
-		c.client.CoordinationV1(),
-		resourcelock.ResourceLockConfig{
-			Identity: c.id,
-		})
+			return nil
+		}
+		return fmt.Errorf("failed to get node %s: %w", node.Name, err)
+	}
+	entryMatchesLatestNode, err := c.verifyConfigMapEntry(latestNode, gceInstanceID, gkeNodeInternalIP)
 	if err != nil {
-		klog.Fatalf("Error creating resourcelock: %v", err)
+		return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", node.Name, gceInstanceID, gkeNodeInternalIP, err)
+	}
+	if entryMatchesLatestNode {
+		klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP)
+		return nil
+	}
+
+	klog.Infof("GKE node %s with nodeId %s nodeInternalIP %s no longer exists, releasing lock for Filestore IP %s", node.Name, gceInstanceID, gkeNodeInternalIP, filestoreIP)
+	opErr := ReleaseLock(filestoreIP, gkeNodeInternalIP)
+	c.RecordLockReleaseMetrics(opErr)
+	if opErr != nil {
+		return fmt.Errorf("failed to release lock: %w", opErr)
+	}
+	klog.Infof("Removing lock info key %s from configmap %s/%s with data %v", key, cm.Namespace, cm.Name, cm.Data)
+	// Apply the "Get() and Update(), or retry" logic in RemoveKeyFromConfigMap().
+	// This will increase the number of k8s api calls,
+	// but reduce repetitive ReleaseLock() due to kubeclient api failures in each reconcile loop.
+	if err := c.RemoveKeyFromConfigMapWithRetry(ctx, cm, key); err != nil {
+		return fmt.Errorf("failed to remove key %s from configmap %s/%s: %w", key, cm.Namespace, cm.Name, err)
+	}
+	return nil
+}
+
+func (c *LockReleaseController) processNextCreateEvent(ctx context.Context) bool {
+	obj, shutdown := c.createEventQueue.Get()
+	if shutdown {
+		return false
+	}
+	defer c.createEventQueue.Done(obj)
+
+	err := c.handleCreateEvent(ctx, obj)
+	if err == nil {
+		// If no error occurs then we Forget this item so it does not
+		// get queued again until another change happens.
+		c.createEventQueue.Forget(obj)
+		klog.Infof("Successfully processed node create event object %v", obj)
+		return true
+	}
+
+	klog.Errorf("Requeue node create event due to error: %v", err)
+	c.createEventQueue.AddRateLimited(obj)
+	return true
+}
+
+func (c *LockReleaseController) runUpdateEventWorker(ctx context.Context) {
+	for c.processNextUpdateEventWorkItem(ctx) {
+	}
+}
+
+func (c *LockReleaseController) processNextUpdateEventWorkItem(ctx context.Context) bool {
+	obj, shutdown := c.updateEventQueue.Get()
+	if shutdown {
+		return false
+	}
+	defer c.updateEventQueue.Done(obj)
+	nodeUpdatePair, ok := obj.(*NodeUpdatePair)
+	if !ok {
+		klog.Error("unable to convert update event object to nodeUpdatePair")
+		return true
+	}
+
+	// Access old and new objects:
+	oldObj := nodeUpdatePair.OldObj
+	newObj := nodeUpdatePair.NewObj
+
+	err := c.handleUpdateEvent(ctx, oldObj, newObj)
+	if err == nil {
+		// If no error occurs then we Forget this item so it does not
+		// get queued again until another change happens.
+		c.updateEventQueue.Forget(obj)
+		klog.Infof("Successfully processed node update event object %v", obj)
+		return true
 	}
-	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
-		Lock:          rl,
-		LeaseDuration: c.config.LeaseDuration,
-		RenewDeadline: c.config.RenewDeadline,
-		RetryPeriod:   c.config.RetryPeriod,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: run,
-			OnStoppedLeading: func() {
-				klog.Fatalf("%s no longer the leader", c.id)
-			},
-		},
-	})
+	klog.Errorf("Requeue node update event due to error: %v", err)
+	c.updateEventQueue.AddRateLimited(obj)
+	return true
 }

-func (c *LockReleaseController) syncLockInfo(ctx context.Context, cm *corev1.ConfigMap, nodes map[string]*corev1.Node) error {
-	nodeName, err := GKENodeNameFromConfigMap(cm)
+func (c *LockReleaseController) handleUpdateEvent(ctx context.Context, oldObj interface{}, newObj interface{}) error {
+	newNode := newObj.(*corev1.Node)
+	oldNode := oldObj.(*corev1.Node)
+	start := time.Now()
+	nodeName := newNode.Name
+	cmName := ConfigMapNamePrefix + nodeName
+	cm, err := c.client.CoreV1().ConfigMaps(util.ManagedFilestoreCSINamespace).Get(ctx, cmName, metav1.GetOptions{})
+	duration := time.Since(start)
+	c.RecordKubeAPIMetrics(err, metrics.ConfigMapResourceType, metrics.GetOpType, metrics.ReconcilerOpSource, duration)
+
 	if err != nil {
-		klog.Errorf("Failed to get GKE node name from configmap %s/%s: %v", cm.Namespace, cm.Name, err)
-		return err
+		if apiError.IsNotFound(err) {
+			return nil
+		}
+		return fmt.Errorf("failed to get configmap in namespace %s: %w", util.ManagedFilestoreCSINamespace, err)
 	}
+	klog.Infof("Got configmap (%v) in namespace %s", cm, util.ManagedFilestoreCSINamespace)

-	node := nodes[nodeName]
 	data := cm.DeepCopy().Data
+	var configMapReconcileErrors []error
 	for key, filestoreIP := range data {
-		_, _, _, _, gceInstanceID, gkeNodeInternalIP, err := ParseConfigMapKey(key)
-		if err != nil {
-			klog.Errorf("Failed to parse configmap key %s: %v", key, err)
-			continue
-		}
-		klog.V(6).Infof("Verifying GKE node %s with nodeId %s nodeInternalIP %s exists or not", nodeName, gceInstanceID, gkeNodeInternalIP)
-		nodeExists, err := c.verifyNodeExists(node, gceInstanceID, gkeNodeInternalIP)
+		err = c.processConfigMapEntryOnNodeUpdate(ctx, key, filestoreIP, newNode, oldNode, cm)
 		if err != nil {
-			klog.Errorf("Failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %v", nodeName, gceInstanceID, gkeNodeInternalIP, err)
-			continue
-		}
-		if nodeExists {
-			klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s still exists in API server, skip lock info reconciliation", nodeName, gceInstanceID, gkeNodeInternalIP)
-			continue
+			configMapReconcileErrors = append(configMapReconcileErrors, err)
 		}
-		klog.Infof("GKE node %s with nodeId %s nodeInternalIP %s no longer exists, releasing lock for Filestore IP %s", nodeName, gceInstanceID, gkeNodeInternalIP, filestoreIP)
+	}
+	if len(configMapReconcileErrors) > 0 {
+		return errors.Join(configMapReconcileErrors...)
+	}
+	return nil
+}
+
+func (c *LockReleaseController) processConfigMapEntryOnNodeUpdate(ctx context.Context, key string, filestoreIP string, newNode *corev1.Node, oldNode *corev1.Node, cm *corev1.ConfigMap) error {
+	_, _, _, _, gceInstanceID, gkeNodeInternalIP, err := ParseConfigMapKey(key)
+	if err != nil {
+		return fmt.Errorf("failed to parse configmap key %s: %w", key, err)
+	}
+	klog.V(6).Infof("Verifying GKE node %s with nodeId %s nodeInternalIP %s exists or not", newNode.Name, gceInstanceID, gkeNodeInternalIP)
+	entryMatchesNewNode, err := c.verifyConfigMapEntry(newNode, gceInstanceID, gkeNodeInternalIP)
+	if err != nil {
+		return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", newNode.Name, gceInstanceID, gkeNodeInternalIP, err)
+	}
+	entryMatchesOldNode, err := c.verifyConfigMapEntry(oldNode, gceInstanceID, gkeNodeInternalIP)
+	if err != nil {
+		return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", newNode.Name, gceInstanceID, gkeNodeInternalIP, err)
+	}
+	klog.Infof("Checked config map entry against old node (matching result %t) and new node (matching result %t)", entryMatchesOldNode, entryMatchesNewNode)
+	if entryMatchesNewNode {
+		klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s still exists in API server, skip lock info reconciliation", newNode.Name, gceInstanceID, gkeNodeInternalIP)
+		return nil
+	}
+	if entryMatchesOldNode {
+		klog.Infof("GKE node %s with nodeId %s nodeInternalIP %s matches a node before update, releasing lock for Filestore IP %s", newNode.Name, gceInstanceID, gkeNodeInternalIP, filestoreIP)
 		opErr := ReleaseLock(filestoreIP, gkeNodeInternalIP)
 		c.RecordLockReleaseMetrics(opErr)
 		if opErr != nil {
-			klog.Errorf("Failed to release lock: %v", opErr)
-			continue
+			return fmt.Errorf("failed to release lock: %w", opErr)
 		}
 		klog.Infof("Removing lock info key %s from configmap %s/%s with data %v", key, cm.Namespace, cm.Name, cm.Data)
-		// Apply the "Get() and Update(), or retry" logic in RemoveKeyFromConfigMap().
-		// This will increase the number of k8s api calls,
-		// but reduce repetitive ReleaseLock() due to kubeclient api failures in each reconcile loop.
 		if err := c.RemoveKeyFromConfigMapWithRetry(ctx, cm, key); err != nil {
-			klog.Errorf("Failed to remove key %s from configmap %s/%s: %v", key, cm.Namespace, cm.Name, err)
+			return fmt.Errorf("failed to remove key %s from configmap %s/%s: %w", key, cm.Namespace, cm.Name, err)
 		}
 	}
 	return nil
 }

-// verifyNodeExists validates if the given node object has the exact nodeID, and nodeInternalIP.
-func (c *LockReleaseController) verifyNodeExists(node *corev1.Node, expectedGCEInstanceID, expectedNodeInternalIP string) (bool, error) {
+// verifyConfigMapEntry validates whether the given node object matches the nodeID and nodeInternalIP recorded in a configmap entry.
+func (c *LockReleaseController) verifyConfigMapEntry(node *corev1.Node, expectedGCEInstanceID, expectedNodeInternalIP string) (bool, error) {
 	if node == nil {
 		return false, nil
 	}
@@ -224,18 +377,6 @@ func (c *LockReleaseController) verifyNodeExists(node *corev1.Node, expectedGCEI
 	return false, nil
 }

-func (c *LockReleaseController) listNodes(ctx context.Context) (map[string]*corev1.Node, error) {
-	nodeList, err := c.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
-	nodeMap := map[string]*corev1.Node{}
-	for _, node := range nodeList.Items {
-		nodeMap[node.Name] = node.DeepCopy()
-	}
-	return nodeMap, nil
-}
-
 func (c *LockReleaseController) RecordKubeAPIMetrics(opErr error, resourceType, opType, opSource string, opDuration time.Duration) {
 	if c.metricsManager == nil {
 		return
@@ -249,3 +390,32 @@ func (c *LockReleaseController) RecordLockReleaseMetrics(opErr error) {
 	}
 	c.metricsManager.RecordLockReleaseMetrics(opErr)
 }
+
+// GetId returns the ID of the LockReleaseController.
+func (c *LockReleaseController) GetId() string {
+	return c.id
+}
+
+// GetHost returns the hostname the lock release controller is running on.
+func (c *LockReleaseController) GetHost() string {
+	return c.hostname
+}
+
+// GetClient returns the kubernetes client of the LockReleaseController.
+func (c *LockReleaseController) GetClient() kubernetes.Interface {
+	return c.client
+}
+
+// EnqueueCreateEventObject adds an object to the createEventQueue of the LockReleaseController.
+func (c *LockReleaseController) EnqueueCreateEventObject(obj interface{}) {
+	c.createEventQueue.Add(obj)
+}
+
+// EnqueueUpdateEventObject adds a NodeUpdatePair to the updateEventQueue.
+func (c *LockReleaseController) EnqueueUpdateEventObject(oldObj, newObj interface{}) {
+	nodeUpdatePair := &NodeUpdatePair{
+		OldObj: oldObj.(*corev1.Node), // Type assertion to *v1.Node
+		NewObj: newObj.(*corev1.Node), // Type assertion to *v1.Node
+	}
+	c.updateEventQueue.Add(nodeUpdatePair)
+}
diff --git a/pkg/releaselock/controller_test.go b/pkg/releaselock/controller_test.go
index b9bc98ab8..3f5c4605a 100644
--- a/pkg/releaselock/controller_test.go
+++ b/pkg/releaselock/controller_test.go
@@ -1,16 +1,13 @@
 package lockrelease

 import (
-	"context"
 	"testing"

-	"github.com/google/go-cmp/cmp"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/kubernetes/fake"
 )

-func TestVerifyNodeExists(t *testing.T) {
+func TestVerifyConfigMapEntry(t *testing.T) {
 	cases := []struct {
 		name           string
 		node           *corev1.Node
@@ -109,7 +106,7 @@ func TestVerifyNodeExists(t *testing.T) {
 	}
 	for _, test := range cases {
 		controller := NewFakeLockReleaseController()
-		nodeExists, err := controller.verifyNodeExists(test.node, test.gceInstanceID, test.nodeInternalIP)
+		nodeExists, err := controller.verifyConfigMapEntry(test.node, test.gceInstanceID, test.nodeInternalIP)
 		if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil {
 			t.Errorf("%v", gotExpected)
 		}
@@ -118,48 +115,3 @@ func TestVerifyNodeExists(t *testing.T) {
 		}
 	}
 }
-
-func TestListNodes(t *testing.T) {
-	node1 := &corev1.Node{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "node1",
-			Annotations: map[string]string{
-				gceInstanceIDKey: "node1-id",
-			},
-		},
-	}
-	node2 := &corev1.Node{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "node2",
-			Annotations: map[string]string{
-				gceInstanceIDKey: "node2-id",
-			},
-		},
-	}
-	controller := NewFakeLockReleaseControllerWithClient(fake.NewSimpleClientset(node1, node2))
-	expectedMap := map[string]*corev1.Node{
-		"node1": {
-			ObjectMeta: metav1.ObjectMeta{
-				Name: "node1",
-				Annotations: map[string]string{
-					gceInstanceIDKey: "node1-id",
-				},
-			},
-		},
-		"node2": {
-			ObjectMeta: metav1.ObjectMeta{
-				Name: "node2",
-				Annotations: map[string]string{
-					gceInstanceIDKey: "node2-id",
-				},
-			},
-		},
-	}
-	nodes, err := controller.listNodes(context.Background())
-	if err != nil {
-		t.Fatalf("test listNodes failed: unexpected error: %v", err)
-	}
-	if diff := cmp.Diff(expectedMap, nodes); diff != "" {
-		t.Errorf("test listNodes failed: unexpected diff (-want +got):%s", diff)
-	}
-}
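
Note for reviewers: `NewLockReleaseController` composes two rate limiters for `createEventQueue` — a per-item exponential backoff driven by `--rate-limiter-base-delay`/`--rate-limiter-max-delay`, and a global 50 qps / burst-300 token bucket — via `workqueue.NewMaxOfRateLimiter`, which charges each item the larger of the two delays. A minimal standalone sketch of that behavior, using the same untyped client-go workqueue APIs vendored by this repo (the work-item name is hypothetical):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same composition as the createEventQueue limiter in controller.go:
	// per-item exponential backoff capped at maxDelay, combined with a
	// global token bucket of 50 qps and burst 300. NewMaxOfRateLimiter
	// returns the larger of the two delays for each item.
	limiter := workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
	)

	item := "node-create-event" // hypothetical work item
	for i := 0; i < 5; i++ {
		// Each AddRateLimited of a failing item doubles its delay:
		// 5ms, 10ms, 20ms, ... up to the 1000s cap.
		fmt.Printf("requeue %d: delay %v\n", i+1, limiter.When(item))
	}
	// Forget resets the per-item backoff once the item is processed
	// successfully, mirroring createEventQueue.Forget(obj) in the controller.
	limiter.Forget(item)
}
```

Because the bucket starts with a burst of 300 tokens, the first delays come almost entirely from the exponential term; under sustained failures the per-item backoff grows toward `--rate-limiter-max-delay`, while the bucket bounds overall requeue throughput to 50 per second.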