|
14 | 14 | from sqlmesh.utils.pydantic import PydanticModel |
15 | 15 | from sqlmesh.core.environment import Environment, EnvironmentStatements |
16 | 16 | from sqlmesh.utils.errors import SQLMeshError |
17 | | -from sqlmesh.core.snapshot import Snapshot |
| 17 | +from sqlmesh.core.snapshot import Snapshot, SnapshotEvaluator |
18 | 18 |
|
19 | 19 | if t.TYPE_CHECKING: |
20 | 20 | from sqlmesh.core.engine_adapter.base import EngineAdapter |
21 | | - from sqlmesh.core.state_sync.base import Versions |
| 21 | + from sqlmesh.core.state_sync.base import Versions, ExpiredSnapshotBatch, StateReader, StateSync |
22 | 22 |
|
23 | 23 | logger = logging.getLogger(__name__) |
24 | 24 |
|
| 25 | +EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE = 200 |
| 26 | + |
25 | 27 |
|
26 | 28 | def cleanup_expired_views( |
27 | 29 | default_adapter: EngineAdapter, |
@@ -215,3 +217,94 @@ def __iter__(self) -> t.Iterator[StateStreamContents]: |
215 | 217 | yield EnvironmentsChunk(environments) |
216 | 218 |
|
217 | 219 | return _StateStream() |
| 220 | + |
| 221 | + |
| 222 | +def for_each_expired_snapshot_batch( |
| 223 | + state_reader: StateReader, |
| 224 | + *, |
| 225 | + current_ts: int, |
| 226 | + ignore_ttl: bool = False, |
| 227 | + callback: t.Callable[[ExpiredSnapshotBatch], None], |
| 228 | + batch_size: t.Optional[int] = None, |
| 229 | +) -> int: |
| 230 | + """Iterate over expired snapshot batches and invoke callback for each batch. |
| 231 | +
|
| 232 | + Args: |
| 233 | + state_reader: StateReader instance to query expired snapshots from. |
| 234 | + current_ts: Timestamp used to evaluate expiration. |
| 235 | + ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced). |
| 236 | + callback: Function to invoke for each batch of expired snapshots. |
| 237 | + batch_size: Maximum number of snapshots to fetch per batch. |
| 238 | + """ |
| 239 | + from sqlmesh.core.state_sync.base import LowerBatchBoundary |
| 240 | + |
| 241 | + batch_size = batch_size if batch_size is not None else EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE |
| 242 | + batch_boundary = LowerBatchBoundary.init_batch_boundary(batch_size=batch_size) |
| 243 | + num_expired_snapshots = 0 |
| 244 | + |
| 245 | + while True: |
| 246 | + batch = state_reader.get_expired_snapshots( |
| 247 | + current_ts=current_ts, |
| 248 | + ignore_ttl=ignore_ttl, |
| 249 | + batch_boundary=batch_boundary, |
| 250 | + ) |
| 251 | + |
| 252 | + if not batch: |
| 253 | + return num_expired_snapshots |
| 254 | + |
| 255 | + callback(batch) |
| 256 | + |
| 257 | + batch_boundary = batch.batch_boundary.to_lower_batch_boundary(batch_size=batch_size) |
| 258 | + num_expired_snapshots += len(batch.expired_snapshot_ids) |
| 259 | + |
| 260 | + |
| 261 | +def delete_expired_snapshots( |
| 262 | + state_sync: StateSync, |
| 263 | + snapshot_evaluator: SnapshotEvaluator, |
| 264 | + *, |
| 265 | + current_ts: int, |
| 266 | + ignore_ttl: bool = False, |
| 267 | + batch_size: t.Optional[int] = None, |
| 268 | + console: t.Optional[Console] = None, |
| 269 | +) -> None: |
| 270 | + """Delete all expired snapshots in batches. |
| 271 | +
|
| 272 | + This helper function encapsulates the logic for deleting expired snapshots in batches, |
| 273 | + eliminating code duplication across different use cases. |
| 274 | +
|
| 275 | + Args: |
| 276 | + state_sync: StateSync instance to query and delete expired snapshots from. |
| 277 | + snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots. |
| 278 | + current_ts: Timestamp used to evaluate expiration. |
| 279 | + ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced). |
| 280 | + batch_size: Maximum number of snapshots to fetch per batch. |
| 281 | + console: Optional console for reporting progress. |
| 282 | +
|
| 283 | + Returns: |
| 284 | + The total number of deleted expired snapshots. |
| 285 | + """ |
| 286 | + |
| 287 | + def process_batch(batch: ExpiredSnapshotBatch) -> None: |
| 288 | + logger.info( |
| 289 | + "Processing batch of size %s and max_updated_ts of %s", |
| 290 | + len(batch.expired_snapshot_ids), |
| 291 | + batch.batch_boundary.updated_ts, |
| 292 | + ) |
| 293 | + snapshot_evaluator.cleanup( |
| 294 | + target_snapshots=batch.cleanup_tasks, |
| 295 | + on_complete=console.update_cleanup_progress if console else None, |
| 296 | + ) |
| 297 | + state_sync.delete_expired_snapshots( |
| 298 | + upper_batch_boundary=batch.batch_boundary.to_upper_batch_boundary(), |
| 299 | + ignore_ttl=ignore_ttl, |
| 300 | + ) |
| 301 | + logger.info("Cleaned up expired snapshots batch") |
| 302 | + |
| 303 | + num_deleted = for_each_expired_snapshot_batch( |
| 304 | + state_sync, |
| 305 | + current_ts=current_ts, |
| 306 | + ignore_ttl=ignore_ttl, |
| 307 | + callback=process_batch, |
| 308 | + batch_size=batch_size, |
| 309 | + ) |
| 310 | + logger.info("Cleaned up %s expired snapshots", num_deleted) |
0 commit comments