Add support for Azure Synapse External Tables #44

Merged
merged 30 commits into from
Nov 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .circleci/config.yml
@@ -42,6 +42,17 @@ jobs:
      - store_artifacts:
          path: ./logs

  integration-sqlserver:
    docker:
      - image: dataders/pyodbc:1.2
    steps:
      - checkout
      - run:
          name: "Run Tests - sqlserver"
          command: ./run_test.sh sqlserver
      - store_artifacts:
          path: ./logs


workflows:
  version: 2
@@ -50,3 +61,4 @@ workflows:
      - integration-redshift
      - integration-snowflake
      - integration-databricks
      - integration-sqlserver
17 changes: 13 additions & 4 deletions README.md
@@ -16,9 +16,12 @@ $ dbt run-operation stage_external_sources --vars 'ext_full_refresh: true'

![sample docs](etc/sample_docs.png)

The macros assume that you have already created an external stage (Snowflake)
or external schema (Redshift/Spectrum), and that you have permissions to select from it
and create tables in it.
The macros assume that you have already:
- created either:
  - an external stage (Snowflake),
  - an external schema (Redshift/Spectrum), or
  - an external data source and file format (Synapse, sketched below); and that you
- have permissions to select from it and create tables in it.
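
For Synapse, a minimal sketch of those prerequisites, assuming a WASB-backed storage account (object names and the storage URL are placeholders, mirroring this PR's integration-test prep macro):

    -- illustrative only: substitute your own names and storage location
    CREATE EXTERNAL DATA SOURCE [my_data_source] WITH (
        TYPE = HADOOP,
        LOCATION = 'wasbs://my-container@my-account.blob.core.windows.net'
    );

    CREATE EXTERNAL FILE FORMAT [my_file_format] WITH (
        FORMAT_TYPE = DELIMITEDTEXT,
        FORMAT_OPTIONS (
            FIELD_TERMINATOR = N',',
            FIRST_ROW = 2,
            USE_TYPE_DEFAULT = True
        )
    );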

The `stage_external_sources` macro accepts a similar node selection syntax to
[snapshotting source freshness](https://docs.getdbt.com/docs/running-a-dbt-project/command-line-interface/source/#specifying-sources-to-snapshot).
@@ -57,7 +60,12 @@ sources:
          table_properties: # Hive specification
          options: # Hive specification
            header: 'TRUE'

          # ------ SYNAPSE ------
          data_source: # External Data Source Name
          reject_type:
          reject_value:
          ansi_nulls:
          quoted_identifier:

          # Snowflake: create an empty table + pipe instead of an external table
          snowpipe:
            auto_ingest: true
@@ -124,3 +132,4 @@ as a dbt source and stage-ready external table in Snowflake and Spectrum.
* Redshift (Spectrum)
* Snowflake
* Spark
* Synapse
13 changes: 13 additions & 0 deletions integration_tests/ci/sample.profiles.yml
@@ -40,3 +40,16 @@ integration_tests:
      endpoint: "{{ env_var('DBT_DATABRICKS_ENDPOINT') }}"
      token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}"
      schema: dbt_external_tables_integration_tests_databricks

    sqlserver:
      type: sqlserver
      driver: "ODBC Driver 17 for SQL Server"
      port: 1433
      host: "{{ env_var('DBT_SYNAPSE_SERVER') }}"
      database: "{{ env_var('DBT_SYNAPSE_DB') }}"
      username: "{{ env_var('DBT_SYNAPSE_UID') }}"
      password: "{{ env_var('DBT_SYNAPSE_PWD') }}"
      schema: dbt_external_tables_integration_tests_synapse
      encrypt: 'yes'
      trust_cert: 'yes'
      threads: 1
2 changes: 2 additions & 0 deletions integration_tests/dbt_project.yml
@@ -30,6 +30,8 @@ sources:
      +enabled: "{{ target.type == 'snowflake' }}"
    spark_external:
      +enabled: "{{ target.type == 'spark' }}"
    sqlserver_external:
      +enabled: "{{ target.type == 'sqlserver' }}"

seeds:
  quote_columns: false
107 changes: 107 additions & 0 deletions integration_tests/macros/dbt_utils_tsql.sql
@@ -0,0 +1,107 @@
{% macro test_sqlserver__equal_rowcount(model) %}

{% set compare_model = kwargs.get('compare_model', kwargs.get('arg')) %}

{#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. #}
{%- if not execute -%}
{{ return('') }}
{% endif %}
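
{#-- Relations are compared without their database prefix (include(database=False)); Synapse does not support cross-database queries --#}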

with a as (

select count(*) as count_a from {{ model.include(database=False) }}

),
b as (

select count(*) as count_b from {{ compare_model.include(database=False) }}

),
final as (

select abs(
(select count_a from a) -
(select count_b from b)
)
as diff_count

)

select diff_count from final

{% endmacro %}


{% macro test_sqlserver__equality(model) %}


{#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. #}
{%- if not execute -%}
{{ return('') }}
{% endif %}

-- setup
{%- do dbt_utils._is_relation(model, 'test_equality') -%}

{#-
If the compare_cols arg is provided, we can run this test without querying the
information schema — this allows the model to be an ephemeral model
-#}
{%- if not kwargs.get('compare_columns', None) -%}
{%- do dbt_utils._is_ephemeral(model, 'test_equality') -%}
{%- endif -%}

{% set compare_model = kwargs.get('compare_model', kwargs.get('arg')) %}
{% set compare_columns = kwargs.get('compare_columns', adapter.get_columns_in_relation(model) | map(attribute='quoted') ) %}
{% set compare_cols_csv = compare_columns | join(', ') %}

with a as (

select * from {{ model.include(database=False) }}

),

b as (

select * from {{ compare_model.include(database=False) }}

),

a_minus_b as (

select {{compare_cols_csv}} from a
{{ dbt_utils.except() }}
select {{compare_cols_csv}} from b

),

b_minus_a as (

select {{compare_cols_csv}} from b
{{ dbt_utils.except() }}
select {{compare_cols_csv}} from a

),

unioned as (

select * from a_minus_b
union all
select * from b_minus_a

),

final as (

select (select count(*) from unioned) +
(select abs(
(select count(*) from a_minus_b) -
(select count(*) from b_minus_a)
))
as count

)

select count from final

{% endmacro %}
38 changes: 38 additions & 0 deletions integration_tests/macros/prep_external.sql
@@ -43,3 +43,41 @@
{% do run_query(create_external_stage) %}

{% endmacro %}

{% macro sqlserver__prep_external() %}

{% set external_data_source = target.schema ~ '.dbt_external_tables_testing' %}

{% set create_external_data_source %}
IF EXISTS ( SELECT * FROM sys.external_data_sources WHERE name = '{{external_data_source}}' )
DROP EXTERNAL DATA SOURCE [{{external_data_source}}];

CREATE EXTERNAL DATA SOURCE [{{external_data_source}}] WITH (
TYPE = HADOOP,
LOCATION = 'wasbs://[email protected]'
)
{% endset %}

{% set external_file_format = target.schema ~ '.dbt_external_ff_testing' %}

{% set create_external_file_format %}
IF EXISTS ( SELECT * FROM sys.external_file_formats WHERE name = '{{external_file_format}}' )
DROP EXTERNAL FILE FORMAT [{{external_file_format}}];

CREATE EXTERNAL FILE FORMAT [{{external_file_format}}]
WITH (
FORMAT_TYPE = DELIMITEDTEXT,
FORMAT_OPTIONS (
FIELD_TERMINATOR = N',',
FIRST_ROW = 2,
USE_TYPE_DEFAULT = True
)
)
{% endset %}

{% do log('Creating external data source ' ~ external_data_source, info = true) %}
{% do run_query(create_external_data_source) %}
{% do log('Creating external file format ' ~ external_file_format, info = true) %}
{% do run_query(create_external_file_format) %}

{% endmacro %}
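
After running the prep macro, a quick sanity check (a sketch, run against the same Synapse database) is to query the catalog views the macro itself uses:

    -- confirm the external objects created by sqlserver__prep_external
    select name from sys.external_data_sources;
    select name from sys.external_file_formats;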
99 changes: 99 additions & 0 deletions integration_tests/models/sqlserver.yml
@@ -0,0 +1,99 @@
version: 2

sources:
  - name: sqlserver_external
    schema: "{{ target.schema }}"
    loader: ADLSblob

    tables:

      - name: people_csv_unpartitioned
        external: &csv-people
          location: '/csv'
          file_format: "{{ target.schema ~ '.dbt_external_ff_testing' }}"
          data_source: "{{ target.schema ~ '.dbt_external_tables_testing' }}"
          reject_type: VALUE
          reject_value: 0
          ansi_nulls: true
          quoted_identifier: true
        columns: &cols-of-the-people
          - name: id
            data_type: int
          - name: first_name
            data_type: varchar(64)
          - name: last_name
            data_type: varchar(64)
          - name: email
            data_type: varchar(64)
        tests: &equal-to-the-people
          - sqlserver__equality:
              compare_model: ref('people')
              compare_columns:
                - id
                - first_name
                - last_name
                - email

      - name: people_csv_partitioned
        external:
          <<: *csv-people
          # SYNAPSE DOES NOT DO PARTITIONS
          # (BUT WE COULD MAKE A WORKAROUND !!!)
          # partitions: &parts-of-the-people
          #   - name: section
          #     data_type: varchar
          #     expression: "substr(split_part(metadata$filename, 'section=', 2), 1, 1)"
        columns: *cols-of-the-people
        # tests: *equal-to-the-people

      # JSON IS NOT SUPPORTED BY SYNAPSE ATM

      # - name: people_json_unpartitioned
      #   external: &json-people
      #     location: '@{{ target.schema }}.dbt_external_tables_testing/json'
      #     file_format: '( type = json )'
      #   columns: *cols-of-the-people
      #   tests: *equal-to-the-people

      # - name: people_json_partitioned
      #   external:
      #     <<: *json-people
      #     partitions: *parts-of-the-people
      #   columns: *cols-of-the-people
      #   tests: *equal-to-the-people

      # NO COLUMNS BREAKS THINGS CURRENTLY
      # just to test syntax
      # - name: people_csv_unpartitioned_no_columns
      #   external: *csv-people
      #   tests: &same-rowcount
      #     - sqlserver__equal_rowcount:
      #         compare_model: ref('people')

      # - name: people_csv_partitioned_no_columns
      #   external:
      #     <<: *csv-people
      #     partitions: *parts-of-the-people
      #   tests: *same-rowcount

      # - name: people_json_unpartitioned_no_columns
      #   external: *csv-people
      #   tests: *same-rowcount

      # - name: people_json_partitioned_no_columns
      #   external:
      #     <<: *json-people
      #     partitions: *parts-of-the-people
      #   tests: *same-rowcount

      # - name: people_json_multipartitioned_no_columns
      #   external:
      #     <<: *json-people
      #     partitions:
      #       - name: file_type
      #         data_type: varchar
      #         expression: "split_part(metadata$filename, 'section=', 1)"
      #       - name: section
      #         data_type: varchar
      #         expression: "substr(split_part(metadata$filename, 'section=', 2), 1, 1)"
      #   tests: *same-rowcount
29 changes: 29 additions & 0 deletions macros/external/create_external_table.sql
@@ -69,6 +69,35 @@
file_format = {{external.file_format}}
{% endmacro %}

{% macro sqlserver__create_external_table(source_node) %}

    {%- set columns = source_node.columns.values() -%}
    {%- set external = source_node.external -%}

    {% if external.ansi_nulls is true -%} SET ANSI_NULLS ON; {%- endif %}
    {% if external.quoted_identifier is true -%} SET QUOTED_IDENTIFIER ON; {%- endif %}

    create external table {{source(source_node.source_name, source_node.name)}} (
        {% for column in columns %}
            {# TODO: derive nullity from schema tests more robustly #}
            {%- set nullity = 'NOT NULL' if 'not_null' in (column.tests or []) else 'NULL' -%}
            {{adapter.quote(column.name)}} {{column.data_type}} {{nullity}}
            {{- ',' if not loop.last -}}
        {% endfor %}
    )
    WITH (
        {% set dict = {'DATA_SOURCE': external.data_source,
                       'LOCATION': external.location,
                       'FILE_FORMAT': external.file_format,
                       'REJECT_TYPE': external.reject_type,
                       'REJECT_VALUE': external.reject_value} -%}
        {%- for key, value in dict.items() %}
            {{key}} = {% if key == "LOCATION" -%} '{{value}}' {%- elif key in ["DATA_SOURCE","FILE_FORMAT"] -%} [{{value}}] {%- else -%} {{value}} {%- endif -%}
            {{- ',' if not loop.last -}}
        {%- endfor -%}
    )
{% endmacro %}
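
For reference, a hedged sketch of the DDL this macro renders for the `people_csv_unpartitioned` test source above (exact quoting comes from adapter.quote, and `my_schema` stands in for the target schema):

    -- approximate rendered output; names are placeholders, not defaults
    SET ANSI_NULLS ON;
    SET QUOTED_IDENTIFIER ON;
    create external table [my_schema].[people_csv_unpartitioned] (
        [id] int NULL,
        [first_name] varchar(64) NULL,
        [last_name] varchar(64) NULL,
        [email] varchar(64) NULL
    )
    WITH (
        DATA_SOURCE = [my_schema.dbt_external_tables_testing],
        LOCATION = '/csv',
        FILE_FORMAT = [my_schema.dbt_external_ff_testing],
        REJECT_TYPE = VALUE,
        REJECT_VALUE = 0
    )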

{% macro bigquery__create_external_table(source_node) %}
{{ exceptions.raise_compiler_error(
"BigQuery does not support creating external tables in SQL/DDL.
4 changes: 4 additions & 0 deletions macros/external/refresh_external_table.sql
@@ -80,6 +80,10 @@

{% endmacro %}

{% macro sqlserver__refresh_external_table(source_node) %}
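    {# No-op: Synapse external tables read their underlying files at query time, so there is no partition metadata to refresh #}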
{% do return([]) %}
{% endmacro %}

{% macro snowflake__refresh_external_table(source_node) %}

{% set external = source_node.external %}