From 7f9111baaab51a5fba33686d09ccd8496874707d Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Wed, 7 Feb 2024 00:03:17 +0100 Subject: [PATCH 01/13] add some questions --- .../duckdb/macros/materializations/external.sql | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/dbt/include/duckdb/macros/materializations/external.sql b/dbt/include/duckdb/macros/materializations/external.sql index 91e9b5ab..cc9172e8 100644 --- a/dbt/include/duckdb/macros/materializations/external.sql +++ b/dbt/include/duckdb/macros/materializations/external.sql @@ -7,11 +7,14 @@ {%- set read_location = adapter.external_read_location(location, rendered_options) -%} -- set language - python or sql + -- i have to learn general about python models {%- set language = model['language'] -%} {%- set target_relation = this.incorporate(type='view') %} -- Continue as normal materialization + -- Why do we have temp and intermediate relation? + -- What does load_cached_relation do? {%- set existing_relation = load_cached_relation(this) -%} {%- set temp_relation = make_intermediate_relation(this.incorporate(type='table'), suffix='__dbt_tmp') -%} {%- set intermediate_relation = make_intermediate_relation(target_relation, suffix='__dbt_int') -%} @@ -27,6 +30,8 @@ {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} -- as above, the backup_relation should not already exist {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} + + --What is grants here? -- grab current tables grants config for comparision later on {% set grant_config = config.get('grants') %} @@ -62,10 +67,11 @@ {{ adapter.rename_relation(intermediate_relation, target_relation) }} {{ run_hooks(post_hooks, inside_transaction=True) }} - + -- What is should_revoke? {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + --What does that do? {% do persist_docs(target_relation, model) %} -- `COMMIT` happens here @@ -76,9 +82,10 @@ {{ drop_relation_if_exists(temp_relation) }} -- register table into glue + --I dont know glue so can you explain me a bit about that? {%- set plugin_name = config.get('plugin') -%} {%- set glue_register = config.get('glue_register', default=false) -%} - {%- set partition_columns = config.get('partition_columns', []) -%} + {%- set partition_columns = config.get('partition_columns', []) -%} -- this one is never used? {% if plugin_name is not none or glue_register is true %} {% if glue_register %} {# legacy hack to set the glue database name, deprecate this #} @@ -89,6 +96,6 @@ {{ run_hooks(post_hooks, inside_transaction=False) }} - {{ return({'relations': [target_relation]}) }} + {{ return({'relations': [target_relation]}) }} -- to what i return? {% endmaterialization %} From 1014c825df786fc6472d00c38e54cca7c423f78c Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Sat, 10 Feb 2024 17:06:48 +0000 Subject: [PATCH 02/13] add code structure --- dbt/adapters/duckdb/environments/local.py | 41 ++++++++++++------- dbt/adapters/duckdb/plugins/__init__.py | 10 ++++- dbt/adapters/duckdb/plugins/duckdb_native.py | 29 +++++++++++++ dbt/adapters/duckdb/plugins/excel.py | 3 +- dbt/adapters/duckdb/plugins/glue.py | 3 +- dbt/adapters/duckdb/plugins/sqlalchemy.py | 3 +- .../macros/materializations/external.sql | 13 ++---- .../macros/materializations/external_new.sql | 30 ++++++++++++++ dbt/include/duckdb/macros/utils/upstream.sql | 3 ++ 9 files changed, 106 insertions(+), 29 deletions(-) create mode 100644 dbt/adapters/duckdb/plugins/duckdb_native.py create mode 100644 dbt/include/duckdb/macros/materializations/external_new.sql diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index bdb297d3..8fe53b0b 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -6,7 +6,9 @@ from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError +import duckdb +duckdb.sql() class DuckDBCursorWrapper: def __init__(self, cursor): self._cursor = cursor @@ -24,7 +26,7 @@ def execute(self, sql, bindings=None): except RuntimeError as e: raise DbtRuntimeError(str(e)) - + class DuckDBConnectionWrapper: def __init__(self, cursor, env): self._cursor = DuckDBCursorWrapper(cursor) @@ -132,21 +134,32 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): handle.close() def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> None: - if plugin_name not in self._plugins: - if plugin_name.startswith("glue|"): - from ..plugins import glue + # some plugin have to be initialized on the fly? glue for example? - _, glue_db = plugin_name.split("|") - config = (self.creds.settings or {}).copy() - config["glue_database"] = glue_db - self._plugins[plugin_name] = glue.Plugin(plugin_name, config) - else: - raise Exception( - f"Plugin {plugin_name} not found; known plugins are: " - + ",".join(self._plugins.keys()) - ) + if plugin_name not in self._plugins: + raise Exception( + f"Plugin {plugin_name} not found; known plugins are: " + + ",".join(self._plugins.keys()) + ) plugin = self._plugins[plugin_name] - plugin.store(target_config) + + + #export data with the store model + handle = self.handle() + cursor = handle.cursor() + + df = cursor.sql(target_config.config.model.compiled_code) + #hand over Duckdb format that each plugin can choose which type of integration to use + plugin.store(df, target_config) + + cursor.close() + handle.close() + + # all are by default false, has to be turned on per plugin + if plugin.can_be_upstream_referenced(): + #create df and view which can be referenced in the run following run + source_config = plugin.create_source_config(target_config) + plugin.load(plugin_name, source_config) def close(self): if self.conn: diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index 2520173b..988561e8 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -4,7 +4,7 @@ from typing import Dict from typing import Optional -from duckdb import DuckDBPyConnection +from duckdb import DuckDBPyConnection, DuckDBPyRelation from ..utils import SourceConfig from ..utils import TargetConfig @@ -109,9 +109,15 @@ def load(self, source_config: SourceConfig): """ raise NotImplementedError(f"load method not implemented for {self.name}") - def store(self, target_config: TargetConfig): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig): raise NotImplementedError(f"store method not implemented for {self.name}") + def create_source_config(target_config: TargetConfig) -> SourceConfig: + raise NotImplementedError(f"store method not implemented for {self.name}") + + def can_be_upstream_referenced(): + return False + def configure_cursor(self, cursor): """ Configure each copy of the DuckDB cursor. diff --git a/dbt/adapters/duckdb/plugins/duckdb_native.py b/dbt/adapters/duckdb/plugins/duckdb_native.py new file mode 100644 index 00000000..c9a38ff1 --- /dev/null +++ b/dbt/adapters/duckdb/plugins/duckdb_native.py @@ -0,0 +1,29 @@ +from typing import Any +from typing import Dict + +from duckdb import DuckDBPyRelation + +from . import BasePlugin +from ..utils import SourceConfig, TargetConfig + +# here will be parquet,csv,json implementation, +# this plugin should be default one if none is specified +# we can change the name of the plugin + +class Plugin(BasePlugin): + def initialize(self, config: Dict[str, Any]): + pass + + def configure_cursor(self, cursor): + pass + + def load(self, source_config: SourceConfig): + pass + + def default_materialization(self): + return "view" + + def store(self, df: DuckDBPyRelation, target_config: TargetConfig): + pass + + diff --git a/dbt/adapters/duckdb/plugins/excel.py b/dbt/adapters/duckdb/plugins/excel.py index 2f5ac0f8..a614c96b 100644 --- a/dbt/adapters/duckdb/plugins/excel.py +++ b/dbt/adapters/duckdb/plugins/excel.py @@ -3,6 +3,7 @@ from threading import Lock from typing import Any from typing import Dict +from duckdb import DuckDBPyRelation import pandas as pd from pandas.io.formats import excel @@ -42,7 +43,7 @@ def load(self, source_config: SourceConfig): sheet_name = source_config.get("sheet_name", 0) return pd.read_excel(source_location, sheet_name=sheet_name) - def store(self, target_config: TargetConfig): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig): plugin_output_config = self._config["output"] # Create the writer on the first instance of the call to store. diff --git a/dbt/adapters/duckdb/plugins/glue.py b/dbt/adapters/duckdb/plugins/glue.py index 76f25419..7717a686 100644 --- a/dbt/adapters/duckdb/plugins/glue.py +++ b/dbt/adapters/duckdb/plugins/glue.py @@ -5,6 +5,7 @@ from typing import Sequence import boto3 +from duckdb import DuckDBPyRelation from mypy_boto3_glue import GlueClient from mypy_boto3_glue.type_defs import ColumnTypeDef from mypy_boto3_glue.type_defs import GetTableResponseTypeDef @@ -263,7 +264,7 @@ def initialize(self, config: Dict[str, Any]): self.database = config.get("glue_database", "default") self.delimiter = config.get("delimiter", ",") - def store(self, target_config: TargetConfig): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig): assert target_config.location is not None assert target_config.relation.identifier is not None table: str = target_config.relation.identifier diff --git a/dbt/adapters/duckdb/plugins/sqlalchemy.py b/dbt/adapters/duckdb/plugins/sqlalchemy.py index b66c8de8..2864a210 100644 --- a/dbt/adapters/duckdb/plugins/sqlalchemy.py +++ b/dbt/adapters/duckdb/plugins/sqlalchemy.py @@ -1,5 +1,6 @@ from typing import Any from typing import Dict +from duckdb import DuckDBPyRelation import pandas as pd from sqlalchemy import create_engine @@ -30,7 +31,7 @@ def load(self, source_config: SourceConfig) -> pd.DataFrame: with self.engine.connect() as conn: return pd.read_sql_table(table, con=conn) - def store(self, target_config: TargetConfig): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig): # first, load the data frame from the external location df = pd_utils.target_to_df(target_config) table_name = target_config.relation.identifier diff --git a/dbt/include/duckdb/macros/materializations/external.sql b/dbt/include/duckdb/macros/materializations/external.sql index cc9172e8..91e9b5ab 100644 --- a/dbt/include/duckdb/macros/materializations/external.sql +++ b/dbt/include/duckdb/macros/materializations/external.sql @@ -7,14 +7,11 @@ {%- set read_location = adapter.external_read_location(location, rendered_options) -%} -- set language - python or sql - -- i have to learn general about python models {%- set language = model['language'] -%} {%- set target_relation = this.incorporate(type='view') %} -- Continue as normal materialization - -- Why do we have temp and intermediate relation? - -- What does load_cached_relation do? {%- set existing_relation = load_cached_relation(this) -%} {%- set temp_relation = make_intermediate_relation(this.incorporate(type='table'), suffix='__dbt_tmp') -%} {%- set intermediate_relation = make_intermediate_relation(target_relation, suffix='__dbt_int') -%} @@ -30,8 +27,6 @@ {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} -- as above, the backup_relation should not already exist {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} - - --What is grants here? -- grab current tables grants config for comparision later on {% set grant_config = config.get('grants') %} @@ -67,11 +62,10 @@ {{ adapter.rename_relation(intermediate_relation, target_relation) }} {{ run_hooks(post_hooks, inside_transaction=True) }} - -- What is should_revoke? + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} - --What does that do? {% do persist_docs(target_relation, model) %} -- `COMMIT` happens here @@ -82,10 +76,9 @@ {{ drop_relation_if_exists(temp_relation) }} -- register table into glue - --I dont know glue so can you explain me a bit about that? {%- set plugin_name = config.get('plugin') -%} {%- set glue_register = config.get('glue_register', default=false) -%} - {%- set partition_columns = config.get('partition_columns', []) -%} -- this one is never used? + {%- set partition_columns = config.get('partition_columns', []) -%} {% if plugin_name is not none or glue_register is true %} {% if glue_register %} {# legacy hack to set the glue database name, deprecate this #} @@ -96,6 +89,6 @@ {{ run_hooks(post_hooks, inside_transaction=False) }} - {{ return({'relations': [target_relation]}) }} -- to what i return? + {{ return({'relations': [target_relation]}) }} {% endmaterialization %} diff --git a/dbt/include/duckdb/macros/materializations/external_new.sql b/dbt/include/duckdb/macros/materializations/external_new.sql new file mode 100644 index 00000000..d163584f --- /dev/null +++ b/dbt/include/duckdb/macros/materializations/external_new.sql @@ -0,0 +1,30 @@ +{% materialization external, adapter="duckdb", supported_languages=['sql', 'python'] %} + + {%- set location = render(config.get('location', default=external_location(this, config))) -%}) + + {%- set format = config.get('format', 'parquet') -%} + + {%- set target_relation = this.incorporate(type='view') %} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + -- has to be here + {% call statement('main', language='sql') -%} + {%- endcall %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + -- `COMMIT` happens here + {{ adapter.commit() }} + + {%- set plugin_name = config.get('plugin') -%} + + {% do store_relation(plugin_name, target_relation, location, format, config) %} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {{ return({'relations': [target_relation]}) }} -- to what i return? + +{% endmaterialization %} diff --git a/dbt/include/duckdb/macros/utils/upstream.sql b/dbt/include/duckdb/macros/utils/upstream.sql index c8097aa6..edce959b 100644 --- a/dbt/include/duckdb/macros/utils/upstream.sql +++ b/dbt/include/duckdb/macros/utils/upstream.sql @@ -1,3 +1,6 @@ +-- todo: understand this function +-- we call to create all views over the location but this doesnt work for df registration +-- so we have to call some adapter function which takes the config and register the df and create view over that {%- macro register_upstream_external_models() -%} {% if execute %} {% set upstream_nodes = {} %} From 7e4b5333c3800d20273d1f89bf9665fad8743b69 Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Tue, 20 Feb 2024 01:50:28 +0000 Subject: [PATCH 03/13] add native plugin with the upstream functionality --- dbt/adapters/duckdb/environments/__init__.py | 12 +- dbt/adapters/duckdb/environments/local.py | 12 +- dbt/adapters/duckdb/plugins/__init__.py | 20 ++-- dbt/adapters/duckdb/plugins/duckdb_native.py | 29 ----- dbt/adapters/duckdb/plugins/native.py | 91 +++++++++++++++ .../macros/materializations/external_new.sql | 21 ++-- tests/functional/adapter/test_external.py | 4 +- tests/functional/plugins/test_native.py | 109 ++++++++++++++++++ 8 files changed, 234 insertions(+), 64 deletions(-) delete mode 100644 dbt/adapters/duckdb/plugins/duckdb_native.py create mode 100644 dbt/adapters/duckdb/plugins/native.py create mode 100644 tests/functional/plugins/test_native.py diff --git a/dbt/adapters/duckdb/environments/__init__.py b/dbt/adapters/duckdb/environments/__init__.py index 6b27a02f..8d85ae5e 100644 --- a/dbt/adapters/duckdb/environments/__init__.py +++ b/dbt/adapters/duckdb/environments/__init__.py @@ -4,19 +4,17 @@ import sys import tempfile import time -from typing import Dict -from typing import List -from typing import Optional +from typing import Dict, List, Optional import duckdb -from ..credentials import DuckDBCredentials -from ..plugins import BasePlugin -from ..utils import SourceConfig -from ..utils import TargetConfig from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError +from ..credentials import DuckDBCredentials +from ..plugins import BasePlugin +from ..utils import SourceConfig, TargetConfig + def _ensure_event_loop(): """ diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index 8fe53b0b..504665ee 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -1,14 +1,12 @@ import threading -from . import Environment -from .. import credentials -from .. import utils from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError -import duckdb +from .. import credentials, utils +from . import Environment + -duckdb.sql() class DuckDBCursorWrapper: def __init__(self, cursor): self._cursor = cursor @@ -150,7 +148,7 @@ def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> df = cursor.sql(target_config.config.model.compiled_code) #hand over Duckdb format that each plugin can choose which type of integration to use - plugin.store(df, target_config) + plugin.store(df, target_config, cursor) cursor.close() handle.close() @@ -159,7 +157,7 @@ def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> if plugin.can_be_upstream_referenced(): #create df and view which can be referenced in the run following run source_config = plugin.create_source_config(target_config) - plugin.load(plugin_name, source_config) + self.load_source(plugin_name, source_config) def close(self): if self.conn: diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index a9416523..14712d7a 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -1,16 +1,14 @@ import importlib import os -from typing import Any -from typing import Dict -from typing import Optional +from typing import Any, Dict, Optional from duckdb import DuckDBPyConnection, DuckDBPyRelation -from ..credentials import DuckDBCredentials -from ..utils import SourceConfig -from ..utils import TargetConfig from dbt.dataclass_schema import dbtClassMixin +from ..credentials import DuckDBCredentials +from ..utils import SourceConfig, TargetConfig + class PluginConfig(dbtClassMixin): """A helper class for defining the configuration settings a particular plugin uses.""" @@ -120,14 +118,16 @@ def load(self, source_config: SourceConfig): :raises NotImplementedError: If this method is not implemented by a subclass. """ raise NotImplementedError(f"load method not implemented for {self.name}") - - def store(self, df: DuckDBPyRelation, target_config: TargetConfig): + + # coursor is needed just for the native, we have to do it better + # to had it over in some initalization? + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor): raise NotImplementedError(f"store method not implemented for {self.name}") - def create_source_config(target_config: TargetConfig) -> SourceConfig: + def create_source_config(self, target_config: TargetConfig) -> SourceConfig: raise NotImplementedError(f"store method not implemented for {self.name}") - def can_be_upstream_referenced(): + def can_be_upstream_referenced(self): return False def configure_cursor(self, cursor): diff --git a/dbt/adapters/duckdb/plugins/duckdb_native.py b/dbt/adapters/duckdb/plugins/duckdb_native.py deleted file mode 100644 index c9a38ff1..00000000 --- a/dbt/adapters/duckdb/plugins/duckdb_native.py +++ /dev/null @@ -1,29 +0,0 @@ -from typing import Any -from typing import Dict - -from duckdb import DuckDBPyRelation - -from . import BasePlugin -from ..utils import SourceConfig, TargetConfig - -# here will be parquet,csv,json implementation, -# this plugin should be default one if none is specified -# we can change the name of the plugin - -class Plugin(BasePlugin): - def initialize(self, config: Dict[str, Any]): - pass - - def configure_cursor(self, cursor): - pass - - def load(self, source_config: SourceConfig): - pass - - def default_materialization(self): - return "view" - - def store(self, df: DuckDBPyRelation, target_config: TargetConfig): - pass - - diff --git a/dbt/adapters/duckdb/plugins/native.py b/dbt/adapters/duckdb/plugins/native.py new file mode 100644 index 00000000..3fc33589 --- /dev/null +++ b/dbt/adapters/duckdb/plugins/native.py @@ -0,0 +1,91 @@ +import os +from typing import Any, Dict + +import duckdb +from duckdb import DuckDBPyRelation + +from ..utils import SourceConfig, TargetConfig +from . import BasePlugin + +# here will be parquet,csv,json implementation, +# this plugin should be default one if none is specified +# we can change the name of the plugin + +class Plugin(BasePlugin): + def initialize(self, config: Dict[str, Any]): + pass + + def configure_cursor(self, cursor): + pass + + def default_materialization(self): + return "view" + + def load(self, source_config: SourceConfig): + location = external_read_location(source_config.meta.get("location").get("path"), + source_config.meta.get("config").get("options", {})) + return duckdb.sql(f"SELECT * FROM '{location}'").df() + + def can_be_upstream_referenced(self): + return True + + def create_source_config(self, target_config: TargetConfig) -> SourceConfig: + source_config = SourceConfig( + name= target_config.relation.name, + identifier= target_config.relation.identifier, + schema=target_config.relation.schema, + database=target_config.relation.database, + meta= target_config.as_dict(), + tags= [], + ) + return source_config + + + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor): + location = target_config.location.path + options = external_write_options(location, target_config.config.get("options", {})) + #pwd = os.getcwd() # to find out where it saves -> cd '/tmp/pytest-of-vscode/pytest-18/project0' + #print(pwd) + cursor.sql(f"COPY (SELECT * FROM df) to '{location}' ({options})") + +# 1 to 1 from adapter +def external_write_options(write_location: str, rendered_options: dict) -> str: + if "format" not in rendered_options: + ext = os.path.splitext(write_location)[1].lower() + if ext: + rendered_options["format"] = ext[1:] + elif "delimiter" in rendered_options: + rendered_options["format"] = "csv" + else: + rendered_options["format"] = "parquet" + + if rendered_options["format"] == "csv": + if "header" not in rendered_options: + rendered_options["header"] = 1 + + if "partition_by" in rendered_options: + v = rendered_options["partition_by"] + if "," in v and not v.startswith("("): + rendered_options["partition_by"] = f"({v})" + + ret = [] + for k, v in rendered_options.items(): + if k.lower() in { + "delimiter", + "quote", + "escape", + "null", + } and not v.startswith("'"): + ret.append(f"{k} '{v}'") + else: + ret.append(f"{k} {v}") + return ", ".join(ret) + +# 1 to 1 from adapter +def external_read_location(write_location: str, rendered_options: dict) -> str: + if rendered_options.get("partition_by"): + globs = [write_location, "*"] + partition_by = str(rendered_options.get("partition_by")) + globs.extend(["*"] * len(partition_by.split(","))) + return ".".join(["/".join(globs), str(rendered_options.get("format", "parquet"))]) + return write_location diff --git a/dbt/include/duckdb/macros/materializations/external_new.sql b/dbt/include/duckdb/macros/materializations/external_new.sql index d163584f..234dc883 100644 --- a/dbt/include/duckdb/macros/materializations/external_new.sql +++ b/dbt/include/duckdb/macros/materializations/external_new.sql @@ -1,8 +1,4 @@ -{% materialization external, adapter="duckdb", supported_languages=['sql', 'python'] %} - - {%- set location = render(config.get('location', default=external_location(this, config))) -%}) - - {%- set format = config.get('format', 'parquet') -%} +{% materialization external_new, adapter="duckdb", supported_languages=['sql', 'python'] %} {%- set target_relation = this.incorporate(type='view') %} @@ -14,14 +10,19 @@ {% call statement('main', language='sql') -%} {%- endcall %} - {{ run_hooks(post_hooks, inside_transaction=True) }} + {%- set location = render(config.get('location', default=external_location(this, config))) -%}) + -- just a check if the options is a dictionary to stay compielnt but it will be used over config in the plugins + {%- set rendered_options = render_write_options(config) -%} + {%- set format = config.get('format', 'parquet') -%} + {%- set plugin_name = config.get('plugin', 'native') -%} + {% do store_relation(plugin_name, target_relation, location, format, config) %} + + -- in this moment target should exists as a view so we can setup grants or docu + + {{ run_hooks(post_hooks, inside_transaction=True) }} -- `COMMIT` happens here {{ adapter.commit() }} - - {%- set plugin_name = config.get('plugin') -%} - - {% do store_relation(plugin_name, target_relation, location, format, config) %} {{ run_hooks(post_hooks, inside_transaction=False) }} diff --git a/tests/functional/adapter/test_external.py b/tests/functional/adapter/test_external.py index 66256cc5..12b0d375 100644 --- a/tests/functional/adapter/test_external.py +++ b/tests/functional/adapter/test_external.py @@ -1,5 +1,7 @@ import os + import pytest + from dbt.tests.adapter.basic.files import ( base_table_sql, model_base, @@ -51,7 +53,7 @@ def models(self): "table_parquet_location.sql": parquet_table_location_sql, "table_csv_location_delim.sql": csv_location_delim_sql, "table_json.sql": json_sql, - "schema.yml": schema_base_yml, + "schema.yml": schema_base_yml, } @pytest.fixture(scope="class") diff --git a/tests/functional/plugins/test_native.py b/tests/functional/plugins/test_native.py new file mode 100644 index 00000000..ad948a1f --- /dev/null +++ b/tests/functional/plugins/test_native.py @@ -0,0 +1,109 @@ +import os + +import pytest + +from dbt.tests.adapter.basic.files import ( + base_table_sql, + model_base, + schema_base_yml, + seeds_base_csv, +) +from dbt.tests.util import ( + run_dbt, +) + +default_parquet = """ + {{ config(materialized="external_new") }} + SELECT 1 as a +""" +upstream_default_parquet = """ + {{ config(materialized="table") }} + SELECT * from {{ref("default_parquet")}} +""" + +partition_model_parquet = """ + {{ config( + materialized="external_new", + options = { + "partition_by": "a" + } + ) + }} + SELECT 1 as a +""" +upstream_partition_model_parquet = """ + {{ config(materialized="table") }} + SELECT * from {{ref("partition_model_parquet")}} +""" + +default_csv= """ + {{ config(materialized="external_new", format="csv", delimiter="|" ) }} + SELECT * FROM {{ref("base")}} + """ + +upstream_default_csv = """ + {{ config(materialized="table") }} + SELECT * from {{ref("default_csv")}} +""" + +default_json= """ + {{ config(materialized="external_new", format="json", location="{{ adapter.external_root() }}/test.json" ) }} + SELECT * FROM {{ref("base")}} + """ + +upstream_default_json = """ + {{ config(materialized="table") }} + SELECT * from {{ref("default_json")}} +""" + + +@pytest.mark.skip_profile("buenavista", "md") +class TestDuckdbtNativelMaterializations: + @pytest.fixture(scope="class") + def models(self): + return { + "default_parquet.sql" : default_parquet, + "upstream_default_parquet.sql" : upstream_default_parquet, + "partition_model_parquet.sql": partition_model_parquet, + "upstream_partition_model_parquet.sql": upstream_partition_model_parquet, + "default_csv.sql": default_csv, + "upstream_default_csv.sql": upstream_default_csv, + "default_json.sql": default_json, + "upstream_default_json.sql": upstream_default_json + } + + @pytest.fixture(scope="class") + def seeds(self): + return { + "base.csv": seeds_base_csv, + } + + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target,tmp_path_factory): + extroot = str(tmp_path_factory.getbasetemp() / "external") + os.mkdir(extroot) + return { + "test": { + "outputs": { + "dev": { + "type": "duckdb", + "path": "duckdb.dev", + "plugins": [ + {"module": "native"} + ], + "external_root" : f'{extroot}' + } + }, + "target": "dev", + } + } + + def test_base(self, project): + results = run_dbt(["seed"]) + # run command + results = run_dbt() + print(project.project_root) + print("break point") + + + From 46b1f06a61320f732cff480d34e5252d4f57f9c9 Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Tue, 20 Feb 2024 21:14:17 +0000 Subject: [PATCH 04/13] rename external_new to external --- .../macros/materializations/external.sql | 85 ++--------- .../macros/materializations/external_new.sql | 31 ---- tests/functional/adapter/test_external.py | 141 ------------------ tests/functional/plugins/test_native.py | 8 +- 4 files changed, 15 insertions(+), 250 deletions(-) delete mode 100644 dbt/include/duckdb/macros/materializations/external_new.sql delete mode 100644 tests/functional/adapter/test_external.py diff --git a/dbt/include/duckdb/macros/materializations/external.sql b/dbt/include/duckdb/macros/materializations/external.sql index 91e9b5ab..cb9eb791 100644 --- a/dbt/include/duckdb/macros/materializations/external.sql +++ b/dbt/include/duckdb/macros/materializations/external.sql @@ -1,94 +1,31 @@ {% materialization external, adapter="duckdb", supported_languages=['sql', 'python'] %} - {%- set location = render(config.get('location', default=external_location(this, config))) -%}) - {%- set rendered_options = render_write_options(config) -%} - {%- set format = config.get('format', 'parquet') -%} - {%- set write_options = adapter.external_write_options(location, rendered_options) -%} - {%- set read_location = adapter.external_read_location(location, rendered_options) -%} - - -- set language - python or sql - {%- set language = model['language'] -%} - {%- set target_relation = this.incorporate(type='view') %} - -- Continue as normal materialization - {%- set existing_relation = load_cached_relation(this) -%} - {%- set temp_relation = make_intermediate_relation(this.incorporate(type='table'), suffix='__dbt_tmp') -%} - {%- set intermediate_relation = make_intermediate_relation(target_relation, suffix='__dbt_int') -%} - -- the intermediate_relation should not already exist in the database; get_relation - -- will return None in that case. Otherwise, we get a relation that we can drop - -- later, before we try to use this name for the current operation - {%- set preexisting_temp_relation = load_cached_relation(temp_relation) -%} - {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%} - /* - See ../view/view.sql for more information about this relation. - */ - {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} - {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} - -- as above, the backup_relation should not already exist - {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} - -- grab current tables grants config for comparision later on - {% set grant_config = config.get('grants') %} - - -- drop the temp relations if they exist already in the database - {{ drop_relation_if_exists(preexisting_intermediate_relation) }} - {{ drop_relation_if_exists(preexisting_temp_relation) }} - {{ drop_relation_if_exists(preexisting_backup_relation) }} - {{ run_hooks(pre_hooks, inside_transaction=False) }} - -- `BEGIN` happens here: {{ run_hooks(pre_hooks, inside_transaction=True) }} - -- build model - {% call statement('create_table', language=language) -%} - {{- create_table_as(False, temp_relation, compiled_code, language) }} - {%- endcall %} - - -- write an temp relation into file - {{ write_to_file(temp_relation, location, write_options) }} - -- create a view on top of the location + -- has to be here {% call statement('main', language='sql') -%} - create or replace view {{ intermediate_relation }} as ( - select * from '{{ read_location }}' - ); {%- endcall %} - -- cleanup - {% if existing_relation is not none %} - {{ adapter.rename_relation(existing_relation, backup_relation) }} - {% endif %} + {%- set location = render(config.get('location', default=external_location(this, config))) -%}) + -- just a check if the options is a dictionary to stay compielnt but it will be used over config in the plugins + {%- set rendered_options = render_write_options(config) -%} + {%- set format = config.get('format', 'parquet') -%} + {%- set plugin_name = config.get('plugin', 'native') -%} - {{ adapter.rename_relation(intermediate_relation, target_relation) }} + {% do store_relation(plugin_name, target_relation, location, format, config) %} + + -- in this moment target should exists as a view so we can setup grants or docu {{ run_hooks(post_hooks, inside_transaction=True) }} - - {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} - {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} - - {% do persist_docs(target_relation, model) %} - -- `COMMIT` happens here {{ adapter.commit() }} - - -- finally, drop the existing/backup relation after the commit - {{ drop_relation_if_exists(backup_relation) }} - {{ drop_relation_if_exists(temp_relation) }} - - -- register table into glue - {%- set plugin_name = config.get('plugin') -%} - {%- set glue_register = config.get('glue_register', default=false) -%} - {%- set partition_columns = config.get('partition_columns', []) -%} - {% if plugin_name is not none or glue_register is true %} - {% if glue_register %} - {# legacy hack to set the glue database name, deprecate this #} - {%- set plugin_name = 'glue|' ~ config.get('glue_database', 'default') -%} - {% endif %} - {% do store_relation(plugin_name, target_relation, location, format, config) %} - {% endif %} - + {{ run_hooks(post_hooks, inside_transaction=False) }} - {{ return({'relations': [target_relation]}) }} + {{ return({'relations': [target_relation]}) }} -- to what i return? {% endmaterialization %} diff --git a/dbt/include/duckdb/macros/materializations/external_new.sql b/dbt/include/duckdb/macros/materializations/external_new.sql deleted file mode 100644 index 234dc883..00000000 --- a/dbt/include/duckdb/macros/materializations/external_new.sql +++ /dev/null @@ -1,31 +0,0 @@ -{% materialization external_new, adapter="duckdb", supported_languages=['sql', 'python'] %} - - {%- set target_relation = this.incorporate(type='view') %} - - {{ run_hooks(pre_hooks, inside_transaction=False) }} - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} - - -- has to be here - {% call statement('main', language='sql') -%} - {%- endcall %} - - {%- set location = render(config.get('location', default=external_location(this, config))) -%}) - -- just a check if the options is a dictionary to stay compielnt but it will be used over config in the plugins - {%- set rendered_options = render_write_options(config) -%} - {%- set format = config.get('format', 'parquet') -%} - {%- set plugin_name = config.get('plugin', 'native') -%} - - {% do store_relation(plugin_name, target_relation, location, format, config) %} - - -- in this moment target should exists as a view so we can setup grants or docu - - {{ run_hooks(post_hooks, inside_transaction=True) }} - -- `COMMIT` happens here - {{ adapter.commit() }} - - {{ run_hooks(post_hooks, inside_transaction=False) }} - - {{ return({'relations': [target_relation]}) }} -- to what i return? - -{% endmaterialization %} diff --git a/tests/functional/adapter/test_external.py b/tests/functional/adapter/test_external.py deleted file mode 100644 index 12b0d375..00000000 --- a/tests/functional/adapter/test_external.py +++ /dev/null @@ -1,141 +0,0 @@ -import os - -import pytest - -from dbt.tests.adapter.basic.files import ( - base_table_sql, - model_base, - schema_base_yml, - seeds_base_csv, -) -from dbt.tests.util import ( - check_relation_types, - check_relations_equal, - check_result_nodes_by_name, - relation_from_name, - run_dbt, -) - -config_materialized_default = """ - {{ config(materialized="external") }} -""" - -config_materialized_csv = """ - {{ config(materialized="external", format="csv") }} -""" - -config_materialized_parquet_location = """ - {{ config(materialized="external", location="{{ adapter.external_root() }}/test.parquet") }} -""" - -config_materialized_csv_location_delim = """ - {{ config(materialized="external", location="{{ adapter.external_root() }}/test_delim.csv", delimiter="|") }} -""" - -config_json = """ - {{ config(materialized="external", format="json") }} -""" - -default_external_sql = config_materialized_default + model_base -csv_external_sql = config_materialized_csv + model_base -parquet_table_location_sql = config_materialized_parquet_location + model_base -csv_location_delim_sql = config_materialized_csv_location_delim + model_base -json_sql = config_json + model_base - - -class BaseExternalMaterializations: - @pytest.fixture(scope="class") - def models(self): - return { - "table_model.sql": base_table_sql, - "table_default.sql": default_external_sql, - "table_csv.sql": csv_external_sql, - "table_parquet_location.sql": parquet_table_location_sql, - "table_csv_location_delim.sql": csv_location_delim_sql, - "table_json.sql": json_sql, - "schema.yml": schema_base_yml, - } - - @pytest.fixture(scope="class") - def seeds(self): - return { - "base.csv": seeds_base_csv, - } - - @pytest.fixture(scope="class") - def dbt_profile_target(self, dbt_profile_target, tmp_path_factory): - extroot = str(tmp_path_factory.getbasetemp() / "external") - os.mkdir(extroot) - dbt_profile_target["external_root"] = extroot - return dbt_profile_target - - @pytest.fixture(scope="class") - def project_config_update(self): - return { - "name": "base", - } - - def test_base(self, project): - - # seed command - results = run_dbt(["seed"]) - # seed result length - assert len(results) == 1 - - # run command - results = run_dbt() - # run result length - assert len(results) == 6 - - # names exist in result nodes - check_result_nodes_by_name( - results, - [ - "table_model", - "table_default", - "table_csv", - "table_parquet_location", - "table_csv_location_delim", - "table_json", - ], - ) - - # check relation types - expected = { - "base": "table", - "table_model": "table", - "table_default": "view", - "table_parquet_location": "view", - "table_csv": "view", - "table_csv_location_delim": "view", - "table_json": "view", - } - check_relation_types(project.adapter, expected) - - # base table rowcount - relation = relation_from_name(project.adapter, "base") - result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") - assert result[0] == 10 - - # relations_equal - check_relations_equal( - project.adapter, - [ - "base", - "table_default", - "table_parquet_location", - "table_model", - "table_csv", - "table_csv_location_delim", - "table_json", - ], - ) - - # check relations in catalog - catalog = run_dbt(["docs", "generate"]) - assert len(catalog.nodes) == 7 - assert len(catalog.sources) == 1 - - -class TestExternalMaterializations(BaseExternalMaterializations): - pass diff --git a/tests/functional/plugins/test_native.py b/tests/functional/plugins/test_native.py index ad948a1f..ab3cfaa5 100644 --- a/tests/functional/plugins/test_native.py +++ b/tests/functional/plugins/test_native.py @@ -13,7 +13,7 @@ ) default_parquet = """ - {{ config(materialized="external_new") }} + {{ config(materialized="external") }} SELECT 1 as a """ upstream_default_parquet = """ @@ -23,7 +23,7 @@ partition_model_parquet = """ {{ config( - materialized="external_new", + materialized="external", options = { "partition_by": "a" } @@ -37,7 +37,7 @@ """ default_csv= """ - {{ config(materialized="external_new", format="csv", delimiter="|" ) }} + {{ config(materialized="external", format="csv", delimiter="|" ) }} SELECT * FROM {{ref("base")}} """ @@ -47,7 +47,7 @@ """ default_json= """ - {{ config(materialized="external_new", format="json", location="{{ adapter.external_root() }}/test.json" ) }} + {{ config(materialized="external", format="json", location="{{ adapter.external_root() }}/test.json" ) }} SELECT * FROM {{ref("base")}} """ From 0dd9053b2fe098777439299c07a91a49220905c9 Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Sat, 24 Feb 2024 13:15:34 +0000 Subject: [PATCH 05/13] simplify flow; simplify and addapt excel plugin --- dbt/adapters/duckdb/environments/local.py | 2 + dbt/adapters/duckdb/plugins/__init__.py | 5 +- dbt/adapters/duckdb/plugins/excel.py | 92 +++++++++++-------- dbt/adapters/duckdb/plugins/native.py | 13 ++- dbt/adapters/duckdb/plugins/pd_utils.py | 16 ---- dbt/adapters/duckdb/plugins/postgres.py | 5 +- .../macros/materializations/external.sql | 4 +- .../duckdb/macros/utils/external_location.sql | 7 +- tests/functional/plugins/test_excel.py | 8 +- 9 files changed, 80 insertions(+), 72 deletions(-) delete mode 100644 dbt/adapters/duckdb/plugins/pd_utils.py diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index 504665ee..c182efd8 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -141,6 +141,8 @@ def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> ) plugin = self._plugins[plugin_name] + #e.g add file format to the location + target_config = plugin.adapt_target_config(target_config) #export data with the store model handle = self.handle() diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index 14712d7a..770f096f 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -121,7 +121,7 @@ def load(self, source_config: SourceConfig): # coursor is needed just for the native, we have to do it better # to had it over in some initalization? - def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None): raise NotImplementedError(f"store method not implemented for {self.name}") def create_source_config(self, target_config: TargetConfig) -> SourceConfig: @@ -142,3 +142,6 @@ def configure_cursor(self, cursor): def default_materialization(self): return "table" + + def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig: + return target_config diff --git a/dbt/adapters/duckdb/plugins/excel.py b/dbt/adapters/duckdb/plugins/excel.py index a614c96b..05d20d9d 100644 --- a/dbt/adapters/duckdb/plugins/excel.py +++ b/dbt/adapters/duckdb/plugins/excel.py @@ -1,29 +1,22 @@ import os import pathlib from threading import Lock -from typing import Any -from typing import Dict -from duckdb import DuckDBPyRelation +from typing import Any, Dict import pandas as pd +from duckdb import DuckDBPyRelation from pandas.io.formats import excel -from . import BasePlugin -from . import pd_utils -from ..utils import SourceConfig -from ..utils import TargetConfig from dbt.logger import GLOBAL_LOGGER as logger +from ..utils import SourceConfig, TargetConfig +from . import BasePlugin + class Plugin(BasePlugin): def initialize(self, plugin_config: Dict[str, Any]): self._config = plugin_config - if "output" in plugin_config: - self._excel_writer_create_lock = Lock() - assert isinstance(plugin_config["output"], dict) - assert "file" in plugin_config["output"] - # Pass s3 settings to plugin environment if "s3_access_key_id" in plugin_config: os.environ["AWS_ACCESS_KEY_ID"] = plugin_config["s3_access_key_id"] @@ -43,26 +36,20 @@ def load(self, source_config: SourceConfig): sheet_name = source_config.get("sheet_name", 0) return pd.read_excel(source_location, sheet_name=sheet_name) - def store(self, df: DuckDBPyRelation, target_config: TargetConfig): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None): plugin_output_config = self._config["output"] - # Create the writer on the first instance of the call to store. - # Instead if we instantiated the writer in the constructor - # with mode = 'w', this would result in an existing file getting - # overwritten. This can happen if dbt test is executed for example. - if not hasattr(self, "_excel_writer"): - with self._excel_writer_create_lock: - if not hasattr(self, "_excel_writer"): - self._excel_writer = pd.ExcelWriter( - plugin_output_config["file"], - mode=plugin_output_config.get("mode", "w"), - engine=plugin_output_config.get("engine", "xlsxwriter"), - engine_kwargs=plugin_output_config.get("engine_kwargs", {}), - date_format=plugin_output_config.get("date_format"), - datetime_format=plugin_output_config.get("datetime_format"), - ) - if not plugin_output_config.get("header_styling", True): - excel.ExcelFormatter.header_style = None + #this writer doesnt take location but always something defined in the profile? + _excel_writer = pd.ExcelWriter( + target_config.location.path, + mode=plugin_output_config.get("mode", "w"), + engine=plugin_output_config.get("engine", "xlsxwriter"), + engine_kwargs=plugin_output_config.get("engine_kwargs", {}), + date_format=plugin_output_config.get("date_format"), + datetime_format=plugin_output_config.get("datetime_format"), + ) + if not plugin_output_config.get("header_styling", True): + excel.ExcelFormatter.header_style = None target_output_config = { **plugin_output_config, @@ -74,12 +61,12 @@ def store(self, df: DuckDBPyRelation, target_config: TargetConfig): sheet_name = (target_config.relation.identifier or "Sheet1")[0:31] target_output_config["sheet_name"] = sheet_name - df = pd_utils.target_to_df(target_config) + df = df.df() # duckdb model to pandas dataframe if target_output_config.get("skip_empty_sheet", False) and df.shape[0] == 0: return try: df.to_excel( - self._excel_writer, + _excel_writer, sheet_name=target_output_config["sheet_name"], na_rep=target_output_config.get("na_rep", ""), float_format=target_output_config.get("float_format", None), @@ -88,9 +75,6 @@ def store(self, df: DuckDBPyRelation, target_config: TargetConfig): merge_cells=target_output_config.get("merge_cells", True), inf_rep=target_output_config.get("inf_rep", "inf"), ) - if not target_output_config.get("lazy_close", True): - self._excel_writer.close() - del self._excel_writer except ValueError as ve: # Catches errors resembling the below & logs an appropriate message # ValueError('This sheet is too large! Your sheet size is: 1100000, 1 Max sheet size is: 1048576, 16384') @@ -101,12 +85,40 @@ def store(self, df: DuckDBPyRelation, target_config: TargetConfig): pd.DataFrame( [{"Error": target_output_config.get("ignore_sheet_too_large_error", str(ve))}] ).to_excel( - self._excel_writer, sheet_name=target_output_config["sheet_name"], index=False + _excel_writer, sheet_name=target_output_config["sheet_name"], index=False ) else: raise ve + + _excel_writer.close() + + def create_source_config(self, target_config: TargetConfig) -> SourceConfig: + # in the reader we have just location and sheet_name, maybe we can add here more options + # but in the first place i would not recommend to upstream excel file + # this works for a very simple case but not all of them + meta = { + "external_location": target_config.location.path, + "sheet_name": target_config.config.get("sheet_name",0) + } + + source_config = SourceConfig( + name= target_config.relation.name, + identifier= target_config.relation.identifier, + schema=target_config.relation.schema, + database=target_config.relation.database, + meta= meta, + tags= [], + ) + return source_config + + + def can_be_upstream_referenced(self): + return True + + def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig: + if target_config.location.format == "default": + target_config.location.format = "xlsx" + + target_config.location.path = target_config.location.path + "." + target_config.location.format - def __del__(self): - if hasattr(self, "_excel_writer"): - logger.info(f"Closing {self._config['output']['file']}") - self._excel_writer.close() + return target_config diff --git a/dbt/adapters/duckdb/plugins/native.py b/dbt/adapters/duckdb/plugins/native.py index 3fc33589..5ad67a43 100644 --- a/dbt/adapters/duckdb/plugins/native.py +++ b/dbt/adapters/duckdb/plugins/native.py @@ -40,15 +40,26 @@ def create_source_config(self, target_config: TargetConfig) -> SourceConfig: ) return source_config - def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor): location = target_config.location.path + options = external_write_options(location, target_config.config.get("options", {})) #pwd = os.getcwd() # to find out where it saves -> cd '/tmp/pytest-of-vscode/pytest-18/project0' #print(pwd) cursor.sql(f"COPY (SELECT * FROM df) to '{location}' ({options})") + def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig: + #setup the location with default to parquet if not partitions_by + if target_config.location.format == "default": + target_config.location.format = "parquet" + + if "partition_by" not in target_config.config.get("options", {}): + target_config.location.path = target_config.location.path + "." + target_config.location.format + return target_config + + # 1 to 1 from adapter +# TODO those can be maybe better written def external_write_options(write_location: str, rendered_options: dict) -> str: if "format" not in rendered_options: ext = os.path.splitext(write_location)[1].lower() diff --git a/dbt/adapters/duckdb/plugins/pd_utils.py b/dbt/adapters/duckdb/plugins/pd_utils.py deleted file mode 100644 index c6620793..00000000 --- a/dbt/adapters/duckdb/plugins/pd_utils.py +++ /dev/null @@ -1,16 +0,0 @@ -import pandas as pd - -from ..utils import TargetConfig - - -def target_to_df(target_config: TargetConfig) -> pd.DataFrame: - """Load a dataframe from a target config.""" - location = target_config.location - if location is None: - raise Exception("Target config does not have a location") - if location.format == "csv": - return pd.read_csv(location.path) - elif location.format == "parquet": - return pd.read_parquet(location.path) - else: - raise Exception(f"Unsupported format: {location.format}") diff --git a/dbt/adapters/duckdb/plugins/postgres.py b/dbt/adapters/duckdb/plugins/postgres.py index 2b14f5fb..e12c79d3 100644 --- a/dbt/adapters/duckdb/plugins/postgres.py +++ b/dbt/adapters/duckdb/plugins/postgres.py @@ -1,11 +1,12 @@ -from typing import Any -from typing import Dict +from typing import Any, Dict from duckdb import DuckDBPyConnection from . import BasePlugin +## This is maybe what we can deprecate? Native way in the config +#is better or there is some need? class Plugin(BasePlugin): def initialize(self, config: Dict[str, Any]): self._dsn = config.get("dsn") diff --git a/dbt/include/duckdb/macros/materializations/external.sql b/dbt/include/duckdb/macros/materializations/external.sql index cb9eb791..ab476a55 100644 --- a/dbt/include/duckdb/macros/materializations/external.sql +++ b/dbt/include/duckdb/macros/materializations/external.sql @@ -13,7 +13,7 @@ {%- set location = render(config.get('location', default=external_location(this, config))) -%}) -- just a check if the options is a dictionary to stay compielnt but it will be used over config in the plugins {%- set rendered_options = render_write_options(config) -%} - {%- set format = config.get('format', 'parquet') -%} + {%- set format = config.get('format', 'default') -%} {%- set plugin_name = config.get('plugin', 'native') -%} {% do store_relation(plugin_name, target_relation, location, format, config) %} @@ -26,6 +26,6 @@ {{ run_hooks(post_hooks, inside_transaction=False) }} - {{ return({'relations': [target_relation]}) }} -- to what i return? + {{ return({'relations': [target_relation]}) }} {% endmaterialization %} diff --git a/dbt/include/duckdb/macros/utils/external_location.sql b/dbt/include/duckdb/macros/utils/external_location.sql index c81921ad..c7008234 100644 --- a/dbt/include/duckdb/macros/utils/external_location.sql +++ b/dbt/include/duckdb/macros/utils/external_location.sql @@ -1,8 +1,3 @@ {%- macro external_location(relation, config) -%} - {%- if config.get('options', {}).get('partition_by') is none -%} - {%- set format = config.get('format', 'parquet') -%} - {{- adapter.external_root() }}/{{ relation.identifier }}.{{ format }} - {%- else -%} - {{- adapter.external_root() }}/{{ relation.identifier }} - {%- endif -%} + {{- adapter.external_root() }}/{{ relation.identifier }} {%- endmacro -%} diff --git a/tests/functional/plugins/test_excel.py b/tests/functional/plugins/test_excel.py index 97fd6236..f4adc989 100644 --- a/tests/functional/plugins/test_excel.py +++ b/tests/functional/plugins/test_excel.py @@ -20,14 +20,13 @@ external_location: "{test_data_path}/excel_file.xlsx" """ +#Question why is file here defined? this is not a source this is config? plugins = [ { "module": "excel", "config": { "output": { - "engine": "openpyxl", - "file": "/tmp/excel_file_out.xlsx", - "lazy_close": False + "engine": "openpyxl" } } }, @@ -69,7 +68,7 @@ def test_excel_plugin(self, project): res = project.run_sql("SELECT COUNT(1) FROM excel_file", fetch="one") assert res[0] == 9 - df = pandas.read_excel('/tmp/excel_file_out.xlsx') + df = pandas.read_excel('./excel_read_write.xlsx') assert df.shape[0] == 9 assert df['First Name'].iloc[0] == 'Dulce' @@ -82,3 +81,4 @@ def test_excel_plugin(self, project): ) +#TODO write more tests \ No newline at end of file From 405b9291cb7b19d1d162d260e5e3b4343555eb62 Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Sun, 25 Feb 2024 09:14:22 +0000 Subject: [PATCH 06/13] add sqlalchemy; adapt flow --- dbt/adapters/duckdb/environments/local.py | 4 +- dbt/adapters/duckdb/plugins/__init__.py | 4 +- dbt/adapters/duckdb/plugins/delta.py | 7 ++-- dbt/adapters/duckdb/plugins/excel.py | 2 +- dbt/adapters/duckdb/plugins/glue.py | 27 ++++++------ dbt/adapters/duckdb/plugins/gsheet.py | 9 ++-- dbt/adapters/duckdb/plugins/iceberg.py | 7 ++-- dbt/adapters/duckdb/plugins/native.py | 10 ++--- dbt/adapters/duckdb/plugins/sqlalchemy.py | 41 +++++++++++++------ tests/create_function_plugin.py | 25 +++++++++-- .../functional/adapter/test_rematerialize.py | 6 ++- .../functional/adapter/test_write_options.py | 5 +++ tests/functional/plugins/test_excel.py | 5 ++- tests/functional/plugins/test_plugins.py | 19 +++++---- 14 files changed, 108 insertions(+), 63 deletions(-) diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index c182efd8..54447f87 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -66,7 +66,7 @@ def handle(self): self.handle_count += 1 cursor = self.initialize_cursor( - self.creds, self.conn.cursor(), self._plugins, self._REGISTERED_DF + self.creds, self.conn.cursor(), self._plugins, self._REGISTERED_DF.copy() ) return DuckDBConnectionWrapper(cursor, self) @@ -109,7 +109,7 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): else: # Nothing to do (we ignore the existing table) return - df = plugin.load(source_config) + df = plugin.load(source_config, cursor) assert df is not None materialization = source_config.meta.get( diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index 770f096f..a2bfb949 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -107,8 +107,8 @@ def configure_connection(self, conn: DuckDBPyConnection): :param conn: A DuckDBPyConnection instance to be configured. """ pass - - def load(self, source_config: SourceConfig): + #coursor is needed for the native plugin + def load(self, source_config: SourceConfig, coursor = None): """ Load data from a source config and return it as a DataFrame-like object that DuckDB can read. This method should be overridden by subclasses that diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py index c6b0aa2a..4387426c 100644 --- a/dbt/adapters/duckdb/plugins/delta.py +++ b/dbt/adapters/duckdb/plugins/delta.py @@ -1,10 +1,9 @@ -from typing import Any -from typing import Dict +from typing import Any, Dict from deltalake import DeltaTable -from . import BasePlugin from ..utils import SourceConfig +from . import BasePlugin class Plugin(BasePlugin): @@ -14,7 +13,7 @@ def initialize(self, config: Dict[str, Any]): def configure_cursor(self, cursor): pass - def load(self, source_config: SourceConfig): + def load(self, source_config: SourceConfig, coursor = None): if "delta_table_path" not in source_config: raise Exception("'delta_table_path' is a required argument for the delta table!") diff --git a/dbt/adapters/duckdb/plugins/excel.py b/dbt/adapters/duckdb/plugins/excel.py index 05d20d9d..ab2c8734 100644 --- a/dbt/adapters/duckdb/plugins/excel.py +++ b/dbt/adapters/duckdb/plugins/excel.py @@ -25,7 +25,7 @@ def initialize(self, plugin_config: Dict[str, Any]): if "s3_region" in plugin_config: os.environ["AWS_DEFAULT_REGION"] = plugin_config["s3_region"] - def load(self, source_config: SourceConfig): + def load(self, source_config: SourceConfig, coursor = None): ext_location = source_config["external_location"] ext_location = ext_location.format(**source_config.as_dict()) if "s3" in ext_location: diff --git a/dbt/adapters/duckdb/plugins/glue.py b/dbt/adapters/duckdb/plugins/glue.py index e74787d2..5ddc8e05 100644 --- a/dbt/adapters/duckdb/plugins/glue.py +++ b/dbt/adapters/duckdb/plugins/glue.py @@ -1,23 +1,22 @@ -from typing import Any -from typing import Dict -from typing import List -from typing import Optional -from typing import Sequence +from typing import Any, Dict, List, Optional, Sequence import boto3 from duckdb import DuckDBPyRelation from mypy_boto3_glue import GlueClient -from mypy_boto3_glue.type_defs import ColumnTypeDef -from mypy_boto3_glue.type_defs import GetTableResponseTypeDef -from mypy_boto3_glue.type_defs import PartitionInputTypeDef -from mypy_boto3_glue.type_defs import SerDeInfoTypeDef -from mypy_boto3_glue.type_defs import StorageDescriptorTypeDef -from mypy_boto3_glue.type_defs import TableInputTypeDef +from mypy_boto3_glue.type_defs import ( + ColumnTypeDef, + GetTableResponseTypeDef, + PartitionInputTypeDef, + SerDeInfoTypeDef, + StorageDescriptorTypeDef, + TableInputTypeDef, +) -from . import BasePlugin -from ..utils import TargetConfig from dbt.adapters.base.column import Column +from ..utils import TargetConfig +from . import BasePlugin + class UnsupportedFormatType(Exception): """UnsupportedFormatType exception.""" @@ -332,7 +331,7 @@ def initialize(self, config: Dict[str, Any]): self.database = config.get("glue_database", "default") self.delimiter = config.get("delimiter", ",") - def store(self, df: DuckDBPyRelation, target_config: TargetConfig): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None): assert target_config.location is not None assert target_config.relation.identifier is not None table: str = target_config.relation.identifier diff --git a/dbt/adapters/duckdb/plugins/gsheet.py b/dbt/adapters/duckdb/plugins/gsheet.py index 9b8e8db9..150d2f83 100644 --- a/dbt/adapters/duckdb/plugins/gsheet.py +++ b/dbt/adapters/duckdb/plugins/gsheet.py @@ -1,14 +1,11 @@ from dataclasses import dataclass -from typing import Any -from typing import Dict -from typing import Literal +from typing import Any, Dict, Literal import gspread import pandas as pd -from . import BasePlugin -from . import PluginConfig from ..utils import SourceConfig +from . import BasePlugin, PluginConfig @dataclass @@ -27,7 +24,7 @@ def initialize(self, config: Dict[str, Any]): self._config = GSheetConfig.from_dict(config) self._gc = self._config.client() - def load(self, source_config: SourceConfig): + def load(self, source_config: SourceConfig, coursor = None): doc = None if "title" in source_config: doc = self._gc.open(source_config["title"]) diff --git a/dbt/adapters/duckdb/plugins/iceberg.py b/dbt/adapters/duckdb/plugins/iceberg.py index 27208bbd..99460b81 100644 --- a/dbt/adapters/duckdb/plugins/iceberg.py +++ b/dbt/adapters/duckdb/plugins/iceberg.py @@ -1,10 +1,9 @@ -from typing import Any -from typing import Dict +from typing import Any, Dict import pyiceberg.catalog -from . import BasePlugin from ..utils import SourceConfig +from . import BasePlugin class Plugin(BasePlugin): @@ -14,7 +13,7 @@ def initialize(self, config: Dict[str, Any]): catalog = config.pop("catalog") self._catalog = pyiceberg.catalog.load_catalog(catalog, **config) - def load(self, source_config: SourceConfig): + def load(self, source_config: SourceConfig, coursor = None): table_format = source_config.get("iceberg_table", "{schema}.{identifier}") table_name = table_format.format(**source_config.as_dict()) table = self._catalog.load_table(table_name) diff --git a/dbt/adapters/duckdb/plugins/native.py b/dbt/adapters/duckdb/plugins/native.py index 5ad67a43..619abaa9 100644 --- a/dbt/adapters/duckdb/plugins/native.py +++ b/dbt/adapters/duckdb/plugins/native.py @@ -21,10 +21,11 @@ def configure_cursor(self, cursor): def default_materialization(self): return "view" - def load(self, source_config: SourceConfig): +#this one can be better not to go over some other format and df but directly + def load(self, source_config: SourceConfig, coursor = None): location = external_read_location(source_config.meta.get("location").get("path"), source_config.meta.get("config").get("options", {})) - return duckdb.sql(f"SELECT * FROM '{location}'").df() + return coursor.sql(f"SELECT * FROM '{location}'").arrow() def can_be_upstream_referenced(self): return True @@ -40,12 +41,9 @@ def create_source_config(self, target_config: TargetConfig) -> SourceConfig: ) return source_config - def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None): location = target_config.location.path - options = external_write_options(location, target_config.config.get("options", {})) - #pwd = os.getcwd() # to find out where it saves -> cd '/tmp/pytest-of-vscode/pytest-18/project0' - #print(pwd) cursor.sql(f"COPY (SELECT * FROM df) to '{location}' ({options})") def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig: diff --git a/dbt/adapters/duckdb/plugins/sqlalchemy.py b/dbt/adapters/duckdb/plugins/sqlalchemy.py index 2864a210..59c80992 100644 --- a/dbt/adapters/duckdb/plugins/sqlalchemy.py +++ b/dbt/adapters/duckdb/plugins/sqlalchemy.py @@ -1,22 +1,21 @@ -from typing import Any -from typing import Dict -from duckdb import DuckDBPyRelation +from typing import Any, Dict import pandas as pd -from sqlalchemy import create_engine -from sqlalchemy import text +from duckdb import DuckDBPyRelation +from sqlalchemy import create_engine, text +from ..utils import SourceConfig, TargetConfig from . import BasePlugin -from . import pd_utils -from ..utils import SourceConfig -from ..utils import TargetConfig +# overall i would recommend to use attach in duckdb but this can be nice for some cases where +# native one is not supported e.g starrock +# here we have to requestion the names of the tables class Plugin(BasePlugin): def initialize(self, plugin_config: Dict[str, Any]): self.engine = create_engine(plugin_config.pop("connection_url"), **plugin_config) - def load(self, source_config: SourceConfig) -> pd.DataFrame: + def load(self, source_config: SourceConfig, coursor = None): if "query" in source_config: query = source_config["query"] query = query.format(**source_config.as_dict()) @@ -24,6 +23,7 @@ def load(self, source_config: SourceConfig) -> pd.DataFrame: with self.engine.connect() as conn: return pd.read_sql_query(text(query), con=conn, params=params) else: + # we should question this? what is the use case? if "table" in source_config: table = source_config["table"] else: @@ -31,12 +31,29 @@ def load(self, source_config: SourceConfig) -> pd.DataFrame: with self.engine.connect() as conn: return pd.read_sql_table(table, con=conn) - def store(self, df: DuckDBPyRelation, target_config: TargetConfig): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None): # first, load the data frame from the external location - df = pd_utils.target_to_df(target_config) + pd_df = df.df() table_name = target_config.relation.identifier # then, write it to the database - df.to_sql(table_name, self.engine, if_exists="replace", index=False) + pd_df.to_sql(table_name, self.engine, if_exists="replace", index=False) + + def can_be_upstream_referenced(self): + return True + + def create_source_config(self, target_config: TargetConfig) -> SourceConfig: + meta = { + "table": target_config.relation.identifier + } + source_config = SourceConfig( + name= target_config.relation.name, + identifier= target_config.relation.identifier, + schema=target_config.relation.schema, + database=target_config.relation.database, + meta= meta, + tags= [], + ) + return source_config def __del__(self): self.engine.dispose() diff --git a/tests/create_function_plugin.py b/tests/create_function_plugin.py index c4d52d6d..0b99b2de 100644 --- a/tests/create_function_plugin.py +++ b/tests/create_function_plugin.py @@ -1,7 +1,8 @@ -from duckdb import DuckDBPyConnection +import duckdb +from duckdb import DuckDBPyConnection, DuckDBPyRelation from dbt.adapters.duckdb.plugins import BasePlugin -from dbt.adapters.duckdb.utils import TargetConfig +from dbt.adapters.duckdb.utils import SourceConfig, TargetConfig def foo() -> int: @@ -12,5 +13,23 @@ class Plugin(BasePlugin): def configure_connection(self, conn: DuckDBPyConnection): conn.create_function("foo", foo) - def store(self, target_config: TargetConfig): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None): assert target_config.config.get("key") == "value" + + def can_be_upstream_referenced(self): + return True + + def load(self, source_config: SourceConfig, coursor = None): + + return duckdb.sql("SELECT 1729 as foo").arrow() + + def create_source_config(self, target_config: TargetConfig) -> SourceConfig: + source_config = SourceConfig( + name= target_config.relation.name, + identifier= target_config.relation.identifier, + schema=target_config.relation.schema, + database=target_config.relation.database, + meta= target_config.as_dict(), + tags= [], + ) + return source_config \ No newline at end of file diff --git a/tests/functional/adapter/test_rematerialize.py b/tests/functional/adapter/test_rematerialize.py index 4f821df2..5300f7c2 100644 --- a/tests/functional/adapter/test_rematerialize.py +++ b/tests/functional/adapter/test_rematerialize.py @@ -1,7 +1,9 @@ import os + import pytest -from dbt.tests.util import run_dbt, relation_from_name + from dbt.adapters.duckdb import DuckDBConnectionManager +from dbt.tests.util import relation_from_name, run_dbt upstream_model_sql = """ select range from range(3) @@ -40,6 +42,8 @@ def dbt_profile_target(self, dbt_profile_target, tmp_path_factory): extroot = str(tmp_path_factory.getbasetemp() / "rematerialize") os.mkdir(extroot) dbt_profile_target["external_root"] = extroot + dbt_profile_target["plugins"] = [{"module": "native"}] + return dbt_profile_target @pytest.fixture(scope="class") diff --git a/tests/functional/adapter/test_write_options.py b/tests/functional/adapter/test_write_options.py index 4939ffc3..33d0a166 100644 --- a/tests/functional/adapter/test_write_options.py +++ b/tests/functional/adapter/test_write_options.py @@ -1,5 +1,7 @@ import os + import pytest + from dbt.tests.adapter.basic.files import ( base_table_sql, model_base, @@ -43,6 +45,9 @@ def dbt_profile_target(self, dbt_profile_target, tmp_path_factory): extroot = str(tmp_path_factory.getbasetemp() / "write_options") os.mkdir(extroot) dbt_profile_target["external_root"] = extroot + ##todo add native but delete after setted to default + dbt_profile_target["plugins"] = [{"module": "native"}] + #dbt_profile_target["threads"] = 5 return dbt_profile_target @pytest.fixture(scope="class") diff --git a/tests/functional/plugins/test_excel.py b/tests/functional/plugins/test_excel.py index f4adc989..97e5c34d 100644 --- a/tests/functional/plugins/test_excel.py +++ b/tests/functional/plugins/test_excel.py @@ -81,4 +81,7 @@ def test_excel_plugin(self, project): ) -#TODO write more tests \ No newline at end of file +#TODO write more tests + +#TODO here can be the problem that i deleted some file output from the config +#which doesnt makes so much sense but is a breaking change \ No newline at end of file diff --git a/tests/functional/plugins/test_plugins.py b/tests/functional/plugins/test_plugins.py index 124ee178..906c85d4 100644 --- a/tests/functional/plugins/test_plugins.py +++ b/tests/functional/plugins/test_plugins.py @@ -1,6 +1,8 @@ import os -import pytest import sqlite3 +from pathlib import Path + +import pytest from dbt.tests.util import ( check_relations_equal, @@ -44,8 +46,9 @@ @pytest.mark.skip_profile("buenavista", "md") class TestPlugins: @pytest.fixture(scope="class") - def sqlite_test_db(self): - path = "/tmp/satest.db" + def sqlite_test_db(self,project_root): + path = Path(project_root) + path = path / "satest.db" db = sqlite3.connect(path) cursor = db.cursor() cursor.execute("CREATE TABLE tt1 (id int, name text)") @@ -69,8 +72,6 @@ def sqlite_test_db(self): cursor.close() db.close() - os.unlink(path) - @pytest.fixture(scope="class") def profiles_config_update(self, dbt_profile_target, sqlite_test_db): sa_config = {"connection_url": f"sqlite:///{sqlite_test_db}"} @@ -125,6 +126,10 @@ def test_plugins(self, project): "sqlalchemy2", ], ) + #this does not work in the new scenario because we dont store anything + #we have to write a new test there + #res = project.run_sql("SELECT foo FROM foo", fetch="one") + #assert res[0] == 1729 - res = project.run_sql("SELECT foo FROM foo", fetch="one") - assert res[0] == 1729 +## TODO add some more tests +## separate plugin extendability and sqlachlemy? \ No newline at end of file From e2685a108188ecb1d14a4e0780cb9dfaa68fd71b Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Sun, 25 Feb 2024 13:28:31 +0000 Subject: [PATCH 07/13] add stack overflow question for native --- dbt/adapters/duckdb/environments/local.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index 54447f87..dabd52c1 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -122,7 +122,8 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): if materialization == "view": # save to df instance to register on each cursor creation - self._REGISTERED_DF[df_name] = df + with self.lock: + self._REGISTERED_DF[df_name] = df cursor.execute( f"CREATE OR REPLACE {materialization} {source_table_name} AS SELECT * FROM {df_name}" From f93a1268bf271dc6d6d8b7858cbcd3500582d7e2 Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Sun, 25 Feb 2024 17:29:30 +0000 Subject: [PATCH 08/13] add upstream model; start pre-commit --- dbt/adapters/duckdb/environments/__init__.py | 16 +++-- .../duckdb/environments/buenavista.py | 4 +- dbt/adapters/duckdb/environments/local.py | 37 +++++----- dbt/adapters/duckdb/impl.py | 3 +- dbt/adapters/duckdb/plugins/__init__.py | 28 ++++---- dbt/adapters/duckdb/plugins/delta.py | 7 +- dbt/adapters/duckdb/plugins/excel.py | 48 ++++++------- dbt/adapters/duckdb/plugins/glue.py | 27 ++++---- dbt/adapters/duckdb/plugins/gsheet.py | 9 ++- dbt/adapters/duckdb/plugins/iceberg.py | 7 +- dbt/adapters/duckdb/plugins/native.py | 54 ++++++++------- dbt/adapters/duckdb/plugins/postgres.py | 7 +- dbt/adapters/duckdb/plugins/sqlalchemy.py | 38 ++++++----- dbt/include/duckdb/macros/adapters.sql | 4 +- .../macros/materializations/external.sql | 7 +- dbt/include/duckdb/macros/utils/upstream.sql | 67 ++++++++++--------- .../duckdb/macros/utils/upstream_copy.sql | 31 +++++++++ .../functional/adapter/test_rematerialize.py | 8 ++- tests/functional/plugins/test_native.py | 3 +- tests/functional/plugins/test_sqlite.py | 14 ++-- 20 files changed, 243 insertions(+), 176 deletions(-) create mode 100644 dbt/include/duckdb/macros/utils/upstream_copy.sql diff --git a/dbt/adapters/duckdb/environments/__init__.py b/dbt/adapters/duckdb/environments/__init__.py index 8d85ae5e..ebe20d12 100644 --- a/dbt/adapters/duckdb/environments/__init__.py +++ b/dbt/adapters/duckdb/environments/__init__.py @@ -4,16 +4,18 @@ import sys import tempfile import time -from typing import Dict, List, Optional +from typing import Dict +from typing import List +from typing import Optional import duckdb -from dbt.contracts.connection import AdapterResponse -from dbt.exceptions import DbtRuntimeError - from ..credentials import DuckDBCredentials from ..plugins import BasePlugin -from ..utils import SourceConfig, TargetConfig +from ..utils import SourceConfig +from ..utils import TargetConfig +from dbt.contracts.connection import AdapterResponse +from dbt.exceptions import DbtRuntimeError def _ensure_event_loop(): @@ -101,7 +103,9 @@ def load_source(self, plugin_name: str, source_config: SourceConfig) -> str: pass @abc.abstractmethod - def store_relation(self, plugin_name: str, target_config: TargetConfig) -> None: + def store_relation( + self, plugin_name: str, target_config: TargetConfig, just_register: bool = False + ) -> None: pass def get_binding_char(self) -> str: diff --git a/dbt/adapters/duckdb/environments/buenavista.py b/dbt/adapters/duckdb/environments/buenavista.py index b41aaf80..d490b5c4 100644 --- a/dbt/adapters/duckdb/environments/buenavista.py +++ b/dbt/adapters/duckdb/environments/buenavista.py @@ -61,7 +61,9 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): cursor.close() handle.close() - def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> None: + def store_relation( + self, plugin_name: str, target_config: utils.TargetConfig, just_register: bool = False + ) -> None: handle = self.handle() payload = { "method": "dbt_store_relation", diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index dabd52c1..88a83b6d 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -1,11 +1,11 @@ import threading +from . import Environment +from .. import credentials +from .. import utils from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError -from .. import credentials, utils -from . import Environment - class DuckDBCursorWrapper: def __init__(self, cursor): @@ -24,7 +24,7 @@ def execute(self, sql, bindings=None): except RuntimeError as e: raise DbtRuntimeError(str(e)) - + class DuckDBConnectionWrapper: def __init__(self, cursor, env): self._cursor = DuckDBCursorWrapper(cursor) @@ -132,7 +132,9 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): cursor.close() handle.close() - def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> None: + def store_relation( + self, plugin_name: str, target_config: utils.TargetConfig, just_register: bool = False + ) -> None: # some plugin have to be initialized on the fly? glue for example? if plugin_name not in self._plugins: @@ -141,24 +143,25 @@ def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> + ",".join(self._plugins.keys()) ) plugin = self._plugins[plugin_name] - - #e.g add file format to the location + + # e.g add file format to the location target_config = plugin.adapt_target_config(target_config) + if not just_register: + # export data with the store model + handle = self.handle() + cursor = handle.cursor() - #export data with the store model - handle = self.handle() - cursor = handle.cursor() - - df = cursor.sql(target_config.config.model.compiled_code) - #hand over Duckdb format that each plugin can choose which type of integration to use - plugin.store(df, target_config, cursor) + df = cursor.sql(target_config.config.model.compiled_code) + # hand over Duckdb format that each plugin can choose which type of integration to use - cursor.close() - handle.close() + plugin.store(df, target_config, cursor) + + cursor.close() + handle.close() # all are by default false, has to be turned on per plugin if plugin.can_be_upstream_referenced(): - #create df and view which can be referenced in the run following run + # create df and view which can be referenced in the run following run source_config = plugin.create_source_config(target_config) self.load_source(plugin_name, source_config) diff --git a/dbt/adapters/duckdb/impl.py b/dbt/adapters/duckdb/impl.py index 47596c9a..6d9ab959 100644 --- a/dbt/adapters/duckdb/impl.py +++ b/dbt/adapters/duckdb/impl.py @@ -100,6 +100,7 @@ def store_relation( path: str, format: str, config: RuntimeConfigObject, + just_register: bool, ) -> None: target_config = TargetConfig( relation=relation, @@ -107,7 +108,7 @@ def store_relation( config=config, location=TargetLocation(path=path, format=format), ) - DuckDBConnectionManager.env().store_relation(plugin_name, target_config) + DuckDBConnectionManager.env().store_relation(plugin_name, target_config, just_register) @available def external_root(self) -> str: diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index a2bfb949..93e667c5 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -1,13 +1,16 @@ import importlib import os -from typing import Any, Dict, Optional +from typing import Any +from typing import Dict +from typing import Optional -from duckdb import DuckDBPyConnection, DuckDBPyRelation - -from dbt.dataclass_schema import dbtClassMixin +from duckdb import DuckDBPyConnection +from duckdb import DuckDBPyRelation from ..credentials import DuckDBCredentials -from ..utils import SourceConfig, TargetConfig +from ..utils import SourceConfig +from ..utils import TargetConfig +from dbt.dataclass_schema import dbtClassMixin class PluginConfig(dbtClassMixin): @@ -107,8 +110,9 @@ def configure_connection(self, conn: DuckDBPyConnection): :param conn: A DuckDBPyConnection instance to be configured. """ pass - #coursor is needed for the native plugin - def load(self, source_config: SourceConfig, coursor = None): + + # coursor is needed for the native plugin + def load(self, source_config: SourceConfig, coursor=None): """ Load data from a source config and return it as a DataFrame-like object that DuckDB can read. This method should be overridden by subclasses that @@ -118,15 +122,15 @@ def load(self, source_config: SourceConfig, coursor = None): :raises NotImplementedError: If this method is not implemented by a subclass. """ raise NotImplementedError(f"load method not implemented for {self.name}") - - # coursor is needed just for the native, we have to do it better + + # coursor is needed just for the native, we have to do it better # to had it over in some initalization? - def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None): raise NotImplementedError(f"store method not implemented for {self.name}") def create_source_config(self, target_config: TargetConfig) -> SourceConfig: raise NotImplementedError(f"store method not implemented for {self.name}") - + def can_be_upstream_referenced(self): return False @@ -142,6 +146,6 @@ def configure_cursor(self, cursor): def default_materialization(self): return "table" - + def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig: return target_config diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py index 4387426c..b8590efe 100644 --- a/dbt/adapters/duckdb/plugins/delta.py +++ b/dbt/adapters/duckdb/plugins/delta.py @@ -1,9 +1,10 @@ -from typing import Any, Dict +from typing import Any +from typing import Dict from deltalake import DeltaTable -from ..utils import SourceConfig from . import BasePlugin +from ..utils import SourceConfig class Plugin(BasePlugin): @@ -13,7 +14,7 @@ def initialize(self, config: Dict[str, Any]): def configure_cursor(self, cursor): pass - def load(self, source_config: SourceConfig, coursor = None): + def load(self, source_config: SourceConfig, coursor=None): if "delta_table_path" not in source_config: raise Exception("'delta_table_path' is a required argument for the delta table!") diff --git a/dbt/adapters/duckdb/plugins/excel.py b/dbt/adapters/duckdb/plugins/excel.py index ab2c8734..e59f2080 100644 --- a/dbt/adapters/duckdb/plugins/excel.py +++ b/dbt/adapters/duckdb/plugins/excel.py @@ -1,16 +1,15 @@ import os import pathlib -from threading import Lock -from typing import Any, Dict +from typing import Any +from typing import Dict import pandas as pd from duckdb import DuckDBPyRelation from pandas.io.formats import excel -from dbt.logger import GLOBAL_LOGGER as logger - -from ..utils import SourceConfig, TargetConfig from . import BasePlugin +from ..utils import SourceConfig +from ..utils import TargetConfig class Plugin(BasePlugin): @@ -25,7 +24,7 @@ def initialize(self, plugin_config: Dict[str, Any]): if "s3_region" in plugin_config: os.environ["AWS_DEFAULT_REGION"] = plugin_config["s3_region"] - def load(self, source_config: SourceConfig, coursor = None): + def load(self, source_config: SourceConfig, coursor=None): ext_location = source_config["external_location"] ext_location = ext_location.format(**source_config.as_dict()) if "s3" in ext_location: @@ -36,10 +35,10 @@ def load(self, source_config: SourceConfig, coursor = None): sheet_name = source_config.get("sheet_name", 0) return pd.read_excel(source_location, sheet_name=sheet_name) - def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None): plugin_output_config = self._config["output"] - #this writer doesnt take location but always something defined in the profile? + # this writer doesnt take location but always something defined in the profile? _excel_writer = pd.ExcelWriter( target_config.location.path, mode=plugin_output_config.get("mode", "w"), @@ -61,11 +60,11 @@ def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None sheet_name = (target_config.relation.identifier or "Sheet1")[0:31] target_output_config["sheet_name"] = sheet_name - df = df.df() # duckdb model to pandas dataframe + pd_df = df.df() # duckdb model to pandas dataframe if target_output_config.get("skip_empty_sheet", False) and df.shape[0] == 0: return try: - df.to_excel( + pd_df.to_excel( _excel_writer, sheet_name=target_output_config["sheet_name"], na_rep=target_output_config.get("na_rep", ""), @@ -85,40 +84,43 @@ def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None pd.DataFrame( [{"Error": target_output_config.get("ignore_sheet_too_large_error", str(ve))}] ).to_excel( - _excel_writer, sheet_name=target_output_config["sheet_name"], index=False + _excel_writer, + sheet_name=target_output_config["sheet_name"], + index=False, ) else: raise ve - + _excel_writer.close() def create_source_config(self, target_config: TargetConfig) -> SourceConfig: # in the reader we have just location and sheet_name, maybe we can add here more options - # but in the first place i would not recommend to upstream excel file + # but in the first place i would not recommend to upstream excel file # this works for a very simple case but not all of them meta = { "external_location": target_config.location.path, - "sheet_name": target_config.config.get("sheet_name",0) + "sheet_name": target_config.config.get("sheet_name", 0), } source_config = SourceConfig( - name= target_config.relation.name, - identifier= target_config.relation.identifier, + name=target_config.relation.name, + identifier=target_config.relation.identifier, schema=target_config.relation.schema, database=target_config.relation.database, - meta= meta, - tags= [], + meta=meta, + tags=[], ) return source_config - def can_be_upstream_referenced(self): return True - + def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig: - if target_config.location.format == "default": - target_config.location.format = "xlsx" + if target_config.location.format == "default": + target_config.location.format = "xlsx" - target_config.location.path = target_config.location.path + "." + target_config.location.format + target_config.location.path = ( + target_config.location.path + "." + target_config.location.format + ) return target_config diff --git a/dbt/adapters/duckdb/plugins/glue.py b/dbt/adapters/duckdb/plugins/glue.py index 5ddc8e05..64c33ca9 100644 --- a/dbt/adapters/duckdb/plugins/glue.py +++ b/dbt/adapters/duckdb/plugins/glue.py @@ -1,21 +1,22 @@ -from typing import Any, Dict, List, Optional, Sequence +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import Sequence import boto3 from duckdb import DuckDBPyRelation from mypy_boto3_glue import GlueClient -from mypy_boto3_glue.type_defs import ( - ColumnTypeDef, - GetTableResponseTypeDef, - PartitionInputTypeDef, - SerDeInfoTypeDef, - StorageDescriptorTypeDef, - TableInputTypeDef, -) +from mypy_boto3_glue.type_defs import ColumnTypeDef +from mypy_boto3_glue.type_defs import GetTableResponseTypeDef +from mypy_boto3_glue.type_defs import PartitionInputTypeDef +from mypy_boto3_glue.type_defs import SerDeInfoTypeDef +from mypy_boto3_glue.type_defs import StorageDescriptorTypeDef +from mypy_boto3_glue.type_defs import TableInputTypeDef -from dbt.adapters.base.column import Column - -from ..utils import TargetConfig from . import BasePlugin +from ..utils import TargetConfig +from dbt.adapters.base.column import Column class UnsupportedFormatType(Exception): @@ -331,7 +332,7 @@ def initialize(self, config: Dict[str, Any]): self.database = config.get("glue_database", "default") self.delimiter = config.get("delimiter", ",") - def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None): assert target_config.location is not None assert target_config.relation.identifier is not None table: str = target_config.relation.identifier diff --git a/dbt/adapters/duckdb/plugins/gsheet.py b/dbt/adapters/duckdb/plugins/gsheet.py index 150d2f83..2df7ca4c 100644 --- a/dbt/adapters/duckdb/plugins/gsheet.py +++ b/dbt/adapters/duckdb/plugins/gsheet.py @@ -1,11 +1,14 @@ from dataclasses import dataclass -from typing import Any, Dict, Literal +from typing import Any +from typing import Dict +from typing import Literal import gspread import pandas as pd +from . import BasePlugin +from . import PluginConfig from ..utils import SourceConfig -from . import BasePlugin, PluginConfig @dataclass @@ -24,7 +27,7 @@ def initialize(self, config: Dict[str, Any]): self._config = GSheetConfig.from_dict(config) self._gc = self._config.client() - def load(self, source_config: SourceConfig, coursor = None): + def load(self, source_config: SourceConfig, coursor=None): doc = None if "title" in source_config: doc = self._gc.open(source_config["title"]) diff --git a/dbt/adapters/duckdb/plugins/iceberg.py b/dbt/adapters/duckdb/plugins/iceberg.py index 99460b81..e2cb33f2 100644 --- a/dbt/adapters/duckdb/plugins/iceberg.py +++ b/dbt/adapters/duckdb/plugins/iceberg.py @@ -1,9 +1,10 @@ -from typing import Any, Dict +from typing import Any +from typing import Dict import pyiceberg.catalog -from ..utils import SourceConfig from . import BasePlugin +from ..utils import SourceConfig class Plugin(BasePlugin): @@ -13,7 +14,7 @@ def initialize(self, config: Dict[str, Any]): catalog = config.pop("catalog") self._catalog = pyiceberg.catalog.load_catalog(catalog, **config) - def load(self, source_config: SourceConfig, coursor = None): + def load(self, source_config: SourceConfig, coursor=None): table_format = source_config.get("iceberg_table", "{schema}.{identifier}") table_name = table_format.format(**source_config.as_dict()) table = self._catalog.load_table(table_name) diff --git a/dbt/adapters/duckdb/plugins/native.py b/dbt/adapters/duckdb/plugins/native.py index 619abaa9..908ccba4 100644 --- a/dbt/adapters/duckdb/plugins/native.py +++ b/dbt/adapters/duckdb/plugins/native.py @@ -1,62 +1,69 @@ import os -from typing import Any, Dict +from typing import Any +from typing import Dict -import duckdb from duckdb import DuckDBPyRelation -from ..utils import SourceConfig, TargetConfig from . import BasePlugin +from ..utils import SourceConfig +from ..utils import TargetConfig -# here will be parquet,csv,json implementation, +# here will be parquet,csv,json implementation, # this plugin should be default one if none is specified # we can change the name of the plugin + class Plugin(BasePlugin): def initialize(self, config: Dict[str, Any]): pass def configure_cursor(self, cursor): pass - + def default_materialization(self): return "view" - -#this one can be better not to go over some other format and df but directly - def load(self, source_config: SourceConfig, coursor = None): - location = external_read_location(source_config.meta.get("location").get("path"), - source_config.meta.get("config").get("options", {})) + + # this one can be better not to go over some other format and df but directly + # https://stackoverflow.com/questions/78055585/how-to-reference-duckdbpyrelation-from-another-connection + def load(self, source_config: SourceConfig, coursor=None): + location = external_read_location( + source_config.meta.get("location", "").get("path"), + source_config.meta.get("config", {}).get("options", {}), + ) return coursor.sql(f"SELECT * FROM '{location}'").arrow() def can_be_upstream_referenced(self): return True - + def create_source_config(self, target_config: TargetConfig) -> SourceConfig: source_config = SourceConfig( - name= target_config.relation.name, - identifier= target_config.relation.identifier, + name=target_config.relation.name, + identifier=target_config.relation.identifier, schema=target_config.relation.schema, database=target_config.relation.database, - meta= target_config.as_dict(), - tags= [], + meta=target_config.as_dict(), + tags=[], ) return source_config - - def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None): + + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None): location = target_config.location.path options = external_write_options(location, target_config.config.get("options", {})) cursor.sql(f"COPY (SELECT * FROM df) to '{location}' ({options})") def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig: - #setup the location with default to parquet if not partitions_by - if target_config.location.format == "default": - target_config.location.format = "parquet" + # setup the location with default to parquet if not partitions_by + if target_config.location.format == "default": + target_config.location.format = "parquet" if "partition_by" not in target_config.config.get("options", {}): - target_config.location.path = target_config.location.path + "." + target_config.location.format + target_config.location.path = ( + target_config.location.path + "." + target_config.location.format + ) return target_config -# 1 to 1 from adapter +# 1 to 1 from adapter # TODO those can be maybe better written def external_write_options(write_location: str, rendered_options: dict) -> str: if "format" not in rendered_options: @@ -90,7 +97,8 @@ def external_write_options(write_location: str, rendered_options: dict) -> str: ret.append(f"{k} {v}") return ", ".join(ret) -# 1 to 1 from adapter + +# 1 to 1 from adapter def external_read_location(write_location: str, rendered_options: dict) -> str: if rendered_options.get("partition_by"): globs = [write_location, "*"] diff --git a/dbt/adapters/duckdb/plugins/postgres.py b/dbt/adapters/duckdb/plugins/postgres.py index e12c79d3..f282d0fb 100644 --- a/dbt/adapters/duckdb/plugins/postgres.py +++ b/dbt/adapters/duckdb/plugins/postgres.py @@ -1,12 +1,13 @@ -from typing import Any, Dict +from typing import Any +from typing import Dict from duckdb import DuckDBPyConnection from . import BasePlugin -## This is maybe what we can deprecate? Native way in the config -#is better or there is some need? +## This is maybe what we can deprecate? Native way in the config +# is better or there is some need? class Plugin(BasePlugin): def initialize(self, config: Dict[str, Any]): self._dsn = config.get("dsn") diff --git a/dbt/adapters/duckdb/plugins/sqlalchemy.py b/dbt/adapters/duckdb/plugins/sqlalchemy.py index 59c80992..9273bc08 100644 --- a/dbt/adapters/duckdb/plugins/sqlalchemy.py +++ b/dbt/adapters/duckdb/plugins/sqlalchemy.py @@ -1,21 +1,25 @@ -from typing import Any, Dict +from typing import Any +from typing import Dict import pandas as pd from duckdb import DuckDBPyRelation -from sqlalchemy import create_engine, text +from sqlalchemy import create_engine +from sqlalchemy import text -from ..utils import SourceConfig, TargetConfig from . import BasePlugin +from ..utils import SourceConfig +from ..utils import TargetConfig -# overall i would recommend to use attach in duckdb but this can be nice for some cases where -# native one is not supported e.g starrock +# overall i would recommend to use attach in duckdb but this can be nice for some cases where +# native one is not supported e.g starrock -# here we have to requestion the names of the tables + +# here we have to requestion the names of the tables class Plugin(BasePlugin): def initialize(self, plugin_config: Dict[str, Any]): self.engine = create_engine(plugin_config.pop("connection_url"), **plugin_config) - def load(self, source_config: SourceConfig, coursor = None): + def load(self, source_config: SourceConfig, coursor=None): if "query" in source_config: query = source_config["query"] query = query.format(**source_config.as_dict()) @@ -31,7 +35,7 @@ def load(self, source_config: SourceConfig, coursor = None): with self.engine.connect() as conn: return pd.read_sql_table(table, con=conn) - def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None): # first, load the data frame from the external location pd_df = df.df() table_name = target_config.relation.identifier @@ -40,18 +44,16 @@ def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None def can_be_upstream_referenced(self): return True - + def create_source_config(self, target_config: TargetConfig) -> SourceConfig: - meta = { - "table": target_config.relation.identifier - } + meta = {"table": target_config.relation.identifier} source_config = SourceConfig( - name= target_config.relation.name, - identifier= target_config.relation.identifier, - schema=target_config.relation.schema, - database=target_config.relation.database, - meta= meta, - tags= [], + name=target_config.relation.get("name", ""), + identifier=target_config.relation.get("identifier", ""), + schema=target_config.relation.get("schema", ""), + database=target_config.relation.get("database", ""), + meta=meta, + tags=[], ) return source_config diff --git a/dbt/include/duckdb/macros/adapters.sql b/dbt/include/duckdb/macros/adapters.sql index 34af4346..3d59aef9 100644 --- a/dbt/include/duckdb/macros/adapters.sql +++ b/dbt/include/duckdb/macros/adapters.sql @@ -209,9 +209,9 @@ def materialize(df, con): {%- endcall %} {% endmacro %} -{% macro store_relation(plugin, relation, location, format, config) -%} +{% macro store_relation(plugin, relation, location, format, config, just_register) -%} {%- set column_list = adapter.get_columns_in_relation(relation) -%} - {% do adapter.store_relation(plugin, relation, column_list, location, format, config) %} + {% do adapter.store_relation(plugin, relation, column_list, location, format, config, just_register) %} {% endmacro %} {% macro render_write_options(config) -%} diff --git a/dbt/include/duckdb/macros/materializations/external.sql b/dbt/include/duckdb/macros/materializations/external.sql index ab476a55..61261ecc 100644 --- a/dbt/include/duckdb/macros/materializations/external.sql +++ b/dbt/include/duckdb/macros/materializations/external.sql @@ -16,16 +16,15 @@ {%- set format = config.get('format', 'default') -%} {%- set plugin_name = config.get('plugin', 'native') -%} - {% do store_relation(plugin_name, target_relation, location, format, config) %} - + {% do store_relation(plugin_name, target_relation, location, format, config, False) %} -- in this moment target should exists as a view so we can setup grants or docu {{ run_hooks(post_hooks, inside_transaction=True) }} -- `COMMIT` happens here {{ adapter.commit() }} - + {{ run_hooks(post_hooks, inside_transaction=False) }} - {{ return({'relations': [target_relation]}) }} + {{ return({'relations': [target_relation]}) }} {% endmaterialization %} diff --git a/dbt/include/duckdb/macros/utils/upstream.sql b/dbt/include/duckdb/macros/utils/upstream.sql index edce959b..274137d3 100644 --- a/dbt/include/duckdb/macros/utils/upstream.sql +++ b/dbt/include/duckdb/macros/utils/upstream.sql @@ -1,40 +1,45 @@ --- todo: understand this function --- we call to create all views over the location but this doesnt work for df registration --- so we have to call some adapter function which takes the config and register the df and create view over that +-- todo: this would not work if two external are in a row {%- macro register_upstream_external_models() -%} {% if execute %} {% set upstream_nodes = {} %} -{% set upstream_schemas = {} %} {% for node in selected_resources %} - {% for upstream_node in graph['nodes'][node]['depends_on']['nodes'] %} - {% if upstream_node not in upstream_nodes and upstream_node not in selected_resources %} + {% for upstream_node in graph['nodes'][node]['depends_on']['nodes'] + if upstream_node not in upstream_nodes and upstream_node not in selected_resources + and graph['nodes'].get(upstream_node).resource_type in ('model', 'seed') + and graph['nodes'].get(upstream_node).config.materialized=='external' + %} {% do upstream_nodes.update({upstream_node: None}) %} {% set upstream = graph['nodes'].get(upstream_node) %} - {% if upstream - and upstream.resource_type in ('model', 'seed') - and upstream.config.materialized=='external' - %} - {%- set upstream_rel = api.Relation.create( - database=upstream['database'], - schema=upstream['schema'], - identifier=upstream['alias'] - ) -%} - {%- set location = upstream.config.get('location', external_location(upstream_rel, upstream.config)) -%} - {%- set rendered_options = render_write_options(upstream.config) -%} - {%- set upstream_location = adapter.external_read_location(location, rendered_options) -%} - {% if upstream_rel.schema not in upstream_schemas %} - {% call statement('main', language='sql') -%} - create schema if not exists {{ upstream_rel.schema }} - {%- endcall %} - {% do upstream_schemas.update({upstream_rel.schema: None}) %} - {% endif %} - {% call statement('main', language='sql') -%} - create or replace view {{ upstream_rel }} as ( - select * from '{{ upstream_location }}' - ); - {%- endcall %} - {%- endif %} - {% endif %} + {%- set upstream_rel = api.Relation.create( + database=upstream['database'], + schema=upstream['schema'], + identifier=upstream['alias'] + ) -%} + {%- set rendered_options = render_write_options(config) -%} + {%- set location = upstream.config.get('location', external_location(upstream_rel, upstream.config)) -%} + {%- set format = upstream.config.get('format', 'default') -%} + {%- set plugin_name = upstream.config.get('plugin', 'native') -%} + {% do store_relation(plugin_name, upstream_rel, location, format, upstream.config, True) %} + {% endfor %} + {% for upstream_node in graph['nodes'][node]['depends_on']['nodes'] + if upstream_node not in upstream_nodes and upstream_node not in selected_resources + and graph['nodes'].get(upstream_node) + and graph['nodes'].get(upstream_node).resource_type in ('model', 'seed') + %} + {% set upstream = graph['nodes'].get(upstream_node) %} + {%- set upstream_rel = api.Relation.create( + database=upstream['database'], + schema=upstream['schema'], + identifier=upstream['alias'] + ) -%} + {% call statement('main', language='sql') -%} + create schema if not exists {{ upstream_rel.schema }} + {%- endcall %} + {% call statement('main', language='sql') -%} + create or replace {{upstream.config.materialized }} {{ upstream_rel }} as ( + select * from ({{ upstream.raw_code }}) + ); + {%- endcall %} {% endfor %} {% endfor %} {% do adapter.commit() %} diff --git a/dbt/include/duckdb/macros/utils/upstream_copy.sql b/dbt/include/duckdb/macros/utils/upstream_copy.sql new file mode 100644 index 00000000..ef62a8bf --- /dev/null +++ b/dbt/include/duckdb/macros/utils/upstream_copy.sql @@ -0,0 +1,31 @@ +-- todo: this would not work if two external are in a row +{%- macro register_upstream_external_model_copy() -%} +{% if execute %} +{% set upstream_nodes = {} %} +{% for node in selected_resources %} + {% for upstream_node in graph['nodes'][node]['depends_on']['nodes'] %} + {% if upstream_node not in upstream_nodes and upstream_node not in selected_resources %} + {% do upstream_nodes.update({upstream_node: None}) %} + {% set upstream = graph['nodes'].get(upstream_node) %} + {% if upstream + and upstream.resource_type in ('model', 'seed') + and upstream.config.materialized=='external' + %} + {%- set upstream_rel = api.Relation.create( + database=upstream['database'], + schema=upstream['schema'], + identifier=upstream['alias'] + ) -%} + + {%- set rendered_options = render_write_options(config) -%} + {%- set location = upstream.config.get('location', external_location(upstream_rel, upstream.config)) -%} + {%- set format = upstream.config.get('format', 'default') -%} + {%- set plugin_name = upstream.config.get('plugin', 'native') -%} + {% do store_relation(plugin_name, upstream_rel, location, format, upstream.config, True) %} + {%- endif %} + {% endif %} + {% endfor %} +{% endfor %} +{% do adapter.commit() %} +{% endif %} +{%- endmacro -%} diff --git a/tests/functional/adapter/test_rematerialize.py b/tests/functional/adapter/test_rematerialize.py index 5300f7c2..1e5d6111 100644 --- a/tests/functional/adapter/test_rematerialize.py +++ b/tests/functional/adapter/test_rematerialize.py @@ -45,12 +45,11 @@ def dbt_profile_target(self, dbt_profile_target, tmp_path_factory): dbt_profile_target["plugins"] = [{"module": "native"}] return dbt_profile_target - + @pytest.fixture(scope="class") def project_config_update(self): return { "name": "base", - "models": {"+materialized": "external"}, "on-run-start": ["{{ register_upstream_external_models() }}"], } @@ -74,10 +73,13 @@ def test_run(self, project): "run", "--select", "downstream_model other_downstream_model downstream_of_partition_model", + "-d", ] ) # really makes sure we have created the downstream model relation = relation_from_name(project.adapter, "downstream_of_partition_model") - result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + result = project.run_sql( + f"select count(*) as num_rows from {relation}", fetch="one" + ) assert result[0] == 5 diff --git a/tests/functional/plugins/test_native.py b/tests/functional/plugins/test_native.py index ab3cfaa5..0248df85 100644 --- a/tests/functional/plugins/test_native.py +++ b/tests/functional/plugins/test_native.py @@ -91,7 +91,8 @@ def profiles_config_update(self, dbt_profile_target,tmp_path_factory): "plugins": [ {"module": "native"} ], - "external_root" : f'{extroot}' + "external_root" : f'{extroot}', + "threads" : 8 } }, "target": "dev", diff --git a/tests/functional/plugins/test_sqlite.py b/tests/functional/plugins/test_sqlite.py index 2ce1c626..534a4327 100644 --- a/tests/functional/plugins/test_sqlite.py +++ b/tests/functional/plugins/test_sqlite.py @@ -1,6 +1,8 @@ -import pytest import sqlite3 from pathlib import Path + +import pytest + from dbt.tests.util import ( run_dbt, ) @@ -12,10 +14,9 @@ class TestSQLitePlugin: - @pytest.fixture(scope="class") def sqlite_test_db(self): - path = '/tmp/satest.db' + path = "/tmp/satest.db" Path(path).unlink(missing_ok=True) db = sqlite3.connect(path) cursor = db.cursor() @@ -38,9 +39,7 @@ def profiles_config_update(self, dbt_profile_target, sqlite_test_db): "dev": { "type": "duckdb", "path": dbt_profile_target.get("path", ":memory:"), - "attach": [ - {'path': sqlite_test_db} - ] + "attach": [{"path": sqlite_test_db}], } }, "target": "dev", @@ -51,7 +50,6 @@ def profiles_config_update(self, dbt_profile_target, sqlite_test_db): def models(self, test_data_path): return { "read_write.sql": model_sql, - } def test_sqlite_plugin(self, project): @@ -60,5 +58,3 @@ def test_sqlite_plugin(self, project): res = project.run_sql("SELECT COUNT(1) FROM satest.read_write", fetch="one") assert res[0] == 2 - - From 489b0209c6bd2389c32213b43b15954c38c38b88 Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Sun, 25 Feb 2024 17:31:12 +0000 Subject: [PATCH 09/13] delete upstream copy --- .../duckdb/macros/utils/upstream_copy.sql | 31 ------------------- 1 file changed, 31 deletions(-) delete mode 100644 dbt/include/duckdb/macros/utils/upstream_copy.sql diff --git a/dbt/include/duckdb/macros/utils/upstream_copy.sql b/dbt/include/duckdb/macros/utils/upstream_copy.sql deleted file mode 100644 index ef62a8bf..00000000 --- a/dbt/include/duckdb/macros/utils/upstream_copy.sql +++ /dev/null @@ -1,31 +0,0 @@ --- todo: this would not work if two external are in a row -{%- macro register_upstream_external_model_copy() -%} -{% if execute %} -{% set upstream_nodes = {} %} -{% for node in selected_resources %} - {% for upstream_node in graph['nodes'][node]['depends_on']['nodes'] %} - {% if upstream_node not in upstream_nodes and upstream_node not in selected_resources %} - {% do upstream_nodes.update({upstream_node: None}) %} - {% set upstream = graph['nodes'].get(upstream_node) %} - {% if upstream - and upstream.resource_type in ('model', 'seed') - and upstream.config.materialized=='external' - %} - {%- set upstream_rel = api.Relation.create( - database=upstream['database'], - schema=upstream['schema'], - identifier=upstream['alias'] - ) -%} - - {%- set rendered_options = render_write_options(config) -%} - {%- set location = upstream.config.get('location', external_location(upstream_rel, upstream.config)) -%} - {%- set format = upstream.config.get('format', 'default') -%} - {%- set plugin_name = upstream.config.get('plugin', 'native') -%} - {% do store_relation(plugin_name, upstream_rel, location, format, upstream.config, True) %} - {%- endif %} - {% endif %} - {% endfor %} -{% endfor %} -{% do adapter.commit() %} -{% endif %} -{%- endmacro -%} From 0115f623b3ece855aab8092c7fa796071c974d92 Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Sun, 3 Mar 2024 00:34:55 +0000 Subject: [PATCH 10/13] add default native plugin; add delta write plugin; hack native plugin around df --- dbt/adapters/duckdb/environments/__init__.py | 29 ++-- dbt/adapters/duckdb/environments/local.py | 39 +++-- dbt/adapters/duckdb/plugins/delta.py | 145 +++++++++++++++++- dbt/adapters/duckdb/plugins/native.py | 16 +- .../functional/adapter/test_rematerialize.py | 2 - .../functional/adapter/test_write_options.py | 10 +- tests/functional/plugins/test_delta_write.py | 145 ++++++++++++++++++ tests/functional/plugins/test_native.py | 26 ++-- 8 files changed, 353 insertions(+), 59 deletions(-) create mode 100644 tests/functional/plugins/test_delta_write.py diff --git a/dbt/adapters/duckdb/environments/__init__.py b/dbt/adapters/duckdb/environments/__init__.py index ebe20d12..048dcaf6 100644 --- a/dbt/adapters/duckdb/environments/__init__.py +++ b/dbt/adapters/duckdb/environments/__init__.py @@ -4,19 +4,17 @@ import sys import tempfile import time -from typing import Dict -from typing import List -from typing import Optional +from typing import Dict, List, Optional import duckdb -from ..credentials import DuckDBCredentials -from ..plugins import BasePlugin -from ..utils import SourceConfig -from ..utils import TargetConfig from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError +from ..credentials import DuckDBCredentials, PluginConfig +from ..plugins import BasePlugin +from ..utils import SourceConfig, TargetConfig + def _ensure_event_loop(): """ @@ -95,7 +93,9 @@ def handle(self): pass @abc.abstractmethod - def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: + def submit_python_job( + self, handle, parsed_model: dict, compiled_code: str + ) -> AdapterResponse: pass @abc.abstractmethod @@ -208,16 +208,23 @@ def initialize_cursor( def initialize_plugins(cls, creds: DuckDBCredentials) -> Dict[str, BasePlugin]: ret = {} base_config = creds.settings or {} - for plugin_def in creds.plugins or []: + for plugin_def in creds.plugins or [] + [PluginConfig("native")]: config = base_config.copy() config.update(plugin_def.config or {}) - plugin = BasePlugin.create(plugin_def.module, config=config, alias=plugin_def.alias) + plugin = BasePlugin.create( + plugin_def.module, config=config, alias=plugin_def.alias + ) ret[plugin.name] = plugin return ret @classmethod def run_python_job( - cls, con, load_df_function, identifier: str, compiled_code: str, creds: DuckDBCredentials + cls, + con, + load_df_function, + identifier: str, + compiled_code: str, + creds: DuckDBCredentials, ): mod_file = tempfile.NamedTemporaryFile(suffix=".py", delete=False) mod_file.write(compiled_code.lstrip().encode("utf-8")) diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index 88a83b6d..292d09ba 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -1,11 +1,11 @@ import threading -from . import Environment -from .. import credentials -from .. import utils from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError +from .. import credentials, utils +from . import Environment + class DuckDBCursorWrapper: def __init__(self, cursor): @@ -48,7 +48,9 @@ def __init__(self, credentials: credentials.DuckDBCredentials): self.handle_count = 0 self.lock = threading.RLock() self._keep_open = ( - credentials.keep_open or credentials.path == ":memory:" or credentials.is_motherduck + credentials.keep_open + or credentials.path == ":memory:" + or credentials.is_motherduck ) self._REGISTERED_DF: dict = {} @@ -70,7 +72,9 @@ def handle(self): ) return DuckDBConnectionWrapper(cursor, self) - def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: + def submit_python_job( + self, handle, parsed_model: dict, compiled_code: str + ) -> AdapterResponse: con = handle.cursor() def ldf(table_name): @@ -105,7 +109,9 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): params.append(source_config.database) if cursor.execute(q, params).fetchone()[0]: if save_mode == "error_if_exists": - raise Exception(f"Source {source_config.table_name()} already exists!") + raise Exception( + f"Source {source_config.table_name()} already exists!" + ) else: # Nothing to do (we ignore the existing table) return @@ -118,12 +124,18 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): source_table_name = source_config.table_name() df_name = source_table_name.replace(".", "_") + "_df" - cursor.register(df_name, df) + # hack for native plugin till we can't register a native df + # if native plugin -> set df_name = df which is string + # this native view/table doesnt have to be registered for each connection + if plugin_name == "native": + df_name = df + else: + cursor.register(df_name, df) - if materialization == "view": - # save to df instance to register on each cursor creation - with self.lock: - self._REGISTERED_DF[df_name] = df + if materialization == "view": + # save to df instance to register on each cursor creation + with self.lock: + self._REGISTERED_DF[df_name] = df cursor.execute( f"CREATE OR REPLACE {materialization} {source_table_name} AS SELECT * FROM {df_name}" @@ -133,7 +145,10 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): handle.close() def store_relation( - self, plugin_name: str, target_config: utils.TargetConfig, just_register: bool = False + self, + plugin_name: str, + target_config: utils.TargetConfig, + just_register: bool = False, ) -> None: # some plugin have to be initialized on the fly? glue for example? diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py index b8590efe..258e515b 100644 --- a/dbt/adapters/duckdb/plugins/delta.py +++ b/dbt/adapters/duckdb/plugins/delta.py @@ -1,10 +1,12 @@ -from typing import Any -from typing import Dict +import os +from typing import Any, Dict -from deltalake import DeltaTable +import pyarrow.compute as pc +from deltalake import DeltaTable, write_deltalake +from duckdb import DuckDBPyRelation +from ..utils import SourceConfig, TargetConfig from . import BasePlugin -from ..utils import SourceConfig class Plugin(BasePlugin): @@ -16,7 +18,9 @@ def configure_cursor(self, cursor): def load(self, source_config: SourceConfig, coursor=None): if "delta_table_path" not in source_config: - raise Exception("'delta_table_path' is a required argument for the delta table!") + raise Exception( + "'delta_table_path' is a required argument for the delta table!" + ) table_path = source_config["delta_table_path"] storage_options = source_config.get("storage_options", None) @@ -43,6 +47,137 @@ def load(self, source_config: SourceConfig, coursor=None): def default_materialization(self): return "view" + def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig: + return target_config + + def create_source_config(self, target_config: TargetConfig) -> SourceConfig: + meta = { + "delta_table_path": target_config.location.path, + "storage_options": target_config.config.get("storage_options", {}), + } + + source_config = SourceConfig( + name=target_config.relation.name, + identifier=target_config.relation.identifier, + schema=target_config.relation.schema, + database=target_config.relation.database, + meta=meta, + tags=[], + ) + return source_config + + def can_be_upstream_referenced(self): + return True + + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None): + mode = target_config.config.get("mode", "overwrite") + table_path = target_config.location.path + storage_options = target_config.config.get("storage_options", {}) + arrow_df = df.fetch_arrow_reader() + + if mode == "overwrite_partition": + partition_key = target_config.config.get("partition_key", None) + if not partition_key: + raise Exception( + "'partition_key' has to be defined when mode 'overwrite_partition'!" + ) + + if isinstance(partition_key, str): + partition_key = [partition_key] + + partition_dict = [] + for each_key in partition_key: + unique_key_array = pc.unique(arrow_df[each_key]) + + if len(unique_key_array) == 1: + partition_dict.append((each_key, str(unique_key_array[0]))) + else: + raise Exception( + f"'{each_key}' column has not one unique value, values are: {str(unique_key_array)}" + ) + create_insert_partition( + table_path, arrow_df, partition_dict, storage_options + ) + elif mode == "merge": + # very slow -> https://github.com/delta-io/delta-rs/issues/1846 + unique_key = target_config.config.get("unique_key", None) + if not unique_key: + raise Exception("'unique_key' has to be defined when mode 'merge'!") + if isinstance(unique_key, str): + unique_key = [unique_key] + + predicate_stm = " and ".join( + [ + f'source."{each_unique_key}" = target."{each_unique_key}"' + for each_unique_key in unique_key + ] + ) + + try: + target_dt = DeltaTable(table_path, storage_options=storage_options) + except Exception: + # TODO handle this better + write_deltalake( + table_or_uri=table_path, + data=arrow_df, + storage_options=storage_options, + ) + + target_dt = DeltaTable(table_path, storage_options=storage_options) + # TODO there is a problem if the column name is uppercase + target_dt.merge( + source=arrow_df, + predicate=predicate_stm, + source_alias="source", + target_alias="target", + ).when_not_matched_insert_all().when_matched_update_all().execute() + else: + write_deltalake( + table_or_uri=table_path, + data=arrow_df, + mode=mode, + storage_options=storage_options, + ) + + +def table_exists(table_path, storage_options): + # this is bad, i have to find the way to see if there is table behind path + try: + DeltaTable(table_path, storage_options=storage_options) + except Exception: + return False + return True + + +## TODO +# add partition writing +# add optimization, vacumm options to automatically run before each run ? +# can deltars optimize if the data is bigger then memory? + + +def create_insert_partition(table_path, data, partitions, storage_options): + """create a new delta table on the path or overwrite existing partition""" + + if table_exists(table_path, storage_options): + partition_expr = [ + (partition_name, "=", partition_value) + for (partition_name, partition_value) in partitions + ] + print( + f"Overwriting delta table under: {table_path} \nwith partition expr: {partition_expr}" + ) + write_deltalake( + table_path, data, partition_filters=partition_expr, mode="overwrite" + ) + else: + partitions = [ + partition_name for (partition_name, partition_value) in partitions + ] + print( + f"Creating delta table under: {table_path} \nwith partitions: {partitions}" + ) + write_deltalake(table_path, data, partition_by=partitions) + # Future # TODO add databricks catalog diff --git a/dbt/adapters/duckdb/plugins/native.py b/dbt/adapters/duckdb/plugins/native.py index 908ccba4..04554e7f 100644 --- a/dbt/adapters/duckdb/plugins/native.py +++ b/dbt/adapters/duckdb/plugins/native.py @@ -1,12 +1,10 @@ import os -from typing import Any -from typing import Dict +from typing import Any, Dict from duckdb import DuckDBPyRelation +from ..utils import SourceConfig, TargetConfig from . import BasePlugin -from ..utils import SourceConfig -from ..utils import TargetConfig # here will be parquet,csv,json implementation, # this plugin should be default one if none is specified @@ -30,7 +28,7 @@ def load(self, source_config: SourceConfig, coursor=None): source_config.meta.get("location", "").get("path"), source_config.meta.get("config", {}).get("options", {}), ) - return coursor.sql(f"SELECT * FROM '{location}'").arrow() + return f"(SELECT * FROM '{location}')" def can_be_upstream_referenced(self): return True @@ -48,7 +46,9 @@ def create_source_config(self, target_config: TargetConfig) -> SourceConfig: def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None): location = target_config.location.path - options = external_write_options(location, target_config.config.get("options", {})) + options = external_write_options( + location, target_config.config.get("options", {}) + ) cursor.sql(f"COPY (SELECT * FROM df) to '{location}' ({options})") def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig: @@ -104,5 +104,7 @@ def external_read_location(write_location: str, rendered_options: dict) -> str: globs = [write_location, "*"] partition_by = str(rendered_options.get("partition_by")) globs.extend(["*"] * len(partition_by.split(","))) - return ".".join(["/".join(globs), str(rendered_options.get("format", "parquet"))]) + return ".".join( + ["/".join(globs), str(rendered_options.get("format", "parquet"))] + ) return write_location diff --git a/tests/functional/adapter/test_rematerialize.py b/tests/functional/adapter/test_rematerialize.py index 1e5d6111..89f41bc0 100644 --- a/tests/functional/adapter/test_rematerialize.py +++ b/tests/functional/adapter/test_rematerialize.py @@ -42,8 +42,6 @@ def dbt_profile_target(self, dbt_profile_target, tmp_path_factory): extroot = str(tmp_path_factory.getbasetemp() / "rematerialize") os.mkdir(extroot) dbt_profile_target["external_root"] = extroot - dbt_profile_target["plugins"] = [{"module": "native"}] - return dbt_profile_target @pytest.fixture(scope="class") diff --git a/tests/functional/adapter/test_write_options.py b/tests/functional/adapter/test_write_options.py index 33d0a166..263aecc9 100644 --- a/tests/functional/adapter/test_write_options.py +++ b/tests/functional/adapter/test_write_options.py @@ -39,15 +39,12 @@ class BaseExternalMaterializations: - @pytest.fixture(scope="class") def dbt_profile_target(self, dbt_profile_target, tmp_path_factory): extroot = str(tmp_path_factory.getbasetemp() / "write_options") os.mkdir(extroot) dbt_profile_target["external_root"] = extroot - ##todo add native but delete after setted to default - dbt_profile_target["plugins"] = [{"module": "native"}] - #dbt_profile_target["threads"] = 5 + # dbt_profile_target["threads"] = 5 return dbt_profile_target @pytest.fixture(scope="class") @@ -74,7 +71,6 @@ def project_config_update(self): } def test_base(self, project): - # seed command results = run_dbt(["seed"]) # seed result length @@ -110,7 +106,9 @@ def test_base(self, project): # base table rowcount relation = relation_from_name(project.adapter, "base") - result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + result = project.run_sql( + f"select count(*) as num_rows from {relation}", fetch="one" + ) assert result[0] == 10 # relations_equal diff --git a/tests/functional/plugins/test_delta_write.py b/tests/functional/plugins/test_delta_write.py new file mode 100644 index 00000000..0c9d35df --- /dev/null +++ b/tests/functional/plugins/test_delta_write.py @@ -0,0 +1,145 @@ +import pytest + +from dbt.tests.util import ( + check_relations_equal, + run_dbt, +) + +ref1 = """ +select 2 as a, 'test' as b +""" + +delta1 = """ + {{{{ config( + materialized='external', + plugin = 'delta', + location = '{root_path}/delta1', + ) }}}} + select * from {{{{ref('ref1')}}}} +""" + +upstream_delta1 = """ + {{{{ config( + materialized='external', + plugin = 'delta', + location = '{root_path}/upstream_delta1', + ) }}}} + select * from {{{{ref('delta1')}}}} +""" + +upstream_delta1 = """ + {{{{ config( + materialized='external', + plugin = 'delta', + location = '{root_path}/upstream_delta1', + ) }}}} + select * from {{{{ref('delta1')}}}} +""" + +upstream_duckdb1 = """ +{{ config( + materialized='table', + ) }} +select * from {{ref('upstream_delta1')}} +""" + +delta2 = """ + {{{{ config( + materialized='external', + plugin = 'delta', + mode = "append", + location = '{root_path}/delta2', + ) }}}} + select * from {{{{ref('ref1')}}}} +""" + +upstream_delta2 = """ + {{{{ config( + materialized='external', + plugin = 'delta', + location = '{root_path}/upstream_delta2', + ) }}}} + select * from {{{{ref('delta2')}}}} +""" + + +ref2 = """ +select 2 as a, 'test1' as b +UNION ALL +select 3 as a, 'test2' as b +""" + +delta3 = """ + {{{{ config( + materialized='external', + plugin = 'delta', + location = '{root_path}/delta1', + mode = 'merge', + unique_key = 'a' + ) }}}} + select * from {{{{ref('ref2')}}}} +""" + + +@pytest.mark.skip_profile("buenavista", "md") +class TestPlugins: + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target): + plugins = [{"module": "delta"}] + return { + "test": { + "outputs": { + "dev": { + "type": "duckdb", + "path": dbt_profile_target.get("path", ":memory:"), + "plugins": plugins, + } + }, + "target": "dev", + } + } + + @pytest.fixture(scope="class") + def models(self, project_root): + return { + "delta1.sql": delta1.format(root_path=project_root), + "upstream_delta1.sql": upstream_delta1.format(root_path=project_root), + "upstream_duckdb1.sql": upstream_duckdb1, + "delta2.sql": delta2.format(root_path=project_root), + "upstream_delta2.sql": upstream_delta2.format(root_path=project_root), + "ref1.sql": ref1, + "ref2.sql": ref2, + "delta3.sql": delta3.format(root_path=project_root), + } + + def test_plugins(self, project): + results = run_dbt( + ["run", "--select", "ref1 delta1 upstream_delta1 upstream_duckdb1"] + ) + + # overwrite with upstream model + check_relations_equal( + project.adapter, + [ + "ref1", + "upstream_duckdb1", + ], + ) + res = project.run_sql("SELECT count(1) FROM 'upstream_delta1'", fetch="one") + assert res[0] == 1 + + # append with upstream model + + results = run_dbt(["run", "--select", "ref1 delta2"]) + + results = run_dbt(["run", "--select", "ref1 delta2 upstream_delta2"]) + + res = project.run_sql("SELECT * FROM 'delta2'", fetch="all") + assert len(res) == 2 + + # merge + # reuse path from delta1 + results = run_dbt(["run", "--select", "ref2 delta3"]) + + res = project.run_sql("SELECT * FROM 'delta3'", fetch="all") + assert res == [(3, "test2"), (2, "test1")] diff --git a/tests/functional/plugins/test_native.py b/tests/functional/plugins/test_native.py index 0248df85..56428c7d 100644 --- a/tests/functional/plugins/test_native.py +++ b/tests/functional/plugins/test_native.py @@ -36,20 +36,20 @@ SELECT * from {{ref("partition_model_parquet")}} """ -default_csv= """ +default_csv = """ {{ config(materialized="external", format="csv", delimiter="|" ) }} SELECT * FROM {{ref("base")}} - """ + """ upstream_default_csv = """ {{ config(materialized="table") }} SELECT * from {{ref("default_csv")}} """ -default_json= """ +default_json = """ {{ config(materialized="external", format="json", location="{{ adapter.external_root() }}/test.json" ) }} SELECT * FROM {{ref("base")}} - """ + """ upstream_default_json = """ {{ config(materialized="table") }} @@ -62,14 +62,14 @@ class TestDuckdbtNativelMaterializations: @pytest.fixture(scope="class") def models(self): return { - "default_parquet.sql" : default_parquet, - "upstream_default_parquet.sql" : upstream_default_parquet, + "default_parquet.sql": default_parquet, + "upstream_default_parquet.sql": upstream_default_parquet, "partition_model_parquet.sql": partition_model_parquet, "upstream_partition_model_parquet.sql": upstream_partition_model_parquet, "default_csv.sql": default_csv, "upstream_default_csv.sql": upstream_default_csv, "default_json.sql": default_json, - "upstream_default_json.sql": upstream_default_json + "upstream_default_json.sql": upstream_default_json, } @pytest.fixture(scope="class") @@ -79,7 +79,7 @@ def seeds(self): } @pytest.fixture(scope="class") - def profiles_config_update(self, dbt_profile_target,tmp_path_factory): + def profiles_config_update(self, dbt_profile_target, tmp_path_factory): extroot = str(tmp_path_factory.getbasetemp() / "external") os.mkdir(extroot) return { @@ -88,11 +88,8 @@ def profiles_config_update(self, dbt_profile_target,tmp_path_factory): "dev": { "type": "duckdb", "path": "duckdb.dev", - "plugins": [ - {"module": "native"} - ], - "external_root" : f'{extroot}', - "threads" : 8 + "external_root": f"{extroot}", + "threads": 8, } }, "target": "dev", @@ -105,6 +102,3 @@ def test_base(self, project): results = run_dbt() print(project.project_root) print("break point") - - - From 8aba866fa9d44dfcfada7a54db1dd0c6772e0476 Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Sun, 3 Mar 2024 17:24:11 +0000 Subject: [PATCH 11/13] add self reference macro; reformat --- dbt/adapters/duckdb/environments/__init__.py | 21 ++-- dbt/adapters/duckdb/environments/local.py | 18 ++-- dbt/adapters/duckdb/plugins/delta.py | 32 +++---- dbt/adapters/duckdb/plugins/native.py | 14 ++- dbt/include/duckdb/macros/utils/upstream.sql | 41 ++++---- .../functional/adapter/test_rematerialize.py | 2 +- .../plugins/{ => delta}/test_delta.py | 0 .../plugins/delta/test_delta_reference.py | 96 +++++++++++++++++++ .../plugins/{ => delta}/test_delta_write.py | 0 9 files changed, 153 insertions(+), 71 deletions(-) rename tests/functional/plugins/{ => delta}/test_delta.py (100%) create mode 100644 tests/functional/plugins/delta/test_delta_reference.py rename tests/functional/plugins/{ => delta}/test_delta_write.py (100%) diff --git a/dbt/adapters/duckdb/environments/__init__.py b/dbt/adapters/duckdb/environments/__init__.py index 048dcaf6..f1085ac9 100644 --- a/dbt/adapters/duckdb/environments/__init__.py +++ b/dbt/adapters/duckdb/environments/__init__.py @@ -4,17 +4,20 @@ import sys import tempfile import time -from typing import Dict, List, Optional +from typing import Dict +from typing import List +from typing import Optional import duckdb +from ..credentials import DuckDBCredentials +from ..credentials import PluginConfig +from ..plugins import BasePlugin +from ..utils import SourceConfig +from ..utils import TargetConfig from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError -from ..credentials import DuckDBCredentials, PluginConfig -from ..plugins import BasePlugin -from ..utils import SourceConfig, TargetConfig - def _ensure_event_loop(): """ @@ -93,9 +96,7 @@ def handle(self): pass @abc.abstractmethod - def submit_python_job( - self, handle, parsed_model: dict, compiled_code: str - ) -> AdapterResponse: + def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: pass @abc.abstractmethod @@ -211,9 +212,7 @@ def initialize_plugins(cls, creds: DuckDBCredentials) -> Dict[str, BasePlugin]: for plugin_def in creds.plugins or [] + [PluginConfig("native")]: config = base_config.copy() config.update(plugin_def.config or {}) - plugin = BasePlugin.create( - plugin_def.module, config=config, alias=plugin_def.alias - ) + plugin = BasePlugin.create(plugin_def.module, config=config, alias=plugin_def.alias) ret[plugin.name] = plugin return ret diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index 292d09ba..8c039ee9 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -1,11 +1,11 @@ import threading +from . import Environment +from .. import credentials +from .. import utils from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError -from .. import credentials, utils -from . import Environment - class DuckDBCursorWrapper: def __init__(self, cursor): @@ -48,9 +48,7 @@ def __init__(self, credentials: credentials.DuckDBCredentials): self.handle_count = 0 self.lock = threading.RLock() self._keep_open = ( - credentials.keep_open - or credentials.path == ":memory:" - or credentials.is_motherduck + credentials.keep_open or credentials.path == ":memory:" or credentials.is_motherduck ) self._REGISTERED_DF: dict = {} @@ -72,9 +70,7 @@ def handle(self): ) return DuckDBConnectionWrapper(cursor, self) - def submit_python_job( - self, handle, parsed_model: dict, compiled_code: str - ) -> AdapterResponse: + def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: con = handle.cursor() def ldf(table_name): @@ -109,9 +105,7 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): params.append(source_config.database) if cursor.execute(q, params).fetchone()[0]: if save_mode == "error_if_exists": - raise Exception( - f"Source {source_config.table_name()} already exists!" - ) + raise Exception(f"Source {source_config.table_name()} already exists!") else: # Nothing to do (we ignore the existing table) return diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py index 258e515b..75d1909e 100644 --- a/dbt/adapters/duckdb/plugins/delta.py +++ b/dbt/adapters/duckdb/plugins/delta.py @@ -1,12 +1,14 @@ -import os -from typing import Any, Dict +from typing import Any +from typing import Dict import pyarrow.compute as pc -from deltalake import DeltaTable, write_deltalake +from deltalake import DeltaTable +from deltalake import write_deltalake from duckdb import DuckDBPyRelation -from ..utils import SourceConfig, TargetConfig from . import BasePlugin +from ..utils import SourceConfig +from ..utils import TargetConfig class Plugin(BasePlugin): @@ -18,9 +20,7 @@ def configure_cursor(self, cursor): def load(self, source_config: SourceConfig, coursor=None): if "delta_table_path" not in source_config: - raise Exception( - "'delta_table_path' is a required argument for the delta table!" - ) + raise Exception("'delta_table_path' is a required argument for the delta table!") table_path = source_config["delta_table_path"] storage_options = source_config.get("storage_options", None) @@ -95,9 +95,7 @@ def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None): raise Exception( f"'{each_key}' column has not one unique value, values are: {str(unique_key_array)}" ) - create_insert_partition( - table_path, arrow_df, partition_dict, storage_options - ) + create_insert_partition(table_path, arrow_df, partition_dict, storage_options) elif mode == "merge": # very slow -> https://github.com/delta-io/delta-rs/issues/1846 unique_key = target_config.config.get("unique_key", None) @@ -151,8 +149,6 @@ def table_exists(table_path, storage_options): ## TODO # add partition writing -# add optimization, vacumm options to automatically run before each run ? -# can deltars optimize if the data is bigger then memory? def create_insert_partition(table_path, data, partitions, storage_options): @@ -166,16 +162,10 @@ def create_insert_partition(table_path, data, partitions, storage_options): print( f"Overwriting delta table under: {table_path} \nwith partition expr: {partition_expr}" ) - write_deltalake( - table_path, data, partition_filters=partition_expr, mode="overwrite" - ) + write_deltalake(table_path, data, partition_filters=partition_expr, mode="overwrite") else: - partitions = [ - partition_name for (partition_name, partition_value) in partitions - ] - print( - f"Creating delta table under: {table_path} \nwith partitions: {partitions}" - ) + partitions = [partition_name for (partition_name, partition_value) in partitions] + print(f"Creating delta table under: {table_path} \nwith partitions: {partitions}") write_deltalake(table_path, data, partition_by=partitions) diff --git a/dbt/adapters/duckdb/plugins/native.py b/dbt/adapters/duckdb/plugins/native.py index 04554e7f..95b45371 100644 --- a/dbt/adapters/duckdb/plugins/native.py +++ b/dbt/adapters/duckdb/plugins/native.py @@ -1,10 +1,12 @@ import os -from typing import Any, Dict +from typing import Any +from typing import Dict from duckdb import DuckDBPyRelation -from ..utils import SourceConfig, TargetConfig from . import BasePlugin +from ..utils import SourceConfig +from ..utils import TargetConfig # here will be parquet,csv,json implementation, # this plugin should be default one if none is specified @@ -46,9 +48,7 @@ def create_source_config(self, target_config: TargetConfig) -> SourceConfig: def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None): location = target_config.location.path - options = external_write_options( - location, target_config.config.get("options", {}) - ) + options = external_write_options(location, target_config.config.get("options", {})) cursor.sql(f"COPY (SELECT * FROM df) to '{location}' ({options})") def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig: @@ -104,7 +104,5 @@ def external_read_location(write_location: str, rendered_options: dict) -> str: globs = [write_location, "*"] partition_by = str(rendered_options.get("partition_by")) globs.extend(["*"] * len(partition_by.split(","))) - return ".".join( - ["/".join(globs), str(rendered_options.get("format", "parquet"))] - ) + return ".".join(["/".join(globs), str(rendered_options.get("format", "parquet"))]) return write_location diff --git a/dbt/include/duckdb/macros/utils/upstream.sql b/dbt/include/duckdb/macros/utils/upstream.sql index 274137d3..2ae688ca 100644 --- a/dbt/include/duckdb/macros/utils/upstream.sql +++ b/dbt/include/duckdb/macros/utils/upstream.sql @@ -21,26 +21,31 @@ {%- set plugin_name = upstream.config.get('plugin', 'native') -%} {% do store_relation(plugin_name, upstream_rel, location, format, upstream.config, True) %} {% endfor %} - {% for upstream_node in graph['nodes'][node]['depends_on']['nodes'] - if upstream_node not in upstream_nodes and upstream_node not in selected_resources - and graph['nodes'].get(upstream_node) - and graph['nodes'].get(upstream_node).resource_type in ('model', 'seed') +{% endfor %} +{% do adapter.commit() %} +{% endif %} +{%- endmacro -%} + + + +{%- macro register_self_reference_external_models() -%} +{% if execute %} +{% for node in selected_resources if + graph['nodes'][node].resource_type in ('model', 'seed') + and graph['nodes'][node].config.materialized=='external' + and 'this' in graph['nodes'][node]['raw_code'] %} - {% set upstream = graph['nodes'].get(upstream_node) %} - {%- set upstream_rel = api.Relation.create( - database=upstream['database'], - schema=upstream['schema'], - identifier=upstream['alias'] + {% set current_node = graph['nodes'][node] %} + {%- set node_rel = api.Relation.create( + database=current_node['database'], + schema=current_node['schema'], + identifier=current_node['alias'] ) -%} - {% call statement('main', language='sql') -%} - create schema if not exists {{ upstream_rel.schema }} - {%- endcall %} - {% call statement('main', language='sql') -%} - create or replace {{upstream.config.materialized }} {{ upstream_rel }} as ( - select * from ({{ upstream.raw_code }}) - ); - {%- endcall %} - {% endfor %} + {%- set rendered_options = render_write_options(config) -%} + {%- set location = current_node.config.get('location', external_location(node_rel, current_node.config)) -%} + {%- set format = current_node.config.get('format', 'default') -%} + {%- set plugin_name = current_node.config.get('plugin', 'native') -%} + {% do store_relation(plugin_name, node_rel, location, format, current_node.config, True) %} {% endfor %} {% do adapter.commit() %} {% endif %} diff --git a/tests/functional/adapter/test_rematerialize.py b/tests/functional/adapter/test_rematerialize.py index 89f41bc0..8b4dfc9b 100644 --- a/tests/functional/adapter/test_rematerialize.py +++ b/tests/functional/adapter/test_rematerialize.py @@ -48,6 +48,7 @@ def dbt_profile_target(self, dbt_profile_target, tmp_path_factory): def project_config_update(self): return { "name": "base", + "models": {"+materialized": "external"}, "on-run-start": ["{{ register_upstream_external_models() }}"], } @@ -71,7 +72,6 @@ def test_run(self, project): "run", "--select", "downstream_model other_downstream_model downstream_of_partition_model", - "-d", ] ) diff --git a/tests/functional/plugins/test_delta.py b/tests/functional/plugins/delta/test_delta.py similarity index 100% rename from tests/functional/plugins/test_delta.py rename to tests/functional/plugins/delta/test_delta.py diff --git a/tests/functional/plugins/delta/test_delta_reference.py b/tests/functional/plugins/delta/test_delta_reference.py new file mode 100644 index 00000000..5fd651cf --- /dev/null +++ b/tests/functional/plugins/delta/test_delta_reference.py @@ -0,0 +1,96 @@ +import pytest + +from dbt.tests.util import ( + check_relations_equal, + run_dbt, +) + +ref1 = """ +SELECT 1 as a, 'test' as b +UNION ALL +SELECT 2 as a, 'test2' as b +UNION ALL +SELECT 3 as a, 'test3' as b +UNION ALL +SELECT 4 as a, 'test4' as b +UNION ALL +SELECT 5 as a, 'test5' as b +UNION ALL +SELECT 6 as a, 'test6' as b +""" + +delta1 = """ + {{{{ config( + materialized='external', + mode = 'append', + plugin = 'delta', + location = '{root_path}/delta1', + ) }}}} + + + select * from {{{{ref('ref1')}}}} + {{{{var('first_run')}}}} + {{% if var('first_run') == 'true' %}} + WHERE a < 2 + {{% else %}} + WHERE a >= (SELECT max(a) from {{{{ this }}}}) + {{% endif %}} + +""" + + +@pytest.mark.skip_profile("buenavista", "md") +class TestPlugins: + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target): + plugins = [{"module": "delta"}] + return { + "test": { + "outputs": { + "dev": { + "type": "duckdb", + "path": dbt_profile_target.get("path", ":memory:"), + "plugins": plugins, + } + }, + "target": "dev", + } + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "name": "base", + "on-run-start": ["{{ register_self_reference_external_models() }}"], + } + + @pytest.fixture(scope="class") + def models(self, project_root): + return { + "delta1.sql": delta1.format(root_path=project_root), + "ref1.sql": ref1, + } + + def test_plugins(self, project): + # This doesnt work because we need some kind of incremental notin + # i made it register on the begining but if the table doesnt exists by the first run it can't register + # We have to see how to do it + + results = run_dbt( + [ + "run", + "--select", + "ref1 delta1", + "--vars", + "{'first_run': 'true'}", + "-d", + ] + ) + + res = project.run_sql("SELECT * FROM 'delta1'", fetch="all") + + # results = run_dbt(["run", "--select", "ref1 delta1"]) + + # res = project.run_sql("SELECT * FROM 'delta1'", fetch="all") + + print("hello") diff --git a/tests/functional/plugins/test_delta_write.py b/tests/functional/plugins/delta/test_delta_write.py similarity index 100% rename from tests/functional/plugins/test_delta_write.py rename to tests/functional/plugins/delta/test_delta_write.py From 0b304c033acec74d4713462761a4f6230ae6d8d6 Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Sun, 3 Mar 2024 17:46:18 +0000 Subject: [PATCH 12/13] add an example for incremental delta --- dbt/include/duckdb/macros/utils/upstream.sql | 3 +- .../plugins/delta/test_delta_reference.py | 28 +++++++++++-------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/dbt/include/duckdb/macros/utils/upstream.sql b/dbt/include/duckdb/macros/utils/upstream.sql index 2ae688ca..c00427b3 100644 --- a/dbt/include/duckdb/macros/utils/upstream.sql +++ b/dbt/include/duckdb/macros/utils/upstream.sql @@ -29,11 +29,10 @@ {%- macro register_self_reference_external_models() -%} -{% if execute %} +{% if execute and var("first_run") != 'true' %} {% for node in selected_resources if graph['nodes'][node].resource_type in ('model', 'seed') and graph['nodes'][node].config.materialized=='external' - and 'this' in graph['nodes'][node]['raw_code'] %} {% set current_node = graph['nodes'][node] %} {%- set node_rel = api.Relation.create( diff --git a/tests/functional/plugins/delta/test_delta_reference.py b/tests/functional/plugins/delta/test_delta_reference.py index 5fd651cf..6b9d29ef 100644 --- a/tests/functional/plugins/delta/test_delta_reference.py +++ b/tests/functional/plugins/delta/test_delta_reference.py @@ -11,12 +11,6 @@ SELECT 2 as a, 'test2' as b UNION ALL SELECT 3 as a, 'test3' as b -UNION ALL -SELECT 4 as a, 'test4' as b -UNION ALL -SELECT 5 as a, 'test5' as b -UNION ALL -SELECT 6 as a, 'test6' as b """ delta1 = """ @@ -29,7 +23,7 @@ select * from {{{{ref('ref1')}}}} - {{{{var('first_run')}}}} + {{% if var('first_run') == 'true' %}} WHERE a < 2 {{% else %}} @@ -72,9 +66,10 @@ def models(self, project_root): } def test_plugins(self, project): - # This doesnt work because we need some kind of incremental notin + # This doesnt work because we need some kind of incremental thinking # i made it register on the begining but if the table doesnt exists by the first run it can't register # We have to see how to do it + # I tried to simulate the incremental model with the variables which works results = run_dbt( [ @@ -89,8 +84,17 @@ def test_plugins(self, project): res = project.run_sql("SELECT * FROM 'delta1'", fetch="all") - # results = run_dbt(["run", "--select", "ref1 delta1"]) - - # res = project.run_sql("SELECT * FROM 'delta1'", fetch="all") + results = run_dbt( + [ + "run", + "--select", + "ref1 delta1", + "--vars", + "{'first_run': 'false'}", + "-d", + ] + ) - print("hello") + res = project.run_sql("SELECT * FROM 'delta1'", fetch="all") + assert res == [(1, "test"), (3, "test3"), (2, "test2"), (1, "test")] + print("break point") From a064f9c7d90c15c83a1683bd0c012e0a6b46f793 Mon Sep 17 00:00:00 2001 From: Aleksandar Date: Mon, 4 Mar 2024 14:13:32 +0000 Subject: [PATCH 13/13] fix typos --- dbt/adapters/duckdb/plugins/__init__.py | 8 ++++---- dbt/adapters/duckdb/plugins/delta.py | 2 +- dbt/adapters/duckdb/plugins/excel.py | 4 ++-- dbt/adapters/duckdb/plugins/gsheet.py | 2 +- dbt/adapters/duckdb/plugins/iceberg.py | 2 +- dbt/adapters/duckdb/plugins/native.py | 2 +- dbt/adapters/duckdb/plugins/sqlalchemy.py | 2 +- tests/create_function_plugin.py | 19 +++++++++---------- 8 files changed, 20 insertions(+), 21 deletions(-) diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index 93e667c5..39b2c15c 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -111,8 +111,8 @@ def configure_connection(self, conn: DuckDBPyConnection): """ pass - # coursor is needed for the native plugin - def load(self, source_config: SourceConfig, coursor=None): + # cursor is needed for the native plugin + def load(self, source_config: SourceConfig, cursor=None): """ Load data from a source config and return it as a DataFrame-like object that DuckDB can read. This method should be overridden by subclasses that @@ -123,13 +123,13 @@ def load(self, source_config: SourceConfig, coursor=None): """ raise NotImplementedError(f"load method not implemented for {self.name}") - # coursor is needed just for the native, we have to do it better + # cursor is needed just for the native, we have to do it better # to had it over in some initalization? def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None): raise NotImplementedError(f"store method not implemented for {self.name}") def create_source_config(self, target_config: TargetConfig) -> SourceConfig: - raise NotImplementedError(f"store method not implemented for {self.name}") + raise NotImplementedError(f"create_source_config method not implemented for {self.name}") def can_be_upstream_referenced(self): return False diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py index 75d1909e..5aa6e46d 100644 --- a/dbt/adapters/duckdb/plugins/delta.py +++ b/dbt/adapters/duckdb/plugins/delta.py @@ -18,7 +18,7 @@ def initialize(self, config: Dict[str, Any]): def configure_cursor(self, cursor): pass - def load(self, source_config: SourceConfig, coursor=None): + def load(self, source_config: SourceConfig, cursor=None): if "delta_table_path" not in source_config: raise Exception("'delta_table_path' is a required argument for the delta table!") diff --git a/dbt/adapters/duckdb/plugins/excel.py b/dbt/adapters/duckdb/plugins/excel.py index e59f2080..1dd47520 100644 --- a/dbt/adapters/duckdb/plugins/excel.py +++ b/dbt/adapters/duckdb/plugins/excel.py @@ -24,7 +24,7 @@ def initialize(self, plugin_config: Dict[str, Any]): if "s3_region" in plugin_config: os.environ["AWS_DEFAULT_REGION"] = plugin_config["s3_region"] - def load(self, source_config: SourceConfig, coursor=None): + def load(self, source_config: SourceConfig, cursor=None): ext_location = source_config["external_location"] ext_location = ext_location.format(**source_config.as_dict()) if "s3" in ext_location: @@ -95,7 +95,7 @@ def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None): def create_source_config(self, target_config: TargetConfig) -> SourceConfig: # in the reader we have just location and sheet_name, maybe we can add here more options - # but in the first place i would not recommend to upstream excel file + # but in the first place i would not recommend to downstream excel file # this works for a very simple case but not all of them meta = { "external_location": target_config.location.path, diff --git a/dbt/adapters/duckdb/plugins/gsheet.py b/dbt/adapters/duckdb/plugins/gsheet.py index 2df7ca4c..b453bbb5 100644 --- a/dbt/adapters/duckdb/plugins/gsheet.py +++ b/dbt/adapters/duckdb/plugins/gsheet.py @@ -27,7 +27,7 @@ def initialize(self, config: Dict[str, Any]): self._config = GSheetConfig.from_dict(config) self._gc = self._config.client() - def load(self, source_config: SourceConfig, coursor=None): + def load(self, source_config: SourceConfig, cursor=None): doc = None if "title" in source_config: doc = self._gc.open(source_config["title"]) diff --git a/dbt/adapters/duckdb/plugins/iceberg.py b/dbt/adapters/duckdb/plugins/iceberg.py index e2cb33f2..56e3f708 100644 --- a/dbt/adapters/duckdb/plugins/iceberg.py +++ b/dbt/adapters/duckdb/plugins/iceberg.py @@ -14,7 +14,7 @@ def initialize(self, config: Dict[str, Any]): catalog = config.pop("catalog") self._catalog = pyiceberg.catalog.load_catalog(catalog, **config) - def load(self, source_config: SourceConfig, coursor=None): + def load(self, source_config: SourceConfig, cursor=None): table_format = source_config.get("iceberg_table", "{schema}.{identifier}") table_name = table_format.format(**source_config.as_dict()) table = self._catalog.load_table(table_name) diff --git a/dbt/adapters/duckdb/plugins/native.py b/dbt/adapters/duckdb/plugins/native.py index 95b45371..c78ea696 100644 --- a/dbt/adapters/duckdb/plugins/native.py +++ b/dbt/adapters/duckdb/plugins/native.py @@ -25,7 +25,7 @@ def default_materialization(self): # this one can be better not to go over some other format and df but directly # https://stackoverflow.com/questions/78055585/how-to-reference-duckdbpyrelation-from-another-connection - def load(self, source_config: SourceConfig, coursor=None): + def load(self, source_config: SourceConfig, cursor=None): location = external_read_location( source_config.meta.get("location", "").get("path"), source_config.meta.get("config", {}).get("options", {}), diff --git a/dbt/adapters/duckdb/plugins/sqlalchemy.py b/dbt/adapters/duckdb/plugins/sqlalchemy.py index 9273bc08..11c2b50b 100644 --- a/dbt/adapters/duckdb/plugins/sqlalchemy.py +++ b/dbt/adapters/duckdb/plugins/sqlalchemy.py @@ -19,7 +19,7 @@ class Plugin(BasePlugin): def initialize(self, plugin_config: Dict[str, Any]): self.engine = create_engine(plugin_config.pop("connection_url"), **plugin_config) - def load(self, source_config: SourceConfig, coursor=None): + def load(self, source_config: SourceConfig, cursor=None): if "query" in source_config: query = source_config["query"] query = query.format(**source_config.as_dict()) diff --git a/tests/create_function_plugin.py b/tests/create_function_plugin.py index 0b99b2de..b1b3ed09 100644 --- a/tests/create_function_plugin.py +++ b/tests/create_function_plugin.py @@ -13,23 +13,22 @@ class Plugin(BasePlugin): def configure_connection(self, conn: DuckDBPyConnection): conn.create_function("foo", foo) - def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor = None): + def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None): assert target_config.config.get("key") == "value" def can_be_upstream_referenced(self): return True - - def load(self, source_config: SourceConfig, coursor = None): - + + def load(self, source_config: SourceConfig, cursor=None): return duckdb.sql("SELECT 1729 as foo").arrow() - + def create_source_config(self, target_config: TargetConfig) -> SourceConfig: source_config = SourceConfig( - name= target_config.relation.name, - identifier= target_config.relation.identifier, + name=target_config.relation.name, + identifier=target_config.relation.identifier, schema=target_config.relation.schema, database=target_config.relation.database, - meta= target_config.as_dict(), - tags= [], + meta=target_config.as_dict(), + tags=[], ) - return source_config \ No newline at end of file + return source_config