Skip to content

Commit c866f83

Browse files
authored
Fix: enable fetching schema for models querying INFORMATION_SCHEMA (#3324)
1 parent 6669d29 commit c866f83

File tree

4 files changed

+78
-15
lines changed

4 files changed

+78
-15
lines changed

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -223,17 +223,35 @@ def dtype_to_sql(dtype: t.Optional[StandardSqlDataType]) -> str:
223223
return "JSON"
224224
return kind.name
225225

226-
table = self._get_table(table_name)
227-
columns = {
228-
field.name: exp.DataType.build(
229-
dtype_to_sql(field.to_standard_sql().type), dialect=self.dialect
230-
)
231-
for field in table.schema
232-
}
233-
if include_pseudo_columns and table.time_partitioning and not table.time_partitioning.field:
234-
columns["_PARTITIONTIME"] = exp.DataType.build("TIMESTAMP")
235-
if table.time_partitioning.type_ == "DAY":
236-
columns["_PARTITIONDATE"] = exp.DataType.build("DATE")
226+
def create_mapping_schema(
227+
schema: t.Sequence[bigquery.SchemaField],
228+
) -> t.Dict[str, exp.DataType]:
229+
return {
230+
field.name: exp.DataType.build(
231+
dtype_to_sql(field.to_standard_sql().type), dialect=self.dialect
232+
)
233+
for field in schema
234+
}
235+
236+
table = exp.to_table(table_name)
237+
if len(table.parts) > 3:
238+
# The client's `get_table` method can't handle paths with >3 identifiers
239+
self.execute(exp.select("*").from_(table).limit(1))
240+
query_results = self._query_job._query_results
241+
columns = create_mapping_schema(query_results.schema)
242+
else:
243+
bq_table = self._get_table(table)
244+
columns = create_mapping_schema(bq_table.schema)
245+
246+
if (
247+
include_pseudo_columns
248+
and bq_table.time_partitioning
249+
and not bq_table.time_partitioning.field
250+
):
251+
columns["_PARTITIONTIME"] = exp.DataType.build("TIMESTAMP")
252+
if bq_table.time_partitioning.type_ == "DAY":
253+
columns["_PARTITIONDATE"] = exp.DataType.build("DATE")
254+
237255
return columns
238256

239257
def alter_table(

tests/core/engine_adapter/integration/__init__.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from tests.utils.pandas import compare_dataframes
2424

2525
if t.TYPE_CHECKING:
26+
from sqlmesh.core._typing import TableName
2627
from sqlmesh.core.engine_adapter._typing import Query
2728

2829
TEST_SCHEMA = "test_schema"
@@ -212,12 +213,16 @@ def input_data(
212213
def output_data(self, data: pd.DataFrame) -> pd.DataFrame:
213214
return self._format_df(data)
214215

215-
def table(self, table_name: str, schema: str = TEST_SCHEMA) -> exp.Table:
216+
def table(self, table_name: TableName, schema: str = TEST_SCHEMA) -> exp.Table:
216217
schema = self.add_test_suffix(schema)
217218
self._schemas.append(schema)
219+
220+
table = exp.to_table(table_name)
221+
table.set("db", exp.parse_identifier(schema, dialect=self.dialect))
222+
218223
return exp.to_table(
219224
normalize_model_name(
220-
".".join([schema, table_name]),
225+
table,
221226
default_catalog=self.engine_adapter.default_catalog,
222227
dialect=self.dialect,
223228
)

tests/core/engine_adapter/integration/test_integration_bigquery.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,43 @@ def _get_data_object(table: exp.Table) -> DataObject:
178178

179179
metadata = _get_data_object(target_table_1)
180180
assert not metadata.is_clustered
181+
182+
183+
def test_fetch_schema_of_information_schema_tables(
184+
ctx: TestContext, engine_adapter: BigQueryEngineAdapter
185+
):
186+
# We produce Table(this=Dot(this=INFORMATION_SCHEMA, expression=TABLES)) here,
187+
# otherwise `db` or `catalog` would be set, which is not the right representation
188+
information_schema_tables = exp.to_table("_._.INFORMATION_SCHEMA.TABLES")
189+
information_schema_tables.set("db", None)
190+
information_schema_tables.set("catalog", None)
191+
192+
source = ctx.table(information_schema_tables)
193+
194+
expected_columns_to_types = {
195+
"table_catalog": exp.DataType.build("TEXT"),
196+
"table_schema": exp.DataType.build("TEXT"),
197+
"table_name": exp.DataType.build("TEXT"),
198+
"table_type": exp.DataType.build("TEXT"),
199+
"is_insertable_into": exp.DataType.build("TEXT"),
200+
"is_typed": exp.DataType.build("TEXT"),
201+
"creation_time": exp.DataType.build("TIMESTAMPTZ"),
202+
"base_table_catalog": exp.DataType.build("TEXT"),
203+
"base_table_schema": exp.DataType.build("TEXT"),
204+
"base_table_name": exp.DataType.build("TEXT"),
205+
"snapshot_time_ms": exp.DataType.build("TIMESTAMPTZ"),
206+
"ddl": exp.DataType.build("TEXT"),
207+
"default_collation_name": exp.DataType.build("TEXT"),
208+
"upsert_stream_apply_watermark": exp.DataType.build("TIMESTAMPTZ"),
209+
"replica_source_catalog": exp.DataType.build("TEXT"),
210+
"replica_source_schema": exp.DataType.build("TEXT"),
211+
"replica_source_name": exp.DataType.build("TEXT"),
212+
"replication_status": exp.DataType.build("TEXT"),
213+
"replication_error": exp.DataType.build("TEXT"),
214+
"is_change_history_enabled": exp.DataType.build("TEXT"),
215+
"sync_status": exp.DataType.build(
216+
"STRUCT<last_completion_time TIMESTAMPTZ, error_time TIMESTAMPTZ, error STRUCT<reason TEXT, location TEXT, message TEXT>>"
217+
),
218+
}
219+
220+
assert expected_columns_to_types == engine_adapter.columns(source.sql())

tests/core/test_model.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5945,12 +5945,12 @@ def custom_macro(evaluator, arg1, arg2):
59455945
dialect snowflake,
59465946
);
59475947
5948-
SELECT * FROM (@custom_macro(@foo, @bar)) AS q
5948+
SELECT * FROM (@custom_macro(@a, @b)) AS q
59495949
""")
59505950

59515951
config = Config(
59525952
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
5953-
variables={"foo": "foo", "bar": "boo"},
5953+
variables={"a": "a", "b": "b"},
59545954
)
59555955
context = Context(paths=tmp_path, config=config)
59565956

0 commit comments

Comments (0)