Skip to content

Commit 1655263

Browse files
feat(fabric): add catalog management for Fabric
1 parent 7756a8f commit 1655263

File tree

2 files changed

+140
-1
lines changed

2 files changed

+140
-1
lines changed

sqlmesh/core/config/connection.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1709,6 +1709,11 @@ class FabricConnectionConfig(MSSQLConnectionConfig):
17091709
DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17 # type: ignore
17101710
driver: t.Literal["pyodbc"] = "pyodbc"
17111711
autocommit: t.Optional[bool] = True
1712+
workspace_id: t.Optional[str] = None
1713+
# Service Principal authentication for Fabric REST API
1714+
tenant_id: t.Optional[str] = None
1715+
client_id: t.Optional[str] = None
1716+
client_secret: t.Optional[str] = None
17121717

17131718
@property
17141719
def _engine_adapter(self) -> t.Type[EngineAdapter]:
@@ -1720,7 +1725,7 @@ def _engine_adapter(self) -> t.Type[EngineAdapter]:
17201725
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
17211726
return {
17221727
"database": self.database,
1723-
"catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY,
1728+
"catalog_support": CatalogSupport.REQUIRES_SET_CATALOG,
17241729
}
17251730

17261731

sqlmesh/core/engine_adapter/fabric.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
from __future__ import annotations
22

33
import typing as t
4+
import logging
45
from sqlglot import exp
56
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
67
from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery
78
from sqlmesh.core.engine_adapter.base import EngineAdapter
9+
from sqlmesh.utils import optional_import
10+
from sqlmesh.utils.errors import SQLMeshError
811

912
if t.TYPE_CHECKING:
1013
from sqlmesh.core._typing import TableName
1114

1215

1316
from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin
1417

18+
logger = logging.getLogger(__name__)
19+
requests = optional_import("requests")
20+
1521

1622
class FabricAdapter(LogicalMergeMixin, MSSQLEngineAdapter):
1723
"""
@@ -21,6 +27,7 @@ class FabricAdapter(LogicalMergeMixin, MSSQLEngineAdapter):
2127
DIALECT = "fabric"
2228
SUPPORTS_INDEXES = False
2329
SUPPORTS_TRANSACTIONS = False
30+
SUPPORTS_CREATE_DROP_CATALOG = True
2431
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT
2532

2633
def _insert_overwrite_by_condition(
@@ -47,3 +54,130 @@ def _insert_overwrite_by_condition(
4754
insert_overwrite_strategy_override=InsertOverwriteStrategy.DELETE_INSERT,
4855
**kwargs,
4956
)
57+
58+
def _get_access_token(self) -> str:
59+
"""Get access token using Service Principal authentication."""
60+
tenant_id = self._extra_config.get("tenant_id")
61+
client_id = self._extra_config.get("client_id")
62+
client_secret = self._extra_config.get("client_secret")
63+
64+
if not all([tenant_id, client_id, client_secret]):
65+
raise SQLMeshError(
66+
"Service Principal authentication requires tenant_id, client_id, and client_secret "
67+
"in the Fabric connection configuration"
68+
)
69+
70+
if not requests:
71+
raise SQLMeshError("requests library is required for Fabric authentication")
72+
73+
# Use Azure AD OAuth2 token endpoint
74+
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
75+
76+
data = {
77+
"grant_type": "client_credentials",
78+
"client_id": client_id,
79+
"client_secret": client_secret,
80+
"scope": "https://api.fabric.microsoft.com/.default",
81+
}
82+
83+
try:
84+
response = requests.post(token_url, data=data)
85+
response.raise_for_status()
86+
token_data = response.json()
87+
return token_data["access_token"]
88+
except requests.exceptions.RequestException as e:
89+
raise SQLMeshError(f"Failed to authenticate with Azure AD: {e}")
90+
except KeyError:
91+
raise SQLMeshError("Invalid response from Azure AD token endpoint")
92+
93+
def _get_fabric_auth_headers(self) -> t.Dict[str, str]:
94+
"""Get authentication headers for Fabric REST API calls."""
95+
access_token = self._get_access_token()
96+
return {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}
97+
98+
def _make_fabric_api_request(
99+
self, method: str, endpoint: str, data: t.Optional[t.Dict[str, t.Any]] = None
100+
) -> t.Dict[str, t.Any]:
101+
"""Make a request to the Fabric REST API."""
102+
if not requests:
103+
raise SQLMeshError("requests library is required for Fabric catalog operations")
104+
105+
workspace_id = self._extra_config.get("workspace_id")
106+
if not workspace_id:
107+
raise SQLMeshError(
108+
"workspace_id parameter is required in connection config for Fabric catalog operations"
109+
)
110+
111+
base_url = "https://api.fabric.microsoft.com/v1"
112+
url = f"{base_url}/workspaces/{workspace_id}/{endpoint}"
113+
114+
headers = self._get_fabric_auth_headers()
115+
116+
try:
117+
if method.upper() == "GET":
118+
response = requests.get(url, headers=headers)
119+
elif method.upper() == "POST":
120+
response = requests.post(url, headers=headers, json=data)
121+
elif method.upper() == "DELETE":
122+
response = requests.delete(url, headers=headers)
123+
else:
124+
raise SQLMeshError(f"Unsupported HTTP method: {method}")
125+
126+
response.raise_for_status()
127+
128+
if response.status_code == 204: # No content
129+
return {}
130+
131+
return response.json() if response.content else {}
132+
133+
except requests.exceptions.RequestException as e:
134+
raise SQLMeshError(f"Fabric API request failed: {e}")
135+
136+
def _create_catalog(self, catalog_name: exp.Identifier) -> None:
137+
"""Create a catalog (warehouse) in Microsoft Fabric via REST API."""
138+
warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
139+
140+
logger.info(f"Creating Fabric warehouse: {warehouse_name}")
141+
142+
request_data = {
143+
"displayName": warehouse_name,
144+
"description": f"Warehouse created by SQLMesh: {warehouse_name}",
145+
}
146+
147+
try:
148+
self._make_fabric_api_request("POST", "warehouses", request_data)
149+
logger.info(f"Successfully created Fabric warehouse: {warehouse_name}")
150+
except SQLMeshError as e:
151+
if "already exists" in str(e).lower():
152+
logger.info(f"Fabric warehouse already exists: {warehouse_name}")
153+
return
154+
raise
155+
156+
def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
157+
"""Drop a catalog (warehouse) in Microsoft Fabric via REST API."""
158+
warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
159+
160+
logger.info(f"Deleting Fabric warehouse: {warehouse_name}")
161+
162+
# First, we need to get the warehouse ID by listing warehouses
163+
try:
164+
warehouses = self._make_fabric_api_request("GET", "warehouses")
165+
warehouse_id = None
166+
167+
for warehouse in warehouses.get("value", []):
168+
if warehouse.get("displayName") == warehouse_name:
169+
warehouse_id = warehouse.get("id")
170+
break
171+
172+
if not warehouse_id:
173+
raise SQLMeshError(f"Warehouse not found: {warehouse_name}")
174+
175+
# Delete the warehouse by ID
176+
self._make_fabric_api_request("DELETE", f"warehouses/{warehouse_id}")
177+
logger.info(f"Successfully deleted Fabric warehouse: {warehouse_name}")
178+
179+
except SQLMeshError as e:
180+
if "not found" in str(e).lower():
181+
logger.info(f"Fabric warehouse does not exist: {warehouse_name}")
182+
return
183+
raise

0 commit comments

Comments
 (0)