diff --git a/.circleci/config.yml b/.circleci/config.yml
index cf1dc578..276c94de 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -42,6 +42,17 @@ jobs:
       - store_artifacts:
           path: ./logs
 
+  integration-sqlserver:
+    docker:
+      - image: dataders/pyodbc:1.2
+    steps:
+      - checkout
+      - run:
+          name: "Run Tests - sqlserver"
+          command: ./run_test.sh sqlserver
+      - store_artifacts:
+          path: ./logs
+
 workflows:
   version: 2
@@ -50,3 +61,4 @@
       - integration-redshift
       - integration-snowflake
       - integration-databricks
+      - integration-sqlserver
diff --git a/README.md b/README.md
index fb158abd..8cf90d79 100644
--- a/README.md
+++ b/README.md
@@ -16,9 +16,12 @@ $ dbt run-operation stage_external_sources --vars 'ext_full_refresh: true'
 
 ![sample docs](etc/sample_docs.png)
 
-The macros assume that you have already created an external stage (Snowflake)
-or external schema (Redshift/Spectrum), and that you have permissions to select from it
-and create tables in it.
+The macros assume that you have already:
+- created either:
+  - an external stage (Snowflake),
+  - an external schema (Redshift/Spectrum), or
+  - an external data source and file format (Synapse); and that you
+- have permissions to select from it and create tables in it.
 
 The `stage_external_sources` macro accepts a similar node selection syntax to
 [snapshotting source freshness](https://docs.getdbt.com/docs/running-a-dbt-project/command-line-interface/source/#specifying-sources-to-snapshot).
@@ -57,7 +60,12 @@ sources:
       table_properties: # Hive specification
       options: # Hive specification
         header: 'TRUE'
-
+      # ------ SYNAPSE ------
+      data_source: # External Data Source Name
+      reject_type:
+      reject_value:
+      ansi_nulls:
+      quoted_identifier:
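+      # (for a filled-in example, see sample_sources/synapse.yml)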
 
       # Snowflake: create an empty table + pipe instead of an external table
       snowpipe:
         auto_ingest: true
@@ -124,3 +132,4 @@ as a dbt source and stage-ready external table in Snowflake and Spectrum.
 * Redshift (Spectrum)
 * Snowflake
 * Spark
+* Synapse
diff --git a/integration_tests/ci/sample.profiles.yml b/integration_tests/ci/sample.profiles.yml
index ef1de3c7..22425688 100644
--- a/integration_tests/ci/sample.profiles.yml
+++ b/integration_tests/ci/sample.profiles.yml
@@ -40,3 +40,16 @@
       endpoint: "{{ env_var('DBT_DATABRICKS_ENDPOINT') }}"
       token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}"
       schema: dbt_external_tables_integration_tests_databricks
+
+    sqlserver:
+      type: sqlserver
+      driver: "ODBC Driver 17 for SQL Server"
+      port: 1433
+      host: "{{ env_var('DBT_SYNAPSE_SERVER') }}"
+      database: "{{ env_var('DBT_SYNAPSE_DB') }}"
+      username: "{{ env_var('DBT_SYNAPSE_UID') }}"
+      password: "{{ env_var('DBT_SYNAPSE_PWD') }}"
+      schema: dbt_external_tables_integration_tests_synapse
+      encrypt: 'yes'
+      trust_cert: 'yes'
+      threads: 1
\ No newline at end of file
diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml
index 2c57ce68..9a3afb6d 100644
--- a/integration_tests/dbt_project.yml
+++ b/integration_tests/dbt_project.yml
@@ -30,6 +30,8 @@ sources:
       +enabled: "{{ target.type == 'snowflake' }}"
     spark_external:
       +enabled: "{{ target.type == 'spark' }}"
+    sqlserver_external:
+      +enabled: "{{ target.type == 'sqlserver' }}"
 
 seeds:
   quote_columns: false
diff --git a/integration_tests/macros/dbt_utils_tsql.sql b/integration_tests/macros/dbt_utils_tsql.sql
new file mode 100644
index 00000000..65c750d7
--- /dev/null
+++ b/integration_tests/macros/dbt_utils_tsql.sql
@@ -0,0 +1,107 @@
+{% macro test_sqlserver__equal_rowcount(model) %}
+
+{% set compare_model = kwargs.get('compare_model', kwargs.get('arg')) %}
+
+{#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. #}
+{%- if not execute -%}
+    {{ return('') }}
+{% endif %}
+
+with a as (
+
+    select count(*) as count_a from {{ model.include(database=False) }}
+
+),
+b as (
+
+    select count(*) as count_b from {{ compare_model.include(database=False) }}
+
+),
+final as (
+
+    select abs(
+            (select count_a from a) -
+            (select count_b from b)
+        )
+        as diff_count
+
+)
+
+select diff_count from final
+
+{% endmacro %}
+
+
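+{#-- T-SQL port of dbt_utils.equality. Relations are rendered with
+     .include(database=False) because Synapse does not support
+     cross-database queries. --#}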
+{% macro test_sqlserver__equality(model) %}
+
+{#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. #}
+{%- if not execute -%}
+    {{ return('') }}
+{% endif %}
+
+-- setup
+{%- do dbt_utils._is_relation(model, 'test_equality') -%}
+
+{#-
+If the compare_cols arg is provided, we can run this test without querying the
+information schema — this allows the model to be an ephemeral model
+-#}
+{%- if not kwargs.get('compare_columns', None) -%}
+    {%- do dbt_utils._is_ephemeral(model, 'test_equality') -%}
+{%- endif -%}
+
+{% set compare_model = kwargs.get('compare_model', kwargs.get('arg')) %}
+{% set compare_columns = kwargs.get('compare_columns', adapter.get_columns_in_relation(model) | map(attribute='quoted') ) %}
+{% set compare_cols_csv = compare_columns | join(', ') %}
+
+with a as (
+
+    select * from {{ model.include(database=False) }}
+
+),
+
+b as (
+
+    select * from {{ compare_model.include(database=False) }}
+
+),
+
+a_minus_b as (
+
+    select {{compare_cols_csv}} from a
+    {{ dbt_utils.except() }}
+    select {{compare_cols_csv}} from b
+
+),
+
+b_minus_a as (
+
+    select {{compare_cols_csv}} from b
+    {{ dbt_utils.except() }}
+    select {{compare_cols_csv}} from a
+
+),
+
+unioned as (
+
+    select * from a_minus_b
+    union all
+    select * from b_minus_a
+
+),
+
+final as (
+
+    select (select count(*) from unioned) +
+        (select abs(
+            (select count(*) from a_minus_b) -
+            (select count(*) from b_minus_a)
+        ))
+        as count
+
+)
+
+select count from final
+
+{% endmacro %}
\ No newline at end of file
diff --git a/integration_tests/macros/prep_external.sql b/integration_tests/macros/prep_external.sql
index ef895c15..c48fa3da 100644
--- a/integration_tests/macros/prep_external.sql
+++ b/integration_tests/macros/prep_external.sql
@@ -43,3 +43,41 @@
     {% do run_query(create_external_stage) %}
 
 {% endmacro %}
+
+{% macro sqlserver__prep_external() %}
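+
+    {#-- Synapse needs an external data source and an external file format to
+         exist before an external table can be created; create both here,
+         dropping any stale copies first so reruns stay idempotent. --#}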
+
+    {% set external_data_source = target.schema ~ '.dbt_external_tables_testing' %}
+
+    {% set create_external_data_source %}
+        IF EXISTS ( SELECT * FROM sys.external_data_sources WHERE name = '{{external_data_source}}' )
+            DROP EXTERNAL DATA SOURCE [{{external_data_source}}];
+
+        CREATE EXTERNAL DATA SOURCE [{{external_data_source}}] WITH (
+            TYPE = HADOOP,
+            LOCATION = 'wasbs://dbt-external-tables-testing@dbtsynapselake.blob.core.windows.net'
+        )
+    {% endset %}
+
+    {% set external_file_format = target.schema ~ '.dbt_external_ff_testing' %}
+
+    {% set create_external_file_format %}
+        IF EXISTS ( SELECT * FROM sys.external_file_formats WHERE name = '{{external_file_format}}' )
+            DROP EXTERNAL FILE FORMAT [{{external_file_format}}];
+
+        CREATE EXTERNAL FILE FORMAT [{{external_file_format}}]
+        WITH (
+            FORMAT_TYPE = DELIMITEDTEXT,
+            FORMAT_OPTIONS (
+                FIELD_TERMINATOR = N',',
+                FIRST_ROW = 2,
+                USE_TYPE_DEFAULT = True
+            )
+        )
+    {% endset %}
+
+    {% do log('Creating external data source ' ~ external_data_source, info = true) %}
+    {% do run_query(create_external_data_source) %}
+    {% do log('Creating external file format ' ~ external_file_format, info = true) %}
+    {% do run_query(create_external_file_format) %}
+
+{% endmacro %}
diff --git a/integration_tests/models/sqlserver.yml b/integration_tests/models/sqlserver.yml
new file mode 100644
index 00000000..589b24c5
--- /dev/null
+++ b/integration_tests/models/sqlserver.yml
@@ -0,0 +1,99 @@
+version: 2
+
+sources:
+  - name: sqlserver_external
+    schema: "{{ target.schema }}"
+    loader: ADLSblob
+
+    tables:
+
+      - name: people_csv_unpartitioned
+        external: &csv-people
+          location: '/csv'
+          file_format: "{{ target.schema ~ '.dbt_external_ff_testing' }}"
+          data_source: "{{ target.schema ~ '.dbt_external_tables_testing' }}"
+          reject_type: VALUE
+          reject_value: 0
+          ansi_nulls: true
+          quoted_identifier: true
+        columns: &cols-of-the-people
+          - name: id
+            data_type: int
+          - name: first_name
+            data_type: varchar(64)
+          - name: last_name
+            data_type: varchar(64)
+          - name: email
+            data_type: varchar(64)
+        tests: &equal-to-the-people
+          - sqlserver__equality:
+              compare_model: ref('people')
+              compare_columns:
+                - id
+                - first_name
+                - last_name
+                - email
+
+      - name: people_csv_partitioned
+        external:
+          <<: *csv-people
+          # SYNAPSE DOES NOT DO PARTITIONS
+          # (BUT WE COULD MAKE A WORKAROUND !!!)
+          # partitions: &parts-of-the-people
+          #   - name: section
+          #     data_type: varchar
+          #     expression: "substr(split_part(metadata$filename, 'section=', 2), 1, 1)"
+        columns: *cols-of-the-people
+        # tests: *equal-to-the-people
+
+      # JSON IS NOT SUPPORTED BY SYNAPSE ATM
+
+      # - name: people_json_unpartitioned
+      #   external: &json-people
+      #     location: '@{{ target.schema }}.dbt_external_tables_testing/json'
+      #     file_format: '( type = json )'
+      #   columns: *cols-of-the-people
+      #   tests: *equal-to-the-people
+
+      # - name: people_json_partitioned
+      #   external:
+      #     <<: *json-people
+      #     partitions: *parts-of-the-people
+      #   columns: *cols-of-the-people
+      #   tests: *equal-to-the-people
+
+      # NO COLUMNS BREAKS THINGS CURRENTLY
+      # just to test syntax
+      # - name: people_csv_unpartitioned_no_columns
+      #   external: *csv-people
+      #   tests: &same-rowcount
+      #     - sqlserver__equal_rowcount:
+      #         compare_model: ref('people')
+
+      # - name: people_csv_partitioned_no_columns
+      #   external:
+      #     <<: *csv-people
+      #     partitions: *parts-of-the-people
+      #   tests: *same-rowcount
+
+      # - name: people_json_unpartitioned_no_columns
+      #   external: *csv-people
+      #   tests: *same-rowcount
+
+      # - name: people_json_partitioned_no_columns
+      #   external:
+      #     <<: *json-people
+      #     partitions: *parts-of-the-people
+      #   tests: *same-rowcount
+
+      # - name: people_json_multipartitioned_no_columns
+      #   external:
+      #     <<: *json-people
+      #     partitions:
+      #       - name: file_type
+      #         data_type: varchar
+      #         expression: "split_part(metadata$filename, 'section=', 1)"
+      #       - name: section
+      #         data_type: varchar
+      #         expression: "substr(split_part(metadata$filename, 'section=', 2), 1, 1)"
+      #   tests: *same-rowcount
diff --git a/macros/external/create_external_table.sql b/macros/external/create_external_table.sql
index a46e7a2c..324c335f 100644
--- a/macros/external/create_external_table.sql
+++ b/macros/external/create_external_table.sql
@@ -69,6 +69,35 @@
     file_format = {{external.file_format}}
 {% endmacro %}
 
+{% macro sqlserver__create_external_table(source_node) %}
+
+    {%- set columns = source_node.columns.values() -%}
+    {%- set external = source_node.external -%}
+
+    {% if external.ansi_nulls is true -%} SET ANSI_NULLS ON; {%- endif %}
+    {% if external.quoted_identifier is true -%} SET QUOTED_IDENTIFIER ON; {%- endif %}
+
+    create external table {{source(source_node.source_name, source_node.name)}} (
+        {% for column in columns %}
+            {# TODO set nullity based on schema tests?? #}
+            {%- set nullity = 'NULL' if 'not_null' in (column.tests | default([])) else 'NOT NULL' -%}
+            {{adapter.quote(column.name)}} {{column.data_type}} {{nullity}}
+            {{- ',' if not loop.last -}}
+        {% endfor %}
+    )
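+    {#-- In the WITH clause below, LOCATION is rendered as a string literal,
+         DATA_SOURCE and FILE_FORMAT as bracket-quoted object names, and the
+         REJECT_* options as raw values. --#}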
+    WITH (
+        {% set dict = {'DATA_SOURCE': external.data_source,
+                       'LOCATION': external.location,
+                       'FILE_FORMAT': external.file_format,
+                       'REJECT_TYPE': external.reject_type,
+                       'REJECT_VALUE': external.reject_value} -%}
+        {%- for key, value in dict.items() %}
+            {{key}} = {% if key == "LOCATION" -%} '{{value}}' {%- elif key in ["DATA_SOURCE","FILE_FORMAT"] -%} [{{value}}] {%- else -%} {{value}} {%- endif -%}
+            {{- ',' if not loop.last -}}
+        {%- endfor -%}
+    )
+{% endmacro %}
+
 {% macro bigquery__create_external_table(source_node) %}
     {{ exceptions.raise_compiler_error(
         "BigQuery does not support creating external tables in SQL/DDL.
diff --git a/macros/external/refresh_external_table.sql b/macros/external/refresh_external_table.sql
index 7d8c214d..fe50c6c3 100644
--- a/macros/external/refresh_external_table.sql
+++ b/macros/external/refresh_external_table.sql
@@ -80,6 +80,10 @@
 {% endmacro %}
 
+{% macro sqlserver__refresh_external_table(source_node) %}
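+    {#-- Synapse external tables have no refresh/repair command; the backing
+         files are read at query time, so the refresh build plan is empty. --#}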
+    {% do return([]) %}
+{% endmacro %}
+
 {% macro snowflake__refresh_external_table(source_node) %}
 
     {% set external = source_node.external %}
diff --git a/macros/external/stage_external_sources.sql b/macros/external/stage_external_sources.sql
index 1f24eb16..3057e3d9 100644
--- a/macros/external/stage_external_sources.sql
+++ b/macros/external/stage_external_sources.sql
@@ -71,6 +71,30 @@
 {% endmacro %}
 
+{% macro sqlserver__get_external_build_plan(source_node) %}
+
+    {% set build_plan = [] %}
+
+    {% set old_relation = adapter.get_relation(
+        database = source_node.database,
+        schema = source_node.schema,
+        identifier = source_node.identifier
+    ) %}
+
+    {% set create_or_replace = (old_relation is none or var('ext_full_refresh', false)) %}
+
+    {% if create_or_replace %}
+        {% set build_plan = build_plan + [
+            dbt_external_tables.dropif(source_node),
+            dbt_external_tables.create_external_table(source_node)
+        ] %}
+    {% else %}
+        {% set build_plan = build_plan + dbt_external_tables.refresh_external_table(source_node) %}
+    {% endif %}
+
+    {% do return(build_plan) %}
+
+{% endmacro %}
+
 {% macro stage_external_sources(select=none) %}
 
     {% set sources_to_stage = [] %}
diff --git a/macros/helpers/sqlserver/dropif.sql b/macros/helpers/sqlserver/dropif.sql
new file mode 100644
index 00000000..0a1d11b9
--- /dev/null
+++ b/macros/helpers/sqlserver/dropif.sql
@@ -0,0 +1,12 @@
+{% macro sqlserver__dropif(node) %}
+
+    {% set ddl %}
+        if object_id ('{{source(node.source_name, node.name)}}') is not null
+        begin
+            drop external table {{source(node.source_name, node.name)}}
+        end
+    {% endset %}
+
+    {{return(ddl)}}
+
+{% endmacro %}
\ No newline at end of file
diff --git a/run_test.sh b/run_test.sh
index 2d89e6b0..27e5ac70 100755
--- a/run_test.sh
+++ b/run_test.sh
@@ -8,6 +8,9 @@ if [[ ! -f $VENV ]]; then
     if [ $1 == 'databricks' ]
     then
         pip install dbt-spark[ODBC] --upgrade
+    elif [ $1 == 'sqlserver' ]
+    then
+        pip install dbt-synapse --upgrade
     else
         pip install dbt --upgrade
     fi
diff --git a/sample_sources/synapse.yml b/sample_sources/synapse.yml
new file mode 100644
index 00000000..395df980
--- /dev/null
+++ b/sample_sources/synapse.yml
@@ -0,0 +1,58 @@
+# Creates the query given in the comment at the bottom of this file
+
+version: 2
+
+sources:
+  - name: marketo
+    schema: source_marketo
+    loader: ADLSblob
+    tables:
+      - name: lead_activities
+        description: |
+          from raw DW.
+        external:
+          data_source: SynapseContainer # External Data Source name (created prior)
+          location: /marketing/Marketo/LeadActivities/ # path on above data source
+          file_format: CommaDelimited # External File Format name (created prior)
+          reject_type: VALUE
+          reject_value: 0
+          ansi_nulls: true
+          quoted_identifier: true
+
+        columns:
+          - name: id
+            description: unique Activity ID
+            data_type: int
+          - name: leadId
+            description: Lead ID
+            data_type: int
+          - name: activityDate
+            description: date of activity
+            data_type: varchar(255)
+          - name: activityTypeId
+            description: unique identifier for type of activity
+            data_type: int
+          - name: campaignId
+            description: Campaign under which activity took place
+            data_type: int
+          - name: primaryAttributeValueId
+            description: the main attribute for given activity type
+            data_type: int
+          - name: primaryAttributeValue
+            description: what value was taken
+            data_type: varchar(255)
+
+# SET ANSI_NULLS ON;
+# SET QUOTED_IDENTIFIER ON;
+
+# CREATE EXTERNAL TABLE [source].[lead_activities]
+# (
+#     [id] [int] NOT NULL,
+#     [leadId] [int] NOT NULL,
+#     [activityDate] [varchar](255) NOT NULL,
+#     [activityTypeId] [int] NOT NULL,
+#     [campaignId] [int] NOT NULL,
+#     [primaryAttributeValueId] [int] NOT NULL,
+#     [primaryAttributeValue] [varchar](255) NOT NULL
+# )
+# WITH (DATA_SOURCE = [SynapseContainer], LOCATION = N'/marketing/Marketo/LeadActivities/LeadActivities.csv', FILE_FORMAT = [CommaDelimited], REJECT_TYPE = VALUE, REJECT_VALUE = 0 );
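+
+# To stage this source, run (per the README):
+#   dbt run-operation stage_external_sources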