Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions sqlmesh/dbt/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import typing as t
import sqlmesh.core.dialect as d
from pathlib import Path
from collections import defaultdict
from sqlmesh.core.config import (
Config,
ConnectionConfig,
Expand Down Expand Up @@ -137,16 +138,22 @@ def _to_sqlmesh(config: BMC, context: DbtContext) -> Model:
package_context.set_and_render_variables(package.variables, package.name)
package_models: t.Dict[str, BaseModelConfig] = {**package.models, **package.seeds}

package_models_by_path: t.Dict[Path, t.List[BaseModelConfig]] = defaultdict(list)
for model in package_models.values():
if isinstance(model, ModelConfig) and not model.sql.strip():
logger.info(f"Skipping empty model '{model.name}' at path '{model.path}'.")
continue
package_models_by_path[model.path].append(model)

sqlmesh_model = cache.get_or_load_models(
model.path, loader=lambda: [_to_sqlmesh(model, package_context)]
)[0]

models[sqlmesh_model.fqn] = sqlmesh_model
for path, path_models in package_models_by_path.items():
sqlmesh_models = cache.get_or_load_models(
path,
loader=lambda: [
_to_sqlmesh(model, package_context) for model in path_models
],
)
for sqlmesh_model in sqlmesh_models:
models[sqlmesh_model.fqn] = sqlmesh_model

models.update(self._load_external_models(audits, cache))

Expand Down
12 changes: 12 additions & 0 deletions tests/dbt/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,18 @@ def test_load_microbatch_with_ref_no_filter(
)


@pytest.mark.slow
def test_load_multiple_snapshots_defined_in_same_file(sushi_test_dbt_context: Context) -> None:
context = sushi_test_dbt_context
assert context.get_model("snapshots.items_snapshot")
assert context.get_model("snapshots.items_check_snapshot")

# Make sure cache works too
context.load()
assert context.get_model("snapshots.items_snapshot")
assert context.get_model("snapshots.items_check_snapshot")


@pytest.mark.slow
def test_dbt_jinja_macro_undefined_variable_error(create_empty_project):
project_dir, model_dir = create_empty_project()
Expand Down
15 changes: 0 additions & 15 deletions tests/fixtures/dbt/sushi_test/snapshots/items_check_snapshot.sql

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,19 @@
select * from {{ source('streaming', 'items') }}

{% endsnapshot %}

{% snapshot items_check_snapshot %}

{{
config(
target_schema='snapshots',
unique_key='id',
strategy='check',
check_cols=['ds'],
invalidate_hard_deletes=True,
)
}}

select * from {{ source('streaming', 'items') }}

{% endsnapshot %}