Skip to content

Commit

Permalink
included buffer option to spatially filter stations using the basin. …
Browse files Browse the repository at this point in the history
…This sShould prevent loading too many stations in case of datasets with many stations
  • Loading branch information
shartgring committed Jan 10, 2025
1 parent 26875c4 commit 44baaa7
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 68 deletions.
89 changes: 60 additions & 29 deletions hydromt_wflow/wflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2721,6 +2721,7 @@ def setup_precip_from_point_timeseries(
precip_fn: Union[str, pd.Series, pd.DataFrame, xr.Dataset],
interp_type: str = "nearest",
precip_stations_fn: Union[str, gpd.GeoDataFrame] = None,
buffer: int = 100,
metpy_kwargs: dict = None,
**kwargs,
) -> None:
Expand Down Expand Up @@ -2767,7 +2768,11 @@ def setup_precip_from_point_timeseries(
precip_stations_fn : str, gpd.GeoDataFrame
Source for the locations of the stations as points: (x, y) or (lat, lon).
metpy_kwargs: dict
buffer: int, optional
Number of cells to use around the basins as a buffer to determine \
which stations to include. Set to 100 cells by default.
metpy_kwargs: dict, optional
Keyword arguments to pass to `metpy.interpolate.interpolate_to_grid`.
**kwargs
Expand All @@ -2788,16 +2793,56 @@ def setup_precip_from_point_timeseries(
freq = pd.to_timedelta(self.get_config("timestepsecs"), unit="s")
mask = self.grid[self._MAPS["basins"]].values > 0

# Use buffer around basins if provided, else limit to stations covered by basin
geom_buffered = self.basins.buffer(
buffer * max(np.abs(self.res))
) if buffer else self.basins

# Use basin centroid as 'station' for uniform case
if interp_type == "uniform":
precip_stations_fn = gpd.GeoDataFrame(
data=None,
geometry=[self.basins.unary_union.centroid],
index=["_station"],
)

# Load the stations and their coordinates
if (
isinstance(precip_stations_fn, gpd.GeoDataFrame)
or isfile(precip_stations_fn)
or precip_stations_fn in self.data_catalog
):
gdf_stations = self.data_catalog.get_geodataframe(
precip_stations_fn,
geom=geom_buffered,
assert_gtype="Point",
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
# Use station ids from gdf_stations when reading the DataFrame
_station_columns = gdf_stations.index
logger.info(f"""
Found {len(_station_columns)} stations in {precip_stations_fn} \
using a buffer of {buffer} cells.
""")
elif precip_stations_fn is None:
_station_columns = None
else:
raise ValueError(f"Data source {precip_stations_fn} not recognized.")

# Check data type of precip_fn if it is provided through the data catalog
if isinstance(precip_fn, str) and precip_fn in self.data_catalog:
_data_type = self.data_catalog[precip_fn].data_type
else:
_data_type = None

# Load precipitation data as DataFrame
# This requires that station data is provided (precip_stations_fn)
# Convert Series to DataFrame with dummy station name "_station"
if isinstance(precip_fn, pd.Series):
precip_fn = precip_fn.to_frame()
precip_fn.columns = ["_station"]

# Load precipitation data as DataFrame
# This requires that station data is provided (precip_stations_fn)
if (
isinstance(precip_fn, pd.DataFrame)
or isfile(precip_fn)
Expand All @@ -2806,6 +2851,7 @@ def setup_precip_from_point_timeseries(
df_precip = self.data_catalog.get_dataframe(
precip_fn,
time_tuple=(starttime, endtime),
variables=_station_columns,
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
Expand All @@ -2816,27 +2862,28 @@ def setup_precip_from_point_timeseries(
station locations are provided separately through precip_station_fn.
"""
)

# Use model centroid as station for uniform precipitation
# and nearest-neighbour with the centroid as single station
# For uniform: switch to nearest-neighbour which works with single station
if interp_type == "uniform":
if df_precip.shape[1] != 1:
raise ValueError(
f"""
Data source ({precip_fn}) should contain
a single timeseries, not {df_precip.shape[1]}."""
)
precip_stations_fn = gpd.GeoDataFrame(
data=None,
geometry=[self.basins.unary_union.centroid],
index=["_station"],
)
interp_type = "nearest"

# For other methods: raise error when a DataFrame is loaded without stations
elif precip_stations_fn is None:
raise ValueError(
"""
Using a DataFrame as precipitation source requires that
station locations are provided separately through precip_station_fn.
"""
)

# Load precip as GeoDataset, which does not require precip_stations_fn
elif isinstance(precip_fn, xr.Dataset) or _data_type == "GeoDataset":
# TODO:
# da_precip = self.data_catalog.get_geodataset(...)
# da_precip = self.data_catalog.get_geodataset(...) using geom_buffered
# df_precip = pd.DataFrame(...)
# precip_stations_fn = gpd.GeoDataFrame(...)
raise NotImplementedError("GeoDataset source not yet implented.")
Expand All @@ -2845,22 +2892,6 @@ def setup_precip_from_point_timeseries(

df_precip = df_precip.astype("float32")

# Load the stations and their coordinates
# or pass the GeoDataframe obtained from precip_fn
if (
isinstance(precip_stations_fn, gpd.GeoDataFrame)
or isfile(precip_stations_fn)
or precip_stations_fn in self.data_catalog
):
gdf_stations = self.data_catalog.get_geodataframe(
precip_stations_fn,
assert_gtype="Point",
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
else:
raise ValueError(f"Data source {precip_stations_fn} not recognized.")

# Align precip timeseries and available stations
logger.info(
f"""
Expand Down
76 changes: 37 additions & 39 deletions tests/test_model_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,45 +742,43 @@ def test_setup_floodplains_2d(elevtn_map, example_wflow_model, floodplain1d_test
def test_setup_precip_from_point_timeseries(
example_wflow_model, df_precip_stations, gdf_precip_stations
):
# # Interpolation types and the mean value to check the test
# # first value is for all 8 stations, second for 3 stations inside Piave
# # Note: start_time and end_time of model are used to slice timeseries
# interp_types = {
# "nearest": [445, 433],
# "linear": [458, 417],
# "cubic": [441, 417],
# "rbf": [440, 424],
# "barnes": [447, 434],
# # TODO natural neighbor is not working yet in test and code
# # TODO cressman gives weird results (uniform)
# }
# gdf_precip_stations_inside = gdf_precip_stations[
# gdf_precip_stations.within(example_wflow_model.basins.unary_union)
# ]

# for interp_type, test_val in interp_types.items():
# example_wflow_model.setup_precip_from_point_timeseries(
# precip_fn=df_precip_stations,
# precip_stations_fn=gdf_precip_stations,
# interp_type=interp_type,
# )
# # Check forcing and dtype
# assert "precip" in example_wflow_model.forcing
# assert example_wflow_model.forcing["precip"].dtype == "float32"

# # Compare computed value with expected value using all stations
# mean_all = example_wflow_model.forcing["precip"].mean().values
# assert int(mean_all * 1000) == test_val[0]

# # Do the some but only for the 3 stations inside the basin
# assert len(gdf_precip_stations_inside) == 3
# example_wflow_model.setup_precip_from_point_timeseries(
# precip_fn=df_precip_stations,
# precip_stations_fn=gdf_precip_stations_inside,
# interp_type=interp_type,
# )
# mean_inside = example_wflow_model.forcing["precip"].mean().values
# assert int(mean_inside * 1000) == test_val[1]
# Interpolation types and the mean value to check the test
# first value is for all 8 stations, second for 3 stations inside Piave
# Note: start_time and end_time of model are used to slice timeseries
interp_types = {
"nearest": [445, 433],
"linear": [458, 417],
"cubic": [441, 417],
"rbf": [440, 424],
"barnes": [447, 434],
# TODO natural neighbor is not working yet in test and code
# TODO cressman gives weird results (uniform)
}

for interp_type, test_val in interp_types.items():
example_wflow_model.setup_precip_from_point_timeseries(
precip_fn=df_precip_stations,
precip_stations_fn=gdf_precip_stations,
interp_type=interp_type,
buffer=100,
)
# Check forcing and dtype
assert "precip" in example_wflow_model.forcing
assert example_wflow_model.forcing["precip"].dtype == "float32"

# Compare computed value with expected value using all stations
mean_all = example_wflow_model.forcing["precip"].mean().values
assert int(mean_all * 1000) == test_val[0]

# Do the some but only for the 3 stations inside the basin (buffer = 0)
example_wflow_model.setup_precip_from_point_timeseries(
precip_fn=df_precip_stations,
precip_stations_fn=gdf_precip_stations,
interp_type=interp_type,
buffer=0,
)
mean_inside = example_wflow_model.forcing["precip"].mean().values
assert int(mean_inside * 1000) == test_val[1]

# Also include a test for uniform precipitation
example_wflow_model.setup_precip_from_point_timeseries(
Expand Down

0 comments on commit 44baaa7

Please sign in to comment.