Skip to content

Commit

Permalink
[refactor] Controllerserver (part 2) - createvolume refactor with add…
Browse files Browse the repository at this point in the history
…ed test coverage (#234)

* Breakup the createVolume() from a long function into smaller more focused funcs()

* Added Unit tests for the new funcs

---------

Co-authored-by: Khaja Omer <komer@akamai.com>
  • Loading branch information
komer3 and Khaja Omer authored Sep 20, 2024
1 parent 20a6dc1 commit 38673fc
Show file tree
Hide file tree
Showing 5 changed files with 952 additions and 101 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ generate-mock:
mockgen -source=internal/driver/nodeserver_helpers.go -destination=mocks/mock_nodeserver.go -package=mocks
mockgen -source=pkg/mount-manager/device-utils.go -destination=mocks/mock_deviceutils.go -package=mocks
mockgen -source=pkg/mount-manager/fs-utils.go -destination=mocks/mock_fsutils.go -package=mocks
mockgen -source=pkg/linode-client/linode-client.go -destination=mocks/mock_linodeclient.go -package=mocks
mockgen -source=pkg/cryptsetup-client/cryptsetup-client.go -destination=mocks/mock_cryptsetupclient.go -package=mocks

.PHONY: test
Expand Down
117 changes: 17 additions & 100 deletions internal/driver/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,124 +61,41 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
log, ctx, done := logger.GetLogger(ctx).WithMethod("CreateVolume")
defer done()

name := req.GetName()
log.V(2).Info("Processing request", "req", req)

if len(name) == 0 {
return &csi.CreateVolumeResponse{}, errNoVolumeName
}

// validate volume capabilities
volCapabilities := req.GetVolumeCapabilities()
if len(volCapabilities) == 0 {
return &csi.CreateVolumeResponse{}, errNoVolumeCapabilities
}
if !validVolumeCapabilities(volCapabilities) {
return &csi.CreateVolumeResponse{}, errInvalidVolumeCapability(volCapabilities)
// Validate the incoming request to ensure it meets the necessary criteria.
// This includes checking for required fields and valid volume capabilities.
if err := cs.validateCreateVolumeRequest(ctx, req); err != nil {
return &csi.CreateVolumeResponse{}, err
}

capRange := req.GetCapacityRange()
size, err := getRequestCapacitySize(capRange)
// Prepare the volume parameters such as name and SizeGB from the request.
// This step may involve calculations or adjustments based on the request's content.
volName, sizeGB, size, err := cs.prepareVolumeParams(ctx, req)
if err != nil {
return &csi.CreateVolumeResponse{}, err
}

// to avoid mangled requests for existing volumes with hyphen,
// we only strip them out on creation when k8s invented the name
// this is still problematic because we strip "-" from volume-name-prefixes
// that specifically requested "-".
// Don't strip this when volume labels support sufficient length
condensedName := strings.Replace(name, "-", "", -1)

preKey := linodevolumes.CreateLinodeVolumeKey(0, condensedName)

volumeName := preKey.GetNormalizedLabelWithPrefix(cs.driver.volumeLabelPrefix)
targetSizeGB := bytesToGB(size)

log.V(4).Info("CreateVolume details", "storage_size_giga_bytes", targetSizeGB, "volume_name", volumeName)
// Create volume context
volContext := cs.createVolumeContext(ctx, req)

volumeContext := make(map[string]string)
if req.Parameters[LuksEncryptedAttribute] == "true" {
// if luks encryption is enabled add a volume context
volumeContext[LuksEncryptedAttribute] = "true"
volumeContext[PublishInfoVolumeName] = volumeName
volumeContext[LuksCipherAttribute] = req.Parameters[LuksCipherAttribute]
volumeContext[LuksKeySizeAttribute] = req.Parameters[LuksKeySizeAttribute]
}

// Attempt to get info about the source volume for
// volume cloning if the datasource is provided in the PVC.
// sourceVolumeInfo will be null if no content source is defined.
contentSource := req.GetVolumeContentSource()
sourceVolumeInfo, err := cs.getContentSourceVolume(ctx, contentSource)
// Attempt to retrieve information about a source volume if the request includes a content source.
// This is important for scenarios where the volume is being cloned from an existing one.
sourceVolInfo, err := cs.getContentSourceVolume(ctx, req.GetVolumeContentSource())
if err != nil {
return &csi.CreateVolumeResponse{}, err
}

// Attempt to create the volume while respecting idempotency.
// If the content source is defined, the source volume will be cloned to create a new volume.
log.V(4).Info("Calling API to create volume", "volumeName", volumeName)
vol, err := cs.attemptCreateLinodeVolume(
ctx,
volumeName,
targetSizeGB,
req.Parameters[VolumeTags],
sourceVolumeInfo,
)
// Create the volume
vol, err := cs.createAndWaitForVolume(ctx, volName, sizeGB, req.Parameters[VolumeTags], sourceVolInfo)
if err != nil {
return &csi.CreateVolumeResponse{}, err
}

// If the existing volume size differs from the requested size, we throw an error.
if vol.Size != targetSizeGB {
if sourceVolumeInfo == nil {
return nil, errAlreadyExists("volume %d already exists of size %d", vol.ID, vol.Size)
}
}

statusPollTimeout := waitTimeout()

// If we're cloning the volume we should extend the timeout
if sourceVolumeInfo != nil {
statusPollTimeout = cloneTimeout()
}

log.V(4).Info("Waiting for volume to be active", "volumeID", vol.ID)
if _, err := cs.client.WaitForVolumeStatus(
ctx, vol.ID, linodego.VolumeActive, statusPollTimeout); err != nil {
return &csi.CreateVolumeResponse{}, errInternal("Timed out waiting for volume %d to be active: %v", vol.ID, err)
}

log.V(4).Info("Volume is active", "volumeID", vol.ID)

key := linodevolumes.CreateLinodeVolumeKey(vol.ID, vol.Label)
resp := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: key.GetVolumeKey(),
CapacityBytes: size,
AccessibleTopology: []*csi.Topology{
{
Segments: map[string]string{
VolumeTopologyRegion: vol.Region,
},
},
},
VolumeContext: volumeContext,
},
}

