Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create simple SQL view assets #2445

Merged
merged 13 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
graft src/pudl/package_data
graft src/pudl/metadata/templates
graft src/pudl/output/sql

prune ci
prune data
Expand Down
328 changes: 328 additions & 0 deletions devtools/python-output-table-conversion-debug.ipynb

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/pudl/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
*load_assets_from_modules([pudl.transform.ferc714], group_name="clean_ferc714"),
*load_assets_from_modules([glue_assets], group_name="glue"),
*load_assets_from_modules([static_assets], group_name="static"),
*load_assets_from_modules([pudl.output.denorm_eia], group_name="denorm_eia"),
*load_assets_from_modules([pudl.output.denorm_ferc], group_name="denorm_ferc"),
)

default_resources = {
Expand Down
41 changes: 0 additions & 41 deletions src/pudl/etl/denormalized_assets.py

This file was deleted.

8 changes: 7 additions & 1 deletion src/pudl/io_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ def _handle_str_output(self, context: OutputContext, query: str):
engine = self.engine
table_name = self._get_table_name(context)

# Make sure the metadata has been created for the view
_ = self._get_sqlalchemy_table(table_name)

with engine.connect() as con:
# Drop the existing view if it exists and create the new view.
# TODO (bendnorman): parameterize this safely.
Expand Down Expand Up @@ -563,7 +566,10 @@ def __init__(
"""
super().__init__(base_dir, db_name, md, timeout)

def _setup_database(self, timeout: float = 1_000.0) -> sa.engine.Engine:
def _setup_database(
self,
timeout: float = 1_000.0,
) -> sa.engine.Engine:
"""Create database engine and read the metadata.

Args:
Expand Down
38 changes: 38 additions & 0 deletions src/pudl/metadata/resources/eia860.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,44 @@
"sources": ["eia860", "eia923"],
"etl_group": "eia860",
},
"denorm_utilities_eia860": {
bendnorman marked this conversation as resolved.
Show resolved Hide resolved
"description": ("Denormalized table containing all eia860 utility attributes."),
"schema": {
"fields": [
"utility_id_eia",
"utility_id_pudl",
"utility_name_eia",
"report_date",
"street_address",
"city",
"state",
"zip_code",
"plants_reported_owner",
"plants_reported_operator",
"plants_reported_asset_manager",
"plants_reported_other_relationship",
"entity_type",
"attention_line",
"address_2",
"zip_code_4",
"contact_firstname",
"contact_lastname",
"contact_title",
"phone_number",
"phone_extension",
"contact_firstname_2",
"contact_lastname_2",
"contact_title_2",
"phone_number_2",
"phone_extension_2",
"data_maturity",
],
"primary_key": ["utility_id_eia", "report_date"],
},
"field_namespace": "eia",
"sources": ["eia860", "eia923"],
"etl_group": "outputs",
},
}
"""EIA-860 resource attributes organized by PUDL identifier (``resource.name``).

Expand Down
16 changes: 16 additions & 0 deletions src/pudl/metadata/resources/ferc1.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,22 @@
"etl_group": "ferc1",
"field_namespace": "ferc1",
},
"denorm_plants_utils_ferc1": {
zaneselvans marked this conversation as resolved.
Show resolved Hide resolved
bendnorman marked this conversation as resolved.
Show resolved Hide resolved
"description": "Denormalized table that contains FERC plant and utility information.",
"schema": {
"fields": [
"utility_id_ferc1",
"plant_name_ferc1",
"plant_id_pudl",
"utility_name_ferc1",
"utility_id_pudl",
],
},
"field_namespace": "ferc1",
"etl_group": "outputs",
"sources": ["ferc1"],
"include_in_database": False,
zaneselvans marked this conversation as resolved.
Show resolved Hide resolved
},
}
"""FERC Form 1 resource attributes by PUDL identifier (``resource.name``).

Expand Down
2 changes: 2 additions & 0 deletions src/pudl/output/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
"""
from . import ( # noqa: F401
censusdp1tract,
denorm_eia,
denorm_ferc,
bendnorman marked this conversation as resolved.
Show resolved Hide resolved
eia860,
eia923,
epacems,
Expand Down
42 changes: 42 additions & 0 deletions src/pudl/output/denorm_eia.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""A collection of denormalized EIA assets."""
import pandas as pd
from dagster import asset

import pudl
from pudl.metadata.fields import apply_pudl_dtypes


