Skip to content

Commit 0718546

Browse files
committed
Fix: Use drop cascade in janitor
1 parent eed4c26 commit 0718546

File tree

5 files changed

+274
-14
lines changed

5 files changed

+274
-14
lines changed

sqlmesh/core/engine_adapter/athena.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,13 +314,13 @@ def _build_table_properties_exp(
314314

315315
return None
316316

317-
def drop_table(self, table_name: TableName, exists: bool = True) -> None:
317+
def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
318318
table = exp.to_table(table_name)
319319

320320
if self._query_table_type(table) == "hive":
321321
self._truncate_table(table)
322322

323-
return super().drop_table(table_name=table, exists=exists)
323+
return super().drop_table(table_name=table, exists=exists, **kwargs)
324324

325325
def _truncate_table(self, table_name: TableName) -> None:
326326
table = exp.to_table(table_name)

sqlmesh/core/engine_adapter/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -959,14 +959,14 @@ def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool =
959959
f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'"
960960
)
961961

962-
def drop_table(self, table_name: TableName, exists: bool = True) -> None:
962+
def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
963963
"""Drops a table.
964964
965965
Args:
966966
table_name: The name of the table to drop.
967967
exists: If exists, defaults to True.
968968
"""
969-
self._drop_object(name=table_name, exists=exists)
969+
self._drop_object(name=table_name, exists=exists, **kwargs)
970970

