Skip to content

Commit f072873

Browse files
committed
Tidy up
1 parent e674d73 commit f072873

File tree

8 files changed

+34
-43
lines changed

8 files changed

+34
-43
lines changed

docs/integrations/engines/snowflake.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,14 @@ And confirm that our schemas and objects exist in the Snowflake catalog:
250250

251251
Congratulations - your SQLMesh project is up and running on Snowflake!
252252

253+
### Where are the row counts?
254+
255+
SQLMesh reports the number of rows processed by each model in its `plan` and `run` terminal output.
256+
257+
However, due to limitations in the Snowflake Python connector, row counts cannot be determined for `CREATE TABLE AS` statements. Therefore, SQLMesh does not report row counts for certain model kinds, such as `FULL` models.
258+
259+
Learn more about this connector limitation in this [GitHub issue](https://github.com/snowflakedb/snowflake-connector-python/issues/645).
260+
253261
## Local/Built-in Scheduler
254262
**Engine Adapter Type**: `snowflake`
255263

sqlmesh/core/engine_adapter/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2464,7 +2464,7 @@ def _execute(self, sql: str, track_rows_processed: bool = False, **kwargs: t.Any
24642464
try:
24652465
rowcount = int(rowcount_raw)
24662466
except (TypeError, ValueError):
2467-
pass
2467+
return
24682468

24692469
self._record_execution_stats(sql, rowcount)
24702470

sqlmesh/core/engine_adapter/snowflake.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -686,15 +686,17 @@ def _record_execution_stats(
686686
if results:
687687
try:
688688
results_str = str(results[0])
689-
except (ValueError, TypeError):
689+
except (TypeError, ValueError, IndexError):
690690
return
691691

692692
# Snowflake identifiers may be:
693693
# - An unquoted contiguous set of [a-zA-Z0-9_$] characters
694694
# - A double-quoted string that may contain spaces and nested double-quotes represented by `""`. Example: " my ""table"" name "
695-
is_created = re.match(r'Table [a-zA-Z0-9_$"]*? successfully created\.', results_str)
695+
is_created = re.match(
696+
r'Table [a-zA-Z0-9_$ "]*? successfully created\.', results_str
697+
)
696698
is_already_exists = re.match(
697-
r'[a-zA-Z0-9_$"]*? already exists, statement succeeded\.',
699+
r'[a-zA-Z0-9_$ "]*? already exists, statement succeeded\.',
698700
results_str,
699701
)
700702
if is_created or is_already_exists:

sqlmesh/core/scheduler.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -514,21 +514,18 @@ def run_node(node: SchedulingUnit) -> None:
514514
execution_time=execution_time,
515515
)
516516
else:
517-
with self.snapshot_evaluator.execution_tracker.track_execution(
518-
f"{snapshot.name}_{node.batch_index}"
519-
) as execution_context:
520-
audit_results = self.evaluate(
521-
snapshot=snapshot,
522-
environment_naming_info=environment_naming_info,
523-
start=start,
524-
end=end,
525-
execution_time=execution_time,
526-
deployability_index=deployability_index,
527-
batch_index=node.batch_index,
528-
allow_destructive_snapshots=allow_destructive_snapshots,
529-
allow_additive_snapshots=allow_additive_snapshots,
530-
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
531-
)
517+
audit_results = self.evaluate(
518+
snapshot=snapshot,
519+
environment_naming_info=environment_naming_info,
520+
start=start,
521+
end=end,
522+
execution_time=execution_time,
523+
deployability_index=deployability_index,
524+
batch_index=node.batch_index,
525+
allow_destructive_snapshots=allow_destructive_snapshots,
526+
allow_additive_snapshots=allow_additive_snapshots,
527+
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
528+
)
532529

533530
evaluation_duration_ms = now_timestamp() - execution_start_ts
534531
finally:
@@ -547,6 +544,9 @@ def run_node(node: SchedulingUnit) -> None:
547544
num_audits - num_audits_failed,
548545
num_audits_failed,
549546
execution_stats=execution_stats,
547+
auto_restatement_triggers=auto_restatement_triggers.get(
548+
snapshot.snapshot_id
549+
),
550550
)
551551
elif isinstance(node, CreateNode):
552552
self.snapshot_evaluator.create_snapshot(

sqlmesh/core/snapshot/execution_tracker.py

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from __future__ import annotations
22

3-
import time
43
import typing as t
54
from contextlib import contextmanager
65
from threading import local, Lock
@@ -12,10 +11,6 @@ class QueryExecutionStats:
1211
snapshot_batch_id: str
1312
total_rows_processed: t.Optional[int] = None
1413
total_bytes_processed: t.Optional[int] = None
15-
query_count: int = 0
16-
queries_executed: t.List[t.Tuple[str, t.Optional[int], t.Optional[int], float]] = field(
17-
default_factory=list
18-
)
1914

2015

2116
@dataclass
@@ -26,10 +21,8 @@ class QueryExecutionContext:
2621
It accumulates statistics from multiple cursor.execute() calls during a single snapshot evaluation.
2722
2823
Attributes:
29-
id: Identifier linking this context to a specific operation
30-
total_rows_processed: Running sum of cursor.rowcount from all executed queries during evaluation
31-
query_count: Total number of SQL statements executed
32-
queries_executed: List of (sql_snippet, row_count, timestamp) tuples for debugging
24+
snapshot_batch_id: Identifier linking this context to a specific snapshot evaluation
25+
stats: Running sum of cursor.rowcount and possibly bytes processed from all executed queries during evaluation
3326
"""
3427

3528
snapshot_batch_id: str
@@ -55,20 +48,12 @@ def add_execution(
5548
else:
5649
self.stats.total_bytes_processed += bytes_processed
5750

58-
self.stats.query_count += 1
59-
# TODO: remove this
60-
# for debugging
61-
self.stats.queries_executed.append((sql[:300], row_count, bytes_processed, time.time()))
62-
6351
def get_execution_stats(self) -> QueryExecutionStats:
6452
return self.stats
6553

6654

6755
class QueryExecutionTracker:
68-
"""
69-
Thread-local context manager for snapshot execution statistics, such as
70-
rows processed.
71-
"""
56+
"""Thread-local context manager for snapshot execution statistics, such as rows processed."""
7257

7358
_thread_local = local()
7459
_contexts: t.Dict[str, QueryExecutionContext] = {}
@@ -86,9 +71,7 @@ def is_tracking(cls) -> bool:
8671
def track_execution(
8772
self, snapshot_id_batch: str
8873
) -> t.Iterator[t.Optional[QueryExecutionContext]]:
89-
"""
90-
Context manager for tracking snapshot execution statistics.
91-
"""
74+
"""Context manager for tracking snapshot execution statistics such as row counts and bytes processed."""
9275
context = QueryExecutionContext(snapshot_batch_id=snapshot_id_batch)
9376
self._thread_local.context = context
9477
with self._contexts_lock:

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2434,6 +2434,7 @@ def capture_execution_stats(
24342434
num_audits_failed,
24352435
audit_only=False,
24362436
execution_stats=None,
2437+
auto_restatement_triggers=None,
24372438
):
24382439
if execution_stats is not None:
24392440
actual_execution_stats[snapshot.model.name.replace(f"{schema_name}.", "")] = (

tests/core/engine_adapter/integration/test_integration_snowflake.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,5 +324,4 @@ def test_rows_tracker(ctx: TestContext, engine_adapter: SnowflakeEngineAdapter):
324324

325325
stats = tracker.get_execution_stats("a")
326326
assert stats is not None
327-
assert stats.query_count == 2
328327
assert stats.total_rows_processed == 3

tests/core/test_execution_tracker.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,4 @@ def worker(id: str, row_counts: list[int]) -> QueryExecutionStats:
3434
by_batch = {s.snapshot_batch_id: s for s in results}
3535

3636
assert by_batch["batch_A"].total_rows_processed == 15
37-
assert by_batch["batch_A"].query_count == 2
3837
assert by_batch["batch_B"].total_rows_processed == 10
39-
assert by_batch["batch_B"].query_count == 2

0 commit comments

Comments
 (0)