Skip to content

Commit

Permalink
migrate mri upload to sqlalchemy
Browse files Browse the repository at this point in the history
  • Loading branch information
maximemulder committed Jan 8, 2025
1 parent cf9ed0c commit 418bece
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 106 deletions.
5 changes: 5 additions & 0 deletions python/lib/database_lib/mri_upload_db.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""This class performs database queries for the mri_upload table"""

from typing_extensions import deprecated

__license__ = "GPLv3"


@deprecated('Use `lib.db.models.mri_upload.DbMriUpload` instead')
class MriUploadDB:
"""
This class performs database queries for imaging dataset stored in the mri_upload table.
Expand Down Expand Up @@ -34,6 +37,7 @@ def __init__(self, db, verbose):
self.db = db
self.verbose = verbose

@deprecated('Use `lib.db.models.mri_upload.DbMriUpload` instead')
def update_mri_upload(self, upload_id, fields, values):
"""
Update the `isTarchiveValidated` field of the upload with the value provided
Expand All @@ -57,6 +61,7 @@ def update_mri_upload(self, upload_id, fields, values):

self.db.update(query=query, args=args)

@deprecated('Use `lib.db.models.mri_upload.DbMriUpload` instead')
def create_mri_upload_dict(self, where_field, where_value):
"""
Create a dictionary out of the entry available in the `mri_upload` table.
Expand Down
8 changes: 4 additions & 4 deletions python/lib/db/models/dicom_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ class DbDicomArchive(Base):
date_sent : Mapped[Optional[datetime]] = mapped_column('DateSent')
pending_transfer : Mapped[bool] = mapped_column('PendingTransfer')

series : Mapped[list['db_dicom_archive_series.DbDicomArchiveSeries']] \
series : Mapped[list['db_dicom_archive_series.DbDicomArchiveSeries']] \
= relationship('DbDicomArchiveSeries', back_populates='archive')
files : Mapped[list['db_dicom_archive_file.DbDicomArchiveFile']] \
files : Mapped[list['db_dicom_archive_file.DbDicomArchiveFile']] \
= relationship('DbDicomArchiveFile', back_populates='archive')
upload : Mapped[Optional['db_mri_upload.DbMriUpload']] \
mri_uploads : Mapped[list['db_mri_upload.DbMriUpload']] \
= relationship('DbMriUpload', back_populates='dicom_archive')
session : Mapped[Optional['db_session.DbSession']] \
session : Mapped[Optional['db_session.DbSession']] \
= relationship('DbSession')
6 changes: 3 additions & 3 deletions python/lib/db/models/mri_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DbMriUpload(Base):
is_dicom_archive_validated : Mapped[bool] = mapped_column('IsTarchiveValidated')
is_phantom : Mapped[bool] = mapped_column('IsPhantom', YNBool)

dicom_archive : Mapped[Optional['db_dicom_archive.DbDicomArchive']] \
= relationship('DbDicomArchive', back_populates='upload')
session : Mapped[Optional['db_session.DbSession']] \
dicom_archive : Mapped[Optional['db_dicom_archive.DbDicomArchive']] \
= relationship('DbDicomArchive', back_populates='mri_uploads')
session : Mapped[Optional['db_session.DbSession']] \
= relationship('DbSession')
153 changes: 85 additions & 68 deletions python/lib/dcm2bids_imaging_pipeline_lib/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import lib.utilities
from lib.database import Database
from lib.database_lib.config import Config
from lib.db.queries.dicom_archive import try_get_dicom_archive_with_archive_location, try_get_dicom_archive_with_id
from lib.db.queries.dicom_archive import try_get_dicom_archive_with_archive_location
from lib.db.queries.mri_upload import try_get_mri_upload_with_id
from lib.db.queries.session import try_get_session_with_cand_id_visit_label
from lib.db.queries.site import get_all_sites
from lib.exception.determine_subject_info_error import DetermineSubjectInfoError
from lib.exception.validate_subject_info_error import ValidateSubjectInfoError
from lib.imaging import Imaging
from lib.imaging_upload import ImagingUpload
from lib.logging import log_error_exit, log_verbose, log_warning
from lib.make_env import make_env
from lib.validate_subject_info import validate_subject_info
Expand All @@ -32,10 +32,9 @@ def __init__(self, loris_getopt_obj, script_name):
These includes the following steps:
- load pipeline options
- establish database connection
- load the Config, Imaging, Tarchive, ImagingUpload, Session and other classes
- load the Config, Imaging and other classes
- creates the processing temporary directory
- creates the log file for the script execution
- populate the imaging_upload and tarchive info dictionaries
- determine the subject IDs
- determine the site information
- determine the scanner information
Expand All @@ -51,7 +50,6 @@ def __init__(self, loris_getopt_obj, script_name):
self.options_dict = loris_getopt_obj.options_dict
self.force = self.options_dict["force"]["value"] if "force" in self.options_dict else None
self.verbose = self.options_dict["verbose"]["value"]
self.upload_id = loris_getopt_obj.options_dict["upload_id"]["value"]

