Skip to content

Commit

Permalink
Merging from master
Browse files Browse the repository at this point in the history
  • Loading branch information
sdwr98 committed Sep 22, 2020
2 parents 500b131 + 60ec426 commit 25c7cb0
Show file tree
Hide file tree
Showing 19 changed files with 628 additions and 43 deletions.
5 changes: 5 additions & 0 deletions src/main/java/com/uber/cadence/client/WorkflowStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ static <T> WorkflowStub fromTyped(T typed) {

void signal(String signalName, Object... args);

CompletableFuture<Void> signalAsync(String signalName, Object... args);

CompletableFuture<Void> signalAsyncWithTimeout(
long timeout, TimeUnit unit, String signalName, Object... args);

WorkflowExecution start(Object... args);

CompletableFuture<WorkflowExecution> startAsync(Object... args);
Expand Down
17 changes: 2 additions & 15 deletions src/main/java/com/uber/cadence/common/RetryOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,9 @@ public final RetryOptions addDoNotRetry(Class<? extends Throwable>... doNotRetry
return this;
}

double backoffCoefficient = getBackoffCoefficient();
if (backoffCoefficient == 0) {
backoffCoefficient = DEFAULT_BACKOFF_COEFFICIENT;
}

RetryOptions.Builder builder =
new RetryOptions.Builder()
.setInitialInterval(getInitialInterval())
.setExpiration(getExpiration())
.setMaximumInterval(getMaximumInterval())
.setBackoffCoefficient(backoffCoefficient)
.setDoNotRetry(merge(getDoNotRetry(), Arrays.asList(doNotRetry)));
RetryOptions.Builder builder = new RetryOptions.Builder(this);
builder.setDoNotRetry(merge(getDoNotRetry(), Arrays.asList(doNotRetry)));

if (getMaximumAttempts() > 0) {
builder.setMaximumAttempts(getMaximumAttempts());
}
return builder.validateBuildWithDefaults();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ CompletableFuture<WorkflowExecution> startWorkflowAsync(

void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters);

CompletableFuture<Void> signalWorkflowExecutionAsync(
SignalExternalWorkflowParameters signalParameters);

CompletableFuture<Void> signalWorkflowExecutionAsync(
SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis);

WorkflowExecution signalWithStartWorkflowExecution(
SignalWithStartWorkflowExecutionParameters parameters);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,15 +269,8 @@ private RetryPolicy toRetryPolicy(RetryParameters retryParameters) {

@Override
public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters) {
SignalWorkflowExecutionRequest request = new SignalWorkflowExecutionRequest();
request.setDomain(domain);
SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);

request.setInput(signalParameters.getInput());
request.setSignalName(signalParameters.getSignalName());
WorkflowExecution execution = new WorkflowExecution();
execution.setRunId(signalParameters.getRunId());
execution.setWorkflowId(signalParameters.getWorkflowId());
request.setWorkflowExecution(execution);
try {
Retryer.retry(
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
Expand All @@ -287,6 +280,55 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam
}
}

@Override
public CompletableFuture<Void> signalWorkflowExecutionAsync(
SignalExternalWorkflowParameters signalParameters) {
return signalWorkflowExecutionAsync(signalParameters, Long.MAX_VALUE);
}

@Override
public CompletableFuture<Void> signalWorkflowExecutionAsync(
SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis) {
SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);
return Retryer.retryWithResultAsync(
getRetryOptionsWithExpiration(
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis),
() -> {
CompletableFuture<Void> result = new CompletableFuture<>();
try {
service.SignalWorkflowExecution(
request,
new AsyncMethodCallback() {
@Override
public void onComplete(Object response) {
result.complete(null);
}

@Override
public void onError(Exception exception) {
result.completeExceptionally(exception);
}
});
} catch (TException e) {
result.completeExceptionally(e);
}
return result;
});
}

private SignalWorkflowExecutionRequest getSignalRequest(
SignalExternalWorkflowParameters signalParameters) {
SignalWorkflowExecutionRequest request = new SignalWorkflowExecutionRequest();
request.setDomain(domain);
request.setInput(signalParameters.getInput());
request.setSignalName(signalParameters.getSignalName());
WorkflowExecution execution = new WorkflowExecution();
execution.setRunId(signalParameters.getRunId());
execution.setWorkflowId(signalParameters.getWorkflowId());
request.setWorkflowExecution(execution);
return request;
}

