Skip to content

Commit

Permalink
Prototype dagster-pandera integration (#3282)
Browse files Browse the repository at this point in the history
* Fix types.

* POC: get pandera schema from our Schema

* Describe RESOURCE_METADATA elements before I start changing the shape willy-nilly.

* Pandera df/col checks + dtypes interop with PudlResourceDescriptors

* First attempt at using the existing vs_bounds machinery in Pandera checks. Lots of ergonomic improvements to be had.

* Use asset check for out_eia923__boiler_fuel

Still remaining: generate asset checks programmatically, instead of with laborious manual typing.

* Automatically generate asset check.

* Generate asset checks automatically for all schemata.

* Make schema-based checks xfail-able.

* Fix docs build and unclosed file warnings.

* Clarify _get_keys_from_assets

* Remove df_checks and field_checks machinery.

---------

Co-authored-by: Zane Selvans <[email protected]>
  • Loading branch information
jdangerx and zaneselvans authored Feb 20, 2024
1 parent 1dc05fd commit eba35e0
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 26 deletions.
87 changes: 87 additions & 0 deletions src/pudl/etl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
"""Dagster definitions for the PUDL ETL and Output tables."""
import importlib.resources
import itertools
import warnings

import pandera as pr
from dagster import (
AssetCheckResult,
AssetChecksDefinition,
AssetKey,
AssetsDefinition,
AssetSelection,
Definitions,
ExperimentalWarning,
SourceAsset,
asset_check,
define_asset_job,
load_assets_from_modules,
)
from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition

import pudl
from pudl.io_managers import (
Expand All @@ -31,6 +40,7 @@

logger = pudl.logging_helpers.get_logger(__name__)


default_assets = (
*load_assets_from_modules([eia_bulk_elec_assets], group_name="core_eia_bulk_elec"),
*load_assets_from_modules([epacems_assets], group_name="core_epacems"),
Expand Down Expand Up @@ -95,6 +105,79 @@
),
)


def asset_check_from_schema(
    asset_key: AssetKey,
    package: pudl.metadata.classes.Package,
) -> AssetChecksDefinition | None:
    """Build a Pandera-backed dagster asset check for a single asset.

    The asset key's user string is used as the resource ID to look up in
    ``package``. The resource's schema is converted to a Pandera schema, and
    the returned check validates the asset's value against it.

    Args:
        asset_key: Key of the asset the check should be attached to.
        package: Metadata package in which the matching resource is looked up.

    Returns:
        An asset check wrapping the Pandera validation, or ``None`` when
        ``package.get_resource`` raises ``ValueError`` for this asset.
    """
    table_name = asset_key.to_user_string()
    try:
        table_resource = package.get_resource(table_name)
    except ValueError:
        # Presumably there is no resource defined under this name; such
        # assets simply get no schema check.
        return None
    validator = table_resource.schema.to_pandera()

    @asset_check(asset=asset_key)
    def pandera_schema_check(asset_value) -> AssetCheckResult:
        try:
            # lazy=True makes Pandera report its failures together in one
            # SchemaErrors exception.
            validator.validate(asset_value, lazy=True)
        except pr.errors.SchemaErrors as caught:
            failures = [
                {
                    "failure_cases": str(single_error.failure_cases),
                    "data": str(single_error.data),
                }
                for single_error in caught.schema_errors
            ]
            return AssetCheckResult(passed=False, metadata={"errors": failures})
        return AssetCheckResult(passed=True)

    return pandera_schema_check


def _get_keys_from_assets(
asset_def: AssetsDefinition | SourceAsset | CacheableAssetsDefinition,
) -> list[AssetKey]:
"""Get a list of asset keys.
Most assets have one key, which can be retrieved as a list from
``asset.keys``.
Multi-assets have multiple keys, which can also be retrieved as a list from
``asset.keys``.
SourceAssets always only have one key, and don't have ``asset.keys``. So we
look for ``asset.key`` and wrap it in a list.
We don't handle CacheableAssetsDefinitions yet.
"""
if isinstance(asset_def, AssetsDefinition):
return list(asset_def.keys)
if isinstance(asset_def, SourceAsset):
return [asset_def.key]
return []


# Asset Checks are still Experimental, silence the warning since we use them
# everywhere.
# NOTE(review): this filter is undone at the end of the module by
# ``warnings.resetwarnings()``, which clears *every* warning filter rather than
# just this one -- consider scoping the suppression with
# ``warnings.catch_warnings()`` instead.
warnings.filterwarnings("ignore", category=ExperimentalWarning)
_package = pudl.metadata.classes.Package.from_resource_ids()
# Materialize the keys into a list: a bare ``itertools.chain`` iterator would
# be exhausted after building ``default_asset_checks`` below, leaving this
# module-level name silently empty for any later use.
_asset_keys = list(
    itertools.chain.from_iterable(
        _get_keys_from_assets(asset_def) for asset_def in default_assets
    )
)
# ``asset_check_from_schema`` returns None for some asset keys; those assets
# get no schema check.
default_asset_checks = [
    check
    for check in (
        asset_check_from_schema(asset_key, _package) for asset_key in _asset_keys
    )
    if check is not None
]

default_resources = {
"datastore": datastore,
"pudl_io_manager": pudl_mixed_format_io_manager,
Expand Down Expand Up @@ -153,6 +236,7 @@ def load_dataset_settings_from_file(setting_filename: str) -> dict:

defs: Definitions = Definitions(
assets=default_assets,
asset_checks=default_asset_checks,
resources=default_resources,
jobs=[
define_asset_job(
Expand Down Expand Up @@ -193,4 +277,7 @@ def load_dataset_settings_from_file(setting_filename: str) -> dict:
),
],
)

warnings.resetwarnings()

"""A collection of dagster assets, resources, IO managers, and jobs for the PUDL ETL."""
Loading

0 comments on commit eba35e0

Please sign in to comment.