Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions docs/concepts/models/python_models.md
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,35 @@ It's also possible to use the `@EACH` macro, combined with a global list variabl
...
```

## Using macros in model properties

Python models support macro variables in model properties. However, special care must be taken when the macro variable appears within a string.

For example when using macro variables inside cron expressions, you need to wrap the entire expression in quotes and prefix it with `@` to ensure proper parsing:

```python linenums="1"
# Correct: Wrap the cron expression containing a macro variable
@model(
"my_model",
cron="@'*/@{mins} * * * *'", # Note the @'...' syntax
...
)

# This also works with blueprint variables
@model(
"@{customer}.scheduled_model",
cron="@'0 @{hour} * * *'",
blueprints=[
{"customer": "customer_1", "hour": 2}, # Runs at 2 AM
{"customer": "customer_2", "hour": 8}, # Runs at 8 AM
],
...
)

```

This is necessary because cron expressions often use `@` for aliases (like `@daily`, `@hourly`), which can conflict with SQLMesh's macro syntax.

## Examples
### Basic
The following is an example of a Python model returning a static Pandas DataFrame.
Expand Down
18 changes: 15 additions & 3 deletions sqlmesh/core/model/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@
logger = logging.getLogger(__name__)


UNRENDERABLE_MODEL_FIELDS = {"cron", "description"}

PROPERTIES = {"physical_properties", "session_properties", "virtual_properties"}

RUNTIME_RENDERED_MODEL_FIELDS = {
Expand All @@ -84,6 +82,16 @@
"merge_filter",
} | PROPERTIES

CRON_SHORTCUTS = {
"@midnight",
"@hourly",
"@daily",
"@weekly",
"@monthly",
"@yearly",
"@annually",
}


class _Model(ModelMeta, frozen=True):
"""Model is the core abstraction for user defined datasets.
Expand Down Expand Up @@ -2771,7 +2779,11 @@ def render_field_value(value: t.Any) -> t.Any:
field_value = fields.get(field)

# We don't want to parse python model cron="@..." kwargs (e.g. @daily) into MacroVar
if field == "cron" or field_value is None:
if (
field == "cron"
and isinstance(field_value, str)
and field_value.lower() in CRON_SHORTCUTS
) or field_value is None:
continue

if field in RUNTIME_RENDERED_MODEL_FIELDS:
Expand Down
177 changes: 175 additions & 2 deletions tests/core/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -4263,6 +4263,7 @@ def python_model_prop(context, **kwargs):
def test_model_defaults_macros(make_snapshot):
model_defaults = ModelDefaultsConfig(
table_format="@IF(@gateway = 'dev', 'iceberg', NULL)",
cron="@cron_macro",
storage_format="@IF(@gateway = 'local', 'parquet', NULL)",
optimize_query="@IF(@gateway = 'dev', True, False)",
enabled="@IF(@gateway = 'dev', True, False)",
Expand Down Expand Up @@ -4298,7 +4299,7 @@ def test_model_defaults_macros(make_snapshot):
default_dialect="snowflake",
),
defaults=model_defaults.dict(),
variables={"gateway": "dev", "create_type": "SECURE"},
variables={"gateway": "dev", "create_type": "SECURE", "cron_macro": "@daily"},
)

snapshot: Snapshot = make_snapshot(model)
Expand All @@ -4311,6 +4312,7 @@ def test_model_defaults_macros(make_snapshot):
assert not model.allow_partials
assert model.interval_unit == IntervalUnit.DAY
assert model.table_format == "iceberg"
assert model.cron == "@daily"

# Validate disabling of conditional model default
assert not model.storage_format
Expand Down Expand Up @@ -4363,6 +4365,7 @@ def test_model_defaults_macros_python_model(make_snapshot):
"partition_expiration_days": 13,
"creatable_type": "@IF(@model_kind_name = 'FULL', 'TRANSIENT', NULL)",
},
"cron": "@cron_macro_expr",
"table_format": "@IF(@gateway = 'local', 'iceberg', NULL)",
"storage_format": "@IF(@gateway = 'dev', 'parquet', NULL)",
"optimize_query": "@IF(@gateway = 'local', True, False)",
Expand Down Expand Up @@ -4391,13 +4394,14 @@ def python_model_prop_macro(context, **kwargs):
path=Path("."),
dialect="duckdb",
defaults=model_defaults,
variables={"gateway": "local", "create_type": "SECURE"},
variables={"gateway": "local", "create_type": "SECURE", "cron_macro_expr": "0 */2 * * *"},
)

# Even if in the project wide defaults this is ignored for python models
assert not m.optimize_query

# Validate rendering of model defaults
assert m.cron == "0 */2 * * *"
assert m.enabled
assert m.start == "2024-01-01"
assert m.allow_partials
Expand Down Expand Up @@ -6379,6 +6383,7 @@ def test_macros_python_model(mocker: MockerFixture) -> None:
columns={"a": "string"},
kind=dict(name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="@{time_col}"),
stamp="@{stamp}",
cron="@some_cron_var",
owner="@IF(@gateway = 'dev', @{dev_owner}, @{prod_owner})",
enabled="@IF(@gateway = 'dev', True, False)",
start="@IF(@gateway = 'dev', '1 month ago', '2024-01-01')",
Expand Down Expand Up @@ -6406,6 +6411,7 @@ def model_with_macros(context, **kwargs):
"prod_owner": "pr_1",
"stamp": "bump",
"time_col": "a",
"some_cron_var": "@daily",
},
)

Expand All @@ -6417,6 +6423,7 @@ def model_with_macros(context, **kwargs):
assert python_model.stamp == "bump"
assert python_model.time_column.column == exp.column("a", quoted=True)
assert python_model.partitioned_by[0].sql() == 'DATETIME_TRUNC("a", MONTH)'
assert python_model.cron == "@daily"

context = ExecutionContext(mocker.Mock(), {}, None, None)
df = list(python_model.render(context=context))[0]
Expand Down Expand Up @@ -10747,3 +10754,169 @@ def test_datetime_without_timezone_variable_redshift() -> None:
model.render_query_or_raise().sql("redshift")
== '''SELECT CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS "test_time_col"'''
)


def test_python_model_cron_with_blueprints(tmp_path: Path) -> None:
init_example_project(tmp_path, engine_type="duckdb", template=ProjectTemplate.EMPTY)

cron_blueprint_model = tmp_path / "models" / "cron_blueprint.py"
cron_blueprint_model.parent.mkdir(parents=True, exist_ok=True)
cron_blueprint_model.write_text(
"""
import typing as t
from datetime import datetime

import pandas as pd
from sqlmesh import ExecutionContext, model

@model(
"@{customer}.some_table",
kind="FULL",
cron="@'*/@{min} * * * *'",
blueprints=[
{"customer": "customer1", "field_a": "x", "field_b": "y", "min": 5},
{"customer": "customer2", "field_a": "z", "field_b": "w", "min": 10},
],
columns={
"field_a": "text",
"field_b": "text",
"customer": "text",
},
enabled=True
)
def entrypoint(
context: ExecutionContext,
start: datetime,
end: datetime,
execution_time: datetime,
**kwargs: t.Any,
) -> pd.DataFrame:
return pd.DataFrame(
{
"field_a": [context.blueprint_var("field_a")],
"field_b": [context.blueprint_var("field_b")],
"customer": [context.blueprint_var("customer")],
}
)
"""
)

context = Context(
paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
)
models = context.models

# Test first blueprint
customer1_model = models.get('"memory"."customer1"."some_table"')
assert customer1_model is not None
assert customer1_model.cron == "*/5 * * * *"
assert customer1_model.enabled
assert "blueprints" not in customer1_model.all_fields()
assert customer1_model.python_env.get(c.SQLMESH_BLUEPRINT_VARS) == Executable.value(
{"customer": "customer1", "field_a": "x", "field_b": "y", "min": 5}
)

# Test second blueprint
customer2_model = models.get('"memory"."customer2"."some_table"')
assert customer2_model is not None
assert customer2_model.cron == "*/10 * * * *"
assert customer2_model.python_env.get(c.SQLMESH_BLUEPRINT_VARS) == Executable.value(
{"customer": "customer2", "field_a": "z", "field_b": "w", "min": 10}
)

# Test that the models can be planned and applied
context.plan(no_prompts=True, auto_apply=True, no_diff=True)

# Verify the data
assert context.fetchdf('from "memory"."customer1"."some_table"').to_dict() == {
"field_a": {0: "x"},
"field_b": {0: "y"},
"customer": {0: "customer1"},
}
assert context.fetchdf('from "memory"."customer2"."some_table"').to_dict() == {
"field_a": {0: "z"},
"field_b": {0: "w"},
"customer": {0: "customer2"},
}


def test_python_model_cron_macro_rendering(tmp_path: Path) -> None:
init_example_project(tmp_path, engine_type="duckdb", template=ProjectTemplate.EMPTY)

cron_macro_model = tmp_path / "models" / "cron_macro.py"
cron_macro_model.parent.mkdir(parents=True, exist_ok=True)
cron_macro_model.write_text(
"""
import pandas as pd
from sqlmesh import model

@model(
"msc.test_cron_model",
kind="FULL",
cron="@{cron_schedule}",
columns={"a": "int"},
)
def entrypoint(context, **kwargs):
return pd.DataFrame([{"a": 1}])
"""
)

# Test with cron alias
context_daily = Context(
paths=tmp_path,
config=Config(
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
variables={"cron_schedule": "@daily"},
),
)
model_daily = context_daily.models.get('"memory"."msc"."test_cron_model"')

assert model_daily is not None
assert model_daily.cron == "@daily"

# Test with cron expression
context_expr = Context(
paths=tmp_path,
config=Config(
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
variables={"cron_schedule": "0 */2 * * *"},
),
)
model_expr = context_expr.models.get('"memory"."msc"."test_cron_model"')
assert model_expr is not None
assert model_expr.cron == "0 */2 * * *"


def test_python_model_normal_cron(tmp_path: Path) -> None:
init_example_project(tmp_path, engine_type="duckdb", template=ProjectTemplate.EMPTY)

cron_macro_model = tmp_path / "models" / "cron_macro.py"
cron_macro_model.parent.mkdir(parents=True, exist_ok=True)
cron_macro_model.write_text(
"""
import pandas as pd
from sqlmesh import model

@model(
"msc.normal_test_cron_model",
kind="FULL",
cron="@daily",
columns={"a": "int"},
)
def entrypoint(context, **kwargs):
return pd.DataFrame([{"a": 1}])
"""
)

# Test with cron alias
context_daily = Context(
paths=tmp_path,
config=Config(
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
variables={"cron_schedule": "@daily"},
),
)
model_daily = context_daily.models.get('"memory"."msc"."normal_test_cron_model"')

assert model_daily is not None
assert model_daily.cron == "@daily"