Skip to content
This repository has been archived by the owner on Jun 12, 2024. It is now read-only.

Commit

Permalink
Add generic retention task assertion (#65)
Browse files Browse the repository at this point in the history
**What**
- Add new public method for asserting any retention task. This allows us
  to build the task spec in our applciations but still use the generic
  assertion code
Signed-off-by: Lucas Roesler <[email protected]>
  • Loading branch information
LucasRoesler authored Oct 16, 2020
1 parent 55ecca5 commit 5818488
Showing 1 changed file with 19 additions and 4 deletions.
23 changes: 19 additions & 4 deletions pkg/queue/postgres/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ const (
RetentionTask queue.TaskType = "retention"
)

// retentionTaskSpec defines a SQL task to remove completed tasks that match given criteria.
type retentionTaskSpec struct {
// RetentionTaskSpec defines a SQL task to remove completed tasks that match given criteria.
type RetentionTaskSpec struct {
handlers.SQLExecTaskSpec
QueueName string `json:"queueName"`
TaskType queue.TaskType `json:"taskType"`
Expand All @@ -47,10 +47,25 @@ func AssertRetentionSchedule(ctx context.Context, db *sql.DB, queueName string,
span.SetTag("pkg.name", "postgres")

spec := createRetentionSpec(queueName, taskType, status, age)

return AssertRetentionScheduleWithSpec(ctx, db, spec)
}

// AssertRetentionScheduleWithSpec ensures that a queue retention tasks exists with the provided task spec.
// The task is scheduled for a random minute throughout the hour. Use AssertRetentionSchedule to create simple
// retention tasks based on queue, type, and age. Callers should only use AssertRetentionScheduleWithSpec when
// the retention policy by age is not accurate enough for their needs.
//
// An upsert pattern is used to ensure that this retention task is scheduled exactly once.
func AssertRetentionScheduleWithSpec(ctx context.Context, db *sql.DB, spec RetentionTaskSpec) (err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "AssertRetentionScheduleWithSpec")
span.SetTag("pkg.name", "postgres")

specBytes, err := json.Marshal(spec)
if err != nil {
return fmt.Errorf("can not build retention task spec: %w", err)
}

// randomly distribute the retention tasks throughout the hour
when := rand.Intn(60)
retentionSchedule := queue.TaskScheduleRequest{
Expand Down Expand Up @@ -111,8 +126,8 @@ func AssertRetentionSchedule(ctx context.Context, db *sql.DB, queueName string,
}

//createRetentionSpec builds the task retention job spec. It is split out to simplify test setup
func createRetentionSpec(queueName string, taskType queue.TaskType, status queue.TaskStatus, age time.Duration) retentionTaskSpec {
spec := retentionTaskSpec{
func createRetentionSpec(queueName string, taskType queue.TaskType, status queue.TaskStatus, age time.Duration) RetentionTaskSpec {
spec := RetentionTaskSpec{
QueueName: queueName,
TaskType: taskType,
Status: status,
Expand Down

0 comments on commit 5818488

Please sign in to comment.