Skip to content

Commit e011ac6

Browse files
Merge branch 'main' into feat/add-fabric-engine
2 parents a0ea870 + 87f5335 commit e011ac6

File tree

10 files changed

+194
-165
lines changed

10 files changed

+194
-165
lines changed

docs/guides/signals.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ This example specifies that the `random_signal()` should evaluate once with a th
6363
MODEL (
6464
name example.signal_model,
6565
kind FULL,
66-
signals [
66+
signals (
6767
random_signal(threshold := 0.5), # specify threshold value
68-
]
68+
)
6969
);
7070

7171
SELECT 1
@@ -108,9 +108,9 @@ MODEL (
108108
time_column ds,
109109
),
110110
start '2 week ago',
111-
signals [
111+
signals (
112112
one_week_ago(),
113-
]
113+
)
114114
);
115115

116116

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ dependencies = [
2323
"requests",
2424
"rich[jupyter]",
2525
"ruamel.yaml",
26-
"sqlglot[rs]~=27.0.0",
26+
"sqlglot[rs]~=27.1.0",
2727
"tenacity",
2828
"time-machine",
2929
"json-stream"

sqlmesh/core/model/common.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,16 @@ def _add_variables_to_python_env(
153153

154154
variables = {k: v for k, v in (variables or {}).items() if k in used_variables}
155155
if variables:
156-
python_env[c.SQLMESH_VARS] = Executable.value(variables)
156+
python_env[c.SQLMESH_VARS] = Executable.value(variables, sort_root_dict=True)
157157

158158
if blueprint_variables:
159159
blueprint_variables = {
160160
k: SqlValue(sql=v.sql(dialect=dialect)) if isinstance(v, exp.Expression) else v
161161
for k, v in blueprint_variables.items()
162162
}
163-
python_env[c.SQLMESH_BLUEPRINT_VARS] = Executable.value(blueprint_variables)
163+
python_env[c.SQLMESH_BLUEPRINT_VARS] = Executable.value(
164+
blueprint_variables, sort_root_dict=True
165+
)
164166

165167
return python_env
166168

sqlmesh/core/model/definition.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2361,7 +2361,7 @@ def create_python_model(
23612361

23622362
used_variables = {k: v for k, v in (variables or {}).items() if k in referenced_variables}
23632363
if used_variables:
2364-
python_env[c.SQLMESH_VARS] = Executable.value(used_variables)
2364+
python_env[c.SQLMESH_VARS] = Executable.value(used_variables, sort_root_dict=True)
23652365

23662366
return _create_model(
23672367
PythonModel,

sqlmesh/migrations/v0085_deterministic_repr.py

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""
55

66
import json
7+
import logging
78
import typing as t
89
from dataclasses import dataclass
910

@@ -12,6 +13,12 @@
1213
from sqlmesh.utils.migration import index_text_type, blob_text_type
1314

1415

16+
logger = logging.getLogger(__name__)
17+
18+
19+
KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"]
20+
21+
1522
# Make sure `SqlValue` is defined so it can be used by `eval` call in the migration
1623
@dataclass
1724
class SqlValue:
@@ -20,25 +27,13 @@ class SqlValue:
2027
sql: str
2128

2229

23-
def _deterministic_repr(obj: t.Any) -> str:
24-
"""
25-
This is a copy of the function from utils.metaprogramming
26-
"""
27-
28-
def _normalize_for_repr(o: t.Any) -> t.Any:
29-
if isinstance(o, dict):
30-
sorted_items = sorted(o.items(), key=lambda x: str(x[0]))
31-
return {k: _normalize_for_repr(v) for k, v in sorted_items}
32-
if isinstance(o, (list, tuple)):
33-
# Recursively normalize nested structures
34-
normalized = [_normalize_for_repr(item) for item in o]
35-
return type(o)(normalized)
36-
return o
37-
30+
def _dict_sort(obj: t.Any) -> str:
3831
try:
39-
return repr(_normalize_for_repr(obj))
32+
if isinstance(obj, dict):
33+
obj = dict(sorted(obj.items(), key=lambda x: str(x[0])))
4034
except Exception:
41-
return repr(obj)
35+
logger.warning("Failed to sort non-recursive dict", exc_info=True)
36+
return repr(obj)
4237

4338

4439
def migrate(state_sync, **kwargs): # type: ignore
@@ -82,20 +77,22 @@ def migrate(state_sync, **kwargs): # type: ignore
8277

8378
if python_env:
8479
for key, executable in python_env.items():
80+
if key not in KEYS_TO_MAKE_DETERMINISTIC:
81+
continue
8582
if isinstance(executable, dict) and executable.get("kind") == "value":
8683
old_payload = executable["payload"]
8784
try:
8885
# Try to parse the old payload and re-serialize it deterministically
8986
parsed_value = eval(old_payload)
90-
new_payload = _deterministic_repr(parsed_value)
87+
new_payload = _dict_sort(parsed_value)
9188

9289
# Only update if the representation changed
9390
if old_payload != new_payload:
9491
executable["payload"] = new_payload
9592
migration_needed = True
9693
except Exception:
9794
# If we still can't eval it, leave it as-is
98-
pass
95+
logger.warning("Exception trying to eval payload", exc_info=True)
9996

10097
new_snapshots.append(
10198
{
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import json
2+
import logging
3+
4+
from sqlglot import exp
5+
6+
from sqlmesh.core.console import get_console
7+
8+
9+
logger = logging.getLogger(__name__)
10+
KEYS_TO_MAKE_DETERMINISTIC = ["__sqlmesh__vars__", "__sqlmesh__blueprint__vars__"]
11+
12+
13+
def migrate(state_sync, **kwargs): # type: ignore
14+
engine_adapter = state_sync.engine_adapter
15+
schema = state_sync.schema
16+
snapshots_table = "_snapshots"
17+
versions_table = "_versions"
18+
if schema:
19+
snapshots_table = f"{schema}.{snapshots_table}"
20+
versions_table = f"{schema}.{versions_table}"
21+
22+
result = engine_adapter.fetchone(
23+
exp.select("schema_version").from_(versions_table), quote_identifiers=True
24+
)
25+
if not result:
26+
# This must be the first migration, so we can skip the check since the project was not exposed to 85 migration bug
27+
return
28+
schema_version = result[0]
29+
if schema_version < 85:
30+
# The project was not exposed to the bugged 85 migration, so we can skip it.
31+
return
32+
33+
warning = (
34+
"SQLMesh detected that it may not be able to fully migrate the state database. This should not impact "
35+
"the migration process, but may result in unexpected changes being reported by the next `sqlmesh plan` "
36+
"command. Please run `sqlmesh diff prod` after the migration has completed, before making any new "
37+
"changes. If any unexpected changes are reported, consider running a forward-only plan to apply these "
38+
"changes and avoid unnecessary backfills: sqlmesh plan prod --forward-only. "
39+
"See https://sqlmesh.readthedocs.io/en/stable/concepts/plans/#forward-only-plans for more details.\n"
40+
)
41+
42+
for (
43+
name,
44+
identifier,
45+
version,
46+
snapshot,
47+
kind_name,
48+
updated_ts,
49+
unpaused_ts,
50+
ttl_ms,
51+
unrestorable,
52+
) in engine_adapter.fetchall(
53+
exp.select(
54+
"name",
55+
"identifier",
56+
"version",
57+
"snapshot",
58+
"kind_name",
59+
"updated_ts",
60+
"unpaused_ts",
61+
"ttl_ms",
62+
"unrestorable",
63+
).from_(snapshots_table),
64+
quote_identifiers=True,
65+
):
66+
parsed_snapshot = json.loads(snapshot)
67+
python_env = parsed_snapshot["node"].get("python_env")
68+
69+
if python_env:
70+
for key, executable in python_env.items():
71+
if (
72+
key not in KEYS_TO_MAKE_DETERMINISTIC
73+
and isinstance(executable, dict)
74+
and executable.get("kind") == "value"
75+
):
76+
try:
77+
parsed_value = eval(executable["payload"])
78+
if isinstance(parsed_value, dict):
79+
get_console().log_warning(warning)
80+
return
81+
except Exception:
82+
logger.warning("Exception trying to eval payload", exc_info=True)

sqlmesh/utils/metaprogramming.py

Lines changed: 14 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import importlib
66
import inspect
77
import linecache
8+
import logging
89
import os
910
import re
1011
import sys
@@ -23,6 +24,9 @@
2324
from sqlmesh.utils.errors import SQLMeshError
2425
from sqlmesh.utils.pydantic import PydanticModel
2526

27+
logger = logging.getLogger(__name__)
28+
29+
2630
IGNORE_DECORATORS = {"macro", "model", "signal"}
2731
SERIALIZABLE_CALLABLES = (type, types.FunctionType)
2832
LITERALS = (Number, str, bytes, tuple, list, dict, set, bool)
@@ -424,10 +428,11 @@ def is_value(self) -> bool:
424428
return self.kind == ExecutableKind.VALUE
425429

426430
@classmethod
427-
def value(cls, v: t.Any, is_metadata: t.Optional[bool] = None) -> Executable:
428-
return Executable(
429-
payload=_deterministic_repr(v), kind=ExecutableKind.VALUE, is_metadata=is_metadata
430-
)
431+
def value(
432+
cls, v: t.Any, is_metadata: t.Optional[bool] = None, sort_root_dict: bool = False
433+
) -> Executable:
434+
payload = _dict_sort(v) if sort_root_dict else repr(v)
435+
return Executable(payload=payload, kind=ExecutableKind.VALUE, is_metadata=is_metadata)
431436

432437

433438
def serialize_env(env: t.Dict[str, t.Any], path: Path) -> t.Dict[str, Executable]:
@@ -635,36 +640,13 @@ def print_exception(
635640
out.write(tb)
636641

637642

638-
def _deterministic_repr(obj: t.Any) -> str:
639-
"""Create a deterministic representation by ensuring consistent ordering before repr().
640-
641-
For dictionaries, ensures consistent key ordering to prevent non-deterministic
642-
serialization that affects fingerprinting. Uses Python's native repr() logic
643-
for all formatting to handle edge cases properly.
644-
645-
Note that this function assumes list/tuple order is significant and therefore does not sort them.
646-
647-
Args:
648-
obj: The object to represent as a string.
649-
650-
Returns:
651-
A deterministic string representation of the object.
652-
"""
653-
654-
def _normalize_for_repr(o: t.Any) -> t.Any:
655-
if isinstance(o, dict):
656-
sorted_items = sorted(o.items(), key=lambda x: str(x[0]))
657-
return {k: _normalize_for_repr(v) for k, v in sorted_items}
658-
if isinstance(o, (list, tuple)):
659-
# Recursively normalize nested structures
660-
normalized = [_normalize_for_repr(item) for item in o]
661-
return type(o)(normalized)
662-
return o
663-
643+
def _dict_sort(obj: t.Any) -> str:
664644
try:
665-
return repr(_normalize_for_repr(obj))
645+
if isinstance(obj, dict):
646+
obj = dict(sorted(obj.items(), key=lambda x: str(x[0])))
666647
except Exception:
667-
return repr(obj)
648+
logger.warning("Failed to sort non-recursive dict", exc_info=True)
649+
return repr(obj)
668650

669651

670652
def import_python_file(path: Path, relative_base: Path = Path()) -> types.ModuleType:

tests/core/test_macros.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -876,12 +876,7 @@ def test_date_spine(assert_exp_eq, dialect, date_part):
876876

877877
# Generate the expected SQL based on the dialect and date_part
878878
if dialect == "duckdb":
879-
if date_part == "week":
880-
interval = "(7 * INTERVAL '1' DAY)"
881-
elif date_part == "quarter":
882-
interval = "(90 * INTERVAL '1' DAY)"
883-
else:
884-
interval = f"INTERVAL '1' {date_part.upper()}"
879+
interval = f"INTERVAL '1' {date_part.upper()}"
885880
expected_sql = f"""
886881
SELECT
887882
date_{date_part}

tests/core/test_model.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6116,7 +6116,8 @@ def test_named_variable_macros() -> None:
61166116
)
61176117

61186118
assert model.python_env[c.SQLMESH_VARS] == Executable.value(
6119-
{c.GATEWAY: "in_memory", "test_var_a": "test_value", "overridden_var": "initial_value"}
6119+
{c.GATEWAY: "in_memory", "test_var_a": "test_value", "overridden_var": "initial_value"},
6120+
sort_root_dict=True,
61206121
)
61216122
assert (
61226123
model.render_query_or_raise().sql()
@@ -6142,7 +6143,8 @@ def test_variables_in_templates() -> None:
61426143
)
61436144

61446145
assert model.python_env[c.SQLMESH_VARS] == Executable.value(
6145-
{c.GATEWAY: "in_memory", "test_var_a": "test_value", "overridden_var": "initial_value"}
6146+
{c.GATEWAY: "in_memory", "test_var_a": "test_value", "overridden_var": "initial_value"},
6147+
sort_root_dict=True,
61466148
)
61476149
assert (
61486150
model.render_query_or_raise().sql()
@@ -6166,7 +6168,8 @@ def test_variables_in_templates() -> None:
61666168
)
61676169

61686170
assert model.python_env[c.SQLMESH_VARS] == Executable.value(
6169-
{c.GATEWAY: "in_memory", "test_var_a": "test_value", "overridden_var": "initial_value"}
6171+
{c.GATEWAY: "in_memory", "test_var_a": "test_value", "overridden_var": "initial_value"},
6172+
sort_root_dict=True,
61706173
)
61716174
assert (
61726175
model.render_query_or_raise().sql()
@@ -6305,7 +6308,8 @@ def test_variables_migrated_dbt_package_macro():
63056308
dialect="bigquery",
63066309
)
63076310
assert model.python_env[c.SQLMESH_VARS] == Executable.value(
6308-
{"test_var_a": "test_var_a_value", "__dbt_packages__.test.test_var_b": "test_var_b_value"}
6311+
{"test_var_a": "test_var_a_value", "__dbt_packages__.test.test_var_b": "test_var_b_value"},
6312+
sort_root_dict=True,
63096313
)
63106314
assert (
63116315
model.render_query().sql(dialect="bigquery")
@@ -6530,7 +6534,8 @@ def test_unrendered_macros_sql_model(mocker: MockerFixture) -> None:
65306534
"physical_var": "bla",
65316535
"virtual_var": "blb",
65326536
"session_var": "blc",
6533-
}
6537+
},
6538+
sort_root_dict=True,
65346539
)
65356540

65366541
assert "location1" in model.physical_properties
@@ -6617,7 +6622,8 @@ def model_with_macros(evaluator, **kwargs):
66176622
"physical_var": "bla",
66186623
"virtual_var": "blb",
66196624
"session_var": "blc",
6620-
}
6625+
},
6626+
sort_root_dict=True,
66216627
)
66226628
assert python_sql_model.enabled
66236629

@@ -6709,7 +6715,7 @@ def model_with_named_variables(
67096715
)
67106716

67116717
assert python_model.python_env[c.SQLMESH_VARS] == Executable.value(
6712-
{"test_var_a": "test_value", "start": "2024-01-01"}
6718+
{"test_var_a": "test_value", "start": "2024-01-01"}, sort_root_dict=True
67136719
)
67146720

67156721
context = ExecutionContext(mocker.Mock(), {}, None, None)
@@ -10576,9 +10582,12 @@ def unimportant_testing_macro(evaluator, *projections):
1057610582
)
1057710583

1057810584
assert m.python_env.get(c.SQLMESH_VARS) == Executable.value(
10579-
{"selector": "bla", "bla_variable": 1, "baz_variable": 2}
10585+
{"selector": "bla", "bla_variable": 1, "baz_variable": 2},
10586+
sort_root_dict=True,
10587+
)
10588+
assert m.python_env.get(c.SQLMESH_BLUEPRINT_VARS) == Executable.value(
10589+
{"selector": "baz"}, sort_root_dict=True
1058010590
)
10581-
assert m.python_env.get(c.SQLMESH_BLUEPRINT_VARS) == Executable.value({"selector": "baz"})
1058210591

1058310592

1058410593
def test_extract_schema_in_post_statement(tmp_path: Path) -> None:

0 commit comments

Comments
 (0)