Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update-with-start #702

Merged
merged 1 commit into from
Dec 19, 2024
Merged

Update-with-start #702

merged 1 commit into from
Dec 19, 2024

Conversation

dandavison
Copy link
Contributor

@dandavison dandavison commented Dec 16, 2024

Add an update-with-start API, using the MultiOperation gRPC API.

The test suite is not complete yet, but please feel free to review.

In addition to the tests, an example of using the new API is temporalio/samples-python#156:

    cart_id = f"cart-{session_id}"
    start_op = WithStartWorkflowOperation(
        ShoppingCartWorkflow.run,
        id=cart_id,
        id_conflict_policy=common.WorkflowIDConflictPolicy.USE_EXISTING,
        task_queue="uws",
    )
    try:
        price = Decimal(
            await temporal_client.execute_update_with_start(
                ShoppingCartWorkflow.add_item,
                ShoppingCartItem(sku=item_id, quantity=quantity),
                start_workflow_operation=start_op,
            )
        )
    except WorkflowUpdateFailedError:
        price = None

    return price, await start_op.workflow_handle()

From the docstring:

        A WorkflowIDConflictPolicy must be set in the start_workflow_operation. If the
        specified workflow execution is not running, a new workflow execution is started
        and the update is sent in the first workflow task. Alternatively if the specified
        workflow execution is running then, if the WorkflowIDConflictPolicy is
        USE_EXISTING, the update is issued against the specified workflow, and if the
        WorkflowIDConflictPolicy is FAIL, an error is returned. This call will block until
        the update has completed, and return the update result. Note that this means that
        the call will not return successfully until the update has been delivered to a
        worker.

@dandavison dandavison force-pushed the uws branch 4 times, most recently from 5a868c0 to 9f1d783 Compare December 16, 2024 19:46
@dandavison dandavison marked this pull request as ready for review December 16, 2024 20:23
@dandavison dandavison requested a review from a team as a code owner December 16, 2024 20:23
Copy link
Member

@cretz cretz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly minor things, overall LGTM

@@ -788,6 +780,308 @@ def get_workflow_handle_for(
result_type=defn.ret_type,
)

