Skip to content

Commit

Permalink
Merge branch 'uber-go:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
natemort committed Jul 2, 2024
2 parents 38cc19e + f309725 commit 275a5b8
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 9 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/breaking_change_pr_template.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
**Detailed Description**
[In-depth description of the changes made to the interfaces, specifying new fields, removed fields, or modified data structures]

**Impact Analysis**
- **Backward Compatibility**: [Analysis of backward compatibility]
- **Forward Compatibility**: [Analysis of forward compatibility]

**Testing Plan**
- **Unit Tests**: [Do we have unit test covering the change?]
- **Persistence Tests**: [If the change is related to a data type which is persisted, do we have persistence tests covering the change?]
- **Integration Tests**: [Do we have integration test covering the change?]
- **Compatibility Tests**: [Have we done tests to test the backward and forward compatibility?]

**Rollout Plan**
- What is the rollout plan?
- Does the order of deployment matter?
- Is it safe to rollback? Does the order of rollback matter?
- Is there a kill switch to mitigate the impact immediately?
54 changes: 54 additions & 0 deletions .github/workflows/breaking_change_reminder.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
name: Workflow for Breaking Change Reminder
on:
pull_request:
paths:
# below files do not cover all the exposed types/funcs, but it's a good start to detect potentially breaking changes
- activity/activity.go
- client/client.go
- encoded/encoded.go
- interceptors/workflow_interceptor.go
- internal/activity.go
- internal/client.go
- internal/encoded.go
- internal/workflow.go
- internal/interceptors.go
- internal/worker.go
- internal/workflow.go
- worker/worker.go
- workflow/*.go

jobs:
breaking-change-pr-template-reminder:
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Fail if PR description is missing breaking change template
if: steps.pr-changes.outputs.changes != '[]'
run: |
PR_NUMBER=${{ github.event.pull_request.number }}
PR_URL="https://api.github.com/repos/${{ github.repository }}/pulls/${PR_NUMBER}"
BODY=$(curl $PR_URL | jq '.body')
CHECKLIST=(
"Detailed Description"
"Impact Analysis"
"Testing Plan"
"Rollout Plan"
)
TEMPLATE=$(cat .github/workflows/breaking_change_pr_template.md)
for i in "${CHECKLIST[@]}"; do
if [[ "$BODY" == *"$i"* ]]; then
continue
else
echo "Potential breaking changes detected! Please update the PR description to include following template:"
echo "---"
echo "$TEMPLATE"
echo "---"
exit 1
fi
done
1 change: 1 addition & 0 deletions internal/internal_logging_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
tagVisibilityQuery = "VisibilityQuery"
tagPanicError = "PanicError"
tagPanicStack = "PanicStack"
tagPollerType = "PollerType"
causeTag = "pollerrorcause"
tagWorkflowRuntimeLength = "workflowruntimelength"
tagNonDeterminismDetectionType = "NonDeterminismDetectionType"
Expand Down
17 changes: 8 additions & 9 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pborman/uuid"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
s "go.uber.org/cadence/.gen/go/shared"
Expand Down Expand Up @@ -248,30 +249,28 @@ func (bp *basePoller) doPoll(
var err error
var result interface{}

doneC := make(chan struct{})
ctx, cancel, _ := newChannelContext(context.Background(), featureFlags, chanTimeout(pollTaskServiceTimeOut))
defer cancel()

go func() {
defer close(doneC)
defer cancel()
defer func() {
if p := recover(); p != nil {
bp.metricsScope.Counter(metrics.InternalPanicCounter).Inc(1)
st := getStackTraceRaw("base poller [panic]:", 7, 0)
bp.logger.Error("Unhandled panic.",
bp.logger.Error("Unhandled panic",
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
zap.String(tagPanicStack, st))
err = newPanicError(p, st)
}
}()
defer cancel()
result, err = pollFunc(ctx)
}()

select {
case <-doneC:
case <-ctx.Done():
return result, err
case <-bp.shutdownC:
cancel()
return nil, errShutdown
}
}
Expand All @@ -288,7 +287,7 @@ func newWorkflowTaskPoller(
basePoller: basePoller{
shutdownC: params.WorkerStopChannel,
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
logger: params.Logger,
logger: params.Logger.With(zapcore.Field{Key: tagPollerType, Type: zapcore.StringType, String: "Workflow"}),
},
service: service,
domain: domain,
Expand Down Expand Up @@ -542,7 +541,7 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct
basePoller: basePoller{
shutdownC: params.WorkerStopChannel,
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
logger: params.Logger,
logger: params.Logger.With(zapcore.Field{Key: tagPollerType, Type: zapcore.StringType, String: "LocalActivity"}),
},
handler: handler,
laTunnel: laTunnel,
Expand Down Expand Up @@ -1002,7 +1001,7 @@ func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowserv
activityTaskPoller := &activityTaskPoller{
basePoller: basePoller{
shutdownC: params.WorkerStopChannel,
logger: params.Logger,
logger: params.Logger.With(zap.Field{Key: tagPollerType, Type: zapcore.StringType, String: "Activity"}),
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
},
taskHandler: taskHandler,
Expand Down
27 changes: 27 additions & 0 deletions internal/internal_task_pollers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,33 @@ func TestActivityTaskPollerHandlesPanics(t *testing.T) {
assert.Equal(t, "oh no", panicErr.value)
}

func TestActivityTaskPollerHandlesCancel(t *testing.T) {
ctrl := gomock.NewController(t)
service := workflowservicetest.NewMockClient(ctrl)
workerStopChannel := make(chan struct{}, 1)
pollBlockingChannel := make(chan struct{}, 1)
defer close(pollBlockingChannel)
activityPoller := newActivityTaskPoller(nil, service, "test", workerExecutionParameters{
TaskList: "tasklist",

WorkerStopChannel: workerStopChannel,
WorkerOptions: WorkerOptions{
Identity: "identity",
Logger: zaptest.NewLogger(t),
},
})
service.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ *shared.PollForActivityTaskRequest, opts ...yarpc.CallOption) (*shared.PollForActivityTaskResponse, error) {
workerStopChannel <- struct{}{}
<-pollBlockingChannel
return nil, nil
})

result, err := activityPoller.PollTask()

assert.Nil(t, result)
assert.ErrorIs(t, err, errShutdown)
}

func TestWorkflowTaskPollerHandlesPanics(t *testing.T) {
ctrl := gomock.NewController(t)
service := workflowservicetest.NewMockClient(ctrl)
Expand Down

0 comments on commit 275a5b8

Please sign in to comment.