Skip to content

Commit af4faa6

Browse files
committed
Fix: Use drop cascade in janitor
1 parent 803f7d8 commit af4faa6

File tree

5 files changed

+274
-15
lines changed

5 files changed

+274
-15
lines changed

sqlmesh/core/engine_adapter/athena.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,13 +314,13 @@ def _build_table_properties_exp(
314314

315315
return None
316316

317-
def drop_table(self, table_name: TableName, exists: bool = True) -> None:
317+
def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
318318
table = exp.to_table(table_name)
319319

320320
if self._query_table_type(table) == "hive":
321321
self._truncate_table(table)
322322

323-
return super().drop_table(table_name=table, exists=exists)
323+
return super().drop_table(table_name=table, exists=exists, **kwargs)
324324

325325
def _truncate_table(self, table_name: TableName) -> None:
326326
table = exp.to_table(table_name)

sqlmesh/core/engine_adapter/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -958,15 +958,15 @@ def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool =
958958
raise SQLMeshError(
959959
f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'"
960960
)
961-
962-
def drop_table(self, table_name: TableName, exists: bool = True) -> None:
961+
962+
def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
963963
"""Drops a table.
964964
965965
Args:
966966
table_name: The name of the table to drop.
967967
exists: If exists, defaults to True.
968968
"""
969-
self._drop_object(name=table_name, exists=exists)
969+
self._drop_object(name=table_name, exists=exists, **kwargs)
970970

971971
def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None:
972972
"""Drops a managed table.

sqlmesh/core/snapshot/evaluator.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,6 +1082,10 @@ def _cleanup_snapshot(
10821082
table_name,
10831083
is_table_deployable=is_table_deployable,
10841084
physical_schema=snapshot.physical_schema,
1085+
# we need to set cascade=true or we will get a 'can't drop because other objects depend on it'-style
1086+
# error on engines that enforce referential integrity, such as Postgres
1087+
# this situation can happen when a snapshot expires but downstream view snapshots that reference it have not yet expired
1088+
cascade=True,
10851089
)
10861090

10871091
if on_complete is not None:
@@ -1597,7 +1601,7 @@ def migrate(
15971601

15981602
def delete(self, name: str, **kwargs: t.Any) -> None:
15991603
_check_table_db_is_physical_schema(name, kwargs["physical_schema"])
1600-
self.adapter.drop_table(name)
1604+
self.adapter.drop_table(name, cascade=kwargs.pop("cascade", False))
16011605
logger.info("Dropped table '%s'", name)
16021606

16031607

@@ -2028,15 +2032,16 @@ def migrate(
20282032
)
20292033

20302034
def delete(self, name: str, **kwargs: t.Any) -> None:
2035+
cascade = kwargs.pop("cascade", False)
20312036
try:
2032-
self.adapter.drop_view(name)
2037+
self.adapter.drop_view(name, cascade=cascade)
20332038
except Exception:
20342039
logger.debug(
20352040
"Failed to drop view '%s'. Trying to drop the materialized view instead",
20362041
name,
20372042
exc_info=True,
20382043
)
2039-
self.adapter.drop_view(name, materialized=True)
2044+
self.adapter.drop_view(name, materialized=True, cascade=cascade)
20402045
logger.info("Dropped view '%s'", name)
20412046

20422047
def _is_materialized_view(self, model: Model) -> bool:

tests/core/engine_adapter/integration/test_integration_postgres.py

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,24 @@
11
import typing as t
22
import pytest
33
from pytest import FixtureRequest
4+
from pathlib import Path
45
from sqlmesh.core.engine_adapter import PostgresEngineAdapter
6+
from sqlmesh.core.config import Config, DuckDBConnectionConfig
57
from tests.core.engine_adapter.integration import TestContext
8+
import time_machine
9+
from datetime import timedelta
10+
from sqlmesh.utils.date import to_ds
11+
from sqlglot import exp
12+
from sqlmesh.core.context import Context
13+
from sqlmesh.core.state_sync import CachingStateSync, EngineAdapterStateSync
14+
from sqlmesh.core.snapshot.definition import SnapshotId
615

716
from tests.core.engine_adapter.integration import (
817
TestContext,
918
generate_pytest_params,
1019
ENGINES_BY_NAME,
1120
IntegrationTestEngine,
21+
TEST_SCHEMA,
1222
)
1323

1424

@@ -29,3 +39,242 @@ def engine_adapter(ctx: TestContext) -> PostgresEngineAdapter:
2939
def test_engine_adapter(ctx: TestContext):
3040
assert isinstance(ctx.engine_adapter, PostgresEngineAdapter)
3141
assert ctx.engine_adapter.fetchone("select 1") == (1,)
42+
43+
44+
def test_janitor_drop_cascade(ctx: TestContext, tmp_path: Path) -> None:
45+
"""
46+
Scenario:
47+
Ensure that cleaning up expired table snapshots also cleans up any unexpired view snapshots that depend on them
48+
- We create a A (table) <- B (view)
49+
    - In dev, we modify A - triggers a new version of A and a dev preview of B that both expire in 7 days
50+
- We advance time by 3 days
51+
- In dev, we modify B - triggers a new version of B that depends on A but expires 3 days after A
52+
- In dev, we create B(view) <- C(view) and B(view) <- D(table)
53+
- We advance time by 5 days so that A has reached its expiry but B, C and D have not
54+
- We expire dev so that none of these snapshots are promoted and are thus targets for cleanup
55+
- We run the janitor
56+
57+
Expected outcome:
58+
- All the dev versions of A and B should be dropped
59+
- C should be dropped as well because it's a view that depends on B which was dropped
60+
- D should not be dropped because while it depends on B which was dropped, it's a table so is still valid after B is dropped
61+
    - We should NOT get an 'ERROR: cannot drop table x because other objects depend on it'
62+
63+
    Note that the references in state to the views that were cascade-dropped by postgres will still exist; this is considered ok
64+
as applying a plan will recreate the physical objects
65+
"""
66+
67+
def _all_snapshot_ids(context: Context) -> t.List[SnapshotId]:
68+
assert isinstance(context.state_sync, CachingStateSync)
69+
assert isinstance(context.state_sync.state_sync, EngineAdapterStateSync)
70+
71+
return [
72+
SnapshotId(name=name, identifier=identifier)
73+
for name, identifier in context.state_sync.state_sync.engine_adapter.fetchall(
74+
"select name, identifier from sqlmesh._snapshots"
75+
)
76+
]
77+
78+
models_dir = tmp_path / "models"
79+
models_dir.mkdir()
80+
schema = exp.to_table(ctx.schema(TEST_SCHEMA)).this
81+
82+
(models_dir / "model_a.sql").write_text(f"""
83+
MODEL (
84+
name {schema}.model_a,
85+
kind FULL
86+
);
87+
SELECT 1 as a, 2 as b;
88+
""")
89+
90+
(models_dir / "model_b.sql").write_text(f"""
91+
MODEL (
92+
name {schema}.model_b,
93+
kind VIEW
94+
);
95+
SELECT a from {schema}.model_a;
96+
""")
97+
98+
def _mutate_config(gateway: str, config: Config):
99+
config.gateways[gateway].state_connection = DuckDBConnectionConfig(
100+
database=str(tmp_path / "state.db")
101+
)
102+
103+
with time_machine.travel("2020-01-01 00:00:00"):
104+
sqlmesh = ctx.create_context(
105+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
106+
)
107+
sqlmesh.plan(auto_apply=True)
108+
109+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
110+
# expiry is last updated + ttl
111+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
112+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-01"
113+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-08"
114+
115+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
116+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
117+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-01"
118+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-08"
119+
120+
model_a_prod_snapshot = model_a_snapshot
121+
model_b_prod_snapshot = model_b_snapshot
122+
123+
    # move forward 1 day
124+
# new dev environment - touch models to create new snapshots
125+
# model a / b expiry in prod should remain unmodified
126+
# model a / b expiry in dev should be as at today
127+
with time_machine.travel("2020-01-02 00:00:00"):
128+
(models_dir / "model_a.sql").write_text(f"""
129+
MODEL (
130+
name {schema}.model_a,
131+
kind FULL
132+
);
133+
SELECT 1 as a, 2 as b, 3 as c;
134+
""")
135+
136+
sqlmesh = ctx.create_context(
137+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
138+
)
139+
sqlmesh.plan(environment="dev", auto_apply=True)
140+
141+
# should now have 4 snapshots in state - 2x model a and 2x model b
142+
# the new model b is a dev preview because its upstream model changed
143+
all_snapshot_ids = _all_snapshot_ids(sqlmesh)
144+
assert len(all_snapshot_ids) == 4
145+
assert len([s for s in all_snapshot_ids if "model_a" in s.name]) == 2
146+
assert len([s for s in all_snapshot_ids if "model_b" in s.name]) == 2
147+
148+
# context just has the two latest
149+
assert len(sqlmesh.snapshots) == 2
150+
151+
# these expire 1 day later than what's in prod
152+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
153+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
154+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
155+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
156+
157+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
158+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
159+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-02"
160+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-09"
161+
162+
# move forward 3 days
163+
# touch model b in dev but leave model a
164+
# this bumps the model b expiry but model a remains unchanged, so will expire before model b even though model b depends on it
165+
with time_machine.travel("2020-01-05 00:00:00"):
166+
(models_dir / "model_b.sql").write_text(f"""
167+
MODEL (
168+
name {schema}.model_b,
169+
kind VIEW
170+
);
171+
SELECT a, 'b' as b from {schema}.model_a;
172+
""")
173+
174+
(models_dir / "model_c.sql").write_text(f"""
175+
MODEL (
176+
name {schema}.model_c,
177+
kind VIEW
178+
);
179+
SELECT a, 'c' as c from {schema}.model_b;
180+
""")
181+
182+
(models_dir / "model_d.sql").write_text(f"""
183+
MODEL (
184+
name {schema}.model_d,
185+
kind FULL
186+
);
187+
SELECT a, 'd' as d from {schema}.model_b;
188+
""")
189+
190+
sqlmesh = ctx.create_context(
191+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
192+
)
193+
sqlmesh.plan(environment="dev", auto_apply=True)
194+
195+
# should now have 7 snapshots in state - 2x model a, 3x model b, 1x model c and 1x model d
196+
all_snapshot_ids = _all_snapshot_ids(sqlmesh)
197+
assert len(all_snapshot_ids) == 7
198+
assert len([s for s in all_snapshot_ids if "model_a" in s.name]) == 2
199+
assert len([s for s in all_snapshot_ids if "model_b" in s.name]) == 3
200+
assert len([s for s in all_snapshot_ids if "model_c" in s.name]) == 1
201+
assert len([s for s in all_snapshot_ids if "model_d" in s.name]) == 1
202+
203+
# context just has the 4 latest
204+
assert len(sqlmesh.snapshots) == 4
205+
206+
# model a expiry should not have changed
207+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
208+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
209+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
210+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
211+
212+
# model b should now expire well after model a
213+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
214+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
215+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-05"
216+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-12"
217+
218+
# model c should expire at the same time as model b
219+
model_c_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_c" in n)
220+
assert to_ds(model_c_snapshot.updated_ts) == to_ds(model_b_snapshot.updated_ts)
221+
assert to_ds(model_c_snapshot.expiration_ts) == to_ds(model_b_snapshot.expiration_ts)
222+
223+
# model d should expire at the same time as model b
224+
model_d_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_d" in n)
225+
assert to_ds(model_d_snapshot.updated_ts) == to_ds(model_b_snapshot.updated_ts)
226+
assert to_ds(model_d_snapshot.expiration_ts) == to_ds(model_b_snapshot.expiration_ts)
227+
228+
# move forward to date where after model a has expired but before model b has expired
229+
# invalidate dev to trigger cleanups
230+
# run janitor
231+
# - table model a is expired so will be cleaned up and this will cascade to view model b
232+
# - view model b is not expired, but because it got cascaded to, this will cascade again to view model c
233+
    # - table model d is not a view, so even though its parent view model b got dropped, it doesn't need to be dropped
234+
with time_machine.travel("2020-01-10 00:00:00"):
235+
sqlmesh = ctx.create_context(
236+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
237+
)
238+
239+
before_snapshot_ids = _all_snapshot_ids(sqlmesh)
240+
241+
before_objects = ctx.get_metadata_results(f"sqlmesh__{schema}")
242+
assert set(before_objects.tables) == set(
243+
[
244+
exp.to_table(s.table_name()).text("this")
245+
for s in (model_a_prod_snapshot, model_a_snapshot, model_d_snapshot)
246+
]
247+
)
248+
assert set(before_objects.views).issuperset(
249+
[
250+
exp.to_table(s.table_name()).text("this")
251+
for s in (model_b_prod_snapshot, model_b_snapshot, model_c_snapshot)
252+
]
253+
)
254+
255+
sqlmesh.invalidate_environment("dev")
256+
sqlmesh.run_janitor(ignore_ttl=False)
257+
258+
after_snapshot_ids = _all_snapshot_ids(sqlmesh)
259+
260+
assert len(before_snapshot_ids) != len(after_snapshot_ids)
261+
262+
# Everything should be left in state except the model_a snapshot, which expired
263+
assert set(after_snapshot_ids) == set(before_snapshot_ids) - set(
264+
[model_a_snapshot.snapshot_id]
265+
)
266+
267+
# In the db, there should be:
268+
# - the two original snapshots that were in prod, table model_a and view model_b
269+
    # - model d, even though it's not promoted in any environment, because it's a table snapshot that hasn't expired yet
270+
# the view snapshots that depended on model_a should be gone due to the cascading delete
271+
after_objects = ctx.get_metadata_results(f"sqlmesh__{schema}")
272+
assert set(after_objects.tables) == set(
273+
[
274+
exp.to_table(s.table_name()).text("this")
275+
for s in (model_a_prod_snapshot, model_d_snapshot)
276+
]
277+
)
278+
assert after_objects.views == [
279+
exp.to_table(model_b_prod_snapshot.table_name()).text("this")
280+
]

tests/core/test_snapshot_evaluator.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -439,26 +439,31 @@ def create_and_cleanup(name: str, dev_table_only: bool):
439439

440440
snapshot = create_and_cleanup("catalog.test_schema.test_model", True)
441441
adapter_mock.drop_table.assert_called_once_with(
442-
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
442+
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev",
443+
cascade=True,
443444
)
444445
adapter_mock.reset_mock()
445446

446447
snapshot = create_and_cleanup("test_schema.test_model", False)
447448
adapter_mock.drop_table.assert_has_calls(
448449
[
449450
call(
450-
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
451+
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev",
452+
cascade=True,
451453
),
452-
call(f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}"),
454+
call(f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}", cascade=True),
453455
]
454456
)
455457
adapter_mock.reset_mock()
456458

457459
snapshot = create_and_cleanup("test_model", False)
458460
adapter_mock.drop_table.assert_has_calls(
459461
[
460-
call(f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev"),
461-
call(f"sqlmesh__default.test_model__{snapshot.version}"),
462+
call(
463+
f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev",
464+
cascade=True,
465+
),
466+
call(f"sqlmesh__default.test_model__{snapshot.version}", cascade=True),
462467
]
463468
)
464469

@@ -4107,10 +4112,10 @@ def test_multiple_engine_cleanup(snapshot: Snapshot, adapters, make_snapshot):
41074112

41084113
# The clean up will happen using the specific gateway the model was created with
41094114
engine_adapters["default"].drop_table.assert_called_once_with(
4110-
f"sqlmesh__db.db__model__{snapshot.version}__dev"
4115+
f"sqlmesh__db.db__model__{snapshot.version}__dev", cascade=True
41114116
)
41124117
engine_adapters["secondary"].drop_table.assert_called_once_with(
4113-
f"sqlmesh__test_schema.test_schema__test_model__{snapshot_2.version}__dev"
4118+
f"sqlmesh__test_schema.test_schema__test_model__{snapshot_2.version}__dev", cascade=True
41144119
)
41154120

41164121

0 commit comments

Comments
 (0)