NOTE(review): line breaks and indentation in this patch were reconstructed from a
whitespace-collapsed copy; confirm context-line whitespace against the target files
before applying. The added glue.py code was revised during review (no mutable default
argument, single if/else for the S3 location, single-pass column filtering) and the
hunk sizes were recomputed, so the post-image blob id on the glue.py index line no
longer matches the resulting file.

diff --git a/dbt/adapters/duckdb/plugins/glue.py b/dbt/adapters/duckdb/plugins/glue.py
index c26a944b..76f25419 100644
--- a/dbt/adapters/duckdb/plugins/glue.py
+++ b/dbt/adapters/duckdb/plugins/glue.py
@@ -1,5 +1,6 @@
 from typing import Any
 from typing import Dict
+from typing import List
 from typing import Optional
 from typing import Sequence
 
@@ -162,14 +163,35 @@ def _get_column_type_def(
         return None
 
 
+def _add_partition_columns(
+    table_def: TableInputTypeDef, partition_columns: Sequence[Dict[str, str]]
+) -> TableInputTypeDef:
+    """Set the partition keys on a Glue table definition.
+
+    Columns of ``StorageDescriptor`` that match a partition column in both name
+    and type are removed to avoid duplicate columns.
+    """
+    table_def["PartitionKeys"] = [
+        ColumnTypeDef(Name=column["Name"], Type=column["Type"])
+        for column in partition_columns
+    ]
+    # Filter in one pass rather than rebuilding the list per partition column
+    partition_pairs = {(column["Name"], column["Type"]) for column in partition_columns}
+    table_def["StorageDescriptor"]["Columns"] = [
+        column
+        for column in table_def["StorageDescriptor"]["Columns"]
+        if (column["Name"], column.get("Type")) not in partition_pairs
+    ]
+    return table_def
+
+
 def _get_table_def(
     table: str,
-    s3_path: str,
+    s3_parent: str,
     columns: Sequence["ColumnTypeDef"],
     file_format: str,
     delimiter: str,
 ):
-    s3_parent = "/".join(s3_path.split("/")[:-1])
     if file_format == "csv":
         table_def = _get_csv_table_def(
             table=table,
@@ -205,17 +227,27 @@ def create_or_update_table(
     s3_path: str,
     file_format: str,
     delimiter: str,
+    partition_columns: Optional[List[Dict[str, str]]] = None,
 ) -> None:
+    # Set s3 original path if partitioning is used, else use parent path
+    if partition_columns:
+        s3_parent = s3_path
+    else:
+        s3_parent = "/".join(s3_path.split("/")[:-1])
+
     # Existing table in AWS Glue catalog
     glue_table = _get_table(client=client, database=database, table=table)
     columns = _convert_columns(column_list)
     table_def = _get_table_def(
         table=table,
-        s3_path=s3_path,
+        s3_parent=s3_parent,
         columns=columns,
         file_format=file_format,
         delimiter=delimiter,
     )
+    # Add partition columns
+    if partition_columns:
+        table_def = _add_partition_columns(table_def, partition_columns)
     if glue_table:
         # Existing columns in AWS Glue catalog
         glue_columns = _get_column_type_def(glue_table)
@@ -236,6 +268,8 @@ def store(self, target_config: TargetConfig):
         assert target_config.location is not None
         assert target_config.relation.identifier is not None
         table: str = target_config.relation.identifier
+        partition_columns = target_config.config.get("partition_columns", [])
+
         create_or_update_table(
             self.client,
             self.database,
@@ -244,4 +278,5 @@ def store(self, target_config: TargetConfig):
             target_config.location.path,
             target_config.location.format,
             self.delimiter,
+            partition_columns,
         )
diff --git a/dbt/include/duckdb/macros/materializations/external.sql b/dbt/include/duckdb/macros/materializations/external.sql
index ed1e179b..91e9b5ab 100644
--- a/dbt/include/duckdb/macros/materializations/external.sql
+++ b/dbt/include/duckdb/macros/materializations/external.sql
@@ -78,6 +78,7 @@
   -- register table into glue
   {%- set plugin_name = config.get('plugin') -%}
   {%- set glue_register = config.get('glue_register', default=false) -%}
+  {%- set partition_columns = config.get('partition_columns', []) -%}
   {% if plugin_name is not none or glue_register is true %}
     {% if glue_register %}
       {# legacy hack to set the glue database name, deprecate this #}