From 62d9a03f49afe317265f96bc7cfa5000173c704e Mon Sep 17 00:00:00 2001 From: Pat Patterson Date: Wed, 15 Jan 2025 14:53:30 -0800 Subject: [PATCH 1/2] Fix spurious warnings and bogus index when reflecting Iceberg tables --- trino/sqlalchemy/dialect.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/trino/sqlalchemy/dialect.py b/trino/sqlalchemy/dialect.py index ce3e537f..ad05aeec 100644 --- a/trino/sqlalchemy/dialect.py +++ b/trino/sqlalchemy/dialect.py @@ -214,7 +214,7 @@ def _get_partitions( connection: Connection, table_name: str, schema: str = None - ) -> List[Dict[str, List[Any]]]: + ) -> Optional[List[str]]: schema = schema or self._get_default_schema_name(connection) query = dedent( f""" @@ -223,6 +223,17 @@ def _get_partitions( ).strip() res = connection.execute(sql.text(query)) partition_names = [desc[0] for desc in res.cursor.description] + data_types = [desc[1] for desc in res.cursor.description] + # Compare the column names and types to the shape of an Iceberg $partitions table + if (partition_names == ['partition', 'record_count', 'file_count', 'total_size', 'data'] + and data_types[0].startswith('row(') + and data_types[1] == 'bigint' + and data_types[2] == 'bigint' + and data_types[3] == 'bigint' + and data_types[4].startswith('row(')): + # This is an Iceberg $partitions table - these match the partition metadata columns + return None + # This is a Hive table - these are the partition names return partition_names def get_pk_constraint(self, connection: Connection, table_name: str, schema: str = None, **kw) -> Dict[str, Any]: @@ -322,7 +333,7 @@ def get_indexes(self, connection: Connection, table_name: str, schema: str = Non try: partitioned_columns = self._get_partitions(connection, f"{table_name}", schema) except Exception as e: - # e.g. it's not a Hive table or an unpartitioned Hive table + # e.g. it's an unpartitioned Hive table logger.debug("Couldn't fetch partition columns. schema: %s, table: %s, error: %s", schema, table_name, e) if not partitioned_columns: return [] From c1dfd7e503ec3162c02d8284773108462faf5f76 Mon Sep 17 00:00:00 2001 From: Ashhar Hasan Date: Fri, 6 Mar 2026 21:28:57 +0530 Subject: [PATCH 2/2] Add integration tests for get_indexes on Iceberg and Hive tables Enable dynamic catalog management in the dev server so tests can use CREATE CATALOG. Add tests verifying that get_indexes returns empty for Iceberg tables and returns partition columns for Hive tables. Co-Authored-By: Claude Sonnet 4.6 --- etc/config.properties | 3 + tests/development_server.py | 4 +- .../test_sqlalchemy_integration.py | 97 +++++++++++++++++++ 3 files changed, 101 insertions(+), 3 deletions(-) diff --git a/etc/config.properties b/etc/config.properties index 10372938..38c58875 100644 --- a/etc/config.properties +++ b/etc/config.properties @@ -13,5 +13,8 @@ protocol.spooling.enabled=true protocol.spooling.shared-secret-key=jxTKysfCBuMZtFqUf8UJDQ1w9ez8rynEJsJqgJf66u0= protocol.spooling.retrieval-mode=coordinator_proxy +# Enable dynamic catalog management +catalog.management=dynamic + # Disable http request log http-server.log.enabled=false diff --git a/tests/development_server.py b/tests/development_server.py index fd15c201..2928a6da 100644 --- a/tests/development_server.py +++ b/tests/development_server.py @@ -76,9 +76,6 @@ def start_development_server(port=None, trino_version=TRINO_VERSION): root = Path(__file__).parent.parent - trino = trino \ - .with_volume_mapping(str(root / "etc/catalog"), "/etc/trino/catalog") - # Enable spooling config if supports_spooling_protocol: trino \ @@ -89,6 +86,7 @@ def start_development_server(port=None, trino_version=TRINO_VERSION): .with_volume_mapping(str(root / "etc/config.properties"), "/etc/trino/config.properties") else: trino \ + .with_volume_mapping(str(root / "etc/catalog"), "/etc/trino/catalog") \ .with_volume_mapping(str(root / "etc/jvm-pre-466.config"), "/etc/trino/jvm.config") \ .with_volume_mapping(str(root / "etc/config-pre-466.properties"), "/etc/trino/config.properties") diff --git a/tests/integration/test_sqlalchemy_integration.py b/tests/integration/test_sqlalchemy_integration.py index 896d0d91..e5160bb0 100644 --- a/tests/integration/test_sqlalchemy_integration.py +++ b/tests/integration/test_sqlalchemy_integration.py @@ -20,6 +20,7 @@ from sqlalchemy.sql import or_ from sqlalchemy.types import ARRAY +import trino.dbapi from tests.integration.conftest import trino_version from tests.unit.conftest import sqlalchemy_version from trino.sqlalchemy.datatype import JSON @@ -757,3 +758,99 @@ def _num_queries_containing_string(connection, query_string): result = connection.execute(statement) rows = result.fetchall() return len(list(filter(lambda rec: query_string in rec[0], rows))) + + +@pytest.mark.skipif(trino_version() == 351, reason="Dynamic catalogs not supported") +def test_get_indexes_returns_empty_for_iceberg_table(run_trino): + host, port = run_trino + catalog_name = "test_iceberg" + schema_name = "test_schema" + table_name = "partitioned" + + conn = trino.dbapi.connect(host=host, port=port, user="test") + try: + cur = conn.cursor() + cur.execute( + f"CREATE CATALOG {catalog_name} USING iceberg " + f"WITH (\"iceberg.catalog.type\" = 'TESTING_FILE_METASTORE', " + f"\"hive.metastore.catalog.dir\" = 'file:///tmp/iceberg-test', " + f"\"fs.native-local.enabled\" = 'true')" + ) + cur.fetchall() + cur.execute(f"CREATE SCHEMA {catalog_name}.{schema_name}") + cur.fetchall() + cur.execute( + f"CREATE TABLE {catalog_name}.{schema_name}.{table_name} " + f"(id INTEGER, year INTEGER) " + f"WITH (partitioning = ARRAY['year'])" + ) + cur.fetchall() + cur.execute( + f"INSERT INTO {catalog_name}.{schema_name}.{table_name} VALUES (1, 2023)" + ) + cur.fetchall() + + engine = sqla.create_engine( + f"trino://test@{host}:{port}/{catalog_name}", + connect_args={"source": "test", "max_attempts": 1}, + ) + indexes = sqla.inspect(engine).get_indexes(table_name, schema=schema_name) + assert indexes == [] + finally: + cur = conn.cursor() + cur.execute(f"DROP TABLE IF EXISTS {catalog_name}.{schema_name}.{table_name}") + cur.fetchall() + cur.execute(f"DROP SCHEMA IF EXISTS {catalog_name}.{schema_name}") + cur.fetchall() + cur.execute(f"DROP CATALOG IF EXISTS {catalog_name}") + cur.fetchall() + conn.close() + + +@pytest.mark.skipif(trino_version() == 351, reason="Dynamic catalogs not supported") +def test_get_indexes_returns_partitions_for_hive_table(run_trino): + host, port = run_trino + catalog_name = "test_hive" + schema_name = "test_schema" + table_name = "partitioned" + + conn = trino.dbapi.connect(host=host, port=port, user="test") + try: + cur = conn.cursor() + cur.execute( + f"CREATE CATALOG {catalog_name} USING hive " + f"WITH (\"hive.metastore\" = 'file', " + f"\"hive.metastore.catalog.dir\" = 'file:///tmp/hive-test', " + f"\"fs.native-local.enabled\" = 'true')" + ) + cur.fetchall() + cur.execute(f"CREATE SCHEMA {catalog_name}.{schema_name}") + cur.fetchall() + cur.execute( + f"CREATE TABLE {catalog_name}.{schema_name}.{table_name} " + f"(id INTEGER, name VARCHAR, region VARCHAR) " + f"WITH (partitioned_by = ARRAY['name', 'region'])" + ) + cur.fetchall() + cur.execute( + f"INSERT INTO {catalog_name}.{schema_name}.{table_name} VALUES (1, 'alice', 'us-east')" + ) + cur.fetchall() + + engine = sqla.create_engine( + f"trino://test@{host}:{port}/{catalog_name}", + connect_args={"source": "test", "max_attempts": 1}, + ) + indexes = sqla.inspect(engine).get_indexes(table_name, schema=schema_name) + assert len(indexes) == 1 + assert indexes[0]["name"] == "partition" + assert indexes[0]["column_names"] == ["name", "region"] + finally: + cur = conn.cursor() + cur.execute(f"DROP TABLE IF EXISTS {catalog_name}.{schema_name}.{table_name}") + cur.fetchall() + cur.execute(f"DROP SCHEMA IF EXISTS {catalog_name}.{schema_name}") + cur.fetchall() + cur.execute(f"DROP CATALOG IF EXISTS {catalog_name}") + cur.fetchall() + conn.close()