refactoring-external-materialization #332

Closed
14 changes: 11 additions & 3 deletions dbt/adapters/duckdb/environments/__init__.py
@@ -11,6 +11,7 @@
import duckdb

from ..credentials import DuckDBCredentials
from ..credentials import PluginConfig
from ..plugins import BasePlugin
from ..utils import SourceConfig
from ..utils import TargetConfig
@@ -109,7 +110,9 @@ def load_source(self, plugin_name: str, source_config: SourceConfig) -> str:
pass

@abc.abstractmethod
def store_relation(self, plugin_name: str, target_config: TargetConfig) -> None:
def store_relation(
self, plugin_name: str, target_config: TargetConfig, just_register: bool = False
) -> None:
pass

def get_binding_char(self) -> str:
@@ -215,7 +218,7 @@ def initialize_cursor(
def initialize_plugins(cls, creds: DuckDBCredentials) -> Dict[str, BasePlugin]:
ret = {}
base_config = creds.settings or {}
for plugin_def in creds.plugins or []:
# parenthesized so the built-in native plugin is always appended, even when
# plugins are configured ("or" binds looser than "+")
for plugin_def in (creds.plugins or []) + [PluginConfig("native")]:
config = base_config.copy()
config.update(plugin_def.config or {})
plugin = BasePlugin.create(plugin_def.module, config=config, alias=plugin_def.alias)
@@ -224,7 +227,12 @@ def initialize_plugins(cls, creds: DuckDBCredentials) -> Dict[str, BasePlugin]:

@classmethod
def run_python_job(
cls, con, load_df_function, identifier: str, compiled_code: str, creds: DuckDBCredentials
cls,
con,
load_df_function,
identifier: str,
compiled_code: str,
creds: DuckDBCredentials,
):
mod_file = tempfile.NamedTemporaryFile(suffix=".py", delete=False)
mod_file.write(compiled_code.lstrip().encode("utf-8"))
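Without the added parentheses, `or` binds looser than `+`, so `creds.plugins or [] + [PluginConfig("native")]` evaluates to just `creds.plugins` whenever any plugins are configured, silently dropping the native plugin. A minimal standalone illustration of the pitfall (plain lists stand in for the `PluginConfig` objects):

```python
configured = ["sqlalchemy"]  # stand-in for creds.plugins

# unparenthesized: "native" is appended only when no plugins are configured
assert (configured or [] + ["native"]) == ["sqlalchemy"]

# parenthesized: "native" is always appended
assert ((configured or []) + ["native"]) == ["sqlalchemy", "native"]

configured = None  # no plugins configured in the profile
assert (configured or [] + ["native"]) == ["native"]
assert ((configured or []) + ["native"]) == ["native"]
```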
4 changes: 3 additions & 1 deletion dbt/adapters/duckdb/environments/buenavista.py
@@ -61,7 +61,9 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
cursor.close()
handle.close()

def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> None:
def store_relation(
self, plugin_name: str, target_config: utils.TargetConfig, just_register: bool = False
) -> None:
handle = self.handle()
payload = {
"method": "dbt_store_relation",
66 changes: 46 additions & 20 deletions dbt/adapters/duckdb/environments/local.py
@@ -66,7 +66,7 @@ def handle(self):
self.handle_count += 1

cursor = self.initialize_cursor(
self.creds, self.conn.cursor(), self._plugins, self._REGISTERED_DF
self.creds, self.conn.cursor(), self._plugins, self._REGISTERED_DF.copy()
)
return DuckDBConnectionWrapper(cursor, self)

@@ -109,7 +109,7 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
else:
# Nothing to do (we ignore the existing table)
return
df = plugin.load(source_config)
df = plugin.load(source_config, cursor)
assert df is not None

materialization = source_config.meta.get(
@@ -118,11 +118,18 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
source_table_name = source_config.table_name()
df_name = source_table_name.replace(".", "_") + "_df"

cursor.register(df_name, df)
# hack for the native plugin until we can register a native df:
# if the plugin is native, df is a string (a SQL identifier), so use it as df_name directly;
# this native view/table does not have to be registered for each connection
if plugin_name == "native":
df_name = df
else:
cursor.register(df_name, df)

if materialization == "view":
# save the df on the instance to re-register it on each cursor creation
self._REGISTERED_DF[df_name] = df
if materialization == "view":
# save the df on the instance to re-register it on each cursor creation
with self.lock:
self._REGISTERED_DF[df_name] = df

cursor.execute(
f"CREATE OR REPLACE {materialization} {source_table_name} AS SELECT * FROM {df_name}"
@@ -131,22 +138,41 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
cursor.close()
handle.close()

def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> None:
if plugin_name not in self._plugins:
if plugin_name.startswith("glue|"):
from ..plugins import glue
def store_relation(
self,
plugin_name: str,
target_config: utils.TargetConfig,
just_register: bool = False,
) -> None:
# TODO: do some plugins (e.g. glue) have to be initialized on the fly?

_, glue_db = plugin_name.split("|")
config = (self.creds.settings or {}).copy()
config["glue_database"] = glue_db
self._plugins[plugin_name] = glue.Plugin(plugin_name, config)
else:
raise Exception(
f"Plugin {plugin_name} not found; known plugins are: "
+ ",".join(self._plugins.keys())
)
if plugin_name not in self._plugins:
raise Exception(
f"Plugin {plugin_name} not found; known plugins are: "
+ ",".join(self._plugins.keys())
)
plugin = self._plugins[plugin_name]
plugin.store(target_config)

# e.g. add the file format to the location
target_config = plugin.adapt_target_config(target_config)
if not just_register:
# export the data of the model being stored
handle = self.handle()
cursor = handle.cursor()

df = cursor.sql(target_config.config.model.compiled_code)
# hand over a DuckDB relation so each plugin can choose which type of integration to use

plugin.store(df, target_config, cursor)

cursor.close()
handle.close()

# all are False by default and have to be turned on per plugin
if plugin.can_be_upstream_referenced():
# create a df and view that can be referenced in the following run
source_config = plugin.create_source_config(target_config)
self.load_source(plugin_name, source_config)

def close(self):
if self.conn:
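Restated outside the diff for readability, the reworked `store_relation` flow in the local environment looks roughly like this (a condensed sketch; plugin-lookup validation, error handling, and locking are omitted):

```python
def store_relation(env, plugin_name, target_config, just_register=False):
    plugin = env._plugins[plugin_name]

    # let the plugin rewrite the target first, e.g. append a file
    # format suffix to the location
    target_config = plugin.adapt_target_config(target_config)

    if not just_register:
        # full export: run the model's compiled SQL and hand the resulting
        # DuckDB relation to the plugin, which picks its own write path
        handle = env.handle()
        cursor = handle.cursor()
        df = cursor.sql(target_config.config.model.compiled_code)
        plugin.store(df, target_config, cursor)
        cursor.close()
        handle.close()

    if plugin.can_be_upstream_referenced():
        # re-expose the stored data as a source so later runs (and
        # downstream models) can reference it without re-exporting
        source_config = plugin.create_source_config(target_config)
        env.load_source(plugin_name, source_config)
```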
3 changes: 2 additions & 1 deletion dbt/adapters/duckdb/impl.py
@@ -104,14 +104,15 @@ def store_relation(
path: str,
format: str,
config: RuntimeConfigObject,
just_register: bool,
) -> None:
target_config = TargetConfig(
relation=relation,
column_list=column_list,
config=config,
location=TargetLocation(path=path, format=format),
)
DuckDBConnectionManager.env().store_relation(plugin_name, target_config)
DuckDBConnectionManager.env().store_relation(plugin_name, target_config, just_register)

@available
def external_root(self) -> str:
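On the adapter side, `just_register` is added without a default, so the external materialization macro has to pass it explicitly on every call. A hypothetical invocation for illustration (the `relation` and `config` objects come from dbt's runtime in a real run; the literal values below are made up):

```python
# Hypothetical call — in practice the external materialization macro invokes
# this as adapter.store_relation(...) with objects supplied by dbt's runtime.
adapter.store_relation(
    plugin_name="delta",
    relation=relation,            # dbt Relation object for the model
    column_list=["id", "part"],
    path="/tmp/delta/example_table",
    format="delta",
    config=config,                # RuntimeConfigObject for the model
    just_register=False,          # True skips the export and only re-registers the source view
)
```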
17 changes: 15 additions & 2 deletions dbt/adapters/duckdb/plugins/__init__.py
@@ -5,6 +5,7 @@
from typing import Optional

from duckdb import DuckDBPyConnection
from duckdb import DuckDBPyRelation

from ..credentials import DuckDBCredentials
from ..utils import SourceConfig
@@ -110,7 +111,8 @@ def configure_connection(self, conn: DuckDBPyConnection):
"""
pass

def load(self, source_config: SourceConfig):
# cursor is needed for the native plugin
def load(self, source_config: SourceConfig, cursor=None):
"""
Load data from a source config and return it as a DataFrame-like object
that DuckDB can read. This method should be overridden by subclasses that
@@ -121,9 +123,17 @@ def load(self, source_config: SourceConfig):
"""
raise NotImplementedError(f"load method not implemented for {self.name}")

def store(self, target_config: TargetConfig):
# the cursor is needed just for the native plugin; TODO: find a better way
# to hand it over, e.g. during some initialization step?
def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None):
raise NotImplementedError(f"store method not implemented for {self.name}")

def create_source_config(self, target_config: TargetConfig) -> SourceConfig:
raise NotImplementedError(f"create_source_config method not implemented for {self.name}")

def can_be_upstream_referenced(self):
return False

def configure_cursor(self, cursor):
"""
Configure each copy of the DuckDB cursor.
@@ -136,3 +146,6 @@ def configure_cursor(self, cursor):

def default_materialization(self):
return "table"

def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig:
return target_config
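Together, the new hooks define the lifecycle a writable plugin can opt into: `adapt_target_config` rewrites the target (e.g. the location), `store` persists a DuckDB relation, and `can_be_upstream_referenced` plus `create_source_config` let the stored output be read back as a source via `load`. A minimal sketch of a plugin wiring them together (an illustrative CSV plugin, not part of this PR; it only mirrors the signatures introduced above):

```python
from typing import Any, Dict

import pyarrow.csv as pacsv
from duckdb import DuckDBPyRelation

from . import BasePlugin
from ..utils import SourceConfig
from ..utils import TargetConfig


class Plugin(BasePlugin):
    def initialize(self, config: Dict[str, Any]):
        pass

    def can_be_upstream_referenced(self):
        # opt in: the output of store() can be read back as a dbt source
        return True

    def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig:
        # a real plugin might append a file extension to the location here
        return target_config

    def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None):
        # materialize the DuckDB relation to Arrow and write it out as CSV
        pacsv.write_csv(df.arrow(), target_config.location.path)

    def create_source_config(self, target_config: TargetConfig) -> SourceConfig:
        # describe how load() can find the data that store() just wrote
        return SourceConfig(
            name=target_config.relation.name,
            identifier=target_config.relation.identifier,
            schema=target_config.relation.schema,
            database=target_config.relation.database,
            meta={"csv_path": target_config.location.path},
            tags=[],
        )

    def load(self, source_config: SourceConfig, cursor=None):
        # read the data back so it can be registered as a view/table
        return pacsv.read_csv(source_config.meta["csv_path"])
```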
127 changes: 126 additions & 1 deletion dbt/adapters/duckdb/plugins/delta.py
@@ -1,10 +1,14 @@
from typing import Any
from typing import Dict

import pyarrow.compute as pc
from deltalake import DeltaTable
from deltalake import write_deltalake
from duckdb import DuckDBPyRelation

from . import BasePlugin
from ..utils import SourceConfig
from ..utils import TargetConfig


class Plugin(BasePlugin):
@@ -14,7 +18,7 @@ def initialize(self, config: Dict[str, Any]):
def configure_cursor(self, cursor):
pass

def load(self, source_config: SourceConfig):
def load(self, source_config: SourceConfig, cursor=None):
if "delta_table_path" not in source_config:
raise Exception("'delta_table_path' is a required argument for the delta table!")

@@ -43,6 +47,127 @@ def load(self, source_config: SourceConfig):
def default_materialization(self):
return "view"

def adapt_target_config(self, target_config: TargetConfig) -> TargetConfig:
return target_config

def create_source_config(self, target_config: TargetConfig) -> SourceConfig:
meta = {
"delta_table_path": target_config.location.path,
"storage_options": target_config.config.get("storage_options", {}),
}

source_config = SourceConfig(
name=target_config.relation.name,
identifier=target_config.relation.identifier,
schema=target_config.relation.schema,
database=target_config.relation.database,
meta=meta,
tags=[],
)
return source_config

def can_be_upstream_referenced(self):
return True

def store(self, df: DuckDBPyRelation, target_config: TargetConfig, cursor=None):
mode = target_config.config.get("mode", "overwrite")
table_path = target_config.location.path
storage_options = target_config.config.get("storage_options", {})
# materialize to a pyarrow Table; the partition handling below indexes
# columns, which a streaming RecordBatchReader would not support
arrow_df = df.arrow()

if mode == "overwrite_partition":
partition_key = target_config.config.get("partition_key", None)
if not partition_key:
raise Exception(
"'partition_key' has to be defined when mode is 'overwrite_partition'!"
)

if isinstance(partition_key, str):
partition_key = [partition_key]

partition_dict = []
for each_key in partition_key:
unique_key_array = pc.unique(arrow_df[each_key])

if len(unique_key_array) == 1:
partition_dict.append((each_key, str(unique_key_array[0])))
else:
raise Exception(
f"'{each_key}' column must have exactly one unique value; values are: {unique_key_array}"
)
create_insert_partition(table_path, arrow_df, partition_dict, storage_options)
elif mode == "merge":
# very slow -> https://github.com/delta-io/delta-rs/issues/1846
unique_key = target_config.config.get("unique_key", None)
if not unique_key:
raise Exception("'unique_key' has to be defined when mode 'merge'!")
if isinstance(unique_key, str):
unique_key = [unique_key]

predicate_stm = " and ".join(
[
f'source."{each_unique_key}" = target."{each_unique_key}"'
for each_unique_key in unique_key
]
)

try:
target_dt = DeltaTable(table_path, storage_options=storage_options)
except Exception:
# TODO handle this better
write_deltalake(
table_or_uri=table_path,
data=arrow_df,
storage_options=storage_options,
)

target_dt = DeltaTable(table_path, storage_options=storage_options)
# TODO there is a problem if the column name is uppercase
target_dt.merge(
source=arrow_df,
predicate=predicate_stm,
source_alias="source",
target_alias="target",
).when_not_matched_insert_all().when_matched_update_all().execute()
else:
write_deltalake(
table_or_uri=table_path,
data=arrow_df,
mode=mode,
storage_options=storage_options,
)


def table_exists(table_path, storage_options):
# TODO: find a better way to check whether a Delta table exists at the given path
try:
DeltaTable(table_path, storage_options=storage_options)
except Exception:
return False
return True


## TODO
# add partition writing


def create_insert_partition(table_path, data, partitions, storage_options):
"""create a new delta table on the path or overwrite existing partition"""

if table_exists(table_path, storage_options):
partition_expr = [
(partition_name, "=", partition_value)
for (partition_name, partition_value) in partitions
]
print(
f"Overwriting delta table under: {table_path} \nwith partition expr: {partition_expr}"
)
write_deltalake(table_path, data, partition_filters=partition_expr, mode="overwrite")
else:
partitions = [partition_name for (partition_name, partition_value) in partitions]
print(f"Creating delta table under: {table_path} \nwith partitions: {partitions}")
write_deltalake(table_path, data, partition_by=partitions)


# Future
# TODO add databricks catalog
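A hedged usage sketch of the new delta `store` path, runnable outside dbt (the `SimpleNamespace` objects are stand-ins for the adapter-provided `TargetConfig`, exposing only the fields `store` actually reads; the two-argument `Plugin` constructor is assumed from `BasePlugin`):

```python
from types import SimpleNamespace

import duckdb

from dbt.adapters.duckdb.plugins.delta import Plugin

con = duckdb.connect()
rel = con.sql("SELECT i AS id, i % 2 AS part FROM range(10) t(i)")

# Stand-in for the adapter-provided TargetConfig: only location.path and
# config.get(...) are accessed by Plugin.store.
opts = {"mode": "overwrite"}  # or "merge" + unique_key, or "overwrite_partition" + partition_key
target_config = SimpleNamespace(
    location=SimpleNamespace(path="/tmp/delta/example_table"),
    config=SimpleNamespace(get=lambda key, default=None: opts.get(key, default)),
)

plugin = Plugin("delta", {})  # assumes BasePlugin's (name, config) constructor
plugin.store(rel, target_config)
```

For `mode: "merge"` with `unique_key: ["id", "region"]`, the predicate assembled above becomes `source."id" = target."id" and source."region" = target."region"`.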