From 2d88bb2229a47bdd3477f3373c451b5fb0f49e8c Mon Sep 17 00:00:00 2001 From: RyanSpies-NOAA Date: Mon, 26 Aug 2024 22:42:28 +0000 Subject: [PATCH 1/7] New data processing code for spatial point data processing --- data/nws/ahps_bench_polys_to_calb_pts.py | 664 +++++++++++++++++++++++ 1 file changed, 664 insertions(+) create mode 100644 data/nws/ahps_bench_polys_to_calb_pts.py diff --git a/data/nws/ahps_bench_polys_to_calb_pts.py b/data/nws/ahps_bench_polys_to_calb_pts.py new file mode 100644 index 00000000..a0525ad3 --- /dev/null +++ b/data/nws/ahps_bench_polys_to_calb_pts.py @@ -0,0 +1,664 @@ +import argparse +import logging +import os +import re + +import geopandas as gpd +import pandas as pd + + +def find_max_elevation(interpolated_flows_path, attributes_path): + """ + Finds the maximum elevation from a flows CSV file, associates it with attributes, and updates paths. + + Parameters: + - interpolated_flows_path (str): Path to the CSV file containing interpolated flows data. + - attributes_path (str): Path to the CSV file containing attributes data. + + Returns: + - tuple: A tuple containing the maximum elevation, updated path, study limit file path, levee file path, + and a DataFrame row with additional attributes. If no valid rows or an error occurs, returns (None, None, None, None, None). + """ + try: + # Read the _interpolated_flows.csv file + df_flows = pd.read_csv(interpolated_flows_path) + # Read the attributes file + df_attributes = pd.read_csv(attributes_path) + + # Filter rows with valid numeric "flow" and non-empty "path" string + df_valid = df_flows[ + (pd.to_numeric(df_flows['flow'], errors='coerce').notnull()) + & (df_flows['path'].astype(str) != '') + ] + + # Check if the dataframe is empty after filtering + if df_valid.empty: + logging.warning(f"No valid rows in {interpolated_flows_path}.") + return None, None, None, None, None + + # Find the maximum "elevation" value + df_find_max = df_valid.loc[df_valid['elevation'].idxmax()].copy() + + # Extract first instance of specific columns from df_attributes + columns_to_extract = ['nws_lid', 'wfo', 'rfc', 'state', 'huc', 'grid_flow_source'] + for col in columns_to_extract: + df_find_max[col] = df_attributes[col].dropna().iloc[0] if col in df_attributes.columns else None + + # Assign specific values to certain columns + df_find_max['magnitude'] = 'maximum' # Set the magnitude to "maximum" + + # Assign null values to certain columns + columns_to_nullify = [ + 'magnitude_stage', + 'magnitude_elev_navd88', + 'grid_name', + 'grid_stage', + 'grid_elev_navd88', + 'grid_flow_cfs', + ] + for col in columns_to_nullify: + df_find_max[col] = None + + # Update the path using the new function + updated_path = update_path(df_find_max['path']) + if updated_path is None: + return None, None, None, None, None + + # Update the row with the new path + df_find_max['path'] = updated_path + + # Back out one more directory level + base_dir = os.path.dirname(os.path.dirname(updated_path)) + + # Find shapefiles in the directory + study_limit_file, levee_file = find_shapefiles_in_directory(base_dir) + + logging.debug(f"Max elevation found in {interpolated_flows_path}: {df_find_max['elevation']}") + + # Return all relevant information including df_joined + return (df_find_max['elevation'], updated_path, study_limit_file, levee_file, df_find_max) + except Exception as e: + logging.error(f"Error processing find_max_elevation function for {interpolated_flows_path}: {e}") + return None, None, None, None, None + + +def update_path(original_path): + """ + Updates the given path string by replacing certain patterns and checking if the file exists. + + Parameters: + - original_path (str): The original file path to update. + + Returns: + - str or None: The updated path if it exists, otherwise None. + """ + try: + # Replace "ahps_inundation_libraries" with "nws" + updated_path = original_path.replace("ahps_inundation_libraries", "nws") + + # Replace "depth_grids" (case-insensitive) with "polygons" + updated_path = re.sub(r"depth_grids", "polygons", updated_path, flags=re.IGNORECASE) + + # Replace the .tif with .shp + updated_path = updated_path.replace(".tif", ".shp") + + # Check if the updated path file exists + if os.path.exists(updated_path): + logging.debug(f"File exists: {updated_path}") + return updated_path + else: + updated_path = re.sub(r"_0.shp", ".shp", updated_path, flags=re.IGNORECASE) + if os.path.exists(updated_path): + logging.debug(f"File exists: {updated_path}") + return updated_path + else: + logging.warning(f"Unexpected - File does not exist: {updated_path}") + return None + except Exception as e: + logging.error(f"Error updating path {original_path}: {e}") + return None + + +def find_shapefiles_in_directory(base_dir): + """ + Search for specific shapefiles in the given directory and its subdirectories. + + Parameters: + - base_dir (str): The base directory to start the search. + + Returns: + - tuple: A tuple containing the paths of the 'study_limit.shp' and 'levee' shapefiles. + """ + try: + # Initialize variables for the search results + study_limit_file = None + levee_file = None + + # Search for shapefiles + for root, dirs, files in os.walk(base_dir): + for file in files: + if re.search(r"study_limit\.shp$", file, re.IGNORECASE): + study_limit_file = os.path.join(root, file) + elif re.search(r"levee.*\.shp$", file, re.IGNORECASE): + levee_file = os.path.join(root, file) + + # Break out of the loop if both files are found + if study_limit_file and levee_file: + break + if study_limit_file and levee_file: + break + + if study_limit_file: + logging.info(f"Found 'study_limit.shp' file: {study_limit_file}") + else: + logging.info(f"No 'study_limit.shp' file found in the directories under {base_dir}.") + + if levee_file: + logging.info(f"Found 'levee' shapefile: {levee_file}") + else: + logging.info(f"No 'levee' shapefile found in the directories under {base_dir}.") + + return study_limit_file, levee_file + except Exception as e: + logging.error(f"Error searching for shapefiles in {base_dir}: {e}") + return None, None + + +def check_max_elevation(csv_path, max_elevation): + """ + Checks if the maximum elevation in a flows file is greater than the maximum elevation in an attributes file. + + Parameters: + - csv_path (str): Path to the CSV file containing attributes data. + - max_elevation (float): The maximum elevation value from the flows data. + + Returns: + - bool: True if the max elevation in the flows file is greater than in the attributes file, False otherwise. + """ + try: + # Read the _attributes.csv file + df_attributes = pd.read_csv(csv_path) + + # Find the maximum value in the "magnitude_elev_navd88" column + max_magnitude_elev = df_attributes['magnitude_elev_navd88'].max() + + logging.debug(f"Max magnitude_elev_navd88 in {csv_path}: {max_magnitude_elev}") + + # Compare the elevations + if max_elevation > max_magnitude_elev: + logging.info( + f"Max elevation {max_elevation} in flows file is greater than {max_magnitude_elev} in attributes file {csv_path}." + ) + return True + logging.info( + f"Max elevation {max_elevation} in flows file is NOT greater than {max_magnitude_elev} in attributes file {csv_path}." + ) + return False + except Exception as e: + logging.error(f"Error processing check_max_elevation function for {csv_path}: {e}") + return False + + +def find_flood_magnitude_shapes(attributes_path, interpolated_flows_path): + """ + Finds flood magnitude polygon shapefiles by joining attributes and flows data, and updates paths. + + Parameters: + - attributes_path (str): Path to the CSV file containing attributes data. + - interpolated_flows_path (str): Path to the CSV file containing interpolated flows data. + + Returns: + - list of dict: A list of dictionaries containing flood magnitude data including updated paths and shapefiles. + Returns an empty list if no valid data is found or an error occurs. + """ + try: + # Read the attributes file + df_attributes = pd.read_csv(attributes_path) + df_flows = pd.read_csv(interpolated_flows_path) + + # Log the types of columns found + logging.debug(f"Columns in {attributes_path}: {df_attributes.columns.tolist()}") + + # Check if the required columns exist + if 'magnitude' not in df_attributes.columns or 'grid_name' not in df_attributes.columns: + logging.error(f"Required columns 'magnitude' or 'grid_name' not found in {attributes_path}.") + return {} + + if 'name' not in df_flows.columns: + logging.error(f"Required column 'name' not found in {interpolated_flows_path}.") + return {} + + # Filter rows with non-null and non-empty 'magnitude' values + df_valid = df_attributes[ + df_attributes['magnitude'].notnull() & (df_attributes['magnitude'].astype(str).str.strip() != '') + ] + df_valid = df_attributes[ + df_attributes['grid_name'].notnull() & (df_attributes['grid_name'].astype(str).str.strip() != '') + ] + + # Remove all ".tif" extensions from the 'grid_name' column + df_valid['grid_name'] = df_valid['grid_name'].str.replace('.tif', '', regex=False) + + if df_valid.empty: + logging.warning(f"No valid entries found in the 'magnitude' column of {attributes_path}.") + return {} + + # Join df_valid and df_flows using 'grid_name' from df_valid and 'name' from df_flows + df_joined = df_valid.merge(df_flows, left_on='grid_name', right_on='name', how='inner') + + if df_joined.empty: + logging.warning(f"No matching entries found between attributes and flows data: {attributes_path}") + return {} + + # Prepare a list to store the results + results = [] + + # Prepare the dictionary with 'magnitude' and grid paths + flood_cat_paths = {} + for _, row in df_joined.iterrows(): + magnitude = row['magnitude'] + path = row['path'] # Assuming path is the desired column to return + + # Store in dictionary with magnitude as key and path as value + flood_cat_paths[magnitude] = path + + # Update the path using the new function + updated_path = update_path(path) + if updated_path is None: + return None, None, None, None, None + + # Update the row with the new path + row['path'] = updated_path + + # Back out one more directory level + base_dir = os.path.dirname(os.path.dirname(updated_path)) + + # Find shapefiles in the directory + study_limit_file, levee_file = find_shapefiles_in_directory(base_dir) + + # Append all relevant information to the results list + results.append( + { + 'magnitude': magnitude, + 'elevation': row['elevation'], + 'path': updated_path, + 'study_limit_file': study_limit_file, + 'levee_file': levee_file, + 'attributes': row, + } + ) + + logging.debug(f"Flood category map {magnitude} found in {attributes_path}: {row['elevation']}") + print(f"Flood category map {magnitude} found in {attributes_path}: {row['elevation']}") + + return results + + except Exception as e: + logging.error(f"Error processing attributes file {attributes_path}: {e}") + return [] + + +def process_shapefile_to_parquet_with_sampling( + shapefile_path, study_limit_file, levee_file, point_attributes, output_dir, step=5 +): + """ + Processes a shapefile to extract edge vertices with sampling, applies buffer operations using other shapefiles, + and converts the output to a Parquet file. + + Parameters: + - shapefile_path (str): Path to the input shapefile containing polygon or multipolygon geometries. + - study_limit_file (str): Path to the study limit shapefile, which will be used for buffering. + - levee_file (str): Path to the levee shapefile, which will also be used for buffering. + - point_attributes (dict): Dictionary containing point attributes like 'nws_lid', 'flow', 'magnitude', etc., + to be added to each extracted point. + - output_dir (str): Directory where the output files (Parquet and optional buffer polygons) will be saved. + - step (int, optional): Sampling step for extracting vertices from polygon edges. Defaults to 5. + + Returns: + - gpd.GeoDataFrame: A GeoDataFrame containing sampled points with associated attributes. + Returns None if an error occurs. + + The function performs the following steps: + 1. Reads the input shapefile using GeoPandas. + 2. Logs and checks the types of geometries present in the shapefile. + 3. If the 'nws_lid' is 'tarn7', it dissolves multipolygons into single polygons. + 4. Extracts edge vertices from polygons and multipolygons using a specified sampling step. + 5. Creates a GeoDataFrame of the sampled points, transforms its CRS to EPSG:5070, and attaches point attributes. + 6. Buffers geometries from the provided study limit and levee shapefiles and removes points within these buffer areas. + 7. Returns the GeoDataFrame of sampled points with attached attributes. + + Any errors encountered during processing are logged. + """ + try: + try: + gdf = gpd.read_file(shapefile_path) + except Exception as e: + logging.error(f"Failed to read shapefile {shapefile_path}: {e}") + return + + # Log the types of geometries found + logging.debug(f"Geometries found in {shapefile_path}: {gdf.geometry.type.unique()}") + + # Check if the shapefile contains valid polygons or multipolygons + if gdf.empty or not any(gdf.geometry.type.isin(['Polygon', 'MultiPolygon'])): + logging.warning(f"No valid polygons found in {shapefile_path}.") + return + + # Dissolve MultiPolygons into single Polygons if nws_lid is "tarn7" + if point_attributes.get('nws_lid') == 'tarn7': + if 'MultiPolygon' in gdf.geometry.geom_type.unique(): + # Unify all geometries into a single geometry and then break into polygons + unified_geom = gdf.geometry.unary_union + gdf = gpd.GeoDataFrame(geometry=[unified_geom], crs=gdf.crs) + gdf = gdf.explode(index_parts=True) + + # Extract edge vertices with sampling + all_points = [] + for geom in gdf.geometry: + if geom.geom_type == 'Polygon': + coords = list(geom.exterior.coords) + sampled_coords = coords[::step] # Take every `step`th point + all_points.extend(sampled_coords) + elif geom.geom_type == 'MultiPolygon': + # In case any MultiPolygon is left, iterate over each Polygon + for poly in geom.geoms: + coords = list(poly.exterior.coords) + sampled_coords = coords[::step] # Take every `step`th point + all_points.extend(sampled_coords) + else: + logging.warning(f"Geometry type {geom.geom_type} is not handled in {shapefile_path}.") + + if not all_points: + logging.warning(f"No points extracted from {shapefile_path}.") + return + + # Create a GeoDataFrame of the points with EPSG:5070 CRS + gdf_points = gpd.GeoDataFrame( + geometry=gpd.points_from_xy([p[0] for p in all_points], [p[1] for p in all_points]), crs=gdf.crs + ) + + # Transform the CRS of points to EPSG:5070 + gdf_points = gdf_points.to_crs(epsg=5070) + + # Convert "flow" from cubic feet to cubic meters + point_attributes['flow'] = point_attributes['flow'] * 0.0283168 + point_attributes['flow_unit'] = 'cms' # Set the magnitude to "maximum" + + # Rename 'huc' to 'HUC8' if it exists + if 'huc' in point_attributes: + point_attributes['HUC8'] = point_attributes.pop('huc') + + # Define the list of attributes to keep + attributes_to_keep = [ + 'nws_lid', + 'magnitude', + 'path', + 'name', + 'elevation', + 'stage', + 'flow', + 'flow_unit', + 'flow_source', + 'wfo', + 'HUC8', + ] + + # Filter point_attributes to include only the desired attributes + filtered_attributes = { + attr: value for attr, value in point_attributes.items() if attr in attributes_to_keep + } + + # Add point attributes to each point in gdf_points + for attr, value in filtered_attributes.items(): + gdf_points[attr] = value + + # Initialize an empty GeoDataFrame for buffer polygons with proper CRS + buffer_polygons = gpd.GeoDataFrame(geometry=[], crs=gdf.crs) + + # Read and buffer study limit and levee files if they are provided + for line_shapefile in [study_limit_file, levee_file]: + if line_shapefile: + gdf_lines = gpd.read_file(line_shapefile) + + # Log the types of geometries found + logging.debug(f"Geometries found in {line_shapefile}: {gdf_lines.geometry.type.unique()}") + + # Check if the shapefile contains valid LineString or MultiLineString + if not any(gdf_lines.geometry.type.isin(['LineString', 'MultiLineString'])): + logging.warning(f"No valid LineString or MultiLineString found in {line_shapefile}.") + continue + + # Buffer the lines and concatenate to buffer_polygons + buffered_lines = gdf_lines.buffer(20) # Buffer by 20 meters + if buffered_lines.empty: + logging.warning(f"No valid buffered polygons created from {line_shapefile}.") + continue + + # Create a new GeoDataFrame for buffered lines + gdf_buffered_lines = gpd.GeoDataFrame(geometry=buffered_lines, crs=gdf.crs) + + # Concatenate buffered lines to buffer_polygons + buffer_polygons = pd.concat([buffer_polygons, gdf_buffered_lines], ignore_index=True) + + if buffer_polygons.empty: + logging.info("No buffer polygons created from line shapefiles.") + else: + # Ensure buffer_polygons has valid geometry + buffer_polygons = gpd.GeoDataFrame(geometry=buffer_polygons.geometry, crs=gdf.crs) + + # Remove points within the buffer polygons + if not buffer_polygons.geometry.is_empty.any(): + gdf_points = gdf_points[~gdf_points.geometry.within(buffer_polygons.unary_union)] + + # Export buffer polygons to GeoPackage + # buffer_gpkg_path = os.path.join(output_dir, 'buffer_polygons.gpkg') + # buffer_polygons.to_file(buffer_gpkg_path, driver="GPKG") + # logging.info(f"Buffer polygons saved to {buffer_gpkg_path}") + + # Define the output Parquet file path + # shapefile_filename = os.path.splitext(os.path.basename(shapefile_path))[0] + # parquet_path = os.path.join(output_dir, f"{shapefile_filename}_vertices.parquet") + + # Export points to Parquet + # gdf_points.to_parquet(parquet_path) + # logging.info(f"Points from {shapefile_path} saved to {parquet_path}") + + # Return the list + return gdf_points + + except Exception as e: + logging.error(f"Error processing shapefile {shapefile_path}: {e}") + + +def process_directory(root_dir, output_dir, log_file): + logging.basicConfig( + filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' + ) + logging.info(f"Starting process in directory: {root_dir}") + accumulated_data = [] # List to accumulate nws_lid and magnitude data + # Loop through each first-level directory under root_dir + for huc_dir in reversed(sorted(os.listdir(root_dir))): + print(huc_dir) + first_level_path = os.path.join(root_dir, huc_dir) + if os.path.isdir(first_level_path): + all_points = gpd.GeoDataFrame() # Initialize an empty GeoDataFrame to accumulate points + # Now walk through each subdirectory within the first-level directory + for subdir, dirs, files in os.walk(first_level_path): + for file in files: + if file.endswith('_interpolated_flows.csv'): + interpolated_flows_path = os.path.join(subdir, file) + print(f"Processing {interpolated_flows_path}.") + logging.debug(f"Processing {interpolated_flows_path}.") + + # Find the corresponding _attributes.csv file + attributes_file = file.replace('_interpolated_flows.csv', '_attributes.csv') + attributes_path = os.path.join(subdir, attributes_file) + + if not os.path.exists(attributes_path): + logging.warning( + f"Attributes file {attributes_file} not found for {file} in {subdir}." + ) + else: + max_elevation, path_value, study_limit_file, levee_file, point_attributes = ( + find_max_elevation(interpolated_flows_path, attributes_path) + ) + + if max_elevation is None: + logging.warning( + f"Skipping {interpolated_flows_path} due to no valid max elevation." + ) + continue + if check_max_elevation(attributes_path, max_elevation): + print( + f"Path with max elevation greater than magnitude_elev_navd88: {path_value}" + ) + + # Process the shapefile and collect points + gdf_points = process_shapefile_to_parquet_with_sampling( + path_value, + study_limit_file, + levee_file, + point_attributes, + output_dir, + step=10, + ) + + if gdf_points is not None: + # Check and align CRS before concatenation + if all_points.empty: + all_points = gdf_points + else: + gdf_points = gdf_points.to_crs(all_points.crs) + print('successfully created points for:') + print( + point_attributes['nws_lid'] + ' --> ' + point_attributes['magnitude'] + ) + # Append the collected points to all_points GeoDataFrame + all_points = pd.concat([all_points, gdf_points], ignore_index=True) + # Accumulate nws_lid and magnitude for CSV export + accumulated_data.append( + { + 'nws_lid': point_attributes['nws_lid'], + 'HUC8': point_attributes['HUC8'], + 'magnitude': point_attributes['magnitude'], + } + ) + else: + logging.info( + f"Max elevation in {interpolated_flows_path} is not greater than max magnitude_elev_navd88 in {attributes_file}." + ) + + # Find flood magnitude shapes and process them + print('Checking for flood category data files...') + flood_cat_paths = find_flood_magnitude_shapes( + attributes_path, interpolated_flows_path + ) + if flood_cat_paths: + print('Processing flood category data files...') + for flood_data in flood_cat_paths: + # Check if flood_data is None + if flood_data is None: + logging.error(f"Flood data could not be loaded for {attributes_path}") + continue # Skip processing this shapefile + path_value = flood_data['path'] + study_limit_file = flood_data['study_limit_file'] + levee_file = flood_data['levee_file'] + point_attributes = flood_data[ + 'attributes' + ] # Use the full dictionary as attributes if needed + + # Process the shapefile and collect points + gdf_points = process_shapefile_to_parquet_with_sampling( + path_value, + study_limit_file, + levee_file, + point_attributes, + output_dir, + step=10, + ) + + if gdf_points is not None: + # Check and align CRS before concatenation + if all_points.empty: + all_points = gdf_points + else: + gdf_points = gdf_points.to_crs(all_points.crs) + # Append the collected points to all_points GeoDataFrame + print('successfully created points for:') + print( + point_attributes['nws_lid'] + + ' --> ' + + point_attributes['magnitude'] + ) + all_points = pd.concat([all_points, gdf_points], ignore_index=True) + # Accumulate nws_lid and magnitude for CSV export + accumulated_data.append( + { + 'nws_lid': point_attributes['nws_lid'], + 'HUC8': point_attributes['HUC8'], + 'magnitude': point_attributes['magnitude'], + } + ) + else: + logging.info( + f"Did not find any flood category maps to process in {attributes_file} or could not find corresponding data in {attributes_file}." + ) + + if not all_points.empty: + # Define the output Parquet file path using the 8-digit subdirectory name + parquet_path = os.path.join(output_dir, f"{huc_dir}.parquet") + + # Export the accumulated points to a Parquet file + all_points.to_parquet(parquet_path) + print(parquet_path) + logging.info(f"Points from all files in subdirectory {subdir} saved to {parquet_path}") + + # After processing all directories, group the accumulated data by nws_lid and export to CSV + if accumulated_data: + df_accumulated = pd.DataFrame(accumulated_data) + # Group by nws_lid and concatenate HUC8 and magnitude values + grouped_df = ( + df_accumulated.groupby('nws_lid') + .agg( + { + 'HUC8': lambda x: ', '.join(map(str, x.unique())), + 'magnitude': lambda x: ', '.join(map(str, x)), + } + ) + .reset_index() + ) + + # Define the CSV output path (e.g., in the output_dir) + csv_output_path = os.path.join(output_dir, 'nws_lid_magnitudes_summary.csv') + grouped_df.to_csv(csv_output_path, index=False) + logging.info(f"Summary CSV saved to {csv_output_path}") + + logging.info("Processing complete.") + logging.shutdown() # Ensure all logs are flushed and the log file is properly closed + print('Script completed!!') + + +if __name__ == '__main__': + # Set up argument parsing + parser = argparse.ArgumentParser(description='Process directories for flood data.') + parser.add_argument( + '-i', + '--root_directory_path', + type=str, + help='Path to the root directory containing the HUC directories.', + ) + parser.add_argument( + '-o', + '--output_directory_path', + type=str, + help='Path to the directory where output files will be saved.', + ) + + args = parser.parse_args() + root_directory_path = args.root_directory_path + output_directory_path = args.output_directory_path + log_file = os.path.join(root_directory_path, "processing_log.log") + + # root_directory_path = r'B:\FIM_development\fim_assessment\FOSS\ahps_benchmark\5_5_2024\nws' + # output_directory_path = r'C:\GID\FOSS_FIM\ahps_max_stage_flow_preprocess\calibration_points\5_5_2024' + process_directory(root_directory_path, output_directory_path, log_file) From 48e65499f53c3d276750260d9211105e7180de6f Mon Sep 17 00:00:00 2001 From: RyanSpies-NOAA Date: Tue, 27 Aug 2024 16:25:59 +0000 Subject: [PATCH 2/7] Temp change to use testing calb points --- src/bash_variables.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bash_variables.env b/src/bash_variables.env index b1318f4e..408fce44 100644 --- a/src/bash_variables.env +++ b/src/bash_variables.env @@ -30,7 +30,7 @@ export input_nwm_lakes=${inputsDir}/nwm_hydrofabric/nwm_l export input_nwm_lakes_Alaska=${inputsDir}/nwm_hydrofabric/nwm_waterbodies_alaska.gpkg export input_WBD_gdb=${inputsDir}/wbd/WBD_National_EPSG_5070_WBDHU8_clip_dem_domain.gpkg export input_WBD_gdb_Alaska=${inputsDir}/wbd/WBD_National_South_Alaska.gpkg -export input_calib_points_dir=${inputsDir}/rating_curve/water_edge_database/calibration_points/ +export input_calib_points_dir=${inputsDir}/rating_curve/water_edge_database/calibration_points_testing/ export bathymetry_file=${inputsDir}/bathymetry/bathymetry_adjustment_data.gpkg export osm_bridges=${inputsDir}/osm/bridges/240426/osm_all_bridges.gpkg From 7b48c55783044741127208b597301026eb69f955 Mon Sep 17 00:00:00 2001 From: RyanSpies-NOAA Date: Fri, 3 Jan 2025 16:24:30 +0000 Subject: [PATCH 3/7] additional fixes and new merge script --- data/nws/ahps_bench_polys_to_calb_pts.py | 352 +++++++++++++++++------ data/nws/merge_nws_usgs_point_parquet.py | 147 ++++++++++ 2 files changed, 411 insertions(+), 88 deletions(-) create mode 100644 data/nws/merge_nws_usgs_point_parquet.py diff --git a/data/nws/ahps_bench_polys_to_calb_pts.py b/data/nws/ahps_bench_polys_to_calb_pts.py index a0525ad3..89ed832d 100644 --- a/data/nws/ahps_bench_polys_to_calb_pts.py +++ b/data/nws/ahps_bench_polys_to_calb_pts.py @@ -1,19 +1,22 @@ import argparse +import gc import logging import os import re +import sys import geopandas as gpd import pandas as pd -def find_max_elevation(interpolated_flows_path, attributes_path): +def find_max_elevation(interpolated_flows_path, attributes_path, df_poly_search, data_source): """ Finds the maximum elevation from a flows CSV file, associates it with attributes, and updates paths. Parameters: - interpolated_flows_path (str): Path to the CSV file containing interpolated flows data. - attributes_path (str): Path to the CSV file containing attributes data. + - data_source: String to identify "nws" or "usgs" data source Returns: - tuple: A tuple containing the maximum elevation, updated path, study limit file path, levee file path, @@ -44,6 +47,25 @@ def find_max_elevation(interpolated_flows_path, attributes_path): for col in columns_to_extract: df_find_max[col] = df_attributes[col].dropna().iloc[0] if col in df_attributes.columns else None + if df_find_max['elevation'] is None: + logging.warning(f"Skipping {interpolated_flows_path} due to no valid max elevation.") + return None, None, None, None, None + + # Skip specific lids due to issues + # tarn7: FIM multipolygons for max extent cause millions of points (ignore) + skip_lids = ['tarn7'] + if df_find_max['nws_lid'] in skip_lids: + logging.warning( + f"Skipping the max stage/flow process - " f"manual override for: {df_find_max['nws_lid']}." + ) + return None, None, None, None, None + + # Filter the dataframe to corresponding lid to check for poly search override + new_search_lids = [df_find_max['nws_lid']] + df_poly_search_subset = df_poly_search[df_poly_search['lid'].isin(new_search_lids)] + if df_poly_search_subset.empty: + df_poly_search_subset = None + # Assign specific values to certain columns df_find_max['magnitude'] = 'maximum' # Set the magnitude to "maximum" @@ -51,7 +73,6 @@ def find_max_elevation(interpolated_flows_path, attributes_path): columns_to_nullify = [ 'magnitude_stage', 'magnitude_elev_navd88', - 'grid_name', 'grid_stage', 'grid_elev_navd88', 'grid_flow_cfs', @@ -60,8 +81,9 @@ def find_max_elevation(interpolated_flows_path, attributes_path): df_find_max[col] = None # Update the path using the new function - updated_path = update_path(df_find_max['path']) + updated_path = update_path_search(df_find_max['path'], df_poly_search_subset, data_source) if updated_path is None: + logging.warning(f"Unexpected issue identifying polygon path: {updated_path}") return None, None, None, None, None # Update the row with the new path @@ -82,7 +104,7 @@ def find_max_elevation(interpolated_flows_path, attributes_path): return None, None, None, None, None -def update_path(original_path): +def update_path_search(original_path, df_poly_search_subset, data_source): """ Updates the given path string by replacing certain patterns and checking if the file exists. @@ -94,26 +116,73 @@ def update_path(original_path): """ try: # Replace "ahps_inundation_libraries" with "nws" - updated_path = original_path.replace("ahps_inundation_libraries", "nws") + if data_source == 'nws': + updated_path = original_path.replace("ahps_inundation_libraries", str(data_source)) + if data_source == 'usgs': + updated_path = original_path.replace("USGS_FIM", 'AHPS' + os.sep + str(data_source)) # Replace "depth_grids" (case-insensitive) with "polygons" updated_path = re.sub(r"depth_grids", "polygons", updated_path, flags=re.IGNORECASE) + # Replace "custom" (case-insensitive) with "polygons" + updated_path = re.sub(r"custom", "polygons", updated_path, flags=re.IGNORECASE) + # Replace "Depth_grid" (case-insensitive) with "Polygons" + updated_path = re.sub(r"Depth_grid", "Polygons", updated_path, flags=re.IGNORECASE) # Replace the .tif with .shp - updated_path = updated_path.replace(".tif", ".shp") + # updated_path = updated_path.replace(".tiff", ".shp").replace(".tif", ".shp") + # Extract the directory path + directory_path = os.path.dirname(updated_path) + # Extract file name without the extension + file_name = os.path.splitext(os.path.basename(updated_path))[0] + # Check if the there is data in df_poly_search_subset (already filtered to lid) + if df_poly_search_subset is not None: + # Check if file_name is in the 'tif_file_search' column of df_poly_search_subset + if file_name in df_poly_search_subset['tif_file_search'].values: + orig_file_name = file_name + # Assign new file name from 'new_poly_search' column corresponding to the 'tif_file_search' match + file_name = df_poly_search_subset.loc[ + df_poly_search_subset['tif_file_search'] == file_name, 'new_poly_search' + ].values[0] + logging.info( + f"Replacing poly name search using provided csv: {orig_file_name} --> {file_name}" + ) + # Add the new extension (.shp) + shp_file_name = f"{file_name}.shp" + # Combine directory path and new file name + updated_path = os.path.join(directory_path, shp_file_name) # Check if the updated path file exists if os.path.exists(updated_path): - logging.debug(f"File exists: {updated_path}") + logging.info(f"File exists: {updated_path}") return updated_path - else: + elif os.path.exists(re.sub(r"_0.shp", ".shp", updated_path, flags=re.IGNORECASE)): updated_path = re.sub(r"_0.shp", ".shp", updated_path, flags=re.IGNORECASE) - if os.path.exists(updated_path): - logging.debug(f"File exists: {updated_path}") - return updated_path + logging.info(f"File exists: {updated_path}") + return updated_path + elif os.path.exists(re.sub(r"elev_", "", updated_path, flags=re.IGNORECASE)): + updated_path = re.sub(r"elev_", "", updated_path, flags=re.IGNORECASE) + logging.info(f"File exists: {updated_path}") + return updated_path + elif os.path.exists(re.sub(r".shp", "_0.shp", updated_path, flags=re.IGNORECASE)): + updated_path = re.sub(r".shp", "_0.shp", updated_path, flags=re.IGNORECASE) + logging.info(f"File exists: {updated_path}") + return updated_path + else: + # combined search with removing both "_0" and "elev_" + updated_path_0 = re.sub(r"_0.shp", ".shp", updated_path, flags=re.IGNORECASE) + updated_path_elev = re.sub(r"elev_", "", updated_path_0, flags=re.IGNORECASE) + if os.path.exists(updated_path_elev): + logging.info(f"File exists: {updated_path_elev}") + return updated_path_elev else: - logging.warning(f"Unexpected - File does not exist: {updated_path}") - return None + shp_file_name = 'elev_' + shp_file_name + updated_path = os.path.join(directory_path, shp_file_name) + if os.path.exists(updated_path): + logging.info(f"File exists: {updated_path}") + return updated_path + else: + logging.warning(f"Unexpected - File does not exist: {updated_path}") + return None except Exception as e: logging.error(f"Error updating path {original_path}: {e}") return None @@ -199,7 +268,7 @@ def check_max_elevation(csv_path, max_elevation): return False -def find_flood_magnitude_shapes(attributes_path, interpolated_flows_path): +def find_flood_magnitude_shapes(attributes_path, interpolated_flows_path, df_poly_search, data_source): """ Finds flood magnitude polygon shapefiles by joining attributes and flows data, and updates paths. @@ -220,12 +289,12 @@ def find_flood_magnitude_shapes(attributes_path, interpolated_flows_path): logging.debug(f"Columns in {attributes_path}: {df_attributes.columns.tolist()}") # Check if the required columns exist - if 'magnitude' not in df_attributes.columns or 'grid_name' not in df_attributes.columns: - logging.error(f"Required columns 'magnitude' or 'grid_name' not found in {attributes_path}.") + if 'magnitude' not in df_attributes.columns or 'grid_stage' not in df_attributes.columns: + logging.error(f"Required columns 'magnitude' or 'grid_stage' not found in {attributes_path}.") return {} - if 'name' not in df_flows.columns: - logging.error(f"Required column 'name' not found in {interpolated_flows_path}.") + if 'stage' not in df_flows.columns: + logging.error(f"Required column 'stage' not found in {interpolated_flows_path}.") return {} # Filter rows with non-null and non-empty 'magnitude' values @@ -233,23 +302,45 @@ def find_flood_magnitude_shapes(attributes_path, interpolated_flows_path): df_attributes['magnitude'].notnull() & (df_attributes['magnitude'].astype(str).str.strip() != '') ] df_valid = df_attributes[ - df_attributes['grid_name'].notnull() & (df_attributes['grid_name'].astype(str).str.strip() != '') + df_attributes['grid_stage'].notnull() + & (df_attributes['grid_stage'].astype(str).str.strip() != '') ] - # Remove all ".tif" extensions from the 'grid_name' column - df_valid['grid_name'] = df_valid['grid_name'].str.replace('.tif', '', regex=False) + # Remove all ".tif" extensions from the 'grid_stage' column + # df_valid['grid_stage'] = df_valid['grid_stage'].str.replace('.tiff', '', regex=False) + # df_valid['grid_stage'] = df_valid['grid_stage'].str.replace('.tif', '', regex=False) + + # Ensure that 'grid_stage' and 'stage' columns are strings in both dataframes + df_valid['grid_stage'] = df_valid['grid_stage'].astype(str) + df_flows['stage'] = df_flows['stage'].astype(str) + + # Check for any NaN or empty values before merging and replace them with empty strings + df_valid['grid_stage'].fillna('', inplace=True) + df_flows['stage'].fillna('', inplace=True) if df_valid.empty: logging.warning(f"No valid entries found in the 'magnitude' column of {attributes_path}.") return {} - # Join df_valid and df_flows using 'grid_name' from df_valid and 'name' from df_flows - df_joined = df_valid.merge(df_flows, left_on='grid_name', right_on='name', how='inner') + # Join df_valid and df_flows using 'grid_stage' from df_valid and 'stage' from df_flows + df_joined = df_valid.merge(df_flows, left_on='grid_stage', right_on='stage', how='inner') + + # After merging, inspect other columns for data type consistency + for col in df_joined.columns: + if df_joined[col].dtype == 'object': + # Convert any object columns to string to avoid mixed type issues + df_joined[col] = df_joined[col].astype(str) if df_joined.empty: logging.warning(f"No matching entries found between attributes and flows data: {attributes_path}") return {} + # Filter the dataframe to corresponding lid to check for poly search override + new_search_lids = df_joined['nws_lid'].tolist() + df_poly_search_subset = df_poly_search[df_poly_search['lid'].isin(new_search_lids)] + if df_poly_search_subset.empty: + df_poly_search_subset = None + # Prepare a list to store the results results = [] @@ -263,9 +354,10 @@ def find_flood_magnitude_shapes(attributes_path, interpolated_flows_path): flood_cat_paths[magnitude] = path # Update the path using the new function - updated_path = update_path(path) + updated_path = update_path_search(path, df_poly_search_subset, data_source) if updated_path is None: - return None, None, None, None, None + logging.warning(f"Unexpected issue finding polygon path: {path}") + continue # Update the row with the new path row['path'] = updated_path @@ -298,12 +390,12 @@ def find_flood_magnitude_shapes(attributes_path, interpolated_flows_path): return [] -def process_shapefile_to_parquet_with_sampling( - shapefile_path, study_limit_file, levee_file, point_attributes, output_dir, step=5 +def process_shapefile_to_points_with_sampling( + shapefile_path, study_limit_file, levee_file, point_attributes, data_source, step=5 ): """ Processes a shapefile to extract edge vertices with sampling, applies buffer operations using other shapefiles, - and converts the output to a Parquet file. + and converts returns the filtered dataframe. Parameters: - shapefile_path (str): Path to the input shapefile containing polygon or multipolygon geometries. @@ -311,7 +403,6 @@ def process_shapefile_to_parquet_with_sampling( - levee_file (str): Path to the levee shapefile, which will also be used for buffering. - point_attributes (dict): Dictionary containing point attributes like 'nws_lid', 'flow', 'magnitude', etc., to be added to each extracted point. - - output_dir (str): Directory where the output files (Parquet and optional buffer polygons) will be saved. - step (int, optional): Sampling step for extracting vertices from polygon edges. Defaults to 5. Returns: @@ -321,7 +412,7 @@ def process_shapefile_to_parquet_with_sampling( The function performs the following steps: 1. Reads the input shapefile using GeoPandas. 2. Logs and checks the types of geometries present in the shapefile. - 3. If the 'nws_lid' is 'tarn7', it dissolves multipolygons into single polygons. + 3. If the 'nws_lid' is dissolve list, it dissolves multipolygons into single polygons. 4. Extracts edge vertices from polygons and multipolygons using a specified sampling step. 5. Creates a GeoDataFrame of the sampled points, transforms its CRS to EPSG:5070, and attaches point attributes. 6. Buffers geometries from the provided study limit and levee shapefiles and removes points within these buffer areas. @@ -332,6 +423,11 @@ def process_shapefile_to_parquet_with_sampling( try: try: gdf = gpd.read_file(shapefile_path) + # Check if the CRS is not set + if gdf.crs is None: + logging.debug(f"CRS is missing for {shapefile_path} --> Setting CRS to EPSG:4326.") + gdf.set_crs(epsg=4326, inplace=True) + except Exception as e: logging.error(f"Failed to read shapefile {shapefile_path}: {e}") return @@ -344,13 +440,22 @@ def process_shapefile_to_parquet_with_sampling( logging.warning(f"No valid polygons found in {shapefile_path}.") return - # Dissolve MultiPolygons into single Polygons if nws_lid is "tarn7" - if point_attributes.get('nws_lid') == 'tarn7': - if 'MultiPolygon' in gdf.geometry.geom_type.unique(): - # Unify all geometries into a single geometry and then break into polygons - unified_geom = gdf.geometry.unary_union - gdf = gpd.GeoDataFrame(geometry=[unified_geom], crs=gdf.crs) - gdf = gdf.explode(index_parts=True) + # Dissolve MultiPolygons into single Polygons if nws_lid is in list below + # Polygon data is binned by depths for these? + dissolve_locations = ['tarn7', 'pgvn7'] + if point_attributes.get('nws_lid') in dissolve_locations: + # if 'MultiPolygon' in gdf.geometry.geom_type.unique(): + print('Dissolving multipolygons...') + # Unify all geometries into a single MultiPolygon + unified_geom = gdf.unary_union + + # Convert the unified geometry back to a GeoDataFrame + gdf = gpd.GeoDataFrame(geometry=[unified_geom], crs=gdf.crs) + + # Break the unified geometry into individual Polygons if necessary + gdf = gdf.explode(index_parts=True, ignore_index=True) + + logging.debug(f"Dissolving multipolygons: {shapefile_path}") # Extract edge vertices with sampling all_points = [] @@ -383,6 +488,9 @@ def process_shapefile_to_parquet_with_sampling( # Convert "flow" from cubic feet to cubic meters point_attributes['flow'] = point_attributes['flow'] * 0.0283168 point_attributes['flow_unit'] = 'cms' # Set the magnitude to "maximum" + point_attributes['submitter'] = data_source # Set the data source attribute + point_attributes['coll_time'] = '2024-09-04' # Set the data source attribute + point_attributes['layer'] = point_attributes['magnitude'] # Set the data source attribute # Rename 'huc' to 'HUC8' if it exists if 'huc' in point_attributes: @@ -392,6 +500,7 @@ def process_shapefile_to_parquet_with_sampling( attributes_to_keep = [ 'nws_lid', 'magnitude', + 'layer', 'path', 'name', 'elevation', @@ -399,6 +508,8 @@ def process_shapefile_to_parquet_with_sampling( 'flow', 'flow_unit', 'flow_source', + 'coll_time', + 'submitter', 'wfo', 'HUC8', ] @@ -470,12 +581,30 @@ def process_shapefile_to_parquet_with_sampling( logging.error(f"Error processing shapefile {shapefile_path}: {e}") -def process_directory(root_dir, output_dir, log_file): +def process_directory(root_dir, data_source, manual_search_override, output_dir): + root_dir = os.path.join(root_dir, data_source) + # Check if the directory exists, and if not, exit with an error code + if not os.path.exists(root_dir) or not os.path.isdir(root_dir): + print(f"Error: Directory {root_dir} does not exist.") + sys.exit(1) + if not os.path.exists(manual_search_override): + print(f"Error: Directory {manual_search_override} does not exist.") + sys.exit(1) + + output_dir = os.path.join(output_dir, data_source) + # Create the directory if it doesn't exist + if not os.path.exists(output_dir): + os.makedirs(output_dir) + log_file = os.path.join(output_dir, "processing_log.log") logging.basicConfig( filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) + + df_poly_search = pd.read_csv(manual_search_override) + logging.info(f"Starting process in directory: {root_dir}") accumulated_data = [] # List to accumulate nws_lid and magnitude data + lid_dict = {} # List to accumulate all available nws_lid # Loop through each first-level directory under root_dir for huc_dir in reversed(sorted(os.listdir(root_dir))): print(huc_dir) @@ -485,13 +614,16 @@ def process_directory(root_dir, output_dir, log_file): # Now walk through each subdirectory within the first-level directory for subdir, dirs, files in os.walk(first_level_path): for file in files: - if file.endswith('_interpolated_flows.csv'): + if file.endswith('_flows.csv'): interpolated_flows_path = os.path.join(subdir, file) print(f"Processing {interpolated_flows_path}.") - logging.debug(f"Processing {interpolated_flows_path}.") + logging.info(f"Processing {interpolated_flows_path}.") # Find the corresponding _attributes.csv file - attributes_file = file.replace('_interpolated_flows.csv', '_attributes.csv') + if data_source == 'nws': + attributes_file = file.replace('_interpolated_flows.csv', '_attributes.csv') + else: + attributes_file = file.replace('_flows.csv', '_attributes.csv') attributes_path = os.path.join(subdir, attributes_file) if not os.path.exists(attributes_path): @@ -499,59 +631,65 @@ def process_directory(root_dir, output_dir, log_file): f"Attributes file {attributes_file} not found for {file} in {subdir}." ) else: + lid = str(file)[:5] + # Add LID and HUC8 to the dictionary + lid_dict[lid] = str(huc_dir) max_elevation, path_value, study_limit_file, levee_file, point_attributes = ( - find_max_elevation(interpolated_flows_path, attributes_path) - ) - - if max_elevation is None: - logging.warning( - f"Skipping {interpolated_flows_path} due to no valid max elevation." - ) - continue - if check_max_elevation(attributes_path, max_elevation): - print( - f"Path with max elevation greater than magnitude_elev_navd88: {path_value}" - ) - - # Process the shapefile and collect points - gdf_points = process_shapefile_to_parquet_with_sampling( - path_value, - study_limit_file, - levee_file, - point_attributes, - output_dir, - step=10, + find_max_elevation( + interpolated_flows_path, attributes_path, df_poly_search, data_source ) + ) - if gdf_points is not None: - # Check and align CRS before concatenation - if all_points.empty: - all_points = gdf_points - else: - gdf_points = gdf_points.to_crs(all_points.crs) - print('successfully created points for:') + if max_elevation is not None: + if check_max_elevation(attributes_path, max_elevation): print( - point_attributes['nws_lid'] + ' --> ' + point_attributes['magnitude'] + f"Path with max elevation greater than magnitude_elev_navd88: {path_value}" ) - # Append the collected points to all_points GeoDataFrame - all_points = pd.concat([all_points, gdf_points], ignore_index=True) - # Accumulate nws_lid and magnitude for CSV export - accumulated_data.append( - { - 'nws_lid': point_attributes['nws_lid'], - 'HUC8': point_attributes['HUC8'], - 'magnitude': point_attributes['magnitude'], - } + + # Process the shapefile and collect points + gdf_points = process_shapefile_to_points_with_sampling( + path_value, + study_limit_file, + levee_file, + point_attributes, + data_source, + step=10, + ) + + if gdf_points is not None: + # Check and align CRS before concatenation + if all_points.empty: + all_points = gdf_points + else: + gdf_points = gdf_points.to_crs(all_points.crs) + print('successfully created points for:') + print( + point_attributes['nws_lid'] + + ' --> ' + + point_attributes['magnitude'] + ) + # Append the collected points to all_points GeoDataFrame + all_points = pd.concat([all_points, gdf_points], ignore_index=True) + # Accumulate nws_lid and magnitude for CSV export + accumulated_data.append( + { + 'nws_lid': point_attributes['nws_lid'], + 'HUC8': point_attributes['HUC8'], + 'magnitude': point_attributes['magnitude'], + } + ) + # Clear GeoDataFrame after each file to free memory + del gdf_points + gc.collect() + else: + logging.info( + f"Max elevation in {interpolated_flows_path} is not greater than max magnitude_elev_navd88 in {attributes_file}." ) - else: - logging.info( - f"Max elevation in {interpolated_flows_path} is not greater than max magnitude_elev_navd88 in {attributes_file}." - ) # Find flood magnitude shapes and process them print('Checking for flood category data files...') flood_cat_paths = find_flood_magnitude_shapes( - attributes_path, interpolated_flows_path + attributes_path, interpolated_flows_path, df_poly_search, data_source ) if flood_cat_paths: print('Processing flood category data files...') @@ -568,12 +706,12 @@ def process_directory(root_dir, output_dir, log_file): ] # Use the full dictionary as attributes if needed # Process the shapefile and collect points - gdf_points = process_shapefile_to_parquet_with_sampling( + gdf_points = process_shapefile_to_points_with_sampling( path_value, study_limit_file, levee_file, point_attributes, - output_dir, + data_source, step=10, ) @@ -599,12 +737,24 @@ def process_directory(root_dir, output_dir, log_file): 'magnitude': point_attributes['magnitude'], } ) + # Clear GeoDataFrame after each file to free memory + del gdf_points + gc.collect() else: logging.info( f"Did not find any flood category maps to process in {attributes_file} or could not find corresponding data in {attributes_file}." ) if not all_points.empty: + # Force columns to consistent data types + # Convert all object columns to string + for col in all_points.select_dtypes(include=['object']).columns: + all_points[col] = all_points[col].astype(str) + + # If there are numeric columns that should be int or float, make sure they are correct + for col in all_points.select_dtypes(include=['int', 'float']).columns: + all_points[col] = pd.to_numeric(all_points[col], errors='coerce') + # Define the output Parquet file path using the 8-digit subdirectory name parquet_path = os.path.join(output_dir, f"{huc_dir}.parquet") @@ -612,10 +762,22 @@ def process_directory(root_dir, output_dir, log_file): all_points.to_parquet(parquet_path) print(parquet_path) logging.info(f"Points from all files in subdirectory {subdir} saved to {parquet_path}") + # Clear GeoDataFrame after each file to free memory + del all_points + gc.collect() # After processing all directories, group the accumulated data by nws_lid and export to CSV if accumulated_data: df_accumulated = pd.DataFrame(accumulated_data) + # Check if any LIDs in lid_dict are missing from grouped_df, and add them + for lid, huc8 in lid_dict.items(): + if lid not in df_accumulated['nws_lid'].values: + # Create a new DataFrame with the missing LID and HUC8 + new_row = pd.DataFrame({'nws_lid': [lid], 'HUC8': [huc8], 'magnitude': ['']}) + + # Concatenate the new row to df_accumulated + df_accumulated = pd.concat([df_accumulated, new_row], ignore_index=True) + # Group by nws_lid and concatenate HUC8 and magnitude values grouped_df = ( df_accumulated.groupby('nws_lid') @@ -647,6 +809,19 @@ def process_directory(root_dir, output_dir, log_file): type=str, help='Path to the root directory containing the HUC directories.', ) + parser.add_argument( + '-s', + '--source', + type=str, + choices=['nws', 'usgs'], # Restrict the options to "nws" or "usgs" + help='Choose to process either "nws" or "usgs"', + ) + parser.add_argument( + '-f', + '--manual_search_override', + type=str, + help='Path to csv file with list of lids and manual search overrides to handle file name discrepancies', + ) parser.add_argument( '-o', '--output_directory_path', @@ -656,9 +831,10 @@ def process_directory(root_dir, output_dir, log_file): args = parser.parse_args() root_directory_path = args.root_directory_path + data_source = args.source + manual_search_override = args.manual_search_override output_directory_path = args.output_directory_path - log_file = os.path.join(root_directory_path, "processing_log.log") # root_directory_path = r'B:\FIM_development\fim_assessment\FOSS\ahps_benchmark\5_5_2024\nws' # output_directory_path = r'C:\GID\FOSS_FIM\ahps_max_stage_flow_preprocess\calibration_points\5_5_2024' - process_directory(root_directory_path, output_directory_path, log_file) + process_directory(root_directory_path, data_source, manual_search_override, output_directory_path) diff --git a/data/nws/merge_nws_usgs_point_parquet.py b/data/nws/merge_nws_usgs_point_parquet.py new file mode 100644 index 00000000..8cf73bb2 --- /dev/null +++ b/data/nws/merge_nws_usgs_point_parquet.py @@ -0,0 +1,147 @@ +import argparse +import os +import shutil + +import geopandas as gpd +import pandas as pd + + +def filter_magnitude(gdf, remove_max): + """ + Remove points with magnitude = 'maximum' if the remove_max flag is True. + """ + if remove_max: + return gdf[gdf['magnitude'] != 'maximum'] + return gdf + + +def combine_parquet_files(file1, file2, output_file, remove_max=False): + """ + Combine two parquet files and write to the output, with an option to remove points where magnitude = 'maximum'. + """ + try: + gdf1 = gpd.read_parquet(file1) + gdf2 = gpd.read_parquet(file2) + + # Optionally filter out points with magnitude = 'maximum' + gdf1 = filter_magnitude(gdf1, remove_max) + gdf2 = filter_magnitude(gdf2, remove_max) + + # Combine the two GeoDataFrames (concatenate rows) + combined_gdf = pd.concat([gdf1, gdf2], ignore_index=True) + + # Write the combined GeoDataFrame to a new parquet file + combined_gdf.to_parquet(output_file) + print(f"Combined file written: {output_file}") + + except Exception as e: + print(f"Error combining {file1} and {file2}: {e}") + + +def copy_file(source_file, destination_file, remove_max=False): + """ + Copy a file to the destination directory, with an option to remove points where magnitude = 'maximum'. + """ + try: + gdf = gpd.read_parquet(source_file) + + # Optionally filter out points with magnitude = 'maximum' + gdf = filter_magnitude(gdf, remove_max) + + # Save the filtered or unfiltered file to the destination + gdf.to_parquet(destination_file) + print(f"Copied file: {source_file} -> {destination_file}") + + except Exception as e: + print(f"Error copying {source_file}: {e}") + + +def process_directories(dir1, dir2, output_dir, remove_max=False): + """ + Process two directories to find matching .parquet files, combine points, and output the result. + """ + # Create the output directory if it doesn't exist + os.makedirs(output_dir, exist_ok=True) + + # Get all parquet files from both directories + dir1_files = {f for f in os.listdir(dir1) if f.endswith('.parquet')} + dir2_files = {f for f in os.listdir(dir2) if f.endswith('.parquet')} + + # Find matching and unmatched files + matching_files = dir1_files.intersection(dir2_files) + dir1_only_files = dir1_files - dir2_files + dir2_only_files = dir2_files - dir1_files + + # Process matching files (combine points) + combine_count = 0 + dir1_count = 0 + dir2_count = 0 + print("Merging duplicate files between dir1 and dir2...") + for filename in matching_files: + print(str(filename)) + file1 = os.path.join(dir1, filename) + file2 = os.path.join(dir2, filename) + output_file = os.path.join(output_dir, filename) + combine_parquet_files(file1, file2, output_file, remove_max) + combine_count += 1 + + print('Copying unique files from dir1 and dir2...') + # Copy unmatched files from dir1 + for filename in dir1_only_files: + source_file = os.path.join(dir1, filename) + destination_file = os.path.join(output_dir, filename) + copy_file(source_file, destination_file, remove_max) + dir1_count += 1 + + # Copy unmatched files from dir2 + for filename in dir2_only_files: + source_file = os.path.join(dir2, filename) + destination_file = os.path.join(output_dir, filename) + copy_file(source_file, destination_file, remove_max) + dir2_count += 1 + + print('Summary of processing:') + print(f'Total combined files: {combine_count}') + print(f'Source A only: {dir1_count}') + print(f'Source B only: {dir2_count}') + if len(os.listdir(output_dir)) != (combine_count + dir1_count + dir2_count): + print('Warning: discrepancy in the expected number of total files in output dir') + else: + print(f'Total files in output: {len(os.listdir(output_dir))}') + + +if __name__ == "__main__": + # Set up argument parsing + parser = argparse.ArgumentParser(description='Process directories for flood data.') + parser.add_argument( + '-a', + '--nws_directory_path', + type=str, + required=True, + help='Path to the directory containing the NWS parquet points.', + ) + parser.add_argument( + '-b', + '--usgs_directory_path', + type=str, + required=True, + help='Path to the directory containing the USGS parquet points.', + ) + parser.add_argument( + '-o', + '--output_directory_path', + type=str, + required=True, + help='Path to the directory where new output files will be saved.', + ) + parser.add_argument('-d', '--delete', action='store_true', help='Remove points with magnitude="maximum".') + + args = parser.parse_args() + nws_directory_path = args.nws_directory_path + usgs_directory_path = args.usgs_directory_path + output_directory_path = args.output_directory_path + + # Process parquet files with the option to remove "maximum" magnitude points + process_directories( + nws_directory_path, usgs_directory_path, output_directory_path, remove_max=args.delete + ) From 70ba4394fe581c4ed676c2ce6f937dc09cc4afe1 Mon Sep 17 00:00:00 2001 From: RyanSpies-NOAA Date: Fri, 10 Jan 2025 15:45:17 +0000 Subject: [PATCH 4/7] Update changelog --- docs/CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 4e2da186..1aee84e5 100755 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,17 @@ All notable changes to this project will be documented in this file. We follow the [Semantic Versioning 2.0.0](http://semver.org/) format. + +## v4.5.x.x - 2024-01-06 - [PR#1268](https://github.com/NOAA-OWP/inundation-mapping/pull/1268) + +This code preprocesses the partner FIM benchmark HEC-RAS libraries and converts the inundation extent polygons into a edge point database for the input to the HAND SRC calibration/adjustment algorithm. The key changes with the new input data are the addition of the max stage/flow points as well as the removal of the 10m grid point snapping. Note that the raw data to run this code is not available for external users, so the data processing code can only be run internally within OWP. + +### Additions +`data/nws/ahps_bench_polys_to_calb_pts.py`: this script ingests the HEC-RAS partner FIM benchmark data and outputs huc level parquet files containing the water edge points with associated attributes. +`data/nws/merge_nws_usgs_point_parquet.py`: the script combines the `nws` and `usgs` parquet point files created seperately by the `ahps_bench_polys_to_calb_pts.py` script + +

