Skip to content
Merged
8 changes: 8 additions & 0 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,14 @@ def _categorize_snapshot(
if mode == AutoCategorizationMode.FULL:
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
elif self._context_diff.indirectly_modified(snapshot.name):
if snapshot.is_materialized_view and not forward_only:
# We categorize indirect changes to materialized views as indirect breaking to allow for
# instantaneous switches in the virtual layer. Otherwise, there might be a long downtime while the MV is recreated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious to understand a bit more about this. What would cause this long downtime? Can you give an example?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RisingWave supports neither `CREATE OR REPLACE` nor transactional DDL (although it can alter MV A_1 and swap it with another existing MV A_2). Previously, sqlmesh would drop and recreate the MV in the case of indirect changes, making the data unavailable until the MV was rebuilt (which can take a long time for larger MVs).

Even if an engine supports create or replace of some sort, I think having side-by-side versions of an MV allows for instantaneous rollbacks in a virtual layer.

# In the case of forward-only changes this optimization is not applicable because we want to continue
# using the same (existing) table version.
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks good, but there's still the question of what the correct behavior should be when forward_only is True, which leads to the version being reused despite the INDIRECT_BREAKING category.

The forward_only flag can be set due to:

  1. Running sqlmesh plan --forward-only
  2. The virtual_environment_mode is set to dev_only in the project config, indicating that no virtual layer should be used in production

In both cases, it is the user's explicit intent to continue using the same (existing) table version. How should we handle these scenarios?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I'm not sure, but maybe we should keep the old behavior in case of forward-only changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a check to preserve the original behavior for forward-only changes.

return

all_upstream_forward_only = set()
all_upstream_categories = set()
direct_parent_categories = set()
Expand Down
142 changes: 141 additions & 1 deletion tests/core/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
SqlModel,
ModelKindName,
)
from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange
from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange, ViewKind
from sqlmesh.core.model.seed import Seed
from sqlmesh.core.plan import Plan, PlanBuilder, SnapshotIntervals
from sqlmesh.core.snapshot import (
Expand Down Expand Up @@ -4161,3 +4161,143 @@ def test_plan_ignore_cron_flag(make_snapshot):
],
)
]


def test_indirect_change_to_materialized_view_is_breaking(make_snapshot):
    """Check the category given to a materialized view whose upstream changed.

    Both "a" and "b" are materialized views and "b" selects from "a". The new
    version of "a" adds a column while "b" reuses its old model unchanged, and
    the plan is built with ``forward_only=False``. "b" is expected to end up
    categorized as INDIRECT_BREAKING.
    """
    # Old versions of both views, each already categorized.
    upstream_before = make_snapshot(
        SqlModel(
            name="a",
            query=parse_one("select 1 as col_a"),
            kind=ViewKind(materialized=True),
        )
    )
    upstream_before.categorize_as(SnapshotChangeCategory.BREAKING)

    downstream_before = make_snapshot(
        SqlModel(
            name="b",
            query=parse_one("select col_a from a"),
            kind=ViewKind(materialized=True),
        ),
        nodes={'"a"': upstream_before.model},
    )
    downstream_before.categorize_as(SnapshotChangeCategory.BREAKING)

    # New version of "a": the same view with an extra column.
    upstream_after = make_snapshot(
        SqlModel(
            name="a",
            query=parse_one("select 1 as col_a, 2 as col_b"),
            kind=ViewKind(materialized=True),
        )
    )
    upstream_after.previous_versions = upstream_before.all_versions

    # New version of "b": identical model, but built against the new "a".
    downstream_after = make_snapshot(
        downstream_before.model,
        nodes={'"a"': upstream_after.model},
    )
    downstream_after.previous_versions = downstream_before.all_versions

    # (new, old) pairs, upstream first so the mappings keep the a-then-b order.
    transitions = (
        (upstream_after, upstream_before),
        (downstream_after, downstream_before),
    )
    modified = {new.name: (new, old) for new, old in transitions}
    # Two separate but equal mappings: `snapshots` and `new_snapshots` are distinct arguments.
    current_by_id = {new.snapshot_id: new for new, _ in transitions}
    fresh_by_id = {new.snapshot_id: new for new, _ in transitions}

    context_diff = ContextDiff(
        environment="test_environment",
        is_new_environment=True,
        is_unfinalized_environment=False,
        normalize_environment_name=True,
        create_from="prod",
        create_from_env_exists=True,
        added=set(),
        removed_snapshots={},
        modified_snapshots=modified,
        snapshots=current_by_id,
        new_snapshots=fresh_by_id,
        previous_plan_id=None,
        previously_promoted_snapshot_ids=set(),
        previous_finalized_snapshots=None,
        previous_gateway_managed_virtual_layer=False,
        gateway_managed_virtual_layer=False,
        environment_statements=[],
    )

    builder = PlanBuilder(context_diff, forward_only=False)
    builder.build()

    expected_category = SnapshotChangeCategory.INDIRECT_BREAKING
    assert downstream_after.change_category == expected_category


