Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/guides/linter.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ Here are all of SQLMesh's built-in linting rules:
| `invalidselectstarexpansion` | Correctness | The query's top-level selection may be `SELECT *`, but only if SQLMesh can expand the `SELECT *` into individual columns |
| `noselectstar` | Stylistic | The query's top-level selection may not be `SELECT *`, even if SQLMesh can expand the `SELECT *` into individual columns |
| `nomissingaudits` | Governance | SQLMesh did not find any `audits` in the model's configuration to test data quality. |
| `nomissingexternalmodels` | Governance | All external models must be registered in the external_models.yaml file |
| `cronintervalalignment` | Governance | Upstream model has a cron expression with longer intervals than downstream model. Example: step_1(`@daily`) -> step_2(`@hourly`) -> step_3(`*/5 * * * *`). step_2 and step_3 will not behave as intended because their temporal grains are smaller than step_1's. step_1 will backfill all of `2020-01-01 - 2020-01-01`. step_2 will backfill every hour interval starting from `2020-01-02 00:00:00 - 2020-01-02 01:00:00`. However, it will wastefully backfill empty intervals given step_1 does not contain data for this interval. This can exacerbate scheduling inefficiencies. The fix is to align the schedules where a downstream model's cron is the same or has a longer cron interval than an upstream model's. |

### User-defined rules

Expand Down
1 change: 1 addition & 0 deletions examples/sushi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronintervalalignment",
],
),
)
Expand Down
41 changes: 40 additions & 1 deletion sqlmesh/core/linter/rules/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
CreateFile,
)
from sqlmesh.core.linter.definition import RuleSet
from sqlmesh.core.model import Model, SqlModel, ExternalModel
from sqlmesh.core.model import Model, SqlModel, ExternalModel, ModelKindName
from sqlmesh.utils.lineage import extract_references_from_query, ExternalModelReference


Expand Down Expand Up @@ -274,4 +274,43 @@ def create_fix(self, model_name: str) -> t.Optional[Fix]:
)


class CronIntervalAlignment(Rule):
    """Upstream model has a cron expression with longer intervals than downstream model."""

    def check_model(self, model: Model) -> t.Optional[t.List[RuleViolation]]:
        """Compare this model's next cron run against each upstream model's.

        Both schedules are evaluated with ``cron_next`` from the same fixed
        reference timestamp. An upstream model whose next run falls strictly
        after this model's next run is reported as a violation.

        Args:
            model: The downstream model being linted.

        Returns:
            A list with one violation per offending upstream model (empty when
            no upstream model was checked), or ``None`` as soon as one checked
            upstream model's next run is not later than this model's. In that
            case any violations collected so far are discarded.
        """
        placeholder_start_date = "2020-01-01 00:25:00"

        # Model kinds that are skipped since they don't run on cron schedules
        skip_kinds = {ModelKindName.EXTERNAL, ModelKindName.EMBEDDED, ModelKindName.SEED}

        violations: t.List[RuleViolation] = []
        for upstream_model_name in model.depends_on:
            upstream_model = self.context.get_model(upstream_model_name)

            # Skip upstream models that don't exist or that have no cron schedule
            if upstream_model is None or upstream_model.kind.name in skip_kinds:
                continue

            # One comparison per upstream model is enough: both calls use the
            # same fixed reference timestamp, so repeating them yields the same pair.
            this_next = model.cron_next(placeholder_start_date)
            upstream_next = upstream_model.cron_next(placeholder_start_date)

            if upstream_next > this_next:
                violations.append(
                    RuleViolation(
                        rule=self,
                        violation_msg=f"Upstream model {upstream_model_name} runs less frequently ({upstream_model.cron}) than this model ({model.cron})",
                    )
                )
            else:
                # A single upstream model that is not later than this model
                # clears the whole model, whatever was collected before it.
                return None

        return violations


# Rule set built from the `subclasses` lookup for `Rule` in this module (`__name__`).
# NOTE(review): the trailing `(Rule,)` presumably excludes the base class itself — confirm against `subclasses`.
BUILTIN_RULES = RuleSet(subclasses(__name__, Rule, (Rule,)))
172 changes: 172 additions & 0 deletions tests/core/linter/test_builtin.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import os
import pytest

