Merge pull request #43 from bcodell/bryce/feature-skip-stream
Feature: Skip Activity Stream
bcodell authored Dec 19, 2023
2 parents 0fbff48 + abc4bd0 commit 2c3cc0a
Showing 11 changed files with 269 additions and 24 deletions.
44 changes: 38 additions & 6 deletions .github/workflows/ci.yml
@@ -31,34 +31,66 @@ jobs:
run: |
cd ./integration_tests
dbt deps
- name: dbt CI - duckdb
- name: dbt CI - duckdb - with stream
run: |
cd ./integration_tests
sed -i 's/skip_stream: true/skip_stream: false/' dbt_project.yml
dbt build -x --target duckdb
env:
GCP_KEYFILE_PATH: ./gcp_keyfile.json
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml
- name: dbt CI - snowflake
id: snowflake_ci
- name: dbt CI - duckdb - skip stream
run: |
cd ./integration_tests
sed -i 's/skip_stream: false/skip_stream: true/' dbt_project.yml
dbt build -x --target duckdb
env:
GCP_KEYFILE_PATH: ./gcp_keyfile.json
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml
- name: dbt CI - snowflake - with stream
id: snowflake_ci_with_stream
run: |
localstack extensions install localstack-extension-snowflake
localstack start -d
cd ./integration_tests
sed -i 's/skip_stream: true/skip_stream: false/' dbt_project.yml
dbt build -x --target snowflake
env:
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml
DEBUG: 1

- name: dbt CI - snowflake - skip stream
id: snowflake_ci_skip_stream
run: |
localstack stop
localstack start -d
cd ./integration_tests
sed -i 's/skip_stream: false/skip_stream: true/' dbt_project.yml
dbt build -x --target snowflake
env:
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml
DEBUG: 1
- name: localstack logs
if: failure() && steps.snowflake_ci.outcome == 'failure'
if: failure() && (steps.snowflake_ci_with_stream.outcome == 'failure' || steps.snowflake_ci_skip_stream.outcome == 'failure' )
run: localstack logs
- name: dbt CI - bigquery
- name: dbt CI - bigquery - with stream
run: |
cd ./integration_tests
sed -i 's/skip_stream: true/skip_stream: false/' dbt_project.yml
echo $GCP_KEYFILE > $GCP_KEYFILE_PATH
ls -l
dbt build -x --target bigquery
env:
GCP_KEYFILE_PATH: ./gcp_keyfile.json
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml
- name: dbt CI - bigquery - skip stream
run: |
cd ./integration_tests
sed -i 's/skip_stream: false/skip_stream: true/' dbt_project.yml
echo $GCP_KEYFILE > $GCP_KEYFILE_PATH
ls -l
dbt build -x --target bigquery
env:
GCP_KEYFILE_PATH: ./gcp_keyfile.json
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml


46 changes: 46 additions & 0 deletions README.md
@@ -100,6 +100,12 @@ _Description_: This variable takes a string that represents the alias that will
_Required_: Yes
</br></br>

#### **`skip_stream`**
_Description_: This variable accepts a boolean (`true` or `false`). When `true`, the downstream effects are that the `build_stream` macro renders an empty table and the `dataset` macro queries the individual activity tables directly instead of querying the stream.
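
A minimal sketch of how this might be set in `dbt_project.yml` is shown below. It assumes the package's stream configuration lives under a `dbt_aql` vars key (the `dataset` macro reads `var("dbt_aql")`) and uses the `customer_stream` stream from this repo's integration tests:
```yml
# dbt_project.yml (illustrative sketch only)
vars:
  dbt_aql:
    streams:
      customer_stream:
        skip_stream: true  # activity models are built as standalone tables; the stream model renders empty
```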

_Required_: No (defaults to `false`)
</br></br>

#### **`anonymous_customer_id_alias`**
_Description_: This variable takes a string that represents the alias that will be used in lieu of the standard `anonymous_customer_id` column in the Activity Schema spec, which represents the anonymous ID of the entity represented by the stream.
> **Note: This is an optional column according to the Activity Schema v2 Spec. If this variable is excluded, then the Activity and Activity Stream models for the corresponding Activity Schema will exclude `anonymous_customer_id` as a column.**
@@ -151,6 +157,42 @@ models:
```

**Note: It is highly recommended to use the first option, as it better supports cross-database functionality.**

### Additional Configuration - Skipping Activity Stream
If the Activity Schema being built is configured to skip the activity stream, then each activity table should be appropriately clustered/partitioned. A convenience macro called `cluster_keys` is provided by this package to simplify this process. Example implementations of this configuration for supported warehouses are provided below - keep in mind that the previously referenced config parameters (`stream`, `data_types`) are still required:
```sql
-- Snowflake
{{
config(
cluster_by=dbt_aql.cluster_keys()
)
}}
-- Bigquery
{{
config(
partition_by=dbt_aql.cluster_keys('stream_name').partition_by,
cluster_by=dbt_aql.cluster_keys('stream_name').cluster_by
)
}}
-- Redshift
{{
config(
sort=dbt_aql.cluster_keys().sort,
dist=dbt_aql.cluster_keys().dist
)
}}
-- DuckDB
{{
config(
options={"partition_by": dbt_aql.cluster_keys()}
)
}}
```

**Note: The `cluster_keys` macro requires the name of the activity's associated stream to be passed as an input argument only when using Bigquery.**
</br></br>


@@ -214,6 +256,8 @@ For each activity the macro will generate a model entry in yaml format containin
# **Streams**
Each Activity Schema should have exactly 1 stream model. The model should be the name of the stream that is registered in the `streams` variable in `dbt_project.yml`.
**Note: This model must exist with activity model dependencies defined appropriately, regardless of how the `skip_stream` parameter is configured for the stream. This requirement is due to the way dbt determines model dependencies when parsing a project.**
## **Configuration**
In order to maximize computational performance when querying a stream in downstream models, it is recommended to use the clustering/partition keys prescribed by the spec. A convenience macro has been provided with adapter-specific implementations to make this easy for users. To use, simply pass `dbt_aql.cluster_keys()` to the appropriate argument in the model config block.
@@ -264,6 +308,8 @@ Each Activity model should look similar to the following:
) }}
```
The `build_stream` macro is a convenience function that takes as input a list of `ref`'d activity models and creates a unioned representation of them. If this model's materialization is specified as incremental, then for each activity, it will only insert rows with a `ts` value greater than the maximum `ts` value in the stream table for that activity.

**Note: When `skip_stream` is configured as `true` for the stream, this macro produces an empty table.**
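
As a rough illustration, a stream model built with this macro might look like the sketch below. This is not the canonical template: the activity model names come from this repo's integration tests, and the config block is a Snowflake-style example (use the warehouse-specific `cluster_keys` arguments described above).
```sql
-- customer_stream.sql (illustrative sketch only)
{{ config(
    materialized='incremental',
    cluster_by=dbt_aql.cluster_keys()
) }}

{{ dbt_aql.build_stream([
    ref('customer__signed_up'),
    ref('customer__visited_page'),
    ref('customer__bought_something')
]) }}
```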
</br></br>

# **Creating Datasets: Querying The Activity Stream with `aql`**
1 change: 1 addition & 0 deletions integration_tests/dbt_project.yml
@@ -29,6 +29,7 @@ vars:
feature_json: attributes
streams:
customer_stream:
skip_stream: false
customer_id_alias: entity_uuid
anonymous_customer_id_alias: anonymous_entity_uuid
model_prefix: customer__
15 changes: 15 additions & 0 deletions integration_tests/models/activities/customer__bought_something.sql
@@ -1,4 +1,19 @@
{% set keys = dbt_aql.cluster_keys(stream='customer_stream') %}
{% if target.name == 'bigquery' %}
{% set cluster_keys = keys.cluster_by %}
{% set partition_keys = keys.partition_by %}
{# TODO: remove snowflake-specific implementation once localstack bugs are fixed #}
{% elif target.name == 'snowflake' %}
{% set cluster_keys = none %}
{% set partition_keys = none %}
{% else %}
{% set cluster_keys = keys %}
{% set partition_keys = '' %}
{% endif %}

{{ config(
cluster_by=cluster_keys,
partition_by=partition_keys,
data_types={
'total_sales': dbt.type_int(),
'total_items_purchased': dbt.type_int(),
19 changes: 18 additions & 1 deletion integration_tests/models/activities/customer__signed_up.sql
@@ -1,4 +1,21 @@
{{ config(stream='customer_stream') }}
{% set keys = dbt_aql.cluster_keys(stream='customer_stream') %}
{% if target.name == 'bigquery' %}
{% set cluster_keys = keys.cluster_by %}
{% set partition_keys = keys.partition_by %}
{# TODO: remove snowflake-specific implementation once localstack bugs are fixed #}
{% elif target.name == 'snowflake' %}
{% set cluster_keys = none %}
{% set partition_keys = none %}
{% else %}
{% set cluster_keys = keys %}
{% set partition_keys = '' %}
{% endif %}

{{ config(
cluster_by=cluster_keys,
partition_by=partition_keys,
stream='customer_stream'
) }}

with base as (
select *
15 changes: 15 additions & 0 deletions integration_tests/models/activities/customer__visited_page.sql
@@ -1,4 +1,19 @@
{% set keys = dbt_aql.cluster_keys(stream='customer_stream') %}
{% if target.name == 'bigquery' %}
{% set cluster_keys = keys.cluster_by %}
{% set partition_keys = keys.partition_by %}
{# TODO: remove snowflake-specific implementation once localstack bugs are fixed #}
{% elif target.name == 'snowflake' %}
{% set cluster_keys = none %}
{% set partition_keys = none %}
{% else %}
{% set cluster_keys = keys %}
{% set partition_keys = '' %}
{% endif %}

{{ config(
cluster_by=cluster_keys,
partition_by=partition_keys,
data_types={
'referrer_url': dbt.type_string(),
},
8 changes: 8 additions & 0 deletions macros/activity_schema/activity/activity.sql
@@ -29,9 +29,17 @@ aql query in model '{{ model.unique_id }}' has invalid syntax. Parsed invalid re
{%- else -%}
{%- set relationship_clause = none -%}
{%- endif -%}
{%- set model_prefix = dbt_aql.get_model_prefix(stream) -%}
{%- if modules.re.search(model_prefix, activity_name) is none -%}
{%- set model_name = model_prefix~activity_name -%}
{%- else -%}
{%- set model_name = activity_name -%}
{%- endif -%}


{%- do return(namespace(
name="activity",
model_name=model_name,
verb=verb,
relationship_selector=relationship_selector,
join_condition=join_condition,
46 changes: 40 additions & 6 deletions macros/activity_schema/dataset/dataset.sql
@@ -8,6 +8,7 @@
{%- set jc = dbt_aql._join_conditions() -%}
{%- set parsed_query = dbt_aql.parse_aql(aql) -%}
{%- set stream = parsed_query.stream -%}
{%- set skip_stream = var("dbt_aql").get("streams", {}).get(stream, {}).get("skip_stream", false) | as_bool -%}
{%- set columns = dbt_aql.schema_columns() -%}
{%- do columns.update({"customer": dbt_aql.customer_column(stream)}) -%}
{%- if dbt_aql.anonymous_customer_column(stream) is not none -%}
@@ -31,6 +32,12 @@
{% endfor %}

-- depends_on: {{ ref(stream) }}
-- depends_on: {{ ref(primary_activity.model_name) }}

{% for ja in joined_activities %}
-- depends_on: {{ ref(ja.model_name) }}
{% endfor %}

{% for ic in included_columns %}
{% set ic_stripped = ic.strip() %}
{% if modules.re.search(model_prefix, ic_stripped) is none %}
@@ -41,6 +48,7 @@
{% endif %}
{% endfor %}


{% set stream_relation = ref(stream) %}

{%- set ja_dict = {} -%}
@@ -94,8 +102,15 @@ with
partition by {{primary}}.{{columns.customer}}
order by {{primary}}.{{columns.ts}}, {{primary}}.{{columns.activity_id}}
) as {{columns.activity_repeated_at}}
{% if not skip_stream %}
from {{ stream_relation }} {{primary}}
where {{primary}}.{{columns.activity}} = {{dbt_aql.clean_activity_name(stream, primary_activity.activity_name)}}
{% else %}
from {{ ref(primary_activity.model_name) }} {{primary}}
{% endif %}
where true
{% if not skip_stream %}
and {{primary}}.{{columns.activity}} = {{dbt_aql.clean_activity_name(stream, primary_activity.activity_name)}}
{% endif %}
{% for f in primary_activity.filters %}
{%- set f_formatted = f.format(primary=primary, joined=joined, **columns) %}
and {{f_formatted}}
@@ -113,11 +128,16 @@ with
{%- for column in primary_activity.columns %}
{{ dbt_aql.select_column(stream, primary, column).column_sql }} as {{column.alias}}{% if not loop.last -%},{%- endif -%}
{%- endfor %}
from {% if primary_activity.filters is none %}{{ stream_relation }}{% else %}{{primary_activity_alias}}{{fs}}{% endif %} as {{primary}}
where {{primary}}.{{columns.activity}} = {{dbt_aql.clean_activity_name(stream, primary_activity.activity_name)}}
from {% if primary_activity.filters is none %}{% if not skip_stream %}{{ stream_relation }}{% else %}{{ ref(primary_activity.model_name) }}{% endif %}{% else %}{{primary_activity_alias}}{{fs}}{% endif %} as {{primary}}
where true
{% if not skip_stream %}
and {{primary}}.{{columns.activity}} = {{dbt_aql.clean_activity_name(stream, primary_activity.activity_name)}}
{% endif %}
and {{ primary_activity.relationship_clause }}
){% if joined_activities|length > 0 %},{% endif %}
{% for ja in joined_activities %}

{# cte below only applies to filtered append activities since activity occurrence and next activity need to be recomputed for use in the join #}
{% if ja.filters is not none and ja.verb == av.append %}
{{dbt_aql.alias_activity(ja, loop.index)}}{{fs}} as (
select
@@ -134,8 +154,15 @@ with
partition by {{joined}}.{{columns.customer}}
order by {{joined}}.{{columns.ts}}, {{joined}}.{{columns.activity_id}}
) as {{columns.activity_repeated_at}}
{% if not skip_stream %}
from {{ stream_relation }} {{joined}}
where {{joined}}.{{columns.activity}} = {{dbt_aql.clean_activity_name(stream, ja.activity_name)}}
{% else %}
from {{ ref(ja.model_name) }} {{joined}}
{% endif %}
where true
{% if not skip_stream %}
and {{joined}}.{{columns.activity}} = {{dbt_aql.clean_activity_name(stream, ja.activity_name)}}
{% endif %}
{% for f in ja.filters %}
{%- set f_formatted = f.format(primary=primary, joined=joined, **columns) %}
and {{f_formatted}}
@@ -151,9 +178,12 @@ with
{{ column.aggfunc(parsed_col) }} as {{ column.alias }}{% if not loop.last -%},{%- endif -%}
{%- endfor %}
from {{primary_activity_alias}} as {{primary}}
left join {% if ja.filters is not none and ja.verb == av.append %}{{dbt_aql.alias_activity(ja, loop.index)}}{{fs}}{% else %}{{ stream_relation }}{% endif %} {{joined}}
left join {% if ja.filters is not none and ja.verb == av.append %}{{dbt_aql.alias_activity(ja, loop.index)}}{{fs}}{% else %}{% if not skip_stream %}{{ stream_relation }}{% else %}{{ ref(ja.model_name) }}{% endif %}{% endif %} {{joined}}
-- filter joined activity first to improve query performance
on {{joined}}.{{columns.activity}} = {{dbt_aql.clean_activity_name(stream, ja.activity_name)}}
on true
{% if not skip_stream %}
and {{joined}}.{{columns.activity}} = {{dbt_aql.clean_activity_name(stream, ja.activity_name)}}
{% endif %}
{%- if ja.relationship_clause is not none %}
and {{ ja.relationship_clause }}
{%- endif %}
@@ -180,7 +210,11 @@ with
{%- set parsed_col = dbt_aql.select_column(stream, joined, column) -%}
{{ column.aggfunc(parsed_col) }} as {{ column.alias }}{% if not loop.last %},{% endif %}
{%- endfor %}
{% if not skip_stream %}
from {{ ref(stream) }} {{joined}}
{% else %}
from {{ ref(ja.model_name) }} {{joined}}
{% endif %}
where {{joined}}.{{columns.activity}} = {{dbt_aql.clean_activity_name(stream, ja.activity_name)}}
{% if ja.filters is not none %}
{%- for f in ja.filters %}
11 changes: 11 additions & 0 deletions macros/activity_schema/dataset/dataset_column/dataset_column.sql
@@ -22,11 +22,22 @@
{% endmacro %}

{% macro default__dataset_column(aql) %}
{%- set av = dbt_aql._activity_verbs() -%}
{%- set query_no_comments = dbt_aql._strip_comments(aql) -%}
{%- set query_clean = dbt_aql._clean_query(query_no_comments) -%}
{%- set using, rest = dbt_aql._parse_keyword(query_clean, ["using"]) -%}
{%- set stream, rest = dbt_aql._parse_stream(rest) -%}
{%- set activity, rest = dbt_aql._parse_activity(rest, stream, [av.append, av.aggregate]) -%}

-- depends_on: {{ ref(stream) }}

{%- set model_prefix = dbt_aql.get_model_prefix(stream) -%}
{% if modules.re.search(model_prefix, activity.activity_name) is none %}
{% set m = model_prefix~activity.activity_name %}
-- depends_on: {{ ref(m) }}
{% else %}
-- depends_on: {{ ref(activity.activity_name) }}
{% endif %}


{% endmacro %}