@Override
public WorkflowExecution signalWithStartWorkflowExecution(
SignalWithStartWorkflowExecutionParameters parameters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class MetricsType {
CADENCE_METRICS_PREFIX + "decision-task-error";
public static final String DECISION_TASK_COMPLETED_COUNTER =
CADENCE_METRICS_PREFIX + "decision-task-completed";
public static final String DECISION_TASK_FORCE_COMPLETED =
CADENCE_METRICS_PREFIX + "decision-task-force-completed";

public static final String ACTIVITY_POLL_COUNTER = CADENCE_METRICS_PREFIX + "activity-poll-total";
public static final String ACTIVITY_POLL_FAILED_COUNTER =
Expand Down Expand Up @@ -145,4 +147,7 @@ public class MetricsType {
public static final String STICKY_CACHE_SIZE = CADENCE_METRICS_PREFIX + "sticky-cache-size";
public static final String WORKFLOW_ACTIVE_THREAD_COUNT =
CADENCE_METRICS_PREFIX + "workflow_active_thread_count";

public static final String NON_DETERMINISTIC_ERROR =
CADENCE_METRICS_PREFIX + "non-deterministic-error";
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void accept(Exception reason) {
private final DataConverter dataConverter;
private final Condition taskCondition;
private boolean taskCompleted = false;
private final Map<String, Integer> versionMap = new HashMap<>();

ClockDecisionContext(
DecisionsHelper decisions,
Expand Down Expand Up @@ -227,6 +228,8 @@ void handleMarkerRecorded(HistoryEvent event) {
sideEffectResults.put(event.getEventId(), attributes.getDetails());
} else if (LOCAL_ACTIVITY_MARKER_NAME.equals(name)) {
handleLocalActivityMarker(attributes);
} else if (VERSION_MARKER_NAME.equals(name)) {
handleVersionMarker(attributes);
} else if (!MUTABLE_SIDE_EFFECT_MARKER_NAME.equals(name) && !VERSION_MARKER_NAME.equals(name)) {
if (log.isWarnEnabled()) {
log.warn("Unexpected marker: " + event);
Expand Down Expand Up @@ -276,6 +279,14 @@ private void handleLocalActivityMarker(MarkerRecordedEventAttributes attributes)
}
}

private void handleVersionMarker(MarkerRecordedEventAttributes attributes) {
MarkerHandler.MarkerInterface markerData =
MarkerHandler.MarkerInterface.fromEventAttributes(attributes, dataConverter);
String versionID = markerData.getId();
int version = dataConverter.fromData(attributes.getDetails(), Integer.class, Integer.class);
versionMap.put(versionID, version);
}

int getVersion(String changeId, DataConverter converter, int minSupported, int maxSupported) {
Predicate<MarkerRecordedEventAttributes> changeIdEquals =
(attributes) -> {
Expand All @@ -285,6 +296,12 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
};
decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals));

Integer version = versionMap.get(changeId);
if (version != null) {
validateVersion(changeId, version, minSupported, maxSupported);
return version;
}

Optional<byte[]> result =
versionHandler.handle(
changeId,
Expand All @@ -299,7 +316,7 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
if (!result.isPresent()) {
return WorkflowInternal.DEFAULT_VERSION;
}
int version = converter.fromData(result.get(), Integer.class, Integer.class);
version = converter.fromData(result.get(), Integer.class, Integer.class);
validateVersion(changeId, version, minSupported, maxSupported);
return version;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
import com.uber.cadence.WorkflowType;
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
import com.uber.cadence.internal.metrics.MetricsTag;
import com.uber.cadence.internal.metrics.MetricsType;
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents;
import com.uber.cadence.internal.worker.SingleWorkerOptions;
import com.uber.cadence.internal.worker.WorkflowExecutionException;
import com.uber.m3.tally.Scope;
import com.uber.m3.util.ImmutableMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -86,6 +91,7 @@ class DecisionsHelper {
+ "change in the workflow definition.";

private final PollForDecisionTaskResponse task;
private final SingleWorkerOptions options;

/**
* When workflow task completes the decisions are converted to events that follow the decision
Expand All @@ -105,8 +111,9 @@ class DecisionsHelper {
// TODO: removal of completed activities
private final Map<String, Long> activityIdToScheduledEventId = new HashMap<>();

DecisionsHelper(PollForDecisionTaskResponse task) {
DecisionsHelper(PollForDecisionTaskResponse task, SingleWorkerOptions options) {
this.task = task;
this.options = options;
}

long getNextDecisionEventId() {
Expand Down Expand Up @@ -664,10 +671,10 @@ private void addDecision(DecisionId decisionId, DecisionStateMachine decision) {
// is removed in replay.
void addAllMissingVersionMarker(
boolean isNextDecisionVersionMarker,
Optional<Predicate<MarkerRecordedEventAttributes>> isDifferentChange) {
Optional<Predicate<MarkerRecordedEventAttributes>> changeIdEquals) {
boolean added;
do {
added = addMissingVersionMarker(isNextDecisionVersionMarker, isDifferentChange);
added = addMissingVersionMarker(isNextDecisionVersionMarker, changeIdEquals);
} while (added);
}

Expand Down Expand Up @@ -718,6 +725,11 @@ private boolean addMissingVersionMarker(
private DecisionStateMachine getDecision(DecisionId decisionId) {
DecisionStateMachine result = decisions.get(decisionId);
if (result == null) {
Scope metricsScope =
options
.getMetricsScope()
.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, task.getWorkflowType().getName()));
metricsScope.counter(MetricsType.NON_DETERMINISTIC_ERROR).inc(1);
throw new NonDeterminisicWorkflowError(
"Unknown " + decisionId + ". " + NON_DETERMINISTIC_MESSAGE);
}
Expand Down
15 changes: 12 additions & 3 deletions src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,9 @@ private boolean decideImpl(PollForDecisionTaskResponse decisionTask, Functions.P
// Reset state to before running the event loop
decisionsHelper.handleDecisionTaskStartedEvent(decision);
}

if (forceCreateNewDecisionTask) {
metricsScope.counter(MetricsType.DECISION_TASK_FORCE_COMPLETED).inc(1);
}
return forceCreateNewDecisionTask;
} catch (Error e) {
if (this.workflow.getWorkflowImplementationOptions().getNonDeterministicWorkflowPolicy()
Expand Down Expand Up @@ -596,7 +598,7 @@ private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHis
private final Duration paginationStart = Duration.ofMillis(System.currentTimeMillis());
private Duration decisionTaskStartToCloseTimeout;

private final Duration retryServiceOperationExpirationInterval() {
private final Duration decisionTaskRemainingTime() {
Duration passed = Duration.ofMillis(System.currentTimeMillis()).minus(paginationStart);
return decisionTaskStartToCloseTimeout.minus(passed);
}
Expand Down Expand Up @@ -640,11 +642,18 @@ public HistoryEvent next() {
return current.next();
}

Duration decisionTaskRemainingTime = decisionTaskRemainingTime();
if (decisionTaskRemainingTime.isNegative() || decisionTaskRemainingTime.isZero()) {
throw new Error(
"Decision task timed out while querying history. If this happens consistently please consider "
+ "increase decision task timeout or reduce history size.");
}

metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_COUNTER).inc(1);
Stopwatch sw = metricsScope.timer(MetricsType.WORKFLOW_GET_HISTORY_LATENCY).start();
RetryOptions retryOptions =
new RetryOptions.Builder()
.setExpiration(retryServiceOperationExpirationInterval())
.setExpiration(decisionTaskRemainingTime)
.setInitialInterval(retryServiceOperationInitialInterval)
.setMaximumInterval(retryServiceOperationMaxInterval)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private Decider createDecider(PollForDecisionTaskResponse decisionTask) throws E
decisionTask.setHistory(getHistoryResponse.getHistory());
decisionTask.setNextPageToken(getHistoryResponse.getNextPageToken());
}
DecisionsHelper decisionsHelper = new DecisionsHelper(decisionTask);
DecisionsHelper decisionsHelper = new DecisionsHelper(decisionTask, options);
ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType);
return new ReplayDecider(
service, domain, workflowType, workflow, decisionsHelper, options, laTaskPoller);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,15 @@ public void SignalWorkflowExecution(
impl.SignalWorkflowExecution(signalRequest, resultHandler);
}

@Override
public void SignalWorkflowExecutionWithTimeout(
SignalWorkflowExecutionRequest signalRequest,
AsyncMethodCallback resultHandler,
Long timeoutInMillis)
throws TException {
impl.SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, timeoutInMillis);
}

@Override
public void SignalWithStartWorkflowExecution(
SignalWithStartWorkflowExecutionRequest signalWithStartRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,15 @@ public void SignalWorkflowExecution(
impl.SignalWorkflowExecution(signalRequest, resultHandler);
}

@Override
public void SignalWorkflowExecutionWithTimeout(
SignalWorkflowExecutionRequest signalRequest,
AsyncMethodCallback resultHandler,
Long timeoutInMillis)
throws TException {
impl.SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, timeoutInMillis);
}

@Override
public void SignalWithStartWorkflowExecution(
SignalWithStartWorkflowExecutionRequest signalWithStartRequest,
Expand Down Expand Up @@ -806,6 +815,17 @@ public void signal(String signalName, Object... args) {
next.signal(signalName, args);
}

@Override
public CompletableFuture<Void> signalAsync(String signalName, Object... args) {
return next.signalAsync(signalName, args);
}

@Override
public CompletableFuture<Void> signalAsyncWithTimeout(
long timeout, TimeUnit unit, String signalName, Object... args) {
return next.signalAsyncWithTimeout(timeout, unit, signalName, args);
}

@Override
public WorkflowExecution start(Object... args) {
return next.start(args);
Expand Down
Loading

0 comments on commit 25c7cb0

Please sign in to comment.