Skip to content

Commit

Permalink
handle case where PeerClasses are not available
Browse files Browse the repository at this point in the history
Signed-off-by: rakeshgm <rakeshgm@redhat.com>
  • Loading branch information
rakeshgm committed Aug 8, 2024
1 parent f68a818 commit d93cb6f
Showing 1 changed file with 80 additions and 39 deletions.
119 changes: 80 additions & 39 deletions internal/controller/volumereplicationgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,14 +663,11 @@ func (v *VRGInstance) updatePVCList() error {
}

if v.instance.Spec.Async == nil {
if err := v.filterSyncPVCs(pvcList); err != nil {
return fmt.Errorf("failed to filter SyncPVCs (%w)", err)
err := v.validateSyncPVCs(pvcList)
if err != nil {
return err
}

v.volRepPVCs = make([]corev1.PersistentVolumeClaim, len(pvcList.Items))
total := copy(v.volRepPVCs, pvcList.Items)
v.log.Info("Found PersistentVolumeClaims", "count", total)

return nil
}

Expand Down Expand Up @@ -734,46 +731,95 @@ func (v *VRGInstance) separatePVCsUsingVRGStatus(pvcList *corev1.PersistentVolum
}
}

func (v *VRGInstance) filterSyncPVCs(pvcList *corev1.PersistentVolumeClaimList) error {
//nolint:gocognit
func (v *VRGInstance) validateSyncPVCs(pvcList *corev1.PersistentVolumeClaimList) error {
for idx := range pvcList.Items {
pvc := &pvcList.Items[idx]

peerClass, storageClass, err := v.getPeerClassAndSC(pvc, v.instance.Spec.Sync.PeerClasses)
scName := pvc.Spec.StorageClassName
peerClasses := v.instance.Spec.Sync.PeerClasses

storageClass, err := v.validateAndGetStorageClass(scName, pvc)
if err != nil {
return err
}

if len(peerClass.StorageID) != 1 {
return fmt.Errorf("no StorageID found in Sync PeerClass")
}
v.log.Info(fmt.Sprintf("storageClass %s validated", storageClass.Name))

if len(peerClasses) != 0 {
if _, ok := storageClass.Labels[StorageIDLabel]; !ok {
return fmt.Errorf("storageID label not found in storageClass for sync PVC %s", pvc.Name)
}

peerClass := v.findPeerClassMatchingSC(peerClasses, *scName)
if peerClass == nil {
return fmt.Errorf("peerClass matching storageClass %s not found for sync PVC", *scName)
}

if len(peerClass.StorageID) != 1 {
return fmt.Errorf("no StorageID found in sync PeerClass")
}

if storageClass.Labels[StorageIDLabel] != peerClass.StorageID[0] {
return fmt.Errorf("storageId %s not present in peerClass", storageClass.Labels[StorageIDLabel])
if storageClass.Labels[StorageIDLabel] != peerClass.StorageID[0] {
return fmt.Errorf("storageId %s not present in peerClass", storageClass.Labels[StorageIDLabel])
}
}
}

v.volRepPVCs = make([]corev1.PersistentVolumeClaim, len(pvcList.Items))
total := copy(v.volRepPVCs, pvcList.Items)
v.log.Info("Found PersistentVolumeClaims", "count", total)

return nil
}

//nolint:gocognit,cyclop
func (v *VRGInstance) separatePVCsUsingStorageClassProvisioner(pvcList *corev1.PersistentVolumeClaimList) error {
peerClasses := v.instance.Spec.Async.PeerClasses

for idx := range pvcList.Items {
pvc := &pvcList.Items[idx]
scName := pvc.Spec.StorageClassName

peerClass, storageClass, err := v.getPeerClassAndSC(pvc, v.instance.Spec.Async.PeerClasses)
storageClass, err := v.validateAndGetStorageClass(scName, pvc)
if err != nil {
v.log.Error(err, fmt.Sprintf("failed to find match between PeerClasses and StorageClass for async PVC %s",
pvc.Name))

return err
}

replicationClass := v.findMatchingReplicationClass(peerClass, storageClass, pvc)
v.log.Info(fmt.Sprintf("storageClass %s validated", storageClass.Name))

if replicationClass == nil && !v.instance.Spec.VolSync.Disabled {
// ignoring volsnapClass till we store it in the future
if _, err := v.findVolSnapClass(storageClass, pvc); err != nil {
switch len(v.instance.Spec.Async.PeerClasses) {
case 0:
v.log.Info("validate using sc provisioner")

replicationClassMatchFound := false

for _, replicationClass := range v.replClassList.Items {
if storageClass.Provisioner == replicationClass.Spec.Provisioner {
v.volRepPVCs = append(v.volRepPVCs, *pvc)
replicationClassMatchFound = true

break
}
}

if !replicationClassMatchFound {
v.volSyncPVCs = append(v.volSyncPVCs, *pvc)
}
default:
v.log.Info("filtering using peerClasses")

replicationClass, err := v.findReplicationClassUsingPeerClasses(peerClasses, storageClass, pvc)
if err != nil {
return err
}

if replicationClass == nil && !v.instance.Spec.VolSync.Disabled {
// ignoring volsnapClass till we store it in the future
if _, err := v.findVolSnapClass(storageClass, pvc); err != nil {
return err
}
}
}
}

Expand All @@ -787,23 +833,25 @@ func (v *VRGInstance) separatePVCsUsingStorageClassProvisioner(pvcList *corev1.P
return nil
}

func (v *VRGInstance) getPeerClassAndSC(pvc *corev1.PersistentVolumeClaim, peerClasses []ramendrv1alpha1.PeerClass,
) (*ramendrv1alpha1.PeerClass, *storagev1.StorageClass, error) {
func (v *VRGInstance) findReplicationClassUsingPeerClasses(peerClasses []ramendrv1alpha1.PeerClass,
storageClass *storagev1.StorageClass, pvc *corev1.PersistentVolumeClaim) (
*volrep.VolumeReplicationClass, error,
) {
scName := pvc.Spec.StorageClassName

storageClass, err := v.validateAndGetStorageClass(scName, pvc)
if err != nil {
return nil, nil, err
// this check is needed because we use storageID to compare with other VRC or VSC
if _, ok := storageClass.Labels[StorageIDLabel]; !ok {
return nil, fmt.Errorf("storageID label not found in storageClass for PVC %s", pvc.Name)
}

v.log.Info(fmt.Sprintf("storageClass %s validated", storageClass.Name))

peerClass := v.findPeerClassMatchingSC(peerClasses, *scName)
if peerClass == nil {
return nil, nil, fmt.Errorf("peerClass matching storageClass %s not found", storageClass.Name)
return nil, fmt.Errorf("peerClass matching storageClass %s not found for async PVC", *scName)
}

return peerClass, storageClass, nil
replicationClass := v.findMatchingReplicationClass(peerClass, storageClass, pvc)

return replicationClass, nil
}

func (v *VRGInstance) findVolSnapClass(storageClass *storagev1.StorageClass,
Expand Down Expand Up @@ -847,7 +895,7 @@ func (v *VRGInstance) selectVSC(snapshotClass snapv1.VolumeSnapshotClass,
}

// nolint: cyclop
func (v *VRGInstance) selectVRC(replicationClass volrep.VolumeReplicationClass,
func (v *VRGInstance) selectVRCUsingPeerClassAndSC(replicationClass volrep.VolumeReplicationClass,
peerClass ramendrv1alpha1.PeerClass, storageClass storagev1.StorageClass,
) bool {
v.log.Info("selecting VolumeReplicationClass")
Expand Down Expand Up @@ -904,7 +952,7 @@ func (v *VRGInstance) findMatchingReplicationClass(peerClass *ramendrv1alpha1.Pe
peerClass.ReplicationID, pvc.Name, storageClass.Name))

for _, replicationClass := range v.replClassList.Items {
if v.selectVRC(replicationClass, *peerClass, *storageClass) {
if v.selectVRCUsingPeerClassAndSC(replicationClass, *peerClass, *storageClass) {
v.volRepPVCs = append(v.volRepPVCs, *pvc)

return &replicationClass
Expand All @@ -927,13 +975,6 @@ func (v *VRGInstance) validateAndGetStorageClass(scName *string, pvc *corev1.Per
return nil, fmt.Errorf("failed to get the storageclass with name %s (%w)", *scName, err)
}

storageClassLabels := storageClass.GetLabels()

// this check is needed because we use storageID to compare with other VRC or VSC
if _, ok := storageClassLabels[StorageIDLabel]; !ok {
return nil, fmt.Errorf("storageID label not found in storageClass for PVC %s", pvc.Name)
}

return storageClass, nil
}

Expand Down

0 comments on commit d93cb6f

Please sign in to comment.