fix(sat-etl): Only process native files in date range (#136)
devsjc authored Oct 1, 2024
1 parent 55a0923 · commit e716072
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions containers/sat/download_process_sat.py
@@ -228,6 +228,14 @@ def download_scans(
 
     return files
 
+def _fname_to_scantime(fname: str) -> dt.datetime:
+    """Converts a filename to a datetime.
+    Files are of the form:
+    `MSG2-SEVI-MSG15-0100-NA-20230910221240.874000000Z-NA.nat`
+    So determine the time from the first element split by '.'.
+    """
+    return dt.datetime.strptime(fname.split(".")[0][-14:], "%Y%m%d%H%M%S")
+
 
 def process_scans(
     sat_config: Config,
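
Note (not part of the commit): a minimal standalone sketch of the new helper, showing the scan time it parses from the example filename in its docstring:

```python
import datetime as dt

def _fname_to_scantime(fname: str) -> dt.datetime:
    """Parse the scan time from a native filename (as added in the diff above)."""
    return dt.datetime.strptime(fname.split(".")[0][-14:], "%Y%m%d%H%M%S")

# Example filename taken from the docstring:
fname = "MSG2-SEVI-MSG15-0100-NA-20230910221240.874000000Z-NA.nat"
print(_fname_to_scantime(fname))  # -> 2023-09-10 22:12:40
```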
@@ -256,15 +264,16 @@ def process_scans(
 
     # Get native files in order
     native_files: list[pathlib.Path] = list(folder.glob("*.nat"))
-    log.info(f"Found {len(native_files)} native files at {folder.as_posix()}")
     native_files.sort()
+    wanted_files = [f for f in native_files if start <= _fname_to_scantime(f.name).date() < end]
+    log.info(f"Found {len(wanted_files)} native files within date range at {folder.as_posix()}")
 
     # Convert native files to xarray datasets
     # * Append to the monthly zarr in hourly chunks
     datasets: list[xr.Dataset] = []
     i: int
     f: pathlib.Path
-    for i, f in enumerate(native_files):
+    for i, f in enumerate(wanted_files):
         try:
             # TODO: This method of passing the zarr times to the open function leaves a lot to be desired
             # Firstly, if the times are not passed in sorted order then the created 12-dataset chunks
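
Note (not part of the commit): a sketch of the new filtering step in isolation. The `start <= ... < end` predicate keeps the range half-open, so a scan stamped exactly at `end` is excluded. The `folder`, `start`, and `end` values here are hypothetical stand-ins for the function's arguments, which the `.date()` comparison implies are `datetime.date` values:

```python
import datetime as dt
import pathlib

folder = pathlib.Path("/tmp/native")                     # hypothetical download folder
start, end = dt.date(2023, 9, 10), dt.date(2023, 9, 11)  # half-open date range [start, end)

# Same predicate as the diff, reusing _fname_to_scantime from the sketch above:
native_files: list[pathlib.Path] = sorted(folder.glob("*.nat"))
wanted_files = [f for f in native_files if start <= _fname_to_scantime(f.name).date() < end]
```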
@@ -301,7 +310,7 @@ def process_scans(
             )
             datasets = []
 
-        log.info(f"Process loop [{dstype}]: {i+1}/{len(native_files)}")
+        log.info(f"Process loop [{dstype}]: {i+1}/{len(wanted_files)}")
 
     # Consolidate zarr metadata
     if pathlib.Path(zarr_path).exists():
@@ -635,7 +644,10 @@ def run(args: argparse.Namespace) -> None:
     # Estimate average runtime
     secs_per_scan: int = 90
     expected_runtime = pd.Timedelta(secs_per_scan * len(scan_times), "seconds")
-    log.info(f"Downloading {len(scan_times)} scans. Expected runtime: {expected_runtime!s}")
+    log.info(
+        f"Downloading {len(scan_times)} scans ({start} - {end}). "
+        f"Expected runtime: {expected_runtime!s}"
+    )
 
     # Download data
     # We only parallelize if we have a number of files larger than the cpu count
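
Note (not part of the commit): the runtime estimate is simple arithmetic. With a hypothetical day of 15-minute scans (96 scans), the 90-seconds-per-scan constant gives:

```python
import pandas as pd

secs_per_scan = 90
n_scans = 96  # hypothetical: one day of 15-minute scans
print(pd.Timedelta(secs_per_scan * n_scans, "seconds"))  # -> 0 days 02:24:00
```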