def test_forward_only_indirect_change_to_materialized_view(make_snapshot):
    """Check the forward-only category of a materialized view whose upstream changed.

    "a" is a SQL model declared without an explicit kind and "b" is a
    materialized view selecting from it. The new version of "a" adds a column
    while "b" reuses its old model unchanged, and the plan is built with
    ``forward_only=True``. "b" is expected to end up categorized as
    INDIRECT_NON_BREAKING.
    """
    # Old versions of both models, each already categorized.
    upstream_before = make_snapshot(
        SqlModel(
            name="a",
            query=parse_one("select 1 as col_a"),
        )
    )
    upstream_before.categorize_as(SnapshotChangeCategory.BREAKING)

    downstream_before = make_snapshot(
        SqlModel(
            name="b",
            query=parse_one("select col_a from a"),
            kind=ViewKind(materialized=True),
        ),
        nodes={'"a"': upstream_before.model},
    )
    downstream_before.categorize_as(SnapshotChangeCategory.BREAKING)

    # New version of "a": the same model with an extra column.
    upstream_after = make_snapshot(
        SqlModel(
            name="a",
            query=parse_one("select 1 as col_a, 2 as col_b"),
        )
    )
    upstream_after.previous_versions = upstream_before.all_versions

    # New version of "b": identical model, but built against the new "a".
    downstream_after = make_snapshot(
        downstream_before.model,
        nodes={'"a"': upstream_after.model},
    )
    downstream_after.previous_versions = downstream_before.all_versions

    # (new, old) pairs, upstream first so the mappings keep the a-then-b order.
    transitions = (
        (upstream_after, upstream_before),
        (downstream_after, downstream_before),
    )
    modified = {new.name: (new, old) for new, old in transitions}
    # Two separate but equal mappings: `snapshots` and `new_snapshots` are distinct arguments.
    current_by_id = {new.snapshot_id: new for new, _ in transitions}
    fresh_by_id = {new.snapshot_id: new for new, _ in transitions}

    context_diff = ContextDiff(
        environment="test_environment",
        is_new_environment=True,
        is_unfinalized_environment=False,
        normalize_environment_name=True,
        create_from="prod",
        create_from_env_exists=True,
        added=set(),
        removed_snapshots={},
        modified_snapshots=modified,
        snapshots=current_by_id,
        new_snapshots=fresh_by_id,
        previous_plan_id=None,
        previously_promoted_snapshot_ids=set(),
        previous_finalized_snapshots=None,
        previous_gateway_managed_virtual_layer=False,
        gateway_managed_virtual_layer=False,
        environment_statements=[],
    )

    builder = PlanBuilder(context_diff, forward_only=True)
    builder.build()

    # With forward-only plans, an indirect change to an MV is not forced to
    # indirect breaking; the standard categorization is preserved instead.
    expected_category = SnapshotChangeCategory.INDIRECT_NON_BREAKING
    assert downstream_after.change_category == expected_category
Loading