From 6200690338eaa3cdd282c1d7949d52c5d7cd553a Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Tue, 26 Aug 2025 18:00:46 +0300 Subject: [PATCH 01/11] Feat: Skip evaluation if upstream external model has not changed --- sqlmesh/core/engine_adapter/base.py | 4 + sqlmesh/core/engine_adapter/bigquery.py | 23 ++++ sqlmesh/core/engine_adapter/snowflake.py | 16 +++ sqlmesh/core/plan/evaluator.py | 1 + sqlmesh/core/scheduler.py | 57 ++++++++- sqlmesh/core/snapshot/definition.py | 10 ++ sqlmesh/core/state_sync/base.py | 3 + sqlmesh/core/state_sync/db/interval.py | 26 +++- .../v0093_add_last_altered_to_intervals.py | 23 ++++ .../integration/test_integration.py | 112 +++++++++++++++++- 10 files changed, 271 insertions(+), 4 deletions(-) create mode 100644 sqlmesh/migrations/v0093_add_last_altered_to_intervals.py diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 47e6a4260c..135e7e49cc 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -119,6 +119,7 @@ class EngineAdapter: MAX_IDENTIFIER_LENGTH: t.Optional[int] = None ATTACH_CORRELATION_ID = True SUPPORTS_QUERY_EXECUTION_TRACKING = False + SUPPORTS_EXTERNAL_MODEL_FRESHNESS = False def __init__( self, @@ -2927,6 +2928,9 @@ def _check_identifier_length(self, expression: exp.Expression) -> None: f"Identifier name '{name}' (length {name_length}) exceeds {self.dialect.capitalize()}'s max identifier limit of {self.MAX_IDENTIFIER_LENGTH} characters" ) + def get_external_model_freshness(self, table_names: t.List[TableName]) -> t.List[int]: + raise NotImplementedError() + class EngineAdapterWithIndexSupport(EngineAdapter): SUPPORTS_INDEXES = True diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index 0dfa2325e8..eb31f7bea4 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -67,6 +67,7 @@ class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin): MAX_TABLE_COMMENT_LENGTH = 1024 MAX_COLUMN_COMMENT_LENGTH = 1024 SUPPORTS_QUERY_EXECUTION_TRACKING = True + SUPPORTS_EXTERNAL_MODEL_FRESHNESS = True SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"] INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.MERGE @@ -755,6 +756,28 @@ def table_exists(self, table_name: TableName) -> bool: except NotFound: return False + def get_external_model_freshness(self, table_names: t.List[TableName]) -> t.List[int]: + from sqlmesh.utils.date import to_timestamp + + datasets_to_tables: t.DefaultDict[str, t.List[str]] = defaultdict(list) + for table_name in table_names: + table = exp.to_table(table_name) + datasets_to_tables[table.db].append(table.name) + + results = [] + + for dataset, tables in datasets_to_tables.items(): + query = ( + f"SELECT TIMESTAMP_MILLIS(last_modified_time) FROM `{dataset}.__TABLES__` WHERE " + ) + for i, table_name in enumerate(tables): + query += f"TABLE_ID = '{table_name}'" + if i < len(tables) - 1: + query += " OR " + results.extend(self.fetchall(query)) + + return [to_timestamp(row[0]) for row in results] + def _get_table(self, table_name: TableName) -> BigQueryTable: """ Returns a BigQueryTable object for the given table name. diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index 9c27b45115..19a5a28e82 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -54,6 +54,7 @@ class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixi SUPPORTS_MANAGED_MODELS = True CURRENT_CATALOG_EXPRESSION = exp.func("current_database") SUPPORTS_CREATE_DROP_CATALOG = True + SUPPORTS_EXTERNAL_MODEL_FRESHNESS = True SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA", "TABLE"] SCHEMA_DIFFER_KWARGS = { "parameterized_type_defaults": { @@ -669,3 +670,18 @@ def close(self) -> t.Any: self._connection_pool.set_attribute(self.SNOWPARK, None) return super().close() + + def get_external_model_freshness(self, table_names: t.List[TableName]) -> t.List[int]: + from sqlmesh.utils.date import to_timestamp + + num_tables = len(table_names) + + query = "SELECT LAST_ALTERED FROM INFORMATION_SCHEMA.TABLES WHERE" + for i, table_name in enumerate(table_names): + table = exp.to_table(table_name) + query += f"""(TABLE_NAME = '{table.name}' AND TABLE_SCHEMA = '{table.db}' AND TABLE_CATALOG = '{table.catalog}')""" + if i < num_tables - 1: + query += " OR " + + result = self.fetchall(query) + return [to_timestamp(row[0]) for row in result] diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index 03ecb770bf..d5c766c4d8 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -258,6 +258,7 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla allow_additive_snapshots=plan.allow_additive_models, selected_snapshot_ids=stage.selected_snapshot_ids, selected_models=plan.selected_models, + is_restatement_plan=bool(plan.restatements), ) if errors: raise PlanError("Plan application failed.") diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index af4d72b165..e90693d989 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -54,6 +54,8 @@ if t.TYPE_CHECKING: from sqlmesh.core.context import ExecutionContext + from sqlmesh.core._typing import TableName + from sqlmesh.core.engine_adapter import EngineAdapter logger = logging.getLogger(__name__) SnapshotToIntervals = t.Dict[Snapshot, Intervals] @@ -188,6 +190,46 @@ def merged_missing_intervals( } return snapshots_to_intervals + def can_skip_evaluation(self, snapshot: Snapshot, snapshots: t.Dict[str, Snapshot]) -> bool: + if not snapshot.last_altered_ts: + return False + + from collections import defaultdict + + parent_snapshots = {p for p in snapshots.values() if p.name != snapshot.name} + if len(parent_snapshots) != len(snapshot.node.depends_on): + # The mismatch can happen if e.g an external model is not registered in the project + return False + + adapter_to_parent_snapshots: t.Dict[EngineAdapter, t.List[Snapshot]] = defaultdict(list) + + for parent_snapshot in parent_snapshots: + if not parent_snapshot.is_external: + return False + + adapter = self.snapshot_evaluator.get_adapter(parent_snapshot.model_gateway) + if not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS: + return False + + adapter_to_parent_snapshots[adapter].append(parent_snapshot) + + if not adapter_to_parent_snapshots: + return False + + external_models_freshness: t.List[int] = [] + + for adapter, adapter_snapshots in adapter_to_parent_snapshots.items(): + table_names: t.List[TableName] = [ + exp.to_table(parent_snapshot.name, parent_snapshot.node.dialect) + for parent_snapshot in adapter_snapshots + ] + external_models_freshness.extend(adapter.get_external_model_freshness(table_names)) + + return all( + snapshot.last_altered_ts > external_model_freshness + for external_model_freshness in external_models_freshness + ) + def evaluate( self, snapshot: Snapshot, @@ -200,6 +242,7 @@ def evaluate( allow_destructive_snapshots: t.Optional[t.Set[str]] = None, allow_additive_snapshots: t.Optional[t.Set[str]] = None, target_table_exists: t.Optional[bool] = None, + is_restatement_plan: bool = False, **kwargs: t.Any, ) -> t.List[AuditResult]: """Evaluate a snapshot and add the processed interval to the state sync. @@ -224,6 +267,14 @@ def evaluate( snapshots = parent_snapshots_by_name(snapshot, self.snapshots) + if not is_restatement_plan and self.can_skip_evaluation(snapshot, snapshots): + logger.info(f""" + Skipping evaluation for snapshot {snapshot.name} as it depends on external models + that have not been updated since the last run. + """) + + return [] + is_deployable = deployability_index.is_deployable(snapshot) wap_id = self.snapshot_evaluator.evaluate( @@ -251,7 +302,9 @@ def evaluate( **kwargs, ) - self.state_sync.add_interval(snapshot, start, end, is_dev=not is_deployable) + self.state_sync.add_interval( + snapshot, start, end, is_dev=not is_deployable, last_altered_ts=now_timestamp() + ) return audit_results def run( @@ -422,6 +475,7 @@ def run_merged_intervals( run_environment_statements: bool = False, audit_only: bool = False, auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}, + is_restatement_plan: bool = False, ) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]: """Runs precomputed batches of missing intervals. @@ -542,6 +596,7 @@ def run_node(node: SchedulingUnit) -> None: allow_additive_snapshots=allow_additive_snapshots, target_table_exists=snapshot.snapshot_id not in snapshots_to_create, selected_models=selected_models, + is_restatement_plan=is_restatement_plan, ) evaluation_duration_ms = now_timestamp() - execution_start_ts diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 600d84fe83..db6f5e6257 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -185,6 +185,7 @@ class SnapshotIntervals(PydanticModel): intervals: Intervals = [] dev_intervals: Intervals = [] pending_restatement_intervals: Intervals = [] + last_altered_ts: t.Optional[int] = None @property def snapshot_id(self) -> t.Optional[SnapshotId]: @@ -713,6 +714,9 @@ class Snapshot(PydanticModel, SnapshotInfoMixin): dev_table_suffix: str = "dev" table_naming_convention: TableNamingConvention = TableNamingConvention.default forward_only: bool = False + # Physical table last modified timestamp, not to be confused with the "updated_ts" field + # which is for the snapshot record itself + last_altered_ts: t.Optional[int] = None @field_validator("ttl") @classmethod @@ -751,6 +755,12 @@ def hydrate_with_intervals_by_version( ) for interval in snapshot_intervals: snapshot.merge_intervals(interval) + + if interval.last_altered_ts: + snapshot.last_altered_ts = max( + snapshot.last_altered_ts or -1, interval.last_altered_ts + ) + result.append(snapshot) return result diff --git a/sqlmesh/core/state_sync/base.py b/sqlmesh/core/state_sync/base.py index 450d6f7408..9dd5f5c8ab 100644 --- a/sqlmesh/core/state_sync/base.py +++ b/sqlmesh/core/state_sync/base.py @@ -496,6 +496,7 @@ def add_interval( start: TimeLike, end: TimeLike, is_dev: bool = False, + last_altered_ts: t.Optional[int] = None, ) -> None: """Add an interval to a snapshot and sync it to the store. @@ -504,6 +505,7 @@ def add_interval( start: The start of the interval to add. end: The end of the interval to add. is_dev: Indicates whether the given interval is being added while in development mode + last_altered_ts: The timestamp of the last modification of the physical table """ start_ts, end_ts = snapshot.inclusive_exclusive(start, end, strict=False, expand=False) if not snapshot.version: @@ -516,6 +518,7 @@ def add_interval( dev_version=snapshot.dev_version, intervals=intervals if not is_dev else [], dev_intervals=intervals if is_dev else [], + last_altered_ts=last_altered_ts, ) self.add_snapshots_intervals([snapshot_intervals]) diff --git a/sqlmesh/core/state_sync/db/interval.py b/sqlmesh/core/state_sync/db/interval.py index b15ad2d57b..d12e2ad940 100644 --- a/sqlmesh/core/state_sync/db/interval.py +++ b/sqlmesh/core/state_sync/db/interval.py @@ -60,6 +60,7 @@ def __init__( "is_removed": exp.DataType.build("boolean"), "is_compacted": exp.DataType.build("boolean"), "is_pending_restatement": exp.DataType.build("boolean"), + "last_altered_ts": exp.DataType.build("bigint"), } def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None: @@ -215,13 +216,23 @@ def _push_snapshot_intervals( for start_ts, end_ts in snapshot.intervals: new_intervals.append( _interval_to_df( - snapshot, start_ts, end_ts, is_dev=False, is_compacted=is_compacted + snapshot, + start_ts, + end_ts, + is_dev=False, + is_compacted=is_compacted, + last_altered_ts=snapshot.last_altered_ts, ) ) for start_ts, end_ts in snapshot.dev_intervals: new_intervals.append( _interval_to_df( - snapshot, start_ts, end_ts, is_dev=True, is_compacted=is_compacted + snapshot, + start_ts, + end_ts, + is_dev=True, + is_compacted=is_compacted, + last_altered_ts=snapshot.last_altered_ts, ) ) @@ -236,6 +247,7 @@ def _push_snapshot_intervals( is_dev=False, is_compacted=is_compacted, is_pending_restatement=True, + last_altered_ts=snapshot.last_altered_ts, ) ) @@ -284,6 +296,7 @@ def _get_snapshot_intervals( is_dev, is_removed, is_pending_restatement, + last_altered_ts, ) in rows: interval_ids.add(interval_id) merge_key = (name, version, dev_version, identifier) @@ -296,6 +309,12 @@ def _get_snapshot_intervals( identifier=identifier, version=version, dev_version=dev_version, + last_altered_ts=last_altered_ts, + ) + + if last_altered_ts: + intervals[merge_key].last_altered_ts = max( + intervals[merge_key].last_altered_ts or 0, last_altered_ts ) if pending_restatement_interval_merge_key not in intervals: @@ -340,6 +359,7 @@ def _get_snapshot_intervals_query(self, uncompacted_only: bool) -> exp.Select: "is_dev", "is_removed", "is_pending_restatement", + "last_altered_ts", ) .from_(exp.to_table(self.intervals_table).as_("intervals")) .order_by( @@ -460,6 +480,7 @@ def _interval_to_df( is_removed: bool = False, is_compacted: bool = False, is_pending_restatement: bool = False, + last_altered_ts: t.Optional[int] = None, ) -> t.Dict[str, t.Any]: return { "id": random_id(), @@ -474,4 +495,5 @@ def _interval_to_df( "is_removed": is_removed, "is_compacted": is_compacted, "is_pending_restatement": is_pending_restatement, + "last_altered_ts": last_altered_ts, } diff --git a/sqlmesh/migrations/v0093_add_last_altered_to_intervals.py b/sqlmesh/migrations/v0093_add_last_altered_to_intervals.py new file mode 100644 index 0000000000..2a1e085b82 --- /dev/null +++ b/sqlmesh/migrations/v0093_add_last_altered_to_intervals.py @@ -0,0 +1,23 @@ +"""Add dev version to the intervals table.""" + +from sqlglot import exp + + +def migrate(state_sync, **kwargs): # type: ignore + engine_adapter = state_sync.engine_adapter + schema = state_sync.schema + intervals_table = "_intervals" + if schema: + intervals_table = f"{schema}.{intervals_table}" + + alter_table_exp = exp.Alter( + this=exp.to_table(intervals_table), + kind="TABLE", + actions=[ + exp.ColumnDef( + this=exp.to_column("last_altered_ts"), + kind=exp.DataType.build("BIGINT", dialect=engine_adapter.dialect), + ) + ], + ) + engine_adapter.execute(alter_table_exp) diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index 5a708e1e4c..26bd33eb65 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -11,6 +11,9 @@ from unittest.mock import patch import logging +import time_machine +from pytest_mock.plugin import MockerFixture + import numpy as np # noqa: TID253 import pandas as pd # noqa: TID253 import pytest @@ -45,6 +48,7 @@ TEST_SCHEMA, wait_until, ) +from tests.utils.test_helpers import use_terminal_console DATA_TYPE = exp.DataType.Type VARCHAR_100 = exp.DataType.build("varchar(100)") @@ -3774,7 +3778,7 @@ def _set_config(gateway: str, config: Config) -> None: ] -def test_materialized_view_evaluation(ctx: TestContext, mocker: MockerFixture): +def test_materialized_view_evaluation(ctx: TestContext): adapter = ctx.engine_adapter dialect = ctx.dialect @@ -3834,3 +3838,109 @@ def _assert_mview_value(value: int): assert any("Replacing view" in call[0][0] for call in mock_logger.call_args_list) _assert_mview_value(value=2) + + +@use_terminal_console +def test_external_model_freshness(ctx: TestContext, mocker: MockerFixture, tmp_path: pathlib.Path): + adapter = ctx.engine_adapter + if not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS: + pytest.skip("This test only runs for engines that support external model freshness") + + def _run_plan( + sqlmesh_context: Context, restate_models: t.Optional[t.List[str]] = None + ) -> PlanResults: + plan: Plan = sqlmesh_context.plan( + auto_apply=True, no_prompts=True, restate_models=restate_models + ) + return PlanResults.create(plan, ctx, schema) + + import sqlmesh + + spy = mocker.spy(sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, "evaluate") + + def _assert_model_evaluation(lambda_func, was_evaluated, day_delta=0): + call_count_before = spy.call_count + logger = logging.getLogger("sqlmesh.core.scheduler") + + with time_machine.travel(now(minute_floor=False) + timedelta(days=day_delta)): + with mock.patch.object(logger, "info") as mock_logger: + lambda_func() + + evaluation_skipped_log = any( + "Skipping evaluation for snapshot" in call[0][0] for call in mock_logger.call_args_list + ) + + if was_evaluated: + assert not evaluation_skipped_log + assert spy.call_count == call_count_before + 1 + else: + assert evaluation_skipped_log + assert spy.call_count == call_count_before + + # Create & initialize schema + schema = ctx.add_test_suffix(TEST_SCHEMA) + ctx._schemas.append(schema) + adapter.create_schema(schema) + + # Create & initialize external models + external_table1 = f"{schema}.external_table1" + external_table2 = f"{schema}.external_table2" + + external_models_yaml = tmp_path / "external_models.yaml" + external_models_yaml.write_text(f""" +- name: {external_table1} + columns: + col1: int + +- name: {external_table2} + columns: + col2: int +""") + + adapter.execute( + f"CREATE TABLE {external_table1} AS (SELECT 1 AS col1)", quote_identifiers=False + ) + adapter.execute( + f"CREATE TABLE {external_table2} AS (SELECT 2 AS col2)", quote_identifiers=False + ) + + # Create model that depends on external models + model_name = f"{schema}.new_model" + model_path = tmp_path / "models" / "new_model.sql" + (tmp_path / "models").mkdir(parents=True, exist_ok=True) + model_path.write_text(f""" + MODEL ( + name {model_name}, + start '2024-01-01', + kind FULL + ); + + SELECT col1 * col2 AS col FROM {external_table1}, {external_table2}; + """) + + # Initialize context + def _set_config(gateway: str, config: Config) -> None: + config.model_defaults.dialect = ctx.dialect + + context = ctx.create_context(path=tmp_path, config_mutator=_set_config) + + # Case 1: Model is evaluated on first insertion + _assert_model_evaluation(lambda: _run_plan(context), was_evaluated=True) + + # Case 2: Model is NOT evaluated on run if external models are not fresh + _assert_model_evaluation(lambda: context.run(), was_evaluated=False, day_delta=2) + + # Case 3: Model is evaluated on run if any external model is fresh + adapter.execute(f"INSERT INTO {external_table2} (col2) VALUES (3)", quote_identifiers=False) + + _assert_model_evaluation(lambda: context.run(), was_evaluated=True, day_delta=2) + + # Case 4: Model is evaluated on a restatement plan even if the external model is not fresh + _assert_model_evaluation( + lambda: _run_plan(context, restate_models=[model_name]), was_evaluated=True, day_delta=3 + ) + + # Case 5: Model is evaluated if changed even if the external model is not fresh + model_path.write_text(model_path.read_text().replace("col1 * col2", "col1 + col2")) + context.load() + _assert_model_evaluation(lambda: _run_plan(context), was_evaluated=True, day_delta=2) From 291824eca8dac6766a59b66065494d4a25c2ef59 Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Thu, 4 Sep 2025 18:41:29 +0300 Subject: [PATCH 02/11] Refactor logic into a built-in signal --- .circleci/continue_config.yml | 8 +- sqlmesh/core/scheduler.py | 33 ++++-- sqlmesh/core/signal.py | 34 +++++- sqlmesh/core/snapshot/definition.py | 7 +- ...=> v0098_add_last_altered_to_intervals.py} | 2 +- .../integration/test_integration.py | 104 ++++++++++++------ 6 files changed, 138 insertions(+), 50 deletions(-) rename sqlmesh/migrations/{v0093_add_last_altered_to_intervals.py => v0098_add_last_altered_to_intervals.py} (91%) diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index c549c0ae78..27bc5cf6b0 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -313,10 +313,10 @@ workflows: - athena - fabric - gcp-postgres - filters: - branches: - only: - - main + # filters: + # branches: + # only: + # - main - ui_style - ui_test - vscode_test diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index e90693d989..5f0b111a12 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -267,14 +267,6 @@ def evaluate( snapshots = parent_snapshots_by_name(snapshot, self.snapshots) - if not is_restatement_plan and self.can_skip_evaluation(snapshot, snapshots): - logger.info(f""" - Skipping evaluation for snapshot {snapshot.name} as it depends on external models - that have not been updated since the last run. - """) - - return [] - is_deployable = deployability_index.is_deployable(snapshot) wap_id = self.snapshot_evaluator.evaluate( @@ -388,6 +380,7 @@ def batch_intervals( deployability_index: t.Optional[DeployabilityIndex], environment_naming_info: EnvironmentNamingInfo, dag: t.Optional[DAG[SnapshotId]] = None, + is_restatement_plan: bool = False, ) -> t.Dict[Snapshot, Intervals]: dag = dag or snapshots_to_dag(merged_intervals) @@ -427,6 +420,7 @@ def batch_intervals( intervals, context, environment_naming_info, + is_restatement_plan=is_restatement_plan, ) unready -= set(intervals) @@ -509,9 +503,12 @@ def run_merged_intervals( snapshot_dag = full_dag.subdag(*selected_snapshot_ids_set) batched_intervals = self.batch_intervals( - merged_intervals, deployability_index, environment_naming_info, dag=snapshot_dag + merged_intervals, + deployability_index, + environment_naming_info, + dag=snapshot_dag, + is_restatement_plan=is_restatement_plan, ) - self.console.start_evaluation_progress( batched_intervals, environment_naming_info, @@ -970,6 +967,7 @@ def _check_ready_intervals( intervals: Intervals, context: ExecutionContext, environment_naming_info: EnvironmentNamingInfo, + is_restatement_plan: bool = False, ) -> Intervals: """Checks if the intervals are ready for evaluation for the given snapshot. @@ -991,6 +989,17 @@ def _check_ready_intervals( if not (signals and signals.signals_to_kwargs): return intervals + signal_names = signals.signals_to_kwargs.keys() + + if ( + is_restatement_plan + and len(signal_names) == 1 + and next(iter(signal_names)) == "freshness" + ): + # Freshness signal is not checked for restatement plans to allow users + # for an escape hatch in reevaluating models + return intervals + self.console.start_signal_progress( snapshot, self.default_catalog, @@ -998,6 +1007,9 @@ def _check_ready_intervals( ) for signal_idx, (signal_name, kwargs) in enumerate(signals.signals_to_kwargs.items()): + if is_restatement_plan and signal_name == "freshness": + continue + # Capture intervals before signal check for display intervals_to_check = merge_intervals(intervals) @@ -1011,6 +1023,7 @@ def _check_ready_intervals( python_env=signals.python_env, dialect=snapshot.model.dialect, path=snapshot.model._path, + snapshot=snapshot, kwargs=kwargs, ) except SQLMeshError as e: diff --git a/sqlmesh/core/signal.py b/sqlmesh/core/signal.py index d9ee670922..b99b79aa12 100644 --- a/sqlmesh/core/signal.py +++ b/sqlmesh/core/signal.py @@ -1,8 +1,13 @@ from __future__ import annotations - +import typing as t from sqlmesh.utils import UniqueKeyDict, registry_decorator +if t.TYPE_CHECKING: + from sqlmesh.core.context import ExecutionContext + from sqlmesh.core.snapshot.definition import Snapshot + from sqlmesh.utils.date import DatetimeRanges + class signal(registry_decorator): """Specifies a function which intervals are ready from a list of scheduled intervals. @@ -33,3 +38,30 @@ class signal(registry_decorator): SignalRegistry = UniqueKeyDict[str, signal] + + +@signal() +def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionContext) -> bool: + adapter = context.engine_adapter + if not snapshot.last_altered_ts or not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS: + return True + + adapter = context.engine_adapter + parent_snapshots = {context.snapshots[p.name] for p in snapshot.parents} + if len(parent_snapshots) != len(snapshot.node.depends_on) or not all( + p.is_external for p in parent_snapshots + ): + # The mismatch can happen if e.g an external model is not registered in the project + return True + + # Finding new data means that the upstream depedencies have been altered + # since the last time the model was evaluated + upstream_dep_has_new_data = any( + upstream_last_altered_ts > snapshot.last_altered_ts + for upstream_last_altered_ts in adapter.get_external_model_freshness( + [p.name for p in parent_snapshots] + ) + ) + + # Returning true is a no-op, returning False nullifies the batch so the model will not be evaluated. + return upstream_dep_has_new_data diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index db6f5e6257..a8a68f6d5c 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -756,7 +756,9 @@ def hydrate_with_intervals_by_version( for interval in snapshot_intervals: snapshot.merge_intervals(interval) - if interval.last_altered_ts: + # Differentiate last_altered_ts between snapshots with shared version but + # different dev versions e.g prod vs FORWARD_ONLY dev + if snapshot.dev_version == interval.dev_version and interval.last_altered_ts: snapshot.last_altered_ts = max( snapshot.last_altered_ts or -1, interval.last_altered_ts ) @@ -1091,6 +1093,7 @@ def check_ready_intervals( python_env=signals.python_env, dialect=self.model.dialect, path=self.model._path, + snapshot=self, kwargs=kwargs, ) except SQLMeshError as e: @@ -2431,6 +2434,7 @@ def check_ready_intervals( python_env: t.Dict[str, Executable], dialect: DialectType = None, path: t.Optional[Path] = None, + snapshot: t.Optional[Snapshot] = None, kwargs: t.Optional[t.Dict] = None, ) -> Intervals: checked_intervals: Intervals = [] @@ -2446,6 +2450,7 @@ def check_ready_intervals( provided_args=(batch,), provided_kwargs=(kwargs or {}), context=context, + snapshot=snapshot, ) except Exception as ex: raise SignalEvalError(format_evaluated_code_exception(ex, python_env)) diff --git a/sqlmesh/migrations/v0093_add_last_altered_to_intervals.py b/sqlmesh/migrations/v0098_add_last_altered_to_intervals.py similarity index 91% rename from sqlmesh/migrations/v0093_add_last_altered_to_intervals.py rename to sqlmesh/migrations/v0098_add_last_altered_to_intervals.py index 2a1e085b82..dd024d1e6d 100644 --- a/sqlmesh/migrations/v0093_add_last_altered_to_intervals.py +++ b/sqlmesh/migrations/v0098_add_last_altered_to_intervals.py @@ -3,7 +3,7 @@ from sqlglot import exp -def migrate(state_sync, **kwargs): # type: ignore +def migrate_schemas(state_sync, **kwargs): # type: ignore engine_adapter = state_sync.engine_adapter schema = state_sync.schema intervals_table = "_intervals" diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index 26bd33eb65..7e70ef0323 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -10,6 +10,8 @@ from unittest import mock from unittest.mock import patch import logging +from IPython.utils.capture import capture_output + import time_machine from pytest_mock.plugin import MockerFixture @@ -3846,36 +3848,45 @@ def test_external_model_freshness(ctx: TestContext, mocker: MockerFixture, tmp_p if not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS: pytest.skip("This test only runs for engines that support external model freshness") - def _run_plan( - sqlmesh_context: Context, restate_models: t.Optional[t.List[str]] = None - ) -> PlanResults: - plan: Plan = sqlmesh_context.plan( - auto_apply=True, no_prompts=True, restate_models=restate_models + def _assert_snapshot_last_altered_ts(context: Context, snapshot_id: str, timestamp: datetime): + from sqlmesh.utils.date import to_datetime + + snapshot = context.state_sync.get_snapshots([snapshot_id])[snapshot_id] + assert to_datetime(snapshot.last_altered_ts).replace(microsecond=0) == timestamp.replace( + microsecond=0 ) - return PlanResults.create(plan, ctx, schema) import sqlmesh spy = mocker.spy(sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, "evaluate") def _assert_model_evaluation(lambda_func, was_evaluated, day_delta=0): - call_count_before = spy.call_count - logger = logging.getLogger("sqlmesh.core.scheduler") - - with time_machine.travel(now(minute_floor=False) + timedelta(days=day_delta)): - with mock.patch.object(logger, "info") as mock_logger: - lambda_func() - - evaluation_skipped_log = any( - "Skipping evaluation for snapshot" in call[0][0] for call in mock_logger.call_args_list - ) - + spy.reset_mock() + timestamp = now(minute_floor=False) + timedelta(days=day_delta) + with time_machine.travel(timestamp, tick=False): + with capture_output() as output: + plan_or_run_result = lambda_func() + + evaluate_function_called = spy.call_count == 1 + signal_was_checked = "Checking signals for" in output.stdout + restatement_plan = isinstance(plan_or_run_result, Plan) and plan_or_run_result.restatements + if restatement_plan: + # Restatement plans exclude this signal so we expect the actual evaluation + # to happen but not through the signal + assert evaluate_function_called + assert not signal_was_checked + return + + # All other cases (e.g normal plans or runs) will check the freshness signal + assert signal_was_checked if was_evaluated: - assert not evaluation_skipped_log - assert spy.call_count == call_count_before + 1 + assert "All ready" in output.stdout + assert evaluate_function_called else: - assert evaluation_skipped_log - assert spy.call_count == call_count_before + assert "None ready" in output.stdout + assert not evaluate_function_called + + return timestamp, plan_or_run_result # Create & initialize schema schema = ctx.add_test_suffix(TEST_SCHEMA) @@ -3912,7 +3923,10 @@ def _assert_model_evaluation(lambda_func, was_evaluated, day_delta=0): MODEL ( name {model_name}, start '2024-01-01', - kind FULL + kind FULL, + signals ( + freshness(), + ) ); SELECT col1 * col2 AS col FROM {external_table1}, {external_table2}; @@ -3924,23 +3938,47 @@ def _set_config(gateway: str, config: Config) -> None: context = ctx.create_context(path=tmp_path, config_mutator=_set_config) - # Case 1: Model is evaluated on first insertion - _assert_model_evaluation(lambda: _run_plan(context), was_evaluated=True) + # Case 1: Model is evaluated for the first plan + prod_plan_ts, prod_plan = _assert_model_evaluation( + lambda: context.plan(auto_apply=True, no_prompts=True), was_evaluated=True + ) + + prod_snapshot_id = next(iter(prod_plan.context_diff.new_snapshots)) + _assert_snapshot_last_altered_ts(context, prod_snapshot_id, prod_plan_ts) # Case 2: Model is NOT evaluated on run if external models are not fresh - _assert_model_evaluation(lambda: context.run(), was_evaluated=False, day_delta=2) + _assert_model_evaluation(lambda: context.run(), was_evaluated=False, day_delta=1) - # Case 3: Model is evaluated on run if any external model is fresh - adapter.execute(f"INSERT INTO {external_table2} (col2) VALUES (3)", quote_identifiers=False) + # Case 3: Differentiate last_altered_ts between snapshots with shared version + # For instance, creating a FORWARD_ONLY change in dev (reusing the version but creating a dev preview) should not cause + # the prod snapshot's last_altered_ts to be updated when fetched from the state sync + model_path.write_text(model_path.read_text().replace("col1 * col2", "col1 + col2")) + context.load() + dev_plan_ts = now(minute_floor=False) + timedelta(days=2) + with time_machine.travel(dev_plan_ts, tick=False): + dev_plan = context.plan( + environment="dev", forward_only=True, auto_apply=True, no_prompts=True + ) + + context.state_sync.clear_cache() + dev_snapshot_id = next(iter(dev_plan.context_diff.new_snapshots)) + _assert_snapshot_last_altered_ts(context, dev_snapshot_id, dev_plan_ts) + _assert_snapshot_last_altered_ts(context, prod_snapshot_id, prod_plan_ts) + # Case 4: Model is evaluated on run if any external model is fresh + adapter.execute(f"INSERT INTO {external_table2} (col2) VALUES (3)", quote_identifiers=False) _assert_model_evaluation(lambda: context.run(), was_evaluated=True, day_delta=2) - # Case 4: Model is evaluated on a restatement plan even if the external model is not fresh + # Case 5: Model is evaluated if changed (case 3) even if the external model is not fresh + model_path.write_text(model_path.read_text().replace("col1 + col2", "col1 * col2 * 5")) + context.load() _assert_model_evaluation( - lambda: _run_plan(context, restate_models=[model_name]), was_evaluated=True, day_delta=3 + lambda: context.plan(auto_apply=True, no_prompts=True), was_evaluated=True, day_delta=3 ) - # Case 5: Model is evaluated if changed even if the external model is not fresh - model_path.write_text(model_path.read_text().replace("col1 * col2", "col1 + col2")) - context.load() - _assert_model_evaluation(lambda: _run_plan(context), was_evaluated=True, day_delta=2) + # Case 6: Model is evaluated on a restatement plan even if the external model is not fresh + _assert_model_evaluation( + lambda: context.plan(restate_models=[model_name], auto_apply=True, no_prompts=True), + was_evaluated=True, + day_delta=4, + ) From 36fdc8503f47218aa0bd85f1e1d34f74593b927c Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Mon, 15 Sep 2025 14:24:57 +0300 Subject: [PATCH 03/11] Differentiate last_altered_ts from dev_last_altered_ts --- sqlmesh/core/signal.py | 15 ++++++++-- sqlmesh/core/snapshot/definition.py | 30 ++++++++++++++----- sqlmesh/core/state_sync/db/interval.py | 8 ++--- .../integration/test_integration.py | 30 ++++++++++++++----- 4 files changed, 60 insertions(+), 23 deletions(-) diff --git a/sqlmesh/core/signal.py b/sqlmesh/core/signal.py index b99b79aa12..1050653f8a 100644 --- a/sqlmesh/core/signal.py +++ b/sqlmesh/core/signal.py @@ -42,11 +42,20 @@ class signal(registry_decorator): @signal() def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionContext) -> bool: + deployability_index = context.deployability_index adapter = context.engine_adapter - if not snapshot.last_altered_ts or not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS: + + if not deployability_index or not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS: + return True + + last_altered_ts = ( + snapshot.last_altered_ts + if deployability_index.is_deployable(snapshot) + else snapshot.dev_last_altered_ts + ) + if not last_altered_ts: return True - adapter = context.engine_adapter parent_snapshots = {context.snapshots[p.name] for p in snapshot.parents} if len(parent_snapshots) != len(snapshot.node.depends_on) or not all( p.is_external for p in parent_snapshots @@ -57,7 +66,7 @@ def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionConte # Finding new data means that the upstream depedencies have been altered # since the last time the model was evaluated upstream_dep_has_new_data = any( - upstream_last_altered_ts > snapshot.last_altered_ts + upstream_last_altered_ts > last_altered_ts for upstream_last_altered_ts in adapter.get_external_model_freshness( [p.name for p in parent_snapshots] ) diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index a8a68f6d5c..4e4dc7c066 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -186,6 +186,7 @@ class SnapshotIntervals(PydanticModel): dev_intervals: Intervals = [] pending_restatement_intervals: Intervals = [] last_altered_ts: t.Optional[int] = None + dev_last_altered_ts: t.Optional[int] = None @property def snapshot_id(self) -> t.Optional[SnapshotId]: @@ -206,6 +207,12 @@ def add_dev_interval(self, start: int, end: int) -> None: def add_pending_restatement_interval(self, start: int, end: int) -> None: self._add_interval(start, end, "pending_restatement_intervals") + def add_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None: + self._add_last_altered_ts(last_altered_ts, "last_altered_ts") + + def add_dev_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None: + self._add_last_altered_ts(last_altered_ts, "dev_last_altered_ts") + def remove_interval(self, start: int, end: int) -> None: self._remove_interval(start, end, "intervals") @@ -225,6 +232,13 @@ def _add_interval(self, start: int, end: int, interval_attr: str) -> None: target_intervals = merge_intervals([*target_intervals, (start, end)]) setattr(self, interval_attr, target_intervals) + def _add_last_altered_ts( + self, last_altered_ts: t.Optional[int], last_altered_attr: str + ) -> None: + if last_altered_ts: + existing_last_altered_ts = getattr(self, last_altered_attr) + setattr(self, last_altered_attr, max(existing_last_altered_ts or -1, last_altered_ts)) + def _remove_interval(self, start: int, end: int, interval_attr: str) -> None: target_intervals = getattr(self, interval_attr) target_intervals = remove_interval(target_intervals, start, end) @@ -717,6 +731,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin): # Physical table last modified timestamp, not to be confused with the "updated_ts" field # which is for the snapshot record itself last_altered_ts: t.Optional[int] = None + dev_last_altered_ts: t.Optional[int] = None @field_validator("ttl") @classmethod @@ -756,13 +771,6 @@ def hydrate_with_intervals_by_version( for interval in snapshot_intervals: snapshot.merge_intervals(interval) - # Differentiate last_altered_ts between snapshots with shared version but - # different dev versions e.g prod vs FORWARD_ONLY dev - if snapshot.dev_version == interval.dev_version and interval.last_altered_ts: - snapshot.last_altered_ts = max( - snapshot.last_altered_ts or -1, interval.last_altered_ts - ) - result.append(snapshot) return result @@ -969,12 +977,20 @@ def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None: if not apply_effective_from or end <= effective_from_ts: self.add_interval(start, end) + if other.last_altered_ts: + self.last_altered_ts = max(self.last_altered_ts or -1, other.last_altered_ts) + if self.dev_version == other.dev_version: # Merge dev intervals if the dev versions match which would mean # that this and the other snapshot are pointing to the same dev table. for start, end in other.dev_intervals: self.add_interval(start, end, is_dev=True) + if other.dev_last_altered_ts: + self.dev_last_altered_ts = max( + self.dev_last_altered_ts or -1, other.dev_last_altered_ts + ) + self.pending_restatement_intervals = merge_intervals( [*self.pending_restatement_intervals, *other.pending_restatement_intervals] ) diff --git a/sqlmesh/core/state_sync/db/interval.py b/sqlmesh/core/state_sync/db/interval.py index d12e2ad940..ec9a5bc107 100644 --- a/sqlmesh/core/state_sync/db/interval.py +++ b/sqlmesh/core/state_sync/db/interval.py @@ -309,12 +309,6 @@ def _get_snapshot_intervals( identifier=identifier, version=version, dev_version=dev_version, - last_altered_ts=last_altered_ts, - ) - - if last_altered_ts: - intervals[merge_key].last_altered_ts = max( - intervals[merge_key].last_altered_ts or 0, last_altered_ts ) if pending_restatement_interval_merge_key not in intervals: @@ -337,8 +331,10 @@ def _get_snapshot_intervals( else: if is_dev: intervals[merge_key].add_dev_interval(start, end) + intervals[merge_key].add_dev_last_altered_ts(last_altered_ts) else: intervals[merge_key].add_interval(start, end) + intervals[merge_key].add_last_altered_ts(last_altered_ts) # Remove all pending restatement intervals recorded before the current interval has been added intervals[ pending_restatement_interval_merge_key diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index 7e70ef0323..5828ab8b31 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -3848,13 +3848,24 @@ def test_external_model_freshness(ctx: TestContext, mocker: MockerFixture, tmp_p if not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS: pytest.skip("This test only runs for engines that support external model freshness") - def _assert_snapshot_last_altered_ts(context: Context, snapshot_id: str, timestamp: datetime): + def _assert_snapshot_last_altered_ts( + context: Context, + snapshot_id: str, + last_altered_ts: datetime, + dev_last_altered_ts: t.Optional[datetime] = None, + ): from sqlmesh.utils.date import to_datetime snapshot = context.state_sync.get_snapshots([snapshot_id])[snapshot_id] - assert to_datetime(snapshot.last_altered_ts).replace(microsecond=0) == timestamp.replace( + + assert to_datetime(snapshot.last_altered_ts).replace( microsecond=0 - ) + ) == last_altered_ts.replace(microsecond=0) + + if dev_last_altered_ts: + assert to_datetime(snapshot.dev_last_altered_ts).replace( + microsecond=0 + ) == dev_last_altered_ts.replace(microsecond=0) import sqlmesh @@ -3944,14 +3955,14 @@ def _set_config(gateway: str, config: Config) -> None: ) prod_snapshot_id = next(iter(prod_plan.context_diff.new_snapshots)) - _assert_snapshot_last_altered_ts(context, prod_snapshot_id, prod_plan_ts) + _assert_snapshot_last_altered_ts(context, prod_snapshot_id, last_altered_ts=prod_plan_ts) # Case 2: Model is NOT evaluated on run if external models are not fresh _assert_model_evaluation(lambda: context.run(), was_evaluated=False, day_delta=1) # Case 3: Differentiate last_altered_ts between snapshots with shared version # For instance, creating a FORWARD_ONLY change in dev (reusing the version but creating a dev preview) should not cause - # the prod snapshot's last_altered_ts to be updated when fetched from the state sync + # any side effects to the prod snapshot's last_altered_ts hydration model_path.write_text(model_path.read_text().replace("col1 * col2", "col1 + col2")) context.load() dev_plan_ts = now(minute_floor=False) + timedelta(days=2) @@ -3962,8 +3973,13 @@ def _set_config(gateway: str, config: Config) -> None: context.state_sync.clear_cache() dev_snapshot_id = next(iter(dev_plan.context_diff.new_snapshots)) - _assert_snapshot_last_altered_ts(context, dev_snapshot_id, dev_plan_ts) - _assert_snapshot_last_altered_ts(context, prod_snapshot_id, prod_plan_ts) + _assert_snapshot_last_altered_ts( + context, + dev_snapshot_id, + last_altered_ts=prod_plan_ts, + dev_last_altered_ts=dev_plan_ts, + ) + _assert_snapshot_last_altered_ts(context, prod_snapshot_id, last_altered_ts=prod_plan_ts) # Case 4: Model is evaluated on run if any external model is fresh adapter.execute(f"INSERT INTO {external_table2} (col2) VALUES (3)", quote_identifiers=False) From 4ac76da9add34eb789a61a4c5b83492a32cdc5b1 Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Tue, 16 Sep 2025 18:34:55 +0300 Subject: [PATCH 04/11] Use dev_last_altered_ts during add_interval too --- sqlmesh/core/state_sync/base.py | 3 ++- sqlmesh/core/state_sync/db/interval.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sqlmesh/core/state_sync/base.py b/sqlmesh/core/state_sync/base.py index 9dd5f5c8ab..2f8a68dd4a 100644 --- a/sqlmesh/core/state_sync/base.py +++ b/sqlmesh/core/state_sync/base.py @@ -518,7 +518,8 @@ def add_interval( dev_version=snapshot.dev_version, intervals=intervals if not is_dev else [], dev_intervals=intervals if is_dev else [], - last_altered_ts=last_altered_ts, + last_altered_ts=last_altered_ts if not is_dev else None, + dev_last_altered_ts=last_altered_ts if is_dev else None, ) self.add_snapshots_intervals([snapshot_intervals]) diff --git a/sqlmesh/core/state_sync/db/interval.py b/sqlmesh/core/state_sync/db/interval.py index ec9a5bc107..af828382de 100644 --- a/sqlmesh/core/state_sync/db/interval.py +++ b/sqlmesh/core/state_sync/db/interval.py @@ -232,7 +232,7 @@ def _push_snapshot_intervals( end_ts, is_dev=True, is_compacted=is_compacted, - last_altered_ts=snapshot.last_altered_ts, + last_altered_ts=snapshot.dev_last_altered_ts, ) ) From e31cc9f2c14e3dc105db32e201703bc69bec56fb Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Thu, 18 Sep 2025 14:28:25 +0300 Subject: [PATCH 05/11] Fix tests --- sqlmesh/migrations/v0098_add_last_altered_to_intervals.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sqlmesh/migrations/v0098_add_last_altered_to_intervals.py b/sqlmesh/migrations/v0098_add_last_altered_to_intervals.py index dd024d1e6d..1a119a338d 100644 --- a/sqlmesh/migrations/v0098_add_last_altered_to_intervals.py +++ b/sqlmesh/migrations/v0098_add_last_altered_to_intervals.py @@ -21,3 +21,7 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore ], ) engine_adapter.execute(alter_table_exp) + + +def migrate_rows(state_sync, **kwargs): # type: ignore + pass From 07a13a367e5ba98a79094b01e63ab09720f44c67 Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Mon, 22 Sep 2025 19:01:52 +0300 Subject: [PATCH 06/11] Temporarily disable BQ from testing due to hangs --- sqlmesh/core/engine_adapter/bigquery.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index eb31f7bea4..e05211efe4 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -67,7 +67,6 @@ class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin): MAX_TABLE_COMMENT_LENGTH = 1024 MAX_COLUMN_COMMENT_LENGTH = 1024 SUPPORTS_QUERY_EXECUTION_TRACKING = True - SUPPORTS_EXTERNAL_MODEL_FRESHNESS = True SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"] INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.MERGE From 59b0bf7800a67bc0bdcd62eb98e37fee74805019 Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Thu, 25 Sep 2025 12:00:03 +0300 Subject: [PATCH 07/11] PR Feedback 1 --- sqlmesh/core/context.py | 7 +++ sqlmesh/core/engine_adapter/base.py | 4 +- sqlmesh/core/engine_adapter/bigquery.py | 2 +- sqlmesh/core/engine_adapter/snowflake.py | 4 +- sqlmesh/core/scheduler.py | 55 +------------------ sqlmesh/core/signal.py | 7 ++- sqlmesh/core/snapshot/definition.py | 16 +++--- sqlmesh/core/state_sync/db/interval.py | 4 +- .../integration/test_integration.py | 12 +--- 9 files changed, 30 insertions(+), 81 deletions(-) diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index e3feb1e14b..65ceee96ed 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -274,6 +274,7 @@ def __init__( deployability_index: t.Optional[DeployabilityIndex] = None, default_dialect: t.Optional[str] = None, default_catalog: t.Optional[str] = None, + is_restatement_plan: t.Optional[bool] = None, variables: t.Optional[t.Dict[str, t.Any]] = None, blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None, ): @@ -284,6 +285,7 @@ def __init__( self._default_dialect = default_dialect self._variables = variables or {} self._blueprint_variables = blueprint_variables or {} + self._is_restatement_plan = is_restatement_plan @property def default_dialect(self) -> t.Optional[str]: @@ -308,6 +310,10 @@ def gateway(self) -> t.Optional[str]: """Returns the gateway name.""" return self.var(c.GATEWAY) + @property + def is_restatement_plan(self) -> t.Optional[bool]: + return self._is_restatement_plan + def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: """Returns a variable value.""" return self._variables.get(var_name.lower(), default) @@ -328,6 +334,7 @@ def with_variables( self.deployability_index, self._default_dialect, self._default_catalog, + self._is_restatement_plan, variables=variables, blueprint_variables=blueprint_variables, ) diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 135e7e49cc..68c6404081 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -119,7 +119,7 @@ class EngineAdapter: MAX_IDENTIFIER_LENGTH: t.Optional[int] = None ATTACH_CORRELATION_ID = True SUPPORTS_QUERY_EXECUTION_TRACKING = False - SUPPORTS_EXTERNAL_MODEL_FRESHNESS = False + SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS = False def __init__( self, @@ -2928,7 +2928,7 @@ def _check_identifier_length(self, expression: exp.Expression) -> None: f"Identifier name '{name}' (length {name_length}) exceeds {self.dialect.capitalize()}'s max identifier limit of {self.MAX_IDENTIFIER_LENGTH} characters" ) - def get_external_model_freshness(self, table_names: t.List[TableName]) -> t.List[int]: + def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]: raise NotImplementedError() diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index e05211efe4..26abad9ebc 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -755,7 +755,7 @@ def table_exists(self, table_name: TableName) -> bool: except NotFound: return False - def get_external_model_freshness(self, table_names: t.List[TableName]) -> t.List[int]: + def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]: from sqlmesh.utils.date import to_timestamp datasets_to_tables: t.DefaultDict[str, t.List[str]] = defaultdict(list) diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index 19a5a28e82..1554589779 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -54,7 +54,7 @@ class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixi SUPPORTS_MANAGED_MODELS = True CURRENT_CATALOG_EXPRESSION = exp.func("current_database") SUPPORTS_CREATE_DROP_CATALOG = True - SUPPORTS_EXTERNAL_MODEL_FRESHNESS = True + SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS = True SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA", "TABLE"] SCHEMA_DIFFER_KWARGS = { "parameterized_type_defaults": { @@ -671,7 +671,7 @@ def close(self) -> t.Any: return super().close() - def get_external_model_freshness(self, table_names: t.List[TableName]) -> t.List[int]: + def get_table_last_modified_ts(self, table_names: t.List[TableName]) -> t.List[int]: from sqlmesh.utils.date import to_timestamp num_tables = len(table_names) diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 5f0b111a12..8ee98bd330 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -54,8 +54,6 @@ if t.TYPE_CHECKING: from sqlmesh.core.context import ExecutionContext - from sqlmesh.core._typing import TableName - from sqlmesh.core.engine_adapter import EngineAdapter logger = logging.getLogger(__name__) SnapshotToIntervals = t.Dict[Snapshot, Intervals] @@ -190,46 +188,6 @@ def merged_missing_intervals( } return snapshots_to_intervals - def can_skip_evaluation(self, snapshot: Snapshot, snapshots: t.Dict[str, Snapshot]) -> bool: - if not snapshot.last_altered_ts: - return False - - from collections import defaultdict - - parent_snapshots = {p for p in snapshots.values() if p.name != snapshot.name} - if len(parent_snapshots) != len(snapshot.node.depends_on): - # The mismatch can happen if e.g an external model is not registered in the project - return False - - adapter_to_parent_snapshots: t.Dict[EngineAdapter, t.List[Snapshot]] = defaultdict(list) - - for parent_snapshot in parent_snapshots: - if not parent_snapshot.is_external: - return False - - adapter = self.snapshot_evaluator.get_adapter(parent_snapshot.model_gateway) - if not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS: - return False - - adapter_to_parent_snapshots[adapter].append(parent_snapshot) - - if not adapter_to_parent_snapshots: - return False - - external_models_freshness: t.List[int] = [] - - for adapter, adapter_snapshots in adapter_to_parent_snapshots.items(): - table_names: t.List[TableName] = [ - exp.to_table(parent_snapshot.name, parent_snapshot.node.dialect) - for parent_snapshot in adapter_snapshots - ] - external_models_freshness.extend(adapter.get_external_model_freshness(table_names)) - - return all( - snapshot.last_altered_ts > external_model_freshness - for external_model_freshness in external_models_freshness - ) - def evaluate( self, snapshot: Snapshot, @@ -413,6 +371,7 @@ def batch_intervals( deployability_index, default_dialect=adapter.dialect, default_catalog=self.default_catalog, + is_restatement_plan=is_restatement_plan, ) intervals = self._check_ready_intervals( @@ -991,15 +950,6 @@ def _check_ready_intervals( signal_names = signals.signals_to_kwargs.keys() - if ( - is_restatement_plan - and len(signal_names) == 1 - and next(iter(signal_names)) == "freshness" - ): - # Freshness signal is not checked for restatement plans to allow users - # for an escape hatch in reevaluating models - return intervals - self.console.start_signal_progress( snapshot, self.default_catalog, @@ -1007,9 +957,6 @@ def _check_ready_intervals( ) for signal_idx, (signal_name, kwargs) in enumerate(signals.signals_to_kwargs.items()): - if is_restatement_plan and signal_name == "freshness": - continue - # Capture intervals before signal check for display intervals_to_check = merge_intervals(intervals) diff --git a/sqlmesh/core/signal.py b/sqlmesh/core/signal.py index 1050653f8a..8c9538c1a3 100644 --- a/sqlmesh/core/signal.py +++ b/sqlmesh/core/signal.py @@ -42,10 +42,13 @@ class signal(registry_decorator): @signal() def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionContext) -> bool: + if context.is_restatement_plan: + return True + deployability_index = context.deployability_index adapter = context.engine_adapter - if not deployability_index or not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS: + if not deployability_index or not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS: return True last_altered_ts = ( @@ -67,7 +70,7 @@ def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionConte # since the last time the model was evaluated upstream_dep_has_new_data = any( upstream_last_altered_ts > last_altered_ts - for upstream_last_altered_ts in adapter.get_external_model_freshness( + for upstream_last_altered_ts in adapter.get_table_last_modified_ts( [p.name for p in parent_snapshots] ) ) diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 4e4dc7c066..23ab0b21db 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -207,11 +207,11 @@ def add_dev_interval(self, start: int, end: int) -> None: def add_pending_restatement_interval(self, start: int, end: int) -> None: self._add_interval(start, end, "pending_restatement_intervals") - def add_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None: - self._add_last_altered_ts(last_altered_ts, "last_altered_ts") + def update_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None: + self._update_last_altered_ts(last_altered_ts, "last_altered_ts") - def add_dev_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None: - self._add_last_altered_ts(last_altered_ts, "dev_last_altered_ts") + def update_dev_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None: + self._update_last_altered_ts(last_altered_ts, "dev_last_altered_ts") def remove_interval(self, start: int, end: int) -> None: self._remove_interval(start, end, "intervals") @@ -232,12 +232,12 @@ def _add_interval(self, start: int, end: int, interval_attr: str) -> None: target_intervals = merge_intervals([*target_intervals, (start, end)]) setattr(self, interval_attr, target_intervals) - def _add_last_altered_ts( + def _update_last_altered_ts( self, last_altered_ts: t.Optional[int], last_altered_attr: str ) -> None: if last_altered_ts: existing_last_altered_ts = getattr(self, last_altered_attr) - setattr(self, last_altered_attr, max(existing_last_altered_ts or -1, last_altered_ts)) + setattr(self, last_altered_attr, max(existing_last_altered_ts or 0, last_altered_ts)) def _remove_interval(self, start: int, end: int, interval_attr: str) -> None: target_intervals = getattr(self, interval_attr) @@ -978,7 +978,7 @@ def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None: self.add_interval(start, end) if other.last_altered_ts: - self.last_altered_ts = max(self.last_altered_ts or -1, other.last_altered_ts) + self.last_altered_ts = max(self.last_altered_ts or 0, other.last_altered_ts) if self.dev_version == other.dev_version: # Merge dev intervals if the dev versions match which would mean @@ -988,7 +988,7 @@ def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None: if other.dev_last_altered_ts: self.dev_last_altered_ts = max( - self.dev_last_altered_ts or -1, other.dev_last_altered_ts + self.dev_last_altered_ts or 0, other.dev_last_altered_ts ) self.pending_restatement_intervals = merge_intervals( diff --git a/sqlmesh/core/state_sync/db/interval.py b/sqlmesh/core/state_sync/db/interval.py index af828382de..8ccdc58fa0 100644 --- a/sqlmesh/core/state_sync/db/interval.py +++ b/sqlmesh/core/state_sync/db/interval.py @@ -331,10 +331,10 @@ def _get_snapshot_intervals( else: if is_dev: intervals[merge_key].add_dev_interval(start, end) - intervals[merge_key].add_dev_last_altered_ts(last_altered_ts) + intervals[merge_key].update_dev_last_altered_ts(last_altered_ts) else: intervals[merge_key].add_interval(start, end) - intervals[merge_key].add_last_altered_ts(last_altered_ts) + intervals[merge_key].update_last_altered_ts(last_altered_ts) # Remove all pending restatement intervals recorded before the current interval has been added intervals[ pending_restatement_interval_merge_key diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index 5828ab8b31..5190d26e98 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -3845,8 +3845,8 @@ def _assert_mview_value(value: int): @use_terminal_console def test_external_model_freshness(ctx: TestContext, mocker: MockerFixture, tmp_path: pathlib.Path): adapter = ctx.engine_adapter - if not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS: - pytest.skip("This test only runs for engines that support external model freshness") + if not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS: + pytest.skip("This test only runs for engines that support metadata-based freshness") def _assert_snapshot_last_altered_ts( context: Context, @@ -3880,15 +3880,7 @@ def _assert_model_evaluation(lambda_func, was_evaluated, day_delta=0): evaluate_function_called = spy.call_count == 1 signal_was_checked = "Checking signals for" in output.stdout - restatement_plan = isinstance(plan_or_run_result, Plan) and plan_or_run_result.restatements - if restatement_plan: - # Restatement plans exclude this signal so we expect the actual evaluation - # to happen but not through the signal - assert evaluate_function_called - assert not signal_was_checked - return - # All other cases (e.g normal plans or runs) will check the freshness signal assert signal_was_checked if was_evaluated: assert "All ready" in output.stdout From 1958d17dbb10d924211d7f7e43c1a32535f1cb90 Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Thu, 25 Sep 2025 20:24:07 +0300 Subject: [PATCH 08/11] PR Feedback 2 --- sqlmesh/core/context.py | 10 +++++----- sqlmesh/core/plan/evaluator.py | 2 +- sqlmesh/core/scheduler.py | 14 ++++---------- sqlmesh/core/signal.py | 10 ++++------ 4 files changed, 14 insertions(+), 22 deletions(-) diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 65ceee96ed..e31a04fe81 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -274,7 +274,7 @@ def __init__( deployability_index: t.Optional[DeployabilityIndex] = None, default_dialect: t.Optional[str] = None, default_catalog: t.Optional[str] = None, - is_restatement_plan: t.Optional[bool] = None, + is_restatement: t.Optional[bool] = None, variables: t.Optional[t.Dict[str, t.Any]] = None, blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None, ): @@ -285,7 +285,7 @@ def __init__( self._default_dialect = default_dialect self._variables = variables or {} self._blueprint_variables = blueprint_variables or {} - self._is_restatement_plan = is_restatement_plan + self._is_restatement = is_restatement @property def default_dialect(self) -> t.Optional[str]: @@ -311,8 +311,8 @@ def gateway(self) -> t.Optional[str]: return self.var(c.GATEWAY) @property - def is_restatement_plan(self) -> t.Optional[bool]: - return self._is_restatement_plan + def is_restatement(self) -> t.Optional[bool]: + return self._is_restatement def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: """Returns a variable value.""" @@ -334,7 +334,7 @@ def with_variables( self.deployability_index, self._default_dialect, self._default_catalog, - self._is_restatement_plan, + self._is_restatement, variables=variables, blueprint_variables=blueprint_variables, ) diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index d5c766c4d8..f2f432a97e 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -258,7 +258,7 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla allow_additive_snapshots=plan.allow_additive_models, selected_snapshot_ids=stage.selected_snapshot_ids, selected_models=plan.selected_models, - is_restatement_plan=bool(plan.restatements), + is_restatement=bool(plan.restatements), ) if errors: raise PlanError("Plan application failed.") diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 8ee98bd330..7e27205fc6 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -200,7 +200,6 @@ def evaluate( allow_destructive_snapshots: t.Optional[t.Set[str]] = None, allow_additive_snapshots: t.Optional[t.Set[str]] = None, target_table_exists: t.Optional[bool] = None, - is_restatement_plan: bool = False, **kwargs: t.Any, ) -> t.List[AuditResult]: """Evaluate a snapshot and add the processed interval to the state sync. @@ -338,7 +337,7 @@ def batch_intervals( deployability_index: t.Optional[DeployabilityIndex], environment_naming_info: EnvironmentNamingInfo, dag: t.Optional[DAG[SnapshotId]] = None, - is_restatement_plan: bool = False, + is_restatement: bool = False, ) -> t.Dict[Snapshot, Intervals]: dag = dag or snapshots_to_dag(merged_intervals) @@ -371,7 +370,7 @@ def batch_intervals( deployability_index, default_dialect=adapter.dialect, default_catalog=self.default_catalog, - is_restatement_plan=is_restatement_plan, + is_restatement=is_restatement, ) intervals = self._check_ready_intervals( @@ -379,7 +378,6 @@ def batch_intervals( intervals, context, environment_naming_info, - is_restatement_plan=is_restatement_plan, ) unready -= set(intervals) @@ -428,7 +426,7 @@ def run_merged_intervals( run_environment_statements: bool = False, audit_only: bool = False, auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}, - is_restatement_plan: bool = False, + is_restatement: bool = False, ) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]: """Runs precomputed batches of missing intervals. @@ -466,7 +464,7 @@ def run_merged_intervals( deployability_index, environment_naming_info, dag=snapshot_dag, - is_restatement_plan=is_restatement_plan, + is_restatement=is_restatement, ) self.console.start_evaluation_progress( batched_intervals, @@ -552,7 +550,6 @@ def run_node(node: SchedulingUnit) -> None: allow_additive_snapshots=allow_additive_snapshots, target_table_exists=snapshot.snapshot_id not in snapshots_to_create, selected_models=selected_models, - is_restatement_plan=is_restatement_plan, ) evaluation_duration_ms = now_timestamp() - execution_start_ts @@ -926,7 +923,6 @@ def _check_ready_intervals( intervals: Intervals, context: ExecutionContext, environment_naming_info: EnvironmentNamingInfo, - is_restatement_plan: bool = False, ) -> Intervals: """Checks if the intervals are ready for evaluation for the given snapshot. @@ -948,8 +944,6 @@ def _check_ready_intervals( if not (signals and signals.signals_to_kwargs): return intervals - signal_names = signals.signals_to_kwargs.keys() - self.console.start_signal_progress( snapshot, self.default_catalog, diff --git a/sqlmesh/core/signal.py b/sqlmesh/core/signal.py index 8c9538c1a3..52e6c59c8d 100644 --- a/sqlmesh/core/signal.py +++ b/sqlmesh/core/signal.py @@ -7,6 +7,7 @@ from sqlmesh.core.context import ExecutionContext from sqlmesh.core.snapshot.definition import Snapshot from sqlmesh.utils.date import DatetimeRanges + from sqlmesh.core.snapshot.definition import DeployabilityIndex class signal(registry_decorator): @@ -42,15 +43,12 @@ class signal(registry_decorator): @signal() def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionContext) -> bool: - if context.is_restatement_plan: - return True - - deployability_index = context.deployability_index adapter = context.engine_adapter - - if not deployability_index or not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS: + if context.is_restatement or not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS: return True + deployability_index = context.deployability_index or DeployabilityIndex.all_deployable() + last_altered_ts = ( snapshot.last_altered_ts if deployability_index.is_deployable(snapshot) From e05d5b457abbecc2649bcdb9835d997279d2290d Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Thu, 25 Sep 2025 20:44:51 +0300 Subject: [PATCH 09/11] Bump migration script --- ...red_to_intervals.py => v0099_add_last_altered_to_intervals.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename sqlmesh/migrations/{v0098_add_last_altered_to_intervals.py => v0099_add_last_altered_to_intervals.py} (100%) diff --git a/sqlmesh/migrations/v0098_add_last_altered_to_intervals.py b/sqlmesh/migrations/v0099_add_last_altered_to_intervals.py similarity index 100% rename from sqlmesh/migrations/v0098_add_last_altered_to_intervals.py rename to sqlmesh/migrations/v0099_add_last_altered_to_intervals.py From d8ecf37874f644313005ee17f130ae4e56c4971c Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Mon, 29 Sep 2025 11:30:41 +0300 Subject: [PATCH 10/11] Revert CI --- .circleci/continue_config.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index 27bc5cf6b0..c549c0ae78 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -313,10 +313,10 @@ workflows: - athena - fabric - gcp-postgres - # filters: - # branches: - # only: - # - main + filters: + branches: + only: + - main - ui_style - ui_test - vscode_test From 0080c00cb6839bb5c0e8925f84127a7c2a80ffb9 Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Mon, 29 Sep 2025 15:15:36 +0300 Subject: [PATCH 11/11] Add arg to EngineAdapterStateSync --- sqlmesh/core/state_sync/db/facade.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/state_sync/db/facade.py b/sqlmesh/core/state_sync/db/facade.py index 29fc9f1740..3c23ef339c 100644 --- a/sqlmesh/core/state_sync/db/facade.py +++ b/sqlmesh/core/state_sync/db/facade.py @@ -381,8 +381,9 @@ def add_interval( start: TimeLike, end: TimeLike, is_dev: bool = False, + last_altered_ts: t.Optional[int] = None, ) -> None: - super().add_interval(snapshot, start, end, is_dev) + super().add_interval(snapshot, start, end, is_dev, last_altered_ts) @transactional() def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None: