Skip to content

Commit 905192d

Browse files
authored
Fix: #1435 introduced invalid sql for querying the state sync (#1479)
1 parent 60ccd67 commit 905192d

File tree

1 file changed

+43
-54
lines changed

1 file changed

+43
-54
lines changed

sqlmesh/core/state_sync/engine_adapter.py

Lines changed: 43 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ def invalidate_environment(self, name: str) -> None:
215215
if name == c.PROD:
216216
raise SQLMeshError("Cannot invalidate the production environment.")
217217

218-
filter_expr = exp.to_column("name").eq(name)
218+
filter_expr = exp.column("name").eq(name)
219219

220220
self.engine_adapter.update_table(
221221
self.environments_table,
@@ -226,7 +226,7 @@ def invalidate_environment(self, name: str) -> None:
226226
def delete_expired_environments(self) -> t.List[Environment]:
227227
now_ts = now_timestamp()
228228
filter_expr = exp.LTE(
229-
this=exp.to_column("expiration_ts"),
229+
this=exp.column("expiration_ts"),
230230
expression=exp.Literal.number(now_ts),
231231
)
232232

@@ -249,19 +249,16 @@ def delete_expired_environments(self) -> t.List[Environment]:
249249

250250
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
251251
self.engine_adapter.delete_from(
252-
self.snapshots_table, where=self._snapshot_id_filter(snapshot_ids, self.snapshots_table)
252+
self.snapshots_table, where=self._snapshot_id_filter(snapshot_ids)
253253
)
254254

255255
def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
256256
return {
257257
SnapshotId(name=name, identifier=identifier)
258258
for name, identifier in self.engine_adapter.fetchall(
259-
exp.select(
260-
exp.to_column(f"{self.snapshots_table}.name"),
261-
exp.to_column(f"{self.snapshots_table}.identifier"),
262-
)
259+
exp.select("name", "identifier")
263260
.from_(self.snapshots_table)
264-
.where(self._snapshot_id_filter(snapshot_ids, self.snapshots_table)),
261+
.where(self._snapshot_id_filter(snapshot_ids)),
265262
quote_identifiers=True,
266263
)
267264
}
@@ -293,7 +290,7 @@ def _update_environment(self, environment: Environment) -> None:
293290
self.engine_adapter.delete_from(
294291
self.environments_table,
295292
where=exp.EQ(
296-
this=exp.to_column("name"),
293+
this=exp.column("name"),
297294
expression=exp.Literal.string(environment.name),
298295
),
299296
)
@@ -309,7 +306,7 @@ def _update_snapshot(self, snapshot: Snapshot) -> None:
309306
self.engine_adapter.update_table(
310307
self.snapshots_table,
311308
{"snapshot": snapshot.json()},
312-
where=self._snapshot_id_filter([snapshot.snapshot_id], self.snapshots_table),
309+
where=self._snapshot_id_filter([snapshot.snapshot_id]),
313310
contains_json=True,
314311
)
315312

@@ -362,24 +359,18 @@ def _get_snapshots(
362359
A dictionary of snapshot ids to snapshots for ones that could be found.
363360
"""
364361
query = (
365-
exp.select(exp.to_column(f"{self.snapshots_table}.snapshot"))
366-
.from_(self.snapshots_table)
367-
.where(
368-
self._snapshot_id_filter(snapshot_ids, self.snapshots_table)
369-
if snapshot_ids
370-
else None
371-
)
362+
exp.select(exp.column("snapshot", table="snapshots"))
363+
.from_(exp.to_table(self.snapshots_table).as_("snapshots"))
364+
.where(self._snapshot_id_filter(snapshot_ids, "snapshots") if snapshot_ids else None)
372365
)
373366
if hydrate_seeds:
374-
query = query.select(exp.to_column(f"{self.seeds_table}.content")).join(
375-
self.seeds_table,
367+
query = query.select(exp.column("content", table="seeds")).join(
368+
exp.to_table(self.seeds_table).as_("seeds"),
376369
on=exp.and_(
377-
*[
378-
exp.to_column(f"{self.snapshots_table}.{col}").eq(
379-
exp.to_column(f"{self.seeds_table}.{col}")
380-
)
381-
for col in ["name", "identifier"]
382-
]
370+
exp.column("name", table="snapshots").eq(exp.column("name", table="seeds")),
371+
exp.column("identifier", table="snapshots").eq(
372+
exp.column("identifier", table="seeds")
373+
),
383374
),
384375
join_type="left",
385376
)
@@ -436,9 +427,9 @@ def _get_snapshots_with_same_version(
436427
return []
437428

438429
query = (
439-
exp.select(exp.to_column(f"{self.snapshots_table}.snapshot"))
440-
.from_(self.snapshots_table)
441-
.where(self._snapshot_name_version_filter(snapshots, self.snapshots_table))
430+
exp.select("snapshot")
431+
.from_(exp.to_table(self.snapshots_table).as_("snapshots"))
432+
.where(self._snapshot_name_version_filter(snapshots))
442433
)
443434
if lock_for_update:
444435
query = query.lock(copy=False)
@@ -477,7 +468,7 @@ def _get_environment(
477468
row = self.engine_adapter.fetchone(
478469
self._environments_query(
479470
where=exp.EQ(
480-
this=exp.to_column("name"),
471+
this=exp.column("name"),
481472
expression=exp.Literal.string(environment),
482473
),
483474
lock_for_update=lock_for_update,
@@ -579,18 +570,18 @@ def _get_snapshot_intervals(
579570
query = (
580571
exp.select(
581572
"id",
582-
exp.to_column(f"{self.intervals_table}.name"),
583-
exp.to_column(f"{self.intervals_table}.identifier"),
573+
exp.column("name", table="intervals"),
574+
exp.column("identifier", table="intervals"),
584575
"version",
585576
"start_ts",
586577
"end_ts",
587578
"is_dev",
588579
"is_removed",
589580
)
590-
.from_(self.intervals_table)
581+
.from_(exp.to_table(self.intervals_table).as_("intervals"))
591582
.order_by(
592-
exp.to_column(f"{self.intervals_table}.name"),
593-
exp.to_column(f"{self.intervals_table}.identifier"),
583+
exp.column("name", table="intervals"),
584+
exp.column("identifier", table="intervals"),
594585
"created_ts",
595586
"is_removed",
596587
)
@@ -599,25 +590,23 @@ def _get_snapshot_intervals(
599590
if uncompacted_only:
600591
query.join(
601592
exp.select("name", "identifier")
602-
.from_(self.intervals_table)
593+
.from_(exp.to_table(self.intervals_table).as_("intervals"))
603594
.where(exp.column("is_compacted").not_())
604595
.distinct()
605596
.subquery(alias="uncompacted"),
606597
on=exp.and_(
607-
*[
608-
exp.to_column(f"{self.intervals_table}.{col}").eq(
609-
exp.column(col, table="uncompacted")
610-
)
611-
for col in ["name", "identifier"]
612-
]
598+
exp.column("name", table="intervals").eq(
599+
exp.column("name", table="uncompacted")
600+
),
601+
exp.column("identifier", table="intervals").eq(
602+
exp.column("identifier", table="uncompacted")
603+
),
613604
),
614605
copy=False,
615606
)
616607

617608
if snapshots:
618-
query.where(
619-
self._snapshot_name_version_filter(snapshots, self.intervals_table), copy=False
620-
)
609+
query.where(self._snapshot_name_version_filter(snapshots, "intervals"), copy=False)
621610
elif snapshots is not None:
622611
return (set(), [])
623612

@@ -880,7 +869,7 @@ def map_data_versions(
880869
self.unpause_snapshots(updated_prod_environment.snapshots, now_timestamp())
881870

882871
def _snapshot_id_filter(
883-
self, snapshot_ids: t.Iterable[SnapshotIdLike], fq_table_name: str
872+
self, snapshot_ids: t.Iterable[SnapshotIdLike], alias: t.Optional[str] = None
884873
) -> t.Union[exp.In, exp.Boolean, exp.Condition]:
885874
if not snapshot_ids:
886875
return exp.false()
@@ -889,24 +878,24 @@ def _snapshot_id_filter(
889878
exp.Tuple,
890879
exp.convert(
891880
(
892-
exp.to_column(f"{fq_table_name}.name"),
893-
exp.to_column(f"{fq_table_name}.identifier"),
881+
exp.column("name", table=alias),
882+
exp.column("identifier", table=alias),
894883
)
895884
),
896885
).isin(*[(snapshot_id.name, snapshot_id.identifier) for snapshot_id in snapshot_ids])
897886
else:
898887
return exp.or_(
899888
*[
900889
exp.and_(
901-
exp.to_column(f"{fq_table_name}.name").eq(snapshot_id.name),
902-
exp.to_column(f"{fq_table_name}.identifier").eq(snapshot_id.identifier),
890+
exp.column("name", table=alias).eq(snapshot_id.name),
891+
exp.column("identifier", table=alias).eq(snapshot_id.identifier),
903892
)
904893
for snapshot_id in snapshot_ids
905894
]
906895
)
907896

908897
def _snapshot_name_version_filter(
909-
self, snapshot_name_versions: t.Iterable[SnapshotNameVersionLike], fq_table_name: str
898+
self, snapshot_name_versions: t.Iterable[SnapshotNameVersionLike], alias: str = "snapshots"
910899
) -> t.Union[exp.In, exp.Boolean, exp.Condition]:
911900
if not snapshot_name_versions:
912901
return exp.false()
@@ -915,8 +904,8 @@ def _snapshot_name_version_filter(
915904
exp.Tuple,
916905
exp.convert(
917906
(
918-
exp.to_column(f"{fq_table_name}.name"),
919-
exp.to_column(f"{fq_table_name}.version"),
907+
exp.column("name", table=alias),
908+
exp.column("version", table=alias),
920909
)
921910
),
922911
).isin(
@@ -929,8 +918,8 @@ def _snapshot_name_version_filter(
929918
return exp.or_(
930919
*[
931920
exp.and_(
932-
exp.to_column(f"{fq_table_name}.name").eq(snapshot_name_version.name),
933-
exp.to_column(f"{fq_table_name}.version").eq(snapshot_name_version.version),
921+
exp.column("name", table=alias).eq(snapshot_name_version.name),
922+
exp.column("version", table=alias).eq(snapshot_name_version.version),
934923
)
935924
for snapshot_name_version in snapshot_name_versions
936925
]

0 commit comments

Comments
 (0)