diff --git a/docs/concepts/audits.md b/docs/concepts/audits.md index a5a9fccc49..74827b756f 100644 --- a/docs/concepts/audits.md +++ b/docs/concepts/audits.md @@ -684,7 +684,7 @@ By default, SQLMesh will halt the pipeline when an audit fails to prevent potent ## Advanced usage ### Skipping audits -Audits can be skipped by setting the `skip` argument to `true` as in the following example: +Individual audits can be skipped by setting the `skip` argument to `true` as in the following example: ```sql linenums="1" hl_lines="3" AUDIT ( @@ -696,6 +696,13 @@ WHERE ds BETWEEN @start_ds AND @end_ds AND price IS NULL; ``` +To skip **all** audits for an entire `plan` or `run` invocation, use the `--skip-audits` CLI flag: + +```bash +sqlmesh plan --skip-audits +sqlmesh run --skip-audits +``` + ### Non-blocking audits By default, audits that fail will stop the execution of the pipeline to prevent bad data from propagating further. diff --git a/docs/reference/cli.md b/docs/reference/cli.md index a9ce9366e1..486407a638 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -393,6 +393,8 @@ Options: versions of the models and standalone audits. --explain Explain the plan instead of applying it. + --skip-audits Skip audit execution during plan + application. -v, --verbose Verbose output. Use -vv for very verbose output. --help Show this message and exit. @@ -504,6 +506,7 @@ Options: Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models. + --skip-audits Skip audit execution during the run. --help Show this message and exit. 
``` diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index 45f95d2abb..3c6c30c9bc 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -538,6 +538,12 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None: default=None, help="For every model, ensure at least this many intervals are covered by a missing intervals check regardless of the plan start date", ) +@click.option( + "--skip-audits", + is_flag=True, + help="Skip audit execution during plan application.", + default=None, +) @opt.verbose @click.pass_context @error_handler @@ -596,6 +602,11 @@ def plan( is_flag=True, help="Do not automatically include upstream models. Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models.", ) +@click.option( + "--skip-audits", + is_flag=True, + help="Skip audit execution during the run.", +) @click.pass_context @error_handler @cli_analytics diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index e6b404c597..8817502079 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -762,6 +762,7 @@ def run( select_models: t.Optional[t.Collection[str]] = None, exit_on_env_update: t.Optional[int] = None, no_auto_upstream: bool = False, + skip_audits: bool = False, ) -> CompletionStatus: """Run the entire dag through the scheduler. @@ -777,6 +778,7 @@ def run( exit_on_env_update: If set, exits with the provided code if the run is interrupted by an update to the target environment. no_auto_upstream: Whether to not force upstream models to run. Only applicable when using `select_models`. + skip_audits: Whether to skip audit execution for every snapshot interval evaluated during the run. Default: False. Returns: True if the run was successful, False otherwise. 
@@ -845,6 +847,7 @@ def _has_environment_changed() -> bool: select_models=select_models, circuit_breaker=_has_environment_changed, no_auto_upstream=no_auto_upstream, + skip_audits=skip_audits, ) done = True except CircuitBreakerError: @@ -1340,6 +1343,7 @@ def plan( explain: t.Optional[bool] = None, ignore_cron: t.Optional[bool] = None, min_intervals: t.Optional[int] = None, + skip_audits: t.Optional[bool] = None, ) -> Plan: """Interactively creates a plan. @@ -1389,6 +1393,7 @@ def plan( explain: Whether to explain the plan instead of applying it. min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered on every model when checking for missing intervals + skip_audits: Whether to skip all audit execution while applying the plan, both during backfill and in audit-only stages. Default: False. Returns: The populated Plan object. @@ -1420,6 +1425,7 @@ def plan( explain=explain, ignore_cron=ignore_cron, min_intervals=min_intervals, + skip_audits=skip_audits, ) plan = plan_builder.build() @@ -1473,6 +1479,7 @@ def plan_builder( ignore_cron: t.Optional[bool] = None, min_intervals: t.Optional[int] = None, always_include_local_changes: t.Optional[bool] = None, + skip_audits: t.Optional[bool] = None, ) -> PlanBuilder: """Creates a plan builder. @@ -1513,6 +1520,7 @@ def plan_builder( on every model when checking for missing intervals always_include_local_changes: Usually when restatements are present, local changes in the filesystem are ignored. However, it can be desirable to deploy changes + restatements in the same plan, so this flag overrides the default behaviour. + skip_audits: Whether to skip all audit execution while applying the plan, both during backfill and in audit-only stages. Default: False. Returns: The plan builder. 
@@ -1544,6 +1552,7 @@ def plan_builder( "diff_rendered": diff_rendered, "skip_linter": skip_linter, "min_intervals": min_intervals, + "skip_audits": skip_audits, } user_provided_flags: t.Dict[str, UserProvidedFlags] = { k: v for k, v in kwargs.items() if v is not None @@ -1553,6 +1562,7 @@ def plan_builder( no_gaps = no_gaps or False skip_backfill = skip_backfill or False empty_backfill = empty_backfill or False + skip_audits = skip_audits or False run = run or False diff_rendered = diff_rendered or False skip_linter = skip_linter or False @@ -1747,6 +1757,7 @@ def plan_builder( }, explain=explain or False, ignore_cron=ignore_cron or False, + skip_audits=skip_audits, ) def apply( @@ -2542,6 +2553,7 @@ def _run( select_models: t.Optional[t.Collection[str]], circuit_breaker: t.Optional[t.Callable[[], bool]], no_auto_upstream: bool, + skip_audits: bool = False, ) -> CompletionStatus: scheduler = self.scheduler(environment=environment) snapshots = scheduler.snapshots @@ -2561,6 +2573,7 @@ def _run( selected_snapshots=select_models, auto_restatement_enabled=environment.lower() == c.PROD, run_environment_statements=True, + skip_audits=skip_audits, ) if completion_status.is_nothing_to_do: diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index 7d753cc330..ee60bc5102 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -111,6 +111,7 @@ def __init__( no_gaps: bool = False, skip_backfill: bool = False, empty_backfill: bool = False, + skip_audits: bool = False, is_dev: bool = False, forward_only: bool = False, allow_destructive_models: t.Optional[t.Iterable[str]] = None, @@ -139,6 +140,7 @@ def __init__( self._no_gaps = no_gaps self._skip_backfill = skip_backfill self._empty_backfill = empty_backfill + self._skip_audits = skip_audits self._is_dev = is_dev self._forward_only = forward_only self._allow_destructive_models = set( @@ -331,6 +333,7 @@ def build(self) -> Plan: is_dev=self._is_dev, skip_backfill=self._skip_backfill, 
empty_backfill=self._empty_backfill, + skip_audits=self._skip_audits, no_gaps=self._no_gaps, forward_only=self._forward_only, explain=self._explain, diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index 866299eff8..c0d3947b58 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -41,6 +41,7 @@ class Plan(PydanticModel, frozen=True): is_dev: bool skip_backfill: bool empty_backfill: bool + skip_audits: bool = False no_gaps: bool forward_only: bool allow_destructive_models: t.Set[str] @@ -271,6 +272,7 @@ def to_evaluatable(self) -> EvaluatablePlan: no_gaps=self.no_gaps, skip_backfill=self.skip_backfill, empty_backfill=self.empty_backfill, + skip_audits=self.skip_audits, restatements={s.name: i for s, i in self.restatements.items()}, restate_all_snapshots=self.restate_all_snapshots, is_dev=self.is_dev, @@ -316,6 +318,7 @@ class EvaluatablePlan(PydanticModel): no_gaps: bool skip_backfill: bool empty_backfill: bool + skip_audits: bool = False restatements: t.Dict[str, Interval] restate_all_snapshots: bool is_dev: bool diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index f2f432a97e..a78983161d 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -259,6 +259,7 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla selected_snapshot_ids=stage.selected_snapshot_ids, selected_models=plan.selected_models, is_restatement=bool(plan.restatements), + skip_audits=plan.skip_audits, ) if errors: raise PlanError("Plan application failed.") @@ -266,6 +267,9 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla def visit_audit_only_run_stage( self, stage: stages.AuditOnlyRunStage, plan: EvaluatablePlan ) -> None: + if plan.skip_audits: + return + audit_snapshots = stage.snapshots if not audit_snapshots: return diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 5eb0ff40ff..d8388f4820 
100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -200,6 +200,7 @@ def evaluate( allow_destructive_snapshots: t.Optional[t.Set[str]] = None, allow_additive_snapshots: t.Optional[t.Set[str]] = None, target_table_exists: t.Optional[bool] = None, + skip_audits: bool = False, **kwargs: t.Any, ) -> t.List[AuditResult]: """Evaluate a snapshot and add the processed interval to the state sync. @@ -239,17 +240,19 @@ def evaluate( target_table_exists=target_table_exists, **kwargs, ) - audit_results = self._audit_snapshot( - snapshot=snapshot, - environment_naming_info=environment_naming_info, - start=start, - end=end, - execution_time=execution_time, - snapshots=snapshots, - deployability_index=deployability_index, - wap_id=wap_id, - **kwargs, - ) + audit_results: t.List[AuditResult] = [] + if not skip_audits: + audit_results = self._audit_snapshot( + snapshot=snapshot, + environment_naming_info=environment_naming_info, + start=start, + end=end, + execution_time=execution_time, + snapshots=snapshots, + deployability_index=deployability_index, + wap_id=wap_id, + **kwargs, + ) self.state_sync.add_interval( snapshot, start, end, is_dev=not is_deployable, last_altered_ts=now_timestamp() @@ -272,6 +275,7 @@ def run( deployability_index: t.Optional[DeployabilityIndex] = None, auto_restatement_enabled: bool = False, run_environment_statements: bool = False, + skip_audits: bool = False, ) -> CompletionStatus: return self._run_or_audit( environment=environment, @@ -288,6 +292,7 @@ def run( deployability_index=deployability_index, auto_restatement_enabled=auto_restatement_enabled, run_environment_statements=run_environment_statements, + skip_audits=skip_audits, ) def audit( @@ -304,7 +309,11 @@ def audit( circuit_breaker: t.Optional[t.Callable[[], bool]] = None, deployability_index: t.Optional[DeployabilityIndex] = None, run_environment_statements: bool = False, + skip_audits: bool = False, ) -> CompletionStatus: + if skip_audits: + return 
CompletionStatus.SUCCESS + # Remove the intervals from the snapshots that will be audited so that they can be recomputed # by _run_or_audit as "missing intervals" to reuse the rest of it's logic remove_intervals = {} @@ -434,6 +443,7 @@ def run_merged_intervals( selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None, run_environment_statements: bool = False, audit_only: bool = False, + skip_audits: bool = False, auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}, is_restatement: bool = False, ) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]: @@ -537,15 +547,16 @@ def run_node(node: SchedulingUnit) -> None: assert deployability_index # mypy if audit_only: - audit_results = self._audit_snapshot( - snapshot=snapshot, - environment_naming_info=environment_naming_info, - deployability_index=deployability_index, - snapshots=self.snapshots_by_name, - start=start, - end=end, - execution_time=execution_time, - ) + if not skip_audits: + audit_results = self._audit_snapshot( + snapshot=snapshot, + environment_naming_info=environment_naming_info, + deployability_index=deployability_index, + snapshots=self.snapshots_by_name, + start=start, + end=end, + execution_time=execution_time, + ) else: # If batch_index > 0, then the target table must exist since the first batch would have created it target_table_exists = ( @@ -563,6 +574,7 @@ def run_node(node: SchedulingUnit) -> None: allow_additive_snapshots=allow_additive_snapshots, target_table_exists=target_table_exists, selected_models=selected_models, + skip_audits=skip_audits, ) evaluation_duration_ms = now_timestamp() - execution_start_ts @@ -785,6 +797,7 @@ def _run_or_audit( auto_restatement_enabled: bool = False, run_environment_statements: bool = False, audit_only: bool = False, + skip_audits: bool = False, ) -> CompletionStatus: """Concurrently runs or audits all snapshots in topological order. 
@@ -876,6 +889,7 @@ def _run_or_audit( end=end, run_environment_statements=run_environment_statements, audit_only=audit_only, + skip_audits=skip_audits, auto_restatement_triggers=auto_restatement_triggers, selected_models={ s.node.dbt_unique_id for s in merged_intervals if s.node.dbt_unique_id diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 480d186fa1..2b5b28a513 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -1734,6 +1734,114 @@ def test_ignore_warnings(runner: CliRunner, tmp_path: Path) -> None: assert audit_warning not in result.output +def test_plan_skip_audits(runner: CliRunner, tmp_path: Path) -> None: + create_example_project(tmp_path) + + # Add non-blocking audit that always fails to generate a WARNING + with open(tmp_path / "models" / "full_model.sql", "w", encoding="utf-8") as f: + f.write( + """ +MODEL ( + name sqlmesh_example.full_model, + kind FULL, + cron '@daily', + grain item_id, + audits (full_nonblocking_audit), +); + +SELECT + item_id, + COUNT(DISTINCT id) AS num_orders, +FROM + sqlmesh_example.incremental_model +GROUP BY item_id; + +AUDIT ( + name full_nonblocking_audit, + blocking false, +); +select 1 as a; +""" + ) + + audit_warning = "[WARNING] sqlmesh_example.full_model: 'full_nonblocking_audit' audit error: " + + # Without --skip-audits, the audit warning appears + result = runner.invoke( + cli, + ["--paths", str(tmp_path), "plan", "--no-prompts", "--auto-apply", "--skip-tests"], + ) + assert result.exit_code == 0 + assert audit_warning in result.output + + # With --skip-audits, the audit is not executed so no warning appears + result = runner.invoke( + cli, + ["--paths", str(tmp_path), "plan", "--no-prompts", "--auto-apply", "--skip-tests", "--skip-audits"], + ) + assert result.exit_code == 0 + assert audit_warning not in result.output + + +def test_run_skip_audits(runner: CliRunner, tmp_path: Path) -> None: + create_example_project(tmp_path) + + # Add non-blocking audit that always fails to generate a 
WARNING + with open(tmp_path / "models" / "full_model.sql", "w", encoding="utf-8") as f: + f.write( + """ +MODEL ( + name sqlmesh_example.full_model, + kind FULL, + cron '@daily', + grain item_id, + audits (full_nonblocking_audit), +); + +SELECT + item_id, + COUNT(DISTINCT id) AS num_orders, +FROM + sqlmesh_example.incremental_model +GROUP BY item_id; + +AUDIT ( + name full_nonblocking_audit, + blocking false, +); +select 1 as a; +""" + ) + + audit_warning = "[WARNING] sqlmesh_example.full_model: 'full_nonblocking_audit' audit error: " + + with time_machine.travel("2023-01-01 23:59:00 UTC", tick=False) as traveler: + # Initialize prod at time T1 + result = runner.invoke( + cli, + ["--paths", str(tmp_path), "plan", "--no-prompts", "--auto-apply", "--skip-tests"], + ) + assert result.exit_code == 0 + + # Advance time so the daily cron has elapsed, then run without --skip-audits + traveler.move_to("2023-01-02 00:01:00 UTC") + result = runner.invoke( + cli, + ["--paths", str(tmp_path), "run"], + ) + assert result.exit_code == 0 + assert audit_warning in result.output + + # Advance time again so the daily cron has elapsed, then run with --skip-audits + traveler.move_to("2023-01-03 00:01:00 UTC") + result = runner.invoke( + cli, + ["--paths", str(tmp_path), "run", "--skip-audits"], + ) + assert result.exit_code == 0 + assert audit_warning not in result.output + + def test_table_diff_schema_diff_ignore_case(runner: CliRunner, tmp_path: Path): from sqlmesh.core.engine_adapter import DuckDBEngineAdapter diff --git a/tests/core/integration/test_audits.py b/tests/core/integration/test_audits.py index 457974fdac..226db14486 100644 --- a/tests/core/integration/test_audits.py +++ b/tests/core/integration/test_audits.py @@ -346,3 +346,37 @@ def test_default_audits_with_custom_audit_definitions(tmp_path: Path): if audit_name == "positive_amount": assert "column" in audit_args assert audit_args["column"].name == "amount" + + +@pytest.mark.slow +def 
test_skip_audits_bypasses_blocking_audit(tmp_path: Path): + models_dir = tmp_path / "models" + models_dir.mkdir(exist_ok=True) + + create_temp_file( + tmp_path, + models_dir / "bad_model.sql", + dedent(""" + MODEL ( + name test.bad_model, + kind FULL + ); + + SELECT NULL AS customer_id + """), + ) + + config = Config( + model_defaults=ModelDefaultsConfig( + dialect="duckdb", audits=["not_null(columns := [customer_id])"] + ) + ) + + context = Context(paths=tmp_path, config=config) + + # Without skip_audits, the blocking audit causes a PlanError + with pytest.raises(PlanError): + context.plan("prod", no_prompts=True, auto_apply=True) + + # With skip_audits=True, the plan succeeds even though the audit would fail + context.plan("prod", no_prompts=True, auto_apply=True, skip_audits=True) diff --git a/tests/core/test_scheduler.py b/tests/core/test_scheduler.py index cd32d2451d..7fb1afd5de 100644 --- a/tests/core/test_scheduler.py +++ b/tests/core/test_scheduler.py @@ -1213,3 +1213,38 @@ def test_dag_upstream_dependency_caching_with_complex_diamond(mocker: MockerFixt expected_g_node: {expected_a_node}, expected_h_node: {expected_a_node}, } + + +def test_evaluate_skip_audits(mocker: MockerFixture, make_snapshot): + snapshot: Snapshot = make_snapshot( + SqlModel( + name="name", + kind=IncrementalByTimeRangeKind(time_column="ds"), + query=parse_one("SELECT ds FROM parent.tbl"), + ) + ) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + state_sync_mock = mocker.MagicMock() + scheduler = Scheduler( + snapshots=[snapshot], + snapshot_evaluator=SnapshotEvaluator(adapters=mocker.MagicMock(), ddl_concurrent_tasks=1), + state_sync=state_sync_mock, + max_workers=1, + default_catalog=None, + ) + + audit_spy = mocker.spy(scheduler, "_audit_snapshot") + + result = scheduler.evaluate( + snapshot, + start=to_datetime("2022-01-01"), + end=to_datetime("2022-01-02"), + execution_time=to_datetime("2022-01-02"), + deployability_index=DeployabilityIndex.all_deployable(), + 
batch_index=0, + skip_audits=True, + ) + + assert result == [] + audit_spy.assert_not_called()