diff --git a/pkg/queue/postgres/retention.go b/pkg/queue/postgres/retention.go index ae8ea059..a85ba6ef 100644 --- a/pkg/queue/postgres/retention.go +++ b/pkg/queue/postgres/retention.go @@ -26,8 +26,8 @@ const ( RetentionTask queue.TaskType = "retention" ) -// retentionTaskSpec defines a SQL task to remove completed tasks that match given criteria. -type retentionTaskSpec struct { +// RetentionTaskSpec defines a SQL task to remove completed tasks that match given criteria. +type RetentionTaskSpec struct { handlers.SQLExecTaskSpec QueueName string `json:"queueName"` TaskType queue.TaskType `json:"taskType"` @@ -47,10 +47,25 @@ func AssertRetentionSchedule(ctx context.Context, db *sql.DB, queueName string, span.SetTag("pkg.name", "postgres") spec := createRetentionSpec(queueName, taskType, status, age) + + return AssertRetentionScheduleWithSpec(ctx, db, spec) +} + +// AssertRetentionScheduleWithSpec ensures that a queue retention tasks exists with the provided task spec. +// The task is scheduled for a random minute throughout the hour. Use AssertRetentionSchedule to create simple +// retention tasks based on queue, type, and age. Callers should only use AssertRetentionScheduleWithSpec when +// the retention policy by age is not accurate enough for their needs. +// +// An upsert pattern is used to ensure that this retention task is scheduled exactly once. +func AssertRetentionScheduleWithSpec(ctx context.Context, db *sql.DB, spec RetentionTaskSpec) (err error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "AssertRetentionScheduleWithSpec") + span.SetTag("pkg.name", "postgres") + specBytes, err := json.Marshal(spec) if err != nil { return fmt.Errorf("can not build retention task spec: %w", err) } + // randomly distribute the retention tasks throughout the hour when := rand.Intn(60) retentionSchedule := queue.TaskScheduleRequest{ @@ -111,8 +126,8 @@ func AssertRetentionSchedule(ctx context.Context, db *sql.DB, queueName string, } //createRetentionSpec builds the task retention job spec. It is split out to simplify test setup -func createRetentionSpec(queueName string, taskType queue.TaskType, status queue.TaskStatus, age time.Duration) retentionTaskSpec { - spec := retentionTaskSpec{ +func createRetentionSpec(queueName string, taskType queue.TaskType, status queue.TaskStatus, age time.Duration) RetentionTaskSpec { + spec := RetentionTaskSpec{ QueueName: queueName, TaskType: taskType, Status: status,