# ----------------------------------------------------
# Establish database connection
Expand All @@ -64,7 +62,6 @@ def __init__(self, loris_getopt_obj, script_name):
# -----------------------------------------------------------------------------------
self.config_db_obj = Config(self.db, self.verbose)
self.imaging_obj = Imaging(self.db, self.verbose, self.config_file)
self.imaging_upload_obj = ImagingUpload(self.db, self.verbose)

# ---------------------------------------------------------------------------------------------
# Grep config settings from the Config module
Expand All @@ -83,104 +80,122 @@ def __init__(self, loris_getopt_obj, script_name):
# ---------------------------------------------------------------------------------------------
# Load imaging_upload and tarchive dictionary
# ---------------------------------------------------------------------------------------------
self.load_imaging_upload_and_tarchive_dictionaries()
self.load_mri_upload_and_dicom_archive()

# ---------------------------------------------------------------------------------------------
# Set Inserting field of mri_upload to indicate a script is running on the upload
# and load the notification object
# ---------------------------------------------------------------------------------------------
if "UploadID" in self.imaging_upload_obj.imaging_upload_dict.keys():
self.upload_id = self.imaging_upload_obj.imaging_upload_dict["UploadID"]
self.imaging_upload_obj.update_mri_upload(upload_id=self.upload_id, fields=('Inserting',), values=('1',))
# Update the MRI upload.
self.mri_upload.inserting = True
self.env.db.commit()

# Initiate the notification object now that we have a confirmed UploadID
self.env.init_notifier(self.upload_id)
self.env.init_notifier(self.mri_upload.id)

# ---------------------------------------------------------------------------------
# Determine subject IDs based on DICOM headers and validate the IDs against the DB
# Verify PSC information stored in DICOMs
# Grep scanner information based on what is in the DICOM headers
# ---------------------------------------------------------------------------------
if self.dicom_archive is not None:
try:
self.subject_info = self.imaging_obj.determine_subject_info(self.dicom_archive)
except DetermineSubjectInfoError as error:
log_error_exit(self.env, error.message, lib.exitcode.PROJECT_CUSTOMIZATION_FAILURE)
try:
self.subject_info = self.imaging_obj.determine_subject_info(self.dicom_archive)
except DetermineSubjectInfoError as error:
log_error_exit(self.env, error.message, lib.exitcode.PROJECT_CUSTOMIZATION_FAILURE)

# verify PSC information stored in DICOMs
self.site_dict = self.determine_study_info()
log_verbose(self.env, (
f"Found Center Name: {self.site_dict['CenterName']},"
f" Center ID: {self.site_dict['CenterID']}"
))
# verify PSC information stored in DICOMs
self.site_dict = self.determine_study_info()
log_verbose(self.env, (
f"Found Center Name: {self.site_dict['CenterName']},"
f" Center ID: {self.site_dict['CenterID']}"
))

# grep scanner information based on what is in the DICOM headers
self.scanner_id = self.determine_scanner_info()
# grep scanner information based on what is in the DICOM headers
self.scanner_id = self.determine_scanner_info()

def load_imaging_upload_and_tarchive_dictionaries(self):
def load_mri_upload_and_dicom_archive(self):
"""
Loads the imaging_upload and tarchive info dictionaries based on the content of the imaging_upload
Loads the MRI upload and DICOM archives based on the content of the imaging_upload
and tarchive tables found for the processed UploadID/ArchiveLocation given as argument to
the script.
"""

upload_id = self.options_dict["upload_id"]["value"]
tarchive_path = self.options_dict["tarchive_path"]["value"] \
if "tarchive_path" in self.options_dict.keys() else None
success = False

if upload_id and tarchive_path:
self.imaging_upload_obj.create_imaging_upload_dict_from_upload_id(upload_id)
if not self.imaging_upload_obj.imaging_upload_dict:
mri_upload = try_get_mri_upload_with_id(self.env.db, upload_id)
if mri_upload is None:
log_error_exit(
self.env,
f"Did not find an entry in mri_upload associated with \'UploadID\' {upload_id}.",
lib.exitcode.SELECT_FAILURE,
)
tarchive_id = self.imaging_upload_obj.imaging_upload_dict["TarchiveID"]
if not tarchive_id:

self.mri_upload = mri_upload

if self.mri_upload.dicom_archive is None:
log_error_exit(
self.env,
f"UploadID {upload_id} is not linked to any tarchive in mri_upload.",
lib.exitcode.SELECT_FAILURE,
)

self.dicom_archive = try_get_dicom_archive_with_id(self.env.db, tarchive_id)
self.dicom_archive = self.mri_upload.dicom_archive
if os.path.join(self.data_dir, 'tarchive', self.dicom_archive.archive_location) != tarchive_path:
log_error_exit(
self.env,
f"UploadID {upload_id} and ArchiveLocation {tarchive_path} do not refer to the same upload",
lib.exitcode.SELECT_FAILURE,
)

