Commit 4379581

Feat!: Dev-only VDE mode (#5087)
1 parent 54320bd commit 4379581

40 files changed: +1437 -263 lines changed

docs/guides/configuration.md

Lines changed: 38 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -538,6 +538,44 @@ sqlmesh_md5__d3b07384d113edec49eaa6238ad5ff00__dev
538538

539539
The downside is that it becomes much more difficult to determine which table corresponds to which model just by looking at the database with a SQL client. However, the table names have a predictable length, so there are no longer any surprises with identifiers exceeding the max length at the physical layer.
540540

541+
#### Virtual Data Environment Modes
542+
543+
By default, Virtual Data Environments (VDE) are applied across both development and production environments. This allows SQLMesh to reuse physical tables when appropriate, even when promoting from development to production.
544+
545+
However, users may prefer their production environment to be non-virtual. Reasons include, but are not limited to:
546+
547+
- Integration with third-party tools and platforms, such as data catalogs, may not work well with the virtual view layer that SQLMesh imposes by default
548+
- A desire to rely on time travel features provided by cloud data warehouses such as BigQuery, Snowflake, and Databricks
549+
550+
To accommodate these cases, SQLMesh offers an alternative 'dev-only' mode for using VDE. It can be enabled in the project configuration as follows:
551+
552+
=== "YAML"
553+
554+
```yaml linenums="1"
555+
virtual_environment_mode: dev_only
556+
```
557+
558+
=== "Python"
559+
560+
```python linenums="1"
561+
from sqlmesh.core.config import Config
562+
563+
config = Config(
564+
virtual_environment_mode="dev_only",
565+
)
566+
```
567+
568+
In 'dev-only' mode, VDE is applied only in development environments. In production, model tables and views are updated directly, bypassing the virtual layer. As a result, physical tables in production are created using the original, **unversioned** model names. Users still benefit from VDE and data reuse across development environments.
569+
570+
Please note the following tradeoffs when enabling this mode:
571+
572+
- All data inserted in development environments is used only for [preview](../concepts/plans.md#data-preview-for-forward-only-changes) and will **not** be reused in production
573+
- Reverting a model to a previous version is applied on a forward-only basis and may require an explicit data restatement
574+
575+
!!! warning
576+
Switching the mode for an existing project will result in a **complete rebuild** of all models in the project. Refer to the [Table Migration Guide](./table_migration.md) to migrate existing tables without rebuilding them from scratch.
577+
578+
541579
#### Environment view catalogs
542580

543581
By default, SQLMesh creates an environment view in the same [catalog](../concepts/glossary.md#catalog) as the physical table the view points to. The physical table's catalog is determined by either the catalog specified in the model name or the default catalog defined in the connection.

docs/reference/configuration.md

Lines changed: 1 addition & 0 deletions
@@ -46,6 +46,7 @@ Configuration options for how SQLMesh manages environment creation and promotion
4646
| `environment_suffix_target` | Whether SQLMesh views should append their environment name to the `schema`, `table` or `catalog` - [additional details](../guides/configuration.md#view-schema-override). (Default: `schema`) | string | N |
4747
| `gateway_managed_virtual_layer` | Whether SQLMesh views of the virtual layer will be created by the default gateway or model specified gateways - [additional details](../guides/multi_engine.md#gateway-managed-virtual-layer). (Default: False) | boolean | N |
4848
| `environment_catalog_mapping` | A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment. | dict[string, string] | N |
49+
| `virtual_environment_mode` | Determines the Virtual Data Environment (VDE) mode. If set to `full`, VDE is used in both production and development environments. The `dev_only` option enables VDE only in development environments, while in production, no virtual layer is used and models are materialized directly using their original names (i.e., no versioned physical tables). (Default: `full`) | string | N |
4950

5051
### Models
5152

examples/sushi/config.py

Lines changed: 11 additions & 0 deletions
@@ -1,5 +1,6 @@
11
import os
22

3+
from sqlmesh.core.config.common import VirtualEnvironmentMode
34
from sqlmesh.core.config import (
45
AutoCategorizationMode,
56
BigQueryConnectionConfig,
@@ -76,6 +77,16 @@
7677
model_defaults=model_defaults,
7778
)
7879

80+
# A configuration used for SQLMesh tests with virtual environment mode set to DEV_ONLY.
81+
test_config_virtual_environment_mode_dev_only = test_config.copy(
82+
update={
83+
"virtual_environment_mode": VirtualEnvironmentMode.DEV_ONLY,
84+
"plan": PlanConfig(
85+
auto_categorize_changes=CategorizerConfig.all_full(),
86+
),
87+
}
88+
)
89+
7990
# A DuckDB config with a physical schema map.
8091
map_config = Config(
8192
default_connection=DuckDBConnectionConfig(),

sqlmesh/core/config/common.py

Lines changed: 29 additions & 0 deletions
@@ -49,6 +49,35 @@ def __repr__(self) -> str:
4949
return str(self)
5050

5151

52+
class VirtualEnvironmentMode(str, Enum):
53+
"""Mode for virtual environment behavior.
54+
55+
FULL: Use full virtual environment functionality with versioned table names and virtual layer updates.
56+
DEV_ONLY: Bypass virtual environments in production, using original unversioned model names.
57+
"""
58+
59+
FULL = "full"
60+
DEV_ONLY = "dev_only"
61+
62+
@property
63+
def is_full(self) -> bool:
64+
return self == VirtualEnvironmentMode.FULL
65+
66+
@property
67+
def is_dev_only(self) -> bool:
68+
return self == VirtualEnvironmentMode.DEV_ONLY
69+
70+
@classproperty
71+
def default(cls) -> VirtualEnvironmentMode:
72+
return VirtualEnvironmentMode.FULL
73+
74+
def __str__(self) -> str:
75+
return self.name
76+
77+
def __repr__(self) -> str:
78+
return str(self)
79+
80+
5281
class TableNamingConvention(str, Enum):
5382
# Causes table names at the physical layer to follow the convention:
5483
# <schema-name>__<table-name>__<fingerprint>
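
Note on the enum above: a minimal, stand-alone sketch of how `VirtualEnvironmentMode` behaves, assuming only the standard library. It copies the members and helpers from the hunk but omits the `default` classproperty, since `classproperty` is a sqlmesh helper that is not reproduced here. The point it illustrates is that `__str__` returns the member *name*, while the value accepted in configuration is the lowercase string.

```python
from enum import Enum


class VirtualEnvironmentMode(str, Enum):
    """Stand-alone copy of the enum from the diff (without the `default` classproperty)."""

    FULL = "full"
    DEV_ONLY = "dev_only"

    @property
    def is_full(self) -> bool:
        return self == VirtualEnvironmentMode.FULL

    @property
    def is_dev_only(self) -> bool:
        return self == VirtualEnvironmentMode.DEV_ONLY

    def __str__(self) -> str:
        # The member name, not the configuration value.
        return self.name

    def __repr__(self) -> str:
        return str(self)


# Look the member up by value, the way a string from a config file would be coerced.
mode = VirtualEnvironmentMode("dev_only")
print(str(mode), mode.value, mode.is_dev_only)
```

Because the enum mixes in `str`, a member also compares equal to its raw value, which is convenient when the mode arrives as plain text.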

sqlmesh/core/config/root.py

Lines changed: 14 additions & 7 deletions
@@ -14,7 +14,11 @@
1414
from sqlmesh.cicd.config import CICDBotConfig
1515
from sqlmesh.core import constants as c
1616
from sqlmesh.core.console import get_console
17-
from sqlmesh.core.config import EnvironmentSuffixTarget, TableNamingConvention
17+
from sqlmesh.core.config.common import (
18+
EnvironmentSuffixTarget,
19+
TableNamingConvention,
20+
VirtualEnvironmentMode,
21+
)
1822
from sqlmesh.core.config.base import BaseConfig, UpdateStrategy
1923
from sqlmesh.core.config.common import variables_validator, compile_regex_mapping
2024
from sqlmesh.core.config.connection import (
@@ -110,6 +114,7 @@ class Config(BaseConfig):
110114
physical_schema_mapping: A mapping from regular expressions to names of schemas in which physical tables for corresponding models will be placed.
111115
environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
112116
physical_table_naming_convention: Indicates how tables should be named at the physical layer
117+
virtual_environment_mode: Indicates how environments should be handled.
113118
gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
114119
infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements.
115120
environment_catalog_mapping: A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment.
@@ -148,12 +153,9 @@ class Config(BaseConfig):
148153
env_vars: t.Dict[str, str] = {}
149154
username: str = ""
150155
physical_schema_mapping: RegexKeyDict = {}
151-
environment_suffix_target: EnvironmentSuffixTarget = Field(
152-
default=EnvironmentSuffixTarget.default
153-
)
154-
physical_table_naming_convention: TableNamingConvention = Field(
155-
default=TableNamingConvention.default
156-
)
156+
environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default
157+
physical_table_naming_convention: TableNamingConvention = TableNamingConvention.default
158+
virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default
157159
gateway_managed_virtual_layer: bool = False
158160
infer_python_dependencies: bool = True
159161
environment_catalog_mapping: RegexKeyDict = {}
@@ -260,6 +262,11 @@ def _normalize_identifiers(key: str) -> None:
260262
"Please specify one or the other"
261263
)
262264

265+
if self.plan.use_finalized_state and not self.virtual_environment_mode.is_full:
266+
raise ConfigError(
267+
"Using the finalized state is only supported when `virtual_environment_mode` is set to `full`."
268+
)
269+
263270
if self.environment_catalog_mapping:
264271
_normalize_identifiers("environment_catalog_mapping")
265272
if self.physical_schema_mapping:
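
Note on the validation added in this hunk: the check rejects `use_finalized_state` whenever the mode is not `full`. The sketch below restates that rule in isolation. `SketchConfig` is a hypothetical, flattened stand-in (the real flag lives at `plan.use_finalized_state` on `Config`), and `ConfigError` here is a local placeholder for the sqlmesh exception of the same name.

```python
from dataclasses import dataclass


class ConfigError(Exception):
    """Local placeholder for sqlmesh's ConfigError (assumption for this sketch)."""


@dataclass
class SketchConfig:
    # Hypothetical flattened view of the two settings involved in the check.
    virtual_environment_mode: str = "full"
    use_finalized_state: bool = False

    def validate(self) -> None:
        # Mirrors the condition in the diff: finalized state requires the full VDE mode.
        if self.use_finalized_state and self.virtual_environment_mode != "full":
            raise ConfigError(
                "Using the finalized state is only supported when "
                "`virtual_environment_mode` is set to `full`."
            )


SketchConfig(virtual_environment_mode="dev_only").validate()  # accepted
SketchConfig(use_finalized_state=True).validate()  # accepted: mode defaults to "full"
```

Only the combination of `dev_only` with `use_finalized_state=True` is rejected; each setting on its own passes.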

sqlmesh/core/context.py

Lines changed: 7 additions & 4 deletions
@@ -1616,6 +1616,11 @@ def plan_builder(
16161616
max_interval_end_per_model,
16171617
)
16181618

1619+
if not self.config.virtual_environment_mode.is_full:
1620+
forward_only = True
1621+
elif forward_only is None:
1622+
forward_only = self.config.plan.forward_only
1623+
16191624
return self.PLAN_BUILDER_TYPE(
16201625
context_diff=context_diff,
16211626
start=start,
@@ -1628,9 +1633,7 @@ def plan_builder(
16281633
skip_backfill=skip_backfill,
16291634
empty_backfill=empty_backfill,
16301635
is_dev=is_dev,
1631-
forward_only=(
1632-
forward_only if forward_only is not None else self.config.plan.forward_only
1633-
),
1636+
forward_only=forward_only,
16341637
allow_destructive_models=expanded_destructive_models,
16351638
environment_ttl=environment_ttl,
16361639
environment_suffix_target=self.config.environment_suffix_target,
@@ -2936,7 +2939,7 @@ def _node_or_snapshot_to_fqn(self, node_or_snapshot: NodeOrSnapshot) -> str:
29362939
def _plan_preview_enabled(self) -> bool:
29372940
if self.config.plan.enable_preview is not None:
29382941
return self.config.plan.enable_preview
2939-
# It is dangerous to enable preview by default for dbt projects that rely on engines that dont support cloning.
2942+
# It is dangerous to enable preview by default for dbt projects that rely on engines that don't support cloning.
29402943
# Enabling previews in such cases can result in unintended full refreshes because dbt incremental models rely on
29412944
# the maximum timestamp value in the target table.
29422945
return self._project_type == c.NATIVE or self.engine_adapter.SUPPORTS_CLONING
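
Note on the `forward_only` change in `plan_builder` above: the new branch means that any non-`full` mode forces forward-only plans, even when the caller passes `forward_only=False` explicitly. The sketch below isolates that precedence; `resolve_forward_only` and its parameter names are hypothetical and exist only for illustration.

```python
from typing import Optional


def resolve_forward_only(
    mode_is_full: bool,
    forward_only: Optional[bool],
    plan_default: bool,
) -> bool:
    """Restates the precedence from the diff as a pure function (hypothetical helper)."""
    if not mode_is_full:
        # dev_only mode: always forward-only, regardless of the argument.
        return True
    if forward_only is None:
        # No explicit argument: fall back to the value from the plan config.
        return plan_default
    return forward_only


# In dev_only mode an explicit False is overridden.
print(resolve_forward_only(mode_is_full=False, forward_only=False, plan_default=False))
```

In `full` mode the previous behavior is unchanged: an explicit argument wins, and the plan configuration is consulted only when the argument is `None`.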

sqlmesh/core/engine_adapter/base.py

Lines changed: 73 additions & 5 deletions
@@ -32,20 +32,21 @@
3232
CommentCreationTable,
3333
CommentCreationView,
3434
DataObject,
35+
DataObjectType,
3536
EngineRunMode,
3637
InsertOverwriteStrategy,
3738
SourceQuery,
3839
set_catalog,
3940
)
4041
from sqlmesh.core.model.kind import TimeColumn
4142
from sqlmesh.core.schema_diff import SchemaDiffer
42-
from sqlmesh.utils import columns_to_types_all_known, random_id, CorrelationId
43-
from sqlmesh.utils.connection_pool import create_connection_pool, ConnectionPool
43+
from sqlmesh.utils import CorrelationId, columns_to_types_all_known, random_id
44+
from sqlmesh.utils.connection_pool import ConnectionPool, create_connection_pool
4445
from sqlmesh.utils.date import TimeLike, make_inclusive, to_time_column
4546
from sqlmesh.utils.errors import (
47+
MissingDefaultCatalogError,
4648
SQLMeshError,
4749
UnsupportedCatalogOperationError,
48-
MissingDefaultCatalogError,
4950
)
5051
from sqlmesh.utils.pandas import columns_to_types_from_df
5152

@@ -54,8 +55,8 @@
5455

5556
from sqlmesh.core._typing import SchemaName, SessionProperties, TableName
5657
from sqlmesh.core.engine_adapter._typing import (
57-
BigframeSession,
5858
DF,
59+
BigframeSession,
5960
PySparkDataFrame,
6061
PySparkSession,
6162
Query,
@@ -369,6 +370,12 @@ def replace_query(
369370
kwargs: Optional create table properties.
370371
"""
371372
target_table = exp.to_table(table_name)
373+
374+
target_data_object = self.get_data_object(target_table)
375+
table_exists = target_data_object is not None
376+
if self.drop_data_object_on_type_mismatch(target_data_object, DataObjectType.TABLE):
377+
table_exists = False
378+
372379
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
373380
query_or_df, columns_to_types, target_table=target_table
374381
)
@@ -390,7 +397,7 @@ def replace_query(
390397
)
391398
# All engines support `CREATE TABLE AS` so we use that if the table doesn't already exist and we
392399
# use `CREATE OR REPLACE TABLE AS` if the engine supports it
393-
if self.SUPPORTS_REPLACE_TABLE or not self.table_exists(target_table):
400+
if self.SUPPORTS_REPLACE_TABLE or not table_exists:
394401
return self._create_table_from_source_queries(
395402
target_table,
396403
source_queries,
@@ -930,6 +937,28 @@ def clone_table(
930937
)
931938
)
932939

940+
def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool = True) -> None:
941+
"""Drops a data object of arbitrary type.
942+
943+
Args:
944+
data_object: The data object to drop.
945+
ignore_if_not_exists: If True, no error will be raised if the data object does not exist.
946+
"""
947+
if data_object.type.is_view:
948+
self.drop_view(data_object.to_table(), ignore_if_not_exists=ignore_if_not_exists)
949+
elif data_object.type.is_materialized_view:
950+
self.drop_view(
951+
data_object.to_table(), ignore_if_not_exists=ignore_if_not_exists, materialized=True
952+
)
953+
elif data_object.type.is_table:
954+
self.drop_table(data_object.to_table(), exists=ignore_if_not_exists)
955+
elif data_object.type.is_managed_table:
956+
self.drop_managed_table(data_object.to_table(), exists=ignore_if_not_exists)
957+
else:
958+
raise SQLMeshError(
959+
f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'"
960+
)
961+
933962
def drop_table(self, table_name: TableName, exists: bool = True) -> None:
934963
"""Drops a table.
935964

@@ -1118,6 +1147,12 @@ def create_view(
11181147
if properties.expressions:
11191148
create_kwargs["properties"] = properties
11201149

1150+
if replace:
1151+
self.drop_data_object_on_type_mismatch(
1152+
self.get_data_object(view_name),
1153+
DataObjectType.VIEW if not materialized else DataObjectType.MATERIALIZED_VIEW,
1154+
)
1155+
11211156
with source_queries[0] as query:
11221157
self.execute(
11231158
exp.Create(
@@ -2022,6 +2057,15 @@ def rename_table(
20222057
)
20232058
self._rename_table(old_table_name, new_table_name)
20242059

2060+
def get_data_object(self, target_name: TableName) -> t.Optional[DataObject]:
2061+
target_table = exp.to_table(target_name)
2062+
existing_data_objects = self.get_data_objects(
2063+
schema_(target_table.db, target_table.catalog), {target_table.name}
2064+
)
2065+
if existing_data_objects:
2066+
return existing_data_objects[0]
2067+
return None
2068+
20252069
def get_data_objects(
20262070
self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
20272071
) -> t.List[DataObject]:
@@ -2483,6 +2527,30 @@ def _truncate_table(self, table_name: TableName) -> None:
24832527
table = exp.to_table(table_name)
24842528
self.execute(f"TRUNCATE TABLE {table.sql(dialect=self.dialect, identify=True)}")
24852529

2530+
def drop_data_object_on_type_mismatch(
2531+
self, data_object: t.Optional[DataObject], expected_type: DataObjectType
2532+
) -> bool:
2533+
"""Drops a data object if it exists and is not of the expected type.
2534+
2535+
Args:
2536+
data_object: The data object to check.
2537+
expected_type: The expected type of the data object.
2538+
2539+
Returns:
2540+
True if the data object was dropped, False otherwise.
2541+
"""
2542+
if data_object is None or data_object.type == expected_type:
2543+
return False
2544+
2545+
logger.warning(
2546+
"Target data object '%s' is a %s and not a %s, dropping it",
2547+
data_object.to_table().sql(dialect=self.dialect),
2548+
data_object.type.value,
2549+
expected_type.value,
2550+
)
2551+
self.drop_data_object(data_object)
2552+
return True
2553+
24862554
def _replace_by_key(
24872555
self,
24882556
target_table: TableName,
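
Note on the `get_data_object` / `drop_data_object_on_type_mismatch` additions in this file: `replace_query` now looks up the existing object first and drops it when its type differs from the expected one, treating the target as missing afterwards. Presumably this matters most in dev-only mode, where production objects keep their unversioned names and a model can change from a view to a table in place; that motivation is an inference, not something the diff states. The sketch below reproduces the control flow against an in-memory stand-in. `FakeAdapter` and `table_exists_for_replace` are hypothetical, and the two-member `DataObjectType` is a simplification of the real enum.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional


class DataObjectType(Enum):
    # Simplified: the real enum has more members (materialized view, managed table).
    TABLE = "table"
    VIEW = "view"


@dataclass(frozen=True)
class DataObject:
    name: str
    type: DataObjectType


@dataclass
class FakeAdapter:
    """In-memory stand-in for an engine adapter (hypothetical, for illustration only)."""

    objects: Dict[str, DataObject] = field(default_factory=dict)
    dropped: List[str] = field(default_factory=list)

    def get_data_object(self, name: str) -> Optional[DataObject]:
        return self.objects.get(name)

    def drop_data_object(self, data_object: DataObject) -> None:
        self.objects.pop(data_object.name, None)
        self.dropped.append(data_object.name)

    def drop_data_object_on_type_mismatch(
        self, data_object: Optional[DataObject], expected_type: DataObjectType
    ) -> bool:
        # Same rule as the diff: nothing to do if absent or already the right type.
        if data_object is None or data_object.type == expected_type:
            return False
        self.drop_data_object(data_object)
        return True

    def table_exists_for_replace(self, name: str) -> bool:
        # Mirrors the preamble added to replace_query.
        target = self.get_data_object(name)
        table_exists = target is not None
        if self.drop_data_object_on_type_mismatch(target, DataObjectType.TABLE):
            table_exists = False
        return table_exists


adapter = FakeAdapter({"orders": DataObject("orders", DataObjectType.VIEW)})
print(adapter.table_exists_for_replace("orders"), adapter.dropped)
```

A view sitting where a table is expected is dropped and reported as missing, so the subsequent create path runs as if the name were free.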

sqlmesh/core/engine_adapter/redshift.py

Lines changed: 6 additions & 1 deletion
@@ -262,7 +262,12 @@ def replace_query(
262262
"""
263263
import pandas as pd
264264

265-
if not isinstance(query_or_df, pd.DataFrame) or not self.table_exists(table_name):
265+
target_data_object = self.get_data_object(table_name)
266+
table_exists = target_data_object is not None
267+
if self.drop_data_object_on_type_mismatch(target_data_object, DataObjectType.TABLE):
268+
table_exists = False
269+
270+
if not isinstance(query_or_df, pd.DataFrame) or not table_exists:
266271
return super().replace_query(
267272
table_name,
268273
query_or_df,

sqlmesh/core/engine_adapter/shared.py

Lines changed: 3 additions & 0 deletions
@@ -171,6 +171,9 @@ class DataObject(PydanticModel):
171171
def is_clustered(self) -> bool:
172172
return bool(self.clustering_key)
173173

174+
def to_table(self) -> exp.Table:
175+
return exp.table_(self.name, db=self.schema_name, catalog=self.catalog, quoted=True)
176+
174177

175178
class CatalogSupport(Enum):
176179
# The engine has no concept of catalogs

sqlmesh/core/loader.py

Lines changed: 2 additions & 0 deletions
@@ -603,6 +603,7 @@ def _load_sql_models(
603603
infer_names=self.config.model_naming.infer_names,
604604
signal_definitions=signals,
605605
default_catalog_per_gateway=self.context.default_catalog_per_gateway,
606+
virtual_environment_mode=self.config.virtual_environment_mode,
606607
**loading_default_kwargs or {},
607608
)
608609

@@ -683,6 +684,7 @@ def _load_python_models(
683684
audit_definitions=audits,
684685
signal_definitions=signals,
685686
default_catalog_per_gateway=self.context.default_catalog_per_gateway,
687+
virtual_environment_mode=self.config.virtual_environment_mode,
686688
):
687689
if model.enabled:
688690
models[model.fqn] = model
