Skip to content

Commit

Permalink
suspend backup job if cluster becomes unready
Browse files Browse the repository at this point in the history
  • Loading branch information
egegunes committed Jan 20, 2025
1 parent 77eddf5 commit 1c9730b
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/pxc/v1/pxc_backup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ type PXCBackupState string

const (
BackupNew PXCBackupState = ""
BackupWaiting PXCBackupState = "Waiting"
BackupSuspended PXCBackupState = "Suspended"
BackupStarting PXCBackupState = "Starting"
BackupRunning PXCBackupState = "Running"
BackupFailed PXCBackupState = "Failed"
Expand Down
171 changes: 152 additions & 19 deletions pkg/controller/pxcbackup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,11 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req
return rr, nil
}

if err := r.checkPassiveDeadline(ctx, cr); err != nil {
log.Info("Backup didn't start in passiveDeadlineSeconds, failing the backup", "passiveDeadlineSeconds", *cr.Spec.PassiveDeadlineSeconds)
passedSeconds, err := r.checkPassiveDeadline(ctx, cr)
if err != nil {
log.Info("Backup didn't start in passiveDeadlineSeconds, failing the backup",
"passiveDeadlineSeconds", *cr.Spec.PassiveDeadlineSeconds,
"passedSeconds", passedSeconds)

cr.Status.State = api.BackupFailed
cr.Status.Error = err.Error()
Expand Down Expand Up @@ -189,14 +192,13 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req
return reconcile.Result{}, err
}

if err := r.reconcileBackupJob(ctx, cr, cluster); err != nil {
return rr, errors.Wrap(err, "reconcile backup job")
}

if err := cluster.CanBackup(); err != nil {
log.Info("Cluster is not ready for backup", "reason", err.Error())

cr.Status.State = api.BackupWaiting
if err := r.updateStatus(ctx, cr); err != nil {
return rr, errors.Wrap(err, "update status")
}

return rr, nil
}

Expand Down Expand Up @@ -225,11 +227,6 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req
if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != cr.Name {
log.Info("Another backup is holding the lock", "holder", *lease.Spec.HolderIdentity)

cr.Status.State = api.BackupWaiting
if err := r.updateStatus(ctx, cr); err != nil {
return rr, errors.Wrap(err, "update status")
}

return rr, nil
}
}
Expand Down Expand Up @@ -640,20 +637,22 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus(
return nil
}

func (r *ReconcilePerconaXtraDBClusterBackup) checkPassiveDeadline(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error {
func (r *ReconcilePerconaXtraDBClusterBackup) checkPassiveDeadline(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) (float64, error) {
since := time.Since(cr.CreationTimestamp.Time).Seconds()

if cr.Spec.PassiveDeadlineSeconds == nil {
return nil
return since, nil
}

if time.Since(cr.CreationTimestamp.Time).Seconds() < float64(*cr.Spec.PassiveDeadlineSeconds) {
return nil
if since < float64(*cr.Spec.PassiveDeadlineSeconds) {
return since, nil
}

switch cr.Status.State {
case api.BackupNew, api.BackupWaiting:
return errors.New("passive deadline seconds exceeded")
case api.BackupNew:
return since, errors.New("passive deadline seconds exceeded")
default:
return nil
return since, nil
}
}

Expand All @@ -670,3 +669,137 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateStatus(ctx context.Context,
return r.client.Status().Update(ctx, localCr)
})
}

// suspendJobIfNeeded suspends the backup Job that belongs to cr when the
// cluster is not in a state where the backup should keep running.
//
// Nothing is done when any of the following is true:
//   - cluster.Spec.Unsafe.BackupIfUnhealthy is set,
//   - the cluster status is AppStateReady,
//   - the number of ready PXC pods equals the PXC size,
//   - the Job does not exist,
//   - the Job already reports a JobSuspended condition with status True.
//
// Otherwise the Job's spec.suspend is set to true and, once that update
// succeeds, the backup status is moved to BackupSuspended. The Job
// read-modify-write is retried on conflict.
func (r *ReconcilePerconaXtraDBClusterBackup) suspendJobIfNeeded(
	ctx context.Context,
	cr *api.PerconaXtraDBClusterBackup,
	cluster *api.PerconaXtraDBCluster,
) error {
	switch {
	case cluster.Spec.Unsafe.BackupIfUnhealthy:
		return nil
	case cluster.Status.Status == api.AppStateReady:
		return nil
	case cluster.Status.PXC.Ready == cluster.Status.PXC.Size:
		return nil
	}

	logger := logf.FromContext(ctx)

	isCron := cr.Labels[naming.GetLabelBackupType(cluster)] == "cron"
	jobName := naming.BackupJobName(cr.Name, isCron)
	jobKey := types.NamespacedName{Namespace: cr.Namespace, Name: jobName}

	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		var job batchv1.Job

		if getErr := r.client.Get(ctx, jobKey, &job); getErr != nil {
			if k8sErrors.IsNotFound(getErr) {
				// No Job yet, so there is nothing to suspend.
				return nil
			}
			return getErr
		}

		// The Job is considered suspended only when the Job controller has
		// reported it through the JobSuspended condition.
		for i := range job.Status.Conditions {
			c := &job.Status.Conditions[i]
			if c.Type == batchv1.JobSuspended && c.Status == corev1.ConditionTrue {
				return nil
			}
		}

		logger.Info("Suspending backup job",
			"job", jobName,
			"clusterStatus", cluster.Status.Status,
			"readyPXC", cluster.Status.PXC.Ready)

		suspend := true
		job.Spec.Suspend = &suspend

		if updErr := r.client.Update(ctx, &job); updErr != nil {
			return updErr
		}

		cr.Status.State = api.BackupSuspended
		return r.updateStatus(ctx, cr)
	})
}

