diff --git a/PyMca5/PyMcaMath/fitting/Model.py b/PyMca5/PyMcaMath/fitting/Model.py new file mode 100644 index 000000000..b296e150e --- /dev/null +++ b/PyMca5/PyMcaMath/fitting/Model.py @@ -0,0 +1,813 @@ +# /*########################################################################## +# +# The PyMca X-Ray Fluorescence Toolkit +# +# Copyright (c) 2020 European Synchrotron Radiation Facility +# +# This file is part of the PyMca X-ray Fluorescence Toolkit developed at +# the ESRF by the Software group. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +#############################################################################*/ +__author__ = "Wout De Nolf" +__contact__ = "wout.de_nolf@esrf.eu" +__license__ = "MIT" +__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" + + +import functools +from collections.abc import Sequence, MutableMapping +import numpy +from contextlib import contextmanager, ExitStack +from PyMca5.PyMcaMath.linalg import lstsq +from PyMca5.PyMcaMath.fitting import Gefit + + +def enable_caching(method): + @functools.wraps(method) + def cache_wrapper(self, *args, **kw): + with self.caching_context(): + return method(self, *args, **kw) + + return cache_wrapper + + +def memoize(method): + @functools.wraps(method) + def cache_wrapper(self): + if self.caching_enabled: + name = method.__qualname__ + if name in self._cache: + return self._cache[name] + else: + r = self._cache[name] = method(self) + return r + + return cache_wrapper + + +def memoize_property(method): + return property(memoize(method)) + + +class Cashed(object): + def __init__(self): + self._cache = None + + @contextmanager + def caching_context(self): + reset = self._cache is None + if reset: + self._cache = {} + try: + yield + finally: + if reset: + self._cache = None + + @property + def caching_enabled(self): + return self._cache is not None + + +class Model(Cashed): + """Evaluation and derivatives of a model to be used in least-squares fitting. + """ + + def __init__(self): + self.included_parameters = None + self.excluded_parameters = None + super(Model, self).__init__() + + @property + def xdata(self): + raise AttributeError from NotImplementedError + + @property + def ydata(self): + raise AttributeError from NotImplementedError + + @property + def ystd(self): + raise AttributeError from NotImplementedError + + @property + def ymodel(self): + return self.evaluate() + + @property + def nchannels(self): + raise AttributeError from NotImplementedError + + @property + def parameters(self): + return self._get_parameters() + + @parameters.setter + def parameters(self, values): + return self._set_parameters(values) + + @property + def nparameters(self): + return sum(tpl[1] for tpl in self._parameter_groups()) + + @property + def parameter_names(self): + return list(self._iter_parameter_names()) + + @property + def parameter_group_names(self): + return self._filter_parameter_names(self._parameter_group_names) + + @property + def _parameter_group_names(self): + raise AttributeError from NotImplementedError + + @property + def linear_parameters(self): + return self._get_parameters(linear_only=True) + + @linear_parameters.setter + def linear_parameters(self, params): + return self._set_parameters(params, linear_only=True) + + @property + def nlinear_parameters(self): + return sum(tpl[1] for tpl in self._parameter_groups(linear_only=True)) + + @property + def linear_parameter_names(self): + return list(self._iter_parameter_names(linear_only=True)) + + @property + def linear_parameter_group_names(self): + return self._filter_parameter_names(self._linear_parameter_group_names) + + @property + def _linear_parameter_group_names(self): + raise AttributeError from NotImplementedError + + def _get_parameters(self, linear_only=False): + """ + :param bool linear_only: + :returns array: + """ + return NotImplementedError + + def _set_parameters(self, values, linear_only=False): + """ + :returns values: + :param bool linear_only: + """ + return NotImplementedError + + def _filter_parameter_names(self, names): + included = self.included_parameters + excluded = self.excluded_parameters + if included is None: + included = names + if excluded is None: + excluded = [] + return [name for name in names if name in included and name not in excluded] + + def evaluate(self, xdata=None): + """Evaluate model + + :param array xdata: length nxdata + :returns array: nxdata + """ + raise NotImplementedError + + def evaluate_linear(self, xdata=None): + """Derivate to a specific parameter + + :param array xdata: length nxdata + :returns array: n x nxdata + """ + derivatives = self.linear_derivatives(xdata=xdata) + return self.linear_parameters.dot(derivatives) + + def linear_decomposition(self, xdata=None): + """Linear decomposition + + :param array xdata: length nxdata + :returns array: nparams x nxdata + """ + derivatives = self.linear_derivatives(xdata=xdata) + return self.linear_parameters[:, numpy.newaxis] * derivatives + + def derivative(self, param_idx, xdata=None): + """Derivate to a specific parameter + + :param int param_idx: + :param array xdata: length nxdata + :returns array: nxdata + """ + raise NotImplementedError + + def derivatives(self, xdata=None): + """Derivates to all parameters + + :param array xdata: length nxdata + :returns list(array): nparams x nxdata + """ + if xdata is None: + xdata = self.xdata + return [self.derivative(i, xdata=xdata) for i in range(self.nparameters)] + + def linear_derivatives(self, xdata=None): + """Derivates to all linear parameters + + :param array xdata: length nxdata + :returns array: nparams x nxdata + """ + raise NotImplementedError + + @property + def linear(self): + raise AttributeError from NotImplementedError + + def _iter_parameter_groups(self, linear_only=False): + """ + :param bool linear_only: + :yields (str, int): group name, nb. parameters in the group + """ + raise NotImplementedError + + def _parameter_groups(self, linear_only=False): + """ + :param bool linear_only: + :returns iterable(str, int): group name, nb. parameters in the group + """ + if self.caching_enabled: + cache = self._cache.setdefault("all_parameter_groups", {}) + a = self.included_parameters + b = self.excluded_parameters + if a is not None: + a = tuple(sorted(a)) + if b is not None: + b = tuple(sorted(b)) + key = a, b + it = cache.get(key) + if it is None: + it = cache[key] = list( + self._iter_parameter_groups(linear_only=linear_only) + ) + else: + it = self._iter_parameter_groups(linear_only=linear_only) + return it + + def _iter_parameter_names(self, linear_only=False): + """ + :param bool linear_only: + :yields str: + """ + for name, n in self._parameter_groups(linear_only=linear_only): + if n > 1: + for i in range(n): + yield name + str(i) + else: + yield name + + def _parameter_name_from_index(self, idx, linear_only=False): + """Parameter index to group name and group index + + :returns str, int: group name, index in parameter group + """ + i = 0 + for name, n in self._parameter_groups(linear_only=linear_only): + if idx >= i and idx < (i + n): + return name, idx - i + i += n + + def fit(self, full_output=False): + """ + :param bool full_output: add statistics to fitted parameters + :returns dict: + """ + if self.linear: + return self.linear_fit(full_output=full_output) + else: + return self.nonlinear_fit(full_output=full_output) + + @enable_caching + def linear_fit(self, full_output=False): + """ + :param bool full_output: add statistics to fitted parameters + :returns dict: + """ + A = self.linear_derivatives().T # nchannels, npeaks + b = self.ydata # nchannels + result = lstsq(A, b, digested_output=full_output) + if not full_output: + result = {"parameters": result[0], "uncertainties": result[1]} + return result + + @enable_caching + def nonlinear_fit(self, full_output=False): + """ + :param bool full_output: add statistics to fitted parameters + :returns dict: + """ + initial = self.parameters + try: + result = Gefit.LeastSquaresFit( + self._evaluate, + initial, + model_deriv=self._derivative, + xdata=self.xdata, + ydata=self.ydata, + sigmadata=self.ystd, + fulloutput=full_output, + ) + finally: + self.parameters = initial + ret = { + "parameters": result[0], + "uncertainties": result[2], + "chi2_red": result[1], + } + if full_output: + ret["niter"] = result[3] + ret["lastdeltachi"] = result[4] + return ret + + def _evaluate(self, parameters, xdata): + """Update parameters and evaluate model + + :param array parameters: length nparams + :param array xdata: length nxdata + :returns array: nxdata + """ + self.parameters = parameters + return self.evaluate(xdata=xdata) + + def _derivative(self, parameters, param_idx, xdata): + """Update parameters and return derivate to a specific parameter + + :param array parameters: length nparams + :param int param_idx: + :param array xdata: length nxdata + :returns array: nxdata + """ + self.parameters = parameters + return self.derivative(param_idx, xdata=xdata) + + def use_fit_result(self, result): + """ + :param dict result: + """ + if self.linear: + self.linear_parameters = result["parameters"] + else: + self.parameters = result["parameters"] + + +class ConcatModel(Model): + """Concatenated model with shared parameters + """ + + def __init__(self, models, shared_attributes=None): + if not isinstance(models, Sequence): + models = [models] + for model in models: + if not isinstance(model, Model): + raise ValueError("'models' must be a list of type 'Model'") + self._models = models + self.__fixed_shared_attributes = { + "linear", + "included_parameters", + "excluded_parameters", + } + self.shared_attributes = shared_attributes + super(ConcatModel, self).__init__() + + @property + def model(self): + """Model used to get/set shared attributes + """ + return self._models[0] + + def __getattr__(self, name): + """Get shared attribute + """ + if self.nmodels and name in self.shared_attributes: + return getattr(self.model, name) + raise AttributeError(name) + + def __setattr__(self, name, value): + """Set shared attribute + """ + if ( + name != "_models" + and self.nmodels + and hasattr(self.model, name) + and name in self.shared_attributes + ): + for m in self._models: + setattr(m, name, value) + else: + super(ConcatModel, self).__setattr__(name, value) + + @property + def shared_attributes(self): + """Attributes shared between the fit models (they should have the same value) + """ + return self._shared_attributes + + @shared_attributes.setter + def shared_attributes(self, shared_attributes): + """ + :param Sequence(str) shared_attributes: + """ + if shared_attributes is None: + shared_attributes = set() + else: + shared_attributes = set(shared_attributes) + shared_attributes |= self.__fixed_shared_attributes + if self.nmodels <= 1: + self._shared_attributes = shared_attributes + return + self.share_attributes(shared_attributes) + self.validate_shared_attributes(shared_attributes) + self._shared_attributes = shared_attributes + + def validate_shared_attributes(self, shared_attributes=None): + """Check whether attributes are shared + + :param Sequence(str) shared_attributes: + :raises AssertionError: + """ + if self.nmodels <= 1: + return + if shared_attributes is None: + shared_attributes = self._shared_attributes + for name in shared_attributes: + value = getattr(self.model, name) + if isinstance(value, (Sequence, MutableMapping, numpy.ndarray)): + for m in self._models[1:]: + assert id(value) == id(getattr(m, name)), name + else: + for m in self._models[1:]: + assert value == getattr(m, name), name + + def share_attributes(self, shared_attributes=None): + """Ensure attributes are shared + + :param Sequence(str) shared_attributes: + """ + if self.nmodels <= 1: + return + if shared_attributes is None: + shared_attributes = self._shared_attributes + model = self.model + adict = {name: getattr(model, name) for name in shared_attributes} + for model in self._models[1:]: + for name, value in adict.items(): + setattr(model, name, value) + + @property + def nmodels(self): + return len(self._models) + + @property + def nchannels(self): + nmodels = self.nmodels + if nmodels == 0: + return 0 + else: + return sum([m.nchannels for m in self._models]) + + @property + def xdata(self): + return self._get_data("xdata") + + @xdata.setter + def xdata(self, values): + self._set_data("xdata", values) + + @property + def ydata(self): + return self._get_data("ydata") + + @ydata.setter + def ydata(self, values): + self._set_data("ydata", values) + + @property + def ystd(self): + return self._get_data("ystd") + + @ystd.setter + def ystd(self, values): + self._set_data("ystd", values) + + def _get_data(self, attr): + """ + :param str attr: + :returns array: + """ + nmodels = self.nmodels + if nmodels == 0: + return None + elif nmodels == 1: + return getattr(self.model, attr) + elif getattr(self.model, attr) is None: + return None + else: + return numpy.concatenate([getattr(m, attr) for m in self._models]) + + def _set_data(self, attr, values): + """ + :param str attr: + :param array values: + """ + if len(values) != self.nchannels: + raise ValueError("Not the expected number of channels") + for idx, model in self._iter_models(values): + setattr(model, attr, values[idx]) + + @contextmanager + def _filter_parameter_context(self, shared=True): + keepex = self.excluded_parameters + keepinc = self.included_parameters + try: + if shared: + if keepinc: + self.included_parameters = list( + set(keepinc) - set(self.shared_attributes) + ) + else: + self.included_parameters = self.shared_attributes + else: + if keepex: + self.excluded_parameters.extend(self.shared_attributes) + else: + self.excluded_parameters = self.shared_attributes + yield + finally: + self.excluded_parameters = keepex + self.included_parameters = keepinc + + @property + def nparameters(self): + return sum(m.nparameters for m in self._iter_parameter_models()) + + @property + def nlinear_parameters(self): + return sum(m.nlinear_parameters for m in self._iter_parameter_models()) + + @property + def nshared_parameters(self): + with self._filter_parameter_context(shared=True): + return self.model.nparameters + + @property + def nshared_linear_parameters(self): + with self._filter_parameter_context(shared=True): + return self.model.nlinear_parameters + + def _get_parameters(self, linear_only=False): + """ + :param bool linear_only: + :returns array: + """ + return numpy.concatenate( + [ + m._get_parameters(linear_only=linear_only) + for m in self._iter_parameter_models() + ] + ) + + def _set_parameters(self, values, linear_only=False): + """ + :returns values: + :param bool linear_only: + """ + i = 0 + for m in self._iter_parameter_models(): + if linear_only: + n = m.nlinear_parameters + else: + n = m.nparameters + if n: + m._set_parameters(values[i : i + n], linear_only=linear_only) + i += n + + def _iter_parameter_models(self): + """Iterate over models which are temporarily configured so that + after iterations, all parameters provided. + :yields Model: + """ + with self._filter_parameter_context(shared=True): + yield self.model + with self._filter_parameter_context(shared=False): + for m in self._models: + yield m + + def _parameter_groups(self, linear_only=False): + """ + :param bool linear_only: + :yields (str, int): group name, nb. parameters in the group + """ + with self._filter_parameter_context(shared=True): + for item in self.model._parameter_groups(linear_only=linear_only): + yield item + with self._filter_parameter_context(shared=False): + for i, m in enumerate(self._models): + for name, n in self.model._parameter_groups(linear_only=linear_only): + yield name + str(i), n + + def _parameter_model_index(self, idx, linear_only=False): + """Convert parameter index of ConcatModel to a parameter indices + of the underlying models (only one when parameter is not shared). + + :param bool linear_only: + :param int idx: + :returns iterable(tuple): model index, parameter index in this model + """ + if self.caching_enabled: + cache = self._cache.setdefault("parameter_model_index", {}) + it = cache.get(idx) + if it is None: + it = cache[idx] = list( + self._iter_parameter_index(idx, linear_only=linear_only) + ) + else: + it = self._iter_parameter_index(idx, linear_only=linear_only) + return it + + def _iter_parameter_index(self, idx, linear_only=False): + """Convert parameter index of ConcatModel to a parameter indices + of the underlying models (only one when parameter is not shared). + + :param bool linear_only: + :param int idx: + :yields (int, int): model index, parameter index in this model + """ + if linear_only: + nshared = self.nshared_linear_parameters + else: + nshared = self.nshared_parameters + shared_attributes = self.shared_attributes + if idx < nshared: + for i, m in enumerate(self._models): + iglobal = 0 + imodel = 0 + for name, n in m._parameter_groups(linear_only=linear_only): + if name in shared_attributes: + if idx >= iglobal and idx < (iglobal + n): + yield i, imodel + idx - iglobal + iglobal += n + imodel += n + else: + iglobal = nshared + for i, m in enumerate(self._models): + imodel = 0 + for name, n in m._parameter_groups(linear_only=linear_only): + if name not in shared_attributes: + if idx >= iglobal and idx < (iglobal + n): + yield i, imodel + idx - iglobal + return + iglobal += n + imodel += n + + @property + def shared_parameters(self): + with self._filter_parameter_context(shared=True): + return self.model.parameters + + @shared_parameters.setter + def shared_parameters(self, values): + with self._filter_parameter_context(shared=True): + self.model.parameters = values + + @property + def shared_linear_parameters(self): + with self._filter_parameter_context(shared=True): + return self.model.linear_parameters + + @shared_linear_parameters.setter + def shared_linear_parameters(self, values): + with self._filter_parameter_context(shared=True): + self.model.linear_parameters = values + + def evaluate(self, xdata=None): + """Evaluate model + + :param array xdata: length nxdata + :returns array: nxdata + """ + if xdata is None: + xdata = self.xdata + ret = xdata * 0.0 + for idx, model in self._iter_models(xdata): + ret[idx] = model.evaluate(xdata=xdata[idx]) + return ret + + def derivative(self, param_idx, xdata=None): + """Derivate to a specific parameter + + :param int param_idx: + :param array xdata: length nxdata + :returns array: nxdata + """ + if xdata is None: + xdata = self.xdata + ret = xdata * 0.0 + idx_channels = self._idx_channels(len(xdata)) + for model_idx, param_idx in self._parameter_model_index(param_idx): + idx = idx_channels[model_idx] + model = self._models[model_idx] + ret[idx] = model.derivative(param_idx, xdata=xdata[idx]) + return ret + + def linear_derivatives(self, xdata=None): + """Derivates to all linear parameters + + :param array xdata: length nxdata + :returns array: nparams x nxdata + """ + if xdata is None: + xdata = self.xdata + ret = numpy.empty((self.nlinear_parameters, xdata.size)) + for idx, model in self._iter_models(xdata): + ret[:, idx] = model.linear_derivatives(xdata=xdata[idx]) + return ret + + def _iter_models(self, xdata): + """Loop over the models and yield xdata slice + + :param array xdata: + :yields (slice, Model): + """ + for item in zip(self._idx_channels(len(xdata)), self._models): + yield item + + def _idx_channels(self, nconcat): + """Index of each model in the concatenated data + + :param int nconcat: + :returns list(slice): + """ + if self.caching_enabled: + cache = self._cache.setdefault("idx_channels", {}) + if nconcat != cache.get("nconcat"): + cache["idx"] = list(self._generate_idx_channels(nconcat)) + cache["nconcat"] = nconcat + return cache["idx"] + else: + return list(self._generate_idx_channels(nconcat)) + + def _generate_idx_channels(self, nconcat, stride=None): + """Yield slice of the concatenated data for each model. + The concatenated data could be sliced as `xdata[::stride]`. + """ + nchannels = [m.nchannels for m in self._models] + if not stride: + stride, remain = divmod(sum(nchannels), nconcat) + stride += remain > 0 + start = 0 + offset = 0 + i = 0 + for n in nchannels: + # Index of model in concatenated xdata due to slicing + stop = start + n + lst = list(range(start + offset, stop, stride)) + nlst = len(lst) + # Index of model in concatenated xdata after slicing + idx = slice(i, i + nlst) + i += nlst + # Prepare for next model + if lst: + offset = lst[-1] + stride - stop + else: + offset -= n + start = stop + yield idx + + @contextmanager + def caching_context(self): + with ExitStack() as stack: + ctx = super(ConcatModel, self).caching_context() + stack.enter_context(ctx) + for m in self._models: + stack.enter_context(m.caching_context()) + yield diff --git a/PyMca5/tests/FitModelTest.py b/PyMca5/tests/FitModelTest.py new file mode 100644 index 000000000..63c688a73 --- /dev/null +++ b/PyMca5/tests/FitModelTest.py @@ -0,0 +1,327 @@ +# /*########################################################################## +# +# The PyMca X-Ray Fluorescence Toolkit +# +# Copyright (c) 2020 European Synchrotron Radiation Facility +# +# This file is part of the PyMca X-ray Fluorescence Toolkit developed at +# the ESRF by the Software group. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +#############################################################################*/ +__author__ = "Wout De Nolf" +__contact__ = "wout.de_nolf@esrf.eu" +__license__ = "MIT" +__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" + +import unittest +import numpy +from PyMca5.tests import SimpleModel + +# from PyMca5.PyMcaMisc.ProfilingUtils import profile + + +def with_model(nmodels): + def inner1(method): + def inner2(self, *args, **kw): + self.create_model(nmodels=nmodels) + try: + return method(self, *args, **kw) + finally: + self.validate_model() + + return inner2 + + return inner1 + + +class testFitModel(unittest.TestCase): + def setUp(self): + self.random_state = numpy.random.RandomState(seed=0) + + def create_model(self, nmodels): + if nmodels == 1: + self.fitmodel = SimpleModel.SimpleModel() + else: + self.fitmodel = SimpleModel.SimpleConcatModel(ndetectors=nmodels) + assert not self.fitmodel.linear + self.init_random() + self.fitmodel.ydata = self.fitmodel.ymodel + numpy.testing.assert_array_equal(self.fitmodel.ydata, self.fitmodel.ymodel) + self.validate_model() + + def init_random(self, **kw): + if isinstance(self.fitmodel, SimpleModel.SimpleConcatModel): + for model in self.fitmodel._models: + self._init_random(model, **kw) + self.fitmodel.shared_attributes = self.fitmodel.shared_attributes + else: + self._init_random(self.fitmodel, **kw) + + def _init_random(self, model, npeaks=10, nchannels=2048, border=0.1): + """Peaks close to the border will cause the nlls to fail + """ + self.nnonglobals = 4 # zero, gain, wzero, wgain + self.nglobals = npeaks # concentrations + model.xdata_raw = numpy.arange(nchannels) + model.ydata_raw = numpy.full(nchannels, numpy.nan) + model.xmin = self.random_state.randint(low=0, high=10) + model.xmax = self.random_state.randint(low=nchannels - 10, high=nchannels) + model.zero = self.random_state.uniform(low=1, high=1.5) + model.gain = self.random_state.uniform(low=10e-3, high=11e-3) + a = model.zero + b = model.zero + model.gain * nchannels + border = border * (b - a) + a += border + b -= border + model.positions = numpy.linspace(a, b, npeaks) + model.wzero = self.random_state.uniform(low=0.0, high=0.01) + model.wgain = self.random_state.uniform(low=0.05, high=0.1) + model.concentrations = self.random_state.uniform(low=0.5, high=1, size=npeaks) + model.efficiency = self.random_state.uniform(low=5000, high=6000, size=npeaks) + + def modify_random(self, only_linear=False): + if isinstance(self.fitmodel, SimpleModel.SimpleConcatModel): + self._modify_random_concat(only_linear=only_linear) + else: + self._modify_random(only_linear=only_linear) + self.validate_model() + # self.plot() + + def _modify_random(self, only_linear=False): + if only_linear: + p = self.fitmodel.parameters + plin = self.fitmodel.linear_parameters + plin *= numpy.random.uniform(0.5, 0.8, len(plin)) + else: + p = self.fitmodel.parameters + plin = self.fitmodel.linear_parameters + p *= numpy.random.uniform(0.95, 1, len(p)) + plin *= numpy.random.uniform(0.5, 0.8, len(plin)) + self.fitmodel.parameters = p + assert numpy.array_equal(self.fitmodel.parameters, p) + self.fitmodel.linear_parameters = plin + assert numpy.array_equal(self.fitmodel.linear_parameters, plin) + return p + + def _modify_random_concat(self, only_linear=False): + p = self._modify_random(only_linear=only_linear) + assert not numpy.array_equal( + self.fitmodel.parameters[: self.nglobals], p[: self.nglobals] + ) + assert numpy.array_equal( + self.fitmodel.parameters[self.nglobals :], p[self.nglobals :] + ) + + def validate_model(self): + self._validate_model(self.fitmodel) + if isinstance(self.fitmodel, SimpleModel.SimpleConcatModel): + for model in self.fitmodel._models: + self._validate_model(model) + + def _validate_model(self, model): + is_concat = isinstance(model, SimpleModel.SimpleConcatModel) + + assert not model.excluded_parameters + assert not model.included_parameters + assert model.nchannels == len(model.xdata) + assert model.nparameters == len(model.parameters) + assert model.nlinear_parameters == len(model.linear_parameters) + arr1 = model.evaluate() + arr2 = model.evaluate_linear() + arr3 = sum(model.linear_decomposition()) + arr4 = model.ymodel + numpy.testing.assert_allclose(arr1, arr2) + numpy.testing.assert_allclose(arr1, arr3) + numpy.testing.assert_allclose(arr1, arr4) + + nonlin_names = ["zero", "gain", "wzero", "wgain"] + lin_names = ["concentrations" + str(i) for i in range(self.nglobals)] + names = nonlin_names + lin_names + if is_concat: + model.validate_shared_attributes() + assert model.nshared_parameters == self.nglobals + assert model.nshared_linear_parameters == self.nglobals + nmodels = model.nmodels + nonglobal_names = [ + name + str(i) for i in range(nmodels) for name in nonlin_names + ] + global_names = lin_names + names = global_names + nonglobal_names + n = self.nglobals + self.nnonglobals * nmodels + assert model.nparameters == n + assert model.nlinear_parameters == self.nglobals + assert model.parameter_names == names + assert model.linear_parameter_names == lin_names + else: + assert model.nparameters == self.nglobals + self.nnonglobals + assert model.nlinear_parameters == self.nglobals + assert model.parameter_names == names + assert model.linear_parameter_names == lin_names + + def plot(self): + import matplotlib.pyplot as plt + + m = self.fitmodel + derivatives = m.derivatives() + names = m.parameter_names + plt.figure() + plt.plot(m.ydata, label="data") + plt.plot(m.ymodel, label="model") + plt.legend() + plt.figure() + for y, name in zip(derivatives, names): + plt.plot(y, label=name) + plt.title("Derivatives") + plt.legend() + plt.show() + + @with_model(1) + def testLinearFit(self): + self._testLinearFit() + + @with_model(8) + def testLinearFitConcat(self): + self._testLinearFit() + + def _testLinearFit(self): + self.fitmodel.linear = True + expected = self.fitmodel.linear_parameters.copy() + self.modify_random(only_linear=True) + result = self.fitmodel.fit() + self.assert_result(result, expected) + assert not numpy.allclose(self.fitmodel.ydata, self.fitmodel.ymodel) + assert not numpy.allclose(self.fitmodel.linear_parameters, expected) + self.fitmodel.use_fit_result(result) + numpy.testing.assert_allclose(self.fitmodel.ydata, self.fitmodel.ymodel) + numpy.testing.assert_allclose(self.fitmodel.linear_parameters, expected) + + @with_model(1) + def testNonLinearFit(self): + self._testNonLinearFit() + + @with_model(8) + def testNonLinearFitConcat(self): + self._testNonLinearFit() + + def _testNonLinearFit(self): + self.fitmodel.linear = False + expected1 = self.fitmodel.parameters.copy() + expected2 = self.fitmodel.linear_parameters.copy() + self.modify_random(only_linear=False) + # with profile(memory=False, filename="testNonLinearFit.pyprof"): + result = self.fitmodel.fit(full_output=True) + # TODO: non-linear parameters not precise + # self.assert_result(result, expected1) + assert not numpy.allclose(self.fitmodel.ydata, self.fitmodel.ymodel) + assert not numpy.allclose(self.fitmodel.parameters, expected1) + assert not numpy.allclose(self.fitmodel.linear_parameters, expected2) + self.fitmodel.use_fit_result(result) + self.assert_ymodel() + # TODO: non-linear parameters not precise + # numpy.testing.assert_allclose(self.fitmodel.parameters, expected1) + numpy.testing.assert_allclose(self.fitmodel.linear_parameters, expected2) + + def assert_result(self, result, expected): + p = numpy.asarray(result["parameters"]) + pstd = numpy.asarray(result["uncertainties"]) + ll = p - 3 * pstd + ul = p + 3 * pstd + assert all((expected >= ll) & (expected <= ul)) + + def assert_ymodel(self): + a = self.fitmodel.ydata + b = self.fitmodel.ymodel + mask = (a > 1) & (b > 1) + assert mask.any() + numpy.testing.assert_allclose(a[mask], b[mask], rtol=1e-3) + + @with_model(8) + def testParameterIndex(self): + # Test parameter index conversion from concatenated to model to model + nmodels = self.fitmodel.nmodels + nglobals = self.nglobals + for linear in [False, True]: + self.fitmodel.linear = linear + if linear: + nnonglobals = 0 + else: + nnonglobals = self.nnonglobals + imodels = [] + iparams = [] + for param_idx in range(self.fitmodel.nparameters): + lst = list( + self.fitmodel._parameter_model_index(param_idx, linear_only=linear) + ) + if lst: + imodel, iparam = list(zip(*lst)) + else: + imodel, iparam = tuple(), tuple() + if param_idx < nglobals: + assert imodel == tuple(range(nmodels)) + assert iparam == tuple([nnonglobals + param_idx] * nmodels) + else: + imodels.extend(imodel) + iparams.extend(iparam) + assert len(imodels) == nnonglobals * nmodels + expected = numpy.repeat(list(range(nmodels)), nnonglobals) + assert imodels == expected.tolist() + expected = numpy.tile(list(range(nnonglobals)), nmodels) + assert iparams == expected.tolist() + + @with_model(8) + def testChannelIndex(self): + # Test model index in concatenated + strides = [2, 3, 100, 1000, 1100, 1200, 3000] + for stride in strides: + x = self.fitmodel.xdata + x2 = x[::stride] + access_cnt = numpy.zeros(len(x2), dtype=int) + vstride = stride + if stride < 1000: + vstride = None + for idx in self.fitmodel._generate_idx_channels(len(x2), stride=vstride): + chunk = x2[idx] + access_cnt[idx] += 1 + assert all(numpy.diff(chunk) == stride) + assert all(access_cnt == 1) + + +def getSuite(auto=True): + testSuite = unittest.TestSuite() + if auto: + testSuite.addTest(unittest.TestLoader().loadTestsFromTestCase(testFitModel)) + else: + # use a predefined order + testSuite.addTest(testFitModel("testParameterIndex")) + testSuite.addTest(testFitModel("testChannelIndex")) + testSuite.addTest(testFitModel("testLinearFit")) + testSuite.addTest(testFitModel("testNonLinearFit")) + testSuite.addTest(testFitModel("testLinearFitConcat")) + testSuite.addTest(testFitModel("testNonLinearFitConcat")) + return testSuite + + +def test(auto=False): + unittest.TextTestRunner(verbosity=2).run(getSuite(auto=auto)) + + +if __name__ == "__main__": + test() diff --git a/PyMca5/tests/SimpleModel.py b/PyMca5/tests/SimpleModel.py new file mode 100644 index 000000000..f828be12f --- /dev/null +++ b/PyMca5/tests/SimpleModel.py @@ -0,0 +1,312 @@ +# /*########################################################################## +# +# The PyMca X-Ray Fluorescence Toolkit +# +# Copyright (c) 2020 European Synchrotron Radiation Facility +# +# This file is part of the PyMca X-ray Fluorescence Toolkit developed at +# the ESRF by the Software group. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +#############################################################################*/ +__author__ = "Wout De Nolf" +__contact__ = "wout.de_nolf@esrf.eu" +__license__ = "MIT" +__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" + + +import numpy +from PyMca5.PyMcaMath.fitting import SpecfitFuns +from PyMca5.PyMcaMath.fitting.Model import Model, ConcatModel + + +class SimpleModel(Model): + """Model MCA data using a fixed list of peak positions and efficiencies + """ + + def __init__(self): + self.config = { + "detector": {"zero": 0.0, "gain": 1.0, "wzero": 0.0, "wgain": 1.0}, + "matrix": {"positions": [], "concentrations": [], "efficiency": []}, + "fit": {"linear": False}, + "xmin": 0.0, + "xmax": 1.0, + } + self.xdata_raw = None + self.ydata_raw = None + self.ystd_raw = None + self.sigma_to_fwhm = 2 * numpy.sqrt(2 * numpy.log(2)) + super(SimpleModel, self).__init__() + + def __str__(self): + return "{}(npeaks={}, zero={}, gain={}, wzero={}, wgain={})".format( + self.__class__, self.npeaks, self.zero, self.gain, self.wzero, self.wgain + ) + + @property + def zero(self): + return self.config["detector"]["zero"] + + @zero.setter + def zero(self, value): + self.config["detector"]["zero"] = value + + @property + def gain(self): + return self.config["detector"]["gain"] + + @gain.setter + def gain(self, value): + self.config["detector"]["gain"] = value + + @property + def wzero(self): + return self.config["detector"]["wzero"] + + @wzero.setter + def wzero(self, value): + self.config["detector"]["wzero"] = value + + @property + def wgain(self): + return self.config["detector"]["wgain"] + + @wgain.setter + def wgain(self, value): + self.config["detector"]["wgain"] = value + + @property + def efficiency(self): + return self.config["matrix"]["efficiency"] + + @efficiency.setter + def efficiency(self, value): + arr = self.config["matrix"]["efficiency"] + self.config["matrix"]["efficiency"] = value + + @property + def positions(self): + return self.config["matrix"]["positions"] + + @positions.setter + def positions(self, value): + self.config["matrix"]["positions"] = value + + @property + def fwhms(self): + return self.zero + self.gain * self.positions + + @property + def areas(self): + return self.efficiency * self.concentrations + + @property + def concentrations(self): + return self.config["matrix"]["concentrations"] + + @concentrations.setter + def concentrations(self, value): + self.config["matrix"]["concentrations"] = value + + @property + def linear(self): + return self.config["fit"]["linear"] + + @linear.setter + def linear(self, value): + self.config["fit"]["linear"] = value + + @property + def idx_channels(self): + return slice(self.xmin, self.xmax) + + @property + def xdata(self): + if self.xdata_raw is None: + return None + else: + return self.xdata_raw[self.idx_channels] + + @xdata.setter + def xdata(self, values): + self.xdata_raw[self.idx_channels] = values + + @property + def xenergy(self): + return self.zero + self.gain * self.xdata + + @property + def ydata(self): + if self.ydata_raw is None: + return None + else: + return self.ydata_raw[self.idx_channels] + + @ydata.setter + def ydata(self, values): + self.ydata_raw[self.idx_channels] = values + + @property + def ystd(self): + if self.ystd_raw is None: + return None + else: + return self.ystd_raw[self.idx_channels] + + @ystd.setter + def ystd(self, values): + self.ystd_raw[self.idx_channels] = values + + @property + def nchannels(self): + return self.xmax - self.xmin + + @property + def npeaks(self): + return len(self.concentrations) + + @property + def _parameter_group_names(self): + return ["zero", "gain", "wzero", "wgain", "concentrations"] + + @property + def _linear_parameter_group_names(self): + return ["concentrations"] + + def _iter_parameter_groups(self, linear_only=False): + """ + :param bool linear_only: + :yields (str, int): group name, nb. parameters in the group + """ + if linear_only: + names = self.linear_parameter_group_names + else: + names = self.parameter_group_names + for name in names: + if name == "zero": + yield name, 1 + elif name == "gain": + yield name, 1 + elif name == "wzero": + yield name, 1 + elif name == "wgain": + yield name, 1 + elif name == "concentrations": + yield name, self.npeaks + else: + raise ValueError(name) + + def _get_parameters(self, linear_only=False): + i = 0 + if linear_only: + params = numpy.zeros(self.nlinear_parameters) + else: + params = numpy.zeros(self.nparameters) + for name, n in self._parameter_groups(linear_only=linear_only): + if name == "zero": + params[i : i + n] = self.zero + elif name == "gain": + params[i : i + n] = self.gain + elif name == "wzero": + params[i : i + n] = self.wzero + elif name == "wgain": + params[i : i + n] = self.wgain + elif name == "concentrations": + params[i : i + n] = self.concentrations + i += n + return params + + def _set_parameters(self, params, linear_only=False): + i = 0 + for name, n in self._parameter_groups(linear_only=linear_only): + if n > 1: + getattr(self, name)[:] = params[i : i + n] + else: + setattr(self, name, params[i]) + i += n + + def evaluate(self, xdata=None): + """DEvaluate model + + :param array xdata: length nxdata + :returns array: nxdata + """ + if xdata is None: + xdata = self.xdata + x = self.zero + self.gain * xdata + p = list(zip(self.areas, self.positions, self.fwhms)) + return SpecfitFuns.agauss(p, x) + + def linear_derivatives(self, xdata=None): + """Derivates to all linear parameters + + :param array xdata: length nxdata + :returns array: nparams x nxdata + """ + if xdata is None: + xdata = self.xdata + x = self.zero + self.gain * xdata + it = zip(self.efficiency, self.positions, self.fwhms) + return numpy.array([SpecfitFuns.agauss([a, p, w], x) for a, p, w in it]) + + def derivative(self, param_idx, xdata=None): + """Derivate to a specific parameter + + :param int param_idx: + :param array xdata: length nxdata + :returns array: nxdata + """ + if xdata is None: + xdata = self.xdata + x = self.zero + self.gain * xdata + name, i = self._parameter_name_from_index(param_idx) + if name == "concentrations": + p = self.positions[i] + a = self.efficiency[i] + w = self.wzero + self.wgain * p + y = SpecfitFuns.agauss([a, p, w], x) + else: + fwhms = self.fwhms + sigmas = fwhms / self.sigma_to_fwhm + y = x * 0.0 + for p, a, w, s in zip(self.positions, self.areas, fwhms, sigmas): + if name in ("zero", "gain"): + # Derivative to position + m = -(x - p) / s ** 2 + # Derivative to position param + if name == "gain": + m *= xdata + else: + # Derivative to FWHM + m = ((x - p) ** 2 / s ** 2 - 1) / (self.sigma_to_fwhm * s) + # Derivative to FWHM param + if name == "wgain": + m *= p + y += m * SpecfitFuns.agauss([a, p, w], x) + return y + + +class SimpleConcatModel(ConcatModel): + def __init__(self, ndetectors=1): + models = [SimpleModel() for i in range(ndetectors)] + shared_attributes = ["concentrations", "positions"] + super(SimpleConcatModel, self).__init__( + models, shared_attributes=shared_attributes + )