// Append the content source to the response
if sourceVolumeInfo != nil {
resp.Volume.ContentSource = &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Volume{
Volume: &csi.VolumeContentSource_VolumeSource{
VolumeId: contentSource.GetVolume().GetVolumeId(),
},
},
}
}
// Prepare and return response
resp := cs.prepareCreateVolumeResponse(ctx, vol, size, volContext, sourceVolInfo, req.GetVolumeContentSource())

log.V(2).Info("Volume created successfully", "response", resp)
log.V(2).Info("CreateVolume response", "response", resp)
return resp, nil
}

Expand Down
146 changes: 145 additions & 1 deletion internal/driver/controllerserver_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ func (cs *ControllerServer) getContentSourceVolume(ctx context.Context, contentS
return nil, errRegionMismatch(volumeData.Region, cs.metadata.Region)
}

return volumeInfo, nil // Return the parsed volume information
log.V(4).Info("Content source volume", "volumeData", volumeData)
return volumeInfo, nil
}

// attemptCreateLinodeVolume creates a Linode volume while ensuring idempotency.
Expand Down Expand Up @@ -332,3 +333,146 @@ func validVolumeCapabilities(caps []*csi.VolumeCapability) bool {
// All capabilities are valid; return true
return true
}

// validateCreateVolumeRequest checks if the provided CreateVolumeRequest is valid.
// It ensures that the volume name is not empty, that volume capabilities are provided,
// and that the capabilities are valid. Returns an error if any validation fails.
func (cs *ControllerServer) validateCreateVolumeRequest(ctx context.Context, req *csi.CreateVolumeRequest) error {
log := logger.GetLogger(ctx)
log.V(4).Info("Entering validateCreateVolumeRequest()", "req", req)
defer log.V(4).Info("Exiting validateCreateVolumeRequest()")

// Check if the volume name is empty; if so, return an error indicating no volume name was provided.
if len(req.GetName()) == 0 {
return errNoVolumeName
}

// Retrieve the volume capabilities from the request.
volCaps := req.GetVolumeCapabilities()
// Check if no volume capabilities are provided; if so, return an error.
if len(volCaps) == 0 {
return errNoVolumeCapabilities
}
// Validate the provided volume capabilities; if they are invalid, return an error.
if !validVolumeCapabilities(volCaps) {
return errInvalidVolumeCapability(volCaps)
}

// If all checks pass, return nil indicating the request is valid.
return nil
}

