From 676c37a13e8fdd41077dd9075759275db5aa98bd Mon Sep 17 00:00:00 2001 From: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com> Date: Mon, 21 Jul 2025 14:11:40 +0300 Subject: [PATCH 1/3] Chore: Add cron in the fields to render for python models --- sqlmesh/core/model/definition.py | 18 +++- tests/core/test_model.py | 177 ++++++++++++++++++++++++++++++- 2 files changed, 192 insertions(+), 3 deletions(-) diff --git a/sqlmesh/core/model/definition.py b/sqlmesh/core/model/definition.py index e900e2fc25..8ab9915069 100644 --- a/sqlmesh/core/model/definition.py +++ b/sqlmesh/core/model/definition.py @@ -84,6 +84,16 @@ "merge_filter", } | PROPERTIES +CRON_ALIASES = { + "@midnight", + "@hourly", + "@daily", + "@weekly", + "@monthly", + "@yearly", + "@annually", +} + class _Model(ModelMeta, frozen=True): """Model is the core abstraction for user defined datasets. @@ -2771,9 +2781,15 @@ def render_field_value(value: t.Any) -> t.Any: field_value = fields.get(field) # We don't want to parse python model cron="@..." kwargs (e.g. @daily) into MacroVar - if field == "cron" or field_value is None: + if ( + field == "cron" and isinstance(field_value, str) and field_value.lower() in CRON_ALIASES + ) or field_value is None: continue + # If it contains a macro reference we need to render it similar to sql models + if field == "cron" and isinstance(field_value, str): + field_value = exp.to_identifier(field_value) + if field in RUNTIME_RENDERED_MODEL_FIELDS: fields[field] = parse_strings_with_macro_refs(field_value, dialect) continue diff --git a/tests/core/test_model.py b/tests/core/test_model.py index b13a5797cc..405cf9b012 100644 --- a/tests/core/test_model.py +++ b/tests/core/test_model.py @@ -4263,6 +4263,7 @@ def python_model_prop(context, **kwargs): def test_model_defaults_macros(make_snapshot): model_defaults = ModelDefaultsConfig( table_format="@IF(@gateway = 'dev', 'iceberg', NULL)", + cron="@cron_macro", storage_format="@IF(@gateway = 'local', 'parquet', NULL)", optimize_query="@IF(@gateway = 'dev', True, False)", enabled="@IF(@gateway = 'dev', True, False)", @@ -4298,7 +4299,7 @@ def test_model_defaults_macros(make_snapshot): default_dialect="snowflake", ), defaults=model_defaults.dict(), - variables={"gateway": "dev", "create_type": "SECURE"}, + variables={"gateway": "dev", "create_type": "SECURE", "cron_macro": "@daily"}, ) snapshot: Snapshot = make_snapshot(model) @@ -4311,6 +4312,7 @@ def test_model_defaults_macros(make_snapshot): assert not model.allow_partials assert model.interval_unit == IntervalUnit.DAY assert model.table_format == "iceberg" + assert model.cron == "@daily" # Validate disabling of conditional model default assert not model.storage_format @@ -4363,6 +4365,7 @@ def test_model_defaults_macros_python_model(make_snapshot): "partition_expiration_days": 13, "creatable_type": "@IF(@model_kind_name = 'FULL', 'TRANSIENT', NULL)", }, + "cron": "@cron_macro_expr", "table_format": "@IF(@gateway = 'local', 'iceberg', NULL)", "storage_format": "@IF(@gateway = 'dev', 'parquet', NULL)", "optimize_query": "@IF(@gateway = 'local', True, False)", @@ -4391,13 +4394,14 @@ def python_model_prop_macro(context, **kwargs): path=Path("."), dialect="duckdb", defaults=model_defaults, - variables={"gateway": "local", "create_type": "SECURE"}, + variables={"gateway": "local", "create_type": "SECURE", "cron_macro_expr": "0 */2 * * *"}, ) # Even if in the project wide defaults this is ignored for python models assert not m.optimize_query # Validate rendering of model defaults + assert m.cron == "0 */2 * * *" assert m.enabled assert m.start == "2024-01-01" assert m.allow_partials @@ -6379,6 +6383,7 @@ def test_macros_python_model(mocker: MockerFixture) -> None: columns={"a": "string"}, kind=dict(name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="@{time_col}"), stamp="@{stamp}", + cron="@some_cron_var", owner="@IF(@gateway = 'dev', @{dev_owner}, @{prod_owner})", enabled="@IF(@gateway = 'dev', True, False)", start="@IF(@gateway = 'dev', '1 month ago', '2024-01-01')", @@ -6406,6 +6411,7 @@ def model_with_macros(context, **kwargs): "prod_owner": "pr_1", "stamp": "bump", "time_col": "a", + "some_cron_var": "@daily", }, ) @@ -6417,6 +6423,7 @@ def model_with_macros(context, **kwargs): assert python_model.stamp == "bump" assert python_model.time_column.column == exp.column("a", quoted=True) assert python_model.partitioned_by[0].sql() == 'DATETIME_TRUNC("a", MONTH)' + assert python_model.cron == "@daily" context = ExecutionContext(mocker.Mock(), {}, None, None) df = list(python_model.render(context=context))[0] @@ -10747,3 +10754,169 @@ def test_datetime_without_timezone_variable_redshift() -> None: model.render_query_or_raise().sql("redshift") == '''SELECT CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS "test_time_col"''' ) + + +def test_python_model_cron_with_blueprints(tmp_path: Path) -> None: + init_example_project(tmp_path, engine_type="duckdb", template=ProjectTemplate.EMPTY) + + cron_blueprint_model = tmp_path / "models" / "cron_blueprint.py" + cron_blueprint_model.parent.mkdir(parents=True, exist_ok=True) + cron_blueprint_model.write_text( + """ +import typing as t +from datetime import datetime + +import pandas as pd +from sqlmesh import ExecutionContext, model + +@model( + "@{customer}.some_table", + kind="FULL", + cron="*/@{min} * * * *", + blueprints=[ + {"customer": "customer1", "field_a": "x", "field_b": "y", "min": 5}, + {"customer": "customer2", "field_a": "z", "field_b": "w", "min": 10}, + ], + columns={ + "field_a": "text", + "field_b": "text", + "customer": "text", + }, + enabled=True +) +def entrypoint( + context: ExecutionContext, + start: datetime, + end: datetime, + execution_time: datetime, + **kwargs: t.Any, +) -> pd.DataFrame: + return pd.DataFrame( + { + "field_a": [context.blueprint_var("field_a")], + "field_b": [context.blueprint_var("field_b")], + "customer": [context.blueprint_var("customer")], + } + ) +""" + ) + + context = Context( + paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")) + ) + models = context.models + + # Test first blueprint + customer1_model = models.get('"memory"."customer1"."some_table"') + assert customer1_model is not None + assert customer1_model.cron == "*/5 * * * *" + assert customer1_model.enabled + assert "blueprints" not in customer1_model.all_fields() + assert customer1_model.python_env.get(c.SQLMESH_BLUEPRINT_VARS) == Executable.value( + {"customer": "customer1", "field_a": "x", "field_b": "y", "min": 5} + ) + + # Test second blueprint + customer2_model = models.get('"memory"."customer2"."some_table"') + assert customer2_model is not None + assert customer2_model.cron == "*/10 * * * *" + assert customer2_model.python_env.get(c.SQLMESH_BLUEPRINT_VARS) == Executable.value( + {"customer": "customer2", "field_a": "z", "field_b": "w", "min": 10} + ) + + # Test that the models can be planned and applied + context.plan(no_prompts=True, auto_apply=True, no_diff=True) + + # Verify the data + assert context.fetchdf('from "memory"."customer1"."some_table"').to_dict() == { + "field_a": {0: "x"}, + "field_b": {0: "y"}, + "customer": {0: "customer1"}, + } + assert context.fetchdf('from "memory"."customer2"."some_table"').to_dict() == { + "field_a": {0: "z"}, + "field_b": {0: "w"}, + "customer": {0: "customer2"}, + } + + +def test_python_model_cron_macro_rendering(tmp_path: Path) -> None: + init_example_project(tmp_path, engine_type="duckdb", template=ProjectTemplate.EMPTY) + + cron_macro_model = tmp_path / "models" / "cron_macro.py" + cron_macro_model.parent.mkdir(parents=True, exist_ok=True) + cron_macro_model.write_text( + """ +import pandas as pd +from sqlmesh import model + +@model( + "msc.test_cron_model", + kind="FULL", + cron="@{cron_schedule}", + columns={"a": "int"}, +) +def entrypoint(context, **kwargs): + return pd.DataFrame([{"a": 1}]) +""" + ) + + # Test with cron alias + context_daily = Context( + paths=tmp_path, + config=Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + variables={"cron_schedule": "@daily"}, + ), + ) + model_daily = context_daily.models.get('"memory"."msc"."test_cron_model"') + + assert model_daily is not None + assert model_daily.cron == "@daily" + + # Test with cron expression + context_expr = Context( + paths=tmp_path, + config=Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + variables={"cron_schedule": "0 */2 * * *"}, + ), + ) + model_expr = context_expr.models.get('"memory"."msc"."test_cron_model"') + assert model_expr is not None + assert model_expr.cron == "0 */2 * * *" + + +def test_python_model_normal_cron(tmp_path: Path) -> None: + init_example_project(tmp_path, engine_type="duckdb", template=ProjectTemplate.EMPTY) + + cron_macro_model = tmp_path / "models" / "cron_macro.py" + cron_macro_model.parent.mkdir(parents=True, exist_ok=True) + cron_macro_model.write_text( + """ +import pandas as pd +from sqlmesh import model + +@model( + "msc.normal_test_cron_model", + kind="FULL", + cron="@daily", + columns={"a": "int"}, +) +def entrypoint(context, **kwargs): + return pd.DataFrame([{"a": 1}]) +""" + ) + + # Test with cron alias + context_daily = Context( + paths=tmp_path, + config=Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + variables={"cron_schedule": "@daily"}, + ), + ) + model_daily = context_daily.models.get('"memory"."msc"."normal_test_cron_model"') + + assert model_daily is not None + assert model_daily.cron == "@daily" From 4ae5a0f876c5b1bf39d9a95b1f9eb606a71c9931 Mon Sep 17 00:00:00 2001 From: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com> Date: Mon, 21 Jul 2025 15:41:50 +0300 Subject: [PATCH 2/3] address comments --- sqlmesh/core/model/definition.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sqlmesh/core/model/definition.py b/sqlmesh/core/model/definition.py index 8ab9915069..44c84e4d2b 100644 --- a/sqlmesh/core/model/definition.py +++ b/sqlmesh/core/model/definition.py @@ -74,8 +74,6 @@ logger = logging.getLogger(__name__) -UNRENDERABLE_MODEL_FIELDS = {"cron", "description"} - PROPERTIES = {"physical_properties", "session_properties", "virtual_properties"} RUNTIME_RENDERED_MODEL_FIELDS = { @@ -84,7 +82,7 @@ "merge_filter", } | PROPERTIES -CRON_ALIASES = { +CRON_SHORTCUTS = { "@midnight", "@hourly", "@daily", @@ -2782,7 +2780,9 @@ def render_field_value(value: t.Any) -> t.Any: # We don't want to parse python model cron="@..." kwargs (e.g. @daily) into MacroVar if ( - field == "cron" and isinstance(field_value, str) and field_value.lower() in CRON_ALIASES + field == "cron" + and isinstance(field_value, str) + and field_value.lower() in CRON_SHORTCUTS ) or field_value is None: continue From 717af000c63aff9203aadd05665ba27475acfd71 Mon Sep 17 00:00:00 2001 From: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com> Date: Mon, 21 Jul 2025 17:08:39 +0300 Subject: [PATCH 3/3] remove conversion of cron to identifier; add doc clarification --- docs/concepts/models/python_models.md | 29 +++++++++++++++++++++++++++ sqlmesh/core/model/definition.py | 4 ---- tests/core/test_model.py | 2 +- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/docs/concepts/models/python_models.md b/docs/concepts/models/python_models.md index 53495d11aa..63796987a8 100644 --- a/docs/concepts/models/python_models.md +++ b/docs/concepts/models/python_models.md @@ -394,6 +394,35 @@ It's also possible to use the `@EACH` macro, combined with a global list variabl ... ``` +## Using macros in model properties + +Python models support macro variables in model properties. However, special care must be taken when the macro variable appears within a string. + +For example when using macro variables inside cron expressions, you need to wrap the entire expression in quotes and prefix it with `@` to ensure proper parsing: + +```python linenums="1" +# Correct: Wrap the cron expression containing a macro variable +@model( + "my_model", + cron="@'*/@{mins} * * * *'", # Note the @'...' syntax + ... +) + +# This also works with blueprint variables +@model( + "@{customer}.scheduled_model", + cron="@'0 @{hour} * * *'", + blueprints=[ + {"customer": "customer_1", "hour": 2}, # Runs at 2 AM + {"customer": "customer_2", "hour": 8}, # Runs at 8 AM + ], + ... +) + +``` + +This is necessary because cron expressions often use `@` for aliases (like `@daily`, `@hourly`), which can conflict with SQLMesh's macro syntax. + ## Examples ### Basic The following is an example of a Python model returning a static Pandas DataFrame. diff --git a/sqlmesh/core/model/definition.py b/sqlmesh/core/model/definition.py index 44c84e4d2b..25dd013f4d 100644 --- a/sqlmesh/core/model/definition.py +++ b/sqlmesh/core/model/definition.py @@ -2786,10 +2786,6 @@ def render_field_value(value: t.Any) -> t.Any: ) or field_value is None: continue - # If it contains a macro reference we need to render it similar to sql models - if field == "cron" and isinstance(field_value, str): - field_value = exp.to_identifier(field_value) - if field in RUNTIME_RENDERED_MODEL_FIELDS: fields[field] = parse_strings_with_macro_refs(field_value, dialect) continue diff --git a/tests/core/test_model.py b/tests/core/test_model.py index 405cf9b012..575d9038ae 100644 --- a/tests/core/test_model.py +++ b/tests/core/test_model.py @@ -10772,7 +10772,7 @@ def test_python_model_cron_with_blueprints(tmp_path: Path) -> None: @model( "@{customer}.some_table", kind="FULL", - cron="*/@{min} * * * *", + cron="@'*/@{min} * * * *'", blueprints=[ {"customer": "customer1", "field_a": "x", "field_b": "y", "min": 5}, {"customer": "customer2", "field_a": "z", "field_b": "w", "min": 10},