Skip to content

Commit

Permalink
Merge pull request #148 from ral-facilities/DSEGOG-271-Handling-Faile…
Browse files Browse the repository at this point in the history
…d-Ingestion

Dsegog 271 handling failed ingestion
  • Loading branch information
moonraker595 authored Feb 4, 2025
2 parents ad8f996 + 70bbd4e commit bfbbab9
Show file tree
Hide file tree
Showing 9 changed files with 558 additions and 145 deletions.
16 changes: 14 additions & 2 deletions operationsgateway_api/src/mongo/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from operationsgateway_api.src.config import Config
from operationsgateway_api.src.exceptions import DatabaseError
from operationsgateway_api.src.mongo.connection import ConnectionInstance
from operationsgateway_api.src.mongo.mongo_error_handling import mongodb_error_handling

log = logging.getLogger()
ProjectionAlias = Optional[Union[Mapping[str, Any], Iterable[str]]]
Expand All @@ -22,16 +23,17 @@
class MongoDBInterface:
"""
An implementation of various PyMongo and Motor functions that suit our specific
database and colllection names
database and collection names
Motor doesn't support type annotations (see
https://jira.mongodb.org/browse/MOTOR-331 for any updates) so type annotations are
used from PyMongo which from a user perspective, acts almost identically (exlcuding
used from PyMongo which from a user perspective, acts almost identically (excluding
async support of course). This means the type hinting can actually be useful for
developers of this repo
"""

@staticmethod
@mongodb_error_handling("get_collection_object")
def get_collection_object(collection_name: str) -> Collection:
"""
Simple getter function which gets a particular collection so it can be
Expand All @@ -44,6 +46,7 @@ def get_collection_object(collection_name: str) -> Collection:
raise DatabaseError("Invalid collection name given") from exc

@staticmethod
@mongodb_error_handling("find")
def find(
collection_name: str = "images",
filter_: dict = None,
Expand Down Expand Up @@ -83,6 +86,7 @@ def find(
)

@staticmethod
@mongodb_error_handling("query_to_list")
async def query_to_list(query: Cursor) -> List[Dict[str, Any]]:
"""
Sends the query to MongoDB and converts the query results into a list
Expand All @@ -99,6 +103,7 @@ async def query_to_list(query: Cursor) -> List[Dict[str, Any]]:
return await query.to_list(length=Config.config.mongodb.max_documents)

