diff --git a/sqlmesh/core/state_sync/db/snapshot.py b/sqlmesh/core/state_sync/db/snapshot.py index 5b6d96d970..3e16481487 100644 --- a/sqlmesh/core/state_sync/db/snapshot.py +++ b/sqlmesh/core/state_sync/db/snapshot.py @@ -30,6 +30,7 @@ SnapshotId, SnapshotFingerprint, SnapshotChangeCategory, + snapshots_to_dag, ) from sqlmesh.utils.migration import index_text_type, blob_text_type from sqlmesh.utils.date import now_timestamp, TimeLike, now, to_timestamp @@ -221,21 +222,65 @@ def _get_expired_snapshots( ignore_ttl: bool = False, ) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]: expired_query = exp.select("name", "identifier", "version").from_(self.snapshots_table) + expired_record_count = 0 if not ignore_ttl: expired_query = expired_query.where( (exp.column("updated_ts") + exp.column("ttl_ms")) <= current_ts ) - expired_candidates = { - SnapshotId(name=name, identifier=identifier): SnapshotNameVersion( - name=name, version=version - ) - for name, identifier, version in fetchall(self.engine_adapter, expired_query) - } - if not expired_candidates: + if result := fetchone( + self.engine_adapter, + exp.select("count(*)").from_(expired_query.subquery().as_("expired_snapshots")), + ): + expired_record_count = result[0] + + # we need to include views even if they havent expired in case one depends on a table or view that /has/ expired + # but we only need to do this if there are expired objects to begin with + if expired_record_count > 0: + expired_query = t.cast( + exp.Select, expired_query.or_(exp.column("kind_name").eq(ModelKindName.VIEW)) + ) + + candidates = {} + if ignore_ttl or expired_record_count > 0: + candidates = { + SnapshotId(name=name, identifier=identifier): SnapshotNameVersion( + name=name, version=version + ) + for name, identifier, version in fetchall(self.engine_adapter, expired_query) + } + + if not candidates: return set(), [] + expired_candidates: t.Dict[SnapshotId, SnapshotNameVersion] = {} + + if ignore_ttl: + expired_candidates = candidates + else: + # Fetch full snapshots because we need the dependency relationship + full_candidates = self.get_snapshots(candidates.keys()) + + dag = snapshots_to_dag(full_candidates.values()) + + # Include any non-expired views that depend on expired tables + for snapshot_id in dag: + snapshot = full_candidates.get(snapshot_id, None) + + if not snapshot: + continue + + if snapshot.expiration_ts <= current_ts: + # All expired snapshots should be included regardless + expired_candidates[snapshot.snapshot_id] = snapshot.name_version + elif snapshot.model_kind_name == ModelKindName.VIEW: + # Check if any of our parents are in the expired list + # This works because we traverse the dag in topological order, so if our parent either directly + # expired or indirectly expired because /its/ parent expired, it will still be in the expired_snapshots list + if any(parent.snapshot_id in expired_candidates for parent in snapshot.parents): + expired_candidates[snapshot.snapshot_id] = snapshot.name_version + promoted_snapshot_ids = { snapshot.snapshot_id for environment in environments @@ -253,37 +298,39 @@ def _is_snapshot_used(snapshot: SharedVersionSnapshot) -> bool: unique_expired_versions, batch_size=self.SNAPSHOT_BATCH_SIZE ) cleanup_targets = [] - expired_snapshot_ids = set() + expired_sv_snapshot_ids = set() for versions_batch in version_batches: - snapshots = self._get_snapshots_with_same_version(versions_batch) + sv_snapshots = self._get_snapshots_with_same_version(versions_batch) snapshots_by_version = defaultdict(set) snapshots_by_dev_version = defaultdict(set) - for s in snapshots: + for s in sv_snapshots: snapshots_by_version[(s.name, s.version)].add(s.snapshot_id) snapshots_by_dev_version[(s.name, s.dev_version)].add(s.snapshot_id) - expired_snapshots = [s for s in snapshots if not _is_snapshot_used(s)] - expired_snapshot_ids.update([s.snapshot_id for s in expired_snapshots]) + expired_sv_snapshots = [s for s in sv_snapshots if not _is_snapshot_used(s)] + expired_sv_snapshot_ids.update([s.snapshot_id for s in expired_sv_snapshots]) - for snapshot in expired_snapshots: - shared_version_snapshots = snapshots_by_version[(snapshot.name, snapshot.version)] - shared_version_snapshots.discard(snapshot.snapshot_id) + for sv_snapshot in expired_sv_snapshots: + shared_version_snapshots = snapshots_by_version[ + (sv_snapshot.name, sv_snapshot.version) + ] + shared_version_snapshots.discard(sv_snapshot.snapshot_id) shared_dev_version_snapshots = snapshots_by_dev_version[ - (snapshot.name, snapshot.dev_version) + (sv_snapshot.name, sv_snapshot.dev_version) ] - shared_dev_version_snapshots.discard(snapshot.snapshot_id) + shared_dev_version_snapshots.discard(sv_snapshot.snapshot_id) if not shared_dev_version_snapshots: cleanup_targets.append( SnapshotTableCleanupTask( - snapshot=snapshot.full_snapshot.table_info, + snapshot=sv_snapshot.full_snapshot.table_info, dev_table_only=bool(shared_version_snapshots), ) ) - return expired_snapshot_ids, cleanup_targets + return expired_sv_snapshot_ids, cleanup_targets def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None: """Deletes snapshots. diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index 80ce7ac18d..bab8c9490e 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -13,13 +13,14 @@ import pandas as pd # noqa: TID253 import pytest import pytz +import time_machine from sqlglot import exp, parse_one from sqlglot.optimizer.normalize_identifiers import normalize_identifiers from sqlmesh import Config, Context from sqlmesh.cli.project_init import init_example_project from sqlmesh.core.config import load_config_from_paths -from sqlmesh.core.config.connection import ConnectionConfig +from sqlmesh.core.config.connection import ConnectionConfig, DuckDBConnectionConfig import sqlmesh.core.dialect as d from sqlmesh.core.dialect import select_from_values from sqlmesh.core.model import Model, load_sql_based_model @@ -27,9 +28,10 @@ from sqlmesh.core.engine_adapter.mixins import RowDiffMixin, LogicalMergeMixin from sqlmesh.core.model.definition import create_sql_model from sqlmesh.core.plan import Plan +from sqlmesh.core.state_sync.cache import CachingStateSync from sqlmesh.core.state_sync.db import EngineAdapterStateSync -from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory -from sqlmesh.utils.date import now, to_date, to_time_column +from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory, SnapshotId +from sqlmesh.utils.date import now, to_date, to_time_column, to_ds from sqlmesh.core.table_diff import TableDiff from sqlmesh.utils.errors import SQLMeshError from sqlmesh.utils.pydantic import PydanticModel @@ -2799,3 +2801,224 @@ def test_identifier_length_limit(ctx: TestContext): match=re.escape(match), ): adapter.create_table(long_table_name, {"col": exp.DataType.build("int")}) + + +def test_janitor_drops_downstream_unexpired_hard_dependencies( + ctx: TestContext, tmp_path: pathlib.Path +): + """ + Scenario: + + Ensure that cleaning up expired table snapshots also cleans up any unexpired view snapshots that depend on them + + - We create a A (table) <- B (view) + - In dev, we modify A - triggers new version of A and a dev preview of B that both expire in 7 days + - We advance time by 3 days + - In dev, we modify B - triggers a new version of B that depends on A but expires 3 days after A + - In dev, we create B(view) <- C(view) and B(view) <- D(table) + - We advance time by 5 days so that A has reached its expiry but B, C and D have not + - We expire dev so that none of these snapshots are promoted and are thus targets for cleanup + - We run the janitor + + Expected outcome: + - All the dev versions of A and B should be dropped + - C should be dropped as well because it's a view that depends on B which was dropped + - D should not be dropped because while it depends on B which was dropped, it's a table so is still valid after B is dropped + - We should not get a 'ERROR: cannot drop table x because other objects depend on it' on engines that do schema binding + """ + + def _all_snapshot_ids(context: Context) -> t.List[SnapshotId]: + assert isinstance(context.state_sync, CachingStateSync) + assert isinstance(context.state_sync.state_sync, EngineAdapterStateSync) + + return [ + SnapshotId(name=name, identifier=identifier) + for name, identifier in context.state_sync.state_sync.engine_adapter.fetchall( + "select name, identifier from sqlmesh._snapshots" + ) + ] + + models_dir = tmp_path / "models" + models_dir.mkdir() + schema = exp.to_table(ctx.schema(TEST_SCHEMA)).this + + (models_dir / "model_a.sql").write_text(f""" + MODEL ( + name {schema}.model_a, + kind FULL + ); + + SELECT 1 as a, 2 as b; + """) + + (models_dir / "model_b.sql").write_text(f""" + MODEL ( + name {schema}.model_b, + kind VIEW + ); + + SELECT a from {schema}.model_a; + """) + + def _mutate_config(gateway: str, config: Config): + config.gateways[gateway].state_connection = DuckDBConnectionConfig( + database=str(tmp_path / "state.db") + ) + + with time_machine.travel("2020-01-01 00:00:00"): + sqlmesh = ctx.create_context( + path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False + ) + sqlmesh.plan(auto_apply=True) + + model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n) + # expiry is last updated + ttl + assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1) + assert to_ds(model_a_snapshot.updated_ts) == "2020-01-01" + assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-08" + + model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n) + assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1) + assert to_ds(model_b_snapshot.updated_ts) == "2020-01-01" + assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-08" + + model_a_prod_snapshot = model_a_snapshot + model_b_prod_snapshot = model_b_snapshot + + # move forward 1 days + # new dev environment - touch models to create new snapshots + # model a / b expiry in prod should remain unmodified + # model a / b expiry in dev should be as at today + with time_machine.travel("2020-01-02 00:00:00"): + (models_dir / "model_a.sql").write_text(f""" + MODEL ( + name {schema}.model_a, + kind FULL + ); + + SELECT 1 as a, 2 as b, 3 as c; + """) + + sqlmesh = ctx.create_context( + path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False + ) + sqlmesh.plan(environment="dev", auto_apply=True) + + # should now have 4 snapshots in state - 2x model a and 2x model b + # the new model b is a dev preview because its upstream model changed + all_snapshot_ids = _all_snapshot_ids(sqlmesh) + assert len(all_snapshot_ids) == 4 + assert len([s for s in all_snapshot_ids if "model_a" in s.name]) == 2 + assert len([s for s in all_snapshot_ids if "model_b" in s.name]) == 2 + + # context just has the two latest + assert len(sqlmesh.snapshots) == 2 + + # these expire 1 day later than what's in prod + model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n) + assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1) + assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02" + assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09" + + model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n) + assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1) + assert to_ds(model_b_snapshot.updated_ts) == "2020-01-02" + assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-09" + + # move forward 3 days + # touch model b in dev but leave model a + # this bumps the model b expiry but model a remains unchanged, so will expire before model b even though model b depends on it + with time_machine.travel("2020-01-05 00:00:00"): + (models_dir / "model_b.sql").write_text(f""" + MODEL ( + name {schema}.model_b, + kind VIEW + ); + + SELECT a, 'b' as b from {schema}.model_a; + """) + + (models_dir / "model_c.sql").write_text(f""" + MODEL ( + name {schema}.model_c, + kind VIEW + ); + + SELECT a, 'c' as c from {schema}.model_b; + """) + + (models_dir / "model_d.sql").write_text(f""" + MODEL ( + name {schema}.model_d, + kind FULL + ); + + SELECT a, 'd' as d from {schema}.model_b; + """) + + sqlmesh = ctx.create_context( + path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False + ) + sqlmesh.plan(environment="dev", auto_apply=True) + + # should now have 7 snapshots in state - 2x model a, 3x model b, 1x model c and 1x model d + all_snapshot_ids = _all_snapshot_ids(sqlmesh) + assert len(all_snapshot_ids) == 7 + assert len([s for s in all_snapshot_ids if "model_a" in s.name]) == 2 + assert len([s for s in all_snapshot_ids if "model_b" in s.name]) == 3 + assert len([s for s in all_snapshot_ids if "model_c" in s.name]) == 1 + assert len([s for s in all_snapshot_ids if "model_d" in s.name]) == 1 + + # context just has the 4 latest + assert len(sqlmesh.snapshots) == 4 + + # model a expiry should not have changed + model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n) + assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1) + assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02" + assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09" + + # model b should now expire well after model a + model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n) + assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1) + assert to_ds(model_b_snapshot.updated_ts) == "2020-01-05" + assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-12" + + # model c should expire at the same time as model b + model_c_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_c" in n) + assert to_ds(model_c_snapshot.updated_ts) == to_ds(model_b_snapshot.updated_ts) + assert to_ds(model_c_snapshot.expiration_ts) == to_ds(model_b_snapshot.expiration_ts) + + # model d should expire at the same time as model b + model_d_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_d" in n) + assert to_ds(model_d_snapshot.updated_ts) == to_ds(model_b_snapshot.updated_ts) + assert to_ds(model_d_snapshot.expiration_ts) == to_ds(model_b_snapshot.expiration_ts) + + # move forward to date where after model a has expired but before model b has expired + # invalidate dev to trigger cleanups + # run janitor + # - table model a is expired so will be cleaned up and this will cascade to view model b + # - view model b is not expired, but because it got cascaded to, this will cascade again to view model c + # - table model d is a not a view, so even though its parent view model b got dropped, it doesnt need to be dropped + with time_machine.travel("2020-01-10 00:00:00"): + sqlmesh = ctx.create_context( + path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False + ) + + before_snapshot_ids = _all_snapshot_ids(sqlmesh) + + sqlmesh.invalidate_environment("dev") + sqlmesh.run_janitor(ignore_ttl=False) + + after_snapshot_ids = _all_snapshot_ids(sqlmesh) + + assert len(before_snapshot_ids) != len(after_snapshot_ids) + + # all that's left should be the two original snapshots that were in prod and model d + assert set(after_snapshot_ids) == set( + [ + model_a_prod_snapshot.snapshot_id, + model_b_prod_snapshot.snapshot_id, + model_d_snapshot.snapshot_id, + ] + ) diff --git a/tests/core/state_sync/test_state_sync.py b/tests/core/state_sync/test_state_sync.py index eba948bd9a..56fd8af18c 100644 --- a/tests/core/state_sync/test_state_sync.py +++ b/tests/core/state_sync/test_state_sync.py @@ -1124,6 +1124,171 @@ def test_delete_expired_environments(state_sync: EngineAdapterStateSync, make_sn assert state_sync.get_environment_statements(env_a.name) == [] +def test_get_expired_snapshots_includes_downstream_view_snapshots( + state_sync: EngineAdapterStateSync, make_snapshot: t.Callable[..., Snapshot] +): + now_ts = now_timestamp() + + assert len(state_sync.get_expired_snapshots(now_ts)) == 0 + + # model_a: table snapshot + snapshot_a = make_snapshot( + SqlModel( + name="a", + kind="FULL", + query=parse_one("select a, ds"), + ), + ) + snapshot_a.ttl = "in 10 seconds" + snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING) + snapshot_a.updated_ts = now_ts - 15000 # now - 15 seconds = expired + + # model_b: view snapshot that depends on model_a table snapshot + # in an actual scenario, this could have been created a few days later so expires a few days after the snapshot it depends on + # unlike a table, a view ceases to be valid if the upstream table it points to is dropped + snapshot_b = make_snapshot( + SqlModel( + name="b", + kind="VIEW", + depends_on=["a"], + query=parse_one("select *, 'foo' as model_b from a"), + ), + nodes={'"a"': snapshot_a.model}, + ) + snapshot_b.ttl = "in 10 seconds" + snapshot_b.categorize_as(SnapshotChangeCategory.BREAKING) + snapshot_b.updated_ts = now_ts # now = not expired + assert snapshot_b.parents == (snapshot_a.snapshot_id,) + + # model_c: table snapshot that depends on model_a table snapshot but since its a table it will still work if model_a is dropped + # so should not be considered when cleaning up expired snapshots (as it has not expired) + snapshot_c = make_snapshot( + SqlModel( + name="c", + kind="FULL", + depends_on=["a"], + query=parse_one("select *, 'foo' as model_c from a"), + ), + nodes={'"a"': snapshot_a.model}, + ) + snapshot_c.ttl = "in 10 seconds" + snapshot_c.categorize_as(SnapshotChangeCategory.BREAKING) + snapshot_c.updated_ts = now_ts # now = not expired + assert snapshot_c.parents == (snapshot_a.snapshot_id,) + + # model_d: view snapshot with no dependency on model a, so should not be dropped if model a is dropped + snapshot_d = make_snapshot( + SqlModel( + name="d", + kind="VIEW", + query=parse_one("select 'model_d' as d"), + ), + ) + snapshot_d.ttl = "in 10 seconds" + snapshot_d.categorize_as(SnapshotChangeCategory.BREAKING) + snapshot_d.updated_ts = now_ts # now = not expired + + state_sync.push_snapshots([snapshot_a, snapshot_b, snapshot_c, snapshot_d]) + cleanup_targets = state_sync.get_expired_snapshots(now_ts) + + # snapshot_a should be cleaned up because it's expired + # snapshot_b is unexpired but should be cleaned up because snapshot_a is being cleaned up + # - since it's a view, it will stop working if the snapshot_a table it's pointing to is dropped, so thats why it needs to be dropped too + # - additionally, if it isnt dropped, some engines will throw "cannot drop because other objects depend on it" when trying to drop the snapshot_a table + # snapshot_c is unexpired and should not be cleaned up even though its upstream snapshot_a has expired, since it's a table it will still continue to work + # snapshot_d is unexpired and should also not be cleaned up. While it's a view, it doesnt depend on anything that *has* expired, so is not a target for cleanup + assert len(cleanup_targets) == 2 + + snapshot_a_cleanup = next( + (t for t in cleanup_targets if t.snapshot.snapshot_id == snapshot_a.snapshot_id), None + ) + assert snapshot_a_cleanup + + snapshot_b_cleanup = next( + (t for t in cleanup_targets if t.snapshot.snapshot_id == snapshot_b.snapshot_id), None + ) + assert snapshot_b_cleanup + + +def test_get_expired_snapshots_includes_downstream_transitive_view_snapshots( + state_sync: EngineAdapterStateSync, make_snapshot: t.Callable[..., Snapshot] +): + now_ts = now_timestamp() + + # model_a: table snapshot + snapshot_a = make_snapshot( + SqlModel( + name="a", + kind="FULL", + query=parse_one("select a, ds"), + ), + ttl="in 10 seconds", + ) + snapshot_a.updated_ts = now_ts - 15000 # now - 15 seconds = expired + assert not snapshot_a.parents + + # model_b: view snapshot that depends on model_a table snapshot + snapshot_b = make_snapshot( + SqlModel( + name="b", + kind="VIEW", + depends_on=["a"], + query=parse_one("select *, 'model_b' as m from a"), + ), + nodes={'"a"': snapshot_a.model}, + ttl="in 10 seconds", + ) + assert snapshot_b.parents == (snapshot_a.snapshot_id,) + + # model_c: view snapshot that depends on model_b view snapshot (i.e no direct dependency on the model_a table ansphot). + # if the model_b view is dropped, model_c should also be dropped because it's a view that depends on a view being dropped + snapshot_c = make_snapshot( + SqlModel( + name="c", + kind="VIEW", + depends_on=["b"], + query=parse_one("select *, 'model_c' as m from b"), + ), + nodes={'"a"': snapshot_a.model, '"b"': snapshot_b.model}, + ttl="in 10 seconds", + ) + assert snapshot_c.parents == (snapshot_b.snapshot_id,) + + # model_d: table snapshot that depends on model_b view snapshot (i.e no direct dependency on the model_a table snapshot). + # if the model_b view is dropped, model_d should NOT be dropped because it can still be queried afterwards (and has not expired) + snapshot_d = make_snapshot( + SqlModel( + name="d", + kind="FULL", + depends_on=["b"], + query=parse_one("select *, 'model_d' as m from b"), + ), + nodes={'"a"': snapshot_a.model, '"b"': snapshot_b.model}, + ttl="in 10 seconds", + ) + assert snapshot_d.parents == (snapshot_b.snapshot_id,) + + snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING) + snapshot_b.categorize_as(SnapshotChangeCategory.BREAKING) + snapshot_c.categorize_as(SnapshotChangeCategory.BREAKING) + snapshot_d.categorize_as(SnapshotChangeCategory.BREAKING) + + state_sync.push_snapshots([snapshot_a, snapshot_b, snapshot_c, snapshot_d]) + + cleanup_targets = state_sync.get_expired_snapshots(now_ts) + + # snapshot_a should be cleaned up because it's expired + # snapshot_b is unexpired but should be cleaned up because snapshot_a is being cleaned up + # snapshot_c is unexpired and should be cleaned up because snapshot_b is being cleaned up + # snapshot_d is unexpired and should not be cleaned up. Although it depends on snapshot_b which is being cleaned up, + # its a table and not a view so will not be invalid the second snapshot_b is cleaned up + assert len(cleanup_targets) == 3 + + assert set(t.snapshot.snapshot_id for t in cleanup_targets) == set( + [snapshot_a.snapshot_id, snapshot_b.snapshot_id, snapshot_c.snapshot_id] + ) + + def test_delete_expired_snapshots(state_sync: EngineAdapterStateSync, make_snapshot: t.Callable): now_ts = now_timestamp() @@ -1225,10 +1390,17 @@ def test_delete_expired_snapshots_batching( snapshot_b.snapshot_id, } - assert state_sync.delete_expired_snapshots() == [ - SnapshotTableCleanupTask(snapshot=snapshot_a.table_info, dev_table_only=False), - SnapshotTableCleanupTask(snapshot=snapshot_b.table_info, dev_table_only=False), - ] + deleted_snapshots = state_sync.delete_expired_snapshots() + assert len(deleted_snapshots) == 2 + + assert ( + SnapshotTableCleanupTask(snapshot=snapshot_a.table_info, dev_table_only=False) + in deleted_snapshots + ) + assert ( + SnapshotTableCleanupTask(snapshot=snapshot_b.table_info, dev_table_only=False) + in deleted_snapshots + ) assert not state_sync.get_snapshots(all_snapshots)