From 4f7a40e3312b651fabc62fb86cdd4a6118353c57 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Tue, 17 Sep 2024 10:05:19 -0400 Subject: [PATCH 01/29] changelog --- .changes/unreleased/Features-20240917-100505.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20240917-100505.yaml diff --git a/.changes/unreleased/Features-20240917-100505.yaml b/.changes/unreleased/Features-20240917-100505.yaml new file mode 100644 index 000000000..1f55ff23c --- /dev/null +++ b/.changes/unreleased/Features-20240917-100505.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add support for Iceberg table format in Dynamic ta +time: 2024-09-17T10:05:05.609859-04:00 +custom: + Author: mikealfare + Issue: "123" From 71265922108043b6c01c9e50262a47476dda41c4 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Tue, 17 Sep 2024 10:08:32 -0400 Subject: [PATCH 02/29] changelog --- .changes/unreleased/Features-20240917-100505.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.changes/unreleased/Features-20240917-100505.yaml b/.changes/unreleased/Features-20240917-100505.yaml index 1f55ff23c..22cabc904 100644 --- a/.changes/unreleased/Features-20240917-100505.yaml +++ b/.changes/unreleased/Features-20240917-100505.yaml @@ -1,6 +1,6 @@ kind: Features -body: Add support for Iceberg table format in Dynamic ta +body: Add support for Iceberg table format in Dynamic Tables time: 2024-09-17T10:05:05.609859-04:00 custom: Author: mikealfare - Issue: "123" + Issue: "1183" From 7d05939adec66fa187fa8eb45811da331d35b87d Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Tue, 17 Sep 2024 15:35:40 -0400 Subject: [PATCH 03/29] first draft for iceberg dynamic tables --- dbt/adapters/snowflake/relation.py | 7 ++ .../snowflake/relation_configs/__init__.py | 6 +- .../snowflake/relation_configs/catalog.py | 117 ++++++++++++++++++ .../relation_configs/dynamic_table.py | 20 ++- .../snowflake/relation_configs/formats.py | 5 + .../macros/relations/dynamic_table/alter.sql | 24 ++-- .../macros/relations/dynamic_table/create.sql | 95 +++++++++++--- .../relations/dynamic_table/describe.sql | 51 +++++--- .../macros/relations/dynamic_table/drop.sql | 2 +- .../relations/dynamic_table/refresh.sql | 4 +- .../relations/dynamic_table/replace.sql | 94 +++++++++++--- .../snowflake/macros/utils/optional.sql | 3 + 12 files changed, 362 insertions(+), 66 deletions(-) create mode 100644 dbt/adapters/snowflake/relation_configs/catalog.py create mode 100644 dbt/include/snowflake/macros/utils/optional.sql diff --git a/dbt/adapters/snowflake/relation.py b/dbt/adapters/snowflake/relation.py index 224b2b75e..f56bfe828 100644 --- a/dbt/adapters/snowflake/relation.py +++ b/dbt/adapters/snowflake/relation.py @@ -17,6 +17,7 @@ from dbt_common.events.functions import fire_event, warn_or_error from dbt.adapters.snowflake.relation_configs import ( + SnowflakeCatalogConfigChange, SnowflakeDynamicTableConfig, SnowflakeDynamicTableConfigChangeset, SnowflakeDynamicTableRefreshModeConfigChange, @@ -114,6 +115,12 @@ def dynamic_table_config_changeset( context=new_dynamic_table.refresh_mode, ) + if new_dynamic_table.catalog.table_format != existing_dynamic_table.catalog.table_format: + config_change_collection.catalog = SnowflakeCatalogConfigChange( + action=RelationConfigChangeAction.create, + context=new_dynamic_table.catalog, + ) + if config_change_collection.has_changes: return config_change_collection return None diff --git a/dbt/adapters/snowflake/relation_configs/__init__.py b/dbt/adapters/snowflake/relation_configs/__init__.py index 61941ab50..fec9d8a54 100644 --- a/dbt/adapters/snowflake/relation_configs/__init__.py +++ b/dbt/adapters/snowflake/relation_configs/__init__.py @@ -1,3 +1,7 @@ +from dbt.adapters.snowflake.relation_configs.catalog import ( + SnowflakeCatalogConfig, + SnowflakeCatalogConfigChange, +) from dbt.adapters.snowflake.relation_configs.dynamic_table import ( SnowflakeDynamicTableConfig, SnowflakeDynamicTableConfigChangeset, @@ -5,9 +9,9 @@ SnowflakeDynamicTableWarehouseConfigChange, SnowflakeDynamicTableTargetLagConfigChange, ) +from dbt.adapters.snowflake.relation_configs.formats import TableFormat from dbt.adapters.snowflake.relation_configs.policies import ( SnowflakeIncludePolicy, SnowflakeQuotePolicy, SnowflakeRelationType, ) -from dbt.adapters.snowflake.relation_configs.formats import TableFormat diff --git a/dbt/adapters/snowflake/relation_configs/catalog.py b/dbt/adapters/snowflake/relation_configs/catalog.py new file mode 100644 index 000000000..d2c8db3fc --- /dev/null +++ b/dbt/adapters/snowflake/relation_configs/catalog.py @@ -0,0 +1,117 @@ +from dataclasses import dataclass +from typing import Any, Dict, Optional, TYPE_CHECKING, Set + +if TYPE_CHECKING: + import agate + +from dbt.adapters.relation_configs import ( + RelationConfigChange, + RelationResults, + RelationConfigValidationMixin, + RelationConfigValidationRule, +) +from dbt.adapters.contracts.relation import RelationConfig +from dbt_common.exceptions import DbtConfigError +from typing_extensions import Self + +from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase +from dbt.adapters.snowflake.relation_configs.formats import TableFormat + + +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class SnowflakeCatalogConfig(SnowflakeRelationConfigBase, RelationConfigValidationMixin): + """ + This config follow the specs found here: + https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table + https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table + + The following parameters are configurable by dbt: + - table_format: format for interfacing with the table, e.g. default, iceberg + - external_volume: name of the external volume in Snowflake + - base_location: the directory within the external volume that contains the data + *Note*: This directory can’t be changed after you create a table. + + The following parameters are not currently configurable by dbt: + - name: snowflake + """ + + table_format: Optional[TableFormat] = TableFormat.default() + name: Optional[str] = "SNOWFLAKE" + external_volume: Optional[str] = None + base_location: Optional[str] = None + + def validation_rules(self) -> Set[RelationConfigValidationRule]: + return { + RelationConfigValidationRule( + (self.table_format == "default") + or (self.table_format == "iceberg" and self.base_location is not None), + DbtConfigError("Please provide a `base_location` when using iceberg"), + ), + RelationConfigValidationRule( + (self.table_format == "default") + or (self.table_format == "iceberg" and self.name == "SNOWFLAKE"), + DbtConfigError( + "Only Snowflake catalogs are currently supported when using iceberg" + ), + ), + } + + @classmethod + def from_dict(cls, config_dict: Dict[str, Any]) -> Self: + kwargs_dict = { + "name": config_dict.get("name"), + "external_volume": config_dict.get("external_volume"), + "base_location": config_dict.get("base_location"), + } + if table_format := config_dict.get("table_format"): + kwargs_dict.update({"table_format": TableFormat(table_format)}) + return super().from_dict(kwargs_dict) + + @classmethod + def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]: + + if relation_config.config.extra.get("table_format") is None: + return {"table_format": "default"} + + config_dict = { + "table_format": relation_config.config.extra.get("table_format"), + "name": "SNOWFLAKE", # this is not currently configurable + } + + if external_volume := relation_config.config.extra.get("external_volume"): + config_dict.update({"external_volume": external_volume}) + + if base_location := relation_config.config.extra.get("base_location_subpath"): + config_dict.update({"base_location": base_location}) + + return config_dict + + @classmethod + def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: + catalog_results: "agate.Table" = relation_results["catalog"] + + if catalog_results is None or len(catalog_results) == 0: + return {"table_format": "default"} + + catalog: "agate.Row" = catalog_results.rows[0] + config_dict = {"table_format": "iceberg"} + + if name := catalog.get("catalog_name"): + config_dict.update({"name": name}) + + if external_volume := catalog.get("external_volume_name"): + config_dict.update({"external_volume": external_volume}) + + if base_location := catalog.get("base_location"): + config_dict.update({"base_location": base_location}) + + return config_dict + + +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class SnowflakeCatalogConfigChange(RelationConfigChange): + context: Optional[str] = None + + @property + def requires_full_refresh(self) -> bool: + return True diff --git a/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt/adapters/snowflake/relation_configs/dynamic_table.py index 2e227d3a4..7bae857c2 100644 --- a/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -8,6 +8,11 @@ from typing_extensions import Self from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase +from dbt.adapters.snowflake.relation_configs.catalog import ( + SnowflakeCatalogConfig, + SnowflakeCatalogConfigChange, +) + if TYPE_CHECKING: import agate @@ -55,11 +60,12 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): query: str target_lag: str snowflake_warehouse: str + catalog: SnowflakeCatalogConfig refresh_mode: Optional[RefreshMode] = RefreshMode.default() initialize: Optional[Initialize] = Initialize.default() @classmethod - def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig": + def from_dict(cls, config_dict: Dict[str, Any]) -> Self: kwargs_dict = { "name": cls._render_part(ComponentName.Identifier, config_dict.get("name")), "schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")), @@ -69,12 +75,12 @@ def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig": "query": config_dict.get("query"), "target_lag": config_dict.get("target_lag"), "snowflake_warehouse": config_dict.get("snowflake_warehouse"), + "catalog": SnowflakeCatalogConfig.from_dict(config_dict), "refresh_mode": config_dict.get("refresh_mode"), "initialize": config_dict.get("initialize"), } - dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict) - return dynamic_table + return super().from_dict(kwargs_dict) @classmethod def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]: @@ -85,6 +91,7 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any "query": relation_config.compiled_code, "target_lag": relation_config.config.extra.get("target_lag"), "snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"), + "catalog": SnowflakeCatalogConfig.parse_relation_config(relation_config), } if refresh_mode := relation_config.config.extra.get("refresh_mode"): @@ -96,7 +103,7 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any return config_dict @classmethod - def parse_relation_results(cls, relation_results: RelationResults) -> Dict: + def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: dynamic_table: "agate.Row" = relation_results["dynamic_table"].rows[0] config_dict = { @@ -106,6 +113,7 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict: "query": dynamic_table.get("text"), "target_lag": dynamic_table.get("target_lag"), "snowflake_warehouse": dynamic_table.get("warehouse"), + "catalog": SnowflakeCatalogConfig.parse_relation_results(relation_results), "refresh_mode": dynamic_table.get("refresh_mode"), # we don't get initialize since that's a one-time scheduler attribute, not a DT attribute } @@ -145,6 +153,7 @@ class SnowflakeDynamicTableConfigChangeset: target_lag: Optional[SnowflakeDynamicTableTargetLagConfigChange] = None snowflake_warehouse: Optional[SnowflakeDynamicTableWarehouseConfigChange] = None refresh_mode: Optional[SnowflakeDynamicTableRefreshModeConfigChange] = None + catalog: Optional[SnowflakeCatalogConfigChange] = None @property def requires_full_refresh(self) -> bool: @@ -157,9 +166,10 @@ def requires_full_refresh(self) -> bool: else False ), self.refresh_mode.requires_full_refresh if self.refresh_mode else False, + self.catalog.requires_full_refresh if self.catalog else False, ] ) @property def has_changes(self) -> bool: - return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode]) + return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode, self.catalog]) diff --git a/dbt/adapters/snowflake/relation_configs/formats.py b/dbt/adapters/snowflake/relation_configs/formats.py index 460241d9d..b6bb0bdda 100644 --- a/dbt/adapters/snowflake/relation_configs/formats.py +++ b/dbt/adapters/snowflake/relation_configs/formats.py @@ -1,4 +1,5 @@ from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11 +from typing_extensions import Self class TableFormat(StrEnum): @@ -10,5 +11,9 @@ class TableFormat(StrEnum): DEFAULT = "default" ICEBERG = "iceberg" + @classmethod + def default(cls) -> Self: + return cls("default") + def __str__(self): return self.value diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/alter.sql b/dbt/include/snowflake/macros/relations/dynamic_table/alter.sql index f4b1be699..633d74700 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/alter.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/alter.sql @@ -4,22 +4,22 @@ target_relation, sql ) -%} - {{- log('Applying ALTER to: ' ~ existing_relation) -}} +{{- log('Applying ALTER to: ' ~ existing_relation) -}} - {% if configuration_changes.requires_full_refresh %} - {{- get_replace_sql(existing_relation, target_relation, sql) -}} +{% if configuration_changes.requires_full_refresh %} +{{- get_replace_sql(existing_relation, target_relation, sql) -}} - {% else %} +{% else %} - {%- set target_lag = configuration_changes.target_lag -%} - {%- if target_lag -%}{{- log('Applying UPDATE TARGET_LAG to: ' ~ existing_relation) -}}{%- endif -%} - {%- set snowflake_warehouse = configuration_changes.snowflake_warehouse -%} - {%- if snowflake_warehouse -%}{{- log('Applying UPDATE WAREHOUSE to: ' ~ existing_relation) -}}{%- endif -%} +{%- set target_lag = configuration_changes.target_lag -%} +{%- if target_lag -%}{{- log('Applying UPDATE TARGET_LAG to: ' ~ existing_relation) -}}{%- endif -%} +{%- set snowflake_warehouse = configuration_changes.snowflake_warehouse -%} +{%- if snowflake_warehouse -%}{{- log('Applying UPDATE WAREHOUSE to: ' ~ existing_relation) -}}{%- endif -%} - alter dynamic table {{ existing_relation }} set - {% if target_lag %}target_lag = '{{ target_lag.context }}'{% endif %} - {% if snowflake_warehouse %}warehouse = {{ snowflake_warehouse.context }}{% endif %} +alter dynamic table {{ existing_relation }} set + {% if target_lag %}target_lag = '{{ target_lag.context }}'{% endif %} + {% if snowflake_warehouse %}warehouse = {{ snowflake_warehouse.context }}{% endif %} - {%- endif -%} +{%- endif -%} {%- endmacro %} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql index 253788779..4220362f1 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql @@ -1,18 +1,85 @@ {% macro snowflake__get_create_dynamic_table_as_sql(relation, sql) -%} +{#- + Produce DDL that creates a dynamic table - {%- set dynamic_table = relation.from_config(config.model) -%} - - create dynamic table {{ relation }} - target_lag = '{{ dynamic_table.target_lag }}' - warehouse = {{ dynamic_table.snowflake_warehouse }} - {% if dynamic_table.refresh_mode %} - refresh_mode = {{ dynamic_table.refresh_mode }} - {% endif %} - {% if dynamic_table.initialize %} - initialize = {{ dynamic_table.initialize }} - {% endif %} - as ( - {{ sql }} - ) + Args: + - relation: Union[SnowflakeRelation, str] + - SnowflakeRelation - required for relation.render() + - str - is already the rendered relation name + - sql: str - the code defining the model + Globals: + - config: NodeConfig - contains the attribution required to produce a SnowflakeDynamicTableConfig + Returns: + A valid DDL statement which will result in a new dynamic table. +-#} + +{%- set dynamic_table = relation.from_config(config.model) -%} + +{%- if dynamic_table.catalog.table_format = "iceberg" -%} +{{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} +{%- else -%} +{{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} +{%- endif -%} + +{%- endmacro %} + + +{% macro _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) -%} +{#- + Produce DDL that creates a standard dynamic table + + This follows the syntax outlined here: + https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#syntax + + Args: + - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table + - relation: Union[SnowflakeRelation, str] + - SnowflakeRelation - required for relation.render() + - str - is already the rendered relation name + - sql: str - the code defining the model + Returns: + A valid DDL statement which will result in a new dynamic standard table. +-#} + +create dynamic table {{ relation }} + target_lag = '{{ dynamic_table.target_lag }}' + warehouse = {{ dynamic_table.snowflake_warehouse }} + {{ optional('refresh_mode', dynamic_table.refresh_mode) }} + {{ optional('initialize', dynamic_table.initialize) }} + as ( + {{ sql }} + ) + +{%- endmacro %} + + +{% macro _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) -%} +{#- + Produce DDL that creates a dynamic iceberg table + + This follows the syntax outlined here: + https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table + + Args: + - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table + - relation: Union[SnowflakeRelation, str] + - SnowflakeRelation - required for relation.render() + - str - is already the rendered relation name + - sql: str - the code defining the model + Returns: + A valid DDL statement which will result in a new dynamic iceberg table. +-#} + +create dynamic iceberg table {{ relation }} + target_lag = '{{ dynamic_table.target_lag }}' + warehouse = {{ dynamic_table.snowflake_warehouse }} + {{ optional('external_volume', dynamic_table.catalog.external_volume) }} + {{ optional('catalog', dynamic_table.catalog.name) }} + base_location = {{ dynamic_table.catalog.base_location }} + {{ optional('refresh_mode', dynamic_table.refresh_mode) }} + {{ optional('initialize', dynamic_table.initialize) }} + as ( + {{ sql }} + ) {%- endmacro %} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql index cc79328fe..5815231a1 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql @@ -1,20 +1,37 @@ {% macro snowflake__describe_dynamic_table(relation) %} - {%- set _dynamic_table_sql -%} - show dynamic tables - like '{{ relation.identifier }}' - in schema {{ relation.database }}.{{ relation.schema }} - ; - select - "name", - "schema_name", - "database_name", - "text", - "target_lag", - "warehouse", - "refresh_mode" - from table(result_scan(last_query_id())) - {%- endset %} - {% set _dynamic_table = run_query(_dynamic_table_sql) %} +{%- set _dynamic_table_sql -%} +show dynamic tables + like '{{ relation.identifier }}' + in schema {{ relation.database }}.{{ relation.schema }} +; +select + "name", + "schema_name", + "database_name", + "text", + "target_lag", + "warehouse", + "refresh_mode" +from table(result_scan(last_query_id())) +{%- endset %} +{% set _dynamic_table = run_query(_dynamic_table_sql) %} - {% do return({'dynamic_table': _dynamic_table}) %} +{% if adapter.behavior.enable_iceberg_materializations %} +{%- set _catalog_sql -%} +show iceberg tables + like '{{ relation.identifier }}' + in schema {{ relation.database }}.{{ relation.schema }} +; +select + "catalog_name", + "external_volume_name", + "base_location" +from table(result_scan(last_query_id())) +{%- endset %} +{% set _catalog = run_query(_catalog_sql) %} +{% else %} +{% set _catalog = none %} +{% endif %} + +{% do return({'dynamic_table': _dynamic_table, 'catalog': _catalog}) %} {% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/drop.sql b/dbt/include/snowflake/macros/relations/dynamic_table/drop.sql index 577bd06a0..7b3a83409 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/drop.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/drop.sql @@ -1,3 +1,3 @@ {% macro snowflake__get_drop_dynamic_table_sql(relation) %} - drop dynamic table if exists {{ relation }} +drop dynamic table if exists {{ relation }} {% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/refresh.sql b/dbt/include/snowflake/macros/relations/dynamic_table/refresh.sql index 5c6af1bda..06650a826 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/refresh.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/refresh.sql @@ -1,5 +1,5 @@ {% macro snowflake__refresh_dynamic_table(relation) -%} - {{- log('Applying REFRESH to: ' ~ relation) -}} +{{- log('Applying REFRESH to: ' ~ relation) -}} - alter dynamic table {{ relation }} refresh +alter dynamic table {{ relation }} refresh {%- endmacro %} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql index dbe27d66e..2e3bb12a5 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql @@ -1,18 +1,84 @@ {% macro snowflake__get_replace_dynamic_table_sql(relation, sql) -%} +{#- + Produce DDL that replaces a dynamic table with a new dynamic table - {%- set dynamic_table = relation.from_config(config.model) -%} - - create or replace dynamic table {{ relation }} - target_lag = '{{ dynamic_table.target_lag }}' - warehouse = {{ dynamic_table.snowflake_warehouse }} - {% if dynamic_table.refresh_mode %} - refresh_mode = {{ dynamic_table.refresh_mode }} - {% endif %} - {% if dynamic_table.initialize %} - initialize = {{ dynamic_table.initialize }} - {% endif %} - as ( - {{ sql }} - ) + Args: + - relation: Union[SnowflakeRelation, str] + - SnowflakeRelation - required for relation.render() + - str - is already the rendered relation name + - sql: str - the code defining the model + Globals: + - config: NodeConfig - contains the attribution required to produce a SnowflakeDynamicTableConfig + Returns: + A valid DDL statement which will result in a new dynamic table. +-#} + +{%- set dynamic_table = relation.from_config(config.model) -%} + +{%- if dynamic_table.catalog.table_format = "iceberg" -%} +{{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} +{%- else -%} +{{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} +{%- endif -%} + +{%- endmacro %} + +{% macro _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) -%} +{#- + Produce DDL that replaces a standard dynamic table with a new standard dynamic table + + This follows the syntax outlined here: + https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#syntax + + Args: + - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table + - relation: Union[SnowflakeRelation, str] + - SnowflakeRelation - required for relation.render() + - str - is already the rendered relation name + - sql: str - the code defining the model + Returns: + A valid DDL statement which will result in a new dynamic standard table. +-#} + +create or replace dynamic table {{ relation }} + target_lag = '{{ dynamic_table.target_lag }}' + warehouse = {{ dynamic_table.snowflake_warehouse }} + {{ optional('refresh_mode', dynamic_table.refresh_mode) }} + {{ optional('initialize', dynamic_table.initialize) }} + as ( + {{ sql }} + ) + +{%- endmacro %} + + +{% macro _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) -%} +{#- + Produce DDL that replaces a dynamic iceberg table with a new dynamic iceberg table + + This follows the syntax outlined here: + https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table + + Args: + - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table + - relation: Union[SnowflakeRelation, str] + - SnowflakeRelation - required for relation.render() + - str - is already the rendered relation name + - sql: str - the code defining the model + Returns: + A valid DDL statement which will result in a new dynamic iceberg table. +-#} + +create or replace dynamic iceberg table {{ relation }} + target_lag = '{{ dynamic_table.target_lag }}' + warehouse = {{ dynamic_table.snowflake_warehouse }} + {{ optional('external_volume', dynamic_table.catalog.external_volume) }} + {{ optional('catalog', dynamic_table.catalog.name) }} + base_location = {{ dynamic_table.catalog.base_location }} + {{ optional('refresh_mode', dynamic_table.refresh_mode) }} + {{ optional('initialize', dynamic_table.initialize) }} + as ( + {{ sql }} + ) {%- endmacro %} diff --git a/dbt/include/snowflake/macros/utils/optional.sql b/dbt/include/snowflake/macros/utils/optional.sql new file mode 100644 index 000000000..95f53d4dd --- /dev/null +++ b/dbt/include/snowflake/macros/utils/optional.sql @@ -0,0 +1,3 @@ +{% macro optional(name, value, quote_char = '') %} +{% if value is not none %}{{ name }} = {{ quote_char }}{{ value }}{{ quote_char }}{% endif %} +{% endmacro %} From 01000a5ee9a360d383e570adca8ac1f9608d3d2a Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Tue, 17 Sep 2024 16:19:04 -0400 Subject: [PATCH 04/29] fix comparison operator --- dbt/include/snowflake/macros/relations/dynamic_table/create.sql | 2 +- .../snowflake/macros/relations/dynamic_table/replace.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql index 4220362f1..8945440c5 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql @@ -15,7 +15,7 @@ {%- set dynamic_table = relation.from_config(config.model) -%} -{%- if dynamic_table.catalog.table_format = "iceberg" -%} +{%- if dynamic_table.catalog.table_format == 'iceberg' -%} {{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} {%- else -%} {{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql index 2e3bb12a5..73b3ba382 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql @@ -15,7 +15,7 @@ {%- set dynamic_table = relation.from_config(config.model) -%} -{%- if dynamic_table.catalog.table_format = "iceberg" -%} +{%- if dynamic_table.catalog.table_format == 'iceberg' -%} {{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} {%- else -%} {{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} From 1b9c00b95d23e5cfa3b8521fdc9fd9ed41d8b29d Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Tue, 17 Sep 2024 18:39:15 -0400 Subject: [PATCH 05/29] fix validation rules on catalog --- dbt/adapters/snowflake/relation_configs/catalog.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dbt/adapters/snowflake/relation_configs/catalog.py b/dbt/adapters/snowflake/relation_configs/catalog.py index d2c8db3fc..c4d7b7310 100644 --- a/dbt/adapters/snowflake/relation_configs/catalog.py +++ b/dbt/adapters/snowflake/relation_configs/catalog.py @@ -40,6 +40,7 @@ class SnowflakeCatalogConfig(SnowflakeRelationConfigBase, RelationConfigValidati external_volume: Optional[str] = None base_location: Optional[str] = None + @property def validation_rules(self) -> Set[RelationConfigValidationRule]: return { RelationConfigValidationRule( From cdc90b5cdbfc5034dd23ca7de196f88dfffeb7e2 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 18 Sep 2024 14:04:46 -0400 Subject: [PATCH 06/29] consider the entire catalog for a config change --- dbt/adapters/snowflake/relation.py | 2 +- dbt/adapters/snowflake/relation_configs/catalog.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/snowflake/relation.py b/dbt/adapters/snowflake/relation.py index f56bfe828..148ae0869 100644 --- a/dbt/adapters/snowflake/relation.py +++ b/dbt/adapters/snowflake/relation.py @@ -115,7 +115,7 @@ def dynamic_table_config_changeset( context=new_dynamic_table.refresh_mode, ) - if new_dynamic_table.catalog.table_format != existing_dynamic_table.catalog.table_format: + if new_dynamic_table.catalog != existing_dynamic_table.catalog: config_change_collection.catalog = SnowflakeCatalogConfigChange( action=RelationConfigChangeAction.create, context=new_dynamic_table.catalog, diff --git a/dbt/adapters/snowflake/relation_configs/catalog.py b/dbt/adapters/snowflake/relation_configs/catalog.py index c4d7b7310..83e2c860c 100644 --- a/dbt/adapters/snowflake/relation_configs/catalog.py +++ b/dbt/adapters/snowflake/relation_configs/catalog.py @@ -94,6 +94,9 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, if catalog_results is None or len(catalog_results) == 0: return {"table_format": "default"} + # for now, if we get catalog results, it's because this is an iceberg table + # this is because we only run `show iceberg tables` to get catalog metadata + # this will need to be updated once this is in `show objects` catalog: "agate.Row" = catalog_results.rows[0] config_dict = {"table_format": "iceberg"} @@ -111,7 +114,7 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, @dataclass(frozen=True, eq=True, unsafe_hash=True) class SnowflakeCatalogConfigChange(RelationConfigChange): - context: Optional[str] = None + context: Optional[SnowflakeCatalogConfig] = None @property def requires_full_refresh(self) -> bool: From e65bba2ca74d4f8c488410d1fdca6ca96ed41e5a Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 18 Sep 2024 14:05:46 -0400 Subject: [PATCH 07/29] remove is_dynamic-related guards as that is ga now --- dbt/adapters/snowflake/impl.py | 46 +++++++++++----------------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index a6297887d..d4f58c404 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -247,28 +247,19 @@ def list_relations_without_caching( return [] raise - # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory - columns = ["database_name", "schema_name", "name", "kind"] - if "is_dynamic" in schema_objects.column_names: - columns.append("is_dynamic") - if "is_iceberg" in schema_objects.column_names: - columns.append("is_iceberg") - - return [self._parse_list_relations_result(obj) for obj in schema_objects.select(columns)] + return [self._parse_list_relations_result(obj) for obj in schema_objects] def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation: - # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory - # this can be reduced to always including `is_iceberg` once Snowflake adds it to show objects - try: - if self.behavior.enable_iceberg_materializations.no_warn: - database, schema, identifier, relation_type, is_dynamic, is_iceberg = result - else: - database, schema, identifier, relation_type, is_dynamic = result - except ValueError: - database, schema, identifier, relation_type = result - is_dynamic = "N" - if self.behavior.enable_iceberg_materializations.no_warn: - is_iceberg = "N" + # this can be collapsed once Snowflake adds is_iceberg to show objects + if self.behavior.enable_iceberg_materializations: + columns = ["database_name", "schema_name", "name", "kind", "is_dynamic", "is_iceberg"] + database, schema, identifier, relation_type, is_dynamic, is_iceberg = result.select( + columns + ) + else: + columns = ["database_name", "schema_name", "name", "kind", "is_dynamic"] + database, schema, identifier, relation_type, is_dynamic = result.select(columns) + is_iceberg = "NO" try: relation_type = self.Relation.get_relation_type(relation_type.lower()) @@ -278,22 +269,15 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation if relation_type == self.Relation.Table and is_dynamic == "Y": relation_type = self.Relation.DynamicTable - # This line is the main gate on supporting Iceberg materializations. Pass forward a default - # table format, and no downstream table macros can build iceberg relations. - table_format: str = ( - TableFormat.ICEBERG - if self.behavior.enable_iceberg_materializations.no_warn and is_iceberg in ("Y", "YES") - else TableFormat.DEFAULT - ) - quote_policy = {"database": True, "schema": True, "identifier": True} - return self.Relation.create( database=database, schema=schema, identifier=identifier, type=relation_type, - table_format=table_format, - quote_policy=quote_policy, + table_format=( + TableFormat.ICEBERG if is_iceberg in ("Y", "YES") else TableFormat.DEFAULT + ), + quote_policy={"database": True, "schema": True, "identifier": True}, ) def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str: From 2ffe6ab373b4b191b0a563b2b4476e1ad33e1afc Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 18 Sep 2024 14:05:55 -0400 Subject: [PATCH 08/29] formatting --- dbt/include/snowflake/macros/adapters.sql | 1 + .../snowflake/macros/relations/dynamic_table/describe.sql | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index aa8895819..bf9fe8483 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -137,6 +137,7 @@ {% macro snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000) %} {%- set max_total_results = max_results_per_iter * max_iter -%} + {%- set sql -%} {% if schema_relation is string %} show objects in {{ schema_relation }} limit {{ max_results_per_iter }}; diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql index 5815231a1..681806a7a 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql @@ -16,7 +16,7 @@ from table(result_scan(last_query_id())) {%- endset %} {% set _dynamic_table = run_query(_dynamic_table_sql) %} -{% if adapter.behavior.enable_iceberg_materializations %} +{% if adapter.behavior.enable_iceberg_materializations.no_warn %} {%- set _catalog_sql -%} show iceberg tables like '{{ relation.identifier }}' From f9f1d6a8784ffb39362549601215468ce4222f37 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 18 Sep 2024 15:24:02 -0400 Subject: [PATCH 09/29] formatting --- dbt/include/snowflake/macros/adapters.sql | 1 - .../macros/relations/dynamic_table/alter.sql | 24 +++++++++---------- .../macros/relations/dynamic_table/drop.sql | 2 +- .../relations/dynamic_table/refresh.sql | 4 ++-- 4 files changed, 15 insertions(+), 16 deletions(-) diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index bf9fe8483..aa8895819 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -137,7 +137,6 @@ {% macro snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000) %} {%- set max_total_results = max_results_per_iter * max_iter -%} - {%- set sql -%} {% if schema_relation is string %} show objects in {{ schema_relation }} limit {{ max_results_per_iter }}; diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/alter.sql b/dbt/include/snowflake/macros/relations/dynamic_table/alter.sql index 633d74700..f4b1be699 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/alter.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/alter.sql @@ -4,22 +4,22 @@ target_relation, sql ) -%} -{{- log('Applying ALTER to: ' ~ existing_relation) -}} + {{- log('Applying ALTER to: ' ~ existing_relation) -}} -{% if configuration_changes.requires_full_refresh %} -{{- get_replace_sql(existing_relation, target_relation, sql) -}} + {% if configuration_changes.requires_full_refresh %} + {{- get_replace_sql(existing_relation, target_relation, sql) -}} -{% else %} + {% else %} -{%- set target_lag = configuration_changes.target_lag -%} -{%- if target_lag -%}{{- log('Applying UPDATE TARGET_LAG to: ' ~ existing_relation) -}}{%- endif -%} -{%- set snowflake_warehouse = configuration_changes.snowflake_warehouse -%} -{%- if snowflake_warehouse -%}{{- log('Applying UPDATE WAREHOUSE to: ' ~ existing_relation) -}}{%- endif -%} + {%- set target_lag = configuration_changes.target_lag -%} + {%- if target_lag -%}{{- log('Applying UPDATE TARGET_LAG to: ' ~ existing_relation) -}}{%- endif -%} + {%- set snowflake_warehouse = configuration_changes.snowflake_warehouse -%} + {%- if snowflake_warehouse -%}{{- log('Applying UPDATE WAREHOUSE to: ' ~ existing_relation) -}}{%- endif -%} -alter dynamic table {{ existing_relation }} set - {% if target_lag %}target_lag = '{{ target_lag.context }}'{% endif %} - {% if snowflake_warehouse %}warehouse = {{ snowflake_warehouse.context }}{% endif %} + alter dynamic table {{ existing_relation }} set + {% if target_lag %}target_lag = '{{ target_lag.context }}'{% endif %} + {% if snowflake_warehouse %}warehouse = {{ snowflake_warehouse.context }}{% endif %} -{%- endif -%} + {%- endif -%} {%- endmacro %} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/drop.sql b/dbt/include/snowflake/macros/relations/dynamic_table/drop.sql index 7b3a83409..577bd06a0 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/drop.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/drop.sql @@ -1,3 +1,3 @@ {% macro snowflake__get_drop_dynamic_table_sql(relation) %} -drop dynamic table if exists {{ relation }} + drop dynamic table if exists {{ relation }} {% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/refresh.sql b/dbt/include/snowflake/macros/relations/dynamic_table/refresh.sql index 06650a826..5c6af1bda 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/refresh.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/refresh.sql @@ -1,5 +1,5 @@ {% macro snowflake__refresh_dynamic_table(relation) -%} -{{- log('Applying REFRESH to: ' ~ relation) -}} + {{- log('Applying REFRESH to: ' ~ relation) -}} -alter dynamic table {{ relation }} refresh + alter dynamic table {{ relation }} refresh {%- endmacro %} From 3d6a76a16e7015669fe6e3658776459591a5dfa6 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 18 Sep 2024 16:44:34 -0400 Subject: [PATCH 10/29] move .select back from agate.Row to agate.Table --- dbt/adapters/snowflake/impl.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 3f916d4b2..f04e0d036 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -258,19 +258,20 @@ def list_relations_without_caching( return [] raise - return [self._parse_list_relations_result(obj) for obj in schema_objects] + # this can be collapsed once Snowflake adds is_iceberg to show objects + columns = ["database_name", "schema_name", "name", "kind", "is_dynamic"] + if self.behavior.enable_iceberg_materializations.no_warn: + columns.append("is_iceberg") + + return [self._parse_list_relations_result(obj) for obj in schema_objects.select(columns)] def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation: # this can be collapsed once Snowflake adds is_iceberg to show objects - if self.behavior.enable_iceberg_materializations: - columns = ["database_name", "schema_name", "name", "kind", "is_dynamic", "is_iceberg"] - database, schema, identifier, relation_type, is_dynamic, is_iceberg = result.select( - columns - ) + if self.behavior.enable_iceberg_materializations.no_warn: + database, schema, identifier, relation_type, is_dynamic, is_iceberg = result else: - columns = ["database_name", "schema_name", "name", "kind", "is_dynamic"] - database, schema, identifier, relation_type, is_dynamic = result.select(columns) - is_iceberg = "NO" + database, schema, identifier, relation_type, is_dynamic = result + is_iceberg = "N" try: relation_type = self.Relation.get_relation_type(relation_type.lower()) From 7444dff1b807a66608b7f8dc9d8faecde0700a40 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 18 Sep 2024 16:46:24 -0400 Subject: [PATCH 11/29] formatting --- dbt/adapters/snowflake/impl.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index f04e0d036..396fc4204 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -281,15 +281,17 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation if relation_type == self.Relation.Table and is_dynamic == "Y": relation_type = self.Relation.DynamicTable + table_format = TableFormat.ICEBERG if is_iceberg in ("Y", "YES") else TableFormat.DEFAULT + + quote_policy = ({"database": True, "schema": True, "identifier": True},) + return self.Relation.create( database=database, schema=schema, identifier=identifier, type=relation_type, - table_format=( - TableFormat.ICEBERG if is_iceberg in ("Y", "YES") else TableFormat.DEFAULT - ), - quote_policy={"database": True, "schema": True, "identifier": True}, + table_format=table_format, + quote_policy=quote_policy, ) def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str: From b1fa0adf2f42763dd2363b7e696aa88170cd6993 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 18 Sep 2024 16:46:43 -0400 Subject: [PATCH 12/29] formatting --- dbt/adapters/snowflake/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 396fc4204..5b5881eed 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -283,7 +283,7 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation table_format = TableFormat.ICEBERG if is_iceberg in ("Y", "YES") else TableFormat.DEFAULT - quote_policy = ({"database": True, "schema": True, "identifier": True},) + quote_policy = {"database": True, "schema": True, "identifier": True} return self.Relation.create( database=database, From 0aee10bdbf326c3afbcdc3e659f037db41f46f5e Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 18 Sep 2024 20:57:33 -0400 Subject: [PATCH 13/29] fix catalog data marshalling --- dbt/adapters/snowflake/relation_configs/dynamic_table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt/adapters/snowflake/relation_configs/dynamic_table.py index 7bae857c2..61a4cafa3 100644 --- a/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -75,7 +75,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self: "query": config_dict.get("query"), "target_lag": config_dict.get("target_lag"), "snowflake_warehouse": config_dict.get("snowflake_warehouse"), - "catalog": SnowflakeCatalogConfig.from_dict(config_dict), + "catalog": SnowflakeCatalogConfig.from_dict(config_dict["catalog"]), "refresh_mode": config_dict.get("refresh_mode"), "initialize": config_dict.get("initialize"), } From c74283ee05ebbe42fa74e8fc1d238831918f29af Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Wed, 18 Sep 2024 21:12:53 -0400 Subject: [PATCH 14/29] flip the flag for testing --- dbt/adapters/snowflake/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 5b5881eed..c844f0cd8 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -83,7 +83,7 @@ def _behavior_flags(self) -> List[BehaviorFlag]: return [ { "name": "enable_iceberg_materializations", - "default": False, + "default": True, "description": ( "Enabling Iceberg materializations introduces latency to metadata queries, " "specifically within the list_relations_without_caching macro. Since Iceberg " From fb9aee1a4fa2a0baa050679585e9cf5994b44cae Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Fri, 20 Sep 2024 12:28:16 -0400 Subject: [PATCH 15/29] simplify dynamic table testing --- .../adapter/dynamic_table_tests/files.py | 33 -- .../test_dynamic_tables_basic.py | 186 ----------- .../test_dynamic_tables_changes.py | 307 ------------------ .../adapter/dynamic_table_tests/utils.py | 53 --- tests/functional/relation_tests/__init__.py | 0 .../dynamic_table_tests/__init__.py | 0 .../dynamic_table_tests/models.py | 64 ++++ .../dynamic_table_tests/test_basic.py | 33 ++ .../test_configuration_changes.py | 103 ++++++ tests/functional/relation_tests/models.py | 47 +++ .../test_relation_type_change.py | 73 +++++ tests/functional/utils.py | 78 +++++ 12 files changed, 398 insertions(+), 579 deletions(-) delete mode 100644 tests/functional/adapter/dynamic_table_tests/files.py delete mode 100644 tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_basic.py delete mode 100644 tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py delete mode 100644 tests/functional/adapter/dynamic_table_tests/utils.py create mode 100644 tests/functional/relation_tests/__init__.py create mode 100644 tests/functional/relation_tests/dynamic_table_tests/__init__.py create mode 100644 tests/functional/relation_tests/dynamic_table_tests/models.py create mode 100644 tests/functional/relation_tests/dynamic_table_tests/test_basic.py create mode 100644 tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py create mode 100644 tests/functional/relation_tests/models.py create mode 100644 tests/functional/relation_tests/test_relation_type_change.py create mode 100644 tests/functional/utils.py diff --git a/tests/functional/adapter/dynamic_table_tests/files.py b/tests/functional/adapter/dynamic_table_tests/files.py deleted file mode 100644 index ef8d2bf1f..000000000 --- a/tests/functional/adapter/dynamic_table_tests/files.py +++ /dev/null @@ -1,33 +0,0 @@ -MY_SEED = """ -id,value -1,100 -2,200 -3,300 -""".strip() - - -MY_TABLE = """ -{{ config( - materialized='table', -) }} -select * from {{ ref('my_seed') }} -""" - - -MY_VIEW = """ -{{ config( - materialized='view', -) }} -select * from {{ ref('my_seed') }} -""" - - -MY_DYNAMIC_TABLE = """ -{{ config( - materialized='dynamic_table', - snowflake_warehouse='DBT_TESTING', - target_lag='2 minutes', - refresh_mode='INCREMENTAL', -) }} -select * from {{ ref('my_seed') }} -""" diff --git a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_basic.py b/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_basic.py deleted file mode 100644 index a17f5d267..000000000 --- a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_basic.py +++ /dev/null @@ -1,186 +0,0 @@ -from typing import Optional, Tuple - -import pytest - -from dbt.tests.util import ( - get_model_file, - run_dbt, - run_dbt_and_capture, - set_model_file, -) - -from dbt.adapters.snowflake.relation import SnowflakeRelation, SnowflakeRelationType -from tests.functional.adapter.dynamic_table_tests.files import ( - MY_DYNAMIC_TABLE, - MY_SEED, - MY_TABLE, - MY_VIEW, -) -from tests.functional.adapter.dynamic_table_tests.utils import query_relation_type - - -class TestSnowflakeDynamicTableBasic: - @staticmethod - def insert_record(project, table: SnowflakeRelation, record: Tuple[int, int]): - my_id, value = record - project.run_sql(f"insert into {table} (id, value) values ({my_id}, {value})") - - @staticmethod - def refresh_dynamic_table(project, dynamic_table: SnowflakeRelation): - sql = f"alter dynamic table {dynamic_table} refresh" - project.run_sql(sql) - - @staticmethod - def query_row_count(project, relation: SnowflakeRelation) -> int: - sql = f"select count(*) from {relation}" - return project.run_sql(sql, fetch="one")[0] - - @staticmethod - def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]: - return query_relation_type(project, relation) - - @pytest.fixture(scope="class", autouse=True) - def seeds(self): - return {"my_seed.csv": MY_SEED} - - @pytest.fixture(scope="class", autouse=True) - def models(self): - yield { - "my_table.sql": MY_TABLE, - "my_view.sql": MY_VIEW, - "my_dynamic_table.sql": MY_DYNAMIC_TABLE, - } - - @pytest.fixture(scope="class") - def my_dynamic_table(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_dynamic_table", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.DynamicTable, - ) - - @pytest.fixture(scope="class") - def my_view(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_view", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.View, - ) - - @pytest.fixture(scope="class") - def my_table(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_table", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.Table, - ) - - @pytest.fixture(scope="class") - def my_seed(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_seed", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.Table, - ) - - @staticmethod - def load_model(project, current_model, new_model): - model_to_load = get_model_file(project, new_model) - set_model_file(project, current_model, model_to_load) - - @pytest.fixture(scope="function", autouse=True) - def setup(self, project, my_dynamic_table, my_view, my_table): - run_dbt(["seed"]) - run_dbt(["run", "--models", my_dynamic_table.identifier, "--full-refresh"]) - - # the tests touch these files, store their contents in memory - my_dynamic_table_config = get_model_file(project, my_dynamic_table) - my_view_config = get_model_file(project, my_view) - my_table_config = get_model_file(project, my_table) - - yield - - # and then reset them after the test runs - set_model_file(project, my_dynamic_table, my_dynamic_table_config) - set_model_file(project, my_view, my_view_config) - set_model_file(project, my_table, my_table_config) - project.run_sql(f"drop schema if exists {project.test_schema} cascade") - - def test_dynamic_table_create(self, project, my_dynamic_table): - # setup creates it; verify it's there - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - def test_dynamic_table_create_idempotent(self, project, my_dynamic_table): - # setup creates it once; verify it's there and run once - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - def test_dynamic_table_full_refresh(self, project, my_dynamic_table): - _, logs = run_dbt_and_capture( - ["--debug", "run", "--models", my_dynamic_table.identifier, "--full-refresh"] - ) - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - def test_dynamic_table_replaces_table(self, project, my_table, my_dynamic_table): - run_dbt(["run", "--models", my_table.identifier]) - assert self.query_relation_type(project, my_table) == "table" - - self.load_model(project, my_table, my_dynamic_table) - - run_dbt(["run", "--models", my_table.identifier]) - assert self.query_relation_type(project, my_table) == "dynamic_table" - - def test_dynamic_table_replaces_view(self, project, my_view, my_dynamic_table): - run_dbt(["run", "--models", my_view.identifier]) - assert self.query_relation_type(project, my_view) == "view" - - self.load_model(project, my_view, my_dynamic_table) - - run_dbt(["run", "--models", my_view.identifier]) - assert self.query_relation_type(project, my_view) == "dynamic_table" - - def test_table_replaces_dynamic_table(self, project, my_dynamic_table, my_table): - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - self.load_model(project, my_dynamic_table, my_table) - - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "table" - - def test_view_replaces_dynamic_table(self, project, my_dynamic_table, my_view): - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - self.load_model(project, my_dynamic_table, my_view) - - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "view" - - def test_dynamic_table_only_updates_after_refresh(self, project, my_dynamic_table, my_seed): - # poll database - table_start = self.query_row_count(project, my_seed) - view_start = self.query_row_count(project, my_dynamic_table) - - # insert new record in table - self.insert_record(project, my_seed, (4, 400)) - - # poll database - table_mid = self.query_row_count(project, my_seed) - view_mid = self.query_row_count(project, my_dynamic_table) - - # refresh the materialized view - self.refresh_dynamic_table(project, my_dynamic_table) - - # poll database - table_end = self.query_row_count(project, my_seed) - view_end = self.query_row_count(project, my_dynamic_table) - - # new records were inserted in the table but didn't show up in the view until it was refreshed - assert table_start < table_mid == table_end - assert view_start == view_mid < view_end diff --git a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py b/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py deleted file mode 100644 index a58b76f29..000000000 --- a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py +++ /dev/null @@ -1,307 +0,0 @@ -from typing import Optional - -import pytest - -from dbt_common.contracts.config.materialization import OnConfigurationChangeOption -from dbt.tests.util import ( - assert_message_in_logs, - get_model_file, - run_dbt, - run_dbt_and_capture, - set_model_file, -) - -from dbt.adapters.snowflake.relation import SnowflakeRelation, SnowflakeRelationType -from tests.functional.adapter.dynamic_table_tests.files import ( - MY_DYNAMIC_TABLE, - MY_SEED, -) -from tests.functional.adapter.dynamic_table_tests.utils import ( - query_refresh_mode, - query_relation_type, - query_target_lag, - query_warehouse, -) - - -class SnowflakeDynamicTableChanges: - @staticmethod - def check_start_state(project, dynamic_table): - assert query_target_lag(project, dynamic_table) == "2 minutes" - assert query_warehouse(project, dynamic_table) == "DBT_TESTING" - assert query_refresh_mode(project, dynamic_table) == "INCREMENTAL" - - @staticmethod - def change_config_via_alter(project, dynamic_table): - initial_model = get_model_file(project, dynamic_table) - new_model = initial_model.replace( - "target_lag='2 minutes'", "target_lag='5 minutes'" - ) - set_model_file(project, dynamic_table, new_model) - - @staticmethod - def change_config_via_alter_downstream(project, dynamic_table): - initial_model = get_model_file(project, dynamic_table) - new_model = initial_model.replace( - "target_lag='2 minutes'", "target_lag='DOWNSTREAM'" - ) - set_model_file(project, dynamic_table, new_model) - - @staticmethod - def check_state_alter_change_is_applied(project, dynamic_table): - assert query_target_lag(project, dynamic_table) == "5 minutes" - assert query_warehouse(project, dynamic_table) == "DBT_TESTING" - - @staticmethod - def check_state_alter_change_is_applied_downstream(project, dynamic_table): - assert query_target_lag(project, dynamic_table) == "DOWNSTREAM" - assert query_warehouse(project, dynamic_table) == "DBT_TESTING" - - @staticmethod - def change_config_via_replace(project, dynamic_table): - initial_model = get_model_file(project, dynamic_table) - new_model = initial_model.replace("refresh_mode='INCREMENTAL'", "refresh_mode='FULL'") - set_model_file(project, dynamic_table, new_model) - - @staticmethod - def check_state_replace_change_is_applied(project, dynamic_table): - assert query_refresh_mode(project, dynamic_table) == "FULL" - - @staticmethod - def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]: - return query_relation_type(project, relation) - - @pytest.fixture(scope="class", autouse=True) - def seeds(self): - yield {"my_seed.csv": MY_SEED} - - @pytest.fixture(scope="class", autouse=True) - def models(self): - yield {"my_dynamic_table.sql": MY_DYNAMIC_TABLE} - - @pytest.fixture(scope="class") - def my_dynamic_table(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_dynamic_table", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.DynamicTable, - ) - - @pytest.fixture(scope="function", autouse=True) - def setup(self, project, my_dynamic_table): - # make sure the model in the data reflects the files each time - run_dbt(["seed"]) - run_dbt(["run", "--models", my_dynamic_table.identifier, "--full-refresh"]) - - # the tests touch these files, store their contents in memory - initial_model = get_model_file(project, my_dynamic_table) - - # verify the initial settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - yield - - # and then reset them after the test runs - set_model_file(project, my_dynamic_table, initial_model) - - # ensure clean slate each method - project.run_sql(f"drop schema if exists {project.test_schema} cascade") - - def test_full_refresh_occurs_with_changes(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - self.change_config_via_replace(project, my_dynamic_table) - _, logs = run_dbt_and_capture( - ["--debug", "run", "--models", my_dynamic_table.identifier, "--full-refresh"] - ) - - # verify the updated settings are correct in Snowflake - self.check_state_alter_change_is_applied(project, my_dynamic_table) - self.check_state_replace_change_is_applied(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", logs.replace('"', "") - ) - - -class TestSnowflakeDynamicTableChangesApply(SnowflakeDynamicTableChanges): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"models": {"on_configuration_change": OnConfigurationChangeOption.Apply.value}} - - def test_change_is_applied_via_alter(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_state_alter_change_is_applied(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', "") - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - def test_change_is_applied_via_alter_downstream(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter_downstream(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_state_alter_change_is_applied_downstream(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', "") - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - @pytest.mark.skip( - "dbt-snowflake does not currently monitor any changes the trigger a full refresh" - ) - def test_change_is_applied_via_replace(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - self.change_config_via_replace(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_state_alter_change_is_applied(project, my_dynamic_table) - self.check_state_replace_change_is_applied(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", logs.replace('"', "") - ) - - -class TestSnowflakeDynamicTableChangesContinue(SnowflakeDynamicTableChanges): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"models": {"on_configuration_change": OnConfigurationChangeOption.Continue.value}} - - def test_change_is_not_applied_via_alter(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Configuration changes were identified and `on_configuration_change` was set" - f" to `continue` for `{my_dynamic_table}`", - logs, - ) - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - def test_change_is_not_applied_via_replace(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - self.change_config_via_replace(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Configuration changes were identified and `on_configuration_change` was set" - f" to `continue` for `{my_dynamic_table}`", - logs, - ) - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - -class TestSnowflakeDynamicTableChangesFailMixin(SnowflakeDynamicTableChanges): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"models": {"on_configuration_change": OnConfigurationChangeOption.Fail.value}} - - def test_change_is_not_applied_via_alter(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - _, logs = run_dbt_and_capture( - ["--debug", "run", "--models", my_dynamic_table.name], expect_pass=False - ) - - # verify the updated settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Configuration changes were identified and `on_configuration_change` was set" - f" to `fail` for `{my_dynamic_table}`", - logs, - ) - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - def test_change_is_not_applied_via_replace(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - self.change_config_via_replace(project, my_dynamic_table) - _, logs = run_dbt_and_capture( - ["--debug", "run", "--models", my_dynamic_table.name], expect_pass=False - ) - - # verify the updated settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Configuration changes were identified and `on_configuration_change` was set" - f" to `fail` for `{my_dynamic_table}`", - logs, - ) - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) diff --git a/tests/functional/adapter/dynamic_table_tests/utils.py b/tests/functional/adapter/dynamic_table_tests/utils.py deleted file mode 100644 index d72b231c9..000000000 --- a/tests/functional/adapter/dynamic_table_tests/utils.py +++ /dev/null @@ -1,53 +0,0 @@ -from typing import Optional - -import agate -from dbt.tests.util import get_connection - -from dbt.adapters.snowflake.relation import SnowflakeRelation - - -def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]: - sql = f""" - select - case - when table_type = 'BASE TABLE' and is_dynamic = 'YES' then 'dynamic_table' - when table_type = 'BASE TABLE' then 'table' - when table_type = 'VIEW' then 'view' - when table_type = 'EXTERNAL TABLE' then 'external_table' - end as relation_type - from information_schema.tables - where table_name like '{relation.identifier.upper()}' - and table_schema like '{relation.schema.upper()}' - and table_catalog like '{relation.database.upper()}' - """ - results = project.run_sql(sql, fetch="one") - if results is None or len(results) == 0: - return None - elif len(results) > 1: - raise ValueError(f"More than one instance of {relation.name} found!") - else: - return results[0].lower() - - -def query_target_lag(project, dynamic_table: SnowflakeRelation) -> Optional[str]: - config = describe_dynamic_table(project, dynamic_table) - return config.get("target_lag") - - -def query_warehouse(project, dynamic_table: SnowflakeRelation) -> Optional[str]: - config = describe_dynamic_table(project, dynamic_table) - return config.get("warehouse") - - -def query_refresh_mode(project, dynamic_table: SnowflakeRelation) -> Optional[str]: - config = describe_dynamic_table(project, dynamic_table) - return config.get("refresh_mode") - - -def describe_dynamic_table(project, dynamic_table: SnowflakeRelation) -> agate.Row: - with get_connection(project.adapter): - macro_results = project.adapter.execute_macro( - "snowflake__describe_dynamic_table", kwargs={"relation": dynamic_table} - ) - config = macro_results["dynamic_table"] - return config.rows[0] diff --git a/tests/functional/relation_tests/__init__.py b/tests/functional/relation_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/functional/relation_tests/dynamic_table_tests/__init__.py b/tests/functional/relation_tests/dynamic_table_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/functional/relation_tests/dynamic_table_tests/models.py b/tests/functional/relation_tests/dynamic_table_tests/models.py new file mode 100644 index 000000000..d5afca681 --- /dev/null +++ b/tests/functional/relation_tests/dynamic_table_tests/models.py @@ -0,0 +1,64 @@ +SEED = """ +id,value +1,100 +2,200 +3,300 +""".strip() + + +DYNAMIC_TABLE = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', + refresh_mode='INCREMENTAL', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE_DOWNSTREAM = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='DOWNSTREAM', + refresh_mode='INCREMENTAL', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_ICEBERG_TABLE = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', + refresh_mode='INCREMENTAL', + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_subpath="subpath", +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE_ALTER = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='5 minutes', + refresh_mode='INCREMENTAL', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE_REPLACE = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', + refresh_mode='FULL', +) }} +select * from {{ ref('my_seed') }} +""" diff --git a/tests/functional/relation_tests/dynamic_table_tests/test_basic.py b/tests/functional/relation_tests/dynamic_table_tests/test_basic.py new file mode 100644 index 000000000..f704fadd1 --- /dev/null +++ b/tests/functional/relation_tests/dynamic_table_tests/test_basic.py @@ -0,0 +1,33 @@ +import pytest + +from dbt.tests.util import run_dbt + +from tests.functional.relation_tests.dynamic_table_tests import models +from tests.functional.utils import query_relation_type + + +class TestBasic: + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + return {"my_seed.csv": models.SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "my_dynamic_table.sql": models.DYNAMIC_TABLE, + "my_dynamic_table_downstream.sql": models.DYNAMIC_TABLE_DOWNSTREAM, + "my_dynamic_iceberg_table.sql": models.DYNAMIC_ICEBERG_TABLE, + } + + @pytest.fixture(scope="class", autouse=True) + def setup(self, project): + run_dbt(["seed"]) + run_dbt(["run"]) + + @pytest.mark.parametrize( + "relation", ["my_dynamic_table", "my_dynamic_iceberg_table", "my_dynamic_table_downstream"] + ) + def test_dynamic_table_full_refresh(self, project, relation): + run_dbt(["run", "--models", relation, "--full-refresh"]) + assert query_relation_type(project, relation) == "dynamic_table" diff --git a/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py b/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py new file mode 100644 index 000000000..3c4f65a87 --- /dev/null +++ b/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py @@ -0,0 +1,103 @@ +import pytest + +from dbt.tests.util import run_dbt + +from tests.functional.relation_tests.dynamic_table_tests import models +from tests.functional.utils import describe_dynamic_table, update_model + + +class Changes: + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": models.SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "dynamic_table_alter.sql": models.DYNAMIC_TABLE, + "dynamic_table_replace.sql": models.DYNAMIC_TABLE, + } + + @pytest.fixture(scope="function", autouse=True) + def setup_class(self, project): + run_dbt(["seed"]) + yield + project.run_sql(f"drop schema if exists {project.test_schema} cascade") + + @pytest.fixture(scope="function", autouse=True) + def setup_method(self, project, setup_class): + # make sure the model in the data reflects the files each time + run_dbt(["run", "--full-refresh"]) + self.assert_changes_are_not_applied(project) + + update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE_ALTER) + update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE_REPLACE) + + yield + + update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE) + update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE) + + @staticmethod + def assert_changes_are_applied(project): + altered = describe_dynamic_table(project, "dynamic_table_alter") + assert altered.snowflake_warehouse == "DBT_TESTING" + assert altered.target_lag == "5 minutes" # this updated + assert altered.refresh_mode == "INCREMENTAL" + + replaced = describe_dynamic_table(project, "dynamic_table_replace") + assert replaced.snowflake_warehouse == "DBT_TESTING" + assert replaced.target_lag == "2 minutes" + assert replaced.refresh_mode == "FULL" # this updated + + @staticmethod + def assert_changes_are_not_applied(project): + altered = describe_dynamic_table(project, "dynamic_table_alter") + assert altered.snowflake_warehouse == "DBT_TESTING" + assert altered.target_lag == "2 minutes" # this would have updated, but didn't + assert altered.refresh_mode == "INCREMENTAL" + + replaced = describe_dynamic_table(project, "dynamic_table_replace") + assert replaced.snowflake_warehouse == "DBT_TESTING" + assert replaced.target_lag == "2 minutes" + assert replaced.refresh_mode == "INCREMENTAL" # this would have updated, but didn't + + def test_full_refresh_is_always_successful(self, project): + # this always passes and always changes the configuration, regardless of on_configuration_change + # and regardless of whether the changes require a replace versus an alter + run_dbt(["run", "--full-refresh"]) + self.assert_changes_are_applied(project) + + +class TestChangesApply(Changes): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"on_configuration_change": "apply"}} + + def test_changes_are_applied(self, project): + # this passes and changes the configuration + run_dbt(["run"]) + self.assert_changes_are_applied(project) + + +class TestChangesContinue(Changes): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"on_configuration_change": "continue"}} + + def test_changes_are_not_applied(self, project): + # this passes but does not change the configuration + run_dbt(["run"]) + self.assert_changes_are_not_applied(project) + + +class TestChangesFail(Changes): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"on_configuration_change": "fail"}} + + def test_changes_are_not_applied(self, project): + # this fails and does not change the configuration + run_dbt(["run"], expect_pass=False) + self.assert_changes_are_not_applied(project) diff --git a/tests/functional/relation_tests/models.py b/tests/functional/relation_tests/models.py new file mode 100644 index 000000000..d49728166 --- /dev/null +++ b/tests/functional/relation_tests/models.py @@ -0,0 +1,47 @@ +SEED = """ +id,value +1,100 +2,200 +3,300 +""".strip() + + +TABLE = """ +{{ config( + materialized='table', +) }} +select * from {{ ref('my_seed') }} +""" + + +VIEW = """ +{{ config( + materialized='view', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='1 minute', + refresh_mode='INCREMENTAL', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_ICEBERG_TABLE = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='1 minute', + refresh_mode='INCREMENTAL', + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_subpath="subpath", +) }} +select * from {{ ref('my_seed') }} +""" diff --git a/tests/functional/relation_tests/test_relation_type_change.py b/tests/functional/relation_tests/test_relation_type_change.py new file mode 100644 index 000000000..b41bb19d8 --- /dev/null +++ b/tests/functional/relation_tests/test_relation_type_change.py @@ -0,0 +1,73 @@ +from dataclasses import dataclass +from itertools import product +from typing import Optional + +from dbt.tests.util import run_dbt +import pytest + +from tests.functional.relation_tests import models +from tests.functional.utils import describe_dynamic_table, query_relation_type, update_model + + +@dataclass +class Model: + model: str + relation_type: str + table_format: Optional[str] = None + + @property + def name(self) -> str: + name = f"{self.relation_type}" + if self.table_format: + name += f"_{self.table_format}" + return name + + +@dataclass +class Scenario: + initial: Model + final: Model + + @property + def name(self) -> str: + return f"REPLACE_{self.initial.name}__WITH_{self.final.name}" + + @property + def error_message(self) -> str: + return f"Failed when migrating from: {self.initial.name} to: {self.final.name}" + + +relations = [ + Model(models.VIEW, "view"), + Model(models.TABLE, "table", "default"), + Model(models.DYNAMIC_TABLE, "dynamic_table", "default"), + Model(models.DYNAMIC_ICEBERG_TABLE, "dynamic_table", "iceberg"), +] +scenarios = [Scenario(*scenario) for scenario in product(relations, relations)] + + +class TestRelationTypeChange: + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + return {"my_seed.csv": models.SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield {f"{scenario.name}.sql": scenario.initial.model for scenario in scenarios} + + @pytest.fixture(scope="class", autouse=True) + def setup(self, project): + run_dbt(["seed"]) + run_dbt(["run"]) + for scenario in scenarios: + update_model(project, scenario.name, scenario.final.model) + run_dbt(["run"]) + + @pytest.mark.parametrize("scenario", scenarios, ids=[scenario.name for scenario in scenarios]) + def test_replace(self, project, scenario): + relation_type = query_relation_type(project, scenario.name) + assert relation_type == scenario.final.relation_type, scenario.error_message + if relation_type == "dynamic_table": + dynamic_table = describe_dynamic_table(project, scenario.name) + assert dynamic_table.catalog.table_format == scenario.final.table_format diff --git a/tests/functional/utils.py b/tests/functional/utils.py new file mode 100644 index 000000000..d185e8d2b --- /dev/null +++ b/tests/functional/utils.py @@ -0,0 +1,78 @@ +from typing import Any, Dict, Optional + +from dbt.tests.util import ( + get_connection, + get_model_file, + relation_from_name, + set_model_file, +) + +from dbt.adapters.snowflake.relation_configs import SnowflakeDynamicTableConfig + + +def query_relation_type(project, name: str) -> Optional[str]: + relation = relation_from_name(project.adapter, name) + sql = f""" + select + case table_type + when 'BASE TABLE' then iff(is_dynamic = 'YES', 'dynamic_table', 'table') + when 'VIEW' then 'view' + when 'EXTERNAL TABLE' then 'external_table' + end as relation_type + from information_schema.tables + where table_name like '{relation.identifier.upper()}' + and table_schema like '{relation.schema.upper()}' + and table_catalog like '{relation.database.upper()}' + """ + results = project.run_sql(sql, fetch="all") + + assert len(results) > 0, f"Relation {relation} not found" + assert len(results) == 1, f"Multiple relations found" + + return results[0][0].lower() + + +def query_row_count(project, name: str) -> int: + relation = relation_from_name(project.adapter, name) + sql = f"select count(*) from {relation}" + return project.run_sql(sql, fetch="one")[0] + + +def insert_record(project, name: str, record: Dict[str, Any]): + relation = relation_from_name(project.adapter, name) + column_names = ", ".join(record.keys()) + values = ", ".join( + [f"'{value}'" if isinstance(value, str) else f"{value}" for value in record.values()] + ) + sql = f"insert into {relation} ({column_names}) values ({values})" + project.run_sql(sql) + + +def update_model(project, name: str, model: str) -> str: + relation = relation_from_name(project.adapter, name) + original_model = get_model_file(project, relation) + set_model_file(project, relation, model) + return original_model + + +def describe_dynamic_table(project, name: str) -> Optional[SnowflakeDynamicTableConfig]: + macro = "snowflake__describe_dynamic_table" + dynamic_table = relation_from_name(project.adapter, name) + kwargs = {"relation": dynamic_table} + with get_connection(project.adapter): + results = project.adapter.execute_macro(macro, kwargs=kwargs) + + assert len(results["dynamic_table"].rows) > 0, f"Dynamic table {dynamic_table} not found" + found = len(results["dynamic_table"].rows) + names = ", ".join([table.get("name") for table in results["dynamic_table"].rows]) + assert found == 1, f"Multiple dynamic tables found: {names}" + + return SnowflakeDynamicTableConfig.from_relation_results(results) + + +def refresh_dynamic_table(project, name: str) -> None: + macro = "snowflake__refresh_dynamic_table" + dynamic_table = relation_from_name(project.adapter, name) + kwargs = {"relation": dynamic_table} + with get_connection(project.adapter): + project.adapter.execute_macro(macro, kwargs=kwargs) From 4373bdd65cac78764a27ab4abdf5f7dde90c68cd Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Fri, 20 Sep 2024 19:05:08 -0400 Subject: [PATCH 16/29] undo formatting changes to make function changes easier to see --- .../macros/relations/dynamic_table/create.sql | 50 +++++++------- .../relations/dynamic_table/describe.sql | 66 +++++++++---------- .../relations/dynamic_table/replace.sql | 50 +++++++------- 3 files changed, 83 insertions(+), 83 deletions(-) diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql index 8945440c5..1e249d241 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql @@ -13,13 +13,13 @@ A valid DDL statement which will result in a new dynamic table. -#} -{%- set dynamic_table = relation.from_config(config.model) -%} + {%- set dynamic_table = relation.from_config(config.model) -%} -{%- if dynamic_table.catalog.table_format == 'iceberg' -%} -{{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} -{%- else -%} -{{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} -{%- endif -%} + {%- if dynamic_table.catalog.table_format == 'iceberg' -%} + {{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} + {%- else -%} + {{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} + {%- endif -%} {%- endmacro %} @@ -41,14 +41,14 @@ A valid DDL statement which will result in a new dynamic standard table. -#} -create dynamic table {{ relation }} - target_lag = '{{ dynamic_table.target_lag }}' - warehouse = {{ dynamic_table.snowflake_warehouse }} - {{ optional('refresh_mode', dynamic_table.refresh_mode) }} - {{ optional('initialize', dynamic_table.initialize) }} - as ( - {{ sql }} - ) + create dynamic table {{ relation }} + target_lag = '{{ dynamic_table.target_lag }}' + warehouse = {{ dynamic_table.snowflake_warehouse }} + {{ optional('refresh_mode', dynamic_table.refresh_mode) }} + {{ optional('initialize', dynamic_table.initialize) }} + as ( + {{ sql }} + ) {%- endmacro %} @@ -70,16 +70,16 @@ create dynamic table {{ relation }} A valid DDL statement which will result in a new dynamic iceberg table. -#} -create dynamic iceberg table {{ relation }} - target_lag = '{{ dynamic_table.target_lag }}' - warehouse = {{ dynamic_table.snowflake_warehouse }} - {{ optional('external_volume', dynamic_table.catalog.external_volume) }} - {{ optional('catalog', dynamic_table.catalog.name) }} - base_location = {{ dynamic_table.catalog.base_location }} - {{ optional('refresh_mode', dynamic_table.refresh_mode) }} - {{ optional('initialize', dynamic_table.initialize) }} - as ( - {{ sql }} - ) + create dynamic iceberg table {{ relation }} + target_lag = '{{ dynamic_table.target_lag }}' + warehouse = {{ dynamic_table.snowflake_warehouse }} + {{ optional('external_volume', dynamic_table.catalog.external_volume) }} + {{ optional('catalog', dynamic_table.catalog.name) }} + base_location = {{ dynamic_table.catalog.base_location }} + {{ optional('refresh_mode', dynamic_table.refresh_mode) }} + {{ optional('initialize', dynamic_table.initialize) }} + as ( + {{ sql }} + ) {%- endmacro %} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql index 681806a7a..5e5e97ac2 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql @@ -1,37 +1,37 @@ {% macro snowflake__describe_dynamic_table(relation) %} -{%- set _dynamic_table_sql -%} -show dynamic tables - like '{{ relation.identifier }}' - in schema {{ relation.database }}.{{ relation.schema }} -; -select - "name", - "schema_name", - "database_name", - "text", - "target_lag", - "warehouse", - "refresh_mode" -from table(result_scan(last_query_id())) -{%- endset %} -{% set _dynamic_table = run_query(_dynamic_table_sql) %} + {%- set _dynamic_table_sql -%} + show dynamic tables + like '{{ relation.identifier }}' + in schema {{ relation.database }}.{{ relation.schema }} + ; + select + "name", + "schema_name", + "database_name", + "text", + "target_lag", + "warehouse", + "refresh_mode" + from table(result_scan(last_query_id())) + {%- endset %} + {% set _dynamic_table = run_query(_dynamic_table_sql) %} -{% if adapter.behavior.enable_iceberg_materializations.no_warn %} -{%- set _catalog_sql -%} -show iceberg tables - like '{{ relation.identifier }}' - in schema {{ relation.database }}.{{ relation.schema }} -; -select - "catalog_name", - "external_volume_name", - "base_location" -from table(result_scan(last_query_id())) -{%- endset %} -{% set _catalog = run_query(_catalog_sql) %} -{% else %} -{% set _catalog = none %} -{% endif %} + {% if adapter.behavior.enable_iceberg_materializations.no_warn %} + {%- set _catalog_sql -%} + show iceberg tables + like '{{ relation.identifier }}' + in schema {{ relation.database }}.{{ relation.schema }} + ; + select + "catalog_name", + "external_volume_name", + "base_location" + from table(result_scan(last_query_id())) + {%- endset %} + {% set _catalog = run_query(_catalog_sql) %} + {% else %} + {% set _catalog = none %} + {% endif %} -{% do return({'dynamic_table': _dynamic_table, 'catalog': _catalog}) %} + {% do return({'dynamic_table': _dynamic_table, 'catalog': _catalog}) %} {% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql index 73b3ba382..6ee29b941 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql @@ -13,13 +13,13 @@ A valid DDL statement which will result in a new dynamic table. -#} -{%- set dynamic_table = relation.from_config(config.model) -%} + {%- set dynamic_table = relation.from_config(config.model) -%} -{%- if dynamic_table.catalog.table_format == 'iceberg' -%} -{{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} -{%- else -%} -{{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} -{%- endif -%} + {%- if dynamic_table.catalog.table_format == 'iceberg' -%} + {{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} + {%- else -%} + {{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} + {%- endif -%} {%- endmacro %} @@ -40,14 +40,14 @@ A valid DDL statement which will result in a new dynamic standard table. -#} -create or replace dynamic table {{ relation }} - target_lag = '{{ dynamic_table.target_lag }}' - warehouse = {{ dynamic_table.snowflake_warehouse }} - {{ optional('refresh_mode', dynamic_table.refresh_mode) }} - {{ optional('initialize', dynamic_table.initialize) }} - as ( - {{ sql }} - ) + create or replace dynamic table {{ relation }} + target_lag = '{{ dynamic_table.target_lag }}' + warehouse = {{ dynamic_table.snowflake_warehouse }} + {{ optional('refresh_mode', dynamic_table.refresh_mode) }} + {{ optional('initialize', dynamic_table.initialize) }} + as ( + {{ sql }} + ) {%- endmacro %} @@ -69,16 +69,16 @@ create or replace dynamic table {{ relation }} A valid DDL statement which will result in a new dynamic iceberg table. -#} -create or replace dynamic iceberg table {{ relation }} - target_lag = '{{ dynamic_table.target_lag }}' - warehouse = {{ dynamic_table.snowflake_warehouse }} - {{ optional('external_volume', dynamic_table.catalog.external_volume) }} - {{ optional('catalog', dynamic_table.catalog.name) }} - base_location = {{ dynamic_table.catalog.base_location }} - {{ optional('refresh_mode', dynamic_table.refresh_mode) }} - {{ optional('initialize', dynamic_table.initialize) }} - as ( - {{ sql }} - ) + create or replace dynamic iceberg table {{ relation }} + target_lag = '{{ dynamic_table.target_lag }}' + warehouse = {{ dynamic_table.snowflake_warehouse }} + {{ optional('external_volume', dynamic_table.catalog.external_volume) }} + {{ optional('catalog', dynamic_table.catalog.name) }} + base_location = {{ dynamic_table.catalog.base_location }} + {{ optional('refresh_mode', dynamic_table.refresh_mode) }} + {{ optional('initialize', dynamic_table.initialize) }} + as ( + {{ sql }} + ) {%- endmacro %} From 951a2ab39e644cde68e4e680773f5c8f82a0b877 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Fri, 20 Sep 2024 19:30:16 -0400 Subject: [PATCH 17/29] add iceberg dynamic tables to existing dynamic table tests --- .../dynamic_table_tests/models.py | 28 +++++++++++++++++ .../test_configuration_changes.py | 30 +++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/tests/functional/relation_tests/dynamic_table_tests/models.py b/tests/functional/relation_tests/dynamic_table_tests/models.py index d5afca681..6279850cc 100644 --- a/tests/functional/relation_tests/dynamic_table_tests/models.py +++ b/tests/functional/relation_tests/dynamic_table_tests/models.py @@ -62,3 +62,31 @@ ) }} select * from {{ ref('my_seed') }} """ + + +DYNAMIC_ICEBERG_TABLE_ALTER = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='5 minutes', + refresh_mode='INCREMENTAL', + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_subpath="subpath", +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_ICEBERG_TABLE_REPLACE = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', + refresh_mode='FULL', + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_subpath="subpath", +) }} +select * from {{ ref('my_seed') }} +""" diff --git a/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py b/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py index 3c4f65a87..8d890b22b 100644 --- a/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py +++ b/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py @@ -17,6 +17,8 @@ def models(self): yield { "dynamic_table_alter.sql": models.DYNAMIC_TABLE, "dynamic_table_replace.sql": models.DYNAMIC_TABLE, + "dynamic_table_iceberg_alter.sql": models.DYNAMIC_ICEBERG_TABLE, + "dynamic_table_iceberg_replace.sql": models.DYNAMIC_ICEBERG_TABLE, } @pytest.fixture(scope="function", autouse=True) @@ -33,11 +35,17 @@ def setup_method(self, project, setup_class): update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE_ALTER) update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE_REPLACE) + update_model(project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE_ALTER) + update_model( + project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE_REPLACE + ) yield update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE) update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE) + update_model(project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE) + update_model(project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE) @staticmethod def assert_changes_are_applied(project): @@ -51,6 +59,16 @@ def assert_changes_are_applied(project): assert replaced.target_lag == "2 minutes" assert replaced.refresh_mode == "FULL" # this updated + altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter") + assert altered_iceberg.snowflake_warehouse == "DBT_TESTING" + assert altered_iceberg.target_lag == "5 minutes" # this updated + assert altered_iceberg.refresh_mode == "INCREMENTAL" + + replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace") + assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING" + assert replaced_iceberg.target_lag == "2 minutes" + assert replaced_iceberg.refresh_mode == "FULL" # this updated + @staticmethod def assert_changes_are_not_applied(project): altered = describe_dynamic_table(project, "dynamic_table_alter") @@ -63,6 +81,18 @@ def assert_changes_are_not_applied(project): assert replaced.target_lag == "2 minutes" assert replaced.refresh_mode == "INCREMENTAL" # this would have updated, but didn't + altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter") + assert altered_iceberg.snowflake_warehouse == "DBT_TESTING" + assert altered_iceberg.target_lag == "2 minutes" # this would have updated, but didn't + assert altered_iceberg.refresh_mode == "INCREMENTAL" + + replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace") + assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING" + assert replaced_iceberg.target_lag == "2 minutes" + assert ( + replaced_iceberg.refresh_mode == "INCREMENTAL" + ) # this would have updated, but didn't + def test_full_refresh_is_always_successful(self, project): # this always passes and always changes the configuration, regardless of on_configuration_change # and regardless of whether the changes require a replace versus an alter From 610d3792350bf06270e70eba4358564fc5f59c31 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Mon, 23 Sep 2024 15:42:56 -0400 Subject: [PATCH 18/29] update docstrings --- .../macros/relations/dynamic_table/create.sql | 74 +++++++++---------- .../relations/dynamic_table/describe.sql | 46 ++++++++---- .../relations/dynamic_table/replace.sql | 74 +++++++++---------- .../snowflake/macros/utils/optional.sql | 11 +++ 4 files changed, 118 insertions(+), 87 deletions(-) diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql index 1e249d241..0bd190dcc 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql @@ -1,16 +1,16 @@ {% macro snowflake__get_create_dynamic_table_as_sql(relation, sql) -%} {#- - Produce DDL that creates a dynamic table - - Args: - - relation: Union[SnowflakeRelation, str] - - SnowflakeRelation - required for relation.render() - - str - is already the rendered relation name - - sql: str - the code defining the model - Globals: - - config: NodeConfig - contains the attribution required to produce a SnowflakeDynamicTableConfig - Returns: - A valid DDL statement which will result in a new dynamic table. +-- Produce DDL that creates a dynamic table +-- +-- Args: +-- - relation: Union[SnowflakeRelation, str] +-- - SnowflakeRelation - required for relation.render() +-- - str - is already the rendered relation name +-- - sql: str - the code defining the model +-- Globals: +-- - config: NodeConfig - contains the attribution required to produce a SnowflakeDynamicTableConfig +-- Returns: +-- A valid DDL statement which will result in a new dynamic table. -#} {%- set dynamic_table = relation.from_config(config.model) -%} @@ -26,19 +26,19 @@ {% macro _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) -%} {#- - Produce DDL that creates a standard dynamic table - - This follows the syntax outlined here: - https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#syntax - - Args: - - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table - - relation: Union[SnowflakeRelation, str] - - SnowflakeRelation - required for relation.render() - - str - is already the rendered relation name - - sql: str - the code defining the model - Returns: - A valid DDL statement which will result in a new dynamic standard table. +-- Produce DDL that creates a standard dynamic table +-- +-- This follows the syntax outlined here: +-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#syntax +-- +-- Args: +-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table +-- - relation: Union[SnowflakeRelation, str] +-- - SnowflakeRelation - required for relation.render() +-- - str - is already the rendered relation name +-- - sql: str - the code defining the model +-- Returns: +-- A valid DDL statement which will result in a new dynamic standard table. -#} create dynamic table {{ relation }} @@ -55,19 +55,19 @@ {% macro _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) -%} {#- - Produce DDL that creates a dynamic iceberg table - - This follows the syntax outlined here: - https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table - - Args: - - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table - - relation: Union[SnowflakeRelation, str] - - SnowflakeRelation - required for relation.render() - - str - is already the rendered relation name - - sql: str - the code defining the model - Returns: - A valid DDL statement which will result in a new dynamic iceberg table. +-- Produce DDL that creates a dynamic iceberg table +-- +-- This follows the syntax outlined here: +-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table +-- +-- Args: +-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table +-- - relation: Union[SnowflakeRelation, str] +-- - SnowflakeRelation - required for relation.render() +-- - str - is already the rendered relation name +-- - sql: str - the code defining the model +-- Returns: +-- A valid DDL statement which will result in a new dynamic iceberg table. -#} create dynamic iceberg table {{ relation }} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql index 5e5e97ac2..77e3a8e52 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql @@ -1,4 +1,14 @@ {% macro snowflake__describe_dynamic_table(relation) %} +{#- +-- Get all relevant metadata about a dynamic table +-- +-- Args: +-- - relation: SnowflakeRelation - the relation to describe +-- Returns: +-- A dictionary with one or two entries depending on whether iceberg is enabled: +-- - dynamic_table: the metadata associated with a standard dynamic table +-- - catalog: the metadata associated with the iceberg catalog +-#} {%- set _dynamic_table_sql -%} show dynamic tables like '{{ relation.identifier }}' @@ -17,21 +27,31 @@ {% set _dynamic_table = run_query(_dynamic_table_sql) %} {% if adapter.behavior.enable_iceberg_materializations.no_warn %} - {%- set _catalog_sql -%} - show iceberg tables - like '{{ relation.identifier }}' - in schema {{ relation.database }}.{{ relation.schema }} - ; - select - "catalog_name", - "external_volume_name", - "base_location" - from table(result_scan(last_query_id())) - {%- endset %} - {% set _catalog = run_query(_catalog_sql) %} - {% else %} + {% set _catalog = run_query(_get_describe_iceberg_catalog_sql(relation)) %} + {% else %} {% set _catalog = none %} {% endif %} {% do return({'dynamic_table': _dynamic_table, 'catalog': _catalog}) %} {% endmacro %} + + +{% macro _get_describe_iceberg_catalog_sql(relation) %} +{#- +-- Produce DQL that returns all relevant metadata about an iceberg catalog +-- +-- Args: +-- - relation: SnowflakeRelation - the relation to describe +-- Returns: +-- A valid DQL statement that will return metadata associated with an iceberg catalog +-#} +show iceberg tables + like '{{ relation.identifier }}' + in schema {{ relation.database }}.{{ relation.schema }} + ; + select + "catalog_name", + "external_volume_name", + "base_location" + from table(result_scan(last_query_id())) +{% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql index 6ee29b941..f9ba1275a 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql @@ -1,16 +1,16 @@ {% macro snowflake__get_replace_dynamic_table_sql(relation, sql) -%} {#- - Produce DDL that replaces a dynamic table with a new dynamic table - - Args: - - relation: Union[SnowflakeRelation, str] - - SnowflakeRelation - required for relation.render() - - str - is already the rendered relation name - - sql: str - the code defining the model - Globals: - - config: NodeConfig - contains the attribution required to produce a SnowflakeDynamicTableConfig - Returns: - A valid DDL statement which will result in a new dynamic table. +-- Produce DDL that replaces a dynamic table with a new dynamic table +-- +-- Args: +-- - relation: Union[SnowflakeRelation, str] +-- - SnowflakeRelation - required for relation.render() +-- - str - is already the rendered relation name +-- - sql: str - the code defining the model +-- Globals: +-- - config: NodeConfig - contains the attribution required to produce a SnowflakeDynamicTableConfig +-- Returns: +-- A valid DDL statement which will result in a new dynamic table. -#} {%- set dynamic_table = relation.from_config(config.model) -%} @@ -25,19 +25,19 @@ {% macro _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) -%} {#- - Produce DDL that replaces a standard dynamic table with a new standard dynamic table - - This follows the syntax outlined here: - https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#syntax - - Args: - - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table - - relation: Union[SnowflakeRelation, str] - - SnowflakeRelation - required for relation.render() - - str - is already the rendered relation name - - sql: str - the code defining the model - Returns: - A valid DDL statement which will result in a new dynamic standard table. +-- Produce DDL that replaces a standard dynamic table with a new standard dynamic table +-- +-- This follows the syntax outlined here: +-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#syntax +-- +-- Args: +-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table +-- - relation: Union[SnowflakeRelation, str] +-- - SnowflakeRelation - required for relation.render() +-- - str - is already the rendered relation name +-- - sql: str - the code defining the model +-- Returns: +-- A valid DDL statement which will result in a new dynamic standard table. -#} create or replace dynamic table {{ relation }} @@ -54,19 +54,19 @@ {% macro _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) -%} {#- - Produce DDL that replaces a dynamic iceberg table with a new dynamic iceberg table - - This follows the syntax outlined here: - https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table - - Args: - - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table - - relation: Union[SnowflakeRelation, str] - - SnowflakeRelation - required for relation.render() - - str - is already the rendered relation name - - sql: str - the code defining the model - Returns: - A valid DDL statement which will result in a new dynamic iceberg table. +-- Produce DDL that replaces a dynamic iceberg table with a new dynamic iceberg table +-- +-- This follows the syntax outlined here: +-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table +-- +-- Args: +-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table +-- - relation: Union[SnowflakeRelation, str] +-- - SnowflakeRelation - required for relation.render() +-- - str - is already the rendered relation name +-- - sql: str - the code defining the model +-- Returns: +-- A valid DDL statement which will result in a new dynamic iceberg table. -#} create or replace dynamic iceberg table {{ relation }} diff --git a/dbt/include/snowflake/macros/utils/optional.sql b/dbt/include/snowflake/macros/utils/optional.sql index 95f53d4dd..0758ca59f 100644 --- a/dbt/include/snowflake/macros/utils/optional.sql +++ b/dbt/include/snowflake/macros/utils/optional.sql @@ -1,3 +1,14 @@ {% macro optional(name, value, quote_char = '') %} +{#- +-- Insert optional DDL parameters only when their value is provided; makes DDL statements more readable +-- +-- Args: +-- - name: the name of the DDL option +-- - value: the value of the DDL option, may be None +-- - quote_char: the quote character to use (e.g. string), leave blank if unnecessary (e.g. integer or bool) +-- Returns: +-- If the value is not None (e.g. provided by the user), return the option setting DDL +-- If the value is None, return an empty string +-#} {% if value is not none %}{{ name }} = {{ quote_char }}{{ value }}{{ quote_char }}{% endif %} {% endmacro %} From d9b8a6ba7e4518a0623e854f517d815f851b6b0a Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Mon, 23 Sep 2024 16:39:57 -0400 Subject: [PATCH 19/29] revert default iceberg to false, update tests to test both scenarios --- dbt/adapters/snowflake/impl.py | 2 +- .../dynamic_table_tests/test_basic.py | 22 +++- .../test_configuration_changes.py | 106 +++++++++++++----- .../test_relation_type_change.py | 40 +++++-- 4 files changed, 128 insertions(+), 42 deletions(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index c844f0cd8..5b5881eed 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -83,7 +83,7 @@ def _behavior_flags(self) -> List[BehaviorFlag]: return [ { "name": "enable_iceberg_materializations", - "default": True, + "default": False, "description": ( "Enabling Iceberg materializations introduces latency to metadata queries, " "specifically within the list_relations_without_caching macro. Since Iceberg " diff --git a/tests/functional/relation_tests/dynamic_table_tests/test_basic.py b/tests/functional/relation_tests/dynamic_table_tests/test_basic.py index 20cc028c8..79a2241ca 100644 --- a/tests/functional/relation_tests/dynamic_table_tests/test_basic.py +++ b/tests/functional/relation_tests/dynamic_table_tests/test_basic.py @@ -7,6 +7,7 @@ class TestBasic: + iceberg: bool = False @pytest.fixture(scope="class", autouse=True) def seeds(self): @@ -14,11 +15,17 @@ def seeds(self): @pytest.fixture(scope="class", autouse=True) def models(self): - yield { + my_models = { "my_dynamic_table.sql": models.DYNAMIC_TABLE, "my_dynamic_table_downstream.sql": models.DYNAMIC_TABLE_DOWNSTREAM, - "my_dynamic_iceberg_table.sql": models.DYNAMIC_ICEBERG_TABLE, } + if self.iceberg: + my_models.update( + { + "my_dynamic_iceberg_table.sql": models.DYNAMIC_ICEBERG_TABLE, + } + ) + yield my_models @pytest.fixture(scope="class", autouse=True) def setup(self, project): @@ -29,4 +36,13 @@ def test_dynamic_table_full_refresh(self, project): run_dbt(["run", "--full-refresh"]) assert query_relation_type(project, "my_dynamic_table") == "dynamic_table" assert query_relation_type(project, "my_dynamic_table_downstream") == "dynamic_table" - assert query_relation_type(project, "my_dynamic_iceberg_table") == "dynamic_table" + if self.iceberg: + assert query_relation_type(project, "my_dynamic_iceberg_table") == "dynamic_table" + + +class TestBasicIcebergOn(TestBasic): + iceberg = True + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": True}} diff --git a/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py b/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py index 8d890b22b..f389344e0 100644 --- a/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py +++ b/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py @@ -7,6 +7,7 @@ class Changes: + iceberg: bool = False @pytest.fixture(scope="class", autouse=True) def seeds(self): @@ -14,12 +15,18 @@ def seeds(self): @pytest.fixture(scope="class", autouse=True) def models(self): - yield { + my_models = { "dynamic_table_alter.sql": models.DYNAMIC_TABLE, "dynamic_table_replace.sql": models.DYNAMIC_TABLE, - "dynamic_table_iceberg_alter.sql": models.DYNAMIC_ICEBERG_TABLE, - "dynamic_table_iceberg_replace.sql": models.DYNAMIC_ICEBERG_TABLE, } + if self.iceberg: + my_models.update( + { + "dynamic_table_iceberg_alter.sql": models.DYNAMIC_ICEBERG_TABLE, + "dynamic_table_iceberg_replace.sql": models.DYNAMIC_ICEBERG_TABLE, + } + ) + yield my_models @pytest.fixture(scope="function", autouse=True) def setup_class(self, project): @@ -35,20 +42,23 @@ def setup_method(self, project, setup_class): update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE_ALTER) update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE_REPLACE) - update_model(project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE_ALTER) - update_model( - project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE_REPLACE - ) + if self.iceberg: + update_model( + project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE_ALTER + ) + update_model( + project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE_REPLACE + ) yield update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE) update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE) - update_model(project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE) - update_model(project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE) + if self.iceberg: + update_model(project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE) + update_model(project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE) - @staticmethod - def assert_changes_are_applied(project): + def assert_changes_are_applied(self, project): altered = describe_dynamic_table(project, "dynamic_table_alter") assert altered.snowflake_warehouse == "DBT_TESTING" assert altered.target_lag == "5 minutes" # this updated @@ -59,18 +69,18 @@ def assert_changes_are_applied(project): assert replaced.target_lag == "2 minutes" assert replaced.refresh_mode == "FULL" # this updated - altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter") - assert altered_iceberg.snowflake_warehouse == "DBT_TESTING" - assert altered_iceberg.target_lag == "5 minutes" # this updated - assert altered_iceberg.refresh_mode == "INCREMENTAL" + if self.iceberg: + altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter") + assert altered_iceberg.snowflake_warehouse == "DBT_TESTING" + assert altered_iceberg.target_lag == "5 minutes" # this updated + assert altered_iceberg.refresh_mode == "INCREMENTAL" - replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace") - assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING" - assert replaced_iceberg.target_lag == "2 minutes" - assert replaced_iceberg.refresh_mode == "FULL" # this updated + replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace") + assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING" + assert replaced_iceberg.target_lag == "2 minutes" + assert replaced_iceberg.refresh_mode == "FULL" # this updated - @staticmethod - def assert_changes_are_not_applied(project): + def assert_changes_are_not_applied(self, project): altered = describe_dynamic_table(project, "dynamic_table_alter") assert altered.snowflake_warehouse == "DBT_TESTING" assert altered.target_lag == "2 minutes" # this would have updated, but didn't @@ -81,17 +91,18 @@ def assert_changes_are_not_applied(project): assert replaced.target_lag == "2 minutes" assert replaced.refresh_mode == "INCREMENTAL" # this would have updated, but didn't - altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter") - assert altered_iceberg.snowflake_warehouse == "DBT_TESTING" - assert altered_iceberg.target_lag == "2 minutes" # this would have updated, but didn't - assert altered_iceberg.refresh_mode == "INCREMENTAL" + if self.iceberg: + altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter") + assert altered_iceberg.snowflake_warehouse == "DBT_TESTING" + assert altered_iceberg.target_lag == "2 minutes" # this would have updated, but didn't + assert altered_iceberg.refresh_mode == "INCREMENTAL" - replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace") - assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING" - assert replaced_iceberg.target_lag == "2 minutes" - assert ( - replaced_iceberg.refresh_mode == "INCREMENTAL" - ) # this would have updated, but didn't + replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace") + assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING" + assert replaced_iceberg.target_lag == "2 minutes" + assert ( + replaced_iceberg.refresh_mode == "INCREMENTAL" + ) # this would have updated, but didn't def test_full_refresh_is_always_successful(self, project): # this always passes and always changes the configuration, regardless of on_configuration_change @@ -111,6 +122,17 @@ def test_changes_are_applied(self, project): self.assert_changes_are_applied(project) +class TestChangesApplyIcebergOn(TestChangesApply): + iceberg = True + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": {"on_configuration_change": "apply"}, + "flags": {"enable_iceberg_materializations": True}, + } + + class TestChangesContinue(Changes): @pytest.fixture(scope="class") def project_config_update(self): @@ -122,6 +144,17 @@ def test_changes_are_not_applied(self, project): self.assert_changes_are_not_applied(project) +class TestChangesContinueIcebergOn(TestChangesContinue): + iceberg = True + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": {"on_configuration_change": "continue"}, + "flags": {"enable_iceberg_materializations": True}, + } + + class TestChangesFail(Changes): @pytest.fixture(scope="class") def project_config_update(self): @@ -131,3 +164,14 @@ def test_changes_are_not_applied(self, project): # this fails and does not change the configuration run_dbt(["run"], expect_pass=False) self.assert_changes_are_not_applied(project) + + +class TestChangesFailIcebergOn(TestChangesFail): + iceberg = True + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": {"on_configuration_change": "fail"}, + "flags": {"enable_iceberg_materializations": True}, + } diff --git a/tests/functional/relation_tests/test_relation_type_change.py b/tests/functional/relation_tests/test_relation_type_change.py index b41bb19d8..f92c73641 100644 --- a/tests/functional/relation_tests/test_relation_type_change.py +++ b/tests/functional/relation_tests/test_relation_type_change.py @@ -48,26 +48,52 @@ def error_message(self) -> str: class TestRelationTypeChange: + @staticmethod + def include(scenario) -> bool: + return ( + scenario.initial.table_format != "iceberg" and scenario.final.table_format != "iceberg" + ) + @pytest.fixture(scope="class", autouse=True) def seeds(self): return {"my_seed.csv": models.SEED} @pytest.fixture(scope="class", autouse=True) def models(self): - yield {f"{scenario.name}.sql": scenario.initial.model for scenario in scenarios} + yield { + f"{scenario.name}.sql": scenario.initial.model + for scenario in scenarios + if self.include(scenario) + } @pytest.fixture(scope="class", autouse=True) def setup(self, project): run_dbt(["seed"]) run_dbt(["run"]) for scenario in scenarios: - update_model(project, scenario.name, scenario.final.model) + if self.include(scenario): + update_model(project, scenario.name, scenario.final.model) run_dbt(["run"]) @pytest.mark.parametrize("scenario", scenarios, ids=[scenario.name for scenario in scenarios]) def test_replace(self, project, scenario): - relation_type = query_relation_type(project, scenario.name) - assert relation_type == scenario.final.relation_type, scenario.error_message - if relation_type == "dynamic_table": - dynamic_table = describe_dynamic_table(project, scenario.name) - assert dynamic_table.catalog.table_format == scenario.final.table_format + if self.include(scenario): + relation_type = query_relation_type(project, scenario.name) + assert relation_type == scenario.final.relation_type, scenario.error_message + if relation_type == "dynamic_table": + dynamic_table = describe_dynamic_table(project, scenario.name) + assert dynamic_table.catalog.table_format == scenario.final.table_format + else: + pytest.skip() + + +class TestRelationTypeChangeIcebergOn(TestRelationTypeChange): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": True}} + + @staticmethod + def include(scenario) -> bool: + return ( + scenario.initial.table_format == "iceberg" or scenario.final.table_format == "iceberg" + ) From 24f7beeff271757d103d787c3328fdafbc53411a Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Thu, 26 Sep 2024 15:33:40 -0400 Subject: [PATCH 20/29] use direct assignment instead of dict.update --- dbt/adapters/snowflake/relation_configs/catalog.py | 12 ++++++------ .../snowflake/relation_configs/dynamic_table.py | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/dbt/adapters/snowflake/relation_configs/catalog.py b/dbt/adapters/snowflake/relation_configs/catalog.py index 83e2c860c..02842b476 100644 --- a/dbt/adapters/snowflake/relation_configs/catalog.py +++ b/dbt/adapters/snowflake/relation_configs/catalog.py @@ -65,7 +65,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self: "base_location": config_dict.get("base_location"), } if table_format := config_dict.get("table_format"): - kwargs_dict.update({"table_format": TableFormat(table_format)}) + kwargs_dict["table_format"] = TableFormat(table_format) return super().from_dict(kwargs_dict) @classmethod @@ -80,10 +80,10 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any } if external_volume := relation_config.config.extra.get("external_volume"): - config_dict.update({"external_volume": external_volume}) + config_dict["external_volume"] = external_volume if base_location := relation_config.config.extra.get("base_location_subpath"): - config_dict.update({"base_location": base_location}) + config_dict["base_location"] = base_location return config_dict @@ -101,13 +101,13 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, config_dict = {"table_format": "iceberg"} if name := catalog.get("catalog_name"): - config_dict.update({"name": name}) + config_dict["name"] = name if external_volume := catalog.get("external_volume_name"): - config_dict.update({"external_volume": external_volume}) + config_dict["external_volume"] = external_volume if base_location := catalog.get("base_location"): - config_dict.update({"base_location": base_location}) + config_dict["base_location"] = base_location return config_dict diff --git a/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt/adapters/snowflake/relation_configs/dynamic_table.py index 61a4cafa3..7361df80a 100644 --- a/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -95,10 +95,10 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any } if refresh_mode := relation_config.config.extra.get("refresh_mode"): - config_dict.update(refresh_mode=refresh_mode.upper()) + config_dict["refresh_mode"] = refresh_mode.upper() if initialize := relation_config.config.extra.get("initialize"): - config_dict.update(initialize=initialize.upper()) + config_dict["initialize"] = initialize.upper() return config_dict From 23419dd689b8721037da1f4363f390fba11f124e Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Thu, 26 Sep 2024 17:50:47 -0400 Subject: [PATCH 21/29] update how the catalog gets built in the default scenario --- .../snowflake/relation_configs/catalog.py | 27 +++++++++---------- .../relations/dynamic_table/describe.sql | 8 +++--- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/dbt/adapters/snowflake/relation_configs/catalog.py b/dbt/adapters/snowflake/relation_configs/catalog.py index 02842b476..2a4ce8851 100644 --- a/dbt/adapters/snowflake/relation_configs/catalog.py +++ b/dbt/adapters/snowflake/relation_configs/catalog.py @@ -72,7 +72,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self: def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]: if relation_config.config.extra.get("table_format") is None: - return {"table_format": "default"} + return {} config_dict = { "table_format": relation_config.config.extra.get("table_format"), @@ -89,25 +89,24 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any @classmethod def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: - catalog_results: "agate.Table" = relation_results["catalog"] + try: + catalog_results: "agate.Table" = relation_results["catalog"] + except KeyError: + return {} - if catalog_results is None or len(catalog_results) == 0: - return {"table_format": "default"} + if len(catalog_results) == 0: + return {} # for now, if we get catalog results, it's because this is an iceberg table # this is because we only run `show iceberg tables` to get catalog metadata # this will need to be updated once this is in `show objects` catalog: "agate.Row" = catalog_results.rows[0] - config_dict = {"table_format": "iceberg"} - - if name := catalog.get("catalog_name"): - config_dict["name"] = name - - if external_volume := catalog.get("external_volume_name"): - config_dict["external_volume"] = external_volume - - if base_location := catalog.get("base_location"): - config_dict["base_location"] = base_location + config_dict = { + "table_format": "iceberg", + "name": catalog.get("catalog_name"), + "external_volume": catalog.get("external_volume_name"), + "base_location": catalog.get("base_location"), + } return config_dict diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql index 77e3a8e52..69b43fce7 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql @@ -24,15 +24,13 @@ "refresh_mode" from table(result_scan(last_query_id())) {%- endset %} - {% set _dynamic_table = run_query(_dynamic_table_sql) %} + {% set results = {'dynamic_table': run_query(_dynamic_table_sql)} %} {% if adapter.behavior.enable_iceberg_materializations.no_warn %} - {% set _catalog = run_query(_get_describe_iceberg_catalog_sql(relation)) %} - {% else %} - {% set _catalog = none %} + {% set _ = results.update({'catalog': run_query(_get_describe_iceberg_catalog_sql(relation))}) %} {% endif %} - {% do return({'dynamic_table': _dynamic_table, 'catalog': _catalog}) %} + {% do return(results) %} {% endmacro %} From 8265e5e605aed42b5de4491020b04edcaf48cf43 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Thu, 26 Sep 2024 17:56:01 -0400 Subject: [PATCH 22/29] remove string building from Model.name --- tests/functional/relation_tests/test_relation_type_change.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/functional/relation_tests/test_relation_type_change.py b/tests/functional/relation_tests/test_relation_type_change.py index 48cd1d65e..d7ae1936a 100644 --- a/tests/functional/relation_tests/test_relation_type_change.py +++ b/tests/functional/relation_tests/test_relation_type_change.py @@ -18,9 +18,10 @@ class Model: @property def name(self) -> str: - name = f"{self.relation_type}" if self.table_format: - name += f"_{self.table_format}" + name = f"{self.relation_type}_{self.table_format}" + else: + name = f"{self.relation_type}" return name From 2c91dd4ba919d6af9d81d593337ce5fcfcc1c888 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Thu, 26 Sep 2024 17:57:32 -0400 Subject: [PATCH 23/29] stop testing odd target lag inputs which we don't want to support anyway --- .../relation_tests/dynamic_table_tests/models.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/functional/relation_tests/dynamic_table_tests/models.py b/tests/functional/relation_tests/dynamic_table_tests/models.py index 6279850cc..4dcd6cf48 100644 --- a/tests/functional/relation_tests/dynamic_table_tests/models.py +++ b/tests/functional/relation_tests/dynamic_table_tests/models.py @@ -10,7 +10,7 @@ {{ config( materialized='dynamic_table', snowflake_warehouse='DBT_TESTING', - target_lag='2 minutes', + target_lag='2 minutes', refresh_mode='INCREMENTAL', ) }} select * from {{ ref('my_seed') }} @@ -32,7 +32,7 @@ {{ config( materialized='dynamic_table', snowflake_warehouse='DBT_TESTING', - target_lag='2 minutes', + target_lag='2 minutes', refresh_mode='INCREMENTAL', table_format="iceberg", external_volume="s3_iceberg_snow", @@ -46,7 +46,7 @@ {{ config( materialized='dynamic_table', snowflake_warehouse='DBT_TESTING', - target_lag='5 minutes', + target_lag='5 minutes', refresh_mode='INCREMENTAL', ) }} select * from {{ ref('my_seed') }} @@ -57,7 +57,7 @@ {{ config( materialized='dynamic_table', snowflake_warehouse='DBT_TESTING', - target_lag='2 minutes', + target_lag='2 minutes', refresh_mode='FULL', ) }} select * from {{ ref('my_seed') }} From 529b875fabcbad887f99c9d0f1a4b6727589a728 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Thu, 26 Sep 2024 18:06:38 -0400 Subject: [PATCH 24/29] comments and formatting --- dbt/adapters/snowflake/relation_configs/catalog.py | 3 +++ .../snowflake/macros/relations/dynamic_table/describe.sql | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/snowflake/relation_configs/catalog.py b/dbt/adapters/snowflake/relation_configs/catalog.py index 2a4ce8851..09e338635 100644 --- a/dbt/adapters/snowflake/relation_configs/catalog.py +++ b/dbt/adapters/snowflake/relation_configs/catalog.py @@ -89,12 +89,15 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any @classmethod def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: + # this try block can be removed once enable_iceberg_materializations is retired try: catalog_results: "agate.Table" = relation_results["catalog"] except KeyError: + # this happens when `enable_iceberg_materializations` is turned off return {} if len(catalog_results) == 0: + # this happens when the dynamic table is a standard dynamic table (e.g. not iceberg) return {} # for now, if we get catalog results, it's because this is an iceberg table diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql index 69b43fce7..b5c49ad37 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql @@ -43,7 +43,7 @@ -- Returns: -- A valid DQL statement that will return metadata associated with an iceberg catalog -#} -show iceberg tables + show iceberg tables like '{{ relation.identifier }}' in schema {{ relation.database }}.{{ relation.schema }} ; From f0684bfa76cd8269c49f229c674bc0eabad5ca6d Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Thu, 26 Sep 2024 18:09:40 -0400 Subject: [PATCH 25/29] comments and formatting --- tests/functional/relation_tests/models.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/functional/relation_tests/models.py b/tests/functional/relation_tests/models.py index 5e271c49f..63dfff045 100644 --- a/tests/functional/relation_tests/models.py +++ b/tests/functional/relation_tests/models.py @@ -46,7 +46,6 @@ select * from {{ ref('my_seed') }} """ - ICEBERG_TABLE = """ {{ config( materialized='table', From 1d89f113e438ed8b6ba75cacab989e4bc6a1a80f Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Fri, 27 Sep 2024 14:37:01 -0400 Subject: [PATCH 26/29] add standard incremental tables into the relation swap scenarios --- tests/functional/relation_tests/models.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/functional/relation_tests/models.py b/tests/functional/relation_tests/models.py index 63dfff045..7b0050d11 100644 --- a/tests/functional/relation_tests/models.py +++ b/tests/functional/relation_tests/models.py @@ -55,7 +55,7 @@ select * from {{ ref('my_seed') }} """ -ICEBERG_INCREMENTAL_TABLE = """ +INCREMENTAL_ICEBERG_TABLE = """ {{ config( materialized='incremental', table_format='iceberg', @@ -65,3 +65,13 @@ ) }} select * from {{ ref('my_seed') }} """ + + +INCREMENTAL_TABLE = """ +{{ config( + materialized='incremental', + incremental_strategy='append', + unique_key="id", +) }} +select * from {{ ref('my_seed') }} +""" From 989c072bfcdfce214eeb620031f00c0adbf6334b Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Fri, 27 Sep 2024 14:37:34 -0400 Subject: [PATCH 27/29] account for the fact that snowflake does not support renaming iceberg relations --- dbt/adapters/snowflake/relation.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dbt/adapters/snowflake/relation.py b/dbt/adapters/snowflake/relation.py index 148ae0869..b6924b9b3 100644 --- a/dbt/adapters/snowflake/relation.py +++ b/dbt/adapters/snowflake/relation.py @@ -139,6 +139,14 @@ def as_case_sensitive(self) -> "SnowflakeRelation": return self.replace_path(**path_part_map) + @property + def can_be_renamed(self) -> bool: + """ + Standard tables and dynamic tables can be renamed, but Snowflake does not support renaming iceberg relations. + The iceberg standard does support renaming, so this may change in the future. + """ + return self.type in self.renameable_relations and not self.is_iceberg_format + def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) -> str: """ This macro renders the appropriate DDL prefix during the create_table_as From 58819861626080ca85a28e4d846e88c111e41b7a Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Fri, 27 Sep 2024 14:38:51 -0400 Subject: [PATCH 28/29] account for all scenarios when swapping relation types, including those which currently require a full refresh --- .../test_relation_type_change.py | 129 ++++++++++++++---- 1 file changed, 101 insertions(+), 28 deletions(-) diff --git a/tests/functional/relation_tests/test_relation_type_change.py b/tests/functional/relation_tests/test_relation_type_change.py index d7ae1936a..93ee913b6 100644 --- a/tests/functional/relation_tests/test_relation_type_change.py +++ b/tests/functional/relation_tests/test_relation_type_change.py @@ -13,17 +13,25 @@ class Model: model: str relation_type: str - table_format: Optional[str] = None - incremental: Optional[bool] = None + table_format: Optional[str] = "default" + is_incremental: Optional[bool] = False @property def name(self) -> str: - if self.table_format: - name = f"{self.relation_type}_{self.table_format}" + if self.is_incremental: + name = f"{self.relation_type}_{self.table_format}_incremental" else: - name = f"{self.relation_type}" + name = f"{self.relation_type}_{self.table_format}" return name + @property + def is_iceberg(self) -> bool: + return self.table_format == "iceberg" + + @property + def is_standard_table(self) -> bool: + return self.relation_type == "table" and not self.is_incremental + @dataclass class Scenario: @@ -38,24 +46,38 @@ def name(self) -> str: def error_message(self) -> str: return f"Failed when migrating from: {self.initial.name} to: {self.final.name}" + @property + def uses_iceberg(self) -> bool: + return any([self.initial.is_iceberg, self.final.is_iceberg]) + relations = [ Model(models.VIEW, "view"), Model(models.TABLE, "table", "default"), + Model(models.INCREMENTAL_TABLE, "table", "default", is_incremental=True), Model(models.DYNAMIC_TABLE, "dynamic_table", "default"), - Model(models.DYNAMIC_ICEBERG_TABLE, "dynamic_table", "iceberg"), Model(models.ICEBERG_TABLE, "table", "iceberg"), - Model(models.ICEBERG_INCREMENTAL_TABLE, "table", "iceberg", incremental=True), + Model(models.INCREMENTAL_ICEBERG_TABLE, "table", "iceberg", is_incremental=True), + Model(models.DYNAMIC_ICEBERG_TABLE, "dynamic_table", "iceberg"), ] scenarios = [Scenario(*scenario) for scenario in product(relations, relations)] class TestRelationTypeChange: + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": False}} @staticmethod def include(scenario) -> bool: - return ( - scenario.initial.table_format != "iceberg" and scenario.final.table_format != "iceberg" + """ + This condition is the complement of TestRelationTypeChangeFullRefreshRequired, given `not scenario.uses_iceberg` + """ + return not scenario.uses_iceberg and not any( + [ + scenario.initial.relation_type == "dynamic_table" + and scenario.final.is_incremental, + ] ) @pytest.fixture(scope="class", autouse=True) @@ -77,7 +99,11 @@ def setup(self, project): for scenario in scenarios: if self.include(scenario): update_model(project, scenario.name, scenario.final.model) - run_dbt(["run"]) + # allow for dbt to fail so that we can see which scenarios pass and which scenarios fail + try: + run_dbt(["run"], expect_pass=False) + except Exception: + pass @pytest.mark.parametrize("scenario", scenarios, ids=[scenario.name for scenario in scenarios]) def test_replace(self, project, scenario): @@ -91,6 +117,28 @@ def test_replace(self, project, scenario): pytest.skip() +class TestRelationTypeChangeFullRefreshRequired(TestRelationTypeChange): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": {"enable_iceberg_materializations": False}, + "models": {"full_refresh": True}, + } + + @staticmethod + def include(scenario) -> bool: + """ + These are unhandled scenarios, given `not scenario.uses_iceberg` + """ + return not scenario.uses_iceberg and any( + [ + # we can't swap from an incremental to a dynamic table because the materialization does not handle this case + scenario.initial.relation_type == "dynamic_table" + and scenario.final.is_incremental, + ] + ) + + class TestRelationTypeChangeIcebergOn(TestRelationTypeChange): @pytest.fixture(scope="class") def project_config_update(self): @@ -99,23 +147,48 @@ def project_config_update(self): @staticmethod def include(scenario) -> bool: """ - Upon adding the logic needed for seamless transitions to and from incremental models without data loss, we can coalesce these test cases. + This condition is the complement of TestRelationTypeChangeIcebergOnFullRefreshRequired, given `scenario.uses_iceberg` + """ + return scenario.uses_iceberg and not any( + [ + # we can only swap incremental to table and back if both are iceberg + scenario.initial.is_incremental + and scenario.final.is_standard_table + and scenario.initial.table_format != scenario.final.table_format, + scenario.initial.is_standard_table + and scenario.final.is_incremental + and scenario.initial.table_format != scenario.final.table_format, + # we can't swap from an incremental to a dynamic table because the materialization does not handle this case + scenario.initial.relation_type == "dynamic_table" + and scenario.final.is_incremental, + ] + ) + + +class TestRelationTypeChangeIcebergOnFullRefreshRequired(TestRelationTypeChange): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": {"enable_iceberg_materializations": True}, + "models": {"full_refresh": True}, + } + + @staticmethod + def include(scenario) -> bool: + """ + These are unhandled scenarios, given `scenario.uses_iceberg` """ - return any( - ( - # scenario 1: Everything that doesn't include incremental relations on Iceberg - ( - ( - scenario.initial.table_format == "iceberg" - or scenario.final.table_format == "iceberg" - ) - and not scenario.initial.incremental - and not scenario.final.incremental - ), - # scenario 2: Iceberg Incremental swaps allowed - ( - scenario.initial.table_format == "iceberg" - and scenario.final.table_format == "iceberg" - ), - ) + return scenario.uses_iceberg and any( + [ + # we can only swap incremental to table and back if both are iceberg + scenario.initial.is_incremental + and scenario.final.is_standard_table + and scenario.initial.table_format != scenario.final.table_format, + scenario.initial.is_standard_table + and scenario.final.is_incremental + and scenario.initial.table_format != scenario.final.table_format, + # we can't swap from an incremental to a dynamic table because the materialization does not handle this case + scenario.initial.relation_type == "dynamic_table" + and scenario.final.is_incremental, + ] ) From a42addaf8b3982375da712b7c169c1787435c8cf Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Fri, 27 Sep 2024 14:54:19 -0400 Subject: [PATCH 29/29] make it clearer which scenarios are included in each run and why by pulling the criteria into one function --- .../test_relation_type_change.py | 73 +++++-------------- 1 file changed, 20 insertions(+), 53 deletions(-) diff --git a/tests/functional/relation_tests/test_relation_type_change.py b/tests/functional/relation_tests/test_relation_type_change.py index 93ee913b6..1024a92ca 100644 --- a/tests/functional/relation_tests/test_relation_type_change.py +++ b/tests/functional/relation_tests/test_relation_type_change.py @@ -63,6 +63,22 @@ def uses_iceberg(self) -> bool: scenarios = [Scenario(*scenario) for scenario in product(relations, relations)] +def requires_full_refresh(scenario) -> bool: + return any( + [ + # we can only swap incremental to table and back if both are iceberg + scenario.initial.is_incremental + and scenario.final.is_standard_table + and scenario.initial.table_format != scenario.final.table_format, + scenario.initial.is_standard_table + and scenario.final.is_incremental + and scenario.initial.table_format != scenario.final.table_format, + # we can't swap from an incremental to a dynamic table because the materialization does not handle this case + scenario.initial.relation_type == "dynamic_table" and scenario.final.is_incremental, + ] + ) + + class TestRelationTypeChange: @pytest.fixture(scope="class") def project_config_update(self): @@ -70,15 +86,7 @@ def project_config_update(self): @staticmethod def include(scenario) -> bool: - """ - This condition is the complement of TestRelationTypeChangeFullRefreshRequired, given `not scenario.uses_iceberg` - """ - return not scenario.uses_iceberg and not any( - [ - scenario.initial.relation_type == "dynamic_table" - and scenario.final.is_incremental, - ] - ) + return not scenario.uses_iceberg and not requires_full_refresh(scenario) @pytest.fixture(scope="class", autouse=True) def seeds(self): @@ -127,16 +135,7 @@ def project_config_update(self): @staticmethod def include(scenario) -> bool: - """ - These are unhandled scenarios, given `not scenario.uses_iceberg` - """ - return not scenario.uses_iceberg and any( - [ - # we can't swap from an incremental to a dynamic table because the materialization does not handle this case - scenario.initial.relation_type == "dynamic_table" - and scenario.final.is_incremental, - ] - ) + return not scenario.uses_iceberg and requires_full_refresh(scenario) class TestRelationTypeChangeIcebergOn(TestRelationTypeChange): @@ -146,23 +145,7 @@ def project_config_update(self): @staticmethod def include(scenario) -> bool: - """ - This condition is the complement of TestRelationTypeChangeIcebergOnFullRefreshRequired, given `scenario.uses_iceberg` - """ - return scenario.uses_iceberg and not any( - [ - # we can only swap incremental to table and back if both are iceberg - scenario.initial.is_incremental - and scenario.final.is_standard_table - and scenario.initial.table_format != scenario.final.table_format, - scenario.initial.is_standard_table - and scenario.final.is_incremental - and scenario.initial.table_format != scenario.final.table_format, - # we can't swap from an incremental to a dynamic table because the materialization does not handle this case - scenario.initial.relation_type == "dynamic_table" - and scenario.final.is_incremental, - ] - ) + return scenario.uses_iceberg and not requires_full_refresh(scenario) class TestRelationTypeChangeIcebergOnFullRefreshRequired(TestRelationTypeChange): @@ -175,20 +158,4 @@ def project_config_update(self): @staticmethod def include(scenario) -> bool: - """ - These are unhandled scenarios, given `scenario.uses_iceberg` - """ - return scenario.uses_iceberg and any( - [ - # we can only swap incremental to table and back if both are iceberg - scenario.initial.is_incremental - and scenario.final.is_standard_table - and scenario.initial.table_format != scenario.final.table_format, - scenario.initial.is_standard_table - and scenario.final.is_incremental - and scenario.initial.table_format != scenario.final.table_format, - # we can't swap from an incremental to a dynamic table because the materialization does not handle this case - scenario.initial.relation_type == "dynamic_table" - and scenario.final.is_incremental, - ] - ) + return scenario.uses_iceberg and requires_full_refresh(scenario)