from sqlmesh import Context
from sqlmesh.core.linter.rule import Position, Range
from sqlmesh.core.model import load_sql_based_model
from sqlmesh.core import dialect as d


def test_no_missing_external_models(tmp_path, copy_to_temp_path) -> None:
Expand Down Expand Up @@ -31,6 +34,7 @@ def test_no_missing_external_models(tmp_path, copy_to_temp_path) -> None:
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronintervalalignment",
],
),"""
after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),"""
Expand Down Expand Up @@ -84,6 +88,7 @@ def test_no_missing_external_models_with_existing_file_ending_in_newline(
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronintervalalignment",
],
),"""
after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),"""
Expand Down Expand Up @@ -141,6 +146,7 @@ def test_no_missing_external_models_with_existing_file_not_ending_in_newline(
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronintervalalignment",
],
),"""
after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),"""
Expand Down Expand Up @@ -172,3 +178,169 @@ def test_no_missing_external_models_with_existing_file_not_ending_in_newline(
)
fix_path = sushi_path / "external_models.yaml"
assert edit.path == fix_path


# Lints a two-model chain (step_1 -> step_2) with only the `cronintervalalignment` rule enabled
# and checks the number of violations and the message of the first one.
# Each case is: (upstream cron, downstream cron, expected violation count, expected message).
@pytest.mark.parametrize(
"upstream_cron,downstream_cron,expected_violations,violation_msg",
[
# Weekly upstream feeding a daily downstream.
(
"@weekly",
"@daily",
1,
'Upstream model "memory"."sushi"."step_1" runs less frequently (@weekly) than this model (@daily)',
),
# Both crons fire once per hour and differ only in the minute (minute 5 vs minute 0).
(
"5 * * * *",
"0 * * * *",
1,
'Upstream model "memory"."sushi"."step_1" runs less frequently (5 * * * *) than this model (0 * * * *)',
),
# Once-a-day upstream (10:15) feeding an hourly downstream.
(
"15 10 * * *",
"0 * * * *",
1,
'Upstream model "memory"."sushi"."step_1" runs less frequently (15 10 * * *) than this model (0 * * * *)',
),
],
)
def test_cron_interval_alignment(
tmp_path, copy_to_temp_path, upstream_cron, downstream_cron, expected_violations, violation_msg
) -> None:
sushi_paths = copy_to_temp_path("examples/sushi")
sushi_path = sushi_paths[0]

# Override the config.py to turn on lint
with open(sushi_path / "config.py", "r") as f:
read_file = f.read()

# `before` is the linter block expected in the copied config.py. The replace below is a
# no-op unless this text matches the file exactly; the `assert after in read_file` guards that.
before = """ linter=LinterConfig(
enabled=False,
rules=[
"ambiguousorinvalidcolumn",
"invalidselectstarexpansion",
"noselectstar",
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronintervalalignment",
],
),"""
# `after` enables the linter with `cronintervalalignment` as the only rule.
after = """linter=LinterConfig(enabled=True, rules=["cronintervalalignment"]),"""
read_file = read_file.replace(before, after)
assert after in read_file
with open(sushi_path / "config.py", "w") as f:
# NOTE(review): `read_file` is a str, so `writelines` writes it one character at a time;
# the resulting file content is the same as with `f.write(read_file)`.
f.writelines(read_file)

# Load the context with the temporary sushi path
context = Context(paths=[sushi_path])

context.load()

# Create model with cron intervals
upstream_model = load_sql_based_model(
d.parse(
f"MODEL (name memory.sushi.step_1, cron '{upstream_cron}'); SELECT * FROM (SELECT 1)"
)
)

# step_2 declares step_1 as its only dependency via `depends_on`.
downstream_model = load_sql_based_model(
d.parse(
f"MODEL (name memory.sushi.step_2, cron '{downstream_cron}', depends_on ['memory.sushi.step_1']); SELECT * FROM (SELECT 1)"
)
)

context.upsert_model(upstream_model)
context.upsert_model(downstream_model)

# `lints` is expected to hold one entry per reported violation.
lints = context.lint_models(raise_on_error=False)
assert len(lints) == expected_violations

