From ba16b224d0fd5d1144366793ce7567fc3fc8c2db Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Thu, 31 Jul 2025 13:46:03 +0300 Subject: [PATCH 1/2] Fix: Unit test CTE failures not being captured --- sqlmesh/core/console.py | 52 ++++++---------- sqlmesh/core/test/definition.py | 17 +++-- sqlmesh/core/test/result.py | 19 +++++- tests/core/test_test.py | 107 +++++++++++++++++++++++++++++++- 4 files changed, 153 insertions(+), 42 deletions(-) diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 5622c21107..cf87fd7443 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -2163,13 +2163,12 @@ def log_test_results(self, result: ModelTextTestResult, target_dialect: str) -> self._print("-" * divider_length) self._print("Test Failure Summary", style="red") self._print("=" * divider_length) - failures = len(result.failures) + len(result.errors) + fail_and_error_tests = result.get_fail_and_error_tests() self._print(f"{message} \n") - self._print(f"Failed tests ({failures}):") - for test, _ in result.failures + result.errors: - if isinstance(test, ModelTest): - self._print(f" • {test.path}::{test.test_name}") + self._print(f"Failed tests ({len(fail_and_error_tests)}):") + for test in fail_and_error_tests: + self._print(f" • {test.path}::{test.test_name}") self._print("=" * divider_length, end="\n\n") def _captured_unit_test_results(self, result: ModelTextTestResult) -> str: @@ -2721,28 +2720,15 @@ def _log_test_details( Args: result: The unittest test result that contains metrics like num success, fails, ect. """ - if result.wasSuccessful(): self._print("\n", end="") return - errors = result.errors - failures = result.failures - skipped = result.skipped - - infos = [] - if failures: - infos.append(f"failures={len(failures)}") - if errors: - infos.append(f"errors={len(errors)}") - if skipped: - infos.append(f"skipped={skipped}") - if unittest_char_separator: self._print(f"\n{unittest.TextTestResult.separator1}\n\n", end="") for (test_case, failure), test_failure_tables in zip_longest( # type: ignore - failures, result.failure_tables + result.failures, result.failure_tables ): self._print(unittest.TextTestResult.separator2) self._print(f"FAIL: {test_case}") @@ -2758,7 +2744,7 @@ def _log_test_details( self._print(failure_table) self._print("\n", end="") - for test_case, error in errors: + for test_case, error in result.errors: self._print(unittest.TextTestResult.separator2) self._print(f"ERROR: {test_case}") self._print(f"{unittest.TextTestResult.separator2}") @@ -3080,27 +3066,27 @@ def log_test_results(self, result: ModelTextTestResult, target_dialect: str) -> fail_shared_style = {**shared_style, **fail_color} header = str(h("span", {"style": fail_shared_style}, "-" * divider_length)) message = str(h("span", {"style": fail_shared_style}, "Test Failure Summary")) + fail_and_error_tests = result.get_fail_and_error_tests() failed_tests = [ str( h( "span", {"style": fail_shared_style}, - f"Failed tests ({len(result.failures) + len(result.errors)}):", + f"Failed tests ({len(fail_and_error_tests)}):", ) ) ] - for test, _ in result.failures + result.errors: - if isinstance(test, ModelTest): - failed_tests.append( - str( - h( - "span", - {"style": fail_shared_style}, - f" • {test.model.name}::{test.test_name}", - ) + for test in fail_and_error_tests: + failed_tests.append( + str( + h( + "span", + {"style": fail_shared_style}, + f" • {test.model.name}::{test.test_name}", ) ) + ) failures = "
".join(failed_tests) footer = str(h("span", {"style": fail_shared_style}, "=" * divider_length)) error_output = widgets.Textarea(output, layout={"height": "300px", "width": "100%"}) @@ -3508,10 +3494,10 @@ def log_test_results(self, result: ModelTextTestResult, target_dialect: str) -> self._log_test_details(result, unittest_char_separator=False) self._print("```\n\n") - failures = len(result.failures) + len(result.errors) + fail_and_error_tests = result.get_fail_and_error_tests() self._print(f"**{message}**\n") - self._print(f"**Failed tests ({failures}):**") - for test, _ in result.failures + result.errors: + self._print(f"**Failed tests ({len(fail_and_error_tests)}):**") + for test in fail_and_error_tests: if isinstance(test, ModelTest): self._print(f" • `{test.model.name}`::`{test.test_name}`\n\n") diff --git a/sqlmesh/core/test/definition.py b/sqlmesh/core/test/definition.py index c0e2f7a08e..8123f52d26 100644 --- a/sqlmesh/core/test/definition.py +++ b/sqlmesh/core/test/definition.py @@ -317,6 +317,13 @@ def _to_hashable(x: t.Any) -> t.Any: # # This is a bit of a hack, but it's a way to get the best of both worlds. args: t.List[t.Any] = [] + + failed_subtest = "" + + if subtest := getattr(self, "_subtest", None): + if cte := subtest.params.get("cte"): + failed_subtest = f" (CTE {cte})" + if expected.shape != actual.shape: _raise_if_unexpected_columns(expected.columns, actual.columns) @@ -325,13 +332,13 @@ def _to_hashable(x: t.Any) -> t.Any: missing_rows = _row_difference(expected, actual) if not missing_rows.empty: args[0] += f"\n\nMissing rows:\n\n{missing_rows}" - args.append(df_to_table("Missing rows", missing_rows)) + args.append(df_to_table(f"Missing rows{failed_subtest}", missing_rows)) unexpected_rows = _row_difference(actual, expected) if not unexpected_rows.empty: args[0] += f"\n\nUnexpected rows:\n\n{unexpected_rows}" - args.append(df_to_table("Unexpected rows", unexpected_rows)) + args.append(df_to_table(f"Unexpected rows{failed_subtest}", unexpected_rows)) else: diff = expected.compare(actual).rename(columns={"self": "exp", "other": "act"}) @@ -341,7 +348,8 @@ def _to_hashable(x: t.Any) -> t.Any: diff.rename(columns={"exp": "Expected", "act": "Actual"}, inplace=True) if self.verbosity == Verbosity.DEFAULT: args.extend( - df_to_table("Data mismatch", df) for df in _split_df_by_column_pairs(diff) + df_to_table(f"Data mismatch{failed_subtest}", df) + for df in _split_df_by_column_pairs(diff) ) else: from pandas import MultiIndex @@ -351,7 +359,8 @@ def _to_hashable(x: t.Any) -> t.Any: col_diff = diff[col] if not col_diff.empty: table = df_to_table( - f"[bold red]Column '{col}' mismatch[/bold red]", col_diff + f"[bold red]Column '{col}' mismatch{failed_subtest}[/bold red]", + col_diff, ) args.append(table) diff --git a/sqlmesh/core/test/result.py b/sqlmesh/core/test/result.py index 8621b8b10a..eefa0be513 100644 --- a/sqlmesh/core/test/result.py +++ b/sqlmesh/core/test/result.py @@ -4,6 +4,8 @@ import typing as t import unittest +from sqlmesh.core.test.definition import ModelTest + if t.TYPE_CHECKING: ErrorType = t.Union[ t.Tuple[type[BaseException], BaseException, types.TracebackType], @@ -42,7 +44,10 @@ def addSubTest( exctype, value, tb = err err = (exctype, value, None) # type: ignore - super().addSubTest(test, subtest, err) + if err[0] and issubclass(err[0], test.failureException): + self.addFailure(test, err) + else: + self.addError(test, err) def _print_char(self, char: str) -> None: from sqlmesh.core.console import TerminalConsole @@ -117,4 +122,14 @@ def merge(self, other: ModelTextTestResult) -> None: skipped_args = other.skipped[0] self.addSkip(skipped_args[0], skipped_args[1]) - self.testsRun += 1 + self.testsRun += other.testsRun + + def get_fail_and_error_tests(self) -> t.List[ModelTest]: + # If tests contain failed subtests (e.g testing CTE outputs) we don't want + # to report it as different test failures + test_name_to_test = { + test.test_name: test + for test, _ in self.failures + self.errors + if isinstance(test, ModelTest) + } + return list(test_name_to_test.values()) diff --git a/tests/core/test_test.py b/tests/core/test_test.py index 31fe0f3495..521773d1ca 100644 --- a/tests/core/test_test.py +++ b/tests/core/test_test.py @@ -6,7 +6,7 @@ from pathlib import Path import unittest from unittest.mock import call, patch -from shutil import copyfile, rmtree +from shutil import rmtree import pandas as pd # noqa: TID253 import pytest @@ -87,6 +87,7 @@ def _check_successful_or_raise( assert result is not None if not result.wasSuccessful(): error_or_failure_traceback = (result.errors or result.failures)[0][1] + print(error_or_failure_traceback) if expected_msg: assert expected_msg in error_or_failure_traceback else: @@ -2316,6 +2317,13 @@ def test_test_with_resolve_template_macro(tmp_path: Path): @use_terminal_console def test_test_output(tmp_path: Path) -> None: + def copy_test_file(test_file: Path, new_test_file: Path, index: int) -> None: + with open(test_file, "r") as file: + filedata = file.read() + + with open(new_test_file, "w") as file: + file.write(filedata.replace("test_example_full_model", f"test_{index}")) + init_example_project(tmp_path, engine_type="duckdb") original_test_file = tmp_path / "tests" / "test_full_model.yaml" @@ -2407,8 +2415,8 @@ def test_test_output(tmp_path: Path) -> None: # Case 3: Assert that concurrent execution is working properly for i in range(50): - copyfile(original_test_file, tmp_path / "tests" / f"test_success_{i}.yaml") - copyfile(new_test_file, tmp_path / "tests" / f"test_failure_{i}.yaml") + copy_test_file(original_test_file, tmp_path / "tests" / f"test_success_{i}.yaml", i) + copy_test_file(new_test_file, tmp_path / "tests" / f"test_failure_{i}.yaml", i) with capture_output() as captured_output: context.test() @@ -3327,3 +3335,96 @@ def execute(context: ExecutionContext, **kwargs: t.Any) -> pd.DataFrame: context=context, ) _check_successful_or_raise(test_default_vars.run()) + + +@use_terminal_console +def test_cte_failure(tmp_path: Path) -> None: + models_dir = tmp_path / "models" + models_dir.mkdir() + (models_dir / "foo.sql").write_text( + """ + MODEL ( + name test.foo, + kind full + ); + + with model_cte as ( + SELECT 1 AS id + ) + SELECT id FROM model_cte + """ + ) + + config = Config( + default_connection=DuckDBConnectionConfig(), + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + ) + context = Context(paths=tmp_path, config=config) + + expected_cte_failure_output = """Data mismatch (CTE "model_cte") +┏━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┓ +┃ Row ┃ id: Expected ┃ id: Actual ┃ +┡━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━┩ +│ 0 │ 2 │ 1 │ +└──────────┴─────────────────────────┴─────────────────────┘""" + + expected_query_failure_output = """Data mismatch +┏━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┓ +┃ Row ┃ id: Expected ┃ id: Actual ┃ +┡━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━┩ +│ 0 │ 2 │ 1 │ +└──────────┴─────────────────────────┴─────────────────────┘""" + + # Case 1: Ensure that a single CTE failure is reported correctly + tests_dir = tmp_path / "tests" + tests_dir.mkdir() + (tests_dir / "test_foo.yaml").write_text( + """ +test_foo: + model: test.foo + outputs: + ctes: + model_cte: + rows: + - id: 2 + query: + - id: 1 + """ + ) + + with capture_output() as captured_output: + context.test() + + output = captured_output.stdout + + assert expected_cte_failure_output in output + assert expected_query_failure_output not in output + + assert "Ran 1 tests" in output + assert "Failed tests (1)" in output + + # Case 2: Ensure that both CTE and query failures are reported correctly + (tests_dir / "test_foo.yaml").write_text( + """ +test_foo: + model: test.foo + outputs: + ctes: + model_cte: + rows: + - id: 2 + query: + - id: 2 + """ + ) + + with capture_output() as captured_output: + context.test() + + output = captured_output.stdout + + assert expected_cte_failure_output in output + assert expected_query_failure_output in output + + assert "Ran 1 tests" in output + assert "Failed tests (1)" in output From 681d3289c4375346614980ef159a7704f21665fb Mon Sep 17 00:00:00 2001 From: vaggelisd Date: Thu, 31 Jul 2025 16:51:31 +0300 Subject: [PATCH 2/2] Fix test --- tests/integrations/github/cicd/test_github_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integrations/github/cicd/test_github_commands.py b/tests/integrations/github/cicd/test_github_commands.py index 6d82755934..296fea5938 100644 --- a/tests/integrations/github/cicd/test_github_commands.py +++ b/tests/integrations/github/cicd/test_github_commands.py @@ -479,7 +479,7 @@ def test_run_all_test_failed( assert ( """sqlmesh.utils.errors.TestError: some error""" in test_checks_runs[2]["output"]["summary"] ) - assert """Failed tests (1):""" in test_checks_runs[2]["output"]["summary"] + assert """Failed tests""" in test_checks_runs[2]["output"]["summary"] assert "SQLMesh - Prod Plan Preview" in controller._check_run_mapping prod_plan_preview_checks_runs = controller._check_run_mapping[