Skip to content

Commit

Permalink
Fix spurious warnings and bogus index when reflecting Iceberg tables
Browse files Browse the repository at this point in the history
  • Loading branch information
metadaddy committed Jan 24, 2025
1 parent 24cc388 commit 4d1f272
Showing 1 changed file with 38 additions and 1 deletion.
39 changes: 38 additions & 1 deletion trino/sqlalchemy/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,33 @@ def _get_partitions(
partition_names = [desc[0] for desc in res.cursor.description]
return partition_names

def _has_connector_name(self, connection: Connection):
query = dedent(
"""
SELECT
COUNT(*)
FROM "system"."information_schema"."columns"
WHERE "table_catalog" = 'system'
AND "table_schema" = 'metadata'
AND "table_name" = 'catalogs'
AND "column_name" = 'connector_name'
"""
).strip()
res = connection.execute(sql.text(query))
return res.scalar() == 1

def _get_connector_name(self, connection: Connection, catalog_name: str):
query = dedent(
"""
SELECT
"connector_name"
FROM "system"."metadata"."catalogs"
WHERE "catalog_name" = :catalog_name
"""
).strip()
res = connection.execute(sql.text(query), {"catalog_name": catalog_name})
return res.scalar()

def get_pk_constraint(self, connection: Connection, table_name: str, schema: str = None, **kw) -> Dict[str, Any]:
"""Trino has no support for primary keys. Returns a dummy"""
return dict(name=None, constrained_columns=[])
Expand Down Expand Up @@ -322,11 +349,21 @@ def get_indexes(self, connection: Connection, table_name: str, schema: str = Non
if not self.has_table(connection, table_name, schema):
raise exc.NoSuchTableError(f"schema={schema}, table={table_name}")

if self._has_connector_name(connection):
catalog_name = self._get_default_catalog_name(connection)
if catalog_name is None:
raise exc.NoSuchTableError("catalog is required in connection")
connector_name = self._get_connector_name(connection, catalog_name)
if connector_name is None:
raise exc.NoSuchTableError("connector name is required")
if connector_name != "hive":
return []

partitioned_columns = None
try:
partitioned_columns = self._get_partitions(connection, f"{table_name}", schema)
except Exception as e:
# e.g. it's not a Hive table or an unpartitioned Hive table
# e.g. it's an unpartitioned Hive table
logger.debug("Couldn't fetch partition columns. schema: %s, table: %s, error: %s", schema, table_name, e)
if not partitioned_columns:
return []
Expand Down

0 comments on commit 4d1f272

Please sign in to comment.