Skip to content

Commit

Permalink
Merge pull request #627 from elementary-data/ele-2131-freshness-first…
Browse files Browse the repository at this point in the history
…-metric-fix

ELE-2132: Freshness first metric is null
  • Loading branch information
dapollak authored Dec 10, 2023
2 parents c1784fb + e13313b commit c86801d
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 7 deletions.
33 changes: 33 additions & 0 deletions integration_tests/tests/test_freshness_anomalies.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from copy import copy
from dataclasses import dataclass
from datetime import datetime, timedelta
from itertools import chain

import pytest
from data_generator import DATE_FORMAT, generate_dates
Expand Down Expand Up @@ -188,3 +189,35 @@ def test_faster_rate(
test_id, TEST_NAME, self._get_test_config(config), data=data
)
assert result["status"] == "pass"


def test_first_metric_null(test_id, dbt_project: DbtProject):
    # Regression test: the freshness metric is a diff between consecutive
    # timestamps, so the very first diff has no predecessor. Running the test
    # incrementally over consecutive days must still yield a non-null first
    # metric (i.e. the test passes on every run).
    config = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "days_back": 3,
        "backfill_days": 2,
        "time_bucket": {"period": "day", "count": 1},
        "sensitivity": 1,
    }
    # One row per hour (08:00-22:00) for each of the first five days of 2000.
    data = [
        {TIMESTAMP_COLUMN: datetime(2000, 1, day, hour, 0).strftime(DATE_FORMAT)}
        for day in range(1, 6)
        for hour in range(8, 23)
    ]
    # Simulate two consecutive incremental runs on days 3 and 4.
    for run_day in (3, 4):
        result = dbt_project.test(
            test_id,
            TEST_NAME,
            config,
            data=data,
            test_vars={"custom_run_started_at": datetime(2000, 1, run_day).isoformat()},
            as_model=True,
            materialization="incremental",
        )
        assert result["status"] == "pass"
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,25 @@
{% endmacro %}

{% macro get_timestamp_table_query(monitored_table, metric_properties, timestamp_column, table_monitors, min_bucket_start, max_bucket_end, full_table_name_str) %}
with monitored_table as (
with partially_time_filtered_monitored_table as (
select
{{ elementary.edr_cast_as_timestamp(timestamp_column) }} as {{ timestamp_column }}
{%- if metric_properties.timestamp_column and metric_properties.event_timestamp_column %}
, {{ elementary.edr_cast_as_timestamp(metric_properties.event_timestamp_column) }} as {{ metric_properties.event_timestamp_column }}
{%- endif %}
from {{ monitored_table }}
where {{ elementary.edr_cast_as_timestamp(timestamp_column) }} >= {{ elementary.edr_cast_as_timestamp(min_bucket_start) }}
-- The freshness metric is calculated as the difference between consecutive buckets, so the first
-- diff is always null. Therefore we keep a few older buckets inside the query and filter them out
-- later, so that the first relevant diff is not null
where {{ elementary.edr_cast_as_timestamp(timestamp_column) }} >= {{ elementary.edr_timeadd("day", -7, elementary.edr_cast_as_timestamp(min_bucket_start)) }}
{% if metric_properties.where_expression %} and {{ metric_properties.where_expression }} {% endif %}
),

monitored_table as (
select
*
from partially_time_filtered_monitored_table
where {{ timestamp_column }} >= {{ elementary.edr_cast_as_timestamp(min_bucket_start) }}
),
buckets as (
select edr_bucket_start, edr_bucket_end
from ({{ elementary.complete_buckets_cte(metric_properties, min_bucket_start, max_bucket_end) }}) results
Expand Down Expand Up @@ -205,7 +213,7 @@
-- get ordered consecutive update timestamps in the source data
with unique_timestamps as (
select distinct {{ elementary.edr_cast_as_timestamp(freshness_column) }} as timestamp_val
from monitored_table
from partially_time_filtered_monitored_table
order by 1
),

Expand All @@ -215,14 +223,18 @@
timestamp_val as update_timestamp,
{{ elementary.timediff('second', 'lag(timestamp_val) over (order by timestamp_val)', 'timestamp_val') }} as freshness
from unique_timestamps
where timestamp_val >= (select min(edr_bucket_start) from buckets)
),

time_filtered_consecutive_updates_freshness as (
select
*
from consecutive_updates_freshness
where update_timestamp >= (select min(edr_bucket_start) from buckets)
),
-- divide the freshness metrics above into buckets
bucketed_consecutive_updates_freshness as (
select
edr_bucket_start, edr_bucket_end, update_timestamp, freshness
from buckets cross join consecutive_updates_freshness
from buckets cross join time_filtered_consecutive_updates_freshness
where update_timestamp >= edr_bucket_start AND update_timestamp < edr_bucket_end
),

Expand Down

0 comments on commit c86801d

Please sign in to comment.