Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sqlmesh/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,12 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
help="Explain the plan instead of applying it.",
default=None,
)
@click.option(
"--ignore-cron",
is_flag=True,
help="Run all missing intervals, ignoring individual cron schedules. Only applies if --run is set.",
default=None,
)
@click.option(
"--min-intervals",
default=0,
Expand All @@ -543,6 +549,7 @@ def plan(
select_models = kwargs.pop("select_model") or None
allow_destructive_models = kwargs.pop("allow_destructive_model") or None
backfill_models = kwargs.pop("backfill_model") or None
ignore_cron = kwargs.pop("ignore_cron") or None
setattr(get_console(), "verbosity", Verbosity(verbose))

context.plan(
Expand All @@ -551,6 +558,7 @@ def plan(
select_models=select_models,
allow_destructive_models=allow_destructive_models,
backfill_models=backfill_models,
ignore_cron=ignore_cron,
**kwargs,
)

Expand Down
5 changes: 5 additions & 0 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,7 @@ def plan(
diff_rendered: t.Optional[bool] = None,
skip_linter: t.Optional[bool] = None,
explain: t.Optional[bool] = None,
ignore_cron: t.Optional[bool] = None,
min_intervals: t.Optional[int] = None,
) -> Plan:
"""Interactively creates a plan.
Expand Down Expand Up @@ -1367,6 +1368,7 @@ def plan(
diff_rendered=diff_rendered,
skip_linter=skip_linter,
explain=explain,
ignore_cron=ignore_cron,
min_intervals=min_intervals,
)

Expand Down Expand Up @@ -1417,6 +1419,7 @@ def plan_builder(
diff_rendered: t.Optional[bool] = None,
skip_linter: t.Optional[bool] = None,
explain: t.Optional[bool] = None,
ignore_cron: t.Optional[bool] = None,
min_intervals: t.Optional[int] = None,
) -> PlanBuilder:
"""Creates a plan builder.
Expand Down Expand Up @@ -1590,6 +1593,7 @@ def plan_builder(
max_interval_end_per_model = None
default_start, default_end = None, None
if not run:
ignore_cron = False
max_interval_end_per_model = self._get_max_interval_end_per_model(
snapshots, backfill_models
)
Expand Down Expand Up @@ -1654,6 +1658,7 @@ def plan_builder(
console=self.console,
user_provided_flags=user_provided_flags,
explain=explain or False,
ignore_cron=ignore_cron or False,
)

def apply(
Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class PlanBuilder:
the environment is not finalized.
start_override_per_model: A mapping of model FQNs to target start dates.
end_override_per_model: A mapping of model FQNs to target end dates.
ignore_cron: Whether to ignore the node's cron schedule when computing missing intervals.
explain: Whether to explain the plan instead of applying it.
"""

Expand Down Expand Up @@ -120,6 +121,7 @@ def __init__(
end_bounded: bool = False,
ensure_finalized_snapshots: bool = False,
explain: bool = False,
ignore_cron: bool = False,
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
console: t.Optional[PlanBuilderConsole] = None,
Expand All @@ -137,6 +139,7 @@ def __init__(
self._enable_preview = enable_preview
self._end_bounded = end_bounded
self._ensure_finalized_snapshots = ensure_finalized_snapshots
self._ignore_cron = ignore_cron
self._start_override_per_model = start_override_per_model
self._end_override_per_model = end_override_per_model
self._environment_ttl = environment_ttl
Expand Down Expand Up @@ -335,6 +338,7 @@ def build(self) -> Plan:
execution_time=plan_execution_time,
end_bounded=self._end_bounded,
ensure_finalized_snapshots=self._ensure_finalized_snapshots,
ignore_cron=self._ignore_cron,
user_provided_flags=self._user_provided_flags,
)
self._latest_plan = plan
Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/core/plan/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Plan(PydanticModel, frozen=True):
end_bounded: bool
ensure_finalized_snapshots: bool
explain: bool
ignore_cron: bool = False

environment_ttl: t.Optional[str] = None
environment_naming_info: EnvironmentNamingInfo
Expand Down Expand Up @@ -181,6 +182,7 @@ def missing_intervals(self) -> t.List[SnapshotIntervals]:
start_override_per_model=self.start_override_per_model,
end_override_per_model=self.end_override_per_model,
end_bounded=self.end_bounded,
ignore_cron=self.ignore_cron,
).items()
if snapshot.is_model and missing
]
Expand Down Expand Up @@ -259,6 +261,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
forward_only=self.forward_only,
end_bounded=self.end_bounded,
ensure_finalized_snapshots=self.ensure_finalized_snapshots,
ignore_cron=self.ignore_cron,
directly_modified_snapshots=sorted(self.directly_modified),
indirectly_modified_snapshots={
s.name: sorted(snapshot_ids) for s, snapshot_ids in self.indirectly_modified.items()
Expand Down Expand Up @@ -300,6 +303,7 @@ class EvaluatablePlan(PydanticModel):
forward_only: bool
end_bounded: bool
ensure_finalized_snapshots: bool
ignore_cron: bool = False
directly_modified_snapshots: t.List[SnapshotId]
indirectly_modified_snapshots: t.Dict[str, t.List[SnapshotId]]
metadata_updated_snapshots: t.List[SnapshotId]
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/plan/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ def _missing_intervals(
},
deployability_index=deployability_index,
end_bounded=plan.end_bounded,
ignore_cron=plan.ignore_cron,
start_override_per_model=plan.start_override_per_model,
end_override_per_model=plan.end_override_per_model,
)
Expand Down
7 changes: 7 additions & 0 deletions sqlmesh/magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,12 @@ def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None
action="store_true",
help="Run latest intervals as part of the plan application (prod environment only).",
)
@argument(
"--ignore-cron",
action="store_true",
help="Run for all missing intervals, ignoring individual cron schedules. Only applies if --run is set.",
default=None,
)
@argument(
"--enable-preview",
action="store_true",
Expand Down Expand Up @@ -533,6 +539,7 @@ def plan(self, context: Context, line: str) -> None:
select_models=args.select_model,
no_diff=args.no_diff,
run=args.run,
ignore_cron=args.run,
enable_preview=args.enable_preview,
diff_rendered=args.diff_rendered,
)
Expand Down
54 changes: 54 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,60 @@ def test_plan_with_run(
}


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_plan_ignore_cron(
init_and_plan_context: t.Callable,
):
context, _ = init_and_plan_context("examples/sushi")

expressions = d.parse(
f"""
MODEL (
name memory.sushi.test_allow_partials,
kind INCREMENTAL_UNMANAGED,
allow_partials true,
start '2023-01-01',
);

SELECT @end_ts AS end_ts
"""
)
model = load_sql_based_model(expressions)

context.upsert_model(model)
context.plan("prod", skip_tests=True, auto_apply=True, no_prompts=True)

assert (
context.engine_adapter.fetchone("SELECT MAX(end_ts) FROM memory.sushi.test_allow_partials")[
0
]
== "2023-01-07 23:59:59.999999"
)

plan_no_ignore_cron = context.plan_builder(
"prod", run=True, ignore_cron=False, skip_tests=True
).build()
assert not plan_no_ignore_cron.missing_intervals

plan = context.plan_builder("prod", run=True, ignore_cron=True, skip_tests=True).build()
assert plan.missing_intervals == [
SnapshotIntervals(
snapshot_id=context.get_snapshot(model, raise_if_missing=True).snapshot_id,
intervals=[
(to_timestamp("2023-01-08"), to_timestamp("2023-01-08 15:00:00")),
],
)
]
context.apply(plan)

assert (
context.engine_adapter.fetchone("SELECT MAX(end_ts) FROM memory.sushi.test_allow_partials")[
0
]
== "2023-01-08 14:59:59.999999"
)


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_run_with_select_models_no_auto_upstream(
init_and_plan_context: t.Callable,
Expand Down
61 changes: 61 additions & 0 deletions tests/core/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -3359,3 +3359,64 @@ def test_environment_statements_change_allows_dev_environment_creation(make_snap
assert plan is not None
assert plan.context_diff.has_environment_statements_changes
assert plan.context_diff.environment_statements == environment_statements


def test_plan_ignore_cron_flag(make_snapshot):
snapshot_a = make_snapshot(
SqlModel(
name="test_model",
kind=IncrementalByTimeRangeKind(time_column="ds"),
cron="@daily", # Daily cron schedule
start="2023-01-01",
query=parse_one("SELECT 1 as id, ds FROM VALUES ('2023-01-01') t(ds)"),
allow_partials=True,
)
)
snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=False)

context_diff = ContextDiff(
environment="dev",
is_new_environment=True,
is_unfinalized_environment=False,
normalize_environment_name=True,
create_from="prod",
create_from_env_exists=True,
added=set(),
removed_snapshots={},
modified_snapshots={},
snapshots={snapshot_a.snapshot_id: snapshot_a},
new_snapshots={snapshot_a.snapshot_id: snapshot_a},
previous_plan_id=None,
previously_promoted_snapshot_ids=set(),
previous_finalized_snapshots=None,
previous_gateway_managed_virtual_layer=False,
gateway_managed_virtual_layer=False,
environment_statements=[],
)

plan_builder_ignore_cron = PlanBuilder(
context_diff,
start="2023-01-01",
execution_time="2023-01-05 12:00:00",
is_dev=True,
include_unmodified=True,
ignore_cron=True,
end_bounded=False,
)

plan = plan_builder_ignore_cron.build()
assert plan.ignore_cron is True
assert plan.to_evaluatable().ignore_cron is True

assert plan.missing_intervals == [
SnapshotIntervals(
snapshot_id=snapshot_a.snapshot_id,
intervals=[
(to_timestamp("2023-01-01"), to_timestamp("2023-01-02")),
(to_timestamp("2023-01-02"), to_timestamp("2023-01-03")),
(to_timestamp("2023-01-03"), to_timestamp("2023-01-04")),
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05")),
(to_timestamp("2023-01-05"), to_timestamp("2023-01-05 12:00:00")),
],
)
]
Loading