diff --git a/sqlmesh/core/model/common.py b/sqlmesh/core/model/common.py index f35a08a28b..704f3e02fe 100644 --- a/sqlmesh/core/model/common.py +++ b/sqlmesh/core/model/common.py @@ -153,14 +153,16 @@ def _add_variables_to_python_env( variables = {k: v for k, v in (variables or {}).items() if k in used_variables} if variables: - python_env[c.SQLMESH_VARS] = Executable.value(variables) + python_env[c.SQLMESH_VARS] = Executable.value(variables, sort_root_dict=True) if blueprint_variables: blueprint_variables = { k: SqlValue(sql=v.sql(dialect=dialect)) if isinstance(v, exp.Expression) else v for k, v in blueprint_variables.items() } - python_env[c.SQLMESH_BLUEPRINT_VARS] = Executable.value(blueprint_variables) + python_env[c.SQLMESH_BLUEPRINT_VARS] = Executable.value( + blueprint_variables, sort_root_dict=True + ) return python_env diff --git a/sqlmesh/migrations/v0085_deterministic_repr.py b/sqlmesh/migrations/v0085_deterministic_repr.py index 9364926068..4c86969843 100644 --- a/sqlmesh/migrations/v0085_deterministic_repr.py +++ b/sqlmesh/migrations/v0085_deterministic_repr.py @@ -4,6 +4,7 @@ """ import json +import logging import typing as t from dataclasses import dataclass @@ -12,6 +13,12 @@ from sqlmesh.utils.migration import index_text_type, blob_text_type +logger = logging.getLogger(__name__) + + +KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"] + + # Make sure `SqlValue` is defined so it can be used by `eval` call in the migration @dataclass class SqlValue: @@ -20,25 +27,13 @@ class SqlValue: sql: str -def _deterministic_repr(obj: t.Any) -> str: - """ - This is a copy of the function from utils.metaprogramming - """ - - def _normalize_for_repr(o: t.Any) -> t.Any: - if isinstance(o, dict): - sorted_items = sorted(o.items(), key=lambda x: str(x[0])) - return {k: _normalize_for_repr(v) for k, v in sorted_items} - if isinstance(o, (list, tuple)): - # Recursively normalize nested structures - normalized = [_normalize_for_repr(item) for item in o] - return type(o)(normalized) - return o - +def _dict_sort(obj: t.Any) -> str: try: - return repr(_normalize_for_repr(obj)) + if isinstance(obj, dict): + obj = dict(sorted(obj.items(), key=lambda x: str(x[0]))) except Exception: - return repr(obj) + logger.warning("Failed to sort non-recursive dict", exc_info=True) + return repr(obj) def migrate(state_sync, **kwargs): # type: ignore @@ -82,12 +77,14 @@ def migrate(state_sync, **kwargs): # type: ignore if python_env: for key, executable in python_env.items(): + if key not in KEYS_TO_MAKE_DETERMINISTIC: + continue if isinstance(executable, dict) and executable.get("kind") == "value": old_payload = executable["payload"] try: # Try to parse the old payload and re-serialize it deterministically parsed_value = eval(old_payload) - new_payload = _deterministic_repr(parsed_value) + new_payload = _dict_sort(parsed_value) # Only update if the representation changed if old_payload != new_payload: @@ -95,7 +92,7 @@ def migrate(state_sync, **kwargs): # type: ignore migration_needed = True except Exception: # If we still can't eval it, leave it as-is - pass + logger.warning("Exception trying to eval payload", exc_info=True) new_snapshots.append( { diff --git a/sqlmesh/migrations/v0086_check_deterministic_bug.py b/sqlmesh/migrations/v0086_check_deterministic_bug.py new file mode 100644 index 0000000000..17527e81ce --- /dev/null +++ b/sqlmesh/migrations/v0086_check_deterministic_bug.py @@ -0,0 +1,82 @@ +import json +import logging + +from sqlglot import exp + +from sqlmesh.core.console import get_console + + +logger = logging.getLogger(__name__) +KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"] + + +def migrate(state_sync, **kwargs): # type: ignore + engine_adapter = state_sync.engine_adapter + schema = state_sync.schema + snapshots_table = "_snapshots" + versions_table = "_versions" + if schema: + snapshots_table = f"{schema}.{snapshots_table}" + versions_table = f"{schema}.{versions_table}" + + result = engine_adapter.fetchone( + exp.select("schema_version").from_(versions_table), quote_identifiers=True + ) + if not result: + # This must be the first migration, so we can skip the check since the project was not exposed to 85 migration bug + return + schema_version = result[0] + if schema_version < 85: + # The project was not exposed to the bugged 85 migration, so we can skip it. + return + + warning = ( + "SQLMesh detected that it may not be able to fully migrate the state database. This should not impact " + "the migration process, but may result in unexpected changes being reported by the next `sqlmesh plan` " + "command. Please run `sqlmesh diff prod` after the migration has completed, before making any new " + "changes. If any unexpected changes are reported, consider running a forward-only plan to apply these " + "changes and avoid unnecessary backfills: sqlmesh plan prod --forward-only. " + "See https://sqlmesh.readthedocs.io/en/stable/concepts/plans/#forward-only-plans for more details.\n" + ) + + for ( + name, + identifier, + version, + snapshot, + kind_name, + updated_ts, + unpaused_ts, + ttl_ms, + unrestorable, + ) in engine_adapter.fetchall( + exp.select( + "name", + "identifier", + "version", + "snapshot", + "kind_name", + "updated_ts", + "unpaused_ts", + "ttl_ms", + "unrestorable", + ).from_(snapshots_table), + quote_identifiers=True, + ): + parsed_snapshot = json.loads(snapshot) + python_env = parsed_snapshot["node"].get("python_env") + + if python_env: + for key, executable in python_env.items(): + if ( + key not in KEYS_TO_MAKE_DETERMINISTIC + and isinstance(executable, dict) + and executable.get("kind") == "value" + ): + try: + parsed_value = eval(executable["payload"]) + if isinstance(parsed_value, dict): + get_console().log_warning(warning) + return + except Exception: + logger.warning("Exception trying to eval payload", exc_info=True) diff --git a/sqlmesh/utils/metaprogramming.py b/sqlmesh/utils/metaprogramming.py index c4340c0321..9330532442 100644 --- a/sqlmesh/utils/metaprogramming.py +++ b/sqlmesh/utils/metaprogramming.py @@ -5,6 +5,7 @@ import importlib import inspect import linecache +import logging import os import re import sys @@ -23,6 +24,9 @@ from sqlmesh.utils.errors import SQLMeshError from sqlmesh.utils.pydantic import PydanticModel +logger = logging.getLogger(__name__) + + IGNORE_DECORATORS = {"macro", "model", "signal"} SERIALIZABLE_CALLABLES = (type, types.FunctionType) LITERALS = (Number, str, bytes, tuple, list, dict, set, bool) @@ -424,10 +428,11 @@ def is_value(self) -> bool: return self.kind == ExecutableKind.VALUE @classmethod - def value(cls, v: t.Any, is_metadata: t.Optional[bool] = None) -> Executable: - return Executable( - payload=_deterministic_repr(v), kind=ExecutableKind.VALUE, is_metadata=is_metadata - ) + def value( + cls, v: t.Any, is_metadata: t.Optional[bool] = None, sort_root_dict: bool = False + ) -> Executable: + payload = _dict_sort(v) if sort_root_dict else repr(v) + return Executable(payload=payload, kind=ExecutableKind.VALUE, is_metadata=is_metadata) def serialize_env(env: t.Dict[str, t.Any], path: Path) -> t.Dict[str, Executable]: @@ -635,36 +640,13 @@ def print_exception( out.write(tb) -def _deterministic_repr(obj: t.Any) -> str: - """Create a deterministic representation by ensuring consistent ordering before repr(). - - For dictionaries, ensures consistent key ordering to prevent non-deterministic - serialization that affects fingerprinting. Uses Python's native repr() logic - for all formatting to handle edge cases properly. - - Note that this function assumes list/tuple order is significant and therefore does not sort them. - - Args: - obj: The object to represent as a string. - - Returns: - A deterministic string representation of the object. - """ - - def _normalize_for_repr(o: t.Any) -> t.Any: - if isinstance(o, dict): - sorted_items = sorted(o.items(), key=lambda x: str(x[0])) - return {k: _normalize_for_repr(v) for k, v in sorted_items} - if isinstance(o, (list, tuple)): - # Recursively normalize nested structures - normalized = [_normalize_for_repr(item) for item in o] - return type(o)(normalized) - return o - +def _dict_sort(obj: t.Any) -> str: try: - return repr(_normalize_for_repr(obj)) + if isinstance(obj, dict): + obj = dict(sorted(obj.items(), key=lambda x: str(x[0]))) except Exception: - return repr(obj) + logger.warning("Failed to sort non-recursive dict", exc_info=True) + return repr(obj) def import_python_file(path: Path, relative_base: Path = Path()) -> types.ModuleType: diff --git a/tests/core/test_model.py b/tests/core/test_model.py index 909a79e143..ce58a0f00b 100644 --- a/tests/core/test_model.py +++ b/tests/core/test_model.py @@ -6116,7 +6116,8 @@ def test_named_variable_macros() -> None: ) assert model.python_env[c.SQLMESH_VARS] == Executable.value( - {c.GATEWAY: "in_memory", "test_var_a": "test_value", "overridden_var": "initial_value"} + {c.GATEWAY: "in_memory", "test_var_a": "test_value", "overridden_var": "initial_value"}, + sort_root_dict=True, ) assert ( model.render_query_or_raise().sql() @@ -6142,7 +6143,8 @@ def test_variables_in_templates() -> None: ) assert model.python_env[c.SQLMESH_VARS] == Executable.value( - {c.GATEWAY: "in_memory", "test_var_a": "test_value", "overridden_var": "initial_value"} + {c.GATEWAY: "in_memory", "test_var_a": "test_value", "overridden_var": "initial_value"}, + sort_root_dict=True, ) assert ( model.render_query_or_raise().sql() @@ -6166,7 +6168,8 @@ def test_variables_in_templates() -> None: ) assert model.python_env[c.SQLMESH_VARS] == Executable.value( - {c.GATEWAY: "in_memory", "test_var_a": "test_value", "overridden_var": "initial_value"} + {c.GATEWAY: "in_memory", "test_var_a": "test_value", "overridden_var": "initial_value"}, + sort_root_dict=True, ) assert ( model.render_query_or_raise().sql() @@ -6305,7 +6308,8 @@ def test_variables_migrated_dbt_package_macro(): dialect="bigquery", ) assert model.python_env[c.SQLMESH_VARS] == Executable.value( - {"test_var_a": "test_var_a_value", "__dbt_packages__.test.test_var_b": "test_var_b_value"} + {"test_var_a": "test_var_a_value", "__dbt_packages__.test.test_var_b": "test_var_b_value"}, + sort_root_dict=True, ) assert ( model.render_query().sql(dialect="bigquery") @@ -6530,7 +6534,8 @@ def test_unrendered_macros_sql_model(mocker: MockerFixture) -> None: "physical_var": "bla", "virtual_var": "blb", "session_var": "blc", - } + }, + sort_root_dict=True, ) assert "location1" in model.physical_properties @@ -6617,7 +6622,8 @@ def model_with_macros(evaluator, **kwargs): "physical_var": "bla", "virtual_var": "blb", "session_var": "blc", - } + }, + sort_root_dict=True, ) assert python_sql_model.enabled @@ -10576,9 +10582,12 @@ def unimportant_testing_macro(evaluator, *projections): ) assert m.python_env.get(c.SQLMESH_VARS) == Executable.value( - {"selector": "bla", "bla_variable": 1, "baz_variable": 2} + {"selector": "bla", "bla_variable": 1, "baz_variable": 2}, + sort_root_dict=True, + ) + assert m.python_env.get(c.SQLMESH_BLUEPRINT_VARS) == Executable.value( + {"selector": "baz"}, sort_root_dict=True ) - assert m.python_env.get(c.SQLMESH_BLUEPRINT_VARS) == Executable.value({"selector": "baz"}) def test_extract_schema_in_post_statement(tmp_path: Path) -> None: diff --git a/tests/utils/test_metaprogramming.py b/tests/utils/test_metaprogramming.py index cb8421fac8..8519e1eb04 100644 --- a/tests/utils/test_metaprogramming.py +++ b/tests/utils/test_metaprogramming.py @@ -22,7 +22,7 @@ from sqlmesh.utils.metaprogramming import ( Executable, ExecutableKind, - _deterministic_repr, + _dict_sort, build_env, func_globals, normalize_source, @@ -460,39 +460,39 @@ def test_serialize_env_with_enum_import_appearing_in_two_functions() -> None: assert serialized_env == expected_env -def test_deterministic_repr_basic_types(): - """Test _deterministic_repr with basic Python types.""" +def test_dict_sort_basic_types(): + """Test dict_sort with basic Python types.""" # Test basic types that should use standard repr - assert _deterministic_repr(42) == "42" - assert _deterministic_repr("hello") == "'hello'" - assert _deterministic_repr(True) == "True" - assert _deterministic_repr(None) == "None" - assert _deterministic_repr(3.14) == "3.14" + assert _dict_sort(42) == "42" + assert _dict_sort("hello") == "'hello'" + assert _dict_sort(True) == "True" + assert _dict_sort(None) == "None" + assert _dict_sort(3.14) == "3.14" -def test_deterministic_repr_dict_ordering(): - """Test that _deterministic_repr produces consistent output for dicts with different key ordering.""" +def test_dict_sort_dict_ordering(): + """Test that dict_sort produces consistent output for dicts with different key ordering.""" # Same dict with different key ordering dict1 = {"c": 3, "a": 1, "b": 2} dict2 = {"a": 1, "b": 2, "c": 3} dict3 = {"b": 2, "c": 3, "a": 1} - repr1 = _deterministic_repr(dict1) - repr2 = _deterministic_repr(dict2) - repr3 = _deterministic_repr(dict3) + repr1 = _dict_sort(dict1) + repr2 = _dict_sort(dict2) + repr3 = _dict_sort(dict3) # All should produce the same representation assert repr1 == repr2 == repr3 assert repr1 == "{'a': 1, 'b': 2, 'c': 3}" -def test_deterministic_repr_mixed_key_types(): - """Test _deterministic_repr with mixed key types (strings and numbers).""" +def test_dict_sort_mixed_key_types(): + """Test dict_sort with mixed key types (strings and numbers).""" dict1 = {42: "number", "string": "text", 1: "one"} dict2 = {"string": "text", 1: "one", 42: "number"} - repr1 = _deterministic_repr(dict1) - repr2 = _deterministic_repr(dict2) + repr1 = _dict_sort(dict1) + repr2 = _dict_sort(dict2) # Should produce consistent ordering despite mixed key types assert repr1 == repr2 @@ -500,45 +500,47 @@ def test_deterministic_repr_mixed_key_types(): assert repr1 == "{1: 'one', 42: 'number', 'string': 'text'}" -def test_deterministic_repr_nested_structures(): - """Test _deterministic_repr with deeply nested dictionaries.""" +def test_dict_sort_nested_structures(): + """Test dict_sort with deeply nested dictionaries.""" nested1 = {"outer": {"z": 26, "a": 1}, "list": [3, {"y": 2, "x": 1}], "simple": "value"} nested2 = {"simple": "value", "list": [3, {"x": 1, "y": 2}], "outer": {"a": 1, "z": 26}} - repr1 = _deterministic_repr(nested1) - repr2 = _deterministic_repr(nested2) + repr1 = _dict_sort(nested1) + repr2 = _dict_sort(nested2) - assert repr1 == repr2 + assert repr1 != repr2 # Verify structure is maintained with sorted keys - expected = "{'list': [3, {'x': 1, 'y': 2}], 'outer': {'a': 1, 'z': 26}, 'simple': 'value'}" - assert repr1 == expected - - -def test_deterministic_repr_lists_and_tuples(): - """Test _deterministic_repr preserves order for lists/tuples but sorts nested dicts.""" - # Lists should maintain their order - list_with_dicts = [{"b": 2, "a": 1}, {"d": 4, "c": 3}] - list_repr = _deterministic_repr(list_with_dicts) - expected_list = "[{'a': 1, 'b': 2}, {'c': 3, 'd': 4}]" + expected1 = "{'list': [3, {'y': 2, 'x': 1}], 'outer': {'z': 26, 'a': 1}, 'simple': 'value'}" + expected2 = "{'list': [3, {'x': 1, 'y': 2}], 'outer': {'a': 1, 'z': 26}, 'simple': 'value'}" + assert repr1 == expected1 + assert repr2 == expected2 + + +def test_dict_sort_lists_and_tuples(): + """Test dict_sort preserves order for lists/tuples and doesn't sort nested dicts.""" + # Lists should be unchanged + list_with_dicts = [{"z": 26, "a": 1}, {"y": 25, "b": 2}] + list_repr = _dict_sort(list_with_dicts) + expected_list = "[{'z': 26, 'a': 1}, {'y': 25, 'b': 2}]" assert list_repr == expected_list - # Tuples should maintain their order + # Tuples should be unchanged tuple_with_dicts = ({"z": 26, "a": 1}, {"y": 25, "b": 2}) - tuple_repr = _deterministic_repr(tuple_with_dicts) - expected_tuple = "({'a': 1, 'z': 26}, {'b': 2, 'y': 25})" + tuple_repr = _dict_sort(tuple_with_dicts) + expected_tuple = "({'z': 26, 'a': 1}, {'y': 25, 'b': 2})" assert tuple_repr == expected_tuple -def test_deterministic_repr_empty_containers(): - """Test _deterministic_repr with empty containers.""" - assert _deterministic_repr({}) == "{}" - assert _deterministic_repr([]) == "[]" - assert _deterministic_repr(()) == "()" +def test_dict_sort_empty_containers(): + """Test dict_sort with empty containers.""" + assert _dict_sort({}) == "{}" + assert _dict_sort([]) == "[]" + assert _dict_sort(()) == "()" -def test_deterministic_repr_special_characters(): - """Test _deterministic_repr handles special characters correctly.""" +def test_dict_sort_special_characters(): + """Test dict_sort handles special characters correctly.""" special_dict = { "quotes": "text with 'single' and \"double\" quotes", "unicode": "unicode: ñáéíóú", @@ -546,25 +548,25 @@ def test_deterministic_repr_special_characters(): "backslashes": "path\\to\\file", } - result = _deterministic_repr(special_dict) + result = _dict_sort(special_dict) # Should be valid Python that can be evaluated reconstructed = eval(result) assert reconstructed == special_dict # Should be deterministic - same input produces same output - result2 = _deterministic_repr(special_dict) + result2 = _dict_sort(special_dict) assert result == result2 -def test_deterministic_repr_executable_integration(): - """Test that _deterministic_repr works correctly with Executable.value().""" +def test_dict_sort_executable_integration(): + """Test that dict_sort works correctly with Executable.value().""" # Test the integration with Executable.value which is the main use case variables1 = {"env": "dev", "debug": True, "timeout": 30} variables2 = {"timeout": 30, "debug": True, "env": "dev"} - exec1 = Executable.value(variables1) - exec2 = Executable.value(variables2) + exec1 = Executable.value(variables1, sort_root_dict=True) + exec2 = Executable.value(variables2, sort_root_dict=True) # Should produce identical payloads despite different input ordering assert exec1.payload == exec2.payload @@ -574,46 +576,6 @@ def test_deterministic_repr_executable_integration(): reconstructed = eval(exec1.payload) assert reconstructed == variables1 - -def test_deterministic_repr_complex_example(): - """Test _deterministic_repr with a complex real-world-like structure.""" - complex_vars = { - "database_config": { - "host": "localhost", - "port": 5432, - "credentials": {"username": "admin", "password": "secret"}, - }, - "feature_flags": ["flag_b", "flag_a"], - "metadata": { - "version": "1.0.0", - "environment": "production", - "tags": {"team": "data", "project": "analytics"}, - }, - 42: "numeric_key", - "arrays": [{"config": {"nested": True, "level": 2}}, {"simple": "value"}], - } - - expected_structure = { - 42: "numeric_key", - "arrays": [{"config": {"level": 2, "nested": True}}, {"simple": "value"}], - "database_config": { - "credentials": {"password": "secret", "username": "admin"}, - "host": "localhost", - "port": 5432, - }, - "feature_flags": ["flag_b", "flag_a"], - "metadata": { - "environment": "production", - "tags": {"project": "analytics", "team": "data"}, - "version": "1.0.0", - }, - } - - actual_repr = _deterministic_repr(complex_vars) - expected_repr = repr(expected_structure) - assert actual_repr == expected_repr - - # Should be valid Python - reconstructed = eval(actual_repr) - assert isinstance(reconstructed, dict) - assert reconstructed == complex_vars + # non-deterministic repr should not change the payload + exec3 = Executable.value(variables1) + assert exec3.payload == "{'env': 'dev', 'debug': True, 'timeout': 30}"