Skip to content

Commit 9feefcf

Browse files
committed
fix: bigquery snowflake source columns support
1 parent 34dc9fd commit 9feefcf

File tree

3 files changed

+20
-11
lines changed

3 files changed

+20
-11
lines changed

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,17 +169,18 @@ def _df_to_source_queries(
169169
)
170170

171171
def query_factory() -> Query:
172-
if bigframes_pd and isinstance(df, bigframes_pd.DataFrame):
173-
df.to_gbq(
172+
ordered_df = df[list(source_columns_to_types)]
173+
if bigframes_pd and isinstance(ordered_df, bigframes_pd.DataFrame):
174+
ordered_df.to_gbq(
174175
f"{temp_bq_table.project}.{temp_bq_table.dataset_id}.{temp_bq_table.table_id}",
175176
if_exists="replace",
176177
)
177178
elif not self.table_exists(temp_table):
178179
# Make mypy happy
179-
assert isinstance(df, pd.DataFrame)
180+
assert isinstance(ordered_df, pd.DataFrame)
180181
self._db_call(self.client.create_table, table=temp_bq_table, exists_ok=False)
181182
result = self.__load_pandas_to_table(
182-
temp_bq_table, df, source_columns_to_types, replace=False
183+
temp_bq_table, ordered_df, source_columns_to_types, replace=False
183184
)
184185
if result.errors:
185186
raise SQLMeshError(result.errors)

sqlmesh/core/engine_adapter/snowflake.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,8 @@ def query_factory() -> Query:
378378
elif isinstance(df, pd.DataFrame):
379379
from snowflake.connector.pandas_tools import write_pandas
380380

381+
ordered_df = df[list(source_columns_to_types)]
382+
381383
# Workaround for https://github.com/snowflakedb/snowflake-connector-python/issues/1034
382384
# The above issue has already been fixed upstream, but we keep the following
383385
# line anyway in order to support a wider range of Snowflake versions.
@@ -388,16 +390,16 @@ def query_factory() -> Query:
388390

389391
# See: https://stackoverflow.com/a/75627721
390392
for column, kind in source_columns_to_types.items():
391-
if is_datetime64_any_dtype(df.dtypes[column]):
393+
if is_datetime64_any_dtype(ordered_df.dtypes[column]):
392394
if kind.is_type("date"): # type: ignore
393-
df[column] = pd.to_datetime(df[column]).dt.date # type: ignore
394-
elif getattr(df.dtypes[column], "tz", None) is not None: # type: ignore
395-
df[column] = pd.to_datetime(df[column]).dt.strftime(
395+
ordered_df[column] = pd.to_datetime(ordered_df[column]).dt.date # type: ignore
396+
elif getattr(ordered_df.dtypes[column], "tz", None) is not None: # type: ignore
397+
ordered_df[column] = pd.to_datetime(ordered_df[column]).dt.strftime(
396398
"%Y-%m-%d %H:%M:%S.%f%z"
397399
) # type: ignore
398400
# https://github.com/snowflakedb/snowflake-connector-python/issues/1677
399401
else: # type: ignore
400-
df[column] = pd.to_datetime(df[column]).dt.strftime(
402+
ordered_df[column] = pd.to_datetime(ordered_df[column]).dt.strftime(
401403
"%Y-%m-%d %H:%M:%S.%f"
402404
) # type: ignore
403405

@@ -407,7 +409,7 @@ def query_factory() -> Query:
407409

408410
write_pandas(
409411
self._connection_pool.get(),
410-
df,
412+
ordered_df,
411413
temp_table.name,
412414
schema=temp_table.db or None,
413415
database=database.sql(dialect=self.dialect) if database else None,

tests/core/engine_adapter/test_bigquery.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,13 @@ def temp_table_exists(table: exp.Table) -> bool:
487487
retry_resp_call.errors = None
488488
retry_mock.return_value = retry_resp
489489
db_call_mock.return_value = AttributeDict({"errors": None})
490-
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
490+
df = pd.DataFrame(
491+
{
492+
"id": [1, 2, 3],
493+
"ts": ["2025-01-01 00:00:00", "2025-01-01 00:00:00", "2025-01-01 00:00:00"],
494+
"val": [7, 8, 9],
495+
}
496+
)
491497
adapter.merge(
492498
target_table="target",
493499
source_table=df,

0 commit comments

Comments
 (0)