Skip to content

Commit

Permalink
Support CREATE OR REPLACE in table materialization
Browse files Browse the repository at this point in the history
  • Loading branch information
damian3031 committed Dec 20, 2023
1 parent 9d0508e commit 6f786fa
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 20 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20231129-195500.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: Support CREATE OR REPLACE in table materialization
time: 2023-11-29T19:55:00.460963+01:00
custom:
Author: damian3031
Issue: "371"
PR: "377"
12 changes: 9 additions & 3 deletions dbt/include/trino/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,19 @@
{%- endif -%}
{%- endmacro -%}
{% macro trino__create_table_as(temporary, relation, sql) -%}
{% macro trino__create_table_as(temporary, relation, sql, replace=False) -%}
{%- set _properties = config.get('properties') -%}
{%- if replace -%}
{%- set or_replace = ' or replace' -%}
{%- else -%}
{%- set or_replace = '' -%}
{%- endif -%}
{%- set contract_config = config.get('contract') -%}
{%- if contract_config.enforced -%}
create table
create{{ or_replace }} table
{{ relation }}
{{ get_table_columns_and_constraints() }}
{{ get_assert_columns_equivalent(sql) }}
Expand All @@ -122,7 +128,7 @@
{%- else %}
create table {{ relation }}
create{{ or_replace }} table {{ relation }}
{{ comment(model.get('description')) }}
{{ properties(_properties) }}
as (
Expand Down
8 changes: 7 additions & 1 deletion dbt/include/trino/macros/materializations/table.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{% materialization table, adapter = 'trino' %}
{%- set on_table_exists = config.get('on_table_exists', 'rename') -%}
{% if on_table_exists not in ['rename', 'drop'] %}
{% if on_table_exists not in ['rename', 'drop', 'replace'] %}
{%- set log_message = 'Invalid value for on_table_exists (%s) specified. Setting default value (%s).' % (on_table_exists, 'rename') -%}
{% do log(log_message) %}
{%- set on_table_exists = 'rename' -%}
Expand Down Expand Up @@ -57,6 +57,12 @@
{% call statement('main') -%}
{{ create_table_as(False, target_relation, sql) }}
{%- endcall %}

{% elif on_table_exists == 'replace' %}
{#-- build model #}
{% call statement('main') -%}
{{ create_table_as(False, target_relation, sql, True) }}
{%- endcall %}
{% endif %}

{% do persist_docs(target_relation, model) %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from dbt.tests.util import check_relations_equal, run_dbt
from dbt.tests.util import check_relations_equal, run_dbt, run_dbt_and_capture

from tests.functional.adapter.materialization.fixtures import (
model_sql,
Expand All @@ -8,13 +8,29 @@
)


class TestTableDrop:
class BaseOnTableExists:
# everything that goes in the "seeds" directory
@pytest.fixture(scope="class")
def seeds(self):
return {
"seed.csv": seed_csv,
}

# everything that goes in the "models" directory
@pytest.fixture(scope="class")
def models(self):
return {
"materialization.sql": model_sql,
"materialization.yml": profile_yml,
}


class TestOnTableExistsDrop(BaseOnTableExists):
"""
Testing on_table_exists = `drop` configuration for table materialization,
using dbt seed, run and tests commands and validate data load correctness.
"""

# configuration in dbt_project.yml
@pytest.fixture(scope="class")
def project_config_update(self):
return {
Expand All @@ -25,19 +41,42 @@ def project_config_update(self):
},
}

# everything that goes in the "seeds" directory
@pytest.fixture(scope="class")
def seeds(self):
return {
"seed.csv": seed_csv,
}
# The actual sequence of dbt commands and assertions
# pytest will take care of all "setup" + "teardown"
def test_run_seed_test(self, project):
# seed seeds
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1
# run models two times to check on_table_exists = 'drop'
results = run_dbt(["run"], expect_pass=True)
assert len(results) == 1
results = run_dbt(["run"], expect_pass=True)
assert len(results) == 1
# test tests
results = run_dbt(["test"], expect_pass=True)
assert len(results) == 3

# check if the data was loaded correctly
check_relations_equal(project.adapter, ["seed", "materialization"])


# TODO: Enable for SEP, after support for CORTAS will be added
@pytest.mark.skip_engine("starburst_enterprise")
@pytest.mark.iceberg
class TestOnTableExistsReplace(BaseOnTableExists):
"""
Testing on_table_exists = `replace` configuration for table materialization,
using dbt seed, run and tests commands and validate data load correctness.
"""

# everything that goes in the "models" directory
@pytest.fixture(scope="class")
def models(self):
def project_config_update(self):
return {
"materialization.sql": model_sql,
"materialization.yml": profile_yml,
"name": "table_drop",
"models": {"+materialized": "table", "+on_table_exists": "replace"},
"seeds": {
"+column_types": {"some_date": "timestamp(6)"},
},
}

# The actual sequence of dbt commands and assertions
Expand All @@ -46,11 +85,13 @@ def test_run_seed_test(self, project):
# seed seeds
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1
# run models two times to check on_table_exists = 'drop'
results = run_dbt(["run"], expect_pass=True)
# run models two times to check on_table_exists = 'replace'
results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True)
assert len(results) == 1
results = run_dbt(["run"], expect_pass=True)
assert "create or replace table" in logs
results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True)
assert len(results) == 1
assert "create or replace table" in logs
# test tests
results = run_dbt(["test"], expect_pass=True)
assert len(results) == 3
Expand Down

0 comments on commit 6f786fa

Please sign in to comment.