# Every case above expects one violation, so this branch runs for the whole current parameter set.
if expected_violations > 0:
lint = lints[0]
assert lint.violation_msg == violation_msg


# Lints a model (step_c) that depends on two upstream models (step_a, step_b) with only the
# `cronintervalalignment` rule enabled.
# Each case is: (cron of step_a, cron of step_b, cron of step_c, expected violation count,
# list of acceptable violation messages or None).
@pytest.mark.parametrize(
"upstream_cron_a,upstream_cron_b,downstream_cron,expected_violations,violation_msg",
[
# One weekly and one hourly upstream for a daily downstream: no violations are expected.
("@weekly", "@hourly", "@daily", 0, None),
# Both upstreams weekly for a daily downstream: one violation per upstream is expected.
(
"@weekly",
"@weekly",
"@daily",
2,
[
'Upstream model "memory"."sushi"."step_a" runs less frequently (@weekly) than this model (@daily)',
'Upstream model "memory"."sushi"."step_b" runs less frequently (@weekly) than this model (@daily)',
],
),
],
)
def test_cron_interval_alignment_upstream_multiple_dependencies(
tmp_path,
copy_to_temp_path,
upstream_cron_a,
upstream_cron_b,
downstream_cron,
expected_violations,
violation_msg,
) -> None:
sushi_paths = copy_to_temp_path("examples/sushi")
sushi_path = sushi_paths[0]

# Override the config.py to turn on lint
with open(sushi_path / "config.py", "r") as f:
read_file = f.read()

# `before` is the linter block expected in the copied config.py. The replace below is a
# no-op unless this text matches the file exactly; the `assert after in read_file` guards that.
before = """ linter=LinterConfig(
enabled=False,
rules=[
"ambiguousorinvalidcolumn",
"invalidselectstarexpansion",
"noselectstar",
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronintervalalignment",
],
),"""
# `after` enables the linter with `cronintervalalignment` as the only rule.
after = """linter=LinterConfig(enabled=True, rules=["cronintervalalignment"]),"""
read_file = read_file.replace(before, after)
assert after in read_file
with open(sushi_path / "config.py", "w") as f:
# NOTE(review): `read_file` is a str, so `writelines` writes it one character at a time;
# the resulting file content is the same as with `f.write(read_file)`.
f.writelines(read_file)

# Load the context with the temporary sushi path
context = Context(paths=[sushi_path])

context.load()

# Create model with shorter cron interval that depends on model with longer interval
upstream_model_a = load_sql_based_model(
d.parse(
f"MODEL (name memory.sushi.step_a, cron '{upstream_cron_a}'); SELECT * FROM (SELECT 1)"
)
)

upstream_model_b = load_sql_based_model(
d.parse(
f"MODEL (name memory.sushi.step_b, cron '{upstream_cron_b}'); SELECT * FROM (SELECT 1)"
)
)

# step_c declares both step_a and step_b as dependencies via `depends_on`.
downstream_model = load_sql_based_model(
d.parse(
f"MODEL (name memory.sushi.step_c, cron '{downstream_cron}', depends_on ['memory.sushi.step_a', 'memory.sushi.step_b']); SELECT * FROM (SELECT 1)"
)
)

context.upsert_model(upstream_model_a)
context.upsert_model(upstream_model_b)
context.upsert_model(downstream_model)

# `lints` is expected to hold one entry per reported violation.
lints = context.lint_models(raise_on_error=False)
assert len(lints) == expected_violations

# The order of violations is not asserted: each reported message only has to be one of the
# expected messages for the case.
if expected_violations > 0:
for lint in lints:
assert lint.violation_msg in violation_msg
1 change: 1 addition & 0 deletions tests/lsp/test_code_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def test_code_actions_create_file(copy_to_temp_path: t.Callable) -> None:
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronintervalalignment",
],
),"""
after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),"""
Expand Down
1 change: 1 addition & 0 deletions tests/lsp/test_reference_external_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def test_unregistered_external_model_with_schema(
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronintervalalignment",
],
),"""
after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),"""
Expand Down
1 change: 1 addition & 0 deletions vscode/extension/tests/quickfix.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ test.fixme(
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronintervalalignment",
],`,
targetRules,
)
Expand Down