Skip to content

Commit

Permalink
Merge pull request #287 from roman-corgi/flux_cal
Browse files Browse the repository at this point in the history
Flux calibration
  • Loading branch information
maxwellmb authored Jan 29, 2025
2 parents a52916f + 1c2fc06 commit b962fd7
Show file tree
Hide file tree
Showing 8 changed files with 1,185 additions and 251 deletions.
92 changes: 90 additions & 2 deletions corgidrp/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1262,16 +1262,104 @@ def __init__(self,data_or_filepath, pri_hdr=None,ext_hdr=None, input_dataset=Non
if 'DATATYPE' not in self.ext_hdr or self.ext_hdr['DATATYPE'] != 'TrapCalibration':
raise ValueError("File that was loaded was not a TrapCalibration file.")

class FluxcalFactor(Image):
"""
Class containing the flux calibration factor (and corresponding error) for each band in unit erg/(s * cm^2 * AA)/photo-electron.
To create a new instance of FluxcalFactor, you need to pass the value and error and the filter name in the ext_hdr:
Args:
data_or_filepath (dict or str): either a filepath string corresponding to an
existing FluxcalFactor file saved to disk or the data and error values of the
flux cal factor of a certain filter defined in the header
Attributes:
filter (str): used filter name
nd_filter (str): used neutral density filter or "No"
fluxcal_fac (float): the value of the flux cal factor for the corresponding filter
fluxcal_err (float): the error of the flux cal factor for the corresponding filter
"""
def __init__(self, data_or_filepath, err = None, pri_hdr=None, ext_hdr=None, err_hdr = None, input_dataset = None):
# run the image class contructor
super().__init__(data_or_filepath, err=err, pri_hdr=pri_hdr, ext_hdr=ext_hdr, err_hdr=err_hdr)
# if filepath passed in, just load in from disk as usual
# File format checks
if self.data.shape != (1,1):
raise ValueError('The FluxcalFactor calibration data should be just one float value')

#TBC
self.nd_filter = "ND0" #no neutral density filter in beam
if 'FPAMNAME' in self.ext_hdr:
name = self.ext_hdr['FPAMNAME']
if name.startswith("ND"):
self.nd_filter = name
elif 'FSAMNAME' in self.ext_hdr:
name = self.ext_hdr['FSAMNAME']
if name.startswith("ND"):
self.nd_filter = name
else:
raise ValueError('The FluxcalFactor calibration has no keyword FPAMNAME or FSAMNAME in the header')

if 'CFAMNAME' in self.ext_hdr:
self.filter = self.ext_hdr['CFAMNAME']
else:
raise ValueError('The FluxcalFactor calibration has no filter keyword CFAMNAME in the header')


if isinstance(data_or_filepath, str):
# double check that this is actually a FluxcalFactor file that got read in
# since if only a filepath was passed in, any file could have been read in
if 'DATATYPE' not in self.ext_hdr:
raise ValueError("File that was loaded was not a FluxcalFactor file.")
if self.ext_hdr['DATATYPE'] != 'FluxcalFactor':
raise ValueError("File that was loaded was not a FluxcalFactor file.")
else:
self.ext_hdr['DRPVERSN'] = corgidrp.__version__
self.ext_hdr['DRPCTIME'] = time.Time.now().isot

# make some attributes to be easier to use
self.fluxcal_fac = self.data[0,0]
self.fluxcal_err = self.err[0,0,0]

# if this is a new FluxcalFactors file, we need to bookkeep it in the header
# b/c of logic in the super.__init__, we just need to check this to see if it is a new FluxcalFactors file
if ext_hdr is not None:
if input_dataset is None:
if 'DRPNFILE' not in ext_hdr:
# error check. this is required in this case
raise ValueError("This appears to be a new FluxcalFactor. The dataset of input files needs to be passed \
in to the input_dataset keyword to record history of this FluxcalFactor file.")
else:
pass
else:
# log all the data that went into making this calibration file
self._record_parent_filenames(input_dataset)
# give it a default filename using the first input file as the base
# strip off everything starting at .fits
orig_input_filename = input_dataset[0].filename.split(".fits")[0]

self.ext_hdr['DATATYPE'] = 'FluxcalFactor' # corgidrp specific keyword for saving to disk
self.ext_hdr['BUNIT'] = 'erg/(s * cm^2 * AA)/electron'
self.err_hdr['BUNIT'] = 'erg/(s * cm^2 * AA)/electron'
# add to history
self.ext_hdr['HISTORY'] = "Flux calibration file created"

# use the start date for the filename by default
self.filedir = "."
self.filename = "{0}_FluxcalFactor_{1}_{2}.fits".format(orig_input_filename, self.filter, self.nd_filter)


datatypes = { "Image" : Image,
"Dark" : Dark,
"Dark" : Dark,
"NonLinearityCalibration" : NonLinearityCalibration,
"KGain" : KGain,
"BadPixelMap" : BadPixelMap,
"DetectorNoiseMaps": DetectorNoiseMaps,
"FlatField" : FlatField,
"DetectorParams" : DetectorParams,
"AstrometricCalibration" : AstrometricCalibration,
"TrapCalibration": TrapCalibration }
"TrapCalibration": TrapCalibration,
"FluxcalFactor": FluxcalFactor}

