Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create simple SQL view assets #2445

Merged
merged 13 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
graft src/pudl/metadata/templates
graft src/pudl/output/sql

prune .github
prune devtools
Expand Down
328 changes: 328 additions & 0 deletions devtools/python-output-table-conversion-debug.ipynb

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/pudl/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
*load_assets_from_modules([pudl.transform.ferc714], group_name="clean_ferc714"),
*load_assets_from_modules([glue_assets], group_name="glue"),
*load_assets_from_modules([static_assets], group_name="static"),
*load_assets_from_modules([pudl.output.denorm_eia], group_name="denorm_eia"),
*load_assets_from_modules([pudl.output.denorm_ferc1], group_name="denorm_ferc1"),
)

default_resources = {
Expand Down
8 changes: 7 additions & 1 deletion src/pudl/io_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ def _handle_str_output(self, context: OutputContext, query: str):
engine = self.engine
table_name = self._get_table_name(context)

# Make sure the metadata has been created for the view
_ = self._get_sqlalchemy_table(table_name)

with engine.connect() as con:
# Drop the existing view if it exists and create the new view.
# TODO (bendnorman): parameterize this safely.
Expand Down Expand Up @@ -562,7 +565,10 @@ def __init__(
"""
super().__init__(base_dir, db_name, md, timeout)

def _setup_database(self, timeout: float = 1_000.0) -> sa.engine.Engine:
def _setup_database(
self,
timeout: float = 1_000.0,
) -> sa.engine.Engine:
"""Create database engine and read the metadata.

Args:
Expand Down
4 changes: 2 additions & 2 deletions src/pudl/metadata/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,7 @@ class Resource(Base):
"eia_bulk_elec",
"static_pudl",
] = None
include_in_database: bool = True
create_database_schema: bool = True

_check_unique = _validator(
"contributors", "keywords", "licenses", "sources", fn=_check_unique
Expand Down Expand Up @@ -1870,7 +1870,7 @@ def to_sql(
"""Return equivalent SQL MetaData."""
metadata = sa.MetaData()
for resource in self.resources:
if resource.include_in_database:
if resource.create_database_schema:
_ = resource.to_sql(
metadata,
check_types=check_types,
Expand Down
4 changes: 2 additions & 2 deletions src/pudl/metadata/resources/eia.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@
"sources": ["eia861"],
"etl_group": "static_eia_disabled", # currently not being loaded into the db
"field_namespace": "eia",
"include_in_database": False,
"create_database_schema": False,
},
"fuel_transportation_modes_eia": {
"description": "Long descriptions of the fuel transportation modes reported in the EIA-860 and EIA-923.",
Expand Down Expand Up @@ -603,7 +603,7 @@
"sources": ["eia860", "eia923"],
"etl_group": "outputs",
"field_namespace": "ppe",
"include_in_database": False,
"create_database_schema": False,
},
"prime_movers_eia": {
"description": "Long descriptions explaining the short prime mover codes reported in the EIA-860 and EIA-923.",
Expand Down
38 changes: 38 additions & 0 deletions src/pudl/metadata/resources/eia860.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,44 @@
"sources": ["eia860", "eia923"],
"etl_group": "eia860",
},
"denorm_utilities_eia": {
"description": ("Denoramlized table containing all EIA utility attributes."),
"schema": {
"fields": [
"utility_id_eia",
"utility_id_pudl",
"utility_name_eia",
"report_date",
"street_address",
"city",
"state",
"zip_code",
"plants_reported_owner",
"plants_reported_operator",
"plants_reported_asset_manager",
"plants_reported_other_relationship",
"entity_type",
"attention_line",
"address_2",
"zip_code_4",
"contact_firstname",
"contact_lastname",
"contact_title",
"phone_number",
"phone_extension",
"contact_firstname_2",
"contact_lastname_2",
"contact_title_2",
"phone_number_2",
"phone_extension_2",
"data_maturity",
],
"primary_key": ["utility_id_eia", "report_date"],
},
"field_namespace": "eia",
"sources": ["eia860", "eia923"],
"etl_group": "outputs",
},
}
"""EIA-860 resource attributes organized by PUDL identifier (``resource.name``).

Expand Down
2 changes: 1 addition & 1 deletion src/pudl/metadata/resources/epacems.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"sources": ["eia860", "epacems"],
"field_namespace": "epacems",
"etl_group": "epacems",
"include_in_database": False,
"create_database_schema": False,
},
}
"""EPA CEMS resource attributes by PUDL identifier (``resource.name``).
Expand Down
18 changes: 17 additions & 1 deletion src/pudl/metadata/resources/ferc1.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@
"sources": ["ferc1"],
"etl_group": "ferc1_disabled",
"field_namespace": "ferc1",
"include_in_database": False,
"create_database_schema": False,
},
"electric_operating_revenues_ferc1": {
"description": (
Expand Down Expand Up @@ -823,6 +823,22 @@
"etl_group": "ferc1",
"field_namespace": "ferc1",
},
"denorm_plants_utilities_ferc1": {
"description": "Denormalized table that contains FERC plant and utility information.",
"schema": {
"fields": [
"utility_id_ferc1",
"plant_name_ferc1",
"plant_id_pudl",
"utility_name_ferc1",
"utility_id_pudl",
],
},
"field_namespace": "ferc1",
"etl_group": "outputs",
"sources": ["ferc1"],
"create_database_schema": False,
},
}
"""FERC Form 1 resource attributes by PUDL identifier (``resource.name``).

Expand Down
2 changes: 2 additions & 0 deletions src/pudl/output/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
"""
from . import ( # noqa: F401
censusdp1tract,
denorm_eia,
denorm_ferc1,
eia860,
eia923,
epacems,
Expand Down
42 changes: 42 additions & 0 deletions src/pudl/output/denorm_eia.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""A collection of denormalized EIA assets."""
import pandas as pd
from dagster import asset

import pudl
from pudl.metadata.fields import apply_pudl_dtypes


@asset(io_manager_key="pudl_sqlite_io_manager", compute_kind="Python")
def denorm_utilities_eia(
    utilities_entity_eia: pd.DataFrame,
    utilities_eia860: pd.DataFrame,
    utilities_eia: pd.DataFrame,
):
    """Build the denormalized EIA utilities table.

    The utility entity table is left-joined with the annual EIA 860 utility
    table and then with the EIA-to-PUDL utility ID association, all on
    ``utility_id_eia``.

    Args:
        utilities_entity_eia: EIA utility entity table.
        utilities_eia860: EIA 860 annual utility table.
        utilities_eia: Associations between EIA utilities and pudl utility IDs.

    Returns:
        A DataFrame with the combined utility columns, restricted to rows that
        have both a ``report_date`` and a ``utility_id_eia``, with the
        identifying columns placed first.
    """
    join_key = ["utility_id_eia"]

    # Only the ID mapping is needed from the association table.
    pudl_id_map = utilities_eia[["utility_id_eia", "utility_id_pudl"]]

    merged = utilities_entity_eia.merge(
        utilities_eia860, how="left", on=join_key
    ).merge(pudl_id_map, how="left", on=join_key)

    dated = merged.assign(report_date=pd.to_datetime(merged.report_date))
    complete = dated.dropna(subset=["report_date", "utility_id_eia"])
    typed = apply_pudl_dtypes(complete, group="eia")

    leading_cols = [
        "report_date",
        "utility_id_eia",
        "utility_id_pudl",
        "utility_name_eia",
    ]
    return pudl.helpers.organize_cols(typed, leading_cols)
6 changes: 6 additions & 0 deletions src/pudl/output/denorm_ferc1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""A collection of denormalized FERC assets."""
from pudl.output.sql.helpers import sql_asset_factory

# SQL view asset joining FERC plant and utility tables; it depends on both
# upstream tables being present before the view is created.
denorm_plants_utilities_ferc1_asset = sql_asset_factory(
    name="denorm_plants_utilities_ferc1",
    non_argument_deps={"plants_ferc1", "utilities_ferc1"},
)
5 changes: 5 additions & 0 deletions src/pudl/output/eia860.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ def utilities_eia860(pudl_engine, start_date=None, end_date=None):
pandas.DataFrame: A DataFrame containing all the fields of the EIA 860
Utilities table.
"""
logger.warning(
"pudl.output.eia860.utilities_eia860() will be deprecated in a future version of PUDL."
" In the future, call the PudlTabl.utils_eia860() method or pull the denorm_utilities_eia table"
"directly from the pudl.sqlite database."
)
Comment on lines +31 to +35
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to fix this to merge, but I think we should probably avoid directing folks to use the PudlTabl outputs since we intend to deprecate that too -- if we're giving them directions for the future, it should probably be to use the DB tables directly.

pt = pudl.output.pudltabl.get_table_meta(pudl_engine)
# grab the entity table
utils_eia_tbl = pt["utilities_entity_eia"]
Expand Down
5 changes: 5 additions & 0 deletions src/pudl/output/ferc1.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ def plants_utils_ferc1(pudl_engine):
pandas.DataFrame: A DataFrame containing useful FERC Form 1 Plant and
Utility information.
"""
logger.warning(
"pudl.output.ferc1.plants_utils_ferc1() will be deprecated in a future version of PUDL."
" In the future, call the PudlTabl.pu_ferc1() method or pull the plants_utils_ferc1 table"
"directly from the pudl.sqlite database."
)
bendnorman marked this conversation as resolved.
Show resolved Hide resolved
pu_df = pd.merge(
pd.read_sql("plants_ferc1", pudl_engine),
pd.read_sql("utilities_ferc1", pudl_engine),
Expand Down
34 changes: 11 additions & 23 deletions src/pudl/output/pudltabl.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,15 @@ def pu_eia860(self, update=False):
)
return self._dfs["pu_eia"]

def pu_ferc1(self, update=False):
def pu_ferc1(self) -> pd.DataFrame:
"""Pull a dataframe of FERC plant-utility associations.

Args:
update (bool): If true, re-calculate the output dataframe, even if
a cached version exists.

Returns:
pandas.DataFrame: a denormalized table for interactive use.
A denormalized table for interactive use.
"""
if update or self._dfs["pu_ferc1"] is None:
self._dfs["pu_ferc1"] = pudl.output.ferc1.plants_utils_ferc1(
self.pudl_engine
)
return self._dfs["pu_ferc1"]
return pd.read_sql_table(
"denorm_plants_utilities_ferc1", self.pudl_engine
).pipe(apply_pudl_dtypes, group="ferc1")

def advanced_metering_infrastructure_eia861(self) -> pd.DataFrame:
"""An interim EIA 861 output function."""
Expand Down Expand Up @@ -364,21 +358,15 @@ def demand_hourly_pa_ferc714(self) -> pd.DataFrame:
###########################################################################
# EIA 860/923 OUTPUTS
###########################################################################
def utils_eia860(self, update=False):
"""Pull a dataframe describing utilities reported in EIA 860.

Args:
update (bool): If true, re-calculate the output dataframe, even if
a cached version exists.
def utils_eia860(self, update=False) -> pd.DataFrame:
"""Pull a dataframe describing utilities reported in EIA.

Returns:
pandas.DataFrame: a denormalized table for interactive use.
A denormalized table for interactive use.
"""
if update or self._dfs["utils_eia860"] is None:
self._dfs["utils_eia860"] = pudl.output.eia860.utilities_eia860(
self.pudl_engine, start_date=self.start_date, end_date=self.end_date
)
return self._dfs["utils_eia860"]
return pd.read_sql("denorm_utilities_eia", self.pudl_engine).pipe(
apply_pudl_dtypes, group="eia"
zaneselvans marked this conversation as resolved.
Show resolved Hide resolved
)

def bga_eia860(self, update=False):
"""Pull a dataframe of boiler-generator associations from EIA 860.
Expand Down
1 change: 1 addition & 0 deletions src/pudl/output/sql/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""A module of python helper functions and sql files for creating SQL views."""
5 changes: 5 additions & 0 deletions src/pudl/output/sql/denorm_plants_utilities_ferc1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Build a view of useful FERC Plant & Utility information.
CREATE VIEW denorm_plants_utilities_ferc1 AS
SELECT *
FROM plants_ferc1
INNER JOIN utilities_ferc1 USING(utility_id_ferc1);
35 changes: 35 additions & 0 deletions src/pudl/output/sql/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Helper functions for creating output assets."""
import importlib

from dagster import AssetsDefinition, asset


def sql_asset_factory(
    name: str,
    non_argument_deps: set[str] | None = None,
    io_manager_key: str = "pudl_sqlite_io_manager",
    compute_kind: str = "SQL",
) -> AssetsDefinition:
    """Build a dagster asset that returns the SQL text stored in ``{name}.sql``.

    Args:
        name: Name given to the asset. It is also the stem of the ``.sql`` file
            looked up inside the ``pudl.output.sql`` package.
        non_argument_deps: Names of upstream assets, forwarded to the ``asset``
            decorator. ``None`` (the default) means no upstream dependencies.
        io_manager_key: IO manager key forwarded to the ``asset`` decorator.
        compute_kind: Compute kind label forwarded to the ``asset`` decorator.

    Returns:
        The asset definition. When computed, the asset returns the contents of
        the SQL file as a string.
    """
    # Import the submodule explicitly: a bare ``import importlib`` does not
    # guarantee that ``importlib.resources`` has been loaded.
    import importlib.resources

    # Use a fresh set per call instead of a shared mutable default argument.
    upstream_deps = set() if non_argument_deps is None else non_argument_deps

    @asset(
        name=name,
        non_argument_deps=upstream_deps,
        io_manager_key=io_manager_key,
        compute_kind=compute_kind,
    )
    def sql_view_asset() -> str:
        """Asset that creates sql view in a database.

        Raises:
            FileNotFoundError: If no ``{name}.sql`` file exists in the
                ``pudl.output.sql`` package.
        """
        sql_path_traversable = importlib.resources.files("pudl.output.sql").joinpath(
            f"{name}.sql"
        )
        try:
            with importlib.resources.as_file(sql_path_traversable) as sql_path:
                return sql_path.read_text()
        # Raise a helpful error here if a sql file doesn't exist.
        except FileNotFoundError as err:
            # Report the traversable rather than ``sql_path``: the latter is
            # unbound if ``as_file`` itself is what raised.
            raise FileNotFoundError(
                f"Could not find {sql_path_traversable}. Create a sql file in pudl.output.sql subpackage for {name} asset."
            ) from err

    return sql_view_asset
8 changes: 8 additions & 0 deletions test/unit/helpers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
remove_leading_zeros_from_numeric_strings,
zero_pad_numeric_string,
)
from pudl.output.sql.helpers import sql_asset_factory

MONTHLY_GEN_FUEL = pd.DataFrame(
{
Expand Down Expand Up @@ -627,6 +628,13 @@ def test_cems_selection():
), "hourly_emissions_epacems or downstream asset present in selection."


def test_sql_asset_factory_missing_file():
    """Check that an asset with no matching SQL file raises FileNotFoundError."""
    with pytest.raises(FileNotFoundError):
        # Build the asset for a view name that has no .sql file, then run it.
        missing_view_asset = sql_asset_factory(name="fake_view")
        missing_view_asset()


def test_env_var():
os.environ["_PUDL_TEST"] = "test value"
env_var = EnvVar(env_var="_PUDL_TEST")
Expand Down
2 changes: 1 addition & 1 deletion test/unit/io_managers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_pkg() -> Package:
]
schema = {"fields": fields, "primary_key": ["artistid"]}
view_resource = Resource(
name="artist_view", schema=schema, include_in_database=False
name="artist_view", schema=schema, create_database_schema=False
)

fields = [
Expand Down