Skip to content

Commit

Permalink
Merge branch 'main' into uncomment_linters
Browse files Browse the repository at this point in the history
  • Loading branch information
ELENAGER committed Dec 13, 2023
2 parents bc6dd0c + b8bedb9 commit 13901aa
Show file tree
Hide file tree
Showing 39 changed files with 663 additions and 238 deletions.
10 changes: 9 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ env:
IMAGE_REGISTRY: "quay.io"
IMAGE_TAG: "ci"
DOCKERCMD: "podman"
DRIVER: "container"
defaults:
run:
shell: bash
Expand Down Expand Up @@ -116,7 +117,6 @@ jobs:
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube
minikube version
mkdir "$HOME/.minikube/profiles"
- name: Install kubectl
run: |
Expand Down Expand Up @@ -145,6 +145,10 @@ jobs:
run: make black
working-directory: test

- name: Start test cluster
run: make cluster
working-directory: test

- name: Run tests
run: make test
working-directory: test
Expand All @@ -153,6 +157,10 @@ jobs:
run: make coverage
working-directory: test

- name: Clean up
run: make clean
working-directory: test

ramenctl-test:
name: ramenctl tests
runs-on: ubuntu-22.04
Expand Down
9 changes: 7 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
*.dll
*.so
*.dylib
bin
testbin/*
/bin
/testbin/*

# Test binary, build with `go test -c`
*.test
Expand Down Expand Up @@ -37,9 +37,14 @@ venv
# Test environment generated files
test/.coverage
test/.coverage.*
test/addons/kubevirt/vm/id_rsa.pub

# Python generated files
*.egg-info/
__pycache__/
test/build
ramenctl/build

# Generated disk images
*.qcow2
*.iso
3 changes: 3 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ issues:
linters:
- revive
text: "should not use dot imports"
- source: "^func Test"
linters:
- funlen


linters:
Expand Down
7 changes: 6 additions & 1 deletion controllers/drcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (r *DRClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

u.initializeStatus()

if !u.object.ObjectMeta.DeletionTimestamp.IsZero() {
if drClusterIsDeleted(drcluster) {
return processDeletion(u)
}

Expand Down Expand Up @@ -360,6 +360,11 @@ func (r DRClusterReconciler) processCreateOrUpdate(u *drclusterInstance) (ctrl.R
return ctrl.Result{Requeue: requeue || u.requeue}, reconcileError
}

// drClusterIsDeleted returns true if the given DRCluster has been marked for
// deletion, i.e. its deletion timestamp is set.
func drClusterIsDeleted(c *ramen.DRCluster) bool {
	return !c.GetDeletionTimestamp().IsZero()
}

func (u *drclusterInstance) initializeStatus() {
// Save a copy of the instance status to be used for the DRCluster status update comparison
u.object.Status.DeepCopyInto(&u.savedInstanceStatus)
Expand Down
7 changes: 6 additions & 1 deletion controllers/drcluster_mmode.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func (u *drclusterInstance) mModeActivationsRequired() (map[string]ramen.Storage

// getVRGs is a helper function to get the VRGs for the passed in DRPC and DRPolicy association
func (u *drclusterInstance) getVRGs(drpcCollection DRPCAndPolicy) (map[string]*ramen.VolumeReplicationGroup, error) {
drClusters, err := getDRClusters(u.ctx, u.client, drpcCollection.drPolicy)
if err != nil {
return nil, err
}

placementObj, err := getPlacementOrPlacementRule(u.ctx, u.client, drpcCollection.drpc, u.log)
if err != nil {
return nil, err
Expand All @@ -113,7 +118,7 @@ func (u *drclusterInstance) getVRGs(drpcCollection DRPCAndPolicy) (map[string]*r
vrgs, failedToQueryCluster, err := getVRGsFromManagedClusters(
u.reconciler.MCVGetter,
drpcCollection.drpc,
drpcCollection.drPolicy,
drClusters,
vrgNamespace,
u.log)
if err != nil {
Expand Down
54 changes: 33 additions & 21 deletions controllers/drplacementcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/yaml"

rmn "github.com/ramendr/ramen/api/v1alpha1"
Expand Down Expand Up @@ -533,20 +534,23 @@ func (d *DRPCInstance) checkMetroFailoverPrerequisites(curHomeCluster string) (b
func (d *DRPCInstance) checkRegionalFailoverPrerequisites() bool {
d.setProgression(rmn.ProgressionWaitForStorageMaintenanceActivation)

if required, activationsRequired := requiresRegionalFailoverPrerequisites(
d.ctx,
d.reconciler.APIReader,
rmnutil.DRPolicyS3Profiles(d.drPolicy, d.drClusters).List(),
d.instance.GetName(), d.instance.GetNamespace(),
d.vrgs, d.instance.Spec.FailoverCluster,
d.reconciler.ObjStoreGetter, d.log); required {
for _, drCluster := range d.drClusters {
if drCluster.Name != d.instance.Spec.FailoverCluster {
continue
}
for _, drCluster := range d.drClusters {
if drCluster.Name != d.instance.Spec.FailoverCluster {
continue
}

// we want to work with failover cluster only, because the previous primary cluster might be unreachable
if required, activationsRequired := requiresRegionalFailoverPrerequisites(
d.ctx,
d.reconciler.APIReader,
[]string{drCluster.Spec.S3ProfileName},
d.instance.GetName(), d.instance.GetNamespace(),
d.vrgs, d.instance.Spec.FailoverCluster,
d.reconciler.ObjStoreGetter, d.log); required {
return checkFailoverMaintenanceActivations(drCluster, activationsRequired, d.log)
}

break
}

return true
Expand Down Expand Up @@ -1448,16 +1452,9 @@ func (d *DRPCInstance) createVRGManifestWork(homeCluster string, repState rmn.Re
return nil
}

// ensureVRGManifestWork ensures that the VRG ManifestWork exists and matches the current VRG state.
// TODO: This may be safe only when the VRG is primary - check if callers use this correctly.
func (d *DRPCInstance) ensureVRGManifestWork(homeCluster string) error {
mw, mwErr := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, homeCluster)
if mwErr != nil {
d.log.Info("Ensure VRG ManifestWork", "Error", mwErr)
}

if mw != nil {
return nil
}

d.log.Info("Ensure VRG ManifestWork",
"Last State:", d.getLastDRState(), "cluster", homeCluster)

Expand Down Expand Up @@ -1496,7 +1493,7 @@ func (d *DRPCInstance) generateVRG(repState rmn.ReplicationState) rmn.VolumeRepl
Spec: rmn.VolumeReplicationGroupSpec{
PVCSelector: d.instance.Spec.PVCSelector,
ReplicationState: repState,
S3Profiles: rmnutil.DRPolicyS3Profiles(d.drPolicy, d.drClusters).List(),
S3Profiles: d.availableS3Profiles(),
KubeObjectProtection: d.instance.Spec.KubeObjectProtection,
},
}
Expand All @@ -1508,6 +1505,21 @@ func (d *DRPCInstance) generateVRG(repState rmn.ReplicationState) rmn.VolumeRepl
return vrg
}

// availableS3Profiles returns the sorted, de-duplicated S3 profile names of
// the DRClusters associated with this DRPC instance, skipping any cluster
// that is marked for deletion.
func (d *DRPCInstance) availableS3Profiles() []string {
	names := sets.New[string]()

	for i := range d.drClusters {
		cluster := &d.drClusters[i]
		if !drClusterIsDeleted(cluster) {
			names.Insert(cluster.Spec.S3ProfileName)
		}
	}

	return sets.List(names)
}

func (d *DRPCInstance) generateVRGSpecAsync() *rmn.VRGAsyncSpec {
if dRPolicySupportsRegional(d.drPolicy, d.drClusters) {
return &rmn.VRGAsyncSpec{
Expand Down
39 changes: 26 additions & 13 deletions controllers/drplacementcontrol_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ func (r *DRPlacementControlReconciler) createDRPCInstance(
return nil, err
}

vrgs, err := updateVRGsFromManagedClusters(r.MCVGetter, drpc, drPolicy, vrgNamespace, log)
vrgs, err := updateVRGsFromManagedClusters(r.MCVGetter, drpc, drClusters, vrgNamespace, log)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1065,8 +1065,13 @@ func (r *DRPlacementControlReconciler) finalizeDRPC(ctx context.Context, drpc *r
}
}

drClusters, err := getDRClusters(ctx, r.Client, drPolicy)
if err != nil {
return fmt.Errorf("failed to get drclusters. Error (%w)", err)
}

// Verify VRGs have been deleted
vrgs, _, err := getVRGsFromManagedClusters(r.MCVGetter, drpc, drPolicy, vrgNamespace, log)
vrgs, _, err := getVRGsFromManagedClusters(r.MCVGetter, drpc, drClusters, vrgNamespace, log)
if err != nil {
return fmt.Errorf("failed to retrieve VRGs. We'll retry later. Error (%w)", err)
}
Expand Down Expand Up @@ -1422,9 +1427,9 @@ func (r *DRPlacementControlReconciler) clonePlacementRule(ctx context.Context,
}

func updateVRGsFromManagedClusters(mcvGetter rmnutil.ManagedClusterViewGetter, drpc *rmn.DRPlacementControl,
drPolicy *rmn.DRPolicy, vrgNamespace string, log logr.Logger,
drClusters []rmn.DRCluster, vrgNamespace string, log logr.Logger,
) (map[string]*rmn.VolumeReplicationGroup, error) {
vrgs, failedClusterToQuery, err := getVRGsFromManagedClusters(mcvGetter, drpc, drPolicy, vrgNamespace, log)
vrgs, failedClusterToQuery, err := getVRGsFromManagedClusters(mcvGetter, drpc, drClusters, vrgNamespace, log)
if err != nil {
return nil, err
}
Expand All @@ -1443,7 +1448,7 @@ func updateVRGsFromManagedClusters(mcvGetter rmnutil.ManagedClusterViewGetter, d
}

func getVRGsFromManagedClusters(mcvGetter rmnutil.ManagedClusterViewGetter, drpc *rmn.DRPlacementControl,
drPolicy *rmn.DRPolicy, vrgNamespace string, log logr.Logger,
drClusters []rmn.DRCluster, vrgNamespace string, log logr.Logger,
) (map[string]*rmn.VolumeReplicationGroup, string, error) {
vrgs := map[string]*rmn.VolumeReplicationGroup{}

Expand All @@ -1456,33 +1461,41 @@ func getVRGsFromManagedClusters(mcvGetter rmnutil.ManagedClusterViewGetter, drpc

var clustersQueriedSuccessfully int

for _, drCluster := range rmnutil.DrpolicyClusterNames(drPolicy) {
vrg, err := mcvGetter.GetVRGFromManagedCluster(drpc.Name, vrgNamespace, drCluster, annotations)
for i := range drClusters {
drCluster := &drClusters[i]

vrg, err := mcvGetter.GetVRGFromManagedCluster(drpc.Name, vrgNamespace, drCluster.Name, annotations)
if err != nil {
// Only NotFound error is accepted
if errors.IsNotFound(err) {
log.Info(fmt.Sprintf("VRG not found on %q", drCluster))
log.Info(fmt.Sprintf("VRG not found on %q", drCluster.Name))
clustersQueriedSuccessfully++

continue
}

failedClusterToQuery = drCluster
failedClusterToQuery = drCluster.Name

log.Info(fmt.Sprintf("failed to retrieve VRG from %s. err (%v)", drCluster, err))
log.Info(fmt.Sprintf("failed to retrieve VRG from %s. err (%v)", drCluster.Name, err))

continue
}

clustersQueriedSuccessfully++

vrgs[drCluster] = vrg
if drClusterIsDeleted(drCluster) {
log.Info("Skipping VRG on deleted drcluster", "drcluster", drCluster.Name, "vrg", vrg.Name)

continue
}

vrgs[drCluster.Name] = vrg

log.Info("VRG location", "VRG on", drCluster)
log.Info("VRG location", "VRG on", drCluster.Name)
}

// We are done if we successfully queried all drClusters
if clustersQueriedSuccessfully == len(rmnutil.DrpolicyClusterNames(drPolicy)) {
if clustersQueriedSuccessfully == len(drClusters) {
return vrgs, "", nil
}

Expand Down
32 changes: 7 additions & 25 deletions controllers/util/mw_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,50 +481,32 @@ func (mwu *MWUtil) createOrUpdateManifestWork(
mw *ocmworkv1.ManifestWork,
managedClusternamespace string,
) error {
key := types.NamespacedName{Name: mw.Name, Namespace: managedClusternamespace}
foundMW := &ocmworkv1.ManifestWork{}

err := mwu.Client.Get(mwu.Ctx,
types.NamespacedName{Name: mw.Name, Namespace: managedClusternamespace},
foundMW)
err := mwu.Client.Get(mwu.Ctx, key, foundMW)
if err != nil {
if !errors.IsNotFound(err) {
return errorswrapper.Wrap(err, fmt.Sprintf("failed to fetch ManifestWork %s", mw.Name))
return errorswrapper.Wrap(err, fmt.Sprintf("failed to fetch ManifestWork %s", key))
}

// Let DRPC receive notification for any changes to ManifestWork CR created by it.
// if err := ctrl.SetControllerReference(d.instance, mw, d.reconciler.Scheme); err != nil {
// return fmt.Errorf("failed to set owner reference to ManifestWork resource (%s/%s) (%v)",
// mw.Name, mw.Namespace, err)
// }

mwu.Log.Info("Creating ManifestWork", "cluster", managedClusternamespace, "MW", mw)

return mwu.Client.Create(mwu.Ctx, mw)
}

if !reflect.DeepEqual(foundMW.Spec, mw.Spec) {
mwu.Log.Info("ManifestWork exists.", "name", mw.Name, "namespace", foundMW.Namespace)

retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var err error
mwu.Log.Info("Updating ManifestWork", "name", mw.Name, "namespace", foundMW.Namespace)

err = mwu.Client.Get(mwu.Ctx,
types.NamespacedName{Name: mw.Name, Namespace: managedClusternamespace},
foundMW)
if err != nil {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := mwu.Client.Get(mwu.Ctx, key, foundMW); err != nil {
return err
}

mw.Spec.DeepCopyInto(&foundMW.Spec)

err = mwu.Client.Update(mwu.Ctx, foundMW)

return err
return mwu.Client.Update(mwu.Ctx, foundMW)
})

if retryErr != nil {
return retryErr
}
}

return nil
Expand Down
Loading

0 comments on commit 13901aa

Please sign in to comment.