Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom config pe 6772 #45

Merged
merged 11 commits into from
Nov 25, 2024
1 change: 1 addition & 0 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ vars:
snowplow__dev_target_name: 'dev'
snowplow__allow_refresh: false
snowplow__session_timestamp: 'collector_tstamp'
snowplow__partition_key: 'collector_tstamp'
ilias1111 marked this conversation as resolved.
Show resolved Hide resolved
# Variables - Databricks Only
# Add the following variable to your dbt project's dbt_project.yml file
# Depending on your setup, this should be either the catalog (for Unity Catalog users on Databricks connector 1.1.1 or later) or the same value as your snowplow__atomic_schema ('atomic' unless you have changed it)
Expand Down
1 change: 1 addition & 0 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ vars:
snowplow__backfill_limit_days: 2
snowplow__derived_tstamp_partitioned: false
snowplow__atomic_schema: "{{ target.schema ~ 'sp_normalize_int_test' }}"
snowplow__partition_key: "load_tstamp"

models:
snowplow_normalize_integration_tests:
Expand Down
18 changes: 9 additions & 9 deletions integration_tests/macros/test_normalize_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ It runs 9 tests:
{% macro databricks__test_normalize_events() %}

{% set expected_dict = {
"flat_cols_only" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_cols" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_cols_w_alias" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as my_alias_test_id , UNSTRUCT_EVENT_TEST_1.test_class as my_alias_test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_1_context" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_2_context" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_2_context_w_alias" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as test1_context_test_id , CONTEXTS_TEST_1[0].context_test_class as test1_context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as test2_context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as test2_context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"context_only" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"multiple_base_events" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')",
"multiple_sde_events" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test1_test_id , UNSTRUCT_EVENT_TEST_1.test_class as test1_test_class , UNSTRUCT_EVENT_TEST2_1.test_word as test2_test_word , UNSTRUCT_EVENT_TEST2_1.test_idea as test2_test_idea -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')"
"flat_cols_only" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_key') ~ ") as " ~ var('snowplow__partition_key') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_cols" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_key') ~ ") as " ~ var('snowplow__partition_key') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_cols_w_alias" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_key') ~ ") as " ~ var('snowplow__partition_key') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as my_alias_test_id , UNSTRUCT_EVENT_TEST_1.test_class as my_alias_test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_1_context" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_key') ~ ") as " ~ var('snowplow__partition_key') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_2_context" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_key') ~ ") as " ~ var('snowplow__partition_key') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_2_context_w_alias" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_key') ~ ") as " ~ var('snowplow__partition_key') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as test1_context_test_id , CONTEXTS_TEST_1[0].context_test_class as test1_context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as test2_context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as test2_context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"context_only" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_key') ~ ") as " ~ var('snowplow__partition_key') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"multiple_base_events" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_key') ~ ") as " ~ var('snowplow__partition_key') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')",
"multiple_sde_events" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_key') ~ ") as " ~ var('snowplow__partition_key') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test1_test_id , UNSTRUCT_EVENT_TEST_1.test_class as test1_test_class , UNSTRUCT_EVENT_TEST2_1.test_word as test2_test_word , UNSTRUCT_EVENT_TEST2_1.test_idea as test2_test_idea -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')"
} %}

{% set results_dict ={
Expand Down
16 changes: 16 additions & 0 deletions macros/databricks_partition.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{% macro databricks_partition() %}
  {#-
    Public entry point. Looks up the adapter-specific implementation of
    `databricks_partition` in the `snowplow_normalize` dispatch namespace,
    calls it with no arguments, and returns whatever it returns.
  -#}
  {%- set dispatched_impl = adapter.dispatch('databricks_partition', 'snowplow_normalize') -%}
  {{ return(dispatched_impl()) }}
{% endmacro %}

{% macro default__databricks_partition() %}
  {#-
    Default implementation. Builds the partition column name by appending
    "_date" to the value of the `snowplow__partition_key` var, logs the
    resulting name at info level, and returns it as a string.
  -#}
  {%- set partition_column = var('snowplow__partition_key') ~ "_date" -%}

  {#- Surface the chosen partition column in the dbt log output. -#}
  {%- do log("Using databricks_partition: " ~ partition_column, info=True) -%}

  {{ return(partition_column) }}
{% endmacro %}
2 changes: 1 addition & 1 deletion macros/normalize_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ select
event_id
, collector_tstamp
{% if target.type in ['databricks', 'spark'] -%}
, DATE(collector_tstamp) as collector_tstamp_date
, DATE({{var("snowplow__partition_key")}}) as {{var("snowplow__partition_key")}}_date
{%- endif %}
-- Flat columns from event table
{% if flat_cols|length > 0 %}
Expand Down
16 changes: 8 additions & 8 deletions utils/snowplow_normalize_model_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_key"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={{
"field": "collector_tstamp",
"field": var("snowplow__partition_key"),
"data_type": "timestamp"
}}, databricks_val='collector_tstamp_date'),
}}, databricks_val=databricks_partition()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={{
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down Expand Up @@ -251,11 +251,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "unique_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_key"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={{
"field": "collector_tstamp",
"field": var("snowplow__partition_key"),
"data_type": "timestamp"
}}, databricks_val='collector_tstamp_date'),
}}, databricks_val=databricks_partition()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={{
'delta.autoOptimize.optimizeWrite' : 'true',
Expand All @@ -270,9 +270,9 @@
filtered_model_content += f"""
select
event_id
, collector_tstamp
, {{{{var("snowplow__partition_key")}}}}
{{% if target.type in ['databricks', 'spark'] -%}}
, DATE(collector_tstamp) as collector_tstamp_date
, DATE({{{{var("snowplow__partition_key")}}}}) as {{{{var("snowplow__partition_key")}}}}_date
{{%- endif %}}
, event_name
, '{model}' as event_table_name
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/custom_table_name2_1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_key"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_key"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=databricks_partition()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/custom_table_name3_2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_key"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_key"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=databricks_partition()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/custom_table_name4_1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_key"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_key"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=databricks_partition()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/custom_table_name5_9.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_key"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_key"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=databricks_partition()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/custom_table_name6_6.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_key"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_key"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=databricks_partition()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/custom_table_name7_6.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_key"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_key"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=databricks_partition()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
6 changes: 3 additions & 3 deletions utils/tests/expected/event_name1_1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
tags = "snowplow_normalize_incremental",
materialized = "incremental",
unique_key = "event_id",
upsert_date_key = "collector_tstamp",
upsert_date_key = var("snowplow__partition_key"),
partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
"field": "collector_tstamp",
"field": var("snowplow__partition_key"),
"data_type": "timestamp"
}, databricks_val='collector_tstamp_date'),
}, databricks_val=databricks_partition()),
sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
Expand Down
Loading
Loading