|
40 | 40 | ) |
41 | 41 | from sqlmesh.core.model.kind import TimeColumn |
42 | 42 | from sqlmesh.core.schema_diff import SchemaDiffer |
43 | | -from sqlmesh.utils import columns_to_types_all_known, random_id, CorrelationId |
44 | | -from sqlmesh.utils.connection_pool import create_connection_pool, ConnectionPool |
| 43 | +from sqlmesh.utils import CorrelationId, columns_to_types_all_known, random_id |
| 44 | +from sqlmesh.utils.connection_pool import ConnectionPool, create_connection_pool |
45 | 45 | from sqlmesh.utils.date import TimeLike, make_inclusive, to_time_column |
46 | 46 | from sqlmesh.utils.errors import ( |
| 47 | + MissingDefaultCatalogError, |
47 | 48 | SQLMeshError, |
48 | 49 | UnsupportedCatalogOperationError, |
49 | | - MissingDefaultCatalogError, |
50 | 50 | ) |
51 | 51 | from sqlmesh.utils.pandas import columns_to_types_from_df |
52 | 52 |
|
|
55 | 55 |
|
56 | 56 | from sqlmesh.core._typing import SchemaName, SessionProperties, TableName |
57 | 57 | from sqlmesh.core.engine_adapter._typing import ( |
58 | | - BigframeSession, |
59 | 58 | DF, |
| 59 | + BigframeSession, |
60 | 60 | PySparkDataFrame, |
61 | 61 | PySparkSession, |
62 | 62 | Query, |
@@ -371,7 +371,9 @@ def replace_query( |
371 | 371 | """ |
372 | 372 | target_table = exp.to_table(table_name) |
373 | 373 |
|
374 | | - table_exists = self._drop_data_object_on_type_mismatch(target_table, DataObjectType.TABLE) |
| 374 | + target_data_object = self._get_data_object(target_table) |
| 375 | + table_exists = target_data_object is not None |
| 376 | + self._drop_data_object_on_type_mismatch(target_data_object, DataObjectType.TABLE) |
375 | 377 |
|
376 | 378 | source_queries, columns_to_types = self._get_source_queries_and_columns_to_types( |
377 | 379 | query_or_df, columns_to_types, target_table=target_table |
@@ -1146,7 +1148,7 @@ def create_view( |
1146 | 1148 |
|
1147 | 1149 | if replace: |
1148 | 1150 | self._drop_data_object_on_type_mismatch( |
1149 | | - view_name, |
| 1151 | + self._get_data_object(view_name), |
1150 | 1152 | DataObjectType.VIEW if not materialized else DataObjectType.MATERIALIZED_VIEW, |
1151 | 1153 | ) |
1152 | 1154 |
|
@@ -2515,34 +2517,34 @@ def _truncate_table(self, table_name: TableName) -> None: |
2515 | 2517 | table = exp.to_table(table_name) |
2516 | 2518 | self.execute(f"TRUNCATE TABLE {table.sql(dialect=self.dialect, identify=True)}") |
2517 | 2519 |
|
| 2520 | + def _get_data_object(self, target_name: TableName) -> t.Optional[DataObject]: |
| 2521 | + target_table = exp.to_table(target_name) |
| 2522 | + existing_data_objects = self.get_data_objects( |
| 2523 | + schema_(target_table.db, target_table.catalog), {target_table.name} |
| 2524 | + ) |
| 2525 | + if existing_data_objects: |
| 2526 | + return existing_data_objects[0] |
| 2527 | + return None |
| 2528 | + |
2518 | 2529 | def _drop_data_object_on_type_mismatch( |
2519 | | - self, target_name: TableName, expected_type: DataObjectType |
2520 | | - ) -> bool: |
| 2530 | + self, data_object: t.Optional[DataObject], expected_type: DataObjectType |
| 2531 | + ) -> None: |
2521 | 2532 | """Drops a data object if it exists and is not of the expected type. |
2522 | 2533 |
|
2523 | 2534 | Args: |
2524 | | - target_name: The name of the data object to check. |
| 2535 | + data_object: The data object to check. |
2525 | 2536 | expected_type: The expected type of the data object. |
2526 | | -
|
2527 | | - Returns: |
2528 | | - True if the data object exists and is of the expected type, False otherwise. |
2529 | 2537 | """ |
2530 | | - target_table = exp.to_table(target_name) |
2531 | | - existing_data_objects = self.get_data_objects( |
2532 | | - schema_(target_table.db, target_table.catalog), {target_table.name} |
2533 | | - ) |
2534 | | - if existing_data_objects: |
2535 | | - if existing_data_objects[0].type == expected_type: |
2536 | | - return True |
| 2538 | + if data_object is None or data_object.type == expected_type: |
| 2539 | + return |
2537 | 2540 |
|
2538 | | - logger.warning( |
2539 | | - "Target data object '%s' is a %s and not a %s, dropping it", |
2540 | | - target_table.sql(dialect=self.dialect), |
2541 | | - existing_data_objects[0].type.value, |
2542 | | - expected_type.value, |
2543 | | - ) |
2544 | | - self.drop_data_object(existing_data_objects[0]) |
2545 | | - return False |
| 2541 | + logger.warning( |
| 2542 | + "Target data object '%s' is a %s and not a %s, dropping it", |
| 2543 | + data_object.to_table().sql(dialect=self.dialect), |
| 2544 | + data_object.type.value, |
| 2545 | + expected_type.value, |
| 2546 | + ) |
| 2547 | + self.drop_data_object(data_object) |
2546 | 2548 |
|
2547 | 2549 | def _replace_by_key( |
2548 | 2550 | self, |
|
0 commit comments