From 70483ded9078a6ea1c9720520784096890bf8c89 Mon Sep 17 00:00:00 2001 From: lou Date: Tue, 7 Nov 2023 19:12:30 +0800 Subject: [PATCH 1/3] improve discoveryHelper.Refresh() in restore Signed-off-by: lou --- pkg/restore/restore.go | 129 ++++++++++++++++++++++------------------- 1 file changed, 69 insertions(+), 60 deletions(-) diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index 8f623800ce..0241014f20 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -502,7 +502,7 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { }() // totalItems: previously discovered items, i: iteration counter. - totalItems, processedItems, existingNamespaces := 0, 0, sets.NewString() + totalItems, processedItems, createdItems, existingNamespaces := 0, 0, 0, sets.NewString() // First restore CRDs. This is needed so that they are available in the cluster // when getOrderedResourceCollection is called again on the whole backup and @@ -525,16 +525,26 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { var w, e results.Result // Restore this resource, the update channel is set to nil, to avoid misleading value of "totalItems" // more details see #5990 - processedItems, w, e = ctx.processSelectedResource( + processedItems, createdItems, w, e = ctx.processSelectedResource( selectedResource, totalItems, processedItems, + createdItems, existingNamespaces, nil, ) warnings.Merge(&w) errs.Merge(&e) } + // If we just restored custom resource definitions (CRDs), refresh + // discovery because the restored CRDs may have created new APIs that + // didn't previously exist in the cluster, and we want to be able to + // resolve & restore instances of them in subsequent loop iterations. + if createdItems > 0 { + if err := ctx.discoveryHelper.Refresh(); err != nil { + warnings.Add("", errors.Wrap(err, "refresh discovery after restoring CRDs")) + } + } // Restore everything else selectedResourceCollection, _, w, e := ctx.getOrderedResourceCollection( @@ -576,6 +586,7 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { // reset processedItems and totalItems before processing full resource list processedItems = 0 totalItems = 0 + createdItems = 0 for _, selectedResource := range selectedResourceCollection { totalItems += selectedResource.totalItems } @@ -583,10 +594,11 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { for _, selectedResource := range selectedResourceCollection { var w, e results.Result // Restore this resource - processedItems, w, e = ctx.processSelectedResource( + processedItems, createdItems, w, e = ctx.processSelectedResource( selectedResource, totalItems, processedItems, + createdItems, existingNamespaces, update, ) @@ -670,9 +682,10 @@ func (ctx *restoreContext) processSelectedResource( selectedResource restoreableResource, totalItems int, processedItems int, + createdItems int, existingNamespaces sets.String, update chan progressUpdate, -) (int, results.Result, results.Result) { +) (int, int, results.Result, results.Result) { warnings, errs := results.Result{}, results.Result{} groupResource := schema.ParseGroupResource(selectedResource.resource) @@ -728,11 +741,15 @@ func (ctx *restoreContext) processSelectedResource( continue } - w, e, _ := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace) + w, e, _, created := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace) warnings.Merge(&w) errs.Merge(&e) processedItems++ + if created { + createdItems++ + } + // totalItems 
keeps the count of items previously known. There // may be additional items restored by plugins. We want to include // the additional items by looking at restoredItems at the same @@ -754,16 +771,7 @@ func (ctx *restoreContext) processSelectedResource( } } - // If we just restored custom resource definitions (CRDs), refresh - // discovery because the restored CRDs may have created new APIs that - // didn't previously exist in the cluster, and we want to be able to - // resolve & restore instances of them in subsequent loop iterations. - if groupResource == kuberesource.CustomResourceDefinitions { - if err := ctx.discoveryHelper.Refresh(); err != nil { - warnings.Add("", errors.Wrap(err, "refresh discovery after restoring CRDs")) - } - } - return processedItems, warnings, errs + return processedItems, createdItems, warnings, errs } // getNamespace returns a namespace API object that we should attempt to @@ -1083,10 +1091,9 @@ func (ctx *restoreContext) getResource(groupResource schema.GroupResource, obj * return u, nil } -func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupResource schema.GroupResource, namespace string) (results.Result, results.Result, bool) { - warnings, errs := results.Result{}, results.Result{} - // itemExists bool is used to determine whether to include this item in the "wait for additional items" list - itemExists := false +// itemExists bool is used to determine whether to include this item in the "wait for additional items" list +// itemCreated indicates whether the item was created by this restore +func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupResource schema.GroupResource, namespace string) (warnings, errs results.Result, itemExists, itemCreated bool) { resourceID := getResourceID(groupResource, namespace, obj.GetName()) // Check if group/resource should be restored. We need to do this here since @@ -1098,7 +1105,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso "name": obj.GetName(), "groupResource": groupResource.String(), }).Info("Not restoring item because resource is excluded") - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } // Check if namespace/cluster-scoped resource should be restored. We need @@ -1114,7 +1121,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso "name": obj.GetName(), "groupResource": groupResource.String(), }).Info("Not restoring item because namespace is excluded") - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } // If the namespace scoped resource should be restored, ensure that the @@ -1124,7 +1131,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso _, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(nsToEnsure, ctx.namespaceClient, ctx.resourceTerminatingTimeout) if err != nil { errs.AddVeleroError(err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } // Add the newly created namespace to the list of restored items. 
if nsCreated { @@ -1142,7 +1149,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso "name": obj.GetName(), "groupResource": groupResource.String(), }).Info("Not restoring item because it's cluster-scoped") - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } } @@ -1153,11 +1160,11 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso complete, err := isCompleted(obj, groupResource) if err != nil { errs.Add(namespace, fmt.Errorf("error checking completion of %q: %v", resourceID, err)) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } if complete { ctx.log.Infof("%s is complete - skipping", kube.NamespaceAndName(obj)) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } name := obj.GetName() @@ -1171,7 +1178,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if prevRestoredItemStatus, exists := ctx.restoredItems[itemKey]; exists { ctx.log.Infof("Skipping %s because it's already been restored.", resourceID) itemExists = prevRestoredItemStatus.itemExists - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } ctx.restoredItems[itemKey] = restoredItemStatus{itemExists: itemExists} defer func() { @@ -1195,13 +1202,13 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso // to the interface. if groupResource == kuberesource.Pods && obj.GetAnnotations()[v1.MirrorPodAnnotationKey] != "" { ctx.log.Infof("Not restoring pod because it's a mirror pod") - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } resourceClient, err := ctx.getResourceClient(groupResource, obj, namespace) if err != nil { errs.AddVeleroError(fmt.Errorf("error getting resource client for namespace %q, resource %q: %v", namespace, &groupResource, err)) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } if groupResource == kuberesource.PersistentVolumes { @@ -1211,7 +1218,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso shouldRenamePV, err := shouldRenamePV(ctx, obj, resourceClient) if err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } // Check to see if the claimRef.namespace field needs to be remapped, @@ -1219,7 +1226,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso _, err = remapClaimRefNS(ctx, obj) if err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } var shouldRestoreSnapshot bool @@ -1229,7 +1236,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso shouldRestoreSnapshot, err = ctx.shouldRestore(name, resourceClient) if err != nil { errs.Add(namespace, errors.Wrapf(err, "error waiting on in-cluster persistentvolume %s", name)) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } } else { // If we're renaming the PV, we're going to give it a new random name, @@ -1249,7 +1256,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso updatedObj, err := ctx.pvRestorer.executePVAction(obj) if err != nil { errs.Add(namespace, fmt.Errorf("error executing PVAction for %s: %v", resourceID, err)) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } obj = 
updatedObj @@ -1266,7 +1273,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso pvName, err = ctx.pvRenamer(oldName) if err != nil { errs.Add(namespace, errors.Wrapf(err, "error renaming PV")) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } } else { // VolumeSnapshotter could have modified the PV name through @@ -1293,7 +1300,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso // Return early because we don't want to restore the PV itself, we // want to dynamically re-provision it. - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated case hasDeleteReclaimPolicy(obj.Object): ctx.log.Infof("Dynamically re-provisioning persistent volume because it doesn't have a snapshot and its reclaim policy is Delete.") @@ -1301,7 +1308,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso // Return early because we don't want to restore the PV itself, we // want to dynamically re-provision it. - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated default: ctx.log.Infof("Restoring persistent volume as-is because it doesn't have a snapshot and its reclaim policy is not Delete.") @@ -1310,7 +1317,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso _, err = remapClaimRefNS(ctx, obj) if err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } obj = resetVolumeBindingInfo(obj) // We call the pvRestorer here to clear out the PV's claimRef.UID, @@ -1318,7 +1325,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso updatedObj, err := ctx.pvRestorer.executePVAction(obj) if err != nil { errs.Add(namespace, fmt.Errorf("error executing PVAction for %s: %v", resourceID, err)) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } obj = updatedObj } @@ -1328,7 +1335,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso // Clear out non-core metadata fields and status. 
if obj, err = resetMetadataAndStatus(obj); err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } ctx.log.Infof("restore status includes excludes: %+v", ctx.resourceStatusIncludesExcludes) @@ -1353,7 +1360,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso }) if err != nil { errs.Add(namespace, fmt.Errorf("error preparing %s: %v", resourceID, err)) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } // If async plugin started async operation, add it to the ItemOperations list @@ -1382,12 +1389,12 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } if executeOutput.SkipRestore { ctx.log.Infof("Skipping restore of %s: %v because a registered plugin discarded it", obj.GroupVersionKind().Kind, name) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } unstructuredObj, ok := executeOutput.UpdatedItem.(*unstructured.Unstructured) if !ok { errs.Add(namespace, fmt.Errorf("%s: unexpected type %T", resourceID, executeOutput.UpdatedItem)) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } obj = unstructuredObj @@ -1420,7 +1427,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } } - w, e, additionalItemExists := ctx.restoreItem(additionalObj, additionalItem.GroupResource, additionalItemNamespace) + w, e, additionalItemExists, _ := ctx.restoreItem(additionalObj, additionalItem.GroupResource, additionalItemNamespace) if additionalItemExists { filteredAdditionalItems = append(filteredAdditionalItems, additionalItem) } @@ -1449,7 +1456,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso pvc := new(v1.PersistentVolumeClaim) if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pvc); err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } if pvc.Spec.VolumeName != "" { @@ -1468,7 +1475,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso ctx.log.Infof("Updating persistent volume claim %s/%s to reference renamed persistent volume (%s -> %s)", namespace, name, pvc.Spec.VolumeName, newName) if err := unstructured.SetNestedField(obj.Object, newName, "spec", "volumeName"); err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } } } @@ -1499,7 +1506,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso resourceClient, err = ctx.getResourceClient(newGR, obj, obj.GetNamespace()) if err != nil { errs.AddVeleroError(fmt.Errorf("error getting updated resource client for namespace %q, resource %q: %v", namespace, &groupResource, err)) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } ctx.log.Infof("Attempting to restore %s: %v", obj.GroupVersionKind().Kind, name) @@ -1528,7 +1535,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso isAlreadyExistsError, err := isAlreadyExistsError(ctx, obj, restoreErr, resourceClient) if err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } if restoreErr != nil { @@ -1543,7 +1550,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if err != nil && isAlreadyExistsError { 
ctx.log.Warnf("Unable to retrieve in-cluster version of %s: %v, object won't be restored by velero or have restore labels, and existing resource policy is not applied", kube.NamespaceAndName(obj), err) warnings.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } } @@ -1557,7 +1564,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if err != nil { ctx.log.Infof("Error trying to reset metadata for %s: %v", kube.NamespaceAndName(obj), err) warnings.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } // We know the object from the cluster won't have the backup/restore name @@ -1573,20 +1580,20 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if err != nil { ctx.log.Infof("error merging secrets for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err) warnings.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } patchBytes, err := generatePatch(fromCluster, desired) if err != nil { ctx.log.Infof("error generating patch for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err) warnings.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } if patchBytes == nil { // In-cluster and desired state are the same, so move on to // the next item. - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } _, err = resourceClient.Patch(name, patchBytes) @@ -1635,7 +1642,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso warnings.Add(namespace, e) } } - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } //update backup/restore labels on the unchanged resources if existingResourcePolicy is set as update @@ -1651,22 +1658,24 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } ctx.log.Infof("Restore of %s, %v skipped: it already exists in the cluster and is the same as the backed up version", obj.GroupVersionKind().Kind, name) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } // Error was something other than an AlreadyExists. 
if restoreErr != nil { ctx.log.Errorf("error restoring %s: %+v", name, restoreErr) errs.Add(namespace, fmt.Errorf("error restoring %s: %v", resourceID, restoreErr)) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } + itemCreated = true + shouldRestoreStatus := ctx.resourceStatusIncludesExcludes != nil && ctx.resourceStatusIncludesExcludes.ShouldInclude(groupResource.String()) if shouldRestoreStatus && statusFieldErr != nil { err := fmt.Errorf("could not get status to be restored %s: %v", kube.NamespaceAndName(obj), statusFieldErr) ctx.log.Errorf(err.Error()) errs.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } ctx.log.Debugf("status field for %s: exists: %v, should restore: %v", groupResource, statusFieldExists, shouldRestoreStatus) // if it should restore status, run a UpdateStatus @@ -1674,7 +1683,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if err := unstructured.SetNestedField(obj.Object, objStatus, "status"); err != nil { ctx.log.Errorf("could not set status field %s: %v", kube.NamespaceAndName(obj), err) errs.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } obj.SetResourceVersion(createdObj.GetResourceVersion()) updated, err := resourceClient.UpdateStatus(obj, metav1.UpdateOptions{}) @@ -1693,14 +1702,14 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if err != nil { ctx.log.Errorf("error generating patch for managed fields %s: %v", kube.NamespaceAndName(obj), err) errs.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } if patchBytes != nil { if _, err = resourceClient.Patch(name, patchBytes); err != nil { ctx.log.Errorf("error patch for managed fields %s: %v", kube.NamespaceAndName(obj), err) if !apierrors.IsNotFound(err) { errs.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } } else { ctx.log.Infof("the managed fields for %s is patched", kube.NamespaceAndName(obj)) @@ -1711,7 +1720,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso pod := new(v1.Pod) if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pod); err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } // Do not create podvolumerestore when current restore excludes pv/pvc @@ -1737,7 +1746,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } } - return warnings, errs, itemExists + return warnings, errs, itemExists, itemCreated } func isAlreadyExistsError(ctx *restoreContext, obj *unstructured.Unstructured, err error, client client.Dynamic) (bool, error) { From ebb21303ab09875a3aebed9228e323111fa2c199 Mon Sep 17 00:00:00 2001 From: lou Date: Tue, 7 Nov 2023 19:50:35 +0800 Subject: [PATCH 2/3] add changelog Signed-off-by: lou --- changelogs/unreleased/7069-27149chen | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelogs/unreleased/7069-27149chen diff --git a/changelogs/unreleased/7069-27149chen b/changelogs/unreleased/7069-27149chen new file mode 100644 index 0000000000..243596d4ad --- /dev/null +++ b/changelogs/unreleased/7069-27149chen @@ -0,0 +1 @@ +improve discoveryHelper.Refresh() in restore \ No newline at end of file From 179faf3e333c48519933a27ec70155eb48e35294 Mon Sep 17 00:00:00 2001 From: 
lou Date: Mon, 27 Nov 2023 17:39:37 +0800 Subject: [PATCH 3/3] update after review Signed-off-by: lou --- pkg/restore/restore.go | 123 ++++++++++++++++++++--------------------- 1 file changed, 61 insertions(+), 62 deletions(-) diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index 0241014f20..a64e8639a8 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -502,7 +502,7 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { }() // totalItems: previously discovered items, i: iteration counter. - totalItems, processedItems, createdItems, existingNamespaces := 0, 0, 0, sets.NewString() + totalItems, processedItems, existingNamespaces := 0, 0, sets.NewString() // First restore CRDs. This is needed so that they are available in the cluster // when getOrderedResourceCollection is called again on the whole backup and @@ -525,22 +525,29 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { var w, e results.Result // Restore this resource, the update channel is set to nil, to avoid misleading value of "totalItems" // more details see #5990 - processedItems, createdItems, w, e = ctx.processSelectedResource( + processedItems, w, e = ctx.processSelectedResource( selectedResource, totalItems, processedItems, - createdItems, existingNamespaces, nil, ) warnings.Merge(&w) errs.Merge(&e) } + + var createdOrUpdatedCRDs bool + for _, restoredItem := range ctx.restoredItems { + if restoredItem.action == itemRestoreResultCreated || restoredItem.action == itemRestoreResultUpdated { + createdOrUpdatedCRDs = true + break + } + } // If we just restored custom resource definitions (CRDs), refresh - // discovery because the restored CRDs may have created new APIs that + // discovery because the restored CRDs may have created or updated new APIs that // didn't previously exist in the cluster, and we want to be able to // resolve & restore instances of them in subsequent loop iterations. 
- if createdItems > 0 { + if createdOrUpdatedCRDs { if err := ctx.discoveryHelper.Refresh(); err != nil { warnings.Add("", errors.Wrap(err, "refresh discovery after restoring CRDs")) } @@ -586,7 +593,6 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { // reset processedItems and totalItems before processing full resource list processedItems = 0 totalItems = 0 - createdItems = 0 for _, selectedResource := range selectedResourceCollection { totalItems += selectedResource.totalItems } @@ -594,11 +600,10 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { for _, selectedResource := range selectedResourceCollection { var w, e results.Result // Restore this resource - processedItems, createdItems, w, e = ctx.processSelectedResource( + processedItems, w, e = ctx.processSelectedResource( selectedResource, totalItems, processedItems, - createdItems, existingNamespaces, update, ) @@ -682,10 +687,9 @@ func (ctx *restoreContext) processSelectedResource( selectedResource restoreableResource, totalItems int, processedItems int, - createdItems int, existingNamespaces sets.String, update chan progressUpdate, -) (int, int, results.Result, results.Result) { +) (int, results.Result, results.Result) { warnings, errs := results.Result{}, results.Result{} groupResource := schema.ParseGroupResource(selectedResource.resource) @@ -741,15 +745,11 @@ func (ctx *restoreContext) processSelectedResource( continue } - w, e, _, created := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace) + w, e, _ := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace) warnings.Merge(&w) errs.Merge(&e) processedItems++ - if created { - createdItems++ - } - // totalItems keeps the count of items previously known. There // may be additional items restored by plugins. We want to include // the additional items by looking at restoredItems at the same @@ -771,7 +771,7 @@ func (ctx *restoreContext) processSelectedResource( } } - return processedItems, createdItems, warnings, errs + return processedItems, warnings, errs } // getNamespace returns a namespace API object that we should attempt to @@ -1091,9 +1091,10 @@ func (ctx *restoreContext) getResource(groupResource schema.GroupResource, obj * return u, nil } -// itemExists bool is used to determine whether to include this item in the "wait for additional items" list -// itemCreated indicates whether the item was created by this restore -func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupResource schema.GroupResource, namespace string) (warnings, errs results.Result, itemExists, itemCreated bool) { +func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupResource schema.GroupResource, namespace string) (results.Result, results.Result, bool) { + warnings, errs := results.Result{}, results.Result{} + // itemExists bool is used to determine whether to include this item in the "wait for additional items" list + itemExists := false resourceID := getResourceID(groupResource, namespace, obj.GetName()) // Check if group/resource should be restored. We need to do this here since @@ -1105,7 +1106,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso "name": obj.GetName(), "groupResource": groupResource.String(), }).Info("Not restoring item because resource is excluded") - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } // Check if namespace/cluster-scoped resource should be restored. 
We need @@ -1121,7 +1122,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso "name": obj.GetName(), "groupResource": groupResource.String(), }).Info("Not restoring item because namespace is excluded") - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } // If the namespace scoped resource should be restored, ensure that the @@ -1131,7 +1132,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso _, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(nsToEnsure, ctx.namespaceClient, ctx.resourceTerminatingTimeout) if err != nil { errs.AddVeleroError(err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } // Add the newly created namespace to the list of restored items. if nsCreated { @@ -1149,7 +1150,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso "name": obj.GetName(), "groupResource": groupResource.String(), }).Info("Not restoring item because it's cluster-scoped") - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } } @@ -1160,11 +1161,11 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso complete, err := isCompleted(obj, groupResource) if err != nil { errs.Add(namespace, fmt.Errorf("error checking completion of %q: %v", resourceID, err)) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } if complete { ctx.log.Infof("%s is complete - skipping", kube.NamespaceAndName(obj)) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } name := obj.GetName() @@ -1178,7 +1179,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if prevRestoredItemStatus, exists := ctx.restoredItems[itemKey]; exists { ctx.log.Infof("Skipping %s because it's already been restored.", resourceID) itemExists = prevRestoredItemStatus.itemExists - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } ctx.restoredItems[itemKey] = restoredItemStatus{itemExists: itemExists} defer func() { @@ -1202,13 +1203,13 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso // to the interface. 
if groupResource == kuberesource.Pods && obj.GetAnnotations()[v1.MirrorPodAnnotationKey] != "" { ctx.log.Infof("Not restoring pod because it's a mirror pod") - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } resourceClient, err := ctx.getResourceClient(groupResource, obj, namespace) if err != nil { errs.AddVeleroError(fmt.Errorf("error getting resource client for namespace %q, resource %q: %v", namespace, &groupResource, err)) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } if groupResource == kuberesource.PersistentVolumes { @@ -1218,7 +1219,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso shouldRenamePV, err := shouldRenamePV(ctx, obj, resourceClient) if err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } // Check to see if the claimRef.namespace field needs to be remapped, @@ -1226,7 +1227,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso _, err = remapClaimRefNS(ctx, obj) if err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } var shouldRestoreSnapshot bool @@ -1236,7 +1237,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso shouldRestoreSnapshot, err = ctx.shouldRestore(name, resourceClient) if err != nil { errs.Add(namespace, errors.Wrapf(err, "error waiting on in-cluster persistentvolume %s", name)) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } } else { // If we're renaming the PV, we're going to give it a new random name, @@ -1256,7 +1257,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso updatedObj, err := ctx.pvRestorer.executePVAction(obj) if err != nil { errs.Add(namespace, fmt.Errorf("error executing PVAction for %s: %v", resourceID, err)) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } obj = updatedObj @@ -1273,7 +1274,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso pvName, err = ctx.pvRenamer(oldName) if err != nil { errs.Add(namespace, errors.Wrapf(err, "error renaming PV")) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } } else { // VolumeSnapshotter could have modified the PV name through @@ -1300,7 +1301,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso // Return early because we don't want to restore the PV itself, we // want to dynamically re-provision it. - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists case hasDeleteReclaimPolicy(obj.Object): ctx.log.Infof("Dynamically re-provisioning persistent volume because it doesn't have a snapshot and its reclaim policy is Delete.") @@ -1308,7 +1309,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso // Return early because we don't want to restore the PV itself, we // want to dynamically re-provision it. 
- return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists default: ctx.log.Infof("Restoring persistent volume as-is because it doesn't have a snapshot and its reclaim policy is not Delete.") @@ -1317,7 +1318,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso _, err = remapClaimRefNS(ctx, obj) if err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } obj = resetVolumeBindingInfo(obj) // We call the pvRestorer here to clear out the PV's claimRef.UID, @@ -1325,7 +1326,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso updatedObj, err := ctx.pvRestorer.executePVAction(obj) if err != nil { errs.Add(namespace, fmt.Errorf("error executing PVAction for %s: %v", resourceID, err)) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } obj = updatedObj } @@ -1335,7 +1336,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso // Clear out non-core metadata fields and status. if obj, err = resetMetadataAndStatus(obj); err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } ctx.log.Infof("restore status includes excludes: %+v", ctx.resourceStatusIncludesExcludes) @@ -1360,7 +1361,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso }) if err != nil { errs.Add(namespace, fmt.Errorf("error preparing %s: %v", resourceID, err)) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } // If async plugin started async operation, add it to the ItemOperations list @@ -1389,12 +1390,12 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } if executeOutput.SkipRestore { ctx.log.Infof("Skipping restore of %s: %v because a registered plugin discarded it", obj.GroupVersionKind().Kind, name) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } unstructuredObj, ok := executeOutput.UpdatedItem.(*unstructured.Unstructured) if !ok { errs.Add(namespace, fmt.Errorf("%s: unexpected type %T", resourceID, executeOutput.UpdatedItem)) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } obj = unstructuredObj @@ -1427,7 +1428,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } } - w, e, additionalItemExists, _ := ctx.restoreItem(additionalObj, additionalItem.GroupResource, additionalItemNamespace) + w, e, additionalItemExists := ctx.restoreItem(additionalObj, additionalItem.GroupResource, additionalItemNamespace) if additionalItemExists { filteredAdditionalItems = append(filteredAdditionalItems, additionalItem) } @@ -1456,7 +1457,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso pvc := new(v1.PersistentVolumeClaim) if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pvc); err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } if pvc.Spec.VolumeName != "" { @@ -1475,7 +1476,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso ctx.log.Infof("Updating persistent volume claim %s/%s to reference renamed persistent volume (%s -> %s)", namespace, name, pvc.Spec.VolumeName, newName) if err := unstructured.SetNestedField(obj.Object, newName, "spec", "volumeName"); err != nil 
{ errs.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } } } @@ -1506,7 +1507,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso resourceClient, err = ctx.getResourceClient(newGR, obj, obj.GetNamespace()) if err != nil { errs.AddVeleroError(fmt.Errorf("error getting updated resource client for namespace %q, resource %q: %v", namespace, &groupResource, err)) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } ctx.log.Infof("Attempting to restore %s: %v", obj.GroupVersionKind().Kind, name) @@ -1535,7 +1536,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso isAlreadyExistsError, err := isAlreadyExistsError(ctx, obj, restoreErr, resourceClient) if err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } if restoreErr != nil { @@ -1550,7 +1551,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if err != nil && isAlreadyExistsError { ctx.log.Warnf("Unable to retrieve in-cluster version of %s: %v, object won't be restored by velero or have restore labels, and existing resource policy is not applied", kube.NamespaceAndName(obj), err) warnings.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } } @@ -1564,7 +1565,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if err != nil { ctx.log.Infof("Error trying to reset metadata for %s: %v", kube.NamespaceAndName(obj), err) warnings.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } // We know the object from the cluster won't have the backup/restore name @@ -1580,20 +1581,20 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if err != nil { ctx.log.Infof("error merging secrets for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err) warnings.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } patchBytes, err := generatePatch(fromCluster, desired) if err != nil { ctx.log.Infof("error generating patch for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err) warnings.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } if patchBytes == nil { // In-cluster and desired state are the same, so move on to // the next item. - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } _, err = resourceClient.Patch(name, patchBytes) @@ -1642,7 +1643,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso warnings.Add(namespace, e) } } - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } //update backup/restore labels on the unchanged resources if existingResourcePolicy is set as update @@ -1658,24 +1659,22 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } ctx.log.Infof("Restore of %s, %v skipped: it already exists in the cluster and is the same as the backed up version", obj.GroupVersionKind().Kind, name) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } // Error was something other than an AlreadyExists. 
if restoreErr != nil { ctx.log.Errorf("error restoring %s: %+v", name, restoreErr) errs.Add(namespace, fmt.Errorf("error restoring %s: %v", resourceID, restoreErr)) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } - itemCreated = true - shouldRestoreStatus := ctx.resourceStatusIncludesExcludes != nil && ctx.resourceStatusIncludesExcludes.ShouldInclude(groupResource.String()) if shouldRestoreStatus && statusFieldErr != nil { err := fmt.Errorf("could not get status to be restored %s: %v", kube.NamespaceAndName(obj), statusFieldErr) ctx.log.Errorf(err.Error()) errs.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } ctx.log.Debugf("status field for %s: exists: %v, should restore: %v", groupResource, statusFieldExists, shouldRestoreStatus) // if it should restore status, run a UpdateStatus @@ -1683,7 +1682,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if err := unstructured.SetNestedField(obj.Object, objStatus, "status"); err != nil { ctx.log.Errorf("could not set status field %s: %v", kube.NamespaceAndName(obj), err) errs.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } obj.SetResourceVersion(createdObj.GetResourceVersion()) updated, err := resourceClient.UpdateStatus(obj, metav1.UpdateOptions{}) @@ -1702,14 +1701,14 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if err != nil { ctx.log.Errorf("error generating patch for managed fields %s: %v", kube.NamespaceAndName(obj), err) errs.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } if patchBytes != nil { if _, err = resourceClient.Patch(name, patchBytes); err != nil { ctx.log.Errorf("error patch for managed fields %s: %v", kube.NamespaceAndName(obj), err) if !apierrors.IsNotFound(err) { errs.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } } else { ctx.log.Infof("the managed fields for %s is patched", kube.NamespaceAndName(obj)) @@ -1720,7 +1719,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso pod := new(v1.Pod) if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pod); err != nil { errs.Add(namespace, err) - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } // Do not create podvolumerestore when current restore excludes pv/pvc @@ -1746,7 +1745,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } } - return warnings, errs, itemExists, itemCreated + return warnings, errs, itemExists } func isAlreadyExistsError(ctx *restoreContext, obj *unstructured.Unstructured, err error, client client.Dynamic) (bool, error) {