Merge pull request #44 from dbt-msft/synapse_support
Add support for Azure Synapse External Tables
jtcohen6 authored Nov 29, 2020
2 parents f2924df + 363dbfd commit 3c322cd
Showing 13 changed files with 414 additions and 4 deletions.
12 changes: 12 additions & 0 deletions .circleci/config.yml
@@ -42,6 +42,17 @@ jobs:
- store_artifacts:
path: ./logs

integration-sqlserver:
docker:
- image: dataders/pyodbc:1.2
steps:
- checkout
- run:
name: "Run Tests - sqlserver"
command: ./run_test.sh sqlserver
- store_artifacts:
path: ./logs


workflows:
version: 2
@@ -50,3 +61,4 @@
- integration-redshift
- integration-snowflake
- integration-databricks
- integration-sqlserver
17 changes: 13 additions & 4 deletions README.md
@@ -16,9 +16,12 @@ $ dbt run-operation stage_external_sources --vars 'ext_full_refresh: true'

![sample docs](etc/sample_docs.png)

The macros assume that you have already created an external stage (Snowflake)
or external schema (Redshift/Spectrum), and that you have permissions to select from it
and create tables in it.
The macros assume that you have already created either:
- an external stage (Snowflake),
- an external schema (Redshift/Spectrum), or
- an external data source and file format (Synapse),

and that you have permissions to select from it and create tables in it.
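
On Synapse, for example, the prerequisite objects can be created with DDL along these lines (a minimal sketch; the names, container, and account are illustrative, mirroring this package's integration-test setup):

```sql
-- external data source pointing at blob storage (illustrative location)
CREATE EXTERNAL DATA SOURCE my_data_source WITH (
    TYPE = HADOOP,
    LOCATION = 'wasbs://my-container@myaccount.blob.core.windows.net'
);

-- CSV file format: skip the header row, use type defaults for missing fields
CREATE EXTERNAL FILE FORMAT my_file_format WITH (
    FORMAT_TYPE = DELIMITEDTEXT,
    FORMAT_OPTIONS ( FIELD_TERMINATOR = ',', FIRST_ROW = 2, USE_TYPE_DEFAULT = True )
);
```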

The `stage_external_sources` macro accepts a similar node selection syntax to
[snapshotting source freshness](https://docs.getdbt.com/docs/running-a-dbt-project/command-line-interface/source/#specifying-sources-to-snapshot).
@@ -57,7 +60,12 @@ sources:
table_properties: # Hive specification
options: # Hive specification
header: 'TRUE'

# ------ SYNAPSE ------
data_source: # external data source name
reject_type: # VALUE or PERCENTAGE
reject_value: # how many rows (or what percent) may fail to load before the query fails
ansi_nulls: # if true, SET ANSI_NULLS ON before creating the table
quoted_identifier: # if true, SET QUOTED_IDENTIFIER ON before creating the table
# Snowflake: create an empty table + pipe instead of an external table
snowpipe:
auto_ingest: true
@@ -124,3 +132,4 @@ as a dbt source and stage-ready external table in Snowflake and Spectrum.
* Redshift (Spectrum)
* Snowflake
* Spark
* Synapse
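
Taken together, a filled-in Synapse source spec might look like the following (a sketch; the values mirror this package's integration-test setup later in this diff):

```yaml
version: 2

sources:
  - name: synapse_external
    schema: "{{ target.schema }}"
    tables:
      - name: people_csv
        external:
          location: '/csv'
          data_source: "{{ target.schema }}.dbt_external_tables_testing"
          file_format: "{{ target.schema }}.dbt_external_ff_testing"
          reject_type: VALUE
          reject_value: 0
          ansi_nulls: true
          quoted_identifier: true
        columns:
          - name: id
            data_type: int
          - name: first_name
            data_type: varchar(64)
```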
13 changes: 13 additions & 0 deletions integration_tests/ci/sample.profiles.yml
@@ -40,3 +40,16 @@ integration_tests:
endpoint: "{{ env_var('DBT_DATABRICKS_ENDPOINT') }}"
token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}"
schema: dbt_external_tables_integration_tests_databricks

sqlserver:
type: sqlserver
driver: "ODBC Driver 17 for SQL Server"
port: 1433
host: "{{ env_var('DBT_SYNAPSE_SERVER') }}"
database: "{{ env_var('DBT_SYNAPSE_DB') }}"
username: "{{ env_var('DBT_SYNAPSE_UID') }}"
password: "{{ env_var('DBT_SYNAPSE_PWD') }}"
schema: dbt_external_tables_integration_tests_synapse
encrypt: 'yes'
trust_cert: 'yes'
threads: 1
2 changes: 2 additions & 0 deletions integration_tests/dbt_project.yml
@@ -30,6 +30,8 @@ sources:
+enabled: "{{ target.type == 'snowflake' }}"
spark_external:
+enabled: "{{ target.type == 'spark' }}"
sqlserver_external:
+enabled: "{{ target.type == 'sqlserver' }}"

seeds:
quote_columns: false
107 changes: 107 additions & 0 deletions integration_tests/macros/dbt_utils_tsql.sql
@@ -0,0 +1,107 @@
{% macro test_sqlserver__equal_rowcount(model) %}

{% set compare_model = kwargs.get('compare_model', kwargs.get('arg')) %}

{#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. #}
{%- if not execute -%}
{{ return('') }}
{% endif %}
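
{#-- Relations are rendered without the database prefix because cross-database queries are not supported on Azure Synapse. --#}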

with a as (

select count(*) as count_a from {{ model.include(database=False) }}

),
b as (

select count(*) as count_b from {{ compare_model.include(database=False) }}

),
final as (

select abs(
(select count_a from a) -
(select count_b from b)
)
as diff_count

)

select diff_count from final

{% endmacro %}


{% macro test_sqlserver__equality(model) %}


{#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. #}
{%- if not execute -%}
{{ return('') }}
{% endif %}

-- setup
{%- do dbt_utils._is_relation(model, 'test_equality') -%}

{#-
If the compare_columns arg is provided, we can run this test without querying the
information schema, which allows the model to be an ephemeral model
-#}
{%- if not kwargs.get('compare_columns', None) -%}
{%- do dbt_utils._is_ephemeral(model, 'test_equality') -%}
{%- endif -%}

{% set compare_model = kwargs.get('compare_model', kwargs.get('arg')) %}
{% set compare_columns = kwargs.get('compare_columns', adapter.get_columns_in_relation(model) | map(attribute='quoted') ) %}
{% set compare_cols_csv = compare_columns | join(', ') %}

with a as (

select * from {{ model.include(database=False) }}

),

b as (

select * from {{ compare_model.include(database=False) }}

),

a_minus_b as (

select {{compare_cols_csv}} from a
{{ dbt_utils.except() }}
select {{compare_cols_csv}} from b

),

b_minus_a as (

select {{compare_cols_csv}} from b
{{ dbt_utils.except() }}
select {{compare_cols_csv}} from a

),

unioned as (

select * from a_minus_b
union all
select * from b_minus_a

),

final as (

select (select count(*) from unioned) +
(select abs(
(select count(*) from a_minus_b) -
(select count(*) from b_minus_a)
))
as count

)

select count from final

{% endmacro %}
38 changes: 38 additions & 0 deletions integration_tests/macros/prep_external.sql
@@ -43,3 +43,41 @@
{% do run_query(create_external_stage) %}

{% endmacro %}

{% macro sqlserver__prep_external() %}

{% set external_data_source = target.schema ~ '.dbt_external_tables_testing' %}

{% set create_external_data_source %}
IF EXISTS ( SELECT * FROM sys.external_data_sources WHERE name = '{{external_data_source}}' )
DROP EXTERNAL DATA SOURCE [{{external_data_source}}];

CREATE EXTERNAL DATA SOURCE [{{external_data_source}}] WITH (
TYPE = HADOOP,
LOCATION = 'wasbs://[email protected]'
)
{% endset %}

{% set external_file_format = target.schema ~ '.dbt_external_ff_testing' %}

{% set create_external_file_format %}
IF EXISTS ( SELECT * FROM sys.external_file_formats WHERE name = '{{external_file_format}}' )
DROP EXTERNAL FILE FORMAT [{{external_file_format}}];

CREATE EXTERNAL FILE FORMAT [{{external_file_format}}]
WITH (
FORMAT_TYPE = DELIMITEDTEXT,
FORMAT_OPTIONS (
FIELD_TERMINATOR = N',',
FIRST_ROW = 2,
USE_TYPE_DEFAULT = True
)
)
{% endset %}

{% do log('Creating external data source ' ~ external_data_source, info = true) %}
{% do run_query(create_external_data_source) %}
{% do log('Creating external file format ' ~ external_file_format, info = true) %}
{% do run_query(create_external_file_format) %}

{% endmacro %}
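
In the integration tests, this setup macro runs once per target before the external sources are staged, presumably via `dbt run-operation prep_external` (the exact invocation and its flags vary by environment).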
99 changes: 99 additions & 0 deletions integration_tests/models/sqlserver.yml
@@ -0,0 +1,99 @@
version: 2

sources:
- name: sqlserver_external
schema: "{{ target.schema }}"
loader: ADLSblob

tables:

- name: people_csv_unpartitioned
external: &csv-people
location: '/csv'
file_format: "{{ target.schema ~ '.dbt_external_ff_testing' }}"
data_source: "{{ target.schema ~ '.dbt_external_tables_testing' }}"
reject_type: VALUE
reject_value: 0
ansi_nulls: true
quoted_identifier: true
columns: &cols-of-the-people
- name: id
data_type: int
- name: first_name
data_type: varchar(64)
- name: last_name
data_type: varchar(64)
- name: email
data_type: varchar(64)
tests: &equal-to-the-people
- sqlserver__equality:
compare_model: ref('people')
compare_columns:
- id
- first_name
- last_name
- email

- name: people_csv_partitioned
external:
<<: *csv-people
# Synapse does not support partitions
# (though a workaround may be possible)
# partitions: &parts-of-the-people
# - name: section
# data_type: varchar
# expression: "substr(split_part(metadata$filename, 'section=', 2), 1, 1)"
columns: *cols-of-the-people
# tests: *equal-to-the-people

# JSON is not currently supported by Synapse

# - name: people_json_unpartitioned
# external: &json-people
# location: '@{{ target.schema }}.dbt_external_tables_testing/json'
# file_format: '( type = json )'
# columns: *cols-of-the-people
# tests: *equal-to-the-people

# - name: people_json_partitioned
# external:
# <<: *json-people
# partitions: *parts-of-the-people
# columns: *cols-of-the-people
# tests: *equal-to-the-people

# omitting columns currently breaks things
# just to test syntax
# - name: people_csv_unpartitioned_no_columns
# external: *csv-people
# tests: &same-rowcount
# - sqlserver__equal_rowcount:
# compare_model: ref('people')

# - name: people_csv_partitioned_no_columns
# external:
# <<: *csv-people
# partitions: *parts-of-the-people
# tests: *same-rowcount

# - name: people_json_unpartitioned_no_columns
# external: *csv-people
# tests: *same-rowcount

# - name: people_json_partitioned_no_columns
# external:
# <<: *json-people
# partitions: *parts-of-the-people
# tests: *same-rowcount

# - name: people_json_multipartitioned_no_columns
# external:
# <<: *json-people
# partitions:
# - name: file_type
# data_type: varchar
# expression: "split_part(metadata$filename, 'section=', 1)"
# - name: section
# data_type: varchar
# expression: "substr(split_part(metadata$filename, 'section=', 2), 1, 1)"
# tests: *same-rowcount
29 changes: 29 additions & 0 deletions macros/external/create_external_table.sql
@@ -69,6 +69,35 @@
file_format = {{external.file_format}}
{% endmacro %}

{% macro sqlserver__create_external_table(source_node) %}

{%- set columns = source_node.columns.values() -%}
{%- set external = source_node.external -%}

{% if external.ansi_nulls is true -%} SET ANSI_NULLS ON; {%- endif %}
{% if external.quoted_identifier is true -%} SET QUOTED_IDENTIFIER ON; {%- endif %}

create external table {{source(source_node.source_name, source_node.name)}} (
{% for column in columns %}
{# set nullity from the column's schema tests: a not_null test implies NOT NULL #}
{%- set nullity = 'NOT NULL' if 'not_null' in (column.tests or []) else 'NULL' -%}
{{adapter.quote(column.name)}} {{column.data_type}} {{nullity}}
{{- ',' if not loop.last -}}
{% endfor %}
)
WITH (
{% set dict = {'DATA_SOURCE': external.data_source,
'LOCATION' : external.location,
'FILE_FORMAT' : external.file_format,
'REJECT_TYPE' : external.reject_type,
'REJECT_VALUE' : external.reject_value} -%}
{%- for key, value in dict.items() %}
{{key}} = {% if key == "LOCATION" -%} '{{value}}' {%- elif key in ["DATA_SOURCE","FILE_FORMAT"] -%} [{{value}}] {%- else -%} {{value}} {%- endif -%}
{{- ',' if not loop.last -}}
{%- endfor -%}
)
{% endmacro %}

{% macro bigquery__create_external_table(source_node) %}
{{ exceptions.raise_compiler_error(
"BigQuery does not support creating external tables in SQL/DDL.
Expand Down
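
For the `people_csv_unpartitioned` source defined in the integration tests above, this macro renders DDL roughly like the following (a sketch: `my_db` and `my_schema` stand in for the target database and schema, and exact identifier quoting depends on the adapter):

```sql
SET ANSI_NULLS ON;
SET QUOTED_IDENTIFIER ON;

create external table "my_db"."my_schema"."people_csv_unpartitioned" (
    -- no not_null schema tests on these columns, so they render as NULL
    "id" int NULL,
    "first_name" varchar(64) NULL,
    "last_name" varchar(64) NULL,
    "email" varchar(64) NULL
)
WITH (
    DATA_SOURCE = [my_schema.dbt_external_tables_testing],
    LOCATION = '/csv',
    FILE_FORMAT = [my_schema.dbt_external_ff_testing],
    REJECT_TYPE = VALUE,
    REJECT_VALUE = 0
)
```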
4 changes: 4 additions & 0 deletions macros/external/refresh_external_table.sql
@@ -80,6 +80,10 @@

{% endmacro %}

{% macro sqlserver__refresh_external_table(source_node) %}
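{# Synapse external tables read directly from files in external storage; there is no partition metadata to refresh, so this is a no-op. #}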
{% do return([]) %}
{% endmacro %}

{% macro snowflake__refresh_external_table(source_node) %}

{% set external = source_node.external %}