diff --git a/docs/guides/linter.md b/docs/guides/linter.md index 22cc5077b8..294ab08b25 100644 --- a/docs/guides/linter.md +++ b/docs/guides/linter.md @@ -74,6 +74,8 @@ Here are all of SQLMesh's built-in linting rules: | `invalidselectstarexpansion` | Correctness | The query's top-level selection may be `SELECT *`, but only if SQLMesh can expand the `SELECT *` into individual columns | | `noselectstar` | Stylistic | The query's top-level selection may not be `SELECT *`, even if SQLMesh can expand the `SELECT *` into individual columns | | `nomissingaudits` | Governance | SQLMesh did not find any `audits` in the model's configuration to test data quality. | +| `nomissingexternalmodels` | Governance | All external models must be registered in the external_models.yaml file | +| `cronintervalalignment` | Governance | Upstream model has a cron expression with longer intervals than downstream model. Example: step_1(`@daily`) -> step_2(`@hourly`) -> step_3(`*/5 * * * *`). step_2 and step_3 will not behave as intended because their temporal grains are smaller than step_1's. step_1 will backfill all of `2020-01-01 - 2020-01-01`. step_2 will backfill every hour interval starting from `2020-01-02 00:00:00 - 2020-01-02 00:01:00`. However, it will wastefully backfill empty intervals given step_1 does not contain data for this interval. This can exacerbate scheduling inefficiencies. The fix is to align the schedules where a downstream model's cron is the same or has a longer cron interval than an upstream model's. | ### User-defined rules diff --git a/examples/sushi/config.py b/examples/sushi/config.py index 2c124421dd..8da1914cd1 100644 --- a/examples/sushi/config.py +++ b/examples/sushi/config.py @@ -49,6 +49,7 @@ "nomissingaudits", "nomissingowner", "nomissingexternalmodels", + "cronintervalalignment", ], ), ) diff --git a/sqlmesh/core/linter/rules/builtin.py b/sqlmesh/core/linter/rules/builtin.py index a793f79434..9a21f599bc 100644 --- a/sqlmesh/core/linter/rules/builtin.py +++ b/sqlmesh/core/linter/rules/builtin.py @@ -24,7 +24,7 @@ CreateFile, ) from sqlmesh.core.linter.definition import RuleSet -from sqlmesh.core.model import Model, SqlModel, ExternalModel +from sqlmesh.core.model import Model, SqlModel, ExternalModel, ModelKindName from sqlmesh.utils.lineage import extract_references_from_query, ExternalModelReference @@ -274,4 +274,43 @@ def create_fix(self, model_name: str) -> t.Optional[Fix]: ) +class CronIntervalAlignment(Rule): + """Upstream model has a cron expression with longer intervals than downstream model.""" + + def check_model(self, model: Model) -> t.Optional[t.List[RuleViolation]]: + placeholder_start_date = "2020-01-01 00:25:00" + + violations = [] + for upstream_model_name in model.depends_on: + upstream_model = self.context.get_model(upstream_model_name) + + # Skip if upstream model doesn't exist + if upstream_model is None: + continue + + # Skip model kinds since they don't run on cron schedules + skip_kinds = {ModelKindName.EXTERNAL, ModelKindName.EMBEDDED, ModelKindName.SEED} + if upstream_model.kind.name in skip_kinds: + continue + + for _ in range(12): + this_next = model.cron_next(placeholder_start_date) + upstream_next = upstream_model.cron_next(placeholder_start_date) + current_time = this_next + + # Find the first iteration pair where upstream runs after downstream + if upstream_next > this_next: + violations.append( + RuleViolation( + rule=self, + violation_msg=f"Upstream model {upstream_model_name} runs less frequently ({upstream_model.cron}) than this model ({model.cron})", + ) + ) + break # Found violation, stop checking this upstream model + else: + return None + + return violations + + BUILTIN_RULES = RuleSet(subclasses(__name__, Rule, (Rule,))) diff --git a/tests/core/linter/test_builtin.py b/tests/core/linter/test_builtin.py index 1a19d036b5..5e6672b386 100644 --- a/tests/core/linter/test_builtin.py +++ b/tests/core/linter/test_builtin.py @@ -1,7 +1,10 @@ import os +import pytest from sqlmesh import Context from sqlmesh.core.linter.rule import Position, Range +from sqlmesh.core.model import load_sql_based_model +from sqlmesh.core import dialect as d def test_no_missing_external_models(tmp_path, copy_to_temp_path) -> None: @@ -31,6 +34,7 @@ def test_no_missing_external_models(tmp_path, copy_to_temp_path) -> None: "nomissingaudits", "nomissingowner", "nomissingexternalmodels", + "cronintervalalignment", ], ),""" after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),""" @@ -84,6 +88,7 @@ def test_no_missing_external_models_with_existing_file_ending_in_newline( "nomissingaudits", "nomissingowner", "nomissingexternalmodels", + "cronintervalalignment", ], ),""" after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),""" @@ -141,6 +146,7 @@ def test_no_missing_external_models_with_existing_file_not_ending_in_newline( "nomissingaudits", "nomissingowner", "nomissingexternalmodels", + "cronintervalalignment", ], ),""" after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),""" @@ -172,3 +178,169 @@ def test_no_missing_external_models_with_existing_file_not_ending_in_newline( ) fix_path = sushi_path / "external_models.yaml" assert edit.path == fix_path + + +@pytest.mark.parametrize( + "upstream_cron,downstream_cron,expected_violations,violation_msg", + [ + ( + "@weekly", + "@daily", + 1, + 'Upstream model "memory"."sushi"."step_1" runs less frequently (@weekly) than this model (@daily)', + ), + ( + "5 * * * *", + "0 * * * *", + 1, + 'Upstream model "memory"."sushi"."step_1" runs less frequently (5 * * * *) than this model (0 * * * *)', + ), + ( + "15 10 * * *", + "0 * * * *", + 1, + 'Upstream model "memory"."sushi"."step_1" runs less frequently (15 10 * * *) than this model (0 * * * *)', + ), + ], +) +def test_cron_interval_alignment( + tmp_path, copy_to_temp_path, upstream_cron, downstream_cron, expected_violations, violation_msg +) -> None: + sushi_paths = copy_to_temp_path("examples/sushi") + sushi_path = sushi_paths[0] + + # Override the config.py to turn on lint + with open(sushi_path / "config.py", "r") as f: + read_file = f.read() + + before = """ linter=LinterConfig( + enabled=False, + rules=[ + "ambiguousorinvalidcolumn", + "invalidselectstarexpansion", + "noselectstar", + "nomissingaudits", + "nomissingowner", + "nomissingexternalmodels", + "cronintervalalignment", + ], + ),""" + after = """linter=LinterConfig(enabled=True, rules=["cronintervalalignment"]),""" + read_file = read_file.replace(before, after) + assert after in read_file + with open(sushi_path / "config.py", "w") as f: + f.writelines(read_file) + + # Load the context with the temporary sushi path + context = Context(paths=[sushi_path]) + + context.load() + + # Create model with cron intervals + upstream_model = load_sql_based_model( + d.parse( + f"MODEL (name memory.sushi.step_1, cron '{upstream_cron}'); SELECT * FROM (SELECT 1)" + ) + ) + + downstream_model = load_sql_based_model( + d.parse( + f"MODEL (name memory.sushi.step_2, cron '{downstream_cron}', depends_on ['memory.sushi.step_1']); SELECT * FROM (SELECT 1)" + ) + ) + + context.upsert_model(upstream_model) + context.upsert_model(downstream_model) + + lints = context.lint_models(raise_on_error=False) + assert len(lints) == expected_violations + + if expected_violations > 0: + lint = lints[0] + assert lint.violation_msg == violation_msg + + +@pytest.mark.parametrize( + "upstream_cron_a,upstream_cron_b,downstream_cron,expected_violations,violation_msg", + [ + ("@weekly", "@hourly", "@daily", 0, None), + ( + "@weekly", + "@weekly", + "@daily", + 2, + [ + 'Upstream model "memory"."sushi"."step_a" runs less frequently (@weekly) than this model (@daily)', + 'Upstream model "memory"."sushi"."step_b" runs less frequently (@weekly) than this model (@daily)', + ], + ), + ], +) +def test_cron_interval_alignment_upstream_multiple_dependencies( + tmp_path, + copy_to_temp_path, + upstream_cron_a, + upstream_cron_b, + downstream_cron, + expected_violations, + violation_msg, +) -> None: + sushi_paths = copy_to_temp_path("examples/sushi") + sushi_path = sushi_paths[0] + + # Override the config.py to turn on lint + with open(sushi_path / "config.py", "r") as f: + read_file = f.read() + + before = """ linter=LinterConfig( + enabled=False, + rules=[ + "ambiguousorinvalidcolumn", + "invalidselectstarexpansion", + "noselectstar", + "nomissingaudits", + "nomissingowner", + "nomissingexternalmodels", + "cronintervalalignment", + ], + ),""" + after = """linter=LinterConfig(enabled=True, rules=["cronintervalalignment"]),""" + read_file = read_file.replace(before, after) + assert after in read_file + with open(sushi_path / "config.py", "w") as f: + f.writelines(read_file) + + # Load the context with the temporary sushi path + context = Context(paths=[sushi_path]) + + context.load() + + # Create model with shorter cron interval that depends on model with longer interval + upstream_model_a = load_sql_based_model( + d.parse( + f"MODEL (name memory.sushi.step_a, cron '{upstream_cron_a}'); SELECT * FROM (SELECT 1)" + ) + ) + + upstream_model_b = load_sql_based_model( + d.parse( + f"MODEL (name memory.sushi.step_b, cron '{upstream_cron_b}'); SELECT * FROM (SELECT 1)" + ) + ) + + downstream_model = load_sql_based_model( + d.parse( + f"MODEL (name memory.sushi.step_c, cron '{downstream_cron}', depends_on ['memory.sushi.step_a', 'memory.sushi.step_b']); SELECT * FROM (SELECT 1)" + ) + ) + + context.upsert_model(upstream_model_a) + context.upsert_model(upstream_model_b) + context.upsert_model(downstream_model) + + lints = context.lint_models(raise_on_error=False) + assert len(lints) == expected_violations + + if expected_violations > 0: + for lint in lints: + assert lint.violation_msg in violation_msg diff --git a/tests/lsp/test_code_actions.py b/tests/lsp/test_code_actions.py index 509f49f9b1..787c85cd20 100644 --- a/tests/lsp/test_code_actions.py +++ b/tests/lsp/test_code_actions.py @@ -131,6 +131,7 @@ def test_code_actions_create_file(copy_to_temp_path: t.Callable) -> None: "nomissingaudits", "nomissingowner", "nomissingexternalmodels", + "cronintervalalignment", ], ),""" after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),""" diff --git a/tests/lsp/test_reference_external_model.py b/tests/lsp/test_reference_external_model.py index 25de22f10f..fbc10d56d9 100644 --- a/tests/lsp/test_reference_external_model.py +++ b/tests/lsp/test_reference_external_model.py @@ -94,6 +94,7 @@ def test_unregistered_external_model_with_schema( "nomissingaudits", "nomissingowner", "nomissingexternalmodels", + "cronintervalalignment", ], ),""" after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),""" diff --git a/vscode/extension/tests/quickfix.spec.ts b/vscode/extension/tests/quickfix.spec.ts index 84896713aa..c87d92ab11 100644 --- a/vscode/extension/tests/quickfix.spec.ts +++ b/vscode/extension/tests/quickfix.spec.ts @@ -35,6 +35,7 @@ test.fixme( "nomissingaudits", "nomissingowner", "nomissingexternalmodels", + "cronintervalalignment", ],`, targetRules, )