Skip to content

Commit

Permalink
Merge pull request #2459 from catalyst-cooperative/sqlite-iomanager-d…
Browse files Browse the repository at this point in the history
…yptes-memory

Use enforce_schema() and read-chunking in PudlSQLiteIOManager.
  • Loading branch information
bendnorman authored Mar 31, 2023
2 parents 01bd9af + 775be55 commit 7ca27c2
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 40 deletions.
103 changes: 89 additions & 14 deletions src/pudl/io_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,11 @@ def _get_sqlalchemy_table(self, table_name: str) -> sa.Table:
table: Corresponding SQL Alchemy Table in SQLiteIOManager metadata.
Raises:
RuntimeError: if table_name does not exist in the SQLiteIOManager metadata.
ValueError: if table_name does not exist in the SQLiteIOManager metadata.
"""
sa_table = self.md.tables.get(table_name, None)
if sa_table is None:
raise RuntimeError(
raise ValueError(
f"{sa_table} not found in database metadata. Either add the table to "
"the metadata or use a different IO Manager."
)
Expand Down Expand Up @@ -274,12 +274,11 @@ def _handle_pandas_output(self, context: OutputContext, df: pd.DataFrame):
df: dataframe to write to the database.
"""
table_name = self._get_table_name(context)

sa_table = self._get_sqlalchemy_table(table_name)

