Skip to content

Commit

Permalink
Add OnConflictOptions support
Browse files Browse the repository at this point in the history
  • Loading branch information
justinp-tt committed Feb 24, 2025
1 parent 0d5b919 commit 81f9ec4
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ static final class WorkflowData {
this.lastFailure = Objects.requireNonNull(lastFailure);
}

boolean hasRequestId(@Nonnull String requestId) {
return requestIds.contains(requestId);
}

void addRequestId(@Nonnull String requestId) {
requestIds.add(requestId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,6 @@ PollWorkflowExecutionUpdateResponse pollUpdateWorkflowExecution(
Optional<TestWorkflowMutableState> getParent();

boolean isTerminalState();

boolean hasRequestId(String requestId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2032,6 +2032,11 @@ public boolean isTerminalState() {
return isTerminalState(workflowState);
}

@Override
public boolean hasRequestId(@Nonnull String requestId) {
return workflow.getData().hasRequestId(requestId);
}

private void updateHeartbeatTimer(
RequestContext ctx,
long activityId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,16 +262,17 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
lock.lock();
try {
String newRunId = UUID.randomUUID().toString();
StartWorkflowExecutionResponse dedupedResponse = dedupedRequest(startRequest);
if (dedupedResponse != null) {
return dedupedResponse;
}

existing = executionsByWorkflowId.get(workflowId);
if (existing != null) {
WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();

if (status == WORKFLOW_EXECUTION_STATUS_RUNNING) {
StartWorkflowExecutionResponse dedupedResponse = dedupedRequest(startRequest, existing);
if (dedupedResponse != null) {
return dedupedResponse;
}

if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
|| conflictPolicy
== WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING) {
Expand Down Expand Up @@ -1898,25 +1899,22 @@ public WorkflowServiceStubs newClientStub() {
}

private StartWorkflowExecutionResponse dedupedRequest(
StartWorkflowExecutionRequest startRequest) {
StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existingWorkflow) {
String requestId = startRequest.getRequestId();
if (!requestId.isEmpty()) {
for (TestWorkflowMutableState state : executions.values()) {
String existingRequestId = state.getStartRequest().getRequestId();
if (requestId.equals(existingRequestId)) {
if (state.getWorkflowExecutionStatus()
== WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) {
if (startRequest.hasOnConflictOptions()) {
state.applyOnConflictOptions(startRequest);
}
return StartWorkflowExecutionResponse.newBuilder()
.setRunId(state.getExecutionId().getExecution().getRunId())
.build();
}
break;
}
}
String existingRequestId = existingWorkflow.getStartRequest().getRequestId();
if (existingRequestId.equals(requestId)) {
return StartWorkflowExecutionResponse.newBuilder()
.setStarted(true)
.setRunId(existingWorkflow.getExecutionId().getExecution().getRunId())
.build();
}

if (existingWorkflow.hasRequestId(requestId)) {
return StartWorkflowExecutionResponse.newBuilder()
.setRunId(existingWorkflow.getExecutionId().getExecution().getRunId())
.build();
}

return null;
}

Expand Down

0 comments on commit 81f9ec4

Please sign in to comment.