Skip to content

Commit

Permalink
Merge branch 'main' into fix_on_schema_change
Browse files Browse the repository at this point in the history
  • Loading branch information
moomindani authored Feb 17, 2025
2 parents e8018f1 + 690f158 commit d4f19ef
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 3 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## Future Release
- Allow spawning new isolated sessions for the models that require different session configuration
## New version
- Correctly handle EntityNotFound when trying to determine session state, setting the state to "does not exist" instead of STOPPED.
- Allow spawning new isolated sessions for the models that require different session configuration.
- Correctly handle EntityNotFound when listing relations.
- Fix the error in the get_columns_in_relation function when on_schema_change is specified.

## v1.9.0
Expand Down
6 changes: 6 additions & 0 deletions dbt/adapters/glue/gluedbapi/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,12 @@ def state(self) -> str:
session = response.get("Session", {})
self._state = session.get("Status")
except Exception as e:
if isinstance(e, botocore.exceptions.ClientError):
if e.response['Error']['Code'] == 'EntityNotFoundException':
logger.debug(f"Session {self.session_id} not found")
logger.debug(e)
self._state = None
return self._state
logger.debug(f"Error while checking state of session {self.session_id}")
logger.debug(e)
self._state = GlueSessionState.STOPPED
Expand Down
2 changes: 2 additions & 0 deletions dbt/adapters/glue/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ def list_relations_without_caching(self, schema_relation: SparkRelation):
type=self.relation_type_map.get(table.get("TableType")),
))
return relations
except client.exceptions.EntityNotFoundException as e:
return []
except Exception as e:
logger.error(e)

Expand Down
38 changes: 37 additions & 1 deletion tests/unit/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from multiprocessing import get_context
from botocore.client import BaseClient
from moto import mock_aws
import boto3

import agate
from dbt.config import RuntimeConfig
Expand All @@ -13,6 +14,7 @@
from dbt.adapters.glue import GlueAdapter
from dbt.adapters.glue.gluedbapi import GlueConnection
from dbt.adapters.glue.relation import SparkRelation
from dbt.adapters.contracts.relation import RelationConfig
from tests.util import config_from_parts_or_dicts
from .util import MockAWSService

Expand Down Expand Up @@ -122,4 +124,38 @@ def test_get_custom_iceberg_catalog_namespace(self):
with mock.patch("dbt.adapters.glue.connections.open"):
connection = adapter.acquire_connection("dummy")
connection.handle # trigger lazy-load
self.assertEqual(adapter.get_custom_iceberg_catalog_namespace(), "custom_iceberg_catalog")
self.assertEqual(adapter.get_custom_iceberg_catalog_namespace(), "custom_iceberg_catalog")

@mock_aws
def test_when_database_not_exists_list_relations_without_caching_returns_empty_array(self):
    # No database is created in the mocked catalog, so listing relations
    # for 'mockdb' is expected to come back as an empty list.
    glue_adapter = GlueAdapter(self._get_config(), get_context("spawn"))

    def _stub_get_connection():
        # Builds a new Glue client on each call, like the lambda it replaces.
        return (None, boto3.client("glue", region_name="us-east-1"))

    glue_adapter.get_connection = _stub_get_connection

    schema_relation = Mock(SparkRelation)
    schema_relation.schema = 'mockdb'

    listed = glue_adapter.list_relations_without_caching(schema_relation)
    self.assertEqual([], listed)

@mock_aws
def test_list_relations_returns_database_tables(self):
    # Every table created in the mocked database must be reported back as a
    # (schema, identifier) pair; ordering is not significant.
    config = self._get_config()
    glue_client = boto3.client("glue", region_name="us-east-1")

    # Seed the catalog: one database holding three tables.
    database_name = 'mockdb'
    table_names = ['table1', 'table2', 'table3']
    glue_client.create_database(DatabaseInput={"Name": database_name})
    expected = []
    for table_name in table_names:
        glue_client.create_table(
            DatabaseName=database_name,
            TableInput={"Name": table_name},
        )
        expected.append((database_name, table_name))

    # Wire the adapter to the pre-populated client.
    adapter = GlueAdapter(config, get_context("spawn"))

    def _stub_get_connection():
        return (None, glue_client)

    adapter.get_connection = _stub_get_connection
    schema_relation = Mock(SparkRelation)
    schema_relation.schema = database_name

    found = adapter.list_relations_without_caching(schema_relation)

    actual = []
    for item in found:
        actual.append((item.path.schema, item.path.identifier))
    self.assertCountEqual(expected, actual)

0 comments on commit d4f19ef

Please sign in to comment.