Skip to content

Commit

Permalink
Add initial retry logic to Executor Job creation and Log reading
Browse files Browse the repository at this point in the history
  • Loading branch information
lbeckman314 committed Jan 30, 2025
1 parent e0abe13 commit e26a314
Showing 1 changed file with 34 additions and 3 deletions.
37 changes: 34 additions & 3 deletions worker/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"strings"
"text/template"
"time"

"github.com/ohsu-comp-bio/funnel/tes"
v1 "k8s.io/api/batch/v1"
Expand Down Expand Up @@ -95,8 +96,19 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error {
_, err = client.Create(ctx, job, metav1.CreateOptions{})

if err != nil {
// TODO: Retry creating the Exeuctor Pod on failure
return fmt.Errorf("creating job in worker: %v", err)
// Retry creating the Executor Pod on failure
var retryCount int
for retryCount < 3 {
_, err = client.Create(ctx, job, metav1.CreateOptions{})
if err == nil {
break
}
retryCount++
time.Sleep(2 * time.Second)
}
if retryCount == 3 {
return fmt.Errorf("creating job in worker after 3 attempts: %v", err)
}
}

// Wait until the job finishes
Expand All @@ -114,7 +126,26 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error {
podLogs, err := req.Stream(ctx)

if err != nil {
// TODO: Retry reading the Executor Logs on failure
// Retry reading the Executor Logs on failure
var retryCount int
for retryCount < 3 {
podLogs, err := req.Stream(ctx)
if err == nil {
defer podLogs.Close()
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err == nil {
var bytes = buf.Bytes()
kcmd.Stdout.Write(bytes)
break
}
}
retryCount++
time.Sleep(2 * time.Second)
}
if retryCount == 3 {
return fmt.Errorf("failed to read logs after 3 attempts: %v", err)
}
return err
}

Expand Down

0 comments on commit e26a314

Please sign in to comment.