def autoload(filepath):
"""
Expand Down
173 changes: 168 additions & 5 deletions corgidrp/fluxcal.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
import os
import numpy as np
from astropy.io import fits, ascii
from astropy import wcs
from astropy.io import fits, ascii
from astropy.coordinates import SkyCoord
import corgidrp
from photutils.aperture import CircularAperture
from photutils.psf import fit_2dgaussian
from scipy import integrate
import urllib
import corgidrp

# Dictionary of anticipated bright and dim CASLPEC standard star names and corresponding fits names
calspec_names= {
Expand Down Expand Up @@ -54,19 +59,19 @@ def get_calspec_file(star_name):
raise Exception("cannot access CALSPEC archive web page and/or download {0}".format(fits_name))
return file_name

def get_filter_name(dataset):
def get_filter_name(image):
"""
return the name of the transmission curve csv file of used color filter
Args:
dataset (corgidrp.Dataset): dataset of the observed calstar
image (corgidrp.image): image of the observed calstar
Returns:
str: filepath of the selected filter curve
"""
datadir = os.path.join(os.path.dirname(__file__), "data", "filter_curves")
filters = os.path.join(datadir, "*.csv")
filter = dataset[0].ext_hdr['CFAMNAME']
filter = image.ext_hdr['CFAMNAME']
filter_names = os.listdir(datadir)

filter_name = [name for name in filter_names if filter in name]
Expand Down Expand Up @@ -187,7 +192,6 @@ def calculate_flux_ref(filter_wavelength, calspec_flux, wave_ref):
flux_ref = np.interp(wave_ref, filter_wavelength, calspec_flux)
return flux_ref


def compute_color_cor(filter_curve, filter_wavelength , flux_ref, wave_ref, flux_source):
"""
Compute the color correction factor K given the filter bandpass, reference spectrum (CALSPEC),
Expand Down Expand Up @@ -239,3 +243,162 @@ def calculate_band_irradiance(filter_curve, calspec_flux, filter_wavelength):
irrad = integrate.simpson(multi_flux, x=filter_wavelength)

return irrad

def aper_phot(image, encircled_radius, frac_enc_energy = 1., method = 'exact', subpixels = 5):
"""
returns the flux in photo-electrons of a point source at the target Ra/Dec position
and using a circular aperture by applying aperture_photometry of photutils.
We assume that background subtraction is already done.
Args:
image (corgidrp.data.Image): combined source exposure image
encircled_radius (float): pixel radius of the circular aperture to sum the flux
frac_enc_energy (float): fraction of encircled energy inside the encircled_radius of the PSF, inverse aperture correction, 0...1
method (str): {‘exact’, ‘center’, ‘subpixel’}, The method used to determine the overlap of the aperture on the pixel grid,
default is 'exact'. For detailed description see https://photutils.readthedocs.io/en/stable/api/photutils.aperture.CircularAnnulus.html
subpixels (int): For the 'subpixel' method, resample pixels by this factor in each dimension. That is, each pixel is divided
into subpixels**2 subpixels. This keyword is ignored unless method='subpixel', default is 5
Returns:
float: integrated flux of the point source in unit photo-electrons and corresponding error
"""
#calculate the x and y pixel positions using the RA/Dec target position and applying WCS conversion
if frac_enc_energy <=0 or frac_enc_energy > 1:
raise ValueError("frac_enc_energy {0} should be within 0 < fee <= 1".format(str(frac_enc_energy)))
ra = image.pri_hdr['RA']
dec = image.pri_hdr['DEC']

target_skycoord = SkyCoord(ra = ra, dec = dec, unit='deg')
w = wcs.WCS(image.ext_hdr)
pix = wcs.utils.skycoord_to_pixel(target_skycoord, w, origin = 1)
aper = CircularAperture(pix, encircled_radius)
aperture_sums, aperture_sums_errs = \
aper.do_photometry(image.data, error = image.err[0], mask = image.dq.astype(bool), method = method, subpixels = subpixels)
return aperture_sums[0]/frac_enc_energy, aperture_sums_errs[0]/frac_enc_energy

