Check access to VRG on a MC before deleting the MW
Signed-off-by: Benamar Mekhissi <bmekhiss@ibm.com>
Benamar Mekhissi committed Dec 23, 2023
1 parent ea6fdba commit accaed3
Showing 4 changed files with 41 additions and 29 deletions.
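The core of this commit is a pre-check: before the ManifestWork (MW) carrying a VRG is deleted on a managed cluster (MC), the controller first confirms the cluster can actually be queried for that VRG. Below is a minimal, self-contained Go sketch of that guard, not the controller's real code: the `vrgGetter` interface, `errNotFound`, and `checkAccess` are hypothetical stand-ins for the ManagedClusterView getter and the `IsNotFound` check used in the diff that follows. A missing VRG is treated as fine (nothing left to protect), while any other failure, such as an unreachable cluster, blocks the deletion.

```go
// Sketch only: a simplified version of the "check access before deleting the MW"
// guard. Stand-in types; not the ramen controller's actual API.
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for a Kubernetes NotFound error.
var errNotFound = errors.New("not found")

// vrgGetter stands in for the ManagedClusterView-based VRG getter.
type vrgGetter interface {
	GetVRG(name, namespace, cluster string) error
}

// checkAccess mirrors the intent of checkAccessToVRGOnCluster: a missing VRG is
// acceptable, but any other error (e.g. an unreachable cluster) must stop the
// caller from deleting the ManifestWork.
func checkAccess(g vrgGetter, name, namespace, cluster string) error {
	if err := g.GetVRG(name, namespace, cluster); err != nil && !errors.Is(err, errNotFound) {
		return fmt.Errorf("cluster %s not accessible for VRG %s/%s: %w", cluster, namespace, name, err)
	}

	return nil
}

type fakeGetter struct{ err error }

func (f fakeGetter) GetVRG(string, string, string) error { return f.err }

func main() {
	// VRG already gone: safe to proceed with ManifestWork deletion.
	fmt.Println(checkAccess(fakeGetter{err: errNotFound}, "drpc", "ns", "east1"))
	// Cluster unreachable: caller should requeue instead of deleting blindly.
	fmt.Println(checkAccess(fakeGetter{err: errors.New("cluster unreachable")}, "drpc", "ns", "east1"))
}
```

In the actual change, this guard is invoked from `cleanupSecondaries` ahead of `ensureVRGManifestWorkOnClusterDeleted`, so an unreachable cluster causes a requeue rather than a premature MW deletion.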
2 changes: 1 addition & 1 deletion controllers/drcluster_mmode.go
@@ -62,7 +62,7 @@ func (u *drclusterInstance) mModeActivationsRequired() (map[string]ramen.Storage
vrgs, err := u.getVRGs(drpcCollection)
if err != nil {
u.log.Info("Failed to get VRGs for DRPC that is failing over",
"DRPCCommonName", drpcCollection.drpc.GetName(),
"DRPCName", drpcCollection.drpc.GetName(),
"DRPCNamespace", drpcCollection.drpc.GetNamespace())

u.requeue = true
30 changes: 27 additions & 3 deletions controllers/drplacementcontrol.go
@@ -559,7 +559,7 @@ func requiresRegionalFailoverPrerequisites(
ctx context.Context,
apiReader client.Reader,
s3ProfileNames []string,
- DRPCCommonName string,
+ drpcName string,
vrgNamespace string,
vrgs map[string]*rmn.VolumeReplicationGroup,
failoverCluster string,
@@ -573,7 +573,7 @@ func requiresRegionalFailoverPrerequisites(

vrg := getLastKnownPrimaryVRG(vrgs, failoverCluster)
if vrg == nil {
- vrg = GetLastKnownVRGPrimaryFromS3(ctx, apiReader, s3ProfileNames, DRPCCommonName, vrgNamespace, objectStoreGetter, log)
+ vrg = GetLastKnownVRGPrimaryFromS3(ctx, apiReader, s3ProfileNames, drpcName, vrgNamespace, objectStoreGetter, log)
if vrg == nil {
// TODO: Is this an error, should we ensure at least one VRG is found in the edge cases?
// Potentially missing VRG and so stop failover? How to recover in that case?
@@ -1340,7 +1340,12 @@ func (d *DRPCInstance) cleanupSecondaries(skipCluster string) (bool, error) {
}

// If VRG hasn't been deleted, then make sure that the MW for it is deleted and
- // return and wait
+ // return and wait, but first make sure that the cluster is accessible
+ if err := checkAccessToVRGOnCluster(d.reconciler.MCVGetter, d.instance.GetName(), d.instance.GetNamespace(),
+ d.vrgNamespace, clusterName); err != nil {
+ return false, err
+ }
+
mwDeleted, err := d.ensureVRGManifestWorkOnClusterDeleted(clusterName)
if err != nil {
return false, err
@@ -1380,6 +1385,25 @@ func (d *DRPCInstance) cleanupSecondaries(skipCluster string) (bool, error) {
return true, nil
}

+ func checkAccessToVRGOnCluster(mcvGetter rmnutil.ManagedClusterViewGetter,
+ name, drpcNamespace, vrgNamespace, clusterName string,
+ ) error {
+ annotations := make(map[string]string)
+
+ annotations[DRPCNameAnnotation] = name
+ annotations[DRPCNamespaceAnnotation] = drpcNamespace
+
+ _, err := mcvGetter.GetVRGFromManagedCluster(name,
+ vrgNamespace, clusterName, annotations)
+ if err != nil {
+ if !errors.IsNotFound(err) {
+ return err
+ }
+ }
+
+ return nil
+ }
+
func (d *DRPCInstance) updateUserPlacementRule(homeCluster, reason string) error {
d.log.Info(fmt.Sprintf("Updating user Placement %s homeCluster %s",
d.userPlacement.GetName(), homeCluster))
14 changes: 12 additions & 2 deletions controllers/drplacementcontrol_controller.go
@@ -2166,6 +2166,8 @@ func (r *DRPlacementControlReconciler) determineDRPCState(
dstCluster string,
log logr.Logger,
) (Progress, error) {
log.Info("Rebuild DRPC state")

vrgNamespace, err := selectVRGNamespace(r.Client, log, drpc, placementObj)
if err != nil {
log.Info("Failed to select VRG namespace")
@@ -2286,11 +2288,19 @@

for k, v := range vrgs {
clusterName, vrg = k, v
+ if vrg.Spec.ReplicationState == rmn.Primary {
+ break
+ }
}

- break
+ // This can happen if a hub is recovered in the middle of a Relocate
+ if vrg.Spec.ReplicationState == rmn.Secondary && len(vrgs) == 2 {
+ log.Info("Both VRGs are in secondary state")
+
+ return Stop, nil
+ }

- if drpc.Spec.Action == rmn.DRAction(vrg.Spec.Action) {
+ if drpc.Spec.Action == rmn.DRAction(vrg.Spec.Action) && dstCluster == clusterName {
log.Info(fmt.Sprintf("Same Action %s", drpc.Spec.Action))

return Continue, nil
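The VRG-selection change in the hunk above is easier to follow in isolation. The sketch below is a hedged restatement under simplified, hypothetical stand-in types (`state`, `vrg`, `pickVRG` are not the controller's real types): prefer the last known Primary VRG across clusters instead of whichever map entry is visited first, and stop when both VRGs report Secondary, which the commit's comment attributes to a hub recovered in the middle of a Relocate.

```go
// Sketch only: simplified stand-ins for rmn.VolumeReplicationGroup and the
// determineDRPCState selection logic; not the ramen controller's actual code.
package main

import "fmt"

type state string

const (
	primary   state = "Primary"
	secondary state = "Secondary"
)

type vrg struct{ replicationState state }

// pickVRG returns the cluster/VRG to rebuild the DRPC state from, and whether
// reconciliation should stop because no Primary VRG exists.
func pickVRG(vrgs map[string]vrg) (cluster string, chosen vrg, stop bool) {
	for c, v := range vrgs {
		cluster, chosen = c, v
		if v.replicationState == primary {
			break // prefer the Primary when one exists
		}
	}

	// Both Secondary: likely a hub recovered mid-Relocate; stop reconciling.
	if chosen.replicationState == secondary && len(vrgs) == 2 {
		return "", vrg{}, true
	}

	return cluster, chosen, false
}

func main() {
	vrgs := map[string]vrg{
		"east1": {replicationState: secondary},
		"west1": {replicationState: primary},
	}
	c, v, stop := pickVRG(vrgs)
	fmt.Println(c, v.replicationState, stop) // west1 Primary false
}
```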
24 changes: 1 addition & 23 deletions controllers/drplacementcontrol_controller_test.go
@@ -436,7 +436,7 @@ func (f FakeMCVGetter) GetVRGFromManagedCluster(resourceName, resourceNamespace,

return vrg, nil

case "checkClusterAccessibility":
case "checkAccessToVRGOnCluster":
return checkResource(managedCluster)

case "ensureVRGIsSecondaryOnCluster":
@@ -1530,28 +1530,6 @@ func runFailoverAction(placementObj client.Object, fromCluster, toCluster string
Expect(decision.ClusterName).To(Equal(toCluster))
}

- //nolint:all
- func clearDRActionAfterFailover(userPlacementRule *plrv1.PlacementRule, namespace, preferredCluster, failoverCluster string) {
- drstate = "none"
-
- setDRPCSpecExpectationTo(namespace, preferredCluster, failoverCluster, "")
- waitForCompletion(string(rmn.FailedOver))
- // waitForUpdateDRPCStatus(namespace)
-
- drpc := getLatestDRPC(namespace)
- // At this point expect the DRPC status condition to have 2 types
- // {Available and PeerReady}
- // Final state didn't change and it is 'FailedOver' even though we tried to run
- // initial deployment
- Expect(drpc.Status.Phase).To(Equal(rmn.FailedOver))
- Expect(len(drpc.Status.Conditions)).To(Equal(2))
- _, condition := getDRPCCondition(&drpc.Status, rmn.ConditionAvailable)
- Expect(condition.Reason).To(Equal(string(rmn.FailedOver)))
-
- decision := getLatestUserPlacementDecision(userPlacementRule.Name, userPlacementRule.Namespace)
- Expect(decision.ClusterName).To(Equal(failoverCluster))
- }
-
func runRelocateAction(placementObj client.Object, fromCluster string, isSyncDR bool, manualUnfence bool) {
toCluster1 := "east1-cluster"

