Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions sqlmesh/dbt/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,15 +337,8 @@ def model_kind(self, context: DbtContext) -> ModelKind:
raise ConfigError(
f"{self.canonical_name(context)}: 'event_time' is required for microbatch incremental strategy."
)
concurrent_batches = self._get_field_value("concurrent_batches")
if concurrent_batches is True:
if incremental_by_kind_kwargs.get("batch_size"):
get_console().log_warning(
f"'concurrent_batches' is set to True and 'batch_size' are defined in '{self.canonical_name(context)}'. The batch size will be set to the value of `batch_size`."
)
incremental_by_kind_kwargs["batch_size"] = incremental_by_kind_kwargs.get(
"batch_size", 1
)
# dbt microbatch always processes batches in a size of 1
incremental_by_kind_kwargs["batch_size"] = 1
else:
if not self.time_column:
raise ConfigError(
Expand Down Expand Up @@ -674,6 +667,11 @@ def to_sqlmesh(
)
else:
model_kwargs["start"] = begin
# If user explicitly disables concurrent batches then we want to set depends on past to true which we
# will do by including the model in the depends_on
if self.concurrent_batches is not None and self.concurrent_batches is False:
depends_on = model_kwargs.get("depends_on", set())
depends_on.add(self.canonical_name(context))

model_kwargs["start"] = model_kwargs.get(
"start", context.sqlmesh_config.model_defaults.start
Expand Down
7 changes: 5 additions & 2 deletions tests/dbt/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def test_load_microbatch_all_defined(
column=exp.to_column("ds", quoted=True), format="%Y-%m-%d"
)
assert model.kind.batch_size == 1
assert model.depends_on_self is False


@pytest.mark.slow
Expand Down Expand Up @@ -259,7 +260,8 @@ def test_load_microbatch_all_defined_diff_values(
assert model.kind.time_column == TimeColumn(
column=exp.to_column("blah", quoted=True), format="%Y-%m-%d"
)
assert model.kind.batch_size is None
assert model.kind.batch_size == 1
assert model.depends_on_self is True


@pytest.mark.slow
Expand Down Expand Up @@ -297,4 +299,5 @@ def test_load_microbatch_required_only(
assert model.kind.time_column == TimeColumn(
column=exp.to_column("ds", quoted=True), format="%Y-%m-%d"
)
assert model.kind.batch_size is None
assert model.kind.batch_size == 1
assert model.depends_on_self is False