Skip to content

Commit be1fafd

Browse files
committed
Add --skip-audits flag to plan and run commands
Allow users to skip audit execution during plan application and run operations. This is useful for large-scale backfills or development scenarios where audit overhead is not desired. Signed-off-by: Harry Brundage <harry.brundage@gmail.com>
1 parent cea8418 commit be1fafd

File tree

11 files changed

+447
-51
lines changed

11 files changed

+447
-51
lines changed

docs/concepts/audits.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,7 @@ By default, SQLMesh will halt the pipeline when an audit fails to prevent potent
684684

685685
## Advanced usage
686686
### Skipping audits
687-
Audits can be skipped by setting the `skip` argument to `true` as in the following example:
687+
Individual audits can be skipped by setting the `skip` argument to `true` as in the following example:
688688

689689
```sql linenums="1" hl_lines="3"
690690
AUDIT (
@@ -696,6 +696,13 @@ WHERE ds BETWEEN @start_ds AND @end_ds AND
696696
price IS NULL;
697697
```
698698

699+
To skip **all** audits for an entire `plan` or `run` invocation, use the `--skip-audits` CLI flag:
700+
701+
```bash
702+
sqlmesh plan --skip-audits
703+
sqlmesh run --skip-audits
704+
```
705+
699706
### Non-blocking audits
700707
By default, audits that fail will stop the execution of the pipeline to prevent bad data from propagating further.
701708

