@@ -4263,6 +4263,7 @@ def python_model_prop(context, **kwargs):
42634263def test_model_defaults_macros (make_snapshot ):
42644264 model_defaults = ModelDefaultsConfig (
42654265 table_format = "@IF(@gateway = 'dev', 'iceberg', NULL)" ,
4266+ cron = "@cron_macro" ,
42664267 storage_format = "@IF(@gateway = 'local', 'parquet', NULL)" ,
42674268 optimize_query = "@IF(@gateway = 'dev', True, False)" ,
42684269 enabled = "@IF(@gateway = 'dev', True, False)" ,
@@ -4298,7 +4299,7 @@ def test_model_defaults_macros(make_snapshot):
42984299 default_dialect = "snowflake" ,
42994300 ),
43004301 defaults = model_defaults .dict (),
4301- variables = {"gateway" : "dev" , "create_type" : "SECURE" },
4302+ variables = {"gateway" : "dev" , "create_type" : "SECURE" , "cron_macro" : "@daily" },
43024303 )
43034304
43044305 snapshot : Snapshot = make_snapshot (model )
@@ -4311,6 +4312,7 @@ def test_model_defaults_macros(make_snapshot):
43114312 assert not model .allow_partials
43124313 assert model .interval_unit == IntervalUnit .DAY
43134314 assert model .table_format == "iceberg"
4315+ assert model .cron == "@daily"
43144316
43154317 # Validate disabling of conditional model default
43164318 assert not model .storage_format
@@ -4363,6 +4365,7 @@ def test_model_defaults_macros_python_model(make_snapshot):
43634365 "partition_expiration_days" : 13 ,
43644366 "creatable_type" : "@IF(@model_kind_name = 'FULL', 'TRANSIENT', NULL)" ,
43654367 },
4368+ "cron" : "@cron_macro_expr" ,
43664369 "table_format" : "@IF(@gateway = 'local', 'iceberg', NULL)" ,
43674370 "storage_format" : "@IF(@gateway = 'dev', 'parquet', NULL)" ,
43684371 "optimize_query" : "@IF(@gateway = 'local', True, False)" ,
@@ -4391,13 +4394,14 @@ def python_model_prop_macro(context, **kwargs):
43914394 path = Path ("." ),
43924395 dialect = "duckdb" ,
43934396 defaults = model_defaults ,
4394- variables = {"gateway" : "local" , "create_type" : "SECURE" },
4397+ variables = {"gateway" : "local" , "create_type" : "SECURE" , "cron_macro_expr" : "0 */2 * * *" },
43954398 )
43964399
43974400 # Even if in the project wide defaults this is ignored for python models
43984401 assert not m .optimize_query
43994402
44004403 # Validate rendering of model defaults
4404+ assert m .cron == "0 */2 * * *"
44014405 assert m .enabled
44024406 assert m .start == "2024-01-01"
44034407 assert m .allow_partials
@@ -6379,6 +6383,7 @@ def test_macros_python_model(mocker: MockerFixture) -> None:
63796383 columns = {"a" : "string" },
63806384 kind = dict (name = ModelKindName .INCREMENTAL_BY_TIME_RANGE , time_column = "@{time_col}" ),
63816385 stamp = "@{stamp}" ,
6386+ cron = "@some_cron_var" ,
63826387 owner = "@IF(@gateway = 'dev', @{dev_owner}, @{prod_owner})" ,
63836388 enabled = "@IF(@gateway = 'dev', True, False)" ,
63846389 start = "@IF(@gateway = 'dev', '1 month ago', '2024-01-01')" ,
@@ -6406,6 +6411,7 @@ def model_with_macros(context, **kwargs):
64066411 "prod_owner" : "pr_1" ,
64076412 "stamp" : "bump" ,
64086413 "time_col" : "a" ,
6414+ "some_cron_var" : "@daily" ,
64096415 },
64106416 )
64116417
@@ -6417,6 +6423,7 @@ def model_with_macros(context, **kwargs):
64176423 assert python_model .stamp == "bump"
64186424 assert python_model .time_column .column == exp .column ("a" , quoted = True )
64196425 assert python_model .partitioned_by [0 ].sql () == 'DATETIME_TRUNC("a", MONTH)'
6426+ assert python_model .cron == "@daily"
64206427
64216428 context = ExecutionContext (mocker .Mock (), {}, None , None )
64226429 df = list (python_model .render (context = context ))[0 ]
@@ -10747,3 +10754,169 @@ def test_datetime_without_timezone_variable_redshift() -> None:
1074710754 model .render_query_or_raise ().sql ("redshift" )
1074810755 == '''SELECT CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS "test_time_col"'''
1074910756 )
10757+
10758+
10759+ def test_python_model_cron_with_blueprints (tmp_path : Path ) -> None :
10760+ init_example_project (tmp_path , engine_type = "duckdb" , template = ProjectTemplate .EMPTY )
10761+
10762+ cron_blueprint_model = tmp_path / "models" / "cron_blueprint.py"
10763+ cron_blueprint_model .parent .mkdir (parents = True , exist_ok = True )
10764+ cron_blueprint_model .write_text (
10765+ """
10766+ import typing as t
10767+ from datetime import datetime
10768+
10769+ import pandas as pd
10770+ from sqlmesh import ExecutionContext, model
10771+
10772+ @model(
10773+ "@{customer}.some_table",
10774+ kind="FULL",
10775+ cron="@'*/@{min} * * * *'",
10776+ blueprints=[
10777+ {"customer": "customer1", "field_a": "x", "field_b": "y", "min": 5},
10778+ {"customer": "customer2", "field_a": "z", "field_b": "w", "min": 10},
10779+ ],
10780+ columns={
10781+ "field_a": "text",
10782+ "field_b": "text",
10783+ "customer": "text",
10784+ },
10785+ enabled=True
10786+ )
10787+ def entrypoint(
10788+ context: ExecutionContext,
10789+ start: datetime,
10790+ end: datetime,
10791+ execution_time: datetime,
10792+ **kwargs: t.Any,
10793+ ) -> pd.DataFrame:
10794+ return pd.DataFrame(
10795+ {
10796+ "field_a": [context.blueprint_var("field_a")],
10797+ "field_b": [context.blueprint_var("field_b")],
10798+ "customer": [context.blueprint_var("customer")],
10799+ }
10800+ )
10801+ """
10802+ )
10803+
10804+ context = Context (
10805+ paths = tmp_path , config = Config (model_defaults = ModelDefaultsConfig (dialect = "duckdb" ))
10806+ )
10807+ models = context .models
10808+
10809+ # Test first blueprint
10810+ customer1_model = models .get ('"memory"."customer1"."some_table"' )
10811+ assert customer1_model is not None
10812+ assert customer1_model .cron == "*/5 * * * *"
10813+ assert customer1_model .enabled
10814+ assert "blueprints" not in customer1_model .all_fields ()
10815+ assert customer1_model .python_env .get (c .SQLMESH_BLUEPRINT_VARS ) == Executable .value (
10816+ {"customer" : "customer1" , "field_a" : "x" , "field_b" : "y" , "min" : 5 }
10817+ )
10818+
10819+ # Test second blueprint
10820+ customer2_model = models .get ('"memory"."customer2"."some_table"' )
10821+ assert customer2_model is not None
10822+ assert customer2_model .cron == "*/10 * * * *"
10823+ assert customer2_model .python_env .get (c .SQLMESH_BLUEPRINT_VARS ) == Executable .value (
10824+ {"customer" : "customer2" , "field_a" : "z" , "field_b" : "w" , "min" : 10 }
10825+ )
10826+
10827+ # Test that the models can be planned and applied
10828+ context .plan (no_prompts = True , auto_apply = True , no_diff = True )
10829+
10830+ # Verify the data
10831+ assert context .fetchdf ('from "memory"."customer1"."some_table"' ).to_dict () == {
10832+ "field_a" : {0 : "x" },
10833+ "field_b" : {0 : "y" },
10834+ "customer" : {0 : "customer1" },
10835+ }
10836+ assert context .fetchdf ('from "memory"."customer2"."some_table"' ).to_dict () == {
10837+ "field_a" : {0 : "z" },
10838+ "field_b" : {0 : "w" },
10839+ "customer" : {0 : "customer2" },
10840+ }
10841+
10842+
10843+ def test_python_model_cron_macro_rendering (tmp_path : Path ) -> None :
10844+ init_example_project (tmp_path , engine_type = "duckdb" , template = ProjectTemplate .EMPTY )
10845+
10846+ cron_macro_model = tmp_path / "models" / "cron_macro.py"
10847+ cron_macro_model .parent .mkdir (parents = True , exist_ok = True )
10848+ cron_macro_model .write_text (
10849+ """
10850+ import pandas as pd
10851+ from sqlmesh import model
10852+
10853+ @model(
10854+ "msc.test_cron_model",
10855+ kind="FULL",
10856+ cron="@{cron_schedule}",
10857+ columns={"a": "int"},
10858+ )
10859+ def entrypoint(context, **kwargs):
10860+ return pd.DataFrame([{"a": 1}])
10861+ """
10862+ )
10863+
10864+ # Test with cron alias
10865+ context_daily = Context (
10866+ paths = tmp_path ,
10867+ config = Config (
10868+ model_defaults = ModelDefaultsConfig (dialect = "duckdb" ),
10869+ variables = {"cron_schedule" : "@daily" },
10870+ ),
10871+ )
10872+ model_daily = context_daily .models .get ('"memory"."msc"."test_cron_model"' )
10873+
10874+ assert model_daily is not None
10875+ assert model_daily .cron == "@daily"
10876+
10877+ # Test with cron expression
10878+ context_expr = Context (
10879+ paths = tmp_path ,
10880+ config = Config (
10881+ model_defaults = ModelDefaultsConfig (dialect = "duckdb" ),
10882+ variables = {"cron_schedule" : "0 */2 * * *" },
10883+ ),
10884+ )
10885+ model_expr = context_expr .models .get ('"memory"."msc"."test_cron_model"' )
10886+ assert model_expr is not None
10887+ assert model_expr .cron == "0 */2 * * *"
10888+
10889+
10890+ def test_python_model_normal_cron (tmp_path : Path ) -> None :
10891+ init_example_project (tmp_path , engine_type = "duckdb" , template = ProjectTemplate .EMPTY )
10892+
10893+ cron_macro_model = tmp_path / "models" / "cron_macro.py"
10894+ cron_macro_model .parent .mkdir (parents = True , exist_ok = True )
10895+ cron_macro_model .write_text (
10896+ """
10897+ import pandas as pd
10898+ from sqlmesh import model
10899+
10900+ @model(
10901+ "msc.normal_test_cron_model",
10902+ kind="FULL",
10903+ cron="@daily",
10904+ columns={"a": "int"},
10905+ )
10906+ def entrypoint(context, **kwargs):
10907+ return pd.DataFrame([{"a": 1}])
10908+ """
10909+ )
10910+
10911+ # Test with cron alias
10912+ context_daily = Context (
10913+ paths = tmp_path ,
10914+ config = Config (
10915+ model_defaults = ModelDefaultsConfig (dialect = "duckdb" ),
10916+ variables = {"cron_schedule" : "@daily" },
10917+ ),
10918+ )
10919+ model_daily = context_daily .models .get ('"memory"."msc"."normal_test_cron_model"' )
10920+
10921+ assert model_daily is not None
10922+ assert model_daily .cron == "@daily"
0 commit comments