Skip to content

Commit d7e21f0

Browse files
Feat!: Add multiple engine project support (#3394)
1 parent 04feaff commit d7e21f0

File tree

26 files changed

+848
-105
lines changed

26 files changed

+848
-105
lines changed

docs/concepts/models/overview.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,9 @@ Learn more about these properties and their default values in the [model configu
351351

352352
NOTE: This can only be set for forward-only models.
353353

354+
### gateway
355+
: Specifies the gateway to use for the execution of this model. When not specified, the default gateway is used.
356+
354357
## Incremental Model Properties
355358

356359
These properties can be specified in an incremental model's `kind` definition.

docs/guides/multi_engine.md

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# Multi-Engine guide
2+
3+
Organizations typically connect to a data warehouse through a single engine to ensure data consistency. However, there are cases where the processing capabilities of one engine may be better suited to specific tasks than another.
4+
5+
By decoupling storage from compute and with growing support for open table formats like Apache Iceberg and Hive, different engines can now interact with the same data.
6+
7+
With SQLMesh's new multi-engine feature, users can leverage multiple engine adapters within a single project, offering the flexibility to choose the best engine for each task.
8+
9+
This feature allows you to run each model on a specified engine, provided the data catalog is shared and the engines support read/write operations on it.
10+
11+
12+
## Configuring project with multiple engines
13+
14+
To configure a SQLMesh project with multiple engines, simply include all required gateway [connections](../reference/configuration.md#connection) in your configuration.
15+
16+
Next, specify the appropriate `gateway` in the `MODEL` DDL for each model. If no gateway is explicitly defined, the default gateway will be used.
17+
18+
The [virtual layer](../concepts/glossary.md#virtual-layer) will be created within the engine corresponding to the default gateway.
19+
20+
### Example
21+
22+
Below is a simple example of setting up a project with connections to both DuckDB and PostgreSQL.
23+
24+
In this setup, the PostgreSQL engine is set as the default, so it will be used to manage views in the virtual layer.
25+
26+
Meanwhile, the DuckDB's [attach](https://duckdb.org/docs/sql/statements/attach.html) feature enables read-write access to the PostgreSQL catalog's physical tables.
27+
28+
=== "YAML"
29+
30+
```yaml linenums="1"
31+
gateways:
32+
duckdb:
33+
connection:
34+
type: duckdb
35+
catalogs:
36+
main_db:
37+
type: postgres
38+
path: 'dbname=main_db user=postgres host=127.0.0.1'
39+
extensions:
40+
- name: iceberg
41+
postgres:
42+
connection:
43+
type: postgres
44+
database: main_db
45+
user: user
46+
password: password
47+
host: 127.0.0.1
48+
port: 5432
49+
default_gateway: postgres
50+
```
51+
52+
=== "Python"
53+
54+
```python linenums="1"
55+
from sqlmesh.core.config import (
56+
Config,
57+
ModelDefaultsConfig,
58+
GatewayConfig,
59+
DuckDBConnectionConfig,
60+
PostgresConnectionConfig
61+
)
62+
from sqlmesh.core.config.connection import DuckDBAttachOptions
63+
64+
config = Config(
65+
model_defaults=ModelDefaultsConfig(dialect="postgres"),
66+
gateways={
67+
"duckdb": GatewayConfig(
68+
connection=DuckDBConnectionConfig(
69+
catalogs={
70+
"main_db": DuckDBAttachOptions(
71+
type="postgres",
72+
path="dbname=main_db user=postgres host=127.0.0.1"
73+
),
74+
},
75+
extensions=["iceberg"],
76+
)
77+
),
78+
"postgres": GatewayConfig(
79+
connection=PostgresConnectionConfig(
80+
host="127.0.0.1",
81+
port=5432,
82+
user="postgres",
83+
password="password",
84+
database="main_db",
85+
)
86+
),
87+
},
88+
default_gateway="postgres",
89+
)
90+
```
91+
92+
Given this configuration, when a model’s gateway is set to duckdb, it will be materialized within the PostgreSQL `main_db` catalog, but it will be evaluated using DuckDB’s engine.
93+
94+
95+
```sql linenums="1"
96+
MODEL (
97+
name orders.order_ship_date,
98+
kind FULL,
99+
gateway duckdb,
100+
);
101+
102+
SELECT
103+
l_orderkey,
104+
l_shipdate
105+
FROM
106+
iceberg_scan('data/bucket/lineitem_iceberg', allow_moved_paths = true);
107+
```
108+
109+
In this model, the DuckDB engine can be used to scan and load data from an iceberg table and create the physical table in the PostgreSQL database.
110+
111+
While the PostgreSQL engine is responsible for creating the model's view for the virtual layer.

docs/reference/model_configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ Configuration options for SQLMesh model properties. Supported by all model kinds
3737
| `session_properties` | A key-value mapping of arbitrary properties specific to the target engine that are applied to the engine session. Specified as key-value pairs (`key = value`). | dict | N |
3838
| `allow_partials` | Whether this model can process partial (incomplete) data intervals | bool | N |
3939
| `enabled` | Whether the model is enabled. This attribute is `true` by default. Setting it to `false` causes SQLMesh to ignore this model when loading the project. | bool | N |
40+
| `gateway` | Specifies the gateway to use for the execution of this model. When not specified, the default gateway is used. | str | N |
4041

4142
### Model defaults
4243

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ nav:
1515
- guides/projects.md
1616
- guides/multi_repo.md
1717
- guides/isolated_systems.md
18+
- guides/multi_engine.md
1819
- Project setup:
1920
- guides/configuration.md
2021
- guides/connections.md

sqlmesh/core/context.py

Lines changed: 72 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,11 @@
5656
from sqlmesh.core import constants as c
5757
from sqlmesh.core.analytics import python_api_analytics
5858
from sqlmesh.core.audit import Audit, ModelAudit, StandaloneAudit
59-
from sqlmesh.core.config import CategorizerConfig, Config, load_configs
59+
from sqlmesh.core.config import (
60+
CategorizerConfig,
61+
Config,
62+
load_configs,
63+
)
6064
from sqlmesh.core.config.loader import C
6165
from sqlmesh.core.console import Console, get_console
6266
from sqlmesh.core.context_diff import ContextDiff
@@ -289,7 +293,6 @@ class GenericContext(BaseContext, t.Generic[C]):
289293
"""Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.
290294
291295
Args:
292-
engine_adapter: The default engine adapter to use.
293296
notification_targets: The notification target to use. Defaults to what is defined in config.
294297
paths: The directories containing SQLMesh files.
295298
config: A Config object or the name of a Config object in config.py.
@@ -311,7 +314,6 @@ class GenericContext(BaseContext, t.Generic[C]):
311314

312315
def __init__(
313316
self,
314-
engine_adapter: t.Optional[EngineAdapter] = None,
315317
notification_targets: t.Optional[t.List[NotificationTarget]] = None,
316318
state_sync: t.Optional[StateSync] = None,
317319
paths: t.Union[str | Path, t.Iterable[str | Path]] = "",
@@ -367,19 +369,22 @@ def __init__(
367369
self.environment_ttl = self.config.environment_ttl
368370
self.pinned_environments = Environment.sanitize_names(self.config.pinned_environments)
369371
self.auto_categorize_changes = self.config.plan.auto_categorize_changes
372+
self.selected_gateway = gateway or self.config.default_gateway_name
370373

371374
self._connection_config = self.config.get_connection(self.gateway)
372375
self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks
373-
self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter()
374376

375-
self.console = console or get_console(dialect=self._engine_adapter.dialect)
377+
self._engine_adapters: t.Dict[str, EngineAdapter] = {
378+
self.selected_gateway: self._connection_config.create_engine_adapter()
379+
}
376380

381+
self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None
382+
383+
self.console = console or get_console(dialect=self.engine_adapter.dialect)
377384
self._test_connection_config = self.config.get_test_connection(
378385
self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT
379386
)
380387

381-
self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None
382-
383388
self._provided_state_sync: t.Optional[StateSync] = state_sync
384389
self._state_sync: t.Optional[StateSync] = None
385390

@@ -406,24 +411,32 @@ def default_dialect(self) -> t.Optional[str]:
406411

407412
@property
408413
def engine_adapter(self) -> EngineAdapter:
409-
"""Returns an engine adapter."""
410-
return self._engine_adapter
414+
"""Returns the default engine adapter."""
415+
return self._engine_adapters[self.selected_gateway]
411416

412417
@property
413418
def snapshot_evaluator(self) -> SnapshotEvaluator:
414419
if not self._snapshot_evaluator:
420+
if self._snapshot_gateways:
421+
self._create_engine_adapters(set(self._snapshot_gateways.values()))
415422
self._snapshot_evaluator = SnapshotEvaluator(
416-
self.engine_adapter.with_log_level(logging.INFO),
423+
{
424+
gateway: adapter.with_log_level(logging.INFO)
425+
for gateway, adapter in self._engine_adapters.items()
426+
},
417427
ddl_concurrent_tasks=self.concurrent_tasks,
428+
selected_gateway=self.selected_gateway,
418429
)
419430
return self._snapshot_evaluator
420431

421432
def execution_context(
422-
self, deployability_index: t.Optional[DeployabilityIndex] = None
433+
self,
434+
deployability_index: t.Optional[DeployabilityIndex] = None,
435+
engine_adapter: t.Optional[EngineAdapter] = None,
423436
) -> ExecutionContext:
424437
"""Returns an execution context."""
425438
return ExecutionContext(
426-
engine_adapter=self._engine_adapter,
439+
engine_adapter=engine_adapter or self.engine_adapter,
427440
snapshots=self.snapshots,
428441
deployability_index=deployability_index,
429442
default_dialect=self.default_dialect,
@@ -905,7 +918,9 @@ def render(
905918
if model.is_seed:
906919
df = next(
907920
model.render(
908-
context=self.execution_context(),
921+
context=self.execution_context(
922+
engine_adapter=self._get_engine_adapter(model.gateway)
923+
),
909924
start=start,
910925
end=end,
911926
execution_time=execution_time,
@@ -924,7 +939,7 @@ def render(
924939
snapshots=snapshots,
925940
expand=expand,
926941
deployability_index=deployability_index,
927-
engine_adapter=self.engine_adapter,
942+
engine_adapter=self._get_engine_adapter(model.gateway),
928943
**kwargs,
929944
)
930945

@@ -950,7 +965,6 @@ def evaluate(
950965
limit: A limit applied to the model.
951966
"""
952967
snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)
953-
954968
df = self.snapshot_evaluator.evaluate_and_fetch(
955969
snapshot,
956970
start=start,
@@ -1459,8 +1473,10 @@ def table_diff(
14591473
"""
14601474
source_alias, target_alias = source, target
14611475

1476+
adapter = self.engine_adapter
14621477
if model_or_snapshot:
14631478
model = self.get_model(model_or_snapshot, raise_if_missing=True)
1479+
adapter = self._get_engine_adapter(model.gateway)
14641480
source_env = self.state_reader.get_environment(source)
14651481
target_env = self.state_reader.get_environment(target)
14661482

@@ -1495,7 +1511,7 @@ def table_diff(
14951511
)
14961512

14971513
table_diff = TableDiff(
1498-
adapter=self._engine_adapter,
1514+
adapter=adapter,
14991515
source=source,
15001516
target=target,
15011517
on=on,
@@ -1619,14 +1635,15 @@ def create_test(
16191635
}
16201636

16211637
try:
1638+
model_to_test = self.get_model(model, raise_if_missing=True)
16221639
test_adapter = self._test_connection_config.create_engine_adapter(
16231640
register_comments_override=False
16241641
)
16251642
generate_test(
1626-
model=self.get_model(model, raise_if_missing=True),
1643+
model=model_to_test,
16271644
input_queries=input_queries,
16281645
models=self._models,
1629-
engine_adapter=self._engine_adapter,
1646+
engine_adapter=self._get_engine_adapter(model_to_test.gateway),
16301647
test_engine_adapter=test_adapter,
16311648
project_path=self.path,
16321649
overwrite=overwrite,
@@ -1840,7 +1857,7 @@ def create_external_models(self, strict: bool = False) -> None:
18401857
if self.config_for_node(model) is config
18411858
},
18421859
),
1843-
adapter=self._engine_adapter,
1860+
adapter=self.engine_adapter,
18441861
state_reader=self.state_reader,
18451862
dialect=config.model_defaults.dialect,
18461863
gateway=self.gateway,
@@ -1867,7 +1884,7 @@ def print_info(self, skip_connection: bool = False, verbose: bool = False) -> No
18671884
self.config.get_state_connection(self.gateway), self.console, "State Connection"
18681885
)
18691886

1870-
self._try_connection("data warehouse", self._engine_adapter.ping)
1887+
self._try_connection("data warehouse", self.engine_adapter.ping)
18711888
state_connection = self.config.get_state_connection(self.gateway)
18721889
if state_connection:
18731890
self._try_connection("state backend", state_connection.connection_validator())
@@ -1958,7 +1975,9 @@ def _run_plan_tests(
19581975
result, test_output = self._run_tests()
19591976
if result.testsRun > 0:
19601977
self.console.log_test_results(
1961-
result, test_output, self._test_connection_config._engine_adapter.DIALECT
1978+
result,
1979+
test_output,
1980+
self._test_connection_config._engine_adapter.DIALECT,
19621981
)
19631982
if not result.wasSuccessful():
19641983
raise PlanError(
@@ -1988,6 +2007,35 @@ def _model_tables(self) -> t.Dict[str, str]:
19882007
for fqn, snapshot in self.snapshots.items()
19892008
}
19902009

2010+
@property
2011+
def _snapshot_gateways(self) -> t.Dict[str, str]:
2012+
"""Mapping of snapshot name to the gateway if specified in the model."""
2013+
2014+
return {
2015+
fqn: snapshot.model.gateway
2016+
for fqn, snapshot in self.snapshots.items()
2017+
if snapshot.is_model and snapshot.model.gateway
2018+
}
2019+
2020+
def _create_engine_adapters(self, gateways: t.Optional[t.Set] = None) -> None:
2021+
"""Create engine adapters for the gateways, when none provided include all defined in the configs."""
2022+
2023+
for gateway_name in self.config.gateways:
2024+
if gateway_name != self.selected_gateway and (
2025+
gateways is None or gateway_name in gateways
2026+
):
2027+
connection = self.config.get_connection(gateway_name)
2028+
adapter = connection.create_engine_adapter()
2029+
self.concurrent_tasks = min(self.concurrent_tasks, connection.concurrent_tasks)
2030+
self._engine_adapters[gateway_name] = adapter
2031+
2032+
def _get_engine_adapter(self, gateway: t.Optional[str] = None) -> EngineAdapter:
2033+
if gateway:
2034+
if adapter := self._engine_adapters.get(gateway):
2035+
return adapter
2036+
raise SQLMeshError(f"Gateway '{gateway}' not found in the available engine adapters.")
2037+
return self.engine_adapter
2038+
19912039
def _snapshots(
19922040
self, models_override: t.Optional[UniqueKeyDict[str, Model]] = None
19932041
) -> t.Dict[str, Snapshot]:
@@ -2085,7 +2133,9 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
20852133
self._cleanup_environments()
20862134
expired_snapshots = self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl)
20872135
self.snapshot_evaluator.cleanup(
2088-
expired_snapshots, on_complete=self.console.update_cleanup_progress
2136+
expired_snapshots,
2137+
self._snapshot_gateways,
2138+
on_complete=self.console.update_cleanup_progress,
20892139
)
20902140

20912141
self.state_sync.compact_intervals()

sqlmesh/core/engine_adapter/mixins.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ def _build_create_table_exp(
299299
select_or_union.set("where", None)
300300

301301
temp_view_name = self._get_temp_table("ctas")
302+
302303
self.create_view(
303304
temp_view_name, select_statement, replace=False, no_schema_binding=False
304305
)

0 commit comments

Comments
 (0)