From b0c0524ffb19abcf76b424ca434c91cb346f6c27 Mon Sep 17 00:00:00 2001 From: Deepanshu Agarwal Date: Fri, 12 Jan 2024 00:38:41 +0530 Subject: [PATCH 1/7] Add retry policy wrapper in dapr Signed-off-by: Deepanshu Agarwal --- .../dapr/ext/workflow/__init__.py | 4 +- .../ext/workflow/dapr_workflow_context.py | 19 +++- .../dapr/ext/workflow/retry_policy.py | 92 +++++++++++++++++++ 3 files changed, 109 insertions(+), 6 deletions(-) create mode 100644 ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py b/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py index e9314d44..5a6c144a 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py @@ -19,6 +19,7 @@ from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext, when_all, when_any from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext from dapr.ext.workflow.workflow_state import WorkflowState, WorkflowStatus +from dapr.ext.workflow.retry_policy import RetryPolicy __all__ = [ 'WorkflowRuntime', @@ -29,5 +30,6 @@ 'WorkflowStatus', 'when_all', 'when_any', - 'alternate_name' + 'alternate_name', + 'RetryPolicy' ] diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index 3f8f9ad8..c60c3453 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -21,6 +21,7 @@ from dapr.ext.workflow.workflow_context import WorkflowContext, Workflow from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext from dapr.ext.workflow.logger import LoggerOptions, Logger +from dapr.ext.workflow.retry_policy import RetryPolicy T = TypeVar('T') TInput = TypeVar('TInput') @@ -28,7 +29,7 @@ class DaprWorkflowContext(WorkflowContext): - """DaprWorkflowContext that provides proxy access to internal OrchestrationContext instance.""" + """DaprWorkflowContext1 that provides proxy access to internal OrchestrationContext instance.""" def __init__( self, @@ -58,17 +59,22 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task: return self.__obj.create_timer(fire_at) def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *, - input: TInput = None) -> task.Task[TOutput]: + input: TInput = None, + retry_policy: Optional[RetryPolicy] = None) -> task.Task[TOutput]: self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}') if hasattr(activity, '_dapr_alternate_name'): + if retry_policy is None: + return self.__obj.call_activity(activity=activity.__dict__['_dapr_alternate_name'], + input=input) return self.__obj.call_activity(activity=activity.__dict__['_dapr_alternate_name'], - input=input) + input=input, retry_policy=retry_policy.obj) # this return should ideally never execute return self.__obj.call_activity(activity=activity.__name__, input=input) def call_child_workflow(self, workflow: Workflow, *, input: Optional[TInput], - instance_id: Optional[str]) -> task.Task[TOutput]: + instance_id: Optional[str], + retry_policy: Optional[RetryPolicy] = None) -> task.Task[TOutput]: self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow.__name__}') def wf(ctx: task.OrchestrationContext, inp: TInput): @@ -81,7 +87,10 @@ def wf(ctx: task.OrchestrationContext, inp: TInput): else: # this case should ideally never happen wf.__name__ = workflow.__name__ - return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id) + if retry_policy is None: + return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id) + return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id, + retry_policy=retry_policy.obj) def wait_for_external_event(self, name: str) -> task.Task: self._logger.debug(f'{self.instance_id}: Waiting for external event {name}') diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py new file mode 100644 index 00000000..78280a31 --- /dev/null +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from typing import Optional, TypeVar +from datetime import timedelta + +from durabletask import task + +T = TypeVar('T') +TInput = TypeVar('TInput') +TOutput = TypeVar('TOutput') + + +class RetryPolicy: + """Represents the retry policy for an orchestration or activity function.""" + + def __init__(self, *, + first_retry_interval: timedelta, + max_number_of_attempts: int, + backoff_coefficient: Optional[float] = 1.0, + max_retry_interval: Optional[timedelta] = None, + retry_timeout: Optional[timedelta] = None): + """Creates a new RetryPolicy instance. + Parameters + ---------- + first_retry_interval : timedelta + The retry interval to use for the first retry attempt. + max_number_of_attempts : int + The maximum number of retry attempts. + backoff_coefficient : Optional[float] + The backoff coefficient to use for calculating the next retry interval. + max_retry_interval : Optional[timedelta] + The maximum retry interval to use for any retry attempt. + retry_timeout : Optional[timedelta] + The maximum amount of time to spend retrying the operation. + """ + # validate inputs + if first_retry_interval < timedelta(seconds=0): + raise ValueError('first_retry_interval must be >= 0') + if max_number_of_attempts < 1: + raise ValueError('max_number_of_attempts must be >= 1') + if backoff_coefficient is not None and backoff_coefficient < 1: + raise ValueError('backoff_coefficient must be >= 1') + if max_retry_interval is not None and max_retry_interval < timedelta(seconds=0): + raise ValueError('max_retry_interval must be >= 0') + if retry_timeout is not None and retry_timeout < timedelta(seconds=0): + raise ValueError('retry_timeout must be >= 0') + + self.obj = task.RetryPolicy( + first_retry_interval=first_retry_interval, + max_number_of_attempts=max_number_of_attempts, + backoff_coefficient=backoff_coefficient, + max_retry_interval=max_retry_interval, + retry_timeout=retry_timeout + ) + + @property + def first_retry_interval(self) -> timedelta: + """The retry interval to use for the first retry attempt.""" + return self.obj._first_retry_interval + + @property + def max_number_of_attempts(self) -> int: + """The maximum number of retry attempts.""" + return self.obj._max_number_of_attempts + + @property + def backoff_coefficient(self) -> Optional[float]: + """The backoff coefficient to use for calculating the next retry interval.""" + return self.obj._backoff_coefficient + + @property + def max_retry_interval(self) -> Optional[timedelta]: + """The maximum retry interval to use for any retry attempt.""" + return self.obj._max_retry_interval + + @property + def retry_timeout(self) -> Optional[timedelta]: + """The maximum amount of time to spend retrying the operation.""" + return self.obj._retry_timeout From 5c444f1712c8bcf89c16f7aba5fc4eaf11994ebb Mon Sep 17 00:00:00 2001 From: Deepanshu Agarwal Date: Fri, 12 Jan 2024 00:40:38 +0530 Subject: [PATCH 2/7] correct Signed-off-by: Deepanshu Agarwal --- .../dapr/ext/workflow/dapr_workflow_context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index c60c3453..e8b4b581 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -29,7 +29,7 @@ class DaprWorkflowContext(WorkflowContext): - """DaprWorkflowContext1 that provides proxy access to internal OrchestrationContext instance.""" + """DaprWorkflowContext that provides proxy access to internal OrchestrationContext instance.""" def __init__( self, From ef0684025018ed9d6f7f69a5659356568cd0c7e5 Mon Sep 17 00:00:00 2001 From: Deepanshu Agarwal Date: Fri, 12 Jan 2024 01:06:31 +0530 Subject: [PATCH 3/7] test Signed-off-by: Deepanshu Agarwal --- examples/demo_workflow/README.md | 4 ++++ examples/demo_workflow/app.py | 23 +++++++++++++++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/examples/demo_workflow/README.md b/examples/demo_workflow/README.md index eab098ed..491fae95 100644 --- a/examples/demo_workflow/README.md +++ b/examples/demo_workflow/README.md @@ -34,6 +34,8 @@ expected_stdout_lines: - "== APP == Hi Counter!" - "== APP == New counter value is: 1!" - "== APP == New counter value is: 11!" + - "== APP == Retry count value is: 0!" + - "== APP == Retry count value is: 1! This print statement verifies retry" - "== APP == Get response from hello_world_wf after pause call: Suspended" - "== APP == Get response from hello_world_wf after resume call: Running" - "== APP == New counter value is: 111!" @@ -56,6 +58,8 @@ You should be able to see the following output: == APP == Hi Counter! == APP == New counter value is: 1! == APP == New counter value is: 11! +== APP == Retry count value is: 0! +== APP == Retry count value is: 1! This print statement verifies retry == APP == Get response from hello_world_wf after pause call: Suspended == APP == Get response from hello_world_wf after resume call: Running == APP == New counter value is: 111! diff --git a/examples/demo_workflow/app.py b/examples/demo_workflow/app.py index edbd1b33..31d98463 100644 --- a/examples/demo_workflow/app.py +++ b/examples/demo_workflow/app.py @@ -10,8 +10,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import timedelta from time import sleep -from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext +from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext, RetryPolicy from dapr.conf import Settings from dapr.clients import DaprClient from dapr.clients.exceptions import DaprInternalError @@ -19,6 +20,7 @@ settings = Settings() counter = 0 +retry_count = 0 instance_id = "exampleInstanceID" workflow_component = "dapr" workflow_name = "hello_world_wf" @@ -32,8 +34,15 @@ def hello_world_wf(ctx: DaprWorkflowContext, wf_input): print(f'{wf_input}') + retry_policy=RetryPolicy(first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=3, + backoff_coefficient=2, + max_retry_interval=timedelta(seconds=10), + retry_timeout=timedelta(seconds=100) + ) yield ctx.call_activity(hello_act, input=1) yield ctx.call_activity(hello_act, input=10) + yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy) yield ctx.wait_for_external_event("event1") yield ctx.call_activity(hello_act, input=100) yield ctx.call_activity(hello_act, input=1000) @@ -44,6 +53,15 @@ def hello_act(ctx: WorkflowActivityContext, wf_input): counter += wf_input print(f'New counter value is: {counter}!', flush=True) +def hello_retryable_act(ctx: WorkflowActivityContext, wf_input): + global retry_count + if (retry_count >> 1) == 0: + print(f'Retry count value is: {retry_count}!', flush=True) + retry_count += 1 + raise ValueError("Retryable Error") + print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True) + retry_count += 1 + def main(): with DaprClient() as d: @@ -62,8 +80,9 @@ def main(): print(f"start_resp {start_resp.instance_id}") # Sleep for a while to let the workflow run - sleep(1) + sleep(2) assert counter == 11 + assert retry_count == 1 # Pause Test d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component) From 4881aa42d9b723b47e2ddeae0544589f0a25c840 Mon Sep 17 00:00:00 2001 From: Deepanshu Agarwal Date: Fri, 12 Jan 2024 01:41:32 +0530 Subject: [PATCH 4/7] test run Signed-off-by: Deepanshu Agarwal --- examples/demo_workflow/app.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/demo_workflow/app.py b/examples/demo_workflow/app.py index 31d98463..6eb8bebb 100644 --- a/examples/demo_workflow/app.py +++ b/examples/demo_workflow/app.py @@ -53,9 +53,9 @@ def hello_act(ctx: WorkflowActivityContext, wf_input): counter += wf_input print(f'New counter value is: {counter}!', flush=True) -def hello_retryable_act(ctx: WorkflowActivityContext, wf_input): +def hello_retryable_act(ctx: WorkflowActivityContext): global retry_count - if (retry_count >> 1) == 0: + if (retry_count % 2) == 0: print(f'Retry count value is: {retry_count}!', flush=True) retry_count += 1 raise ValueError("Retryable Error") @@ -68,6 +68,7 @@ def main(): workflow_runtime = WorkflowRuntime() workflow_runtime.register_workflow(hello_world_wf) workflow_runtime.register_activity(hello_act) + workflow_runtime.register_activity(hello_retryable_act) workflow_runtime.start() sleep(2) @@ -82,7 +83,7 @@ def main(): # Sleep for a while to let the workflow run sleep(2) assert counter == 11 - assert retry_count == 1 + assert retry_count == 2 # Pause Test d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component) From b2e3bc71853af8d536c743210f277cf77a5f2611 Mon Sep 17 00:00:00 2001 From: Deepanshu Agarwal Date: Fri, 12 Jan 2024 02:05:07 +0530 Subject: [PATCH 5/7] correct name usage Signed-off-by: Deepanshu Agarwal --- ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py index 78280a31..c3541ca7 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py @@ -24,7 +24,7 @@ class RetryPolicy: - """Represents the retry policy for an orchestration or activity function.""" + """Represents the retry policy for a workflow or activity function.""" def __init__(self, *, first_retry_interval: timedelta, From 2fc851907a3258594fdf7f906af7403c48ca462b Mon Sep 17 00:00:00 2001 From: Deepanshu Agarwal Date: Tue, 16 Jan 2024 18:50:05 +0530 Subject: [PATCH 6/7] Update example for child wf retry and other review comments Signed-off-by: Deepanshu Agarwal --- examples/demo_workflow/README.md | 9 ++++ examples/demo_workflow/app.py | 43 ++++++++++++--- .../ext/workflow/dapr_workflow_context.py | 18 +++---- .../dapr/ext/workflow/retry_policy.py | 53 ++++++++++--------- .../tests/test_dapr_workflow_context.py | 2 +- 5 files changed, 85 insertions(+), 40 deletions(-) diff --git a/examples/demo_workflow/README.md b/examples/demo_workflow/README.md index 491fae95..317629be 100644 --- a/examples/demo_workflow/README.md +++ b/examples/demo_workflow/README.md @@ -36,6 +36,15 @@ expected_stdout_lines: - "== APP == New counter value is: 11!" - "== APP == Retry count value is: 0!" - "== APP == Retry count value is: 1! This print statement verifies retry" + - "== APP == Appending 1 to child_orchestrator_string!" + - "== APP == Appending a to child_orchestrator_string!" + - "== APP == Appending a to child_orchestrator_string!" + - "== APP == Appending 2 to child_orchestrator_string!" + - "== APP == Appending b to child_orchestrator_string!" + - "== APP == Appending b to child_orchestrator_string!" + - "== APP == Appending 3 to child_orchestrator_string!" + - "== APP == Appending c to child_orchestrator_string!" + - "== APP == Appending c to child_orchestrator_string!" - "== APP == Get response from hello_world_wf after pause call: Suspended" - "== APP == Get response from hello_world_wf after resume call: Running" - "== APP == New counter value is: 111!" diff --git a/examples/demo_workflow/app.py b/examples/demo_workflow/app.py index 6eb8bebb..8fe3c25a 100644 --- a/examples/demo_workflow/app.py +++ b/examples/demo_workflow/app.py @@ -21,6 +21,9 @@ counter = 0 retry_count = 0 +child_orchestrator_count = 0 +child_orchestrator_string = "" +child_act_retry_count = 0 instance_id = "exampleInstanceID" workflow_component = "dapr" workflow_name = "hello_world_wf" @@ -31,18 +34,20 @@ event_data = "eventData" non_existent_id_error = "no such instance exists" - -def hello_world_wf(ctx: DaprWorkflowContext, wf_input): - print(f'{wf_input}') - retry_policy=RetryPolicy(first_retry_interval=timedelta(seconds=1), +retry_policy=RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=3, backoff_coefficient=2, max_retry_interval=timedelta(seconds=10), retry_timeout=timedelta(seconds=100) - ) + ) + + +def hello_world_wf(ctx: DaprWorkflowContext, wf_input): + print(f'{wf_input}') yield ctx.call_activity(hello_act, input=1) yield ctx.call_activity(hello_act, input=10) yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy) + yield ctx.call_child_workflow(workflow=child_wf, retry_policy=retry_policy) yield ctx.wait_for_external_event("event1") yield ctx.call_activity(hello_act, input=100) yield ctx.call_activity(hello_act, input=1000) @@ -53,6 +58,7 @@ def hello_act(ctx: WorkflowActivityContext, wf_input): counter += wf_input print(f'New counter value is: {counter}!', flush=True) + def hello_retryable_act(ctx: WorkflowActivityContext): global retry_count if (retry_count % 2) == 0: @@ -63,12 +69,36 @@ def hello_retryable_act(ctx: WorkflowActivityContext): retry_count += 1 +def child_wf(ctx: DaprWorkflowContext): + global child_orchestrator_string, child_orchestrator_count + if not ctx.is_replaying: + child_orchestrator_count += 1 + print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True) + child_orchestrator_string += str(child_orchestrator_count) + yield ctx.call_activity(act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy) + if (child_orchestrator_count < 3): + raise ValueError("Retryable Error") + + +def act_for_child_wf(ctx: WorkflowActivityContext, inp): + global child_orchestrator_string, child_act_retry_count + inp_char = chr(96+inp) + print(f'Appending {inp_char} to child_orchestrator_string!', flush=True) + child_orchestrator_string += inp_char + if (child_act_retry_count %2 == 0): + child_act_retry_count += 1 + raise ValueError("Retryable Error") + child_act_retry_count += 1 + + def main(): with DaprClient() as d: workflow_runtime = WorkflowRuntime() workflow_runtime.register_workflow(hello_world_wf) + workflow_runtime.register_workflow(child_wf) workflow_runtime.register_activity(hello_act) workflow_runtime.register_activity(hello_retryable_act) + workflow_runtime.register_activity(act_for_child_wf) workflow_runtime.start() sleep(2) @@ -81,9 +111,10 @@ def main(): print(f"start_resp {start_resp.instance_id}") # Sleep for a while to let the workflow run - sleep(2) + sleep(12) assert counter == 11 assert retry_count == 2 + assert child_orchestrator_string == "1aa2bb3cc" # Pause Test d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index e8b4b581..d3af3ad6 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -63,17 +63,17 @@ def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TO retry_policy: Optional[RetryPolicy] = None) -> task.Task[TOutput]: self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}') if hasattr(activity, '_dapr_alternate_name'): - if retry_policy is None: - return self.__obj.call_activity(activity=activity.__dict__['_dapr_alternate_name'], - input=input) - return self.__obj.call_activity(activity=activity.__dict__['_dapr_alternate_name'], - input=input, retry_policy=retry_policy.obj) - # this return should ideally never execute - return self.__obj.call_activity(activity=activity.__name__, input=input) + act=activity.__dict__['_dapr_alternate_name'] + else: + # this case should ideally never happen + act=activity.__name__ + if retry_policy is None: + return self.__obj.call_activity(activity=act, input=input) + return self.__obj.call_activity(activity=act, input=input, retry_policy=retry_policy.obj) def call_child_workflow(self, workflow: Workflow, *, - input: Optional[TInput], - instance_id: Optional[str], + input: Optional[TInput] = None, + instance_id: Optional[str] = None, retry_policy: Optional[RetryPolicy] = None) -> task.Task[TOutput]: self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow.__name__}') diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py index c3541ca7..82da685d 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py @@ -26,25 +26,25 @@ class RetryPolicy: """Represents the retry policy for a workflow or activity function.""" - def __init__(self, *, - first_retry_interval: timedelta, - max_number_of_attempts: int, - backoff_coefficient: Optional[float] = 1.0, - max_retry_interval: Optional[timedelta] = None, - retry_timeout: Optional[timedelta] = None): + def __init__( + self, *, + first_retry_interval: timedelta, + max_number_of_attempts: int, + backoff_coefficient: Optional[float] = 1.0, + max_retry_interval: Optional[timedelta] = None, + retry_timeout: Optional[timedelta] = None + ): """Creates a new RetryPolicy instance. - Parameters - ---------- - first_retry_interval : timedelta - The retry interval to use for the first retry attempt. - max_number_of_attempts : int - The maximum number of retry attempts. - backoff_coefficient : Optional[float] - The backoff coefficient to use for calculating the next retry interval. - max_retry_interval : Optional[timedelta] - The maximum retry interval to use for any retry attempt. - retry_timeout : Optional[timedelta] - The maximum amount of time to spend retrying the operation. + + Args: + first_retry_interval(timedelta): The retry interval to use for the first retry attempt. + max_number_of_attempts(int): The maximum number of retry attempts. + backoff_coefficient(Optional[float]): The backoff coefficient to use for calculating + the next retry interval. + max_retry_interval(Optional[timedelta]): The maximum retry interval to use for any + retry attempt. + retry_timeout(Optional[timedelta]): The maximum amount of time to spend retrying the + operation. """ # validate inputs if first_retry_interval < timedelta(seconds=0): @@ -58,7 +58,7 @@ def __init__(self, *, if retry_timeout is not None and retry_timeout < timedelta(seconds=0): raise ValueError('retry_timeout must be >= 0') - self.obj = task.RetryPolicy( + self._obj = task.RetryPolicy( first_retry_interval=first_retry_interval, max_number_of_attempts=max_number_of_attempts, backoff_coefficient=backoff_coefficient, @@ -66,27 +66,32 @@ def __init__(self, *, retry_timeout=retry_timeout ) + @property + def obj(self) -> task.RetryPolicy: + """Returns the underlying RetryPolicy object.""" + return self._obj + @property def first_retry_interval(self) -> timedelta: """The retry interval to use for the first retry attempt.""" - return self.obj._first_retry_interval + return self._obj._first_retry_interval @property def max_number_of_attempts(self) -> int: """The maximum number of retry attempts.""" - return self.obj._max_number_of_attempts + return self._obj._max_number_of_attempts @property def backoff_coefficient(self) -> Optional[float]: """The backoff coefficient to use for calculating the next retry interval.""" - return self.obj._backoff_coefficient + return self._obj._backoff_coefficient @property def max_retry_interval(self) -> Optional[timedelta]: """The maximum retry interval to use for any retry attempt.""" - return self.obj._max_retry_interval + return self._obj._max_retry_interval @property def retry_timeout(self) -> Optional[timedelta]: """The maximum amount of time to spend retrying the operation.""" - return self.obj._retry_timeout + return self._obj._retry_timeout diff --git a/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py b/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py index c228e009..9ea802f2 100644 --- a/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py @@ -58,7 +58,7 @@ def test_workflow_context_functions(self): assert call_activity_result == mock_call_activity call_sub_orchestrator_result = dapr_wf_ctx.call_child_workflow( - self.mock_client_child_wf, input=None, instance_id=None) + self.mock_client_child_wf) assert call_sub_orchestrator_result == mock_call_sub_orchestrator create_timer_result = dapr_wf_ctx.create_timer(mock_date_time) From 3b095cb40b36bf80cf77a5264fba65b045b12dac Mon Sep 17 00:00:00 2001 From: Deepanshu Agarwal Date: Tue, 16 Jan 2024 18:59:17 +0530 Subject: [PATCH 7/7] fix Signed-off-by: Deepanshu Agarwal --- examples/demo_workflow/README.md | 9 +++++++++ .../dapr/ext/workflow/dapr_workflow_context.py | 4 ++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/examples/demo_workflow/README.md b/examples/demo_workflow/README.md index 317629be..99ccb1cf 100644 --- a/examples/demo_workflow/README.md +++ b/examples/demo_workflow/README.md @@ -69,6 +69,15 @@ You should be able to see the following output: == APP == New counter value is: 11! == APP == Retry count value is: 0! == APP == Retry count value is: 1! This print statement verifies retry +== APP == Appending 1 to child_orchestrator_string! +== APP == Appending a to child_orchestrator_string! +== APP == Appending a to child_orchestrator_string! +== APP == Appending 2 to child_orchestrator_string! +== APP == Appending b to child_orchestrator_string! +== APP == Appending b to child_orchestrator_string! +== APP == Appending 3 to child_orchestrator_string! +== APP == Appending c to child_orchestrator_string! +== APP == Appending c to child_orchestrator_string! == APP == Get response from hello_world_wf after pause call: Suspended == APP == Get response from hello_world_wf after resume call: Running == APP == New counter value is: 111! diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index d3af3ad6..402f5e74 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -63,10 +63,10 @@ def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TO retry_policy: Optional[RetryPolicy] = None) -> task.Task[TOutput]: self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}') if hasattr(activity, '_dapr_alternate_name'): - act=activity.__dict__['_dapr_alternate_name'] + act = activity.__dict__['_dapr_alternate_name'] else: # this case should ideally never happen - act=activity.__name__ + act = activity.__name__ if retry_policy is None: return self.__obj.call_activity(activity=act, input=input) return self.__obj.call_activity(activity=act, input=input, retry_policy=retry_policy.obj)