Skip to content

Commit

Permalink
Revert "history service should do event reordering making sure corres…
Browse files Browse the repository at this point in the history
…ponding … (#601)" (#611)

This reverts commit 6164b73.
  • Loading branch information
wxing1292 authored Mar 13, 2018
1 parent 1f1d16f commit dff860e
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 210 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions idl/github.com/uber/cadence/shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ enum TimeoutType {
HEARTBEAT,
}

// whenever this list of decision is changed
// do change the mutableStateBuilder.go
// function shouldBufferEvent
// to make sure wo do the correct event ordering
enum DecisionType {
ScheduleActivityTask,
RequestCancelActivityTask,
Expand Down
27 changes: 5 additions & 22 deletions service/history/historyBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ func (s *historyBuilderSuite) TestHistoryBuilderDynamicSuccess() {
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activityStartedEvent := s.addActivityTaskStartedEvent(5, activityTaskList, identity)
s.validateActivityTaskStartedEvent(activityStartedEvent, bufferedEventID, 5, identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskStartedEvent(activityStartedEvent, 7, 5, identity)
s.Equal(int64(8), s.getNextEventID())
ai3, activity1Running1 := s.msBuilder.GetActivityInfo(5)
Expand All @@ -152,9 +150,6 @@ func (s *historyBuilderSuite) TestHistoryBuilderDynamicSuccess() {
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activityCompletedEvent := s.addActivityTaskCompletedEvent(5, 7, activity1Result, identity)
s.validateActivityTaskCompletedEvent(activityCompletedEvent, bufferedEventID, 5, 7, activity1Result,
identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskCompletedEvent(activityCompletedEvent, 8, 5, 7, activity1Result,
identity)
s.Equal(int64(9), s.getNextEventID())
Expand All @@ -171,8 +166,6 @@ func (s *historyBuilderSuite) TestHistoryBuilderDynamicSuccess() {
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activity2StartedEvent := s.addActivityTaskStartedEvent(6, activityTaskList, identity)
s.validateActivityTaskStartedEvent(activity2StartedEvent, bufferedEventID, 6, identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskStartedEvent(activity2StartedEvent, 10, 6, identity)
s.Equal(int64(11), s.getNextEventID())
ai4, activity2Running1 := s.msBuilder.GetActivityInfo(6)
Expand All @@ -182,9 +175,6 @@ func (s *historyBuilderSuite) TestHistoryBuilderDynamicSuccess() {

activity2FailedEvent := s.addActivityTaskFailedEvent(6, 10, activity2Reason, activity2Details,
identity)
s.validateActivityTaskFailedEvent(activity2FailedEvent, bufferedEventID, 6, 10, activity2Reason,
activity2Details, identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskFailedEvent(activity2FailedEvent, 11, 6, 10, activity2Reason,
activity2Details, identity)
s.Equal(int64(12), s.getNextEventID())
Expand Down Expand Up @@ -397,8 +387,6 @@ func (s *historyBuilderSuite) TestHistoryBuilderFlushBufferedEvents() {

// 7 activity1 started
activityStartedEvent := s.addActivityTaskStartedEvent(5, activityTaskList, identity)
s.validateActivityTaskStartedEvent(activityStartedEvent, bufferedEventID, 5, identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskStartedEvent(activityStartedEvent, 7, 5, identity)
s.Equal(int64(8), s.getNextEventID())
ai3, activity1Running1 := s.msBuilder.GetActivityInfo(5)
Expand All @@ -408,9 +396,8 @@ func (s *historyBuilderSuite) TestHistoryBuilderFlushBufferedEvents() {

// 8 activity1 completed
activityCompletedEvent := s.addActivityTaskCompletedEvent(5, 7, activity1Result, identity)
s.validateActivityTaskCompletedEvent(activityCompletedEvent, bufferedEventID, 5, 7, activity1Result, identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskCompletedEvent(activityCompletedEvent, 8, 5, 7, activity1Result, identity)
s.validateActivityTaskCompletedEvent(activityCompletedEvent, 8, 5, 7, activity1Result,
identity)
s.Equal(int64(9), s.getNextEventID())
_, activity1Running2 := s.msBuilder.GetActivityInfo(5)
s.False(activity1Running2)
Expand Down Expand Up @@ -545,9 +532,9 @@ func (s *historyBuilderSuite) TestHistoryBuilderWorkflowCancellationRequested()
cancellationRequestedEvent := s.addExternalWorkflowExecutionCancelRequested(
5, targetDomain, targetExecution.GetWorkflowId(), targetExecution.GetRunId(),
)
s.validateExternalWorkflowExecutionCancelRequested(cancellationRequestedEvent, bufferedEventID, 5, targetDomain, targetExecution)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateExternalWorkflowExecutionCancelRequested(cancellationRequestedEvent, 6, 5, targetDomain, targetExecution)
s.validateExternalWorkflowExecutionCancelRequested(
cancellationRequestedEvent, 6, 5, targetDomain, targetExecution,
)
s.Equal(int64(7), s.getNextEventID())
}

Expand Down Expand Up @@ -618,10 +605,6 @@ func (s *historyBuilderSuite) TestHistoryBuilderWorkflowCancellationFailed() {
cancellationRequestedEvent := s.addRequestCancelExternalWorkflowExecutionFailedEvent(
4, 5, targetDomain, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), cancellationFailedCause,
)
s.validateRequestCancelExternalWorkflowExecutionFailedEvent(
cancellationRequestedEvent, bufferedEventID, 4, 5, targetDomain, targetExecution, cancellationFailedCause,
)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateRequestCancelExternalWorkflowExecutionFailedEvent(
cancellationRequestedEvent, 6, 4, 5, targetDomain, targetExecution, cancellationFailedCause,
)
Expand Down
1 change: 0 additions & 1 deletion service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3502,7 +3502,6 @@ func addCompleteWorkflowEvent(builder *mutableStateBuilder, decisionCompletedEve
}

func createMutableState(builder *mutableStateBuilder) *persistence.WorkflowMutableState {
builder.FlushBufferedEvents()
info := copyWorkflowExecutionInfo(builder.executionInfo)
activityInfos := make(map[int64]*persistence.ActivityInfo)
for id, info := range builder.pendingActivityInfoIDs {
Expand Down
50 changes: 11 additions & 39 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,54 +378,26 @@ func (e *mutableStateBuilder) createNewHistoryEvent(eventType workflow.EventType
}

func (e *mutableStateBuilder) shouldBufferEvent(eventType workflow.EventType) bool {
if !e.HasInFlightDecisionTask() {
// do not buffer event if there is no in-flight decision
return false
}

switch eventType {
case // do not buffer for workflow state change
workflow.EventTypeWorkflowExecutionStarted,
case workflow.EventTypeDecisionTaskCompleted,
workflow.EventTypeDecisionTaskFailed,
workflow.EventTypeDecisionTaskTimedOut,
workflow.EventTypeWorkflowExecutionCompleted,
workflow.EventTypeWorkflowExecutionFailed,
workflow.EventTypeWorkflowExecutionTimedOut,
workflow.EventTypeWorkflowExecutionTerminated,
workflow.EventTypeWorkflowExecutionContinuedAsNew,
workflow.EventTypeWorkflowExecutionCanceled:
// do not buffer event if it is any type of close decision or close workflow
return false
case // decision event should not be buffered
workflow.EventTypeDecisionTaskScheduled,
workflow.EventTypeDecisionTaskStarted,
workflow.EventTypeDecisionTaskCompleted,
workflow.EventTypeDecisionTaskFailed,
workflow.EventTypeDecisionTaskTimedOut:
return false
case // events generated directly from decisions should not be buffered
// workflow complete, failed, cancelled and continue-as-new events are duplication of above
// just put is here for reference
// workflow.EventTypeWorkflowExecutionCompleted,
// workflow.EventTypeWorkflowExecutionFailed,
// workflow.EventTypeWorkflowExecutionCanceled,
// workflow.EventTypeWorkflowExecutionContinuedAsNew,
workflow.EventTypeActivityTaskScheduled,
workflow.EventTypeActivityTaskCancelRequested,
workflow.EventTypeTimerStarted,
// DecisionTypeCancelTimer is an excption. This decision will be mapped
// to either workflow.EventTypeTimerCanceled, or workflow.EventTypeCancelTimerFailed.
// So both should not be buffered. Ref: historyEngine, search for "workflow.DecisionTypeCancelTimer"
workflow.EventTypeTimerCanceled,
workflow.EventTypeCancelTimerFailed,
workflow.EventTypeRequestCancelExternalWorkflowExecutionInitiated,
workflow.EventTypeMarkerRecorded,
workflow.EventTypeStartChildWorkflowExecutionInitiated,
workflow.EventTypeSignalExternalWorkflowExecutionInitiated:
// do not buffer event if event is directly generated from a corresponding decision

// sanity check there is no decision on the fly
if e.HasInFlightDecisionTask() {
msg := fmt.Sprintf("history mutable state is processing event: %v while there is decision pending. "+
"domainID: %v, workflow ID: %v, run ID: %v.", eventType, e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID)
panic(msg)
}
return false
default:
return true
}

return true
}

func (e *mutableStateBuilder) createNewHistoryEventWithTimestamp(eventID int64, eventType workflow.EventType,
Expand Down
140 changes: 0 additions & 140 deletions service/history/mutableStateBuilder_test.go

This file was deleted.

3 changes: 1 addition & 2 deletions service/history/timerQueueProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,13 @@ func (s *timerQueueProcessorSuite) addDecisionTimer(domainID string, we workflow
builder.Load(state)

di := addDecisionTaskScheduledEvent(builder)
startedEvent := addDecisionTaskStartedEvent(builder, di.ScheduleID, state.ExecutionInfo.TaskList, "identity")
addDecisionTaskStartedEvent(builder, di.ScheduleID, state.ExecutionInfo.TaskList, "identity")

timeOutTask := tb.AddDecisionTimoutTask(di.ScheduleID, di.Attempt, 1)
timerTasks := []persistence.Task{timeOutTask}

s.updateTimerSeqNumbers(timerTasks)

addDecisionTaskCompletedEvent(builder, di.ScheduleID, startedEvent.GetEventId(), nil, "identity")
err2 := s.UpdateWorkflowExecution(state.ExecutionInfo, nil, nil, condition, timerTasks, nil, nil, nil, nil, nil)
s.Nil(err2, "No error expected.")
return timerTasks
Expand Down

0 comments on commit dff860e

Please sign in to comment.