Skip to content

Commit

Permalink
Optimize job scheduling by avoiding redundant scheduling commands whe…
Browse files Browse the repository at this point in the history
…n `delayedScheduleNextJob` is active.

Use debug level for detailed scheduling logs
  • Loading branch information
billettc committed Feb 12, 2025
1 parent 4599855 commit 7163df7
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions orchestrator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
if errors.Is(err, work.ErrorResourceExhausted) {
s.logger.Info("resource exhausted", zap.Error(err))
if s.delayedScheduleNextJob {
s.logger.Info("skipping delayed schedule next job")
s.logger.Debug("skipping delayed schedule next job")
return nil
}
s.logger.Info("scheduling delayed schedule next job")
s.logger.Debug("scheduling delayed schedule next job")
s.delayedScheduleNextJob = true
return loop.Tick(10*time.Second, work.DelayedMsgScheduleNextJob{
TriggerBy: "resource exhausted",
Expand All @@ -130,10 +130,10 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
s.logger.Info("resource exhausted ramp up", zap.Error(err))

if s.delayedScheduleNextJob {
s.logger.Info("skipping ramp up delayed schedule next job")
s.logger.Debug("skipping ramp up delayed schedule next job")
return nil
}
s.logger.Info("scheduling delayed schedule next job for ramp up")
s.logger.Debug("scheduling delayed schedule next job for ramp up")
s.delayedScheduleNextJob = true
return loop.Tick(1*time.Second, work.DelayedMsgScheduleNextJob{
TriggerBy: "resource exhausted ramp up",
Expand All @@ -143,13 +143,12 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
return nil //todo: wrap in a retry loop or just let it go through
}
}
s.logger.Info("worker borrowed", zap.String("worker_id", worker.ID()))
workUnit, workRange := s.Stages.NextJob()
if workRange == nil { // End of job
return nil
}

s.logger.Info("scheduling work", zap.Object("unit", workUnit))
s.logger.Info("worker borrowed, scheduling work", zap.String("worker_id", worker.ID()), zap.Object("unit", workUnit))
modules := s.Stages.StageModules(workUnit.Stage)

return loop.Batch(
Expand All @@ -163,15 +162,19 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {

case stage.MsgMergeFinished:
s.Stages.MergeCompleted(msg.Unit)
if !s.delayedScheduleNextJob {
cmds = append(cmds, work.CmdScheduleNextJob("all store completed"))
}
cmds = append(cmds,
//work.CmdScheduleNextJob("merge finished"),
s.Stages.CmdTryMerge(msg.Stage),
)

case stage.MsgAllStoresCompleted:
s.storesSyncCompleted = true
if !s.delayedScheduleNextJob {
cmds = append(cmds, work.CmdScheduleNextJob("all store completed"))
}
cmds = append(cmds,
work.CmdScheduleNextJob("all store completed"), // in case some mapper jobs need scheduling
s.cmdShutdownWhenComplete(),
)

Expand Down

0 comments on commit 7163df7

Please sign in to comment.