Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
mcantelon committed Jan 4, 2024
1 parent 7c4eea8 commit 5341548
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 13 deletions.
11 changes: 11 additions & 0 deletions AIPscan/Aggregator/task_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

from dateutil.parser import ParserError, parse

from AIPscan import db
from AIPscan.Aggregator.types import StorageServicePackage
from AIPscan.models import FetchJobError


def format_api_url_with_limit_offset(storage_service):
Expand Down Expand Up @@ -158,3 +160,12 @@ def write_mets(http_response, package_uuid, subdir):
with open(download_file, "wb") as file:
file.write(http_response.content)
return download_file


def store_fetch_job_error_infomation(fetch_job_id, message):
fetch_error = FetchJobError()
fetch_error.fetch_job_id = fetch_job_id
fetch_error.message = message

Check warning on line 168 in AIPscan/Aggregator/task_helpers.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/task_helpers.py#L166-L168

Added lines #L166 - L168 were not covered by tests

db.session.add(fetch_error)
db.session.commit()

Check warning on line 171 in AIPscan/Aggregator/task_helpers.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/task_helpers.py#L170-L171

Added lines #L170 - L171 were not covered by tests
50 changes: 37 additions & 13 deletions AIPscan/Aggregator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
format_api_url_with_limit_offset,
parse_package_list_file,
process_package_object,
store_fetch_job_error_infomation,
)
from AIPscan.extensions import celery
from AIPscan.helpers import file_sha256_hash
Expand All @@ -33,6 +34,15 @@ class TaskError(Exception):
intervention.
"""

"""
def __init__(self, message):
fetch_error = FetchJobError()
fetch_error.message = message;
db.session.add(fetch_error)
db.session.commit()
"""


def write_packages_json(count, packages, packages_directory):
"""Write package JSON to disk"""
Expand Down Expand Up @@ -63,13 +73,20 @@ def start_mets_task(
celery database.
"""
storage_service = StorageService.query.get(storage_service_id)
storage_location = database_helpers.create_or_update_storage_location(
current_location, storage_service
)

pipeline = database_helpers.create_or_update_pipeline(
origin_pipeline, storage_service
)
try:
storage_location = database_helpers.create_or_update_storage_location(

Check warning on line 78 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L77-L78

Added lines #L77 - L78 were not covered by tests
current_location, storage_service
)
except Exception as err:
store_fetch_job_error_infomation(fetch_job_id, str(err))

Check warning on line 82 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L81-L82

Added lines #L81 - L82 were not covered by tests

try:
pipeline = database_helpers.create_or_update_pipeline(

Check warning on line 85 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L84-L85

Added lines #L84 - L85 were not covered by tests
origin_pipeline, storage_service
)
except Exception as err:
store_fetch_job_error_infomation(fetch_job_id, str(err))

Check warning on line 89 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L88-L89

Added lines #L88 - L89 were not covered by tests

args = [
package_uuid,
Expand Down Expand Up @@ -168,6 +185,8 @@ def workflow_coordinator(
break

if isinstance(package_lists_task.info, TaskError):
store_fetch_job_error_infomation(fetch_job_id, str(package_lists_task.info))

Check warning on line 188 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L188

Added line #L188 was not covered by tests

# Re-raise.
raise (package_lists_task.info)

Expand Down Expand Up @@ -315,13 +334,18 @@ def get_mets(

# Download METS file
storage_service = StorageService.query.get(storage_service_id)
download_file = download_mets(
storage_service,
package_uuid,
relative_path_to_mets,
timestamp_str,
package_list_no,
)

try:
download_file = download_mets(
storage_service,
package_uuid,
relative_path_to_mets,
timestamp_str,
package_list_no,
)
except Exception as err:
store_fetch_job_error_infomation(fetch_job_id, str(err))

Check warning on line 347 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L346-L347

Added lines #L346 - L347 were not covered by tests

mets_name = os.path.basename(download_file)
mets_hash = file_sha256_hash(download_file)

Expand Down
2 changes: 2 additions & 0 deletions AIPscan/Aggregator/templates/storage_service.html
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
<th><strong>Download duration</strong></th>
<th><strong>Packages in SS</strong></th>
<th><strong>New AIPs added</strong></th>
<th><strong>Errors</strong></th>
<th><strong>Action</strong></th>
</tr>
</thead>
Expand All @@ -72,6 +73,7 @@
</div>
</td>
<td>{{ mets_fetch_job.aips|length }}</td>
<td>{{ mets_fetch_job.errors|length }}</td>
<td>
<a href="{{ url_for('aggregator.delete_fetch_job', fetch_job_id=mets_fetch_job.id) }}"><button type="button" class="btn btn-danger">Delete</button></a>
</td>
Expand Down
15 changes: 15 additions & 0 deletions AIPscan/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import re
from datetime import date, datetime

from sqlalchemy.sql import func

from AIPscan import db

UUID_REGEX = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
Expand Down Expand Up @@ -257,6 +259,9 @@ class FetchJob(db.Model):
db.Integer(), db.ForeignKey("storage_service.id"), nullable=False
)
aips = db.relationship("AIP", cascade="all,delete", backref="fetch_job", lazy=True)
errors = db.relationship(
"FetchJobError", cascade="all,delete", backref="fetch_job", lazy=True
)

def __init__(
self,
Expand All @@ -280,6 +285,16 @@ def __repr__(self):
return "<Fetch Job '{}'>".format(self.download_start)


class FetchJobError(db.Model):
__tablename__ = "fetch_job_error"
id = db.Column(db.Integer(), primary_key=True)
fetch_job_id = db.Column(
db.Integer(), db.ForeignKey("fetch_job.id"), nullable=False
)
message = db.Column(db.String(255), index=True, unique=True)
create_date = db.Column(db.DateTime, server_default=func.now())


class Pipeline(db.Model):
__tablename__ = "pipeline"
id = db.Column(db.Integer(), primary_key=True)
Expand Down

0 comments on commit 5341548

Please sign in to comment.