diff --git a/docs/concepts/models/model_kinds.md b/docs/concepts/models/model_kinds.md index e748313c81..d01cc738a6 100644 --- a/docs/concepts/models/model_kinds.md +++ b/docs/concepts/models/model_kinds.md @@ -935,13 +935,7 @@ SQLMesh achieves this by adding a `valid_from` and `valid_to` column to your mod Therefore, you can use these models to not only tell you what the latest value is for a given record but also what the values were anytime in the past. Note that maintaining this history does come at a cost of increased storage and compute and this may not be a good fit for sources that change frequently since the history could get very large. -**Note**: SCD Type 2 models support [restatements](../plans.md#restatement-plans) with specific limitations: - -- **Full restatements**: The entire table will be recreated from scratch when no start date is specified -- **Partial restatements**: You can specify a start date to restate data from a certain point onwards to the latest interval. The end date will always be set to the latest interval's end date, regardless of what end date you specify -- **Partial sections**: Restatements of specific sections (discontinued ranges) of the table are not supported - -Data restatement is disabled for models of this kind by default (`disable_restatement true`). To enable restatements, set `disable_restatement false` in your model configuration. +**Note**: Partial data [restatement](../plans.md#restatement-plans) is not supported for this model kind, which means that the entire table will be recreated from scratch if restated. This may lead to data loss, so data restatement is disabled for models of this kind by default. There are two ways to tracking changes: By Time (Recommended) or By Column. 
@@ -1289,11 +1283,11 @@ This is the most accurate representation of the menu based on the source data pr ### Processing Source Table with Historical Data -The most common case for SCD Type 2 is creating history for a table that it doesn't have it already. +The most common case for SCD Type 2 is creating history for a table that doesn't have it already. In the example of the restaurant menu, the menu just tells you what is offered right now, but you want to know what was offered over time. In this case, the default setting of `None` for `batch_size` is the best option. -Another use case though is processing a source table that already has history in it. +Another use case though is processing a source table that already has history in it. A common example of this is a "daily snapshot" table that is created by a source system that takes a snapshot of the data at the end of each day. If your source table has historical records, like a "daily snapshot" table, then set `batch_size` to `1` to process each interval (each day if a `@daily` cron) in sequential order. That way the historical records will be properly captured in the SCD Type 2 table. @@ -1439,14 +1433,11 @@ GROUP BY id ``` -### SCD Type 2 Restatements +### Reset SCD Type 2 Model (clearing history) SCD Type 2 models are designed by default to protect the data that has been captured because it is not possible to recreate the history once it has been lost. However, there are cases where you may want to clear the history and start fresh. - -#### Enabling Restatements - -To enable restatements for an SCD Type 2 model, set `disable_restatement` to `false` in the model definition: +For this use case you will want to start by setting `disable_restatement` to `false` in the model definition. 
```sql linenums="1" hl_lines="5" MODEL ( @@ -1458,9 +1449,8 @@ MODEL ( ); ``` -#### Full Restatements (Clearing All History) - -To clear all history and recreate the entire table from scratch: +Plan/apply this change to production. +Then you will want to [restate the model](../plans.md#restatement-plans). ```bash sqlmesh plan --restate-model db.menu_items @@ -1468,29 +1458,7 @@ sqlmesh plan --restate-model db.menu_items !!! warning - This will remove **all** historical data on the model which in most situations cannot be recovered. - -#### Partial Restatements (From a Specific Date) - -You can restate data from a specific start date onwards. This will: -- Delete all records with `valid_from >= start_date` -- Reprocess the data from the start date to the latest interval - -```bash -sqlmesh plan --restate-model db.menu_items --start "2023-01-15" -``` - -!!! note - - If you specify an end date for SCD Type 2 restatements, it will be ignored and automatically set to the latest interval's end date. - -```bash -# This end date will be ignored and set to the latest interval -sqlmesh plan --restate-model db.menu_items --start "2023-01-15" --end "2023-01-20" -``` - - -#### Re-enabling Protection + This will remove the historical data on the model which in most situations cannot be recovered. Once complete you will want to remove `disable_restatement` on the model definition which will set it back to `true` and prevent accidental data loss. 
diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index d401f0e705..4651caa6ec 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -1497,7 +1497,6 @@ def scd_type_2_by_time( table_description: t.Optional[str] = None, column_descriptions: t.Optional[t.Dict[str, str]] = None, truncate: bool = False, - is_restatement: bool = False, **kwargs: t.Any, ) -> None: self._scd_type_2( @@ -1514,7 +1513,6 @@ def scd_type_2_by_time( table_description=table_description, column_descriptions=column_descriptions, truncate=truncate, - is_restatement=is_restatement, **kwargs, ) @@ -1533,7 +1531,6 @@ def scd_type_2_by_column( table_description: t.Optional[str] = None, column_descriptions: t.Optional[t.Dict[str, str]] = None, truncate: bool = False, - is_restatement: bool = False, **kwargs: t.Any, ) -> None: self._scd_type_2( @@ -1550,7 +1547,6 @@ def scd_type_2_by_column( table_description=table_description, column_descriptions=column_descriptions, truncate=truncate, - is_restatement=is_restatement, **kwargs, ) @@ -1561,7 +1557,6 @@ def _scd_type_2( unique_key: t.Sequence[exp.Expression], valid_from_col: exp.Column, valid_to_col: exp.Column, - start: TimeLike, execution_time: t.Union[TimeLike, exp.Column], invalidate_hard_deletes: bool = True, updated_at_col: t.Optional[exp.Column] = None, @@ -1572,7 +1567,6 @@ def _scd_type_2( table_description: t.Optional[str] = None, column_descriptions: t.Optional[t.Dict[str, str]] = None, truncate: bool = False, - is_restatement: bool = False, **kwargs: t.Any, ) -> None: def remove_managed_columns( @@ -1757,17 +1751,9 @@ def remove_managed_columns( existing_rows_query = exp.select(*table_columns, exp.true().as_("_exists")).from_( target_table ) - if truncate: existing_rows_query = existing_rows_query.limit(0) - # Only set cleanup_ts if is_restatement is True and truncate is False (this to enable full restatement) - cleanup_ts = ( - to_time_column(start, time_data_type, 
self.dialect, nullable=True) - if is_restatement and not truncate - else None - ) - with source_queries[0] as source_query: prefixed_columns_to_types = [] for column in columns_to_types: @@ -1804,41 +1790,12 @@ def remove_managed_columns( # Historical Records that Do Not Change .with_( "static", - existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()) - if cleanup_ts is None - else existing_rows_query.where( - exp.and_( - valid_to_col.is_(exp.Null().not_()), - valid_to_col < cleanup_ts, - ), - ), + existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()), ) # Latest Records that can be updated .with_( "latest", - existing_rows_query.where(valid_to_col.is_(exp.Null())) - if cleanup_ts is None - else exp.select( - *( - to_time_column( - exp.null(), time_data_type, self.dialect, nullable=True - ).as_(col) - if col == valid_to_col.name - else exp.column(col) - for col in columns_to_types - ), - exp.true().as_("_exists"), - ) - .from_(target_table) - .where( - exp.and_( - valid_from_col <= cleanup_ts, - exp.or_( - valid_to_col.is_(exp.null()), - valid_to_col >= cleanup_ts, - ), - ) - ), + existing_rows_query.where(valid_to_col.is_(exp.Null())), ) # Deleted records which can be used to determine `valid_from` for undeleted source records .with_( diff --git a/sqlmesh/core/engine_adapter/trino.py b/sqlmesh/core/engine_adapter/trino.py index 7862bfca2d..df8e45b520 100644 --- a/sqlmesh/core/engine_adapter/trino.py +++ b/sqlmesh/core/engine_adapter/trino.py @@ -256,7 +256,6 @@ def _scd_type_2( unique_key: t.Sequence[exp.Expression], valid_from_col: exp.Column, valid_to_col: exp.Column, - start: TimeLike, execution_time: t.Union[TimeLike, exp.Column], invalidate_hard_deletes: bool = True, updated_at_col: t.Optional[exp.Column] = None, @@ -267,7 +266,6 @@ def _scd_type_2( table_description: t.Optional[str] = None, column_descriptions: t.Optional[t.Dict[str, str]] = None, truncate: bool = False, - is_restatement: bool = False, **kwargs: t.Any, ) -> None: if 
columns_to_types and self.current_catalog_type == "delta_lake": @@ -279,7 +277,6 @@ def _scd_type_2( unique_key, valid_from_col, valid_to_col, - start, execution_time, invalidate_hard_deletes, updated_at_col, @@ -290,7 +287,6 @@ def _scd_type_2( table_description, column_descriptions, truncate, - is_restatement, **kwargs, ) diff --git a/sqlmesh/core/model/kind.py b/sqlmesh/core/model/kind.py index c297d916d5..185556fc8f 100644 --- a/sqlmesh/core/model/kind.py +++ b/sqlmesh/core/model/kind.py @@ -141,6 +141,7 @@ def full_history_restatement_only(self) -> bool: self.is_incremental_unmanaged or self.is_incremental_by_unique_key or self.is_incremental_by_partition + or self.is_scd_type_2 or self.is_managed or self.is_full or self.is_view diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index 39a5d69355..ced4631b99 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -235,11 +235,6 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla return scheduler = self.create_scheduler(stage.all_snapshots.values(), self.snapshot_evaluator) - # Convert model name restatements to snapshot ID restatements - restatements_by_snapshot_id = { - stage.all_snapshots[name].snapshot_id: interval - for name, interval in plan.restatements.items() - } errors, _ = scheduler.run_merged_intervals( merged_intervals=stage.snapshot_to_intervals, deployability_index=stage.deployability_index, @@ -248,7 +243,6 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla circuit_breaker=self._circuit_breaker, start=plan.start, end=plan.end, - restatements=restatements_by_snapshot_id, ) if errors: raise PlanError("Plan application failed.") diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 7177efe927..4582b24485 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -161,7 +161,6 @@ def evaluate( deployability_index: DeployabilityIndex, batch_index: 
int, environment_naming_info: t.Optional[EnvironmentNamingInfo] = None, - is_restatement: bool = False, **kwargs: t.Any, ) -> t.List[AuditResult]: """Evaluate a snapshot and add the processed interval to the state sync. @@ -193,7 +192,6 @@ def evaluate( snapshots=snapshots, deployability_index=deployability_index, batch_index=batch_index, - is_restatement=is_restatement, **kwargs, ) audit_results = self._audit_snapshot( @@ -373,7 +371,6 @@ def run_merged_intervals( end: t.Optional[TimeLike] = None, run_environment_statements: bool = False, audit_only: bool = False, - restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None, ) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]: """Runs precomputed batches of missing intervals. @@ -450,10 +447,6 @@ def evaluate_node(node: SchedulingUnit) -> None: execution_time=execution_time, ) else: - # Determine if this snapshot and interval is a restatement (for SCD type 2) - is_restatement = ( - restatements is not None and snapshot.snapshot_id in restatements - ) audit_results = self.evaluate( snapshot=snapshot, environment_naming_info=environment_naming_info, @@ -462,7 +455,6 @@ def evaluate_node(node: SchedulingUnit) -> None: execution_time=execution_time, deployability_index=deployability_index, batch_index=batch_idx, - is_restatement=is_restatement, ) evaluation_duration_ms = now_timestamp() - execution_start_ts @@ -671,7 +663,6 @@ def _run_or_audit( end=end, run_environment_statements=run_environment_statements, audit_only=audit_only, - restatements=remove_intervals, ) return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 076c1efa78..996d539e60 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -837,21 +837,6 @@ def get_removal_interval( removal_interval = expanded_removal_interval - # SCD Type 2 validation that end date is the 
latest interval if it was provided - if not is_preview and self.is_scd_type_2 and self.intervals: - requested_start, requested_end = removal_interval - latest_end = self.intervals[-1][1] - if requested_end < latest_end: - from sqlmesh.core.console import get_console - - get_console().log_warning( - f"SCD Type 2 model '{self.model.name}' does not support end date in restatements.\n" - f"Requested end date [{to_ts(requested_end)}] is less than the latest interval end date.\n" - f"The requested end date will be ignored. Using the latest interval end instead: [{to_ts(latest_end)}]" - ) - - removal_interval = self.inclusive_exclusive(requested_start, latest_end, strict) - return removal_interval @property diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index a2ec242e37..a6dca27d35 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -140,7 +140,6 @@ def evaluate( snapshots: t.Dict[str, Snapshot], deployability_index: t.Optional[DeployabilityIndex] = None, batch_index: int = 0, - is_restatement: bool = False, **kwargs: t.Any, ) -> t.Optional[str]: """Renders the snapshot's model, executes it and stores the result in the snapshot's physical table. @@ -166,7 +165,6 @@ def evaluate( snapshots, deployability_index=deployability_index, batch_index=batch_index, - is_restatement=is_restatement, **kwargs, ) if result is None or isinstance(result, str): @@ -624,7 +622,6 @@ def _evaluate_snapshot( limit: t.Optional[int] = None, deployability_index: t.Optional[DeployabilityIndex] = None, batch_index: int = 0, - is_restatement: bool = False, **kwargs: t.Any, ) -> DF | str | None: """Renders the snapshot's model and executes it. The return value depends on whether the limit was specified. 
@@ -697,7 +694,6 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None: end=end, execution_time=execution_time, physical_properties=rendered_physical_properties, - is_restatement=is_restatement, ) else: logger.info( @@ -719,7 +715,6 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None: end=end, execution_time=execution_time, physical_properties=rendered_physical_properties, - is_restatement=is_restatement, ) with ( @@ -1840,8 +1835,6 @@ def insert( table_description=model.description, column_descriptions=model.column_descriptions, truncate=is_first_insert, - start=kwargs["start"], - is_restatement=kwargs.get("is_restatement", False), ) elif isinstance(model.kind, SCDType2ByColumnKind): self.adapter.scd_type_2_by_column( @@ -1859,8 +1852,6 @@ def insert( table_description=model.description, column_descriptions=model.column_descriptions, truncate=is_first_insert, - start=kwargs["start"], - is_restatement=kwargs.get("is_restatement", False), ) else: raise SQLMeshError( diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index 0739256d6e..039159825b 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -744,7 +744,6 @@ def test_scd_type_2_by_time(ctx_query_and_df: TestContext): columns_to_types=input_schema, table_format=ctx.default_table_format, truncate=True, - start="2022-01-01 00:00:00", ) results = ctx.get_metadata_results() assert len(results.views) == 0 @@ -808,7 +807,6 @@ def test_scd_type_2_by_time(ctx_query_and_df: TestContext): columns_to_types=input_schema, table_format=ctx.default_table_format, truncate=False, - start="2022-01-01 00:00:00", ) results = ctx.get_metadata_results() assert len(results.views) == 0 @@ -901,7 +899,6 @@ def test_scd_type_2_by_column(ctx_query_and_df: TestContext): execution_time_as_valid_from=False, columns_to_types=ctx.columns_to_types, truncate=True, - 
start="2023-01-01", ) results = ctx.get_metadata_results() assert len(results.views) == 0 @@ -973,7 +970,6 @@ def test_scd_type_2_by_column(ctx_query_and_df: TestContext): execution_time_as_valid_from=False, columns_to_types=ctx.columns_to_types, truncate=False, - start="2023-01-01", ) results = ctx.get_metadata_results() assert len(results.views) == 0 diff --git a/tests/core/engine_adapter/test_base.py b/tests/core/engine_adapter/test_base.py index b760a4f4a1..618d89a445 100644 --- a/tests/core/engine_adapter/test_base.py +++ b/tests/core/engine_adapter/test_base.py @@ -1239,12 +1239,10 @@ def test_scd_type_2_by_time(make_mocked_engine_adapter: t.Callable): "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), - start=datetime(2020, 1, 1, 0, 0, 0), - is_restatement=True, ) assert ( - parse_one(adapter.cursor.execute.call_args[0][0]).sql() + adapter.cursor.execute.call_args[0][0] == parse_one( """ CREATE OR REPLACE TABLE "target" AS @@ -1273,7 +1271,8 @@ def test_scd_type_2_by_time(make_mocked_engine_adapter: t.Callable): "test_valid_to", TRUE AS "_exists" FROM "target" - WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00' AS TIMESTAMP) + WHERE + NOT "test_valid_to" IS NULL ), "latest" AS ( SELECT "id", @@ -1281,11 +1280,11 @@ def test_scd_type_2_by_time(make_mocked_engine_adapter: t.Callable): "price", "test_UPDATED_at", "test_valid_from", - CAST(NULL AS TIMESTAMP) AS "test_valid_to", + "test_valid_to", TRUE AS "_exists" FROM "target" - WHERE "test_valid_from" <= CAST('2020-01-01 00:00:00' AS TIMESTAMP) - AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 00:00:00' AS TIMESTAMP)) + WHERE + "test_valid_to" IS NULL ), "deleted" AS ( SELECT "static"."id", @@ -1439,12 +1438,10 @@ def test_scd_type_2_by_time_no_invalidate_hard_deletes(make_mocked_engine_adapte "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), - 
start=datetime(2020, 1, 1, 0, 0, 0), - is_restatement=True, ) assert ( - parse_one(adapter.cursor.execute.call_args[0][0]).sql() + adapter.cursor.execute.call_args[0][0] == parse_one( """ CREATE OR REPLACE TABLE "target" AS @@ -1473,7 +1470,8 @@ def test_scd_type_2_by_time_no_invalidate_hard_deletes(make_mocked_engine_adapte "test_valid_to", TRUE AS "_exists" FROM "target" - WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00' AS TIMESTAMP) + WHERE + NOT "test_valid_to" IS NULL ), "latest" AS ( SELECT "id", @@ -1481,11 +1479,11 @@ def test_scd_type_2_by_time_no_invalidate_hard_deletes(make_mocked_engine_adapte "price", "test_updated_at", "test_valid_from", - CAST(NULL AS TIMESTAMP) AS "test_valid_to", + "test_valid_to", TRUE AS "_exists" FROM "target" - WHERE "test_valid_from" <= CAST('2020-01-01 00:00:00' AS TIMESTAMP) - AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 00:00:00' AS TIMESTAMP)) + WHERE + "test_valid_to" IS NULL ), "deleted" AS ( SELECT "static"."id", @@ -1628,38 +1626,35 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): "test_valid_to": exp.DataType.build("TIMESTAMPTZ"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), - start=datetime(2020, 1, 1, 0, 0, 0), - is_restatement=True, ) assert ( - parse_one(adapter.cursor.execute.call_args[0][0]).sql() + adapter.cursor.execute.call_args[0][0] == parse_one( """ - CREATE OR REPLACE TABLE "target" AS - WITH "source" AS ( - SELECT DISTINCT ON ("id1", "id2") +CREATE OR REPLACE TABLE "target" AS +WITH "source" AS ( + SELECT DISTINCT ON ("id1", "id2") TRUE AS "_exists", "id1", "id2", "name", "price", CAST("test_updated_at" AS TIMESTAMPTZ) AS "test_updated_at" - FROM ( + FROM ( SELECT CAST("id1" AS INT) AS "id1", CAST("id2" AS INT) AS "id2", CAST("name" AS VARCHAR) AS "name", CAST("price" AS DOUBLE) AS "price", - CAST("test_updated_at" AS TIMESTAMPTZ) AS "test_updated_at" + CAST("test_updated_at" AS TIMESTAMPTZ) AS "test_updated_at", 
FROM (VALUES (1, 4, 'muffins', 4.0, '2020-01-01 10:00:00'), (2, 5, 'chips', 5.0, '2020-01-02 15:00:00'), - (3, 6, 'soda', 6.0, '2020-01-03 12:00:00') - ) AS "t"("id1", "id2", "name", "price", "test_updated_at") - ) AS "raw_source" - ), "static" AS ( - SELECT + (3, 6, 'soda', 6.0, '2020-01-03 12:00:00')) AS "t"("id1", "id2", "name", "price", "test_updated_at") + ) AS "raw_source" +), "static" AS ( + SELECT "id1", "id2", "name", @@ -1668,23 +1663,24 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): "test_valid_from", "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00+00:00' AS TIMESTAMPTZ) - ), "latest" AS ( - SELECT + FROM "target" + WHERE + NOT "test_valid_to" IS NULL +), "latest" AS ( + SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", - CAST(NULL AS TIMESTAMPTZ) AS "test_valid_to", + "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE "test_valid_from" <= CAST('2020-01-01 00:00:00+00:00' AS TIMESTAMPTZ) - AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 00:00:00+00:00' AS TIMESTAMPTZ)) - ), "deleted" AS ( - SELECT + FROM "target" + WHERE + "test_valid_to" IS NULL +), "deleted" AS ( + SELECT "static"."id1", "static"."id2", "static"."name", @@ -1692,20 +1688,23 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): "static"."test_updated_at", "static"."test_valid_from", "static"."test_valid_to" - FROM "static" - LEFT JOIN "latest" + FROM "static" + LEFT JOIN "latest" ON "static"."id1" = "latest"."id1" AND "static"."id2" = "latest"."id2" - WHERE "latest"."test_valid_to" IS NULL - ), "latest_deleted" AS ( - SELECT + WHERE + "latest"."test_valid_to" IS NULL +), "latest_deleted" AS ( + SELECT TRUE AS "_exists", "id1" AS "_key0", "id2" AS "_key1", MAX("test_valid_to") AS "test_valid_to" - FROM "deleted" - GROUP BY "id1", "id2" - ), "joined" AS ( - SELECT + FROM "deleted" + GROUP BY + 
"id1", + "id2" +), "joined" AS ( + SELECT "source"."_exists" AS "_exists", "latest"."id1" AS "t_id1", "latest"."id2" AS "t_id2", @@ -1719,11 +1718,11 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): "source"."name" AS "name", "source"."price" AS "price", "source"."test_updated_at" AS "test_updated_at" - FROM "latest" - LEFT JOIN "source" + FROM "latest" + LEFT JOIN "source" ON "latest"."id1" = "source"."id1" AND "latest"."id2" = "source"."id2" - UNION ALL - SELECT + UNION ALL + SELECT "source"."_exists" AS "_exists", "latest"."id1" AS "t_id1", "latest"."id2" AS "t_id2", @@ -1737,12 +1736,13 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): "source"."name" AS "name", "source"."price" AS "price", "source"."test_updated_at" AS "test_updated_at" - FROM "latest" - RIGHT JOIN "source" + FROM "latest" + RIGHT JOIN "source" ON "latest"."id1" = "source"."id1" AND "latest"."id2" = "source"."id2" - WHERE "latest"."_exists" IS NULL - ), "updated_rows" AS ( - SELECT + WHERE + "latest"."_exists" IS NULL +), "updated_rows" AS ( + SELECT COALESCE("joined"."t_id1", "joined"."id1") AS "id1", COALESCE("joined"."t_id2", "joined"."id2") AS "id2", COALESCE("joined"."t_name", "joined"."name") AS "name", @@ -1751,9 +1751,9 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): CASE WHEN "t_test_valid_from" IS NULL AND NOT "latest_deleted"."_exists" IS NULL THEN CASE - WHEN "latest_deleted"."test_valid_to" > "test_updated_at" - THEN "latest_deleted"."test_valid_to" - ELSE "test_updated_at" + WHEN "latest_deleted"."test_valid_to" > "test_updated_at" + THEN "latest_deleted"."test_valid_to" + ELSE "test_updated_at" END WHEN "t_test_valid_from" IS NULL THEN CAST('1970-01-01 00:00:00+00:00' AS TIMESTAMPTZ) @@ -1766,11 +1766,12 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): THEN CAST('2020-01-01 00:00:00+00:00' AS TIMESTAMPTZ) ELSE "t_test_valid_to" END AS "test_valid_to" - FROM "joined" - 
LEFT JOIN "latest_deleted" - ON "joined"."id1" = "latest_deleted"."_key0" AND "joined"."id2" = "latest_deleted"."_key1" - ), "inserted_rows" AS ( - SELECT + FROM "joined" + LEFT JOIN "latest_deleted" + ON "joined"."id1" = "latest_deleted"."_key0" + AND "joined"."id2" = "latest_deleted"."_key1" +), "inserted_rows" AS ( + SELECT "id1", "id2", "name", @@ -1778,23 +1779,12 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable): "test_updated_at", "test_updated_at" AS "test_valid_from", CAST(NULL AS TIMESTAMPTZ) AS "test_valid_to" - FROM "joined" - WHERE "joined"."test_updated_at" > "joined"."t_test_updated_at" - ) - SELECT - CAST("id1" AS INT) AS "id1", - CAST("id2" AS INT) AS "id2", - CAST("name" AS VARCHAR) AS "name", - CAST("price" AS DOUBLE) AS "price", - CAST("test_updated_at" AS TIMESTAMPTZ) AS "test_updated_at", - CAST("test_valid_from" AS TIMESTAMPTZ) AS "test_valid_from", - CAST("test_valid_to" AS TIMESTAMPTZ) AS "test_valid_to" - FROM ( - SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", "test_valid_to" FROM "static" - UNION ALL SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", "test_valid_to" FROM "updated_rows" - UNION ALL SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", "test_valid_to" FROM "inserted_rows" - ) AS "_subquery" - """ + FROM "joined" + WHERE + "joined"."test_updated_at" > "joined"."t_test_updated_at" +) +SELECT CAST("id1" AS INT) AS "id1", CAST("id2" AS INT) AS "id2", CAST("name" AS VARCHAR) AS "name", CAST("price" AS DOUBLE) AS "price", CAST("test_updated_at" AS TIMESTAMPTZ) AS "test_updated_at", CAST("test_valid_from" AS TIMESTAMPTZ) AS "test_valid_from", CAST("test_valid_to" AS TIMESTAMPTZ) AS "test_valid_to" FROM (SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", "test_valid_to" FROM "static" UNION ALL SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", "test_valid_to" FROM "updated_rows" 
UNION ALL SELECT "id1", "id2", "name", "price", "test_updated_at", "test_valid_from", "test_valid_to" FROM "inserted_rows") AS "_subquery" +""" ).sql() ) @@ -1817,72 +1807,71 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable): "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), - start=datetime(2020, 1, 1, 0, 0, 0), extra_col_ignore="testing", - is_restatement=True, ) assert ( - parse_one(adapter.cursor.execute.call_args[0][0]).sql() + adapter.cursor.execute.call_args[0][0] == parse_one( """ - CREATE OR REPLACE TABLE "target" AS - WITH "source" AS ( - SELECT DISTINCT ON ("id") +CREATE OR REPLACE TABLE "target" AS +WITH "source" AS ( + SELECT DISTINCT ON ("id") TRUE AS "_exists", "id", "name", "price" - FROM ( + FROM ( SELECT "id", "name", "price" FROM "source" - ) AS "raw_source" - ), "static" AS ( - SELECT + ) AS "raw_source" +), "static" AS ( + SELECT "id", "name", "price", "test_VALID_from", "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00' AS TIMESTAMP) - ), "latest" AS ( - SELECT + FROM "target" + WHERE + NOT "test_valid_to" IS NULL +), "latest" AS ( + SELECT "id", "name", "price", "test_VALID_from", - CAST(NULL AS TIMESTAMP) AS "test_valid_to", + "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE "test_VALID_from" <= CAST('2020-01-01 00:00:00' AS TIMESTAMP) - AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 00:00:00' AS TIMESTAMP)) - ), "deleted" AS ( - SELECT + FROM "target" + WHERE + "test_valid_to" IS NULL +), "deleted" AS ( + SELECT "static"."id", "static"."name", "static"."price", "static"."test_VALID_from", "static"."test_valid_to" - FROM "static" - LEFT JOIN "latest" + FROM "static" + LEFT JOIN "latest" ON "static"."id" = "latest"."id" - WHERE + WHERE "latest"."test_valid_to" IS NULL - ), "latest_deleted" AS ( - SELECT +), "latest_deleted" AS ( + SELECT TRUE AS "_exists", 
"id" AS "_key0", MAX("test_valid_to") AS "test_valid_to" - FROM "deleted" - GROUP BY + FROM "deleted" + GROUP BY "id" - ), "joined" AS ( - SELECT +), "joined" AS ( + SELECT "source"."_exists" AS "_exists", "latest"."id" AS "t_id", "latest"."name" AS "t_name", @@ -1892,11 +1881,11 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable): "source"."id" AS "id", "source"."name" AS "name", "source"."price" AS "price" - FROM "latest" - LEFT JOIN "source" + FROM "latest" + LEFT JOIN "source" ON "latest"."id" = "source"."id" - UNION ALL - SELECT + UNION ALL + SELECT "source"."_exists" AS "_exists", "latest"."id" AS "t_id", "latest"."name" AS "t_name", @@ -1906,13 +1895,13 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable): "source"."id" AS "id", "source"."name" AS "name", "source"."price" AS "price" - FROM "latest" - RIGHT JOIN "source" + FROM "latest" + RIGHT JOIN "source" ON "latest"."id" = "source"."id" - WHERE + WHERE "latest"."_exists" IS NULL - ), "updated_rows" AS ( - SELECT +), "updated_rows" AS ( + SELECT COALESCE("joined"."t_id", "joined"."id") AS "id", COALESCE("joined"."t_name", "joined"."name") AS "name", COALESCE("joined"."t_price", "joined"."price") AS "price", @@ -1920,73 +1909,63 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable): CASE WHEN "joined"."_exists" IS NULL OR ( - ( - NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL - ) - AND ( - "joined"."name" <> "joined"."t_name" - OR ( - "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL - ) - OR ( - NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL - ) - OR "joined"."price" <> "joined"."t_price" - OR ( - "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL + ( + NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL ) - OR ( - NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL + AND ( + "joined"."name" <> "joined"."t_name" + OR ( + "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL + ) + OR ( + NOT 
"joined"."t_name" IS NULL AND "joined"."name" IS NULL + ) + OR "joined"."price" <> "joined"."t_price" + OR ( + "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL + ) + OR ( + NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL + ) ) ) - ) THEN CAST('2020-01-01 00:00:00' AS TIMESTAMP) ELSE "t_test_valid_to" END AS "test_valid_to" - FROM "joined" - LEFT JOIN "latest_deleted" + FROM "joined" + LEFT JOIN "latest_deleted" ON "joined"."id" = "latest_deleted"."_key0" - ), "inserted_rows" AS ( - SELECT +), "inserted_rows" AS ( + SELECT "id", "name", "price", CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS "test_VALID_from", CAST(NULL AS TIMESTAMP) AS "test_valid_to" - FROM "joined" - WHERE + FROM "joined" + WHERE ( NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL ) AND ( "joined"."name" <> "joined"."t_name" OR ( - "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL + "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL ) OR ( - NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL + NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL ) OR "joined"."price" <> "joined"."t_price" OR ( - "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL + "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL ) OR ( - NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL + NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL ) ) - ) - SELECT - CAST("id" AS INT) AS "id", - CAST("name" AS VARCHAR) AS "name", - CAST("price" AS DOUBLE) AS "price", - CAST("test_VALID_from" AS TIMESTAMP) AS "test_VALID_from", - CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" - FROM ( - SELECT "id", "name", "price", "test_VALID_from", "test_valid_to" FROM "static" - UNION ALL SELECT "id", "name", "price", "test_VALID_from", "test_valid_to" FROM "updated_rows" - UNION ALL SELECT "id", "name", "price", "test_VALID_from", "test_valid_to" FROM "inserted_rows" - ) AS "_subquery" - """ +) +SELECT CAST("id" AS INT) AS "id", CAST("name" AS VARCHAR) 
AS "name", CAST("price" AS DOUBLE) AS "price", CAST("test_VALID_from" AS TIMESTAMP) AS "test_VALID_from", CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" FROM (SELECT "id", "name", "price", "test_VALID_from", "test_valid_to" FROM "static" UNION ALL SELECT "id", "name", "price", "test_VALID_from", "test_valid_to" FROM "updated_rows" UNION ALL SELECT "id", "name", "price", "test_VALID_from", "test_valid_to" FROM "inserted_rows") AS "_subquery" + """ ).sql() ) @@ -2010,31 +1989,30 @@ def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callab "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), - start=datetime(2020, 1, 1, 0, 0, 0), - is_restatement=True, ) + assert ( - parse_one(adapter.cursor.execute.call_args[0][0]).sql() + adapter.cursor.execute.call_args[0][0] == parse_one( """ - CREATE OR REPLACE TABLE "target" AS - WITH "source" AS ( - SELECT DISTINCT ON (CONCAT("id_a", "id_b")) +CREATE OR REPLACE TABLE "target" AS +WITH "source" AS ( + SELECT DISTINCT ON (CONCAT("id_a", "id_b")) TRUE AS "_exists", "id_a", "id_b", "name", - "price" - FROM ( + "price", + FROM ( SELECT "id_a", "id_b", "name", "price" FROM "source" - ) AS "raw_source" - ), "static" AS ( - SELECT + ) AS "raw_source" +), "static" AS ( + SELECT "id_a", "id_b", "name", @@ -2042,41 +2020,44 @@ def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callab "test_VALID_from", "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00' AS TIMESTAMP) - ), "latest" AS ( - SELECT + FROM "target" + WHERE + NOT "test_valid_to" IS NULL +), "latest" AS ( + SELECT "id_a", "id_b", "name", "price", "test_VALID_from", - CAST(NULL AS TIMESTAMP) AS "test_valid_to", + "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE "test_VALID_from" <= CAST('2020-01-01 00:00:00' AS TIMESTAMP) - AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 
00:00:00' AS TIMESTAMP)) - ), "deleted" AS ( - SELECT + FROM "target" + WHERE + "test_valid_to" IS NULL +), "deleted" AS ( + SELECT "static"."id_a", "static"."id_b", "static"."name", "static"."price", "static"."test_VALID_from", "static"."test_valid_to" - FROM "static" - LEFT JOIN "latest" + FROM "static" + LEFT JOIN "latest" ON CONCAT("static"."id_a", "static"."id_b") = CONCAT("latest"."id_a", "latest"."id_b") - WHERE "latest"."test_valid_to" IS NULL - ), "latest_deleted" AS ( - SELECT + WHERE + "latest"."test_valid_to" IS NULL +), "latest_deleted" AS ( + SELECT TRUE AS "_exists", CONCAT("id_a", "id_b") AS "_key0", MAX("test_valid_to") AS "test_valid_to" - FROM "deleted" - GROUP BY CONCAT("id_a", "id_b") - ), "joined" AS ( - SELECT + FROM "deleted" + GROUP BY + CONCAT("id_a", "id_b") +), "joined" AS ( + SELECT "source"."_exists" AS "_exists", "latest"."id_a" AS "t_id_a", "latest"."id_b" AS "t_id_b", @@ -2088,11 +2069,11 @@ def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callab "source"."id_b" AS "id_b", "source"."name" AS "name", "source"."price" AS "price" - FROM "latest" - LEFT JOIN "source" + FROM "latest" + LEFT JOIN "source" ON CONCAT("latest"."id_a", "latest"."id_b") = CONCAT("source"."id_a", "source"."id_b") - UNION ALL - SELECT + UNION ALL + SELECT "source"."_exists" AS "_exists", "latest"."id_a" AS "t_id_a", "latest"."id_b" AS "t_id_b", @@ -2104,12 +2085,13 @@ def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callab "source"."id_b" AS "id_b", "source"."name" AS "name", "source"."price" AS "price" - FROM "latest" - RIGHT JOIN "source" + FROM "latest" + RIGHT JOIN "source" ON CONCAT("latest"."id_a", "latest"."id_b") = CONCAT("source"."id_a", "source"."id_b") - WHERE "latest"."_exists" IS NULL - ), "updated_rows" AS ( - SELECT + WHERE + "latest"."_exists" IS NULL +), "updated_rows" AS ( + SELECT COALESCE("joined"."t_id_a", "joined"."id_a") AS "id_a", COALESCE("joined"."t_id_b", "joined"."id_b") AS "id_b", 
COALESCE("joined"."t_name", "joined"."name") AS "name", @@ -2118,55 +2100,64 @@ def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callab CASE WHEN "joined"."_exists" IS NULL OR ( - (NOT CONCAT("t_id_a", "t_id_b") IS NULL AND NOT CONCAT("id_a", "id_b") IS NULL) + ( + NOT CONCAT("t_id_a", "t_id_b") IS NULL AND NOT CONCAT("id_a", "id_b") IS NULL + ) AND ( - "joined"."name" <> "joined"."t_name" - OR ("joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL) - OR (NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL) - OR "joined"."price" <> "joined"."t_price" - OR ("joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL) - OR (NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL) + "joined"."name" <> "joined"."t_name" + OR ( + "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL + ) + OR ( + NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL + ) + OR "joined"."price" <> "joined"."t_price" + OR ( + "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL + ) + OR ( + NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL + ) ) ) THEN CAST('2020-01-01 00:00:00' AS TIMESTAMP) ELSE "t_test_valid_to" END AS "test_valid_to" - FROM "joined" - LEFT JOIN "latest_deleted" + FROM "joined" + LEFT JOIN "latest_deleted" ON CONCAT("joined"."id_a", "joined"."id_b") = "latest_deleted"."_key0" - ), "inserted_rows" AS ( - SELECT +), "inserted_rows" AS ( + SELECT "id_a", "id_b", "name", "price", CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS "test_VALID_from", CAST(NULL AS TIMESTAMP) AS "test_valid_to" - FROM "joined" - WHERE - (NOT CONCAT("t_id_a", "t_id_b") IS NULL AND NOT CONCAT("id_a", "id_b") IS NULL) + FROM "joined" + WHERE + ( + NOT CONCAT("t_id_a", "t_id_b") IS NULL AND NOT CONCAT("id_a", "id_b") IS NULL + ) AND ( "joined"."name" <> "joined"."t_name" - OR ("joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL) - OR (NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL) + OR ( + "joined"."t_name" IS NULL AND NOT 
"joined"."name" IS NULL + ) + OR ( + NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL + ) OR "joined"."price" <> "joined"."t_price" - OR ("joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL) - OR (NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL) + OR ( + "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL + ) + OR ( + NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL + ) ) - ) - SELECT - CAST("id_a" AS VARCHAR) AS "id_a", - CAST("id_b" AS VARCHAR) AS "id_b", - CAST("name" AS VARCHAR) AS "name", - CAST("price" AS DOUBLE) AS "price", - CAST("test_VALID_from" AS TIMESTAMP) AS "test_VALID_from", - CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" - FROM ( - SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "static" - UNION ALL SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "updated_rows" - UNION ALL SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "inserted_rows" - ) AS "_subquery" - """ +) +SELECT CAST("id_a" AS VARCHAR) AS "id_a", CAST("id_b" AS VARCHAR) AS "id_b", CAST("name" AS VARCHAR) AS "name", CAST("price" AS DOUBLE) AS "price", CAST("test_VALID_from" AS TIMESTAMP) AS "test_VALID_from", CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" FROM (SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "static" UNION ALL SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "updated_rows" UNION ALL SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "inserted_rows") AS "_subquery" + """ ).sql() ) @@ -2190,7 +2181,6 @@ def test_scd_type_2_truncate(make_mocked_engine_adapter: t.Callable): }, execution_time=datetime(2020, 1, 1, 0, 0, 0), truncate=True, - start=datetime(2020, 1, 1, 0, 0, 0), ) assert ( @@ -2373,70 +2363,70 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable) "test_valid_to": 
exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), - start=datetime(2020, 1, 1, 0, 0, 0), - is_restatement=True, ) assert ( - parse_one(adapter.cursor.execute.call_args[0][0]).sql() + adapter.cursor.execute.call_args[0][0] == parse_one( """ - CREATE OR REPLACE TABLE "target" AS - WITH "source" AS ( - SELECT DISTINCT ON ("id") +CREATE OR REPLACE TABLE "target" AS +WITH "source" AS ( + SELECT DISTINCT ON ("id") TRUE AS "_exists", "id", "name", "price" - FROM ( + FROM ( SELECT "id", "name", "price" FROM "source" - ) AS "raw_source" - ), "static" AS ( - SELECT + ) AS "raw_source" +), "static" AS ( + SELECT "id", "name", "price", "test_valid_from", "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00' AS TIMESTAMP) - ), "latest" AS ( - SELECT + FROM "target" + WHERE + NOT "test_valid_to" IS NULL +), "latest" AS ( + SELECT "id", "name", "price", "test_valid_from", - CAST(NULL AS TIMESTAMP) AS "test_valid_to", + "test_valid_to", TRUE AS "_exists" - FROM "target" - WHERE "test_valid_from" <= CAST('2020-01-01 00:00:00' AS TIMESTAMP) - AND ("test_valid_to" IS NULL OR "test_valid_to" >= CAST('2020-01-01 00:00:00' AS TIMESTAMP)) - ), "deleted" AS ( - SELECT + FROM "target" + WHERE + "test_valid_to" IS NULL +), "deleted" AS ( + SELECT "static"."id", "static"."name", "static"."price", "static"."test_valid_from", "static"."test_valid_to" - FROM "static" - LEFT JOIN "latest" + FROM "static" + LEFT JOIN "latest" ON "static"."id" = "latest"."id" - WHERE "latest"."test_valid_to" IS NULL - ), "latest_deleted" AS ( - SELECT + WHERE + "latest"."test_valid_to" IS NULL +), "latest_deleted" AS ( + SELECT TRUE AS "_exists", "id" AS "_key0", MAX("test_valid_to") AS "test_valid_to" - FROM "deleted" - GROUP BY + FROM "deleted" + GROUP BY "id" - ), "joined" AS ( - SELECT +), "joined" AS ( + SELECT "source"."_exists" AS "_exists", "latest"."id" AS "t_id", "latest"."name" AS "t_name", 
@@ -2446,11 +2436,11 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable) "source"."id" AS "id", "source"."name" AS "name", "source"."price" AS "price" - FROM "latest" - LEFT JOIN "source" + FROM "latest" + LEFT JOIN "source" ON "latest"."id" = "source"."id" - UNION ALL - SELECT + UNION ALL + SELECT "source"."_exists" AS "_exists", "latest"."id" AS "t_id", "latest"."name" AS "t_name", @@ -2460,12 +2450,13 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable) "source"."id" AS "id", "source"."name" AS "name", "source"."price" AS "price" - FROM "latest" - RIGHT JOIN "source" + FROM "latest" + RIGHT JOIN "source" ON "latest"."id" = "source"."id" - WHERE "latest"."_exists" IS NULL - ), "updated_rows" AS ( - SELECT + WHERE + "latest"."_exists" IS NULL +), "updated_rows" AS ( + SELECT COALESCE("joined"."t_id", "joined"."id") AS "id", COALESCE("joined"."t_name", "joined"."name") AS "name", COALESCE("joined"."t_price", "joined"."price") AS "price", @@ -2473,59 +2464,77 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable) CASE WHEN "joined"."_exists" IS NULL OR ( - (NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL) - AND ( - "joined"."id" <> "joined"."t_id" - OR ("joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL) - OR (NOT "joined"."t_id" IS NULL AND "joined"."id" IS NULL) - OR "joined"."name" <> "joined"."t_name" - OR ("joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL) - OR (NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL) - OR "joined"."price" <> "joined"."t_price" - OR ("joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL) - OR (NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL) - ) + ( + NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL + ) + AND ( + "joined"."id" <> "joined"."t_id" + OR ( + "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL + ) + OR ( + NOT "joined"."t_id" IS NULL AND "joined"."id" IS NULL + ) + OR 
"joined"."name" <> "joined"."t_name" + OR ( + "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL + ) + OR ( + NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL + ) + OR "joined"."price" <> "joined"."t_price" + OR ( + "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL + ) + OR ( + NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL + ) + ) ) THEN CAST('2020-01-01 00:00:00' AS TIMESTAMP) ELSE "t_test_valid_to" END AS "test_valid_to" - FROM "joined" - LEFT JOIN "latest_deleted" + FROM "joined" + LEFT JOIN "latest_deleted" ON "joined"."id" = "latest_deleted"."_key0" - ), "inserted_rows" AS ( - SELECT +), "inserted_rows" AS ( + SELECT "id", "name", "price", CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS "test_valid_from", CAST(NULL AS TIMESTAMP) AS "test_valid_to" - FROM "joined" - WHERE - (NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL) + FROM "joined" + WHERE + ( + NOT "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL + ) AND ( "joined"."id" <> "joined"."t_id" - OR ("joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL) - OR (NOT "joined"."t_id" IS NULL AND "joined"."id" IS NULL) + OR ( + "joined"."t_id" IS NULL AND NOT "joined"."id" IS NULL + ) + OR ( + NOT "joined"."t_id" IS NULL AND "joined"."id" IS NULL + ) OR "joined"."name" <> "joined"."t_name" - OR ("joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL) - OR (NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL) + OR ( + "joined"."t_name" IS NULL AND NOT "joined"."name" IS NULL + ) + OR ( + NOT "joined"."t_name" IS NULL AND "joined"."name" IS NULL + ) OR "joined"."price" <> "joined"."t_price" - OR ("joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL) - OR (NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL) + OR ( + "joined"."t_price" IS NULL AND NOT "joined"."price" IS NULL + ) + OR ( + NOT "joined"."t_price" IS NULL AND "joined"."price" IS NULL + ) ) - ) - SELECT - CAST("id" AS INT) AS "id", - CAST("name" AS VARCHAR) AS "name", - 
CAST("price" AS DOUBLE) AS "price", - CAST("test_valid_from" AS TIMESTAMP) AS "test_valid_from", - CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" - FROM ( - SELECT "id", "name", "price", "test_valid_from", "test_valid_to" FROM "static" - UNION ALL SELECT "id", "name", "price", "test_valid_from", "test_valid_to" FROM "updated_rows" - UNION ALL SELECT "id", "name", "price", "test_valid_from", "test_valid_to" FROM "inserted_rows" - ) AS "_subquery" - """ +) +SELECT CAST("id" AS INT) AS "id", CAST("name" AS VARCHAR) AS "name", CAST("price" AS DOUBLE) AS "price", CAST("test_valid_from" AS TIMESTAMP) AS "test_valid_from", CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" FROM (SELECT "id", "name", "price", "test_valid_from", "test_valid_to" FROM "static" UNION ALL SELECT "id", "name", "price", "test_valid_from", "test_valid_to" FROM "updated_rows" UNION ALL SELECT "id", "name", "price", "test_valid_from", "test_valid_to" FROM "inserted_rows") AS "_subquery" + """ ).sql() ) @@ -2549,12 +2558,10 @@ def test_scd_type_2_by_column_no_invalidate_hard_deletes(make_mocked_engine_adap "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), - start=datetime(2020, 1, 1, 0, 0, 0), - is_restatement=True, ) assert ( - parse_one(adapter.cursor.execute.call_args[0][0]).sql() + adapter.cursor.execute.call_args[0][0] == parse_one( """ CREATE OR REPLACE TABLE "target" AS @@ -2580,18 +2587,19 @@ def test_scd_type_2_by_column_no_invalidate_hard_deletes(make_mocked_engine_adap "test_valid_to", TRUE AS "_exists" FROM "target" - WHERE NOT "test_valid_to" IS NULL AND "test_valid_to" < CAST('2020-01-01 00:00:00' AS TIMESTAMP) + WHERE + NOT "test_valid_to" IS NULL ), "latest" AS ( SELECT "id", "name", "price", "test_valid_from", - CAST(NULL AS TIMESTAMP) AS "test_valid_to", + "test_valid_to", TRUE AS "_exists" FROM "target" - WHERE "test_valid_from" <= CAST('2020-01-01 00:00:00' AS TIMESTAMP) - AND ("test_valid_to" IS NULL OR "test_valid_to" >= 
CAST('2020-01-01 00:00:00' AS TIMESTAMP)) + WHERE + "test_valid_to" IS NULL ), "deleted" AS ( SELECT "static"."id", diff --git a/tests/core/engine_adapter/test_clickhouse.py b/tests/core/engine_adapter/test_clickhouse.py index a0cd33af70..973e178820 100644 --- a/tests/core/engine_adapter/test_clickhouse.py +++ b/tests/core/engine_adapter/test_clickhouse.py @@ -612,8 +612,6 @@ def test_scd_type_2_by_time( "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), - start=datetime(2020, 1, 1, 0, 0, 0), - truncate=True, ) assert to_sql_calls(adapter)[3] == parse_one( @@ -645,7 +643,7 @@ def test_scd_type_2_by_time( TRUE AS "_exists" FROM ""__temp_target_efgh"" WHERE - NOT "test_valid_to" IS NULL LIMIT 0 + NOT "test_valid_to" IS NULL ), "latest" AS ( SELECT "id", @@ -657,7 +655,7 @@ def test_scd_type_2_by_time( TRUE AS "_exists" FROM ""__temp_target_efgh"" WHERE - "test_valid_to" IS NULL LIMIT 0 + "test_valid_to" IS NULL ), "deleted" AS ( SELECT "static"."id", @@ -825,8 +823,6 @@ def test_scd_type_2_by_column( "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), - start=datetime(2020, 1, 1, 0, 0, 0), - truncate=True, ) assert to_sql_calls(adapter)[3] == parse_one( @@ -856,7 +852,7 @@ def test_scd_type_2_by_column( TRUE AS "_exists" FROM "__temp_target_efgh" WHERE - NOT ("test_valid_to" IS NULL) LIMIT 0 + NOT "test_valid_to" IS NULL ), "latest" AS ( SELECT "id", @@ -867,7 +863,7 @@ def test_scd_type_2_by_column( TRUE AS "_exists" FROM "__temp_target_efgh" WHERE - "test_valid_to" IS NULL LIMIT 0 + "test_valid_to" IS NULL ), "deleted" AS ( SELECT "static"."id", @@ -923,7 +919,7 @@ def test_scd_type_2_by_column( COALESCE("joined"."t_id", "joined"."id") AS "id", COALESCE("joined"."t_name", "joined"."name") AS "name", COALESCE("joined"."t_price", "joined"."price") AS "price", - COALESCE("t_test_VALID_from", CAST('1970-01-01 00:00:00' AS Nullable(DateTime64(6)))) AS "test_VALID_from", + 
COALESCE("t_test_VALID_from", CAST('2020-01-01 00:00:00' AS Nullable(DateTime64(6)))) AS "test_VALID_from", CASE WHEN "joined"."_exists" IS NULL OR ( diff --git a/tests/core/engine_adapter/test_spark.py b/tests/core/engine_adapter/test_spark.py index 2ef70a6929..468de9f75a 100644 --- a/tests/core/engine_adapter/test_spark.py +++ b/tests/core/engine_adapter/test_spark.py @@ -591,8 +591,6 @@ def check_table_exists(table_name: exp.Table) -> bool: "test_valid_to": exp.DataType.build("TIMESTAMP"), }, execution_time=datetime(2020, 1, 1, 0, 0, 0), - start=datetime(2020, 1, 1, 0, 0, 0), - truncate=True, ) assert to_sql_calls(adapter) == [ @@ -637,7 +635,7 @@ def check_table_exists(table_name: exp.Table) -> bool: TRUE AS `_exists` FROM `db`.`temp_target_abcdefgh` WHERE - NOT `test_valid_to` IS NULL LIMIT 0 + NOT `test_valid_to` IS NULL ), `latest` AS ( SELECT `id`, @@ -649,7 +647,7 @@ def check_table_exists(table_name: exp.Table) -> bool: TRUE AS `_exists` FROM `db`.`temp_target_abcdefgh` WHERE - `test_valid_to` IS NULL LIMIT 0 + `test_valid_to` IS NULL ), `deleted` AS ( SELECT `static`.`id`, diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 5a0e7bdf48..856406a16d 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -6859,169 +6859,6 @@ def plan_with_output(ctx: Context, environment: str): assert context_diff.environment == environment -@time_machine.travel("2023-01-08 15:00:00 UTC") -def test_scd_type_2_restatement(init_and_plan_context: t.Callable): - context, plan = init_and_plan_context("examples/sushi") - context.apply(plan) - - raw_employee_status = d.parse(""" - MODEL ( - name memory.hr_system.raw_employee_status, - kind FULL - ); - - SELECT - 1001 AS employee_id, - 'engineering' AS department, - 'EMEA' AS region, - '2023-01-08 15:00:00 UTC' AS last_modified; - """) - - # Create SCD Type 2 model for employee history tracking - employee_history = d.parse(""" - MODEL ( - name 
memory.hr_system.employee_history, - kind SCD_TYPE_2_BY_TIME ( - unique_key employee_id, - updated_at_name last_modified, - disable_restatement false - ), - owner hr_analytics, - cron '*/5 * * * *', - grain employee_id, - description 'Historical tracking of employee status changes' - ); - - SELECT - employee_id::INT AS employee_id, - department::TEXT AS department, - region::TEXT AS region, - last_modified AS last_modified - FROM - memory.hr_system.raw_employee_status; - """) - - raw_employee_status_model = load_sql_based_model(raw_employee_status) - employee_history_model = load_sql_based_model(employee_history) - context.upsert_model(raw_employee_status_model) - context.upsert_model(employee_history_model) - - # Initial plan and apply - plan = context.plan_builder("prod", skip_tests=True).build() - context.apply(plan) - - query = "SELECT employee_id, department, region, valid_from, valid_to FROM memory.hr_system.employee_history ORDER BY employee_id, valid_from" - initial_data = context.engine_adapter.fetchdf(query) - - assert len(initial_data) == 1 - assert initial_data["valid_to"].isna().all() - assert initial_data["department"].tolist() == ["engineering"] - assert initial_data["region"].tolist() == ["EMEA"] - - # Apply a future plan with source changes - with time_machine.travel("2023-01-08 15:10:00 UTC"): - # Update source model, employee 1001 changed region - raw_employee_status_v2 = d.parse(""" - MODEL ( - name memory.hr_system.raw_employee_status, - kind FULL - ); - - SELECT - 1001 AS employee_id, - 'engineering' AS department, - 'AMER' AS region, - '2023-01-08 15:10:00 UTC' AS last_modified; - """) - raw_employee_status_v2_model = load_sql_based_model(raw_employee_status_v2) - context.upsert_model(raw_employee_status_v2_model) - context.plan( - auto_apply=True, no_prompts=True, categorizer_config=CategorizerConfig.all_full() - ) - - with time_machine.travel("2023-01-08 15:20:00 UTC"): - context.run() - data_after_change = 
context.engine_adapter.fetchdf(query) - - # Validate the SCD2 history for employee 1001 - assert len(data_after_change) == 2 - assert data_after_change.iloc[0]["employee_id"] == 1001 - assert data_after_change.iloc[0]["department"] == "engineering" - assert data_after_change.iloc[0]["region"] == "EMEA" - assert str(data_after_change.iloc[0]["valid_from"]) == "1970-01-01 00:00:00" - assert str(data_after_change.iloc[0]["valid_to"]) == "2023-01-08 15:10:00" - assert data_after_change.iloc[1]["employee_id"] == 1001 - assert data_after_change.iloc[1]["department"] == "engineering" - assert data_after_change.iloc[1]["region"] == "AMER" - assert str(data_after_change.iloc[1]["valid_from"]) == "2023-01-08 15:10:00" - assert pd.isna(data_after_change.iloc[1]["valid_to"]) - - # Update source model, employee 1001 changed region again and department - raw_employee_status_v2 = d.parse(""" - MODEL ( - name memory.hr_system.raw_employee_status, - kind FULL - ); - - SELECT - 1001 AS employee_id, - 'sales' AS department, - 'ANZ' AS region, - '2023-01-08 15:26:00 UTC' AS last_modified; - """) - raw_employee_status_v2_model = load_sql_based_model(raw_employee_status_v2) - context.upsert_model(raw_employee_status_v2_model) - context.plan( - auto_apply=True, no_prompts=True, categorizer_config=CategorizerConfig.all_full() - ) - - with time_machine.travel("2023-01-08 15:35:00 UTC"): - context.run() - data_after_change = context.engine_adapter.fetchdf(query) - - # Validate the SCD2 history for employee 1001 after second change - assert len(data_after_change) == 3 - assert data_after_change.iloc[0]["employee_id"] == 1001 - assert data_after_change.iloc[0]["department"] == "engineering" - assert data_after_change.iloc[0]["region"] == "EMEA" - assert str(data_after_change.iloc[0]["valid_from"]) == "1970-01-01 00:00:00" - assert str(data_after_change.iloc[0]["valid_to"]) == "2023-01-08 15:10:00" - assert data_after_change.iloc[1]["employee_id"] == 1001 - assert 
data_after_change.iloc[1]["department"] == "engineering" - assert data_after_change.iloc[1]["region"] == "AMER" - assert str(data_after_change.iloc[1]["valid_from"]) == "2023-01-08 15:10:00" - assert str(data_after_change.iloc[1]["valid_to"]) == "2023-01-08 15:26:00" - assert data_after_change.iloc[2]["employee_id"] == 1001 - assert data_after_change.iloc[2]["department"] == "sales" - assert data_after_change.iloc[2]["region"] == "ANZ" - assert str(data_after_change.iloc[2]["valid_from"]) == "2023-01-08 15:26:00" - assert pd.isna(data_after_change.iloc[2]["valid_to"]) - - # Now test restatement cleanup by restating from 15:10 (first change) - with time_machine.travel("2023-01-08 15:38:00 UTC"): - plan = context.plan_builder( - "prod", - skip_tests=True, - restate_models=["memory.hr_system.employee_history"], - start="2023-01-08 15:09:00", - ).build() - context.apply(plan) - restated_data = context.engine_adapter.fetchdf(query) - - # Validate the SCD2 history after restatement - assert len(restated_data) == 2 - assert restated_data.iloc[0]["employee_id"] == 1001 - assert restated_data.iloc[0]["department"] == "engineering" - assert restated_data.iloc[0]["region"] == "EMEA" - assert str(restated_data.iloc[0]["valid_from"]) == "1970-01-01 00:00:00" - assert str(restated_data.iloc[0]["valid_to"]) == "2023-01-08 15:26:00" - assert restated_data.iloc[1]["employee_id"] == 1001 - assert restated_data.iloc[1]["department"] == "sales" - assert restated_data.iloc[1]["region"] == "ANZ" - assert str(restated_data.iloc[1]["valid_from"]) == "2023-01-08 15:26:00" - assert pd.isna(restated_data.iloc[1]["valid_to"]) - - @time_machine.travel("2020-01-01 00:00:00 UTC") def test_scd_type_2_full_restatement_no_start_date(init_and_plan_context: t.Callable): context, plan = init_and_plan_context("examples/sushi") @@ -7348,7 +7185,7 @@ def test_scd_type_2_regular_run_with_offset(init_and_plan_context: t.Callable): assert str(data_after_change.iloc[2]["valid_from"]) == "2023-01-09 07:26:00" 
assert pd.isna(data_after_change.iloc[2]["valid_to"]) - # Now test restatement still works as expected by restating from 2023-01-09 00:10:00 (first change) + # Now test restatement works (full restatement support currently) with time_machine.travel("2023-01-10 07:38:00 UTC"): plan = context.plan_builder( "prod", @@ -7359,18 +7196,13 @@ def test_scd_type_2_regular_run_with_offset(init_and_plan_context: t.Callable): context.apply(plan) restated_data = context.engine_adapter.fetchdf(query) - # Validate the SCD2 history after restatement - assert len(restated_data) == 2 + # Validate the SCD2 history after restatement has been wiped bar one + assert len(restated_data) == 1 assert restated_data.iloc[0]["employee_id"] == 1001 - assert restated_data.iloc[0]["department"] == "engineering" - assert restated_data.iloc[0]["region"] == "EMEA" + assert restated_data.iloc[0]["department"] == "sales" + assert restated_data.iloc[0]["region"] == "ANZ" assert str(restated_data.iloc[0]["valid_from"]) == "1970-01-01 00:00:00" - assert str(restated_data.iloc[0]["valid_to"]) == "2023-01-09 07:26:00" - assert restated_data.iloc[1]["employee_id"] == 1001 - assert restated_data.iloc[1]["department"] == "sales" - assert restated_data.iloc[1]["region"] == "ANZ" - assert str(restated_data.iloc[1]["valid_from"]) == "2023-01-09 07:26:00" - assert pd.isna(restated_data.iloc[1]["valid_to"]) + assert pd.isna(restated_data.iloc[0]["valid_to"]) def test_engine_adapters_multi_repo_all_gateways_gathered(copy_to_temp_path): diff --git a/tests/core/test_model.py b/tests/core/test_model.py index 6f3bd0bc7e..6bc5388468 100644 --- a/tests/core/test_model.py +++ b/tests/core/test_model.py @@ -10441,9 +10441,9 @@ def test_signal_always_true(batch, arg1, arg2): def test_scd_type_2_full_history_restatement(): - assert ModelKindName.SCD_TYPE_2.full_history_restatement_only is False - assert ModelKindName.SCD_TYPE_2_BY_TIME.full_history_restatement_only is False - assert 
ModelKindName.SCD_TYPE_2_BY_COLUMN.full_history_restatement_only is False + assert ModelKindName.SCD_TYPE_2.full_history_restatement_only is True + assert ModelKindName.SCD_TYPE_2_BY_TIME.full_history_restatement_only is True + assert ModelKindName.SCD_TYPE_2_BY_COLUMN.full_history_restatement_only is True assert ModelKindName.INCREMENTAL_BY_TIME_RANGE.full_history_restatement_only is False diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index a3c7837711..6c0763892e 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -2049,8 +2049,6 @@ def test_insert_into_scd_type_2_by_time( column_descriptions={}, updated_at_as_valid_from=False, truncate=truncate, - is_restatement=False, - start="2020-01-01", ) adapter_mock.columns.assert_called_once_with(snapshot.table_name()) @@ -2223,8 +2221,6 @@ def test_insert_into_scd_type_2_by_column( table_description=None, column_descriptions={}, truncate=truncate, - is_restatement=False, - start="2020-01-01", ) adapter_mock.columns.assert_called_once_with(snapshot.table_name())