Skip to content

Commit a355552

Browse files
committed
fix!: correctly identify keys to make deterministic
1 parent 9a954d7 commit a355552

File tree

4 files changed

+124
-6
lines changed

4 files changed

+124
-6
lines changed

sqlmesh/core/model/common.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,16 @@ def _add_variables_to_python_env(
153153

154154
variables = {k: v for k, v in (variables or {}).items() if k in used_variables}
155155
if variables:
156-
python_env[c.SQLMESH_VARS] = Executable.value(variables)
156+
python_env[c.SQLMESH_VARS] = Executable.value(variables, use_deterministic_repr=True)
157157

158158
if blueprint_variables:
159159
blueprint_variables = {
160160
k: SqlValue(sql=v.sql(dialect=dialect)) if isinstance(v, exp.Expression) else v
161161
for k, v in blueprint_variables.items()
162162
}
163-
python_env[c.SQLMESH_BLUEPRINT_VARS] = Executable.value(blueprint_variables)
163+
python_env[c.SQLMESH_BLUEPRINT_VARS] = Executable.value(
164+
blueprint_variables, use_deterministic_repr=True
165+
)
164166

165167
return python_env
166168

sqlmesh/migrations/v0085_deterministic_repr.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
from sqlmesh.utils.migration import index_text_type, blob_text_type
1313

1414

15+
KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"]
16+
17+
1518
# Make sure `SqlValue` is defined so it can be used by `eval` call in the migration
1619
@dataclass
1720
class SqlValue:
@@ -82,6 +85,8 @@ def migrate(state_sync, **kwargs): # type: ignore
8285

8386
if python_env:
8487
for key, executable in python_env.items():
88+
if key not in KEYS_TO_MAKE_DETERMINISTIC:
89+
continue
8590
if isinstance(executable, dict) and executable.get("kind") == "value":
8691
old_payload = executable["payload"]
8792
try:
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import json
2+
import typing as t
3+
from sqlglot import exp
4+
5+
from sqlmesh.core.console import get_console
6+
7+
8+
KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"]
9+
10+
11+
def would_sorting_be_applied(obj: t.Any) -> bool:
12+
"""
13+
Detects if sorting would be applied to an object based on the
14+
deterministic_repr logic.
15+
16+
Returns True if the object is a dictionary or contains a dictionary
17+
at any nesting level (in lists or tuples).
18+
19+
Args:
20+
obj: The object to check
21+
22+
Returns:
23+
bool: True if sorting would be applied, False otherwise
24+
"""
25+
26+
def _check_for_dict(o: t.Any) -> bool:
27+
if isinstance(o, dict):
28+
return True
29+
if isinstance(o, (list, tuple)):
30+
return any(_check_for_dict(item) for item in o)
31+
32+
return False
33+
34+
try:
35+
return _check_for_dict(obj)
36+
except Exception:
37+
# If any error occurs during checking, assume no sorting
38+
return False
39+
40+
41+
def migrate(state_sync, **kwargs): # type: ignore
42+
engine_adapter = state_sync.engine_adapter
43+
schema = state_sync.schema
44+
snapshots_table = "_snapshots"
45+
versions_table = "_versions"
46+
if schema:
47+
snapshots_table = f"{schema}.{snapshots_table}"
48+
versions_table = f"{schema}.{versions_table}"
49+
50+
result = engine_adapter.fetchone(
51+
exp.select("schema_version").from_(versions_table), quote_identifiers=True
52+
)
53+
if not result:
54+
# This must be the first migration, so we can skip the check since the project was not exposed to 85 migration bug
55+
return
56+
schema_version = result[0]
57+
if schema_version < 85:
58+
# The project was not exposed to the bugged 85 migration, so we can skip it.
59+
return
60+
61+
warning = (
62+
"SQLMesh detected that it may not be able to fully migrate the state database. This should not impact "
63+
"the migration process, but may result in unexpected changes being reported by the next `sqlmesh plan` "
64+
"command. Please run `sqlmesh diff prod` after the migration has completed, before making any new "
65+
"changes. If any unexpected changes are reported, consider running a forward-only plan to apply these "
66+
"changes and avoid unnecessary backfills: sqlmesh plan prod --forward-only. "
67+
"See https://sqlmesh.readthedocs.io/en/stable/concepts/plans/#forward-only-plans for more details.\n"
68+
)
69+
70+
for (
71+
name,
72+
identifier,
73+
version,
74+
snapshot,
75+
kind_name,
76+
updated_ts,
77+
unpaused_ts,
78+
ttl_ms,
79+
unrestorable,
80+
) in engine_adapter.fetchall(
81+
exp.select(
82+
"name",
83+
"identifier",
84+
"version",
85+
"snapshot",
86+
"kind_name",
87+
"updated_ts",
88+
"unpaused_ts",
89+
"ttl_ms",
90+
"unrestorable",
91+
).from_(snapshots_table),
92+
quote_identifiers=True,
93+
):
94+
parsed_snapshot = json.loads(snapshot)
95+
python_env = parsed_snapshot["node"].get("python_env")
96+
97+
if python_env:
98+
for key, executable in python_env.items():
99+
if (
100+
key not in KEYS_TO_MAKE_DETERMINISTIC
101+
and isinstance(executable, dict)
102+
and executable.get("kind") == "value"
103+
):
104+
try:
105+
parsed_value = eval(executable["payload"])
106+
if would_sorting_be_applied(parsed_value):
107+
get_console().log_warning(warning)
108+
return
109+
except Exception:
110+
pass

sqlmesh/utils/metaprogramming.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -424,10 +424,11 @@ def is_value(self) -> bool:
424424
return self.kind == ExecutableKind.VALUE
425425

426426
@classmethod
427-
def value(cls, v: t.Any, is_metadata: t.Optional[bool] = None) -> Executable:
428-
return Executable(
429-
payload=_deterministic_repr(v), kind=ExecutableKind.VALUE, is_metadata=is_metadata
430-
)
427+
def value(
428+
cls, v: t.Any, is_metadata: t.Optional[bool] = None, use_deterministic_repr: bool = False
429+
) -> Executable:
430+
payload = _deterministic_repr(v) if use_deterministic_repr else repr(v)
431+
return Executable(payload=payload, kind=ExecutableKind.VALUE, is_metadata=is_metadata)
431432

432433

433434
def serialize_env(env: t.Dict[str, t.Any], path: Path) -> t.Dict[str, Executable]:

0 commit comments

Comments
 (0)