Skip to content

Commit 147d85d

Browse files
committed
Fix: Recreate MVs on first insert
1 parent f9eb71b commit 147d85d

File tree

4 files changed

+82
-59
lines changed

4 files changed

+82
-59
lines changed

.circleci/continue_config.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,10 +310,10 @@ workflows:
310310
- athena
311311
- fabric
312312
- gcp-postgres
313-
filters:
314-
branches:
315-
only:
316-
- main
313+
# filters:
314+
# branches:
315+
# only:
316+
# - main
317317
- ui_style
318318
- ui_test
319319
- vscode_test

sqlmesh/core/snapshot/evaluator.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2290,13 +2290,19 @@ def insert(
22902290
render_kwargs: t.Dict[str, t.Any],
22912291
**kwargs: t.Any,
22922292
) -> None:
2293-
snapshot = kwargs["snapshot"]
2293+
# We should recreate MVs across supported engines (Snowflake, BigQuery etc) because
2294+
# if upstream tables were recreated (e.g FULL models), the MVs would be silently invalidated.
2295+
# The only exception to that rule is RisingWave which doesn't support CREATE OR REPLACE, so upstream
2296+
# models don't recreate their physical tables for the MVs to be invalidated.
2297+
# However, even for RW we still want to recreate MVs to avoid stale references, as is the case with normal views.
2298+
# The flag is_first_insert is used for that matter as a signal to recreate the MV if the snapshot's intervals
2299+
# have been cleared by `should_force_rebuild`
2300+
is_materialized_view = self._is_materialized_view(model)
2301+
must_recreate_view = not self.adapter.HAS_VIEW_BINDING or (
2302+
is_materialized_view and is_first_insert
2303+
)
22942304

2295-
if (
2296-
not snapshot.is_materialized_view
2297-
and self.adapter.HAS_VIEW_BINDING
2298-
and self.adapter.table_exists(table_name)
2299-
):
2305+
if self.adapter.table_exists(table_name) and not must_recreate_view:
23002306
logger.info("Skipping creation of the view '%s'", table_name)
23012307
return
23022308

@@ -2305,8 +2311,8 @@ def insert(
23052311
table_name,
23062312
query_or_df,
23072313
model.columns_to_types,
2308-
replace=not self.adapter.HAS_VIEW_BINDING,
2309-
materialized=self._is_materialized_view(model),
2314+
replace=must_recreate_view,
2315+
materialized=is_materialized_view,
23102316
view_properties=kwargs.get("physical_properties", model.physical_properties),
23112317
table_description=model.description,
23122318
column_descriptions=model.column_descriptions,

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
import typing as t
88
import shutil
99
from datetime import datetime, timedelta
10+
import logging
1011

1112
import numpy as np # noqa: TID253
1213
import pandas as pd # noqa: TID253
1314
import pytest
1415
import pytz
16+
from unittest import mock
1517
from sqlglot import exp, parse_one
1618
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
1719
from sqlglot.optimizer.qualify_columns import quote_identifiers
@@ -3697,3 +3699,65 @@ def _set_config(gateway: str, config: Config) -> None:
36973699
"incremental_model",
36983700
"seed_model",
36993701
]
3702+
3703+
3704+
def test_materialized_view_evaluation(ctx: TestContext, mocker: MockerFixture):
3705+
adapter = ctx.engine_adapter
3706+
dialect = ctx.dialect
3707+
3708+
if not adapter.SUPPORTS_MATERIALIZED_VIEWS:
3709+
pytest.skip(f"Skipping engine {dialect} as it does not support materialized views")
3710+
elif dialect in ("snowflake", "databricks"):
3711+
pytest.skip(f"Skipping {dialect} as they're not enabled on standard accounts")
3712+
3713+
model_name = ctx.table("test_tbl")
3714+
mview_name = ctx.table("test_mview")
3715+
3716+
sqlmesh = ctx.create_context()
3717+
3718+
sqlmesh.upsert_model(
3719+
load_sql_based_model(
3720+
d.parse(
3721+
f"""
3722+
MODEL (name {model_name}, kind FULL);
3723+
3724+
SELECT 1 AS col
3725+
"""
3726+
)
3727+
)
3728+
)
3729+
3730+
sqlmesh.upsert_model(
3731+
load_sql_based_model(
3732+
d.parse(
3733+
f"""
3734+
MODEL (name {mview_name}, kind VIEW (materialized true));
3735+
3736+
SELECT * FROM {model_name}
3737+
"""
3738+
)
3739+
)
3740+
)
3741+
3742+
def _assert_mview_value(value: int):
3743+
df = adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=dialect)}")
3744+
assert df["col"][0] == value
3745+
3746+
# Case 1: Ensure that plan is successful and we can query the materialized view
3747+
sqlmesh.plan(auto_apply=True, no_prompts=True)
3748+
3749+
_assert_mview_value(value=1)
3750+
3751+
# Case 2: Ensure that we can change the underlying table and the materialized view is recreated
3752+
sqlmesh.upsert_model(
3753+
load_sql_based_model(d.parse(f"""MODEL (name {model_name}, kind FULL); SELECT 2 AS col"""))
3754+
)
3755+
3756+
logger = logging.getLogger("sqlmesh.core.snapshot.evaluator")
3757+
3758+
with mock.patch.object(logger, "info") as mock_logger:
3759+
sqlmesh.plan(auto_apply=True, no_prompts=True)
3760+
3761+
assert any("Replacing view" in call[0][0] for call in mock_logger.call_args_list)
3762+
3763+
_assert_mview_value(value=2)

tests/core/engine_adapter/integration/test_integration_bigquery.py

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -441,53 +441,6 @@ def test_table_diff_table_name_matches_column_name(ctx: TestContext):
441441
assert row_diff.full_match_count == 1
442442

443443

444-
def test_materialized_view_evaluation(ctx: TestContext, engine_adapter: BigQueryEngineAdapter):
445-
model_name = ctx.table("test_tbl")
446-
mview_name = ctx.table("test_mview")
447-
448-
sqlmesh = ctx.create_context()
449-
450-
sqlmesh.upsert_model(
451-
load_sql_based_model(
452-
d.parse(
453-
f"""
454-
MODEL (name {model_name}, kind FULL);
455-
456-
SELECT 1 AS col
457-
"""
458-
)
459-
)
460-
)
461-
462-
sqlmesh.upsert_model(
463-
load_sql_based_model(
464-
d.parse(
465-
f"""
466-
MODEL (name {mview_name}, kind VIEW (materialized true));
467-
468-
SELECT * FROM {model_name}
469-
"""
470-
)
471-
)
472-
)
473-
474-
# Case 1: Ensure that plan is successful and we can query the materialized view
475-
sqlmesh.plan(auto_apply=True, no_prompts=True)
476-
477-
df = engine_adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=ctx.dialect)}")
478-
assert df["col"][0] == 1
479-
480-
# Case 2: Ensure that we can change the underlying table and the materialized view is recreated
481-
sqlmesh.upsert_model(
482-
load_sql_based_model(d.parse(f"""MODEL (name {model_name}, kind FULL); SELECT 2 AS col"""))
483-
)
484-
485-
sqlmesh.plan(auto_apply=True, no_prompts=True)
486-
487-
df = engine_adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=ctx.dialect)}")
488-
assert df["col"][0] == 2
489-
490-
491444
def test_correlation_id_in_job_labels(ctx: TestContext):
492445
model_name = ctx.table("test")
493446

0 commit comments

Comments
 (0)