Skip to content

Commit

Permalink
feat(sat): Monthly Zarr files (#129)
Browse files Browse the repository at this point in the history
* feat(sat): Monthly Zarr files

* feat(dags): Add satellite dag

* fix(sat): Add files!
  • Loading branch information
devsjc authored Sep 16, 2024
1 parent 06c5220 commit 3cc3308
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 190 deletions.
38 changes: 18 additions & 20 deletions containers/sat/download_process_sat.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,17 @@ class Config:
cadence="15min",
product_id="EO:EUM:DAT:MSG:HRSEVIRI-IODC",
zarr_fmtstr={
"hrv": "%Y_hrv_iodc.zarr",
"nonhrv": "%Y_nonhrv_iodc.zarr",
"hrv": "%Y%m_hrv_iodc.zarr",
"nonhrv": "%Y%m_nonhrv_iodc.zarr",
},
),
"severi": Config(
region="europe",
cadence="5min",
product_id="EO:EUM:DAT:MSG:MSG15-RSS",
zarr_fmtstr={
"hrv": "%Y_hrv.zarr",
"nonhrv": "%Y_nonhrv.zarr",
"hrv": "%Y%m_hrv.zarr",
"nonhrv": "%Y%m_nonhrv.zarr",
},
),
# Optional
Expand All @@ -114,8 +114,8 @@ class Config:
cadence="15min",
product_id="EO:EUM:DAT:MSG:HRSEVIRI",
zarr_fmtstr={
"hrv": "%Y_hrv_odegree.zarr",
"nonhrv": "%Y_nonhrv_odegree.zarr",
"hrv": "%Y%m_hrv_odegree.zarr",
"nonhrv": "%Y%m_nonhrv_odegree.zarr",
},
),
}
Expand Down Expand Up @@ -576,18 +576,11 @@ def _rewrite_zarr_times(output_name: str) -> None:
type=pathlib.Path,
)
parser.add_argument(
"--start_date", "-s",
help="Date to download from (YYYY-MM-DD)",
type=dt.date.fromisoformat,
required=False,
default=str(dt.datetime.now(tz=dt.UTC).date()),
)
parser.add_argument(
"--end_date", "-e",
help="Date to download to (YYYY-MM-DD)",
type=dt.date.fromisoformat,
required=False,
default=str(dt.datetime.now(tz=dt.UTC).date()),
"--month", "-m",
help="Month to download data for (YYYY-MM)",
type=str,
required=True,
default=str(dt.datetime.now(tz=dt.UTC).strftime("%Y-%m")),
)
parser.add_argument(
"--delete_raw", "--rm",
Expand All @@ -597,6 +590,7 @@ def _rewrite_zarr_times(output_name: str) -> None:
)

def run(args: argparse.Namespace) -> None:
"""Run the download and processing pipeline."""
prog_start = dt.datetime.now(tz=dt.UTC)
log.info(f"{prog_start!s}: Running with args: {args}")

Expand All @@ -607,12 +601,16 @@ def run(args: argparse.Namespace) -> None:
sat_config = CONFIGS[args.sat]

# Get start and end times for run
start: dt.date = args.start_date
end: dt.date = args.end_date + dt.timedelta(days=1) if args.end_date == start else args.end_date
# The run covers the half-open window [start, end): `start` is midnight on
# the first day of the requested month and `end` is midnight on the first
# day of the following month. The scan-time range built from these values
# uses inclusive="left", so `end` itself is excluded and no day needs to be
# subtracted. (Previously a trailing `- dt.timedelta(days=1)` bound only to
# the December branch of the conditional expression, which silently dropped
# every scan on 31 December.)
start: dt.datetime = dt.datetime.strptime(args.month, "%Y-%m")
end: dt.datetime = (
    start.replace(month=start.month + 1)
    if start.month < 12
    else start.replace(year=start.year + 1, month=1)  # December rolls over to January
)
scan_times: list[pd.Timestamp] = pd.date_range(
start=start,
end=end,
freq=sat_config.cadence,
inclusive="left",
).tolist()

# Estimate average runtime
Expand Down
5 changes: 4 additions & 1 deletion local_archives/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import resources
from constants import LOCATIONS_BY_ENVIRONMENT

from . import nwp
from . import nwp, sat

