From 511457f6ee5494f891e4db6549c7655df3297aa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ege=20G=C3=BCne=C5=9F?= Date: Fri, 17 Jan 2025 17:39:59 +0300 Subject: [PATCH 1/3] K8SPXC-1366: Improve parallel backup prevention With these changes, pxcbackup controller will create a lease object for a running backup and no other backup will be able to start until this lease is released (deleted). Lock is released when backup succeeds or fails. Users can disable this behavior by setting `spec.backup.allowParallel` to true, by default this option is enabled. Also, we introduce a new finalizer called `internal.percona.com/release-lock` for PerconaXtraDBClusterBackup objects. Operator will automatically add this finalizer to each new backup object if it doesn't exist. This finalizer will release the lock if user deletes a running backup object. --- .../demand-backup-parallel/conf/backup.yml | 7 + e2e-tests/demand-backup-parallel/conf/cr.yml | 88 +++++++ e2e-tests/demand-backup-parallel/run | 64 +++++ e2e-tests/functions | 11 +- e2e-tests/run-pr.csv | 1 + e2e-tests/run-release.csv | 1 + pkg/apis/pxc/v1/pxc_backup_types.go | 1 + pkg/controller/pxc/controller.go | 51 +--- pkg/controller/pxc/pitr.go | 2 +- pkg/controller/pxc/tls.go | 2 +- pkg/controller/pxc/upgrade.go | 4 +- pkg/controller/pxcbackup/controller.go | 218 +++++++++--------- pkg/k8s/k8s_suite_test.go | 13 ++ pkg/k8s/lease.go | 56 +++++ pkg/k8s/lease_test.go | 73 ++++++ pkg/naming/backup.go | 4 + pkg/naming/naming.go | 4 +- .../app/binlogcollector/binlog-collector.go | 4 +- pkg/pxc/backup/pitr.go | 2 +- 19 files changed, 449 insertions(+), 157 deletions(-) create mode 100644 e2e-tests/demand-backup-parallel/conf/backup.yml create mode 100644 e2e-tests/demand-backup-parallel/conf/cr.yml create mode 100755 e2e-tests/demand-backup-parallel/run create mode 100644 pkg/k8s/k8s_suite_test.go create mode 100644 pkg/k8s/lease.go create mode 100644 pkg/k8s/lease_test.go diff --git a/e2e-tests/demand-backup-parallel/conf/backup.yml b/e2e-tests/demand-backup-parallel/conf/backup.yml new file mode 100644 index 000000000..17d3cac81 --- /dev/null +++ b/e2e-tests/demand-backup-parallel/conf/backup.yml @@ -0,0 +1,7 @@ +apiVersion: pxc.percona.com/v1 +kind: PerconaXtraDBClusterBackup +metadata: + name: +spec: + pxcCluster: demand-backup-parallel + storageName: minio diff --git a/e2e-tests/demand-backup-parallel/conf/cr.yml b/e2e-tests/demand-backup-parallel/conf/cr.yml new file mode 100644 index 000000000..677b9936f --- /dev/null +++ b/e2e-tests/demand-backup-parallel/conf/cr.yml @@ -0,0 +1,88 @@ +apiVersion: pxc.percona.com/v1 +kind: PerconaXtraDBCluster +metadata: + name: demand-backup-parallel + finalizers: + - percona.com/delete-pxc-pods-in-order + # annotations: + # percona.com/issue-vault-token: "true" +spec: + tls: + SANs: + - "minio-service.#namespace" + secretsName: my-cluster-secrets + vaultSecretName: some-name-vault + pause: false + pxc: + size: 3 + image: -pxc + configuration: | + [mysqld] + wsrep_log_conflicts + log_error_verbosity=3 + wsrep_debug=1 + [sst] + xbstream-opts=--decompress + [xtrabackup] + compress=lz4 + resources: + requests: + memory: 0.1G + cpu: 100m + limits: + memory: "2G" + cpu: "1" + volumeSpec: + persistentVolumeClaim: + resources: + requests: + storage: 2Gi + affinity: + antiAffinityTopologyKey: "kubernetes.io/hostname" + haproxy: + enabled: true + size: 2 + image: -haproxy + resources: + requests: + memory: 0.1G + cpu: 100m + limits: + memory: 1G + cpu: 700m + affinity: + antiAffinityTopologyKey: "kubernetes.io/hostname" + pmm: 
+ enabled: false + image: perconalab/pmm-client:1.17.1 + serverHost: monitoring-service + serverUser: pmm + backup: + activeDeadlineSeconds: 3600 + allowParallel: false + backoffLimit: 3 + image: -backup + storages: + pvc: + type: filesystem + volume: + persistentVolumeClaim: + accessModes: [ "ReadWriteOnce" ] + resources: + requests: + storage: 1Gi + minio: + type: s3 + resources: + requests: + memory: 0.5G + cpu: 500m + limits: + memory: "2G" + cpu: "1" + s3: + credentialsSecret: minio-secret + region: us-east-1 + bucket: operator-testing/prefix/subfolder + endpointUrl: http://minio-service.#namespace:9000/ + verifyTLS: false diff --git a/e2e-tests/demand-backup-parallel/run b/e2e-tests/demand-backup-parallel/run new file mode 100755 index 000000000..328cae51e --- /dev/null +++ b/e2e-tests/demand-backup-parallel/run @@ -0,0 +1,64 @@ +#!/bin/bash + +# This test checks if spec.backup.allowParallel=false works as expected. + +set -o errexit + +test_dir=$(realpath $(dirname $0)) +. ${test_dir}/../functions + +set_debug + +function run_backup() { + local name=$1 + yq eval ".metadata.name = \"${name}\"" ${test_dir}/conf/backup.yml \ + | kubectl_bin apply -f - +} + +function check_active_backup_count() { + active_backup_count=$(kubectl_bin get pxc-backup | grep -E 'Starting|Running' | wc -l) + if [[ ${active_backup_count} -gt 1 ]]; then + log "There are ${active_backup_count} active backups. 'allowParallel: false' doesn't work properly" + exit 1 + fi +} + +create_infra ${namespace} + +start_minio + +log "creating PXC client" +kubectl_bin apply -f ${conf_dir}/client.yml + +log "creating cluster secrets" +kubectl_bin apply -f ${conf_dir}/secrets.yml + +cluster="demand-backup-parallel" +log "create PXC cluster: ${cluster}" +apply_config ${test_dir}/conf/cr.yml + +desc 'creating backups' +run_backup backup1 +run_backup backup2 +run_backup backup3 +run_backup backup4 + +wait_cluster_consistency ${cluster} 3 2 +sleep 5 +check_active_backup_count + +for i in $(seq 0 3); do + sleep 5 + check_active_backup_count + holder=$(kubectl_bin get lease pxc-${cluster}-backup-lock -o jsonpath={.spec.holderIdentity}) + log "Backup lock holder: ${holder}" + wait_backup ${holder} +done + +# explicitly check all backups to ensure all succeeded +wait_backup backup1 +wait_backup backup2 +wait_backup backup3 +wait_backup backup4 + +log "test passed" diff --git a/e2e-tests/functions b/e2e-tests/functions index ff4fe01ef..51c7ee660 100755 --- a/e2e-tests/functions +++ b/e2e-tests/functions @@ -60,6 +60,11 @@ set_debug() { fi } +log() { + echo "[$(date +%Y-%m-%dT%H:%M:%S%z)]" $* +} + + HELM_VERSION=$(helm version -c | $sed -re 's/.*SemVer:"([^"]+)".*/\1/; s/.*\bVersion:"([^"]+)".*/\1/') if [ "${HELM_VERSION:0:2}" == "v2" ]; then HELM_ARGS="--name" @@ -98,10 +103,11 @@ wait_cluster_consistency() { local i=0 local max=36 sleep 7 # wait for two reconcile loops ;) 3 sec x 2 times + 1 sec = 7 seconds + echo -n "waiting for pxc/${cluster_name} to be ready" until [[ "$(kubectl_bin get pxc "${cluster_name}" -o jsonpath='{.status.state}')" == "ready" && "$(kubectl_bin get pxc "${cluster_name}" -o jsonpath='{.status.pxc.ready}')" == "${cluster_size}" && "$(kubectl_bin get pxc "${cluster_name}" -o jsonpath='{.status.'$(get_proxy_engine ${cluster_name})'.ready}')" == "${proxy_size}" ]]; do - echo 'waiting for cluster readyness' + echo -n . sleep 20 if [[ $i -ge $max ]]; then echo "Something went wrong waiting for cluster consistency!" 
@@ -109,6 +115,7 @@ wait_cluster_consistency() { fi let i+=1 done + echo } create_namespace() { @@ -235,7 +242,7 @@ wait_backup() { set +o xtrace retry=0 - echo -n $backup + echo -n "waiting for pxc-backup/${backup} to reach ${status} state" until kubectl_bin get pxc-backup/$backup -o jsonpath='{.status.state}' 2>/dev/null | grep $status; do sleep 1 echo -n . diff --git a/e2e-tests/run-pr.csv b/e2e-tests/run-pr.csv index 8a8bdfc34..84a7c595f 100644 --- a/e2e-tests/run-pr.csv +++ b/e2e-tests/run-pr.csv @@ -5,6 +5,7 @@ custom-users,8.0 demand-backup-cloud,8.0 demand-backup-encrypted-with-tls,8.0 demand-backup,8.0 +demand-backup-parallel,8.0 haproxy,5.7 haproxy,8.0 init-deploy,5.7 diff --git a/e2e-tests/run-release.csv b/e2e-tests/run-release.csv index 29456f42f..2f4e182e7 100644 --- a/e2e-tests/run-release.csv +++ b/e2e-tests/run-release.csv @@ -5,6 +5,7 @@ cross-site custom-users default-cr demand-backup +demand-backup-parallel demand-backup-cloud demand-backup-encrypted-with-tls haproxy diff --git a/pkg/apis/pxc/v1/pxc_backup_types.go b/pkg/apis/pxc/v1/pxc_backup_types.go index ce62c38c8..44b1a8e3f 100644 --- a/pkg/apis/pxc/v1/pxc_backup_types.go +++ b/pkg/apis/pxc/v1/pxc_backup_types.go @@ -162,6 +162,7 @@ type PXCBackupState string const ( BackupNew PXCBackupState = "" + BackupWaiting PXCBackupState = "Waiting" BackupStarting PXCBackupState = "Starting" BackupRunning PXCBackupState = "Running" BackupFailed PXCBackupState = "Failed" diff --git a/pkg/controller/pxc/controller.go b/pkg/controller/pxc/controller.go index 7cb18926f..b5c7b8574 100644 --- a/pkg/controller/pxc/controller.go +++ b/pkg/controller/pxc/controller.go @@ -17,7 +17,6 @@ import ( corev1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -26,7 +25,6 @@ import ( k8sretry "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -34,6 +32,7 @@ import ( "github.com/percona/percona-xtradb-cluster-operator/clientcmd" api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" + "github.com/percona/percona-xtradb-cluster-operator/pkg/k8s" "github.com/percona/percona-xtradb-cluster-operator/pkg/naming" "github.com/percona/percona-xtradb-cluster-operator/pkg/pxc" "github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/app" @@ -575,7 +574,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileConfigMap(cr *api.PerconaXtraDB return errors.Wrap(err, "new autotune configmap") } - err = setControllerReference(cr, configMap, r.scheme) + err = k8s.SetControllerReference(cr, configMap, r.scheme) if err != nil { return errors.Wrap(err, "set autotune configmap controller ref") } @@ -593,7 +592,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileConfigMap(cr *api.PerconaXtraDB pxcConfigName := config.CustomConfigMapName(cr.Name, "pxc") if cr.Spec.PXC.Configuration != "" { configMap := config.NewConfigMap(cr, pxcConfigName, "init.cnf", cr.Spec.PXC.Configuration) - err := setControllerReference(cr, configMap, r.scheme) + err := k8s.SetControllerReference(cr, configMap, r.scheme) if err != nil { return errors.Wrap(err, "set controller ref") } @@ -660,7 +659,7 @@ func (r 
*ReconcilePerconaXtraDBCluster) reconcileConfigMap(cr *api.PerconaXtraDB if cr.Spec.ProxySQLEnabled() { if cr.Spec.ProxySQL.Configuration != "" { configMap := config.NewConfigMap(cr, proxysqlConfigName, "proxysql.cnf", cr.Spec.ProxySQL.Configuration) - err := setControllerReference(cr, configMap, r.scheme) + err := k8s.SetControllerReference(cr, configMap, r.scheme) if err != nil { return errors.Wrap(err, "set controller ref ProxySQL") } @@ -679,7 +678,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileConfigMap(cr *api.PerconaXtraDB haproxyConfigName := config.CustomConfigMapName(cr.Name, "haproxy") if cr.HAProxyEnabled() && cr.Spec.HAProxy.Configuration != "" { configMap := config.NewConfigMap(cr, haproxyConfigName, "haproxy-global.cfg", cr.Spec.HAProxy.Configuration) - err := setControllerReference(cr, configMap, r.scheme) + err := k8s.SetControllerReference(cr, configMap, r.scheme) if err != nil { return errors.Wrap(err, "set controller ref HAProxy") } @@ -697,7 +696,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileConfigMap(cr *api.PerconaXtraDB logCollectorConfigName := config.CustomConfigMapName(cr.Name, "logcollector") if cr.Spec.LogCollector != nil && cr.Spec.LogCollector.Configuration != "" { configMap := config.NewConfigMap(cr, logCollectorConfigName, "fluentbit_custom.conf", cr.Spec.LogCollector.Configuration) - err := setControllerReference(cr, configMap, r.scheme) + err := k8s.SetControllerReference(cr, configMap, r.scheme) if err != nil { return errors.Wrap(err, "set controller ref LogCollector") } @@ -716,7 +715,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileConfigMap(cr *api.PerconaXtraDB func (r *ReconcilePerconaXtraDBCluster) createHookScriptConfigMap(cr *api.PerconaXtraDBCluster, hookScript string, configMapName string) error { configMap := config.NewConfigMap(cr, configMapName, "hook.sh", hookScript) - err := setControllerReference(cr, configMap, r.scheme) + err := k8s.SetControllerReference(cr, configMap, r.scheme) if err != nil { return errors.Wrap(err, "set controller ref") } @@ -742,7 +741,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcilePDB(ctx context.Context, cr *ap } pdb := pxc.PodDisruptionBudget(cr, spec, sfs.Labels()) - if err := setControllerReference(sts, pdb, r.scheme); err != nil { + if err := k8s.SetControllerReference(sts, pdb, r.scheme); err != nil { return errors.Wrap(err, "set owner reference") } @@ -984,38 +983,6 @@ func (r *ReconcilePerconaXtraDBCluster) deleteCerts(ctx context.Context, cr *api return nil } -func setControllerReference(ro runtime.Object, obj metav1.Object, scheme *runtime.Scheme) error { - ownerRef, err := OwnerRef(ro, scheme) - if err != nil { - return err - } - obj.SetOwnerReferences(append(obj.GetOwnerReferences(), ownerRef)) - return nil -} - -// OwnerRef returns OwnerReference to object -func OwnerRef(ro runtime.Object, scheme *runtime.Scheme) (metav1.OwnerReference, error) { - gvk, err := apiutil.GVKForObject(ro, scheme) - if err != nil { - return metav1.OwnerReference{}, err - } - - trueVar := true - - ca, err := meta.Accessor(ro) - if err != nil { - return metav1.OwnerReference{}, err - } - - return metav1.OwnerReference{ - APIVersion: gvk.GroupVersion().String(), - Kind: gvk.Kind, - Name: ca.GetName(), - UID: ca.GetUID(), - Controller: &trueVar, - }, nil -} - // resyncPXCUsersWithProxySQL calls the method of synchronizing users and makes sure that only one Goroutine works at a time func (r *ReconcilePerconaXtraDBCluster) resyncPXCUsersWithProxySQL(ctx context.Context, cr *api.PerconaXtraDBCluster) { if 
!cr.Spec.ProxySQLEnabled() { @@ -1182,7 +1149,7 @@ func mergeMaps(x, y map[string]string) map[string]string { } func (r *ReconcilePerconaXtraDBCluster) createOrUpdateService(ctx context.Context, cr *api.PerconaXtraDBCluster, svc *corev1.Service, saveOldMeta bool) error { - err := setControllerReference(cr, svc, r.scheme) + err := k8s.SetControllerReference(cr, svc, r.scheme) if err != nil { return errors.Wrap(err, "set controller reference") } diff --git a/pkg/controller/pxc/pitr.go b/pkg/controller/pxc/pitr.go index dfe96196e..5147aeddd 100644 --- a/pkg/controller/pxc/pitr.go +++ b/pkg/controller/pxc/pitr.go @@ -25,7 +25,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileBinlogCollector(ctx context.Con return errors.Wrapf(err, "get binlog collector deployment for cluster '%s'", cr.Name) } - err = setControllerReference(cr, &collector, r.scheme) + err = k8s.SetControllerReference(cr, &collector, r.scheme) if err != nil { return errors.Wrapf(err, "set controller reference for binlog collector deployment '%s'", collector.Name) } diff --git a/pkg/controller/pxc/tls.go b/pkg/controller/pxc/tls.go index 3a133d2ab..16c753597 100644 --- a/pkg/controller/pxc/tls.go +++ b/pkg/controller/pxc/tls.go @@ -278,7 +278,7 @@ func (r *ReconcilePerconaXtraDBCluster) createSSLManualy(cr *api.PerconaXtraDBCl data["ca.crt"] = caCert data["tls.crt"] = tlsCert data["tls.key"] = key - owner, err := OwnerRef(cr, r.scheme) + owner, err := k8s.OwnerRef(cr, r.scheme) if err != nil { return err } diff --git a/pkg/controller/pxc/upgrade.go b/pkg/controller/pxc/upgrade.go index 0c232e2a4..f7f34b841 100644 --- a/pkg/controller/pxc/upgrade.go +++ b/pkg/controller/pxc/upgrade.go @@ -130,7 +130,7 @@ func (r *ReconcilePerconaXtraDBCluster) updatePod(ctx context.Context, sfs api.S sts.Spec.Template.Annotations = annotations sts.Spec.Template.Labels = labels - if err := setControllerReference(cr, sts, r.scheme); err != nil { + if err := k8s.SetControllerReference(cr, sts, r.scheme); err != nil { return errors.Wrap(err, "set controller reference") } err = r.createOrUpdate(ctx, cr, sts) @@ -168,12 +168,10 @@ func (r *ReconcilePerconaXtraDBCluster) smartUpdate(ctx context.Context, sfs api } if cr.HAProxyEnabled() && cr.Status.HAProxy.Status != api.AppStateReady { - log.V(1).Info("Waiting for HAProxy to be ready before smart update") return nil } if cr.ProxySQLEnabled() && cr.Status.ProxySQL.Status != api.AppStateReady { - log.V(1).Info("Waiting for ProxySQL to be ready before smart update") return nil } diff --git a/pkg/controller/pxcbackup/controller.go b/pkg/controller/pxcbackup/controller.go index b321473ce..613ec43f0 100644 --- a/pkg/controller/pxcbackup/controller.go +++ b/pkg/controller/pxcbackup/controller.go @@ -14,7 +14,6 @@ import ( corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" @@ -117,7 +116,7 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req // Fetch the PerconaXtraDBClusterBackup instance cr := &api.PerconaXtraDBClusterBackup{} - err := r.client.Get(context.TODO(), request.NamespacedName, cr) + err := r.client.Get(ctx, request.NamespacedName, cr) if err != nil { if k8sErrors.IsNotFound(err) { // Request object not found, could have been deleted after reconcile request. 
@@ -129,16 +128,21 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req return reconcile.Result{}, err } + err = r.ensureFinalizers(ctx, cr) + if err != nil { + return reconcile.Result{}, errors.Wrap(err, "ensure finalizers") + } + err = r.tryRunBackupFinalizers(ctx, cr) if err != nil { - return reconcile.Result{}, errors.Wrap(err, "failed to run finalizers") + return reconcile.Result{}, errors.Wrap(err, "run finalizers") } - if cr.Status.State == api.BackupSucceeded || - cr.Status.State == api.BackupFailed { + if cr.Status.State == api.BackupSucceeded || cr.Status.State == api.BackupFailed { if len(cr.GetFinalizers()) > 0 { return rr, nil } + return reconcile.Result{}, nil } @@ -146,12 +150,14 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req return rr, nil } - cluster, err := r.getCluster(cr) + cluster, err := r.getCluster(ctx, cr) if err != nil { log.Error(err, "invalid backup cluster") return rr, nil } + log = log.WithValues("cluster", cluster.Name) + err = cluster.CheckNSetDefaults(r.serverVersion, log) if err != nil { return rr, errors.Wrap(err, "wrong PXC options") @@ -162,24 +168,30 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req } if err := cluster.CanBackup(); err != nil { - return rr, errors.Wrap(err, "failed to run backup") + log.Info("Cluster is not ready for backup", "reason", err.Error()) + return rr, nil + } + + storage, ok := cluster.Spec.Backup.Storages[cr.Spec.StorageName] + if !ok { + return rr, errors.Errorf("storage %s doesn't exist", cr.Spec.StorageName) } + log = log.WithValues("storage", cr.Spec.StorageName) + + log.V(1).Info("Check if parallel backups are allowed", "allowed", cluster.Spec.Backup.GetAllowParallel()) if !cluster.Spec.Backup.GetAllowParallel() { - isRunning, err := r.isOtherBackupRunning(ctx, cr, cluster) + lease, err := k8s.AcquireLease(ctx, r.client, naming.BackupLeaseName(cluster.Name), cr.Namespace, cr.Name) if err != nil { - return rr, errors.Wrap(err, "failed to check if other backups running") + return reconcile.Result{}, errors.Wrap(err, "acquire backup lock") } - if isRunning { - log.Info("backup already running, waiting until it's done") + + if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != cr.Name { + log.Info("Another backup is holding the lock", "holder", *lease.Spec.HolderIdentity) return rr, nil } } - storage, ok := cluster.Spec.Backup.Storages[cr.Spec.StorageName] - if !ok { - return rr, errors.Errorf("storage %s doesn't exist", cr.Spec.StorageName) - } if cr.Status.S3 == nil || cr.Status.Azure == nil { cr.Status.S3 = storage.S3 cr.Status.Azure = storage.Azure @@ -210,15 +222,15 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req cr.Status.Destination.SetPVCDestination(pvc.Name) // Set PerconaXtraDBClusterBackup instance as the owner and controller - if err := setControllerReference(cr, pvc, r.scheme); err != nil { + if err := k8s.SetControllerReference(cr, pvc, r.scheme); err != nil { return rr, errors.Wrap(err, "setControllerReference") } // Check if this PVC already exists - err = r.client.Get(context.TODO(), types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}, pvc) + err = r.client.Get(ctx, types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}, pvc) if err != nil && k8sErrors.IsNotFound(err) { log.Info("Creating a new volume for backup", "Namespace", pvc.Namespace, "Name", pvc.Name) - err = r.client.Create(context.TODO(), pvc) + err = r.client.Create(ctx, pvc) 
if err != nil { return rr, errors.Wrap(err, "create backup pvc") } @@ -253,22 +265,49 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req } // Set PerconaXtraDBClusterBackup instance as the owner and controller - if err := setControllerReference(cr, job, r.scheme); err != nil { + if err := k8s.SetControllerReference(cr, job, r.scheme); err != nil { return rr, errors.Wrap(err, "job/setControllerReference") } - err = r.client.Create(context.TODO(), job) + err = r.client.Create(ctx, job) if err != nil && !k8sErrors.IsAlreadyExists(err) { return rr, errors.Wrap(err, "create backup job") } else if err == nil { - log.Info("Created a new backup job", "Namespace", job.Namespace, "Name", job.Name) + log.Info("Created a new backup job", "namespace", job.Namespace, "name", job.Name) } - err = r.updateJobStatus(cr, job, cr.Spec.StorageName, storage, cluster) + err = r.updateJobStatus(ctx, cr, job, cr.Spec.StorageName, storage, cluster) + + switch cr.Status.State { + case api.BackupSucceeded, api.BackupFailed: + log.Info("Releasing backup lock", "lease", naming.BackupLeaseName(cluster.Name)) + + if err := k8s.ReleaseLease(ctx, r.client, naming.BackupLeaseName(cluster.Name), cr.Namespace); err != nil { + return reconcile.Result{}, errors.Wrap(err, "release backup lock") + } + + return reconcile.Result{}, nil + } return rr, err } +func (r *ReconcilePerconaXtraDBClusterBackup) ensureFinalizers(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { + for _, f := range cr.GetFinalizers() { + if f == naming.FinalizerReleaseLock { + return nil + } + } + + orig := cr.DeepCopy() + cr.SetFinalizers(append(cr.GetFinalizers(), naming.FinalizerReleaseLock)) + if err := r.client.Patch(ctx, cr.DeepCopy(), client.MergeFrom(orig)); err != nil { + return errors.Wrap(err, "patch finalizers") + } + + return nil +} + func (r *ReconcilePerconaXtraDBClusterBackup) tryRunBackupFinalizers(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { if cr.ObjectMeta.DeletionTimestamp == nil { return nil @@ -282,7 +321,7 @@ func (r *ReconcilePerconaXtraDBClusterBackup) tryRunBackupFinalizers(ctx context return nil } - go r.runDeleteBackupFinalizer(ctx, cr) + go r.runBackupFinalizers(ctx, cr) default: if _, ok := r.bcpDeleteInProgress.Load(cr.Name); !ok { inprog := []string{} @@ -299,7 +338,7 @@ func (r *ReconcilePerconaXtraDBClusterBackup) tryRunBackupFinalizers(ctx context return nil } -func (r *ReconcilePerconaXtraDBClusterBackup) runDeleteBackupFinalizer(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) { +func (r *ReconcilePerconaXtraDBClusterBackup) runBackupFinalizers(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) { log := logf.FromContext(ctx) defer func() { @@ -315,10 +354,10 @@ func (r *ReconcilePerconaXtraDBClusterBackup) runDeleteBackupFinalizer(ctx conte log.Info("The finalizer delete-s3-backup is deprecated and will be deleted in 1.18.0. 
Use percona.com/delete-backup") fallthrough case naming.FinalizerDeleteBackup: - if (cr.Status.S3 == nil && cr.Status.Azure == nil) || cr.Status.Destination == "" { continue } + switch cr.Status.GetStorageType(nil) { case api.BackupStorageS3: if cr.Status.Destination.StorageTypePrefix() != api.AwsBlobStoragePrefix { @@ -330,15 +369,24 @@ func (r *ReconcilePerconaXtraDBClusterBackup) runDeleteBackupFinalizer(ctx conte default: continue } + + if err != nil { + log.Info("failed to delete backup", "backup path", cr.Status.Destination, "error", err.Error()) + finalizers = append(finalizers, f) + continue + } + + log.Info("backup was removed", "name", cr.Name) + case naming.FinalizerReleaseLock: + err = r.runReleaseLockFinalizer(ctx, cr) + if err != nil { + log.Error(err, "failed to release backup lock") + finalizers = append(finalizers, f) + } default: finalizers = append(finalizers, f) } - if err != nil { - log.Info("failed to delete backup", "backup path", cr.Status.Destination, "error", err.Error()) - finalizers = append(finalizers, f) - } else if f == naming.FinalizerDeleteBackup || f == naming.FinalizerS3DeleteBackup { - log.Info("backup was removed", "name", cr.Name) - } + } cr.SetFinalizers(finalizers) @@ -409,6 +457,14 @@ func (r *ReconcilePerconaXtraDBClusterBackup) runAzureBackupFinalizer(ctx contex return nil } +func (r *ReconcilePerconaXtraDBClusterBackup) runReleaseLockFinalizer(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { + err := k8s.ReleaseLease(ctx, r.client, naming.BackupLeaseName(cr.Spec.PXCCluster), cr.Namespace) + if k8sErrors.IsNotFound(err) { + return nil + } + return errors.Wrap(err, "release backup lock") +} + func removeBackupObjects(ctx context.Context, s storage.Storage, destination string) func() error { return func() error { blobs, err := s.ListObjects(ctx, destination) @@ -440,9 +496,9 @@ func removeBackupObjects(ctx context.Context, s storage.Storage, destination str } } -func (r *ReconcilePerconaXtraDBClusterBackup) getCluster(cr *api.PerconaXtraDBClusterBackup) (*api.PerconaXtraDBCluster, error) { +func (r *ReconcilePerconaXtraDBClusterBackup) getCluster(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) (*api.PerconaXtraDBCluster, error) { cluster := api.PerconaXtraDBCluster{} - err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: cr.Namespace, Name: cr.Spec.PXCCluster}, &cluster) + err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: cr.Spec.PXCCluster}, &cluster) if err != nil { return nil, errors.Wrap(err, "get PXC cluster") } @@ -450,10 +506,17 @@ func (r *ReconcilePerconaXtraDBClusterBackup) getCluster(cr *api.PerconaXtraDBCl return &cluster, nil } -func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus(bcp *api.PerconaXtraDBClusterBackup, job *batchv1.Job, - storageName string, storage *api.BackupStorageSpec, cluster *api.PerconaXtraDBCluster, +func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus( + ctx context.Context, + bcp *api.PerconaXtraDBClusterBackup, + job *batchv1.Job, + storageName string, + storage *api.BackupStorageSpec, + cluster *api.PerconaXtraDBCluster, ) error { - err := r.client.Get(context.TODO(), types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, job) + log := logf.FromContext(ctx).WithValues("job", job.Name) + + err := r.client.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, job) if err != nil { if k8sErrors.IsNotFound(err) { return nil @@ -500,20 +563,25 @@ func (r *ReconcilePerconaXtraDBClusterBackup) 
updateJobStatus(bcp *api.PerconaXt bcp.Status = status - if status.State == api.BackupSucceeded { + switch status.State { + case api.BackupSucceeded: + log.Info("Backup succeeded") + if cluster.PITREnabled() { - collectorPod, err := binlogcollector.GetPod(context.TODO(), r.client, cluster) + collectorPod, err := binlogcollector.GetPod(ctx, r.client, cluster) if err != nil { return errors.Wrap(err, "get binlog collector pod") } - if err := binlogcollector.RemoveGapFile(context.TODO(), cluster, r.clientcmd, collectorPod); err != nil { + log.V(1).Info("Removing binlog gap file from binlog collector", "pod", collectorPod.Name) + if err := binlogcollector.RemoveGapFile(r.clientcmd, collectorPod); err != nil { if !errors.Is(err, binlogcollector.GapFileNotFound) { return errors.Wrap(err, "remove gap file") } } - if err := binlogcollector.RemoveTimelineFile(context.TODO(), cluster, r.clientcmd, collectorPod); err != nil { + log.V(1).Info("Removing binlog timeline file from binlog collector", "pod", collectorPod.Name) + if err := binlogcollector.RemoveTimelineFile(r.clientcmd, collectorPod); err != nil { return errors.Wrap(err, "remove timeline file") } } @@ -524,76 +592,18 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus(bcp *api.PerconaXt Namespace: cluster.Namespace, }, } - if err := r.client.Delete(context.TODO(), &initSecret); client.IgnoreNotFound(err) != nil { + log.V(1).Info("Removing mysql-init secret", "secret", initSecret.Name) + if err := r.client.Delete(ctx, &initSecret); client.IgnoreNotFound(err) != nil { return errors.Wrap(err, "delete mysql-init secret") } + case api.BackupFailed: + log.Info("Backup failed") } - err = r.client.Status().Update(context.TODO(), bcp) - if err != nil { - return errors.Wrap(err, "send update") - } - - return nil -} - -func setControllerReference(cr *api.PerconaXtraDBClusterBackup, obj metav1.Object, scheme *runtime.Scheme) error { - ownerRef, err := cr.OwnerRef(scheme) + err = r.client.Status().Update(ctx, bcp) if err != nil { - return err + return errors.Wrap(err, "update status") } - obj.SetOwnerReferences(append(obj.GetOwnerReferences(), ownerRef)) - return nil -} -func (r *ReconcilePerconaXtraDBClusterBackup) isOtherBackupRunning(ctx context.Context, cr *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster) (bool, error) { - list := new(batchv1.JobList) - if err := r.client.List(ctx, list, &client.ListOptions{ - Namespace: cluster.Namespace, - LabelSelector: labels.SelectorFromSet(naming.LabelsBackup(cluster)), - }); err != nil { - return false, errors.Wrap(err, "list jobs") - } - - for _, job := range list.Items { - backupNameLabelKey := naming.LabelPerconaBackupName - if cluster.CompareVersionWith("1.16.0") < 0 { - backupNameLabelKey = "backup-name" - } - if job.Labels[backupNameLabelKey] == cr.Name { - continue - } - if job.Status.Active == 0 && (jobSucceded(&job) || jobFailed(&job)) { - continue - } - - return true, nil - } - - return false, nil -} - -func jobFailed(job *batchv1.Job) bool { - failedCondition := findJobCondition(job.Status.Conditions, batchv1.JobFailed) - if failedCondition != nil && failedCondition.Status == corev1.ConditionTrue { - return true - } - return false -} - -func jobSucceded(job *batchv1.Job) bool { - succeededCondition := findJobCondition(job.Status.Conditions, batchv1.JobComplete) - if succeededCondition != nil && succeededCondition.Status == corev1.ConditionTrue { - return true - } - return false -} - -func findJobCondition(conditions []batchv1.JobCondition, condType 
batchv1.JobConditionType) *batchv1.JobCondition { - for i, cond := range conditions { - if cond.Type == condType { - return &conditions[i] - } - } return nil } diff --git a/pkg/k8s/k8s_suite_test.go b/pkg/k8s/k8s_suite_test.go new file mode 100644 index 000000000..3985e3b74 --- /dev/null +++ b/pkg/k8s/k8s_suite_test.go @@ -0,0 +1,13 @@ +package k8s_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestK8s(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "K8s Suite") +} diff --git a/pkg/k8s/lease.go b/pkg/k8s/lease.go new file mode 100644 index 000000000..8eb7d6958 --- /dev/null +++ b/pkg/k8s/lease.go @@ -0,0 +1,56 @@ +package k8s + +import ( + "context" + "time" + + "github.com/pkg/errors" + coordv1 "k8s.io/api/coordination/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func AcquireLease(ctx context.Context, c client.Client, name, namespace, holder string) (*coordv1.Lease, error) { + lease := new(coordv1.Lease) + + if err := c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, lease); err != nil { + if !k8serrors.IsNotFound(err) { + return lease, errors.Wrap(err, "get lease") + } + + lease := &coordv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: coordv1.LeaseSpec{ + AcquireTime: &metav1.MicroTime{Time: time.Now()}, + HolderIdentity: &holder, + }, + } + + if err := c.Create(ctx, lease); err != nil { + return lease, errors.Wrap(err, "create lease") + } + + return lease, nil + } + + return lease, nil +} + +func ReleaseLease(ctx context.Context, c client.Client, name, namespace string) error { + lease := new(coordv1.Lease) + + if err := c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, lease); err != nil { + return errors.Wrap(err, "get lease") + } + + if err := c.Delete(ctx, lease); err != nil { + return errors.Wrap(err, "delete lease") + } + + return nil +} diff --git a/pkg/k8s/lease_test.go b/pkg/k8s/lease_test.go new file mode 100644 index 000000000..873c1c756 --- /dev/null +++ b/pkg/k8s/lease_test.go @@ -0,0 +1,73 @@ +package k8s_test + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/percona/percona-xtradb-cluster-operator/pkg/k8s" + coordv1 "k8s.io/api/coordination/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" // nolint +) + +var _ = Describe("Lease", func() { + It("should be create a lease", func() { + cl := fake.NewFakeClient() + + ctx := context.Background() + + name := "backup-lock" + namespace := "test" + holder := "backup1" + + lease, err := k8s.AcquireLease(ctx, cl, name, namespace, holder) + Expect(err).ToNot(HaveOccurred()) + + freshLease := new(coordv1.Lease) + nn := types.NamespacedName{ + Name: name, + Namespace: namespace, + } + err = cl.Get(ctx, nn, freshLease) + Expect(err).ToNot(HaveOccurred()) + Expect(freshLease.Spec.AcquireTime).NotTo(BeNil()) + Expect(freshLease.Spec.HolderIdentity, lease.Spec.HolderIdentity) + }) + + It("should be delete a lease", func() { + ctx := context.Background() + + name := "backup-lock" + namespace := "test" + holder := "backup1" + + lease := &coordv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: coordv1.LeaseSpec{ + AcquireTime: &metav1.MicroTime{Time: time.Now()}, + HolderIdentity: &holder, + }, + } + + cl := fake.NewFakeClient(lease) + + err := k8s.ReleaseLease(ctx, cl, name, namespace) + Expect(err).ToNot(HaveOccurred()) + + freshLease := new(coordv1.Lease) + nn := types.NamespacedName{ + Name: name, + Namespace: namespace, + } + err = cl.Get(ctx, nn, freshLease) + Expect(err).To(HaveOccurred()) + Expect(k8serrors.IsNotFound(err)).To(BeTrue()) + }) +}) diff --git a/pkg/naming/backup.go b/pkg/naming/backup.go index c7e876d03..2d44941f9 100644 --- a/pkg/naming/backup.go +++ b/pkg/naming/backup.go @@ -9,6 +9,10 @@ import ( "k8s.io/apimachinery/pkg/util/validation" ) +func BackupLeaseName(clusterName string) string { + return "pxc-" + clusterName + "-backup-lock" +} + // BackupJobName generates legit name for backup resources. // k8s sets the `job-name` label for the created by job pod. // So we have to be sure that job name won't be longer than 63 symbols. diff --git a/pkg/naming/naming.go b/pkg/naming/naming.go index 3dd8f58ba..67237ec2f 100644 --- a/pkg/naming/naming.go +++ b/pkg/naming/naming.go @@ -1,7 +1,8 @@ package naming const ( - annotationPrefix = "percona.com/" + annotationPrefix = "percona.com/" + internalAnnotationPrefix = "internal." 
+ annotationPrefix ) const ( @@ -11,6 +12,7 @@ const ( FinalizerDeletePxcPvc = annotationPrefix + "delete-pxc-pvc" FinalizerDeleteBackup = annotationPrefix + "delete-backup" FinalizerS3DeleteBackup = "delete-s3-backup" + FinalizerReleaseLock = internalAnnotationPrefix + "release-lock" ) const ( diff --git a/pkg/pxc/app/binlogcollector/binlog-collector.go b/pkg/pxc/app/binlogcollector/binlog-collector.go index 5eb2940ef..ac7b725ec 100644 --- a/pkg/pxc/app/binlogcollector/binlog-collector.go +++ b/pkg/pxc/app/binlogcollector/binlog-collector.go @@ -346,7 +346,7 @@ func GetPod(ctx context.Context, c client.Client, cr *api.PerconaXtraDBCluster) var GapFileNotFound = errors.New("gap file not found") -func RemoveGapFile(ctx context.Context, cr *api.PerconaXtraDBCluster, c *clientcmd.Client, pod *corev1.Pod) error { +func RemoveGapFile(c *clientcmd.Client, pod *corev1.Pod) error { stderrBuf := &bytes.Buffer{} err := c.Exec(pod, "pitr", []string{"/bin/bash", "-c", "rm /tmp/gap-detected"}, nil, nil, stderrBuf, false) if err != nil { @@ -359,7 +359,7 @@ func RemoveGapFile(ctx context.Context, cr *api.PerconaXtraDBCluster, c *clientc return nil } -func RemoveTimelineFile(ctx context.Context, cr *api.PerconaXtraDBCluster, c *clientcmd.Client, pod *corev1.Pod) error { +func RemoveTimelineFile(c *clientcmd.Client, pod *corev1.Pod) error { stderrBuf := &bytes.Buffer{} err := c.Exec(pod, "pitr", []string{"/bin/bash", "-c", "rm /tmp/pitr-timeline"}, nil, nil, stderrBuf, false) if err != nil { diff --git a/pkg/pxc/backup/pitr.go b/pkg/pxc/backup/pitr.go index 462f1118b..55e2f7770 100644 --- a/pkg/pxc/backup/pitr.go +++ b/pkg/pxc/backup/pitr.go @@ -89,7 +89,7 @@ func CheckPITRErrors(ctx context.Context, cl client.Client, clcmd *clientcmd.Cli return errors.Wrap(err, "update backup status") } - if err := binlogcollector.RemoveGapFile(ctx, cr, clcmd, collectorPod); err != nil { + if err := binlogcollector.RemoveGapFile(clcmd, collectorPod); err != nil { if !errors.Is(err, binlogcollector.GapFileNotFound) { return errors.Wrap(err, "remove gap file") } From 3a4b8f61a1198649103f7914be5f3731f9d000cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ege=20G=C3=BCne=C5=9F?= Date: Mon, 20 Jan 2025 13:02:16 +0300 Subject: [PATCH 2/3] K8SPXC-1366: Add startingDeadlineSeconds We introduce a new field to PerconaXtraDBClusterBackup: `spec.startingDeadlineSeconds`. If this field is set and if backup is not started in configured duration, operator will automatically fail the backup. This duration is checked by comparing the current time with backup job's creation timestamp. 
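For example, a backup object that must start within five minutes of its
creation could look like the sketch below. It follows the commented example
in deploy/backup/backup.yaml from this patch; the cluster and storage names
(cluster1, fs-pvc) are illustrative.

    apiVersion: pxc.percona.com/v1
    kind: PerconaXtraDBClusterBackup
    metadata:
      name: backup1
    spec:
      pxcCluster: cluster1
      storageName: fs-pvc
      startingDeadlineSeconds: 300

If such a backup is still in the New state 300 seconds after its creation
timestamp, the operator marks it Failed and records the reason in the new
status.error field.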
--- ...rcona.com_perconaxtradbclusterbackups.yaml | 5 + ...cona.com_perconaxtradbclusterrestores.yaml | 4 + ...pxc.percona.com_perconaxtradbclusters.yaml | 3 + deploy/backup/backup.yaml | 1 + deploy/bundle.yaml | 12 ++ deploy/cr.yaml | 1 + deploy/crd.yaml | 12 ++ deploy/cw-bundle.yaml | 12 ++ pkg/apis/pxc/v1/pxc_backup_types.go | 15 +- pkg/apis/pxc/v1/pxc_types.go | 23 +-- pkg/apis/pxc/v1/zz_generated.deepcopy.go | 10 ++ pkg/controller/pxc/backup.go | 5 +- pkg/controller/pxcbackup/controller.go | 170 ++++++++++++++---- 13 files changed, 223 insertions(+), 50 deletions(-) diff --git a/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml b/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml index a8e745cb8..3696e7235 100644 --- a/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml +++ b/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml @@ -146,6 +146,9 @@ spec: type: object pxcCluster: type: string + startingDeadlineSeconds: + format: int64 + type: integer storageName: type: string type: object @@ -203,6 +206,8 @@ spec: type: array destination: type: string + error: + type: string image: type: string lastscheduled: diff --git a/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml b/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml index 50e42bb77..1a964ea17 100644 --- a/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml +++ b/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml @@ -101,6 +101,8 @@ spec: type: array destination: type: string + error: + type: string image: type: string lastscheduled: @@ -275,6 +277,8 @@ spec: type: array destination: type: string + error: + type: string image: type: string lastscheduled: diff --git a/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml b/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml index f79927e7e..9a2ff3fd8 100644 --- a/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml +++ b/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml @@ -143,6 +143,9 @@ spec: type: array serviceAccountName: type: string + startingDeadlineSeconds: + format: int64 + type: integer storages: additionalProperties: properties: diff --git a/deploy/backup/backup.yaml b/deploy/backup/backup.yaml index 2a65ed2ce..26536b626 100644 --- a/deploy/backup/backup.yaml +++ b/deploy/backup/backup.yaml @@ -8,6 +8,7 @@ spec: pxcCluster: cluster1 storageName: fs-pvc # activeDeadlineSeconds: 3600 +# startingDeadlineSeconds: 300 # containerOptions: # env: # - name: VERIFY_TLS diff --git a/deploy/bundle.yaml b/deploy/bundle.yaml index 4604da4fa..33bfe562d 100644 --- a/deploy/bundle.yaml +++ b/deploy/bundle.yaml @@ -145,6 +145,9 @@ spec: type: object pxcCluster: type: string + startingDeadlineSeconds: + format: int64 + type: integer storageName: type: string type: object @@ -202,6 +205,8 @@ spec: type: array destination: type: string + error: + type: string image: type: string lastscheduled: @@ -344,6 +349,8 @@ spec: type: array destination: type: string + error: + type: string image: type: string lastscheduled: @@ -518,6 +525,8 @@ spec: type: array destination: type: string + error: + type: string image: type: string lastscheduled: @@ -1048,6 +1057,9 @@ spec: type: array serviceAccountName: type: string + startingDeadlineSeconds: + format: int64 + type: integer storages: additionalProperties: properties: diff --git a/deploy/cr.yaml b/deploy/cr.yaml index 152620d81..257f7fc4e 100644 --- a/deploy/cr.yaml +++ b/deploy/cr.yaml @@ -605,6 
+605,7 @@ spec: image: perconalab/percona-xtradb-cluster-operator:main-pxc8.0-backup # backoffLimit: 6 # activeDeadlineSeconds: 3600 +# startingDeadlineSeconds: 300 # serviceAccountName: percona-xtradb-cluster-operator # imagePullSecrets: # - name: private-registry-credentials diff --git a/deploy/crd.yaml b/deploy/crd.yaml index 63c7dcd41..12b435fe6 100644 --- a/deploy/crd.yaml +++ b/deploy/crd.yaml @@ -145,6 +145,9 @@ spec: type: object pxcCluster: type: string + startingDeadlineSeconds: + format: int64 + type: integer storageName: type: string type: object @@ -202,6 +205,8 @@ spec: type: array destination: type: string + error: + type: string image: type: string lastscheduled: @@ -344,6 +349,8 @@ spec: type: array destination: type: string + error: + type: string image: type: string lastscheduled: @@ -518,6 +525,8 @@ spec: type: array destination: type: string + error: + type: string image: type: string lastscheduled: @@ -1048,6 +1057,9 @@ spec: type: array serviceAccountName: type: string + startingDeadlineSeconds: + format: int64 + type: integer storages: additionalProperties: properties: diff --git a/deploy/cw-bundle.yaml b/deploy/cw-bundle.yaml index f536667bd..2ae2e70f0 100644 --- a/deploy/cw-bundle.yaml +++ b/deploy/cw-bundle.yaml @@ -145,6 +145,9 @@ spec: type: object pxcCluster: type: string + startingDeadlineSeconds: + format: int64 + type: integer storageName: type: string type: object @@ -202,6 +205,8 @@ spec: type: array destination: type: string + error: + type: string image: type: string lastscheduled: @@ -344,6 +349,8 @@ spec: type: array destination: type: string + error: + type: string image: type: string lastscheduled: @@ -518,6 +525,8 @@ spec: type: array destination: type: string + error: + type: string image: type: string lastscheduled: @@ -1048,6 +1057,9 @@ spec: type: array serviceAccountName: type: string + startingDeadlineSeconds: + format: int64 + type: integer storages: additionalProperties: properties: diff --git a/pkg/apis/pxc/v1/pxc_backup_types.go b/pkg/apis/pxc/v1/pxc_backup_types.go index 44b1a8e3f..93241abd6 100644 --- a/pkg/apis/pxc/v1/pxc_backup_types.go +++ b/pkg/apis/pxc/v1/pxc_backup_types.go @@ -47,14 +47,16 @@ type PerconaXtraDBClusterBackup struct { } type PXCBackupSpec struct { - PXCCluster string `json:"pxcCluster"` - StorageName string `json:"storageName,omitempty"` - ContainerOptions *BackupContainerOptions `json:"containerOptions,omitempty"` - ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` + PXCCluster string `json:"pxcCluster"` + StorageName string `json:"storageName,omitempty"` + ContainerOptions *BackupContainerOptions `json:"containerOptions,omitempty"` + StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"` + ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` } type PXCBackupStatus struct { State PXCBackupState `json:"state,omitempty"` + Error string `json:"error,omitempty"` CompletedAt *metav1.Time `json:"completed,omitempty"` LastScheduled *metav1.Time `json:"lastscheduled,omitempty"` Destination PXCBackupDestination `json:"destination,omitempty"` @@ -186,3 +188,8 @@ func (cr *PerconaXtraDBClusterBackup) OwnerRef(scheme *runtime.Scheme) (metav1.O Controller: &trueVar, }, nil } + +func (cr *PerconaXtraDBClusterBackup) SetFailedStatusWithError(err error) { + cr.Status.State = BackupFailed + cr.Status.Error = err.Error() +} diff --git a/pkg/apis/pxc/v1/pxc_types.go b/pkg/apis/pxc/v1/pxc_types.go index df91b7c84..086495d12 100644 --- a/pkg/apis/pxc/v1/pxc_types.go +++ 
b/pkg/apis/pxc/v1/pxc_types.go @@ -160,17 +160,18 @@ const ( ) type PXCScheduledBackup struct { - AllowParallel *bool `json:"allowParallel,omitempty"` - Image string `json:"image,omitempty"` - ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` - ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"` - Schedule []PXCScheduledBackupSchedule `json:"schedule,omitempty"` - Storages map[string]*BackupStorageSpec `json:"storages,omitempty"` - ServiceAccountName string `json:"serviceAccountName,omitempty"` - Annotations map[string]string `json:"annotations,omitempty"` - PITR PITRSpec `json:"pitr,omitempty"` - BackoffLimit *int32 `json:"backoffLimit,omitempty"` - ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` + AllowParallel *bool `json:"allowParallel,omitempty"` + Image string `json:"image,omitempty"` + ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` + ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"` + Schedule []PXCScheduledBackupSchedule `json:"schedule,omitempty"` + Storages map[string]*BackupStorageSpec `json:"storages,omitempty"` + ServiceAccountName string `json:"serviceAccountName,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + PITR PITRSpec `json:"pitr,omitempty"` + BackoffLimit *int32 `json:"backoffLimit,omitempty"` + ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` + StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"` } func (b *PXCScheduledBackup) GetAllowParallel() bool { diff --git a/pkg/apis/pxc/v1/zz_generated.deepcopy.go b/pkg/apis/pxc/v1/zz_generated.deepcopy.go index cbd0004d2..e5d2c3bad 100644 --- a/pkg/apis/pxc/v1/zz_generated.deepcopy.go +++ b/pkg/apis/pxc/v1/zz_generated.deepcopy.go @@ -401,6 +401,11 @@ func (in *PXCBackupSpec) DeepCopyInto(out *PXCBackupSpec) { *out = new(BackupContainerOptions) (*in).DeepCopyInto(*out) } + if in.StartingDeadlineSeconds != nil { + in, out := &in.StartingDeadlineSeconds, &out.StartingDeadlineSeconds + *out = new(int64) + **out = **in + } if in.ActiveDeadlineSeconds != nil { in, out := &in.ActiveDeadlineSeconds, &out.ActiveDeadlineSeconds *out = new(int64) @@ -519,6 +524,11 @@ func (in *PXCScheduledBackup) DeepCopyInto(out *PXCScheduledBackup) { *out = new(int64) **out = **in } + if in.StartingDeadlineSeconds != nil { + in, out := &in.StartingDeadlineSeconds, &out.StartingDeadlineSeconds + *out = new(int64) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PXCScheduledBackup. 
diff --git a/pkg/controller/pxc/backup.go b/pkg/controller/pxc/backup.go index 80a0d8442..1ae89632e 100644 --- a/pkg/controller/pxc/backup.go +++ b/pkg/controller/pxc/backup.go @@ -198,8 +198,9 @@ func (r *ReconcilePerconaXtraDBCluster) createBackupJob(ctx context.Context, cr Labels: naming.LabelsScheduledBackup(cr, backupJob.Name), }, Spec: api.PXCBackupSpec{ - PXCCluster: cr.Name, - StorageName: backupJob.StorageName, + PXCCluster: cr.Name, + StorageName: backupJob.StorageName, + StartingDeadlineSeconds: cr.Spec.Backup.StartingDeadlineSeconds, }, } err = r.client.Create(context.TODO(), bcp) diff --git a/pkg/controller/pxcbackup/controller.go b/pkg/controller/pxcbackup/controller.go index 613ec43f0..683aadfd3 100644 --- a/pkg/controller/pxcbackup/controller.go +++ b/pkg/controller/pxcbackup/controller.go @@ -150,31 +150,62 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req return rr, nil } + if err := r.checkStartingDeadline(ctx, cr); err != nil { + if err := r.setFailedStatus(ctx, cr, err); err != nil { + return rr, errors.Wrap(err, "update status") + } + + return reconcile.Result{}, nil + } + cluster, err := r.getCluster(ctx, cr) if err != nil { - log.Error(err, "invalid backup cluster") - return rr, nil + return reconcile.Result{}, errors.Wrap(err, "get cluster") } log = log.WithValues("cluster", cluster.Name) err = cluster.CheckNSetDefaults(r.serverVersion, log) if err != nil { - return rr, errors.Wrap(err, "wrong PXC options") + err := errors.Wrap(err, "wrong PXC options") + + if err := r.setFailedStatus(ctx, cr, err); err != nil { + return rr, errors.Wrap(err, "update status") + } + + return reconcile.Result{}, err } if cluster.Spec.Backup == nil { - return rr, errors.New("a backup image should be set in the PXC config") + err := errors.New("a backup image should be set in the PXC config") + + if err := r.setFailedStatus(ctx, cr, err); err != nil { + return rr, errors.Wrap(err, "update status") + } + + return reconcile.Result{}, err } if err := cluster.CanBackup(); err != nil { log.Info("Cluster is not ready for backup", "reason", err.Error()) + + cr.Status.State = api.BackupWaiting + if err := r.updateStatus(ctx, cr); err != nil { + return rr, errors.Wrap(err, "update status") + } + return rr, nil } storage, ok := cluster.Spec.Backup.Storages[cr.Spec.StorageName] if !ok { - return rr, errors.Errorf("storage %s doesn't exist", cr.Spec.StorageName) + err := errors.Errorf("storage %s doesn't exist", cr.Spec.StorageName) + + if err := r.setFailedStatus(ctx, cr, err); err != nil { + return rr, errors.Wrap(err, "update status") + } + + return reconcile.Result{}, err } log = log.WithValues("storage", cr.Spec.StorageName) @@ -188,6 +219,12 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != cr.Name { log.Info("Another backup is holding the lock", "holder", *lease.Spec.HolderIdentity) + + cr.Status.State = api.BackupWaiting + if err := r.updateStatus(ctx, cr); err != nil { + return rr, errors.Wrap(err, "update status") + } + return rr, nil } } @@ -203,15 +240,50 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req cr.Status.VerifyTLS = storage.VerifyTLS } + job, err := r.createBackupJob(ctx, cr, cluster, storage) + if err != nil { + err = errors.Wrap(err, "create backup job") + + if err := r.setFailedStatus(ctx, cr, err); err != nil { + return rr, errors.Wrap(err, "update status") + } + + return reconcile.Result{}, err + } + + 
err = r.updateJobStatus(ctx, cr, job, cr.Spec.StorageName, storage, cluster) + + switch cr.Status.State { + case api.BackupSucceeded, api.BackupFailed: + log.Info("Releasing backup lock", "lease", naming.BackupLeaseName(cluster.Name)) + + if err := k8s.ReleaseLease(ctx, r.client, naming.BackupLeaseName(cluster.Name), cr.Namespace); err != nil { + return reconcile.Result{}, errors.Wrap(err, "release backup lock") + } + + return reconcile.Result{}, nil + } + + return rr, err +} + +func (r *ReconcilePerconaXtraDBClusterBackup) createBackupJob( + ctx context.Context, + cr *api.PerconaXtraDBClusterBackup, + cluster *api.PerconaXtraDBCluster, + storage *api.BackupStorageSpec, +) (*batchv1.Job, error) { + log := logf.FromContext(ctx) + bcp := backup.New(cluster) job := bcp.Job(cr, cluster) initImage, err := k8s.GetInitImage(ctx, cluster, r.client) if err != nil { - return rr, errors.Wrap(err, "failed to get initImage") + return nil, errors.Wrap(err, "failed to get initImage") } job.Spec, err = bcp.JobSpec(cr.Spec, cluster, job, initImage) if err != nil { - return rr, errors.Wrap(err, "can't create job spec") + return nil, errors.Wrap(err, "can't create job spec") } switch storage.Type { @@ -223,7 +295,7 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req // Set PerconaXtraDBClusterBackup instance as the owner and controller if err := k8s.SetControllerReference(cr, pvc, r.scheme); err != nil { - return rr, errors.Wrap(err, "setControllerReference") + return nil, errors.Wrap(err, "setControllerReference") } // Check if this PVC already exists @@ -232,64 +304,51 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req log.Info("Creating a new volume for backup", "Namespace", pvc.Namespace, "Name", pvc.Name) err = r.client.Create(ctx, pvc) if err != nil { - return rr, errors.Wrap(err, "create backup pvc") + return nil, errors.Wrap(err, "create backup pvc") } } else if err != nil { - return rr, errors.Wrap(err, "get backup pvc") + return nil, errors.Wrap(err, "get backup pvc") } err := backup.SetStoragePVC(&job.Spec, cr, pvc.Name) if err != nil { - return rr, errors.Wrap(err, "set storage FS") + return nil, errors.Wrap(err, "set storage FS") } case api.BackupStorageS3: if storage.S3 == nil { - return rr, errors.New("s3 storage is not specified") + return nil, errors.New("s3 storage is not specified") } cr.Status.Destination.SetS3Destination(storage.S3.Bucket, cr.Spec.PXCCluster+"-"+cr.CreationTimestamp.Time.Format("2006-01-02-15:04:05")+"-full") err := backup.SetStorageS3(&job.Spec, cr) if err != nil { - return rr, errors.Wrap(err, "set storage FS") + return nil, errors.Wrap(err, "set storage FS") } case api.BackupStorageAzure: if storage.Azure == nil { - return rr, errors.New("azure storage is not specified") + return nil, errors.New("azure storage is not specified") } cr.Status.Destination.SetAzureDestination(storage.Azure.ContainerPath, cr.Spec.PXCCluster+"-"+cr.CreationTimestamp.Time.Format("2006-01-02-15:04:05")+"-full") err := backup.SetStorageAzure(&job.Spec, cr) if err != nil { - return rr, errors.Wrap(err, "set storage FS for Azure") + return nil, errors.Wrap(err, "set storage FS for Azure") } } // Set PerconaXtraDBClusterBackup instance as the owner and controller if err := k8s.SetControllerReference(cr, job, r.scheme); err != nil { - return rr, errors.Wrap(err, "job/setControllerReference") + return nil, errors.Wrap(err, "job/setControllerReference") } err = r.client.Create(ctx, job) if err != nil && !k8sErrors.IsAlreadyExists(err) 
{ - return rr, errors.Wrap(err, "create backup job") + return nil, errors.Wrap(err, "create backup job") } else if err == nil { log.Info("Created a new backup job", "namespace", job.Namespace, "name", job.Name) } - err = r.updateJobStatus(ctx, cr, job, cr.Spec.StorageName, storage, cluster) - - switch cr.Status.State { - case api.BackupSucceeded, api.BackupFailed: - log.Info("Releasing backup lock", "lease", naming.BackupLeaseName(cluster.Name)) - - if err := k8s.ReleaseLease(ctx, r.client, naming.BackupLeaseName(cluster.Name), cr.Namespace); err != nil { - return reconcile.Result{}, errors.Wrap(err, "release backup lock") - } - - return reconcile.Result{}, nil - } - - return rr, err + return job, nil } func (r *ReconcilePerconaXtraDBClusterBackup) ensureFinalizers(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { @@ -600,10 +659,55 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus( log.Info("Backup failed") } - err = r.client.Status().Update(ctx, bcp) - if err != nil { + if err := r.updateStatus(ctx, bcp); err != nil { return errors.Wrap(err, "update status") } return nil } + +func (r *ReconcilePerconaXtraDBClusterBackup) checkStartingDeadline(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { + log := logf.FromContext(ctx) + + since := time.Since(cr.CreationTimestamp.Time).Seconds() + + if cr.Spec.StartingDeadlineSeconds == nil { + return nil + } + + if since < float64(*cr.Spec.StartingDeadlineSeconds) { + return nil + } + + if cr.Status.State == api.BackupNew { + log.Info("Backup didn't start in startingDeadlineSeconds, failing the backup", + "startingDeadlineSeconds", *cr.Spec.StartingDeadlineSeconds, + "passedSeconds", since) + return errors.New("starting deadline seconds exceeded") + } + + return nil +} + +func (r *ReconcilePerconaXtraDBClusterBackup) updateStatus(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + localCr := new(api.PerconaXtraDBClusterBackup) + err := r.client.Get(ctx, client.ObjectKeyFromObject(cr), localCr) + if err != nil { + return err + } + + localCr.Status = cr.Status + + return r.client.Status().Update(ctx, localCr) + }) +} + +func (r *ReconcilePerconaXtraDBClusterBackup) setFailedStatus( + ctx context.Context, + cr *api.PerconaXtraDBClusterBackup, + err error, +) error { + cr.SetFailedStatusWithError(err) + return r.updateStatus(ctx, cr) +} From fd338faa77c72b57da05a5400a9b6af57075987b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ege=20G=C3=BCne=C5=9F?= Date: Mon, 20 Jan 2025 15:45:09 +0300 Subject: [PATCH 3/3] K8SPXC-1366: Suspend backup job if cluster becomes unready Backups can put pressure on a PXC cluster. With these changes, we introduce a new safety mechanism to pause backups if cluster becomes unhealthy. This mechanism can be disabled by enabling `spec.unsafeFlags.backupIfUnhealthy`. Operator will periodically check cluster status and ready PXC pods while a backup is running. If ready PXC pods, at any point, becomes less than the desired number of PXC pods, operator will suspend backup job. Suspending the backup job will terminate any running backup pod. Operator will automatically resume the job once ready PXC pods are equal to desired number of PXC pods. 
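Users who prefer to keep a backup running even while the cluster is degraded
can opt out of this check in the cluster spec. A minimal sketch (the cluster
name is illustrative; the field path is the spec.unsafeFlags.backupIfUnhealthy
flag mentioned above):

    apiVersion: pxc.percona.com/v1
    kind: PerconaXtraDBCluster
    metadata:
      name: cluster1
    spec:
      unsafeFlags:
        backupIfUnhealthy: true

With this flag enabled, the operator skips the suspend check and leaves the
backup job running even if the number of ready PXC pods drops below the
desired size.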
---
 pkg/apis/pxc/v1/pxc_backup_types.go    |   2 +-
 pkg/controller/pxcbackup/controller.go | 147 +++++++++++++++++++++++--
 pkg/naming/labels.go                   |   8 ++
 pkg/pxc/backup/job.go                  |   6 +-
 4 files changed, 147 insertions(+), 16 deletions(-)

diff --git a/pkg/apis/pxc/v1/pxc_backup_types.go b/pkg/apis/pxc/v1/pxc_backup_types.go
index 93241abd6..454dc1c0a 100644
--- a/pkg/apis/pxc/v1/pxc_backup_types.go
+++ b/pkg/apis/pxc/v1/pxc_backup_types.go
@@ -164,7 +164,7 @@ type PXCBackupState string

 const (
 	BackupNew       PXCBackupState = ""
-	BackupWaiting   PXCBackupState = "Waiting"
+	BackupSuspended PXCBackupState = "Suspended"
 	BackupStarting  PXCBackupState = "Starting"
 	BackupRunning   PXCBackupState = "Running"
 	BackupFailed    PXCBackupState = "Failed"
diff --git a/pkg/controller/pxcbackup/controller.go b/pkg/controller/pxcbackup/controller.go
index 683aadfd3..5485da244 100644
--- a/pkg/controller/pxcbackup/controller.go
+++ b/pkg/controller/pxcbackup/controller.go
@@ -17,6 +17,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/util/retry"
+	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -186,14 +187,13 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req
 		return reconcile.Result{}, err
 	}

+	if err := r.reconcileBackupJob(ctx, cr, cluster); err != nil {
+		return rr, errors.Wrap(err, "reconcile backup job")
+	}
+
 	if err := cluster.CanBackup(); err != nil {
 		log.Info("Cluster is not ready for backup", "reason", err.Error())

-		cr.Status.State = api.BackupWaiting
-		if err := r.updateStatus(ctx, cr); err != nil {
-			return rr, errors.Wrap(err, "update status")
-		}
-
 		return rr, nil
 	}

@@ -220,11 +220,6 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req
 		if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != cr.Name {
 			log.Info("Another backup is holding the lock", "holder", *lease.Spec.HolderIdentity)

-			cr.Status.State = api.BackupWaiting
-			if err := r.updateStatus(ctx, cr); err != nil {
-				return rr, errors.Wrap(err, "update status")
-			}
-
 			return rr, nil
 		}
 	}
@@ -711,3 +706,135 @@ func (r *ReconcilePerconaXtraDBClusterBackup) setFailedStatus(
 	cr.SetFailedStatusWithError(err)
 	return r.updateStatus(ctx, cr)
 }
+
+func (r *ReconcilePerconaXtraDBClusterBackup) suspendJobIfNeeded(
+	ctx context.Context,
+	cr *api.PerconaXtraDBClusterBackup,
+	cluster *api.PerconaXtraDBCluster,
+) error {
+	if cluster.Spec.Unsafe.BackupIfUnhealthy {
+		return nil
+	}
+
+	if cluster.Status.Status == api.AppStateReady {
+		return nil
+	}
+
+	if cluster.Status.PXC.Ready == cluster.Status.PXC.Size {
+		return nil
+	}
+
+	log := logf.FromContext(ctx)
+
+	labelKeyBackupType := naming.GetLabelBackupType(cluster)
+	jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron")
+
+	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		job := new(batchv1.Job)
+
+		err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job)
+		if err != nil {
+			if k8sErrors.IsNotFound(err) {
+				return nil
+			}
+			return err
+		}
+
+		suspended := false
+		for _, cond := range job.Status.Conditions {
+			if cond.Type == batchv1.JobSuspended && cond.Status == corev1.ConditionTrue {
+				suspended = true
+			}
+		}
+
+		if suspended {
+			return nil
+		}
+
+		log.Info("Suspending backup job",
+			"job", jobName,
+			"clusterStatus", cluster.Status.Status,
+			"readyPXC", cluster.Status.PXC.Ready)
+
+		job.Spec.Suspend = ptr.To(true)
+
+		err = r.client.Update(ctx, job)
+		if err != nil {
+			return err
+		}
+
+		cr.Status.State = api.BackupSuspended
+		return r.updateStatus(ctx, cr)
+	})
+
+	return err
+}
+
+func (r *ReconcilePerconaXtraDBClusterBackup) resumeJobIfNeeded(
+	ctx context.Context,
+	cr *api.PerconaXtraDBClusterBackup,
+	cluster *api.PerconaXtraDBCluster,
+) error {
+	if cluster.Status.Status != api.AppStateReady {
+		return nil
+	}
+
+	if cluster.Status.PXC.Ready != cluster.Status.PXC.Size {
+		return nil
+	}
+
+	log := logf.FromContext(ctx)
+
+	labelKeyBackupType := naming.GetLabelBackupType(cluster)
+	jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron")
+
+	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		job := new(batchv1.Job)
+
+		err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job)
+		if err != nil {
+			if k8sErrors.IsNotFound(err) {
+				return nil
+			}
+			return err
+		}
+
+		suspended := false
+		for _, cond := range job.Status.Conditions {
+			if cond.Type == batchv1.JobSuspended && cond.Status == corev1.ConditionTrue {
+				suspended = true
+			}
+		}
+
+		if !suspended {
+			return nil
+		}
+
+		log.Info("Resuming backup job",
+			"job", jobName,
+			"clusterStatus", cluster.Status.Status,
+			"readyPXC", cluster.Status.PXC.Ready)
+
+		job.Spec.Suspend = ptr.To(false)
+
+		return r.client.Update(ctx, job)
+	})
+
+	return err
+}
+
+func (r *ReconcilePerconaXtraDBClusterBackup) reconcileBackupJob(
+	ctx context.Context,
+	cr *api.PerconaXtraDBClusterBackup,
+	cluster *api.PerconaXtraDBCluster,
+) error {
+	if err := r.suspendJobIfNeeded(ctx, cr, cluster); err != nil {
+		return errors.Wrap(err, "suspend job if needed")
+	}
+
+	if err := r.resumeJobIfNeeded(ctx, cr, cluster); err != nil {
+		return errors.Wrap(err, "resume job if needed")
+	}
+
+	return nil
+}
diff --git a/pkg/naming/labels.go b/pkg/naming/labels.go
index 8fe38bd66..f1f7d007e 100644
--- a/pkg/naming/labels.go
+++ b/pkg/naming/labels.go
@@ -30,6 +30,14 @@ const (
 	LabelPerconaRestoreJobName = perconaPrefix + "restore-job-name"
 )

+func GetLabelBackupType(cr *api.PerconaXtraDBCluster) string {
+	if cr.CompareVersionWith("1.16.0") < 0 {
+		return "type"
+	}
+
+	return LabelPerconaBackupType
+}
+
 func LabelsCluster(cr *api.PerconaXtraDBCluster) map[string]string {
 	return map[string]string{
 		LabelAppKubernetesName: "percona-xtradb-cluster",
diff --git a/pkg/pxc/backup/job.go b/pkg/pxc/backup/job.go
index 476156428..5da545083 100644
--- a/pkg/pxc/backup/job.go
+++ b/pkg/pxc/backup/job.go
@@ -19,11 +19,7 @@ import (
 )

 func (*Backup) Job(cr *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster) *batchv1.Job {
-	labelKeyBackupType := "type"
-	if cluster.CompareVersionWith("1.16.0") >= 0 {
-		labelKeyBackupType = naming.LabelPerconaBackupType
-	}
-
+	labelKeyBackupType := naming.GetLabelBackupType(cluster)
 	jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron")

 	return &batchv1.Job{