Skip to content

Commit d4c10e1

Browse files
committed
Fix: Exclude pending restatement intervals when fetching max interval end for a snapshot (#3661)
1 parent 5d1f952 commit d4c10e1

File tree

2 files changed

+62
-1
lines changed

2 files changed

+62
-1
lines changed

sqlmesh/core/state_sync/engine_adapter.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1156,7 +1156,11 @@ def max_interval_end_per_model(
11561156
.from_(exp.to_table(self.intervals_table).as_(table_alias))
11571157
.where(where, copy=False)
11581158
.where(
1159-
exp.and_(exp.to_column("is_dev").not_(), exp.to_column("is_removed").not_()),
1159+
exp.and_(
1160+
exp.to_column("is_dev").not_(),
1161+
exp.to_column("is_removed").not_(),
1162+
exp.to_column("is_pending_restatement").not_(),
1163+
),
11601164
copy=False,
11611165
)
11621166
.group_by(name_col, version_col, copy=False)

tests/core/test_state_sync.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2129,6 +2129,63 @@ def test_max_interval_end_per_model(
21292129
assert state_sync.max_interval_end_per_model(environment_name, set()) == {}
21302130

21312131

2132+
def test_max_interval_end_per_model_with_pending_restatements(
2133+
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
2134+
) -> None:
2135+
snapshot = make_snapshot(
2136+
SqlModel(
2137+
name="a",
2138+
cron="@daily",
2139+
query=parse_one("select 1, ds"),
2140+
),
2141+
)
2142+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
2143+
2144+
state_sync.push_snapshots([snapshot])
2145+
2146+
state_sync.add_interval(snapshot, "2023-01-01", "2023-01-01")
2147+
state_sync.add_interval(snapshot, "2023-01-02", "2023-01-02")
2148+
state_sync.add_interval(snapshot, "2023-01-03", "2023-01-03")
2149+
# Add a pending restatement interval
2150+
state_sync.add_snapshots_intervals(
2151+
[
2152+
SnapshotIntervals(
2153+
name=snapshot.name,
2154+
identifier=snapshot.identifier,
2155+
version=snapshot.version,
2156+
intervals=[],
2157+
dev_intervals=[],
2158+
pending_restatement_intervals=[
2159+
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05"))
2160+
],
2161+
)
2162+
]
2163+
)
2164+
2165+
snapshot = state_sync.get_snapshots([snapshot.snapshot_id])[snapshot.snapshot_id]
2166+
assert snapshot.intervals == [(to_timestamp("2023-01-01"), to_timestamp("2023-01-04"))]
2167+
assert snapshot.pending_restatement_intervals == [
2168+
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05"))
2169+
]
2170+
2171+
environment_name = "test_max_interval_end_for_environment"
2172+
2173+
state_sync.promote(
2174+
Environment(
2175+
name=environment_name,
2176+
snapshots=[snapshot.table_info],
2177+
start_at="2023-01-01",
2178+
end_at="2023-01-03",
2179+
plan_id="test_plan_id",
2180+
previous_finalized_snapshots=[],
2181+
)
2182+
)
2183+
2184+
assert state_sync.max_interval_end_per_model(environment_name) == {
2185+
snapshot.name: to_timestamp("2023-01-04")
2186+
}
2187+
2188+
21322189
def test_max_interval_end_per_model_ensure_finalized_snapshots(
21332190
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
21342191
) -> None:

0 commit comments

Comments
 (0)