Skip to content

Commit b0ea051

Browse files
committed
Don't recreate mview for RisingWave
1 parent 551b6f7 commit b0ea051

File tree

3 files changed

+24
-14
lines changed

3 files changed

+24
-14
lines changed

.circleci/continue_config.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,10 +310,10 @@ workflows:
310310
- athena
311311
- fabric
312312
- gcp-postgres
313-
filters:
314-
branches:
315-
only:
316-
- main
313+
# filters:
314+
# branches:
315+
# only:
316+
# - main
317317
- ui_style
318318
- ui_test
319319
- vscode_test

sqlmesh/core/snapshot/evaluator.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2282,13 +2282,12 @@ def insert(
22822282
render_kwargs: t.Dict[str, t.Any],
22832283
**kwargs: t.Any,
22842284
) -> None:
2285-
snapshot = kwargs["snapshot"]
2286-
2287-
if (
2288-
not snapshot.is_materialized_view
2289-
and self.adapter.HAS_VIEW_BINDING
2290-
and self.adapter.table_exists(table_name)
2291-
):
2285+
# For all engines that support materialized views (except RisingWave) we recreate it.
2286+
# This is because case if upstream tables were recreated (e.g FULL models), the mview would be invalidated as well.
2287+
# RisingWave is an exception as it doesn't support CREATE OR REPLACE, so upstream models don't recreate their physical tables.
2288+
# Standard views still follow the same logic depending on VIEW_BINDING flag.
2289+
must_recreate_view = not self.adapter.HAS_VIEW_BINDING
2290+
if self.adapter.table_exists(table_name) and not must_recreate_view:
22922291
logger.info("Skipping creation of the view '%s'", table_name)
22932292
return
22942293

@@ -2297,7 +2296,7 @@ def insert(
22972296
table_name,
22982297
query_or_df,
22992298
model.columns_to_types,
2300-
replace=snapshot.is_materialized_view or not self.adapter.HAS_VIEW_BINDING,
2299+
replace=must_recreate_view,
23012300
materialized=self._is_materialized_view(model),
23022301
view_properties=kwargs.get("physical_properties", model.physical_properties),
23032302
table_description=model.description,

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88
import typing as t
99
import shutil
1010
from datetime import datetime, timedelta
11+
import logging
1112

1213
import numpy as np # noqa: TID253
1314
import pandas as pd # noqa: TID253
1415
import pytest
1516
import pytz
17+
from unittest import mock
1618
from sqlglot import exp, parse_one
1719
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
1820
from sqlglot.optimizer.qualify_columns import quote_identifiers
@@ -3733,7 +3735,7 @@ def _set_config(_gateway: str, config: Config) -> None:
37333735
]
37343736

37353737

3736-
def test_materialized_view_evaluation(ctx: TestContext):
3738+
def test_materialized_view_evaluation(ctx: TestContext, mocker: MockerFixture):
37373739
adapter = ctx.engine_adapter
37383740
dialect = ctx.dialect
37393741
if not adapter.SUPPORTS_MATERIALIZED_VIEWS:
@@ -3784,7 +3786,16 @@ def test_materialized_view_evaluation(ctx: TestContext):
37843786
load_sql_based_model(d.parse(f"""MODEL (name {model_name}, kind FULL); SELECT 2 AS col"""))
37853787
)
37863788

3787-
sqlmesh.plan(auto_apply=True, no_prompts=True)
3789+
logger = logging.getLogger("sqlmesh.core.snapshot.evaluator")
3790+
3791+
with mock.patch.object(logger, "info") as mock_logger:
3792+
sqlmesh.plan(auto_apply=True, no_prompts=True)
3793+
3794+
# RisingWave does not need to recreate the mview, all other engines do
3795+
recreate_view = (
3796+
"Skipping creation of the view" if dialect == "risingwave" else "Replacing view"
3797+
)
3798+
assert any(recreate_view in call[0][0] for call in mock_logger.call_args_list)
37883799

37893800
df = adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=dialect)}")
37903801
assert df["col"][0] == 2

0 commit comments

Comments
 (0)