From c42ced4cf24e4e6a00d911af2d9ab710d4fd317a Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Wed, 20 Aug 2025 18:45:01 +0300 Subject: [PATCH] Fix: Recreate MVs on first insert --- sqlmesh/core/snapshot/evaluator.py | 22 ++++--- .../integration/test_integration.py | 65 +++++++++++++++++++ .../integration/test_integration_bigquery.py | 47 -------------- 3 files changed, 79 insertions(+), 55 deletions(-) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 90186faba7..87a6d15c42 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -2300,13 +2300,19 @@ def insert( render_kwargs: t.Dict[str, t.Any], **kwargs: t.Any, ) -> None: - snapshot = kwargs["snapshot"] + # We should recreate MVs across supported engines (Snowflake, BigQuery etc) because + # if upstream tables were recreated (e.g FULL models), the MVs would be silently invalidated. + # The only exception to that rule is RisingWave which doesn't support CREATE OR REPLACE, so upstream + # models don't recreate their physical tables for the MVs to be invalidated. + # However, even for RW we still want to recreate MVs to avoid stale references, as is the case with normal views. + # The flag is_first_insert is used for that matter as a signal to recreate the MV if the snapshot's intervals + # have been cleared by `should_force_rebuild` + is_materialized_view = self._is_materialized_view(model) + must_recreate_view = not self.adapter.HAS_VIEW_BINDING or ( + is_materialized_view and is_first_insert + ) - if ( - not snapshot.is_materialized_view - and self.adapter.HAS_VIEW_BINDING - and self.adapter.table_exists(table_name) - ): + if self.adapter.table_exists(table_name) and not must_recreate_view: logger.info("Skipping creation of the view '%s'", table_name) return @@ -2315,8 +2321,8 @@ def insert( table_name, query_or_df, model.columns_to_types, - replace=not self.adapter.HAS_VIEW_BINDING, - materialized=self._is_materialized_view(model), + replace=must_recreate_view, + materialized=is_materialized_view, view_properties=kwargs.get("physical_properties", model.physical_properties), table_description=model.description, column_descriptions=model.column_descriptions, diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index 19a45329d5..fcbc711f49 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -7,7 +7,10 @@ import typing as t import shutil from datetime import datetime, timedelta, date +from unittest import mock from unittest.mock import patch +import logging + import numpy as np # noqa: TID253 import pandas as pd # noqa: TID253 import pytest @@ -3748,3 +3751,65 @@ def _set_config(gateway: str, config: Config) -> None: "incremental_model", "seed_model", ] + + +def test_materialized_view_evaluation(ctx: TestContext, mocker: MockerFixture): + adapter = ctx.engine_adapter + dialect = ctx.dialect + + if not adapter.SUPPORTS_MATERIALIZED_VIEWS: + pytest.skip(f"Skipping engine {dialect} as it does not support materialized views") + elif dialect in ("snowflake", "databricks"): + pytest.skip(f"Skipping {dialect} as they're not enabled on standard accounts") + + model_name = ctx.table("test_tbl") + mview_name = ctx.table("test_mview") + + sqlmesh = ctx.create_context() + + sqlmesh.upsert_model( + load_sql_based_model( + d.parse( + f""" + MODEL (name {model_name}, kind FULL); + + SELECT 1 AS col + """ + ) + ) + ) + + sqlmesh.upsert_model( + load_sql_based_model( + d.parse( + f""" + MODEL (name {mview_name}, kind VIEW (materialized true)); + + SELECT * FROM {model_name} + """ + ) + ) + ) + + def _assert_mview_value(value: int): + df = adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=dialect)}") + assert df["col"][0] == value + + # Case 1: Ensure that plan is successful and we can query the materialized view + sqlmesh.plan(auto_apply=True, no_prompts=True) + + _assert_mview_value(value=1) + + # Case 2: Ensure that we can change the underlying table and the materialized view is recreated + sqlmesh.upsert_model( + load_sql_based_model(d.parse(f"""MODEL (name {model_name}, kind FULL); SELECT 2 AS col""")) + ) + + logger = logging.getLogger("sqlmesh.core.snapshot.evaluator") + + with mock.patch.object(logger, "info") as mock_logger: + sqlmesh.plan(auto_apply=True, no_prompts=True) + + assert any("Replacing view" in call[0][0] for call in mock_logger.call_args_list) + + _assert_mview_value(value=2) diff --git a/tests/core/engine_adapter/integration/test_integration_bigquery.py b/tests/core/engine_adapter/integration/test_integration_bigquery.py index 66a647dc80..0a6dd6b2a4 100644 --- a/tests/core/engine_adapter/integration/test_integration_bigquery.py +++ b/tests/core/engine_adapter/integration/test_integration_bigquery.py @@ -441,53 +441,6 @@ def test_table_diff_table_name_matches_column_name(ctx: TestContext): assert row_diff.full_match_count == 1 -def test_materialized_view_evaluation(ctx: TestContext, engine_adapter: BigQueryEngineAdapter): - model_name = ctx.table("test_tbl") - mview_name = ctx.table("test_mview") - - sqlmesh = ctx.create_context() - - sqlmesh.upsert_model( - load_sql_based_model( - d.parse( - f""" - MODEL (name {model_name}, kind FULL); - - SELECT 1 AS col - """ - ) - ) - ) - - sqlmesh.upsert_model( - load_sql_based_model( - d.parse( - f""" - MODEL (name {mview_name}, kind VIEW (materialized true)); - - SELECT * FROM {model_name} - """ - ) - ) - ) - - # Case 1: Ensure that plan is successful and we can query the materialized view - sqlmesh.plan(auto_apply=True, no_prompts=True) - - df = engine_adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=ctx.dialect)}") - assert df["col"][0] == 1 - - # Case 2: Ensure that we can change the underlying table and the materialized view is recreated - sqlmesh.upsert_model( - load_sql_based_model(d.parse(f"""MODEL (name {model_name}, kind FULL); SELECT 2 AS col""")) - ) - - sqlmesh.plan(auto_apply=True, no_prompts=True) - - df = engine_adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=ctx.dialect)}") - assert df["col"][0] == 2 - - def test_correlation_id_in_job_labels(ctx: TestContext): model_name = ctx.table("test")