44import typing as t
55from functools import partial
66
7- from sqlglot import exp
7+ from sqlglot import exp , parse_one
88from sqlmesh .core .dialect import to_schema
99from sqlmesh .core .engine_adapter .shared import (
1010 CatalogSupport ,
1616from sqlmesh .core .engine_adapter .spark import SparkEngineAdapter
1717from sqlmesh .core .node import IntervalUnit
1818from sqlmesh .core .schema_diff import SchemaDiffer
19+ from sqlmesh .core .snapshot .execution_tracker import QueryExecutionTracker
1920from sqlmesh .engines .spark .db_api .spark_session import connection , SparkSessionConnection
2021from sqlmesh .utils .errors import SQLMeshError , MissingDefaultCatalogError
2122
@@ -34,6 +35,7 @@ class DatabricksEngineAdapter(SparkEngineAdapter):
3435 SUPPORTS_CLONING = True
3536 SUPPORTS_MATERIALIZED_VIEWS = True
3637 SUPPORTS_MATERIALIZED_VIEW_SCHEMA = True
38+ SUPPORTS_QUERY_EXECUTION_TRACKING = True
3739 SCHEMA_DIFFER = SchemaDiffer (
3840 support_positional_add = True ,
3941 support_nested_operations = True ,
@@ -364,3 +366,52 @@ def _build_table_properties_exp(
364366 expressions .append (clustered_by_exp )
365367 properties = exp .Properties (expressions = expressions )
366368 return properties
369+
370+ def _record_execution_stats (
371+ self , sql : str , rowcount : t .Optional [int ] = None , bytes_processed : t .Optional [int ] = None
372+ ) -> None :
373+ parsed = parse_one (sql , dialect = self .dialect )
374+ table = parsed .find (exp .Table )
375+ table_name = table .sql (dialect = self .dialect ) if table else None
376+
377+ if table_name :
378+ try :
379+ self .cursor .execute (f"DESCRIBE HISTORY { table_name } " )
380+ except :
381+ return
382+
383+ history = self .cursor .fetchall_arrow ()
384+ if history .num_rows :
385+ history_df = history .to_pandas ()
386+ write_df = history_df [history_df ["operation" ] == "WRITE" ]
387+ write_df = write_df [write_df ["timestamp" ] == write_df ["timestamp" ].max ()]
388+ if not write_df .empty :
389+ metrics = write_df ["operationMetrics" ][0 ]
390+ if metrics :
391+ rowcount = None
392+ rowcount_str = [
393+ metric [1 ] for metric in metrics if metric [0 ] == "numOutputRows"
394+ ]
395+ if rowcount_str :
396+ try :
397+ rowcount = int (rowcount_str [0 ])
398+ except (TypeError , ValueError ):
399+ pass
400+
401+ bytes_processed = None
402+ bytes_str = [
403+ metric [1 ] for metric in metrics if metric [0 ] == "numOutputBytes"
404+ ]
405+ if bytes_str :
406+ try :
407+ bytes_processed = int (bytes_str [0 ])
408+ except (TypeError , ValueError ):
409+ pass
410+
411+ if rowcount is not None or bytes_processed is not None :
412+ # if no rows were written, df contains 0 for bytes but no value for rows
413+ rowcount = (
414+ 0 if rowcount is None and bytes_processed is not None else rowcount
415+ )
416+
417+ QueryExecutionTracker .record_execution (sql , rowcount , bytes_processed )
0 commit comments