From 58dbd8bf7ad96b173e156394b10bd67fb3938104 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Tue, 14 Jan 2025 15:16:34 +0100 Subject: [PATCH] workflows: update durabletask dependency (#757) * Bump codecov/codecov-action from 4 to 5 (#753) Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 4 to 5. - [Release notes](https://github.com/codecov/codecov-action/releases) - [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/codecov/codecov-action/compare/v4...v5) --- updated-dependencies: - dependency-name: codecov/codecov-action dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Signed-off-by: Elena Kolevska * update durabletask to use fork Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Signed-off-by: Elena Kolevska * add purge workflow function Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Signed-off-by: Elena Kolevska * support reuse id policy Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Signed-off-by: Elena Kolevska * support set custom status Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Signed-off-by: Elena Kolevska * Update ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py Co-authored-by: Elena Kolevska Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Signed-off-by: Elena Kolevska * Update ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py Co-authored-by: Elena Kolevska Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Signed-off-by: Elena Kolevska * Update ext/dapr-ext-workflow/tests/test_workflow_client.py Co-authored-by: Elena Kolevska Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Signed-off-by: Elena Kolevska * Update ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py Co-authored-by: Elena Kolevska Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Signed-off-by: Elena Kolevska * update test, grpc version and lint Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Signed-off-by: Elena Kolevska * Adds missing arguments in FakeTaskHubGrpcClient Signed-off-by: Elena Kolevska * linter Signed-off-by: Elena Kolevska * remove alpha for workflow stable release (#760) Signed-off-by: Hannah Hunter Signed-off-by: Elena Kolevska * Replace deprecated tox.ini option (#762) This option was replaced in 2020, deprecated, and eventually removed in tox 4. The correct option already appears elseware in this tox.ini file. This fix is necessary to run `tox -e doc` per the README.md instructions on tox 4. Signed-off-by: Eric Searcy Co-authored-by: Elena Kolevska Signed-off-by: Elena Kolevska * Add Actor Mocks (#750) * Moved files to new branch to avoid weird git bug Signed-off-by: Lorenzo Curcio * requested documentation changes Signed-off-by: Lorenzo Curcio * forgot to move file back to starting point Signed-off-by: Lorenzo Curcio * result of ruff format Signed-off-by: Lorenzo Curcio * fixed minor formatting issues, fixed type issues Signed-off-by: Lorenzo Curcio * minor test fix Signed-off-by: Lorenzo Curcio * fixes try_add_state Signed-off-by: Elena Kolevska Signed-off-by: Lorenzo Curcio * Revert "fixes try_add_state" This reverts commit 254ad17bfb184310b2ceae37c1eb82c947466ce6. Signed-off-by: Lorenzo Curcio * Update dapr/actor/runtime/mock_state_manager.py Fixing bug in try_add_state as mentioned in PR #756 Co-authored-by: Elena Kolevska Signed-off-by: Lorenzo Curcio Signed-off-by: Lorenzo Curcio * Update dapr/actor/runtime/mock_actor.py Whoops missed this Co-authored-by: Elena Kolevska Signed-off-by: Lorenzo Curcio * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska Signed-off-by: Lorenzo Curcio * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska Signed-off-by: Lorenzo Curcio * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska Signed-off-by: Lorenzo Curcio * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska Signed-off-by: Lorenzo Curcio * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska Signed-off-by: Lorenzo Curcio * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska Signed-off-by: Lorenzo Curcio * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska Signed-off-by: Lorenzo Curcio * minor error in docs Signed-off-by: Lorenzo Curcio * fixed and added more unit tests. Added example Signed-off-by: Lorenzo Curcio * unittest fix Signed-off-by: Lorenzo Curcio * Update examples/demo_actor/README.md Co-authored-by: Elena Kolevska Signed-off-by: Lorenzo Curcio * concentrated some tests Signed-off-by: Lorenzo Curcio * removed unnecessary type hint Signed-off-by: Lorenzo Curcio * Update daprdocs/content/en/python-sdk-docs/python-actor.md didnt see this earlier whoops Co-authored-by: Elena Kolevska Signed-off-by: Lorenzo Curcio * Update examples/demo_actor/README.md Co-authored-by: Elena Kolevska Signed-off-by: Lorenzo Curcio * documentation changes Signed-off-by: Lorenzo Curcio * now requires #type: ignore Signed-off-by: Lorenzo Curcio * small docs change Signed-off-by: Elena Kolevska * examples test fix Signed-off-by: Elena Kolevska --------- Signed-off-by: Lorenzo Curcio Signed-off-by: Lorenzo Curcio Signed-off-by: Elena Kolevska Co-authored-by: Elena Kolevska Co-authored-by: Lorenzo Curcio Co-authored-by: Elena Kolevska Signed-off-by: Elena Kolevska * Fixes try_add_state in actor state manger (#756) Signed-off-by: Elena Kolevska * Integration test for http invocation (#758) Signed-off-by: Elena Kolevska * fixes missing state store in test (#759) Signed-off-by: Elena Kolevska * Mark workflows API functions as deprecated (#749) * workflows, remove deprecated functions Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> * revert changes to example Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> * update warning messages Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> * Typos Signed-off-by: Elena Kolevska * fixes linter Signed-off-by: Elena Kolevska * Apply suggestions from code review Signed-off-by: Elena Kolevska * Apply suggestions from code review Signed-off-by: Elena Kolevska --------- Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Signed-off-by: Elena Kolevska Signed-off-by: Elena Kolevska Co-authored-by: Elena Kolevska Co-authored-by: Elena Kolevska Signed-off-by: Elena Kolevska * Removes support for 3.8 and adds 3.13 to test version matrix (#763) Signed-off-by: Elena Kolevska * Updates dapr email to dapr.io (#764) Signed-off-by: Elena Kolevska * Reverts grpc bump Signed-off-by: Elena Kolevska * Updates protos and fixes grpc-tools for protos generation (#766) * Updates protos and fixes grpc-tools for protos generation Signed-off-by: Elena Kolevska * bumps grpcio tools version Signed-off-by: Elena Kolevska --------- Signed-off-by: Elena Kolevska * Bump dapr/durabletask version Signed-off-by: Elena Kolevska --------- Signed-off-by: Elena Kolevska Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Signed-off-by: Hannah Hunter Signed-off-by: Eric Searcy Signed-off-by: Lorenzo Curcio Signed-off-by: Lorenzo Curcio Signed-off-by: Elena Kolevska Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Elena Kolevska Co-authored-by: Elena Kolevska Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Co-authored-by: Eric Searcy Co-authored-by: Lorenzo Curcio Co-authored-by: Lorenzo Curcio Signed-off-by: Elena Kolevska --- dev-requirements.txt | 2 +- examples/workflow/monitor.py | 1 + examples/workflow/task_chaining.py | 1 + .../dapr/ext/workflow/dapr_workflow_client.py | 28 ++++++++++-- .../ext/workflow/dapr_workflow_context.py | 4 ++ .../dapr/ext/workflow/workflow_context.py | 5 +++ ext/dapr-ext-workflow/setup.cfg | 2 +- .../tests/test_dapr_workflow_context.py | 8 ++++ .../tests/test_workflow_client.py | 43 +++++++++++++------ setup.cfg | 2 +- 10 files changed, 77 insertions(+), 19 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 15866725..769b1e1e 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -15,4 +15,4 @@ Flask>=1.1 # needed for auto fix ruff===0.2.2 # needed for dapr-ext-workflow -durabletask>=0.1.1a1 +durabletask-dapr >= 0.2.0a4 diff --git a/examples/workflow/monitor.py b/examples/workflow/monitor.py index a6da1c7d..6bdb6cc3 100644 --- a/examples/workflow/monitor.py +++ b/examples/workflow/monitor.py @@ -69,6 +69,7 @@ def send_alert(ctx, message: str): except Exception: pass if not status or status.runtime_status.name != 'RUNNING': + # TODO update to use reuse_id_policy instance_id = wf_client.schedule_new_workflow( workflow=status_monitor_workflow, input=JobStatus(job_id=job_id, is_healthy=True), diff --git a/examples/workflow/task_chaining.py b/examples/workflow/task_chaining.py index c24e340c..c67308d5 100644 --- a/examples/workflow/task_chaining.py +++ b/examples/workflow/task_chaining.py @@ -27,6 +27,7 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): except Exception as e: yield ctx.call_activity(error_handler, input=str(e)) raise + # TODO update to set custom status return [result1, result2, result3] diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index 19f49981..b9865344 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -17,7 +17,9 @@ from datetime import datetime from typing import Any, Optional, TypeVar + from durabletask import client +import durabletask.internal.orchestrator_service_pb2 as pb from dapr.ext.workflow.workflow_state import WorkflowState from dapr.ext.workflow.workflow_context import Workflow @@ -78,6 +80,7 @@ def schedule_new_workflow( input: Optional[TInput] = None, instance_id: Optional[str] = None, start_at: Optional[datetime] = None, + reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None, ) -> str: """Schedules a new workflow instance for execution. @@ -90,6 +93,8 @@ def schedule_new_workflow( start_at: The time when the workflow instance should start executing. If not specified or if a date-time in the past is specified, the workflow instance will be scheduled immediately. + reuse_id_policy: Optional policy to reuse the workflow id when there is a conflict with + an existing workflow instance. Returns: The ID of the scheduled workflow instance. @@ -100,9 +105,14 @@ def schedule_new_workflow( input=input, instance_id=instance_id, start_at=start_at, + reuse_id_policy=reuse_id_policy, ) return self.__obj.schedule_new_orchestration( - workflow.__name__, input=input, instance_id=instance_id, start_at=start_at + workflow.__name__, + input=input, + instance_id=instance_id, + start_at=start_at, + reuse_id_policy=reuse_id_policy, ) def get_workflow_state( @@ -208,7 +218,9 @@ def raise_workflow_event( """ return self.__obj.raise_orchestration_event(instance_id, event_name, data=data) - def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None): + def terminate_workflow( + self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True + ): """Terminates a running workflow instance and updates its runtime status to WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in the task hub. When the task hub worker processes this message, it will update the runtime @@ -226,9 +238,10 @@ def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None): Args: instance_id: The ID of the workflow instance to terminate. output: The optional output to set for the terminated workflow instance. + recursive: The optional flag to terminate all child workflows. """ - return self.__obj.terminate_orchestration(instance_id, output=output) + return self.__obj.terminate_orchestration(instance_id, output=output, recursive=recursive) def pause_workflow(self, instance_id: str): """Suspends a workflow instance, halting processing of it until resume_workflow is used to @@ -246,3 +259,12 @@ def resume_workflow(self, instance_id: str): instance_id: The instance ID of the workflow to resume. """ return self.__obj.resume_orchestration(instance_id) + + def purge_workflow(self, instance_id: str, recursive: bool = True): + """Purge data from a workflow instance. + + Args: + instance_id: The instance ID of the workflow to purge. + recursive: The optional flag to also purge data from all child workflows. + """ + return self.__obj.purge_orchestration(instance_id, recursive) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index dbcccd64..2dee46fe 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -53,6 +53,10 @@ def current_utc_datetime(self) -> datetime: def is_replaying(self) -> bool: return self.__obj.is_replaying + def set_custom_status(self, custom_status: str) -> None: + self._logger.debug(f'{self.instance_id}: Setting custom status to {custom_status}') + self.__obj.set_custom_status(custom_status) + def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task: self._logger.debug(f'{self.instance_id}: Creating timer to fire at {fire_at} time') return self.__obj.create_timer(fire_at) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py index e0e3c736..b4c85f6a 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py @@ -84,6 +84,11 @@ def is_replaying(self) -> bool: """ pass + @abstractmethod + def set_custom_status(self, custom_status: str) -> None: + """Set the custom status.""" + pass + @abstractmethod def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task: """Create a Timer Task to fire after at the specified deadline. diff --git a/ext/dapr-ext-workflow/setup.cfg b/ext/dapr-ext-workflow/setup.cfg index 0cd3949a..e61b94ec 100644 --- a/ext/dapr-ext-workflow/setup.cfg +++ b/ext/dapr-ext-workflow/setup.cfg @@ -25,7 +25,7 @@ packages = find_namespace: include_package_data = True install_requires = dapr-dev >= 1.13.0rc1.dev - durabletask >= 0.1.1a1 + durabletask-dapr >= 0.2.0a4 [options.packages.find] include = diff --git a/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py b/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py index 6b3c9ad3..9fdfe044 100644 --- a/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py @@ -25,11 +25,13 @@ mock_create_timer = 'create_timer' mock_call_activity = 'call_activity' mock_call_sub_orchestrator = 'call_sub_orchestrator' +mock_custom_status = 'custom_status' class FakeOrchestrationContext: def __init__(self): self.instance_id = mock_instance_id + self.custom_status = None def create_timer(self, fire_at): return mock_create_timer @@ -40,6 +42,9 @@ def call_activity(self, activity, input): def call_sub_orchestrator(self, orchestrator, input, instance_id): return mock_call_sub_orchestrator + def set_custom_status(self, custom_status): + self.custom_status = custom_status + class DaprWorkflowContextTest(unittest.TestCase): def mock_client_activity(ctx: WorkflowActivityContext, input): @@ -65,3 +70,6 @@ def test_workflow_context_functions(self): create_timer_result = dapr_wf_ctx.create_timer(mock_date_time) assert create_timer_result == mock_create_timer + + dapr_wf_ctx.set_custom_status(mock_custom_status) + assert fakeContext.custom_status == mock_custom_status diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client.py b/ext/dapr-ext-workflow/tests/test_workflow_client.py index 4a7f93b9..e1c9b772 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client.py @@ -20,17 +20,26 @@ from unittest import mock from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient from durabletask import client +import durabletask.internal.orchestrator_service_pb2 as pb mock_schedule_result = 'workflow001' mock_raise_event_result = 'event001' mock_terminate_result = 'terminate001' mock_suspend_result = 'suspend001' mock_resume_result = 'resume001' -mockInstanceId = 'instance001' +mock_purge_result = 'purge001' +mock_instance_id = 'instance001' class FakeTaskHubGrpcClient: - def schedule_new_orchestration(self, workflow, input, instance_id, start_at): + def schedule_new_orchestration( + self, + workflow, + input, + instance_id, + start_at, + reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None, + ): return mock_schedule_result def get_orchestration_state(self, instance_id, fetch_payloads): @@ -49,7 +58,9 @@ def raise_orchestration_event( ): return mock_raise_event_result - def terminate_orchestration(self, instance_id: str, *, output: Union[Any, None] = None): + def terminate_orchestration( + self, instance_id: str, *, output: Union[Any, None] = None, recursive: bool = True + ): return mock_terminate_result def suspend_orchestration(self, instance_id: str): @@ -58,6 +69,9 @@ def suspend_orchestration(self, instance_id: str): def resume_orchestration(self, instance_id: str): return mock_resume_result + def purge_orchestration(self, instance_id: str, recursive: bool = True): + return mock_purge_result + def _inner_get_orchestration_state(self, instance_id, state: client.OrchestrationStatus): return client.OrchestrationState( instance_id=instance_id, @@ -87,35 +101,38 @@ def test_client_functions(self): assert actual_schedule_result == mock_schedule_result actual_get_result = wfClient.get_workflow_state( - instance_id=mockInstanceId, fetch_payloads=True + instance_id=mock_instance_id, fetch_payloads=True ) assert actual_get_result.runtime_status.name == 'PENDING' - assert actual_get_result.instance_id == mockInstanceId + assert actual_get_result.instance_id == mock_instance_id actual_wait_start_result = wfClient.wait_for_workflow_start( - instance_id=mockInstanceId, timeout_in_seconds=30 + instance_id=mock_instance_id, timeout_in_seconds=30 ) assert actual_wait_start_result.runtime_status.name == 'RUNNING' - assert actual_wait_start_result.instance_id == mockInstanceId + assert actual_wait_start_result.instance_id == mock_instance_id actual_wait_completion_result = wfClient.wait_for_workflow_completion( - instance_id=mockInstanceId, timeout_in_seconds=30 + instance_id=mock_instance_id, timeout_in_seconds=30 ) assert actual_wait_completion_result.runtime_status.name == 'COMPLETED' - assert actual_wait_completion_result.instance_id == mockInstanceId + assert actual_wait_completion_result.instance_id == mock_instance_id actual_raise_event_result = wfClient.raise_workflow_event( - instance_id=mockInstanceId, event_name='test_event', data='test_data' + instance_id=mock_instance_id, event_name='test_event', data='test_data' ) assert actual_raise_event_result == mock_raise_event_result actual_terminate_result = wfClient.terminate_workflow( - instance_id=mockInstanceId, output='test_output' + instance_id=mock_instance_id, output='test_output' ) assert actual_terminate_result == mock_terminate_result - actual_suspend_result = wfClient.pause_workflow(instance_id=mockInstanceId) + actual_suspend_result = wfClient.pause_workflow(instance_id=mock_instance_id) assert actual_suspend_result == mock_suspend_result - actual_resume_result = wfClient.resume_workflow(instance_id=mockInstanceId) + actual_resume_result = wfClient.resume_workflow(instance_id=mock_instance_id) assert actual_resume_result == mock_resume_result + + actual_purge_result = wfClient.purge_workflow(instance_id=mock_instance_id) + assert actual_purge_result == mock_purge_result diff --git a/setup.cfg b/setup.cfg index dcb6ba8d..de5d53f4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,7 @@ [metadata] url = https://dapr.io/ author = Dapr Authors -author_email = daprweb@microsoft.com +author_email = dapr@dapr.io license = Apache license_file = LICENSE classifiers =