Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ module = [
"pydantic_core.*",
"dlt.*",
"bigframes.*",
"json_stream.*"
"json_stream.*",
"duckdb.*"
]
ignore_missing_imports = true

Expand Down
11 changes: 10 additions & 1 deletion sqlmesh/core/plan/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Snapshot,
SnapshotTableInfo,
SnapshotId,
snapshots_to_dag,
)


Expand Down Expand Up @@ -248,6 +249,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
stored_snapshots = self.state_reader.get_snapshots(plan.environment.snapshots)
snapshots = {**new_snapshots, **stored_snapshots}
snapshots_by_name = {s.name: s for s in snapshots.values()}
dag = snapshots_to_dag(snapshots.values())
Comment on lines 249 to +252
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reasoning for having this logic outside of EvaluatablePlan? For example, should it have properties evaluatable_snapshots and dag which would return this info? Reading this over, I was surprised I couldn't consult the plan itself to get this information.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because an EvaluatablePlan instance only contains data which can be (and will be) serialized. We could consider making it a property method of EvaluatablePlan, but currently there isn't really any other consumer for this. Plus, we need access to the StateReader to fetch all snapshots, which EvaluatablePlan doesn't (and shouldn't) possess.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes the state reader. Thanks.


all_selected_for_backfill_snapshots = {
s.snapshot_id for s in snapshots.values() if plan.is_selected_for_backfill(s.name)
Expand All @@ -271,8 +273,15 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
after_promote_snapshots = all_selected_for_backfill_snapshots - before_promote_snapshots
deployability_index = DeployabilityIndex.all_deployable()

snapshot_ids_with_schema_migration = [
s.snapshot_id for s in snapshots.values() if s.requires_schema_migration_in_prod
]
# Include all upstream dependencies of snapshots that require schema migration to make sure
# the upstream tables are created before the schema updates are applied
snapshots_with_schema_migration = [
s for s in snapshots.values() if s.requires_schema_migration_in_prod
snapshots[s_id]
for s_id in dag.subdag(*snapshot_ids_with_schema_migration)
if snapshots[s_id].supports_schema_migration_in_prod
]

snapshots_to_intervals = self._missing_intervals(
Expand Down
20 changes: 10 additions & 10 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1477,19 +1477,19 @@ def expiration_ts(self) -> int:
check_categorical_relative_expression=False,
)

@property
def supports_schema_migration_in_prod(self) -> bool:
"""Returns whether or not this snapshot supports schema migration when deployed to production."""
return self.is_paused and self.is_model and not self.is_symbolic

@property
def requires_schema_migration_in_prod(self) -> bool:
"""Returns whether or not this snapshot requires a schema migration when deployed to production."""
return (
self.is_paused
and self.is_model
and self.is_materialized
and (
(self.previous_version and self.previous_version.version == self.version)
or self.model.forward_only
or bool(self.model.physical_version)
or not self.virtual_environment_mode.is_full
)
return self.supports_schema_migration_in_prod and (
(self.previous_version and self.previous_version.version == self.version)
or self.model.forward_only
or bool(self.model.physical_version)
or not self.virtual_environment_mode.is_full
)

@property
Expand Down
25 changes: 18 additions & 7 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,15 +489,14 @@ def migrate(
allow_destructive_snapshots = allow_destructive_snapshots or set()
allow_additive_snapshots = allow_additive_snapshots or set()
snapshots_by_name = {s.name: s for s in snapshots.values()}
snapshots_with_data_objects = [snapshots[s_id] for s_id in target_data_objects]
with self.concurrent_context():
# Only migrate snapshots for which there's an existing data object
concurrent_apply_to_snapshots(
snapshots_with_data_objects,
snapshots_by_name.values(),
lambda s: self._migrate_snapshot(
s,
snapshots_by_name,
target_data_objects[s.snapshot_id],
target_data_objects.get(s.snapshot_id),
Comment on lines +495 to +499
Copy link

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The migration logic now passes None for snapshots without target data objects, but the downstream code may not handle this case properly. Consider adding validation to ensure the migration logic can handle None target_data_object values.

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The downstream method's signature indicates that it supports an optional target_data_object.

allow_destructive_snapshots,
allow_additive_snapshots,
self.get_adapter(s.model_gateway),
Expand Down Expand Up @@ -1059,7 +1058,7 @@ def _migrate_snapshot(
adapter: EngineAdapter,
deployability_index: DeployabilityIndex,
) -> None:
if not snapshot.requires_schema_migration_in_prod:
if not snapshot.is_model or snapshot.is_symbolic:
return

deployability_index = DeployabilityIndex.all_deployable()
Expand All @@ -1081,20 +1080,32 @@ def _migrate_snapshot(
):
table_exists = False

rendered_physical_properties = snapshot.model.render_physical_properties(
**render_kwargs
)

if table_exists:
self._migrate_target_table(
target_table_name=target_table_name,
snapshot=snapshot,
snapshots=snapshots,
deployability_index=deployability_index,
render_kwargs=render_kwargs,
rendered_physical_properties=snapshot.model.render_physical_properties(
**render_kwargs
),
rendered_physical_properties=rendered_physical_properties,
allow_destructive_snapshots=allow_destructive_snapshots,
allow_additive_snapshots=allow_additive_snapshots,
run_pre_post_statements=True,
)
else:
self._execute_create(
snapshot=snapshot,
table_name=snapshot.table_name(is_deployable=True),
is_table_deployable=True,
deployability_index=deployability_index,
create_render_kwargs=render_kwargs,
rendered_physical_properties=rendered_physical_properties,
dry_run=True,
Copy link

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dry_run=True parameter suggests this is only a validation step, but for view migration to work properly, the view should actually be created. This may prevent view snapshots from being properly migrated.

Suggested change
dry_run=True,
dry_run=False,

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just a bad comment all around

)

def _migrate_target_table(
self,
Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1593,7 +1593,7 @@ def test_raw_code_handling(sushi_test_dbt_context: Context):
hook = model.render_pre_statements()[0]
assert (
hook.sql()
== f'''CREATE TABLE "t" AS SELECT 'Length is {raw_code_length}' AS "length_col"'''
== f'''CREATE TABLE IF NOT EXISTS "t" AS SELECT 'Length is {raw_code_length}' AS "length_col"'''
)


Expand Down
20 changes: 20 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,26 @@ def test_forward_only_parent_created_in_dev_child_created_in_prod(
context.apply(plan)


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_forward_only_view_migration(
init_and_plan_context: t.Callable,
):
context, plan = init_and_plan_context("examples/sushi")
context.apply(plan)

model = context.get_model("sushi.top_waiters")
assert model.kind.is_view
model = add_projection_to_model(t.cast(SqlModel, model))
context.upsert_model(model)

# Apply a forward-only plan
context.plan("prod", skip_tests=True, no_prompts=True, auto_apply=True, forward_only=True)

# Make sure that the new column got reflected in the view schema
df = context.fetchdf("SELECT one FROM sushi.top_waiters LIMIT 1")
assert len(df) == 1


@time_machine.travel("2023-01-08 00:00:00 UTC")
def test_new_forward_only_model(init_and_plan_context: t.Callable):
context, _ = init_and_plan_context("examples/sushi")
Expand Down
11 changes: 6 additions & 5 deletions tests/core/test_plan_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -1661,16 +1661,17 @@ def test_build_plan_stages_indirect_non_breaking_view_migration(
stages = build_plan_stages(plan, state_reader, None)

# Verify stages
assert len(stages) == 8
assert len(stages) == 9

assert isinstance(stages[0], CreateSnapshotRecordsStage)
assert isinstance(stages[1], PhysicalLayerSchemaCreationStage)
assert isinstance(stages[2], BackfillStage)
assert isinstance(stages[3], EnvironmentRecordUpdateStage)
assert isinstance(stages[4], UnpauseStage)
assert isinstance(stages[5], BackfillStage)
assert isinstance(stages[6], VirtualLayerUpdateStage)
assert isinstance(stages[7], FinalizeEnvironmentStage)
assert isinstance(stages[4], MigrateSchemasStage)
assert isinstance(stages[5], UnpauseStage)
assert isinstance(stages[6], BackfillStage)
assert isinstance(stages[7], VirtualLayerUpdateStage)
assert isinstance(stages[8], FinalizeEnvironmentStage)


def test_build_plan_stages_virtual_environment_mode_filtering(
Expand Down
8 changes: 7 additions & 1 deletion tests/core/test_snapshot_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1402,7 +1402,13 @@ def test_migrate_view(
evaluator = SnapshotEvaluator(adapter)
evaluator.migrate([snapshot], {})

adapter.cursor.execute.assert_not_called()
adapter.cursor.execute.assert_has_calls(
[
call(
f'CREATE OR REPLACE VIEW "sqlmesh__test_schema"."test_schema__test_model__{snapshot.version}" ("c", "a") AS SELECT "c" AS "c", "a" AS "a" FROM "tbl" AS "tbl"'
),
]
)


def test_migrate_snapshot_data_object_type_mismatch(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{{
config(
pre_hook=['CREATE TABLE t AS SELECT \'Length is {{ model.raw_code|length }}\' AS length_col']
pre_hook=['CREATE TABLE IF NOT EXISTS t AS SELECT \'Length is {{ model.raw_code|length }}\' AS length_col']
)
}}

Expand Down