Skip to content

Commit

Permalink
Merge pull request #986 from k8s-infra-cherrypick-robot/cherry-pick-9…
Browse files Browse the repository at this point in the history
…82-to-release-1.6

[release-1.6] Lock release unit test
  • Loading branch information
k8s-ci-robot authored Nov 4, 2024
2 parents f44b9ca + 746b930 commit 7afe11f
Show file tree
Hide file tree
Showing 6 changed files with 663 additions and 76 deletions.
2 changes: 1 addition & 1 deletion pkg/csi_driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func initTestNodeServerWithKubeClient(t *testing.T, client kubernetes.Interface)
mounter: mounter,
metaService: metaserice,
volumeLocks: util.NewVolumeLocks(),
lockReleaseController: lockrelease.NewFakeLockReleaseControllerWithClient(client),
lockReleaseController: lockrelease.NewControllerBuilder().WithClient(client).Build(),
features: &GCFSDriverFeatureOptions{FeatureLockRelease: &FeatureLockRelease{Enabled: true}},
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/releaselock/configmap_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestGetConfigMap(t *testing.T) {
}
for _, test := range cases {
client := fake.NewSimpleClientset(test.existingCM)
controller := NewFakeLockReleaseControllerWithClient(client)
controller := NewControllerBuilder().WithClient(client).Build()
cm, err := controller.GetConfigMap(context.Background(), test.cmName, test.cmNamespace)
if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil {
t.Fatal(gotExpected)
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestUpdateConfigMapWithKeyValue(t *testing.T) {
}
for _, test := range cases {
client := fake.NewSimpleClientset(test.existingCM)
controller := NewFakeLockReleaseControllerWithClient(client)
controller := NewControllerBuilder().WithClient(client).Build()
ctx := context.Background()
err := controller.UpdateConfigMapWithKeyValue(ctx, test.existingCM, test.key, test.value)
if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil {
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestRemoveKeyFromConfigMap(t *testing.T) {
}
for _, test := range cases {
client := fake.NewSimpleClientset(test.existingCM)
controller := NewFakeLockReleaseControllerWithClient(client)
controller := NewControllerBuilder().WithClient(client).Build()
ctx := context.Background()
err := controller.RemoveKeyFromConfigMap(ctx, test.existingCM, test.key)
if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil {
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestRemoveKeyFromConfigMapWithRetry(t *testing.T) {
}
for _, test := range cases {
client := fake.NewSimpleClientset(test.existingCM)
controller := NewFakeLockReleaseControllerWithClient(client)
controller := NewControllerBuilder().WithClient(client).Build()
ctx := context.Background()
err := controller.RemoveKeyFromConfigMapWithRetry(ctx, test.existingCM, test.key)
if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil {
Expand Down
162 changes: 97 additions & 65 deletions pkg/releaselock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,77 @@ type NodeUpdatePair struct {
NewObj *corev1.Node
}

type LockService interface {
ReleaseLock(hostIP, clientIP string) error
}

type EventProcessor interface {
processConfigMapEntryOnNodeCreation(ctx context.Context, key string, filestoreIP string, node *corev1.Node, cm *corev1.ConfigMap) error
processConfigMapEntryOnNodeUpdate(ctx context.Context, key string, filestoreIP string, newNode *corev1.Node, oldNode *corev1.Node, cm *corev1.ConfigMap) error
SetController(ctrl *LockReleaseController)
}

type DefaultEventProcessor struct {
ctrl *LockReleaseController
}

func (p *DefaultEventProcessor) SetController(ctrl *LockReleaseController) {
p.ctrl = ctrl
}

func (p *DefaultEventProcessor) processConfigMapEntryOnNodeCreation(ctx context.Context, key string, filestoreIP string, node *corev1.Node, cm *corev1.ConfigMap) error {
if p.ctrl == nil {
return fmt.Errorf("controller not set")
}

c := p.ctrl
_, _, _, _, gceInstanceID, gkeNodeInternalIP, err := ParseConfigMapKey(key)
if err != nil {
return fmt.Errorf("failed to parse configmap key %s: %v", key, err)
}
klog.V(6).Infof("Verifying GKE node %s with nodeId %s nodeInternalIP %s exists or not", node.Name, gceInstanceID, gkeNodeInternalIP)
entryMatchesNode, err := c.verifyConfigMapEntry(node, gceInstanceID, gkeNodeInternalIP)
if err != nil {
return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %v", node.Name, gceInstanceID, gkeNodeInternalIP, err)
}
if entryMatchesNode {
klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s still exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP)
return nil
}

// Try to match the latest node, to prevent incorrect releasing the lock in case of a lagging informer/watch
latestNode, err := c.client.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
if err != nil {
if apiError.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to get node in namespace %v", err)
}
entryMatchesLatestNode, err := c.verifyConfigMapEntry(latestNode, gceInstanceID, gkeNodeInternalIP)
if err != nil {
return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %v", node.Name, gceInstanceID, gkeNodeInternalIP, err)
}
if entryMatchesLatestNode {
klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP)
return nil
}

klog.Infof("GKE node %s with nodeId %s nodeInternalIP %s no longer exists, releasing lock for Filestore IP %s", node.Name, gceInstanceID, gkeNodeInternalIP, filestoreIP)
opErr := c.lockService.ReleaseLock(filestoreIP, gkeNodeInternalIP)
c.RecordLockReleaseMetrics(opErr)
if opErr != nil {
return fmt.Errorf("failed to release lock: %v", opErr)
}
klog.Infof("Removing lock info key %s from configmap %s/%s with data %v", key, cm.Namespace, cm.Name, cm.Data)
// Apply the "Get() and Update(), or retry" logic in RemoveKeyFromConfigMap().
// This will increase the number of k8s api calls,
// but reduce repetitive ReleaseLock() due to kubeclient api failures in each reconcile loop.
if err := c.RemoveKeyFromConfigMapWithRetry(ctx, cm, key); err != nil {
return fmt.Errorf("failed to remove key %s from configmap %s/%s: %v", key, cm.Namespace, cm.Name, err)
}
return nil
}

type LockReleaseController struct {
client kubernetes.Interface

Expand All @@ -63,6 +134,9 @@ type LockReleaseController struct {

updateEventQueue workqueue.RateLimitingInterface
createEventQueue workqueue.RateLimitingInterface

eventProcessor EventProcessor
lockService LockService
}

type LockReleaseControllerConfig struct {
Expand Down Expand Up @@ -98,6 +172,9 @@ func NewLockReleaseController(
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
)

eventProcessor := &DefaultEventProcessor{}
lockService := &FileStoreRPCClient{}

lc := &LockReleaseController{
id: id,
hostname: hostname,
Expand All @@ -106,6 +183,8 @@ func NewLockReleaseController(
nodeInformer: nodeInformer,
updateEventQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
createEventQueue: workqueue.NewRateLimitingQueue(ratelimiter),
eventProcessor: eventProcessor,
lockService: lockService,
}

if config.MetricEndpoint != "" {
Expand All @@ -116,6 +195,7 @@ func NewLockReleaseController(
lc.metricsManager = mm
}

eventProcessor.SetController(lc)
return lc, nil
}

Expand Down Expand Up @@ -160,74 +240,20 @@ func (c *LockReleaseController) handleCreateEvent(ctx context.Context, obj inter

var configMapReconcileErrors []error
for key, filestoreIP := range data {
err = c.processConfigMapEntryOnNodeCreation(ctx, key, filestoreIP, node, cm)
eventProcessor := c.eventProcessor
err = eventProcessor.processConfigMapEntryOnNodeCreation(ctx, key, filestoreIP, node, cm)
if err != nil {
configMapReconcileErrors = append(configMapReconcileErrors, err)
}
}
klog.Infof("skipped processing %d entries in config map", len(configMapReconcileErrors))
if len(configMapReconcileErrors) > 0 {
return errors.Join(configMapReconcileErrors...)
}
return nil

}

func (c *LockReleaseController) processConfigMapEntryOnNodeCreation(ctx context.Context, key string, filestoreIP string, node *corev1.Node, cm *corev1.ConfigMap) error {
_, _, _, _, gceInstanceID, gkeNodeInternalIP, err := ParseConfigMapKey(key)
if err != nil {
return fmt.Errorf("failed to parse configmap key %s: %w", key, err)
}
klog.V(6).Infof("Verifying GKE node %s with nodeId %s nodeInternalIP %s exists or not", node.Name, gceInstanceID, gkeNodeInternalIP)
entryMatchesNode, err := c.verifyConfigMapEntry(node, gceInstanceID, gkeNodeInternalIP)
if err != nil {
return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", node.Name, gceInstanceID, gkeNodeInternalIP, err)
}
if entryMatchesNode {
klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s still exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP)
return nil
}

// Try to match the latest node, to prevent incorrect releasing the lock in case of a lagging informer/watch
latestNode, err := c.client.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
if err != nil {
if apiError.IsNotFound(err) {
opErr := ReleaseLock(filestoreIP, gkeNodeInternalIP)
c.RecordLockReleaseMetrics(opErr)
if opErr != nil {
return fmt.Errorf("failed to release lock: %w", opErr)
}
if err := c.RemoveKeyFromConfigMapWithRetry(ctx, cm, key); err != nil {
return fmt.Errorf("failed to remove key %s from configmap %s/%s: %w", key, cm.Namespace, cm.Name, err)
}
return nil
}
return fmt.Errorf("failed to get node in namespace %w", err)
}
entryMatchesLatestNode, err := c.verifyConfigMapEntry(latestNode, gceInstanceID, gkeNodeInternalIP)
if err != nil {
return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", node.Name, gceInstanceID, gkeNodeInternalIP, err)
}
if entryMatchesLatestNode {
klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP)
return nil
}

klog.Infof("GKE node %s with nodeId %s nodeInternalIP %s no longer exists, releasing lock for Filestore IP %s", node.Name, gceInstanceID, gkeNodeInternalIP, filestoreIP)
opErr := ReleaseLock(filestoreIP, gkeNodeInternalIP)
c.RecordLockReleaseMetrics(opErr)
if opErr != nil {
return fmt.Errorf("failed to release lock: %w", opErr)
}
klog.Infof("Removing lock info key %s from configmap %s/%s with data %v", key, cm.Namespace, cm.Name, cm.Data)
// Apply the "Get() and Update(), or retry" logic in RemoveKeyFromConfigMap().
// This will increase the number of k8s api calls,
// but reduce repetitive ReleaseLock() due to kubeclient api failures in each reconcile loop.
if err := c.RemoveKeyFromConfigMapWithRetry(ctx, cm, key); err != nil {
return fmt.Errorf("failed to remove key %s from configmap %s/%s: %w", key, cm.Namespace, cm.Name, err)
}
return nil
}

func (c *LockReleaseController) processNextCreateEvent(ctx context.Context) bool {
obj, shutdown := c.createEventQueue.Get()
if shutdown {
Expand Down Expand Up @@ -306,7 +332,7 @@ func (c *LockReleaseController) handleUpdateEvent(ctx context.Context, oldObj in
data := cm.DeepCopy().Data
var configMapReconcileErrors []error
for key, filestoreIP := range data {
err = c.processConfigMapEntryOnNodeUpdate(ctx, key, filestoreIP, newNode, oldNode, cm)
err = c.eventProcessor.processConfigMapEntryOnNodeUpdate(ctx, key, filestoreIP, newNode, oldNode, cm)
if err != nil {
configMapReconcileErrors = append(configMapReconcileErrors, err)
}
Expand All @@ -317,7 +343,11 @@ func (c *LockReleaseController) handleUpdateEvent(ctx context.Context, oldObj in
return nil
}

func (c *LockReleaseController) processConfigMapEntryOnNodeUpdate(ctx context.Context, key string, filestoreIP string, newNode *corev1.Node, oldNode *corev1.Node, cm *corev1.ConfigMap) error {
func (p *DefaultEventProcessor) processConfigMapEntryOnNodeUpdate(ctx context.Context, key string, filestoreIP string, newNode *corev1.Node, oldNode *corev1.Node, cm *corev1.ConfigMap) error {
if p.ctrl == nil {
return fmt.Errorf("controller not set")
}
c := p.ctrl
_, _, _, _, gceInstanceID, gkeNodeInternalIP, err := ParseConfigMapKey(key)
if err != nil {
return fmt.Errorf("failed to parse configmap key %s: %w", key, err)
Expand All @@ -327,18 +357,20 @@ func (c *LockReleaseController) processConfigMapEntryOnNodeUpdate(ctx context.Co
if err != nil {
return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", newNode.Name, gceInstanceID, gkeNodeInternalIP, err)
}
entryMatchesOldNode, err := c.verifyConfigMapEntry(oldNode, gceInstanceID, gkeNodeInternalIP)
if err != nil {
return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", newNode.Name, gceInstanceID, gkeNodeInternalIP, err)
}
klog.Infof("Checked config map entry against old node(matching result %t), and new node(matching result %t)", entryMatchesOldNode, entryMatchesNewNode)
if entryMatchesNewNode {
klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s still exists in API server, skip lock info reconciliation", newNode.Name, gceInstanceID, gkeNodeInternalIP)
return nil
}

entryMatchesOldNode, err := c.verifyConfigMapEntry(oldNode, gceInstanceID, gkeNodeInternalIP)
if err != nil {
return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %v", newNode.Name, gceInstanceID, gkeNodeInternalIP, err)
}
klog.Infof("Checked config map entry against old node(matching result %t), and new node(matching result %t)", entryMatchesOldNode, entryMatchesNewNode)

if entryMatchesOldNode {
klog.Infof("GKE node %s with nodeId %s nodeInternalIP %s matches a node before update, releasing lock for Filestore IP %s", newNode.Name, gceInstanceID, gkeNodeInternalIP, filestoreIP)
opErr := ReleaseLock(filestoreIP, gkeNodeInternalIP)
opErr := c.lockService.ReleaseLock(filestoreIP, gkeNodeInternalIP)
c.RecordLockReleaseMetrics(opErr)
if opErr != nil {
return fmt.Errorf("failed to release lock: %w", opErr)
Expand Down
Loading

0 comments on commit 7afe11f

Please sign in to comment.