Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add replace mode to incremental materialization #406

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changes/unreleased/Features-20240430-120758.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
kind: Features
body: on_table_exists modes (rename, drop, replace) determine how table is recreated
during a full-refresh run of an incremental model
time: 2024-04-30T12:07:58.484083+02:00
custom:
Author: damian3031
Issue: "395"
PR: "406"
52 changes: 31 additions & 21 deletions dbt/include/trino/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,42 @@

{% materialization incremental, adapter='trino', supported_languages=['sql'] -%}

{#-- Set vars --#}
{#-- relations --#}
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') -%}
{#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#}
{%- set tmp_relation_type = get_incremental_tmp_relation_type(incremental_strategy, unique_key, language) -%}
{%- set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) -%}
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

{#-- configs --#}
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}
{%- set language = model['language'] -%}
{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{%- set on_table_exists = config.get('on_table_exists', 'rename') -%}
{% if on_table_exists not in ['rename', 'drop', 'replace'] %}
{%- set log_message = 'Invalid value for on_table_exists (%s) specified. Setting default value (%s).' % (on_table_exists, 'rename') -%}
{% do log(log_message) %}
{%- set on_table_exists = 'rename' -%}
{% endif %}

{#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#}
{%- set unique_key = config.get('unique_key') -%}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set tmp_relation_type = get_incremental_tmp_relation_type(incremental_strategy, unique_key, language) %}
{% set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) %}
-- the temp_ relation should not already exist in the database; get_relation
{#-- the temp_ and backup_ relation should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
-- later, before we try to use this name for the current operation.
-- later, before we try to use this name for the current operation.#}
{%- set preexisting_tmp_relation = load_cached_relation(tmp_relation)-%}
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}

{#--- grab current tables grants config for comparision later on#}
{% set grant_config = config.get('grants') %}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

-- drop the temp relation if it exists already in the database
-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(preexisting_tmp_relation) }}
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}

{{ run_hooks(pre_hooks) }}

Expand All @@ -58,11 +72,8 @@
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}
{% elif full_refresh_mode %}
{#-- Can't replace a table - we must drop --#}
{% do adapter.drop_relation(existing_relation) %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}
{#-- Create table with given `on_table_exists` mode #}
{% do on_table_exists_logic(on_table_exists, existing_relation, intermediate_relation, backup_relation, target_relation) %}

{% else %}
{#-- Create the temp relation, either as a view or as a temp table --#}
Expand All @@ -86,6 +97,7 @@
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
Expand All @@ -94,9 +106,7 @@
{{ strategy_sql_macro_func(strategy_arg_dict) }}
{%- endcall -%}
{% endif %}

{% do drop_relation_if_exists(tmp_relation) %}

{% do drop_relation_if_exists(tmp_relation) %}
{{ run_hooks(post_hooks) }}

{% set should_revoke =
Expand Down
27 changes: 17 additions & 10 deletions dbt/include/trino/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@
-- grab current tables grants config for comparision later on
{% set grant_config = config.get('grants') %}

{#-- Create table with given `on_table_exists` mode #}
{% do on_table_exists_logic(on_table_exists, existing_relation, intermediate_relation, backup_relation, target_relation) %}

{% do persist_docs(target_relation, model) %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}


{% macro on_table_exists_logic(on_table_exists, existing_relation, intermediate_relation, backup_relation, target_relation) -%}
{#-- Create table with given `on_table_exists` mode #}
{% if on_table_exists == 'rename' %}
{#-- build modeldock #}
{% call statement('main') -%}
Expand Down Expand Up @@ -64,13 +80,4 @@
{{ create_table_as(False, target_relation, sql, True) }}
{%- endcall %}
{% endif %}

{% do persist_docs(target_relation, model) %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
{% endmacro %}
94 changes: 94 additions & 0 deletions tests/functional/adapter/materialization/test_on_table_exists.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,49 @@ def test_run_seed_test(self, project):
check_relations_equal(project.adapter, ["seed", "materialization"])


class TestOnTableExistsDropIncrementalFullRefresh(BaseOnTableExists):
"""
Testing on_table_exists = `drop` configuration for incremental materialization and full refresh flag,
using dbt seed, run and tests commands and validate data load correctness.
"""

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"name": "table_drop",
"models": {"+materialized": "incremental", "+on_table_exists": "drop"},
"seeds": {
"+column_types": {"some_date": "timestamp(6)"},
},
}

# The actual sequence of dbt commands and assertions
# pytest will take care of all "setup" + "teardown"
def test_run_seed_test(self, project):
# seed seeds
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1
# run models two times to check on_table_exists = 'drop'
results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True)
assert len(results) == 1
assert (
f'drop table if exists "{project.database}"."{project.test_schema}"."materialization"'
not in logs
)
results, logs = run_dbt_and_capture(["--debug", "run", "--full-refresh"], expect_pass=True)
assert len(results) == 1
assert (
f'drop table if exists "{project.database}"."{project.test_schema}"."materialization"'
in logs
)
# test tests
results = run_dbt(["test"], expect_pass=True)
assert len(results) == 3

# check if the data was loaded correctly
check_relations_equal(project.adapter, ["seed", "materialization"])


class BaseOnTableExistsReplace(BaseOnTableExists):
"""
Testing on_table_exists = `replace` configuration for table materialization,
Expand Down Expand Up @@ -105,3 +148,54 @@ class TestOnTableExistsReplaceIceberg(BaseOnTableExistsReplace):
@pytest.mark.delta
class TestOnTableExistsReplaceDelta(BaseOnTableExistsReplace):
pass


class BaseOnTableExistsReplaceIncrementalFullRefresh(BaseOnTableExists):
"""
Testing on_table_exists = `replace` configuration for incremental materialization and full refresh flag,
using dbt seed, run and tests commands and validate data load correctness.
"""

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"name": "table_drop",
"models": {"+materialized": "incremental", "+on_table_exists": "replace"},
"seeds": {
"+column_types": {"some_date": "timestamp(6)"},
},
}

# The actual sequence of dbt commands and assertions
# pytest will take care of all "setup" + "teardown"
def test_run_seed_test(self, project):
# seed seeds
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1
# run models two times to check on_table_exists = 'replace'
results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True)
assert len(results) == 1
assert "create or replace table" not in logs
results, logs = run_dbt_and_capture(["--debug", "run", "--full-refresh"], expect_pass=True)
assert len(results) == 1
assert "create or replace table" in logs
# test tests
results = run_dbt(["test"], expect_pass=True)
assert len(results) == 3

# check if the data was loaded correctly
check_relations_equal(project.adapter, ["seed", "materialization"])


@pytest.mark.iceberg
class TestOnTableExistsReplaceIcebergIncrementalFullRefresh(
BaseOnTableExistsReplaceIncrementalFullRefresh
):
pass


@pytest.mark.delta
class TestOnTableExistsReplaceDeltaIncrementalFullRefresh(
BaseOnTableExistsReplaceIncrementalFullRefresh
):
pass
Loading