Skip to content

Commit

Permalink
Merge pull request #2445 from catalyst-cooperative/init-sql-views
Browse files Browse the repository at this point in the history
Create simple SQL view assets
  • Loading branch information
bendnorman authored Apr 6, 2023
2 parents 140627f + 2364040 commit c8782d5
Show file tree
Hide file tree
Showing 20 changed files with 519 additions and 31 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
graft src/pudl/metadata/templates
graft src/pudl/output/sql

prune .github
prune devtools
Expand Down
328 changes: 328 additions & 0 deletions devtools/python-output-table-conversion-debug.ipynb

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/pudl/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
*load_assets_from_modules([pudl.transform.ferc714], group_name="clean_ferc714"),
*load_assets_from_modules([glue_assets], group_name="glue"),
*load_assets_from_modules([static_assets], group_name="static"),
*load_assets_from_modules([pudl.output.denorm_eia], group_name="denorm_eia"),
*load_assets_from_modules([pudl.output.denorm_ferc1], group_name="denorm_ferc1"),
)

default_resources = {
Expand Down
8 changes: 7 additions & 1 deletion src/pudl/io_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ def _handle_str_output(self, context: OutputContext, query: str):
engine = self.engine
table_name = self._get_table_name(context)

# Make sure the metadata has been created for the view
_ = self._get_sqlalchemy_table(table_name)

with engine.connect() as con:
# Drop the existing view if it exists and create the new view.
# TODO (bendnorman): parameterize this safely.
Expand Down Expand Up @@ -562,7 +565,10 @@ def __init__(
"""
super().__init__(base_dir, db_name, md, timeout)

def _setup_database(self, timeout: float = 1_000.0) -> sa.engine.Engine:
def _setup_database(
self,
timeout: float = 1_000.0,
) -> sa.engine.Engine:
"""Create database engine and read the metadata.
Args:
Expand Down
4 changes: 2 additions & 2 deletions src/pudl/metadata/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,7 @@ class Resource(Base):
"eia_bulk_elec",
"static_pudl",
] = None
include_in_database: bool = True
create_database_schema: bool = True

_check_unique = _validator(
"contributors", "keywords", "licenses", "sources", fn=_check_unique
Expand Down Expand Up @@ -1870,7 +1870,7 @@ def to_sql(
"""Return equivalent SQL MetaData."""
metadata = sa.MetaData()
for resource in self.resources:
if resource.include_in_database:
if resource.create_database_schema:
_ = resource.to_sql(
metadata,
check_types=check_types,
Expand Down
4 changes: 2 additions & 2 deletions src/pudl/metadata/resources/eia.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@
"sources": ["eia861"],
"etl_group": "static_eia_disabled", # currently not being loaded into the db
"field_namespace": "eia",
"include_in_database": False,
"create_database_schema": False,
},
"fuel_transportation_modes_eia": {
"description": "Long descriptions of the fuel transportation modes reported in the EIA-860 and EIA-923.",
Expand Down Expand Up @@ -603,7 +603,7 @@
"sources": ["eia860", "eia923"],
"etl_group": "outputs",
"field_namespace": "ppe",
"include_in_database": False,
"create_database_schema": False,
},
"prime_movers_eia": {
"description": "Long descriptions explaining the short prime mover codes reported in the EIA-860 and EIA-923.",
Expand Down
38 changes: 38 additions & 0 deletions src/pudl/metadata/resources/eia860.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,44 @@
"sources": ["eia860", "eia923"],
"etl_group": "eia860",
},
"denorm_utilities_eia": {
"description": ("Denoramlized table containing all EIA utility attributes."),
"schema": {
"fields": [
"utility_id_eia",
"utility_id_pudl",
"utility_name_eia",
"report_date",
"street_address",
"city",
"state",
"zip_code",
"plants_reported_owner",
"plants_reported_operator",
"plants_reported_asset_manager",
"plants_reported_other_relationship",
"entity_type",
"attention_line",
"address_2",
"zip_code_4",
"contact_firstname",
"contact_lastname",
"contact_title",
"phone_number",
"phone_extension",
"contact_firstname_2",
"contact_lastname_2",
"contact_title_2",
"phone_number_2",
"phone_extension_2",
"data_maturity",
],
"primary_key": ["utility_id_eia", "report_date"],
},
"field_namespace": "eia",
"sources": ["eia860", "eia923"],
"etl_group": "outputs",
},
}
"""EIA-860 resource attributes organized by PUDL identifier (``resource.name``).
Expand Down
2 changes: 1 addition & 1 deletion src/pudl/metadata/resources/epacems.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"sources": ["eia860", "epacems"],
"field_namespace": "epacems",
"etl_group": "epacems",
"include_in_database": False,
"create_database_schema": False,
},
}
"""EPA CEMS resource attributes by PUDL identifier (``resource.name``).
Expand Down
18 changes: 17 additions & 1 deletion src/pudl/metadata/resources/ferc1.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@
"sources": ["ferc1"],
"etl_group": "ferc1_disabled",
"field_namespace": "ferc1",
"include_in_database": False,
"create_database_schema": False,
},
"electric_operating_revenues_ferc1": {
"description": (
Expand Down Expand Up @@ -823,6 +823,22 @@
"etl_group": "ferc1",
"field_namespace": "ferc1",
},
"denorm_plants_utilities_ferc1": {
"description": "Denormalized table that contains FERC plant and utility information.",
"schema": {
"fields": [
"utility_id_ferc1",
"plant_name_ferc1",
"plant_id_pudl",
"utility_name_ferc1",
"utility_id_pudl",
],
},
"field_namespace": "ferc1",
"etl_group": "outputs",
"sources": ["ferc1"],
"create_database_schema": False,
},
}
"""FERC Form 1 resource attributes by PUDL identifier (``resource.name``).
Expand Down
2 changes: 2 additions & 0 deletions src/pudl/output/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
"""
from . import ( # noqa: F401
censusdp1tract,
denorm_eia,
denorm_ferc1,
eia860,
eia923,
epacems,
Expand Down
42 changes: 42 additions & 0 deletions src/pudl/output/denorm_eia.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""A collection of denormalized EIA assets."""
import pandas as pd
from dagster import asset

import pudl
from pudl.metadata.fields import apply_pudl_dtypes


@asset(io_manager_key="pudl_sqlite_io_manager", compute_kind="Python")
def denorm_utilities_eia(
utilities_entity_eia: pd.DataFrame,
utilities_eia860: pd.DataFrame,
utilities_eia: pd.DataFrame,
):
"""Pull all fields from the EIA Utilities table.
Args:
utilities_entity_eia: EIA utility entity table.
utilities_eia860: EIA 860 annual utility table.
utilities_eia: Associations between EIA utilities and pudl utility IDs.
Returns:
A DataFrame containing all the fields of the EIA 860 Utilities table.
"""
utilities_eia = utilities_eia[["utility_id_eia", "utility_id_pudl"]]
out_df = pd.merge(
utilities_entity_eia, utilities_eia860, how="left", on=["utility_id_eia"]
)
out_df = pd.merge(out_df, utilities_eia, how="left", on=["utility_id_eia"])
out_df = (
out_df.assign(report_date=lambda x: pd.to_datetime(x.report_date))
.dropna(subset=["report_date", "utility_id_eia"])
.pipe(apply_pudl_dtypes, group="eia")
)
first_cols = [
"report_date",
"utility_id_eia",
"utility_id_pudl",
"utility_name_eia",
]
out_df = pudl.helpers.organize_cols(out_df, first_cols)
return out_df
6 changes: 6 additions & 0 deletions src/pudl/output/denorm_ferc1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""A collection of denormalized FERC assets."""
from pudl.output.sql.helpers import sql_asset_factory

denorm_plants_utilities_ferc1_asset = sql_asset_factory(
"denorm_plants_utilities_ferc1", {"plants_ferc1", "utilities_ferc1"}
)
5 changes: 5 additions & 0 deletions src/pudl/output/eia860.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ def utilities_eia860(pudl_engine, start_date=None, end_date=None):
pandas.DataFrame: A DataFrame containing all the fields of the EIA 860
Utilities table.
"""
logger.warning(
"pudl.output.eia860.utilities_eia860() will be deprecated in a future version of PUDL."
" In the future, call the PudlTabl.utils_eia860() method or pull the denorm_utilities_eia table"
"directly from the pudl.sqlite database."
)
pt = pudl.output.pudltabl.get_table_meta(pudl_engine)
# grab the entity table
utils_eia_tbl = pt["utilities_entity_eia"]
Expand Down
5 changes: 5 additions & 0 deletions src/pudl/output/ferc1.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ def plants_utils_ferc1(pudl_engine):
pandas.DataFrame: A DataFrame containing useful FERC Form 1 Plant and
Utility information.
"""
logger.warning(
"pudl.output.ferc1.plants_utils_ferc1() will be deprecated in a future version of PUDL."
" In the future, call the PudlTabl.pu_ferc1() method or pull the plants_utils_ferc1 table"
"directly from the pudl.sqlite database."
)
pu_df = pd.merge(
pd.read_sql("plants_ferc1", pudl_engine),
pd.read_sql("utilities_ferc1", pudl_engine),
Expand Down
34 changes: 11 additions & 23 deletions src/pudl/output/pudltabl.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,15 @@ def pu_eia860(self, update=False):
)
return self._dfs["pu_eia"]

def pu_ferc1(self, update=False):
def pu_ferc1(self) -> pd.DataFrame:
"""Pull a dataframe of FERC plant-utility associations.
Args:
update (bool): If true, re-calculate the output dataframe, even if
a cached version exists.
Returns:
pandas.DataFrame: a denormalized table for interactive use.
A denormalized table for interactive use.
"""
if update or self._dfs["pu_ferc1"] is None:
self._dfs["pu_ferc1"] = pudl.output.ferc1.plants_utils_ferc1(
self.pudl_engine
)
return self._dfs["pu_ferc1"]
return pd.read_sql_table(
"denorm_plants_utilities_ferc1", self.pudl_engine
).pipe(apply_pudl_dtypes, group="ferc1")

def advanced_metering_infrastructure_eia861(self) -> pd.DataFrame:
"""An interim EIA 861 output function."""
Expand Down Expand Up @@ -364,21 +358,15 @@ def demand_hourly_pa_ferc714(self) -> pd.DataFrame:
###########################################################################
# EIA 860/923 OUTPUTS
###########################################################################
def utils_eia860(self, update=False):
"""Pull a dataframe describing utilities reported in EIA 860.
Args:
update (bool): If true, re-calculate the output dataframe, even if
a cached version exists.
def utils_eia860(self, update=False) -> pd.DataFrame:
"""Pull a dataframe describing utilities reported in EIA.
Returns:
pandas.DataFrame: a denormalized table for interactive use.
A denormalized table for interactive use.
"""
if update or self._dfs["utils_eia860"] is None:
self._dfs["utils_eia860"] = pudl.output.eia860.utilities_eia860(
self.pudl_engine, start_date=self.start_date, end_date=self.end_date
)
return self._dfs["utils_eia860"]
return pd.read_sql("denorm_utilities_eia", self.pudl_engine).pipe(
apply_pudl_dtypes, group="eia"
)

def bga_eia860(self, update=False):
"""Pull a dataframe of boiler-generator associations from EIA 860.
Expand Down
1 change: 1 addition & 0 deletions src/pudl/output/sql/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""A module of python helper functions and sql files for creating SQL views."""
5 changes: 5 additions & 0 deletions src/pudl/output/sql/denorm_plants_utilities_ferc1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Build a view of useful FERC Plant & Utility information.
CREATE VIEW denorm_plants_utilities_ferc1 AS
SELECT *
FROM plants_ferc1
INNER JOIN utilities_ferc1 USING(utility_id_ferc1);
35 changes: 35 additions & 0 deletions src/pudl/output/sql/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Helper functions for creating output assets."""
import importlib

from dagster import AssetsDefinition, asset


def sql_asset_factory(
name: str,
non_argument_deps: set[str] = {},
io_manager_key: str = "pudl_sqlite_io_manager",
compute_kind: str = "SQL",
) -> AssetsDefinition:
"""Factory for creating assets that run SQL statements."""

@asset(
name=name,
non_argument_deps=non_argument_deps,
io_manager_key=io_manager_key,
compute_kind=compute_kind,
)
def sql_view_asset() -> str:
"""Asset that creates sql view in a database."""
sql_path_traversable = importlib.resources.files("pudl.output.sql").joinpath(
f"{name}.sql"
)
try:
with importlib.resources.as_file(sql_path_traversable) as sql_path:
return sql_path.read_text()
# Raise a helpful error here if a sql file doesn't exist
except FileNotFoundError:
raise FileNotFoundError(
f"Could not find {sql_path}. Create a sql file in pudl.output.sql subpackage for {name} asset."
)

return sql_view_asset
8 changes: 8 additions & 0 deletions test/unit/helpers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
remove_leading_zeros_from_numeric_strings,
zero_pad_numeric_string,
)
from pudl.output.sql.helpers import sql_asset_factory

MONTHLY_GEN_FUEL = pd.DataFrame(
{
Expand Down Expand Up @@ -627,6 +628,13 @@ def test_cems_selection():
), "hourly_emissions_epacems or downstream asset present in selection."


def test_sql_asset_factory_missing_file():
"""Test sql_asset_factory throws a file not found error if file doesn't exist for an
asset name."""
with pytest.raises(FileNotFoundError):
sql_asset_factory(name="fake_view")()


def test_env_var():
os.environ["_PUDL_TEST"] = "test value"
env_var = EnvVar(env_var="_PUDL_TEST")
Expand Down
2 changes: 1 addition & 1 deletion test/unit/io_managers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_pkg() -> Package:
]
schema = {"fields": fields, "primary_key": ["artistid"]}
view_resource = Resource(
name="artist_view", schema=schema, include_in_database=False
name="artist_view", schema=schema, create_database_schema=False
)

fields = [
Expand Down

0 comments on commit c8782d5

Please sign in to comment.