Skip to content

Commit

Permalink
Fix the pattern matching of pr
Browse files Browse the repository at this point in the history
  • Loading branch information
BenGalewsky committed Dec 26, 2024
1 parent cd6e3a0 commit 7c08c82
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 68 deletions.
19 changes: 15 additions & 4 deletions downscaled_climate_data/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,27 @@
from downscaled_climate_data.assets.as_zarr import as_zarr
from downscaled_climate_data.assets.loca2 import loca2_raw
from downscaled_climate_data.sensors.loca2_models import Loca2Models
from downscaled_climate_data.sensors.loca2_sensor import (loca2_sensor,
from downscaled_climate_data.sensors.loca2_sensor import (loca2_sensor_monthly_pr,
loca2_sensor_monthly_tasmin,
loca2_sensor_pr,
loca2_sensor_tasmax,
Loca2Datasets,
loca2_sensor_monthly)
loca2_sensor_monthly_tasmax,
loca2_sensor_tasmin)

defs = Definitions(
assets=[loca2_raw, as_zarr],
sensors=[loca2_sensor, loca2_sensor_monthly],
sensors=[loca2_sensor_tasmax,
loca2_sensor_tasmin,
loca2_sensor_pr,
loca2_sensor_monthly_tasmax,
loca2_sensor_monthly_tasmin,
loca2_sensor_monthly_pr],
resources={
"loca2_models": Loca2Models(),
"loca2_datasets": Loca2Datasets(variable="tasmax"),
"loca2_datasets_tasmax": Loca2Datasets(variable="tasmax"),
"loca2_datasets_tasmin": Loca2Datasets(variable="tasmin"),
"loca2_datasets_pr": Loca2Datasets(variable="pr"),
"s3": S3Resource(endpoint_url=EnvVar("S3_ENDPOINT_URL"),
aws_access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"))
Expand Down
204 changes: 155 additions & 49 deletions downscaled_climate_data/sensors/loca2_sensor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import fnmatch
import re
import urllib.error
import urllib.request
from typing import Iterable

from bs4 import BeautifulSoup
from dagster import (
Expand All @@ -15,8 +16,20 @@
from downscaled_climate_data.assets.loca2 import loca2_raw
from downscaled_climate_data.sensors.loca2_models import Loca2Models

# Give ourselves 2 hours to process a single model/scenario
LOCA2_SENSOR_FREQUENCY = 3600 * 2

# For the smaller, monthly files, we can process them more frequently
LOCA2_MONTHLY_SENSOR_FREQUENCY = 600

LOCA2_ASSETS = [loca2_raw, as_zarr]


class Loca2Datasets(ConfigurableResource):
"""
Dagster resource that provides an iterator over LOCA2 files representing a variable
for a given model and scenario.
"""
variable: str = "tasmax"

def get_downloadable_files(self, models: dict,
Expand All @@ -29,6 +42,7 @@ def get_downloadable_files(self, models: dict,
+ model + "/cent/0p0625deg/" + memberid + "/"
+ scenario + "/" + self.variable + "/"
)

try:
path_soup = BeautifulSoup(
urllib.request.urlopen(path_string), "html.parser"
Expand All @@ -40,28 +54,18 @@ def get_downloadable_files(self, models: dict,
for file in path_soup.find_all("a"): # Pulling the links
file_list.append(file.get("href"))

file_string = (
self.variable
+ "."
+ model
+ "."
+ scenario
+ "."
+ memberid
+ ".*.LOCA_16thdeg_*.cent.nc"
)
filtered = fnmatch.filter(
file_list, file_string
) # Looking for specifically the full daily dataset
# Create a regex to find just the data files. Sadly, the monthly files in the
# pr variable have a different naming convention
file_regex = fr"{self.variable}\.{model}\.{scenario}\.{memberid}\..*.LOCA_16thdeg_v\d+" # noqa E501
file_regex += r"\.(monthly\.cent\.nc|cent\.monthly\.nc)" if monthly else r"\.cent\.nc" # noqa E501

filtered = [f for f in file_list if re.match(file_regex, f)]

directory = (
"/" + model + "/" + scenario + "/"
) # Pulling out the directory to download into

def filter_monthly(filename: str, monthly: bool) -> bool:
return "monthly" in filename if monthly else "monthly" not in filename

for filefiltered in [x for x in filtered if filter_monthly(x, monthly)]:
for filefiltered in filtered:
full_string = (
path_string + filefiltered
) # Putting together the full URL
Expand All @@ -75,7 +79,13 @@ def filter_monthly(filename: str, monthly: bool) -> bool:
}


def model_for_cursor(models, cursor):
def model_for_cursor(models, cursor) -> tuple[str, str]:
"""
Given a cursor, find the next model/scenario to process
:param models:
:param cursor:
:return a tuple of model, scenario:
"""
# Sort the models so we can chunk on model/scenario name
model_cursors = sorted(
f"{model}/{scenario}"
Expand All @@ -96,6 +106,14 @@ def model_for_cursor(models, cursor):
def run_request(file: dict[str, str],
model: str, scenario: str,
monthly: bool) -> RunRequest:
"""
Construct a RunRequest for a given file
:param file:
:param model:
:param scenario:
:param monthly:
:return:
"""
return RunRequest(
run_key=file["s3_key"],
run_config=RunConfig(
Expand All @@ -109,55 +127,143 @@ def run_request(file: dict[str, str],
},
}
),
tags={"model": model, "scenario": scenario, "memberid": file["memberid"]},
tags={"model": model,
"scenario": scenario,
"memberid": file["memberid"],
"variable": file['variable']
}
)


@sensor(
target=[loca2_raw, as_zarr],
name="LOCA2_Sensor",
minimum_interval_seconds=3600 * 2,)
def loca2_sensor(
context: SensorEvaluationContext,
loca2_models: Loca2Models,
loca2_datasets: Loca2Datasets,
) -> RunRequest:
def sensor_implementation(context, models,
dataset_resource, monthly: bool) -> Iterable[RunRequest]:
"""
Implements a common pattern for sensors that iterate over a set of models
and scenarios The actual sensor needs to be a function with the @sensor
decorator and performs the yields to feed the run requests to the Dagster engine
:param context:
:param models:
:param dataset_resource:
:param monthly:
:return:
"""

model, scenario = model_for_cursor(loca2_models.models, context.cursor)
# Find the next model/scenario to process
model, scenario = model_for_cursor(models, context.cursor)

if not model:
return

# Now we can launch jobs for each of the files for this model/scenario combination
for file in loca2_datasets.get_downloadable_files(
loca2_models.models, model, scenario, monthly=False
for file in dataset_resource.get_downloadable_files(
models, model, scenario, monthly=True
):
context.log.info(f"Found file: {file['url']}")
yield run_request(file, model, scenario, monthly=False)
yield run_request(file, model, scenario, monthly=monthly)

context.update_cursor(f"{model}/{scenario}")


@sensor(
target=[loca2_raw, as_zarr],
name="LOCA2_Sensor_Monthly",
minimum_interval_seconds=600)
def loca2_sensor_monthly(
name="LOCA2_Sensor_tasmax",
target=LOCA2_ASSETS,
minimum_interval_seconds=LOCA2_SENSOR_FREQUENCY,
tags={
"variable": "tasmax",
"frequency": "daily"})
def loca2_sensor_tasmax(
context: SensorEvaluationContext,
loca2_models: Loca2Models,
loca2_datasets: Loca2Datasets,
) -> RunRequest:
loca2_datasets_tasmax: Loca2Datasets,
):
for request in sensor_implementation(
context, loca2_models.models, loca2_datasets_tasmax, monthly=False):
yield request

model, scenario = model_for_cursor(loca2_models.models, context.cursor)

if not model:
return
@sensor(
name="LOCA2_Sensor_tasmin",
target=LOCA2_ASSETS,
minimum_interval_seconds=LOCA2_SENSOR_FREQUENCY,
tags={
"variable": "tasmin",
"frequency": "daily"})
def loca2_sensor_tasmin(
context: SensorEvaluationContext,
loca2_models: Loca2Models,
loca2_datasets_tasmin: Loca2Datasets,
):
for request in sensor_implementation(
context, loca2_models.models, loca2_datasets_tasmin, monthly=False):
yield request

# Now we can launch jobs for each of the files for this model/scenario combination
for file in loca2_datasets.get_downloadable_files(
loca2_models.models, model, scenario, monthly=True
):
context.log.info(f"Found file: {file['url']}")
yield run_request(file, model, scenario, monthly=True)

context.update_cursor(f"{model}/{scenario}")
@sensor(
name="LOCA2_Sensor_pr",
target=LOCA2_ASSETS,
minimum_interval_seconds=LOCA2_SENSOR_FREQUENCY,
tags={
"variable": "pr",
"frequency": "daily"})
def loca2_sensor_pr(
context: SensorEvaluationContext,
loca2_models: Loca2Models,
loca2_datasets_pr: Loca2Datasets,
):
for request in sensor_implementation(
context, loca2_models.models, loca2_datasets_pr, monthly=False):
yield request


@sensor(
name="LOCA2_Sensor_Monthly_tasmax",
target=LOCA2_ASSETS,
minimum_interval_seconds=LOCA2_MONTHLY_SENSOR_FREQUENCY,
tags={
"variable": "tasmax",
"frequency": "monthly", })
def loca2_sensor_monthly_tasmax(
context: SensorEvaluationContext,
loca2_models: Loca2Models,
loca2_datasets_tasmax: Loca2Datasets,
):

for request in sensor_implementation(
context, loca2_models.models, loca2_datasets_tasmax, monthly=True):
yield request


@sensor(
name="LOCA2_Sensor_Monthly_tasmin",
target=LOCA2_ASSETS,
minimum_interval_seconds=LOCA2_MONTHLY_SENSOR_FREQUENCY,
tags={
"variable": "tasmin",
"frequency": "monthly", })
def loca2_sensor_monthly_tasmin(
context: SensorEvaluationContext,
loca2_models: Loca2Models,
loca2_datasets_tasmin: Loca2Datasets,
):

for request in sensor_implementation(
context, loca2_models.models, loca2_datasets_tasmin, monthly=True):
yield request


@sensor(
name="LOCA2_Sensor_Monthly_pr",
target=LOCA2_ASSETS,
minimum_interval_seconds=LOCA2_MONTHLY_SENSOR_FREQUENCY,
tags={
"variable": "pr",
"frequency": "monthly", })
def loca2_sensor_monthly_pr(
context: SensorEvaluationContext,
loca2_models: Loca2Models,
loca2_datasets_pr: Loca2Datasets,
):

for request in sensor_implementation(
context, loca2_models.models, loca2_datasets_pr, monthly=True):
yield request
Loading

0 comments on commit 7c08c82

Please sign in to comment.