Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sqlmesh/core/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,7 @@ def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[st
if text_diff:
self._print("")
self._print(Syntax(text_diff, "sql", word_wrap=True))
self._print(tree)
self._print(tree)

def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
"""Displays the models with missing dates."""
Expand Down
5 changes: 4 additions & 1 deletion sqlmesh/core/plan/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
if new.is_view and new.is_indirect_non_breaking and not new.is_forward_only:
# View models always need to be rebuilt to reflect updated upstream dependencies.
# View models always need to be rebuilt to reflect updated upstream dependencies
return True
if new.is_seed:
# Seed models always need to be rebuilt to reflect changes in the seed file
return True
return is_breaking_kind_change(old, new)

Expand Down
9 changes: 8 additions & 1 deletion sqlmesh/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,10 +474,17 @@ def run_merged_intervals(
execution_time=execution_time,
)

# We only need to create physical tables if the snapshot is not representative or if it
# needs backfill
snapshots_to_create_candidates = [
s
for s in selected_snapshots
if not deployability_index.is_representative(s) or s in batched_intervals
]
snapshots_to_create = {
s.snapshot_id
for s in self.snapshot_evaluator.get_snapshots_to_create(
selected_snapshots, deployability_index
snapshots_to_create_candidates, deployability_index
)
}

Expand Down
27 changes: 12 additions & 15 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2102,21 +2102,18 @@ def create(
return

super().create(table_name, model, is_table_deployable, render_kwargs, **kwargs)
if is_table_deployable:
# For seeds we insert data at the time of table creation.
try:
for index, df in enumerate(model.render_seed()):
if index == 0:
self._replace_query_for_model(
model, table_name, df, render_kwargs, **kwargs
)
else:
self.adapter.insert_append(
table_name, df, target_columns_to_types=model.columns_to_types
)
except Exception:
self.adapter.drop_table(table_name)
raise
# For seeds we insert data at the time of table creation.
try:
for index, df in enumerate(model.render_seed()):
if index == 0:
self._replace_query_for_model(model, table_name, df, render_kwargs, **kwargs)
else:
self.adapter.insert_append(
table_name, df, target_columns_to_types=model.columns_to_types
)
except Exception:
self.adapter.drop_table(table_name)
raise

def insert(
self,
Expand Down
75 changes: 75 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2918,6 +2918,81 @@ def test_virtual_environment_mode_dev_only_model_kind_change_manual_categorizati
]


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_virtual_environment_mode_dev_only_seed_model_change(
init_and_plan_context: t.Callable,
):
context, _ = init_and_plan_context(
"examples/sushi", config="test_config_virtual_environment_mode_dev_only"
)
context.load()
context.plan("prod", auto_apply=True, no_prompts=True)

seed_model = context.get_model("sushi.waiter_names")
with open(seed_model.seed_path, "a") as fd:
fd.write("\n123,New Test Name")

context.load()
seed_model_snapshot = context.get_snapshot("sushi.waiter_names")
plan = context.plan_builder("dev").build()
assert plan.directly_modified == {seed_model_snapshot.snapshot_id}
assert len(plan.missing_intervals) == 2
context.apply(plan)

actual_seed_df_in_dev = context.fetchdf("SELECT * FROM sushi__dev.waiter_names WHERE id = 123")
assert actual_seed_df_in_dev.to_dict("records") == [{"id": 123, "name": "New Test Name"}]
actual_seed_df_in_prod = context.fetchdf("SELECT * FROM sushi.waiter_names WHERE id = 123")
assert actual_seed_df_in_prod.empty

plan = context.plan_builder("prod").build()
assert plan.directly_modified == {seed_model_snapshot.snapshot_id}
assert len(plan.missing_intervals) == 1
assert plan.missing_intervals[0].snapshot_id == seed_model_snapshot.snapshot_id
context.apply(plan)

actual_seed_df_in_prod = context.fetchdf("SELECT * FROM sushi.waiter_names WHERE id = 123")
assert actual_seed_df_in_prod.to_dict("records") == [{"id": 123, "name": "New Test Name"}]


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_virtual_environment_mode_dev_only_model_change_downstream_of_seed(
init_and_plan_context: t.Callable,
):
"""This test covers a scenario when a model downstream of a seed model is modified and explicitly selected
causing an (unhydrated) seed model to sourced from the state. If SQLMesh attempts to create
a table for the unchanged seed model, it will fail because the seed model is not hydrated.
"""
context, _ = init_and_plan_context(
"examples/sushi", config="test_config_virtual_environment_mode_dev_only"
)
context.load()
context.plan("prod", auto_apply=True, no_prompts=True)

# Make sure that a different version of the seed model is loaded
seed_model = context.get_model("sushi.waiter_names")
seed_model = seed_model.copy(update={"stamp": "force new version"})
context.upsert_model(seed_model)

# Make a change to the downstream model
model = context.get_model("sushi.waiter_as_customer_by_day")
model = model.copy(update={"stamp": "force new version"})
context.upsert_model(model)

# It is important to clear the cache so that the hydrated seed model is not sourced from the cache
context.clear_caches()

# Make sure to use the selector so that the seed model is sourced from the state
plan = context.plan_builder("dev", select_models=[model.name]).build()
assert len(plan.directly_modified) == 1
assert list(plan.directly_modified)[0].name == model.fqn
assert len(plan.missing_intervals) == 1
assert plan.missing_intervals[0].snapshot_id.name == model.fqn

# Make sure there's no error when applying the plan
context.apply(plan)
context.plan("prod", auto_apply=True, no_prompts=True)


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable):
context, plan = init_and_plan_context("examples/sushi")
Expand Down