Skip to content

Commit 5d8b68d

Browse files
committed
fix!: correctly identify keys to make deterministic
1 parent 9a954d7 commit 5d8b68d

File tree

4 files changed

+88
-6
lines changed

4 files changed

+88
-6
lines changed

sqlmesh/core/model/common.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,16 @@ def _add_variables_to_python_env(
153153

154154
variables = {k: v for k, v in (variables or {}).items() if k in used_variables}
155155
if variables:
156-
python_env[c.SQLMESH_VARS] = Executable.value(variables)
156+
python_env[c.SQLMESH_VARS] = Executable.value(variables, use_deterministic_repr=True)
157157

158158
if blueprint_variables:
159159
blueprint_variables = {
160160
k: SqlValue(sql=v.sql(dialect=dialect)) if isinstance(v, exp.Expression) else v
161161
for k, v in blueprint_variables.items()
162162
}
163-
python_env[c.SQLMESH_BLUEPRINT_VARS] = Executable.value(blueprint_variables)
163+
python_env[c.SQLMESH_BLUEPRINT_VARS] = Executable.value(
164+
blueprint_variables, use_deterministic_repr=True
165+
)
164166

165167
return python_env
166168

sqlmesh/migrations/v0085_deterministic_repr.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
from sqlmesh.utils.migration import index_text_type, blob_text_type
1313

1414

15+
KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"]
16+
17+
1518
# Make sure `SqlValue` is defined so it can be used by `eval` call in the migration
1619
@dataclass
1720
class SqlValue:
@@ -82,6 +85,8 @@ def migrate(state_sync, **kwargs): # type: ignore
8285

8386
if python_env:
8487
for key, executable in python_env.items():
88+
if key not in KEYS_TO_MAKE_DETERMINISTIC:
89+
continue
8590
if isinstance(executable, dict) and executable.get("kind") == "value":
8691
old_payload = executable["payload"]
8792
try:
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import json
2+
from sqlglot import exp
3+
4+
from sqlmesh.core.console import get_console
5+
6+
7+
KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"]
8+
9+
10+
def migrate(state_sync, **kwargs): # type: ignore
11+
engine_adapter = state_sync.engine_adapter
12+
schema = state_sync.schema
13+
snapshots_table = "_snapshots"
14+
versions_table = "_versions"
15+
if schema:
16+
snapshots_table = f"{schema}.{snapshots_table}"
17+
versions_table = f"{schema}.{versions_table}"
18+
19+
result = engine_adapter.fetchone(
20+
exp.select("schema_version").from_(versions_table), quote_identifiers=True
21+
)
22+
if not result:
23+
# This must be the first migration, so we can skip the check since the project was not exposed to 85 migration bug
24+
return
25+
schema_version = result[0]
26+
if schema_version < 85:
27+
# The project was not exposed to the bugged 85 migration, so we can skip it.
28+
return
29+
30+
warning = (
31+
"SQLMesh detected that it may not be able to fully migrate the state database. This should not impact "
32+
"the migration process, but may result in unexpected changes being reported by the next `sqlmesh plan` "
33+
"command. Please run `sqlmesh diff prod` after the migration has completed, before making any new "
34+
"changes. If any unexpected changes are reported, consider running a forward-only plan to apply these "
35+
"changes and avoid unnecessary backfills: sqlmesh plan prod --forward-only. "
36+
"See https://sqlmesh.readthedocs.io/en/stable/concepts/plans/#forward-only-plans for more details.\n"
37+
)
38+
39+
for (
40+
name,
41+
identifier,
42+
version,
43+
snapshot,
44+
kind_name,
45+
updated_ts,
46+
unpaused_ts,
47+
ttl_ms,
48+
unrestorable,
49+
) in engine_adapter.fetchall(
50+
exp.select(
51+
"name",
52+
"identifier",
53+
"version",
54+
"snapshot",
55+
"kind_name",
56+
"updated_ts",
57+
"unpaused_ts",
58+
"ttl_ms",
59+
"unrestorable",
60+
).from_(snapshots_table),
61+
quote_identifiers=True,
62+
):
63+
parsed_snapshot = json.loads(snapshot)
64+
python_env = parsed_snapshot["node"].get("python_env")
65+
66+
if python_env:
67+
for key, executable in python_env.items():
68+
if (
69+
key not in KEYS_TO_MAKE_DETERMINISTIC
70+
and isinstance(executable, dict)
71+
and executable.get("kind") == "value"
72+
):
73+
get_console().log_warning(warning)
74+
return

sqlmesh/utils/metaprogramming.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -424,10 +424,11 @@ def is_value(self) -> bool:
424424
return self.kind == ExecutableKind.VALUE
425425

426426
@classmethod
427-
def value(cls, v: t.Any, is_metadata: t.Optional[bool] = None) -> Executable:
428-
return Executable(
429-
payload=_deterministic_repr(v), kind=ExecutableKind.VALUE, is_metadata=is_metadata
430-
)
427+
def value(
428+
cls, v: t.Any, is_metadata: t.Optional[bool] = None, use_deterministic_repr: bool = False
429+
) -> Executable:
430+
payload = _deterministic_repr(v) if use_deterministic_repr else repr(v)
431+
return Executable(payload=payload, kind=ExecutableKind.VALUE, is_metadata=is_metadata)
431432

432433

433434
def serialize_env(env: t.Dict[str, t.Any], path: Path) -> t.Dict[str, Executable]:

0 commit comments

Comments
 (0)