Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OnConflictOptions Support #2415

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open

Add OnConflictOptions Support #2415

wants to merge 23 commits into from

Conversation

justinp-tt
Copy link
Contributor

@justinp-tt justinp-tt commented Feb 13, 2025

What was changed

Add OnConflictOptions support in the test server

Why?

Test server needs OnConflictOptions support

Checklist

  1. Closes
    SDK-3343
  2. How was this tested:
  1. Any docs updates needed?

@justinp-tt justinp-tt marked this pull request as ready for review February 14, 2025 14:36
@justinp-tt justinp-tt requested a review from a team as a code owner February 14, 2025 14:36
Copy link
Member

@bergundy bergundy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM.

Could you please also add tests?

Copy link

@rodrigozhou rodrigozhou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We changed the server validation of StartWorkflowExecutionRequest.OnConflictOptions. Now, if AttachCompletionCallbacks is true, then AttachRequestId has to be true as well. Not sure if we need to make any changes here.

@@ -118,6 +126,95 @@ public void secondWorkflowTerminatesFirst() {
describe(execution2).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
}

@Test
public void useExistingPolicy_RequestIdDeduplication() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have test coverage that WORKFLOW_EXECUTION_OPTIONS_UPDATED is written to the workflows history.

&& (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
|| status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
if (status == WORKFLOW_EXECUTION_STATUS_RUNNING) {
if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's some edge cases that we handled in the server (or at least planned to) for reuse policy terminate if running and the different conflict policies.

Someone can address that in a follow up PR though.

CC @rodrigozhou

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rodrigozhou have you reviewed this code and verified the different edge cases?
Can wait for a follow up PR.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll do in a follow up PR, this one is already quite big.


if (options.getAttachCompletionCallbacks()
&& !request.getCompletionCallbacksList().isEmpty()) {
attrs.addAllAttachedCompletionCallbacks(request.getCompletionCallbacksList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we resolve all these callbacks we are attaching? I can't find it anywhere in this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applyOnConflictOptions calls ctx.commitChanges(store); The save implementation makes a call to fireCallbacks. Is that where we would expect callbacks to be invoked?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was assuming that we would trigger any callback attached to the workflow, that should be validated when tests are added.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already resolving. I just had to change to get the callbacks from this new attribute I added to TestWorkflowMutableStateImpl instead of reading from the startRequest.

@justinp-tt justinp-tt assigned rodrigozhou and unassigned justinp-tt Feb 25, 2025
Copy link
Member

@bergundy bergundy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

&& (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
|| status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
if (status == WORKFLOW_EXECUTION_STATUS_RUNNING) {
if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rodrigozhou have you reviewed this code and verified the different edge cases?
Can wait for a follow up PR.

@bergundy bergundy requested a review from rodrigozhou February 28, 2025 15:03
@rodrigozhou rodrigozhou force-pushed the on-conflict-options branch from 171f29a to 5ef7c2f Compare March 3, 2025 20:01
Comment on lines +1751 to +1760
try {
service.completeNexusOperation(
ref, ctx.getExecution().getWorkflowId(), startLink, completionEvent.get());
} catch (StatusRuntimeException e) {
// Callback destination not found should not block processing the callbacks nor
// completing the workflow.
if (e.getStatus().getCode() != Status.Code.NOT_FOUND) {
throw e;
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pdoerner This is what I did to address callback "not found".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we do get NOT_FOUND (or really any error) we should log an error and/or store the failure somewhere so that it can be returned as part of the DescribeWorkflow response which has fields for callback's last failure, attempt time, etc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we can also address that in a followup, doesn't necessarily have to be in this PR

@@ -1465,6 +1488,7 @@ public String continueAsNew(
String identity,
ExecutionId continuedExecutionId,
String firstExecutionRunId,
TestWorkflowMutableState previousExecutionState,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need the previous mutable state so we can copy all callbacks to the continued as new workflow (L1511-1514 below).

@rodrigozhou rodrigozhou force-pushed the on-conflict-options branch from 5ef7c2f to a507b5e Compare March 3, 2025 20:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants