Skip to content

Commit

Permalink
Remove the non_recursive argument from purge and terminate workflow (
Browse files Browse the repository at this point in the history
…#668)

* Renames the `non-recursive` parameter to `recursive`

Signed-off-by: Elena Kolevska <[email protected]>

* Removes the `recursive` parameter for workflows

Signed-off-by: Elena Kolevska <[email protected]>

* Apply suggestions from code review

Signed-off-by: Bernd Verst <[email protected]>

---------

Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Bernd Verst <[email protected]>
Co-authored-by: Bernd Verst <[email protected]>
  • Loading branch information
elena-kolevska and berndverst authored Feb 6, 2024
1 parent 78c2fd7 commit 5f85f97
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 73 deletions.
20 changes: 4 additions & 16 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1285,18 +1285,14 @@ async def get_workflow(self, instance_id: str, workflow_component: str) -> GetWo
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def terminate_workflow(
self, instance_id: str, workflow_component: str, non_recursive: bool = False
) -> DaprResponse:
async def terminate_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Terminates a workflow.
Args:
instance_id (str): the ID of the workflow instance, e.g.
`order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
non_recursive (bool): if true, child workflows will not be terminated,
defaults to false.
Returns:
:class:`DaprResponse` gRPC metadata returned from callee
Expand All @@ -1310,9 +1306,7 @@ async def terminate_workflow(
validateNotBlankString(instance_id=instance_id, workflow_component=workflow_component)
# Actual terminate workflow invocation
req = api_v1.TerminateWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component,
non_recursive=non_recursive,
instance_id=instance_id, workflow_component=workflow_component
)

try:
Expand Down Expand Up @@ -1454,18 +1448,14 @@ async def resume_workflow(self, instance_id: str, workflow_component: str) -> Da
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def purge_workflow(
self, instance_id: str, workflow_component: str, non_recursive: bool = False
) -> DaprResponse:
async def purge_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Purges a workflow.
Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
non_recursive (bool): if true, child workflows will not be purged,
defaults to false.
Returns:
:class:`DaprResponse` gRPC metadata returned from callee
Expand All @@ -1479,9 +1469,7 @@ async def purge_workflow(
validateNotBlankString(instance_id=instance_id, workflow_component=workflow_component)
# Actual purge workflow invocation
req = api_v1.PurgeWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component,
non_recursive=non_recursive,
instance_id=instance_id, workflow_component=workflow_component
)

try:
Expand Down
20 changes: 4 additions & 16 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1302,18 +1302,14 @@ def get_workflow(self, instance_id: str, workflow_component: str) -> GetWorkflow
except RpcError as err:
raise DaprInternalError(err.details())

def terminate_workflow(
self, instance_id: str, workflow_component: str, non_recursive: bool = False
) -> DaprResponse:
def terminate_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Terminates a workflow.
Args:
instance_id (str): the ID of the workflow instance, e.g.
`order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
non_recursive (bool): if true, child workflows will not be terminated,
defaults to false.
Returns:
:class:`DaprResponse` gRPC metadata returned from callee
Expand All @@ -1328,9 +1324,7 @@ def terminate_workflow(
validateNotBlankString(instance_id=instance_id, workflow_component=workflow_component)
# Actual terminate workflow invocation
req = api_v1.TerminateWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component,
non_recursive=non_recursive,
instance_id=instance_id, workflow_component=workflow_component
)

try:
Expand Down Expand Up @@ -1473,18 +1467,14 @@ def resume_workflow(self, instance_id: str, workflow_component: str) -> DaprResp
except RpcError as err:
raise DaprInternalError(err.details())

def purge_workflow(
self, instance_id: str, workflow_component: str, non_recursive: bool = False
) -> DaprResponse:
def purge_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Purges a workflow.
Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
non_recursive (bool): if true, child workflows will not be purged,
defaults to false.
Returns:
:class:`DaprResponse` gRPC metadata returned from callee
Expand All @@ -1498,9 +1488,7 @@ def purge_workflow(
validateNotBlankString(instance_id=instance_id, workflow_component=workflow_component)
# Actual purge workflow invocation
req = api_v1.PurgeWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component,
non_recursive=non_recursive,
instance_id=instance_id, workflow_component=workflow_component
)

try:
Expand Down
30 changes: 15 additions & 15 deletions dapr/proto/runtime/v1/dapr_pb2.py

Large diffs are not rendered by default.

12 changes: 2 additions & 10 deletions dapr/proto/runtime/v1/dapr_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2766,21 +2766,17 @@ class TerminateWorkflowRequest(google.protobuf.message.Message):

INSTANCE_ID_FIELD_NUMBER: builtins.int
WORKFLOW_COMPONENT_FIELD_NUMBER: builtins.int
NON_RECURSIVE_FIELD_NUMBER: builtins.int
instance_id: builtins.str
"""ID of the workflow instance to terminate."""
workflow_component: builtins.str
"""Name of the workflow component."""
non_recursive: builtins.bool
"""Indicates whether this is a non_recursive terminate request"""
def __init__(
self,
*,
instance_id: builtins.str = ...,
workflow_component: builtins.str = ...,
non_recursive: builtins.bool = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["instance_id", b"instance_id", "non_recursive", b"non_recursive", "workflow_component", b"workflow_component"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["instance_id", b"instance_id", "workflow_component", b"workflow_component"]) -> None: ...

global___TerminateWorkflowRequest = TerminateWorkflowRequest

Expand Down Expand Up @@ -2866,21 +2862,17 @@ class PurgeWorkflowRequest(google.protobuf.message.Message):

INSTANCE_ID_FIELD_NUMBER: builtins.int
WORKFLOW_COMPONENT_FIELD_NUMBER: builtins.int
NON_RECURSIVE_FIELD_NUMBER: builtins.int
instance_id: builtins.str
"""ID of the workflow instance to purge."""
workflow_component: builtins.str
"""Name of the workflow component."""
non_recursive: builtins.bool
"""Indicates whether this is a non_recursive purge request"""
def __init__(
self,
*,
instance_id: builtins.str = ...,
workflow_component: builtins.str = ...,
non_recursive: builtins.bool = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["instance_id", b"instance_id", "non_recursive", b"non_recursive", "workflow_component", b"workflow_component"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["instance_id", b"instance_id", "workflow_component", b"workflow_component"]) -> None: ...

global___PurgeWorkflowRequest = PurgeWorkflowRequest

Expand Down
20 changes: 7 additions & 13 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,33 +208,27 @@ def raise_workflow_event(
"""
return self.__obj.raise_orchestration_event(instance_id, event_name, data=data)

def terminate_workflow(
self, instance_id: str, *, non_recursive: bool = False, output: Optional[Any] = None
):
def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
"""Terminates a running workflow instance and updates its runtime status to
WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in
the task hub. When the task hub worker processes this message, it will update the runtime
status of the target instance to WorkflowRuntimeStatus.Terminated. You can use
wait_for_workflow_completion to wait for the instance to reach the terminated state.
Terminating a workflow by default will terminate all of the child workflows that were started by
the workflow instance. If you don't want to terminate child workflows, you can set `non_recursive`
flag to true which will disable termination of child workflows.
Terminating a workflow will terminate all child workflows that were started by
the workflow instance.
However, terminating a workflow would have no effect on any in-flight activity function executions
that were started by the terminated workflow instance.
However, terminating a workflow has no effect on any in-flight activity function
executions that were started by the terminated workflow instance.
At the time of writing, there is no way to terminate an in-flight activity execution.
Args:
instance_id: The ID of the workflow instance to terminate.
output: The optional output to set for the terminated workflow instance.
non_recursive: If true, child workflows will not be terminated,
defaults to false.
"""
return self.__obj.terminate_orchestration(
instance_id, output=output, recursive=(not non_recursive)
)
return self.__obj.terminate_orchestration(instance_id, output=output)

def pause_workflow(self, instance_id: str):
"""Suspends a workflow instance, halting processing of it until resume_workflow is used to
Expand Down
4 changes: 1 addition & 3 deletions ext/dapr-ext-workflow/tests/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ def raise_orchestration_event(
):
return mock_raise_event_result

def terminate_orchestration(
self, instance_id: str, *, output: Union[Any, None] = None, recursive: bool = True
):
def terminate_orchestration(self, instance_id: str, *, output: Union[Any, None] = None):
return mock_terminate_result

def suspend_orchestration(self, instance_id: str):
Expand Down

0 comments on commit 5f85f97

Please sign in to comment.