resources_by_env = {
"leo": {
Expand Down Expand Up @@ -36,14 +36,17 @@

all_assets: list[dg.AssetsDefinition] = [
*nwp.all_assets,
*sat.all_assets,
]

all_jobs: list[dg.JobDefinition] = [
*nwp.all_jobs,
*sat.all_jobs,
]

all_schedules: list[dg.ScheduleDefinition] = [
*nwp.all_schedules,
*sat.all_schedules,
]

defs = dg.Definitions(
Expand Down
5 changes: 3 additions & 2 deletions local_archives/nwp/ceda/ceda_global.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import dagster as dg
import datetime as dt
import os
from typing import Any
import datetime as dt

import dagster as dg
from dagster_docker import PipesDockerClient

from constants import LOCATIONS_BY_ENVIRONMENT

env = os.getenv("ENVIRONMENT", "local")
Expand Down
17 changes: 10 additions & 7 deletions local_archives/sat/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from dagster import Definitions, load_assets_from_modules
"""Definitions for the sat dagster code location."""

from sat import assets, jobs
import dagster as dg

from . import eumetsat

all_assets = load_assets_from_modules([assets])
# Every asset contributed by the EUMETSAT subpackage.
all_assets: list[dg.AssetsDefinition] = list(eumetsat.all_assets)

# No jobs or schedules are defined for this package; both lists are empty.
all_jobs: list[dg.JobDefinition] = []
all_schedules: list[dg.ScheduleDefinition] = []

defs = Definitions(
assets=all_assets,
schedules=jobs.schedules,
)
3 changes: 0 additions & 3 deletions local_archives/sat/assets/__init__.py

This file was deleted.

1 change: 0 additions & 1 deletion local_archives/sat/assets/eumetsat/__init__.py

This file was deleted.

39 changes: 0 additions & 39 deletions local_archives/sat/assets/eumetsat/common.py

This file was deleted.

14 changes: 0 additions & 14 deletions local_archives/sat/assets/eumetsat/iodc.py

This file was deleted.

12 changes: 12 additions & 0 deletions local_archives/sat/eumetsat/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import dagster as dg

from . import eumetsat_iodc


# Collect the assets defined in the eumetsat_iodc module and place them in
# the "eumetsat_iodc" asset group.
iodc_assets = dg.load_assets_from_modules(modules=[eumetsat_iodc], group_name="eumetsat_iodc")

# Flat list of all assets exported by this package (currently only IODC).
all_assets: list[dg.AssetsDefinition] = list(iodc_assets)

65 changes: 65 additions & 0 deletions local_archives/sat/eumetsat/eumetsat_iodc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import datetime as dt
import os
from typing import Any

import dagster as dg
from dagster_docker import PipesDockerClient

from constants import LOCATIONS_BY_ENVIRONMENT

# Name of the deployment environment; "local" is used when the ENVIRONMENT
# variable is not set.
env = os.environ.get("ENVIRONMENT", "local")

# Base folder from which this module builds its archive paths, looked up for
# the selected environment.
ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].SAT_ZARR_FOLDER

@dg.asset(
    name="zarr_archive",
    description="".join((
        # Each fragment ends with its own separator because the pieces are
        # joined with an empty string.
        "Zarr archive of satellite data from EUMETSAT's IODC satellite. ",
        "Sourced via EUMDAC from EUMETSAT: ",
        # NOTE(review): this link names the OCA-IODC product, while the sat
        # container's IODC config appears to download
        # EO:EUM:DAT:MSG:HRSEVIRI-IODC - confirm which product page is intended.
        "https://navigator.eumetsat.int/product/EO:EUM:DAT:MSG:OCA-IODC\n",
        "This asset is updated monthly, and surfaced as a Zarr Directory Store ",
        "for each month. It is downloaded using the sat container: ",
        "https://github.com/openclimatefix/dagster-dags",
    )),
    key_prefix=["sat", "eumetsat", "iodc"],
    metadata={
        "archive_folder": dg.MetadataValue.text(f"{ZARR_FOLDER}/sat/eumetsat/india"),
        "area": dg.MetadataValue.text("india"),
        "source": dg.MetadataValue.text("eumetsat"),
        "expected_runtime": dg.MetadataValue.text("TBD"),
    },
    compute_kind="docker",
    automation_condition=dg.AutomationCondition.eager(),
    tags={
        # "dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours
        "dagster/priority": "1",
        "dagster/concurrency_key": "eumetsat",
    },
    partitions_def=dg.MonthlyPartitionsDefinition(
        start_date="2019-01-01",
        end_offset=-3,
    ),
)
def iodc_monthly(
    context: dg.AssetExecutionContext,
    pipes_docker_client: PipesDockerClient,
) -> Any:
    """Materialise one monthly partition of the IODC Zarr archive.

    Runs the ``sat-etl`` container through the Dagster Pipes Docker client
    with the arguments ``iodc -m <YYYY-MM> --rm``, where the month is taken
    from the start of the partition's time window. The archive folder is
    mounted into the container at ``/mnt/disks/sat``.

    Args:
        context: Dagster execution context for the partition being run.
        pipes_docker_client: Pipes client used to launch the container.

    Returns:
        Whatever ``get_results()`` returns for the completed Pipes run.

    Raises:
        KeyError: If ``EUMETSAT_CONSUMER_KEY`` or ``EUMETSAT_CONSUMER_SECRET``
            is not set in the environment of the Dagster process.
    """
    image: str = "ghcr.io/openclimatefix/sat-etl:main"
    # The partition's start timestamp identifies the month to download.
    it: dt.datetime = context.partition_time_window.start
    return pipes_docker_client.run(
        image=image,
        command=[
            "iodc",
            "-m",
            it.strftime("%Y-%m"),
            "--rm",
        ],
        # Forward the EUMETSAT API credentials to the container.
        env={
            "EUMETSAT_CONSUMER_KEY": os.environ["EUMETSAT_CONSUMER_KEY"],
            "EUMETSAT_CONSUMER_SECRET": os.environ["EUMETSAT_CONSUMER_SECRET"],
        },
        container_kwargs={
            "volumes": [f"{ZARR_FOLDER}/sat/eumetsat/india:/mnt/disks/sat"],
        },
        context=context,
    ).get_results()

103 changes: 0 additions & 103 deletions local_archives/sat/jobs.py

This file was deleted.

0 comments on commit 3cc3308

Please sign in to comment.