Commit 74add5a

Revert: Partial restatements for scd type 2 models
1 parent 803f7d8 commit 74add5a

15 files changed: +348 -639 lines changed

docs/concepts/models/model_kinds.md

Lines changed: 8 additions & 40 deletions
@@ -935,13 +935,7 @@ SQLMesh achieves this by adding a `valid_from` and `valid_to` column to your mod
935935

936936
Therefore, you can use these models to not only tell you what the latest value is for a given record but also what the values were anytime in the past. Note that maintaining this history does come at a cost of increased storage and compute and this may not be a good fit for sources that change frequently since the history could get very large.
937937

938-
**Note**: SCD Type 2 models support [restatements](../plans.md#restatement-plans) with specific limitations:
939-
940-
- **Full restatements**: The entire table will be recreated from scratch when no start date is specified
941-
- **Partial restatements**: You can specify a start date to restate data from a certain point onwards to the latest interval. The end date will always be set to the latest interval's end date, regardless of what end date you specify
942-
- **Partial sections**: Restatements of specific sections (discontinued ranges) of the table are not supported
943-
944-
Data restatement is disabled for models of this kind by default (`disable_restatement true`). To enable restatements, set `disable_restatement false` in your model configuration.
938+
**Note**: Partial data [restatement](../plans.md#restatement-plans) is not supported for this model kind, which means that the entire table will be recreated from scratch if restated. This may lead to data loss, so data restatement is disabled for models of this kind by default.
945939

946940
There are two ways to tracking changes: By Time (Recommended) or By Column.
947941

@@ -1289,11 +1283,11 @@ This is the most accurate representation of the menu based on the source data pr
12891283

12901284
### Processing Source Table with Historical Data
12911285

1292-
The most common case for SCD Type 2 is creating history for a table that it doesn't have it already.
1286+
The most common case for SCD Type 2 is creating history for a table that it doesn't have it already.
12931287
In the example of the restaurant menu, the menu just tells you what is offered right now, but you want to know what was offered over time.
12941288
In this case, the default setting of `None` for `batch_size` is the best option.
12951289

1296-
Another use case though is processing a source table that already has history in it.
1290+
Another use case though is processing a source table that already has history in it.
12971291
A common example of this is a "daily snapshot" table that is created by a source system that takes a snapshot of the data at the end of each day.
12981292
If your source table has historical records, like a "daily snapshot" table, then set `batch_size` to `1` to process each interval (each day if a `@daily` cron) in sequential order.
12991293
That way the historical records will be properly captured in the SCD Type 2 table.
@@ -1439,14 +1433,11 @@ GROUP BY
14391433
id
14401434
```
14411435

1442-
### SCD Type 2 Restatements
1436+
### Reset SCD Type 2 Model (clearing history)
14431437

14441438
SCD Type 2 models are designed by default to protect the data that has been captured because it is not possible to recreate the history once it has been lost.
14451439
However, there are cases where you may want to clear the history and start fresh.
1446-
1447-
#### Enabling Restatements
1448-
1449-
To enable restatements for an SCD Type 2 model, set `disable_restatement` to `false` in the model definition:
1440+
For this use use case you will want to start by setting `disable_restatement` to `false` in the model definition.
14501441

14511442
```sql linenums="1" hl_lines="5"
14521443
MODEL (
@@ -1458,39 +1449,16 @@ MODEL (
14581449
);
14591450
```
14601451

1461-
#### Full Restatements (Clearing All History)
1462-
1463-
To clear all history and recreate the entire table from scratch:
1452+
Plan/apply this change to production.
1453+
Then you will want to [restate the model](../plans.md#restatement-plans).
14641454

14651455
```bash
14661456
sqlmesh plan --restate-model db.menu_items
14671457
```
14681458

14691459
!!! warning
14701460

1471-
This will remove **all** historical data on the model which in most situations cannot be recovered.
1472-
1473-
#### Partial Restatements (From a Specific Date)
1474-
1475-
You can restate data from a specific start date onwards. This will:
1476-
- Delete all records with `valid_from >= start_date`
1477-
- Reprocess the data from the start date to the latest interval
1478-
1479-
```bash
1480-
sqlmesh plan --restate-model db.menu_items --start "2023-01-15"
1481-
```
1482-
1483-
!!! note
1484-
1485-
If you specify an end date for SCD Type 2 restatements, it will be ignored and automatically set to the latest interval's end date.
1486-
1487-
```bash
1488-
# This end date will be ignored and set to the latest interval
1489-
sqlmesh plan --restate-model db.menu_items --start "2023-01-15" --end "2023-01-20"
1490-
```
1491-
1492-
1493-
#### Re-enabling Protection
1461+
This will remove the historical data on the model which in most situations cannot be recovered.
14941462

14951463
Once complete you will want to remove `disable_restatement` on the model definition which will set it back to `true` and prevent accidental data loss.
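To illustrate why the restored documentation warns that restating an SCD Type 2 model loses history, here is a minimal plain-Python sketch. It is not SQLMesh code: `apply_snapshot`, the row layout, and the `price` attribute are hypothetical names chosen for illustration, and hard deletes are ignored for brevity. It compares a history built up incrementally with a table rebuilt from scratch out of the current source only.

```python
from datetime import date


def apply_snapshot(history, snapshot, as_of):
    """Fold one source snapshot ({id: price}) into an SCD Type 2 style history.

    Hypothetical helper for illustration only; not part of SQLMesh.
    """
    # Index the currently open rows (valid_to is None) by key.
    open_rows = {row["id"]: row for row in history if row["valid_to"] is None}
    for key, price in snapshot.items():
        current = open_rows.get(key)
        if current is None:
            # New key: open a fresh row.
            history.append({"id": key, "price": price, "valid_from": as_of, "valid_to": None})
        elif current["price"] != price:
            # Changed value: close the open row and open a new one.
            current["valid_to"] = as_of
            history.append({"id": key, "price": price, "valid_from": as_of, "valid_to": None})
    return history


# Incremental processing: two source states observed over time.
history = []
apply_snapshot(history, {1: 10}, date(2023, 1, 1))
apply_snapshot(history, {1: 12}, date(2023, 1, 15))

# Rebuild from scratch: only the current source state is available.
rebuilt = apply_snapshot([], {1: 12}, date(2023, 2, 1))

print(len(history), len(rebuilt))
```

In this sketch the incremental history keeps both the closed row and the open row, while the rebuilt table holds only one open row, so the earlier price and its validity window are gone.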
14961464

sqlmesh/core/engine_adapter/base.py

Lines changed: 2 additions & 45 deletions
@@ -1497,7 +1497,6 @@ def scd_type_2_by_time(
14971497
table_description: t.Optional[str] = None,
14981498
column_descriptions: t.Optional[t.Dict[str, str]] = None,
14991499
truncate: bool = False,
1500-
is_restatement: bool = False,
15011500
**kwargs: t.Any,
15021501
) -> None:
15031502
self._scd_type_2(
@@ -1514,7 +1513,6 @@ def scd_type_2_by_time(
15141513
table_description=table_description,
15151514
column_descriptions=column_descriptions,
15161515
truncate=truncate,
1517-
is_restatement=is_restatement,
15181516
**kwargs,
15191517
)
15201518

@@ -1533,7 +1531,6 @@ def scd_type_2_by_column(
15331531
table_description: t.Optional[str] = None,
15341532
column_descriptions: t.Optional[t.Dict[str, str]] = None,
15351533
truncate: bool = False,
1536-
is_restatement: bool = False,
15371534
**kwargs: t.Any,
15381535
) -> None:
15391536
self._scd_type_2(
@@ -1550,7 +1547,6 @@ def scd_type_2_by_column(
15501547
table_description=table_description,
15511548
column_descriptions=column_descriptions,
15521549
truncate=truncate,
1553-
is_restatement=is_restatement,
15541550
**kwargs,
15551551
)
15561552

@@ -1561,7 +1557,6 @@ def _scd_type_2(
15611557
unique_key: t.Sequence[exp.Expression],
15621558
valid_from_col: exp.Column,
15631559
valid_to_col: exp.Column,
1564-
start: TimeLike,
15651560
execution_time: t.Union[TimeLike, exp.Column],
15661561
invalidate_hard_deletes: bool = True,
15671562
updated_at_col: t.Optional[exp.Column] = None,
@@ -1572,7 +1567,6 @@ def _scd_type_2(
15721567
table_description: t.Optional[str] = None,
15731568
column_descriptions: t.Optional[t.Dict[str, str]] = None,
15741569
truncate: bool = False,
1575-
is_restatement: bool = False,
15761570
**kwargs: t.Any,
15771571
) -> None:
15781572
def remove_managed_columns(
@@ -1757,17 +1751,9 @@ def remove_managed_columns(
17571751
existing_rows_query = exp.select(*table_columns, exp.true().as_("_exists")).from_(
17581752
target_table
17591753
)
1760-
17611754
if truncate:
17621755
existing_rows_query = existing_rows_query.limit(0)
17631756

1764-
# Only set cleanup_ts if is_restatement is True and truncate is False (this to enable full restatement)
1765-
cleanup_ts = (
1766-
to_time_column(start, time_data_type, self.dialect, nullable=True)
1767-
if is_restatement and not truncate
1768-
else None
1769-
)
1770-
17711757
with source_queries[0] as source_query:
17721758
prefixed_columns_to_types = []
17731759
for column in columns_to_types:
@@ -1804,41 +1790,12 @@ def remove_managed_columns(
18041790
# Historical Records that Do Not Change
18051791
.with_(
18061792
"static",
1807-
existing_rows_query.where(valid_to_col.is_(exp.Null()).not_())
1808-
if cleanup_ts is None
1809-
else existing_rows_query.where(
1810-
exp.and_(
1811-
valid_to_col.is_(exp.Null().not_()),
1812-
valid_to_col < cleanup_ts,
1813-
),
1814-
),
1793+
existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()),
18151794
)
18161795
# Latest Records that can be updated
18171796
.with_(
18181797
"latest",
1819-
existing_rows_query.where(valid_to_col.is_(exp.Null()))
1820-
if cleanup_ts is None
1821-
else exp.select(
1822-
*(
1823-
to_time_column(
1824-
exp.null(), time_data_type, self.dialect, nullable=True
1825-
).as_(col)
1826-
if col == valid_to_col.name
1827-
else exp.column(col)
1828-
for col in columns_to_types
1829-
),
1830-
exp.true().as_("_exists"),
1831-
)
1832-
.from_(target_table)
1833-
.where(
1834-
exp.and_(
1835-
valid_from_col <= cleanup_ts,
1836-
exp.or_(
1837-
valid_to_col.is_(exp.null()),
1838-
valid_to_col >= cleanup_ts,
1839-
),
1840-
)
1841-
),
1798+
existing_rows_query.where(valid_to_col.is_(exp.Null())),
18421799
)
18431800
# Deleted records which can be used to determine `valid_from` for undeleted source records
18441801
.with_(
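
After this revert, the `static` and `latest` CTEs in `_scd_type_2` go back to plain filters on `valid_to`: closed rows (`valid_to IS NOT NULL`) are carried over unchanged, and open rows (`valid_to IS NULL`) are candidates for update. A minimal plain-Python sketch of that split follows, assuming rows are dictionaries with a `valid_to` key. The helper name `split_static_and_latest` and the sample rows are hypothetical, and this only mirrors the two filters; it does not reproduce the adapter's query.

```python
from datetime import date


def split_static_and_latest(rows):
    """Partition existing rows the way the two CTE filters do (illustrative only)."""
    # "static": historical records that do not change (valid_to is set).
    static = [row for row in rows if row["valid_to"] is not None]
    # "latest": open records that can still be updated (valid_to is NULL).
    latest = [row for row in rows if row["valid_to"] is None]
    return static, latest


rows = [
    {"id": 1, "valid_from": date(2023, 1, 1), "valid_to": date(2023, 1, 15)},
    {"id": 1, "valid_from": date(2023, 1, 15), "valid_to": None},
    {"id": 2, "valid_from": date(2023, 1, 5), "valid_to": None},
]

static, latest = split_static_and_latest(rows)
print(len(static), len(latest))
```

The removed code additionally bounded both filters by a `cleanup_ts` derived from the restatement start; with that gone, the split no longer depends on any start date.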

sqlmesh/core/engine_adapter/trino.py

Lines changed: 0 additions & 4 deletions
@@ -256,7 +256,6 @@ def _scd_type_2(
256256
unique_key: t.Sequence[exp.Expression],
257257
valid_from_col: exp.Column,
258258
valid_to_col: exp.Column,
259-
start: TimeLike,
260259
execution_time: t.Union[TimeLike, exp.Column],
261260
invalidate_hard_deletes: bool = True,
262261
updated_at_col: t.Optional[exp.Column] = None,
@@ -267,7 +266,6 @@ def _scd_type_2(
267266
table_description: t.Optional[str] = None,
268267
column_descriptions: t.Optional[t.Dict[str, str]] = None,
269268
truncate: bool = False,
270-
is_restatement: bool = False,
271269
**kwargs: t.Any,
272270
) -> None:
273271
if columns_to_types and self.current_catalog_type == "delta_lake":
@@ -279,7 +277,6 @@ def _scd_type_2(
279277
unique_key,
280278
valid_from_col,
281279
valid_to_col,
282-
start,
283280
execution_time,
284281
invalidate_hard_deletes,
285282
updated_at_col,
@@ -290,7 +287,6 @@ def _scd_type_2(
290287
table_description,
291288
column_descriptions,
292289
truncate,
293-
is_restatement,
294290
**kwargs,
295291
)
296292

sqlmesh/core/model/kind.py

Lines changed: 1 addition & 0 deletions
@@ -141,6 +141,7 @@ def full_history_restatement_only(self) -> bool:
141141
self.is_incremental_unmanaged
142142
or self.is_incremental_by_unique_key
143143
or self.is_incremental_by_partition
144+
or self.is_scd_type_2
144145
or self.is_managed
145146
or self.is_full
146147
or self.is_view

sqlmesh/core/plan/evaluator.py

Lines changed: 0 additions & 6 deletions
@@ -235,11 +235,6 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
235235
return
236236

237237
scheduler = self.create_scheduler(stage.all_snapshots.values(), self.snapshot_evaluator)
238-
# Convert model name restatements to snapshot ID restatements
239-
restatements_by_snapshot_id = {
240-
stage.all_snapshots[name].snapshot_id: interval
241-
for name, interval in plan.restatements.items()
242-
}
243238
errors, _ = scheduler.run_merged_intervals(
244239
merged_intervals=stage.snapshot_to_intervals,
245240
deployability_index=stage.deployability_index,
@@ -248,7 +243,6 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
248243
circuit_breaker=self._circuit_breaker,
249244
start=plan.start,
250245
end=plan.end,
251-
restatements=restatements_by_snapshot_id,
252246
)
253247
if errors:
254248
raise PlanError("Plan application failed.")

sqlmesh/core/scheduler.py

Lines changed: 0 additions & 9 deletions
@@ -161,7 +161,6 @@ def evaluate(
161161
deployability_index: DeployabilityIndex,
162162
batch_index: int,
163163
environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
164-
is_restatement: bool = False,
165164
**kwargs: t.Any,
166165
) -> t.List[AuditResult]:
167166
"""Evaluate a snapshot and add the processed interval to the state sync.
@@ -193,7 +192,6 @@ def evaluate(
193192
snapshots=snapshots,
194193
deployability_index=deployability_index,
195194
batch_index=batch_index,
196-
is_restatement=is_restatement,
197195
**kwargs,
198196
)
199197
audit_results = self._audit_snapshot(
@@ -373,7 +371,6 @@ def run_merged_intervals(
373371
end: t.Optional[TimeLike] = None,
374372
run_environment_statements: bool = False,
375373
audit_only: bool = False,
376-
restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
377374
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
378375
"""Runs precomputed batches of missing intervals.
379376
@@ -450,10 +447,6 @@ def evaluate_node(node: SchedulingUnit) -> None:
450447
execution_time=execution_time,
451448
)
452449
else:
453-
# Determine if this snapshot and interval is a restatement (for SCD type 2)
454-
is_restatement = (
455-
restatements is not None and snapshot.snapshot_id in restatements
456-
)
457450
audit_results = self.evaluate(
458451
snapshot=snapshot,
459452
environment_naming_info=environment_naming_info,
@@ -462,7 +455,6 @@ def evaluate_node(node: SchedulingUnit) -> None:
462455
execution_time=execution_time,
463456
deployability_index=deployability_index,
464457
batch_index=batch_idx,
465-
is_restatement=is_restatement,
466458
)
467459

468460
evaluation_duration_ms = now_timestamp() - execution_start_ts
@@ -671,7 +663,6 @@ def _run_or_audit(
671663
end=end,
672664
run_environment_statements=run_environment_statements,
673665
audit_only=audit_only,
674-
restatements=remove_intervals,
675666
)
676667

677668
return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS

sqlmesh/core/snapshot/definition.py

Lines changed: 0 additions & 15 deletions
@@ -837,21 +837,6 @@ def get_removal_interval(
837837

838838
removal_interval = expanded_removal_interval
839839

840-
# SCD Type 2 validation that end date is the latest interval if it was provided
841-
if not is_preview and self.is_scd_type_2 and self.intervals:
842-
requested_start, requested_end = removal_interval
843-
latest_end = self.intervals[-1][1]
844-
if requested_end < latest_end:
845-
from sqlmesh.core.console import get_console
846-
847-
get_console().log_warning(
848-
f"SCD Type 2 model '{self.model.name}' does not support end date in restatements.\n"
849-
f"Requested end date [{to_ts(requested_end)}] is less than the latest interval end date.\n"
850-
f"The requested end date will be ignored. Using the latest interval end instead: [{to_ts(latest_end)}]"
851-
)
852-
853-
removal_interval = self.inclusive_exclusive(requested_start, latest_end, strict)
854-
855840
return removal_interval
856841

857842
@property

sqlmesh/core/snapshot/evaluator.py

Lines changed: 0 additions & 9 deletions
@@ -140,7 +140,6 @@ def evaluate(
140140
snapshots: t.Dict[str, Snapshot],
141141
deployability_index: t.Optional[DeployabilityIndex] = None,
142142
batch_index: int = 0,
143-
is_restatement: bool = False,
144143
**kwargs: t.Any,
145144
) -> t.Optional[str]:
146145
"""Renders the snapshot's model, executes it and stores the result in the snapshot's physical table.
@@ -166,7 +165,6 @@ def evaluate(
166165
snapshots,
167166
deployability_index=deployability_index,
168167
batch_index=batch_index,
169-
is_restatement=is_restatement,
170168
**kwargs,
171169
)
172170
if result is None or isinstance(result, str):
@@ -624,7 +622,6 @@ def _evaluate_snapshot(
624622
limit: t.Optional[int] = None,
625623
deployability_index: t.Optional[DeployabilityIndex] = None,
626624
batch_index: int = 0,
627-
is_restatement: bool = False,
628625
**kwargs: t.Any,
629626
) -> DF | str | None:
630627
"""Renders the snapshot's model and executes it. The return value depends on whether the limit was specified.
@@ -697,7 +694,6 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None:
697694
end=end,
698695
execution_time=execution_time,
699696
physical_properties=rendered_physical_properties,
700-
is_restatement=is_restatement,
701697
)
702698
else:
703699
logger.info(
@@ -719,7 +715,6 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None:
719715
end=end,
720716
execution_time=execution_time,
721717
physical_properties=rendered_physical_properties,
722-
is_restatement=is_restatement,
723718
)
724719

725720
with (
@@ -1840,8 +1835,6 @@ def insert(
18401835
table_description=model.description,
18411836
column_descriptions=model.column_descriptions,
18421837
truncate=is_first_insert,
1843-
start=kwargs["start"],
1844-
is_restatement=kwargs.get("is_restatement", False),
18451838
)
18461839
elif isinstance(model.kind, SCDType2ByColumnKind):
18471840
self.adapter.scd_type_2_by_column(
@@ -1859,8 +1852,6 @@ def insert(
18591852
table_description=model.description,
18601853
column_descriptions=model.column_descriptions,
18611854
truncate=is_first_insert,
1862-
start=kwargs["start"],
1863-
is_restatement=kwargs.get("is_restatement", False),
18641855
)
18651856
else:
18661857
raise SQLMeshError(
