From ac86862251cd6cb31f454e7e6944ce12ef4fcd57 Mon Sep 17 00:00:00 2001
From: Pat Patterson
Date: Wed, 15 Jan 2025 14:53:30 -0800
Subject: [PATCH] Fix spurious warnings and bogus index when reflecting Iceberg tables

---
 trino/sqlalchemy/dialect.py | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/trino/sqlalchemy/dialect.py b/trino/sqlalchemy/dialect.py
index ad28b18a..b8b794c8 100644
--- a/trino/sqlalchemy/dialect.py
+++ b/trino/sqlalchemy/dialect.py
@@ -229,6 +229,19 @@ def _get_partitions(
         partition_names = [desc[0] for desc in res.cursor.description]
         return partition_names
 
+    def _connector_is_hive(self, connection: Connection, catalog_name: str):
+        query = dedent(
+            """
+            SELECT
+                COUNT(*)
+            FROM "system"."metadata"."table_properties"
+            WHERE "catalog_name" = :catalog_name
+            AND "property_name" = 'bucketing_version'
+            """
+        ).strip()
+        res = connection.execute(sql.text(query), {"catalog_name": catalog_name})
+        return res.scalar() == 1
+
     def get_pk_constraint(self, connection: Connection, table_name: str, schema: str = None, **kw) -> Dict[str, Any]:
         """Trino has no support for primary keys. Returns a dummy"""
         return dict(name=None, constrained_columns=[])
@@ -322,11 +335,17 @@ def get_indexes(self, connection: Connection, table_name: str, schema: str = Non
         if not self.has_table(connection, table_name, schema):
             raise exc.NoSuchTableError(f"schema={schema}, table={table_name}")
 
+        catalog_name = self._get_default_catalog_name(connection)
+        if catalog_name is None:
+            raise exc.NoSuchTableError("catalog is required in connection")
+        if not self._connector_is_hive(connection, catalog_name):
+            return []
+
         partitioned_columns = None
         try:
             partitioned_columns = self._get_partitions(connection, f"{table_name}", schema)
         except Exception as e:
-            # e.g. it's not a Hive table or an unpartitioned Hive table
+            # e.g. it's an unpartitioned Hive table
             logger.debug("Couldn't fetch partition columns. schema: %s, table: %s, error: %s", schema, table_name, e)
         if not partitioned_columns:
             return []