Skip to content

Commit

Permalink
Improve sec10k schema definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
zschira committed Jan 29, 2025
1 parent 31faab3 commit 26140ec
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 110 deletions.
1 change: 1 addition & 0 deletions src/pudl/analysis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
fuel_by_plant,
mcoe,
plant_parts_eia,
pudl_models,
record_linkage,
service_territory,
spatial,
Expand Down
114 changes: 86 additions & 28 deletions src/pudl/analysis/pudl_models.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,101 @@
"""Implement utilities for working with data produced in the pudl modelling repo."""

import os

import pandas as pd
from dagster import AssetsDefinition, asset
from dagster import asset


def _load_table_from_gcs(table_name: str) -> pd.DataFrame:
    """Read one sec10k model-output table into a dataframe.

    Args:
        table_name: Name of the table; it is appended to the
            ``gs://model-outputs.catalyst.coop/sec10k/`` prefix to form the
            location handed to :func:`pandas.read_parquet`.

    Returns:
        The table contents exactly as read from parquet.
    """
    table_uri = f"gs://model-outputs.catalyst.coop/sec10k/{table_name}"
    return pd.read_parquet(table_uri)

def _get_model_tables() -> list[str]:
    """List the PUDL model tables, or nothing when the feature flag is off.

    Returns:
        The four sec10k table names when the ``USE_PUDL_MODELS`` environment
        variable is set to a non-empty value, otherwise an empty list.
    """
    # An unset or empty variable both count as "disabled".
    if not os.getenv("USE_PUDL_MODELS"):
        return []

    return [
        "core_sec10k__company_information",
        "core_sec10k__exhibit_21_company_ownership",
        "core_sec10k__filings",
        "out_sec10k__parents_and_subsidiaries",
    ]
def _compute_fraction_owned(percent_ownership: pd.Series) -> pd.Series:
    """Clean percent ownership, convert to float, then convert percent to ratio.

    Cleaning steps, applied in order:

    1. Runs of two or more periods are collapsed into a single period
       (e.g. ``"12..5"`` becomes ``"12.5"``).
    2. Backslash characters are removed.
    3. A value that is exactly ``"."`` is treated as zero.

    Args:
        percent_ownership: Series of string-valued ownership percentages.

    Returns:
        Float series holding ``percent_ownership / 100``.

    Raises:
        ValueError: If a cleaned value still cannot be parsed as a float.
    """
    cleaned = (
        # Substitute a plain "." directly. The replacement string is not a
        # regex, so escaping the period there would insert a literal backslash.
        percent_ownership.str.replace(r"\.{2,}", ".", regex=True)
        # Kept for backward compatibility: backslashes in the input have
        # always been stripped before parsing.
        .replace(r"\\", "", regex=True)
        # Whole-value match only: a lone "." means zero.
        .replace(".", "0.0", regex=False)
    )
    return cleaned.astype("float") / 100.0


def _get_table_uri(table_name: str) -> str:
    """Build the ``gs://`` location of a sec10k model-output table.

    Args:
        table_name: Name of the table, used as the final path component.

    Returns:
        ``table_name`` appended to ``gs://model-outputs.catalyst.coop/sec10k/``.
    """
    table_uri = f"gs://model-outputs.catalyst.coop/sec10k/{table_name}"
    return table_uri
@asset(
    io_manager_key="parquet_io_manager",
    group_name="pudl_models",
)
def core_sec10k__company_information() -> pd.DataFrame:
    """Basic company information extracted from SEC10k filings.

    Loads the ``core_sec10k__company_information`` table and renames the
    columns listed below; all other columns are passed through unchanged.
    """
    # Source column name -> name used in the returned dataframe.
    column_renames = {
        "sec10k_filename": "filename_sec10k",
        "block": "company_information_block",
        "block_count": "company_information_block_count",
        "key": "company_information_fact_name",
        "value": "company_information_fact_value",
    }
    company_info = _load_table_from_gcs("core_sec10k__company_information")
    return company_info.rename(columns=column_renames)

def pudl_models_asset_factory(table_name: str) -> AssetsDefinition:
"""Factory function to create assets which will load pudl models tables."""

