Skip to content

Commit d547773

Browse files
committed
Fix: Always treat forward-only models as non-deployable (#3510)
1 parent c9057d4 commit d547773

File tree

5 files changed

+86
-25
lines changed

5 files changed

+86
-25
lines changed

sqlmesh/core/scheduler.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,9 @@ def merged_missing_intervals(
183183
validate_date_range(start, end)
184184

185185
snapshots: t.Collection[Snapshot] = self.snapshot_per_version.values()
186-
if selected_snapshots is not None:
187-
snapshots = [s for s in snapshots if s.name in selected_snapshots]
188-
189186
self.state_sync.refresh_snapshot_intervals(snapshots)
190187

191-
return compute_interval_params(
188+
snapshots_to_intervals = compute_interval_params(
192189
snapshots,
193190
start=start or earliest_start_date(snapshots),
194191
end=end or now(),
@@ -199,6 +196,13 @@ def merged_missing_intervals(
199196
ignore_cron=ignore_cron,
200197
end_bounded=end_bounded,
201198
)
199+
# Filtering snapshots after computing missing intervals because we need all snapshots in order
200+
# to correctly infer start dates.
201+
if selected_snapshots is not None:
202+
snapshots_to_intervals = {
203+
s: i for s, i in snapshots_to_intervals.items() if s.name in selected_snapshots
204+
}
205+
return snapshots_to_intervals
202206

203207
def evaluate(
204208
self,

sqlmesh/core/snapshot/definition.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,13 +1308,7 @@ def _visit(node: SnapshotId, deployable: bool = True) -> None:
13081308

13091309
if deployable and node in snapshots:
13101310
snapshot = snapshots[node]
1311-
# Capture uncategorized snapshot which represents a forward-only model.
1312-
is_uncategorized_forward_only_model = (
1313-
snapshot.change_category is None
1314-
and snapshot.previous_versions
1315-
and snapshot.is_model
1316-
and snapshot.model.forward_only
1317-
)
1311+
is_forward_only_model = snapshot.is_model and snapshot.model.forward_only
13181312

13191313
is_valid_start = (
13201314
snapshot.is_valid_start(
@@ -1327,7 +1321,7 @@ def _visit(node: SnapshotId, deployable: bool = True) -> None:
13271321
if (
13281322
snapshot.is_forward_only
13291323
or snapshot.is_indirect_non_breaking
1330-
or is_uncategorized_forward_only_model
1324+
or is_forward_only_model
13311325
or not is_valid_start
13321326
):
13331327
# FORWARD_ONLY and INDIRECT_NON_BREAKING snapshots are not deployable by nature.
@@ -1340,8 +1334,7 @@ def _visit(node: SnapshotId, deployable: bool = True) -> None:
13401334
else:
13411335
this_deployable = True
13421336
children_deployable = is_valid_start and not (
1343-
snapshot.is_paused
1344-
and (snapshot.is_forward_only or is_uncategorized_forward_only_model)
1337+
snapshot.is_paused and (snapshot.is_forward_only or is_forward_only_model)
13451338
)
13461339
else:
13471340
this_deployable, children_deployable = False, False

tests/core/test_integration.py

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1774,6 +1774,72 @@ def test_new_forward_only_model_concurrent_versions(init_and_plan_context: t.Cal
17741774
assert df.to_dict() == {"ds": {0: "2023-01-07"}, "b": {0: None}}
17751775

17761776

1777+
@freeze_time("2023-01-08 15:00:00")
1778+
def test_new_forward_only_model_same_dev_environment(init_and_plan_context: t.Callable):
1779+
context, plan = init_and_plan_context("examples/sushi")
1780+
context.apply(plan)
1781+
1782+
new_model_expr = d.parse(
1783+
"""
1784+
MODEL (
1785+
name memory.sushi.new_model,
1786+
kind INCREMENTAL_BY_TIME_RANGE (
1787+
time_column ds,
1788+
forward_only TRUE,
1789+
on_destructive_change 'allow',
1790+
),
1791+
);
1792+
1793+
SELECT '2023-01-07' AS ds, 1 AS a;
1794+
"""
1795+
)
1796+
new_model = load_sql_based_model(new_model_expr)
1797+
1798+
# Add the first version of the model and apply it to dev.
1799+
context.upsert_model(new_model)
1800+
snapshot_a = context.get_snapshot(new_model.name)
1801+
plan_a = context.plan("dev", no_prompts=True)
1802+
snapshot_a = plan_a.snapshots[snapshot_a.snapshot_id]
1803+
1804+
assert snapshot_a.snapshot_id in plan_a.context_diff.new_snapshots
1805+
assert snapshot_a.snapshot_id in plan_a.context_diff.added
1806+
assert snapshot_a.change_category == SnapshotChangeCategory.BREAKING
1807+
1808+
context.apply(plan_a)
1809+
1810+
df = context.fetchdf("SELECT * FROM memory.sushi__dev.new_model")
1811+
assert df.to_dict() == {"ds": {0: "2023-01-07"}, "a": {0: 1}}
1812+
1813+
new_model_alt_expr = d.parse(
1814+
"""
1815+
MODEL (
1816+
name memory.sushi.new_model,
1817+
kind INCREMENTAL_BY_TIME_RANGE (
1818+
time_column ds,
1819+
forward_only TRUE,
1820+
on_destructive_change 'allow',
1821+
),
1822+
);
1823+
1824+
SELECT '2023-01-07' AS ds, 1 AS b;
1825+
"""
1826+
)
1827+
new_model_alt = load_sql_based_model(new_model_alt_expr)
1828+
1829+
# Add the second version of the model and apply it to the same environment.
1830+
context.upsert_model(new_model_alt)
1831+
snapshot_b = context.get_snapshot(new_model_alt.name)
1832+
1833+
context.invalidate_environment("dev", sync=True)
1834+
plan_b = context.plan("dev", no_prompts=True)
1835+
snapshot_b = plan_b.snapshots[snapshot_b.snapshot_id]
1836+
1837+
context.apply(plan_b)
1838+
1839+
df = context.fetchdf("SELECT * FROM memory.sushi__dev.new_model").replace({np.nan: None})
1840+
assert df.to_dict() == {"ds": {0: "2023-01-07"}, "b": {0: 1}}
1841+
1842+
17771843
def test_plan_twice_with_star_macro_yields_no_diff(tmp_path: Path):
17781844
init_example_project(tmp_path, dialect="duckdb")
17791845

@@ -2561,7 +2627,7 @@ def get_default_catalog_and_non_tables(
25612627
) = get_default_catalog_and_non_tables(metadata, context.default_catalog)
25622628
assert len(prod_views) == 13
25632629
assert len(dev_views) == 0
2564-
assert len(user_default_tables) == 13
2630+
assert len(user_default_tables) == 16
25652631
assert state_metadata.schemas == ["sqlmesh"]
25662632
assert {x.sql() for x in state_metadata.qualified_tables}.issuperset(
25672633
{
@@ -2580,7 +2646,7 @@ def get_default_catalog_and_non_tables(
25802646
) = get_default_catalog_and_non_tables(metadata, context.default_catalog)
25812647
assert len(prod_views) == 13
25822648
assert len(dev_views) == 13
2583-
assert len(user_default_tables) == 13
2649+
assert len(user_default_tables) == 16
25842650
assert len(non_default_tables) == 0
25852651
assert state_metadata.schemas == ["sqlmesh"]
25862652
assert {x.sql() for x in state_metadata.qualified_tables}.issuperset(
@@ -2600,7 +2666,7 @@ def get_default_catalog_and_non_tables(
26002666
) = get_default_catalog_and_non_tables(metadata, context.default_catalog)
26012667
assert len(prod_views) == 13
26022668
assert len(dev_views) == 26
2603-
assert len(user_default_tables) == 13
2669+
assert len(user_default_tables) == 16
26042670
assert len(non_default_tables) == 0
26052671
assert state_metadata.schemas == ["sqlmesh"]
26062672
assert {x.sql() for x in state_metadata.qualified_tables}.issuperset(
@@ -2621,7 +2687,7 @@ def get_default_catalog_and_non_tables(
26212687
) = get_default_catalog_and_non_tables(metadata, context.default_catalog)
26222688
assert len(prod_views) == 13
26232689
assert len(dev_views) == 13
2624-
assert len(user_default_tables) == 13
2690+
assert len(user_default_tables) == 16
26252691
assert len(non_default_tables) == 0
26262692
assert state_metadata.schemas == ["sqlmesh"]
26272693
assert {x.sql() for x in state_metadata.qualified_tables}.issuperset(

tests/core/test_snapshot.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1730,17 +1730,15 @@ def test_deployability_index_categorized_forward_only_model(make_snapshot):
17301730
snapshot_b.parents = (snapshot_a.snapshot_id,)
17311731
snapshot_b.categorize_as(SnapshotChangeCategory.METADATA)
17321732

1733-
# The fact that the model is forward only should be ignored if an actual category
1734-
# has been assigned.
17351733
deployability_index = DeployabilityIndex.create(
17361734
{s.snapshot_id: s for s in [snapshot_a, snapshot_b]}
17371735
)
17381736

1739-
assert deployability_index.is_deployable(snapshot_a)
1740-
assert deployability_index.is_deployable(snapshot_b)
1737+
assert not deployability_index.is_deployable(snapshot_a)
1738+
assert not deployability_index.is_deployable(snapshot_b)
17411739

1742-
assert deployability_index.is_representative(snapshot_a)
1743-
assert deployability_index.is_representative(snapshot_b)
1740+
assert not deployability_index.is_representative(snapshot_a)
1741+
assert not deployability_index.is_representative(snapshot_b)
17441742

17451743

17461744
def test_deployability_index_missing_parent(make_snapshot):

tests/integrations/jupyter/test_magics.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ def test_plan(
291291

292292
# TODO: Should this be going to stdout? This is printing the status updates for when each batch finishes for
293293
# the models and how long it took
294-
assert len(output.stdout.strip().split("\n")) == 22
294+
assert len(output.stdout.strip().split("\n")) == 23
295295
assert not output.stderr
296296
assert len(output.outputs) == 4
297297
text_output = convert_all_html_output_to_text(output)

0 commit comments

Comments (0)