Skip to content

Commit a869df3

Browse files
feat(fabric): Refactor _create_catalog
1 parent e19e3e4 commit a869df3

File tree

2 files changed

+107
-60
lines changed

2 files changed

+107
-60
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ athena-test: guard-AWS_ACCESS_KEY_ID guard-AWS_SECRET_ACCESS_KEY guard-ATHENA_S3
174174
pytest -n auto -m "athena" --retries 3 --junitxml=test-results/junit-athena.xml
175175

176176
fabric-test: guard-FABRIC_HOST guard-FABRIC_CLIENT_ID guard-FABRIC_CLIENT_SECRET guard-FABRIC_DATABASE engine-fabric-install
177-
pytest -n auto -m "fabric" --retries 3 --timeout 600 --junitxml=test-results/junit-fabric.xml
177+
pytest -n auto -m "fabric" --retries 3 --junitxml=test-results/junit-fabric.xml
178178

179179
vscode_settings:
180180
mkdir -p .vscode

sqlmesh/core/engine_adapter/fabric.py

Lines changed: 106 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -146,77 +146,124 @@ def _make_fabric_api_request(
146146
except requests.exceptions.RequestException as e:
147147
raise SQLMeshError(f"Fabric API request failed: {e}")
148148

149-
def _create_catalog(self, catalog_name: exp.Identifier) -> None:
150-
"""Create a catalog (warehouse) in Microsoft Fabric via REST API."""
151-
warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
149+
def _make_fabric_api_request_with_location(
150+
self, method: str, endpoint: str, data: t.Optional[t.Dict[str, t.Any]] = None
151+
) -> t.Dict[str, t.Any]:
152+
"""Make a request to the Fabric REST API and return response with status code and location."""
153+
if not requests:
154+
raise SQLMeshError("requests library is required for Fabric catalog operations")
152155

153-
logger.info(f"Creating Fabric warehouse: {warehouse_name}")
156+
workspace = self._extra_config.get("workspace")
157+
if not workspace:
158+
raise SQLMeshError(
159+
"workspace parameter is required in connection config for Fabric catalog operations"
160+
)
161+
162+
base_url = "https://api.fabric.microsoft.com/v1"
163+
url = f"{base_url}/workspaces/{workspace}/{endpoint}"
164+
headers = self._get_fabric_auth_headers()
154165

155-
# First check if warehouse already exists
156166
try:
157-
warehouses = self._make_fabric_api_request("GET", "warehouses")
158-
for warehouse in warehouses.get("value", []):
159-
if warehouse.get("displayName") == warehouse_name:
160-
logger.info(f"Fabric warehouse already exists: {warehouse_name}")
167+
if method.upper() == "POST":
168+
response = requests.post(url, headers=headers, json=data)
169+
else:
170+
raise SQLMeshError(f"Unsupported HTTP method for location tracking: {method}")
171+
172+
# Check for errors first
173+
response.raise_for_status()
174+
175+
result = {"status_code": response.status_code}
176+
177+
# Extract location header for polling
178+
if "location" in response.headers:
179+
result["location"] = response.headers["location"]
180+
181+
# Include response body if present
182+
if response.content:
183+
result.update(response.json())
184+
185+
return result
186+
187+
except requests.exceptions.HTTPError as e:
188+
error_details = ""
189+
try:
190+
if response.content:
191+
error_response = response.json()
192+
error_details = error_response.get("error", {}).get(
193+
"message", str(error_response)
194+
)
195+
except (ValueError, AttributeError):
196+
error_details = response.text if hasattr(response, "text") else str(e)
197+
198+
raise SQLMeshError(f"Fabric API HTTP error ({response.status_code}): {error_details}")
199+
except requests.exceptions.RequestException as e:
200+
raise SQLMeshError(f"Fabric API request failed: {e}")
201+
202+
def _poll_operation_status(self, location_url: str, operation_name: str) -> None:
203+
"""Poll the operation status until completion."""
204+
if not requests:
205+
raise SQLMeshError("requests library is required for Fabric catalog operations")
206+
207+
headers = self._get_fabric_auth_headers()
208+
max_attempts = 60 # Poll for up to 10 minutes
209+
initial_delay = 1 # Start with 1 second
210+
211+
for attempt in range(max_attempts):
212+
try:
213+
response = requests.get(location_url, headers=headers)
214+
response.raise_for_status()
215+
216+
result = response.json()
217+
status = result.get("status", "Unknown")
218+
219+
logger.info(f"Operation {operation_name} status: {status}")
220+
221+
if status == "Succeeded":
161222
return
162-
except SQLMeshError as e:
163-
logger.warning(f"Failed to check existing warehouses: {e}")
223+
if status == "Failed":
224+
error_msg = result.get("error", {}).get("message", "Unknown error")
225+
raise SQLMeshError(f"Operation {operation_name} failed: {error_msg}")
226+
elif status in ["InProgress", "Running"]:
227+
# Use exponential backoff with max of 30 seconds
228+
delay = min(initial_delay * (2 ** min(attempt // 3, 4)), 30)
229+
logger.info(f"Waiting {delay} seconds before next status check...")
230+
time.sleep(delay)
231+
else:
232+
logger.warning(f"Unknown status '{status}' for operation {operation_name}")
233+
time.sleep(5)
234+
235+
except requests.exceptions.RequestException as e:
236+
if attempt < max_attempts - 1:
237+
logger.warning(f"Failed to poll status (attempt {attempt + 1}): {e}")
238+
time.sleep(5)
239+
else:
240+
raise SQLMeshError(f"Failed to poll operation status: {e}")
241+
242+
raise SQLMeshError(f"Operation {operation_name} did not complete within timeout")
243+
244+
def _create_catalog(self, catalog_name: exp.Identifier) -> None:
245+
"""Create a catalog (warehouse) in Microsoft Fabric via REST API."""
246+
warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
247+
logger.info(f"Creating Fabric warehouse: {warehouse_name}")
164248

165-
# Create the warehouse
166249
request_data = {
167250
"displayName": warehouse_name,
168251
"description": f"Warehouse created by SQLMesh: {warehouse_name}",
169252
}
170253

171-
try:
172-
response = self._make_fabric_api_request("POST", "warehouses", request_data)
173-
logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
174-
175-
# Wait for warehouse to become ready
176-
max_retries = 30 # Wait up to 5 minutes
177-
retry_delay = 10 # 10 seconds between retries
254+
response = self._make_fabric_api_request_with_location("POST", "warehouses", request_data)
178255

179-
for attempt in range(max_retries):
180-
try:
181-
# Try to verify warehouse exists and is ready
182-
warehouses = self._make_fabric_api_request("GET", "warehouses")
183-
for warehouse in warehouses.get("value", []):
184-
if warehouse.get("displayName") == warehouse_name:
185-
state = warehouse.get("state", "Unknown")
186-
logger.info(f"Warehouse {warehouse_name} state: {state}")
187-
if state == "Active":
188-
logger.info(f"Warehouse {warehouse_name} is ready")
189-
return
190-
if state == "Failed":
191-
raise SQLMeshError(f"Warehouse {warehouse_name} creation failed")
192-
193-
if attempt < max_retries - 1:
194-
logger.info(
195-
f"Waiting for warehouse {warehouse_name} to become ready (attempt {attempt + 1}/{max_retries})"
196-
)
197-
time.sleep(retry_delay)
198-
else:
199-
logger.warning(
200-
f"Warehouse {warehouse_name} may not be fully ready after {max_retries} attempts"
201-
)
202-
203-
except SQLMeshError as e:
204-
if attempt < max_retries - 1:
205-
logger.warning(
206-
f"Failed to check warehouse readiness (attempt {attempt + 1}/{max_retries}): {e}"
207-
)
208-
time.sleep(retry_delay)
209-
else:
210-
logger.error(f"Failed to verify warehouse readiness: {e}")
211-
raise
256+
# Handle direct success (201) or async creation (202)
257+
if response.get("status_code") == 201:
258+
logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
259+
return
212260

213-
except SQLMeshError as e:
214-
error_msg = str(e).lower()
215-
if "already exists" in error_msg or "conflict" in error_msg:
216-
logger.info(f"Fabric warehouse already exists: {warehouse_name}")
217-
return
218-
logger.error(f"Failed to create Fabric warehouse {warehouse_name}: {e}")
219-
raise
261+
if response.get("status_code") == 202 and response.get("location"):
262+
logger.info(f"Warehouse creation initiated for: {warehouse_name}")
263+
self._poll_operation_status(response["location"], warehouse_name)
264+
logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
265+
else:
266+
raise SQLMeshError(f"Unexpected response from warehouse creation: {response}")
220267

221268
def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
222269
"""Drop a catalog (warehouse) in Microsoft Fabric via REST API."""

0 commit comments

Comments
 (0)