# Overload for no-param update
@overload
async def execute_update_with_start(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be execute_update_with_start_workflow to differentiate from all of the non-workflow stuff on the client (same for start_update_with_start_workflow)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I hadn't appreciated the distinction from Java/TS's "workflow clients". Done.

Comment on lines 867 to 903
update: Update function or name on the workflow. arg: Single argument to the
update. args: Multiple arguments to the update. Cannot be set if arg is.
start_workflow_operation: a WithStartWorkflowOperation definining the
WorkflowIDConflictPolicy and how to start the workflow in the event that a
workflow is started.
id: ID of the update. If not set, the default is a new UUID. result_type: For
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some newlines aren't showing here to separate the args

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, fixed.


# Overload for no-param workflow, with_start
@overload
def __init__(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may need to new user metadata stuff that was added in #701

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, rebased.

)
self._workflow_handle: Future[WorkflowHandle[SelfType, ReturnType]] = Future()

async def workflow_handle(self) -> WorkflowHandle[SelfType, ReturnType]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong opinion here, but would be ok if this was just called handle since it's in the start-workflow class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'll leave it as workflow_handle. It's an "operation" which is not really a standard SDK concept, so the clarity probably helps IMO, plus handle could be mistaken for a verb (a lot of non-English-first-language-speakers find the handle-verb, handle-noun, handler-noun terms confusing in software.)

Copy link
Member

@cretz cretz Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. May be worth noting Go, Java, and .NET WithStartWorkflowOperation classes to not qualify their methods with workflow either which makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As decided in group discussion, we're going with await start_op.workflow_handle()

@@ -4919,6 +5460,12 @@ async def start_workflow_update(
"""Called for every :py:meth:`WorkflowHandle.update` and :py:meth:`WorkflowHandle.start_update` call."""
return await self.next.start_workflow_update(input)

async def start_workflow_update_with_start(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be more clearly named start_update_with_start_workflow (and changing input class name too). IMO it makes sense to have the method name match the client call (which should also be that IMO).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. But signal/query/update/terminate do not do this; they have query_workflow, signal_workflow, start_workflow_update, terminate_workflow, etc.

Copy link
Contributor Author

@dandavison dandavison Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think? It seems unfortunately that the most consistent name is start_workflow_update_with_start_workflow, in order to match start_workflow_update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK discussed offline; it is now named start_update_with_start_workflow to match the client call. (The idea behind start_workflow_update is that the workflow handle can name it start_update but in other contexts we need to be more explicit about what "update" is).

Comment on lines +5936 to +6150
start_req = (
await self._build_update_with_start_start_workflow_execution_request(
start_input
)
)
update_req = await self._build_update_workflow_execution_request(
update_input, workflow_id=start_input.id
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am concerned that exceptions that happen here (e.g. serializing workflow/update args) will leave someone waiting on the workflow handle hanging. Same for things like cancel.

Is it possible to make sure that no matter how this method exits, the start operation handle awaiter is updated? I didn't check if other langs did this, but I think it makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes absolutely, good call. Done. Both the exception handling and the inner logic are involved so the inner logic is in a separate function, with a finally case in the outer function ensuring that the promise is rejected in all cases.

),
None,
)
if status and status.code in RPCStatusCode:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should add special handling for WorkflowAlreadyStartedError here I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, that's done now, and in general the error handling and poll loop has been rewritten.

temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse
] = [
r.start_workflow
for r in multiop_response.responses
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't really matter, but I think we should be able to assume the indexes of responses match 1:1 with the request objects. So may be able to just do multiop_response.responses[0].start_workflow and not have to loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I decided that you're right and I didn't need to program so defensively here. So more minimal/cleaner now.

# Build the handle. If the user's wait stage is COMPLETED, make sure we
# poll for result.
handle: WorkflowUpdateHandle[Any] = WorkflowUpdateHandle(
return WorkflowUpdateHandle(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we do execute_update (or the user set wait stage to completed), we don't return the handle until we have polled for outcome

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, there's a test that I'd intended to be confirming that the result is fetched in that context, but the test was misconceived. It's not the easiest thing to test for, but I've put the poll call in. Maybe we can add a test later based on manipulating the history long poll timeout.

                break

        handle = WorkflowUpdateHandle(
            client=self._client,
            id=update_req.request.meta.update_id,
            workflow_id=start_input.id,
            workflow_run_id=start_response.run_id,
            known_outcome=known_outcome,
        )
        if update_input.wait_for_stage == WorkflowUpdateStage.COMPLETED:
            await handle._poll_until_outcome()

        return handle

@dandavison dandavison force-pushed the uws branch 3 times, most recently from 0b37fd2 to 3c17209 Compare December 18, 2024 18:15
Copy link
Member

@cretz cretz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing blocking


# TODO (dan):
# temporalio/client.py:926: error: Overloaded function implementation does not accept all possible arguments of signature 1 [misc]
async def start_update_with_start_workflow( # type: ignore
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here and other user facing entry points (e.g. execute equivalent and the class doc for WithStartWorkflowOperation) should have the "experimental" warning that looks similar to what we're removing on #707.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added experimental warnings

@@ -5418,7 +5984,8 @@ async def start_workflow_update(
handle: WorkflowUpdateHandle[Any] = WorkflowUpdateHandle(
client=self._client,
id=req.request.meta.update_id,
workflow_id=input.id,
workflow_id=workflow_id,
# TODO: Why don't we use the run ID from the update response here?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bug we believe (and it exists in .NET too)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted comment from this PR

Comment on lines 6073 to 6107
if (
st.details
and not st.details[0].Is(
temporalio.api.failure.v1.MultiOperationExecutionAborted.DESCRIPTOR
)
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What did we end up deciding here about what a successful start but failed update looks like? I think server side today st.details is never None correct? Should we ignore OK statuses? This is super rare and so non-blocking for this PR, but whatever the decision we may need to apply to other SDKs too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm skipping OK statuses in Python and TS.

st
for st in multiop_failure.statuses
if (
st.details
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this logic is a bit off. I don't think we should require details to be considered an error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that sounds right. 5a06a6a

@dandavison dandavison force-pushed the uws branch 2 times, most recently from faf4140 to 10f3026 Compare December 19, 2024 17:05
@dandavison dandavison merged commit 540faeb into main Dec 19, 2024
12 checks passed
@dandavison dandavison deleted the uws branch December 19, 2024 19:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants