Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust the resolution matrix when doing flux calibration #2423

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 80 additions & 4 deletions py/desispec/fluxcalibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1476,15 +1476,91 @@ def apply_flux_calibration(frame, fluxcalib):
(C[i, ok]**2 * fluxcalib.ivar[i, ok] +
frame.flux[i, ok]**2 * frame.ivar[i, ok]
))
if frame.resolution_data is not None:
frame.resolution_data[i] = _calibrate_resolution_matrix(
frame.resolution_data[i], C[i], (C[i] > 0)
& (fluxcalib.ivar[i] > 0))
frame.ivar[i, ~ok] = 0
# It is important we update flux *after*
# updating variance
frame.flux = frame.flux * (C > 0) / (C + (C == 0))

if fluxcalib.fibercorr is not None and frame.fibermap is not None :
if "PSF_TO_FIBER_FLUX" in fluxcalib.fibercorr.dtype.names :
if fluxcalib.fibercorr is not None and frame.fibermap is not None:
if "PSF_TO_FIBER_FLUX" in fluxcalib.fibercorr.dtype.names:
log.info("add a column PSF_TO_FIBER_SPECFLUX to fibermap")
frame.fibermap["PSF_TO_FIBER_SPECFLUX"]=fluxcalib.fibercorr["PSF_TO_FIBER_FLUX"]
frame.fibermap["PSF_TO_FIBER_SPECFLUX"] = fluxcalib.fibercorr["PSF_TO_FIBER_FLUX"]


