Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simple cron scheduling #22

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions scheduler/schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package scheduler

import (
"regexp"
"strconv"
"strings"
"time"
)

const cronRegex = `^(\*|[0-5]?\d)(\/\d+)? (\*|[01]?\d|2[0-3])(\/\d+)? (\*|0?[1-9]|[12]\d|3[01])(\/\d+)? (\*|0?[1-9]|1[0-2])(\/\d+)? (\*|[0-6])(\/\d+)?$`

type timingType int

const (
concrete timingType = iota
step
wildcard
)

const (
minMax = 60
hourMax = 24
dayMax = 31
monthMax = 12
weekdayMax = 6
)

type timing struct {
typ timingType
val int
}

type schedule struct {
min timing
hour timing
day timing
month timing
weekday timing
}

func getDurationTillNextProc(s schedule) time.Duration {
currentTime := time.Now()

nextMonth := calcNextTime(s.month, int(currentTime.Month()), monthMax, 0)

if nextMonth > int(currentTime.Month()) {
nextDate := time.Date(
currentTime.Year(),
time.Month(nextMonth),
0,
0,
0,
0,
0,
currentTime.Location(),
)
return nextDate.Sub(currentTime)
}

nextDay := calcNextTime(s.day, currentTime.Day(), dayMax, 0)

if nextDay > currentTime.Day() {
nextDate := time.Date(
currentTime.Year(),
time.Month(nextMonth),
nextDay,
0,
0,
0,
0,
currentTime.Location(),
)
return nextDate.Sub(currentTime)
}

nextHour := calcNextTime(s.hour, currentTime.Hour(), hourMax, 0)

if nextHour > currentTime.Hour() {
nextDate := time.Date(
currentTime.Year(),
time.Month(nextMonth),
nextDay,
nextHour,
0,
0,
0,
currentTime.Location(),
)
return nextDate.Sub(currentTime)
}

nextMinute := calcNextTime(s.min, currentTime.Minute(), minMax, 1)

nextDate := time.Date(
currentTime.Year(),
time.Month(nextMonth),
nextDay,
nextHour,
nextMinute,
0,
0,
currentTime.Location(),
)
return nextDate.Sub(currentTime)
}

func calcNextTime(t timing, currentTime, maxVal, wildCardIncrement int) int {
if t.typ == wildcard {
return currentTime + wildCardIncrement
}

if t.typ == step {
stepped := min(currentTime+t.val, maxVal)
return stepped - (stepped % min(t.val, maxVal))
}

if t.val < currentTime {
return t.val + minMax
}

return t.val
}

func validateSchedule(schedule string) (bool, error) {
ok, err := regexp.MatchString(cronRegex, schedule)
if err != nil {
return false, err
}

return ok, nil
}

func parseSchedule(s string) schedule {
timings := strings.Split(s, " ")

min := convCronTiming(timings[0], 0, minMax)
hour := convCronTiming(timings[1], 0, hourMax)
day := convCronTiming(timings[2], 1, dayMax)
month := convCronTiming(timings[3], 1, monthMax)
weekday := convCronTiming(timings[4], 0, weekdayMax)

return schedule{
min: min,
hour: hour,
day: day,
month: month,
weekday: weekday,
}
}

func convCronTiming(timeOption string, minVal, maxVal int) timing {
if timeOption == "*" {
return timing{
typ: wildcard,
val: minVal,
}
}

var typ timingType
if ok, _ := regexp.MatchString(`^\*\/\d+$`, timeOption); ok {
timeOption = timeOption[2:]
typ = step
} else {
typ = concrete
}

val, err := strconv.Atoi(timeOption)
if err != nil {
panic("String to int conversion should not have failed for cron string")
}

return timing{
typ: typ,
val: max(min(val, maxVal), minVal),
}
}
111 changes: 111 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package scheduler

import (
"context"
"fmt"
"log"
"sync"
"time"
)

type job struct {
cmd func()
name string
schedule schedule
}

type Scheduler struct {
cv *sync.Cond
jobs []job
logger *log.Logger
mutex *sync.Mutex
numReady int
ready sync.Map
}

func New(logger *log.Logger) *Scheduler {
mutex := &sync.Mutex{}
cv := sync.NewCond(mutex)

return &Scheduler{
cv: cv,
jobs: nil,
logger: logger,
mutex: mutex,
numReady: 0,
ready: sync.Map{},
}
}

func (s *Scheduler) Add(schedule string, cmd func()) (bool, error) {
return s.add(schedule, cmd, fmt.Sprintf("job-%d", time.Now().UTC().Unix()))
}

func (s *Scheduler) AddWithName(schedule string, cmd func(), name string) (bool, error) {
return s.add(schedule, cmd, name)
}

func (s *Scheduler) Start(ctx context.Context) {
s.logger.Printf("Starting CRON scheduling with [%d] jobs.", len(s.jobs))
Copy link

@MrIceman MrIceman Jul 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to have a look attimers, you could just create a new timer for each cron job which would probably simplify your code even more
https://gobyexample.com/timers

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think it will necessarily simplify the code more than what it is, the body of it gorountine when dispacting a job will practically the same with replacing the time.sleep with a listening to the timer channel.

go func() {
for {
select {
case <-ctx.Done():
default:
s.mutex.Lock()
for s.numReady == 0 {
s.cv.Wait()
}

s.ready.Range(func(key, _ any) bool {
s.ready.Delete(key)
s.numReady -= 1

go func() {
job := s.jobs[key.(int)]

duration := getDurationTillNextProc(job.schedule)
s.logger.Printf("Scheduling job [%s] | Time till next proc [%s]", job.name, duration)

time.Sleep(duration)
job.cmd()

s.mutex.Lock()
defer s.mutex.Unlock()
s.ready.Store(key, true)
s.numReady += 1
s.cv.Signal()
}()

return true
})
s.mutex.Unlock()
}
}
}()
}

func (s *Scheduler) add(schedule string, cmd func(), name string) (bool, error) {
ok, err := validateSchedule(schedule)
if err != nil {
return false, err
}

if !ok {
return false, nil
}

job := job{
cmd: cmd,
schedule: parseSchedule(schedule),
name: name,
}

s.mutex.Lock()
s.ready.Store(s.numReady, true)
s.jobs = append(s.jobs, job)
s.numReady += 1
s.mutex.Unlock()

return true, nil
}