From ffc202b6d9202a81abed901199fa74f08dc51a78 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Fri, 19 Sep 2025 13:27:44 -0700 Subject: [PATCH] Fix: Support a proper migration of seed models --- sqlmesh/core/plan/common.py | 6 +++- sqlmesh/core/plan/stages.py | 2 +- sqlmesh/core/snapshot/evaluator.py | 18 ++++++++-- tests/core/test_integration.py | 56 ++++++++++++++++++++++++++++++ tests/core/test_plan.py | 1 + 5 files changed, 78 insertions(+), 5 deletions(-) diff --git a/sqlmesh/core/plan/common.py b/sqlmesh/core/plan/common.py index 2ae34fbfba..bece17639c 100644 --- a/sqlmesh/core/plan/common.py +++ b/sqlmesh/core/plan/common.py @@ -16,7 +16,11 @@ def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool: if new.is_view and new.is_indirect_non_breaking and not new.is_forward_only: # View models always need to be rebuilt to reflect updated upstream dependencies return True - if new.is_seed and not new.is_metadata: + if new.is_seed and not ( + new.is_metadata + and new.previous_version + and new.previous_version.snapshot_id(new.name) == old.snapshot_id + ): # Seed models always need to be rebuilt to reflect changes in the seed file # Unless only their metadata has been updated (eg description added) and the seed file has not been touched return True diff --git a/sqlmesh/core/plan/stages.py b/sqlmesh/core/plan/stages.py index 9425608619..729e1705b4 100644 --- a/sqlmesh/core/plan/stages.py +++ b/sqlmesh/core/plan/stages.py @@ -268,7 +268,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]: before_promote_snapshots = { s.snapshot_id for s in snapshots.values() - if deployability_index.is_representative(s) + if (deployability_index.is_representative(s) or s.is_seed) and plan.is_selected_for_backfill(s.name) } after_promote_snapshots = all_selected_for_backfill_snapshots - before_promote_snapshots diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 688f9d8d5b..c2064fd853 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1138,10 +1138,10 @@ def _migrate_target_table( ) -> None: adapter = self.get_adapter(snapshot.model.gateway) - target_table = exp.to_table(target_table_name) - target_table.this.set("this", f"{target_table.name}_schema_tmp") + tmp_table = exp.to_table(target_table_name) + tmp_table.this.set("this", f"{tmp_table.name}_schema_tmp") + tmp_table_name = tmp_table.sql() - tmp_table_name = target_table.sql() if snapshot.is_materialized: self._execute_create( snapshot=snapshot, @@ -2182,6 +2182,18 @@ def create( self.adapter.drop_table(table_name) raise + def migrate( + self, + target_table_name: str, + source_table_name: str, + snapshot: Snapshot, + *, + ignore_destructive: bool, + ignore_additive: bool, + **kwargs: t.Any, + ) -> None: + raise NotImplementedError("Seeds do not support migrations.") + def insert( self, table_name: str, diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index a3f9584aa3..bac495a5f1 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -3232,6 +3232,62 @@ def test_virtual_environment_mode_dev_only_model_change_standalone_audit( context.apply(plan) +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_virtual_environment_mode_dev_only_seed_model_change_schema( + init_and_plan_context: t.Callable, +): + context, plan = init_and_plan_context( + "examples/sushi", config="test_config_virtual_environment_mode_dev_only" + ) + context.apply(plan) + + new_csv = [] + with open(context.path / "seeds" / "waiter_names.csv", "r") as fd: + is_header = True + for idx, line in enumerate(fd): + line = line.strip() + if not line: + continue + if is_header: + new_csv.append(line + ",new_column") + is_header = False + else: + new_csv.append(line + f",v{idx}") + + with open(context.path / "seeds" / "waiter_names.csv", "w") as fd: + fd.write("\n".join(new_csv)) + + context.load() + + downstream_model = context.get_model("sushi.waiter_as_customer_by_day") + downstream_model_kind = downstream_model.kind.dict() + downstream_model_kwargs = { + **downstream_model.dict(), + "kind": { + **downstream_model_kind, + "on_destructive_change": "allow", + }, + "audits": [], + # Use the new column + "query": "SELECT '2023-01-07' AS event_date, new_column AS new_column FROM sushi.waiter_names", + } + context.upsert_model(SqlModel.parse_obj(downstream_model_kwargs)) + + context.plan("dev", auto_apply=True, no_prompts=True, skip_tests=True, enable_preview=True) + + assert ( + context.engine_adapter.fetchone( + "SELECT COUNT(*) FROM sushi__dev.waiter_as_customer_by_day" + )[0] + == len(new_csv) - 1 + ) + + # Deploy to prod + context.clear_caches() + context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True) + assert "new_column" in context.engine_adapter.columns("sushi.waiter_as_customer_by_day") + + @time_machine.travel("2023-01-08 15:00:00 UTC") def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable): context, plan = init_and_plan_context("examples/sushi") diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index 59bc91d1bf..40967f1fbe 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -1214,6 +1214,7 @@ def test_seed_model_metadata_change_no_missing_intervals( description="foo", ) ) + snapshot_a_metadata_updated.previous_versions = snapshot_a.all_versions assert snapshot_a_metadata_updated.version is None assert snapshot_a_metadata_updated.change_category is None