Adds an update_iceberg_ts column with an add_iceberg_timestamp option. #497

Open
wants to merge 9 commits into main
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## New version
- Adds an update_iceberg_ts column, controlled by the add_iceberg_timestamp option.

## v1.9.0
- Allow loading big seed files
- Migrates the PySpark code for the Iceberg file format at a macro level, making the impl.py file more readable.
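
For context, the new option is toggled per model config. Below is a minimal sketch of a model that opts out of the timestamp column (the model name and selected columns are illustrative; per the macro change below, the option defaults to true for Iceberg tables):

{{ config(
    materialized="table",
    file_format="iceberg",
    add_iceberg_timestamp=False
) }}
select 1 as id, 'example' as status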
20 changes: 18 additions & 2 deletions dbt/include/glue/macros/adapters.sql
@@ -70,6 +70,7 @@
{%- else -%}
{%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%}
{%- set table_properties = config.get('table_properties', default={}) -%}
{%- set add_timestamp = config.get('add_iceberg_timestamp', default=true) -%}

{%- set create_statement_string -%}
{% if file_format in ['delta', 'iceberg'] -%}
@@ -79,10 +80,25 @@
{% endif %}
{%- endset %}

{%- if file_format == 'iceberg' and add_timestamp %}
{# For Iceberg tables with timestamp enabled, wrap the SQL to include update_iceberg_ts #}
{%- set modified_sql -%}
with source_data as (
{{ sql }}
)
select
*,
current_timestamp() as update_iceberg_ts
from source_data
{%- endset %}
{%- else %}
{%- set modified_sql = sql -%}
{%- endif %}

{{ create_statement_string }} {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{{ get_assert_columns_equivalent(modified_sql) }}
{#-- This does not enforce constraints and needs to be a TODO #}
{#-- We'll need to change up the query because with CREATE TABLE AS SELECT, #}
{#-- you do not specify the columns #}
Expand All @@ -94,7 +110,7 @@
{{ glue__location_clause() }}
{{ comment_clause() }}
as
{{ sql }}
{{ modified_sql }}
{%- endif %}
{%- endmacro -%}
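
To illustrate the effect: for an Iceberg model with add_iceberg_timestamp left at its default of true, the macro wraps the model SQL in a CTE before the CTAS, so the rendered statement looks roughly like the sketch below (the relation and column names are illustrative, and the create/location/comment clauses are abbreviated since they come from parts of the macro not shown in this hunk):

create table my_schema.my_model
-- file format, table properties, location and comment clauses omitted
as
with source_data as (
    select 1 as id
)
select
    *,
    current_timestamp() as update_iceberg_ts
from source_data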

93 changes: 92 additions & 1 deletion tests/functional/adapter/test_iceberg.py
@@ -192,7 +192,7 @@ def test_incremental(self, project):

# base table rowcount
relation = relation_from_name(project.adapter, "base")

project.run_sql(f"refresh table {relation}")
# run refresh table to invalidate the previously cached parquet file paths
result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
@@ -275,3 +275,94 @@ class TestTableMatGlue(BaseTableMaterialization):
class TestValidateConnectionGlue(BaseValidateConnection):
pass


class TestIcebergTimestamp:
"""
Test class to verify that the `update_iceberg_ts` column is correctly added
when `add_iceberg_timestamp` is set to True, and is not added otherwise.
"""

@pytest.fixture(scope="class")
def models(self):
"""
Provide two models for testing:
1. iceberg_timestamp_enabled.sql
- Has add_iceberg_timestamp=True
2. iceberg_timestamp_disabled.sql
- Has add_iceberg_timestamp=False
"""

iceberg_timestamp_enabled_sql = """
{{ config(
materialized="table",
file_format="iceberg",
add_iceberg_timestamp=True
) }}
select
1 as id,
'enabled' as status
"""

iceberg_timestamp_disabled_sql = """
{{ config(
materialized="table",
file_format="iceberg",
add_iceberg_timestamp=False
) }}
select
2 as id,
'disabled' as status
"""

return {
"iceberg_timestamp_enabled.sql": iceberg_timestamp_enabled_sql,
"iceberg_timestamp_disabled.sql": iceberg_timestamp_disabled_sql,
"schema.yml": schema_base_yml,
}

def test_iceberg_timestamp_enabled(self, project):
"""
When add_iceberg_timestamp=True, the column 'update_iceberg_ts' must exist.
"""
results = run_dbt(["seed"])
assert len(results) == 1

results = run_dbt(["run", "-m", "iceberg_timestamp_enabled"])
assert len(results) == 1

relation_enabled = relation_from_name(project.adapter, "iceberg_timestamp_enabled")
columns_enabled = project.run_sql(f"DESCRIBE {relation_enabled}", fetch="all")
column_names_enabled = [col[0].lower() for col in columns_enabled]

assert "update_iceberg_ts" in column_names_enabled, (
f"Expected 'update_iceberg_ts' column in {relation_enabled}, but only got: {column_names_enabled}"
)

result_enabled = project.run_sql(
f"SELECT count(*) FROM {relation_enabled}", fetch="one"
)
assert result_enabled[0] == 1

def test_iceberg_timestamp_disabled(self, project):
"""
When add_iceberg_timestamp=False, the column 'update_iceberg_ts' must NOT exist.
"""
results = run_dbt(["seed"])
assert len(results) == 1

results = run_dbt(["run", "-m", "iceberg_timestamp_disabled"])
assert len(results) == 1

relation_disabled = relation_from_name(project.adapter, "iceberg_timestamp_disabled")
columns_disabled = project.run_sql(f"DESCRIBE {relation_disabled}", fetch="all")
column_names_disabled = [col[0].lower() for col in columns_disabled]

assert "update_iceberg_ts" not in column_names_disabled, (
f"Did not expect 'update_iceberg_ts' column in {relation_disabled}, "
f"but got columns: {column_names_disabled}"
)

result_disabled = project.run_sql(
f"SELECT count(*) FROM {relation_disabled}", fetch="one"
)
assert result_disabled[0] == 1
2 changes: 1 addition & 1 deletion tests/functional/conftest.py
@@ -40,7 +40,7 @@ def dbt_profile_target(unique_schema, use_arrow):
'datalake_formats': 'delta,iceberg',
'conf': f"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.sql.legacy.allowNonEmptyLocationInCTAS=true --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse={get_s3_location()} --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.sources.partitionOverwriteMode=dynamic",
'glue_session_reuse': True,
'use_arrow': use_arrow
'use_arrow': False
}


159 changes: 159 additions & 0 deletions tests/unit/macros/base.py
@@ -0,0 +1,159 @@
import re
import os
from typing import Any
from unittest.mock import Mock

import pytest
from jinja2 import Environment, FileSystemLoader, Template

from dbt.adapters.glue.relation import SparkRelation


class TemplateBundle:
def __init__(self, template, context, relation):
self.template = template
self.context = context
self.relation = relation


class GlueMacroTestBase:
@pytest.fixture(autouse=True)
def config(self, context) -> dict:
"""
Anything you put in this dict will be returned by config in the rendered template
"""
local_config: dict[str, Any] = {}
context["config"].get = lambda key, default=None, **kwargs: local_config.get(key, default)
return local_config

@pytest.fixture(scope="class")
def default_context(self) -> dict:
"""
This is the default context used in all tests.
"""
context = {
"validation": Mock(),
"model": Mock(),
"exceptions": Mock(),
"config": Mock(),
"statement": lambda r, caller: r,
"adapter": Mock(),
"var": Mock(),
"return": lambda r: r,
"is_incremental": Mock(return_value=False),
}
return context

@pytest.fixture(scope="class")
def macro_folders_to_load(self) -> list:
"""
This is a list of folders from which we look to load Glue macro templates.
All folders are relative to the dbt/include/glue folder.
"""
return ["macros"]

def _get_project_root(self):
"""Get the project root directory"""
current_dir = os.path.dirname(os.path.abspath(__file__))
while not os.path.exists(os.path.join(current_dir, "dbt")):
parent = os.path.dirname(current_dir)
if parent == current_dir:
raise RuntimeError("Could not find project root")
current_dir = parent
return current_dir

@pytest.fixture(scope="class")
def glue_env(self, macro_folders_to_load) -> Environment:
"""
The environment used for rendering Glue macros
"""
project_root = self._get_project_root()
search_paths = []

for folder in macro_folders_to_load:
path = os.path.join(project_root, "dbt", "include", "glue", folder)
if os.path.exists(path):
search_paths.append(path)

if not search_paths:
raise RuntimeError(f"No macro folders found under {project_root}")

return Environment(
loader=FileSystemLoader(search_paths),
extensions=["jinja2.ext.do"],
)

@pytest.fixture(scope="class")
def template_name(self) -> str:
"""
The name of the Glue template you want to test, not including the path.
Example: "adapters.sql"
"""
raise NotImplementedError("Must be implemented by subclasses")

@pytest.fixture
def template(self, template_name, default_context, glue_env) -> Template:
"""
This creates the template you will test against.
"""
context = default_context.copy()
current_template = glue_env.get_template(template_name, globals=context)

def dispatch(macro_name, macro_namespace=None, packages=None):
if hasattr(current_template.module, f"glue__{macro_name}"):
return getattr(current_template.module, f"glue__{macro_name}")
else:
return context[f"glue__{macro_name}"]

context["adapter"].dispatch = dispatch
return current_template

@pytest.fixture
def context(self, template) -> dict:
"""
Access to the context used to render the template.
"""
return template.globals

@pytest.fixture(scope="class")
def relation(self):
"""
Dummy relation to use in tests.
"""
data = {
"path": {
"database": "test_db",
"schema": "test_schema",
"identifier": "test_table",
},
"type": None,
}
return SparkRelation.from_dict(data)

@pytest.fixture
def template_bundle(self, template, context, relation):
"""
Bundles up the compiled template, its context, and a dummy relation.
"""
context["model"].alias = relation.identifier
return TemplateBundle(template, context, relation)

def run_macro_raw(self, template, name, *args):
"""
Run the named macro from a template, and return the rendered value.
"""
return getattr(template.module, name)(*args)

def run_macro(self, template, name, *args):
"""
Run the named macro from a template, and return the rendered value.
This version strips off extra whitespace and newlines.
"""
value = self.run_macro_raw(template, name, *args)
return re.sub(r"\s\s+", " ", value).strip()

def render_bundle(self, template_bundle, name, *args):
"""
Convenience method for macros that take a relation as a first argument.
"""
return self.run_macro(template_bundle.template, name, template_bundle.relation, *args)
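
As a usage sketch (not part of this PR's diff): a unit test subclasses GlueMacroTestBase, points template_name at the macro file it wants to render, and works through the provided fixtures. The class name, import path, and assertions below are illustrative and would need to match the project's actual test layout:

import pytest

from tests.unit.macros.base import GlueMacroTestBase


class TestAdaptersMacros(GlueMacroTestBase):
    @pytest.fixture(scope="class")
    def template_name(self) -> str:
        # render macros defined in dbt/include/glue/macros/adapters.sql
        return "adapters.sql"

    def test_template_compiles(self, template_bundle):
        # template_bundle exposes the compiled template, its context, and the dummy relation
        assert template_bundle.relation.identifier == "test_table"
        assert template_bundle.template.module is not None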