@@ -31,6 +31,30 @@ class FabricAdapter(LogicalMergeMixin, MSSQLEngineAdapter):
3131 SUPPORTS_CREATE_DROP_CATALOG = True
3232 INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy .DELETE_INSERT
3333
34+ def __init__ (self , * args : t .Any , ** kwargs : t .Any ) -> None :
35+ super ().__init__ (* args , ** kwargs )
36+ # Store the desired catalog for dynamic switching
37+ self ._target_catalog : t .Optional [str ] = None
38+ # Store the original connection factory for wrapping
39+ self ._original_connection_factory = self ._connection_pool ._connection_factory # type: ignore
40+ # Replace the connection factory with our custom one
41+ self ._connection_pool ._connection_factory = self ._create_fabric_connection # type: ignore
42+
43+ def _create_fabric_connection (self ) -> t .Any :
44+ """Custom connection factory that uses the target catalog if set."""
45+ # If we have a target catalog, we need to modify the connection parameters
46+ if self ._target_catalog :
47+ # The original factory was created with partial(), so we need to extract and modify the kwargs
48+ if hasattr (self ._original_connection_factory , "keywords" ):
49+ # It's a partial function, get the original keywords
50+ original_kwargs = self ._original_connection_factory .keywords .copy ()
51+ original_kwargs ["database" ] = self ._target_catalog
52+ # Call the underlying function with modified kwargs
53+ return self ._original_connection_factory .func (** original_kwargs )
54+
55+ # Use the original factory if no target catalog is set
56+ return self ._original_connection_factory ()
57+
3458 def _insert_overwrite_by_condition (
3559 self ,
3660 table_name : TableName ,
@@ -298,3 +322,51 @@ def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
298322 return
299323 logger .error (f"Failed to delete Fabric warehouse { warehouse_name } : { e } " )
300324 raise
325+
326+ def set_current_catalog (self , catalog_name : str ) -> None :
327+ """
328+ Set the current catalog for Microsoft Fabric connections.
329+
330+ Override to handle Fabric's stateless session limitation where USE statements
331+ don't persist across queries. Instead, we close existing connections and
332+ recreate them with the new catalog in the connection configuration.
333+
334+ Args:
335+ catalog_name: The name of the catalog (warehouse) to switch to
336+
337+ Note:
338+ Fabric doesn't support catalog switching via USE statements because each
339+ statement runs as an independent session. This method works around this
340+ limitation by updating the connection pool with new catalog configuration.
341+
342+ See:
343+ https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations
344+ """
345+ current_catalog = self .get_current_catalog ()
346+
347+ # If already using the requested catalog, do nothing
348+ if current_catalog and current_catalog == catalog_name :
349+ logger .debug (f"Already using catalog '{ catalog_name } ', no action needed" )
350+ return
351+
352+ logger .info (f"Switching from catalog '{ current_catalog } ' to '{ catalog_name } '" )
353+
354+ # Set the target catalog for our custom connection factory
355+ self ._target_catalog = catalog_name
356+
357+ # Close all existing connections since Fabric requires reconnection for catalog changes
358+ self .close ()
359+
360+ # Verify the catalog switch worked by getting a new connection
361+ try :
362+ actual_catalog = self .get_current_catalog ()
363+ if actual_catalog and actual_catalog == catalog_name :
364+ logger .debug (f"Successfully switched to catalog '{ catalog_name } '" )
365+ else :
366+ logger .warning (
367+ f"Catalog switch may have failed. Expected '{ catalog_name } ', got '{ actual_catalog } '"
368+ )
369+ except Exception as e :
370+ logger .debug (f"Could not verify catalog switch: { e } " )
371+
372+ logger .debug (f"Updated target catalog to '{ catalog_name } ' and closed connections" )
0 commit comments