Skip to content

Commit 2670119

Browse files
committed
adjust intervals based on force rebuild at runtime
1 parent 08d80bf commit 2670119

File tree

7 files changed

+315
-179
lines changed

7 files changed

+315
-179
lines changed

sqlmesh/core/plan/builder.py

Lines changed: 20 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
)
1717
from sqlmesh.core.context_diff import ContextDiff
1818
from sqlmesh.core.environment import EnvironmentNamingInfo
19+
from sqlmesh.core.plan.common import should_force_rebuild
1920
from sqlmesh.core.plan.definition import (
2021
Plan,
2122
SnapshotMapping,
@@ -276,7 +277,7 @@ def build(self) -> Plan:
276277

277278
self._check_destructive_changes(directly_modified)
278279
self._categorize_snapshots(dag, indirectly_modified)
279-
self._adjust_new_snapshot_intervals()
280+
self._adjust_snapshot_intervals()
280281

281282
deployability_index = (
282283
DeployabilityIndex.create(
@@ -508,21 +509,22 @@ def _build_models_to_backfill(
508509
).sorted
509510
}
510511

511-
def _adjust_new_snapshot_intervals(self) -> None:
512-
old_snapshots = {
513-
(old.name, old.version_get_or_generate()): old
514-
for _, old in self._context_diff.modified_snapshots.values()
515-
}
516-
517-
for new in self._context_diff.new_snapshots.values():
518-
new.intervals = []
519-
new.dev_intervals = []
520-
old = old_snapshots.get((new.name, new.version_get_or_generate()))
521-
if not old:
512+
def _adjust_snapshot_intervals(self) -> None:
513+
for new, old in self._context_diff.modified_snapshots.values():
514+
if not new.is_model or not old.is_model:
522515
continue
523-
new.merge_intervals(old)
524-
if new.is_forward_only:
525-
new.dev_intervals = new.intervals.copy()
516+
is_same_version = old.version_get_or_generate() == new.version_get_or_generate()
517+
if is_same_version and should_force_rebuild(old, new):
518+
# If the difference between 2 snapshots requires a full rebuild,
519+
# then clear the intervals for the new snapshot.
520+
self._context_diff.snapshots[new.snapshot_id].intervals = []
521+
elif new.snapshot_id in self._context_diff.new_snapshots:
522+
new.intervals = []
523+
new.dev_intervals = []
524+
if is_same_version:
525+
new.merge_intervals(old)
526+
if new.is_forward_only:
527+
new.dev_intervals = new.intervals.copy()
526528

527529
def _check_destructive_changes(self, directly_modified: t.Set[SnapshotId]) -> None:
528530
for s_id in sorted(directly_modified):
@@ -589,7 +591,7 @@ def _categorize_snapshots(
589591
forward_only = self._forward_only or self._is_forward_only_change(s_id)
590592
if forward_only and s_id.name in self._context_diff.modified_snapshots:
591593
new, old = self._context_diff.modified_snapshots[s_id.name]
592-
if _should_force_rebuild(old, new) or snapshot.is_seed:
594+
if should_force_rebuild(old, new) or snapshot.is_seed:
593595
# Breaking kind changes and seed changes can't be forward-only.
594596
forward_only = False
595597

@@ -614,7 +616,7 @@ def _categorize_snapshot(
614616
if self._context_diff.directly_modified(s_id.name):
615617
if self._auto_categorization_enabled:
616618
new, old = self._context_diff.modified_snapshots[s_id.name]
617-
if _should_force_rebuild(old, new):
619+
if should_force_rebuild(old, new):
618620
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, False)
619621
return
620622

@@ -772,7 +774,7 @@ def _is_forward_only_change(self, s_id: SnapshotId) -> bool:
772774
if snapshot.name in self._context_diff.modified_snapshots:
773775
_, old = self._context_diff.modified_snapshots[snapshot.name]
774776
# If the model kind has changed in a breaking way, then we can't consider this to be a forward-only change.
775-
if snapshot.is_model and _should_force_rebuild(old, snapshot):
777+
if snapshot.is_model and should_force_rebuild(old, snapshot):
776778
return False
777779
return (
778780
snapshot.is_model and snapshot.model.forward_only and bool(snapshot.previous_versions)
@@ -870,19 +872,3 @@ def _modified_and_added_snapshots(self) -> t.List[Snapshot]:
870872
if snapshot.name in self._context_diff.modified_snapshots
871873
or snapshot.snapshot_id in self._context_diff.added
872874
]
873-
874-
875-
def _should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
876-
if old.virtual_environment_mode != new.virtual_environment_mode:
877-
# If the virtual environment mode has changed, then we need to rebuild
878-
return True
879-
if old.model.kind.name == new.model.kind.name:
880-
# If the kind hasn't changed, then we don't need to rebuild
881-
return False
882-
if not old.is_incremental or not new.is_incremental:
883-
# If either is not incremental, then we need to rebuild
884-
return True
885-
if old.model.partitioned_by == new.model.partitioned_by:
886-
# If the partitioning hasn't changed, then we don't need to rebuild
887-
return False
888-
return True

sqlmesh/core/plan/common.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from __future__ import annotations
2+
3+
from sqlmesh.core.snapshot import Snapshot
4+
5+
6+
def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
7+
if old.virtual_environment_mode != new.virtual_environment_mode:
8+
# If the virtual environment mode has changed, then we need to rebuild
9+
return True
10+
if old.model.kind.name == new.model.kind.name:
11+
# If the kind hasn't changed, then we don't need to rebuild
12+
return False
13+
if not old.is_incremental or not new.is_incremental:
14+
# If either is not incremental, then we need to rebuild
15+
return True
16+
if old.model.partitioned_by == new.model.partitioned_by:
17+
# If the partitioning hasn't changed, then we don't need to rebuild
18+
return False
19+
return True

sqlmesh/core/plan/stages.py

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import typing as t
22

33
from dataclasses import dataclass
4-
from sqlmesh.core.environment import EnvironmentStatements, EnvironmentNamingInfo
4+
from sqlmesh.core.environment import EnvironmentStatements, EnvironmentNamingInfo, Environment
5+
from sqlmesh.core.plan.common import should_force_rebuild
56
from sqlmesh.core.plan.definition import EvaluatablePlan
67
from sqlmesh.core.state_sync import StateReader
78
from sqlmesh.core.scheduler import merged_missing_intervals, SnapshotToIntervals
@@ -230,8 +231,9 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
230231
all_selected_for_backfill_snapshots = {
231232
s.snapshot_id for s in snapshots.values() if plan.is_selected_for_backfill(s.name)
232233
}
234+
existing_environment = self.state_reader.get_environment(plan.environment.name)
233235

234-
self._adjust_intervals(snapshots_by_name, plan)
236+
self._adjust_intervals(snapshots_by_name, plan, existing_environment)
235237

236238
deployability_index = DeployabilityIndex.create(snapshots, start=plan.start)
237239
deployability_index_for_creation = deployability_index
@@ -269,7 +271,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
269271
missing_intervals_after_promote[snapshot] = intervals
270272

271273
promoted_snapshots, demoted_snapshots, demoted_environment_naming_info = (
272-
self._get_promoted_demoted_snapshots(plan)
274+
self._get_promoted_demoted_snapshots(plan, existing_environment)
273275
)
274276

275277
stages: t.List[PlanStage] = []
@@ -459,11 +461,10 @@ def _should_update_virtual_layer(snapshot: SnapshotTableInfo) -> bool:
459461
)
460462

461463
def _get_promoted_demoted_snapshots(
462-
self, plan: EvaluatablePlan
464+
self, plan: EvaluatablePlan, existing_environment: t.Optional[Environment]
463465
) -> t.Tuple[
464466
t.Set[SnapshotTableInfo], t.Set[SnapshotTableInfo], t.Optional[EnvironmentNamingInfo]
465467
]:
466-
existing_environment = self.state_reader.get_environment(plan.environment.name)
467468
if existing_environment:
468469
new_table_infos = {
469470
table_info.name: table_info for table_info in plan.environment.promoted_snapshots
@@ -579,10 +580,40 @@ def _should_create(s: Snapshot) -> bool:
579580
return [s for s in snapshots.values() if _should_create(s)]
580581

581582
def _adjust_intervals(
582-
self, snapshots_by_name: t.Dict[str, Snapshot], plan: EvaluatablePlan
583+
self,
584+
snapshots_by_name: t.Dict[str, Snapshot],
585+
plan: EvaluatablePlan,
586+
existing_environment: t.Optional[Environment],
583587
) -> None:
584588
# Make sure the intervals are up to date and restatements are reflected
585589
self.state_reader.refresh_snapshot_intervals(snapshots_by_name.values())
590+
591+
if existing_environment:
592+
new_snapshot_ids = set()
593+
new_snapshot_versions = set()
594+
for s in snapshots_by_name.values():
595+
if s.is_model:
596+
new_snapshot_ids.add(s.snapshot_id)
597+
new_snapshot_versions.add(s.name_version)
598+
# Only compare to old snapshots that share the same version as the new snapshots
599+
old_snapshot_ids = {
600+
s.snapshot_id
601+
for s in existing_environment.snapshots
602+
if s.is_model
603+
and s.name_version in new_snapshot_versions
604+
and s.snapshot_id not in new_snapshot_ids
605+
}
606+
if old_snapshot_ids:
607+
old_snapshots = self.state_reader.get_snapshots(old_snapshot_ids)
608+
for old in old_snapshots.values():
609+
new = snapshots_by_name.get(old.name)
610+
if not new or old.version != new.version:
611+
continue
612+
if should_force_rebuild(old, new):
613+
# If the difference between 2 snapshots requires a full rebuild,
614+
# then clear the intervals for the new snapshot.
615+
new.intervals = []
616+
586617
for new_snapshot in plan.new_snapshots:
587618
if new_snapshot.is_forward_only:
588619
# Forward-only snapshots inherit intervals in dev because of cloning

sqlmesh/core/snapshot/definition.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -893,19 +893,15 @@ def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None:
893893
Args:
894894
other: The target snapshot to inherit intervals from.
895895
"""
896-
if self.is_no_rebuild or self.virtual_environment_mode.is_full or not self.is_paused:
897-
# If the virtual environment mode is not full we can only merge prod intervals if this snapshot
898-
# is currently promoted in production or if it's forward-only / metadata / indirect non-breaking.
899-
# Otherwise, we want to ignore any existing intervals and backfill this snapshot from scratch.
900-
effective_from_ts = self.normalized_effective_from_ts or 0
901-
apply_effective_from = effective_from_ts > 0 and self.identifier != other.identifier
902-
for start, end in other.intervals:
903-
# If the effective_from is set, then intervals that come after it must come from
904-
# the current snapshots.
905-
if apply_effective_from and start < effective_from_ts:
906-
end = min(end, effective_from_ts)
907-
if not apply_effective_from or end <= effective_from_ts:
908-
self.add_interval(start, end)
896+
effective_from_ts = self.normalized_effective_from_ts or 0
897+
apply_effective_from = effective_from_ts > 0 and self.identifier != other.identifier
898+
for start, end in other.intervals:
899+
# If the effective_from is set, then intervals that come after it must come from
900+
# the current snapshots.
901+
if apply_effective_from and start < effective_from_ts:
902+
end = min(end, effective_from_ts)
903+
if not apply_effective_from or end <= effective_from_ts:
904+
self.add_interval(start, end)
909905

910906
if self.dev_version == other.dev_version:
911907
# Merge dev intervals if the dev versions match which would mean

tests/core/test_integration.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2694,7 +2694,11 @@ def test_restatement_plan_across_environments_snapshot_with_shared_version(
26942694
assert isinstance(previous_kind, IncrementalByTimeRangeKind)
26952695

26962696
model = model.copy(
2697-
update={"kind": IncrementalUnmanagedKind(), "physical_version": "pinned_version_12345"}
2697+
update={
2698+
"kind": IncrementalUnmanagedKind(),
2699+
"physical_version": "pinned_version_12345",
2700+
"partitioned_by_": [exp.column("event_date")],
2701+
}
26982702
)
26992703
context.upsert_model(model)
27002704
context.plan("prod", auto_apply=True, no_prompts=True)

0 commit comments

Comments
 (0)