From 9656c41a97b94edf94e2cbf245a56acfbeadc028 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Tue, 2 Sep 2025 12:50:19 -0700 Subject: [PATCH 1/5] Fix: Make sure that changes to seed models are reflected in the dev-only VDE mode --- sqlmesh/core/plan/common.py | 5 ++++- tests/core/test_integration.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/plan/common.py b/sqlmesh/core/plan/common.py index 8d31b0ead3..929837eb7e 100644 --- a/sqlmesh/core/plan/common.py +++ b/sqlmesh/core/plan/common.py @@ -5,7 +5,10 @@ def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool: if new.is_view and new.is_indirect_non_breaking and not new.is_forward_only: - # View models always need to be rebuilt to reflect updated upstream dependencies. + # View models always need to be rebuilt to reflect updated upstream dependencies + return True + if new.is_seed: + # Seed models always need to be rebuilt to reflect changes in the seed file return True return is_breaking_kind_change(old, new) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index c22e904374..ab45bd23b1 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -2918,6 +2918,37 @@ def test_virtual_environment_mode_dev_only_model_kind_change_manual_categorizati ] +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_virtual_environment_mode_dev_only_seed_model_change( + init_and_plan_context: t.Callable, +): + context, plan = init_and_plan_context( + "examples/sushi", config="test_config_virtual_environment_mode_dev_only" + ) + context.apply(plan) + + seed_model = context.get_model("sushi.waiter_names") + with open(seed_model.seed_path, "a") as fd: + fd.write("\n123,New Test Name") + + context.load() + seed_model_snapshot = context.get_snapshot("sushi.waiter_names") + plan = context.plan_builder("prod").build() + assert plan.directly_modified == {seed_model_snapshot.snapshot_id} + assert len(plan.missing_intervals) == 1 + assert plan.missing_intervals[0].snapshot_id == seed_model_snapshot.snapshot_id + + context.apply(plan) + + actual_seed_df = context.fetchdf("SELECT * FROM sushi.waiter_names WHERE id = 123") + assert actual_seed_df.to_dict("records") == [ + { + "id": 123, + "name": "New Test Name", + } + ] + + @time_machine.travel("2023-01-08 15:00:00 UTC") def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable): context, plan = init_and_plan_context("examples/sushi") From 8c5f437566062f586edb94d037245584971e35b7 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Tue, 2 Sep 2025 13:14:12 -0700 Subject: [PATCH 2/5] Fix: Seed model changes when using the dev-only VDE mode --- sqlmesh/core/console.py | 2 +- sqlmesh/core/snapshot/evaluator.py | 27 ++++++++++++--------------- tests/core/test_integration.py | 20 ++++++++++++-------- 3 files changed, 25 insertions(+), 24 deletions(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 0907b39987..af28f75932 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -2076,7 +2076,7 @@ def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[st if text_diff: self._print("") self._print(Syntax(text_diff, "sql", word_wrap=True)) - self._print(tree) + self._print(tree) def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None: """Displays the models with missing dates.""" diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index c53c0a88db..6d6525a771 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -2102,21 +2102,18 @@ def create( return super().create(table_name, model, is_table_deployable, render_kwargs, **kwargs) - if is_table_deployable: - # For seeds we insert data at the time of table creation. - try: - for index, df in enumerate(model.render_seed()): - if index == 0: - self._replace_query_for_model( - model, table_name, df, render_kwargs, **kwargs - ) - else: - self.adapter.insert_append( - table_name, df, target_columns_to_types=model.columns_to_types - ) - except Exception: - self.adapter.drop_table(table_name) - raise + # For seeds we insert data at the time of table creation. + try: + for index, df in enumerate(model.render_seed()): + if index == 0: + self._replace_query_for_model(model, table_name, df, render_kwargs, **kwargs) + else: + self.adapter.insert_append( + table_name, df, target_columns_to_types=model.columns_to_types + ) + except Exception: + self.adapter.drop_table(table_name) + raise def insert( self, diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index ab45bd23b1..878796112e 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -2933,20 +2933,24 @@ def test_virtual_environment_mode_dev_only_seed_model_change( context.load() seed_model_snapshot = context.get_snapshot("sushi.waiter_names") + plan = context.plan_builder("dev").build() + assert plan.directly_modified == {seed_model_snapshot.snapshot_id} + assert len(plan.missing_intervals) == 2 + context.apply(plan) + + actual_seed_df_in_dev = context.fetchdf("SELECT * FROM sushi__dev.waiter_names WHERE id = 123") + assert actual_seed_df_in_dev.to_dict("records") == [{"id": 123, "name": "New Test Name"}] + actual_seed_df_in_prod = context.fetchdf("SELECT * FROM sushi.waiter_names WHERE id = 123") + assert actual_seed_df_in_prod.empty + plan = context.plan_builder("prod").build() assert plan.directly_modified == {seed_model_snapshot.snapshot_id} assert len(plan.missing_intervals) == 1 assert plan.missing_intervals[0].snapshot_id == seed_model_snapshot.snapshot_id - context.apply(plan) - actual_seed_df = context.fetchdf("SELECT * FROM sushi.waiter_names WHERE id = 123") - assert actual_seed_df.to_dict("records") == [ - { - "id": 123, - "name": "New Test Name", - } - ] + actual_seed_df_in_prod = context.fetchdf("SELECT * FROM sushi.waiter_names WHERE id = 123") + assert actual_seed_df_in_prod.to_dict("records") == [{"id": 123, "name": "New Test Name"}] @time_machine.travel("2023-01-08 15:00:00 UTC") From b69e2dc2e3f082e9a69faac9c5da2f9aad2fc7fa Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Tue, 2 Sep 2025 15:21:18 -0700 Subject: [PATCH 3/5] try fix test --- tests/core/test_integration.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 878796112e..664c3138fb 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -2931,6 +2931,7 @@ def test_virtual_environment_mode_dev_only_seed_model_change( with open(seed_model.seed_path, "a") as fd: fd.write("\n123,New Test Name") + context.clear_caches() context.load() seed_model_snapshot = context.get_snapshot("sushi.waiter_names") plan = context.plan_builder("dev").build() From 6ef58cc81d8b0e11e8f7f972bc4f8142cb203adc Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Tue, 2 Sep 2025 15:41:20 -0700 Subject: [PATCH 4/5] attempt to fix tests --- tests/core/test_integration.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 664c3138fb..3d43a9f298 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -2922,16 +2922,16 @@ def test_virtual_environment_mode_dev_only_model_kind_change_manual_categorizati def test_virtual_environment_mode_dev_only_seed_model_change( init_and_plan_context: t.Callable, ): - context, plan = init_and_plan_context( + context, _ = init_and_plan_context( "examples/sushi", config="test_config_virtual_environment_mode_dev_only" ) - context.apply(plan) + context.load() + context.plan("prod", auto_apply=True, no_prompts=True) seed_model = context.get_model("sushi.waiter_names") with open(seed_model.seed_path, "a") as fd: fd.write("\n123,New Test Name") - context.clear_caches() context.load() seed_model_snapshot = context.get_snapshot("sushi.waiter_names") plan = context.plan_builder("dev").build() From 6d441e6426fbcf81be1448dd05478ed5084dd323 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Tue, 2 Sep 2025 17:39:46 -0700 Subject: [PATCH 5/5] one more edge case --- sqlmesh/core/scheduler.py | 9 +++++++- tests/core/test_integration.py | 39 ++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index dc6499c1a3..44d6b14c10 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -474,10 +474,17 @@ def run_merged_intervals( execution_time=execution_time, ) + # We only need to create physical tables if the snapshot is not representative or if it + # needs backfill + snapshots_to_create_candidates = [ + s + for s in selected_snapshots + if not deployability_index.is_representative(s) or s in batched_intervals + ] snapshots_to_create = { s.snapshot_id for s in self.snapshot_evaluator.get_snapshots_to_create( - selected_snapshots, deployability_index + snapshots_to_create_candidates, deployability_index ) } diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 3d43a9f298..c00733238a 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -2954,6 +2954,45 @@ def test_virtual_environment_mode_dev_only_seed_model_change( assert actual_seed_df_in_prod.to_dict("records") == [{"id": 123, "name": "New Test Name"}] +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_virtual_environment_mode_dev_only_model_change_downstream_of_seed( + init_and_plan_context: t.Callable, +): + """This test covers a scenario when a model downstream of a seed model is modified and explicitly selected + causing an (unhydrated) seed model to sourced from the state. If SQLMesh attempts to create + a table for the unchanged seed model, it will fail because the seed model is not hydrated. + """ + context, _ = init_and_plan_context( + "examples/sushi", config="test_config_virtual_environment_mode_dev_only" + ) + context.load() + context.plan("prod", auto_apply=True, no_prompts=True) + + # Make sure that a different version of the seed model is loaded + seed_model = context.get_model("sushi.waiter_names") + seed_model = seed_model.copy(update={"stamp": "force new version"}) + context.upsert_model(seed_model) + + # Make a change to the downstream model + model = context.get_model("sushi.waiter_as_customer_by_day") + model = model.copy(update={"stamp": "force new version"}) + context.upsert_model(model) + + # It is important to clear the cache so that the hydrated seed model is not sourced from the cache + context.clear_caches() + + # Make sure to use the selector so that the seed model is sourced from the state + plan = context.plan_builder("dev", select_models=[model.name]).build() + assert len(plan.directly_modified) == 1 + assert list(plan.directly_modified)[0].name == model.fqn + assert len(plan.missing_intervals) == 1 + assert plan.missing_intervals[0].snapshot_id.name == model.fqn + + # Make sure there's no error when applying the plan + context.apply(plan) + context.plan("prod", auto_apply=True, no_prompts=True) + + @time_machine.travel("2023-01-08 15:00:00 UTC") def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable): context, plan = init_and_plan_context("examples/sushi")