Check access to VRG on a MC before deleting the MW
Signed-off-by: Benamar Mekhissi <bmekhiss@ibm.com>
Benamar Mekhissi committed Dec 22, 2023
1 parent ea6fdba commit 5e8bca8
Showing 3 changed files with 33 additions and 6 deletions.

controllers/drcluster_mmode.go (2 changes: 1 addition & 1 deletion)

@@ -62,7 +62,7 @@ func (u *drclusterInstance) mModeActivationsRequired() (map[string]ramen.Storage
     vrgs, err := u.getVRGs(drpcCollection)
     if err != nil {
         u.log.Info("Failed to get VRGs for DRPC that is failing over",
-            "DRPCCommonName", drpcCollection.drpc.GetName(),
+            "DRPCName", drpcCollection.drpc.GetName(),
             "DRPCNamespace", drpcCollection.drpc.GetNamespace())

         u.requeue = true

controllers/drplacementcontrol.go (30 changes: 27 additions & 3 deletions)

@@ -559,7 +559,7 @@ func requiresRegionalFailoverPrerequisites(
     ctx context.Context,
     apiReader client.Reader,
     s3ProfileNames []string,
-    DRPCCommonName string,
+    drpcName string,
     vrgNamespace string,
     vrgs map[string]*rmn.VolumeReplicationGroup,
     failoverCluster string,
@@ -573,7 +573,7 @@

     vrg := getLastKnownPrimaryVRG(vrgs, failoverCluster)
     if vrg == nil {
-        vrg = GetLastKnownVRGPrimaryFromS3(ctx, apiReader, s3ProfileNames, DRPCCommonName, vrgNamespace, objectStoreGetter, log)
+        vrg = GetLastKnownVRGPrimaryFromS3(ctx, apiReader, s3ProfileNames, drpcName, vrgNamespace, objectStoreGetter, log)
         if vrg == nil {
             // TODO: Is this an error, should we ensure at least one VRG is found in the edge cases?
             // Potentially missing VRG and so stop failover? How to recover in that case?
@@ -1340,7 +1340,12 @@ func (d *DRPCInstance) cleanupSecondaries(skipCluster string) (bool, error) {
     }

     // If VRG hasn't been deleted, then make sure that the MW for it is deleted and
-    // return and wait
+    // return and wait, but first make sure that the cluster is accessible
+    if err := checkAccessToVRGOnCluster(d.reconciler.MCVGetter, d.instance.GetName(), d.instance.GetNamespace(),
+        d.vrgNamespace, clusterName); err != nil {
+        return false, err
+    }
+
     mwDeleted, err := d.ensureVRGManifestWorkOnClusterDeleted(clusterName)
     if err != nil {
         return false, err
@@ -1380,6 +1385,25 @@ func (d *DRPCInstance) cleanupSecondaries(skipCluster string) (bool, error) {
     return true, nil
 }

+func checkAccessToVRGOnCluster(mcvGetter rmnutil.ManagedClusterViewGetter,
+    name, drpcNamespace, vrgNamespace, clusterName string,
+) error {
+    annotations := make(map[string]string)
+
+    annotations[DRPCNameAnnotation] = name
+    annotations[DRPCNamespaceAnnotation] = drpcNamespace
+
+    _, err := mcvGetter.GetVRGFromManagedCluster(name,
+        vrgNamespace, clusterName, annotations)
+    if err != nil {
+        if !errors.IsNotFound(err) {
+            return err
+        }
+    }
+
+    return nil
+}
+
 func (d *DRPCInstance) updateUserPlacementRule(homeCluster, reason string) error {
     d.log.Info(fmt.Sprintf("Updating user Placement %s homeCluster %s",
         d.userPlacement.GetName(), homeCluster))
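
Not part of the commit, but a minimal standalone sketch of the contract the new checkAccessToVRGOnCluster helper relies on: a NotFound from the managed cluster view means the VRG is already gone and deleting the ManifestWork can proceed, while any other error (for example, a cluster whose view cannot be read) should stop cleanupSecondaries so it requeues instead of deleting the MW blindly. The vrgGetter interface and the two fake getters below are simplified, hypothetical stand-ins, not ramen's rmnutil.ManagedClusterViewGetter.

package main

import (
    "fmt"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime/schema"
)

// vrgGetter models only the single call used by the new helper; the real
// interface in ramen (rmnutil.ManagedClusterViewGetter) is richer.
type vrgGetter interface {
    GetVRGFromManagedCluster(name, namespace, cluster string, annotations map[string]string) (interface{}, error)
}

// checkAccess mirrors the helper's error handling: tolerate NotFound (the VRG
// is genuinely gone), surface anything else so the caller does not proceed.
func checkAccess(g vrgGetter, name, namespace, cluster string) error {
    _, err := g.GetVRGFromManagedCluster(name, namespace, cluster, nil)
    if err != nil && !apierrors.IsNotFound(err) {
        return err // e.g. managed cluster unreachable: requeue, keep the MW
    }

    return nil
}

// unreachableGetter simulates a cluster whose view cannot be read.
type unreachableGetter struct{}

func (unreachableGetter) GetVRGFromManagedCluster(string, string, string, map[string]string) (interface{}, error) {
    return nil, fmt.Errorf("managed cluster view not ready")
}

// goneGetter simulates a cluster where the VRG has already been deleted.
type goneGetter struct{}

func (goneGetter) GetVRGFromManagedCluster(string, string, string, map[string]string) (interface{}, error) {
    // Group/resource names here are illustrative only.
    gr := schema.GroupResource{Group: "ramendr.openshift.io", Resource: "volumereplicationgroups"}
    return nil, apierrors.NewNotFound(gr, "vrg-sample")
}

func main() {
    // Unreachable cluster: an error comes back, so MW deletion is deferred.
    fmt.Println(checkAccess(unreachableGetter{}, "vrg-sample", "app-ns", "cluster-east"))
    // VRG confirmed absent: nil, so deleting the MW is safe to attempt.
    fmt.Println(checkAccess(goneGetter{}, "vrg-sample", "app-ns", "cluster-east"))
}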

controllers/drplacementcontrol_controller.go (7 changes: 5 additions & 2 deletions)

@@ -2166,6 +2166,8 @@ func (r *DRPlacementControlReconciler) determineDRPCState(
     dstCluster string,
     log logr.Logger,
 ) (Progress, error) {
+    log.Info("Rebuild DRPC state")
+
     vrgNamespace, err := selectVRGNamespace(r.Client, log, drpc, placementObj)
     if err != nil {
         log.Info("Failed to select VRG namespace")
@@ -2286,8 +2288,9 @@

     for k, v := range vrgs {
         clusterName, vrg = k, v
-
-        break
+        if vrg.Spec.ReplicationState == rmn.Primary {
+            break
+        }
     }

     if drpc.Spec.Action == rmn.DRAction(vrg.Spec.Action) {
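
Also not part of the commit: a pared-down illustration of the selection change in determineDRPCState above. The loop previously took whichever map entry happened to come first; it now keeps scanning until it finds a VRG reporting Primary, and only falls back to the last entry visited when none does. The vrg struct and pickVRG helper are hypothetical simplifications, not rmn.VolumeReplicationGroup or the controller's in-place loop.

package main

import "fmt"

// vrg is a pared-down, illustrative stand-in for rmn.VolumeReplicationGroup.
type vrg struct {
    ReplicationState string // "primary" or "secondary"
    Action           string
}

// pickVRG mirrors the updated loop: prefer a Primary VRG when one exists;
// otherwise the last entry visited is returned (map order is unspecified).
func pickVRG(vrgs map[string]*vrg) (cluster string, chosen *vrg) {
    for k, v := range vrgs {
        cluster, chosen = k, v
        if v.ReplicationState == "primary" {
            break
        }
    }

    return cluster, chosen
}

func main() {
    vrgs := map[string]*vrg{
        "cluster-west": {ReplicationState: "secondary", Action: "Failover"},
        "cluster-east": {ReplicationState: "primary", Action: "Failover"},
    }

    cluster, v := pickVRG(vrgs)
    // Always prints cluster-east primary, regardless of map iteration order.
    fmt.Println(cluster, v.ReplicationState)
}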