column_difference = set(sa_table.columns.keys()) - set(df.columns)
if column_difference:
raise RuntimeError(
raise ValueError(
f"{table_name} dataframe is missing columns: {column_difference}"
)

Expand Down Expand Up @@ -349,7 +348,6 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
name.
"""
table_name = self._get_table_name(context)

# Check if the table_name exists in the self.md object
_ = self._get_sqlalchemy_table(table_name)

Expand Down Expand Up @@ -398,6 +396,15 @@ def __init__(
to create sqlalchemy metadata and check datatypes of dataframes. If not
specified, defaults to a Package with all metadata stored in the
:mod:`pudl.metadata.resources` subpackage.
Every table that appears in `self.md` is sepcified in `self.package`
as a :class:`pudl.metadata.classes.Resources`. However, not every
:class:`pudl.metadata.classes.Resources` in `self.package` is included
in `self.md` as a table. This is because `self.package` is used to ensure
datatypes of dataframes loaded from database views are correct. However,
the metadata for views in `self.package` should not be used to create
table schemas in the database because views are just stored sql statements
and do not require a schema.
timeout: How many seconds the connection should wait before raising an
exception, if the database is locked by another connection. If another
connection opens a transaction to modify the database, it will be locked
Expand All @@ -409,6 +416,60 @@ def __init__(
md = self.package.to_sql()
super().__init__(base_dir, db_name, md, timeout)

def _handle_str_output(self, context: OutputContext, query: str):
"""Execute a sql query on the database.
This is used for creating output views in the database.
Args:
context: dagster keyword that provides access output information like asset
name.
query: sql query to execute in the database.
"""
engine = self.engine
table_name = self._get_table_name(context)

# Check if there is a Resource in self.package for table_name.
# We don't want folks creating views without adding package metadata.
try:
_ = self.package.get_resource(table_name)
except ValueError:
raise ValueError(
f"{table_name} does not appear in pudl.metadata.resources. "
"Check for typos, or add the table to the metadata and recreate the "
f"PUDL SQlite database. It's also possible that {table_name} is one of "
"the tables that does not get loaded into the PUDL SQLite DB because "
"it's a work in progress or is distributed in Apache Parquet format."
)

with engine.connect() as con:
# Drop the existing view if it exists and create the new view.
# TODO (bendnorman): parameterize this safely.
con.execute(f"DROP VIEW IF EXISTS {table_name}")
con.execute(query)

def _handle_pandas_output(self, context: OutputContext, df: pd.DataFrame):
"""Enforce PUDL DB schema and write dataframe to SQLite."""
table_name = self._get_table_name(context)
# If table_name doesn't show up in the self.md object, this will raise an error
sa_table = self._get_sqlalchemy_table(table_name)
res = self.package.get_resource(table_name)

df = res.enforce_schema(df)

with self.engine.connect() as con:
# Remove old table records before loading to db
con.execute(sa_table.delete())

df.to_sql(
table_name,
con,
if_exists="append",
index=False,
chunksize=100_000,
dtype={c.name: c.type for c in sa_table.columns},
)

def load_input(self, context: InputContext) -> pd.DataFrame:
"""Load a dataframe from a sqlite database.
Expand All @@ -418,14 +479,28 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
"""
table_name = self._get_table_name(context)

# Check if the table_name exists in the self.md object
_ = self._get_sqlalchemy_table(table_name)

engine = self.engine
# Check if there is a Resource in self.package for table_name
try:
res = self.package.get_resource(table_name)
except ValueError:
raise ValueError(
f"{table_name} does not appear in pudl.metadata.resources. "
"Check for typos, or add the table to the metadata and recreate the "
f"PUDL SQlite database. It's also possible that {table_name} is one of "
"the tables that does not get loaded into the PUDL SQLite DB because "
"it's a work in progress or is distributed in Apache Parquet format."
)

with engine.connect() as con:
with self.engine.connect() as con:
try:
df = pd.read_sql_table(table_name, con)
df = pd.concat(
[
res.enforce_schema(chunk_df)
for chunk_df in pd.read_sql_table(
table_name, con, chunksize=100_000
)
]
)
except ValueError:
raise ValueError(
f"{table_name} not found. Either the table was dropped "
Expand All @@ -434,10 +509,10 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
)
if df.empty:
raise AssertionError(
f"The {table_name} table is empty. Materialize "
"the {table_name} asset so it is available in the database."
f"The {table_name} table is empty. Materialize the {table_name} "
"asset so it is available in the database."
)
return pudl.metadata.fields.apply_pudl_dtypes(df)
return df


@io_manager(
Expand Down
6 changes: 2 additions & 4 deletions src/pudl/metadata/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,10 +711,8 @@ def to_sql( # noqa: C901
)
elif self.type == "date":
checks.append(f"{name} IS DATE({name})")
# Need to ensure that the string representation of the datetime only
# includes whole seconds or this check will fail.
# elif self.type == "datetime":
# checks.append(f"{name} IS DATETIME({name})")
elif self.type == "datetime":
checks.append(f"{name} IS DATETIME({name})")
if check_values:
# Field constraints
if self.constraints.min_length is not None:
Expand Down
6 changes: 5 additions & 1 deletion src/pudl/metadata/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pandas as pd
import pyarrow as pa
import sqlalchemy as sa
from sqlalchemy.dialects.sqlite import DATETIME as SQLITE_DATETIME

FIELD_DTYPES_PANDAS: dict[str, str] = {
"string": "string",
Expand All @@ -30,7 +31,10 @@
FIELD_DTYPES_SQL: dict[str, sa.sql.visitors.VisitableType] = {
"boolean": sa.Boolean,
"date": sa.Date,
"datetime": sa.DateTime,
# Ensure SQLite's string representation of datetime uses only whole seconds:
"datetime": SQLITE_DATETIME(
storage_format="%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d"
),
"integer": sa.Integer,
"number": sa.Float,
"string": sa.Text,
Expand Down
111 changes: 90 additions & 21 deletions test/unit/io_managers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,58 @@
import pandas as pd
import pytest
from dagster import AssetKey, build_input_context, build_output_context
from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, Table
from sqlalchemy.exc import IntegrityError, OperationalError

from pudl.io_managers import ForeignKeyError, ForeignKeyErrors, SQLiteIOManager
from pudl.io_managers import (
ForeignKeyError,
ForeignKeyErrors,
PudlSQLiteIOManager,
SQLiteIOManager,
)
from pudl.metadata.classes import Package, Resource


@pytest.fixture
def sqlite_io_manager_fixture(tmp_path):
"""Create a SQLiteIOManager fixture with a simple database schema."""
md = MetaData()
artist = Table( # noqa: F841
"artist",
md,
Column("artistid", Integer, primary_key=True),
Column("artistname", String(16), nullable=False),
def test_pkg() -> Package:
"""Create a test metadata package for the io manager tests."""
fields = [
{"name": "artistid", "type": "integer"},
{"name": "artistname", "type": "string", "constraints": {"required": True}},
]
schema = {"fields": fields, "primary_key": ["artistid"]}
artist_resource = Resource(name="artist", schema=schema)

fields = [
{"name": "artistid", "type": "integer"},
{"name": "artistname", "type": "string", "constraints": {"required": True}},
]
schema = {"fields": fields, "primary_key": ["artistid"]}
view_resource = Resource(
name="artist_view", schema=schema, include_in_database=False
)
track = Table( # noqa: F841
"track",
md,
Column("trackid", Integer, primary_key=True),
Column("trackname", String(16), nullable=False),
Column("trackartist", Integer, ForeignKey("artist.artistid")),

fields = [
{"name": "trackid", "type": "integer"},
{"name": "trackname", "type": "string", "constraints": {"required": True}},
{"name": "trackartist", "type": "integer"},
]
fkeys = [
{
"fields": ["trackartist"],
"reference": {"resource": "artist", "fields": ["artistid"]},
}
]
schema = {"fields": fields, "primary_key": ["trackid"], "foreign_keys": fkeys}
track_resource = Resource(name="track", schema=schema)
return Package(
name="music", resources=[track_resource, artist_resource, view_resource]
)


@pytest.fixture
def sqlite_io_manager_fixture(tmp_path, test_pkg):
"""Create a SQLiteIOManager fixture with a simple database schema."""
md = test_pkg.to_sql()
return SQLiteIOManager(base_dir=tmp_path, db_name="pudl", md=md)


Expand Down Expand Up @@ -105,7 +133,7 @@ def test_missing_column_error(sqlite_io_manager_fixture):
}
)
output_context = build_output_context(asset_key=AssetKey(asset_key))
with pytest.raises(RuntimeError):
with pytest.raises(ValueError):
manager.handle_output(output_context, artist)


Expand Down Expand Up @@ -147,8 +175,7 @@ def test_primary_key_column_error(sqlite_io_manager_fixture):


def test_incorrect_type_error(sqlite_io_manager_fixture):
"""Ensure an error is thrown when a dataframe's type doesn't match the table
schema."""
"""Ensure an error is thrown when dataframe type doesn't match the table schema."""
manager = sqlite_io_manager_fixture

asset_key = "artist"
Expand All @@ -159,11 +186,53 @@ def test_incorrect_type_error(sqlite_io_manager_fixture):


def test_missing_schema_error(sqlite_io_manager_fixture):
"""Test a RuntimeError is raised when a table without a schema is loaded."""
"""Test a ValueError is raised when a table without a schema is loaded."""
manager = sqlite_io_manager_fixture

asset_key = "venues"
venue = pd.DataFrame({"venueid": [1], "venuename": "Vans Dive Bar"})
output_context = build_output_context(asset_key=AssetKey(asset_key))
with pytest.raises(RuntimeError):
with pytest.raises(ValueError):
manager.handle_output(output_context, venue)


@pytest.fixture
def pudl_sqlite_io_manager_fixture(tmp_path, test_pkg):
"""Create a SQLiteIOManager fixture with a PUDL database schema."""
return PudlSQLiteIOManager(base_dir=tmp_path, db_name="pudl", package=test_pkg)


def test_error_when_handling_view_without_metadata(pudl_sqlite_io_manager_fixture):
"""Make sure an error is thrown when a user creates a view without metadata."""
asset_key = "track_view"
sql_stmt = "CREATE VIEW track_view AS SELECT * FROM track;"
output_context = build_output_context(asset_key=AssetKey(asset_key))
with pytest.raises(ValueError):
pudl_sqlite_io_manager_fixture.handle_output(output_context, sql_stmt)


def test_handling_view_with_metadata(pudl_sqlite_io_manager_fixture):
"""Make sure an users can create and load views when it has metadata."""
# Create some sample data
asset_key = "artist"
artist = pd.DataFrame({"artistid": [1], "artistname": ["Co-op Mop"]})
output_context = build_output_context(asset_key=AssetKey(asset_key))
pudl_sqlite_io_manager_fixture.handle_output(output_context, artist)

# create the view
asset_key = "artist_view"
sql_stmt = "CREATE VIEW artist_view AS SELECT * FROM artist;"
output_context = build_output_context(asset_key=AssetKey(asset_key))
pudl_sqlite_io_manager_fixture.handle_output(output_context, sql_stmt)

# read the view data as a dataframe
input_context = build_input_context(asset_key=AssetKey(asset_key))
pudl_sqlite_io_manager_fixture.load_input(input_context)


def test_error_when_reading_view_without_metadata(pudl_sqlite_io_manager_fixture):
"""Make sure and error is thrown when a user loads a view without metadata."""
asset_key = "track_view"
input_context = build_input_context(asset_key=AssetKey(asset_key))
with pytest.raises(ValueError):
pudl_sqlite_io_manager_fixture.load_input(input_context)

0 comments on commit 7ca27c2

Please sign in to comment.