Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Capability.TableLastModifiedMetadataBatch support #928

Merged
merged 14 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240404-171704.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Support TableLastModifiedMetadataBatch capability
time: 2024-04-04T17:17:04.853047-07:00
custom:
  Author: michelleark
  Issue: "965"
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class SnowflakeAdapter(SQLAdapter):
{
Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
Capability.TableLastModifiedMetadataBatch: CapabilitySupport(support=Support.Full),
}
)

Expand Down
6 changes: 3 additions & 3 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# install latest changes in dbt-core
# TODO: how to automate switching from develop to version branches?
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-adapters.git
git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-core.git@batch-metadata-freshness#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-adapters.git@batch-metadata-freshness
git+https://github.com/dbt-labs/dbt-adapters.git@batch-metadata-freshness#subdirectory=dbt-tests-adapter

# if version 1.x or greater -> pin to major version
# if version 0.x -> pin to minor
Expand Down
104 changes: 98 additions & 6 deletions tests/functional/adapter/test_get_last_relation_modified.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import os
import pytest
from unittest import mock

from dbt.adapters.snowflake.impl import SnowflakeAdapter
from dbt.adapters.capability import Capability, CapabilityDict
from dbt.cli.main import dbtRunner


freshness_via_metadata_schema_yml = """version: 2
freshness_via_metadata_schema_yml = """
sources:
- name: test_source
freshness:
Expand All @@ -15,18 +18,28 @@
- name: test_table
"""

# Source definition for the batch freshness test: one source with three tables
# sharing a single freshness policy. The first two tables declare no
# loaded_at_field; the third sets an explicit loaded_at_field column.
# The schema name is injected through the DBT_GET_LAST_RELATION_TEST_SCHEMA
# environment variable.
freshness_metadata_schema_batch_yml = """
sources:
  - name: test_source
    freshness:
      warn_after: {count: 10, period: hour}
      error_after: {count: 1, period: day}
    schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}"
    tables:
      - name: test_table
      - name: test_table2
      - name: test_table_with_loaded_at_field
        loaded_at_field: my_loaded_at_field
"""

class TestGetLastRelationModified:

class SetupGetLastRelationModified:
@pytest.fixture(scope="class", autouse=True)
def set_env_vars(self, project):
os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
yield
del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]

@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": freshness_via_metadata_schema_yml}

@pytest.fixture(scope="class")
def custom_schema(self, project, set_env_vars):
with project.adapter.connection_named("__test"):
Expand All @@ -41,6 +54,12 @@ def custom_schema(self, project, set_env_vars):
with project.adapter.connection_named("__test"):
project.adapter.drop_schema(relation)


class TestGetLastRelationModified(SetupGetLastRelationModified):
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": freshness_via_metadata_schema_yml}

def test_get_last_relation_modified(self, project, set_env_vars, custom_schema):
project.run_sql(
f"create table {custom_schema}.test_table (id integer autoincrement, name varchar(100) not null);"
Expand All @@ -58,3 +77,76 @@ def probe(e):

# The 'source freshness' command should succeed without warnings or errors.
assert not warning_or_error


class TestGetLastRelationModifiedBatch(SetupGetLastRelationModified):
    """Compare `source freshness` results with and without the batch capability.

    The test runs `dbt source freshness` twice against the same three tables:
    once with the adapter's capabilities as declared, and once with
    ``Capability.TableLastModifiedMetadataBatch`` filtered out. It then asserts
    that every table reports the same status and ``max_loaded_at`` in both runs.
    """

    @pytest.fixture(scope="class")
    def models(self):
        """Return the project files: a single schema.yml declaring the sources."""
        return {"schema.yml": freshness_metadata_schema_batch_yml}

    def get_freshness_result_for_table(self, table_name, results):
        """Return the first result whose node name equals ``table_name``.

        Returns ``None`` when no result matches.
        """
        return next(
            (result for result in results if result.node.name == table_name),
            None,
        )

    def _require_results(self, table_names, results, label):
        """Map each table name to its freshness result, failing clearly if one is absent."""
        found = {}
        for table_name in table_names:
            result = self.get_freshness_result_for_table(table_name, results)
            # Fail here with the table name instead of an AttributeError on None later.
            assert result is not None, f"no {label} freshness result for source table {table_name!r}"
            found[table_name] = result
        return found

    def test_get_last_relation_modified_batch(self, project, set_env_vars, custom_schema):
        """Batch and non-batch freshness runs must agree for every source table."""
        project.run_sql(
            f"create table {custom_schema}.test_table (id integer autoincrement, name varchar(100) not null);"
        )
        project.run_sql(
            f"create table {custom_schema}.test_table2 (id integer autoincrement, name varchar(100) not null);"
        )
        project.run_sql(
            f"create table {custom_schema}.test_table_with_loaded_at_field as (select 1 as id, timestamp '2009-09-15 10:59:43' as my_loaded_at_field);"
        )

        table_names = ("test_table", "test_table2", "test_table_with_loaded_at_field")

        # First run: adapter capabilities as declared.
        runner = dbtRunner()
        freshness_results_batch = runner.invoke(["source", "freshness"]).result

        assert len(freshness_results_batch) == len(table_names)
        batch_by_table = self._require_results(table_names, freshness_results_batch, "batch")

        # Remove TableLastModifiedMetadataBatch and run freshness on same input without batch strategy
        capabilities_no_batch = CapabilityDict(
            {
                capability: support
                for capability, support in SnowflakeAdapter.capabilities().items()
                if capability != Capability.TableLastModifiedMetadataBatch
            }
        )
        with mock.patch.object(
            SnowflakeAdapter, "capabilities", return_value=capabilities_no_batch
        ):
            freshness_results = runner.invoke(["source", "freshness"]).result

        assert len(freshness_results) == len(table_names)
        single_by_table = self._require_results(table_names, freshness_results, "non-batch")

        # assert results between batch vs non-batch freshness strategy are equivalent
        for table_name in table_names:
            single = single_by_table[table_name]
            batch = batch_by_table[table_name]
            assert single.status == batch.status, f"status differs for {table_name!r}"
            assert (
                single.max_loaded_at == batch.max_loaded_at
            ), f"max_loaded_at differs for {table_name!r}"
Loading