Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sqlmesh/core/plan/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
if new.is_view and new.is_indirect_non_breaking and not new.is_forward_only:
# View models always need to be rebuilt to reflect updated upstream dependencies
return True
if new.is_seed and not new.is_metadata:
if new.is_seed and not (
new.is_metadata
and new.previous_version
and new.previous_version.snapshot_id(new.name) == old.snapshot_id
):
# Seed models always need to be rebuilt to reflect changes in the seed file
# Unless only their metadata has been updated (eg description added) and the seed file has not been touched
return True
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/plan/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
before_promote_snapshots = {
s.snapshot_id
for s in snapshots.values()
if deployability_index.is_representative(s)
if (deployability_index.is_representative(s) or s.is_seed)
and plan.is_selected_for_backfill(s.name)
}
after_promote_snapshots = all_selected_for_backfill_snapshots - before_promote_snapshots
Expand Down
18 changes: 15 additions & 3 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1138,10 +1138,10 @@ def _migrate_target_table(
) -> None:
adapter = self.get_adapter(snapshot.model.gateway)

target_table = exp.to_table(target_table_name)
target_table.this.set("this", f"{target_table.name}_schema_tmp")
tmp_table = exp.to_table(target_table_name)
tmp_table.this.set("this", f"{tmp_table.name}_schema_tmp")
tmp_table_name = tmp_table.sql()

tmp_table_name = target_table.sql()
if snapshot.is_materialized:
self._execute_create(
snapshot=snapshot,
Expand Down Expand Up @@ -2182,6 +2182,18 @@ def create(
self.adapter.drop_table(table_name)
raise

def migrate(
self,
target_table_name: str,
source_table_name: str,
snapshot: Snapshot,
*,
ignore_destructive: bool,
ignore_additive: bool,
**kwargs: t.Any,
) -> None:
raise NotImplementedError("Seeds do not support migrations.")

def insert(
self,
table_name: str,
Expand Down
56 changes: 56 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3232,6 +3232,62 @@ def test_virtual_environment_mode_dev_only_model_change_standalone_audit(
context.apply(plan)


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_virtual_environment_mode_dev_only_seed_model_change_schema(
init_and_plan_context: t.Callable,
):
context, plan = init_and_plan_context(
"examples/sushi", config="test_config_virtual_environment_mode_dev_only"
)
context.apply(plan)

new_csv = []
with open(context.path / "seeds" / "waiter_names.csv", "r") as fd:
is_header = True
for idx, line in enumerate(fd):
line = line.strip()
if not line:
continue
if is_header:
new_csv.append(line + ",new_column")
is_header = False
else:
new_csv.append(line + f",v{idx}")

with open(context.path / "seeds" / "waiter_names.csv", "w") as fd:
fd.write("\n".join(new_csv))

context.load()

downstream_model = context.get_model("sushi.waiter_as_customer_by_day")
downstream_model_kind = downstream_model.kind.dict()
downstream_model_kwargs = {
**downstream_model.dict(),
"kind": {
**downstream_model_kind,
"on_destructive_change": "allow",
},
"audits": [],
# Use the new column
"query": "SELECT '2023-01-07' AS event_date, new_column AS new_column FROM sushi.waiter_names",
}
context.upsert_model(SqlModel.parse_obj(downstream_model_kwargs))

context.plan("dev", auto_apply=True, no_prompts=True, skip_tests=True, enable_preview=True)

assert (
context.engine_adapter.fetchone(
"SELECT COUNT(*) FROM sushi__dev.waiter_as_customer_by_day"
)[0]
== len(new_csv) - 1
)

# Deploy to prod
context.clear_caches()
context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True)
assert "new_column" in context.engine_adapter.columns("sushi.waiter_as_customer_by_day")


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable):
context, plan = init_and_plan_context("examples/sushi")
Expand Down
1 change: 1 addition & 0 deletions tests/core/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,7 @@ def test_seed_model_metadata_change_no_missing_intervals(
description="foo",
)
)
snapshot_a_metadata_updated.previous_versions = snapshot_a.all_versions
assert snapshot_a_metadata_updated.version is None
assert snapshot_a_metadata_updated.change_category is None

Expand Down