Skip to content

Commit

Permalink
Create an offset_timestamp separate from MicrobatchBuilder
Browse files Browse the repository at this point in the history
The `MicrobatchBuilder.offset_timestamp` _truncates_ the timestamp before
offsetting it. We don't want to do that, we want to offset the "raw" timestamp.
We could have renamed the microbatch builder function to
`truncate_and_offset_timestamp` and split the offset logic out into a separate
abstract function. However, the offset logic in the MicrobatchBuilder context
depends on the truncation. We might later on be able to refactor the Microbatch
provided function to truncate _after_ offsetting instead of before.
But that is out of scope for this initial work, and we should instead revisit it
later.
  • Loading branch information
QMalcolm committed Jan 28, 2025
1 parent a0faf1e commit b04ac97
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 9 deletions.
6 changes: 2 additions & 4 deletions core/dbt/cli/option_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

from dbt.artifacts.resources.types import BatchSize
from dbt.config.utils import normalize_warn_error_options, parse_cli_yaml_string
from dbt.event_time.event_time import offset_timestamp
from dbt.event_time.sample_window import SampleWindow
from dbt.events import ALL_EVENT_NAMES
from dbt.exceptions import OptionNotYamlDictError, ValidationError
from dbt.materializations.incremental.microbatch import MicrobatchBuilder
from dbt_common.exceptions import DbtValidationError
from dbt_common.helper_types import WarnErrorOptions

Expand Down Expand Up @@ -134,9 +134,7 @@ def convert(self, value, param, ctx):
ctx,
)

start = MicrobatchBuilder.offset_timestamp(
timestamp=end, batch_size=batch_size, offset=-1 * lookback
)
start = offset_timestamp(timestamp=end, batch_size=batch_size, offset=-1 * lookback)

return SampleWindow(start=start, end=end)
else:
Expand Down
40 changes: 40 additions & 0 deletions core/dbt/event_time/event_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from datetime import datetime

from dateutil.relativedelta import relativedelta

from dbt.artifacts.resources.types import BatchSize
from dbt_common.exceptions import DbtRuntimeError


def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:
    """Offset ``timestamp`` by ``offset`` units of ``batch_size``, without truncating it.

    Note: THIS IS DIFFERENT FROM MicrobatchBuilder.offset_timestamp. That function first
    `truncates` the timestamp, and then does delta addition/subtraction from there. This
    function _doesn't_ truncate the timestamp and uses `relativedelta` for specific edge
    case handling (months, years), which may produce different results than the delta math
    done in `MicrobatchBuilder.offset_timestamp`.

    Args:
        timestamp: The raw timestamp to shift.
        batch_size: The unit to shift by (hour, day, month, or year).
        offset: How many units to shift; negative values move backwards in time.

    Returns:
        The shifted timestamp.

    Raises:
        DbtRuntimeError: If ``batch_size`` is not one of the handled BatchSize values.

    Examples
    2024-09-17 16:06:00 + BatchSize.hour -1 -> 2024-09-17 15:06:00
    2024-09-17 16:06:00 + BatchSize.hour +1 -> 2024-09-17 17:06:00
    2024-09-17 16:06:00 + BatchSize.day -1 -> 2024-09-16 16:06:00
    2024-09-17 16:06:00 + BatchSize.day +1 -> 2024-09-18 16:06:00
    2024-09-17 16:06:00 + BatchSize.month -1 -> 2024-08-17 16:06:00
    2024-09-17 16:06:00 + BatchSize.month +1 -> 2024-10-17 16:06:00
    2024-09-17 16:06:00 + BatchSize.year -1 -> 2023-09-17 16:06:00
    2024-09-17 16:06:00 + BatchSize.year +1 -> 2025-09-17 16:06:00
    2024-01-31 16:06:00 + BatchSize.month +1 -> 2024-02-29 16:06:00
    2024-02-29 16:06:00 + BatchSize.year +1 -> 2025-02-28 16:06:00
    """
    # The parameters were previously declared as `name=Type`, which made the type
    # objects *default values* rather than annotations. They are real annotations
    # now, so a forgotten argument fails loudly at the call site.
    if batch_size == BatchSize.hour:
        return timestamp + relativedelta(hours=offset)
    elif batch_size == BatchSize.day:
        return timestamp + relativedelta(days=offset)
    elif batch_size == BatchSize.month:
        # relativedelta clamps to the last valid day of the target month
        # (e.g. Jan 31 + 1 month -> Feb 29 in a leap year).
        return timestamp + relativedelta(months=offset)
    elif batch_size == BatchSize.year:
        # Likewise, Feb 29 + 1 year lands on Feb 28 of the following year.
        return timestamp + relativedelta(years=offset)
    else:
        raise DbtRuntimeError(f"Unhandled batch_size '{batch_size}'")
21 changes: 16 additions & 5 deletions tests/functional/sample_mode/test_sample_mode.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
from datetime import datetime

import freezegun
import pytest
import pytz

from dbt.event_time.sample_window import SampleWindow
from dbt.events.types import JinjaLogInfo
from dbt.tests.util import relation_from_name, run_dbt
from tests.utils import EventCatcher

input_model_sql = """
{{ config(materialized='table', event_time='event_time') }}
select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
select 1 as id, TIMESTAMP '2020-01-01 11:25:00-0' as event_time
UNION ALL
select 2 as id, TIMESTAMP '2025-01-01 00:00:00-0' as event_time
select 2 as id, TIMESTAMP '2025-01-01 13:47:00-0' as event_time
UNION ALL
select 3 as id, TIMESTAMP '2025-01-02 00:00:00-0' as event_time
select 3 as id, TIMESTAMP '2025-01-02 12:32:00-0' as event_time
"""

