Skip to content

Commit 1f70c98

Browse files
authored
feat: redshift grant support (#5440)
1 parent ca9588e commit 1f70c98

File tree

3 files changed

+249
-3
lines changed

3 files changed

+249
-3
lines changed

sqlmesh/core/engine_adapter/redshift.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import pandas as pd
2929

3030
from sqlmesh.core._typing import SchemaName, TableName
31+
from sqlmesh.core.engine_adapter._typing import DCL, GrantsConfig
3132
from sqlmesh.core.engine_adapter.base import QueryOrDF, Query
3233

3334
logger = logging.getLogger(__name__)
@@ -46,6 +47,7 @@ class RedshiftEngineAdapter(
4647
# Redshift doesn't support comments for VIEWs WITH NO SCHEMA BINDING (which we always use)
4748
COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
4849
SUPPORTS_REPLACE_TABLE = False
50+
SUPPORTS_GRANTS = True
4951

5052
SCHEMA_DIFFER_KWARGS = {
5153
"parameterized_type_defaults": {
@@ -163,6 +165,96 @@ def _fetch_native_df(
163165
result = [tuple(row) for row in fetcheddata]
164166
return pd.DataFrame(result, columns=columns)
165167

168+
@staticmethod
169+
def _grant_object_kind(table_type: DataObjectType) -> str:
170+
if table_type == DataObjectType.VIEW:
171+
return "VIEW"
172+
if table_type == DataObjectType.MATERIALIZED_VIEW:
173+
return "MATERIALIZED VIEW"
174+
return "TABLE"
175+
176+
def _dcl_grants_config_expr(
177+
self,
178+
dcl_cmd: t.Type[DCL],
179+
table: exp.Table,
180+
grant_config: GrantsConfig,
181+
table_type: DataObjectType = DataObjectType.TABLE,
182+
) -> t.List[exp.Expression]:
183+
expressions: t.List[exp.Expression] = []
184+
if not grant_config:
185+
return expressions
186+
187+
object_kind = self._grant_object_kind(table_type)
188+
for privilege, principals in grant_config.items():
189+
if not principals:
190+
continue
191+
192+
args: t.Dict[str, t.Any] = {
193+
"privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
194+
"securable": table.copy(),
195+
"principals": principals,
196+
}
197+
198+
if object_kind:
199+
args["kind"] = exp.Var(this=object_kind)
200+
201+
expressions.append(dcl_cmd(**args)) # type: ignore[arg-type]
202+
203+
return expressions
204+
205+
def _apply_grants_config_expr(
206+
self,
207+
table: exp.Table,
208+
grant_config: GrantsConfig,
209+
table_type: DataObjectType = DataObjectType.TABLE,
210+
) -> t.List[exp.Expression]:
211+
return self._dcl_grants_config_expr(exp.Grant, table, grant_config, table_type)
212+
213+
def _revoke_grants_config_expr(
214+
self,
215+
table: exp.Table,
216+
grant_config: GrantsConfig,
217+
table_type: DataObjectType = DataObjectType.TABLE,
218+
) -> t.List[exp.Expression]:
219+
return self._dcl_grants_config_expr(exp.Revoke, table, grant_config, table_type)
220+
221+
def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
222+
"""Returns current grants for a Redshift table as a dictionary."""
223+
table_schema = table.db or self.get_current_schema()
224+
table_name = table.name
225+
current_user = exp.func("current_user")
226+
227+
grant_expr = (
228+
exp.select("privilege_type", "grantee")
229+
.from_(exp.table_("table_privileges", db="information_schema"))
230+
.where(
231+
exp.and_(
232+
exp.column("table_schema").eq(exp.Literal.string(table_schema)),
233+
exp.column("table_name").eq(exp.Literal.string(table_name)),
234+
exp.column("grantor").eq(current_user),
235+
exp.column("grantee").neq(current_user),
236+
)
237+
)
238+
)
239+
240+
results = self.fetchall(grant_expr)
241+
242+
grants_dict: GrantsConfig = {}
243+
for privilege_raw, grantee_raw in results:
244+
if privilege_raw is None or grantee_raw is None:
245+
continue
246+
247+
privilege = str(privilege_raw)
248+
grantee = str(grantee_raw)
249+
if not privilege or not grantee:
250+
continue
251+
252+
grants_dict.setdefault(privilege, [])
253+
if grantee not in grants_dict[privilege]:
254+
grants_dict[privilege].append(grantee)
255+
256+
return grants_dict
257+
166258
def _create_table_from_source_queries(
167259
self,
168260
table_name: TableName,

tests/core/engine_adapter/integration/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -750,7 +750,7 @@ def _get_create_user_or_role(
750750
self, username: str, password: t.Optional[str] = None
751751
) -> t.Tuple[str, t.Optional[str]]:
752752
password = password or random_id()
753-
if self.dialect == "postgres":
753+
if self.dialect in ["postgres", "redshift"]:
754754
return username, f"CREATE USER \"{username}\" WITH PASSWORD '{password}'"
755755
if self.dialect == "snowflake":
756756
return username, f"CREATE ROLE {username}"
@@ -777,6 +777,10 @@ def create_users_or_roles(self, *role_names: str) -> t.Iterator[t.Dict[str, str]
777777
self.add_test_suffix(f"test_{role_name}"), dialect=self.dialect
778778
).sql(dialect=self.dialect)
779779
password = random_id()
780+
if self.dialect == "redshift":
781+
password += (
782+
"A" # redshift requires passwords to have at least one uppercase letter
783+
)
780784
user_name = self._create_user_or_role(user_name, password)
781785
created_users.append(user_name)
782786
roles[role_name] = user_name
@@ -802,7 +806,7 @@ def get_update_privilege(self) -> str:
802806
def _cleanup_user_or_role(self, user_name: str) -> None:
803807
"""Helper function to clean up a PostgreSQL user and all their dependencies."""
804808
try:
805-
if self.dialect == "postgres":
809+
if self.dialect in ["postgres", "redshift"]:
806810
self.engine_adapter.execute(f"""
807811
SELECT pg_terminate_backend(pid)
808812
FROM pg_stat_activity

tests/core/engine_adapter/test_redshift.py

Lines changed: 151 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from sqlglot import parse_one
1010

1111
from sqlmesh.core.engine_adapter import RedshiftEngineAdapter
12-
from sqlmesh.core.engine_adapter.shared import DataObject
12+
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
1313
from sqlmesh.utils.errors import SQLMeshError
1414
from tests.core.engine_adapter import to_sql_calls
1515

@@ -83,6 +83,156 @@ def test_varchar_size_workaround(make_mocked_engine_adapter: t.Callable, mocker:
8383
]
8484

8585

86+
def test_sync_grants_config(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
87+
adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)
88+
relation = exp.to_table("test_schema.test_table", dialect="redshift")
89+
new_grants_config = {"SELECT": ["user1", "user2"], "INSERT": ["user3"]}
90+
91+
current_grants = [("SELECT", "old_user"), ("UPDATE", "legacy_user")]
92+
fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants)
93+
94+
adapter.sync_grants_config(relation, new_grants_config)
95+
96+
fetchall_mock.assert_called_once()
97+
executed_query = fetchall_mock.call_args[0][0]
98+
executed_sql = executed_query.sql(dialect="redshift")
99+
expected_sql = (
100+
"SELECT privilege_type, grantee FROM information_schema.table_privileges "
101+
"WHERE table_schema = 'test_schema' AND table_name = 'test_table' "
102+
"AND grantor = CURRENT_USER AND grantee <> CURRENT_USER"
103+
)
104+
assert executed_sql == expected_sql
105+
106+
sql_calls = to_sql_calls(adapter)
107+
assert len(sql_calls) == 4
108+
assert 'REVOKE SELECT ON TABLE "test_schema"."test_table" FROM old_user' in sql_calls
109+
assert 'REVOKE UPDATE ON TABLE "test_schema"."test_table" FROM legacy_user' in sql_calls
110+
assert 'GRANT SELECT ON TABLE "test_schema"."test_table" TO user1, user2' in sql_calls
111+
assert 'GRANT INSERT ON TABLE "test_schema"."test_table" TO user3' in sql_calls
112+
113+
114+
def test_sync_grants_config_with_overlaps(
115+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
116+
):
117+
adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)
118+
relation = exp.to_table("test_schema.test_table", dialect="redshift")
119+
new_grants_config = {
120+
"SELECT": ["user_shared", "user_new"],
121+
"INSERT": ["user_shared", "user_writer"],
122+
}
123+
124+
current_grants = [
125+
("SELECT", "user_shared"),
126+
("SELECT", "user_legacy"),
127+
("INSERT", "user_shared"),
128+
]
129+
fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants)
130+
131+
adapter.sync_grants_config(relation, new_grants_config)
132+
133+
fetchall_mock.assert_called_once()
134+
executed_query = fetchall_mock.call_args[0][0]
135+
executed_sql = executed_query.sql(dialect="redshift")
136+
expected_sql = (
137+
"SELECT privilege_type, grantee FROM information_schema.table_privileges "
138+
"WHERE table_schema = 'test_schema' AND table_name = 'test_table' "
139+
"AND grantor = CURRENT_USER AND grantee <> CURRENT_USER"
140+
)
141+
assert executed_sql == expected_sql
142+
143+
sql_calls = to_sql_calls(adapter)
144+
assert len(sql_calls) == 3
145+
assert 'REVOKE SELECT ON TABLE "test_schema"."test_table" FROM user_legacy' in sql_calls
146+
assert 'GRANT SELECT ON TABLE "test_schema"."test_table" TO user_new' in sql_calls
147+
assert 'GRANT INSERT ON TABLE "test_schema"."test_table" TO user_writer' in sql_calls
148+
149+
150+
@pytest.mark.parametrize(
151+
"table_type, expected_keyword",
152+
[
153+
(DataObjectType.TABLE, "TABLE"),
154+
(DataObjectType.VIEW, "VIEW"),
155+
(DataObjectType.MATERIALIZED_VIEW, "MATERIALIZED VIEW"),
156+
],
157+
)
158+
def test_sync_grants_config_object_kind(
159+
make_mocked_engine_adapter: t.Callable,
160+
mocker: MockerFixture,
161+
table_type: DataObjectType,
162+
expected_keyword: str,
163+
) -> None:
164+
adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)
165+
relation = exp.to_table("test_schema.test_object", dialect="redshift")
166+
167+
mocker.patch.object(adapter, "fetchall", return_value=[])
168+
169+
adapter.sync_grants_config(relation, {"SELECT": ["user_test"]}, table_type)
170+
171+
sql_calls = to_sql_calls(adapter)
172+
assert sql_calls == [
173+
f'GRANT SELECT ON {expected_keyword} "test_schema"."test_object" TO user_test'
174+
]
175+
176+
177+
def test_sync_grants_config_quotes(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
178+
adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)
179+
relation = exp.to_table('"TestSchema"."TestTable"', dialect="redshift")
180+
new_grants_config = {"SELECT": ["user1", "user2"], "INSERT": ["user3"]}
181+
182+
current_grants = [("SELECT", "user_old"), ("UPDATE", "user_legacy")]
183+
fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants)
184+
185+
adapter.sync_grants_config(relation, new_grants_config)
186+
187+
fetchall_mock.assert_called_once()
188+
executed_query = fetchall_mock.call_args[0][0]
189+
executed_sql = executed_query.sql(dialect="redshift")
190+
expected_sql = (
191+
"SELECT privilege_type, grantee FROM information_schema.table_privileges "
192+
"WHERE table_schema = 'TestSchema' AND table_name = 'TestTable' "
193+
"AND grantor = CURRENT_USER AND grantee <> CURRENT_USER"
194+
)
195+
assert executed_sql == expected_sql
196+
197+
sql_calls = to_sql_calls(adapter)
198+
assert len(sql_calls) == 4
199+
assert 'REVOKE SELECT ON TABLE "TestSchema"."TestTable" FROM user_old' in sql_calls
200+
assert 'REVOKE UPDATE ON TABLE "TestSchema"."TestTable" FROM user_legacy' in sql_calls
201+
assert 'GRANT SELECT ON TABLE "TestSchema"."TestTable" TO user1, user2' in sql_calls
202+
assert 'GRANT INSERT ON TABLE "TestSchema"."TestTable" TO user3' in sql_calls
203+
204+
205+
def test_sync_grants_config_no_schema(
206+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
207+
):
208+
adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)
209+
relation = exp.to_table("test_table", dialect="redshift")
210+
new_grants_config = {"SELECT": ["user1"], "INSERT": ["user2"]}
211+
212+
current_grants = [("UPDATE", "user_old")]
213+
fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants)
214+
get_schema_mock = mocker.patch.object(adapter, "get_current_schema", return_value="public")
215+
216+
adapter.sync_grants_config(relation, new_grants_config)
217+
218+
get_schema_mock.assert_called_once()
219+
220+
executed_query = fetchall_mock.call_args[0][0]
221+
executed_sql = executed_query.sql(dialect="redshift")
222+
expected_sql = (
223+
"SELECT privilege_type, grantee FROM information_schema.table_privileges "
224+
"WHERE table_schema = 'public' AND table_name = 'test_table' "
225+
"AND grantor = CURRENT_USER AND grantee <> CURRENT_USER"
226+
)
227+
assert executed_sql == expected_sql
228+
229+
sql_calls = to_sql_calls(adapter)
230+
assert len(sql_calls) == 3
231+
assert 'REVOKE UPDATE ON TABLE "test_table" FROM user_old' in sql_calls
232+
assert 'GRANT SELECT ON TABLE "test_table" TO user1' in sql_calls
233+
assert 'GRANT INSERT ON TABLE "test_table" TO user2' in sql_calls
234+
235+
86236
def test_create_table_from_query_exists_no_if_not_exists(
87237
adapter: t.Callable, mocker: MockerFixture
88238
):

0 commit comments

Comments
 (0)