Skip to content

Commit

Permalink
workflows: update durabletask dependency (#757)
Browse files Browse the repository at this point in the history
* Bump codecov/codecov-action from 4 to 5 (#753)

      Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 4 to 5.
- [Release notes](https://github.com/codecov/codecov-action/releases)
- [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md)
- [Commits](codecov/codecov-action@v4...v5)

---
updated-dependencies:
- dependency-name: codecov/codecov-action
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

Signed-off-by: Elena Kolevska <[email protected]>

* update durabletask to use fork

Signed-off-by: Fabian Martinez <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* add purge workflow function

Signed-off-by: Fabian Martinez <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* support reuse id policy

Signed-off-by: Fabian Martinez <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* support set custom status

Signed-off-by: Fabian Martinez <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* Update ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Fabian Martinez <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* Update ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Fabian Martinez <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* Update ext/dapr-ext-workflow/tests/test_workflow_client.py

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Fabian Martinez <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* Update ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Fabian Martinez <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* update test, grpc version and lint

Signed-off-by: Fabian Martinez <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* Adds missing arguments in FakeTaskHubGrpcClient

Signed-off-by: Elena Kolevska <[email protected]>

* linter

Signed-off-by: Elena Kolevska <[email protected]>

* remove alpha for workflow stable release (#760)

Signed-off-by: Hannah Hunter <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* Replace deprecated tox.ini option (#762)

This option was replaced in 2020, deprecated, and eventually removed in
tox 4. The correct option already appears elseware in this tox.ini file.

This fix is necessary to run `tox -e doc` per the README.md
instructions on tox 4.

Signed-off-by: Eric Searcy <[email protected]>
Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* Add Actor Mocks (#750)

* Moved files to new branch to avoid weird git bug

Signed-off-by: Lorenzo Curcio <[email protected]>

* requested documentation changes

Signed-off-by: Lorenzo Curcio <[email protected]>

* forgot to move file back to starting point

Signed-off-by: Lorenzo Curcio <[email protected]>

* result of ruff format

Signed-off-by: Lorenzo Curcio <[email protected]>

* fixed minor formatting issues, fixed type issues

Signed-off-by: Lorenzo Curcio <[email protected]>

* minor test fix

Signed-off-by: Lorenzo Curcio <[email protected]>

* fixes try_add_state

Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* Revert "fixes try_add_state"

This reverts commit 254ad17.

Signed-off-by: Lorenzo Curcio <[email protected]>

* Update dapr/actor/runtime/mock_state_manager.py

Fixing bug in try_add_state as mentioned in PR #756

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* Update dapr/actor/runtime/mock_actor.py

Whoops missed this

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* Update daprdocs/content/en/python-sdk-docs/python-actor.md

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* Update daprdocs/content/en/python-sdk-docs/python-actor.md

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* Update daprdocs/content/en/python-sdk-docs/python-actor.md

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* Update daprdocs/content/en/python-sdk-docs/python-actor.md

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* Update daprdocs/content/en/python-sdk-docs/python-actor.md

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* Update daprdocs/content/en/python-sdk-docs/python-actor.md

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* Update daprdocs/content/en/python-sdk-docs/python-actor.md

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* minor error in docs

Signed-off-by: Lorenzo Curcio <[email protected]>

* fixed and added more unit tests. Added example

Signed-off-by: Lorenzo Curcio <[email protected]>

* unittest fix

Signed-off-by: Lorenzo Curcio <[email protected]>

* Update examples/demo_actor/README.md

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* concentrated some tests

Signed-off-by: Lorenzo Curcio <[email protected]>

* removed unnecessary type hint

Signed-off-by: Lorenzo Curcio <[email protected]>

* Update daprdocs/content/en/python-sdk-docs/python-actor.md

didnt see this earlier whoops

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* Update examples/demo_actor/README.md

Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>

* documentation changes

Signed-off-by: Lorenzo Curcio <[email protected]>

* now requires #type: ignore

Signed-off-by: Lorenzo Curcio <[email protected]>

* small docs change

Signed-off-by: Elena Kolevska <[email protected]>

* examples test fix

Signed-off-by: Elena Kolevska <[email protected]>

---------

Signed-off-by: Lorenzo Curcio <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Co-authored-by: Elena Kolevska <[email protected]>
Co-authored-by: Lorenzo Curcio <[email protected]>
Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* Fixes try_add_state in actor state manger (#756)

Signed-off-by: Elena Kolevska <[email protected]>

* Integration test for http invocation (#758)

Signed-off-by: Elena Kolevska <[email protected]>

* fixes missing state store in test (#759)

Signed-off-by: Elena Kolevska <[email protected]>

* Mark workflows API functions as deprecated  (#749)

* workflows, remove deprecated functions

Signed-off-by: Fabian Martinez <[email protected]>

* revert changes to example

Signed-off-by: Fabian Martinez <[email protected]>

* update warning messages

Signed-off-by: Fabian Martinez <[email protected]>

* Typos

Signed-off-by: Elena Kolevska <[email protected]>

* fixes linter

Signed-off-by: Elena Kolevska <[email protected]>

* Apply suggestions from code review

Signed-off-by: Elena Kolevska <[email protected]>

* Apply suggestions from code review

Signed-off-by: Elena Kolevska <[email protected]>

---------

Signed-off-by: Fabian Martinez <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Co-authored-by: Elena Kolevska <[email protected]>
Co-authored-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* Removes support for 3.8 and adds 3.13 to test version matrix (#763)

Signed-off-by: Elena Kolevska <[email protected]>

* Updates dapr email to dapr.io (#764)

Signed-off-by: Elena Kolevska <[email protected]>

* Reverts grpc bump

Signed-off-by: Elena Kolevska <[email protected]>

* Updates protos and fixes grpc-tools for protos generation (#766)

* Updates protos and fixes grpc-tools for protos generation

Signed-off-by: Elena Kolevska <[email protected]>

* bumps grpcio tools version

Signed-off-by: Elena Kolevska <[email protected]>

---------

Signed-off-by: Elena Kolevska <[email protected]>

* Bump dapr/durabletask version

Signed-off-by: Elena Kolevska <[email protected]>

---------

Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Fabian Martinez <[email protected]>
Signed-off-by: Hannah Hunter <[email protected]>
Signed-off-by: Eric Searcy <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>
Signed-off-by: Lorenzo Curcio <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Elena Kolevska <[email protected]>
Co-authored-by: Elena Kolevska <[email protected]>
Co-authored-by: Hannah Hunter <[email protected]>
Co-authored-by: Eric Searcy <[email protected]>
Co-authored-by: Lorenzo Curcio <[email protected]>
Co-authored-by: Lorenzo Curcio <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
  • Loading branch information
8 people committed Jan 14, 2025
1 parent 28df345 commit 58dbd8b
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 19 deletions.
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ Flask>=1.1
# needed for auto fix
ruff===0.2.2
# needed for dapr-ext-workflow
durabletask>=0.1.1a1
durabletask-dapr >= 0.2.0a4
1 change: 1 addition & 0 deletions examples/workflow/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def send_alert(ctx, message: str):
except Exception:
pass
if not status or status.runtime_status.name != 'RUNNING':
# TODO update to use reuse_id_policy
instance_id = wf_client.schedule_new_workflow(
workflow=status_monitor_workflow,
input=JobStatus(job_id=job_id, is_healthy=True),
Expand Down
1 change: 1 addition & 0 deletions examples/workflow/task_chaining.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
except Exception as e:
yield ctx.call_activity(error_handler, input=str(e))
raise
# TODO update to set custom status
return [result1, result2, result3]


Expand Down
28 changes: 25 additions & 3 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from datetime import datetime
from typing import Any, Optional, TypeVar


from durabletask import client
import durabletask.internal.orchestrator_service_pb2 as pb

from dapr.ext.workflow.workflow_state import WorkflowState
from dapr.ext.workflow.workflow_context import Workflow
Expand Down Expand Up @@ -78,6 +80,7 @@ def schedule_new_workflow(
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
) -> str:
"""Schedules a new workflow instance for execution.
Expand All @@ -90,6 +93,8 @@ def schedule_new_workflow(
start_at: The time when the workflow instance should start executing.
If not specified or if a date-time in the past is specified, the workflow instance will
be scheduled immediately.
reuse_id_policy: Optional policy to reuse the workflow id when there is a conflict with
an existing workflow instance.
Returns:
The ID of the scheduled workflow instance.
Expand All @@ -100,9 +105,14 @@ def schedule_new_workflow(
input=input,
instance_id=instance_id,
start_at=start_at,
reuse_id_policy=reuse_id_policy,
)
return self.__obj.schedule_new_orchestration(
workflow.__name__, input=input, instance_id=instance_id, start_at=start_at
workflow.__name__,
input=input,
instance_id=instance_id,
start_at=start_at,
reuse_id_policy=reuse_id_policy,
)

def get_workflow_state(
Expand Down Expand Up @@ -208,7 +218,9 @@ def raise_workflow_event(
"""
return self.__obj.raise_orchestration_event(instance_id, event_name, data=data)

def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
def terminate_workflow(
self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True
):
"""Terminates a running workflow instance and updates its runtime status to
WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in
the task hub. When the task hub worker processes this message, it will update the runtime
Expand All @@ -226,9 +238,10 @@ def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
Args:
instance_id: The ID of the workflow instance to terminate.
output: The optional output to set for the terminated workflow instance.
recursive: The optional flag to terminate all child workflows.
"""
return self.__obj.terminate_orchestration(instance_id, output=output)
return self.__obj.terminate_orchestration(instance_id, output=output, recursive=recursive)

def pause_workflow(self, instance_id: str):
"""Suspends a workflow instance, halting processing of it until resume_workflow is used to
Expand All @@ -246,3 +259,12 @@ def resume_workflow(self, instance_id: str):
instance_id: The instance ID of the workflow to resume.
"""
return self.__obj.resume_orchestration(instance_id)

def purge_workflow(self, instance_id: str, recursive: bool = True):
"""Purge data from a workflow instance.
Args:
instance_id: The instance ID of the workflow to purge.
recursive: The optional flag to also purge data from all child workflows.
"""
return self.__obj.purge_orchestration(instance_id, recursive)
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def current_utc_datetime(self) -> datetime:
def is_replaying(self) -> bool:
return self.__obj.is_replaying

def set_custom_status(self, custom_status: str) -> None:
self._logger.debug(f'{self.instance_id}: Setting custom status to {custom_status}')
self.__obj.set_custom_status(custom_status)

def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
self._logger.debug(f'{self.instance_id}: Creating timer to fire at {fire_at} time')
return self.__obj.create_timer(fire_at)
Expand Down
5 changes: 5 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ def is_replaying(self) -> bool:
"""
pass

@abstractmethod
def set_custom_status(self, custom_status: str) -> None:
"""Set the custom status."""
pass

@abstractmethod
def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
"""Create a Timer Task to fire after at the specified deadline.
Expand Down
2 changes: 1 addition & 1 deletion ext/dapr-ext-workflow/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ packages = find_namespace:
include_package_data = True
install_requires =
dapr-dev >= 1.13.0rc1.dev
durabletask >= 0.1.1a1
durabletask-dapr >= 0.2.0a4

[options.packages.find]
include =
Expand Down
8 changes: 8 additions & 0 deletions ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
mock_create_timer = 'create_timer'
mock_call_activity = 'call_activity'
mock_call_sub_orchestrator = 'call_sub_orchestrator'
mock_custom_status = 'custom_status'


class FakeOrchestrationContext:
def __init__(self):
self.instance_id = mock_instance_id
self.custom_status = None

def create_timer(self, fire_at):
return mock_create_timer
Expand All @@ -40,6 +42,9 @@ def call_activity(self, activity, input):
def call_sub_orchestrator(self, orchestrator, input, instance_id):
return mock_call_sub_orchestrator

def set_custom_status(self, custom_status):
self.custom_status = custom_status


class DaprWorkflowContextTest(unittest.TestCase):
def mock_client_activity(ctx: WorkflowActivityContext, input):
Expand All @@ -65,3 +70,6 @@ def test_workflow_context_functions(self):

create_timer_result = dapr_wf_ctx.create_timer(mock_date_time)
assert create_timer_result == mock_create_timer

dapr_wf_ctx.set_custom_status(mock_custom_status)
assert fakeContext.custom_status == mock_custom_status
43 changes: 30 additions & 13 deletions ext/dapr-ext-workflow/tests/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,26 @@
from unittest import mock
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
from durabletask import client
import durabletask.internal.orchestrator_service_pb2 as pb

mock_schedule_result = 'workflow001'
mock_raise_event_result = 'event001'
mock_terminate_result = 'terminate001'
mock_suspend_result = 'suspend001'
mock_resume_result = 'resume001'
mockInstanceId = 'instance001'
mock_purge_result = 'purge001'
mock_instance_id = 'instance001'


class FakeTaskHubGrpcClient:
def schedule_new_orchestration(self, workflow, input, instance_id, start_at):
def schedule_new_orchestration(
self,
workflow,
input,
instance_id,
start_at,
reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None,
):
return mock_schedule_result

def get_orchestration_state(self, instance_id, fetch_payloads):
Expand All @@ -49,7 +58,9 @@ def raise_orchestration_event(
):
return mock_raise_event_result

def terminate_orchestration(self, instance_id: str, *, output: Union[Any, None] = None):
def terminate_orchestration(
self, instance_id: str, *, output: Union[Any, None] = None, recursive: bool = True
):
return mock_terminate_result

def suspend_orchestration(self, instance_id: str):
Expand All @@ -58,6 +69,9 @@ def suspend_orchestration(self, instance_id: str):
def resume_orchestration(self, instance_id: str):
return mock_resume_result

def purge_orchestration(self, instance_id: str, recursive: bool = True):
return mock_purge_result

def _inner_get_orchestration_state(self, instance_id, state: client.OrchestrationStatus):
return client.OrchestrationState(
instance_id=instance_id,
Expand Down Expand Up @@ -87,35 +101,38 @@ def test_client_functions(self):
assert actual_schedule_result == mock_schedule_result

actual_get_result = wfClient.get_workflow_state(
instance_id=mockInstanceId, fetch_payloads=True
instance_id=mock_instance_id, fetch_payloads=True
)
assert actual_get_result.runtime_status.name == 'PENDING'
assert actual_get_result.instance_id == mockInstanceId
assert actual_get_result.instance_id == mock_instance_id

actual_wait_start_result = wfClient.wait_for_workflow_start(
instance_id=mockInstanceId, timeout_in_seconds=30
instance_id=mock_instance_id, timeout_in_seconds=30
)
assert actual_wait_start_result.runtime_status.name == 'RUNNING'
assert actual_wait_start_result.instance_id == mockInstanceId
assert actual_wait_start_result.instance_id == mock_instance_id

actual_wait_completion_result = wfClient.wait_for_workflow_completion(
instance_id=mockInstanceId, timeout_in_seconds=30
instance_id=mock_instance_id, timeout_in_seconds=30
)
assert actual_wait_completion_result.runtime_status.name == 'COMPLETED'
assert actual_wait_completion_result.instance_id == mockInstanceId
assert actual_wait_completion_result.instance_id == mock_instance_id

actual_raise_event_result = wfClient.raise_workflow_event(
instance_id=mockInstanceId, event_name='test_event', data='test_data'
instance_id=mock_instance_id, event_name='test_event', data='test_data'
)
assert actual_raise_event_result == mock_raise_event_result

actual_terminate_result = wfClient.terminate_workflow(
instance_id=mockInstanceId, output='test_output'
instance_id=mock_instance_id, output='test_output'
)
assert actual_terminate_result == mock_terminate_result

actual_suspend_result = wfClient.pause_workflow(instance_id=mockInstanceId)
actual_suspend_result = wfClient.pause_workflow(instance_id=mock_instance_id)
assert actual_suspend_result == mock_suspend_result

actual_resume_result = wfClient.resume_workflow(instance_id=mockInstanceId)
actual_resume_result = wfClient.resume_workflow(instance_id=mock_instance_id)
assert actual_resume_result == mock_resume_result

actual_purge_result = wfClient.purge_workflow(instance_id=mock_instance_id)
assert actual_purge_result == mock_purge_result
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[metadata]
url = https://dapr.io/
author = Dapr Authors
author_email = [email protected]
author_email = [email protected]
license = Apache
license_file = LICENSE
classifiers =
Expand Down

0 comments on commit 58dbd8b

Please sign in to comment.