Skip to content

Commit 72c0a7e

Browse files
committed
Fix rebase
1 parent c74d9f8 commit 72c0a7e

File tree

2 files changed

+50
-51
lines changed

2 files changed

+50
-51
lines changed

sqlmesh/core/scheduler.py

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -484,33 +484,33 @@ def run_node(node: SchedulingUnit) -> None:
484484
if isinstance(node, DummyNode):
485485
return
486486

487-
with QueryExecutionTracker.track_execution(
488-
f"{snapshot.name}_{batch_idx}"
489-
) as execution_context:
490-
snapshot = self.snapshots_by_name[node.snapshot_name]
491-
492-
if isinstance(node, EvaluateNode):
493-
self.console.start_snapshot_evaluation_progress(snapshot)
494-
execution_start_ts = now_timestamp()
495-
evaluation_duration_ms: t.Optional[int] = None
496-
start, end = node.interval
497-
498-
audit_results: t.List[AuditResult] = []
499-
try:
500-
assert execution_time # mypy
501-
assert deployability_index # mypy
502-
503-
if audit_only:
504-
audit_results = self._audit_snapshot(
505-
snapshot=snapshot,
506-
environment_naming_info=environment_naming_info,
507-
deployability_index=deployability_index,
508-
snapshots=self.snapshots_by_name,
509-
start=start,
510-
end=end,
511-
execution_time=execution_time,
512-
)
513-
else:
487+
snapshot = self.snapshots_by_name[node.snapshot_name]
488+
489+
if isinstance(node, EvaluateNode):
490+
self.console.start_snapshot_evaluation_progress(snapshot)
491+
execution_start_ts = now_timestamp()
492+
evaluation_duration_ms: t.Optional[int] = None
493+
start, end = node.interval
494+
495+
audit_results: t.List[AuditResult] = []
496+
try:
497+
assert execution_time # mypy
498+
assert deployability_index # mypy
499+
500+
if audit_only:
501+
audit_results = self._audit_snapshot(
502+
snapshot=snapshot,
503+
environment_naming_info=environment_naming_info,
504+
deployability_index=deployability_index,
505+
snapshots=self.snapshots_by_name,
506+
start=start,
507+
end=end,
508+
execution_time=execution_time,
509+
)
510+
else:
511+
with self.snapshot_evaluator.execution_tracker.track_execution(
512+
f"{snapshot.name}_{node.batch_index}"
513+
) as execution_context:
514514
audit_results = self.evaluate(
515515
snapshot=snapshot,
516516
environment_naming_info=environment_naming_info,
@@ -523,31 +523,31 @@ def run_node(node: SchedulingUnit) -> None:
523523
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
524524
)
525525

526-
evaluation_duration_ms = now_timestamp() - execution_start_ts
527-
finally:
528-
num_audits = len(audit_results)
529-
num_audits_failed = sum(1 for result in audit_results if result.count)
526+
evaluation_duration_ms = now_timestamp() - execution_start_ts
527+
finally:
528+
num_audits = len(audit_results)
529+
num_audits_failed = sum(1 for result in audit_results if result.count)
530530

531-
execution_stats = self.snapshot_evaluator.execution_tracker.get_execution_stats(
532-
f"{snapshot.snapshot_id}_{batch_idx}"
533-
)
531+
execution_stats = self.snapshot_evaluator.execution_tracker.get_execution_stats(
532+
f"{snapshot.snapshot_id}_{node.batch_index}"
533+
)
534534

535-
self.console.update_snapshot_evaluation_progress(
536-
snapshot,
537-
batched_intervals[snapshot][node.batch_index],
538-
node.batch_index,
539-
evaluation_duration_ms,
540-
num_audits - num_audits_failed,
541-
num_audits_failed,
542-
execution_stats=execution_stats,
543-
)
544-
elif isinstance(node, CreateNode):
545-
self.snapshot_evaluator.create_snapshot(
546-
snapshot=snapshot,
547-
snapshots=self.snapshots_by_name,
548-
deployability_index=deployability_index,
549-
allow_destructive_snapshots=allow_destructive_snapshots or set(),
550-
)
535+
self.console.update_snapshot_evaluation_progress(
536+
snapshot,
537+
batched_intervals[snapshot][node.batch_index],
538+
node.batch_index,
539+
evaluation_duration_ms,
540+
num_audits - num_audits_failed,
541+
num_audits_failed,
542+
execution_stats=execution_stats,
543+
)
544+
elif isinstance(node, CreateNode):
545+
self.snapshot_evaluator.create_snapshot(
546+
snapshot=snapshot,
547+
snapshots=self.snapshots_by_name,
548+
deployability_index=deployability_index,
549+
allow_destructive_snapshots=allow_destructive_snapshots or set(),
550+
)
551551

552552
try:
553553
with self.snapshot_evaluator.concurrent_context():

sqlmesh/core/snapshot/evaluator.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
SnapshotInfoLike,
6060
SnapshotTableCleanupTask,
6161
)
62-
from sqlmesh.core.snapshot.definition import parent_snapshots_by_name
6362
from sqlmesh.core.snapshot.execution_tracker import QueryExecutionTracker
6463
from sqlmesh.utils import random_id, CorrelationId
6564
from sqlmesh.utils.concurrency import (

0 commit comments

Comments
 (0)