diff --git a/CHANGELOG.md b/CHANGELOG.md
index fcb0444..389f219 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## New version
+- Adds an update_iceberg_ts timestamp column to Iceberg tables, controlled by the new add_iceberg_timestamp config option (enabled by default).
+
 ## v1.9.0
 - Allow loading big seed files
 - Migrates the PySpark code for the Iceberg file format to the macro level, making the impl.py file more readable.
diff --git a/dbt/include/glue/macros/adapters.sql b/dbt/include/glue/macros/adapters.sql
index c687187..b18eafb 100644
--- a/dbt/include/glue/macros/adapters.sql
+++ b/dbt/include/glue/macros/adapters.sql
@@ -70,6 +70,7 @@
   {%- else -%}
     {%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%}
     {%- set table_properties = config.get('table_properties', default={}) -%}
+    {%- set add_timestamp = config.get('add_iceberg_timestamp', default=true) -%}
 
     {%- set create_statement_string -%}
       {% if file_format in ['delta', 'iceberg'] -%}
@@ -79,10 +80,25 @@
      {% endif %}
    {%- endset %}
 
+    {%- if file_format == 'iceberg' and add_timestamp %}
+      {# For Iceberg tables with the timestamp enabled, wrap the SQL to include update_iceberg_ts #}
+      {%- set modified_sql -%}
+        with source_data as (
+          {{ sql }}
+        )
+        select
+          *,
+          current_timestamp() as update_iceberg_ts
+        from source_data
+      {%- endset %}
+    {%- else %}
+      {%- set modified_sql = sql -%}
+    {%- endif %}
+
    {{ create_statement_string }} {{ relation }}
    {% set contract_config = config.get('contract') %}
    {% if contract_config.enforced %}
-      {{ get_assert_columns_equivalent(sql) }}
+      {{ get_assert_columns_equivalent(modified_sql) }}
    {#-- This does not enforce constraints and needs to be a TODO #}
    {#-- We'll need to change up the query because with CREATE TABLE AS SELECT, #}
    {#-- you do not specify the columns #}
@@ -94,7 +110,7 @@
    {{ glue__location_clause() }}
    {{ comment_clause() }}
    as
-    {{ sql }}
+    {{ modified_sql }}
  {%- endif %}
{%- endmacro -%}
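For reference, when add_iceberg_timestamp is left at its default of true, the change above wraps the model SQL in a CTE before the CTAS runs. A rough sketch of what the rendered statement is expected to look like (the relation name and model body are hypothetical placeholders):

    create or replace table glue_catalog.test_schema.my_model
    using iceberg
    as
    with source_data as (
        select 1 as id
    )
    select
        *,
        current_timestamp() as update_iceberg_ts
    from source_data
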
diff --git a/tests/functional/adapter/test_iceberg.py b/tests/functional/adapter/test_iceberg.py
index c457532..d31a90f 100644
--- a/tests/functional/adapter/test_iceberg.py
+++ b/tests/functional/adapter/test_iceberg.py
@@ -192,7 +192,7 @@ def test_incremental(self, project):
         # base table rowcount
         relation = relation_from_name(project.adapter, "base")
-
+        project.run_sql(f"refresh table {relation}")  # refresh the table to invalidate the previously cached parquet file paths
         result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
@@ -275,3 +275,94 @@ class TestTableMatGlue(BaseTableMaterialization):
 
 class TestValidateConnectionGlue(BaseValidateConnection):
     pass
+
+
+class TestIcebergTimestamp:
+    """
+    Test class to verify that the `update_iceberg_ts` column is correctly added
+    when `add_iceberg_timestamp` is set to True, and is not added otherwise.
+    """
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        """
+        Provide two models for testing:
+        1. iceberg_timestamp_enabled.sql
+           - Has add_iceberg_timestamp=True
+        2. iceberg_timestamp_disabled.sql
+           - Has add_iceberg_timestamp=False
+        """
+
+        iceberg_timestamp_enabled_sql = """
+        {{ config(
+            materialized="table",
+            file_format="iceberg",
+            add_iceberg_timestamp=True
+        ) }}
+        select
+            1 as id,
+            'enabled' as status
+        """
+
+        iceberg_timestamp_disabled_sql = """
+        {{ config(
+            materialized="table",
+            file_format="iceberg",
+            add_iceberg_timestamp=False
+        ) }}
+        select
+            2 as id,
+            'disabled' as status
+        """
+
+        return {
+            "iceberg_timestamp_enabled.sql": iceberg_timestamp_enabled_sql,
+            "iceberg_timestamp_disabled.sql": iceberg_timestamp_disabled_sql,
+            "schema.yml": schema_base_yml,
+        }
+
+    def test_iceberg_timestamp_enabled(self, project):
+        """
+        When add_iceberg_timestamp=True, the column 'update_iceberg_ts' must exist.
+        """
+        results = run_dbt(["seed"])
+        assert len(results) == 1
+
+        results = run_dbt(["run", "-m", "iceberg_timestamp_enabled"])
+        assert len(results) == 1
+
+        relation_enabled = relation_from_name(project.adapter, "iceberg_timestamp_enabled")
+        columns_enabled = project.run_sql(f"DESCRIBE {relation_enabled}", fetch="all")
+        column_names_enabled = [col[0].lower() for col in columns_enabled]
+
+        assert "update_iceberg_ts" in column_names_enabled, (
+            f"Expected 'update_iceberg_ts' column in {relation_enabled}, but only got: {column_names_enabled}"
+        )
+
+        result_enabled = project.run_sql(
+            f"SELECT count(*) FROM {relation_enabled}", fetch="one"
+        )
+        assert result_enabled[0] == 1
+
+    def test_iceberg_timestamp_disabled(self, project):
+        """
+        When add_iceberg_timestamp=False, the column 'update_iceberg_ts' must NOT exist.
+        """
+        results = run_dbt(["seed"])
+        assert len(results) == 1
+
+        results = run_dbt(["run", "-m", "iceberg_timestamp_disabled"])
+        assert len(results) == 1
+
+        relation_disabled = relation_from_name(project.adapter, "iceberg_timestamp_disabled")
+        columns_disabled = project.run_sql(f"DESCRIBE {relation_disabled}", fetch="all")
+        column_names_disabled = [col[0].lower() for col in columns_disabled]
+
+        assert "update_iceberg_ts" not in column_names_disabled, (
+            f"Did not expect 'update_iceberg_ts' column in {relation_disabled}, "
+            f"but got columns: {column_names_disabled}"
+        )
+
+        result_disabled = project.run_sql(
+            f"SELECT count(*) FROM {relation_disabled}", fetch="one"
+        )
+        assert result_disabled[0] == 1
\ No newline at end of file
diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py
index 0f581cb..4eb9124 100644
--- a/tests/functional/conftest.py
+++ b/tests/functional/conftest.py
@@ -40,7 +40,7 @@ def dbt_profile_target(unique_schema, use_arrow):
         'datalake_formats': 'delta,iceberg',
         'conf': f"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.sql.legacy.allowNonEmptyLocationInCTAS=true --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse={get_s3_location()} --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.sources.partitionOverwriteMode=dynamic",
         'glue_session_reuse': True,
-        'use_arrow': use_arrow
+        'use_arrow': False
     }
diff --git a/tests/unit/macros/base.py b/tests/unit/macros/base.py
new file mode 100644
index 0000000..24f9398
--- /dev/null
+++ b/tests/unit/macros/base.py
@@ -0,0 +1,159 @@
+import re
+import os
+from typing import Any
+from unittest.mock import Mock
+
+import pytest
+from jinja2 import Environment, FileSystemLoader, Template
+
+from dbt.adapters.glue.relation import SparkRelation
+
+
+class TemplateBundle:
+    def __init__(self, template, context, relation):
+        self.template = template
+        self.context = context
+        self.relation = relation
+
+
+class GlueMacroTestBase:
+    @pytest.fixture(autouse=True)
+    def config(self, context) -> dict:
+        """
+        Anything you put in this dict will be returned by config in the rendered template.
+        """
+        local_config: dict[str, Any] = {}
+        context["config"].get = lambda key, default=None, **kwargs: local_config.get(key, default)
+        return local_config
+
+    @pytest.fixture(scope="class")
+    def default_context(self) -> dict:
+        """
+        This is the default context used in all tests.
+        """
+        context = {
+            "validation": Mock(),
+            "model": Mock(),
+            "exceptions": Mock(),
+            "config": Mock(),
+            "statement": lambda r, caller: r,
+            "adapter": Mock(),
+            "var": Mock(),
+            "return": lambda r: r,
+            "is_incremental": Mock(return_value=False),
+        }
+        return context
+
+    @pytest.fixture(scope="class")
+    def macro_folders_to_load(self) -> list:
+        """
+        A list of folders from which to load Glue macro templates.
+        All folders are relative to the dbt/include/glue folder.
+        """
+        return ["macros"]
+
+    def _get_project_root(self):
+        """Get the project root directory."""
+        current_dir = os.path.dirname(os.path.abspath(__file__))
+        while not os.path.exists(os.path.join(current_dir, "dbt")):
+            parent = os.path.dirname(current_dir)
+            if parent == current_dir:
+                raise RuntimeError("Could not find project root")
+            current_dir = parent
+        return current_dir
+
+    @pytest.fixture(scope="class")
+    def glue_env(self, macro_folders_to_load) -> Environment:
+        """
+        The environment used for rendering Glue macros.
+        """
+        project_root = self._get_project_root()
+        search_paths = []
+
+        for folder in macro_folders_to_load:
+            path = os.path.join(project_root, "dbt", "include", "glue", folder)
+            if os.path.exists(path):
+                search_paths.append(path)
+
+        if not search_paths:
+            raise RuntimeError(f"No macro folders found for {macro_folders_to_load}")
+
+        return Environment(
+            loader=FileSystemLoader(search_paths),
+            extensions=["jinja2.ext.do"],
+        )
+
+    @pytest.fixture(scope="class")
+    def template_name(self) -> str:
+        """
+        The name of the Glue template you want to test, not including the path.
+        Example: "adapters.sql"
+        """
+        raise NotImplementedError("Must be implemented by subclasses")
+
+    @pytest.fixture
+    def template(self, template_name, default_context, glue_env) -> Template:
+        """
+        This creates the template you will test against.
+        """
+        context = default_context.copy()
+        current_template = glue_env.get_template(template_name, globals=context)
+
+        def dispatch(macro_name, macro_namespace=None, packages=None):
+            if hasattr(current_template.module, f"glue__{macro_name}"):
+                return getattr(current_template.module, f"glue__{macro_name}")
+            else:
+                return context[f"glue__{macro_name}"]
+
+        context["adapter"].dispatch = dispatch
+        return current_template
+
+    @pytest.fixture
+    def context(self, template) -> dict:
+        """
+        Access to the context used to render the template.
+        """
+        return template.globals
+
+    @pytest.fixture(scope="class")
+    def relation(self):
+        """
+        Dummy relation to use in tests.
+ """ + data = { + "path": { + "database": "test_db", + "schema": "test_schema", + "identifier": "test_table", + }, + "type": None, + } + return SparkRelation.from_dict(data) + + @pytest.fixture + def template_bundle(self, template, context, relation): + """ + Bundles up the compiled template, its context, and a dummy relation. + """ + context["model"].alias = relation.identifier + return TemplateBundle(template, context, relation) + + def run_macro_raw(self, template, name, *args): + """ + Run the named macro from a template, and return the rendered value. + """ + return getattr(template.module, name)(*args) + + def run_macro(self, template, name, *args): + """ + Run the named macro from a template, and return the rendered value. + This version strips off extra whitespace and newlines. + """ + value = self.run_macro_raw(template, name, *args) + return re.sub(r"\s\s+", " ", value).strip() + + def render_bundle(self, template_bundle, name, *args): + """ + Convenience method for macros that take a relation as a first argument. + """ + return self.run_macro(template_bundle.template, name, template_bundle.relation, *args) \ No newline at end of file diff --git a/tests/unit/macros/test_macros.py b/tests/unit/macros/test_macros.py new file mode 100644 index 0000000..a3fa3d4 --- /dev/null +++ b/tests/unit/macros/test_macros.py @@ -0,0 +1,98 @@ +from base import GlueMacroTestBase +import pytest +from unittest.mock import Mock, MagicMock + +class TestGlueMacros(GlueMacroTestBase): + @pytest.fixture(scope="class") + def macro_folders_to_load(self) -> list: + return ["macros"] + + @pytest.fixture(scope="class") + def template_name(self) -> str: + return "adapters.sql" + + @pytest.fixture(scope="class") + def default_context(self) -> dict: + context = { + "validation": Mock(), + "model": Mock(), + "exceptions": Mock(), + "config": Mock(), + "statement": lambda r, caller: r, + "adapter": Mock(), + "var": Mock(), + "return": lambda r: r, + "is_incremental": Mock(return_value=False), + "partition_cols": lambda *args, **kwargs: "", + "clustered_cols": lambda *args, **kwargs: "", + "comment_clause": lambda *args, **kwargs: "", + "set_table_properties": lambda *args, **kwargs: "", + "create_temporary_view": lambda *args, **kwargs: "", + "glue__location_clause": lambda *args, **kwargs: "", + "glue__file_format_clause": lambda *args, **kwargs: "using {} ".format( + context["config"].get("file_format", "parquet") + ), + } + + # Mock model macro calls + mock_model = MagicMock() + mock_model.alias = "test_table" + context["model"] = mock_model + + # Mock config.get calls + local_config = {} + mock_config = MagicMock() + mock_config.get = lambda key, default=None, **kwargs: local_config.get(key, default) + context["config"] = mock_config + + return context + + def test_create_table_as_with_timestamp_enabled(self, template, relation, config): + """Test table creation with timestamp column enabled""" + config.update({ + 'file_format': 'iceberg', + 'add_iceberg_timestamp': True + }) + + sql = "select 1 as id, 'test' as name" + result = self.run_macro(template, "glue__create_table_as", False, relation, sql) + + # Check if the result contains necessary elements + assert "create or replace table" in result.lower() + assert "using iceberg" in result.lower() + assert "current_timestamp() as update_iceberg_ts" in result.lower() + + def test_create_table_as_with_timestamp_disabled(self, template, relation, config): + """Test table creation with timestamp column disabled""" + config.update({ + 'file_format': 'iceberg', + 
diff --git a/tests/unit/macros/test_macros.py b/tests/unit/macros/test_macros.py
new file mode 100644
index 0000000..a3fa3d4
--- /dev/null
+++ b/tests/unit/macros/test_macros.py
@@ -0,0 +1,98 @@
+import pytest
+from unittest.mock import Mock, MagicMock
+
+from base import GlueMacroTestBase
+
+
+class TestGlueMacros(GlueMacroTestBase):
+    @pytest.fixture(scope="class")
+    def macro_folders_to_load(self) -> list:
+        return ["macros"]
+
+    @pytest.fixture(scope="class")
+    def template_name(self) -> str:
+        return "adapters.sql"
+
+    @pytest.fixture(scope="class")
+    def default_context(self) -> dict:
+        context = {
+            "validation": Mock(),
+            "model": Mock(),
+            "exceptions": Mock(),
+            "config": Mock(),
+            "statement": lambda r, caller: r,
+            "adapter": Mock(),
+            "var": Mock(),
+            "return": lambda r: r,
+            "is_incremental": Mock(return_value=False),
+            "partition_cols": lambda *args, **kwargs: "",
+            "clustered_cols": lambda *args, **kwargs: "",
+            "comment_clause": lambda *args, **kwargs: "",
+            "set_table_properties": lambda *args, **kwargs: "",
+            "create_temporary_view": lambda *args, **kwargs: "",
+            "glue__location_clause": lambda *args, **kwargs: "",
+            "glue__file_format_clause": lambda *args, **kwargs: "using {} ".format(
+                context["config"].get("file_format", "parquet")
+            ),
+        }
+
+        # Mock the model object the macros reference
+        mock_model = MagicMock()
+        mock_model.alias = "test_table"
+        context["model"] = mock_model
+
+        # Back config.get with a plain dict so tests can override values
+        local_config = {}
+        mock_config = MagicMock()
+        mock_config.get = lambda key, default=None, **kwargs: local_config.get(key, default)
+        context["config"] = mock_config
+
+        return context
+
+    def test_create_table_as_with_timestamp_enabled(self, template, relation, config):
+        """Test table creation with the timestamp column enabled"""
+        config.update({
+            'file_format': 'iceberg',
+            'add_iceberg_timestamp': True
+        })
+
+        sql = "select 1 as id, 'test' as name"
+        result = self.run_macro(template, "glue__create_table_as", False, relation, sql)
+
+        # Check that the result contains the necessary elements
+        assert "create or replace table" in result.lower()
+        assert "using iceberg" in result.lower()
+        assert "current_timestamp() as update_iceberg_ts" in result.lower()
+
+    def test_create_table_as_with_timestamp_disabled(self, template, relation, config):
+        """Test table creation with the timestamp column disabled"""
+        config.update({
+            'file_format': 'iceberg',
+            'add_iceberg_timestamp': False
+        })
+
+        sql = "select 1 as id, 'test' as name"
+        result = self.run_macro(template, "glue__create_table_as", False, relation, sql)
+
+        # Check that the timestamp column is not added
+        assert "create or replace table" in result.lower()
+        assert "using iceberg" in result.lower()
+        assert "update_iceberg_ts" not in result.lower()
+
+    def test_create_table_as_non_iceberg(self, template, relation, config):
+        """Test table creation with a non-iceberg format"""
+        config.update({
+            'file_format': 'parquet',
+            'add_iceberg_timestamp': True  # This option should be ignored for non-iceberg formats
+        })
+
+        sql = "select 1 as id, 'test' as name"
+        result = self.run_macro(template, "glue__create_table_as", False, relation, sql)
+
+        # Check that the timestamp column is not added for non-iceberg formats
+        assert "create table" in result.lower()
+        assert "using parquet" in result.lower()
+        assert "update_iceberg_ts" not in result.lower()
+
+    def test_file_format_defaults(self, template):
+        """Test the default value for file_format"""
+        result = self.run_macro(template, "glue__file_format_clause")
+        assert "using parquet" in result.lower()  # parquet is the default
\ No newline at end of file
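
Once a model built with the option enabled exists, the new column can be queried like any other, for example to check when the table was last loaded. A minimal sketch, reusing the test model name from above:

    select max(update_iceberg_ts) as last_load_at
    from iceberg_timestamp_enabled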