Skip to content

Commit bf811c2

Browse files
authored
Feat: Propagate ignore_cron into plan (#5152)
1 parent 57585b3 commit bf811c2

File tree

9 files changed

+163
-0
lines changed

9 files changed

+163
-0
lines changed

sqlmesh/cli/main.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,12 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
522522
help="Explain the plan instead of applying it.",
523523
default=None,
524524
)
525+
@click.option(
526+
"--ignore-cron",
527+
is_flag=True,
528+
help="Run all missing intervals, ignoring individual cron schedules. Only applies if --run is set.",
529+
default=None,
530+
)
525531
@click.option(
526532
"--min-intervals",
527533
default=0,
@@ -543,6 +549,7 @@ def plan(
543549
select_models = kwargs.pop("select_model") or None
544550
allow_destructive_models = kwargs.pop("allow_destructive_model") or None
545551
backfill_models = kwargs.pop("backfill_model") or None
552+
ignore_cron = kwargs.pop("ignore_cron") or None
546553
setattr(get_console(), "verbosity", Verbosity(verbose))
547554

548555
context.plan(
@@ -551,6 +558,7 @@ def plan(
551558
select_models=select_models,
552559
allow_destructive_models=allow_destructive_models,
553560
backfill_models=backfill_models,
561+
ignore_cron=ignore_cron,
554562
**kwargs,
555563
)
556564

sqlmesh/core/context.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1290,6 +1290,7 @@ def plan(
12901290
diff_rendered: t.Optional[bool] = None,
12911291
skip_linter: t.Optional[bool] = None,
12921292
explain: t.Optional[bool] = None,
1293+
ignore_cron: t.Optional[bool] = None,
12931294
min_intervals: t.Optional[int] = None,
12941295
) -> Plan:
12951296
"""Interactively creates a plan.
@@ -1367,6 +1368,7 @@ def plan(
13671368
diff_rendered=diff_rendered,
13681369
skip_linter=skip_linter,
13691370
explain=explain,
1371+
ignore_cron=ignore_cron,
13701372
min_intervals=min_intervals,
13711373
)
13721374

@@ -1417,6 +1419,7 @@ def plan_builder(
14171419
diff_rendered: t.Optional[bool] = None,
14181420
skip_linter: t.Optional[bool] = None,
14191421
explain: t.Optional[bool] = None,
1422+
ignore_cron: t.Optional[bool] = None,
14201423
min_intervals: t.Optional[int] = None,
14211424
) -> PlanBuilder:
14221425
"""Creates a plan builder.
@@ -1590,6 +1593,7 @@ def plan_builder(
15901593
max_interval_end_per_model = None
15911594
default_start, default_end = None, None
15921595
if not run:
1596+
ignore_cron = False
15931597
max_interval_end_per_model = self._get_max_interval_end_per_model(
15941598
snapshots, backfill_models
15951599
)
@@ -1654,6 +1658,7 @@ def plan_builder(
16541658
console=self.console,
16551659
user_provided_flags=user_provided_flags,
16561660
explain=explain or False,
1661+
ignore_cron=ignore_cron or False,
16571662
)
16581663

16591664
def apply(

sqlmesh/core/plan/builder.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ class PlanBuilder:
8989
the environment is not finalized.
9090
start_override_per_model: A mapping of model FQNs to target start dates.
9191
end_override_per_model: A mapping of model FQNs to target end dates.
92+
ignore_cron: Whether to ignore the node's cron schedule when computing missing intervals.
9293
explain: Whether to explain the plan instead of applying it.
9394
"""
9495

@@ -120,6 +121,7 @@ def __init__(
120121
end_bounded: bool = False,
121122
ensure_finalized_snapshots: bool = False,
122123
explain: bool = False,
124+
ignore_cron: bool = False,
123125
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
124126
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
125127
console: t.Optional[PlanBuilderConsole] = None,
@@ -137,6 +139,7 @@ def __init__(
137139
self._enable_preview = enable_preview
138140
self._end_bounded = end_bounded
139141
self._ensure_finalized_snapshots = ensure_finalized_snapshots
142+
self._ignore_cron = ignore_cron
140143
self._start_override_per_model = start_override_per_model
141144
self._end_override_per_model = end_override_per_model
142145
self._environment_ttl = environment_ttl
@@ -335,6 +338,7 @@ def build(self) -> Plan:
335338
execution_time=plan_execution_time,
336339
end_bounded=self._end_bounded,
337340
ensure_finalized_snapshots=self._ensure_finalized_snapshots,
341+
ignore_cron=self._ignore_cron,
338342
user_provided_flags=self._user_provided_flags,
339343
)
340344
self._latest_plan = plan

sqlmesh/core/plan/definition.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class Plan(PydanticModel, frozen=True):
4848
end_bounded: bool
4949
ensure_finalized_snapshots: bool
5050
explain: bool
51+
ignore_cron: bool = False
5152

5253
environment_ttl: t.Optional[str] = None
5354
environment_naming_info: EnvironmentNamingInfo
@@ -181,6 +182,7 @@ def missing_intervals(self) -> t.List[SnapshotIntervals]:
181182
start_override_per_model=self.start_override_per_model,
182183
end_override_per_model=self.end_override_per_model,
183184
end_bounded=self.end_bounded,
185+
ignore_cron=self.ignore_cron,
184186
).items()
185187
if snapshot.is_model and missing
186188
]
@@ -259,6 +261,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
259261
forward_only=self.forward_only,
260262
end_bounded=self.end_bounded,
261263
ensure_finalized_snapshots=self.ensure_finalized_snapshots,
264+
ignore_cron=self.ignore_cron,
262265
directly_modified_snapshots=sorted(self.directly_modified),
263266
indirectly_modified_snapshots={
264267
s.name: sorted(snapshot_ids) for s, snapshot_ids in self.indirectly_modified.items()
@@ -300,6 +303,7 @@ class EvaluatablePlan(PydanticModel):
300303
forward_only: bool
301304
end_bounded: bool
302305
ensure_finalized_snapshots: bool
306+
ignore_cron: bool = False
303307
directly_modified_snapshots: t.List[SnapshotId]
304308
indirectly_modified_snapshots: t.Dict[str, t.List[SnapshotId]]
305309
metadata_updated_snapshots: t.List[SnapshotId]

sqlmesh/core/plan/stages.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,7 @@ def _missing_intervals(
524524
},
525525
deployability_index=deployability_index,
526526
end_bounded=plan.end_bounded,
527+
ignore_cron=plan.ignore_cron,
527528
start_override_per_model=plan.start_override_per_model,
528529
end_override_per_model=plan.end_override_per_model,
529530
)

sqlmesh/magics.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,12 @@ def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None
486486
action="store_true",
487487
help="Run latest intervals as part of the plan application (prod environment only).",
488488
)
489+
@argument(
490+
"--ignore-cron",
491+
action="store_true",
492+
help="Run for all missing intervals, ignoring individual cron schedules. Only applies if --run is set.",
493+
default=None,
494+
)
489495
@argument(
490496
"--enable-preview",
491497
action="store_true",
@@ -533,6 +539,7 @@ def plan(self, context: Context, line: str) -> None:
533539
select_models=args.select_model,
534540
no_diff=args.no_diff,
535541
run=args.run,
542+
ignore_cron=args.run,
536543
enable_preview=args.enable_preview,
537544
diff_rendered=args.diff_rendered,
538545
)

tests/core/test_integration.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1609,6 +1609,60 @@ def test_plan_with_run(
16091609
}
16101610

16111611

1612+
@time_machine.travel("2023-01-08 15:00:00 UTC")
1613+
def test_plan_ignore_cron(
1614+
init_and_plan_context: t.Callable,
1615+
):
1616+
context, _ = init_and_plan_context("examples/sushi")
1617+
1618+
expressions = d.parse(
1619+
f"""
1620+
MODEL (
1621+
name memory.sushi.test_allow_partials,
1622+
kind INCREMENTAL_UNMANAGED,
1623+
allow_partials true,
1624+
start '2023-01-01',
1625+
);
1626+
1627+
SELECT @end_ts AS end_ts
1628+
"""
1629+
)
1630+
model = load_sql_based_model(expressions)
1631+
1632+
context.upsert_model(model)
1633+
context.plan("prod", skip_tests=True, auto_apply=True, no_prompts=True)
1634+
1635+
assert (
1636+
context.engine_adapter.fetchone("SELECT MAX(end_ts) FROM memory.sushi.test_allow_partials")[
1637+
0
1638+
]
1639+
== "2023-01-07 23:59:59.999999"
1640+
)
1641+
1642+
plan_no_ignore_cron = context.plan_builder(
1643+
"prod", run=True, ignore_cron=False, skip_tests=True
1644+
).build()
1645+
assert not plan_no_ignore_cron.missing_intervals
1646+
1647+
plan = context.plan_builder("prod", run=True, ignore_cron=True, skip_tests=True).build()
1648+
assert plan.missing_intervals == [
1649+
SnapshotIntervals(
1650+
snapshot_id=context.get_snapshot(model, raise_if_missing=True).snapshot_id,
1651+
intervals=[
1652+
(to_timestamp("2023-01-08"), to_timestamp("2023-01-08 15:00:00")),
1653+
],
1654+
)
1655+
]
1656+
context.apply(plan)
1657+
1658+
assert (
1659+
context.engine_adapter.fetchone("SELECT MAX(end_ts) FROM memory.sushi.test_allow_partials")[
1660+
0
1661+
]
1662+
== "2023-01-08 14:59:59.999999"
1663+
)
1664+
1665+
16121666
@time_machine.travel("2023-01-08 15:00:00 UTC")
16131667
def test_run_with_select_models_no_auto_upstream(
16141668
init_and_plan_context: t.Callable,

tests/core/test_plan.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3359,3 +3359,64 @@ def test_environment_statements_change_allows_dev_environment_creation(make_snap
33593359
assert plan is not None
33603360
assert plan.context_diff.has_environment_statements_changes
33613361
assert plan.context_diff.environment_statements == environment_statements
3362+
3363+
3364+
def test_plan_ignore_cron_flag(make_snapshot):
3365+
snapshot_a = make_snapshot(
3366+
SqlModel(
3367+
name="test_model",
3368+
kind=IncrementalByTimeRangeKind(time_column="ds"),
3369+
cron="@daily", # Daily cron schedule
3370+
start="2023-01-01",
3371+
query=parse_one("SELECT 1 as id, ds FROM VALUES ('2023-01-01') t(ds)"),
3372+
allow_partials=True,
3373+
)
3374+
)
3375+
snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=False)
3376+
3377+
context_diff = ContextDiff(
3378+
environment="dev",
3379+
is_new_environment=True,
3380+
is_unfinalized_environment=False,
3381+
normalize_environment_name=True,
3382+
create_from="prod",
3383+
create_from_env_exists=True,
3384+
added=set(),
3385+
removed_snapshots={},
3386+
modified_snapshots={},
3387+
snapshots={snapshot_a.snapshot_id: snapshot_a},
3388+
new_snapshots={snapshot_a.snapshot_id: snapshot_a},
3389+
previous_plan_id=None,
3390+
previously_promoted_snapshot_ids=set(),
3391+
previous_finalized_snapshots=None,
3392+
previous_gateway_managed_virtual_layer=False,
3393+
gateway_managed_virtual_layer=False,
3394+
environment_statements=[],
3395+
)
3396+
3397+
plan_builder_ignore_cron = PlanBuilder(
3398+
context_diff,
3399+
start="2023-01-01",
3400+
execution_time="2023-01-05 12:00:00",
3401+
is_dev=True,
3402+
include_unmodified=True,
3403+
ignore_cron=True,
3404+
end_bounded=False,
3405+
)
3406+
3407+
plan = plan_builder_ignore_cron.build()
3408+
assert plan.ignore_cron is True
3409+
assert plan.to_evaluatable().ignore_cron is True
3410+
3411+
assert plan.missing_intervals == [
3412+
SnapshotIntervals(
3413+
snapshot_id=snapshot_a.snapshot_id,
3414+
intervals=[
3415+
(to_timestamp("2023-01-01"), to_timestamp("2023-01-02")),
3416+
(to_timestamp("2023-01-02"), to_timestamp("2023-01-03")),
3417+
(to_timestamp("2023-01-03"), to_timestamp("2023-01-04")),
3418+
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05")),
3419+
(to_timestamp("2023-01-05"), to_timestamp("2023-01-05 12:00:00")),
3420+
],
3421+
)
3422+
]

0 commit comments

Comments
 (0)