Skip to content

Commit e19e3e4

Browse files
feat(fabric): Add support for catalog operations
1 parent ea088aa commit e19e3e4

File tree

4 files changed

+122
-6
lines changed

4 files changed

+122
-6
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ athena-test: guard-AWS_ACCESS_KEY_ID guard-AWS_SECRET_ACCESS_KEY guard-ATHENA_S3
174174
pytest -n auto -m "athena" --retries 3 --junitxml=test-results/junit-athena.xml
175175

176176
fabric-test: guard-FABRIC_HOST guard-FABRIC_CLIENT_ID guard-FABRIC_CLIENT_SECRET guard-FABRIC_DATABASE engine-fabric-install
177-
pytest -n auto -m "fabric" --retries 3 --junitxml=test-results/junit-fabric.xml
177+
pytest -n auto -m "fabric" --retries 3 --timeout 600 --junitxml=test-results/junit-fabric.xml
178178

179179
vscode_settings:
180180
mkdir -p .vscode

sqlmesh/core/config/connection.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1723,6 +1723,10 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]:
17231723
return {
17241724
"database": self.database,
17251725
"catalog_support": CatalogSupport.FULL_SUPPORT,
1726+
"workspace": self.workspace,
1727+
"tenant": self.tenant,
1728+
"user": self.user,
1729+
"password": self.password,
17261730
}
17271731

17281732

sqlmesh/core/engine_adapter/fabric.py

