Skip to content

Commit e139601

Browse files
committed
support concurrent creation of non-overlapping schemas
1 parent 58f5f9d commit e139601

File tree

1 file changed

+27
-10
lines changed

1 file changed

+27
-10
lines changed

sqlmesh/core/snapshot/evaluator.py

Lines changed: 27 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -278,7 +278,11 @@ def promote(
278278
for gateway, tables in tables_by_gateway.items():
279279
if environment_naming_info.suffix_target.is_catalog:
280280
self._create_catalogs(tables=tables, gateway=gateway)
281-
self._create_schemas(tables=tables, gateway=gateway)
281+
282+
gateway_table_pairs = [
283+
(gateway, table) for gateway, tables in tables_by_gateway.items() for table in tables
284+
]
285+
self._create_schemas(gateway_table_pairs=gateway_table_pairs)
282286

283287
deployability_index = deployability_index or DeployabilityIndex.all_deployable()
284288
with self.concurrent_context():
@@ -381,8 +385,10 @@ def create_physical_schemas(
381385
snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))
382386
)
383387

384-
for gateway, tables in tables_by_gateway.items():
385-
self._create_schemas(tables=tables, gateway=gateway)
388+
gateway_table_pairs = [
389+
(gateway, table) for gateway, tables in tables_by_gateway.items() for table in tables
390+
]
391+
self._create_schemas(gateway_table_pairs=gateway_table_pairs)
386392

387393
def get_snapshots_to_create(
388394
self, target_snapshots: t.Iterable[Snapshot], deployability_index: DeployabilityIndex
@@ -1296,19 +1302,30 @@ def _create_catalogs(
12961302

12971303
def _create_schemas(
12981304
self,
1299-
tables: t.Iterable[t.Union[exp.Table, str]],
1300-
gateway: t.Optional[str] = None,
1305+
gateway_table_pairs: t.Iterable[t.Tuple[t.Optional[str], t.Union[exp.Table, str]]],
13011306
) -> None:
1302-
table_exprs = [exp.to_table(t) for t in tables]
1303-
unique_schemas = {(t.args["db"], t.args.get("catalog")) for t in table_exprs if t and t.db}
1304-
# Create schemas sequentially, since some engines (eg. Postgres) may not support concurrent creation
1305-
# of schemas with the same name.
1306-
for schema_name, catalog in unique_schemas:
1307+
table_exprs = [(gateway, exp.to_table(t)) for gateway, t in gateway_table_pairs]
1308+
unique_schemas = {
1309+
(gateway, t.args["db"], t.args.get("catalog"))
1310+
for gateway, t in table_exprs
1311+
if t and t.db
1312+
}
1313+
1314+
def _create_schema(
1315+
gateway: t.Optional[str], schema_name: str, catalog: t.Optional[str]
1316+
) -> None:
13071317
schema = schema_(schema_name, catalog)
13081318
logger.info("Creating schema '%s'", schema)
13091319
adapter = self.get_adapter(gateway)
13101320
adapter.create_schema(schema)
13111321

1322+
with self.concurrent_context():
1323+
concurrent_apply_to_values(
1324+
list(unique_schemas),
1325+
lambda item: _create_schema(item[0], item[1], item[2]),
1326+
self.ddl_concurrent_tasks,
1327+
)
1328+
13121329
def get_adapter(self, gateway: t.Optional[str] = None) -> EngineAdapter:
13131330
"""Returns the adapter for the specified gateway or the default adapter if none is provided."""
13141331
if gateway:

0 commit comments

Comments
 (0)