Skip to content

Commit

Permalink
Merge pull request #324 from firewall413/feature/add_partitioning_sup…
Browse files Browse the repository at this point in the history
…port_for_aws_glue

added partition support for Glue
  • Loading branch information
jwills authored Jan 26, 2024
2 parents f1aa417 + 7960d03 commit 5d20dd3
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 3 deletions.
40 changes: 37 additions & 3 deletions dbt/adapters/duckdb/plugins/glue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence

Expand Down Expand Up @@ -162,14 +163,34 @@ def _get_column_type_def(
return None


def _add_partition_columns(table_def: TableInputTypeDef, partition_columns) -> TableInputTypeDef:
partition_keys = []
if "PartitionKeys" not in table_def:
table_def["PartitionKeys"] = []
for column in partition_columns:
partition_column = ColumnTypeDef(Name=column["Name"], Type=column["Type"])
partition_keys.append(partition_column)
table_def["PartitionKeys"] = partition_keys
# Remove columns from StorageDescriptor if they match with partition columns to avoid duplicate columns
for partition_column in partition_columns:
table_def["StorageDescriptor"]["Columns"] = [
column
for column in table_def["StorageDescriptor"]["Columns"]
if not (
column["Name"] == partition_column["Name"]
and column["Type"] == partition_column["Type"]
)
]
return table_def


def _get_table_def(
table: str,
s3_path: str,
s3_parent: str,
columns: Sequence["ColumnTypeDef"],
file_format: str,
delimiter: str,
):
s3_parent = "/".join(s3_path.split("/")[:-1])
if file_format == "csv":
table_def = _get_csv_table_def(
table=table,
Expand Down Expand Up @@ -205,17 +226,27 @@ def create_or_update_table(
s3_path: str,
file_format: str,
delimiter: str,
partition_columns: List[Dict[str, str]] = [],
) -> None:
# Set s3 original path if partitioning is used, else use parent path
if partition_columns != []:
s3_parent = s3_path
if partition_columns == []:
s3_parent = "/".join(s3_path.split("/")[:-1])

# Existing table in AWS Glue catalog
glue_table = _get_table(client=client, database=database, table=table)
columns = _convert_columns(column_list)
table_def = _get_table_def(
table=table,
s3_path=s3_path,
s3_parent=s3_parent,
columns=columns,
file_format=file_format,
delimiter=delimiter,
)
# Add partition columns
if partition_columns != []:
table_def = _add_partition_columns(table_def, partition_columns)
if glue_table:
# Existing columns in AWS Glue catalog
glue_columns = _get_column_type_def(glue_table)
Expand All @@ -236,6 +267,8 @@ def store(self, target_config: TargetConfig):
assert target_config.location is not None
assert target_config.relation.identifier is not None
table: str = target_config.relation.identifier
partition_columns = target_config.config.get("partition_columns", [])

create_or_update_table(
self.client,
self.database,
Expand All @@ -244,4 +277,5 @@ def store(self, target_config: TargetConfig):
target_config.location.path,
target_config.location.format,
self.delimiter,
partition_columns,
)
1 change: 1 addition & 0 deletions dbt/include/duckdb/macros/materializations/external.sql
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
-- register table into glue
{%- set plugin_name = config.get('plugin') -%}
{%- set glue_register = config.get('glue_register', default=false) -%}
{%- set partition_columns = config.get('partition_columns', []) -%}
{% if plugin_name is not none or glue_register is true %}
{% if glue_register %}
{# legacy hack to set the glue database name, deprecate this #}
Expand Down

0 comments on commit 5d20dd3

Please sign in to comment.