Skip to content

Commit fb7fc74

Browse files
authored
Merge branch 'main' into jo/allow_empty_models_dbt
2 parents 6e1a296 + da49d57 commit fb7fc74

File tree

3 files changed

+45
-9
lines changed

3 files changed

+45
-9
lines changed

sqlmesh/core/plan/stages.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -574,23 +574,31 @@ def _get_audit_only_snapshots(
574574
) -> t.Dict[SnapshotId, Snapshot]:
575575
metadata_snapshots = []
576576
for snapshot in new_snapshots.values():
577-
if not snapshot.is_metadata or not snapshot.is_model or not snapshot.evaluatable:
577+
if (
578+
not snapshot.is_metadata
579+
or not snapshot.is_model
580+
or not snapshot.evaluatable
581+
or not snapshot.previous_version
582+
):
578583
continue
579584

580585
metadata_snapshots.append(snapshot)
581586

582587
# Bulk load all the previous snapshots
583-
previous_snapshots = self.state_reader.get_snapshots(
584-
[
585-
s.previous_version.snapshot_id(s.name)
586-
for s in metadata_snapshots
587-
if s.previous_version
588-
]
589-
).values()
588+
previous_snapshot_ids = [
589+
s.previous_version.snapshot_id(s.name) for s in metadata_snapshots if s.previous_version
590+
]
591+
previous_snapshots = {
592+
s.name: s for s in self.state_reader.get_snapshots(previous_snapshot_ids).values()
593+
}
590594

591595
# Check if any of the snapshots have modifications to the audits field by comparing the hashes
592596
audit_snapshots = {}
593-
for snapshot, previous_snapshot in zip(metadata_snapshots, previous_snapshots):
597+
for snapshot in metadata_snapshots:
598+
if snapshot.name not in previous_snapshots:
599+
continue
600+
601+
previous_snapshot = previous_snapshots[snapshot.name]
594602
new_audits_hash = snapshot.model.audit_metadata_hash()
595603
previous_audit_hash = previous_snapshot.model.audit_metadata_hash()
596604

sqlmesh/core/scheduler.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,9 @@ def _dag(
626626
dag = DAG[SchedulingUnit]()
627627

628628
for snapshot_id in snapshot_dag:
629+
if snapshot_id.name not in self.snapshots_by_name:
630+
continue
631+
629632
snapshot = self.snapshots_by_name[snapshot_id.name]
630633
intervals = intervals_per_snapshot.get(snapshot.name, [])
631634

tests/core/test_integration.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6294,6 +6294,31 @@ def test_restatement_shouldnt_backfill_beyond_prod_intervals(init_and_plan_conte
62946294
].intervals[-1][1] == to_timestamp("2023-01-08 00:00:00 UTC")
62956295

62966296

6297+
@time_machine.travel("2023-01-08 15:00:00 UTC")
6298+
@use_terminal_console
6299+
def test_audit_only_metadata_change(init_and_plan_context: t.Callable):
6300+
context, plan = init_and_plan_context("examples/sushi")
6301+
context.apply(plan)
6302+
6303+
# Add a new audit
6304+
model = context.get_model("sushi.waiter_revenue_by_day")
6305+
audits = model.audits.copy()
6306+
audits.append(("number_of_rows", {"threshold": exp.Literal.number(1)}))
6307+
model = model.copy(update={"audits": audits})
6308+
context.upsert_model(model)
6309+
6310+
plan = context.plan_builder("prod", skip_tests=True).build()
6311+
assert len(plan.new_snapshots) == 2
6312+
assert all(s.change_category.is_metadata for s in plan.new_snapshots)
6313+
assert not plan.missing_intervals
6314+
6315+
with capture_output() as output:
6316+
context.apply(plan)
6317+
6318+
assert "Auditing models" in output.stdout
6319+
assert model.name in output.stdout
6320+
6321+
62976322
def initial_add(context: Context, environment: str):
62986323
assert not context.state_reader.get_environment(environment)
62996324

0 commit comments

Comments
 (0)