+ ## v4.5.13.1 - 2024-12-13 - [PR#1361](https://github.com/NOAA-OWP/inundation-mapping/pull/1361) This PR was triggered by two dep-bot PR's. One for Tornado, one for aiohttp. Upon further research, these two exist only as dependencies for Jupyter and Jupyterlab which were very out of date. Upgrading Jupyter/JupyterLab took care of the other two. From d4c032075228ff5f9ee8e9e7662ed0834bc42031 Mon Sep 17 00:00:00 2001 From: RyanSpies-NOAA Date: Fri, 24 Jan 2025 16:51:25 +0000 Subject: [PATCH 5/7] Update changelog --- docs/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index d6fae25b..6a31e821 100755 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,7 +2,7 @@ All notable changes to this project will be documented in this file. We follow the [Semantic Versioning 2.0.0](http://semver.org/) format. -## v4.5.x.x - 2024-01-06 - [PR#1268](https://github.com/NOAA-OWP/inundation-mapping/pull/1268) +## v4.5.x.x - 2025-01-24 - [PR#1268](https://github.com/NOAA-OWP/inundation-mapping/pull/1268) This code preprocesses the partner FIM benchmark HEC-RAS libraries and converts the inundation extent polygons into a edge point database for the input to the HAND SRC calibration/adjustment algorithm. The key changes with the new input data are the addition of the max stage/flow points as well as the removal of the 10m grid point snapping. Note that the raw data to run this code is not available for external users, so the data processing code can only be run internally within OWP. From d120aa71348a8b97e280cf540e1a520daedce2b6 Mon Sep 17 00:00:00 2001 From: RyanSpies-NOAA Date: Fri, 24 Jan 2025 19:00:49 +0000 Subject: [PATCH 6/7] Updated dir name for calb point using date. - Also removed an old variable with path to water edge db (no longer used) --- src/bash_variables.env | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/bash_variables.env b/src/bash_variables.env index d700202e..4acea34b 100644 --- a/src/bash_variables.env +++ b/src/bash_variables.env @@ -30,7 +30,7 @@ export input_nwm_lakes=${inputsDir}/nwm_hydrofabric/nwm_l export input_nwm_lakes_Alaska=${inputsDir}/nwm_hydrofabric/nwm_waterbodies_alaska.gpkg export input_WBD_gdb=${inputsDir}/wbd/WBD_National_EPSG_5070_WBDHU8_clip_dem_domain.gpkg export input_WBD_gdb_Alaska=${inputsDir}/wbd/WBD_National_South_Alaska.gpkg -export input_calib_points_dir=${inputsDir}/rating_curve/water_edge_database/calibration_points_testing/ +export input_calib_points_dir=${inputsDir}/rating_curve/water_edge_database/calibration_points_20250103/ export bathymetry_file=${inputsDir}/bathymetry/bathymetric_adjustment_data.gpkg export osm_bridges=${inputsDir}/osm/bridges/241002/osm_all_bridges.gpkg @@ -51,8 +51,6 @@ export ras2fim_input_dir=${inputsDir}/rating_curve/ras2fim_expo export ras_rating_curve_csv_filename=reformat_ras_rating_curve_table.csv export ras_rating_curve_gpkg_filename=reformat_ras_rating_curve_points.gpkg -export fim_obs_pnt_data=${inputsDir}/rating_curve/water_edge_database/usgs_nws_benchmark_points_cleaned.gpkg - # Input file location with HUC, nwm feature_id and manual calibration coefficients export man_calb_file=${inputsDir}/rating_curve/manual_calibration_coefficients.csv From 5cd935f1bcefe611718bd131e6a2a951d6f5c0f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CRobHanna-NOAA=E2=80=9D?= <“Robert.Hanna@NOAA.gov”> Date: Fri, 24 Jan 2025 20:24:58 +0000 Subject: [PATCH 7/7] merge dev and update changelog --- docs/CHANGELOG.md | 5 ++++- src/bash_variables.env | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 878626d7..ee636eee 100755 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,13 +2,16 @@ All notable changes to this project will be documented in this file. We follow the [Semantic Versioning 2.0.0](http://semver.org/) format. -## v4.5.x.x - 2025-01-24 - [PR#1268](https://github.com/NOAA-OWP/inundation-mapping/pull/1268) +## v4.5.14.1 - 2025-01-24 - [PR#1268](https://github.com/NOAA-OWP/inundation-mapping/pull/1268) This code preprocesses the partner FIM benchmark HEC-RAS libraries and converts the inundation extent polygons into a edge point database for the input to the HAND SRC calibration/adjustment algorithm. The key changes with the new input data are the addition of the max stage/flow points as well as the removal of the 10m grid point snapping. Note that the raw data to run this code is not available for external users, so the data processing code can only be run internally within OWP. ### Additions `data/nws/ahps_bench_polys_to_calb_pts.py`: this script ingests the HEC-RAS partner FIM benchmark data and outputs huc level parquet files containing the water edge points with associated attributes. `data/nws/merge_nws_usgs_point_parquet.py`: the script combines the `nws` and `usgs` parquet point files created seperately by the `ahps_bench_polys_to_calb_pts.py` script + +

