From 45e6f8e8fddff3c3a10b8f88867977b5f8a4ed96 Mon Sep 17 00:00:00 2001 From: eakmanrq <6326532+eakmanrq@users.noreply.github.com> Date: Wed, 1 Oct 2025 16:05:02 -0700 Subject: [PATCH] feat: support batching of janitor cleanup --- docs/reference/cli.md | 9 +++++ sqlmesh/cli/main.py | 23 ++++++++++- sqlmesh/core/context.py | 38 ++++++++++++++++-- tests/cli/test_cli.py | 36 ++++++++++++++++- tests/core/test_context.py | 81 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 180 insertions(+), 7 deletions(-) diff --git a/docs/reference/cli.md b/docs/reference/cli.md index a9ce9366e1..18269276c6 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -309,6 +309,15 @@ Usage: sqlmesh janitor [OPTIONS] Options: --ignore-ttl Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire + --batch-start TEXT + Optional datetime (for example, "2024-12-01" or "2 days ago") + to begin evaluating expired snapshots. When omitted the + janitor starts at the current time. + --batch-seconds INTEGER + When provided alongside --batch-start, reruns the janitor in + batches by advancing the evaluation time in the given number + of seconds until it catches up to "now". Use this to throttle + large cleanup jobs. --help Show this message and exit. ``` diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index 2f18c0a4b7..076d3f26f6 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -631,16 +631,35 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None: is_flag=True, help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire", ) +@click.option( + "--batch-start", + help="The batch start datetime to start processing expired snapshots to avoid large requests.", +) +@click.option( + "--batch-seconds", + "batch_size_seconds", + type=int, + help="When provided with --batch-start, runs the janitor in batches by incrementing the timestamp by this many seconds until reaching the current time.", +) @click.pass_context @error_handler @cli_analytics -def janitor(ctx: click.Context, ignore_ttl: bool, **kwargs: t.Any) -> None: +def janitor( + ctx: click.Context, + ignore_ttl: bool, + batch_start: t.Optional[TimeLike], + batch_size_seconds: t.Optional[int], +) -> None: """ Run the janitor process on-demand. The janitor cleans up old environments and expired snapshots. """ - ctx.obj.run_janitor(ignore_ttl, **kwargs) + ctx.obj.run_janitor( + ignore_ttl, + batch_start=batch_start, + batch_size_seconds=batch_size_seconds, + ) @cli.command("destroy") diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index e31a04fe81..f4716cb850 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -855,12 +855,42 @@ def _has_environment_changed() -> bool: return completion_status @python_api_analytics - def run_janitor(self, ignore_ttl: bool) -> bool: + def run_janitor( + self, + ignore_ttl: bool, + batch_start: t.Optional[TimeLike] = None, + batch_size_seconds: t.Optional[int] = None, + ) -> bool: success = False if self.console.start_cleanup(ignore_ttl): try: - self._run_janitor(ignore_ttl) + if batch_size_seconds is not None: + if batch_size_seconds <= 0: + raise SQLMeshError("batch_size_seconds must be a positive integer.") + if batch_start is None: + raise SQLMeshError( + "batch_start must be provided when batch_size_seconds is set." + ) + + current_ts = now_timestamp() + + if batch_start is not None: + batch_start = to_timestamp(batch_start) + next_batch_ts = batch_start + else: + next_batch_ts = current_ts + + batch_delta_ms = None if batch_size_seconds is None else batch_size_seconds * 1000 + + while True: + self._run_janitor(ignore_ttl, current_ts=next_batch_ts) + + if batch_delta_ms is None or next_batch_ts >= current_ts: + break + + next_batch_ts = min(next_batch_ts + batch_delta_ms, current_ts) + success = True finally: self.console.stop_cleanup(success=success) @@ -2846,8 +2876,8 @@ def _destroy(self) -> bool: return True - def _run_janitor(self, ignore_ttl: bool = False) -> None: - current_ts = now_timestamp() + def _run_janitor(self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None) -> None: + current_ts = current_ts or now_timestamp() # Clean up expired environments by removing their views and schemas self._cleanup_environments(current_ts=current_ts) diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 1be44e18f9..9f7585396f 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -14,7 +14,7 @@ from click.testing import CliRunner from sqlmesh import RuntimeEnv from sqlmesh.cli.project_init import ProjectTemplate, init_example_project -from sqlmesh.cli.main import cli +from sqlmesh.cli.main import cli, janitor from sqlmesh.core.context import Context from sqlmesh.integrations.dlt import generate_dlt_models from sqlmesh.utils.date import now_ds, time_like_to_str, timedelta, to_datetime, yesterday_ds @@ -127,6 +127,40 @@ def init_prod_and_backfill(runner, temp_dir) -> None: assert path.exists(temp_dir / "db.db") +def test_cli_janitor_batch_string(runner: CliRunner, mocker) -> None: + context_mock = mocker.MagicMock() + + result = runner.invoke( + janitor, + ["--ignore-ttl", "--batch-start", "2025-01-01", "--batch-seconds", "60"], + obj=context_mock, + ) + + assert result.exit_code == 0 + context_mock.run_janitor.assert_called_once_with( + True, + batch_start="2025-01-01", + batch_size_seconds=60, + ) + + +def test_cli_janitor_batch_int(runner: CliRunner, mocker) -> None: + context_mock = mocker.MagicMock() + + result = runner.invoke( + janitor, + ["--ignore-ttl", "--batch-start", "1735862400000", "--batch-seconds", "60"], + obj=context_mock, + ) + + assert result.exit_code == 0 + context_mock.run_janitor.assert_called_once_with( + True, + batch_start="1735862400000", + batch_size_seconds=60, + ) + + def assert_duckdb_test(result) -> None: assert "Successfully Ran 1 tests against duckdb" in result.output diff --git a/tests/core/test_context.py b/tests/core/test_context.py index b7ce64eb4c..948089c417 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -1057,6 +1057,87 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None: ) +@time_machine.travel("2025-01-03 00:00:00 UTC", tick=False) +def test_run_janitor_default_timestamp(sushi_context: Context, mocker: MockerFixture) -> None: + context = sushi_context + mocker.patch.object(context.console, "start_cleanup", return_value=True) + stop_cleanup = mocker.patch.object(context.console, "stop_cleanup") + run_janitor = mocker.patch.object(context, "_run_janitor") + + result = context.run_janitor(ignore_ttl=False) + + assert result is True + run_janitor.assert_called_once_with(False, current_ts=1735862400000) + stop_cleanup.assert_called_once_with(success=True) + + +@time_machine.travel("2025-01-03 00:00:00 UTC", tick=False) +def test_run_janitor_no_batch_size_seconds(sushi_context: Context, mocker: MockerFixture) -> None: + context = sushi_context + mocker.patch.object(context.console, "start_cleanup", return_value=True) + stop_cleanup = mocker.patch.object(context.console, "stop_cleanup") + run_janitor = mocker.patch.object(context, "_run_janitor") + + result = context.run_janitor( + ignore_ttl=False, + batch_start="2 days ago", + ) + + assert result is True + assert run_janitor.call_args_list == [ + call(False, current_ts=1735689600000), + ] + stop_cleanup.assert_called_once_with(success=True) + + +@time_machine.travel("2025-01-03 00:00:00 UTC", tick=False) +def test_run_janitor_batches(sushi_context: Context, mocker: MockerFixture) -> None: + context = sushi_context + mocker.patch.object(context.console, "start_cleanup", return_value=True) + stop_cleanup = mocker.patch.object(context.console, "stop_cleanup") + run_janitor = mocker.patch.object(context, "_run_janitor") + + result = context.run_janitor( + ignore_ttl=False, + batch_start="2 days ago", + batch_size_seconds=60 * 60 * 24, # 1 day + ) + + assert result is True + assert run_janitor.call_args_list == [ + call(False, current_ts=1735689600000), + call(False, current_ts=1735776000000), + call(False, current_ts=1735862400000), + ] + stop_cleanup.assert_called_once_with(success=True) + + +def test_run_janitor_batch_requires_current_ts( + sushi_context: Context, mocker: MockerFixture +) -> None: + context = sushi_context + mocker.patch.object(context.console, "start_cleanup", return_value=True) + stop_cleanup = mocker.patch.object(context.console, "stop_cleanup") + + with pytest.raises(SQLMeshError, match="batch_start must be provided"): + context.run_janitor(ignore_ttl=False, batch_size_seconds=60) + + stop_cleanup.assert_called_once_with(success=False) + + +def test_run_janitor_batch_requires_positive_seconds( + sushi_context: Context, mocker: MockerFixture +) -> None: + context = sushi_context + mocker.patch.object(context.console, "start_cleanup", return_value=True) + stop_cleanup = mocker.patch.object(context.console, "stop_cleanup") + + with pytest.raises(SQLMeshError, match="positive integer"): + context.run_janitor(ignore_ttl=False, batch_start=100, batch_size_seconds=0) + + stop_cleanup.assert_called_once_with(success=False) + + @pytest.mark.slow def test_plan_default_end(sushi_context_pre_scheduling: Context): prod_plan_builder = sushi_context_pre_scheduling.plan_builder("prod")