Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added delay to trigger after block end and to prefer public/same block/same proposal calibrations. #403

Merged
merged 9 commits into from
Feb 5, 2025
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
1.20.0 (2024-12-11)
-------------------
- Added functionality to delay processing until after the end of the observing block
- Added the ability to only use public calibrations
- Added the ability to prefer calibrations taken within the same block or with the same proposal

1.19.1 (2024-11-05)
-------------------
- Added extra logging and try excepts to catch frames that bypass silently
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ RUN mkdir /home/archive && /usr/sbin/groupadd -g 10000 "domainusers" \

USER archive

ENV HOME /home/archive
ENV HOME=/home/archive

WORKDIR /home/archive

Expand All @@ -21,7 +21,7 @@ RUN . /opt/conda/etc/profile.d/conda.sh && conda config --set remote_read_timeo

COPY --chown=10087:10000 . /lco/banzai

ENV PATH /home/archive/envs/banzai/bin:$PATH
ENV PATH=/home/archive/envs/banzai/bin:$PATH

RUN /home/archive/envs/banzai/bin/pip install --no-cache-dir /lco/banzai/

Expand Down
117 changes: 117 additions & 0 deletions alembic/alembic.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# A generic, single database configuration.

[alembic]
# path to migration scripts
# Use forward slashes (/) also on windows to provide an os agnostic path
script_location = migrations

# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =

# max length of characters to apply to the "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; This defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions

# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
# version_path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
version_path_separator = os

# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8


