Skip to content

Commit f5c48f0

Browse files
add the dbt node name
1 parent e42ad82 commit f5c48f0

File tree

10 files changed

+105
-39
lines changed

10 files changed

+105
-39
lines changed

sqlmesh/core/context.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1677,7 +1677,11 @@ def plan_builder(
16771677
end_override_per_model=max_interval_end_per_model,
16781678
console=self.console,
16791679
user_provided_flags=user_provided_flags,
1680-
selected_models=model_selector.expand_model_selections(select_models or "*"),
1680+
selected_models={
1681+
node_name
1682+
for model in model_selector.expand_model_selections(select_models or "*")
1683+
if (node_name := snapshots[model].node.node_name)
1684+
},
16811685
explain=explain or False,
16821686
ignore_cron=ignore_cron or False,
16831687
)

sqlmesh/core/node.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ class _Node(PydanticModel):
199199
interval_unit_: t.Optional[IntervalUnit] = Field(alias="interval_unit", default=None)
200200
tags: t.List[str] = []
201201
stamp: t.Optional[str] = None
202+
node_name: t.Optional[str] = None # dbt node name
202203
_path: t.Optional[Path] = None
203204
_data_hash: t.Optional[str] = None
204205
_metadata_hash: t.Optional[str] = None

sqlmesh/core/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ def _run_or_audit(
812812
run_environment_statements=run_environment_statements,
813813
audit_only=audit_only,
814814
auto_restatement_triggers=auto_restatement_triggers,
815-
selected_models=selected_snapshots or {s.name for s in merged_intervals},
815+
selected_models={s.node.node_name for s in merged_intervals if s.node.node_name},
816816
)
817817

818818
return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS

sqlmesh/dbt/adapter.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,6 @@ def graph(self) -> t.Any:
181181
}
182182
)
183183

184-
@property
185-
def selected_resources(self) -> t.List[str]:
186-
return []
187-
188184

189185
class ParsetimeAdapter(BaseAdapter):
190186
def get_relation(self, database: str, schema: str, identifier: str) -> t.Optional[BaseRelation]:
@@ -505,8 +501,3 @@ def _normalize(self, input_table: exp.Table) -> exp.Table:
505501
normalized_table.set("db", normalized_table.this)
506502
normalized_table.set("this", None)
507503
return normalized_table
508-
509-
def _dbt_model_id(self, sqlmesh_model_name: str) -> str:
510-
# Model prefix is needed to correspond to the key in the nodes within the dbt context variable
511-
parts = [part.strip('"') for part in sqlmesh_model_name.split(".")]
512-
return f"model.{parts[0]}.{parts[-1]}"

sqlmesh/dbt/builtin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ def create_builtin_globals(
545545
"run_query": sql_execution.run_query,
546546
"statement": sql_execution.statement,
547547
"graph": adapter.graph,
548-
"selected_resources": adapter.selected_resources,
548+
"selected_resources": list(jinja_globals.get("selected_models") or []),
549549
}
550550
)
551551

sqlmesh/dbt/model.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,7 @@ def to_sqlmesh(
689689
extract_dependencies_from_query=False,
690690
allow_partials=allow_partials,
691691
virtual_environment_mode=virtual_environment_mode,
692+
node_name=self.node_name,
692693
**optional_kwargs,
693694
**model_kwargs,
694695
)

sqlmesh/dbt/seed.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ def to_sqlmesh(
9292
audit_definitions=audit_definitions,
9393
virtual_environment_mode=virtual_environment_mode,
9494
start=self.start or context.sqlmesh_config.model_defaults.start,
95+
node_name=self.node_name,
9596
**kwargs,
9697
)
9798

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
"""Add 'node_name' property to node definition."""
2+
3+
4+
def migrate_schemas(state_sync, **kwargs): # type: ignore
5+
pass
6+
7+
8+
def migrate_rows(state_sync, **kwargs): # type: ignore
9+
pass

