Skip to content

Commit 73b530c

Browse files
Chore: Add cron in the fields to render for python models
1 parent 87f5335 commit 73b530c

File tree

2 files changed

+192
-3
lines changed

2 files changed

+192
-3
lines changed

sqlmesh/core/model/definition.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,16 @@
8484
"merge_filter",
8585
} | PROPERTIES
8686

87+
CRON_ALIASES = {
88+
"@midnight",
89+
"@hourly",
90+
"@daily",
91+
"@weekly",
92+
"@monthly",
93+
"@yearly",
94+
"@annually",
95+
}
96+
8797

8898
class _Model(ModelMeta, frozen=True):
8999
"""Model is the core abstraction for user defined datasets.
@@ -2771,9 +2781,15 @@ def render_field_value(value: t.Any) -> t.Any:
27712781
field_value = fields.get(field)
27722782

27732783
# We don't want to parse python model cron="@..." kwargs (e.g. @daily) into MacroVar
2774-
if field == "cron" or field_value is None:
2784+
if (
2785+
field == "cron" and isinstance(field_value, str) and field_value.lower() in CRON_ALIASES
2786+
) or field_value is None:
27752787
continue
27762788

2789+
# If it contains a macro reference we need to render it similar to sql models
2790+
if field == "cron" and isinstance(field_value, str):
2791+
field_value = exp.to_identifier(field_value)
2792+
27772793
if field in RUNTIME_RENDERED_MODEL_FIELDS:
27782794
fields[field] = parse_strings_with_macro_refs(field_value, dialect)
27792795
continue

tests/core/test_model.py

Lines changed: 175 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4263,6 +4263,7 @@ def python_model_prop(context, **kwargs):
42634263
def test_model_defaults_macros(make_snapshot):
42644264
model_defaults = ModelDefaultsConfig(
42654265
table_format="@IF(@gateway = 'dev', 'iceberg', NULL)",
4266+
cron="@cron_macro",
42664267
storage_format="@IF(@gateway = 'local', 'parquet', NULL)",
42674268
optimize_query="@IF(@gateway = 'dev', True, False)",
42684269
enabled="@IF(@gateway = 'dev', True, False)",
@@ -4298,7 +4299,7 @@ def test_model_defaults_macros(make_snapshot):
42984299
default_dialect="snowflake",
42994300
),
43004301
defaults=model_defaults.dict(),
4301-
variables={"gateway": "dev", "create_type": "SECURE"},
4302+
variables={"gateway": "dev", "create_type": "SECURE", "cron_macro": "@daily"},
43024303
)
43034304

43044305
snapshot: Snapshot = make_snapshot(model)
@@ -4311,6 +4312,7 @@ def test_model_defaults_macros(make_snapshot):
43114312
assert not model.allow_partials
43124313
assert model.interval_unit == IntervalUnit.DAY
43134314
assert model.table_format == "iceberg"
4315+
assert model.cron == "@daily"
43144316

43154317
# Validate disabling of conditional model default
43164318
assert not model.storage_format
@@ -4363,6 +4365,7 @@ def test_model_defaults_macros_python_model(make_snapshot):
43634365
"partition_expiration_days": 13,
43644366
"creatable_type": "@IF(@model_kind_name = 'FULL', 'TRANSIENT', NULL)",
43654367
},
4368+
"cron": "@cron_macro_expr",
43664369
"table_format": "@IF(@gateway = 'local', 'iceberg', NULL)",
43674370
"storage_format": "@IF(@gateway = 'dev', 'parquet', NULL)",
43684371
"optimize_query": "@IF(@gateway = 'local', True, False)",
@@ -4391,13 +4394,14 @@ def python_model_prop_macro(context, **kwargs):
43914394
path=Path("."),
43924395
dialect="duckdb",
43934396
defaults=model_defaults,
4394-
variables={"gateway": "local", "create_type": "SECURE"},
4397+
variables={"gateway": "local", "create_type": "SECURE", "cron_macro_expr": "0 */2 * * *"},
43954398
)
43964399

43974400
# Even if in the project wide defaults this is ignored for python models
43984401
assert not m.optimize_query
43994402

44004403
# Validate rendering of model defaults
4404+
assert m.cron == "0 */2 * * *"
44014405
assert m.enabled
44024406
assert m.start == "2024-01-01"
44034407
assert m.allow_partials
@@ -6379,6 +6383,7 @@ def test_macros_python_model(mocker: MockerFixture) -> None:
63796383
columns={"a": "string"},
63806384
kind=dict(name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="@{time_col}"),
63816385
stamp="@{stamp}",
6386+
cron="@some_cron_var",
63826387
owner="@IF(@gateway = 'dev', @{dev_owner}, @{prod_owner})",
63836388
enabled="@IF(@gateway = 'dev', True, False)",
63846389
start="@IF(@gateway = 'dev', '1 month ago', '2024-01-01')",
@@ -6406,6 +6411,7 @@ def model_with_macros(context, **kwargs):
64066411
"prod_owner": "pr_1",
64076412
"stamp": "bump",
64086413
"time_col": "a",
6414+
"some_cron_var": "@daily",
64096415
},
64106416
)
64116417

@@ -6417,6 +6423,7 @@ def model_with_macros(context, **kwargs):
64176423
assert python_model.stamp == "bump"
64186424
assert python_model.time_column.column == exp.column("a", quoted=True)
64196425
assert python_model.partitioned_by[0].sql() == 'DATETIME_TRUNC("a", MONTH)'
6426+
assert python_model.cron == "@daily"
64206427

64216428
context = ExecutionContext(mocker.Mock(), {}, None, None)
64226429
df = list(python_model.render(context=context))[0]
@@ -10747,3 +10754,169 @@ def test_datetime_without_timezone_variable_redshift() -> None:
1074710754
model.render_query_or_raise().sql("redshift")
1074810755
== '''SELECT CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS "test_time_col"'''
1074910756
)
10757+
10758+
10759+
def test_python_model_cron_with_blueprints(tmp_path: Path) -> None:
10760+
init_example_project(tmp_path, engine_type="duckdb", template=ProjectTemplate.EMPTY)
10761+
10762+
cron_blueprint_model = tmp_path / "models" / "cron_blueprint.py"
10763+
cron_blueprint_model.parent.mkdir(parents=True, exist_ok=True)
10764+
cron_blueprint_model.write_text(
10765+
"""
10766+
import typing as t
10767+
from datetime import datetime
10768+
10769+
import pandas as pd
10770+
from sqlmesh import ExecutionContext, model
10771+
10772+
@model(
10773+
"@{customer}.some_table",
10774+
kind="FULL",
10775+
cron="*/@{min} * * * *",
10776+
blueprints=[
10777+
{"customer": "customer1", "field_a": "x", "field_b": "y", "min": 5},
10778+
{"customer": "customer2", "field_a": "z", "field_b": "w", "min": 10},
10779+
],
10780+
columns={
10781+
"field_a": "text",
10782+
"field_b": "text",
10783+
"customer": "text",
10784+
},
10785+
enabled=True
10786+
)
10787+
def entrypoint(
10788+
context: ExecutionContext,
10789+
start: datetime,
10790+
end: datetime,
10791+
execution_time: datetime,
10792+
**kwargs: t.Any,
10793+
) -> pd.DataFrame:
10794+
return pd.DataFrame(
10795+
{
10796+
"field_a": [context.blueprint_var("field_a")],
10797+
"field_b": [context.blueprint_var("field_b")],
10798+
"customer": [context.blueprint_var("customer")],
10799+
}
10800+
)
10801+
"""
10802+
)
10803+
10804+
context = Context(
10805+
paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
10806+
)
10807+
models = context.models
10808+
10809+
# Test first blueprint
10810+
customer1_model = models.get('"memory"."customer1"."some_table"')
10811+
assert customer1_model is not None
10812+
assert customer1_model.cron == "*/5 * * * *"
10813+
assert customer1_model.enabled
10814+
assert "blueprints" not in customer1_model.all_fields()
10815+
assert customer1_model.python_env.get(c.SQLMESH_BLUEPRINT_VARS) == Executable.value(
10816+
{"customer": "customer1", "field_a": "x", "field_b": "y", "min": 5}
10817+
)
10818+
10819+
# Test second blueprint
10820+
customer2_model = models.get('"memory"."customer2"."some_table"')
10821+
assert customer2_model is not None
10822+
assert customer2_model.cron == "*/10 * * * *"
10823+
assert customer2_model.python_env.get(c.SQLMESH_BLUEPRINT_VARS) == Executable.value(
10824+
{"customer": "customer2", "field_a": "z", "field_b": "w", "min": 10}
10825+
)
10826+
10827+
# Test that the models can be planned and applied
10828+
context.plan(no_prompts=True, auto_apply=True, no_diff=True)
10829+
10830+
# Verify the data
10831+
assert context.fetchdf('from "memory"."customer1"."some_table"').to_dict() == {
10832+
"field_a": {0: "x"},
10833+
"field_b": {0: "y"},
10834+
"customer": {0: "customer1"},
10835+
}
10836+
assert context.fetchdf('from "memory"."customer2"."some_table"').to_dict() == {
10837+
"field_a": {0: "z"},
10838+
"field_b": {0: "w"},
10839+
"customer": {0: "customer2"},
10840+
}
10841+
10842+
10843+
def test_python_model_cron_macro_rendering(tmp_path: Path) -> None:
10844+
init_example_project(tmp_path, engine_type="duckdb", template=ProjectTemplate.EMPTY)
10845+
10846+
cron_macro_model = tmp_path / "models" / "cron_macro.py"
10847+
cron_macro_model.parent.mkdir(parents=True, exist_ok=True)
10848+
cron_macro_model.write_text(
10849+
"""
10850+
import pandas as pd
10851+
from sqlmesh import model
10852+
10853+
@model(
10854+
"msc.test_cron_model",
10855+
kind="FULL",
10856+
cron="@{cron_schedule}",
10857+
columns={"a": "int"},
10858+
)
10859+
def entrypoint(context, **kwargs):
10860+
return pd.DataFrame([{"a": 1}])
10861+
"""
10862+
)
10863+
10864+
# Test with cron alias
10865+
context_daily = Context(
10866+
paths=tmp_path,
10867+
config=Config(
10868+
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
10869+
variables={"cron_schedule": "@daily"},
10870+
),
10871+
)
10872+
model_daily = context_daily.models.get('"memory"."msc"."test_cron_model"')
10873+
10874+
assert model_daily is not None
10875+
assert model_daily.cron == "@daily"
10876+
10877+
# Test with cron expression
10878+
context_expr = Context(
10879+
paths=tmp_path,
10880+
config=Config(
10881+
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
10882+
variables={"cron_schedule": "0 */2 * * *"},
10883+
),
10884+
)
10885+
model_expr = context_expr.models.get('"memory"."msc"."test_cron_model"')
10886+
assert model_expr is not None
10887+
assert model_expr.cron == "0 */2 * * *"
10888+
10889+
10890+
def test_python_model_normal_cron(tmp_path: Path) -> None:
10891+
init_example_project(tmp_path, engine_type="duckdb", template=ProjectTemplate.EMPTY)
10892+
10893+
cron_macro_model = tmp_path / "models" / "cron_macro.py"
10894+
cron_macro_model.parent.mkdir(parents=True, exist_ok=True)
10895+
cron_macro_model.write_text(
10896+
"""
10897+
import pandas as pd
10898+
from sqlmesh import model
10899+
10900+
@model(
10901+
"msc.normal_test_cron_model",
10902+
kind="FULL",
10903+
cron="@daily",
10904+
columns={"a": "int"},
10905+
)
10906+
def entrypoint(context, **kwargs):
10907+
return pd.DataFrame([{"a": 1}])
10908+
"""
10909+
)
10910+
10911+
# Test with cron alias
10912+
context_daily = Context(
10913+
paths=tmp_path,
10914+
config=Config(
10915+
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
10916+
variables={"cron_schedule": "@daily"},
10917+
),
10918+
)
10919+
model_daily = context_daily.models.get('"memory"."msc"."normal_test_cron_model"')
10920+
10921+
assert model_daily is not None
10922+
assert model_daily.cron == "@daily"

0 commit comments

Comments
 (0)