Macro-based dataset generation #56

Merged: 7 commits, Jan 20, 2024
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -49,7 +49,7 @@ jobs:
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml
- name: dbt CI - snowflake - with stream
id: snowflake_ci_with_stream
if: github.repository == 'bcodell/dbt-aql'
if: false
run: |
localstack extensions install localstack-extension-snowflake
localstack start -d
@@ -62,7 +62,7 @@

- name: dbt CI - snowflake - skip stream
id: snowflake_ci_skip_stream
if: github.repository == 'bcodell/dbt-aql'
if: false
run: |
cd ./integration_tests
sed -i 's/skip_stream: false/skip_stream: true/' dbt_project.yml
78 changes: 70 additions & 8 deletions README.md
@@ -21,6 +21,8 @@ Given all of these shortcomings, it wouldn't be surprising if developers could w

This project offers a middle ground. With aql, developers can derive their datasets from Activity Streams in a way that is less tedious than writing traditional SQL, easier to learn and read than a convoluted macro, and above all, it _feels_ like writing SQL (with a few new keywords).

That said, proposing (yet) a(nother) new DSL is treacherous territory; there's plenty of literature in staunch opposition to anything other than SQL ([see one oldie-but-goodie example here](https://erikbern.com/2018/08/30/i-dont-want-to-learn-your-garbage-query-language.html)). And that's fair - SQL is well-known and malleable. But as we've highlighted, writing SQL against an activity stream in this modeling paradigm is tedious, redundant, and parameterizable. So this package offers a macro-based interface for generating queries in line with the Activity Schema spec. And for the purists reading this, writing bespoke SQL is also an option! That's what dbt was made for anyway 😅

# **Inspiration**
Numerous projects influenced the design of this one. Such projects include:
- [Activity Schema Spec](https://github.com/ActivitySchema/ActivitySchema)
@@ -321,21 +323,78 @@ The `build_stream` macro is a convenience function that takes as input a list of
**Note: When `skip_stream` is configured as `true` for the stream, this macro produces an empty table.**
</br></br>

# **Creating Datasets: Querying The Activity Stream with `aql`**
# **Creating Datasets**
The Activity Stream is the entrypoint for creating any denormalized dataset needed for analysis or reporting. However, as discussed above, writing bespoke SQL to query it in the standard pattern is tedious and error-prone, and using a macro to auto-generate the SQL is brittle and a subpar development experience.
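For a concrete sense of that tedium, the hand-written pattern typically involves one temporal self-join per joined activity, along the lines of this hypothetical sketch (table, column, and filter names are illustrative, not from this package):

```sql
-- Hypothetical hand-written dataset query against an activity stream.
-- Every additional joined activity needs another self-join like "j" below.
select
    p.activity_id,
    p.customer_id,
    p.ts as first_visited_page_at,
    count(j.activity_id) as count_bought_something_after
from customer_stream as p
left join customer_stream as j
    on j.customer_id = p.customer_id
    and j.activity = 'bought_something'
    and j.ts > p.ts                       -- the 'after' temporal join condition
where p.activity = 'visited_page'
    and p.activity_occurrence = 1         -- the 'first' relationship selector
group by 1, 2, 3
```

Each new joined activity, aggregation, or filter means repeating this join-and-aggregate boilerplate by hand, which is exactly what the interfaces below parameterize.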

To solve these shortcomings, this project introduces `aql` (Activity Query Language) - an interface that's easier to learn and more SQL-esque than a macro but substantially more concise than bespoke SQL.

Under the hood, this package will parse the `aql` query string into a json object, then use the object parameters to render the appropriate SQL statement.
</br></br>
To solve these shortcomings, this project introduces two solutions: a dbt macro, and an experimental DSL called `aql` (Activity Query Language), an interface that's easier to learn and more SQL-esque than a macro but substantially more concise than bespoke SQL.

## **The Anatomy of an `aql` Query**
As a refresher, Activity Stream queries require the following inputs to be defined:
* The Activity Stream table to use
* A Primary Activity with columns to include. The Primary Activity defines the granularity of the dataset.
* 0 or more Joined Activities, each with Temporal Join criteria defined, columns to include, and (if applicable) the specific aggregation function to use for each column.

All of these inputs are still needed, and they can be viewed in the following example `aql` query:
All of these inputs are still needed, and they can be viewed in the following examples - via the `query_stream` macro and via an `aql` query:

## **Creating Datasets Option 1: The `query_stream` Macro**
For a macro-based approach to producing datasets, use the following syntax:
```sql
{{ dbt_aql.query_stream(
    stream='stream_1',
    primary_activity=dbt_aql.primary_activity(
        activity='activity_1',
        columns=[
            dbt_aql.dc(
                column_name='ts',
                alias='activity_1_at',
            ),
            dbt_aql.dc(
                column_name='feature_2'
            ),
            dbt_aql.dc(
                column_name='feature_3',
            ),
        ],
        filters=[] -- optional (empty list is default)
    ),
    joined_activities=[
        dbt_aql.appended_activity(
            activity='activity_2',
            relationship_selector='first', -- only needed for appended activities
            join_condition='ever',
            columns=[
                dbt_aql.dc(
                    column_name='ts',
                    alias='first_ever_activity_2_at'
                )
            ]
        ),
        dbt_aql.aggregated_activity(
            activity='activity_3',
            join_condition='after',
            columns=[
                dbt_aql.dc(
                    column_name='activity_id',
                    aggfunc='count',
                    alias='count_activity_3_after'
                ),
                dbt_aql.dc(
                    column_name='feature_x',
                    aggfunc='sum',
                    alias='sum_feature_x_after'
                )
            ],
            filter_columns=[], -- optional (empty list is default)
            extra_joins=[], -- optional (empty list is default)
        )
    ],
    included_columns=[] -- optional (empty list is default)
)}}
```
It's long, verbose, and not very readable, but this macro will produce a full SQL query and return a dataset. Implementation specifics (including argument requirements) are coming soon.

## **Creating Datasets Option 2: Querying The Activity Stream with `aql`**
Under the hood, this package will parse the `aql` query string into a JSON object, then use the object parameters to render the appropriate SQL statement:

```sql
using stream_1
select all activity_1 (
@@ -351,6 +410,7 @@ aggregate after activity_3 (
sum(feature_x) as sum_feature_x_after
)
```
The above statement produces a dataset identical to the one generated by the prior macro-based example, in a much more concise and readable format.
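The real parsing step lives in the package's Jinja macros; purely as an illustration of the idea (the function, field names, and regex below are invented for this sketch, not dbt-aql's API), the translation from an aql string to an intermediate object can be pictured like this:

```python
import re

def parse_aql_sketch(query: str) -> dict:
    """Toy illustration of the parsing step: pull the stream name and each
    activity clause out of an aql-style string. This is NOT dbt-aql's real
    parser -- names and structure here are invented for illustration."""
    parsed = {"stream": None, "primary_activity": None, "joined_activities": []}
    # 'using <stream>' names the activity stream table
    stream = re.search(r"using\s+(\w+)", query)
    parsed["stream"] = stream.group(1) if stream else None
    # verb ('select' | 'append' | 'aggregate'), qualifiers, activity name,
    # then a parenthesized list of columns
    clause = re.compile(r"(select|append|aggregate)\s+([\w ]+?)\s+(\w+)\s*\(([^)]*)\)")
    for verb, qualifiers, activity, cols in clause.findall(query):
        entry = {
            "activity": activity,
            "qualifiers": qualifiers.strip(),
            "columns": [c.strip() for c in cols.split("\n") if c.strip()],
        }
        if verb == "select":
            parsed["primary_activity"] = entry
        else:
            parsed["joined_activities"].append(entry)
    return parsed

aql = """using stream_1
select all activity_1 (
    ts as activity_1_at
)
append first ever activity_2 (
    ts as first_ever_activity_2_at
)"""

result = parse_aql_sketch(aql)
```

Once the query is in object form, rendering SQL from it is a straightforward templating exercise, which is why the macro interface and the DSL can share the same backend.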

To use in a dbt model, assign the aql query to a variable, and pass it to the `dataset` macro like so:
```sql
@@ -373,7 +433,7 @@ aggregate after activity_3 (
{{ dbt_aql.dataset(aql) }}
```

Implementation details are provided below.
Syntax details are provided below.
</br></br>

## **Activity Stream**
@@ -522,6 +582,8 @@ Each `<column>` value must correspond to the alias of one of the standard Activi
Each included column can optionally be aliased to a whitespace-free friendly name `as <alias>`, where `<alias>` is the name of the column that will be applied to the dataset. Each defined alias must be preceded by `as`. Aliasing is optional - if no alias is explicitly defined, an automated alias will be applied to the column. See more in the `alias_column` macro.
</br></br>
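The automated aliases seen throughout this PR (e.g. `first_ever_activity_2_at`, `count_activity_3_after`) follow a predictable shape. As a rough sketch only (the real logic lives in the `alias_column` macro; the rules below are inferred from the README examples, not authoritative):

```python
def auto_alias_sketch(column: str, activity: str, relationship: str = "",
                      join_condition: str = "", aggfunc: str = "") -> str:
    """Inferred (not authoritative) auto-alias shape: aggregates read
    '<aggfunc>_<subject>_<join>', where the subject is the activity for
    'activity_id' counts and the column otherwise; appended timestamps
    read '<relationship>_<join>_<activity>_at'."""
    if aggfunc:
        subject = activity if column == "activity_id" else column
        return f"{aggfunc}_{subject}_{join_condition}"
    parts = [p for p in (relationship, join_condition, activity) if p]
    base = "_".join(parts)
    return f"{base}_at" if column == "ts" else f"{base}_{column}"

# Reproduces aliases that appear in the README examples above:
auto_alias_sketch("ts", "activity_2", "first", "ever")  # 'first_ever_activity_2_at'
auto_alias_sketch("activity_id", "activity_3",
                  join_condition="after", aggfunc="count")  # 'count_activity_3_after'
```

Explicit `as <alias>` clauses always win over this automated naming, so the sketch only matters when no alias is given.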



# **Advanced Usage**

## **Extra Join Criteria for Joined Activities**
Expand Up @@ -11,7 +11,7 @@ append first ever visited_page (
filter {{dbt_aql.json_extract('{feature_json}', 'referrer_url')}} = 'yahoo.com'
)
aggregate all bought_something (
count(activity_id) as total_large_purchases_after
count(activity_id) as total_large_purchases
filter cast(nullif({{dbt_aql.json_extract('{feature_json}', 'total_sales')}}, '') as int) > 100
filter cast(nullif({{dbt_aql.json_extract('{feature_json}', 'total_items_purchased')}}, '') as int) > 3
)
@@ -0,0 +1,45 @@
-- depends_on: {{ ref('output__filtered__query_stream_macro') }}


{{ dbt_aql.query_stream(
    stream='customer_stream',
    primary_activity=dbt_aql.primary_activity(
        activity='customer__visited_page',
        relationship_selector='first',
        columns=[
            dbt_aql.dc(column_name='activity_id', alias='activity_id'),
            dbt_aql.dc(column_name='entity_uuid', alias='customer_id'),
            dbt_aql.dc(column_name='ts', alias='first_visited_google_at'),
        ],
        filters=[
            dbt_aql.json_extract('{feature_json}', 'referrer_url')~" = 'google.com'"
        ]
    ),
    joined_activities=[
        dbt_aql.appended_activity(
            activity='customer__visited_page',
            relationship_selector='first',
            join_condition='ever',
            columns=[
                dbt_aql.dc(column_name='ts', alias='first_visited_yahoo_at')
            ],
            filters=[
                dbt_aql.json_extract('{feature_json}', 'referrer_url')~" = 'yahoo.com'"
            ]
        ),
        dbt_aql.aggregated_activity(
            activity='customer__bought_something',
            join_condition='all',
            columns=[
                dbt_aql.dc(column_name='activity_id', alias='total_large_purchases', aggfunc='count'),
            ],
            filters=[
                'cast(nullif('~dbt_aql.json_extract('{joined}.{feature_json}', 'total_sales')~", '') as int) > 100",
                'cast(nullif('~dbt_aql.json_extract('{joined}.{feature_json}', 'total_items_purchased')~", '') as int) > 3"
            ]
        )
    ],
    included_columns=[
        'customer__total_items_purchased_after'
    ]
) }}
@@ -0,0 +1,33 @@
-- depends_on: {{ ref('output__joined__query_stream_macro') }}


{{ dbt_aql.query_stream(
    stream='customer_stream',
    primary_activity=dbt_aql.primary_activity(
        activity='visited_page',
        relationship_selector='first',
        columns=[
            dbt_aql.dc(column_name='activity_id', alias='activity_id'),
            dbt_aql.dc(column_name='entity_uuid', alias='customer_id'),
            dbt_aql.dc(column_name='ts', alias='first_visited_google_at'),
        ],
        filters=[
            dbt_aql.json_extract('{feature_json}', 'referrer_url')~" = 'google.com'"
        ]
    ),
    joined_activities=[
        dbt_aql.aggregated_activity(
            activity='bought_something',
            join_condition='after',
            columns=[
                dbt_aql.dc(column_name='activity_id', alias='total_large_purchases_after', aggfunc='count')
            ],
            extra_joins=[
                'cast(nullif('~dbt_aql.json_extract('{joined}.{feature_json}', 'total_sales')~", '') as int) > 100"
            ]
        )
    ],
    included_columns=[
        'total_items_purchased_after'
    ]
) }}
@@ -0,0 +1,16 @@
version: 2

models:
  - name: dataset__filtered__query_stream_macro
    description: A test to validate the functionality of macro-based dataset generation.
    tests:
      - dbt_utils.equality:
          compare_model: ref("output__filtered__query_stream_macro")

  - name: dataset__joined__query_stream_macro
    description: A test to validate the functionality of macro-based dataset generation.
    tests:
      - dbt_utils.equality:
          compare_model: ref("output__joined__query_stream_macro")

@@ -1,4 +1,4 @@
activity_id,customer_id,first_visited_google_at,first_visited_yahoo_at,total_large_purchases_after,total_items_purchased_after
activity_id,customer_id,first_visited_google_at,first_visited_yahoo_at,total_large_purchases,total_items_purchased_after
e58cfb189af4fbf30f22821af7aa9316,1,2022-01-01 22:10:11,2022-04-07 22:10:11,2,11
d5d41e942d4b0b5325741007e8814f00,10,2022-01-13 22:10:11,,2,18
fdf62f7ddcd69fc3c3dbd54bf7a34452,7,2022-04-13 22:10:11,2022-01-07 22:10:11,0,0
@@ -0,0 +1,4 @@
activity_id,customer_id,first_visited_google_at,first_visited_yahoo_at,total_large_purchases,total_items_purchased_after
e58cfb189af4fbf30f22821af7aa9316,1,2022-01-01 22:10:11,2022-04-07 22:10:11,2,11
d5d41e942d4b0b5325741007e8814f00,10,2022-01-13 22:10:11,,2,18
fdf62f7ddcd69fc3c3dbd54bf7a34452,7,2022-04-13 22:10:11,2022-01-07 22:10:11,0,0
@@ -0,0 +1,4 @@
activity_id,customer_id,first_visited_google_at,total_large_purchases_after,total_items_purchased_after
e58cfb189af4fbf30f22821af7aa9316,1,2022-01-01 22:10:11,2,11
fdf62f7ddcd69fc3c3dbd54bf7a34452,7,2022-04-13 22:10:11,0,0
d5d41e942d4b0b5325741007e8814f00,10,2022-01-13 22:10:11,2,18
1 change: 1 addition & 0 deletions macros/activity_schema/activity/activity.sql
@@ -34,6 +34,7 @@ aql query in model '{{ model.unique_id }}' has invalid syntax. Parsed invalid re
{%- set model_name = model_prefix~activity_name -%}
{%- else -%}
{%- set model_name = activity_name -%}
{%- set activity_name = model_name|replace(model_prefix, '') -%}
{%- endif -%}
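The added line recovers the bare activity name from a prefixed model name, which is why this PR's fixtures can reference both `customer__visited_page` and `visited_page`. In Python terms (the prefix and names below are taken from the test fixtures in this PR; the macro itself uses Jinja's `replace` filter, whose string semantics match):

```python
# Strip the model prefix back off to get the bare activity name,
# mirroring the Jinja: model_name|replace(model_prefix, '')
model_prefix = "customer__"          # illustrative; the real prefix comes from config
model_name = "customer__visited_page"
activity_name = model_name.replace(model_prefix, "")
# activity_name is now "visited_page"
```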

