Skip to content

Commit af0f2cc

Browse files
committed
Fix rebase
1 parent 583b39d commit af0f2cc

File tree

2 files changed

+49
-53
lines changed

2 files changed

+49
-53
lines changed

sqlmesh/core/scheduler.py

Lines changed: 49 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -485,33 +485,33 @@ def run_node(node: SchedulingUnit) -> None:
485485
if isinstance(node, DummyNode):
486486
return
487487

488-
with QueryExecutionTracker.track_execution(
489-
f"{snapshot.name}_{batch_idx}"
490-
) as execution_context:
491-
snapshot = self.snapshots_by_name[node.snapshot_name]
492-
493-
if isinstance(node, EvaluateNode):
494-
self.console.start_snapshot_evaluation_progress(snapshot)
495-
execution_start_ts = now_timestamp()
496-
evaluation_duration_ms: t.Optional[int] = None
497-
start, end = node.interval
498-
499-
audit_results: t.List[AuditResult] = []
500-
try:
501-
assert execution_time # mypy
502-
assert deployability_index # mypy
503-
504-
if audit_only:
505-
audit_results = self._audit_snapshot(
506-
snapshot=snapshot,
507-
environment_naming_info=environment_naming_info,
508-
deployability_index=deployability_index,
509-
snapshots=self.snapshots_by_name,
510-
start=start,
511-
end=end,
512-
execution_time=execution_time,
513-
)
514-
else:
488+
snapshot = self.snapshots_by_name[node.snapshot_name]
489+
490+
if isinstance(node, EvaluateNode):
491+
self.console.start_snapshot_evaluation_progress(snapshot)
492+
execution_start_ts = now_timestamp()
493+
evaluation_duration_ms: t.Optional[int] = None
494+
start, end = node.interval
495+
496+
audit_results: t.List[AuditResult] = []
497+
try:
498+
assert execution_time # mypy
499+
assert deployability_index # mypy
500+
501+
if audit_only:
502+
audit_results = self._audit_snapshot(
503+
snapshot=snapshot,
504+
environment_naming_info=environment_naming_info,
505+
deployability_index=deployability_index,
506+
snapshots=self.snapshots_by_name,
507+
start=start,
508+
end=end,
509+
execution_time=execution_time,
510+
)
511+
else:
512+
with self.snapshot_evaluator.execution_tracker.track_execution(
513+
f"{snapshot.name}_{node.batch_index}"
514+
) as execution_context:
515515
audit_results = self.evaluate(
516516
snapshot=snapshot,
517517
environment_naming_info=environment_naming_info,
@@ -524,34 +524,31 @@ def run_node(node: SchedulingUnit) -> None:
524524
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
525525
)
526526

527-
evaluation_duration_ms = now_timestamp() - execution_start_ts
528-
finally:
529-
num_audits = len(audit_results)
530-
num_audits_failed = sum(1 for result in audit_results if result.count)
527+
evaluation_duration_ms = now_timestamp() - execution_start_ts
528+
finally:
529+
num_audits = len(audit_results)
530+
num_audits_failed = sum(1 for result in audit_results if result.count)
531531

532-
execution_stats = self.snapshot_evaluator.execution_tracker.get_execution_stats(
533-
f"{snapshot.snapshot_id}_{batch_idx}"
534-
)
532+
execution_stats = self.snapshot_evaluator.execution_tracker.get_execution_stats(
533+
f"{snapshot.snapshot_id}_{node.batch_index}"
534+
)
535535

536-
self.console.update_snapshot_evaluation_progress(
537-
snapshot,
538-
batched_intervals[snapshot][node.batch_index],
539-
node.batch_index,
540-
evaluation_duration_ms,
541-
num_audits - num_audits_failed,
542-
num_audits_failed,
543-
execution_stats=execution_stats,
544-
auto_restatement_triggers=auto_restatement_triggers.get(
545-
snapshot.snapshot_id
546-
),
536+
self.console.update_snapshot_evaluation_progress(
537+
snapshot,
538+
batched_intervals[snapshot][node.batch_index],
539+
node.batch_index,
540+
evaluation_duration_ms,
541+
num_audits - num_audits_failed,
542+
num_audits_failed,
543+
execution_stats=execution_stats,
544+
)
545+
elif isinstance(node, CreateNode):
546+
self.snapshot_evaluator.create_snapshot(
547+
snapshot=snapshot,
548+
snapshots=self.snapshots_by_name,
549+
deployability_index=deployability_index,
550+
allow_destructive_snapshots=allow_destructive_snapshots or set(),
547551
)
548-
elif isinstance(node, CreateNode):
549-
self.snapshot_evaluator.create_snapshot(
550-
snapshot=snapshot,
551-
snapshots=self.snapshots_by_name,
552-
deployability_index=deployability_index,
553-
allow_destructive_snapshots=allow_destructive_snapshots or set(),
554-
)
555552

556553
try:
557554
with self.snapshot_evaluator.concurrent_context():

sqlmesh/core/snapshot/evaluator.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
SnapshotInfoLike,
6060
SnapshotTableCleanupTask,
6161
)
62-
from sqlmesh.core.snapshot.definition import parent_snapshots_by_name
6362
from sqlmesh.core.snapshot.execution_tracker import QueryExecutionTracker
6463
from sqlmesh.utils import random_id, CorrelationId
6564
from sqlmesh.utils.concurrency import (

0 commit comments

Comments
 (0)