diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index c124c2098f..35109ec36e 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -1644,6 +1644,7 @@ def create( snapshot.is_valid_start(start, snapshot_start) if start is not None else True ) + children_deployable = is_valid_start and not has_auto_restatement if ( snapshot.is_forward_only or snapshot.is_indirect_non_breaking @@ -1660,15 +1661,9 @@ def create( ): # This snapshot represents what's currently deployed in prod. representative_shared_version_ids.add(node) - - # A child can still be deployable even if its parent is not - children_deployable = ( - is_valid_start - and not ( - snapshot.is_paused and (snapshot.is_forward_only or is_forward_only_model) - ) - and not has_auto_restatement - ) + else: + # If the parent is not representative then its children can't be deployable. + children_deployable = False else: children_deployable = False if not snapshots[node].is_paused: diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 948882c4dc..8d4991adb9 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -1347,6 +1347,46 @@ def test_indirect_non_breaking_change_after_forward_only_in_dev(init_and_plan_co ) +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_changes_downstream_of_indirect_non_breaking_snapshot_without_intervals( + init_and_plan_context: t.Callable, +): + context, plan = init_and_plan_context("examples/sushi") + context.apply(plan) + + # Make a breaking change first but don't backfill it + model = context.get_model("sushi.orders") + model = model.copy(update={"stamp": "force new version"}) + context.upsert_model(model) + plan_builder = context.plan_builder( + "dev", skip_backfill=True, skip_tests=True, no_auto_categorization=True + ) + plan_builder.set_choice(context.get_snapshot(model), SnapshotChangeCategory.BREAKING) + context.apply(plan_builder.build()) + + # Now make a non-breaking change to the same snapshot. + model = model.copy(update={"stamp": "force another new version"}) + context.upsert_model(model) + plan_builder = context.plan_builder( + "dev", skip_backfill=True, skip_tests=True, no_auto_categorization=True + ) + plan_builder.set_choice(context.get_snapshot(model), SnapshotChangeCategory.NON_BREAKING) + context.apply(plan_builder.build()) + + # Now make a change to a model downstream of the above model. + downstream_model = context.get_model("sushi.top_waiters") + downstream_model = downstream_model.copy(update={"stamp": "yet another new version"}) + context.upsert_model(downstream_model) + plan = context.plan_builder("dev", skip_tests=True).build() + + # If the parent is not representative then the child cannot be deployable + deployability_index = plan.deployability_index + assert not deployability_index.is_representative( + context.get_snapshot("sushi.waiter_revenue_by_day") + ) + assert not deployability_index.is_deployable(context.get_snapshot("sushi.top_waiters")) + + @time_machine.travel("2023-01-08 15:00:00 UTC", tick=True) def test_metadata_change_after_forward_only_results_in_migration(init_and_plan_context: t.Callable): context, plan = init_and_plan_context("examples/sushi")