Enhancing Hub Recovery: Reworking DRPC State Rebuilding Algorithm
This commit tackles hub recovery issues by reworking the algorithm responsible for
rebuilding the DRPC state. The changes align with the following expectations (a sketch
of the resulting decision flow follows the list):

1. Stop Condition for Both Failed Queries:
   If querying the 2 clusters fails for both, the process is halted.

2. Initial Deployment without VRGs:
   If 2 clusters are successfully queried, and no VRGs are found, proceed with the
   initial deployment.

3. Handling Failures with S3 Store Check:
   - If 2 clusters are queried, 1 fails, and 0 VRGs are found, perform the following checks:
      - If the VRG is found in the S3 store, ensure that the DRPC action matches the VRG action.
      If not, stop until the action is corrected, allowing failover if necessary (set PeerReady).
      - If the VRG is not found in the S3 store and the failed cluster is not the destination
      cluster, continue with the initial deployment.

4. Verification and Failover for VRGs on Failover Cluster:
   If 2 clusters are queried, 1 fails, and 1 VRG is found on the failover cluster, check
   the action:
      - If the actions don't match, stop until corrected by the user.
      - If they match, also stop, but allow failover if the VRG found is a secondary.
      Otherwise, continue.

5. Handling VRGs on Destination Cluster:
   If 2 clusters are queried successfully and 1 or more VRGs are found, and one of the
   VRGs is on the destination cluster, perform the following checks:
      - Continue with the action only if the DRPC action and the found VRG action match.
      - If there is a mismatch, stop until someone investigates, but allow failover to
      take place (set PeerReady).

6. Otherwise, default to allowing Failover:
   If none of the above conditions apply, allow failover (set PeerReady) but stop until
   someone makes the necessary change.
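
The following minimal Go sketch illustrates the decision flow above. It is illustrative
only: the identifiers (hubRecoveryDecision, clusterQuery, vrgInfo) and the plain string
outcomes are hypothetical and do not appear in the Ramen code base; the real
implementation operates on DRPC/VRG objects and status conditions rather than on a flat
struct.

// Hypothetical sketch: none of these identifiers exist in Ramen; they only
// model the six rules listed in the commit message.
package main

import "fmt"

type vrgInfo struct {
	Action      string // DR action recorded on the VRG that was found
	IsSecondary bool   // true if the VRG found is a secondary
}

type clusterQuery struct {
	FailedClusters  []string           // clusters that could not be queried (0, 1, or 2)
	VRGs            map[string]vrgInfo // cluster name -> VRG found on that cluster
	VRGInS3         bool               // a VRG copy exists in the S3 store
	S3Action        string             // action recorded on the S3 copy
	DRPCAction      string             // action currently requested on the DRPC
	DstCluster      string             // destination cluster of the current action
	FailoverCluster string
}

// hubRecoveryDecision returns the outcome ("InitialDeployment", "Continue",
// or "Paused") and whether failover stays allowed (i.e. PeerReady is set).
func hubRecoveryDecision(q clusterQuery) (outcome string, allowFailover bool) {
	switch failed := len(q.FailedClusters); {
	case failed == 2: // 1. both queries failed: stop
		return "Paused", false

	case failed == 0 && len(q.VRGs) == 0: // 2. no VRGs anywhere: initial deployment
		return "InitialDeployment", false

	case failed == 1 && len(q.VRGs) == 0: // 3. one cluster unreachable, no VRG on the other
		if q.VRGInS3 {
			if q.S3Action == q.DRPCAction {
				return "Continue", false // actions agree (implied by the commit message)
			}
			return "Paused", true // mismatch: wait for correction, but allow failover
		}
		if q.FailedClusters[0] != q.DstCluster {
			return "InitialDeployment", false
		}

	case failed == 1 && len(q.VRGs) == 1: // 4. a VRG was found; is it on the failover cluster?
		if vrg, ok := q.VRGs[q.FailoverCluster]; ok {
			if vrg.Action != q.DRPCAction {
				return "Paused", false // wait for the user to correct the action
			}
			if vrg.IsSecondary {
				return "Paused", true // actions match, but VRG is secondary: allow failover
			}
			return "Continue", false
		}

	case failed == 0 && len(q.VRGs) >= 1: // 5. is one of the VRGs on the destination cluster?
		if vrg, ok := q.VRGs[q.DstCluster]; ok {
			if vrg.Action == q.DRPCAction {
				return "Continue", false
			}
			return "Paused", true // mismatch: investigate, but allow failover
		}
	}

	return "Paused", true // 6. default: stop, but allow failover (set PeerReady)
}

func main() {
	// Example: one cluster unreachable, no VRG on the reachable cluster, and the
	// S3 copy disagrees with the DRPC action -> pause, but keep failover allowed.
	outcome, peerReady := hubRecoveryDecision(clusterQuery{
		FailedClusters: []string{"dr-cluster-2"},
		VRGs:           map[string]vrgInfo{},
		VRGInS3:        true,
		S3Action:       "Failover",
		DRPCAction:     "Relocate",
		DstCluster:     "dr-cluster-1",
	})
	fmt.Println(outcome, peerReady) // Paused true
}

In the actual controller, the "Paused" outcomes correspond to the new ReasonPaused and
ProgressionActionPaused values added in this commit.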

Signed-off-by: Benamar Mekhissi <bmekhiss@ibm.com>
Benamar Mekhissi committed Dec 19, 2023
1 parent b8bedb9 commit d8edb50
Showing 6 changed files with 414 additions and 338 deletions.
api/v1alpha1/drplacementcontrol_types.go: 2 additions & 0 deletions
@@ -69,6 +69,7 @@ const (
ReasonCleaning = "Cleaning"
ReasonSuccess = "Success"
ReasonNotStarted = "NotStarted"
ReasonPaused = "Paused"
)

type ProgressionStatus string
@@ -93,6 +94,7 @@ const (
ProgressionEnsuringVolSyncSetup = ProgressionStatus("EnsuringVolSyncSetup")
ProgressionSettingupVolsyncDest = ProgressionStatus("SettingUpVolSyncDest")
ProgressionDeleting = ProgressionStatus("Deleting")
ProgressionActionPaused = ProgressionStatus("Paused")
)

// DRPlacementControlSpec defines the desired state of DRPlacementControl
controllers/drcluster_mmode.go: 1 addition & 1 deletion
@@ -115,7 +115,7 @@ func (u *drclusterInstance) getVRGs(drpcCollection DRPCAndPolicy) (map[string]*r
return nil, err
}

vrgs, failedToQueryCluster, err := getVRGsFromManagedClusters(
vrgs, _, failedToQueryCluster, err := getVRGsFromManagedClusters(
u.reconciler.MCVGetter,
drpcCollection.drpc,
drClusters,
controllers/drplacementcontrol.go: 56 additions & 157 deletions
@@ -323,13 +323,6 @@ func (d *DRPCInstance) startDeploying(homeCluster, homeClusterNamespace string)
func (d *DRPCInstance) RunFailover() (bool, error) {
d.log.Info("Entering RunFailover", "state", d.getLastDRState())

if d.isPlacementNeedsFixing() {
err := d.fixupPlacementForFailover()
if err != nil {
d.log.Info("Couldn't fix up placement for Failover")
}
}

const done = true

// We are done if empty
@@ -662,7 +655,7 @@ func GetLastKnownVRGPrimaryFromS3(

vrg := &rmn.VolumeReplicationGroup{}
if err := vrgObjectDownload(objectStorer, sourcePathNamePrefix, vrg); err != nil {
log.Error(err, "Kube objects capture-to-recover-from identifier get error")
log.Info(fmt.Sprintf("Failed to get VRG from s3 store - s3ProfileName %s. Err %v", s3ProfileName, err))

continue
}
@@ -773,13 +766,6 @@ func checkActivationForStorageIdentifier(
func (d *DRPCInstance) RunRelocate() (bool, error) {
d.log.Info("Entering RunRelocate", "state", d.getLastDRState(), "progression", d.getProgression())

if d.isPlacementNeedsFixing() {
err := d.fixupPlacementForRelocate()
if err != nil {
d.log.Info("Couldn't fix up placement for Relocate")
}
}

const done = true

preferredCluster := d.instance.Spec.PreferredCluster
@@ -816,7 +802,7 @@ func (d *DRPCInstance) RunRelocate() (bool, error) {
}

if d.getLastDRState() != rmn.Relocating && !d.validatePeerReady() {
return !done, fmt.Errorf("clean up on secondaries pending (%+v)", d.instance)
return !done, fmt.Errorf("clean up secondaries is pending (%+v)", d.instance.Status.Conditions)
}

if curHomeCluster != "" && curHomeCluster != preferredCluster {
Expand All @@ -841,6 +827,11 @@ func (d *DRPCInstance) ensureActionCompleted(srcCluster string) (bool, error) {
return !done, err
}

err = d.ensurePlacement(srcCluster)
if err != nil {
return !done, err
}

d.setProgression(rmn.ProgressionCleaningUp)

// Cleanup and setup VolSync if enabled
@@ -1009,12 +1000,12 @@ func (d *DRPCInstance) areMultipleVRGsPrimary() bool {

func (d *DRPCInstance) validatePeerReady() bool {
condition := findCondition(d.instance.Status.Conditions, rmn.ConditionPeerReady)
d.log.Info(fmt.Sprintf("validatePeerReady -- Condition %v", condition))

if condition == nil || condition.Status == metav1.ConditionTrue {
return true
}

d.log.Info("validatePeerReady", "Condition", condition)

return false
}

@@ -1281,15 +1272,13 @@ func (d *DRPCInstance) vrgExistsAndPrimary(targetCluster string) bool {
return false
}

clusterDecision := d.reconciler.getClusterDecision(d.userPlacement)
if clusterDecision.ClusterName != "" &&
targetCluster == clusterDecision.ClusterName {
d.log.Info(fmt.Sprintf("Already %q to cluster %s", d.getLastDRState(), targetCluster))

return true
if !vrg.GetDeletionTimestamp().IsZero() {
return false
}

return false
d.log.Info(fmt.Sprintf("Already %q to cluster %s", d.getLastDRState(), targetCluster))

return true
}

func (d *DRPCInstance) mwExistsAndPlacementUpdated(targetCluster string) (bool, error) {
@@ -1435,7 +1424,7 @@ func (d *DRPCInstance) createVRGManifestWork(homeCluster string, repState rmn.Re
d.log.Info("Creating VRG ManifestWork",
"Last State:", d.getLastDRState(), "cluster", homeCluster)

vrg := d.generateVRG(repState)
vrg := d.generateVRG(homeCluster, repState)
vrg.Spec.VolSync.Disabled = d.volSyncDisabled

annotations := make(map[string]string)
@@ -1457,15 +1446,40 @@
// ensureVRGManifestWork ensures that the VRG ManifestWork exists and matches the current VRG state.
// TODO: This may be safe only when the VRG is primary - check if callers use this correctly.
func (d *DRPCInstance) ensureVRGManifestWork(homeCluster string) error {
mw, mwErr := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, homeCluster)
if mwErr != nil {
d.log.Info("Ensure VRG ManifestWork", "Error", mwErr)
}

vrg := d.vrgs[homeCluster]
if vrg == nil {
return fmt.Errorf("failed to get vrg from cluster %s", homeCluster)
}

if mw != nil {
// For upgrade only: Ensure the VRG annotation contains the dst cluster.
dstCluster := vrg.GetAnnotations()[DestinationClusterAnnotationKey]
if dstCluster != "" {
return nil
}
}

d.log.Info("Ensure VRG ManifestWork",
"Last State:", d.getLastDRState(), "cluster", homeCluster)

cachedVrg := d.vrgs[homeCluster]
if cachedVrg == nil {
return fmt.Errorf("failed to get vrg from cluster %s", homeCluster)
return d.createVRGManifestWork(homeCluster, vrg.Spec.ReplicationState)
}

func (d *DRPCInstance) ensurePlacement(homeCluster string) error {
clusterDecision := d.reconciler.getClusterDecision(d.userPlacement)
if clusterDecision.ClusterName == "" ||
homeCluster != clusterDecision.ClusterName {
d.updatePreferredDecision()

return d.updateUserPlacementRule(homeCluster, homeCluster)
}

return d.createVRGManifestWork(homeCluster, cachedVrg.Spec.ReplicationState)
return nil
}

func vrgAction(drpcAction rmn.DRAction) rmn.VRGAction {
@@ -1488,10 +1502,16 @@ func (d *DRPCInstance) setVRGAction(vrg *rmn.VolumeReplicationGroup) {
vrg.Spec.Action = action
}

func (d *DRPCInstance) generateVRG(repState rmn.ReplicationState) rmn.VolumeReplicationGroup {
func (d *DRPCInstance) generateVRG(dstCluster string, repState rmn.ReplicationState) rmn.VolumeReplicationGroup {
vrg := rmn.VolumeReplicationGroup{
TypeMeta: metav1.TypeMeta{Kind: "VolumeReplicationGroup", APIVersion: "ramendr.openshift.io/v1alpha1"},
ObjectMeta: metav1.ObjectMeta{Name: d.instance.Name, Namespace: d.vrgNamespace},
TypeMeta: metav1.TypeMeta{Kind: "VolumeReplicationGroup", APIVersion: "ramendr.openshift.io/v1alpha1"},
ObjectMeta: metav1.ObjectMeta{
Name: d.instance.Name,
Namespace: d.vrgNamespace,
Annotations: map[string]string{
DestinationClusterAnnotationKey: dstCluster,
},
},
Spec: rmn.VolumeReplicationGroupSpec{
PVCSelector: d.instance.Spec.PVCSelector,
ReplicationState: repState,
@@ -1651,10 +1671,9 @@ func (d *DRPCInstance) ensureClusterDataRestored(homeCluster string) (*rmn.Volum
annotations[DRPCNameAnnotation] = d.instance.Name
annotations[DRPCNamespaceAnnotation] = d.instance.Namespace

vrg, err := d.reconciler.MCVGetter.GetVRGFromManagedCluster(d.instance.Name,
d.vrgNamespace, homeCluster, annotations)
if err != nil {
return nil, false, fmt.Errorf("failed to get VRG %s from cluster %s (err: %w)", d.instance.Name, homeCluster, err)
vrg := d.vrgs[homeCluster]
if vrg == nil {
return nil, false, fmt.Errorf("failed to get VRG %s from cluster %s", d.instance.Name, homeCluster)
}

// ClusterDataReady condition tells us whether the PVs have been applied on the
@@ -2315,126 +2334,6 @@ func (d *DRPCInstance) setConditionOnInitialDeploymentCompletion() {
metav1.ConditionTrue, rmn.ReasonSuccess, "Ready")
}

func (d *DRPCInstance) isPlacementNeedsFixing() bool {
// Needs fixing if and only if the DRPC Status is empty, the Placement decision is empty, and
// the we have VRG(s) in the managed clusters
clusterDecision := d.reconciler.getClusterDecision(d.userPlacement)
d.log.Info(fmt.Sprintf("Check placement if needs fixing: PrD %v, PlD %v, VRGs %d",
d.instance.Status.PreferredDecision, clusterDecision, len(d.vrgs)))

if reflect.DeepEqual(d.instance.Status.PreferredDecision, plrv1.PlacementDecision{}) &&
(clusterDecision == nil || clusterDecision.ClusterName == "") && len(d.vrgs) > 0 {
return true
}

return false
}

func (d *DRPCInstance) selectCurrentPrimaries() []string {
var primaries []string

for clusterName, vrg := range d.vrgs {
if isVRGPrimary(vrg) {
primaries = append(primaries, clusterName)
}
}

return primaries
}

func (d *DRPCInstance) selectPrimaryForFailover(primaries []string) string {
for _, clusterName := range primaries {
if clusterName == d.instance.Spec.FailoverCluster {
return clusterName
}
}

return ""
}

func (d *DRPCInstance) fixupPlacementForFailover() error {
d.log.Info("Fixing PlacementRule for failover...")

var primary string

var primaries []string

var secondaries []string

if d.areMultipleVRGsPrimary() {
primaries := d.selectCurrentPrimaries()
primary = d.selectPrimaryForFailover(primaries)
} else {
primary, secondaries = d.selectCurrentPrimaryAndSecondaries()
}

// IFF we have a primary cluster, and it points to the failoverCluster, then rebuild the
// drpc status with it.
if primary != "" && primary == d.instance.Spec.FailoverCluster {
err := d.updateUserPlacementRule(primary, "")
if err != nil {
return err
}

// Update our 'well known' preferred placement
d.updatePreferredDecision()

peerReadyConditionStatus := metav1.ConditionTrue
peerReadyMsg := "Ready"
// IFF more than one primary then the failover hasn't entered the cleanup phase.
// IFF we have a secondary, then the failover hasn't completed the cleanup phase.
// We need to start where it was left off (best guess).
if len(primaries) > 1 || len(secondaries) > 0 {
d.instance.Status.Phase = rmn.FailingOver
peerReadyConditionStatus = metav1.ConditionFalse
peerReadyMsg = "NotReady"
}

addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionPeerReady, d.instance.Generation,
peerReadyConditionStatus, rmn.ReasonSuccess, peerReadyMsg)

addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionAvailable, d.instance.Generation,
metav1.ConditionTrue, string(d.instance.Status.Phase), "Available")

return nil
}

return fmt.Errorf("detected a failover, but it can't rebuild the state")
}

func (d *DRPCInstance) fixupPlacementForRelocate() error {
if d.areMultipleVRGsPrimary() {
return fmt.Errorf("unconstructable state. Can't have multiple primaries on 'Relocate'")
}

primary, secondaries := d.selectCurrentPrimaryAndSecondaries()
d.log.Info(fmt.Sprintf("Fixing PlacementRule for relocation. Primary (%s), Secondaries (%v)",
primary, secondaries))

// IFF we have a primary cluster, then update the PlacementRule and reset PeerReady to false
// Setting the PeerReady condition status to false allows peer cleanup if necessary
if primary != "" {
err := d.updateUserPlacementRule(primary, "")
if err != nil {
return err
}

// Assume that it is not clean
addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionPeerReady, d.instance.Generation,
metav1.ConditionFalse, rmn.ReasonNotStarted, "NotReady")
} else if len(secondaries) > 1 {
// Use case 3: After Hub Recovery, the DRPC Action found to be 'Relocate', and ALL VRGs are secondary
addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionPeerReady, d.instance.Generation,
metav1.ConditionTrue, rmn.ReasonSuccess,
fmt.Sprintf("Fixed for relocation to %q", d.instance.Spec.PreferredCluster))
}

// Update our 'well known' preferred placement
d.updatePreferredDecision()

return nil
}

func (d *DRPCInstance) setStatusInitiating() {
if !(d.instance.Status.Phase == "" ||
d.instance.Status.Phase == rmn.Deployed ||