tests/dbt/test_model.py

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def test_load_microbatch_all_defined(
201201
concurrent_batches=true
202202
)
203203
}}
204-
204+
205205
SELECT 1 as cola, '2025-01-01' as ds
206206
"""
207207
microbatch_model_file = model_dir / "microbatch.sql"
@@ -633,3 +633,80 @@ def test_dbt_jinja_macro_undefined_variable_error(create_empty_project):
633633
assert "Failed to update model schemas" in error_message
634634
assert "Could not render jinja for" in error_message
635635
assert "Undefined macro/variable: 'columns' in macro: 'select_columns'" in error_message
636+
637+
638+
@pytest.mark.slow
639+
def test_node_name_populated_for_dbt_models(dbt_dummy_postgres_config: PostgresConfig) -> None:
640+
model_config = ModelConfig(
641+
name="test_model",
642+
package_name="test_package",
643+
sql="SELECT 1 as id",
644+
database="test_db",
645+
schema_="test_schema",
646+
alias="test_model",
647+
)
648+
649+
context = DbtContext()
650+
context.project_name = "test_project"
651+
context.target = dbt_dummy_postgres_config
652+
653+
# check after convert to SQLMesh model that node_name is populated correctly
654+
sqlmesh_model = model_config.to_sqlmesh(context)
655+
assert sqlmesh_model.node_name == "model.test_package.test_model"
656+
657+
658+
@pytest.mark.slow
659+
def test_load_model_dbt_node_name(tmp_path: Path) -> None:
660+
yaml = YAML()
661+
dbt_project_dir = tmp_path / "dbt"
662+
dbt_project_dir.mkdir()
663+
dbt_model_dir = dbt_project_dir / "models"
664+
dbt_model_dir.mkdir()
665+
666+
model_contents = "SELECT 1 as id, 'test' as name"
667+
model_file = dbt_model_dir / "simple_model.sql"
668+
with open(model_file, "w", encoding="utf-8") as f:
669+
f.write(model_contents)
670+
671+
dbt_project_config = {
672+
"name": "test_project",
673+
"version": "1.0.0",
674+
"config-version": 2,
675+
"profile": "test",
676+
"model-paths": ["models"],
677+
}
678+
dbt_project_file = dbt_project_dir / "dbt_project.yml"
679+
with open(dbt_project_file, "w", encoding="utf-8") as f:
680+
yaml.dump(dbt_project_config, f)
681+
682+
sqlmesh_config = {
683+
"model_defaults": {
684+
"start": "2025-01-01",
685+
}
686+
}
687+
sqlmesh_config_file = dbt_project_dir / "sqlmesh.yaml"
688+
with open(sqlmesh_config_file, "w", encoding="utf-8") as f:
689+
yaml.dump(sqlmesh_config, f)
690+
691+
dbt_data_dir = tmp_path / "dbt_data"
692+
dbt_data_dir.mkdir()
693+
dbt_data_file = dbt_data_dir / "local.db"
694+
dbt_profile_config = {
695+
"test": {
696+
"outputs": {"duckdb": {"type": "duckdb", "path": str(dbt_data_file)}},
697+
"target": "duckdb",
698+
}
699+
}
700+
db_profile_file = dbt_project_dir / "profiles.yml"
701+
with open(db_profile_file, "w", encoding="utf-8") as f:
702+
yaml.dump(dbt_profile_config, f)
703+
704+
context = Context(paths=dbt_project_dir)
705+
706+
# find the model by its sqlmesh fully qualified name
707+
model_fqn = '"local"."main"."simple_model"'
708+
assert model_fqn in context.snapshots
709+
710+
# Verify that node_name is the equivalent dbt one
711+
model = context.snapshots[model_fqn].model
712+
assert model.node_name == "model.test_project.simple_model"

tests/dbt/test_transformation.py

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import typing as t
66
from pathlib import Path
77
from unittest.mock import patch
8-
from sqlmesh.dbt.adapter import RuntimeAdapter
9-
from sqlmesh.utils.jinja import JinjaMacroRegistry
108

119
from sqlmesh.dbt.util import DBT_VERSION
1210

@@ -2417,7 +2415,7 @@ def test_selected_resources_context_variable(
24172415
):
24182416
context = sushi_test_project.context
24192417

2420-
# should be empty list during parse time
2418+
# empty selected resources
24212419
direct_access = context.render("{{ selected_resources }}")
24222420
assert direct_access == "[]"
24232421

@@ -2443,32 +2441,16 @@ def test_selected_resources_context_variable(
24432441
result = context.render(test_condition)
24442442
assert result.strip() == "no_resources"
24452443

2446-
# Test 4: Test with runtime adapter (simulating runtime execution)
2447-
runtime_adapter = RuntimeAdapter(
2448-
engine_adapter=sushi_test_dbt_context.engine_adapter,
2449-
jinja_macros=JinjaMacroRegistry(),
2450-
jinja_globals={
2451-
"selected_models": {
2452-
'"jaffle_shop"."main"."customers"',
2453-
'"jaffle_shop"."main"."orders"',
2454-
'"jaffle_shop"."main"."items"',
2455-
},
2456-
},
2457-
)
2458-
2459-
# it should return correct selected resources in dbt format
2460-
selected_resources = runtime_adapter.selected_resources
2461-
assert len(selected_resources) == 3
2462-
assert "model.jaffle_shop.customers" in selected_resources
2463-
assert "model.jaffle_shop.orders" in selected_resources
2464-
assert "model.jaffle_shop.items" in selected_resources
2444+
# selected resources in dbt format
2445+
selected_resources = [
2446+
"model.jaffle_shop.customers",
2447+
"model.jaffle_shop.items",
2448+
"model.jaffle_shop.orders",
2449+
]
24652450

24662451
# check the jinja macros rendering
24672452
result = context.render("{{ selected_resources }}", selected_resources=selected_resources)
2468-
assert (
2469-
result
2470-
== "['model.jaffle_shop.customers', 'model.jaffle_shop.items', 'model.jaffle_shop.orders']"
2471-
)
2453+
assert result == selected_resources.__repr__()
24722454

24732455
result = context.render(test_jinja, selected_resources=selected_resources)
24742456
assert result.strip() == "3"

0 commit comments

Comments
 (0)