def phot_by_gauss2d_fit(image, fwhm, fit_shape = None):
"""
returns the flux in photo-electrons of a point source at the target Ra/Dec position
and using a circular aperture by applying aperture_photometry of photutils
We assume that background subtraction is already done.
Args:
image (corgidrp.data.Image): combined source exposure image
fwhm (float): estimated fwhm of the point source
fit_shape (int or tuple of two ints): optional
The shape of the fitting region. If a scalar, then it is assumed
to be a square. If `None`, then the shape of the input ``data``.
It must be an odd value and should be much bigger than fwhm.
Returns:
float: integrated flux of the Gaussian2d fit of the point source in unit photo-electrons and corresponding error
"""
ra = image.pri_hdr['RA']
dec = image.pri_hdr['DEC']

target_skycoord = SkyCoord(ra = ra, dec = dec, unit='deg')
w = wcs.WCS(image.ext_hdr)
pix = wcs.utils.skycoord_to_pixel(target_skycoord, w, origin = 1)
# fit_2dgaussian: error weighting raises exception if error is zero
err = image.err[0]
err[err == 0] = np.finfo(np.float32).eps

if fit_shape == None:
fit_shape = np.shape(image.data)[0] -1

psf_phot = fit_2dgaussian(image.data, xypos = pix, fwhm = fwhm, fit_shape = fit_shape, mask = image.dq.astype(bool), error = err)
flux = psf_phot.results['flux_fit'][0]
flux_err = psf_phot.results['flux_err'][0]
return flux, flux_err

def calibrate_fluxcal_aper(dataset_or_image, encircled_radius, frac_enc_energy = 1., method = 'exact', subpixels = 5):
"""
fills the FluxcalFactors calibration product values for one filter band,
calculates the flux calibration factors by aperture photometry.
The band flux values are divided by the found photoelectrons.
Propagates also errors to flux calibration factor calfile.
Args:
dataset_or_image (corgidrp.data.Dataset or corgidrp.data.Image): dataset with one image or image as combined source exposure
encircled_radius (float): pixel radius of the circular aperture to sum the flux
frac_enc_energy (float): fraction of encircled energy inside the encircled_radius of the PSF, inverse aperture correction, 0...1
method (str): {‘exact’, ‘center’, ‘subpixel’}, The method used to determine the overlap of the aperture on the pixel grid,
default is 'exact'. For detailed description see https://photutils.readthedocs.io/en/stable/api/photutils.aperture.CircularAnnulus.html
subpixels (int): For the 'subpixel' method, resample pixels by this factor in each dimension. That is, each pixel is divided
into subpixels**2 subpixels. This keyword is ignored unless method='subpixel', default is 5
Returns:
corgidrp.data.FluxcalFactor: FluxcalFactor calibration object with the value and error of the corresponding filter
"""
if isinstance(dataset_or_image, corgidrp.data.Dataset):
image = dataset_or_image[0]
dataset = dataset_or_image
else:
image = dataset_or_image
dataset = corgidrp.data.Dataset([image])
star_name = image.ext_hdr["TARGET"]

filter_name = image.ext_hdr["CFAMNAME"]
filter_file = get_filter_name(image)
# read the transmission curve from the color filter file
wave, filter_trans = read_filter_curve(filter_file)

calspec_filepath = get_calspec_file(star_name)

# calculate the flux from the user given CALSPEC file binned on the wavelength grid of the filter
flux_ref = read_cal_spec(calspec_filepath, wave)
flux = calculate_band_flux(filter_trans, flux_ref, wave)

ap_sum, ap_sum_err = aper_phot(image, encircled_radius, frac_enc_energy, method = method, subpixels = subpixels)

fluxcal_fac = flux/ap_sum
fluxcal_fac_err = flux/ap_sum**2 * ap_sum_err

fluxcal = corgidrp.data.FluxcalFactor(np.array([[fluxcal_fac]]), err = np.array([[[fluxcal_fac_err]]]), pri_hdr = image.pri_hdr, ext_hdr = image.ext_hdr, input_dataset = dataset)

return fluxcal

def calibrate_fluxcal_gauss2d(dataset_or_image, fwhm, fit_shape = None):
"""
fills the FluxcalFactors calibration product values for one filter band,
calculates the flux calibration factors by fitting a 2D Gaussian.
The band flux values are divided by the found photoelectrons.
Propagates also errors to flux calibration factor calfile.
Args:
dataset_or_image (corgidrp.data.Dataset or corgidrp.data.Image): dataset with one image or image as combined source exposure
fwhm (float): estimated fwhm of the point source
fit_shape (int or tuple of two ints): optional
The shape of the fitting region. If a scalar, then it is assumed
to be a square. If `None`, then the shape of the input ``data``.
It must be an odd value and should be much bigger than fwhm.
Returns:
corgidrp.data.FluxcalFactor: FluxcalFactor calibration object with the value and error of the corresponding filter
"""
if isinstance(dataset_or_image, corgidrp.data.Dataset):
image = dataset_or_image[0]
dataset = dataset_or_image
else:
image = dataset_or_image
dataset = corgidrp.data.Dataset([image])
star_name = image.ext_hdr["TARGET"]
filter_name = image.ext_hdr["CFAMNAME"]
filter_file = get_filter_name(image)
# read the transmission curve from the color filter file
wave, filter_trans = read_filter_curve(filter_file)

