Skip to content

Commit

Permalink
Merge branch 'master' into opentracing
Browse files Browse the repository at this point in the history
  • Loading branch information
sdwr98 authored Oct 1, 2020
2 parents 25c7cb0 + f984029 commit 7ed28cc
Show file tree
Hide file tree
Showing 6 changed files with 419 additions and 36 deletions.
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,32 @@
# Changelog

## 2.7.6
- Fix getVersion override when added new version
- Add async signal to untypedstub
- Fix RetryOptions.addDoNotRetry
- Add missing metrics from go client
- Fix a bug in setting retry expiration while getting history
- Fix start async return

## 2.7.5
- Added supports contextPropagators for localActivity

## v2.7.4
- Fix prometheus reporting issue
- Fix Promise.allOf should not block on empty input
- Misc: Added project directory to sourceItems path
- Add async start to untype stub

## v2.7.3
- Add wf type tag in decider metrics scope
- Fix WorkflowStub.fromTyped method
- Added missing fields to local activity task
- Honor user timeout for get workflow result

## v2.7.2
- Fix leak in Async GetWorkflowExecutionHistory
- Fix context timeout in execute workflow

## v2.7.1
- Fix a bug in build.gradle that prevented javadoc and sources from being published

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ Add *cadence-client* as a dependency to your *pom.xml*:
<dependency>
<groupId>com.uber.cadence</groupId>
<artifactId>cadence-client</artifactId>
<version>2.7.1</version>
<version>2.7.6</version>
</dependency>

or to *build.gradle*:

compile group: 'com.uber.cadence', name: 'cadence-client', version: '2.7.1'
compile group: 'com.uber.cadence', name: 'cadence-client', version: '2.7.6'

## Documentation

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ googleJavaFormat {
}

group = 'com.uber.cadence'
version = '2.7.1'
version = '2.7.6'

description = '''Uber Cadence Java Client'''

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,6 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
};
decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals));

Integer version = versionMap.get(changeId);
if (version != null) {
validateVersion(changeId, version, minSupported, maxSupported);
return version;
}

Optional<byte[]> result =
versionHandler.handle(
changeId,
Expand All @@ -313,6 +307,12 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
return Optional.of(converter.toData(maxSupported));
});

Integer version = versionMap.get(changeId);
if (version != null) {
validateVersion(changeId, version, minSupported, maxSupported);
return version;
}

if (!result.isPresent()) {
return WorkflowInternal.DEFAULT_VERSION;
}
Expand Down
163 changes: 136 additions & 27 deletions src/test/java/com/uber/cadence/workflow/WorkflowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.base.Strings;
import com.google.common.util.concurrent.UncheckedExecutionException;
Expand Down Expand Up @@ -411,6 +413,27 @@ public interface TestWorkflow2 {
List<String> getTrace();
}

public interface TestWorkflow3 {

@WorkflowMethod
String execute(String taskList);

@SignalMethod(name = "testSignal")
void signal1(String arg);

@QueryMethod(name = "getState")
String getState();
}

public interface TestWorkflowQuery {

@WorkflowMethod()
String execute(String taskList);

@QueryMethod()
String query();
}

