Skip to content

Commit

Permalink
[refactor] Controllerserver (part 3) - refactor publish unpublish vol…
Browse files Browse the repository at this point in the history
…ume (#235)

* refactor: move the helper funcs and constants from controllerserver.go into a helper file

* Adding comments to code for better clarity

* updating to use errors.go for grpc errors in helper funcs

* refactor getRequestCapacitySize() for better clarity

* Adding godoc comments to the grpc functions

* update comments

* Breakup the createVolume() from a long function into smaller more focused funcs()

* Added Unit tests for the new funcs

* clean up

* cleanup

* refactor: breakdown the ControllerPublishVolume() into small helper func for better clarity. Add/update comments. Add/update logging

* fix the bug i created :)

* Add Unit tests plus misc changes

* fix csi-sanity issue

* return empty response when there is an error

* fix the idempotency issue

* fixup commit

* update the test case

* fixup

---------

Co-authored-by: Khaja Omer <komer@akamai.com>
Co-authored-by: amold1 <amold1@gmail.com>
  • Loading branch information
3 people authored Sep 23, 2024
1 parent e132756 commit d290cb7
Show file tree
Hide file tree
Showing 3 changed files with 763 additions and 122 deletions.
124 changes: 40 additions & 84 deletions internal/driver/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"strconv"
"strings"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/linode/linodego"
Expand Down Expand Up @@ -54,8 +53,8 @@ func NewControllerServer(ctx context.Context, driver *LinodeDriver, client linod
return cs, nil
}

// CreateVolume provisions a new volume on behalf of a user, which can be used as a block device or mounted filesystem.
// This operation is idempotent, meaning multiple calls with the same parameters will not create duplicate volumes.
// CreateVolume provisions a new volume on behalf of a user, which can be used as a block device or mounted filesystem.
// This operation is idempotent, meaning multiple calls with the same parameters will not create duplicate volumes.
// For more details, refer to the CSI Driver Spec documentation.
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
log, ctx, done := logger.GetLogger(ctx).WithMethod("CreateVolume")
Expand Down Expand Up @@ -137,9 +136,9 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return &csi.DeleteVolumeResponse{}, nil
}

// ControllerPublishVolume attaches a volume to a specified node.
// It ensures the volume is not already attached to another node
// and that the node can accommodate the attachment. Returns
// ControllerPublishVolume attaches a volume to a specified node.
// It ensures the volume is not already attached to another node
// and that the node can accommodate the attachment. Returns
// the device path if successful.
// For more details, refer to the CSI Driver Spec documentation.
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
Expand All @@ -148,100 +147,57 @@ func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *cs

log.V(2).Info("Processing request", "req", req)

linodeID, statusErr := linodevolumes.NodeIdAsInt("ControllerPublishVolume", req)
if statusErr != nil {
return &csi.ControllerPublishVolumeResponse{}, statusErr
}