@asset(
name=table_name,
io_manager_key="parquet_io_manager",
group_name="pudl_models",
@asset(
io_manager_key="parquet_io_manager",
group_name="pudl_models",
)
def core_sec10k__exhibit_21_company_ownership() -> pd.DataFrame:
"""Company ownership information extracted from sec10k exhibit 21 attachments."""
df = _load_table_from_gcs("core_sec10k__exhibit_21_company_ownership")
df = df.rename(
columns={
"sec10k_filename": "filename_sec10k",
"subsidiary": "subsidiary_company_name",
"location": "subsidiary_location",
}
)
def _asset() -> pd.DataFrame:
return pd.read_parquet(_get_table_uri(table_name))

return _asset
# Convert ownership percentage
df["fraction_owned"] = _compute_fraction_owned(df.ownership_percentage)

return df


@asset(
    io_manager_key="parquet_io_manager",
    group_name="pudl_models",
)
def core_sec10k__filings() -> pd.DataFrame:
    """Metadata on all 10k filings submitted to SEC.

    Loads the ``core_sec10k__filings`` table and renames the columns listed
    below; all other columns are passed through unchanged.
    """
    # Source column name -> name used in the returned dataframe.
    column_renames = {
        "sec10k_filename": "filename_sec10k",
        "form_type": "sec10k_version",
    }
    filings = _load_table_from_gcs("core_sec10k__filings")
    return filings.rename(columns=column_renames)


@asset(
io_manager_key="parquet_io_manager",
group_name="pudl_models",
)
def out_sec10k__parents_and_subsidiaries() -> pd.DataFrame:
"""Denormalized output table with sec10k info and company ownership linked to EIA."""
df = _load_table_from_gcs("out_sec10k__parents_and_subsidiaries")
df = df.rename(
columns={
"sec10k_filename": "filename_sec10k",
"sec_company_id": "company_id_sec",
"street_address_2": "address_2",
"former_conformed_name": "company_name_former",
"location_of_inc": "location_of_incorporation",
"irs_number": "company_id_irs",
"parent_company_cik": "parent_company_central_index_key",
}
)

# Convert ownership percentage
df["fraction_owned"] = _compute_fraction_owned(df.ownership_percentage)

def get_pudl_models_assets() -> list[AssetsDefinition]:
"""Generate a collection of assets for all PUDL model tables."""
return [pudl_models_asset_factory(table) for table in _get_model_tables()]
return df
18 changes: 7 additions & 11 deletions src/pudl/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition

import pudl
from pudl.analysis.pudl_models import get_pudl_models_assets
from pudl.io_managers import (
epacems_io_manager,
ferc1_dbf_sqlite_io_manager,
Expand Down Expand Up @@ -108,18 +107,15 @@
}

all_asset_modules = raw_module_groups | core_module_groups | out_module_groups
default_assets = (
list(
itertools.chain.from_iterable(
load_assets_from_modules(
modules,
group_name=group_name,
)
for group_name, modules in all_asset_modules.items()
default_assets = list(
itertools.chain.from_iterable(
load_assets_from_modules(
modules,
group_name=group_name,
)
for group_name, modules in all_asset_modules.items()
)
+ get_pudl_models_assets()
)
) + load_assets_from_modules([pudl.analysis.pudl_models])

default_asset_checks = list(
itertools.chain.from_iterable(
Expand Down
89 changes: 42 additions & 47 deletions src/pudl/metadata/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,6 @@
"or charging rent to host cell antennas on transmission towers."
),
},
"block": {
"type": "string",
"description": "Title of block of data.",
},
"block_count": {
"type": "integer",
"description": "Some blocks are repeated, `block_count` defines the index of the data block.",
},
"boiler_fuel_code_1": {
"type": "string",
"description": "The code representing the most predominant type of energy that fuels the boiler.",
Expand Down Expand Up @@ -750,10 +742,34 @@
"description": "Average monthly coincident peak (CP) demand (for requirements purchases, and any transactions involving demand charges). Monthly CP demand is the metered demand during the hour (60-minute integration) in which the supplier's system reaches its monthly peak. In megawatts.",
"unit": "MW",
},
"company_id_sec": {
"type": "string",
"description": "Assigned identifier for the company.",
},
"company_information_block": {
"type": "string",
"description": "Title of block of data.",
},
"company_information_block_count": {
"type": "integer",
"description": "Some blocks are repeated, this defines the index of the data block.",
},
"company_information_fact_name": {
"type": "string",
"description": "Name of fact within a ``company_information_block``.",
},
"company_information_fact_value": {
"type": "string",
"description": "Value corresponding with ``company_information_fact_name``.",
},
"company_name": {
"type": "string",
"description": "Name of company submitting SEC 10k filing.",
},
"company_name_former": {
"type": "string",
"description": "Former name of company.",
},
"company_name_raw": {
"type": "string",
"description": "Uncleaned name of company.",
Expand Down Expand Up @@ -1643,14 +1659,14 @@
"type": "number",
"description": "Total number of flue gas desulfurization unit scrubber trains.",
},
"former_conformed_name": {
"type": "string",
"description": "Former name of the company.",
},
"filer_count": {
"type": "integer",
"description": "Index company information as some filings contain information for multiple companies.",
},
"filename_sec10k": {
"type": "string",
"description": "Name of filing as provided by SEC data portal.",
},
"files_10k": {
"type": "boolean",
"description": "Indicates whether the company files a 10-K.",
Expand Down Expand Up @@ -1731,10 +1747,6 @@
"type": "integer",
"description": "Four-digit year that applies to a particular forecasted value.",
},
"form_type": {
"type": "string",
"description": "Specific version of SEC 10k filed.",
},
"fraction_owned": {
"type": "number",
"description": "Proportion of generator ownership attributable to this utility.",
Expand Down Expand Up @@ -2377,7 +2389,10 @@
"description": "Original reported energy interchange between adjacent balancing authorities.",
"unit": "MWh",
},
"irs_number": {"type": "string", "description": "ID of the company with the IRS."},
"company_id_irs": {
"type": "string",
"description": "ID of the company with the IRS.",
},
"is_epacems_state": {
"type": "boolean",
"description": (
Expand All @@ -2393,10 +2408,6 @@
"type": "string",
"description": "The code of the plant's ISO or RTO. NA if not reported in that year.",
},
"key": {
"type": "string",
"description": "Key within block.",
},
"kwh_per_customer": {"type": "number", "description": "kWh per customer."},
"label": {
"type": "string",
Expand Down Expand Up @@ -2509,11 +2520,7 @@
),
"unit": "MW",
},
"location": {
"type": "string",
"description": "Location of subsidiary company.",
},
"location_of_inc": {
"location_of_incorporation": {
"type": "string",
"description": "Cleaned location of incorporation of the company.",
},
Expand Down Expand Up @@ -3350,10 +3357,6 @@
"description": "Whether each generator record is for one owner or represents a total of all ownerships.",
"constraints": {"enum": ["owned", "total"]},
},
"ownership_percentage": {
"type": "string",
"description": "Percentage of subsidiary company owned by parent.",
},
"ownership_code": {
"type": "string",
"description": "Identifies the ownership for each generator.",
Expand All @@ -3362,9 +3365,9 @@
"type": "boolean",
"description": "Whether a plant part record has a duplicate record with different ownership status.",
},
"parent_company_cik": {
"parent_company_central_index_key": {
"type": "string",
"description": "CIK of the company's parent company.",
"description": "Central index key (CIK) of the company's parent company.",
},
"particulate_control_id_eia": {
"type": "string",
Expand Down Expand Up @@ -3981,13 +3984,9 @@
"description": "Estimated electricity demand scaled by the total sales within a state.",
"unit": "MWh",
},
"sec_company_id": {
"sec10k_version": {
"type": "string",
"description": "Assigned identifier for the company.",
},
"sec10k_filename": {
"type": "string",
"description": "Name of filing as provided by SEC data portal.",
"description": "Specific version of SEC 10k filed.",
},
"secondary_transportation_mode_code": {
"type": "string",
Expand Down Expand Up @@ -4384,10 +4383,6 @@
# TODO: Disambiguate as this means different things in different tables.
"description": "Physical street address.",
},
"street_address_2": {
"type": "string",
"description": "Secondary street address.",
},
"subcritical_tech": {
"type": "boolean",
"description": "Indicates whether the generator uses subcritical technology",
Expand Down Expand Up @@ -4417,10 +4412,14 @@
"type": "integer",
"description": "Sub-plant ID links EPA CEMS emissions units to EIA units.",
},
"subsidiary": {
"subsidiary_company_name": {
"type": "string",
"description": "Name of subsidiary company.",
},
"subsidiary_location": {
"type": "string",
"description": "Location of subsidiary company.",
},
"sulfur_content_pct": {
"type": "number",
"description": "Sulfur content percentage by weight to the nearest 0.01 percent.",
Expand Down Expand Up @@ -4904,10 +4903,6 @@
"type": "date",
"description": "The record in the changelog is valid until this date. The record is valid from the report_date up until but not including the valid_until_date.",
},
"value": {
"type": "string",
"description": "String value of data point.",
},
"variable_peak_pricing": {
"type": "boolean",
"description": (
Expand Down
Loading

0 comments on commit 26140ec

Please sign in to comment.