Skip to content

Commit

Permalink
Merge pull request #559 from k8s-infra-cherrypick-robot/cherry-pick-5…
Browse files Browse the repository at this point in the history
…56-to-release-1.5

[release-1.5] 80 share support for stateful driver
  • Loading branch information
leiyiz authored Jun 29, 2023
2 parents 25381e4 + dff5a42 commit 8cc3158
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 14 deletions.
5 changes: 2 additions & 3 deletions pkg/csi_driver/multishare_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (m *MultishareController) CreateVolume(ctx context.Context, req *csi.Create
return nil, status.Error(codes.InvalidArgument, err.Error())
}

maxSharesPerInstance, maxShareSizeSizeBytes, err := m.parseMaxVolumeSizeParam(req)
maxSharesPerInstance, maxShareSizeSizeBytes, err := m.parseMaxVolumeSizeParam(req.GetParameters())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -712,8 +712,7 @@ func generateInstanceDescFromEcfsDesc(desc string) string {
return d
}

func (m *MultishareController) parseMaxVolumeSizeParam(req *csi.CreateVolumeRequest) (int, int64, error) {
params := req.GetParameters()
func (m *MultishareController) parseMaxVolumeSizeParam(params map[string]string) (int, int64, error) {
v, ok := params[paramMaxVolumeSize]
if !m.featureMaxSharePerInstance && ok {
return 0, 0, fmt.Errorf("configurable max shares per instance feature not enabled")
Expand Down
2 changes: 1 addition & 1 deletion pkg/csi_driver/multishare_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2035,7 +2035,7 @@ func TestParseMaxVolumeSizeParam(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
m := initTestMultishareControllerWithFeatureOpts(t, tc.features)
sharePerInstance, maxShareCapacity, err := m.parseMaxVolumeSizeParam(tc.req)
sharePerInstance, maxShareCapacity, err := m.parseMaxVolumeSizeParam(tc.req.GetParameters())
if tc.expectError && err == nil {
t.Errorf("failed")
}
Expand Down
33 changes: 29 additions & 4 deletions pkg/csi_driver/multishare_stateful_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,18 @@ func (m *MultishareStatefulController) CreateVolume(ctx context.Context, req *cs
return nil, status.Error(codes.InvalidArgument, err.Error())
}

reqBytes, err := getShareRequestCapacity(req.GetCapacityRange(), util.MinShareSizeBytes, util.MaxShareSizeBytes)
_, maxShareSizeBytes, err := m.mc.parseMaxVolumeSizeParam(req.GetParameters())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

var reqBytes int64
if m.mc.featureMaxSharePerInstance {
reqBytes, err = getShareRequestCapacity(req.GetCapacityRange(), util.ConfigurablePackMinShareSizeBytes, maxShareSizeBytes)
} else {
reqBytes, err = getShareRequestCapacity(req.GetCapacityRange(), util.MinShareSizeBytes, util.MaxShareSizeBytes)
}

if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -139,7 +150,7 @@ func (m *MultishareStatefulController) CreateVolume(ctx context.Context, req *cs
if err != nil {
return nil, err
}
return m.mc.getShareAndGenerateCSICreateVolumeResponse(ctx, instanceSCLabel, share, util.MaxShareSizeBytes)
return m.mc.getShareAndGenerateCSICreateVolumeResponse(ctx, instanceSCLabel, share, maxShareSizeBytes)
}

func (m *MultishareStatefulController) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
Expand Down Expand Up @@ -234,7 +245,21 @@ func (m *MultishareStatefulController) deleteShareInfo(ctx context.Context, siNa
}

func (m *MultishareStatefulController) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
reqBytes, err := getShareRequestCapacity(req.GetCapacityRange(), util.MinShareSizeBytes, util.MaxShareSizeBytes)
volumeId := req.GetVolumeId()
if len(volumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "ControllerExpandVolume volume ID must be provided")
}

maxShareSizeBytes := util.MaxShareSizeBytes
if m.mc.featureMaxSharePerInstance {
var err error
maxShareSizeBytes, err = m.mc.GetShareMaxSizeFromPV(ctx, volumeId)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
klog.Infof("maxShareSizeBytes %d", maxShareSizeBytes)
}
reqBytes, err := getShareRequestCapacity(req.GetCapacityRange(), util.ConfigurablePackMinShareSizeBytes, maxShareSizeBytes)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -275,7 +300,7 @@ func (m *MultishareStatefulController) ControllerExpandVolume(ctx context.Contex
}

if shareInfo.Status.CapacityBytes >= reqBytes && shareInfo.Status.ShareStatus == v1.READY {
klog.Infof("Controller expand volume succeeded for volume %v, size(bytes): %v", req.VolumeId, shareInfo.Status.CapacityBytes)
klog.Infof("Controller expand volume succeeded for volume %v, size(bytes): %v", volumeId, shareInfo.Status.CapacityBytes)

share, err := generateFileShareFromShareInfo(shareInfo)
if err != nil {
Expand Down
28 changes: 22 additions & 6 deletions pkg/csi_driver/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,10 @@ func (recon *MultishareReconciler) generateNewMultishareInstance(instanceInfo *v
Description: generateInstanceDescFromEcfsDesc(recon.config.EcfsDescription),
}

if recon.controllerServer.config.multiShareController.featureMaxSharePerInstance {
instance.MaxShareCount = recon.parseMaxSharePerInstance(instanceInfo.Spec.Parameters)
}

// reserve ip range
var reservedIPRange string
if connectMode == privateServiceAccess {
Expand Down Expand Up @@ -591,7 +595,7 @@ func (recon *MultishareReconciler) fixTwoWayPointers(shareInfos map[string]*v1.S
var err error
if !ok {
klog.Errorf("Share %q is assigned to instance %q but instanceInfo does not exist. Trying to create one", shareName, shareInfo.Status.InstanceHandle)
actualAssigned, err = recon.generateInstanceInfo(shareInfo.Status.InstanceHandle, shareInfo.Spec.InstancePoolTag)
actualAssigned, err = recon.generateInstanceInfo(shareInfo.Status.InstanceHandle, shareInfo.Spec.InstancePoolTag, shareInfo.Spec.Parameters)
if err != nil {
klog.Errorf("Failed to create instanceInfo %q: %v", shareInfo.Status.InstanceHandle, err)
continue
Expand Down Expand Up @@ -649,7 +653,7 @@ func (recon *MultishareReconciler) assignSharesToEligibleOrNewInstances(shareInf
// if InstanceStatus is not empty but instance is no longer present, instance might have been manually deleted by user and can no longer be used
klog.Warningf("instanceInfo %s has non empty InstanceStatus but underlying instance does not exist. Skip assignment to that instance", instanceInfo.Name)
}
if instanceFitShare(instanceInfo, shareInfo) {
if recon.instanceFitShare(instanceInfo, shareInfo) {
instanceURI = util.InstanceInfoNameToInstanceURI(instanceInfo.Name)
instanceInfo, err = recon.assignShareToInstanceInfo(instanceInfo, shareInfo.Name)
if err != nil {
Expand All @@ -674,7 +678,7 @@ func (recon *MultishareReconciler) assignSharesToEligibleOrNewInstances(shareInf
Name: util.NewMultishareInstancePrefix + string(uuid.NewUUID()),
})

instanceInfo, err := recon.generateInstanceInfo(instanceURI, shareInfo.Spec.InstancePoolTag)
instanceInfo, err := recon.generateInstanceInfo(instanceURI, shareInfo.Spec.InstancePoolTag, shareInfo.Spec.Parameters)
if err != nil {
klog.Errorf("Failed to create new instanceInfo %q: %v", instanceURI, err)
continue
Expand Down Expand Up @@ -732,7 +736,7 @@ func (recon *MultishareReconciler) deleteOrResizeInstances(instanceInfos map[str
}

// generateInstanceInfo generates and creates a new instanceInfo object based on instanceURI and storage class tag.
func (recon *MultishareReconciler) generateInstanceInfo(instanceURI string, scTag string) (*v1.InstanceInfo, error) {
func (recon *MultishareReconciler) generateInstanceInfo(instanceURI string, scTag string, shareParams map[string]string) (*v1.InstanceInfo, error) {
storageClass, err := recon.storageClassFromTag(scTag)
if err != nil {
return nil, err
Expand All @@ -748,6 +752,7 @@ func (recon *MultishareReconciler) generateInstanceInfo(instanceURI string, scTa
Spec: v1.InstanceInfoSpec{
CapacityBytes: util.MinMultishareInstanceSizeBytes,
StorageClassName: storageClass.Name,
Parameters: shareParams,
},
}
return recon.createInstanceInfo(context.TODO(), newInstanceInfo)
Expand Down Expand Up @@ -1256,15 +1261,17 @@ func (recon *MultishareReconciler) listMultishareResourceOps(ctx context.Context
}

// instanceFitShare returns true if shareInfo can be assigned to instanceInfo.
func instanceFitShare(instanceInfo *v1.InstanceInfo, shareInfo *v1.ShareInfo) bool {
func (recon *MultishareReconciler) instanceFitShare(instanceInfo *v1.InstanceInfo, shareInfo *v1.ShareInfo) bool {
// Instance needs to be:
// 1. not up for delete 2.of the same storage class and 3. has less than max number of shares assigned already.
if instanceInfo.DeletionTimestamp != nil ||
instanceInfo.Labels[ParamMultishareInstanceScLabel] != shareInfo.Spec.InstancePoolTag {
return false
}

if instanceInfo.Status != nil && len(instanceInfo.Status.ShareNames) >= util.MaxSharesPerInstance {
maxSharePerInstance := recon.parseMaxSharePerInstance(instanceInfo.Spec.Parameters)

if instanceInfo.Status != nil && len(instanceInfo.Status.ShareNames) >= maxSharePerInstance {
return false
}

Expand All @@ -1277,6 +1284,15 @@ func instanceFitShare(instanceInfo *v1.InstanceInfo, shareInfo *v1.ShareInfo) bo
return true
}

// parseMaxSharePerInstance assumes that params has valid parameter of max volume size and returns max share per instance
func (recon *MultishareReconciler) parseMaxSharePerInstance(params map[string]string) int {
maxSharePerInstance, _, _ := recon.controllerServer.config.multiShareController.parseMaxVolumeSizeParam(params)
if maxSharePerInstance == 0 {
maxSharePerInstance = util.MaxSharesPerInstance
}
return maxSharePerInstance
}

// instanceEmpty returns true if instanceInfo.Status.ShareNames has zero entries.
func instanceEmpty(instanceInfo *v1.InstanceInfo) bool {
if instanceInfo.Status == nil || len(instanceInfo.Status.ShareNames) != 0 {
Expand Down

0 comments on commit 8cc3158

Please sign in to comment.