Skip to content

Commit 854f3ec

Browse files
committed
Fix: Include unexpired downstream views when cleaning up expired tables
1 parent adf6a68 commit 854f3ec

File tree

3 files changed

+320
-18
lines changed

3 files changed

+320
-18
lines changed

sqlmesh/core/state_sync/db/snapshot.py

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -224,18 +224,54 @@ def _get_expired_snapshots(
224224

225225
if not ignore_ttl:
226226
expired_query = expired_query.where(
227-
(exp.column("updated_ts") + exp.column("ttl_ms")) <= current_ts
227+
((exp.column("updated_ts") + exp.column("ttl_ms")) <= current_ts)
228+
# we need to include views even if they haven't expired, in case one depends on a table that *has* expired
229+
.or_(exp.column("kind_name").eq(ModelKindName.VIEW))
228230
)
229231

230-
expired_candidates = {
232+
candidates = {
231233
SnapshotId(name=name, identifier=identifier): SnapshotNameVersion(
232234
name=name, version=version
233235
)
234236
for name, identifier, version in fetchall(self.engine_adapter, expired_query)
235237
}
236-
if not expired_candidates:
238+
239+
if not candidates:
237240
return set(), []
238241

242+
expired_candidates: t.Dict[SnapshotId, SnapshotNameVersion] = {}
243+
244+
if ignore_ttl:
245+
expired_candidates = candidates
246+
else:
247+
# Fetch full snapshots because we need the dependency relationship
248+
full_candidates = self.get_snapshots(candidates.keys())
249+
250+
# Include any non-expired views that depend on expired tables
251+
for snapshot_id in full_candidates:
252+
snapshot = full_candidates.get(snapshot_id, None)
253+
if not snapshot:
254+
continue
255+
256+
if snapshot.expiration_ts <= current_ts:
257+
# All expired snapshots should be included
258+
expired_candidates[snapshot.snapshot_id] = snapshot.name_version
259+
elif snapshot.model_kind_name == ModelKindName.VIEW:
260+
# With non-expired views, check if they point to an expired parent table
261+
immediate_parents = [
262+
full_candidates[parent_id]
263+
for parent_id in snapshot.parents
264+
if parent_id in full_candidates
265+
]
266+
267+
if any(
268+
parent.expiration_ts <= current_ts
269+
for parent in immediate_parents
270+
if parent.model_kind_name != ModelKindName.VIEW
271+
):
272+
# an immediate upstream table has expired, therefore this view is no longer valid and needs to be cleaned up as well
273+
expired_candidates[snapshot.snapshot_id] = snapshot.name_version
274+
239275
promoted_snapshot_ids = {
240276
snapshot.snapshot_id
241277
for environment in environments
@@ -253,37 +289,39 @@ def _is_snapshot_used(snapshot: SharedVersionSnapshot) -> bool:
253289
unique_expired_versions, batch_size=self.SNAPSHOT_BATCH_SIZE
254290
)
255291
cleanup_targets = []
256-
expired_snapshot_ids = set()
292+
expired_sv_snapshot_ids = set()
257293
for versions_batch in version_batches:
258-
snapshots = self._get_snapshots_with_same_version(versions_batch)
294+
sv_snapshots = self._get_snapshots_with_same_version(versions_batch)
259295

260296
snapshots_by_version = defaultdict(set)
261297
snapshots_by_dev_version = defaultdict(set)
262-
for s in snapshots:
298+
for s in sv_snapshots:
263299
snapshots_by_version[(s.name, s.version)].add(s.snapshot_id)
264300
snapshots_by_dev_version[(s.name, s.dev_version)].add(s.snapshot_id)
265301

266-
expired_snapshots = [s for s in snapshots if not _is_snapshot_used(s)]
267-
expired_snapshot_ids.update([s.snapshot_id for s in expired_snapshots])
302+
expired_sv_snapshots = [s for s in sv_snapshots if not _is_snapshot_used(s)]
303+
expired_sv_snapshot_ids.update([s.snapshot_id for s in expired_sv_snapshots])
268304

269-
for snapshot in expired_snapshots:
270-
shared_version_snapshots = snapshots_by_version[(snapshot.name, snapshot.version)]
271-
shared_version_snapshots.discard(snapshot.snapshot_id)
305+
for sv_snapshot in expired_sv_snapshots:
306+
shared_version_snapshots = snapshots_by_version[
307+
(sv_snapshot.name, sv_snapshot.version)
308+
]
309+
shared_version_snapshots.discard(sv_snapshot.snapshot_id)
272310

273311
shared_dev_version_snapshots = snapshots_by_dev_version[
274-
(snapshot.name, snapshot.dev_version)
312+
(sv_snapshot.name, sv_snapshot.dev_version)
275313
]
276-
shared_dev_version_snapshots.discard(snapshot.snapshot_id)
314+
shared_dev_version_snapshots.discard(sv_snapshot.snapshot_id)
277315

278316
if not shared_dev_version_snapshots:
279317
cleanup_targets.append(
280318
SnapshotTableCleanupTask(
281-
snapshot=snapshot.full_snapshot.table_info,
319+
snapshot=sv_snapshot.full_snapshot.table_info,
282320
dev_table_only=bool(shared_version_snapshots),
283321
)
284322
)
285323

286-
return expired_snapshot_ids, cleanup_targets
324+
return expired_sv_snapshot_ids, cleanup_targets
287325

288326
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
289327
"""Deletes snapshots.

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 183 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,26 @@
1313
import pandas as pd # noqa: TID253
1414
import pytest
1515
import pytz
16+
import time_machine
1617
from sqlglot import exp, parse_one
1718
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
1819

1920
from sqlmesh import Config, Context
2021
from sqlmesh.cli.project_init import init_example_project
2122
from sqlmesh.core.config import load_config_from_paths
22-
from sqlmesh.core.config.connection import ConnectionConfig
23+
from sqlmesh.core.config.connection import ConnectionConfig, DuckDBConnectionConfig
2324
import sqlmesh.core.dialect as d
2425
from sqlmesh.core.dialect import select_from_values
2526
from sqlmesh.core.model import Model, load_sql_based_model
27+
from sqlmesh.core.engine_adapter import EngineAdapter
2628
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
2729
from sqlmesh.core.engine_adapter.mixins import RowDiffMixin, LogicalMergeMixin
2830
from sqlmesh.core.model.definition import create_sql_model
2931
from sqlmesh.core.plan import Plan
32+
from sqlmesh.core.state_sync.cache import CachingStateSync
3033
from sqlmesh.core.state_sync.db import EngineAdapterStateSync
31-
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory
32-
from sqlmesh.utils.date import now, to_date, to_time_column
34+
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory, SnapshotId
35+
from sqlmesh.utils.date import now, to_date, to_time_column, to_ds
3336
from sqlmesh.core.table_diff import TableDiff
3437
from sqlmesh.utils.errors import SQLMeshError
3538
from sqlmesh.utils.pydantic import PydanticModel
@@ -2799,3 +2802,180 @@ def test_identifier_length_limit(ctx: TestContext):
27992802
match=re.escape(match),
28002803
):
28012804
adapter.create_table(long_table_name, {"col": exp.DataType.build("int")})
2805+
2806+
2807+
def test_janitor_drops_downstream_unexpired_hard_dependencies(
2808+
ctx: TestContext, tmp_path: pathlib.Path
2809+
):
2810+
"""
2811+
Scenario:
2812+
2813+
Ensure that cleaning up expired table snapshots also cleans up any unexpired view snapshots that depend on them
2814+
2815+
- We create A (table) <- B (view)
2816+
- In dev, we modify A - triggers new version of A and a dev preview of B that both expire in 7 days
2817+
- We advance time by 3 days
2818+
- In dev, we modify B - triggers a new version of B that depends on A but expires 3 days after A
2819+
- We advance time by 5 days so that A has reached its expiry but B has not
2820+
- We expire dev so that none of these snapshots are promoted and are thus targets for cleanup
2821+
- We run the janitor
2822+
2823+
Expected outcome:
2824+
- All the dev versions of A and B should be dropped
2825+
- We should not get a 'ERROR: cannot drop table x because other objects depend on it' on engines that do schema binding
2826+
"""
2827+
2828+
def _state_sync_engine_adapter(context: Context) -> EngineAdapter:
2829+
assert isinstance(context.state_sync, CachingStateSync)
2830+
assert isinstance(context.state_sync.state_sync, EngineAdapterStateSync)
2831+
return context.state_sync.state_sync.engine_adapter
2832+
2833+
models_dir = tmp_path / "models"
2834+
models_dir.mkdir()
2835+
schema = exp.to_table(ctx.schema(TEST_SCHEMA)).this
2836+
2837+
(models_dir / "model_a.sql").write_text(f"""
2838+
MODEL (
2839+
name {schema}.model_a,
2840+
kind FULL
2841+
);
2842+
2843+
SELECT 1 as a, 2 as b;
2844+
""")
2845+
2846+
(models_dir / "model_b.sql").write_text(f"""
2847+
MODEL (
2848+
name {schema}.model_b,
2849+
kind VIEW
2850+
);
2851+
2852+
SELECT a from {schema}.model_a;
2853+
""")
2854+
2855+
def _mutate_config(gateway: str, config: Config):
2856+
config.gateways[gateway].state_connection = DuckDBConnectionConfig(
2857+
database=str(tmp_path / "state.db")
2858+
)
2859+
2860+
with time_machine.travel("2020-01-01 00:00:00"):
2861+
sqlmesh = ctx.create_context(
2862+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
2863+
)
2864+
sqlmesh.plan(auto_apply=True)
2865+
2866+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
2867+
# expiry is last updated + ttl
2868+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
2869+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-01"
2870+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-08"
2871+
2872+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
2873+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
2874+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-01"
2875+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-08"
2876+
2877+
model_a_prod_snapshot = model_a_snapshot
2878+
model_b_prod_snapshot = model_b_snapshot
2879+
2880+
# move forward 1 day
2881+
# new dev environment - touch models to create new snapshots
2882+
# model a / b expiry in prod should remain unmodified
2883+
# model a / b expiry in dev should be as at today
2884+
with time_machine.travel("2020-01-02 00:00:00"):
2885+
(models_dir / "model_a.sql").write_text(f"""
2886+
MODEL (
2887+
name {schema}.model_a,
2888+
kind FULL
2889+
);
2890+
2891+
SELECT 1 as a, 2 as b, 3 as c;
2892+
""")
2893+
2894+
sqlmesh = ctx.create_context(
2895+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
2896+
)
2897+
sqlmesh.plan(environment="dev", auto_apply=True)
2898+
2899+
# should now have 4 snapshots in state - 2x model a and 2x model b
2900+
# the new model b is a dev preview because its upstream model changed
2901+
assert (
2902+
len(_state_sync_engine_adapter(sqlmesh).fetchall(f"select * from sqlmesh._snapshots"))
2903+
== 4
2904+
)
2905+
2906+
# context just has the two latest
2907+
assert len(sqlmesh.snapshots) == 2
2908+
2909+
# these expire 1 day later than what's in prod
2910+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
2911+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
2912+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
2913+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
2914+
2915+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
2916+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
2917+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-02"
2918+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-09"
2919+
2920+
# move forward 3 days
2921+
# touch model b in dev but leave model a
2922+
# this bumps the model b expiry but leaves model a unchanged, so model a will expire before model b even though model b depends on it
2923+
with time_machine.travel("2020-01-05 00:00:00"):
2924+
(models_dir / "model_b.sql").write_text(f"""
2925+
MODEL (
2926+
name {schema}.model_b,
2927+
kind VIEW
2928+
);
2929+
2930+
SELECT a, 'b' as b from {schema}.model_a;
2931+
""")
2932+
2933+
sqlmesh = ctx.create_context(
2934+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
2935+
)
2936+
sqlmesh.plan(environment="dev", auto_apply=True)
2937+
2938+
# should now have 5 snapshots in state - 2x model a and 3x model b
2939+
assert (
2940+
len(_state_sync_engine_adapter(sqlmesh).fetchall(f"select * from sqlmesh._snapshots"))
2941+
== 5
2942+
)
2943+
2944+
# context just has the two latest
2945+
assert len(sqlmesh.snapshots) == 2
2946+
2947+
# model a expiry should not have changed
2948+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
2949+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
2950+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
2951+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
2952+
2953+
# model b should now expire well after model a
2954+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
2955+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
2956+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-05"
2957+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-12"
2958+
2959+
# move forward to a date after model a has expired but before model b has expired
2960+
# invalidate dev to trigger cleanups
2961+
# run janitor. model a is expired so will be cleaned up and this will cascade to model b.
2962+
with time_machine.travel("2020-01-10 00:00:00"):
2963+
sqlmesh = ctx.create_context(
2964+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
2965+
)
2966+
2967+
before_snapshots = _state_sync_engine_adapter(sqlmesh).fetchall(
2968+
f"select name, identifier from sqlmesh._snapshots"
2969+
)
2970+
sqlmesh.invalidate_environment("dev")
2971+
sqlmesh.run_janitor(ignore_ttl=False)
2972+
after_snapshots = _state_sync_engine_adapter(sqlmesh).fetchall(
2973+
f"select name, identifier from sqlmesh._snapshots"
2974+
)
2975+
2976+
assert len(before_snapshots) != len(after_snapshots)
2977+
2978+
# all that's left should be the two snapshots that were in prod
2979+
assert set(
2980+
[SnapshotId(name=name, identifier=identifier) for name, identifier in after_snapshots]
2981+
) == set([model_a_prod_snapshot.snapshot_id, model_b_prod_snapshot.snapshot_id])

0 commit comments

Comments
 (0)