Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate ImagingUpload and MriUploadDB to the new database abstraction #1224

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions python/lib/database_lib/mri_upload_db.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""This class performs database queries for the mri_upload table"""

from typing_extensions import deprecated

__license__ = "GPLv3"


@deprecated('Use `lib.db.models.mri_upload.DbMriUpload` instead')
class MriUploadDB:
"""
This class performs database queries for imaging dataset stored in the mri_upload table.
Expand Down Expand Up @@ -34,6 +37,7 @@ def __init__(self, db, verbose):
self.db = db
self.verbose = verbose

@deprecated('Use `lib.db.models.mri_upload.DbMriUpload` instead')
def update_mri_upload(self, upload_id, fields, values):
"""
Update the `isTarchiveValidated` field of the upload with the value provided
Expand All @@ -57,6 +61,7 @@ def update_mri_upload(self, upload_id, fields, values):

self.db.update(query=query, args=args)

@deprecated('Use `lib.db.models.mri_upload.DbMriUpload` instead')
def create_mri_upload_dict(self, where_field, where_value):
"""
Create a dictionary out of the entry available in the `mri_upload` table.
Expand Down
6 changes: 4 additions & 2 deletions python/lib/db/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def get_database_engine(config: DatabaseConfig):
Connect to the database and return an SQLAlchemy engine using the provided credentials.
"""

# The SQLAlchemy URL object notably escapes special characters in the configuration attributes
# The SQLAlchemy URL object notably escapes special characters in the configuration attributes.
url = URL.create(
drivername = 'mysql+mysqldb',
host = config.host,
Expand All @@ -18,4 +18,6 @@ def get_database_engine(config: DatabaseConfig):
database = config.database,
)

return create_engine(url)
# 'READ COMMITTED' means that the records read in a session can be modified by other sessions
# (such as subscripts or other scripts) during this session's lifetime.
return create_engine(url, isolation_level='READ COMMITTED')
8 changes: 4 additions & 4 deletions python/lib/db/models/dicom_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ class DbDicomArchive(Base):
date_sent : Mapped[Optional[datetime]] = mapped_column('DateSent')
pending_transfer : Mapped[bool] = mapped_column('PendingTransfer')

series : Mapped[list['db_dicom_archive_series.DbDicomArchiveSeries']] \
series : Mapped[list['db_dicom_archive_series.DbDicomArchiveSeries']] \
= relationship('DbDicomArchiveSeries', back_populates='archive')
files : Mapped[list['db_dicom_archive_file.DbDicomArchiveFile']] \
files : Mapped[list['db_dicom_archive_file.DbDicomArchiveFile']] \
= relationship('DbDicomArchiveFile', back_populates='archive')
upload : Mapped[Optional['db_mri_upload.DbMriUpload']] \
mri_uploads : Mapped[list['db_mri_upload.DbMriUpload']] \
= relationship('DbMriUpload', back_populates='dicom_archive')
session : Mapped[Optional['db_session.DbSession']] \
session : Mapped[Optional['db_session.DbSession']] \
= relationship('DbSession')
6 changes: 3 additions & 3 deletions python/lib/db/models/mri_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DbMriUpload(Base):
is_dicom_archive_validated : Mapped[bool] = mapped_column('IsTarchiveValidated')
is_phantom : Mapped[bool] = mapped_column('IsPhantom', YNBool)

dicom_archive : Mapped[Optional['db_dicom_archive.DbDicomArchive']] \
= relationship('DbDicomArchive', back_populates='upload')
session : Mapped[Optional['db_session.DbSession']] \
dicom_archive : Mapped[Optional['db_dicom_archive.DbDicomArchive']] \
= relationship('DbDicomArchive', back_populates='mri_uploads')
session : Mapped[Optional['db_session.DbSession']] \
= relationship('DbSession')
153 changes: 85 additions & 68 deletions python/lib/dcm2bids_imaging_pipeline_lib/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import lib.utilities
from lib.database import Database
from lib.database_lib.config import Config
from lib.db.queries.dicom_archive import try_get_dicom_archive_with_archive_location, try_get_dicom_archive_with_id
from lib.db.queries.dicom_archive import try_get_dicom_archive_with_archive_location
from lib.db.queries.mri_upload import try_get_mri_upload_with_id
from lib.db.queries.session import try_get_session_with_cand_id_visit_label
from lib.db.queries.site import get_all_sites
from lib.exception.determine_subject_info_error import DetermineSubjectInfoError
from lib.exception.validate_subject_info_error import ValidateSubjectInfoError
from lib.imaging import Imaging
from lib.imaging_upload import ImagingUpload
from lib.logging import log_error_exit, log_verbose, log_warning
from lib.make_env import make_env
from lib.validate_subject_info import validate_subject_info
Expand All @@ -32,10 +32,9 @@ def __init__(self, loris_getopt_obj, script_name):
These includes the following steps:
- load pipeline options
- establish database connection
- load the Config, Imaging, Tarchive, ImagingUpload, Session and other classes
- load the Config, Imaging and other classes
- creates the processing temporary directory
- creates the log file for the script execution
- populate the imaging_upload and tarchive info dictionaries
- determine the subject IDs
- determine the site information
- determine the scanner information
Expand All @@ -51,7 +50,6 @@ def __init__(self, loris_getopt_obj, script_name):
self.options_dict = loris_getopt_obj.options_dict
self.force = self.options_dict["force"]["value"] if "force" in self.options_dict else None
self.verbose = self.options_dict["verbose"]["value"]
self.upload_id = loris_getopt_obj.options_dict["upload_id"]["value"]

# ----------------------------------------------------
# Establish database connection
Expand All @@ -64,7 +62,6 @@ def __init__(self, loris_getopt_obj, script_name):
# -----------------------------------------------------------------------------------
self.config_db_obj = Config(self.db, self.verbose)
self.imaging_obj = Imaging(self.db, self.verbose, self.config_file)
self.imaging_upload_obj = ImagingUpload(self.db, self.verbose)

# ---------------------------------------------------------------------------------------------
# Grep config settings from the Config module
Expand All @@ -83,104 +80,122 @@ def __init__(self, loris_getopt_obj, script_name):
# ---------------------------------------------------------------------------------------------
# Load imaging_upload and tarchive dictionary
# ---------------------------------------------------------------------------------------------
self.load_imaging_upload_and_tarchive_dictionaries()
self.load_mri_upload_and_dicom_archive()

# ---------------------------------------------------------------------------------------------
# Set Inserting field of mri_upload to indicate a script is running on the upload
# and load the notification object
# ---------------------------------------------------------------------------------------------
if "UploadID" in self.imaging_upload_obj.imaging_upload_dict.keys():
self.upload_id = self.imaging_upload_obj.imaging_upload_dict["UploadID"]
self.imaging_upload_obj.update_mri_upload(upload_id=self.upload_id, fields=('Inserting',), values=('1',))
# Update the MRI upload.
self.mri_upload.inserting = True
self.env.db.commit()

# Initiate the notification object now that we have a confirmed UploadID
self.env.init_notifier(self.upload_id)
self.env.init_notifier(self.mri_upload.id)

# ---------------------------------------------------------------------------------
# Determine subject IDs based on DICOM headers and validate the IDs against the DB
# Verify PSC information stored in DICOMs
# Grep scanner information based on what is in the DICOM headers
# ---------------------------------------------------------------------------------
if self.dicom_archive is not None:
try:
self.subject_info = self.imaging_obj.determine_subject_info(self.dicom_archive)
except DetermineSubjectInfoError as error:
log_error_exit(self.env, error.message, lib.exitcode.PROJECT_CUSTOMIZATION_FAILURE)
try:
self.subject_info = self.imaging_obj.determine_subject_info(self.dicom_archive)
except DetermineSubjectInfoError as error:
log_error_exit(self.env, error.message, lib.exitcode.PROJECT_CUSTOMIZATION_FAILURE)

# verify PSC information stored in DICOMs
self.site_dict = self.determine_study_info()
log_verbose(self.env, (
f"Found Center Name: {self.site_dict['CenterName']},"
f" Center ID: {self.site_dict['CenterID']}"
))
# verify PSC information stored in DICOMs
self.site_dict = self.determine_study_info()
log_verbose(self.env, (
f"Found Center Name: {self.site_dict['CenterName']},"
f" Center ID: {self.site_dict['CenterID']}"
))

# grep scanner information based on what is in the DICOM headers
self.scanner_id = self.determine_scanner_info()
# grep scanner information based on what is in the DICOM headers
self.scanner_id = self.determine_scanner_info()

def load_imaging_upload_and_tarchive_dictionaries(self):
def load_mri_upload_and_dicom_archive(self):
"""
Loads the imaging_upload and tarchive info dictionaries based on the content of the imaging_upload
    Loads the MRI upload and its DICOM archive based on the content of the mri_upload
