Skip to content

Commit

Permalink
Check pending update operation for resize
Browse files Browse the repository at this point in the history
  • Loading branch information
saikat-royc committed Aug 11, 2021
1 parent 874174f commit e1cdd06
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 20 deletions.
8 changes: 8 additions & 0 deletions pkg/cloud_provider/file/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ func (manager *fakeServiceManager) CreateInstanceFromBackupSource(ctx context.Co
return instance, nil
}

func (m *fakeServiceManager) HasOperations(ctx context.Context, obj *ServiceInstance, operationType string, done bool) (bool, error) {
return false, nil
}

func notFoundError() *googleapi.Error {
return &googleapi.Error{
Errors: []googleapi.ErrorItem{
Expand Down Expand Up @@ -228,3 +232,7 @@ func (m *fakeBlockingServiceManager) DeleteInstance(ctx context.Context, obj *Se
<-execute
return m.fakeServiceManager.DeleteInstance(ctx, obj)
}

func (m *fakeBlockingServiceManager) HasOperations(ctx context.Context, obj *ServiceInstance, operationType string, done bool) (bool, error) {
return false, nil
}
66 changes: 50 additions & 16 deletions pkg/cloud_provider/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package file

import (
"context"
"encoding/json"
"fmt"
"net/http"
"regexp"
Expand Down Expand Up @@ -71,6 +72,7 @@ type Service interface {
CreateBackup(ctx context.Context, obj *ServiceInstance, backupId, backupLocation string) (*filev1beta1.Backup, error)
DeleteBackup(ctx context.Context, backupId string) error
CreateInstanceFromBackupSource(ctx context.Context, obj *ServiceInstance, volumeSourceSnapshotId string) (*ServiceInstance, error)
HasOperations(ctx context.Context, obj *ServiceInstance, operationType string, done bool) (bool, error)
}

type gcfsServiceManager struct {
Expand Down Expand Up @@ -208,7 +210,7 @@ func (manager *gcfsServiceManager) GetInstance(ctx context.Context, obj *Service
}

if instance != nil {
glog.V(5).Infof("GetInstance call fetched instance %+v", instance)
glog.V(4).Infof("GetInstance call fetched instance %+v", instance)
return cloudInstanceToServiceInstance(instance)
}
return nil, fmt.Errorf("failed to get instance %v", instanceUri)
Expand Down Expand Up @@ -306,32 +308,21 @@ func (manager *gcfsServiceManager) ListInstances(ctx context.Context, obj *Servi

func (manager *gcfsServiceManager) ResizeInstance(ctx context.Context, obj *ServiceInstance) (*ServiceInstance, error) {
instanceuri := instanceURI(obj.Project, obj.Location, obj.Name)
instance, err := manager.GetInstance(ctx, obj)
if err != nil {
glog.Errorf("Failed to get instance %s for resize operation, error %v", instanceuri, err)
return nil, err
}

// High Scale tier supports shrink of capacity. However CSI spec does not support it.
if util.BytesToGb(obj.Volume.SizeBytes) <= util.BytesToGb(instance.Volume.SizeBytes) {
return instance, nil
}

// Create a file instance for the Patch request.
betaObj := &filev1beta1.Instance{
Tier: obj.Tier,
FileShares: []*filev1beta1.FileShareConfig{
{
Name: instance.Volume.Name,
Name: obj.Volume.Name,
// This is the updated instance size requested.
CapacityGb: util.BytesToGb(obj.Volume.SizeBytes),
},
},
Networks: []*filev1beta1.NetworkConfig{
{
Network: instance.Network.Name,
Network: obj.Network.Name,
Modes: []string{"MODE_IPV4"},
ReservedIpRange: instance.Network.ReservedIpRange,
ReservedIpRange: obj.Network.ReservedIpRange,
},
},
}
Expand All @@ -354,7 +345,7 @@ func (manager *gcfsServiceManager) ResizeInstance(ctx context.Context, obj *Serv
return nil, fmt.Errorf("WaitFor patch op %s failed: %v", op.Name, err)
}

instance, err = manager.GetInstance(ctx, obj)
instance, err := manager.GetInstance(ctx, obj)
if err != nil {
return nil, fmt.Errorf("failed to get instance after creation: %v", err)
}
Expand Down Expand Up @@ -497,3 +488,46 @@ func CreateBackpURI(obj *ServiceInstance, backupName, backupLocation string) (st

return backupURI(obj.Project, region, backupName), region, nil
}

func (manager *gcfsServiceManager) HasOperations(ctx context.Context, obj *ServiceInstance, operationType string, done bool) (bool, error) {
uri := instanceURI(obj.Project, obj.Location, obj.Name)
var totalFilteredOps []*filev1beta1.Operation
var nextToken string
for {
resp, err := manager.operationsService.List(locationURI(obj.Project, obj.Location)).PageToken(nextToken).Context(ctx).Do()
if err != nil {
return false, fmt.Errorf("List operations for instance %q, token %q failed: %v", uri, nextToken, err)
}

filteredOps, err := ApplyFilter(resp.Operations, uri, operationType, done)
if err != nil {
return false, err
}

totalFilteredOps = append(totalFilteredOps, filteredOps...)
if resp.NextPageToken == "" {
break
}
nextToken = resp.NextPageToken
}

return len(totalFilteredOps) > 0, nil
}

func ApplyFilter(ops []*filev1beta1.Operation, uri string, opType string, done bool) ([]*filev1beta1.Operation, error) {
var res []*filev1beta1.Operation
for _, op := range ops {
var meta filev1beta1.OperationMetadata
if op.Metadata == nil {
continue
}
if err := json.Unmarshal(op.Metadata, &meta); err != nil {
return nil, err
}
if meta.Target == uri && meta.Verb == opType && op.Done == done {
glog.V(4).Infof("Operation %q match filter for target %q", op.Name, meta.Target)
res = append(res, op)
}
}
return res, nil
}
32 changes: 28 additions & 4 deletions pkg/csi_driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,18 +458,42 @@ func (s *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi.

filer, _, err := getFileInstanceFromID(volumeID)
if err != nil {
glog.Errorf("failed to get instance for volumeID %v expansion, error: %v", volumeID, err)
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}

filer.Project = s.config.cloud.Project
filer, err = s.config.fileService.GetInstance(ctx, filer)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if filer.State != "READY" {
return nil, fmt.Errorf("Volume %q is not yet ready, current state %q", volumeID, filer.State)
}

if util.BytesToGb(reqBytes) <= util.BytesToGb(filer.Volume.SizeBytes) {
glog.Infof("Controller expand volume succeeded for volume %v, existing size(bytes): %v", volumeID, filer.Volume.SizeBytes)
return &csi.ControllerExpandVolumeResponse{
CapacityBytes: filer.Volume.SizeBytes,
NodeExpansionRequired: false,
}, nil
}

hasPendingOps, err := s.config.fileService.HasOperations(ctx, filer, "update", false /* done */)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

if hasPendingOps {
return nil, status.Error(codes.DeadlineExceeded, fmt.Sprintf("Update operation ongoing for volume %v", volumeID))
}

filer.Volume.SizeBytes = reqBytes
newfiler, err := s.config.fileService.ResizeInstance(ctx, filer)
if err != nil {
glog.Errorf("failed to resize volumeID %v, error: %v", volumeID, err)
return nil, err
return nil, status.Error(codes.Internal, err.Error())
}

glog.Infof("Controller expand volume succeeded for volume %v, new size(bytes): %v", volumeID, newfiler.Volume.SizeBytes)
return &csi.ControllerExpandVolumeResponse{
CapacityBytes: newfiler.Volume.SizeBytes,
NodeExpansionRequired: false,
Expand Down

0 comments on commit e1cdd06

Please sign in to comment.