Skip to content

Commit

Permalink
Fix some bugs in retrying of operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon committed Feb 18, 2025
1 parent 647c37c commit b18d8d5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
2 changes: 1 addition & 1 deletion datalab/datalab_session/data_operations/data_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def perform_operation(self):

def generate_cache_key(self) -> str:
""" Generate a unique cache key hashed from the input_data and operation name """
string_key = f'{self.name()}_{json.dumps(sorted(self.input_data.items()))}'
string_key = f'{self.name()}_{json.dumps(sorted(self.input_data.items()), sort_keys=True)}'
return hashlib.sha256(string_key.encode('utf-8')).hexdigest()

def set_status(self, status: str):
Expand Down
28 changes: 16 additions & 12 deletions datalab/datalab_session/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@ def should_retry(retries_so_far, exception):

@dramatiq.actor(retry_when=should_retry)
def execute_data_operation(data_operation_name: str, input_data: dict):
operation_class = available_operations().get(data_operation_name)
if operation_class is None:
raise NotImplementedError("Operation not implemented!")
else:
try:
operation_class(input_data).operate()
except ClientAlertException as error:
log.error(f"Client Error executing {data_operation_name}: {error}")
operation_class(input_data).set_failed(str(error))
except Exception as error:
log.exception(error)
operation_class(input_data).set_failed("An unknown error ocurred, contact developers if this persists.")
try:
operation_class = available_operations().get(data_operation_name)
if operation_class is None:
raise NotImplementedError("Operation not implemented!")
else:
try:
operation_class(input_data).operate()
except ClientAlertException as error:
log.error(f"Client Error executing {data_operation_name}: {error}")
operation_class(input_data).set_failed(str(error))
except Exception as error:
log.exception(error)
operation_class(input_data).set_failed("An unknown error ocurred, contact developers if this persists.")
except dramatiq.middleware.TimeLimitExceeded as error:
log.exception(error)
available_operations().get(data_operation_name)(input_data).set_failed("The operation timed out, contact developers if this persists.")

0 comments on commit b18d8d5

Please sign in to comment.