Skip to content

Commit 9595304

Browse files
committed
simplify
1 parent 96e8e89 commit 9595304

File tree

1 file changed

+7
-49
lines changed

1 file changed

+7
-49
lines changed

sqlmesh/magics.py

Lines changed: 7 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,57 +1163,15 @@ def _fetchdf_athena_pandas_cursor(self, context: Context, sql: str) -> "pd.DataF
11631163
except ImportError as e:
11641164
raise MagicError(f"PyAthena with pandas support is required: {e}")
11651165

1166-
# Use SQLMesh's transpilation to convert SQL to Athena dialect
1167-
# This handles features like QUALIFY that need transpilation
11681166
try:
1169-
# Parse the SQL string into a SQLGlot expression first
1170-
from sqlmesh.core.dialect import parse
1171-
parsed_expressions = parse(sql, default_dialect=context.config.dialect)
1172-
1173-
# Get the first expression (should be a SELECT statement)
1174-
if parsed_expressions:
1175-
transpiled_sql = context.engine_adapter._to_sql(parsed_expressions[0], quote=False)
1176-
else:
1177-
raise ValueError("No valid SQL expressions found")
1178-
1179-
except Exception as e:
1180-
context.console.log_error(f"SQL transpilation failed: {e}")
1181-
# Fall back to the regular fetchdf method if transpilation fails
1182-
return context.fetchdf(sql)
1167+
conn_config = context.config.get_connection(context.config.default_connection)
1168+
connection_kwargs = {
1169+
k: v for k, v in conn_config.dict().items()
1170+
if k in conn_config._connection_kwargs_keys and v is not None
1171+
}
1172+
cursor = connect(cursor_class=PandasCursor, **connection_kwargs).cursor()
1173+
return cursor.execute(sql).as_pandas()
11831174

1184-
# Get the connection configuration for Athena
1185-
conn_config = context.config.get_connection(context.config.default_connection)
1186-
1187-
# Build connection kwargs using the same logic as SQLMesh
1188-
connection_kwargs = {
1189-
k: v for k, v in conn_config.dict().items()
1190-
if k in conn_config._connection_kwargs_keys and v is not None
1191-
}
1192-
1193-
# Create connection with PandasCursor specifically
1194-
try:
1195-
with connect(
1196-
cursor_class=PandasCursor,
1197-
**connection_kwargs
1198-
) as conn:
1199-
with conn.cursor() as cursor:
1200-
cursor.execute(transpiled_sql)
1201-
1202-
# PyAthena PandasCursor needs to be converted to DataFrame manually
1203-
# It returns data but we need to use pandas.DataFrame constructor
1204-
data = cursor.fetchall()
1205-
1206-
if data:
1207-
# Get column names from cursor description
1208-
columns = [desc[0] for desc in cursor.description] if cursor.description else None
1209-
df = pd.DataFrame(data, columns=columns)
1210-
else:
1211-
# Empty result set
1212-
columns = [desc[0] for desc in cursor.description] if cursor.description else []
1213-
df = pd.DataFrame(columns=columns)
1214-
1215-
return df
1216-
12171175
except Exception as e:
12181176
# Fall back to the regular fetchdf method if PandasCursor fails
12191177
context.console.log_error(f"PandasCursor failed, falling back to standard method: {e}")

0 commit comments

Comments
 (0)