diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 0907b39987..af28f75932 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -2076,7 +2076,7 @@ def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[st if text_diff: self._print("") self._print(Syntax(text_diff, "sql", word_wrap=True)) - self._print(tree) + self._print(tree) def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None: """Displays the models with missing dates.""" diff --git a/sqlmesh/core/plan/common.py b/sqlmesh/core/plan/common.py index 8d31b0ead3..929837eb7e 100644 --- a/sqlmesh/core/plan/common.py +++ b/sqlmesh/core/plan/common.py @@ -5,7 +5,10 @@ def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool: if new.is_view and new.is_indirect_non_breaking and not new.is_forward_only: - # View models always need to be rebuilt to reflect updated upstream dependencies. + # View models always need to be rebuilt to reflect updated upstream dependencies + return True + if new.is_seed: + # Seed models always need to be rebuilt to reflect changes in the seed file return True return is_breaking_kind_change(old, new) diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index dc6499c1a3..44d6b14c10 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -474,10 +474,17 @@ def run_merged_intervals( execution_time=execution_time, ) + # We only need to create physical tables if the snapshot is not representative or if it + # needs backfill + snapshots_to_create_candidates = [ + s + for s in selected_snapshots + if not deployability_index.is_representative(s) or s in batched_intervals + ] snapshots_to_create = { s.snapshot_id for s in self.snapshot_evaluator.get_snapshots_to_create( - selected_snapshots, deployability_index + snapshots_to_create_candidates, deployability_index ) } diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index c53c0a88db..6d6525a771 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -2102,21 +2102,18 @@ def create( return super().create(table_name, model, is_table_deployable, render_kwargs, **kwargs) - if is_table_deployable: - # For seeds we insert data at the time of table creation. - try: - for index, df in enumerate(model.render_seed()): - if index == 0: - self._replace_query_for_model( - model, table_name, df, render_kwargs, **kwargs - ) - else: - self.adapter.insert_append( - table_name, df, target_columns_to_types=model.columns_to_types - ) - except Exception: - self.adapter.drop_table(table_name) - raise + # For seeds we insert data at the time of table creation. + try: + for index, df in enumerate(model.render_seed()): + if index == 0: + self._replace_query_for_model(model, table_name, df, render_kwargs, **kwargs) + else: + self.adapter.insert_append( + table_name, df, target_columns_to_types=model.columns_to_types + ) + except Exception: + self.adapter.drop_table(table_name) + raise def insert( self, diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index c22e904374..c00733238a 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -2918,6 +2918,81 @@ def test_virtual_environment_mode_dev_only_model_kind_change_manual_categorizati ] +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_virtual_environment_mode_dev_only_seed_model_change( + init_and_plan_context: t.Callable, +): + context, _ = init_and_plan_context( + "examples/sushi", config="test_config_virtual_environment_mode_dev_only" + ) + context.load() + context.plan("prod", auto_apply=True, no_prompts=True) + + seed_model = context.get_model("sushi.waiter_names") + with open(seed_model.seed_path, "a") as fd: + fd.write("\n123,New Test Name") + + context.load() + seed_model_snapshot = context.get_snapshot("sushi.waiter_names") + plan = context.plan_builder("dev").build() + assert plan.directly_modified == {seed_model_snapshot.snapshot_id} + assert len(plan.missing_intervals) == 2 + context.apply(plan) + + actual_seed_df_in_dev = context.fetchdf("SELECT * FROM sushi__dev.waiter_names WHERE id = 123") + assert actual_seed_df_in_dev.to_dict("records") == [{"id": 123, "name": "New Test Name"}] + actual_seed_df_in_prod = context.fetchdf("SELECT * FROM sushi.waiter_names WHERE id = 123") + assert actual_seed_df_in_prod.empty + + plan = context.plan_builder("prod").build() + assert plan.directly_modified == {seed_model_snapshot.snapshot_id} + assert len(plan.missing_intervals) == 1 + assert plan.missing_intervals[0].snapshot_id == seed_model_snapshot.snapshot_id + context.apply(plan) + + actual_seed_df_in_prod = context.fetchdf("SELECT * FROM sushi.waiter_names WHERE id = 123") + assert actual_seed_df_in_prod.to_dict("records") == [{"id": 123, "name": "New Test Name"}] + + +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_virtual_environment_mode_dev_only_model_change_downstream_of_seed( + init_and_plan_context: t.Callable, +): + """This test covers a scenario when a model downstream of a seed model is modified and explicitly selected + causing an (unhydrated) seed model to sourced from the state. If SQLMesh attempts to create + a table for the unchanged seed model, it will fail because the seed model is not hydrated. + """ + context, _ = init_and_plan_context( + "examples/sushi", config="test_config_virtual_environment_mode_dev_only" + ) + context.load() + context.plan("prod", auto_apply=True, no_prompts=True) + + # Make sure that a different version of the seed model is loaded + seed_model = context.get_model("sushi.waiter_names") + seed_model = seed_model.copy(update={"stamp": "force new version"}) + context.upsert_model(seed_model) + + # Make a change to the downstream model + model = context.get_model("sushi.waiter_as_customer_by_day") + model = model.copy(update={"stamp": "force new version"}) + context.upsert_model(model) + + # It is important to clear the cache so that the hydrated seed model is not sourced from the cache + context.clear_caches() + + # Make sure to use the selector so that the seed model is sourced from the state + plan = context.plan_builder("dev", select_models=[model.name]).build() + assert len(plan.directly_modified) == 1 + assert list(plan.directly_modified)[0].name == model.fqn + assert len(plan.missing_intervals) == 1 + assert plan.missing_intervals[0].snapshot_id.name == model.fqn + + # Make sure there's no error when applying the plan + context.apply(plan) + context.plan("prod", auto_apply=True, no_prompts=True) + + @time_machine.travel("2023-01-08 15:00:00 UTC") def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable): context, plan = init_and_plan_context("examples/sushi")