Skip to content

Commit

Permalink
feat: optimize queue error (#874)
Browse files Browse the repository at this point in the history
* feat: optimize queue error

* feat: optimize queue error

* Update queue/worker.go

Co-authored-by: Wenbo Han <[email protected]>

---------

Co-authored-by: Wenbo Han <[email protected]>
  • Loading branch information
devhaozi and hwbrzzl authored Feb 10, 2025
1 parent c378d7d commit da0832e
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 12 deletions.
16 changes: 7 additions & 9 deletions errors/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,13 @@ var (
OrmRecordNotFound = New("record not found")
OrmDeletedAtColumnNotFound = New("deleted at column not found")

QueueDriverAsyncNoJobFound = New("no job found in %s queue")
QueueDriverSyncNotNeedRun = New("queue %s driver sync not need run")
QueueDriverNotSupported = New("unknown queue driver: %s")
QueueDriverInvalid = New("%s doesn't implement contracts/queue/driver")
QueueDuplicateJobSignature = New("job signature duplicate: %s, the names of Job and Listener cannot be duplicated")
QueueEmptyJobSignature = New("the Signature of job can't be empty")
QueueEmptyListenerSignature = New("the Signature of listener can't be empty")
QueueJobNotFound = New("job not found: %s")
QueueFailedToSaveFailedJob = New("failed to save failed job: %v")
QueueDriverNoJobFound = New("no job found in %s queue")
QueueDriverSyncNotNeedRun = New("queue %s driver sync not need run")
QueueDriverNotSupported = New("unknown queue driver: %s")
QueueDriverInvalid = New("%s doesn't implement contracts/queue/driver")
QueueDriverFailedToPop = New("failed to pop job from %s queue: %v")
QueueJobNotFound = New("job not found: %s")
QueueFailedToSaveFailedJob = New("failed to save failed job: %v")

RouteDefaultDriverNotSet = New("please set default driver")
RouteInvalidDriver = New("init %s route driver fail: route must be implement route.Route or func() (route.Route, error)")
Expand Down
4 changes: 2 additions & 2 deletions queue/driver_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ func (r *Async) Later(delay time.Time, job contractsqueue.Job, args []any, queue
func (r *Async) Pop(queue string) (contractsqueue.Job, []any, error) {
ch, ok := asyncQueues.Load(queue)
if !ok {
return nil, nil, errors.QueueDriverAsyncNoJobFound.Args(queue)
return nil, nil, errors.QueueDriverNoJobFound.Args(queue)
}

queueChan := ch.(chan contractsqueue.Jobs)
select {
case job := <-queueChan:
return job.Job, job.Args, nil
default:
return nil, nil, errors.QueueDriverAsyncNoJobFound.Args(queue)
return nil, nil, errors.QueueDriverNoJobFound.Args(queue)
}
}

Expand Down
18 changes: 17 additions & 1 deletion queue/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Worker struct {
job queue.JobRepository
queue string
wg sync.WaitGroup
currentDelay time.Duration
maxDelay time.Duration
}

func NewWorker(config queue.Config, concurrent int, connection string, queue string, job queue.JobRepository) *Worker {
Expand All @@ -31,6 +33,8 @@ func NewWorker(config queue.Config, concurrent int, connection string, queue str
job: job,
queue: queue,
failedJobChan: make(chan FailedJob, concurrent),
currentDelay: 1 * time.Second,
maxDelay: 32 * time.Second,
}
}

Expand All @@ -56,10 +60,22 @@ func (r *Worker) Run() error {

job, args, err := driver.Pop(r.queue)
if err != nil {
time.Sleep(1 * time.Second)
if !errors.Is(err, errors.QueueDriverNoJobFound) {
LogFacade.Error(errors.QueueDriverFailedToPop.Args(r.queue, err))

r.currentDelay *= 2
if r.currentDelay > r.maxDelay {
r.currentDelay = r.maxDelay
}
}

time.Sleep(r.currentDelay)

continue
}

r.currentDelay = 1 * time.Second

if err = r.job.Call(job.Signature(), args); err != nil {
r.failedJobChan <- FailedJob{
UUID: uuid.New(),
Expand Down

0 comments on commit da0832e

Please sign in to comment.