From 9537514bb13406fd7286428c9d3288a039cebfc6 Mon Sep 17 00:00:00 2001 From: Arno Roos Date: Wed, 24 Jan 2024 23:47:44 +0100 Subject: [PATCH 1/7] added partition support for Glue --- dbt/adapters/duckdb/plugins/glue.py | 18 ++++++++++++++++++ .../macros/materializations/external.sql | 1 + 2 files changed, 19 insertions(+) diff --git a/dbt/adapters/duckdb/plugins/glue.py b/dbt/adapters/duckdb/plugins/glue.py index c26a944b..b43fc69e 100644 --- a/dbt/adapters/duckdb/plugins/glue.py +++ b/dbt/adapters/duckdb/plugins/glue.py @@ -2,6 +2,7 @@ from typing import Dict from typing import Optional from typing import Sequence +from typing import List import boto3 from mypy_boto3_glue import GlueClient @@ -161,6 +162,17 @@ def _get_column_type_def( else: return None +def _add_partition_columns(table_def: TableInputTypeDef, partition_columns: List[Dict[str, str]]) -> TableInputTypeDef: + if 'PartitionKeys' not in table_def: + table_def['PartitionKeys'] = [] + for column_dict in partition_columns: + for column, column_type in column_dict.items(): + column_type_def = ColumnTypeDef( + Name=column, + Type=column_type + ) + table_def['PartitionKeys'].append(column_type_def) + return table_def def _get_table_def( table: str, @@ -205,6 +217,7 @@ def create_or_update_table( s3_path: str, file_format: str, delimiter: str, + partition_columns: List[Dict[str, str]], ) -> None: # Existing table in AWS Glue catalog glue_table = _get_table(client=client, database=database, table=table) @@ -215,7 +228,9 @@ def create_or_update_table( columns=columns, file_format=file_format, delimiter=delimiter, + partition_columns=partition_columns ) + table_def = _add_partition_columns(table_def, partition_columns) if glue_table: # Existing columns in AWS Glue catalog glue_columns = _get_column_type_def(glue_table) @@ -236,6 +251,8 @@ def store(self, target_config: TargetConfig): assert target_config.location is not None assert target_config.relation.identifier is not None table: str = target_config.relation.identifier + partition_columns = target_config.config.get('partition_columns', []) + create_or_update_table( self.client, self.database, @@ -244,4 +261,5 @@ def store(self, target_config: TargetConfig): target_config.location.path, target_config.location.format, self.delimiter, + partition_columns ) diff --git a/dbt/include/duckdb/macros/materializations/external.sql b/dbt/include/duckdb/macros/materializations/external.sql index ed1e179b..91e9b5ab 100644 --- a/dbt/include/duckdb/macros/materializations/external.sql +++ b/dbt/include/duckdb/macros/materializations/external.sql @@ -78,6 +78,7 @@ -- register table into glue {%- set plugin_name = config.get('plugin') -%} {%- set glue_register = config.get('glue_register', default=false) -%} + {%- set partition_columns = config.get('partition_columns', []) -%} {% if plugin_name is not none or glue_register is true %} {% if glue_register %} {# legacy hack to set the glue database name, deprecate this #} From 20228d51318be75fef17523a572d2d5741a558db Mon Sep 17 00:00:00 2001 From: Arno Roos Date: Wed, 24 Jan 2024 23:56:29 +0100 Subject: [PATCH 2/7] removed old argument --- dbt/adapters/duckdb/plugins/glue.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbt/adapters/duckdb/plugins/glue.py b/dbt/adapters/duckdb/plugins/glue.py index b43fc69e..3c4e4d86 100644 --- a/dbt/adapters/duckdb/plugins/glue.py +++ b/dbt/adapters/duckdb/plugins/glue.py @@ -227,8 +227,7 @@ def create_or_update_table( s3_path=s3_path, columns=columns, file_format=file_format, - delimiter=delimiter, - partition_columns=partition_columns + delimiter=delimiter ) table_def = _add_partition_columns(table_def, partition_columns) if glue_table: From 15ca9665c1130d0317687f4050046b388e29b324 Mon Sep 17 00:00:00 2001 From: Arno Roos Date: Thu, 25 Jan 2024 11:47:54 +0100 Subject: [PATCH 3/7] fixed assertion, won't add partitionkeys if no partition_columns are present --- dbt/adapters/duckdb/plugins/glue.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/dbt/adapters/duckdb/plugins/glue.py b/dbt/adapters/duckdb/plugins/glue.py index 3c4e4d86..b178d0f6 100644 --- a/dbt/adapters/duckdb/plugins/glue.py +++ b/dbt/adapters/duckdb/plugins/glue.py @@ -165,13 +165,12 @@ def _get_column_type_def( def _add_partition_columns(table_def: TableInputTypeDef, partition_columns: List[Dict[str, str]]) -> TableInputTypeDef: if 'PartitionKeys' not in table_def: table_def['PartitionKeys'] = [] - for column_dict in partition_columns: - for column, column_type in column_dict.items(): - column_type_def = ColumnTypeDef( - Name=column, - Type=column_type - ) - table_def['PartitionKeys'].append(column_type_def) + for column in partition_columns: + column_type_def = ColumnTypeDef( + Name=column['name'], + Type=column['type'] + ) + table_def['PartitionKeys'].append(column_type_def) return table_def def _get_table_def( @@ -217,7 +216,7 @@ def create_or_update_table( s3_path: str, file_format: str, delimiter: str, - partition_columns: List[Dict[str, str]], + partition_columns: List[Dict[str, str]] = [], ) -> None: # Existing table in AWS Glue catalog glue_table = _get_table(client=client, database=database, table=table) @@ -229,7 +228,9 @@ def create_or_update_table( file_format=file_format, delimiter=delimiter ) - table_def = _add_partition_columns(table_def, partition_columns) + # Add partition columns + if partition_columns != []: + table_def = _add_partition_columns(table_def, partition_columns) if glue_table: # Existing columns in AWS Glue catalog glue_columns = _get_column_type_def(glue_table) From c06374434548c75dbd499d5ef7b628faf27f7652 Mon Sep 17 00:00:00 2001 From: Arno Roos Date: Fri, 26 Jan 2024 10:44:03 +0100 Subject: [PATCH 4/7] set s3_parent path to original if partitioning is used --- dbt/adapters/duckdb/plugins/glue.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/duckdb/plugins/glue.py b/dbt/adapters/duckdb/plugins/glue.py index b178d0f6..bd0ce0d5 100644 --- a/dbt/adapters/duckdb/plugins/glue.py +++ b/dbt/adapters/duckdb/plugins/glue.py @@ -171,16 +171,19 @@ def _add_partition_columns(table_def: TableInputTypeDef, partition_columns: List Type=column['type'] ) table_def['PartitionKeys'].append(column_type_def) + # Remove columns from StorageDescriptor if they match with partition columns to avoid duplicate columns + for partition_column in partition_columns: + table_def['StorageDescriptor']['Columns'] = [column for column in table_def['StorageDescriptor']['Columns'] if not (column['Name'] == partition_column['name'] and column['Type'] == partition_column['type'])] return table_def def _get_table_def( table: str, - s3_path: str, + s3_parent: str, columns: Sequence["ColumnTypeDef"], file_format: str, delimiter: str, ): - s3_parent = "/".join(s3_path.split("/")[:-1]) + if file_format == "csv": table_def = _get_csv_table_def( table=table, @@ -218,12 +221,18 @@ def create_or_update_table( delimiter: str, partition_columns: List[Dict[str, str]] = [], ) -> None: + # Set s3 original path if partitioning is used, else use parent path + if partition_columns != []: + s3_parent = s3_path + if partition_columns == []: + s3_parent = "/".join(s3_path.split("/")[:-1]) + # Existing table in AWS Glue catalog glue_table = _get_table(client=client, database=database, table=table) columns = _convert_columns(column_list) table_def = _get_table_def( table=table, - s3_path=s3_path, + s3_parent=s3_parent, columns=columns, file_format=file_format, delimiter=delimiter From 44c0d9efff0b03e1b82e8bb4a0b3962fbe2e6485 Mon Sep 17 00:00:00 2001 From: Arno Roos Date: Fri, 26 Jan 2024 18:13:19 +0100 Subject: [PATCH 5/7] Fixed parameter hint for partition_columns --- dbt/adapters/duckdb/plugins/glue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/duckdb/plugins/glue.py b/dbt/adapters/duckdb/plugins/glue.py index bd0ce0d5..7adf5e2b 100644 --- a/dbt/adapters/duckdb/plugins/glue.py +++ b/dbt/adapters/duckdb/plugins/glue.py @@ -162,7 +162,7 @@ def _get_column_type_def( else: return None -def _add_partition_columns(table_def: TableInputTypeDef, partition_columns: List[Dict[str, str]]) -> TableInputTypeDef: +def _add_partition_columns(table_def: TableInputTypeDef, partition_columns: List[ColumnTypeDef]) -> TableInputTypeDef: if 'PartitionKeys' not in table_def: table_def['PartitionKeys'] = [] for column in partition_columns: From 9da15a619df6cb80585b74170421d560ca16bc4f Mon Sep 17 00:00:00 2001 From: Arno Roos Date: Fri, 26 Jan 2024 18:39:58 +0100 Subject: [PATCH 6/7] fixed partition_columns hint --- dbt/adapters/duckdb/plugins/glue.py | 35 +++++++++++++++++------------ 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/dbt/adapters/duckdb/plugins/glue.py b/dbt/adapters/duckdb/plugins/glue.py index 7adf5e2b..8c663598 100644 --- a/dbt/adapters/duckdb/plugins/glue.py +++ b/dbt/adapters/duckdb/plugins/glue.py @@ -1,8 +1,8 @@ from typing import Any from typing import Dict +from typing import List from typing import Optional from typing import Sequence -from typing import List import boto3 from mypy_boto3_glue import GlueClient @@ -162,20 +162,28 @@ def _get_column_type_def( else: return None -def _add_partition_columns(table_def: TableInputTypeDef, partition_columns: List[ColumnTypeDef]) -> TableInputTypeDef: - if 'PartitionKeys' not in table_def: - table_def['PartitionKeys'] = [] + +def _add_partition_columns( + table_def: TableInputTypeDef, partition_columns: List[Dict[str, str]] +) -> TableInputTypeDef: + if "PartitionKeys" not in table_def: + table_def["PartitionKeys"] = [] for column in partition_columns: - column_type_def = ColumnTypeDef( - Name=column['name'], - Type=column['type'] - ) - table_def['PartitionKeys'].append(column_type_def) + column_type_def = ColumnTypeDef(Name=column["name"], Type=column["type"]) + table_def["PartitionKeys"].append(column_type_def) # Remove columns from StorageDescriptor if they match with partition columns to avoid duplicate columns for partition_column in partition_columns: - table_def['StorageDescriptor']['Columns'] = [column for column in table_def['StorageDescriptor']['Columns'] if not (column['Name'] == partition_column['name'] and column['Type'] == partition_column['type'])] + table_def["StorageDescriptor"]["Columns"] = [ + column + for column in table_def["StorageDescriptor"]["Columns"] + if not ( + column["Name"] == partition_column["name"] + and column["Type"] == partition_column["type"] + ) + ] return table_def + def _get_table_def( table: str, s3_parent: str, @@ -183,7 +191,6 @@ def _get_table_def( file_format: str, delimiter: str, ): - if file_format == "csv": table_def = _get_csv_table_def( table=table, @@ -235,7 +242,7 @@ def create_or_update_table( s3_parent=s3_parent, columns=columns, file_format=file_format, - delimiter=delimiter + delimiter=delimiter, ) # Add partition columns if partition_columns != []: @@ -260,7 +267,7 @@ def store(self, target_config: TargetConfig): assert target_config.location is not None assert target_config.relation.identifier is not None table: str = target_config.relation.identifier - partition_columns = target_config.config.get('partition_columns', []) + partition_columns = target_config.config.get("partition_columns", []) create_or_update_table( self.client, @@ -270,5 +277,5 @@ def store(self, target_config: TargetConfig): target_config.location.path, target_config.location.format, self.delimiter, - partition_columns + partition_columns, ) From 7960d03cf18a334c9622a08634057d07276492e2 Mon Sep 17 00:00:00 2001 From: Arno Roos Date: Fri, 26 Jan 2024 19:02:11 +0100 Subject: [PATCH 7/7] passed the typekey test --- dbt/adapters/duckdb/plugins/glue.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/dbt/adapters/duckdb/plugins/glue.py b/dbt/adapters/duckdb/plugins/glue.py index 8c663598..76f25419 100644 --- a/dbt/adapters/duckdb/plugins/glue.py +++ b/dbt/adapters/duckdb/plugins/glue.py @@ -163,22 +163,22 @@ def _get_column_type_def( return None -def _add_partition_columns( - table_def: TableInputTypeDef, partition_columns: List[Dict[str, str]] -) -> TableInputTypeDef: +def _add_partition_columns(table_def: TableInputTypeDef, partition_columns) -> TableInputTypeDef: + partition_keys = [] if "PartitionKeys" not in table_def: table_def["PartitionKeys"] = [] for column in partition_columns: - column_type_def = ColumnTypeDef(Name=column["name"], Type=column["type"]) - table_def["PartitionKeys"].append(column_type_def) + partition_column = ColumnTypeDef(Name=column["Name"], Type=column["Type"]) + partition_keys.append(partition_column) + table_def["PartitionKeys"] = partition_keys # Remove columns from StorageDescriptor if they match with partition columns to avoid duplicate columns for partition_column in partition_columns: table_def["StorageDescriptor"]["Columns"] = [ column for column in table_def["StorageDescriptor"]["Columns"] if not ( - column["Name"] == partition_column["name"] - and column["Type"] == partition_column["type"] + column["Name"] == partition_column["Name"] + and column["Type"] == partition_column["Type"] ) ] return table_def