From 74e72fb045b0b22f6535a12a2c5ac343f62a6135 Mon Sep 17 00:00:00 2001 From: david-littlefarmer Date: Mon, 27 Nov 2023 16:43:09 +0100 Subject: [PATCH] postgres locker --- locker.go | 32 ++++++++++ looper.go | 52 ++++++++-------- nop.go | 28 +++++++++ postgresql.go | 161 ++++++++++++++++++++++++++++++++++++++++++++++++++ redis.go | 61 +++++++++---------- 5 files changed, 277 insertions(+), 57 deletions(-) create mode 100644 locker.go create mode 100644 nop.go create mode 100644 postgresql.go diff --git a/locker.go b/locker.go new file mode 100644 index 0000000..4a08af3 --- /dev/null +++ b/locker.go @@ -0,0 +1,32 @@ +package looper + +import ( + "context" + "errors" + "time" +) + +var ( + ErrFailedToConnectToLocker = errors.New("looper - failed to connect to locker") + ErrFailedToObtainLock = errors.New("looper - failed to obtain lock") + ErrFailedToReleaseLock = errors.New("looper - failed to release lock") + ErrFailedToCreateLockTable = errors.New("looper - failed to create lock table") + ErrFailedToCheckLockExistence = errors.New("looper - failed to check lock existence") +) + +type lockerKind int + +const ( + lockerNop lockerKind = iota + lockerRedis +) + +// Lock if an error is returned by lock, the job will not be scheduled. +type locker interface { + lock(ctx context.Context, key string, timeout time.Duration) (lock, error) +} + +// lock represents an obtained lock +type lock interface { + unlock(ctx context.Context) error +} diff --git a/looper.go b/looper.go index 76de7b4..d705e73 100644 --- a/looper.go +++ b/looper.go @@ -7,9 +7,6 @@ import ( "strings" "sync" "time" - - "github.com/go-redsync/redsync/v4" - "github.com/redis/go-redis/v9" ) // Panic handler @@ -29,11 +26,11 @@ func SetPanicHandler(handler PanicHandlerFunc) { // Looper type Looper struct { running bool - jobs []*Job startupTime time.Duration + locker locker hooks hooks mu sync.RWMutex - redisClient *redis.Client + jobs []*Job } type ( @@ -49,13 +46,18 @@ type hooks struct { } type Config struct { + // Locker for jobs + // + // Options: + // PostgresLocker(ctx context.Context, db *sql.DB, table string) + // RedisLocker(ctx context.Context, rc redis.UniversalClient) + Locker locker + // Startup time ensuring a consistent delay between registered jobs on start of looper. // // StartupTime = 1 second; 5 registered jobs; Jobs would be initiated // with 200ms delay StartupTime time.Duration - - RedisClient *redis.Client } type JobFn func(ctx context.Context) error @@ -94,7 +96,7 @@ type Job struct { // Count of unsuccessful job runs. RunCountError uint64 - // Copy of last error, that occurred. + // Copy of last error, that occured. LastError error // Hook function before job runs. @@ -106,11 +108,11 @@ type Job struct { // Hook function after job runs unsuccessfully. AfterJobError HookAfterJobError - // If the job should use redis locker + // If the job should use locker WithLocker bool // Locker - locker *locker + locker locker // Context cancel contextCancel context.CancelFunc @@ -119,7 +121,7 @@ type Job struct { } func New(config Config) *Looper { - return &Looper{ + l := &Looper{ jobs: []*Job{}, startupTime: setDefaultDuration(config.StartupTime, time.Second), hooks: hooks{ @@ -127,8 +129,14 @@ func New(config Config) *Looper { afterJob: func(jobName string, duration time.Duration) {}, afterJobError: func(jobName string, duration time.Duration, err error) {}, }, - redisClient: config.RedisClient, + locker: newNopLocker(), + } + + if config.Locker != nil { + l.locker = config.Locker } + + return l } func (l *Looper) RegisterHooks( @@ -141,7 +149,7 @@ func (l *Looper) RegisterHooks( l.hooks.afterJob = afterJob } -func (l *Looper) AddJob(ctx context.Context, jobInput *Job) error { +func (l *Looper) AddJob(ctx context.Context, jobInput *Job) (err error) { if jobInput == nil { return nil } @@ -175,18 +183,10 @@ func (l *Looper) AddJob(ctx context.Context, jobInput *Job) error { AfterJob: afterJob, AfterJobError: afterJobError, WithLocker: jobInput.WithLocker, + locker: l.locker, mu: sync.RWMutex{}, } - if jobInput.WithLocker && l.redisClient != nil { - locker, err := newRedisLocker(ctx, l.redisClient, redsync.WithTries(1), redsync.WithExpiry(j.Timeout+time.Second)) - if err != nil { - return fmt.Errorf("new redis locker for job %s: %w", j.Name, err) - } - - j.locker = &locker - } - l.jobs = append(l.jobs, j) return nil @@ -325,13 +325,11 @@ func (j *Job) start() { j.contextCancel = cancel j.Running = true - var redisLock lock + var lo lock if j.WithLocker { - lo := *j.locker - redisLock, errLock = lo.lock(ctxLock, j.Name) + lo, errLock = j.locker.lock(ctxLock, j.Name, j.Timeout) if errors.Is(errLock, ErrFailedToObtainLock) { - // time.Sleep(j.WaitAfterSuccess) time.Sleep(time.Duration(time.Second)) j.Running = false cancel() @@ -353,7 +351,7 @@ func (j *Job) start() { } if j.WithLocker && errLock == nil { - errLock = redisLock.unlock(ctxLock) + errLock = lo.unlock(ctxLock) } if err != nil || errLock != nil { diff --git a/nop.go b/nop.go new file mode 100644 index 0000000..66b9618 --- /dev/null +++ b/nop.go @@ -0,0 +1,28 @@ +package looper + +import ( + "context" + "time" +) + +func newNopLocker() locker { + return &nopLocker{} +} + +// Locker +var _ locker = (*nopLocker)(nil) + +type nopLocker struct{} + +func (r *nopLocker) lock(ctx context.Context, key string, timeout time.Duration) (lock, error) { + return &nopLock{}, nil +} + +// Lock +var _ lock = (*nopLock)(nil) + +type nopLock struct{} + +func (r *nopLock) unlock(ctx context.Context) error { + return nil +} diff --git a/postgresql.go b/postgresql.go new file mode 100644 index 0000000..ed58063 --- /dev/null +++ b/postgresql.go @@ -0,0 +1,161 @@ +package looper + +import ( + "context" + "database/sql" + "fmt" + "time" +) + +const defaultTableName = "looper_lock" + +// PostgresLocker provides an implementation of the Locker interface using +// a PostgreSQL table for storage. +func PostgresLocker(ctx context.Context, db *sql.DB, table string) (locker, error) { + err := db.PingContext(ctx) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrFailedToConnectToLocker, err) + } + + if table == "" { + table = defaultTableName + } + // Ensure the lock table exists, create it if necessary + err = createLockTable(ctx, db, table) + if err != nil { + return nil, err + } + + pl := &postgresLocker{ + db: db, + table: table, + } + + return pl, nil +} + +// Locker +var _ locker = (*postgresLocker)(nil) + +type postgresLocker struct { + db *sql.DB + table string +} + +func createLockTable(ctx context.Context, db *sql.DB, table string) error { + var tableExists bool + err := db.QueryRowContext( + ctx, + fmt.Sprintf(` + SELECT EXISTS ( + SELECT 1 + FROM information_schema.tables + WHERE table_name = '%s' + );`, + table, + ), + ).Scan(&tableExists) + if err != nil { + return fmt.Errorf("%w: %v", ErrFailedToCheckLockExistence, err) + } + + if !tableExists { + _, err := db.ExecContext( + ctx, + fmt.Sprintf(` + CREATE TABLE %s ( + job_name VARCHAR(255) PRIMARY KEY, + created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT (now() AT TIME ZONE 'UTC') + );`, + table, + )) + if err != nil { + return fmt.Errorf("%w: %v", ErrFailedToCreateLockTable, err) + } + } + + return nil +} + +func (p *postgresLocker) lock( + ctx context.Context, + key string, + timeout time.Duration, +) (lock, error) { + // Create a row in the lock table to acquire the lock + _, err := p.db.ExecContext( + ctx, + fmt.Sprintf(` + INSERT INTO %s (job_name) + VALUES ('%s');`, + p.table, + key, + )) + if err != nil { + var createdAt time.Time + err := p.db.QueryRowContext( + ctx, + fmt.Sprintf(` + SELECT created_at + FROM %s + WHERE job_name = '%s';`, + p.table, + key, + )).Scan(&createdAt) + if err != nil { + return nil, ErrFailedToCheckLockExistence + } + + if createdAt.Before(time.Now().Add(-timeout)) { + _, err := p.db.ExecContext( + ctx, + fmt.Sprintf(` + DELETE FROM %s + WHERE job_name = '%s';`, + p.table, + key, + )) + if err != nil { + return nil, ErrFailedToReleaseLock + } + + return p.lock(ctx, key, timeout) + } + + return nil, ErrFailedToObtainLock + } + + pl := &postgresLock{ + db: p.db, + table: p.table, + key: key, + } + + return pl, nil +} + +// Lock +var _ lock = (*postgresLock)(nil) + +type postgresLock struct { + db *sql.DB + table string + key string +} + +func (p *postgresLock) unlock(ctx context.Context) error { + // Release the lock by deleting the row + _, err := p.db.ExecContext( + ctx, + fmt.Sprintf(` + DELETE FROM %s + WHERE job_name = '%s';`, + p.table, + p.key, + )) + if err != nil { + return ErrFailedToReleaseLock + } + + return nil +} diff --git a/redis.go b/redis.go index 14133e5..f2bdd6d 100644 --- a/redis.go +++ b/redis.go @@ -1,55 +1,55 @@ package looper -// inpired by https://github.com/go-co-op/gocron-redis-lock - import ( "context" - "errors" "fmt" + "time" "github.com/go-redsync/redsync/v4" "github.com/go-redsync/redsync/v4/redis/goredis/v9" "github.com/redis/go-redis/v9" ) -var ( - ErrFailedToConnectToRedis = errors.New("looper - failed to connect to redis") - ErrFailedToObtainLock = errors.New("looper - failed to obtain lock") - ErrFailedToReleaseLock = errors.New("looper - failed to release lock") -) - -type locker interface { - lock(ctx context.Context, key string) (lock, error) -} - -type lock interface { - unlock(ctx context.Context) error -} - -func newRedisLocker(ctx context.Context, r redis.UniversalClient, options ...redsync.Option) (locker, error) { - err := r.Ping(ctx).Err() +// redisOptions := &redis.Options{ +// Addr: conf.Redis.Host, +// } +// +// redisClient := redis.NewClient(redisOptions) +// +// looperRedis, err := looper.RedisLocker(ctx, redisClient) +// if err != nil { +// return err +// } + +// RedisLocker provides an implementation of the Locker interface using +// redis for storage. +func RedisLocker(ctx context.Context, rc redis.UniversalClient) (locker, error) { + err := rc.Ping(ctx).Err() if err != nil { - return nil, fmt.Errorf("%s: %w", ErrFailedToConnectToRedis, err) + return nil, fmt.Errorf("%s: %w", ErrFailedToConnectToLocker, err) } - return newLocker(r, options...), nil -} - -func newLocker(r redis.UniversalClient, options ...redsync.Option) locker { - pool := goredis.NewPool(r) + pool := goredis.NewPool(rc) rs := redsync.New(pool) - return &redisLocker{rs: rs, options: options} + + l := redisLocker{rs: rs} + + return &l, nil } +// Locker var _ locker = (*redisLocker)(nil) type redisLocker struct { - rs *redsync.Redsync - options []redsync.Option + rs *redsync.Redsync } -func (r *redisLocker) lock(ctx context.Context, key string) (lock, error) { - mu := r.rs.NewMutex(key, r.options...) +func (r *redisLocker) lock(ctx context.Context, key string, timeout time.Duration) (lock, error) { + options := []redsync.Option{ + redsync.WithTries(1), + redsync.WithExpiry(timeout + time.Second), + } + mu := r.rs.NewMutex(key, options...) err := mu.LockContext(ctx) if err != nil { return nil, ErrFailedToObtainLock @@ -62,6 +62,7 @@ func (r *redisLocker) lock(ctx context.Context, key string) (lock, error) { return rl, nil } +// Lock var _ lock = (*redisLock)(nil) type redisLock struct {