diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index ee20b1a..c723731 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -33,7 +33,10 @@ vars: customer_id_alias: entity_uuid anonymous_customer_id_alias: anonymous_entity_uuid model_prefix: customer__ - + product_stream: + skip_stream: true + customer_id_alias: product_sku + model_prefix: product__ flags: send_anonymous_usage_stats: False diff --git a/integration_tests/models/activities/product__ordered.sql b/integration_tests/models/activities/product__ordered.sql new file mode 100644 index 0000000..a9d1cfc --- /dev/null +++ b/integration_tests/models/activities/product__ordered.sql @@ -0,0 +1,24 @@ +{% set keys = dbt_aql.cluster_keys(stream='product_stream') %} +{% if target.name == 'bigquery' %} + {% set cluster_keys = keys.cluster_by %} + {% set partition_keys = keys.partition_by %} +{# TODO: remove snowflake-specific implementation once localstack bugs are fixed #} +{% elif target.name == 'snowflake' %} + {% set cluster_keys = none %} + {% set partition_keys = none %} +{% else %} + {% set cluster_keys = keys %} + {% set partition_keys = '' %} +{% endif %} + +{{ config( + cluster_by=cluster_keys, + partition_by=partition_keys, + stream='product_stream' +) }} + +with base as ( + select * + from {{ ref('product_ordered') }} +) +{{ dbt_aql.build_activity('base', null_columns=['revenue_impact', 'link'])}} diff --git a/integration_tests/seeds/activities/product_ordered.csv b/integration_tests/seeds/activities/product_ordered.csv new file mode 100644 index 0000000..9dd490f --- /dev/null +++ b/integration_tests/seeds/activities/product_ordered.csv @@ -0,0 +1,5 @@ +ts,product_sku +2022-01-02 22:10:11,1 +2022-01-05 22:10:11,4 +2022-01-08 22:10:11,7 +2022-01-11 22:10:11,10 diff --git a/macros/activity_schema/activity/generate_activity_yml.sql b/macros/activity_schema/activity/generate_activity_yml.sql index 4aa5a05..f2fa6b4 100644 --- a/macros/activity_schema/activity/generate_activity_yml.sql +++ b/macros/activity_schema/activity/generate_activity_yml.sql @@ -11,14 +11,12 @@ -- Get column descriptions and tests {% macro get_column_descriptions(activity) %} {% set stream = get_activity_config(activity).stream %} - {% set schema_columns = dbt_activity_schema.schema_columns() %} - {% set customer_column = dbt_activity_schema.customer_column(stream) %} - {% set anonymous_customer_column = dbt_activity_schema.anonymous_customer_column(stream) %} + {% set schema_columns = dbt_activity_schema.schema_columns(stream) %} {% set columns = [ {'name': 'activity_id', 'description': 'Unique identifier for the activity.', 'data_type': type_string(), 'tests': ['unique', 'not_null']}, - {'name': 'customer_id', 'description': 'Identifier for the entity.', 'data_type': type_string()}, - {'name': 'anonymous_customer_id', 'description': 'Anonymous identifier for the entity.', 'data_type': type_string(), 'tests': ['not_null']}, + {'name': 'customer', 'description': 'Identifier for the entity.', 'data_type': type_string()}, + {'name': 'anonymous_customer_id', 'description': 'Anonymous identifier for the entity.', 'data_type': type_string()}, {'name': 'activity', 'description': 'Type of activity performed.', 'data_type': type_string(), 'tests': ['not_null']}, {'name': 'ts', 'description': 'Timestamp of when the activity occurred.', 'data_type': type_timestamp(), 'tests': ['not_null']}, {'name': 'revenue_impact', 'description': 'Revenue impact of the activity, if applicable.', 'data_type': type_int()}, @@ -28,8 +26,8 @@ {'name': 'activity_repeated_at', 'description': 'Timestamp of when the activity was repeated, if applicable.', 'data_type': type_timestamp()} ] %} - -- Remove unused columns - {%- if anonymous_customer_column is none -%} + -- Remove optional columns (anonymous_customer_id, revenue_impact, link), when these are not used + {%- if schema_columns.anonymous_customer_id is not defined -%} {%- set columns = columns | rejectattr("name", "equalto", "anonymous_customer_id") | list -%} {%- endif -%} @@ -40,7 +38,7 @@ {%- if schema_columns.revenue_impact is not defined -%} {%- set columns = columns | rejectattr("name", "equalto", "revenue_impact") | list -%} {%- endif -%} - + -- Update column names based on schema_columns {% for column in columns %} {% if column.name in schema_columns %}