Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added more continue as new cases for Non-Determinism java checks #85

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,19 @@ public void testReplay_continueAsNew_moreFrequency() throws Exception {

// Continue as new case: If frequency is changed to lesser number.
// FAIL As expected: It should hit non-determinism case and it is hitting properly.
// @Test
// public void testReplay_continueAsNew_lessFrequency() throws Exception {
// WorkflowReplayer.replayWorkflowExecutionFromResource(
// "replaytests/HelloPeriodic.json",
// HelloPeriodic_lessFrequency.GreetingWorkflowImpl.class);
// }

// Continue as new case: when continue as new has child workflow as well
//EXPECTED: FAIL ACTUAL: FAIL
// @Test
// public void testReplay_continueAsNew_lessFrequency() throws Exception {
// public void testReplay_continueAsNew_withChildWorkflows() throws Exception {
// WorkflowReplayer.replayWorkflowExecutionFromResource(
// "replaytests/HelloPeriodic.json", HelloPeriodic_lessFrequency.GreetingWorkflowImpl.class);
// "replaytests/HelloPeriodic.json",
// HelloPeriodic_withChildWorkflows.GreetingWorkflowImpl.class);
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public interface GreetingActivities {
public static class GreetingWorkflowImpl implements GreetingWorkflow {

// If we change the value to 1, then non-determinism case will hit.
private final int CONTINUE_AS_NEW_FREQUENCEY = 1000;
private final int CONTINUE_AS_NEW_FREQUENCEY = 100000000;

private final GreetingActivities activities =
Workflow.newActivityStub(
Expand Down Expand Up @@ -94,9 +94,19 @@ public void greetPeriodically(String name, Duration delay) {
}

static class GreetingActivitiesImpl implements GreetingActivities {

// private int callCount = 1000;

@Override
public void greet(String greeting) {
// callCount++;
System.out.println("From " + Activity.getWorkflowExecution() + ": " + greeting);

// Did not get Non-Determinism for this:
// EXPECTED: YES ACTUAL: NO
// if (callCount > 100) {
// throw new RuntimeException("Exceeded maximum call frequency");
// }
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.samples.replaytests;

import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;

import com.google.common.base.Throwables;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.WorkflowIdReusePolicy;
import com.uber.cadence.activity.Activity;
import com.uber.cadence.activity.ActivityOptions;
import com.uber.cadence.client.DuplicateWorkflowException;
import com.uber.cadence.client.WorkflowClient;
import com.uber.cadence.client.WorkflowClientOptions;
import com.uber.cadence.client.WorkflowException;
import com.uber.cadence.client.WorkflowStub;
import com.uber.cadence.internal.compatibility.Thrift2ProtoAdapter;
import com.uber.cadence.internal.compatibility.proto.serviceclient.IGrpcServiceStubs;
import com.uber.cadence.samples.hello.HelloChild;
import com.uber.cadence.worker.Worker;
import com.uber.cadence.worker.WorkerFactory;
import com.uber.cadence.workflow.Async;
import com.uber.cadence.workflow.Promise;
import com.uber.cadence.workflow.Workflow;
import com.uber.cadence.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.Optional;

public class HelloPeriodic_withChildWorkflows {

static final String TASK_LIST = "HelloPeriodic_withChildWorkflow";
static final String PERIODIC_WORKFLOW_ID = "HelloPeriodic_withChildWorkflow";

public interface GreetingWorkflow {
@WorkflowMethod(
// At most one instance.
workflowId = PERIODIC_WORKFLOW_ID,
// To allow starting workflow with the same ID after the previous one has terminated.
workflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicate,
// Adjust this value to the maximum time workflow is expected to run.
// It usually depends on the number of repetitions and interval between them.
executionStartToCloseTimeoutSeconds = 300,
taskList = TASK_LIST
)
void greetPeriodically(String name, Duration delay);
}

public interface GreetingActivities {
void greet(String greeting);
}

public static class GreetingWorkflowImpl implements GreetingWorkflow {

private final int CONTINUE_AS_NEW_FREQUENCEY = 1000;

private final GreetingActivities activities =
Workflow.newActivityStub(
GreetingActivities.class,
new ActivityOptions.Builder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build());

/**
* Stub used to terminate this workflow run and create the next one with the same ID atomically.
*/
private final GreetingWorkflow continueAsNew =
Workflow.newContinueAsNewStub(GreetingWorkflow.class);

@Override
public void greetPeriodically(String name, Duration delay) {

HelloChild.GreetingChild child =
Workflow.newChildWorkflowStub(HelloChild.GreetingChild.class);

Promise<String> greeting = Async.function(child::composeGreeting, "Hello", name);

System.out.println(greeting.get());

// Loop the predefined number of times then continue this workflow as new.
// This is needed to periodically truncate the history size.
for (int i = 0; i < CONTINUE_AS_NEW_FREQUENCEY; i++) {
activities.greet("Hello " + name + "!");
Workflow.sleep(delay);
}
// Current workflow run stops executing after this call.
continueAsNew.greetPeriodically(name, delay);
}
}

static class GreetingActivitiesImpl implements GreetingActivities {

@Override
public void greet(String greeting) {
System.out.println("From " + Activity.getWorkflowExecution() + ": " + greeting);
}
}

public static void main(String[] args) throws InterruptedException {
// Get a new client
// NOTE: to set a different options, you can do like this:
// ClientOptions.newBuilder().setRpcTimeout(5 * 1000).build();
WorkflowClient workflowClient =
WorkflowClient.newInstance(
new Thrift2ProtoAdapter(IGrpcServiceStubs.newInstance()),
WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build());
// Get worker to poll the task list.
WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
Worker worker = factory.newWorker(TASK_LIST);
// Workflows are stateful. So you need a type to create instances.
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
// Activities are stateless and thread safe. So a shared instance is used.
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
// Start listening to the workflow and activity task lists.
factory.start();

// Start a workflow execution. Usually this is done from another program.
// To ensure that this daemon type workflow is always running try to start it periodically
// ignoring the duplicated exception.
// It is only to protect from application level failures.
// Failures of a workflow worker don't lead to workflow failures.
WorkflowExecution execution = null;
while (true) {
// Print reason of failure of the previous run, before restarting.
if (execution != null) {
WorkflowStub workflow = workflowClient.newUntypedWorkflowStub(execution, Optional.empty());
try {
workflow.getResult(Void.class); //
} catch (WorkflowException e) {
System.out.println("Previous instance failed:\n" + Throwables.getStackTraceAsString(e));
}
}
// New stub instance should be created for each new workflow start.
GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
try {
execution =
WorkflowClient.start(workflow::greetPeriodically, "World", Duration.ofSeconds(3));
System.out.println("Started " + execution);
} catch (DuplicateWorkflowException e) {
System.out.println("Still running as " + e.getExecution());
} catch (Throwable e) {
e.printStackTrace();
System.exit(1);
}
// This value is so low just for the sample purpose. In production workflow
// it is usually much higher.
Thread.sleep(10000);
}
}
}
Loading