sample_mode_model_sql = """
{{ config(materialized='table', event_time='event_time') }}
{% if execute %}
{{ log("Sample mode: " ~ invocation_args_dict.get("sample"), info=true) }}
{{ log("Sample window: " ~ invocation_args_dict.get("sample_window"), info=true) }}
{% endif %}
SELECT * FROM {{ ref("input_model") }}
Expand Down Expand Up @@ -55,7 +60,7 @@ def event_catcher(self) -> EventCatcher:
(False, 3, False),
],
)
@freezegun.freeze_time("2025-01-03T00:00:0Z")
@freezegun.freeze_time("2025-01-03T02:03:0Z")
def test_sample_mode(
self,
project,
Expand All @@ -65,12 +70,18 @@ def test_sample_mode(
arg_value_in_jinja: bool,
):
run_args = ["run"]
expected_sample_window = None
if use_sample_mode:
run_args.extend(["--sample", "--sample-window=1 day"])
expected_sample_window = SampleWindow(
start=datetime(2025, 1, 2, 2, 3, 0, 0, tzinfo=pytz.UTC),
end=datetime(2025, 1, 3, 2, 3, 0, 0, tzinfo=pytz.UTC),
)

_ = run_dbt(run_args, callbacks=[event_catcher.catch])
assert len(event_catcher.caught_events) == 1
assert len(event_catcher.caught_events) == 2
assert event_catcher.caught_events[0].info.msg == f"Sample mode: {arg_value_in_jinja}" # type: ignore
assert event_catcher.caught_events[1].info.msg == f"Sample window: {expected_sample_window}" # type: ignore
self.assert_row_count(
project=project,
relation_name="sample_mode_model",
Expand Down
78 changes: 78 additions & 0 deletions tests/unit/event_time/test_event_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from datetime import datetime

import pytest
import pytz

from dbt.artifacts.resources.types import BatchSize
from dbt.event_time.event_time import offset_timestamp


class TestEventTime:
# Unit tests for dbt.event_time.event_time.offset_timestamp.
# Each parametrized case is (timestamp, batch_size, offset, expected_timestamp);
# the test asserts that offsetting `timestamp` yields exactly `expected_timestamp`.

@pytest.mark.parametrize(
"timestamp,batch_size,offset,expected_timestamp",
[
# Year: +1 / -1 only changes the year component.
(
datetime(2024, 9, 5, 3, 56, 1, 1, pytz.UTC),
BatchSize.year,
1,
datetime(2025, 9, 5, 3, 56, 1, 1, pytz.UTC),
),
(
datetime(2024, 9, 5, 3, 56, 1, 1, pytz.UTC),
BatchSize.year,
-1,
datetime(2023, 9, 5, 3, 56, 1, 1, pytz.UTC),
),
# Month: +1 / -1 only changes the month component.
(
datetime(2024, 9, 5, 3, 56, 1, 1, pytz.UTC),
BatchSize.month,
1,
datetime(2024, 10, 5, 3, 56, 1, 1, pytz.UTC),
),
(
datetime(2024, 9, 5, 3, 56, 1, 1, pytz.UTC),
BatchSize.month,
-1,
datetime(2024, 8, 5, 3, 56, 1, 1, pytz.UTC),
),
# Day: +1 / -1 only changes the day component.
(
datetime(2024, 9, 5, 3, 56, 1, 1, pytz.UTC),
BatchSize.day,
1,
datetime(2024, 9, 6, 3, 56, 1, 1, pytz.UTC),
),
(
datetime(2024, 9, 5, 3, 56, 1, 1, pytz.UTC),
BatchSize.day,
-1,
datetime(2024, 9, 4, 3, 56, 1, 1, pytz.UTC),
),
# Hour: +1 / -1 only changes the hour component; minutes, seconds and
# microseconds are preserved (i.e. the timestamp is not truncated).
(
datetime(2024, 9, 5, 3, 56, 1, 1, pytz.UTC),
BatchSize.hour,
1,
datetime(2024, 9, 5, 4, 56, 1, 1, pytz.UTC),
),
(
datetime(2024, 9, 5, 3, 56, 1, 1, pytz.UTC),
BatchSize.hour,
-1,
datetime(2024, 9, 5, 2, 56, 1, 1, pytz.UTC),
),
# Edge case: Jan 31 + 1 month is expected to land on Feb 29 (2024 is a leap year).
(
datetime(2024, 1, 31, 16, 6, 0, 0, pytz.UTC),
BatchSize.month,
1,
datetime(2024, 2, 29, 16, 6, 0, 0, pytz.UTC),
),
# Edge case: leap day + 1 year is expected to land on Feb 28 of the next year.
(
datetime(2024, 2, 29, 16, 6, 0, 0, pytz.UTC),
BatchSize.year,
1,
datetime(2025, 2, 28, 16, 6, 0, 0, pytz.UTC),
),
],
)
def test_offset_timestamp(self, timestamp, batch_size, offset, expected_timestamp):
# Arguments are passed positionally, in (timestamp, batch_size, offset) order.
assert offset_timestamp(timestamp, batch_size, offset) == expected_timestamp

0 comments on commit b04ac97

Please sign in to comment.