Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize Dagster processing of EPA CEMS #2472

Merged
merged 18 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions src/pudl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@ def parse_command_line(argv):
help="Set logging level (DEBUG, INFO, WARNING, ERROR, or CRITICAL).",
default="INFO",
)
parser.add_argument(
"--partition-epacems",
zaneselvans marked this conversation as resolved.
Show resolved Hide resolved
action="store_true",
default=False,
help="If set, output epacems year-state partitioned Parquet files",
)
arguments = parser.parse_args(argv[1:])
return arguments

Expand Down Expand Up @@ -158,13 +152,6 @@ def main():
},
},
},
"ops": {
"hourly_emissions_epacems": {
"config": {
"partition": args.partition_epacems,
}
}
},
},
)

Expand Down
13 changes: 0 additions & 13 deletions src/pudl/convert/epacems_to_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,6 @@ def parse_command_line(argv):
default=None,
help="If specified, write logs to this file.",
)
parser.add_argument(
"--partition",
action="store_true",
default=False,
help="If set, output year-state partitioned Parquet files",
)
arguments = parser.parse_args(argv[1:])
return arguments

Expand Down Expand Up @@ -154,13 +148,6 @@ def main():
},
},
},
"ops": {
"hourly_emissions_epacems": {
"config": {
"partition": args.partition,
}
}
},
},
)

Expand Down
145 changes: 109 additions & 36 deletions src/pudl/etl/epacems_assets.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
"""EPA CEMS Hourly Emissions assets.

The :func:`hourly_emissions_epacems` asset defined in this module uses a dagster
pattern that differs from other PUDL assets: a dynamic graph of ops wrapped by a
special asset type called a graph-backed asset, which creates an asset from a graph
of ops. The dynamic graph allows dagster to generate one op per year of EPA CEMS
data and execute those ops in parallel. For more information see:
https://docs.dagster.io/concepts/ops-jobs-graphs/dynamic-graphs and
https://docs.dagster.io/concepts/assets/graph-backed-assets.
"""
from collections import namedtuple
from pathlib import Path

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from dagster import Field, asset
from dagster import DynamicOut, DynamicOutput, Field, graph_asset, op

import pudl
from pudl.helpers import EnvVar
Expand All @@ -13,7 +23,25 @@
logger = pudl.logging_helpers.get_logger(__name__)


@asset(
# Result record for one processed year: the year itself plus the states whose
# per-state parquet partitions were written for it. Consumed downstream by
# consolidate_partitions to locate the partition files to merge.
YearPartitions = namedtuple("YearPartitions", ["year", "states"])


@op(
    out=DynamicOut(),
    required_resource_keys={"dataset_settings"},
)
def get_years_from_settings(context):
    """Yield a ``DynamicOutput`` for each EPA CEMS year in the dataset settings.

    These will be used to kick off worker processes to process each year of data in
    parallel.

    Args:
        context: dagster keyword that provides access to resources and config;
            the years are read from the ``dataset_settings`` resource.

    Yields:
        DynamicOutput: one per requested year, with the year as its mapping key so
        dagster can map a downstream op over the years and run them in parallel.
    """
    epacems_settings = context.resources.dataset_settings.epacems
    for year in epacems_settings.years:
        yield DynamicOutput(year, mapping_key=str(year))


@op(
required_resource_keys={"datastore", "dataset_settings"},
config_schema={
"pudl_output_path": Field(
Expand All @@ -23,24 +51,19 @@
description="Path of directory to store the database in.",
default_value=None,
),
"partition": Field(
bool,
description="Flag indicating whether the partitioned EPACEMS output should be created",
default_value=False,
),
},
)
def hourly_emissions_epacems(
context, epacamd_eia: pd.DataFrame, plants_entity_eia: pd.DataFrame
) -> None:
"""Extract, transform and load CSVs for EPA CEMS.

This asset loads the partitions to a single parquet file in the
function instead of using an IO Manager. Use the epacems_io_manager
IO Manager to read this asset for downstream dependencies.
def process_single_year(
zaneselvans marked this conversation as resolved.
Show resolved Hide resolved
context,
year,
epacamd_eia: pd.DataFrame,
plants_entity_eia: pd.DataFrame,
) -> YearPartitions:
"""Process a single year of EPA CEMS data.

Args:
context: dagster keyword that provides access to resources and config.
year: Year of data to process.
epacamd_eia: The EPA EIA crosswalk table used for harmonizing the
ORISPL code with EIA.
plants_entity_eia: The EIA Plant entities used for aligning timezones.
Expand All @@ -53,30 +76,80 @@ def hourly_emissions_epacems(
Path(context.op_config["pudl_output_path"]) / "hourly_emissions_epacems"
)
partitioned_path.mkdir(exist_ok=True)

for state in epacems_settings.states:
logger.info(f"Processing EPA CEMS hourly data for {year}-{state}")
df = pudl.extract.epacems.extract(year=year, state=state, ds=ds)
df = pudl.transform.epacems.transform(df, epacamd_eia, plants_entity_eia)
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)