err_msg = ''
if upload_id:
self.imaging_upload_obj.create_imaging_upload_dict_from_upload_id(upload_id)
if not self.imaging_upload_obj.imaging_upload_dict:
err_msg += f"Did not find an entry in mri_upload associated with 'UploadID' {upload_id}"
else:
if self.imaging_upload_obj.imaging_upload_dict["TarchiveID"]:
tarchive_id = self.imaging_upload_obj.imaging_upload_dict["TarchiveID"]
self.dicom_archive = try_get_dicom_archive_with_id(self.env.db, tarchive_id)
if self.dicom_archive is not None:
success = True
else:
err_msg += f"Could not load tarchive dictionary for TarchiveID {tarchive_id}"
mri_upload = try_get_mri_upload_with_id(self.env.db, upload_id)
if mri_upload is None:
log_error_exit(
self.env,
f"Did not find an entry in mri_upload associated with 'UploadID' {upload_id}",
lib.exitcode.SELECT_FAILURE,
)

self.mri_upload = mri_upload
if self.mri_upload.dicom_archive is None:
log_error_exit(
self.env,
f"Did not find a DICOM archive associated with upload ID {upload_id}",
lib.exitcode.SELECT_FAILURE,
)

self.dicom_archive = self.mri_upload.dicom_archive

elif tarchive_path:
archive_location = tarchive_path.replace(self.dicom_lib_dir, "")
self.dicom_archive = try_get_dicom_archive_with_archive_location(self.env.db, archive_location)
if self.dicom_archive is not None:
success, new_err_msg = self.imaging_upload_obj.create_imaging_upload_dict_from_tarchive_id(
self.dicom_archive.id
dicom_archive = try_get_dicom_archive_with_archive_location(self.env.db, archive_location)
if dicom_archive is None:
log_error_exit(
self.env,
f"Could not load tarchive dictionary for ArchiveLocation {archive_location}",
lib.exitcode.SELECT_FAILURE,
)

if not success:
err_msg += new_err_msg
else:
err_msg += f"Could not load tarchive dictionary for ArchiveLocation {archive_location}"

if not success and not self.force:
log_error_exit(self.env, err_msg, lib.exitcode.SELECT_FAILURE)
self.dicom_archive = dicom_archive

mri_uploads = self.dicom_archive.mri_uploads
match mri_uploads:
case []:
log_error_exit(
self.env,
f"Did not find an entry in mri_upload associated with 'TarchiveID' {self.dicom_archive.id}",
lib.exitcode.SELECT_FAILURE,
)
case [mri_upload]:
self.mri_upload = mri_upload
case _:
log_error_exit(
self.env,
f"Found {len(mri_uploads)} rows in mri_upload for 'TarchiveID' {self.dicom_archive.id}",
lib.exitcode.SELECT_FAILURE,
)

def determine_study_info(self):
"""
Expand Down Expand Up @@ -244,14 +259,15 @@ def validate_subject_info(self):
try:
validate_subject_info(self.env.db, self.subject_info)

self.imaging_upload_obj.update_mri_upload(
upload_id=self.upload_id, fields=('IsCandidateInfoValidated',), values=('1',)
)
# Update the MRI upload.
self.mri_upload.is_candidate_info_validated = True
self.env.db.commit()
except ValidateSubjectInfoError as error:
log_warning(self.env, error.message)
self.imaging_upload_obj.update_mri_upload(
upload_id=self.upload_id, fields=('IsCandidateInfoValidated',), values=('0',)
)

# Update the MRI upload.
self.mri_upload.is_candidate_info_validated = False
self.env.db.commit()

def check_if_tarchive_validated_in_db(self):
"""
Expand All @@ -261,13 +277,13 @@ def check_if_tarchive_validated_in_db(self):
If the DICOM archive was not validated, the pipeline will exit and log the proper error information.
"""
# reload the mri_upload object with updated database values
self.load_imaging_upload_and_tarchive_dictionaries()
mu_dict = self.imaging_upload_obj.imaging_upload_dict
if ("IsTarchiveValidated" not in mu_dict.keys() or not mu_dict["IsTarchiveValidated"]) and not self.force:
self.load_mri_upload_and_dicom_archive()

if not self.mri_upload.is_dicom_archive_validated and not self.force:
log_error_exit(
self.env,
(
f"The DICOM archive validation has failed for UploadID {self.upload_id}. Either run the"
f"The DICOM archive validation has failed for UploadID {self.mri_upload.id}. Either run the"
f" validation again and fix the problem or use --force to force the insertion of the NIfTI file."
),
lib.exitcode.INVALID_DICOM,
Expand Down Expand Up @@ -311,8 +327,9 @@ def move_file(self, old_file_path, new_file_path):
)

def end_upload(self):
if self.upload_id:
self.imaging_upload_obj.update_mri_upload(upload_id=self.upload_id, fields=("Inserting",), values=("0",))
# Update the MRI upload.
self.mri_upload.inserting = False
self.env.db.commit()

def remove_tmp_dir(self):
"""
Expand Down
Loading

0 comments on commit 418bece

Please sign in to comment.