-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathscheduler.go
129 lines (112 loc) · 3.66 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package asyncer
import (
"context"
"errors"
"time"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
)
type (
// SchedulerServer is a wrapper for asynq.Scheduler.
SchedulerServer struct {
asynq *asynq.Scheduler
}
// SchedulerServerOption is a function that configures a SchedulerServer.
SchedulerServerOption func(*asynq.SchedulerOpts)
)
// NewSchedulerServer creates a new scheduler client and returns the server.
func NewSchedulerServer(redisClient redis.UniversalClient, opts ...SchedulerServerOption) *SchedulerServer {
// setup asynq scheduler config
cnf := &asynq.SchedulerOpts{
LogLevel: asynq.ErrorLevel,
Location: time.UTC,
}
// Apply options
for _, opt := range opts {
opt(cnf)
}
return &SchedulerServer{
asynq: asynq.NewSchedulerFromRedisClient(redisClient, cnf),
}
}
// ScheduleTask schedules a task based on the given cron specification and task name.
// It returns an error if the cron specification or task name is empty, or if there was an error registering the task.
func (srv *SchedulerServer) ScheduleTask(cronSpec, taskName string, opts ...TaskOption) error {
if cronSpec == "" {
return errors.Join(ErrFailedToScheduleTask, ErrCronSpecIsEmpty)
}
if taskName == "" {
return errors.Join(ErrFailedToScheduleTask, ErrTaskNameIsEmpty)
}
if _, err := srv.asynq.Register(cronSpec, asynq.NewTask(taskName, nil, opts...)); err != nil {
return errors.Join(ErrFailedToScheduleTask, err)
}
return nil
}
// Run runs the scheduler with the provided handlers.
// It returns a function that can be used to run server in a error group.
// E.g.:
//
// eg, ctx := errgroup.WithContext(context.Background())
// eg.Go(schedulerServer.Run())
func (srv *SchedulerServer) Run() func() error {
return func() error {
// Run scheduler
if err := srv.asynq.Run(); err != nil {
return errors.Join(ErrFailedToStartSchedulerServer, err)
}
return nil
}
}
// Shutdown gracefully shuts down the scheduler server by waiting for all
// pending tasks to be processed.
func (srv *SchedulerServer) Shutdown() {
srv.asynq.Shutdown()
}
// RunSchedulerServer runs the scheduler server with the given Redis connection string,
// logger, and scheduled task handlers.
// It returns a function that can be used to run server in a error group.
// E.g.:
//
// eg, ctx := errgroup.WithContext(context.Background())
// eg.Go(asyncer.RunSchedulerServer(
// "redis://localhost:6379",
// logger,
// asyncer.NewTaskScheduler("@every 1h", "scheduled_task_1"),
// ))
// eg.Go(asyncer.RunQueueServer(
// "redis://localhost:6379",
// logger,
// asyncer.ScheduledHandlerFunc("scheduled_task_1", scheduledTaskHandler),
// ))
//
// func scheduledTaskHandler(ctx context.Context) error {
// // ...handle task here...
// }
//
// The function returns an error if the server fails to start.
// The function panics if the Redis connection string is invalid.
//
// !!! Pay attention, that the scheduler just triggers the job, so you need to run queue server as well.
func RunSchedulerServer(ctx context.Context, redisClient redis.UniversalClient, log asynq.Logger, schedulers ...TaskScheduler) func() error {
// Init scheduler server
var opts []SchedulerServerOption
if log != nil {
opts = append(opts, WithSchedulerLogger(log))
}
return func() error {
srv := NewSchedulerServer(redisClient, opts...)
defer srv.Shutdown()
// Register schedulers
for _, scheduler := range schedulers {
if err := srv.ScheduleTask(scheduler.Schedule(), scheduler.TaskName(), scheduler.Options()...); err != nil {
return errors.Join(ErrFailedToRunSchedulerServer, err)
}
}
// Run server
eg, _ := errgroup.WithContext(ctx)
eg.Go(srv.Run())
return eg.Wait()
}
}