diff --git a/examples/demo_workflow/README.md b/examples/demo_workflow/README.md index eab098ed4..99ccb1cf5 100644 --- a/examples/demo_workflow/README.md +++ b/examples/demo_workflow/README.md @@ -34,6 +34,17 @@ expected_stdout_lines: - "== APP == Hi Counter!" - "== APP == New counter value is: 1!" - "== APP == New counter value is: 11!" + - "== APP == Retry count value is: 0!" + - "== APP == Retry count value is: 1! This print statement verifies retry" + - "== APP == Appending 1 to child_orchestrator_string!" + - "== APP == Appending a to child_orchestrator_string!" + - "== APP == Appending a to child_orchestrator_string!" + - "== APP == Appending 2 to child_orchestrator_string!" + - "== APP == Appending b to child_orchestrator_string!" + - "== APP == Appending b to child_orchestrator_string!" + - "== APP == Appending 3 to child_orchestrator_string!" + - "== APP == Appending c to child_orchestrator_string!" + - "== APP == Appending c to child_orchestrator_string!" - "== APP == Get response from hello_world_wf after pause call: Suspended" - "== APP == Get response from hello_world_wf after resume call: Running" - "== APP == New counter value is: 111!" @@ -56,6 +67,17 @@ You should be able to see the following output: == APP == Hi Counter! == APP == New counter value is: 1! == APP == New counter value is: 11! +== APP == Retry count value is: 0! +== APP == Retry count value is: 1! This print statement verifies retry +== APP == Appending 1 to child_orchestrator_string! +== APP == Appending a to child_orchestrator_string! +== APP == Appending a to child_orchestrator_string! +== APP == Appending 2 to child_orchestrator_string! +== APP == Appending b to child_orchestrator_string! +== APP == Appending b to child_orchestrator_string! +== APP == Appending 3 to child_orchestrator_string! +== APP == Appending c to child_orchestrator_string! +== APP == Appending c to child_orchestrator_string! == APP == Get response from hello_world_wf after pause call: Suspended == APP == Get response from hello_world_wf after resume call: Running == APP == New counter value is: 111! diff --git a/examples/demo_workflow/app.py b/examples/demo_workflow/app.py index edbd1b336..8fe3c25a7 100644 --- a/examples/demo_workflow/app.py +++ b/examples/demo_workflow/app.py @@ -10,8 +10,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import timedelta from time import sleep -from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext +from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext, RetryPolicy from dapr.conf import Settings from dapr.clients import DaprClient from dapr.clients.exceptions import DaprInternalError @@ -19,6 +20,10 @@ settings = Settings() counter = 0 +retry_count = 0 +child_orchestrator_count = 0 +child_orchestrator_string = "" +child_act_retry_count = 0 instance_id = "exampleInstanceID" workflow_component = "dapr" workflow_name = "hello_world_wf" @@ -29,11 +34,20 @@ event_data = "eventData" non_existent_id_error = "no such instance exists" +retry_policy=RetryPolicy(first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=3, + backoff_coefficient=2, + max_retry_interval=timedelta(seconds=10), + retry_timeout=timedelta(seconds=100) + ) + def hello_world_wf(ctx: DaprWorkflowContext, wf_input): print(f'{wf_input}') yield ctx.call_activity(hello_act, input=1) yield ctx.call_activity(hello_act, input=10) + yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy) + yield ctx.call_child_workflow(workflow=child_wf, retry_policy=retry_policy) yield ctx.wait_for_external_event("event1") yield ctx.call_activity(hello_act, input=100) yield ctx.call_activity(hello_act, input=1000) @@ -45,11 +59,46 @@ def hello_act(ctx: WorkflowActivityContext, wf_input): print(f'New counter value is: {counter}!', flush=True) +def hello_retryable_act(ctx: WorkflowActivityContext): + global retry_count + if (retry_count % 2) == 0: + print(f'Retry count value is: {retry_count}!', flush=True) + retry_count += 1 + raise ValueError("Retryable Error") + print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True) + retry_count += 1 + + +def child_wf(ctx: DaprWorkflowContext): + global child_orchestrator_string, child_orchestrator_count + if not ctx.is_replaying: + child_orchestrator_count += 1 + print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True) + child_orchestrator_string += str(child_orchestrator_count) + yield ctx.call_activity(act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy) + if (child_orchestrator_count < 3): + raise ValueError("Retryable Error") + + +def act_for_child_wf(ctx: WorkflowActivityContext, inp): + global child_orchestrator_string, child_act_retry_count + inp_char = chr(96+inp) + print(f'Appending {inp_char} to child_orchestrator_string!', flush=True) + child_orchestrator_string += inp_char + if (child_act_retry_count %2 == 0): + child_act_retry_count += 1 + raise ValueError("Retryable Error") + child_act_retry_count += 1 + + def main(): with DaprClient() as d: workflow_runtime = WorkflowRuntime() workflow_runtime.register_workflow(hello_world_wf) + workflow_runtime.register_workflow(child_wf) workflow_runtime.register_activity(hello_act) + workflow_runtime.register_activity(hello_retryable_act) + workflow_runtime.register_activity(act_for_child_wf) workflow_runtime.start() sleep(2) @@ -62,8 +111,10 @@ def main(): print(f"start_resp {start_resp.instance_id}") # Sleep for a while to let the workflow run - sleep(1) + sleep(12) assert counter == 11 + assert retry_count == 2 + assert child_orchestrator_string == "1aa2bb3cc" # Pause Test d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py b/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py index e9314d444..5a6c144a3 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py @@ -19,6 +19,7 @@ from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext, when_all, when_any from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext from dapr.ext.workflow.workflow_state import WorkflowState, WorkflowStatus +from dapr.ext.workflow.retry_policy import RetryPolicy __all__ = [ 'WorkflowRuntime', @@ -29,5 +30,6 @@ 'WorkflowStatus', 'when_all', 'when_any', - 'alternate_name' + 'alternate_name', + 'RetryPolicy' ] diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index 3f8f9ad8f..402f5e74d 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -21,6 +21,7 @@ from dapr.ext.workflow.workflow_context import WorkflowContext, Workflow from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext from dapr.ext.workflow.logger import LoggerOptions, Logger +from dapr.ext.workflow.retry_policy import RetryPolicy T = TypeVar('T') TInput = TypeVar('TInput') @@ -58,17 +59,22 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task: return self.__obj.create_timer(fire_at) def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *, - input: TInput = None) -> task.Task[TOutput]: + input: TInput = None, + retry_policy: Optional[RetryPolicy] = None) -> task.Task[TOutput]: self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}') if hasattr(activity, '_dapr_alternate_name'): - return self.__obj.call_activity(activity=activity.__dict__['_dapr_alternate_name'], - input=input) - # this return should ideally never execute - return self.__obj.call_activity(activity=activity.__name__, input=input) + act = activity.__dict__['_dapr_alternate_name'] + else: + # this case should ideally never happen + act = activity.__name__ + if retry_policy is None: + return self.__obj.call_activity(activity=act, input=input) + return self.__obj.call_activity(activity=act, input=input, retry_policy=retry_policy.obj) def call_child_workflow(self, workflow: Workflow, *, - input: Optional[TInput], - instance_id: Optional[str]) -> task.Task[TOutput]: + input: Optional[TInput] = None, + instance_id: Optional[str] = None, + retry_policy: Optional[RetryPolicy] = None) -> task.Task[TOutput]: self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow.__name__}') def wf(ctx: task.OrchestrationContext, inp: TInput): @@ -81,7 +87,10 @@ def wf(ctx: task.OrchestrationContext, inp: TInput): else: # this case should ideally never happen wf.__name__ = workflow.__name__ - return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id) + if retry_policy is None: + return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id) + return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id, + retry_policy=retry_policy.obj) def wait_for_external_event(self, name: str) -> task.Task: self._logger.debug(f'{self.instance_id}: Waiting for external event {name}') diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py new file mode 100644 index 000000000..82da685d4 --- /dev/null +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from typing import Optional, TypeVar +from datetime import timedelta + +from durabletask import task + +T = TypeVar('T') +TInput = TypeVar('TInput') +TOutput = TypeVar('TOutput') + + +class RetryPolicy: + """Represents the retry policy for a workflow or activity function.""" + + def __init__( + self, *, + first_retry_interval: timedelta, + max_number_of_attempts: int, + backoff_coefficient: Optional[float] = 1.0, + max_retry_interval: Optional[timedelta] = None, + retry_timeout: Optional[timedelta] = None + ): + """Creates a new RetryPolicy instance. + + Args: + first_retry_interval(timedelta): The retry interval to use for the first retry attempt. + max_number_of_attempts(int): The maximum number of retry attempts. + backoff_coefficient(Optional[float]): The backoff coefficient to use for calculating + the next retry interval. + max_retry_interval(Optional[timedelta]): The maximum retry interval to use for any + retry attempt. + retry_timeout(Optional[timedelta]): The maximum amount of time to spend retrying the + operation. + """ + # validate inputs + if first_retry_interval < timedelta(seconds=0): + raise ValueError('first_retry_interval must be >= 0') + if max_number_of_attempts < 1: + raise ValueError('max_number_of_attempts must be >= 1') + if backoff_coefficient is not None and backoff_coefficient < 1: + raise ValueError('backoff_coefficient must be >= 1') + if max_retry_interval is not None and max_retry_interval < timedelta(seconds=0): + raise ValueError('max_retry_interval must be >= 0') + if retry_timeout is not None and retry_timeout < timedelta(seconds=0): + raise ValueError('retry_timeout must be >= 0') + + self._obj = task.RetryPolicy( + first_retry_interval=first_retry_interval, + max_number_of_attempts=max_number_of_attempts, + backoff_coefficient=backoff_coefficient, + max_retry_interval=max_retry_interval, + retry_timeout=retry_timeout + ) + + @property + def obj(self) -> task.RetryPolicy: + """Returns the underlying RetryPolicy object.""" + return self._obj + + @property + def first_retry_interval(self) -> timedelta: + """The retry interval to use for the first retry attempt.""" + return self._obj._first_retry_interval + + @property + def max_number_of_attempts(self) -> int: + """The maximum number of retry attempts.""" + return self._obj._max_number_of_attempts + + @property + def backoff_coefficient(self) -> Optional[float]: + """The backoff coefficient to use for calculating the next retry interval.""" + return self._obj._backoff_coefficient + + @property + def max_retry_interval(self) -> Optional[timedelta]: + """The maximum retry interval to use for any retry attempt.""" + return self._obj._max_retry_interval + + @property + def retry_timeout(self) -> Optional[timedelta]: + """The maximum amount of time to spend retrying the operation.""" + return self._obj._retry_timeout diff --git a/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py b/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py index c228e0099..9ea802f29 100644 --- a/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py @@ -58,7 +58,7 @@ def test_workflow_context_functions(self): assert call_activity_result == mock_call_activity call_sub_orchestrator_result = dapr_wf_ctx.call_child_workflow( - self.mock_client_child_wf, input=None, instance_id=None) + self.mock_client_child_wf) assert call_sub_orchestrator_result == mock_call_sub_orchestrator create_timer_result = dapr_wf_ctx.create_timer(mock_date_time)