diff --git a/CHANGELOG.md b/CHANGELOG.md index 59af816c3..11428fadf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,14 @@ ## dbt-databricks 1.9.2 (TBD) +### Features + +- Update snapshot materialization to support new snapshot features ([904](https://github.com/databricks/dbt-databricks/pull/904)) + ### Under the Hood - Refactor global state reading ([888](https://github.com/databricks/dbt-databricks/pull/888)) +- Switch to relation.render() for string interpolation ([903](https://github.com/databricks/dbt-databricks/pull/903)) +- Ensure retry defaults for PySQL ([907](https://github.com/databricks/dbt-databricks/pull/907)) ## dbt-databricks 1.9.1 (December 16, 2024) diff --git a/dbt/adapters/databricks/credentials.py b/dbt/adapters/databricks/credentials.py index 387d0e76b..250e79f65 100644 --- a/dbt/adapters/databricks/credentials.py +++ b/dbt/adapters/databricks/credentials.py @@ -74,8 +74,10 @@ class DatabricksCredentials(Credentials): @classmethod def __pre_deserialize__(cls, data: dict[Any, Any]) -> dict[Any, Any]: data = super().__pre_deserialize__(data) - if "database" not in data: - data["database"] = None + data.setdefault("database", None) + data.setdefault("connection_parameters", {}) + data["connection_parameters"].setdefault("_retry_stop_after_attempts_count", 30) + data["connection_parameters"].setdefault("_retry_delay_max", 60) return data def __post_init__(self) -> None: diff --git a/dbt/include/databricks/macros/adapters/columns.sql b/dbt/include/databricks/macros/adapters/columns.sql index 7fe40e6f9..d9b041ccd 100644 --- a/dbt/include/databricks/macros/adapters/columns.sql +++ b/dbt/include/databricks/macros/adapters/columns.sql @@ -32,13 +32,13 @@ {{ exceptions.raise_compiler_error('Delta format required for dropping columns from tables') }} {% endif %} {%- call statement('alter_relation_remove_columns') -%} - ALTER TABLE {{ relation }} DROP COLUMNS ({{ api.Column.format_remove_column_list(remove_columns) }}) + ALTER TABLE {{ relation.render() }} DROP COLUMNS ({{ api.Column.format_remove_column_list(remove_columns) }}) {%- endcall -%} {% endif %} {% if add_columns %} {%- call statement('alter_relation_add_columns') -%} - ALTER TABLE {{ relation }} ADD COLUMNS ({{ api.Column.format_add_column_list(add_columns) }}) + ALTER TABLE {{ relation.render() }} ADD COLUMNS ({{ api.Column.format_add_column_list(add_columns) }}) {%- endcall -%} {% endif %} {% endmacro %} \ No newline at end of file diff --git a/dbt/include/databricks/macros/adapters/persist_docs.sql b/dbt/include/databricks/macros/adapters/persist_docs.sql index 873039e8d..a8ad48bab 100644 --- a/dbt/include/databricks/macros/adapters/persist_docs.sql +++ b/dbt/include/databricks/macros/adapters/persist_docs.sql @@ -4,7 +4,7 @@ {% set comment = column['description'] %} {% set escaped_comment = comment | replace('\'', '\\\'') %} {% set comment_query %} - alter table {{ relation }} change column {{ api.Column.get_name(column) }} comment '{{ escaped_comment }}'; + alter table {{ relation.render()|lower }} change column {{ api.Column.get_name(column) }} comment '{{ escaped_comment }}'; {% endset %} {% do run_query(comment_query) %} {% endfor %} @@ -13,7 +13,7 @@ {% macro alter_table_comment(relation, model) %} {% set comment_query %} - comment on table {{ relation|lower }} is '{{ model.description | replace("'", "\\'") }}' + comment on table {{ relation.render()|lower }} is '{{ model.description | replace("'", "\\'") }}' {% endset %} {% do run_query(comment_query) %} {% endmacro %} diff --git a/dbt/include/databricks/macros/materializations/snapshot.sql b/dbt/include/databricks/macros/materializations/snapshot.sql index 3d1236a15..3a513a24d 100644 --- a/dbt/include/databricks/macros/materializations/snapshot.sql +++ b/dbt/include/databricks/macros/materializations/snapshot.sql @@ -1,27 +1,4 @@ -{% macro databricks_build_snapshot_staging_table(strategy, sql, target_relation) %} - {% set tmp_identifier = target_relation.identifier ~ '__dbt_tmp' %} - - {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, - schema=target_relation.schema, - database=target_relation.database, - type='view') -%} - - {% set select = snapshot_staging_table(strategy, sql, target_relation) %} - - {# needs to be a non-temp view so that its columns can be ascertained via `describe` #} - {% call statement('build_snapshot_staging_relation') %} - create or replace view {{ tmp_relation }} - as - {{ select }} - {% endcall %} - - {% do return(tmp_relation) %} -{% endmacro %} - - {% materialization snapshot, adapter='databricks' %} - {%- set config = model['config'] -%} - {%- set target_table = model.get('alias', model.get('name')) -%} {%- set strategy_name = config.get('strategy') -%} @@ -62,47 +39,43 @@ {{ run_hooks(pre_hooks, inside_transaction=True) }} {% set strategy_macro = strategy_dispatch(strategy_name) %} - {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %} + {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %} {% if not target_relation_exists %} {% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %} + {% set build_or_select_sql = build_sql %} {% set final_sql = create_table_as(False, target_relation, build_sql) %} - {% call statement('main') %} - {{ final_sql }} - {% endcall %} - - {% do persist_docs(target_relation, model, for_relation=False) %} - {% else %} - {{ adapter.valid_snapshot_target(target_relation) }} + {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} - {% if target_relation.database is none %} - {% set staging_table = spark_build_snapshot_staging_table(strategy, sql, target_relation) %} - {% else %} - {% set staging_table = databricks_build_snapshot_staging_table(strategy, sql, target_relation) %} - {% endif %} + {{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }} + + {% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %} + {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %} -- this may no-op if the database does not require column expansion {% do adapter.expand_target_column_types(from_relation=staging_table, to_relation=target_relation) %} + {% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %} + {% if unique_key | is_list %} + {% for key in strategy.unique_key %} + {{ remove_columns.append('dbt_unique_key_' + loop.index|string) }} + {{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }} + {% endfor %} + {% endif %} + {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% do create_columns(target_relation, missing_columns) %} {% set source_columns = adapter.get_columns_in_relation(staging_table) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% set quoted_source_columns = [] %} @@ -117,23 +90,34 @@ ) %} - {% call statement_with_staging_table('main', staging_table) %} - {{ final_sql }} - {% endcall %} + {% endif %} - {% do persist_docs(target_relation, model, for_relation=True) %} - {% endif %} + {{ check_time_data_types(build_or_select_sql) }} - {% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode) %} - {% do apply_grants(target_relation, grant_config, should_revoke) %} + {% call statement('main') %} + {{ final_sql }} + {% endcall %} - {% do persist_constraints(target_relation, model) %} + {% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + {% if not target_relation_exists %} + {% do create_indexes(target_relation) %} + {% endif %} {{ run_hooks(post_hooks, inside_transaction=True) }} {{ adapter.commit() }} + {% if staging_table is defined %} + {% do post_snapshot(staging_table) %} + {% endif %} + + {% do persist_constraints(target_relation, model) %} + {{ run_hooks(post_hooks, inside_transaction=False) }} {{ return({'relations': [target_relation]}) }} diff --git a/dbt/include/databricks/macros/relations/constraints.sql b/dbt/include/databricks/macros/relations/constraints.sql index 34d5b4157..6d999823a 100644 --- a/dbt/include/databricks/macros/relations/constraints.sql +++ b/dbt/include/databricks/macros/relations/constraints.sql @@ -134,7 +134,7 @@ {% set column = model.get('columns', {}).get(column_name) %} {% if column %} {% set quoted_name = api.Column.get_name(column) %} - {% set stmt = "alter table " ~ relation ~ " change column " ~ quoted_name ~ " set not null " ~ (constraint.expression or "") ~ ";" %} + {% set stmt = "alter table " ~ relation.render() ~ " change column " ~ quoted_name ~ " set not null " ~ (constraint.expression or "") ~ ";" %} {% do statements.append(stmt) %} {% else %} {{ exceptions.warn('not_null constraint on invalid column: ' ~ column_name) }} @@ -170,7 +170,7 @@ {{ exceptions.raise_compiler_error("Constraint of type " ~ type ~ " with no `name` provided, and no md5 utility.") }} {% endif %} {% endif %} - {% set stmt = "alter table " ~ relation ~ " add constraint " ~ name ~ " primary key(" ~ joined_names ~ ");" %} + {% set stmt = "alter table " ~ relation.render() ~ " add constraint " ~ name ~ " primary key(" ~ joined_names ~ ");" %} {% do statements.append(stmt) %} {% elif type == 'foreign_key' %} @@ -191,7 +191,7 @@ {% endif %} {% endif %} - {% set stmt = "alter table " ~ relation ~ " add constraint " ~ name ~ " foreign key" ~ constraint.get('expression') %} + {% set stmt = "alter table " ~ relation.render() ~ " add constraint " ~ name ~ " foreign key" ~ constraint.get('expression') %} {% else %} {% set column_names = constraint.get('columns', []) %} {% if column and not column_names %} @@ -227,7 +227,7 @@ {% endif %} {% endif %} - {% set stmt = "alter table " ~ relation ~ " add constraint " ~ name ~ " foreign key(" ~ joined_names ~ ") references " ~ parent %} + {% set stmt = "alter table " ~ relation.render() ~ " add constraint " ~ name ~ " foreign key(" ~ joined_names ~ ") references " ~ parent %} {% set parent_columns = constraint.get('to_columns') %} {% if parent_columns %} {% set stmt = stmt ~ "(" ~ parent_columns|join(", ") ~ ")"%} @@ -251,7 +251,7 @@ {{ exceptions.raise_compiler_error("Constraint of type " ~ type ~ " with no `name` provided, and no md5 utility.") }} {% endif %} {% endif %} - {% set stmt = "alter table " ~ relation ~ " add constraint " ~ name ~ " " ~ expression ~ ";" %} + {% set stmt = "alter table " ~ relation.render() ~ " add constraint " ~ name ~ " " ~ expression ~ ";" %} {% do statements.append(stmt) %} {% elif constraint.get('warn_unsupported') %} {{ exceptions.warn("unsupported constraint type: " ~ constraint.type)}} diff --git a/dbt/include/databricks/macros/relations/liquid_clustering.sql b/dbt/include/databricks/macros/relations/liquid_clustering.sql index 3cf810488..b30269fd9 100644 --- a/dbt/include/databricks/macros/relations/liquid_clustering.sql +++ b/dbt/include/databricks/macros/relations/liquid_clustering.sql @@ -15,7 +15,7 @@ {%- set cols = [cols] -%} {%- endif -%} {%- call statement('set_cluster_by_columns') -%} - ALTER {{ target_relation.type }} {{ target_relation }} CLUSTER BY ({{ cols | join(', ') }}) + ALTER {{ target_relation.type }} {{ target_relation.render() }} CLUSTER BY ({{ cols | join(', ') }}) {%- endcall -%} {%- endif %} {%- endmacro -%} \ No newline at end of file diff --git a/dbt/include/databricks/macros/relations/materialized_view/alter.sql b/dbt/include/databricks/macros/relations/materialized_view/alter.sql index 41d9bed06..d406508d2 100644 --- a/dbt/include/databricks/macros/relations/materialized_view/alter.sql +++ b/dbt/include/databricks/macros/relations/materialized_view/alter.sql @@ -46,6 +46,6 @@ {% macro get_alter_mv_internal(relation, configuration_changes) %} {%- set refresh = configuration_changes.changes["refresh"] -%} -- Currently only schedule can be altered - ALTER MATERIALIZED VIEW {{ relation }} + ALTER MATERIALIZED VIEW {{ relation.render() }} {{ get_alter_sql_refresh_schedule(refresh.cron, refresh.time_zone_value, refresh.is_altered) -}} {% endmacro %} diff --git a/dbt/include/databricks/macros/relations/materialized_view/drop.sql b/dbt/include/databricks/macros/relations/materialized_view/drop.sql index f3774119d..4def47441 100644 --- a/dbt/include/databricks/macros/relations/materialized_view/drop.sql +++ b/dbt/include/databricks/macros/relations/materialized_view/drop.sql @@ -1,3 +1,3 @@ {% macro databricks__drop_materialized_view(relation) -%} - drop materialized view if exists {{ relation }} + drop materialized view if exists {{ relation.render() }} {%- endmacro %} diff --git a/dbt/include/databricks/macros/relations/materialized_view/refresh.sql b/dbt/include/databricks/macros/relations/materialized_view/refresh.sql index 10a8346be..9967eb21f 100644 --- a/dbt/include/databricks/macros/relations/materialized_view/refresh.sql +++ b/dbt/include/databricks/macros/relations/materialized_view/refresh.sql @@ -1,3 +1,3 @@ {% macro databricks__refresh_materialized_view(relation) -%} - refresh materialized view {{ relation }} + refresh materialized view {{ relation.render() }} {% endmacro %} diff --git a/dbt/include/databricks/macros/relations/streaming_table/drop.sql b/dbt/include/databricks/macros/relations/streaming_table/drop.sql index c8e0cd839..1cfc246a8 100644 --- a/dbt/include/databricks/macros/relations/streaming_table/drop.sql +++ b/dbt/include/databricks/macros/relations/streaming_table/drop.sql @@ -3,5 +3,5 @@ {%- endmacro %} {% macro default__drop_streaming_table(relation) -%} - drop table if exists {{ relation }} + drop table if exists {{ relation.render() }} {%- endmacro %} diff --git a/dbt/include/databricks/macros/relations/streaming_table/refresh.sql b/dbt/include/databricks/macros/relations/streaming_table/refresh.sql index 66b86f1f4..94c96d5cc 100644 --- a/dbt/include/databricks/macros/relations/streaming_table/refresh.sql +++ b/dbt/include/databricks/macros/relations/streaming_table/refresh.sql @@ -3,7 +3,7 @@ {%- endmacro %} {% macro databricks__refresh_streaming_table(relation, sql) -%} - create or refresh streaming table {{ relation }} + create or refresh streaming table {{ relation.render() }} as {{ sql }} {% endmacro %} diff --git a/dbt/include/databricks/macros/relations/table/create.sql b/dbt/include/databricks/macros/relations/table/create.sql index 9e74d57d6..b2aba2fec 100644 --- a/dbt/include/databricks/macros/relations/table/create.sql +++ b/dbt/include/databricks/macros/relations/table/create.sql @@ -5,9 +5,9 @@ {%- else -%} {%- set file_format = config.get('file_format', default='delta') -%} {% if file_format == 'delta' %} - create or replace table {{ relation }} + create or replace table {{ relation.render() }} {% else %} - create table {{ relation }} + create table {{ relation.render() }} {% endif %} {%- set contract_config = config.get('contract') -%} {% if contract_config and contract_config.enforced %} diff --git a/dbt/include/databricks/macros/relations/table/drop.sql b/dbt/include/databricks/macros/relations/table/drop.sql index 3a7d0ced0..7bce7cf46 100644 --- a/dbt/include/databricks/macros/relations/table/drop.sql +++ b/dbt/include/databricks/macros/relations/table/drop.sql @@ -1,3 +1,3 @@ {% macro databricks__drop_table(relation) -%} - drop table if exists {{ relation }} + drop table if exists {{ relation.render() }} {%- endmacro %} diff --git a/dbt/include/databricks/macros/relations/tags.sql b/dbt/include/databricks/macros/relations/tags.sql index 3467631df..fb39c3785 100644 --- a/dbt/include/databricks/macros/relations/tags.sql +++ b/dbt/include/databricks/macros/relations/tags.sql @@ -33,7 +33,7 @@ {%- endmacro -%} {% macro alter_set_tags(relation, tags) -%} - ALTER {{ relation.type }} {{ relation }} SET TAGS ( + ALTER {{ relation.type }} {{ relation.render() }} SET TAGS ( {% for tag in tags -%} '{{ tag }}' = '{{ tags[tag] }}' {%- if not loop.last %}, {% endif -%} {%- endfor %} @@ -41,7 +41,7 @@ {%- endmacro -%} {% macro alter_unset_tags(relation, tags) -%} - ALTER {{ relation.type }} {{ relation }} UNSET TAGS ( + ALTER {{ relation.type }} {{ relation.render() }} UNSET TAGS ( {% for tag in tags -%} '{{ tag }}' {%- if not loop.last %}, {%- endif %} {%- endfor %} diff --git a/dbt/include/databricks/macros/relations/tblproperties.sql b/dbt/include/databricks/macros/relations/tblproperties.sql index 34b6488f7..b11fd7b5c 100644 --- a/dbt/include/databricks/macros/relations/tblproperties.sql +++ b/dbt/include/databricks/macros/relations/tblproperties.sql @@ -17,7 +17,7 @@ {% set tblproperty_statment = databricks__tblproperties_clause(tblproperties) %} {% if tblproperty_statment %} {%- call statement('apply_tblproperties') -%} - ALTER {{ relation.type }} {{ relation }} SET {{ tblproperty_statment}} + ALTER {{ relation.type }} {{ relation.render() }} SET {{ tblproperty_statment}} {%- endcall -%} {% endif %} {%- endmacro -%} diff --git a/dbt/include/databricks/macros/relations/view/create.sql b/dbt/include/databricks/macros/relations/view/create.sql index 096e12de4..5399b4ef5 100644 --- a/dbt/include/databricks/macros/relations/view/create.sql +++ b/dbt/include/databricks/macros/relations/view/create.sql @@ -1,5 +1,5 @@ {% macro databricks__create_view_as(relation, sql) -%} - create or replace view {{ relation }} + create or replace view {{ relation.render() }} {% if config.persist_column_docs() -%} {% set model_columns = model.columns %} {% set query_columns = get_columns_in_query(sql) %} diff --git a/dbt/include/databricks/macros/relations/view/drop.sql b/dbt/include/databricks/macros/relations/view/drop.sql index aa199d760..9098c925f 100644 --- a/dbt/include/databricks/macros/relations/view/drop.sql +++ b/dbt/include/databricks/macros/relations/view/drop.sql @@ -1,3 +1,3 @@ {% macro databricks__drop_view(relation) -%} - drop view if exists {{ relation }} + drop view if exists {{ relation.render() }} {%- endmacro %} diff --git a/tests/functional/adapter/simple_snapshot/test_new_record_mode.py b/tests/functional/adapter/simple_snapshot/test_new_record_mode.py new file mode 100644 index 000000000..6b436a311 --- /dev/null +++ b/tests/functional/adapter/simple_snapshot/test_new_record_mode.py @@ -0,0 +1,74 @@ +import pytest + +from dbt.tests.adapter.simple_snapshot.new_record_mode import ( + _delete_sql, + _invalidate_sql, + _ref_snapshot_sql, + _seed_new_record_mode, + _snapshot_actual_sql, + _snapshots_yml, + _update_sql, +) +from dbt.tests.util import check_relations_equal, run_dbt + + +class TestDatabricksSnapshotNewRecordMode: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": _snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": _snapshots_yml, + "ref_snapshot.sql": _ref_snapshot_sql, + } + + @pytest.fixture(scope="class") + def seed_new_record_mode(self): + return _seed_new_record_mode + + @pytest.fixture(scope="class") + def invalidate_sql_1(self): + return _invalidate_sql.split(";", 1)[0].replace("BEGIN", "") + + @pytest.fixture(scope="class") + def invalidate_sql_2(self): + return _invalidate_sql.split(";", 1)[1].replace("END", "").replace(";", "") + + @pytest.fixture(scope="class") + def update_sql(self): + return _update_sql.replace("text", "string") + + @pytest.fixture(scope="class") + def delete_sql(self): + return _delete_sql + + def test_snapshot_new_record_mode( + self, project, seed_new_record_mode, invalidate_sql_1, invalidate_sql_2, update_sql + ): + for sql in ( + seed_new_record_mode.replace("text", "string") + .replace("TEXT", "STRING") + .replace("BEGIN", "") + .replace("END;", "") + .replace(" WITHOUT TIME ZONE", "") + .split(";") + ): + project.run_sql(sql) + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_sql_1) + project.run_sql(invalidate_sql_2) + project.run_sql(update_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + project.run_sql(_delete_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 diff --git a/tests/functional/adapter/simple_snapshot/test_various_configs.py b/tests/functional/adapter/simple_snapshot/test_various_configs.py new file mode 100644 index 000000000..18b82de00 --- /dev/null +++ b/tests/functional/adapter/simple_snapshot/test_various_configs.py @@ -0,0 +1,345 @@ +import datetime + +import pytest +from agate import Table + +from dbt.tests.adapter.simple_snapshot.fixtures import ( + create_multi_key_seed_sql, + create_multi_key_snapshot_expected_sql, + create_seed_sql, + create_snapshot_expected_sql, + model_seed_sql, + populate_multi_key_snapshot_expected_sql, + populate_snapshot_expected_sql, + populate_snapshot_expected_valid_to_current_sql, + ref_snapshot_sql, + seed_insert_sql, + seed_multi_key_insert_sql, + snapshot_actual_sql, + snapshots_multi_key_yml, + snapshots_no_column_names_yml, + snapshots_valid_to_current_yml, + snapshots_yml, + update_multi_key_sql, + update_sql, + update_with_current_sql, +) +from dbt.tests.util import ( + check_relations_equal, + get_manifest, + run_dbt, + run_dbt_and_capture, + run_sql_with_adapter, + update_config_file, +) + + +def text_replace(input: str) -> str: + return input.replace("TEXT", "STRING").replace("text", "string") + + +create_snapshot_expected_sql = text_replace(create_snapshot_expected_sql) +populate_snapshot_expected_sql = text_replace(populate_snapshot_expected_sql) +populate_snapshot_expected_valid_to_current_sql = text_replace( + populate_snapshot_expected_valid_to_current_sql +) +update_with_current_sql = text_replace(update_with_current_sql) +create_multi_key_snapshot_expected_sql = text_replace(create_multi_key_snapshot_expected_sql) +populate_multi_key_snapshot_expected_sql = text_replace(populate_multi_key_snapshot_expected_sql) +update_sql = text_replace(update_sql) +update_multi_key_sql = text_replace(update_multi_key_sql) + +invalidate_sql_1 = """ +-- update records 11 - 21. Change email and updated_at field +update {schema}.seed set + updated_at = updated_at + interval '1 hour', + email = case when id = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end +where id >= 10 and id <= 20 +""" + +invalidate_sql_2 = """ +-- invalidate records 11 - 21 +update {schema}.snapshot_expected set + test_valid_to = updated_at + interval '1 hour' +where id >= 10 and id <= 20; +""" + +invalidate_multi_key_sql_1 = """ +-- update records 11 - 21. Change email and updated_at field +update {schema}.seed set + updated_at = updated_at + interval '1 hour', + email = case when id1 = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end +where id1 >= 10 and id1 <= 20; +""" + +invalidate_multi_key_sql_2 = """ +-- invalidate records 11 - 21 +update {schema}.snapshot_expected set + test_valid_to = updated_at + interval '1 hour' +where id1 >= 10 and id1 <= 20; +""" + + +class BaseSnapshotColumnNames: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def test_snapshot_column_names(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_sql_1) + project.run_sql(invalidate_sql_2) + project.run_sql(update_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + +class BaseSnapshotColumnNamesFromDbtProject: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_no_column_names_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "snapshots": { + "test": { + "+snapshot_meta_column_names": { + "dbt_valid_to": "test_valid_to", + "dbt_valid_from": "test_valid_from", + "dbt_scd_id": "test_scd_id", + "dbt_updated_at": "test_updated_at", + } + } + } + } + + def test_snapshot_column_names_from_project(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_sql_1) + project.run_sql(invalidate_sql_2) + project.run_sql(update_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + +class BaseSnapshotInvalidColumnNames: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_no_column_names_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "snapshots": { + "test": { + "+snapshot_meta_column_names": { + "dbt_valid_to": "test_valid_to", + "dbt_valid_from": "test_valid_from", + "dbt_scd_id": "test_scd_id", + "dbt_updated_at": "test_updated_at", + } + } + } + } + + def test_snapshot_invalid_column_names(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + manifest = get_manifest(project.project_root) + snapshot_node = manifest.nodes["snapshot.test.snapshot_actual"] + snapshot_node.config.snapshot_meta_column_names == { + "dbt_valid_to": "test_valid_to", + "dbt_valid_from": "test_valid_from", + "dbt_scd_id": "test_scd_id", + "dbt_updated_at": "test_updated_at", + } + + project.run_sql(invalidate_sql_1) + project.run_sql(invalidate_sql_2) + project.run_sql(update_sql) + + # Change snapshot_meta_columns and look for an error + different_columns = { + "snapshots": { + "test": { + "+snapshot_meta_column_names": { + "dbt_valid_to": "test_valid_to", + "dbt_updated_at": "test_updated_at", + } + } + } + } + update_config_file(different_columns, "dbt_project.yml") + + results, log_output = run_dbt_and_capture(["snapshot"], expect_pass=False) + assert len(results) == 1 + assert "Compilation Error in snapshot snapshot_actual" in log_output + assert "Snapshot target is missing configured columns" in log_output + + +class BaseSnapshotDbtValidToCurrent: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_valid_to_current_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def test_valid_to_current(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_valid_to_current_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + original_snapshot: Table = run_sql_with_adapter( + project.adapter, + "select id, test_scd_id, test_valid_to from {schema}.snapshot_actual", + "all", + ) + assert original_snapshot[0][2] == datetime.datetime( + 2099, 12, 31, 0, 0, tzinfo=datetime.timezone.utc + ) + original_row = list( + filter(lambda x: x[1] == "61ecd07d17b8a4acb57d115eebb0e2c9", original_snapshot) + ) + assert original_row[0][2] == datetime.datetime( + 2099, 12, 31, 0, 0, tzinfo=datetime.timezone.utc + ) + + project.run_sql(invalidate_sql_1) + project.run_sql(invalidate_sql_2) + project.run_sql(update_with_current_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + updated_snapshot: Table = run_sql_with_adapter( + project.adapter, + "select id, test_scd_id, test_valid_to from {schema}.snapshot_actual", + "all", + ) + print(updated_snapshot) + assert updated_snapshot[0][2] == datetime.datetime( + 2099, 12, 31, 0, 0, tzinfo=datetime.timezone.utc + ) + # Original row that was updated now has a non-current (2099/12/31) date + original_row = list( + filter(lambda x: x[1] == "61ecd07d17b8a4acb57d115eebb0e2c9", updated_snapshot) + ) + assert original_row[0][2] == datetime.datetime( + 2016, 8, 20, 16, 44, 49, tzinfo=datetime.timezone.utc + ) + updated_row = list( + filter(lambda x: x[1] == "af1f803f2179869aeacb1bfe2b23c1df", updated_snapshot) + ) + + # Updated row has a current date + assert updated_row[0][2] == datetime.datetime( + 2099, 12, 31, 0, 0, tzinfo=datetime.timezone.utc + ) + + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + +# This uses snapshot_meta_column_names, yaml-only snapshot def, +# and multiple keys +class BaseSnapshotMultiUniqueKey: + @pytest.fixture(scope="class") + def models(self): + return { + "seed.sql": model_seed_sql, + "snapshots.yml": snapshots_multi_key_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def test_multi_column_unique_key(self, project): + project.run_sql(create_multi_key_seed_sql) + project.run_sql(create_multi_key_snapshot_expected_sql) + project.run_sql(seed_multi_key_insert_sql) + project.run_sql(populate_multi_key_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_multi_key_sql_1) + project.run_sql(invalidate_multi_key_sql_2) + project.run_sql(update_multi_key_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + +class TestDatabricksSnapshotColumnNames(BaseSnapshotColumnNames): + pass + + +class TestDatabricksSnapshotColumnNamesFromDbtProject(BaseSnapshotColumnNamesFromDbtProject): + pass + + +class TestDatabricksSnapshotInvalidColumnNames(BaseSnapshotInvalidColumnNames): + pass + + +class TestDatabricksSnapshotDbtValidToCurrent(BaseSnapshotDbtValidToCurrent): + pass + + +class TestDatabricksSnapshotMultiUniqueKey(BaseSnapshotMultiUniqueKey): + pass