Skip to content

Commit

Permalink
Merge branch 'main' into 1.9.latest
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db committed Jan 17, 2025
2 parents 25e4c61 + 45ec259 commit a97b244
Show file tree
Hide file tree
Showing 20 changed files with 488 additions and 77 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
## dbt-databricks 1.9.2 (TBD)

### Features

- Update snapshot materialization to support new snapshot features ([904](https://github.com/databricks/dbt-databricks/pull/904))

### Under the Hood

- Refactor global state reading ([888](https://github.com/databricks/dbt-databricks/pull/888))
- Switch to relation.render() for string interpolation ([903](https://github.com/databricks/dbt-databricks/pull/903))
- Ensure retry defaults for PySQL ([907](https://github.com/databricks/dbt-databricks/pull/907))

## dbt-databricks 1.9.1 (December 16, 2024)

Expand Down
6 changes: 4 additions & 2 deletions dbt/adapters/databricks/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ class DatabricksCredentials(Credentials):
@classmethod
def __pre_deserialize__(cls, data: dict[Any, Any]) -> dict[Any, Any]:
data = super().__pre_deserialize__(data)
if "database" not in data:
data["database"] = None
data.setdefault("database", None)
data.setdefault("connection_parameters", {})
data["connection_parameters"].setdefault("_retry_stop_after_attempts_count", 30)
data["connection_parameters"].setdefault("_retry_delay_max", 60)
return data

def __post_init__(self) -> None:
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/databricks/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
{{ exceptions.raise_compiler_error('Delta format required for dropping columns from tables') }}
{% endif %}
{%- call statement('alter_relation_remove_columns') -%}
ALTER TABLE {{ relation }} DROP COLUMNS ({{ api.Column.format_remove_column_list(remove_columns) }})
ALTER TABLE {{ relation.render() }} DROP COLUMNS ({{ api.Column.format_remove_column_list(remove_columns) }})
{%- endcall -%}
{% endif %}

{% if add_columns %}
{%- call statement('alter_relation_add_columns') -%}
ALTER TABLE {{ relation }} ADD COLUMNS ({{ api.Column.format_add_column_list(add_columns) }})
ALTER TABLE {{ relation.render() }} ADD COLUMNS ({{ api.Column.format_add_column_list(add_columns) }})
{%- endcall -%}
{% endif %}
{% endmacro %}
4 changes: 2 additions & 2 deletions dbt/include/databricks/macros/adapters/persist_docs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{% set comment = column['description'] %}
{% set escaped_comment = comment | replace('\'', '\\\'') %}
{% set comment_query %}
alter table {{ relation }} change column {{ api.Column.get_name(column) }} comment '{{ escaped_comment }}';
alter table {{ relation.render()|lower }} change column {{ api.Column.get_name(column) }} comment '{{ escaped_comment }}';
{% endset %}
{% do run_query(comment_query) %}
{% endfor %}
Expand All @@ -13,7 +13,7 @@

{% macro alter_table_comment(relation, model) %}
{% set comment_query %}
comment on table {{ relation|lower }} is '{{ model.description | replace("'", "\\'") }}'
comment on table {{ relation.render()|lower }} is '{{ model.description | replace("'", "\\'") }}'
{% endset %}
{% do run_query(comment_query) %}
{% endmacro %}
Expand Down
88 changes: 36 additions & 52 deletions dbt/include/databricks/macros/materializations/snapshot.sql
Original file line number Diff line number Diff line change
@@ -1,27 +1,4 @@
{% macro databricks_build_snapshot_staging_table(strategy, sql, target_relation) %}
{% set tmp_identifier = target_relation.identifier ~ '__dbt_tmp' %}

{%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
schema=target_relation.schema,
database=target_relation.database,
type='view') -%}

{% set select = snapshot_staging_table(strategy, sql, target_relation) %}

{# needs to be a non-temp view so that its columns can be ascertained via `describe` #}
{% call statement('build_snapshot_staging_relation') %}
create or replace view {{ tmp_relation }}
as
{{ select }}
{% endcall %}

{% do return(tmp_relation) %}
{% endmacro %}


{% materialization snapshot, adapter='databricks' %}
{%- set config = model['config'] -%}

{%- set target_table = model.get('alias', model.get('name')) -%}

{%- set strategy_name = config.get('strategy') -%}
Expand Down Expand Up @@ -62,47 +39,43 @@
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %}

{% if not target_relation_exists %}

{% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %}
{% set build_or_select_sql = build_sql %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% call statement('main') %}
{{ final_sql }}
{% endcall %}

{% do persist_docs(target_relation, model, for_relation=False) %}

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}
{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}

{% if target_relation.database is none %}
{% set staging_table = spark_build_snapshot_staging_table(strategy, sql, target_relation) %}
{% else %}
{% set staging_table = databricks_build_snapshot_staging_table(strategy, sql, target_relation) %}
{% endif %}
{{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }}

{% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %}
{% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}

-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %}
{% if unique_key | is_list %}
{% for key in strategy.unique_key %}
{{ remove_columns.append('dbt_unique_key_' + loop.index|string) }}
{{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }}
{% endfor %}
{% endif %}

{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% do create_columns(target_relation, missing_columns) %}

{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% set quoted_source_columns = [] %}
Expand All @@ -117,23 +90,34 @@
)
%}

{% call statement_with_staging_table('main', staging_table) %}
{{ final_sql }}
{% endcall %}
{% endif %}

{% do persist_docs(target_relation, model, for_relation=True) %}

{% endif %}
{{ check_time_data_types(build_or_select_sql) }}

{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}
{% call statement('main') %}
{{ final_sql }}
{% endcall %}

{% do persist_constraints(target_relation, model) %}
{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% if not target_relation_exists %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}

{% do persist_constraints(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}
Expand Down
10 changes: 5 additions & 5 deletions dbt/include/databricks/macros/relations/constraints.sql
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@
{% set column = model.get('columns', {}).get(column_name) %}
{% if column %}
{% set quoted_name = api.Column.get_name(column) %}
{% set stmt = "alter table " ~ relation ~ " change column " ~ quoted_name ~ " set not null " ~ (constraint.expression or "") ~ ";" %}
{% set stmt = "alter table " ~ relation.render() ~ " change column " ~ quoted_name ~ " set not null " ~ (constraint.expression or "") ~ ";" %}
{% do statements.append(stmt) %}
{% else %}
{{ exceptions.warn('not_null constraint on invalid column: ' ~ column_name) }}
Expand Down Expand Up @@ -170,7 +170,7 @@
{{ exceptions.raise_compiler_error("Constraint of type " ~ type ~ " with no `name` provided, and no md5 utility.") }}
{% endif %}
{% endif %}
{% set stmt = "alter table " ~ relation ~ " add constraint " ~ name ~ " primary key(" ~ joined_names ~ ");" %}
{% set stmt = "alter table " ~ relation.render() ~ " add constraint " ~ name ~ " primary key(" ~ joined_names ~ ");" %}
{% do statements.append(stmt) %}
{% elif type == 'foreign_key' %}

Expand All @@ -191,7 +191,7 @@
{% endif %}
{% endif %}

{% set stmt = "alter table " ~ relation ~ " add constraint " ~ name ~ " foreign key" ~ constraint.get('expression') %}
{% set stmt = "alter table " ~ relation.render() ~ " add constraint " ~ name ~ " foreign key" ~ constraint.get('expression') %}
{% else %}
{% set column_names = constraint.get('columns', []) %}
{% if column and not column_names %}
Expand Down Expand Up @@ -227,7 +227,7 @@
{% endif %}
{% endif %}

{% set stmt = "alter table " ~ relation ~ " add constraint " ~ name ~ " foreign key(" ~ joined_names ~ ") references " ~ parent %}
{% set stmt = "alter table " ~ relation.render() ~ " add constraint " ~ name ~ " foreign key(" ~ joined_names ~ ") references " ~ parent %}
{% set parent_columns = constraint.get('to_columns') %}
{% if parent_columns %}
{% set stmt = stmt ~ "(" ~ parent_columns|join(", ") ~ ")"%}
Expand All @@ -251,7 +251,7 @@
{{ exceptions.raise_compiler_error("Constraint of type " ~ type ~ " with no `name` provided, and no md5 utility.") }}
{% endif %}
{% endif %}
{% set stmt = "alter table " ~ relation ~ " add constraint " ~ name ~ " " ~ expression ~ ";" %}
{% set stmt = "alter table " ~ relation.render() ~ " add constraint " ~ name ~ " " ~ expression ~ ";" %}
{% do statements.append(stmt) %}
{% elif constraint.get('warn_unsupported') %}
{{ exceptions.warn("unsupported constraint type: " ~ constraint.type)}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
{%- set cols = [cols] -%}
{%- endif -%}
{%- call statement('set_cluster_by_columns') -%}
ALTER {{ target_relation.type }} {{ target_relation }} CLUSTER BY ({{ cols | join(', ') }})
ALTER {{ target_relation.type }} {{ target_relation.render() }} CLUSTER BY ({{ cols | join(', ') }})
{%- endcall -%}
{%- endif %}
{%- endmacro -%}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@
{% macro get_alter_mv_internal(relation, configuration_changes) %}
{%- set refresh = configuration_changes.changes["refresh"] -%}
-- Currently only schedule can be altered
ALTER MATERIALIZED VIEW {{ relation }}
ALTER MATERIALIZED VIEW {{ relation.render() }}
{{ get_alter_sql_refresh_schedule(refresh.cron, refresh.time_zone_value, refresh.is_altered) -}}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{% macro databricks__drop_materialized_view(relation) -%}
drop materialized view if exists {{ relation }}
drop materialized view if exists {{ relation.render() }}
{%- endmacro %}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{% macro databricks__refresh_materialized_view(relation) -%}
refresh materialized view {{ relation }}
refresh materialized view {{ relation.render() }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
{%- endmacro %}

{% macro default__drop_streaming_table(relation) -%}
drop table if exists {{ relation }}
drop table if exists {{ relation.render() }}
{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{%- endmacro %}

{% macro databricks__refresh_streaming_table(relation, sql) -%}
create or refresh streaming table {{ relation }}
create or refresh streaming table {{ relation.render() }}
as
{{ sql }}
{% endmacro %}
4 changes: 2 additions & 2 deletions dbt/include/databricks/macros/relations/table/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
{%- else -%}
{%- set file_format = config.get('file_format', default='delta') -%}
{% if file_format == 'delta' %}
create or replace table {{ relation }}
create or replace table {{ relation.render() }}
{% else %}
create table {{ relation }}
create table {{ relation.render() }}
{% endif %}
{%- set contract_config = config.get('contract') -%}
{% if contract_config and contract_config.enforced %}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/databricks/macros/relations/table/drop.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{% macro databricks__drop_table(relation) -%}
drop table if exists {{ relation }}
drop table if exists {{ relation.render() }}
{%- endmacro %}
4 changes: 2 additions & 2 deletions dbt/include/databricks/macros/relations/tags.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@
{%- endmacro -%}

{% macro alter_set_tags(relation, tags) -%}
ALTER {{ relation.type }} {{ relation }} SET TAGS (
ALTER {{ relation.type }} {{ relation.render() }} SET TAGS (
{% for tag in tags -%}
'{{ tag }}' = '{{ tags[tag] }}' {%- if not loop.last %}, {% endif -%}
{%- endfor %}
)
{%- endmacro -%}

{% macro alter_unset_tags(relation, tags) -%}
ALTER {{ relation.type }} {{ relation }} UNSET TAGS (
ALTER {{ relation.type }} {{ relation.render() }} UNSET TAGS (
{% for tag in tags -%}
'{{ tag }}' {%- if not loop.last %}, {%- endif %}
{%- endfor %}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/databricks/macros/relations/tblproperties.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
{% set tblproperty_statment = databricks__tblproperties_clause(tblproperties) %}
{% if tblproperty_statment %}
{%- call statement('apply_tblproperties') -%}
ALTER {{ relation.type }} {{ relation }} SET {{ tblproperty_statment}}
ALTER {{ relation.type }} {{ relation.render() }} SET {{ tblproperty_statment}}
{%- endcall -%}
{% endif %}
{%- endmacro -%}
2 changes: 1 addition & 1 deletion dbt/include/databricks/macros/relations/view/create.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% macro databricks__create_view_as(relation, sql) -%}
create or replace view {{ relation }}
create or replace view {{ relation.render() }}
{% if config.persist_column_docs() -%}
{% set model_columns = model.columns %}
{% set query_columns = get_columns_in_query(sql) %}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/databricks/macros/relations/view/drop.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{% macro databricks__drop_view(relation) -%}
drop view if exists {{ relation }}
drop view if exists {{ relation.render() }}
{%- endmacro %}
Loading

0 comments on commit a97b244

Please sign in to comment.