Skip to content

Commit

Permalink
Update bug fix: prevent update from a stale workflow handle (#703)
Browse files Browse the repository at this point in the history
* Add test that update respects first_execution_run_id

* Send first_execution_run_id with Update requests

Fixes #682
  • Loading branch information
dandavison authored Dec 19, 2024
1 parent 341d949 commit c44a6d8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 10 deletions.
13 changes: 3 additions & 10 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1882,11 +1882,6 @@ async def execute_update(
.. warning::
This API is experimental
.. warning::
WorkflowHandles created as a result of :py:meth:`Client.start_workflow` will
send updates to the latest workflow with the same workflow ID even if it is
unrelated to the started workflow.
Args:
update: Update function or name on the workflow.
arg: Single argument to the update.
Expand Down Expand Up @@ -1994,11 +1989,6 @@ async def start_update(
.. warning::
This API is experimental
.. warning::
WorkflowHandles created as a result of :py:meth:`Client.start_workflow` will
send updates to the latest workflow with the same workflow ID even if it is
unrelated to the started workflow.
Args:
update: Update function or name on the workflow.
arg: Single argument to the update.
Expand Down Expand Up @@ -2060,6 +2050,7 @@ async def _start_update(
StartWorkflowUpdateInput(
id=self._id,
run_id=self._run_id,
first_execution_run_id=self.first_execution_run_id,
update_id=id,
update=update_name,
args=temporalio.common._arg_or_args(arg, args),
Expand Down Expand Up @@ -4728,6 +4719,7 @@ class StartWorkflowUpdateInput:

id: str
run_id: Optional[str]
first_execution_run_id: Optional[str]
update_id: Optional[str]
update: str
args: Sequence[Any]
Expand Down Expand Up @@ -5360,6 +5352,7 @@ async def start_workflow_update(
workflow_id=input.id,
run_id=input.run_id or "",
),
first_execution_run_id=input.first_execution_run_id or "",
request=temporalio.api.update.v1.Request(
meta=temporalio.api.update.v1.Meta(
update_id=input.update_id or str(uuid.uuid4()),
Expand Down
51 changes: 51 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4425,6 +4425,57 @@ async def test_workflow_update_task_fails(client: Client, env: WorkflowEnvironme
assert bad_validator_fail_ct == 2


@workflow.defn
class UpdateRespectsFirstExecutionRunIdWorkflow:
def __init__(self) -> None:
self.update_received = False

@workflow.run
async def run(self) -> None:
await workflow.wait_condition(lambda: self.update_received)

@workflow.update
async def update(self) -> None:
self.update_received = True


async def test_workflow_update_respects_first_execution_run_id(
client: Client, env: WorkflowEnvironment
):
if env.supports_time_skipping:
pytest.skip(
"Java test server: https://github.com/temporalio/sdk-java/issues/1903"
)
# Start one workflow, obtain the run ID (r1), and let it complete. Start a second
# workflow with the same workflow ID, and try to send an update using the handle from
# r1.
workflow_id = f"update-respects-first-execution-run-id-{uuid.uuid4()}"
async with new_worker(client, UpdateRespectsFirstExecutionRunIdWorkflow) as worker:

async def start_workflow(workflow_id: str) -> WorkflowHandle:
return await client.start_workflow(
UpdateRespectsFirstExecutionRunIdWorkflow.run,
id=workflow_id,
task_queue=worker.task_queue,
)

wf_execution_1_handle = await start_workflow(workflow_id)
await wf_execution_1_handle.execute_update(
UpdateRespectsFirstExecutionRunIdWorkflow.update
)
await wf_execution_1_handle.result()
await start_workflow(workflow_id)

# Execution 1 has closed. This would succeed if the update incorrectly targets
# the second execution
with pytest.raises(RPCError) as exc_info:
await wf_execution_1_handle.execute_update(
UpdateRespectsFirstExecutionRunIdWorkflow.update
)
assert exc_info.value.status == RPCStatusCode.NOT_FOUND
assert "workflow execution not found" in str(exc_info.value)


@workflow.defn
class ImmediatelyCompleteUpdateAndWorkflow:
def __init__(self) -> None:
Expand Down

0 comments on commit c44a6d8

Please sign in to comment.