diff --git a/locker.go b/locker.go index 4a08af3..f6c98cf 100644 --- a/locker.go +++ b/locker.go @@ -14,13 +14,6 @@ var ( ErrFailedToCheckLockExistence = errors.New("looper - failed to check lock existence") ) -type lockerKind int - -const ( - lockerNop lockerKind = iota - lockerRedis -) - // Lock if an error is returned by lock, the job will not be scheduled. type locker interface { lock(ctx context.Context, key string, timeout time.Duration) (lock, error) diff --git a/looper.go b/looper.go index d705e73..72ed4c7 100644 --- a/looper.go +++ b/looper.go @@ -26,11 +26,11 @@ func SetPanicHandler(handler PanicHandlerFunc) { // Looper type Looper struct { running bool + jobs []*Job startupTime time.Duration - locker locker hooks hooks mu sync.RWMutex - jobs []*Job + locker locker } type ( @@ -46,18 +46,13 @@ type hooks struct { } type Config struct { - // Locker for jobs - // - // Options: - // PostgresLocker(ctx context.Context, db *sql.DB, table string) - // RedisLocker(ctx context.Context, rc redis.UniversalClient) - Locker locker - // Startup time ensuring a consistent delay between registered jobs on start of looper. // // StartupTime = 1 second; 5 registered jobs; Jobs would be initiated // with 200ms delay StartupTime time.Duration + + Locker locker } type JobFn func(ctx context.Context) error @@ -226,7 +221,7 @@ func (l *Looper) StartJobByName(jobName string) error { found = true if j.Active && !j.Started { j.Started = true - go j.start() + go j.startLoop() } } @@ -263,7 +258,7 @@ func (l *Looper) startJobs() { j.mu.Lock() if j.Active && !j.Started { j.Started = true - go j.start() + go j.startLoop() time.Sleep(delay) } @@ -299,7 +294,7 @@ func (l *Looper) Stop() { l.mu.Unlock() } -func (j *Job) start() { +func (j *Job) startLoop() { defer func() { j.mu.Lock() j.Started = false @@ -307,10 +302,6 @@ func (j *Job) start() { j.mu.Unlock() }() - var errLock error - var err error - ctxLock := context.Background() - for { j.mu.RLock() if !j.Active || !j.Started { @@ -319,65 +310,86 @@ func (j *Job) start() { } j.mu.RUnlock() - ctx, cancel := context.WithTimeout(context.Background(), j.Timeout) - - j.mu.Lock() - j.contextCancel = cancel - j.Running = true - - var lo lock - - if j.WithLocker { - lo, errLock = j.locker.lock(ctxLock, j.Name, j.Timeout) - if errors.Is(errLock, ErrFailedToObtainLock) { - time.Sleep(time.Duration(time.Second)) - j.Running = false - cancel() - j.mu.Unlock() - continue - } + start := time.Now() - if errLock != nil { - err = errLock - } + j.BeforeJob(j.Name) + err := j.start() + if err != nil { + j.AfterJobError(j.Name, time.Since(start), err) + time.Sleep(j.WaitAfterError) + } else { + j.AfterJob(j.Name, time.Since(start)) + time.Sleep(j.WaitAfterSuccess) } + } +} - j.BeforeJob(j.Name) +func (j *Job) start() error { + defer func() { + j.mu.Lock() + j.Running = false j.mu.Unlock() + }() - start := time.Now() - if err == nil { - err = j.Run(ctx) - } + j.mu.Lock() + j.Running = true + j.mu.Unlock() - if j.WithLocker && errLock == nil { - errLock = lo.unlock(ctxLock) + lo, err := j.lock() + if err != nil { + if errors.Is(err, ErrFailedToObtainLock) { + time.Sleep(time.Second) + return nil } - if err != nil || errLock != nil { - if err != nil { - j.AfterJobError(j.Name, time.Since(start), err) - } else { - j.AfterJobError(j.Name, time.Since(start), errLock) - } + return err + } - time.Sleep(j.WaitAfterError) - } else { - j.AfterJob(j.Name, time.Since(start)) - time.Sleep(j.WaitAfterSuccess) + ctx, cancel := context.WithTimeout(context.Background(), j.Timeout) + defer cancel() + + j.contextCancel = cancel + + err = j.run(ctx) + if err != nil { + errLock := j.unlock(lo) + if errLock != nil { + return errors.Join(err, errLock) } - cancel() + return err + } + + err = j.unlock(lo) + if err != nil { + return err } + + return nil +} + +func (j *Job) lock() (lo lock, err error) { + if j.WithLocker { + lo, err = j.locker.lock(context.Background(), j.Name, j.Timeout) + } + + return lo, err } -func (j *Job) Run(ctx context.Context) (err error) { +func (j *Job) unlock(lo lock) (err error) { + if j.WithLocker { + return lo.unlock(context.Background()) + } + + return nil +} + +func (j *Job) run(ctx context.Context) (err error) { defer func() { j.mu.Lock() defer j.mu.Unlock() j.LastRun = time.Now() - j.Running = false r := recover() if r != nil {