From dd9e544dd67bc411743451fee26832fb31e87a4e Mon Sep 17 00:00:00 2001 From: Michael Weibel Date: Mon, 23 Jan 2023 15:59:20 +0100 Subject: [PATCH] feat: add support for RWO PVC backups Adjusts e2e tests to use the local-path dynamic provisioner, since we now use a multi-node kind cluster to be able to test RWO PVCs. fixes #319 Signed-off-by: Michael Weibel --- api/v1/common_types.go | 5 + .../k8up/templates/operator-clusterrole.yaml | 17 +- cmd/restic/main.go | 12 +- config/rbac/role.yaml | 8 + config/samples/deployments/mongodb.yaml | 15 +- docs/modules/ROOT/examples/usage/restic.txt | 74 +++--- e2e/definitions/kind/config.yaml | 6 +- e2e/definitions/pv/pv.yaml | 17 -- e2e/definitions/pv/pvc.yaml | 5 +- .../pvc-rwo-subject/controlplane.yaml | 63 +++++ e2e/definitions/pvc-rwo-subject/worker.yaml | 58 +++++ e2e/kind.mk | 5 + e2e/lib/k8up.bash | 153 ++++++++++++- e2e/test-03-backup.bats | 4 +- e2e/test-05-annotated-backup.bats | 4 +- e2e/test-06-pvc-rwo.bats | 60 +++++ go.mod | 2 +- operator/backupcontroller/backup_utils.go | 4 +- operator/backupcontroller/controller.go | 60 ++++- .../controller_integration_test.go | 198 +++++++++++++++- .../controller_utils_integration_test.go | 77 ++++++- operator/backupcontroller/executor.go | 215 ++++++++++++++---- operator/backupcontroller/executor_test.go | 138 +++++++++++ operator/backupcontroller/setup.go | 1 + operator/job/job.go | 70 +++--- restic/cfg/config.go | 4 + restic/kubernetes/pod_list.go | 20 +- 27 files changed, 1123 insertions(+), 172 deletions(-) delete mode 100644 e2e/definitions/pv/pv.yaml create mode 100644 e2e/definitions/pvc-rwo-subject/controlplane.yaml create mode 100644 e2e/definitions/pvc-rwo-subject/worker.yaml create mode 100644 e2e/test-06-pvc-rwo.bats create mode 100644 operator/backupcontroller/executor_test.go diff --git a/api/v1/common_types.go b/api/v1/common_types.go index b8208f0f4..11a9c65d4 100644 --- a/api/v1/common_types.go +++ b/api/v1/common_types.go @@ -74,12 +74,17 @@ const ( // LabelK8upType is the label key that identifies the job type LabelK8upType = "k8up.io/type" + // LabelK8upOwnedBy is a label used to indicate which resource owns this resource, to make it easy to fetch owned resources. + LabelK8upOwnedBy = "k8up.io/owned-by" // Deprecated: LegacyLabelK8upType is the former label key that identified the job type LegacyLabelK8upType = "k8up.syn.tools/type" // LabelManagedBy identifies the tool being used to manage the operation of a resource LabelManagedBy = "app.kubernetes.io/managed-by" // LabelRepositoryHash is the label key that identifies the Restic repository LabelRepositoryHash = "k8up.io/repository-hash" + + // AnnotationK8upHostname is an annotation one can set on RWO PVCs to try to back them up on the specified node. + AnnotationK8upHostname = "k8up.io/hostname" ) // String casts the value to string.
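Illustrative sketch (not part of the patch): based on the findNode logic added further down, the new k8up.io/hostname annotation can be set on an RWO PVC as a fallback hint for which node the backup job should be scheduled on when the PVC is not currently mounted by any pod and its PersistentVolume carries no hostname node affinity. The PVC name, node name, and size below are made-up examples.

    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: my-rwo-data                # hypothetical PVC name
      annotations:
        # hypothetical node name; used as a fallback when no mounting pod or PV node affinity reveals a node
        k8up.io/hostname: worker-1
    spec:
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 1Gi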
diff --git a/charts/k8up/templates/operator-clusterrole.yaml b/charts/k8up/templates/operator-clusterrole.yaml index 86113c92a..6efaba40d 100644 --- a/charts/k8up/templates/operator-clusterrole.yaml +++ b/charts/k8up/templates/operator-clusterrole.yaml @@ -31,15 +31,6 @@ rules: - patch - update - watch - - apiGroups: - - batch - resources: - - jobs/finalizers - - jobs/status - verbs: - - get - - patch - - update - apiGroups: - coordination.k8s.io resources: @@ -64,6 +55,14 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - persistentvolumes + verbs: + - get + - list + - watch - apiGroups: - "" resources: diff --git a/cmd/restic/main.go b/cmd/restic/main.go index 486db2b43..64c5f30e8 100644 --- a/cmd/restic/main.go +++ b/cmd/restic/main.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/go-logr/logr" "github.com/urfave/cli/v2" @@ -81,6 +82,9 @@ var ( &cli.StringFlag{Destination: &cfg.Config.PruneKeepWithinMonthly, Name: "keepWithinMonthly", EnvVars: []string{"KEEP_WITHIN_MONTHLY"}, Usage: "While pruning, keep monthly snapshots within the given duration, e.g. '2y5m7d3h'"}, &cli.StringFlag{Destination: &cfg.Config.PruneKeepWithinYearly, Name: "keepWithinYearly", EnvVars: []string{"KEEP_WITHIN_YEARLY"}, Usage: "While pruning, keep yearly snapshots within the given duration, e.g. '2y5m7d3h'"}, &cli.StringFlag{Destination: &cfg.Config.PruneKeepWithin, Name: "keepWithin", EnvVars: []string{"KEEP_WITHIN"}, Usage: "While pruning, keep tagged snapshots within the given duration, e.g. '2y5m7d3h'"}, + + &cli.StringSliceFlag{Name: "targetPods", EnvVars: []string{"TARGET_PODS"}, Usage: "Filter list of pods by TARGET_PODS names"}, + &cli.DurationFlag{Destination: &cfg.Config.SleepDuration, Name: "sleepDuration", EnvVars: []string{"SLEEP_DURATION"}, Usage: "Sleep for specified amount until init starts"}, }, } ) @@ -90,6 +94,8 @@ func resticMain(c *cli.Context) error { resticLog.Info("initializing") cfg.Config.Tags = c.StringSlice("tag") + cfg.Config.TargetPods = c.StringSlice("targetPods") + err := cfg.Config.Validate() if err != nil { return err @@ -122,6 +128,10 @@ func run(ctx context.Context, resticCLI *resticCli.Restic, mainLogger logr.Logge } func resticInitialization(resticCLI *resticCli.Restic, mainLogger logr.Logger) error { + if cfg.Config.SleepDuration > 0 { + mainLogger.Info("sleeping until init", "duration", cfg.Config.SleepDuration) + time.Sleep(cfg.Config.SleepDuration) + } if err := resticCLI.Init(); err != nil { return fmt.Errorf("failed to initialise the restic repository: %w", err) } @@ -235,7 +245,7 @@ func backupAnnotatedPods(ctx context.Context, resticCLI *resticCli.Restic, mainL return nil } - podLister := kubernetes.NewPodLister(ctx, cfg.Config.BackupCommandAnnotation, cfg.Config.BackupFileExtensionAnnotation, cfg.Config.Hostname, mainLogger) + podLister := kubernetes.NewPodLister(ctx, cfg.Config.BackupCommandAnnotation, cfg.Config.BackupFileExtensionAnnotation, cfg.Config.Hostname, cfg.Config.TargetPods, mainLogger) podList, err := podLister.ListPods() if err != nil { mainLogger.Error(err, "could not list pods", "namespace", cfg.Config.Hostname) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 24ad5dcac..c23bf8820 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -53,6 +53,14 @@ rules: - get - list - watch +- apiGroups: + - "" + resources: + - persistentvolumes + verbs: + - get + - list + - watch - apiGroups: - "" resources: diff --git a/config/samples/deployments/mongodb.yaml 
b/config/samples/deployments/mongodb.yaml index 071a62b1c..2e5b5b7d9 100644 --- a/config/samples/deployments/mongodb.yaml +++ b/config/samples/deployments/mongodb.yaml @@ -1,12 +1,10 @@ apiVersion: apps/v1 kind: Deployment metadata: - annotations: - k8up.io/backupcommand: mongodump --archive -uroot -pexample labels: app: mongodb name: mongodb - namespace: myproject + namespace: default spec: replicas: 1 revisionHistoryLimit: 2 @@ -22,6 +20,8 @@ spec: metadata: labels: app: mongodb + annotations: + k8up.io/backupcommand: mongodump --archive -uroot -pexample spec: containers: - env: @@ -50,11 +50,20 @@ spec: - name: mongodb persistentVolumeClaim: claimName: mongodb + tolerations: + - effect: NoSchedule + key: node-role.kubernetes.io/master + - effect: NoSchedule + key: node-role.kubernetes.io/control-plane + nodeSelector: + kubernetes.io/os: linux + node-role.kubernetes.io/control-plane: "" --- kind: PersistentVolumeClaim apiVersion: v1 metadata: name: mongodb + namespace: default spec: accessModes: - ReadWriteOnce diff --git a/docs/modules/ROOT/examples/usage/restic.txt b/docs/modules/ROOT/examples/usage/restic.txt index 49722bee0..55a99c98e 100644 --- a/docs/modules/ROOT/examples/usage/restic.txt +++ b/docs/modules/ROOT/examples/usage/restic.txt @@ -11,39 +11,41 @@ DESCRIPTION: Start k8up in restic mode OPTIONS: - --check Set, if the container should do a check (default: false) - --prune Set, if the container should do a prune (default: false) - --restore Set, if the container should attempt a restore (default: false) - --archive Set, if the container should do an archive (default: false) - --tag value [ --tag value ] List of tags to consider for given operation - --backupCommandAnnotation value Defines the command to invoke when doing a backup via STDOUT. [$BACKUPCOMMAND_ANNOTATION] - --fileExtensionAnnotation value Defines the file extension to use for STDOUT backups. [$FILEEXTENSION_ANNOTATION] - --promURL value Sets the URL of a prometheus push gateway to report metrics. [$PROM_URL] - --webhookURL value, --statsURL value Sets the URL of a server which will retrieve a webhook after the action completes. [$STATS_URL] - --backupDir value Set from which directory the backup should be performed. (default: "/data") [$BACKUP_DIR] - --restoreDir value Set to which directory the restore should be performed. (default: "/data") [$RESTORE_DIR] - --restoreFilter value Simple filter to define what should get restored. For example the PVC name - --restoreSnap value Snapshot ID, if empty takes the latest snapshot - --restoreType value Type of this restore, 'folder' or 's3' - --restoreS3AccessKey value S3 access key used to connect to the S3 endpoint when restoring [$RESTORE_ACCESSKEYID] - --restoreS3SecretKey value S3 secret key used to connect to the S3 endpoint when restoring [$RESTORE_SECRETACCESSKEY] - --restoreS3Endpoint value S3 endpoint to connect to when restoring, e.g. 'https://minio.svc:9000/backup [$RESTORE_S3ENDPOINT] - --verifyRestore If the restore should get verified, only for PVCs restore (default: false) - --trimRestorePath If set, strips the value of --restoreDir from the lefts side of the remote restore path value (default: enabled) [$TRIM_RESTOREPATH] - --resticBin value The path to the restic binary. 
(default: "/usr/local/bin/restic") [$RESTIC_BINARY] - --resticRepository value The restic repository to perform the action with [$RESTIC_REPOSITORY] - --resticOptions value Additional options to pass to restic in the format 'key=value,key2=value2' [$RESTIC_OPTIONS] - --keepLatest value While pruning, keep at the latest snapshot (default: 0) [$KEEP_LAST, $KEEP_LATEST] - --keepHourly value While pruning, keep hourly snapshots (default: 0) [$KEEP_HOURLY] - --keepDaily value While pruning, keep daily snapshots (default: 0) [$KEEP_DAILY] - --keepWeekly value While pruning, keep weekly snapshots (default: 0) [$KEEP_WEEKLY] - --keepMonthly value While pruning, keep monthly snapshots (default: 0) [$KEEP_MONTHLY] - --keepYearly value While pruning, keep yearly snapshots (default: 0) [$KEEP_YEARLY] - --keepTags While pruning, keep tagged snapshots (default: false) [$KEEP_TAG, $KEEP_TAGS] - --keepWithinHourly value While pruning, keep hourly snapshots within the given duration, e.g. '2y5m7d3h' [$KEEP_WITHIN_HOURLY] - --keepWithinDaily value While pruning, keep daily snapshots within the given duration, e.g. '2y5m7d3h' [$KEEP_WITHIN_DAILY] - --keepWithinWeekly value While pruning, keep weekly snapshots within the given duration, e.g. '2y5m7d3h' [$KEEP_WITHIN_WEEKLY] - --keepWithinMonthly value While pruning, keep monthly snapshots within the given duration, e.g. '2y5m7d3h' [$KEEP_WITHIN_MONTHLY] - --keepWithinYearly value While pruning, keep yearly snapshots within the given duration, e.g. '2y5m7d3h' [$KEEP_WITHIN_YEARLY] - --keepWithin value While pruning, keep tagged snapshots within the given duration, e.g. '2y5m7d3h' [$KEEP_WITHIN] - --help, -h show help (default: false) + --check Set, if the container should do a check (default: false) + --prune Set, if the container should do a prune (default: false) + --restore Set, if the container should attempt a restore (default: false) + --archive Set, if the container should do an archive (default: false) + --tag value [ --tag value ] List of tags to consider for given operation + --backupCommandAnnotation value Defines the command to invoke when doing a backup via STDOUT. [$BACKUPCOMMAND_ANNOTATION] + --fileExtensionAnnotation value Defines the file extension to use for STDOUT backups. [$FILEEXTENSION_ANNOTATION] + --promURL value Sets the URL of a prometheus push gateway to report metrics. [$PROM_URL] + --webhookURL value, --statsURL value Sets the URL of a server which will retrieve a webhook after the action completes. [$STATS_URL] + --backupDir value Set from which directory the backup should be performed. (default: "/data") [$BACKUP_DIR] + --restoreDir value Set to which directory the restore should be performed. (default: "/data") [$RESTORE_DIR] + --restoreFilter value Simple filter to define what should get restored. For example the PVC name + --restoreSnap value Snapshot ID, if empty takes the latest snapshot + --restoreType value Type of this restore, 'folder' or 's3' + --restoreS3AccessKey value S3 access key used to connect to the S3 endpoint when restoring [$RESTORE_ACCESSKEYID] + --restoreS3SecretKey value S3 secret key used to connect to the S3 endpoint when restoring [$RESTORE_SECRETACCESSKEY] + --restoreS3Endpoint value S3 endpoint to connect to when restoring, e.g. 
'https://minio.svc:9000/backup [$RESTORE_S3ENDPOINT] + --verifyRestore If the restore should get verified, only for PVCs restore (default: false) + --trimRestorePath If set, strips the value of --restoreDir from the lefts side of the remote restore path value (default: enabled) [$TRIM_RESTOREPATH] + --resticBin value The path to the restic binary. (default: "/usr/local/bin/restic") [$RESTIC_BINARY] + --resticRepository value The restic repository to perform the action with [$RESTIC_REPOSITORY] + --resticOptions value Additional options to pass to restic in the format 'key=value,key2=value2' [$RESTIC_OPTIONS] + --keepLatest value While pruning, keep at the latest snapshot (default: 0) [$KEEP_LAST, $KEEP_LATEST] + --keepHourly value While pruning, keep hourly snapshots (default: 0) [$KEEP_HOURLY] + --keepDaily value While pruning, keep daily snapshots (default: 0) [$KEEP_DAILY] + --keepWeekly value While pruning, keep weekly snapshots (default: 0) [$KEEP_WEEKLY] + --keepMonthly value While pruning, keep monthly snapshots (default: 0) [$KEEP_MONTHLY] + --keepYearly value While pruning, keep yearly snapshots (default: 0) [$KEEP_YEARLY] + --keepTags While pruning, keep tagged snapshots (default: false) [$KEEP_TAG, $KEEP_TAGS] + --keepWithinHourly value While pruning, keep hourly snapshots within the given duration, e.g. '2y5m7d3h' [$KEEP_WITHIN_HOURLY] + --keepWithinDaily value While pruning, keep daily snapshots within the given duration, e.g. '2y5m7d3h' [$KEEP_WITHIN_DAILY] + --keepWithinWeekly value While pruning, keep weekly snapshots within the given duration, e.g. '2y5m7d3h' [$KEEP_WITHIN_WEEKLY] + --keepWithinMonthly value While pruning, keep monthly snapshots within the given duration, e.g. '2y5m7d3h' [$KEEP_WITHIN_MONTHLY] + --keepWithinYearly value While pruning, keep yearly snapshots within the given duration, e.g. '2y5m7d3h' [$KEEP_WITHIN_YEARLY] + --keepWithin value While pruning, keep tagged snapshots within the given duration, e.g. 
'2y5m7d3h' [$KEEP_WITHIN] + --targetPods value [ --targetPods value ] Filter list of pods by TARGET_PODS names [$TARGET_PODS] + --sleepDuration value Sleep for specified amount until init starts (default: 0s) [$SLEEP_DURATION] + --help, -h show help (default: false) diff --git a/e2e/definitions/kind/config.yaml b/e2e/definitions/kind/config.yaml index ec17072e9..d1d87605e 100644 --- a/e2e/definitions/kind/config.yaml +++ b/e2e/definitions/kind/config.yaml @@ -2,6 +2,6 @@ apiVersion: kind.x-k8s.io/v1alpha4 kind: Cluster nodes: - role: control-plane - extraMounts: - - hostPath: ./e2e/debug/data - containerPath: /tmp/e2e +- role: worker + labels: + worker: true diff --git a/e2e/definitions/pv/pv.yaml b/e2e/definitions/pv/pv.yaml deleted file mode 100644 index 319ebfee5..000000000 --- a/e2e/definitions/pv/pv.yaml +++ /dev/null @@ -1,17 +0,0 @@ -apiVersion: v1 -kind: PersistentVolume -metadata: - name: subject-pv - labels: - pv.kubernetes.io/type: e2e-test -spec: - accessModes: - - ReadWriteMany - capacity: - storage: 1Gi - hostPath: - path: /tmp/e2e/pvc-subject - type: DirectoryOrCreate - # 'Delete' is only supported inside /tmp - persistentVolumeReclaimPolicy: Delete - storageClassName: hostpath diff --git a/e2e/definitions/pv/pvc.yaml b/e2e/definitions/pv/pvc.yaml index 14fab9f73..ee8bfad56 100644 --- a/e2e/definitions/pv/pvc.yaml +++ b/e2e/definitions/pv/pvc.yaml @@ -9,8 +9,5 @@ spec: resources: requests: storage: 1Gi - storageClassName: hostpath + storageClassName: standard volumeMode: Filesystem - selector: - matchLabels: - pv.kubernetes.io/type: e2e-test diff --git a/e2e/definitions/pvc-rwo-subject/controlplane.yaml b/e2e/definitions/pvc-rwo-subject/controlplane.yaml new file mode 100644 index 000000000..32e39f236 --- /dev/null +++ b/e2e/definitions/pvc-rwo-subject/controlplane.yaml @@ -0,0 +1,63 @@ +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: pvc-rwo-subject-pvc-controlplane + namespace: k8up-e2e-subject +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Mi + storageClassName: standard + volumeMode: Filesystem +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pvc-rwo-subject-controlplane + namespace: k8up-e2e-subject +spec: + replicas: 1 + selector: + matchLabels: + app: subject-controlplane + template: + metadata: + labels: + app: subject-controlplane + spec: + containers: + - name: pvc-rwo-subject-container + image: quay.io/prometheus/busybox:latest + imagePullPolicy: IfNotPresent + args: + - sh + - -c + - | + printf "$BACKUP_FILE_CONTENT" | tee "/data/$BACKUP_FILE_NAME" && \ + echo && \ + ls -la /data && \ + echo "test file /data/$BACKUP_FILE_NAME written, sleeping now" && \ + sleep infinity + securityContext: + runAsUser: $ID + volumeMounts: + - name: volume + mountPath: /data + env: + - name: BACKUP_FILE_CONTENT + value: "" + - name: BACKUP_FILE_NAME + value: "" + volumes: + - name: volume + persistentVolumeClaim: + claimName: pvc-rwo-subject-pvc-controlplane + tolerations: + - effect: NoSchedule + key: node-role.kubernetes.io/master + - effect: NoSchedule + key: node-role.kubernetes.io/control-plane + nodeSelector: + node-role.kubernetes.io/control-plane: "" diff --git a/e2e/definitions/pvc-rwo-subject/worker.yaml b/e2e/definitions/pvc-rwo-subject/worker.yaml new file mode 100644 index 000000000..a7aebc2d4 --- /dev/null +++ b/e2e/definitions/pvc-rwo-subject/worker.yaml @@ -0,0 +1,58 @@ +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: pvc-rwo-subject-pvc-worker + namespace: k8up-e2e-subject +spec: + 
accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Mi + storageClassName: standard + volumeMode: Filesystem +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pvc-rwo-subject-worker + namespace: k8up-e2e-subject +spec: + replicas: 1 + selector: + matchLabels: + app: subject-worker + template: + metadata: + labels: + app: subject-worker + spec: + containers: + - name: pvc-rwo-subject-container + image: quay.io/prometheus/busybox:latest + imagePullPolicy: IfNotPresent + args: + - sh + - -c + - | + printf "$BACKUP_FILE_CONTENT" | tee "/data/$BACKUP_FILE_NAME" && \ + echo && \ + ls -la /data && \ + echo "test file /data/$BACKUP_FILE_NAME written, sleeping now" && \ + sleep infinity + securityContext: + runAsUser: $ID + volumeMounts: + - name: volume + mountPath: /data + env: + - name: BACKUP_FILE_CONTENT + value: "" + - name: BACKUP_FILE_NAME + value: "" + volumes: + - name: volume + persistentVolumeClaim: + claimName: pvc-rwo-subject-pvc-worker + nodeSelector: + worker: "true" diff --git a/e2e/kind.mk b/e2e/kind.mk index 3cc7b46f8..cb3f2fb8f 100644 --- a/e2e/kind.mk +++ b/e2e/kind.mk @@ -26,6 +26,11 @@ $(KIND_KUBECONFIG): $(KIND) @kubectl version @kubectl cluster-info @kubectl config use-context kind-$(KIND_CLUSTER) + # Applies local-path-config.yaml to kind cluster and forces restart of provisioner - can be simplified once https://github.com/kubernetes-sigs/kind/pull/3090 is merged. + # This is necessary due to the multi node cluster. Classic k8s hostPath provisioner doesn't permit multi node and sharedFileSystemPath support is only in local-path-provisioner v0.0.23. + @kubectl apply -n local-path-storage -f https://raw.githubusercontent.com/rancher/local-path-provisioner/v0.0.23/deploy/local-path-storage.yaml + @kubectl get cm -n local-path-storage local-path-config -o yaml|yq e '.data."config.json"="{\"nodePathMap\":[],\"sharedFileSystemPath\": \"/tmp/e2e/local-path-provisioner\"}"'|kubectl apply -f - + @kubectl delete po -n local-path-storage --all $(KIND): export GOBIN = $(go_bin) $(KIND): | $(go_bin) diff --git a/e2e/lib/k8up.bash b/e2e/lib/k8up.bash index 3ca90ce7b..45aa6672e 100755 --- a/e2e/lib/k8up.bash +++ b/e2e/lib/k8up.bash @@ -2,6 +2,9 @@ export MINIO_NAMESPACE=${MINIO_NAMESPACE-minio} +directory=$(dirname "${BASH_SOURCE[0]}") +source "$directory/detik.bash" + errcho() { echo >&2 "${@}" } @@ -17,12 +20,12 @@ timestamp() { require_args() { if [ "${#}" != 2 ]; then - errcho "$0 expected 2 arguments, got ${#}." + errcho "$0 expected 2 arguments, got ${#} (${FUNCNAME[1]})." exit 1 fi if [ "${1}" != "${2}" ]; then - errcho "Expected ${1} arguments, got ${2}." + errcho "Expected ${1} arguments, got ${2} (${FUNCNAME[1]})." exit 1 fi } @@ -102,7 +105,7 @@ prepare() { given_a_clean_ns() { kubectl delete namespace "${DETIK_CLIENT_NAMESPACE}" --ignore-not-found - kubectl delete pv subject-pv --ignore-not-found + kubectl delete pvc subject-pvc --ignore-not-found clear_pv_data kubectl create namespace "${DETIK_CLIENT_NAMESPACE}" echo "✅ The namespace '${DETIK_CLIENT_NAMESPACE}' is ready." 
@@ -114,7 +117,7 @@ given_a_subject() { export BACKUP_FILE_NAME=${1} export BACKUP_FILE_CONTENT=${2} - kubectl apply -f definitions/pv + kubectl apply -f definitions/pv/pvc.yaml yq e '.spec.template.spec.containers[0].securityContext.runAsUser='$(id -u)' | .spec.template.spec.containers[0].env[0].value=strenv(BACKUP_FILE_CONTENT) | .spec.template.spec.containers[0].env[1].value=strenv(BACKUP_FILE_NAME)' definitions/subject/deployment.yaml | kubectl apply -f - echo "✅ The subject is ready" @@ -126,12 +129,34 @@ given_an_annotated_subject() { export BACKUP_FILE_NAME=${1} export BACKUP_FILE_CONTENT=${2} - kubectl apply -f definitions/pv + kubectl apply -f definitions/pv/pvc.yaml yq e '.spec.template.spec.containers[0].securityContext.runAsUser='$(id -u)' | .spec.template.spec.containers[0].env[0].value=strenv(BACKUP_FILE_CONTENT) | .spec.template.spec.containers[0].env[1].value=strenv(BACKUP_FILE_NAME)' definitions/annotated-subject/deployment.yaml | kubectl apply -f - echo "✅ The annotated subject is ready" } +given_a_rwo_pvc_subject_in_worker_node() { + require_args 2 ${#} + + export BACKUP_FILE_NAME=${1} + export BACKUP_FILE_CONTENT=${2} + + yq e 'with(select(document_index == 1) .spec.template.spec; .containers[0].securityContext.runAsUser='$(id -u)' | .containers[0].env[0].value=strenv(BACKUP_FILE_CONTENT) | .containers[0].env[1].value=strenv(BACKUP_FILE_NAME))' definitions/pvc-rwo-subject/worker.yaml | kubectl apply -f - + + echo "✅ The pvc rwo worker subject is ready" +} + +given_a_rwo_pvc_subject_in_controlplane_node() { + require_args 2 ${#} + + export BACKUP_FILE_NAME=${1} + export BACKUP_FILE_CONTENT=${2} + + yq e 'with(select(document_index == 1) .spec.template.spec; .containers[0].securityContext.runAsUser='$(id -u)' | .containers[0].env[0].value=strenv(BACKUP_FILE_CONTENT) | .containers[0].env[1].value=strenv(BACKUP_FILE_NAME))' definitions/pvc-rwo-subject/controlplane.yaml | kubectl apply -f - + + echo "✅ The pvc rwo controlplane subject is ready" +} + given_s3_storage() { helm repo add minio https://helm.min.io/ --force-update helm repo update @@ -185,6 +210,122 @@ given_an_existing_backup() { echo "✅ An existing backup is ready" } +verify_object_value_by_label() { + require_args 5 ${#} + + resource=${1} + labelSelector=${2} + property=${3} + expected_value=${4} + should_return=${5} + + result=$(get_resource_value_by_label "$resource" "$labelSelector" "$property") + + code=$(verify_result "$result" "$expected_value") + if [[ "$should_return" == "true" ]]; then + return "$code" + else + echo "$code" + fi +} + +get_resource_value_by_label() { + require_args 3 ${#} + + resource=${1} + labelSelector=${2} + property=${3} + ns=${NAMESPACE=${DETIK_CLIENT_NAMESPACE}} + + query=$(build_k8s_request "$property") + result=$(eval kubectl --namespace "${ns}" get "${resource}" -l "${labelSelector}" "$query" --no-headers) + + # Debug? + detik_debug "-----DETIK:begin-----" + detik_debug "$BATS_TEST_FILENAME" + detik_debug "$BATS_TEST_DESCRIPTION" + detik_debug "" + detik_debug "Client query:" + detik_debug "kubectl --namespace ${ns} get ${resource} -l ${labelSelector} $query --no-headers" + detik_debug "" + detik_debug "Result:" + detik_debug "$result" + detik_debug "-----DETIK:end-----" + detik_debug "" + + # Is the result empty? + if [[ "$result" == "" ]]; then + echo "No resource of type '$resource' was found with the labelSelector '$labelSelector'." 
+ fi + + echo "$result" +} + +# verify values of pods created by jobs +verify_job_pod_values() { + labelSelector=${1} + shift + property=${1} + shift + expected_values="${*}" + + jobs=$(get_resource_value_by_label job "$labelSelector" "") + IFS=$'\n' + invalid=0 + valid=0 + + for want in $expected_values; do + for line in $jobs; do + ret=$(verify_object_value_by_label pod "job-name=${line}" "$property" "$want" false) + if [[ "$ret" == "0" ]]; then + valid=$((valid + 1)) + else + invalid=$((invalid + 1)) + fi + done + done + + if [[ "$valid" == "0" ]]; then + return 102 + fi + + return 0 +} + +# copied from detik.bash by detik to an own function +verify_result() { + require_args 2 ${#} + + result=${1} + expected_value=${2} + + IFS=$'\n' + invalid=0 + valid=0 + for line in $result; do + # Keep the second column (property to verify) + value=$(echo "$line" | awk '{ print $2 }') + element=$(echo "$line" | awk '{ print $1 }') + + # Compare with an exact value (case insensitive) + value=$(to_lower_case "$value") + expected_value=$(to_lower_case "$expected_value") + if [[ "$value" != "$expected_value" ]]; then + detik_debug "Current value for $element is $value..." + invalid=$((invalid + 1)) + else + detik_debug "$element has the right value ($value)." + valid=$((valid + 1)) + fi + done + + if [[ "$valid" == "0" ]]; then + invalid=102 + fi + + echo $invalid +} + wait_until() { require_args 2 ${#} @@ -194,7 +335,7 @@ wait_until() { ns=${NAMESPACE=${DETIK_CLIENT_NAMESPACE}} echo "Waiting for '${object}' in namespace '${ns}' to become '${condition}' ..." - kubectl -n "${ns}" wait --timeout 1m --for "condition=${condition}" "${object}" + kubectl -n "${ns}" wait --timeout 2m --for "condition=${condition}" "${object}" } expect_file_in_container() { diff --git a/e2e/test-03-backup.bats b/e2e/test-03-backup.bats index 813471109..c58191dc0 100644 --- a/e2e/test-03-backup.bats +++ b/e2e/test-03-backup.bats @@ -23,8 +23,8 @@ DEBUG_DETIK="true" kubectl apply -f definitions/secrets yq e '.spec.podSecurityContext.runAsUser='$(id -u)'' definitions/backup/backup.yaml | kubectl apply -f - - try "at most 10 times every 1s to get backup named 'k8up-backup' and verify that '.status.started' is 'true'" - try "at most 10 times every 1s to get job named 'k8up-backup' and verify that '.status.active' is '1'" + try "at most 10 times every 5s to get backup named 'k8up-backup' and verify that '.status.started' is 'true'" + verify_object_value_by_label job 'k8up.io/owned-by=backup_k8up-backup' '.status.active' 1 true wait_until backup/k8up-backup completed diff --git a/e2e/test-05-annotated-backup.bats b/e2e/test-05-annotated-backup.bats index 838f55421..ed5f55f0a 100644 --- a/e2e/test-05-annotated-backup.bats +++ b/e2e/test-05-annotated-backup.bats @@ -23,8 +23,8 @@ DEBUG_DETIK="true" kubectl apply -f definitions/secrets yq e '.spec.podSecurityContext.runAsUser='$(id -u)'' definitions/backup/backup.yaml | kubectl apply -f - - try "at most 10 times every 1s to get backup named 'k8up-backup' and verify that '.status.started' is 'true'" - try "at most 10 times every 1s to get job named 'k8up-backup' and verify that '.status.active' is '1'" + try "at most 10 times every 5s to get backup named 'k8up-backup' and verify that '.status.started' is 'true'" + verify_object_value_by_label job 'k8up.io/owned-by=backup_k8up-backup' '.status.active' 1 true wait_until backup/k8up-backup completed diff --git a/e2e/test-06-pvc-rwo.bats b/e2e/test-06-pvc-rwo.bats new file mode 100644 index 000000000..498abada3 --- /dev/null +++ 
b/e2e/test-06-pvc-rwo.bats @@ -0,0 +1,60 @@ +#!/usr/bin/env bats + +load "lib/utils" +load "lib/detik" +load "lib/k8up" + +# shellcheck disable=SC2034 +DETIK_CLIENT_NAME="kubectl" +# shellcheck disable=SC2034 +DETIK_CLIENT_NAMESPACE="k8up-e2e-subject" +# shellcheck disable=SC2034 +DEBUG_DETIK="true" + +@test "Given two RWO PVCs, When creating a Backup of an app, Then expect Restic repository" { + reset_debug + + expected_content="expected content: $(timestamp)" + expected_filename="expected_filename.txt" + + given_a_running_operator + given_a_clean_ns + given_s3_storage + given_a_rwo_pvc_subject_in_worker_node "${expected_filename}-worker" "${expected_content}-worker" + given_a_rwo_pvc_subject_in_controlplane_node "${expected_filename}-controlplane" "${expected_content}-controlplane" + + kubectl apply -f definitions/secrets + yq e '.spec.podSecurityContext.runAsUser='$(id -u)'' definitions/backup/backup.yaml | kubectl apply -f - + + try "at most 10 times every 5s to get backup named 'k8up-backup' and verify that '.status.started' is 'true'" + + verify_object_value_by_label job 'k8up.io/owned-by=backup_k8up-backup' '.status.active' 1 true + verify_job_pod_values 'k8up.io/owned-by=backup_k8up-backup' .spec.nodeName k8up-v1.24.4-control-plane k8up-v1.24.4-worker + + wait_until backup/k8up-backup completed + + run restic snapshots + + echo "---BEGIN restic snapshots output---" + echo "${output}" + echo "---END---" + + echo -n "Number of Snapshots >= 1? " + jq -e 'length >= 1' <<< "${output}" # Ensure that there was actually a backup created + + run restic dump latest --path /data/pvc-rwo-subject-pvc-worker "/data/pvc-rwo-subject-pvc-worker/${expected_filename}-worker" + + echo "---BEGIN actual ${expected_filename}-worker---" + echo "${output}" + echo "---END---" + + [ "${output}" = "${expected_content}-worker" ] + + run restic dump latest --path /data/pvc-rwo-subject-pvc-controlplane "/data/pvc-rwo-subject-pvc-controlplane/${expected_filename}-controlplane" + + echo "---BEGIN actual ${expected_filename}-controlplane---" + echo "${output}" + echo "---END---" + + [ "${output}" = "${expected_content}-controlplane" ] +} diff --git a/go.mod b/go.mod index 0838dbf3f..67f1bf731 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/stretchr/testify v1.8.1 github.com/urfave/cli/v2 v2.23.7 go.uber.org/zap v1.24.0 + golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c k8s.io/api v0.25.3 k8s.io/apimachinery v0.25.3 k8s.io/client-go v0.25.3 @@ -76,7 +77,6 @@ require ( go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect - golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c // indirect golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2 // indirect golang.org/x/sys v0.0.0-20220818161305-2296e01440c6 // indirect golang.org/x/term v0.0.0-20220722155259-a9ba230a4035 // indirect diff --git a/operator/backupcontroller/backup_utils.go b/operator/backupcontroller/backup_utils.go index 3ed16bc86..27a087797 100644 --- a/operator/backupcontroller/backup_utils.go +++ b/operator/backupcontroller/backup_utils.go @@ -30,9 +30,9 @@ func (b *BackupExecutor) newVolumeMounts(claims []corev1.Volume) []corev1.Volume return mounts } -func containsAccessMode(s []corev1.PersistentVolumeAccessMode, e string) bool { +func containsAccessMode(s []corev1.PersistentVolumeAccessMode, e corev1.PersistentVolumeAccessMode) bool { for _, a := range s { - if string(a) == e { + if a == e { return true 
} } diff --git a/operator/backupcontroller/controller.go b/operator/backupcontroller/controller.go index d9cf7c1f0..a192aa0e7 100644 --- a/operator/backupcontroller/controller.go +++ b/operator/backupcontroller/controller.go @@ -2,14 +2,15 @@ package backupcontroller import ( "context" + "fmt" "time" k8upv1 "github.com/k8up-io/k8up/v2/api/v1" "github.com/k8up-io/k8up/v2/operator/cfg" "github.com/k8up-io/k8up/v2/operator/job" "github.com/k8up-io/k8up/v2/operator/locker" + batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/types" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -38,17 +39,13 @@ func (r *BackupReconciler) Provision(ctx context.Context, obj *k8upv1.Backup) (r config := job.NewConfig(r.Kube, obj, repository) executor := NewBackupExecutor(config) - jobKey := types.NamespacedName{ - Namespace: obj.GetNamespace(), - Name: executor.jobName(), - } - if err := job.ReconcileJobStatus(ctx, jobKey, r.Kube, obj); err != nil { + if err := r.ReconcileJobStatus(ctx, obj); err != nil { return controllerruntime.Result{}, err } if obj.Status.HasStarted() { log.V(1).Info("backup just started, waiting") - return controllerruntime.Result{RequeueAfter: 30 * time.Second}, nil + return controllerruntime.Result{RequeueAfter: 5 * time.Second}, nil } if obj.Status.HasFinished() || isPrebackupFailed(obj) { cleanupCond := meta.FindStatusCondition(obj.Status.Conditions, k8upv1.ConditionScrubbed.String()) @@ -73,6 +70,55 @@ func (r *BackupReconciler) Provision(ctx context.Context, obj *k8upv1.Backup) (r return controllerruntime.Result{RequeueAfter: time.Second * 30}, err } +// ReconcileJobStatus implements a custom job reconciliation since there can be multiple jobs per Backup (this is different +// from the implementation in the job package). 
+func (r *BackupReconciler) ReconcileJobStatus(ctx context.Context, obj *k8upv1.Backup) error { + log := controllerruntime.LoggerFrom(ctx) + ownedBy := obj.GetType().String() + "_" + obj.GetName() + log.V(1).Info("reconciling jobs", "owned-by", ownedBy) + + jobList := batchv1.JobList{} + if err := r.Kube.List(ctx, &jobList, client.MatchingLabels{k8upv1.LabelK8upOwnedBy: ownedBy}, client.InNamespace(obj.Namespace)); err != nil { + return fmt.Errorf("list jobs: %w", err) + } + + numJobs := len(jobList.Items) + if numJobs == 0 { + return nil + } + + numSucceeded, numFailed, numStarted := 0, 0, 0 + for _, item := range jobList.Items { + conditions := item.Status.Conditions + if job.HasSucceeded(conditions) { + numSucceeded += 1 + } + if job.HasFailed(conditions) { + numFailed += 1 + } + if job.HasStarted(conditions) { + numStarted += 1 + } + } + + objStatus := obj.Status + message := fmt.Sprintf("%q has %d succeeded, %d failed, and %d started jobs", ownedBy, numSucceeded, numFailed, numStarted) + if numJobs == numSucceeded { + job.SetSucceeded(ctx, ownedBy, obj.Namespace, obj.GetType(), &objStatus, message) + } else if numFailed > 0 { + job.SetFailed(ctx, ownedBy, obj.Namespace, obj.GetType(), &objStatus, message) + } else if numStarted > 0 { + objStatus.SetStarted(message) + } + obj.SetStatus(objStatus) + + log.V(1).Info("updating status") + if err := r.Kube.Status().Update(ctx, obj); err != nil { + return fmt.Errorf("backup status update failed: %w", err) + } + return nil +} + func (r *BackupReconciler) Deprovision(_ context.Context, _ *k8upv1.Backup) (controllerruntime.Result, error) { return controllerruntime.Result{}, nil } diff --git a/operator/backupcontroller/controller_integration_test.go b/operator/backupcontroller/controller_integration_test.go index b06215439..b03439537 100644 --- a/operator/backupcontroller/controller_integration_test.go +++ b/operator/backupcontroller/controller_integration_test.go @@ -4,6 +4,7 @@ package backupcontroller import ( "context" + "strings" "testing" "time" @@ -15,6 +16,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" ) type BackupTestSuite struct { @@ -40,7 +42,12 @@ func (ts *BackupTestSuite) BeforeTest(_, _ string) { } func (ts *BackupTestSuite) Test_GivenBackup_ExpectBackupJob() { - ts.EnsureResources(ts.BackupResource) + pvc := ts.newPvc("test-pvc", corev1.ReadWriteMany) + ts.EnsureResources(ts.BackupResource, pvc) + + pvc.Status.Phase = corev1.ClaimBound + ts.UpdateStatus(pvc) + result := ts.whenReconciling(ts.BackupResource) ts.Assert().GreaterOrEqual(result.RequeueAfter, 30*time.Second) @@ -49,7 +56,12 @@ func (ts *BackupTestSuite) Test_GivenBackup_ExpectBackupJob() { func (ts *BackupTestSuite) Test_GivenBackup_AndJob_KeepBackupProgressing() { backupJob := ts.newJob(ts.BackupResource) - ts.EnsureResources(ts.BackupResource, backupJob) + pvc := ts.newPvc("test-pvc", corev1.ReadWriteMany) + ts.EnsureResources(ts.BackupResource, backupJob, pvc) + + pvc.Status.Phase = corev1.ClaimBound + ts.UpdateStatus(pvc) + ts.BackupResource.Status.Started = true backupJob.Status.Active = 1 ts.UpdateStatus(ts.BackupResource, backupJob) @@ -66,7 +78,12 @@ func (ts *BackupTestSuite) Test_GivenBackup_AndJob_KeepBackupProgressing() { func (ts *BackupTestSuite) Test_GivenBackup_AndCompletedJob_ThenCompleteBackup() { backupJob := ts.newJob(ts.BackupResource) - ts.EnsureResources(ts.BackupResource, backupJob) + pvc := ts.newPvc("test-pvc", 
corev1.ReadWriteMany) + ts.EnsureResources(ts.BackupResource, backupJob, pvc) + + pvc.Status.Phase = corev1.ClaimBound + ts.UpdateStatus(pvc) + ts.BackupResource.Status.Started = true backupJob.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}} ts.UpdateStatus(ts.BackupResource, backupJob) @@ -83,7 +100,13 @@ func (ts *BackupTestSuite) Test_GivenBackup_AndCompletedJob_ThenCompleteBackup() func (ts *BackupTestSuite) Test_GivenBackup_AndFailedJob_ThenCompleteBackup() { backupJob := ts.newJob(ts.BackupResource) - ts.EnsureResources(ts.BackupResource, backupJob) + + pvc := ts.newPvc("test-pvc", corev1.ReadWriteMany) + ts.EnsureResources(ts.BackupResource, backupJob, pvc) + + pvc.Status.Phase = corev1.ClaimBound + ts.UpdateStatus(pvc) + ts.BackupResource.Status.Started = true backupJob.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobFailed, Status: corev1.ConditionTrue}} ts.UpdateStatus(ts.BackupResource, backupJob) @@ -99,7 +122,12 @@ func (ts *BackupTestSuite) Test_GivenBackup_AndFailedJob_ThenCompleteBackup() { func (ts *BackupTestSuite) Test_GivenBackupWithSecurityContext_ExpectBackupJobWithSecurityContext() { ts.BackupResource = ts.newBackupWithSecurityContext() - ts.EnsureResources(ts.BackupResource) + pvc := ts.newPvc("test-pvc", corev1.ReadWriteMany) + ts.EnsureResources(ts.BackupResource, pvc) + + pvc.Status.Phase = corev1.ClaimBound + ts.UpdateStatus(pvc) + result := ts.whenReconciling(ts.BackupResource) ts.Require().GreaterOrEqual(result.RequeueAfter, 30*time.Second) @@ -214,15 +242,173 @@ func (ts *BackupTestSuite) Test_GivenFailedBackup_WhenReconciling_ThenIgnore() { func (ts *BackupTestSuite) Test_GivenBackupWithTags_WhenCreatingBackupjob_ThenHaveTagArguments() { ts.BackupResource = ts.newBackupWithTags() - ts.EnsureResources(ts.BackupResource) + pvc := ts.newPvc("test-pvc", corev1.ReadWriteMany) + ts.EnsureResources(ts.BackupResource, pvc) + + pvc.Status.Phase = corev1.ClaimBound + ts.UpdateStatus(pvc) + ts.whenReconciling(ts.BackupResource) backupJob := ts.expectABackupJob() ts.assertJobHasTagArguments(backupJob) } +func (ts *BackupTestSuite) Test_GivenBackupAndMountedRWOPVCOnOneNode_ExpectBackupOnOneNode() { + pvc1 := ts.newPvc("test-pvc1", corev1.ReadWriteOnce) + pvc2 := ts.newPvc("test-pvc2", corev1.ReadWriteOnce) + nodeName := "worker" + tolerations := make([]corev1.Toleration, 0) + volumePvc1 := corev1.Volume{ + Name: "test-pvc1", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + } + pod1 := ts.newPod("test-pod1", nodeName, tolerations, []corev1.Volume{volumePvc1}) + volumePvc2 := corev1.Volume{ + Name: "test-pvc2", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + } + pod2 := ts.newPod("test-pod2", nodeName, tolerations, []corev1.Volume{volumePvc2}) + ts.EnsureResources(ts.BackupResource, pvc1, pvc2, pod1, pod2) + + pvc1.Status.Phase = corev1.ClaimBound + pvc2.Status.Phase = corev1.ClaimBound + ts.UpdateStatus(pvc1, pvc2) + + result := ts.whenReconciling(ts.BackupResource) + ts.Assert().GreaterOrEqual(result.RequeueAfter, 30*time.Second) + + job := ts.expectABackupJob() + ts.assertJobSpecs(job, nodeName, []corev1.Volume{volumePvc1, volumePvc2}, tolerations, []string{pod1.Name, pod2.Name}) +} + +func (ts *BackupTestSuite) Test_GivenBackupAndMountedRWOPVCOnTwoNodes_ExpectBackupOnTwoNodes() { + pvc1 := ts.newPvc("test-pvc1", 
corev1.ReadWriteOnce) + pvc2 := ts.newPvc("test-pvc2", corev1.ReadWriteOnce) + nodeNamePod1 := "worker" + nodeNamePod2 := "control-plane" + tolerationsPod1 := make([]corev1.Toleration, 0) + tolerationsPod2 := []corev1.Toleration{ + { + Key: "node-role.kubernetes.io/control-plane", + Effect: corev1.TaintEffectNoSchedule, + }, + { + Key: "node-role.kubernetes.io/master", + Effect: corev1.TaintEffectNoSchedule, + }, + } + volumePvc1 := corev1.Volume{ + Name: "test-pvc1", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + } + pod1 := ts.newPod("test-pod1", nodeNamePod1, tolerationsPod1, []corev1.Volume{volumePvc1}) + volumePvc2 := corev1.Volume{ + Name: "test-pvc2", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + } + pod2 := ts.newPod("test-pod2", nodeNamePod2, tolerationsPod2, []corev1.Volume{volumePvc2}) + ts.EnsureResources(ts.BackupResource, pvc1, pvc2, pod1, pod2) + + pvc1.Status.Phase = corev1.ClaimBound + pvc2.Status.Phase = corev1.ClaimBound + ts.UpdateStatus(pvc1, pvc2) + + result := ts.whenReconciling(ts.BackupResource) + ts.Assert().GreaterOrEqual(result.RequeueAfter, 30*time.Second) + + jobs := new(batchv1.JobList) + err := ts.Client.List(ts.Ctx, jobs, client.InNamespace(ts.NS)) + ts.Require().NoError(err) + ts.Assert().Len(jobs.Items, 2) + + job1 := jobs.Items[0] + job2 := jobs.Items[1] + ts.assertJobSpecs(&job1, nodeNamePod1, []corev1.Volume{volumePvc1}, tolerationsPod1, []string{pod1.Name}) + ts.assertJobSpecs(&job2, nodeNamePod2, []corev1.Volume{volumePvc2}, tolerationsPod2, []string{pod2.Name}) +} + +func (ts *BackupTestSuite) Test_GivenBackupAndUnmountedRWOPVCOnTwoNodes_ExpectBackupOnTwoNodes() { + pvc1 := ts.newPvc("test-pvc1", corev1.ReadWriteOnce) + pvc2 := ts.newPvc("test-pvc2", corev1.ReadWriteOnce) + nodeNamePv1 := "worker" + nodeNamePv2 := "control-plane" + pv1 := ts.newPv(pvc1.Spec.VolumeName, nodeNamePv1, corev1.ReadWriteOnce) + pv2 := ts.newPv(pvc2.Spec.VolumeName, nodeNamePv2, corev1.ReadWriteOnce) + + ts.EnsureResources(ts.BackupResource, pv1, pv2, pvc1, pvc2) + + pv1.Status.Phase = corev1.VolumeBound + pv2.Status.Phase = corev1.VolumeBound + pvc1.Status.Phase = corev1.ClaimBound + pvc2.Status.Phase = corev1.ClaimBound + ts.UpdateStatus(pv1, pv2, pvc1, pvc2) + + volumePvc1 := corev1.Volume{ + Name: "test-pvc1", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + } + volumePvc2 := corev1.Volume{ + Name: "test-pvc2", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + } + + result := ts.whenReconciling(ts.BackupResource) + ts.Assert().GreaterOrEqual(result.RequeueAfter, 30*time.Second) + + jobs := new(batchv1.JobList) + err := ts.Client.List(ts.Ctx, jobs, client.InNamespace(ts.NS)) + ts.Require().NoError(err) + ts.Assert().Len(jobs.Items, 2) + + job1 := jobs.Items[0] + job2 := jobs.Items[1] + ts.assertJobSpecs(&job1, nodeNamePv1, []corev1.Volume{volumePvc1}, nil, []string{}) + ts.assertJobSpecs(&job2, nodeNamePv2, []corev1.Volume{volumePvc2}, nil, []string{}) +} + func (ts *BackupTestSuite) assertCondition(conditions []metav1.Condition, condType k8upv1.ConditionType, reason k8upv1.ConditionReason, status metav1.ConditionStatus) { cond := meta.FindStatusCondition(conditions, condType.String()) 
ts.Require().NotNil(cond, "condition of type %s missing", condType) ts.Assert().Equal(reason.String(), cond.Reason, "condition %s doesn't contain reason %s", condType, reason) ts.Assert().Equal(status, cond.Status, "condition %s isn't %s", condType, status) } + +func (ts *BackupTestSuite) assertJobSpecs(job *batchv1.Job, nodeName string, volumes []corev1.Volume, tolerations []corev1.Toleration, targetPods []string) { + ts.Assert().Equal(nodeName, job.Spec.Template.Spec.NodeSelector[corev1.LabelHostname]) + for i, volume := range volumes { + ts.Assert().Equal(volume.Name, job.Spec.Template.Spec.Volumes[i].Name) + ts.Assert().Equal(volume.VolumeSource.PersistentVolumeClaim.ClaimName, job.Spec.Template.Spec.Volumes[i].VolumeSource.PersistentVolumeClaim.ClaimName) + } + + if len(targetPods) > 0 { + ts.Assert().Contains(job.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: "TARGET_PODS", Value: strings.Join(targetPods, ",")}) + } + + for _, toleration := range tolerations { + ts.Assert().Contains(job.Spec.Template.Spec.Tolerations, toleration) + } +} diff --git a/operator/backupcontroller/controller_utils_integration_test.go b/operator/backupcontroller/controller_utils_integration_test.go index 581905a72..1b08df247 100644 --- a/operator/backupcontroller/controller_utils_integration_test.go +++ b/operator/backupcontroller/controller_utils_integration_test.go @@ -9,6 +9,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" @@ -24,6 +25,77 @@ const ( backupTag = "integrationTag" ) +func (ts *BackupTestSuite) newPvc(name string, accessMode corev1.PersistentVolumeAccessMode) *corev1.PersistentVolumeClaim { + return &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ts.NS, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{accessMode}, + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceStorage: resource.MustParse("1Mi"), + }, + }, + VolumeName: name, + }, + } +} + +func (ts *BackupTestSuite) newPv(name string, nodeName string, accessMode corev1.PersistentVolumeAccessMode) *corev1.PersistentVolume { + return &corev1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: corev1.PersistentVolumeSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{accessMode}, + Capacity: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceStorage: resource.MustParse("1Mi"), + }, + PersistentVolumeSource: corev1.PersistentVolumeSource{ + HostPath: &corev1.HostPathVolumeSource{Path: "/tmp/integration-tests"}, + }, + NodeAffinity: &corev1.VolumeNodeAffinity{ + Required: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: corev1.LabelHostname, + Operator: corev1.NodeSelectorOpIn, + Values: []string{nodeName}, + }, + }, + }, + }, + }, + }, + }, + } +} + +func (ts *BackupTestSuite) newPod(name, nodeName string, tolerations []corev1.Toleration, volumes []corev1.Volume) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ts.NS, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + Tolerations: tolerations, + Volumes: volumes, + Containers: []corev1.Container{ + { + Name: "main", + Command: []string{"/bin/sh"}, 
+ Image: "dummy", + }, + }, + }, + } +} + func (ts *BackupTestSuite) newPreBackupPod() *k8upv1.PreBackupPod { return &k8upv1.PreBackupPod{ Spec: k8upv1.PreBackupPodSpec{ @@ -243,7 +315,10 @@ func (ts *BackupTestSuite) newJob(owner client.Object) *batchv1.Job { jb := &batchv1.Job{} jb.Name = k8upv1.BackupType.String() + "-" + ts.BackupResource.Name jb.Namespace = ts.NS - jb.Labels = labels.Set{k8upv1.LabelK8upType: k8upv1.BackupType.String()} + jb.Labels = labels.Set{ + k8upv1.LabelK8upType: k8upv1.BackupType.String(), + k8upv1.LabelK8upOwnedBy: k8upv1.BackupType.String() + "_" + ts.BackupResource.Name, + } jb.Spec.Template.Spec.Containers = []corev1.Container{{Name: "container", Image: "image"}} jb.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure ts.Assert().NoError(controllerruntime.SetControllerReference(owner, jb, ts.Scheme), "set controller ref") diff --git a/operator/backupcontroller/executor.go b/operator/backupcontroller/executor.go index 1658e40f2..4c4ee1a12 100644 --- a/operator/backupcontroller/executor.go +++ b/operator/backupcontroller/executor.go @@ -2,7 +2,10 @@ package backupcontroller import ( "context" + "fmt" "strconv" + "strings" + "time" k8upv1 "github.com/k8up-io/k8up/v2/api/v1" "github.com/k8up-io/k8up/v2/operator/cfg" @@ -10,8 +13,9 @@ import ( "github.com/k8up-io/k8up/v2/operator/job" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - + "k8s.io/apimachinery/pkg/types" controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" ) // BackupExecutor creates a batch.job object on the cluster. It merges all the @@ -42,54 +46,122 @@ func (b *BackupExecutor) Execute(ctx context.Context) error { return b.startBackup(ctx) } +type backupItem struct { + volume corev1.Volume + node string + tolerations []corev1.Toleration + targetPod string +} + // listAndFilterPVCs lists all PVCs in the given namespace and filters them for K8up specific usage. // Specifically, non-RWX PVCs will be skipped, as well PVCs that have the given annotation. 
-func (b *BackupExecutor) listAndFilterPVCs(ctx context.Context, annotation string) ([]corev1.Volume, error) { +func (b *BackupExecutor) listAndFilterPVCs(ctx context.Context, annotation string) ([]backupItem, error) { log := controllerruntime.LoggerFrom(ctx) - volumes := make([]corev1.Volume, 0) + + pods := &corev1.PodList{} + pvcPodMap := make(map[string]corev1.Pod) + if err := b.Config.Client.List(ctx, pods, client.InNamespace(b.backup.Namespace)); err != nil { + return nil, fmt.Errorf("list pods: %w", err) + } + for _, pod := range pods.Items { + for _, volume := range pod.Spec.Volumes { + if volume.PersistentVolumeClaim != nil { + pvcPodMap[volume.PersistentVolumeClaim.ClaimName] = pod + log.V(1).Info("pvc pod map", "claimName", volume.PersistentVolumeClaim.ClaimName, "pod", pod.GetName()) + } + } + } + + backupItems := make([]backupItem, 0) claimlist := &corev1.PersistentVolumeClaimList{} log.Info("Listing all PVCs", "annotation", annotation) if err := b.fetchPVCs(ctx, claimlist); err != nil { - return volumes, err + return backupItems, err } - for _, item := range claimlist.Items { - annotations := item.GetAnnotations() + for _, pvc := range claimlist.Items { + if pvc.Status.Phase != corev1.ClaimBound { + log.Info("PVC is not bound", "pvc", pvc.GetName()) + continue + } - tmpAnnotation, ok := annotations[annotation] + backupAnnotation, hasBackupAnnotation := pvc.GetAnnotations()[annotation] - if !containsAccessMode(item.Spec.AccessModes, "ReadWriteMany") && !ok { - log.Info("PVC isn't RWX", "pvc", item.GetName()) + isRWO := containsAccessMode(pvc.Spec.AccessModes, corev1.ReadWriteOnce) + if !containsAccessMode(pvc.Spec.AccessModes, corev1.ReadWriteMany) && !isRWO && !hasBackupAnnotation { + log.Info("PVC is neither RWX nor RWO and has no backup annotation", "pvc", pvc.GetName()) continue } - if !ok { - log.Info("PVC doesn't have annotation, adding to list", "pvc", item.GetName()) - } else if anno, _ := strconv.ParseBool(tmpAnnotation); !anno { - log.Info("PVC skipped due to annotation", "pvc", item.GetName(), "annotation", tmpAnnotation) + if !hasBackupAnnotation { + log.Info("PVC doesn't have annotation, adding to list", "pvc", pvc.GetName()) + } else if shouldBackup, _ := strconv.ParseBool(backupAnnotation); !shouldBackup { + log.Info("PVC skipped due to annotation", "pvc", pvc.GetName(), "annotation", backupAnnotation) continue } else { - log.Info("Adding to list", "pvc", item.Name) + log.Info("Adding to list", "pvc", pvc.Name) } - tmpVol := corev1.Volume{ - Name: item.Name, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: item.Name, + bi := backupItem{ + volume: corev1.Volume{ + Name: pvc.Name, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + }, }, }, } - volumes = append(volumes, tmpVol) + if pod, ok := pvcPodMap[pvc.GetName()]; ok { + bi.node = pod.Spec.NodeName + bi.tolerations = pod.Spec.Tolerations + bi.targetPod = pod.GetName() + + log.V(1).Info("PVC mounted at pod", "pvc", pvc.GetName(), "targetPod", bi.targetPod, "node", bi.node, "tolerations", bi.tolerations) + } else if isRWO { + pv := &corev1.PersistentVolume{} + if err := b.Config.Client.Get(ctx, types.NamespacedName{Name: pvc.Spec.VolumeName}, pv); err != nil { + log.Error(err, "unable to get PV, skipping pvc", "pvc", pvc.GetName(), "pv", pvc.Spec.VolumeName) + continue + } + + bi.node = findNode(pv, pvc) + if bi.node == "" { + log.Info("RWO PVC not bound and no PV 
node affinity set, skipping", "pvc", pvc.GetName(), "affinity", pv.Spec.NodeAffinity) + continue + } + log.V(1).Info("node found in PV or PVC", "pvc", pvc.GetName(), "node", bi.node) + } else { + log.Info("RWX PVC with no specific node", "pvc", pvc.GetName()) + } + + backupItems = append(backupItems, bi) } - return volumes, nil + return backupItems, nil } -func (b *BackupExecutor) startBackup(ctx context.Context) error { +// findNode tries to find a PVs NodeAffinity for a specific hostname. If found will return that. +// If not it will try to return the value of the k8up.io/hostname annotation on the PVC. If this is not set, will return +// empty string. +func findNode(pv *corev1.PersistentVolume, pvc corev1.PersistentVolumeClaim) string { + hostnameAnnotation := pvc.Annotations[k8upv1.AnnotationK8upHostname] + if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil { + return hostnameAnnotation + } + for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms { + for _, matchExpr := range term.MatchExpressions { + if matchExpr.Key == corev1.LabelHostname && matchExpr.Operator == corev1.NodeSelectorOpIn { + return matchExpr.Values[0] + } + } + } + return hostnameAnnotation +} +func (b *BackupExecutor) startBackup(ctx context.Context) error { ready, err := b.StartPreBackup(ctx) if err != nil { return err @@ -98,42 +170,97 @@ func (b *BackupExecutor) startBackup(ctx context.Context) error { return nil } - volumes, err := b.listAndFilterPVCs(ctx, cfg.Config.BackupAnnotation) + backupItems, err := b.listAndFilterPVCs(ctx, cfg.Config.BackupAnnotation) if err != nil { b.Generic.SetConditionFalseWithMessage(ctx, k8upv1.ConditionReady, k8upv1.ReasonRetrievalFailed, err.Error()) return err } - batchJob := &batchv1.Job{} - batchJob.Name = b.jobName() - batchJob.Namespace = b.backup.Namespace + type jobItem struct { + job *batchv1.Job + targetPods []string + volumes []corev1.Volume + } + backupJobs := map[string]jobItem{} + for index, item := range backupItems { + if _, ok := backupJobs[item.node]; !ok { + backupJobs[item.node] = jobItem{ + job: b.createJob(strconv.Itoa(index), item.node, item.tolerations), + targetPods: make([]string, 0), + volumes: make([]corev1.Volume, 0), + } + } - _, err = controllerruntime.CreateOrUpdate(ctx, b.Generic.Config.Client, batchJob, func() error { - mutateErr := job.MutateBatchJob(batchJob, b.backup, b.Generic.Config) - if mutateErr != nil { - return mutateErr + j := backupJobs[item.node] + if item.targetPod != "" { + j.targetPods = append(j.targetPods, item.targetPod) } + j.volumes = append(j.volumes, item.volume) + backupJobs[item.node] = j + } + + index := 0 + for _, batchJob := range backupJobs { + _, err = controllerruntime.CreateOrUpdate(ctx, b.Generic.Config.Client, batchJob.job, func() error { + mutateErr := job.MutateBatchJob(batchJob.job, b.backup, b.Generic.Config) + if mutateErr != nil { + return mutateErr + } - vars, setupErr := b.setupEnvVars() - if setupErr != nil { - return setupErr + vars, setupErr := b.setupEnvVars() + if setupErr != nil { + return setupErr + } + batchJob.job.Spec.Template.Spec.Containers[0].Env = vars + if len(batchJob.targetPods) > 0 { + batchJob.job.Spec.Template.Spec.Containers[0].Env = append(batchJob.job.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{ + Name: "TARGET_PODS", + Value: strings.Join(batchJob.targetPods, ","), + }) + } + // each job sleeps for index seconds to avoid concurrent restic repository creation. 
Not the prettiest way but it works and a repository + // is created only once usually. + if index > 0 { + batchJob.job.Spec.Template.Spec.Containers[0].Env = append(batchJob.job.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{ + Name: "SLEEP_DURATION", + Value: (5 * time.Second).String(), + }) + } + b.backup.Spec.AppendEnvFromToContainer(&batchJob.job.Spec.Template.Spec.Containers[0]) + batchJob.job.Spec.Template.Spec.ServiceAccountName = cfg.Config.ServiceAccount + batchJob.job.Spec.Template.Spec.Containers[0].Args = executor.BuildTagArgs(b.backup.Spec.Tags) + batchJob.job.Spec.Template.Spec.Volumes = batchJob.volumes + batchJob.job.Spec.Template.Spec.Containers[0].VolumeMounts = b.newVolumeMounts(batchJob.job.Spec.Template.Spec.Volumes) + + index++ + return nil + }) + if err != nil { + return fmt.Errorf("unable to createOrUpdate(%q): %w", batchJob.job.Name, err) } - batchJob.Spec.Template.Spec.Containers[0].Env = vars - b.backup.Spec.AppendEnvFromToContainer(&batchJob.Spec.Template.Spec.Containers[0]) - batchJob.Spec.Template.Spec.Volumes = volumes - batchJob.Spec.Template.Spec.ServiceAccountName = cfg.Config.ServiceAccount - batchJob.Spec.Template.Spec.Containers[0].VolumeMounts = b.newVolumeMounts(volumes) - batchJob.Spec.Template.Spec.Containers[0].Args = executor.BuildTagArgs(b.backup.Spec.Tags) - return nil - }) + } + + return nil +} - return err +func (b *BackupExecutor) createJob(name, node string, tolerations []corev1.Toleration) *batchv1.Job { + batchJob := &batchv1.Job{} + batchJob.Name = b.jobName(name) + batchJob.Namespace = b.backup.Namespace + batchJob.Spec.Template.Spec.Volumes = make([]corev1.Volume, 0) + if node != "" { + batchJob.Spec.Template.Spec.NodeSelector = map[string]string{ + corev1.LabelHostname: node, + } + } + batchJob.Spec.Template.Spec.Tolerations = tolerations + return batchJob } func (b *BackupExecutor) cleanupOldBackups(ctx context.Context) { b.Generic.CleanupOldResources(ctx, &k8upv1.BackupList{}, b.backup.Namespace, b.backup) } -func (b *BackupExecutor) jobName() string { - return k8upv1.BackupType.String() + "-" + b.backup.Name +func (b *BackupExecutor) jobName(name string) string { + return k8upv1.BackupType.String() + "-" + b.backup.Name + "-" + name } diff --git a/operator/backupcontroller/executor_test.go b/operator/backupcontroller/executor_test.go new file mode 100644 index 000000000..ff5649d62 --- /dev/null +++ b/operator/backupcontroller/executor_test.go @@ -0,0 +1,138 @@ +package backupcontroller + +import ( + "testing" + + k8upv1 "github.com/k8up-io/k8up/v2/api/v1" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func Test_findNode(t *testing.T) { + type args struct { + pv *corev1.PersistentVolume + pvc corev1.PersistentVolumeClaim + } + tests := []struct { + name string + args args + want string + }{ + { + name: "returns node when PV affinity is set", + args: args{ + pv: &corev1.PersistentVolume{ + Spec: corev1.PersistentVolumeSpec{ + NodeAffinity: &corev1.VolumeNodeAffinity{ + Required: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: corev1.LabelHostname, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"node-a"}, + }, + }, + }, + }, + }, + }, + }, + }, + pvc: corev1.PersistentVolumeClaim{}, + }, + want: "node-a", + }, + { + name: "returns empty string when wrong PV affinity operator is set", + args: args{ + pv: &corev1.PersistentVolume{ + Spec: 
corev1.PersistentVolumeSpec{ + NodeAffinity: &corev1.VolumeNodeAffinity{ + Required: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: corev1.LabelHostname, + Operator: corev1.NodeSelectorOpExists, + Values: []string{"node-a"}, + }, + }, + }, + }, + }, + }, + }, + }, + pvc: corev1.PersistentVolumeClaim{}, + }, + want: "", + }, + { + name: "returns empty string when wrong PV affinity key is set", + args: args{ + pv: &corev1.PersistentVolume{ + Spec: corev1.PersistentVolumeSpec{ + NodeAffinity: &corev1.VolumeNodeAffinity{ + Required: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "hostname", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"node-a"}, + }, + }, + }, + }, + }, + }, + }, + }, + pvc: corev1.PersistentVolumeClaim{}, + }, + want: "", + }, + { + name: "returns node when no affinity is set but annotation on PVC", + args: args{ + pv: &corev1.PersistentVolume{}, + pvc: corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{k8upv1.AnnotationK8upHostname: "node-a"}, + }, + }, + }, + want: "node-a", + }, + { + name: "returns empty string when no affinity is set and invalid annotation on PVC", + args: args{ + pv: &corev1.PersistentVolume{}, + pvc: corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{"hostname": "node-a"}, + }, + }, + }, + want: "", + }, + { + name: "returns empty string if neither affinity nor annotation is set", + args: args{ + pv: &corev1.PersistentVolume{}, + pvc: corev1.PersistentVolumeClaim{}, + }, + want: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, findNode(tt.args.pv, tt.args.pvc), "findNode(%v, %v)", tt.args.pv, tt.args.pvc) + }) + } +} diff --git a/operator/backupcontroller/setup.go b/operator/backupcontroller/setup.go index 36e09a120..4ea4c523f 100644 --- a/operator/backupcontroller/setup.go +++ b/operator/backupcontroller/setup.go @@ -18,6 +18,7 @@ import ( // +kubebuilder:rbac:groups=core,resources=pods/exec,verbs="*" // +kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch;create;delete // +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch +// +kubebuilder:rbac:groups=core,resources=persistentvolumes,verbs=get;list;watch // +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=roles;rolebindings,verbs=get;list;watch;create;delete // SetupWithManager configures the reconciler. 
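The Test_findNode cases above pin down the lookup order for RWO claims: a kubernetes.io/hostname In term in the PV's required node affinity wins, and the k8up.io/hostname annotation on the PVC is only consulted as a fallback. A rough sketch of that fallback path follows; the names are illustrative and not part of this patch:

    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: data-subject-pvc        # illustrative name
      annotations:
        # consulted by findNode() only when no pod currently mounts the claim
        # and the PV carries no required node affinity
        k8up.io/hostname: node-a
    spec:
      accessModes:
        - ReadWriteOnce             # RWO claims are now picked up without an explicit backup annotation
      resources:
        requests:
          storage: 1Gi
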
diff --git a/operator/job/job.go b/operator/job/job.go index f978c7957..9fc151de2 100644 --- a/operator/job/job.go +++ b/operator/job/job.go @@ -49,6 +49,7 @@ func MutateBatchJob(batchJob *batchv1.Job, jobObj k8upv1.JobObject, config Confi batchJob.Labels = labels.Merge(batchJob.Labels, labels.Set{ K8uplabel: "true", k8upv1.LabelK8upType: jobObj.GetType().String(), + k8upv1.LabelK8upOwnedBy: jobObj.GetType().String() + "_" + jobObj.GetName(), k8upv1.LabelRepositoryHash: Sha256Hash(config.Repository), }) @@ -86,9 +87,7 @@ func ReconcileJobStatus(ctx context.Context, key types.NamespacedName, client cl return nil } - if err := UpdateStatus(ctx, batchJob, obj); err != nil { - return fmt.Errorf("unable to update status in object: %w", err) - } + UpdateStatus(ctx, batchJob, obj) log.V(1).Info("updating status") if err := client.Status().Update(ctx, obj); err != nil { @@ -98,40 +97,59 @@ func ReconcileJobStatus(ctx context.Context, key types.NamespacedName, client cl } // UpdateStatus retrieves status of batchJob and sets status of obj accordingly. -func UpdateStatus(ctx context.Context, batchJob *batchv1.Job, obj k8upv1.JobObject) error { - log := controllerruntime.LoggerFrom(ctx) - +func UpdateStatus(ctx context.Context, batchJob *batchv1.Job, obj k8upv1.JobObject) { // update status conditions based on Job status objStatus := obj.GetStatus() message := fmt.Sprintf("job '%s' has %d active, %d succeeded and %d failed pods", batchJob.Name, batchJob.Status.Active, batchJob.Status.Succeeded, batchJob.Status.Failed) - successCond := FindStatusCondition(batchJob.Status.Conditions, batchv1.JobComplete) - if successCond != nil && successCond.Status == corev1.ConditionTrue { - if !objStatus.HasSucceeded() { - // only increase success counter if new condition - monitoring.IncSuccessCounters(batchJob.Namespace, obj.GetType()) - log.Info("Job succeeded") - } - objStatus.SetSucceeded(message) - objStatus.SetFinished(fmt.Sprintf("job '%s' completed successfully", batchJob.Name)) + if HasSucceeded(batchJob.Status.Conditions) { + SetSucceeded(ctx, batchJob.Name, batchJob.Namespace, obj.GetType(), &objStatus, message) } - failedCond := FindStatusCondition(batchJob.Status.Conditions, batchv1.JobFailed) - if failedCond != nil && failedCond.Status == corev1.ConditionTrue { - if !objStatus.HasFailed() { - // only increase fail counter if new condition - monitoring.IncFailureCounters(batchJob.Namespace, obj.GetType()) - log.Info("Job failed") - } - objStatus.SetFailed(message) - objStatus.SetFinished(fmt.Sprintf("job '%s' has failed", batchJob.Name)) + if HasFailed(batchJob.Status.Conditions) { + SetFailed(ctx, batchJob.Name, batchJob.Namespace, obj.GetType(), &objStatus, message) } - if successCond == nil && failedCond == nil { + if HasStarted(batchJob.Status.Conditions) { objStatus.SetStarted(message) } obj.SetStatus(objStatus) +} - return nil +func HasSucceeded(conditions []batchv1.JobCondition) bool { + successCond := FindStatusCondition(conditions, batchv1.JobComplete) + return successCond != nil && successCond.Status == corev1.ConditionTrue +} +func HasFailed(conditions []batchv1.JobCondition) bool { + failedCond := FindStatusCondition(conditions, batchv1.JobFailed) + return failedCond != nil && failedCond.Status == corev1.ConditionTrue +} +func HasStarted(conditions []batchv1.JobCondition) bool { + successCond := FindStatusCondition(conditions, batchv1.JobComplete) + failedCond := FindStatusCondition(conditions, batchv1.JobFailed) + return successCond == nil && failedCond == nil +} + +func SetSucceeded(ctx 
context.Context, name, ns string, typ k8upv1.JobType, objStatus *k8upv1.Status, message string) { + log := controllerruntime.LoggerFrom(ctx) + + if !objStatus.HasSucceeded() { + // only increase success counter if new condition + monitoring.IncSuccessCounters(ns, typ) + log.Info("Job succeeded") + } + objStatus.SetSucceeded(message) + objStatus.SetFinished(fmt.Sprintf("job '%s' completed successfully", name)) +} +func SetFailed(ctx context.Context, name, ns string, typ k8upv1.JobType, objStatus *k8upv1.Status, message string) { + log := controllerruntime.LoggerFrom(ctx) + + if !objStatus.HasFailed() { + // only increase fail counter if new condition + monitoring.IncFailureCounters(ns, typ) + log.Info("Job failed") + } + objStatus.SetFailed(message) + objStatus.SetFinished(fmt.Sprintf("job '%s' has failed", name)) } // FindStatusCondition finds the condition with the given type in the batchv1.JobCondition slice. diff --git a/restic/cfg/config.go b/restic/cfg/config.go index 72d6ebb72..a0a3361bc 100644 --- a/restic/cfg/config.go +++ b/restic/cfg/config.go @@ -68,6 +68,10 @@ type Configuration struct { PruneKeepWithinYearly string Tags []string + + TargetPods []string + + SleepDuration time.Duration } // Validate ensures a consistent configuration and returns an error should that not be the case diff --git a/restic/kubernetes/pod_list.go b/restic/kubernetes/pod_list.go index 07b585dd3..bc6ff2e96 100644 --- a/restic/kubernetes/pod_list.go +++ b/restic/kubernetes/pod_list.go @@ -18,6 +18,7 @@ type PodLister struct { k8scli *kube.Clientset err error namespace string + targetPods map[string]struct{} log logr.Logger ctx context.Context } @@ -32,23 +33,30 @@ type BackupPod struct { } // NewPodLister returns a PodLister configured to find the defined annotations. -func NewPodLister(ctx context.Context, backupCommandAnnotation, fileExtensionAnnotation, namespace string, log logr.Logger) *PodLister { +func NewPodLister(ctx context.Context, backupCommandAnnotation, fileExtensionAnnotation, namespace string, targetPods []string, log logr.Logger) *PodLister { k8cli, err := newk8sClient() if err != nil { err = fmt.Errorf("can't create podLister: %v", err) } + + tp := make(map[string]struct{}) + for _, name := range targetPods { + tp[name] = struct{}{} + } + return &PodLister{ backupCommandAnnotation: backupCommandAnnotation, fileExtensionAnnotation: fileExtensionAnnotation, k8scli: k8cli, err: err, namespace: namespace, + targetPods: tp, log: log.WithName("k8sClient"), ctx: ctx, } } -// ListPods resturns a list of pods which have backup commands in their annotations. +// ListPods finds a list of pods which have backup commands in their annotations. func (p *PodLister) ListPods() ([]BackupPod, error) { p.log.Info("listing all pods", "annotation", p.backupCommandAnnotation, "namespace", p.namespace) @@ -67,6 +75,14 @@ func (p *PodLister) ListPods() ([]BackupPod, error) { if pod.Status.Phase != corev1.PodRunning { continue } + + // if TARGET_PODS is set, skip pods which should not be targetd. + _, ok := p.targetPods[pod.GetName()] + if len(p.targetPods) > 0 && !ok { + p.log.V(1).Info("pod not in target pod list, skipping", "pod", pod.GetName()) + continue + } + annotations := pod.GetAnnotations() if command, ok := annotations[p.backupCommandAnnotation]; ok {
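
Putting the executor and restic changes together: every node that hosts at least one matching claim gets its own backup job, and the restic container in that job only considers the pods named in TARGET_PODS when running annotated-pod command backups. A rough sketch of the relevant parts of such a generated job follows; the names are illustrative, the container spec itself comes from job.MutateBatchJob, and the job name assumes BackupType renders as "backup":

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: backup-example-backup-1        # jobName(): <type>-<backup name>-<index>
    spec:
      template:
        spec:
          nodeSelector:
            kubernetes.io/hostname: node-a # createJob() pins the job to the resolved node
          # tolerations are copied verbatim from the pod mounting the claim, when there is one
          containers:
            - name: backup                 # container details come from MutateBatchJob; only env shown here
              env:
                - name: TARGET_PODS        # PodLister skips annotated pods not in this list
                  value: mongodb-0
                - name: SLEEP_DURATION     # jobs after the first wait 5s so a single job initialises the repository
                  value: 5s
          volumes:
            - name: data-mongodb-0
              persistentVolumeClaim:
                claimName: data-mongodb-0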