public static class TestSyncWorkflowImpl implements TestWorkflow1 {

@Override
Expand Down Expand Up @@ -4334,13 +4357,13 @@ public void testGetVersion2() {

static CompletableFuture<Boolean> executionStarted = new CompletableFuture<>();

public static class TestGetVersionWithoutDecisionEventWorkflowImpl
implements TestWorkflowSignaled {
public static class TestGetVersionWithoutDecisionEventWorkflowImpl implements TestWorkflow3 {

CompletablePromise<Boolean> signalReceived = Workflow.newPromise();
String result = "";

@Override
public String execute() {
public String execute(String taskList) {
try {
if (!getVersionExecuted.contains("getVersionWithoutDecisionEvent")) {
// Execute getVersion in non-replay mode.
Expand All @@ -4353,10 +4376,11 @@ public String execute() {
int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1);
if (version == Workflow.DEFAULT_VERSION) {
signalReceived.get();
return "result 1";
result = "result 1";
} else {
return "result 2";
result = "result 2";
}
return result;
}
} catch (Exception e) {
throw new RuntimeException("failed to get from signal");
Expand All @@ -4369,6 +4393,11 @@ public String execute() {
public void signal1(String arg) {
signalReceived.complete(true);
}

@Override
public String getState() {
return result;
}
}

@Test
Expand All @@ -4377,25 +4406,26 @@ public void testGetVersionWithoutDecisionEvent() throws Exception {
executionStarted = new CompletableFuture<>();
getVersionExecuted.remove("getVersionWithoutDecisionEvent");
startWorkerFor(TestGetVersionWithoutDecisionEventWorkflowImpl.class);
TestWorkflowSignaled workflowStub =
TestWorkflow3 workflowStub =
workflowClient.newWorkflowStub(
TestWorkflowSignaled.class, newWorkflowOptionsBuilder(taskList).build());
WorkflowClient.start(workflowStub::execute);
TestWorkflow3.class, newWorkflowOptionsBuilder(taskList).build());
WorkflowClient.start(workflowStub::execute, taskList);
executionStarted.get();
workflowStub.signal1("test signal");
String result = workflowStub.execute();
String result = workflowStub.execute(taskList);
assertEquals("result 1", result);
assertEquals("result 1", workflowStub.getState());
}

// The following test covers the scenario where getVersion call is removed before a
// non-version-marker decision.
public static class TestGetVersionRemovedInReplayWorkflowImpl implements TestWorkflow1 {
public static class TestGetVersionRemovedInReplayWorkflowImpl implements TestWorkflowQuery {
String result = "";

@Override
public String execute(String taskList) {
TestActivities testActivities =
Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList));
String result;
// Test removing a version check in replay code.
if (!getVersionExecuted.contains(taskList)) {
int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1);
Expand All @@ -4412,25 +4442,33 @@ public String execute(String taskList) {
result += testActivities.activity();
return result;
}

@Override
public String query() {
return result;
}
}

@Test
public void testGetVersionRemovedInReplay() {
startWorkerFor(TestGetVersionRemovedInReplayWorkflowImpl.class);
TestWorkflow1 workflowStub =
TestWorkflowQuery workflowStub =
workflowClient.newWorkflowStub(
TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build());
TestWorkflowQuery.class, newWorkflowOptionsBuilder(taskList).build());
String result = workflowStub.execute(taskList);
assertEquals("activity22activity", result);
tracer.setExpected(
"registerQuery TestWorkflowQuery::query",
"getVersion",
"executeActivity TestActivities::activity2",
"executeActivity TestActivities::activity");
assertEquals("activity22activity", workflowStub.query());
}

// The following test covers the scenario where getVersion call is removed before another
// version-marker decision.
public static class TestGetVersionRemovedInReplay2WorkflowImpl implements TestWorkflow1 {
public static class TestGetVersionRemovedInReplay2WorkflowImpl implements TestWorkflowQuery {
String result = "";

@Override
public String execute(String taskList) {
Expand All @@ -4445,19 +4483,30 @@ public String execute(String taskList) {
Workflow.getVersion("test_change_2", Workflow.DEFAULT_VERSION, 2);
}

return testActivities.activity();
result = testActivities.activity();
return result;
}

@Override
public String query() {
return result;
}
}

@Test
public void testGetVersionRemovedInReplay2() {
startWorkerFor(TestGetVersionRemovedInReplay2WorkflowImpl.class);
TestWorkflow1 workflowStub =
TestWorkflowQuery workflowStub =
workflowClient.newWorkflowStub(
TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build());
TestWorkflowQuery.class, newWorkflowOptionsBuilder(taskList).build());
String result = workflowStub.execute(taskList);
assertEquals("activity", result);
tracer.setExpected("getVersion", "getVersion", "executeActivity TestActivities::activity");
tracer.setExpected(
"registerQuery TestWorkflowQuery::query",
"getVersion",
"getVersion",
"executeActivity TestActivities::activity");
assertEquals("activity", workflowStub.query());
}

public static class TestVersionNotSupportedWorkflowImpl implements TestWorkflow1 {
Expand Down Expand Up @@ -5162,15 +5211,6 @@ public void testParallelLocalActivityExecutionWorkflow() {
result);
}

public interface TestWorkflowQuery {

@WorkflowMethod()
String execute(String taskList);

@QueryMethod()
String query();
}

public static final class TestLocalActivityAndQueryWorkflow implements TestWorkflowQuery {

String message = "initial value";
Expand Down Expand Up @@ -5846,4 +5886,73 @@ public void upsertSearchAttributes(Map<String, Object> searchAttributes) {
next.upsertSearchAttributes(searchAttributes);
}
}

public static class TestGetVersionWorkflowRetryImpl implements TestWorkflow3 {
private String result = "";

@Override
public String execute(String taskList) {
int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1);
int act = 0;
if (version == 1) {
ActivityOptions options =
new ActivityOptions.Builder()
.setTaskList(taskList)
.setHeartbeatTimeout(Duration.ofSeconds(5))
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
.setScheduleToStartTimeout(Duration.ofSeconds(5))
.setStartToCloseTimeout(Duration.ofSeconds(10))
.setRetryOptions(
new RetryOptions.Builder()
.setMaximumAttempts(3)
.setInitialInterval(Duration.ofSeconds(1))
.build())
.build();

TestActivities testActivities = Workflow.newActivityStub(TestActivities.class, options);
act = testActivities.activity1(1);
}

result += "activity" + act;
return result;
}

@Override
public void signal1(String arg) {
Workflow.sleep(1000);
}

@Override
public String getState() {
return result;
}
}

@Test
public void testGetVersionRetry() throws ExecutionException, InterruptedException {
TestActivities activity = mock(TestActivities.class);
when(activity.activity1(1)).thenReturn(1);
worker.registerActivitiesImplementations(activity);

startWorkerFor(TestGetVersionWorkflowRetryImpl.class);
TestWorkflow3 workflowStub =
workflowClient.newWorkflowStub(
TestWorkflow3.class, newWorkflowOptionsBuilder(taskList).build());
CompletableFuture<String> result = WorkflowClient.execute(workflowStub::execute, taskList);
workflowStub.signal1("test");
assertEquals("activity1", result.get());

// test replay
assertEquals("activity1", workflowStub.getState());
}

@Test
public void testGetVersionWithRetryReplay() throws Exception {
// Avoid executing 4 times
Assume.assumeFalse("skipping for docker tests", useExternalService);
Assume.assumeFalse("skipping for sticky off", disableStickyExecution);

WorkflowReplayer.replayWorkflowExecutionFromResource(
"testGetVersionWithRetryHistory.json", TestGetVersionWorkflowRetryImpl.class);
}
}
Loading

0 comments on commit 7ed28cc

Please sign in to comment.