diff --git a/pelicun/assessment.py b/pelicun/assessment.py index faca36e8c..2165ced17 100644 --- a/pelicun/assessment.py +++ b/pelicun/assessment.py @@ -269,7 +269,7 @@ def scale_factor(self, unit): ------- float Scale factor - + Raises ------ ValueError @@ -278,7 +278,6 @@ def scale_factor(self, unit): """ if unit is not None: - if unit in self.unit_conversion_factors: scale_factor = self.unit_conversion_factors[unit] diff --git a/pelicun/auto.py b/pelicun/auto.py index b440f695f..4fe4622f6 100644 --- a/pelicun/auto.py +++ b/pelicun/auto.py @@ -95,7 +95,7 @@ def auto_populate( If the configuration dictionary does not contain necessary asset information under 'GeneralInformation'. """ - + # try to get the AIM attributes AIM = config.get('GeneralInformation', None) if AIM is None: diff --git a/pelicun/base.py b/pelicun/base.py index cd93fc4ac..39555a826 100644 --- a/pelicun/base.py +++ b/pelicun/base.py @@ -460,9 +460,7 @@ def log_file(self, value): self._log_file = None else: - try: - filepath = Path(value).resolve() self._log_file = str(filepath) @@ -530,7 +528,6 @@ def msg(self, msg='', prepend_timestamp=True, prepend_blank_space=True): msg_lines = msg.split('\n') for msg_i, msg_line in enumerate(msg_lines): - if prepend_timestamp and (msg_i == 0): formatted_msg = '{} {}'.format( datetime.now().strftime(self.log_time_format), msg_line @@ -772,17 +769,14 @@ def convert_to_SimpleIndex(data, axis=0, inplace=False): """ if axis in {0, 1}: - if inplace: data_mod = data else: data_mod = data.copy() if axis == 0: - # only perform this if there are multiple levels if data.index.nlevels > 1: - simple_name = '-'.join( [n if n is not None else "" for n in data.index.names] ) @@ -794,10 +788,8 @@ def convert_to_SimpleIndex(data, axis=0, inplace=False): data_mod.index.name = simple_name elif axis == 1: - # only perform this if there are multiple levels if data.columns.nlevels > 1: - simple_name = '-'.join( [n if n is not None else "" for n in data.columns.names] ) @@ -848,7 +840,6 @@ def convert_to_MultiIndex(data, axis=0, inplace=False): if ((axis == 0) and (isinstance(data.index, pd.MultiIndex))) or ( (axis == 1) and (isinstance(data.columns, pd.MultiIndex)) ): - # if yes, return the data unchanged return data @@ -864,7 +855,6 @@ def convert_to_MultiIndex(data, axis=0, inplace=False): max_lbl_len = np.max([len(labels) for labels in index_labels]) for l_i, labels in enumerate(index_labels): - if len(labels) != max_lbl_len: labels += [ '', @@ -874,7 +864,6 @@ def convert_to_MultiIndex(data, axis=0, inplace=False): index_labels = np.array(index_labels) if index_labels.shape[1] > 1: - if inplace: data_mod = data else: @@ -932,9 +921,7 @@ def show_matrix(data, use_describe=False): If False, simply prints the matrix as is. 
""" if use_describe: - pp.pprint( - pd.DataFrame(data).describe(percentiles=[0.01, 0.1, 0.5, 0.9, 0.99]) - ) + pp.pprint(pd.DataFrame(data).describe(percentiles=[0.01, 0.1, 0.5, 0.9, 0.99])) else: pp.pprint(pd.DataFrame(data)) diff --git a/pelicun/db.py b/pelicun/db.py index 3ca2548f2..30ec97fcb 100644 --- a/pelicun/db.py +++ b/pelicun/db.py @@ -413,9 +413,7 @@ def create_FEMA_P58_fragility_db( ls_meta.update( { f"DS{ds_id}": { - "Description": cmp_meta[ - f"DS_{ds_id}_Description" - ], + "Description": cmp_meta[f"DS_{ds_id}_Description"], "RepairAction": repair_action, } } @@ -1005,9 +1003,9 @@ def create_FEMA_P58_repair_db( f"{cost_qnt_low:g},{cost_qnt_up:g}" ) - df_db.loc[(cmp.Index, 'Cost'), f'DS{DS_i}-Theta_1'] = ( - f"{cost_theta[1]:g}" - ) + df_db.loc[ + (cmp.Index, 'Cost'), f'DS{DS_i}-Theta_1' + ] = f"{cost_theta[1]:g}" df_db.loc[(cmp.Index, 'Time'), f'DS{DS_i}-Family'] = family_hat @@ -1016,37 +1014,33 @@ def create_FEMA_P58_repair_db( f"{time_qnt_low:g},{time_qnt_up:g}" ) - df_db.loc[(cmp.Index, 'Time'), f'DS{DS_i}-Theta_1'] = ( - f"{time_theta[1]:g}" - ) + df_db.loc[ + (cmp.Index, 'Time'), f'DS{DS_i}-Theta_1' + ] = f"{time_theta[1]:g}" df_db.loc[(cmp.Index, 'Time'), f'DS{DS_i}-LongLeadTime'] = int( time_vals[5] > 0 ) - df_db.loc[(cmp.Index, 'Carbon'), f'DS{DS_i}-Family'] = ( - family_hat_carbon - ) + df_db.loc[(cmp.Index, 'Carbon'), f'DS{DS_i}-Family'] = family_hat_carbon - df_db.loc[(cmp.Index, 'Carbon'), f'DS{DS_i}-Theta_0'] = ( - f"{carbon_theta[0]:g}" - ) + df_db.loc[ + (cmp.Index, 'Carbon'), f'DS{DS_i}-Theta_0' + ] = f"{carbon_theta[0]:g}" - df_db.loc[(cmp.Index, 'Carbon'), f'DS{DS_i}-Theta_1'] = ( - f"{carbon_theta[1]:g}" - ) + df_db.loc[ + (cmp.Index, 'Carbon'), f'DS{DS_i}-Theta_1' + ] = f"{carbon_theta[1]:g}" - df_db.loc[(cmp.Index, 'Energy'), f'DS{DS_i}-Family'] = ( - family_hat_energy - ) + df_db.loc[(cmp.Index, 'Energy'), f'DS{DS_i}-Family'] = family_hat_energy - df_db.loc[(cmp.Index, 'Energy'), f'DS{DS_i}-Theta_0'] = ( - f"{energy_theta[0]:g}" - ) + df_db.loc[ + (cmp.Index, 'Energy'), f'DS{DS_i}-Theta_0' + ] = f"{energy_theta[0]:g}" - df_db.loc[(cmp.Index, 'Energy'), f'DS{DS_i}-Theta_1'] = ( - f"{energy_theta[1]:g}" - ) + df_db.loc[ + (cmp.Index, 'Energy'), f'DS{DS_i}-Theta_1' + ] = f"{energy_theta[1]:g}" if ds_map.count('1') == 1: ds_pure_id = ds_map[::-1].find('1') + 1 @@ -1073,8 +1067,7 @@ def create_FEMA_P58_repair_db( meta_data['DamageStates'].update( { f"DS{DS_i}": { - "Description": 'Combination of ' - + ' & '.join(ds_combo), + "Description": 'Combination of ' + ' & '.join(ds_combo), "RepairAction": 'Combination of pure DS repair ' 'actions.', } @@ -1087,9 +1080,9 @@ def create_FEMA_P58_repair_db( for DS_i in range(1, 6): # cost if not pd.isna(getattr(cmp, f'Best_Fit_DS{DS_i}')): - df_db.loc[(cmp.Index, 'Cost'), f'DS{DS_i}-Family'] = ( - convert_family[getattr(cmp, f'Best_Fit_DS{DS_i}')] - ) + df_db.loc[(cmp.Index, 'Cost'), f'DS{DS_i}-Family'] = convert_family[ + getattr(cmp, f'Best_Fit_DS{DS_i}') + ] if not pd.isna(getattr(cmp, f'Lower_Qty_Mean_DS{DS_i}')): theta_0_low = getattr(cmp, f'Lower_Qty_Mean_DS{DS_i}') @@ -1098,9 +1091,7 @@ def create_FEMA_P58_repair_db( qnt_up = getattr(cmp, f'Upper_Qty_Cutoff_DS{DS_i}') if theta_0_low == 0.0 and theta_0_up == 0.0: - df_db.loc[(cmp.Index, 'Cost'), f'DS{DS_i}-Family'] = ( - np.nan - ) + df_db.loc[(cmp.Index, 'Cost'), f'DS{DS_i}-Family'] = np.nan else: df_db.loc[(cmp.Index, 'Cost'), f'DS{DS_i}-Theta_0'] = ( @@ -1108,9 +1099,9 @@ def create_FEMA_P58_repair_db( f"{qnt_low:g},{qnt_up:g}" ) - df_db.loc[(cmp.Index, 
'Cost'), f'DS{DS_i}-Theta_1'] = ( - f"{getattr(cmp, f'CV__Dispersion_DS{DS_i}'):g}" - ) + df_db.loc[ + (cmp.Index, 'Cost'), f'DS{DS_i}-Theta_1' + ] = f"{getattr(cmp, f'CV__Dispersion_DS{DS_i}'):g}" else: incomplete_cost = True @@ -1130,9 +1121,9 @@ def create_FEMA_P58_repair_db( # time if not pd.isna(getattr(cmp, f'Best_Fit_DS{DS_i}_1')): - df_db.loc[(cmp.Index, 'Time'), f'DS{DS_i}-Family'] = ( - convert_family[getattr(cmp, f'Best_Fit_DS{DS_i}_1')] - ) + df_db.loc[(cmp.Index, 'Time'), f'DS{DS_i}-Family'] = convert_family[ + getattr(cmp, f'Best_Fit_DS{DS_i}_1') + ] if not pd.isna(getattr(cmp, f'Lower_Qty_Mean_DS{DS_i}_1')): theta_0_low = getattr(cmp, f'Lower_Qty_Mean_DS{DS_i}_1') @@ -1141,9 +1132,7 @@ def create_FEMA_P58_repair_db( qnt_up = getattr(cmp, f'Upper_Qty_Cutoff_DS{DS_i}_1') if theta_0_low == 0.0 and theta_0_up == 0.0: - df_db.loc[(cmp.Index, 'Time'), f'DS{DS_i}-Family'] = ( - np.nan - ) + df_db.loc[(cmp.Index, 'Time'), f'DS{DS_i}-Family'] = np.nan else: df_db.loc[(cmp.Index, 'Time'), f'DS{DS_i}-Theta_0'] = ( @@ -1151,12 +1140,12 @@ def create_FEMA_P58_repair_db( f"{qnt_low:g},{qnt_up:g}" ) - df_db.loc[(cmp.Index, 'Time'), f'DS{DS_i}-Theta_1'] = ( - f"{getattr(cmp, f'CV__Dispersion_DS{DS_i}_2'):g}" - ) + df_db.loc[ + (cmp.Index, 'Time'), f'DS{DS_i}-Theta_1' + ] = f"{getattr(cmp, f'CV__Dispersion_DS{DS_i}_2'):g}" - df_db.loc[(cmp.Index, 'Time'), f'DS{DS_i}-LongLeadTime'] = ( - int(getattr(cmp, f'DS_{DS_i}_Long_Lead_Time') == 'YES') + df_db.loc[(cmp.Index, 'Time'), f'DS{DS_i}-LongLeadTime'] = int( + getattr(cmp, f'DS_{DS_i}_Long_Lead_Time') == 'YES' ) else: @@ -1164,9 +1153,9 @@ def create_FEMA_P58_repair_db( # Carbon if not pd.isna(getattr(cmp, f'DS{DS_i}_Best_Fit')): - df_db.loc[(cmp.Index, 'Carbon'), f'DS{DS_i}-Family'] = ( - convert_family[getattr(cmp, f'DS{DS_i}_Best_Fit')] - ) + df_db.loc[ + (cmp.Index, 'Carbon'), f'DS{DS_i}-Family' + ] = convert_family[getattr(cmp, f'DS{DS_i}_Best_Fit')] df_db.loc[(cmp.Index, 'Carbon'), f'DS{DS_i}-Theta_0'] = getattr( cmp, f'DS{DS_i}_Embodied_Carbon_kg_CO2eq' @@ -1178,9 +1167,9 @@ def create_FEMA_P58_repair_db( # Energy if not pd.isna(getattr(cmp, f'DS{DS_i}_Best_Fit_1')): - df_db.loc[(cmp.Index, 'Energy'), f'DS{DS_i}-Family'] = ( - convert_family[getattr(cmp, f'DS{DS_i}_Best_Fit_1')] - ) + df_db.loc[ + (cmp.Index, 'Energy'), f'DS{DS_i}-Family' + ] = convert_family[getattr(cmp, f'DS{DS_i}_Best_Fit_1')] df_db.loc[(cmp.Index, 'Energy'), f'DS{DS_i}-Theta_0'] = getattr( cmp, f'DS{DS_i}_Embodied_Energy_MJ' @@ -1290,13 +1279,9 @@ def create_Hazus_EQ_fragility_db( frag_meta = {} # prepare lists of labels for various building features - design_levels = list( - raw_data['Structural_Fragility_Groups']['EDP_limits'].keys() - ) + design_levels = list(raw_data['Structural_Fragility_Groups']['EDP_limits'].keys()) - building_types = list( - raw_data['Structural_Fragility_Groups']['P_collapse'].keys() - ) + building_types = list(raw_data['Structural_Fragility_Groups']['P_collapse'].keys()) convert_design_level = { 'High_code': 'HC', @@ -1398,9 +1383,7 @@ def create_Hazus_EQ_fragility_db( "Description": ( frag_meta['Meta']['Collections']['STR']['Description'] + ", " - + frag_meta['Meta']['StructuralSystems'][st][ - 'Description' - ] + + frag_meta['Meta']['StructuralSystems'][st]['Description'] + ", " + frag_meta['Meta']['HeightClasses'][hc]['Description'] + ", " @@ -1428,9 +1411,7 @@ def create_Hazus_EQ_fragility_db( "Description": ( frag_meta['Meta']['Collections']['STR']['Description'] + ", " - + frag_meta['Meta']['StructuralSystems'][st][ - 'Description' - 
] + + frag_meta['Meta']['StructuralSystems'][st]['Description'] + ", " + frag_meta['Meta']['DesignLevels'][ convert_design_level[dl] @@ -1454,18 +1435,18 @@ def create_Hazus_EQ_fragility_db( ds_meta = frag_meta['Meta']['StructuralSystems'][st]['DamageStates'] for LS_i in range(1, 5): df_db.loc[counter, f'LS{LS_i}-Family'] = 'lognormal' - df_db.loc[counter, f'LS{LS_i}-Theta_0'] = S_data['EDP_limits'][ + df_db.loc[counter, f'LS{LS_i}-Theta_0'] = S_data['EDP_limits'][dl][ + bt + ][LS_i - 1] + df_db.loc[counter, f'LS{LS_i}-Theta_1'] = S_data['Fragility_beta'][ dl - ][bt][LS_i - 1] - df_db.loc[counter, f'LS{LS_i}-Theta_1'] = S_data[ - 'Fragility_beta' - ][dl] + ] if LS_i == 4: p_coll = S_data['P_collapse'][bt] - df_db.loc[counter, f'LS{LS_i}-DamageStateWeights'] = ( - f'{1.0 - p_coll} | {p_coll}' - ) + df_db.loc[ + counter, f'LS{LS_i}-DamageStateWeights' + ] = f'{1.0 - p_coll} | {p_coll}' cmp_meta["LimitStates"].update( { @@ -1480,9 +1461,7 @@ def create_Hazus_EQ_fragility_db( cmp_meta["LimitStates"].update( { f"LS{LS_i}": { - f"DS{LS_i}": { - "Description": ds_meta[f"DS{LS_i}"] - } + f"DS{LS_i}": {"Description": ds_meta[f"DS{LS_i}"]} } } ) @@ -1558,9 +1537,7 @@ def create_Hazus_EQ_fragility_db( "Comments": ( frag_meta['Meta']['Collections']['NSA']['Comment'] + "\n" - + frag_meta['Meta']['DesignLevels'][convert_design_level[dl]][ - 'Comment' - ] + + frag_meta['Meta']['DesignLevels'][convert_design_level[dl]]['Comment'] ), "SuggestedComponentBlockSize": "1 EA", "RoundUpToIntegerQuantity": "True", @@ -1622,9 +1599,7 @@ def create_Hazus_EQ_fragility_db( 'Description' ] + ", " - + frag_meta['Meta']['HeightClasses'][hc][ - 'Description' - ] + + frag_meta['Meta']['HeightClasses'][hc]['Description'] + ", " + frag_meta['Meta']['DesignLevels'][ convert_design_level[dl] @@ -1633,9 +1608,7 @@ def create_Hazus_EQ_fragility_db( "Comments": ( frag_meta['Meta']['Collections']['LF']['Comment'] + "\n" - + frag_meta['Meta']['StructuralSystems'][st][ - 'Comment' - ] + + frag_meta['Meta']['StructuralSystems'][st]['Comment'] + "\n" + frag_meta['Meta']['HeightClasses'][hc]['Comment'] + "\n" @@ -1663,9 +1636,7 @@ def create_Hazus_EQ_fragility_db( "Comments": ( frag_meta['Meta']['Collections']['LF']['Comment'] + "\n" - + frag_meta['Meta']['StructuralSystems'][st][ - 'Comment' - ] + + frag_meta['Meta']['StructuralSystems'][st]['Comment'] + "\n" + frag_meta['Meta']['DesignLevels'][ convert_design_level[dl] @@ -1677,23 +1648,21 @@ def create_Hazus_EQ_fragility_db( } # store the Limit State parameters - ds_meta = frag_meta['Meta']['StructuralSystems'][st][ - 'DamageStates' - ] + ds_meta = frag_meta['Meta']['StructuralSystems'][st]['DamageStates'] for LS_i in range(1, 5): df_db.loc[counter, f'LS{LS_i}-Family'] = 'lognormal' - df_db.loc[counter, f'LS{LS_i}-Theta_0'] = LF_data[ - 'EDP_limits' - ][dl][bt][LS_i - 1] + df_db.loc[counter, f'LS{LS_i}-Theta_0'] = LF_data['EDP_limits'][ + dl + ][bt][LS_i - 1] df_db.loc[counter, f'LS{LS_i}-Theta_1'] = LF_data[ 'Fragility_beta' ][dl] if LS_i == 4: p_coll = LF_data['P_collapse'][bt] - df_db.loc[counter, f'LS{LS_i}-DamageStateWeights'] = ( - f'{1.0 - p_coll} | {p_coll}' - ) + df_db.loc[ + counter, f'LS{LS_i}-DamageStateWeights' + ] = f'{1.0 - p_coll} | {p_coll}' cmp_meta["LimitStates"].update( { @@ -1757,9 +1726,9 @@ def create_Hazus_EQ_fragility_db( f_depth ] p_complete = GF_data['P_Complete'] - df_db.loc[counter, 'LS1-DamageStateWeights'] = ( - f'{1.0 - p_complete} | {p_complete}' - ) + df_db.loc[ + counter, 'LS1-DamageStateWeights' + ] = f'{1.0 - p_complete} | {p_complete}' 
cmp_meta["LimitStates"].update( { @@ -1867,9 +1836,7 @@ def create_Hazus_EQ_repair_db( # create the MultiIndex cmp_types = ['STR', 'NSD', 'NSA', 'LF'] comps = [ - f'{cmp_type}.{occ_type}' - for cmp_type in cmp_types - for occ_type in occupancies + f'{cmp_type}.{occ_type}' for cmp_type in cmp_types for occ_type in occupancies ] DVs = ['Cost', 'Time'] df_MI = pd.MultiIndex.from_product([comps, DVs], names=['ID', 'DV']) @@ -1969,9 +1936,9 @@ def create_Hazus_EQ_repair_db( {f"DS{DS_i}": {"Description": ds_meta[f"DS{DS_i}"]}} ) - df_db.loc[(cmp_id, 'Cost'), f'DS{DS_i}-Theta_0'] = NSD_data[ - 'Repair_cost' - ][occ_type][DS_i - 1] + df_db.loc[(cmp_id, 'Cost'), f'DS{DS_i}-Theta_0'] = NSD_data['Repair_cost'][ + occ_type + ][DS_i - 1] # store metadata meta_dict.update({cmp_id: cmp_meta}) @@ -2006,9 +1973,9 @@ def create_Hazus_EQ_repair_db( {f"DS{DS_i}": {"Description": ds_meta[f"DS{DS_i}"]}} ) - df_db.loc[(cmp_id, 'Cost'), f'DS{DS_i}-Theta_0'] = NSA_data[ - 'Repair_cost' - ][occ_type][DS_i - 1] + df_db.loc[(cmp_id, 'Cost'), f'DS{DS_i}-Theta_0'] = NSA_data['Repair_cost'][ + occ_type + ][DS_i - 1] # store metadata meta_dict.update({cmp_id: cmp_meta}) @@ -2300,15 +2267,9 @@ def create_Hazus_HU_fragility_db( 'Masonry, Engineered Residential Building, High-Rise (6+ Stories).' ), # ------------------------ - 'M.ECB.L': ( - 'Masonry, Engineered Commercial Building, Low-Rise (1-2 Stories).' - ), - 'M.ECB.M': ( - 'Masonry, Engineered Commercial Building, Mid-Rise (3-5 Stories).' - ), - 'M.ECB.H': ( - 'Masonry, Engineered Commercial Building, High-Rise (6+ Stories).' - ), + 'M.ECB.L': ('Masonry, Engineered Commercial Building, Low-Rise (1-2 Stories).'), + 'M.ECB.M': ('Masonry, Engineered Commercial Building, Mid-Rise (3-5 Stories).'), + 'M.ECB.H': ('Masonry, Engineered Commercial Building, High-Rise (6+ Stories).'), # ------------------------ 'C.ERB.L': ( 'Concrete, Engineered Residential Building, Low-Rise (1-2 Stories).' @@ -2643,7 +2604,6 @@ def find_class_type(entry: str) -> str | None: # for fragility_id in fragility_data['ID'].to_list(): - class_type = find_class_type(fragility_id) class_type_human_readable = class_types[class_type] diff --git a/pelicun/file_io.py b/pelicun/file_io.py index 0f099259f..3278d3995 100644 --- a/pelicun/file_io.py +++ b/pelicun/file_io.py @@ -157,7 +157,7 @@ def save_to_csv( unit conversions and reformatting applied. Otherwise, returns None after saving the data to a CSV file. 
""" - + if filepath is None: if log: log.msg('Preparing data ...', prepend_timestamp=False) @@ -166,13 +166,11 @@ def save_to_csv( log.msg(f'Saving data to {filepath}...', prepend_timestamp=False) if data is not None: - # make sure we do not modify the original data data = data.copy() # convert units and add unit information, if needed if units is not None: - if unit_conversion_factors is None: raise ValueError( 'When units is not None, ' @@ -190,7 +188,6 @@ def save_to_csv( labels_to_keep = [] for unit_name in units.unique(): - labels = units.loc[units == unit_name].index.values unit_factor = 1.0 / unit_conversion_factors[unit_name] @@ -237,17 +234,13 @@ def save_to_csv( data = base.convert_to_SimpleIndex(data, axis=1) if filepath is not None: - filepath = Path(filepath).resolve() if filepath.suffix == '.csv': - # save the contents of the DataFrame into a csv data.to_csv(filepath) if log: - log.msg( - 'Data successfully saved to file.', prepend_timestamp=False - ) + log.msg('Data successfully saved to file.', prepend_timestamp=False) else: raise ValueError( @@ -398,7 +391,6 @@ def load_data( # if there is information about units, separate that information # and optionally apply conversions to all numeric values if 'Units' in the_index: - units = data['Units'] if orientation == 1 else data.loc['Units'] data.drop('Units', axis=orientation, inplace=True) data = base.convert_dtypes(data) @@ -415,20 +407,18 @@ def load_data( conversion_factors = units.map( lambda unit: ( - 1.00 - if pd.isna(unit) - else unit_conversion_factors.get(unit, 1.00) + 1.00 if pd.isna(unit) else unit_conversion_factors.get(unit, 1.00) ) ) if orientation == 1: - data.loc[:, numeric_elements] = data.loc[ - :, numeric_elements - ].multiply(conversion_factors, axis=axis[orientation]) + data.loc[:, numeric_elements] = data.loc[:, numeric_elements].multiply( + conversion_factors, axis=axis[orientation] + ) else: - data.loc[numeric_elements, :] = data.loc[ - numeric_elements, : - ].multiply(conversion_factors, axis=axis[orientation]) + data.loc[numeric_elements, :] = data.loc[numeric_elements, :].multiply( + conversion_factors, axis=axis[orientation] + ) if log: log.msg('Unit conversion successful.', prepend_timestamp=False) @@ -501,12 +491,10 @@ def load_from_file(filepath, log=None): if not filepath.is_file(): raise FileNotFoundError( - f"The filepath provided does not point to an existing " - f"file: {filepath}" + f"The filepath provided does not point to an existing " f"file: {filepath}" ) if filepath.suffix == '.csv': - # load the contents of the csv into a DataFrame data = pd.read_csv( diff --git a/pelicun/model/asset_model.py b/pelicun/model/asset_model.py index 9cf1f3747..5f3c18006 100644 --- a/pelicun/model/asset_model.py +++ b/pelicun/model/asset_model.py @@ -303,6 +303,7 @@ def load_cmp_model(self, data_source): >>> model.load_cmp_model(data_dict) """ + def get_locations(loc_str): """ Parses a location string to determine specific sections of @@ -559,9 +560,7 @@ def get_attribute(attribute_str, dtype=float, default=np.nan): cmp_marginal_param_series = [] for col, cmp_marginal_param in cmp_marginal_param_dct.items(): cmp_marginal_param_series.append( - pd.Series( - cmp_marginal_param, dtype=dtypes[col], name=col, index=index - ) + pd.Series(cmp_marginal_param, dtype=dtypes[col], name=col, index=index) ) cmp_marginal_params = pd.concat(cmp_marginal_param_series, axis=1) @@ -592,9 +591,7 @@ def get_attribute(attribute_str, dtype=float, default=np.nan): self.cmp_marginal_params = cmp_marginal_params.drop('Units', 
axis=1) - self.log_msg( - "Model parameters successfully loaded.", prepend_timestamp=False - ) + self.log_msg("Model parameters successfully loaded.", prepend_timestamp=False) self.log_msg( "\nComponent model marginal distributions:\n" + str(cmp_marginal_params), @@ -621,8 +618,7 @@ def _create_cmp_RVs(self): uq.rv_class_map(family)( name=f'CMP-{cmp[0]}-{cmp[1]}-{cmp[2]}-{cmp[3]}', theta=[ - getattr(rv_params, f"Theta_{t_i}", np.nan) - for t_i in range(3) + getattr(rv_params, f"Theta_{t_i}", np.nan) for t_i in range(3) ], truncation_limits=[ getattr(rv_params, f"Truncate{side}", np.nan) @@ -659,7 +655,7 @@ def generate_cmp_sample(self, sample_size=None): generation, or if neither sample size is specified nor can be determined from the demand model. """ - + if self.cmp_marginal_params is None: raise ValueError( 'Model parameters have not been specified. Load' diff --git a/pelicun/model/damage_model.py b/pelicun/model/damage_model.py index aed601650..e5927da6a 100644 --- a/pelicun/model/damage_model.py +++ b/pelicun/model/damage_model.py @@ -127,9 +127,7 @@ def save_sample(self, filepath=None, save_units=False): self.log_msg('Saving damage sample...') cmp_units = self._asmnt.asset.cmp_units - qnt_units = pd.Series( - index=self.sample.columns, name='Units', dtype='object' - ) + qnt_units = pd.Series(index=self.sample.columns, name='Units', dtype='object') for cmp in cmp_units.index: qnt_units.loc[cmp] = cmp_units.loc[cmp] @@ -143,9 +141,7 @@ def save_sample(self, filepath=None, save_units=False): ) if filepath is not None: - self.log_msg( - 'Damage sample successfully saved.', prepend_timestamp=False - ) + self.log_msg('Damage sample successfully saved.', prepend_timestamp=False) return None # else: @@ -483,9 +479,7 @@ def map_ds(values, offset=int(ds_id + 1)): for PG in PGB.index: # determine demand capacity adjustment operation, if required cmp_loc_dir = '-'.join(PG[0:3]) - capacity_adjustment_operation = scaling_specification.get( - cmp_loc_dir, None - ) + capacity_adjustment_operation = scaling_specification.get(cmp_loc_dir, None) cmp_id = PG[0] blocks = PGB.loc[PG, 'Blocks'] @@ -697,9 +691,7 @@ def _generate_dmg_sample(self, sample_size, PGB, scaling_specification=None): # get the capacity and lsds samples capacity_sample = ( - pd.DataFrame(capacity_RVs.RV_sample) - .sort_index(axis=0) - .sort_index(axis=1) + pd.DataFrame(capacity_RVs.RV_sample).sort_index(axis=0).sort_index(axis=1) ) capacity_sample = base.convert_to_MultiIndex(capacity_sample, axis=1)['FRG'] capacity_sample.columns.names = ['cmp', 'loc', 'dir', 'uid', 'block', 'ls'] @@ -901,9 +893,7 @@ def _assemble_required_demand_data(self, EDP_req): # take the maximum of all available directions and scale it # using the nondirectional multiplier specified in the # self._asmnt.options (the default value is 1.2) - demand = ( - demand_source.loc[:, (EDP[0], EDP[1])].max(axis=1).values - ) + demand = demand_source.loc[:, (EDP[0], EDP[1])].max(axis=1).values demand = demand * self._asmnt.options.nondir_multi(EDP[0]) except KeyError: @@ -978,9 +968,7 @@ def _evaluate_damage_state( # Create a dataframe with demand values repeated for the # number of PGs and assign the columns as PG_cols demand_df.append( - pd.concat( - [pd.Series(demand_vals)] * len(PG_cols), axis=1, keys=PG_cols - ) + pd.concat([pd.Series(demand_vals)] * len(PG_cols), axis=1, keys=PG_cols) ) # Concatenate all demand dataframes into a single dataframe @@ -1211,7 +1199,6 @@ def _perform_dmg_task(self, task, ds_sample): # execute the events pres prescribed in the damage 
task for source_event, target_infos in events.items(): - # events can only be triggered by damage state occurrence if not source_event.startswith('DS'): raise ValueError( @@ -1227,7 +1214,6 @@ def _perform_dmg_task(self, task, ds_sample): target_infos = [target_infos] for target_info in target_infos: - # get the target component and event type target_cmp, target_event = target_info.split('_') @@ -1245,7 +1231,6 @@ def _perform_dmg_task(self, task, ds_sample): # trigger a damage state if target_event.startswith('DS'): - # get the ID of the damage state to switch the target # components to ds_target = int(target_event[2:]) @@ -1527,9 +1512,7 @@ def _complete_ds_cols(self, dmg_sample): # Get the header for the results that we can use to identify # cmp-loc-dir-uid sets - dmg_header = ( - dmg_sample.groupby(level=[0, 1, 2, 3], axis=1).first().iloc[:2, :] - ) + dmg_header = dmg_sample.groupby(level=[0, 1, 2, 3], axis=1).first().iloc[:2, :] # get the number of possible limit states ls_list = [col for col in DP.columns.unique(level=0) if 'LS' in col] @@ -1555,9 +1538,7 @@ def _complete_ds_cols(self, dmg_sample): else: # or if there are more than one, how many - ds_count += len( - cmp_data[(ls, 'DamageStateWeights')].split('|') - ) + ds_count += len(cmp_data[(ls, 'DamageStateWeights')].split('|')) # get the list of valid cmp-loc-dir-uid sets cmp_header = dmg_header.loc[ @@ -1657,7 +1638,6 @@ def calculate_internal( # for PG_i in self._asmnt.asset.cmp_sample.columns: ds_samples = [] for PGB_i in batches: - performance_group = pg_batch.loc[PGB_i] self.log_msg( diff --git a/pelicun/model/demand_model.py b/pelicun/model/demand_model.py index df3276a8a..8707cddc8 100644 --- a/pelicun/model/demand_model.py +++ b/pelicun/model/demand_model.py @@ -140,9 +140,7 @@ def save_sample(self, filepath=None, save_units=False): ) if filepath is not None: - self.log_msg( - 'Demand sample successfully saved.', prepend_timestamp=False - ) + self.log_msg('Demand sample successfully saved.', prepend_timestamp=False) return None # else: @@ -607,9 +605,7 @@ def get_filter_mask(lower_lims, upper_lims): distribution=cal_df.loc[:, 'Family'].values, censored_count=censored_count, detection_limits=cal_df.loc[:, ['CensorLower', 'CensorUpper']].values, - truncation_limits=cal_df.loc[ - :, ['TruncateLower', 'TruncateUpper'] - ].values, + truncation_limits=cal_df.loc[:, ['TruncateLower', 'TruncateUpper']].values, multi_fit=False, logger_object=self._asmnt.log, ) @@ -643,8 +639,7 @@ def get_filter_mask(lower_lims, upper_lims): self.marginal_params = model_params self.log_msg( - "\nCalibrated demand model marginal distributions:\n" - + str(model_params), + "\nCalibrated demand model marginal distributions:\n" + str(model_params), prepend_timestamp=False, ) @@ -654,8 +649,7 @@ def get_filter_mask(lower_lims, upper_lims): ) self.log_msg( - "\nCalibrated demand model correlation matrix:\n" - + str(self.correlation), + "\nCalibrated demand model correlation matrix:\n" + str(self.correlation), prepend_timestamp=False, ) @@ -1000,7 +994,7 @@ def generate_sample(self, config): # This will generate 1000 realizations of demand variables # with the specified configuration. """ - + if self.marginal_params is None: raise ValueError( 'Model parameters have not been specified. 
Either' diff --git a/pelicun/model/loss_model.py b/pelicun/model/loss_model.py index 471f5e210..6940ca2af 100644 --- a/pelicun/model/loss_model.py +++ b/pelicun/model/loss_model.py @@ -427,9 +427,7 @@ def _create_DV_RVs(self, case_list): # currently, we only support DMG-based loss calculations # but this will be extended in the very near future if driver_type != 'DMG': - raise ValueError( - f"Loss Driver type not recognized: " f"{driver_type}" - ) + raise ValueError(f"Loss Driver type not recognized: " f"{driver_type}") # load the parameters # TODO: remove specific DV_type references and make the code below @@ -466,8 +464,7 @@ def _create_DV_RVs(self, case_list): cost_family = cost_params_DS.get('Family', np.nan) cost_theta = [ - cost_params_DS.get(f"Theta_{t_i}", np.nan) - for t_i in range(3) + cost_params_DS.get(f"Theta_{t_i}", np.nan) for t_i in range(3) ] # If the first parameter is controlled by a function, we use @@ -485,8 +482,7 @@ def _create_DV_RVs(self, case_list): time_family = time_params_DS.get('Family', np.nan) time_theta = [ - time_params_DS.get(f"Theta_{t_i}", np.nan) - for t_i in range(3) + time_params_DS.get(f"Theta_{t_i}", np.nan) for t_i in range(3) ] # If the first parameter is controlled by a function, we use @@ -504,8 +500,7 @@ def _create_DV_RVs(self, case_list): carbon_family = carbon_params_DS.get('Family', np.nan) carbon_theta = [ - carbon_params_DS.get(f"Theta_{t_i}", np.nan) - for t_i in range(3) + carbon_params_DS.get(f"Theta_{t_i}", np.nan) for t_i in range(3) ] # If the first parameter is controlled by a function, we use @@ -523,8 +518,7 @@ def _create_DV_RVs(self, case_list): energy_family = energy_params_DS.get('Family', np.nan) energy_theta = [ - energy_params_DS.get(f"Theta_{t_i}", np.nan) - for t_i in range(3) + energy_params_DS.get(f"Theta_{t_i}", np.nan) for t_i in range(3) ] # If the first parameter is controlled by a function, we use @@ -553,9 +547,7 @@ def _create_DV_RVs(self, case_list): for loc, direction, uid in loc_dir_uid: # assign cost RV if pd.isna(cost_family) is False: - cost_rv_tag = ( - f'Cost-{loss_cmp_id}-{ds}-{loc}-{direction}-{uid}' - ) + cost_rv_tag = f'Cost-{loss_cmp_id}-{ds}-{loc}-{direction}-{uid}' RV_reg.add_RV( uq.rv_class_map(cost_family)( @@ -568,9 +560,7 @@ def _create_DV_RVs(self, case_list): # assign time RV if pd.isna(time_family) is False: - time_rv_tag = ( - f'Time-{loss_cmp_id}-{ds}-{loc}-{direction}-{uid}' - ) + time_rv_tag = f'Time-{loss_cmp_id}-{ds}-{loc}-{direction}-{uid}' RV_reg.add_RV( uq.rv_class_map(time_family)( @@ -624,16 +614,12 @@ def _create_DV_RVs(self, case_list): RV_reg.add_RV_set( uq.RandomVariableSet( f'DV-{loss_cmp_id}-{ds}-{loc}-{direction}-{uid}_set', - list( - RV_reg.RVs([cost_rv_tag, time_rv_tag]).values() - ), + list(RV_reg.RVs([cost_rv_tag, time_rv_tag]).values()), np.array([[1.0, rho], [rho, 1.0]]), ) ) - self.log_msg( - f"\n{rv_count} random variables created.", prepend_timestamp=False - ) + self.log_msg(f"\n{rv_count} random variables created.", prepend_timestamp=False) if rv_count > 0: return RV_reg @@ -678,7 +664,7 @@ def _calc_median_consequence(self, eco_qnt): recognized, or if the parameters are incomplete or unsupported. 
""" - + medians = {} DV_types = self.loss_params.index.unique(level=1) @@ -717,9 +703,7 @@ def _calc_median_consequence(self, eco_qnt): if ds_id == '0': continue - loss_params_DS = self.loss_params.loc[ - (loss_cmp_name, DV_type), ds - ] + loss_params_DS = self.loss_params.loc[(loss_cmp_name, DV_type), ds] # check if theta_0 is defined theta_0 = loss_params_DS.get('Theta_0', np.nan) @@ -936,9 +920,7 @@ def _generate_DV_sample(self, dmg_quantities, sample_size): res_list = [] key_list = [] - dmg_quantities.columns = dmg_quantities.columns.reorder_levels( - [0, 4, 1, 2, 3] - ) + dmg_quantities.columns = dmg_quantities.columns.reorder_levels([0, 4, 1, 2, 3]) dmg_quantities.sort_index(axis=1, inplace=True) DV_types = self.loss_params.index.unique(level=1) @@ -979,13 +961,11 @@ def _generate_DV_sample(self, dmg_quantities, sample_size): loc_list = [] for loc_id, loc in enumerate( - dmg_quantities.loc[:, (dmg_cmp_i, ds)].columns.unique( - level=0 - ) + dmg_quantities.loc[:, (dmg_cmp_i, ds)].columns.unique(level=0) ): - if ( - self._asmnt.options.eco_scale["AcrossFloors"] is True - ) and (loc_id > 0): + if (self._asmnt.options.eco_scale["AcrossFloors"] is True) and ( + loc_id > 0 + ): break if self._asmnt.options.eco_scale["AcrossFloors"] is True: @@ -1090,7 +1070,7 @@ def aggregate_losses(self): Each of these columns is summed or calculated based on the repair data available. """ - + self.log_div() self.log_msg("Aggregating repair consequences...") diff --git a/pelicun/model/pelicun_model.py b/pelicun/model/pelicun_model.py index 8a6a5d5ed..8a11acb78 100644 --- a/pelicun/model/pelicun_model.py +++ b/pelicun/model/pelicun_model.py @@ -189,9 +189,7 @@ def convert_marginal_params(self, marginal_params, units, arg_units=None): if arg_unit != '1 EA': # get the scale factor - arg_unit_factor = self._asmnt.calc_unit_scale_factor( - arg_unit - ) + arg_unit_factor = self._asmnt.calc_unit_scale_factor(arg_unit) # scale arguments, if needed for a_i, arg in enumerate(args): @@ -214,13 +212,11 @@ def convert_marginal_params(self, marginal_params, units, arg_units=None): ) # and update the values in the DF - marginal_params.loc[row_id, ['Theta_0', 'Theta_1', 'Theta_2']] = ( - theta - ) + marginal_params.loc[row_id, ['Theta_0', 'Theta_1', 'Theta_2']] = theta - marginal_params.loc[row_id, ['TruncateLower', 'TruncateUpper']] = ( - tr_limits - ) + marginal_params.loc[ + row_id, ['TruncateLower', 'TruncateUpper'] + ] = tr_limits # remove the added columns marginal_params = marginal_params[original_cols] diff --git a/pelicun/resources/auto/Hazus_Earthquake_IM.py b/pelicun/resources/auto/Hazus_Earthquake_IM.py index 90f720933..91f1e8535 100644 --- a/pelicun/resources/auto/Hazus_Earthquake_IM.py +++ b/pelicun/resources/auto/Hazus_Earthquake_IM.py @@ -409,10 +409,8 @@ def auto_populate(AIM): } }, "Options": { - "NonDirectionalMultipliers": { - "ALL": 1.0 - }, - } + "NonDirectionalMultipliers": {"ALL": 1.0}, + }, } elif assetType == "TransportationNetwork": @@ -447,10 +445,8 @@ def auto_populate(AIM): } }, "Options": { - "NonDirectionalMultipliers": { - "ALL": 1.0 - }, - } + "NonDirectionalMultipliers": {"ALL": 1.0}, + }, } elif inf_type == "HwyTunnel": @@ -482,10 +478,8 @@ def auto_populate(AIM): } }, "Options": { - "NonDirectionalMultipliers": { - "ALL": 1.0 - }, - } + "NonDirectionalMultipliers": {"ALL": 1.0}, + }, } elif inf_type == "Roadway": # get the road class @@ -515,16 +509,13 @@ def auto_populate(AIM): } }, "Options": { - "NonDirectionalMultipliers": { - "ALL": 1.0 - }, - } + "NonDirectionalMultipliers": 
{"ALL": 1.0}, + }, } else: print("subtype not supported in HWY") elif assetType == "WaterDistributionNetwork": - pipe_material_map = { "CI": "B", "AC": "B", @@ -703,7 +694,6 @@ def auto_populate(AIM): } elif wdn_element_type == "Tank": - tank_cmp_lines = { ("OG", "C", 1): {'PST.G.C.A.GS': ['ea', 1, 1, 1, 'N/A']}, ("OG", "C", 0): {'PST.G.C.U.GS': ['ea', 1, 1, 1, 'N/A']}, diff --git a/pelicun/resources/auto/Hazus_Earthquake_Story.py b/pelicun/resources/auto/Hazus_Earthquake_Story.py index 24fc1da64..74ff13465 100644 --- a/pelicun/resources/auto/Hazus_Earthquake_Story.py +++ b/pelicun/resources/auto/Hazus_Earthquake_Story.py @@ -267,10 +267,8 @@ def auto_populate(AIM): "Demands": {}, "Losses": {"Repair": repair_config}, "Options": { - "NonDirectionalMultipliers": { - "ALL": 1.0 - }, - } + "NonDirectionalMultipliers": {"ALL": 1.0}, + }, } else: diff --git a/pelicun/tests/reset_tests.py b/pelicun/tests/reset_tests.py index 1f9068877..9c7fc8512 100644 --- a/pelicun/tests/reset_tests.py +++ b/pelicun/tests/reset_tests.py @@ -91,8 +91,7 @@ def reset_all_test_data(restore=True, purge=False): cwd = os.path.basename(os.getcwd()) if cwd != 'pelicun': raise OSError( - 'Wrong directory. ' - 'See the docstring of `reset_all_test_data`. Aborting' + 'Wrong directory. ' 'See the docstring of `reset_all_test_data`. Aborting' ) # where the test result data are stored diff --git a/pelicun/tests/test_auto.py b/pelicun/tests/test_auto.py index 36458cb91..5bf2f34f0 100644 --- a/pelicun/tests/test_auto.py +++ b/pelicun/tests/test_auto.py @@ -109,9 +109,7 @@ def test_pelicun_default_path_replacement( assert modified_path.startswith(setup_expected_base_path) -def test_auto_population_script_execution( - setup_valid_config, setup_auto_script_path -): +def test_auto_population_script_execution(setup_valid_config, setup_auto_script_path): with patch('pelicun.base.pelicun_path', '/expected/path'), patch( 'os.path.exists', return_value=True ), patch('importlib.__import__') as mock_import: diff --git a/pelicun/tests/test_base.py b/pelicun/tests/test_base.py index f234188f7..57f3f6d5b 100644 --- a/pelicun/tests/test_base.py +++ b/pelicun/tests/test_base.py @@ -414,9 +414,7 @@ def test_convert_to_MultiIndex(): assert data.index.equals(pd.Index(('A-1', 'B-1', 'C-1'))) # Test a case where the index is already a MultiIndex - data_converted = base.convert_to_MultiIndex( - data_converted, axis=0, inplace=False - ) + data_converted = base.convert_to_MultiIndex(data_converted, axis=0, inplace=False) assert data_converted.index.equals(expected_index) # Test a case where the columns need to be converted to a MultiIndex @@ -428,9 +426,7 @@ def test_convert_to_MultiIndex(): assert data.columns.equals(pd.Index(('A-1', 'B-1'))) # Test a case where the columns are already a MultiIndex - data_converted = base.convert_to_MultiIndex( - data_converted, axis=1, inplace=False - ) + data_converted = base.convert_to_MultiIndex(data_converted, axis=1, inplace=False) assert data_converted.columns.equals(expected_columns) # Test an invalid axis parameter @@ -509,9 +505,7 @@ def test_describe(): # case 1: # passing a dataframe - df = pd.DataFrame( - ((1.00, 2.00, 3.00), (4.00, 5.00, 6.00)), columns=['A', 'B', 'C'] - ) + df = pd.DataFrame(((1.00, 2.00, 3.00), (4.00, 5.00, 6.00)), columns=['A', 'B', 'C']) desc = base.describe(df) assert np.all(desc.index == expected_idx) assert np.all(desc.columns == pd.Index(('A', 'B', 'C'), dtype='object')) diff --git a/pelicun/tests/test_file_io.py b/pelicun/tests/test_file_io.py index 5a141cef9..c98bd3afe 100644 
--- a/pelicun/tests/test_file_io.py +++ b/pelicun/tests/test_file_io.py @@ -170,9 +170,7 @@ def test_load_data(): assert isinstance(data.columns, pd.core.indexes.multi.MultiIndex) assert data.columns.nlevels == 4 - _, units = file_io.load_data( - filepath, unit_conversion_factors, return_units=True - ) + _, units = file_io.load_data(filepath, unit_conversion_factors, return_units=True) for item in unit_conversion_factors: assert item in units.unique() diff --git a/pelicun/tests/test_model.py b/pelicun/tests/test_model.py index 23554ebc3..a0a6fea12 100644 --- a/pelicun/tests/test_model.py +++ b/pelicun/tests/test_model.py @@ -243,13 +243,10 @@ def test_estimate_RID(self, demand_model_with_sample): res = demand_model_with_sample.estimate_RID(demands, params) assert list(res.columns) == [('RID', '1', '1')] assert ( - demand_model_with_sample.estimate_RID(demands, params, method='xyz') - is None + demand_model_with_sample.estimate_RID(demands, params, method='xyz') is None ) - def test_calibrate_model( - self, calibrated_demand_model, demand_model_with_sample_C - ): + def test_calibrate_model(self, calibrated_demand_model, demand_model_with_sample_C): assert calibrated_demand_model.marginal_params['Family'].to_list() == [ 'normal', 'normal', @@ -372,9 +369,7 @@ def test_save_load_model_with_empirical( def test_generate_sample_exceptions(self, demand_model): # generating a sample from a non calibrated model should fail with pytest.raises(ValueError): - demand_model.generate_sample( - {"SampleSize": 3, 'PreserveRawOrder': False} - ) + demand_model.generate_sample({"SampleSize": 3, 'PreserveRawOrder': False}) def test_generate_sample(self, calibrated_demand_model): calibrated_demand_model.generate_sample( @@ -517,9 +512,7 @@ def test_convert_marginal_params(self, pelicun_model): ) units = pd.Series(['ea'], index=marginal_params.index) arg_units = None - res = pelicun_model.convert_marginal_params( - marginal_params, units, arg_units - ) + res = pelicun_model.convert_marginal_params(marginal_params, units, arg_units) # >>> res # Theta_0 @@ -557,9 +550,7 @@ def test_convert_marginal_params(self, pelicun_model): ) units = pd.Series(['ea', 'ft', 'in', 'in2'], index=marginal_params.index) arg_units = None - res = pelicun_model.convert_marginal_params( - marginal_params, units, arg_units - ) + res = pelicun_model.convert_marginal_params(marginal_params, units, arg_units) expected_df = pd.DataFrame( { @@ -595,9 +586,7 @@ def test_convert_marginal_params(self, pelicun_model): ) units = pd.Series(['test_three'], index=marginal_params.index) arg_units = pd.Series(['test_two'], index=marginal_params.index) - res = pelicun_model.convert_marginal_params( - marginal_params, units, arg_units - ) + res = pelicun_model.convert_marginal_params(marginal_params, units, arg_units) # >>> res # Theta_0 @@ -706,9 +695,7 @@ def test_load_cmp_model_1(self, asset_model): check_dtype=False, ) - expected_cmp_units = pd.Series( - data=['ea'], index=['component_a'], name='Units' - ) + expected_cmp_units = pd.Series(data=['ea'], index=['component_a'], name='Units') pd.testing.assert_series_equal( expected_cmp_units, @@ -1071,9 +1058,7 @@ def test_save_load_sample(self, damage_model_with_sample, assessment_instance): check_index_type=False, check_column_type=False, ) - _, units_from_variable = damage_model_with_sample.save_sample( - save_units=True - ) + _, units_from_variable = damage_model_with_sample.save_sample(save_units=True) assert np.all(units_from_variable.to_numpy() == 'ea') def test_load_damage_model(self, 
damage_model_model_loaded): @@ -1355,9 +1340,7 @@ def test__evaluate_damage_state_and_prepare_dmg_quantities( demand_dict, EDP_req, capacity_sample, lsds_sample ) - qnt_sample = damage_model._prepare_dmg_quantities( - ds_sample, dropzero=False - ) + qnt_sample = damage_model._prepare_dmg_quantities(ds_sample, dropzero=False) # note: the realized number of damage states is random, limiting # our assertions @@ -1370,7 +1353,6 @@ def test__evaluate_damage_state_and_prepare_dmg_quantities( assert list(qnt_sample.columns)[0] == ('B.10.31.001', '2', '2', '0', '0') def test__perform_dmg_task(self, assessment_instance): - damage_model = assessment_instance.damage # @@ -1913,18 +1895,10 @@ def test__create_DV_RVs(self, repair_model, loss_params_A): for rv in rvs: print(rv.theta) assert rv.distribution == 'normal' - np.testing.assert_array_equal( - rvs[0].theta, np.array((1.00, 0.390923, np.nan)) - ) - np.testing.assert_array_equal( - rvs[1].theta, np.array((1.00, 0.464027, np.nan)) - ) - np.testing.assert_array_equal( - rvs[2].theta, np.array((1.00, 0.390923, np.nan)) - ) - np.testing.assert_array_equal( - rvs[3].theta, np.array((1.00, 0.464027, np.nan)) - ) + np.testing.assert_array_equal(rvs[0].theta, np.array((1.00, 0.390923, np.nan))) + np.testing.assert_array_equal(rvs[1].theta, np.array((1.00, 0.464027, np.nan))) + np.testing.assert_array_equal(rvs[2].theta, np.array((1.00, 0.390923, np.nan))) + np.testing.assert_array_equal(rvs[3].theta, np.array((1.00, 0.464027, np.nan))) def test__calc_median_consequence(self, repair_model, loss_params_A): repair_model.loss_params = loss_params_A diff --git a/pelicun/tests/test_uq.py b/pelicun/tests/test_uq.py index eeed09545..3d4814a93 100644 --- a/pelicun/tests/test_uq.py +++ b/pelicun/tests/test_uq.py @@ -175,9 +175,7 @@ def test__get_theta(): def test__get_limit_probs(): # verify that it works for valid inputs - res = uq._get_limit_probs( - np.array((0.10, 0.20)), 'normal', np.array((0.15, 1.00)) - ) + res = uq._get_limit_probs(np.array((0.10, 0.20)), 'normal', np.array((0.15, 1.00))) assert np.allclose(res, np.array((0.4800611941616275, 0.5199388058383725))) res = uq._get_limit_probs( @@ -905,9 +903,7 @@ def test_LogNormalRandomVariable_cdf(): ) x = (-1.0, 0.0, 0.5, 1.0, 2.0) cdf = rv.cdf(x) - assert np.allclose( - cdf, (0.0, 0.0, 0.23597085, 0.49461712, 0.75326339), rtol=1e-5 - ) + assert np.allclose(cdf, (0.0, 0.0, 0.23597085, 0.49461712, 0.75326339), rtol=1e-5) # upper truncation rv = uq.LogNormalRandomVariable( @@ -917,9 +913,7 @@ def test_LogNormalRandomVariable_cdf(): ) x = (-1.0, 0.0, 0.5, 1.0, 2.0) cdf = rv.cdf(x) - assert np.allclose( - cdf, (0.00, 0.00, 0.25797755, 0.52840734, 0.79883714), rtol=1e-5 - ) + assert np.allclose(cdf, (0.00, 0.00, 0.25797755, 0.52840734, 0.79883714), rtol=1e-5) # no truncation rv = uq.LogNormalRandomVariable('test_rv', theta=(1.0, 1.0)) @@ -1054,9 +1048,7 @@ def test_UniformRandomVariable_inverse_transform(): def test_MultinomialRandomVariable(): # multinomial with invalid p values provided in the theta vector with pytest.raises(ValueError): - uq.MultinomialRandomVariable( - 'rv_invalid', np.array((0.20, 0.70, 0.10, 42.00)) - ) + uq.MultinomialRandomVariable('rv_invalid', np.array((0.20, 0.70, 0.10, 42.00))) def test_MultilinearCDFRandomVariable(): @@ -1159,9 +1151,7 @@ def test_DeterministicRandomVariable_inverse_transform(): rv = uq.DeterministicRandomVariable('test_rv', theta=np.array((0.00,))) rv.inverse_transform_sampling(4) inverse_transform = rv.sample - assert np.allclose( - inverse_transform, 
np.array((0.00, 0.00, 0.00, 0.00)), rtol=1e-5 - ) + assert np.allclose(inverse_transform, np.array((0.00, 0.00, 0.00, 0.00)), rtol=1e-5) def test_RandomVariable_Set(): diff --git a/pelicun/tools/DL_calculation.py b/pelicun/tools/DL_calculation.py index 425376ec1..2a0ad7750 100644 --- a/pelicun/tools/DL_calculation.py +++ b/pelicun/tools/DL_calculation.py @@ -452,7 +452,7 @@ def run_pelicun( ) if not sample_size_str: # give up - print('Sampling size not provided in config file.') + print('Sample size not provided in config file.') return -1 sample_size = int(sample_size_str) @@ -1021,7 +1021,6 @@ def run_pelicun( # TODO: we can improve this by creating a water # network-specific assessment class if "Water" in config['DL']['Asset']['ComponentDatabase']: - # add a placeholder aggregate fragility that will never trigger # damage, but allow damage processes to aggregate the # various pipeline damages @@ -1570,7 +1569,6 @@ def run_pelicun( ) elif repair_config['MapApproach'] == "User Defined": - if repair_config.get('MapFilePath', False) is not False: loss_map_path = repair_config['MapFilePath'] @@ -1767,14 +1765,12 @@ def run_pelicun( damage_sample_s['irreparable'] = np.zeros(damage_sample_s.shape[0]) if 'Losses' in config['DL']: - if 'agg_repair' not in locals(): agg_repair = PAL.repair.aggregate_losses() agg_repair_s = convert_to_SimpleIndex(agg_repair, axis=1) else: - agg_repair_s = pd.DataFrame() summary = pd.concat( @@ -1843,7 +1839,6 @@ def run_pelicun( def main(): - args = sys.argv[1:] parser = argparse.ArgumentParser() diff --git a/pelicun/tools/HDF_to_CSV.py b/pelicun/tools/HDF_to_CSV.py index a72b66eb0..cb9e04acc 100644 --- a/pelicun/tools/HDF_to_CSV.py +++ b/pelicun/tools/HDF_to_CSV.py @@ -44,7 +44,6 @@ def convert_HDF(HDF_path): - HDF_ext = HDF_path.split('.')[-1] CSV_base = HDF_path[: -len(HDF_ext) - 1] @@ -53,14 +52,12 @@ def convert_HDF(HDF_path): store = pd.HDFStore(HDF_path) for key in store.keys(): - store[key].to_csv(f'{CSV_base}_{key[1:].replace("/","_")}.csv') store.close() if __name__ == '__main__': - args = sys.argv[1:] parser = argparse.ArgumentParser() diff --git a/pelicun/tools/export_DB.py b/pelicun/tools/export_DB.py index b4b8b0343..24d4563cf 100644 --- a/pelicun/tools/export_DB.py +++ b/pelicun/tools/export_DB.py @@ -59,7 +59,6 @@ def export_DB(data_path, target_dir): DB_df = pd.read_hdf(data_path, 'data') for row_id, row in DB_df.iterrows(): - row_dict = convert_Series_to_dict(row) with open(target_dir_data / f'{row_id}.json', 'w', encoding='utf-8') as f: @@ -68,13 +67,11 @@ def export_DB(data_path, target_dir): # add population if it exists try: - DB_df = pd.read_hdf(data_path, 'pop') pop_dict = {} for row_id, row in DB_df.iterrows(): - pop_dict.update({row_id: convert_Series_to_dict(row)}) with open(target_dir / 'population.json', 'w', encoding='utf-8') as f: @@ -85,7 +82,6 @@ def export_DB(data_path, target_dir): if __name__ == '__main__': - args = sys.argv[1:] parser = argparse.ArgumentParser() diff --git a/pelicun/uq.py b/pelicun/uq.py index 472c2bf81..8e74952d5 100644 --- a/pelicun/uq.py +++ b/pelicun/uq.py @@ -1281,7 +1281,7 @@ class UtilityRandomVariable(BaseRandomVariable): @abstractmethod def __init__( self, - name, + name, f_map=None, anchor=None, ): @@ -1291,7 +1291,7 @@ def __init__( Parameters ---------- name: string - A unique string that identifies the random variable. + A unique string that identifies the random variable. f_map: function, optional A user-defined function that is applied on the realizations before returning a sample. 
@@ -1789,9 +1789,9 @@ def __init__( anchor=None, ): super().__init__( - name=name, + name=name, theta=raw_samples, - truncation_limits=truncation_limits, + truncation_limits=truncation_limits, f_map=f_map, anchor=anchor, ) @@ -1906,9 +1906,7 @@ def inverse_transform(self, sample_size): """ raw_sample_count = len(self._raw_samples) - new_sample = np.tile( - self._raw_samples, int(sample_size / raw_sample_count) + 1 - ) + new_sample = np.tile(self._raw_samples, int(sample_size / raw_sample_count) + 1) result = new_sample[:sample_size] return result
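Note on the last hunk above: it only reflows the np.tile(...) call inside inverse_transform(self, sample_size) in uq.py; the resampling behavior is unchanged. A minimal standalone sketch of that tiling pattern is given below, assuming only NumPy; tile_raw_samples and the sample values are illustrative names chosen here, not identifiers from pelicun.

import numpy as np

def tile_raw_samples(raw_samples, sample_size):
    # Repeat the empirical sample until it covers sample_size draws, then truncate,
    # mirroring the reflowed np.tile(...) line in the hunk above.
    raw_samples = np.asarray(raw_samples)
    reps = int(sample_size / len(raw_samples)) + 1  # enough whole copies to cover sample_size
    return np.tile(raw_samples, reps)[:sample_size]

# Example: draw 7 realizations from a 3-point empirical sample.
print(tile_raw_samples([0.2, 0.5, 0.9], 7))  # [0.2 0.5 0.9 0.2 0.5 0.9 0.2]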