From dc7338ba738987c3200f58fcdc9d2355d499dedc Mon Sep 17 00:00:00 2001
From: Damian Owsianny
Date: Tue, 26 Nov 2024 17:38:29 +0100
Subject: [PATCH] Add microbatch incremental strategy

---
 .../unreleased/Features-20241202-132727.yaml  |  7 +++++
 dbt/adapters/trino/impl.py                    |  2 +-
 dbt/adapters/trino/relation.py                | 17 +++++++++-
 .../macros/materializations/incremental.sql   | 31 +++++++++++++++++++
 .../test_incremental_microbatch.py            |  7 +++++
 5 files changed, 62 insertions(+), 2 deletions(-)
 create mode 100644 .changes/unreleased/Features-20241202-132727.yaml
 create mode 100644 tests/functional/adapter/materialization/test_incremental_microbatch.py

diff --git a/.changes/unreleased/Features-20241202-132727.yaml b/.changes/unreleased/Features-20241202-132727.yaml
new file mode 100644
index 00000000..f6edf9f0
--- /dev/null
+++ b/.changes/unreleased/Features-20241202-132727.yaml
@@ -0,0 +1,7 @@
+kind: Features
+body: Microbatch incremental strategy
+time: 2024-12-02T13:27:27.845398+01:00
+custom:
+  Author: damian3031
+  Issue: ""
+  PR: "453"
diff --git a/dbt/adapters/trino/impl.py b/dbt/adapters/trino/impl.py
index 858947f9..05678b1f 100644
--- a/dbt/adapters/trino/impl.py
+++ b/dbt/adapters/trino/impl.py
@@ -102,4 +102,4 @@ def get_columns_in_relation(self, relation):
                 raise
 
     def valid_incremental_strategies(self):
-        return ["append", "merge", "delete+insert"]
+        return ["append", "merge", "delete+insert", "microbatch"]
diff --git a/dbt/adapters/trino/relation.py b/dbt/adapters/trino/relation.py
index 0b9a83ee..dcc7da95 100644
--- a/dbt/adapters/trino/relation.py
+++ b/dbt/adapters/trino/relation.py
@@ -1,6 +1,6 @@
 from dataclasses import dataclass, field
 
-from dbt.adapters.base.relation import BaseRelation, Policy
+from dbt.adapters.base.relation import BaseRelation, EventTimeFilter, Policy
 from dbt.adapters.contracts.relation import ComponentName
 
 
@@ -12,3 +12,18 @@ class TrinoRelation(BaseRelation):
     # Overridden as Trino converts relation identifiers to lowercase
     def _is_exactish_match(self, field: ComponentName, value: str) -> bool:
         return self.path.get_lowered_part(field) == value.lower()
+
+    # Overridden because Trino cannot compare a TIMESTAMP column with a VARCHAR literal.
+    def _render_event_time_filtered(self, event_time_filter: EventTimeFilter) -> str:
+        """
+        Render the bounds as TIMESTAMP literals; returns "" if start and end are both None
+        """
+        clause = ""
+        if event_time_filter.start and event_time_filter.end:
+            clause = f"{event_time_filter.field_name} >= TIMESTAMP '{event_time_filter.start}' and {event_time_filter.field_name} < TIMESTAMP '{event_time_filter.end}'"
+        elif event_time_filter.start:
+            clause = f"{event_time_filter.field_name} >= TIMESTAMP '{event_time_filter.start}'"
+        elif event_time_filter.end:
+            clause = f"{event_time_filter.field_name} < TIMESTAMP '{event_time_filter.end}'"
+
+        return clause
diff --git a/dbt/include/trino/macros/materializations/incremental.sql b/dbt/include/trino/macros/materializations/incremental.sql
index 712d458d..a30b03c8 100644
--- a/dbt/include/trino/macros/materializations/incremental.sql
+++ b/dbt/include/trino/macros/materializations/incremental.sql
@@ -213,3 +213,34 @@
         )
     {% endif %}
 {% endmacro %}
+
+
+{% macro trino__get_incremental_microbatch_sql(arg_dict) %}
+    {%- set target = arg_dict["target_relation"] -%}
+    {%- set source = arg_dict["temp_relation"] -%}
+    {%- set dest_columns = arg_dict["dest_columns"] -%}
+    {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}
+
+    {#-- Add additional incremental_predicates to filter for batch --#}
+    {% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
+        {% do incremental_predicates.append(model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
+    {% endif %}
+    {% if model.config.get("__dbt_internal_microbatch_event_time_end") -%}
+        {% do incremental_predicates.append(model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
+    {% endif %}
+    {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}
+
+    delete from {{ target }}
+    where (
+    {% for predicate in incremental_predicates %}
+        {%- if not loop.first %}and {% endif -%} {{ predicate }}
+    {% endfor %}
+    );
+
+    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+    insert into {{ target }} ({{ dest_cols_csv }})
+    (
+        select {{ dest_cols_csv }}
+        from {{ source }}
+    )
+{% endmacro %}
diff --git a/tests/functional/adapter/materialization/test_incremental_microbatch.py b/tests/functional/adapter/materialization/test_incremental_microbatch.py
new file mode 100644
index 00000000..ee2708d4
--- /dev/null
+++ b/tests/functional/adapter/materialization/test_incremental_microbatch.py
@@ -0,0 +1,7 @@
+import pytest
+from dbt.tests.adapter.incremental.test_incremental_microbatch import BaseMicrobatch
+
+
+@pytest.mark.iceberg
+class TestTrinoMicrobatchIceberg(BaseMicrobatch):
+    pass