+ ## v4.5.14.0 - 2025-01-24 - [PR#1340](https://github.com/NOAA-OWP/inundation-mapping/pull/1340) This PR focuses on adjusting rating curves by using bathymetric data and optimized channel roughness values. The bathymetry data includes eHydro surveys and AI-based datasets created for all NWM streams. New manning roughness values were developed for each feature-id using a differential evolution objective function (OF). The OF minimizes the number of the false_positives and false_negatives cells in our flood inundation maps where we have test cases across the CONUS. diff --git a/src/bash_variables.env b/src/bash_variables.env index 52918bf3..b2fab072 100644 --- a/src/bash_variables.env +++ b/src/bash_variables.env @@ -31,9 +31,9 @@ export input_nwm_lakes_Alaska=${inputsDir}/nwm_hydrofabric/nwm_ export input_WBD_gdb=${inputsDir}/wbd/WBD_National_EPSG_5070_WBDHU8_clip_dem_domain.gpkg export input_WBD_gdb_Alaska=${inputsDir}/wbd/WBD_National_South_Alaska.gpkg export input_calib_points_dir=${inputsDir}/rating_curve/water_edge_database/calibration_points_20250103/ -export bathy_file_ehydro=${inputsDir}/bathymetry/final_bathymetry_ehydro.gpkg -export bathy_file_aibased=${inputsDir}/bathymetry/ml_outputs_v1.01.parquet -export mannN_file_aibased=${inputsDir}/bathymetry/ml_outputs_v1.01.parquet +export bathy_file_ehydro=${inputsDir}/bathymetry/final_bathymetry_ehydro.gpkg +export bathy_file_aibased=${inputsDir}/bathymetry/ml_outputs_v1.01.parquet +export mannN_file_aibased=${inputsDir}/bathymetry/ml_outputs_v1.01.parquet export osm_bridges=${inputsDir}/osm/bridges/241002/osm_all_bridges.gpkg # input file location with nwm feature_id and recurrence flow values