From 6400d35619945d75b580d194844b4d8319ee2948 Mon Sep 17 00:00:00 2001 From: Curtis McCully Date: Wed, 11 Dec 2024 09:18:31 -0500 Subject: [PATCH 1/9] Added delay to trigger after block end and to prefer public/same block/same proposal calibrations. --- CHANGES.md | 6 ++++ banzai/calibrations.py | 11 ++++-- banzai/celery.py | 12 ++++--- banzai/dbs.py | 41 ++++++++++----------- banzai/frames.py | 25 +++++++++++++ banzai/lco.py | 42 ++++++++++++++++++---- banzai/main.py | 8 +++++ banzai/settings.py | 2 ++ banzai/tests/test_frames.py | 2 +- banzai/tests/test_need_to_process_image.py | 18 ++++++---- banzai/utils/realtime_utils.py | 10 +++++- 11 files changed, 134 insertions(+), 43 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index d3be4c8c5..139b64726 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +1.20.0 (2024-12-11) +------------------- +- Added functionality to delay processing until after the end of the observing block +- Added the ability to only use public calibrations +- Added the ability to prefer calibrations taken within the same block or with the same proposal + 1.19.1 (2024-11-05) ------------------- - Added extra logging and try excepts to catch frames that bypass silently diff --git a/banzai/calibrations.py b/banzai/calibrations.py index 7578f34f2..0a4fa9997 100644 --- a/banzai/calibrations.py +++ b/banzai/calibrations.py @@ -137,9 +137,14 @@ def apply_master_calibration(self, image, master_calibration_image): pass def get_calibration_file_info(self, image): - return dbs.get_master_cal(image, self.calibration_type, self.master_selection_criteria, - use_only_older_calibrations=self.runtime_context.use_only_older_calibrations, - db_address=self.runtime_context.db_address) + return dbs.cal_record_to_file_info( + dbs.get_master_cal_record(image, self.calibration_type, self.master_selection_criteria, + self.runtime_context.db_address, + use_only_older_calibrations=self.runtime_context.use_only_older_calibrations, + 
prefer_same_block=self.runtime_context.same_block_cals, + prefer_same_proposal=self.runtime_context.prefer_same_proposal, + check_public=self.runtime_context.check_public_cals) + ) class CalibrationComparer(CalibrationUser): diff --git a/banzai/celery.py b/banzai/celery.py index 28637130f..e34214e8e 100644 --- a/banzai/celery.py +++ b/banzai/celery.py @@ -4,7 +4,7 @@ from celery import Celery from kombu import Queue - +from celery.exceptions import Retry from banzai import dbs, calibrations, logs from banzai.utils import date_utils, realtime_utils, stage_utils from celery.signals import worker_process_init @@ -174,15 +174,15 @@ def stack_calibrations(self, min_date: str, max_date: str, instrument_id: int, f raise self.retry() -@app.task(name='celery.process_image', reject_on_worker_lost=True, max_retries=5) -def process_image(file_info: dict, runtime_context: dict): +@app.task(name='celery.process_image', bind=True, reject_on_worker_lost=True, max_retries=5) +def process_image(self, file_info: dict, runtime_context: dict): """ :param file_info: Body of queue message: dict :param runtime_context: Context object with runtime environment info """ - logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')}) - runtime_context = Context(runtime_context) try: + logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')}) + runtime_context = Context(runtime_context) if realtime_utils.need_to_process_image(file_info, runtime_context): if 'path' in file_info: filename = os.path.basename(file_info['path']) @@ -193,6 +193,8 @@ def process_image(file_info: dict, runtime_context: dict): realtime_utils.increment_try_number(filename, db_address=runtime_context.db_address) stage_utils.run_pipeline_stages([file_info], runtime_context) realtime_utils.set_file_as_processed(filename, db_address=runtime_context.db_address) + except Retry: + raise except Exception: logger.error("Exception processing frame: 
{error}".format(error=logs.format_exception()), extra_tags={'file_info': file_info}) diff --git a/banzai/dbs.py b/banzai/dbs.py index 6786aa56f..1c5ef66c7 100755 --- a/banzai/dbs.py +++ b/banzai/dbs.py @@ -10,9 +10,8 @@ import os.path import datetime from dateutil.parser import parse -import numpy as np import requests -from sqlalchemy import create_engine, pool, type_coerce, cast +from sqlalchemy import create_engine, pool, type_coerce, cast, func from sqlalchemy.orm import sessionmaker from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Boolean, CHAR, JSON, UniqueConstraint, Float from sqlalchemy.ext.declarative import declarative_base @@ -74,6 +73,9 @@ class CalibrationImage(Base): good_until = Column(DateTime, default=datetime.datetime(3000, 1, 1)) good_after = Column(DateTime, default=datetime.datetime(1000, 1, 1)) attributes = Column(JSON) + blockid = Column(Integer, nullable=True) + proposal = Column(String(50), nullable=True) + public_date = Column(DateTime, nullable=True) class Instrument(Base): @@ -336,7 +338,8 @@ def cal_record_to_file_info(record): def get_master_cal_record(image, calibration_type, master_selection_criteria, db_address, - use_only_older_calibrations=False): + use_only_older_calibrations=False, prefer_same_block=False, check_public=False, + prefer_same_proposal=False): calibration_criteria = CalibrationImage.type == calibration_type.upper() calibration_criteria &= CalibrationImage.instrument_id == image.instrument.id calibration_criteria &= CalibrationImage.is_master.is_(True) @@ -356,24 +359,22 @@ def get_master_cal_record(image, calibration_type, master_selection_criteria, db calibration_criteria &= CalibrationImage.good_after <= image.dateobs calibration_criteria &= CalibrationImage.good_until >= image.dateobs + calibration_image = None with get_session(db_address=db_address) as db_session: - calibration_images = db_session.query(CalibrationImage).filter(calibration_criteria).all() - - # Exit if no calibration file 
found - if len(calibration_images) == 0: - return None - - # Find the closest date - date_deltas = np.abs(np.array([i.dateobs - image.dateobs for i in calibration_images])) - closest_calibration_image = calibration_images[np.argmin(date_deltas)] - - return closest_calibration_image - - -def get_master_cal(image, calibration_type, master_selection_criteria, db_address, - use_only_older_calibrations=False): - return cal_record_to_file_info(get_master_cal_record(image, calibration_type, master_selection_criteria, db_address, - use_only_older_calibrations=use_only_older_calibrations)) + if prefer_same_block: + block_criteria = CalibrationImage.blockid == image.blockid + image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & block_criteria) + calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first() + if calibration_image is None and prefer_same_proposal: + proposal_criteria = CalibrationImage.proposal == image.proposal + image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & proposal_criteria) + calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first() + if check_public: + calibration_criteria &= CalibrationImage.public_date <= datetime.datetime.utcnow() + if calibration_image is None: + image_filter = db_session.query(CalibrationImage).filter(calibration_criteria) + calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first() + return calibration_image def get_individual_cal_records(instrument, calibration_type, min_date: str, max_date: str, db_address: str, diff --git a/banzai/frames.py b/banzai/frames.py index 47543aa53..fe5b79298 100644 --- a/banzai/frames.py +++ b/banzai/frames.py @@ -165,6 +165,31 @@ def requested_ccd_temperature(self): def measured_ccd_temperature(self): pass + @property + @abc.abstractmethod + def block_end_date(self): + pass + + @property + @abc.abstractmethod + def 
proposal(self): + pass + + @property + @abc.abstractmethod + def blockid(self): + pass + + @property + @abc.abstractmethod + def public_date(self): + pass + + @public_date.setter + @abc.abstractmethod + def public_date(self, value): + pass + @property def data_type(self): # Convert bytes to bits diff --git a/banzai/lco.py b/banzai/lco.py index 67447d57e..57a2418b2 100644 --- a/banzai/lco.py +++ b/banzai/lco.py @@ -62,6 +62,30 @@ def dateobs(self): def datecreated(self): return Time(self.primary_hdu.meta.get('DATE'), scale='utc').datetime + @property + def block_end_date(self): + return Time(self.primary_hdu.meta.get('BLKEDATE'), scale='utc').datetime + + @property + def proposal(self): + return self.primary_hdu.meta.get('PROPID') + + @property + def blockid(self): + return self.primary_hdu.meta.get('BLKUID') + + @property + def public_date(self): + pubdat = self.primary_hdu.meta.get('L1PUBDAT') + if pubdat is None: + return pubdat + else: + return Time(pubdat).datetime + + @public_date.setter + def public_date(self, value: datetime.datetime): + self.primary_hdu.meta['L1PUBDAT'] = date_utils.date_obs_to_string(value), '[UTC] Date the frame becomes public' + @property def configuration_mode(self): mode = self.meta.get('CONFMODE', 'default') @@ -112,13 +136,14 @@ def save_processing_metadata(self, context): self.meta['PIPEVER'] = (context.PIPELINE_VERSION, 'Pipeline version') - if any(fnmatch(self.meta['PROPID'].lower(), public_proposal) for public_proposal in context.PUBLIC_PROPOSALS): - self.meta['L1PUBDAT'] = (self.meta['DATE-OBS'], '[UTC] Date the frame becomes public') - else: - # Wait to make public - date_observed = date_utils.parse_date_obs(self.meta['DATE-OBS']) - next_year = date_observed + datetime.timedelta(days=context.DATA_RELEASE_DELAY) - self.meta['L1PUBDAT'] = (date_utils.date_obs_to_string(next_year), '[UTC] Date the frame becomes public') + if self.public_date is None: + # Don't override the public date if it already exists + if 
any(fnmatch(self.meta['PROPID'].lower(), public_proposal) for public_proposal in context.PUBLIC_PROPOSALS): + self.public_date = self.dateobs + else: + # Wait to make public + next_year = self.dateobs + datetime.timedelta(days=context.DATA_RELEASE_DELAY) + self.public_date = next_year def get_output_filename(self, runtime_context): output_filename = self.filename.replace('00.fits', '{:02d}.fits'.format(int(runtime_context.reduction_level))) @@ -171,6 +196,9 @@ def to_db_record(self, output_product): 'is_master': self.is_master, 'is_bad': self.is_bad, 'frameid': output_product.frame_id, + 'blockid': self.blockid, + 'proposal': self.proposal, + 'public_date': self.public_date, 'attributes': {}} for attribute in self.grouping_criteria: record_attributes['attributes'][attribute] = str(getattr(self, attribute)) diff --git a/banzai/main.py b/banzai/main.py index bd1c1fd5a..76deb9754 100755 --- a/banzai/main.py +++ b/banzai/main.py @@ -118,6 +118,14 @@ def parse_args(settings, extra_console_arguments=None, parser_description='Proce help='Maximum number of times to try to process a frame') parser.add_argument('--broker-url', dest='broker_url', help='URL for the FITS broker service.') + parser.add_argument('--delay-to-block-end', dest='delay_to_block_end', default=False, action='store_true', + help='Delay real-time processing until after the block has ended') + parser.add_argument('--same-block-cals', dest='same_block_cals', default=False, action='store_true', + help='Prefer calibrations taken in the same block') + parser.add_argument('--check-public-cals', dest='check_public_cals', default=False, action='store_true', + help='Check to see if calibration frames are public before using them?') + parser.add_argument('--prefer-same-proposal', dest='prefer_same_proposal', default=False, action='store_true', + help='Prefer calibrations taken with the same proposal') if extra_console_arguments is None: extra_console_arguments = [] diff --git a/banzai/settings.py 
b/banzai/settings.py index 7123c5b9a..7f3567bb0 100644 --- a/banzai/settings.py +++ b/banzai/settings.py @@ -90,6 +90,8 @@ 'banzai.utils.file_utils.ccdsum_to_filename', 'banzai.utils.file_utils.filter_to_filename')} +OBSTYPES_TO_DELAY = [] + TELESCOPE_FILENAME_FUNCTION = 'banzai.utils.file_utils.telescope_to_filename' OBSERVATION_PORTAL_URL = os.getenv('OBSERVATION_PORTAL_URL', diff --git a/banzai/tests/test_frames.py b/banzai/tests/test_frames.py index e66ae92fb..0a22d6cfd 100644 --- a/banzai/tests/test_frames.py +++ b/banzai/tests/test_frames.py @@ -384,7 +384,7 @@ def test_data_to_detector_section_full(): def test_propid_public(): proposal_ids = ['standard', 'Photometric standards', 'NRES standards', 'FLOYDS standards'] - date_obs = '2021-09-01T00:00:00' + date_obs = '2021-09-01T00:00:00.000009' test_data = [CCDData(np.zeros((1024, 1024)), meta={'PROPID': propid, 'DATE-OBS': date_obs}) for propid in proposal_ids] diff --git a/banzai/tests/test_need_to_process_image.py b/banzai/tests/test_need_to_process_image.py index 3d56fc89b..f65ea72ed 100644 --- a/banzai/tests/test_need_to_process_image.py +++ b/banzai/tests/test_need_to_process_image.py @@ -3,6 +3,7 @@ from banzai.tests.utils import FakeContext from banzai.utils.realtime_utils import need_to_process_image +import datetime md5_hash1 = '49a6bb35cdd3859224c0214310b1d9b6' md5_hash2 = 'aec5ef355e7e43a59fedc88ac95caed6' @@ -11,7 +12,7 @@ class FakeRealtimeImage(object): - def __init__(self, success=False, checksum=md5_hash1, tries=0): + def __init__(self, success=False, checksum=md5_hash1, tries=0, block_end_date=None): self.success = success self.checksum = checksum self.tries = tries @@ -22,10 +23,11 @@ def __init__(self, success=False, checksum=md5_hash1, tries=0): @mock.patch('banzai.utils.fits_utils.get_primary_header') @mock.patch('banzai.utils.image_utils.image_can_be_processed') def test_no_processing_if_previous_success(mock_can_process, mock_header, mock_processed, mock_md5): + mock_task = 
mock.MagicMock() mock_can_process.return_value = True mock_processed.return_value = FakeRealtimeImage(success=True, checksum=md5_hash1) mock_md5.return_value = md5_hash1 - assert not need_to_process_image({'path':'test.fits'}, FakeContext()) + assert not need_to_process_image({'path': 'test.fits'}, FakeContext(), mock_task) @mock.patch('banzai.dbs.commit_processed_image') @@ -34,10 +36,11 @@ def test_no_processing_if_previous_success(mock_can_process, mock_header, mock_p @mock.patch('banzai.utils.fits_utils.get_primary_header') @mock.patch('banzai.utils.image_utils.image_can_be_processed') def test_do_process_if_never_tried(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit): + mock_task = mock.MagicMock() mock_can_process.return_value = True mock_processed.return_value = FakeRealtimeImage(success=False, checksum=md5_hash1, tries=0) mock_md5.return_value = md5_hash1 - assert need_to_process_image({'path':'test.fits'}, FakeContext()) + assert need_to_process_image({'path': 'test.fits'}, FakeContext(), mock_task) @mock.patch('banzai.dbs.commit_processed_image') @@ -46,12 +49,13 @@ def test_do_process_if_never_tried(mock_can_process, mock_header, mock_processed @mock.patch('banzai.utils.fits_utils.get_primary_header') @mock.patch('banzai.utils.image_utils.image_can_be_processed') def test_do_process_if_tries_less_than_max(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit): + mock_task = mock.MagicMock() mock_can_process.return_value = True mock_processed.return_value = FakeRealtimeImage(success=False, checksum=md5_hash1, tries=3) mock_md5.return_value = md5_hash1 context = FakeContext() context.max_tries = 5 - assert need_to_process_image({'path':'test.fits'}, context) + assert need_to_process_image({'path': 'test.fits'}, context, mock_task) @mock.patch('banzai.dbs.commit_processed_image') @@ -60,13 +64,14 @@ def test_do_process_if_tries_less_than_max(mock_can_process, mock_header, mock_p 
@mock.patch('banzai.utils.fits_utils.get_primary_header') @mock.patch('banzai.utils.image_utils.image_can_be_processed') def test_no_processing_if_tries_at_max(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit): + mock_task = mock.MagicMock() mock_can_process.return_value = True max_tries = 5 mock_processed.return_value = FakeRealtimeImage(success=False, checksum=md5_hash1, tries=max_tries) mock_md5.return_value = md5_hash1 context = FakeContext() context.max_tries = max_tries - assert not need_to_process_image({'path':'test.fits'}, context) + assert not need_to_process_image({'path': 'test.fits'}, context, mock_task) @mock.patch('banzai.dbs.commit_processed_image') @@ -76,11 +81,12 @@ def test_no_processing_if_tries_at_max(mock_can_process, mock_header, mock_proce @mock.patch('banzai.utils.image_utils.image_can_be_processed') def test_do_process_if_new_checksum(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit): # assert that tries and success are reset to 0 + mock_task = mock.MagicMock() image = FakeRealtimeImage(success=True, checksum=md5_hash1, tries=3) mock_can_process.return_value = True mock_processed.return_value = image mock_md5.return_value = md5_hash2 - assert need_to_process_image({'path': 'test.fits'}, FakeContext()) + assert need_to_process_image({'path': 'test.fits'}, FakeContext(), mock_task) assert not image.success assert image.tries == 0 assert image.checksum == md5_hash2 diff --git a/banzai/utils/realtime_utils.py b/banzai/utils/realtime_utils.py index 2f6ca001c..f920c2dd8 100644 --- a/banzai/utils/realtime_utils.py +++ b/banzai/utils/realtime_utils.py @@ -4,6 +4,8 @@ from banzai.utils import file_utils, import_utils, image_utils from banzai.data import HeaderOnly from banzai import logs +import datetime + logger = logs.get_logger() @@ -22,7 +24,7 @@ def increment_try_number(path, db_address): dbs.commit_processed_image(image, db_address=db_address) -def need_to_process_image(file_info, context): +def 
need_to_process_image(file_info, context, task): """ Figure out if we need to try to make a process a given file. @@ -108,6 +110,12 @@ def need_to_process_image(file_info, context): msg = 'The header in this queue message appears to not be complete enough to make a Frame object' logger.error(msg, extra_tags={'filename': filename}) need_to_process = False + if context.delay_to_block_end and test_image.obstype in context.OBSTYPES_TO_DELAY: + if datetime.datetime.now() < test_image.block_end_date: + logger.info('Observing Block in progress. Retrying 5 minutes after it completes', + extra_tags={'filename': filename}) + delay = test_image.block_end_date - datetime.datetime.now() + datetime.timedelta(minutes=5) + task.retry(countdown=delay.total_seconds()) except Exception: logger.error('Issue creating Image object with given queue message', extra_tags={"filename": filename}) logger.error(logs.format_exception()) From e5859642deb95ad1539efe9a7153305065967c4d Mon Sep 17 00:00:00 2001 From: Curtis McCully Date: Mon, 16 Dec 2024 09:57:52 -0500 Subject: [PATCH 2/9] Fixes to tests. Addressed some comments. 
--- banzai/calibrations.py | 8 ++++---- banzai/celery.py | 2 +- banzai/dbs.py | 12 ++++++------ banzai/main.py | 3 ++- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/banzai/calibrations.py b/banzai/calibrations.py index 0a4fa9997..2909691ab 100644 --- a/banzai/calibrations.py +++ b/banzai/calibrations.py @@ -139,11 +139,11 @@ def apply_master_calibration(self, image, master_calibration_image): def get_calibration_file_info(self, image): return dbs.cal_record_to_file_info( dbs.get_master_cal_record(image, self.calibration_type, self.master_selection_criteria, - self.runtime_context.db_address, + self.runtime_context.db_address, use_only_older_calibrations=self.runtime_context.use_only_older_calibrations, - prefer_same_block=self.runtime_context.same_block_cals, - prefer_same_proposal=self.runtime_context.prefer_same_proposal, - check_public=self.runtime_context.check_public_cals) + prefer_same_block_cals=self.runtime_context.same_block_cals, + prefer_same_proposal_cals=self.runtime_context.prefer_same_proposal_cals, + check_public_cals=self.runtime_context.check_public_cals) ) diff --git a/banzai/celery.py b/banzai/celery.py index e34214e8e..bdda5e060 100644 --- a/banzai/celery.py +++ b/banzai/celery.py @@ -183,7 +183,7 @@ def process_image(self, file_info: dict, runtime_context: dict): try: logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')}) runtime_context = Context(runtime_context) - if realtime_utils.need_to_process_image(file_info, runtime_context): + if realtime_utils.need_to_process_image(file_info, runtime_context, self): if 'path' in file_info: filename = os.path.basename(file_info['path']) else: diff --git a/banzai/dbs.py b/banzai/dbs.py index 1c5ef66c7..ca8bbacd3 100755 --- a/banzai/dbs.py +++ b/banzai/dbs.py @@ -338,8 +338,8 @@ def cal_record_to_file_info(record): def get_master_cal_record(image, calibration_type, master_selection_criteria, db_address, - use_only_older_calibrations=False, 
prefer_same_block=False, check_public=False, - prefer_same_proposal=False): + use_only_older_calibrations=False, prefer_same_block_cals=False, check_public_cals=False, + prefer_same_proposal_cals=False): calibration_criteria = CalibrationImage.type == calibration_type.upper() calibration_criteria &= CalibrationImage.instrument_id == image.instrument.id calibration_criteria &= CalibrationImage.is_master.is_(True) @@ -361,16 +361,16 @@ def get_master_cal_record(image, calibration_type, master_selection_criteria, db calibration_image = None with get_session(db_address=db_address) as db_session: - if prefer_same_block: + if prefer_same_block_cals: block_criteria = CalibrationImage.blockid == image.blockid image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & block_criteria) calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first() - if calibration_image is None and prefer_same_proposal: + if calibration_image is None and prefer_same_proposal_cals: proposal_criteria = CalibrationImage.proposal == image.proposal image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & proposal_criteria) calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first() - if check_public: - calibration_criteria &= CalibrationImage.public_date <= datetime.datetime.utcnow() + if check_public_cals: + calibration_criteria &= CalibrationImage.public_date <= datetime.datetime.now(datetime.UTC) if calibration_image is None: image_filter = db_session.query(CalibrationImage).filter(calibration_criteria) calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first() diff --git a/banzai/main.py b/banzai/main.py index 76deb9754..a1e12145e 100755 --- a/banzai/main.py +++ b/banzai/main.py @@ -124,7 +124,8 @@ def parse_args(settings, extra_console_arguments=None, parser_description='Proce help='Prefer calibrations taken in the same block') 
parser.add_argument('--check-public-cals', dest='check_public_cals', default=False, action='store_true', help='Check to see if calibration frames are public before using them?') - parser.add_argument('--prefer-same-proposal', dest='prefer_same_proposal', default=False, action='store_true', + parser.add_argument('--prefer-same-proposal-cals', dest='prefer_same_proposal_cals', + default=False, action='store_true', help='Prefer calibrations taken with the same proposal') if extra_console_arguments is None: From 8fe8f7e21075884ce5b0453561de740c800cbbb7 Mon Sep 17 00:00:00 2001 From: Curtis McCully Date: Mon, 13 Jan 2025 10:24:41 -0500 Subject: [PATCH 3/9] Cover the case when the block id is null in the header. --- banzai/lco.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/banzai/lco.py b/banzai/lco.py index 57a2418b2..01067e103 100644 --- a/banzai/lco.py +++ b/banzai/lco.py @@ -72,7 +72,10 @@ def proposal(self): @property def blockid(self): - return self.primary_hdu.meta.get('BLKUID') + id = self.primary_hdu.meta.get('BLKUID') + if str(id).lower() in ['n/a', 'unknown', 'none', '']: + id = None + return id @property def public_date(self): From 2b81b605eb86e3bc130f7f4583737e9f27470276 Mon Sep 17 00:00:00 2001 From: Curtis McCully Date: Mon, 13 Jan 2025 15:52:24 -0500 Subject: [PATCH 4/9] Updates to how we order by date because postgres and sqlite are not compatible with direct date differences. 
--- banzai/dbs.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/banzai/dbs.py b/banzai/dbs.py index ca8bbacd3..ad00557bd 100755 --- a/banzai/dbs.py +++ b/banzai/dbs.py @@ -361,19 +361,21 @@ def get_master_cal_record(image, calibration_type, master_selection_criteria, db calibration_image = None with get_session(db_address=db_address) as db_session: + order_func = func.abs(func.julianday(CalibrationImage.dateobs) - func.julianday(image.dateobs)) if prefer_same_block_cals: block_criteria = CalibrationImage.blockid == image.blockid image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & block_criteria) - calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first() + calibration_image = image_filter.order_by(order_func).first() if calibration_image is None and prefer_same_proposal_cals: proposal_criteria = CalibrationImage.proposal == image.proposal image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & proposal_criteria) - calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first() + calibration_image = image_filter.order_by(order_func).first() if check_public_cals: calibration_criteria &= CalibrationImage.public_date <= datetime.datetime.now(datetime.UTC) if calibration_image is None: image_filter = db_session.query(CalibrationImage).filter(calibration_criteria) - calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first() + calibration_image = image_filter.order_by(order_func).first() + return calibration_image From a009efd5d6a533f8507879e5aa36a3d03d423aac Mon Sep 17 00:00:00 2001 From: Curtis McCully Date: Mon, 13 Jan 2025 19:07:42 -0500 Subject: [PATCH 5/9] Another attempt to deal with date intervals in sqlite and postgres. 
--- banzai/dbs.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/banzai/dbs.py b/banzai/dbs.py index ad00557bd..ed833c6f6 100755 --- a/banzai/dbs.py +++ b/banzai/dbs.py @@ -361,7 +361,13 @@ def get_master_cal_record(image, calibration_type, master_selection_criteria, db calibration_image = None with get_session(db_address=db_address) as db_session: - order_func = func.abs(func.julianday(CalibrationImage.dateobs) - func.julianday(image.dateobs)) + if 'postgres' in db_session.bind.dialect.name: + order_func = func.abs(func.extract("epoch", CalibrationImage.dateobs) - + func.extract("epoch", image.dateobs)) + elif 'sqlite' in db_session.bind.dialect.name: + order_func = func.abs(func.julianday(CalibrationImage.dateobs) - func.julianday(image.dateobs)) + else: + raise NotImplementedError("Only postgres and sqlite are supported") if prefer_same_block_cals: block_criteria = CalibrationImage.blockid == image.blockid image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & block_criteria) From c0b0a6df4bacbf624c29577af43b55856b194db5 Mon Sep 17 00:00:00 2001 From: Curtis McCully Date: Wed, 29 Jan 2025 12:42:13 -0500 Subject: [PATCH 6/9] Removed deprecated utcnow and switch datetime.UTC to datetime.timezone.utc for compatability with python <=3.10 --- banzai/celery.py | 2 +- banzai/dbs.py | 2 +- banzai/lco.py | 2 +- banzai/tests/test_celery.py | 4 ++-- banzai/tests/test_date_utils.py | 2 +- banzai/utils/date_utils.py | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/banzai/celery.py b/banzai/celery.py index bdda5e060..1fa3903ec 100644 --- a/banzai/celery.py +++ b/banzai/celery.py @@ -105,7 +105,7 @@ def schedule_calibration_stacking(site: str, runtime_context: dict, # Set the delay to after the latest block end calibration_end_time = max([parse(block['end']) for block in blocks_for_calibration]).replace(tzinfo=None) stack_delay = 
timedelta(seconds=runtime_context.CALIBRATION_STACK_DELAYS[frame_type.upper()]) - now = datetime.utcnow().replace(microsecond=0) + now = datetime.now(datetime.timezone.utc).replace(microsecond=0) message_delay = calibration_end_time - now + stack_delay if message_delay.days < 0: message_delay_in_seconds = 0 # Remove delay if block end is in the past diff --git a/banzai/dbs.py b/banzai/dbs.py index ed833c6f6..7c4e45951 100755 --- a/banzai/dbs.py +++ b/banzai/dbs.py @@ -377,7 +377,7 @@ def get_master_cal_record(image, calibration_type, master_selection_criteria, db image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & proposal_criteria) calibration_image = image_filter.order_by(order_func).first() if check_public_cals: - calibration_criteria &= CalibrationImage.public_date <= datetime.datetime.now(datetime.UTC) + calibration_criteria &= CalibrationImage.public_date <= datetime.datetime.now(datetime.timezone.utc) if calibration_image is None: image_filter = db_session.query(CalibrationImage).filter(calibration_criteria) calibration_image = image_filter.order_by(order_func).first() diff --git a/banzai/lco.py b/banzai/lco.py index 01067e103..177802e55 100644 --- a/banzai/lco.py +++ b/banzai/lco.py @@ -133,7 +133,7 @@ def measured_ccd_temperature(self): return self.primary_hdu.meta.get('CCDATEMP', 0.0) def save_processing_metadata(self, context): - datecreated = datetime.datetime.utcnow() + datecreated = datetime.datetime.now(datetime.timezone.utc) self.meta['DATE'] = (date_utils.date_obs_to_string(datecreated), '[UTC] Date this FITS file was written') self.meta['RLEVEL'] = (context.reduction_level, 'Reduction level') diff --git a/banzai/tests/test_celery.py b/banzai/tests/test_celery.py index 2838c228b..460a42596 100644 --- a/banzai/tests/test_celery.py +++ b/banzai/tests/test_celery.py @@ -1,7 +1,7 @@ import mock import pytest -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from unittest.mock 
import ANY from astropy.io.fits import Header @@ -211,7 +211,7 @@ def test_submit_stacking_tasks_to_queue_no_delay(self, mock_filter_blocks, mock_ def test_submit_stacking_tasks_to_queue_with_delay(self, mock_filter_blocks, mock_get_blocks, mock_get_instruments, mock_stack_calibrations, setup): mock_get_instruments.return_value = [self.fake_inst] - self.fake_blocks_response_json['results'][0]['end'] = datetime.strftime(datetime.utcnow() + timedelta(minutes=1), + self.fake_blocks_response_json['results'][0]['end'] = datetime.strftime(datetime.now(timezone.utc) + timedelta(minutes=1), date_utils.TIMESTAMP_FORMAT) mock_get_blocks.return_value = self.fake_blocks_response_json mock_filter_blocks.return_value = [block for block in self.fake_blocks_response_json['results']] diff --git a/banzai/tests/test_date_utils.py b/banzai/tests/test_date_utils.py index e32c48408..071c2c15c 100644 --- a/banzai/tests/test_date_utils.py +++ b/banzai/tests/test_date_utils.py @@ -56,7 +56,7 @@ @mock.patch('banzai.utils.date_utils.datetime.datetime') def test_get_dayobs(mock_datetime): for data in test_site_data.values(): - mock_datetime.utcnow = mock.Mock(return_value=datetime(2019, 5, 1) + data['schedule_time']) + mock_datetime.now = mock.Mock(return_value=datetime(2019, 5, 1) + data['schedule_time']) assert date_utils.get_dayobs(data['timezone']) == data['dayobs'] diff --git a/banzai/utils/date_utils.py b/banzai/utils/date_utils.py index be10545a5..c895b80f2 100755 --- a/banzai/utils/date_utils.py +++ b/banzai/utils/date_utils.py @@ -117,7 +117,7 @@ def total_seconds(timedelta): def get_dayobs(timezone): # Get the current utc - now = datetime.datetime.utcnow() + now = datetime.now(datetime.timezone.utc) # Add the timezone offset now += datetime.timedelta(hours=timezone) return epoch_date_to_string(now.date()) From fde6732130fe40548185370971679c4427742eb7 Mon Sep 17 00:00:00 2001 From: Curtis McCully Date: Mon, 3 Feb 2025 13:59:59 -0500 Subject: [PATCH 7/9] Added alembic migration for 
the database. --- alembic/alembic.ini | 117 ++++++++++++++++++ {banzai/utils => alembic}/db_migration.py | 5 + alembic/migrations/env.py | 47 +++++++ alembic/migrations/script.py.mako | 26 ++++ ...dd_calibrations_block_info_proposal_id_.py | 60 +++++++++ setup.cfg | 2 +- 6 files changed, 256 insertions(+), 1 deletion(-) create mode 100644 alembic/alembic.ini rename {banzai/utils => alembic}/db_migration.py (95%) create mode 100644 alembic/migrations/env.py create mode 100644 alembic/migrations/script.py.mako create mode 100644 alembic/migrations/versions/8e35c490a971_add_calibrations_block_info_proposal_id_.py diff --git a/alembic/alembic.ini b/alembic/alembic.ini new file mode 100644 index 000000000..c008fe40c --- /dev/null +++ b/alembic/alembic.ini @@ -0,0 +1,117 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +# Use forward slashes (/) also on windows to provide an os agnostic path +script_location = migrations + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file +# for all available tokens +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library. 
+# Any required deps can installed by adding `alembic[tz]` to the pip requirements +# string value is passed to ZoneInfo() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to migrations/versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "version_path_separator" below. +# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions + +# version path separator; As mentioned above, this is the character used to split +# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep. +# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas. +# Valid values for version_path_separator are: +# +# version_path_separator = : +# version_path_separator = ; +# version_path_separator = space +# version_path_separator = newline +# +# Use os.pathsep. Default configuration used for new projects. +version_path_separator = os + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. 
See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# lint with attempts to fix using "ruff" - use the exec runner, execute a binary +# hooks = ruff +# ruff.type = exec +# ruff.executable = %(here)s/.venv/bin/ruff +# ruff.options = --fix REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARNING +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARNING +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/banzai/utils/db_migration.py b/alembic/db_migration.py similarity index 95% rename from banzai/utils/db_migration.py rename to alembic/db_migration.py index b378824d5..7f5d12193 100644 --- a/banzai/utils/db_migration.py +++ b/alembic/db_migration.py @@ -1,3 +1,8 @@ +# I have archived this file here even though it is not in alembic format because +# this is one of the early database migrations we did when we changed banzai to track +# instruments and cameras rather than just telescopes. Future migrations of this type +# will use alembic.
+ import argparse from sqlalchemy import create_engine diff --git a/alembic/migrations/env.py b/alembic/migrations/env.py new file mode 100644 index 000000000..f2daf0e61 --- /dev/null +++ b/alembic/migrations/env.py @@ -0,0 +1,47 @@ +from logging.config import fileConfig + +import os +from sqlalchemy import pool, create_engine +from alembic import context + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +target_metadata = None + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. 
+ + """ + connection_url = os.getenv("DATABASE_URL", "sqlite:///fallback.db") + connectable = create_engine(connection_url, poolclass=pool.NullPool) + + with connectable.connect() as connection: + context.configure( + connection=connection, target_metadata=target_metadata + ) + + with context.begin_transaction(): + context.run_migrations() + + +run_migrations() diff --git a/alembic/migrations/script.py.mako b/alembic/migrations/script.py.mako new file mode 100644 index 000000000..fbc4b07dc --- /dev/null +++ b/alembic/migrations/script.py.mako @@ -0,0 +1,26 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + ${downgrades if downgrades else "pass"} diff --git a/alembic/migrations/versions/8e35c490a971_add_calibrations_block_info_proposal_id_.py b/alembic/migrations/versions/8e35c490a971_add_calibrations_block_info_proposal_id_.py new file mode 100644 index 000000000..2f38b43b8 --- /dev/null +++ b/alembic/migrations/versions/8e35c490a971_add_calibrations_block_info_proposal_id_.py @@ -0,0 +1,60 @@ +"""Add calibrations block info, proposal id, and public date to CalibrationImage + +Revision ID: 8e35c490a971 +Revises: +Create Date: 2025-01-31 12:19:03.407606 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.sql import text +import requests +import os +import datetime + + +# revision identifiers, used by Alembic. 
+revision: str = '8e35c490a971' +down_revision: Union[str, None] = None +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column('calimages', sa.Column('blockid', sa.Integer(), nullable=True)) + op.add_column('calimages', sa.Column('proposal', sa.String(50), nullable=True)) + op.add_column('calimages', sa.Column('public_date', sa.DateTime(), nullable=True)) + if os.getenv("AUTH_TOKEN") is not None: + auth_header = {'Authorization': f'Token {os.environ["AUTH_TOKEN"]}'} + else: + auth_header = None + + connection = op.get_bind() + rows = connection.execute(text("SELECT id, frameid, filename FROM calimages")) + for row in rows: + params = {'basename_exact': row.filename.replace('.fz', '').replace('.fits', '')} + request_results = requests.get('https://archive-api.lco.global/frames/', params=params, headers=auth_header) + request_results = request_results.json() + if len(request_results['results']) == 0: + continue + + blockid = request_results[0]['BLKUID'] + proposal = request_results[0]['proposal_id'] + public_date = datetime.datetime.strptime(request_results[0]['public_date'], '%Y-%m-%dT%H:%M:%S.%fZ') + values = {'public_date': public_date, 'proposal': proposal, 'blockid': blockid} + query_str = 'blockid = :blockid, proposal = :proposal, public_date = :public_date' + if row.frameid is None: + query_str += ', frameid = :frameid' + values['frameid'] = request_results['results'][0]['id'] + connection.execute( + text(f"UPDATE calimages SET {query_str} WHERE id = :id"), + values + ) + + +def downgrade() -> None: + op.drop_column('calimages', 'blockid') + op.drop_column('calimages', 'proposal') + op.drop_column('calimages', 'public_date') diff --git a/setup.cfg b/setup.cfg index d21c76a91..0ad2a3633 100755 --- a/setup.cfg +++ b/setup.cfg @@ -70,6 +70,7 @@ install_requires = emcee scikit-image cosmic-conn>=0.2.8 + alembic setup_requires = setuptools_scm python_requires = >=3.7 @@ 
-96,7 +97,6 @@ console_scripts = banzai_mark_frame_as_good = banzai.main:mark_frame_as_good banzai_mark_frame_as_bad = banzai.main:mark_frame_as_bad banzai_update_db = banzai.main:update_db - banzai_migrate_db = banzai.utils.db_migration:migrate_db banzai_add_instrument = banzai.main:add_instrument banzai_add_site = banzai.main:add_site banzai_add_super_calibration = banzai.main:add_super_calibration From 0c2e05fda3b3e1f4f1765496559643fde96c3a47 Mon Sep 17 00:00:00 2001 From: Curtis McCully Date: Tue, 4 Feb 2025 14:27:26 -0500 Subject: [PATCH 8/9] Fixes to datetime updates. --- banzai/calibrations.py | 2 +- banzai/celery.py | 7 ++++--- banzai/main.py | 2 +- banzai/utils/date_utils.py | 2 +- tox.ini | 4 ++-- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/banzai/calibrations.py b/banzai/calibrations.py index 2909691ab..3b794cc24 100644 --- a/banzai/calibrations.py +++ b/banzai/calibrations.py @@ -141,7 +141,7 @@ def get_calibration_file_info(self, image): dbs.get_master_cal_record(image, self.calibration_type, self.master_selection_criteria, self.runtime_context.db_address, use_only_older_calibrations=self.runtime_context.use_only_older_calibrations, - prefer_same_block_cals=self.runtime_context.same_block_cals, + prefer_same_block_cals=self.runtime_context.prefer_same_block_cals, prefer_same_proposal_cals=self.runtime_context.prefer_same_proposal_cals, check_public_cals=self.runtime_context.check_public_cals) ) diff --git a/banzai/celery.py b/banzai/celery.py index 1fa3903ec..15b7dcd4c 100644 --- a/banzai/celery.py +++ b/banzai/celery.py @@ -1,5 +1,5 @@ import os -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from dateutil.parser import parse from celery import Celery @@ -103,9 +103,10 @@ def schedule_calibration_stacking(site: str, runtime_context: dict, stacking_min_date, stacking_max_date) if len(blocks_for_calibration) > 0: # Set the delay to after the latest block end - calibration_end_time = 
max([parse(block['end']) for block in blocks_for_calibration]).replace(tzinfo=None) + calibration_end_time = max([parse(block['end']) for block in blocks_for_calibration]) + calibration_end_time = calibration_end_time.replace(tzinfo=timezone.utc) stack_delay = timedelta(seconds=runtime_context.CALIBRATION_STACK_DELAYS[frame_type.upper()]) - now = datetime.now(datetime.timezone.utc).replace(microsecond=0) + now = datetime.now(timezone.utc).replace(microsecond=0) message_delay = calibration_end_time - now + stack_delay if message_delay.days < 0: message_delay_in_seconds = 0 # Remove delay if block end is in the past diff --git a/banzai/main.py b/banzai/main.py index a1e12145e..72a1d5c70 100755 --- a/banzai/main.py +++ b/banzai/main.py @@ -120,7 +120,7 @@ def parse_args(settings, extra_console_arguments=None, parser_description='Proce help='URL for the FITS broker service.') parser.add_argument('--delay-to-block-end', dest='delay_to_block_end', default=False, action='store_true', help='Delay real-time processing until after the block has ended') - parser.add_argument('--same-block-cals', dest='same_block_cals', default=False, action='store_true', + parser.add_argument('--prefer-same-block-cals', dest='prefer_same_block_cals', default=False, action='store_true', help='Prefer calibrations taken in the same block') parser.add_argument('--check-public-cals', dest='check_public_cals', default=False, action='store_true', help='Check to see if calibration frames are public before using them?') diff --git a/banzai/utils/date_utils.py b/banzai/utils/date_utils.py index c895b80f2..d0a2a9a07 100755 --- a/banzai/utils/date_utils.py +++ b/banzai/utils/date_utils.py @@ -117,7 +117,7 @@ def total_seconds(timedelta): def get_dayobs(timezone): # Get the current utc - now = datetime.now(datetime.timezone.utc) + now = datetime.datetime.now(datetime.timezone.utc) # Add the timezone offset now += datetime.timedelta(hours=timezone) return epoch_date_to_string(now.date()) diff --git 
a/tox.ini b/tox.ini index 61eeec70e..2490b244e 100644 --- a/tox.ini +++ b/tox.ini @@ -59,8 +59,8 @@ extras = commands = pip freeze - !cov: pytest --pyargs banzai.tests -m "not e2e" {toxinidir}/docs {posargs} - cov: pytest --pyargs banzai.tests {toxinidir}/docs --cov banzai --cov-config={toxinidir}/setup.cfg {posargs} + !cov: pytest --pyargs banzai.tests -m "not e2e" "{toxinidir}/docs" {posargs} + cov: pytest --pyargs banzai.tests "{toxinidir}/docs" --cov banzai --cov-config="{toxinidir}/setup.cfg" {posargs} [testenv:py310-test] install_command = pip install --pre torch -f https://download.pytorch.org/whl/nightly/cpu/torch_nightly.html {opts} {packages} From b8873f487469a54b55e2845dec59a2b5b54eae4d Mon Sep 17 00:00:00 2001 From: Curtis McCully Date: Wed, 5 Feb 2025 11:34:01 -0500 Subject: [PATCH 9/9] Fixes to migration script. Minor updates to deprecated syntax in dockerfile. --- Dockerfile | 4 +-- alembic/migrations/env.py | 2 +- ...dd_calibrations_block_info_proposal_id_.py | 30 ++++++++++++++----- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/Dockerfile b/Dockerfile index f93189520..5a7af7bfb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,7 +11,7 @@ RUN mkdir /home/archive && /usr/sbin/groupadd -g 10000 "domainusers" \ USER archive -ENV HOME /home/archive +ENV HOME=/home/archive WORKDIR /home/archive @@ -21,7 +21,7 @@ RUN . /opt/conda/etc/profile.d/conda.sh && conda config --set remote_read_timeo COPY --chown=10087:10000 . /lco/banzai -ENV PATH /home/archive/envs/banzai/bin:$PATH +ENV PATH=/home/archive/envs/banzai/bin:$PATH RUN /home/archive/envs/banzai/bin/pip install --no-cache-dir /lco/banzai/ diff --git a/alembic/migrations/env.py b/alembic/migrations/env.py index f2daf0e61..38582b6fa 100644 --- a/alembic/migrations/env.py +++ b/alembic/migrations/env.py @@ -32,7 +32,7 @@ def run_migrations() -> None: and associate a connection with the context. 
""" - connection_url = os.getenv("DATABASE_URL", "sqlite:///fallback.db") + connection_url = os.getenv("DB_ADDRESS", "sqlite:///test.db") connectable = create_engine(connection_url, poolclass=pool.NullPool) with connectable.connect() as connection: diff --git a/alembic/migrations/versions/8e35c490a971_add_calibrations_block_info_proposal_id_.py b/alembic/migrations/versions/8e35c490a971_add_calibrations_block_info_proposal_id_.py index 2f38b43b8..c2f2546ef 100644 --- a/alembic/migrations/versions/8e35c490a971_add_calibrations_block_info_proposal_id_.py +++ b/alembic/migrations/versions/8e35c490a971_add_calibrations_block_info_proposal_id_.py @@ -9,6 +9,7 @@ from alembic import op import sqlalchemy as sa +from sqlalchemy import inspect from sqlalchemy.sql import text import requests import os @@ -23,9 +24,16 @@ def upgrade() -> None: - op.add_column('calimages', sa.Column('blockid', sa.Integer(), nullable=True)) - op.add_column('calimages', sa.Column('proposal', sa.String(50), nullable=True)) - op.add_column('calimages', sa.Column('public_date', sa.DateTime(), nullable=True)) + bind = op.get_bind() + inspector = inspect(bind.engine) + columns = [col['name'] for col in inspector.get_columns('calimages')] + + if 'blockid' not in columns: + op.add_column('calimages', sa.Column('blockid', sa.Integer(), nullable=True)) + if 'proposal' not in columns: + op.add_column('calimages', sa.Column('proposal', sa.String(50), nullable=True)) + if 'public_date' not in columns: + op.add_column('calimages', sa.Column('public_date', sa.DateTime(), nullable=True)) if os.getenv("AUTH_TOKEN") is not None: auth_header = {'Authorization': f'Token {os.environ["AUTH_TOKEN"]}'} else: @@ -36,18 +44,24 @@ def upgrade() -> None: for row in rows: params = {'basename_exact': row.filename.replace('.fz', '').replace('.fits', '')} request_results = requests.get('https://archive-api.lco.global/frames/', params=params, headers=auth_header) - request_results = request_results.json() - if 
len(request_results['results']) == 0: + request_results = request_results.json()['results'] + if len(request_results) == 0: continue blockid = request_results[0]['BLKUID'] proposal = request_results[0]['proposal_id'] - public_date = datetime.datetime.strptime(request_results[0]['public_date'], '%Y-%m-%dT%H:%M:%S.%fZ') - values = {'public_date': public_date, 'proposal': proposal, 'blockid': blockid} + date_formats = ['%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ'] + for date_format in date_formats: + try: + public_date = datetime.datetime.strptime(request_results[0]['public_date'], date_format) + break + except ValueError: + continue + values = {'id': row.id, 'public_date': public_date, 'proposal': proposal, 'blockid': blockid} query_str = 'blockid = :blockid, proposal = :proposal, public_date = :public_date' if row.frameid is None: query_str += ', frameid = :frameid' - values['frameid'] = request_results['results'][0]['id'] + values['frameid'] = request_results[0]['id'] connection.execute( text(f"UPDATE calimages SET {query_str} WHERE id = :id"), values