Skip to content

Commit e839d4e

Browse files
committed
fix implementation
1 parent 5861dba commit e839d4e

File tree

2 files changed

+83
-24
lines changed

2 files changed

+83
-24
lines changed

sqlmesh/core/linter/rules/builtin.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from sqlglot.expressions import Star
88
from sqlglot.helper import subclasses
99

10+
from sqlmesh.core.node import IntervalUnit
1011
from sqlmesh.core.constants import EXTERNAL_MODELS_YAML
1112
from sqlmesh.core.dialect import normalize_model_name
1213
from sqlmesh.core.linter.helpers import (
@@ -297,14 +298,18 @@ def check_model(self, model: Model) -> t.Optional[t.List[RuleViolation]]:
297298

298299
upstream_model_cron_next = upstream_model.cron_next(placeholder_start_date)
299300

301+
upstream_cron_interval_unit = IntervalUnit.from_cron(upstream_model.cron)
302+
this_cron_interval_unit = IntervalUnit.from_cron(model.cron)
303+
rule_violation = RuleViolation(
304+
rule=self,
305+
violation_msg=f"Upstream model {upstream_model_name} has longer cron interval ({upstream_model.cron}) "
306+
f"than this model ({model.cron})",
307+
)
308+
300309
if upstream_model_cron_next > this_model_cron_next:
301-
violations.append(
302-
RuleViolation(
303-
rule=self,
304-
violation_msg=f"Upstream model {upstream_model_name} has longer cron interval ({upstream_model.cron}) "
305-
f"than this model ({model.cron})",
306-
)
307-
)
310+
violations.append(rule_violation)
311+
elif upstream_cron_interval_unit.seconds > this_cron_interval_unit.seconds:
312+
violations.append(rule_violation)
308313
elif upstream_model_cron_next <= this_model_cron_next:
309314
return None
310315
return violations

tests/core/linter/test_builtin.py

Lines changed: 71 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import pytest
23

34
from sqlmesh import Context
45
from sqlmesh.core.linter.rule import Position, Range
@@ -179,7 +180,27 @@ def test_no_missing_external_models_with_existing_file_not_ending_in_newline(
179180
assert edit.path == fix_path
180181

181182

182-
def test_cron_interval_alignment(tmp_path, copy_to_temp_path) -> None:
183+
@pytest.mark.parametrize(
184+
"upstream_cron,downstream_cron,expected_violations,violation_msg",
185+
[
186+
(
187+
"@weekly",
188+
"@daily",
189+
1,
190+
'Upstream model "memory"."sushi"."step_1" has longer cron interval (@weekly) than this model (@daily)',
191+
),
192+
("5 * * * *", "0 * * * *", 0, None),
193+
(
194+
"15 10 * * *",
195+
"0 * * * *",
196+
1,
197+
'Upstream model "memory"."sushi"."step_1" has longer cron interval (15 10 * * *) than this model (0 * * * *)',
198+
),
199+
],
200+
)
201+
def test_cron_interval_alignment(
202+
tmp_path, copy_to_temp_path, upstream_cron, downstream_cron, expected_violations, violation_msg
203+
) -> None:
183204
sushi_paths = copy_to_temp_path("examples/sushi")
184205
sushi_path = sushi_paths[0]
185206

@@ -210,30 +231,55 @@ def test_cron_interval_alignment(tmp_path, copy_to_temp_path) -> None:
210231

211232
context.load()
212233

213-
# Create model with shorter cron interval that depends on model with longer interval
234+
# Create model with cron intervals
214235
upstream_model = load_sql_based_model(
215-
d.parse("MODEL (name memory.sushi.step_1, cron '@weekly'); SELECT * FROM (SELECT 1)")
236+
d.parse(
237+
f"MODEL (name memory.sushi.step_1, cron '{upstream_cron}'); SELECT * FROM (SELECT 1)"
238+
)
216239
)
217240

218241
downstream_model = load_sql_based_model(
219242
d.parse(
220-
"MODEL (name memory.sushi.step_2, cron '@daily', depends_on ['memory.sushi.step_1']); SELECT * FROM (SELECT 1)"
243+
f"MODEL (name memory.sushi.step_2, cron '{downstream_cron}', depends_on ['memory.sushi.step_1']); SELECT * FROM (SELECT 1)"
221244
)
222245
)
223246

224247
context.upsert_model(upstream_model)
225248
context.upsert_model(downstream_model)
226249

227250
lints = context.lint_models(raise_on_error=False)
228-
assert len(lints) == 1
229-
lint = lints[0]
230-
assert (
231-
lint.violation_msg
232-
== 'Upstream model "memory"."sushi"."step_1" has longer cron interval (@weekly) than this model (@daily)'
233-
)
234-
235-
236-
def test_cron_interval_alignment_valid_upstream(tmp_path, copy_to_temp_path) -> None:
251+
assert len(lints) == expected_violations
252+
253+
if expected_violations > 0:
254+
lint = lints[0]
255+
assert lint.violation_msg == violation_msg
256+
257+
258+
@pytest.mark.parametrize(
259+
"upstream_cron_a,upstream_cron_b,downstream_cron,expected_violations,violation_msg",
260+
[
261+
("@weekly", "@hourly", "@daily", 0, None),
262+
(
263+
"@weekly",
264+
"@weekly",
265+
"@daily",
266+
2,
267+
[
268+
'Upstream model "memory"."sushi"."step_a" has longer cron interval (@weekly) than this model (@daily)',
269+
'Upstream model "memory"."sushi"."step_b" has longer cron interval (@weekly) than this model (@daily)',
270+
],
271+
),
272+
],
273+
)
274+
def test_cron_interval_alignment_valid_upstream_multiple_dependencies(
275+
tmp_path,
276+
copy_to_temp_path,
277+
upstream_cron_a,
278+
upstream_cron_b,
279+
downstream_cron,
280+
expected_violations,
281+
violation_msg,
282+
) -> None:
237283
sushi_paths = copy_to_temp_path("examples/sushi")
238284
sushi_path = sushi_paths[0]
239285

@@ -266,16 +312,20 @@ def test_cron_interval_alignment_valid_upstream(tmp_path, copy_to_temp_path) ->
266312

267313
# Create model with shorter cron interval that depends on model with longer interval
268314
upstream_model_a = load_sql_based_model(
269-
d.parse("MODEL (name memory.sushi.step_a, cron '@weekly'); SELECT * FROM (SELECT 1)")
315+
d.parse(
316+
f"MODEL (name memory.sushi.step_a, cron '{upstream_cron_a}'); SELECT * FROM (SELECT 1)"
317+
)
270318
)
271319

272320
upstream_model_b = load_sql_based_model(
273-
d.parse("MODEL (name memory.sushi.step_b, cron '@hourly'); SELECT * FROM (SELECT 1)")
321+
d.parse(
322+
f"MODEL (name memory.sushi.step_b, cron '{upstream_cron_b}'); SELECT * FROM (SELECT 1)"
323+
)
274324
)
275325

276326
downstream_model = load_sql_based_model(
277327
d.parse(
278-
"MODEL (name memory.sushi.step_c, cron '@daily', depends_on ['memory.sushi.step_1', 'memory.sushi.step_b']); SELECT * FROM (SELECT 1)"
328+
f"MODEL (name memory.sushi.step_c, cron '{downstream_cron}', depends_on ['memory.sushi.step_a', 'memory.sushi.step_b']); SELECT * FROM (SELECT 1)"
279329
)
280330
)
281331

@@ -284,4 +334,8 @@ def test_cron_interval_alignment_valid_upstream(tmp_path, copy_to_temp_path) ->
284334
context.upsert_model(downstream_model)
285335

286336
lints = context.lint_models(raise_on_error=False)
287-
assert len(lints) == 0
337+
assert len(lints) == expected_violations
338+
339+
if expected_violations > 0:
340+
for lint in lints:
341+
assert lint.violation_msg in violation_msg

0 commit comments

Comments
 (0)