and tarchive tables found for the processed UploadID/ArchiveLocation given as argument to
the script.
"""

upload_id = self.options_dict["upload_id"]["value"]
tarchive_path = self.options_dict["tarchive_path"]["value"] \
if "tarchive_path" in self.options_dict.keys() else None
success = False

if upload_id and tarchive_path:
self.imaging_upload_obj.create_imaging_upload_dict_from_upload_id(upload_id)
if not self.imaging_upload_obj.imaging_upload_dict:
mri_upload = try_get_mri_upload_with_id(self.env.db, upload_id)
if mri_upload is None:
log_error_exit(
self.env,
f"Did not find an entry in mri_upload associated with \'UploadID\' {upload_id}.",
lib.exitcode.SELECT_FAILURE,
)
tarchive_id = self.imaging_upload_obj.imaging_upload_dict["TarchiveID"]
if not tarchive_id:

self.mri_upload = mri_upload

if self.mri_upload.dicom_archive is None:
log_error_exit(
self.env,
f"UploadID {upload_id} is not linked to any tarchive in mri_upload.",
lib.exitcode.SELECT_FAILURE,
)

self.dicom_archive = try_get_dicom_archive_with_id(self.env.db, tarchive_id)
self.dicom_archive = self.mri_upload.dicom_archive
if os.path.join(self.data_dir, 'tarchive', self.dicom_archive.archive_location) != tarchive_path:
log_error_exit(
self.env,
f"UploadID {upload_id} and ArchiveLocation {tarchive_path} do not refer to the same upload",
lib.exitcode.SELECT_FAILURE,
)

err_msg = ''
if upload_id:
self.imaging_upload_obj.create_imaging_upload_dict_from_upload_id(upload_id)
if not self.imaging_upload_obj.imaging_upload_dict:
err_msg += f"Did not find an entry in mri_upload associated with 'UploadID' {upload_id}"
else:
if self.imaging_upload_obj.imaging_upload_dict["TarchiveID"]:
tarchive_id = self.imaging_upload_obj.imaging_upload_dict["TarchiveID"]
self.dicom_archive = try_get_dicom_archive_with_id(self.env.db, tarchive_id)
if self.dicom_archive is not None:
success = True
else:
err_msg += f"Could not load tarchive dictionary for TarchiveID {tarchive_id}"
mri_upload = try_get_mri_upload_with_id(self.env.db, upload_id)
if mri_upload is None:
log_error_exit(
self.env,
f"Did not find an entry in mri_upload associated with 'UploadID' {upload_id}",
lib.exitcode.SELECT_FAILURE,
)

self.mri_upload = mri_upload
if self.mri_upload.dicom_archive is None:
log_error_exit(
self.env,
f"Did not find a DICOM archive associated with upload ID {upload_id}",
lib.exitcode.SELECT_FAILURE,
)

self.dicom_archive = self.mri_upload.dicom_archive

elif tarchive_path:
archive_location = tarchive_path.replace(self.dicom_lib_dir, "")
self.dicom_archive = try_get_dicom_archive_with_archive_location(self.env.db, archive_location)
if self.dicom_archive is not None:
success, new_err_msg = self.imaging_upload_obj.create_imaging_upload_dict_from_tarchive_id(
self.dicom_archive.id
dicom_archive = try_get_dicom_archive_with_archive_location(self.env.db, archive_location)
if dicom_archive is None:
log_error_exit(
self.env,
f"Could not load tarchive dictionary for ArchiveLocation {archive_location}",
lib.exitcode.SELECT_FAILURE,
)

if not success:
err_msg += new_err_msg
else:
err_msg += f"Could not load tarchive dictionary for ArchiveLocation {archive_location}"

if not success and not self.force:
log_error_exit(self.env, err_msg, lib.exitcode.SELECT_FAILURE)
self.dicom_archive = dicom_archive

mri_uploads = self.dicom_archive.mri_uploads
match mri_uploads:
case []:
log_error_exit(
self.env,
f"Did not find an entry in mri_upload associated with 'TarchiveID' {self.dicom_archive.id}",
lib.exitcode.SELECT_FAILURE,
)
case [mri_upload]:
self.mri_upload = mri_upload
case _:
log_error_exit(
self.env,
f"Found {len(mri_uploads)} rows in mri_upload for 'TarchiveID' {self.dicom_archive.id}",
lib.exitcode.SELECT_FAILURE,
)

def determine_study_info(self):
"""
Expand Down Expand Up @@ -244,14 +259,15 @@ def validate_subject_info(self):
try:
validate_subject_info(self.env.db, self.subject_info)

