Skip to content

Commit

Permalink
Merge pull request #48 from awoehrl/bugfix/missing_schema_columns
Browse files Browse the repository at this point in the history
  • Loading branch information
bcodell authored Jan 17, 2025
2 parents 4fb1e68 + 0d8714d commit 9fe510e
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 9 deletions.
5 changes: 4 additions & 1 deletion integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ vars:
customer_id_alias: entity_uuid
anonymous_customer_id_alias: anonymous_entity_uuid
model_prefix: customer__

product_stream:
skip_stream: true
customer_id_alias: product_sku
model_prefix: product__

flags:
send_anonymous_usage_stats: False
Expand Down
24 changes: 24 additions & 0 deletions integration_tests/models/activities/product__ordered.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{% set keys = dbt_aql.cluster_keys(stream='product_stream') %}
{% if target.name == 'bigquery' %}
{% set cluster_keys = keys.cluster_by %}
{% set partition_keys = keys.partition_by %}
{# TODO: remove snowflake-specific implementation once localstack bugs are fixed #}
{% elif target.name == 'snowflake' %}
{% set cluster_keys = none %}
{% set partition_keys = none %}
{% else %}
{% set cluster_keys = keys %}
{% set partition_keys = '' %}
{% endif %}

{{ config(
cluster_by=cluster_keys,
partition_by=partition_keys,
stream='product_stream'
) }}

with base as (
select *
from {{ ref('product_ordered') }}
)
{{ dbt_aql.build_activity('base', null_columns=['revenue_impact', 'link'])}}
5 changes: 5 additions & 0 deletions integration_tests/seeds/activities/product_ordered.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ts,product_sku
2022-01-02 22:10:11,1
2022-01-05 22:10:11,4
2022-01-08 22:10:11,7
2022-01-11 22:10:11,10
14 changes: 6 additions & 8 deletions macros/activity_schema/activity/generate_activity_yml.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
-- Get column descriptions and tests
{% macro get_column_descriptions(activity) %}
{% set stream = get_activity_config(activity).stream %}
{% set schema_columns = dbt_activity_schema.schema_columns() %}
{% set customer_column = dbt_activity_schema.customer_column(stream) %}
{% set anonymous_customer_column = dbt_activity_schema.anonymous_customer_column(stream) %}
{% set schema_columns = dbt_activity_schema.schema_columns(stream) %}

{% set columns = [
{'name': 'activity_id', 'description': 'Unique identifier for the activity.', 'data_type': type_string(), 'tests': ['unique', 'not_null']},
{'name': 'customer_id', 'description': 'Identifier for the entity.', 'data_type': type_string()},
{'name': 'anonymous_customer_id', 'description': 'Anonymous identifier for the entity.', 'data_type': type_string(), 'tests': ['not_null']},
{'name': 'customer', 'description': 'Identifier for the entity.', 'data_type': type_string()},
{'name': 'anonymous_customer_id', 'description': 'Anonymous identifier for the entity.', 'data_type': type_string()},
{'name': 'activity', 'description': 'Type of activity performed.', 'data_type': type_string(), 'tests': ['not_null']},
{'name': 'ts', 'description': 'Timestamp of when the activity occurred.', 'data_type': type_timestamp(), 'tests': ['not_null']},
{'name': 'revenue_impact', 'description': 'Revenue impact of the activity, if applicable.', 'data_type': type_int()},
Expand All @@ -28,8 +26,8 @@
{'name': 'activity_repeated_at', 'description': 'Timestamp of when the activity was repeated, if applicable.', 'data_type': type_timestamp()}
] %}

-- Remove unused columns
{%- if anonymous_customer_column is none -%}
-- Remove optional columns (anonymous_customer_id, revenue_impact, link), when these are not used
{%- if schema_columns.anonymous_customer_id is not defined -%}
{%- set columns = columns | rejectattr("name", "equalto", "anonymous_customer_id") | list -%}
{%- endif -%}

Expand All @@ -40,7 +38,7 @@
{%- if schema_columns.revenue_impact is not defined -%}
{%- set columns = columns | rejectattr("name", "equalto", "revenue_impact") | list -%}
{%- endif -%}

-- Update column names based on schema_columns
{% for column in columns %}
{% if column.name in schema_columns %}
Expand Down

0 comments on commit 9fe510e

Please sign in to comment.