Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 72 additions & 9 deletions sqlmesh/core/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from sqlmesh.core.dialect import normalize_model_name
from sqlmesh.core.environment import Environment
from sqlmesh.core.model import update_model_schemas
from sqlmesh.core.audit import StandaloneAudit
from sqlmesh.utils import UniqueKeyDict
from sqlmesh.utils.dag import DAG
from sqlmesh.utils.git import GitClient
Expand All @@ -25,6 +26,7 @@
if t.TYPE_CHECKING:
from typing_extensions import Literal as Lit # noqa
from sqlmesh.core.model import Model
from sqlmesh.core.node import Node
from sqlmesh.core.state_sync import StateReader


Expand Down Expand Up @@ -167,7 +169,7 @@ def get_model(fqn: str) -> t.Optional[Model]:
return models

def expand_model_selections(
self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Model]] = None
self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None
) -> t.Set[str]:
"""Expands a set of model selections into a set of model fqns that can be looked up in the Context.

Expand All @@ -180,7 +182,7 @@ def expand_model_selections(

node = parse(" | ".join(f"({s})" for s in model_selections))

all_models = models or self._models
all_models: t.Dict[str, Node] = models or dict(self._models)
models_by_tags: t.Dict[str, t.Set[str]] = {}

for fqn, model in all_models.items():
Expand Down Expand Up @@ -226,6 +228,13 @@ def evaluate(node: exp.Expression) -> t.Set[str]:
if fnmatch.fnmatchcase(tag, pattern)
}
return models_by_tags.get(pattern, set())
if isinstance(node, ResourceType):
resource_type = node.name.lower()
return {
fqn
for fqn, model in all_models.items()
if self._matches_resource_type(resource_type, model)
}
if isinstance(node, Direction):
selected = set()

Expand All @@ -243,36 +252,49 @@ def evaluate(node: exp.Expression) -> t.Set[str]:
return evaluate(node)

@abc.abstractmethod
def _model_name(self, model: Model) -> str:
def _model_name(self, model: Node) -> str:
"""Given a model, return the name that a selector pattern contining wildcards should be fnmatch'd on"""
pass

@abc.abstractmethod
def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Model]) -> t.Set[str]:
def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
"""Given a pattern, return the keys of the matching models from :all_models"""
pass

@abc.abstractmethod
def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
"""Indicate whether or not the supplied model matches the supplied resource type"""
pass


class NativeSelector(Selector):
"""Implementation of selectors that matches objects based on SQLMesh native names"""

def _model_name(self, model: Model) -> str:
def _model_name(self, model: Node) -> str:
return model.name

def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Model]) -> t.Set[str]:
def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
fqn = normalize_model_name(pattern, self._default_catalog, self._dialect)
return {fqn} if fqn in all_models else set()

def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
if resource_type == "model":
return model.is_model
if resource_type == "audit":
return isinstance(model, StandaloneAudit)

raise SQLMeshError(f"Unsupported resource type: {resource_type}")


class DbtSelector(Selector):
"""Implementation of selectors that matches objects based on the DBT names instead of the SQLMesh native names"""

def _model_name(self, model: Model) -> str:
def _model_name(self, model: Node) -> str:
if dbt_fqn := model.dbt_fqn:
return dbt_fqn
raise SQLMeshError("dbt node information must be populated to use dbt selectors")

def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Model]) -> t.Set[str]:
def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
# a pattern like "staging.customers" should match a model called "jaffle_shop.staging.customers"
# but not a model called "jaffle_shop.customers.staging"
# also a pattern like "aging" should not match "staging" so we need to consider components; not substrings
Expand Down Expand Up @@ -306,6 +328,40 @@ def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Model]) -
matches.add(fqn)
return matches

