From 661ff1782256b962e8f99edf728f15445318cd53 Mon Sep 17 00:00:00 2001 From: zschira Date: Wed, 29 Mar 2023 16:43:51 -0400 Subject: [PATCH 01/15] Convert cems to use dagster partitions --- src/pudl/etl/__init__.py | 3 +- src/pudl/etl/epacems_assets.py | 57 ++++++++++++++++++---------------- src/pudl/resources.py | 26 ++++++++++++++++ 3 files changed, 58 insertions(+), 28 deletions(-) diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index dd2e4e9f4b..01b0e43a62 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -17,7 +17,7 @@ ferc1_xbrl_sqlite_io_manager, pudl_sqlite_io_manager, ) -from pudl.resources import dataset_settings, datastore, ferc_to_sqlite_settings +from pudl.resources import dataset_settings, datastore, ferc_to_sqlite_settings, pq_writer from pudl.settings import EtlSettings from . import ( # noqa: F401 @@ -49,6 +49,7 @@ default_resources = { "datastore": datastore, + "pq_writer": pq_writer, "pudl_sqlite_io_manager": pudl_sqlite_io_manager, "ferc1_dbf_sqlite_io_manager": ferc1_dbf_sqlite_io_manager, "ferc1_xbrl_sqlite_io_manager": ferc1_xbrl_sqlite_io_manager, diff --git a/src/pudl/etl/epacems_assets.py b/src/pudl/etl/epacems_assets.py index a3a8d93fb7..5b40a556cf 100644 --- a/src/pudl/etl/epacems_assets.py +++ b/src/pudl/etl/epacems_assets.py @@ -4,17 +4,18 @@ import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -from dagster import Field, asset +from dagster import Field, asset, MultiPartitionsDefinition, StaticPartitionsDefinition import pudl from pudl.helpers import EnvVar from pudl.metadata.classes import Resource +from pudl.settings import EpaCemsSettings logger = pudl.logging_helpers.get_logger(__name__) @asset( - required_resource_keys={"datastore", "dataset_settings"}, + required_resource_keys={"datastore", "dataset_settings", "pq_writer"}, config_schema={ "pudl_output_path": Field( EnvVar( @@ -29,6 +30,14 @@ default_value=False, ), }, + partitions_def=MultiPartitionsDefinition( + { + "years": StaticPartitionsDefinition( + [str(year) for year in EpaCemsSettings().years] + ), + "states": StaticPartitionsDefinition(EpaCemsSettings().states), + }, + ), ) def hourly_emissions_epacems( context, epacamd_eia: pd.DataFrame, plants_entity_eia: pd.DataFrame @@ -46,37 +55,31 @@ def hourly_emissions_epacems( plants_entity_eia: The EIA Plant entities used for aligning timezones. """ ds = context.resources.datastore - epacems_settings = context.resources.dataset_settings.epacems schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow() partitioned_path = ( Path(context.op_config["pudl_output_path"]) / "hourly_emissions_epacems" ) partitioned_path.mkdir(exist_ok=True) - monolithic_path = ( - Path(context.op_config["pudl_output_path"]) / "hourly_emissions_epacems.parquet" - ) + monolithic_writer = context.resources.pq_writer - with pq.ParquetWriter( - where=monolithic_path, schema=schema, compression="snappy", version="2.6" - ) as monolithic_writer: - for part in epacems_settings.partitions: - year = part["year"] - state = part["state"] - logger.info(f"Processing EPA CEMS hourly data for {year}-{state}") - df = pudl.extract.epacems.extract(year=year, state=state, ds=ds) - df = pudl.transform.epacems.transform(df, epacamd_eia, plants_entity_eia) - table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) + partitions = context.partition_key.keys_by_dimension.split("|") + year = int(partitions[0]) + state = partitions[1] + logger.info(f"Processing EPA CEMS hourly data for {year}-{state}") + df = pudl.extract.epacems.extract(year=year, state=state, ds=ds) + df = pudl.transform.epacems.transform(df, epacamd_eia, plants_entity_eia) + table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) - # Write to one monolithic parquet file - monolithic_writer.write_table(table) + # Write to one monolithic parquet file + monolithic_writer.write_table(table) - if context.op_config["partition"]: - # Write to a directory of partitioned parquet files - with pq.ParquetWriter( - where=partitioned_path / f"epacems-{year}-{state}.parquet", - schema=schema, - compression="snappy", - version="2.6", - ) as partitioned_writer: - partitioned_writer.write_table(table) + if context.op_config["partition"]: + # Write to a directory of partitioned parquet files + with pq.ParquetWriter( + where=partitioned_path / f"epacems-{year}-{state}.parquet", + schema=schema, + compression="snappy", + version="2.6", + ) as partitioned_writer: + partitioned_writer.write_table(table) diff --git a/src/pudl/resources.py b/src/pudl/resources.py index 9597287d72..1a2d6e106f 100644 --- a/src/pudl/resources.py +++ b/src/pudl/resources.py @@ -1,9 +1,13 @@ """Collection of Dagster resources for PUDL.""" from dagster import Field, resource +from pathlib import Path + +import pyarrow.parquet as pq from pudl.helpers import EnvVar from pudl.settings import DatasetsSettings, FercToSqliteSettings, create_dagster_config from pudl.workspace.datastore import Datastore +from pudl.metadata.classes import Resource @resource(config_schema=create_dagster_config(DatasetsSettings())) @@ -26,6 +30,28 @@ def ferc_to_sqlite_settings(init_context) -> FercToSqliteSettings: return FercToSqliteSettings(**init_context.resource_config) +@resource( + config_schema={ + "pudl_output_path": Field( + EnvVar( + env_var="PUDL_OUTPUT", + ), + description="Path of directory to store the database in.", + default_value=None, + ), + } +) +def pq_writer(init_context) -> pq.ParquetWriter: + """Get monolithic parquet writer.""" + monolithic_path = Path(init_context.resource_config["pudl_output_path"]) / "hourly_emissions_epacems.parquet" + schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow() + + with pq.ParquetWriter( + where=monolithic_path, schema=schema, compression="snappy", version="2.6" + ) as monolithic_writer: + yield monolithic_writer + + @resource( config_schema={ "local_cache_path": Field( From 6e4e63e17fc441fae35cfd2a198868696804a707 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 30 Mar 2023 16:15:45 +0000 Subject: [PATCH 02/15] [pre-commit.ci] auto fixes from pre-commit.com hooks For more information, see https://pre-commit.ci --- src/pudl/etl/__init__.py | 7 ++++++- src/pudl/etl/epacems_assets.py | 2 +- src/pudl/resources.py | 9 ++++++--- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index 01b0e43a62..905958fcc6 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -17,7 +17,12 @@ ferc1_xbrl_sqlite_io_manager, pudl_sqlite_io_manager, ) -from pudl.resources import dataset_settings, datastore, ferc_to_sqlite_settings, pq_writer +from pudl.resources import ( + dataset_settings, + datastore, + ferc_to_sqlite_settings, + pq_writer, +) from pudl.settings import EtlSettings from . import ( # noqa: F401 diff --git a/src/pudl/etl/epacems_assets.py b/src/pudl/etl/epacems_assets.py index 5b40a556cf..5b9557d079 100644 --- a/src/pudl/etl/epacems_assets.py +++ b/src/pudl/etl/epacems_assets.py @@ -4,7 +4,7 @@ import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -from dagster import Field, asset, MultiPartitionsDefinition, StaticPartitionsDefinition +from dagster import Field, MultiPartitionsDefinition, StaticPartitionsDefinition, asset import pudl from pudl.helpers import EnvVar diff --git a/src/pudl/resources.py b/src/pudl/resources.py index 1a2d6e106f..77f4e0b901 100644 --- a/src/pudl/resources.py +++ b/src/pudl/resources.py @@ -1,13 +1,13 @@ """Collection of Dagster resources for PUDL.""" -from dagster import Field, resource from pathlib import Path import pyarrow.parquet as pq +from dagster import Field, resource from pudl.helpers import EnvVar +from pudl.metadata.classes import Resource from pudl.settings import DatasetsSettings, FercToSqliteSettings, create_dagster_config from pudl.workspace.datastore import Datastore -from pudl.metadata.classes import Resource @resource(config_schema=create_dagster_config(DatasetsSettings())) @@ -43,7 +43,10 @@ def ferc_to_sqlite_settings(init_context) -> FercToSqliteSettings: ) def pq_writer(init_context) -> pq.ParquetWriter: """Get monolithic parquet writer.""" - monolithic_path = Path(init_context.resource_config["pudl_output_path"]) / "hourly_emissions_epacems.parquet" + monolithic_path = ( + Path(init_context.resource_config["pudl_output_path"]) + / "hourly_emissions_epacems.parquet" + ) schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow() with pq.ParquetWriter( From d80e7c614a9cde4d762b1b48b5a4c3d0ff50645d Mon Sep 17 00:00:00 2001 From: zschira Date: Thu, 30 Mar 2023 16:53:27 -0400 Subject: [PATCH 03/15] Create dynamic graph for epacems asset --- src/pudl/etl/epacems_assets.py | 95 ++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 46 deletions(-) diff --git a/src/pudl/etl/epacems_assets.py b/src/pudl/etl/epacems_assets.py index 5b9557d079..4f52f2ecac 100644 --- a/src/pudl/etl/epacems_assets.py +++ b/src/pudl/etl/epacems_assets.py @@ -3,18 +3,27 @@ import pandas as pd import pyarrow as pa -import pyarrow.parquet as pq -from dagster import Field, MultiPartitionsDefinition, StaticPartitionsDefinition, asset +from dagster import DynamicOut, DynamicOutput, Field, graph_asset, op import pudl from pudl.helpers import EnvVar from pudl.metadata.classes import Resource -from pudl.settings import EpaCemsSettings logger = pudl.logging_helpers.get_logger(__name__) -@asset( +@op( + out=DynamicOut(), + required_resource_keys={"dataset_settings"}, +) +def get_year(context): + """Return set of requested years.""" + epacems_settings = context.resources.dataset_settings.epacems + for year in epacems_settings.years: + yield DynamicOutput(year, mapping_key=str(year)) + + +@op( required_resource_keys={"datastore", "dataset_settings", "pq_writer"}, config_schema={ "pudl_output_path": Field( @@ -24,23 +33,38 @@ description="Path of directory to store the database in.", default_value=None, ), - "partition": Field( - bool, - description="Flag indicating whether the partitioned EPACEMS output should be created", - default_value=False, - ), }, - partitions_def=MultiPartitionsDefinition( - { - "years": StaticPartitionsDefinition( - [str(year) for year in EpaCemsSettings().years] - ), - "states": StaticPartitionsDefinition(EpaCemsSettings().states), - }, - ), ) +def process_single_year( + context, + year, + epacamd_eia: pd.DataFrame, + plants_entity_eia: pd.DataFrame, +): + """Process a single year of epacems data.""" + ds = context.resources.datastore + epacems_settings = context.resources.dataset_settings.epacems + + schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow() + partitioned_path = ( + Path(context.op_config["pudl_output_path"]) / "hourly_emissions_epacems" + ) + partitioned_path.mkdir(exist_ok=True) + monolithic_writer = context.resources.pq_writer + + for state in epacems_settings.states: + logger.info(f"Processing EPA CEMS hourly data for {year}-{state}") + df = pudl.extract.epacems.extract(year=year, state=state, ds=ds) + df = pudl.transform.epacems.transform(df, epacamd_eia, plants_entity_eia) + table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) + + # Write to one monolithic parquet file + monolithic_writer.write_table(table) + + +@graph_asset def hourly_emissions_epacems( - context, epacamd_eia: pd.DataFrame, plants_entity_eia: pd.DataFrame + epacamd_eia: pd.DataFrame, plants_entity_eia: pd.DataFrame ) -> None: """Extract, transform and load CSVs for EPA CEMS. @@ -54,32 +78,11 @@ def hourly_emissions_epacems( ORISPL code with EIA. plants_entity_eia: The EIA Plant entities used for aligning timezones. """ - ds = context.resources.datastore - - schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow() - partitioned_path = ( - Path(context.op_config["pudl_output_path"]) / "hourly_emissions_epacems" + years = get_year() + return years.map( + lambda year: process_single_year( + year, + epacamd_eia, + plants_entity_eia, + ) ) - partitioned_path.mkdir(exist_ok=True) - monolithic_writer = context.resources.pq_writer - - partitions = context.partition_key.keys_by_dimension.split("|") - year = int(partitions[0]) - state = partitions[1] - logger.info(f"Processing EPA CEMS hourly data for {year}-{state}") - df = pudl.extract.epacems.extract(year=year, state=state, ds=ds) - df = pudl.transform.epacems.transform(df, epacamd_eia, plants_entity_eia) - table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) - - # Write to one monolithic parquet file - monolithic_writer.write_table(table) - - if context.op_config["partition"]: - # Write to a directory of partitioned parquet files - with pq.ParquetWriter( - where=partitioned_path / f"epacems-{year}-{state}.parquet", - schema=schema, - compression="snappy", - version="2.6", - ) as partitioned_writer: - partitioned_writer.write_table(table) From 6eb0ee56ae65b5eb0fc40a0a3f7f9e1da7be2a58 Mon Sep 17 00:00:00 2001 From: zschira Date: Thu, 30 Mar 2023 17:12:06 -0400 Subject: [PATCH 04/15] Add partition option back to epacems asset --- src/pudl/etl/epacems_assets.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/pudl/etl/epacems_assets.py b/src/pudl/etl/epacems_assets.py index 4f52f2ecac..286478dd5e 100644 --- a/src/pudl/etl/epacems_assets.py +++ b/src/pudl/etl/epacems_assets.py @@ -3,6 +3,7 @@ import pandas as pd import pyarrow as pa +import pyarrow.parquet as pq from dagster import DynamicOut, DynamicOutput, Field, graph_asset, op import pudl @@ -33,6 +34,11 @@ def get_year(context): description="Path of directory to store the database in.", default_value=None, ), + "partition": Field( + bool, + description="Flag indicating whether the partitioned EPACEMS output should be created", + default_value=False, + ), }, ) def process_single_year( @@ -61,6 +67,16 @@ def process_single_year( # Write to one monolithic parquet file monolithic_writer.write_table(table) + if context.op_config["parition"]: + # Write to a directory of partitioned parquet files + with pq.ParquetWriter( + where=partitioned_path / f"epacems-{year}-{state}.parquet", + schema=schema, + compression="snappy", + version="2.6", + ) as partitioned_writer: + partitioned_writer.write_table(table) + @graph_asset def hourly_emissions_epacems( From adf2d92dc96b0299cbc154070502cd69d239afd4 Mon Sep 17 00:00:00 2001 From: zschira Date: Fri, 31 Mar 2023 10:56:41 -0400 Subject: [PATCH 05/15] Fix epacems integration tests --- src/pudl/etl/epacems_assets.py | 2 +- test/conftest.py | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/pudl/etl/epacems_assets.py b/src/pudl/etl/epacems_assets.py index 286478dd5e..e9516e936c 100644 --- a/src/pudl/etl/epacems_assets.py +++ b/src/pudl/etl/epacems_assets.py @@ -67,7 +67,7 @@ def process_single_year( # Write to one monolithic parquet file monolithic_writer.write_table(table) - if context.op_config["parition"]: + if context.op_config["partition"]: # Write to a directory of partitioned parquet files with pq.ParquetWriter( where=partitioned_path / f"epacems-{year}-{state}.parquet", diff --git a/test/conftest.py b/test/conftest.py index 0c36f8d5c8..deedeed2c0 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -312,8 +312,12 @@ def pudl_sql_io_manager( }, "ops": { "hourly_emissions_epacems": { - "config": { - "partition": True, + "ops": { + "process_single_year": { + "config": { + "partition": True, + } + } } } }, From 37609e205d77b79fea236c0448db62dcd6f3131b Mon Sep 17 00:00:00 2001 From: zschira Date: Fri, 31 Mar 2023 12:25:39 -0400 Subject: [PATCH 06/15] Add synchronization around parquet write_table --- src/pudl/resources.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/pudl/resources.py b/src/pudl/resources.py index 77f4e0b901..3f2bfd987d 100644 --- a/src/pudl/resources.py +++ b/src/pudl/resources.py @@ -1,4 +1,5 @@ """Collection of Dagster resources for PUDL.""" +from multiprocessing import Lock from pathlib import Path import pyarrow.parquet as pq @@ -30,6 +31,23 @@ def ferc_to_sqlite_settings(init_context) -> FercToSqliteSettings: return FercToSqliteSettings(**init_context.resource_config) +class ParquetWriter(pq.ParquetWriter): + """Subclass of ParquetWriter to provide synchronization around writes.""" + + def __init__(self, *args, **kwargs): + """Initialize base class and create lock.""" + super().__init__(*args, **kwargs) + self.lock = Lock() + + def write_table(self, *args, **kwargs): + """Acquire lock, then write table.""" + self.lock.acquire() + try: + super().write_table(*args, **kwargs) + finally: + self.lock.release() + + @resource( config_schema={ "pudl_output_path": Field( @@ -41,7 +59,7 @@ def ferc_to_sqlite_settings(init_context) -> FercToSqliteSettings: ), } ) -def pq_writer(init_context) -> pq.ParquetWriter: +def pq_writer(init_context) -> ParquetWriter: """Get monolithic parquet writer.""" monolithic_path = ( Path(init_context.resource_config["pudl_output_path"]) @@ -49,7 +67,7 @@ def pq_writer(init_context) -> pq.ParquetWriter: ) schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow() - with pq.ParquetWriter( + with ParquetWriter( where=monolithic_path, schema=schema, compression="snappy", version="2.6" ) as monolithic_writer: yield monolithic_writer From f90b3df4a77710462e909df834484363b3ae1bf3 Mon Sep 17 00:00:00 2001 From: zschira Date: Fri, 31 Mar 2023 16:45:46 -0400 Subject: [PATCH 07/15] Seperate epacems monolithic output into its own asset to avoid synchronization write problems --- src/pudl/etl/__init__.py | 16 +++++------- src/pudl/etl/epacems_assets.py | 44 ++++++++++++++++++++++++++----- src/pudl/resources.py | 47 ---------------------------------- test/unit/helpers_test.py | 4 ++- 4 files changed, 47 insertions(+), 64 deletions(-) diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index 905958fcc6..ea8e18e1a8 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -17,12 +17,7 @@ ferc1_xbrl_sqlite_io_manager, pudl_sqlite_io_manager, ) -from pudl.resources import ( - dataset_settings, - datastore, - ferc_to_sqlite_settings, - pq_writer, -) +from pudl.resources import dataset_settings, datastore, ferc_to_sqlite_settings from pudl.settings import EtlSettings from . import ( # noqa: F401 @@ -54,7 +49,6 @@ default_resources = { "datastore": datastore, - "pq_writer": pq_writer, "pudl_sqlite_io_manager": pudl_sqlite_io_manager, "ferc1_dbf_sqlite_io_manager": ferc1_dbf_sqlite_io_manager, "ferc1_xbrl_sqlite_io_manager": ferc1_xbrl_sqlite_io_manager, @@ -76,7 +70,9 @@ def create_non_cems_selection(all_assets: list[AssetsDefinition]) -> AssetSelect all_asset_keys = pudl.helpers.get_asset_keys(all_assets) all_selection = AssetSelection.keys(*all_asset_keys) - cems_selection = AssetSelection.keys(AssetKey("hourly_emissions_epacems")) + cems_selection = AssetSelection.keys( + AssetKey("hourly_emissions_epacems_monolithic") + ) return all_selection - cems_selection.downstream() @@ -104,7 +100,7 @@ def load_dataset_settings_from_file(setting_filename: str) -> dict: name="etl_full", description="This job executes all years of all assets." ), define_asset_job( - name="etl_full_no_cems", + name="etl_full_no_cems_monolithic", selection=create_non_cems_selection(default_assets), description="This job executes all years of all assets except the " "hourly_emissions_epacems asset and all assets downstream.", @@ -121,7 +117,7 @@ def load_dataset_settings_from_file(setting_filename: str) -> dict: description="This job executes the most recent year of each asset.", ), define_asset_job( - name="etl_fast_no_cems", + name="etl_fast_no_cems_monolithic", selection=create_non_cems_selection(default_assets), config={ "resources": { diff --git a/src/pudl/etl/epacems_assets.py b/src/pudl/etl/epacems_assets.py index e9516e936c..29cbf78348 100644 --- a/src/pudl/etl/epacems_assets.py +++ b/src/pudl/etl/epacems_assets.py @@ -1,10 +1,11 @@ """EPA CEMS Hourly Emissions assets.""" from pathlib import Path +import dask.dataframe as dd import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -from dagster import DynamicOut, DynamicOutput, Field, graph_asset, op +from dagster import DynamicOut, DynamicOutput, Field, asset, graph_asset, op import pudl from pudl.helpers import EnvVar @@ -25,7 +26,7 @@ def get_year(context): @op( - required_resource_keys={"datastore", "dataset_settings", "pq_writer"}, + required_resource_keys={"datastore", "dataset_settings"}, config_schema={ "pudl_output_path": Field( EnvVar( @@ -56,7 +57,6 @@ def process_single_year( Path(context.op_config["pudl_output_path"]) / "hourly_emissions_epacems" ) partitioned_path.mkdir(exist_ok=True) - monolithic_writer = context.resources.pq_writer for state in epacems_settings.states: logger.info(f"Processing EPA CEMS hourly data for {year}-{state}") @@ -64,9 +64,6 @@ def process_single_year( df = pudl.transform.epacems.transform(df, epacamd_eia, plants_entity_eia) table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) - # Write to one monolithic parquet file - monolithic_writer.write_table(table) - if context.op_config["partition"]: # Write to a directory of partitioned parquet files with pq.ParquetWriter( @@ -102,3 +99,38 @@ def hourly_emissions_epacems( plants_entity_eia, ) ) + + +@asset( + config_schema={ + "pudl_output_path": Field( + EnvVar( + env_var="PUDL_OUTPUT", + ), + description="Path of directory to store the database in.", + default_value=None, + ), + }, + io_manager_key="epacems_io_manager", + required_resource_keys={"dataset_settings"}, +) +def hourly_emissions_epacems_monolithic( + context, hourly_emissions_epacems: dd.DataFrame +): + """Read partitioned output and create monolithic version.""" + epacems_settings = context.resources.dataset_settings.epacems + + schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow() + monolithic_path = ( + Path(context.op_config["pudl_output_path"]) / "hourly_emissions_epacems.parquet" + ) + + with pq.ParquetWriter( + where=monolithic_path, schema=schema, compression="snappy", version="2.6" + ) as monolithic_writer: + for year in epacems_settings.years: + df = hourly_emissions_epacems[hourly_emissions_epacems == year].compute() + table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) + + # Write to one monolithic parquet file + monolithic_writer.write_table(table) diff --git a/src/pudl/resources.py b/src/pudl/resources.py index 3f2bfd987d..9597287d72 100644 --- a/src/pudl/resources.py +++ b/src/pudl/resources.py @@ -1,12 +1,7 @@ """Collection of Dagster resources for PUDL.""" -from multiprocessing import Lock -from pathlib import Path - -import pyarrow.parquet as pq from dagster import Field, resource from pudl.helpers import EnvVar -from pudl.metadata.classes import Resource from pudl.settings import DatasetsSettings, FercToSqliteSettings, create_dagster_config from pudl.workspace.datastore import Datastore @@ -31,48 +26,6 @@ def ferc_to_sqlite_settings(init_context) -> FercToSqliteSettings: return FercToSqliteSettings(**init_context.resource_config) -class ParquetWriter(pq.ParquetWriter): - """Subclass of ParquetWriter to provide synchronization around writes.""" - - def __init__(self, *args, **kwargs): - """Initialize base class and create lock.""" - super().__init__(*args, **kwargs) - self.lock = Lock() - - def write_table(self, *args, **kwargs): - """Acquire lock, then write table.""" - self.lock.acquire() - try: - super().write_table(*args, **kwargs) - finally: - self.lock.release() - - -@resource( - config_schema={ - "pudl_output_path": Field( - EnvVar( - env_var="PUDL_OUTPUT", - ), - description="Path of directory to store the database in.", - default_value=None, - ), - } -) -def pq_writer(init_context) -> ParquetWriter: - """Get monolithic parquet writer.""" - monolithic_path = ( - Path(init_context.resource_config["pudl_output_path"]) - / "hourly_emissions_epacems.parquet" - ) - schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow() - - with ParquetWriter( - where=monolithic_path, schema=schema, compression="snappy", version="2.6" - ) as monolithic_writer: - yield monolithic_writer - - @resource( config_schema={ "local_cache_path": Field( diff --git a/test/unit/helpers_test.py b/test/unit/helpers_test.py index a48f87f526..471a9fa3c3 100644 --- a/test/unit/helpers_test.py +++ b/test/unit/helpers_test.py @@ -623,7 +623,9 @@ def test_flatten_mix_types(): def test_cems_selection(): """Test CEMS asset selection remove cems assets.""" cems_selection = pudl.etl.create_non_cems_selection(pudl.etl.default_assets) - assert AssetKey("hourly_emissions_epacems") not in cems_selection.resolve( + assert AssetKey( + "hourly_emissions_epacems_monolithic" + ) not in cems_selection.resolve( pudl.etl.default_assets ), "hourly_emissions_epacems or downstream asset present in selection." From 2d9e926b2d3e0ce29e0c682893c4105baafb5659 Mon Sep 17 00:00:00 2001 From: zschira Date: Mon, 3 Apr 2023 15:55:44 -0400 Subject: [PATCH 08/15] Create monolithic epacems parquet output from reading partitioned outputs in and writing to single file --- src/pudl/etl/__init__.py | 8 +-- src/pudl/etl/epacems_assets.py | 128 ++++++++++++++++++--------------- 2 files changed, 72 insertions(+), 64 deletions(-) diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index ea8e18e1a8..dd2e4e9f4b 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -70,9 +70,7 @@ def create_non_cems_selection(all_assets: list[AssetsDefinition]) -> AssetSelect all_asset_keys = pudl.helpers.get_asset_keys(all_assets) all_selection = AssetSelection.keys(*all_asset_keys) - cems_selection = AssetSelection.keys( - AssetKey("hourly_emissions_epacems_monolithic") - ) + cems_selection = AssetSelection.keys(AssetKey("hourly_emissions_epacems")) return all_selection - cems_selection.downstream() @@ -100,7 +98,7 @@ def load_dataset_settings_from_file(setting_filename: str) -> dict: name="etl_full", description="This job executes all years of all assets." ), define_asset_job( - name="etl_full_no_cems_monolithic", + name="etl_full_no_cems", selection=create_non_cems_selection(default_assets), description="This job executes all years of all assets except the " "hourly_emissions_epacems asset and all assets downstream.", @@ -117,7 +115,7 @@ def load_dataset_settings_from_file(setting_filename: str) -> dict: description="This job executes the most recent year of each asset.", ), define_asset_job( - name="etl_fast_no_cems_monolithic", + name="etl_fast_no_cems", selection=create_non_cems_selection(default_assets), config={ "resources": { diff --git a/src/pudl/etl/epacems_assets.py b/src/pudl/etl/epacems_assets.py index 29cbf78348..58b7af53c8 100644 --- a/src/pudl/etl/epacems_assets.py +++ b/src/pudl/etl/epacems_assets.py @@ -1,11 +1,11 @@ """EPA CEMS Hourly Emissions assets.""" +from collections import namedtuple from pathlib import Path -import dask.dataframe as dd import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -from dagster import DynamicOut, DynamicOutput, Field, asset, graph_asset, op +from dagster import DynamicOut, DynamicOutput, Field, graph_asset, op import pudl from pudl.helpers import EnvVar @@ -14,12 +14,19 @@ logger = pudl.logging_helpers.get_logger(__name__) +YearPartitions = namedtuple("YearPartitions", ["year", "states"]) + + @op( out=DynamicOut(), required_resource_keys={"dataset_settings"}, ) -def get_year(context): - """Return set of requested years.""" +def get_years_from_settings(context): + """Return set of years in settings. + + These will be used to kick off worker processes to process each year of data in + parallel. + """ epacems_settings = context.resources.dataset_settings.epacems for year in epacems_settings.years: yield DynamicOutput(year, mapping_key=str(year)) @@ -35,11 +42,6 @@ def get_year(context): description="Path of directory to store the database in.", default_value=None, ), - "partition": Field( - bool, - description="Flag indicating whether the partitioned EPACEMS output should be created", - default_value=False, - ), }, ) def process_single_year( @@ -47,8 +49,16 @@ def process_single_year( year, epacamd_eia: pd.DataFrame, plants_entity_eia: pd.DataFrame, -): - """Process a single year of epacems data.""" +) -> YearPartitions: + """Process a single year of EPA CEMS data. + + Args: + context: dagster keyword that provides access to resources and config. + year: Year of data to process. + epacamd_eia: The EPA EIA crosswalk table used for harmonizing the + ORISPL code with EIA. + plants_entity_eia: The EIA Plant entities used for aligning timezones. + """ ds = context.resources.datastore epacems_settings = context.resources.dataset_settings.epacems @@ -64,44 +74,19 @@ def process_single_year( df = pudl.transform.epacems.transform(df, epacamd_eia, plants_entity_eia) table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) - if context.op_config["partition"]: - # Write to a directory of partitioned parquet files - with pq.ParquetWriter( - where=partitioned_path / f"epacems-{year}-{state}.parquet", - schema=schema, - compression="snappy", - version="2.6", - ) as partitioned_writer: - partitioned_writer.write_table(table) + # Write to a directory of partitioned parquet files + with pq.ParquetWriter( + where=partitioned_path / f"epacems-{year}-{state}.parquet", + schema=schema, + compression="snappy", + version="2.6", + ) as partitioned_writer: + partitioned_writer.write_table(table) + return YearPartitions(year, epacems_settings.states) -@graph_asset -def hourly_emissions_epacems( - epacamd_eia: pd.DataFrame, plants_entity_eia: pd.DataFrame -) -> None: - """Extract, transform and load CSVs for EPA CEMS. - This asset loads the partitions to a single parquet file in the - function instead of using an IO Manager. Use the epacems_io_manager - IO Manager to read this asset for downstream dependencies. - - Args: - context: dagster keyword that provides access to resources and config. - epacamd_eia: The EPA EIA crosswalk table used for harmonizing the - ORISPL code with EIA. - plants_entity_eia: The EIA Plant entities used for aligning timezones. - """ - years = get_year() - return years.map( - lambda year: process_single_year( - year, - epacamd_eia, - plants_entity_eia, - ) - ) - - -@asset( +@op( config_schema={ "pudl_output_path": Field( EnvVar( @@ -111,26 +96,51 @@ def hourly_emissions_epacems( default_value=None, ), }, - io_manager_key="epacems_io_manager", - required_resource_keys={"dataset_settings"}, ) -def hourly_emissions_epacems_monolithic( - context, hourly_emissions_epacems: dd.DataFrame -): - """Read partitioned output and create monolithic version.""" - epacems_settings = context.resources.dataset_settings.epacems +def consolidate_partitions(context, partitions: list[YearPartitions]) -> None: + """Read partitions into memory and write to a single monolithic output. - schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow() + Args: + context: dagster keyword that provides access to resources and config. + partitions: Year and state combinations in the output database. + """ + partitioned_path = ( + Path(context.op_config["pudl_output_path"]) / "hourly_emissions_epacems" + ) monolithic_path = ( Path(context.op_config["pudl_output_path"]) / "hourly_emissions_epacems.parquet" ) + schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow() with pq.ParquetWriter( where=monolithic_path, schema=schema, compression="snappy", version="2.6" ) as monolithic_writer: - for year in epacems_settings.years: - df = hourly_emissions_epacems[hourly_emissions_epacems == year].compute() - table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) + for year, states in partitions: + for state in states: + monolithic_writer.write_table( + pq.read_table( + source=partitioned_path / f"epacems-{year}-{state}.parquet", + schema=schema, + ) + ) - # Write to one monolithic parquet file - monolithic_writer.write_table(table) + +@graph_asset +def hourly_emissions_epacems( + epacamd_eia: pd.DataFrame, plants_entity_eia: pd.DataFrame +) -> None: + """Extract, transform and load CSVs for EPA CEMS. + + This asset creates a dynamic graph of ops to process EPA CEMS data in parallel. It + will create both a partitioned and single monolithic parquet output. For more + information see: https://docs.dagster.io/concepts/ops-jobs-graphs/dynamic-graphs. + """ + years = get_years_from_settings() + partitions = years.map( + lambda year: process_single_year( + year, + epacamd_eia, + plants_entity_eia, + ) + ) + return consolidate_partitions(partitions.collect()) From a30f031a581a2090c476906d62491dc30f5f961b Mon Sep 17 00:00:00 2001 From: zschira Date: Mon, 3 Apr 2023 17:09:23 -0400 Subject: [PATCH 09/15] Fix epacems unit test --- test/unit/helpers_test.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/unit/helpers_test.py b/test/unit/helpers_test.py index 471a9fa3c3..a48f87f526 100644 --- a/test/unit/helpers_test.py +++ b/test/unit/helpers_test.py @@ -623,9 +623,7 @@ def test_flatten_mix_types(): def test_cems_selection(): """Test CEMS asset selection remove cems assets.""" cems_selection = pudl.etl.create_non_cems_selection(pudl.etl.default_assets) - assert AssetKey( - "hourly_emissions_epacems_monolithic" - ) not in cems_selection.resolve( + assert AssetKey("hourly_emissions_epacems") not in cems_selection.resolve( pudl.etl.default_assets ), "hourly_emissions_epacems or downstream asset present in selection." From 887ee7401276a62003422e504b539698033c7296 Mon Sep 17 00:00:00 2001 From: zschira Date: Mon, 3 Apr 2023 17:16:45 -0400 Subject: [PATCH 10/15] Remove partition options from cli commands --- src/pudl/cli.py | 13 ------------- src/pudl/convert/epacems_to_parquet.py | 13 ------------- 2 files changed, 26 deletions(-) diff --git a/src/pudl/cli.py b/src/pudl/cli.py index 11a68d5b90..312e2c1647 100644 --- a/src/pudl/cli.py +++ b/src/pudl/cli.py @@ -64,12 +64,6 @@ def parse_command_line(argv): help="Set logging level (DEBUG, INFO, WARNING, ERROR, or CRITICAL).", default="INFO", ) - parser.add_argument( - "--partition-epacems", - action="store_true", - default=False, - help="If set, output epacems year-state partitioned Parquet files", - ) arguments = parser.parse_args(argv[1:]) return arguments @@ -158,13 +152,6 @@ def main(): }, }, }, - "ops": { - "hourly_emissions_epacems": { - "config": { - "partition": args.partition_epacems, - } - } - }, }, ) diff --git a/src/pudl/convert/epacems_to_parquet.py b/src/pudl/convert/epacems_to_parquet.py index 3b13d4fdcd..d6511196e8 100755 --- a/src/pudl/convert/epacems_to_parquet.py +++ b/src/pudl/convert/epacems_to_parquet.py @@ -89,12 +89,6 @@ def parse_command_line(argv): default=None, help="If specified, write logs to this file.", ) - parser.add_argument( - "--partition", - action="store_true", - default=False, - help="If set, output year-state partitioned Parquet files", - ) arguments = parser.parse_args(argv[1:]) return arguments @@ -154,13 +148,6 @@ def main(): }, }, }, - "ops": { - "hourly_emissions_epacems": { - "config": { - "partition": args.partition, - } - } - }, }, ) From 150e303c9ba96de5e55b805c2b237cde86b8634f Mon Sep 17 00:00:00 2001 From: Zane Selvans Date: Mon, 3 Apr 2023 16:44:54 -0600 Subject: [PATCH 11/15] Remove EPA CEMS partitioning config from tests --- test/conftest.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/test/conftest.py b/test/conftest.py index deedeed2c0..b13f1d120a 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -310,17 +310,6 @@ def pudl_sql_io_manager( "config": pudl_datastore_config, }, }, - "ops": { - "hourly_emissions_epacems": { - "ops": { - "process_single_year": { - "config": { - "partition": True, - } - } - } - } - }, }, ) # Grab a connection to the freshly populated PUDL DB, and hand it off. From 75bac250a5e2304c17b78918f4dfcef5e1e10b0b Mon Sep 17 00:00:00 2001 From: zschira Date: Tue, 4 Apr 2023 12:49:54 -0400 Subject: [PATCH 12/15] Add detailed module description to epacems_assets.py --- src/pudl/etl/epacems_assets.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/pudl/etl/epacems_assets.py b/src/pudl/etl/epacems_assets.py index 58b7af53c8..cd511439fe 100644 --- a/src/pudl/etl/epacems_assets.py +++ b/src/pudl/etl/epacems_assets.py @@ -1,4 +1,12 @@ -"""EPA CEMS Hourly Emissions assets.""" +"""EPA CEMS Hourly Emissions assets. + +The :func:`hourly_emissions_epacems` asset defined in this module uses a dagster pattern +that is unique from other PUDL assets. Specifically, it is creating a `dynamic graph +`__ of ops wrapped by a +`graph backed `__ asset. +The dynamic graph will allow dagster to dynamically generate an op for processing each +year of EPA CEMS data and execute these ops in parallel. +""" from collections import namedtuple from pathlib import Path From 68b5116ad79160af91b4594ef6eeb65119d5b5f3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 4 Apr 2023 16:57:50 +0000 Subject: [PATCH 13/15] [pre-commit.ci] auto fixes from pre-commit.com hooks For more information, see https://pre-commit.ci --- src/pudl/etl/epacems_assets.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pudl/etl/epacems_assets.py b/src/pudl/etl/epacems_assets.py index cd511439fe..99720006d7 100644 --- a/src/pudl/etl/epacems_assets.py +++ b/src/pudl/etl/epacems_assets.py @@ -2,8 +2,8 @@ The :func:`hourly_emissions_epacems` asset defined in this module uses a dagster pattern that is unique from other PUDL assets. Specifically, it is creating a `dynamic graph -`__ of ops wrapped by a -`graph backed `__ asset. + +`__ of ops wrapped by a`graph backed `__ asset. The dynamic graph will allow dagster to dynamically generate an op for processing each year of EPA CEMS data and execute these ops in parallel. """ From 61ce3c184b021b98240c5b495182096dd1b2214c Mon Sep 17 00:00:00 2001 From: zschira Date: Tue, 4 Apr 2023 13:21:36 -0400 Subject: [PATCH 14/15] Reformat epacems_assets docstring --- src/pudl/etl/epacems_assets.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/pudl/etl/epacems_assets.py b/src/pudl/etl/epacems_assets.py index 99720006d7..1efaf8029a 100644 --- a/src/pudl/etl/epacems_assets.py +++ b/src/pudl/etl/epacems_assets.py @@ -1,11 +1,10 @@ """EPA CEMS Hourly Emissions assets. The :func:`hourly_emissions_epacems` asset defined in this module uses a dagster pattern -that is unique from other PUDL assets. Specifically, it is creating a `dynamic graph - -`__ of ops wrapped by a`graph backed `__ asset. -The dynamic graph will allow dagster to dynamically generate an op for processing each -year of EPA CEMS data and execute these ops in parallel. +that is unique from other PUDL assets. Specifically, it is creating a dynamic graph of +ops wrapped by a `graph backed +`__ asset. The dynamic graph will allow dagster to dynamically generate an op for +processing each year of EPA CEMS data and execute these ops in parallel. """ from collections import namedtuple from pathlib import Path From 25138dab48aa3ae7372f18b2bf4965ef274edf62 Mon Sep 17 00:00:00 2001 From: zschira Date: Tue, 4 Apr 2023 13:34:36 -0400 Subject: [PATCH 15/15] Reformat links in epacems_assets docstring --- src/pudl/etl/epacems_assets.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/pudl/etl/epacems_assets.py b/src/pudl/etl/epacems_assets.py index 1efaf8029a..85967acc94 100644 --- a/src/pudl/etl/epacems_assets.py +++ b/src/pudl/etl/epacems_assets.py @@ -1,10 +1,12 @@ """EPA CEMS Hourly Emissions assets. The :func:`hourly_emissions_epacems` asset defined in this module uses a dagster pattern -that is unique from other PUDL assets. Specifically, it is creating a dynamic graph of -ops wrapped by a `graph backed -`__ asset. The dynamic graph will allow dagster to dynamically generate an op for -processing each year of EPA CEMS data and execute these ops in parallel. +that is unique from other PUDL assets. The underlying architecture uses ops to create a +dynamic graph +which is wrapped by a special asset called a graph backed asset that creates an asset +from a graph of ops. The dynamic graph will allow dagster to dynamically generate an op +for processing each year of EPA CEMS data and execute these ops in parallel. For more information +see: https://docs.dagster.io/concepts/ops-jobs-graphs/dynamic-graphs and https://docs.dagster.io/concepts/assets/graph-backed-assets. """ from collections import namedtuple from pathlib import Path