Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,15 @@ Usage: sqlmesh janitor [OPTIONS]
Options:
--ignore-ttl Cleanup snapshots that are not referenced in any environment,
regardless of when they're set to expire
--batch-start TEXT
Optional datetime (for example, "2024-12-01" or "2 days ago")
to begin evaluating expired snapshots. When omitted the
janitor starts at the current time.
--batch-seconds INTEGER
When provided alongside --batch-start, reruns the janitor in
batches by advancing the evaluation time in the given number
of seconds until it catches up to "now". Use this to throttle
large cleanup jobs.
--help Show this message and exit.
```

Expand Down
23 changes: 21 additions & 2 deletions sqlmesh/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,16 +631,35 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
is_flag=True,
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
)
@click.option(
"--batch-start",
help="The batch start datetime to start processing expired snapshots to avoid large requests.",
)
@click.option(
"--batch-seconds",
"batch_size_seconds",
type=int,
help="When provided with --batch-start, runs the janitor in batches by incrementing the timestamp by this many seconds until reaching the current time.",
)
@click.pass_context
@error_handler
@cli_analytics
def janitor(ctx: click.Context, ignore_ttl: bool, **kwargs: t.Any) -> None:
def janitor(
ctx: click.Context,
ignore_ttl: bool,
batch_start: t.Optional[TimeLike],
batch_size_seconds: t.Optional[int],
) -> None:
"""
Run the janitor process on-demand.

The janitor cleans up old environments and expired snapshots.
"""
ctx.obj.run_janitor(ignore_ttl, **kwargs)
ctx.obj.run_janitor(
ignore_ttl,
batch_start=batch_start,
batch_size_seconds=batch_size_seconds,
)


@cli.command("destroy")
Expand Down
38 changes: 34 additions & 4 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,12 +855,42 @@ def _has_environment_changed() -> bool:
return completion_status

@python_api_analytics
def run_janitor(self, ignore_ttl: bool) -> bool:
def run_janitor(
self,
ignore_ttl: bool,
batch_start: t.Optional[TimeLike] = None,
batch_size_seconds: t.Optional[int] = None,
) -> bool:
success = False

if self.console.start_cleanup(ignore_ttl):
try:
self._run_janitor(ignore_ttl)
if batch_size_seconds is not None:
if batch_size_seconds <= 0:
raise SQLMeshError("batch_size_seconds must be a positive integer.")
if batch_start is None:
raise SQLMeshError(
"batch_start must be provided when batch_size_seconds is set."
)

current_ts = now_timestamp()

if batch_start is not None:
batch_start = to_timestamp(batch_start)
next_batch_ts = batch_start
else:
next_batch_ts = current_ts

batch_delta_ms = None if batch_size_seconds is None else batch_size_seconds * 1000

while True:
self._run_janitor(ignore_ttl, current_ts=next_batch_ts)

if batch_delta_ms is None or next_batch_ts >= current_ts:
break

next_batch_ts = min(next_batch_ts + batch_delta_ms, current_ts)

success = True
finally:
self.console.stop_cleanup(success=success)
Expand Down Expand Up @@ -2846,8 +2876,8 @@ def _destroy(self) -> bool:

return True

def _run_janitor(self, ignore_ttl: bool = False) -> None:
current_ts = now_timestamp()
def _run_janitor(self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None) -> None:
current_ts = current_ts or now_timestamp()

# Clean up expired environments by removing their views and schemas
self._cleanup_environments(current_ts=current_ts)
Expand Down
36 changes: 35 additions & 1 deletion tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from click.testing import CliRunner
from sqlmesh import RuntimeEnv
from sqlmesh.cli.project_init import ProjectTemplate, init_example_project
from sqlmesh.cli.main import cli
from sqlmesh.cli.main import cli, janitor
from sqlmesh.core.context import Context
from sqlmesh.integrations.dlt import generate_dlt_models
from sqlmesh.utils.date import now_ds, time_like_to_str, timedelta, to_datetime, yesterday_ds
Expand Down Expand Up @@ -127,6 +127,40 @@ def init_prod_and_backfill(runner, temp_dir) -> None:
assert path.exists(temp_dir / "db.db")


def test_cli_janitor_batch_string(runner: CliRunner, mocker) -> None:
context_mock = mocker.MagicMock()

result = runner.invoke(
janitor,
["--ignore-ttl", "--batch-start", "2025-01-01", "--batch-seconds", "60"],
obj=context_mock,
)

assert result.exit_code == 0
context_mock.run_janitor.assert_called_once_with(
True,
batch_start="2025-01-01",
batch_size_seconds=60,
)


def test_cli_janitor_batch_int(runner: CliRunner, mocker) -> None:
context_mock = mocker.MagicMock()

result = runner.invoke(
janitor,
["--ignore-ttl", "--batch-start", "1735862400000", "--batch-seconds", "60"],
obj=context_mock,
)

assert result.exit_code == 0
context_mock.run_janitor.assert_called_once_with(
True,
batch_start="1735862400000",
batch_size_seconds=60,
)


def assert_duckdb_test(result) -> None:
assert "Successfully Ran 1 tests against duckdb" in result.output

Expand Down
81 changes: 81 additions & 0 deletions tests/core/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,87 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
)


@time_machine.travel("2025-01-03 00:00:00 UTC", tick=False)
def test_run_janitor_default_timestamp(sushi_context: Context, mocker: MockerFixture) -> None:
context = sushi_context
mocker.patch.object(context.console, "start_cleanup", return_value=True)
stop_cleanup = mocker.patch.object(context.console, "stop_cleanup")
run_janitor = mocker.patch.object(context, "_run_janitor")

result = context.run_janitor(ignore_ttl=False)

assert result is True
run_janitor.assert_called_once_with(False, current_ts=1735862400000)
stop_cleanup.assert_called_once_with(success=True)


@time_machine.travel("2025-01-03 00:00:00 UTC", tick=False)
def test_run_janitor_no_batch_size_seconds(sushi_context: Context, mocker: MockerFixture) -> None:
context = sushi_context
mocker.patch.object(context.console, "start_cleanup", return_value=True)
stop_cleanup = mocker.patch.object(context.console, "stop_cleanup")
run_janitor = mocker.patch.object(context, "_run_janitor")

result = context.run_janitor(
ignore_ttl=False,
batch_start="2 days ago",
)

assert result is True
assert run_janitor.call_args_list == [
call(False, current_ts=1735689600000),
]
stop_cleanup.assert_called_once_with(success=True)


@time_machine.travel("2025-01-03 00:00:00 UTC", tick=False)
def test_run_janitor_batches(sushi_context: Context, mocker: MockerFixture) -> None:
context = sushi_context
mocker.patch.object(context.console, "start_cleanup", return_value=True)
stop_cleanup = mocker.patch.object(context.console, "stop_cleanup")
run_janitor = mocker.patch.object(context, "_run_janitor")

result = context.run_janitor(
ignore_ttl=False,
batch_start="2 days ago",
batch_size_seconds=60 * 60 * 24, # 1 day
)

assert result is True
assert run_janitor.call_args_list == [
call(False, current_ts=1735689600000),
call(False, current_ts=1735776000000),
call(False, current_ts=1735862400000),
]
stop_cleanup.assert_called_once_with(success=True)


def test_run_janitor_batch_requires_current_ts(
sushi_context: Context, mocker: MockerFixture
) -> None:
context = sushi_context
mocker.patch.object(context.console, "start_cleanup", return_value=True)
stop_cleanup = mocker.patch.object(context.console, "stop_cleanup")

with pytest.raises(SQLMeshError, match="batch_start must be provided"):
context.run_janitor(ignore_ttl=False, batch_size_seconds=60)

stop_cleanup.assert_called_once_with(success=False)


def test_run_janitor_batch_requires_positive_seconds(
sushi_context: Context, mocker: MockerFixture
) -> None:
context = sushi_context
mocker.patch.object(context.console, "start_cleanup", return_value=True)
stop_cleanup = mocker.patch.object(context.console, "stop_cleanup")

with pytest.raises(SQLMeshError, match="positive integer"):
context.run_janitor(ignore_ttl=False, batch_start=100, batch_size_seconds=0)

stop_cleanup.assert_called_once_with(success=False)


@pytest.mark.slow
def test_plan_default_end(sushi_context_pre_scheduling: Context):
prod_plan_builder = sushi_context_pre_scheduling.plan_builder("prod")
Expand Down