Skip to content

Commit f9ffb34

Browse files
committed
Fix: Set the 'unrestorable' flag even when the snapshot has already been paused (#2205)
1 parent 79eda6e commit f9ffb34

File tree

2 files changed

+78
-2
lines changed

2 files changed

+78
-2
lines changed

sqlmesh/core/state_sync/common.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,23 @@ def unpause_snapshots(
240240
[(snapshot, snapshot.get_removal_interval(effective_from_ts, current_ts))]
241241
)
242242

243+
update_required = False
244+
243245
if snapshot.unpaused_ts:
244246
logger.info("Pausing snapshot %s", snapshot.snapshot_id)
245247
snapshot.set_unpaused_ts(None)
246-
if not snapshot.is_forward_only and target_snapshot.is_forward_only:
247-
snapshot.unrestorable = True
248+
update_required = True
249+
250+
if (
251+
not snapshot.is_forward_only
252+
and target_snapshot.is_forward_only
253+
and not snapshot.unrestorable
254+
):
255+
logger.info("Marking snapshot %s as unrestorable", snapshot.snapshot_id)
256+
snapshot.unrestorable = True
257+
update_required = True
258+
259+
if update_required:
248260
self._update_snapshot(snapshot)
249261

250262
def _ensure_no_gaps(

tests/core/test_state_sync.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,6 +1173,70 @@ def test_unpause_snapshots(state_sync: EngineAdapterStateSync, make_snapshot: t.
11731173
assert not actual_snapshots[new_snapshot.snapshot_id].unrestorable
11741174

11751175

1176+
def test_unrestorable_snapshot(state_sync: EngineAdapterStateSync, make_snapshot: t.Callable):
1177+
snapshot = make_snapshot(
1178+
SqlModel(
1179+
name="test_snapshot",
1180+
query=parse_one("select 1, ds"),
1181+
cron="@daily",
1182+
),
1183+
)
1184+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
1185+
snapshot.version = "a"
1186+
1187+
assert not snapshot.unpaused_ts
1188+
state_sync.push_snapshots([snapshot])
1189+
1190+
unpaused_dt = "2022-01-01"
1191+
state_sync.unpause_snapshots([snapshot], unpaused_dt)
1192+
1193+
actual_snapshot = state_sync.get_snapshots([snapshot])[snapshot.snapshot_id]
1194+
assert actual_snapshot.unpaused_ts
1195+
assert actual_snapshot.unpaused_ts == to_timestamp(unpaused_dt)
1196+
1197+
new_indirect_non_breaking_snapshot = make_snapshot(
1198+
SqlModel(name="test_snapshot", query=parse_one("select 2, ds"), cron="@daily")
1199+
)
1200+
new_indirect_non_breaking_snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING)
1201+
new_indirect_non_breaking_snapshot.version = "a"
1202+
1203+
assert not new_indirect_non_breaking_snapshot.unpaused_ts
1204+
state_sync.push_snapshots([new_indirect_non_breaking_snapshot])
1205+
state_sync.unpause_snapshots([new_indirect_non_breaking_snapshot], unpaused_dt)
1206+
1207+
actual_snapshots = state_sync.get_snapshots([snapshot, new_indirect_non_breaking_snapshot])
1208+
assert not actual_snapshots[snapshot.snapshot_id].unpaused_ts
1209+
assert actual_snapshots[
1210+
new_indirect_non_breaking_snapshot.snapshot_id
1211+
].unpaused_ts == to_timestamp(unpaused_dt)
1212+
1213+
assert not actual_snapshots[snapshot.snapshot_id].unrestorable
1214+
assert not actual_snapshots[new_indirect_non_breaking_snapshot.snapshot_id].unrestorable
1215+
1216+
new_forward_only_snapshot = make_snapshot(
1217+
SqlModel(name="test_snapshot", query=parse_one("select 3, ds"), cron="@daily")
1218+
)
1219+
new_forward_only_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
1220+
new_forward_only_snapshot.version = "a"
1221+
1222+
assert not new_forward_only_snapshot.unpaused_ts
1223+
state_sync.push_snapshots([new_forward_only_snapshot])
1224+
state_sync.unpause_snapshots([new_forward_only_snapshot], unpaused_dt)
1225+
1226+
actual_snapshots = state_sync.get_snapshots(
1227+
[snapshot, new_indirect_non_breaking_snapshot, new_forward_only_snapshot]
1228+
)
1229+
assert not actual_snapshots[snapshot.snapshot_id].unpaused_ts
1230+
assert not actual_snapshots[new_indirect_non_breaking_snapshot.snapshot_id].unpaused_ts
1231+
assert actual_snapshots[new_forward_only_snapshot.snapshot_id].unpaused_ts == to_timestamp(
1232+
unpaused_dt
1233+
)
1234+
1235+
assert actual_snapshots[snapshot.snapshot_id].unrestorable
1236+
assert actual_snapshots[new_indirect_non_breaking_snapshot.snapshot_id].unrestorable
1237+
assert not actual_snapshots[new_forward_only_snapshot.snapshot_id].unrestorable
1238+
1239+
11761240
def test_unpause_snapshots_remove_intervals(
11771241
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
11781242
):

0 commit comments

Comments
 (0)