Skip to content

Commit

Permalink
Change table materialization logic when on_table_exists = 'rename'
Browse files Browse the repository at this point in the history
  • Loading branch information
hovaesco committed Jul 29, 2024
1 parent 98d1fb5 commit 0ea5860
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 15 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20240729-101114.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Change table materialization logic when on_table_exists = 'rename'
time: 2024-07-29T10:11:14.451171+02:00
custom:
Author: hovaesco
Issue: "423"
PR: "425"
28 changes: 18 additions & 10 deletions dbt/include/trino/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,28 @@
{% macro on_table_exists_logic(on_table_exists, existing_relation, intermediate_relation, backup_relation, target_relation) -%}
{#-- Create table with given `on_table_exists` mode #}
{% if on_table_exists == 'rename' %}
{#-- build modeldock #}
{% call statement('main') -%}
{{ create_table_as(False, intermediate_relation, sql) }}
{%- endcall %}

{#-- cleanup #}
{% if existing_relation is not none %}
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{#-- table does not exists #}
{% if existing_relation is none %}
{% call statement('main') -%}
{{ create_table_as(False, target_relation, sql) }}
{%- endcall %}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{#-- table does exists #}
{% if existing_relation is not none %}
{#-- build modeldock #}
{% call statement('main') -%}
{{ create_table_as(False, intermediate_relation, sql) }}
{%- endcall %}

{#-- cleanup #}
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{{ adapter.rename_relation(intermediate_relation, target_relation) }}

{#-- finally, drop the existing/backup relation after the commit #}
{{ drop_relation_if_exists(backup_relation) }}
{#-- finally, drop the existing/backup relation after the commit #}
{{ drop_relation_if_exists(backup_relation) }}
{% endif %}

{% elif on_table_exists == 'drop' %}
{#-- cleanup #}
Expand Down
98 changes: 93 additions & 5 deletions tests/functional/adapter/materialization/test_on_table_exists.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,99 @@ def models(self):
}


class TestOnTableExistsRename(BaseOnTableExists):
"""
Testing on_table_exists = `rename` configuration for table materialization,
using dbt seed, run and tests commands and validate data load correctness.
"""

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"name": "table_rename",
"models": {"+materialized": "table", "+on_table_exists": "rename"},
"seeds": {
"+column_types": {"some_date": "timestamp(6)"},
},
}

# The actual sequence of dbt commands and assertions
# pytest will take care of all "setup" + "teardown"
def test_run_seed_test(self, project):
# seed seeds
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1
# run models two times to check on_table_exists = 'rename'
results = run_dbt(["run"], expect_pass=True)
assert len(results) == 1
results = run_dbt(["run"], expect_pass=True)
assert len(results) == 1
# test tests
results = run_dbt(["test"], expect_pass=True)
assert len(results) == 3

# check if the data was loaded correctly
check_relations_equal(project.adapter, ["seed", "materialization"])


@pytest.mark.iceberg
class TestOnTableExistsRenameIceberg(TestOnTableExistsRename):
pass


@pytest.mark.delta
class TestOnTableExistsRenameDelta(TestOnTableExistsRename):
pass


class TestOnTableExistsRenameIncrementalFullRefresh(BaseOnTableExists):
"""
Testing on_table_exists = `rename` configuration for incremental materialization and full refresh flag,
using dbt seed, run and tests commands and validate data load correctness.
"""

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"name": "table_rename",
"models": {"+materialized": "incremental", "+on_table_exists": "rename"},
"seeds": {
"+column_types": {"some_date": "timestamp(6)"},
},
}

# The actual sequence of dbt commands and assertions
# pytest will take care of all "setup" + "teardown"
def test_run_seed_test(self, project):
# seed seeds
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1
# run models two times to check on_table_exists = 'rename'
results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True)
assert len(results) == 1
assert (
f'drop table if exists "{project.database}"."{project.test_schema}"."materialization"'
not in logs
)
assert "dbt_tmp" or "dbt_backup" not in logs
results, logs = run_dbt_and_capture(["--debug", "run", "--full-refresh"], expect_pass=True)
assert len(results) == 1
assert "dbt_tmp" or "dbt_backup" in logs
# test tests
results = run_dbt(["test"], expect_pass=True)
assert len(results) == 3

# check if the data was loaded correctly
check_relations_equal(project.adapter, ["seed", "materialization"])


@pytest.mark.iceberg
class TestOnTableExistsRenameIcebergIncrementalFullRefresh(
TestOnTableExistsRenameIncrementalFullRefresh
):
pass


class TestOnTableExistsDrop(BaseOnTableExists):
"""
Testing on_table_exists = `drop` configuration for table materialization,
Expand Down Expand Up @@ -145,11 +238,6 @@ class TestOnTableExistsReplaceIceberg(BaseOnTableExistsReplace):
pass


@pytest.mark.delta
class TestOnTableExistsReplaceDelta(BaseOnTableExistsReplace):
pass


class BaseOnTableExistsReplaceIncrementalFullRefresh(BaseOnTableExists):
"""
Testing on_table_exists = `replace` configuration for incremental materialization and full refresh flag,
Expand Down

0 comments on commit 0ea5860

Please sign in to comment.