Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,15 @@ def resolve_table(self, model_name: str) -> str:
"""
model_name = normalize_model_name(model_name, self.default_catalog, self.default_dialect)

if model_name not in self._model_tables:
model_name_list = "\n".join(list(self._model_tables))
logger.debug(
f"'{model_name}' not found in model to table mapping. Available model names: \n{model_name_list}"
)
raise SQLMeshError(
f"Unable to find a table mapping for model '{model_name}'. Has it been spelled correctly?"
)

# We generate SQL for the default dialect because the table name may be used in a
# fetchdf call and so the quotes need to be correct (eg. backticks for bigquery)
return parse_one(self._model_tables[model_name]).sql(
Expand Down
4 changes: 2 additions & 2 deletions sqlmesh/core/test/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ def _model_tables(self) -> t.Dict[str, str]:
# Include upstream dependencies to ensure they can be resolved during test execution
return {
name: self._test._test_fixture_table(name).sql()
for model in self._models.values()
for name in [model.name, *model.depends_on]
for normalized_model_name, model in self._models.items()
for name in [normalized_model_name, *model.depends_on]
}

def with_variables(
Expand Down
274 changes: 273 additions & 1 deletion tests/core/test_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from sqlmesh.core.model import Model, SqlModel, load_sql_based_model, model
from sqlmesh.core.test.definition import ModelTest, PythonModelTest, SqlModelTest
from sqlmesh.core.test.result import ModelTextTestResult
from sqlmesh.core.test.context import TestExecutionContext
from sqlmesh.utils import Verbosity
from sqlmesh.utils.errors import ConfigError, SQLMeshError, TestError
from sqlmesh.utils.yaml import dump as dump_yaml
Expand Down Expand Up @@ -69,11 +70,14 @@ def _create_model(
meta: str = SUSHI_FOO_META,
dialect: t.Optional[str] = None,
default_catalog: t.Optional[str] = None,
**kwargs: t.Any,
) -> SqlModel:
parsed_definition = parse(f"{meta};{query}", default_dialect=dialect)
return t.cast(
SqlModel,
load_sql_based_model(parsed_definition, dialect=dialect, default_catalog=default_catalog),
load_sql_based_model(
parsed_definition, dialect=dialect, default_catalog=default_catalog, **kwargs
),
)


Expand Down Expand Up @@ -2814,3 +2818,271 @@ def test_test_generation_with_timestamp_nat(tmp_path: Path) -> None:
assert query_output[0]["ts_col"] == datetime.datetime(2024, 9, 20, 11, 30, 0, 123456)
assert query_output[1]["ts_col"] is None
assert query_output[2]["ts_col"] == datetime.datetime(2024, 9, 21, 15, 45, 0, 987654)


def test_parameterized_name_sql_model() -> None:
variables = {"table_catalog": "gold"}
model = _create_model(
"select 1 as id, 'foo' as name",
meta="""
MODEL (
name @{table_catalog}.sushi.foo,
kind FULL
)
""",
dialect="snowflake",
variables=variables,
)
assert model.fqn == '"GOLD"."SUSHI"."FOO"'

test = _create_test(
body=load_yaml(
"""
test_foo:
model: {{ var('table_catalog' ) }}.sushi.foo
outputs:
query:
- id: 1
name: foo
""",
variables=variables,
),
test_name="test_foo",
model=model,
context=Context(
config=Config(
model_defaults=ModelDefaultsConfig(dialect="snowflake"), variables=variables
)
),
)

assert test.body["model"] == '"GOLD"."SUSHI"."FOO"'

_check_successful_or_raise(test.run())


def test_parameterized_name_python_model() -> None:
variables = {"table_catalog": "gold"}

@model(
name="@{table_catalog}.sushi.foo",
columns={
"id": "int",
"name": "varchar",
},
dialect="snowflake",
)
def execute(
context: ExecutionContext,
**kwargs: t.Any,
) -> pd.DataFrame:
return pd.DataFrame([{"ID": 1, "NAME": "foo"}])

python_model = model.get_registry()["@{table_catalog}.sushi.foo"].model(
module_path=Path("."), path=Path("."), variables=variables
)

assert python_model.fqn == '"GOLD"."SUSHI"."FOO"'

test = _create_test(
body=load_yaml(
"""
test_foo:
model: {{ var('table_catalog' ) }}.sushi.foo
outputs:
query:
- id: 1
name: foo
""",
variables=variables,
),
test_name="test_foo",
model=python_model,
context=Context(
config=Config(
model_defaults=ModelDefaultsConfig(dialect="snowflake"), variables=variables
)
),
)

assert test.body["model"] == '"GOLD"."SUSHI"."FOO"'

_check_successful_or_raise(test.run())


def test_parameterized_name_self_referential_model():
variables = {"table_catalog": "gold"}
model = _create_model(
"""
with last_value as (
select coalesce(max(v), 0) as v from @{table_catalog}.sushi.foo
)
select v + 1 as v from last_value
""",
meta="""
MODEL (
name @{table_catalog}.sushi.foo,
kind FULL
)
""",
dialect="snowflake",
variables=variables,
)
assert model.fqn == '"GOLD"."SUSHI"."FOO"'

test1 = _create_test(
body=load_yaml(
"""
test_foo_intial_state:
model: {{ var('table_catalog' ) }}.sushi.foo
inputs:
{{ var('table_catalog' ) }}.sushi.foo:
rows: []
columns:
v: int
outputs:
query:
- v: 1
""",
variables=variables,
),
test_name="test_foo_intial_state",
model=model,
context=Context(
config=Config(
model_defaults=ModelDefaultsConfig(dialect="snowflake"), variables=variables
)
),
)
assert isinstance(test1, SqlModelTest)
assert test1.body["model"] == '"GOLD"."SUSHI"."FOO"'
test1_model_query = test1._render_model_query().sql(dialect="snowflake")
assert '"GOLD"."SUSHI"."FOO"' not in test1_model_query
assert (
test1._test_fixture_table('"GOLD"."SUSHI"."FOO"').sql(dialect="snowflake", identify=True)
in test1_model_query
)

test2 = _create_test(
body=load_yaml(
"""
test_foo_cumulative:
model: {{ var('table_catalog' ) }}.sushi.foo
inputs:
{{ var('table_catalog' ) }}.sushi.foo:
rows:
- v: 5
outputs:
query:
- v: 6
""",
variables=variables,
),
test_name="test_foo_cumulative",
model=model,
context=Context(
config=Config(
model_defaults=ModelDefaultsConfig(dialect="snowflake"), variables=variables
)
),
)
assert isinstance(test2, SqlModelTest)
assert test2.body["model"] == '"GOLD"."SUSHI"."FOO"'
test2_model_query = test2._render_model_query().sql(dialect="snowflake")
assert '"GOLD"."SUSHI"."FOO"' not in test2_model_query
assert (
test2._test_fixture_table('"GOLD"."SUSHI"."FOO"').sql(dialect="snowflake", identify=True)
in test2_model_query
)

_check_successful_or_raise(test1.run())
_check_successful_or_raise(test2.run())


def test_parameterized_name_self_referential_python_model():
variables = {"table_catalog": "gold"}

@model(
name="@{table_catalog}.sushi.foo",
columns={
"id": "int",
},
depends_on=["@{table_catalog}.sushi.bar"],
dialect="snowflake",
)
def execute(
context: ExecutionContext,
**kwargs: t.Any,
) -> pd.DataFrame:
current_table = context.resolve_table(f"{context.var('table_catalog')}.sushi.foo")
current_df = context.fetchdf(f"select id from {current_table}")
upstream_table = context.resolve_table(f"{context.var('table_catalog')}.sushi.bar")
upstream_df = context.fetchdf(f"select id from {upstream_table}")

return pd.DataFrame([{"ID": upstream_df["ID"].sum() + current_df["ID"].sum()}])

@model(
name="@{table_catalog}.sushi.bar",
columns={
"id": "int",
},
dialect="snowflake",
)
def execute(
context: ExecutionContext,
**kwargs: t.Any,
) -> pd.DataFrame:
return pd.DataFrame([{"ID": 1}])

model_foo = model.get_registry()["@{table_catalog}.sushi.foo"].model(
module_path=Path("."), path=Path("."), variables=variables
)
model_bar = model.get_registry()["@{table_catalog}.sushi.bar"].model(
module_path=Path("."), path=Path("."), variables=variables
)

assert model_foo.fqn == '"GOLD"."SUSHI"."FOO"'
assert model_bar.fqn == '"GOLD"."SUSHI"."BAR"'

ctx = Context(
config=Config(model_defaults=ModelDefaultsConfig(dialect="snowflake"), variables=variables)
)
ctx.upsert_model(model_foo)
ctx.upsert_model(model_bar)

test = _create_test(
body=load_yaml(
"""
test_foo:
model: {{ var('table_catalog') }}.sushi.foo
inputs:
{{ var('table_catalog') }}.sushi.foo:
rows:
- id: 3
{{ var('table_catalog') }}.sushi.bar:
rows:
- id: 5
outputs:
query:
- id: 8
""",
variables=variables,
),
test_name="test_foo",
model=model_foo,
context=ctx,
)

assert isinstance(test, PythonModelTest)

assert test.body["model"] == '"GOLD"."SUSHI"."FOO"'
assert '"GOLD"."SUSHI"."BAR"' in test.body["inputs"]

assert isinstance(test.context, TestExecutionContext)
assert '"GOLD"."SUSHI"."FOO"' in test.context._model_tables
assert '"GOLD"."SUSHI"."BAR"' in test.context._model_tables

with pytest.raises(SQLMeshError, match=r"Unable to find a table mapping"):
test.context.resolve_table("silver.sushi.bar")

_check_successful_or_raise(test.run())