Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/make-duckdb-connection-configurable #672

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `datacontract test --output-format junit --output TEST-datacontract.xml` Export CLI test results
to a file, in a standard format (e.g. JUnit) to improve CI/CD experience (#650)
- `dbt` & `dbt-sources` export formats now support the optional `--server` flag to adapt the DBT column `data_type` to specific SQL dialects
- Duckdb Connections are now configurable, when used as Python library (#666)

### Changed

Expand Down
6 changes: 5 additions & 1 deletion datacontract/data_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
if typing.TYPE_CHECKING:
from pyspark.sql import SparkSession

from duckdb.duckdb import DuckDBPyConnection

from datacontract.breaking.breaking import (
info_breaking_changes,
models_breaking_changes,
Expand Down Expand Up @@ -39,6 +41,7 @@ def __init__(
server: str = None,
publish_url: str = None,
spark: "SparkSession" = None,
duckdb_connection: DuckDBPyConnection = None,
inline_definitions: bool = True,
inline_quality: bool = True,
ssl_verification: bool = True,
Expand All @@ -50,6 +53,7 @@ def __init__(
self._server = server
self._publish_url = publish_url
self._spark = spark
self._duckdb_connection = duckdb_connection
self._inline_definitions = inline_definitions
self._inline_quality = inline_quality
self._ssl_verification = ssl_verification
Expand Down Expand Up @@ -146,7 +150,7 @@ def test(self) -> Run:
inline_quality=self._inline_quality,
)

execute_data_contract_test(data_contract, run, self._server, self._spark)
execute_data_contract_test(data_contract, run, self._server, self._spark, self._duckdb_connection)

except DataContractException as e:
run.checks.append(
Expand Down
5 changes: 4 additions & 1 deletion datacontract/engines/data_contract_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import typing

from duckdb.duckdb import DuckDBPyConnection

from datacontract.engines.data_contract_checks import create_checks

if typing.TYPE_CHECKING:
Expand All @@ -20,6 +22,7 @@ def execute_data_contract_test(
run: Run,
server_name: str = None,
spark: "SparkSession" = None,
duckdb_connection: DuckDBPyConnection = None,
):
if data_contract_specification.models is None or len(data_contract_specification.models) == 0:
raise DataContractException(
Expand All @@ -43,7 +46,7 @@ def execute_data_contract_test(
# TODO check server credentials are complete for nicer error messages
if server.format == "json" and server.type != "kafka":
check_jsonschema(run, data_contract_specification, server)
check_soda_execute(run, data_contract_specification, server, spark)
check_soda_execute(run, data_contract_specification, server, spark, duckdb_connection)


def get_server(data_contract_specification: DataContractSpecification, server_name: str = None):
Expand Down
16 changes: 14 additions & 2 deletions datacontract/engines/soda/check_soda_execute.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import logging
import typing
import uuid

if typing.TYPE_CHECKING:
from pyspark.sql import SparkSession

from duckdb.duckdb import DuckDBPyConnection

from datacontract.engines.soda.connections.bigquery import to_bigquery_soda_configuration
from datacontract.engines.soda.connections.databricks import to_databricks_soda_configuration
from datacontract.engines.soda.connections.duckdb_connection import get_duckdb_connection
Expand All @@ -14,7 +20,13 @@
from datacontract.model.run import Check, Log, ResultEnum, Run


def check_soda_execute(run: Run, data_contract: DataContractSpecification, server: Server, spark):
def check_soda_execute(
run: Run,
data_contract: DataContractSpecification,
server: Server,
spark: "SparkSession" = None,
duckdb_connection: DuckDBPyConnection = None,
):
from soda.common.config_helper import ConfigHelper

ConfigHelper.get_instance().upsert_value("send_anonymous_usage_stats", False)
Expand All @@ -30,7 +42,7 @@ def check_soda_execute(run: Run, data_contract: DataContractSpecification, serve
if server.type in ["s3", "gcs", "azure", "local"]:
if server.format in ["json", "parquet", "csv", "delta"]:
run.log_info(f"Configuring engine soda-core to connect to {server.type} {server.format} with duckdb")
con = get_duckdb_connection(data_contract, server, run)
con = get_duckdb_connection(data_contract, server, run, duckdb_connection)
scan.add_duckdb_connection(duckdb_connection=con, data_source_name=server.type)
scan.set_data_source_name(server.type)
else:
Expand Down
14 changes: 12 additions & 2 deletions datacontract/engines/soda/connections/duckdb_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@
import duckdb

from datacontract.export.csv_type_converter import convert_to_duckdb_csv_type
from datacontract.model.data_contract_specification import DataContractSpecification, Server
from datacontract.model.run import Run


def get_duckdb_connection(data_contract, server, run: Run):
con = duckdb.connect(database=":memory:")
def get_duckdb_connection(
data_contract: DataContractSpecification,
server: Server,
run: Run,
duckdb_connection: duckdb.DuckDBPyConnection = None,
):
if duckdb_connection is None:
con = duckdb.connect(database=":memory:")
else:
con = duckdb_connection

path: str = ""
if server.type == "local":
path = server.path
Expand Down