diff --git a/README.md b/README.md index 7675fff..4a4f43b 100644 --- a/README.md +++ b/README.md @@ -105,9 +105,12 @@ The following code applies the geometric terrain correction to the VV polarizati ```python >>> import sarsen ->>> gtc = sarsen.terrain_correction( +>>> product = sarsen.Sentinel1SarProduct( ... "tests/data/S1B_IW_GRDH_1SDV_20211223T051122_20211223T051147_030148_039993_5371.SAFE", ... measurement_group="IW/VV", +... ) +>>> gtc = sarsen.terrain_correction( +... product, ... dem_urlpath="tests/data/Rome-30m-DEM.tif", ... ) @@ -117,8 +120,7 @@ The radiometric correction can be activated using the key `correct_radiometry`: ```python >>> rtc = sarsen.terrain_correction( -... "tests/data/S1B_IW_GRDH_1SDV_20211223T051122_20211223T051147_030148_039993_5371.SAFE", -... measurement_group="IW/VV", +... product, ... dem_urlpath="tests/data/Rome-30m-DEM.tif", ... correct_radiometry="gamma_nearest" ... ) diff --git a/sarsen/__init__.py b/sarsen/__init__.py index b775f60..22afd98 100644 --- a/sarsen/__init__.py +++ b/sarsen/__init__.py @@ -21,5 +21,7 @@ __version__ = "999" from .apps import terrain_correction +from .datamodel import SarProduct +from .sentinel1 import Sentinel1SarProduct -__all__ = ["__version__", "terrain_correction"] +__all__ = ["__version__", "terrain_correction", "SarProduct", "Sentinel1SarProduct"] diff --git a/sarsen/__main__.py b/sarsen/__main__.py index 1380238..efc963b 100644 --- a/sarsen/__main__.py +++ b/sarsen/__main__.py @@ -4,7 +4,7 @@ import typer -from . import apps +from . import apps, sentinel1 app = typer.Typer() @@ -14,7 +14,7 @@ def info( product_urlpath: str, ) -> None: """Print information about the Sentinel-1 product.""" - product_info = apps.product_info(product_urlpath) + product_info = sentinel1.product_info(product_urlpath) for key, value in product_info.items(): print(f"{key}: {value}") @@ -33,9 +33,12 @@ def gtc( client_kwargs = json.loads(client_kwargs_json) real_chunks = chunks if chunks > 0 else None logging.basicConfig(level=logging.INFO) - apps.terrain_correction( + product = sentinel1.Sentinel1SarProduct( product_urlpath, measurement_group, + ) + apps.terrain_correction( + product, dem_urlpath, output_urlpath=output_urlpath, enable_dask_distributed=enable_dask_distributed, @@ -59,9 +62,12 @@ def stc( client_kwargs = json.loads(client_kwargs_json) real_chunks = chunks if chunks > 0 else None logging.basicConfig(level=logging.INFO) - apps.terrain_correction( + product = sentinel1.Sentinel1SarProduct( product_urlpath, measurement_group, + ) + apps.terrain_correction( + product, dem_urlpath, correct_radiometry="gamma_bilinear", simulated_urlpath=simulated_urlpath, @@ -87,9 +93,12 @@ def rtc( client_kwargs = json.loads(client_kwargs_json) real_chunks = chunks if chunks > 0 else None logging.basicConfig(level=logging.INFO) - apps.terrain_correction( + product = sentinel1.Sentinel1SarProduct( product_urlpath, measurement_group, + ) + apps.terrain_correction( + product, dem_urlpath, correct_radiometry="gamma_bilinear", output_urlpath=output_urlpath, diff --git a/sarsen/apps.py b/sarsen/apps.py index c982021..86b1e8b 100644 --- a/sarsen/apps.py +++ b/sarsen/apps.py @@ -1,129 +1,22 @@ -from __future__ import annotations - -import functools import logging -from typing import Any, Dict, Optional, Tuple, TypeVar, Union +from typing import Any, Callable, Dict, Optional, Tuple from unittest import mock -import attrs import numpy as np import rioxarray import xarray as xr -import xarray_sentinel -from . import chunking, geocoding, orbit, radiometry, scene +from . import chunking, datamodel, geocoding, orbit, radiometry, scene logger = logging.getLogger(__name__) -T_SarProduct = TypeVar("T_SarProduct", bound="SarProduct") - - -@attrs.define(slots=False) -class SarProduct: - product_urlpath: str - measurement_group: str - measurement_chunks: int = 2048 - kwargs: Dict[str, Any] = {} - - @functools.cached_property - def measurement(self) -> xr.Dataset: - ds, self.kwargs = open_dataset_autodetect( - self.product_urlpath, - group=self.measurement_group, - chunks=self.measurement_chunks, - **self.kwargs, - ) - return ds - - @functools.cached_property - def orbit(self) -> xr.Dataset: - ds, self.kwargs = open_dataset_autodetect( - self.product_urlpath, group=f"{self.measurement_group}/orbit", **self.kwargs - ) - return ds - - @functools.cached_property - def calibration(self) -> xr.Dataset: - ds, self.kwargs = open_dataset_autodetect( - self.product_urlpath, - group=f"{self.measurement_group}/calibration", - **self.kwargs, - ) - return ds - - @functools.cached_property - def coordinate_conversion(self) -> xr.Dataset: - ds, self.kwargs = open_dataset_autodetect( - self.product_urlpath, - group=f"{self.measurement_group}/coordinate_conversion", - **self.kwargs, - ) - return ds - - -def open_dataset_autodetect( - product_urlpath: str, - group: Optional[str] = None, - chunks: Optional[Union[int, Dict[str, int]]] = None, - **kwargs: Any, -) -> Tuple[xr.Dataset, Dict[str, Any]]: - kwargs.setdefault("engine", "sentinel-1") - try: - ds = xr.open_dataset(product_urlpath, group=group, chunks=chunks, **kwargs) - except FileNotFoundError: - # re-try with Planetary Computer option - kwargs[ - "override_product_files" - ] = "{dirname}/{prefix}{swath}-{polarization}{ext}" - ds = xr.open_dataset(product_urlpath, group=group, chunks=chunks, **kwargs) - return ds, kwargs - - -def product_info( - product_urlpath: str, - **kwargs: Any, -) -> Dict[str, Any]: - """Get information about the Sentinel-1 product.""" - root_ds = xr.open_dataset( - product_urlpath, engine="sentinel-1", check_files_exist=True, **kwargs - ) - - measurement_groups = [g for g in root_ds.attrs["subgroups"] if g.count("/") == 1] - - gcp_group = measurement_groups[0] + "/gcp" - - gcp, kwargs = open_dataset_autodetect(product_urlpath, group=gcp_group, **kwargs) - - bbox = [ - gcp.attrs["geospatial_lon_min"], - gcp.attrs["geospatial_lat_min"], - gcp.attrs["geospatial_lon_max"], - gcp.attrs["geospatial_lat_max"], - ] - - product_attrs = [ - "product_type", - "mode", - "swaths", - "transmitter_receiver_polarisations", - ] - product_info = {attr_name: root_ds.attrs[attr_name] for attr_name in product_attrs} - product_info.update( - { - "measurement_groups": measurement_groups, - "geospatial_bounds": gcp.attrs["geospatial_bounds"], - "geospatial_bbox": bbox, - } - ) - - return product_info - - def simulate_acquisition( dem_ecef: xr.DataArray, position_ecef: xr.DataArray, - coordinate_conversion: Optional[xr.Dataset] = None, + slant_range_time_to_ground_range: Callable[ + [xr.DataArray, xr.DataArray], xr.DataArray + ], correct_radiometry: Optional[str] = None, ) -> xr.Dataset: """Compute the image coordinates of the DEM given the satellite orbit.""" @@ -139,13 +32,12 @@ def simulate_acquisition( acquisition["slant_range_time"] = slant_range_time - if coordinate_conversion is not None: - ground_range = xarray_sentinel.slant_range_time_to_ground_range( - acquisition.azimuth_time, - slant_range_time, - coordinate_conversion, - ) - acquisition["ground_range"] = ground_range.drop_vars("azimuth_time") + maybe_ground_range = slant_range_time_to_ground_range( + acquisition.azimuth_time, + slant_range_time, + ) + if maybe_ground_range is not None: + acquisition["ground_range"] = maybe_ground_range.drop_vars("azimuth_time") if correct_radiometry is not None: gamma_area = radiometry.compute_gamma_area( dem_ecef, acquisition.dem_distance / slant_range @@ -157,22 +49,8 @@ def simulate_acquisition( return acquisition -def calibrate_measurement( - measurement_ds: xr.Dataset, beta_nought_lut: xr.DataArray -) -> xr.DataArray: - measurement = measurement_ds.measurement - if measurement.attrs["product_type"] == "SLC" and measurement.attrs["mode"] == "IW": - measurement = xarray_sentinel.mosaic_slc_iw(measurement) - - beta_nought = xarray_sentinel.calibrate_intensity(measurement, beta_nought_lut) - beta_nought = beta_nought.drop_vars(["pixel", "line"]) - - return beta_nought - - def terrain_correction( - product_urlpath: str, - measurement_group: str, + product: datamodel.SarProduct, dem_urlpath: str, output_urlpath: Optional[str] = "GTC.tif", simulated_urlpath: Optional[str] = None, @@ -181,17 +59,14 @@ def terrain_correction( grouping_area_factor: Tuple[float, float] = (3.0, 3.0), open_dem_raster_kwargs: Dict[str, Any] = {}, chunks: Optional[int] = 1024, - measurement_chunks: int = 1024, radiometry_chunks: int = 2048, radiometry_bound: int = 128, enable_dask_distributed: bool = False, client_kwargs: Dict[str, Any] = {"processes": False}, - **kwargs: Any, ) -> xr.DataArray: """Apply the terrain-correction to sentinel-1 SLC and GRD products. - :param product_urlpath: input product path or url - :param measurement_group: group of the measurement to be used, for example: "IW/VV" + :param product: SarProduct instance representing the input data :param dem_urlpath: dem path or url :param orbit_group: overrides the orbit group name :param calibration_group: overrides the calibration group name @@ -214,7 +89,6 @@ def terrain_correction( Be aware that `grouping_area_factor` too high may degrade the final result :param open_dem_raster_kwargs: additional keyword arguments passed on to ``xarray.open_dataset`` to open the `dem_urlpath` - :param kwargs: additional keyword arguments passed on to ``xarray.open_dataset`` to open the `product_urlpath` """ # rioxarray must be imported explicitly or accesses to `.rio` may fail in dask assert rioxarray.__version__ @@ -244,15 +118,11 @@ def terrain_correction( dem_urlpath, chunks=chunks, **open_dem_raster_kwargs ) - logger.info(f"open data product {product_urlpath!r}") - - product = SarProduct( - product_urlpath, measurement_group, measurement_chunks, **kwargs - ) - product_type = product.measurement.attrs["product_type"] allowed_product_types = ["GRD", "SLC"] - if product_type not in allowed_product_types: - raise ValueError(f"{product_type=}. Must be one of: {allowed_product_types}") + if product.product_type not in allowed_product_types: + raise ValueError( + f"{product.product_type=}. Must be one of: {allowed_product_types}" + ) logger.info("pre-process DEM") @@ -270,30 +140,26 @@ def terrain_correction( "azimuth_time": template_raster.astype("datetime64[ns]"), } ) - acquisition_kwargs = { - "position_ecef": product.orbit.position, - "correct_radiometry": correct_radiometry, - } - if product_type == "GRD": + if product.product_type == "GRD": acquisition_template["ground_range"] = template_raster - acquisition_kwargs["coordinate_conversion"] = product.coordinate_conversion if correct_radiometry is not None: acquisition_template["gamma_area"] = template_raster acquisition = xr.map_blocks( simulate_acquisition, dem_ecef, - kwargs=acquisition_kwargs, + kwargs={ + "position_ecef": product.state_vectors(), + "slant_range_time_to_ground_range": product.slant_range_time_to_ground_range, + "correct_radiometry": correct_radiometry, + }, template=acquisition_template, ) if correct_radiometry is not None: logger.info("simulate radiometry") - grid_parameters = radiometry.azimuth_slant_range_grid( - product.measurement.attrs, - grouping_area_factor, - ) + grid_parameters = product.grid_parameters(grouping_area_factor) if correct_radiometry == "gamma_bilinear": gamma_weights = radiometry.gamma_weights_bilinear @@ -339,13 +205,11 @@ def terrain_correction( logger.info("calibrate image") - beta_nought = calibrate_measurement( - product.measurement, product.calibration.betaNought - ) + beta_nought = product.beta_nought() logger.info("terrain-correct image") - if product_type == "GRD": + if product.product_type == "GRD": interp_kwargs = {"ground_range": acquisition.ground_range} else: interp_kwargs = {"slant_range_time": acquisition.slant_range_time} diff --git a/sarsen/datamodel.py b/sarsen/datamodel.py new file mode 100644 index 0000000..72fb99d --- /dev/null +++ b/sarsen/datamodel.py @@ -0,0 +1,34 @@ +import abc +from typing import Any, Dict, Optional, Tuple + +import xarray as xr + + +class SarProduct(abc.ABC): + def __init__(self, *args: Any, **kwargs: Any) -> None: + pass + + @property + @abc.abstractmethod + def product_type(self) -> str: + ... + + @abc.abstractmethod + def beta_nought(self) -> xr.DataArray: + ... + + @abc.abstractmethod + def state_vectors(self) -> xr.DataArray: + ... + + def slant_range_time_to_ground_range( + self, azimuth_time: xr.DataArray, slant_range_time: xr.DataArray + ) -> Optional[xr.DataArray]: + return None + + # FIXME: design a better interface + def grid_parameters( + self, + grouping_area_factor: Tuple[float, float] = (3.0, 3.0), + ) -> Dict[str, Any]: + ... diff --git a/sarsen/radiometry.py b/sarsen/radiometry.py index b99fc2b..802fb0f 100644 --- a/sarsen/radiometry.py +++ b/sarsen/radiometry.py @@ -1,12 +1,12 @@ import logging -from typing import Any, Dict, Hashable, Optional, Tuple +from typing import Optional, Tuple from unittest import mock import flox.xarray import numpy as np import xarray as xr -from . import geocoding, scene +from . import scene logger = logging.getLogger(__name__) @@ -65,8 +65,8 @@ def gamma_weights_bilinear( azimuth_time0: np.datetime64, slant_range_time_interval_s: float, azimuth_time_interval_s: float, - slant_range_spacing_m: float = 1, - azimuth_spacing_m: float = 1, + slant_range_spacing_m: float = 1.0, + azimuth_spacing_m: float = 1.0, ) -> xr.DataArray: # compute dem image coordinates azimuth_index = ((dem_coords.azimuth_time - azimuth_time0) / ONE_SECOND) / ( @@ -134,8 +134,8 @@ def gamma_weights_nearest( azimuth_time0: np.datetime64, slant_range_time_interval_s: float, azimuth_time_interval_s: float, - slant_range_spacing_m: float = 1, - azimuth_spacing_m: float = 1, + slant_range_spacing_m: float = 1.0, + azimuth_spacing_m: float = 1.0, ) -> xr.DataArray: # compute dem image coordinates azimuth_index = np.round( @@ -156,33 +156,3 @@ def gamma_weights_nearest( normalized_area = tot_area / (azimuth_spacing_m * slant_range_spacing_m) return normalized_area - - -def azimuth_slant_range_grid( - attrs: Dict[Hashable, Any], - grouping_area_factor: Tuple[float, float] = (3.0, 3.0), -) -> Dict[str, Any]: - - if attrs["product_type"] == "SLC": - slant_range_spacing_m = ( - attrs["range_pixel_spacing"] - * np.sin(attrs["incidence_angle_mid_swath"]) - * grouping_area_factor[1] - ) - else: - slant_range_spacing_m = attrs["range_pixel_spacing"] * grouping_area_factor[1] - - slant_range_time_interval_s = ( - slant_range_spacing_m * 2 / geocoding.SPEED_OF_LIGHT # ignore type - ) - - grid_parameters: Dict[str, Any] = { - "slant_range_time0": attrs["image_slant_range_time"], - "slant_range_time_interval_s": slant_range_time_interval_s, - "slant_range_spacing_m": slant_range_spacing_m, - "azimuth_time0": np.datetime64(attrs["product_first_line_utc_time"]), - "azimuth_time_interval_s": attrs["azimuth_time_interval"] - * grouping_area_factor[0], - "azimuth_spacing_m": attrs["azimuth_pixel_spacing"] * grouping_area_factor[0], - } - return grid_parameters diff --git a/sarsen/sentinel1.py b/sarsen/sentinel1.py new file mode 100644 index 0000000..1d51132 --- /dev/null +++ b/sarsen/sentinel1.py @@ -0,0 +1,216 @@ +from __future__ import annotations + +from typing import Any, Dict, Optional, Tuple, Union + +import attrs +import numpy as np +import xarray as xr +import xarray_sentinel + +from . import datamodel, geocoding + +try: + import dask # noqa: F401 + + DEFAULT_MEASUREMENT_CHUNKS: Optional[int] = 2048 +except ModuleNotFoundError: + DEFAULT_MEASUREMENT_CHUNKS = None + + +def open_dataset_autodetect( + product_urlpath: str, + group: Optional[str] = None, + chunks: Optional[Union[int, Dict[str, int]]] = None, + **kwargs: Any, +) -> Tuple[xr.Dataset, Dict[str, Any]]: + kwargs.setdefault("engine", "sentinel-1") + try: + ds = xr.open_dataset(product_urlpath, group=group, chunks=chunks, **kwargs) + except FileNotFoundError: + # re-try with Planetary Computer option + kwargs[ + "override_product_files" + ] = "{dirname}/{prefix}{swath}-{polarization}{ext}" + ds = xr.open_dataset(product_urlpath, group=group, chunks=chunks, **kwargs) + return ds, kwargs + + +def calibrate_measurement( + measurement: xr.DataArray, beta_nought_lut: xr.DataArray +) -> xr.DataArray: + if measurement.attrs["product_type"] == "SLC" and measurement.attrs["mode"] == "IW": + measurement = xarray_sentinel.mosaic_slc_iw(measurement) + + beta_nought = xarray_sentinel.calibrate_intensity(measurement, beta_nought_lut) + beta_nought = beta_nought.drop_vars(["pixel", "line"]) + + return beta_nought + + +def azimuth_slant_range_grid( + attrs: Dict[str, Any], + grouping_area_factor: Tuple[float, float] = (3.0, 3.0), +) -> Dict[str, Any]: + + if attrs["product_type"] == "SLC": + slant_range_spacing_m = ( + attrs["range_pixel_spacing"] + * np.sin(attrs["incidence_angle_mid_swath"]) + * grouping_area_factor[1] + ) + else: + slant_range_spacing_m = attrs["range_pixel_spacing"] * grouping_area_factor[1] + + slant_range_time_interval_s = ( + slant_range_spacing_m * 2 / geocoding.SPEED_OF_LIGHT # ignore type + ) + + grid_parameters: Dict[str, Any] = { + "slant_range_time0": attrs["image_slant_range_time"], + "slant_range_time_interval_s": slant_range_time_interval_s, + "slant_range_spacing_m": slant_range_spacing_m, + "azimuth_time0": np.datetime64(attrs["product_first_line_utc_time"]), + "azimuth_time_interval_s": attrs["azimuth_time_interval"] + * grouping_area_factor[0], + "azimuth_spacing_m": attrs["azimuth_pixel_spacing"] * grouping_area_factor[0], + } + return grid_parameters + + +@attrs.define(slots=False) +class Sentinel1SarProduct(datamodel.SarProduct): + product_urlpath: str + measurement_group: str + measurement_chunks: Optional[int] = DEFAULT_MEASUREMENT_CHUNKS + kwargs: Dict[str, Any] = {} + + @property + def measurement(self) -> xr.Dataset: + ds, self.kwargs = open_dataset_autodetect( + self.product_urlpath, + group=self.measurement_group, + chunks=self.measurement_chunks, + **self.kwargs, + ) + return ds + + @property + def orbit(self) -> xr.Dataset: + ds, self.kwargs = open_dataset_autodetect( + self.product_urlpath, group=f"{self.measurement_group}/orbit", **self.kwargs + ) + return ds + + @property + def calibration(self) -> xr.Dataset: + ds, self.kwargs = open_dataset_autodetect( + self.product_urlpath, + group=f"{self.measurement_group}/calibration", + **self.kwargs, + ) + return ds + + @property + def coordinate_conversion(self) -> Optional[xr.Dataset]: + ds = None + if self.product_type == "GRD": + ds, self.kwargs = open_dataset_autodetect( + self.product_urlpath, + group=f"{self.measurement_group}/coordinate_conversion", + **self.kwargs, + ) + return ds + + @property + def azimuth_fm_rate(self) -> Optional[xr.Dataset]: + ds = None + if self.product_type == "SLC": + ds, self.kwargs = open_dataset_autodetect( + self.product_urlpath, + group=f"{self.measurement_group}/azimuth_fm_rate", + **self.kwargs, + ) + return ds + + @property + def dc_estimate(self) -> Optional[xr.Dataset]: + ds = None + if self.product_type == "SLC": + ds, self.kwargs = open_dataset_autodetect( + self.product_urlpath, + group=f"{self.measurement_group}/dc_estimate", + **self.kwargs, + ) + return ds + + # SarProduct interaface + + @property + def product_type(self) -> Any: + prod_type = self.measurement.attrs["product_type"] + assert isinstance(prod_type, str) + return prod_type + + def beta_nought(self) -> xr.DataArray: + measurement = self.measurement.data_vars["measurement"] + return calibrate_measurement(measurement, self.calibration.betaNought) + + def state_vectors(self) -> xr.DataArray: + return self.orbit.data_vars["position"] + + def slant_range_time_to_ground_range( + self, azimuth_time: xr.DataArray, slant_range_time: xr.DataArray + ) -> Optional[xr.DataArray]: + ds = None + coordinate_conversion = self.coordinate_conversion + if coordinate_conversion is not None: + ds = xarray_sentinel.slant_range_time_to_ground_range( + azimuth_time, slant_range_time, coordinate_conversion + ) + return ds + + def grid_parameters( + self, + grouping_area_factor: Tuple[float, float] = (3.0, 3.0), + ) -> Dict[str, Any]: + return azimuth_slant_range_grid(self.measurement.attrs, grouping_area_factor) + + +def product_info( + product_urlpath: str, + **kwargs: Any, +) -> Dict[str, Any]: + """Get information about the Sentinel-1 product.""" + root_ds = xr.open_dataset( + product_urlpath, engine="sentinel-1", check_files_exist=True, **kwargs + ) + + measurement_groups = [g for g in root_ds.attrs["subgroups"] if g.count("/") == 1] + + gcp_group = measurement_groups[0] + "/gcp" + + gcp, kwargs = open_dataset_autodetect(product_urlpath, group=gcp_group, **kwargs) + + bbox = [ + gcp.attrs["geospatial_lon_min"], + gcp.attrs["geospatial_lat_min"], + gcp.attrs["geospatial_lon_max"], + gcp.attrs["geospatial_lat_max"], + ] + + product_attrs = [ + "product_type", + "mode", + "swaths", + "transmitter_receiver_polarisations", + ] + product_info = {attr_name: root_ds.attrs[attr_name] for attr_name in product_attrs} + product_info.update( + { + "measurement_groups": measurement_groups, + "geospatial_bounds": gcp.attrs["geospatial_bounds"], + "geospatial_bbox": bbox, + } + ) + + return product_info diff --git a/tests/test_10_datamodel.py b/tests/test_10_datamodel.py new file mode 100644 index 0000000..726a16b --- /dev/null +++ b/tests/test_10_datamodel.py @@ -0,0 +1,8 @@ +import pytest + +from sarsen import datamodel + + +def test_SarProduct() -> None: + with pytest.raises(TypeError): + datamodel.SarProduct() # type: ignore diff --git a/tests/test_20_sentinel1.py b/tests/test_20_sentinel1.py new file mode 100644 index 0000000..a49b02d --- /dev/null +++ b/tests/test_20_sentinel1.py @@ -0,0 +1,54 @@ +import pathlib + +import numpy as np +import pytest +import xarray as xr + +from sarsen import sentinel1 + +DATA_FOLDER = pathlib.Path(__file__).parent / "data" + +DATA_PATHS = [ + DATA_FOLDER + / "S1B_IW_GRDH_1SDV_20211223T051122_20211223T051147_030148_039993_5371.SAFE", + DATA_FOLDER + / "S1A_IW_SLC__1SDV_20220104T170557_20220104T170624_041314_04E951_F1F1.SAFE", +] + +GROUPS = ["IW/VV", "IW1/VV"] + + +@pytest.mark.parametrize("data_path,group", zip(DATA_PATHS, GROUPS)) +def test_Sentinel1SarProduct(data_path: str, group: str) -> None: + res = sentinel1.Sentinel1SarProduct(data_path, group) + + assert isinstance(res.measurement, xr.Dataset) + assert isinstance(res.orbit, xr.Dataset) + assert isinstance(res.calibration, xr.Dataset) + + if res.product_type == "GRD": + assert isinstance(res.coordinate_conversion, xr.Dataset) + assert res.azimuth_fm_rate is None + assert res.dc_estimate is None + else: + assert res.coordinate_conversion is None + assert isinstance(res.azimuth_fm_rate, xr.Dataset) + assert isinstance(res.dc_estimate, xr.Dataset) + + assert res.product_type in {"SLC", "GRD"} + assert isinstance(res.beta_nought(), xr.DataArray) + assert isinstance(res.state_vectors(), xr.DataArray) + + +def test_product_info() -> None: + expected_geospatial_bbox = [ + 11.86800305333565, + 40.87886713841886, + 15.32209672548896, + 42.78115380313222, + ] + + res = sentinel1.product_info(str(DATA_PATHS[0])) + + assert "product_type" in res + assert np.allclose(res["geospatial_bbox"], expected_geospatial_bbox) diff --git a/tests/test_50_apps.py b/tests/test_50_apps.py index 028d4d8..f396599 100644 --- a/tests/test_50_apps.py +++ b/tests/test_50_apps.py @@ -1,12 +1,11 @@ import os import pathlib -import numpy as np import py import pytest import xarray as xr -from sarsen import apps +from sarsen import apps, sentinel1 DATA_FOLDER = pathlib.Path(__file__).parent / "data" @@ -22,20 +21,6 @@ DEM_RASTER = DATA_FOLDER / "Rome-30m-DEM.tif" -def test_product_info() -> None: - expected_geospatial_bbox = [ - 11.86800305333565, - 40.87886713841886, - 15.32209672548896, - 42.78115380313222, - ] - - res = apps.product_info(str(DATA_PATHS[0])) - - assert "product_type" in res - assert np.allclose(res["geospatial_bbox"], expected_geospatial_bbox) - - @pytest.mark.parametrize("data_path,group", zip(DATA_PATHS, GROUPS)) @pytest.mark.skipif(os.getenv("GITHUB_ACTIONS") == "true", reason="too much memory") def test_terrain_correction_gtc( @@ -44,9 +29,13 @@ def test_terrain_correction_gtc( group: str, ) -> None: out = str(tmpdir.join("GTC.tif")) - res = apps.terrain_correction( + product = sentinel1.Sentinel1SarProduct( str(data_path), group, + ) + + res = apps.terrain_correction( + product, str(DEM_RASTER), output_urlpath=out, ) @@ -60,10 +49,13 @@ def test_terrain_correction_fast_rtc( tmpdir: py.path.local, data_path: pathlib.Path, group: str ) -> None: out = str(tmpdir.join("RTC.tif")) - - res = apps.terrain_correction( + product = sentinel1.Sentinel1SarProduct( str(data_path), group, + ) + + res = apps.terrain_correction( + product, str(DEM_RASTER), correct_radiometry="gamma_nearest", output_urlpath=out, @@ -78,10 +70,13 @@ def test_terrain_correction_rtc( tmpdir: py.path.local, data_path: pathlib.Path, group: str ) -> None: out = str(tmpdir.join("RTC.tif")) - - res = apps.terrain_correction( + product = sentinel1.Sentinel1SarProduct( str(data_path), group, + ) + + res = apps.terrain_correction( + product, str(DEM_RASTER), correct_radiometry="gamma_bilinear", output_urlpath=out, @@ -96,9 +91,13 @@ def test_terrain_correction_gtc_dask( tmpdir: py.path.local, data_path: pathlib.Path, group: str ) -> None: out = str(tmpdir.join("GTC.tif")) - res = apps.terrain_correction( + product = sentinel1.Sentinel1SarProduct( str(data_path), group, + ) + + res = apps.terrain_correction( + product, str(DEM_RASTER), output_urlpath=out, chunks=1024, @@ -113,10 +112,13 @@ def test_terrain_correction_fast_rtc_dask( tmpdir: py.path.local, data_path: pathlib.Path, group: str ) -> None: out = str(tmpdir.join("RTC.tif")) - - res = apps.terrain_correction( + product = sentinel1.Sentinel1SarProduct( str(data_path), group, + ) + + res = apps.terrain_correction( + product, str(DEM_RASTER), correct_radiometry="gamma_nearest", output_urlpath=out, @@ -132,10 +134,13 @@ def test_terrain_correction_rtc_dask( tmpdir: py.path.local, data_path: pathlib.Path, group: str ) -> None: out = str(tmpdir.join("RTC.tif")) - - res = apps.terrain_correction( + product = sentinel1.Sentinel1SarProduct( str(data_path), group, + ) + + res = apps.terrain_correction( + product, str(DEM_RASTER), correct_radiometry="gamma_bilinear", output_urlpath=out,