Skip to content

Commit

Permalink
fix(sat-etl): Simpler pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Dec 24, 2024
1 parent 28e1c60 commit 8f8bf23
Showing 1 changed file with 31 additions and 22 deletions.
53 changes: 31 additions & 22 deletions containers/sat/download_process_sat.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import traceback
from multiprocessing import Pool, cpu_count
from typing import Literal
from collections.abc import Iterator

import dask.delayed
import dask.distributed
import dask.diagnostics
import eumdac
import eumdac.cli
import eumdac.product
import numpy as np
import pandas as pd
import pyproj
Expand Down Expand Up @@ -168,11 +170,15 @@ def get_products_iterator(
start: dt.datetime,
end: dt.datetime,
token: eumdac.AccessToken,
) -> Iterator[eumdac.Product]:
) -> Iterator[eumdac.product.Product]:
"""Get an iterator over the products for a given satellite in a given time range.
Checks that the number of products returned matches the expected number of products.
"""
log.info(
f"Searching for products between {start!s} and {end!s} "
f"for {sat_config.product_id} ",
)
expected_products_count = int((end - start) / pd.Timedelta(sat_config.cadence))
datastore = eumdac.DataStore(token)
collection = datastore.get_collection(sat_config.product_id)
Expand All @@ -189,7 +195,7 @@ def get_products_iterator(


def download_nat(
product: eumdac.Product,
product: eumdac.product.Product,
folder: pathlib.Path,
retries: int = 6,
) -> pathlib.Path | None:
Expand Down Expand Up @@ -248,10 +254,10 @@ def process_nat(
# The reader is the same for each satellite as the sensor is the same
# * Hence "seviri" in all cases
try:
scene = Scene(filenames={"seviri_l1b_native": [f]})
scene = Scene(filenames={"seviri_l1b_native": [path.as_posix()]})
scene.load([c.variable for c in CHANNELS[dstype]])
except Exception as e:
raise OSError(f"Error reading '{path}' as satpy Scene: {e}") from e
raise OSError(f"Error reading '{path!s}' as satpy Scene: {e}") from e

try:
da: xr.DataArray = _convert_scene_to_dataarray(
Expand Down Expand Up @@ -299,7 +305,10 @@ def write_to_zarr(
"x_geostationary": -1,
"y_geostationary": -1,
"variable": 1,
}).to_zarr(
}).to_dataset(
name="data",
promote_attrs=True,
).to_zarr(
store=zarr_path,
compute=False,
consolidated=True,
Expand Down Expand Up @@ -527,44 +536,44 @@ def _convert_scene_to_dataarray(
if attr not in ["area", "_satpy_id"]:
data_attrs[new_name] = scene[channel].attrs[attr].__repr__()

dataset: xr.Dataset = scene.to_xarray_dataset()
dataarray = dataset.to_array()
ds: xr.Dataset = scene.to_xarray_dataset()
da = ds.to_array()

# Lat and Lon are the same for all the channels now
if calculate_osgb:
lon, lat = scene[band].attrs["area"].get_lonlats()
osgb_x, osgb_y = transformer.transform(lat, lon)
# Assign x_osgb and y_osgb and set some attributes
dataarray = dataarray.assign_coords(
da = da.assign_coords(
x_osgb=(("y", "x"), np.float32(osgb_x)),
y_osgb=(("y", "x"), np.float32(osgb_y)),
)
for name in ["x_osgb", "y_osgb"]:
dataarray[name].attrs = {
da[name].attrs = {
"units": "meter",
"coordinate_reference_system": "OSGB",
}

dataarray.x_osgb.attrs["name"] = "Easting"
dataarray.y_osgb.attrs["name"] = "Northing"
da.x_osgb.attrs["name"] = "Easting"
da.y_osgb.attrs["name"] = "Northing"

for name in ["x", "y"]:
dataarray[name].attrs["coordinate_reference_system"] = "geostationary"
da[name].attrs["coordinate_reference_system"] = "geostationary"
log.debug("Calculated OSGB")
# Round to the nearest 5 minutes
data_attrs["end_time"] = pd.Timestamp(dataarray.attrs["end_time"]).round("5 min").__str__()
dataarray.attrs = data_attrs
data_attrs["end_time"] = pd.Timestamp(da.attrs["end_time"]).round("5 min").__str__()
da.attrs = data_attrs

# Rename x and y to make clear the coordinate system they are in
dataarray = dataarray.rename({"x": "x_geostationary", "y": "y_geostationary"})
if "time" not in dataarray.dims:
time = pd.to_datetime(pd.Timestamp(dataarray.attrs["end_time"]).round("5 min"))
dataarray = dataarray.assign_coords({"time": time}).expand_dims("time")
da = da.rename({"x": "x_geostationary", "y": "y_geostationary"})
if "time" not in da.dims:
time = pd.to_datetime(pd.Timestamp(da.attrs["end_time"]).round("5 min"))
da = da.assign_coords({"time": time}).expand_dims("time")

del dataarray["crs"]
del da["crs"]
del scene
log.debug("Finished conversion")
return dataarray
return da


def _rescale(da: xr.DataArray, channels: list[Channel]) -> xr.DataArray:
Expand Down Expand Up @@ -594,8 +603,8 @@ def _rescale(da: xr.DataArray, channels: list[Channel]) -> xr.DataArray:
da -= [c.minimum for c in channels]
da /= [c.maximum - c.minimum for c in channels]
# Since the mins and maxes are approximations, clip the values to [0, 1]
da = dataarray.clip(min=0, max=1)
da = dataarray.astype(np.float32)
da = da.clip(min=0, max=1)
da = da.astype(np.float32)
return da


Expand Down

0 comments on commit 8f8bf23

Please sign in to comment.