Fix: Preserve the DAG evaluation order even when a transitive dependency is not included#5335
Fix: Preserve the DAG evaluation order even when a transitive dependency is not included#5335izeigerman merged 1 commit intomainfrom
Conversation
There was a problem hiding this comment.
Pull Request Overview
This PR fixes an issue where models run out of order due to transitive model dependencies being missing in the DAG when using selectors. The fix ensures that DAG evaluation order is preserved even when intermediate dependencies are not explicitly selected for execution.
Key changes:
- Modified scheduler to use a full DAG including all snapshots and create a subdag with transitive dependencies
- Added recursive dependency resolution to handle transitive dependencies when intermediate nodes are missing
- Added comprehensive test coverage for both simple and complex dependency chains
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| sqlmesh/core/scheduler.py | Core fix to preserve DAG ordering by building full DAG and creating subdag with transitive dependencies |
| tests/core/test_scheduler.py | Unit tests for simple and complex transitive dependency scenarios |
| tests/core/test_integration.py | Integration test verifying the fix works end-to-end with model selection |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
sqlmesh/core/scheduler.py
Outdated
| def find_upstream_dependencies(sid: SnapshotId) -> t.List[SchedulingUnit]: | ||
| if sid not in self.snapshots: | ||
| return [] | ||
|
|
||
| p_intervals = intervals_per_snapshot.get(sid.name, []) | ||
|
|
||
| if p_intervals: | ||
| if len(p_intervals) > 1: | ||
| return [DummyNode(snapshot_name=sid.name)] | ||
| interval = p_intervals[0] | ||
| return [EvaluateNode(snapshot_name=sid.name, interval=interval, batch_index=0)] | ||
| if sid in original_snapshots_to_create: | ||
| return [CreateNode(snapshot_name=sid.name)] | ||
| # This snapshot has no intervals and doesn't need creation which means | ||
| # that it can be a transitive dependency | ||
| transitive_deps: t.List[SchedulingUnit] = [] | ||
| parent_snapshot = self.snapshots[sid] | ||
| for grandparent_sid in parent_snapshot.parents: | ||
| transitive_deps.extend(find_upstream_dependencies(grandparent_sid)) | ||
| return transitive_deps |
There was a problem hiding this comment.
The recursive function find_upstream_dependencies could potentially cause infinite recursion or stack overflow in case of circular dependencies. Consider adding a visited set parameter to prevent revisiting the same snapshot ID during recursion.
There was a problem hiding this comment.
We validate circular dependency at the DAG construction time.
3d68e4f to
58db26e
Compare
…ncy is not included
58db26e to
c47efea
Compare
This fixes an issue where models run out of order due to a transitive model dependency being missing in the DAG, either because of a selector or other reasons