def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
"""
ref: https://docs.getdbt.com/reference/node-selection/methods#resource_type

# supported by SQLMesh
"model"
"seed"
"source" # external model
"test" # standalone audit

# not supported by SQLMesh yet, commented out to throw an error if someone tries to use them
"analysis"
"exposure"
"metric"
"saved_query"
"semantic_model"
"snapshot"
"unit_test"
"""
if resource_type not in ("model", "seed", "source", "test"):
raise SQLMeshError(f"Unsupported resource type: {resource_type}")

if isinstance(model, StandaloneAudit):
return resource_type == "test"

if resource_type == "model":
return model.is_model and not model.kind.is_external and not model.kind.is_seed
if resource_type == "source":
return model.kind.is_external
if resource_type == "seed":
return model.kind.is_seed

return False


class SelectorDialect(Dialect):
IDENTIFIERS_CAN_START_WITH_DIGIT = True
Expand Down Expand Up @@ -336,6 +392,10 @@ class Tag(exp.Expression):
pass


class ResourceType(exp.Expression):
pass


class Direction(exp.Expression):
pass

Expand Down Expand Up @@ -388,7 +448,8 @@ def _parse_var() -> exp.Expression:
upstream = _match(TokenType.PLUS)
downstream = None
tag = _parse_kind("tag")
git = False if tag else _parse_kind("git")
resource_type = False if tag else _parse_kind("resource_type")
git = False if resource_type else _parse_kind("git")
lstar = "*" if _match(TokenType.STAR) else ""
directions = {}

Expand All @@ -414,6 +475,8 @@ def _parse_var() -> exp.Expression:

if tag:
this = Tag(this=this)
if resource_type:
this = ResourceType(this=this)
if git:
this = Git(this=this)
if directions:
Expand Down
32 changes: 30 additions & 2 deletions sqlmesh_dbt/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,39 @@ def _cleanup() -> None:

select_option = click.option(
"-s",
"-m",
"--select",
multiple=True,
help="Specify the nodes to include.",
)
model_option = click.option(
"-m",
"--models",
"--model",
multiple=True,
help="Specify the nodes to include.",
help="Specify the model nodes to include; other nodes are excluded.",
)
exclude_option = click.option("--exclude", multiple=True, help="Specify the nodes to exclude.")

# TODO: expand this out into --resource-type/--resource-types and --exclude-resource-type/--exclude-resource-types
resource_types = [
"metric",
"semantic_model",
"saved_query",
"source",
"analysis",
"model",
"test",
"unit_test",
"exposure",
"snapshot",
"seed",
"default",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small question on the last two, what will all and default be in the sqlmesh context? I suppose this is a question for later on when multiple resource types support is added, but will these two mirror dbt as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, unimplemented :) but at a guess i'd say they would be aliases for the same thing because SQLMesh includes everything by default

"all",
]
resource_type_option = click.option(
"--resource-type", type=click.Choice(resource_types, case_sensitive=False)
)


