44import typing as t
55from functools import partial
66
7- from sqlglot import exp , parse_one
7+ from sqlglot import exp
88from sqlmesh .core .dialect import to_schema
99from sqlmesh .core .engine_adapter .shared import (
1010 CatalogSupport ,
1414 SourceQuery ,
1515)
1616from sqlmesh .core .engine_adapter .spark import SparkEngineAdapter
17- from sqlmesh .engines .spark .db_api .spark_session import SparkSessionCursor
1817from sqlmesh .core .node import IntervalUnit
1918from sqlmesh .core .schema_diff import NestedSupport
20- from sqlmesh .core .snapshot .execution_tracker import QueryExecutionTracker
2119from sqlmesh .engines .spark .db_api .spark_session import connection , SparkSessionConnection
2220from sqlmesh .utils .errors import SQLMeshError , MissingDefaultCatalogError
2321
@@ -36,7 +34,6 @@ class DatabricksEngineAdapter(SparkEngineAdapter):
3634 SUPPORTS_CLONING = True
3735 SUPPORTS_MATERIALIZED_VIEWS = True
3836 SUPPORTS_MATERIALIZED_VIEW_SCHEMA = True
39- SUPPORTS_QUERY_EXECUTION_TRACKING = True
4037 SCHEMA_DIFFER_KWARGS = {
4138 "support_positional_add" : True ,
4239 "nested_support" : NestedSupport .ALL ,
@@ -366,73 +363,3 @@ def _build_table_properties_exp(
366363 expressions .append (clustered_by_exp )
367364 properties = exp .Properties (expressions = expressions )
368365 return properties
369-
370- def _record_execution_stats (
371- self , sql : str , rowcount : t .Optional [int ] = None , bytes_processed : t .Optional [int ] = None
372- ) -> None :
373- parsed = parse_one (sql , dialect = self .dialect )
374- table = parsed .find (exp .Table )
375- table_name = table .sql (dialect = self .dialect ) if table else None
376-
377- if table_name :
378- try :
379- self .cursor .execute (f"DESCRIBE HISTORY { table_name } " )
380- except :
381- return
382-
383- history = (
384- self .cursor .fetchdf ()
385- if isinstance (self .cursor , SparkSessionCursor )
386- else self .cursor .fetchall_arrow ()
387- )
388- if history is not None :
389- from pandas import DataFrame as PandasDataFrame
390- from pyspark .sql import DataFrame as PySparkDataFrame
391- from pyspark .sql .connect .dataframe import DataFrame as PySparkConnectDataFrame
392-
393- history_df = None
394- if isinstance (history , PandasDataFrame ):
395- history_df = history
396- elif isinstance (history , (PySparkDataFrame , PySparkConnectDataFrame )):
397- history_df = history .toPandas ()
398- else :
399- # arrow table
400- history_df = history .to_pandas ()
401-
402- if history_df is not None and not history_df .empty :
403- write_df = history_df [history_df ["operation" ] == "WRITE" ]
404- write_df = write_df [write_df ["timestamp" ] == write_df ["timestamp" ].max ()]
405- if not write_df .empty and "operationMetrics" in write_df .columns :
406- metrics = write_df ["operationMetrics" ].iloc [0 ]
407- if metrics :
408- rowcount = None
409- rowcount_str = [
410- metric [1 ] for metric in metrics if metric [0 ] == "numOutputRows"
411- ]
412- if rowcount_str :
413- try :
414- rowcount = int (rowcount_str [0 ])
415- except (TypeError , ValueError ):
416- pass
417-
418- bytes_processed = None
419- bytes_str = [
420- metric [1 ] for metric in metrics if metric [0 ] == "numOutputBytes"
421- ]
422- if bytes_str :
423- try :
424- bytes_processed = int (bytes_str [0 ])
425- except (TypeError , ValueError ):
426- pass
427-
428- if rowcount is not None or bytes_processed is not None :
429- # if no rows were written, df contains 0 for bytes but no value for rows
430- rowcount = (
431- 0
432- if rowcount is None and bytes_processed is not None
433- else rowcount
434- )
435-
436- QueryExecutionTracker .record_execution (
437- sql , rowcount , bytes_processed
438- )
0 commit comments