Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/concepts/audits.md
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ By default, SQLMesh will halt the pipeline when an audit fails to prevent potent

## Advanced usage
### Skipping audits
Audits can be skipped by setting the `skip` argument to `true` as in the following example:
Individual audits can be skipped by setting the `skip` argument to `true` as in the following example:

```sql linenums="1" hl_lines="3"
AUDIT (
Expand All @@ -696,6 +696,13 @@ WHERE ds BETWEEN @start_ds AND @end_ds AND
price IS NULL;
```

To skip **all** audits for an entire `plan` or `run` invocation, use the `--skip-audits` CLI flag:

```bash
sqlmesh plan --skip-audits
sqlmesh run --skip-audits
```

### Non-blocking audits
By default, audits that fail will stop the execution of the pipeline to prevent bad data from propagating further.

Expand Down
3 changes: 3 additions & 0 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ Options:
versions of the models and standalone
audits.
--explain Explain the plan instead of applying it.
--skip-audits Skip audit execution during plan
application.
-v, --verbose Verbose output. Use -vv for very verbose
output.
--help Show this message and exit.
Expand Down Expand Up @@ -504,6 +506,7 @@ Options:
Only applicable when --select-model is used.
Note: this may result in missing / invalid
data for the selected models.
--skip-audits Skip audit execution during the run.
--help Show this message and exit.
```

Expand Down
11 changes: 11 additions & 0 deletions sqlmesh/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,12 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
default=None,
help="For every model, ensure at least this many intervals are covered by a missing intervals check regardless of the plan start date",
)
@click.option(
"--skip-audits",
is_flag=True,
help="Skip audit execution during plan application.",
default=None,
)
@opt.verbose
@click.pass_context
@error_handler
Expand Down Expand Up @@ -596,6 +602,11 @@ def plan(
is_flag=True,
help="Do not automatically include upstream models. Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models.",
)
@click.option(
"--skip-audits",
is_flag=True,
help="Skip audit execution during the run.",
)
@click.pass_context
@error_handler
@cli_analytics
Expand Down
13 changes: 13 additions & 0 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ def run(
select_models: t.Optional[t.Collection[str]] = None,
exit_on_env_update: t.Optional[int] = None,
no_auto_upstream: bool = False,
skip_audits: bool = False,
) -> CompletionStatus:
"""Run the entire dag through the scheduler.

Expand All @@ -777,6 +778,7 @@ def run(
exit_on_env_update: If set, exits with the provided code if the run is interrupted by an update
to the target environment.
no_auto_upstream: Whether to not force upstream models to run. Only applicable when using `select_models`.
skip_audits: Whether to skip audit execution after a successful run. Default: False.

Returns:
True if the run was successful, False otherwise.
Expand Down Expand Up @@ -845,6 +847,7 @@ def _has_environment_changed() -> bool:
select_models=select_models,
circuit_breaker=_has_environment_changed,
no_auto_upstream=no_auto_upstream,
skip_audits=skip_audits,
)
done = True
except CircuitBreakerError:
Expand Down Expand Up @@ -1340,6 +1343,7 @@ def plan(
explain: t.Optional[bool] = None,
ignore_cron: t.Optional[bool] = None,
min_intervals: t.Optional[int] = None,
skip_audits: t.Optional[bool] = None,
) -> Plan:
"""Interactively creates a plan.

Expand Down Expand Up @@ -1389,6 +1393,7 @@ def plan(
explain: Whether to explain the plan instead of applying it.
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
on every model when checking for missing intervals
skip_audits: Whether to skip audit execution after a successful backfill. Default: False.

Returns:
The populated Plan object.
Expand Down Expand Up @@ -1420,6 +1425,7 @@ def plan(
explain=explain,
ignore_cron=ignore_cron,
min_intervals=min_intervals,
skip_audits=skip_audits,
)

plan = plan_builder.build()
Expand Down Expand Up @@ -1473,6 +1479,7 @@ def plan_builder(
ignore_cron: t.Optional[bool] = None,
min_intervals: t.Optional[int] = None,
always_include_local_changes: t.Optional[bool] = None,
skip_audits: t.Optional[bool] = None,
) -> PlanBuilder:
"""Creates a plan builder.

Expand Down Expand Up @@ -1513,6 +1520,7 @@ def plan_builder(
on every model when checking for missing intervals
always_include_local_changes: Usually when restatements are present, local changes in the filesystem are ignored.
However, it can be desirable to deploy changes + restatements in the same plan, so this flag overrides the default behaviour.
skip_audits: Whether to skip audit execution after a successful backfill. Default: False.

Returns:
The plan builder.
Expand Down Expand Up @@ -1544,6 +1552,7 @@ def plan_builder(
"diff_rendered": diff_rendered,
"skip_linter": skip_linter,
"min_intervals": min_intervals,
"skip_audits": skip_audits,
}
user_provided_flags: t.Dict[str, UserProvidedFlags] = {
k: v for k, v in kwargs.items() if v is not None
Expand All @@ -1553,6 +1562,7 @@ def plan_builder(
no_gaps = no_gaps or False
skip_backfill = skip_backfill or False
empty_backfill = empty_backfill or False
skip_audits = skip_audits or False
run = run or False
diff_rendered = diff_rendered or False
skip_linter = skip_linter or False
Expand Down Expand Up @@ -1747,6 +1757,7 @@ def plan_builder(
},
explain=explain or False,
ignore_cron=ignore_cron or False,
skip_audits=skip_audits,
)

def apply(
Expand Down Expand Up @@ -2542,6 +2553,7 @@ def _run(
select_models: t.Optional[t.Collection[str]],
circuit_breaker: t.Optional[t.Callable[[], bool]],
no_auto_upstream: bool,
skip_audits: bool = False,
) -> CompletionStatus:
scheduler = self.scheduler(environment=environment)
snapshots = scheduler.snapshots
Expand All @@ -2561,6 +2573,7 @@ def _run(
selected_snapshots=select_models,
auto_restatement_enabled=environment.lower() == c.PROD,
run_environment_statements=True,
skip_audits=skip_audits,
)

if completion_status.is_nothing_to_do:
Expand Down
3 changes: 3 additions & 0 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def __init__(
no_gaps: bool = False,
skip_backfill: bool = False,
empty_backfill: bool = False,
skip_audits: bool = False,
is_dev: bool = False,
forward_only: bool = False,
allow_destructive_models: t.Optional[t.Iterable[str]] = None,
Expand Down Expand Up @@ -139,6 +140,7 @@ def __init__(
self._no_gaps = no_gaps
self._skip_backfill = skip_backfill
self._empty_backfill = empty_backfill
self._skip_audits = skip_audits
self._is_dev = is_dev
self._forward_only = forward_only
self._allow_destructive_models = set(
Expand Down Expand Up @@ -331,6 +333,7 @@ def build(self) -> Plan:
is_dev=self._is_dev,
skip_backfill=self._skip_backfill,
empty_backfill=self._empty_backfill,
skip_audits=self._skip_audits,
no_gaps=self._no_gaps,
forward_only=self._forward_only,
explain=self._explain,
Expand Down
3 changes: 3 additions & 0 deletions sqlmesh/core/plan/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class Plan(PydanticModel, frozen=True):
is_dev: bool
skip_backfill: bool
empty_backfill: bool
skip_audits: bool = False
no_gaps: bool
forward_only: bool
allow_destructive_models: t.Set[str]
Expand Down Expand Up @@ -271,6 +272,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
no_gaps=self.no_gaps,
skip_backfill=self.skip_backfill,
empty_backfill=self.empty_backfill,
skip_audits=self.skip_audits,
restatements={s.name: i for s, i in self.restatements.items()},
restate_all_snapshots=self.restate_all_snapshots,
is_dev=self.is_dev,
Expand Down Expand Up @@ -316,6 +318,7 @@ class EvaluatablePlan(PydanticModel):
no_gaps: bool
skip_backfill: bool
empty_backfill: bool
skip_audits: bool = False
restatements: t.Dict[str, Interval]
restate_all_snapshots: bool
is_dev: bool
Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/core/plan/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,17 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
selected_snapshot_ids=stage.selected_snapshot_ids,
selected_models=plan.selected_models,
is_restatement=bool(plan.restatements),
skip_audits=plan.skip_audits,
)
if errors:
raise PlanError("Plan application failed.")

def visit_audit_only_run_stage(
self, stage: stages.AuditOnlyRunStage, plan: EvaluatablePlan
) -> None:
if plan.skip_audits:
return

audit_snapshots = stage.snapshots
if not audit_snapshots:
return
Expand Down
54 changes: 34 additions & 20 deletions sqlmesh/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def evaluate(
allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
allow_additive_snapshots: t.Optional[t.Set[str]] = None,
target_table_exists: t.Optional[bool] = None,
skip_audits: bool = False,
**kwargs: t.Any,
) -> t.List[AuditResult]:
"""Evaluate a snapshot and add the processed interval to the state sync.
Expand Down Expand Up @@ -239,17 +240,19 @@ def evaluate(
target_table_exists=target_table_exists,
**kwargs,
)
audit_results = self._audit_snapshot(
snapshot=snapshot,
environment_naming_info=environment_naming_info,
start=start,
end=end,
execution_time=execution_time,
snapshots=snapshots,
deployability_index=deployability_index,
wap_id=wap_id,
**kwargs,
)
audit_results: t.List[AuditResult] = []
if not skip_audits:
audit_results = self._audit_snapshot(
snapshot=snapshot,
environment_naming_info=environment_naming_info,
start=start,
end=end,
execution_time=execution_time,
snapshots=snapshots,
deployability_index=deployability_index,
wap_id=wap_id,
**kwargs,
)

self.state_sync.add_interval(
snapshot, start, end, is_dev=not is_deployable, last_altered_ts=now_timestamp()
Expand All @@ -272,6 +275,7 @@ def run(
deployability_index: t.Optional[DeployabilityIndex] = None,
auto_restatement_enabled: bool = False,
run_environment_statements: bool = False,
skip_audits: bool = False,
) -> CompletionStatus:
return self._run_or_audit(
environment=environment,
Expand All @@ -288,6 +292,7 @@ def run(
deployability_index=deployability_index,
auto_restatement_enabled=auto_restatement_enabled,
run_environment_statements=run_environment_statements,
skip_audits=skip_audits,
)

def audit(
Expand All @@ -304,7 +309,11 @@ def audit(
circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
deployability_index: t.Optional[DeployabilityIndex] = None,
run_environment_statements: bool = False,
skip_audits: bool = False,
) -> CompletionStatus:
if skip_audits:
return CompletionStatus.SUCCESS

# Remove the intervals from the snapshots that will be audited so that they can be recomputed
# by _run_or_audit as "missing intervals" to reuse the rest of it's logic
remove_intervals = {}
Expand Down Expand Up @@ -434,6 +443,7 @@ def run_merged_intervals(
selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
run_environment_statements: bool = False,
audit_only: bool = False,
skip_audits: bool = False,
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {},
is_restatement: bool = False,
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
Expand Down Expand Up @@ -537,15 +547,16 @@ def run_node(node: SchedulingUnit) -> None:
assert deployability_index # mypy

if audit_only:
audit_results = self._audit_snapshot(
snapshot=snapshot,
environment_naming_info=environment_naming_info,
deployability_index=deployability_index,
snapshots=self.snapshots_by_name,
start=start,
end=end,
execution_time=execution_time,
)
if not skip_audits:
audit_results = self._audit_snapshot(
snapshot=snapshot,
environment_naming_info=environment_naming_info,
deployability_index=deployability_index,
snapshots=self.snapshots_by_name,
start=start,
end=end,
execution_time=execution_time,
)
else:
# If batch_index > 0, then the target table must exist since the first batch would have created it
target_table_exists = (
Expand All @@ -563,6 +574,7 @@ def run_node(node: SchedulingUnit) -> None:
allow_additive_snapshots=allow_additive_snapshots,
target_table_exists=target_table_exists,
selected_models=selected_models,
skip_audits=skip_audits,
)

evaluation_duration_ms = now_timestamp() - execution_start_ts
Expand Down Expand Up @@ -785,6 +797,7 @@ def _run_or_audit(
auto_restatement_enabled: bool = False,
run_environment_statements: bool = False,
audit_only: bool = False,
skip_audits: bool = False,
) -> CompletionStatus:
"""Concurrently runs or audits all snapshots in topological order.

Expand Down Expand Up @@ -876,6 +889,7 @@ def _run_or_audit(
end=end,
run_environment_statements=run_environment_statements,
audit_only=audit_only,
skip_audits=skip_audits,
auto_restatement_triggers=auto_restatement_triggers,
selected_models={
s.node.dbt_unique_id for s in merged_intervals if s.node.dbt_unique_id
Expand Down
Loading
Loading