Skip to content

Commit

Permalink
Add initial toZarr asset
Browse files Browse the repository at this point in the history
  • Loading branch information
BenGalewsky committed Dec 22, 2024
1 parent 9340cb6 commit 4b421a7
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 9 deletions.
38 changes: 38 additions & 0 deletions downscaled_climate_data/assets/as_zarr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import s3fs
import xarray as xr

from dagster import AssetIn, AssetOut, asset, ConfigSchema
from dagster_aws.s3 import S3Resource

@asset(
name="AsZarr",
ins={
"RawLOCA2": AssetIn()
})
def as_zarr(context,
RawLOCA2,
s3: S3Resource):
context.log.info(f"Converting {RawLOCA2['s3_key']} to zarr")
# Get S3 client from resource
s3_client = context.resources.s3.get_client()

# Initialize s3fs with the same credentials as the S3Resource
fs = s3fs.S3FileSystem(client_kwargs={"session": s3_client._session})

# Construct S3 paths
input_path = f"s3://{RawLOCA2['bucket']}/{RawLOCA2['s3_key']}"
zarr_key = RawLOCA2['s3_key'].replace('.nc', '.zarr')
output_path = f"s3://{RawLOCA2['bucket']}/{zarr_key}"

# Read NetCDF file from S3
with fs.open(input_path, 'rb') as f:
ds = xr.open_dataset(f)

# Write to Zarr format
ds.to_zarr(
store=s3fs.S3Map(root=output_path, s3=fs),
mode='w' # Overwrite if exists
)

# Close the dataset to free memory
ds.close()
18 changes: 12 additions & 6 deletions downscaled_climate_data/assets/loca2.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Dict

from dagster import AssetExecutionContext, Config, asset
from dagster_aws.s3 import S3Resource

Expand All @@ -16,7 +18,7 @@ class Loca2Config(Config):
)
def loca2_raw(context: AssetExecutionContext,
config: Loca2Config,
s3: S3Resource) -> None:
s3: S3Resource) -> dict[str, str]:

with requests.get(config.url, stream=True) as response:
# Raise an exception for bad HTTP responses
Expand All @@ -27,10 +29,14 @@ def loca2_raw(context: AssetExecutionContext,
context.log.info(f"Downloading {total_size:.2f} GB of data")

# Upload directly using S3 client's upload_fileobj method
s3.get_client().upload_fileobj(
response.raw,
config.bucket,
config.s3_key
)
# s3.get_client().upload_fileobj(
# response.raw,
# config.bucket,
# config.s3_key
# )

context.log.info(f"Downloading data to {config.s3_key}")
return {
"bucket": config.bucket,
"s3_key": config.s3_key,
}
3 changes: 2 additions & 1 deletion downscaled_climate_data/definitions.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from dagster import Definitions, EnvVar
from dagster_aws.s3 import S3Resource

from downscaled_climate_data.assets.as_zarr import as_zarr
from downscaled_climate_data.assets.loca2 import loca2_raw
from downscaled_climate_data.sensors.loca2_models import Loca2Models
from downscaled_climate_data.sensors.loca2_sensor import (loca2_sensor, Loca2Datasets)

defs = Definitions(
assets=[loca2_raw],
assets=[loca2_raw, as_zarr],
sensors=[loca2_sensor],
resources={
"loca2_models": Loca2Models(),
Expand Down
5 changes: 3 additions & 2 deletions downscaled_climate_data/sensors/loca2_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
RunConfig,
)

from downscaled_climate_data.assets.as_zarr import as_zarr
from downscaled_climate_data.assets.loca2 import loca2_raw
from downscaled_climate_data.sensors.loca2_models import Loca2Models

Expand Down Expand Up @@ -70,7 +71,7 @@ def get_downloadable_files(self, models: dict, model: str, scenario: str):
}


@sensor(target=[loca2_raw], name="LOCA2_Sensor")
@sensor(target=[loca2_raw, as_zarr], name="LOCA2_Sensor")
def loca2_sensor(
context: SensorEvaluationContext,
loca2_models: Loca2Models,
Expand Down Expand Up @@ -116,7 +117,7 @@ def loca2_sensor(
"bucket": destination_bucket,
"s3_key": destination_path_root + file["s3_key"],
}
}
},
}
),
tags={"model": model, "scenario": scenario, "memberid": file["memberid"]},
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ dependencies = [
"dagster-cloud",
"dagster-postgres",
"dagster-aws",
"s3fs",
"xarray",
"zarr",
"beautifulsoup4"
]

Expand Down

0 comments on commit 4b421a7

Please sign in to comment.