// resumeJobIfNeeded lifts the suspension from the backup Job that belongs to
// cr once the cluster is ready again.
//
// The Job is resumed only when the cluster status is AppStateReady, the number
// of ready PXC pods equals the PXC size, the Job exists, and the Job reports a
// JobSuspended condition with status True. In every other case the function
// returns nil without touching anything.
//
// Resuming sets the Job's spec.suspend to false; the read-modify-write is
// retried on conflict. The backup status is not modified here.
func (r *ReconcilePerconaXtraDBClusterBackup) resumeJobIfNeeded(
	ctx context.Context,
	cr *api.PerconaXtraDBClusterBackup,
	cluster *api.PerconaXtraDBCluster,
) error {
	clusterReady := cluster.Status.Status == api.AppStateReady
	allPodsReady := cluster.Status.PXC.Ready == cluster.Status.PXC.Size
	if !clusterReady || !allPodsReady {
		return nil
	}

	logger := logf.FromContext(ctx)

	isCron := cr.Labels[naming.GetLabelBackupType(cluster)] == "cron"
	jobName := naming.BackupJobName(cr.Name, isCron)
	jobKey := types.NamespacedName{Namespace: cr.Namespace, Name: jobName}

	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		var job batchv1.Job

		if getErr := r.client.Get(ctx, jobKey, &job); getErr != nil {
			if k8sErrors.IsNotFound(getErr) {
				// No Job, so there is nothing to resume.
				return nil
			}
			return getErr
		}

		isSuspended := false
		for i := range job.Status.Conditions {
			c := &job.Status.Conditions[i]
			if c.Type == batchv1.JobSuspended && c.Status == corev1.ConditionTrue {
				isSuspended = true
				break
			}
		}
		if !isSuspended {
			return nil
		}

		logger.Info("Resuming backup job",
			"job", jobName,
			"clusterStatus", cluster.Status.Status,
			"readyPXC", cluster.Status.PXC.Ready)

		suspend := false
		job.Spec.Suspend = &suspend

		return r.client.Update(ctx, &job)
	})
}

// reconcileBackupJob brings the suspension state of the backup Job in line
// with the current cluster state: it first suspends the Job if the cluster is
// not ready for a backup, then resumes it if the cluster is ready again.
//
// Each step is a no-op when its preconditions are not met, so at most one of
// them changes the Job in a single call. An error from either step is returned
// wrapped with the name of the step that failed.
func (r *ReconcilePerconaXtraDBClusterBackup) reconcileBackupJob(
	ctx context.Context,
	cr *api.PerconaXtraDBClusterBackup,
	cluster *api.PerconaXtraDBCluster,
) error {
	if err := r.suspendJobIfNeeded(ctx, cr, cluster); err != nil {
		return errors.Wrap(err, "suspend job if needed")
	}

	// The wrap message names the resume step so a failure here is not
	// mistaken for a suspend failure.
	if err := r.resumeJobIfNeeded(ctx, cr, cluster); err != nil {
		return errors.Wrap(err, "resume job if needed")
	}

	return nil
}
9 changes: 9 additions & 0 deletions pkg/naming/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ const (
LabelPerconaRestoreJobName = perconaPrefix + "restore-job-name"
)

// GetLabelBackupType returns the label key under which the backup type is
// stored for the given cluster. Clusters whose version compares below 1.16.0
// use the plain "type" key; clusters at 1.16.0 or above use
// LabelPerconaBackupType.
func GetLabelBackupType(cr *api.PerconaXtraDBCluster) string {
	if cr.CompareVersionWith("1.16.0") < 0 {
		return "type"
	}

	return LabelPerconaBackupType
}

func LabelsCluster(cr *api.PerconaXtraDBCluster) map[string]string {
return map[string]string{
LabelAppKubernetesName: "percona-xtradb-cluster",
Expand Down
6 changes: 1 addition & 5 deletions pkg/pxc/backup/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ import (
)

func (*Backup) Job(cr *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster) *batchv1.Job {
labelKeyBackupType := "type"
if cluster.CompareVersionWith("1.16.0") >= 0 {
labelKeyBackupType = naming.LabelPerconaBackupType
}

labelKeyBackupType := naming.GetLabelBackupType(cluster)
jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron")

return &batchv1.Job{
Expand Down

0 comments on commit 1c9730b

Please sign in to comment.