diff --git a/AIPscan/Aggregator/task_helpers.py b/AIPscan/Aggregator/task_helpers.py
index f0a6db14..a706d665 100644
--- a/AIPscan/Aggregator/task_helpers.py
+++ b/AIPscan/Aggregator/task_helpers.py
@@ -9,7 +9,9 @@
 
 from dateutil.parser import ParserError, parse
 
+from AIPscan import db
 from AIPscan.Aggregator.types import StorageServicePackage
+from AIPscan.models import FetchJobError
 
 
 def format_api_url_with_limit_offset(storage_service):
@@ -158,3 +160,13 @@ def write_mets(http_response, package_uuid, subdir):
     with open(download_file, "wb") as file:
         file.write(http_response.content)
     return download_file
+
+
+def store_fetch_job_error_information(fetch_job_id, message):
+    """Persist an error message against a fetch job so it can be reported."""
+    fetch_error = FetchJobError()
+    fetch_error.fetch_job_id = fetch_job_id
+    fetch_error.message = message
+
+    db.session.add(fetch_error)
+    db.session.commit()
diff --git a/AIPscan/Aggregator/tasks.py b/AIPscan/Aggregator/tasks.py
index ff46ada2..7f01141c 100644
--- a/AIPscan/Aggregator/tasks.py
+++ b/AIPscan/Aggregator/tasks.py
@@ -19,6 +19,7 @@
     format_api_url_with_limit_offset,
     parse_package_list_file,
     process_package_object,
+    store_fetch_job_error_information,
 )
 from AIPscan.extensions import celery
 from AIPscan.helpers import file_sha256_hash
@@ -63,13 +64,22 @@
     celery database.
     """
 
     storage_service = StorageService.query.get(storage_service_id)
-    storage_location = database_helpers.create_or_update_storage_location(
-        current_location, storage_service
-    )
-    pipeline = database_helpers.create_or_update_pipeline(
-        origin_pipeline, storage_service
-    )
+    try:
+        storage_location = database_helpers.create_or_update_storage_location(
+            current_location, storage_service
+        )
+    except Exception as err:
+        store_fetch_job_error_information(fetch_job_id, str(err))
+        raise
+
+    try:
+        pipeline = database_helpers.create_or_update_pipeline(
+            origin_pipeline, storage_service
+        )
+    except Exception as err:
+        store_fetch_job_error_information(fetch_job_id, str(err))
+        raise
 
     args = [
         package_uuid,
@@ -168,6 +178,8 @@ def workflow_coordinator(
             break
 
     if isinstance(package_lists_task.info, TaskError):
+        store_fetch_job_error_information(fetch_job_id, str(package_lists_task.info))
+
         # Re-raise.
         raise (package_lists_task.info)
 
@@ -315,13 +327,19 @@ def get_mets(
 
     # Download METS file
     storage_service = StorageService.query.get(storage_service_id)
-    download_file = download_mets(
-        storage_service,
-        package_uuid,
-        relative_path_to_mets,
-        timestamp_str,
-        package_list_no,
-    )
+
+    try:
+        download_file = download_mets(
+            storage_service,
+            package_uuid,
+            relative_path_to_mets,
+            timestamp_str,
+            package_list_no,
+        )
+    except Exception as err:
+        store_fetch_job_error_information(fetch_job_id, str(err))
+        raise
+
     mets_name = os.path.basename(download_file)
     mets_hash = file_sha256_hash(download_file)
 
diff --git a/AIPscan/Aggregator/templates/storage_service.html b/AIPscan/Aggregator/templates/storage_service.html
index 7d880118..3ba8c7f4 100644
--- a/AIPscan/Aggregator/templates/storage_service.html
+++ b/AIPscan/Aggregator/templates/storage_service.html
@@ -49,6 +49,7 @@