Skip to content

Commit ff9305c

Browse files
authored
feat: track rows processed during model evaluation (#5162)
1 parent 66cd077 commit ff9305c

35 files changed

+525
-98
lines changed

.circleci/continue_config.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ jobs:
239239
- checkout
240240
- run:
241241
name: Install OS-level dependencies
242-
command: ./.circleci/install-prerequisites.sh "<< parameters.engine >>"
242+
command: ./.circleci/install-prerequisites.sh "<< parameters.engine >>"
243243
- run:
244244
name: Generate database name
245245
command: |
@@ -307,7 +307,7 @@ workflows:
307307
- redshift
308308
- bigquery
309309
- clickhouse-cloud
310-
- athena
310+
- athena
311311
- fabric
312312
- gcp-postgres
313313
filters:

docs/integrations/engines/snowflake.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,14 @@ And confirm that our schemas and objects exist in the Snowflake catalog:
250250

251251
Congratulations - your SQLMesh project is up and running on Snowflake!
252252

253+
### Where are the row counts?
254+
255+
SQLMesh reports the number of rows processed by each model in its `plan` and `run` terminal output.
256+
257+
However, due to limitations in the Snowflake Python connector, row counts cannot be determined for `CREATE TABLE AS` statements. Therefore, SQLMesh does not report row counts for certain model kinds, such as `FULL` models.
258+
259+
Learn more about the connector limitation [on Github](https://github.com/snowflakedb/snowflake-connector-python/issues/645).
260+
253261
## Local/Built-in Scheduler
254262
**Engine Adapter Type**: `snowflake`
255263

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ dependencies = [
1212
"croniter",
1313
"duckdb>=0.10.0,!=0.10.3",
1414
"dateparser<=1.2.1",
15+
"humanize",
1516
"hyperscript>=0.1.0",
1617
"importlib-metadata; python_version<'3.12'",
1718
"ipywidgets",

sqlmesh/core/console.py

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import uuid
88
import logging
99
import textwrap
10+
from humanize import metric, naturalsize
1011
from itertools import zip_longest
1112
from pathlib import Path
1213
from hyperscript import h
@@ -39,6 +40,7 @@
3940
SnapshotInfoLike,
4041
)
4142
from sqlmesh.core.snapshot.definition import Interval, Intervals, SnapshotTableInfo
43+
from sqlmesh.core.snapshot.execution_tracker import QueryExecutionStats
4244
from sqlmesh.core.test import ModelTest
4345
from sqlmesh.utils import rich as srich
4446
from sqlmesh.utils import Verbosity
@@ -439,6 +441,7 @@ def update_snapshot_evaluation_progress(
439441
num_audits_passed: int,
440442
num_audits_failed: int,
441443
audit_only: bool = False,
444+
execution_stats: t.Optional[QueryExecutionStats] = None,
442445
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
443446
) -> None:
444447
"""Updates the snapshot evaluation progress."""
@@ -587,6 +590,7 @@ def update_snapshot_evaluation_progress(
587590
num_audits_passed: int,
588591
num_audits_failed: int,
589592
audit_only: bool = False,
593+
execution_stats: t.Optional[QueryExecutionStats] = None,
590594
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
591595
) -> None:
592596
pass
@@ -1032,7 +1036,9 @@ def start_evaluation_progress(
10321036

10331037
# determine column widths
10341038
self.evaluation_column_widths["annotation"] = (
1035-
_calculate_annotation_str_len(batched_intervals, self.AUDIT_PADDING)
1039+
_calculate_annotation_str_len(
1040+
batched_intervals, self.AUDIT_PADDING, len(" (123.4m rows, 123.4 KiB)")
1041+
)
10361042
+ 3 # brackets and opening escape backslash
10371043
)
10381044
self.evaluation_column_widths["name"] = max(
@@ -1077,6 +1083,7 @@ def update_snapshot_evaluation_progress(
10771083
num_audits_passed: int,
10781084
num_audits_failed: int,
10791085
audit_only: bool = False,
1086+
execution_stats: t.Optional[QueryExecutionStats] = None,
10801087
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
10811088
) -> None:
10821089
"""Update the snapshot evaluation progress."""
@@ -1097,7 +1104,7 @@ def update_snapshot_evaluation_progress(
10971104
).ljust(self.evaluation_column_widths["name"])
10981105

10991106
annotation = _create_evaluation_model_annotation(
1100-
snapshot, _format_evaluation_model_interval(snapshot, interval)
1107+
snapshot, _format_evaluation_model_interval(snapshot, interval), execution_stats
11011108
)
11021109
audits_str = ""
11031110
if num_audits_passed:
@@ -3668,6 +3675,7 @@ def update_snapshot_evaluation_progress(
36683675
num_audits_passed: int,
36693676
num_audits_failed: int,
36703677
audit_only: bool = False,
3678+
execution_stats: t.Optional[QueryExecutionStats] = None,
36713679
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
36723680
) -> None:
36733681
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
@@ -3838,6 +3846,7 @@ def update_snapshot_evaluation_progress(
38383846
num_audits_passed: int,
38393847
num_audits_failed: int,
38403848
audit_only: bool = False,
3849+
execution_stats: t.Optional[QueryExecutionStats] = None,
38413850
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
38423851
) -> None:
38433852
message = f"Evaluated {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
@@ -4169,33 +4178,62 @@ def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) ->
41694178
return ""
41704179

41714180

4172-
def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Optional[str]) -> str:
4181+
def _create_evaluation_model_annotation(
4182+
snapshot: Snapshot,
4183+
interval_info: t.Optional[str],
4184+
execution_stats: t.Optional[QueryExecutionStats],
4185+
) -> str:
4186+
annotation = None
4187+
execution_stats_str = ""
4188+
if execution_stats:
4189+
rows_processed = execution_stats.total_rows_processed
4190+
if rows_processed:
4191+
# 1.00 and 1.0 to 1
4192+
rows_processed_str = metric(rows_processed).replace(".00", "").replace(".0", "")
4193+
execution_stats_str += f"{rows_processed_str} row{'s' if rows_processed > 1 else ''}"
4194+
4195+
bytes_processed = execution_stats.total_bytes_processed
4196+
execution_stats_str += (
4197+
f"{', ' if execution_stats_str else ''}{naturalsize(bytes_processed, binary=True)}"
4198+
if bytes_processed
4199+
else ""
4200+
)
4201+
execution_stats_str = f" ({execution_stats_str})" if execution_stats_str else ""
4202+
41734203
if snapshot.is_audit:
4174-
return "run standalone audit"
4175-
if snapshot.is_model and snapshot.model.kind.is_external:
4176-
return "run external audits"
4177-
if snapshot.model.kind.is_seed:
4178-
return "insert seed file"
4179-
if snapshot.model.kind.is_full:
4180-
return "full refresh"
4181-
if snapshot.model.kind.is_view:
4182-
return "recreate view"
4183-
if snapshot.model.kind.is_incremental_by_unique_key:
4184-
return "insert/update rows"
4185-
if snapshot.model.kind.is_incremental_by_partition:
4186-
return "insert partitions"
4187-
4188-
return interval_info if interval_info else ""
4189-
4190-
4191-
def _calculate_interval_str_len(snapshot: Snapshot, intervals: t.List[Interval]) -> int:
4204+
annotation = "run standalone audit"
4205+
if snapshot.is_model:
4206+
if snapshot.model.kind.is_external:
4207+
annotation = "run external audits"
4208+
if snapshot.model.kind.is_view:
4209+
annotation = "recreate view"
4210+
if snapshot.model.kind.is_seed:
4211+
annotation = f"insert seed file{execution_stats_str}"
4212+
if snapshot.model.kind.is_full:
4213+
annotation = f"full refresh{execution_stats_str}"
4214+
if snapshot.model.kind.is_incremental_by_unique_key:
4215+
annotation = f"insert/update rows{execution_stats_str}"
4216+
if snapshot.model.kind.is_incremental_by_partition:
4217+
annotation = f"insert partitions{execution_stats_str}"
4218+
4219+
if annotation:
4220+
return annotation
4221+
4222+
return f"{interval_info}{execution_stats_str}" if interval_info else ""
4223+
4224+
4225+
def _calculate_interval_str_len(
4226+
snapshot: Snapshot,
4227+
intervals: t.List[Interval],
4228+
execution_stats: t.Optional[QueryExecutionStats] = None,
4229+
) -> int:
41924230
interval_str_len = 0
41934231
for interval in intervals:
41944232
interval_str_len = max(
41954233
interval_str_len,
41964234
len(
41974235
_create_evaluation_model_annotation(
4198-
snapshot, _format_evaluation_model_interval(snapshot, interval)
4236+
snapshot, _format_evaluation_model_interval(snapshot, interval), execution_stats
41994237
)
42004238
),
42014239
)
@@ -4248,13 +4286,16 @@ def _calculate_audit_str_len(snapshot: Snapshot, audit_padding: int = 0) -> int:
42484286

42494287

42504288
def _calculate_annotation_str_len(
4251-
batched_intervals: t.Dict[Snapshot, t.List[Interval]], audit_padding: int = 0
4289+
batched_intervals: t.Dict[Snapshot, t.List[Interval]],
4290+
audit_padding: int = 0,
4291+
execution_stats_len: int = 0,
42524292
) -> int:
42534293
annotation_str_len = 0
42544294
for snapshot, intervals in batched_intervals.items():
42554295
annotation_str_len = max(
42564296
annotation_str_len,
42574297
_calculate_interval_str_len(snapshot, intervals)
4258-
+ _calculate_audit_str_len(snapshot, audit_padding),
4298+
+ _calculate_audit_str_len(snapshot, audit_padding)
4299+
+ execution_stats_len,
42594300
)
42604301
return annotation_str_len

sqlmesh/core/engine_adapter/athena.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class AthenaEngineAdapter(PandasNativeFetchDFSupportMixin, RowDiffMixin):
4545
# >>> self._execute('/* test */ DESCRIBE foo')
4646
# pyathena.error.OperationalError: FAILED: ParseException line 1:0 cannot recognize input near '/' '*' 'test'
4747
ATTACH_CORRELATION_ID = False
48+
SUPPORTS_QUERY_EXECUTION_TRACKING = True
4849
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA"]
4950

5051
def __init__(

0 commit comments

Comments
 (0)