# Write to a directory of partitioned parquet files
with pq.ParquetWriter(
where=partitioned_path / f"epacems-{year}-{state}.parquet",
schema=schema,
compression="snappy",
version="2.6",
) as partitioned_writer:
partitioned_writer.write_table(table)

return YearPartitions(year, epacems_settings.states)


@op(
    config_schema={
        "pudl_output_path": Field(
            EnvVar(
                env_var="PUDL_OUTPUT",
            ),
            description="Path of directory to store the database in.",
            default_value=None,
        ),
    },
)
def consolidate_partitions(context, partitions: list[YearPartitions]) -> None:
    """Read partitions into memory and write to a single monolithic output.

    The upstream per-year ops have already written one parquet file per
    (year, state) combination into the ``hourly_emissions_epacems`` directory;
    this op streams those files, one table at a time, into a single
    ``hourly_emissions_epacems.parquet`` file with the same schema.

    Args:
        context: dagster keyword that provides access to resources and config.
        partitions: Year and state combinations in the output database.
    """
    partitioned_path = (
        Path(context.op_config["pudl_output_path"]) / "hourly_emissions_epacems"
    )
    monolithic_path = (
        Path(context.op_config["pudl_output_path"]) / "hourly_emissions_epacems.parquet"
    )
    # Canonical table schema, so the monolithic file matches the partitions.
    schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow()

    with pq.ParquetWriter(
        where=monolithic_path, schema=schema, compression="snappy", version="2.6"
    ) as monolithic_writer:
        for year, states in partitions:
            for state in states:
                monolithic_writer.write_table(
                    pq.read_table(
                        source=partitioned_path / f"epacems-{year}-{state}.parquet",
                        schema=schema,
                    )
                )


@graph_asset
def hourly_emissions_epacems(
    epacamd_eia: pd.DataFrame, plants_entity_eia: pd.DataFrame
):
    """Extract, transform and load CSVs for EPA CEMS.

    This asset creates a dynamic graph of ops to process EPA CEMS data in parallel. It
    will create both a partitioned and single monolithic parquet output. For more
    information see: https://docs.dagster.io/concepts/ops-jobs-graphs/dynamic-graphs.

    Args:
        epacamd_eia: The EPA EIA crosswalk table, fanned out to each per-year op.
        plants_entity_eia: The EIA Plant entities used for aligning timezones.

    Returns:
        The output handle of the final ``consolidate_partitions`` op. A graph-backed
        asset must return an op output for dagster to materialize the asset, even
        though the op writes the parquet files directly and produces no value itself.
    """
    # Fan out: one dynamic output per year in the dataset settings.
    years = get_years_from_settings()
    # Map a worker op over each year; these run in parallel.
    partitions = years.map(
        lambda year: process_single_year(
            year,
            epacamd_eia,
            plants_entity_eia,
        )
    )
    # Fan in: merge all per-year partitions into the monolithic parquet file.
    # NOTE: the previous ``-> None`` annotation was incorrect — this function
    # must (and does) return the op's output.
    return consolidate_partitions(partitions.collect())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you say more about what the meaning of the return value is here? consolidate_partitions() doesn't return anything itself and I'm not immediately understanding the Dagster docs on what's going on here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically a graph_asset like this has to return the output from an op for dagster to be able to figure out how to create the asset. In most cases the op would actually be returning something that would then get passed to an io_manager. This case is kind of confusing though because we are directly writing to the parquet outputs inside the op, so there's nothing to return.

7 changes: 0 additions & 7 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,6 @@ def pudl_sql_io_manager(
"config": pudl_datastore_config,
},
},
"ops": {
"hourly_emissions_epacems": {
"config": {
"partition": True,
}
}
},
zaneselvans marked this conversation as resolved.
Show resolved Hide resolved
},
)
# Grab a connection to the freshly populated PUDL DB, and hand it off.
Expand Down