Skip to content

Commit

Permalink
Merge branch 'main' into pithy-sleep-for-days
Browse files Browse the repository at this point in the history
  • Loading branch information
THardy98 authored Feb 1, 2025
2 parents 492f5a5 + 530266c commit 7a05457
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 4 deletions.
2 changes: 2 additions & 0 deletions early-return/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ func main() {
w := worker.New(c, earlyreturn.TaskQueueName, worker.Options{})

w.RegisterWorkflow(earlyreturn.Workflow)
w.RegisterActivity(earlyreturn.CompleteTransaction)
w.RegisterActivity(earlyreturn.CancelTransaction)

err = w.Run(worker.InterruptCh())
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions safe_message_handler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,14 @@ func (cm *ClusterManager) run(ctx workflow.Context) (ClusterManagerResult, error
return ClusterManagerResult{}, err
}
cm.logger.Info("Continuing as new")
return ClusterManagerResult{}, workflow.NewContinueAsNewError(ctx, ClusterManagerInput{
State: &cm.state,
TestContinueAsNew: cm.testContinueAsNew,
})
return ClusterManagerResult{}, workflow.NewContinueAsNewError(
ctx,
ClusterManagerWorkflow,
ClusterManagerInput{
State: &cm.state,
TestContinueAsNew: cm.testContinueAsNew,
},
)
}

}
Expand Down
5 changes: 5 additions & 0 deletions worker-specific-task-queues/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,8 @@ Start the Workflow Execution:
```bash
go run worker-specific-task-queues/starter/main.go
```

### Things to try
You can try to intentionally crash Workers while they are doing work to see what happens when work gets "stuck" in a unique queue: currently the Workflow will `scheduleToCloseTimeout` without a Worker, and retry when a Worker comes back online.

After the 5th attempt, it logs `Workflow failed after multiple retries.` and exits. But you may wish to implement compensatory logic, including notifying you.
18 changes: 18 additions & 0 deletions worker-specific-task-queues/workflow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package worker_specific_task_queues

import (
"go.temporal.io/sdk/temporal"
"path/filepath"
"time"

Expand All @@ -11,8 +12,25 @@ import (
// FileProcessingWorkflow is a workflow that uses Worker-specific Task Queues to run multiple Activities on a consistent
// host.
func FileProcessingWorkflow(ctx workflow.Context) (err error) {
// When using a worker-specific task queue, if a failure occurs, we want to retry all of the worker-specific
// logic, so wrap all the logic here in a loop.
for attempt := range 5 {
if err = processFile(ctx); err == nil {
workflow.GetLogger(ctx).Info("Workflow completed.")
return
}
workflow.GetLogger(ctx).Error("Attempt failed, trying on new worker", attempt+1)
}
workflow.GetLogger(ctx).Error("Workflow failed after multiple retries.", "Error", err.Error())
return
}

func processFile(ctx workflow.Context) (err error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 1,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
var WorkerSpecificTaskQueue string
Expand Down
53 changes: 53 additions & 0 deletions worker-specific-task-queues/workflow_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package worker_specific_task_queues

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -29,3 +31,54 @@ func Test_Workflow(t *testing.T) {
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
}

func Test_RetrySuccess(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

// Mock activity implementation
var a WorkerSpecificTaskQueue
env.RegisterActivityWithOptions(a.GetWorkerSpecificTaskQueue, activity.RegisterOptions{
Name: "GetWorkerSpecificTaskQueue",
})

counter := 0
env.OnActivity("GetWorkerSpecificTaskQueue", mock.Anything).Return(func(ctx context.Context) (string, error) {
counter++
// Workflow retries up to 5 times
if counter < 3 {
return "", errors.New("temporary error")
}
return "unique-task-queue", nil
})
env.OnActivity(DownloadFile, mock.Anything, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(ProcessFile, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(DeleteFile, mock.Anything, mock.Anything).Return(nil)

env.ExecuteWorkflow(FileProcessingWorkflow)

require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
}

func Test_RetryFail(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

// Mock activity implementation
var a WorkerSpecificTaskQueue
env.RegisterActivityWithOptions(a.GetWorkerSpecificTaskQueue, activity.RegisterOptions{
Name: "GetWorkerSpecificTaskQueue",
})
env.OnActivity("GetWorkerSpecificTaskQueue", mock.Anything).Return(func(ctx context.Context) (string, error) {
return "", errors.New("error to show a retry mechanic failure")
})
env.OnActivity(DownloadFile, mock.Anything, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(ProcessFile, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(DeleteFile, mock.Anything, mock.Anything).Return(nil)

env.ExecuteWorkflow(FileProcessingWorkflow)

require.True(t, env.IsWorkflowCompleted())
require.Error(t, env.GetWorkflowError())
}

0 comments on commit 7a05457

Please sign in to comment.