Lines changed: 114 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import typing as t
44
import logging
5+
import time
56
from sqlglot import exp
67
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
78
from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery
@@ -130,6 +131,18 @@ def _make_fabric_api_request(
130131

131132
return response.json() if response.content else {}
132133

134+
except requests.exceptions.HTTPError as e:
135+
error_details = ""
136+
try:
137+
if response.content:
138+
error_response = response.json()
139+
error_details = error_response.get("error", {}).get(
140+
"message", str(error_response)
141+
)
142+
except (ValueError, AttributeError):
143+
error_details = response.text if hasattr(response, "text") else str(e)
144+
145+
raise SQLMeshError(f"Fabric API HTTP error ({response.status_code}): {error_details}")
133146
except requests.exceptions.RequestException as e:
134147
raise SQLMeshError(f"Fabric API request failed: {e}")
135148

@@ -139,18 +152,70 @@ def _create_catalog(self, catalog_name: exp.Identifier) -> None:
139152

140153
logger.info(f"Creating Fabric warehouse: {warehouse_name}")
141154

155+
# First check if warehouse already exists
156+
try:
157+
warehouses = self._make_fabric_api_request("GET", "warehouses")
158+
for warehouse in warehouses.get("value", []):
159+
if warehouse.get("displayName") == warehouse_name:
160+
logger.info(f"Fabric warehouse already exists: {warehouse_name}")
161+
return
162+
except SQLMeshError as e:
163+
logger.warning(f"Failed to check existing warehouses: {e}")
164+
165+
# Create the warehouse
142166
request_data = {
143167
"displayName": warehouse_name,
144168
"description": f"Warehouse created by SQLMesh: {warehouse_name}",
145169
}
146170

147171
try:
148-
self._make_fabric_api_request("POST", "warehouses", request_data)
172+
response = self._make_fabric_api_request("POST", "warehouses", request_data)
149173
logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
174+
175+
# Wait for warehouse to become ready
176+
max_retries = 30 # Wait up to 5 minutes
177+
retry_delay = 10 # 10 seconds between retries
178+
179+
for attempt in range(max_retries):
180+
try:
181+
# Try to verify warehouse exists and is ready
182+
warehouses = self._make_fabric_api_request("GET", "warehouses")
183+
for warehouse in warehouses.get("value", []):
184+
if warehouse.get("displayName") == warehouse_name:
185+
state = warehouse.get("state", "Unknown")
186+
logger.info(f"Warehouse {warehouse_name} state: {state}")
187+
if state == "Active":
188+
logger.info(f"Warehouse {warehouse_name} is ready")
189+
return
190+
if state == "Failed":
191+
raise SQLMeshError(f"Warehouse {warehouse_name} creation failed")
192+
193+
if attempt < max_retries - 1:
194+
logger.info(
195+
f"Waiting for warehouse {warehouse_name} to become ready (attempt {attempt + 1}/{max_retries})"
196+
)
197+
time.sleep(retry_delay)
198+
else:
199+
logger.warning(
200+
f"Warehouse {warehouse_name} may not be fully ready after {max_retries} attempts"
201+
)
202+
203+
except SQLMeshError as e:
204+
if attempt < max_retries - 1:
205+
logger.warning(
206+
f"Failed to check warehouse readiness (attempt {attempt + 1}/{max_retries}): {e}"
207+
)
208+
time.sleep(retry_delay)
209+
else:
210+
logger.error(f"Failed to verify warehouse readiness: {e}")
211+
raise
212+
150213
except SQLMeshError as e:
151-
if "already exists" in str(e).lower():
214+
error_msg = str(e).lower()
215+
if "already exists" in error_msg or "conflict" in error_msg:
152216
logger.info(f"Fabric warehouse already exists: {warehouse_name}")
153217
return
218+
logger.error(f"Failed to create Fabric warehouse {warehouse_name}: {e}")
154219
raise
155220

156221
def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
@@ -159,8 +224,8 @@ def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
159224

160225
logger.info(f"Deleting Fabric warehouse: {warehouse_name}")
161226

162-
# First, we need to get the warehouse ID by listing warehouses
163227
try:
228+
# First, get the warehouse ID by listing warehouses
164229
warehouses = self._make_fabric_api_request("GET", "warehouses")
165230
warehouse_id = None
166231

@@ -170,14 +235,58 @@ def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
170235
break
171236

172237
if not warehouse_id:
173-
raise SQLMeshError(f"Warehouse not found: {warehouse_name}")
238+
logger.info(f"Fabric warehouse does not exist: {warehouse_name}")
239+
return
174240

175241
# Delete the warehouse by ID
176242
self._make_fabric_api_request("DELETE", f"warehouses/{warehouse_id}")
177243
logger.info(f"Successfully deleted Fabric warehouse: {warehouse_name}")
178244

245+
# Wait for warehouse to be fully deleted
246+
max_retries = 15 # Wait up to 2.5 minutes
247+
retry_delay = 10 # 10 seconds between retries
248+
249+
for attempt in range(max_retries):
250+
try:
251+
warehouses = self._make_fabric_api_request("GET", "warehouses")
252+
still_exists = False
253+
254+
for warehouse in warehouses.get("value", []):
255+
if warehouse.get("displayName") == warehouse_name:
256+
state = warehouse.get("state", "Unknown")
257+
logger.info(f"Warehouse {warehouse_name} deletion state: {state}")
258+
still_exists = True
259+
break
260+
261+
if not still_exists:
262+
logger.info(f"Warehouse {warehouse_name} successfully deleted")
263+
return
264+
265+
if attempt < max_retries - 1:
266+
logger.info(
267+
f"Waiting for warehouse {warehouse_name} deletion to complete (attempt {attempt + 1}/{max_retries})"
268+
)
269+
time.sleep(retry_delay)
270+
else:
271+
logger.warning(
272+
f"Warehouse {warehouse_name} may still be in deletion process after {max_retries} attempts"
273+
)
274+
275+
except SQLMeshError as e:
276+
if attempt < max_retries - 1:
277+
logger.warning(
278+
f"Failed to check warehouse deletion status (attempt {attempt + 1}/{max_retries}): {e}"
279+
)
280+
time.sleep(retry_delay)
281+
else:
282+
logger.warning(f"Failed to verify warehouse deletion: {e}")
283+
# Don't raise here as deletion might have succeeded
284+
return
285+
179286
except SQLMeshError as e:
180-
if "not found" in str(e).lower():
287+
error_msg = str(e).lower()
288+
if "not found" in error_msg or "does not exist" in error_msg:
181289
logger.info(f"Fabric warehouse does not exist: {warehouse_name}")
182290
return
291+
logger.error(f"Failed to delete Fabric warehouse {warehouse_name}: {e}")
183292
raise

tests/core/engine_adapter/integration/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,9 @@ def create_catalog(self, catalog_name: str):
680680
except Exception:
681681
pass
682682
self.engine_adapter.cursor.connection.autocommit(False)
683+
elif self.dialect == "fabric":
684+
# Use the engine adapter's built-in catalog creation functionality
685+
self.engine_adapter.create_catalog(catalog_name)
683686
elif self.dialect == "snowflake":
684687
self.engine_adapter.execute(f'CREATE DATABASE IF NOT EXISTS "{catalog_name}"')
685688
elif self.dialect == "duckdb":

0 commit comments

Comments
 (0)