@asset(io_manager_key="pudl_sqlite_io_manager", compute_kind="Python")
def denorm_utilities_eia860(
    utilities_entity_eia: pd.DataFrame,
    utilities_eia860: pd.DataFrame,
    utilities_eia: pd.DataFrame,
):
    """Build the denormalized EIA 860 utilities output table.

    The utility entity table is left-joined to the annual EIA 860 utility
    table and then to the EIA/PUDL utility ID associations, both times on
    ``utility_id_eia``.

    Args:
        utilities_entity_eia: EIA utility entity table.
        utilities_eia860: EIA 860 annual utility table.
        utilities_eia: Associations between EIA utilities and pudl utility IDs.

    Returns:
        A DataFrame containing all the fields of the EIA 860 Utilities table.
    """
    join_key = ["utility_id_eia"]
    # Only the PUDL utility ID is taken from the association table.
    pudl_ids = utilities_eia[["utility_id_eia", "utility_id_pudl"]]

    merged = utilities_entity_eia.merge(utilities_eia860, how="left", on=join_key)
    merged = merged.merge(pudl_ids, how="left", on=join_key)

    merged = merged.assign(report_date=lambda df: pd.to_datetime(df.report_date))
    # Rows missing a report date or a utility ID are discarded.
    merged = merged.dropna(subset=["report_date", "utility_id_eia"])
    merged = apply_pudl_dtypes(merged, group="eia")

    # Identifying columns are moved to the front of the table.
    leading_cols = [
        "report_date",
        "utility_id_eia",
        "utility_id_pudl",
        "utility_name_eia",
    ]
    return pudl.helpers.organize_cols(merged, leading_cols)
6 changes: 6 additions & 0 deletions src/pudl/output/denorm_ferc.py
bendnorman marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""A collection of denormalized FERC assets."""
from pudl.output.sql.helpers import sql_asset_factory

# SQL view asset joining FERC plants and utilities; the statement itself is
# loaded from the .sql file named after the asset.
denorm_plants_utils_ferc1_asset = sql_asset_factory(
    name="denorm_plants_utils_ferc1",
    non_argument_deps={"plants_ferc1", "utilities_ferc1"},
)
5 changes: 5 additions & 0 deletions src/pudl/output/eia860.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ def utilities_eia860(pudl_engine, start_date=None, end_date=None):
pandas.DataFrame: A DataFrame containing all the fields of the EIA 860
Utilities table.
"""
# Point callers at the replacement APIs. Each continuation string starts with
# a space so the implicitly concatenated message keeps its word breaks.
logger.warning(
    "pudl.output.eia860.utilities_eia860() will be deprecated in a future version of PUDL."
    " In the future, call the PudlTabl.utils_eia860() method or pull the denorm_utilities_eia860 table"
    " directly from the pudl.sqlite database."
)
pt = pudl.output.pudltabl.get_table_meta(pudl_engine)
# grab the entity table
utils_eia_tbl = pt["utilities_entity_eia"]
Expand Down
5 changes: 5 additions & 0 deletions src/pudl/output/ferc1.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ def plants_utils_ferc1(pudl_engine):
pandas.DataFrame: A DataFrame containing useful FERC Form 1 Plant and
Utility information.
"""
# Point callers at the replacement APIs. The view in the database is named
# denorm_plants_utils_ferc1, and each continuation string starts with a space
# so the implicitly concatenated message keeps its word breaks.
logger.warning(
    "pudl.output.ferc1.plants_utils_ferc1() will be deprecated in a future version of PUDL."
    " In the future, call the PudlTabl.pu_ferc1() method or pull the denorm_plants_utils_ferc1 table"
    " directly from the pudl.sqlite database."
)
bendnorman marked this conversation as resolved.
Show resolved Hide resolved
pu_df = pd.merge(
pd.read_sql("plants_ferc1", pudl_engine),
pd.read_sql("utilities_ferc1", pudl_engine),
Expand Down
32 changes: 10 additions & 22 deletions src/pudl/output/pudltabl.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,15 @@ def pu_eia860(self, update=False):
)
return self._dfs["pu_eia"]

def pu_ferc1(self, update=False):
def pu_ferc1(self) -> pd.DataFrame:
"""Pull a dataframe of FERC plant-utility associations.

Args:
update (bool): If true, re-calculate the output dataframe, even if
a cached version exists.

Returns:
pandas.DataFrame: a denormalized table for interactive use.
A denormalized table for interactive use.
"""
if update or self._dfs["pu_ferc1"] is None:
self._dfs["pu_ferc1"] = pudl.output.ferc1.plants_utils_ferc1(
self.pudl_engine
)
return self._dfs["pu_ferc1"]
return pd.read_sql_table("denorm_plants_utils_ferc1", self.pudl_engine).pipe(
apply_pudl_dtypes, group="ferc1"
zaneselvans marked this conversation as resolved.
Show resolved Hide resolved
)

def advanced_metering_infrastructure_eia861(self) -> pd.DataFrame:
"""An interim EIA 861 output function."""
Expand Down Expand Up @@ -364,21 +358,15 @@ def demand_hourly_pa_ferc714(self) -> pd.DataFrame:
###########################################################################
# EIA 860/923 OUTPUTS
###########################################################################
def utils_eia860(self, update=False):
def utils_eia860(self, update=False) -> pd.DataFrame:
"""Pull a dataframe describing utilities reported in EIA 860.