def _calibrate_resolution_matrix(res_mat, calib, good_calib):
"""
Apply corrections to the resolution matrix given the flux calibration
Arguments:
res_mat: (ndarray) banded storage of the resolution matrix
calib: (ndarray) calibration vector
good_calib: (ndarray) array of booleans for good pixels
Returns:
res_mat_fc: corrected resolution matrix
"""
# We use calculation from #2419
# the calculation is C^{-1} R C
# the only complication if C has zeros.
# Then for the left we take the pseudo inverse
# and for the right we interpolate over bad regions
icalib = np.zeros(len(calib))
icalib[good_calib ] = 1./calib[good_calib]
if not good_calib.any():
rcalib = np.ones(len(calib))
else:
# we need to fill holes
rcalib = _interpolate_bad_regions(calib, ~good_calib)
# now we align the diagonals to the banded storage
# of the resolution matrix
width = res_mat.shape[0]
M1 = [np.roll(icalib, _) for _ in np.arange(-(width//2),(width//2)+1)[::-1]]
M2 = [rcalib for _ in np.arange(width)]
res_mat_out = res_mat * M1 * M2
# This has been tested with X0 and X1 being indentical in this calculation
# M1 = [np.roll(D1, _) for _ in np.arange(-5, 6)[::-1]]
# M2 = [D2 for _ in np.arange(-5, 6)]
# X1 = (desispec.resolution.Resolution(res[0] * M1 * M2).todense())
# X0 = (np.diag(D1) @ r.todense() @ np.diag(D2))
return res_mat_out


def _interpolate_bad_regions(spec,badmask):
"""
Interpolate bad regions
Arguments:
spec: ndarray with data
mask: bad mask that needs to be interpolated over

Returns:
ret Interpolated ndarray
"""
xind = np.nonzero(badmask)[0]
if len(xind) == 0:
return spec
elif len(xind) == len(spec):
return spec
edges = np.nonzero(np.diff(xind, prepend=-10) > 1)[0]
n_edges = len(edges)
spec1 = spec * 1
for i in range(n_edges):
if i == n_edges - 1:
rh = xind[-1]
else:
rh = xind[edges[i + 1] - 1]
lh = xind[edges[i]]
# lh rh inclusive
if lh == 0:
spec1[:rh + 1] = spec[rh + 1]
elif rh == len(spec) - 1:
spec1[lh:] = spec[lh - 1]
else:
spec1[lh:rh + 1] = np.interp(np.arange(lh,
rh + 1), [lh - 1, rh + 1],
[spec[lh - 1], spec[rh + 1]])
return spec1


def ZP_from_calib(exptime, wave, calib):
Expand Down
62 changes: 60 additions & 2 deletions py/desispec/test/test_flux_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def test_apply_fluxcalibration(self):
ivar = np.random.uniform(0.9, 1.1, size=flux.shape)
origframe = Frame(wave, flux.copy(), ivar.copy(), spectrograph=0)

# efine fluxcalib object
# define fluxcalib object
calib = np.random.uniform(.5, 1.5, size=origframe.flux.shape)
mask = np.zeros(origframe.flux.shape, dtype=np.uint32)

Expand Down Expand Up @@ -179,7 +179,7 @@ def test_apply_fluxcalibration(self):
frame=copy.deepcopy(origframe)
apply_flux_calibration(frame,fc)
self.assertTrue(np.all(frame.ivar[0,0:10]==0.0))

# should also work even the calib =0 ??
#fcivar=np.ones_like(origframe.flux)
#calib=np.ones_like(origframe.flux)
Expand All @@ -199,6 +199,64 @@ def test_apply_fluxcalibration(self):
with self.assertRaises(SystemExit): #should be ValueError instead?
apply_flux_calibration(frame,fc)

# if the calibration is constant resolution should not change
resol = np.random.uniform(size=(flux.shape[0], 11, flux.shape[1]))
flux = np.random.uniform(0.9, 1.0, size=(nspec, nwave))
ivar = np.random.uniform(0.9, 1.0, size=(nspec, nwave))
frame = Frame(wave, flux.copy(), ivar.copy(), spectrograph=0,
resolution_data=resol.copy())
calib = flux * 0 + 10
mask = np.zeros(origframe.flux.shape, dtype=np.uint32)
fc = FluxCalib(origframe.wave, calib, ivar, mask)
apply_flux_calibration(frame, fc)
self.assertTrue(np.allclose(frame.resolution_data, resol))

# preservation of resolution equation
resol = np.random.uniform(size=(flux.shape[0], 11, flux.shape[1]))
flux0 = np.random.uniform(0.9, 1.0, size=(nspec, nwave))
calib = np.random.uniform(.5, 1.5, size=origframe.flux.shape)
# input spectra
flux0[:, :6] = 0
flux0[:, -7:] = 0
# convolving with the resolution matrix
flux_conv = np.array([desispec.resolution.Resolution(resol[i])@flux0[i] for i in range(nspec)])
flux0_calib = flux0 / calib
frame = Frame(wave, flux_conv.copy(), ivar.copy(), spectrograph=0,
resolution_data=resol.copy())
mask = np.zeros(origframe.flux.shape, dtype=np.uint32)
fc = FluxCalib(origframe.wave, calib, ivar, mask)
apply_flux_calibration(frame, fc)
flux1_conv = np.array([desispec.resolution.Resolution(
frame.resolution_data[i])@flux0_calib[i] for i in range(nspec)])
self.assertTrue(np.allclose(flux1_conv, frame.flux))

# now we test that we correctly deal with cases where
# calibration is unknown
# preservation of resolution equation
resol = np.random.uniform(size=(flux.shape[0], 11, flux.shape[1]))
flux0 = np.random.uniform(0.9, 1.0, size=(nspec, nwave))
calib0 = flux * 0 + 10
calib = calib0.copy()
# set bad calibration range
calib[:, :10] = 0
calib[:, 20:30] = 0
bad_calib = calib == 0
# input spectra
flux0[:, :6] = 0
flux0[:, -7:] = 0
# convolving with the resolution matrix
flux_conv = np.array([desispec.resolution.Resolution(resol[i])@flux0[i] for i in range(nspec)])
flux0_calib = flux0 / calib0
frame = Frame(wave, flux_conv.copy(), ivar.copy(), spectrograph=0,
resolution_data=resol.copy())
mask = np.zeros(origframe.flux.shape, dtype=np.uint32)
fc = FluxCalib(origframe.wave, calib, ivar, mask)
apply_flux_calibration(frame, fc)
flux1_conv = np.array([desispec.resolution.Resolution(
frame.resolution_data[i])@flux0_calib[i] for i in range(nspec)])
self.assertTrue(np.allclose(flux1_conv[~bad_calib], frame.flux[~bad_calib]))


def test_isStdStar(self):
"""test isStdStar works for cmx, main, and sv1 fibermaps"""
from desispec.fluxcalibration import isStdStar
Expand Down
Loading