Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions sqlmesh/core/engine_adapter/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,18 @@ def _df_to_source_queries(
)

def query_factory() -> Query:
if bigframes_pd and isinstance(df, bigframes_pd.DataFrame):
df.to_gbq(
ordered_df = df[list(source_columns_to_types)]
if bigframes_pd and isinstance(ordered_df, bigframes_pd.DataFrame):
ordered_df.to_gbq(
f"{temp_bq_table.project}.{temp_bq_table.dataset_id}.{temp_bq_table.table_id}",
if_exists="replace",
)
elif not self.table_exists(temp_table):
# Make mypy happy
assert isinstance(df, pd.DataFrame)
assert isinstance(ordered_df, pd.DataFrame)
self._db_call(self.client.create_table, table=temp_bq_table, exists_ok=False)
result = self.__load_pandas_to_table(
temp_bq_table, df, source_columns_to_types, replace=False
temp_bq_table, ordered_df, source_columns_to_types, replace=False
)
if result.errors:
raise SQLMeshError(result.errors)
Expand Down
14 changes: 8 additions & 6 deletions sqlmesh/core/engine_adapter/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ def query_factory() -> Query:
elif isinstance(df, pd.DataFrame):
from snowflake.connector.pandas_tools import write_pandas

ordered_df = df[list(source_columns_to_types)]

# Workaround for https://github.com/snowflakedb/snowflake-connector-python/issues/1034
# The above issue has already been fixed upstream, but we keep the following
# line anyway in order to support a wider range of Snowflake versions.
Expand All @@ -388,16 +390,16 @@ def query_factory() -> Query:

# See: https://stackoverflow.com/a/75627721
for column, kind in source_columns_to_types.items():
if is_datetime64_any_dtype(df.dtypes[column]):
if is_datetime64_any_dtype(ordered_df.dtypes[column]):
if kind.is_type("date"): # type: ignore
df[column] = pd.to_datetime(df[column]).dt.date # type: ignore
elif getattr(df.dtypes[column], "tz", None) is not None: # type: ignore
df[column] = pd.to_datetime(df[column]).dt.strftime(
ordered_df[column] = pd.to_datetime(ordered_df[column]).dt.date # type: ignore
elif getattr(ordered_df.dtypes[column], "tz", None) is not None: # type: ignore
ordered_df[column] = pd.to_datetime(ordered_df[column]).dt.strftime(
"%Y-%m-%d %H:%M:%S.%f%z"
) # type: ignore
# https://github.com/snowflakedb/snowflake-connector-python/issues/1677
else: # type: ignore
df[column] = pd.to_datetime(df[column]).dt.strftime(
ordered_df[column] = pd.to_datetime(ordered_df[column]).dt.strftime(
"%Y-%m-%d %H:%M:%S.%f"
) # type: ignore

Expand All @@ -407,7 +409,7 @@ def query_factory() -> Query:

write_pandas(
self._connection_pool.get(),
df,
ordered_df,
temp_table.name,
schema=temp_table.db or None,
database=database.sql(dialect=self.dialect) if database else None,
Expand Down
8 changes: 7 additions & 1 deletion tests/core/engine_adapter/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,13 @@ def temp_table_exists(table: exp.Table) -> bool:
retry_resp_call.errors = None
retry_mock.return_value = retry_resp
db_call_mock.return_value = AttributeDict({"errors": None})
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
df = pd.DataFrame(
{
"id": [1, 2, 3],
"ts": ["2025-01-01 00:00:00", "2025-01-01 00:00:00", "2025-01-01 00:00:00"],
"val": [7, 8, 9],
}
)
Comment on lines +490 to +496
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual values in the dataframe didn't matter before so they just had placeholder values but due to the ordering check they are now updated to match what we claim the structure is in the merge call.

adapter.merge(
target_table="target",
source_table=df,
Expand Down