// prepareVolumeParams prepares the volume parameters for creation.
// It extracts the capacity range from the request, calculates the size,
// and generates a normalized volume name. Returns the volume name and size in GB.
func (cs *ControllerServer) prepareVolumeParams(ctx context.Context, req *csi.CreateVolumeRequest) (string, int, int64, error) {
log := logger.GetLogger(ctx)
log.V(4).Info("Entering prepareVolumeParams()", "req", req)
defer log.V(4).Info("Exiting prepareVolumeParams()")

// Retrieve the capacity range from the request to determine the size limits for the volume.
capRange := req.GetCapacityRange()
// Get the requested size in bytes, handling any potential errors.
size, err := getRequestCapacitySize(capRange)
if err != nil {
return "", 0, 0, err
}

condensedName := strings.Replace(req.GetName(), "-", "", -1)
preKey := linodevolumes.CreateLinodeVolumeKey(0, condensedName)
volumeName := preKey.GetNormalizedLabelWithPrefix(cs.driver.volumeLabelPrefix)
targetSizeGB := bytesToGB(size)

log.V(4).Info("Volume parameters prepared", "volumeName", volumeName, "targetSizeGB", targetSizeGB)
return volumeName, targetSizeGB, size, nil
}

// createVolumeContext creates a context map for the volume based on the request parameters.
// If the volume is encrypted, it adds relevant encryption attributes to the context.
func (cs *ControllerServer) createVolumeContext(ctx context.Context, req *csi.CreateVolumeRequest) map[string]string {
log := logger.GetLogger(ctx)
log.V(4).Info("Entering createVolumeContext()", "req", req)
defer log.V(4).Info("Exiting createVolumeContext()")

volumeContext := make(map[string]string)

if req.Parameters[LuksEncryptedAttribute] == "true" {
volumeContext[LuksEncryptedAttribute] = "true"
volumeContext[PublishInfoVolumeName] = req.GetName()
volumeContext[LuksCipherAttribute] = req.Parameters[LuksCipherAttribute]
volumeContext[LuksKeySizeAttribute] = req.Parameters[LuksKeySizeAttribute]
}

log.V(4).Info("Volume context created", "volumeContext", volumeContext)
return volumeContext
}

// createAndWaitForVolume attempts to create a new volume and waits for it to become active.
// It logs the process and handles any errors that occur during creation or waiting.
func (cs *ControllerServer) createAndWaitForVolume(ctx context.Context, name string, sizeGB int, tags string, sourceInfo *linodevolumes.LinodeVolumeKey) (*linodego.Volume, error) {
log := logger.GetLogger(ctx)
log.V(4).Info("Entering createAndWaitForVolume()", "name", name, "sizeGB", sizeGB, "tags", tags)
defer log.V(4).Info("Exiting createAndWaitForVolume()")

vol, err := cs.attemptCreateLinodeVolume(ctx, name, sizeGB, tags, sourceInfo)
if err != nil {
return nil, err
}

// Check if the created volume's size matches the requested size.
// if not, and sourceInfo is nil, it indicates that the volume was not created from a source.
if vol.Size != sizeGB && sourceInfo == nil {
return nil, errAlreadyExists("volume %d already exists with size %d", vol.ID, vol.Size)
}

// Set the timeout for polling the volume status based on whether it's a clone or not.
statusPollTimeout := waitTimeout()
if sourceInfo != nil {
statusPollTimeout = cloneTimeout()
}

log.V(4).Info("Waiting for volume to be active", "volumeID", vol.ID)
vol, err = cs.client.WaitForVolumeStatus(ctx, vol.ID, linodego.VolumeActive, statusPollTimeout)
if err != nil {
return nil, errInternal("Timed out waiting for volume %d to be active: %v", vol.ID, err)
}

log.V(4).Info("Volume is active", "volumeID", vol.ID)
return vol, nil
}

// prepareCreateVolumeResponse constructs a CreateVolumeResponse from the created volume details.
// It includes the volume ID, capacity, accessible topology, and any relevant context or content source.
func (cs *ControllerServer) prepareCreateVolumeResponse(ctx context.Context, vol *linodego.Volume, size int64, context map[string]string, sourceInfo *linodevolumes.LinodeVolumeKey, contentSource *csi.VolumeContentSource) *csi.CreateVolumeResponse {
log := logger.GetLogger(ctx)
log.V(4).Info("Entering prepareCreateVolumeResponse()", "vol", vol)
defer log.V(4).Info("Exiting prepareCreateVolumeResponse()")

key := linodevolumes.CreateLinodeVolumeKey(vol.ID, vol.Label)
resp := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: key.GetVolumeKey(),
CapacityBytes: size,
AccessibleTopology: []*csi.Topology{
{
Segments: map[string]string{
VolumeTopologyRegion: vol.Region,
},
},
},
VolumeContext: context,
},
}

if sourceInfo != nil {
resp.Volume.ContentSource = &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Volume{
Volume: &csi.VolumeContentSource_VolumeSource{
VolumeId: contentSource.GetVolume().GetVolumeId(),
},
},
}
}

return resp
}
Loading

0 comments on commit 38673fc

Please sign in to comment.