Skip to content

Commit

Permalink
Merge pull request #54 from bcodell/bryce/bug-support-anonymous-id
Browse files Browse the repository at this point in the history
Bug: Support `anonymous_customer_id`
  • Loading branch information
bcodell authored Dec 29, 2023
2 parents 6dafaa8 + c7d0c0f commit 14c1fe1
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 23 deletions.
12 changes: 5 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
name: CI
on:
workflow_call:
paths-ignore:
- 'README.md'
secrets:
gcp_keyfile:
required: true
Expand Down Expand Up @@ -57,7 +55,7 @@ jobs:
localstack start -d
cd ./integration_tests
sed -i 's/skip_stream: true/skip_stream: false/' dbt_project.yml
dbt build -x --target snowflake
dbt build -x --target snowflake --exclude dataset__select_all_aggregate_all
env:
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml
DEBUG: 1
Expand All @@ -66,11 +64,10 @@ jobs:
id: snowflake_ci_skip_stream
if: github.repository == 'bcodell/dbt-aql'
run: |
localstack stop
localstack start -d
cd ./integration_tests
sed -i 's/skip_stream: false/skip_stream: true/' dbt_project.yml
dbt build -x --target snowflake
dbt run -s activities+ -x --target snowflake --exclude dataset__select_all_aggregate_all
dbt test -s activities+ -x --target snowflake --exclude dataset__select_all_aggregate_all
env:
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml
DEBUG: 1
Expand All @@ -95,7 +92,8 @@ jobs:
sed -i 's/skip_stream: false/skip_stream: true/' dbt_project.yml
echo $GCP_KEYFILE > $GCP_KEYFILE_PATH
ls -l
dbt build -s activities+ -x --target bigquery --exclude config.materialized:seed
dbt run -s activities+ -x --target bigquery
dbt test -s activities+ -x --target bigquery
env:
GCP_KEYFILE_PATH: ./gcp_keyfile.json
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ name: CI - Pull Request
on:
pull_request:
branches: [ main ]
paths-ignore:
- 'README.md'

jobs:
CI:
Expand Down
5 changes: 0 additions & 5 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ vars:
anonymous_customer_id_alias: anonymous_entity_uuid
model_prefix: customer__

quoting:
database: "{{ true if target.name == 'snowflake' else false }}"
schema: "{{ true if target.name == 'snowflake' else false }}"
identifier: "{{ true if target.name == 'snowflake' else false }}"


