diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index 8982efc9f8..5d9a12f110 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -522,6 +522,12 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None: help="Explain the plan instead of applying it.", default=None, ) +@click.option( + "--ignore-cron", + is_flag=True, + help="Run all missing intervals, ignoring individual cron schedules. Only applies if --run is set.", + default=None, +) @click.option( "--min-intervals", default=0, @@ -543,6 +549,7 @@ def plan( select_models = kwargs.pop("select_model") or None allow_destructive_models = kwargs.pop("allow_destructive_model") or None backfill_models = kwargs.pop("backfill_model") or None + ignore_cron = kwargs.pop("ignore_cron") or None setattr(get_console(), "verbosity", Verbosity(verbose)) context.plan( @@ -551,6 +558,7 @@ def plan( select_models=select_models, allow_destructive_models=allow_destructive_models, backfill_models=backfill_models, + ignore_cron=ignore_cron, **kwargs, ) diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 7d27092f0e..eca60ecea9 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1290,6 +1290,7 @@ def plan( diff_rendered: t.Optional[bool] = None, skip_linter: t.Optional[bool] = None, explain: t.Optional[bool] = None, + ignore_cron: t.Optional[bool] = None, min_intervals: t.Optional[int] = None, ) -> Plan: """Interactively creates a plan. @@ -1367,6 +1368,7 @@ def plan( diff_rendered=diff_rendered, skip_linter=skip_linter, explain=explain, + ignore_cron=ignore_cron, min_intervals=min_intervals, ) @@ -1417,6 +1419,7 @@ def plan_builder( diff_rendered: t.Optional[bool] = None, skip_linter: t.Optional[bool] = None, explain: t.Optional[bool] = None, + ignore_cron: t.Optional[bool] = None, min_intervals: t.Optional[int] = None, ) -> PlanBuilder: """Creates a plan builder. @@ -1590,6 +1593,7 @@ def plan_builder( max_interval_end_per_model = None default_start, default_end = None, None if not run: + ignore_cron = False max_interval_end_per_model = self._get_max_interval_end_per_model( snapshots, backfill_models ) @@ -1654,6 +1658,7 @@ def plan_builder( console=self.console, user_provided_flags=user_provided_flags, explain=explain or False, + ignore_cron=ignore_cron or False, ) def apply( diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index 7918556bad..db2b43345a 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -89,6 +89,7 @@ class PlanBuilder: the environment is not finalized. start_override_per_model: A mapping of model FQNs to target start dates. end_override_per_model: A mapping of model FQNs to target end dates. + ignore_cron: Whether to ignore the node's cron schedule when computing missing intervals. explain: Whether to explain the plan instead of applying it. """ @@ -120,6 +121,7 @@ def __init__( end_bounded: bool = False, ensure_finalized_snapshots: bool = False, explain: bool = False, + ignore_cron: bool = False, start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, console: t.Optional[PlanBuilderConsole] = None, @@ -137,6 +139,7 @@ def __init__( self._enable_preview = enable_preview self._end_bounded = end_bounded self._ensure_finalized_snapshots = ensure_finalized_snapshots + self._ignore_cron = ignore_cron self._start_override_per_model = start_override_per_model self._end_override_per_model = end_override_per_model self._environment_ttl = environment_ttl @@ -335,6 +338,7 @@ def build(self) -> Plan: execution_time=plan_execution_time, end_bounded=self._end_bounded, ensure_finalized_snapshots=self._ensure_finalized_snapshots, + ignore_cron=self._ignore_cron, user_provided_flags=self._user_provided_flags, ) self._latest_plan = plan diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index 584c0d9b51..300ac62faf 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -48,6 +48,7 @@ class Plan(PydanticModel, frozen=True): end_bounded: bool ensure_finalized_snapshots: bool explain: bool + ignore_cron: bool = False environment_ttl: t.Optional[str] = None environment_naming_info: EnvironmentNamingInfo @@ -181,6 +182,7 @@ def missing_intervals(self) -> t.List[SnapshotIntervals]: start_override_per_model=self.start_override_per_model, end_override_per_model=self.end_override_per_model, end_bounded=self.end_bounded, + ignore_cron=self.ignore_cron, ).items() if snapshot.is_model and missing ] @@ -259,6 +261,7 @@ def to_evaluatable(self) -> EvaluatablePlan: forward_only=self.forward_only, end_bounded=self.end_bounded, ensure_finalized_snapshots=self.ensure_finalized_snapshots, + ignore_cron=self.ignore_cron, directly_modified_snapshots=sorted(self.directly_modified), indirectly_modified_snapshots={ s.name: sorted(snapshot_ids) for s, snapshot_ids in self.indirectly_modified.items() @@ -300,6 +303,7 @@ class EvaluatablePlan(PydanticModel): forward_only: bool end_bounded: bool ensure_finalized_snapshots: bool + ignore_cron: bool = False directly_modified_snapshots: t.List[SnapshotId] indirectly_modified_snapshots: t.Dict[str, t.List[SnapshotId]] metadata_updated_snapshots: t.List[SnapshotId] diff --git a/sqlmesh/core/plan/stages.py b/sqlmesh/core/plan/stages.py index 7ef9fcb7ef..871b540203 100644 --- a/sqlmesh/core/plan/stages.py +++ b/sqlmesh/core/plan/stages.py @@ -524,6 +524,7 @@ def _missing_intervals( }, deployability_index=deployability_index, end_bounded=plan.end_bounded, + ignore_cron=plan.ignore_cron, start_override_per_model=plan.start_override_per_model, end_override_per_model=plan.end_override_per_model, ) diff --git a/sqlmesh/magics.py b/sqlmesh/magics.py index 454b6cd4ce..2b5f185aa9 100644 --- a/sqlmesh/magics.py +++ b/sqlmesh/magics.py @@ -486,6 +486,12 @@ def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None action="store_true", help="Run latest intervals as part of the plan application (prod environment only).", ) + @argument( + "--ignore-cron", + action="store_true", + help="Run for all missing intervals, ignoring individual cron schedules. Only applies if --run is set.", + default=None, + ) @argument( "--enable-preview", action="store_true", @@ -533,6 +539,7 @@ def plan(self, context: Context, line: str) -> None: select_models=args.select_model, no_diff=args.no_diff, run=args.run, + ignore_cron=args.run, enable_preview=args.enable_preview, diff_rendered=args.diff_rendered, ) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 856406a16d..7248b2a724 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -1609,6 +1609,60 @@ def test_plan_with_run( } +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_plan_ignore_cron( + init_and_plan_context: t.Callable, +): + context, _ = init_and_plan_context("examples/sushi") + + expressions = d.parse( + f""" + MODEL ( + name memory.sushi.test_allow_partials, + kind INCREMENTAL_UNMANAGED, + allow_partials true, + start '2023-01-01', + ); + + SELECT @end_ts AS end_ts + """ + ) + model = load_sql_based_model(expressions) + + context.upsert_model(model) + context.plan("prod", skip_tests=True, auto_apply=True, no_prompts=True) + + assert ( + context.engine_adapter.fetchone("SELECT MAX(end_ts) FROM memory.sushi.test_allow_partials")[ + 0 + ] + == "2023-01-07 23:59:59.999999" + ) + + plan_no_ignore_cron = context.plan_builder( + "prod", run=True, ignore_cron=False, skip_tests=True + ).build() + assert not plan_no_ignore_cron.missing_intervals + + plan = context.plan_builder("prod", run=True, ignore_cron=True, skip_tests=True).build() + assert plan.missing_intervals == [ + SnapshotIntervals( + snapshot_id=context.get_snapshot(model, raise_if_missing=True).snapshot_id, + intervals=[ + (to_timestamp("2023-01-08"), to_timestamp("2023-01-08 15:00:00")), + ], + ) + ] + context.apply(plan) + + assert ( + context.engine_adapter.fetchone("SELECT MAX(end_ts) FROM memory.sushi.test_allow_partials")[ + 0 + ] + == "2023-01-08 14:59:59.999999" + ) + + @time_machine.travel("2023-01-08 15:00:00 UTC") def test_run_with_select_models_no_auto_upstream( init_and_plan_context: t.Callable, diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index 66018d4be4..7254a924b1 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -3359,3 +3359,64 @@ def test_environment_statements_change_allows_dev_environment_creation(make_snap assert plan is not None assert plan.context_diff.has_environment_statements_changes assert plan.context_diff.environment_statements == environment_statements + + +def test_plan_ignore_cron_flag(make_snapshot): + snapshot_a = make_snapshot( + SqlModel( + name="test_model", + kind=IncrementalByTimeRangeKind(time_column="ds"), + cron="@daily", # Daily cron schedule + start="2023-01-01", + query=parse_one("SELECT 1 as id, ds FROM VALUES ('2023-01-01') t(ds)"), + allow_partials=True, + ) + ) + snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=False) + + context_diff = ContextDiff( + environment="dev", + is_new_environment=True, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={}, + snapshots={snapshot_a.snapshot_id: snapshot_a}, + new_snapshots={snapshot_a.snapshot_id: snapshot_a}, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + plan_builder_ignore_cron = PlanBuilder( + context_diff, + start="2023-01-01", + execution_time="2023-01-05 12:00:00", + is_dev=True, + include_unmodified=True, + ignore_cron=True, + end_bounded=False, + ) + + plan = plan_builder_ignore_cron.build() + assert plan.ignore_cron is True + assert plan.to_evaluatable().ignore_cron is True + + assert plan.missing_intervals == [ + SnapshotIntervals( + snapshot_id=snapshot_a.snapshot_id, + intervals=[ + (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")), + (to_timestamp("2023-01-02"), to_timestamp("2023-01-03")), + (to_timestamp("2023-01-03"), to_timestamp("2023-01-04")), + (to_timestamp("2023-01-04"), to_timestamp("2023-01-05")), + (to_timestamp("2023-01-05"), to_timestamp("2023-01-05 12:00:00")), + ], + ) + ] diff --git a/tests/core/test_plan_stages.py b/tests/core/test_plan_stages.py index 1b660e1a87..aedf50e26f 100644 --- a/tests/core/test_plan_stages.py +++ b/tests/core/test_plan_stages.py @@ -107,6 +107,7 @@ def test_build_plan_stages_basic( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[snapshot_a.snapshot_id, snapshot_b.snapshot_id], indirectly_modified_snapshots={}, metadata_updated_snapshots=[], @@ -218,6 +219,7 @@ def test_build_plan_stages_with_before_all_and_after_all( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[snapshot_a.snapshot_id, snapshot_b.snapshot_id], indirectly_modified_snapshots={}, metadata_updated_snapshots=[], @@ -326,6 +328,7 @@ def test_build_plan_stages_select_models( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[snapshot_a.snapshot_id, snapshot_b.snapshot_id], indirectly_modified_snapshots={}, metadata_updated_snapshots=[], @@ -426,6 +429,7 @@ def test_build_plan_stages_basic_no_backfill( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[snapshot_a.snapshot_id, snapshot_b.snapshot_id], indirectly_modified_snapshots={}, metadata_updated_snapshots=[], @@ -535,6 +539,7 @@ def test_build_plan_stages_restatement( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[], # No changes indirectly_modified_snapshots={}, # No changes metadata_updated_snapshots=[], @@ -644,6 +649,7 @@ def test_build_plan_stages_forward_only( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[new_snapshot_a.snapshot_id], indirectly_modified_snapshots={ new_snapshot_a.name: [new_snapshot_b.snapshot_id], @@ -772,6 +778,7 @@ def test_build_plan_stages_forward_only_dev( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[new_snapshot_a.snapshot_id], indirectly_modified_snapshots={ new_snapshot_a.name: [new_snapshot_b.snapshot_id], @@ -895,6 +902,7 @@ def _get_snapshots(snapshot_ids: t.List[SnapshotId]) -> t.Dict[SnapshotId, Snaps forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[new_snapshot_a.snapshot_id], indirectly_modified_snapshots={ new_snapshot_a.name: [new_snapshot_b.snapshot_id], @@ -1018,6 +1026,7 @@ def test_build_plan_stages_forward_only_ensure_finalized_snapshots( forward_only=False, end_bounded=False, ensure_finalized_snapshots=True, + ignore_cron=False, directly_modified_snapshots=[new_snapshot_a.snapshot_id], indirectly_modified_snapshots={ new_snapshot_a.name: [new_snapshot_b.snapshot_id], @@ -1092,6 +1101,7 @@ def test_build_plan_stages_removed_model( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[], indirectly_modified_snapshots={}, metadata_updated_snapshots=[], @@ -1173,6 +1183,7 @@ def test_build_plan_stages_environment_suffix_target_changed( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[], indirectly_modified_snapshots={}, metadata_updated_snapshots=[], @@ -1270,6 +1281,7 @@ def test_build_plan_stages_indirect_non_breaking_view_migration( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[new_snapshot_a.snapshot_id], indirectly_modified_snapshots={ new_snapshot_a.name: [new_snapshot_c.snapshot_id], @@ -1357,6 +1369,7 @@ def test_build_plan_stages_virtual_environment_mode_filtering( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id], indirectly_modified_snapshots={}, metadata_updated_snapshots=[], @@ -1408,6 +1421,7 @@ def test_build_plan_stages_virtual_environment_mode_filtering( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id], indirectly_modified_snapshots={}, metadata_updated_snapshots=[], @@ -1469,6 +1483,7 @@ def test_build_plan_stages_virtual_environment_mode_filtering( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[], indirectly_modified_snapshots={}, metadata_updated_snapshots=[], @@ -1541,6 +1556,7 @@ def test_build_plan_stages_virtual_environment_mode_no_updates( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[snapshot_dev_only.snapshot_id], indirectly_modified_snapshots={}, metadata_updated_snapshots=[], @@ -1603,6 +1619,7 @@ def test_adjust_intervals_new_forward_only_dev_intervals( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[], indirectly_modified_snapshots={}, metadata_updated_snapshots=[], @@ -1669,6 +1686,7 @@ def test_adjust_intervals_restatement_removal( forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[], indirectly_modified_snapshots={}, metadata_updated_snapshots=[], @@ -1760,6 +1778,7 @@ def test_adjust_intervals_should_force_rebuild(make_snapshot, mocker: MockerFixt forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, + ignore_cron=False, directly_modified_snapshots=[new_snapshot.snapshot_id], indirectly_modified_snapshots={}, metadata_updated_snapshots=[],