calspec_filepath = get_calspec_file(star_name)

# calculate the flux from the user given CALSPEC file binned on the wavelength grid of the filter
flux_ref = read_cal_spec(calspec_filepath, wave)
flux = calculate_band_flux(filter_trans, flux_ref, wave)

flux_sum, flux_sum_err = phot_by_gauss2d_fit(image, fwhm, fit_shape = fit_shape)

fluxcal_fac = flux/flux_sum
fluxcal_fac_err = flux/flux_sum**2 * flux_sum_err

fluxcal = corgidrp.data.FluxcalFactor(np.array([[fluxcal_fac]]), err = np.array([[[fluxcal_fac_err]]]), pri_hdr = image.pri_hdr, ext_hdr = image.ext_hdr, input_dataset = dataset)

return fluxcal
44 changes: 42 additions & 2 deletions corgidrp/l4_to_tda.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# A file that holds the functions that transmogrify l4 data to TDA (Technical Demo Analysis) data
import corgidrp.fluxcal as fluxcal
import numpy as np
import warnings

def determine_app_mag(input_dataset, source_star, scale_factor = 1.):
"""
Expand All @@ -22,7 +23,7 @@ def determine_app_mag(input_dataset, source_star, scale_factor = 1.):
"""
mag_dataset = input_dataset.copy()
# get the filter name from the header keyword 'CFAMNAME'
filter_name = fluxcal.get_filter_name(mag_dataset)
filter_name = fluxcal.get_filter_name(mag_dataset[0])
# read the transmission curve from the color filter file
wave, filter_trans = fluxcal.read_filter_curve(filter_name)

Expand Down Expand Up @@ -68,7 +69,7 @@ def determine_color_cor(input_dataset, ref_star, source_star):
"""
color_dataset = input_dataset.copy()
# get the filter name from the header keyword 'CFAMNAME'
filter_name = fluxcal.get_filter_name(color_dataset)
filter_name = fluxcal.get_filter_name(color_dataset[0])
# read the transmission curve from the color filter file
wave, filter_trans = fluxcal.read_filter_curve(filter_name)
# calculate the reference wavelength of the color filter
Expand Down Expand Up @@ -99,6 +100,45 @@ def determine_color_cor(input_dataset, ref_star, source_star):

return color_dataset


def convert_to_flux(input_dataset, fluxcal_factor):
"""
Convert the data from electron unit to flux unit erg/(s * cm^2 * AA).
Args:
input_dataset (corgidrp.data.Dataset): a dataset of Images
fluxcal_factor (corgidrp.data.FluxcalFactor): flux calibration file
Returns:
corgidrp.data.Dataset: a version of the input dataset with the data in flux units
"""
# you should make a copy the dataset to start
flux_dataset = input_dataset.copy()
flux_cube = flux_dataset.all_data
flux_error = flux_dataset.all_err
if "COL_COR" in flux_dataset[0].ext_hdr:
color_cor_fac = flux_dataset[0].ext_hdr['COL_COR']
else:
warnings.warn("There is no COL_COR keyword in the header, color correction was not done, it is set to 1")
color_cor_fac = 1
factor = fluxcal_factor.fluxcal_fac/color_cor_fac
factor_error = fluxcal_factor.fluxcal_err/color_cor_fac
error_frame = flux_cube * factor_error
flux_cube *= factor

#scale also the old error with the flux_factor and propagate the error
# err = sqrt(err_flux^2 * flux_fac^2 + fluxfac_err^2 * flux^2)
factor_2d = np.ones(np.shape(flux_dataset[0].data)) * factor #TODO 2D should not be necessary anymore after improve_err is merged
flux_dataset.rescale_error(factor_2d, "fluxcal_factor")
flux_dataset.add_error_term(error_frame, "fluxcal_error")

history_msg = "data converted to flux unit erg/(s * cm^2 * AA) by fluxcal_factor {0} plus color correction".format(fluxcal_factor.fluxcal_fac)

# update the output dataset with this converted data and update the history
flux_dataset.update_after_processing_step(history_msg, new_all_data=flux_cube, header_entries = {"BUNIT":"erg/(s*cm^2*AA)", "FLUXFAC":fluxcal_factor.fluxcal_fac})
return flux_dataset

def update_to_tda(input_dataset):
"""
Updates the data level to TDA (Technical Demo Analysis). Only works on L4 data.
Expand Down
Loading

0 comments on commit b962fd7

Please sign in to comment.