Skip to content

Commit 2727faa

Browse files
committed
Fix: Use drop cascade in janitor
1 parent 21f06dd commit 2727faa

File tree

5 files changed

+274
-14
lines changed

5 files changed

+274
-14
lines changed

sqlmesh/core/engine_adapter/athena.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,13 +314,13 @@ def _build_table_properties_exp(
314314

315315
return None
316316

317-
def drop_table(self, table_name: TableName, exists: bool = True) -> None:
317+
def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
318318
table = exp.to_table(table_name)
319319

320320
if self._query_table_type(table) == "hive":
321321
self._truncate_table(table)
322322

323-
return super().drop_table(table_name=table, exists=exists)
323+
return super().drop_table(table_name=table, exists=exists, **kwargs)
324324

325325
def _truncate_table(self, table_name: TableName) -> None:
326326
table = exp.to_table(table_name)

sqlmesh/core/engine_adapter/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,14 +1030,14 @@ def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool =
10301030
f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'"
10311031
)
10321032

1033-
def drop_table(self, table_name: TableName, exists: bool = True) -> None:
1033+
def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
10341034
"""Drops a table.
10351035
10361036
Args:
10371037
table_name: The name of the table to drop.
10381038
exists: If exists, defaults to True.
10391039
"""
1040-
self._drop_object(name=table_name, exists=exists)
1040+
self._drop_object(name=table_name, exists=exists, **kwargs)
10411041