seeds:
dbt_aql_integration_tests:
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/macros/listagg_delimiter.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{% macro default__listagg_delimiter() %}
{%- do return(dbt.string_literal(",")) -%}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ using customer_stream
select all signed_up (
activity_id as activity_id,
entity_uuid as customer_id,
anonymous_entity_uuid as anonymous_customer_id,
ts as signed_up_at
)
aggregate all visited_page (
count(activity_id) as pages_visited_all
count(activity_id) as pages_visited_all,
listagg(referrer_url) as referrer_urls,
listagg_distinct(referrer_url) as distinct_referrer_urls
)
aggregate all bought_something(
sum(total_items_purchased) as total_items_purchased_all,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
activity_id,customer_id,signed_up_at,pages_visited_all,total_items_purchased_all,total_sales_all,total_purchases_all
5a5e6cf0133a3faa73356ff291e26471,1,2022-01-02 22:10:11,3,11,600,3
574b51c586b446456ac8da4b504b4d27,4,2022-01-05 22:10:11,3,8,680,3
f77c4be94da1197fb6e8864f59b85dd2,7,2022-01-08 22:10:11,3,5,270,3
22ef1cea8e5cb3dc154fd727b1776a6e,10,2022-01-11 22:10:11,3,22,1650,3
activity_id,customer_id,anonymous_customer_id,signed_up_at,pages_visited_all,referrer_urls,distinct_referrer_urls,total_items_purchased_all,total_sales_all,total_purchases_all
5a5e6cf0133a3faa73356ff291e26471,1,,2022-01-02 22:10:11,3,"bing.com,google.com,yahoo.com","bing.com,google.com,yahoo.com",11,600,3
f77c4be94da1197fb6e8864f59b85dd2,7,,2022-01-08 22:10:11,3,"gmail.com,google.com,yahoo.com","gmail.com,google.com,yahoo.com",5,270,3
22ef1cea8e5cb3dc154fd727b1776a6e,10,,2022-01-11 22:10:11,3,"bing.com,gmail.com,google.com","bing.com,gmail.com,google.com",22,1650,3
574b51c586b446456ac8da4b504b4d27,4,,2022-01-05 22:10:11,3,"bing.com,gmail.com,yahoo.com","bing.com,gmail.com,yahoo.com",8,680,3
7 changes: 7 additions & 0 deletions integration_tests/seeds/datasets/seed_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ seeds:
last_before_total_items_purchased: integer
last_before_bought_something_at: "{{ 'datetime' if target.name == 'bigquery' else 'timestamp' }}"

- name: output__select_all_aggregate_all
config:
column_types:
anonymous_customer_id: "{{ 'string' if target.name == 'bigquery' else 'text' }}"
last_before_total_items_purchased: integer
last_before_bought_something_at: "{{ 'datetime' if target.name == 'bigquery' else 'timestamp' }}"

- name: output__select_all_append_nth_before
config:
column_types:
Expand Down
8 changes: 8 additions & 0 deletions macros/activity_schema/activity/build_activity.sql
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,19 @@ select
{% endif %}
, {{ dbt_aql.build_json(data_types) }} as {{columns.feature_json}}
, row_number() over (
{% if columns.anonymous_customer_id is not defined or 'anonymous_customer_id' in null_columns %}
partition by {{columns.customer}}
{% else %}
partition by coalesce({{columns.customer}}, {{columns.anonymous_customer_id}})
{% endif %}
order by {{columns.ts}}, {{surrogate_key_statement}}
) as activity_occurrence
, lead(cast({{columns.ts}} as {{dbt.type_timestamp()}})) over (
{% if columns.anonymous_customer_id is not defined or 'anonymous_customer_id' in null_columns %}
partition by {{columns.customer}}
{% else %}
partition by coalesce({{columns.customer}}, {{columns.anonymous_customer_id}})
{% endif %}
order by {{columns.ts}}, {{surrogate_key_statement}}
) as activity_repeated_at
from {{cte}}
Expand Down
6 changes: 5 additions & 1 deletion macros/activity_schema/dataset/aggregations/_helpers.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{% macro _listagg_delimiter() %}
{% macro listagg_delimiter() %}
{{ return(adapter.dispatch("listagg_delimiter", "dbt_aql")())}}
{% endmacro %}

{% macro default__listagg_delimiter() %}
{%- do return(dbt.string_literal("\n")) -%}
{% endmacro %}

Expand Down
14 changes: 13 additions & 1 deletion macros/activity_schema/dataset/aggregations/listagg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,17 @@
{% endmacro %}

{% macro default__aggfunc_listagg(column) %}
listagg({{ column.column_sql }}, {{dbt_aql._listagg_delimiter()}})
nullif(string_agg({{ column.column_sql }}, {{dbt_aql.listagg_delimiter()}} order by {{ column.column_sql }}), '')
{% endmacro %}

{% macro duckdb__aggfunc_listagg(column) %}
nullif(string_agg({{ column.column_sql }}, {{dbt_aql.listagg_delimiter()}} order by {{ column.column_sql }}), '')
{% endmacro %}

{% macro snowflake__aggfunc_listagg(column) %}
nullif(listagg({{ column.column_sql }}, {{dbt_aql.listagg_delimiter()}}) within group (order by {{ column.column_sql }}), '')
{% endmacro %}

{% macro redshift__aggfunc_listagg(column) %}
nullif(listagg({{ column.column_sql }}, {{dbt_aql.listagg_delimiter()}}), '')
{% endmacro %}
14 changes: 13 additions & 1 deletion macros/activity_schema/dataset/aggregations/listagg_distinct.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,17 @@
{% endmacro %}

{% macro default__aggfunc_listagg_distinct(column) %}
listagg(distinct {{ column.column_sql }}, {{dbt_aql._listagg_delimiter()}})
nullif(string_agg(distinct {{ column.column_sql }}, {{dbt_aql.listagg_delimiter()}} order by {{ column.column_sql }}), '')
{% endmacro %}

{% macro duckdb__aggfunc_listagg_distinct(column) %}
nullif(string_agg(distinct {{ column.column_sql }}, {{dbt_aql.listagg_delimiter()}} order by {{ column.column_sql }}), '')
{% endmacro %}

{% macro snowflake__aggfunc_listagg_distinct(column) %}
nullif(listagg(distinct {{ column.column_sql }}, {{dbt_aql.listagg_delimiter()}}) within group (order by {{ column.column_sql }}), '')
{% endmacro %}

{% macro redshift__aggfunc_listagg_distinct(column) %}
nullif(listagg(distinct {{ column.column_sql }}, {{dbt_aql.listagg_delimiter()}}), '')
{% endmacro %}
23 changes: 23 additions & 0 deletions macros/activity_schema/dataset/dataset.sql
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,19 @@ with
{%- endif %}
{%- endfor %}
row_number() over (
{% if columns.anonymous_customer_id is not defined %}
partition by {{primary}}.{{columns.customer}}
{% else %}
partition by coalesce({{primary}}.{{columns.customer}}, {{primary}}.{{columns.anonymous_customer_id}})
{% endif %}
order by {{primary}}.{{columns.ts}}, {{primary}}.{{columns.activity_id}}
) as {{columns.activity_occurrence}},
lead({{columns.ts}}) over (
{% if columns.anonymous_customer_id is not defined %}
partition by {{primary}}.{{columns.customer}}
{% else %}
partition by coalesce({{primary}}.{{columns.customer}}, {{primary}}.{{columns.anonymous_customer_id}})
{% endif %}
order by {{primary}}.{{columns.ts}}, {{primary}}.{{columns.activity_id}}
) as {{columns.activity_repeated_at}}
{% if not skip_stream %}
Expand All @@ -121,6 +129,9 @@ with
{{primary}}.{{columns.ts}} as {{req}}{{columns.ts}},
{{primary}}.{{columns.activity_occurrence}} as {{req}}{{columns.activity_occurrence}},
{{primary}}.{{columns.activity_repeated_at}} as {{req}}{{columns.activity_repeated_at}},
{% if columns.anonymous_customer_id is defined %}
{{primary}}.{{columns.anonymous_customer_id}} as {{req}}{{columns.anonymous_customer_id}},
{% endif %}
{%- for column in primary_activity.columns %}
{{ dbt_aql.select_column(stream, primary, column).column_sql }} as {{column.alias}}{% if not loop.last -%},{%- endif -%}
{%- endfor %}
Expand All @@ -143,11 +154,19 @@ with
{%- endif %}
{%- endfor %}
row_number() over (
{% if columns.anonymous_customer_id is not defined %}
partition by {{joined}}.{{columns.customer}}
{% else %}
partition by coalesce({{joined}}.{{columns.customer}}, {{joined}}.{{columns.anonymous_customer_id}})
{% endif %}
order by {{joined}}.{{columns.ts}}, {{joined}}.{{columns.activity_id}}
) as {{columns.activity_occurrence}},
lead({{columns.ts}}) over (
{% if columns.anonymous_customer_id is not defined %}
partition by {{joined}}.{{columns.customer}}
{% else %}
partition by coalesce({{joined}}.{{columns.customer}}, {{joined}}.{{columns.anonymous_customer_id}})
{% endif %}
order by {{joined}}.{{columns.ts}}, {{joined}}.{{columns.activity_id}}
) as {{columns.activity_repeated_at}}
{% if not skip_stream %}
Expand Down Expand Up @@ -183,7 +202,11 @@ with
{%- if ja.relationship_clause is not none %}
and {{ ja.relationship_clause }}
{%- endif %}
{% if columns.anonymous_customer_id is defined %}
and coalesce({{primary}}.{{req}}{{columns.customer}}, {{primary}}.{{req}}{{columns.anonymous_customer_id}}) = coalesce({{joined}}.{{columns.customer}}, {{joined}}.{{columns.anonymous_customer_id}})
{% else %}
and {{primary}}.{{req}}{{columns.customer}} = {{joined}}.{{columns.customer}}
{% endif %}
and {{ ja.join_clause }}
{%- if ja.extra_joins is not none %}
{%- for ej in ja.extra_joins %}
Expand Down

0 comments on commit 14c1fe1

Please sign in to comment.