diff --git a/docs/integrations/dbt.md b/docs/integrations/dbt.md index 4342f47779..7cbef5b8fa 100644 --- a/docs/integrations/dbt.md +++ b/docs/integrations/dbt.md @@ -219,7 +219,7 @@ This section describes how to adapt dbt's incremental models to run on sqlmesh a SQLMesh supports two approaches to implement [idempotent](../concepts/glossary.md#idempotency) incremental loads: * Using merge (with the sqlmesh [`INCREMENTAL_BY_UNIQUE_KEY` model kind](../concepts/models/model_kinds.md#incremental_by_unique_key)) -* Using insert-overwrite/delete+insert (with the sqlmesh [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range)) +* Using the sqlmesh [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range) #### Incremental by unique key @@ -233,28 +233,22 @@ To enable incremental_by_unique_key incrementality, the model configuration shou #### Incremental by time range -To enable incremental_by_time_range incrementality, the model configuration should contain: +To enable incremental_by_time_range incrementality, the model configuration must contain: -* The `time_column` key with the model's time column field name as the value (see [`time column`](../concepts/models/model_kinds.md#time-column) for details) * The `materialized` key with value `'incremental'` -* Either: - * The `incremental_strategy` key with value `'insert_overwrite'` or - * The `incremental_strategy` key with value `'delete+insert'` - * Note: in this context, these two strategies are synonyms. Regardless of which one is specified SQLMesh will use the [`best incremental strategy`](../concepts/models/model_kinds.md#materialization-strategy) for the target engine. 
+* The `incremental_strategy` key with value `'incremental_by_time_range'` +* The `time_column` key with the model's time column field name as the value (see [`time column`](../concepts/models/model_kinds.md#time-column) for details) ### Incremental logic -SQLMesh requires a new jinja block gated by `{% if sqlmesh_incremental is defined %}`. The new block should supersede the existing `{% if is_incremental() %}` block and contain the `WHERE` clause selecting the time interval. +Unlike dbt incremental strategies, SQLMesh does not require the use of `is_incremental` jinja blocks to implement incremental logic. +Instead, SQLMesh provides predefined time macro variables that can be used in the model's SQL to filter data based on the time column. -For example, the SQL `WHERE` clause with the "ds" column goes in a new jinja block gated by `{% if sqlmesh_incremental is defined %}` as follows: +For example, a SQL `WHERE` clause that filters on the "ds" column is written as follows: ```bash -> {% if sqlmesh_incremental is defined %} > WHERE > ds BETWEEN '{{ start_ds }}' AND '{{ end_ds }}' -> {% elif is_incremental() %} -> ; < your existing is_incremental block > -> {% endif %} ``` `{{ start_ds }}` and `{{ end_ds }}` are the jinja equivalents of SQLMesh's `@start_ds` and `@end_ds` predefined time macro variables. See all [predefined time variables](../concepts/macros/macro_variables.md) available in jinja. @@ -263,13 +257,11 @@ For example, the SQL `WHERE` clause with the "ds" column goes in a new jinja blo SQLMesh provides configuration parameters that enable control over how incremental computations occur. These parameters are set in the model's `config` block. -The [`batch_size` parameter](../concepts/models/overview.md#batch_size) determines the maximum number of time intervals to run in a single job. - -The [`lookback` parameter](../concepts/models/overview.md#lookback) is used to capture late arriving data. It sets the number of units of late arriving data the model should expect and must be a positive integer. 
+See [Incremental Model Properties](../concepts/models/overview.md#incremental-model-properties) for the full list of incremental model configuration parameters. **Note:** By default, all incremental dbt models are configured to be [forward-only](../concepts/plans.md#forward-only-plans). However, you can change this behavior by setting the `forward_only: false` setting either in the configuration of an individual model or globally for all models in the `dbt_project.yaml` file. The [forward-only](../concepts/plans.md#forward-only-plans) mode aligns more closely with the typical operation of dbt and therefore better meets user's expectations. -Similarly, the [allow_partials](../concepts/models/overview.md#allow_partials) parameter is set to `true` by default for incremental dbt models unless the time column is specified, or the `allow_partials` parameter is explicitly set to `false` in the model configuration. +Similarly, the [allow_partials](../concepts/models/overview.md#allow_partials) parameter is set to `true` by default unless it is explicitly set to `false` in the model configuration. 
#### on_schema_change diff --git a/examples/sushi_dbt/models/customer_revenue_by_day.sql b/examples/sushi_dbt/models/customer_revenue_by_day.sql index f3f49cfc14..9810481eff 100644 --- a/examples/sushi_dbt/models/customer_revenue_by_day.sql +++ b/examples/sushi_dbt/models/customer_revenue_by_day.sql @@ -1,7 +1,7 @@ {{ config( materialized='incremental', - incremental_strategy='delete+insert', + incremental_strategy='incremental_by_time_range', cluster_by=['ds'], time_column='ds', ) diff --git a/examples/sushi_dbt/models/waiter_as_customer_by_day.sql b/examples/sushi_dbt/models/waiter_as_customer_by_day.sql index 3d4967aec7..a1145c2b5c 100644 --- a/examples/sushi_dbt/models/waiter_as_customer_by_day.sql +++ b/examples/sushi_dbt/models/waiter_as_customer_by_day.sql @@ -1,7 +1,7 @@ {{ config( materialized='incremental', - incremental_strategy='delete+insert', + incremental_strategy='incremental_by_time_range', cluster_by=['ds'], time_column='ds', ) diff --git a/examples/sushi_dbt/models/waiter_revenue_by_day.sql b/examples/sushi_dbt/models/waiter_revenue_by_day.sql index d430c6125b..670e238962 100644 --- a/examples/sushi_dbt/models/waiter_revenue_by_day.sql +++ b/examples/sushi_dbt/models/waiter_revenue_by_day.sql @@ -1,7 +1,7 @@ {{ config( materialized='incremental', - incremental_strategy='delete+insert', + incremental_strategy='incremental_by_time_range', cluster_by=['ds'], time_column='ds', ) diff --git a/examples/sushi_dbt/models/waiter_revenue_by_day_v1.sql b/examples/sushi_dbt/models/waiter_revenue_by_day_v1.sql index d430c6125b..670e238962 100644 --- a/examples/sushi_dbt/models/waiter_revenue_by_day_v1.sql +++ b/examples/sushi_dbt/models/waiter_revenue_by_day_v1.sql @@ -1,7 +1,7 @@ {{ config( materialized='incremental', - incremental_strategy='delete+insert', + incremental_strategy='incremental_by_time_range', cluster_by=['ds'], time_column='ds', ) diff --git a/sqlmesh/dbt/model.py b/sqlmesh/dbt/model.py index 9997d464ae..a4ebf93ae5 100644 --- 
a/sqlmesh/dbt/model.py +++ b/sqlmesh/dbt/model.py @@ -25,7 +25,14 @@ ManagedKind, create_sql_model, ) -from sqlmesh.core.model.kind import SCDType2ByTimeKind, OnDestructiveChange, OnAdditiveChange +from sqlmesh.core.model.kind import ( + SCDType2ByTimeKind, + OnDestructiveChange, + OnAdditiveChange, + on_destructive_change_validator, + on_additive_change_validator, + TimeColumn, +) from sqlmesh.dbt.basemodel import BaseModelConfig, Materialization, SnapshotStrategy from sqlmesh.dbt.common import SqlStr, sql_str_validator from sqlmesh.utils.errors import ConfigError @@ -41,7 +48,9 @@ logger = logging.getLogger(__name__) -INCREMENTAL_BY_TIME_STRATEGIES = set(["delete+insert", "insert_overwrite", "microbatch"]) +INCREMENTAL_BY_TIME_RANGE_STRATEGIES = set( + ["delete+insert", "insert_overwrite", "microbatch", "incremental_by_time_range"] +) INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES = set(["merge"]) @@ -77,7 +86,7 @@ class ModelConfig(BaseModelConfig): # sqlmesh fields sql: SqlStr = SqlStr("") - time_column: t.Optional[str] = None + time_column: t.Optional[TimeColumn] = None cron: t.Optional[str] = None interval_unit: t.Optional[str] = None batch_concurrency: t.Optional[int] = None @@ -87,6 +96,9 @@ class ModelConfig(BaseModelConfig): physical_version: t.Optional[str] = None auto_restatement_cron: t.Optional[str] = None auto_restatement_intervals: t.Optional[int] = None + partition_by_time_column: t.Optional[bool] = None + on_destructive_change: t.Optional[OnDestructiveChange] = None + on_additive_change: t.Optional[OnAdditiveChange] = None # DBT configuration fields cluster_by: t.Optional[t.List[str]] = None @@ -139,6 +151,9 @@ class ModelConfig(BaseModelConfig): incremental_predicates: t.Optional[t.List[str]] = None _sql_validator = sql_str_validator + _on_destructive_change_validator = on_destructive_change_validator + _on_additive_change_validator = on_additive_change_validator + _time_column_validator = TimeColumn.validator() @field_validator( "unique_key", @@ -230,17 
+245,6 @@ def snapshot_strategy(self) -> t.Optional[SnapshotStrategy]: def table_schema(self) -> str: return self.target_schema or super().table_schema - def _get_overlapping_field_value( - self, context: DbtContext, dbt_field_name: str, sqlmesh_field_name: str - ) -> t.Optional[t.Any]: - dbt_field = self._get_field_value(dbt_field_name) - sqlmesh_field = getattr(self, sqlmesh_field_name, None) - if dbt_field is not None and sqlmesh_field is not None: - get_console().log_warning( - f"Both '{dbt_field_name}' and '{sqlmesh_field_name}' are set for model '{self.canonical_name(context)}'. '{sqlmesh_field_name}' will be used." - ) - return sqlmesh_field if sqlmesh_field is not None else dbt_field - def model_kind(self, context: DbtContext) -> ModelKind: """ Get the sqlmesh ModelKind @@ -275,8 +279,12 @@ def model_kind(self, context: DbtContext) -> ModelKind: "Valid values are 'ignore', 'fail', 'append_new_columns', 'sync_all_columns'." ) - incremental_kind_kwargs["on_destructive_change"] = on_destructive_change - incremental_kind_kwargs["on_additive_change"] = on_additive_change + incremental_kind_kwargs["on_destructive_change"] = ( + self._get_field_value("on_destructive_change") or on_destructive_change + ) + incremental_kind_kwargs["on_additive_change"] = ( + self._get_field_value("on_additive_change") or on_additive_change + ) auto_restatement_cron_value = self._get_field_value("auto_restatement_cron") if auto_restatement_cron_value is not None: incremental_kind_kwargs["auto_restatement_cron"] = auto_restatement_cron_value @@ -292,7 +300,8 @@ def model_kind(self, context: DbtContext) -> ModelKind: incremental_kind_kwargs["forward_only"] = forward_only_value is_incremental_by_time_range = self.time_column or ( - self.incremental_strategy and self.incremental_strategy == "microbatch" + self.incremental_strategy + and self.incremental_strategy in {"microbatch", "incremental_by_time_range"} ) # Get shared incremental by kwargs for field in ("batch_size", 
"batch_concurrency", "lookback"): @@ -313,22 +322,29 @@ def model_kind(self, context: DbtContext) -> ModelKind: ) incremental_by_kind_kwargs["disable_restatement"] = disable_restatement - # Incremental by time range which includes microbatch if is_incremental_by_time_range: strategy = self.incremental_strategy or target.default_incremental_strategy( IncrementalByTimeRangeKind ) - if strategy not in INCREMENTAL_BY_TIME_STRATEGIES: + if strategy not in INCREMENTAL_BY_TIME_RANGE_STRATEGIES: get_console().log_warning( f"SQLMesh incremental by time strategy is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. " - f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_STRATEGIES)}." + f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_RANGE_STRATEGIES)}." ) - if strategy == "microbatch": - time_column = self._get_overlapping_field_value( - context, "event_time", "time_column" + if self.time_column and strategy != "incremental_by_time_range": + get_console().log_warning( + f"Using `time_column` on a model with incremental_strategy '{strategy}' has been deprecated. " + f"Please use `incremental_by_time_range` instead in model '{self.canonical_name(context)}'." ) + + if strategy == "microbatch": + if self.time_column: + raise ConfigError( + f"{self.canonical_name(context)}: 'time_column' cannot be used with 'microbatch' incremental strategy. Use 'event_time' instead." + ) + time_column = self._get_field_value("event_time") if not time_column: raise ConfigError( f"{self.canonical_name(context)}: 'event_time' is required for microbatch incremental strategy." 
@@ -342,11 +358,22 @@ def model_kind(self, context: DbtContext) -> ModelKind: ) time_column = self.time_column + incremental_by_time_range_kwargs = { + "time_column": time_column, + } + if self.auto_restatement_intervals: + incremental_by_time_range_kwargs["auto_restatement_intervals"] = ( + self.auto_restatement_intervals + ) + if self.partition_by_time_column is not None: + incremental_by_time_range_kwargs["partition_by_time_column"] = ( + self.partition_by_time_column + ) + return IncrementalByTimeRangeKind( - time_column=time_column, - auto_restatement_intervals=self.auto_restatement_intervals, **incremental_kind_kwargs, **incremental_by_kind_kwargs, + **incremental_by_time_range_kwargs, ) if self.unique_key: @@ -384,7 +411,7 @@ def model_kind(self, context: DbtContext) -> ModelKind: IncrementalUnmanagedKind ) return IncrementalUnmanagedKind( - insert_overwrite=strategy in INCREMENTAL_BY_TIME_STRATEGIES, + insert_overwrite=strategy in INCREMENTAL_BY_TIME_RANGE_STRATEGIES, disable_restatement=incremental_by_kind_kwargs["disable_restatement"], **incremental_kind_kwargs, ) diff --git a/tests/dbt/test_manifest.py b/tests/dbt/test_manifest.py index ba8971e9b2..1ea94cceb0 100644 --- a/tests/dbt/test_manifest.py +++ b/tests/dbt/test_manifest.py @@ -5,6 +5,7 @@ import pytest from sqlmesh.core.config import ModelDefaultsConfig +from sqlmesh.core.model import TimeColumn from sqlmesh.dbt.basemodel import Dependencies from sqlmesh.dbt.common import ModelAttrs from sqlmesh.dbt.context import DbtContext @@ -83,7 +84,7 @@ def test_manifest_helper(caplog): assert waiter_as_customer_by_day_config.materialized == "incremental" assert waiter_as_customer_by_day_config.incremental_strategy == "delete+insert" assert waiter_as_customer_by_day_config.cluster_by == ["ds"] - assert waiter_as_customer_by_day_config.time_column == "ds" + assert waiter_as_customer_by_day_config.time_column == TimeColumn.create("ds", "duckdb") if DBT_VERSION >= (1, 5, 0): waiter_revenue_by_day_config = 
models["waiter_revenue_by_day_v2"] @@ -105,7 +106,7 @@ def test_manifest_helper(caplog): assert waiter_revenue_by_day_config.materialized == "incremental" assert waiter_revenue_by_day_config.incremental_strategy == "delete+insert" assert waiter_revenue_by_day_config.cluster_by == ["ds"] - assert waiter_revenue_by_day_config.time_column == "ds" + assert waiter_revenue_by_day_config.time_column == TimeColumn.create("ds", "duckdb") assert waiter_revenue_by_day_config.dialect_ == "bigquery" assert helper.models("customers")["customers"].dependencies == Dependencies( diff --git a/tests/dbt/test_model.py b/tests/dbt/test_model.py index 9f4f75bdbd..5037a69d65 100644 --- a/tests/dbt/test_model.py +++ b/tests/dbt/test_model.py @@ -7,6 +7,7 @@ from sqlglot import exp from sqlmesh import Context from sqlmesh.core.model import TimeColumn, IncrementalByTimeRangeKind +from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange from sqlmesh.dbt.common import Dependencies from sqlmesh.dbt.context import DbtContext from sqlmesh.dbt.model import ModelConfig @@ -301,3 +302,148 @@ def test_load_microbatch_required_only( ) assert model.kind.batch_size == 1 assert model.depends_on_self is False + + +@pytest.mark.slow +def test_load_incremental_time_range_strategy_required_only( + tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project +) -> None: + project_dir, model_dir = create_empty_project() + # minimal config: only `materialized`, `incremental_strategy`, and `time_column` are set + incremental_time_range_contents = """ + {{ + config( + materialized='incremental', + incremental_strategy='incremental_by_time_range', + time_column='ds', + ) + }} + + SELECT 1 as cola, '2021-01-01' as ds + """ + incremental_time_range_model_file = model_dir / "incremental_time_range.sql" + with open(incremental_time_range_model_file, "w", encoding="utf-8") as f: + f.write(incremental_time_range_contents) + + snapshot_fqn = 
'"local"."main"."incremental_time_range"' + context = Context(paths=project_dir) + model = context.snapshots[snapshot_fqn].model + # Validate model-level attributes + assert model.start == "2025-01-01" + assert model.interval_unit.is_day + # Validate model kind attributes + assert isinstance(model.kind, IncrementalByTimeRangeKind) + assert model.kind.lookback == 1 + assert model.kind.time_column == TimeColumn( + column=exp.to_column("ds", quoted=True), format="%Y-%m-%d" + ) + assert model.kind.batch_size is None + assert model.depends_on_self is False + assert model.kind.auto_restatement_intervals is None + assert model.kind.partition_by_time_column is True + + +@pytest.mark.slow +def test_load_incremental_time_range_strategy_all_defined( + tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project +) -> None: + project_dir, model_dir = create_empty_project() + # config sets the time column format plus restatement, batching, lookback, and schema-change options explicitly + incremental_time_range_contents = """ + {{ + config( + materialized='incremental', + incremental_strategy='incremental_by_time_range', + time_column={ + 'column': 'ds', + 'format': '%Y%m%d' + }, + auto_restatement_intervals=3, + partition_by_time_column=false, + lookback=5, + batch_size=3, + batch_concurrency=2, + forward_only=true, + disable_restatement=true, + on_destructive_change='allow', + on_additive_change='error', + auto_restatement_cron='@hourly', + on_schema_change='ignore' + ) + }} + + SELECT 1 as cola, '2021-01-01' as ds + """ + incremental_time_range_model_file = model_dir / "incremental_time_range.sql" + with open(incremental_time_range_model_file, "w", encoding="utf-8") as f: + f.write(incremental_time_range_contents) + + snapshot_fqn = '"local"."main"."incremental_time_range"' + context = Context(paths=project_dir) + model = context.snapshots[snapshot_fqn].model + # Validate model-level attributes + assert model.start == "2025-01-01" + assert 
model.interval_unit.is_day + # Validate model kind attributes + assert isinstance(model.kind, IncrementalByTimeRangeKind) + # `on_schema_change` is ignored since the user explicitly overrode the values + assert model.kind.on_destructive_change == OnDestructiveChange.ALLOW + assert model.kind.on_additive_change == OnAdditiveChange.ERROR + assert model.kind.forward_only is True + assert model.kind.disable_restatement is True + assert model.kind.auto_restatement_cron == "@hourly" + assert model.kind.auto_restatement_intervals == 3 + assert model.kind.partition_by_time_column is False + assert model.kind.lookback == 5 + assert model.kind.time_column == TimeColumn( + column=exp.to_column("ds", quoted=True), format="%Y%m%d" + ) + assert model.kind.batch_size == 3 + assert model.kind.batch_concurrency == 2 + assert model.depends_on_self is False + + +@pytest.mark.slow +def test_load_deprecated_incremental_time_column( + tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project +) -> None: + project_dir, model_dir = create_empty_project() + # deprecated combination: `time_column` with the 'delete+insert' strategy (a deprecation warning is asserted below) + incremental_time_range_contents = """ + {{ + config( + materialized='incremental', + incremental_strategy='delete+insert', + time_column='ds' + ) + }} + + SELECT 1 as cola, '2021-01-01' as ds + """ + incremental_time_range_model_file = model_dir / "incremental_time_range.sql" + with open(incremental_time_range_model_file, "w", encoding="utf-8") as f: + f.write(incremental_time_range_contents) + + snapshot_fqn = '"local"."main"."incremental_time_range"' + context = Context(paths=project_dir) + model = context.snapshots[snapshot_fqn].model + # Validate model-level attributes + assert model.start == "2025-01-01" + assert model.interval_unit.is_day + # NOTE(review): the next two assertions duplicate the two directly above + assert model.start == "2025-01-01" + assert model.interval_unit.is_day + # 
Validate model kind attributes + assert 
isinstance(model.kind, IncrementalByTimeRangeKind) + assert model.kind.lookback == 1 + assert model.kind.time_column == TimeColumn( + column=exp.to_column("ds", quoted=True), format="%Y-%m-%d" + ) + assert model.kind.batch_size is None + assert model.depends_on_self is False + assert model.kind.auto_restatement_intervals is None + assert model.kind.partition_by_time_column is True + assert ( + "Using `time_column` on a model with incremental_strategy 'delete+insert' has been deprecated. Please use `incremental_by_time_range` instead in model 'main.incremental_time_range'." + in caplog.text + )