Skip to content

Commit ffc202b

Browse files
committed
Fix: Support a proper migration of seed models
1 parent 50aee2c commit ffc202b

File tree

5 files changed

+78
-5
lines changed

5 files changed

+78
-5
lines changed

sqlmesh/core/plan/common.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@ def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
1616
if new.is_view and new.is_indirect_non_breaking and not new.is_forward_only:
1717
# View models always need to be rebuilt to reflect updated upstream dependencies
1818
return True
19-
if new.is_seed and not new.is_metadata:
19+
if new.is_seed and not (
20+
new.is_metadata
21+
and new.previous_version
22+
and new.previous_version.snapshot_id(new.name) == old.snapshot_id
23+
):
2024
# Seed models always need to be rebuilt to reflect changes in the seed file
2125
# Unless only their metadata has been updated (eg description added) and the seed file has not been touched
2226
return True

sqlmesh/core/plan/stages.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
268268
before_promote_snapshots = {
269269
s.snapshot_id
270270
for s in snapshots.values()
271-
if deployability_index.is_representative(s)
271+
if (deployability_index.is_representative(s) or s.is_seed)
272272
and plan.is_selected_for_backfill(s.name)
273273
}
274274
after_promote_snapshots = all_selected_for_backfill_snapshots - before_promote_snapshots

sqlmesh/core/snapshot/evaluator.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,10 +1138,10 @@ def _migrate_target_table(
11381138
) -> None:
11391139
adapter = self.get_adapter(snapshot.model.gateway)
11401140

1141-
target_table = exp.to_table(target_table_name)
1142-
target_table.this.set("this", f"{target_table.name}_schema_tmp")
1141+
tmp_table = exp.to_table(target_table_name)
1142+
tmp_table.this.set("this", f"{tmp_table.name}_schema_tmp")
1143+
tmp_table_name = tmp_table.sql()
11431144

1144-
tmp_table_name = target_table.sql()
11451145
if snapshot.is_materialized:
11461146
self._execute_create(
11471147
snapshot=snapshot,
@@ -2182,6 +2182,18 @@ def create(
21822182
self.adapter.drop_table(table_name)
21832183
raise
21842184

2185+
def migrate(
2186+
self,
2187+
target_table_name: str,
2188+
source_table_name: str,
2189+
snapshot: Snapshot,
2190+
*,
2191+
ignore_destructive: bool,
2192+
ignore_additive: bool,
2193+
**kwargs: t.Any,
2194+
) -> None:
2195+
raise NotImplementedError("Seeds do not support migrations.")
2196+
21852197
def insert(
21862198
self,
21872199
table_name: str,

tests/core/test_integration.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3232,6 +3232,62 @@ def test_virtual_environment_mode_dev_only_model_change_standalone_audit(
32323232
context.apply(plan)
32333233

32343234

3235+
@time_machine.travel("2023-01-08 15:00:00 UTC")
3236+
def test_virtual_environment_mode_dev_only_seed_model_change_schema(
3237+
init_and_plan_context: t.Callable,
3238+
):
3239+
context, plan = init_and_plan_context(
3240+
"examples/sushi", config="test_config_virtual_environment_mode_dev_only"
3241+
)
3242+
context.apply(plan)
3243+
3244+
new_csv = []
3245+
with open(context.path / "seeds" / "waiter_names.csv", "r") as fd:
3246+
is_header = True
3247+
for idx, line in enumerate(fd):
3248+
line = line.strip()
3249+
if not line:
3250+
continue
3251+
if is_header:
3252+
new_csv.append(line + ",new_column")
3253+
is_header = False
3254+
else:
3255+
new_csv.append(line + f",v{idx}")
3256+
3257+
with open(context.path / "seeds" / "waiter_names.csv", "w") as fd:
3258+
fd.write("\n".join(new_csv))
3259+
3260+
context.load()
3261+
3262+
downstream_model = context.get_model("sushi.waiter_as_customer_by_day")
3263+
downstream_model_kind = downstream_model.kind.dict()
3264+
downstream_model_kwargs = {
3265+
**downstream_model.dict(),
3266+
"kind": {
3267+
**downstream_model_kind,
3268+
"on_destructive_change": "allow",
3269+
},
3270+
"audits": [],
3271+
# Use the new column
3272+
"query": "SELECT '2023-01-07' AS event_date, new_column AS new_column FROM sushi.waiter_names",
3273+
}
3274+
context.upsert_model(SqlModel.parse_obj(downstream_model_kwargs))
3275+
3276+
context.plan("dev", auto_apply=True, no_prompts=True, skip_tests=True, enable_preview=True)
3277+
3278+
assert (
3279+
context.engine_adapter.fetchone(
3280+
"SELECT COUNT(*) FROM sushi__dev.waiter_as_customer_by_day"
3281+
)[0]
3282+
== len(new_csv) - 1
3283+
)
3284+
3285+
# Deploy to prod
3286+
context.clear_caches()
3287+
context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True)
3288+
assert "new_column" in context.engine_adapter.columns("sushi.waiter_as_customer_by_day")
3289+
3290+
32353291
@time_machine.travel("2023-01-08 15:00:00 UTC")
32363292
def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable):
32373293
context, plan = init_and_plan_context("examples/sushi")

tests/core/test_plan.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,6 +1214,7 @@ def test_seed_model_metadata_change_no_missing_intervals(
12141214
description="foo",
12151215
)
12161216
)
1217+
snapshot_a_metadata_updated.previous_versions = snapshot_a.all_versions
12171218
assert snapshot_a_metadata_updated.version is None
12181219
assert snapshot_a_metadata_updated.change_category is None
12191220

0 commit comments

Comments
 (0)