Skip to content

Commit 7cb8447

Browse files
fix(fabric): Ensure schemas exist before creating tables
1 parent cedfab4 commit 7cb8447

File tree

1 file changed

+125
-1
lines changed

1 file changed

+125
-1
lines changed

sqlmesh/core/engine_adapter/fabric.py

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,122 @@ def create_schema(
434434
logger.debug(f"No catalog detected, using original: {schema_name}")
435435
super().create_schema(schema_name, ignore_if_exists, **kwargs)
436436

437+
def _ensure_schema_exists(self, table_name: TableName) -> None:
438+
"""
439+
Ensure that the schema for a table exists before creating the table.
440+
This is necessary for Fabric because schemas must exist before tables can be created in them.
441+
"""
442+
table = exp.to_table(table_name)
443+
if table.db:
444+
schema_name = table.db
445+
catalog_name = table.catalog
446+
447+
# Build the full schema name
448+
if catalog_name:
449+
full_schema_name = f"{catalog_name}.{schema_name}"
450+
else:
451+
full_schema_name = schema_name
452+
453+
logger.debug(f"Ensuring schema exists: {full_schema_name}")
454+
455+
try:
456+
# Create the schema if it doesn't exist
457+
self.create_schema(full_schema_name, ignore_if_exists=True)
458+
except Exception as e:
459+
logger.debug(f"Error creating schema {full_schema_name}: {e}")
460+
# Continue anyway - the schema might already exist or we might not have permissions
461+
462+
def _create_table(
463+
self,
464+
table_name_or_schema: t.Union[exp.Schema, TableName],
465+
expression: t.Optional[exp.Expression],
466+
exists: bool = True,
467+
replace: bool = False,
468+
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
469+
table_description: t.Optional[str] = None,
470+
column_descriptions: t.Optional[t.Dict[str, str]] = None,
471+
table_kind: t.Optional[str] = None,
472+
**kwargs: t.Any,
473+
) -> None:
474+
"""
475+
Override _create_table to ensure schema exists before creating tables.
476+
"""
477+
# Extract table name for schema creation
478+
if isinstance(table_name_or_schema, exp.Schema):
479+
table_name = table_name_or_schema.this
480+
else:
481+
table_name = table_name_or_schema
482+
483+
# Ensure the schema exists before creating the table
484+
self._ensure_schema_exists(table_name)
485+
486+
# Call the parent implementation
487+
super()._create_table(
488+
table_name_or_schema=table_name_or_schema,
489+
expression=expression,
490+
exists=exists,
491+
replace=replace,
492+
columns_to_types=columns_to_types,
493+
table_description=table_description,
494+
column_descriptions=column_descriptions,
495+
table_kind=table_kind,
496+
**kwargs,
497+
)
498+
499+
def create_table(
500+
self,
501+
table_name: TableName,
502+
columns_to_types: t.Dict[str, exp.DataType],
503+
primary_key: t.Optional[t.Tuple[str, ...]] = None,
504+
exists: bool = True,
505+
table_description: t.Optional[str] = None,
506+
column_descriptions: t.Optional[t.Dict[str, str]] = None,
507+
**kwargs: t.Any,
508+
) -> None:
509+
"""
510+
Override create_table to ensure schema exists before creating tables.
511+
"""
512+
# Ensure the schema exists before creating the table
513+
self._ensure_schema_exists(table_name)
514+
515+
# Call the parent implementation
516+
super().create_table(
517+
table_name=table_name,
518+
columns_to_types=columns_to_types,
519+
primary_key=primary_key,
520+
exists=exists,
521+
table_description=table_description,
522+
column_descriptions=column_descriptions,
523+
**kwargs,
524+
)
525+
526+
def ctas(
527+
self,
528+
table_name: TableName,
529+
query_or_df: t.Any,
530+
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
531+
exists: bool = True,
532+
table_description: t.Optional[str] = None,
533+
column_descriptions: t.Optional[t.Dict[str, str]] = None,
534+
**kwargs: t.Any,
535+
) -> None:
536+
"""
537+
Override ctas to ensure schema exists before creating tables.
538+
"""
539+
# Ensure the schema exists before creating the table
540+
self._ensure_schema_exists(table_name)
541+
542+
# Call the parent implementation
543+
super().ctas(
544+
table_name=table_name,
545+
query_or_df=query_or_df,
546+
columns_to_types=columns_to_types,
547+
exists=exists,
548+
table_description=table_description,
549+
column_descriptions=column_descriptions,
550+
**kwargs,
551+
)
552+
437553
def create_view(
438554
self,
439555
view_name: t.Union[str, exp.Table],
@@ -448,11 +564,19 @@ def create_view(
448564
**create_kwargs: t.Any,
449565
) -> None:
450566
"""
451-
Override create_view to handle catalog-qualified view names.
567+
Override create_view to handle catalog-qualified view names and ensure schema exists.
452568
Fabric doesn't support 'CREATE VIEW [catalog].[schema].[view]' syntax.
453569
"""
454570
logger.debug(f"create_view called with: {view_name} (type: {type(view_name)})")
455571

572+
# Ensure schema exists for the view
573+
if isinstance(view_name, exp.Table):
574+
self._ensure_schema_exists(view_name)
575+
elif isinstance(view_name, str):
576+
# Parse string to table for schema extraction
577+
parsed_table = exp.to_table(view_name)
578+
self._ensure_schema_exists(parsed_table)
579+
456580
# Handle exp.Table objects that might be catalog-qualified
457581
if isinstance(view_name, exp.Table):
458582
if view_name.catalog:

0 commit comments

Comments
 (0)