Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
For more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] committed Dec 12, 2024
1 parent 2ae5dfb commit d5f5ffe
Showing 1 changed file with 46 additions and 41 deletions.
87 changes: 46 additions & 41 deletions src/pudl/transform/phmsagas.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,32 +183,35 @@ def core_phmsagas__yearly_distribution_operators(
df = df.drop_duplicates()

# Identify non-unique groups based on our PKs
non_unique_groups = df[df.groupby(["operator_id_phmsa", "report_number"])["report_number"].transform('size') > 1]
non_unique_groups = df[
df.groupby(["operator_id_phmsa", "report_number"])["report_number"].transform(
"size"
)
> 1
]

# Apply some custom filtering logic to non-unique groups
filtered_non_unique_rows = (
non_unique_groups
.groupby(["operator_id_phmsa", "report_number"], group_keys=False)
.apply(combined_filter)
)
filtered_non_unique_rows = non_unique_groups.groupby(
["operator_id_phmsa", "report_number"], group_keys=False
).apply(combined_filter)

# Combine filtered non-unique rows with untouched unique rows
unique_rows = df.drop(non_unique_groups.index)
df = pd.concat([unique_rows, filtered_non_unique_rows], ignore_index=True)

return df


def filter_if_test_in_address(group: pd.DataFrame) -> pd.DataFrame:
"""
Filters out rows with "test" in address columns. The logic is as follows:
1. For any group of rows with the same combination of "operator_id_phmsa"
"""Filters out rows with "test" in address columns. The logic is as follows:
1. For any group of rows with the same combination of "operator_id_phmsa"
and "report_number":
- If at least one row in the group does not contain the string "test"
(case-insensitive) in either "office_address_street" or
"headquarters_address_street", keep only the rows in the group
- If at least one row in the group does not contain the string "test"
(case-insensitive) in either "office_address_street" or
"headquarters_address_street", keep only the rows in the group
that do not contain "test" in these columns.
- If all rows in the group contain "test" in either of the columns,
- If all rows in the group contain "test" in either of the columns,
leave the group unchanged.
Args:
Expand All @@ -219,22 +222,21 @@ def filter_if_test_in_address(group: pd.DataFrame) -> pd.DataFrame:
"""
# Check if at least one row in the group does NOT contain "test" in either of the specified columns
contains_test = group.apply(
lambda row: "test" in str(row["office_address_street"]).lower() or
"test" in str(row["headquarters_address_street"]).lower(),
axis=1
lambda row: "test" in str(row["office_address_street"]).lower()
or "test" in str(row["headquarters_address_street"]).lower(),
axis=1,
)
has_non_test = not contains_test.all()

if has_non_test:
# Keep rows where "test" does NOT appear in either column
return group[~contains_test]
else:
# If all rows have "test", keep the group as is
return group
# If all rows have "test", keep the group as is
return group


def filter_by_city_in_name(group: pd.DataFrame) -> pd.DataFrame:
"""
Deduplication filter to only keep rows where "office_address_city" value
"""Deduplication filter to only keep rows where "office_address_city" value
is contained in the "operator_name_phmsa" value (case insensitive).
Args:
Expand All @@ -244,20 +246,25 @@ def filter_by_city_in_name(group: pd.DataFrame) -> pd.DataFrame:
pd.DataFrame: The filtered group of rows.
"""
# Check if any row has "office_address_city" contained in "operator_name_phmsa" (case insensitive)
city_in_name = group["office_address_city"].str.lower().apply(
lambda city: any(city in name.lower() for name in group["operator_name_phmsa"])
city_in_name = (
group["office_address_city"]
.str.lower()
.apply(
lambda city: any(
city in name.lower() for name in group["operator_name_phmsa"]
)
)
)

if city_in_name.any():
# If any city is contained in the operator name, keep only those rows
return group[city_in_name]
else:
# If no city is contained in the operator name, return the group as-is
return group
# If no city is contained in the operator name, return the group as-is
return group


def combined_filter(group: pd.DataFrame) -> pd.DataFrame:
"""
Apply all required filters to DataFrame.
"""Apply all required filters to DataFrame.
Args:
group (pd.DataFrame): A grouped subset of the DataFrame.
Expand All @@ -270,6 +277,7 @@ def combined_filter(group: pd.DataFrame) -> pd.DataFrame:
group = filter_if_test_in_address(group)
return group


@dataclass
class PhmsagasCheckSpec:
"""Define some simple checks that can run on PHMSA gas assets."""
Expand Down Expand Up @@ -318,17 +326,14 @@ def _check_percent_unaccounted_for_gas(df):
def _check_pk_deduplication(df):
"""Check if the size of filtered non-unique rows exceeds the threshold."""
# Identify non-unique groups
non_unique_groups = (
df.groupby(["operator_id_phmsa", "report_number"])
.filter(lambda group: len(group) > 1)
non_unique_groups = df.groupby(["operator_id_phmsa", "report_number"]).filter(
lambda group: len(group) > 1
)

# Apply the filters to non-unique groups
filtered_non_unique_rows = (
non_unique_groups
.groupby(["operator_id_phmsa", "report_number"], group_keys=False)
.apply(combined_filter)
)
filtered_non_unique_rows = non_unique_groups.groupby(
["operator_id_phmsa", "report_number"], group_keys=False
).apply(combined_filter)

if len(filtered_non_unique_rows) > spec.pk_deduplication_theshold:
error = (
Expand All @@ -337,7 +342,7 @@ def _check_pk_deduplication(df):
)
logger.info(error)
return AssetCheckResult(passed=False, metadata={"errors": error})

return AssetCheckResult(passed=True)

return [_check_percent_unaccounted_for_gas, _check_pk_deduplication]

0 comments on commit d5f5ffe

Please sign in to comment.