Skip to content

Commit fd915e9

Browse files
committed
Tidy up
1 parent 7f7fe61 commit fd915e9

File tree

10 files changed

+57
-58
lines changed

10 files changed

+57
-58
lines changed

.circleci/continue_config.yml

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ jobs:
239239
- checkout
240240
- run:
241241
name: Install OS-level dependencies
242-
command: ./.circleci/install-prerequisites.sh "<< parameters.engine >>"
242+
command: ./.circleci/install-prerequisites.sh "<< parameters.engine >>"
243243
- run:
244244
name: Generate database name
245245
command: |
@@ -297,9 +297,8 @@ workflows:
297297
name: cloud_engine_<< matrix.engine >>
298298
context:
299299
- sqlmesh_cloud_database_integration
300-
# TODO: uncomment this
301-
# requires:
302-
# - engine_tests_docker
300+
requires:
301+
- engine_tests_docker
303302
matrix:
304303
parameters:
305304
engine:
@@ -308,14 +307,13 @@ workflows:
308307
- redshift
309308
- bigquery
310309
- clickhouse-cloud
311-
- athena
310+
- athena
312311
- fabric
313312
- gcp-postgres
314-
# TODO: uncomment this
315-
# filters:
316-
# branches:
317-
# only:
318-
# - main
313+
filters:
314+
branches:
315+
only:
316+
- main
319317
- ui_style
320318
- ui_test
321319
- vscode_test

docs/integrations/engines/snowflake.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,14 @@ And confirm that our schemas and objects exist in the Snowflake catalog:
250250

251251
Congratulations - your SQLMesh project is up and running on Snowflake!
252252

