Skip to content

Commit

Permalink
Merge pull request #75 from roman-corgi/normalize_L3_by_exposure_time
Browse files Browse the repository at this point in the history
Function that divides by the exposure time
  • Loading branch information
maxwellmb authored Feb 12, 2025
2 parents 4905785 + a9140f7 commit c823392
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 4 deletions.
30 changes: 27 additions & 3 deletions corgidrp/l2b_to_l3.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# A file that holds the functions that transmogrify l2b data to l3 data
import numpy as np

def create_wcs(input_dataset):
"""
Expand All @@ -19,16 +20,39 @@ def divide_by_exptime(input_dataset):
Divide the data by the exposure time to get the units in electrons/s
TODO: Make sure to update the headers to reflect the change in units
Args:
input_dataset (corgidrp.data.Dataset): a dataset of Images (L2b-level)
Returns:
corgidrp.data.Dataset: a version of the input dataset with the data in units of electrons/s
"""
data = input_dataset.copy()

return input_dataset.copy()
all_data_new = np.zeros(data.all_data.shape)
all_err_new = np.zeros(data.all_err.shape)

for i in range(len(data.frames)):
exposure_time = float(data.frames[i].ext_hdr['EXPTIME'])

data.frames[i].data = data.frames[i].data / exposure_time
# data.frames[i].err = data.frames[i].err / exposure_time
scale_factor = (1/exposure_time) * np.ones([data.frames[i].err.shape[1], data.frames[i].err.shape[2]])
# print(data.frames[i].err.shape)
# print(data.frames[i].err[0])
# print(scale_factor.shape)
#data.frames[i].rescale_error(data.frames[i].err, 'normalized by the exposure time')
data.frames[i].rescale_error(scale_factor, 'normalized by the exposure time')

all_data_new[i] = data.frames[i].data
all_err_new[i] = data.frames[i].err

data.frames[i].ext_hdr.set('BUNIT', 'electrons/s')

history_msg = 'divided by the exposure time'
data.update_after_processing_step(history_msg, new_all_data = all_data_new, new_all_err = all_err_new)


return data

def update_to_l3(input_dataset):
"""
Expand Down
33 changes: 32 additions & 1 deletion corgidrp/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@
'cols': 108,
'r0c0': [0, 0]
},

'prescan': {
'rows': 120,
'cols': 108,
'r0c0': [0, 0],
'col_start': 0, #10
'col_end': 108, #100
},
'serial_overscan': {

'serial_overscan' : {
'rows': 120,
'cols': 5,
'r0c0': [0, 215]
Expand Down Expand Up @@ -693,6 +695,7 @@ def create_default_headers(arrtype="SCI", vistype="TDEMO"):
exthdr['DATETIME'] = '2024-01-01T11:00:00.000Z'
exthdr['HIERARCH DATA_LEVEL'] = "L1"
exthdr['MISSING'] = False
exthdr['BUNIT'] = ""

return prihdr, exthdr
def create_badpixelmap_files(filedir=None, col_bp=None, row_bp=None):
Expand Down Expand Up @@ -1012,6 +1015,34 @@ def create_astrom_data(field_path, filedir=None, subfield_radius=0.02, platescal
frames.append(frame)
dataset = data.Dataset(frames)

return dataset
def create_not_normalized_dataset(filedir=None, numfiles=10):
"""
Create simulated data not normalized for the exposure time.
Args:
filedir (str): (Optional) Full path to directory to save to.
numfiles (int): Number of files in dataset. Default is 10.
Returns:
corgidrp.data.Dataset:
the simulated dataset
"""
filepattern = "simcall_not_normalized_{0:04d}.fits"
frames = []
for i in range(numfiles):
prihdr, exthdr = create_default_headers()

sim_data = np.asarray(np.random.poisson(lam=150.0, size=(1024,1024)), dtype=float)
sim_err = np.asarray(np.random.poisson(lam=1.0, size=(1024,1024)), dtype=float)
sim_dq = np.asarray(np.zeros((1024,1024)), dtype=int)
frame = data.Image(sim_data, pri_hdr=prihdr, ext_hdr=exthdr, err=sim_err, dq=sim_dq)
# frame = data.Image(sim_data, pri_hdr = prihdr, ext_hdr = exthdr, err = sim_err, dq = sim_dq)
if filedir is not None:
frame.save(filedir=filedir, filename=filepattern.format(i))
frames.append(frame)
dataset = data.Dataset(frames)

return dataset

def generate_mock_pump_trap_data(output_dir,meta_path, EMgain=10,
Expand Down
56 changes: 56 additions & 0 deletions tests/test_exptime_normalization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#%%
import os
import glob
import pytest
import numpy as np
import corgidrp.data as data
import corgidrp.mocks as mocks
import corgidrp.detector as detector
import corgidrp.l2b_to_l3 as l2b_to_l3

#%%

def test_exptime_normalization():
"""
Generate mock input data and pass into exposure_time_normalization function
"""
###### create simulated data
# check that simulated data folder exists, and create if not
datadir = os.path.join(os.path.dirname(__file__), "simdata")
if not os.path.exists(datadir):
os.mkdir(datadir)

#create simulated data
not_normalized_dataset = mocks.create_not_normalized_dataset(filedir=datadir)

# extract the exposure time from each Image and check that it is >0
exposure_times = np.zeros(len(not_normalized_dataset.frames))
for i in range(len(not_normalized_dataset.frames)):
assert not_normalized_dataset.frames[i].ext_hdr['EXPTIME'] > 0
exposure_times[i] = not_normalized_dataset.frames[i].ext_hdr['EXPTIME']

# divide the simulated data by the exposure time
norm_dataset = l2b_to_l3.divide_by_exptime(not_normalized_dataset)

# test if the function works
for i in range(len(not_normalized_dataset.frames)):

# test that the unit in the header is in electrons/s
assert norm_dataset.frames[i].ext_hdr['BUNIT'] == "electrons/s"

#check that the quality flag has exists and it's 1 everywhere
assert np.mean(not_normalized_dataset.frames[i].dq) == pytest.approx(0, abs=1e-6)

# check that, for each frame, if you multiply the output by the exposure time you can recover the input
assert not_normalized_dataset.frames[i].data == pytest.approx(exposure_times[i] * norm_dataset.frames[i].data, abs=1e-6)
assert not_normalized_dataset.frames[i].err == pytest.approx(exposure_times[i] * norm_dataset.frames[i].err, abs=1e-6)

# check that the output in .frames.data and .frames.err correspond to .all_data and .all_err
assert norm_dataset.frames[i].data == pytest.approx(norm_dataset.all_data[i,:,:], abs = 1e-6)
assert norm_dataset.frames[i].err == pytest.approx(norm_dataset.all_err[i,:,:], abs = 1e-6)

#%%

if __name__ == "__main__":
test_exptime_normalization()
# %%

0 comments on commit c823392

Please sign in to comment.