Args:
update (bool): If true, re-calculate the output dataframe, even if
a cached version exists.

Returns:
pandas.DataFrame: a denormalized table for interactive use.
A denormalized table for interactive use.
"""
if update or self._dfs["utils_eia860"] is None:
self._dfs["utils_eia860"] = pudl.output.eia860.utilities_eia860(
self.pudl_engine, start_date=self.start_date, end_date=self.end_date
)
return self._dfs["utils_eia860"]
return pd.read_sql("denorm_utilities_eia860", self.pudl_engine).pipe(
apply_pudl_dtypes, group="eia"
zaneselvans marked this conversation as resolved.
Show resolved Hide resolved
)

def bga_eia860(self, update=False):
"""Pull a dataframe of boiler-generator associations from EIA 860.
Expand Down
1 change: 1 addition & 0 deletions src/pudl/output/sql/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""A module of python helper functions and sql files for creating SQL views."""
5 changes: 5 additions & 0 deletions src/pudl/output/sql/denorm_plants_utils_ferc1.sql
zaneselvans marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Build a view of useful FERC Plant & Utility information.
-- Rows of plants_ferc1 are matched to rows of utilities_ferc1 on
-- utility_id_ferc1. Because this is an INNER JOIN, plants without a matching
-- utility row (and utilities without any plant) do not appear in the view.
CREATE VIEW denorm_plants_utils_ferc1 AS
SELECT *
FROM plants_ferc1
INNER JOIN utilities_ferc1 USING(utility_id_ferc1);
zaneselvans marked this conversation as resolved.
Show resolved Hide resolved
35 changes: 35 additions & 0 deletions src/pudl/output/sql/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Helper functions for creating output assets."""
import importlib

from dagster import AssetsDefinition, asset


def sql_asset_factory(
    name: str,
    non_argument_deps: "set[str] | None" = None,
    io_manager_key: str = "pudl_sqlite_io_manager",
    compute_kind: str = "SQL",
) -> AssetsDefinition:
    """Factory for creating assets that run SQL statements.

    The returned asset reads ``<name>.sql`` from the ``pudl.output.sql``
    subpackage and returns the text of that file.

    Args:
        name: Name of the asset; also the stem of the SQL file that is loaded.
        non_argument_deps: Names of upstream assets the SQL statement depends
            on. ``None`` (the default) means no dependencies.
        io_manager_key: IO manager key passed to the ``asset`` decorator.
        compute_kind: Compute kind passed to the ``asset`` decorator.

    Returns:
        The asset definition wrapping the SQL file.
    """
    # ``import importlib`` alone does not guarantee that the ``resources``
    # submodule has been loaded, so import it explicitly.
    import importlib.resources

    # Build a fresh set on every call so no default object is shared between
    # the assets produced by this factory.
    deps = set() if non_argument_deps is None else set(non_argument_deps)

    @asset(
        name=name,
        non_argument_deps=deps,
        io_manager_key=io_manager_key,
        compute_kind=compute_kind,
    )
    def sql_view_asset() -> str:
        """Asset that creates sql view in a database."""
        sql_file = importlib.resources.files("pudl.output.sql").joinpath(f"{name}.sql")
        try:
            with importlib.resources.as_file(sql_file) as sql_path:
                return sql_path.read_text()
        except FileNotFoundError as err:
            # Raise a helpful error here if a sql file doesn't exist. The
            # message uses ``sql_file`` because ``sql_path`` is never bound
            # when ``as_file`` itself fails.
            raise FileNotFoundError(
                f"Could not find {sql_file}. Create a sql file in pudl.output.sql subpackage for {name} asset."
            ) from err

    return sql_view_asset
8 changes: 8 additions & 0 deletions test/unit/helpers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
remove_leading_zeros_from_numeric_strings,
zero_pad_numeric_string,
)
from pudl.output.sql.helpers import sql_asset_factory

MONTHLY_GEN_FUEL = pd.DataFrame(
{
Expand Down Expand Up @@ -628,6 +629,13 @@ def test_cems_selection():
), "hourly_emissions_epacems or downstream asset present in selection."


def test_sql_asset_factory_missing_file():
    """Test that a SQL asset with no matching .sql file raises FileNotFoundError."""
    # Build the asset outside the ``raises`` block so that only invoking it,
    # which is what reads the SQL file, can satisfy the expectation.
    fake_view_asset = sql_asset_factory(name="fake_view")
    with pytest.raises(FileNotFoundError):
        fake_view_asset()


def test_env_var():
os.environ["_PUDL_TEST"] = "test value"
env_var = EnvVar(env_var="_PUDL_TEST")
Expand Down