self.imaging_upload_obj.update_mri_upload(
upload_id=self.upload_id, fields=('IsCandidateInfoValidated',), values=('1',)
)
# Update the MRI upload.
self.mri_upload.is_candidate_info_validated = True
self.env.db.commit()
except ValidateSubjectInfoError as error:
log_warning(self.env, error.message)
self.imaging_upload_obj.update_mri_upload(
upload_id=self.upload_id, fields=('IsCandidateInfoValidated',), values=('0',)
)

# Update the MRI upload.
self.mri_upload.is_candidate_info_validated = False
self.env.db.commit()

def check_if_tarchive_validated_in_db(self):
"""
Expand All @@ -261,13 +277,13 @@ def check_if_tarchive_validated_in_db(self):
If the DICOM archive was not validated, the pipeline will exit and log the proper error information.
"""
# reload the mri_upload object with updated database values
self.load_imaging_upload_and_tarchive_dictionaries()
mu_dict = self.imaging_upload_obj.imaging_upload_dict
if ("IsTarchiveValidated" not in mu_dict.keys() or not mu_dict["IsTarchiveValidated"]) and not self.force:
self.load_mri_upload_and_dicom_archive()

if not self.mri_upload.is_dicom_archive_validated and not self.force:
log_error_exit(
self.env,
(
f"The DICOM archive validation has failed for UploadID {self.upload_id}. Either run the"
f"The DICOM archive validation has failed for UploadID {self.mri_upload.id}. Either run the"
f" validation again and fix the problem or use --force to force the insertion of the NIfTI file."
),
lib.exitcode.INVALID_DICOM,
Expand Down Expand Up @@ -311,8 +327,9 @@ def move_file(self, old_file_path, new_file_path):
)

def end_upload(self):
if self.upload_id:
self.imaging_upload_obj.update_mri_upload(upload_id=self.upload_id, fields=("Inserting",), values=("0",))
# Update the MRI upload.
self.mri_upload.inserting = False
self.env.db.commit()

def remove_tmp_dir(self):
"""
Expand Down
Loading
Loading