From 4e7f15d4eec65f69d7ce5e9df8aca929b6f0447c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ege=20G=C3=BCne=C5=9F?= Date: Thu, 21 Mar 2024 17:16:39 +0300 Subject: [PATCH] K8SPSMDB-1051: Bump PBM version to 2.4.0 (#1485) * K8SPSMDB-1051: Bump PBM version to 2.4.0 * fix pitr-physical * improve pitr-physical * Update e2e-tests/pitr-physical/run Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * Update e2e-tests/pitr-physical/run Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * Update e2e-tests/pitr-physical/run Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * Update e2e-tests/functions Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix date command on macos * bump PBM to fix pitr-physical * bump k8s version to 1.26 --------- Co-authored-by: Viacheslav Sarzhan Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- Jenkinsfile | 2 +- e2e-tests/functions | 274 +++++++++--------- e2e-tests/pitr-physical/run | 68 ++++- go.mod | 8 +- go.sum | 27 +- .../v1/perconaservermongodbbackup_types.go | 11 +- pkg/apis/psmdb/v1/psmdb_types.go | 6 +- pkg/controller/perconaservermongodb/backup.go | 32 +- .../perconaservermongodb/version.go | 4 +- .../perconaservermongodb/version_test.go | 4 +- .../perconaservermongodbbackup/backup.go | 29 +- .../perconaservermongodbbackup_controller.go | 39 +-- .../perconaservermongodbrestore/logical.go | 52 ++-- .../perconaservermongodbrestore_controller.go | 6 +- .../perconaservermongodbrestore/physical.go | 20 +- pkg/psmdb/backup/backup.go | 2 +- pkg/psmdb/backup/fake/pbm.go | 40 +-- pkg/psmdb/backup/job.go | 4 +- pkg/psmdb/backup/pbm.go | 196 ++++++++----- 19 files changed, 473 insertions(+), 351 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index c59088ba7e..d8b4d122e8 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -12,7 +12,7 @@ void createCluster(String CLUSTER_SUFFIX) { gcloud auth activate-service-account --key-file $CLIENT_SECRET_FILE gcloud config set project $GCP_PROJECT gcloud container clusters list --filter $CLUSTER_NAME-${CLUSTER_SUFFIX} --zone $region --format='csv[no-heading](name)' | xargs gcloud container clusters delete --zone $region --quiet || true - gcloud container clusters create --zone $region $CLUSTER_NAME-${CLUSTER_SUFFIX} --cluster-version=1.25 --machine-type=n1-standard-4 --preemptible --num-nodes=3 --network=jenkins-vpc --subnetwork=jenkins-${CLUSTER_SUFFIX} --no-enable-autoupgrade --cluster-ipv4-cidr=/21 --labels delete-cluster-after-hours=6 --enable-ip-alias --workload-pool=cloud-dev-112233.svc.id.goog && \ + gcloud container clusters create --zone $region $CLUSTER_NAME-${CLUSTER_SUFFIX} --cluster-version=1.26 --machine-type=n1-standard-4 --preemptible --num-nodes=3 --network=jenkins-vpc --subnetwork=jenkins-${CLUSTER_SUFFIX} --no-enable-autoupgrade --cluster-ipv4-cidr=/21 --labels delete-cluster-after-hours=6 --enable-ip-alias --workload-pool=cloud-dev-112233.svc.id.goog && \ kubectl create clusterrolebinding cluster-admin-binding --clusterrole cluster-admin --user jenkins@"$GCP_PROJECT".iam.gserviceaccount.com || ret_val=\$? if [ \${ret_val} -eq 0 ]; then break; fi ret_num=\$((ret_num + 1)) diff --git a/e2e-tests/functions b/e2e-tests/functions index c33317777e..7d2179c436 100755 --- a/e2e-tests/functions +++ b/e2e-tests/functions @@ -10,7 +10,7 @@ IMAGE_MONGOD_CHAIN=${IMAGE_MONGOD_CHAIN:-$' perconalab/percona-server-mongodb-operator:main-mongod4.4 perconalab/percona-server-mongodb-operator:main-mongod5.0 perconalab/percona-server-mongodb-operator:main-mongod6.0'} -IMAGE_BACKUP=${IMAGE_BACKUP:-"perconalab/percona-server-mongodb-operator:main-backup"} +IMAGE_BACKUP=${IMAGE_BACKUP:-"perconalab/percona-server-mongodb-operator:pbm-1265"} SKIP_BACKUPS_TO_AWS_GCP_AZURE=${SKIP_BACKUPS_TO_AWS_GCP_AZURE:-1} PMM_SERVER_VER=${PMM_SERVER_VER:-"9.9.9"} IMAGE_PMM_CLIENT=${IMAGE_PMM_CLIENT:-"perconalab/pmm-client:dev-latest"} @@ -102,10 +102,10 @@ create_namespace() { if [[ ${CLEAN_NAMESPACE} == 1 ]] && [[ -z ${skip_clean_namespace} ]]; then destroy_chaos_mesh desc 'cleaned up all old namespaces' - kubectl_bin get ns \ - | egrep -v "^kube-|^default|Terminating|psmdb-operator|openshift|gke-mcs|^NAME" \ - | awk '{print$1}' \ - | xargs kubectl delete ns & + kubectl_bin get ns | + egrep -v "^kube-|^default|Terminating|psmdb-operator|openshift|gke-mcs|^NAME" | + awk '{print$1}' | + xargs kubectl delete ns & fi if [ -n "${OPENSHIFT}" ]; then @@ -151,12 +151,12 @@ wait_pod() { if [ $retry -ge 360 ]; then kubectl_bin describe pod/$pod kubectl_bin logs $pod - kubectl_bin logs ${OPERATOR_NS:+-n $OPERATOR_NS} $(get_operator_pod) \ - | grep -v 'level=info' \ - | grep -v 'level=debug' \ - | grep -v 'Getting tasks for pod' \ - | grep -v 'Getting pods from source' \ - | tail -100 + kubectl_bin logs ${OPERATOR_NS:+-n $OPERATOR_NS} $(get_operator_pod) | + grep -v 'level=info' | + grep -v 'level=debug' | + grep -v 'Getting tasks for pod' | + grep -v 'Getting pods from source' | + tail -100 echo max retry count $retry reached. something went wrong with operator or kubernetes cluster exit 1 fi @@ -176,12 +176,12 @@ wait_cron() { echo -n . let retry+=1 if [ $retry -ge 360 ]; then - kubectl_bin logs ${OPERATOR_NS:+-n $OPERATOR_NS} $(get_operator_pod) \ - | grep -v 'level=info' \ - | grep -v 'level=debug' \ - | grep -v 'Getting tasks for pod' \ - | grep -v 'Getting pods from source' \ - | tail -100 + kubectl_bin logs ${OPERATOR_NS:+-n $OPERATOR_NS} $(get_operator_pod) | + grep -v 'level=info' | + grep -v 'level=debug' | + grep -v 'Getting tasks for pod' | + grep -v 'Getting pods from source' | + tail -100 echo max retry count $retry reached. something went wrong with operator or kubernetes cluster exit 1 fi @@ -200,8 +200,8 @@ wait_backup_agent() { echo -n . let retry+=1 if [ $retry -ge 360 ]; then - kubectl_bin logs $agent_pod -c backup-agent \ - | tail -100 + kubectl_bin logs $agent_pod -c backup-agent | + tail -100 echo max retry count $retry reached. something went wrong with operator or kubernetes cluster exit 1 fi @@ -223,12 +223,12 @@ wait_backup() { let retry+=1 current_status=$(kubectl_bin get psmdb-backup $backup_name -o jsonpath='{.status.state}') if [[ $retry -ge 360 || ${current_status} == 'error' ]]; then - kubectl_bin logs ${OPERATOR_NS:+-n $OPERATOR_NS} $(get_operator_pod) \ - | grep -v 'level=info' \ - | grep -v 'level=debug' \ - | grep -v 'Getting tasks for pod' \ - | grep -v 'Getting pods from source' \ - | tail -100 + kubectl_bin logs ${OPERATOR_NS:+-n $OPERATOR_NS} $(get_operator_pod) | + grep -v 'level=info' | + grep -v 'level=debug' | + grep -v 'Getting tasks for pod' | + grep -v 'Getting pods from source' | + tail -100 echo "Backup object psmdb-backup/${backup_name} is in ${current_state} state." echo something went wrong with operator or kubernetes cluster exit 1 @@ -241,10 +241,10 @@ wait_backup() { run_restore() { local backup_name=$1 - cat $test_dir/conf/restore.yml \ - | $sed -e "s/name:/name: restore-$backup_name/" \ - | $sed -e "s/backupName:/backupName: $backup_name/" \ - | kubectl_bin apply -f - + cat $test_dir/conf/restore.yml | + $sed -e "s/name:/name: restore-$backup_name/" | + $sed -e "s/backupName:/backupName: $backup_name/" | + kubectl_bin apply -f - } run_restore_backupsource() { @@ -254,20 +254,20 @@ run_restore_backupsource() { desc "run restore restore-$backupName from backup $backupName destination is $backupDest" if [ -z "$storageName" ]; then - cat $test_dir/conf/restore-backupsource.yml \ - | $sed -e "s/name:/name: restore-$backupName/" \ - | $sed -e "s|BACKUP-NAME|$backupDest|" \ - | $sed -e "/storageName/d" \ - | kubectl_bin apply -f - + cat $test_dir/conf/restore-backupsource.yml | + $sed -e "s/name:/name: restore-$backupName/" | + $sed -e "s|BACKUP-NAME|$backupDest|" | + $sed -e "/storageName/d" | + kubectl_bin apply -f - return fi - cat $test_dir/conf/restore-backupsource.yml \ - | $sed -e "s/name:/name: restore-$backupName/" \ - | $sed -e "s|BACKUP-NAME|$backupDest|" \ - | $sed -e "s/storageName:/storageName: $storageName/" \ - | kubectl_bin apply -f - + cat $test_dir/conf/restore-backupsource.yml | + $sed -e "s/name:/name: restore-$backupName/" | + $sed -e "s|BACKUP-NAME|$backupDest|" | + $sed -e "s/storageName:/storageName: $storageName/" | + kubectl_bin apply -f - } wait_deployment() { @@ -276,18 +276,18 @@ wait_deployment() { sleep 10 retry=0 echo -n $name - until kubectl_bin get deployment $name ${OPERATOR_NS:+-n $OPERATOR_NS} >/dev/null \ - && [ "$(kubectl_bin get deployment $name -o jsonpath='{.status.replicas}' ${OPERATOR_NS:+-n $OPERATOR_NS})" == "$(kubectl_bin get deployment $name -o jsonpath='{.status.readyReplicas}' ${OPERATOR_NS:+-n $OPERATOR_NS})" ]; do + until kubectl_bin get deployment $name ${OPERATOR_NS:+-n $OPERATOR_NS} >/dev/null && + [ "$(kubectl_bin get deployment $name -o jsonpath='{.status.replicas}' ${OPERATOR_NS:+-n $OPERATOR_NS})" == "$(kubectl_bin get deployment $name -o jsonpath='{.status.readyReplicas}' ${OPERATOR_NS:+-n $OPERATOR_NS})" ]; do sleep 1 echo -n . let retry+=1 if [ $retry -ge 360 ]; then - kubectl_bin logs ${OPERATOR_NS:+-n $OPERATOR_NS} $(get_operator_pod) \ - | grep -v 'level=info' \ - | grep -v 'level=debug' \ - | grep -v 'Getting tasks for pod' \ - | grep -v 'Getting pods from source' \ - | tail -100 + kubectl_bin logs ${OPERATOR_NS:+-n $OPERATOR_NS} $(get_operator_pod) | + grep -v 'level=info' | + grep -v 'level=debug' | + grep -v 'Getting tasks for pod' | + grep -v 'Getting pods from source' | + tail -100 echo max retry count $retry reached. something went wrong with operator or kubernetes cluster exit 1 fi @@ -334,7 +334,7 @@ wait_restore() { | grep -v 'Getting tasks for pod' \ | grep -v 'Getting pods from source' \ | tail -100 - + kubectl_bin get psmdb-restore restore-${backup_name} -o yaml echo "Restore object restore-${backup_name} is in ${current_state} state." echo something went wrong with operator or kubernetes cluster exit 1 @@ -352,9 +352,9 @@ apply_rbac() { local operator_namespace=${OPERATOR_NS:-'psmdb-operator'} local rbac=${1:-'rbac'} - cat ${src_dir}/deploy/${rbac}.yaml \ - | sed -e "s^namespace: .*^namespace: $operator_namespace^" \ - | kubectl_bin apply ${OPERATOR_NS:+-n $OPERATOR_NS} -f - + cat ${src_dir}/deploy/${rbac}.yaml | + sed -e "s^namespace: .*^namespace: $operator_namespace^" | + kubectl_bin apply ${OPERATOR_NS:+-n $OPERATOR_NS} -f - } deploy_operator() { @@ -373,15 +373,15 @@ deploy_operator() { yq eval ' (.spec.template.spec.containers[].image = "'${IMAGE}'") | ((.. | select(.[] == "DISABLE_TELEMETRY")) |= .value="true") | - ((.. | select(.[] == "LOG_LEVEL")) |= .value="DEBUG")' ${src_dir}/deploy/cw-operator.yaml \ - | kubectl_bin apply -f - + ((.. | select(.[] == "LOG_LEVEL")) |= .value="DEBUG")' ${src_dir}/deploy/cw-operator.yaml | + kubectl_bin apply -f - else apply_rbac rbac yq eval ' (.spec.template.spec.containers[].image = "'${IMAGE}'") | ((.. | select(.[] == "DISABLE_TELEMETRY")) |= .value="true") | - ((.. | select(.[] == "LOG_LEVEL")) |= .value="DEBUG")' ${src_dir}/deploy/operator.yaml \ - | kubectl_bin apply -f - + ((.. | select(.[] == "LOG_LEVEL")) |= .value="DEBUG")' ${src_dir}/deploy/operator.yaml | + kubectl_bin apply -f - fi sleep 2 wait_pod $(get_operator_pod) @@ -519,8 +519,8 @@ destroy_chaos_mesh() { timeout 30 kubectl delete ValidatingWebhookConfiguration $(kubectl get ValidatingWebhookConfiguration | grep 'chaos-mesh' | awk '{print $1}') || : timeout 30 kubectl delete ValidatingWebhookConfiguration $(kubectl get ValidatingWebhookConfiguration | grep 'validate-auth' | awk '{print $1}') || : for i in $(kubectl api-resources | grep chaos-mesh | awk '{print $1}'); do - kubectl get ${i} --all-namespaces --no-headers -o custom-columns=Kind:.kind,Name:.metadata.name,NAMESPACE:.metadata.namespace \ - | while read -r line; do + kubectl get ${i} --all-namespaces --no-headers -o custom-columns=Kind:.kind,Name:.metadata.name,NAMESPACE:.metadata.namespace | + while read -r line; do local kind=$(echo "$line" | awk '{print $1}') local name=$(echo "$line" | awk '{print $2}') local namespace=$(echo "$line" | awk '{print $3}') @@ -602,12 +602,12 @@ wait_for_delete() { echo -n . let retry+=1 if [ $retry -ge $wait_time ]; then - kubectl_bin logs ${OPERATOR_NS:+-n $OPERATOR_NS} $(get_operator_pod) \ - | grep -v 'level=info' \ - | grep -v 'level=debug' \ - | grep -v 'Getting tasks for pod' \ - | grep -v 'Getting pods from source' \ - | tail -100 + kubectl_bin logs ${OPERATOR_NS:+-n $OPERATOR_NS} $(get_operator_pod) | + grep -v 'level=info' | + grep -v 'level=debug' | + grep -v 'Getting tasks for pod' | + grep -v 'Getting pods from source' | + tail -100 echo max retry count $retry reached. something went wrong with operator or kubernetes cluster exit 1 fi @@ -642,8 +642,8 @@ compare_kubectl() { fi fi - kubectl_bin get -o yaml ${resource} \ - | yq eval ' + kubectl_bin get -o yaml ${resource} | + yq eval ' del(.metadata.ownerReferences[].apiVersion) | del(.metadata.managedFields) | del(.. | select(has("creationTimestamp")).creationTimestamp) | @@ -776,9 +776,9 @@ compare_mongo_cmd() { local database="${5:-myApp}" local collection="${6:-test}" - run_mongo "use ${database}\n db.${collection}.${command}()" "$uri" "mongodb" "$suffix" \ - | egrep -v 'I NETWORK|W NETWORK|F NETWORK|Error saving history file|Percona Server for MongoDB|connecting to:|Unable to reach primary for set|Implicit session:|versions do not match|Error saving history file:' \ - | $sed -re 's/ObjectId\("[0-9a-f]+"\)//; s/-[0-9]+.svc/-xxx.svc/' \ + run_mongo "use ${database}\n db.${collection}.${command}()" "$uri" "mongodb" "$suffix" | + egrep -v 'I NETWORK|W NETWORK|F NETWORK|Error saving history file|Percona Server for MongoDB|connecting to:|Unable to reach primary for set|Implicit session:|versions do not match|Error saving history file:' | + $sed -re 's/ObjectId\("[0-9a-f]+"\)//; s/-[0-9]+.svc/-xxx.svc/' \ >$tmp_dir/${command}${postfix} diff ${test_dir}/compare/${command}${postfix}.json $tmp_dir/${command}${postfix} } @@ -791,9 +791,9 @@ compare_mongos_cmd() { local database="${5:-myApp}" local collection="${6:-test}" - run_mongos "use ${database}\n db.${collection}.${command}()" "$uri" "mongodb" "$suffix" \ - | egrep -v 'I NETWORK|W NETWORK|Error saving history file|Percona Server for MongoDB|connecting to:|Unable to reach primary for set|Implicit session:|versions do not match|Error saving history file:' \ - | $sed -re 's/ObjectId\("[0-9a-f]+"\)//; s/-[0-9]+.svc/-xxx.svc/' \ + run_mongos "use ${database}\n db.${collection}.${command}()" "$uri" "mongodb" "$suffix" | + egrep -v 'I NETWORK|W NETWORK|Error saving history file|Percona Server for MongoDB|connecting to:|Unable to reach primary for set|Implicit session:|versions do not match|Error saving history file:' | + $sed -re 's/ObjectId\("[0-9a-f]+"\)//; s/-[0-9]+.svc/-xxx.svc/' \ >$tmp_dir/${command}${postfix} diff ${test_dir}/compare/${command}${postfix}.json $tmp_dir/${command}${postfix} } @@ -801,10 +801,10 @@ compare_mongos_cmd() { get_mongo_primary_endpoint() { local uri="$1" - run_mongo 'db.isMaster().me' "$uri" "mongodb" ":27017" \ - | egrep -v "Time|Percona Server for MongoDB|bye|BinData|NumberLong|connecting to|Error saving history file|I NETWORK|W NETWORK|Implicit session:|versions do not match" \ - | sed -e 's^20[0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]:[0-9][0-9]:[0-9][0-9]\.[0-9][0-9][0-9]+[0-9][0-9][0-9][0-9]^^' \ - | grep ":27017$" + run_mongo 'db.isMaster().me' "$uri" "mongodb" ":27017" | + egrep -v "Time|Percona Server for MongoDB|bye|BinData|NumberLong|connecting to|Error saving history file|I NETWORK|W NETWORK|Implicit session:|versions do not match" | + sed -e 's^20[0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]:[0-9][0-9]:[0-9][0-9]\.[0-9][0-9][0-9]+[0-9][0-9][0-9][0-9]^^' | + grep ":27017$" } get_mongo_primary() { @@ -814,12 +814,12 @@ get_mongo_primary() { set_debug endpoint=$(get_mongo_primary_endpoint $uri) if [[ $endpoint =~ ".$cluster" ]]; then - echo $endpoint \ - | cut -d . -f 1 + echo $endpoint | + cut -d . -f 1 else - kubectl_bin get service -o wide \ - | grep " ${endpoint/:*/} " \ - | awk '{print $1}' + kubectl_bin get service -o wide | + grep " ${endpoint/:*/} " | + awk '{print $1}' fi } @@ -844,12 +844,12 @@ compare_mongo_user() { expected_result=${test_dir}/compare/$user-60.json fi - run_mongo 'db.runCommand({connectionStatus:1,showPrivileges:true})' "$uri" \ - | egrep -v "Time|Percona Server for MongoDB|bye|BinData|NumberLong|connecting to|Error saving history file|I NETWORK|W NETWORK|Implicit session:|versions do not match" \ - | sed -e 's^20[0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]:[0-9][0-9]:[0-9][0-9]\.[0-9][0-9][0-9]+[0-9][0-9][0-9][0-9]^^' \ - | $sed -e '/"ok" : 1/,+4d' \ - | sed -e '$s/,$/}/' \ - | jq '.authInfo.authenticatedUserPrivileges|=sort_by(.resource.anyResource, .resource.cluster, .resource.db, .resource.collection)|.authInfo.authenticatedUserRoles|=sort_by(.role)' \ + run_mongo 'db.runCommand({connectionStatus:1,showPrivileges:true})' "$uri" | + egrep -v "Time|Percona Server for MongoDB|bye|BinData|NumberLong|connecting to|Error saving history file|I NETWORK|W NETWORK|Implicit session:|versions do not match" | + sed -e 's^20[0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]:[0-9][0-9]:[0-9][0-9]\.[0-9][0-9][0-9]+[0-9][0-9][0-9][0-9]^^' | + $sed -e '/"ok" : 1/,+4d' | + sed -e '$s/,$/}/' | + jq '.authInfo.authenticatedUserPrivileges|=sort_by(.resource.anyResource, .resource.cluster, .resource.db, .resource.collection)|.authInfo.authenticatedUserRoles|=sort_by(.role)' \ >$tmp_dir/$user.json diff -u $expected_result $tmp_dir/$user.json } @@ -884,10 +884,10 @@ delete_crd() { kubectl_bin delete -f "${src_dir}/deploy/crd.yaml" --ignore-not-found --wait=false || : for crd_name in $(yq eval '.metadata.name' "${src_dir}/deploy/crd.yaml" | grep -v '\-\-\-'); do - kubectl get ${crd_name} --all-namespaces -o wide \ - | grep -v 'NAMESPACE' \ - | xargs -L 1 sh -xc 'kubectl patch '${crd_name}' -n $0 $1 --type=merge -p "{\"metadata\":{\"finalizers\":[]}}"' \ - || : + kubectl get ${crd_name} --all-namespaces -o wide | + grep -v 'NAMESPACE' | + xargs -L 1 sh -xc 'kubectl patch '${crd_name}' -n $0 $1 --type=merge -p "{\"metadata\":{\"finalizers\":[]}}"' || + : kubectl_bin wait --for=delete crd ${crd_name} || : done @@ -905,16 +905,16 @@ destroy() { desc 'destroy cluster/operator and all other resources' if [ ${ignore_logs} == "false" ] && [ "${DEBUG_TESTS}" == 1 ]; then - kubectl_bin logs ${OPERATOR_NS:+-n $OPERATOR_NS} $(get_operator_pod) \ - | grep -v 'level=info' \ - | grep -v 'level=debug' \ - | grep -v 'Getting tasks for pod' \ - | grep -v 'Getting pods from source' \ - | grep -v 'the object has been modified' \ - | grep -v 'get backup status: Job.batch' \ - | $sed -r 's/"ts":[0-9.]+//; s^limits-[0-9.]+/^^g' \ - | sort -u \ - | tee $tmp_dir/operator.log + kubectl_bin logs ${OPERATOR_NS:+-n $OPERATOR_NS} $(get_operator_pod) | + grep -v 'level=info' | + grep -v 'level=debug' | + grep -v 'Getting tasks for pod' | + grep -v 'Getting pods from source' | + grep -v 'the object has been modified' | + grep -v 'get backup status: Job.batch' | + $sed -r 's/"ts":[0-9.]+//; s^limits-[0-9.]+/^^g' | + sort -u | + tee $tmp_dir/operator.log fi #TODO: maybe will be enabled later #diff $test_dir/compare/operator.log $tmp_dir/operator.log @@ -948,17 +948,17 @@ desc() { get_backup_dest() { local backup_name=$1 - kubectl_bin get psmdb-backup $backup_name -o jsonpath='{.status.destination}' \ - | sed -e 's/.json$//' | sed "s|s3://||" | sed "s|azure://||" + kubectl_bin get psmdb-backup $backup_name -o jsonpath='{.status.destination}' | + sed -e 's/.json$//' | sed "s|s3://||" | sed "s|azure://||" } get_service_endpoint() { local service=$1 local hostname=$( - kubectl_bin get service/$service -o json \ - | jq '.status.loadBalancer.ingress[].hostname' \ - | sed -e 's/^"//; s/"$//;' + kubectl_bin get service/$service -o json | + jq '.status.loadBalancer.ingress[].hostname' | + sed -e 's/^"//; s/"$//;' ) if [ -n "$hostname" -a "$hostname" != "null" ]; then echo $hostname @@ -966,9 +966,9 @@ get_service_endpoint() { fi local ip=$( - kubectl_bin get service/$service -o json \ - | jq '.status.loadBalancer.ingress[].ip' \ - | sed -e 's/^"//; s/"$//;' + kubectl_bin get service/$service -o json | + jq '.status.loadBalancer.ingress[].ip' | + sed -e 's/^"//; s/"$//;' ) if [ -n "$ip" -a "$ip" != "null" ]; then echo $ip @@ -986,9 +986,9 @@ get_metric_values() { local end=$($date -u "+%s") local endpoint=$(get_service_endpoint monitoring-service) - curl -s -k "https://${user_pass}@$endpoint/graph/api/datasources/proxy/1/api/v1/query_range?query=min%28$metric%7Bnode_name%3D%7E%22$instance%22%7d%20or%20$metric%7Bnode_name%3D%7E%22$instance%22%7D%29&start=$start&end=$end&step=60" \ - | jq '.data.result[0].values[][1]' \ - | grep '^"[0-9]' + curl -s -k "https://${user_pass}@$endpoint/graph/api/datasources/proxy/1/api/v1/query_range?query=min%28$metric%7Bnode_name%3D%7E%22$instance%22%7d%20or%20$metric%7Bnode_name%3D%7E%22$instance%22%7D%29&start=$start&end=$end&step=60" | + jq '.data.result[0].values[][1]' | + grep '^"[0-9]' } @@ -1036,8 +1036,8 @@ get_qan_values() { EOF local response - response=$(curl -s -k -XPOST -d @payload.json "https://${user_pass}@$endpoint/v0/qan/GetReport" \ - | jq '.rows[].fingerprint') + response=$(curl -s -k -XPOST -d @payload.json "https://${user_pass}@$endpoint/v0/qan/GetReport" | + jq '.rows[].fingerprint') rm -f payload.json if [[ $response == "null" ]]; then echo "No data for $service_type service type in QAN" @@ -1046,25 +1046,25 @@ EOF } cat_config() { - cat "$1" \ - | yq eval '(.spec | select(.image == null)).image = "'"$IMAGE_MONGOD"'"' \ - | yq eval '(.spec | select(has("pmm"))).pmm.image = "'"$IMAGE_PMM_CLIENT"'"' \ - | yq eval '(.spec | select(has("initImage"))).initImage = "'"$IMAGE"'"' \ - | yq eval '(.spec | select(has("backup"))).backup.image = "'"$IMAGE_BACKUP"'"' \ - | yq eval '.spec.upgradeOptions.apply="Never"' + cat "$1" | + yq eval '(.spec | select(.image == null)).image = "'"$IMAGE_MONGOD"'"' | + yq eval '(.spec | select(has("pmm"))).pmm.image = "'"$IMAGE_PMM_CLIENT"'"' | + yq eval '(.spec | select(has("initImage"))).initImage = "'"$IMAGE"'"' | + yq eval '(.spec | select(has("backup"))).backup.image = "'"$IMAGE_BACKUP"'"' | + yq eval '.spec.upgradeOptions.apply="Never"' } apply_cluster() { if [ -z "$SKIP_BACKUPS_TO_AWS_GCP_AZURE" ]; then - cat_config "$1" \ - | kubectl_bin apply -f - + cat_config "$1" | + kubectl_bin apply -f - else - cat_config "$1" \ - | yq eval ' + cat_config "$1" | + yq eval ' del(.spec.backup.tasks.[1]) | del(.spec.backup.tasks.[1]) | - del(.spec.backup.tasks.[1])' - \ - | kubectl_bin apply -f - + del(.spec.backup.tasks.[1])' - | + kubectl_bin apply -f - fi } @@ -1167,8 +1167,8 @@ run_backup() { yq eval '.metadata.name = "'${backup_name}'" | .spec.storageName = "'${storage}'"' \ - $test_dir/conf/backup-$storage.yml \ - | kubectl_bin apply -f - + $test_dir/conf/backup-$storage.yml | + kubectl_bin apply -f - } check_backup_deletion() { @@ -1224,10 +1224,10 @@ check_crd_for_deletion() { for crd_name in $(curl -s https://raw.githubusercontent.com/percona/percona-server-mongodb-operator/${git_tag}/deploy/crd.yaml | yq eval '.metadata.name' | $sed 's/---//g' | $sed ':a;N;$!ba;s/\n/ /g'); do if [[ $(kubectl_bin get crd/${crd_name} -o jsonpath='{.status.conditions[-1].type}') == "Terminating" ]]; then - kubectl get ${crd_name} --all-namespaces -o wide \ - | grep -v 'NAMESPACE' \ - | xargs -L 1 sh -xc 'kubectl patch '${crd_name}' -n $0 $1 --type=merge -p "{\"metadata\":{\"finalizers\":[]}}"' \ - || : + kubectl get ${crd_name} --all-namespaces -o wide | + grep -v 'NAMESPACE' | + xargs -L 1 sh -xc 'kubectl patch '${crd_name}' -n $0 $1 --type=merge -p "{\"metadata\":{\"finalizers\":[]}}"' || + : fi done } @@ -1277,15 +1277,15 @@ function generate_vs_json() { for image_mongod in ${IMAGE_MONGOD_CHAIN[@]}; do current_mongod_version=$(get_mongod_ver_from_image ${image_mongod}) - version_service_source=$(echo ${version_service_source} \ - | jq '.versions[0].matrix.mongod += {"'${current_mongod_version}'": {"image_path":"'${image_mongod}'","status":"recommended"}}') + version_service_source=$(echo ${version_service_source} | + jq '.versions[0].matrix.mongod += {"'${current_mongod_version}'": {"image_path":"'${image_mongod}'","status":"recommended"}}') done - version_service_source=$(echo ${version_service_source} \ - | jq '.versions[0].matrix.backup += {"'$(get_pbm_version ${IMAGE_BACKUP})'": {"image_path":"'${IMAGE_BACKUP}'","status":"recommended"}}') + version_service_source=$(echo ${version_service_source} | + jq '.versions[0].matrix.backup += {"'$(get_pbm_version ${IMAGE_BACKUP})'": {"image_path":"'${IMAGE_BACKUP}'","status":"recommended"}}') - version_service_source=$(echo ${version_service_source} \ - | jq '.versions[0].matrix.operator += {"'${OPERATOR_VERSION}'": {"image_path":"'${IMAGE}'","status":"recommended"}}') + version_service_source=$(echo ${version_service_source} | + jq '.versions[0].matrix.operator += {"'${OPERATOR_VERSION}'": {"image_path":"'${IMAGE}'","status":"recommended"}}') echo ${version_service_source} | jq '.' >${target_path} } diff --git a/e2e-tests/pitr-physical/run b/e2e-tests/pitr-physical/run index e80343da6c..e0150c0ec0 100755 --- a/e2e-tests/pitr-physical/run +++ b/e2e-tests/pitr-physical/run @@ -6,6 +6,16 @@ test_dir=$(realpath $(dirname $0)) . ${test_dir}/../functions set_debug +format_date() { + local timestamp=$1 + echo $(TZ=UTC $date -d@${timestamp} '+%Y-%m-%d %H:%M:%S') +} + +get_latest_oplog_chunk_ts() { + local cluster=$1 + echo $(kubectl_bin exec $cluster-rs0-0 -c backup-agent -- pbm status -o json | jq '.backups.pitrChunks.pitrChunks | last | .range.end') +} + write_document() { local cmp_postfix="$1" @@ -49,24 +59,48 @@ check_recovery() { local cmp_postfix=$4 local cluster_name=$5 + local latest_ts=$(get_latest_oplog_chunk_ts $cluster_name) + desc "write more data before restore by $restore_type" run_mongos \ 'use myApp\n db.test.insert({ x: 100501 })' \ "myApp:myPass@$cluster-mongos.$namespace" - kubectl exec -it some-name-rs0-0 -c backup-agent -- pbm status - - desc 'waiting for chunks to be uploaded' - sleep 150 - - kubectl exec -it some-name-rs0-0 -c backup-agent -- pbm status + if [[ -n ${restore_date} ]]; then + desc "Restoring to time $(format_date ${restore_date})" + retries=0 + until [[ ${latest_ts} -gt ${restore_date} ]]; do + if [[ $retries -gt 30 ]]; then + echo "Last oplog chunk ($(format_date ${latest_ts})) is not greater than restore target ($(format_date ${restore_date}))" + exit 1 + fi + latest_ts=$(get_latest_oplog_chunk_ts $cluster_name) + retries=$((retries + 1)) + echo "Waiting for last oplog chunk ($(format_date ${latest_ts})) to be greater than restore target ($(format_date ${restore_date}))" + sleep 10 + done + else + desc "Restoring to latest" + local current_ts=$(get_latest_oplog_chunk_ts $cluster_name) + retries=0 + until [[ ${latest_ts} -gt ${current_ts} ]]; do + if [[ $retries -gt 30 ]]; then + echo "Last oplog chunk ($(format_date ${latest_ts})) is not greater than starting chunk ($(format_date ${current_ts}))" + exit 1 + fi + latest_ts=$(get_latest_oplog_chunk_ts $cluster_name) + retries=$((retries + 1)) + echo "Waiting for last oplog chunk ($(format_date ${latest_ts})) to be greater than starting chunk ($(format_date ${current_ts}))" + sleep 10 + done + fi desc "check restore by $restore_type" cat $test_dir/conf/restore.yml \ | $sed -e "s/name:/name: restore-$backup_name/" \ | $sed -e "s/backupName:/backupName: $backup_name/" \ | $sed -e "s/type:/type: $restore_type/" \ - | if [ -z "$restore_date" ]; then $sed -e "/date:/d"; else $sed -e "s/date:/date: $restore_date/"; fi \ + | if [ -z "$restore_date" ]; then $sed -e "/date:/d"; else $sed -e "s/date:/date: $(format_date ${restore_date})/"; fi \ | kubectl_bin apply -f - # fail faster if we don't reach requested status until some time @@ -120,11 +154,23 @@ main() { run_backup $backup_name_minio 1 physical write_document "-2nd" - sleep 2 - - time_now=$(run_mongos 'new Date().toISOString()' "myApp:myPass@$cluster-mongos.$namespace" "mongodb" "" "--quiet" | grep -E -v 'I NETWORK|W NETWORK|Error saving history file|Percona Server for MongoDB|connecting to:|Unable to reach primary for set|Implicit session:|versions do not match|Error saving history file:' | cut -c1-19 | tr T " ") - check_recovery $backup_name_minio-1 date "$time_now" "-2nd" "$cluster" + backup_last_write=$(kubectl_bin exec $cluster-rs0-0 -c backup-agent -- pbm status -o json | jq .backups.snapshot[0].restoreTo) + last_chunk=$(get_latest_oplog_chunk_ts $cluster) + + retries=0 + until [[ ${last_chunk} -gt ${backup_last_write} ]]; do + if [[ $retries -gt 30 ]]; then + echo "Last oplog chunk ($(format_date ${last_chunk})) is not greater than last write ($(format_date ${backup_last_write}))" + exit 1 + fi + last_chunk=$(get_latest_oplog_chunk_ts $cluster) + retries=$((retries + 1)) + echo "Waiting for last oplog chunk ($(format_date ${last_chunk})) to be greater than last write ($(format_date ${backup_last_write}))" + sleep 10 + done + + check_recovery $backup_name_minio-1 date "${last_chunk}" "-2nd" "$cluster" run_backup $backup_name_minio 2 logical run_backup $backup_name_minio 3 physical diff --git a/go.mod b/go.mod index 49a88eef18..97de18e715 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/go-openapi/validate v0.22.6 github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 github.com/hashicorp/go-version v1.6.0 - github.com/percona/percona-backup-mongodb v1.8.1-0.20230920143330-3b1c2e263901 + github.com/percona/percona-backup-mongodb v1.8.1-0.20240321083036-d96752d1abc5 github.com/pkg/errors v0.9.1 github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.8.4 @@ -34,7 +34,7 @@ require ( require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.1 // indirect - github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect @@ -68,7 +68,7 @@ require ( github.com/jmespath/go-jmespath v0.4.1-0.20220621161143-b0104c826a24 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/compress v1.17.3 // indirect github.com/klauspost/pgzip v1.2.6 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect @@ -78,7 +78,7 @@ require ( github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/mongodb/mongo-tools v0.0.0-20230720205640-fb74684da15f // indirect + github.com/mongodb/mongo-tools v0.0.0-20231117185435-bf0bef9e9f19 // indirect github.com/montanaflynn/stats v0.6.6 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect diff --git a/go.sum b/go.sum index 69f3c74044..8c61d5166c 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,8 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.1 h1:6oNBlSdi1QqM1PNW7FPA6xO github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.1/go.mod h1:s4kgfzA0covAXNicZHDMN58jExvcng2mC/DepXiF1EI= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0 h1:Ma67P/GGprNwsslzEH6+Kb8nybI8jpDTm4Wmzu2ReK8= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0/go.mod h1:c+Lifp3EDEamAkPVzMooRNOK6CZjNSdEnf1A7jsI9u4= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 h1:nVocQV40OQne5613EeLayJiRAJuKlBGy+m22qWG+WRg= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0/go.mod h1:7QJP7dr2wznCMeqIrhMgWGf7XpAQnVrJqDm9nvV3Cu4= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 h1:gggzg0SUMs6SQbEw+3LoSsYf9YMjkupeAnHMX8O9mmY= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0/go.mod h1:+6KLcKIVgxoBDMqMO/Nvy7bZ9a0nbU3I1DtFQK3YvB4= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI= github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0= @@ -108,8 +108,8 @@ github.com/evanphx/json-patch/v5 v5.0.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2Vvl github.com/evanphx/json-patch/v5 v5.8.0 h1:lRj6N9Nci7MvzrXuX6HFzU8XjmhPiXPlsKEy1u0KQro= github.com/evanphx/json-patch/v5 v5.8.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= -github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA= -github.com/frankban/quicktest v1.14.5/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= @@ -262,8 +262,8 @@ github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsC github.com/googleapis/gnostic v0.1.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= github.com/googleapis/gnostic v0.3.1/go.mod h1:on+2t9HRStVgn95RSsFWFz+6Q0Snyqv1awfrALZdbtU= github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8= -github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c h1:7lF+Vz0LqiRidnzC1Oq86fpX1q/iEv2KJdrCtttYjT4= -github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= +github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -314,8 +314,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= +github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU= github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -364,8 +364,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/mongodb/mongo-tools v0.0.0-20230720205640-fb74684da15f h1:vrP8tGvlgFyelOah/ndZgZfia5fXbNVIb37ldYYg+OM= -github.com/mongodb/mongo-tools v0.0.0-20230720205640-fb74684da15f/go.mod h1:FjrtGjfqHbUZEkbw0lZ+GB/3rqQsZM9KCFYnO8xx2cU= +github.com/mongodb/mongo-tools v0.0.0-20231117185435-bf0bef9e9f19 h1:B0nhjnm3za73rABZa3HdMhn9WuOXPPHweBBqhZnWinI= +github.com/mongodb/mongo-tools v0.0.0-20231117185435-bf0bef9e9f19/go.mod h1:2Rl3k3e333g2AJN74N9hx9N4IIhB0IcTU3m92oNsOyE= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/montanaflynn/stats v0.6.6 h1:Duep6KMIDpY4Yo11iFsvyqJDyfzLF9+sndUKT+v64GQ= github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= @@ -399,8 +399,10 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= -github.com/percona/percona-backup-mongodb v1.8.1-0.20230920143330-3b1c2e263901 h1:BDgsZRCjEuxl2/z4yWBqB0s8d20shuIDks7/RVdZiLs= -github.com/percona/percona-backup-mongodb v1.8.1-0.20230920143330-3b1c2e263901/go.mod h1:fZRCMpUqkWlLVdRKqqaj001LoVP2eo6F0ZhoMPeXDng= +github.com/percona/percona-backup-mongodb v1.8.1-0.20240305084523-0b6fbbb00c0d h1:awlHd27WtO5yRm5+1q8RsyZpBZvzB9BUNTfIQRCddjc= +github.com/percona/percona-backup-mongodb v1.8.1-0.20240305084523-0b6fbbb00c0d/go.mod h1:Xrz+58/Ugutc/zqAsUxGHxSRCXrUUiJ8BYHoDhxC4ao= +github.com/percona/percona-backup-mongodb v1.8.1-0.20240321083036-d96752d1abc5 h1:pUIIJgm/YcjFeod0hqZU4Hz8eRiUpHCfC2PYrGAw+Ag= +github.com/percona/percona-backup-mongodb v1.8.1-0.20240321083036-d96752d1abc5/go.mod h1:Xrz+58/Ugutc/zqAsUxGHxSRCXrUUiJ8BYHoDhxC4ao= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -478,7 +480,6 @@ github.com/stretchr/objx v0.5.1/go.mod h1:/iHQpkQwBD6DLUmQ4pE+s1TXdob1mORJ4/UFdr github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= diff --git a/pkg/apis/psmdb/v1/perconaservermongodbbackup_types.go b/pkg/apis/psmdb/v1/perconaservermongodbbackup_types.go index 055ab51dc7..060dce1011 100644 --- a/pkg/apis/psmdb/v1/perconaservermongodbbackup_types.go +++ b/pkg/apis/psmdb/v1/perconaservermongodbbackup_types.go @@ -3,9 +3,10 @@ package v1 import ( "fmt" - "github.com/percona/percona-backup-mongodb/pbm" - "github.com/percona/percona-backup-mongodb/pbm/compress" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/percona/percona-backup-mongodb/pbm/compress" + "github.com/percona/percona-backup-mongodb/pbm/defs" ) // PerconaServerMongoDBBackupSpec defines the desired state of PerconaServerMongoDBBackup @@ -17,7 +18,7 @@ type PerconaServerMongoDBBackupSpec struct { CompressionLevel *int `json:"compressionLevel,omitempty"` // +kubebuilder:validation:Enum={logical,physical} - Type pbm.BackupType `json:"type,omitempty"` + Type defs.BackupType `json:"type,omitempty"` } type BackupState string @@ -34,7 +35,7 @@ const ( // PerconaServerMongoDBBackupStatus defines the observed state of PerconaServerMongoDBBackup type PerconaServerMongoDBBackupStatus struct { - Type pbm.BackupType `json:"type,omitempty"` + Type defs.BackupType `json:"type,omitempty"` State BackupState `json:"state,omitempty"` StartAt *metav1.Time `json:"start,omitempty"` CompletedAt *metav1.Time `json:"completed,omitempty"` @@ -88,7 +89,7 @@ func (p *PerconaServerMongoDBBackup) CheckFields() error { return fmt.Errorf("spec clusterName and deprecated psmdbCluster fields are empty") } if string(p.Spec.Type) == "" { - p.Spec.Type = pbm.LogicalBackup + p.Spec.Type = defs.LogicalBackup } if string(p.Spec.Compression) == "" { p.Spec.Compression = compress.CompressionTypeGZIP diff --git a/pkg/apis/psmdb/v1/psmdb_types.go b/pkg/apis/psmdb/v1/psmdb_types.go index 81787f2253..9901afd909 100644 --- a/pkg/apis/psmdb/v1/psmdb_types.go +++ b/pkg/apis/psmdb/v1/psmdb_types.go @@ -8,8 +8,6 @@ import ( "github.com/go-logr/logr" v "github.com/hashicorp/go-version" - "github.com/percona/percona-backup-mongodb/pbm" - "github.com/percona/percona-backup-mongodb/pbm/compress" "github.com/pkg/errors" "gopkg.in/yaml.v2" appsv1 "k8s.io/api/apps/v1" @@ -22,6 +20,8 @@ import ( k8sversion "k8s.io/apimachinery/pkg/version" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "github.com/percona/percona-backup-mongodb/pbm/compress" + "github.com/percona/percona-backup-mongodb/pbm/defs" "github.com/percona/percona-server-mongodb-operator/pkg/mcs" "github.com/percona/percona-server-mongodb-operator/pkg/util/numstr" "github.com/percona/percona-server-mongodb-operator/version" @@ -705,7 +705,7 @@ type BackupTaskSpec struct { CompressionLevel *int `json:"compressionLevel,omitempty"` // +kubebuilder:validation:Enum={logical,physical} - Type pbm.BackupType `json:"type,omitempty"` + Type defs.BackupType `json:"type,omitempty"` } func (task *BackupTaskSpec) JobName(cr *PerconaServerMongoDB) string { diff --git a/pkg/controller/perconaservermongodb/backup.go b/pkg/controller/perconaservermongodb/backup.go index bec98cffe1..d89bf775f1 100644 --- a/pkg/controller/perconaservermongodb/backup.go +++ b/pkg/controller/perconaservermongodb/backup.go @@ -363,7 +363,7 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api. } } - val, err := pbm.GetConfigVar("pitr.enabled") + val, err := pbm.GetConfigVar(ctx, "pitr.enabled") if err != nil { if !errors.Is(err, mongo.ErrNoDocuments) { return errors.Wrap(err, "get pitr.enabled") @@ -417,7 +417,7 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api. if enabled != cr.Spec.Backup.PITR.Enabled { val := strconv.FormatBool(cr.Spec.Backup.PITR.Enabled) log.Info("Setting pitr.enabled in PBM config", "enabled", val) - if err := pbm.SetConfigVar("pitr.enabled", val); err != nil { + if err := pbm.SetConfigVar(ctx, "pitr.enabled", val); err != nil { return errors.Wrap(err, "update pitr.enabled") } } @@ -426,7 +426,7 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api. return nil } - val, err = pbm.GetConfigVar("pitr.oplogOnly") + val, err = pbm.GetConfigVar(ctx, "pitr.oplogOnly") if err != nil { if errors.Is(err, mongo.ErrNoDocuments) { return nil @@ -447,12 +447,12 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api. if oplogOnly != cr.Spec.Backup.PITR.OplogOnly { enabled := strconv.FormatBool(cr.Spec.Backup.PITR.OplogOnly) log.Info("Setting pitr.oplogOnly in PBM config", "value", enabled) - if err := pbm.SetConfigVar("pitr.oplogOnly", enabled); err != nil { + if err := pbm.SetConfigVar(ctx, "pitr.oplogOnly", enabled); err != nil { return errors.Wrap(err, "update pitr.oplogOnly") } } - val, err = pbm.GetConfigVar("pitr.oplogSpanMin") + val, err = pbm.GetConfigVar(ctx, "pitr.oplogSpanMin") if err != nil { if !errors.Is(err, mongo.ErrNoDocuments) { return errors.Wrap(err, "get pitr.oplogSpanMin") @@ -468,12 +468,12 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api. if oplogSpanMin != cr.Spec.Backup.PITR.OplogSpanMin.Float64() { val := cr.Spec.Backup.PITR.OplogSpanMin.String() - if err := pbm.SetConfigVar("pitr.oplogSpanMin", val); err != nil { + if err := pbm.SetConfigVar(ctx, "pitr.oplogSpanMin", val); err != nil { return errors.Wrap(err, "update pitr.oplogSpanMin") } } - val, err = pbm.GetConfigVar("pitr.compression") + val, err = pbm.GetConfigVar(ctx, "pitr.compression") var compression = "" if err != nil { if errors.Is(err, mongo.ErrNoDocuments) { @@ -490,23 +490,23 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api. if compression != string(cr.Spec.Backup.PITR.CompressionType) { if string(cr.Spec.Backup.PITR.CompressionType) == "" { - if err := pbm.DeleteConfigVar("pitr.compression"); err != nil { + if err := pbm.DeleteConfigVar(ctx, "pitr.compression"); err != nil { return errors.Wrap(err, "delete pitr.compression") } - } else if err := pbm.SetConfigVar("pitr.compression", string(cr.Spec.Backup.PITR.CompressionType)); err != nil { + } else if err := pbm.SetConfigVar(ctx, "pitr.compression", string(cr.Spec.Backup.PITR.CompressionType)); err != nil { return errors.Wrap(err, "update pitr.compression") } // PBM needs to disabling and enabling PITR to change compression type - if err := pbm.SetConfigVar("pitr.enabled", "false"); err != nil { + if err := pbm.SetConfigVar(ctx, "pitr.enabled", "false"); err != nil { return errors.Wrap(err, "disable pitr") } - if err := pbm.SetConfigVar("pitr.enabled", "true"); err != nil { + if err := pbm.SetConfigVar(ctx, "pitr.enabled", "true"); err != nil { return errors.Wrap(err, "enable pitr") } } - val, err = pbm.GetConfigVar("pitr.compressionLevel") + val, err = pbm.GetConfigVar(ctx, "pitr.compressionLevel") var compressionLevel *int = nil if err != nil { if errors.Is(err, mongo.ErrNoDocuments) { @@ -524,18 +524,18 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api. if !reflect.DeepEqual(compressionLevel, cr.Spec.Backup.PITR.CompressionLevel) { if cr.Spec.Backup.PITR.CompressionLevel == nil { - if err := pbm.DeleteConfigVar("pitr.compressionLevel"); err != nil { + if err := pbm.DeleteConfigVar(ctx, "pitr.compressionLevel"); err != nil { return errors.Wrap(err, "delete pitr.compressionLevel") } - } else if err := pbm.SetConfigVar("pitr.compressionLevel", strconv.FormatInt(int64(*cr.Spec.Backup.PITR.CompressionLevel), 10)); err != nil { + } else if err := pbm.SetConfigVar(ctx, "pitr.compressionLevel", strconv.FormatInt(int64(*cr.Spec.Backup.PITR.CompressionLevel), 10)); err != nil { return errors.Wrap(err, "update pitr.compressionLevel") } // PBM needs to disabling and enabling PITR to change compression level - if err := pbm.SetConfigVar("pitr.enabled", "false"); err != nil { + if err := pbm.SetConfigVar(ctx, "pitr.enabled", "false"); err != nil { return errors.Wrap(err, "disable pitr") } - if err := pbm.SetConfigVar("pitr.enabled", "true"); err != nil { + if err := pbm.SetConfigVar(ctx, "pitr.enabled", "true"); err != nil { return errors.Wrap(err, "enable pitr") } } diff --git a/pkg/controller/perconaservermongodb/version.go b/pkg/controller/perconaservermongodb/version.go index 4df3b970ef..5da86a77c8 100644 --- a/pkg/controller/perconaservermongodb/version.go +++ b/pkg/controller/perconaservermongodb/version.go @@ -17,7 +17,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" - "github.com/percona/percona-backup-mongodb/pbm" + "github.com/percona/percona-backup-mongodb/pbm/defs" api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" "github.com/percona/percona-server-mongodb-operator/pkg/k8s" ) @@ -281,7 +281,7 @@ func (r *ReconcilePerconaServerMongoDB) getVersionMeta(ctx context.Context, cr * } for _, task := range cr.Spec.Backup.Tasks { - if task.Type == pbm.PhysicalBackup && task.Enabled { + if task.Type == defs.PhysicalBackup && task.Enabled { vm.PhysicalBackupScheduled = true break } diff --git a/pkg/controller/perconaservermongodb/version_test.go b/pkg/controller/perconaservermongodb/version_test.go index 8493a98a5c..a7ff1bdff5 100644 --- a/pkg/controller/perconaservermongodb/version_test.go +++ b/pkg/controller/perconaservermongodb/version_test.go @@ -21,7 +21,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" logf "sigs.k8s.io/controller-runtime/pkg/log" - "github.com/percona/percona-backup-mongodb/pbm" + "github.com/percona/percona-backup-mongodb/pbm/defs" "github.com/percona/percona-server-mongodb-operator/pkg/apis" api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" "github.com/percona/percona-server-mongodb-operator/pkg/k8s" @@ -399,7 +399,7 @@ func TestVersionMeta(t *testing.T) { }, Tasks: []api.BackupTaskSpec{ { - Type: pbm.PhysicalBackup, + Type: defs.PhysicalBackup, Enabled: true, }, }, diff --git a/pkg/controller/perconaservermongodbbackup/backup.go b/pkg/controller/perconaservermongodbbackup/backup.go index e3c516784d..9f2b03eefc 100644 --- a/pkg/controller/perconaservermongodbbackup/backup.go +++ b/pkg/controller/perconaservermongodbbackup/backup.go @@ -6,13 +6,16 @@ import ( "strings" "time" - "github.com/percona/percona-backup-mongodb/pbm" - api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" - "github.com/percona/percona-server-mongodb-operator/pkg/psmdb/backup" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/percona/percona-backup-mongodb/pbm/ctrl" + "github.com/percona/percona-backup-mongodb/pbm/defs" + pbmErrors "github.com/percona/percona-backup-mongodb/pbm/errors" + api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" + "github.com/percona/percona-server-mongodb-operator/pkg/psmdb/backup" ) const ( @@ -60,9 +63,9 @@ func (b *Backup) Start(ctx context.Context, k8sclient client.Client, cluster *ap compLevel = &l } - err = b.pbm.SendCmd(pbm.Cmd{ - Cmd: pbm.CmdBackup, - Backup: &pbm.BackupCmd{ + err = b.pbm.SendCmd(ctx, ctrl.Cmd{ + Cmd: ctrl.CmdBackup, + Backup: &ctrl.BackupCmd{ Name: name, Type: cr.Spec.Type, Compression: cr.Spec.Compression, @@ -121,12 +124,12 @@ func (b *Backup) Start(ctx context.Context, k8sclient client.Client, cluster *ap func (b *Backup) Status(ctx context.Context, cr *api.PerconaServerMongoDBBackup) (api.PerconaServerMongoDBBackupStatus, error) { status := cr.Status - meta, err := b.pbm.GetBackupMeta(cr.Status.PBMname) - if err != nil && !errors.Is(err, pbm.ErrNotFound) { + meta, err := b.pbm.GetBackupMeta(ctx, cr.Status.PBMname) + if err != nil && !errors.Is(err, pbmErrors.ErrNotFound) { return status, errors.Wrap(err, "get pbm backup meta") } - if meta == nil || meta.Name == "" || errors.Is(err, pbm.ErrNotFound) { + if meta == nil || meta.Name == "" || errors.Is(err, pbmErrors.ErrNotFound) { logf.FromContext(ctx).Info("Waiting for backup metadata", "pbmName", cr.Status.PBMname, "backup", cr.Name) return status, nil } @@ -138,15 +141,15 @@ func (b *Backup) Status(ctx context.Context, cr *api.PerconaServerMongoDBBackup) } switch meta.Status { - case pbm.StatusError: + case defs.StatusError: status.State = api.BackupStateError status.Error = fmt.Sprintf("%v", meta.Error()) - case pbm.StatusDone: + case defs.StatusDone: status.State = api.BackupStateReady status.CompletedAt = &metav1.Time{ Time: time.Unix(meta.LastTransitionTS, 0), } - case pbm.StatusStarting: + case defs.StatusStarting: passed := time.Now().UTC().Sub(time.Unix(meta.StartTS, 0)) if passed >= pbmStartingDeadline { status.State = api.BackupStateError @@ -164,7 +167,7 @@ func (b *Backup) Status(ctx context.Context, cr *api.PerconaServerMongoDBBackup) } status.Type = cr.Spec.Type - node, err := b.pbm.Node() + node, err := b.pbm.Node(ctx) if err != nil { return status, nil } diff --git a/pkg/controller/perconaservermongodbbackup/perconaservermongodbbackup_controller.go b/pkg/controller/perconaservermongodbbackup/perconaservermongodbbackup_controller.go index 57844ffee7..5547c715f4 100644 --- a/pkg/controller/perconaservermongodbbackup/perconaservermongodbbackup_controller.go +++ b/pkg/controller/perconaservermongodbbackup/perconaservermongodbbackup_controller.go @@ -5,11 +5,6 @@ import ( "fmt" "time" - "github.com/percona/percona-backup-mongodb/pbm" - pbmLog "github.com/percona/percona-backup-mongodb/pbm/log" - "github.com/percona/percona-backup-mongodb/pbm/storage" - "github.com/percona/percona-backup-mongodb/pbm/storage/azure" - "github.com/percona/percona-backup-mongodb/pbm/storage/s3" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -27,6 +22,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + pbmBackup "github.com/percona/percona-backup-mongodb/pbm/backup" + "github.com/percona/percona-backup-mongodb/pbm/ctrl" + pbmErrors "github.com/percona/percona-backup-mongodb/pbm/errors" + pbmLog "github.com/percona/percona-backup-mongodb/pbm/log" + "github.com/percona/percona-backup-mongodb/pbm/storage" + "github.com/percona/percona-backup-mongodb/pbm/storage/azure" + "github.com/percona/percona-backup-mongodb/pbm/storage/s3" psmdbv1 "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" "github.com/percona/percona-server-mongodb-operator/pkg/psmdb/backup" "github.com/percona/percona-server-mongodb-operator/version" @@ -308,13 +310,13 @@ func secret(ctx context.Context, cl client.Client, namespace, secretName string) return secret, err } -func getPBMBackupMeta(cr *psmdbv1.PerconaServerMongoDBBackup) *pbm.BackupMeta { - meta := &pbm.BackupMeta{ +func getPBMBackupMeta(cr *psmdbv1.PerconaServerMongoDBBackup) *pbmBackup.BackupMeta { + meta := &pbmBackup.BackupMeta{ Name: cr.Status.PBMname, Compression: cr.Spec.Compression, } for _, rs := range cr.Status.ReplsetNames { - meta.Replsets = append(meta.Replsets, pbm.BackupReplset{ + meta.Replsets = append(meta.Replsets, pbmBackup.BackupReplset{ Name: rs, OplogName: fmt.Sprintf("%s_%s.oplog.gz", meta.Name, rs), DumpName: fmt.Sprintf("%s_%s.dump.gz", meta.Name, rs), @@ -356,25 +358,24 @@ func (r *ReconcilePerconaServerMongoDBBackup) deleteBackupFinalizer(ctx context. return nil } - var meta *pbm.BackupMeta + var meta *backup.BackupMeta var err error if b.pbm != nil { - meta, err = b.pbm.GetBackupMeta(cr.Status.PBMname) + meta, err = b.pbm.GetBackupMeta(ctx, cr.Status.PBMname) if err != nil { - if !errors.Is(err, pbm.ErrNotFound) { + if !errors.Is(err, pbmErrors.ErrNotFound) { return errors.Wrap(err, "get backup meta") } meta = nil } } if b.pbm == nil || meta == nil { - dummyPBM := new(pbm.PBM) // We need this only for the DeleteBackupFiles method, which doesn't use method receiver at all stg, err := r.getPBMStorage(ctx, cr) if err != nil { return errors.Wrap(err, "get storage") } - if err := dummyPBM.DeleteBackupFiles(getPBMBackupMeta(cr), stg); err != nil { + if err := pbmBackup.DeleteBackupFiles(getPBMBackupMeta(cr), stg); err != nil { return errors.Wrap(err, "failed to delete backup files with dummy PBM") } return nil @@ -398,14 +399,14 @@ func (r *ReconcilePerconaServerMongoDBBackup) deleteBackupFinalizer(ctx context. if err != nil { return errors.Wrapf(err, "set backup config with storage %s", cr.Spec.StorageName) } - e := b.pbm.Logger().NewEvent(string(pbm.CmdDeleteBackup), "", "", primitive.Timestamp{}) + e := b.pbm.Logger().NewEvent(string(ctrl.CmdDeleteBackup), "", "", primitive.Timestamp{}) // We should delete PITR oplog chunks until `LastWriteTS` of the backup, // as it's not possible to delete backup if it is a base for the PITR timeline err = r.deletePITR(ctx, b, meta.LastWriteTS, e) if err != nil { return errors.Wrap(err, "failed to delete PITR") } - err = b.pbm.DeleteBackup(cr.Status.PBMname, e) + err = b.pbm.DeleteBackup(ctx, cr.Status.PBMname) if err != nil { return errors.Wrap(err, "failed to delete backup") } @@ -413,15 +414,15 @@ func (r *ReconcilePerconaServerMongoDBBackup) deleteBackupFinalizer(ctx context. } // deletePITR deletes PITR oplog chunks whose StartTS is less or equal to the `until` timestamp. Deletes all chunks if `until` is 0. -func (r *ReconcilePerconaServerMongoDBBackup) deletePITR(ctx context.Context, b *Backup, until primitive.Timestamp, e *pbmLog.Event) error { +func (r *ReconcilePerconaServerMongoDBBackup) deletePITR(ctx context.Context, b *Backup, until primitive.Timestamp, e pbmLog.LogEvent) error { log := logf.FromContext(ctx) - stg, err := b.pbm.GetStorage(e) + stg, err := b.pbm.GetStorage(ctx, e) if err != nil { return errors.Wrap(err, "get storage") } - chunks, err := b.pbm.PITRGetChunksSlice("", primitive.Timestamp{}, until) + chunks, err := b.pbm.PITRGetChunksSlice(ctx, "", primitive.Timestamp{}, until) if err != nil { return errors.Wrap(err, "get pitr chunks") } @@ -435,7 +436,7 @@ func (r *ReconcilePerconaServerMongoDBBackup) deletePITR(ctx context.Context, b return errors.Wrapf(err, "delete pitr chunk '%s' (%v) from storage", chnk.FName, chnk) } - _, err = b.pbm.Conn().Database(pbm.DB).Collection(pbm.PITRChunksCollection).DeleteOne( + _, err = b.pbm.PITRChunksCollection().DeleteOne( ctx, bson.D{ {Key: "rs", Value: chnk.RS}, diff --git a/pkg/controller/perconaservermongodbrestore/logical.go b/pkg/controller/perconaservermongodbrestore/logical.go index dedb53577e..e94bb0a350 100644 --- a/pkg/controller/perconaservermongodbrestore/logical.go +++ b/pkg/controller/perconaservermongodbrestore/logical.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/percona/percona-backup-mongodb/pbm" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson/primitive" appsv1 "k8s.io/api/apps/v1" @@ -13,6 +12,9 @@ import ( "k8s.io/apimachinery/pkg/types" logf "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/percona/percona-backup-mongodb/pbm/ctrl" + "github.com/percona/percona-backup-mongodb/pbm/defs" + pbmErrors "github.com/percona/percona-backup-mongodb/pbm/errors" psmdbv1 "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" "github.com/percona/percona-server-mongodb-operator/pkg/psmdb/backup" "github.com/percona/percona-server-mongodb-operator/version" @@ -95,7 +97,7 @@ func (r *ReconcilePerconaServerMongoDBRestore) reconcileLogicalRestore(ctx conte return status, errors.Wrap(err, "set pbm config") } - isBlockedByPITR, err := pbmc.HasLocks(backup.IsPITRLock) + isBlockedByPITR, err := pbmc.HasLocks(ctx, backup.IsPITRLock) if err != nil { return status, errors.Wrap(err, "checking pbm pitr locks") } @@ -112,8 +114,8 @@ func (r *ReconcilePerconaServerMongoDBRestore) reconcileLogicalRestore(ctx conte return status, err } - meta, err := pbmc.GetRestoreMeta(cr.Status.PBMname) - if err != nil && !errors.Is(err, pbm.ErrNotFound) { + meta, err := pbmc.GetRestoreMeta(ctx, cr.Status.PBMname) + if err != nil && !errors.Is(err, pbmErrors.ErrNotFound) { return status, errors.Wrap(err, "get pbm metadata") } @@ -123,33 +125,33 @@ func (r *ReconcilePerconaServerMongoDBRestore) reconcileLogicalRestore(ctx conte } switch meta.Status { - case pbm.StatusError: + case defs.StatusError: status.State = psmdbv1.RestoreStateError status.Error = meta.Error - if err = reEnablePITR(pbmc, cluster.Spec.Backup); err != nil { + if err = reEnablePITR(ctx, pbmc, cluster.Spec.Backup); err != nil { return status, err } - case pbm.StatusDone: + case defs.StatusDone: status.State = psmdbv1.RestoreStateReady status.CompletedAt = &metav1.Time{ Time: time.Unix(meta.LastTransitionTS, 0), } - if err = reEnablePITR(pbmc, cluster.Spec.Backup); err != nil { + if err = reEnablePITR(ctx, pbmc, cluster.Spec.Backup); err != nil { return status, err } - case pbm.StatusStarting, pbm.StatusRunning: + case defs.StatusStarting, defs.StatusRunning: status.State = psmdbv1.RestoreStateRunning } return status, nil } -func reEnablePITR(pbm backup.PBM, backup psmdbv1.BackupSpec) (err error) { +func reEnablePITR(ctx context.Context, pbm backup.PBM, backup psmdbv1.BackupSpec) (err error) { if !backup.IsEnabledPITR() { return } - err = pbm.SetConfigVar("pitr.enabled", "true") + err = pbm.SetConfigVar(ctx, "pitr.enabled", "true") if err != nil { return } @@ -158,22 +160,22 @@ func reEnablePITR(pbm backup.PBM, backup psmdbv1.BackupSpec) (err error) { } func runRestore(ctx context.Context, backup string, pbmc backup.PBM, pitr *psmdbv1.PITRestoreSpec) (string, error) { - e := pbmc.Logger().NewEvent(string(pbm.CmdResync), "", "", primitive.Timestamp{}) - err := pbmc.ResyncStorage(e) + e := pbmc.Logger().NewEvent(string(ctrl.CmdResync), "", "", primitive.Timestamp{}) + err := pbmc.ResyncStorage(ctx, e) if err != nil { return "", errors.Wrap(err, "set resync backup list from the store") } var ( - cmd pbm.Cmd + cmd ctrl.Cmd rName = time.Now().UTC().Format(time.RFC3339Nano) ) switch { case pitr == nil: - cmd = pbm.Cmd{ - Cmd: pbm.CmdRestore, - Restore: &pbm.RestoreCmd{ + cmd = ctrl.Cmd{ + Cmd: ctrl.CmdRestore, + Restore: &ctrl.RestoreCmd{ Name: rName, BackupName: backup, }, @@ -185,23 +187,23 @@ func runRestore(ctx context.Context, backup string, pbmc backup.PBM, pitr *psmdb return "", err } - cmd = pbm.Cmd{ - Cmd: pbm.CmdRestore, - Restore: &pbm.RestoreCmd{ + cmd = ctrl.Cmd{ + Cmd: ctrl.CmdRestore, + Restore: &ctrl.RestoreCmd{ Name: rName, BackupName: backup, OplogTS: primitive.Timestamp{T: uint32(ts)}, }, } case pitr.Type == psmdbv1.PITRestoreTypeLatest: - tl, err := pbmc.GetLatestTimelinePITR() + tl, err := pbmc.GetLatestTimelinePITR(ctx) if err != nil { return "", err } - cmd = pbm.Cmd{ - Cmd: pbm.CmdRestore, - Restore: &pbm.RestoreCmd{ + cmd = ctrl.Cmd{ + Cmd: ctrl.CmdRestore, + Restore: &ctrl.RestoreCmd{ Name: rName, BackupName: backup, OplogTS: primitive.Timestamp{T: tl.End}, @@ -209,7 +211,7 @@ func runRestore(ctx context.Context, backup string, pbmc backup.PBM, pitr *psmdb } } - if err = pbmc.SendCmd(cmd); err != nil { + if err = pbmc.SendCmd(ctx, cmd); err != nil { return "", errors.Wrap(err, "send restore cmd") } diff --git a/pkg/controller/perconaservermongodbrestore/perconaservermongodbrestore_controller.go b/pkg/controller/perconaservermongodbrestore/perconaservermongodbrestore_controller.go index 29c08e23a9..04fbbdf480 100644 --- a/pkg/controller/perconaservermongodbrestore/perconaservermongodbrestore_controller.go +++ b/pkg/controller/perconaservermongodbrestore/perconaservermongodbrestore_controller.go @@ -6,7 +6,6 @@ import ( "strings" "time" - "github.com/percona/percona-backup-mongodb/pbm" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -23,6 +22,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + "github.com/percona/percona-backup-mongodb/pbm/defs" "github.com/percona/percona-server-mongodb-operator/clientcmd" psmdbv1 "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" "github.com/percona/percona-server-mongodb-operator/pkg/psmdb/backup" @@ -161,12 +161,12 @@ func (r *ReconcilePerconaServerMongoDBRestore) Reconcile(ctx context.Context, re } switch bcp.Status.Type { - case "", pbm.LogicalBackup: + case "", defs.LogicalBackup: status, err = r.reconcileLogicalRestore(ctx, cr, bcp) if err != nil { return rr, errors.Wrap(err, "reconcile logical restore") } - case pbm.PhysicalBackup: + case defs.PhysicalBackup: status, err = r.reconcilePhysicalRestore(ctx, cr, bcp) if err != nil { return rr, errors.Wrap(err, "reconcile physical restore") diff --git a/pkg/controller/perconaservermongodbrestore/physical.go b/pkg/controller/perconaservermongodbrestore/physical.go index 422977d962..7f0f483fa0 100644 --- a/pkg/controller/perconaservermongodbrestore/physical.go +++ b/pkg/controller/perconaservermongodbrestore/physical.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/percona/percona-backup-mongodb/pbm" "github.com/pkg/errors" "gopkg.in/yaml.v2" appsv1 "k8s.io/api/apps/v1" @@ -22,6 +21,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/percona/percona-backup-mongodb/pbm/defs" psmdbv1 "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" "github.com/percona/percona-server-mongodb-operator/pkg/psmdb" "github.com/percona/percona-server-mongodb-operator/pkg/psmdb/backup" @@ -262,7 +262,7 @@ func (r *ReconcilePerconaServerMongoDBRestore) reconcilePhysicalRestore(ctx cont return status, nil } - meta := pbm.BackupMeta{} + meta := backup.BackupMeta{} err = retry.OnError(retry.DefaultBackoff, func(err error) bool { return strings.Contains(err.Error(), "container is not created or running") || strings.Contains(err.Error(), "error dialing backend: No agent available") @@ -300,30 +300,30 @@ func (r *ReconcilePerconaServerMongoDBRestore) reconcilePhysicalRestore(ctx cont log.V(1).Info("PBM restore status", "status", meta) switch meta.Status { - case pbm.StatusStarting: + case defs.StatusStarting: for _, rs := range meta.Replsets { - if rs.Status == pbm.StatusRunning { + if rs.Status == defs.StatusRunning { status.State = psmdbv1.RestoreStateRunning return status, nil } } - case pbm.StatusError: + case defs.StatusError: status.State = psmdbv1.RestoreStateError status.Error = meta.Err - case pbm.StatusPartlyDone: + case defs.StatusPartlyDone: status.State = psmdbv1.RestoreStateError var pbmErr string for _, rs := range meta.Replsets { - if rs.Status == pbm.StatusError { + if rs.Status == defs.StatusError { pbmErr += fmt.Sprintf("%s %s;", rs.Name, rs.Error) } } status.Error = pbmErr - case pbm.StatusRunning: + case defs.StatusRunning: status.State = psmdbv1.RestoreStateRunning - case pbm.StatusDone: + case defs.StatusDone: for _, rs := range meta.Replsets { - if rs.Status == pbm.StatusDone { + if rs.Status == defs.StatusDone { continue } diff --git a/pkg/psmdb/backup/backup.go b/pkg/psmdb/backup/backup.go index 1bdc4c2d8f..9fd2618242 100644 --- a/pkg/psmdb/backup/backup.go +++ b/pkg/psmdb/backup/backup.go @@ -101,7 +101,7 @@ func HasActiveJobs(ctx context.Context, newPBMFunc NewPBMFunc, cl client.Client, defer pbm.Close(ctx) allowLock = append(allowLock, NotJobLock(current)) - hasLocks, err := pbm.HasLocks(allowLock...) + hasLocks, err := pbm.HasLocks(ctx, allowLock...) if err != nil { return false, errors.Wrap(err, "check PBM locks") } diff --git a/pkg/psmdb/backup/fake/pbm.go b/pkg/psmdb/backup/fake/pbm.go index 99ea8cc164..d5f3fe229d 100644 --- a/pkg/psmdb/backup/fake/pbm.go +++ b/pkg/psmdb/backup/fake/pbm.go @@ -3,8 +3,10 @@ package fake import ( "context" - "github.com/percona/percona-backup-mongodb/pbm" + "github.com/percona/percona-backup-mongodb/pbm/ctrl" pbmLog "github.com/percona/percona-backup-mongodb/pbm/log" + "github.com/percona/percona-backup-mongodb/pbm/oplog" + "github.com/percona/percona-backup-mongodb/pbm/restore" "github.com/percona/percona-backup-mongodb/pbm/storage" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" @@ -23,55 +25,59 @@ func NewPBM(_ context.Context, _ client.Client, _ *api.PerconaServerMongoDB) (ba func (p *fakePBM) Conn() *mongo.Client { return nil } -func (p *fakePBM) GetPITRChunkContains(ctx context.Context, unixTS int64) (*pbm.OplogChunk, error) { +func (p *fakePBM) GetPITRChunkContains(ctx context.Context, unixTS int64) (*oplog.OplogChunk, error) { return nil, nil } -func (p *fakePBM) GetLatestTimelinePITR() (pbm.Timeline, error) { - return pbm.Timeline{}, nil +func (p *fakePBM) GetLatestTimelinePITR(ctx context.Context) (oplog.Timeline, error) { + return oplog.Timeline{}, nil } -func (p *fakePBM) PITRGetChunksSlice(rs string, from, to primitive.Timestamp) ([]pbm.OplogChunk, error) { +func (p *fakePBM) PITRGetChunksSlice(ctx context.Context, rs string, from, to primitive.Timestamp) ([]oplog.OplogChunk, error) { return nil, nil } -func (p *fakePBM) Logger() *pbmLog.Logger { +func (b *fakePBM) PITRChunksCollection() *mongo.Collection { return nil } -func (p *fakePBM) GetStorage(l *pbmLog.Event) (storage.Storage, error) { + +func (p *fakePBM) Logger() pbmLog.Logger { + return nil +} +func (p *fakePBM) GetStorage(ctx context.Context, e pbmLog.LogEvent) (storage.Storage, error) { return nil, nil } -func (p *fakePBM) ResyncStorage(l *pbmLog.Event) error { +func (p *fakePBM) ResyncStorage(ctx context.Context, l pbmLog.LogEvent) error { return nil } -func (p *fakePBM) SendCmd(cmd pbm.Cmd) error { +func (p *fakePBM) SendCmd(ctx context.Context, cmd ctrl.Cmd) error { return nil } func (p *fakePBM) Close(ctx context.Context) error { return nil } -func (p *fakePBM) HasLocks(predicates ...backup.LockHeaderPredicate) (bool, error) { +func (p *fakePBM) HasLocks(ctx context.Context, predicates ...backup.LockHeaderPredicate) (bool, error) { return false, nil } -func (p *fakePBM) GetRestoreMeta(name string) (*pbm.RestoreMeta, error) { +func (p *fakePBM) GetRestoreMeta(ctx context.Context, name string) (*restore.RestoreMeta, error) { return nil, nil } -func (p *fakePBM) GetBackupMeta(name string) (*pbm.BackupMeta, error) { +func (p *fakePBM) GetBackupMeta(ctx context.Context, bcpName string) (*backup.BackupMeta, error) { return nil, nil } -func (p *fakePBM) DeleteBackup(name string, l *pbmLog.Event) error { +func (p *fakePBM) DeleteBackup(ctx context.Context, name string) error { return nil } func (p *fakePBM) SetConfig(ctx context.Context, k8sclient client.Client, cluster *api.PerconaServerMongoDB, stg api.BackupStorageSpec) error { return nil } -func (p *fakePBM) SetConfigVar(key, val string) error { +func (p *fakePBM) SetConfigVar(ctx context.Context, key, val string) error { return nil } -func (p *fakePBM) GetConfigVar(key string) (any, error) { +func (p *fakePBM) GetConfigVar(ctx context.Context, key string) (any, error) { return nil, nil } -func (p *fakePBM) DeleteConfigVar(key string) error { +func (p *fakePBM) DeleteConfigVar(ctx context.Context, key string) error { return nil } -func (p *fakePBM) Node() (string, error) { +func (p *fakePBM) Node(ctx context.Context) (string, error) { return "", nil } diff --git a/pkg/psmdb/backup/job.go b/pkg/psmdb/backup/job.go index 73f955f159..de2cde2193 100644 --- a/pkg/psmdb/backup/job.go +++ b/pkg/psmdb/backup/job.go @@ -9,7 +9,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/percona/percona-backup-mongodb/pbm" + "github.com/percona/percona-backup-mongodb/pbm/defs" api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" "github.com/pkg/errors" ) @@ -104,7 +104,7 @@ func BackupFromTask(cr *api.PerconaServerMongoDB, task *api.BackupTaskSpec) (*ap if len(shortClusterName) > 16 { shortClusterName = shortClusterName[:16] } - backupType := pbm.LogicalBackup + backupType := defs.LogicalBackup if len(task.Type) > 0 { backupType = task.Type } diff --git a/pkg/psmdb/backup/pbm.go b/pkg/psmdb/backup/pbm.go index 5272590abd..afc6d764d9 100644 --- a/pkg/psmdb/backup/pbm.go +++ b/pkg/psmdb/backup/pbm.go @@ -8,11 +8,6 @@ import ( "strings" "time" - "github.com/percona/percona-backup-mongodb/pbm" - pbmLog "github.com/percona/percona-backup-mongodb/pbm/log" - "github.com/percona/percona-backup-mongodb/pbm/storage" - "github.com/percona/percona-backup-mongodb/pbm/storage/azure" - "github.com/percona/percona-backup-mongodb/pbm/storage/s3" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -22,6 +17,20 @@ import ( "k8s.io/apimachinery/pkg/types" client "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/percona/percona-backup-mongodb/pbm/backup" + "github.com/percona/percona-backup-mongodb/pbm/config" + "github.com/percona/percona-backup-mongodb/pbm/connect" + "github.com/percona/percona-backup-mongodb/pbm/ctrl" + "github.com/percona/percona-backup-mongodb/pbm/lock" + pbmLog "github.com/percona/percona-backup-mongodb/pbm/log" + "github.com/percona/percona-backup-mongodb/pbm/oplog" + "github.com/percona/percona-backup-mongodb/pbm/restore" + "github.com/percona/percona-backup-mongodb/pbm/resync" + "github.com/percona/percona-backup-mongodb/pbm/storage" + "github.com/percona/percona-backup-mongodb/pbm/storage/azure" + "github.com/percona/percona-backup-mongodb/pbm/storage/s3" + "github.com/percona/percona-backup-mongodb/pbm/topo" + "github.com/percona/percona-backup-mongodb/pbm/util" api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" "github.com/percona/percona-server-mongodb-operator/pkg/psmdb" ) @@ -35,36 +44,41 @@ const ( ) type pbmC struct { - *pbm.PBM + connect.Client + pbmLogger pbmLog.Logger k8c client.Client namespace string rsName string } +type BackupMeta = backup.BackupMeta + type PBM interface { Conn() *mongo.Client - GetPITRChunkContains(ctx context.Context, unixTS int64) (*pbm.OplogChunk, error) - GetLatestTimelinePITR() (pbm.Timeline, error) - PITRGetChunksSlice(rs string, from, to primitive.Timestamp) ([]pbm.OplogChunk, error) + GetPITRChunkContains(ctx context.Context, unixTS int64) (*oplog.OplogChunk, error) + GetLatestTimelinePITR(ctx context.Context) (oplog.Timeline, error) + PITRGetChunksSlice(ctx context.Context, rs string, from, to primitive.Timestamp) ([]oplog.OplogChunk, error) + PITRChunksCollection() *mongo.Collection - Logger() *pbmLog.Logger - GetStorage(l *pbmLog.Event) (storage.Storage, error) - ResyncStorage(l *pbmLog.Event) error - SendCmd(cmd pbm.Cmd) error + Logger() pbmLog.Logger + GetStorage(ctx context.Context, e pbmLog.LogEvent) (storage.Storage, error) + ResyncStorage(ctx context.Context, e pbmLog.LogEvent) error + SendCmd(ctx context.Context, cmd ctrl.Cmd) error Close(ctx context.Context) error - HasLocks(predicates ...LockHeaderPredicate) (bool, error) + HasLocks(ctx context.Context, predicates ...LockHeaderPredicate) (bool, error) - GetRestoreMeta(name string) (*pbm.RestoreMeta, error) - GetBackupMeta(name string) (*pbm.BackupMeta, error) - DeleteBackup(name string, l *pbmLog.Event) error + GetBackupMeta(ctx context.Context, bcpName string) (*backup.BackupMeta, error) + GetRestoreMeta(ctx context.Context, name string) (*restore.RestoreMeta, error) + + DeleteBackup(ctx context.Context, name string) error SetConfig(ctx context.Context, k8sclient client.Client, cluster *api.PerconaServerMongoDB, stg api.BackupStorageSpec) error - SetConfigVar(key, val string) error - GetConfigVar(key string) (any, error) - DeleteConfigVar(key string) error + SetConfigVar(ctx context.Context, key, val string) error + GetConfigVar(ctx context.Context, key string) (any, error) + DeleteConfigVar(ctx context.Context, key string) error - Node() (string, error) + Node(ctx context.Context) (string, error) } func getMongoUri(ctx context.Context, k8sclient client.Client, cr *api.PerconaServerMongoDB, addrs []string) (string, error) { @@ -151,15 +165,14 @@ func NewPBM(ctx context.Context, c client.Client, cluster *api.PerconaServerMong return nil, errors.Wrap(err, "get mongo uri") } - pbmc, err := pbm.New(ctx, murl, "operator-pbm-ctl") + pbmc, err := connect.Connect(ctx, murl, &connect.ConnectOptions{AppName: "operator-pbm-ctl"}) if err != nil { return nil, errors.Wrapf(err, "create PBM connection to %s", strings.Join(addrs, ",")) } - pbmc.InitLogger("", "") - return &pbmC{ - PBM: pbmc, + Client: pbmc, + pbmLogger: pbmLog.New(pbmc.LogCollection(), "", ""), k8c: c, namespace: cluster.Namespace, rsName: rs.Name, @@ -211,28 +224,28 @@ func GetPriorities(ctx context.Context, k8sclient client.Client, cluster *api.Pe return priorities, nil } -func GetPBMConfig(ctx context.Context, k8sclient client.Client, cluster *api.PerconaServerMongoDB, stg api.BackupStorageSpec) (pbm.Config, error) { - conf := pbm.Config{} +func GetPBMConfig(ctx context.Context, k8sclient client.Client, cluster *api.PerconaServerMongoDB, stg api.BackupStorageSpec) (config.Config, error) { + conf := config.Config{} priority, err := GetPriorities(ctx, k8sclient, cluster) if err != nil { return conf, errors.Wrap(err, "get priorities") } - conf = pbm.Config{ - PITR: pbm.PITRConf{ + conf = config.Config{ + PITR: config.PITRConf{ Enabled: cluster.Spec.Backup.PITR.Enabled, Compression: cluster.Spec.Backup.PITR.CompressionType, CompressionLevel: cluster.Spec.Backup.PITR.CompressionLevel, }, - Backup: pbm.BackupConf{ + Backup: config.BackupConf{ Priority: priority, }, } switch stg.Type { case api.BackupStorageS3: - conf.Storage = pbm.StorageConf{ + conf.Storage = config.StorageConf{ Type: storage.S3, S3: s3.Conf{ Region: stg.S3.Region, @@ -279,7 +292,7 @@ func GetPBMConfig(ctx context.Context, k8sclient client.Client, cluster *api.Per if err != nil { return conf, errors.Wrap(err, "get azure credentials secret") } - conf.Storage = pbm.StorageConf{ + conf.Storage = config.StorageConf{ Type: storage.Azure, Azure: azure.Conf{ Account: string(azureSecret.Data[AzureStorageAccountNameSecretKey]), @@ -300,7 +313,7 @@ func GetPBMConfig(ctx context.Context, k8sclient client.Client, cluster *api.Per } func (b *pbmC) Conn() *mongo.Client { - return b.PBM.Conn + return b.Client.MongoClient() } // SetConfig sets the pbm config with storage defined in the cluster CR @@ -311,7 +324,7 @@ func (b *pbmC) SetConfig(ctx context.Context, k8sclient client.Client, cluster * return errors.Wrap(err, "get PBM config") } - if err := b.PBM.SetConfig(conf); err != nil { + if err := config.SetConfig(ctx, b.Client, conf); err != nil { return errors.Wrap(err, "write config") } @@ -321,7 +334,11 @@ func (b *pbmC) SetConfig(ctx context.Context, k8sclient client.Client, cluster * // Close close the PBM connection func (b *pbmC) Close(ctx context.Context) error { - return b.PBM.Conn.Disconnect(ctx) + return b.Client.Disconnect(ctx) +} + +func (b *pbmC) Logger() pbmLog.Logger { + return b.pbmLogger } func getSecret(ctx context.Context, cl client.Client, namespace, secretName string) (*corev1.Secret, error) { @@ -335,27 +352,27 @@ func getSecret(ctx context.Context, cl client.Client, namespace, secretName stri return secret, err } -type LockHeaderPredicate func(pbm.LockHeader) bool +type LockHeaderPredicate func(lock.LockHeader) bool -func NotPITRLock(l pbm.LockHeader) bool { - return l.Type != pbm.CmdPITR +func NotPITRLock(l lock.LockHeader) bool { + return l.Type != ctrl.CmdPITR } -func IsPITRLock(l pbm.LockHeader) bool { - return l.Type == pbm.CmdPITR +func IsPITRLock(l lock.LockHeader) bool { + return l.Type == ctrl.CmdPITR } func NotJobLock(j Job) LockHeaderPredicate { - return func(h pbm.LockHeader) bool { - var jobCommand pbm.Command + return func(h lock.LockHeader) bool { + var jobCommand ctrl.Command switch j.Type { case TypeBackup: - jobCommand = pbm.CmdBackup + jobCommand = ctrl.CmdBackup case TypeRestore: - jobCommand = pbm.CmdRestore + jobCommand = ctrl.CmdRestore case TypePITRestore: - jobCommand = pbm.CmdRestore + jobCommand = ctrl.CmdRestore default: return true } @@ -364,13 +381,13 @@ func NotJobLock(j Job) LockHeaderPredicate { } } -func (b *pbmC) HasLocks(predicates ...LockHeaderPredicate) (bool, error) { - locks, err := b.PBM.GetLocks(&pbm.LockHeader{}) +func (b *pbmC) HasLocks(ctx context.Context, predicates ...LockHeaderPredicate) (bool, error) { + locks, err := lock.GetLocks(ctx, b.Client, &lock.LockHeader{}) if err != nil { return false, errors.Wrap(err, "getting lock data") } - allowedByAllPredicates := func(l pbm.LockHeader) bool { + allowedByAllPredicates := func(l lock.LockHeader) bool { for _, allow := range predicates { if !allow(l) { return false @@ -390,13 +407,13 @@ func (b *pbmC) HasLocks(predicates ...LockHeaderPredicate) (bool, error) { var errNoOplogsForPITR = errors.New("there is no oplogs that can cover the date/time or no oplogs at all") -func (b *pbmC) GetLastPITRChunk() (*pbm.OplogChunk, error) { - nodeInfo, err := b.PBM.GetNodeInfo() +func (b *pbmC) GetLastPITRChunk(ctx context.Context) (*oplog.OplogChunk, error) { + nodeInfo, err := topo.GetNodeInfo(context.TODO(), b.Client.MongoClient()) if err != nil { return nil, errors.Wrap(err, "getting node information") } - c, err := b.PBM.PITRLastChunkMeta(nodeInfo.SetName) + c, err := oplog.PITRLastChunkMeta(ctx, b.Client, nodeInfo.SetName) if err != nil { if errors.Is(err, mongo.ErrNoDocuments) { return nil, errNoOplogsForPITR @@ -411,19 +428,19 @@ func (b *pbmC) GetLastPITRChunk() (*pbm.OplogChunk, error) { return c, nil } -func (b *pbmC) GetTimelinesPITR() ([]pbm.Timeline, error) { +func (b *pbmC) GetTimelinesPITR(ctx context.Context) ([]oplog.Timeline, error) { var ( now = time.Now().UTC().Unix() - timelines [][]pbm.Timeline + timelines [][]oplog.Timeline ) - shards, err := b.PBM.ClusterMembers() + shards, err := topo.ClusterMembers(ctx, b.Client.MongoClient()) if err != nil { return nil, errors.Wrap(err, "getting cluster members") } for _, s := range shards { - rsTimelines, err := b.PBM.PITRGetValidTimelines(s.RS, primitive.Timestamp{T: uint32(now)}) + rsTimelines, err := oplog.PITRGetValidTimelines(ctx, b.Client, s.RS, primitive.Timestamp{T: uint32(now)}) if err != nil { return nil, errors.Wrapf(err, "getting timelines for %s", s.RS) } @@ -431,17 +448,17 @@ func (b *pbmC) GetTimelinesPITR() ([]pbm.Timeline, error) { timelines = append(timelines, rsTimelines) } - return pbm.MergeTimelines(timelines...), nil + return oplog.MergeTimelines(timelines...), nil } -func (b *pbmC) GetLatestTimelinePITR() (pbm.Timeline, error) { - timelines, err := b.GetTimelinesPITR() +func (b *pbmC) GetLatestTimelinePITR(ctx context.Context) (oplog.Timeline, error) { + timelines, err := b.GetTimelinesPITR(ctx) if err != nil { - return pbm.Timeline{}, err + return oplog.Timeline{}, err } if len(timelines) == 0 { - return pbm.Timeline{}, errNoOplogsForPITR + return oplog.Timeline{}, errNoOplogsForPITR } return timelines[len(timelines)-1], nil @@ -449,8 +466,8 @@ func (b *pbmC) GetLatestTimelinePITR() (pbm.Timeline, error) { // PITRGetChunkContains returns a pitr slice chunk that belongs to the // given replica set and contains the given timestamp -func (p *pbmC) pitrGetChunkContains(ctx context.Context, rs string, ts primitive.Timestamp) (*pbm.OplogChunk, error) { - res := p.PBM.Conn.Database(pbm.DB).Collection(pbm.PITRChunksCollection).FindOne( +func (b *pbmC) pitrGetChunkContains(ctx context.Context, rs string, ts primitive.Timestamp) (*oplog.OplogChunk, error) { + res := b.Client.PITRChunksCollection().FindOne( ctx, bson.D{ {"rs", rs}, @@ -462,13 +479,13 @@ func (p *pbmC) pitrGetChunkContains(ctx context.Context, rs string, ts primitive return nil, errors.Wrap(res.Err(), "get") } - chnk := new(pbm.OplogChunk) + chnk := new(oplog.OplogChunk) err := res.Decode(chnk) return chnk, errors.Wrap(err, "decode") } -func (b *pbmC) GetPITRChunkContains(ctx context.Context, unixTS int64) (*pbm.OplogChunk, error) { - nodeInfo, err := b.PBM.GetNodeInfo() +func (b *pbmC) GetPITRChunkContains(ctx context.Context, unixTS int64) (*oplog.OplogChunk, error) { + nodeInfo, err := topo.GetNodeInfo(ctx, b.Client.MongoClient()) if err != nil { return nil, errors.Wrap(err, "getting node information") } @@ -488,12 +505,57 @@ func (b *pbmC) GetPITRChunkContains(ctx context.Context, unixTS int64) (*pbm.Opl return c, nil } +func (b *pbmC) PITRGetChunksSlice(ctx context.Context, rsName string, from, to primitive.Timestamp) ([]oplog.OplogChunk, error) { + return oplog.PITRGetChunksSlice(ctx, b.Client, rsName, from, to) +} + // Node returns replset node chosen to run the backup -func (p *pbmC) Node() (string, error) { - lock, err := p.GetLockData(&pbm.LockHeader{Replset: p.rsName}) +func (b *pbmC) Node(ctx context.Context) (string, error) { + lock, err := lock.GetLockData(ctx, b.Client, &lock.LockHeader{Replset: b.rsName}) if err != nil { return "", err } return strings.Split(lock.Node, ".")[0], nil } + +func (b *pbmC) GetStorage(ctx context.Context, e pbmLog.LogEvent) (storage.Storage, error) { + return util.GetStorage(ctx, b.Client, e) +} + +func (b *pbmC) GetConfigVar(ctx context.Context, key string) (any, error) { + return config.GetConfigVar(ctx, b.Client, key) +} + +func (b *pbmC) SetConfigVar(ctx context.Context, key, val string) error { + return config.SetConfigVar(ctx, b.Client, key, val) +} + +func (b *pbmC) DeleteConfigVar(ctx context.Context, key string) error { + return config.DeleteConfigVar(ctx, b.Client, key) +} + +func (b *pbmC) GetBackupMeta(ctx context.Context, bcpName string) (*backup.BackupMeta, error) { + return backup.NewDBManager(b.Client).GetBackupByName(ctx, bcpName) +} + +func (b *pbmC) DeleteBackup(ctx context.Context, name string) error { + return backup.DeleteBackup(ctx, b.Client, name) +} + +func (b *pbmC) GetRestoreMeta(ctx context.Context, name string) (*restore.RestoreMeta, error) { + return restore.GetRestoreMeta(ctx, b.Client, name) +} +func (b *pbmC) ResyncStorage(ctx context.Context, e pbmLog.LogEvent) error { + return resync.ResyncStorage(ctx, b.Client, e) +} + +func (b *pbmC) SendCmd(ctx context.Context, cmd ctrl.Cmd) error { + cmd.TS = time.Now().UTC().Unix() + _, err := b.CmdStreamCollection().InsertOne(ctx, cmd) + return err +} + +func (b *pbmC) PITRChunksCollection() *mongo.Collection { + return b.Client.PITRChunksCollection() +}