Skip to content

Commit ff2e7f5

Browse files
committed
Fix: Recreate MVs on first insert
1 parent 029d8d5 commit ff2e7f5

File tree

3 files changed

+79
-55
lines changed

3 files changed

+79
-55
lines changed

sqlmesh/core/snapshot/evaluator.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2300,13 +2300,19 @@ def insert(
23002300
render_kwargs: t.Dict[str, t.Any],
23012301
**kwargs: t.Any,
23022302
) -> None:
2303-
snapshot = kwargs["snapshot"]
2303+
# We should recreate MVs across supported engines (Snowflake, BigQuery, etc.) because
2304+
# if upstream tables were recreated (e.g. FULL models), the MVs would be silently invalidated.
2305+
# The only exception to that rule is RisingWave, which doesn't support CREATE OR REPLACE, so upstream
2306+
# models don't recreate their physical tables for the MVs to be invalidated.
2307+
# However, even for RW we still want to recreate MVs to avoid stale references, as is the case with normal views.
2308+
# The is_first_insert flag serves that purpose: it signals that the MV must be recreated when the snapshot's intervals
2309+
# have been cleared by `should_force_rebuild`
2310+
is_materialized_view = self._is_materialized_view(model)
2311+
must_recreate_view = not self.adapter.HAS_VIEW_BINDING or (
2312+
is_materialized_view and is_first_insert
2313+
)
23042314

2305-
if (
2306-
not snapshot.is_materialized_view
2307-
and self.adapter.HAS_VIEW_BINDING
2308-
and self.adapter.table_exists(table_name)
2309-
):
2315+
if self.adapter.table_exists(table_name) and not must_recreate_view:
23102316
logger.info("Skipping creation of the view '%s'", table_name)
23112317
return
23122318

@@ -2315,8 +2321,8 @@ def insert(
23152321
table_name,
23162322
query_or_df,
23172323
model.columns_to_types,
2318-
replace=not self.adapter.HAS_VIEW_BINDING,
2319-
materialized=self._is_materialized_view(model),
2324+
replace=must_recreate_view,
2325+
materialized=is_materialized_view,
23202326
view_properties=kwargs.get("physical_properties", model.physical_properties),
23212327
table_description=model.description,
23222328
column_descriptions=model.column_descriptions,

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
import typing as t
88
import shutil
99
from datetime import datetime, timedelta, date
10+
from unittest import mock
1011
from unittest.mock import patch
12+
import logging
13+
1114
import numpy as np # noqa: TID253
1215
import pandas as pd # noqa: TID253
1316
import pytest
@@ -3748,3 +3751,65 @@ def _set_config(gateway: str, config: Config) -> None:
37483751
"incremental_model",
37493752
"seed_model",
37503753
]
3754+
3755+
3756+
def test_materialized_view_evaluation(ctx: TestContext, mocker: MockerFixture):
3757+
adapter = ctx.engine_adapter
3758+
dialect = ctx.dialect
3759+
3760+
if not adapter.SUPPORTS_MATERIALIZED_VIEWS:
3761+
pytest.skip(f"Skipping engine {dialect} as it does not support materialized views")
3762+
elif dialect in ("snowflake", "databricks"):
3763+
pytest.skip(f"Skipping {dialect} as they're not enabled on standard accounts")
3764+
3765+
model_name = ctx.table("test_tbl")
3766+
mview_name = ctx.table("test_mview")
3767+
3768+
sqlmesh = ctx.create_context()
3769+
3770+
sqlmesh.upsert_model(
3771+
load_sql_based_model(
3772+
d.parse(
3773+
f"""
3774+
MODEL (name {model_name}, kind FULL);
3775+
3776+
SELECT 1 AS col
3777+
"""
3778+
)
3779+
)
3780+
)
3781+
3782+
sqlmesh.upsert_model(
3783+
load_sql_based_model(
3784+
d.parse(
3785+
f"""
3786+
MODEL (name {mview_name}, kind VIEW (materialized true));
3787+
3788+
SELECT * FROM {model_name}
3789+
"""
3790+
)
3791+
)
3792+
)
3793+
3794+
def _assert_mview_value(value: int):
3795+
df = adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=dialect)}")
3796+
assert df["col"][0] == value
3797+
3798+
# Case 1: Ensure that plan is successful and we can query the materialized view
3799+
sqlmesh.plan(auto_apply=True, no_prompts=True)
3800+
3801+
_assert_mview_value(value=1)
3802+
3803+
# Case 2: Ensure that we can change the underlying table and the materialized view is recreated
3804+
sqlmesh.upsert_model(
3805+
load_sql_based_model(d.parse(f"""MODEL (name {model_name}, kind FULL); SELECT 2 AS col"""))
3806+
)
3807+
3808+
logger = logging.getLogger("sqlmesh.core.snapshot.evaluator")
3809+
3810+
with mock.patch.object(logger, "info") as mock_logger:
3811+
sqlmesh.plan(auto_apply=True, no_prompts=True)
3812+
3813+
assert any("Replacing view" in call[0][0] for call in mock_logger.call_args_list)
3814+
3815+
_assert_mview_value(value=2)

tests/core/engine_adapter/integration/test_integration_bigquery.py

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -441,53 +441,6 @@ def test_table_diff_table_name_matches_column_name(ctx: TestContext):
441441
assert row_diff.full_match_count == 1
442442

443443

444-
def test_materialized_view_evaluation(ctx: TestContext, engine_adapter: BigQueryEngineAdapter):
445-
model_name = ctx.table("test_tbl")
446-
mview_name = ctx.table("test_mview")
447-
448-
sqlmesh = ctx.create_context()
449-
450-
sqlmesh.upsert_model(
451-
load_sql_based_model(
452-
d.parse(
453-
f"""
454-
MODEL (name {model_name}, kind FULL);
455-
456-
SELECT 1 AS col
457-
"""
458-
)
459-
)
460-
)
461-
462-
sqlmesh.upsert_model(
463-
load_sql_based_model(
464-
d.parse(
465-
f"""
466-
MODEL (name {mview_name}, kind VIEW (materialized true));
467-
468-
SELECT * FROM {model_name}
469-
"""
470-
)
471-
)
472-
)
473-
474-
# Case 1: Ensure that plan is successful and we can query the materialized view
475-
sqlmesh.plan(auto_apply=True, no_prompts=True)
476-
477-
df = engine_adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=ctx.dialect)}")
478-
assert df["col"][0] == 1
479-
480-
# Case 2: Ensure that we can change the underlying table and the materialized view is recreated
481-
sqlmesh.upsert_model(
482-
load_sql_based_model(d.parse(f"""MODEL (name {model_name}, kind FULL); SELECT 2 AS col"""))
483-
)
484-
485-
sqlmesh.plan(auto_apply=True, no_prompts=True)
486-
487-
df = engine_adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=ctx.dialect)}")
488-
assert df["col"][0] == 2
489-
490-
491444
def test_correlation_id_in_job_labels(ctx: TestContext):
492445
model_name = ctx.table("test")
493446

0 commit comments

Comments
 (0)