253+
### Where are the row counts?
254+
255+
SQLMesh reports the number of rows processed by each model in its `plan` and `run` terminal output.
256+
257+
However, due to limitations in the Snowflake Python connector, row counts cannot be determined for `CREATE TABLE AS` statements. Therefore, SQLMesh does not report row counts for certain model kinds, such as `FULL` models.
258+
259+
Learn more about the connector limitation [on GitHub](https://github.com/snowflakedb/snowflake-connector-python/issues/645).
260+
253261
## Local/Built-in Scheduler
254262
**Engine Adapter Type**: `snowflake`
255263

sqlmesh/core/console.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4030,9 +4030,7 @@ def show_table_diff_summary(self, table_diff: TableDiff) -> None:
40304030
self._write(f"Join On: {keys}")
40314031

40324032

4033-
# TODO: remove this
4034-
# _CONSOLE: Console = NoopConsole()
4035-
_CONSOLE: Console = TerminalConsole()
4033+
_CONSOLE: Console = NoopConsole()
40364034

40374035

40384036
def set_console(console: Console) -> None:

sqlmesh/core/engine_adapter/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2464,7 +2464,7 @@ def _execute(self, sql: str, track_rows_processed: bool = False, **kwargs: t.Any
24642464
try:
24652465
rowcount = int(rowcount_raw)
24662466
except (TypeError, ValueError):
2467-
pass
2467+
return
24682468

24692469
self._record_execution_stats(sql, rowcount)
24702470

sqlmesh/core/engine_adapter/snowflake.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -680,21 +680,25 @@ def _record_execution_stats(
680680
681681
We do not want to record the incorrect row count of 1, so we check whether that row contains the table
682682
successfully created string. If so, we return early and do not record the row count.
683+
684+
Ref: https://github.com/snowflakedb/snowflake-connector-python/issues/645
683685
"""
684686
if rowcount == 1:
685687
results = self.cursor.fetchone()
686688
if results:
687689
try:
688690
results_str = str(results[0])
689-
except (ValueError, TypeError):
691+
except (TypeError, ValueError, IndexError):
690692
return
691693

692694
# Snowflake identifiers may be:
693695
# - An unquoted contiguous set of [a-zA-Z0-9_$] characters
694696
# - A double-quoted string that may contain spaces and nested double-quotes represented by `""`. Example: " my ""table"" name "
695-
is_created = re.match(r'Table [a-zA-Z0-9_$"]*? successfully created\.', results_str)
697+
is_created = re.match(
698+
r'Table [a-zA-Z0-9_$ "]*? successfully created\.', results_str
699+
)
696700
is_already_exists = re.match(
697-
r'[a-zA-Z0-9_$"]*? already exists, statement succeeded\.',
701+
r'[a-zA-Z0-9_$ "]*? already exists, statement succeeded\.',
698702
results_str,
699703
)
700704
if is_created or is_already_exists:

sqlmesh/core/scheduler.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -514,21 +514,18 @@ def run_node(node: SchedulingUnit) -> None:
514514
execution_time=execution_time,
515515
)
516516
else:
517-
with self.snapshot_evaluator.execution_tracker.track_execution(
518-
f"{snapshot.name}_{node.batch_index}"
519-
) as execution_context:
520-
audit_results = self.evaluate(
521-
snapshot=snapshot,
522-
environment_naming_info=environment_naming_info,
523-
start=start,
524-
end=end,
525-
execution_time=execution_time,
526-
deployability_index=deployability_index,
527-
batch_index=node.batch_index,
528-
allow_destructive_snapshots=allow_destructive_snapshots,
529-
allow_additive_snapshots=allow_additive_snapshots,
530-
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
531-
)
517+
audit_results = self.evaluate(
518+
snapshot=snapshot,
519+
environment_naming_info=environment_naming_info,
520+
start=start,
521+
end=end,
522+
execution_time=execution_time,
523+
deployability_index=deployability_index,
524+
batch_index=node.batch_index,
525+
allow_destructive_snapshots=allow_destructive_snapshots,
526+
allow_additive_snapshots=allow_additive_snapshots,
527+
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
528+
)
532529

533530
evaluation_duration_ms = now_timestamp() - execution_start_ts
534531
finally:
@@ -547,6 +544,9 @@ def run_node(node: SchedulingUnit) -> None:
547544
num_audits - num_audits_failed,
548545
num_audits_failed,
549546
execution_stats=execution_stats,
547+
auto_restatement_triggers=auto_restatement_triggers.get(
548+
snapshot.snapshot_id
549+
),
550550
)
551551
elif isinstance(node, CreateNode):
552552
self.snapshot_evaluator.create_snapshot(

sqlmesh/core/snapshot/execution_tracker.py

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from __future__ import annotations
22

3-
import time
43
import typing as t
54
from contextlib import contextmanager
65
from threading import local, Lock
@@ -12,10 +11,6 @@ class QueryExecutionStats:
1211
snapshot_batch_id: str
1312
total_rows_processed: t.Optional[int] = None
1413
total_bytes_processed: t.Optional[int] = None
15-
query_count: int = 0
16-
queries_executed: t.List[t.Tuple[str, t.Optional[int], t.Optional[int], float]] = field(
17-
default_factory=list
18-
)
1914

2015

2116
@dataclass
@@ -26,10 +21,8 @@ class QueryExecutionContext:
2621
It accumulates statistics from multiple cursor.execute() calls during a single snapshot evaluation.
2722
2823
Attributes:
29-
id: Identifier linking this context to a specific operation
30-
total_rows_processed: Running sum of cursor.rowcount from all executed queries during evaluation
31-
query_count: Total number of SQL statements executed
32-
queries_executed: List of (sql_snippet, row_count, timestamp) tuples for debugging
24+
snapshot_batch_id: Identifier linking this context to a specific snapshot evaluation
25+
stats: Running sum of cursor.rowcount and possibly bytes processed from all executed queries during evaluation
3326
"""
3427

3528
snapshot_batch_id: str
@@ -55,20 +48,12 @@ def add_execution(
5548
else:
5649
self.stats.total_bytes_processed += bytes_processed
5750

58-
self.stats.query_count += 1
59-
# TODO: remove this
60-
# for debugging
61-
self.stats.queries_executed.append((sql[:300], row_count, bytes_processed, time.time()))
62-
6351
def get_execution_stats(self) -> QueryExecutionStats:
6452
return self.stats
6553

6654

6755
class QueryExecutionTracker:
68-
"""
69-
Thread-local context manager for snapshot execution statistics, such as
70-
rows processed.
71-
"""
56+
"""Thread-local context manager for snapshot execution statistics, such as rows processed."""
7257

7358
_thread_local = local()
7459
_contexts: t.Dict[str, QueryExecutionContext] = {}
@@ -86,9 +71,7 @@ def is_tracking(cls) -> bool:
8671
def track_execution(
8772
self, snapshot_id_batch: str
8873
) -> t.Iterator[t.Optional[QueryExecutionContext]]:
89-
"""
90-
Context manager for tracking snapshot execution statistics.
91-
"""
74+
"""Context manager for tracking snapshot execution statistics such as row counts and bytes processed."""
9275
context = QueryExecutionContext(snapshot_batch_id=snapshot_id_batch)
9376
self._thread_local.context = context
9477
with self._contexts_lock:

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2434,6 +2434,7 @@ def capture_execution_stats(
24342434
num_audits_failed,
24352435
audit_only=False,
24362436
execution_stats=None,
2437+
auto_restatement_triggers=None,
24372438
):
24382439
if execution_stats is not None:
24392440
actual_execution_stats[snapshot.model.name.replace(f"{schema_name}.", "")] = (

tests/core/engine_adapter/integration/test_integration_snowflake.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
from sqlmesh.core.plan import Plan
1313
from tests.core.engine_adapter.integration import TestContext
1414
from sqlmesh import model, ExecutionContext
15+
from pytest_mock import MockerFixture
16+
from sqlmesh.core.snapshot.execution_tracker import (
17+
QueryExecutionContext,
18+
QueryExecutionTracker,
19+
)
1520
from sqlmesh.core.model import ModelKindName
16-
from sqlmesh.core.snapshot.execution_tracker import QueryExecutionTracker
1721
from datetime import datetime
1822

1923
from tests.core.engine_adapter.integration import (
@@ -310,10 +314,14 @@ def fetch_database_names() -> t.Set[str]:
310314
assert fetch_database_names() == {non_sqlmesh_managed_catalog}
311315

312316

313-
def test_rows_tracker(ctx: TestContext, engine_adapter: SnowflakeEngineAdapter):
317+
def test_rows_tracker(
318+
ctx: TestContext, engine_adapter: SnowflakeEngineAdapter, mocker: MockerFixture
319+
):
314320
sqlmesh = ctx.create_context()
315321
tracker = QueryExecutionTracker()
316322

323+
add_execution_spy = mocker.spy(QueryExecutionContext, "add_execution")
324+
317325
with tracker.track_execution("a"):
318326
# Snowflake doesn't report row counts for CTAS, so this should not be tracked
319327
engine_adapter.execute(
@@ -322,7 +330,8 @@ def test_rows_tracker(ctx: TestContext, engine_adapter: SnowflakeEngineAdapter):
322330
engine_adapter.execute("INSERT INTO a VALUES (2), (3)", track_rows_processed=True)
323331
engine_adapter.execute("INSERT INTO a VALUES (4)", track_rows_processed=True)
324332

333+
assert add_execution_spy.call_count == 2
334+
325335
stats = tracker.get_execution_stats("a")
326336
assert stats is not None
327-
assert stats.query_count == 2
328337
assert stats.total_rows_processed == 3

tests/core/test_execution_tracker.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,4 @@ def worker(id: str, row_counts: list[int]) -> QueryExecutionStats:
3434
by_batch = {s.snapshot_batch_id: s for s in results}
3535

3636
assert by_batch["batch_A"].total_rows_processed == 15
37-
assert by_batch["batch_A"].query_count == 2
3837
assert by_batch["batch_B"].total_rows_processed == 10
39-
assert by_batch["batch_B"].query_count == 2

0 commit comments

Comments (0)