diff --git a/src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodicReplayTest.java b/src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodicReplayTest.java index 1fde4d88..2b3e5ab2 100644 --- a/src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodicReplayTest.java +++ b/src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodicReplayTest.java @@ -41,9 +41,19 @@ public void testReplay_continueAsNew_moreFrequency() throws Exception { // Continue as new case: If frequency is changed to lesser number. // FAIL As expected: It should hit non-determinism case and it is hitting properly. + // @Test + // public void testReplay_continueAsNew_lessFrequency() throws Exception { + // WorkflowReplayer.replayWorkflowExecutionFromResource( + // "replaytests/HelloPeriodic.json", + // HelloPeriodic_lessFrequency.GreetingWorkflowImpl.class); + // } + + // Continue as new case: when continue as new has child workflow as well + //EXPECTED: FAIL ACTUAL: FAIL // @Test -// public void testReplay_continueAsNew_lessFrequency() throws Exception { +// public void testReplay_continueAsNew_withChildWorkflows() throws Exception { // WorkflowReplayer.replayWorkflowExecutionFromResource( -// "replaytests/HelloPeriodic.json", HelloPeriodic_lessFrequency.GreetingWorkflowImpl.class); +// "replaytests/HelloPeriodic.json", +// HelloPeriodic_withChildWorkflows.GreetingWorkflowImpl.class); // } } diff --git a/src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodic_moreFrequency.java b/src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodic_moreFrequency.java index ed1d951d..2859613e 100644 --- a/src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodic_moreFrequency.java +++ b/src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodic_moreFrequency.java @@ -64,7 +64,7 @@ public interface GreetingActivities { public static class GreetingWorkflowImpl implements GreetingWorkflow { // If we change the value to 1, then non-determinism case will hit. - private final int CONTINUE_AS_NEW_FREQUENCEY = 1000; + private final int CONTINUE_AS_NEW_FREQUENCEY = 100000000; private final GreetingActivities activities = Workflow.newActivityStub( @@ -94,9 +94,19 @@ public void greetPeriodically(String name, Duration delay) { } static class GreetingActivitiesImpl implements GreetingActivities { + + // private int callCount = 1000; + @Override public void greet(String greeting) { + // callCount++; System.out.println("From " + Activity.getWorkflowExecution() + ": " + greeting); + + // Did not get Non-Determinism for this: + // EXPECTED: YES ACTUAL: NO + // if (callCount > 100) { + // throw new RuntimeException("Exceeded maximum call frequency"); + // } } } diff --git a/src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodic_withChildWorkflows.java b/src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodic_withChildWorkflows.java new file mode 100644 index 00000000..e9391c43 --- /dev/null +++ b/src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodic_withChildWorkflows.java @@ -0,0 +1,164 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.samples.replaytests; + +import static com.uber.cadence.samples.common.SampleConstants.DOMAIN; + +import com.google.common.base.Throwables; +import com.uber.cadence.WorkflowExecution; +import com.uber.cadence.WorkflowIdReusePolicy; +import com.uber.cadence.activity.Activity; +import com.uber.cadence.activity.ActivityOptions; +import com.uber.cadence.client.DuplicateWorkflowException; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; +import com.uber.cadence.client.WorkflowException; +import com.uber.cadence.client.WorkflowStub; +import com.uber.cadence.internal.compatibility.Thrift2ProtoAdapter; +import com.uber.cadence.internal.compatibility.proto.serviceclient.IGrpcServiceStubs; +import com.uber.cadence.samples.hello.HelloChild; +import com.uber.cadence.worker.Worker; +import com.uber.cadence.worker.WorkerFactory; +import com.uber.cadence.workflow.Async; +import com.uber.cadence.workflow.Promise; +import com.uber.cadence.workflow.Workflow; +import com.uber.cadence.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.Optional; + +public class HelloPeriodic_withChildWorkflows { + + static final String TASK_LIST = "HelloPeriodic_withChildWorkflow"; + static final String PERIODIC_WORKFLOW_ID = "HelloPeriodic_withChildWorkflow"; + + public interface GreetingWorkflow { + @WorkflowMethod( + // At most one instance. + workflowId = PERIODIC_WORKFLOW_ID, + // To allow starting workflow with the same ID after the previous one has terminated. + workflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicate, + // Adjust this value to the maximum time workflow is expected to run. + // It usually depends on the number of repetitions and interval between them. + executionStartToCloseTimeoutSeconds = 300, + taskList = TASK_LIST + ) + void greetPeriodically(String name, Duration delay); + } + + public interface GreetingActivities { + void greet(String greeting); + } + + public static class GreetingWorkflowImpl implements GreetingWorkflow { + + private final int CONTINUE_AS_NEW_FREQUENCEY = 1000; + + private final GreetingActivities activities = + Workflow.newActivityStub( + GreetingActivities.class, + new ActivityOptions.Builder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()); + + /** + * Stub used to terminate this workflow run and create the next one with the same ID atomically. + */ + private final GreetingWorkflow continueAsNew = + Workflow.newContinueAsNewStub(GreetingWorkflow.class); + + @Override + public void greetPeriodically(String name, Duration delay) { + + HelloChild.GreetingChild child = + Workflow.newChildWorkflowStub(HelloChild.GreetingChild.class); + + Promise greeting = Async.function(child::composeGreeting, "Hello", name); + + System.out.println(greeting.get()); + + // Loop the predefined number of times then continue this workflow as new. + // This is needed to periodically truncate the history size. + for (int i = 0; i < CONTINUE_AS_NEW_FREQUENCEY; i++) { + activities.greet("Hello " + name + "!"); + Workflow.sleep(delay); + } + // Current workflow run stops executing after this call. + continueAsNew.greetPeriodically(name, delay); + } + } + + static class GreetingActivitiesImpl implements GreetingActivities { + + @Override + public void greet(String greeting) { + System.out.println("From " + Activity.getWorkflowExecution() + ": " + greeting); + } + } + + public static void main(String[] args) throws InterruptedException { + // Get a new client + // NOTE: to set a different options, you can do like this: + // ClientOptions.newBuilder().setRpcTimeout(5 * 1000).build(); + WorkflowClient workflowClient = + WorkflowClient.newInstance( + new Thrift2ProtoAdapter(IGrpcServiceStubs.newInstance()), + WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build()); + // Get worker to poll the task list. + WorkerFactory factory = WorkerFactory.newInstance(workflowClient); + Worker worker = factory.newWorker(TASK_LIST); + // Workflows are stateful. So you need a type to create instances. + worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class); + // Activities are stateless and thread safe. So a shared instance is used. + worker.registerActivitiesImplementations(new GreetingActivitiesImpl()); + // Start listening to the workflow and activity task lists. + factory.start(); + + // Start a workflow execution. Usually this is done from another program. + // To ensure that this daemon type workflow is always running try to start it periodically + // ignoring the duplicated exception. + // It is only to protect from application level failures. + // Failures of a workflow worker don't lead to workflow failures. + WorkflowExecution execution = null; + while (true) { + // Print reason of failure of the previous run, before restarting. + if (execution != null) { + WorkflowStub workflow = workflowClient.newUntypedWorkflowStub(execution, Optional.empty()); + try { + workflow.getResult(Void.class); // + } catch (WorkflowException e) { + System.out.println("Previous instance failed:\n" + Throwables.getStackTraceAsString(e)); + } + } + // New stub instance should be created for each new workflow start. + GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class); + try { + execution = + WorkflowClient.start(workflow::greetPeriodically, "World", Duration.ofSeconds(3)); + System.out.println("Started " + execution); + } catch (DuplicateWorkflowException e) { + System.out.println("Still running as " + e.getExecution()); + } catch (Throwable e) { + e.printStackTrace(); + System.exit(1); + } + // This value is so low just for the sample purpose. In production workflow + // it is usually much higher. + Thread.sleep(10000); + } + } +}