[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME

# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARNING
handlers = console
qualname =

[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
5 changes: 5 additions & 0 deletions banzai/utils/db_migration.py → alembic/db_migration.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# I have archived this file here even though it is not in alembic format because
# this is one of the early database migrations we did when we changed banzai to track
# instruments and cameras rather than just telescopes. Future migrations of this type
# will use alembic.

import argparse

from sqlalchemy import create_engine
Expand Down
47 changes: 47 additions & 0 deletions alembic/migrations/env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from logging.config import fileConfig

import os
from sqlalchemy import pool, create_engine
from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations() -> None:
    """Run Alembic migrations in 'online' mode.

    Builds a SQLAlchemy engine from the DB_ADDRESS environment variable
    (falling back to a local SQLite file), binds a live connection to the
    Alembic migration context, and runs the migrations inside a single
    transaction.
    """
    engine = create_engine(os.getenv("DB_ADDRESS", "sqlite:///test.db"),
                           poolclass=pool.NullPool)

    with engine.connect() as db_connection:
        context.configure(connection=db_connection,
                          target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()


run_migrations()
26 changes: 26 additions & 0 deletions alembic/migrations/script.py.mako
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}


def upgrade() -> None:
${upgrades if upgrades else "pass"}


def downgrade() -> None:
${downgrades if downgrades else "pass"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Add calibrations block info, proposal id, and public date to CalibrationImage

Revision ID: 8e35c490a971
Revises:
Create Date: 2025-01-31 12:19:03.407606

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy import inspect
from sqlalchemy.sql import text
import requests
import os
import datetime


# revision identifiers, used by Alembic.
revision: str = '8e35c490a971'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add blockid, proposal, and public_date columns to ``calimages`` and
    backfill them (plus any missing ``frameid``) from the LCO archive API.

    Column additions are guarded by an inspector check so the migration is
    safe to re-run against a database where some of the columns already exist.
    """
    bind = op.get_bind()
    inspector = inspect(bind.engine)
    columns = [col['name'] for col in inspector.get_columns('calimages')]

    if 'blockid' not in columns:
        op.add_column('calimages', sa.Column('blockid', sa.Integer(), nullable=True))
    if 'proposal' not in columns:
        op.add_column('calimages', sa.Column('proposal', sa.String(50), nullable=True))
    if 'public_date' not in columns:
        op.add_column('calimages', sa.Column('public_date', sa.DateTime(), nullable=True))

    # An auth token is only needed to see proprietary frames; fall back to
    # anonymous archive access when it is not configured.
    if os.getenv("AUTH_TOKEN") is not None:
        auth_header = {'Authorization': f'Token {os.environ["AUTH_TOKEN"]}'}
    else:
        auth_header = None

    # The archive returns ISO-style timestamps with or without fractional
    # seconds; loop-invariant, so hoisted out of the row loop.
    date_formats = ['%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ']

    connection = op.get_bind()
    rows = connection.execute(text("SELECT id, frameid, filename FROM calimages"))
    for row in rows:
        # The archive indexes basenames without the compression/FITS suffixes.
        params = {'basename_exact': row.filename.replace('.fz', '').replace('.fits', '')}
        request_results = requests.get('https://archive-api.lco.global/frames/',
                                       params=params, headers=auth_header)
        request_results = request_results.json()['results']
        if len(request_results) == 0:
            # No archive record for this file; leave the new columns NULL.
            continue

        blockid = request_results[0]['BLKUID']
        proposal = request_results[0]['proposal_id']

        # Bug fix: public_date was previously left unbound (NameError on the
        # first row) or stale from the prior loop iteration whenever the
        # archive value was missing (strptime(None, ...) raises TypeError,
        # which the except ValueError did not catch) or matched neither
        # format. Reset it every iteration and only parse when a value exists.
        public_date = None
        raw_public_date = request_results[0].get('public_date')
        if raw_public_date is not None:
            for date_format in date_formats:
                try:
                    public_date = datetime.datetime.strptime(raw_public_date, date_format)
                    break
                except ValueError:
                    continue

        values = {'id': row.id, 'public_date': public_date, 'proposal': proposal, 'blockid': blockid}
        query_str = 'blockid = :blockid, proposal = :proposal, public_date = :public_date'
        if row.frameid is None:
            # Opportunistically backfill frameid while we have the archive record.
            query_str += ', frameid = :frameid'
            values['frameid'] = request_results[0]['id']
        connection.execute(
            text(f"UPDATE calimages SET {query_str} WHERE id = :id"),
            values
        )


def downgrade() -> None:
    """Drop the columns added by this migration from ``calimages``."""
    for column_name in ('blockid', 'proposal', 'public_date'):
        op.drop_column('calimages', column_name)
11 changes: 8 additions & 3 deletions banzai/calibrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,14 @@ def apply_master_calibration(self, image, master_calibration_image):
pass

# NOTE(review): this is a rendered diff hunk — the first return below is the
# DELETED pre-PR implementation and the second return is the ADDED replacement;
# only the second one exists in the merged file. As literally rendered here the
# first return would make the second unreachable.
def get_calibration_file_info(self, image):
# Old implementation (removed by this PR): looked up the master calibration
# file info directly, honoring only the use_only_older_calibrations option.
return dbs.get_master_cal(image, self.calibration_type, self.master_selection_criteria,
use_only_older_calibrations=self.runtime_context.use_only_older_calibrations,
db_address=self.runtime_context.db_address)
# New implementation (added by this PR): fetches the calibration DB record —
# now also honoring same-block / same-proposal preference and the public-only
# check from the runtime context — then converts the record to file info.
return dbs.cal_record_to_file_info(
dbs.get_master_cal_record(image, self.calibration_type, self.master_selection_criteria,
self.runtime_context.db_address,
use_only_older_calibrations=self.runtime_context.use_only_older_calibrations,
prefer_same_block_cals=self.runtime_context.prefer_same_block_cals,
prefer_same_proposal_cals=self.runtime_context.prefer_same_proposal_cals,
check_public_cals=self.runtime_context.check_public_cals)
)


class CalibrationComparer(CalibrationUser):
Expand Down
21 changes: 12 additions & 9 deletions banzai/celery.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from dateutil.parser import parse

from celery import Celery
from kombu import Queue

from celery.exceptions import Retry
from banzai import dbs, calibrations, logs
from banzai.utils import date_utils, realtime_utils, stage_utils
from celery.signals import worker_process_init
Expand Down Expand Up @@ -103,9 +103,10 @@ def schedule_calibration_stacking(site: str, runtime_context: dict,
stacking_min_date, stacking_max_date)
if len(blocks_for_calibration) > 0:
# Set the delay to after the latest block end
calibration_end_time = max([parse(block['end']) for block in blocks_for_calibration]).replace(tzinfo=None)
calibration_end_time = max([parse(block['end']) for block in blocks_for_calibration])
calibration_end_time = calibration_end_time.replace(tzinfo=timezone.utc)
stack_delay = timedelta(seconds=runtime_context.CALIBRATION_STACK_DELAYS[frame_type.upper()])
now = datetime.utcnow().replace(microsecond=0)
now = datetime.now(timezone.utc).replace(microsecond=0)
message_delay = calibration_end_time - now + stack_delay
if message_delay.days < 0:
message_delay_in_seconds = 0 # Remove delay if block end is in the past
Expand Down Expand Up @@ -174,16 +175,16 @@ def stack_calibrations(self, min_date: str, max_date: str, instrument_id: int, f
raise self.retry()


@app.task(name='celery.process_image', reject_on_worker_lost=True, max_retries=5)
def process_image(file_info: dict, runtime_context: dict):
@app.task(name='celery.process_image', bind=True, reject_on_worker_lost=True, max_retries=5)
def process_image(self, file_info: dict, runtime_context: dict):
"""
:param file_info: Body of queue message: dict
:param runtime_context: Context object with runtime environment info
"""
logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')})
runtime_context = Context(runtime_context)
try:
if realtime_utils.need_to_process_image(file_info, runtime_context):
logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')})
runtime_context = Context(runtime_context)
if realtime_utils.need_to_process_image(file_info, runtime_context, self):
if 'path' in file_info:
filename = os.path.basename(file_info['path'])
else:
Expand All @@ -193,6 +194,8 @@ def process_image(file_info: dict, runtime_context: dict):
realtime_utils.increment_try_number(filename, db_address=runtime_context.db_address)
stage_utils.run_pipeline_stages([file_info], runtime_context)
realtime_utils.set_file_as_processed(filename, db_address=runtime_context.db_address)
except Retry:
raise
except Exception:
logger.error("Exception processing frame: {error}".format(error=logs.format_exception()),
extra_tags={'file_info': file_info})
Loading