Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

demo of retry handlers returning a substitute value rather than choosing to retry or fail #3553

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion parsl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing_extensions import Literal

from parsl.dataflow.dependency_resolvers import DependencyResolver
from parsl.dataflow.retries import RetryBehaviour
from parsl.dataflow.taskrecord import TaskRecord
from parsl.errors import ConfigurationError
from parsl.executors.base import ParslExecutor
Expand Down Expand Up @@ -110,7 +111,7 @@ def __init__(self,
garbage_collect: bool = True,
internal_tasks_max_threads: int = 10,
retries: int = 0,
retry_handler: Optional[Callable[[Exception, TaskRecord], float]] = None,
retry_handler: Optional[Callable[[Exception, TaskRecord], Union[float, RetryBehaviour]]] = None,
run_dir: str = 'runinfo',
std_autopath: Optional[Callable] = None,
strategy: Optional[str] = 'simple',
Expand Down
22 changes: 19 additions & 3 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from parsl.dataflow.errors import BadCheckpoint, DependencyError, JoinError
from parsl.dataflow.futures import AppFuture
from parsl.dataflow.memoization import Memoizer
from parsl.dataflow.retries import CompleteWithAlternateValue
from parsl.dataflow.rundirs import make_rundir
from parsl.dataflow.states import FINAL_FAILURE_STATES, FINAL_STATES, States
from parsl.dataflow.taskrecord import TaskRecord
Expand Down Expand Up @@ -340,8 +341,11 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
if not future.done():
raise InternalConsistencyError("done callback called, despite future not reporting itself as done")

returned_result = False

try:
res = self._unwrap_remote_exception_wrapper(future)
returned_result = True

except Exception as e:
logger.info(f"Task {task_id} try {task_record['try_id']} failed with exception of type {type(e).__name__}")
Expand All @@ -351,7 +355,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
task_record['fail_count'] += 1
if self._config.retry_handler:
try:
cost = self._config.retry_handler(e, task_record)
retry_behaviour = self._config.retry_handler(e, task_record)
except Exception as retry_handler_exception:
logger.exception("retry_handler raised an exception - will not retry")

Expand All @@ -363,7 +367,17 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
# rather than the execution level exception
e = retry_handler_exception
else:
task_record['fail_cost'] += cost
if isinstance(retry_behaviour, float) or isinstance(retry_behaviour, int):
task_record['fail_cost'] += retry_behaviour
elif isinstance(retry_behaviour, CompleteWithAlternateValue):
# retry_behaviour.value contains the value we will complete with, instead
# of trying to either retry or fail.
res = retry_behaviour.value
returned_result = True
else:
# TODO: this error doesn't go anywhere... just makes a hang...
# should, like the block above, turn it into a final exception.
raise InternalConsistencyError(f"Unrecognised retry behaviour {retry_behaviour}")
else:
task_record['fail_cost'] += 1

Expand Down Expand Up @@ -399,7 +413,9 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
with task_record['app_fu']._update_lock:
task_record['app_fu'].set_exception(e)

else:
# we might do the result path even if we did exception handling above...
# because the exception handling might have decided we should have a result
if returned_result:
if task_record['from_memo']:
self._complete_task(task_record, States.memo_done, res)
self._send_task_log_info(task_record)
Expand Down
11 changes: 11 additions & 0 deletions parsl/dataflow/retries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from abc import ABCMeta
from dataclasses import dataclass


class RetryBehaviour(metaclass=ABCMeta):
    """Base type for the non-numeric outcomes a retry handler may return.

    The class declares no abstract methods and carries no state of its own;
    it exists so that concrete outcomes share a common type that can be used
    in annotations and ``isinstance`` checks.
    """


@dataclass
class CompleteWithAlternateValue(RetryBehaviour):
    """Retry outcome asking for the task to complete with a substitute value.

    Returned by a retry handler when a failed task should neither be retried
    nor marked as failed, but instead finish with ``value`` as its result.
    """

    # The value the task completes with in place of the raised exception.
    value: object
1 change: 1 addition & 0 deletions parsl/tests/test_error_handling/test_retry_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import parsl
from parsl import bash_app
from parsl.dataflow.retries import CompleteWithAlternateValue
from parsl.tests.configs.local_threads import fresh_config


Expand Down
57 changes: 57 additions & 0 deletions parsl/tests/test_error_handling/test_retry_handler_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import os

import pytest

import parsl
from parsl import python_app
from parsl.dataflow.retries import CompleteWithAlternateValue
from parsl.tests.configs.local_threads import fresh_config


class SpecialException(Exception):
    """Marker exception that this test's retry handler turns into a result."""


def error_to_result_handler(exc, task_record):
    """Retry handler that maps SpecialException onto a successful result.

    Returns ``CompleteWithAlternateValue(8)`` when ``exc`` is a
    ``SpecialException``; for every other exception it returns ``1``, the
    regular retry cost. ``task_record`` is accepted but not inspected.
    """
    if not isinstance(exc, SpecialException):
        # regular retry cost
        return 1

    # substitute the exception with an alternate success
    return CompleteWithAlternateValue(8)


def local_config():
    """Build the config for this test module.

    Starts from ``fresh_config()``, allows 2 retries and installs
    ``error_to_result_handler`` as the retry handler.
    """
    config = fresh_config()
    config.retries = 2
    config.retry_handler = error_to_result_handler
    return config


@python_app
def returns_val():
    """App that completes normally and yields 7."""
    return 7


@python_app
def raises_runtime():
    """App that always fails with a RuntimeError."""
    raise RuntimeError("from raises_runtime")


@python_app
def raises_special():
    """App that always fails with SpecialException.

    The retry handler in this module only checks the exception's type, so the
    message is purely informational.
    """
    # Use a descriptive message (as raises_runtime does) rather than passing
    # the Exception class object as the exception argument.
    raise SpecialException("from raises_special")


@pytest.mark.local
def test_retry():
    """Check that a retry handler can replace an exception with a result."""

    # Two prerequisites: with the test retry handler installed, ordinary
    # results and ordinary exceptions must still come through unchanged.
    plain_future = returns_val()
    assert plain_future.result() == 7

    with pytest.raises(RuntimeError):
        raises_runtime().result()

    # The actual test: check that a special exception has been replaced by
    # a real value.
    substituted_future = raises_special()
    assert substituted_future.result() == 8
Loading