From 1500222b70f62d930b869fe6c5068fb3fa5db0ec Mon Sep 17 00:00:00 2001 From: Doug Beatty Date: Thu, 16 Jan 2025 13:29:36 -0700 Subject: [PATCH 1/7] Functional test for hourly microbatch model --- tests/functional/microbatch/test_microbatch.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index fdbd0a219a5..81c8a6dfcb1 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -79,6 +79,11 @@ select * from {{ ref('input_model') }} """ +microbatch_hourly_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='hour', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +select * from {{ ref('input_model') }} +""" + microbatch_yearly_model_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} select * from {{ ref('input_model') }} @@ -886,6 +891,19 @@ def test_run_with_event_time(self, project): self.assert_row_count(project, "microbatch_model", 3) +class TestMicrbobatchModelsHourly(BaseMicrobatchTest): + + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_hourly_model_sql, + } + + def test_microbatch(self, project) -> None: + run_dbt(["run"]) + + class TestMicrbobatchModelsRunWithSameCurrentTime(BaseMicrobatchTest): @pytest.fixture(scope="class") From 5f2c032ca4614fded589fb719976e8f56b2463e8 Mon Sep 17 00:00:00 2001 From: Doug Beatty Date: Thu, 16 Jan 2025 14:12:05 -0700 Subject: [PATCH 2/7] Use today's date for functional test for hourly microbatch model --- tests/functional/microbatch/test_microbatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 81c8a6dfcb1..9cec18bbd7e 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -80,7 +80,7 @@ """ microbatch_hourly_model_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='hour', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='hour', begin=datetime.combine(datetime.today(), datetime.min.time())) }} select * from {{ ref('input_model') }} """ From 3804e04cb1450a2c4dd7d24f3decf55ba105aed1 Mon Sep 17 00:00:00 2001 From: Doug Beatty Date: Thu, 16 Jan 2025 16:11:26 -0700 Subject: [PATCH 3/7] Use today's date for functional test for hourly microbatch model --- tests/functional/microbatch/test_microbatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 9cec18bbd7e..c369a18fafc 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -80,7 +80,7 @@ """ microbatch_hourly_model_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='hour', begin=datetime.combine(datetime.today(), datetime.min.time())) }} +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='hour', begin=modules.datetime.combine(modules.datetime.today(), modules.datetime.min.time())) }} select * from {{ ref('input_model') }} """ From a669c1d406d044c5a5da050f1facda1991308fbb Mon Sep 17 00:00:00 2001 From: Doug Beatty Date: Thu, 16 Jan 2025 17:10:10 -0700 Subject: [PATCH 4/7] Restore to original --- tests/functional/microbatch/test_microbatch.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index c369a18fafc..fdbd0a219a5 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -79,11 +79,6 @@ select * from {{ ref('input_model') }} """ -microbatch_hourly_model_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='hour', begin=modules.datetime.combine(modules.datetime.today(), modules.datetime.min.time())) }} -select * from {{ ref('input_model') }} -""" - microbatch_yearly_model_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} select * from {{ ref('input_model') }} @@ -891,19 +886,6 @@ def test_run_with_event_time(self, project): self.assert_row_count(project, "microbatch_model", 3) -class TestMicrbobatchModelsHourly(BaseMicrobatchTest): - - @pytest.fixture(scope="class") - def models(self): - return { - "input_model.sql": input_model_sql, - "microbatch_model.sql": microbatch_hourly_model_sql, - } - - def test_microbatch(self, project) -> None: - run_dbt(["run"]) - - class TestMicrbobatchModelsRunWithSameCurrentTime(BaseMicrobatchTest): @pytest.fixture(scope="class") From 85e0e32f401162995b38a0770198b0c8213794c0 Mon Sep 17 00:00:00 2001 From: Doug Beatty Date: Thu, 16 Jan 2025 17:47:08 -0700 Subject: [PATCH 5/7] Only use alphanumeric characters within batch ids --- .../materializations/incremental/microbatch.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index 6de6945704c..370082461e6 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -194,13 +194,21 @@ def truncate_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime: @staticmethod def batch_id(start_time: datetime, batch_size: BatchSize) -> str: - return MicrobatchBuilder.format_batch_start(start_time, batch_size).replace("-", "") + return MicrobatchBuilder.format_batch_start(start_time, batch_size) @staticmethod def format_batch_start(batch_start: datetime, batch_size: BatchSize) -> str: - return str( - batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start - ) + """Format the passed in datetime based on the batch_size. + + 2024-09-17 16:06:00 + Batchsize.day -> 20240917 + 2024-09-17 16:06:00 + Batchsize.hour -> 20240917T16 + """ + # If we want a date only + if batch_size != BatchSize.hour: + return batch_start.strftime("%Y%m%d") + + # If we want date + time + return batch_start.strftime("%Y%m%dT%H") @staticmethod def ceiling_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime: From fe73096cddab20d67d0a61f57eb70ad21ab3af08 Mon Sep 17 00:00:00 2001 From: Doug Beatty Date: Thu, 16 Jan 2025 18:19:37 -0700 Subject: [PATCH 6/7] Add tests for batch_id and change expected output for format_batch_start --- .../incremental/test_microbatch.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/tests/unit/materializations/incremental/test_microbatch.py b/tests/unit/materializations/incremental/test_microbatch.py index 3d827a79975..06519d4994c 100644 --- a/tests/unit/materializations/incremental/test_microbatch.py +++ b/tests/unit/materializations/incremental/test_microbatch.py @@ -602,13 +602,25 @@ def test_offset_timestamp(self, timestamp, batch_size, offset, expected_timestam def test_truncate_timestamp(self, timestamp, batch_size, expected_timestamp): assert MicrobatchBuilder.truncate_timestamp(timestamp, batch_size) == expected_timestamp + @pytest.mark.parametrize( + "batch_size,start_time,expected_formatted_start_time", + [ + (BatchSize.year, datetime(2020, 1, 1, 1), "20200101"), + (BatchSize.month, datetime(2020, 1, 1, 1), "20200101"), + (BatchSize.day, datetime(2020, 1, 1, 1), "20200101"), + (BatchSize.hour, datetime(2020, 1, 1, 1), "20200101T01"), + ], + ) + def test_batch_id(self, batch_size, start_time, expected_formatted_start_time): + assert MicrobatchBuilder.batch_id(start_time, batch_size) == expected_formatted_start_time + @pytest.mark.parametrize( "batch_size,batch_start,expected_formatted_batch_start", [ - (BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"), - (BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"), - (BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"), - (BatchSize.hour, datetime(2020, 1, 1, 1), "2020-01-01 01:00:00"), + (BatchSize.year, datetime(2020, 1, 1, 1), "20200101"), + (BatchSize.month, datetime(2020, 1, 1, 1), "20200101"), + (BatchSize.day, datetime(2020, 1, 1, 1), "20200101"), + (BatchSize.hour, datetime(2020, 1, 1, 1), "20200101T01"), ], ) def test_format_batch_start(self, batch_size, batch_start, expected_formatted_batch_start): From 65a1db0048211c47f9edce99c2aae727d04122d3 Mon Sep 17 00:00:00 2001 From: Doug Beatty Date: Thu, 16 Jan 2025 18:38:32 -0700 Subject: [PATCH 7/7] Handle missing batch_start --- tests/unit/materializations/incremental/test_microbatch.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/materializations/incremental/test_microbatch.py b/tests/unit/materializations/incremental/test_microbatch.py index 06519d4994c..ed5415c0f0b 100644 --- a/tests/unit/materializations/incremental/test_microbatch.py +++ b/tests/unit/materializations/incremental/test_microbatch.py @@ -605,6 +605,7 @@ def test_truncate_timestamp(self, timestamp, batch_size, expected_timestamp): @pytest.mark.parametrize( "batch_size,start_time,expected_formatted_start_time", [ + (BatchSize.year, None, ""), (BatchSize.year, datetime(2020, 1, 1, 1), "20200101"), (BatchSize.month, datetime(2020, 1, 1, 1), "20200101"), (BatchSize.day, datetime(2020, 1, 1, 1), "20200101"), @@ -617,6 +618,7 @@ def test_batch_id(self, batch_size, start_time, expected_formatted_start_time): @pytest.mark.parametrize( "batch_size,batch_start,expected_formatted_batch_start", [ + (BatchSize.year, None, ""), (BatchSize.year, datetime(2020, 1, 1, 1), "20200101"), (BatchSize.month, datetime(2020, 1, 1, 1), "20200101"), (BatchSize.day, datetime(2020, 1, 1, 1), "20200101"),