diff --git a/cobaya/component.py b/cobaya/component.py
index 98799cb20..a1c5fde80 100644
--- a/cobaya/component.py
+++ b/cobaya/component.py
@@ -5,15 +5,16 @@
 from inspect import cleandoc
 from packaging import version
 from importlib import import_module, resources
-from typing import Optional, Union, List, Set
+from typing import Optional, Union, List, Set, get_type_hints
 
 from cobaya.log import HasLogger, LoggedError, get_logger
-from cobaya.typing import Any, InfoDict, InfoDictIn, empty_dict
+from cobaya.typing import Any, InfoDict, InfoDictIn, empty_dict, validate_type
 from cobaya.tools import resolve_packages_path, load_module, get_base_classes, \
     get_internal_class_component_name, deepcopy_where_possible, VersionCheckError
 from cobaya.conventions import kinds, cobaya_package, reserved_attributes
 from cobaya.yaml import yaml_load_file, yaml_dump, yaml_load
 from cobaya.mpi import is_main_process
+import cobaya
 
 
 class Timer:
@@ -278,7 +279,7 @@ def get_defaults(cls, return_yaml=False, yaml_expand_defaults=True,
                     "(type declarations without values are fine "
                     "with yaml file as well).",
                     cls.get_qualified_class_name(), list(both))
-            options |= yaml_options
+            options.update(yaml_options)
         yaml_text = None
         if return_yaml and not yaml_expand_defaults:
             return yaml_text or ""
@@ -315,6 +316,7 @@ def get_annotations(cls) -> InfoDict:
             if issubclass(base, HasDefaults) and base is not HasDefaults:
                 d.update(base.get_annotations())
 
+        # from Python 3.10 should use just cls.__annotations__
         d.update({k: v for k, v in cls.__dict__.get("__annotations__", {}).items()
                   if not k.startswith('_')})
         return d
@@ -331,6 +333,8 @@ class CobayaComponent(HasLogger, HasDefaults):
     _at_resume_prefer_new: List[str] = ["version"]
     _at_resume_prefer_old: List[str] = []
 
+    _enforce_types: bool = False
+
     def __init__(self, info: InfoDictIn = empty_dict,
                  name: Optional[str] = None,
                  timing: Optional[bool] = None,
@@ -355,6 +359,8 @@
             except AttributeError:
                 raise AttributeError("Cannot set {} attribute for {}!".format(k, self))
         self.set_logger(name=self._name)
+        self.validate_attributes(annotations)
+
         self.set_timing_on(timing)
         try:
             if initialize:
@@ -412,21 +418,35 @@ def has_version(self):
         """
         ...
         """
         return True
 
-    def validate_info(self, k: str, value: Any, annotations: dict):
+    def validate_info(self, name: str, value: Any, annotations: dict):
         """
-        Does any validation on parameter k read from an input dictionary or yaml file,
+        Does any validation on parameter name read from an input dictionary or yaml file,
         before setting the corresponding class attribute.
-        You could enforce consistency with annotations here, but does not by default.
+        This check is always done, even if _enforce_types is not set.
-        :param k: name of parameter
+        :param name: name of parameter
         :param value: value
         :param annotations: resolved inherited dictionary of attributes for this class
         """
-        # by default just test booleans, e.g. for typos of "false" which evaluate true
-        if annotations.get(k) is bool and value and isinstance(value, str):
+        if annotations.get(name) is bool and value and isinstance(value, str):
             raise AttributeError("Class '%s' parameter '%s' should be True "
-                                 "or False, got '%s'" % (self, k, value))
+                                 "or False, got '%s'" % (self, name, value))
+
+    def validate_attributes(self, annotations: dict):
+        """
+        If _enforce_types or cobaya.typing.enforce_type_checking is set, this
+        checks all class attributes against the annotation types
+
+        :param annotations: resolved inherited dictionary of attributes for this class
+        :raises: TypeError if any attribute does not match the annotation type
+        """
+        check = cobaya.typing.enforce_type_checking
+        if check or self._enforce_types and check is not False:
+            hints = get_type_hints(self.__class__)  # resolve any deferred attributes
+            for name in annotations:
+                validate_type(hints[name], getattr(self, name, None),
+                              self.get_name() + ':' + name)
 
     @classmethod
     def get_kind(cls):
diff --git a/cobaya/cosmo_input/input_database.py b/cobaya/cosmo_input/input_database.py
index 601265cce..8244386a9 100644
--- a/cobaya/cosmo_input/input_database.py
+++ b/cobaya/cosmo_input/input_database.py
@@ -415,8 +415,8 @@
 like_cmb: InfoDict = {
     none: {},
-    "planck_NPIPE": {
-        "desc": "Planck NPIPE (native; polarized NPIPE CMB + lensing)",
+    "planck_NPIPE_CamSpec": {
+        "desc": "Planck NPIPE CamSpec (native; polarized NPIPE CMB + lensing)",
         "sampler": cmb_sampler_recommended,
         "theory": {theo: {"extra_args": cmb_precision[theo]}
                    for theo in ["camb", "classy"]},
@@ -427,6 +427,22 @@
             "planckpr4lensing":
                 {'package_install': {'github_repository': 'carronj/planck_PR4_lensing',
                                      'min_version': '1.0.2'}}}},
+    "planck_NPIPE_Hillipop": {
+        "desc": "Planck NPIPE Hillipop+Lollipop (polarized NPIPE CMB + lensing)",
+        "sampler": cmb_sampler_recommended,
+        "theory": {theo: {"extra_args": cmb_precision[theo]}
+                   for theo in ["camb", "classy"]},
+        "likelihood": {
+            "planck_2018_lowl.TT": None,
+            "planck_2020_lollipop.lowlE":
+                {'package_install': {'pip': 'planck-npipe/lollipop',
+                                     'min_version': '4.1.1'}},
+            "planck_2020_hillipop.TTTEEE":
+                {'package_install': {'pip': 'planck-npipe/hillipop',
+                                     'min_version': '4.2.2'}},
+            "planckpr4lensing":
+                {'package_install': {'github_repository': 'carronj/planck_PR4_lensing',
+                                     'min_version': '1.0.2'}}}},
     "planck_2018": {
         "desc": "Planck 2018 (Polarized CMB + lensing)",
         "sampler": cmb_sampler_recommended,
@@ -610,14 +626,22 @@
 preset: InfoDict = dict([
     (none, {"desc": "(No preset chosen)"}),
     # Pure CMB #######################################################
-    ("planck_NPIPE_camb", {
-        "desc": "Planck NPIPE with CAMB (all native Python)",
+    ("planck_NPIPE_CamSpec_camb", {
+        "desc": "Planck NPIPE CamSpec with CAMB (all native Python)",
+        "theory": "camb",
+        "like_cmb": "planck_NPIPE_CamSpec"}),
+    ("planck_NPIPE_CamSpec_classy", {
+        "desc": "Planck NPIPE CamSpec with CLASS (all native Python)",
+        "theory": "classy",
+        "like_cmb": "planck_NPIPE_CamSpec"}),
+    ("planck_NPIPE_Hillipop_camb", {
+        "desc": "Planck NPIPE Hillipop+Lollipop with CAMB (all native Python)",
         "theory": "camb",
-        "like_cmb": "planck_NPIPE"}),
-    ("planck_NPIPE_classy", {
-        "desc": "Planck NPIPE with CLASS (all native Python)",
+        "like_cmb": "planck_NPIPE_Hillipop"}),
+    ("planck_NPIPE_Hillipop_classy", {
+        "desc": "Planck NPIPE Hillipop+Lollipop with CLASS (all native Python)",
         "theory": "classy",
-        "like_cmb": "planck_NPIPE"}),
+        "like_cmb": "planck_NPIPE_Hillipop"}),
     ("planck_2018_camb", {
        "desc": "Planck 2018 with CAMB",
        "theory": "camb",
@@ -745,7 +769,7 @@
 # BASIC INSTALLATION #####################################################################
 install_basic: InfoDict = {
     "theory": theory,
-    "likelihood": dict(like_cmb["planck_NPIPE"]["likelihood"], **{
+    "likelihood": dict(like_cmb["planck_NPIPE_CamSpec"]["likelihood"], **{
         # 2018 lensing ensured covmat database also installed
         "planck_2018_lensing.native": None,
         "sn.pantheon": None,
diff --git a/cobaya/input.py b/cobaya/input.py
index a1c0929c9..9b8f5827b 100644
--- a/cobaya/input.py
+++ b/cobaya/input.py
@@ -29,6 +29,7 @@
 from cobaya.log import LoggedError, get_logger
 from cobaya.parameterization import expand_info_param
 from cobaya import mpi
+import cobaya.typing
 
 # Logger
 logger = get_logger(__name__)
@@ -141,6 +142,8 @@ def load_info_overrides(*infos_or_yaml_or_files, **flags) -> InputDict:
     for flag, value in flags.items():
         if value is not None:
             info[flag] = value
+    if cobaya.typing.enforce_type_checking:
+        cobaya.typing.validate_type(InputDict, info)
     return info
diff --git a/cobaya/likelihood.py b/cobaya/likelihood.py
index 8ce50032f..90b9f26f9 100644
--- a/cobaya/likelihood.py
+++ b/cobaya/likelihood.py
@@ -126,7 +126,7 @@ def calculate(self, state, want_derived=True, **params_values_dict):
         derived: Optional[ParamValuesDict] = {} if want_derived else None
         state["logp"] = -np.inf  # in case of exception
         state["logp"] = self.logp(_derived=derived, **params_values_dict)
-        self.log.debug("Computed log-likelihood = %r", state["logp"])
+        self.log.debug("Computed log-likelihood = %s", state["logp"])
         if derived is not None:
             state["derived"] = derived.copy()
diff --git a/cobaya/likelihoods/base_classes/planck_2018_CamSpec_python.py b/cobaya/likelihoods/base_classes/planck_2018_CamSpec_python.py
index 052fd67a6..da0aaab6a 100644
--- a/cobaya/likelihoods/base_classes/planck_2018_CamSpec_python.py
+++ b/cobaya/likelihoods/base_classes/planck_2018_CamSpec_python.py
@@ -55,8 +55,10 @@ class Planck2018CamSpecPython(DataSetLikelihood):
 
     @classmethod
     def get_bibtex(cls):
-        from cobaya.likelihoods.base_classes import Planck2018Clik
-        return Planck2018Clik.get_bibtex()
+        if not (res := super().get_bibtex()):
+            from cobaya.likelihoods.base_classes import Planck2018Clik
+            return Planck2018Clik.get_bibtex()
+        return res
 
     def read_normalized(self, filename, pivot=None):
         # arrays all based at L=0, in L(L+1)/2pi units
diff --git a/cobaya/log.py b/cobaya/log.py
index f578db72b..0dbf4162f 100644
--- a/cobaya/log.py
+++ b/cobaya/log.py
@@ -13,6 +13,7 @@
 import platform
 import traceback
 import functools
+import numpy as np
 from random import shuffle, choice
 
 # Local
@@ -230,6 +231,7 @@ def format(self, record):
                        "%(message)s")
             self._style._fmt = fmt
             return super().format(record)
+
     # Configure stdout handler
     handle_stdout = logging.StreamHandler(sys.stdout)
     handle_stdout.setLevel(level)
@@ -300,3 +302,9 @@ def mpi_info(self, msg, *args, **kwargs):
 
     @mpi.root_only
     def mpi_debug(self, msg, *args, **kwargs):
         self.log.debug(msg, *args, **kwargs)
+
+    def param_dict_debug(self, msg, dic: dict):
+        """Removes numpy2 np.float64 for consistent output"""
+        if self.log.getEffectiveLevel() <= logging.DEBUG:
+            self.log.debug(msg, {k: float(v) if isinstance(v, np.number) else v
+                                 for k, v in dic.items()})
diff --git a/cobaya/model.py b/cobaya/model.py
index ae7dd3e07..728f3f0fd 100644
--- a/cobaya/model.py
+++ b/cobaya/model.py
@@ -53,7 +53,7 @@ class LogPosterior:
     A consistency check will be performed if initialized simultaneously with
     log-posterior, log-priors and log-likelihoods, so, for faster initialisation,
-    you may prefer to pass log-priors and log-likelhoods only, and only pass all three
-    (so that the test is performed) only when e.g. loading from an old sample.
+    you may prefer to pass log-priors and log-likelihoods only, and only pass all three
+    (so that the test is performed) when e.g. loading from an old sample.
 
     If ``finite=True`` (default: False), it will try to represent infinities as the
@@ -368,7 +368,7 @@ def _loglikes_input_params(
         outpar_dict: ParamValuesDict = {}
         compute_success = True
         self.provider.set_current_input_params(input_params)
-        self.log.debug("Got input parameters: %r", input_params)
+        self.param_dict_debug("Got input parameters: %r", input_params)
         loglikes = np.zeros(len(self.likelihood))
         need_derived = self.requires_derived or return_derived or return_output_params
         for (component, like_index), param_dep in zip(self._component_order.items(),
@@ -391,7 +391,7 @@
                     raise LoggedError(
                         self.log,
                         "Likelihood %s has not returned a valid log-likelihood, "
-                        "but %r instead.", component,
+                        "but %s instead.", component,
                         component.current_logp) from type_excpt
         if make_finite:
             loglikes = np.nan_to_num(loglikes)
@@ -413,7 +413,7 @@
                               else list(outpar_dict.values()))
         else:  # explicitly derived, instead of output params
             derived_dict = self.parameterization.to_derived(outpar_dict)
-            self.log.debug("Computed derived parameters: %s", derived_dict)
+            self.param_dict_debug("Computed derived parameters: %s", derived_dict)
             return_params = (derived_dict if as_dict else list(derived_dict.values()))
         return return_likes, return_params
@@ -537,12 +537,12 @@ def logposterior(self,
             self.log.debug(
                 "Posterior to be computed for parameters %s",
                 dict(zip(self.parameterization.sampled_params(),
-                         params_values_array)))
+                         params_values_array.astype(float))))
         if not np.all(np.isfinite(params_values_array)):
             raise LoggedError(
                 self.log, "Got non-finite parameter values: %r",
                 dict(zip(self.parameterization.sampled_params(),
-                         params_values_array)))
+                         params_values_array.astype(float))))
         # Notice that we don't use the make_finite in the prior call,
         # to correctly check if we have to compute the likelihood
         logpriors_1d = self.prior.logps_internal(params_values_array)
@@ -591,8 +591,8 @@ def logpost(self,
 
     def get_valid_point(self, max_tries: int, ignore_fixed_ref: bool = False,
                         logposterior_as_dict: bool = False, random_state=None,
-                        ) -> Union[Tuple[np.ndarray, LogPosterior],
-                                   Tuple[np.ndarray, dict]]:
+                        ) \
+            -> Union[Tuple[np.ndarray, LogPosterior], Tuple[np.ndarray, dict]]:
         """
         Finds a point with finite posterior, sampled from the reference pdf.
 
@@ -1087,8 +1087,7 @@ def _assign_params(self, info_likelihood, info_theory=None,
                 # Update infos! (helper theory parameters stored in yaml with host)
                 inf = (info_likelihood if component in self.likelihood.values()
                        else info_theory)
-                inf = inf.get(component.get_name())
-                if inf:
+                if inf := inf.get(component.get_name()):
                     inf.pop("params", None)
                     inf[option] = component.get_attr_list_with_helpers(option)
         if self.is_debug_and_mpi_root():
diff --git a/cobaya/output.py b/cobaya/output.py
index 30e1cd426..552ca2334 100644
--- a/cobaya/output.py
+++ b/cobaya/output.py
@@ -376,7 +376,8 @@ def __init__(self, prefix, resume=resume_default, force=False, infix=None):
         self.log.info("Output to be read-from/written-into folder '%s', with prefix '%s'",
                       self.folder, self.prefix)
         self._resuming = False
-        if os.path.isfile(self.file_updated):
+        self._has_old_updated_info = os.path.isfile(self.file_updated)
+        if self._has_old_updated_info:
             self.log.info(
                 "Found existing info files with the requested output prefix: '%s'",
                 prefix)
@@ -388,6 +389,11 @@
                 # Only in this case we can be sure that we are actually resuming
                 self._resuming = True
                 self.log.info("Let's try to resume/load.")
+            else:
+                self.log.debug(
+                    "There was old updated info, but no resume or force requested. "
+                    "Behavior will be handled by sampler."
+                )
 
     @mpi.root_only
     def create_folder(self, folder):
diff --git a/cobaya/prior.py b/cobaya/prior.py
index cdd7ad314..a68173ba1 100644
--- a/cobaya/prior.py
+++ b/cobaya/prior.py
@@ -645,7 +645,7 @@ def logps_internal(self, x: np.ndarray) -> float:
                      if len(self._non_uniform_indices) else 0)
         else:
             logps = -np.inf
-        self.log.debug("Got logpriors (internal) = %r", logps)
+        self.log.debug("Got logpriors (internal) = %s", logps)
         return logps
 
     def logps_external(self, input_params) -> List[float]:
diff --git a/cobaya/samplers/mcmc/mcmc.py b/cobaya/samplers/mcmc/mcmc.py
index 7f7d51c79..cec5673c1 100644
--- a/cobaya/samplers/mcmc/mcmc.py
+++ b/cobaya/samplers/mcmc/mcmc.py
@@ -175,8 +175,8 @@ def initialize(self):
         # If resuming but no existing chains, assume failed run and ignore blocking
         # if speeds measurement requested
         if (
-            self.output.is_resuming() and not existing_chains_any_process and
-            self.measure_speeds
+                self.output.is_resuming() and not existing_chains_any_process and
+                self.measure_speeds
         ):
             self.blocking = None
         if self.measure_speeds and self.blocking:
@@ -255,7 +255,7 @@ def n_fast(self):
         """Number of parameters which are considered fast, in binary fast/slow splits."""
         return len(self.fast_params)
 
-    def get_acceptance_rate(self, first=0, last=None):
+    def get_acceptance_rate(self, first=0, last=None) -> np.floating:
         """
         Computes the current acceptance rate, optionally only for ``[first:last]`` subchain.
@@ -698,12 +698,12 @@ def check_convergence_and_learn_proposal(self):
             acceptance_rate = (np.average(acceptance_rates, weights=Ns)
                                if acceptance_rates is not None else acceptance_rate)
             if self.oversample_thin > 1:
-                weights_multi_str = (" = 1/avg(%r)" % list(acceptance_rates)
+                weights_multi_str = (" = 1/avg(%r)" % acceptance_rates.tolist()
                                      if acceptance_rates is not None else "")
                 self.log.info(" - Average thinned weight: %.3f%s", 1 / acceptance_rate,
                               weights_multi_str)
             else:
-                accpt_multi_str = (" = avg(%r)" % list(acceptance_rates)
+                accpt_multi_str = (" = avg(%r)" % acceptance_rates.tolist()
                                    if acceptance_rates is not None else "")
                 self.log.info(" - Acceptance rate: %.3f%s", acceptance_rate,
                               accpt_multi_str)
@@ -746,8 +746,8 @@
             condition_number = Rminus1 / min(np.abs(eigvals))
             self.log.debug(" - Condition number = %g", condition_number)
             self.log.debug(" - Eigenvalues = %r", eigvals)
-            accpt_multi_str = \
-                " = sum(%r)" % list(Ns) if more_than_one_process() else ""
+            accpt_multi_str = " = sum(%r)" % Ns.astype(int).tolist() \
+                if more_than_one_process() else ""
             self.log.info(
                 " - Convergence of means: R-1 = %f after %d accepted steps%s",
                 Rminus1, sum(Ns), accpt_multi_str)
diff --git a/cobaya/samplers/polychord/polychord.py b/cobaya/samplers/polychord/polychord.py
index cb51438c4..2aa6ff667 100644
--- a/cobaya/samplers/polychord/polychord.py
+++ b/cobaya/samplers/polychord/polychord.py
@@ -55,7 +55,7 @@ class polychord(Sampler):
     # variables from yaml
     do_clustering: bool
-    num_repeats: int
+    num_repeats: Union[int, str]
     confidence_for_unbounded: float
     callback_function: Callable
     blocking: Any
diff --git a/cobaya/theories/camb/camb.py b/cobaya/theories/camb/camb.py
index cfe0c4e18..d9222bc12 100644
--- a/cobaya/theories/camb/camb.py
+++ b/cobaya/theories/camb/camb.py
@@ -888,7 +888,7 @@ def set(self, params_values_dict, state):
             else:
                 self.log.debug("Out of bounds parameters. "
                                "Assigning 0 likelihood and going on.")
-        except (self.camb.baseconfig.CAMBValueError, self.camb.baseconfig.CAMBError):
+        except (self.camb.baseconfig.CAMBValueError, self.camb.baseconfig.CAMBError) as e:
             if self.stop_at_error:
                 self.log.error(
                     "Error setting parameters (see traceback below)! "
@@ -896,6 +896,8 @@
                     "To ignore this kind of error, make 'stop_at_error: False'.",
                     dict(state["params"]), dict(self.extra_args))
                 raise
+            else:
+                self.log.debug("Error setting parameters: %s", e)
         except self.camb.baseconfig.CAMBUnknownArgumentError as e:
             raise LoggedError(
                 self.log,
diff --git a/cobaya/theories/classy/classy.py b/cobaya/theories/classy/classy.py
index 9f800e96f..06245df96 100644
--- a/cobaya/theories/classy/classy.py
+++ b/cobaya/theories/classy/classy.py
@@ -531,7 +531,7 @@ def set(self, params_values_dict):
         args = {self.translate_param(p): v for p, v in params_values_dict.items()}
         args.update(self.extra_args)
         # Generate and save
-        self.log.debug("Setting parameters: %r", args)
+        self.param_dict_debug("Setting parameters: %r", args)
         self.classy.set(**args)
 
     def calculate(self, state, want_derived=True, **params_values_dict):
diff --git a/cobaya/theories/cosmo/boltzmannbase.py b/cobaya/theories/cosmo/boltzmannbase.py
index 30e546e92..813918839 100644
--- a/cobaya/theories/cosmo/boltzmannbase.py
+++ b/cobaya/theories/cosmo/boltzmannbase.py
@@ -255,8 +255,7 @@ def check_no_repeated_input_extra(self):
         Should be called at initialisation, and at the end of every call to
         must_provide()
         """
-        common = set(self.input_params).intersection(self.extra_args)
-        if common:
+        if common := set(self.input_params).intersection(self.extra_args):
             raise LoggedError(
                 self.log, "The following parameters appear both as input parameters and "
                           "as extra arguments: %s. Please, remove one of the definitions "
diff --git a/cobaya/theory.py b/cobaya/theory.py
index 571ebea96..26d923585 100644
--- a/cobaya/theory.py
+++ b/cobaya/theory.py
@@ -67,7 +67,7 @@ def __init__(self, info: TheoryDictIn = empty_dict,
                          standalone=standalone)
 
         # set to Provider instance before calculations
-        self.provider: Any = None
+        self.provider: Optional['Provider'] = None
         # Generate cache states, to avoid recomputing.
         # Default 3, but can be changed by sampler
         self.set_cache_size(3)
@@ -231,7 +231,7 @@ def check_cache_and_compute(self, params_values_dict,
             params_values_dict.update(
                 zip(self._input_params_extra,
                     self.provider.get_param(self._input_params_extra)))
-        self.log.debug("Got parameters %r", params_values_dict)
+        self.param_dict_debug("Got parameters %r", params_values_dict)
         state = None
         if cached:
             for _state in self._states:
@@ -415,7 +415,7 @@ def __init__(self, model, requirement_providers: Dict[str, Theory]):
         self.requirement_providers = requirement_providers
         self.params = {}
 
-    def set_current_input_params(self, params):
+    def set_current_input_params(self, params: ParamValuesDict):
         self.params = params
 
     def get_param(self, param: Union[str, Iterable[str]]) -> Union[float, List[float]]:
diff --git a/cobaya/tools.py b/cobaya/tools.py
index 7858a1091..7b7773636 100644
--- a/cobaya/tools.py
+++ b/cobaya/tools.py
@@ -306,7 +306,6 @@ def get_external_function(string_or_function, name=None):
     if isinstance(string_or_function, str):
         try:
             scope = globals()
-            import scipy.stats as stats  # provide default scope for eval
             scope['stats'] = stats
             scope['np'] = np
             string_or_function = replace_optimizations(string_or_function)
@@ -993,7 +992,7 @@ def load_config_file():
     from cobaya.yaml import yaml_load_file
     try:
         return yaml_load_file(
-            os.path.join(get_config_path(), packages_path_config_file))
+            os.path.join(get_config_path(), packages_path_config_file)) or {}
     except:
         return {}
diff --git a/cobaya/typing.py b/cobaya/typing.py
index 5b08057dd..eccbbed24 100644
--- a/cobaya/typing.py
+++ b/cobaya/typing.py
@@ -1,6 +1,11 @@
-from typing import Dict, Any, Optional, Union, Sequence, Type, Callable, Mapping
-from typing import TypedDict, Literal
+from typing import Dict, Any, Optional, Union, Type, TypedDict, Literal, Mapping, \
+    Callable, Sequence, Iterable
 from types import MappingProxyType
+import contextlib
+import typing
+import numbers
+import numpy as np
+import sys
 
 InfoDict = Dict[str, Any]
 InfoDictIn = Mapping[str, Any]
@@ -22,22 +27,10 @@
 ParamValuesDict = Dict[str, float]
 # Do not yet explicitly support passing instances here
 TheoriesDict = Dict[str, Union[None, TheoryDict, Type]]
-LikesDict = Dict[str, Union[None, LikeDict, Type, Callable]]
+LikesDict = Dict[str, Union[None, str, LikeDict, Type, Callable]]
 SamplersDict = Dict[str, Optional[SamplerDict]]
 PriorsDict = Dict[str, Union[str, Callable]]
 
-# parameters in a params list can be specified on input by
-# 1. a ParamDict dictionary
-# 2. constant value
-# 3. a string giving lambda function of other parameters
-# 4. None - must be a computed output parameter
-# 5. Sequence specifying uniform prior range [min, max] and optionally
-#    'ref' mean and standard deviation for starting positions, and optionally
-#    proposal width. Allowed lengths, 2, 4, 5
-ParamInput = Union['ParamDict', None, str, float, Sequence[float]]
-ParamsDict = Dict[str, ParamInput]
-ExpandedParamsDict = Dict[str, 'ParamDict']
-
 partags = {"prior", "ref", "proposal", "value", "drop",
            "derived", "latex", "renames", "min", "max"}
@@ -62,7 +55,7 @@ class ParamDict(TypedDict, total=False):
     value: Union[float, Callable, str]
     derived: Union[bool, str, Callable]
     prior: Union[None, Sequence[float], SciPyDistDict, SciPyMinMaxDict]
-    ref: Union[None, Sequence[float], SciPyDistDict, SciPyMinMaxDict]
+    ref: Union[None, float, Sequence[float], SciPyDistDict, SciPyMinMaxDict]
     proposal: Optional[float]
     renames: Union[str, Sequence[str]]
     latex: str
@@ -71,6 +64,19 @@
     min: float
     max: float
 
+
+# parameters in a params list can be specified on input by
+# 1. a ParamDict dictionary
+# 2. constant value
+# 3. a string giving lambda function of other parameters
+# 4. None - must be a computed output parameter
+# 5. Sequence specifying uniform prior range [min, max] and optionally
+#    'ref' mean and standard deviation for starting positions, and optionally
+#    proposal width. Allowed lengths, 2, 4, 5
+ParamInput = Union[ParamDict, None, str, float, Sequence[float]]
+ParamsDict = Dict[str, ParamInput]
+ExpandedParamsDict = Dict[str, ParamDict]
+
 
 class ModelDict(TypedDict, total=False):
     theory: TheoriesDict
     likelihood: LikesDict
@@ -95,9 +101,163 @@ class InputDict(ModelDict, total=False):
     force: bool
     debug: Union[bool, int, str]
     resume: bool
+    minimize: bool
     stop_at_error: bool
     test: bool
     timing: bool
     packages_path: Optional[str]
     output: Optional[str]
     version: Optional[Union[str, InfoDict]]
+
+
+enforce_type_checking = None
+
+
+@contextlib.contextmanager
+def type_checking(value: bool):
+    """
+    Context manager to temporarily set typing.enforce_type_checking to a specific value.
+    Restores the original value when exiting the context.
+    """
+    global enforce_type_checking
+    original_value = enforce_type_checking
+    enforce_type_checking = value
+    try:
+        yield
+    finally:
+        enforce_type_checking = original_value
+
+
+def validate_type(expected_type: type, value: Any, path: str = ''):
+    """
+    Checks for soft compatibility of a value with a type.
+    Raises TypeError with descriptive messages when validation fails.
+
+    :param expected_type: from annotation
+    :param value: value to validate
+    :param path: string tracking the nested path for error messages
+    :raises TypeError: with descriptive message when validation fails
+    """
+    if value is None or expected_type is Any:
+        return
+
+    curr_path = f"'{path}'" if path else 'value'
+
+    if expected_type is int:
+        if not (value in (np.inf, -np.inf) or isinstance(value, numbers.Integral)):
+            raise TypeError(
+                f"{curr_path} must be an integer, got {type(value).__name__}"
+            )
+        return
+
+    if expected_type is float:
+        if not (isinstance(value, numbers.Real) or
+                (isinstance(value, np.ndarray) and value.ndim == 0)):
+            raise TypeError(f"{curr_path} must be a float, got {type(value).__name__}")
+        return
+
+    if expected_type is bool:
+        if not isinstance(value, bool):
+            raise TypeError(
+                f"{curr_path} must be boolean, got {type(value).__name__}"
+            )
+        return
+
+    if sys.version_info < (3, 10):
+        from typing_extensions import is_typeddict
+    else:
+        from typing import is_typeddict
+
+    if is_typeddict(expected_type):
+        type_hints = typing.get_type_hints(expected_type)
+        if not isinstance(value, Mapping):
+            raise TypeError(f"{curr_path} must be a mapping for TypedDict "
+                            f"'{expected_type.__name__}', got {type(value).__name__}")
+        if invalid_keys := set(value) - set(type_hints):
+            raise TypeError(f"{curr_path} contains invalid keys for TypedDict "
+                            f"'{expected_type.__name__}': {invalid_keys}")
+        for key, val in value.items():
+            validate_type(type_hints[key], val, f"{path}.{key}" if path else str(key))
+        return
+
+    if (origin := typing.get_origin(expected_type)) and (
+            args := typing.get_args(expected_type)):
+        # complex types like Dict[str, float] etc.
+
+        if origin is Union:
+            errors = []
+            for t in args:
+                try:
+                    return validate_type(t, value, path)
+                except TypeError as e:
+                    error_msg = str(e)
+                    error_path = error_msg.split(' ')[0].strip("'")
+                    errors.append((error_path, error_msg))
+
+            longest_path = max((p for p, _ in errors), key=len)
+            path_errors = set(e for p, e in errors if p == longest_path)
+            raise TypeError(
+                f"{longest_path} failed to match any Union type:\n" +
+                "\n".join(f"- {e}" for e in path_errors)
+            )
+
+        if not isinstance(origin, type):
+            return validate_type(args[0], value, path)
+
+        if isinstance(value, Mapping) != issubclass(origin, Mapping):
+            raise TypeError(
+                f"{curr_path} must be {origin.__name__}, got {type(value).__name__}"
+            )
+
+        if issubclass(origin, Mapping):
+            for k, v in value.items():
+                key_path = f"{path}[{k!r}]" if path else f"[{k!r}]"
+                validate_type(args[0], k, f"{key_path} (key)")
+                validate_type(args[1], v, key_path)
+            return
+
+        if issubclass(origin, Iterable):
+            if isinstance(value, np.ndarray):
+                if value.ndim == 0:
+                    raise TypeError(f"{curr_path} numpy array zero rank")
+                if len(args) == 1 and not np.issubdtype(value.dtype, args[0]):
+                    raise TypeError(
+                        f"{curr_path} numpy array has wrong dtype: "
+                        f"expected {args[0]}, got {value.dtype}"
+                    )
+                return
+
+            if len(args) == 1:
+                if not isinstance(value, Iterable):
+                    raise TypeError(
+                        f"{curr_path} must be iterable, got {type(value).__name__}"
+                    )
+                for i, item in enumerate(value):
+                    validate_type(args[0], item, f"{path}[{i}]" if path else f"[{i}]")
+            else:
+                if not isinstance(value, Sequence):
+                    raise TypeError(f"{curr_path} must be a sequence for "
+                                    f"tuple types, got {type(value).__name__}")
+                if len(args) != len(value):
+                    raise TypeError(f"{curr_path} has wrong length: "
+                                    f"expected {len(args)}, got {len(value)}")
+                for i, (t, v) in enumerate(zip(args, value)):
+                    validate_type(t, v, f"{path}[{i}]" if path else f"[{i}]")
+            return
+
+    if not isinstance(expected_type, type) or isinstance(value, expected_type) \
+            or expected_type is Sequence and isinstance(value, np.ndarray):
+        return
+
+    type_name = getattr(expected_type, "__name__", repr(expected_type))
+
+    # special case for Cobaya's NumberWithUnits, if not instance yet
+    if type_name == 'NumberWithUnits':
+        if not isinstance(value, (numbers.Real, str)):
+            raise TypeError(
+                f"{curr_path} must be a number or string for NumberWithUnits,"
+                f" got {type(value).__name__}")
+        return
+
+    raise TypeError(f"{curr_path} must be of type {type_name}, "
+                    f"got {type(value).__name__}")
diff --git a/docs/cobaya-example.ipynb b/docs/cobaya-example.ipynb
index a7eef950c..396d2b01f 100644
--- a/docs/cobaya-example.ipynb
+++ b/docs/cobaya-example.ipynb
@@ -20,7 +20,7 @@
     "try:\n",
     "    from cobaya import run\n",
     "except ImportError:\n",
-    "    sys.path.insert(0,'../../cobaya')\n",
+    "    sys.path.insert(0,os.path.realpath(os.path.join(os.getcwd(), '../..', 'cobaya')))\n",
     "    from cobaya import run"
    ]
   },
diff --git a/tests/conftest.py b/tests/conftest.py
index f3f22fdec..55a35e202 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -11,6 +11,9 @@
 from cobaya.conventions import packages_path_env, packages_path_arg_posix, \
     test_skip_env
 from cobaya.tools import resolve_packages_path
+import cobaya.typing
+
+cobaya.typing.enforce_type_checking = True
 
 
 def pytest_addoption(parser):
diff --git a/tests/test_cosmo_docs_basic.py b/tests/test_cosmo_docs_basic.py
index 88805ab64..82104e710 100644
--- a/tests/test_cosmo_docs_basic.py
+++ b/tests/test_cosmo_docs_basic.py
@@ -13,7 +13,7 @@
 path = os.path.join(docs_folder, "src_examples", "cosmo_basic")
 file_pre = "basic_"
-preset_pre = "planck_NPIPE_"
+preset_pre = "planck_NPIPE_CamSpec_"
 
 
 def test_cosmo_docs_basic():
diff --git a/tests/test_mcmc.py b/tests/test_mcmc.py
index 3eebcdbab..94aaac02e 100644
--- a/tests/test_mcmc.py
+++ b/tests/test_mcmc.py
@@ -10,6 +10,7 @@
 from cobaya.tools import KL_norm
 from cobaya.yaml import yaml_load
 from .common_sampler import body_of_sampler_test, body_of_test_speeds
+from cobaya.typing import type_checking
 
 pytestmark = pytest.mark.mpi
 
@@ -39,7 +40,7 @@ def test_mcmc(tmpdir, temperature, do_plots, packages_path=None):
 
 def check_gaussian(sampler_instance):
     if not len(sampler_instance.collection) or \
-        not len(sampler_instance.collection[int(sampler_instance.n() / 2):]):
+            not len(sampler_instance.collection[int(sampler_instance.n() / 2):]):
         return
     proposer = KL_norm(
         S1=sampler_instance.model.likelihood["gaussian_mixture"].covs[0],
@@ -162,7 +163,7 @@ def test_mcmc_sync():
     logger.info('Test error synchronization')
     if mpi.rank() == 0:
         info['sampler']['mcmc'] = {'max_samples': 'bad_val'}
-        with NoLogging(logging.ERROR), pytest.raises(TypeError):
+        with NoLogging(logging.ERROR), pytest.raises(TypeError), type_checking(False):
             run(info)
     else:
         with pytest.raises(mpi.OtherProcessError):
diff --git a/tests/test_scripts.py b/tests/test_scripts.py
index 73588fe50..00f9fde71 100644
--- a/tests/test_scripts.py
+++ b/tests/test_scripts.py
@@ -31,6 +31,10 @@ def test_doc():
 
 
 def test_bib(tmpdir):
+
+    with stdout_check('rosenberg22'):
+        bib_script(['planck_NPIPE_highl_CamSpec.TTTEEE'])
+
     with stdout_check('Neal:2005'):
         bib_script(['des_y1.shear', 'camb', 'mcmc'])
diff --git a/tests/test_type_checking.py b/tests/test_type_checking.py
new file mode 100644
index 000000000..4a895a4b5
--- /dev/null
+++ b/tests/test_type_checking.py
@@ -0,0 +1,79 @@
+"""General test for types of components."""
+
+from typing import Any, ClassVar, Dict, List, Optional, Tuple, Mapping
+import numpy as np
+import pytest
+
+from cobaya.component import CobayaComponent
+from cobaya.tools import NumberWithUnits
+from cobaya.typing import ParamDict, Sequence
+
+
+class GenericComponent(CobayaComponent):
+    any: Any
+    classvar: ClassVar[int] = 1
+    infinity: int = float("inf")
+    mean: NumberWithUnits = 1
+    noise: float = 0
+    none: int = None
+    numpy_int: int = np.int64(1)
+    optional: Optional[int] = None
+    paramdict_params: ParamDict
+    params: Dict[str, List[float]]
+    tuple_params: Tuple[float, float] = (0.0, 1.0)
+    array: Sequence[float]
+    array2: Sequence[float]
+    map: Mapping[float, str]
+    deferred: 'ParamDict'
+    unset = 1
+    install_options: ClassVar
+
+    _enforce_types = True
+
+
+def test_component_types():
+    correct_kwargs = {
+        "any": 1,
+        "classvar": 1,
+        "infinity": float("inf"),
+        "mean": 1,
+        "noise": 0,
+        "none": None,
+        "numpy_int": 1,
+        "optional": 3,
+        "paramdict_params": {"prior": [0.0, 1.0]},
+        "params": {"a": [0.0, 1.0], "b": [0, 1]},
+        "tuple_params": (0.0, 1.0),
+        "array": np.arange(2, dtype=np.float64),
+        "array2": [1, 2],
+        "map": {1.0: "a", 2.0: "b"},
+        "deferred": {'value': lambda x: x},
+        "install_options": {}
+    }
+    GenericComponent(correct_kwargs)
+
+    wrong_cases = [
+        {"classvar": "not_an_int"},
+        {"infinity": "not_an_int"},
+        {"mean": {}},
+        {"noise": "not_a_float"},
+        {"none": "not_a_none"},
+        {"numpy_int": "not_an_int"},
+        {"paramdict_params": {"prior": {"c": 1}}},
+        {"params": "not_a_dict"},
+        {"params": {1: [0.0, 1.0]}},
+        {"params": {"a": "not_a_list"}},
+        {"params": {"a": [0.0, "not_a_float"]}},
+        {"optional": "not_an_int"},
+        {"tuple_params": "not_a_tuple"},
+        {"tuple_params": (0.0, "not_a_float")},
+        {"array": 2},
+        {"map": {"a": 2.0}}
+    ]
+    for case in wrong_cases:
+        with pytest.raises(TypeError):
+            GenericComponent({**correct_kwargs, **case})
+
+
+class NextComponent(CobayaComponent):
+    pass
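
Example of the switches introduced in cobaya/typing.py and cobaya/component.py above. This is a minimal sketch, not part of the patch: MyComponent and its attributes are illustrative; the switches themselves (cobaya.typing.enforce_type_checking, the per-class _enforce_types attribute, and the type_checking() context manager) are the ones added by this diff:

    import cobaya.typing
    from cobaya.component import CobayaComponent
    from cobaya.typing import type_checking


    class MyComponent(CobayaComponent):
        # annotated attributes are validated against the input info on init
        tolerance: float = 0.1
        steps: int = 10

        _enforce_types = True  # opt this class in, even without the global flag


    try:
        MyComponent({"tolerance": 0.5, "steps": "many"})  # raises TypeError
    except TypeError as e:
        print(e)  # e.g. 'MyComponent:steps' must be an integer, got str

    # global switch, as set for the whole test suite in tests/conftest.py
    cobaya.typing.enforce_type_checking = True

    # temporarily disable checking, as done in tests/test_mcmc.py
    with type_checking(False):
        pass  # info dictionaries and attributes are not validated in here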
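validate_type() can also be called directly, as load_info_overrides() now does for whole input dictionaries. A small sketch of the soft-compatibility rules and nested error paths (the types and values here are illustrative):

    from typing import Dict, List
    from cobaya.typing import ParamDict, validate_type

    # ints pass where floats are expected; containers are checked recursively
    validate_type(Dict[str, List[float]], {"a": [0, 1.5]}, path="params")

    # the reported path tracks the offending nested value
    try:
        validate_type(Dict[str, List[float]], {"a": [0.0, "x"]}, path="params")
    except TypeError as e:
        print(e)  # e.g. 'params['a'][1]' must be a float, got str

    # TypedDicts such as ParamDict are checked for unknown keys
    try:
        validate_type(ParamDict, {"prior": [0.0, 1.0], "c": 1}, path="x")
    except TypeError as e:
        print(e)  # e.g. 'x' contains invalid keys for TypedDict 'ParamDict': {'c'}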