Remove _raw_sample from Empirical RVs. #82

Merged: 8 commits, Dec 13, 2024
1 change: 0 additions & 1 deletion pelicun/tests/basic/test_damage_model.py
@@ -591,7 +591,6 @@ def test__create_dmg_RVs(self, assessment_instance: Assessment) -> None:
         for rv_name, rv in capacity_rv_reg.RV.items():
             uniform_sample = rv._uni_sample
             sample = rv.sample
-
             assert uniform_sample is not None
             assert sample is not None
62 changes: 23 additions & 39 deletions pelicun/uq.py
@@ -396,7 +396,7 @@ def _get_std_samples(
     uni_sample = norm.cdf(samples_i, loc=theta_i[0], scale=theta_i[1])

     # replace 0 and 1 values with the nearest float
-    uni_sample[uni_sample == 0] = np.nextafter(0, 1)
+    uni_sample[uni_sample == 0] = FIRST_POSITIVE_NUMBER
     uni_sample[uni_sample == 1] = np.nextafter(1, -1)

     # consider truncation if needed
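Note: FIRST_POSITIVE_NUMBER is not defined in any hunk shown here; it presumably names the value it replaces, the smallest representable positive double, as a module-level constant in uq.py. A minimal sketch of the assumed definition:

    import numpy as np

    # Assumed definition: the first representable float after 0.0
    # in the direction of 1.0 (about 5e-324 for IEEE 754 doubles).
    FIRST_POSITIVE_NUMBER = np.nextafter(0, 1)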
@@ -660,7 +660,7 @@ def _neg_log_likelihood( # noqa: C901
             return 1e10

         # make sure that the likelihood of censoring a sample is positive
-        cen_likelihood = max(1.0 - det_alpha, np.nextafter(0, 1))
+        cen_likelihood = max(1.0 - det_alpha, FIRST_POSITIVE_NUMBER)

     else:
         # If the data is not censored, use 1.0 for cen_likelihood to get a
@@ -679,7 +679,7 @@
     # Zeros are a result of limited floating point precision. Replace them
     # with the smallest possible positive floating point number to
     # improve convergence.
-    likelihoods = np.clip(likelihoods, a_min=np.nextafter(0, 1), a_max=None)
+    likelihoods = np.clip(likelihoods, a_min=FIRST_POSITIVE_NUMBER, a_max=None)

     # calculate the total negative log likelihood
     negative_log_likelihood = -(
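The clipping matters because the next step takes a logarithm: np.log(0.0) returns -inf with a runtime warning and derails the optimizer. A short demonstration with hypothetical values, assuming the constant sketched above:

    import numpy as np

    FIRST_POSITIVE_NUMBER = np.nextafter(0, 1)  # assumed constant, as above
    likelihoods = np.array([0.2, 0.0, 0.7])  # hypothetical; the 0.0 is an underflow
    safe = np.clip(likelihoods, a_min=FIRST_POSITIVE_NUMBER, a_max=None)
    print(np.log(safe))  # all entries finite; np.log(0.0) would give -inf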
@@ -819,7 +819,9 @@ def fit_distribution_to_sample( # noqa: C901

     # replace zero standard dev with negligible standard dev
     sig_zero_id = np.where(sig_init == 0.0)[0]
-    sig_init[sig_zero_id] = 1e-6 * np.abs(mu_init[sig_zero_id]) + np.nextafter(0, 1)
+    sig_init[sig_zero_id] = (
+        1e-6 * np.abs(mu_init[sig_zero_id]) + FIRST_POSITIVE_NUMBER
+    )

     # prepare a vector of initial values
     # Note: The actual optimization uses zeros as initial parameters to
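The additive constant keeps the initial standard deviation strictly positive even when the corresponding mean is zero, where the 1e-6 * |mu| term alone would remain zero. A sketch with hypothetical inputs:

    import numpy as np

    FIRST_POSITIVE_NUMBER = np.nextafter(0, 1)  # assumed constant, as above
    mu_init = np.array([5.0, 0.0])
    sig_init = np.array([0.0, 0.0])
    sig_zero_id = np.where(sig_init == 0.0)[0]
    sig_init[sig_zero_id] = (
        1e-6 * np.abs(mu_init[sig_zero_id]) + FIRST_POSITIVE_NUMBER
    )
    print(sig_init)  # [5e-06, ~5e-324]: positive in both cases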
@@ -1244,7 +1246,7 @@ class RandomVariable(BaseRandomVariable):
     def __init__(
         self,
         name: str,
-        theta: np.ndarray | None,
+        theta: np.ndarray,
         truncation_limits: np.ndarray | None = None,
        f_map: Callable | None = None,
        anchor: BaseRandomVariable | None = None,
@@ -1390,12 +1392,10 @@ def _prepare_theta_and_truncation_limit_arrays(
         of `values`.
         """
         theta = self.theta
-        assert theta is not None
         truncation_limits = self.truncation_limits
-        assert truncation_limits is not None
         if self.constant_parameters():
             theta = np.atleast_2d(theta)
-            assert theta is not None
         elif len(values) != theta.shape[0]:
             msg = (
                 'Number of elements in `values` variable should '
@@ -1547,7 +1547,6 @@ def __init__(
             f_map=f_map,
             anchor=anchor,
         )
-        assert self.theta is not None, '`theta` is required for Normal RVs'
         self.distribution = 'normal'

     def cdf(self, values: np.ndarray) -> np.ndarray:
@@ -1724,7 +1723,6 @@ def __init__(
             f_map=f_map,
             anchor=anchor,
         )
-        assert self.theta is not None, '`theta` is required for LogNormal RVs'
         self.distribution = 'lognormal'

     def cdf(self, values: np.ndarray) -> np.ndarray:
@@ -1751,7 +1749,7 @@ def cdf(self, values: np.ndarray) -> np.ndarray:
             a, b = truncation_limits.T

             # Replace NaN values
-            a = np.nan_to_num(a, nan=np.nextafter(0, 1))
+            a = np.nan_to_num(a, nan=FIRST_POSITIVE_NUMBER)
             b = np.nan_to_num(b, nan=np.inf)

             p_a, p_b = (
@@ -1769,7 +1767,7 @@ def cdf(self, values: np.ndarray) -> np.ndarray:
             result = (p_vals - p_a) / (p_b - p_a)

         else:
-            values = np.maximum(values, np.nextafter(0, 1))
+            values = np.maximum(values, FIRST_POSITIVE_NUMBER)

             result = norm.cdf(np.log(values), loc=np.log(theta), scale=beta)
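In the untruncated branch, flooring values at the smallest positive float makes np.log return a very negative number instead of -inf, so the CDF cleanly evaluates to 0.0 at zero. A sketch with hypothetical parameters:

    import numpy as np
    from scipy.stats import norm

    FIRST_POSITIVE_NUMBER = np.nextafter(0, 1)  # assumed constant, as above
    theta, beta = 1.0, 0.4  # hypothetical median and log-standard deviation
    values = np.maximum(np.array([0.0, 0.5, 2.0]), FIRST_POSITIVE_NUMBER)
    print(norm.cdf(np.log(values), loc=np.log(theta), scale=beta))
    # first entry is 0.0 rather than nan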

@@ -1802,8 +1800,8 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
             a, b = truncation_limits.T

             # Replace NaN values
-            a = np.nan_to_num(a, nan=np.nextafter(0, 1))
-            a[a <= 0] = np.nextafter(0, 1)
+            a = np.nan_to_num(a, nan=FIRST_POSITIVE_NUMBER)
+            a[a <= 0] = FIRST_POSITIVE_NUMBER
             b = np.nan_to_num(b, nan=np.inf)

             p_a, p_b = (
@@ -1852,7 +1850,6 @@ def __init__(
             f_map=f_map,
             anchor=anchor,
         )
-        assert self.theta is not None, '`theta` is required for Uniform RVs'
         self.distribution = 'uniform'

         if self.theta.ndim != 1:
@@ -1877,7 +1874,6 @@ def cdf(self, values: np.ndarray) -> np.ndarray:
             1D float ndarray containing CDF values

         """
-        assert self.theta is not None
         a, b = self.theta[:2]

         if np.isnan(a):
@@ -1908,7 +1904,6 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
             Inverse CDF values

         """
-        assert self.theta is not None
         a, b = self.theta[:2]

         if np.isnan(a):
@@ -1953,7 +1948,6 @@ def __init__(
             f_map=f_map,
             anchor=anchor,
         )
-        assert self.theta is not None, '`theta` is required for Weibull RVs'
         self.distribution = 'weibull'

         if self.theta.ndim != 1:
@@ -1978,7 +1972,6 @@ def cdf(self, values: np.ndarray) -> np.ndarray:
             1D float ndarray containing CDF values

         """
-        assert self.theta is not None
         lambda_, kappa = self.theta[:2]

         if np.any(~np.isnan(self.truncation_limits)):
@@ -2029,7 +2022,6 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
             Inverse CDF values

         """
-        assert self.theta is not None
         lambda_, kappa = self.theta[:2]

         if np.any(~np.isnan(self.truncation_limits)):
@@ -2096,7 +2088,6 @@ def __init__(
             f_map=f_map,
             anchor=anchor,
         )
-        assert self.theta is not None, '`theta` is required for MultilinearCDF RVs'
         self.distribution = 'multilinear_CDF'

         if not np.all(np.isnan(truncation_limits)):
@@ -2159,7 +2150,6 @@ def cdf(self, values: np.ndarray) -> np.ndarray:
             1D float ndarray containing CDF values

         """
-        assert self.theta is not None
         x_i = [-np.inf] + [x[0] for x in self.theta] + [np.inf]
         y_i = [0.00] + [x[1] for x in self.theta] + [1.00]
@@ -2184,7 +2174,6 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
             Inverse CDF values

         """
-        assert self.theta is not None
         x_i = [x[0] for x in self.theta]
         y_i = [x[1] for x in self.theta]
@@ -2201,7 +2190,7 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
 class EmpiricalRandomVariable(RandomVariable):
     """Empirical random variable."""

-    __slots__: list[str] = ['_raw_sample']
+    __slots__: list[str] = []

     def __init__(
         self,
@@ -2214,9 +2203,12 @@ def __init__(
         """Instantiate an Empirical random variable."""
         if truncation_limits is None:
             truncation_limits = np.array((np.nan, np.nan))
+
+        theta = np.atleast_1d(theta)
+
         super().__init__(
             name=name,
-            theta=None,
+            theta=theta,
             truncation_limits=truncation_limits,
             f_map=f_map,
             anchor=anchor,
@@ -2226,8 +2218,6 @@
             msg = f'{self.distribution} RVs do not support truncation'
             raise NotImplementedError(msg)

-        self._raw_sample = np.atleast_1d(theta)
-
     def inverse_transform(self, values: np.ndarray) -> np.ndarray:
         """
         Evaluate the inverse CDF.
@@ -2251,14 +2241,14 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
         normalized positions.

         """
-        s_ids = (values * len(self._raw_sample)).astype(int)
-        return self._raw_sample[s_ids]
+        s_ids = (values * len(self.theta)).astype(int)
+        return self.theta[s_ids]
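With _raw_sample gone, inverse_transform indexes directly into theta: each uniform value in [0, 1) is scaled by the sample length and truncated to an integer index. A sketch of that mapping with hypothetical data:

    import numpy as np

    theta = np.atleast_1d([10.0, 20.0, 30.0, 40.0])  # hypothetical raw sample
    values = np.array([0.00, 0.49, 0.99])  # uniform samples in [0, 1)
    s_ids = (values * len(theta)).astype(int)  # -> [0, 1, 3]
    print(theta[s_ids])  # [10. 20. 40.]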


 class CoupledEmpiricalRandomVariable(UtilityRandomVariable):
     """Coupled empirical random variable."""

-    __slots__: list[str] = ['_raw_sample']
+    __slots__: list[str] = ['theta']

     def __init__(
         self,
@@ -2295,19 +2285,17 @@ def __init__(
             When truncation limits are provided

         """
-        if truncation_limits is None:
-            truncation_limits = np.array((np.nan, np.nan))
         super().__init__(
             name=name,
             f_map=f_map,
             anchor=anchor,
         )
         self.distribution = 'coupled_empirical'
-        if not np.all(np.isnan(truncation_limits)):
+        if truncation_limits is not None:
             msg = f'{self.distribution} RVs do not support truncation'
             raise NotImplementedError(msg)

-        self._raw_sample = np.atleast_1d(theta)
+        self.theta = np.atleast_1d(theta)

     def inverse_transform(self, sample_size: int) -> np.ndarray:
         """
@@ -2332,10 +2320,8 @@
         dataset.

         """
-        raw_sample_count = len(self._raw_sample)
-        new_sample = np.tile(
-            self._raw_sample, int(sample_size / raw_sample_count) + 1
-        )
+        raw_sample_count = len(self.theta)
+        new_sample = np.tile(self.theta, int(sample_size / raw_sample_count) + 1)
         return new_sample[:sample_size]
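The coupled variant reproduces the raw dataset deterministically: theta is tiled enough times to cover the requested sample size, then truncated. A sketch with hypothetical data:

    import numpy as np

    theta = np.atleast_1d([1.0, 2.0, 3.0])  # hypothetical raw sample
    sample_size = 7
    new_sample = np.tile(theta, int(sample_size / len(theta)) + 1)
    print(new_sample[:sample_size])  # [1. 2. 3. 1. 2. 3. 1.]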


@@ -2450,7 +2436,6 @@ def __init__(
         if not np.all(np.isnan(truncation_limits)):
             msg = f'{self.distribution} RVs do not support truncation'
             raise NotImplementedError(msg)
-        assert self.theta is not None, '`theta` is required for Multinomial RVs'
         self.distribution = 'multinomial'
         if np.sum(theta) > 1.00:
             msg = (
@@ -2482,7 +2467,6 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
             Discrete events corresponding to the input values.

         """
-        assert self.theta is not None
         p_cum = np.cumsum(self.theta)[:-1]

         for i, p_i in enumerate(p_cum):
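The loop assigns each uniform value the index of the first cumulative-probability interval it falls into. For illustration only (not the code in this PR), np.searchsorted expresses the same mapping in one call:

    import numpy as np

    theta = np.array([0.2, 0.3, 0.5])  # hypothetical event probabilities
    p_cum = np.cumsum(theta)[:-1]  # [0.2, 0.5]
    values = np.array([0.10, 0.45, 0.90])  # uniform samples
    print(np.searchsorted(p_cum, values, side='right'))  # [0 1 2]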