From 08e1f9f2aa7104863e883455acf78174844a0b2e Mon Sep 17 00:00:00 2001 From: bendnorman Date: Wed, 22 Mar 2023 16:43:08 -0800 Subject: [PATCH 01/10] Create sql view factory and plants_utils_ferc1 view --- devtools/output-table-conversion-test.ipynb | 287 ++++++++++++++++++++ src/pudl/etl/__init__.py | 2 + src/pudl/io_managers.py | 1 - src/pudl/metadata/resources/ferc1.py | 15 + src/pudl/output/ferc1.py | 11 + src/pudl/output/helpers.py | 33 +++ src/pudl/output/sql/plants_utils_ferc1.sql | 5 + 7 files changed, 353 insertions(+), 1 deletion(-) create mode 100644 devtools/output-table-conversion-test.ipynb create mode 100644 src/pudl/output/helpers.py create mode 100644 src/pudl/output/sql/plants_utils_ferc1.sql diff --git a/devtools/output-table-conversion-test.ipynb b/devtools/output-table-conversion-test.ipynb new file mode 100644 index 0000000000..c9f854db7e --- /dev/null +++ b/devtools/output-table-conversion-test.ipynb @@ -0,0 +1,287 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "f786051b-2aa0-44e0-bfd7-fe6827b6e1a3", + "metadata": {}, + "source": [ + "# Purpose\n", + "We are in the process of converting some functions in `pudl.output` to be SQL views. This notebook allows us to compare the outputs of the old python functions with the SQL view." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "ec08c060-ba49-4466-81a0-315a45993928", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import os\n", + "\n", + "assert os.environ.get(\"DAGSTER_HOME\"), (\n", + " \"The DAGSTER_HOME env var is not set so dagster won't be able to find the assets.\"\n", + " \"Set the DAGSTER_HOME env var in this notebook or kill the jupyter server and set\"\n", + " \" the DAGSTER_HOME env var in your terminal and relaunch jupyter.\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "5ce7f88e-c7b9-4963-a0e4-72ccbcb1f70e", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "pudl_settings is being deprecated in favor of environment variables variables PUDL_OUTPUT and PUDL_INPUT. For more info see: https://catalystcoop-pudl.readthedocs.io/en/dev/dev/dev_setup.html\n", + "pudl_settings is being deprecated in favor of environment variables PUDL_OUTPUT and PUDL_INPUT. For more info see: https://catalystcoop-pudl.readthedocs.io/en/dev/dev/dev_setup.html\n", + "sqlite and parquet directories are no longer being used. Make sure there is a single directory named 'output' at the root of your workspace. For more info see: https://catalystcoop-pudl.readthedocs.io/en/dev/dev/dev_setup.html\n", + "pudl_settings is being deprecated in favor of environment variables variables PUDL_OUTPUT and PUDL_INPUT. For more info see: https://catalystcoop-pudl.readthedocs.io/en/dev/dev/dev_setup.html\n", + "pudl_settings is being deprecated in favor of environment variables PUDL_OUTPUT and PUDL_INPUT. For more info see: https://catalystcoop-pudl.readthedocs.io/en/dev/dev/dev_setup.html\n", + "sqlite and parquet directories are no longer being used. Make sure there is a single directory named 'output' at the root of your workspace. For more info see: https://catalystcoop-pudl.readthedocs.io/en/dev/dev/dev_setup.html\n" + ] + } + ], + "source": [ + "from pudl.workspace.setup import get_defaults\n", + "import sqlalchemy as sa\n", + "import pandas as pd\n", + "\n", + "# TODO: This should be replaced with get_defaults()\n", + "engine = sa.create_engine(f\"sqlite:///{os.environ['PUDL_OUTPUT']}pudl.sqlite\")" + ] + }, + { + "cell_type": "markdown", + "id": "d5f6b021-26cf-4804-929f-0e4975751bd1", + "metadata": {}, + "source": [ + "## Compare output of old python function with SQL view" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "5c705a39-0517-4c7b-a1b0-ca8a12daec53", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 33.3 ms, sys: 5.1 ms, total: 38.4 ms\n", + "Wall time: 53.6 ms\n" + ] + } + ], + "source": [ + "%%time\n", + "view_name = \"plants_utils_ferc1\"\n", + "\n", + "with engine.connect() as con:\n", + " view_df = pd.read_sql_table(view_name, con)" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "633999af-d881-424b-af8c-64fc837982cd", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
utility_id_ferc1plant_name_ferc1plant_id_pudlutility_name_ferc1utility_id_pudl
0206*dolet hills (3)1Southwestern Electric Power Company301
1206*flint creek (1)2Southwestern Electric Power Company301
2206*pirkey (2)3Southwestern Electric Power Company301
315959th st gt-14Consolidated Edison Company of New York, Inc.79
415974th st gt 1&25Consolidated Edison Company of New York, Inc.79
\n", + "
" + ], + "text/plain": [ + " utility_id_ferc1 plant_name_ferc1 plant_id_pudl utility_name_ferc1 utility_id_pudl\n", + "0 206 *dolet hills (3) 1 Southwestern Electric Power Company 301\n", + "1 206 *flint creek (1) 2 Southwestern Electric Power Company 301\n", + "2 206 *pirkey (2) 3 Southwestern Electric Power Company 301\n", + "3 159 59th st gt-1 4 Consolidated Edison Company of New York, Inc. 79\n", + "4 159 74th st gt 1&2 5 Consolidated Edison Company of New York, Inc. 79" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "view_df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "af6253bb-e135-4068-8088-4e08c9914054", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2023-03-22 16:40:52 [ WARNING] catalystcoop.pudl.output.ferc1:56 pudl.output.ferc1.plants_utils_ferc1() will be deprecated in a future version of PUDL.In the future, call the PudlTabl.pu_ferc1() method or pull the plants_utils_ferc1 tabledirectly from the pudl.sqlite database.\n" + ] + } + ], + "source": [ + "# Import the old python functions\n", + "from pudl.output.ferc1 import plants_utils_ferc1\n", + "\n", + "old_output_func = plants_utils_ferc1\n", + "\n", + "old_df = old_output_func(engine)\n", + "\n", + "# Align pandas index and sort values using all of the columns\n", + "key = list(old_df.columns)\n", + "old_df = old_df.sort_values(by=key).reset_index(drop=True)\n", + "view_df = view_df.sort_values(by=key).reset_index(drop=True)\n", + "\n", + "pd.testing.assert_frame_equal(old_df, view_df)" + ] + }, + { + "cell_type": "markdown", + "id": "955efc1e-b21f-473e-b3f2-ab2fad89591d", + "metadata": {}, + "source": [ + "## Make sure we can load the view using the SQLite IO Manager" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "a9d81a2b-9c68-443b-9a56-5563f4cbc920", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "RangeIndex: 5 entries, 0 to 4\n", + "Data columns (total 5 columns):\n", + " # Column Non-Null Count Dtype \n", + "--- ------ -------------- ----- \n", + " 0 utility_id_ferc1 5 non-null Int64 \n", + " 1 plant_name_ferc1 5 non-null string\n", + " 2 plant_id_pudl 5 non-null Int64 \n", + " 3 utility_name_ferc1 5 non-null string\n", + " 4 utility_id_pudl 5 non-null Int64 \n", + "dtypes: Int64(3), string(2)\n", + "memory usage: 343.0 bytes\n" + ] + } + ], + "source": [ + "from dagster import AssetKey\n", + "\n", + "from pudl.etl import defs\n", + "\n", + "df = defs.load_asset_value(AssetKey(view_name))\n", + "\n", + "df.head().info()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index dd2e4e9f4b..3383a2190c 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -8,6 +8,7 @@ Definitions, define_asset_job, load_assets_from_modules, + load_assets_from_package_module, ) import pudl @@ -45,6 +46,7 @@ *load_assets_from_modules([pudl.transform.ferc714], group_name="clean_ferc714"), *load_assets_from_modules([glue_assets], group_name="glue"), *load_assets_from_modules([static_assets], group_name="static"), + *load_assets_from_package_module(pudl.output, group_name="outputs"), ) default_resources = { diff --git a/src/pudl/io_managers.py b/src/pudl/io_managers.py index 4559ef8ea7..61c713038f 100644 --- a/src/pudl/io_managers.py +++ b/src/pudl/io_managers.py @@ -388,7 +388,6 @@ def pudl_sqlite_io_manager(init_context) -> SQLiteIOManager: excluded_etl_groups=( "static_eia_disabled", "epacems", - "outputs", "ferc1_disabled", "eia861_disabled", ) diff --git a/src/pudl/metadata/resources/ferc1.py b/src/pudl/metadata/resources/ferc1.py index 169c941e27..c96b2e76d7 100644 --- a/src/pudl/metadata/resources/ferc1.py +++ b/src/pudl/metadata/resources/ferc1.py @@ -822,6 +822,21 @@ "etl_group": "ferc1", "field_namespace": "ferc1", }, + "plants_utils_ferc1": { + "description": "Output table that contains FERC plant and utility information.", + "schema": { + "fields": [ + "utility_id_ferc1", + "plant_name_ferc1", + "plant_id_pudl", + "utility_name_ferc1", + "utility_id_pudl", + ], + }, + "field_namespace": "ferc1", + "etl_group": "outputs", + "sources": ["ferc1"], + }, } """FERC Form 1 resource attributes by PUDL identifier (``resource.name``). diff --git a/src/pudl/output/ferc1.py b/src/pudl/output/ferc1.py index caddf47f90..2a63f7c669 100644 --- a/src/pudl/output/ferc1.py +++ b/src/pudl/output/ferc1.py @@ -5,6 +5,7 @@ import pudl from pudl.metadata.fields import apply_pudl_dtypes +from pudl.output.helpers import sql_asset_factory logger = pudl.logging_helpers.get_logger(__name__) @@ -36,6 +37,11 @@ def read_table_with_start_end_dates( return pd.read_sql(table_select, pudl_engine) +plants_utils_ferc1_asset = sql_asset_factory( + "plants_utils_ferc1", {"plants_ferc1", "utilities_ferc1"} +) + + def plants_utils_ferc1(pudl_engine): """Build a dataframe of useful FERC Plant & Utility information. @@ -47,6 +53,11 @@ def plants_utils_ferc1(pudl_engine): pandas.DataFrame: A DataFrame containing useful FERC Form 1 Plant and Utility information. """ + logger.warning( + "pudl.output.ferc1.plants_utils_ferc1() will be deprecated in a future version of PUDL." + " In the future, call the PudlTabl.pu_ferc1() method or pull the plants_utils_ferc1 table" + "directly from the pudl.sqlite database." + ) pu_df = pd.merge( pd.read_sql("plants_ferc1", pudl_engine), pd.read_sql("utilities_ferc1", pudl_engine), diff --git a/src/pudl/output/helpers.py b/src/pudl/output/helpers.py new file mode 100644 index 0000000000..e04172588c --- /dev/null +++ b/src/pudl/output/helpers.py @@ -0,0 +1,33 @@ +"""Helper functions for creating output assets.""" +import importlib + +from dagster import AssetsDefinition, asset + + +def sql_asset_factory( + name: str, + non_argument_deps: set[str], + io_manager_key: str = "pudl_sqlite_io_manager", + compute_kind: str = "SQL", +) -> AssetsDefinition: + """Factory for creating assets that run SQL statements.""" + + @asset( + name=name, + non_argument_deps=non_argument_deps, + io_manager_key=io_manager_key, + compute_kind=compute_kind, + ) + def sql_view_asset() -> str: + """Asset that creates sql view in a database.""" + sql_path = importlib.resources.path("pudl.output.sql", f"{name}.sql") + try: + with open(sql_path) as reader: + return reader.read() + # Raise a helpful error here if a sql file doesn't exist + except FileNotFoundError: + raise FileNotFoundError( + f"Could not find {sql_path}. Create a sql file in pudl.output.sql subpackage for {name} asset." + ) + + return sql_view_asset diff --git a/src/pudl/output/sql/plants_utils_ferc1.sql b/src/pudl/output/sql/plants_utils_ferc1.sql new file mode 100644 index 0000000000..0a7069236d --- /dev/null +++ b/src/pudl/output/sql/plants_utils_ferc1.sql @@ -0,0 +1,5 @@ +-- Build a dataframe of useful FERC Plant & Utility information. +CREATE VIEW plants_utils_ferc1 AS +SELECT * +FROM plants_ferc1 + INNER JOIN utilities_ferc1 USING(utility_id_ferc1); From 024a2478e8013c1eaba35ee8336cdd7cd227bd23 Mon Sep 17 00:00:00 2001 From: bendnorman Date: Thu, 23 Mar 2023 13:52:50 -0800 Subject: [PATCH 02/10] Add param to SQLiteIOManager to exclude tables from schema creation We want the IO Manager to throw an error if we haven't created resource meatdata for a view but we also don't want a table schema created for the view in the database. --- devtools/output-table-conversion-test.ipynb | 8 +- src/pudl/io_managers.py | 52 ++++++++++-- src/pudl/metadata/classes.py | 1 + src/pudl/metadata/resources/ferc1.py | 6 +- src/pudl/output/ferc1.py | 4 +- src/pudl/output/pudltabl.py | 8 +- ...erc1.sql => denorm_plants_utils_ferc1.sql} | 2 +- test/unit/io_managers_test.py | 80 ++++++++++++++++++- 8 files changed, 138 insertions(+), 23 deletions(-) rename src/pudl/output/sql/{plants_utils_ferc1.sql => denorm_plants_utils_ferc1.sql} (78%) diff --git a/devtools/output-table-conversion-test.ipynb b/devtools/output-table-conversion-test.ipynb index c9f854db7e..6c909ac918 100644 --- a/devtools/output-table-conversion-test.ipynb +++ b/devtools/output-table-conversion-test.ipynb @@ -77,14 +77,14 @@ "name": "stdout", "output_type": "stream", "text": [ - "CPU times: user 33.3 ms, sys: 5.1 ms, total: 38.4 ms\n", - "Wall time: 53.6 ms\n" + "CPU times: user 34.1 ms, sys: 3.71 ms, total: 37.8 ms\n", + "Wall time: 53.8 ms\n" ] } ], "source": [ "%%time\n", - "view_name = \"plants_utils_ferc1\"\n", + "view_name = \"denorm_plants_utils_ferc1\"\n", "\n", "with engine.connect() as con:\n", " view_df = pd.read_sql_table(view_name, con)" @@ -197,7 +197,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "2023-03-22 16:40:52 [ WARNING] catalystcoop.pudl.output.ferc1:56 pudl.output.ferc1.plants_utils_ferc1() will be deprecated in a future version of PUDL.In the future, call the PudlTabl.pu_ferc1() method or pull the plants_utils_ferc1 tabledirectly from the pudl.sqlite database.\n" + "2023-03-23 13:35:06 [ WARNING] catalystcoop.pudl.output.ferc1:56 pudl.output.ferc1.plants_utils_ferc1() will be deprecated in a future version of PUDL. In the future, call the PudlTabl.pu_ferc1() method or pull the plants_utils_ferc1 tabledirectly from the pudl.sqlite database.\n" ] } ], diff --git a/src/pudl/io_managers.py b/src/pudl/io_managers.py index 61c713038f..bf5cc050d1 100644 --- a/src/pudl/io_managers.py +++ b/src/pudl/io_managers.py @@ -89,6 +89,7 @@ def __init__( base_dir: str, db_name: str, md: sa.MetaData = None, + exclude_tables: tuple[str] = None, timeout: float = 1_000.0, ): """Init a SQLiteIOmanager. @@ -99,6 +100,11 @@ def __init__( db_name: the name of sqlite database. md: database metadata described as a SQLAlchemy MetaData object. If not specified, default to metadata stored in the pudl.metadata subpackage. + exclude_tables: Tuple of table names whose schemas should not be created + in the database. For example, views defined in pudl.metadata.resources + should be excluded because they should not have database schemas but + the SQLiteIOManager needs a way to check if the resource metadata + has been created for a view. timeout: How many seconds the connection should wait before raising an exception, if the database is locked by another connection. If another connection opens a transaction to modify the database, @@ -122,7 +128,9 @@ def __init__( if not self.md: self.md = sa.MetaData() - self.engine = self._setup_database(timeout=timeout) + self.engine = self._setup_database( + timeout=timeout, exclude_tables=exclude_tables + ) def _get_table_name(self, context) -> str: """Get asset name from dagster context object.""" @@ -132,7 +140,9 @@ def _get_table_name(self, context) -> str: table_name = context.get_identifier() return table_name - def _setup_database(self, timeout: float = 1_000.0) -> sa.engine.Engine: + def _setup_database( + self, timeout: float = 1_000.0, exclude_tables: tuple[str] = None + ) -> sa.engine.Engine: """Create database and metadata if they don't exist. Args: @@ -140,6 +150,11 @@ def _setup_database(self, timeout: float = 1_000.0) -> sa.engine.Engine: exception, if the database is locked by another connection. If another connection opens a transaction to modify the database, it will be locked until that transaction is committed. + exclude_tables: Tuple of table names whose schemas should not be created + in the database. For example, views defined in pudl.metadata.resources + should be excluded because they should not have database schemas but + the SQLiteIOManager needs a way to check if the resource metadata + has been created for a view. Returns: engine: SQL Alchemy engine that connects to a database in the base_dir. @@ -156,8 +171,15 @@ def _setup_database(self, timeout: float = 1_000.0) -> sa.engine.Engine: # Create the database and schemas if not db_path.exists(): db_path.touch() - self.md.create_all(engine) - + if exclude_tables: + # Get sql alchemy table objects that should not be excluded + table_names = {table_name for table_name in self.md.tables} - set( + exclude_tables + ) + tables = tuple(self.md.tables[table_name] for table_name in table_names) + self.md.create_all(engine, tables=tables) + else: + self.md.create_all(engine) return engine def _get_sqlalchemy_table(self, table_name: str) -> sa.Table: @@ -311,6 +333,9 @@ def _handle_str_output(self, context: OutputContext, query: str): engine = self.engine table_name = self._get_table_name(context) + # Make sure the metadata has been created for the view + _ = self._get_sqlalchemy_table(table_name) + with engine.connect() as con: # Drop the existing view if it exists and create the new view. # TODO (bendnorman): parameterize this safely. @@ -392,7 +417,10 @@ def pudl_sqlite_io_manager(init_context) -> SQLiteIOManager: "eia861_disabled", ) ).to_sql() - return SQLiteIOManager(base_dir=base_dir, db_name="pudl", md=md) + exclude_tables = Package.get_etl_group_tables("output_views") + return SQLiteIOManager( + base_dir=base_dir, db_name="pudl", md=md, exclude_tables=exclude_tables + ) class FercSQLiteIOManager(SQLiteIOManager): @@ -409,6 +437,7 @@ def __init__( base_dir: str = None, db_name: str = None, md: sa.MetaData = None, + exclude_tables: tuple[str] = None, timeout: float = 1_000.0, ): """Initialize FercSQLiteIOManager. @@ -419,14 +448,23 @@ def __init__( db_name: the name of sqlite database. md: database metadata described as a SQLAlchemy MetaData object. If not specified, default to metadata stored in the pudl.metadata subpackage. + exclude_tables: Tuple of table names whose schemas should not be created + in the database. For example, views defined in pudl.metadata.resources + should be excluded because they should not have database schemas but + the SQLiteIOManager needs a way to check if the resource metadata + has been created for a view. timeout: How many seconds the connection should wait before raising an exception, if the database is locked by another connection. If another connection opens a transaction to modify the database, it will be locked until that transaction is committed. """ - super().__init__(base_dir, db_name, md, timeout) + super().__init__(base_dir, db_name, md, exclude_tables, timeout) - def _setup_database(self, timeout: float = 1_000.0) -> sa.engine.Engine: + def _setup_database( + self, + timeout: float = 1_000.0, + exclude_tables: tuple[str] = None, + ) -> sa.engine.Engine: """Create database engine and read the metadata. Args: diff --git a/src/pudl/metadata/classes.py b/src/pudl/metadata/classes.py index f8226a1de2..b4a18e0e5b 100644 --- a/src/pudl/metadata/classes.py +++ b/src/pudl/metadata/classes.py @@ -1196,6 +1196,7 @@ class Resource(Base): "ferc714", "glue", "outputs", + "output_views", "static_ferc1", "static_eia", "static_eia_disabled", diff --git a/src/pudl/metadata/resources/ferc1.py b/src/pudl/metadata/resources/ferc1.py index c96b2e76d7..361d6b4d88 100644 --- a/src/pudl/metadata/resources/ferc1.py +++ b/src/pudl/metadata/resources/ferc1.py @@ -822,8 +822,8 @@ "etl_group": "ferc1", "field_namespace": "ferc1", }, - "plants_utils_ferc1": { - "description": "Output table that contains FERC plant and utility information.", + "denorm_plants_utils_ferc1": { + "description": "Denormalized table that contains FERC plant and utility information.", "schema": { "fields": [ "utility_id_ferc1", @@ -834,7 +834,7 @@ ], }, "field_namespace": "ferc1", - "etl_group": "outputs", + "etl_group": "output_views", "sources": ["ferc1"], }, } diff --git a/src/pudl/output/ferc1.py b/src/pudl/output/ferc1.py index 2a63f7c669..c104f7e378 100644 --- a/src/pudl/output/ferc1.py +++ b/src/pudl/output/ferc1.py @@ -37,8 +37,8 @@ def read_table_with_start_end_dates( return pd.read_sql(table_select, pudl_engine) -plants_utils_ferc1_asset = sql_asset_factory( - "plants_utils_ferc1", {"plants_ferc1", "utilities_ferc1"} +denorm_plants_utils_ferc1_asset = sql_asset_factory( + "denorm_plants_utils_ferc1", {"plants_ferc1", "utilities_ferc1"} ) diff --git a/src/pudl/output/pudltabl.py b/src/pudl/output/pudltabl.py index 49db9259b9..5bca739168 100644 --- a/src/pudl/output/pudltabl.py +++ b/src/pudl/output/pudltabl.py @@ -155,11 +155,9 @@ def pu_ferc1(self, update=False): Returns: pandas.DataFrame: a denormalized table for interactive use. """ - if update or self._dfs["pu_ferc1"] is None: - self._dfs["pu_ferc1"] = pudl.output.ferc1.plants_utils_ferc1( - self.pudl_engine - ) - return self._dfs["pu_ferc1"] + return pd.read_sql("denorm_plants_utils_ferc1", self.pudl_engine).pipe( + apply_pudl_dtypes, group="eia" + ) def advanced_metering_infrastructure_eia861(self) -> pd.DataFrame: """An interim EIA 861 output function.""" diff --git a/src/pudl/output/sql/plants_utils_ferc1.sql b/src/pudl/output/sql/denorm_plants_utils_ferc1.sql similarity index 78% rename from src/pudl/output/sql/plants_utils_ferc1.sql rename to src/pudl/output/sql/denorm_plants_utils_ferc1.sql index 0a7069236d..79706292ec 100644 --- a/src/pudl/output/sql/plants_utils_ferc1.sql +++ b/src/pudl/output/sql/denorm_plants_utils_ferc1.sql @@ -1,5 +1,5 @@ -- Build a dataframe of useful FERC Plant & Utility information. -CREATE VIEW plants_utils_ferc1 AS +CREATE VIEW denorm_plants_utils_ferc1 AS SELECT * FROM plants_ferc1 INNER JOIN utilities_ferc1 USING(utility_id_ferc1); diff --git a/test/unit/io_managers_test.py b/test/unit/io_managers_test.py index 02869ed0fe..e0d50b672b 100644 --- a/test/unit/io_managers_test.py +++ b/test/unit/io_managers_test.py @@ -5,7 +5,12 @@ from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, Table from sqlalchemy.exc import IntegrityError, OperationalError -from pudl.io_managers import ForeignKeyError, ForeignKeyErrors, SQLiteIOManager +from pudl.io_managers import ( + ForeignKeyError, + ForeignKeyErrors, + SQLiteIOManager, + pudl_sqlite_io_manager, +) @pytest.fixture @@ -167,3 +172,76 @@ def test_missing_schema_error(sqlite_io_manager_fixture): output_context = build_output_context(asset_key=AssetKey(asset_key)) with pytest.raises(RuntimeError): manager.handle_output(output_context, venue) + + +def test_exclude_tables_param(tmp_path): + """Test ability to exclude tables from being created in the database.""" + md = MetaData() + artist = Table( # noqa: F841 + "artist", + md, + Column("artistid", Integer, primary_key=True), + Column("artistname", String(16), nullable=False), + ) + track = Table( # noqa: F841 + "track", + md, + Column("trackid", Integer, primary_key=True), + Column("trackname", String(16), nullable=False), + Column("trackartist", Integer, ForeignKey("artist.artistid")), + ) + io_manager = SQLiteIOManager( + base_dir=tmp_path, db_name="pudl", md=md, exclude_tables=("track") + ) + with io_manager.engine.connect() as con: + table_names = pd.read_sql( + "SELECT name FROM sqlite_schema WHERE type ='table' AND name NOT LIKE 'sqlite_%';", + con, + ) + + # Make sure the track table is in the metadata object but not created in the database + assert "track" in io_manager.md.tables + assert "track" not in table_names["name"] + + +def test_view_exclusion() -> None: + io_manager = pudl_sqlite_io_manager(None) + with io_manager.engine.connect() as con: + table_names = pd.read_sql( + "SELECT name FROM sqlite_schema WHERE type ='table' AND name NOT LIKE 'sqlite_%';", + con, + ) + # Make sure the track table is in the metadata object but not created in the database + assert "denorm_plants_utils_ferc1" in io_manager.md.tables + assert "denorm_plants_utils_ferc1" not in table_names["name"] + + +def test_handle_output_view_raises_error_when_missing_metadata( + sqlite_io_manager_fixture, +): + """Make sure an error is thrown if we try to add a view that doesn't have + metadata.""" + io_manager = sqlite_io_manager_fixture + + asset_key = "artist_view" + artist_view = "CREATE VIEW artist_view AS SELECT * FROM artist;" + output_context = build_output_context(asset_key=AssetKey(asset_key)) + with pytest.raises(RuntimeError): + io_manager.handle_output(output_context, artist_view) + + +def test_load_input_view_raises_error_when_missing_metadata(sqlite_io_manager_fixture): + """Make sure an error is thrown if we try to read from a view that doesn't have + metadata.""" + io_manager = sqlite_io_manager_fixture + + asset_key = "artist_view" + artist_view = "CREATE VIEW artist_view AS SELECT * FROM artist;" + # Create view directly instead of using handle_output() + with io_manager.engine.connect() as con: + con.execute(artist_view) + + # Read the table back into pandas + input_context = build_input_context(asset_key=AssetKey(asset_key)) + with pytest.raises(RuntimeError): + io_manager.load_input(input_context) From 22a8a25345617e58d8fab4da066736ad7a2b96c1 Mon Sep 17 00:00:00 2001 From: bendnorman Date: Thu, 23 Mar 2023 15:53:26 -0800 Subject: [PATCH 03/10] Add example python denorm asset --- src/pudl/etl/denormalized_assets.py | 41 --------------------------- src/pudl/metadata/resources/eia860.py | 38 +++++++++++++++++++++++++ src/pudl/output/eia860.py | 37 ++++++++++++++++++++++++ src/pudl/output/pudltabl.py | 2 +- 4 files changed, 76 insertions(+), 42 deletions(-) delete mode 100644 src/pudl/etl/denormalized_assets.py diff --git a/src/pudl/etl/denormalized_assets.py b/src/pudl/etl/denormalized_assets.py deleted file mode 100644 index e7d9ac5bf9..0000000000 --- a/src/pudl/etl/denormalized_assets.py +++ /dev/null @@ -1,41 +0,0 @@ -"""Denormalized tables, based on other simple normalized tables. - -Right now these will mostly be tables defined in `pudl.output`. Ultimately it would -also include any tables built by joining and aggregating normalized and analysis -tables. These are probably data warehouse style tables with a lot of duplicated -information, ready for analysts to use. - -Initially we'll probably just wrap existing output functions, but these tables should -also be amenable to construction using SQL, database views, dbt, or other similar tools. -""" -from dagster import asset - -import pudl - -logger = pudl.logging_helpers.get_logger(__name__) - - -# TODO (bendnorman): We could also save the SQL code in sql files so we can -# take advantage of syntax highlighting and formatting. We could programatically create -# the view assets with a structure that contains the asset name and non_argument_deps. -@asset( - non_argument_deps={"utilities_entity_eia", "utilities_eia860", "utilities_eia"}, - io_manager_key="pudl_sqlite_io_manager", - compute_kind="SQL", -) -def utils_eia860() -> str: - """Create view of all fields from the EIA860 Utilities table.""" - query = """ - CREATE VIEW utils_eia860 AS - SELECT * - FROM ( - SELECT * - FROM utilities_entity_eia - LEFT JOIN utilities_eia860 USING (utility_id_eia) - ) - LEFT JOIN ( - SELECT utility_id_eia, - utility_id_pudl - FROM utilities_eia - ) USING (utility_id_eia);""" - return query diff --git a/src/pudl/metadata/resources/eia860.py b/src/pudl/metadata/resources/eia860.py index aea879a6cd..146b6cd527 100644 --- a/src/pudl/metadata/resources/eia860.py +++ b/src/pudl/metadata/resources/eia860.py @@ -415,6 +415,44 @@ "sources": ["eia860", "eia923"], "etl_group": "eia860", }, + "denorm_utilities_eia860": { + "description": ("Denoramlized table containing all eia860 utility attributes."), + "schema": { + "fields": [ + "utility_id_eia", + "utility_id_pudl", + "utility_name_eia", + "report_date", + "street_address", + "city", + "state", + "zip_code", + "plants_reported_owner", + "plants_reported_operator", + "plants_reported_asset_manager", + "plants_reported_other_relationship", + "entity_type", + "attention_line", + "address_2", + "zip_code_4", + "contact_firstname", + "contact_lastname", + "contact_title", + "phone_number", + "phone_extension", + "contact_firstname_2", + "contact_lastname_2", + "contact_title_2", + "phone_number_2", + "phone_extension_2", + "data_maturity", + ], + "primary_key": ["utility_id_eia", "report_date"], + }, + "field_namespace": "eia", + "sources": ["eia860", "eia923"], + "etl_group": "outputs", + }, } """EIA-860 resource attributes organized by PUDL identifier (``resource.name``). diff --git a/src/pudl/output/eia860.py b/src/pudl/output/eia860.py index 7a9cffdce1..20b31382f6 100644 --- a/src/pudl/output/eia860.py +++ b/src/pudl/output/eia860.py @@ -2,6 +2,7 @@ import pandas as pd import sqlalchemy as sa +from dagster import asset import pudl from pudl.metadata.fields import apply_pudl_dtypes @@ -11,6 +12,42 @@ logger = pudl.logging_helpers.get_logger(__name__) +@asset(io_manager_key="pudl_sqlite_io_manager", compute_kind="Python") +def denorm_utilities_eia860( + utilities_entity_eia: pd.DataFrame, + utilities_eia860: pd.DataFrame, + utilities_eia: pd.DataFrame, +): + """Pull all fields from the EIA860 Utilities table. + + Args: + utilities_entity_eia: EIA utility entity table. + utilities_eia860: EIA 860 annual utility table. + utilities_eia: Associations between EIA utilities and pudl utility IDs. + + Returns: + A DataFrame containing all the fields of the EIA 860 Utilities table. + """ + utilities_eia = utilities_eia[["utility_id_eia", "utility_id_pudl"]] + out_df = pd.merge( + utilities_entity_eia, utilities_eia860, how="left", on=["utility_id_eia"] + ) + out_df = pd.merge(out_df, utilities_eia, how="left", on=["utility_id_eia"]) + out_df = ( + out_df.assign(report_date=lambda x: pd.to_datetime(x.report_date)) + .dropna(subset=["report_date", "utility_id_eia"]) + .pipe(apply_pudl_dtypes, group="eia") + ) + first_cols = [ + "report_date", + "utility_id_eia", + "utility_id_pudl", + "utility_name_eia", + ] + out_df = pudl.helpers.organize_cols(out_df, first_cols) + return out_df + + def utilities_eia860(pudl_engine, start_date=None, end_date=None): """Pull all fields from the EIA860 Utilities table. diff --git a/src/pudl/output/pudltabl.py b/src/pudl/output/pudltabl.py index 5bca739168..10a92bba2c 100644 --- a/src/pudl/output/pudltabl.py +++ b/src/pudl/output/pudltabl.py @@ -155,7 +155,7 @@ def pu_ferc1(self, update=False): Returns: pandas.DataFrame: a denormalized table for interactive use. """ - return pd.read_sql("denorm_plants_utils_ferc1", self.pudl_engine).pipe( + return pd.read_sql_table("denorm_plants_utils_ferc1", self.pudl_engine).pipe( apply_pudl_dtypes, group="eia" ) From 2b75bda9484f9887fc0a931cb320ed37c93b4101 Mon Sep 17 00:00:00 2001 From: bendnorman Date: Fri, 24 Mar 2023 13:14:42 -0800 Subject: [PATCH 04/10] Add denorm_utilities_eia860 to PudlTabl, add sql_asset_factory test, flesh out output table conversion notebook --- devtools/output-table-conversion-test.ipynb | 287 --------------- ...python-output-table-conversion-debug.ipynb | 328 ++++++++++++++++++ src/pudl/output/eia860.py | 5 + src/pudl/output/helpers.py | 2 +- src/pudl/output/pudltabl.py | 20 +- test/unit/helpers_test.py | 8 + 6 files changed, 347 insertions(+), 303 deletions(-) delete mode 100644 devtools/output-table-conversion-test.ipynb create mode 100644 devtools/python-output-table-conversion-debug.ipynb diff --git a/devtools/output-table-conversion-test.ipynb b/devtools/output-table-conversion-test.ipynb deleted file mode 100644 index 6c909ac918..0000000000 --- a/devtools/output-table-conversion-test.ipynb +++ /dev/null @@ -1,287 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "f786051b-2aa0-44e0-bfd7-fe6827b6e1a3", - "metadata": {}, - "source": [ - "# Purpose\n", - "We are in the process of converting some functions in `pudl.output` to be SQL views. This notebook allows us to compare the outputs of the old python functions with the SQL view." - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "ec08c060-ba49-4466-81a0-315a45993928", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "import os\n", - "\n", - "assert os.environ.get(\"DAGSTER_HOME\"), (\n", - " \"The DAGSTER_HOME env var is not set so dagster won't be able to find the assets.\"\n", - " \"Set the DAGSTER_HOME env var in this notebook or kill the jupyter server and set\"\n", - " \" the DAGSTER_HOME env var in your terminal and relaunch jupyter.\"\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "5ce7f88e-c7b9-4963-a0e4-72ccbcb1f70e", - "metadata": { - "tags": [] - }, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "pudl_settings is being deprecated in favor of environment variables variables PUDL_OUTPUT and PUDL_INPUT. For more info see: https://catalystcoop-pudl.readthedocs.io/en/dev/dev/dev_setup.html\n", - "pudl_settings is being deprecated in favor of environment variables PUDL_OUTPUT and PUDL_INPUT. For more info see: https://catalystcoop-pudl.readthedocs.io/en/dev/dev/dev_setup.html\n", - "sqlite and parquet directories are no longer being used. Make sure there is a single directory named 'output' at the root of your workspace. For more info see: https://catalystcoop-pudl.readthedocs.io/en/dev/dev/dev_setup.html\n", - "pudl_settings is being deprecated in favor of environment variables variables PUDL_OUTPUT and PUDL_INPUT. For more info see: https://catalystcoop-pudl.readthedocs.io/en/dev/dev/dev_setup.html\n", - "pudl_settings is being deprecated in favor of environment variables PUDL_OUTPUT and PUDL_INPUT. For more info see: https://catalystcoop-pudl.readthedocs.io/en/dev/dev/dev_setup.html\n", - "sqlite and parquet directories are no longer being used. Make sure there is a single directory named 'output' at the root of your workspace. For more info see: https://catalystcoop-pudl.readthedocs.io/en/dev/dev/dev_setup.html\n" - ] - } - ], - "source": [ - "from pudl.workspace.setup import get_defaults\n", - "import sqlalchemy as sa\n", - "import pandas as pd\n", - "\n", - "# TODO: This should be replaced with get_defaults()\n", - "engine = sa.create_engine(f\"sqlite:///{os.environ['PUDL_OUTPUT']}pudl.sqlite\")" - ] - }, - { - "cell_type": "markdown", - "id": "d5f6b021-26cf-4804-929f-0e4975751bd1", - "metadata": {}, - "source": [ - "## Compare output of old python function with SQL view" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "5c705a39-0517-4c7b-a1b0-ca8a12daec53", - "metadata": { - "tags": [] - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "CPU times: user 34.1 ms, sys: 3.71 ms, total: 37.8 ms\n", - "Wall time: 53.8 ms\n" - ] - } - ], - "source": [ - "%%time\n", - "view_name = \"denorm_plants_utils_ferc1\"\n", - "\n", - "with engine.connect() as con:\n", - " view_df = pd.read_sql_table(view_name, con)" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "633999af-d881-424b-af8c-64fc837982cd", - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
utility_id_ferc1plant_name_ferc1plant_id_pudlutility_name_ferc1utility_id_pudl
0206*dolet hills (3)1Southwestern Electric Power Company301
1206*flint creek (1)2Southwestern Electric Power Company301
2206*pirkey (2)3Southwestern Electric Power Company301
315959th st gt-14Consolidated Edison Company of New York, Inc.79
415974th st gt 1&25Consolidated Edison Company of New York, Inc.79
\n", - "
" - ], - "text/plain": [ - " utility_id_ferc1 plant_name_ferc1 plant_id_pudl utility_name_ferc1 utility_id_pudl\n", - "0 206 *dolet hills (3) 1 Southwestern Electric Power Company 301\n", - "1 206 *flint creek (1) 2 Southwestern Electric Power Company 301\n", - "2 206 *pirkey (2) 3 Southwestern Electric Power Company 301\n", - "3 159 59th st gt-1 4 Consolidated Edison Company of New York, Inc. 79\n", - "4 159 74th st gt 1&2 5 Consolidated Edison Company of New York, Inc. 79" - ] - }, - "execution_count": 4, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "view_df.head()" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "af6253bb-e135-4068-8088-4e08c9914054", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2023-03-23 13:35:06 [ WARNING] catalystcoop.pudl.output.ferc1:56 pudl.output.ferc1.plants_utils_ferc1() will be deprecated in a future version of PUDL. In the future, call the PudlTabl.pu_ferc1() method or pull the plants_utils_ferc1 tabledirectly from the pudl.sqlite database.\n" - ] - } - ], - "source": [ - "# Import the old python functions\n", - "from pudl.output.ferc1 import plants_utils_ferc1\n", - "\n", - "old_output_func = plants_utils_ferc1\n", - "\n", - "old_df = old_output_func(engine)\n", - "\n", - "# Align pandas index and sort values using all of the columns\n", - "key = list(old_df.columns)\n", - "old_df = old_df.sort_values(by=key).reset_index(drop=True)\n", - "view_df = view_df.sort_values(by=key).reset_index(drop=True)\n", - "\n", - "pd.testing.assert_frame_equal(old_df, view_df)" - ] - }, - { - "cell_type": "markdown", - "id": "955efc1e-b21f-473e-b3f2-ab2fad89591d", - "metadata": {}, - "source": [ - "## Make sure we can load the view using the SQLite IO Manager" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "id": "a9d81a2b-9c68-443b-9a56-5563f4cbc920", - "metadata": { - "tags": [] - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n", - "RangeIndex: 5 entries, 0 to 4\n", - "Data columns (total 5 columns):\n", - " # Column Non-Null Count Dtype \n", - "--- ------ -------------- ----- \n", - " 0 utility_id_ferc1 5 non-null Int64 \n", - " 1 plant_name_ferc1 5 non-null string\n", - " 2 plant_id_pudl 5 non-null Int64 \n", - " 3 utility_name_ferc1 5 non-null string\n", - " 4 utility_id_pudl 5 non-null Int64 \n", - "dtypes: Int64(3), string(2)\n", - "memory usage: 343.0 bytes\n" - ] - } - ], - "source": [ - "from dagster import AssetKey\n", - "\n", - "from pudl.etl import defs\n", - "\n", - "df = defs.load_asset_value(AssetKey(view_name))\n", - "\n", - "df.head().info()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.9" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/devtools/python-output-table-conversion-debug.ipynb b/devtools/python-output-table-conversion-debug.ipynb new file mode 100644 index 0000000000..8cf29ec176 --- /dev/null +++ b/devtools/python-output-table-conversion-debug.ipynb @@ -0,0 +1,328 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "f786051b-2aa0-44e0-bfd7-fe6827b6e1a3", + "metadata": {}, + "source": [ + "# Purpose\n", + "We are in the process of converting some functions in `pudl.output` to be SQL views. This notebook allows us to compare the outputs of the old python functions with the SQL view." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "ec08c060-ba49-4466-81a0-315a45993928", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import os\n", + "\n", + "assert os.environ.get(\"DAGSTER_HOME\"), (\n", + " \"The DAGSTER_HOME env var is not set so dagster won't be able to find the assets.\"\n", + " \"Set the DAGSTER_HOME env var in this notebook or kill the jupyter server and set\"\n", + " \" the DAGSTER_HOME env var in your terminal and relaunch jupyter.\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "6fc58a8b-f879-4675-a5a1-0d2cf67da336", + "metadata": {}, + "source": [ + "## Step 1: Create a new asset\n", + "Create a new asset in the same module of the existing output table function. Most output tables are just denormalied versions of the normalized tables so to differentiate them, name the asset `\"denorm_{output_table_name}\"`. For example, if you are converting the `pudl.output.eia860.utilities_eia860()` function, name the asset `denorm_utilities_eia860`. **Don't delete the old oldput table function! We need it later on to test to new asset.**\n", + "\n", + "You can create an asset by creating a new function and adding the `@asset` decorator. For now, the only attribute you should add to the decorator is the `compute_type = \"Python\"`. All this does is add a cute tag to the asset in the dag to let people know how the asset is being processed.\n", + "\n", + "Next you'll want to figure out what tables the output table depends on. Read through the old output function to see which normalized tables or output functions are being used as inputs to the joins and imputations. Once you have the input table names, add them to the asset function parameters. For example, the `utilities_eia860()` function merges `utilities_entity_eia`, `utilities_eia860`, and `utilities_eia` tables together so the asset would look like this:\n", + "\n", + "```python\n", + "@asset(compute_kind=\"Python\")\n", + "def denorm_utilities_eia860(\n", + " utilities_entity_eia: pd.DataFrame,\n", + " utilities_eia860: pd.DataFrame,\n", + " utilities_eia: pd.DataFrame,\n", + "):\n", + " ... # joining logic\n", + " return joined_df\n", + "```\n", + "\n", + "Dagster will automatically place the `denorm_utilities_eia860` asset downstream of its input assets. **If the old output function depends on an output table function that hasn't been converted to an asset, you'll need to convert that function to an asset first**.\n", + "\n", + "Once the asset has been created and the joining logic is copied over, reload the asset definitions in dagit and materialize the new output table asset. If the asset is succesfully materialized, it won't be present in the database yet. If you don't specify an `io_manager_key` in the asset decorator, the default io manager is used which writes the dataframe to a pickle file in your `DAGSTER_HOME` directory." + ] + }, + { + "cell_type": "markdown", + "id": "97e3ddbd-882d-4224-b4d6-57ee141b7512", + "metadata": {}, + "source": [ + "## Step 2: Create the metadata\n", + "Like the normalized tables, we need to keep track of output table's metadata so the dtypes can be preserved as the table moves between pandas and storage, in this case SQLite. To get a list of field names, load the value of the asset you just created:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "48c79723-28b5-4091-9590-7dedfd23d7df", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "RangeIndex: 5 entries, 0 to 4\n", + "Data columns (total 27 columns):\n", + " # Column Non-Null Count Dtype \n", + "--- ------ -------------- ----- \n", + " 0 utility_id_eia 5 non-null Int64 \n", + " 1 utility_id_pudl 5 non-null Int64 \n", + " 2 utility_name_eia 5 non-null string \n", + " 3 report_date 5 non-null datetime64[ns]\n", + " 4 street_address 3 non-null string \n", + " 5 city 3 non-null string \n", + " 6 state 3 non-null string \n", + " 7 zip_code 3 non-null string \n", + " 8 plants_reported_owner 3 non-null boolean \n", + " 9 plants_reported_operator 0 non-null boolean \n", + " 10 plants_reported_asset_manager 0 non-null boolean \n", + " 11 plants_reported_other_relationship 0 non-null boolean \n", + " 12 entity_type 3 non-null string \n", + " 13 attention_line 0 non-null string \n", + " 14 address_2 0 non-null string \n", + " 15 zip_code_4 0 non-null string \n", + " 16 contact_firstname 0 non-null string \n", + " 17 contact_lastname 0 non-null string \n", + " 18 contact_title 0 non-null string \n", + " 19 phone_number 0 non-null string \n", + " 20 phone_extension 0 non-null string \n", + " 21 contact_firstname_2 0 non-null string \n", + " 22 contact_lastname_2 0 non-null string \n", + " 23 contact_title_2 0 non-null string \n", + " 24 phone_number_2 0 non-null string \n", + " 25 phone_extension_2 0 non-null string \n", + " 26 data_maturity 5 non-null string \n", + "dtypes: Int64(2), boolean(4), datetime64[ns](1), string(20)\n", + "memory usage: 1.1 KB\n" + ] + } + ], + "source": [ + "from dagster import AssetKey\n", + "\n", + "from pudl.etl import defs\n", + "\n", + "asset_name = \"denorm_utilities_eia860\"\n", + "df = defs.load_asset_value(AssetKey(asset_name))\n", + "df.head().info()" + ] + }, + { + "cell_type": "markdown", + "id": "90e64e9c-b59b-4a10-a5b1-0d9e901678f3", + "metadata": {}, + "source": [ + "Once you have the field names, find the appropriate module in `pudl.metadata.resources` to add the metadata too. The metadata of an output table should live in the module of the data source. For example, the `denorm_utilities_eia860` merges eia860 data together so the metadata should live in `pudl.metadata.resources.eia860`. Set `\"etl_group\"` of the resource to `\"outputs\"`.\n", + "\n", + "Most output tables just join existing fields together, but some add new fields. If the output table create a new field, you'll need to it to the `pudl.metadata.fields` module.\n", + "\n", + "Once the metadata is created, add `io_manager_key=\"pudl_sqlite_io_manager\"` to the asset decorator. This tells the asset to load the returned dataframe to the database instead of a pickle file. **Don't forget this step! If you don't change the `io_manager_key` the table will not be loaded to the database!** Example:\n", + "\n", + "```python\n", + "@asset(io_manager_key=\"pudl_sqlite_io_manager\", compute_kind=\"Python\")\n", + "def denorm_utilities_eia860(\n", + " utilities_entity_eia: pd.DataFrame,\n", + " utilities_eia860: pd.DataFrame,\n", + " utilities_eia: pd.DataFrame,\n", + "):\n", + " ... # joining logic\n", + " return joined_df\n", + "```\n", + "\n", + "Once the metadata is created, you'll need to delete your `pudl.sqlite` file so the next ETL run can create the new database schema. Then rematerialize all of the assets. If the database flags any data integrity errors in the output table, you can adjust the code in the output asset and just rematerialize the asset to test it out. If you need to update the table metadata, you'll need to delete the `pudl.sqlite` database and rematerialize all of the assets." + ] + }, + { + "cell_type": "markdown", + "id": "576fe615-5ba5-4474-8506-1e8da02d5a2e", + "metadata": {}, + "source": [ + "## Step 3: Test the output table\n", + "Once the output table is comfortably loaded into the database it is time to compare it to the old output function to make sure the data hasn't changed.\n", + "\n", + "Load the asset value from the database:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "e6e2147a-6bf0-4ebc-91b0-4429cc0af2ca", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "RangeIndex: 5 entries, 0 to 4\n", + "Data columns (total 27 columns):\n", + " # Column Non-Null Count Dtype \n", + "--- ------ -------------- ----- \n", + " 0 utility_id_eia 5 non-null Int64 \n", + " 1 utility_id_pudl 5 non-null Int64 \n", + " 2 utility_name_eia 5 non-null string \n", + " 3 report_date 5 non-null datetime64[ns]\n", + " 4 street_address 3 non-null string \n", + " 5 city 3 non-null string \n", + " 6 state 3 non-null string \n", + " 7 zip_code 3 non-null string \n", + " 8 plants_reported_owner 3 non-null boolean \n", + " 9 plants_reported_operator 0 non-null boolean \n", + " 10 plants_reported_asset_manager 0 non-null boolean \n", + " 11 plants_reported_other_relationship 0 non-null boolean \n", + " 12 entity_type 3 non-null string \n", + " 13 attention_line 0 non-null string \n", + " 14 address_2 0 non-null string \n", + " 15 zip_code_4 0 non-null string \n", + " 16 contact_firstname 0 non-null string \n", + " 17 contact_lastname 0 non-null string \n", + " 18 contact_title 0 non-null string \n", + " 19 phone_number 0 non-null string \n", + " 20 phone_extension 0 non-null string \n", + " 21 contact_firstname_2 0 non-null string \n", + " 22 contact_lastname_2 0 non-null string \n", + " 23 contact_title_2 0 non-null string \n", + " 24 phone_number_2 0 non-null string \n", + " 25 phone_extension_2 0 non-null string \n", + " 26 data_maturity 5 non-null string \n", + "dtypes: Int64(2), boolean(4), datetime64[ns](1), string(20)\n", + "memory usage: 1.1 KB\n" + ] + } + ], + "source": [ + "asset_name = \"denorm_utilities_eia860\"\n", + "new_df = defs.load_asset_value(AssetKey(asset_name))\n", + "new_df.head().info()" + ] + }, + { + "cell_type": "markdown", + "id": "0690ab78-7c07-4ea5-9e93-ce12b6ec0fa7", + "metadata": {}, + "source": [ + "Create the old output table by calling the old output function:" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "af6253bb-e135-4068-8088-4e08c9914054", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2023-03-24 13:05:04 [ WARNING] catalystcoop.pudl.output.eia860:68 pudl.output.eia860.utilities_eia860() will be deprecated in a future version of PUDL. In the future, call the PudlTabl.utils_eia860() method or pull the denorm_utilities_eia860 tabledirectly from the pudl.sqlite database.\n" + ] + } + ], + "source": [ + "# Import the old python functions\n", + "from pudl.output.eia860 import utilities_eia860\n", + "from pudl.io_managers import pudl_sqlite_io_manager\n", + "\n", + "old_output_func = utilities_eia860 # TODO: Replace this with a call to the old function\n", + "\n", + "engine = pudl_sqlite_io_manager(None).engine\n", + "old_df = old_output_func(engine)" + ] + }, + { + "cell_type": "markdown", + "id": "b31ba1a7-27fe-44c0-9fd4-8f33bc97f60d", + "metadata": {}, + "source": [ + "Align the dataframe columns and index then compare the dataframes:" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "26c20586-b001-4f8c-8c89-dca7bf426d72", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "# Make sure the columns are in the correct order\n", + "new_df = new_df.sort_index(axis=1)\n", + "old_df = old_df.sort_index(axis=1)\n", + "\n", + "# Align pandas index and sort values using all of the columns\n", + "key = list(old_df.columns)\n", + "old_df = old_df.sort_values(by=key).reset_index(drop=True)\n", + "new_df = new_df.sort_values(by=key).reset_index(drop=True)\n", + "\n", + "pd.testing.assert_frame_equal(old_df, new_df)" + ] + }, + { + "cell_type": "markdown", + "id": "c4092a8d-21be-4cdb-ae93-12579657f7d1", + "metadata": {}, + "source": [ + "# Step 4: Update the `PudlTabl` class\n", + "Wahoo! The output table asset has been created, added to the database and tested against the old function. Now you should:\n", + "1. Add a deprecation warning to the old output table function. We will remove these functions once all of the output tables have been converted to assets.\n", + "2. Find the `PudlTabl` method that calls the output table function and replace it with code that reads the output table from the database and corrects dtypes. For example:\n", + "\n", + "```python\n", + "def utils_eia860(self, update=False):\n", + " \"\"\"Pull a dataframe describing utilities reported in EIA 860.\n", + "\n", + " Returns:\n", + " pandas.DataFrame: a denormalized table for interactive use.\n", + " \"\"\"\n", + " return pd.read_sql(\"denorm_utilities_eia860\", self.pudl_engine).pipe(\n", + " apply_pudl_dtypes, group=\"eia\"\n", + " )\n", + "```\n", + "\n", + "All done!" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/pudl/output/eia860.py b/src/pudl/output/eia860.py index 20b31382f6..22117732b0 100644 --- a/src/pudl/output/eia860.py +++ b/src/pudl/output/eia860.py @@ -65,6 +65,11 @@ def utilities_eia860(pudl_engine, start_date=None, end_date=None): pandas.DataFrame: A DataFrame containing all the fields of the EIA 860 Utilities table. """ + logger.warning( + "pudl.output.eia860.utilities_eia860() will be deprecated in a future version of PUDL." + " In the future, call the PudlTabl.utils_eia860() method or pull the denorm_utilities_eia860 table" + "directly from the pudl.sqlite database." + ) pt = pudl.output.pudltabl.get_table_meta(pudl_engine) # grab the entity table utils_eia_tbl = pt["utilities_entity_eia"] diff --git a/src/pudl/output/helpers.py b/src/pudl/output/helpers.py index e04172588c..649a579fa4 100644 --- a/src/pudl/output/helpers.py +++ b/src/pudl/output/helpers.py @@ -6,7 +6,7 @@ def sql_asset_factory( name: str, - non_argument_deps: set[str], + non_argument_deps: set[str] = {}, io_manager_key: str = "pudl_sqlite_io_manager", compute_kind: str = "SQL", ) -> AssetsDefinition: diff --git a/src/pudl/output/pudltabl.py b/src/pudl/output/pudltabl.py index 10a92bba2c..b857e119cf 100644 --- a/src/pudl/output/pudltabl.py +++ b/src/pudl/output/pudltabl.py @@ -145,18 +145,14 @@ def pu_eia860(self, update=False): ) return self._dfs["pu_eia"] - def pu_ferc1(self, update=False): + def pu_ferc1(self): """Pull a dataframe of FERC plant-utility associations. - Args: - update (bool): If true, re-calculate the output dataframe, even if - a cached version exists. - Returns: pandas.DataFrame: a denormalized table for interactive use. """ return pd.read_sql_table("denorm_plants_utils_ferc1", self.pudl_engine).pipe( - apply_pudl_dtypes, group="eia" + apply_pudl_dtypes, group="ferc1" ) def advanced_metering_infrastructure_eia861(self) -> pd.DataFrame: @@ -365,18 +361,12 @@ def demand_hourly_pa_ferc714(self) -> pd.DataFrame: def utils_eia860(self, update=False): """Pull a dataframe describing utilities reported in EIA 860. - Args: - update (bool): If true, re-calculate the output dataframe, even if - a cached version exists. - Returns: pandas.DataFrame: a denormalized table for interactive use. """ - if update or self._dfs["utils_eia860"] is None: - self._dfs["utils_eia860"] = pudl.output.eia860.utilities_eia860( - self.pudl_engine, start_date=self.start_date, end_date=self.end_date - ) - return self._dfs["utils_eia860"] + return pd.read_sql("denorm_utilities_eia860", self.pudl_engine).pipe( + apply_pudl_dtypes, group="eia" + ) def bga_eia860(self, update=False): """Pull a dataframe of boiler-generator associations from EIA 860. diff --git a/test/unit/helpers_test.py b/test/unit/helpers_test.py index a373595285..b99c79fda2 100644 --- a/test/unit/helpers_test.py +++ b/test/unit/helpers_test.py @@ -18,6 +18,7 @@ remove_leading_zeros_from_numeric_strings, zero_pad_numeric_string, ) +from pudl.output.helpers import sql_asset_factory MONTHLY_GEN_FUEL = pd.DataFrame( { @@ -622,3 +623,10 @@ def test_cems_selection(): assert AssetKey("hourly_emissions_epacems") not in cems_selection.resolve( pudl.etl.default_assets ), "hourly_emissions_epacems or downstream asset present in selection." + + +def test_sql_asset_factory_missing_file(): + """Test sql_asset_factory throws a file not found error if file doesn't exist for an + asset name.""" + with pytest.raises(FileNotFoundError): + sql_asset_factory(name="fake_view")() From 5d7c95b2891726fd37258d684878f9a978542480 Mon Sep 17 00:00:00 2001 From: bendnorman Date: Mon, 3 Apr 2023 15:53:43 -0800 Subject: [PATCH 05/10] Include sql files in MANIFEST and some other clean up --- MANIFEST.in | 1 + src/pudl/metadata/classes.py | 1 - src/pudl/metadata/resources/ferc1.py | 2 +- src/pudl/output/ferc1.py | 2 +- src/pudl/output/helpers.py | 36 ------------------- src/pudl/output/pudltabl.py | 8 ++--- .../output/sql/denorm_plants_utils_ferc1.sql | 2 +- test/unit/helpers_test.py | 2 +- 8 files changed, 9 insertions(+), 45 deletions(-) delete mode 100644 src/pudl/output/helpers.py diff --git a/MANIFEST.in b/MANIFEST.in index 3d1b28d3b2..8c41bc3102 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,5 +1,6 @@ graft src/pudl/package_data graft src/pudl/metadata/templates +graft src/pudl/output/sql prune ci prune data diff --git a/src/pudl/metadata/classes.py b/src/pudl/metadata/classes.py index c3fabd1615..1a4a92235a 100644 --- a/src/pudl/metadata/classes.py +++ b/src/pudl/metadata/classes.py @@ -1194,7 +1194,6 @@ class Resource(Base): "ferc714", "glue", "outputs", - "output_views", "static_ferc1", "static_eia", "static_eia_disabled", diff --git a/src/pudl/metadata/resources/ferc1.py b/src/pudl/metadata/resources/ferc1.py index 2f0c9345a7..5407579fde 100644 --- a/src/pudl/metadata/resources/ferc1.py +++ b/src/pudl/metadata/resources/ferc1.py @@ -835,7 +835,7 @@ ], }, "field_namespace": "ferc1", - "etl_group": "output_views", + "etl_group": "outputs", "sources": ["ferc1"], "include_in_database": False, }, diff --git a/src/pudl/output/ferc1.py b/src/pudl/output/ferc1.py index c104f7e378..99d5725bf9 100644 --- a/src/pudl/output/ferc1.py +++ b/src/pudl/output/ferc1.py @@ -5,7 +5,7 @@ import pudl from pudl.metadata.fields import apply_pudl_dtypes -from pudl.output.helpers import sql_asset_factory +from pudl.output.sql.helpers import sql_asset_factory logger = pudl.logging_helpers.get_logger(__name__) diff --git a/src/pudl/output/helpers.py b/src/pudl/output/helpers.py deleted file mode 100644 index a4273f74b9..0000000000 --- a/src/pudl/output/helpers.py +++ /dev/null @@ -1,36 +0,0 @@ -"""Helper functions for creating output assets.""" -import importlib - -from dagster import AssetsDefinition, asset - - -def sql_asset_factory( - name: str, - non_argument_deps: set[str] = {}, - io_manager_key: str = "pudl_sqlite_io_manager", - compute_kind: str = "SQL", -) -> AssetsDefinition: - """Factory for creating assets that run SQL statements.""" - - @asset( - name=name, - non_argument_deps=non_argument_deps, - io_manager_key=io_manager_key, - compute_kind=compute_kind, - ) - def sql_view_asset() -> str: - """Asset that creates sql view in a database.""" - sql_path_traversable = importlib.resources.files("pudl.output.sql").joinpath( - f"{name}.sql" - ) - try: - with importlib.resources.as_file(sql_path_traversable) as sql_path: - with open(sql_path) as sql_file: - return sql_file.read() - # Raise a helpful error here if a sql file doesn't exist - except FileNotFoundError: - raise FileNotFoundError( - f"Could not find {sql_path}. Create a sql file in pudl.output.sql subpackage for {name} asset." - ) - - return sql_view_asset diff --git a/src/pudl/output/pudltabl.py b/src/pudl/output/pudltabl.py index 7137c4d358..1916887233 100644 --- a/src/pudl/output/pudltabl.py +++ b/src/pudl/output/pudltabl.py @@ -145,11 +145,11 @@ def pu_eia860(self, update=False): ) return self._dfs["pu_eia"] - def pu_ferc1(self): + def pu_ferc1(self) -> pd.DataFrame: """Pull a dataframe of FERC plant-utility associations. Returns: - pandas.DataFrame: a denormalized table for interactive use. + A denormalized table for interactive use. """ return pd.read_sql_table("denorm_plants_utils_ferc1", self.pudl_engine).pipe( apply_pudl_dtypes, group="ferc1" @@ -358,11 +358,11 @@ def demand_hourly_pa_ferc714(self) -> pd.DataFrame: ########################################################################### # EIA 860/923 OUTPUTS ########################################################################### - def utils_eia860(self, update=False): + def utils_eia860(self, update=False) -> pd.DataFrame: """Pull a dataframe describing utilities reported in EIA 860. Returns: - pandas.DataFrame: a denormalized table for interactive use. + A denormalized table for interactive use. """ return pd.read_sql("denorm_utilities_eia860", self.pudl_engine).pipe( apply_pudl_dtypes, group="eia" diff --git a/src/pudl/output/sql/denorm_plants_utils_ferc1.sql b/src/pudl/output/sql/denorm_plants_utils_ferc1.sql index 79706292ec..c72d279c0b 100644 --- a/src/pudl/output/sql/denorm_plants_utils_ferc1.sql +++ b/src/pudl/output/sql/denorm_plants_utils_ferc1.sql @@ -1,4 +1,4 @@ --- Build a dataframe of useful FERC Plant & Utility information. +-- Build a view of useful FERC Plant & Utility information. CREATE VIEW denorm_plants_utils_ferc1 AS SELECT * FROM plants_ferc1 diff --git a/test/unit/helpers_test.py b/test/unit/helpers_test.py index 766f0d481f..610c137798 100644 --- a/test/unit/helpers_test.py +++ b/test/unit/helpers_test.py @@ -22,7 +22,7 @@ remove_leading_zeros_from_numeric_strings, zero_pad_numeric_string, ) -from pudl.output.helpers import sql_asset_factory +from pudl.output.sql.helpers import sql_asset_factory MONTHLY_GEN_FUEL = pd.DataFrame( { From b27a95a8bbd26168f6ba3249efebf21ab65b7ba2 Mon Sep 17 00:00:00 2001 From: bendnorman Date: Tue, 4 Apr 2023 08:43:46 -0800 Subject: [PATCH 06/10] Create new modules for denorm assets --- src/pudl/etl/__init__.py | 4 ++-- src/pudl/output/__init__.py | 2 ++ src/pudl/output/denorm_eia.py | 42 +++++++++++++++++++++++++++++++++ src/pudl/output/denorm_ferc.py | 6 +++++ src/pudl/output/eia860.py | 37 ----------------------------- src/pudl/output/ferc1.py | 6 ----- src/pudl/output/sql/__init__.py | 1 + src/pudl/output/sql/helpers.py | 35 +++++++++++++++++++++++++++ 8 files changed, 88 insertions(+), 45 deletions(-) create mode 100644 src/pudl/output/denorm_eia.py create mode 100644 src/pudl/output/denorm_ferc.py create mode 100644 src/pudl/output/sql/__init__.py create mode 100644 src/pudl/output/sql/helpers.py diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index 6820ee0a26..2027017cb9 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -8,7 +8,6 @@ Definitions, define_asset_job, load_assets_from_modules, - load_assets_from_package_module, ) import pudl @@ -46,7 +45,8 @@ *load_assets_from_modules([pudl.transform.ferc714], group_name="clean_ferc714"), *load_assets_from_modules([glue_assets], group_name="glue"), *load_assets_from_modules([static_assets], group_name="static"), - *load_assets_from_package_module(pudl.output, group_name="outputs"), + *load_assets_from_modules([pudl.output.denorm_eia], group_name="denorm_eia"), + *load_assets_from_modules([pudl.output.denorm_ferc], group_name="denorm_ferc"), ) default_resources = { diff --git a/src/pudl/output/__init__.py b/src/pudl/output/__init__.py index defdbc85df..222ddbfaed 100644 --- a/src/pudl/output/__init__.py +++ b/src/pudl/output/__init__.py @@ -12,6 +12,8 @@ """ from . import ( # noqa: F401 censusdp1tract, + denorm_eia, + denorm_ferc, eia860, eia923, epacems, diff --git a/src/pudl/output/denorm_eia.py b/src/pudl/output/denorm_eia.py new file mode 100644 index 0000000000..9762023a9b --- /dev/null +++ b/src/pudl/output/denorm_eia.py @@ -0,0 +1,42 @@ +"""A collection of denormalized EIA assets.""" +import pandas as pd +from dagster import asset + +import pudl +from pudl.metadata.fields import apply_pudl_dtypes + + +@asset(io_manager_key="pudl_sqlite_io_manager", compute_kind="Python") +def denorm_utilities_eia860( + utilities_entity_eia: pd.DataFrame, + utilities_eia860: pd.DataFrame, + utilities_eia: pd.DataFrame, +): + """Pull all fields from the EIA860 Utilities table. + + Args: + utilities_entity_eia: EIA utility entity table. + utilities_eia860: EIA 860 annual utility table. + utilities_eia: Associations between EIA utilities and pudl utility IDs. + + Returns: + A DataFrame containing all the fields of the EIA 860 Utilities table. + """ + utilities_eia = utilities_eia[["utility_id_eia", "utility_id_pudl"]] + out_df = pd.merge( + utilities_entity_eia, utilities_eia860, how="left", on=["utility_id_eia"] + ) + out_df = pd.merge(out_df, utilities_eia, how="left", on=["utility_id_eia"]) + out_df = ( + out_df.assign(report_date=lambda x: pd.to_datetime(x.report_date)) + .dropna(subset=["report_date", "utility_id_eia"]) + .pipe(apply_pudl_dtypes, group="eia") + ) + first_cols = [ + "report_date", + "utility_id_eia", + "utility_id_pudl", + "utility_name_eia", + ] + out_df = pudl.helpers.organize_cols(out_df, first_cols) + return out_df diff --git a/src/pudl/output/denorm_ferc.py b/src/pudl/output/denorm_ferc.py new file mode 100644 index 0000000000..56be5a9167 --- /dev/null +++ b/src/pudl/output/denorm_ferc.py @@ -0,0 +1,6 @@ +"""A collection of denormalized FERC assets.""" +from pudl.output.sql.helpers import sql_asset_factory + +denorm_plants_utils_ferc1_asset = sql_asset_factory( + "denorm_plants_utils_ferc1", {"plants_ferc1", "utilities_ferc1"} +) diff --git a/src/pudl/output/eia860.py b/src/pudl/output/eia860.py index 22117732b0..04a4cb7636 100644 --- a/src/pudl/output/eia860.py +++ b/src/pudl/output/eia860.py @@ -2,7 +2,6 @@ import pandas as pd import sqlalchemy as sa -from dagster import asset import pudl from pudl.metadata.fields import apply_pudl_dtypes @@ -12,42 +11,6 @@ logger = pudl.logging_helpers.get_logger(__name__) -@asset(io_manager_key="pudl_sqlite_io_manager", compute_kind="Python") -def denorm_utilities_eia860( - utilities_entity_eia: pd.DataFrame, - utilities_eia860: pd.DataFrame, - utilities_eia: pd.DataFrame, -): - """Pull all fields from the EIA860 Utilities table. - - Args: - utilities_entity_eia: EIA utility entity table. - utilities_eia860: EIA 860 annual utility table. - utilities_eia: Associations between EIA utilities and pudl utility IDs. - - Returns: - A DataFrame containing all the fields of the EIA 860 Utilities table. - """ - utilities_eia = utilities_eia[["utility_id_eia", "utility_id_pudl"]] - out_df = pd.merge( - utilities_entity_eia, utilities_eia860, how="left", on=["utility_id_eia"] - ) - out_df = pd.merge(out_df, utilities_eia, how="left", on=["utility_id_eia"]) - out_df = ( - out_df.assign(report_date=lambda x: pd.to_datetime(x.report_date)) - .dropna(subset=["report_date", "utility_id_eia"]) - .pipe(apply_pudl_dtypes, group="eia") - ) - first_cols = [ - "report_date", - "utility_id_eia", - "utility_id_pudl", - "utility_name_eia", - ] - out_df = pudl.helpers.organize_cols(out_df, first_cols) - return out_df - - def utilities_eia860(pudl_engine, start_date=None, end_date=None): """Pull all fields from the EIA860 Utilities table. diff --git a/src/pudl/output/ferc1.py b/src/pudl/output/ferc1.py index 99d5725bf9..0ba963b362 100644 --- a/src/pudl/output/ferc1.py +++ b/src/pudl/output/ferc1.py @@ -5,7 +5,6 @@ import pudl from pudl.metadata.fields import apply_pudl_dtypes -from pudl.output.sql.helpers import sql_asset_factory logger = pudl.logging_helpers.get_logger(__name__) @@ -37,11 +36,6 @@ def read_table_with_start_end_dates( return pd.read_sql(table_select, pudl_engine) -denorm_plants_utils_ferc1_asset = sql_asset_factory( - "denorm_plants_utils_ferc1", {"plants_ferc1", "utilities_ferc1"} -) - - def plants_utils_ferc1(pudl_engine): """Build a dataframe of useful FERC Plant & Utility information. diff --git a/src/pudl/output/sql/__init__.py b/src/pudl/output/sql/__init__.py new file mode 100644 index 0000000000..92c4de178d --- /dev/null +++ b/src/pudl/output/sql/__init__.py @@ -0,0 +1 @@ +"""A module of python helper functions and sql files for creating SQL views.""" diff --git a/src/pudl/output/sql/helpers.py b/src/pudl/output/sql/helpers.py new file mode 100644 index 0000000000..f398ad1bd2 --- /dev/null +++ b/src/pudl/output/sql/helpers.py @@ -0,0 +1,35 @@ +"""Helper functions for creating output assets.""" +import importlib + +from dagster import AssetsDefinition, asset + + +def sql_asset_factory( + name: str, + non_argument_deps: set[str] = {}, + io_manager_key: str = "pudl_sqlite_io_manager", + compute_kind: str = "SQL", +) -> AssetsDefinition: + """Factory for creating assets that run SQL statements.""" + + @asset( + name=name, + non_argument_deps=non_argument_deps, + io_manager_key=io_manager_key, + compute_kind=compute_kind, + ) + def sql_view_asset() -> str: + """Asset that creates sql view in a database.""" + sql_path_traversable = importlib.resources.files("pudl.output.sql").joinpath( + f"{name}.sql" + ) + try: + with importlib.resources.as_file(sql_path_traversable) as sql_path: + return sql_path.read_text() + # Raise a helpful error here if a sql file doesn't exist + except FileNotFoundError: + raise FileNotFoundError( + f"Could not find {sql_path}. Create a sql file in pudl.output.sql subpackage for {name} asset." + ) + + return sql_view_asset From 474961178b301dbfd4b6f97bc0d1dd8549c13e56 Mon Sep 17 00:00:00 2001 From: bendnorman Date: Tue, 4 Apr 2023 08:51:47 -0800 Subject: [PATCH 07/10] Drop 860 from denorm asset name --- src/pudl/metadata/resources/eia860.py | 4 ++-- src/pudl/output/denorm_eia.py | 4 ++-- src/pudl/output/eia860.py | 2 +- src/pudl/output/pudltabl.py | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/pudl/metadata/resources/eia860.py b/src/pudl/metadata/resources/eia860.py index 146b6cd527..03e37104e9 100644 --- a/src/pudl/metadata/resources/eia860.py +++ b/src/pudl/metadata/resources/eia860.py @@ -415,8 +415,8 @@ "sources": ["eia860", "eia923"], "etl_group": "eia860", }, - "denorm_utilities_eia860": { - "description": ("Denoramlized table containing all eia860 utility attributes."), + "denorm_utilities_eia": { + "description": ("Denoramlized table containing all EIA utility attributes."), "schema": { "fields": [ "utility_id_eia", diff --git a/src/pudl/output/denorm_eia.py b/src/pudl/output/denorm_eia.py index 9762023a9b..74a95e8ff7 100644 --- a/src/pudl/output/denorm_eia.py +++ b/src/pudl/output/denorm_eia.py @@ -7,12 +7,12 @@ @asset(io_manager_key="pudl_sqlite_io_manager", compute_kind="Python") -def denorm_utilities_eia860( +def denorm_utilities_eia( utilities_entity_eia: pd.DataFrame, utilities_eia860: pd.DataFrame, utilities_eia: pd.DataFrame, ): - """Pull all fields from the EIA860 Utilities table. + """Pull all fields from the EIA Utilities table. Args: utilities_entity_eia: EIA utility entity table. diff --git a/src/pudl/output/eia860.py b/src/pudl/output/eia860.py index 04a4cb7636..2d2a2fe6dd 100644 --- a/src/pudl/output/eia860.py +++ b/src/pudl/output/eia860.py @@ -30,7 +30,7 @@ def utilities_eia860(pudl_engine, start_date=None, end_date=None): """ logger.warning( "pudl.output.eia860.utilities_eia860() will be deprecated in a future version of PUDL." - " In the future, call the PudlTabl.utils_eia860() method or pull the denorm_utilities_eia860 table" + " In the future, call the PudlTabl.utils_eia860() method or pull the denorm_utilities_eia table" "directly from the pudl.sqlite database." ) pt = pudl.output.pudltabl.get_table_meta(pudl_engine) diff --git a/src/pudl/output/pudltabl.py b/src/pudl/output/pudltabl.py index 1916887233..336ab19dbe 100644 --- a/src/pudl/output/pudltabl.py +++ b/src/pudl/output/pudltabl.py @@ -359,12 +359,12 @@ def demand_hourly_pa_ferc714(self) -> pd.DataFrame: # EIA 860/923 OUTPUTS ########################################################################### def utils_eia860(self, update=False) -> pd.DataFrame: - """Pull a dataframe describing utilities reported in EIA 860. + """Pull a dataframe describing utilities reported in EIA. Returns: A denormalized table for interactive use. """ - return pd.read_sql("denorm_utilities_eia860", self.pudl_engine).pipe( + return pd.read_sql("denorm_utilities_eia", self.pudl_engine).pipe( apply_pudl_dtypes, group="eia" ) From 1c02e32e298c60878eb22fa3aa306dfb55101678 Mon Sep 17 00:00:00 2001 From: bendnorman Date: Tue, 4 Apr 2023 10:31:38 -0800 Subject: [PATCH 08/10] Rename ferc to ferc1 --- src/pudl/etl/__init__.py | 2 +- src/pudl/output/__init__.py | 2 +- src/pudl/output/{denorm_ferc.py => denorm_ferc1.py} | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename src/pudl/output/{denorm_ferc.py => denorm_ferc1.py} (100%) diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index 2027017cb9..d9803bf65e 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -46,7 +46,7 @@ *load_assets_from_modules([glue_assets], group_name="glue"), *load_assets_from_modules([static_assets], group_name="static"), *load_assets_from_modules([pudl.output.denorm_eia], group_name="denorm_eia"), - *load_assets_from_modules([pudl.output.denorm_ferc], group_name="denorm_ferc"), + *load_assets_from_modules([pudl.output.denorm_ferc1], group_name="denorm_ferc1"), ) default_resources = { diff --git a/src/pudl/output/__init__.py b/src/pudl/output/__init__.py index 222ddbfaed..9e235d982e 100644 --- a/src/pudl/output/__init__.py +++ b/src/pudl/output/__init__.py @@ -13,7 +13,7 @@ from . import ( # noqa: F401 censusdp1tract, denorm_eia, - denorm_ferc, + denorm_ferc1, eia860, eia923, epacems, diff --git a/src/pudl/output/denorm_ferc.py b/src/pudl/output/denorm_ferc1.py similarity index 100% rename from src/pudl/output/denorm_ferc.py rename to src/pudl/output/denorm_ferc1.py From bea1e62e8aeeae8bcaba9e44712f3df9c6a9e8d0 Mon Sep 17 00:00:00 2001 From: bendnorman Date: Wed, 5 Apr 2023 16:38:40 -0800 Subject: [PATCH 09/10] Make table name and include_in_database attribute more verbose --- src/pudl/metadata/classes.py | 4 ++-- src/pudl/metadata/resources/eia.py | 4 ++-- src/pudl/metadata/resources/epacems.py | 2 +- src/pudl/metadata/resources/ferc1.py | 6 +++--- src/pudl/output/denorm_ferc1.py | 4 ++-- src/pudl/output/pudltabl.py | 6 +++--- src/pudl/output/sql/denorm_plants_utils_ferc1.sql | 2 +- test/unit/io_managers_test.py | 2 +- 8 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/pudl/metadata/classes.py b/src/pudl/metadata/classes.py index 1a4a92235a..922561f284 100644 --- a/src/pudl/metadata/classes.py +++ b/src/pudl/metadata/classes.py @@ -1200,7 +1200,7 @@ class Resource(Base): "eia_bulk_elec", "static_pudl", ] = None - include_in_database: bool = True + create_database_schema: bool = True _check_unique = _validator( "contributors", "keywords", "licenses", "sources", fn=_check_unique @@ -1870,7 +1870,7 @@ def to_sql( """Return equivalent SQL MetaData.""" metadata = sa.MetaData() for resource in self.resources: - if resource.include_in_database: + if resource.create_database_schema: _ = resource.to_sql( metadata, check_types=check_types, diff --git a/src/pudl/metadata/resources/eia.py b/src/pudl/metadata/resources/eia.py index afa09e6eca..da11bb18eb 100644 --- a/src/pudl/metadata/resources/eia.py +++ b/src/pudl/metadata/resources/eia.py @@ -432,7 +432,7 @@ "sources": ["eia861"], "etl_group": "static_eia_disabled", # currently not being loaded into the db "field_namespace": "eia", - "include_in_database": False, + "create_database_schema": False, }, "fuel_transportation_modes_eia": { "description": "Long descriptions of the fuel transportation modes reported in the EIA-860 and EIA-923.", @@ -603,7 +603,7 @@ "sources": ["eia860", "eia923"], "etl_group": "outputs", "field_namespace": "ppe", - "include_in_database": False, + "create_database_schema": False, }, "prime_movers_eia": { "description": "Long descriptions explaining the short prime mover codes reported in the EIA-860 and EIA-923.", diff --git a/src/pudl/metadata/resources/epacems.py b/src/pudl/metadata/resources/epacems.py index f442a9cc67..6057c0c59e 100644 --- a/src/pudl/metadata/resources/epacems.py +++ b/src/pudl/metadata/resources/epacems.py @@ -32,7 +32,7 @@ "sources": ["eia860", "epacems"], "field_namespace": "epacems", "etl_group": "epacems", - "include_in_database": False, + "create_database_schema": False, }, } """EPA CEMS resource attributes by PUDL identifier (``resource.name``). diff --git a/src/pudl/metadata/resources/ferc1.py b/src/pudl/metadata/resources/ferc1.py index 5407579fde..13c48e724f 100644 --- a/src/pudl/metadata/resources/ferc1.py +++ b/src/pudl/metadata/resources/ferc1.py @@ -794,7 +794,7 @@ "sources": ["ferc1"], "etl_group": "ferc1_disabled", "field_namespace": "ferc1", - "include_in_database": False, + "create_database_schema": False, }, "electric_operating_revenues_ferc1": { "description": ( @@ -823,7 +823,7 @@ "etl_group": "ferc1", "field_namespace": "ferc1", }, - "denorm_plants_utils_ferc1": { + "denorm_plants_utilities_ferc1": { "description": "Denormalized table that contains FERC plant and utility information.", "schema": { "fields": [ @@ -837,7 +837,7 @@ "field_namespace": "ferc1", "etl_group": "outputs", "sources": ["ferc1"], - "include_in_database": False, + "create_database_schema": False, }, } """FERC Form 1 resource attributes by PUDL identifier (``resource.name``). diff --git a/src/pudl/output/denorm_ferc1.py b/src/pudl/output/denorm_ferc1.py index 56be5a9167..c5c7e55430 100644 --- a/src/pudl/output/denorm_ferc1.py +++ b/src/pudl/output/denorm_ferc1.py @@ -1,6 +1,6 @@ """A collection of denormalized FERC assets.""" from pudl.output.sql.helpers import sql_asset_factory -denorm_plants_utils_ferc1_asset = sql_asset_factory( - "denorm_plants_utils_ferc1", {"plants_ferc1", "utilities_ferc1"} +denorm_plants_utilities_ferc1_asset = sql_asset_factory( + "denorm_plants_utilities_ferc1", {"plants_ferc1", "utilities_ferc1"} ) diff --git a/src/pudl/output/pudltabl.py b/src/pudl/output/pudltabl.py index 336ab19dbe..07c98656a3 100644 --- a/src/pudl/output/pudltabl.py +++ b/src/pudl/output/pudltabl.py @@ -151,9 +151,9 @@ def pu_ferc1(self) -> pd.DataFrame: Returns: A denormalized table for interactive use. """ - return pd.read_sql_table("denorm_plants_utils_ferc1", self.pudl_engine).pipe( - apply_pudl_dtypes, group="ferc1" - ) + return pd.read_sql_table( + "denorm_plants_utilities_ferc1", self.pudl_engine + ).pipe(apply_pudl_dtypes, group="ferc1") def advanced_metering_infrastructure_eia861(self) -> pd.DataFrame: """An interim EIA 861 output function.""" diff --git a/src/pudl/output/sql/denorm_plants_utils_ferc1.sql b/src/pudl/output/sql/denorm_plants_utils_ferc1.sql index c72d279c0b..f939e3202f 100644 --- a/src/pudl/output/sql/denorm_plants_utils_ferc1.sql +++ b/src/pudl/output/sql/denorm_plants_utils_ferc1.sql @@ -1,5 +1,5 @@ -- Build a view of useful FERC Plant & Utility information. -CREATE VIEW denorm_plants_utils_ferc1 AS +CREATE VIEW denorm_plants_utilities_ferc1 AS SELECT * FROM plants_ferc1 INNER JOIN utilities_ferc1 USING(utility_id_ferc1); diff --git a/test/unit/io_managers_test.py b/test/unit/io_managers_test.py index eae7f4c64f..e72bc34537 100644 --- a/test/unit/io_managers_test.py +++ b/test/unit/io_managers_test.py @@ -29,7 +29,7 @@ def test_pkg() -> Package: ] schema = {"fields": fields, "primary_key": ["artistid"]} view_resource = Resource( - name="artist_view", schema=schema, include_in_database=False + name="artist_view", schema=schema, create_database_schema=False ) fields = [ From 236404097cf21e3cfe43a072e92492d8d6c5218b Mon Sep 17 00:00:00 2001 From: bendnorman Date: Wed, 5 Apr 2023 19:06:49 -0800 Subject: [PATCH 10/10] Update denorm_plants_utilities_ferc1.sql file name --- ...m_plants_utils_ferc1.sql => denorm_plants_utilities_ferc1.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/pudl/output/sql/{denorm_plants_utils_ferc1.sql => denorm_plants_utilities_ferc1.sql} (100%) diff --git a/src/pudl/output/sql/denorm_plants_utils_ferc1.sql b/src/pudl/output/sql/denorm_plants_utilities_ferc1.sql similarity index 100% rename from src/pudl/output/sql/denorm_plants_utils_ferc1.sql rename to src/pudl/output/sql/denorm_plants_utilities_ferc1.sql