@click.group(cls=ErrorHandlingGroup, invoke_without_command=True)
@click.option("--profile", help="Which existing profile to load. Overrides output.profile")
Expand Down Expand Up @@ -86,7 +110,9 @@ def dbt(

@dbt.command()
@select_option
@model_option
@exclude_option
@resource_type_option
@click.option(
"-f",
"--full-refresh",
Expand Down Expand Up @@ -116,7 +142,9 @@ def run(

@dbt.command(name="list")
@select_option
@model_option
@exclude_option
@resource_type_option
@vars_option
@click.pass_context
def list_(ctx: click.Context, vars: t.Optional[t.Dict[str, t.Any]], **kwargs: t.Any) -> None:
Expand Down
26 changes: 21 additions & 5 deletions sqlmesh_dbt/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ def list_(
self,
select: t.Optional[t.List[str]] = None,
exclude: t.Optional[t.List[str]] = None,
models: t.Optional[t.List[str]] = None,
resource_type: t.Optional[str] = None,
) -> None:
# dbt list prints:
# - models
# - "data tests" (audits) for those models
# it also applies selectors which is useful for testing selectors
selected_models = list(self._selected_models(select, exclude).values())
selected_models = list(
self._selected_models(select, exclude, models, resource_type).values()
)
self.console.list_models(
selected_models, {k: v.node for k, v in self.context.snapshots.items()}
)
Expand All @@ -41,13 +45,19 @@ def run(
environment: t.Optional[str] = None,
select: t.Optional[t.List[str]] = None,
exclude: t.Optional[t.List[str]] = None,
models: t.Optional[t.List[str]] = None,
resource_type: t.Optional[str] = None,
full_refresh: bool = False,
empty: bool = False,
) -> Plan:
consolidated_select, consolidated_exclude = selectors.consolidate(
select or [], exclude or [], models or [], resource_type
)

plan_builder = self._plan_builder(
environment=environment,
select=select,
exclude=exclude,
select=consolidated_select,
exclude=consolidated_exclude,
full_refresh=full_refresh,
empty=empty,
)
Expand Down Expand Up @@ -86,9 +96,15 @@ def _plan_builder(
)

def _selected_models(
self, select: t.Optional[t.List[str]] = None, exclude: t.Optional[t.List[str]] = None
self,
select: t.Optional[t.List[str]] = None,
exclude: t.Optional[t.List[str]] = None,
models: t.Optional[t.List[str]] = None,
resource_type: t.Optional[str] = None,
) -> t.Dict[str, Model]:
if sqlmesh_selector := selectors.to_sqlmesh(select or [], exclude or []):
if sqlmesh_selector := selectors.to_sqlmesh(
*selectors.consolidate(select or [], exclude or [], models or [], resource_type)
):
if self.debug:
self.console.print(f"dbt --select: {select}")
self.console.print(f"dbt --exclude: {exclude}")
Expand Down
40 changes: 39 additions & 1 deletion sqlmesh_dbt/selectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,45 @@
logger = logging.getLogger(__name__)


def to_sqlmesh(dbt_select: t.Collection[str], dbt_exclude: t.Collection[str]) -> t.Optional[str]:
def consolidate(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why even expose this outside this module? Why not make it a part of a single unified to_sqlmesh interface?

Copy link
Collaborator Author

@erindru erindru Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that but I wanted to keep the surface of to_sqlmesh() simple and take the minimum required parameters.

combine() still operates in dbt-land and produces consolidated selectors in dbt format. The goal is to not have to always pass down select, exclude, resource_type, models etc parameters through every layer when really all of these parameters can be described in terms of select/exclude.

So they get consolidated early to reduce the number of parameters being passed around and then the lower layers then just call to_sqlmesh() on just select / exclude and dont have to worry about or consider any of the other parameters.

You can see this in DbtOperations._plan_builder which deliberately only needs select/exclude, not all the other options that may be passed.

select: t.List[str],
exclude: t.List[str],
models: t.List[str],
resource_type: t.Optional[str],
) -> t.Tuple[t.List[str], t.List[str]]:
"""
Given a bunch of dbt CLI arguments that may or may not be defined:
--select, --exclude, --models, --resource-type

Combine them into a single set of --select/--exclude node selectors, throwing an error if mutually exclusive combinations are provided
Note that the returned value is still in dbt format, pass it to to_sqlmesh() to create a selector for the sqlmesh selector engine
"""
if models and select:
raise ValueError('"models" and "select" are mutually exclusive arguments')

if models and resource_type:
raise ValueError('"models" and "resource_type" are mutually exclusive arguments')

if models:
# --models implies resource_type:model
resource_type = "model"

if resource_type:
resource_type_selector = f"resource_type:{resource_type}"
all_selectors = [*select, *models]
select = (
[
f"resource_type:{resource_type},{original_selector}"
for original_selector in all_selectors
]
if all_selectors
else [resource_type_selector]
)

return select, exclude


def to_sqlmesh(dbt_select: t.List[str], dbt_exclude: t.List[str]) -> t.Optional[str]:
"""
Given selectors defined in the format of the dbt cli --select and --exclude arguments, convert them into a selector expression that
the SQLMesh selector engine can understand.
Expand Down
Loading