Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,7 @@ def clone_table(
target_table_name: TableName,
source_table_name: TableName,
replace: bool = False,
exists: bool = True,
clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
**kwargs: t.Any,
) -> None:
Expand All @@ -1053,6 +1054,7 @@ def clone_table(
target_table_name: The name of the table that should be created.
source_table_name: The name of the source table that should be cloned.
replace: Whether or not to replace an existing table.
exists: Indicates whether to include the IF NOT EXISTS check.
"""
if not self.SUPPORTS_CLONING:
raise NotImplementedError(f"Engine does not support cloning: {type(self)}")
Expand All @@ -1063,6 +1065,7 @@ def clone_table(
this=exp.to_table(target_table_name),
kind="TABLE",
replace=replace,
exists=exists,
clone=exp.Clone(
this=exp.to_table(source_table_name),
**(clone_kwargs or {}),
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ def clone_table(
target_table_name: TableName,
source_table_name: TableName,
replace: bool = False,
exists: bool = True,
clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
**kwargs: t.Any,
) -> None:
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ def clone_table(
target_table_name: TableName,
source_table_name: TableName,
replace: bool = False,
exists: bool = True,
clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
**kwargs: t.Any,
) -> None:
Expand Down
14 changes: 12 additions & 2 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from sqlglot import exp, select
from sqlglot.executor import execute
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_not_exception_type

from sqlmesh.core import constants as c
from sqlmesh.core import dialect as d
Expand Down Expand Up @@ -76,6 +77,7 @@
from sqlmesh.utils.errors import (
ConfigError,
DestructiveChangeError,
MigrationNotSupportedError,
SQLMeshError,
format_destructive_change_msg,
format_additive_change_msg,
Expand Down Expand Up @@ -1035,7 +1037,6 @@ def _clone_snapshot_in_dev(
adapter.clone_table(
target_table_name,
snapshot.table_name(),
replace=True,
rendered_physical_properties=rendered_physical_properties,
)
self._migrate_target_table(
Expand Down Expand Up @@ -1111,6 +1112,15 @@ def _migrate_snapshot(
dry_run=True,
)

# Retry in case when the table is migrated concurrently from another plan application
@retry(
reraise=True,
stop=stop_after_attempt(5),
wait=wait_exponential(min=1, max=16),
retry=retry_if_not_exception_type(
(DestructiveChangeError, AdditiveChangeError, MigrationNotSupportedError)
),
)
def _migrate_target_table(
self,
target_table_name: str,
Expand Down Expand Up @@ -2671,7 +2681,7 @@ def migrate(
)
if len(potential_alter_operations) > 0:
# this can happen if a user changes a managed model and deliberately overrides a plan to be forward only, eg `sqlmesh plan --forward-only`
raise SQLMeshError(
raise MigrationNotSupportedError(
f"The schema of the managed model '{target_table_name}' cannot be updated in a forward-only fashion."
)

Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/utils/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ class AdditiveChangeError(SQLMeshError):
pass


class MigrationNotSupportedError(SQLMeshError):
pass


class NotificationTargetError(SQLMeshError):
pass

Expand Down
2 changes: 1 addition & 1 deletion tests/core/engine_adapter/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3347,7 +3347,7 @@ def test_clone_table(make_mocked_engine_adapter: t.Callable):
adapter.clone_table("target_table", "source_table")

adapter.cursor.execute.assert_called_once_with(
"CREATE TABLE `target_table` CLONE `source_table`"
"CREATE TABLE IF NOT EXISTS `target_table` CLONE `source_table`"
)


Expand Down
2 changes: 1 addition & 1 deletion tests/core/engine_adapter/test_databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def test_clone_table(mocker: MockFixture, make_mocked_engine_adapter: t.Callable
adapter = make_mocked_engine_adapter(DatabricksEngineAdapter, default_catalog="test_catalog")
adapter.clone_table("target_table", "source_table")
adapter.cursor.execute.assert_called_once_with(
"CREATE TABLE `target_table` SHALLOW CLONE `source_table`"
"CREATE TABLE IF NOT EXISTS `target_table` SHALLOW CLONE `source_table`"
)


Expand Down
6 changes: 3 additions & 3 deletions tests/core/engine_adapter/test_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ def test_clone_table(mocker: MockerFixture, make_mocked_engine_adapter: t.Callab
adapter = make_mocked_engine_adapter(SnowflakeEngineAdapter, default_catalog="test_catalog")
adapter.clone_table("target_table", "source_table")
adapter.cursor.execute.assert_called_once_with(
'CREATE TABLE "target_table" CLONE "source_table"'
'CREATE TABLE IF NOT EXISTS "target_table" CLONE "source_table"'
)

# Validate with transient type we create the clone table accordingly
Expand All @@ -700,7 +700,7 @@ def test_clone_table(mocker: MockerFixture, make_mocked_engine_adapter: t.Callab
"target_table", "source_table", rendered_physical_properties=rendered_physical_properties
)
adapter.cursor.execute.assert_called_once_with(
'CREATE TRANSIENT TABLE "target_table" CLONE "source_table"'
'CREATE TRANSIENT TABLE IF NOT EXISTS "target_table" CLONE "source_table"'
)

# Validate other engine adapters would work as usual even when we pass the properties
Expand All @@ -710,7 +710,7 @@ def test_clone_table(mocker: MockerFixture, make_mocked_engine_adapter: t.Callab
"target_table", "source_table", rendered_physical_properties=rendered_physical_properties
)
adapter.cursor.execute.assert_called_once_with(
'CREATE TABLE "target_table" CLONE "source_table"'
'CREATE TABLE IF NOT EXISTS "target_table" CLONE "source_table"'
)


Expand Down
4 changes: 1 addition & 3 deletions tests/core/test_snapshot_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1678,7 +1678,6 @@ def test_create_clone_in_dev(mocker: MockerFixture, adapter_mock, make_snapshot)
adapter_mock.clone_table.assert_called_once_with(
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.dev_version}__dev",
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}",
replace=True,
rendered_physical_properties={},
)

Expand All @@ -1701,7 +1700,7 @@ def test_drop_clone_in_dev_when_migration_fails(mocker: MockerFixture, adapter_m
adapter_mock.get_alter_operations.return_value = []
evaluator = SnapshotEvaluator(adapter_mock)

adapter_mock.alter_table.side_effect = Exception("Migration failed")
adapter_mock.alter_table.side_effect = DestructiveChangeError("Migration failed")

model = load_sql_based_model(
parse( # type: ignore
Expand All @@ -1728,7 +1727,6 @@ def test_drop_clone_in_dev_when_migration_fails(mocker: MockerFixture, adapter_m
adapter_mock.clone_table.assert_called_once_with(
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}__dev",
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}",
replace=True,
rendered_physical_properties={},
)

Expand Down