Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 8 additions & 40 deletions docs/concepts/models/model_kinds.md
Original file line number Diff line number Diff line change
Expand Up @@ -935,13 +935,7 @@ SQLMesh achieves this by adding a `valid_from` and `valid_to` column to your mod

Therefore, you can use these models to not only tell you what the latest value is for a given record but also what the values were anytime in the past. Note that maintaining this history does come at a cost of increased storage and compute and this may not be a good fit for sources that change frequently since the history could get very large.

**Note**: SCD Type 2 models support [restatements](../plans.md#restatement-plans) with specific limitations:

- **Full restatements**: The entire table will be recreated from scratch when no start date is specified
- **Partial restatements**: You can specify a start date to restate data from a certain point onwards to the latest interval. The end date will always be set to the latest interval's end date, regardless of what end date you specify
- **Partial sections**: Restatements of specific sections (discontinued ranges) of the table are not supported

Data restatement is disabled for models of this kind by default (`disable_restatement true`). To enable restatements, set `disable_restatement false` in your model configuration.
**Note**: Partial data [restatement](../plans.md#restatement-plans) is not supported for this model kind, which means that the entire table will be recreated from scratch if restated. This may lead to data loss, so data restatement is disabled for models of this kind by default.

There are two ways to tracking changes: By Time (Recommended) or By Column.

Expand Down Expand Up @@ -1289,11 +1283,11 @@ This is the most accurate representation of the menu based on the source data pr

### Processing Source Table with Historical Data

The most common case for SCD Type 2 is creating history for a table that it doesn't have it already.
The most common case for SCD Type 2 is creating history for a table that it doesn't have it already.
In the example of the restaurant menu, the menu just tells you what is offered right now, but you want to know what was offered over time.
In this case, the default setting of `None` for `batch_size` is the best option.

Another use case though is processing a source table that already has history in it.
Another use case though is processing a source table that already has history in it.
A common example of this is a "daily snapshot" table that is created by a source system that takes a snapshot of the data at the end of each day.
If your source table has historical records, like a "daily snapshot" table, then set `batch_size` to `1` to process each interval (each day if a `@daily` cron) in sequential order.
That way the historical records will be properly captured in the SCD Type 2 table.
Expand Down Expand Up @@ -1439,14 +1433,11 @@ GROUP BY
id
```

### SCD Type 2 Restatements
### Reset SCD Type 2 Model (clearing history)

SCD Type 2 models are designed by default to protect the data that has been captured because it is not possible to recreate the history once it has been lost.
However, there are cases where you may want to clear the history and start fresh.

#### Enabling Restatements

To enable restatements for an SCD Type 2 model, set `disable_restatement` to `false` in the model definition:
For this use use case you will want to start by setting `disable_restatement` to `false` in the model definition.

```sql linenums="1" hl_lines="5"
MODEL (
Expand All @@ -1458,39 +1449,16 @@ MODEL (
);
```

#### Full Restatements (Clearing All History)

To clear all history and recreate the entire table from scratch:
Plan/apply this change to production.
Then you will want to [restate the model](../plans.md#restatement-plans).

```bash
sqlmesh plan --restate-model db.menu_items
```

!!! warning

This will remove **all** historical data on the model which in most situations cannot be recovered.

#### Partial Restatements (From a Specific Date)

You can restate data from a specific start date onwards. This will:
- Delete all records with `valid_from >= start_date`
- Reprocess the data from the start date to the latest interval

```bash
sqlmesh plan --restate-model db.menu_items --start "2023-01-15"
```

!!! note

If you specify an end date for SCD Type 2 restatements, it will be ignored and automatically set to the latest interval's end date.

```bash
# This end date will be ignored and set to the latest interval
sqlmesh plan --restate-model db.menu_items --start "2023-01-15" --end "2023-01-20"
```


#### Re-enabling Protection
This will remove the historical data on the model which in most situations cannot be recovered.

Once complete you will want to remove `disable_restatement` on the model definition which will set it back to `true` and prevent accidental data loss.

Expand Down
47 changes: 2 additions & 45 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1497,7 +1497,6 @@ def scd_type_2_by_time(
table_description: t.Optional[str] = None,
column_descriptions: t.Optional[t.Dict[str, str]] = None,
truncate: bool = False,
is_restatement: bool = False,
**kwargs: t.Any,
) -> None:
self._scd_type_2(
Expand All @@ -1514,7 +1513,6 @@ def scd_type_2_by_time(
table_description=table_description,
column_descriptions=column_descriptions,
truncate=truncate,
is_restatement=is_restatement,
**kwargs,
)

Expand All @@ -1533,7 +1531,6 @@ def scd_type_2_by_column(
table_description: t.Optional[str] = None,
column_descriptions: t.Optional[t.Dict[str, str]] = None,
truncate: bool = False,
is_restatement: bool = False,
**kwargs: t.Any,
) -> None:
self._scd_type_2(
Expand All @@ -1550,7 +1547,6 @@ def scd_type_2_by_column(
table_description=table_description,
column_descriptions=column_descriptions,
truncate=truncate,
is_restatement=is_restatement,
**kwargs,
)

Expand All @@ -1561,7 +1557,6 @@ def _scd_type_2(
unique_key: t.Sequence[exp.Expression],
valid_from_col: exp.Column,
valid_to_col: exp.Column,
start: TimeLike,
execution_time: t.Union[TimeLike, exp.Column],
invalidate_hard_deletes: bool = True,
updated_at_col: t.Optional[exp.Column] = None,
Expand All @@ -1572,7 +1567,6 @@ def _scd_type_2(
table_description: t.Optional[str] = None,
column_descriptions: t.Optional[t.Dict[str, str]] = None,
truncate: bool = False,
is_restatement: bool = False,
**kwargs: t.Any,
) -> None:
def remove_managed_columns(
Expand Down Expand Up @@ -1757,17 +1751,9 @@ def remove_managed_columns(
existing_rows_query = exp.select(*table_columns, exp.true().as_("_exists")).from_(
target_table
)

if truncate:
existing_rows_query = existing_rows_query.limit(0)

# Only set cleanup_ts if is_restatement is True and truncate is False (this to enable full restatement)
cleanup_ts = (
to_time_column(start, time_data_type, self.dialect, nullable=True)
if is_restatement and not truncate
else None
)

with source_queries[0] as source_query:
prefixed_columns_to_types = []
for column in columns_to_types:
Expand Down Expand Up @@ -1804,41 +1790,12 @@ def remove_managed_columns(
# Historical Records that Do Not Change
.with_(
"static",
existing_rows_query.where(valid_to_col.is_(exp.Null()).not_())
if cleanup_ts is None
else existing_rows_query.where(
exp.and_(
valid_to_col.is_(exp.Null().not_()),
valid_to_col < cleanup_ts,
),
),
existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()),
)
# Latest Records that can be updated
.with_(
"latest",
existing_rows_query.where(valid_to_col.is_(exp.Null()))
if cleanup_ts is None
else exp.select(
*(
to_time_column(
exp.null(), time_data_type, self.dialect, nullable=True
).as_(col)
if col == valid_to_col.name
else exp.column(col)
for col in columns_to_types
),
exp.true().as_("_exists"),
)
.from_(target_table)
.where(
exp.and_(
valid_from_col <= cleanup_ts,
exp.or_(
valid_to_col.is_(exp.null()),
valid_to_col >= cleanup_ts,
),
)
),
existing_rows_query.where(valid_to_col.is_(exp.Null())),
)
# Deleted records which can be used to determine `valid_from` for undeleted source records
.with_(
Expand Down
4 changes: 0 additions & 4 deletions sqlmesh/core/engine_adapter/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ def _scd_type_2(
unique_key: t.Sequence[exp.Expression],
valid_from_col: exp.Column,
valid_to_col: exp.Column,
start: TimeLike,
execution_time: t.Union[TimeLike, exp.Column],
invalidate_hard_deletes: bool = True,
updated_at_col: t.Optional[exp.Column] = None,
Expand All @@ -267,7 +266,6 @@ def _scd_type_2(
table_description: t.Optional[str] = None,
column_descriptions: t.Optional[t.Dict[str, str]] = None,
truncate: bool = False,
is_restatement: bool = False,
**kwargs: t.Any,
) -> None:
if columns_to_types and self.current_catalog_type == "delta_lake":
Expand All @@ -279,7 +277,6 @@ def _scd_type_2(
unique_key,
valid_from_col,
valid_to_col,
start,
execution_time,
invalidate_hard_deletes,
updated_at_col,
Expand All @@ -290,7 +287,6 @@ def _scd_type_2(
table_description,
column_descriptions,
truncate,
is_restatement,
**kwargs,
)

Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/model/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def full_history_restatement_only(self) -> bool:
self.is_incremental_unmanaged
or self.is_incremental_by_unique_key
or self.is_incremental_by_partition
or self.is_scd_type_2
or self.is_managed
or self.is_full
or self.is_view
Expand Down
6 changes: 0 additions & 6 deletions sqlmesh/core/plan/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,6 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
return

scheduler = self.create_scheduler(stage.all_snapshots.values(), self.snapshot_evaluator)
# Convert model name restatements to snapshot ID restatements
restatements_by_snapshot_id = {
stage.all_snapshots[name].snapshot_id: interval
for name, interval in plan.restatements.items()
}
errors, _ = scheduler.run_merged_intervals(
merged_intervals=stage.snapshot_to_intervals,
deployability_index=stage.deployability_index,
Expand All @@ -248,7 +243,6 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
circuit_breaker=self._circuit_breaker,
start=plan.start,
end=plan.end,
restatements=restatements_by_snapshot_id,
)
if errors:
raise PlanError("Plan application failed.")
Expand Down
9 changes: 0 additions & 9 deletions sqlmesh/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ def evaluate(
deployability_index: DeployabilityIndex,
batch_index: int,
environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
is_restatement: bool = False,
**kwargs: t.Any,
) -> t.List[AuditResult]:
"""Evaluate a snapshot and add the processed interval to the state sync.
Expand Down Expand Up @@ -193,7 +192,6 @@ def evaluate(
snapshots=snapshots,
deployability_index=deployability_index,
batch_index=batch_index,
is_restatement=is_restatement,
**kwargs,
)
audit_results = self._audit_snapshot(
Expand Down Expand Up @@ -373,7 +371,6 @@ def run_merged_intervals(
end: t.Optional[TimeLike] = None,
run_environment_statements: bool = False,
audit_only: bool = False,
restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
"""Runs precomputed batches of missing intervals.

Expand Down Expand Up @@ -450,10 +447,6 @@ def evaluate_node(node: SchedulingUnit) -> None:
execution_time=execution_time,
)
else:
# Determine if this snapshot and interval is a restatement (for SCD type 2)
is_restatement = (
restatements is not None and snapshot.snapshot_id in restatements
)
audit_results = self.evaluate(
snapshot=snapshot,
environment_naming_info=environment_naming_info,
Expand All @@ -462,7 +455,6 @@ def evaluate_node(node: SchedulingUnit) -> None:
execution_time=execution_time,
deployability_index=deployability_index,
batch_index=batch_idx,
is_restatement=is_restatement,
)

evaluation_duration_ms = now_timestamp() - execution_start_ts
Expand Down Expand Up @@ -671,7 +663,6 @@ def _run_or_audit(
end=end,
run_environment_statements=run_environment_statements,
audit_only=audit_only,
restatements=remove_intervals,
)

return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS
Expand Down
15 changes: 0 additions & 15 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,21 +837,6 @@ def get_removal_interval(

removal_interval = expanded_removal_interval

# SCD Type 2 validation that end date is the latest interval if it was provided
if not is_preview and self.is_scd_type_2 and self.intervals:
requested_start, requested_end = removal_interval
latest_end = self.intervals[-1][1]
if requested_end < latest_end:
from sqlmesh.core.console import get_console

get_console().log_warning(
f"SCD Type 2 model '{self.model.name}' does not support end date in restatements.\n"
f"Requested end date [{to_ts(requested_end)}] is less than the latest interval end date.\n"
f"The requested end date will be ignored. Using the latest interval end instead: [{to_ts(latest_end)}]"
)

removal_interval = self.inclusive_exclusive(requested_start, latest_end, strict)

return removal_interval

@property
Expand Down
9 changes: 0 additions & 9 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ def evaluate(
snapshots: t.Dict[str, Snapshot],
deployability_index: t.Optional[DeployabilityIndex] = None,
batch_index: int = 0,
is_restatement: bool = False,
**kwargs: t.Any,
) -> t.Optional[str]:
"""Renders the snapshot's model, executes it and stores the result in the snapshot's physical table.
Expand All @@ -166,7 +165,6 @@ def evaluate(
snapshots,
deployability_index=deployability_index,
batch_index=batch_index,
is_restatement=is_restatement,
**kwargs,
)
if result is None or isinstance(result, str):
Expand Down Expand Up @@ -624,7 +622,6 @@ def _evaluate_snapshot(
limit: t.Optional[int] = None,
deployability_index: t.Optional[DeployabilityIndex] = None,
batch_index: int = 0,
is_restatement: bool = False,
**kwargs: t.Any,
) -> DF | str | None:
"""Renders the snapshot's model and executes it. The return value depends on whether the limit was specified.
Expand Down Expand Up @@ -697,7 +694,6 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None:
end=end,
execution_time=execution_time,
physical_properties=rendered_physical_properties,
is_restatement=is_restatement,
)
else:
logger.info(
Expand All @@ -719,7 +715,6 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None:
end=end,
execution_time=execution_time,
physical_properties=rendered_physical_properties,
is_restatement=is_restatement,
)

with (
Expand Down Expand Up @@ -1840,8 +1835,6 @@ def insert(
table_description=model.description,
column_descriptions=model.column_descriptions,
truncate=is_first_insert,
start=kwargs["start"],
is_restatement=kwargs.get("is_restatement", False),
)
elif isinstance(model.kind, SCDType2ByColumnKind):
self.adapter.scd_type_2_by_column(
Expand All @@ -1859,8 +1852,6 @@ def insert(
table_description=model.description,
column_descriptions=model.column_descriptions,
truncate=is_first_insert,
start=kwargs["start"],
is_restatement=kwargs.get("is_restatement", False),
)
else:
raise SQLMeshError(
Expand Down
Loading