diff --git a/CHANGELOG.md b/CHANGELOG.md
index d910c646..ed59a8db 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - `datacontract test --output-format junit --output TEST-datacontract.xml` Export CLI test results to a file, in a standard format (e.g. JUnit) to improve CI/CD experience (#650)
 - `dbt` & `dbt-sources` export formats now support the optional `--server` flag to adapt the DBT column `data_type` to specific SQL dialects
+- DuckDB connections are now configurable when used as a Python library (#666)
 
 ### Changed
 
diff --git a/datacontract/data_contract.py b/datacontract/data_contract.py
index 1cef9150..1ebe8126 100644
--- a/datacontract/data_contract.py
+++ b/datacontract/data_contract.py
@@ -4,6 +4,8 @@
 if typing.TYPE_CHECKING:
     from pyspark.sql import SparkSession
 
+from duckdb import DuckDBPyConnection
+
 from datacontract.breaking.breaking import (
     info_breaking_changes,
     models_breaking_changes,
@@ -39,6 +41,7 @@ def __init__(
         server: str = None,
         publish_url: str = None,
         spark: "SparkSession" = None,
+        duckdb_connection: DuckDBPyConnection = None,
         inline_definitions: bool = True,
         inline_quality: bool = True,
         ssl_verification: bool = True,
@@ -50,6 +53,7 @@ def __init__(
         self._server = server
         self._publish_url = publish_url
         self._spark = spark
+        self._duckdb_connection = duckdb_connection
         self._inline_definitions = inline_definitions
         self._inline_quality = inline_quality
         self._ssl_verification = ssl_verification
@@ -146,7 +150,7 @@ def test(self) -> Run:
                 inline_quality=self._inline_quality,
             )
 
-            execute_data_contract_test(data_contract, run, self._server, self._spark)
+            execute_data_contract_test(data_contract, run, self._server, self._spark, self._duckdb_connection)
 
         except DataContractException as e:
             run.checks.append(
diff --git a/datacontract/engines/data_contract_test.py b/datacontract/engines/data_contract_test.py
index cc59fee8..ae3b84b1 100644
--- a/datacontract/engines/data_contract_test.py
+++ b/datacontract/engines/data_contract_test.py
@@ -1,5 +1,7 @@
 import typing
 
+from duckdb import DuckDBPyConnection
+
 from datacontract.engines.data_contract_checks import create_checks
 
 if typing.TYPE_CHECKING:
@@ -20,6 +22,7 @@ def execute_data_contract_test(
     run: Run,
     server_name: str = None,
     spark: "SparkSession" = None,
+    duckdb_connection: DuckDBPyConnection = None,
 ):
     if data_contract_specification.models is None or len(data_contract_specification.models) == 0:
         raise DataContractException(
@@ -43,7 +46,7 @@ def execute_data_contract_test(
     # TODO check server credentials are complete for nicer error messages
     if server.format == "json" and server.type != "kafka":
         check_jsonschema(run, data_contract_specification, server)
-    check_soda_execute(run, data_contract_specification, server, spark)
+    check_soda_execute(run, data_contract_specification, server, spark, duckdb_connection)
 
 
 def get_server(data_contract_specification: DataContractSpecification, server_name: str = None):
diff --git a/datacontract/engines/soda/check_soda_execute.py b/datacontract/engines/soda/check_soda_execute.py
index 09ad281b..9bc42ddd 100644
--- a/datacontract/engines/soda/check_soda_execute.py
+++ b/datacontract/engines/soda/check_soda_execute.py
@@ -1,6 +1,12 @@
 import logging
+import typing
 import uuid
 
+if typing.TYPE_CHECKING:
+    from pyspark.sql import SparkSession
+
+from duckdb import DuckDBPyConnection
+
 from datacontract.engines.soda.connections.bigquery import to_bigquery_soda_configuration
 from datacontract.engines.soda.connections.databricks import to_databricks_soda_configuration
 from datacontract.engines.soda.connections.duckdb_connection import get_duckdb_connection
@@ -14,7 +20,13 @@
 from datacontract.model.run import Check, Log, ResultEnum, Run
 
 
-def check_soda_execute(run: Run, data_contract: DataContractSpecification, server: Server, spark):
+def check_soda_execute(
+    run: Run,
+    data_contract: DataContractSpecification,
+    server: Server,
+    spark: "SparkSession" = None,
+    duckdb_connection: DuckDBPyConnection = None,
+):
     from soda.common.config_helper import ConfigHelper
 
     ConfigHelper.get_instance().upsert_value("send_anonymous_usage_stats", False)
@@ -30,7 +42,7 @@ def check_soda_execute(run: Run, data_contract: DataContractSpecification, serve
     if server.type in ["s3", "gcs", "azure", "local"]:
         if server.format in ["json", "parquet", "csv", "delta"]:
             run.log_info(f"Configuring engine soda-core to connect to {server.type} {server.format} with duckdb")
-            con = get_duckdb_connection(data_contract, server, run)
+            con = get_duckdb_connection(data_contract, server, run, duckdb_connection)
             scan.add_duckdb_connection(duckdb_connection=con, data_source_name=server.type)
             scan.set_data_source_name(server.type)
         else:
diff --git a/datacontract/engines/soda/connections/duckdb_connection.py b/datacontract/engines/soda/connections/duckdb_connection.py
index 2790e1a8..f05fce2f 100644
--- a/datacontract/engines/soda/connections/duckdb_connection.py
+++ b/datacontract/engines/soda/connections/duckdb_connection.py
@@ -4,11 +4,21 @@
 import duckdb
 
 from datacontract.export.csv_type_converter import convert_to_duckdb_csv_type
+from datacontract.model.data_contract_specification import DataContractSpecification, Server
 from datacontract.model.run import Run
 
 
-def get_duckdb_connection(data_contract, server, run: Run):
-    con = duckdb.connect(database=":memory:")
+def get_duckdb_connection(
+    data_contract: DataContractSpecification,
+    server: Server,
+    run: Run,
+    duckdb_connection: duckdb.DuckDBPyConnection = None,
+):
+    if duckdb_connection is None:
+        con = duckdb.connect(database=":memory:")
+    else:
+        con = duckdb_connection
+
     path: str = ""
     if server.type == "local":
         path = server.path