Skip to content

Commit 45c6d61

Browse files
committed
Fix: Include unexpired downstream views when cleaning up expired tables
1 parent eb4c0b4 commit 45c6d61

File tree

3 files changed

+297
-17
lines changed

3 files changed

+297
-17
lines changed

sqlmesh/core/state_sync/db/snapshot.py

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
SnapshotId,
3131
SnapshotFingerprint,
3232
SnapshotChangeCategory,
33+
snapshots_to_dag,
3334
)
3435
from sqlmesh.utils.migration import index_text_type, blob_text_type
3536
from sqlmesh.utils.date import now_timestamp, TimeLike, now, to_timestamp
@@ -224,18 +225,51 @@ def _get_expired_snapshots(
224225

225226
if not ignore_ttl:
226227
expired_query = expired_query.where(
227-
(exp.column("updated_ts") + exp.column("ttl_ms")) <= current_ts
228+
((exp.column("updated_ts") + exp.column("ttl_ms")) <= current_ts)
229+
# we need to include views even if they haven't expired in case one depends on a table that *has* expired
230+
.or_(exp.column("kind_name").eq(ModelKindName.VIEW))
228231
)
229232

230-
expired_candidates = {
233+
candidates = {
231234
SnapshotId(name=name, identifier=identifier): SnapshotNameVersion(
232235
name=name, version=version
233236
)
234237
for name, identifier, version in fetchall(self.engine_adapter, expired_query)
235238
}
236-
if not expired_candidates:
239+
240+
if not candidates:
237241
return set(), []
238242

243+
expired_candidates: t.Dict[SnapshotId, SnapshotNameVersion] = {}
244+
245+
if ignore_ttl:
246+
expired_candidates = candidates
247+
else:
248+
# Fetch full snapshots because we need to build a dependency tree
249+
full_candidates = self.get_snapshots(candidates.keys())
250+
251+
# Build DAG so we can check if any views that are not expired depend on a table snapshot that is expired
252+
dag = snapshots_to_dag(full_candidates.values())
253+
254+
# remove any non-expired views that don't depend on expired tables
255+
for snapshot_id in dag.reversed:
256+
snapshot = full_candidates.get(snapshot_id, None)
257+
if not snapshot:
258+
continue
259+
260+
if snapshot.expiration_ts <= current_ts:
261+
# All expired snapshots should be included
262+
expired_candidates[snapshot.snapshot_id] = snapshot.name_version
263+
elif snapshot.model_kind_name == ModelKindName.VIEW:
264+
# With non-expired views, check if they have any expired upstream tables
265+
if any(
266+
full_candidates[parent_id].expiration_ts <= current_ts
267+
for parent_id in dag.upstream(snapshot_id)
268+
if full_candidates[parent_id].model_kind_name != ModelKindName.VIEW
269+
):
270+
# an upstream table has expired, therefore this view is no longer valid and needs to be cleaned up as well
271+
expired_candidates[snapshot.snapshot_id] = snapshot.name_version
272+
239273
promoted_snapshot_ids = {
240274
snapshot.snapshot_id
241275
for environment in environments
@@ -253,37 +287,39 @@ def _is_snapshot_used(snapshot: SharedVersionSnapshot) -> bool:
253287
unique_expired_versions, batch_size=self.SNAPSHOT_BATCH_SIZE
254288
)
255289
cleanup_targets = []
256-
expired_snapshot_ids = set()
290+
expired_sv_snapshot_ids = set()
257291
for versions_batch in version_batches:
258-
snapshots = self._get_snapshots_with_same_version(versions_batch)
292+
sv_snapshots = self._get_snapshots_with_same_version(versions_batch)
259293

260294
snapshots_by_version = defaultdict(set)
261295
snapshots_by_dev_version = defaultdict(set)
262-
for s in snapshots:
296+
for s in sv_snapshots:
263297
snapshots_by_version[(s.name, s.version)].add(s.snapshot_id)
264298
snapshots_by_dev_version[(s.name, s.dev_version)].add(s.snapshot_id)
265299

266-
expired_snapshots = [s for s in snapshots if not _is_snapshot_used(s)]
267-
expired_snapshot_ids.update([s.snapshot_id for s in expired_snapshots])
300+
expired_sv_snapshots = [s for s in sv_snapshots if not _is_snapshot_used(s)]
301+
expired_sv_snapshot_ids.update([s.snapshot_id for s in expired_sv_snapshots])
268302

269-
for snapshot in expired_snapshots:
270-
shared_version_snapshots = snapshots_by_version[(snapshot.name, snapshot.version)]
271-
shared_version_snapshots.discard(snapshot.snapshot_id)
303+
for sv_snapshot in expired_sv_snapshots:
304+
shared_version_snapshots = snapshots_by_version[
305+
(sv_snapshot.name, sv_snapshot.version)
306+
]
307+
shared_version_snapshots.discard(sv_snapshot.snapshot_id)
272308

273309
shared_dev_version_snapshots = snapshots_by_dev_version[
274-
(snapshot.name, snapshot.dev_version)
310+
(sv_snapshot.name, sv_snapshot.dev_version)
275311
]
276-
shared_dev_version_snapshots.discard(snapshot.snapshot_id)
312+
shared_dev_version_snapshots.discard(sv_snapshot.snapshot_id)
277313

278314
if not shared_dev_version_snapshots:
279315
cleanup_targets.append(
280316
SnapshotTableCleanupTask(
281-
snapshot=snapshot.full_snapshot.table_info,
317+
snapshot=sv_snapshot.full_snapshot.table_info,
282318
dev_table_only=bool(shared_version_snapshots),
283319
)
284320
)
285321

286-
return expired_snapshot_ids, cleanup_targets
322+
return expired_sv_snapshot_ids, cleanup_targets
287323

288324
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
289325
"""Deletes snapshots.

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 166 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import pandas as pd # noqa: TID253
1414
import pytest
1515
import pytz
16+
import time_machine
1617
from sqlglot import exp, parse_one
1718
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
1819

@@ -28,8 +29,8 @@
2829
from sqlmesh.core.model.definition import create_sql_model
2930
from sqlmesh.core.plan import Plan
3031
from sqlmesh.core.state_sync.db import EngineAdapterStateSync
31-
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory
32-
from sqlmesh.utils.date import now, to_date, to_time_column
32+
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory, SnapshotId
33+
from sqlmesh.utils.date import now, to_date, to_time_column, to_ds
3334
from sqlmesh.core.table_diff import TableDiff
3435
from sqlmesh.utils.errors import SQLMeshError
3536
from sqlmesh.utils.pydantic import PydanticModel
@@ -2672,3 +2673,166 @@ def test_identifier_length_limit(ctx: TestContext):
26722673
match=re.escape(match),
26732674
):
26742675
adapter.create_table(long_table_name, {"col": exp.DataType.build("int")})
2676+
2677+
2678+
def test_janitor_out_of_order_drop(ctx: TestContext, tmp_path: pathlib.Path):
2679+
"""
2680+
Scenario:
2681+
2682+
Ensure that cleaning up expired table snapshots also cleans up any unexpired view snapshots that depend on them
2683+
2684+
- We create a A (table) <- B (view)
2685+
- In dev, we modify A - triggers new version of A and a dev preview of B that both expire in 7 days
2686+
- We advance time by 3 days
2687+
- In dev, we modify B - triggers a new version of B that depends on A but expires 3 days after A
2688+
- We advance time by 5 days so that A has reached its expiry but B has not
2689+
- We expire dev so that none of these snapshots are promoted and are thus targets for cleanup
2690+
- We run the janitor
2691+
2692+
Expected outcome:
2693+
- All the dev versions of A and B should be dropped
2694+
- We should not get an 'ERROR: cannot drop table x because other objects depend on it' on engines that do schema binding
2695+
"""
2696+
2697+
models_dir = tmp_path / "models"
2698+
models_dir.mkdir()
2699+
schema = exp.to_table(ctx.schema(TEST_SCHEMA)).this
2700+
2701+
(models_dir / "model_a.sql").write_text(f"""
2702+
MODEL (
2703+
name {schema}.model_a,
2704+
kind FULL
2705+
);
2706+
2707+
SELECT 1 as a, 2 as b;
2708+
""")
2709+
2710+
(models_dir / "model_b.sql").write_text(f"""
2711+
MODEL (
2712+
name {schema}.model_b,
2713+
kind VIEW
2714+
);
2715+
2716+
SELECT a from {schema}.model_a;
2717+
""")
2718+
2719+
def _mutate_config(gateway: str, config: Config):
2720+
# helps with debugging when you can see the SQLMesh state right next to the model tables
2721+
config.gateways[gateway].state_schema = f"{schema}"
2722+
2723+
with time_machine.travel("2020-01-01 00:00:00"):
2724+
sqlmesh = ctx.create_context(
2725+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
2726+
)
2727+
sqlmesh.plan(auto_apply=True)
2728+
2729+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
2730+
# expiry is last updated + ttl
2731+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
2732+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-01"
2733+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-08"
2734+
2735+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
2736+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
2737+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-01"
2738+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-08"
2739+
2740+
model_a_prod_snapshot = model_a_snapshot
2741+
model_b_prod_snapshot = model_b_snapshot
2742+
2743+
# move forward 1 day
2744+
# new dev environment - touch models to create new snapshots
2745+
# model a / b expiry in prod should remain unmodified
2746+
# model a / b expiry in dev should be as at today
2747+
with time_machine.travel("2020-01-02 00:00:00"):
2748+
(models_dir / "model_a.sql").write_text(f"""
2749+
MODEL (
2750+
name {schema}.model_a,
2751+
kind FULL
2752+
);
2753+
2754+
SELECT 1 as a, 2 as b, 3 as c;
2755+
""")
2756+
2757+
sqlmesh = ctx.create_context(
2758+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
2759+
)
2760+
sqlmesh.plan(environment="dev", auto_apply=True)
2761+
2762+
# should now have 4 snapshots in state - 2x model a and 2x model b
2763+
# the new model b is a dev preview because its upstream model changed
2764+
assert len(sqlmesh.engine_adapter.fetchall(f"select * from {schema}._snapshots")) == 4
2765+
2766+
# context just has the two latest
2767+
assert len(sqlmesh.snapshots) == 2
2768+
2769+
# these expire 1 day later than what's in prod
2770+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
2771+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
2772+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
2773+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
2774+
2775+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
2776+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
2777+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-02"
2778+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-09"
2779+
2780+
# move forward 3 days
2781+
# touch model b in dev but leave model a
2782+
# this bumps the model b expiry but model a remains unchanged, so it will expire before model b even though model b depends on it
2783+
with time_machine.travel("2020-01-05 00:00:00"):
2784+
(models_dir / "model_b.sql").write_text(f"""
2785+
MODEL (
2786+
name {schema}.model_b,
2787+
kind VIEW
2788+
);
2789+
2790+
SELECT a, 'b' as b from {schema}.model_a;
2791+
""")
2792+
2793+
sqlmesh = ctx.create_context(
2794+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
2795+
)
2796+
sqlmesh.plan(environment="dev", auto_apply=True)
2797+
2798+
# should now have 5 snapshots in state - 2x model a and 3x model b
2799+
assert len(sqlmesh.engine_adapter.fetchall(f"select * from {schema}._snapshots")) == 5
2800+
2801+
# context just has the two latest
2802+
assert len(sqlmesh.snapshots) == 2
2803+
2804+
# model a expiry should not have changed
2805+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
2806+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
2807+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
2808+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
2809+
2810+
# model b should now expire well after model a
2811+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
2812+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
2813+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-05"
2814+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-12"
2815+
2816+
# move forward to a date after model a has expired but before model b has expired
2817+
# invalidate dev to trigger cleanups
2818+
# run janitor. model a is expired so will be cleaned up and this will cascade to model b.
2819+
with time_machine.travel("2020-01-10 00:00:00"):
2820+
sqlmesh = ctx.create_context(
2821+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
2822+
)
2823+
2824+
before_snapshots = sqlmesh.engine_adapter.fetchall(
2825+
f"select name, identifier from {schema}._snapshots"
2826+
)
2827+
sqlmesh.invalidate_environment("dev")
2828+
sqlmesh.run_janitor(ignore_ttl=False)
2829+
after_snapshots = sqlmesh.engine_adapter.fetchall(
2830+
f"select name, identifier from {schema}._snapshots"
2831+
)
2832+
2833+
assert len(before_snapshots) != len(after_snapshots)
2834+
2835+
# all that's left should be the two snapshots that were in prod
2836+
assert set(
2837+
[SnapshotId(name=name, identifier=identifier) for name, identifier in after_snapshots]
2838+
) == set([model_a_prod_snapshot.snapshot_id, model_b_prod_snapshot.snapshot_id])

tests/core/state_sync/test_state_sync.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,6 +1124,86 @@ def test_delete_expired_environments(state_sync: EngineAdapterStateSync, make_sn
11241124
assert state_sync.get_environment_statements(env_a.name) == []
11251125

11261126

1127+
def test_get_expired_snapshots_includes_downstream_view_snapshots(
1128+
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable[..., Snapshot]
1129+
):
1130+
now_ts = now_timestamp()
1131+
1132+
# model_a: table snapshot
1133+
snapshot_a = make_snapshot(
1134+
SqlModel(
1135+
name="a",
1136+
kind="FULL",
1137+
query=parse_one("select a, ds"),
1138+
),
1139+
)
1140+
snapshot_a.ttl = "in 10 seconds"
1141+
snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING)
1142+
snapshot_a.updated_ts = now_ts - 15000 # now - 15 seconds = expired
1143+
1144+
# model_b: view snapshot that depends on model_a table snapshot
1145+
# in an actual scenario, this could have been created a few days later so expires a few days after the snapshot it depends on
1146+
# unlike a table, a view ceases to be valid if the upstream table it points to is dropped
1147+
snapshot_b = make_snapshot(
1148+
SqlModel(
1149+
name="b",
1150+
kind="VIEW",
1151+
query=parse_one("select *, 'foo' as model_b from a"),
1152+
),
1153+
nodes={"a": snapshot_a.model},
1154+
)
1155+
snapshot_b.ttl = "in 10 seconds"
1156+
snapshot_b.categorize_as(SnapshotChangeCategory.BREAKING)
1157+
snapshot_b.updated_ts = now_ts # now = not expired
1158+
1159+
# model_c: table snapshot that depends on model_a table snapshot but since it's a table it will still work if model_a is dropped
1160+
# so should not be considered when cleaning up expired snapshots (as it has not expired)
1161+
snapshot_c = make_snapshot(
1162+
SqlModel(
1163+
name="c",
1164+
kind="FULL",
1165+
query=parse_one("select *, 'foo' as model_c from a"),
1166+
),
1167+
nodes={"a": snapshot_a.model},
1168+
)
1169+
snapshot_c.ttl = "in 10 seconds"
1170+
snapshot_c.categorize_as(SnapshotChangeCategory.BREAKING)
1171+
snapshot_c.updated_ts = now_ts # now = not expired
1172+
1173+
# model_d: view snapshot with no dependency on model a, so should not be dropped if model a is dropped
1174+
snapshot_d = make_snapshot(
1175+
SqlModel(
1176+
name="d",
1177+
kind="VIEW",
1178+
query=parse_one("select 'model_d' as d"),
1179+
),
1180+
)
1181+
snapshot_d.ttl = "in 10 seconds"
1182+
snapshot_d.categorize_as(SnapshotChangeCategory.BREAKING)
1183+
snapshot_d.updated_ts = now_ts # now = not expired
1184+
1185+
state_sync.push_snapshots([snapshot_a, snapshot_b, snapshot_c, snapshot_d])
1186+
cleanup_targets = state_sync.get_expired_snapshots(now_ts)
1187+
1188+
# snapshot_a should be cleaned up because it's expired
1189+
# snapshot_b is unexpired but should be cleaned up because snapshot_a is being cleaned up
1190+
# - since it's a view, it will stop working if the snapshot_a table it's pointing to is dropped, so that's why it needs to be dropped too
1191+
# - additionally, if it isn't dropped, some engines will throw "cannot drop because other objects depend on it" when trying to drop the snapshot_a table
1192+
# snapshot_c is unexpired and should not be cleaned up even though its upstream snapshot_a has expired, since it's a table it will still continue to work
1193+
# snapshot_d is unexpired and should also not be cleaned up. While it's a view, it doesn't depend on anything that *has* expired, so is not a target for cleanup
1194+
assert len(cleanup_targets) == 2
1195+
1196+
snapshot_a_cleanup = next(
1197+
(t for t in cleanup_targets if t.snapshot.snapshot_id == snapshot_a.snapshot_id), None
1198+
)
1199+
assert snapshot_a_cleanup
1200+
1201+
snapshot_b_cleanup = next(
1202+
(t for t in cleanup_targets if t.snapshot.snapshot_id == snapshot_b.snapshot_id), None
1203+
)
1204+
assert snapshot_b_cleanup
1205+
1206+
11271207
def test_delete_expired_snapshots(state_sync: EngineAdapterStateSync, make_snapshot: t.Callable):
11281208
now_ts = now_timestamp()
11291209

0 commit comments

Comments
 (0)