From 1c9730bd9c19ea482058144fee5ed05683cbccb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ege=20G=C3=BCne=C5=9F?= Date: Mon, 20 Jan 2025 15:45:09 +0300 Subject: [PATCH] suspend backup job if cluster becomes unready --- pkg/apis/pxc/v1/pxc_backup_types.go | 2 +- pkg/controller/pxcbackup/controller.go | 171 ++++++++++++++++++++++--- pkg/naming/labels.go | 9 ++ pkg/pxc/backup/job.go | 6 +- 4 files changed, 163 insertions(+), 25 deletions(-) diff --git a/pkg/apis/pxc/v1/pxc_backup_types.go b/pkg/apis/pxc/v1/pxc_backup_types.go index 1723100ad..de6dd0124 100644 --- a/pkg/apis/pxc/v1/pxc_backup_types.go +++ b/pkg/apis/pxc/v1/pxc_backup_types.go @@ -164,7 +164,7 @@ type PXCBackupState string const ( BackupNew PXCBackupState = "" - BackupWaiting PXCBackupState = "Waiting" + BackupSuspended PXCBackupState = "Suspended" BackupStarting PXCBackupState = "Starting" BackupRunning PXCBackupState = "Running" BackupFailed PXCBackupState = "Failed" diff --git a/pkg/controller/pxcbackup/controller.go b/pkg/controller/pxcbackup/controller.go index 598473786..22c0531f5 100644 --- a/pkg/controller/pxcbackup/controller.go +++ b/pkg/controller/pxcbackup/controller.go @@ -145,8 +145,11 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req return rr, nil } - if err := r.checkPassiveDeadline(ctx, cr); err != nil { - log.Info("Backup didn't start in passiveDeadlineSeconds, failing the backup", "passiveDeadlineSeconds", *cr.Spec.PassiveDeadlineSeconds) + passedSeconds, err := r.checkPassiveDeadline(ctx, cr) + if err != nil { + log.Info("Backup didn't start in passiveDeadlineSeconds, failing the backup", + "passiveDeadlineSeconds", *cr.Spec.PassiveDeadlineSeconds, + "passedSeconds", passedSeconds) cr.Status.State = api.BackupFailed cr.Status.Error = err.Error() @@ -189,14 +192,13 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req return reconcile.Result{}, err } + if err := r.reconcileBackupJob(ctx, cr, cluster); err != nil { + return rr, errors.Wrap(err, "reconcile backup job") + } + if err := cluster.CanBackup(); err != nil { log.Info("Cluster is not ready for backup", "reason", err.Error()) - cr.Status.State = api.BackupWaiting - if err := r.updateStatus(ctx, cr); err != nil { - return rr, errors.Wrap(err, "update status") - } - return rr, nil } @@ -225,11 +227,6 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != cr.Name { log.Info("Another backup is holding the lock", "holder", *lease.Spec.HolderIdentity) - cr.Status.State = api.BackupWaiting - if err := r.updateStatus(ctx, cr); err != nil { - return rr, errors.Wrap(err, "update status") - } - return rr, nil } } @@ -640,20 +637,22 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus( return nil } -func (r *ReconcilePerconaXtraDBClusterBackup) checkPassiveDeadline(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { +func (r *ReconcilePerconaXtraDBClusterBackup) checkPassiveDeadline(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) (float64, error) { + since := time.Since(cr.CreationTimestamp.Time).Seconds() + if cr.Spec.PassiveDeadlineSeconds == nil { - return nil + return since, nil } - if time.Since(cr.CreationTimestamp.Time).Seconds() < float64(*cr.Spec.PassiveDeadlineSeconds) { - return nil + if since < float64(*cr.Spec.PassiveDeadlineSeconds) { + return since, nil } switch cr.Status.State { - case api.BackupNew, api.BackupWaiting: - return errors.New("passive deadline seconds exceeded") + case api.BackupNew: + return since, errors.New("passive deadline seconds exceeded") default: - return nil + return since, nil } } @@ -670,3 +669,137 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateStatus(ctx context.Context, return r.client.Status().Update(ctx, localCr) }) } + +func (r *ReconcilePerconaXtraDBClusterBackup) suspendJobIfNeeded( + ctx context.Context, + cr *api.PerconaXtraDBClusterBackup, + cluster *api.PerconaXtraDBCluster, +) error { + if cluster.Spec.Unsafe.BackupIfUnhealthy { + return nil + } + + if cluster.Status.Status == api.AppStateReady { + return nil + } + + if cluster.Status.PXC.Ready == cluster.Status.PXC.Size { + return nil + } + + log := logf.FromContext(ctx) + + labelKeyBackupType := naming.GetLabelBackupType(cluster) + jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron") + + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + job := new(batchv1.Job) + + err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job) + if err != nil { + if k8sErrors.IsNotFound(err) { + return nil + } + return err + } + + suspended := false + for _, cond := range job.Status.Conditions { + if cond.Type == batchv1.JobSuspended && cond.Status == corev1.ConditionTrue { + suspended = true + } + } + + if suspended { + return nil + } + + log.Info("Suspending backup job", + "job", jobName, + "clusterStatus", cluster.Status.Status, + "readyPXC", cluster.Status.PXC.Ready) + + t := true + job.Spec.Suspend = &t + + err = r.client.Update(ctx, job) + if err != nil { + return err + } + + cr.Status.State = api.BackupSuspended + return r.updateStatus(ctx, cr) + }) + + return err +} + +func (r *ReconcilePerconaXtraDBClusterBackup) resumeJobIfNeeded( + ctx context.Context, + cr *api.PerconaXtraDBClusterBackup, + cluster *api.PerconaXtraDBCluster, +) error { + if cluster.Status.Status != api.AppStateReady { + return nil + } + + if cluster.Status.PXC.Ready != cluster.Status.PXC.Size { + return nil + } + + log := logf.FromContext(ctx) + + labelKeyBackupType := naming.GetLabelBackupType(cluster) + jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron") + + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + job := new(batchv1.Job) + + err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job) + if err != nil { + if k8sErrors.IsNotFound(err) { + return nil + } + return err + } + + suspended := false + for _, cond := range job.Status.Conditions { + if cond.Type == batchv1.JobSuspended && cond.Status == corev1.ConditionTrue { + suspended = true + } + } + + if !suspended { + return nil + } + + log.Info("Resuming backup job", + "job", jobName, + "clusterStatus", cluster.Status.Status, + "readyPXC", cluster.Status.PXC.Ready) + + f := false + job.Spec.Suspend = &f + + return r.client.Update(ctx, job) + }) + + return err +} + +func (r *ReconcilePerconaXtraDBClusterBackup) reconcileBackupJob( + ctx context.Context, + cr *api.PerconaXtraDBClusterBackup, + cluster *api.PerconaXtraDBCluster, +) error { + if err := r.suspendJobIfNeeded(ctx, cr, cluster); err != nil { + return errors.Wrap(err, "suspend job if needed") + } + + if err := r.resumeJobIfNeeded(ctx, cr, cluster); err != nil { + return errors.Wrap(err, "suspend job if needed") + } + + return nil +} diff --git a/pkg/naming/labels.go b/pkg/naming/labels.go index 8fe38bd66..4108a890f 100644 --- a/pkg/naming/labels.go +++ b/pkg/naming/labels.go @@ -30,6 +30,15 @@ const ( LabelPerconaRestoreJobName = perconaPrefix + "restore-job-name" ) +func GetLabelBackupType(cr *api.PerconaXtraDBCluster) string { + labelKeyBackupType := "type" + if cr.CompareVersionWith("1.16.0") >= 0 { + labelKeyBackupType = LabelPerconaBackupType + } + + return labelKeyBackupType +} + func LabelsCluster(cr *api.PerconaXtraDBCluster) map[string]string { return map[string]string{ LabelAppKubernetesName: "percona-xtradb-cluster", diff --git a/pkg/pxc/backup/job.go b/pkg/pxc/backup/job.go index 476156428..5da545083 100644 --- a/pkg/pxc/backup/job.go +++ b/pkg/pxc/backup/job.go @@ -19,11 +19,7 @@ import ( ) func (*Backup) Job(cr *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster) *batchv1.Job { - labelKeyBackupType := "type" - if cluster.CompareVersionWith("1.16.0") >= 0 { - labelKeyBackupType = naming.LabelPerconaBackupType - } - + labelKeyBackupType := naming.GetLabelBackupType(cluster) jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron") return &batchv1.Job{