Merged
8 changes: 6 additions & 2 deletions docs/concepts/models/overview.md

@@ -507,11 +507,15 @@ Some properties are only available in specific model kinds - see the [model conf
: Set this to true to indicate that all changes to this model should be [forward-only](../plans.md#forward-only-plans).

### on_destructive_change
: What should happen when a change to a [forward-only model](../../guides/incremental_time.md#forward-only-models) or incremental model in a [forward-only plan](../plans.md#forward-only-plans) causes a destructive modification to the table schema (i.e., requires dropping an existing column).
: What should happen when a change to a [forward-only model](../../guides/incremental_time.md#forward-only-models) or incremental model in a [forward-only plan](../plans.md#forward-only-plans) causes a destructive modification to the table schema (i.e., requires dropping an existing column or modifying column constraints in ways that could cause data loss).

SQLMesh checks for destructive changes at plan time based on the model definition and run time based on the model's underlying physical tables.

Must be one of the following values: `allow`, `warn`, or `error` (default).
Must be one of the following values: `allow`, `warn`, `error` (default), or `ignore`.

!!! warning "Ignore is Dangerous"

    `ignore` is dangerous because it can result in errors or data loss. It should almost never be used, but it can serve as an "escape hatch" or a way to work around unexpected behavior.
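
The four policy values read naturally as a small decision function. The sketch below is a hedged illustration of the documented semantics only; `apply_destructive_change_policy` and its signature are hypothetical names for this example, not SQLMesh's implementation:

```python
# Hedged sketch of the documented `on_destructive_change` semantics.
# Function and parameter names here are hypothetical, not SQLMesh API.
import typing as t
import warnings


def apply_destructive_change_policy(policy: str, dropped_columns: t.List[str]) -> bool:
    """Return True if the destructive schema change should be applied."""
    if not dropped_columns:
        return True  # nothing destructive, always apply
    if policy == "allow":
        return True  # silently apply the destructive change
    if policy == "warn":
        warnings.warn(f"Destructive change drops columns: {dropped_columns}")
        return True
    if policy == "ignore":
        return False  # skip the change; the table now diverges from the model
    # default policy: "error" refuses the change so no data is lost
    raise ValueError(f"Destructive change would drop columns: {dropped_columns}")


assert apply_destructive_change_policy("allow", ["col_a"]) is True
assert apply_destructive_change_policy("ignore", ["col_a"]) is False
```

Note that `ignore` is the only value that leaves the physical table unchanged, which is why it can later surface errors at insert time.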

### disable_restatement
: Set this to true to indicate that [data restatement](../plans.md#restatement-plans) is disabled for this model.
4 changes: 4 additions & 0 deletions docs/guides/custom_materializations.md

@@ -64,6 +64,7 @@ class CustomFullMaterialization(CustomMaterialization):
query_or_df: QueryOrDF,
model: Model,
is_first_insert: bool,
render_kwargs: t.Dict[str, t.Any],
**kwargs: t.Any,
) -> None:
self.adapter.replace_query(table_name, query_or_df)
@@ -78,6 +79,7 @@ Let's unpack this materialization:
* `query_or_df` - a query (of SQLGlot expression type) or DataFrame (Pandas, PySpark, or Snowpark) instance to be inserted
* `model` - the model definition object used to access model parameters and user-specified materialization arguments
* `is_first_insert` - whether this is the first insert for the current version of the model (used with batched or multi-step inserts)
* `render_kwargs` - a dictionary of arguments used to render the model query
* `kwargs` - additional and future arguments
* The `self.adapter` instance is used to interact with the target engine. It comes with a set of useful high-level APIs like `replace_query`, `columns`, and `table_exists`, but also supports executing arbitrary SQL expressions with its `execute` method.
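
The shape of this hook can be exercised outside SQLMesh with a stub adapter. This is a hedged sketch: `StubAdapter` and `SketchMaterialization` are hypothetical names invented for illustration, not SQLMesh classes, and only the `insert` signature mirrors the snippet above:

```python
import typing as t


class StubAdapter:
    """Hypothetical stand-in for SQLMesh's engine adapter."""

    def __init__(self) -> None:
        self.calls: t.List[t.Tuple[str, str]] = []

    def replace_query(self, table_name: str, query: str) -> None:
        # Record the call instead of talking to a real engine.
        self.calls.append((table_name, query))


class SketchMaterialization:
    """Hypothetical class mirroring the insert() signature shown above."""

    def __init__(self, adapter: StubAdapter) -> None:
        self.adapter = adapter

    def insert(
        self,
        table_name: str,
        query_or_df: str,
        model: t.Any,
        is_first_insert: bool,
        render_kwargs: t.Dict[str, t.Any],
        **kwargs: t.Any,
    ) -> None:
        # A full-refresh materialization simply replaces the table contents.
        self.adapter.replace_query(table_name, query_or_df)


adapter = StubAdapter()
SketchMaterialization(adapter).insert(
    "db.t", "SELECT 1 AS a", model=None, is_first_insert=True, render_kwargs={}
)
assert adapter.calls == [("db.t", "SELECT 1 AS a")]
```

Because `render_kwargs` and `**kwargs` are part of the signature, custom materializations written this way keep working as SQLMesh passes additional arguments in future versions.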

@@ -150,6 +152,7 @@ class CustomFullMaterialization(CustomMaterialization):
query_or_df: QueryOrDF,
model: Model,
is_first_insert: bool,
render_kwargs: t.Dict[str, t.Any],
**kwargs: t.Any,
) -> None:
config_value = model.custom_materialization_properties["config_key"]
@@ -232,6 +235,7 @@ class CustomFullMaterialization(CustomMaterialization[MyCustomKind]):
query_or_df: QueryOrDF,
model: Model,
is_first_insert: bool,
render_kwargs: t.Dict[str, t.Any],
**kwargs: t.Any,
) -> None:
assert isinstance(model.kind, MyCustomKind)
7 changes: 6 additions & 1 deletion docs/guides/incremental_time.md

@@ -171,7 +171,12 @@ The check is performed at plan time based on the model definition. SQLMesh may n

A model's `on_destructive_change` [configuration setting](../reference/model_configuration.md#incremental-models) determines what happens when SQLMesh detects a destructive change.

By default, SQLMesh will error so no data is lost. You can set `on_destructive_change` to `warn` or `allow` in the model's `MODEL` block to allow destructive changes.
`ignore` can be used to skip the schema change entirely, allowing the table's definition to diverge from the model definition.

!!! warning "Ignore is Dangerous"

    `ignore` is dangerous because it can result in errors or data loss. It should almost never be used, but it can serve as an "escape hatch" or a way to work around unexpected behavior.
Collaborator:

Why would it result in data loss?

Collaborator:

Also how can it result in error?

Collaborator (Author):

> Also how can it result in error?

If you made a data type change to a new type that isn't coercible, the next time the model runs it would error when trying to insert the rows (since the underlying table wasn't actually changed).

> Why would it result in data loss?

Not certain yet. I'm putting this warning in for now while we do more testing with this fully implemented in the dbt adapter. If we don't identify any issues after a while, I can remove this part of the warning. For now, though, I would rather overly caution native users about this feature.
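
The failure mode described in this thread can be simulated in a few lines. This is a hedged sketch with hypothetical names (`insert_row`, `physical_table`), not SQLMesh or engine code; it only illustrates why an ignored schema change can surface as an error on the next run:

```python
# Hedged simulation of the `ignore` failure mode: the physical table keeps
# its old column type while the model now emits a non-coercible type, so
# the next insert fails instead of silently converting.
import typing as t


def insert_row(table_schema: t.Dict[str, type], row: t.Dict[str, t.Any]) -> None:
    for column, value in row.items():
        expected = table_schema[column]  # physical table's (unchanged) type
        if not isinstance(value, expected):
            raise TypeError(
                f"cannot insert {type(value).__name__} into {column} "
                f"({expected.__name__} column)"
            )


physical_table = {"id": int}          # schema change was ignored, still int
model_output_row = {"id": "abc-123"}  # the model now produces strings

try:
    insert_row(physical_table, model_output_row)
except TypeError as e:
    print(e)  # the next run errors rather than changing the table
```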


This example configures a model to silently `allow` destructive changes:

@@ -24,8 +24,9 @@ def insert(
query_or_df: QueryOrDF,
model: Model,
is_first_insert: bool,
render_kwargs: t.Dict[str, t.Any],
**kwargs: t.Any,
) -> None:
assert type(model.kind).__name__ == "ExtendedCustomKind"

self._replace_query_for_model(model, table_name, query_or_df)
self._replace_query_for_model(model, table_name, query_or_df, render_kwargs)
@@ -17,6 +17,7 @@ def insert(
query_or_df: QueryOrDF,
model: Model,
is_first_insert: bool,
render_kwargs: t.Dict[str, t.Any],
**kwargs: t.Any,
) -> None:
self._replace_query_for_model(model, table_name, query_or_df)
self._replace_query_for_model(model, table_name, query_or_df, render_kwargs)
35 changes: 25 additions & 10 deletions sqlmesh/core/dialect.py

@@ -23,6 +23,7 @@
from sqlglot.tokens import Token

from sqlmesh.core.constants import MAX_MODEL_DEFINITION_SIZE
from sqlmesh.utils import get_source_columns_to_types
from sqlmesh.utils.errors import SQLMeshError, ConfigError
from sqlmesh.utils.pandas import columns_to_types_from_df

@@ -1121,7 +1122,7 @@ def select_from_values(
for i in range(0, num_rows, batch_size):
yield select_from_values_for_batch_range(
values=values,
columns_to_types=columns_to_types,
target_columns_to_types=columns_to_types,
batch_start=i,
batch_end=min(i + batch_size, num_rows),
alias=alias,
@@ -1130,35 +1131,49 @@

def select_from_values_for_batch_range(
values: t.List[t.Tuple[t.Any, ...]],
columns_to_types: t.Dict[str, exp.DataType],
target_columns_to_types: t.Dict[str, exp.DataType],
batch_start: int,
batch_end: int,
alias: str = "t",
source_columns: t.Optional[t.List[str]] = None,
) -> exp.Select:
casted_columns = [
exp.alias_(exp.cast(exp.column(column), to=kind), column, copy=False)
for column, kind in columns_to_types.items()
]
source_columns = source_columns or list(target_columns_to_types)
source_columns_to_types = get_source_columns_to_types(target_columns_to_types, source_columns)

if not values:
# Ensures we don't generate an empty VALUES clause & forces a zero-row output
where = exp.false()
expressions = [tuple(exp.cast(exp.null(), to=kind) for kind in columns_to_types.values())]
expressions = [
tuple(exp.cast(exp.null(), to=kind) for kind in source_columns_to_types.values())
]
else:
where = None
expressions = [
tuple(transform_values(v, columns_to_types)) for v in values[batch_start:batch_end]
tuple(transform_values(v, source_columns_to_types))
for v in values[batch_start:batch_end]
]

values_exp = exp.values(expressions, alias=alias, columns=columns_to_types)
values_exp = exp.values(expressions, alias=alias, columns=source_columns_to_types)
if values:
# BigQuery crashes on `SELECT CAST(x AS TIMESTAMP) FROM UNNEST([NULL]) AS x`, but not
# on `SELECT CAST(x AS TIMESTAMP) FROM UNNEST([CAST(NULL AS TIMESTAMP)]) AS x`. This
# ensures nulls under the `Values` expression are cast to avoid similar issues.
for value, kind in zip(values_exp.expressions[0].expressions, columns_to_types.values()):
for value, kind in zip(
values_exp.expressions[0].expressions, source_columns_to_types.values()
):
if isinstance(value, exp.Null):
value.replace(exp.cast(value, to=kind))

casted_columns = [
exp.alias_(
exp.cast(
exp.column(column) if column in source_columns_to_types else exp.Null(), to=kind
),
column,
copy=False,
)
for column, kind in target_columns_to_types.items()
]
return exp.select(*casted_columns).from_(values_exp, copy=False).where(where, copy=False)
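
The new projection behavior, where target columns missing from the source are filled with typed NULLs instead of failing, can be sketched in plain Python. This is a hedged string-building illustration only (`build_projection` is a hypothetical name); the real code constructs SQLGlot expressions:

```python
import typing as t


def build_projection(
    target_columns_to_types: t.Dict[str, str], source_columns: t.List[str]
) -> t.List[str]:
    """Mimic the patched casted_columns loop: target columns absent from
    the source are selected as NULL, then cast to the target type."""
    projections = []
    for column, kind in target_columns_to_types.items():
        source_expr = column if column in source_columns else "NULL"
        projections.append(f"CAST({source_expr} AS {kind}) AS {column}")
    return projections


# `name` is missing from the source, so it becomes a typed NULL.
print(build_projection({"id": "INT", "name": "TEXT"}, ["id"]))
# → ['CAST(id AS INT) AS id', 'CAST(NULL AS TEXT) AS name']
```

Casting the NULL to the target type matches the BigQuery workaround noted in the comment above: engines can reject untyped NULLs in typed positions.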


2 changes: 1 addition & 1 deletion sqlmesh/core/engine_adapter/_typing.py

@@ -13,7 +13,7 @@

snowpark = optional_import("snowflake.snowpark")

Query = t.Union[exp.Query, exp.DerivedTable]
Query = exp.Query
Collaborator (Author):

Discussed internally and we don't see a reason why exp.DerivedTable was expected before; that code is from version 0.0.2, so we assume it is no longer needed.

PySparkSession = t.Union[pyspark.sql.SparkSession, pyspark.sql.connect.dataframe.SparkSession]
PySparkDataFrame = t.Union[pyspark.sql.DataFrame, pyspark.sql.connect.dataframe.DataFrame]

26 changes: 14 additions & 12 deletions sqlmesh/core/engine_adapter/athena.py

@@ -84,12 +84,12 @@ def catalog_support(self) -> CatalogSupport:
def create_state_table(
self,
table_name: str,
columns_to_types: t.Dict[str, exp.DataType],
target_columns_to_types: t.Dict[str, exp.DataType],
primary_key: t.Optional[t.Tuple[str, ...]] = None,
) -> None:
self.create_table(
table_name,
columns_to_types,
target_columns_to_types,
primary_key=primary_key,
# it's painfully slow, but it works
table_format="iceberg",
@@ -178,7 +178,7 @@ def _build_create_table_exp(
expression: t.Optional[exp.Expression],
exists: bool = True,
replace: bool = False,
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
table_description: t.Optional[str] = None,
table_kind: t.Optional[str] = None,
partitioned_by: t.Optional[t.List[exp.Expression]] = None,
@@ -198,7 +198,7 @@
properties = self._build_table_properties_exp(
table=table,
expression=expression,
columns_to_types=columns_to_types,
target_columns_to_types=target_columns_to_types,
partitioned_by=partitioned_by,
table_properties=table_properties,
table_description=table_description,
@@ -237,7 +237,7 @@ def _build_table_properties_exp(
partition_interval_unit: t.Optional[IntervalUnit] = None,
clustered_by: t.Optional[t.List[exp.Expression]] = None,
table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
table_description: t.Optional[str] = None,
table_kind: t.Optional[str] = None,
table: t.Optional[exp.Table] = None,
@@ -265,12 +265,12 @@

if partitioned_by:
schema_expressions: t.List[exp.Expression] = []
if is_hive and columns_to_types:
if is_hive and target_columns_to_types:
# For Hive-style tables, you cannot include the partitioned by columns in the main set of columns
# In the PARTITIONED BY expression, you also cant just include the column names, you need to include the data type as well
# ref: https://docs.aws.amazon.com/athena/latest/ug/partitions.html
for match_name, match_dtype in self._find_matching_columns(
partitioned_by, columns_to_types
partitioned_by, target_columns_to_types
):
column_def = exp.ColumnDef(this=exp.to_identifier(match_name), kind=match_dtype)
schema_expressions.append(column_def)
@@ -431,9 +431,10 @@ def replace_query(
self,
table_name: TableName,
query_or_df: QueryOrDF,
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
table_description: t.Optional[str] = None,
column_descriptions: t.Optional[t.Dict[str, str]] = None,
source_columns: t.Optional[t.List[str]] = None,
**kwargs: t.Any,
) -> None:
table = exp.to_table(table_name)
@@ -444,17 +445,18 @@
return super().replace_query(
table_name=table,
query_or_df=query_or_df,
columns_to_types=columns_to_types,
target_columns_to_types=target_columns_to_types,
table_description=table_description,
column_descriptions=column_descriptions,
source_columns=source_columns,
**kwargs,
)

def _insert_overwrite_by_time_partition(
self,
table_name: TableName,
source_queries: t.List[SourceQuery],
columns_to_types: t.Dict[str, exp.DataType],
target_columns_to_types: t.Dict[str, exp.DataType],
where: exp.Condition,
**kwargs: t.Any,
) -> None:
@@ -465,7 +467,7 @@
if table_type == "iceberg":
# Iceberg tables work as expected, we can use the default behaviour
return super()._insert_overwrite_by_time_partition(
table, source_queries, columns_to_types, where, **kwargs
table, source_queries, target_columns_to_types, where, **kwargs
)

# For Hive tables, we need to drop all the partitions covered by the query and delete the data from S3
Expand All @@ -475,7 +477,7 @@ def _insert_overwrite_by_time_partition(
return super()._insert_overwrite_by_time_partition(
table,
source_queries,
columns_to_types,
target_columns_to_types,
where,
insert_overwrite_strategy_override=InsertOverwriteStrategy.INTO_IS_OVERWRITE, # since we already cleared the data
**kwargs,