Merge pull request #231 from ornlneutronimaging/add_cli_fitting
Implement core fitting modules for CLI application
JeanBilheux authored Nov 5, 2024
2 parents 8cd4878 + 282b278 commit b901cef
Showing 24 changed files with 6,871 additions and 66 deletions.
29 changes: 0 additions & 29 deletions .github/workflows/main.yml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/unittest.yml
@@ -28,7 +28,7 @@ jobs:
       - name: run unit tests
         run: |
           echo "running unit tests"
-          python -m pytest --cov=src --cov-report=xml --cov-report=term-missing tests/
+          python -m pytest --cov=src --cov-report=xml --cov-report=term-missing tests/unit
       # - name: upload coverage to codecov
       #   uses: codecov/codecov-action@v4
       #   if:
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -4,6 +4,7 @@ repos:
     hooks:
       - id: check-added-large-files
         args: [--maxkb=8192]
+        exclude: "reference/.*"
       - id: check-merge-conflict
       - id: check-yaml
         args: [--allow-multiple-documents]
1 change: 1 addition & 0 deletions environment.yml
@@ -18,6 +18,7 @@ dependencies:
   - pytest
   - pytest-cov
   - pytest-xdist
+  - pytest-repeat # for debugging unstable tests
   # compute
   - astropy
   - lmfit
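pytest-repeat adds a --count option to pytest, which is what makes it useful for chasing unstable tests: run the suspect test many times and stop on the first failure. An illustrative invocation (the test path is hypothetical):

    python -m pytest --count=20 -x tests/unit/test_fitting.py

where -x aborts at the first failure, so an intermittently failing test surfaces without waiting for all repetitions.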
2,865 changes: 2,865 additions & 0 deletions notebooks/bragg_edge_fitting.ipynb

Large diffs are not rendered by default.

1,614 changes: 1,614 additions & 0 deletions notebooks/cli_step_by_step.ipynb

Large diffs are not rendered by default.

3 binary files not shown.
216 changes: 192 additions & 24 deletions src/ibeatles/app/cli.py
@@ -6,6 +6,7 @@
 import logging
 from pathlib import Path
 from typing import Dict, Any, Optional
+from scipy.ndimage import gaussian_filter1d
 
 from ibeatles.core.config import IBeatlesUserConfig
 from ibeatles.core.io.data_loading import (
@@ -14,33 +15,59 @@
     get_time_spectra_filename,
 )
 from ibeatles.core.processing.normalization import normalize_data
+from ibeatles.core.fitting.binning import get_bin_coordinates, get_bin_transmission
+from ibeatles.core.material import get_initial_bragg_edge_lambda
+from ibeatles.core.fitting.kropff.fitting import fit_bragg_edge_single_pass
 
 # Placeholder imports (to be implemented later)
 # from ibeatles.core.fitting import perform_fitting
 # from ibeatles.core.strain_calculation import calculate_strain
 
 
-def setup_logging(log_file: Optional[Path] = None) -> None:
+def setup_logging(
+    log_file: Optional[Path] = None, log_level: int = logging.INFO
+) -> logging.Logger:
     """
     Set up logging for the application.
     Parameters
     ----------
     log_file : Path, optional
         Path to the log file. If not provided, logs will be saved in the current working directory.
+    log_level : int, optional
+        Logging level. Defaults to logging.INFO.
     Returns
     -------
-    None
+    logging.Logger
+        Configured logger instance.
     """
     if log_file is None:
         log_file = Path.cwd() / "ibeatles_cli.log"
 
-    logging.basicConfig(
-        level=logging.INFO,
-        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
-        handlers=[logging.FileHandler(log_file), logging.StreamHandler()],
-    )
+    logger = logging.getLogger("ibeatles_CLI")
+    logger.setLevel(log_level)
+
+    # Avoid adding handlers if they are already configured
+    if not logger.hasHandlers():
+        # Create file handler and stream handler
+        file_handler = logging.FileHandler(log_file)
+        stream_handler = logging.StreamHandler()
+
+        # Set level for each handler
+        file_handler.setLevel(log_level)
+        stream_handler.setLevel(log_level)
+
+        # Define formatter and add it to handlers
+        formatter = logging.Formatter(
+            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+        )
+        file_handler.setFormatter(formatter)
+        stream_handler.setFormatter(formatter)
+
+        # Add handlers to the logger
+        logger.addHandler(file_handler)
+        logger.addHandler(stream_handler)
+
+    return logger
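Returning the logger and guarding with hasHandlers() makes repeated calls safe: a second call reuses the same "ibeatles_CLI" logger instead of stacking duplicate handlers. A minimal usage sketch (paths are illustrative):

    logger = setup_logging(Path("ibeatles_run.log"), log_level=logging.DEBUG)
    logger.info("pipeline started")
    logger = setup_logging()  # no-op on handlers; the same logger comes back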


@@ -57,6 +84,9 @@ def load_config(config_path: Path) -> IBeatlesUserConfig:
     IBeatlesUserConfig
         Parsed configuration object.
     """
+    logger = logging.getLogger("ibeatles_CLI")
+    logger.info(f"Loading configuration: {config_path}")
+
     with open(config_path, "r") as f:
         config_data = json.load(f)
     return IBeatlesUserConfig(**config_data)
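For orientation, the configuration is plain JSON validated by the IBeatlesUserConfig model (a pydantic model, judging by the model_dump() call further down). A minimal sketch showing only the fields this diff dereferences; the full schema lives in ibeatles.core.config, the values here are hypothetical, and the lambda bounds are in meters as perform_fitting assumes:

    {
        "raw_data": {"raw_data_dir": "/data/run_001/raw"},
        "output": {
            "normalized_data_dir": "/data/run_001/normalized",
            "analysis_results_dir": "/data/run_001/analysis"
        },
        "analysis": {
            "pixel_binning": {...},
            "material": {...},
            "fitting": {"lambda_min": 3.5e-10, "lambda_max": 4.5e-10}
        }
    }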
@@ -76,7 +106,9 @@ def load_config(config_path: Path) -> IBeatlesUserConfig:
     Dict[str, Any]
         Dictionary containing loaded data.
     """
-    logging.info("Loading data...")
+    logger = logging.getLogger("ibeatles_CLI")
+    logger.info("Loading data...")
+
     # Raw data is mandatory
     raw_data = load_data_from_folder(
         config.raw_data.raw_data_dir,
@@ -111,26 +143,147 @@ def load_data(config: IBeatlesUserConfig) -> Dict[str, Any]:
     return {"raw_data": raw_data, "open_beam": open_beam, "spectra": spectra}
 
 
-def perform_fitting(data: Dict[str, Any], config: IBeatlesUserConfig) -> Dict[str, Any]:
+def perform_binning(
+    data: Dict[str, Any], config: IBeatlesUserConfig, spectra_dict: dict
+) -> Dict[str, Any]:
     """
-    Perform fitting on the normalized data.
+    Perform binning on the normalized data.
     Parameters
     ----------
     data : Dict[str, Any]
         Dictionary containing normalized data.
     config : IBeatlesUserConfig
         Parsed configuration object.
+    spectra_dict : dict
+        Dictionary containing time spectra data.
     Returns
     -------
     Dict[str, Any]
+        Dictionary containing binning results.
+    """
+    logger = logging.getLogger("ibeatles_CLI")
+    logger.info("Performing binning...")
+
+    # Build binning coordinates
+    bins = get_bin_coordinates(
+        image_shape=data[0].shape,
+        **config.analysis.pixel_binning.model_dump(),  # to dict for unpacking
+    )
+    # extract wavelength data from spectra dict
+    # default unit is SI unit (meters)
+    wavelengths_m = spectra_dict["lambda_array"]
+    # execute binning
+    bin_transmission = {}
+    for i, bin_coord in enumerate(bins):
+        wavelengths_bin, transmission_bin = get_bin_transmission(
+            images=data,
+            wavelengths=wavelengths_m,
+            bin_coords=bin_coord,
+            lambda_range=None,
+        )
+        bin_transmission[str(i)] = {
+            "wavelengths": wavelengths_bin,
+            "transmission": transmission_bin,
+            "coordinates": bin_coord,
+        }
+
+    return bin_transmission
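Each bin entry in the returned dictionary is self-describing (wavelengths, transmission, coordinates), so downstream consumers need no extra bookkeeping. A quick inspection sketch, assuming matplotlib is installed:

    import matplotlib.pyplot as plt

    bin_info = bin_transmission["0"]
    plt.plot(bin_info["wavelengths"] * 1e10, bin_info["transmission"])
    plt.xlabel("wavelength (angstrom)")
    plt.ylabel("transmission")
    plt.title(f"bin at {bin_info['coordinates']}")
    plt.show()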


+def perform_fitting(
+    bin_transmission_dict: Dict[str, Any], config: IBeatlesUserConfig
+) -> Dict[str, Any]:
+    """
+    Perform Bragg edge fitting on the binned transmission data.
+    Parameters
+    ----------
+    bin_transmission_dict : Dict[str, Any]
+        Dictionary containing binning results, from function perform_binning.
+    config : IBeatlesUserConfig
+        Parsed configuration object.
     Returns
     -------
     Dict[str, Any]
         Dictionary containing fitting results.
     """
-    # Placeholder implementation
-    logging.info("Performing fitting...")
-    # fitting_results = perform_fitting(data['normalized_data'], config)
-    return {"fitting_results": None}
+    logger = logging.getLogger("ibeatles_CLI")
+    logger.info("Performing fitting...")
+
+    # step_0: prepare the lambda range
+    lambda_min_angstrom = config.analysis.fitting.lambda_min * 1e10
+    lambda_max_angstrom = config.analysis.fitting.lambda_max * 1e10
+    lambda_range_angstrom = lambda_min_angstrom, lambda_max_angstrom
+    # step_1: get the reference (zero strain) Bragg edge value
+    lambda_0_angstrom = get_initial_bragg_edge_lambda(
+        material_config=config.analysis.material,
+        lambda_range=lambda_range_angstrom,
+    )
+    # step_2: set up the initial guess and bounds
+    # NOTE: the only critical value here is the reference Bragg edge wavelength
+    initial_parameters = {
+        "a0": 0.1,
+        "b0": 0.1,
+        "a_hkl": 0.1,
+        "b_hkl": 0.1,
+        "bragg_edge_wavelength": lambda_0_angstrom,  # use the reference Bragg edge as the initial guess
+        "sigma": 0.01,
+        "tau": 0.01,
+    }
+    parameter_bounds = {
+        "bragg_edge_wavelength": {
+            "min": lambda_min_angstrom,
+            "max": lambda_max_angstrom,
+        },
+        "sigma": {"min": 0.001, "max": 0.2},
+        "tau": {"min": 0.001, "max": 0.2},
+    }
+    # step_3: fitting
+    fit_results = {}  # {str(bin_id): lmfit.model.ModelResult}
+    for key, value in bin_transmission_dict.items():
+        wavelengths_angstrom = value["wavelengths"] * 1e10
+        transmission = value["transmission"]
+        # step_3.1: prepare the fitting range
+        mask = (wavelengths_angstrom > lambda_min_angstrom) & (
+            wavelengths_angstrom < lambda_max_angstrom
+        )
+        wavelengths_fitting_angstrom = wavelengths_angstrom[mask]
+        transmission_fitting = transmission[mask]
+        # step_3.2: fit a smoothed curve first to get a better initial guess
+        # NOTE: an over-smoothed curve will eventually yield a fit, so we gradually
+        # increase sigma, even though the quality of the initial guess decreases as
+        # sigma grows
+        ratio = 0.10
+        fit_success = False
+        while not fit_success:
+            sigma = int(len(transmission_fitting) * ratio)
+            transmission_smooth = gaussian_filter1d(transmission_fitting, sigma=sigma)
+            fit_result_smoothed = fit_bragg_edge_single_pass(
+                wavelengths=wavelengths_fitting_angstrom,
+                transmission=transmission_smooth,
+                initial_parameters=initial_parameters,
+                parameter_bounds=parameter_bounds,
+            )
+            if fit_result_smoothed is None:
+                logger.info(
+                    f"Bin_{key}: fit failed with sigma = {sigma}, increasing sigma and trying again..."
+                )
+                ratio += 0.02
+                continue
+            else:
+                fit_success = True
+        # step_3.3: fit the original data, seeded with the smoothed fit's parameters
+        fit_result = fit_bragg_edge_single_pass(
+            wavelengths=wavelengths_fitting_angstrom,
+            transmission=transmission_fitting,
+            initial_parameters=fit_result_smoothed.best_values,
+            parameter_bounds=parameter_bounds,
+        )
+        fit_results[key] = fit_result
+
+    return fit_results
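The parameter names come from the Kropff description of a Bragg edge, where transmission is commonly modeled as T(lambda) = exp[-(a0 + b0*lambda)] * {exp[-(a_hkl + b_hkl*lambda)] + (1 - exp[-(a_hkl + b_hkl*lambda)]) * B(lambda)}, with B(lambda) an erfc-based edge profile centered on bragg_edge_wavelength whose width and asymmetry are governed by sigma and tau; the exact form used lives in ibeatles.core.fitting.kropff. Each value stored in fit_results is an lmfit ModelResult, so downstream code can pull the fitted edge positions directly; a minimal sketch, assuming the fit_results dict built above:

    edge_positions_angstrom = {
        bin_id: result.best_values["bragg_edge_wavelength"]
        for bin_id, result in fit_results.items()
        if result is not None  # skip bins whose final fit failed
    }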


@@ -151,8 +304,10 @@ def calculate_strain(
     Dict[str, Any]
         Dictionary containing strain calculation results.
     """
+    logger = logging.getLogger("ibeatles_CLI")
+
     # Placeholder implementation
-    logging.info("Calculating strain...")
+    logger.info("Calculating strain...")
     # strain_results = calculate_strain(data['fitting_results'], config)
     return {"strain_results": None}
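Once implemented, the strain calculation reduces to the standard Bragg edge relation: strain is the relative shift of each bin's fitted edge against the zero-strain reference, epsilon = (lambda_fit - lambda_0) / lambda_0. A minimal sketch of that computation (hypothetical helper, reusing fit_results and lambda_0_angstrom from the fitting step):

    def compute_strain_per_bin(fit_results, lambda_0_angstrom):
        """Relative edge shift per bin; None where the fit failed."""
        strain = {}
        for bin_id, result in fit_results.items():
            if result is None:
                strain[bin_id] = None
                continue
            lambda_fit = result.best_values["bragg_edge_wavelength"]
            strain[bin_id] = (lambda_fit - lambda_0_angstrom) / lambda_0_angstrom
        return strain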

@@ -172,9 +327,11 @@ def save_analysis_results(data: Dict[str, Any], config: IBeatlesUserConfig) -> None:
     -------
     None
     """
+    logger = logging.getLogger("ibeatles_CLI")
+
     # Placeholder implementation
     output_dir = config.output["analysis_results_dir"]
-    logging.info(f"Saving analysis results to {output_dir}...")
+    logger.info(f"Saving analysis results to {output_dir}...")
     # Save fitting results
     # Example: np.save(output_dir / "fitting_results.npy", data["fitting_results"])
     # Save strain map data
@@ -198,7 +355,7 @@ def main(config_path: Path, log_file: Optional[Path] = None) -> None:
     -------
     None
     """
-    setup_logging(log_file)
+    logger = setup_logging(log_file)
 
     try:
         # Load configuration
@@ -218,18 +375,29 @@
             config=config,
             output_folder=config.output["normalized_data_dir"],
         )
-        logging.info(f"Normalized data saved to {output_path}.")
+        logger.info(f"Normalized data saved to {output_path}.")
 
+        # Binning
+        binning_results = perform_binning(
+            data=normalized_data,
+            config=config,
+            spectra_dict=spectra_dict,
+        )
+
+        # Fitting
+        fitting_results = perform_fitting(
+            bin_transmission_dict=binning_results,
+            config=config,
+        )
+
-        # Dummy implementation of the remaining processing steps
-        fitting_results = perform_fitting(normalized_data, config)
         strain_results = calculate_strain(fitting_results, config)
 
         analysis_results = {**fitting_results, **strain_results}
         save_analysis_results(analysis_results, config)
 
-        logging.info("iBeatles CLI application completed successfully.")
+        logger.info("iBeatles CLI application completed successfully.")
     except Exception as e:
-        logging.error(f"An error occurred: {str(e)}")
+        logger.error(f"An error occurred: {str(e)}")
         raise


Remaining file diffs not rendered.