[1pt] PR: Replace fiona with pyogrio #1077

Merged
merged 10 commits on Feb 16, 2024
5 changes: 3 additions & 2 deletions Pipfile
@@ -9,7 +9,7 @@ ipython = "==8.12.0"
[packages]
certifi = "==2023.7.22"
fiona = "==1.8.22"
geopandas = "==0.12.2"
geopandas = "==0.14.3"
numba = "==0.56.4"
numpy = "==1.23.5"
pandas = "==2.0.2"
@@ -43,8 +43,9 @@ flake8-pyproject = "==1.2.3"
pre-commit = "==3.3.3"
isort = "==5.12.0"
urllib3 = "==1.26.18"
pyflwdir = "*"
pyflwdir = "==0.5.8"
pillow = "==10.2.0"
pyogrio = "==0.7.2"

[requires]
python_version = "3.10"
455 changes: 245 additions & 210 deletions Pipfile.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions data/bathymetry/preprocess_bathymetry.py
@@ -68,7 +68,7 @@ def preprocessing_ehydro(tif, bathy_bounds, survey_gdb, output, min_depth_thresh
bathy_gdal = gdal_array.OpenArray(bathy_m)

# Read in shapefiles
bathy_bounds = gpd.read_file(survey_gdb, layer=bathy_bounds)
bathy_bounds = gpd.read_file(survey_gdb, layer=bathy_bounds, engine="pyogrio", use_arrow=True)
nwm_streams = gpd.read_file("/data/inputs/nwm_hydrofabric/nwm_flows.gpkg", mask=bathy_bounds)
nwm_catchments = gpd.read_file("/data/inputs/nwm_hydrofabric/nwm_catchments.gpkg", mask=bathy_bounds)
bathy_bounds = bathy_bounds.to_crs(nwm_streams.crs)
@@ -125,7 +125,7 @@ def preprocessing_ehydro(tif, bathy_bounds, survey_gdb, output, min_depth_thresh
bathy_nwm_streams = bathy_nwm_streams.to_crs(epsg=5070)
if os.path.exists(output):
print(f"{output} already exists. Concatenating now...")
existing_bathy_file = gpd.read_file(output)
existing_bathy_file = gpd.read_file(output, engine="pyogrio", use_arrow=True)
bathy_nwm_streams = pd.concat([existing_bathy_file, bathy_nwm_streams])
bathy_nwm_streams.to_file(output, index=False)
print(f"Added {num_streams} new NWM features")
3 changes: 3 additions & 0 deletions data/ble/ble_benchmark/create_flow_forecast_file.py
@@ -9,6 +9,9 @@
import pandas as pd


gpd.options.io_engine = "pyogrio"


def create_flow_forecast_file(
huc,
ble_geodatabase,
3 changes: 3 additions & 0 deletions data/esri.py
@@ -5,6 +5,9 @@
from tqdm import tqdm


gpd.options.io_engine = "pyogrio"


class ESRI_REST(object):
"""
This class was built for querying ESRI REST endpoints for the purpose of downloading datasets.
3 changes: 3 additions & 0 deletions data/nld/levee_download.py
@@ -14,6 +14,9 @@
from utils.shared_variables import DEFAULT_FIM_PROJECTION_CRS


gpd.options.io_engine = "pyogrio"


epsg_code = re.search(r'\d+$', DEFAULT_FIM_PROJECTION_CRS).group()
today = datetime.now().strftime('%y%m%d')
nld_vector_output = os.path.join(INPUTS_DIR, 'nld_vectors', f'System_Routes_NLDFS_5070_{today}.gpkg')
3 changes: 3 additions & 0 deletions data/usgs/acquire_and_preprocess_3dep_dems.py
@@ -19,6 +19,9 @@
from utils.shared_functions import FIM_Helpers as fh


gpd.options.io_engine = "pyogrio"


'''
TODO:
- Add input args for resolution size, which means URL and block size also have to be parameterized.
4 changes: 2 additions & 2 deletions data/wbd/clip_vectors_to_wbd.py
@@ -41,8 +41,8 @@ def subset_vector_layers(
with rio.open(dem_filename) as dem_raster:
dem_cellsize = max(dem_raster.res)

wbd = gpd.read_file(wbd_filename)
dem_domain = gpd.read_file(dem_domain)
wbd = gpd.read_file(wbd_filename, engine="pyogrio", use_arrow=True)
dem_domain = gpd.read_file(dem_domain, engine="pyogrio", use_arrow=True)

# Get wbd buffer
print(f"Create wbd buffer for {hucCode}", flush=True)
3 changes: 3 additions & 0 deletions data/wbd/preprocess_wbd.py
@@ -9,6 +9,9 @@
from utils.shared_variables import DEFAULT_FIM_PROJECTION_CRS


gpd.options.io_engine = "pyogrio"


def clip_wbd_to_dem_domain(dem: str, wbd_in: str, wbd_out: str, huc_level: int):
"""
Clips Watershed Boundary Dataset (WBD) to DEM domain
3 changes: 3 additions & 0 deletions data/write_parquet_from_calib_pts.py
@@ -11,6 +11,9 @@
from dotenv import load_dotenv


gpd.options.io_engine = "pyogrio"


######################################################################################################
# #
# Overview: #
18 changes: 18 additions & 0 deletions docs/CHANGELOG.md
@@ -1,6 +1,24 @@
All notable changes to this project will be documented in this file.
We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.

## v4.4.x.x - 2024-02-13 - [PR#1077](https://github.com/NOAA-OWP/inundation-mapping/pull/1077)

Replace `fiona` with `pyogrio` to improve I/O speed; `geopandas` will use `pyogrio` by default starting with version 1.0. An environment variable was also added so that `pyogrio` uses `pyarrow`, further speeding up I/O. As a result of the changes in this PR, `fim_pipeline.sh` runs approximately 10% faster.
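
A minimal sketch of the read patterns this PR moves to, using hypothetical file paths: the engine can be set globally via `gpd.options.io_engine` or per call with `engine="pyogrio"`, and `use_arrow=True` enables the `pyarrow`-backed path; reads that pass a `mask=` keep `engine="fiona"` in this PR.

```python
import geopandas as gpd

# Option 1: set pyogrio as the default I/O engine for every read/write in a script
gpd.options.io_engine = "pyogrio"

# Option 2: choose the engine per call; use_arrow=True enables the
# pyarrow-backed read path for an extra speedup
wbd = gpd.read_file("wbd.gpkg", engine="pyogrio", use_arrow=True)

# Reads that pass a mask keep engine="fiona" in this PR
streams = gpd.read_file("nwm_flows.gpkg", mask=wbd, engine="fiona")
```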

### Changes

- `Pipfile`: Upgraded `geopandas` from v0.12.2 to v0.14.3, added `pyogrio` v0.7.2, and pinned `pyflwdir` to v0.5.8.
- `src/bash_variables.env`: Added the `PYOGRIO_USE_ARROW=1` environment variable so that `pyogrio` uses `pyarrow`.
- Added `pyogrio` (and `pyarrow`, where applicable) to all of the following files:
- `data/`
- `bathymetry/preprocess_bathymetry.py`, `ble/ble_benchmark/create_flow_forecast_file.py`, `esri.py`, `nld/levee_download.py`, `usgs/acquire_and_preprocess_3dep_dems.py`, `wbd/clip_vectors_to_wbd.py`, `wbd/preprocess_wbd.py`, `write_parquet_from_calib_pts.py`
- `src/`
- `add_crosswalk.py`, `associate_levelpaths_with_levees.py`, `bathy_rc_adjust.py`, `bathymetric_adjustment.py`, `buffer_stream_branches.py`, `build_stream_traversal.py`, `crosswalk_nwm_demDerived.py`, `derive_headwaters.py`, `derive_level_paths.py`, `edit_points.py`, `filter_catchments_and_add_attributes.py`, `finalize_srcs.py`, `make_stages_and_catchlist.py`, `mask_dem.py`, `reachID_grid_to_vector_points.py`, `split_flows.py`, `src_adjust_spatial_obs.py`, `src_roughness_optimization.py`, `stream_branches.py`, `subset_catch_list_by_branch_id.py`, `usgs_gage_crosswalk.py`, `usgs_gage_unit_setup.py`, `utils/shared_functions.py`
- `tools/`
- `adjust_rc_with_feedback.py`, `check_deep_flooding.py`, `create_flow_forecast_file.py`, `eval_plots.py`, `evaluate_continuity.py`, `evaluate_crosswalk.py`, `fimr_to_benchmark.py`, `find_max_catchment_breadth.py`, `generate_categorical_fim.py`, `generate_categorical_fim_flows.py`, `generate_categorical_fim_mapping.py`, `generate_nws_lid.py`, `hash_compare.py`, `inundate_events.py`, `inundation.py`, `make_boxes_from_bounds.py`, `mosaic_inundation.py`, `overlapping_inundation.py`, `rating_curve_comparison.py`, `rating_curve_get_usgs_curves.py`, `test_case_by_hydro_id.py`, `tools_shared_functions.py`

<br/><br/>

## v4.4.10.0 - 2024-02-02 - [PR#1054](https://github.com/NOAA-OWP/inundation-mapping/pull/1054)

Recent testing exposed a bug in the `acquire_and_preprocess_3dep_dems.py` script: it had lost the ability to be re-run to find files that failed in earlier attempts and try them again. The regression may have stemmed from confusion over the word "retry". Now "retry" means restart the entire run, and a new flag called "repair" has been added, meaning fix what failed earlier. This is a key feature because communication failures are common when calling USGS to download DEMs, and with some runs taking many hours, it becomes important.
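
A minimal sketch of how the two flags described above might be exposed via `argparse` (flag names follow the changelog wording; the script's actual CLI is not shown in this diff):

```python
import argparse

parser = argparse.ArgumentParser(description="Acquire and preprocess 3DEP DEMs")
# Hypothetical wiring of the two behaviors described above
parser.add_argument("--retry", action="store_true",
                    help="Restart the entire run from scratch")
parser.add_argument("--repair", action="store_true",
                    help="Re-attempt only the DEM downloads that failed in an earlier run")
args = parser.parse_args()
```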
10 changes: 5 additions & 5 deletions src/add_crosswalk.py
@@ -37,10 +37,10 @@ def add_crosswalk(
min_stream_length,
calibration_mode=False,
):
input_catchments = gpd.read_file(input_catchments_fileName)
input_flows = gpd.read_file(input_flows_fileName)
input_huc = gpd.read_file(input_huc_fileName)
input_nwmflows = gpd.read_file(input_nwmflows_fileName)
input_catchments = gpd.read_file(input_catchments_fileName, engine="pyogrio", use_arrow=True)
input_flows = gpd.read_file(input_flows_fileName, engine="pyogrio", use_arrow=True)
input_huc = gpd.read_file(input_huc_fileName, engine="pyogrio", use_arrow=True)
input_nwmflows = gpd.read_file(input_nwmflows_fileName, engine="pyogrio", use_arrow=True)
min_catchment_area = float(min_catchment_area) # 0.25#
min_stream_length = float(min_stream_length) # 0.5#

@@ -87,7 +87,7 @@

elif (extent == 'MS') | (extent == 'GMS'):
## crosswalk using stream segment midpoint method
input_nwmcat = gpd.read_file(input_nwmcat_fileName, mask=input_huc)
input_nwmcat = gpd.read_file(input_nwmcat_fileName, mask=input_huc, engine="fiona")

# only reduce nwm catchments to mainstems if running mainstems
if extent == 'MS':
3 changes: 3 additions & 0 deletions src/associate_levelpaths_with_levees.py
@@ -9,6 +9,9 @@
import pandas as pd


gpd.options.io_engine = "pyogrio"


def associate_levelpaths_with_levees(
levees_filename: str,
levee_id_attribute: str,
3 changes: 3 additions & 0 deletions src/bash_variables.env
@@ -39,6 +39,9 @@ export fim_obs_pnt_data=${inputsDir}/rating_curve/water_edge_da
# Input file location with HUC, nwm feature_id and manual calibration coefficients
export man_calb_file=${inputsDir}/rating_curve/manual_calibration_coefficients.csv

# Use pyarrow
export PYOGRIO_USE_ARROW=1


# Styling
export startDiv="\n-----------------------------------------------------------------\n"
3 changes: 3 additions & 0 deletions src/bathy_rc_adjust.py
@@ -7,6 +7,9 @@
import pandas as pd


gpd.options.io_engine = "pyogrio"


sa_ratio_flag = float(environ['surf_area_thalweg_ratio_flag']) # 10x
thal_stg_limit = float(environ['thalweg_stg_search_max_limit']) # 3m
bankful_xs_ratio_flag = float(environ['bankful_xs_area_ratio_flag']) # 10x
8 changes: 5 additions & 3 deletions src/bathymetric_adjustment.py
@@ -40,7 +40,7 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose):

# Load wbd and use it as a mask to pull the bathymetry data
fim_huc_dir = join(fim_dir, huc)
wbd8_clp = gpd.read_file(join(fim_huc_dir, 'wbd8_clp.gpkg'))
wbd8_clp = gpd.read_file(join(fim_huc_dir, 'wbd8_clp.gpkg'), engine="pyogrio", use_arrow=True)
bathy_data = gpd.read_file(bathy_file, mask=wbd8_clp)
bathy_data = bathy_data.rename(columns={'ID': 'feature_id'})

@@ -171,9 +171,11 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb
# NOTE: This block can be removed if we have estimated bathymetry data for
# the whole domain later.
fim_hucs = [h for h in os.listdir(fim_dir) if re.match(r'\d{8}', h)]
bathy_gdf = gpd.read_file(bathy_file)
bathy_gdf = gpd.read_file(bathy_file, engine="pyogrio", use_arrow=True)
buffered_bathy = bathy_gdf.geometry.buffer(wbd_buffer) # We buffer the bathymetric data to get adjacent
wbd = gpd.read_file(wbd, mask=buffered_bathy) # HUCs that could also have bathymetric reaches included
wbd = gpd.read_file(
wbd, mask=buffered_bathy, engine="fiona"
) # HUCs that could also have bathymetric reaches included
hucs_with_bathy = wbd.HUC8.to_list()
hucs = [h for h in fim_hucs if h in hucs_with_bathy]
log_file.write(f"Identified {len(hucs)} HUCs that have bathymetric data: {hucs}\n")
3 changes: 3 additions & 0 deletions src/buffer_stream_branches.py
@@ -8,6 +8,9 @@
from stream_branches import StreamBranchPolygons, StreamNetwork


gpd.options.io_engine = "pyogrio"


def buffer_stream_branches(
streams_file: str,
branch_id_attribute: str,
3 changes: 3 additions & 0 deletions src/build_stream_traversal.py
@@ -4,6 +4,9 @@
import geopandas as gpd


gpd.options.io_engine = "pyogrio"


'''
Description:
This tool creates unique IDs for each segment and builds the To_Node, From_Node, and NextDownID
3 changes: 3 additions & 0 deletions src/crosswalk_nwm_demDerived.py
@@ -12,6 +12,9 @@
from utils.shared_variables import FIM_ID


gpd.options.io_engine = "pyogrio"


def Crosswalk_nwm_demDerived(
nwm_streams,
demDerived,
3 changes: 3 additions & 0 deletions src/derive_headwaters.py
@@ -8,6 +8,9 @@
from utils.shared_functions import getDriver


gpd.options.io_engine = "pyogrio"


def findHeadWaterPoints(flows):
flows = flows.explode(index_parts=True)
headwater_points = []
3 changes: 3 additions & 0 deletions src/derive_level_paths.py
@@ -10,6 +10,9 @@
from utils.fim_enums import FIM_exit_codes


gpd.options.io_engine = "pyogrio"


def Derive_level_paths(
in_stream_network,
wbd,
3 changes: 3 additions & 0 deletions src/edit_points.py
@@ -5,6 +5,9 @@
import geopandas as gpd


gpd.options.io_engine = "pyogrio"


def Edit_points(
stream_reaches,
branch_id_attribute,
3 changes: 3 additions & 0 deletions src/filter_catchments_and_add_attributes.py
@@ -10,6 +10,9 @@
from utils.shared_variables import FIM_ID


gpd.options.io_engine = "pyogrio"


def filter_catchments_and_add_attributes(
input_catchments_filename,
input_flows_filename,
5 changes: 2 additions & 3 deletions src/finalize_srcs.py
@@ -1,13 +1,12 @@
#!/usr/bin/env python3

import argparse
import json

import geopandas as gpd
import pandas as pd
from numpy import unique

from utils.shared_functions import getDriver

gpd.options.io_engine = "pyogrio"


def finalize_srcs(srcbase, srcfull, hydrotable, output_srcfull=None, output_hydrotable=None):
4 changes: 3 additions & 1 deletion src/make_stages_and_catchlist.py
@@ -1,12 +1,14 @@
#!/usr/bin/env python3

import argparse
import sys

import geopandas as gpd
import numpy as np


gpd.options.io_engine = "pyogrio"


def make_stages_and_catchlist(
flows_filename,
catchments_filename,
3 changes: 3 additions & 0 deletions src/mask_dem.py
@@ -11,6 +11,9 @@
from rasterio.mask import mask


gpd.options.io_engine = "pyogrio"


def mask_dem(
dem_filename: str,
nld_filename: str,
3 changes: 3 additions & 0 deletions src/reachID_grid_to_vector_points.py
@@ -11,6 +11,9 @@
from utils.shared_variables import PREP_PROJECTION


gpd.options.io_engine = "pyogrio"


def convert_grid_cells_to_points(raster, index_option, output_points_filename=False):
# Input raster
if isinstance(raster, str):
3 changes: 3 additions & 0 deletions src/split_flows.py
@@ -29,6 +29,9 @@
from utils.shared_variables import FIM_ID


gpd.options.io_engine = "pyogrio"


def split_flows(
flows_filename,
dem_filename,
3 changes: 3 additions & 0 deletions src/src_adjust_spatial_obs.py
@@ -20,6 +20,9 @@
)


gpd.options.io_engine = "pyogrio"


# Import variables from .env file
load_dotenv('/foss_fim/src/bash_variables.env')
outputsDir = os.getenv("outputsDir")
3 changes: 3 additions & 0 deletions src/src_roughness_optimization.py
@@ -16,6 +16,9 @@
from utils.shared_variables import DOWNSTREAM_THRESHOLD, ROUGHNESS_MAX_THRESH, ROUGHNESS_MIN_THRESH


gpd.options.io_engine = "pyogrio"


def update_rating_curve(
fim_directory,
water_edge_median_df,
4 changes: 3 additions & 1 deletion src/stream_branches.py
@@ -22,8 +22,10 @@
from utils.shared_variables import PREP_CRS


class StreamNetwork(gpd.GeoDataFrame):
gpd.options.io_engine = "pyogrio"


class StreamNetwork(gpd.GeoDataFrame):
"""
Notes:
- Many of the methods support two attributes called branch_id_attribute and values_excluded.
3 changes: 3 additions & 0 deletions src/subset_catch_list_by_branch_id.py
@@ -9,6 +9,9 @@
from stream_branches import StreamNetwork


gpd.options.io_engine = "pyogrio"


def Subset_catch_list(
catch_list, stream_network, branch_id_attribute, branch_id_list=None, out_catch_list=None, verbose=False
):
2 changes: 2 additions & 0 deletions src/usgs_gage_crosswalk.py
@@ -9,6 +9,8 @@
import rasterio


gpd.options.io_engine = "pyogrio"

warnings.simplefilter("ignore")

