diff --git a/src/pudl/io_managers.py b/src/pudl/io_managers.py index 44e8fa4f47..fb7cca39ac 100644 --- a/src/pudl/io_managers.py +++ b/src/pudl/io_managers.py @@ -170,11 +170,11 @@ def _get_sqlalchemy_table(self, table_name: str) -> sa.Table: table: Corresponding SQL Alchemy Table in SQLiteIOManager metadata. Raises: - RuntimeError: if table_name does not exist in the SQLiteIOManager metadata. + ValueError: if table_name does not exist in the SQLiteIOManager metadata. """ sa_table = self.md.tables.get(table_name, None) if sa_table is None: - raise RuntimeError( + raise ValueError( f"{sa_table} not found in database metadata. Either add the table to " "the metadata or use a different IO Manager." ) @@ -274,12 +274,11 @@ def _handle_pandas_output(self, context: OutputContext, df: pd.DataFrame): df: dataframe to write to the database. """ table_name = self._get_table_name(context) - sa_table = self._get_sqlalchemy_table(table_name) column_difference = set(sa_table.columns.keys()) - set(df.columns) if column_difference: - raise RuntimeError( + raise ValueError( f"{table_name} dataframe is missing columns: {column_difference}" ) @@ -349,7 +348,6 @@ def load_input(self, context: InputContext) -> pd.DataFrame: name. """ table_name = self._get_table_name(context) - # Check if the table_name exists in the self.md object _ = self._get_sqlalchemy_table(table_name) @@ -398,6 +396,15 @@ def __init__( to create sqlalchemy metadata and check datatypes of dataframes. If not specified, defaults to a Package with all metadata stored in the :mod:`pudl.metadata.resources` subpackage. + + Every table that appears in `self.md` is specified in `self.package` + as a :class:`pudl.metadata.classes.Resource`. However, not every + :class:`pudl.metadata.classes.Resource` in `self.package` is included + in `self.md` as a table. This is because `self.package` is used to ensure + datatypes of dataframes loaded from database views are correct.
However, + the metadata for views in `self.package` should not be used to create + table schemas in the database because views are just stored SQL statements + and do not require a schema. timeout: How many seconds the connection should wait before raising an exception, if the database is locked by another connection. If another connection opens a transaction to modify the database, it will be locked @@ -409,6 +416,60 @@ def __init__( md = self.package.to_sql() super().__init__(base_dir, db_name, md, timeout) + def _handle_str_output(self, context: OutputContext, query: str): + """Execute a SQL query on the database. + + This is used for creating output views in the database. + + Args: + context: dagster keyword that provides access to output information like + asset name. + query: SQL query to execute in the database. + """ + engine = self.engine + table_name = self._get_table_name(context) + + # Check if there is a Resource in self.package for table_name. + # We don't want folks creating views without adding package metadata. + try: + _ = self.package.get_resource(table_name) + except ValueError: + raise ValueError( + f"{table_name} does not appear in pudl.metadata.resources. " + "Check for typos, or add the table to the metadata and recreate the " + f"PUDL SQlite database. It's also possible that {table_name} is one of " + "the tables that does not get loaded into the PUDL SQLite DB because " + "it's a work in progress or is distributed in Apache Parquet format." + ) + + with engine.connect() as con: + # Drop the existing view if it exists and create the new view. + # TODO (bendnorman): parameterize this safely.
+ con.execute(f"DROP VIEW IF EXISTS {table_name}") + con.execute(query) + + def _handle_pandas_output(self, context: OutputContext, df: pd.DataFrame): + """Enforce PUDL DB schema and write dataframe to SQLite.""" + table_name = self._get_table_name(context) + # If table_name doesn't show up in the self.md object, this will raise an error + sa_table = self._get_sqlalchemy_table(table_name) + res = self.package.get_resource(table_name) + + df = res.enforce_schema(df) + + with self.engine.connect() as con: + # Remove old table records before loading to db + con.execute(sa_table.delete()) + + df.to_sql( + table_name, + con, + if_exists="append", + index=False, + chunksize=100_000, + dtype={c.name: c.type for c in sa_table.columns}, + ) + def load_input(self, context: InputContext) -> pd.DataFrame: """Load a dataframe from a sqlite database. @@ -418,14 +479,28 @@ def load_input(self, context: InputContext) -> pd.DataFrame: """ table_name = self._get_table_name(context) - # Check if the table_name exists in the self.md object - _ = self._get_sqlalchemy_table(table_name) - - engine = self.engine + # Check if there is a Resource in self.package for table_name + try: + res = self.package.get_resource(table_name) + except ValueError: + raise ValueError( + f"{table_name} does not appear in pudl.metadata.resources. " + "Check for typos, or add the table to the metadata and recreate the " + f"PUDL SQlite database. It's also possible that {table_name} is one of " + "the tables that does not get loaded into the PUDL SQLite DB because " + "it's a work in progress or is distributed in Apache Parquet format." + ) - with engine.connect() as con: + with self.engine.connect() as con: try: - df = pd.read_sql_table(table_name, con) + df = pd.concat( + [ + res.enforce_schema(chunk_df) + for chunk_df in pd.read_sql_table( + table_name, con, chunksize=100_000 + ) + ] + ) except ValueError: raise ValueError( f"{table_name} not found. 
Either the table was dropped " @@ -434,10 +509,10 @@ def load_input(self, context: InputContext) -> pd.DataFrame: ) if df.empty: raise AssertionError( - f"The {table_name} table is empty. Materialize " - "the {table_name} asset so it is available in the database." + f"The {table_name} table is empty. Materialize the {table_name} " + "asset so it is available in the database." ) - return pudl.metadata.fields.apply_pudl_dtypes(df) + return df @io_manager( diff --git a/src/pudl/metadata/classes.py b/src/pudl/metadata/classes.py index e264e1a5cb..1a4a92235a 100644 --- a/src/pudl/metadata/classes.py +++ b/src/pudl/metadata/classes.py @@ -711,10 +711,8 @@ def to_sql( # noqa: C901 ) elif self.type == "date": checks.append(f"{name} IS DATE({name})") - # Need to ensure that the string representation of the datetime only - # includes whole seconds or this check will fail. - # elif self.type == "datetime": - # checks.append(f"{name} IS DATETIME({name})") + elif self.type == "datetime": + checks.append(f"{name} IS DATETIME({name})") if check_values: # Field constraints if self.constraints.min_length is not None: diff --git a/src/pudl/metadata/constants.py b/src/pudl/metadata/constants.py index f817419760..a28c207f3e 100644 --- a/src/pudl/metadata/constants.py +++ b/src/pudl/metadata/constants.py @@ -5,6 +5,7 @@ import pandas as pd import pyarrow as pa import sqlalchemy as sa +from sqlalchemy.dialects.sqlite import DATETIME as SQLITE_DATETIME FIELD_DTYPES_PANDAS: dict[str, str] = { "string": "string", @@ -30,7 +31,10 @@ FIELD_DTYPES_SQL: dict[str, sa.sql.visitors.VisitableType] = { "boolean": sa.Boolean, "date": sa.Date, - "datetime": sa.DateTime, + # Ensure SQLite's string representation of datetime uses only whole seconds: + "datetime": SQLITE_DATETIME( + storage_format="%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d" + ), "integer": sa.Integer, "number": sa.Float, "string": sa.Text, diff --git a/test/unit/io_managers_test.py 
b/test/unit/io_managers_test.py index 02869ed0fe..eae7f4c64f 100644 --- a/test/unit/io_managers_test.py +++ b/test/unit/io_managers_test.py @@ -2,30 +2,58 @@ import pandas as pd import pytest from dagster import AssetKey, build_input_context, build_output_context -from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, Table from sqlalchemy.exc import IntegrityError, OperationalError -from pudl.io_managers import ForeignKeyError, ForeignKeyErrors, SQLiteIOManager +from pudl.io_managers import ( + ForeignKeyError, + ForeignKeyErrors, + PudlSQLiteIOManager, + SQLiteIOManager, +) +from pudl.metadata.classes import Package, Resource @pytest.fixture -def sqlite_io_manager_fixture(tmp_path): - """Create a SQLiteIOManager fixture with a simple database schema.""" - md = MetaData() - artist = Table( # noqa: F841 - "artist", - md, - Column("artistid", Integer, primary_key=True), - Column("artistname", String(16), nullable=False), +def test_pkg() -> Package: + """Create a test metadata package for the io manager tests.""" + fields = [ + {"name": "artistid", "type": "integer"}, + {"name": "artistname", "type": "string", "constraints": {"required": True}}, + ] + schema = {"fields": fields, "primary_key": ["artistid"]} + artist_resource = Resource(name="artist", schema=schema) + + fields = [ + {"name": "artistid", "type": "integer"}, + {"name": "artistname", "type": "string", "constraints": {"required": True}}, + ] + schema = {"fields": fields, "primary_key": ["artistid"]} + view_resource = Resource( + name="artist_view", schema=schema, include_in_database=False ) - track = Table( # noqa: F841 - "track", - md, - Column("trackid", Integer, primary_key=True), - Column("trackname", String(16), nullable=False), - Column("trackartist", Integer, ForeignKey("artist.artistid")), + + fields = [ + {"name": "trackid", "type": "integer"}, + {"name": "trackname", "type": "string", "constraints": {"required": True}}, + {"name": "trackartist", "type": "integer"}, + ] + fkeys = [ 
+ { + "fields": ["trackartist"], + "reference": {"resource": "artist", "fields": ["artistid"]}, + } + ] + schema = {"fields": fields, "primary_key": ["trackid"], "foreign_keys": fkeys} + track_resource = Resource(name="track", schema=schema) + return Package( + name="music", resources=[track_resource, artist_resource, view_resource] ) + +@pytest.fixture +def sqlite_io_manager_fixture(tmp_path, test_pkg): + """Create a SQLiteIOManager fixture with a simple database schema.""" + md = test_pkg.to_sql() return SQLiteIOManager(base_dir=tmp_path, db_name="pudl", md=md) @@ -105,7 +133,7 @@ def test_missing_column_error(sqlite_io_manager_fixture): } ) output_context = build_output_context(asset_key=AssetKey(asset_key)) - with pytest.raises(RuntimeError): + with pytest.raises(ValueError): manager.handle_output(output_context, artist) @@ -147,8 +175,7 @@ def test_primary_key_column_error(sqlite_io_manager_fixture): def test_incorrect_type_error(sqlite_io_manager_fixture): - """Ensure an error is thrown when a dataframe's type doesn't match the table - schema.""" + """Ensure an error is thrown when dataframe type doesn't match the table schema.""" manager = sqlite_io_manager_fixture asset_key = "artist" @@ -159,11 +186,53 @@ def test_incorrect_type_error(sqlite_io_manager_fixture): def test_missing_schema_error(sqlite_io_manager_fixture): - """Test a RuntimeError is raised when a table without a schema is loaded.""" + """Test a ValueError is raised when a table without a schema is loaded.""" manager = sqlite_io_manager_fixture asset_key = "venues" venue = pd.DataFrame({"venueid": [1], "venuename": "Vans Dive Bar"}) output_context = build_output_context(asset_key=AssetKey(asset_key)) - with pytest.raises(RuntimeError): + with pytest.raises(ValueError): manager.handle_output(output_context, venue) + + +@pytest.fixture +def pudl_sqlite_io_manager_fixture(tmp_path, test_pkg): + """Create a PudlSQLiteIOManager fixture from the test metadata package.""" + return
PudlSQLiteIOManager(base_dir=tmp_path, db_name="pudl", package=test_pkg) + + +def test_error_when_handling_view_without_metadata(pudl_sqlite_io_manager_fixture): + """Make sure an error is thrown when a user creates a view without metadata.""" + asset_key = "track_view" + sql_stmt = "CREATE VIEW track_view AS SELECT * FROM track;" + output_context = build_output_context(asset_key=AssetKey(asset_key)) + with pytest.raises(ValueError): + pudl_sqlite_io_manager_fixture.handle_output(output_context, sql_stmt) + + +def test_handling_view_with_metadata(pudl_sqlite_io_manager_fixture): + """Make sure users can create and load views when they have metadata.""" + # Create some sample data + asset_key = "artist" + artist = pd.DataFrame({"artistid": [1], "artistname": ["Co-op Mop"]}) + output_context = build_output_context(asset_key=AssetKey(asset_key)) + pudl_sqlite_io_manager_fixture.handle_output(output_context, artist) + + # create the view + asset_key = "artist_view" + sql_stmt = "CREATE VIEW artist_view AS SELECT * FROM artist;" + output_context = build_output_context(asset_key=AssetKey(asset_key)) + pudl_sqlite_io_manager_fixture.handle_output(output_context, sql_stmt) + + # read the view data as a dataframe + input_context = build_input_context(asset_key=AssetKey(asset_key)) + pudl_sqlite_io_manager_fixture.load_input(input_context) + + +def test_error_when_reading_view_without_metadata(pudl_sqlite_io_manager_fixture): + """Make sure an error is thrown when a user loads a view without metadata.""" + asset_key = "track_view" + input_context = build_input_context(asset_key=AssetKey(asset_key)) + with pytest.raises(ValueError): + pudl_sqlite_io_manager_fixture.load_input(input_context)