docs/reference/cli.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,8 @@ Options:
393393
versions of the models and standalone
394394
audits.
395395
--explain Explain the plan instead of applying it.
396+
--skip-audits Skip audit execution during plan
397+
application.
396398
-v, --verbose Verbose output. Use -vv for very verbose
397399
output.
398400
--help Show this message and exit.
@@ -504,6 +506,7 @@ Options:
504506
Only applicable when --select-model is used.
505507
Note: this may result in missing / invalid
506508
data for the selected models.
509+
--skip-audits Skip audit execution during the run.
507510
--help Show this message and exit.
508511
```
509512

sqlmesh/cli/main.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,12 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
538538
default=None,
539539
help="For every model, ensure at least this many intervals are covered by a missing intervals check regardless of the plan start date",
540540
)
541+
@click.option(
542+
"--skip-audits",
543+
is_flag=True,
544+
help="Skip audit execution during plan application.",
545+
default=None,
546+
)
541547
@opt.verbose
542548
@click.pass_context
543549
@error_handler
@@ -596,6 +602,11 @@ def plan(
596602
is_flag=True,
597603
help="Do not automatically include upstream models. Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models.",
598604
)
605+
@click.option(
606+
"--skip-audits",
607+
is_flag=True,
608+
help="Skip audit execution during the run.",
609+
)
599610
@click.pass_context
600611
@error_handler
601612
@cli_analytics

sqlmesh/core/context.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,7 @@ def run(
762762
select_models: t.Optional[t.Collection[str]] = None,
763763
exit_on_env_update: t.Optional[int] = None,
764764
no_auto_upstream: bool = False,
765+
skip_audits: bool = False,
765766
) -> CompletionStatus:
766767
"""Run the entire dag through the scheduler.
767768
@@ -777,6 +778,7 @@ def run(
777778
exit_on_env_update: If set, exits with the provided code if the run is interrupted by an update
778779
to the target environment.
779780
no_auto_upstream: Whether to not force upstream models to run. Only applicable when using `select_models`.
781+
skip_audits: Whether to skip audit execution after a successful run. Default: False.
780782
781783
Returns:
782784
True if the run was successful, False otherwise.
@@ -845,6 +847,7 @@ def _has_environment_changed() -> bool:
845847
select_models=select_models,
846848
circuit_breaker=_has_environment_changed,
847849
no_auto_upstream=no_auto_upstream,
850+
skip_audits=skip_audits,
848851
)
849852
done = True
850853
except CircuitBreakerError:
@@ -1340,6 +1343,7 @@ def plan(
13401343
explain: t.Optional[bool] = None,
13411344
ignore_cron: t.Optional[bool] = None,
13421345
min_intervals: t.Optional[int] = None,
1346+
skip_audits: t.Optional[bool] = None,
13431347
) -> Plan:
13441348
"""Interactively creates a plan.
13451349
@@ -1389,6 +1393,7 @@ def plan(
13891393
explain: Whether to explain the plan instead of applying it.
13901394
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
13911395
on every model when checking for missing intervals
1396+
skip_audits: Whether to skip audit execution after a successful backfill. Default: False.
13921397
13931398
Returns:
13941399
The populated Plan object.
@@ -1420,6 +1425,7 @@ def plan(
14201425
explain=explain,
14211426
ignore_cron=ignore_cron,
14221427
min_intervals=min_intervals,
1428+
skip_audits=skip_audits,
14231429
)
14241430

14251431
plan = plan_builder.build()
@@ -1473,6 +1479,7 @@ def plan_builder(
14731479
ignore_cron: t.Optional[bool] = None,
14741480
min_intervals: t.Optional[int] = None,
14751481
always_include_local_changes: t.Optional[bool] = None,
1482+
skip_audits: t.Optional[bool] = None,
14761483
) -> PlanBuilder:
14771484
"""Creates a plan builder.
14781485
@@ -1513,6 +1520,7 @@ def plan_builder(
15131520
on every model when checking for missing intervals
15141521
always_include_local_changes: Usually when restatements are present, local changes in the filesystem are ignored.
15151522
However, it can be desirable to deploy changes + restatements in the same plan, so this flag overrides the default behaviour.
1523+
skip_audits: Whether to skip audit execution after a successful backfill. Default: False.
15161524
15171525
Returns:
15181526
The plan builder.
@@ -1544,6 +1552,7 @@ def plan_builder(
15441552
"diff_rendered": diff_rendered,
15451553
"skip_linter": skip_linter,
15461554
"min_intervals": min_intervals,
1555+
"skip_audits": skip_audits,
15471556
}
15481557
user_provided_flags: t.Dict[str, UserProvidedFlags] = {
15491558
k: v for k, v in kwargs.items() if v is not None
@@ -1553,6 +1562,7 @@ def plan_builder(
15531562
no_gaps = no_gaps or False
15541563
skip_backfill = skip_backfill or False
15551564
empty_backfill = empty_backfill or False
1565+
skip_audits = skip_audits or False
15561566
run = run or False
15571567
diff_rendered = diff_rendered or False
15581568
skip_linter = skip_linter or False
@@ -1747,6 +1757,7 @@ def plan_builder(
17471757
},
17481758
explain=explain or False,
17491759
ignore_cron=ignore_cron or False,
1760+
skip_audits=skip_audits,
17501761
)
17511762

17521763
def apply(
@@ -2542,6 +2553,7 @@ def _run(
25422553
select_models: t.Optional[t.Collection[str]],
25432554
circuit_breaker: t.Optional[t.Callable[[], bool]],
25442555
no_auto_upstream: bool,
2556+
skip_audits: bool = False,
25452557
) -> CompletionStatus:
25462558
scheduler = self.scheduler(environment=environment)
25472559
snapshots = scheduler.snapshots
@@ -2561,6 +2573,7 @@ def _run(
25612573
selected_snapshots=select_models,
25622574
auto_restatement_enabled=environment.lower() == c.PROD,
25632575
run_environment_statements=True,
2576+
skip_audits=skip_audits,
25642577
)
25652578

25662579
if completion_status.is_nothing_to_do:

sqlmesh/core/plan/builder.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ def __init__(
111111
no_gaps: bool = False,
112112
skip_backfill: bool = False,
113113
empty_backfill: bool = False,
114+
skip_audits: bool = False,
114115
is_dev: bool = False,
115116
forward_only: bool = False,
116117
allow_destructive_models: t.Optional[t.Iterable[str]] = None,
@@ -139,6 +140,7 @@ def __init__(
139140
self._no_gaps = no_gaps
140141
self._skip_backfill = skip_backfill
141142
self._empty_backfill = empty_backfill
143+
self._skip_audits = skip_audits
142144
self._is_dev = is_dev
143145
self._forward_only = forward_only
144146
self._allow_destructive_models = set(
@@ -331,6 +333,7 @@ def build(self) -> Plan:
331333
is_dev=self._is_dev,
332334
skip_backfill=self._skip_backfill,
333335
empty_backfill=self._empty_backfill,
336+
skip_audits=self._skip_audits,
334337
no_gaps=self._no_gaps,
335338
forward_only=self._forward_only,
336339
explain=self._explain,

sqlmesh/core/plan/definition.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class Plan(PydanticModel, frozen=True):
4141
is_dev: bool
4242
skip_backfill: bool
4343
empty_backfill: bool
44+
skip_audits: bool = False
4445
no_gaps: bool
4546
forward_only: bool
4647
allow_destructive_models: t.Set[str]
@@ -271,6 +272,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
271272
no_gaps=self.no_gaps,
272273
skip_backfill=self.skip_backfill,
273274
empty_backfill=self.empty_backfill,
275+
skip_audits=self.skip_audits,
274276
restatements={s.name: i for s, i in self.restatements.items()},
275277
restate_all_snapshots=self.restate_all_snapshots,
276278
is_dev=self.is_dev,
@@ -316,6 +318,7 @@ class EvaluatablePlan(PydanticModel):
316318
no_gaps: bool
317319
skip_backfill: bool
318320
empty_backfill: bool
321+
skip_audits: bool = False
319322
restatements: t.Dict[str, Interval]
320323
restate_all_snapshots: bool
321324
is_dev: bool

sqlmesh/core/plan/evaluator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,13 +259,17 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
259259
selected_snapshot_ids=stage.selected_snapshot_ids,
260260
selected_models=plan.selected_models,
261261
is_restatement=bool(plan.restatements),
262+
skip_audits=plan.skip_audits,
262263
)
263264
if errors:
264265
raise PlanError("Plan application failed.")
265266

266267
def visit_audit_only_run_stage(
267268
self, stage: stages.AuditOnlyRunStage, plan: EvaluatablePlan
268269
) -> None:
270+
if plan.skip_audits:
271+
return
272+
269273
audit_snapshots = stage.snapshots
270274
if not audit_snapshots:
271275
return

sqlmesh/core/scheduler.py

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ def evaluate(
200200
allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
201201
allow_additive_snapshots: t.Optional[t.Set[str]] = None,
202202
target_table_exists: t.Optional[bool] = None,
203+
skip_audits: bool = False,
203204
**kwargs: t.Any,
204205
) -> t.List[AuditResult]:
205206
"""Evaluate a snapshot and add the processed interval to the state sync.
@@ -239,17 +240,19 @@ def evaluate(
239240
target_table_exists=target_table_exists,
240241
**kwargs,
241242
)
242-
audit_results = self._audit_snapshot(
243-
snapshot=snapshot,
244-
environment_naming_info=environment_naming_info,
245-
start=start,
246-
end=end,
247-
execution_time=execution_time,
248-
snapshots=snapshots,
249-
deployability_index=deployability_index,
250-
wap_id=wap_id,
251-
**kwargs,
252-
)
243+
audit_results: t.List[AuditResult] = []
244+
if not skip_audits:
245+
audit_results = self._audit_snapshot(
246+
snapshot=snapshot,
247+
environment_naming_info=environment_naming_info,
248+
start=start,
249+
end=end,
250+
execution_time=execution_time,
251+
snapshots=snapshots,
252+
deployability_index=deployability_index,
253+
wap_id=wap_id,
254+
**kwargs,
255+
)
253256

254257
self.state_sync.add_interval(
255258
snapshot, start, end, is_dev=not is_deployable, last_altered_ts=now_timestamp()
@@ -272,6 +275,7 @@ def run(
272275
deployability_index: t.Optional[DeployabilityIndex] = None,
273276
auto_restatement_enabled: bool = False,
274277
run_environment_statements: bool = False,
278+
skip_audits: bool = False,
275279
) -> CompletionStatus:
276280
return self._run_or_audit(
277281
environment=environment,
@@ -288,6 +292,7 @@ def run(
288292
deployability_index=deployability_index,
289293
auto_restatement_enabled=auto_restatement_enabled,
290294
run_environment_statements=run_environment_statements,
295+
skip_audits=skip_audits,
291296
)
292297

293298
def audit(
@@ -304,7 +309,11 @@ def audit(
304309
circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
305310
deployability_index: t.Optional[DeployabilityIndex] = None,
306311
run_environment_statements: bool = False,
312+
skip_audits: bool = False,
307313
) -> CompletionStatus:
314+
if skip_audits:
315+
return CompletionStatus.SUCCESS
316+
308317
# Remove the intervals from the snapshots that will be audited so that they can be recomputed
309318
# by _run_or_audit as "missing intervals" to reuse the rest of it's logic
310319
remove_intervals = {}
@@ -434,6 +443,7 @@ def run_merged_intervals(
434443
selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
435444
run_environment_statements: bool = False,
436445
audit_only: bool = False,
446+
skip_audits: bool = False,
437447
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {},
438448
is_restatement: bool = False,
439449
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
@@ -537,15 +547,16 @@ def run_node(node: SchedulingUnit) -> None:
537547
assert deployability_index # mypy
538548

539549
if audit_only:
540-
audit_results = self._audit_snapshot(
541-
snapshot=snapshot,
542-
environment_naming_info=environment_naming_info,
543-
deployability_index=deployability_index,
544-
snapshots=self.snapshots_by_name,
545-
start=start,
546-
end=end,
547-
execution_time=execution_time,
548-
)
550+
if not skip_audits:
551+
audit_results = self._audit_snapshot(
552+
snapshot=snapshot,
553+
environment_naming_info=environment_naming_info,
554+
deployability_index=deployability_index,
555+
snapshots=self.snapshots_by_name,
556+
start=start,
557+
end=end,
558+
execution_time=execution_time,
559+
)
549560
else:
550561
# If batch_index > 0, then the target table must exist since the first batch would have created it
551562
target_table_exists = (
@@ -563,6 +574,7 @@ def run_node(node: SchedulingUnit) -> None:
563574
allow_additive_snapshots=allow_additive_snapshots,
564575
target_table_exists=target_table_exists,
565576
selected_models=selected_models,
577+
skip_audits=skip_audits,
566578
)
567579

568580
evaluation_duration_ms = now_timestamp() - execution_start_ts
@@ -785,6 +797,7 @@ def _run_or_audit(
785797
auto_restatement_enabled: bool = False,
786798
run_environment_statements: bool = False,
787799
audit_only: bool = False,
800+
skip_audits: bool = False,
788801
) -> CompletionStatus:
789802
"""Concurrently runs or audits all snapshots in topological order.
790803
@@ -876,6 +889,7 @@ def _run_or_audit(
876889
end=end,
877890
run_environment_statements=run_environment_statements,
878891
audit_only=audit_only,
892+
skip_audits=skip_audits,
879893
auto_restatement_triggers=auto_restatement_triggers,
880894
selected_models={
881895
s.node.dbt_unique_id for s in merged_intervals if s.node.dbt_unique_id

0 commit comments

Comments
 (0)