Make CEMS extraction handle new listed year_quarter partitions (#3187)
* WIP: add list partitions to _matches

* Fix csv file name

* update the 860m doi

* Fix docs build

* Update to non-broken CEMS archive

* add datastore to CI

* Switch debug logs to info

* Add dtypes on read-in

* Try to reduce memory usage when reading EPACEMS CSVs.

* Reduce record linkage test threshold to 80%

---------

Co-authored-by: Christina Gosnell <[email protected]>
Co-authored-by: Zane Selvans <[email protected]>
3 people authored Jan 8, 2024
1 parent eadf625 commit 0525092
Showing 5 changed files with 82 additions and 16 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pytest.yml
@@ -172,6 +172,7 @@ jobs:
- name: Run integration tests, trying to use GCS cache if possible
run: |
pip install --no-deps --editable .
pudl_datastore --dataset epacems --partition year_quarter=2022q1
make pytest-integration
- name: Upload coverage
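The new pudl_datastore step above pre-fetches a single EPA CEMS quarter into the datastore cache so the integration tests do not have to download it mid-run. The --partition argument is a key=value filter; as a rough illustration of how such an argument maps onto the filter dicts used by the datastore matching code further down (this parsing helper is hypothetical, not the actual CLI code):

def parse_partition_arg(arg: str) -> dict[str, str]:
    """Split a key=value partition argument into a single-entry filter dict."""
    key, _, value = arg.partition("=")
    # Filter values are compared lowercase by the datastore, so normalize here.
    return {key: value.lower()}

print(parse_partition_arg("year_quarter=2022q1"))  # {'year_quarter': '2022q1'}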
72 changes: 62 additions & 10 deletions src/pudl/extract/epacems.py
@@ -97,6 +97,44 @@
}
"""Set: The set of EPA CEMS columns to ignore when reading data."""

API_DTYPE_DICT = {
"State": pd.CategoricalDtype(),
"Facility Name": pd.StringDtype(), # Not reading from CSV
"Facility ID": pd.Int32Dtype(), # unique facility id for internal EPA database management (ORIS code)
"Unit ID": pd.StringDtype(),
"Associated Stacks": pd.StringDtype(),
# These op_date, op_hour, and op_time variables get converted to
# operating_date, operating_datetime and operating_time_interval in
# transform/epacems.py
"Date": pd.StringDtype(),
"Hour": pd.Int16Dtype(),
"Operating Time": pd.Float32Dtype(),
"Gross Load (MW)": pd.Float32Dtype(),
"Steam Load (1000 lb/hr)": pd.Float32Dtype(),
"SO2 Mass (lbs)": pd.Float32Dtype(),
"SO2 Mass Measure Indicator": pd.CategoricalDtype(),
"SO2 Rate (lbs/mmBtu)": pd.Float32Dtype(), # Not reading from CSV
"SO2 Rate Measure Indicator": pd.CategoricalDtype(), # Not reading from CSV
"NOx Rate (lbs/mmBtu)": pd.Float32Dtype(), # Not reading from CSV
"NOx Rate Measure Indicator": pd.CategoricalDtype(), # Not reading from CSV
"NOx Mass (lbs)": pd.Float32Dtype(),
"NOx Mass Measure Indicator": pd.CategoricalDtype(),
"CO2 Mass (short tons)": pd.Float32Dtype(),
"CO2 Mass Measure Indicator": pd.CategoricalDtype(),
"CO2 Rate (short tons/mmBtu)": pd.Float32Dtype(), # Not reading from CSV
"CO2 Rate Measure Indicator": pd.CategoricalDtype(), # Not reading from CSV
"Heat Input (mmBtu)": pd.Float32Dtype(),
"Heat Input Measure Indicator": pd.CategoricalDtype(),
"Primary Fuel Type": pd.CategoricalDtype(),
"Secondary Fuel Type": pd.CategoricalDtype(),
"Unit Type": pd.CategoricalDtype(),
"SO2 Controls": pd.CategoricalDtype(),
"NOx Controls": pd.CategoricalDtype(),
"PM Controls": pd.CategoricalDtype(),
"Hg Controls": pd.CategoricalDtype(),
"Program Code": pd.CategoricalDtype(),
}
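API_DTYPE_DICT declares compact pandas extension dtypes (nullable Float32/Int16 plus categoricals) up front instead of letting pandas infer float64 and object columns. A minimal standalone sketch, on synthetic data rather than real CEMS rows, of why that reduces memory:

import pandas as pd

# Default inference yields float64 and object (Python string) columns.
raw = pd.DataFrame(
    {
        "Gross Load (MW)": [100.5, 200.25, None] * 100_000,
        "SO2 Mass Measure Indicator": ["Measured", "Substitute", "Measured"] * 100_000,
    }
)
typed = raw.astype(
    {
        "Gross Load (MW)": pd.Float32Dtype(),
        "SO2 Mass Measure Indicator": pd.CategoricalDtype(),
    }
)
# The typed frame is substantially smaller; the effect is larger still on
# multi-million-row quarterly CEMS CSVs.
print(raw.memory_usage(deep=True).sum(), typed.memory_usage(deep=True).sum())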


class EpaCemsPartition(BaseModel):
"""Represents EpaCems partition identifying unique resource file."""
@@ -117,7 +155,7 @@ def get_filters(self):
def get_quarterly_file(self) -> Path:
"""Return the name of the CSV file that holds annual hourly data."""
return Path(
f"epacems-{self.year}-{pd.to_datetime(self.year_quarter).quarter}.csv"
f"epacems-{self.year}q{pd.to_datetime(self.year_quarter).quarter}.csv"
)
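The quarterly file name now uses a "q" separator so it matches the CSV members inside the updated CEMS archive. A quick sketch of the difference for year_quarter="2022q1":

import pandas as pd

year_quarter = "2022q1"
year = pd.to_datetime(year_quarter).year
quarter = pd.to_datetime(year_quarter).quarter
print(f"epacems-{year}-{quarter}.csv")  # old form: epacems-2022-1.csv
print(f"epacems-{year}q{quarter}.csv")  # new form: epacems-2022q1.csv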


@@ -138,30 +176,43 @@ def get_data_frame(self, partition: EpaCemsPartition) -> pd.DataFrame:
archive = self.datastore.get_zipfile_resource(
"epacems", **partition.get_filters()
)

with archive.open(str(partition.get_quarterly_file()), "r") as csv_file:
df = self._csv_to_dataframe(
csv_file, ignore_cols=API_IGNORE_COLS, rename_dict=API_RENAME_DICT
csv_file,
ignore_cols=API_IGNORE_COLS,
rename_dict=API_RENAME_DICT,
dtype_dict=API_DTYPE_DICT,
)
return df

def _csv_to_dataframe(
self, csv_file: Path, ignore_cols: dict[str, str], rename_dict: dict[str, str]
self,
csv_path: Path,
ignore_cols: dict[str, str],
rename_dict: dict[str, str],
dtype_dict: dict[str, type],
chunksize: int = 100_000,
) -> pd.DataFrame:
"""Convert a CEMS csv file into a :class:`pandas.DataFrame`.
Args:
csv (file-like object): data to be read
csv_path: Path to CSV file containing data to read.
Returns:
A DataFrame containing the contents of the CSV file.
A DataFrame containing the filtered and dtyped contents of the CSV file.
"""
return pd.read_csv(
csv_file,
chunk_iter = pd.read_csv(
csv_path,
index_col=False,
usecols=lambda col: col not in ignore_cols,
low_memory=False,
).rename(columns=rename_dict)
dtype=dtype_dict,
chunksize=chunksize,
low_memory=True,
parse_dates=["Date"],
)
df = pd.concat(chunk_iter)
dtypes = {k: v for k, v in dtype_dict.items() if k in df.columns}
return df.astype(dtypes).rename(columns=rename_dict)
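_csv_to_dataframe now reads the CSV in fixed-size chunks with explicit dtypes and concatenates the chunks at the end, which bounds peak memory during parsing. A standalone sketch of the same pattern on a tiny in-memory CSV (not PUDL code):

import io

import pandas as pd

csv_data = io.StringIO(
    "Date,Hour,Gross Load (MW)\n2022-01-01,0,100.5\n2022-01-01,1,101.0\n"
)
dtypes = {"Hour": pd.Int16Dtype(), "Gross Load (MW)": pd.Float32Dtype()}
chunk_iter = pd.read_csv(csv_data, dtype=dtypes, parse_dates=["Date"], chunksize=1)
df = pd.concat(chunk_iter)  # chunks share dtypes, so concatenation is cheap
print(df.dtypes)  # Date parses to datetime64[ns]; the rest keep declared dtypes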


def extract(year_quarter: str, ds: Datastore) -> pd.DataFrame:
@@ -178,6 +229,7 @@ def extract(year_quarter: str, ds: Datastore) -> pd.DataFrame:
year = partition.year
# We have to assign the reporting year for partitioning purposes
try:
logger.info(f"Extracting data frame for {year_quarter}")
df = ds.get_data_frame(partition).assign(year=year)
# If the requested quarter is not found, return an empty df with expected columns:
except KeyError:
18 changes: 14 additions & 4 deletions src/pudl/workspace/datastore.py
@@ -104,6 +104,13 @@ def _matches(self, res: dict, **filters: Any):
f"Resource filter values should be all lowercase: {k}={v}"
)
parts = res.get("parts", {})
# If partition values are lists, match when the list contains the desired element
if set(map(type, parts.values())) == {list}:
return all(
any(part.lower() == str(v).lower() for part in parts.get(k))
for k, v in filters.items()
)
# Otherwise return matches to int/str partitions
return all(
str(parts.get(k)).lower() == str(v).lower() for k, v in filters.items()
)
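The list branch in _matches is what lets one bundled archive advertise several year_quarter values. A standalone sketch of the matching behavior with synthetic parts and filters:

parts = {"year_quarter": ["2020q1", "2020q2", "2020q3", "2020q4"]}
filters = {"year_quarter": "2020q3"}

# A filter matches a list-valued partition if any element of the list matches.
matches = all(
    any(part.lower() == str(v).lower() for part in parts.get(k))
    for k, v in filters.items()
)
print(matches)  # True; a filter of "2021q1" would print False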
@@ -134,7 +141,10 @@ def get_partitions(self, name: str = None) -> dict[str, set[str]]:
if name and res["name"] != name:
continue
for k, v in res.get("parts", {}).items():
partitions[k].add(v)
if isinstance(v, list):
partitions[k] |= set(v) # Add all items from list
else:
partitions[k].add(v)
return partitions
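With list-valued parts, get_partitions unions each list into the partition set instead of adding the list itself (which is unhashable). A small standalone sketch on synthetic parts:

from collections import defaultdict

partitions = defaultdict(set)
parts = {"year_quarter": ["2020q1", "2020q2"], "year": 2020}
for k, v in parts.items():
    if isinstance(v, list):
        partitions[k] |= set(v)  # add every listed value
    else:
        partitions[k].add(v)
print(dict(partitions))  # e.g. {'year_quarter': {'2020q1', '2020q2'}, 'year': {2020}}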

def get_partition_filters(self, **filters: Any) -> Iterator[dict[str, str]]:
@@ -172,12 +182,12 @@ class ZenodoDoiSettings(BaseSettings):

censusdp1tract: ZenodoDoi = "10.5281/zenodo.4127049"
eia860: ZenodoDoi = "10.5281/zenodo.10067566"
eia860m: ZenodoDoi = "10.5281/zenodo.10204686"
eia860m: ZenodoDoi = "10.5281/zenodo.10423813"
eia861: ZenodoDoi = "10.5281/zenodo.10204708"
eia923: ZenodoDoi = "10.5281/zenodo.10067550"
eia_bulk_elec: ZenodoDoi = "10.5281/zenodo.7067367"
epacamd_eia: ZenodoDoi = "10.5281/zenodo.7900974"
epacems: ZenodoDoi = "10.5281/zenodo.10306114"
epacems: ZenodoDoi = "10.5281/zenodo.10425497"
ferc1: ZenodoDoi = "10.5281/zenodo.8326634"
ferc2: ZenodoDoi = "10.5281/zenodo.8326697"
ferc6: ZenodoDoi = "10.5281/zenodo.8326696"
@@ -366,8 +376,8 @@ def get_resources(
logger.info(f"{res} is already optimally cached.")
continue
if self._cache.contains(res):
logger.info(f"Retrieved {res} from cache.")
contents = self._cache.get(res)
logger.info(f"Retrieved {res} from cache.")
if not self._cache.is_optimally_cached(res):
logger.info(f"{res} was not optimally cached yet, adding.")
self._cache.add(res, contents)
5 changes: 4 additions & 1 deletion src/pudl/workspace/resource_cache.py
@@ -98,6 +98,7 @@ def _resource_path(self, resource: PudlResourceKey) -> Path:

def get(self, resource: PudlResourceKey) -> bytes:
"""Retrieves value associated with a given resource."""
logger.debug(f"Getting {resource} from local file cache.")
return self._resource_path(resource).open("rb").read()

def add(self, resource: PudlResourceKey, content: bytes):
@@ -151,6 +152,7 @@ def _blob(self, resource: PudlResourceKey) -> Blob:

def get(self, resource: PudlResourceKey) -> bytes:
"""Retrieves value associated with given resource."""
logger.debug(f"Getting {resource} from {self._blob.__name__}")
return self._blob(resource).download_as_bytes(retry=gcs_retry)

def add(self, resource: PudlResourceKey, value: bytes):
@@ -218,9 +220,10 @@ def add(self, resource: PudlResourceKey, value):
for cache_layer in self._caches:
if cache_layer.is_read_only():
continue
logger.debug(f"Adding {resource} to cache {cache_layer.__class__.__name__}")
cache_layer.add(resource, value)
logger.debug(
f"Add {resource} to cache layer {cache_layer.__class__.__name__})"
f"Added {resource} to cache layer {cache_layer.__class__.__name__})"
)
break
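The debug log added above records each write into a cache layer; the surrounding loop adds the resource to the first writable layer and then breaks, leaving read-only layers (such as a shared GCS cache) untouched. A toy sketch of that behavior using stand-in classes rather than the real resource_cache ones:

class Layer:
    def __init__(self, name: str, read_only: bool):
        self.name, self._read_only, self.store = name, read_only, {}

    def is_read_only(self) -> bool:
        return self._read_only

    def add(self, key: str, value: bytes) -> None:
        self.store[key] = value


def add_to_first_writable(layers: list[Layer], key: str, value: bytes) -> None:
    for layer in layers:
        if layer.is_read_only():
            continue
        layer.add(key, value)
        break  # only the first writable layer receives the resource


layers = [Layer("gcs", read_only=True), Layer("local", read_only=False)]
add_to_first_writable(layers, "epacems/2022q1", b"...")
print([(layer.name, list(layer.store)) for layer in layers])
# [('gcs', []), ('local', ['epacems/2022q1'])]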

2 changes: 1 addition & 1 deletion test/integration/record_linkage_test.py
@@ -237,4 +237,4 @@ def _link_ids(df: pd.DataFrame):
)
ratio_correct = correctly_matched / len(mock_ferc1_plants_df)
logger.info(f"Percent correctly matched: {ratio_correct:.2%}")
assert ratio_correct > 0.85, "Percent of correctly matched FERC records below 85%."
assert ratio_correct > 0.80, "Percent of correctly matched FERC records below 80%."
