Skip to content

Commit

Permalink
add Capability.TableLastModifiedMetadataBatch support
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Mar 15, 2024
1 parent 9d999e5 commit bc13026
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 6 deletions.
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class SnowflakeAdapter(SQLAdapter):
{
Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
Capability.TableLastModifiedMetadataBatch: CapabilitySupport(support=Support.Full),
}
)

Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# install latest changes in dbt-core
# TODO: how to automate switching from develop to version branches?
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-adapters.git
git+https://github.com/dbt-labs/dbt-adapters.git@batch-metadata-freshness
git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter

# if version 1.x or greater -> pin to major version
Expand Down
101 changes: 96 additions & 5 deletions tests/functional/adapter/test_get_last_relation_modified.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os
import pytest
from unittest import mock

from dbt.adapters.snowflake.impl import SnowflakeAdapter
from dbt.adapters.capability import Capability, CapabilityDict
from dbt.cli.main import dbtRunner


Expand All @@ -15,18 +18,28 @@
- name: test_table
"""

freshness_metadata_schema_batch_yml = """version: 2
sources:
- name: test_source
freshness:
warn_after: {count: 10, period: hour}
error_after: {count: 1, period: day}
schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}"
tables:
- name: test_table
- name: test_table2
- name: test_table_with_loaded_at_field
loaded_at_field: my_loaded_at_field
"""

class TestGetLastRelationModified:

class SetupGetLastRelationModified:
@pytest.fixture(scope="class", autouse=True)
def set_env_vars(self, project):
os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
yield
del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]

@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": freshness_via_metadata_schema_yml}

@pytest.fixture(scope="class")
def custom_schema(self, project, set_env_vars):
with project.adapter.connection_named("__test"):
Expand All @@ -41,6 +54,12 @@ def custom_schema(self, project, set_env_vars):
with project.adapter.connection_named("__test"):
project.adapter.drop_schema(relation)


class TestGetLastRelationModified(SetupGetLastRelationModified):
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": freshness_via_metadata_schema_yml}

def test_get_last_relation_modified(self, project, set_env_vars, custom_schema):
project.run_sql(
f"create table {custom_schema}.test_table (id integer autoincrement, name varchar(100) not null);"
Expand All @@ -58,3 +77,75 @@ def probe(e):

# The 'source freshness' command should succeed without warnings or errors.
assert not warning_or_error


class TestGetLastRelationModifiedBatch(SetupGetLastRelationModified):
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": freshness_metadata_schema_batch_yml}

def get_freshness_result_for_table(self, table_name, results):
for result in results:
if result.node.name == table_name:
return result
return None

def test_get_last_relation_modified_batch(self, project, set_env_vars, custom_schema):
project.run_sql(
f"create table {custom_schema}.test_table (id integer autoincrement, name varchar(100) not null);"
)
project.run_sql(
f"create table {custom_schema}.test_table2 (id integer autoincrement, name varchar(100) not null);"
)
project.run_sql(
f"create table {custom_schema}.test_table_with_loaded_at_field as (select 1 as id, timestamp '2009-09-15 10:59:43' as my_loaded_at_field);"
)

runner = dbtRunner()
freshness_results_batch = runner.invoke(["source", "freshness"]).result

assert len(freshness_results_batch) == 3
test_table_batch_result = self.get_freshness_result_for_table(
"test_table", freshness_results_batch
)
test_table2_batch_result = self.get_freshness_result_for_table(
"test_table2", freshness_results_batch
)
test_table_with_loaded_at_field_batch_result = self.get_freshness_result_for_table(
"test_table_with_loaded_at_field", freshness_results_batch
)

capabilities_no_batch = CapabilityDict(
{
capability: support
for capability, support in SnowflakeAdapter._capabilities.items()
if capability != Capability.TableLastModifiedMetadataBatch
}
)
with mock.patch.object(
SnowflakeAdapter, "capabilities", return_value=capabilities_no_batch
):
freshness_results = runner.invoke(["source", "freshness"]).result

assert len(freshness_results) == 3
test_table_result = self.get_freshness_result_for_table("test_table", freshness_results)
test_table2_result = self.get_freshness_result_for_table("test_table2", freshness_results)
test_table_with_loaded_at_field_result = self.get_freshness_result_for_table(
"test_table_with_loaded_at_field", freshness_results
)

# assert results between batch vs non-batch freshness strategy are equivalent
assert test_table_result.status == test_table_batch_result.status
assert test_table_result.max_loaded_at == test_table_batch_result.max_loaded_at

assert test_table2_result.status == test_table2_batch_result.status
assert test_table2_result.max_loaded_at == test_table2_batch_result.max_loaded_at

assert (
test_table_with_loaded_at_field_batch_result.status
== test_table_with_loaded_at_field_result.status
)
assert (
test_table_with_loaded_at_field_batch_result.max_loaded_at
== test_table_with_loaded_at_field_result.max_loaded_at
)

0 comments on commit bc13026

Please sign in to comment.