Skip to content

Commit

Permalink
postgres locker
Browse files Browse the repository at this point in the history
  • Loading branch information
david-littlefarmer committed Nov 27, 2023
1 parent d489e93 commit 74e72fb
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 57 deletions.
32 changes: 32 additions & 0 deletions locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package looper

import (
"context"
"errors"
"time"
)

var (
ErrFailedToConnectToLocker = errors.New("looper - failed to connect to locker")
ErrFailedToObtainLock = errors.New("looper - failed to obtain lock")
ErrFailedToReleaseLock = errors.New("looper - failed to release lock")
ErrFailedToCreateLockTable = errors.New("looper - failed to create lock table")
ErrFailedToCheckLockExistence = errors.New("looper - failed to check lock existence")
)

type lockerKind int

const (
lockerNop lockerKind = iota
lockerRedis
)

// Lock if an error is returned by lock, the job will not be scheduled.
type locker interface {
lock(ctx context.Context, key string, timeout time.Duration) (lock, error)
}

// lock represents an obtained lock
type lock interface {
unlock(ctx context.Context) error
}
52 changes: 25 additions & 27 deletions looper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (
"strings"
"sync"
"time"

"github.com/go-redsync/redsync/v4"
"github.com/redis/go-redis/v9"
)

// Panic handler
Expand All @@ -29,11 +26,11 @@ func SetPanicHandler(handler PanicHandlerFunc) {
// Looper
type Looper struct {
running bool
jobs []*Job
startupTime time.Duration
locker locker
hooks hooks
mu sync.RWMutex
redisClient *redis.Client
jobs []*Job
}

type (
Expand All @@ -49,13 +46,18 @@ type hooks struct {
}

type Config struct {
// Locker for jobs
//
// Options:
// PostgresLocker(ctx context.Context, db *sql.DB, table string)
// RedisLocker(ctx context.Context, rc redis.UniversalClient)
Locker locker

// Startup time ensuring a consistent delay between registered jobs on start of looper.
//
// StartupTime = 1 second; 5 registered jobs; Jobs would be initiated
// with 200ms delay
StartupTime time.Duration

RedisClient *redis.Client
}

type JobFn func(ctx context.Context) error
Expand Down Expand Up @@ -94,7 +96,7 @@ type Job struct {
// Count of unsuccessful job runs.
RunCountError uint64

// Copy of last error, that occurred.
// Copy of last error, that occured.
LastError error

// Hook function before job runs.
Expand All @@ -106,11 +108,11 @@ type Job struct {
// Hook function after job runs unsuccessfully.
AfterJobError HookAfterJobError

// If the job should use redis locker
// If the job should use locker
WithLocker bool

// Locker
locker *locker
locker locker

// Context cancel
contextCancel context.CancelFunc
Expand All @@ -119,16 +121,22 @@ type Job struct {
}

func New(config Config) *Looper {
return &Looper{
l := &Looper{
jobs: []*Job{},
startupTime: setDefaultDuration(config.StartupTime, time.Second),
hooks: hooks{
beforeJob: func(jobName string) {},
afterJob: func(jobName string, duration time.Duration) {},
afterJobError: func(jobName string, duration time.Duration, err error) {},
},
redisClient: config.RedisClient,
locker: newNopLocker(),
}

if config.Locker != nil {
l.locker = config.Locker
}

return l
}

func (l *Looper) RegisterHooks(
Expand All @@ -141,7 +149,7 @@ func (l *Looper) RegisterHooks(
l.hooks.afterJob = afterJob
}

func (l *Looper) AddJob(ctx context.Context, jobInput *Job) error {
func (l *Looper) AddJob(ctx context.Context, jobInput *Job) (err error) {
if jobInput == nil {
return nil
}
Expand Down Expand Up @@ -175,18 +183,10 @@ func (l *Looper) AddJob(ctx context.Context, jobInput *Job) error {
AfterJob: afterJob,
AfterJobError: afterJobError,
WithLocker: jobInput.WithLocker,
locker: l.locker,
mu: sync.RWMutex{},
}

if jobInput.WithLocker && l.redisClient != nil {
locker, err := newRedisLocker(ctx, l.redisClient, redsync.WithTries(1), redsync.WithExpiry(j.Timeout+time.Second))
if err != nil {
return fmt.Errorf("new redis locker for job %s: %w", j.Name, err)
}

j.locker = &locker
}

l.jobs = append(l.jobs, j)

return nil
Expand Down Expand Up @@ -325,13 +325,11 @@ func (j *Job) start() {
j.contextCancel = cancel
j.Running = true

var redisLock lock
var lo lock

if j.WithLocker {
lo := *j.locker
redisLock, errLock = lo.lock(ctxLock, j.Name)
lo, errLock = j.locker.lock(ctxLock, j.Name, j.Timeout)
if errors.Is(errLock, ErrFailedToObtainLock) {
// time.Sleep(j.WaitAfterSuccess)
time.Sleep(time.Duration(time.Second))
j.Running = false
cancel()
Expand All @@ -353,7 +351,7 @@ func (j *Job) start() {
}

if j.WithLocker && errLock == nil {
errLock = redisLock.unlock(ctxLock)
errLock = lo.unlock(ctxLock)
}

if err != nil || errLock != nil {
Expand Down
28 changes: 28 additions & 0 deletions nop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package looper

import (
"context"
"time"
)

func newNopLocker() locker {
return &nopLocker{}
}

// Locker
var _ locker = (*nopLocker)(nil)

type nopLocker struct{}

func (r *nopLocker) lock(ctx context.Context, key string, timeout time.Duration) (lock, error) {
return &nopLock{}, nil
}

// Lock
var _ lock = (*nopLock)(nil)

type nopLock struct{}

func (r *nopLock) unlock(ctx context.Context) error {
return nil
}
161 changes: 161 additions & 0 deletions postgresql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package looper

import (
"context"
"database/sql"
"fmt"
"time"
)

const defaultTableName = "looper_lock"

// PostgresLocker provides an implementation of the Locker interface using
// a PostgreSQL table for storage.
func PostgresLocker(ctx context.Context, db *sql.DB, table string) (locker, error) {
err := db.PingContext(ctx)
if err != nil {
return nil, fmt.Errorf("%w: %v", ErrFailedToConnectToLocker, err)
}

if table == "" {
table = defaultTableName
}
// Ensure the lock table exists, create it if necessary
err = createLockTable(ctx, db, table)
if err != nil {
return nil, err
}

pl := &postgresLocker{
db: db,
table: table,
}

return pl, nil
}

// Locker
var _ locker = (*postgresLocker)(nil)

type postgresLocker struct {
db *sql.DB
table string
}

func createLockTable(ctx context.Context, db *sql.DB, table string) error {
var tableExists bool
err := db.QueryRowContext(
ctx,
fmt.Sprintf(`
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = '%s'
);`,
table,
),
).Scan(&tableExists)
if err != nil {
return fmt.Errorf("%w: %v", ErrFailedToCheckLockExistence, err)
}

if !tableExists {
_, err := db.ExecContext(
ctx,
fmt.Sprintf(`
CREATE TABLE %s (
job_name VARCHAR(255) PRIMARY KEY,
created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT (now() AT TIME ZONE 'UTC')
);`,
table,
))
if err != nil {
return fmt.Errorf("%w: %v", ErrFailedToCreateLockTable, err)
}
}

return nil
}

func (p *postgresLocker) lock(
ctx context.Context,
key string,
timeout time.Duration,
) (lock, error) {
// Create a row in the lock table to acquire the lock
_, err := p.db.ExecContext(
ctx,
fmt.Sprintf(`
INSERT INTO %s (job_name)
VALUES ('%s');`,
p.table,
key,
))
if err != nil {
var createdAt time.Time
err := p.db.QueryRowContext(
ctx,
fmt.Sprintf(`
SELECT created_at
FROM %s
WHERE job_name = '%s';`,
p.table,
key,
)).Scan(&createdAt)
if err != nil {
return nil, ErrFailedToCheckLockExistence
}

if createdAt.Before(time.Now().Add(-timeout)) {
_, err := p.db.ExecContext(
ctx,
fmt.Sprintf(`
DELETE FROM %s
WHERE job_name = '%s';`,
p.table,
key,
))
if err != nil {
return nil, ErrFailedToReleaseLock
}

return p.lock(ctx, key, timeout)
}

return nil, ErrFailedToObtainLock
}

pl := &postgresLock{
db: p.db,
table: p.table,
key: key,
}

return pl, nil
}

// Lock
var _ lock = (*postgresLock)(nil)

type postgresLock struct {
db *sql.DB
table string
key string
}

func (p *postgresLock) unlock(ctx context.Context) error {
// Release the lock by deleting the row
_, err := p.db.ExecContext(
ctx,
fmt.Sprintf(`
DELETE FROM %s
WHERE job_name = '%s';`,
p.table,
p.key,
))
if err != nil {
return ErrFailedToReleaseLock
}

return nil
}
Loading

0 comments on commit 74e72fb

Please sign in to comment.