@staticmethod
@mongodb_error_handling("find_one")
async def find_one(
collection_name: str,
filter_: Dict[str, Any] = None,
Expand All @@ -120,6 +125,7 @@ async def find_one(
return await collection.find_one(filter_, sort=sort, projection=projection)

@staticmethod
@mongodb_error_handling("update_one")
async def update_one(
collection_name: str,
filter_: Dict[str, Any] = None,
Expand Down Expand Up @@ -153,6 +159,7 @@ async def update_one(
) from exc

@staticmethod
@mongodb_error_handling("update_many")
async def update_many(
collection_name: str,
filter_: Dict[str, Any] = {}, # noqa: B006
Expand All @@ -177,6 +184,7 @@ async def update_many(
) from exc

@staticmethod
@mongodb_error_handling("insert_one")
async def insert_one(collection_name: str, data: Dict[str, Any]) -> InsertOneResult:
"""
Using the input data, insert a single document into a given collection
Expand All @@ -194,6 +202,7 @@ async def insert_one(collection_name: str, data: Dict[str, Any]) -> InsertOneRes
) from exc

@staticmethod
@mongodb_error_handling("insert_many")
async def insert_many(
collection_name: str,
data: List[Dict[str, Any]],
Expand All @@ -215,6 +224,7 @@ async def insert_many(
) from exc

@staticmethod
@mongodb_error_handling("delete_one")
async def delete_one(
collection_name: str,
filter_: Dict[str, Any] = None,
Expand Down Expand Up @@ -245,6 +255,7 @@ async def delete_one(
) from exc

@staticmethod
@mongodb_error_handling("count_documents")
async def count_documents(
collection_name: str,
filter_: Dict[str, Any] = None,
Expand Down Expand Up @@ -273,6 +284,7 @@ async def count_documents(
) from exc

@staticmethod
@mongodb_error_handling("aggregate")
async def aggregate(
collection_name: str,
pipeline,
Expand Down
48 changes: 48 additions & 0 deletions operationsgateway_api/src/mongo/mongo_error_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from functools import wraps
from inspect import iscoroutinefunction
import logging

from operationsgateway_api.src.exceptions import DatabaseError

log = logging.getLogger()


def mongodb_error_handling(operation: str):
"""
Decorator for consistent error handling in MongoDB operations, supporting both
sync and async functions.
"""

def decorator(func):
if iscoroutinefunction(func): # Check if the function is async

@wraps(func)
async def async_wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except DatabaseError:
raise # If it's already a DatabaseError, propagate it as-is
except Exception as exc:
log.error("Database operation: %s failed", operation)
raise DatabaseError(
f"Database operation failed during {operation}",
) from exc

return async_wrapper
else: # Handle synchronous functions

@wraps(func)
def sync_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except DatabaseError:
raise # If it's already a DatabaseError, propagate it as-is
except Exception as exc:
log.error("Database operation: %s failed", operation)
raise DatabaseError(
f"Database operation failed during {operation}",
) from exc

return sync_wrapper

return decorator
14 changes: 11 additions & 3 deletions operationsgateway_api/src/records/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from io import BytesIO
import logging
from typing import Tuple
from typing import Optional, Tuple

from botocore.exceptions import ClientError
import numpy as np
Expand Down Expand Up @@ -79,7 +79,7 @@ def extract_metadata_from_path(self) -> Tuple[str, str]:
return record_id, channel_name

@staticmethod
def upload_image(input_image: Image) -> None:
def upload_image(input_image: Image) -> Optional[str]:
"""
Save the image on Echo S3 object storage
"""
Expand All @@ -96,7 +96,15 @@ def upload_image(input_image: Image) -> None:
echo = EchoInterface()
storage_path = Image.get_full_path(input_image.image.path)
log.info("Storing image on S3: %s", storage_path)
echo.upload_file_object(image_bytes, storage_path)

try:
echo.upload_file_object(image_bytes, storage_path)
return None # No failure
except EchoS3Error:
# Extract the channel name and propagate it
record_id, channel_name = input_image.extract_metadata_from_path()
log.error("Failed to upload image for channel: %s", channel_name)
return channel_name

@staticmethod
async def get_image(
Expand Down
7 changes: 7 additions & 0 deletions operationsgateway_api/src/records/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ async def update(self) -> None:
},
)

def remove_channel(self, channel_name: str) -> None:
if channel_name in self.record.channels:
log.info("Removing channel '%s' from record.", channel_name)
del self.record.channels[channel_name]
else:
log.error("Channel '%s' not found in record.", channel_name)

async def find_existing_record(self) -> Union[RecordModel, None]:
"""
Using the ID, check if the object's record is currently stored in the database
Expand Down
21 changes: 16 additions & 5 deletions operationsgateway_api/src/records/waveform.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from io import BytesIO
import json
import logging
from typing import Optional

from botocore.exceptions import ClientError
import matplotlib
Expand Down Expand Up @@ -35,17 +36,27 @@ def to_json(self):
b.seek(0)
return b

def insert_waveform(self) -> None:
def insert_waveform(self) -> Optional[str]:
"""
Store the waveform from this object in Echo
"""
log.info("Storing waveform: %s", self.waveform.path)
bytes_json = self.to_json()
echo = EchoInterface()
echo.upload_file_object(
bytes_json,
Waveform.get_full_path(self.waveform.path),
)
try:
echo.upload_file_object(
bytes_json,
Waveform.get_full_path(self.waveform.path),
)
return None # Successful upload
except EchoS3Error:
# Extract the channel name and propagate it
channel_name = self.get_channel_name_from_id()
log.exception(
"Failed to upload waveform for channel '%s'",
channel_name,
)
return channel_name

def create_thumbnail(self) -> None:
"""
Expand Down
42 changes: 35 additions & 7 deletions operationsgateway_api/src/routes/ingest_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,24 +111,52 @@ async def submit_hdf(
record = Record(record_data)

log.debug("Processing waveforms")
failed_waveform_uploads = []
for w in waveforms:
waveform = Waveform(w)
waveform.insert_waveform()
waveform.create_thumbnail()
record.store_thumbnail(waveform)

# Call insert_waveform and track failures
failed_upload = waveform.insert_waveform() # Returns channel name if failed
# if the upload to echo fails, don't process the waveform any further
if failed_upload:
failed_waveform_uploads.append(failed_upload)
else:
waveform.create_thumbnail()
record.store_thumbnail(waveform) # in the record not echo

# This section distributes the Image.upload_image calls across
# the threads in the pool.It takes the image_instances list,
# applies the Image.upload_image function to each item, and collects
# the return values in upload_results. The map function blocks the main
# thread until all the tasks in the pool are complete.
log.debug("Processing images")
failed_image_uploads = []
image_instances = [Image(i) for i in images]
for image in image_instances:
image.create_thumbnail()
record.store_thumbnail(image)

record.store_thumbnail(image) # in the record not echo
if len(image_instances) > 0:
pool = ThreadPool(processes=Config.config.images.upload_image_threads)
pool.map(Image.upload_image, image_instances)
upload_results = pool.map(Image.upload_image, image_instances)
# Filter out successful uploads, collect only failed ones
failed_image_uploads = [channel for channel in upload_results if channel]
pool.close()
image_instances = None

# Combine failed channels from waveforms and images and remove them from the record
# Update the channel checker to reflect failed uploads
all_failed_upload_channels = failed_waveform_uploads + failed_image_uploads
for channel in all_failed_upload_channels:
record.remove_channel(channel)
if channel in checker_response["accepted_channels"]:
# Remove from accepted_channels and add to rejected_channels
checker_response["accepted_channels"].remove(channel)
checker_response["rejected_channels"][channel] = ["Upload to Echo failed"]
elif channel in checker_response["rejected_channels"]:
# Append the failure reason to the existing reasons
checker_response["rejected_channels"][channel].append(
"Upload to Echo failed",
)

if stored_record and accept_type == "accept_merge":
log.debug(
"Record matching ID %s already exists in the database, updating existing"
Expand Down
Loading

0 comments on commit bfbbab9

Please sign in to comment.