Skip to content

Commit 0440ada

Browse files
authored
chore: grant mixin and normalize (#5447)
1 parent 40f8011 commit 0440ada

File tree

14 files changed

+265
-397
lines changed

14 files changed

+265
-397
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies = [
2424
"requests",
2525
"rich[jupyter]",
2626
"ruamel.yaml",
27-
"sqlglot[rs]~=27.17.0",
27+
"sqlglot[rs]~=27.19.0",
2828
"tenacity",
2929
"time-machine",
3030
"json-stream"

sqlmesh/core/engine_adapter/base_postgres.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ def _get_data_objects(
191191
for row in df.itertuples()
192192
]
193193

194-
def get_current_schema(self) -> str:
194+
def _get_current_schema(self) -> str:
195195
"""Returns the current default schema for the connection."""
196196
result = self.fetchone(exp.select(self.CURRENT_SCHEMA_EXPRESSION))
197197
if result and result[0]:

sqlmesh/core/engine_adapter/databricks.py

Lines changed: 16 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from sqlglot import exp
88

99
from sqlmesh.core.dialect import to_schema
10+
from sqlmesh.core.engine_adapter.mixins import GrantsFromInfoSchemaMixin
1011
from sqlmesh.core.engine_adapter.shared import (
1112
CatalogSupport,
1213
DataObject,
@@ -24,18 +25,19 @@
2425
import pandas as pd
2526

2627
from sqlmesh.core._typing import SchemaName, TableName, SessionProperties
27-
from sqlmesh.core.engine_adapter._typing import DF, PySparkSession, Query, GrantsConfig, DCL
28+
from sqlmesh.core.engine_adapter._typing import DF, PySparkSession, Query
2829

2930
logger = logging.getLogger(__name__)
3031

3132

32-
class DatabricksEngineAdapter(SparkEngineAdapter):
33+
class DatabricksEngineAdapter(SparkEngineAdapter, GrantsFromInfoSchemaMixin):
3334
DIALECT = "databricks"
3435
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.REPLACE_WHERE
3536
SUPPORTS_CLONING = True
3637
SUPPORTS_MATERIALIZED_VIEWS = True
3738
SUPPORTS_MATERIALIZED_VIEW_SCHEMA = True
3839
SUPPORTS_GRANTS = True
40+
USE_CATALOG_IN_GRANTS = True
3941
SCHEMA_DIFFER_KWARGS = {
4042
"support_positional_add": True,
4143
"nested_support": NestedSupport.ALL,
@@ -159,100 +161,19 @@ def _grant_object_kind(table_type: DataObjectType) -> str:
159161
return "MATERIALIZED VIEW"
160162
return "TABLE"
161163

162-
def _dcl_grants_config_expr(
163-
self,
164-
dcl_cmd: t.Type[DCL],
165-
table: exp.Table,
166-
grant_config: GrantsConfig,
167-
table_type: DataObjectType = DataObjectType.TABLE,
168-
) -> t.List[exp.Expression]:
169-
expressions: t.List[exp.Expression] = []
170-
if not grant_config:
171-
return expressions
172-
173-
object_kind = self._grant_object_kind(table_type)
174-
for privilege, principals in grant_config.items():
175-
for principal in principals:
176-
args: t.Dict[str, t.Any] = {
177-
"privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
178-
"securable": table.copy(),
179-
"principals": [exp.to_identifier(principal.lower())],
180-
}
181-
182-
if object_kind:
183-
args["kind"] = exp.Var(this=object_kind)
184-
185-
expressions.append(dcl_cmd(**args)) # type: ignore[arg-type]
186-
187-
return expressions
188-
189-
def _apply_grants_config_expr(
190-
self,
191-
table: exp.Table,
192-
grant_config: GrantsConfig,
193-
table_type: DataObjectType = DataObjectType.TABLE,
194-
) -> t.List[exp.Expression]:
195-
return self._dcl_grants_config_expr(exp.Grant, table, grant_config, table_type)
196-
197-
def _revoke_grants_config_expr(
198-
self,
199-
table: exp.Table,
200-
grant_config: GrantsConfig,
201-
table_type: DataObjectType = DataObjectType.TABLE,
202-
) -> t.List[exp.Expression]:
203-
return self._dcl_grants_config_expr(exp.Revoke, table, grant_config, table_type)
204-
205-
def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
206-
if schema_identifier := table.args.get("db"):
207-
schema_name = schema_identifier.this
208-
else:
209-
schema_name = self.get_current_database()
210-
if catalog_identifier := table.args.get("catalog"):
211-
catalog_name = catalog_identifier.this
212-
else:
213-
catalog_name = self.get_current_catalog()
214-
table_name = table.args.get("this").this # type: ignore
215-
216-
grant_expr = (
217-
exp.select("privilege_type", "grantee")
218-
.from_(
219-
exp.table_(
220-
"table_privileges",
221-
db="information_schema",
222-
catalog=catalog_name,
223-
)
224-
)
225-
.where(
226-
exp.and_(
227-
exp.column("table_catalog").eq(exp.Literal.string(catalog_name.lower())),
228-
exp.column("table_schema").eq(exp.Literal.string(schema_name.lower())),
229-
exp.column("table_name").eq(exp.Literal.string(table_name.lower())),
230-
exp.column("grantor").eq(exp.func("current_user")),
231-
exp.column("grantee").neq(exp.func("current_user")),
232-
# We only care about explicitly granted privileges and not inherited ones
233-
# if this is removed you would see grants inherited from the catalog get returned
234-
exp.column("inherited_from").eq(exp.Literal.string("NONE")),
235-
)
236-
)
164+
def _get_grant_expression(self, table: exp.Table) -> exp.Expression:
165+
# We only care about explicitly granted privileges and not inherited ones
166+
# if this is removed you would see grants inherited from the catalog get returned
167+
expression = super()._get_grant_expression(table)
168+
expression.args["where"].set(
169+
"this",
170+
exp.and_(
171+
expression.args["where"].this,
172+
exp.column("inherited_from").eq(exp.Literal.string("NONE")),
173+
wrap=False,
174+
),
237175
)
238-
239-
results = self.fetchall(grant_expr)
240-
241-
grants_dict: GrantsConfig = {}
242-
for privilege_raw, grantee_raw in results:
243-
if privilege_raw is None or grantee_raw is None:
244-
continue
245-
246-
privilege = str(privilege_raw)
247-
grantee = str(grantee_raw)
248-
if not privilege or not grantee:
249-
continue
250-
251-
grantees = grants_dict.setdefault(privilege, [])
252-
if grantee not in grantees:
253-
grantees.append(grantee)
254-
255-
return grants_dict
176+
return expression
256177

257178
def _begin_session(self, properties: SessionProperties) -> t.Any:
258179
"""Begin a new session."""

sqlmesh/core/engine_adapter/mixins.py

Lines changed: 142 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,23 @@
77

88
from sqlglot import exp, parse_one
99
from sqlglot.helper import seq_get
10+
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
1011

1112
from sqlmesh.core.engine_adapter.base import EngineAdapter
13+
from sqlmesh.core.engine_adapter.shared import DataObjectType
1214
from sqlmesh.core.node import IntervalUnit
1315
from sqlmesh.core.dialect import schema_
1416
from sqlmesh.core.schema_diff import TableAlterOperation
1517
from sqlmesh.utils.errors import SQLMeshError
1618

1719
if t.TYPE_CHECKING:
1820
from sqlmesh.core._typing import TableName
19-
from sqlmesh.core.engine_adapter._typing import DF
21+
from sqlmesh.core.engine_adapter._typing import (
22+
DCL,
23+
DF,
24+
GrantsConfig,
25+
QueryOrDF,
26+
)
2027
from sqlmesh.core.engine_adapter.base import QueryOrDF
2128

2229
logger = logging.getLogger(__name__)
@@ -548,3 +555,137 @@ def _normalize_decimal_value(self, expr: exp.Expression, precision: int) -> exp.
548555

549556
def _normalize_boolean_value(self, expr: exp.Expression) -> exp.Expression:
550557
return exp.cast(expr, "INT")
558+
559+
560+
class GrantsFromInfoSchemaMixin(EngineAdapter):
561+
CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expression = exp.func("current_user")
562+
SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = False
563+
USE_CATALOG_IN_GRANTS = False
564+
GRANT_INFORMATION_SCHEMA_TABLE_NAME = "table_privileges"
565+
566+
@staticmethod
567+
@abc.abstractmethod
568+
def _grant_object_kind(table_type: DataObjectType) -> t.Optional[str]:
569+
pass
570+
571+
@abc.abstractmethod
572+
def _get_current_schema(self) -> str:
573+
pass
574+
575+
def _dcl_grants_config_expr(
576+
self,
577+
dcl_cmd: t.Type[DCL],
578+
table: exp.Table,
579+
grant_config: GrantsConfig,
580+
table_type: DataObjectType = DataObjectType.TABLE,
581+
) -> t.List[exp.Expression]:
582+
expressions: t.List[exp.Expression] = []
583+
if not grant_config:
584+
return expressions
585+
586+
object_kind = self._grant_object_kind(table_type)
587+
for privilege, principals in grant_config.items():
588+
args: t.Dict[str, t.Any] = {
589+
"privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
590+
"securable": table.copy(),
591+
}
592+
if object_kind:
593+
args["kind"] = exp.Var(this=object_kind)
594+
if self.SUPPORTS_MULTIPLE_GRANT_PRINCIPALS:
595+
args["principals"] = [
596+
normalize_identifiers(
597+
parse_one(principal, into=exp.GrantPrincipal, dialect=self.dialect),
598+
dialect=self.dialect,
599+
)
600+
for principal in principals
601+
]
602+
expressions.append(dcl_cmd(**args)) # type: ignore[arg-type]
603+
else:
604+
for principal in principals:
605+
args["principals"] = [
606+
normalize_identifiers(
607+
parse_one(principal, into=exp.GrantPrincipal, dialect=self.dialect),
608+
dialect=self.dialect,
609+
)
610+
]
611+
expressions.append(dcl_cmd(**args)) # type: ignore[arg-type]
612+
613+
return expressions
614+
615+
def _apply_grants_config_expr(
616+
self,
617+
table: exp.Table,
618+
grant_config: GrantsConfig,
619+
table_type: DataObjectType = DataObjectType.TABLE,
620+
) -> t.List[exp.Expression]:
621+
return self._dcl_grants_config_expr(exp.Grant, table, grant_config, table_type)
622+
623+
def _revoke_grants_config_expr(
624+
self,
625+
table: exp.Table,
626+
grant_config: GrantsConfig,
627+
table_type: DataObjectType = DataObjectType.TABLE,
628+
) -> t.List[exp.Expression]:
629+
return self._dcl_grants_config_expr(exp.Revoke, table, grant_config, table_type)
630+
631+
def _get_grant_expression(self, table: exp.Table) -> exp.Expression:
632+
schema_identifier = table.args.get("db") or normalize_identifiers(
633+
exp.to_identifier(self._get_current_schema(), quoted=True), dialect=self.dialect
634+
)
635+
schema_name = schema_identifier.this
636+
table_name = table.args.get("this").this # type: ignore
637+
638+
grant_conditions = [
639+
exp.column("table_schema").eq(exp.Literal.string(schema_name)),
640+
exp.column("table_name").eq(exp.Literal.string(table_name)),
641+
exp.column("grantor").eq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
642+
exp.column("grantee").neq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
643+
]
644+
645+
info_schema_table = normalize_identifiers(
646+
exp.table_(self.GRANT_INFORMATION_SCHEMA_TABLE_NAME, db="information_schema"),
647+
dialect=self.dialect,
648+
)
649+
if self.USE_CATALOG_IN_GRANTS:
650+
catalog_identifier = table.args.get("catalog")
651+
if not catalog_identifier:
652+
catalog_name = self.get_current_catalog()
653+
if not catalog_name:
654+
raise SQLMeshError(
655+
"Current catalog could not be determined for fetching grants. This is unexpected."
656+
)
657+
catalog_identifier = normalize_identifiers(
658+
exp.to_identifier(catalog_name, quoted=True), dialect=self.dialect
659+
)
660+
catalog_name = catalog_identifier.this
661+
info_schema_table.set("catalog", catalog_identifier.copy())
662+
grant_conditions.insert(
663+
0, exp.column("table_catalog").eq(exp.Literal.string(catalog_name))
664+
)
665+
666+
return (
667+
exp.select("privilege_type", "grantee")
668+
.from_(info_schema_table)
669+
.where(exp.and_(*grant_conditions))
670+
)
671+
672+
def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
673+
grant_expr = self._get_grant_expression(table)
674+
675+
results = self.fetchall(grant_expr)
676+
677+
grants_dict: GrantsConfig = {}
678+
for privilege_raw, grantee_raw in results:
679+
if privilege_raw is None or grantee_raw is None:
680+
continue
681+
682+
privilege = str(privilege_raw)
683+
grantee = str(grantee_raw)
684+
if not privilege or not grantee:
685+
continue
686+
687+
grantees = grants_dict.setdefault(privilege, [])
688+
if grantee not in grantees:
689+
grantees.append(grantee)
690+
691+
return grants_dict

0 commit comments

Comments
 (0)