971971
def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None:
972972
"""Drops a managed table.

sqlmesh/core/snapshot/evaluator.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,6 +1082,10 @@ def _cleanup_snapshot(
10821082
table_name,
10831083
is_table_deployable=is_table_deployable,
10841084
physical_schema=snapshot.physical_schema,
1085+
# we need to set cascade=true or we will get a 'can't drop because other objects depend on it'-style
1086+
# error on engines that enforce referential integrity, such as Postgres
1087+
# this situation can happen when a snapshot expires but downstream view snapshots that reference it have not yet expired
1088+
cascade=True,
10851089
)
10861090

10871091
if on_complete is not None:
@@ -1597,7 +1601,7 @@ def migrate(
15971601

15981602
def delete(self, name: str, **kwargs: t.Any) -> None:
15991603
_check_table_db_is_physical_schema(name, kwargs["physical_schema"])
1600-
self.adapter.drop_table(name)
1604+
self.adapter.drop_table(name, cascade=kwargs.pop("cascade", False))
16011605
logger.info("Dropped table '%s'", name)
16021606

16031607

@@ -2028,15 +2032,16 @@ def migrate(
20282032
)
20292033

20302034
def delete(self, name: str, **kwargs: t.Any) -> None:
2035+
cascade = kwargs.pop("cascade", False)
20312036
try:
2032-
self.adapter.drop_view(name)
2037+
self.adapter.drop_view(name, cascade=cascade)
20332038
except Exception:
20342039
logger.debug(
20352040
"Failed to drop view '%s'. Trying to drop the materialized view instead",
20362041
name,
20372042
exc_info=True,
20382043
)
2039-
self.adapter.drop_view(name, materialized=True)
2044+
self.adapter.drop_view(name, materialized=True, cascade=cascade)
20402045
logger.info("Dropped view '%s'", name)
20412046

20422047
def _is_materialized_view(self, model: Model) -> bool:

tests/core/engine_adapter/integration/test_integration_postgres.py

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
import typing as t
22
import pytest
33
from pytest import FixtureRequest
4+
from pathlib import Path
45
from sqlmesh.core.engine_adapter import PostgresEngineAdapter
6+
from sqlmesh.core.config import Config, DuckDBConnectionConfig
7+
from tests.core.engine_adapter.integration import TestContext
8+
import time_machine
9+
from datetime import timedelta
10+
from sqlmesh.utils.date import to_ds
11+
from sqlglot import exp
12+
from sqlmesh.core.context import Context
13+
from sqlmesh.core.state_sync import CachingStateSync, EngineAdapterStateSync
14+
from sqlmesh.core.snapshot.definition import SnapshotId
515

616
from tests.core.engine_adapter.integration import (
717
TestContext,
818
generate_pytest_params,
919
ENGINES_BY_NAME,
1020
IntegrationTestEngine,
21+
TEST_SCHEMA,
1122
)
1223

1324

@@ -33,3 +44,242 @@ def test_engine_adapter(ctx: TestContext):
3344
def test_server_version_psycopg(ctx: TestContext):
3445
assert isinstance(ctx.engine_adapter, PostgresEngineAdapter)
3546
assert ctx.engine_adapter.server_version != (0, 0)
47+
48+
49+
def test_janitor_drop_cascade(ctx: TestContext, tmp_path: Path) -> None:
50+
"""
51+
Scenario:
52+
Ensure that cleaning up expired table snapshots also cleans up any unexpired view snapshots that depend on them
53+
- We create a A (table) <- B (view)
54+
- In dev, we modify A - triggers new version of A and a dev preview of B that both expire in 7 days
55+
- We advance time by 3 days
56+
- In dev, we modify B - triggers a new version of B that depends on A but expires 3 days after A
57+
- In dev, we create B(view) <- C(view) and B(view) <- D(table)
58+
- We advance time by 5 days so that A has reached its expiry but B, C and D have not
59+
- We expire dev so that none of these snapshots are promoted and are thus targets for cleanup
60+
- We run the janitor
61+
62+
Expected outcome:
63+
- All the dev versions of A and B should be dropped
64+
- C should be dropped as well because it's a view that depends on B which was dropped
65+
- D should not be dropped because while it depends on B which was dropped, it's a table so it is still valid after B is dropped
66+
- We should NOT get a 'ERROR: cannot drop table x because other objects depend on it'
67+
68+
Note that the references in state to the views that were cascade-dropped by postgres will still exist; this is considered OK
69+
as applying a plan will recreate the physical objects
70+
"""
71+
72+
def _all_snapshot_ids(context: Context) -> t.List[SnapshotId]:
73+
assert isinstance(context.state_sync, CachingStateSync)
74+
assert isinstance(context.state_sync.state_sync, EngineAdapterStateSync)
75+
76+
return [
77+
SnapshotId(name=name, identifier=identifier)
78+
for name, identifier in context.state_sync.state_sync.engine_adapter.fetchall(
79+
"select name, identifier from sqlmesh._snapshots"
80+
)
81+
]
82+
83+
models_dir = tmp_path / "models"
84+
models_dir.mkdir()
85+
schema = exp.to_table(ctx.schema(TEST_SCHEMA)).this
86+
87+
(models_dir / "model_a.sql").write_text(f"""
88+
MODEL (
89+
name {schema}.model_a,
90+
kind FULL
91+
);
92+
SELECT 1 as a, 2 as b;
93+
""")
94+
95+
(models_dir / "model_b.sql").write_text(f"""
96+
MODEL (
97+
name {schema}.model_b,
98+
kind VIEW
99+
);
100+
SELECT a from {schema}.model_a;
101+
""")
102+
103+
def _mutate_config(gateway: str, config: Config):
104+
config.gateways[gateway].state_connection = DuckDBConnectionConfig(
105+
database=str(tmp_path / "state.db")
106+
)
107+
108+
with time_machine.travel("2020-01-01 00:00:00"):
109+
sqlmesh = ctx.create_context(
110+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
111+
)
112+
sqlmesh.plan(auto_apply=True)
113+
114+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
115+
# expiry is last updated + ttl
116+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
117+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-01"
118+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-08"
119+
120+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
121+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
122+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-01"
123+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-08"
124+
125+
model_a_prod_snapshot = model_a_snapshot
126+
model_b_prod_snapshot = model_b_snapshot
127+
128+
# move forward 1 day
129+
# new dev environment - touch models to create new snapshots
130+
# model a / b expiry in prod should remain unmodified
131+
# model a / b expiry in dev should be as at today
132+
with time_machine.travel("2020-01-02 00:00:00"):
133+
(models_dir / "model_a.sql").write_text(f"""
134+
MODEL (
135+
name {schema}.model_a,
136+
kind FULL
137+
);
138+
SELECT 1 as a, 2 as b, 3 as c;
139+
""")
140+
141+
sqlmesh = ctx.create_context(
142+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
143+
)
144+
sqlmesh.plan(environment="dev", auto_apply=True)
145+
146+
# should now have 4 snapshots in state - 2x model a and 2x model b
147+
# the new model b is a dev preview because its upstream model changed
148+
all_snapshot_ids = _all_snapshot_ids(sqlmesh)
149+
assert len(all_snapshot_ids) == 4
150+
assert len([s for s in all_snapshot_ids if "model_a" in s.name]) == 2
151+
assert len([s for s in all_snapshot_ids if "model_b" in s.name]) == 2
152+
153+
# context just has the two latest
154+
assert len(sqlmesh.snapshots) == 2
155+
156+
# these expire 1 day later than what's in prod
157+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
158+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
159+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
160+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
161+
162+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
163+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
164+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-02"
165+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-09"
166+
167+
# move forward 3 days
168+
# touch model b in dev but leave model a
169+
# this bumps the model b expiry but model a remains unchanged, so will expire before model b even though model b depends on it
170+
with time_machine.travel("2020-01-05 00:00:00"):
171+
(models_dir / "model_b.sql").write_text(f"""
172+
MODEL (
173+
name {schema}.model_b,
174+
kind VIEW
175+
);
176+
SELECT a, 'b' as b from {schema}.model_a;
177+
""")
178+
179+
(models_dir / "model_c.sql").write_text(f"""
180+
MODEL (
181+
name {schema}.model_c,
182+
kind VIEW
183+
);
184+
SELECT a, 'c' as c from {schema}.model_b;
185+
""")
186+
187+
(models_dir / "model_d.sql").write_text(f"""
188+
MODEL (
189+
name {schema}.model_d,
190+
kind FULL
191+
);
192+
SELECT a, 'd' as d from {schema}.model_b;
193+
""")
194+
195+
sqlmesh = ctx.create_context(
196+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
197+
)
198+
sqlmesh.plan(environment="dev", auto_apply=True)
199+
200+
# should now have 7 snapshots in state - 2x model a, 3x model b, 1x model c and 1x model d
201+
all_snapshot_ids = _all_snapshot_ids(sqlmesh)
202+
assert len(all_snapshot_ids) == 7
203+
assert len([s for s in all_snapshot_ids if "model_a" in s.name]) == 2
204+
assert len([s for s in all_snapshot_ids if "model_b" in s.name]) == 3
205+
assert len([s for s in all_snapshot_ids if "model_c" in s.name]) == 1
206+
assert len([s for s in all_snapshot_ids if "model_d" in s.name]) == 1
207+
208+
# context just has the 4 latest
209+
assert len(sqlmesh.snapshots) == 4
210+
211+
# model a expiry should not have changed
212+
model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
213+
assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
214+
assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
215+
assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
216+
217+
# model b should now expire well after model a
218+
model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
219+
assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
220+
assert to_ds(model_b_snapshot.updated_ts) == "2020-01-05"
221+
assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-12"
222+
223+
# model c should expire at the same time as model b
224+
model_c_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_c" in n)
225+
assert to_ds(model_c_snapshot.updated_ts) == to_ds(model_b_snapshot.updated_ts)
226+
assert to_ds(model_c_snapshot.expiration_ts) == to_ds(model_b_snapshot.expiration_ts)
227+
228+
# model d should expire at the same time as model b
229+
model_d_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_d" in n)
230+
assert to_ds(model_d_snapshot.updated_ts) == to_ds(model_b_snapshot.updated_ts)
231+
assert to_ds(model_d_snapshot.expiration_ts) == to_ds(model_b_snapshot.expiration_ts)
232+
233+
# move forward to date where after model a has expired but before model b has expired
234+
# invalidate dev to trigger cleanups
235+
# run janitor
236+
# - table model a is expired so will be cleaned up and this will cascade to view model b
237+
# - view model b is not expired, but because it got cascaded to, this will cascade again to view model c
238+
# - table model d is not a view, so even though its parent view model b got dropped, it doesn't need to be dropped
239+
with time_machine.travel("2020-01-10 00:00:00"):
240+
sqlmesh = ctx.create_context(
241+
path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
242+
)
243+
244+
before_snapshot_ids = _all_snapshot_ids(sqlmesh)
245+
246+
before_objects = ctx.get_metadata_results(f"sqlmesh__{schema}")
247+
assert set(before_objects.tables) == set(
248+
[
249+
exp.to_table(s.table_name()).text("this")
250+
for s in (model_a_prod_snapshot, model_a_snapshot, model_d_snapshot)
251+
]
252+
)
253+
assert set(before_objects.views).issuperset(
254+
[
255+
exp.to_table(s.table_name()).text("this")
256+
for s in (model_b_prod_snapshot, model_b_snapshot, model_c_snapshot)
257+
]
258+
)
259+
260+
sqlmesh.invalidate_environment("dev")
261+
sqlmesh.run_janitor(ignore_ttl=False)
262+
263+
after_snapshot_ids = _all_snapshot_ids(sqlmesh)
264+
265+
assert len(before_snapshot_ids) != len(after_snapshot_ids)
266+
267+
# Everything should be left in state except the model_a snapshot, which expired
268+
assert set(after_snapshot_ids) == set(before_snapshot_ids) - set(
269+
[model_a_snapshot.snapshot_id]
270+
)
271+
272+
# In the db, there should be:
273+
# - the two original snapshots that were in prod, table model_a and view model_b
274+
# - model d, even though its not promoted in any environment, because it's a table snapshot that hasnt expired yet
275+
# the view snapshots that depended on model_a should be gone due to the cascading delete
276+
after_objects = ctx.get_metadata_results(f"sqlmesh__{schema}")
277+
assert set(after_objects.tables) == set(
278+
[
279+
exp.to_table(s.table_name()).text("this")
280+
for s in (model_a_prod_snapshot, model_d_snapshot)
281+
]
282+
)
283+
assert after_objects.views == [
284+
exp.to_table(model_b_prod_snapshot.table_name()).text("this")
285+
]

tests/core/test_snapshot_evaluator.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -439,26 +439,31 @@ def create_and_cleanup(name: str, dev_table_only: bool):
439439

440440
snapshot = create_and_cleanup("catalog.test_schema.test_model", True)
441441
adapter_mock.drop_table.assert_called_once_with(
442-
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
442+
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev",
443+
cascade=True,
443444
)
444445
adapter_mock.reset_mock()
445446

446447
snapshot = create_and_cleanup("test_schema.test_model", False)
447448
adapter_mock.drop_table.assert_has_calls(
448449
[
449450
call(
450-
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
451+
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev",
452+
cascade=True,
451453
),
452-
call(f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}"),
454+
call(f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}", cascade=True),
453455
]
454456
)
455457
adapter_mock.reset_mock()
456458

457459
snapshot = create_and_cleanup("test_model", False)
458460
adapter_mock.drop_table.assert_has_calls(
459461
[
460-
call(f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev"),
461-
call(f"sqlmesh__default.test_model__{snapshot.version}"),
462+
call(
463+
f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev",
464+
cascade=True,
465+
),
466+
call(f"sqlmesh__default.test_model__{snapshot.version}", cascade=True),
462467
]
463468
)
464469

@@ -4107,10 +4112,10 @@ def test_multiple_engine_cleanup(snapshot: Snapshot, adapters, make_snapshot):
41074112

41084113
# The clean up will happen using the specific gateway the model was created with
41094114
engine_adapters["default"].drop_table.assert_called_once_with(
4110-
f"sqlmesh__db.db__model__{snapshot.version}__dev"
4115+
f"sqlmesh__db.db__model__{snapshot.version}__dev", cascade=True
41114116
)
41124117
engine_adapters["secondary"].drop_table.assert_called_once_with(
4113-
f"sqlmesh__test_schema.test_schema__test_model__{snapshot_2.version}__dev"
4118+
f"sqlmesh__test_schema.test_schema__test_model__{snapshot_2.version}__dev", cascade=True
41144119
)
41154120

41164121

0 commit comments

Comments
 (0)