Skip to content

Commit eeba4dd

Browse files
committed
Fix: Include unexpired downstream views when cleaning up expired tables
1 parent eb4c0b4 commit eeba4dd

File tree

3 files changed

+320
-18
lines changed

3 files changed

+320
-18
lines changed

sqlmesh/core/state_sync/db/snapshot.py

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -224,18 +224,54 @@ def _get_expired_snapshots(
224224

225225
if not ignore_ttl:
226226
expired_query = expired_query.where(
227-
(exp.column("updated_ts") + exp.column("ttl_ms")) <= current_ts
227+
((exp.column("updated_ts") + exp.column("ttl_ms")) <= current_ts)
228+
# we need to include views even if they haven't expired in case one depends on a table that /has/ expired
229+
.or_(exp.column("kind_name").eq(ModelKindName.VIEW))
228230
)
229231

230-
expired_candidates = {
232+
candidates = {
231233
SnapshotId(name=name, identifier=identifier): SnapshotNameVersion(
232234
name=name, version=version
233235
)
234236
for name, identifier, version in fetchall(self.engine_adapter, expired_query)
235237
}
236-
if not expired_candidates:
238+
239+
if not candidates:
237240
return set(), []
238241

242+
expired_candidates: t.Dict[SnapshotId, SnapshotNameVersion] = {}
243+
244+
if ignore_ttl:
245+
expired_candidates = candidates
246+
else:
247+
# Fetch full snapshots because we need the dependency relationship
248+
full_candidates = self.get_snapshots(candidates.keys())
249+
250+
# remove any non-expired views that don't depend on expired tables
251+
for snapshot_id in full_candidates:
252+
snapshot = full_candidates.get(snapshot_id, None)
253+
if not snapshot:
254+
continue
255+
256+
if snapshot.expiration_ts <= current_ts:
257+
# All expired snapshots should be included
258+
expired_candidates[snapshot.snapshot_id] = snapshot.name_version
259+
elif snapshot.model_kind_name == ModelKindName.VIEW:
260+
# With non-expired views, check if they have any expired upstream tables
261+
immediate_parents = [
262+
full_candidates[parent_id]
263+
for parent_id in snapshot.parents
264+
if parent_id in full_candidates
265+
]
266+
267+
if any(
268+
parent.expiration_ts <= current_ts
269+
for parent in immediate_parents
270+
if parent.model_kind_name != ModelKindName.VIEW
271+
):
272+
# an immediate upstream table has expired, therefore this view is no longer valid and needs to be cleaned up as well
273+
expired_candidates[snapshot.snapshot_id] = snapshot.name_version
274+
239275
promoted_snapshot_ids = {
240276
snapshot.snapshot_id
241277
for environment in environments
@@ -253,37 +289,39 @@ def _is_snapshot_used(snapshot: SharedVersionSnapshot) -> bool:
253289
unique_expired_versions, batch_size=self.SNAPSHOT_BATCH_SIZE
254290
)
255291
cleanup_targets = []
256-
expired_snapshot_ids = set()
292+
expired_sv_snapshot_ids = set()
257293
for versions_batch in version_batches:
258-
snapshots = self._get_snapshots_with_same_version(versions_batch)
294+
sv_snapshots = self._get_snapshots_with_same_version(versions_batch)
259295

260296
snapshots_by_version = defaultdict(set)
261297
snapshots_by_dev_version = defaultdict(set)
262-
for s in snapshots:
298+
for s in sv_snapshots:
263299
snapshots_by_version[(s.name, s.version)].add(s.snapshot_id)
264300
snapshots_by_dev_version[(s.name, s.dev_version)].add(s.snapshot_id)
265301

266-
expired_snapshots = [s for s in snapshots if not _is_snapshot_used(s)]
267-
expired_snapshot_ids.update([s.snapshot_id for s in expired_snapshots])
302+
expired_sv_snapshots = [s for s in sv_snapshots if not _is_snapshot_used(s)]
303+
expired_sv_snapshot_ids.update([s.snapshot_id for s in expired_sv_snapshots])
268304

269-
for snapshot in expired_snapshots:
270-
shared_version_snapshots = snapshots_by_version[(snapshot.name, snapshot.version)]
271-
shared_version_snapshots.discard(snapshot.snapshot_id)
305+
for sv_snapshot in expired_sv_snapshots:
306+
shared_version_snapshots = snapshots_by_version[
307+
(sv_snapshot.name, sv_snapshot.version)
308+
]
309+
shared_version_snapshots.discard(sv_snapshot.snapshot_id)
272310

273311
shared_dev_version_snapshots = snapshots_by_dev_version[
274-
(snapshot.name, snapshot.dev_version)
312+
(sv_snapshot.name, sv_snapshot.dev_version)
275313
]
276-
shared_dev_version_snapshots.discard(snapshot.snapshot_id)
314+
shared_dev_version_snapshots.discard(sv_snapshot.snapshot_id)
277315

278316
if not shared_dev_version_snapshots:
279317
cleanup_targets.append(
280318
SnapshotTableCleanupTask(
281-
snapshot=snapshot.full_snapshot.table_info,
319+
snapshot=sv_snapshot.full_snapshot.table_info,
282320
dev_table_only=bool(shared_version_snapshots),
283321
)
284322
)
285323

286-
return expired_snapshot_ids, cleanup_targets
324+
return expired_sv_snapshot_ids, cleanup_targets
287325

288326
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
289327
"""Deletes snapshots.

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 183 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,26 @@
1313
import pandas as pd # noqa: TID253
1414
import pytest
1515
import pytz
16+
import time_machine
1617
from sqlglot import exp, parse_one
1718
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
1819

1920
from sqlmesh import Config, Context
2021
from sqlmesh.cli.project_init import init_example_project
2122
from sqlmesh.core.config import load_config_from_paths
22-
from sqlmesh.core.config.connection import ConnectionConfig
23+
from sqlmesh.core.config.connection import ConnectionConfig, DuckDBConnectionConfig
2324
import sqlmesh.core.dialect as d
2425
from sqlmesh.core.dialect import select_from_values
2526
from sqlmesh.core.model import Model, load_sql_based_model
27+
from sqlmesh.core.engine_adapter import EngineAdapter
2628
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
2729
from sqlmesh.core.engine_adapter.mixins import RowDiffMixin
2830
from sqlmesh.core.model.definition import create_sql_model
2931
from sqlmesh.core.plan import Plan
32+
from sqlmesh.core.state_sync.cache import CachingStateSync
3033
from sqlmesh.core.state_sync.db import EngineAdapterStateSync
31-
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory
32-
from sqlmesh.utils.date import now, to_date, to_time_column
34+
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory, SnapshotId
35+
from sqlmesh.utils.date import now, to_date, to_time_column, to_ds
3336
from sqlmesh.core.table_diff import TableDiff
3437
from sqlmesh.utils.errors import SQLMeshError
3538
from sqlmesh.utils.pydantic import PydanticModel
@@ -2672,3 +2675,180 @@ def test_identifier_length_limit(ctx: TestContext):
26722675
match=re.escape(match),
26732676
):
26742677
adapter.create_table(long_table_name, {"col": exp.DataType.build("int")})
2678+
2679+
2680+
def test_janitor_drops_downstream_unexpired_hard_dependencies(
2681+
ctx: TestContext, tmp_path: pathlib.Path
2682+
):
2683+
"""
2684+
Scenario:
2685+
2686+
Ensure that cleaning up expired table snapshots also cleans up any unexpired view snapshots that depend on them
2687+
2688+
- We create a A (table) <- B (view)
2689+
- In dev, we modify A - triggers new version of A and a dev preview of B that both expire in 7 days
2690+
- We advance time by 3 days
2691+
- In dev, we modify B - triggers a new version of B that depends on A but expires 3 days after A
2692+
- We advance time by 5 days so that A has reached its expiry but B has not
2693+
- We expire dev so that none of these snapshots are promoted and are thus targets for cleanup
2694+
- We run the janitor
2695+
2696+
Expected outcome:
2697+
- All the dev versions of A and B should be dropped
2698+
- We should not get a 'ERROR: cannot drop table x because other objects depend on it' on engines that do schema binding
2699+
"""
2700+
2701+
def _state_sync_engine_adapter(context: Context) -> EngineAdapter:
2702+
assert isinstance(context.state_sync, CachingStateSync)
2703+
assert isinstance(context.state_sync.state_sync, EngineAdapterStateSync)
2704+
return context.state_sync.state_sync.engine_adapter
2705+
2706+
models_dir = tmp_path / "models"
2707+
models_dir.mkdir()
2708+
schema = exp.to_table(ctx.schema(TEST_SCHEMA)).this
2709+
2710+
(models_dir / "model_a.sql").write_text(f"""
2711+
MODEL (
2712+
name {schema}.model_a,
2713+
kind FULL
2714+
);
2715+
2716+
SELECT 1 as a, 2 as b;
2717+
""")
2718+
2719+
(models_dir / "model_b.sql").write_text(f"""
2720+
MODEL (
2721+
name {schema}.model_b,
2722+
kind VIEW
2723+
);
2724+
2725+
SELECT a from {schema}.model_a;
2726+
""")
2727+
2728+
def _mutate_config(gateway: str, config: Config):
2729+
config.gateways[gateway].state_connection = DuckDBConnectionConfig(
2730+
database=str(tmp_path / "state.db")
2731+
)
2732+
2733+
with time_machine.travel("2020-01-01 00:00:00"):
2734+
sqlmesh = ctx.create_context(
2735+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
2736+
)
2737+
sqlmesh.plan(auto_apply=True)
2738+
2739+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
2740+
# expiry is last updated + ttl
2741+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
2742+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-01"
2743+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-08"
2744+
2745+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
2746+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
2747+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-01"
2748+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-08"
2749+
2750+
model_a_prod_snapshot = model_a_snapshot
2751+
model_b_prod_snapshot = model_b_snapshot
2752+
2753+
# move forward 1 day
2754+
# new dev environment - touch models to create new snapshots
2755+
# model a / b expiry in prod should remain unmodified
2756+
# model a / b expiry in dev should be as at today
2757+
with time_machine.travel("2020-01-02 00:00:00"):
2758+
(models_dir / "model_a.sql").write_text(f"""
2759+
MODEL (
2760+
name {schema}.model_a,
2761+
kind FULL
2762+
);
2763+
2764+
SELECT 1 as a, 2 as b, 3 as c;
2765+
""")
2766+
2767+
sqlmesh = ctx.create_context(
2768+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
2769+
)
2770+
sqlmesh.plan(environment="dev", auto_apply=True)
2771+
2772+
# should now have 4 snapshots in state - 2x model a and 2x model b
2773+
# the new model b is a dev preview because its upstream model changed
2774+
assert (
2775+
len(_state_sync_engine_adapter(sqlmesh).fetchall(f"select * from sqlmesh._snapshots"))
2776+
== 4
2777+
)
2778+
2779+
# context just has the two latest
2780+
assert len(sqlmesh.snapshots) == 2
2781+
2782+
# these expire 1 day later than what's in prod
2783+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
2784+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
2785+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
2786+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
2787+
2788+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
2789+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
2790+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-02"
2791+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-09"
2792+
2793+
# move forward 3 days
2794+
# touch model b in dev but leave model a
2795+
# this bumps the model b expiry but model a remains unchanged, so will expire before model b even though model b depends on it
2796+
with time_machine.travel("2020-01-05 00:00:00"):
2797+
(models_dir / "model_b.sql").write_text(f"""
2798+
MODEL (
2799+
name {schema}.model_b,
2800+
kind VIEW
2801+
);
2802+
2803+
SELECT a, 'b' as b from {schema}.model_a;
2804+
""")
2805+
2806+
sqlmesh = ctx.create_context(
2807+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
2808+
)
2809+
sqlmesh.plan(environment="dev", auto_apply=True)
2810+
2811+
# should now have 5 snapshots in state - 2x model a and 3x model b
2812+
assert (
2813+
len(_state_sync_engine_adapter(sqlmesh).fetchall(f"select * from sqlmesh._snapshots"))
2814+
== 5
2815+
)
2816+
2817+
# context just has the two latest
2818+
assert len(sqlmesh.snapshots) == 2
2819+
2820+
# model a expiry should not have changed
2821+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
2822+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
2823+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
2824+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
2825+
2826+
# model b should now expire well after model a
2827+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
2828+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
2829+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-05"
2830+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-12"
2831+
2832+
# move forward to date where after model a has expired but before model b has expired
2833+
# invalidate dev to trigger cleanups
2834+
# run janitor. model a is expired so will be cleaned up and this will cascade to model b.
2835+
with time_machine.travel("2020-01-10 00:00:00"):
2836+
sqlmesh = ctx.create_context(
2837+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
2838+
)
2839+
2840+
before_snapshots = _state_sync_engine_adapter(sqlmesh).fetchall(
2841+
f"select name, identifier from sqlmesh._snapshots"
2842+
)
2843+
sqlmesh.invalidate_environment("dev")
2844+
sqlmesh.run_janitor(ignore_ttl=False)
2845+
after_snapshots = _state_sync_engine_adapter(sqlmesh).fetchall(
2846+
f"select name, identifier from sqlmesh._snapshots"
2847+
)
2848+
2849+
assert len(before_snapshots) != len(after_snapshots)
2850+
2851+
# all that's left should be the two snapshots that were in prod
2852+
assert set(
2853+
[SnapshotId(name=name, identifier=identifier) for name, identifier in after_snapshots]
2854+
) == set([model_a_prod_snapshot.snapshot_id, model_b_prod_snapshot.snapshot_id])

0 commit comments

Comments
 (0)