From 7c0e2ab766e03fbc3efc75fe03683eee7c634812 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Wed, 16 Sep 2020 14:53:11 -0700 Subject: [PATCH 1/6] Fix getVersion override when added new version (#526) --- .../activity/LocalActivityOptions.java | 8 +- .../internal/replay/ClockDecisionContext.java | 19 +- .../internal/replay/DecisionsHelper.java | 4 +- .../internal/sync/SyncDecisionContext.java | 13 +- .../internal/worker/LocalActivityWorker.java | 6 +- .../LocalActivityContextPropagationTest.java | 3 +- .../uber/cadence/workflow/WorkflowTest.java | 72 +++++++ src/test/resources/testGetVersionHistory.json | 178 ++++++++++++++++++ 8 files changed, 289 insertions(+), 14 deletions(-) create mode 100644 src/test/resources/testGetVersionHistory.json diff --git a/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java b/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java index de13cc860..9534af9a2 100644 --- a/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java +++ b/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java @@ -101,7 +101,8 @@ public LocalActivityOptions validateAndBuildWithDefaults() { if (retryOptions != null) { ro = new RetryOptions.Builder(retryOptions).validateBuildWithDefaults(); } - return new LocalActivityOptions(roundUpToSeconds(scheduleToCloseTimeout), ro, contextPropagators); + return new LocalActivityOptions( + roundUpToSeconds(scheduleToCloseTimeout), ro, contextPropagators); } } @@ -109,7 +110,10 @@ public LocalActivityOptions validateAndBuildWithDefaults() { private final RetryOptions retryOptions; private final List contextPropagators; - private LocalActivityOptions(Duration scheduleToCloseTimeout, RetryOptions retryOptions, List contextPropagators) { + private LocalActivityOptions( + Duration scheduleToCloseTimeout, + RetryOptions retryOptions, + List contextPropagators) { this.scheduleToCloseTimeout = scheduleToCloseTimeout; this.retryOptions = retryOptions; this.contextPropagators = contextPropagators; diff --git a/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java b/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java index 118d95cc7..d24b99a1e 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java @@ -88,6 +88,7 @@ public void accept(Exception reason) { private final DataConverter dataConverter; private final Condition taskCondition; private boolean taskCompleted = false; + private final Map versionMap = new HashMap<>(); ClockDecisionContext( DecisionsHelper decisions, @@ -227,6 +228,8 @@ void handleMarkerRecorded(HistoryEvent event) { sideEffectResults.put(event.getEventId(), attributes.getDetails()); } else if (LOCAL_ACTIVITY_MARKER_NAME.equals(name)) { handleLocalActivityMarker(attributes); + } else if (VERSION_MARKER_NAME.equals(name)) { + handleVersionMarker(attributes); } else if (!MUTABLE_SIDE_EFFECT_MARKER_NAME.equals(name) && !VERSION_MARKER_NAME.equals(name)) { if (log.isWarnEnabled()) { log.warn("Unexpected marker: " + event); @@ -276,6 +279,14 @@ private void handleLocalActivityMarker(MarkerRecordedEventAttributes attributes) } } + private void handleVersionMarker(MarkerRecordedEventAttributes attributes) { + MarkerHandler.MarkerInterface markerData = + MarkerHandler.MarkerInterface.fromEventAttributes(attributes, dataConverter); + String versionID = markerData.getId(); + int version = dataConverter.fromData(attributes.getDetails(), Integer.class, Integer.class); + versionMap.put(versionID, version); + } + int getVersion(String changeId, DataConverter converter, int minSupported, int maxSupported) { Predicate changeIdEquals = (attributes) -> { @@ -285,6 +296,12 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m }; decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals)); + Integer version = versionMap.get(changeId); + if (version != null) { + validateVersion(changeId, version, minSupported, maxSupported); + return version; + } + Optional result = versionHandler.handle( changeId, @@ -299,7 +316,7 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m if (!result.isPresent()) { return WorkflowInternal.DEFAULT_VERSION; } - int version = converter.fromData(result.get(), Integer.class, Integer.class); + version = converter.fromData(result.get(), Integer.class, Integer.class); validateVersion(changeId, version, minSupported, maxSupported); return version; } diff --git a/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java b/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java index 6ed08a08f..b02a47b05 100644 --- a/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java +++ b/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java @@ -664,10 +664,10 @@ private void addDecision(DecisionId decisionId, DecisionStateMachine decision) { // is removed in replay. void addAllMissingVersionMarker( boolean isNextDecisionVersionMarker, - Optional> isDifferentChange) { + Optional> changeIdEquals) { boolean added; do { - added = addMissingVersionMarker(isNextDecisionVersionMarker, isDifferentChange); + added = addMissingVersionMarker(isNextDecisionVersionMarker, changeIdEquals); } while (added); } diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java index 5cf6d7b01..23955ce69 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java @@ -323,10 +323,11 @@ private ExecuteActivityParameters constructExecuteActivityParameters( private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters( String name, LocalActivityOptions options, byte[] input, long elapsed, int attempt) { - ExecuteLocalActivityParameters parameters = new ExecuteLocalActivityParameters() - .withActivityType(new ActivityType().setName(name)) - .withInput(input) - .withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds()); + ExecuteLocalActivityParameters parameters = + new ExecuteLocalActivityParameters() + .withActivityType(new ActivityType().setName(name)) + .withInput(input) + .withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds()); RetryOptions retryOptions = options.getRetryOptions(); if (retryOptions != null) { @@ -337,8 +338,8 @@ private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters( parameters.setWorkflowDomain(this.context.getDomain()); parameters.setWorkflowExecution(this.context.getWorkflowExecution()); - List propagators = Optional.ofNullable(options.getContextPropagators()) - .orElse(contextPropagators); + List propagators = + Optional.ofNullable(options.getContextPropagators()).orElse(contextPropagators); parameters.setContext(extractContextsAndConvertToBytes(propagators)); return parameters; diff --git a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java index 85f60ccb9..de6ce8da3 100644 --- a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java @@ -271,7 +271,9 @@ private void propagateContext(ExecuteLocalActivityParameters params) { } private void restoreContext(Map context) { - options.getContextPropagators() - .forEach(propagator -> propagator.setCurrentContext(propagator.deserializeContext(context))); + options + .getContextPropagators() + .forEach( + propagator -> propagator.setCurrentContext(propagator.deserializeContext(context))); } } diff --git a/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java b/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java index 12c3197c6..62fa3723c 100644 --- a/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java +++ b/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java @@ -59,7 +59,8 @@ public class LocalActivityContextPropagationTest { private final WrapperContext wrapperContext = new WrapperContext(EXPECTED_CONTEXT_NAME); - //let's add safe TestWorkflowEnvironment closing and make configurable propagation enabling/disabling + // let's add safe TestWorkflowEnvironment closing and make configurable propagation + // enabling/disabling private class TestEnvAutoCloseable implements AutoCloseable { private TestWorkflowEnvironment testEnv; diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index b52af98d8..c426d0d42 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -4427,6 +4427,78 @@ public void testVersionNotSupported() { } } + public static class TestGetVersionAddedImpl implements TestWorkflow1 { + + @Override + public String execute(String taskList) { + + int versionNew = Workflow.getVersion("cid2", Workflow.DEFAULT_VERSION, 1); + assertEquals(-1, versionNew); + int version = Workflow.getVersion("cid1", Workflow.DEFAULT_VERSION, 1); + assertEquals(1, version); + + TestActivities testActivities = + Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList)); + return "hello" + testActivities.activity1(1); + } + } + + @Test + public void testGetVersionAdded() { + try { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistory.json", TestGetVersionAddedImpl.class); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + public static class TestGetVersionRemovedImpl implements TestWorkflow1 { + + @Override + public String execute(String taskList) { + // history contains cid1, but later getVersion is removed + TestActivities testActivities = + Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList)); + return "hello" + testActivities.activity1(1); + } + } + + @Test + public void testGetVersionRemoved() { + try { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistory.json", TestGetVersionRemovedImpl.class); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + public static class TestGetVersionRemoveAndAddImpl implements TestWorkflow1 { + + @Override + public String execute(String taskList) { + int version = Workflow.getVersion("cid2", Workflow.DEFAULT_VERSION, 1); + assertEquals(-1, version); + TestActivities testActivities = + Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList)); + return "hello" + testActivities.activity1(1); + } + } + + @Test + public void testGetVersionRemoveAndAdd() { + try { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistory.json", TestGetVersionRemoveAndAddImpl.class); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + public interface DeterminismFailingWorkflow { @WorkflowMethod diff --git a/src/test/resources/testGetVersionHistory.json b/src/test/resources/testGetVersionHistory.json new file mode 100644 index 000000000..650831ec7 --- /dev/null +++ b/src/test/resources/testGetVersionHistory.json @@ -0,0 +1,178 @@ +[ + { + "eventId":1, + "timestamp":1599846349049225000, + "eventType":"WorkflowExecutionStarted", + "version":-24, + "taskId":11534336, + "workflowExecutionStartedEventAttributes":{ + "workflowType":{ + "name":"TestWorkflow1::execute" + }, + "taskList":{ + "name":"WorkflowTest-testGetVersionAdded[Docker Sticky OFF]-bf24970f-3800-427a-8e52-2a5c2d8dc08e" + }, + "input":"IldvcmtmbG93VGVzdC10ZXN0R2V0VmVyc2lvbkFkZGVkW0RvY2tlciBTdGlja3kgT0ZGXS1iZjI0OTcwZi0zODAwLTQyN2EtOGU1Mi0yYTVjMmQ4ZGMwOGUi", + "executionStartToCloseTimeoutSeconds":30, + "taskStartToCloseTimeoutSeconds":5, + "originalExecutionRunId":"249740d3-8d3c-4660-9c3c-cd61f9136db3", + "identity":"", + "firstExecutionRunId":"249740d3-8d3c-4660-9c3c-cd61f9136db3", + "attempt":0, + "firstDecisionTaskBackoffSeconds":0 + } + }, + { + "eventId":2, + "timestamp":1599846349049238000, + "eventType":"DecisionTaskScheduled", + "version":-24, + "taskId":11534337, + "decisionTaskScheduledEventAttributes":{ + "taskList":{ + "name":"WorkflowTest-testGetVersionAdded[Docker Sticky OFF]-bf24970f-3800-427a-8e52-2a5c2d8dc08e" + }, + "startToCloseTimeoutSeconds":5, + "attempt":0 + } + }, + { + "eventId":3, + "timestamp":1599846349090195000, + "eventType":"DecisionTaskStarted", + "version":-24, + "taskId":11534342, + "decisionTaskStartedEventAttributes":{ + "scheduledEventId":2, + "identity":"6275@boweixu-C02V61JZHTDG", + "requestId":"d85c4495-9902-4db1-a0d6-7a153ecf9278" + } + }, + { + "eventId":4, + "timestamp":1599846349263595000, + "eventType":"DecisionTaskCompleted", + "version":-24, + "taskId":11534345, + "decisionTaskCompletedEventAttributes":{ + "scheduledEventId":2, + "startedEventId":3, + "identity":"6275@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":5, + "timestamp":1599846349263629000, + "eventType":"MarkerRecorded", + "version":-24, + "taskId":11534346, + "markerRecordedEventAttributes":{ + "markerName":"Version", + "details":"MQ==", + "decisionTaskCompletedEventId":4, + "header":{ + "fields":{ + "MutableMarkerHeader":"eyJpZCI6ImNpZDEiLCJldmVudElkIjo1LCJhY2Nlc3NDb3VudCI6MH0=" + } + } + } + }, + { + "eventId":6, + "timestamp":1599846349263639000, + "eventType":"ActivityTaskScheduled", + "version":-24, + "taskId":11534347, + "activityTaskScheduledEventAttributes":{ + "activityId":"0", + "activityType":{ + "name":"customActivity1" + }, + "taskList":{ + "name":"WorkflowTest-testGetVersionAdded[Docker Sticky OFF]-bf24970f-3800-427a-8e52-2a5c2d8dc08e" + }, + "input":"MQ==", + "scheduleToCloseTimeoutSeconds":5, + "scheduleToStartTimeoutSeconds":5, + "startToCloseTimeoutSeconds":10, + "heartbeatTimeoutSeconds":5, + "decisionTaskCompletedEventId":4 + } + }, + { + "eventId":7, + "timestamp":1599846349269715000, + "eventType":"ActivityTaskStarted", + "version":-24, + "taskId":11534351, + "activityTaskStartedEventAttributes":{ + "scheduledEventId":6, + "identity":"6275@boweixu-C02V61JZHTDG", + "requestId":"2e479cb9-9b47-4765-953c-4e219f6b828a", + "attempt":0, + "lastFailureReason":"" + } + }, + { + "eventId":8, + "timestamp":1599846349292416000, + "eventType":"ActivityTaskCompleted", + "version":-24, + "taskId":11534354, + "activityTaskCompletedEventAttributes":{ + "result":"MQ==", + "scheduledEventId":6, + "startedEventId":7, + "identity":"6275@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":9, + "timestamp":1599846349292422000, + "eventType":"DecisionTaskScheduled", + "version":-24, + "taskId":11534356, + "decisionTaskScheduledEventAttributes":{ + "taskList":{ + "name":"WorkflowTest-testGetVersionAdded[Docker Sticky OFF]-bf24970f-3800-427a-8e52-2a5c2d8dc08e" + }, + "startToCloseTimeoutSeconds":5, + "attempt":0 + } + }, + { + "eventId":10, + "timestamp":1599846349295720000, + "eventType":"DecisionTaskStarted", + "version":-24, + "taskId":11534359, + "decisionTaskStartedEventAttributes":{ + "scheduledEventId":9, + "identity":"6275@boweixu-C02V61JZHTDG", + "requestId":"b275fd8b-4fd3-4201-9bc7-428c9e2c5ef7" + } + }, + { + "eventId":11, + "timestamp":1599846349316011000, + "eventType":"DecisionTaskCompleted", + "version":-24, + "taskId":11534362, + "decisionTaskCompletedEventAttributes":{ + "scheduledEventId":9, + "startedEventId":10, + "identity":"6275@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":12, + "timestamp":1599846349316036000, + "eventType":"WorkflowExecutionCompleted", + "version":-24, + "taskId":11534363, + "workflowExecutionCompletedEventAttributes":{ + "result":"ImhlbGxvMSI=", + "decisionTaskCompletedEventId":11 + } + } +] \ No newline at end of file From d98d33ff436ea40235cc4922d47c52369ab5ee7b Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Fri, 18 Sep 2020 15:19:03 -0700 Subject: [PATCH 2/6] Add async signal to untypedstub (#527) --- .../com/uber/cadence/client/WorkflowStub.java | 5 ++ .../GenericWorkflowClientExternal.java | 6 ++ .../GenericWorkflowClientExternalImpl.java | 58 ++++++++++++-- .../sync/TestActivityEnvironmentInternal.java | 9 +++ .../sync/TestWorkflowEnvironmentInternal.java | 20 +++++ .../internal/sync/WorkflowStubImpl.java | 28 +++++-- .../testservice/TestWorkflowService.java | 19 ++++- .../serviceclient/IWorkflowService.java | 15 ++++ .../WorkflowServiceTChannel.java | 79 ++++++++++++++++++- .../uber/cadence/workflow/WorkflowTest.java | 75 ++++++++++++++++++ 10 files changed, 297 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/uber/cadence/client/WorkflowStub.java b/src/main/java/com/uber/cadence/client/WorkflowStub.java index 207dd8d27..73448c8e6 100644 --- a/src/main/java/com/uber/cadence/client/WorkflowStub.java +++ b/src/main/java/com/uber/cadence/client/WorkflowStub.java @@ -65,6 +65,11 @@ static WorkflowStub fromTyped(T typed) { void signal(String signalName, Object... args); + CompletableFuture signalAsync(String signalName, Object... args); + + CompletableFuture signalAsyncWithTimeout( + long timeout, TimeUnit unit, String signalName, Object... args); + WorkflowExecution start(Object... args); CompletableFuture startAsync(Object... args); diff --git a/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternal.java b/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternal.java index 9c2dd74bc..836923f51 100644 --- a/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternal.java +++ b/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternal.java @@ -41,6 +41,12 @@ CompletableFuture startWorkflowAsync( void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters); + CompletableFuture signalWorkflowExecutionAsync( + SignalExternalWorkflowParameters signalParameters); + + CompletableFuture signalWorkflowExecutionAsync( + SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis); + WorkflowExecution signalWithStartWorkflowExecution( SignalWithStartWorkflowExecutionParameters parameters); diff --git a/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java b/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java index 3733798cd..6d86d186a 100644 --- a/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java +++ b/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java @@ -269,15 +269,8 @@ private RetryPolicy toRetryPolicy(RetryParameters retryParameters) { @Override public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters) { - SignalWorkflowExecutionRequest request = new SignalWorkflowExecutionRequest(); - request.setDomain(domain); + SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters); - request.setInput(signalParameters.getInput()); - request.setSignalName(signalParameters.getSignalName()); - WorkflowExecution execution = new WorkflowExecution(); - execution.setRunId(signalParameters.getRunId()); - execution.setWorkflowId(signalParameters.getWorkflowId()); - request.setWorkflowExecution(execution); try { Retryer.retry( Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, @@ -287,6 +280,55 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam } } + @Override + public CompletableFuture signalWorkflowExecutionAsync( + SignalExternalWorkflowParameters signalParameters) { + return signalWorkflowExecutionAsync(signalParameters, Long.MAX_VALUE); + } + + @Override + public CompletableFuture signalWorkflowExecutionAsync( + SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis) { + SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters); + return Retryer.retryWithResultAsync( + getRetryOptionsWithExpiration( + Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis), + () -> { + CompletableFuture result = new CompletableFuture<>(); + try { + service.SignalWorkflowExecution( + request, + new AsyncMethodCallback() { + @Override + public void onComplete(Object response) { + result.complete(null); + } + + @Override + public void onError(Exception exception) { + result.completeExceptionally(exception); + } + }); + } catch (TException e) { + result.completeExceptionally(e); + } + return result; + }); + } + + private SignalWorkflowExecutionRequest getSignalRequest( + SignalExternalWorkflowParameters signalParameters) { + SignalWorkflowExecutionRequest request = new SignalWorkflowExecutionRequest(); + request.setDomain(domain); + request.setInput(signalParameters.getInput()); + request.setSignalName(signalParameters.getSignalName()); + WorkflowExecution execution = new WorkflowExecution(); + execution.setRunId(signalParameters.getRunId()); + execution.setWorkflowId(signalParameters.getWorkflowId()); + request.setWorkflowExecution(execution); + return request; + } + @Override public WorkflowExecution signalWithStartWorkflowExecution( SignalWithStartWorkflowExecutionParameters parameters) { diff --git a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java index e2bb09f10..33479e6b8 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java @@ -699,6 +699,15 @@ public void SignalWorkflowExecution( impl.SignalWorkflowExecution(signalRequest, resultHandler); } + @Override + public void SignalWorkflowExecutionWithTimeout( + SignalWorkflowExecutionRequest signalRequest, + AsyncMethodCallback resultHandler, + Long timeoutInMillis) + throws TException { + impl.SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, timeoutInMillis); + } + @Override public void SignalWithStartWorkflowExecution( SignalWithStartWorkflowExecutionRequest signalWithStartRequest, diff --git a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java index 04b1b6b7c..32e1a9504 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java @@ -550,6 +550,15 @@ public void SignalWorkflowExecution( impl.SignalWorkflowExecution(signalRequest, resultHandler); } + @Override + public void SignalWorkflowExecutionWithTimeout( + SignalWorkflowExecutionRequest signalRequest, + AsyncMethodCallback resultHandler, + Long timeoutInMillis) + throws TException { + impl.SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, timeoutInMillis); + } + @Override public void SignalWithStartWorkflowExecution( SignalWithStartWorkflowExecutionRequest signalWithStartRequest, @@ -806,6 +815,17 @@ public void signal(String signalName, Object... args) { next.signal(signalName, args); } + @Override + public CompletableFuture signalAsync(String signalName, Object... args) { + return next.signalAsync(signalName, args); + } + + @Override + public CompletableFuture signalAsyncWithTimeout( + long timeout, TimeUnit unit, String signalName, Object... args) { + return next.signalAsyncWithTimeout(timeout, unit, signalName, args); + } + @Override public WorkflowExecution start(Object... args) { return next.start(args); diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java index c7bc92046..c2788d833 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java @@ -97,6 +97,28 @@ class WorkflowStubImpl implements WorkflowStub { @Override public void signal(String signalName, Object... input) { + SignalExternalWorkflowParameters p = getSignalExternalWorkflowParameters(signalName, input); + try { + genericClient.signalWorkflowExecution(p); + } catch (Exception e) { + throw new WorkflowServiceException(execution.get(), workflowType, e); + } + } + + @Override + public CompletableFuture signalAsync(String signalName, Object... input) { + return signalAsyncWithTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS, signalName, input); + } + + @Override + public CompletableFuture signalAsyncWithTimeout( + long timeout, TimeUnit unit, String signalName, Object... input) { + SignalExternalWorkflowParameters p = getSignalExternalWorkflowParameters(signalName, input); + return genericClient.signalWorkflowExecutionAsync(p, unit.toMillis(timeout)); + } + + private SignalExternalWorkflowParameters getSignalExternalWorkflowParameters( + String signalName, Object... input) { checkStarted(); SignalExternalWorkflowParameters p = new SignalExternalWorkflowParameters(); p.setInput(dataConverter.toData(input)); @@ -105,11 +127,7 @@ public void signal(String signalName, Object... input) { // TODO: Deal with signaling started workflow only, when requested // Commented out to support signaling workflows that called continue as new. // p.setRunId(execution.getRunId()); - try { - genericClient.signalWorkflowExecution(p); - } catch (Exception e) { - throw new WorkflowServiceException(execution.get(), workflowType, e); - } + return p; } private WorkflowExecution startWithOptions(WorkflowOptions o, Object... args) { diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java index 6ba5044bd..f73fa3c6f 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java @@ -936,7 +936,24 @@ public void RequestCancelWorkflowExecution( public void SignalWorkflowExecution( SignalWorkflowExecutionRequest signalRequest, AsyncMethodCallback resultHandler) throws TException { - throw new UnsupportedOperationException("not implemented"); + SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, null); + } + + @Override + public void SignalWorkflowExecutionWithTimeout( + SignalWorkflowExecutionRequest signalRequest, + AsyncMethodCallback resultHandler, + Long timeoutInMillis) + throws TException { + forkJoinPool.execute( + () -> { + try { + SignalWorkflowExecution(signalRequest); + resultHandler.onComplete(null); + } catch (TException e) { + resultHandler.onError(e); + } + }); } @Override diff --git a/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java b/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java index 2a78ac7fe..d275ac8f5 100644 --- a/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java +++ b/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java @@ -19,6 +19,7 @@ import com.uber.cadence.GetWorkflowExecutionHistoryRequest; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; +import com.uber.cadence.SignalWorkflowExecutionRequest; import com.uber.cadence.StartWorkflowExecutionRequest; import com.uber.cadence.WorkflowService.AsyncIface; import com.uber.cadence.WorkflowService.Iface; @@ -69,4 +70,18 @@ void GetWorkflowExecutionHistoryWithTimeout( AsyncMethodCallback resultHandler, Long timeoutInMillis) throws TException; + /** + * SignalWorkflowExecutionWithTimeout signal workflow same as SignalWorkflowExecution but with + * timeout + * + * @param signalRequest + * @param resultHandler + * @param timeoutInMillis + * @throws TException + */ + void SignalWorkflowExecutionWithTimeout( + SignalWorkflowExecutionRequest signalRequest, + AsyncMethodCallback resultHandler, + Long timeoutInMillis) + throws TException; } diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index fa932b449..28d8b5439 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -2551,9 +2551,82 @@ public void RequestCancelWorkflowExecution( @Override public void SignalWorkflowExecution( - SignalWorkflowExecutionRequest signalRequest, AsyncMethodCallback resultHandler) - throws TException { - throw new UnsupportedOperationException("not implemented"); + SignalWorkflowExecutionRequest signalRequest, AsyncMethodCallback resultHandler) { + signalWorkflowExecution(signalRequest, resultHandler, null); + } + + @Override + public void SignalWorkflowExecutionWithTimeout( + SignalWorkflowExecutionRequest signalRequest, + AsyncMethodCallback resultHandler, + Long timeoutInMillis) { + signalWorkflowExecution(signalRequest, resultHandler, timeoutInMillis); + } + + private void signalWorkflowExecution( + SignalWorkflowExecutionRequest signalRequest, + AsyncMethodCallback resultHandler, + Long timeoutInMillis) { + + timeoutInMillis = validateAndUpdateTimeout(timeoutInMillis, options.getRpcTimeoutMillis()); + ThriftRequest request = + buildThriftRequest( + "SignalWorkflowExecution", + new WorkflowService.SignalWorkflowExecution_args(signalRequest), + timeoutInMillis); + CompletableFuture> response = + doRemoteCallAsync(request); + response + .whenComplete( + (r, e) -> { + try { + if (e != null) { + resultHandler.onError(CheckedExceptionWrapper.wrap(e)); + return; + } + WorkflowService.SignalWorkflowExecution_result result = + r.getBody(WorkflowService.SignalWorkflowExecution_result.class); + if (r.getResponseCode() == ResponseCode.OK) { + resultHandler.onComplete(null); + return; + } + if (result.isSetBadRequestError()) { + resultHandler.onError(result.getBadRequestError()); + return; + } + if (result.isSetEntityNotExistError()) { + resultHandler.onError(result.getEntityNotExistError()); + return; + } + if (result.isSetServiceBusyError()) { + resultHandler.onError(result.getServiceBusyError()); + return; + } + if (result.isSetDomainNotActiveError()) { + resultHandler.onError(result.getDomainNotActiveError()); + return; + } + if (result.isSetLimitExceededError()) { + resultHandler.onError(result.getLimitExceededError()); + return; + } + if (result.isSetClientVersionNotSupportedError()) { + resultHandler.onError(result.getClientVersionNotSupportedError()); + return; + } + resultHandler.onError( + new TException("SignalWorkflowExecution failed with unknown error:" + result)); + } finally { + if (r != null) { + r.release(); + } + } + }) + .exceptionally( + (e) -> { + log.error("Unexpected error in SignalWorkflowExecution", e); + return null; + }); } @Override diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index c426d0d42..2b76badc2 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.common.base.Strings; import com.google.common.util.concurrent.UncheckedExecutionException; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; import com.uber.cadence.HistoryEvent; @@ -3224,6 +3225,80 @@ public void testSignalExternalWorkflowImmediateCancellation() { } } + public static class TestSignalWorkflowAsync implements TestWorkflowSignaled { + private String message; + + @Override + public String execute() { + Workflow.await(() -> !Strings.isNullOrEmpty(message)); + return message; + } + + @Override + public void signal1(String arg) { + message = arg; + } + } + + @Test + public void testSignalWorkflowAsync() throws Exception { + startWorkerFor(TestSignalWorkflowAsync.class); + + WorkflowStub workflowStub = + workflowClient.newUntypedWorkflowStub( + "TestWorkflowSignaled::execute", newWorkflowOptionsBuilder(taskList).build()); + CompletableFuture future = workflowStub.startAsync(taskList); + future.get(); + + String testSignalInput = "hello"; + CompletableFuture resultFuture = + workflowStub + .signalAsync("testSignal", testSignalInput) + .thenCompose( + v -> { + return workflowStub.getResultAsync(String.class); + }); + assertEquals(testSignalInput, resultFuture.get()); + } + + @Test + public void testSignalWorkflowAsyncWithTimeout() throws Exception { + startWorkerFor(TestSignalWorkflowAsync.class); + + WorkflowStub workflowStub = + workflowClient.newUntypedWorkflowStub( + "TestWorkflowSignaled::execute", newWorkflowOptionsBuilder(taskList).build()); + CompletableFuture future = workflowStub.startAsync(taskList); + future.get(); + + Long timeout = new Long(200); + String testSignalInput = "hello"; + CompletableFuture resultFuture = + workflowStub + .signalAsyncWithTimeout(timeout, TimeUnit.MILLISECONDS, "testSignal", testSignalInput) + .thenCompose( + v -> { + return workflowStub.getResultAsync(String.class); + }); + assertEquals(testSignalInput, resultFuture.get()); + } + + @Test + public void testSignalWorkflowAsyncFailed() throws Exception { + startWorkerFor(TestSignalWorkflowAsync.class); + + WorkflowStub workflowStub = + workflowClient.newUntypedWorkflowStub( + "TestWorkflowSignaled::execute", newWorkflowOptionsBuilder(taskList).build()); + String testSignalInput = "hello"; + try { + workflowStub.signalAsync("testSignal", testSignalInput).get(); + fail("unreachable"); + } catch (IllegalStateException e) { + // expected exception, workflow should be started before signal + } + } + public static class TestChildWorkflowAsyncRetryWorkflow implements TestWorkflow1 { private ITestChild child; From 0dbd1bcb7e0f49472744f744cfc398fc34323572 Mon Sep 17 00:00:00 2001 From: Shinnosuke Okada <43149041+sokada1221@users.noreply.github.com> Date: Fri, 18 Sep 2020 18:29:43 -0400 Subject: [PATCH 3/6] Fix RetryOptions.addDoNotRetry (#520) * Remove unnecessary validations in RetryOptions.addDoNotRetry * Add basic test on RetryOptions.addDoNotRetry Co-authored-by: Liang Mei --- .../com/uber/cadence/common/RetryOptions.java | 17 ++---------- .../cadence/internal/common/RetryerTest.java | 26 +++++++++++++++++++ 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/uber/cadence/common/RetryOptions.java b/src/main/java/com/uber/cadence/common/RetryOptions.java index 010b1301b..b100a5e35 100644 --- a/src/main/java/com/uber/cadence/common/RetryOptions.java +++ b/src/main/java/com/uber/cadence/common/RetryOptions.java @@ -92,22 +92,9 @@ public final RetryOptions addDoNotRetry(Class... doNotRetry return this; } - double backoffCoefficient = getBackoffCoefficient(); - if (backoffCoefficient == 0) { - backoffCoefficient = DEFAULT_BACKOFF_COEFFICIENT; - } - - RetryOptions.Builder builder = - new RetryOptions.Builder() - .setInitialInterval(getInitialInterval()) - .setExpiration(getExpiration()) - .setMaximumInterval(getMaximumInterval()) - .setBackoffCoefficient(backoffCoefficient) - .setDoNotRetry(merge(getDoNotRetry(), Arrays.asList(doNotRetry))); + RetryOptions.Builder builder = new RetryOptions.Builder(this); + builder.setDoNotRetry(merge(getDoNotRetry(), Arrays.asList(doNotRetry))); - if (getMaximumAttempts() > 0) { - builder.setMaximumAttempts(getMaximumAttempts()); - } return builder.validateBuildWithDefaults(); } diff --git a/src/test/java/com/uber/cadence/internal/common/RetryerTest.java b/src/test/java/com/uber/cadence/internal/common/RetryerTest.java index df0eacc67..1cbf543fe 100644 --- a/src/test/java/com/uber/cadence/internal/common/RetryerTest.java +++ b/src/test/java/com/uber/cadence/internal/common/RetryerTest.java @@ -107,6 +107,32 @@ public void testInterruptedException() throws InterruptedException { assertTrue(System.currentTimeMillis() - start < 100000); } + @Test + public void testAddDoNotRetry() throws InterruptedException { + RetryOptions options = + new RetryOptions.Builder() + .setInitialInterval(Duration.ofMillis(10)) + .setExpiration(Duration.ofSeconds(100)) + .validateBuildWithDefaults(); + options = options.addDoNotRetry(InterruptedException.class); + long start = System.currentTimeMillis(); + try { + Retryer.retryWithResultAsync( + options, + () -> { + CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally(new InterruptedException("simulated")); + return result; + }) + .get(); + fail("unreachable"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof InterruptedException); + assertEquals("simulated", e.getCause().getMessage()); + } + assertTrue(System.currentTimeMillis() - start < 100000); + } + @Test public void testMaxAttempt() throws InterruptedException { RetryOptions options = From 47f7ecf102701e6068631398e250a9caa495a335 Mon Sep 17 00:00:00 2001 From: Aleksei Izmalkin <42375035+aai95@users.noreply.github.com> Date: Sat, 19 Sep 2020 03:17:31 +0400 Subject: [PATCH 4/6] Add missing metrics from go client (#379) (#518) Co-authored-by: Liang Mei --- .../uber/cadence/internal/metrics/MetricsType.java | 4 ++++ .../cadence/internal/replay/DecisionsHelper.java | 13 ++++++++++++- .../uber/cadence/internal/replay/ReplayDecider.java | 4 +++- .../internal/replay/ReplayDecisionTaskHandler.java | 2 +- .../internal/replay/ReplayDeciderCacheTests.java | 2 +- 5 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/metrics/MetricsType.java b/src/main/java/com/uber/cadence/internal/metrics/MetricsType.java index be030c72c..0c671b312 100644 --- a/src/main/java/com/uber/cadence/internal/metrics/MetricsType.java +++ b/src/main/java/com/uber/cadence/internal/metrics/MetricsType.java @@ -67,6 +67,8 @@ public class MetricsType { CADENCE_METRICS_PREFIX + "decision-task-error"; public static final String DECISION_TASK_COMPLETED_COUNTER = CADENCE_METRICS_PREFIX + "decision-task-completed"; + public static final String DECISION_TASK_FORCE_COMPLETED = + CADENCE_METRICS_PREFIX + "decision-task-force-completed"; public static final String ACTIVITY_POLL_COUNTER = CADENCE_METRICS_PREFIX + "activity-poll-total"; public static final String ACTIVITY_POLL_FAILED_COUNTER = @@ -145,4 +147,6 @@ public class MetricsType { public static final String STICKY_CACHE_SIZE = CADENCE_METRICS_PREFIX + "sticky-cache-size"; public static final String WORKFLOW_ACTIVE_THREAD_COUNT = CADENCE_METRICS_PREFIX + "workflow_active_thread_count"; + + public static final String NON_DETERMINISTIC_ERROR = CADENCE_METRICS_PREFIX + "non-deterministic-error"; } diff --git a/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java b/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java index b02a47b05..547f9fbe0 100644 --- a/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java +++ b/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java @@ -57,8 +57,13 @@ import com.uber.cadence.WorkflowExecutionStartedEventAttributes; import com.uber.cadence.WorkflowType; import com.uber.cadence.internal.common.WorkflowExecutionUtils; +import com.uber.cadence.internal.metrics.MetricsTag; +import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents; +import com.uber.cadence.internal.worker.SingleWorkerOptions; import com.uber.cadence.internal.worker.WorkflowExecutionException; +import com.uber.m3.tally.Scope; +import com.uber.m3.util.ImmutableMap; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -86,6 +91,7 @@ class DecisionsHelper { + "change in the workflow definition."; private final PollForDecisionTaskResponse task; + private final SingleWorkerOptions options; /** * When workflow task completes the decisions are converted to events that follow the decision @@ -105,8 +111,9 @@ class DecisionsHelper { // TODO: removal of completed activities private final Map activityIdToScheduledEventId = new HashMap<>(); - DecisionsHelper(PollForDecisionTaskResponse task) { + DecisionsHelper(PollForDecisionTaskResponse task, SingleWorkerOptions options) { this.task = task; + this.options = options; } long getNextDecisionEventId() { @@ -718,6 +725,10 @@ private boolean addMissingVersionMarker( private DecisionStateMachine getDecision(DecisionId decisionId) { DecisionStateMachine result = decisions.get(decisionId); if (result == null) { + Scope metricsScope = options + .getMetricsScope() + .tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, task.getWorkflowType().getName())); + metricsScope.counter(MetricsType.NON_DETERMINISTIC_ERROR).inc(1); throw new NonDeterminisicWorkflowError( "Unknown " + decisionId + ". " + NON_DETERMINISTIC_MESSAGE); } diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java index b5f81c165..1f4bb21ed 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java @@ -454,7 +454,9 @@ private boolean decideImpl(PollForDecisionTaskResponse decisionTask, Functions.P // Reset state to before running the event loop decisionsHelper.handleDecisionTaskStartedEvent(decision); } - + if (forceCreateNewDecisionTask) { + metricsScope.counter(MetricsType.DECISION_TASK_FORCE_COMPLETED).inc(1); + } return forceCreateNewDecisionTask; } catch (Error e) { if (this.workflow.getWorkflowImplementationOptions().getNonDeterministicWorkflowPolicy() diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java index a26f3e21f..d06a0c01b 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java @@ -276,7 +276,7 @@ private Decider createDecider(PollForDecisionTaskResponse decisionTask) throws E decisionTask.setHistory(getHistoryResponse.getHistory()); decisionTask.setNextPageToken(getHistoryResponse.getNextPageToken()); } - DecisionsHelper decisionsHelper = new DecisionsHelper(decisionTask); + DecisionsHelper decisionsHelper = new DecisionsHelper(decisionTask, options); ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType); return new ReplayDecider( service, domain, workflowType, workflow, decisionsHelper, options, laTaskPoller); diff --git a/src/test/java/com/uber/cadence/internal/replay/ReplayDeciderCacheTests.java b/src/test/java/com/uber/cadence/internal/replay/ReplayDeciderCacheTests.java index 73c8189c1..564914d0a 100644 --- a/src/test/java/com/uber/cadence/internal/replay/ReplayDeciderCacheTests.java +++ b/src/test/java/com/uber/cadence/internal/replay/ReplayDeciderCacheTests.java @@ -306,7 +306,7 @@ public WorkflowImplementationOptions getWorkflowImplementationOptions() { return new WorkflowImplementationOptions.Builder().build(); } }, - new DecisionsHelper(response), + new DecisionsHelper(response, new SingleWorkerOptions.Builder().build()), new SingleWorkerOptions.Builder().build(), (a, d) -> true); } From ff05ea157824e96b18cb7e3c2e962d6c3d87522a Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Fri, 18 Sep 2020 16:22:22 -0700 Subject: [PATCH 5/6] Fix a bug in setting retry expiration while getting history (#528) --- .../uber/cadence/internal/replay/ReplayDecider.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java index 1f4bb21ed..d3c4bcf93 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java @@ -598,7 +598,7 @@ private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHis private final Duration paginationStart = Duration.ofMillis(System.currentTimeMillis()); private Duration decisionTaskStartToCloseTimeout; - private final Duration retryServiceOperationExpirationInterval() { + private final Duration decisionTaskRemainingTime() { Duration passed = Duration.ofMillis(System.currentTimeMillis()).minus(paginationStart); return decisionTaskStartToCloseTimeout.minus(passed); } @@ -642,11 +642,18 @@ public HistoryEvent next() { return current.next(); } + Duration decisionTaskRemainingTime = decisionTaskRemainingTime(); + if (decisionTaskRemainingTime.isNegative() || decisionTaskRemainingTime.isZero()) { + throw new Error( + "Decision task timed out while querying history. If this happens consistently please consider " + + "increase decision task timeout or reduce history size."); + } + metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_COUNTER).inc(1); Stopwatch sw = metricsScope.timer(MetricsType.WORKFLOW_GET_HISTORY_LATENCY).start(); RetryOptions retryOptions = new RetryOptions.Builder() - .setExpiration(retryServiceOperationExpirationInterval()) + .setExpiration(decisionTaskRemainingTime) .setInitialInterval(retryServiceOperationInitialInterval) .setMaximumInterval(retryServiceOperationMaxInterval) .build(); From cdc5aab4845c653e6afd5d7156c0df377d9107d2 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Mon, 21 Sep 2020 14:26:03 -0700 Subject: [PATCH 6/6] Fix start async return (#529) --- .../com/uber/cadence/internal/metrics/MetricsType.java | 3 ++- .../com/uber/cadence/internal/replay/DecisionsHelper.java | 7 ++++--- .../com/uber/cadence/internal/replay/ReplayDecider.java | 4 ++-- .../com/uber/cadence/internal/sync/WorkflowStubImpl.java | 3 +-- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/metrics/MetricsType.java b/src/main/java/com/uber/cadence/internal/metrics/MetricsType.java index 0c671b312..5a3205c4e 100644 --- a/src/main/java/com/uber/cadence/internal/metrics/MetricsType.java +++ b/src/main/java/com/uber/cadence/internal/metrics/MetricsType.java @@ -148,5 +148,6 @@ public class MetricsType { public static final String WORKFLOW_ACTIVE_THREAD_COUNT = CADENCE_METRICS_PREFIX + "workflow_active_thread_count"; - public static final String NON_DETERMINISTIC_ERROR = CADENCE_METRICS_PREFIX + "non-deterministic-error"; + public static final String NON_DETERMINISTIC_ERROR = + CADENCE_METRICS_PREFIX + "non-deterministic-error"; } diff --git a/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java b/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java index 547f9fbe0..b209d1643 100644 --- a/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java +++ b/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java @@ -725,9 +725,10 @@ private boolean addMissingVersionMarker( private DecisionStateMachine getDecision(DecisionId decisionId) { DecisionStateMachine result = decisions.get(decisionId); if (result == null) { - Scope metricsScope = options - .getMetricsScope() - .tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, task.getWorkflowType().getName())); + Scope metricsScope = + options + .getMetricsScope() + .tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, task.getWorkflowType().getName())); metricsScope.counter(MetricsType.NON_DETERMINISTIC_ERROR).inc(1); throw new NonDeterminisicWorkflowError( "Unknown " + decisionId + ". " + NON_DETERMINISTIC_MESSAGE); diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java index d3c4bcf93..d019005e0 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java @@ -645,8 +645,8 @@ public HistoryEvent next() { Duration decisionTaskRemainingTime = decisionTaskRemainingTime(); if (decisionTaskRemainingTime.isNegative() || decisionTaskRemainingTime.isZero()) { throw new Error( - "Decision task timed out while querying history. If this happens consistently please consider " + - "increase decision task timeout or reduce history size."); + "Decision task timed out while querying history. If this happens consistently please consider " + + "increase decision task timeout or reduce history size."); } metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_COUNTER).inc(1); diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java index c2788d833..4b2fcf387 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java @@ -234,7 +234,7 @@ public CompletableFuture startAsyncWithTimeout( CompletableFuture result = startAsyncWithOptions( timeout, unit, WorkflowOptions.merge(null, null, null, options.get()), args); - result.whenComplete( + return result.whenComplete( (input, exception) -> { if (input != null) { execution.set( @@ -243,7 +243,6 @@ public CompletableFuture startAsyncWithTimeout( .setRunId(input.getRunId())); } }); - return result; } private WorkflowExecution signalWithStartWithOptions(