volumeID, statusErr := linodevolumes.VolumeIdAsInt("ControllerPublishVolume", req)
if statusErr != nil {
return &csi.ControllerPublishVolumeResponse{}, statusErr
// Validate the request and get Linode ID and Volume ID
linodeID, volumeID, err := cs.validateControllerPublishVolumeRequest(ctx, req)
if err != nil {
return &csi.ControllerPublishVolumeResponse{}, err
}

cap := req.GetVolumeCapability()
if cap == nil {
return &csi.ControllerPublishVolumeResponse{}, errNoVolumeCapability
// Check if the volume exists and is valid.
// If the volume is already attached to the specified instance, it returns its device path.
devicePath, err := cs.getAndValidateVolume(ctx, volumeID, linodeID)
if err != nil {
return &csi.ControllerPublishVolumeResponse{}, err
}
if !validVolumeCapabilities([]*csi.VolumeCapability{cap}) {
return &csi.ControllerPublishVolumeResponse{}, errInvalidVolumeCapability([]*csi.VolumeCapability{cap})
// If devicePath is not empty, the volume is already attached
if devicePath != "" {
return &csi.ControllerPublishVolumeResponse{
PublishContext: map[string]string{
devicePathKey: devicePath,
},
}, nil
}

volume, err := cs.client.GetVolume(ctx, volumeID)
if linodego.IsNotFound(err) {
return &csi.ControllerPublishVolumeResponse{}, errVolumeNotFound(volumeID)
} else if err != nil {
return &csi.ControllerPublishVolumeResponse{}, errInternal("get volume %d: %v", volumeID, err)
}
if volume.LinodeID != nil {
if *volume.LinodeID == linodeID {
log.V(4).Info("Volume already attached to instance", "volume_id", volume.ID, "node_id", *volume.LinodeID, "device_path", volume.FilesystemPath)
pvInfo := map[string]string{
devicePathKey: volume.FilesystemPath,
}
return &csi.ControllerPublishVolumeResponse{
PublishContext: pvInfo,
}, nil
}
return &csi.ControllerPublishVolumeResponse{}, errVolumeAttached(volumeID, linodeID)
// Retrieve and validate the instance associated with the Linode ID
instance, err := cs.getInstance(ctx, linodeID)
if err != nil {
return &csi.ControllerPublishVolumeResponse{}, err
}

instance, err := cs.client.GetInstance(ctx, linodeID)
if linodego.IsNotFound(err) {
return &csi.ControllerPublishVolumeResponse{}, errInstanceNotFound(linodeID)
} else if err != nil {
return &csi.ControllerPublishVolumeResponse{}, errInternal("get linode instance %d: %v", linodeID, err)
// Check if the instance can accommodate the volume attachment
if err := cs.checkAttachmentCapacity(ctx, instance); err != nil {
return &csi.ControllerPublishVolumeResponse{}, err
}

log.V(4).Info("Checking if volume can be attached", "volume_id", volumeID, "node_id", linodeID)
// Check to see if there is room to attach this volume to the instance.
if canAttach, err := cs.canAttach(ctx, instance); err != nil {
// Attach the volume to the specified instance
if err := cs.attachVolume(ctx, volumeID, linodeID); err != nil {
return &csi.ControllerPublishVolumeResponse{}, err
} else if !canAttach {
// If we can, try and add a little more information to the error message
// for the caller.
limit, err := cs.maxAllowedVolumeAttachments(ctx, instance)
if errors.Is(err, errNilInstance) {
return &csi.ControllerPublishVolumeResponse{}, errInternal("cannot calculate max volume attachments for a nil instance")
} else if err != nil {
return &csi.ControllerPublishVolumeResponse{}, errMaxAttachments
}
return &csi.ControllerPublishVolumeResponse{}, errMaxVolumeAttachments(limit)
}

// Whether or not the volume attachment should be persisted across
// boots.
//
// Setting this to true will limit the maximum number of attached
// volumes to 8 (eight), minus any instance disks, since volume
// attachments get persisted by adding them to the instance's boot
// config.
persist := false

log.V(4).Info("Executing attach volume", "volume_id", volumeID, "node_id", linodeID)
if _, err := cs.client.AttachVolume(ctx, volumeID, &linodego.VolumeAttachOptions{
LinodeID: linodeID,
PersistAcrossBoots: &persist,
}); err != nil {
code := codes.Internal
if apiErr, ok := err.(*linodego.Error); ok && strings.Contains(apiErr.Message, "is already attached") {
code = codes.Unavailable // Allow a retry if the volume is already attached: race condition can occur here
}
return &csi.ControllerPublishVolumeResponse{}, status.Errorf(code, "attach volume: %v", err)
}

log.V(4).Info("Waiting for volume to attach", "volume_id", volumeID)
volume, err = cs.client.WaitForVolumeLinodeID(ctx, volumeID, &linodeID, waitTimeout())
// Wait for the volume to be successfully attached to the instance
volume, err := cs.client.WaitForVolumeLinodeID(ctx, volumeID, &linodeID, waitTimeout())
if err != nil {
return &csi.ControllerPublishVolumeResponse{}, errInternal("wait for volume to attach: %v", err)
return &csi.ControllerPublishVolumeResponse{}, err
}

log.V(2).Info("Volume attached successfully", "volume_id", volume.ID, "node_id", *volume.LinodeID, "device_path", volume.FilesystemPath)

pvInfo := map[string]string{
devicePathKey: volume.FilesystemPath,
}
// Return the response with the device path of the attached volume
return &csi.ControllerPublishVolumeResponse{
PublishContext: pvInfo,
PublishContext: map[string]string{
devicePathKey: volume.FilesystemPath,
},
}, nil
}

Expand Down Expand Up @@ -353,7 +309,7 @@ func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume
if startingToken != "" {
startingPage, errParse := strconv.ParseInt(startingToken, 10, 64)
if errParse != nil {
return &csi.ListVolumesResponse{}, status.Errorf(codes.Aborted,
return &csi.ListVolumesResponse{}, status.Errorf(codes.Aborted,
"invalid starting token: %q", startingToken)
}

Expand Down Expand Up @@ -414,8 +370,8 @@ func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume
return resp, nil
}

// ControllerGetCapabilities retrieves the capabilities supported by the
// controller service implemented by this Plugin. It returns a response
// ControllerGetCapabilities retrieves the capabilities supported by the
// controller service implemented by this Plugin. It returns a response
// containing the capabilities available for the CSI driver.
// For more details, refer to the CSI Driver Spec documentation.
func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
Expand Down
Loading

0 comments on commit d290cb7

Please sign in to comment.