|
13 | 13 | from tests.core.engine_adapter.integration import TestContext |
14 | 14 | from sqlmesh import model, ExecutionContext |
15 | 15 | from sqlmesh.core.model import ModelKindName |
| 16 | +from sqlmesh.core.snapshot.execution_tracker import QueryExecutionTracker |
16 | 17 | from datetime import datetime |
17 | 18 |
|
18 | 19 | from tests.core.engine_adapter.integration import ( |
@@ -307,3 +308,21 @@ def fetch_database_names() -> t.Set[str]: |
307 | 308 |
|
308 | 309 | engine_adapter.drop_catalog(sqlmesh_managed_catalog) # works, catalog is SQLMesh-managed |
309 | 310 | assert fetch_database_names() == {non_sqlmesh_managed_catalog} |
| 311 | + |
| 312 | + |
| 313 | +def test_rows_tracker(ctx: TestContext, engine_adapter: SnowflakeEngineAdapter): |
| 314 | + sqlmesh = ctx.create_context() |
| 315 | + tracker = QueryExecutionTracker() |
| 316 | + |
| 317 | + with tracker.track_execution("a"): |
| 318 | + # Snowflake doesn't report row counts for CTAS, so this should not be tracked |
| 319 | + engine_adapter.execute( |
| 320 | + "CREATE TABLE a (id int) AS SELECT 1 as id", track_rows_processed=True |
| 321 | + ) |
| 322 | + engine_adapter.execute("INSERT INTO a VALUES (2), (3)", track_rows_processed=True) |
| 323 | + engine_adapter.execute("INSERT INTO a VALUES (4)", track_rows_processed=True) |
| 324 | + |
| 325 | + stats = tracker.get_execution_stats("a") |
| 326 | + assert stats is not None |
| 327 | + assert stats.query_count == 2 |
| 328 | + assert stats.total_rows_processed == 3 |
0 commit comments