Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions sqlmesh/dbt/basemodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,22 @@ def tests_ref_source_dependencies(self) -> Dependencies:
dependencies.macros = []
return dependencies

def remove_tests_with_invalid_refs(self, context: DbtContext) -> None:
"""
Removes tests that reference models that do not exist in the context in order to match dbt behavior.

Args:
context: The dbt context this model resides within.

Returns:
None
"""
self.tests = [
test
for test in self.tests
if all(ref in context.refs for ref in test.dependencies.refs)
]

def check_for_circular_test_refs(self, context: DbtContext) -> None:
"""
Checks for direct circular references between two models and raises an exception if found.
Expand Down Expand Up @@ -295,6 +311,7 @@ def sqlmesh_model_kwargs(
column_types_override: t.Optional[t.Dict[str, ColumnConfig]] = None,
) -> t.Dict[str, t.Any]:
"""Get common sqlmesh model parameters"""
self.remove_tests_with_invalid_refs(context)
self.check_for_circular_test_refs(context)
model_dialect = self.dialect(context)
model_context = context.context_for_dependencies(
Expand Down
5 changes: 3 additions & 2 deletions sqlmesh/dbt/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from sqlmesh.dbt.manifest import ManifestHelper
from sqlmesh.dbt.target import TargetConfig
from sqlmesh.utils import AttributeDict
from sqlmesh.utils.errors import ConfigError, SQLMeshError
from sqlmesh.utils.errors import ConfigError, SQLMeshError, MissingModelError
from sqlmesh.utils.jinja import (
JinjaGlobalAttribute,
JinjaMacroRegistry,
Expand Down Expand Up @@ -265,7 +265,8 @@ def context_for_dependencies(self, dependencies: Dependencies) -> DbtContext:
else:
models[ref] = t.cast(ModelConfig, model)
else:
raise ConfigError(f"Model '{ref}' was not found.")
exception = MissingModelError(ref)
raise exception

for source in dependencies.sources:
if source in self.sources:
Expand Down
11 changes: 9 additions & 2 deletions sqlmesh/dbt/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from sqlmesh.dbt.project import Project
from sqlmesh.dbt.target import TargetConfig
from sqlmesh.utils import UniqueKeyDict
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.errors import ConfigError, MissingModelError
from sqlmesh.utils.jinja import (
JinjaMacroRegistry,
make_jinja_registry,
Expand Down Expand Up @@ -162,7 +162,14 @@ def _load_audits(
context.set_and_render_variables(package.variables, package.name)
for test in package.tests.values():
logger.debug("Converting '%s' to sqlmesh format", test.name)
audits[test.name] = test.to_sqlmesh(context)
try:
audits[test.name] = test.to_sqlmesh(context)
except MissingModelError as e:
logger.warning(
"Skipping audit '%s' because model '%s' is not a valid ref",
test.name,
e.model_name,
)

return audits

Expand Down
8 changes: 8 additions & 0 deletions sqlmesh/utils/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ def __init__(self, message: str | Exception, location: t.Optional[Path] = None)
self.location = Path(location) if isinstance(location, str) else location


class MissingModelError(ConfigError):
"""Raised when a model that is referenced is missing."""

def __init__(self, model_name: str) -> None:
self.model_name = model_name
super().__init__(f"Model '{model_name}' was not found.")


class MissingDependencyError(SQLMeshError):
"""Local environment is missing a required dependency for the given operation"""

Expand Down
90 changes: 90 additions & 0 deletions tests/dbt/test_model.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import pytest

from pathlib import Path

from sqlmesh import Context
from sqlmesh.dbt.common import Dependencies
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig
from sqlmesh.dbt.test import TestConfig
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.yaml import YAML

pytestmark = pytest.mark.dbt

Expand Down Expand Up @@ -44,3 +48,89 @@ def test_model_test_circular_references() -> None:
upstream_model.check_for_circular_test_refs(context)
with pytest.raises(ConfigError, match="between tests"):
downstream_model.check_for_circular_test_refs(context)


def test_load_invalid_ref_audit_constraints(tmp_path: Path, caplog) -> None:
yaml = YAML()
dbt_project_dir = tmp_path / "dbt"
dbt_project_dir.mkdir()
dbt_model_dir = dbt_project_dir / "models"
dbt_model_dir.mkdir()
full_model_contents = "SELECT 1 as cola"
full_model_file = dbt_model_dir / "full_model.sql"
with open(full_model_file, "w", encoding="utf-8") as f:
f.write(full_model_contents)
model_schema = {
"version": 2,
"models": [
{
"name": "full_model",
"description": "A full model bad ref for audit and constraints",
"columns": [
{
"name": "cola",
"description": "A column that is used in a ref audit and constraints",
"constraints": [
{
"type": "primary_key",
"columns": ["cola"],
"expression": "ref('not_real_model') (cola)",
}
],
"tests": [
{
"relationships": {
"to": "ref('not_real_model')",
"field": "cola",
"description": "A test that references a model that does not exist",
}
}
],
}
],
}
],
}
model_schema_file = dbt_model_dir / "schema.yml"
with open(model_schema_file, "w", encoding="utf-8") as f:
yaml.dump(model_schema, f)
dbt_project_config = {
"name": "invalid_ref_audit_constraints",
"version": "1.0.0",
"config-version": 2,
"profile": "test",
"model-paths": ["models"],
}
dbt_project_file = dbt_project_dir / "dbt_project.yml"
with open(dbt_project_file, "w", encoding="utf-8") as f:
yaml.dump(dbt_project_config, f)
sqlmesh_config = {
"model_defaults": {
"start": "2025-01-01",
}
}
sqlmesh_config_file = dbt_project_dir / "sqlmesh.yaml"
with open(sqlmesh_config_file, "w", encoding="utf-8") as f:
yaml.dump(sqlmesh_config, f)
dbt_data_dir = tmp_path / "dbt_data"
dbt_data_dir.mkdir()
dbt_data_file = dbt_data_dir / "local.db"
dbt_profile_config = {
"test": {
"outputs": {"duckdb": {"type": "duckdb", "path": str(dbt_data_file)}},
"target": "duckdb",
}
}
db_profile_file = dbt_project_dir / "profiles.yml"
with open(db_profile_file, "w", encoding="utf-8") as f:
yaml.dump(dbt_profile_config, f)

context = Context(paths=dbt_project_dir)
assert (
"Skipping audit 'relationships_full_model_cola__cola__ref_not_real_model_' because model 'not_real_model' is not a valid ref"
in caplog.text
)
fqn = '"local"."main"."full_model"'
assert fqn in context.snapshots
# The audit isn't loaded due to the invalid ref
assert context.snapshots[fqn].model.audits == []