10421042
def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None:
10431043
"""Drops a managed table.

sqlmesh/core/snapshot/evaluator.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,6 +1083,10 @@ def _cleanup_snapshot(
10831083
table_name,
10841084
is_table_deployable=is_table_deployable,
10851085
physical_schema=snapshot.physical_schema,
1086+
# we need to set cascade=true or we will get a 'can't drop because other objects depend on it'-style
1087+
# error on engines that enforce referential integrity, such as Postgres
1088+
# this situation can happen when a snapshot expires but downstream view snapshots that reference it have not yet expired
1089+
cascade=True,
10861090
)
10871091
except Exception:
10881092
# Use `get_data_object` to check if the table exists instead of `table_exists` since the former
@@ -1585,7 +1589,7 @@ def migrate(
15851589

15861590
def delete(self, name: str, **kwargs: t.Any) -> None:
15871591
_check_table_db_is_physical_schema(name, kwargs["physical_schema"])
1588-
self.adapter.drop_table(name)
1592+
self.adapter.drop_table(name, cascade=kwargs.pop("cascade", False))
15891593
logger.info("Dropped table '%s'", name)
15901594

15911595
def _replace_query_for_model(
@@ -2195,15 +2199,16 @@ def migrate(
21952199
)
21962200

21972201
def delete(self, name: str, **kwargs: t.Any) -> None:
2202+
cascade = kwargs.pop("cascade", False)
21982203
try:
2199-
self.adapter.drop_view(name)
2204+
self.adapter.drop_view(name, cascade=cascade)
22002205
except Exception:
22012206
logger.debug(
22022207
"Failed to drop view '%s'. Trying to drop the materialized view instead",
22032208
name,
22042209
exc_info=True,
22052210
)
2206-
self.adapter.drop_view(name, materialized=True)
2211+
self.adapter.drop_view(name, materialized=True, cascade=cascade)
22072212
logger.info("Dropped view '%s'", name)
22082213

22092214
def _is_materialized_view(self, model: Model) -> bool:

tests/core/engine_adapter/integration/test_integration_postgres.py

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
import typing as t
22
import pytest
33
from pytest import FixtureRequest
4+
from pathlib import Path
45
from sqlmesh.core.engine_adapter import PostgresEngineAdapter
6+
from sqlmesh.core.config import Config, DuckDBConnectionConfig
7+
from tests.core.engine_adapter.integration import TestContext
8+
import time_machine
9+
from datetime import timedelta
10+
from sqlmesh.utils.date import to_ds
11+
from sqlglot import exp
12+
from sqlmesh.core.context import Context
13+
from sqlmesh.core.state_sync import CachingStateSync, EngineAdapterStateSync
14+
from sqlmesh.core.snapshot.definition import SnapshotId
515

616
from tests.core.engine_adapter.integration import (
717
TestContext,
818
generate_pytest_params,
919
ENGINES_BY_NAME,
1020
IntegrationTestEngine,
21+
TEST_SCHEMA,
1122
)
1223

1324

@@ -33,3 +44,242 @@ def test_engine_adapter(ctx: TestContext):
3344
def test_server_version_psycopg(ctx: TestContext):
3445
assert isinstance(ctx.engine_adapter, PostgresEngineAdapter)
3546
assert ctx.engine_adapter.server_version != (0, 0)
47+
48+
49+
def test_janitor_drop_cascade(ctx: TestContext, tmp_path: Path) -> None:
50+
"""
51+
Scenario:
52+
Ensure that cleaning up expired table snapshots also cleans up any unexpired view snapshots that depend on them
53+
- We create a A (table) <- B (view)
54+
- In dev, we modify A - triggers new version of A and a dev preview of B that both expire in 7 days
55+
- We advance time by 3 days
56+
- In dev, we modify B - triggers a new version of B that depends on A but expires 3 days after A
57+
- In dev, we create B(view) <- C(view) and B(view) <- D(table)
58+
- We advance time by 5 days so that A has reached its expiry but B, C and D have not
59+
- We expire dev so that none of these snapshots are promoted and are thus targets for cleanup
60+
- We run the janitor
61+
62+
Expected outcome:
63+
- All the dev versions of A and B should be dropped
64+
- C should be dropped as well because it's a view that depends on B which was dropped
65+
- D should not be dropped because while it depends on B which was dropped, it's a table so is still valid after B is dropped
66+
- We should NOT get a 'ERROR: cannot drop table x because other objects depend on it'
67+
68+
Note that the references in state to the views that were cascade-dropped by postgres will still exist; this is considered ok
69+
as applying a plan will recreate the physical objects
70+
"""
71+
72+
def _all_snapshot_ids(context: Context) -> t.List[SnapshotId]:
73+
assert isinstance(context.state_sync, CachingStateSync)
74+
assert isinstance(context.state_sync.state_sync, EngineAdapterStateSync)
75+
76+
return [
77+
SnapshotId(name=name, identifier=identifier)
78+
for name, identifier in context.state_sync.state_sync.engine_adapter.fetchall(
79+
"select name, identifier from sqlmesh._snapshots"
80+
)
81+
]
82+
83+
models_dir = tmp_path / "models"
84+
models_dir.mkdir()
85+
schema = exp.to_table(ctx.schema(TEST_SCHEMA)).this
86+
87+
(models_dir / "model_a.sql").write_text(f"""
88+
MODEL (
89+
name {schema}.model_a,
90+
kind FULL
91+
);
92+
SELECT 1 as a, 2 as b;
93+
""")
94+
95+
(models_dir / "model_b.sql").write_text(f"""
96+
MODEL (
97+
name {schema}.model_b,
98+
kind VIEW
99+
);
100+
SELECT a from {schema}.model_a;
101+
""")
102+
103+
def _mutate_config(gateway: str, config: Config):
104+
config.gateways[gateway].state_connection = DuckDBConnectionConfig(
105+
database=str(tmp_path / "state.db")
106+
)
107+
108+
with time_machine.travel("2020-01-01 00:00:00"):
109+
sqlmesh = ctx.create_context(
110+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
111+
)
112+
sqlmesh.plan(auto_apply=True)
113+
114+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
115+
# expiry is last updated + ttl
116+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
117+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-01"
118+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-08"
119+
120+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
121+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
122+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-01"
123+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-08"
124+
125+
model_a_prod_snapshot = model_a_snapshot
126+
model_b_prod_snapshot = model_b_snapshot
127+
128+
# move forward 1 days
129+
# new dev environment - touch models to create new snapshots
130+
# model a / b expiry in prod should remain unmodified
131+
# model a / b expiry in dev should be as at today
132+
with time_machine.travel("2020-01-02 00:00:00"):
133+
(models_dir / "model_a.sql").write_text(f"""
134+
MODEL (
135+
name {schema}.model_a,
136+
kind FULL
137+
);
138+
SELECT 1 as a, 2 as b, 3 as c;
139+
""")
140+
141+
sqlmesh = ctx.create_context(
142+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
143+
)
144+
sqlmesh.plan(environment="dev", auto_apply=True)
145+
146+
# should now have 4 snapshots in state - 2x model a and 2x model b
147+
# the new model b is a dev preview because its upstream model changed
148+
all_snapshot_ids = _all_snapshot_ids(sqlmesh)
149+
assert len(all_snapshot_ids) == 4
150+
assert len([s for s in all_snapshot_ids if "model_a" in s.name]) == 2
151+
assert len([s for s in all_snapshot_ids if "model_b" in s.name]) == 2
152+
153+
# context just has the two latest
154+
assert len(sqlmesh.snapshots) == 2
155+
156+
# these expire 1 day later than what's in prod
157+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
158+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
159+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
160+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
161+
162+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
163+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
164+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-02"
165+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-09"
166+
167+
# move forward 3 days
168+
# touch model b in dev but leave model a
169+
# this bumps the model b expiry but model a remains unchanged, so will expire before model b even though model b depends on it
170+
with time_machine.travel("2020-01-05 00:00:00"):
171+
(models_dir / "model_b.sql").write_text(f"""
172+
MODEL (
173+
name {schema}.model_b,
174+
kind VIEW
175+
);
176+
SELECT a, 'b' as b from {schema}.model_a;
177+
""")
178+
179+
(models_dir / "model_c.sql").write_text(f"""
180+
MODEL (
181+
name {schema}.model_c,
182+
kind VIEW
183+
);
184+
SELECT a, 'c' as c from {schema}.model_b;
185+
""")
186+
187+
(models_dir / "model_d.sql").write_text(f"""
188+
MODEL (
189+
name {schema}.model_d,
190+
kind FULL
191+
);
192+
SELECT a, 'd' as d from {schema}.model_b;
193+
""")
194+
195+
sqlmesh = ctx.create_context(
196+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
197+
)
198+
sqlmesh.plan(environment="dev", auto_apply=True)
199+
200+
# should now have 7 snapshots in state - 2x model a, 3x model b, 1x model c and 1x model d
201+
all_snapshot_ids = _all_snapshot_ids(sqlmesh)
202+
assert len(all_snapshot_ids) == 7
203+
assert len([s for s in all_snapshot_ids if "model_a" in s.name]) == 2
204+
assert len([s for s in all_snapshot_ids if "model_b" in s.name]) == 3
205+
assert len([s for s in all_snapshot_ids if "model_c" in s.name]) == 1
206+
assert len([s for s in all_snapshot_ids if "model_d" in s.name]) == 1
207+
208+
# context just has the 4 latest
209+
assert len(sqlmesh.snapshots) == 4
210+
211+
# model a expiry should not have changed
212+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
213+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
214+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
215+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
216+
217+
# model b should now expire well after model a
218+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
219+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
220+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-05"
221+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-12"
222+
223+
# model c should expire at the same time as model b
224+
model_c_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_c" in n)
225+
assert to_ds(model_c_snapshot.updated_ts) == to_ds(model_b_snapshot.updated_ts)
226+
assert to_ds(model_c_snapshot.expiration_ts) == to_ds(model_b_snapshot.expiration_ts)
227+
228+
# model d should expire at the same time as model b
229+
model_d_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_d" in n)
230+
assert to_ds(model_d_snapshot.updated_ts) == to_ds(model_b_snapshot.updated_ts)
231+
assert to_ds(model_d_snapshot.expiration_ts) == to_ds(model_b_snapshot.expiration_ts)
232+
233+
# move forward to date where after model a has expired but before model b has expired
234+
# invalidate dev to trigger cleanups
235+
# run janitor
236+
# - table model a is expired so will be cleaned up and this will cascade to view model b
237+
# - view model b is not expired, but because it got cascaded to, this will cascade again to view model c
238+
# - table model d is not a view, so even though its parent view model b got dropped, it doesn't need to be dropped
239+
with time_machine.travel("2020-01-10 00:00:00"):
240+
sqlmesh = ctx.create_context(
241+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
242+
)
243+
244+
before_snapshot_ids = _all_snapshot_ids(sqlmesh)
245+
246+
before_objects = ctx.get_metadata_results(f"sqlmesh__{schema}")
247+
assert set(before_objects.tables) == set(
248+
[
249+
exp.to_table(s.table_name()).text("this")
250+
for s in (model_a_prod_snapshot, model_a_snapshot, model_d_snapshot)
251+
]
252+
)
253+
assert set(before_objects.views).issuperset(
254+
[
255+
exp.to_table(s.table_name()).text("this")
256+
for s in (model_b_prod_snapshot, model_b_snapshot, model_c_snapshot)
257+
]
258+
)
259+
260+
sqlmesh.invalidate_environment("dev")
261+
sqlmesh.run_janitor(ignore_ttl=False)
262+
263+
after_snapshot_ids = _all_snapshot_ids(sqlmesh)
264+
265+
assert len(before_snapshot_ids) != len(after_snapshot_ids)
266+
267+
# Everything should be left in state except the model_a snapshot, which expired
268+
assert set(after_snapshot_ids) == set(before_snapshot_ids) - set(
269+
[model_a_snapshot.snapshot_id]
270+
)
271+
272+
# In the db, there should be:
273+
# - the two original snapshots that were in prod, table model_a and view model_b
274+
# - model d, even though it's not promoted in any environment, because it's a table snapshot that hasn't expired yet
275+
# the view snapshots that depended on model_a should be gone due to the cascading delete
276+
after_objects = ctx.get_metadata_results(f"sqlmesh__{schema}")
277+
assert set(after_objects.tables) == set(
278+
[
279+
exp.to_table(s.table_name()).text("this")
280+
for s in (model_a_prod_snapshot, model_d_snapshot)
281+
]
282+
)
283+
assert after_objects.views == [
284+
exp.to_table(model_b_prod_snapshot.table_name()).text("this")
285+
]

tests/core/test_snapshot_evaluator.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,8 @@ def create_and_cleanup(name: str, dev_table_only: bool):
440440
snapshot = create_and_cleanup("catalog.test_schema.test_model", True)
441441
adapter_mock.get_data_object.assert_not_called()
442442
adapter_mock.drop_table.assert_called_once_with(
443-
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
443+
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev",
444+
cascade=True,
444445
)
445446
adapter_mock.reset_mock()
446447

@@ -449,9 +450,10 @@ def create_and_cleanup(name: str, dev_table_only: bool):
449450
adapter_mock.drop_table.assert_has_calls(
450451
[
451452
call(
452-
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
453+
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev",
454+
cascade=True,
453455
),
454-
call(f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}"),
456+
call(f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}", cascade=True),
455457
]
456458
)
457459
adapter_mock.reset_mock()
@@ -460,8 +462,11 @@ def create_and_cleanup(name: str, dev_table_only: bool):
460462
adapter_mock.get_data_object.assert_not_called()
461463
adapter_mock.drop_table.assert_has_calls(
462464
[
463-
call(f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev"),
464-
call(f"sqlmesh__default.test_model__{snapshot.version}"),
465+
call(
466+
f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev",
467+
cascade=True,
468+
),
469+
call(f"sqlmesh__default.test_model__{snapshot.version}", cascade=True),
465470
]
466471
)
467472

@@ -4198,10 +4203,10 @@ def test_multiple_engine_cleanup(snapshot: Snapshot, adapters, make_snapshot):
41984203

41994204
# The clean up will happen using the specific gateway the model was created with
42004205
engine_adapters["default"].drop_table.assert_called_once_with(
4201-
f"sqlmesh__db.db__model__{snapshot.version}__dev"
4206+
f"sqlmesh__db.db__model__{snapshot.version}__dev", cascade=True
42024207
)
42034208
engine_adapters["secondary"].drop_table.assert_called_once_with(
4204-
f"sqlmesh__test_schema.test_schema__test_model__{snapshot_2.version}__dev"
4209+
f"sqlmesh__test_schema.test_schema__test_model__{snapshot_2.version}__dev", cascade=True
42054210
)
42064211

42074212

0 commit comments

Comments
 (0)