Merged
24 changes: 8 additions & 16 deletions docs/integrations/dbt.md
@@ -219,7 +219,7 @@ This section describes how to adapt dbt's incremental models to run on sqlmesh a
SQLMesh supports two approaches to implement [idempotent](../concepts/glossary.md#idempotency) incremental loads:

* Using merge (with the sqlmesh [`INCREMENTAL_BY_UNIQUE_KEY` model kind](../concepts/models/model_kinds.md#incremental_by_unique_key))
* Using insert-overwrite/delete+insert (with the sqlmesh [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range))
* Using [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range)

#### Incremental by unique key

@@ -233,28 +233,22 @@ To enable incremental_by_unique_key incrementality, the model configuration shou

#### Incremental by time range

To enable incremental_by_time_range incrementality, the model configuration should contain:
To enable incremental_by_time_range incrementality, the model configuration must contain:

* The `time_column` key with the model's time column field name as the value (see [`time column`](../concepts/models/model_kinds.md#time-column) for details)
* The `materialized` key with value `'incremental'`
* Either:
* The `incremental_strategy` key with value `'insert_overwrite'` or
* The `incremental_strategy` key with value `'delete+insert'`
* Note: in this context, these two strategies are synonyms. Regardless of which one is specified SQLMesh will use the [`best incremental strategy`](../concepts/models/model_kinds.md#materialization-strategy) for the target engine.
* The `incremental_strategy` key with the value `incremental_by_time_range`
* The `time_column` key with the model's time column field name as the value (see [`time column`](../concepts/models/model_kinds.md#time-column) for details)
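Putting the required keys together, a minimal model `config` block for this kind might look like the following sketch (the `ds` column name is illustrative; adapt it to your model's time column):

```sql
{{
  config(
    materialized='incremental',
    incremental_strategy='incremental_by_time_range',
    time_column='ds',
  )
}}
```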

### Incremental logic

SQLMesh requires a new jinja block gated by `{% if sqlmesh_incremental is defined %}`. The new block should supersede the existing `{% if is_incremental() %}` block and contain the `WHERE` clause selecting the time interval.
Unlike dbt incremental strategies, SQLMesh does not require `is_incremental()` jinja blocks to implement incremental logic.
Instead, SQLMesh provides predefined time macro variables that can be used in the model's SQL to filter data based on the time column.

For example, the SQL `WHERE` clause with the "ds" column goes in a new jinja block gated by `{% if sqlmesh_incremental is defined %}` as follows:

```bash
> {% if sqlmesh_incremental is defined %}
> WHERE
> ds BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
> {% elif is_incremental() %}
> ; < your existing is_incremental block >
> {% endif %}
```

`{{ start_ds }}` and `{{ end_ds }}` are the jinja equivalents of SQLMesh's `@start_ds` and `@end_ds` predefined time macro variables. See all [predefined time variables](../concepts/macros/macro_variables.md) available in jinja.
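For instance, a time-range filter using these variables might look like the following sketch (`raw_events` and the `ds` column are hypothetical names for illustration):

```sql
SELECT *
FROM raw_events
WHERE ds BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
```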
@@ -263,13 +257,11 @@ For example, the SQL `WHERE` clause with the "ds" column goes in a new jinja blo

SQLMesh provides configuration parameters that enable control over how incremental computations occur. These parameters are set in the model's `config` block.

The [`batch_size` parameter](../concepts/models/overview.md#batch_size) determines the maximum number of time intervals to run in a single job.

The [`lookback` parameter](../concepts/models/overview.md#lookback) is used to capture late arriving data. It sets the number of units of late arriving data the model should expect and must be a positive integer.
See [Incremental Model Properties](../concepts/models/overview.md#incremental-model-properties) for the full list of incremental model configuration parameters.

**Note:** By default, all incremental dbt models are configured to be [forward-only](../concepts/plans.md#forward-only-plans). However, you can change this behavior by setting `forward_only: false` either in the configuration of an individual model or globally for all models in the `dbt_project.yaml` file. The [forward-only](../concepts/plans.md#forward-only-plans) mode aligns more closely with the typical operation of dbt and therefore better meets users' expectations.

Similarly, the [allow_partials](../concepts/models/overview.md#allow_partials) parameter is set to `true` by default for incremental dbt models unless the time column is specified, or the `allow_partials` parameter is explicitly set to `false` in the model configuration.
Similarly, the [allow_partials](../concepts/models/overview.md#allow_partials) parameter is set to `true` by default unless the `allow_partials` parameter is explicitly set to `false` in the model configuration.
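As a sketch, the global `forward_only` override in the project file might look like this (the nesting under a project name is an assumption, and `my_project` is a placeholder for your project's name):

```yaml
# dbt project file: override the forward-only default for all models
models:
  my_project:
    +forward_only: false
```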

#### on_schema_change

2 changes: 1 addition & 1 deletion examples/sushi_dbt/models/customer_revenue_by_day.sql
@@ -1,7 +1,7 @@
{{
config(
materialized='incremental',
incremental_strategy='delete+insert',
incremental_strategy='incremental_by_time_range',
cluster_by=['ds'],
time_column='ds',
)
2 changes: 1 addition & 1 deletion examples/sushi_dbt/models/waiter_as_customer_by_day.sql
@@ -1,7 +1,7 @@
{{
config(
materialized='incremental',
incremental_strategy='delete+insert',
incremental_strategy='incremental_by_time_range',
cluster_by=['ds'],
time_column='ds',
)
2 changes: 1 addition & 1 deletion examples/sushi_dbt/models/waiter_revenue_by_day.sql
@@ -1,7 +1,7 @@
{{
config(
materialized='incremental',
incremental_strategy='delete+insert',
incremental_strategy='incremental_by_time_range',
cluster_by=['ds'],
time_column='ds',
)
2 changes: 1 addition & 1 deletion examples/sushi_dbt/models/waiter_revenue_by_day_v1.sql
@@ -1,7 +1,7 @@
{{
config(
materialized='incremental',
incremental_strategy='delete+insert',
incremental_strategy='incremental_by_time_range',
cluster_by=['ds'],
time_column='ds',
)
79 changes: 53 additions & 26 deletions sqlmesh/dbt/model.py
@@ -25,7 +25,14 @@
ManagedKind,
create_sql_model,
)
from sqlmesh.core.model.kind import SCDType2ByTimeKind, OnDestructiveChange, OnAdditiveChange
from sqlmesh.core.model.kind import (
SCDType2ByTimeKind,
OnDestructiveChange,
OnAdditiveChange,
on_destructive_change_validator,
on_additive_change_validator,
TimeColumn,
)
from sqlmesh.dbt.basemodel import BaseModelConfig, Materialization, SnapshotStrategy
from sqlmesh.dbt.common import SqlStr, sql_str_validator
from sqlmesh.utils.errors import ConfigError
@@ -41,7 +48,9 @@
logger = logging.getLogger(__name__)


INCREMENTAL_BY_TIME_STRATEGIES = set(["delete+insert", "insert_overwrite", "microbatch"])
INCREMENTAL_BY_TIME_RANGE_STRATEGIES = set(
["delete+insert", "insert_overwrite", "microbatch", "incremental_by_time_range"]
)
INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES = set(["merge"])


@@ -77,7 +86,7 @@ class ModelConfig(BaseModelConfig):

# sqlmesh fields
sql: SqlStr = SqlStr("")
time_column: t.Optional[str] = None
time_column: t.Optional[TimeColumn] = None
cron: t.Optional[str] = None
interval_unit: t.Optional[str] = None
batch_concurrency: t.Optional[int] = None
@@ -87,6 +96,9 @@ class ModelConfig(BaseModelConfig):
physical_version: t.Optional[str] = None
auto_restatement_cron: t.Optional[str] = None
auto_restatement_intervals: t.Optional[int] = None
partition_by_time_column: t.Optional[bool] = None
on_destructive_change: t.Optional[OnDestructiveChange] = None
on_additive_change: t.Optional[OnAdditiveChange] = None

# DBT configuration fields
cluster_by: t.Optional[t.List[str]] = None
@@ -139,6 +151,9 @@ class ModelConfig(BaseModelConfig):
incremental_predicates: t.Optional[t.List[str]] = None

_sql_validator = sql_str_validator
_on_destructive_change_validator = on_destructive_change_validator
_on_additive_change_validator = on_additive_change_validator
_time_column_validator = TimeColumn.validator()

@field_validator(
"unique_key",
@@ -230,17 +245,6 @@ def snapshot_strategy(self) -> t.Optional[SnapshotStrategy]:
def table_schema(self) -> str:
return self.target_schema or super().table_schema

def _get_overlapping_field_value(
self, context: DbtContext, dbt_field_name: str, sqlmesh_field_name: str
) -> t.Optional[t.Any]:
dbt_field = self._get_field_value(dbt_field_name)
sqlmesh_field = getattr(self, sqlmesh_field_name, None)
if dbt_field is not None and sqlmesh_field is not None:
get_console().log_warning(
f"Both '{dbt_field_name}' and '{sqlmesh_field_name}' are set for model '{self.canonical_name(context)}'. '{sqlmesh_field_name}' will be used."
)
return sqlmesh_field if sqlmesh_field is not None else dbt_field

def model_kind(self, context: DbtContext) -> ModelKind:
"""
Get the sqlmesh ModelKind
@@ -275,8 +279,12 @@ def model_kind(self, context: DbtContext) -> ModelKind:
"Valid values are 'ignore', 'fail', 'append_new_columns', 'sync_all_columns'."
)

incremental_kind_kwargs["on_destructive_change"] = on_destructive_change
incremental_kind_kwargs["on_additive_change"] = on_additive_change
incremental_kind_kwargs["on_destructive_change"] = (
self._get_field_value("on_destructive_change") or on_destructive_change
)
incremental_kind_kwargs["on_additive_change"] = (
self._get_field_value("on_additive_change") or on_additive_change
)
auto_restatement_cron_value = self._get_field_value("auto_restatement_cron")
if auto_restatement_cron_value is not None:
incremental_kind_kwargs["auto_restatement_cron"] = auto_restatement_cron_value
@@ -292,7 +300,8 @@ def model_kind(self, context: DbtContext) -> ModelKind:
incremental_kind_kwargs["forward_only"] = forward_only_value

is_incremental_by_time_range = self.time_column or (
self.incremental_strategy and self.incremental_strategy == "microbatch"
self.incremental_strategy
and self.incremental_strategy in {"microbatch", "incremental_by_time_range"}
)
# Get shared incremental by kwargs
for field in ("batch_size", "batch_concurrency", "lookback"):
@@ -313,22 +322,29 @@ def model_kind(self, context: DbtContext) -> ModelKind:
)
incremental_by_kind_kwargs["disable_restatement"] = disable_restatement

# Incremental by time range which includes microbatch
if is_incremental_by_time_range:
strategy = self.incremental_strategy or target.default_incremental_strategy(
IncrementalByTimeRangeKind
)

if strategy not in INCREMENTAL_BY_TIME_STRATEGIES:
if strategy not in INCREMENTAL_BY_TIME_RANGE_STRATEGIES:
get_console().log_warning(
f"SQLMesh incremental by time strategy is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_STRATEGIES)}."
f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_RANGE_STRATEGIES)}."
)

if strategy == "microbatch":
time_column = self._get_overlapping_field_value(
context, "event_time", "time_column"
if self.time_column and strategy != "incremental_by_time_range":
get_console().log_warning(
f"Using `time_column` on a model with incremental_strategy '{strategy}' has been deprecated. "
f"Please use `incremental_by_time_range` instead in model '{self.canonical_name(context)}'."
)

if strategy == "microbatch":
if self.time_column:
raise ConfigError(
f"{self.canonical_name(context)}: 'time_column' cannot be used with 'microbatch' incremental strategy. Use 'event_time' instead."
)
time_column = self._get_field_value("event_time")
if not time_column:
raise ConfigError(
f"{self.canonical_name(context)}: 'event_time' is required for microbatch incremental strategy."
@@ -342,11 +358,22 @@ def model_kind(self, context: DbtContext) -> ModelKind:
)
time_column = self.time_column

incremental_by_time_range_kwargs = {
"time_column": time_column,
}
if self.auto_restatement_intervals:
incremental_by_time_range_kwargs["auto_restatement_intervals"] = (
self.auto_restatement_intervals
)
if self.partition_by_time_column is not None:
incremental_by_time_range_kwargs["partition_by_time_column"] = (
self.partition_by_time_column
)

return IncrementalByTimeRangeKind(
time_column=time_column,
auto_restatement_intervals=self.auto_restatement_intervals,
**incremental_kind_kwargs,
**incremental_by_kind_kwargs,
**incremental_by_time_range_kwargs,
)

if self.unique_key:
@@ -384,7 +411,7 @@ def model_kind(self, context: DbtContext) -> ModelKind:
IncrementalUnmanagedKind
)
return IncrementalUnmanagedKind(
insert_overwrite=strategy in INCREMENTAL_BY_TIME_STRATEGIES,
insert_overwrite=strategy in INCREMENTAL_BY_TIME_RANGE_STRATEGIES,
disable_restatement=incremental_by_kind_kwargs["disable_restatement"],
**incremental_kind_kwargs,
)
5 changes: 3 additions & 2 deletions tests/dbt/test_manifest.py
@@ -5,6 +5,7 @@
import pytest

from sqlmesh.core.config import ModelDefaultsConfig
from sqlmesh.core.model import TimeColumn
from sqlmesh.dbt.basemodel import Dependencies
from sqlmesh.dbt.common import ModelAttrs
from sqlmesh.dbt.context import DbtContext
@@ -83,7 +84,7 @@ def test_manifest_helper(caplog):
assert waiter_as_customer_by_day_config.materialized == "incremental"
assert waiter_as_customer_by_day_config.incremental_strategy == "delete+insert"
assert waiter_as_customer_by_day_config.cluster_by == ["ds"]
assert waiter_as_customer_by_day_config.time_column == "ds"
assert waiter_as_customer_by_day_config.time_column == TimeColumn.create("ds", "duckdb")

if DBT_VERSION >= (1, 5, 0):
waiter_revenue_by_day_config = models["waiter_revenue_by_day_v2"]
@@ -105,7 +106,7 @@ def test_manifest_helper(caplog):
assert waiter_revenue_by_day_config.materialized == "incremental"
assert waiter_revenue_by_day_config.incremental_strategy == "delete+insert"
assert waiter_revenue_by_day_config.cluster_by == ["ds"]
assert waiter_revenue_by_day_config.time_column == "ds"
assert waiter_revenue_by_day_config.time_column == TimeColumn.create("ds", "duckdb")
assert waiter_revenue_by_day_config.dialect_ == "bigquery"

assert helper.models("customers")["customers"].dependencies == Dependencies(