From b18d8d524927dfa7d77990a619a2f27993c485bc Mon Sep 17 00:00:00 2001 From: Jon Date: Tue, 18 Feb 2025 21:49:34 -0100 Subject: [PATCH] Fix some bugs in retrying of operations --- .../data_operations/data_operation.py | 2 +- datalab/datalab_session/tasks.py | 28 +++++++++++-------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/datalab/datalab_session/data_operations/data_operation.py b/datalab/datalab_session/data_operations/data_operation.py index 574f373..ebb2250 100644 --- a/datalab/datalab_session/data_operations/data_operation.py +++ b/datalab/datalab_session/data_operations/data_operation.py @@ -64,7 +64,7 @@ def perform_operation(self): def generate_cache_key(self) -> str: """ Generate a unique cache key hashed from the input_data and operation name """ - string_key = f'{self.name()}_{json.dumps(sorted(self.input_data.items()))}' + string_key = f'{self.name()}_{json.dumps(sorted(self.input_data.items()), sort_keys=True)}' return hashlib.sha256(string_key.encode('utf-8')).hexdigest() def set_status(self, status: str): diff --git a/datalab/datalab_session/tasks.py b/datalab/datalab_session/tasks.py index d299488..e213761 100644 --- a/datalab/datalab_session/tasks.py +++ b/datalab/datalab_session/tasks.py @@ -15,15 +15,19 @@ def should_retry(retries_so_far, exception): @dramatiq.actor(retry_when=should_retry) def execute_data_operation(data_operation_name: str, input_data: dict): - operation_class = available_operations().get(data_operation_name) - if operation_class is None: - raise NotImplementedError("Operation not implemented!") - else: - try: - operation_class(input_data).operate() - except ClientAlertException as error: - log.error(f"Client Error executing {data_operation_name}: {error}") - operation_class(input_data).set_failed(str(error)) - except Exception as error: - log.exception(error) - operation_class(input_data).set_failed("An unknown error ocurred, contact developers if this persists.") + try: + operation_class = available_operations().get(data_operation_name) + if operation_class is None: + raise NotImplementedError("Operation not implemented!") + else: + try: + operation_class(input_data).operate() + except ClientAlertException as error: + log.error(f"Client Error executing {data_operation_name}: {error}") + operation_class(input_data).set_failed(str(error)) + except Exception as error: + log.exception(error) + operation_class(input_data).set_failed("An unknown error ocurred, contact developers if this persists.") + except dramatiq.middleware.TimeLimitExceeded as error: + log.exception(error) + available_operations().get(data_operation_name)(input_data).set_failed("The operation timed out, contact developers if this persists.")